Compare commits

..

No commits in common. "master" and "1.0.0-b57" have entirely different histories.

11 changed files with 81 additions and 75 deletions

View File

@ -4,7 +4,7 @@ RUN apk update && \
apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \
pip install python-dotenv coinbase flask schedule numpy stomp.py python-json-logger
pip install python-dotenv coinbase flask schedule numpy stomp.py
COPY . /home/price-collector/.
EXPOSE 9090
CMD ["python", "/home/price-collector/src/main.py"]

View File

@ -99,10 +99,10 @@ spec:
imagePullPolicy: Always
resources:
requests:
cpu: 25m
cpu: 32m
memory: 32Mi
limits:
cpu: 25m
cpu: 75m
memory: 64Mi
securityContext:
capabilities:

View File

@ -12,19 +12,22 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
# from src.utils.databaseConnect import send
from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import json, uuid
import json
from src.utils.jsonLogger import log
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
bitstampH, bitstampL = bitstampHighLow(type, log)
krakenH, krakenL = krakenHighLow(type, log)
bitstampH, bitstampL = bitstampHighLow(type)
krakenH, krakenL = krakenHighLow(type)
bitstamp_P = (bitstampH + bitstampL)/2
kraken_P = (krakenH + krakenL)/2
@ -39,10 +42,10 @@ def averager(type):
return averagePrice, timestamp
def getVol(type):
bitV, bitA, bitB = bitstampVolAskBid(type, log)
kV, kA, kB = krakenVolAskBid(type, log)
bV, bA, bB = bitfinexVolAskBid(type, log)
gV, gA, gB = geminiVolAskBid(type, log)
bitV, bitA, bitB = bitstampVolAskBid(type)
kV, kA, kB = krakenVolAskBid(type)
bV, bA, bB = bitfinexVolAskBid(type)
gV, gA, gB = geminiVolAskBid(type)
v_array = np.array([bitV, kV, bV, gV])
@ -51,8 +54,8 @@ def getVol(type):
return volume
def getHighLow(type):
kH, kL = krakenHighLow(type, log)
bH, bL = bitstampHighLow(type, log)
kH, kL = krakenHighLow(type)
bH, bL = bitstampHighLow(type)
h_array = np.array([kH, bH])
l_array = np.array([kL, bL])
@ -63,8 +66,8 @@ def getHighLow(type):
return high, low
def getOpenClose(type):
bO, bC = bitstampOpenClose(type, log)
kO, kC = krakenOpenClose(type, log)
bO, bC = bitstampOpenClose(type)
kO, kC = krakenOpenClose(type)
o_array = np.array([bO, kO])
c_array = np.array([bC, kC])
@ -74,32 +77,32 @@ def getOpenClose(type):
return open, close
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file:
# data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2),
# round(high, 2),
# round(low, 2),
# round(o_price, 2),
# round(c_price, 2),
# round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
# status, response = send(query, log)
with open(btc_usd, 'r') as file:
data = file.read()
# if status != 200:
# log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
# log("With Response of : {}".format(response), 'ERR')
# else:
# log("Query executed successfully with Status = {}".format(status), 'INFO')
# log("With Response of : {}".format(response), 'INFO')
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
type = '"'+c_type+'"'
query = data % (timestamp, type, round(av_price, 2),
round(high, 2),
round(low, 2),
round(o_price, 2),
round(c_price, 2),
round(vol, 2))
log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
status, response = send(query, log)
if status != 200:
log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
log("With Response of : {}".format(response), 'ERR')
else:
log("Query executed successfully with Status = {}".format(status), 'INFO')
log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
@ -119,12 +122,10 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
messageJson = json.dumps(message, indent = 4)
syncId = uuid.uuid4()
log("Sending message to PricingSave queue", 'INFO')
log("Sending message to PricingSave queue", 'INFO', syncId)
log("Message: {}".format(message), 'INFO', syncId)
activeMQSender(messageJson)
activeMQSender(messageJson, syncId)
def timeFunction():
global time
@ -145,6 +146,8 @@ def collector(c_type):
global time
time = timeFunction()
print(time)
av_price, timestamp = averager(c_type)
vol = getVol(c_type)
high, low = getHighLow(c_type)
@ -155,11 +158,9 @@ def collector(c_type):
schedule.every().hour.at(time).do(collector, c_type).tag("collector")
log("Collection will run again at {} every hour".format(time), 'INFO')
# Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type):
log("Starting Historical Price Collector", 'INFO')
log("== Historical Price Collector ==", 'INFO')
collector(c_type)

View File

@ -2,7 +2,9 @@
import requests, json, sys
def bitfinexPublicTicker(type, log):
from src.utils.jsonLogger import log
def bitfinexPublicTicker(type):
try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -17,7 +19,7 @@ def bitfinexPublicTicker(type, log):
price = 0
return price
def bitfinexVolAskBid(type, log):
def bitfinexVolAskBid(type):
try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')

View File

@ -2,7 +2,9 @@
import requests, json, sys
def bitstampVolAskBid(type, log):
from src.utils.jsonLogger import log
def bitstampVolAskBid(type):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -17,7 +19,7 @@ def bitstampVolAskBid(type, log):
log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def bitstampOpenClose(type, log):
def bitstampOpenClose(type):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -31,7 +33,7 @@ def bitstampOpenClose(type, log):
log("Bitstamp Open Close Error: {}".format(e), 'ERR')
return 0, 0
def bitstampHighLow(type, log):
def bitstampHighLow(type):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)

View File

@ -4,13 +4,15 @@ import sys, os
from coinbase.wallet.client import Client
from src.utils.jsonLogger import log
class keys():
def __init__(self):
self.api_key = os.getenv('COINBASE_KEY')
self.api_secret = os.getenv("COINBASE_SECRET")
def coinbasePublicTicker(type, log):
def coinbasePublicTicker(type):
api_key = keys().api_key
api_secret = keys().api_secret

View File

@ -2,7 +2,9 @@
import requests, json, sys
def geminiPublicTicker(type, log):
from src.utils.jsonLogger import log
def geminiPublicTicker(type):
try:
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
@ -19,7 +21,7 @@ def geminiPublicTicker(type, log):
price = 0
return price
def geminiVolAskBid(type, log):
def geminiVolAskBid(type):
try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri)
@ -39,7 +41,7 @@ def geminiVolAskBid(type, log):
log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def geminiDailyOpenClose(type, log):
def geminiDailyOpenClose(type):
try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri)

View File

@ -2,6 +2,8 @@
import requests, json, sys
from src.utils.jsonLogger import log
def krakenCalculateOHLC(response):
open, high, low, close = 0, 0, 0, 0
@ -21,7 +23,7 @@ def krakenCalculateOHLC(response):
return fopen, fhigh, flow, fclose
def krakenVolAskBid(type, log):
def krakenVolAskBid(type):
try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri)
@ -40,7 +42,7 @@ def krakenVolAskBid(type, log):
log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def krakenOpenClose(type, log):
def krakenOpenClose(type):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
@ -56,7 +58,7 @@ def krakenOpenClose(type, log):
log("Kraken Open Close Error: {}".format(e), 'ERR')
return 0, 0, 0
def krakenHighLow(type, log):
def krakenHighLow(type):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)

View File

@ -0,0 +1 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

@ -17,20 +17,14 @@ class keys():
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId):
def activeMQSender(message):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO', syncId)
log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.send("PricingSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
con.disconnect()

View File

@ -8,10 +8,10 @@ import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('@timestamp'):
if not log_record.get('timestamp'):
# this doesn't use record.created, so it is slightly off
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['@timestamp'] = now
log_record['timestamp'] = now
if log_record.get('level'):
log_record['level'] = log_record['level'].upper()
else:
@ -23,18 +23,18 @@ def setup_logging(log_level='INFO'):
logger.setLevel(log_level)
logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s')
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level, syncId=""):
def log(message, level):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.info(message)
elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.warn(message)
elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.error(message)
elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.debug(message)