From d39996da0ae6095392240c5125bb4b353dd18143 Mon Sep 17 00:00:00 2001 From: andrewso <9V5f1FkzI2LD> Date: Mon, 12 Oct 2020 14:48:52 +0100 Subject: [PATCH] [11.10.20] Passing SynId through AMQ and logging, removed graphql query --- src/pricing/collector.py | 48 +++++++++---------- .../V1_INSERT_NEW_PRICE_RECORD_BTC.graphql | 1 - src/utils/activemqConnect.py | 7 ++- src/utils/jsonLogger.py | 10 ++-- 4 files changed, 31 insertions(+), 35 deletions(-) delete mode 100644 src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql diff --git a/src/pricing/collector.py b/src/pricing/collector.py index 4d03523..cc32194 100644 --- a/src/pricing/collector.py +++ b/src/pricing/collector.py @@ -12,16 +12,13 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose -from src.utils.databaseConnect import send +# from src.utils.databaseConnect import send from src.utils.activemqConnect import activeMQSender -import json +import json, uuid from src.utils.jsonLogger import log -btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql" -xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql" - def averager(type): timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) @@ -77,23 +74,23 @@ def getOpenClose(type): return open, close -def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): - - with open(btc_usd, 'r') as file: - data = file.read() - - strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) - timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' - type = '"'+c_type+'"' - - query = data % (timestamp, type, round(av_price, 2), - round(high, 2), - round(low, 2), - round(o_price, 2), - round(c_price, 2), - round(vol, 2)) - - log("Query sending down to db-gateway -- ({})".format(query), 'INFO') +# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): +# +# with open(btc_usd, 'r') as file: +# data = file.read() +# +# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) +# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' +# type = '"'+c_type+'"' +# +# query = data % (timestamp, type, round(av_price, 2), +# round(high, 2), +# round(low, 2), +# round(o_price, 2), +# round(c_price, 2), +# round(vol, 2)) +# +# log("Query sending down to db-gateway -- ({})".format(query), 'INFO') # status, response = send(query, log) @@ -122,11 +119,12 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price) messageJson = json.dumps(message, indent = 4) - log("Sending message to PricingSave queue", 'INFO') - log("Message: {}".format(message), 'INFO') + syncId = uuid.uuid4() - activeMQSender(messageJson) + log("Sending message to PricingSave queue", 'INFO', syncId) + log("Message: {}".format(message), 'INFO', syncId) + activeMQSender(messageJson, syncId) def timeFunction(): global time diff --git a/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql b/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql deleted file mode 100644 index 7a47952..0000000 --- a/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql +++ /dev/null @@ -1 +0,0 @@ -mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } } \ No newline at end of file diff --git a/src/utils/activemqConnect.py b/src/utils/activemqConnect.py index d160297..714a8d0 100644 --- a/src/utils/activemqConnect.py +++ b/src/utils/activemqConnect.py @@ -1,7 +1,7 @@ #!/usr/bin/env python import stomp -import os, uuid +import os from src.utils.jsonLogger import log @@ -17,11 +17,10 @@ class keys(): def returnKeys(self): return self.addr, self.port, self.amqU, self.amqP -def activeMQSender(message): +def activeMQSender(message, syncId): addr, port, mqUser, mqPass = keys().returnKeys() - syncId = uuid.uuid4() - log("Attempting Connection to Artemis...", 'INFO') + log("Attempting Connection to Artemis...", 'INFO', syncId) con = stomp.Connection([(addr, port)], auto_content_length=False) con.connect( mqUser, mqPass, wait=True) diff --git a/src/utils/jsonLogger.py b/src/utils/jsonLogger.py index a781145..0558544 100644 --- a/src/utils/jsonLogger.py +++ b/src/utils/jsonLogger.py @@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'): logHandler.setFormatter(formatter) logger.addHandler(logHandler) -def log(message, level): +def log(message, level, syncId): logger = logging.getLogger(__name__) if level == 'INFO': - logger.info(message) + logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId}) elif level == 'WARN': - logger.warn(message) + logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId}) elif level == 'ERR': - logger.error(message) + logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId}) elif level == 'DEBUG': - logger.debug(message) \ No newline at end of file + logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId}) \ No newline at end of file