Compare commits

..

No commits in common. "master" and "1.0.0-b62" have entirely different histories.

5 changed files with 62 additions and 48 deletions

View File

@ -99,10 +99,10 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
resources: resources:
requests: requests:
cpu: 25m cpu: 32m
memory: 32Mi memory: 32Mi
limits: limits:
cpu: 25m cpu: 75m
memory: 64Mi memory: 64Mi
securityContext: securityContext:
capabilities: capabilities:

View File

@ -12,13 +12,16 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
# from src.utils.databaseConnect import send from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender from src.utils.activemqConnect import activeMQSender
import json, uuid import json
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type): def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
@ -35,6 +38,7 @@ def averager(type):
averagePrice = round(averagePrice, 2) averagePrice = round(averagePrice, 2)
log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO') log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO')
sleep(0.5)
return averagePrice, timestamp return averagePrice, timestamp
@ -74,23 +78,23 @@ def getOpenClose(type):
return open, close return open, close
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file: with open(btc_usd, 'r') as file:
# data = file.read() data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"' type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2), query = data % (timestamp, type, round(av_price, 2),
# round(high, 2), round(high, 2),
# round(low, 2), round(low, 2),
# round(o_price, 2), round(o_price, 2),
# round(c_price, 2), round(c_price, 2),
# round(vol, 2)) round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO') log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# status, response = send(query, log) # status, response = send(query, log)
@ -119,12 +123,13 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
syncId = uuid.uuid4() log("Sending message to PricingSave queue", 'INFO')
sleep(0.5)
log("Message: {}".format(message), 'INFO')
sleep(0.5)
log("Sending message to PricingSave queue", 'INFO', syncId) activeMQSender(messageJson)
log("Message: {}".format(message), 'INFO', syncId)
activeMQSender(messageJson, syncId)
def timeFunction(): def timeFunction():
global time global time
@ -145,6 +150,8 @@ def collector(c_type):
global time global time
time = timeFunction() time = timeFunction()
print(time)
av_price, timestamp = averager(c_type) av_price, timestamp = averager(c_type)
vol = getVol(c_type) vol = getVol(c_type)
high, low = getHighLow(c_type) high, low = getHighLow(c_type)
@ -156,10 +163,12 @@ def collector(c_type):
schedule.every().hour.at(time).do(collector, c_type).tag("collector") schedule.every().hour.at(time).do(collector, c_type).tag("collector")
log("Collection will run again at {} every hour".format(time), 'INFO') log("Collection will run again at {} every hour".format(time), 'INFO')
sleep(0.5)
# Dynamically Spin up Child process for each type wanting to track # Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type): def collectorMain(c_type):
log("Starting Historical Price Collector", 'INFO') log("== Historical Price Collector ==", 'INFO')
sleep(0.5)
collector(c_type) collector(c_type)

View File

@ -0,0 +1 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

@ -1,9 +1,10 @@
#!/usr/bin/env python #!/usr/bin/env python
import stomp import stomp
import os import os, sys
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
from time import sleep
class keys(): class keys():
@ -17,20 +18,23 @@ class keys():
def returnKeys(self): def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId): def activeMQSender(message):
addr, port, mqUser, mqPass = keys().returnKeys() addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO', syncId) log("Attempting Connection to Artemis...", 'INFO')
sleep(1)
try:
con = stomp.Connection([(addr, port)], auto_content_length=False) con = stomp.Connection([(addr, port)], auto_content_length=False)
try:
con.connect( mqUser, mqPass, wait=True) con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave", con.send("PricingSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
except:
e = sys.exc_info()[0]
log("And Error occurred when sending to AMQ :: {}".format(e), 'ERR')
con.disconnect() con.disconnect()
con.disconnect()
except:
e = sys.exc_info()[0]
log("And Error occurred when connecting to AMQ :: {}".format(e), 'ERR')

View File

@ -8,10 +8,10 @@ import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter): class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict): def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict) super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('@timestamp'): if not log_record.get('timestamp'):
# this doesn't use record.created, so it is slightly off # this doesn't use record.created, so it is slightly off
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['@timestamp'] = now log_record['timestamp'] = now
if log_record.get('level'): if log_record.get('level'):
log_record['level'] = log_record['level'].upper() log_record['level'] = log_record['level'].upper()
else: else:
@ -23,18 +23,18 @@ def setup_logging(log_level='INFO'):
logger.setLevel(log_level) logger.setLevel(log_level)
logHandler = logging.StreamHandler() logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s') formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
logHandler.setFormatter(formatter) logHandler.setFormatter(formatter)
logger.addHandler(logHandler) logger.addHandler(logHandler)
def log(message, level, syncId=""): def log(message, level):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if level == 'INFO': if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.info(message)
elif level == 'WARN': elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.warn(message)
elif level == 'ERR': elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.error(message)
elif level == 'DEBUG': elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.debug(message)