Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3f8ba4cec | ||
|
|
da43817926 | ||
|
|
17bd1399cc | ||
|
|
7325a509bd |
@ -1,6 +1,6 @@
|
|||||||
#!/usr/bin/env python
|
#!/usr/bin/env python
|
||||||
|
|
||||||
import os, sys, json
|
import os, sys, json, uuid
|
||||||
|
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
from time import sleep, time
|
from time import sleep, time
|
||||||
@ -34,20 +34,20 @@ class keys():
|
|||||||
self.access_token = os.getenv("ACCESS_TOKEN")
|
self.access_token = os.getenv("ACCESS_TOKEN")
|
||||||
self.access_secret = os.getenv("ACCESS_SECRET")
|
self.access_secret = os.getenv("ACCESS_SECRET")
|
||||||
|
|
||||||
def sendToArtemis(pos, neu, neg, compound, type):
|
def sendToArtemis(syncId, pos, neu, neg, compound, type):
|
||||||
|
|
||||||
timestamp = datetime.now() + timedelta(hours=1)
|
timestamp = datetime.now() + timedelta(hours=1)
|
||||||
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
|
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
|
||||||
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
|
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
|
||||||
|
|
||||||
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
|
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
|
||||||
|
|
||||||
messageJson = json.dumps(message, indent = 4)
|
messageJson = json.dumps(message, indent = 4)
|
||||||
|
|
||||||
log("Sending message to TweetSave queue", 'INFO')
|
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
|
||||||
log("Message: {}".format(message), 'INFO')
|
log("Message: {}".format(message), 'INFO')
|
||||||
|
|
||||||
activeMQSender(messageJson)
|
activeMQSender(messageJson, syncId)
|
||||||
|
|
||||||
class Streamer():
|
class Streamer():
|
||||||
|
|
||||||
@ -90,6 +90,8 @@ class Listener(StreamListener):
|
|||||||
if (time() - self.start_time) < self.limit:
|
if (time() - self.start_time) < self.limit:
|
||||||
data = json.loads(data)
|
data = json.loads(data)
|
||||||
|
|
||||||
|
log(len(dumpStack), 'INFO')
|
||||||
|
|
||||||
# Check if tweet is a retweet
|
# Check if tweet is a retweet
|
||||||
if 'retweeted_status' in data:
|
if 'retweeted_status' in data:
|
||||||
if 'extended_tweet' in data['retweeted_status']:
|
if 'extended_tweet' in data['retweeted_status']:
|
||||||
@ -109,7 +111,7 @@ class Listener(StreamListener):
|
|||||||
text = filterOutTweetsWithNoneWhitelistedWords(text)
|
text = filterOutTweetsWithNoneWhitelistedWords(text)
|
||||||
dumpStack.append({'type': self.hashtag, 'tweet': text})
|
dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||||
|
|
||||||
def processTweet():
|
def processTweet(syncId):
|
||||||
|
|
||||||
log(len(dumpStack), 'INFO')
|
log(len(dumpStack), 'INFO')
|
||||||
|
|
||||||
@ -139,9 +141,9 @@ def processTweet():
|
|||||||
|
|
||||||
cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
|
cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
|
||||||
|
|
||||||
if callSpamFilter(cleanedTweet) != 'spam':
|
if callSpamFilter(cleanedTweet, syncId) != 'spam':
|
||||||
|
|
||||||
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
|
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
|
||||||
|
|
||||||
if compound != 0.0 and neu <= 0.6:
|
if compound != 0.0 and neu <= 0.6:
|
||||||
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
|
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
|
||||||
@ -180,7 +182,9 @@ def createHourJob():
|
|||||||
global timeF
|
global timeF
|
||||||
timeF = timeFunction()
|
timeF = timeFunction()
|
||||||
|
|
||||||
processTweet()
|
syncId = uuid.uuid4()
|
||||||
|
|
||||||
|
processTweet(syncId)
|
||||||
|
|
||||||
processStack = hourStack.copy()
|
processStack = hourStack.copy()
|
||||||
hourStack.clear()
|
hourStack.clear()
|
||||||
@ -206,7 +210,7 @@ def createHourJob():
|
|||||||
|
|
||||||
processStack.clear()
|
processStack.clear()
|
||||||
|
|
||||||
sendToArtemis(pos, neu, neg, compound, type)
|
sendToArtemis(syncId, pos, neu, neg, compound, type)
|
||||||
else:
|
else:
|
||||||
log("Stack is empty", 'WARN')
|
log("Stack is empty", 'WARN')
|
||||||
|
|
||||||
@ -220,6 +224,7 @@ def collectorMain(hashtag):
|
|||||||
for i in range(len(hashtag)):
|
for i in range(len(hashtag)):
|
||||||
Thread(target=collector, args=[hashtag[i]]).start()
|
Thread(target=collector, args=[hashtag[i]]).start()
|
||||||
|
|
||||||
|
sleep(2)
|
||||||
createHourJob()
|
createHourJob()
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
|
|||||||
@ -17,14 +17,20 @@ class keys():
|
|||||||
def returnKeys(self):
|
def returnKeys(self):
|
||||||
return self.addr, self.port, self.amqU, self.amqP
|
return self.addr, self.port, self.amqU, self.amqP
|
||||||
|
|
||||||
def activeMQSender(message):
|
def activeMQSender(message, syncId):
|
||||||
addr, port, mqUser, mqPass = keys().returnKeys()
|
addr, port, mqUser, mqPass = keys().returnKeys()
|
||||||
|
|
||||||
log("Attempting Connection to Artemis...", 'INFO')
|
log("Attempting Connection to Artemis...", 'INFO')
|
||||||
con = stomp.Connection([(addr, port)], auto_content_length=False)
|
con = stomp.Connection([(addr, port)], auto_content_length=False)
|
||||||
con.connect( mqUser, mqPass, wait=True)
|
con.connect( mqUser, mqPass, wait=True)
|
||||||
|
|
||||||
con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
|
con.send("TweetSave",
|
||||||
|
message,
|
||||||
|
content_type="application/json",
|
||||||
|
headers={
|
||||||
|
"Content-Type":"application/json",
|
||||||
|
"X-CRYPTO-Sync-ID":syncId
|
||||||
|
})
|
||||||
|
|
||||||
con.disconnect()
|
con.disconnect()
|
||||||
|
|
||||||
|
|||||||
@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
|
|||||||
logHandler.setFormatter(formatter)
|
logHandler.setFormatter(formatter)
|
||||||
logger.addHandler(logHandler)
|
logger.addHandler(logHandler)
|
||||||
|
|
||||||
def log(message, level):
|
def log(message, level, syncId=""):
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
if level == 'INFO':
|
if level == 'INFO':
|
||||||
logger.info(message)
|
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||||
elif level == 'WARN':
|
elif level == 'WARN':
|
||||||
logger.warn(message)
|
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||||
elif level == 'ERR':
|
elif level == 'ERR':
|
||||||
logger.error(message)
|
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||||
elif level == 'DEBUG':
|
elif level == 'DEBUG':
|
||||||
logger.debug(message)
|
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||||
@ -9,11 +9,14 @@ class keys():
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
|
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
|
||||||
|
|
||||||
def callSentimentAnalyser(tweet):
|
def callSentimentAnalyser(tweet, syncId):
|
||||||
# log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
|
headers = {
|
||||||
|
"content-type":"text",
|
||||||
|
"X-CRYPTO-Sync-ID" : str(syncId)
|
||||||
|
}
|
||||||
try:
|
try:
|
||||||
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
|
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
|
||||||
response = requests.request("GET", uri)
|
response = requests.request("GET", url=uri, headers=headers)
|
||||||
|
|
||||||
response = json.loads(response.text)
|
response = json.loads(response.text)
|
||||||
|
|
||||||
@ -21,5 +24,5 @@ def callSentimentAnalyser(tweet):
|
|||||||
|
|
||||||
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
|
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
|
||||||
except:
|
except:
|
||||||
log("Could not call Sentiment Analyser Service", 'ERR')
|
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId)
|
||||||
return 0, 0, 0, 0
|
return 0, 0, 0, 0
|
||||||
@ -9,16 +9,18 @@ class keys():
|
|||||||
def __init__(self):
|
def __init__(self):
|
||||||
self.spamFilter_uri = os.getenv("FILTER_URL")
|
self.spamFilter_uri = os.getenv("FILTER_URL")
|
||||||
|
|
||||||
def callSpamFilter(tweet):
|
def callSpamFilter(tweet, syncId):
|
||||||
|
headers = {
|
||||||
|
"content-type":"text",
|
||||||
|
"X-CRYPTO-Sync-ID" : str(syncId)
|
||||||
|
}
|
||||||
try:
|
try:
|
||||||
uri = keys().spamFilter_uri + "/predict?tweet="+tweet
|
uri = keys().spamFilter_uri + "/predict?tweet="+tweet
|
||||||
response = requests.request("GET", uri)
|
response = requests.request("GET", url=uri, headers=headers)
|
||||||
|
|
||||||
response = json.loads(response.text)
|
response = json.loads(response.text)
|
||||||
|
|
||||||
# log("Spam Filter result for [{}] is [{}]".format(tweet, response["result"]), 'INFO')
|
|
||||||
|
|
||||||
return response["result"]
|
return response["result"]
|
||||||
except:
|
except:
|
||||||
log("Could not call spam filter service", 'ERR')
|
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId)
|
||||||
return ""
|
return ""
|
||||||
Loading…
x
Reference in New Issue
Block a user