[06.10.20] Structured logging for Kibana monitoring

This commit is contained in:
andrewso 2020-10-06 14:38:36 +01:00
parent bf662f2c75
commit 3fecb1b491
12 changed files with 110 additions and 68 deletions

View File

@ -6,6 +6,8 @@ sys.path.append('/home/price-collector/')
from threading import Thread
from pricing.collector import collectorMain
from src.utils.jsonLogger import setup_logging
from probes.probes import runFlaskProbes
def callCollector(args):
@ -15,6 +17,7 @@ def callProbes():
runFlaskProbes()
if __name__=='__main__':
setup_logging()
Thread(target=callProbes).start()

View File

@ -17,13 +17,7 @@ from src.utils.activemqConnect import activeMQSender
import json
import logging as logger
logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
from src.utils.jsonLogger import log
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
@ -32,8 +26,8 @@ def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
bitstampH, bitstampL = bitstampHighLow(type, logger)
krakenH, krakenL = krakenHighLow(type, logger)
bitstampH, bitstampL = bitstampHighLow(type)
krakenH, krakenL = krakenHighLow(type)
bitstamp_P = (bitstampH + bitstampL)/2
kraken_P = (krakenH + krakenL)/2
@ -43,15 +37,15 @@ def averager(type):
averagePrice = round(averagePrice, 2)
logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice))
log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO')
return averagePrice, timestamp
def getVol(type):
bitV, bitA, bitB = bitstampVolAskBid(type, logger)
kV, kA, kB = krakenVolAskBid(type, logger)
bV, bA, bB = bitfinexVolAskBid(type, logger)
gV, gA, gB = geminiVolAskBid(type, logger)
bitV, bitA, bitB = bitstampVolAskBid(type)
kV, kA, kB = krakenVolAskBid(type)
bV, bA, bB = bitfinexVolAskBid(type)
gV, gA, gB = geminiVolAskBid(type)
v_array = np.array([bitV, kV, bV, gV])
@ -60,8 +54,8 @@ def getVol(type):
return volume
def getHighLow(type):
kH, kL = krakenHighLow(type, logger)
bH, bL = bitstampHighLow(type, logger)
kH, kL = krakenHighLow(type)
bH, bL = bitstampHighLow(type)
h_array = np.array([kH, bH])
l_array = np.array([kL, bL])
@ -72,8 +66,8 @@ def getHighLow(type):
return high, low
def getOpenClose(type):
bO, bC = bitstampOpenClose(type, logger)
kO, kC = krakenOpenClose(type, logger)
bO, bC = bitstampOpenClose(type)
kO, kC = krakenOpenClose(type)
o_array = np.array([bO, kO])
c_array = np.array([bC, kC])
@ -99,16 +93,16 @@ def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
round(c_price, 2),
round(vol, 2))
logger.info("Query sending down to db-gateway -- ({})".format(query))
log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
status, response = send(query, logger)
status, response = send(query, log)
if status != 200:
logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
logger.error("With Response of : {}".format(response))
log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
log("With Response of : {}".format(response), 'ERR')
else:
logger.info("Query executed successfully with Status = {}".format(status))
logger.info("With Response of : {}".format(response))
log("Query executed successfully with Status = {}".format(status), 'INFO')
log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
@ -128,11 +122,9 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
messageJson = json.dumps(message, indent = 4)
# messageJson = str(message)
log("Sending message to PricingSave queue", 'INFO')
logger.info("Sending message to PricingSave queue")
activeMQSender(messageJson, logger)
activeMQSender(messageJson)
def timeFunction():
@ -168,7 +160,7 @@ def collector(c_type):
# Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type):
logger.info("== Historical Price Collector ==")
log("== Historical Price Collector ==", 'INFO')
collector(c_type)

View File

@ -2,7 +2,9 @@
import requests, json, sys
def bitfinexPublicTicker(type, logger):
from src.utils.jsonLogger import log
def bitfinexPublicTicker(type):
try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -13,11 +15,11 @@ def bitfinexPublicTicker(type, logger):
price = round(price, 2)
return price
except KeyError as e:
logger.error("Bitfinex Spot Price Error: {}".format(e))
log("Bitfinex Spot Price Error: {}".format(e), 'ERR')
price = 0
return price
def bitfinexVolAskBid(type, logger):
def bitfinexVolAskBid(type):
try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -32,5 +34,5 @@ def bitfinexVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Bitfinex High Low Volume Error: {}".format(e))
log("Bitfinex High Low Volume Error: {}".format(e), 'ERR')
return 0, 0, 0

View File

@ -2,7 +2,9 @@
import requests, json, sys
def bitstampVolAskBid(type, logger):
from src.utils.jsonLogger import log
def bitstampVolAskBid(type):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -14,10 +16,10 @@ def bitstampVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Bitstamp Volume Ask Bid Error: {}".format(e))
log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def bitstampOpenClose(type, logger):
def bitstampOpenClose(type):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -28,10 +30,10 @@ def bitstampOpenClose(type, logger):
return open, close
except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e))
log("Bitstamp Open Close Error: {}".format(e), 'ERR')
return 0, 0
def bitstampHighLow(type, logger):
def bitstampHighLow(type):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -42,5 +44,5 @@ def bitstampHighLow(type, logger):
return high, low
except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e))
log("Bitstamp Open Close Error: {}".format(e), 'ERR')
return 0, 0

View File

@ -4,13 +4,15 @@ import sys, os
from coinbase.wallet.client import Client
from src.utils.jsonLogger import log
class keys():
def __init__(self):
self.api_key = os.getenv('COINBASE_KEY')
self.api_secret = os.getenv("COINBASE_SECRET")
def coinbasePublicTicker(type, logger):
def coinbasePublicTicker(type):
api_key = keys().api_key
api_secret = keys().api_secret
@ -24,6 +26,6 @@ def coinbasePublicTicker(type, logger):
price = round(price, 2)
return price
except KeyError as e:
logger.error("Coinbase Spot Price Error: {}".format(e))
log("Coinbase Spot Price Error: {}".format(e), 'ERR')
price = 0
return price

View File

@ -2,7 +2,9 @@
import requests, json, sys
def geminiPublicTicker(type, logger):
from src.utils.jsonLogger import log
def geminiPublicTicker(type):
try:
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
@ -15,11 +17,11 @@ def geminiPublicTicker(type, logger):
return price
except KeyError as e:
logger.error("Gemini Spot Price Error: {}".format(e))
log("Gemini Spot Price Error: {}".format(e), 'ERR')
price = 0
return price
def geminiVolAskBid(type, logger):
def geminiVolAskBid(type):
try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri)
@ -36,20 +38,20 @@ def geminiVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Gemini Volume Ask Bid Error: {}".format(e))
log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def geminiDailyOpenClose(type, logger):
def geminiDailyOpenClose(type):
try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri)
response = json.loads(response.text)
dailyOpen = round(float(response['open']), 2)
dailyClose = round(float(response['close']), 2)
open = round(float(response['open']), 2)
close = round(float(response['close']), 2)
return open, close
except KeyError as e:
logger.error("Gemini Open Close Error: {}".format(e))
log("Gemini Open Close Error: {}".format(e), 'ERR')
sys.stdout.flush()
return 0, 0

View File

@ -2,6 +2,8 @@
import requests, json, sys
from src.utils.jsonLogger import log
def krakenCalculateOHLC(response):
open, high, low, close = 0, 0, 0, 0
@ -21,7 +23,7 @@ def krakenCalculateOHLC(response):
return fopen, fhigh, flow, fclose
def krakenVolAskBid(type, logger):
def krakenVolAskBid(type):
try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri)
@ -37,10 +39,10 @@ def krakenVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Kraken Volume Ask Bid Error: {}".format(e))
log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def krakenOpenClose(type, logger):
def krakenOpenClose(type):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
@ -53,10 +55,10 @@ def krakenOpenClose(type, logger):
return fopen, fclose
except KeyError as e:
logger.error("Kraken Open Close Error: {}".format(e))
log("Kraken Open Close Error: {}".format(e), 'ERR')
return 0, 0, 0
def krakenHighLow(type, logger):
def krakenHighLow(type):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
@ -69,5 +71,5 @@ def krakenHighLow(type, logger):
return fhigh, flow
except KeyError as e:
logger.error("Kraken Open Close Error: {}".format(e))
log("Kraken Open Close Error: {}".format(e), 'ERR')
return 0, 0

View File

@ -6,13 +6,6 @@ import json
from pricing.exchanges.coinbase import coinbasePublicTicker
import logging as logger
logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
app = Flask(__name__)
@app.route('/health')
@ -22,7 +15,7 @@ def health():
@app.route('/readiness')
def readiness():
# Can it make a call to an exchange?
price = coinbasePublicTicker('btc_usd', logger)
price = coinbasePublicTicker('btc_usd')
if price != 0 :
return json.dumps({

0
src/utils/__init__.py Normal file
View File

View File

@ -3,6 +3,8 @@
import stomp
import os
from src.utils.jsonLogger import log
class keys():
def __init__(self):
@ -15,10 +17,10 @@ class keys():
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, logger):
def activeMQSender(message):
addr, port, mqUser, mqPass = keys().returnKeys()
logger.info("Attempting Connection to Artemis...")
log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)

View File

@ -2,12 +2,14 @@
import requests, os, json, sys
from src.utils.jsonLogger import log
class keys():
def __init__(self):
self.uri = os.getenv("DATABASE_URL")
def send(query, logger):
def send(query):
try:
uri = keys().uri + "/graphql"
headers = {'Content-type': 'application/json'}
@ -19,8 +21,8 @@ def send(query, logger):
return statusCode, response
except requests.exceptions.HTTPError as e:
logger.critical("Unable to send data down to db-gateway: {}".format(e))
log("Unable to send data down to db-gateway: {}".format(e), 'ERR')
sys.exit(1)
except requests.exceptions.RequestException as e:
logger.critical("Unable to send data down to db-gateway: {}".format(e))
log("Unable to send data down to db-gateway: {}".format(e), 'ERR')
sys.exit(1)

40
src/utils/jsonLogger.py Normal file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
import logging
from pythonjsonlogger import jsonlogger
import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('timestamp'):
# this doesn't use record.created, so it is slightly off
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['timestamp'] = now
if log_record.get('level'):
log_record['level'] = log_record['level'].upper()
else:
log_record['level'] = record.levelname
def setup_logging(log_level='INFO'):
logger = logging.getLogger(__name__)
logger.propagate = 0
logger.setLevel(log_level)
logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message)
elif level == 'WARN':
logger.warn(message)
elif level == 'ERR':
logger.error(message)
elif level == 'DEBUG':
logger.debug(message)