From 06c95da108551a63c2874300873ddbe3b37afec5 Mon Sep 17 00:00:00 2001 From: Andy Sotheran Date: Fri, 9 Oct 2020 22:57:05 +0100 Subject: [PATCH] [09.10.2020] Handling of incomplete read when API gets overloaded --- src/tweets/collector.py | 52 +++++++++++++++++++++++++---------------- 1 file changed, 32 insertions(+), 20 deletions(-) diff --git a/src/tweets/collector.py b/src/tweets/collector.py index 5e18567..8370ee3 100644 --- a/src/tweets/collector.py +++ b/src/tweets/collector.py @@ -18,7 +18,10 @@ from src.utils.sentimentAnalyser import callSentimentAnalyser from src.utils.activemqConnect import activeMQSender from src.utils.jsonLogger import log +from http.client import IncompleteRead + hourStack = [] +dumpStack = [] class keys(): @@ -60,8 +63,12 @@ class Streamer(): wait_on_rate_limit_notify=True) log("Streaming Tweets", 'INFO') - stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended') - stream.filter(languages=["en"], track=hashtag) + while True: + try: + stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended') + stream.filter(languages=["en"], track=hashtag) + except IncompleteRead: + continue class Listener(StreamListener): @@ -79,10 +86,10 @@ class Listener(StreamListener): if 'extended_tweet' in data['retweeted_status']: #if tweet is over the 140 word limit text = data['retweeted_status']['extended_tweet']['full_text'] - self.processTweet(text) + dumpStack.append(text) else: text = data['retweeted_status']['text'] - self.processTweet(text) + dumpStack.append(text) else: # Else if a normal Tweeet if 'extended_tweet' in data: @@ -90,30 +97,33 @@ class Listener(StreamListener): text = data['extended_tweet']['full_text'] self.processTweet(text) - def processTweet(self, text): - removedLines = fixLines(text) - removedSpecialChars = cleanTweet(removedLines) - removedSpacing = removeSpacing(removedSpecialChars[0]) - tweetLength = checkLength(removedSpacing) - if tweetLength == True: +def processTweet(): - checkIfEnglish = detectLaguage(removedSpecialChars[0]) + if len(dumpStack) != 0: + for tweet in dumpStack: + removedLines = fixLines(tweet) + removedSpecialChars = cleanTweet(removedLines) + removedSpacing = removeSpacing(removedSpecialChars[0]) + tweetLength = checkLength(removedSpacing) + if tweetLength == True: - if checkIfEnglish == True: + checkIfEnglish = detectLaguage(removedSpecialChars[0]) - tweetText = remove_non_ascii(removedSpacing) + if checkIfEnglish == True: - # log("Cleaned Tweet: {}".format(tweetText), 'INFO') + tweetText = remove_non_ascii(removedSpacing) - cleanedTweet = tweetText + ' ' + removedSpecialChars[1] + # log("Cleaned Tweet: {}".format(tweetText), 'INFO') - if callSpamFilter(cleanedTweet) != 'spam': - pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) + cleanedTweet = tweetText + ' ' + removedSpecialChars[1] - if compound != 0.0: - hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound} + if callSpamFilter(cleanedTweet) != 'spam': + pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) - hourStack.append(hourTweet) + if compound != 0.0: + hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound} + + hourStack.append(hourTweet) def collector(hashtag): @@ -143,6 +153,8 @@ def createHourJob(): global timeF timeF = timeFunction() + processTweet() + if len(hourStack) != 0: for item in hourStack: ovPos = ovPos + item['pos']