Compare commits

...

13 Commits

Author SHA1 Message Date
andrewso
2a2acbef03 [11.10.20] Passing SynId through AMQ and logging, removed graphql query 2020-10-12 15:05:32 +01:00
andrewso
d39996da0a [11.10.20] Passing SynId through AMQ and logging, removed graphql query 2020-10-12 14:48:52 +01:00
andrewso
abd53bce27 [11.10.20] syncId generation 2020-10-12 14:01:51 +01:00
andrewso
aeb332c15a [10.10.20] Reduced CPU limit 2020-10-11 14:07:09 +01:00
andrewso
d44ae664e8 [10.10.20] Reduced CPU limit 2020-10-11 14:03:03 +01:00
andrewso
25a01025b8 [07.10.20] Log timestamp with @ 2020-10-07 10:03:53 +01:00
366ea11345 Revert "[06.10.20] Fixed some logging"
This reverts commit aa8b22a8
2020-10-06 22:52:30 +01:00
d727655cdf Revert "[06.10.20] Log timing and amq error handling"
This reverts commit 2f94bac9
2020-10-06 22:52:19 +01:00
andrewso
2f94bac950 [06.10.20] Log timing and amq error handling 2020-10-06 18:57:32 +01:00
andrewso
aa8b22a843 [06.10.20] Fixed some logging 2020-10-06 16:58:12 +01:00
andrewso
996f4a45df [06.10.20] Fixed some logging 2020-10-06 16:01:08 +01:00
andrewso
39e4eee1ec [06.10.20] forgot to include package 2020-10-06 15:10:23 +01:00
andrewso
1bc359941b [06.10.20] forgot to include package 2020-10-06 15:06:46 +01:00
11 changed files with 75 additions and 81 deletions

View File

@ -4,7 +4,7 @@ RUN apk update && \
apk add py-pip libc-dev gcc apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \ RUN pip install utils pycryptodome && \
pip install python-dotenv coinbase flask schedule numpy stomp.py pip install python-dotenv coinbase flask schedule numpy stomp.py python-json-logger
COPY . /home/price-collector/. COPY . /home/price-collector/.
EXPOSE 9090 EXPOSE 9090
CMD ["python", "/home/price-collector/src/main.py"] CMD ["python", "/home/price-collector/src/main.py"]

View File

@ -99,10 +99,10 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
resources: resources:
requests: requests:
cpu: 32m cpu: 25m
memory: 32Mi memory: 32Mi
limits: limits:
cpu: 75m cpu: 25m
memory: 64Mi memory: 64Mi
securityContext: securityContext:
capabilities: capabilities:

View File

@ -12,22 +12,19 @@ from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, gemini
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send # from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender from src.utils.activemqConnect import activeMQSender
import json import json, uuid
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
xmr_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_XMR.graphql"
def averager(type): def averager(type):
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1) timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
bitstampH, bitstampL = bitstampHighLow(type) bitstampH, bitstampL = bitstampHighLow(type, log)
krakenH, krakenL = krakenHighLow(type) krakenH, krakenL = krakenHighLow(type, log)
bitstamp_P = (bitstampH + bitstampL)/2 bitstamp_P = (bitstampH + bitstampL)/2
kraken_P = (krakenH + krakenL)/2 kraken_P = (krakenH + krakenL)/2
@ -42,10 +39,10 @@ def averager(type):
return averagePrice, timestamp return averagePrice, timestamp
def getVol(type): def getVol(type):
bitV, bitA, bitB = bitstampVolAskBid(type) bitV, bitA, bitB = bitstampVolAskBid(type, log)
kV, kA, kB = krakenVolAskBid(type) kV, kA, kB = krakenVolAskBid(type, log)
bV, bA, bB = bitfinexVolAskBid(type) bV, bA, bB = bitfinexVolAskBid(type, log)
gV, gA, gB = geminiVolAskBid(type) gV, gA, gB = geminiVolAskBid(type, log)
v_array = np.array([bitV, kV, bV, gV]) v_array = np.array([bitV, kV, bV, gV])
@ -54,8 +51,8 @@ def getVol(type):
return volume return volume
def getHighLow(type): def getHighLow(type):
kH, kL = krakenHighLow(type) kH, kL = krakenHighLow(type, log)
bH, bL = bitstampHighLow(type) bH, bL = bitstampHighLow(type, log)
h_array = np.array([kH, bH]) h_array = np.array([kH, bH])
l_array = np.array([kL, bL]) l_array = np.array([kL, bL])
@ -66,8 +63,8 @@ def getHighLow(type):
return high, low return high, low
def getOpenClose(type): def getOpenClose(type):
bO, bC = bitstampOpenClose(type) bO, bC = bitstampOpenClose(type, log)
kO, kC = krakenOpenClose(type) kO, kC = krakenOpenClose(type, log)
o_array = np.array([bO, kO]) o_array = np.array([bO, kO])
c_array = np.array([bC, kC]) c_array = np.array([bC, kC])
@ -77,32 +74,32 @@ def getOpenClose(type):
return open, close return open, close
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): # def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file:
# data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2),
# round(high, 2),
# round(low, 2),
# round(o_price, 2),
# round(c_price, 2),
# round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
with open(btc_usd, 'r') as file: # status, response = send(query, log)
data = file.read()
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) # if status != 200:
timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' # log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
type = '"'+c_type+'"' # log("With Response of : {}".format(response), 'ERR')
# else:
query = data % (timestamp, type, round(av_price, 2), # log("Query executed successfully with Status = {}".format(status), 'INFO')
round(high, 2), # log("With Response of : {}".format(response), 'INFO')
round(low, 2),
round(o_price, 2),
round(c_price, 2),
round(vol, 2))
log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
status, response = send(query, log)
if status != 200:
log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
log("With Response of : {}".format(response), 'ERR')
else:
log("Query executed successfully with Status = {}".format(status), 'INFO')
log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price): def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
@ -122,10 +119,12 @@ def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
log("Sending message to PricingSave queue", 'INFO') syncId = uuid.uuid4()
activeMQSender(messageJson) log("Sending message to PricingSave queue", 'INFO', syncId)
log("Message: {}".format(message), 'INFO', syncId)
activeMQSender(messageJson, syncId)
def timeFunction(): def timeFunction():
global time global time
@ -146,8 +145,6 @@ def collector(c_type):
global time global time
time = timeFunction() time = timeFunction()
print(time)
av_price, timestamp = averager(c_type) av_price, timestamp = averager(c_type)
vol = getVol(c_type) vol = getVol(c_type)
high, low = getHighLow(c_type) high, low = getHighLow(c_type)
@ -158,9 +155,11 @@ def collector(c_type):
schedule.every().hour.at(time).do(collector, c_type).tag("collector") schedule.every().hour.at(time).do(collector, c_type).tag("collector")
log("Collection will run again at {} every hour".format(time), 'INFO')
# Dynamically Spin up Child process for each type wanting to track # Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type): def collectorMain(c_type):
log("== Historical Price Collector ==", 'INFO') log("Starting Historical Price Collector", 'INFO')
collector(c_type) collector(c_type)

View File

@ -2,9 +2,7 @@
import requests, json, sys import requests, json, sys
from src.utils.jsonLogger import log def bitfinexPublicTicker(type, log):
def bitfinexPublicTicker(type):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -19,7 +17,7 @@ def bitfinexPublicTicker(type):
price = 0 price = 0
return price return price
def bitfinexVolAskBid(type): def bitfinexVolAskBid(type, log):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')

View File

@ -2,9 +2,7 @@
import requests, json, sys import requests, json, sys
from src.utils.jsonLogger import log def bitstampVolAskBid(type, log):
def bitstampVolAskBid(type):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -19,7 +17,7 @@ def bitstampVolAskBid(type):
log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR') log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0 return 0, 0, 0
def bitstampOpenClose(type): def bitstampOpenClose(type, log):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -33,7 +31,7 @@ def bitstampOpenClose(type):
log("Bitstamp Open Close Error: {}".format(e), 'ERR') log("Bitstamp Open Close Error: {}".format(e), 'ERR')
return 0, 0 return 0, 0
def bitstampHighLow(type): def bitstampHighLow(type, log):
try: try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri) response = requests.request("GET", uri)

View File

@ -4,15 +4,13 @@ import sys, os
from coinbase.wallet.client import Client from coinbase.wallet.client import Client
from src.utils.jsonLogger import log
class keys(): class keys():
def __init__(self): def __init__(self):
self.api_key = os.getenv('COINBASE_KEY') self.api_key = os.getenv('COINBASE_KEY')
self.api_secret = os.getenv("COINBASE_SECRET") self.api_secret = os.getenv("COINBASE_SECRET")
def coinbasePublicTicker(type): def coinbasePublicTicker(type, log):
api_key = keys().api_key api_key = keys().api_key
api_secret = keys().api_secret api_secret = keys().api_secret

View File

@ -2,9 +2,7 @@
import requests, json, sys import requests, json, sys
from src.utils.jsonLogger import log def geminiPublicTicker(type, log):
def geminiPublicTicker(type):
try: try:
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
@ -21,7 +19,7 @@ def geminiPublicTicker(type):
price = 0 price = 0
return price return price
def geminiVolAskBid(type): def geminiVolAskBid(type, log):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -41,7 +39,7 @@ def geminiVolAskBid(type):
log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR') log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0 return 0, 0, 0
def geminiDailyOpenClose(type): def geminiDailyOpenClose(type, log):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)

View File

@ -2,8 +2,6 @@
import requests, json, sys import requests, json, sys
from src.utils.jsonLogger import log
def krakenCalculateOHLC(response): def krakenCalculateOHLC(response):
open, high, low, close = 0, 0, 0, 0 open, high, low, close = 0, 0, 0, 0
@ -23,7 +21,7 @@ def krakenCalculateOHLC(response):
return fopen, fhigh, flow, fclose return fopen, fhigh, flow, fclose
def krakenVolAskBid(type): def krakenVolAskBid(type, log):
try: try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -42,7 +40,7 @@ def krakenVolAskBid(type):
log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR') log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0 return 0, 0, 0
def krakenOpenClose(type): def krakenOpenClose(type, log):
try: try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri) response = requests.request("GET", uri)
@ -58,7 +56,7 @@ def krakenOpenClose(type):
log("Kraken Open Close Error: {}".format(e), 'ERR') log("Kraken Open Close Error: {}".format(e), 'ERR')
return 0, 0, 0 return 0, 0, 0
def krakenHighLow(type): def krakenHighLow(type, log):
try: try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri) response = requests.request("GET", uri)

View File

@ -1 +0,0 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

View File

@ -17,14 +17,20 @@ class keys():
def returnKeys(self): def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message): def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys() addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO') log("Attempting Connection to Artemis...", 'INFO', syncId)
con = stomp.Connection([(addr, port)], auto_content_length=False) con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True) con.connect( mqUser, mqPass, wait=True)
con.send("PricingSave", message, content_type="application/json", headers={"Content-Type":"application/json"}) con.send("PricingSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect() con.disconnect()

View File

@ -8,10 +8,10 @@ import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter): class CustomJsonFormatter(jsonlogger.JsonFormatter):
def add_fields(self, log_record, record, message_dict): def add_fields(self, log_record, record, message_dict):
super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict) super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
if not log_record.get('timestamp'): if not log_record.get('@timestamp'):
# this doesn't use record.created, so it is slightly off # this doesn't use record.created, so it is slightly off
now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ') now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
log_record['timestamp'] = now log_record['@timestamp'] = now
if log_record.get('level'): if log_record.get('level'):
log_record['level'] = log_record['level'].upper() log_record['level'] = log_record['level'].upper()
else: else:
@ -23,18 +23,18 @@ def setup_logging(log_level='INFO'):
logger.setLevel(log_level) logger.setLevel(log_level)
logHandler = logging.StreamHandler() logHandler = logging.StreamHandler()
formatter = CustomJsonFormatter('%(timestamp)s %(level)s %(name)s %(message)s') formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
logHandler.setFormatter(formatter) logHandler.setFormatter(formatter)
logger.addHandler(logHandler) logger.addHandler(logHandler)
def log(message, level): def log(message, level, syncId=""):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if level == 'INFO': if level == 'INFO':
logger.info(message) logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN': elif level == 'WARN':
logger.warn(message) logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR': elif level == 'ERR':
logger.error(message) logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG': elif level == 'DEBUG':
logger.debug(message) logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})