Compare commits

..

No commits in common. "master" and "1.0.0-b56" have entirely different histories.

5 changed files with 27 additions and 43 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python
import os, sys, json, uuid
import os, sys, json
from datetime import datetime, timedelta
from time import sleep, time
@ -34,20 +34,20 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(syncId, pos, neu, neg, compound, type):
def sendToArtemis(pos, neu, neg, compound, type):
timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
messageJson = json.dumps(message, indent = 4)
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
log("Sending message to TweetSave queue", 'INFO')
log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson, syncId)
activeMQSender(messageJson)
class Streamer():
@ -90,8 +90,6 @@ class Listener(StreamListener):
if (time() - self.start_time) < self.limit:
data = json.loads(data)
log(len(dumpStack), 'INFO')
# Check if tweet is a retweet
if 'retweeted_status' in data:
if 'extended_tweet' in data['retweeted_status']:
@ -111,7 +109,7 @@ class Listener(StreamListener):
text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet(syncId):
def processTweet():
log(len(dumpStack), 'INFO')
@ -141,11 +139,11 @@ def processTweet(syncId):
cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
if callSpamFilter(cleanedTweet, syncId) != 'spam':
if callSpamFilter(cleanedTweet) != 'spam':
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
if compound != 0.0 and neu <= 0.6:
if compound != 0.0 or neu <= 0.6:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
hourStack.append(hourTweet)
@ -182,9 +180,7 @@ def createHourJob():
global timeF
timeF = timeFunction()
syncId = uuid.uuid4()
processTweet(syncId)
processTweet()
processStack = hourStack.copy()
hourStack.clear()
@ -210,7 +206,7 @@ def createHourJob():
processStack.clear()
sendToArtemis(syncId, pos, neu, neg, compound, type)
sendToArtemis(pos, neu, neg, compound, type)
else:
log("Stack is empty", 'WARN')
@ -224,7 +220,6 @@ def collectorMain(hashtag):
for i in range(len(hashtag)):
Thread(target=collector, args=[hashtag[i]]).start()
sleep(2)
createHourJob()
while True:

View File

@ -17,20 +17,14 @@ class keys():
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId):
def activeMQSender(message):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("TweetSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
con.disconnect()

View File

@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level, syncId=""):
def log(message, level):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.info(message)
elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.warn(message)
elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.error(message)
elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
logger.debug(message)

View File

@ -9,14 +9,11 @@ class keys():
def __init__(self):
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
def callSentimentAnalyser(tweet, syncId):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
def callSentimentAnalyser(tweet):
# log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
try:
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
response = requests.request("GET", url=uri, headers=headers)
response = requests.request("GET", uri)
response = json.loads(response.text)
@ -24,5 +21,5 @@ def callSentimentAnalyser(tweet, syncId):
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
except:
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId)
log("Could not call Sentiment Analyser Service", 'ERR')
return 0, 0, 0, 0

View File

@ -9,18 +9,16 @@ class keys():
def __init__(self):
self.spamFilter_uri = os.getenv("FILTER_URL")
def callSpamFilter(tweet, syncId):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
def callSpamFilter(tweet):
try:
uri = keys().spamFilter_uri + "/predict?tweet="+tweet
response = requests.request("GET", url=uri, headers=headers)
response = requests.request("GET", uri)
response = json.loads(response.text)
# log("Spam Filter result for [{}] is [{}]".format(tweet, response["result"]), 'INFO')
return response["result"]
except:
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId)
log("Could not call spam filter service", 'ERR')
return ""