Compare commits
No commits in common. "master" and "1.0.0-b67" have entirely different histories.
@ -99,10 +99,10 @@ spec:
|
|||||||
imagePullPolicy: Always
|
imagePullPolicy: Always
|
||||||
resources:
|
resources:
|
||||||
requests:
|
requests:
|
||||||
cpu: 25m
|
cpu: 32m
|
||||||
memory: 32Mi
|
memory: 32Mi
|
||||||
limits:
|
limits:
|
||||||
cpu: 25m
|
cpu: 75m
|
||||||
memory: 64Mi
|
memory: 64Mi
|
||||||
securityContext:
|
securityContext:
|
||||||
capabilities:
|
capabilities:
|
||||||
|
|||||||
@ -12,13 +12,16 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
|
|||||||
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
|
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
|
||||||
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
|
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
|
||||||
|
|
||||||
# from src.utils.databaseConnect import send
|
from src.utils.databaseConnect import send
|
||||||
from src.utils.activemqConnect import activeMQSender
|
from src.utils.activemqConnect import activeMQSender
|
||||||
|
|
||||||
import json, uuid
|
import json
|
||||||
|
|
||||||
from src.utils.jsonLogger import log
|
from src.utils.jsonLogger import log
|
||||||
|
|
||||||
|
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
|
||||||
|
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
|
||||||
|
|
||||||
def averager(type):
|
def averager(type):
|
||||||
|
|
||||||
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
|
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
|
||||||
@ -74,23 +77,23 @@ def getOpenClose(type):
|
|||||||
|
|
||||||
return open, close
|
return open, close
|
||||||
|
|
||||||
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
|
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
|
||||||
#
|
|
||||||
# with open(btc_usd, 'r') as file:
|
with open(btc_usd, 'r') as file:
|
||||||
# data = file.read()
|
data = file.read()
|
||||||
#
|
|
||||||
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
|
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
|
||||||
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
|
timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
|
||||||
# type = '"'+c_type+'"'
|
type = '"'+c_type+'"'
|
||||||
#
|
|
||||||
# query = data % (timestamp, type, round(av_price, 2),
|
query = data % (timestamp, type, round(av_price, 2),
|
||||||
# round(high, 2),
|
round(high, 2),
|
||||||
# round(low, 2),
|
round(low, 2),
|
||||||
# round(o_price, 2),
|
round(o_price, 2),
|
||||||
# round(c_price, 2),
|
round(c_price, 2),
|
||||||
# round(vol, 2))
|
round(vol, 2))
|
||||||
#
|
|
||||||
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
|
log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
|
||||||
|
|
||||||
# status, response = send(query, log)
|
# status, response = send(query, log)
|
||||||
|
|
||||||
@ -119,12 +122,11 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
|
|||||||
|
|
||||||
messageJson = json.dumps(message, indent = 4)
|
messageJson = json.dumps(message, indent = 4)
|
||||||
|
|
||||||
syncId = uuid.uuid4()
|
log("Sending message to PricingSave queue", 'INFO')
|
||||||
|
log("Message: {}".format(message), 'INFO')
|
||||||
|
|
||||||
log("Sending message to PricingSave queue", 'INFO', syncId)
|
activeMQSender(messageJson)
|
||||||
log("Message: {}".format(message), 'INFO', syncId)
|
|
||||||
|
|
||||||
activeMQSender(messageJson, syncId)
|
|
||||||
|
|
||||||
def timeFunction():
|
def timeFunction():
|
||||||
global time
|
global time
|
||||||
@ -145,6 +147,8 @@ def collector(c_type):
|
|||||||
global time
|
global time
|
||||||
time = timeFunction()
|
time = timeFunction()
|
||||||
|
|
||||||
|
print(time)
|
||||||
|
|
||||||
av_price, timestamp = averager(c_type)
|
av_price, timestamp = averager(c_type)
|
||||||
vol = getVol(c_type)
|
vol = getVol(c_type)
|
||||||
high, low = getHighLow(c_type)
|
high, low = getHighLow(c_type)
|
||||||
@ -159,7 +163,7 @@ def collector(c_type):
|
|||||||
|
|
||||||
# Dynamically Spin up Child process for each type wanting to track
|
# Dynamically Spin up Child process for each type wanting to track
|
||||||
def collectorMain(c_type):
|
def collectorMain(c_type):
|
||||||
log("Starting Historical Price Collector", 'INFO')
|
log("== Historical Price Collector ==", 'INFO')
|
||||||
|
|
||||||
collector(c_type)
|
collector(c_type)
|
||||||
|
|
||||||
|
|||||||
@ -0,0 +1 @@
|
|||||||
|
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }
|
||||||
@ -17,20 +17,14 @@ class keys():
|
|||||||
def returnKeys(self):
|
def returnKeys(self):
|
||||||
return self.addr, self.port, self.amqU, self.amqP
|
return self.addr, self.port, self.amqU, self.amqP
|
||||||
|
|
||||||
def activeMQSender(message, syncId):
|
def activeMQSender(message):
|
||||||
addr, port, mqUser, mqPass = keys().returnKeys()
|
addr, port, mqUser, mqPass = keys().returnKeys()
|
||||||
|
|
||||||
log("Attempting Connection to Artemis...", 'INFO', syncId)
|
log("Attempting Connection to Artemis...", 'INFO')
|
||||||
con = stomp.Connection([(addr, port)], auto_content_length=False)
|
con = stomp.Connection([(addr, port)], auto_content_length=False)
|
||||||
con.connect( mqUser, mqPass, wait=True)
|
con.connect( mqUser, mqPass, wait=True)
|
||||||
|
|
||||||
con.send("PricingSave",
|
con.send("PricingSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
|
||||||
message,
|
|
||||||
content_type="application/json",
|
|
||||||
headers={
|
|
||||||
"Content-Type":"application/json",
|
|
||||||
"X-CRYPTO-Sync-ID":syncId
|
|
||||||
})
|
|
||||||
|
|
||||||
con.disconnect()
|
con.disconnect()
|
||||||
|
|
||||||
|
|||||||
@ -8,10 +8,10 @@ import datetime
|
|||||||
class CustomJsonFormatter(jsonlogger.JsonFormatter):
|
class CustomJsonFormatter(jsonlogger.JsonFormatter):
|
||||||
def add_fields(self, log_record, record, message_dict):
|
def add_fields(self, log_record, record, message_dict):
|
||||||
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
|
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
|
||||||
if not log_record.get('@timestamp'):
|
if not log_record.get('timestamp'):
|
||||||
# this doesn't use record.created, so it is slightly off
|
# this doesn't use record.created, so it is slightly off
|
||||||
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
|
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
|
||||||
log_record['@timestamp'] = now
|
log_record['timestamp'] = now
|
||||||
if log_record.get('level'):
|
if log_record.get('level'):
|
||||||
log_record['level'] = log_record['level'].upper()
|
log_record['level'] = log_record['level'].upper()
|
||||||
else:
|
else:
|
||||||
@ -23,18 +23,18 @@ def setup_logging(log_level='INFO'):
|
|||||||
logger.setLevel(log_level)
|
logger.setLevel(log_level)
|
||||||
logHandler = logging.StreamHandler()
|
logHandler = logging.StreamHandler()
|
||||||
|
|
||||||
formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
|
formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
|
||||||
|
|
||||||
logHandler.setFormatter(formatter)
|
logHandler.setFormatter(formatter)
|
||||||
logger.addHandler(logHandler)
|
logger.addHandler(logHandler)
|
||||||
|
|
||||||
def log(message, level, syncId=""):
|
def log(message, level):
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
if level == 'INFO':
|
if level == 'INFO':
|
||||||
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
logger.info(message)
|
||||||
elif level == 'WARN':
|
elif level == 'WARN':
|
||||||
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
logger.warn(message)
|
||||||
elif level == 'ERR':
|
elif level == 'ERR':
|
||||||
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
logger.error(message)
|
||||||
elif level == 'DEBUG':
|
elif level == 'DEBUG':
|
||||||
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
logger.debug(message)
|
||||||
Loading…
x
Reference in New Issue
Block a user