[23.09.2020] Added STOMP configuration for sending message to Artemis rather than to graphQL of dbGateway

This commit is contained in:
Andy Sotheran 2020-09-23 19:28:09 +01:00
parent 36cec493b0
commit 14e340b724
5 changed files with 69 additions and 5 deletions

View File

@ -3,7 +3,7 @@ MAINTAINER Andrew Sotheran <cryptosky.user@gmail.com>
RUN apk update && \ RUN apk update && \
apk add py-pip libc-dev gcc apk add py-pip libc-dev gcc
RUN pip install utils pycryptodome && \ RUN pip install utils pycryptodome && \
pip install python-dotenv coinbase flask schedule numpy pip install python-dotenv coinbase flask schedule numpy stomp
COPY . /home/price-collector/. COPY . /home/price-collector/.
EXPOSE 9090 EXPOSE 9090
CMD ["python", "/home/price-collector/src/main.py"] CMD ["python", "/home/price-collector/src/main.py"]

View File

@ -42,9 +42,24 @@ spec:
resource: limits.memory resource: limits.memory
- name: DATABASE_URL - name: DATABASE_URL
valueFrom: valueFrom:
secretKeyRef: configMapKeyRef:
name: endpoints name: endpoints
key: dbGateway.url key: dbGateway.url
- name: AMQ_URL
valueFrom:
configMapKeyRef:
name: endpoints
key: amqStomp.url
- name: BROKER_USER
valueFrom:
secretKeyRef:
name: amq
key: amq.username
- name: BROKER_PASSWORD
valueFrom:
secretKeyRef:
name: amq
key: amq.password
- name: DATABASE_PORT - name: DATABASE_PORT
valueFrom: valueFrom:
secretKeyRef: secretKeyRef:

View File

@ -13,6 +13,7 @@ from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitst
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import logging as logger import logging as logger
@ -107,6 +108,27 @@ def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
logger.info("Query executed successfully with Status = {}".format(status)) logger.info("Query executed successfully with Status = {}".format(status))
logger.info("With Response of : {}".format(response)) logger.info("With Response of : {}".format(response))
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
message = {
"timestamp" : timestamp,
"type" : c_type,
"average_price" : av_price,
"high_price" : high,
"low_price" : low,
"open_price": o_price,
"close_price" : c_price,
"volume" : vol
}
logger.info("Sending message to PricingSave queue")
activeMQSender(message, logger)
def timeFunction(): def timeFunction():
global time global time
@ -133,7 +155,8 @@ def collector(c_type):
high, low = getHighLow(c_type) high, low = getHighLow(c_type)
o_price, c_price = getOpenClose(c_type) o_price, c_price = getOpenClose(c_type)
sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price) sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
# sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
schedule.every().hour.at(time).do(collector, c_type).tag("collector") schedule.every().hour.at(time).do(collector, c_type).tag("collector")

View File

@ -29,7 +29,7 @@ def bitstampOpenClose(type, logger):
return open, close return open, close
except KeyError as e: except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e)) logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0, 0 return 0, 0
def bitstampHighLow(type, logger): def bitstampHighLow(type, logger):
try: try:
@ -43,4 +43,4 @@ def bitstampHighLow(type, logger):
return high, low return high, low
except KeyError as e: except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e)) logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0, 0 return 0, 0

View File

@ -0,0 +1,26 @@
#!/usr/bin/env python
import stomp
import os
class keys():
def __init__(self):
self.uri = os.getenv("AMQ_URL")
self.amqU = os.getenv("BROKER_USER")
self.amqP = os.getenv("BROKER_PASSWORD")
self.addr = self.uri.split(':')[0]
self.port = self.uri.split(':')[1]
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
class activeMQSender(message, logger):
addr, port, mqUser, mqPass = keys().returnKeys()
logger.info("Attempting Connection to Artemis...")
con = stomp.Connection([(addr, port)])
con.connect( mqUser, mqPass, wait=True)
con.send('/queue/PricingSave', message)