From 14e340b7241cc64e223ceb142428ae4778212f9a Mon Sep 17 00:00:00 2001 From: Andy Sotheran Date: Wed, 23 Sep 2020 19:28:09 +0100 Subject: [PATCH] [23.09.2020] Added STOMP configuration for sending message to Artemis rather than to graphQL of dbGateway --- Dockerfile | 2 +- configuration/kubernetes/deployment.yaml | 17 +++++++++++++++- src/pricing/collector.py | 25 ++++++++++++++++++++++- src/pricing/exchanges/bitstamp.py | 4 ++-- src/utils/activemqConnect.py | 26 ++++++++++++++++++++++++ 5 files changed, 69 insertions(+), 5 deletions(-) create mode 100644 src/utils/activemqConnect.py diff --git a/Dockerfile b/Dockerfile index 960d13f..8dad54b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -3,7 +3,7 @@ MAINTAINER Andrew Sotheran RUN apk update && \ apk add py-pip libc-dev gcc RUN pip install utils pycryptodome && \ - pip install python-dotenv coinbase flask schedule numpy + pip install python-dotenv coinbase flask schedule numpy stomp COPY . /home/price-collector/. EXPOSE 9090 CMD ["python", "/home/price-collector/src/main.py"] \ No newline at end of file diff --git a/configuration/kubernetes/deployment.yaml b/configuration/kubernetes/deployment.yaml index 70f1a6f..91a2aa3 100644 --- a/configuration/kubernetes/deployment.yaml +++ b/configuration/kubernetes/deployment.yaml @@ -42,9 +42,24 @@ spec: resource: limits.memory - name: DATABASE_URL valueFrom: - secretKeyRef: + configMapKeyRef: name: endpoints key: dbGateway.url + - name: AMQ_URL + valueFrom: + configMapKeyRef: + name: endpoints + key: amqStomp.url + - name: BROKER_USER + valueFrom: + secretKeyRef: + name: amq + key: amq.username + - name: BROKER_PASSWORD + valueFrom: + secretKeyRef: + name: amq + key: amq.password - name: DATABASE_PORT valueFrom: secretKeyRef: diff --git a/src/pricing/collector.py b/src/pricing/collector.py index 51dbfc8..6c62cac 100644 --- a/src/pricing/collector.py +++ b/src/pricing/collector.py @@ -13,6 +13,7 @@ from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitst from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from src.utils.databaseConnect import send +from src.utils.activemqConnect import activeMQSender import logging as logger @@ -107,6 +108,27 @@ def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price) logger.info("Query executed successfully with Status = {}".format(status)) logger.info("With Response of : {}".format(response)) +def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price): + + strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) + timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' + + message = { + "timestamp" : timestamp, + "type" : c_type, + "average_price" : av_price, + "high_price" : high, + "low_price" : low, + "open_price": o_price, + "close_price" : c_price, + "volume" : vol + } + + logger.info("Sending message to PricingSave queue") + + activeMQSender(message, logger) + + def timeFunction(): global time @@ -133,7 +155,8 @@ def collector(c_type): high, low = getHighLow(c_type) o_price, c_price = getOpenClose(c_type) - sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price) + sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price) + # sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price) schedule.every().hour.at(time).do(collector, c_type).tag("collector") diff --git a/src/pricing/exchanges/bitstamp.py b/src/pricing/exchanges/bitstamp.py index 097794c..b86af16 100644 --- a/src/pricing/exchanges/bitstamp.py +++ b/src/pricing/exchanges/bitstamp.py @@ -29,7 +29,7 @@ def bitstampOpenClose(type, logger): return open, close except KeyError as e: logger.error("Bitstamp Open Close Error: {}".format(e)) - return 0, 0, 0 + return 0, 0 def bitstampHighLow(type, logger): try: @@ -43,4 +43,4 @@ def bitstampHighLow(type, logger): return high, low except KeyError as e: logger.error("Bitstamp Open Close Error: {}".format(e)) - return 0, 0, 0 \ No newline at end of file + return 0, 0 \ No newline at end of file diff --git a/src/utils/activemqConnect.py b/src/utils/activemqConnect.py new file mode 100644 index 0000000..e1c5f95 --- /dev/null +++ b/src/utils/activemqConnect.py @@ -0,0 +1,26 @@ +#!/usr/bin/env python + +import stomp +import os + +class keys(): + + def __init__(self): + self.uri = os.getenv("AMQ_URL") + self.amqU = os.getenv("BROKER_USER") + self.amqP = os.getenv("BROKER_PASSWORD") + self.addr = self.uri.split(':')[0] + self.port = self.uri.split(':')[1] + + def returnKeys(self): + return self.addr, self.port, self.amqU, self.amqP + +class activeMQSender(message, logger): + addr, port, mqUser, mqPass = keys().returnKeys() + + logger.info("Attempting Connection to Artemis...") + con = stomp.Connection([(addr, port)]) + con.connect( mqUser, mqPass, wait=True) + + con.send('/queue/PricingSave', message) +