[15.10.20] Testing

This commit is contained in:
andrewso 2020-10-15 17:19:44 +01:00
parent affd25379a
commit 7325a509bd
5 changed files with 31 additions and 21 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
import os, sys, json import os, sys, json, uuid
from datetime import datetime, timedelta from datetime import datetime, timedelta
from time import sleep, time from time import sleep, time
@ -34,20 +34,20 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN") self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET") self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(pos, neu, neg, compound, type): def sendToArtemis(syncId, pos, neu, neg, compound, type):
timestamp = datetime.now() + timedelta(hours=1) timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } message = { "timestamp" : timestamp, "syncId": syncId, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
log("Sending message to TweetSave queue", 'INFO') log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
log("Message: {}".format(message), 'INFO') log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson) activeMQSender(messageJson, syncId)
class Streamer(): class Streamer():
@ -90,6 +90,8 @@ class Listener(StreamListener):
if (time() - self.start_time) < self.limit: if (time() - self.start_time) < self.limit:
data = json.loads(data) data = json.loads(data)
log(len(dumpStack), 'INFO')
# Check if tweet is a retweet # Check if tweet is a retweet
if 'retweeted_status' in data: if 'retweeted_status' in data:
if 'extended_tweet' in data['retweeted_status']: if 'extended_tweet' in data['retweeted_status']:
@ -109,7 +111,7 @@ class Listener(StreamListener):
text = filterOutTweetsWithNoneWhitelistedWords(text) text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text}) dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet(): def processTweet(syncId):
log(len(dumpStack), 'INFO') log(len(dumpStack), 'INFO')
@ -139,9 +141,9 @@ def processTweet():
cleanedTweet = tweetText + ' ' + removedSpecialChars[1] cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
if callSpamFilter(cleanedTweet) != 'spam': if callSpamFilter(cleanedTweet, syncId) != 'spam':
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
if compound != 0.0 and neu <= 0.6: if compound != 0.0 and neu <= 0.6:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]} hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
@ -180,7 +182,9 @@ def createHourJob():
global timeF global timeF
timeF = timeFunction() timeF = timeFunction()
processTweet() syncId = uuid.uuid4()
processTweet(syncId)
processStack = hourStack.copy() processStack = hourStack.copy()
hourStack.clear() hourStack.clear()
@ -206,7 +210,7 @@ def createHourJob():
processStack.clear() processStack.clear()
sendToArtemis(pos, neu, neg, compound, type) sendToArtemis(syncId, pos, neu, neg, compound, type)
else: else:
log("Stack is empty", 'WARN') log("Stack is empty", 'WARN')

View File

@ -17,14 +17,20 @@ class keys():
def returnKeys(self): def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message): def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys() addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO') log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False) con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True) con.connect( mqUser, mqPass, wait=True)
con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"}) con.send("TweetSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect() con.disconnect()

View File

@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter) logHandler.setFormatter(formatter)
logger.addHandler(logHandler) logger.addHandler(logHandler)
def log(message, level): def log(message, level, syncId=""):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if level == 'INFO': if level == 'INFO':
logger.info(message) logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN': elif level == 'WARN':
logger.warn(message) logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR': elif level == 'ERR':
logger.error(message) logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG': elif level == 'DEBUG':
logger.debug(message) logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})

View File

@ -9,11 +9,11 @@ class keys():
def __init__(self): def __init__(self):
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL") self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
def callSentimentAnalyser(tweet): def callSentimentAnalyser(tweet, syncId):
# log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO') # log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
try: try:
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
response = requests.request("GET", uri) response = requests.get(uri, headers={"X-CRYPTO-Sync-ID" : syncId})
response = json.loads(response.text) response = json.loads(response.text)

View File

@ -9,10 +9,10 @@ class keys():
def __init__(self): def __init__(self):
self.spamFilter_uri = os.getenv("FILTER_URL") self.spamFilter_uri = os.getenv("FILTER_URL")
def callSpamFilter(tweet): def callSpamFilter(tweet, syncId):
try: try:
uri = keys().spamFilter_uri + "/predict?tweet="+tweet uri = keys().spamFilter_uri + "/predict?tweet="+tweet
response = requests.request("GET", uri) response = requests.get(uri, headers={"X-CRYPTO-Sync-ID" : syncId})
response = json.loads(response.text) response = json.loads(response.text)