#!/usr/bin/env python
"""Hourly price collector.

Queries several exchanges for high/low, open/close and volume figures,
averages the non-zero readings, and publishes one record per run to the
ActiveMQ sender. The job re-schedules itself with ``schedule`` so that it
runs once an hour.
"""
import datetime
import json
import logging as logger
from time import sleep

import numpy as np
import schedule

from pricing.exchanges.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender

logger.basicConfig(
    level=logger.INFO,
    format="%(asctime)s: %(levelname)s -- %(message)s",
    datefmt='%Y-%m-%d %H:%M:%S'
)

# GraphQL insert templates; each is filled with %-formatting in sendToGateway.
btc_usd = "/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd = "/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"


def _nonzero_mean(values):
    """Return the mean of the non-zero entries of *values*.

    Zero readings are excluded from the average (presumably the exchange
    helpers report 0 when a value is unavailable -- TODO confirm).
    If every entry is zero the result is NaN, and a warning is logged.
    """
    arr = np.array(values)
    usable = arr[np.nonzero(arr)]
    if usable.size == 0:
        logger.warning("No non-zero readings to average; result is NaN")
        return np.float64("nan")
    return usable.mean()


def averager(type):
    """Compute the average price for *type* from Bitstamp and Kraken.

    Each exchange's price is taken as the midpoint of its high and low;
    the non-zero midpoints are then averaged and rounded to 2 decimals.

    Returns:
        (averagePrice, timestamp) where timestamp is the local time
        plus one hour.
    """
    # NOTE(review): the +1h looks like a manual timezone offset -- confirm.
    timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)

    bitstampH, bitstampL = bitstampHighLow(type, logger)
    krakenH, krakenL = krakenHighLow(type, logger)

    bitstamp_P = (bitstampH + bitstampL) / 2
    kraken_P = (krakenH + krakenL) / 2

    averagePrice = round(_nonzero_mean([bitstamp_P, kraken_P]), 2)

    logger.info("Hourly Price for ({}) is {}".format(timestamp, averagePrice))
    return averagePrice, timestamp


def getVol(type):
    """Return the mean non-zero volume for *type* across four exchanges.

    Only the volume element of each exchange's (volume, ask, bid) triple
    is used; ask and bid are discarded.
    """
    bitV, _, _ = bitstampVolAskBid(type, logger)
    kV, _, _ = krakenVolAskBid(type, logger)
    bV, _, _ = bitfinexVolAskBid(type, logger)
    gV, _, _ = geminiVolAskBid(type, logger)
    return _nonzero_mean([bitV, kV, bV, gV])


def getHighLow(type):
    """Return (high, low) for *type*, each the mean of the non-zero
    Kraken and Bitstamp readings."""
    kH, kL = krakenHighLow(type, logger)
    bH, bL = bitstampHighLow(type, logger)
    high = _nonzero_mean([kH, bH])
    low = _nonzero_mean([kL, bL])
    return high, low


def getOpenClose(type):
    """Return (open, close) for *type*, each the mean of the non-zero
    Bitstamp and Kraken readings."""
    bO, bC = bitstampOpenClose(type, logger)
    kO, kC = krakenOpenClose(type, logger)
    # Locals renamed so they no longer shadow the built-in open().
    open_price = _nonzero_mean([bO, kO])
    close_price = _nonzero_mean([bC, kC])
    return open_price, close_price


def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Fill the GraphQL insert template and send it through ``send``.

    The template receives, in order: quoted timestamp (truncated to the
    hour), quoted type, average, high, low, open, close and volume, with
    every number rounded to 2 decimals. The outcome is logged; nothing is
    returned.
    """
    # NOTE(review): the BTC template is used for every c_type and xmr_usd
    # is never read -- confirm whether the template should depend on c_type.
    with open(btc_usd, 'r') as file:
        data = file.read()

    strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
    quoted_timestamp = '"' + strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') + '"'
    quoted_type = '"' + c_type + '"'

    query = data % (
        quoted_timestamp,
        quoted_type,
        round(av_price, 2),
        round(high, 2),
        round(low, 2),
        round(o_price, 2),
        round(c_price, 2),
        round(vol, 2),
    )
    logger.info("Query sending down to db-gateway -- ({})".format(query))

    status, response = send(query, logger)

    if status != 200:
        logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
        logger.error("With Response of : {}".format(response))
    else:
        logger.info("Query executed successfully with Status = {}".format(status))
        logger.info("With Response of : {}".format(response))


def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
    """Build the price record and hand it to ``activeMQSender``.

    The timestamp is truncated to the hour and formatted as
    ``"YYYY-MM-DDTHH:MM:SS"`` including the surrounding double quotes.
    """
    strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
    quoted_timestamp = '"' + strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') + '"'

    message = {
        "timestamp": quoted_timestamp,
        "type": c_type,
        "average_price": av_price,
        "high_price": high,
        "low_price": low,
        "open_price": o_price,
        "close_price": c_price,
        "volume": vol,
    }

    # NOTE(review): str() emits a Python dict repr (single-quoted keys), not
    # JSON. Kept as-is because the consumer's expected format is not visible
    # here -- confirm before switching to json.dumps.
    messageJson = str(message)

    logger.info("Sending message to PricingSave queue")
    activeMQSender(messageJson, logger)


def timeFunction():
    """Return the local time one hour from now as an ``"HH:MM"`` string."""
    shifted = datetime.datetime.now() + datetime.timedelta(hours=1)
    return shifted.strftime("%H:%M")


def collector(c_type):
    """Collect one price record for *c_type*, publish it, and re-schedule.

    Any existing job tagged "collector" is cleared first, and a new hourly
    job is registered once the record has been sent.
    """
    schedule.clear("collector")

    # Captured before the exchange calls so the minute mark reflects when
    # this run started, not when it finished.
    run_at = timeFunction()
    print(run_at)

    av_price, timestamp = averager(c_type)
    vol = getVol(c_type)
    high, low = getHighLow(c_type)
    o_price, c_price = getOpenClose(c_type)

    sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
    # NOTE(review): sendToGateway is not invoked here; only the queue
    # receives the record.

    # Hourly jobs take ":MM"; a two-field "HH:MM" string is read as MM:SS by
    # newer schedule releases, which would use the hour as the minute.
    minute_mark = ":" + run_at.split(":")[1]
    schedule.every().hour.at(minute_mark).do(collector, c_type).tag("collector")


# Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type):
    """Run the collector once immediately, then service the schedule forever."""
    logger.info("== Historical Price Collector ==")
    collector(c_type)

    while True:
        schedule.run_pending()
        sleep(1)