tweet-collector/src/tweets/collector.py

#!/usr/bin/env python
import os, sys, json, uuid
from datetime import datetime, timedelta
from time import sleep, time
import schedule
from threading import Thread
from tweepy import Stream, API, OAuthHandler
from tweepy.streaming import StreamListener
from src.utils.tweetPreprocessing import fixLines, cleanTweet, removeSpacing, checkLength, detectLaguage, remove_non_ascii
from src.utils.spamFilter import callSpamFilter
from src.utils.sentimentAnalyser import callSentimentAnalyser
from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log
from src.utils.whitelistedWords import filterOutTweetsWithNoneWhitelistedWords
from http.client import IncompleteRead
from urllib3.exceptions import ProtocolError
hourStack = []
dumpStack = []
processStack = []
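# Shared module-level buffers:
#   dumpStack    - raw tweets appended by each stream Listener as they arrive
#   hourStack    - per-tweet sentiment scores held until the hourly job aggregates them
#   processStack - working copy of dumpStack taken at the start of a processing pass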

class keys():
    def __init__(self):
        self.api_key = os.getenv("API_KEY")
        self.api_secret = os.getenv("API_SECRET")
        self.access_token = os.getenv("ACCESS_TOKEN")
        self.access_secret = os.getenv("ACCESS_SECRET")

def sendToArtemis(syncId, pos, neu, neg, compound, type):
    # Round the (one-hour-shifted) timestamp down to the top of the hour
    timestamp = datetime.now() + timedelta(hours=1)
    strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
    timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
    message = { "timestamp" : timestamp, "syncId": syncId, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
    messageJson = json.dumps(message, indent = 4)
    log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
    log("Message: {}".format(message), 'INFO')
    activeMQSender(messageJson, syncId)
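
# Example message shape placed on the TweetSave queue (values illustrative only):
# {
#     "timestamp": "2020-10-15T19:00:00",
#     "syncId": "<uuid4 string>",
#     "pos": 0.2, "neu": 0.5, "neg": 0.3,
#     "compound": 0.12,
#     "type": "btc_usd"
# }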

class Streamer():
    def __init__(self):
        pass

    def stream_tweets(self, hashtag):
        listener = Listener(hashtag)
        k = keys()
        auth = OAuthHandler(k.api_key, k.api_secret)
        log("Authorising with twitter API...", 'INFO')
        auth.set_access_token(k.access_token, k.access_secret)
        api = API(auth, wait_on_rate_limit=True,
                  wait_on_rate_limit_notify=True)
        log("Streaming Tweets", 'INFO')
        while True:
            try:
                stream = Stream(auth, listener=listener, tweet_mode='extended')
                # Stream.filter expects a list of track phrases
                stream.filter(languages=["en"], track=[hashtag] if isinstance(hashtag, str) else hashtag)
            except IncompleteRead:
                log("Incomplete Read Error", 'ERR')
                continue
            except ProtocolError:
                log("Protocol Error", 'ERR')
                continue

class Listener(StreamListener):
    def __init__(self, hashtag, time_limit=3000):
        super().__init__()
        self.hashtag = hashtag
        self.start_time = time()
        self.limit = time_limit

    def on_data(self, data):
        if (time() - self.start_time) < self.limit:
            data = json.loads(data)
            log(len(dumpStack), 'INFO')
            # Check if the tweet is a retweet
            if 'retweeted_status' in data:
                if 'extended_tweet' in data['retweeted_status']:
                    # If the tweet is over the 140 character limit
                    text = data['retweeted_status']['extended_tweet']['full_text']
                    text = filterOutTweetsWithNoneWhitelistedWords(text)
                    dumpStack.append({'type': self.hashtag, 'tweet': text})
                else:
                    text = data['retweeted_status']['text']
                    text = filterOutTweetsWithNoneWhitelistedWords(text)
                    dumpStack.append({'type': self.hashtag, 'tweet': text})
            else:
                # Otherwise a normal tweet
                if 'extended_tweet' in data:
                    # If the tweet is over the 140 character limit
                    text = data['extended_tweet']['full_text']
                    text = filterOutTweetsWithNoneWhitelistedWords(text)
                    dumpStack.append({'type': self.hashtag, 'tweet': text})

def processTweet(syncId):
    log(len(dumpStack), 'INFO')
    # Work on a snapshot of the dump stack so the stream threads can keep appending
    processStack = dumpStack.copy()
    dumpStack.clear()
    # log("Processing [{}] Tweet...".format(text), 'INFO')
    if len(processStack) != 0:
        for tweet in processStack:
            removedLines = fixLines(tweet["tweet"])
            removedSpecialChars = cleanTweet(removedLines)
            removedSpacing = removeSpacing(removedSpecialChars[0])
            tweetLength = checkLength(removedSpacing)
            if tweetLength == True:
                checkIfEnglish = detectLaguage(removedSpecialChars[0])
                if checkIfEnglish == True:
                    tweetText = remove_non_ascii(removedSpacing)
                    # log("Cleaned Tweet: {}".format(tweetText), 'INFO')
                    cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
                    if callSpamFilter(cleanedTweet, syncId) != 'spam':
                        pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
                        # Keep only tweets with a non-zero, non-neutral sentiment score
                        if compound != 0.0 and neu <= 0.6:
                            hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
                            hourStack.append(hourTweet)
        processStack.clear()
    else:
        log("Dump Stack was Empty", 'WARN')

def collector(hashtag):
    log("Thread Start...", 'INFO')
    streamer = Streamer()
    streamer.stream_tweets(hashtag)

def timeFunction():
    # Return the current time plus one hour as "HH:MM"
    # (equivalent to the original string-splitting logic)
    global timeF
    timeF = (datetime.now() + timedelta(hours=1)).strftime("%H:%M")
    return timeF

def createHourJob():
    log("Creating hour job...", 'INFO')
    schedule.clear("sendToArtemis")
    ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0
    type = ""
    global timeF
    timeF = timeFunction()
    # uuid4 is stringified so the message stays JSON-serialisable
    syncId = str(uuid.uuid4())
    processTweet(syncId)
    processStack = hourStack.copy()
    hourStack.clear()
    log("Extracting sentiment scores...", 'INFO')
    if len(processStack) != 0:
        log("Process stack size is :: [{}]".format(len(processStack)), 'INFO')
        for item in processStack:
            ovPos = ovPos + item['pos']
            ovNeu = ovNeu + item['neu']
            ovNeg = ovNeg + item['neg']
            ovCompound = ovCompound + item['compound']
            type = item["type"]
        # Average the scores across the hour's tweets
        pos = round(ovPos/len(processStack), 3)
        neu = round(ovNeu/len(processStack), 3)
        neg = round(ovNeg/len(processStack), 3)
        compound = round(ovCompound/len(processStack), 3)
        if type == "bitcoin":
            type = 'btc_usd'
        processStack.clear()
        sendToArtemis(syncId, pos, neu, neg, compound, type)
    else:
        log("Stack is empty", 'WARN')
    schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")
    log("Collection will run again at {} every hour".format(timeF), 'INFO')

def collectorMain(hashtag):
    log("Starting Tweet Collector", 'INFO')
    # Start one streaming thread per tracked phrase
    for tag in hashtag:
        Thread(target=collector, args=[tag]).start()
        sleep(2)
    createHourJob()
    while True:
        schedule.run_pending()
        sleep(1)
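
# Example entry point (not in the original file). collectorMain expects a list of
# track phrases; the "bitcoin" value below is an assumption inferred from the
# btc_usd mapping in createHourJob.
if __name__ == "__main__":
    collectorMain(["bitcoin"])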