price-collector/src/pricing/collector.py
2020-10-06 22:52:19 +01:00

177 lines
5.0 KiB
Python

#!/usr/bin/env python
import datetime
from time import sleep
import schedule
import numpy as np
from pricing.exchanges.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import json
from src.utils.jsonLogger import log
# Absolute paths of the GraphQL insert-query templates, one per tracked pair.
# The templates are filled in with %-formatting by sendToGateway.
# NOTE(review): xmr_usd is not referenced anywhere in the visible code.
btc_usd = "/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd = "/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type):
    """Return the averaged high/low midpoint price from Bitstamp and Kraken.

    For each exchange the midpoint of its reported high and low is computed.
    Midpoints equal to zero are excluded before the mean is taken
    (presumably zero marks a failed fetch -- confirm against the exchange
    helpers). If every midpoint is zero the selection is empty and numpy's
    mean yields ``nan``.

    Args:
        type: Identifier forwarded unchanged to the exchange helpers. The
            name shadows the builtin but is kept for caller compatibility.

    Returns:
        tuple: ``(price, timestamp)`` where ``price`` is the mean rounded to
        two decimals and ``timestamp`` is the datetime captured before the
        exchanges were queried.
    """
    # NOTE(review): the fixed +1h shift looks like a timezone correction on
    # top of the local clock -- confirm.
    stamped_at = datetime.datetime.now() + datetime.timedelta(hours=1)

    bitstamp_high, bitstamp_low = bitstampHighLow(type, log)
    kraken_high, kraken_low = krakenHighLow(type, log)

    midpoints = np.array([
        (bitstamp_high + bitstamp_low) / 2,
        (kraken_high + kraken_low) / 2,
    ])
    usable = midpoints[np.nonzero(midpoints)]
    hourly_price = round(usable.mean(), 2)

    log("Hourly Price for ({}) is {}".format(stamped_at, hourly_price), 'INFO')
    sleep(0.05)
    return hourly_price, stamped_at
def getVol(type):
    """Return the mean traded volume across the four queried exchanges.

    Bitstamp, Kraken, Bitfinex and Gemini are queried in that order. Each
    helper returns a ``(volume, ask, bid)`` triple; only the volume is used.
    Volumes equal to zero are left out of the mean.

    Args:
        type: Identifier forwarded unchanged to the exchange helpers.

    Returns:
        The mean of the non-zero volumes (``nan`` if all of them are zero).
    """
    bitstamp_vol, _, _ = bitstampVolAskBid(type, log)
    kraken_vol, _, _ = krakenVolAskBid(type, log)
    bitfinex_vol, _, _ = bitfinexVolAskBid(type, log)
    gemini_vol, _, _ = geminiVolAskBid(type, log)

    volumes = np.array([bitstamp_vol, kraken_vol, bitfinex_vol, gemini_vol])
    reported = volumes[np.nonzero(volumes)]
    return reported.mean()
def getHighLow(type):
    """Return the mean high and mean low reported by Kraken and Bitstamp.

    Zero values are dropped from each series independently before averaging.

    Args:
        type: Identifier forwarded unchanged to the exchange helpers.

    Returns:
        tuple: ``(high, low)``; either element is ``nan`` when both exchanges
        reported zero for it.
    """
    kraken_high, kraken_low = krakenHighLow(type, log)
    bitstamp_high, bitstamp_low = bitstampHighLow(type, log)

    highs = np.array([kraken_high, bitstamp_high])
    lows = np.array([kraken_low, bitstamp_low])

    mean_high = highs[np.nonzero(highs)].mean()
    mean_low = lows[np.nonzero(lows)].mean()
    return mean_high, mean_low
def getOpenClose(type):
    """Return the mean open and mean close reported by Bitstamp and Kraken.

    Zero values are dropped from each series independently before averaging.

    Args:
        type: Identifier forwarded unchanged to the exchange helpers.

    Returns:
        tuple: ``(open, close)``; either element is ``nan`` when both
        exchanges reported zero for it.
    """
    bitstamp_open, bitstamp_close = bitstampOpenClose(type, log)
    kraken_open, kraken_close = krakenOpenClose(type, log)

    opens = np.array([bitstamp_open, kraken_open])
    closes = np.array([bitstamp_close, kraken_close])

    # Local names deliberately avoid `open`, which would shadow the builtin.
    open_price = opens[np.nonzero(opens)].mean()
    close_price = closes[np.nonzero(closes)].mean()
    return open_price, close_price
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Build the insert query for one price record and log it.

    The query template is read from ``btc_usd`` and filled in with
    %-formatting. The timestamp is truncated to the start of its hour and,
    like ``c_type``, wrapped in double quotes; every numeric value is rounded
    to two decimals. Values are substituted in the order: timestamp, type,
    average, high, low, open, close, volume.

    The call that would actually send the query is commented out below, so
    this function currently only logs the query text.

    NOTE(review): the BTC template is used regardless of ``c_type`` -- confirm
    whether other types should select a different template.

    Args:
        c_type: Type string embedded (quoted) in the query.
        timestamp: ``datetime`` of the reading.
        av_price: Average price.
        high: High price.
        low: Low price.
        vol: Volume.
        o_price: Open price.
        c_price: Close price.
    """
    with open(btc_usd, 'r') as template_file:
        template = template_file.read()

    hour_start = timestamp.replace(minute=0, second=0, microsecond=0)
    quoted_timestamp = '"' + hour_start.strftime('%Y-%m-%dT%H:%M:%S') + '"'
    quoted_type = '"' + c_type + '"'

    values = (
        quoted_timestamp,
        quoted_type,
        round(av_price, 2),
        round(high, 2),
        round(low, 2),
        round(o_price, 2),
        round(c_price, 2),
        round(vol, 2),
    )
    query = template % values
    log("Query sending down to db-gateway -- ({})".format(query), 'INFO')

    # Sending is currently disabled; kept for reference.
    # status, response = send(query, log)
    # if status != 200:
    #     log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
    #     log("With Response of : {}".format(response), 'ERR')
    # else:
    #     log("Query executed successfully with Status = {}".format(status), 'INFO')
    #     log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Serialise one price record to JSON and hand it to ``activeMQSender``.

    The timestamp is truncated to the start of its hour and rendered as
    ``YYYY-MM-DDTHH:MM:SS``. The record is logged as a Python dict; the
    4-space-indented JSON text is what gets sent.

    Args:
        c_type: Type string stored under ``"type"``.
        timestamp: ``datetime`` of the reading.
        av_price: Average price.
        high: High price.
        low: Low price.
        vol: Volume.
        o_price: Open price.
        c_price: Close price.
    """
    hour_start = timestamp.replace(minute=0, second=0, microsecond=0)

    record = {
        "timestamp": hour_start.strftime('%Y-%m-%dT%H:%M:%S'),
        "type": c_type,
        "average_price": av_price,
        "high_price": high,
        "low_price": low,
        "open_price": o_price,
        "close_price": c_price,
        "volume": vol,
    }
    payload = json.dumps(record, indent=4)

    log("Sending message to PricingSave queue", 'INFO')
    sleep(0.05)
    log("Message: {}".format(record), 'INFO')
    sleep(0.05)

    activeMQSender(payload)
def timeFunction():
    """Return the local time shifted forward one hour, formatted ``HH:MM``.

    The result is also stored in the module-level global ``time``.

    Returns:
        str: Zero-padded 24-hour ``HH:MM`` string.
    """
    global time
    shifted = datetime.datetime.now() + datetime.timedelta(hours=1)
    time = shifted.strftime("%H:%M")
    return time
def collector(c_type):
    """Collect one price record for ``c_type``, publish it, and re-arm the hourly job.

    Steps, in order:
      1. Clear any job tagged ``"collector"`` so only one is ever registered.
      2. Capture the current ``HH:MM`` (via ``timeFunction``) in the global
         ``time`` and print it.
      3. Gather average price, volume, high/low and open/close.
      4. Publish the record through ``sentToArtemis``.
      5. Register this function to run again every hour at the captured
         minute.

    If any collection step raises, the exception propagates and the job is
    not re-registered.

    Args:
        c_type: Type identifier forwarded to the collection helpers and to
            the rescheduled call.
    """
    global time

    schedule.clear("collector")

    time = timeFunction()
    print(time)

    av_price, timestamp = averager(c_type)
    vol = getVol(c_type)
    high, low = getHighLow(c_type)
    o_price, c_price = getOpenClose(c_type)

    sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
    # sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)

    # For hourly jobs schedule's at() accepts "MM:SS" or ":MM". Passing the
    # raw "HH:MM" string is read by current releases as minute=HH, second=MM,
    # which fires at the wrong minute. ":MM" is unambiguous in every release.
    minute_mark = ":" + time.split(":")[1]
    schedule.every().hour.at(minute_mark).do(collector, c_type).tag("collector")

    log("Collection will run again at {} every hour".format(time), 'INFO')
    sleep(0.05)
# Intended to be spun up dynamically as a child process, one per tracked type.
def collectorMain(c_type):
    """Run the collector for ``c_type`` once, then service the scheduler forever.

    The first ``collector`` call registers the hourly job; the loop below
    then polls ``schedule`` once per second. This function never returns.

    Args:
        c_type: Type identifier passed to ``collector``.
    """
    log("== Historical Price Collector ==", 'INFO')
    sleep(0.05)

    collector(c_type)

    poll_interval_seconds = 1
    while True:
        schedule.run_pending()
        sleep(poll_interval_seconds)