From 3fecb1b49182525d1a35c5442d700e6c2e17734e Mon Sep 17 00:00:00 2001 From: andrewso <9V5f1FkzI2LD> Date: Tue, 6 Oct 2020 14:38:36 +0100 Subject: [PATCH] [06.10.20] Structured logging for Kibana monitoring --- src/main.py | 3 ++ src/pricing/collector.py | 50 +++++++++++++------------------ src/pricing/exchanges/bitfinex.py | 10 ++++--- src/pricing/exchanges/bitstamp.py | 14 +++++---- src/pricing/exchanges/coinbase.py | 6 ++-- src/pricing/exchanges/gemini.py | 18 ++++++----- src/pricing/exchanges/kraken.py | 14 +++++---- src/probes/probes.py | 9 +----- src/utils/__init__.py | 0 src/utils/activemqConnect.py | 6 ++-- src/utils/databaseConnect.py | 8 +++-- src/utils/jsonLogger.py | 40 +++++++++++++++++++++++++ 12 files changed, 110 insertions(+), 68 deletions(-) create mode 100644 src/utils/__init__.py create mode 100644 src/utils/jsonLogger.py diff --git a/src/main.py b/src/main.py index 13ca76d..7ad5b70 100644 --- a/src/main.py +++ b/src/main.py @@ -6,6 +6,8 @@ sys.path.append('/home/price-collector/') from threading import Thread from pricing.collector import collectorMain +from src.utils.jsonLogger import setup_logging + from probes.probes import runFlaskProbes def callCollector(args): @@ -15,6 +17,7 @@ def callProbes(): runFlaskProbes() if __name__=='__main__': + setup_logging() Thread(target=callProbes).start() diff --git a/src/pricing/collector.py b/src/pricing/collector.py index 7c72b92..c7e463a 100644 --- a/src/pricing/collector.py +++ b/src/pricing/collector.py @@ -17,13 +17,7 @@ from src.utils.activemqConnect import activeMQSender import json -import logging as logger - -logger.basicConfig( - level=logger.INFO, - format="%(asctime)s: %(levelname)s -- %(message)s", - datefmt='%Y-%m-%d %H:%M:%S' -) +from src.utils.jsonLogger import log btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql" xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql" @@ -32,8 +26,8 @@ def averager(type): timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) - bitstampH, bitstampL = bitstampHighLow(type, logger) - krakenH, krakenL = krakenHighLow(type, logger) + bitstampH, bitstampL = bitstampHighLow(type) + krakenH, krakenL = krakenHighLow(type) bitstamp_P = (bitstampH + bitstampL)/2 kraken_P = (krakenH + krakenL)/2 @@ -43,15 +37,15 @@ def averager(type): averagePrice = round(averagePrice, 2) - logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice)) + log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO') return averagePrice, timestamp def getVol(type): - bitV, bitA, bitB = bitstampVolAskBid(type, logger) - kV, kA, kB = krakenVolAskBid(type, logger) - bV, bA, bB = bitfinexVolAskBid(type, logger) - gV, gA, gB = geminiVolAskBid(type, logger) + bitV, bitA, bitB = bitstampVolAskBid(type) + kV, kA, kB = krakenVolAskBid(type) + bV, bA, bB = bitfinexVolAskBid(type) + gV, gA, gB = geminiVolAskBid(type) v_array = np.array([bitV, kV, bV, gV]) @@ -60,8 +54,8 @@ def getVol(type): return volume def getHighLow(type): - kH, kL = krakenHighLow(type, logger) - bH, bL = bitstampHighLow(type, logger) + kH, kL = krakenHighLow(type) + bH, bL = bitstampHighLow(type) h_array = np.array([kH, bH]) l_array = np.array([kL, bL]) @@ -72,8 +66,8 @@ def getHighLow(type): return high, low def getOpenClose(type): - bO, bC = bitstampOpenClose(type, logger) - kO, kC = krakenOpenClose(type, logger) + bO, bC = bitstampOpenClose(type) + kO, kC = krakenOpenClose(type) o_array = np.array([bO, kO]) c_array = np.array([bC, kC]) @@ -99,16 +93,16 @@ def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price) round(c_price, 2), round(vol, 2)) - logger.info("Query sending down to db-gateway -- ({})".format(query)) + log("Query sending down to db-gateway -- ({})".format(query), 'INFO') - status, response = send(query, logger) + status, response = send(query, log) if status != 200: - logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status)) - logger.error("With Response of : {}".format(response)) + log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN') + log("With Response of : {}".format(response), 'ERR') else: - logger.info("Query executed successfully with Status = {}".format(status)) - logger.info("With Response of : {}".format(response)) + log("Query executed successfully with Status = {}".format(status), 'INFO') + log("With Response of : {}".format(response), 'INFO') def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price): @@ -128,11 +122,9 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price) messageJson = json.dumps(message, indent = 4) - # messageJson = str(message) + log("Sending message to PricingSave queue", 'INFO') - logger.info("Sending message to PricingSave queue") - - activeMQSender(messageJson, logger) + activeMQSender(messageJson) def timeFunction(): @@ -168,7 +160,7 @@ def collector(c_type): # Dynamically Spin up Child process for each type wanting to track def collectorMain(c_type): - logger.info("== Historical Price Collector ==") + log("== Historical Price Collector ==", 'INFO') collector(c_type) diff --git a/src/pricing/exchanges/bitfinex.py b/src/pricing/exchanges/bitfinex.py index 1fbc123..dff6dc8 100644 --- a/src/pricing/exchanges/bitfinex.py +++ b/src/pricing/exchanges/bitfinex.py @@ -2,7 +2,9 @@ import requests, json, sys -def bitfinexPublicTicker(type, logger): +from src.utils.jsonLogger import log + +def bitfinexPublicTicker(type): try: uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') @@ -13,11 +15,11 @@ def bitfinexPublicTicker(type, logger): price = round(price, 2) return price except KeyError as e: - logger.error("Bitfinex Spot Price Error: {}".format(e)) + log("Bitfinex Spot Price Error: {}".format(e), 'ERR') price = 0 return price -def bitfinexVolAskBid(type, logger): +def bitfinexVolAskBid(type): try: uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') @@ -32,5 +34,5 @@ def bitfinexVolAskBid(type, logger): return vol, ask, bid except KeyError as e: - logger.error("Bitfinex High Low Volume Error: {}".format(e)) + log("Bitfinex High Low Volume Error: {}".format(e), 'ERR') return 0, 0, 0 \ No newline at end of file diff --git a/src/pricing/exchanges/bitstamp.py b/src/pricing/exchanges/bitstamp.py index b86af16..fdbf830 100644 --- a/src/pricing/exchanges/bitstamp.py +++ b/src/pricing/exchanges/bitstamp.py @@ -2,7 +2,9 @@ import requests, json, sys -def bitstampVolAskBid(type, logger): +from src.utils.jsonLogger import log + +def bitstampVolAskBid(type): try: uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" response = requests.request("GET", uri) @@ -14,10 +16,10 @@ def bitstampVolAskBid(type, logger): return vol, ask, bid except KeyError as e: - logger.error("Bitstamp Volume Ask Bid Error: {}".format(e)) + log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR') return 0, 0, 0 -def bitstampOpenClose(type, logger): +def bitstampOpenClose(type): try: uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" response = requests.request("GET", uri) @@ -28,10 +30,10 @@ def bitstampOpenClose(type, logger): return open, close except KeyError as e: - logger.error("Bitstamp Open Close Error: {}".format(e)) + log("Bitstamp Open Close Error: {}".format(e), 'ERR') return 0, 0 -def bitstampHighLow(type, logger): +def bitstampHighLow(type): try: uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" response = requests.request("GET", uri) @@ -42,5 +44,5 @@ def bitstampHighLow(type, logger): return high, low except KeyError as e: - logger.error("Bitstamp Open Close Error: {}".format(e)) + log("Bitstamp Open Close Error: {}".format(e), 'ERR') return 0, 0 \ No newline at end of file diff --git a/src/pricing/exchanges/coinbase.py b/src/pricing/exchanges/coinbase.py index c816d1b..7061507 100644 --- a/src/pricing/exchanges/coinbase.py +++ b/src/pricing/exchanges/coinbase.py @@ -4,13 +4,15 @@ import sys, os from coinbase.wallet.client import Client +from src.utils.jsonLogger import log + class keys(): def __init__(self): self.api_key = os.getenv('COINBASE_KEY') self.api_secret = os.getenv("COINBASE_SECRET") -def coinbasePublicTicker(type, logger): +def coinbasePublicTicker(type): api_key = keys().api_key api_secret = keys().api_secret @@ -24,6 +26,6 @@ def coinbasePublicTicker(type, logger): price = round(price, 2) return price except KeyError as e: - logger.error("Coinbase Spot Price Error: {}".format(e)) + log("Coinbase Spot Price Error: {}".format(e), 'ERR') price = 0 return price \ No newline at end of file diff --git a/src/pricing/exchanges/gemini.py b/src/pricing/exchanges/gemini.py index ae8c063..7e0f2f5 100644 --- a/src/pricing/exchanges/gemini.py +++ b/src/pricing/exchanges/gemini.py @@ -2,7 +2,9 @@ import requests, json, sys -def geminiPublicTicker(type, logger): +from src.utils.jsonLogger import log + +def geminiPublicTicker(type): try: uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '') @@ -15,11 +17,11 @@ def geminiPublicTicker(type, logger): return price except KeyError as e: - logger.error("Gemini Spot Price Error: {}".format(e)) + log("Gemini Spot Price Error: {}".format(e), 'ERR') price = 0 return price -def geminiVolAskBid(type, logger): +def geminiVolAskBid(type): try: uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') response = requests.request("GET", uri) @@ -36,20 +38,20 @@ def geminiVolAskBid(type, logger): return vol, ask, bid except KeyError as e: - logger.error("Gemini Volume Ask Bid Error: {}".format(e)) + log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR') return 0, 0, 0 -def geminiDailyOpenClose(type, logger): +def geminiDailyOpenClose(type): try: uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') response = requests.request("GET", uri) response = json.loads(response.text) - dailyOpen = round(float(response['open']), 2) - dailyClose = round(float(response['close']), 2) + open = round(float(response['open']), 2) + close = round(float(response['close']), 2) return open, close except KeyError as e: - logger.error("Gemini Open Close Error: {}".format(e)) + log("Gemini Open Close Error: {}".format(e), 'ERR') sys.stdout.flush() return 0, 0 diff --git a/src/pricing/exchanges/kraken.py b/src/pricing/exchanges/kraken.py index 3439c9f..bd5dd20 100644 --- a/src/pricing/exchanges/kraken.py +++ b/src/pricing/exchanges/kraken.py @@ -2,6 +2,8 @@ import requests, json, sys +from src.utils.jsonLogger import log + def krakenCalculateOHLC(response): open, high, low, close = 0, 0, 0, 0 @@ -21,7 +23,7 @@ def krakenCalculateOHLC(response): return fopen, fhigh, flow, fclose -def krakenVolAskBid(type, logger): +def krakenVolAskBid(type): try: uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') response = requests.request("GET", uri) @@ -37,10 +39,10 @@ def krakenVolAskBid(type, logger): return vol, ask, bid except KeyError as e: - logger.error("Kraken Volume Ask Bid Error: {}".format(e)) + log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR') return 0, 0, 0 -def krakenOpenClose(type, logger): +def krakenOpenClose(type): try: uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" response = requests.request("GET", uri) @@ -53,10 +55,10 @@ def krakenOpenClose(type, logger): return fopen, fclose except KeyError as e: - logger.error("Kraken Open Close Error: {}".format(e)) + log("Kraken Open Close Error: {}".format(e), 'ERR') return 0, 0, 0 -def krakenHighLow(type, logger): +def krakenHighLow(type): try: uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" response = requests.request("GET", uri) @@ -69,5 +71,5 @@ def krakenHighLow(type, logger): return fhigh, flow except KeyError as e: - logger.error("Kraken Open Close Error: {}".format(e)) + log("Kraken Open Close Error: {}".format(e), 'ERR') return 0, 0 \ No newline at end of file diff --git a/src/probes/probes.py b/src/probes/probes.py index eab776f..511c8ac 100644 --- a/src/probes/probes.py +++ b/src/probes/probes.py @@ -6,13 +6,6 @@ import json from pricing.exchanges.coinbase import coinbasePublicTicker -import logging as logger -logger.basicConfig( - level=logger.INFO, - format="%(asctime)s: %(levelname)s -- %(message)s", - datefmt='%Y-%m-%d %H:%M:%S' -) - app = Flask(__name__) @app.route('/health') @@ -22,7 +15,7 @@ def health(): @app.route('/readiness') def readiness(): # Can it make a call to an exchange? - price = coinbasePublicTicker('btc_usd', logger) + price = coinbasePublicTicker('btc_usd') if price != 0 : return json.dumps({ diff --git a/src/utils/__init__.py b/src/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/utils/activemqConnect.py b/src/utils/activemqConnect.py index 2affece..4430167 100644 --- a/src/utils/activemqConnect.py +++ b/src/utils/activemqConnect.py @@ -3,6 +3,8 @@ import stomp import os +from src.utils.jsonLogger import log + class keys(): def __init__(self): @@ -15,10 +17,10 @@ class keys(): def returnKeys(self): return self.addr, self.port, self.amqU, self.amqP -def activeMQSender(message, logger): +def activeMQSender(message): addr, port, mqUser, mqPass = keys().returnKeys() - logger.info("Attempting Connection to Artemis...") + log("Attempting Connection to Artemis...", 'INFO') con = stomp.Connection([(addr, port)], auto_content_length=False) con.connect( mqUser, mqPass, wait=True) diff --git a/src/utils/databaseConnect.py b/src/utils/databaseConnect.py index 18abc51..00bf7cf 100644 --- a/src/utils/databaseConnect.py +++ b/src/utils/databaseConnect.py @@ -2,12 +2,14 @@ import requests, os, json, sys +from src.utils.jsonLogger import log + class keys(): def __init__(self): self.uri = os.getenv("DATABASE_URL") -def send(query, logger): +def send(query): try: uri = keys().uri + "/graphql" headers = {'Content-type': 'application/json'} @@ -19,8 +21,8 @@ def send(query, logger): return statusCode, response except requests.exceptions.HTTPError as e: - logger.critical("Unable to send data down to db-gateway: {}".format(e)) + log("Unable to send data down to db-gateway: {}".format(e), 'ERR') sys.exit(1) except requests.exceptions.RequestException as e: - logger.critical("Unable to send data down to db-gateway: {}".format(e)) + log("Unable to send data down to db-gateway: {}".format(e), 'ERR') sys.exit(1) \ No newline at end of file diff --git a/src/utils/jsonLogger.py b/src/utils/jsonLogger.py new file mode 100644 index 0000000..54377ac --- /dev/null +++ b/src/utils/jsonLogger.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python + +import logging +from pythonjsonlogger import jsonlogger + +import datetime + +class CustomJsonFormatter(jsonlogger.JsonFormatter): + def add_fields(self, log_record, record, message_dict): + super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict) + if not log_record.get('timestamp'): + # this doesn't use record.created, so it is slightly off + now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') + log_record['timestamp'] = now + if log_record.get('level'): + log_record['level'] = log_record['level'].upper() + else: + log_record['level'] = record.levelname + +def setup_logging(log_level='INFO'): + logger = logging.getLogger(__name__) + logger.propagate = 0 + logger.setLevel(log_level) + logHandler = logging.StreamHandler() + + formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s') + + logHandler.setFormatter(formatter) + logger.addHandler(logHandler) + +def log(message, level): + logger = logging.getLogger(__name__) + if level == 'INFO': + logger.info(message) + elif level == 'WARN': + logger.warn(message) + elif level == 'ERR': + logger.error(message) + elif level == 'DEBUG': + logger.debug(message) \ No newline at end of file