#!/usr/bin/env python
"""Collect tweets per hashtag and send hourly sentiment averages downstream.

One thread per hashtag streams tweets into ``dumpStack``. Once an hour the
main thread cleans and scores the buffered tweets (``processTweet``),
averages the scores and hands the result to ``activeMQSender``
(``createHourJob`` / ``sendToArtemis``).
"""
import json
import os
import sys
from datetime import datetime, timedelta
from http.client import IncompleteRead
from threading import Thread
from time import sleep, time

import schedule
from tweepy import API, OAuthHandler, Stream
from tweepy.streaming import StreamListener
from urllib3.exceptions import ProtocolError

from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log
from src.utils.sentimentAnalyser import callSentimentAnalyser
from src.utils.spamFilter import callSpamFilter
from src.utils.tweetPreprocessing import (
    checkLength,
    cleanTweet,
    detectLaguage,
    fixLines,
    remove_non_ascii,
    removeSpacing,
)

# Scored tweets waiting for the next hourly average (main thread only).
hourStack = []
# Raw tweets appended by the listener threads, drained by processTweet().
dumpStack = []


class keys():
    """Twitter API credentials, read from environment variables."""

    def __init__(self):
        self.api_key = os.getenv("API_KEY")
        self.api_secret = os.getenv("API_SECRET")
        self.access_token = os.getenv("ACCESS_TOKEN")
        self.access_secret = os.getenv("ACCESS_SECRET")


def sendToArtemis(pos, neu, neg, compound, type):
    """Serialise one averaged sentiment record and pass it to activeMQSender.

    The record is stamped with the start of the *next* hour (local time),
    formatted as ``YYYY-MM-DDTHH:MM:SS``.

    Args:
        pos, neu, neg, compound: Averaged sentiment scores.
        type: Label stored under the ``"type"`` key (the tracked hashtag).
            The name shadows the builtin but is kept for keyword callers.
    """
    nextHour = (datetime.now() + timedelta(hours=1)).replace(
        minute=0, second=0, microsecond=0
    )
    message = {
        "timestamp": nextHour.strftime('%Y-%m-%dT%H:%M:%S'),
        "pos": pos,
        "neu": neu,
        "neg": neg,
        "compound": compound,
        "type": type,
    }
    messageJson = json.dumps(message, indent=4)

    log("Sending message to TweetSave queue", 'INFO')
    log("Message: {}".format(message), 'INFO')

    activeMQSender(messageJson)


class Streamer():
    """Opens an authenticated Twitter stream for a hashtag."""

    def __init__(self):
        pass

    def stream_tweets(self, hashtag):
        """Stream English tweets matching ``hashtag`` forever.

        Args:
            hashtag: A single track term (str) or an iterable of terms.

        The stream is re-created whenever the connection drops with
        ``IncompleteRead`` or ``ProtocolError``; this method never returns.
        """
        listener = Listener(hashtag)
        credentials = keys()

        auth = OAuthHandler(credentials.api_key, credentials.api_secret)
        log("Authorising with twitter API...", 'INFO')
        auth.set_access_token(
            credentials.access_token, credentials.access_secret
        )

        api = API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)
        log("Streaming Tweets", 'INFO')

        # tweepy joins ``track`` with commas, so a bare string would be split
        # into single characters; always hand it a list of terms.
        if isinstance(hashtag, str):
            track = [hashtag]
        else:
            track = list(hashtag)

        while True:
            try:
                stream = Stream(
                    auth=api.auth, listener=listener, tweet_mode='extended'
                )
                stream.filter(languages=["en"], track=track)
            except (IncompleteRead, ProtocolError):
                # Dropped connection: loop round and reconnect.
                continue


class Listener(StreamListener):
    """Buffers the text of incoming tweets into ``dumpStack``."""

    def __init__(self, hashtag, time_limit=3000):
        """
        Args:
            hashtag: Label stored with every buffered tweet.
            time_limit: Seconds after construction during which tweets are
                accepted.
        """
        super().__init__()
        self.hashtag = hashtag
        self.start_time = time()
        self.limit = time_limit

    def on_data(self, data):
        """Extract the tweet text from a raw stream payload and buffer it.

        NOTE(review): once ``time_limit`` has elapsed payloads are ignored,
        but the stream is not stopped and ``start_time`` is never reset --
        confirm this is intended.
        """
        log("Received Tweet...")

        if (time() - self.start_time) >= self.limit:
            return

        data = json.loads(data)

        if 'retweeted_status' in data:
            # Retweet: use the original tweet's text, preferring the
            # untruncated form when Twitter supplies one.
            original = data['retweeted_status']
            if 'extended_tweet' in original:
                text = original['extended_tweet']['full_text']
            else:
                text = original['text']
        elif 'extended_tweet' in data:
            # Regular tweet long enough to carry an extended payload.
            text = data['extended_tweet']['full_text']
        else:
            # NOTE(review): regular tweets without 'extended_tweet' are
            # dropped, unlike short retweets -- confirm this is intended.
            return

        dumpStack.append({'type': self.hashtag, 'tweet': text})


def processTweet():
    """Clean, filter and score every buffered tweet into ``hourStack``.

    Tweets that fail the length or language check, are classified as spam,
    or score a compound sentiment of exactly 0.0 are discarded.
    """
    # Take a real copy: the listener threads keep appending to dumpStack, and
    # an alias would be emptied together with it. Deleting only the copied
    # prefix keeps any tweet that arrived after the snapshot.
    pending = dumpStack[:]
    del dumpStack[:len(pending)]

    if not pending:
        log("Dump Stack was Empty", 'WARN')
        return

    for tweet in pending:
        removedLines = fixLines(tweet["tweet"])
        removedSpecialChars = cleanTweet(removedLines)
        removedSpacing = removeSpacing(removedSpecialChars[0])

        # The helpers' return types are not visible from this file, so the
        # original explicit comparisons against True are preserved.
        if checkLength(removedSpacing) != True:  # noqa: E712
            continue
        if detectLaguage(removedSpecialChars[0]) != True:  # noqa: E712
            continue

        tweetText = remove_non_ascii(removedSpacing)
        cleanedTweet = tweetText + ' ' + removedSpecialChars[1]

        if callSpamFilter(cleanedTweet) == 'spam':
            continue

        pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
        if compound != 0.0:
            hourStack.append({
                'pos': pos,
                'neu': neu,
                'neg': neg,
                'compound': compound,
                'type': tweet["type"],
            })


def collector(hashtag):
    """Thread target: stream tweets for one hashtag."""
    log("Thread Start...", 'INFO')
    Streamer().stream_tweets(hashtag)


def timeFunction():
    """Return the local time one hour from now as ``"HH:MM"``.

    The value is also stored in the module-level ``timeF``.
    """
    global timeF
    timeF = (datetime.now() + timedelta(hours=1)).strftime('%H:%M')
    return timeF


def _averageByType(tweets):
    """Return ``[(type, pos, neu, neg, compound), ...]`` averaged per type.

    Groups are matched by equality rather than hashing so that list-valued
    types keep working; first-seen order is preserved.
    """
    groups = []
    for tweet in tweets:
        for label, members in groups:
            if label == tweet['type']:
                members.append(tweet)
                break
        else:
            groups.append((tweet['type'], [tweet]))

    averages = []
    for label, members in groups:
        count = len(members)
        averages.append((
            label,
            round(sum(m['pos'] for m in members) / count, 3),
            round(sum(m['neu'] for m in members) / count, 3),
            round(sum(m['neg'] for m in members) / count, 3),
            round(sum(m['compound'] for m in members) / count, 3),
        ))
    return averages


def createHourJob():
    """Process buffered tweets, send the averages and re-schedule itself.

    One message is sent per tweet type so that scores from different
    hashtags are not blended under a single label.
    """
    global timeF

    log("Creating hour job...", 'INFO')
    schedule.clear("sendToArtemis")

    timeF = timeFunction()

    processTweet()

    if hourStack:
        collected = hourStack[:]
        hourStack.clear()
        for tweetType, pos, neu, neg, compound in _averageByType(collected):
            sendToArtemis(pos, neu, neg, compound, tweetType)
    else:
        log("Stack is empty", 'WARN')

    # Hourly jobs take a minute mark. Passing "HH:MM" is read as "MM:SS" by
    # current `schedule` releases, so pass ":MM", which old and new releases
    # interpret identically.
    minuteMark = ":" + timeF.split(":")[1]
    schedule.every().hour.at(minuteMark).do(createHourJob).tag("sendToArtemis")

    log("Collection will run again at {} every hour".format(timeF), 'INFO')


def collectorMain(hashtag):
    """Start one collector thread per hashtag and run the hourly scheduler.

    Args:
        hashtag: Sequence of hashtags; each gets its own streaming thread.

    Never returns: the calling thread loops running pending scheduled jobs.
    """
    log("Starting Tweet Collector", 'INFO')

    for tag in hashtag:
        Thread(target=collector, args=[tag]).start()

    createHourJob()

    while True:
        schedule.run_pending()
        sleep(1)