Compare commits

..

No commits in common. "master" and "1.0.0-b54" have entirely different histories.

6 changed files with 32 additions and 49 deletions

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
import os, sys, json, uuid import os, sys, json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from time import sleep, time from time import sleep, time
@ -34,20 +34,20 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN") self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET") self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(syncId, pos, neu, neg, compound, type): def sendToArtemis(pos, neu, neg, compound, type):
timestamp = datetime.now() + timedelta(hours=1) timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO') log("Sending message to TweetSave queue", 'INFO')
log("Message: {}".format(message), 'INFO') log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson, syncId) activeMQSender(messageJson)
class Streamer(): class Streamer():
@ -90,8 +90,6 @@ class Listener(StreamListener):
if (time() - self.start_time) < self.limit: if (time() - self.start_time) < self.limit:
data = json.loads(data) data = json.loads(data)
log(len(dumpStack), 'INFO')
# Check if tweet is a retweet # Check if tweet is a retweet
if 'retweeted_status' in data: if 'retweeted_status' in data:
if 'extended_tweet' in data['retweeted_status']: if 'extended_tweet' in data['retweeted_status']:
@ -111,7 +109,7 @@ class Listener(StreamListener):
text = filterOutTweetsWithNoneWhitelistedWords(text) text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text}) dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet(syncId): def processTweet():
log(len(dumpStack), 'INFO') log(len(dumpStack), 'INFO')
@ -141,11 +139,11 @@ def processTweet(syncId):
cleanedTweet = tweetText + ' ' + removedSpecialChars[1] cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
if callSpamFilter(cleanedTweet, syncId) != 'spam': if callSpamFilter(cleanedTweet) != 'spam':
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId) pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
if compound != 0.0 and neu <= 0.6: if compound != 0.0 or neu <= 0.6:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]} hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
hourStack.append(hourTweet) hourStack.append(hourTweet)
@ -182,9 +180,7 @@ def createHourJob():
global timeF global timeF
timeF = timeFunction() timeF = timeFunction()
syncId = uuid.uuid4() processTweet()
processTweet(syncId)
processStack = hourStack.copy() processStack = hourStack.copy()
hourStack.clear() hourStack.clear()
@ -193,7 +189,7 @@ def createHourJob():
if len(processStack) != 0: if len(processStack) != 0:
log("Process stack size is :: [{}]".format(len(processStack)), 'INFO') log("Process stack size is :: [{}]".format(len(processStack)), 'INFO')
for item in processStack: for item in hourStack:
ovPos = ovPos + item['pos'] ovPos = ovPos + item['pos']
ovNeu = ovNeu + item['neu'] ovNeu = ovNeu + item['neu']
ovNeg = ovNeg + item['neg'] ovNeg = ovNeg + item['neg']
@ -210,7 +206,7 @@ def createHourJob():
processStack.clear() processStack.clear()
sendToArtemis(syncId, pos, neu, neg, compound, type) sendToArtemis(pos, neu, neg, compound, type)
else: else:
log("Stack is empty", 'WARN') log("Stack is empty", 'WARN')
@ -224,7 +220,6 @@ def collectorMain(hashtag):
for i in range(len(hashtag)): for i in range(len(hashtag)):
Thread(target=collector, args=[hashtag[i]]).start() Thread(target=collector, args=[hashtag[i]]).start()
sleep(2)
createHourJob() createHourJob()
while True: while True:

View File

@ -17,20 +17,14 @@ class keys():
def returnKeys(self): def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId): def activeMQSender(message):
addr, port, mqUser, mqPass = keys().returnKeys() addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO') log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False) con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True) con.connect( mqUser, mqPass, wait=True)
con.send("TweetSave", con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect() con.disconnect()

View File

@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter) logHandler.setFormatter(formatter)
logger.addHandler(logHandler) logger.addHandler(logHandler)
def log(message, level, syncId=""): def log(message, level):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if level == 'INFO': if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.info(message)
elif level == 'WARN': elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.warn(message)
elif level == 'ERR': elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.error(message)
elif level == 'DEBUG': elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.debug(message)

View File

@ -9,14 +9,11 @@ class keys():
def __init__(self): def __init__(self):
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL") self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
def callSentimentAnalyser(tweet, syncId): def callSentimentAnalyser(tweet):
headers = { # log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try: try:
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
response = requests.request("GET", url=uri, headers=headers) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
@ -24,5 +21,5 @@ def callSentimentAnalyser(tweet, syncId):
return scores["pos"], scores["neu"], scores["neg"], scores["compound"] return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
except: except:
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId) log("Could not call Sentiment Analyser Service", 'ERR')
return 0, 0, 0, 0 return 0, 0, 0, 0

View File

@ -9,18 +9,16 @@ class keys():
def __init__(self): def __init__(self):
self.spamFilter_uri = os.getenv("FILTER_URL") self.spamFilter_uri = os.getenv("FILTER_URL")
def callSpamFilter(tweet, syncId): def callSpamFilter(tweet):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try: try:
uri = keys().spamFilter_uri + "/predict?tweet="+tweet uri = keys().spamFilter_uri + "/predict?tweet="+tweet
response = requests.request("GET", url=uri, headers=headers) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
# log("Spam Filter result for [{}] is [{}]".format(tweet, response["result"]), 'INFO')
return response["result"] return response["result"]
except: except:
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId) log("Could not call spam filter service", 'ERR')
return "" return ""

View File

@ -1,5 +1,7 @@
#!/usr/bin/env python #!/usr/bin/env python
from src.utils.jsonLogger import log
whitelist = [ whitelist = [
"bull", "bull",
"bear", "bear",
@ -138,15 +140,12 @@ whitelist = [
"sucking", "sucking",
"shard", "shard",
"sharding", "sharding",
"industry", "industry"
"powerful",
"better",
"worse"
] ]
def filterOutTweetsWithNoneWhitelistedWords(text): def filterOutTweetsWithNoneWhitelistedWords(text):
if any(x in text for x in whitelist): if any(x in text for x in whitelist):
return text return text
else: else:
# log("Tweet [{}] did not contain any keywords for it to be considered crypto related".format(text), 'WARN') log("Tweet [{}] did not contain any keywords for it to be considered crypto related".format(text), 'WARN')
return "" return ""