Compare commits

...

57 Commits

Author SHA1 Message Date
andrewso
b3f8ba4cec [15.10.20] Testing 2020-10-15 20:19:39 +01:00
andrewso
da43817926 [15.10.20] Testing 2020-10-15 20:15:55 +01:00
andrewso
17bd1399cc [15.10.20] Testing 2020-10-15 18:10:57 +01:00
andrewso
7325a509bd [15.10.20] Testing 2020-10-15 17:19:44 +01:00
andrewso
affd25379a [15.10.20] Testing 2020-10-15 14:01:06 +01:00
andrewso
0106ec9044 [15.10.20] Testing 2020-10-15 11:31:50 +01:00
andrewso
5c832a70fe [15.10.20] Testing 2020-10-15 11:20:50 +01:00
andrewso
3c25310d76 [14.10.20] Testing 2020-10-14 18:51:36 +01:00
andrewso
6626c0864b [14.10.20] Testing 2020-10-14 18:11:10 +01:00
andrewso
1d459420de [14.10.20] Testing 2020-10-14 13:17:16 +01:00
andrewso
fba738d690 [14.10.20] Testing 2020-10-14 12:04:08 +01:00
andrewso
73191de94f [14.10.20] Testing - Limits 2020-10-14 12:00:30 +01:00
andrewso
8e15f8c9f8 [14.10.20] Testing 2020-10-14 09:48:40 +01:00
andrewso
ab669a1ed7 [13.10.20] Testing 2020-10-13 09:36:20 +01:00
andrewso
9baf56f267 [13.10.20] Testing 2020-10-13 09:31:39 +01:00
andrewso
50e2ef40fd [12.10.20] Testing 2020-10-12 18:05:18 +01:00
andrewso
fe90e08191 [12.10.20] Testing 2020-10-12 17:58:59 +01:00
andrewso
f11b94df52 [12.10.20] Testing 2020-10-12 17:48:57 +01:00
andrewso
ab1288351c [12.10.20] Testing 2020-10-12 17:41:15 +01:00
andrewso
e2df32e5d6 [12.10.20] Testing 2020-10-12 17:30:37 +01:00
andrewso
dea841345c [12.10.20] Testing 2020-10-12 17:27:48 +01:00
andrewso
73c4a171ab [12.10.20] Testing 2020-10-12 17:26:06 +01:00
andrewso
f9cfa0b83c [12.10.20] Testing 2020-10-12 17:17:27 +01:00
andrewso
3a008ea088 [12.10.20] Testing - not allowing over 80% Neutral Tweets to be passed, and removed some logging noise 2020-10-12 16:51:05 +01:00
andrewso
c4b57a7e0f [12.10.20] Testing 2020-10-12 15:38:21 +01:00
andrewso
b7bdeb22f9 [12.10.20] Testing 2020-10-12 15:34:07 +01:00
andrewso
060695c809 [12.10.20] Testing 2020-10-12 15:33:23 +01:00
andrewso
c095aa1560 [12.10.20] Testing 2020-10-12 13:07:04 +01:00
andrewso
477cab8f09 [12.10.20] Testing 2020-10-12 11:51:37 +01:00
andrewso
167932e390 [11.10.20] Testing 2020-10-11 17:59:52 +01:00
andrewso
0b52e13a67 [11.10.20] Removal of Testing 2020-10-11 14:03:45 +01:00
andrewso
700eaaa73e [10.10.20] Testing 2020-10-10 23:08:15 +01:00
andrewso
335fc5e8c6 [10.10.20] Testing 2020-10-10 23:05:39 +01:00
andrewso
fe991b71a6 [10.10.20] Testing 2020-10-10 23:01:44 +01:00
andrewso
8542ebd0ae [10.10.20] Testing 2020-10-10 22:55:45 +01:00
andrewso
0c41b72602 [10.10.20] Testing 2020-10-10 22:49:26 +01:00
andrewso
044fd62dcf [10.10.20] Testing 2020-10-10 22:43:33 +01:00
andrewso
18e5f50d61 [10.10.20] Testing 2020-10-10 22:38:00 +01:00
andrewso
2eb41af132 [10.10.20] Testing 2020-10-10 22:34:37 +01:00
andrewso
8ff0f53887 [10.10.20] Testing 2020-10-10 22:31:23 +01:00
andrewso
5882204833 [10.10.20] Testing 2020-10-10 22:15:57 +01:00
andrewso
2a06d431b2 [10.10.20] Testing 2020-10-10 22:12:44 +01:00
andrewso
886470d17e [10.10.20] Type and resource increase 2020-10-10 19:59:08 +01:00
andrewso
a65be3b9be [10.10.20] clearing stacks 2020-10-10 11:00:31 +01:00
06c95da108 [09.10.2020] Handling of incomplete read when API gets overloaded 2020-10-09 22:57:05 +01:00
andrewso
0903b69b8e [09.10.20] Filtering out neutral tweets 2020-10-09 20:11:04 +01:00
andrewso
e19e1eac46 [09.10.20] Connection testing 2020-10-09 19:52:57 +01:00
andrewso
651bea1cd4 [09.10.20] Connection testing 2020-10-09 19:49:13 +01:00
andrewso
139619bd79 [09.10.20] Connection testing 2020-10-09 19:44:47 +01:00
andrewso
8076b6420a [09.10.20] Connection testing 2020-10-09 19:34:11 +01:00
andrewso
70a46842ed [09.10.20] Connection testing 2020-10-09 19:30:34 +01:00
andrewso
f934e6acbe [09.10.20] Connection testing 2020-10-09 19:22:05 +01:00
andrewso
73003deb68 [09.10.20] flask package 2020-10-09 19:13:04 +01:00
andrewso
ec2aceda5a [09.10.20] Emoji package 2020-10-09 18:56:53 +01:00
andrewso
cadb3a9278 [09.10.20] NLTK package itself 2020-10-09 18:26:56 +01:00
andrewso
6bdb4f118d [09.10.20] NLTK packages path 2020-10-09 18:25:00 +01:00
andrewso
8977ba6332 [09.10.20] NLTK packages to dockerfile 2020-10-09 18:21:54 +01:00
9 changed files with 291 additions and 54 deletions

Dockerfile
View File

@@ -4,7 +4,9 @@ RUN apk update && \
apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \
pip install python-dotenv schedule tweepy stomp.py python-json-logger
pip install python-dotenv schedule tweepy stomp.py python-json-logger nltk emoji flask && \
rm -rf /var/lib/apt/lists/*
COPY . /home/tweet-collector/.
RUN python3 /home/tweet-collector/configuration/scripts/nltk_package_downloads.py
EXPOSE 9090
CMD ["python", "/home/tweet-collector/src/main.py"]

View File

@@ -119,11 +119,11 @@ spec:
imagePullPolicy: Always
resources:
requests:
cpu: 32m
memory: 32Mi
limits:
cpu: 75m
cpu: 100m
memory: 64Mi
limits:
cpu: 500m
memory: 256Mi
securityContext:
capabilities:
add:

configuration/scripts/nltk_package_downloads.py
View File

@@ -0,0 +1,7 @@
#!/usr/bin/env python
import nltk
if __name__ == '__main__':
    nltk.download('wordpunct_tokenize')
    nltk.download('stopwords')
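One note on this new build-time script: 'wordpunct_tokenize' is the name of a tokenizer function in nltk.tokenize, not a downloadable data package, so nltk.download('wordpunct_tokenize') only prints an error and returns False (the image build still succeeds because the script exits normally). wordpunct_tokenize itself is regex based and needs no data; if model-backed tokenization is wanted, 'punkt' is the usual identifier. A hedged sketch of what the downloader presumably intends, assuming 'punkt' is the tokenizer data actually needed:

    #!/usr/bin/env python
    # Sketch only: 'punkt' is an assumption about the intended tokenizer data;
    # 'stopwords' matches the committed script.
    import nltk

    if __name__ == '__main__':
        nltk.download('punkt')       # tokenizer models used by nltk.word_tokenize
        nltk.download('stopwords')   # stop-word lists
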

src/main.py
View File

@@ -1,6 +1,6 @@
#!/usr/bin/env python
import os, sys, json
import os, sys, json, uuid
from datetime import datetime, timedelta
from time import sleep, time
@@ -17,8 +17,14 @@ from src.utils.spamFilter import callSpamFilter
from src.utils.sentimentAnalyser import callSentimentAnalyser
from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log
from src.utils.whitelistedWords import filterOutTweetsWithNoneWhitelistedWords
from http.client import IncompleteRead
from urllib3.exceptions import ProtocolError
hourStack = []
dumpStack = []
processStack = []
class keys():
@@ -28,20 +34,20 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(pos, neu, neg, compound):
def sendToArtemis(syncId, pos, neu, neg, compound, type):
timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound }
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
messageJson = json.dumps(message, indent = 4)
log("Sending message to TweetSave queue", 'INFO')
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson)
activeMQSender(messageJson, syncId)
class Streamer():
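The change above threads a per-batch syncId through the message payload and, further down, through the broker and HTTP headers, so one hourly batch can be traced across services. A small illustrative sketch of the TweetSave payload shape built by the new sendToArtemis; the field names come from the diff, while the scores and type are made-up example values:

    import json
    import uuid
    from datetime import datetime, timedelta

    syncId = uuid.uuid4()
    timestamp = (datetime.now() + timedelta(hours=1)).replace(
        minute=0, second=0, microsecond=0).strftime('%Y-%m-%dT%H:%M:%S')
    message = {
        "timestamp": timestamp,
        "syncId": str(syncId),
        "pos": 0.42, "neu": 0.37, "neg": 0.21, "compound": 0.18,  # example scores
        "type": "btc_usd",                                        # example type value
    }
    print(json.dumps(message, indent=4))
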
@@ -49,7 +55,8 @@ class Streamer():
pass
def stream_tweets(self, hashtag):
listener = Listener()
listener = Listener(hashtag)
auth = OAuthHandler(keys().api_key, keys().api_secret)
log("Authorising with twitter API...", 'INFO')
@@ -60,12 +67,21 @@ class Streamer():
wait_on_rate_limit_notify=True)
log("Streaming Tweets", 'INFO')
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
while True:
try:
stream = Stream(auth, listener=listener, tweet_mode='extended')
stream.filter(languages=["en"], track=hashtag)
except IncompleteRead:
log("Incomplete Read Error", 'ERR')
continue
except ProtocolError:
log("Protocol Error", 'ERR')
continue
class Listener(StreamListener):
def __init__(self, time_limit=3000):
def __init__(self, hashtag, time_limit=3000):
self.hashtag = hashtag
self.start_time = time()
self.limit = time_limit
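These lines wrap the tweepy stream in a retry loop so that an http.client.IncompleteRead or urllib3 ProtocolError, which surface when the streaming API is overloaded or the client falls behind, restart the stream instead of killing the collector (see commit 06c95da108 in the list above). A minimal standalone sketch of the same pattern; stream_once is a hypothetical stand-in for building the Stream and calling stream.filter:

    from http.client import IncompleteRead
    from urllib3.exceptions import ProtocolError

    def run_forever(stream_once, log=print):
        while True:
            try:
                stream_once()  # blocks until the streaming connection drops or errors
            except IncompleteRead:
                log("Incomplete read from streaming API, reconnecting")
                continue
            except ProtocolError:
                log("Protocol error from streaming API, reconnecting")
                continue

A short sleep before reconnecting would also avoid hammering the API while it is overloaded.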
@@ -74,26 +90,44 @@ class Listener(StreamListener):
if (time() - self.start_time) < self.limit:
data = json.loads(data)
log(len(dumpStack), 'INFO')
# Check if tweet is a retweet
if 'retweeted_status' in data:
if 'extended_tweet' in data['retweeted_status']:
#if tweet is over the 140 word limit
text = data['retweeted_status']['extended_tweet']['full_text']
self.processTweet(text)
text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
else:
text = data['retweeted_status']['text']
self.processTweet(text)
text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
else:
# Else if a normal Tweeet
if 'extended_tweet' in data:
# If tweet is over 140 word limit
text = data['extended_tweet']['full_text']
self.processTweet(text)
text = filterOutTweetsWithNoneWhitelistedWords(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet(syncId):
log(len(dumpStack), 'INFO')
processStack = dumpStack.copy()
dumpStack.clear()
# log("Processing [{}] Tweet...".format(text), 'INFO')
if len(processStack) != 0:
for tweet in processStack:
removedLines = fixLines(tweet["tweet"])
def processTweet(self, text):
removedLines = fixLines(text)
removedSpecialChars = cleanTweet(removedLines)
removedSpacing = removeSpacing(removedSpecialChars[0])
tweetLength = checkLength(removedSpacing)
if tweetLength == True:
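With this hunk, on_data no longer scores tweets inline: it runs the whitelist filter and appends the text to dumpStack, and the new processTweet(syncId) later snapshots that stack and works through the batch. A self-contained sketch of the copy-then-clear hand-off; the names mirror the diff and score is a stand-in for the real cleaning and scoring pipeline. Items appended between copy() and clear() would be lost, so swapping in a fresh list, or guarding the two calls with a lock, is the safer variant when the listener runs in another thread:

    dumpStack = []

    def drain(score):
        processStack = dumpStack.copy()
        dumpStack.clear()
        for item in processStack:
            score(item["tweet"], item["type"])

    dumpStack.append({"type": "bitcoin", "tweet": "btc trading higher today"})
    drain(lambda text, kind: print(kind, "->", text))
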
@@ -107,13 +141,17 @@ class Listener(StreamListener):
cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
if callSpamFilter(cleanedTweet) != 'spam':
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
if callSpamFilter(cleanedTweet, syncId) != 'spam':
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound}
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
if compound != 0.0 and neu <= 0.6:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
hourStack.append(hourTweet)
processStack.clear()
else:
log("Dump Stack was Empty", 'WARN')
def collector(hashtag):
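The hunk above also adds the neutral-tweet gate: after the spam filter, a tweet's scores are kept only when compound != 0.0 and neu <= 0.6, so tweets the analyser considers essentially neutral never reach the hourly stack (commit 3a008ea088 describes an 80% neutral cut-off; the final state of this branch uses 0.6). A tiny sketch of that gate in isolation:

    def keep_scores(pos, neu, neg, compound, max_neutral=0.6):
        # Drop tweets the sentiment analyser scores as essentially neutral.
        return compound != 0.0 and neu <= max_neutral

    print(keep_scores(0.05, 0.92, 0.03, 0.0))   # False: no usable signal
    print(keep_scores(0.40, 0.35, 0.25, 0.30))  # True
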
@@ -136,36 +174,57 @@ def timeFunction():
return timeF
def createHourJob():
log("Creating hour job...", 'INFO')
schedule.clear("sendToArtemis")
ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0
type = ""
global timeF
timeF = timeFunction()
if len(hourStack) != 0:
for item in hourStack:
syncId = uuid.uuid4()
processTweet(syncId)
processStack = hourStack.copy()
hourStack.clear()
log("Extracting sentiment scores...", 'INFO')
if len(processStack) != 0:
log("Process stack size is :: [{}]".format(len(processStack)), 'INFO')
for item in processStack:
ovPos = ovPos + item['pos']
ovNeu = ovNeu + item['neu']
ovNeg = ovNeg + item['neg']
ovCompound = ovCompound + item['compound']
type = item["type"]
pos = round(ovPos/len(hourStack), 3)
neu = round(ovNeu/len(hourStack), 3)
neg = round(ovNeg/len(hourStack), 3)
compound = round(ovCompound/len(hourStack), 3)
pos = round(ovPos/len(processStack), 3)
neu = round(ovNeu/len(processStack), 3)
neg = round(ovNeg/len(processStack), 3)
compound = round(ovCompound/len(processStack), 3)
hourStack.clear()
if type == "bitcoin":
type = 'btc_usd'
sendToArtemis(pos, neu, neg, compound)
processStack.clear()
sendToArtemis(syncId, pos, neu, neg, compound, type)
else:
log("Stack is empty", 'WARN')
schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")
log("Collection will run again at {} every hour".format(timeF), 'INFO')
def collectorMain(hashtag):
log("Starting Tweet Collector", 'INFO')
for i in range(len(hashtag)):
Thread(target=collector, args=[hashtag[i]]).start()
sleep(2)
createHourJob()
while True:
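createHourJob now drains hourStack into a working copy, averages the per-tweet scores, maps the "bitcoin" type onto "btc_usd", and re-arms itself with schedule for the next hour. A hedged sketch of just the averaging step, using the same round-to-three-decimals behaviour as the diff and illustrative input data:

    def average_scores(items):
        # Average pos/neu/neg/compound over one hour of scored tweets.
        n = len(items)
        if n == 0:
            return None
        totals = {"pos": 0.0, "neu": 0.0, "neg": 0.0, "compound": 0.0}
        for item in items:
            for key in totals:
                totals[key] += item[key]
        return {key: round(value / n, 3) for key, value in totals.items()}

    hour = [
        {"pos": 0.4, "neu": 0.3, "neg": 0.3, "compound": 0.25, "type": "bitcoin"},
        {"pos": 0.1, "neu": 0.5, "neg": 0.4, "compound": -0.2, "type": "bitcoin"},
    ]
    print(average_scores(hour))  # {'pos': 0.25, 'neu': 0.4, 'neg': 0.35, 'compound': 0.025}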

src/utils/activemqConnect.py
View File

@@ -17,14 +17,20 @@ class keys():
def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message):
def activeMQSender(message, syncId):
addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True)
con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
con.send("TweetSave",
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect()
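This is the broker side of the same tracing change: the syncId travels as an X-CRYPTO-Sync-ID STOMP header on the TweetSave send. A hedged sketch of an equivalent standalone sender; host, port, and credentials are placeholders, while the queue and header names come from the diff. Stringifying the uuid before putting it in the header keeps the frame value unambiguous:

    import json
    import uuid
    import stomp

    def send_with_sync_id(message, sync_id,
                          host="localhost", port=61613,
                          user="artemis", password="artemis"):
        con = stomp.Connection([(host, port)], auto_content_length=False)
        con.connect(user, password, wait=True)
        try:
            con.send("TweetSave", json.dumps(message),
                     content_type="application/json",
                     headers={"Content-Type": "application/json",
                              "X-CRYPTO-Sync-ID": str(sync_id)})
        finally:
            con.disconnect()

    # send_with_sync_id({"pos": 0.25, "neu": 0.4, "neg": 0.35, "compound": 0.025,
    #                    "type": "btc_usd"}, uuid.uuid4())  # requires a reachable broker

Opening and closing a connection per message, as the committed helper does, costs an extra broker round trip, but at one message per hour that is negligible.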

src/utils/jsonLogger.py
View File

@@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter)
logger.addHandler(logHandler)
def log(message, level):
def log(message, level, syncId=""):
logger = logging.getLogger(__name__)
if level == 'INFO':
logger.info(message)
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'WARN':
logger.warn(message)
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'ERR':
logger.error(message)
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
elif level == 'DEBUG':
logger.debug(message)
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
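Passing syncId through logging's extra= only attaches it as an attribute on the LogRecord; it shows up in the output because python-json-logger (installed in the Dockerfile above) merges non-standard record attributes into the emitted JSON. A small sketch of that behaviour, assuming setup_logging uses python-json-logger's JsonFormatter as the dependency suggests. (logger.warn, used above, is a deprecated alias for logger.warning.)

    import logging
    from pythonjsonlogger import jsonlogger

    handler = logging.StreamHandler()
    handler.setFormatter(jsonlogger.JsonFormatter('%(asctime)s %(levelname)s %(message)s'))
    logger = logging.getLogger("tweet-collector")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)

    logger.info("Sending message to TweetSave queue",
                extra={"X-CRYPTO-Sync-ID": "2f9a3c1e-example"})
    # -> {"asctime": "...", "levelname": "INFO",
    #     "message": "Sending message to TweetSave queue",
    #     "X-CRYPTO-Sync-ID": "2f9a3c1e-example"}
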

src/utils/sentimentAnalyser.py
View File

@@ -9,15 +9,20 @@ class keys():
def __init__(self):
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
def callSentimentAnalyser(tweet):
log("Calling Sentiment Analyser", 'INFO')
def callSentimentAnalyser(tweet, syncId):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try:
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
response = requests.request("GET", uri)
response = requests.request("GET", url=uri, headers=headers)
response = json.loads(response.text)
scores = response["result"]["Score"]
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
except:
log("Could not call Sentiment Analyser Service", 'ERR')
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId)
return 0, 0, 0, 0
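One caveat with the call above (and the identical pattern in the spam filter file that follows): the tweet text is concatenated straight into the query string, so spaces, '#', '&' and other reserved characters in a tweet can truncate or break the request. Letting requests encode the value is the usual fix; a hedged sketch, with the endpoint shape and response fields taken from the diff and a placeholder default URL:

    import os
    import requests

    def call_sentiment_analyser(tweet, sync_id):
        base = os.getenv("SENTIMENT_URL", "http://sentiment-analyser:5000")  # placeholder default
        headers = {"content-type": "text", "X-CRYPTO-Sync-ID": str(sync_id)}
        response = requests.get(base + "/sentiment",
                                params={"tweet": tweet},  # URL-encodes the tweet text
                                headers=headers, timeout=10)
        scores = response.json()["result"]["Score"]
        return scores["pos"], scores["neu"], scores["neg"], scores["compound"]

The bare except: in the committed version turns any failure into four zero scores; the syncId added to the error log at least keeps those cases traceable.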

src/utils/spamFilter.py
View File

@@ -9,12 +9,18 @@ class keys():
def __init__(self):
self.spamFilter_uri = os.getenv("FILTER_URL")
def callSpamFilter(tweet):
def callSpamFilter(tweet, syncId):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try:
uri = keys().spamFilter_uri + "/predict?tweet="+tweet
response = requests.request("GET", uri)
response = requests.request("GET", url=uri, headers=headers)
response = json.loads(response.text)
return response["result"]
except:
log("Could not call spam filter service", 'ERR')
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId)
return ""

src/utils/whitelistedWords.py
View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python
whitelist = [
"bull",
"bear",
"bullish",
"bearish",
"up",
"down",
"high",
"low",
"higher",
"lower",
"absconded",
"maximalists",
"regulate",
"infamous",
"tradehigher",
"tradelower",
"revival",
"centralized",
"decentralized",
"centralised",
"decentralised",
"decentralization",
"decentralisation",
"centralization",
"centralisation",
"bans",
"hodl",
"ambiguity",
"revolutionize",
"revolutionise",
"consolidation",
"shorts",
"longs",
"long",
"short",
"shorting",
"grow",
"volatile",
"rally",
"rallying",
"noob",
"noobs",
"innovation",
"bottom",
"top",
"topped",
"bottomed",
"upwards",
"downwards",
"invest",
"raging",
"rocketing",
"swing",
"swinging",
"stake",
"whale",
"whales",
"lull",
"moon",
"choppy",
"buy",
"buying",
"sell",
"selling",
"startselling",
"stopselling",
"startbuying",
"stopbuying",
"bitcoin",
"btc",
"eth",
"xmr",
"xrp",
"ripple",
"block",
"reward",
"airdrop",
"drop",
"raise",
"stack",
"stake",
"invest",
"pull",
"push",
"token",
"sale",
"unhappy",
"happy",
"expert",
"novice"
"passed",
"mark",
"decline",
"incline",
"fees",
"crypto",
"wallet",
"price",
"history",
"reached",
"upward",
"downward",
"trading",
"mining",
"defi",
"finance",
"blockchain",
"interest",
"alt",
"alts",
"fiat",
"fiat",
"currency",
"currencies",
"wealth",
"hype",
"hyped",
"achievement",
"platform",
"incremental",
"increment",
"decrement",
"decremental",
"success",
"loss",
"win",
"lose",
"worth",
"strongest",
"weakest",
"strong",
"weak",
"trade",
"popping",
"sucking",
"shard",
"sharding",
"industry",
"powerful",
"better",
"worse"
]
def filterOutTweetsWithNoneWhitelistedWords(text):
    if any(x in text for x in whitelist):
        return text
    else:
        # log("Tweet [{}] did not contain any keywords for it to be considered crypto related".format(text), 'WARN')
        return ""