Compare commits

...

3 Commits

4 changed files with 37 additions and 34 deletions

View File

@ -12,16 +12,13 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send
# from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import json
import json, uuid
from src.utils.jsonLogger import log
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
@ -77,23 +74,23 @@ def getOpenClose(type):
return open, close
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
with open(btc_usd, 'r') as file:
data = file.read()
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
type = '"'+c_type+'"'
query = data % (timestamp, type, round(av_price, 2),
round(high, 2),
round(low, 2),
round(o_price, 2),
round(c_price, 2),
round(vol, 2))
log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file:
# data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2),
# round(high, 2),
# round(low, 2),
# round(o_price, 2),
# round(c_price, 2),
# round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# status, response = send(query, log)
@ -122,11 +119,12 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
messageJson = json.dumps(message, indent = 4)
log("Sending message to PricingSave queue", 'INFO')
log("Message: {}".format(message), 'INFO')
syncId = uuid.uuid4()
activeMQSender(messageJson)
log("Sending message to PricingSave queue", 'INFO', syncId)
log("Message: {}".format(message), 'INFO', syncId)
activeMQSender(messageJson, syncId)
def timeFunction():
global time

View File

@ -1 +0,0 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

@ -17,14 +17,20 @@ class keys():
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message):
def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO')
log("Attempting Connection to Artemis...", 'INFO', syncId)
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
con.send("PricingSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect()

View File

@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level):
def log(message, level, syncId=""):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message)
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN':
logger.warn(message)
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR':
logger.error(message)
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG':
logger.debug(message)
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})