Compare commits

...

61 Commits

Author SHA1 Message Date
andrewso
2a2acbef03 [11.10.20] Passing SynId through AMQ and logging, removed graphql query 2020-10-12 15:05:32 +01:00
andrewso
d39996da0a [11.10.20] Passing SynId through AMQ and logging, removed graphql query 2020-10-12 14:48:52 +01:00
andrewso
abd53bce27 [11.10.20] syncId generation 2020-10-12 14:01:51 +01:00
andrewso
aeb332c15a [10.10.20] Reduced CPU limit 2020-10-11 14:07:09 +01:00
andrewso
d44ae664e8 [10.10.20] Reduced CPU limit 2020-10-11 14:03:03 +01:00
andrewso
25a01025b8 [07.10.20] Log timestamp with @ 2020-10-07 10:03:53 +01:00
366ea11345 Revert "[06.10.20] Fixed some logging"
This reverts commit aa8b22a8
2020-10-06 22:52:30 +01:00
d727655cdf Revert "[06.10.20] Log timing and amq error handling"
This reverts commit 2f94bac9
2020-10-06 22:52:19 +01:00
andrewso
2f94bac950 [06.10.20] Log timing and amq error handling 2020-10-06 18:57:32 +01:00
andrewso
aa8b22a843 [06.10.20] Fixed some logging 2020-10-06 16:58:12 +01:00
andrewso
996f4a45df [06.10.20] Fixed some logging 2020-10-06 16:01:08 +01:00
andrewso
39e4eee1ec [06.10.20] forgot to include package 2020-10-06 15:10:23 +01:00
andrewso
1bc359941b [06.10.20] forgot to include package 2020-10-06 15:06:46 +01:00
andrewso
3fecb1b491 [06.10.20] Structured logging for Kibana monitoring 2020-10-06 14:38:36 +01:00
bf662f2c75 [26.09.2020] pipeline changes 2020-09-26 19:08:31 +01:00
79dd6881af [26.09.2020] Removed get pods function from deployment script 2020-09-26 19:07:21 +01:00
45aef7a228 [25.09.2020] Changed to actual consumer queue 2020-09-25 13:58:30 +01:00
e035c4cf71 [23.09.2020] sending testing 2020-09-23 23:37:19 +01:00
9cdb599cf4 [23.09.2020] sending testing 2020-09-23 23:32:13 +01:00
ea57723a3c [23.09.2020] sending testing 2020-09-23 23:29:37 +01:00
cd7870f3dd [23.09.2020] sending testing 2020-09-23 23:26:02 +01:00
4ecadfc7cd [23.09.2020] sending testing 2020-09-23 23:22:51 +01:00
cef7b771ca [23.09.2020] sending testing 2020-09-23 23:05:39 +01:00
2feccc3a29 [23.09.2020] sending testing 2020-09-23 23:01:36 +01:00
b85dd30dc3 [23.09.2020] sending testing 2020-09-23 22:57:42 +01:00
774c17d6e8 [23.09.2020] sending testing 2020-09-23 22:51:24 +01:00
a3360dda67 [23.09.2020] sending testing 2020-09-23 22:46:13 +01:00
328e6b200b [23.09.2020] sending testing 2020-09-23 22:41:31 +01:00
1ab04cae3e [23.09.2020] sending testing 2020-09-23 22:27:21 +01:00
2010a0134f [23.09.2020] sending testing 2020-09-23 22:23:54 +01:00
80b3485138 [23.09.2020] sending testing 2020-09-23 22:18:38 +01:00
fcc9512515 [23.09.2020] sending testing 2020-09-23 22:14:43 +01:00
befc693db3 [23.09.2020] sending testing 2020-09-23 22:10:15 +01:00
eced8d59b6 [23.09.2020] sending testing 2020-09-23 22:01:51 +01:00
f127f20a1f [23.09.2020] sending testing 2020-09-23 21:47:26 +01:00
bdbf88cde1 [23.09.2020] sending testing 2020-09-23 21:42:31 +01:00
7e167cfe7e [23.09.2020] Convert Dict to String 2020-09-23 21:31:27 +01:00
bf78fb7c40 [23.09.2020] Convert Dict to Json 2020-09-23 21:16:20 +01:00
89c110ac38 [23.09.2020] class fix 2020-09-23 21:00:30 +01:00
d8ca3ee20d [23.09.2020] ConfigMap - Secret 2020-09-23 20:20:43 +01:00
b6ec9ec1f1 [23.09.2020] stomp.py instead of stomp 2020-09-23 20:02:34 +01:00
27cf19f7b1 [23.09.2020] Keep pip updated 2020-09-23 19:41:10 +01:00
1bd4e89a68 [23.09.2020] Changed Cluster name due to cluster rebuild 2020-09-23 19:31:02 +01:00
14e340b724 [23.09.2020] Added STOMP configuration for sending message to Artemis rather than to graphQL of dbGateway 2020-09-23 19:28:09 +01:00
andrewso
36cec493b0 [14.07.20] New folders and linkerd injection 2020-07-14 10:03:02 +01:00
andrewso
4853a0238d [10.07.20] Increased limits 2020-07-11 17:04:31 +01:00
andrewso
137df14462 [10.07.20] Uncomment 2020-07-11 00:11:02 +01:00
andrewso
cb287aee83 [10.07.20] Zeroing out timestamp to always be on the hour 2020-07-10 23:32:29 +01:00
andrewso
e7f8964c62 [10.07.20] Bigger Volumes 2020-07-10 20:09:41 +01:00
andrewso
98da51a404 [10.07.20] spin up pod again 2020-07-10 19:53:50 +01:00
andrewso
beab7a3ca8 [10.07.20] Kill pod 2020-07-10 19:49:08 +01:00
andrewso
87a760f4f5 [10.07.20] Up pod 2020-07-10 16:00:23 +01:00
andrewso
a6bcc9a34d [10.07.20] Kill pod again 2020-07-10 14:12:58 +01:00
andrewso
8899f7138e [10.07.20] Moved building of image to build step 2020-07-10 13:15:15 +01:00
andrewso
0c9025e564 [10.07.20] Forgot numpy import 2020-07-10 12:08:46 +01:00
andrewso
44fa6b23f2 [10.07.20] Schedule import 2020-07-10 12:05:38 +01:00
andrewso
7784b587e9 [10.07.20] Spin up replicas 2020-07-10 12:01:56 +01:00
andrewso
82c8ea8742 [10.07.20] Reset replicas 2020-07-10 11:25:27 +01:00
andrewso
0d54db0183 [10.07.20] Spin up replicas 2020-07-10 11:16:11 +01:00
andrewso
883045beec [09.07.20] Paths 2020-07-09 20:14:58 +01:00
andrewso
a3550c61a9 [09.07.20] TCP protocol 2020-07-09 20:12:48 +01:00
19 changed files with 217 additions and 120 deletions

View File

@ -2,8 +2,9 @@ FROM python:3.7-alpine
MAINTAINER Andrew Sotheran <cryptosky.user@gmail.com>
RUN apk update && \
apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \
pip install python-dotenv coinbase flask
pip install python-dotenv coinbase flask schedule numpy stomp.py python-json-logger
COPY . /home/price-collector/.
EXPOSE 9090
CMD ["python", "/home/price-collector/src/main.py"]

View File

@ -8,7 +8,7 @@ metadata:
name: RESOURCE_NAME
namespace: production
spec:
replicas: 0
replicas: 1
selector:
matchLabels:
app: RESOURCE_NAME
@ -19,6 +19,8 @@ spec:
maxUnavailable: 0
template:
metadata:
annotations:
linkerd.io/inject: enabled
labels:
app: RESOURCE_NAME
spec:
@ -40,14 +42,24 @@ spec:
resource: limits.memory
- name: DATABASE_URL
valueFrom:
secretKeyRef:
configMapKeyRef:
name: endpoints
key: dbGateway.url
- name: DATABASE_PORT
- name: AMQ_URL
valueFrom:
configMapKeyRef:
name: endpoints
key: amqStomp.url
- name: BROKER_USER
valueFrom:
secretKeyRef:
name: endpoints
key: dbGateway.port
name: amq
key: amq.username
- name: BROKER_PASSWORD
valueFrom:
secretKeyRef:
name: amq
key: amq.password
- name: COINBASE_KEY
valueFrom:
secretKeyRef:
@ -60,7 +72,7 @@ spec:
key: coinbase.api.secret
- name: DB_GATEWAY_URL
valueFrom:
secretKeyRef:
configMapKeyRef:
name: endpoints
key: dbGateway.url
ports:
@ -87,11 +99,11 @@ spec:
imagePullPolicy: Always
resources:
requests:
cpu: 10m
memory: 16Mi
limits:
cpu: 25m
memory: 32Mi
limits:
cpu: 25m
memory: 64Mi
securityContext:
capabilities:
add:

View File

@ -10,7 +10,7 @@ spec:
app: RESOURCE_NAME
ports:
- port: 9090
protocol: HTTP
protocol: TCP
targetPort: 9090
sessionAffinity: None
type: ClusterIP

View File

@ -28,7 +28,7 @@ try {
timestamps {
node ("${env.SLAVE_LABEL}") {
stage('Initialise') {
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: 'https://github.com/andyjk15/price-collector.git']]])
checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]])
env.APPLICATION_VERSION = get_application_version()
@ -41,20 +41,11 @@ try {
) {
sh "doctl auth init --access-token ${DOCTL_TOKEN}"
sh "doctl registry login"
sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
}
}
stage('Test Artifact') {
try {
// mvn 'verify -DskipUTs -DskipTests'
} finally {
// mvn 'test'
sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster"
}
}
stage('Build Image') {
// mvn 'clean package -DskipTests'
executeShellScript("configuration/scripts/mapVarsToConfigs.sh",
env.DIGITAL_OCEAN,
@ -63,10 +54,6 @@ try {
env.APPLICATION_VERSION,
env.APPLICATION_LABEL)
}
stage('Tag Repository') {
withDockerServer([uri: "${env.DOCKER_REPOSITORY_TCP}"]) {
docker.build("${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}")
docker.build("${env.APPLICATION_NAME}:latest")
@ -78,6 +65,10 @@ try {
sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
}
}
stage('Tag Repository') {
withCredentials(
[usernamePassword(
credentialsId: env.GITHUB_CREDENTIALS_ID,

View File

@ -5,6 +5,4 @@ APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml
kubectl get pods
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

View File

@ -6,6 +6,8 @@ sys.path.append('/home/price-collector/')
from threading import Thread
from pricing.collector import collectorMain
from src.utils.jsonLogger import setup_logging
from probes.probes import runFlaskProbes
def callCollector(args):
@ -15,6 +17,7 @@ def callProbes():
runFlaskProbes()
if __name__=='__main__':
setup_logging()
Thread(target=callProbes).start()

View File

@ -7,30 +7,24 @@ import schedule
import numpy as np
from pricing.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.coinbase import coinbasePublicTicker
from pricing.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from pricing.exchanges.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.exchanges.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.exchanges.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.exchanges.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from utils.databaseConnect import send
# from src.utils.databaseConnect import send
from src.utils.activemqConnect import activeMQSender
import logging as logger
import json, uuid
logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
btc_usd="src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
from src.utils.jsonLogger import log
def averager(type):
timestamp = datetime.datetime.now()# + timedelta(hours=1)
timestamp = datetime.datetime.now() + datetime.timedelta(hours=1)
bitstampH, bitstampL = bitstampHighLow(type, logger)
krakenH, krakenL = krakenHighLow(type, logger)
bitstampH, bitstampL = bitstampHighLow(type, log)
krakenH, krakenL = krakenHighLow(type, log)
bitstamp_P = (bitstampH + bitstampL)/2
kraken_P = (krakenH + krakenL)/2
@ -40,23 +34,25 @@ def averager(type):
averagePrice = round(averagePrice, 2)
logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice))
log("Hourly Price for ({}) is {}".format(timestamp ,averagePrice), 'INFO')
return averagePrice, timestamp
def getVol(type):
bitV, bitA, bitB = bitstampVolAskBid(type, logger)
kV, kA, kB = krakenVolAskBid(type, logger)
bitV, bitA, bitB = bitstampVolAskBid(type, log)
kV, kA, kB = krakenVolAskBid(type, log)
bV, bA, bB = bitfinexVolAskBid(type, log)
gV, gA, gB = geminiVolAskBid(type, log)
v_array = np.array([bitV, kV])
v_array = np.array([bitV, kV, bV, gV])
volume = v_array[np.nonzero(v_array)].mean()
return volume
def getHighLow(type):
kH, kL = krakenHighLow(type, logger)
bH, bL = bitstampHighLow(type, logger)
kH, kL = krakenHighLow(type, log)
bH, bL = bitstampHighLow(type, log)
h_array = np.array([kH, bH])
l_array = np.array([kL, bL])
@ -67,8 +63,8 @@ def getHighLow(type):
return high, low
def getOpenClose(type):
bO, bC = bitstampOpenClose(type, logger)
kO, kC = krakenOpenClose(type, logger)
bO, bC = bitstampOpenClose(type, log)
kO, kC = krakenOpenClose(type, log)
o_array = np.array([bO, kO])
c_array = np.array([bC, kC])
@ -78,31 +74,57 @@ def getOpenClose(type):
return open, close
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
with open(btc_usd, 'r') as file:
data = file.read()
timestamp = '"'+timestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
type = '"'+c_type+'"'
query = data % (timestamp, type, round(av_price, 2),
round(high, 2),
round(low, 2),
round(o_price, 2),
round(c_price, 2),
round(vol, 2))
logger.info("Query sending down to db-gateway -- ({})".format(query))
# status, response = send(query, logger)
# def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
#
# with open(btc_usd, 'r') as file:
# data = file.read()
#
# strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
# timestamp = '"'+strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
# type = '"'+c_type+'"'
#
# query = data % (timestamp, type, round(av_price, 2),
# round(high, 2),
# round(low, 2),
# round(o_price, 2),
# round(c_price, 2),
# round(vol, 2))
#
# log("Query sending down to db-gateway -- ({})".format(query), 'INFO')
# status, response = send(query, log)
# if status != 200:
# logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
# logger.error("With Response of : {}".format(response))
# log("Query wasn't executed properly, view logs. Status = {}".format(status), 'WARN')
# log("With Response of : {}".format(response), 'ERR')
# else:
# logger.info("Query executed successfully with Status = {}".format(status))
# logger.info("With Response of : {}".format(response))
# log("Query executed successfully with Status = {}".format(status), 'INFO')
# log("With Response of : {}".format(response), 'INFO')
def sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = {
"timestamp" : timestamp,
"type" : c_type,
"average_price" : av_price,
"high_price" : high,
"low_price" : low,
"open_price": o_price,
"close_price" : c_price,
"volume" : vol
}
messageJson = json.dumps(message, indent = 4)
syncId = uuid.uuid4()
log("Sending message to PricingSave queue", 'INFO', syncId)
log("Message: {}".format(message), 'INFO', syncId)
activeMQSender(messageJson, syncId)
def timeFunction():
global time
@ -123,20 +145,21 @@ def collector(c_type):
global time
time = timeFunction()
print(time)
av_price, timestamp = averager(c_type)
vol = getVol(c_type)
high, low = getHighLow(c_type)
o_price, c_price = getOpenClose(c_type)
sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
sentToArtemis(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
# sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
schedule.every().hour.at(time).do(collector, c_type).tag("collector")
log("Collection will run again at {} every hour".format(time), 'INFO')
# Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type):
logger.info("== Historical Price Collector ==")
log("Starting Historical Price Collector", 'INFO')
collector(c_type)

View File

View File

@ -2,7 +2,7 @@
import requests, json, sys
def bitfinexPublicTicker(type, logger):
def bitfinexPublicTicker(type, log):
try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -13,11 +13,11 @@ def bitfinexPublicTicker(type, logger):
price = round(price, 2)
return price
except KeyError as e:
logger.error("Bitfinex Spot Price Error: {}".format(e))
log("Bitfinex Spot Price Error: {}".format(e), 'ERR')
price = 0
return price
def bitfinexVolAskBid(type, logger):
def bitfinexVolAskBid(type, log):
try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -32,5 +32,5 @@ def bitfinexVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Bitfinex High Low Volume Error: {}".format(e))
log("Bitfinex High Low Volume Error: {}".format(e), 'ERR')
return 0, 0, 0

View File

@ -2,7 +2,7 @@
import requests, json, sys
def bitstampVolAskBid(type, logger):
def bitstampVolAskBid(type, log):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -14,10 +14,10 @@ def bitstampVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Bitstamp Volume Ask Bid Error: {}".format(e))
log("Bitstamp Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def bitstampOpenClose(type, logger):
def bitstampOpenClose(type, log):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -28,10 +28,10 @@ def bitstampOpenClose(type, logger):
return open, close
except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0, 0
log("Bitstamp Open Close Error: {}".format(e), 'ERR')
return 0, 0
def bitstampHighLow(type, logger):
def bitstampHighLow(type, log):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
@ -42,5 +42,5 @@ def bitstampHighLow(type, logger):
return high, low
except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0, 0
log("Bitstamp Open Close Error: {}".format(e), 'ERR')
return 0, 0

View File

@ -10,7 +10,7 @@ class keys():
self.api_key = os.getenv('COINBASE_KEY')
self.api_secret = os.getenv("COINBASE_SECRET")
def coinbasePublicTicker(type, logger):
def coinbasePublicTicker(type, log):
api_key = keys().api_key
api_secret = keys().api_secret
@ -24,6 +24,6 @@ def coinbasePublicTicker(type, logger):
price = round(price, 2)
return price
except KeyError as e:
logger.error("Coinbase Spot Price Error: {}".format(e))
log("Coinbase Spot Price Error: {}".format(e), 'ERR')
price = 0
return price

View File

@ -2,7 +2,7 @@
import requests, json, sys
def geminiPublicTicker(type, logger):
def geminiPublicTicker(type, log):
try:
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
@ -15,11 +15,11 @@ def geminiPublicTicker(type, logger):
return price
except KeyError as e:
logger.error("Gemini Spot Price Error: {}".format(e))
log("Gemini Spot Price Error: {}".format(e), 'ERR')
price = 0
return price
def geminiVolAskBid(type, logger):
def geminiVolAskBid(type, log):
try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri)
@ -36,20 +36,20 @@ def geminiVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Gemini Volume Ask Bid Error: {}".format(e))
log("Gemini Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def geminiDailyOpenClose(type, logger):
def geminiDailyOpenClose(type, log):
try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri)
response = json.loads(response.text)
dailyOpen = round(float(response['open']), 2)
dailyClose = round(float(response['close']), 2)
open = round(float(response['open']), 2)
close = round(float(response['close']), 2)
return open, close
except KeyError as e:
logger.error("Gemini Open Close Error: {}".format(e))
log("Gemini Open Close Error: {}".format(e), 'ERR')
sys.stdout.flush()
return 0, 0

View File

@ -21,7 +21,7 @@ def krakenCalculateOHLC(response):
return fopen, fhigh, flow, fclose
def krakenVolAskBid(type, logger):
def krakenVolAskBid(type, log):
try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri)
@ -37,10 +37,10 @@ def krakenVolAskBid(type, logger):
return vol, ask, bid
except KeyError as e:
logger.error("Kraken Volume Ask Bid Error: {}".format(e))
log("Kraken Volume Ask Bid Error: {}".format(e), 'ERR')
return 0, 0, 0
def krakenOpenClose(type, logger):
def krakenOpenClose(type, log):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
@ -53,10 +53,10 @@ def krakenOpenClose(type, logger):
return fopen, fclose
except KeyError as e:
logger.error("Kraken Open Close Error: {}".format(e))
log("Kraken Open Close Error: {}".format(e), 'ERR')
return 0, 0, 0
def krakenHighLow(type, logger):
def krakenHighLow(type, log):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
@ -69,5 +69,5 @@ def krakenHighLow(type, logger):
return fhigh, flow
except KeyError as e:
logger.error("Kraken Open Close Error: {}".format(e))
log("Kraken Open Close Error: {}".format(e), 'ERR')
return 0, 0

View File

@ -4,14 +4,7 @@ from flask import Flask
import json
from pricing.coinbase import coinbasePublicTicker
import logging as logger
logger.basicConfig(
level=logger.INFO,
format="%(asctime)s: %(levelname)s -- %(message)s",
datefmt='%Y-%m-%d %H:%M:%S'
)
from pricing.exchanges.coinbase import coinbasePublicTicker
app = Flask(__name__)
@ -22,7 +15,7 @@ def health():
@app.route('/readiness')
def readiness():
# Can it make a call to an exchange?
price = coinbasePublicTicker('btc_usd', logger)
price = coinbasePublicTicker('btc_usd')
if price != 0 :
return json.dumps({

View File

@ -1 +0,0 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }

0
src/utils/__init__.py Normal file
View File

View File

@ -0,0 +1,36 @@
#!/usr/bin/env python
import stomp
import os
from src.utils.jsonLogger import log
class keys():
    """Reads the Artemis broker endpoint and credentials from the environment.

    Expects:
      AMQ_URL          -- 'host:port' of the STOMP endpoint
      BROKER_USER      -- broker username
      BROKER_PASSWORD  -- broker password
    """

    def __init__(self):
        self.uri = os.getenv("AMQ_URL")
        self.amqU = os.getenv("BROKER_USER")
        self.amqP = os.getenv("BROKER_PASSWORD")
        host, _, port = self.uri.partition(':')
        self.addr = host
        # Socket/STOMP connections require a numeric port; the raw env value
        # is a string, which stomp.Connection would reject.
        self.port = int(port)

    def returnKeys(self):
        """Return (host, port, username, password) for the broker."""
        return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId):
    """Publish *message* (a JSON string) to the 'PricingSave' queue over STOMP.

    Parameters:
      message -- JSON-encoded payload to send
      syncId  -- correlation id (may be a uuid.UUID) forwarded in the
                 'X-CRYPTO-Sync-ID' header for log tracing
    """
    addr, port, mqUser, mqPass = keys().returnKeys()

    log("Attempting Connection to Artemis...", 'INFO', syncId)

    con = stomp.Connection([(addr, port)], auto_content_length=False)
    con.connect(mqUser, mqPass, wait=True)
    try:
        con.send("PricingSave",
                 message,
                 content_type="application/json",
                 headers={
                     "Content-Type": "application/json",
                     # STOMP header values must be strings; syncId is a UUID.
                     "X-CRYPTO-Sync-ID": str(syncId)
                 })
    finally:
        # Always release the broker connection, even if the send fails;
        # the original leaked the connection on a send error.
        con.disconnect()

View File

@ -2,13 +2,14 @@
import requests, os, json, sys
from src.utils.jsonLogger import log
class keys():
def __init__(self):
self.uri = os.getenv("DATABASE_URL")
self.port = os.getenv("DATABASE_PORT")
def send(query, logger):
def send(query):
try:
uri = keys().uri + "/graphql"
headers = {'Content-type': 'application/json'}
@ -20,8 +21,8 @@ def send(query, logger):
return statusCode, response
except requests.exceptions.HTTPError as e:
logger.critical("Unable to send data down to db-gateway: {}".format(e))
log("Unable to send data down to db-gateway: {}".format(e), 'ERR')
sys.exit(1)
except requests.exceptions.RequestException as e:
logger.critical("Unable to send data down to db-gateway: {}".format(e))
log("Unable to send data down to db-gateway: {}".format(e), 'ERR')
sys.exit(1)

40
src/utils/jsonLogger.py Normal file
View File

@ -0,0 +1,40 @@
#!/usr/bin/env python
import logging
from pythonjsonlogger import jsonlogger
import datetime
class CustomJsonFormatter(jsonlogger.JsonFormatter):
    """JSON formatter adding Kibana-friendly '@timestamp' and 'level' fields."""

    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        if not log_record.get('@timestamp'):
            # Derive the timestamp from the record's own creation time so it
            # matches the moment the event was logged (the original called
            # utcnow() here and admitted it was "slightly off" record.created).
            created = datetime.datetime.fromtimestamp(record.created, tz=datetime.timezone.utc)
            log_record['@timestamp'] = created.strftime('%Y-%m-%dT%H:%M:%S.%fZ')
        if log_record.get('level'):
            # Normalise caller-supplied levels to upper case (e.g. 'info' -> 'INFO').
            log_record['level'] = log_record['level'].upper()
        else:
            # Fall back to the stdlib record's level name when none was supplied.
            log_record['level'] = record.levelname
def setup_logging(log_level='INFO'):
    """Configure the module logger to emit JSON log lines.

    Parameters:
      log_level -- minimum level name to emit (default 'INFO')

    Safe to call more than once: a handler is attached only on the first
    call, so repeated invocations no longer duplicate every log line.
    """
    logger = logging.getLogger(__name__)
    logger.propagate = 0
    logger.setLevel(log_level)
    if not logger.handlers:
        logHandler = logging.StreamHandler()
        formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
        logHandler.setFormatter(formatter)
        logger.addHandler(logHandler)
def log(message, level, syncId=""):
    """Emit *message* through the module logger with a sync-id field.

    Parameters:
      message -- log message text
      level   -- one of 'INFO', 'WARN', 'ERR', 'DEBUG'; any other value
                 falls back to INFO (the original silently dropped it)
      syncId  -- correlation id attached as the 'X-CRYPTO-Sync-ID' field
    """
    logger = logging.getLogger(__name__)
    # Dispatch table instead of an if/elif chain; logger.warning replaces
    # the deprecated logger.warn used previously.
    emitters = {
        'INFO': logger.info,
        'WARN': logger.warning,
        'ERR': logger.error,
        'DEBUG': logger.debug,
    }
    emit = emitters.get(level, logger.info)
    emit(message, extra={"X-CRYPTO-Sync-ID": syncId})