[09.07.20] Linkerd injection, works for DO docker registry extra exchanges, removal of ask and bid

This commit is contained in:
andrewso 2020-07-09 20:04:24 +01:00
parent f7ae028926
commit c7609ff6cd
11 changed files with 269 additions and 125 deletions

View File

@ -1,6 +1,8 @@
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
annotations:
linkerd.io/inject: enabled
labels: labels:
name: LABEL name: LABEL
name: RESOURCE_NAME name: RESOURCE_NAME
@ -90,6 +92,11 @@ spec:
limits: limits:
cpu: 25m cpu: 25m
memory: 32Mi memory: 32Mi
securityContext:
capabilities:
add:
- NET_ADMIN
- NET_RAW
restartPolicy: Always restartPolicy: Always
imagePullSecrets: imagePullSecrets:
- name: registry-secret - name: registry-cryptosky-image-registry

View File

@ -7,25 +7,21 @@ env.GIT_REPOSITORY_PATH = "github.com/andyjk15/${env.APPLICATION_NAME}.git"
env.GIT_REPOSITORY_URL = "https://${env.GIT_REPOSITORY_PATH}" env.GIT_REPOSITORY_URL = "https://${env.GIT_REPOSITORY_PATH}"
env.GITHUB_CREDENTIALS_ID = 'Github' env.GITHUB_CREDENTIALS_ID = 'Github'
env.DIGITAL_OCEAN = 'registry.digitalocean.com' env.DIGITAL_OCEAN = 'registry.digitalocean.com'
env.DIGITAL_OCEAN_REPO = 'cryptosky-image-registry'
env.DOCKER_BUILDER = 'registry.cryptosky.me' env.DOCKER_BUILDER = 'registry.cryptosky.me'
env.DOCKER_REPOSITORY = "${env.DIGITAL_OCEAN}/cryptosky-image-registry" env.DOCKER_REPOSITORY = "${env.DIGITAL_OCEAN}/${env.DIGITAL_OCEAN_REPO}"
env.DOCKER_REPOSITORY_TCP = "tcp://${env.DOCKER_BUILDER}:4243" env.DOCKER_REPOSITORY_TCP = "tcp://${env.DOCKER_BUILDER}:4243"
env.NAMESPACE = 'production' env.NAMESPACE = 'production'
env.SLAVE_LABEL = "cryptosky-aio-build" env.SLAVE_LABEL = "cryptosky-aio-build"
def mvn( String gloals ) {
sh "mvn -s configuration/settings.xml --show-version --batch-mode ${gloals}"
}
String get_application_version() { String get_application_version() {
"1.0.0-b${env.BUILD_NUMBER}" "1.0.0-b${env.BUILD_NUMBER}"
} }
String executeShellScript( String shellPath, String arg1 = '', String arg2 = '', String arg3 = '', String arg4 = '' ) { String executeShellScript( String shellPath, String arg1 = '', String arg2 = '', String arg3 = '', String arg4 = '', String arg5 = '' ) {
sh "./${shellPath} ${arg1} ${arg2} ${arg3} ${arg4}" sh "./${shellPath} ${arg1} ${arg2} ${arg3} ${arg4} ${arg5}"
} }
try { try {
@ -45,7 +41,7 @@ try {
) { ) {
sh "doctl auth init --access-token ${DOCTL_TOKEN}" sh "doctl auth init --access-token ${DOCTL_TOKEN}"
sh "doctl registry login" sh "doctl registry login"
// sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster" sh "doctl kubernetes cluster kubeconfig save cryptosky-kubernetes-cluster"
} }
} }
@ -61,7 +57,8 @@ try {
// mvn 'clean package -DskipTests' // mvn 'clean package -DskipTests'
executeShellScript("configuration/scripts/mapVarsToConfigs.sh", executeShellScript("configuration/scripts/mapVarsToConfigs.sh",
env.DOCKER_REPOSITORY, env.DIGITAL_OCEAN,
env.DIGITAL_OCEAN_REPO,
env.APPLICATION_NAME, env.APPLICATION_NAME,
env.APPLICATION_VERSION, env.APPLICATION_VERSION,
env.APPLICATION_LABEL) env.APPLICATION_LABEL)
@ -94,8 +91,8 @@ try {
} }
stage('Deploy') { stage('Deploy') {
// executeShellScript("configuration/scripts/deployToKubernetes.sh", executeShellScript("configuration/scripts/deployToKubernetes.sh",
// env.APPLICATION_NAME) env.APPLICATION_NAME)
} }
} }
} }

View File

@ -1,9 +1,12 @@
#!/usr/bin/env bash #!/usr/bin/env bash
DOCKER_REPOSITORY=$1 DIGITAL_OCEAN=$1
APPLICATION_NAME=$2 DIGITAL_OCEAN_REPO=$2
APPLICATION_VERSION=$3 APPLICATION_NAME=$3
APPLICATION_LABEL=$4 APPLICATION_VERSION=$4
APPLICATION_LABEL=$5
DOCKER_REPOSITORY="${DIGITAL_OCEAN}\/${DIGITAL_OCEAN_REPO}"
sed -i "s/REPOSITORY/${DOCKER_REPOSITORY}/g" configuration/kubernetes/deployment.yaml sed -i "s/REPOSITORY/${DOCKER_REPOSITORY}/g" configuration/kubernetes/deployment.yaml
sed -i "s/IMAGE/${APPLICATION_NAME}:${APPLICATION_VERSION}/g" configuration/kubernetes/deployment.yaml sed -i "s/IMAGE/${APPLICATION_NAME}:${APPLICATION_VERSION}/g" configuration/kubernetes/deployment.yaml

View File

@ -4,12 +4,12 @@ import sys
sys.path.append('/home/price-collector/') sys.path.append('/home/price-collector/')
from threading import Thread from threading import Thread
from pricing.collector import collector from pricing.collector import collectorMain
from probes.probes import runFlaskProbes from probes.probes import runFlaskProbes
def callCollector(args): def callCollector(args):
collector(args) collectorMain(args)
def callProbes(): def callProbes():
runFlaskProbes() runFlaskProbes()

View File

@ -9,15 +9,15 @@ def bitfinexPublicTicker(type, logger):
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
price = (float(response[0][7])+ float(response[0][9]) + float(response[0][10]))/3 price = (float(response[0][7])+ float(response[0][1]) + float(response[0][3]))/3
price = round(price, 3) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
logger.error("Bitfinex Spot Price Error: {}".format(e)) logger.error("Bitfinex Spot Price Error: {}".format(e))
price = 0 price = 0
return price return price
def bitfinexHighLowVol(type, logger): def bitfinexVolAskBid(type, logger):
try: try:
uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '') uri = "https://api.bitfinex.com/v2/tickers?symbols=" + "t"+type.upper().replace('_', '')
@ -25,14 +25,12 @@ def bitfinexHighLowVol(type, logger):
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
high = round(float(response[0][9]), 3) vol = round(float((response[0][2])+response[0][4])/2, 2)
low = round(float(response[0][10]), 3)
vol = round(float((response[0][8]))/24, 3)
ask = round(float(response[0][1]), 3) ask = round(float(response[0][1]), 2) # Hourly High
bid = round(float(response[0][3]), 3) bid = round(float(response[0][3]), 2) # Hourly Low
return high, low, vol, ask, bid return vol, ask, bid
except KeyError as e: except KeyError as e:
logger.error("Bitfinex High Low Volume Error: {}".format(e)) logger.error("Bitfinex High Low Volume Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0

46
src/pricing/bitstamp.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/env python
import requests, json, sys
def bitstampVolAskBid(type, logger):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
response = json.loads(response.text)
ask = round(float(response['ask']), 2)
bid = round(float(response['bid']), 2)
vol = round(float(response['volume']), 2)
return vol, ask, bid
except KeyError as e:
logger.error("Bitstamp Volume Ask Bid Error: {}".format(e))
return 0, 0, 0
def bitstampOpenClose(type, logger):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
response = json.loads(response.text)
open = round(float(response['open']), 2)
close = round(float(response['last']), 2)
return open, close
except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0, 0
def bitstampHighLow(type, logger):
try:
uri = "https://www.bitstamp.net/api/v2/ticker_hour/" + type.lower().replace('_', '') + "/"
response = requests.request("GET", uri)
response = json.loads(response.text)
high = round(float(response['high']), 2)
low = round(float(response['low']), 2)
return high, low
except KeyError as e:
logger.error("Bitstamp Open Close Error: {}".format(e))
return 0, 0, 0

View File

@ -21,7 +21,7 @@ def coinbasePublicTicker(type, logger):
client = Client(api_key, api_secret) client = Client(api_key, api_secret)
repsonse = client.get_spot_price(currency_pair = type) repsonse = client.get_spot_price(currency_pair = type)
price = (float(repsonse['amount'])) price = (float(repsonse['amount']))
price = round(price, 3) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
logger.error("Coinbase Spot Price Error: {}".format(e)) logger.error("Coinbase Spot Price Error: {}".format(e))

View File

@ -1,15 +1,19 @@
#!/usr/bin/env python #!/usr/bin/env python
from datetime import datetime, timedelta import datetime
from time import sleep from time import sleep
import sys import schedule
from pricing.bitfinex import bitfinexPublicTicker, bitfinexHighLowVol import numpy as np
from pricing.bitfinex import bitfinexPublicTicker, bitfinexVolAskBid
from pricing.coinbase import coinbasePublicTicker from pricing.coinbase import coinbasePublicTicker
from pricing.gemini import geminiPublicTicker, geminiHighLowVol, geminiOpenClose from pricing.gemini import geminiPublicTicker, geminiVolAskBid, geminiDailyOpenClose
from pricing.bitstamp import bitstampVolAskBid, bitstampHighLow, bitstampOpenClose
from pricing.kraken import krakenVolAskBid, krakenHighLow, krakenOpenClose
from src.utils.databaseConnect import send from utils.databaseConnect import send
import logging as logger import logging as logger
@ -19,72 +23,62 @@ logger.basicConfig(
datefmt='%Y-%m-%d %H:%M:%S' datefmt='%Y-%m-%d %H:%M:%S'
) )
btc_usd="/home/price-collector/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql" btc_usd="src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql"
def averager(type): def averager(type):
timestamp = datetime.now()# + timedelta(hours=1) timestamp = datetime.datetime.now()# + timedelta(hours=1)
coinbase_P = coinbasePublicTicker(type, logger) bitstampH, bitstampL = bitstampHighLow(type, logger)
bitfinex_P = bitfinexPublicTicker(type, logger) krakenH, krakenL = krakenHighLow(type, logger)
gemini_P = geminiPublicTicker(type, logger)
if coinbase_P == 0 or bitfinex_P == 0 or gemini_P == 0: bitstamp_P = (bitstampH + bitstampL)/2
if coinbase_P and bitfinex_P == 0: kraken_P = (krakenH + krakenL)/2
averagePrice = gemini_P
return
elif coinbase_P and gemini_P == 0:
averagePrice = bitfinex_P
return
elif bitfinex_P and gemini_P == 0:
averagePrice = coinbase_P
return
averagePrice = (coinbase_P + bitfinex_P + gemini_P)/2
else:
averagePrice = (coinbase_P + bitfinex_P + gemini_P)/3
averagePrice = round(averagePrice, 3) av_array = np.array([bitstamp_P, kraken_P])
averagePrice = av_array[np.nonzero(av_array)].mean()
averagePrice = round(averagePrice, 2)
logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice)) logger.info("Hourly Price for ({}) is {}".format(timestamp ,averagePrice))
return averagePrice, timestamp return averagePrice, timestamp
def getHighLowVol(type): def getVol(type):
bH, bL, bV, bA, bB = bitfinexHighLowVol(type, logger) bitV, bitA, bitB = bitstampVolAskBid(type, logger)
gH, gL, gV, gA, gB = geminiHighLowVol(type, logger) kV, kA, kB = krakenVolAskBid(type, logger)
if ( bH == 0 or bL == 0 or bV == 0 or bA == 0, bB == 0 ) \ v_array = np.array([bitV, kV])
or ( gH == 0 or gL == 0 or gL == 0 or gA == 0 or gB == 0):
if bH == 0:
high = gH
elif gH == 0:
high = bH
if bL == 0:
low = gL
elif gL == 0:
low = bL
if bV == 0:
vol = gV
elif gV == 0:
vol = bV
if bA == 0:
ask = gA
elif gA == 0:
ask = bA
if bB == 0:
bid = gB
elif gB == 0:
bid = bB
else:
high = (bH + gH)/2
low = (bL + gL)/2
vol = (bV + gV)/2
ask = (bA + gA)/2
bid = (bB + gB)/2
return high, low, vol, ask, bid volume = v_array[np.nonzero(v_array)].mean()
def sendToGateway(c_type, timestamp, av_price, high, low, vol, ask, bid, o_price, c_price): return volume
def getHighLow(type):
kH, kL = krakenHighLow(type, logger)
bH, bL = bitstampHighLow(type, logger)
h_array = np.array([kH, bH])
l_array = np.array([kL, bL])
high = h_array[np.nonzero(h_array)].mean()
low = l_array[np.nonzero(l_array)].mean()
return high, low
def getOpenClose(type):
bO, bC = bitstampOpenClose(type, logger)
kO, kC = krakenOpenClose(type, logger)
o_array = np.array([bO, kO])
c_array = np.array([bC, kC])
open = o_array[np.nonzero(o_array)].mean()
close = c_array[np.nonzero(c_array)].mean()
return open, close
def sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price):
with open(btc_usd, 'r') as file: with open(btc_usd, 'r') as file:
data = file.read() data = file.read()
@ -92,32 +86,60 @@ def sendToGateway(c_type, timestamp, av_price, high, low, vol, ask, bid, o_price
timestamp = '"'+timestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"' timestamp = '"'+timestamp.strftime('%Y-%m-%dT%H:%M:%S')+'"'
type = '"'+c_type+'"' type = '"'+c_type+'"'
query = data % (timestamp, type, av_price, high, low, ask, bid, o_price, c_price, vol) query = data % (timestamp, type, round(av_price, 2),
round(high, 2),
round(low, 2),
round(o_price, 2),
round(c_price, 2),
round(vol, 2))
logger.info("Query sending down to db-gateway -- ({})".format(query)) logger.info("Query sending down to db-gateway -- ({})".format(query))
status, response = send(query, logger) # status, response = send(query, logger)
#
# if status != 200:
# logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status))
# logger.error("With Response of : {}".format(response))
# else:
# logger.info("Query executed successfully with Status = {}".format(status))
# logger.info("With Response of : {}".format(response))
if status != 200: def timeFunction():
logger.critical("Query wasn't executed properly, view logs. Status = {}".format(status)) global time
logger.error("With Response of : {}".format(response))
else:
logger.info("Query executed successfully with Status = {}".format(status))
logger.info("With Response of : {}".format(response))
def getOpenClose(type): time = datetime.datetime.now()
open, close = geminiOpenClose(type, logger) time = time + datetime.timedelta(hours = 1)
return open, close
time = str(time)
time = ":".join(time.split(":", 2)[:2])
time = time.split(" ")[1].lstrip().split(" ")[0]
return time
# Dynamically Spin up Child process for each type wanting to track
def collector(c_type): def collector(c_type):
logger.info("== Historical Price Collector ==") schedule.clear("collector")
global time
time = timeFunction()
print(time)
while True:
av_price, timestamp = averager(c_type) av_price, timestamp = averager(c_type)
high, low, vol, ask, bid = getHighLowVol(c_type) vol = getVol(c_type)
high, low = getHighLow(c_type)
o_price, c_price = getOpenClose(c_type) o_price, c_price = getOpenClose(c_type)
sendToGateway(c_type, timestamp, av_price, high, low, vol, ask, bid, o_price, c_price) sendToGateway(c_type, timestamp, av_price, high, low, vol, o_price, c_price)
sleep(3596) schedule.every().hour.at(time).do(collector, c_type).tag("collector")
# Dynamically Spin up Child process for each type wanting to track
def collectorMain(c_type):
logger.info("== Historical Price Collector ==")
collector(c_type)
while True:
schedule.run_pending()
sleep(1)

View File

@ -11,7 +11,7 @@ def geminiPublicTicker(type, logger):
response = json.loads(response.text) response = json.loads(response.text)
price = (float(response['last']) + float(response['ask']) + float(response['bid']))/3 price = (float(response['last']) + float(response['ask']) + float(response['bid']))/3
price = round(price, 3) price = round(price, 2)
return price return price
except KeyError as e: except KeyError as e:
@ -19,36 +19,34 @@ def geminiPublicTicker(type, logger):
price = 0 price = 0
return price return price
def geminiHighLowVol(type, logger): def geminiVolAskBid(type, logger):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
high = float(response['high']) ask = round(float(response['ask']), 2)
low = float(response['low']) bid = round(float(response['bid']), 2)
ask = float(response['ask'])
bid = float(response['bid'])
uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v1/pubticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
vol = float(response['volume']['BTC'])/24
return high, low, vol, ask, bid vol = round(float(response['volume']['BTC'])/24, 2)
return vol, ask, bid
except KeyError as e: except KeyError as e:
logger.error("Gemini High Low Volume Error: {}".format(e)) logger.error("Gemini Volume Ask Bid Error: {}".format(e))
return 0, 0, 0 return 0, 0, 0
def geminiOpenClose(type, logger): def geminiDailyOpenClose(type, logger):
try: try:
uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '') uri = "https://api.gemini.com/v2/ticker/" + type.lower().replace('_', '')
response = requests.request("GET", uri) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
open = float(response['open']) dailyOpen = round(float(response['open']), 2)
close = float(response['close']) dailyClose = round(float(response['close']), 2)
return open, close return open, close
except KeyError as e: except KeyError as e:

73
src/pricing/kraken.py Normal file
View File

@ -0,0 +1,73 @@
#!/usr/bin/env python
import requests, json, sys
def krakenCalculateOHLC(response):
open, high, low, close = 0, 0, 0, 0
keys = list(response["result"].keys())
size = len(response["result"][keys[0]])
for value in response["result"][keys[0]]:
open = open + float(value[1])
high = high + float(value[2])
low = low + float(value[3])
close = close + float(value[4])
fopen = open / size
fhigh = high / size
flow = low / size
fclose = close / size
return fopen, fhigh, flow, fclose
def krakenVolAskBid(type, logger):
try:
uri = "https://api.kraken.com/0/public/Ticker?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT')
response = requests.request("GET", uri)
response = json.loads(response.text)
keys = list(response["result"].keys())
ask = round(float(response["result"][keys[0]]['a'][0]), 2)
bid = round(float(response["result"][keys[0]]['b'][0]), 2)
vol = round(float(response["result"][keys[0]]['a'][2]), 2) + \
round(float(response["result"][keys[0]]['b'][2]), 2)
return vol, ask, bid
except KeyError as e:
logger.error("Kraken Volume Ask Bid Error: {}".format(e))
return 0, 0, 0
def krakenOpenClose(type, logger):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
response = json.loads(response.text)
open, high, low, close = krakenCalculateOHLC(response)
fopen = round(open, 2)
fclose = round(close, 2)
return fopen, fclose
except KeyError as e:
logger.error("Kraken Open Close Error: {}".format(e))
return 0, 0, 0
def krakenHighLow(type, logger):
try:
uri = "https://api.kraken.com/0/public/OHLC?pair=" + type.upper().replace('_', '').replace('BTC', 'XBT') + "&?interval\=60"
response = requests.request("GET", uri)
response = json.loads(response.text)
open, high, low, close = krakenCalculateOHLC(response)
fhigh = round(high, 2)
flow = round(low, 2)
return fhigh, flow
except KeyError as e:
logger.error("Kraken Open Close Error: {}".format(e))
return 0, 0

View File

@ -1 +1 @@
mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, ask_price: %f, bid_price: %f, open_price: %f, close_price: %f, volume: %f){ id } } mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } }