Compare commits

..

4 Commits

Author SHA1 Message Date
andrewso
b3f8ba4cec [15.10.20] Testing 2020-10-15 20:19:39 +01:00
andrewso
da43817926 [15.10.20] Testing 2020-10-15 20:15:55 +01:00
andrewso
17bd1399cc [15.10.20] Testing 2020-10-15 18:10:57 +01:00
andrewso
7325a509bd [15.10.20] Testing 2020-10-15 17:19:44 +01:00
5 changed files with 42 additions and 26 deletions

View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
import os, sys, json
import os, sys, json, uuid
from datetime import datetime, timedelta
from time import sleep, time
@@ -34,20 +34,20 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(pos, neu, neg, compound, type):
def sendToArtemis(syncId, pos, neu, neg, compound, type):
timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
messageJson = json.dumps(message, indent = 4)
log("Sending message to TweetSave queue", 'INFO')
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson)
activeMQSender(messageJson, syncId)
class Streamer():
@@ -90,6 +90,8 @@ class Listener(StreamListener):
if (time() - self.start_time) < self.limit:
data = json.loads(data)
log(len(dumpStack), 'INFO')
# Check if tweet is a retweet
if 'retweeted_status' in data:
if 'extended_tweet' in data['retweeted_status']:
@@ -109,7 +111,7 @@ class Listener(StreamListener):
text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet():
def processTweet(syncId):
log(len(dumpStack), 'INFO')
@@ -139,9 +141,9 @@ def processTweet():
cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
if callSpamFilter(cleanedTweet) != 'spam':
if callSpamFilter(cleanedTweet, syncId) != 'spam':
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
if compound != 0.0 and neu <= 0.6:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
@@ -180,7 +182,9 @@ def createHourJob():
global timeF
timeF = timeFunction()
processTweet()
syncId = uuid.uuid4()
processTweet(syncId)
processStack = hourStack.copy()
hourStack.clear()
@@ -206,7 +210,7 @@ def createHourJob():
processStack.clear()
sendToArtemis(pos, neu, neg, compound, type)
sendToArtemis(syncId, pos, neu, neg, compound, type)
else:
log("Stack is empty", 'WARN')
@@ -220,6 +224,7 @@ def collectorMain(hashtag):
for i in range(len(hashtag)):
Thread(target=collector, args=[hashtag[i]]).start()
sleep(2)
createHourJob()
while True:

View File

@@ -17,14 +17,20 @@ class keys():
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message):
def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
con.send("TweetSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect()

View File

@@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level):
def log(message, level, syncId=""):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message)
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN':
logger.warn(message)
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR':
logger.error(message)
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG':
logger.debug(message)
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})

View File

@@ -9,11 +9,14 @@ class keys():
def __init__(self):
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
def callSentimentAnalyser(tweet):
# log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
def callSentimentAnalyser(tweet, syncId):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try:
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
response = requests.request("GET", uri)
response = requests.request("GET", url=uri, headers=headers)
response = json.loads(response.text)
@@ -21,5 +24,5 @@ def callSentimentAnalyser(tweet):
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
except:
log("Could not call Sentiment Analyser Service", 'ERR')
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId)
return 0, 0, 0, 0

View File

@@ -9,16 +9,18 @@ class keys():
def __init__(self):
self.spamFilter_uri = os.getenv("FILTER_URL")
def callSpamFilter(tweet):
def callSpamFilter(tweet, syncId):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try:
uri = keys().spamFilter_uri + "/predict?tweet="+tweet
response = requests.request("GET", uri)
response = requests.request("GET", url=uri, headers=headers)
response = json.loads(response.text)
# log("Spam Filter result for [{}] is [{}]".format(tweet, response["result"]), 'INFO')
return response["result"]
except:
log("Could not call spam filter service", 'ERR')
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId)
return ""