#!/usr/bin/env python import os, sys, json from datetime import datetime, timedelta from time import sleep, time import schedule from threading import Thread from tweepy import Stream, API, OAuthHandler from tweepy.streaming import StreamListener from src.utils.tweetPreprocessing import fixLines, cleanTweet, removeSpacing, checkLength, detectLaguage, remove_non_ascii from src.utils.spamFilter import callSpamFilter from src.utils.sentimentAnalyser import callSentimentAnalyser from src.utils.activemqConnect import activeMQSender from src.utils.jsonLogger import log from http.client import IncompleteRead from urllib3.exceptions import ProtocolError hourStack = [] dumpStack = [] class keys(): def __init__(self): self.api_key = os.getenv("API_KEY") self.api_secret = os.getenv("API_SECRET") self.access_token = os.getenv("ACCESS_TOKEN") self.access_secret = os.getenv("ACCESS_SECRET") def sendToArtemis(pos, neu, neg, compound, type): timestamp = datetime.now() + timedelta(hours=1) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } messageJson = json.dumps(message, indent = 4) log("Sending message to TweetSave queue", 'INFO') log("Message: {}".format(message), 'INFO') activeMQSender(messageJson) class Streamer(): def __init__(self): pass def stream_tweets(self, hashtag): listener = Listener(hashtag) auth = OAuthHandler(keys().api_key, keys().api_secret) log("Authorising with twitter API...", 'INFO') auth.set_access_token(keys().access_token, keys().access_secret) api = API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True) log("Streaming Tweets", 'INFO') while True: try: stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended') stream.filter(languages=["en"], track=hashtag) except IncompleteRead: log("Incomplete Read Error", 'ERR') continue except ProtocolError: log("Protocol Error", 'ERR') continue except: log("Some other Error", 'ERR') continue class Listener(StreamListener): def __init__(self, hashtag, time_limit=3000): self.hashtag = hashtag self.start_time = time() self.limit = time_limit def on_data(self, data): if (time() - self.start_time) < self.limit: data = json.loads(data) # Check if tweet is a retweet if 'retweeted_status' in data: if 'extended_tweet' in data['retweeted_status']: #if tweet is over the 140 word limit text = data['retweeted_status']['extended_tweet']['full_text'] processTweet(text, self.hashtag) # dumpStack.append({'type': self.hashtag, 'tweet': text}) else: text = data['retweeted_status']['text'] processTweet(text, self.hashtag) # dumpStack.append({'type': self.hashtag, 'tweet': text}) else: # Else if a normal Tweeet if 'extended_tweet' in data: # If tweet is over 140 word limit text = data['extended_tweet']['full_text'] processTweet(text, self.hashtag) # dumpStack.append({'type': self.hashtag, 'tweet': text}) else: processTweet(data["text"], self.hashtag) def processTweet(text, type): log("hourStack size :: [{}]".format(len(hourStack)), "INFO") log("Processing [{}] Tweet...".format(text), 'INFO') removedLines = fixLines(text) log("fixLines", 'DEBUG') removedSpecialChars = cleanTweet(removedLines) log("cleanTweet", 'DEBUG') removedSpacing = removeSpacing(removedSpecialChars[0]) log("removeSpacing", 'DEBUG') tweetLength = checkLength(removedSpacing) log("checkLength", 'DEBUG') if tweetLength == True: checkIfEnglish = detectLaguage(removedSpecialChars[0]) log("Boop2", 'DEBUG') if checkIfEnglish == True: tweetText = remove_non_ascii(removedSpacing) log("Boop3", 'DEBUG') # log("Cleaned Tweet: {}".format(tweetText), 'INFO') cleanedTweet = tweetText + ' ' + removedSpecialChars[1] if callSpamFilter(cleanedTweet) != 'spam': log("After spam call", 'DEBUG') pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet) log("After Sentiment Call", 'DEBUG') if compound != 0.0 | neu <= 0.8: log("if not 0.0 or above 0.8", 'DEBUG') hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': type} hourStack.append(hourTweet) # processStack.clear() # else: # log("Dump Stack was Empty", 'WARN') def collector(hashtag): log("Thread Start...", 'INFO') streamer = Streamer() streamer.stream_tweets(hashtag) def timeFunction(): global timeF timeF = datetime.now() timeF = timeF + timedelta(hours = 1) timeF = str(timeF) timeF = ":".join(timeF.split(":", 2)[:2]) timeF = timeF.split(" ")[1].lstrip().split(" ")[0] return timeF def createHourJob(): log("Creating hour job...", 'INFO') schedule.clear("sendToArtemis") ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0 global timeF timeF = timeFunction() processStack = hourStack.copy() log("Extracting sentiment scores...", 'INFO') if len(processStack) != 0: log("Process stack size is :: [{}]".format(len(processStack)), 'INFO') for item in hourStack: ovPos = ovPos + item['pos'] ovNeu = ovNeu + item['neu'] ovNeg = ovNeg + item['neg'] ovCompound = ovCompound + item['compound'] type = item["type"] pos = round(ovPos/len(processStack), 3) neu = round(ovNeu/len(processStack), 3) neg = round(ovNeg/len(processStack), 3) compound = round(ovCompound/len(processStack), 3) if type == "bitcoin": type = 'btc_usd' hourStack.clear() sendToArtemis(pos, neu, neg, compound, type) else: log("Stack is empty", 'WARN') schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis") log("Collection will run again at {} every hour".format(timeF), 'INFO') def collectorMain(hashtag): log("Starting Tweet Collector", 'INFO') for i in range(len(hashtag)): Thread(target=collector, args=[hashtag[i]]).start() createHourJob() while True: schedule.run_pending() sleep(1)