Compare commits

...

3 Commits

4 changed files with 37 additions and 34 deletions

View File

@ -12,16 +12,13 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send # from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender from src.utils.activemqConnect import activeMQSender
import json import json, uuid
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type): def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
@ -77,23 +74,23 @@ def getOpenClose(type):
return open, close return open, close
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): # def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
with open(btc_usd, 'r') as file: # with open(btc_usd, 'r') as file:
data = file.read() # data = file.read()
#
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) # strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' # timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
type = '"'+c_type+'"' # type = '"'+c_type+'"'
#
query = data % (timestamp, type, round(av_price, 2), # query = data % (timestamp, type, round(av_price, 2),
round(high, 2), # round(high, 2),
round(low, 2), # round(low, 2),
round(o_price, 2), # round(o_price, 2),
round(c_price, 2), # round(c_price, 2),
round(vol, 2)) # round(vol, 2))
#
log("Query sending down to db-gateway -- ({})".format(query), 'INFO') # log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# status, response = send(query, log) # status, response = send(query, log)
@ -122,11 +119,12 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
log("Sending message to PricingSave queue", 'INFO') syncId = uuid.uuid4()
log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson) log("Sending message to PricingSave queue", 'INFO', syncId)
log("Message: {}".format(message), 'INFO', syncId)
activeMQSender(messageJson, syncId)
def timeFunction(): def timeFunction():
global time global time

View File

@ -1 +0,0 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

@ -17,14 +17,20 @@ class keys():
def returnKeys(self): def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message): def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys() addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO') log("Attempting Connection to Artemis...", 'INFO', syncId)
con = stomp.Connection([(addr, port)], auto_content_length=False) con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True) con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave", message, content_type="application/json", headers={"Content-Type":"application/json"}) con.send("PricingSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect() con.disconnect()

View File

@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter) logHandler.setFormatter(formatter)
logger.addHandler(logHandler) logger.addHandler(logHandler)
def log(message, level): def log(message, level, syncId=""):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if level == 'INFO': if level == 'INFO':
logger.info(message) logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN': elif level == 'WARN':
logger.warn(message) logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR': elif level == 'ERR':
logger.error(message) logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG': elif level == 'DEBUG':
logger.debug(message) logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})