price-collector/src/pricing/collector.py

113 lines
3.2 KiB
Python

#!/usr/bin/env python
from datetime import datetime, timedelta
import sys
from time import sleep
from pricing.bitfinex import bitfinexPublicTicker, bitfinexHighLowVol
from pricing.coinbase import coinbasePublicTicker
from pricing.gemini import geminiPublicTicker, geminiHighLowVol, geminiOpenClose
from utils.databaseConnect import send
import logging as logger
# Configure logging once at import time: INFO and above, with each record
# rendered as "<timestamp>: <LEVEL> -- <message>".
logger.basicConfig(
    format="%(asctime)s: %(levelname)s -- %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S',
    level=logger.INFO,
)

# Relative path of the GraphQL template that sendToGateway reads and fills in
# when inserting a new price record.
btc_usd = "src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
def averager(type):
    """Return the mean current price across the exchanges that gave a quote.

    Coinbase, Bitfinex and Gemini are queried (in that order) through their
    public-ticker helpers. A quote of 0 is treated as "no data from this
    exchange" and is excluded from the mean, so the result is the average of
    one, two or three real quotes depending on how many exchanges answered.
    NOTE(review): this assumes the ticker helpers report failure as 0 --
    confirm against the pricing.* modules.

    Args:
        type: Identifier forwarded unchanged to each ticker helper. The name
            shadows the builtin but is kept because it is part of the
            function's signature.

    Returns:
        A ``(average_price, timestamp)`` tuple. ``average_price`` is rounded
        to 3 decimal places and is ``0.0`` when no exchange produced a quote.
        ``timestamp`` is the naive local ``datetime`` captured before the
        exchanges were queried.
    """
    timestamp = datetime.now()

    quotes = (
        coinbasePublicTicker(type, logger),
        bitfinexPublicTicker(type, logger),
        geminiPublicTicker(type, logger),
    )
    # Keep only real quotes; a falsy value (0) marks an exchange without data.
    available = [quote for quote in quotes if quote]

    if available:
        averagePrice = round(sum(available) / len(available), 3)
    else:
        # Always hand back a tuple so the caller's unpacking never fails.
        logger.warning("No exchange returned a price for {}".format(type))
        averagePrice = 0.0

    logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice))
    return averagePrice, timestamp
def _merge_metric(first, second):
    """Combine one metric from two exchanges, ignoring a zero reading."""
    if first == 0:
        return second
    if second == 0:
        return first
    return (first + second) / 2


def getHighLowVol(type):
    """Return the combined high, low and volume from Bitfinex and Gemini.

    Each metric is merged independently: when one exchange reports 0 for a
    metric the other exchange's value is used as-is, otherwise the two values
    are averaged. If both report 0 the result for that metric is 0. Merging
    per metric means every returned value is always assigned, regardless of
    which individual readings are missing.

    Args:
        type: Identifier forwarded unchanged to ``bitfinexHighLowVol`` and
            ``geminiHighLowVol``. The name shadows the builtin but is kept
            because it is part of the function's signature.

    Returns:
        A ``(high, low, vol)`` tuple.
    """
    bH, bL, bV = bitfinexHighLowVol(type, logger)
    gH, gL, gV = geminiHighLowVol(type, logger)

    high = _merge_metric(bH, gH)
    low = _merge_metric(bL, gL)
    vol = _merge_metric(bV, gV)
    return high, low, vol
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Fill in the price-record query template and submit it through ``send``.

    The template at ``btc_usd`` is read from disk and its %-style
    placeholders are filled, in order, with: quoted timestamp, quoted type,
    average price, high, low, open price, close price, volume. The rendered
    query and the outcome of the call are logged; nothing is returned.

    Args:
        c_type: Type label; wrapped in double quotes before substitution.
        timestamp: ``datetime``-like object; formatted as
            ``YYYY-MM-DDTHH:MM:SS`` and wrapped in double quotes.
        av_price: Average price value.
        high: High value.
        low: Low value.
        vol: Volume value.
        o_price: Open price value.
        c_price: Close price value.
    """
    with open(btc_usd, 'r') as template_file:
        template = template_file.read()

    quoted_timestamp = '"' + timestamp.strftime('%Y-%m-%dT%H:%M:%S') + '"'
    quoted_type = '"' + c_type + '"'
    # Note the placeholder order: open/close come before volume.
    values = (quoted_timestamp, quoted_type, av_price, high, low, o_price, c_price, vol)
    query = template % values
    logger.info("Query sending down to db-gateway -- ({})".format(query))

    status, response = send(query, logger)
    # NOTE(review): success is detected by comparing against the string
    # '200' -- confirm that send() returns the status as a string.
    if status == '200':
        logger.info("Query executed successfully with Status = {}".format(status))
        logger.info("With Response of : {}".format(response))
        return

    logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
    logger.error("With Response of : {}".format(response))
def getOpenClose(type):
    """Return the ``(open, close)`` pair that ``geminiOpenClose`` gives for ``type``.

    Args:
        type: Identifier forwarded unchanged to ``geminiOpenClose``.

    Returns:
        A two-item tuple ``(open_price, close_price)``.
    """
    opening_price, closing_price = geminiOpenClose(type, logger)
    return opening_price, closing_price
# Dynamically spin up a child process for each type we want to track.
def collector(c_type, interval_seconds=3600):
    """Collect and store a price record for ``c_type`` in an endless loop.

    Each cycle gathers the average price and timestamp, the high/low/volume
    figures and the open/close prices, sends them to the gateway, and then
    sleeps for ``interval_seconds``. The function never returns; an exception
    raised by any step propagates and ends the loop.

    Args:
        c_type: Type identifier passed to every collection step.
        interval_seconds: Pause between cycles, in seconds. Defaults to 3600
            (one hour), which was previously hard-coded. The pause starts
            after the cycle's work finishes, so cycles are spaced by the
            interval plus the time the work takes.
    """
    logger.info("== Historical Price Collector ==")
    while True:
        av_price, timestamp = averager(c_type)
        high, low, vol = getHighLowVol(c_type)
        o_price, c_price = getOpenClose(c_type)
        sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
        sleep(interval_seconds)