From 7325a509bd815b88b9a4123721043613d7b1cc18 Mon Sep 17 00:00:00 2001 From: andrewso <9V5f1FkzI2LD> Date: Thu, 15 Oct 2020 17:19:44 +0100 Subject: [PATCH] [15.10.20] Testing --- src/tweets/collector.py | 24 ++++++++++++++---------- src/utils/activemqConnect.py | 10 ++++++++-- src/utils/jsonLogger.py | 10 +++++----- src/utils/sentimentAnalyser.py | 4 ++-- src/utils/spamFilter.py | 4 ++-- 5 files changed, 31 insertions(+), 21 deletions(-) diff --git a/src/tweets/collector.py b/src/tweets/collector.py index 047e95f..6c3d4dd 100644 --- a/src/tweets/collector.py +++ b/src/tweets/collector.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -import os, sys, json +import os, sys, json, uuid from datetime import datetime, timedelta from time import sleep, time @@ -34,20 +34,20 @@ class keys(): self.access_token = os.getenv("ACCESS_TOKEN") self.access_secret = os.getenv("ACCESS_SECRET") -def sendToArtemis(pos, neu, neg, compound, type): +def sendToArtemis(syncId, pos, neu, neg, compound, type): timestamp = datetime.now() + timedelta(hours=1) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') - message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } + message = { "timestamp" : timestamp, "syncId": syncId, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } messageJson = json.dumps(message, indent = 4) - log("Sending message to TweetSave queue", 'INFO') + log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO') log("Message: {}".format(message), 'INFO') - activeMQSender(messageJson) + activeMQSender(messageJson, syncId) class Streamer(): @@ -90,6 +90,8 @@ class Listener(StreamListener): if (time() - self.start_time) < self.limit: data = json.loads(data) + log(len(dumpStack), 'INFO') + # Check if tweet is a retweet if 'retweeted_status' in data: if 'extended_tweet' in data['retweeted_status']: @@ -109,7 +111,7 @@ class Listener(StreamListener): text = filterOutTweetsWithNoneWhitelistedWords(text) dumpStack.append({'type': self.hashtag, 'tweet': text}) -def processTweet(): +def processTweet(syncId): log(len(dumpStack), 'INFO') @@ -139,9 +141,9 @@ def processTweet(): cleanedTweet = tweetText + ' ' + removedSpecialChars[1] - if callSpamFilter(cleanedTweet) != 'spam': + if callSpamFilter(cleanedTweet, syncId) != 'spam': - pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) + pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId) if compound != 0.0 and neu <= 0.6: hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]} @@ -180,7 +182,9 @@ def createHourJob(): global timeF timeF = timeFunction() - processTweet() + syncId = uuid.uuid4() + + processTweet(syncId) processStack = hourStack.copy() hourStack.clear() @@ -206,7 +210,7 @@ def createHourJob(): processStack.clear() - sendToArtemis(pos, neu, neg, compound, type) + sendToArtemis(syncId, pos, neu, neg, compound, type) else: log("Stack is empty", 'WARN') diff --git a/src/utils/activemqConnect.py b/src/utils/activemqConnect.py index 55e0aca..205ba70 100644 --- a/src/utils/activemqConnect.py +++ b/src/utils/activemqConnect.py @@ -17,14 +17,20 @@ class keys(): def returnKeys(self): return self.addr, self.port, self.amqU, self.amqP -def activeMQSender(message): +def activeMQSender(message, syncId): addr, port, mqUser, mqPass = keys().returnKeys() log("Attempting Connection to Artemis...", 'INFO') con = stomp.Connection([(addr, port)], auto_content_length=False) con.connect( mqUser, mqPass, wait=True) - con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"}) + con.send("TweetSave", + message, + content_type="application/json", + headers={ + "Content-Type":"application/json", + "X-CRYPTO-Sync-ID":syncId + }) con.disconnect() diff --git a/src/utils/jsonLogger.py b/src/utils/jsonLogger.py index a781145..fba89cc 100644 --- a/src/utils/jsonLogger.py +++ b/src/utils/jsonLogger.py @@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'): logHandler.setFormatter(formatter) logger.addHandler(logHandler) -def log(message, level): +def log(message, level, syncId=""): logger = logging.getLogger(__name__) if level == 'INFO': - logger.info(message) + logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId}) elif level == 'WARN': - logger.warn(message) + logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId}) elif level == 'ERR': - logger.error(message) + logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId}) elif level == 'DEBUG': - logger.debug(message) \ No newline at end of file + logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId}) \ No newline at end of file diff --git a/src/utils/sentimentAnalyser.py b/src/utils/sentimentAnalyser.py index 1d9aec8..12df199 100644 --- a/src/utils/sentimentAnalyser.py +++ b/src/utils/sentimentAnalyser.py @@ -9,11 +9,11 @@ class keys(): def __init__(self): self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL") -def callSentimentAnalyser(tweet): +def callSentimentAnalyser(tweet, syncId): # log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO') try: uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet - response = requests.request("GET", uri) + response = requests.get(uri, headers={"X-CRYPTO-Sync-ID" : syncId}) response = json.loads(response.text) diff --git a/src/utils/spamFilter.py b/src/utils/spamFilter.py index 9a0ba63..e06cafd 100644 --- a/src/utils/spamFilter.py +++ b/src/utils/spamFilter.py @@ -9,10 +9,10 @@ class keys(): def __init__(self): self.spamFilter_uri = os.getenv("FILTER_URL") -def callSpamFilter(tweet): +def callSpamFilter(tweet, syncId): try: uri = keys().spamFilter_uri + "/predict?tweet="+tweet - response = requests.request("GET", uri) + response = requests.get(uri, headers={"X-CRYPTO-Sync-ID" : syncId}) response = json.loads(response.text)