[05.02.20] Start of tweet collector merge microservice
Spam and analysis services need to be extracted and messages sent to ActiveMQ
This commit is contained in:
parent
f9f6cae2ee
commit
11c29cc921
0
src/__init__.py
Normal file
0
src/__init__.py
Normal file
0
src/main.py
Normal file
0
src/main.py
Normal file
0
src/tweets/__init__.py
Normal file
0
src/tweets/__init__.py
Normal file
117
src/tweets/collector.py
Normal file
117
src/tweets/collector.py
Normal file
@ -0,0 +1,117 @@
|
|||||||
|
#!/usr/bin/env python
|
||||||
|
|
||||||
|
import os, re, sys
|
||||||
|
|
||||||
|
from nltk import wordpunct_tokenize
|
||||||
|
from nltk.corpus import stopwords
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
import time
|
||||||
|
|
||||||
|
from tweepy import OAuthHandler
|
||||||
|
from tweepy import Stream
|
||||||
|
from tweepy.streaming import StreamListener
|
||||||
|
|
||||||
|
from dotenv import load_dotenv
|
||||||
|
from pathlib import Path # python3 only
|
||||||
|
env_path = Path('.') / 'configuration/twitter.env'
|
||||||
|
load_dotenv(dotenv_path=env_path)
|
||||||
|
|
||||||
|
class keys():
    """Twitter API credentials and tracked hashtags read from the environment.

    The values are captured when the instance is created. Each attribute holds
    the raw string returned by ``os.getenv`` and is ``None`` when the
    corresponding environment variable is not set.
    """

    def __init__(self):
        self.api_key = os.getenv("API_KEY")
        self.api_secret = os.getenv("API_SECRET")
        self.access_token = os.getenv("ACCESS_TOKEN")
        self.access_secret = os.getenv("ACCESS_SECRET")
        # Stored exactly as found in the environment; no parsing or splitting
        # into individual hashtags happens here.
        self.currency_hashtags = os.getenv("CURRENCY_HASHTAGS")
|
||||||
|
|
||||||
|
class Streamer():
    """Authenticates against the Twitter API and starts a filtered stream."""

    def stream_tweets(self, tweets_file, temp_tweets, hashtag, tweetFilter, analyser):
        """Stream English tweets matching ``hashtag`` into a ``Listener``.

        Args:
            tweets_file: Forwarded unchanged to ``Listener``.
            temp_tweets: Forwarded unchanged to ``Listener``.
            hashtag: Phrase(s) to track. Either a list of strings or a single
                string, which is treated as one track entry.
            tweetFilter: Forwarded unchanged to ``Listener``.
            analyser: Forwarded unchanged to ``Listener``.
        """
        listener = Listener(tweets_file, temp_tweets, tweetFilter, analyser)

        # Read the credentials from the environment once instead of building
        # a fresh ``keys`` object for every individual value.
        credentials = keys()
        auth = OAuthHandler(credentials.api_key, credentials.api_secret)

        print("Console: ", "Authorising with twitter API", flush=True)

        auth.set_access_token(credentials.access_token, credentials.access_secret)

        print("Console: ", "Streaming Tweets", flush=True)

        # NOTE(review): tweepy comma-joins the ``track`` iterable, so a bare
        # string would be split into single characters. Wrap it in a list;
        # lists and other iterables are passed through untouched.
        track = [hashtag] if isinstance(hashtag, str) else hashtag

        stream = Stream(auth, listener, tweet_mode='extended')
        stream.filter(languages=["en"], track=track)
|
||||||
|
|
||||||
|
class Listener(StreamListener):
|
||||||
|
|
||||||
|
def __init__(self, tweets_file, temp_tweets, tweetFilter, analyser, time_limit=3000):
|
||||||
|
self.tweets_file = tweets_file
|
||||||
|
self.temp_tweets = temp_tweets
|
||||||
|
self.tweetFilter = tweetFilter
|
||||||
|
self.analyser = analyser
|
||||||
|
self.stack = {}
|
||||||
|
|
||||||
|
self.start_time = time.time()
|
||||||
|
self.limit = time_limit
|
||||||
|
|
||||||
|
def on_data(self, data):
|
||||||
|
|
||||||
|
if (time.time() - self.start_time) < self.limit:
|
||||||
|
|
||||||
|
now = datetime.now() + timedelta(hours=1)
|
||||||
|
|
||||||
|
data = json.loads(data)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Check if tweet is a retweet
|
||||||
|
if 'retweeted_status' in data:
|
||||||
|
if 'extended_tweet' in data['retweeted_status']:
|
||||||
|
# If the tweet is over the 140-character limit
|
||||||
|
text = data['retweeted_status']['extended_tweet']['full_text']
|
||||||
|
print("Uncleaned Tweet:", text)
|
||||||
|
sys.stdout.flush()
|
||||||
|
else:
|
||||||
|
text = data['retweeted_status']['text']
|
||||||
|
print("Uncleaned Tweet:", text)
|
||||||
|
sys.stdout.flush()
|
||||||
|
else:
|
||||||
|
# Otherwise it is a normal tweet
|
||||||
|
if 'extended_tweet' in data:
|
||||||
|
# If the tweet is over the 140-character limit
|
||||||
|
text = data['extended_tweet']['full_text']
|
||||||
|
print("Uncleaned Tweet:", text)
|
||||||
|
sys.stdout.flush()
|
||||||
|
else:
|
||||||
|
text = data['text']
|
||||||
|
print("Uncleaned Tweet: ", text)
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
removedLines = utilityFuncs().fixLines(text)
|
||||||
|
removedSpecialChars = utilityFuncs().cleanTweet(removedLines)
|
||||||
|
removedSpacing = utilityFuncs().removeSpacing(removedSpecialChars[0])
|
||||||
|
|
||||||
|
tweetLength = utilityFuncs().checkLength(removedSpacing)
|
||||||
|
|
||||||
|
if tweetLength == True:
|
||||||
|
|
||||||
|
checkIfEnglish = utilityFuncs().detectLaguage(removedSpecialChars[0])
|
||||||
|
|
||||||
|
|
||||||
|
if checkIfEnglish == True:
|
||||||
|
|
||||||
|
tweetText = utilityFuncs().remove_non_ascii(removedSpacing)
|
||||||
|
|
||||||
|
print("Cleaned Tweet: ", tweetText)
|
||||||
|
sys.stdout.flush()
|
||||||
|
|
||||||
|
cleanedTweet = tweetText+' '+removedSpecialChars[1]
|
||||||
|
|
||||||
|
## Check with spam filter
|
||||||
|
# Send the tweet via HTTP request to the spam filter service queue
|
||||||
|
|
||||||
|
class queueListener():
|
||||||
|
## Check with spam filter
|
||||||
|
classification = self.tweetFilter.testTweet(cleanedTweet)
|
||||||
Loading…
x
Reference in New Issue
Block a user