diff --git a/configuration/kubernetes/deployment.yaml b/configuration/kubernetes/deployment.yaml index f024e92..8495209 100644 --- a/configuration/kubernetes/deployment.yaml +++ b/configuration/kubernetes/deployment.yaml @@ -122,8 +122,8 @@ spec: cpu: 32m memory: 32Mi limits: - cpu: 75m - memory: 64Mi + cpu: 150m + memory: 256Mi securityContext: capabilities: add: diff --git a/src/tweets/collector.py b/src/tweets/collector.py index 4592102..db64787 100644 --- a/src/tweets/collector.py +++ b/src/tweets/collector.py @@ -19,6 +19,7 @@ from src.utils.activemqConnect import activeMQSender from src.utils.jsonLogger import log from http.client import IncompleteRead +from urllib3.exceptions import ProtocolError hourStack = [] dumpStack = [] @@ -31,13 +32,13 @@ class keys(): self.access_token = os.getenv("ACCESS_TOKEN") self.access_secret = os.getenv("ACCESS_SECRET") -def sendToArtemis(pos, neu, neg, compound): +def sendToArtemis(pos, neu, neg, compound, type): timestamp = datetime.now() + timedelta(hours=1) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') - message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound } + message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } messageJson = json.dumps(message, indent = 4) @@ -52,7 +53,7 @@ class Streamer(): pass def stream_tweets(self, hashtag): - listener = Listener() + listener = Listener(hashtag) auth = OAuthHandler(keys().api_key, keys().api_secret) log("Authorising with twitter API...", 'INFO') @@ -69,10 +70,13 @@ class Streamer(): stream.filter(languages=["en"], track=hashtag) except IncompleteRead: continue + except ProtocolError: + continue class Listener(StreamListener): - def __init__(self, time_limit=3000): + def __init__(self, hashtag, time_limit=3000): + self.hashtag = hashtag self.start_time = time() self.limit = time_limit @@ -86,22 +90,25 @@ class Listener(StreamListener): if 'extended_tweet' in data['retweeted_status']: #if tweet is over the 140 word limit text = data['retweeted_status']['extended_tweet']['full_text'] - dumpStack.append(text) + dumpStack.append({'type': self.hashtag, 'tweet': text}) else: text = data['retweeted_status']['text'] - dumpStack.append(text) + dumpStack.append({'type': self.hashtag, 'tweet': text}) else: # Else if a normal Tweeet if 'extended_tweet' in data: # If tweet is over 140 word limit text = data['extended_tweet']['full_text'] - dumpStack.append(text) + dumpStack.append({'type': self.hashtag, 'tweet': text}) def processTweet(): - if len(dumpStack) != 0: - for tweet in dumpStack: - removedLines = fixLines(tweet) + processStack = dumpStack + dumpStack.clear() + + if len(processStack) != 0: + for tweet in processStack: + removedLines = fixLines(tweet["tweet"]) removedSpecialChars = cleanTweet(removedLines) removedSpacing = removeSpacing(removedSpecialChars[0]) tweetLength = checkLength(removedSpacing) @@ -121,10 +128,10 @@ def processTweet(): pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) if compound != 0.0: - hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound} + hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]} hourStack.append(hourTweet) - + processStack.clear() def collector(hashtag): @@ -161,6 +168,7 @@ def createHourJob(): ovNeu = ovNeu + item['neu'] ovNeg = ovNeg + item['neg'] ovCompound = ovCompound + item['compound'] + type = item["type"] pos = round(ovPos/len(hourStack), 3) neu = round(ovNeu/len(hourStack), 3) @@ -168,9 +176,8 @@ def createHourJob(): compound = round(ovCompound/len(hourStack), 3) hourStack.clear() - dumpStack.clear() - sendToArtemis(pos, neu, neg, compound) + sendToArtemis(pos, neu, neg, compound, type) schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")