From c7609ff6cd1a2a946d3e86f16a6c11186ddc455b Mon Sep 17 00:00:00 2001 From: andrewso <9V5f1FkzI2LD> Date: Thu, 9 Jul 2020 20:04:24 +0100 Subject: [PATCH] [09.07.20] Linkerd injection, works for DO docker registry extra exchanges, removal of ask and bid --- configuration/kubernetes/deployment.yaml | 9 +- configuration/pipelines/build.groovy | 33 ++-- configuration/scripts/mapVarsToConfigs.sh | 11 +- src/main.py | 4 +- src/pricing/bitfinex.py | 16 +- src/pricing/bitstamp.py | 46 +++++ src/pricing/coinbase.py | 2 +- src/pricing/collector.py | 174 ++++++++++-------- src/pricing/gemini.py | 24 ++- src/pricing/kraken.py | 73 ++++++++ .../V1_INSERT_NEW_PRICE_RECORD_BTC.graphql | 2 +- 11 files changed, 269 insertions(+), 125 deletions(-) create mode 100644 src/pricing/bitstamp.py create mode 100644 src/pricing/kraken.py diff --git a/configuration/kubernetes/deployment.yaml b/configuration/kubernetes/deployment.yaml index e073e6f..f5e51e0 100644 --- a/configuration/kubernetes/deployment.yaml +++ b/configuration/kubernetes/deployment.yaml @@ -1,6 +1,8 @@ apiVersion: apps/v1 kind: Deployment metadata: + annotations: + linkerd.io/inject: enabled labels: name: LABEL name: RESOURCE_NAME @@ -90,6 +92,11 @@ spec: limits: cpu: 25m memory: 32Mi + securityContext: + capabilities: + add: + - NET_ADMIN + - NET_RAW restartPolicy: Always imagePullSecrets: - - name: registry-secret \ No newline at end of file + - name: registry-cryptosky-image-registry \ No newline at end of file diff --git a/configuration/pipelines/build.groovy b/configuration/pipelines/build.groovy index fc921c1..cea4a3c 100644 --- a/configuration/pipelines/build.groovy +++ b/configuration/pipelines/build.groovy @@ -7,25 +7,21 @@ env.GIT_REPOSITORY_PATH = "github.com/andyjk15/${env.APPLICATION_NAME}.git" env.GIT_REPOSITORY_URL = "https://${env.GIT_REPOSITORY_PATH}" env.GITHUB_CREDENTIALS_ID = 'Github' env.DIGITAL_OCEAN = 'registry.digitalocean.com' +env.DIGITAL_OCEAN_REPO = 'cryptosky-image-registry' env.DOCKER_BUILDER = 'registry.cryptosky.me' -env.DOCKER_REPOSITORY = "${env.DIGITAL_OCEAN}/cryptosky-image-registry" +env.DOCKER_REPOSITORY = "${env.DIGITAL_OCEAN}/${env.DIGITAL_OCEAN_REPO}" env.DOCKER_REPOSITORY_TCP = "tcp://${env.DOCKER_BUILDER}:4243" - - env.NAMESPACE = 'production' env.SLAVE_LABEL = "cryptosky-aio-build" -def mvn( String gloals ) { - sh "mvn -s configuration/settings.xml --show-version --batch-mode ${gloals}" -} String get_application_version() { "1.0.0-b${env.BUILD_NUMBER}" } -String executeShellScript( String shellPath, String arg1 = '', String arg2 = '', String arg3 = '', String arg4 = '' ) { - sh "./${shellPath} ${arg1} ${arg2} ${arg3} ${arg4}" +String executeShellScript( String shellPath, String arg1 = '', String arg2 = '', String arg3 = '', String arg4 = '', String arg5 = '' ) { + sh "./${shellPath} ${arg1} ${arg2} ${arg3} ${arg4} ${arg5}" } try { @@ -45,7 +41,7 @@ try { ) { sh "doctl auth init --access-token ${DOCTL_TOKEN}" sh "doctl registry login" -// sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster" + sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster" } } @@ -61,7 +57,8 @@ try { // mvn 'clean package -DskipTests' executeShellScript("configuration/scripts/mapVarsToConfigs.sh", - env.DOCKER_REPOSITORY, + env.DIGITAL_OCEAN, + env.DIGITAL_OCEAN_REPO, env.APPLICATION_NAME, env.APPLICATION_VERSION, env.APPLICATION_LABEL) @@ -71,14 +68,14 @@ try { stage('Tag Repository') { withDockerServer([uri: "${env.DOCKER_REPOSITORY_TCP}"]) { - docker.build("${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}") - docker.build("${env.APPLICATION_NAME}:latest") + docker.build("${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}") + docker.build("${env.APPLICATION_NAME}:latest") - sh "docker tag ${env.APPLICATION_NAME}:${env.APPLICATION_VERSION} ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}" - sh "docker tag ${env.APPLICATION_NAME}:latest ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest" + sh "docker tag ${env.APPLICATION_NAME}:${env.APPLICATION_VERSION} ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}" + sh "docker tag ${env.APPLICATION_NAME}:latest ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest" - sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}" - sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest" + sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}" + sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest" } withCredentials( @@ -94,8 +91,8 @@ try { } stage('Deploy') { -// executeShellScript("configuration/scripts/deployToKubernetes.sh", -// env.APPLICATION_NAME) + executeShellScript("configuration/scripts/deployToKubernetes.sh", + env.APPLICATION_NAME) } } } diff --git a/configuration/scripts/mapVarsToConfigs.sh b/configuration/scripts/mapVarsToConfigs.sh index ad52c5d..ed3af80 100755 --- a/configuration/scripts/mapVarsToConfigs.sh +++ b/configuration/scripts/mapVarsToConfigs.sh @@ -1,9 +1,12 @@ #!/usr/bin/env bash -DOCKER_REPOSITORY=$1 -APPLICATION_NAME=$2 -APPLICATION_VERSION=$3 -APPLICATION_LABEL=$4 +DIGITAL_OCEAN=$1 +DIGITAL_OCEAN_REPO=$2 +APPLICATION_NAME=$3 +APPLICATION_VERSION=$4 +APPLICATION_LABEL=$5 + +DOCKER_REPOSITORY="${DIGITAL_OCEAN}\/${DIGITAL_OCEAN_REPO}" sed -i "s/REPOSITORY/${DOCKER_REPOSITORY}/g" configuration/kubernetes/deployment.yaml sed -i "s/IMAGE/${APPLICATION_NAME}:${APPLICATION_VERSION}/g" configuration/kubernetes/deployment.yaml diff --git a/src/main.py b/src/main.py index fbcd602..13ca76d 100644 --- a/src/main.py +++ b/src/main.py @@ -4,12 +4,12 @@ import sys sys.path.append('/home/price-collector/') from threading import Thread -from pricing.collector import collector +from pricing.collector import collectorMain from probes.probes import runFlaskProbes def callCollector(args): - collector(args) + collectorMain(args) def callProbes(): runFlaskProbes() diff --git a/src/pricing/bitfinex.py b/src/pricing/bitfinex.py index 954870b..1fbc123 100644 --- a/src/pricing/bitfinex.py +++ b/src/pricing/bitfinex.py @@ -9,15 +9,15 @@ def bitfinexPublicTicker(type, logger): response = requests.request("GET", uri) response = json.loads(response.text) - price = (float(response[0][7])+ float(response[0][9]) + float(response[0][10]))/3 - price = round(price, 3) + price = (float(response[0][7])+ float(response[0][1]) + float(response[0][3]))/3 + price = round(price, 2) return price except KeyError as e: logger.error("Bitfinex Spot Price Error: {}".format(e)) price = 0 return price -def bitfinexHighLowVol(type, logger): +def bitfinexVolAskBid(type, logger): try: uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') @@ -25,14 +25,12 @@ def bitfinexHighLowVol(type, logger): response = requests.request("GET", uri) response = json.loads(response.text) - high = round(float(response[0][9]), 3) - low = round(float(response[0][10]), 3) - vol = round(float((response[0][8]))/24, 3) + vol = round(float((response[0][2])+response[0][4])/2, 2) - ask = round(float(response[0][1]), 3) - bid = round(float(response[0][3]), 3) + ask = round(float(response[0][1]), 2) # Hourly High + bid = round(float(response[0][3]), 2) # Hourly Low - return high, low, vol, ask, bid + return vol, ask, bid except KeyError as e: logger.error("Bitfinex High Low Volume Error: {}".format(e)) return 0, 0, 0 \ No newline at end of file diff --git a/src/pricing/bitstamp.py b/src/pricing/bitstamp.py new file mode 100644 index 0000000..097794c --- /dev/null +++ b/src/pricing/bitstamp.py @@ -0,0 +1,46 @@ +#!/usr/bin/env python + +import requests, json, sys + +def bitstampVolAskBid(type, logger): + try: + uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" + response = requests.request("GET", uri) + response = json.loads(response.text) + + ask = round(float(response['ask']), 2) + bid = round(float(response['bid']), 2) + vol = round(float(response['volume']), 2) + + return vol, ask, bid + except KeyError as e: + logger.error("Bitstamp Volume Ask Bid Error: {}".format(e)) + return 0, 0, 0 + +def bitstampOpenClose(type, logger): + try: + uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" + response = requests.request("GET", uri) + response = json.loads(response.text) + + open = round(float(response['open']), 2) + close = round(float(response['last']), 2) + + return open, close + except KeyError as e: + logger.error("Bitstamp Open Close Error: {}".format(e)) + return 0, 0, 0 + +def bitstampHighLow(type, logger): + try: + uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/" + response = requests.request("GET", uri) + response = json.loads(response.text) + + high = round(float(response['high']), 2) + low = round(float(response['low']), 2) + + return high, low + except KeyError as e: + logger.error("Bitstamp Open Close Error: {}".format(e)) + return 0, 0, 0 \ No newline at end of file diff --git a/src/pricing/coinbase.py b/src/pricing/coinbase.py index 7f3c8d5..c816d1b 100644 --- a/src/pricing/coinbase.py +++ b/src/pricing/coinbase.py @@ -21,7 +21,7 @@ def coinbasePublicTicker(type, logger): client = Client(api_key, api_secret) repsonse = client.get_spot_price(currency_pair = type) price = (float(repsonse['amount'])) - price = round(price, 3) + price = round(price, 2) return price except KeyError as e: logger.error("Coinbase Spot Price Error: {}".format(e)) diff --git a/src/pricing/collector.py b/src/pricing/collector.py index e24cbd1..460a936 100644 --- a/src/pricing/collector.py +++ b/src/pricing/collector.py @@ -1,15 +1,19 @@ #!/usr/bin/env python -from datetime import datetime, timedelta +import datetime from time import sleep -import sys +import schedule -from pricing.bitfinex import bitfinexPublicTicker, bitfinexHighLowVol +import numpy as np + +from pricing.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid from pricing.coinbase import coinbasePublicTicker -from pricing.gemini import geminiPublicTicker, geminiHighLowVol, geminiOpenClose +from pricing.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose +from pricing.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose +from pricing.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose -from src.utils.databaseConnect import send +from utils.databaseConnect import send import logging as logger @@ -19,72 +23,62 @@ logger.basicConfig( datefmt='%Y-%m-%d %H:%M:%S' ) -btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql" +btc_usd="src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql" def averager(type): - timestamp = datetime.now()# + timedelta(hours=1) + timestamp = datetime.datetime.now()# + timedelta(hours=1) - coinbase_P = coinbasePublicTicker(type, logger) - bitfinex_P = bitfinexPublicTicker(type, logger) - gemini_P = geminiPublicTicker(type, logger) + bitstampH, bitstampL = bitstampHighLow(type, logger) + krakenH, krakenL = krakenHighLow(type, logger) - if coinbase_P == 0 or bitfinex_P == 0 or gemini_P == 0: - if coinbase_P and bitfinex_P == 0: - averagePrice = gemini_P - return - elif coinbase_P and gemini_P == 0: - averagePrice = bitfinex_P - return - elif bitfinex_P and gemini_P == 0: - averagePrice = coinbase_P - return - averagePrice = (coinbase_P + bitfinex_P + gemini_P)/2 - else: - averagePrice = (coinbase_P + bitfinex_P + gemini_P)/3 + bitstamp_P = (bitstampH + bitstampL)/2 + kraken_P = (krakenH + krakenL)/2 - averagePrice = round(averagePrice, 3) + av_array = np.array([bitstamp_P, kraken_P]) + averagePrice = av_array[np.nonzero(av_array)].mean() + + averagePrice = round(averagePrice, 2) logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice)) return averagePrice, timestamp -def getHighLowVol(type): - bH, bL, bV, bA, bB = bitfinexHighLowVol(type, logger) - gH, gL, gV, gA, gB = geminiHighLowVol(type, logger) +def getVol(type): + bitV, bitA, bitB = bitstampVolAskBid(type, logger) + kV, kA, kB = krakenVolAskBid(type, logger) - if ( bH == 0 or bL == 0 or bV == 0 or bA == 0, bB == 0 ) \ - or ( gH == 0 or gL == 0 or gL == 0 or gA == 0 or gB == 0): - if bH == 0: - high = gH - elif gH == 0: - high = bH - if bL == 0: - low = gL - elif gL == 0: - low = bL - if bV == 0: - vol = gV - elif gV == 0: - vol = bV - if bA == 0: - ask = gA - elif gA == 0: - ask = bA - if bB == 0: - bid = gB - elif gB == 0: - bid = bB - else: - high = (bH + gH)/2 - low = (bL + gL)/2 - vol = (bV + gV)/2 - ask = (bA + gA)/2 - bid = (bB + gB)/2 + v_array = np.array([bitV, kV]) - return high, low, vol, ask, bid + volume = v_array[np.nonzero(v_array)].mean() -def sendToGateway(c_type, timestamp, av_price, high, low, vol, ask, bid, o_price, c_price): + return volume + +def getHighLow(type): + kH, kL = krakenHighLow(type, logger) + bH, bL = bitstampHighLow(type, logger) + + h_array = np.array([kH, bH]) + l_array = np.array([kL, bL]) + + high = h_array[np.nonzero(h_array)].mean() + low = l_array[np.nonzero(l_array)].mean() + + return high, low + +def getOpenClose(type): + bO, bC = bitstampOpenClose(type, logger) + kO, kC = krakenOpenClose(type, logger) + + o_array = np.array([bO, kO]) + c_array = np.array([bC, kC]) + + open = o_array[np.nonzero(o_array)].mean() + close = c_array[np.nonzero(c_array)].mean() + + return open, close + +def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price): with open(btc_usd, 'r') as file: data = file.read() @@ -92,32 +86,60 @@ def sendToGateway(c_type, timestamp, av_price, high, low, vol, ask, bid, o_price timestamp = '"'+timestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' type = '"'+c_type+'"' - query = data % (timestamp, type, av_price, high, low, ask, bid, o_price, c_price, vol) + query = data % (timestamp, type, round(av_price, 2), + round(high, 2), + round(low, 2), + round(o_price, 2), + round(c_price, 2), + round(vol, 2)) logger.info("Query sending down to db-gateway -- ({})".format(query)) - status, response = send(query, logger) + # status, response = send(query, logger) + # + # if status != 200: + # logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status)) + # logger.error("With Response of : {}".format(response)) + # else: + # logger.info("Query executed successfully with Status = {}".format(status)) + # logger.info("With Response of : {}".format(response)) - if status != 200: - logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status)) - logger.error("With Response of : {}".format(response)) - else: - logger.info("Query executed successfully with Status = {}".format(status)) - logger.info("With Response of : {}".format(response)) +def timeFunction(): + global time -def getOpenClose(type): - open, close = geminiOpenClose(type, logger) - return open, close + time = datetime.datetime.now() + time = time + datetime.timedelta(hours = 1) + + time = str(time) + time = ":".join(time.split(":", 2)[:2]) + + time = time.split(" ")[1].lstrip().split(" ")[0] + + return time + +def collector(c_type): + schedule.clear("collector") + + global time + time = timeFunction() + + print(time) + + av_price, timestamp = averager(c_type) + vol = getVol(c_type) + high, low = getHighLow(c_type) + o_price, c_price = getOpenClose(c_type) + + sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price) + + schedule.every().hour.at(time).do(collector, c_type).tag("collector") # Dynamically Spin up Child process for each type wanting to track -def collector(c_type): +def collectorMain(c_type): logger.info("== Historical Price Collector ==") + collector(c_type) + while True: - av_price, timestamp = averager(c_type) - high, low, vol, ask, bid = getHighLowVol(c_type) - o_price, c_price = getOpenClose(c_type) - - sendToGateway(c_type, timestamp, av_price, high, low, vol, ask, bid, o_price, c_price) - - sleep(3596) \ No newline at end of file + schedule.run_pending() + sleep(1) \ No newline at end of file diff --git a/src/pricing/gemini.py b/src/pricing/gemini.py index 91ebe12..ae8c063 100644 --- a/src/pricing/gemini.py +++ b/src/pricing/gemini.py @@ -11,7 +11,7 @@ def geminiPublicTicker(type, logger): response = json.loads(response.text) price = (float(response['last']) + float(response['ask']) + float(response['bid']))/3 - price = round(price, 3) + price = round(price, 2) return price except KeyError as e: @@ -19,36 +19,34 @@ def geminiPublicTicker(type, logger): price = 0 return price -def geminiHighLowVol(type, logger): +def geminiVolAskBid(type, logger): try: uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') response = requests.request("GET", uri) response = json.loads(response.text) - high = float(response['high']) - low = float(response['low']) - - ask = float(response['ask']) - bid = float(response['bid']) + ask = round(float(response['ask']), 2) + bid = round(float(response['bid']), 2) uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '') response = requests.request("GET", uri) response = json.loads(response.text) - vol = float(response['volume']['BTC'])/24 - return high, low, vol, ask, bid + vol = round(float(response['volume']['BTC'])/24, 2) + + return vol, ask, bid except KeyError as e: - logger.error("Gemini High Low Volume Error: {}".format(e)) + logger.error("Gemini Volume Ask Bid Error: {}".format(e)) return 0, 0, 0 -def geminiOpenClose(type, logger): +def geminiDailyOpenClose(type, logger): try: uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') response = requests.request("GET", uri) response = json.loads(response.text) - open = float(response['open']) - close = float(response['close']) + dailyOpen = round(float(response['open']), 2) + dailyClose = round(float(response['close']), 2) return open, close except KeyError as e: diff --git a/src/pricing/kraken.py b/src/pricing/kraken.py new file mode 100644 index 0000000..3439c9f --- /dev/null +++ b/src/pricing/kraken.py @@ -0,0 +1,73 @@ +#!/usr/bin/env python + +import requests, json, sys + +def krakenCalculateOHLC(response): + open, high, low, close = 0, 0, 0, 0 + + keys = list(response["result"].keys()) + size = len(response["result"][keys[0]]) + + for value in response["result"][keys[0]]: + open = open + float(value[1]) + high = high + float(value[2]) + low = low + float(value[3]) + close = close + float(value[4]) + + fopen = open / size + fhigh = high / size + flow = low / size + fclose = close / size + + return fopen, fhigh, flow, fclose + +def krakenVolAskBid(type, logger): + try: + uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + response = requests.request("GET", uri) + response = json.loads(response.text) + + keys = list(response["result"].keys()) + + ask = round(float(response["result"][keys[0]]['a'][0]), 2) + bid = round(float(response["result"][keys[0]]['b'][0]), 2) + + vol = round(float(response["result"][keys[0]]['a'][2]), 2) + \ + round(float(response["result"][keys[0]]['b'][2]), 2) + + return vol, ask, bid + except KeyError as e: + logger.error("Kraken Volume Ask Bid Error: {}".format(e)) + return 0, 0, 0 + +def krakenOpenClose(type, logger): + try: + uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" + response = requests.request("GET", uri) + response = json.loads(response.text) + + open, high, low, close = krakenCalculateOHLC(response) + + fopen = round(open, 2) + fclose = round(close, 2) + + return fopen, fclose + except KeyError as e: + logger.error("Kraken Open Close Error: {}".format(e)) + return 0, 0, 0 + +def krakenHighLow(type, logger): + try: + uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60" + response = requests.request("GET", uri) + response = json.loads(response.text) + + open, high, low, close = krakenCalculateOHLC(response) + + fhigh = round(high, 2) + flow = round(low, 2) + + return fhigh, flow + except KeyError as e: + logger.error("Kraken Open Close Error: {}".format(e)) + return 0, 0 \ No newline at end of file diff --git a/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql b/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql index 8f246fb..7a47952 100644 --- a/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql +++ b/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql @@ -1 +1 @@ -mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, ask_price: %f, bid_price: %f, open_price: %f, close_price: %f, volume: %f){ id } } \ No newline at end of file +mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } } \ No newline at end of file