From 11c29cc92192044410a1d92a15b3d1e1313709e7 Mon Sep 17 00:00:00 2001 From: andrewso <9V5f1FkzI2LD> Date: Thu, 6 Feb 2020 18:16:32 +0000 Subject: [PATCH] [05.02.20] Start of tweet collector merge micro service Spam and analysis service need to be extracted and messages sent to activemq --- src/__init__.py | 0 src/main.py | 0 src/tweets/__init__.py | 0 src/tweets/collector.py | 117 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 117 insertions(+) create mode 100644 src/__init__.py create mode 100644 src/main.py create mode 100644 src/tweets/__init__.py create mode 100644 src/tweets/collector.py diff --git a/src/__init__.py b/src/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/main.py b/src/main.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tweets/__init__.py b/src/tweets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/tweets/collector.py b/src/tweets/collector.py new file mode 100644 index 0000000..855f202 --- /dev/null +++ b/src/tweets/collector.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python + +import os, re, sys + +from nltk import wordpunct_tokenize +from nltk.corpus import stopwords +from datetime import datetime, timedelta +import time + +from tweepy import OAuthHandler +from tweepy import Stream +from tweepy.streaming import StreamListener + +from dotenv import load_dotenv +from pathlib import Path # python3 only +env_path = Path('.') / 'configuration/twitter.env' +load_dotenv(dotenv_path=env_path) + +class keys(): + + def __init__(self): + self.api_key = os.getenv("API_KEY") + self.api_secret = os.getenv("API_SECRET") + self.access_token = os.getenv("ACCESS_TOKEN") + self.access_secret = os.getenv("ACCESS_SECRET") + self.currency_hashtags = os.getenv("CURRENCY_HASHTAGS") + +class Streamer(): + + def __init__(self): + pass + + def stream_tweets(self, tweets_file, temp_tweets, hashtag, tweetFilter, analyser): + listener = Listener(tweets_file, temp_tweets, tweetFilter, analyser) + auth = OAuthHandler(keys().api_key, keys().api_secret) + + print("Console: ", "Authorising with twitter API") + sys.stdout.flush() + + auth.set_access_token(keys().access_token, keys().access_secret) + + print("Console: ", "Streaming Tweets") + sys.stdout.flush() + + stream = Stream(auth, listener, tweet_mode='extended') + stream.filter(languages=["en"], track=hashtag) + +class Listener(StreamListener): + + def __init__(self, tweets_file, temp_tweets, tweetFilter, analyser, time_limit=3000): + self.tweets_file = tweets_file + self.temp_tweets = temp_tweets + self.tweetFilter = tweetFilter + self.analyser = analyser + self.stack = {} + + self.start_time = time.time() + self.limit = time_limit + + def on_data(self, data): + + if (time.time() - self.start_time) < self.limit: + + now = datetime.now() + timedelta(hours=1) + + data = json.loads(data) + + try: + # Check if tweet is a retweet + if 'retweeted_status' in data: + if 'extended_tweet' in data['retweeted_status']: + #if tweet is over the 140 word limit + text = data['retweeted_status']['extended_tweet']['full_text'] + print("Uncleaned Tweet:", text) + sys.stdout.flush() + else: + text = data['retweeted_status']['text'] + print("Uncleaned Tweet:", text) + sys.stdout.flush() + else: + # Else if a normal Tweeet + if 'extended_tweet' in data: + # If tweet is over 140 word limit + text = data['extended_tweet']['full_text'] + print("Uncleaned Tweet:", text) + sys.stdout.flush() + else: + text = data['text'] + print("Uncleaned Tweet: ", text) + sys.stdout.flush() + + removedLines = utilityFuncs().fixLines(text) + removedSpecialChars = utilityFuncs().cleanTweet(removedLines) + removedSpacing = utilityFuncs().removeSpacing(removedSpecialChars[0]) + + tweetLength = utilityFuncs().checkLength(removedSpacing) + + if tweetLength == True: + + checkIfEnglish = utilityFuncs().detectLaguage(removedSpecialChars[0]) + + + if checkIfEnglish == True: + + tweetText = utilityFuncs().remove_non_ascii(removedSpacing) + + print("Cleaned Tweet: ", tweetText) + sys.stdout.flush() + + cleanedTweet = tweetText+' '+removedSpecialChars[1] + + ## Check with spam filter + # Sent Tweet to http request to spam filter service queue + +class queueListener(): + ## Check with spam filter + classification = self.tweetFilter.testTweet(cleanedTweet) \ No newline at end of file