Compare commits

..

No commits in common. "master" and "1.0.0-b24" have entirely different histories.

17 changed files with 107 additions and 198 deletions

View File

@ -2,9 +2,8 @@ FROM python:3.7-alpine
MAINTAINER Andrew Sotheran <cryptosky.user@gmail.com> MAINTAINER Andrew Sotheran <cryptosky.user@gmail.com>
RUN apk update && \ RUN apk update && \
apk add py-pip libc-dev gcc apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \ RUN pip install utils pycryptodome && \
pip install python-dotenv coinbase flask schedule numpy stomp.py python-json-logger pip install python-dotenv coinbase flask schedule numpy
COPY . /home/price-collector/. COPY . /home/price-collector/.
EXPOSE 9090 EXPOSE 9090
CMD ["python", "/home/price-collector/src/main.py"] CMD ["python", "/home/price-collector/src/main.py"]

View File

@ -42,24 +42,14 @@ spec:
resource: limits.memory resource: limits.memory
- name: DATABASE_URL - name: DATABASE_URL
valueFrom: valueFrom:
configMapKeyRef: secretKeyRef:
name: endpoints name: endpoints
key: dbGateway.url key: dbGateway.url
- name: AMQ_URL - name: DATABASE_PORT
valueFrom: valueFrom:
configMapKeyRef: secretKeyRef:
name: endpoints name: endpoints
key: amqStomp.url key: dbGateway.port
- name: BROKER_USER
valueFrom:
secretKeyRef:
name: amq
key: amq.username
- name: BROKER_PASSWORD
valueFrom:
secretKeyRef:
name: amq
key: amq.password
- name: COINBASE_KEY - name: COINBASE_KEY
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:
@ -72,7 +62,7 @@ spec:
key: coinbase.api.secret key: coinbase.api.secret
- name: DB_GATEWAY_URL - name: DB_GATEWAY_URL
valueFrom: valueFrom:
configMapKeyRef: secretKeyRef:
name: endpoints name: endpoints
key: dbGateway.url key: dbGateway.url
ports: ports:
@ -99,10 +89,10 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
resources: resources:
requests: requests:
cpu: 25m cpu: 32m
memory: 32Mi memory: 32Mi
limits: limits:
cpu: 25m cpu: 75m
memory: 64Mi memory: 64Mi
securityContext: securityContext:
capabilities: capabilities:

View File

@ -28,7 +28,7 @@ try {
timestamps { timestamps {
node ("${env.SLAVE_LABEL}") { node ("${env.SLAVE_LABEL}") {
stage('Initialise') { stage('Initialise') {
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]]) checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/price-collector.git']]])
env.APPLICATION_VERSION = get_application_version() env.APPLICATION_VERSION = get_application_version()
@ -41,11 +41,20 @@ try {
) { ) {
sh "doctl auth init --access-token ${DOCTL_TOKEN}" sh "doctl auth init --access-token ${DOCTL_TOKEN}"
sh "doctl registry login" sh "doctl registry login"
sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster" sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
}
}
stage('Test Artifact') {
try {
// mvn 'verify -DskipUTs -DskipTests'
} finally {
// mvn 'test'
} }
} }
stage('Build Image') { stage('Build Image') {
// mvn 'clean package -DskipTests'
executeShellScript("configuration/scripts/mapVarsToConfigs.sh", executeShellScript("configuration/scripts/mapVarsToConfigs.sh",
env.DIGITAL_OCEAN, env.DIGITAL_OCEAN,
@ -69,6 +78,7 @@ try {
stage('Tag Repository') { stage('Tag Repository') {
withCredentials( withCredentials(
[usernamePassword( [usernamePassword(
credentialsId: env.GITHUB_CREDENTIALS_ID, credentialsId: env.GITHUB_CREDENTIALS_ID,

View File

@ -5,4 +5,6 @@ APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml kubectl apply -f configuration/kubernetes/service.yaml
kubectl get pods
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

View File

@ -6,8 +6,6 @@ sys.path.append('/home/price-collector/')
from threading import Thread from threading import Thread
from pricing.collector import collectorMain from pricing.collector import collectorMain
from src.utils.jsonLogger import setup_logging
from probes.probes import runFlaskProbes from probes.probes import runFlaskProbes
def callCollector(args): def callCollector(args):
@ -17,7 +15,6 @@ def callProbes():
runFlaskProbes() runFlaskProbes()
if __name__=='__main__': if __name__=='__main__':
setup_logging()
Thread(target=callProbes).start() Thread(target=callProbes).start()

View File

@ -12,19 +12,25 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
# from src.utils.databaseConnect import send from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import json, uuid import logging as logger
from src.utils.jsonLogger import log logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type): def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
bitstampH, bitstampL = bitstampHighLow(type, log) bitstampH, bitstampL = bitstampHighLow(type, logger)
krakenH, krakenL = krakenHighLow(type, log) krakenH, krakenL = krakenHighLow(type, logger)
bitstamp_P = (bitstampH + bitstampL)/2 bitstamp_P = (bitstampH + bitstampL)/2
kraken_P = (krakenH + krakenL)/2 kraken_P = (krakenH + krakenL)/2
@ -34,15 +40,15 @@ def averager(type):
averagePrice = round(averagePrice, 2) averagePrice = round(averagePrice, 2)
log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO') logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice))
return averagePrice, timestamp return averagePrice, timestamp
def getVol(type): def getVol(type):
bitV, bitA, bitB = bitstampVolAskBid(type, log) bitV, bitA, bitB = bitstampVolAskBid(type, logger)
kV, kA, kB = krakenVolAskBid(type, log) kV, kA, kB = krakenVolAskBid(type, logger)
bV, bA, bB = bitfinexVolAskBid(type, log) bV, bA, bB = bitfinexVolAskBid(type, logger)
gV, gA, gB = geminiVolAskBid(type, log) gV, gA, gB = geminiVolAskBid(type, logger)
v_array = np.array([bitV, kV, bV, gV]) v_array = np.array([bitV, kV, bV, gV])
@ -51,8 +57,8 @@ def getVol(type):
return volume return volume
def getHighLow(type): def getHighLow(type):
kH, kL = krakenHighLow(type, log) kH, kL = krakenHighLow(type, logger)
bH, bL = bitstampHighLow(type, log) bH, bL = bitstampHighLow(type, logger)
h_array = np.array([kH, bH]) h_array = np.array([kH, bH])
l_array = np.array([kL, bL]) l_array = np.array([kL, bL])
@ -63,8 +69,8 @@ def getHighLow(type):
return high, low return high, low
def getOpenClose(type): def getOpenClose(type):
bO, bC = bitstampOpenClose(type, log) bO, bC = bitstampOpenClose(type, logger)
kO, kC = krakenOpenClose(type, log) kO, kC = krakenOpenClose(type, logger)
o_array = np.array([bO, kO]) o_array = np.array([bO, kO])
c_array = np.array([bC, kC]) c_array = np.array([bC, kC])
@ -74,57 +80,32 @@ def getOpenClose(type):
return open, close return open, close
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file:
# data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2),
# round(high, 2),
# round(low, 2),
# round(o_price, 2),
# round(c_price, 2),
# round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# status, response = send(query, log) with open(btc_usd, 'r') as file:
data = file.read()
# if status != 200:
# log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
# log("With Response of : {}".format(response), 'ERR')
# else:
# log("Query executed successfully with Status = {}".format(status), 'INFO')
# log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
type = '"'+c_type+'"'
message = { query = data % (timestamp, type, round(av_price, 2),
"timestamp" : timestamp, round(high, 2),
"type" : c_type, round(low, 2),
"average_price" : av_price, round(o_price, 2),
"high_price" : high, round(c_price, 2),
"low_price" : low, round(vol, 2))
"open_price": o_price,
"close_price" : c_price,
"volume" : vol
}
messageJson = json.dumps(message, indent = 4) logger.info("Query sending down to db-gateway -- ({})".format(query))
syncId = uuid.uuid4() status, response = send(query, logger)
log("Sending message to PricingSave queue", 'INFO', syncId) if status != 200:
log("Message: {}".format(message), 'INFO', syncId) logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
logger.error("With Response of : {}".format(response))
activeMQSender(messageJson, syncId) else:
logger.info("Query executed successfully with Status = {}".format(status))
logger.info("With Response of : {}".format(response))
def timeFunction(): def timeFunction():
global time global time
@ -145,21 +126,20 @@ def collector(c_type):
global time global time
time = timeFunction() time = timeFunction()
print(time)
av_price, timestamp = averager(c_type) av_price, timestamp = averager(c_type)
vol = getVol(c_type) vol = getVol(c_type)
high, low = getHighLow(c_type) high, low = getHighLow(c_type)
o_price, c_price = getOpenClose(c_type) o_price, c_price = getOpenClose(c_type)
sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price) sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
# sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
schedule.every().hour.at(time).do(collector, c_type).tag("collector") schedule.every().hour.at(time).do(collector, c_type).tag("collector")
log("Collection will run again at {} every hour".format(time), 'INFO')
# Dynamically Spin up Child process for each type wanting to track # Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type): def collectorMain(c_type):
log("Starting Historical Price Collector", 'INFO') logger.info("== Historical Price Collector ==")
collector(c_type) collector(c_type)

View File

@ -2,7 +2,7 @@
import requests, json, sys import requests, json, sys
def bitfinexPublicTicker(type, log): def bitfinexPublicTicker(type, logger):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -13,11 +13,11 @@ def bitfinexPublicTicker(type, log):
price = round(price, 2) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
log("Bitfinex Spot Price Error: {}".format(e), 'ERR') logger.error("Bitfinex Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price
def bitfinexVolAskBid(type, log): def bitfinexVolAskBid(type, logger):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -32,5 +32,5 @@ def bitfinexVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Bitfinex High Low Volume Error: {}".format(e), 'ERR') logger.error("Bitfinex High Low Volume Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0

View File

@ -2,7 +2,7 @@
import requests, json, sys import requests, json, sys
def bitstampVolAskBid(type, log): def bitstampVolAskBid(type, logger):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -14,10 +14,10 @@ def bitstampVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR') logger.error("Bitstamp Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def bitstampOpenClose(type, log): def bitstampOpenClose(type, logger):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -28,10 +28,10 @@ def bitstampOpenClose(type, log):
return open, close return open, close
except KeyError as e: except KeyError as e:
log("Bitstamp Open Close Error: {}".format(e), 'ERR') logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0 return 0, 0, 0
def bitstampHighLow(type, log): def bitstampHighLow(type, logger):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -42,5 +42,5 @@ def bitstampHighLow(type, log):
return high, low return high, low
except KeyError as e: except KeyError as e:
log("Bitstamp Open Close Error: {}".format(e), 'ERR') logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0 return 0, 0, 0

View File

@ -10,7 +10,7 @@ class keys():
self.api_key = os.getenv('COINBASE_KEY') self.api_key = os.getenv('COINBASE_KEY')
self.api_secret = os.getenv("COINBASE_SECRET") self.api_secret = os.getenv("COINBASE_SECRET")
def coinbasePublicTicker(type, log): def coinbasePublicTicker(type, logger):
api_key = keys().api_key api_key = keys().api_key
api_secret = keys().api_secret api_secret = keys().api_secret
@ -24,6 +24,6 @@ def coinbasePublicTicker(type, log):
price = round(price, 2) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
log("Coinbase Spot Price Error: {}".format(e), 'ERR') logger.error("Coinbase Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price

View File

@ -2,7 +2,7 @@
import requests, json, sys import requests, json, sys
def geminiPublicTicker(type, log): def geminiPublicTicker(type, logger):
try: try:
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
@ -15,11 +15,11 @@ def geminiPublicTicker(type, log):
return price return price
except KeyError as e: except KeyError as e:
log("Gemini Spot Price Error: {}".format(e), 'ERR') logger.error("Gemini Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price
def geminiVolAskBid(type, log): def geminiVolAskBid(type, logger):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -36,20 +36,20 @@ def geminiVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR') logger.error("Gemini Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def geminiDailyOpenClose(type, log): def geminiDailyOpenClose(type, logger):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
open = round(float(response['open']), 2) dailyOpen = round(float(response['open']), 2)
close = round(float(response['close']), 2) dailyClose = round(float(response['close']), 2)
return open, close return open, close
except KeyError as e: except KeyError as e:
log("Gemini Open Close Error: {}".format(e), 'ERR') logger.error("Gemini Open Close Error: {}".format(e))
sys.stdout.flush() sys.stdout.flush()
return 0, 0 return 0, 0

View File

@ -21,7 +21,7 @@ def krakenCalculateOHLC(response):
return fopen, fhigh, flow, fclose return fopen, fhigh, flow, fclose
def krakenVolAskBid(type, log): def krakenVolAskBid(type, logger):
try: try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -37,10 +37,10 @@ def krakenVolAskBid(type, log):
return vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR') logger.error("Kraken Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def krakenOpenClose(type, log): def krakenOpenClose(type, logger):
try: try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -53,10 +53,10 @@ def krakenOpenClose(type, log):
return fopen, fclose return fopen, fclose
except KeyError as e: except KeyError as e:
log("Kraken Open Close Error: {}".format(e), 'ERR') logger.error("Kraken Open Close Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def krakenHighLow(type, log): def krakenHighLow(type, logger):
try: try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -69,5 +69,5 @@ def krakenHighLow(type, log):
return fhigh, flow return fhigh, flow
except KeyError as e: except KeyError as e:
log("Kraken Open Close Error: {}".format(e), 'ERR') logger.error("Kraken Open Close Error: {}".format(e))
return 0, 0 return 0, 0

View File

@ -6,6 +6,13 @@ import json
from pricing.exchanges.coinbase import coinbasePublicTicker from pricing.exchanges.coinbase import coinbasePublicTicker
import logging as logger
logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
app = Flask(__name__) app = Flask(__name__)
@app.route('/health') @app.route('/health')
@ -15,7 +22,7 @@ def health():
@app.route('/readiness') @app.route('/readiness')
def readiness(): def readiness():
# Can it make a call to an exchange? # Can it make a call to an exchange?
price = coinbasePublicTicker('btc_usd') price = coinbasePublicTicker('btc_usd', logger)
if price != 0 : if price != 0 :
return json.dumps({ return json.dumps({

View File

@ -0,0 +1 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

View File

@ -1,36 +0,0 @@
#!/usr/bin/env python
import stomp
import os
from src.utils.jsonLogger import log
class keys():
def __init__(self):
self.uri = os.getenv("AMQ_URL")
self.amqU = os.getenv("BROKER_USER")
self.amqP = os.getenv("BROKER_PASSWORD")
self.addr = self.uri.split(':')[0]
self.port = self.uri.split(':')[1]
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO', syncId)
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect()

View File

@ -2,14 +2,13 @@
import requests, os, json, sys import requests, os, json, sys
from src.utils.jsonLogger import log
class keys(): class keys():
def __init__(self): def __init__(self):
self.uri = os.getenv("DATABASE_URL") self.uri = os.getenv("DATABASE_URL")
self.port = os.getenv("DATABASE_PORT")
def send(query): def send(query, logger):
try: try:
uri = keys().uri + "/graphql" uri = keys().uri + "/graphql"
headers = {'Content-type': 'application/json'} headers = {'Content-type': 'application/json'}
@ -21,8 +20,8 @@ def send(query):
return statusCode, response return statusCode, response
except requests.exceptions.HTTPError as e: except requests.exceptions.HTTPError as e:
log("Unable to send data down to db-gateway: {}".format(e), 'ERR') logger.critical("Unable to send data down to db-gateway: {}".format(e))
sys.exit(1) sys.exit(1)
except requests.exceptions.RequestException as e: except requests.exceptions.RequestException as e:
log("Unable to send data down to db-gateway: {}".format(e), 'ERR') logger.critical("Unable to send data down to db-gateway: {}".format(e))
sys.exit(1) sys.exit(1)

View File

@ -1,40 +0,0 @@
#!/usr/bin/env python
import logging
from pythonjsonlogger import jsonlogger
import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('@timestamp'):
# this doesn't use record.created, so it is slightly off
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['@timestamp'] = now
if log_record.get('level'):
log_record['level'] = log_record['level'].upper()
else:
log_record['level'] = record.levelname
def setup_logging(log_level='INFO'):
logger = logging.getLogger(__name__)
logger.propagate = 0
logger.setLevel(log_level)
logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level, syncId=""):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})