diff --git a/configuration/dbgateway.env b/configuration/dbgateway.env new file mode 100644 index 0000000..3b591ed --- /dev/null +++ b/configuration/dbgateway.env @@ -0,0 +1,2 @@ +URI="http://localhost" +PORT="8080" \ No newline at end of file diff --git a/src/pricing/collector.py b/src/pricing/collector.py index d6f3357..c8d8bfa 100644 --- a/src/pricing/collector.py +++ b/src/pricing/collector.py @@ -3,13 +3,15 @@ import sys, json, os from datetime import datetime, timedelta +from time import sleep + from pricing.bitfinex import bitfinexPublicTicker, bitfinexHighLowVol from pricing.coinbase import coinbasePublicTicker from pricing.gemini import geminiPublicTicker, geminiHighLowVol, geminiOpenClose -from util.databaseConnect import connect, closeConnection +from utils.databaseConnect import send -btc_usd="resources/sql/V1_INSERT_NEW_PRICE_RECORD_BTC.sql" +btc_usd="resources/sql/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql" def getInsertForType(type): if type == "btc_usd": @@ -73,15 +75,16 @@ def getHighLowVol(type): return high, low, vol -def saveToDatabase(type, timestamp, av_price, high, low, vol, open, close): +def sendToGateway(type, timestamp, av_price, high, low, vol, open, close): try: - cur = connect() + with open(btc_usd, 'r') as queryFile: + data = queryFile.read() + query = data % (type, timestamp, av_price, high, low, vol, open, close) - sql = getInsertForType(type) + status, response = send(query) - cur.execute(sql, (type, timestamp, av_price, high, low, open, close, vol)) - - closeConnection(cur) + print("Status: ", status) + print("Response: ", response) return True except BaseException as exception: print("Error: %s" % str(exception)) @@ -103,4 +106,4 @@ def collector(type): high, low, vol = getHighLowVol(type) open, close = getOpenClose(type) - saveToDatabase(type, timestamp, av_price, high, low, vol, open, close) \ No newline at end of file + sendToGateway(type, timestamp, av_price, high, low, vol, open, close) \ No newline at end of file diff --git a/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql b/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql new file mode 100644 index 0000000..7a47952 --- /dev/null +++ b/src/resources/queries/V1_INSERT_NEW_PRICE_RECORD_BTC.graphql @@ -0,0 +1 @@ +mutation { createBtc(createdDate: %s, type: %s, average_price: %f, high_price: %f, low_price: %f, open_price: %f, close_price: %f, volume: %f){ id } } \ No newline at end of file diff --git a/src/resources/sql/V1_INSERT_NEW_PRICE_RECORD_BTC.sql b/src/resources/sql/V1_INSERT_NEW_PRICE_RECORD_BTC.sql deleted file mode 100644 index 4abdf96..0000000 --- a/src/resources/sql/V1_INSERT_NEW_PRICE_RECORD_BTC.sql +++ /dev/null @@ -1,2 +0,0 @@ -INSERT INTO btc_price(timestamp, symbol, av_price, h_price, l_price, o_price, c_price, volume) -VALUES(%s, %s, %f, %f, %f, %f, %f, %f); \ No newline at end of file diff --git a/src/utils/databaseConfig.py b/src/utils/databaseConfig.py deleted file mode 100644 index cd24a93..0000000 --- a/src/utils/databaseConfig.py +++ /dev/null @@ -1,17 +0,0 @@ -#!/usr/bin/env python - -from configparser import ConfigParser - -def config(filename='configuration/database.ini', section='postgresql'): - parser = ConfigParser() - parser.read(filename) - - db = {} - if parser.has_section(section): - params = parser.items(section) - for param in params: - db[param[0]] = param[1] - else: - raise Exception('Section {0} not found in the {1} file'.format(section, filename)) - - return db \ No newline at end of file diff --git a/src/utils/databaseConnect.py b/src/utils/databaseConnect.py index 161be90..cda409a 100644 --- a/src/utils/databaseConnect.py +++ b/src/utils/databaseConnect.py @@ -1,39 +1,26 @@ #!/usr/bin/env python -import psycopg2 -from utils.databaseConfig import config +import requests, os, json -def connect(): - """ Connect to the PostgreSQL database server """ - conn = None - try: - # read connection parameters - params = config() +from dotenv import load_dotenv +from pathlib import Path # python3 only +env_path = Path('.') / 'configuration/dbgateway.env.env' +load_dotenv(dotenv_path=env_path) - # connect to the PostgreSQL server - print('Connecting to the PostgreSQL database...') - conn = psycopg2.connect(**params) +class keys(): - # create a cursor - cur = conn.cursor() + def __init__(self): + self.uri = os.getenv('URI') + self.port = os.getenv("PORT") - # execute a statement - print('PostgreSQL database version:') - cur.execute('SELECT version()') +def send(query): - # display the PostgreSQL database server version - db_version = cur.fetchone() - print(db_version) + uri = keys().uri + ":"+ keys().port + "/graphql" + headers = {'Content-type': 'application/graphql'} - # close the communication with the PostgreSQL - # cur.close() - return cur; - except (Exception, psycopg2.DatabaseError) as error: - print(error) - finally: - if conn is not None: - conn.close() - print('Database connection closed.') + response = requests.post(uri, data=query, headers=headers) -def closeConnection(cur): - cur.close() \ No newline at end of file + statusCode = json.loads(response.status_code) + response = json.loads(response.text) + + return statusCode, response \ No newline at end of file