[09.10.2020] Handling of incomplete read when API gets overloaded

This commit is contained in:
Andy Sotheran 2020-10-09 22:57:05 +01:00
parent 0903b69b8e
commit 06c95da108

View File

@ -18,7 +18,10 @@ from src.utils.sentimentAnalyser import callSentimentAnalyser
from src.utils.activemqConnect import activeMQSender from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
from http.client import IncompleteRead
hourStack = [] hourStack = []
dumpStack = []
class keys(): class keys():
@ -60,8 +63,12 @@ class Streamer():
wait_on_rate_limit_notify=True) wait_on_rate_limit_notify=True)
log("Streaming Tweets", 'INFO') log("Streaming Tweets", 'INFO')
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended') while True:
stream.filter(languages=["en"], track=hashtag) try:
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
stream.filter(languages=["en"], track=hashtag)
except IncompleteRead:
continue
class Listener(StreamListener): class Listener(StreamListener):
@ -79,10 +86,10 @@ class Listener(StreamListener):
if 'extended_tweet' in data['retweeted_status']: if 'extended_tweet' in data['retweeted_status']:
#if tweet is over the 140 character limit #if tweet is over the 140 character limit
text = data['retweeted_status']['extended_tweet']['full_text'] text = data['retweeted_status']['extended_tweet']['full_text']
self.processTweet(text) dumpStack.append(text)
else: else:
text = data['retweeted_status']['text'] text = data['retweeted_status']['text']
self.processTweet(text) dumpStack.append(text)
else: else:
# Else if a normal Tweet # Else if a normal Tweet
if 'extended_tweet' in data: if 'extended_tweet' in data:
@ -90,30 +97,33 @@ class Listener(StreamListener):
text = data['extended_tweet']['full_text'] text = data['extended_tweet']['full_text']
self.processTweet(text) self.processTweet(text)
def processTweet(self, text): def processTweet():
removedLines = fixLines(text)
removedSpecialChars = cleanTweet(removedLines)
removedSpacing = removeSpacing(removedSpecialChars[0])
tweetLength = checkLength(removedSpacing)
if tweetLength == True:
checkIfEnglish = detectLaguage(removedSpecialChars[0]) if len(dumpStack) != 0:
for tweet in dumpStack:
removedLines = fixLines(tweet)
removedSpecialChars = cleanTweet(removedLines)
removedSpacing = removeSpacing(removedSpecialChars[0])
tweetLength = checkLength(removedSpacing)
if tweetLength == True:
if checkIfEnglish == True: checkIfEnglish = detectLaguage(removedSpecialChars[0])
tweetText = remove_non_ascii(removedSpacing) if checkIfEnglish == True:
# log("Cleaned Tweet: {}".format(tweetText), 'INFO') tweetText = remove_non_ascii(removedSpacing)
cleanedTweet = tweetText + ' ' + removedSpecialChars[1] # log("Cleaned Tweet: {}".format(tweetText), 'INFO')
if callSpamFilter(cleanedTweet) != 'spam': cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
if compound != 0.0: if callSpamFilter(cleanedTweet) != 'spam':
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound} pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
hourStack.append(hourTweet) if compound != 0.0:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound}
hourStack.append(hourTweet)
def collector(hashtag): def collector(hashtag):
@ -143,6 +153,8 @@ def createHourJob():
global timeF global timeF
timeF = timeFunction() timeF = timeFunction()
processTweet()
if len(hourStack) != 0: if len(hourStack) != 0:
for item in hourStack: for item in hourStack:
ovPos = ovPos + item['pos'] ovPos = ovPos + item['pos']