From 8e15f8c9f801d43280f846590b32c426f58b5f98 Mon Sep 17 00:00:00 2001 From: andrewso <9V5f1FkzI2LD> Date: Wed, 14 Oct 2020 09:48:40 +0100 Subject: [PATCH] [14.10.20] Testing --- src/tweets/collector.py | 80 +++++++++++++++++++---------------------- 1 file changed, 36 insertions(+), 44 deletions(-) diff --git a/src/tweets/collector.py b/src/tweets/collector.py index 01a715f..3b12353 100644 --- a/src/tweets/collector.py +++ b/src/tweets/collector.py @@ -23,6 +23,7 @@ from urllib3.exceptions import ProtocolError hourStack = [] dumpStack = [] +processStack = [] class keys(): @@ -67,7 +68,7 @@ class Streamer(): while True: try: - stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended') + stream = Stream(auth, listener=listener, tweet_mode='extended') stream.filter(languages=["en"], track=hashtag) except IncompleteRead: log("Incomplete Read Error", 'ERR') @@ -75,9 +76,6 @@ class Streamer(): except ProtocolError: log("Protocol Error", 'ERR') continue - except: - log("Some other Error", 'ERR') - continue class Listener(StreamListener): @@ -91,73 +89,63 @@ class Listener(StreamListener): if (time() - self.start_time) < self.limit: data = json.loads(data) + log(len(dumpStack), 'INFO') + # Check if tweet is a retweet if 'retweeted_status' in data: if 'extended_tweet' in data['retweeted_status']: #if tweet is over the 140 word limit text = data['retweeted_status']['extended_tweet']['full_text'] - processTweet(text, self.hashtag) - # dumpStack.append({'type': self.hashtag, 'tweet': text}) + dumpStack.append({'type': self.hashtag, 'tweet': text}) else: text = data['retweeted_status']['text'] - processTweet(text, self.hashtag) - # dumpStack.append({'type': self.hashtag, 'tweet': text}) + dumpStack.append({'type': self.hashtag, 'tweet': text}) else: # Else if a normal Tweeet if 'extended_tweet' in data: # If tweet is over 140 word limit text = data['extended_tweet']['full_text'] - processTweet(text, self.hashtag) - # dumpStack.append({'type': self.hashtag, 'tweet': text}) - else: - processTweet(data["text"], self.hashtag) + dumpStack.append({'type': self.hashtag, 'tweet': text}) -def processTweet(text, type): +def processTweet(): - log("hourStack size :: [{}]".format(len(hourStack)), "INFO") + processStack = dumpStack.copy() + dumpStack.clear() - log("Processing [{}] Tweet...".format(text), 'INFO') + # log("Processing [{}] Tweet...".format(text), 'INFO') - removedLines = fixLines(text) - log("fixLines", 'DEBUG') + if len(processStack) != 0: + for tweet in processStack: - removedSpecialChars = cleanTweet(removedLines) - log("cleanTweet", 'DEBUG') - removedSpacing = removeSpacing(removedSpecialChars[0]) - log("removeSpacing", 'DEBUG') + removedLines = fixLines(str(tweet)) - tweetLength = checkLength(removedSpacing) - log("checkLength", 'DEBUG') - if tweetLength == True: + removedSpecialChars = cleanTweet(removedLines) + removedSpacing = removeSpacing(removedSpecialChars[0]) - checkIfEnglish = detectLaguage(removedSpecialChars[0]) - log("Boop2", 'DEBUG') + tweetLength = checkLength(removedSpacing) + if tweetLength == True: - if checkIfEnglish == True: + checkIfEnglish = detectLaguage(removedSpecialChars[0]) - tweetText = remove_non_ascii(removedSpacing) - log("Boop3", 'DEBUG') + if checkIfEnglish == True: - # log("Cleaned Tweet: {}".format(tweetText), 'INFO') + tweetText = remove_non_ascii(removedSpacing) - cleanedTweet = tweetText + ' ' + removedSpecialChars[1] + # log("Cleaned Tweet: {}".format(tweetText), 'INFO') - if callSpamFilter(cleanedTweet) != 'spam': - log("After spam call", 'DEBUG') + cleanedTweet = tweetText + ' ' + removedSpecialChars[1] - pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) + if callSpamFilter(cleanedTweet) != 'spam': - log("After Sentiment Call", 'DEBUG') + pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) - if compound != 0.0 | neu <= 0.8: - log("if not 0.0 or above 0.8", 'DEBUG') + if compound != 0.0 or neu <= 0.8: + hourTweet = {'pos': 1, 'neu': 1, 'neg': 1, 'compound': 1, 'type': ""} - hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': type} - - hourStack.append(hourTweet) - # processStack.clear() - # else: - # log("Dump Stack was Empty", 'WARN') + hourStack.append(hourTweet) + processStack.clear() + else: + log("Dump Stack was Empty", 'WARN') def collector(hashtag): @@ -183,11 +171,15 @@ def createHourJob(): log("Creating hour job...", 'INFO') schedule.clear("sendToArtemis") ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0 + type = "" global timeF timeF = timeFunction() + processTweet() + processStack = hourStack.copy() + hourStack.clear() log("Extracting sentiment scores...", 'INFO') @@ -208,7 +200,7 @@ def createHourJob(): if type == "bitcoin": type = 'btc_usd' - hourStack.clear() + processStack.clear() sendToArtemis(pos, neu, neg, compound, type) else: