price-collector/src/pricing/collector.py

177 lines
5.0 KiB
Python

#!/usr/bin/env python
import datetime
from time import sleep
import schedule
import numpy as np
from pricing.exchanges.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import json
import logging as logger
# GraphQL insert templates, one file per tracked pair. Only btc_usd is read by
# the code visible in this module (see sendToGateway).
btc_usd = "/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd = "/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"

# Root logging setup: INFO and above, "<timestamp>: <LEVEL> -- <message>".
logger.basicConfig(
    datefmt='%Y-%m-%d %H:%M:%S',
    format="%(asctime)s: %(levelname)s -- %(message)s",
    level=logger.INFO,
)
def averager(type):
    """Average the high/low midpoints reported by Bitstamp and Kraken.

    For each exchange the midpoint ``(high + low) / 2`` is computed. Midpoints
    equal to zero are excluded, the remaining ones are averaged, and the
    result is rounded to two decimals and logged.

    Args:
        type: Value forwarded unchanged to the exchange helpers. The name
            shadows the builtin but is kept for caller compatibility.

    Returns:
        A ``(average_price, timestamp)`` tuple, where ``timestamp`` is the
        local time at call start shifted forward by one hour.
    """
    # Taken before any exchange call so it reflects when collection began.
    # NOTE(review): the +1h shift looks like a timezone adjustment -- confirm.
    timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)

    bitstamp_high, bitstamp_low = bitstampHighLow(type, logger)
    kraken_high, kraken_low = krakenHighLow(type, logger)

    midpoints = np.array([
        (bitstamp_high + bitstamp_low) / 2,
        (kraken_high + kraken_low) / 2,
    ])
    # Zero midpoints are left out of the mean (presumably "no data" from an
    # exchange -- verify against the exchange helpers).
    usable = midpoints[np.nonzero(midpoints)]
    averagePrice = round(np.mean(usable), 2)

    logger.info("Hourly Price for ({}) is {}".format(timestamp, averagePrice))
    return averagePrice, timestamp
def getVol(type):
    """Return the mean traded volume across the four tracked exchanges.

    Each exchange helper returns a ``(volume, ask, bid)`` triple; only the
    volume is used. Volumes equal to zero are excluded from the mean.

    Args:
        type: Value forwarded unchanged to every exchange helper.

    Returns:
        Mean of the non-zero volumes.
    """
    # Query order is kept as Bitstamp, Kraken, Bitfinex, Gemini.
    sources = (
        bitstampVolAskBid,
        krakenVolAskBid,
        bitfinexVolAskBid,
        geminiVolAskBid,
    )
    volumes = []
    for fetch in sources:
        exchange_volume, _ask, _bid = fetch(type, logger)
        volumes.append(exchange_volume)

    volume_arr = np.array(volumes)
    return np.mean(volume_arr[np.nonzero(volume_arr)])
def getHighLow(type):
    """Return the mean high and mean low across Kraken and Bitstamp.

    Zero values are excluded from each mean independently.

    Args:
        type: Value forwarded unchanged to the exchange helpers.

    Returns:
        A ``(high, low)`` tuple of averaged values.
    """
    def _mean_of_nonzero(values):
        # Average only the entries that are not zero.
        arr = np.array(values)
        return np.mean(arr[np.nonzero(arr)])

    kraken_high, kraken_low = krakenHighLow(type, logger)
    bitstamp_high, bitstamp_low = bitstampHighLow(type, logger)

    mean_high = _mean_of_nonzero([kraken_high, bitstamp_high])
    mean_low = _mean_of_nonzero([kraken_low, bitstamp_low])
    return mean_high, mean_low
def getOpenClose(type):
    """Return the mean open and mean close across Bitstamp and Kraken.

    Zero values are excluded from each mean independently.

    Args:
        type: Value forwarded unchanged to the exchange helpers.

    Returns:
        An ``(open, close)`` tuple of averaged values.
    """
    def _mean_of_nonzero(values):
        # Average only the entries that are not zero.
        arr = np.array(values)
        return np.mean(arr[np.nonzero(arr)])

    bitstamp_open, bitstamp_close = bitstampOpenClose(type, logger)
    kraken_open, kraken_close = krakenOpenClose(type, logger)

    # Local names avoid shadowing the builtin open().
    mean_open = _mean_of_nonzero([bitstamp_open, kraken_open])
    mean_close = _mean_of_nonzero([bitstamp_close, kraken_close])
    return mean_open, mean_close
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Fill the insert-query template with one price record and send it.

    The template file is read from ``btc_usd`` and filled with ``%``
    formatting. Placeholders are supplied in this order: quoted timestamp,
    quoted type, average, high, low, open, close, volume. All numeric values
    are rounded to two decimals. The outcome of ``send`` is logged.

    Args:
        c_type: Currency type string; wrapped in double quotes for the query.
        timestamp: ``datetime`` of the record; truncated to the hour.
        av_price, high, low, vol, o_price, c_price: Numeric record fields.
    """
    # NOTE(review): the BTC template is used for every c_type and xmr_usd is
    # never read here -- confirm whether XMR records need their own template.
    with open(btc_usd, 'r') as template_file:
        template = template_file.read()

    hour_start = timestamp.replace(minute=0, second=0, microsecond=0)
    quoted_timestamp = '"' + hour_start.strftime('%Y-%m-%dT%H:%M:%S') + '"'
    quoted_type = '"' + c_type + '"'

    # Volume goes last in the template even though it comes earlier in the
    # function signature.
    rounded_fields = tuple(
        round(value, 2)
        for value in (av_price, high, low, o_price, c_price, vol)
    )
    query = template % ((quoted_timestamp, quoted_type) + rounded_fields)

    logger.info("Query sending down to db-gateway -- ({})".format(query))
    status, response = send(query, logger)

    if status == 200:
        logger.info("Query executed successfully with Status = {}".format(status))
        logger.info("With Response of : {}".format(response))
    else:
        logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
        logger.error("With Response of : {}".format(response))
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Publish one price record to the message queue via ``activeMQSender``.

    The record is assembled as a dict and sent as ``str(dict)``, i.e. Python
    literal syntax with single-quoted keys rather than JSON. The timestamp is
    truncated to the hour, formatted as ``%Y-%m-%dT%H:%M:%S`` and wrapped in
    literal double quotes.

    NumPy scalar values are unwrapped to plain Python numbers before the dict
    is stringified. Under NumPy >= 2 a scalar reprs as ``np.float64(1.5)``,
    which would otherwise end up verbatim in the message text. Values that are
    not NumPy scalars are passed through untouched.

    Args:
        c_type: Currency type string, sent as-is.
        timestamp: ``datetime`` of the record.
        av_price, high, low, vol, o_price, c_price: Numeric record fields.
    """
    def _plain(value):
        # np.generic is the base class of all NumPy scalars; .item() returns
        # the equivalent built-in Python number.
        return value.item() if isinstance(value, np.generic) else value

    hour_start = timestamp.replace(minute=0, second=0, microsecond=0)
    quoted_timestamp = '"' + hour_start.strftime('%Y-%m-%dT%H:%M:%S') + '"'

    # Key order is part of the emitted text; keep it stable.
    message = {
        "timestamp": quoted_timestamp,
        "type": c_type,
        "average_price": _plain(av_price),
        "high_price": _plain(high),
        "low_price": _plain(low),
        "open_price": _plain(o_price),
        "close_price": _plain(c_price),
        "volume": _plain(vol),
    }

    # NOTE(review): str() is used instead of json.dumps, so the payload is not
    # valid JSON -- confirm this is what the queue consumer expects.
    messageJson = str(message)
    logger.info("Sending message to PricingSave queue")
    activeMQSender(messageJson, logger)
def timeFunction():
    """Return the local time one hour from now as an ``HH:MM`` string.

    The result is also stored in the module-level global ``time``.
    """
    global time
    one_hour_ahead = datetime.datetime.now() + datetime.timedelta(hours=1)
    # Zero-padded 24-hour clock and minutes, e.g. "07:05".
    time = one_hour_ahead.strftime("%H:%M")
    return time
def collector(c_type):
    """Collect one price record for ``c_type``, publish it, and re-arm the job.

    Steps, in order:
      1. Remove any job tagged ``"collector"`` from the scheduler.
      2. Capture the ``HH:MM`` string from ``timeFunction`` and print it.
      3. Gather average price, volume, high/low and open/close.
      4. Publish the record with ``sentToArtemis``.
      5. Register a new hourly job that calls this function again.

    Args:
        c_type: Currency type forwarded to every helper.
    """
    # The job is re-registered at the end of every run, so clear the old one.
    schedule.clear("collector")

    global time
    time = timeFunction()
    print(time)

    av_price, timestamp = averager(c_type)
    vol = getVol(c_type)
    high, low = getHighLow(c_type)
    o_price, c_price = getOpenClose(c_type)

    # Only the queue path is active; the direct db-gateway path
    # (sendToGateway) is currently not called.
    sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)

    # For hourly jobs, schedule's at() accepts ":MM" or "MM:SS". Passing the
    # raw "HH:MM" string would be read as minute=HH, second=MM, so keep only
    # the minutes and use the ":MM" form to run at this minute every hour.
    minute_mark = ":" + time.split(":")[1]
    schedule.every().hour.at(minute_mark).do(collector, c_type).tag("collector")
# Entry point for one tracked currency type.
# NOTE(review): the original comment says a child process is spun up per
# tracked type; that spawning code is not visible in this module.
def collectorMain(c_type):
    """Run the collector for ``c_type`` once, then serve scheduled runs forever.

    Logs a banner, performs the first collection immediately (which also
    registers the hourly job), and then polls the scheduler once per second.
    This function never returns.

    Args:
        c_type: Currency type forwarded to ``collector``.
    """
    logger.info("== Historical Price Collector ==")
    collector(c_type)

    poll_interval_seconds = 1
    while True:
        schedule.run_pending()
        sleep(poll_interval_seconds)