Compare commits

..

No commits in common. "master" and "1.0.0-b16" have entirely different histories.

18 changed files with 113 additions and 209 deletions

View File

@ -2,9 +2,8 @@ FROM python:3.7-alpine
MAINTAINER Andrew Sotheran <cryptosky.user@gmail.com> MAINTAINER Andrew Sotheran <cryptosky.user@gmail.com>
RUN apk update && \ RUN apk update && \
apk add py-pip libc-dev gcc apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \ RUN pip install utils pycryptodome && \
pip install python-dotenv coinbase flask schedule numpy stomp.py python-json-logger pip install python-dotenv coinbase flask schedule numpy
COPY . /home/price-collector/. COPY . /home/price-collector/.
EXPOSE 9090 EXPOSE 9090
CMD ["python", "/home/price-collector/src/main.py"] CMD ["python", "/home/price-collector/src/main.py"]

View File

@ -8,7 +8,7 @@ metadata:
name: RESOURCE_NAME name: RESOURCE_NAME
namespace: production namespace: production
spec: spec:
replicas: 1 replicas: 0
selector: selector:
matchLabels: matchLabels:
app: RESOURCE_NAME app: RESOURCE_NAME
@ -19,8 +19,6 @@ spec:
maxUnavailable: 0 maxUnavailable: 0
template: template:
metadata: metadata:
annotations:
linkerd.io/inject: enabled
labels: labels:
app: RESOURCE_NAME app: RESOURCE_NAME
spec: spec:
@ -42,24 +40,14 @@ spec:
resource: limits.memory resource: limits.memory
- name: DATABASE_URL - name: DATABASE_URL
valueFrom: valueFrom:
configMapKeyRef: secretKeyRef:
name: endpoints name: endpoints
key: dbGateway.url key: dbGateway.url
- name: AMQ_URL - name: DATABASE_PORT
valueFrom: valueFrom:
configMapKeyRef: secretKeyRef:
name: endpoints name: endpoints
key: amqStomp.url key: dbGateway.port
- name: BROKER_USER
valueFrom:
secretKeyRef:
name: amq
key: amq.username
- name: BROKER_PASSWORD
valueFrom:
secretKeyRef:
name: amq
key: amq.password
- name: COINBASE_KEY - name: COINBASE_KEY
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:
@ -72,7 +60,7 @@ spec:
key: coinbase.api.secret key: coinbase.api.secret
- name: DB_GATEWAY_URL - name: DB_GATEWAY_URL
valueFrom: valueFrom:
configMapKeyRef: secretKeyRef:
name: endpoints name: endpoints
key: dbGateway.url key: dbGateway.url
ports: ports:
@ -99,11 +87,11 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
resources: resources:
requests: requests:
cpu: 25m cpu: 10m
memory: 32Mi memory: 16Mi
limits: limits:
cpu: 25m cpu: 25m
memory: 64Mi memory: 32Mi
securityContext: securityContext:
capabilities: capabilities:
add: add:

View File

@ -28,7 +28,7 @@ try {
timestamps { timestamps {
node ("${env.SLAVE_LABEL}") { node ("${env.SLAVE_LABEL}") {
stage('Initialise') { stage('Initialise') {
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]]) checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/price-collector.git']]])
env.APPLICATION_VERSION = get_application_version() env.APPLICATION_VERSION = get_application_version()
@ -41,11 +41,20 @@ try {
) { ) {
sh "doctl auth init --access-token ${DOCTL_TOKEN}" sh "doctl auth init --access-token ${DOCTL_TOKEN}"
sh "doctl registry login" sh "doctl registry login"
sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster" sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
}
}
stage('Test Artifact') {
try {
// mvn 'verify -DskipUTs -DskipTests'
} finally {
// mvn 'test'
} }
} }
stage('Build Image') { stage('Build Image') {
// mvn 'clean package -DskipTests'
executeShellScript("configuration/scripts/mapVarsToConfigs.sh", executeShellScript("configuration/scripts/mapVarsToConfigs.sh",
env.DIGITAL_OCEAN, env.DIGITAL_OCEAN,
@ -69,6 +78,7 @@ try {
stage('Tag Repository') { stage('Tag Repository') {
withCredentials( withCredentials(
[usernamePassword( [usernamePassword(
credentialsId: env.GITHUB_CREDENTIALS_ID, credentialsId: env.GITHUB_CREDENTIALS_ID,

View File

@ -5,4 +5,6 @@ APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml kubectl apply -f configuration/kubernetes/service.yaml
kubectl get pods
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

View File

@ -6,8 +6,6 @@ sys.path.append('/home/price-collector/')
from threading import Thread from threading import Thread
from pricing.collector import collectorMain from pricing.collector import collectorMain
from src.utils.jsonLogger import setup_logging
from probes.probes import runFlaskProbes from probes.probes import runFlaskProbes
def callCollector(args): def callCollector(args):
@ -17,7 +15,6 @@ def callProbes():
runFlaskProbes() runFlaskProbes()
if __name__=='__main__': if __name__=='__main__':
setup_logging()
Thread(target=callProbes).start() Thread(target=callProbes).start()

View File

@ -2,7 +2,7 @@
import requests, json, sys import requests, json, sys
def bitfinexPublicTicker(type, log): def bitfinexPublicTicker(type, logger):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -13,11 +13,11 @@ def bitfinexPublicTicker(type, log):
price = round(price, 2) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
log("Bitfinex Spot Price Error: {}".format(e), 'ERR') logger.error("Bitfinex Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price
def bitfinexVolAskBid(type, log): def bitfinexVolAskBid(type, logger):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -32,5 +32,5 @@ def bitfinexVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Bitfinex High Low Volume Error: {}".format(e), 'ERR') logger.error("Bitfinex High Low Volume Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0

View File

@ -2,7 +2,7 @@
import requests, json, sys import requests, json, sys
def bitstampVolAskBid(type, log): def bitstampVolAskBid(type, logger):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -14,10 +14,10 @@ def bitstampVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR') logger.error("Bitstamp Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def bitstampOpenClose(type, log): def bitstampOpenClose(type, logger):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -28,10 +28,10 @@ def bitstampOpenClose(type, log):
return open, close return open, close
except KeyError as e: except KeyError as e:
log("Bitstamp Open Close Error: {}".format(e), 'ERR') logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0 return 0, 0, 0
def bitstampHighLow(type, log): def bitstampHighLow(type, logger):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -42,5 +42,5 @@ def bitstampHighLow(type, log):
return high, low return high, low
except KeyError as e: except KeyError as e:
log("Bitstamp Open Close Error: {}".format(e), 'ERR') logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0 return 0, 0, 0

View File

@ -10,7 +10,7 @@ class keys():
self.api_key = os.getenv('COINBASE_KEY') self.api_key = os.getenv('COINBASE_KEY')
self.api_secret = os.getenv("COINBASE_SECRET") self.api_secret = os.getenv("COINBASE_SECRET")
def coinbasePublicTicker(type, log): def coinbasePublicTicker(type, logger):
api_key = keys().api_key api_key = keys().api_key
api_secret = keys().api_secret api_secret = keys().api_secret
@ -24,6 +24,6 @@ def coinbasePublicTicker(type, log):
price = round(price, 2) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
log("Coinbase Spot Price Error: {}".format(e), 'ERR') logger.error("Coinbase Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price

View File

@ -7,24 +7,30 @@ import schedule
import numpy as np import numpy as np
from pricing.exchanges.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid from pricing.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose from pricing.coinbase import coinbasePublicTicker
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose from pricing.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from pricing.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
# from src.utils.databaseConnect import send from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import json, uuid import logging as logger
from src.utils.jsonLogger import log logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
def averager(type): def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
bitstampH, bitstampL = bitstampHighLow(type, log) bitstampH, bitstampL = bitstampHighLow(type, logger)
krakenH, krakenL = krakenHighLow(type, log) krakenH, krakenL = krakenHighLow(type, logger)
bitstamp_P = (bitstampH + bitstampL)/2 bitstamp_P = (bitstampH + bitstampL)/2
kraken_P = (krakenH + krakenL)/2 kraken_P = (krakenH + krakenL)/2
@ -34,25 +40,23 @@ def averager(type):
averagePrice = round(averagePrice, 2) averagePrice = round(averagePrice, 2)
log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO') logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice))
return averagePrice, timestamp return averagePrice, timestamp
def getVol(type): def getVol(type):
bitV, bitA, bitB = bitstampVolAskBid(type, log) bitV, bitA, bitB = bitstampVolAskBid(type, logger)
kV, kA, kB = krakenVolAskBid(type, log) kV, kA, kB = krakenVolAskBid(type, logger)
bV, bA, bB = bitfinexVolAskBid(type, log)
gV, gA, gB = geminiVolAskBid(type, log)
v_array = np.array([bitV, kV, bV, gV]) v_array = np.array([bitV, kV])
volume = v_array[np.nonzero(v_array)].mean() volume = v_array[np.nonzero(v_array)].mean()
return volume return volume
def getHighLow(type): def getHighLow(type):
kH, kL = krakenHighLow(type, log) kH, kL = krakenHighLow(type, logger)
bH, bL = bitstampHighLow(type, log) bH, bL = bitstampHighLow(type, logger)
h_array = np.array([kH, bH]) h_array = np.array([kH, bH])
l_array = np.array([kL, bL]) l_array = np.array([kL, bL])
@ -63,8 +67,8 @@ def getHighLow(type):
return high, low return high, low
def getOpenClose(type): def getOpenClose(type):
bO, bC = bitstampOpenClose(type, log) bO, bC = bitstampOpenClose(type, logger)
kO, kC = krakenOpenClose(type, log) kO, kC = krakenOpenClose(type, logger)
o_array = np.array([bO, kO]) o_array = np.array([bO, kO])
c_array = np.array([bC, kC]) c_array = np.array([bC, kC])
@ -74,57 +78,31 @@ def getOpenClose(type):
return open, close return open, close
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file:
# data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2),
# round(high, 2),
# round(low, 2),
# round(o_price, 2),
# round(c_price, 2),
# round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# status, response = send(query, log) with open(btc_usd, 'r') as file:
data = file.read()
# if status != 200: timestamp = '"'+timestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN') type = '"'+c_type+'"'
# log("With Response of : {}".format(response), 'ERR')
# else:
# log("Query executed successfully with Status = {}".format(status), 'INFO')
# log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price): query = data % (timestamp, type, round(av_price, 2),
round(high, 2),
round(low, 2),
round(o_price, 2),
round(c_price, 2),
round(vol, 2))
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) logger.info("Query sending down to db-gateway -- ({})".format(query))
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { status, response = send(query, logger)
"timestamp" : timestamp,
"type" : c_type,
"average_price" : av_price,
"high_price" : high,
"low_price" : low,
"open_price": o_price,
"close_price" : c_price,
"volume" : vol
}
messageJson = json.dumps(message, indent = 4) if status != 200:
logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
syncId = uuid.uuid4() logger.error("With Response of : {}".format(response))
else:
log("Sending message to PricingSave queue", 'INFO', syncId) logger.info("Query executed successfully with Status = {}".format(status))
log("Message: {}".format(message), 'INFO', syncId) logger.info("With Response of : {}".format(response))
activeMQSender(messageJson, syncId)
def timeFunction(): def timeFunction():
global time global time
@ -145,21 +123,20 @@ def collector(c_type):
global time global time
time = timeFunction() time = timeFunction()
print(time)
av_price, timestamp = averager(c_type) av_price, timestamp = averager(c_type)
vol = getVol(c_type) vol = getVol(c_type)
high, low = getHighLow(c_type) high, low = getHighLow(c_type)
o_price, c_price = getOpenClose(c_type) o_price, c_price = getOpenClose(c_type)
sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price) sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
# sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
schedule.every().hour.at(time).do(collector, c_type).tag("collector") schedule.every().hour.at(time).do(collector, c_type).tag("collector")
log("Collection will run again at {} every hour".format(time), 'INFO')
# Dynamically Spin up Child process for each type wanting to track # Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type): def collectorMain(c_type):
log("Starting Historical Price Collector", 'INFO') logger.info("== Historical Price Collector ==")
collector(c_type) collector(c_type)

View File

@ -2,7 +2,7 @@
import requests, json, sys import requests, json, sys
def geminiPublicTicker(type, log): def geminiPublicTicker(type, logger):
try: try:
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
@ -15,11 +15,11 @@ def geminiPublicTicker(type, log):
return price return price
except KeyError as e: except KeyError as e:
log("Gemini Spot Price Error: {}".format(e), 'ERR') logger.error("Gemini Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price
def geminiVolAskBid(type, log): def geminiVolAskBid(type, logger):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -36,20 +36,20 @@ def geminiVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR') logger.error("Gemini Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def geminiDailyOpenClose(type, log): def geminiDailyOpenClose(type, logger):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
open = round(float(response['open']), 2) dailyOpen = round(float(response['open']), 2)
close = round(float(response['close']), 2) dailyClose = round(float(response['close']), 2)
return open, close return open, close
except KeyError as e: except KeyError as e:
log("Gemini Open Close Error: {}".format(e), 'ERR') logger.error("Gemini Open Close Error: {}".format(e))
sys.stdout.flush() sys.stdout.flush()
return 0, 0 return 0, 0

View File

@ -21,7 +21,7 @@ def krakenCalculateOHLC(response):
return fopen, fhigh, flow, fclose return fopen, fhigh, flow, fclose
def krakenVolAskBid(type, log): def krakenVolAskBid(type, logger):
try: try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -37,10 +37,10 @@ def krakenVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR') logger.error("Kraken Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def krakenOpenClose(type, log): def krakenOpenClose(type, logger):
try: try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -53,10 +53,10 @@ def krakenOpenClose(type, log):
return fopen, fclose return fopen, fclose
except KeyError as e: except KeyError as e:
log("Kraken Open Close Error: {}".format(e), 'ERR') logger.error("Kraken Open Close Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def krakenHighLow(type, log): def krakenHighLow(type, logger):
try: try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -69,5 +69,5 @@ def krakenHighLow(type, log):
return fhigh, flow return fhigh, flow
except KeyError as e: except KeyError as e:
log("Kraken Open Close Error: {}".format(e), 'ERR') logger.error("Kraken Open Close Error: {}".format(e))
return 0, 0 return 0, 0

View File

@ -4,7 +4,14 @@ from flask import Flask
import json import json
from pricing.exchanges.coinbase import coinbasePublicTicker from pricing.coinbase import coinbasePublicTicker
import logging as logger
logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
app = Flask(__name__) app = Flask(__name__)
@ -15,7 +22,7 @@ def health():
@app.route('/readiness') @app.route('/readiness')
def readiness(): def readiness():
# Can it make a call to an exchange? # Can it make a call to an exchange?
price = coinbasePublicTicker('btc_usd') price = coinbasePublicTicker('btc_usd', logger)
if price != 0 : if price != 0 :
return json.dumps({ return json.dumps({

View File

@ -0,0 +1 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

View File

@ -1,36 +0,0 @@
#!/usr/bin/env python
import stomp
import os
from src.utils.jsonLogger import log
class keys():
def __init__(self):
self.uri = os.getenv("AMQ_URL")
self.amqU = os.getenv("BROKER_USER")
self.amqP = os.getenv("BROKER_PASSWORD")
self.addr = self.uri.split(':')[0]
self.port = self.uri.split(':')[1]
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO', syncId)
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect()

View File

@ -2,14 +2,13 @@
import requests, os, json, sys import requests, os, json, sys
from src.utils.jsonLogger import log
class keys(): class keys():
def __init__(self): def __init__(self):
self.uri = os.getenv("DATABASE_URL") self.uri = os.getenv("DATABASE_URL")
self.port = os.getenv("DATABASE_PORT")
def send(query): def send(query, logger):
try: try:
uri = keys().uri + "/graphql" uri = keys().uri + "/graphql"
headers = {'Content-type': 'application/json'} headers = {'Content-type': 'application/json'}
@ -21,8 +20,8 @@ def send(query):
return statusCode, response return statusCode, response
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
log("Unable to send data down to db-gateway: {}".format(e), 'ERR') logger.critical("Unable to send data down to db-gateway: {}".format(e))
sys.exit(1) sys.exit(1)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
log("Unable to send data down to db-gateway: {}".format(e), 'ERR') logger.critical("Unable to send data down to db-gateway: {}".format(e))
sys.exit(1) sys.exit(1)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
import logging
from pythonjsonlogger import jsonlogger
import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('@timestamp'):
# this doesn't use record.created, so it is slightly off
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['@timestamp'] = now
if log_record.get('level'):
log_record['level'] = log_record['level'].upper()
else:
log_record['level'] = record.levelname
def setup_logging(log_level='INFO'):
logger = logging.getLogger(__name__)
logger.propagate = 0
logger.setLevel(log_level)
logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level, syncId=""):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})