[10.10.20] Type and resource increase

This commit is contained in:
andrewso 2020-10-10 19:59:08 +01:00
parent a65be3b9be
commit 886470d17e
2 changed files with 23 additions and 16 deletions

View File

@ -122,8 +122,8 @@ spec:
cpu: 32m cpu: 32m
memory: 32Mi memory: 32Mi
limits: limits:
cpu: 75m cpu: 150m
memory: 64Mi memory: 256Mi
securityContext: securityContext:
capabilities: capabilities:
add: add:

View File

@ -19,6 +19,7 @@ from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
from http.client import IncompleteRead from http.client import IncompleteRead
from urllib3.exceptions import ProtocolError
hourStack = [] hourStack = []
dumpStack = [] dumpStack = []
@ -31,13 +32,13 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN") self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET") self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(pos, neu, neg, compound): def sendToArtemis(pos, neu, neg, compound, type):
timestamp = datetime.now() + timedelta(hours=1) timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound } message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
@ -52,7 +53,7 @@ class Streamer():
pass pass
def stream_tweets(self, hashtag): def stream_tweets(self, hashtag):
listener = Listener() listener = Listener(hashtag)
auth = OAuthHandler(keys().api_key, keys().api_secret) auth = OAuthHandler(keys().api_key, keys().api_secret)
log("Authorising with twitter API...", 'INFO') log("Authorising with twitter API...", 'INFO')
@ -69,10 +70,13 @@ class Streamer():
stream.filter(languages=["en"], track=hashtag) stream.filter(languages=["en"], track=hashtag)
except IncompleteRead: except IncompleteRead:
continue continue
except ProtocolError:
continue
class Listener(StreamListener): class Listener(StreamListener):
def __init__(self, time_limit=3000): def __init__(self, hashtag, time_limit=3000):
self.hashtag = hashtag
self.start_time = time() self.start_time = time()
self.limit = time_limit self.limit = time_limit
@ -86,22 +90,25 @@ class Listener(StreamListener):
if 'extended_tweet' in data['retweeted_status']: if 'extended_tweet' in data['retweeted_status']:
#if tweet is over the 140 word limit #if tweet is over the 140 word limit
text = data['retweeted_status']['extended_tweet']['full_text'] text = data['retweeted_status']['extended_tweet']['full_text']
dumpStack.append(text) dumpStack.append({'type': self.hashtag, 'tweet': text})
else: else:
text = data['retweeted_status']['text'] text = data['retweeted_status']['text']
dumpStack.append(text) dumpStack.append({'type': self.hashtag, 'tweet': text})
else: else:
# Else if a normal Tweeet # Else if a normal Tweeet
if 'extended_tweet' in data: if 'extended_tweet' in data:
# If tweet is over 140 word limit # If tweet is over 140 word limit
text = data['extended_tweet']['full_text'] text = data['extended_tweet']['full_text']
dumpStack.append(text) dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet(): def processTweet():
if len(dumpStack) != 0: processStack = dumpStack
for tweet in dumpStack: dumpStack.clear()
removedLines = fixLines(tweet)
if len(processStack) != 0:
for tweet in processStack:
removedLines = fixLines(tweet["tweet"])
removedSpecialChars = cleanTweet(removedLines) removedSpecialChars = cleanTweet(removedLines)
removedSpacing = removeSpacing(removedSpecialChars[0]) removedSpacing = removeSpacing(removedSpecialChars[0])
tweetLength = checkLength(removedSpacing) tweetLength = checkLength(removedSpacing)
@ -121,10 +128,10 @@ def processTweet():
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
if compound != 0.0: if compound != 0.0:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound} hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
hourStack.append(hourTweet) hourStack.append(hourTweet)
processStack.clear()
def collector(hashtag): def collector(hashtag):
@ -161,6 +168,7 @@ def createHourJob():
ovNeu = ovNeu + item['neu'] ovNeu = ovNeu + item['neu']
ovNeg = ovNeg + item['neg'] ovNeg = ovNeg + item['neg']
ovCompound = ovCompound + item['compound'] ovCompound = ovCompound + item['compound']
type = item["type"]
pos = round(ovPos/len(hourStack), 3) pos = round(ovPos/len(hourStack), 3)
neu = round(ovNeu/len(hourStack), 3) neu = round(ovNeu/len(hourStack), 3)
@ -168,9 +176,8 @@ def createHourJob():
compound = round(ovCompound/len(hourStack), 3) compound = round(ovCompound/len(hourStack), 3)
hourStack.clear() hourStack.clear()
dumpStack.clear()
sendToArtemis(pos, neu, neg, compound) sendToArtemis(pos, neu, neg, compound, type)
schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis") schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")