tweet-collector/src/tweets/collector.py
2020-10-10 23:05:39 +01:00

214 lines
6.3 KiB
Python

#!/usr/bin/env python
import os, sys, json
from datetime import datetime, timedelta
from time import sleep, time
import schedule
from threading import Thread
from tweepy import Stream, API, OAuthHandler
from tweepy.streaming import StreamListener
from src.utils.tweetPreprocessing import fixLines, cleanTweet, removeSpacing, checkLength, detectLaguage, remove_non_ascii
from src.utils.spamFilter import callSpamFilter
from src.utils.sentimentAnalyser import callSentimentAnalyser
from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log
from http.client import IncompleteRead
from urllib3.exceptions import ProtocolError
# Raw tweets buffered by the streaming Listener threads, each as
# {'type': <hashtag>, 'tweet': <text>}; drained by processTweet().
dumpStack = []
# Scored tweets awaiting the hourly aggregation in createHourJob(), each as
# {'pos', 'neu', 'neg', 'compound', 'type'}.
hourStack = []
class keys():
    """Twitter API credentials taken from the process environment.

    Attributes are ``None`` for any variable that is not set:
    ``api_key`` (API_KEY), ``api_secret`` (API_SECRET),
    ``access_token`` (ACCESS_TOKEN), ``access_secret`` (ACCESS_SECRET).
    """

    def __init__(self):
        env = os.environ
        self.api_key = env.get("API_KEY")
        self.api_secret = env.get("API_SECRET")
        self.access_token = env.get("ACCESS_TOKEN")
        self.access_secret = env.get("ACCESS_SECRET")
def sendToArtemis(pos, neu, neg, compound, type):
    """Publish one aggregated sentiment record to the TweetSave queue.

    The record is JSON with keys timestamp, pos, neu, neg, compound and type.
    ``timestamp`` is the local time one hour from now, truncated to the top
    of the hour and formatted as ``%Y-%m-%dT%H:%M:%S``. The JSON (indent 4)
    is handed to activeMQSender().
    """
    # NOTE(review): the +1h shift looks like a manual local-time/BST
    # adjustment on a naive datetime — confirm against the consumer.
    hour_start = (datetime.now() + timedelta(hours=1)).replace(
        minute=0, second=0, microsecond=0
    )
    payload = {
        "timestamp": hour_start.strftime('%Y-%m-%dT%H:%M:%S'),
        "pos": pos,
        "neu": neu,
        "neg": neg,
        "compound": compound,
        "type": type,
    }
    log("Sending message to TweetSave queue", 'INFO')
    log("Message: {}".format(payload), 'INFO')
    activeMQSender(json.dumps(payload, indent = 4))
class Streamer():
    """Connects to the Twitter streaming API and feeds tweets to a Listener."""

    def __init__(self):
        pass

    @staticmethod
    def _track_terms(hashtag):
        """Return *hashtag* in the list form tweepy's ``track`` expects.

        tweepy comma-joins ``track``, so a bare string would be split into
        its individual characters; a string is therefore wrapped in a
        one-element list. Anything else is passed through unchanged.
        """
        if isinstance(hashtag, str):
            return [hashtag]
        return hashtag

    def stream_tweets(self, hashtag):
        """Stream English tweets matching *hashtag* into a Listener; never returns.

        Credentials are read once from the environment via ``keys``. The
        stream is re-created after ``filter`` returns or after the connection
        drops with ``IncompleteRead`` / urllib3 ``ProtocolError``.
        """
        listener = Listener(hashtag)
        creds = keys()
        auth = OAuthHandler(creds.api_key, creds.api_secret)
        log("Authorising with twitter API...", 'INFO')
        auth.set_access_token(creds.access_token, creds.access_secret)
        api = API(auth, wait_on_rate_limit=True,
                  wait_on_rate_limit_notify=True)
        track = self._track_terms(hashtag)
        log("Streaming Tweets", 'INFO')
        while True:
            try:
                stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
                stream.filter(languages=["en"], track=track)
            except (IncompleteRead, ProtocolError) as err:
                # Dropped connection: record it and pause briefly before
                # reconnecting rather than spinning in a tight loop.
                log("Stream dropped ({}), reconnecting".format(err), 'WARN')
                sleep(1)
class Listener(StreamListener):
    """tweepy stream listener that buffers tweet text into ``dumpStack``.

    Each accepted tweet is appended as ``{'type': hashtag, 'tweet': text}``.
    """

    def __init__(self, hashtag, time_limit=3000):
        """
        :param hashtag: label stored as ``'type'`` on every buffered tweet.
        :param time_limit: seconds after construction during which incoming
            messages are buffered; messages arriving later are ignored.
        """
        # Let the tweepy base class initialise its own state.
        super().__init__()
        self.hashtag = hashtag
        self.start_time = time()
        self.limit = time_limit

    @staticmethod
    def _tweet_text(data):
        """Return the full text carried by a decoded stream message, or None.

        A retweet is reduced to its ``retweeted_status``. ``extended_tweet``
        ``full_text`` (tweets over the 140-character limit) is preferred,
        falling back to ``text``. Messages with neither give None.
        """
        status = data.get('retweeted_status', data)
        if 'extended_tweet' in status:
            return status['extended_tweet']['full_text']
        return status.get('text')

    def on_data(self, data):
        """Decode one raw JSON message and buffer its text while within the time limit."""
        # NOTE(review): start_time is set once and Streamer reuses this
        # Listener across reconnects, so after time_limit seconds every
        # later message is dropped for the rest of the process — confirm
        # that is intended.
        if (time() - self.start_time) >= self.limit:
            return
        text = self._tweet_text(json.loads(data))
        if text is not None:
            dumpStack.append({'type': self.hashtag, 'tweet': text})
def _scoreTweet(tweet):
    """Clean, filter and score one buffered tweet; return its hourStack entry or None if rejected."""
    removedLines = fixLines(tweet["tweet"])
    # cleanTweet's result is indexed [0] (cleaned text) and [1] (re-appended
    # after cleaning — presumably extracted tokens; confirm in tweetPreprocessing).
    removedSpecialChars = cleanTweet(removedLines)
    removedSpacing = removeSpacing(removedSpecialChars[0])
    if checkLength(removedSpacing) != True:
        return None
    if detectLaguage(removedSpecialChars[0]) != True:
        return None
    tweetText = remove_non_ascii(removedSpacing)
    cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
    if callSpamFilter(cleanedTweet) == 'spam':
        return None
    pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
    # A compound score of exactly 0.0 is treated as carrying no sentiment.
    if compound == 0.0:
        return None
    return {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}


def processTweet():
    """Drain the tweets buffered by the listener threads and score them.

    Every tweet currently in ``dumpStack`` is removed from the buffer, then
    cleaned, length- and language-checked, spam-filtered and
    sentiment-scored. Tweets that survive with a non-zero compound score are
    appended to ``hourStack`` for the next hourly aggregation.
    """
    log("Dump Stack {}".format(len(dumpStack)), 'INFO')
    # Snapshot the buffer as a separate list, then remove exactly those
    # items. Binding processStack to dumpStack itself and clearing it would
    # empty the batch before it is processed. Listener threads only append
    # to the end, so slicing off the first `count` items never discards a
    # tweet that arrives mid-drain (each list op is atomic under CPython's GIL).
    count = len(dumpStack)
    processStack = dumpStack[:count]
    del dumpStack[:count]
    log("Processing Tweets Stack...", 'INFO')
    log(len(processStack), 'INFO')
    if not processStack:
        log("Dump Stack was Empty", 'WARN')
        return
    for tweet in processStack:
        hourTweet = _scoreTweet(tweet)
        if hourTweet is not None:
            hourStack.append(hourTweet)
def collector(hashtag):
    """Thread target: stream tweets for a single hashtag (blocks indefinitely)."""
    log("Thread Start...", 'INFO')
    Streamer().stream_tweets(hashtag)
def timeFunction():
    """Return the local time one hour from now as a zero-padded ``HH:MM`` string.

    The result is also stored in the module-level global ``timeF``.
    """
    global timeF
    timeF = (datetime.now() + timedelta(hours=1)).strftime('%H:%M')
    return timeF
def createHourJob():
    """Aggregate the buffered sentiment scores and reschedule this job.

    Steps:
    - Run processTweet() to score any buffered tweets.
    - Average pos/neu/neg/compound over ``hourStack`` separately for each
      tweet ``type`` (the hashtag a listener was started with), rounded to
      3 places.
    - Send one message per type via sendToArtemis() and empty ``hourStack``.
    - Re-register itself with ``schedule`` under the "sendToArtemis" tag.
    """
    log("Creating hour job...", 'INFO')
    # Drop the previous registration; a fresh one is added at the end.
    schedule.clear("sendToArtemis")
    global timeF
    timeF = timeFunction()
    processTweet()
    log("Extracting sentiment scores...", 'INFO')
    log(len(hourStack), 'INFO')
    if hourStack:
        # Group per hashtag so that with several collector threads each
        # hashtag gets its own averages, instead of one blended result
        # labelled with whichever type happened to be last.
        # (Assumes 'type' values are hashable, e.g. hashtag strings.)
        groups = {}
        for item in hourStack:
            groups.setdefault(item['type'], []).append(item)
        hourStack.clear()
        for tweetType, items in groups.items():
            count = len(items)
            pos = round(sum(i['pos'] for i in items) / count, 3)
            neu = round(sum(i['neu'] for i in items) / count, 3)
            neg = round(sum(i['neg'] for i in items) / count, 3)
            compound = round(sum(i['compound'] for i in items) / count, 3)
            sendToArtemis(pos, neu, neg, compound, tweetType)
    else:
        log("Stack is empty", 'WARN')
    # NOTE(review): timeF is "HH:MM"; how schedule's hourly .at() reads that
    # string depends on the installed schedule version — confirm it fires at
    # minute MM.
    schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")
    log("Collection will run again at {} every hour".format(timeF), 'INFO')
def collectorMain(hashtag):
    """Start one streaming thread per hashtag, then run the scheduler loop forever.

    :param hashtag: sequence of hashtags; each element gets its own
        ``collector`` thread.
    """
    log("Starting Tweet Collector", 'INFO')
    for tag in hashtag:
        Thread(target=collector, args=(tag,), name="collector-{}".format(tag)).start()
    # NOTE(review): presumably gives the streams time to connect before the
    # first aggregation run — confirm.
    sleep(5)
    createHourJob()
    while True:
        schedule.run_pending()
        sleep(1)