Compare commits
18 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3f8ba4cec | ||
|
|
da43817926 | ||
|
|
17bd1399cc | ||
|
|
7325a509bd | ||
|
|
affd25379a | ||
|
|
0106ec9044 | ||
|
|
5c832a70fe | ||
|
|
3c25310d76 | ||
|
|
6626c0864b | ||
|
|
1d459420de | ||
|
|
fba738d690 | ||
|
|
73191de94f | ||
|
|
8e15f8c9f8 | ||
|
|
ab669a1ed7 | ||
|
|
9baf56f267 | ||
|
|
50e2ef40fd | ||
|
|
fe90e08191 | ||
|
|
f11b94df52 |
@ -119,10 +119,10 @@ spec:
|
||||
imagePullPolicy: Always
|
||||
resources:
|
||||
requests:
|
||||
cpu: 64m
|
||||
cpu: 100m
|
||||
memory: 64Mi
|
||||
limits:
|
||||
cpu: 150m
|
||||
cpu: 500m
|
||||
memory: 256Mi
|
||||
securityContext:
|
||||
capabilities:
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
import os, sys, json
|
||||
import os, sys, json, uuid
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from time import sleep, time
|
||||
@ -17,12 +17,14 @@ from src.utils.spamFilter import callSpamFilter
|
||||
from src.utils.sentimentAnalyser import callSentimentAnalyser
|
||||
from src.utils.activemqConnect import activeMQSender
|
||||
from src.utils.jsonLogger import log
|
||||
from src.utils.whitelistedWords import filterOutTweetsWithNoneWhitelistedWords
|
||||
|
||||
from http.client import IncompleteRead
|
||||
from urllib3.exceptions import ProtocolError
|
||||
|
||||
hourStack = []
|
||||
dumpStack = []
|
||||
processStack = []
|
||||
|
||||
class keys():
|
||||
|
||||
@ -32,20 +34,20 @@ class keys():
|
||||
self.access_token = os.getenv("ACCESS_TOKEN")
|
||||
self.access_secret = os.getenv("ACCESS_SECRET")
|
||||
|
||||
def sendToArtemis(pos, neu, neg, compound, type):
|
||||
def sendToArtemis(syncId, pos, neu, neg, compound, type):
|
||||
|
||||
timestamp = datetime.now() + timedelta(hours=1)
|
||||
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
|
||||
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
|
||||
|
||||
message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
|
||||
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
|
||||
|
||||
messageJson = json.dumps(message, indent = 4)
|
||||
|
||||
log("Sending message to TweetSave queue", 'INFO')
|
||||
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
|
||||
log("Message: {}".format(message), 'INFO')
|
||||
|
||||
activeMQSender(messageJson)
|
||||
activeMQSender(messageJson, syncId)
|
||||
|
||||
class Streamer():
|
||||
|
||||
@ -54,6 +56,7 @@ class Streamer():
|
||||
|
||||
def stream_tweets(self, hashtag):
|
||||
listener = Listener(hashtag)
|
||||
|
||||
auth = OAuthHandler(keys().api_key, keys().api_secret)
|
||||
|
||||
log("Authorising with twitter API...", 'INFO')
|
||||
@ -66,23 +69,14 @@ class Streamer():
|
||||
|
||||
while True:
|
||||
try:
|
||||
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
|
||||
stream = Stream(auth, listener=listener, tweet_mode='extended')
|
||||
stream.filter(languages=["en"], track=hashtag)
|
||||
except IncompleteRead:
|
||||
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
|
||||
stream.filter(languages=["en"], track=hashtag)
|
||||
log("Incomplete Read Error", 'ERR')
|
||||
continue
|
||||
except ProtocolError:
|
||||
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
|
||||
stream.filter(languages=["en"], track=hashtag)
|
||||
log("Protocol Error", 'ERR')
|
||||
continue
|
||||
except:
|
||||
stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
|
||||
stream.filter(languages=["en"], track=hashtag)
|
||||
log("Some other Error", 'ERR')
|
||||
continue
|
||||
|
||||
class Listener(StreamListener):
|
||||
|
||||
@ -96,40 +90,44 @@ class Listener(StreamListener):
|
||||
if (time() - self.start_time) < self.limit:
|
||||
data = json.loads(data)
|
||||
|
||||
log(len(dumpStack), 'INFO')
|
||||
|
||||
# Check if tweet is a retweet
|
||||
if 'retweeted_status' in data:
|
||||
if 'extended_tweet' in data['retweeted_status']:
|
||||
# If the tweet is over the 140-character limit
|
||||
text = data['retweeted_status']['extended_tweet']['full_text']
|
||||
processTweet(text, self.hashtag)
|
||||
# dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||
text = filterOutTweetsWithNoneWhitelistedWords(text)
|
||||
dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||
else:
|
||||
text = data['retweeted_status']['text']
|
||||
processTweet(text, self.hashtag)
|
||||
# dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||
text = filterOutTweetsWithNoneWhitelistedWords(text)
|
||||
dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||
else:
|
||||
# Else, if it is a normal Tweet
|
||||
if 'extended_tweet' in data:
|
||||
# If the tweet is over the 140-character limit
|
||||
text = data['extended_tweet']['full_text']
|
||||
processTweet(text, self.hashtag)
|
||||
# dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||
else:
|
||||
processTweet(data["text"], self.hashtag)
|
||||
text = filterOutTweetsWithNoneWhitelistedWords(text)
|
||||
dumpStack.append({'type': self.hashtag, 'tweet': text})
|
||||
|
||||
def processTweet(text, type):
|
||||
def processTweet(syncId):
|
||||
|
||||
log("hourStack size :: [{}]".format(len(hourStack)), "INFO")
|
||||
# processStack = dumpStack.copy()
|
||||
# dumpStack.clear()
|
||||
log(len(dumpStack), 'INFO')
|
||||
|
||||
# log("Processing [{}] Tweets...".format(len(processStack)), 'INFO')
|
||||
processStack = dumpStack.copy()
|
||||
dumpStack.clear()
|
||||
|
||||
# log("Processing [{}] Tweet...".format(text), 'INFO')
|
||||
|
||||
if len(processStack) != 0:
|
||||
for tweet in processStack:
|
||||
|
||||
removedLines = fixLines(tweet["tweet"])
|
||||
|
||||
# if len(processStack) != 0:
|
||||
# for tweet in processStack:
|
||||
removedLines = fixLines(text)
|
||||
removedSpecialChars = cleanTweet(removedLines)
|
||||
removedSpacing = removeSpacing(removedSpecialChars[0])
|
||||
|
||||
tweetLength = checkLength(removedSpacing)
|
||||
if tweetLength == True:
|
||||
|
||||
@ -143,16 +141,17 @@ def processTweet(text, type):
|
||||
|
||||
cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
|
||||
|
||||
if callSpamFilter(cleanedTweet) != 'spam':
|
||||
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
|
||||
if callSpamFilter(cleanedTweet, syncId) != 'spam':
|
||||
|
||||
if compound != 0.0 | neu <= 0.8:
|
||||
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': type}
|
||||
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
|
||||
|
||||
if compound != 0.0 and neu <= 0.6:
|
||||
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
|
||||
|
||||
hourStack.append(hourTweet)
|
||||
# processStack.clear()
|
||||
# else:
|
||||
# log("Dump Stack was Empty", 'WARN')
|
||||
processStack.clear()
|
||||
else:
|
||||
log("Dump Stack was Empty", 'WARN')
|
||||
|
||||
def collector(hashtag):
|
||||
|
||||
@ -178,17 +177,23 @@ def createHourJob():
|
||||
log("Creating hour job...", 'INFO')
|
||||
schedule.clear("sendToArtemis")
|
||||
ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0
|
||||
type = ""
|
||||
|
||||
global timeF
|
||||
timeF = timeFunction()
|
||||
|
||||
syncId = uuid.uuid4()
|
||||
|
||||
processTweet(syncId)
|
||||
|
||||
processStack = hourStack.copy()
|
||||
hourStack.clear()
|
||||
|
||||
log("Extracting sentiment scores...", 'INFO')
|
||||
|
||||
if len(processStack) != 0:
|
||||
log("Process stack size is :: [{}]".format(len(processStack)), 'INFO')
|
||||
for item in hourStack:
|
||||
for item in processStack:
|
||||
ovPos = ovPos + item['pos']
|
||||
ovNeu = ovNeu + item['neu']
|
||||
ovNeg = ovNeg + item['neg']
|
||||
@ -203,9 +208,9 @@ def createHourJob():
|
||||
if type == "bitcoin":
|
||||
type = 'btc_usd'
|
||||
|
||||
hourStack.clear()
|
||||
processStack.clear()
|
||||
|
||||
sendToArtemis(pos, neu, neg, compound, type)
|
||||
sendToArtemis(syncId, pos, neu, neg, compound, type)
|
||||
else:
|
||||
log("Stack is empty", 'WARN')
|
||||
|
||||
@ -219,6 +224,7 @@ def collectorMain(hashtag):
|
||||
for i in range(len(hashtag)):
|
||||
Thread(target=collector, args=[hashtag[i]]).start()
|
||||
|
||||
sleep(2)
|
||||
createHourJob()
|
||||
|
||||
while True:
|
||||
|
||||
@ -17,14 +17,20 @@ class keys():
|
||||
def returnKeys(self):
|
||||
return self.addr, self.port, self.amqU, self.amqP
|
||||
|
||||
def activeMQSender(message):
|
||||
def activeMQSender(message, syncId):
|
||||
addr, port, mqUser, mqPass = keys().returnKeys()
|
||||
|
||||
log("Attempting Connection to Artemis...", 'INFO')
|
||||
con = stomp.Connection([(addr, port)], auto_content_length=False)
|
||||
con.connect( mqUser, mqPass, wait=True)
|
||||
|
||||
con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
|
||||
con.send("TweetSave",
|
||||
message,
|
||||
content_type="application/json",
|
||||
headers={
|
||||
"Content-Type":"application/json",
|
||||
"X-CRYPTO-Sync-ID":syncId
|
||||
})
|
||||
|
||||
con.disconnect()
|
||||
|
||||
|
||||
@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
|
||||
logHandler.setFormatter(formatter)
|
||||
logger.addHandler(logHandler)
|
||||
|
||||
def log(message, level):
|
||||
def log(message, level, syncId=""):
|
||||
logger = logging.getLogger(__name__)
|
||||
if level == 'INFO':
|
||||
logger.info(message)
|
||||
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||
elif level == 'WARN':
|
||||
logger.warn(message)
|
||||
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||
elif level == 'ERR':
|
||||
logger.error(message)
|
||||
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||
elif level == 'DEBUG':
|
||||
logger.debug(message)
|
||||
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
|
||||
@ -9,11 +9,14 @@ class keys():
|
||||
def __init__(self):
|
||||
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
|
||||
|
||||
def callSentimentAnalyser(tweet):
|
||||
# log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
|
||||
def callSentimentAnalyser(tweet, syncId):
|
||||
headers = {
|
||||
"content-type":"text",
|
||||
"X-CRYPTO-Sync-ID" : str(syncId)
|
||||
}
|
||||
try:
|
||||
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
|
||||
response = requests.request("GET", uri)
|
||||
response = requests.request("GET", url=uri, headers=headers)
|
||||
|
||||
response = json.loads(response.text)
|
||||
|
||||
@ -21,5 +24,5 @@ def callSentimentAnalyser(tweet):
|
||||
|
||||
return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
|
||||
except:
|
||||
log("Could not call Sentiment Analyser Service", 'ERR')
|
||||
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId)
|
||||
return 0, 0, 0, 0
|
||||
@ -9,16 +9,18 @@ class keys():
|
||||
def __init__(self):
|
||||
self.spamFilter_uri = os.getenv("FILTER_URL")
|
||||
|
||||
def callSpamFilter(tweet):
|
||||
def callSpamFilter(tweet, syncId):
|
||||
headers = {
|
||||
"content-type":"text",
|
||||
"X-CRYPTO-Sync-ID" : str(syncId)
|
||||
}
|
||||
try:
|
||||
uri = keys().spamFilter_uri + "/predict?tweet="+tweet
|
||||
response = requests.request("GET", uri)
|
||||
response = requests.request("GET", url=uri, headers=headers)
|
||||
|
||||
response = json.loads(response.text)
|
||||
|
||||
# log("Spam Filter result for [{}] is [{}]".format(tweet, response["result"]), 'INFO')
|
||||
|
||||
return response["result"]
|
||||
except:
|
||||
log("Could not call spam filter service", 'ERR')
|
||||
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId)
|
||||
return ""
|
||||
152
src/utils/whitelistedWords.py
Normal file
152
src/utils/whitelistedWords.py
Normal file
@ -0,0 +1,152 @@
|
||||
#!/usr/bin/env python
|
||||
|
||||
# Lowercase keywords that mark a tweet as crypto/market related.
# Every entry must be followed by a comma: adjacent string literals are
# silently concatenated by Python (a missing comma previously fused
# "novice" and "passed" into the single entry "novicepassed").
whitelist = [
    "bull",
    "bear",
    "bullish",
    "bearish",
    "up",
    "down",
    "high",
    "low",
    "higher",
    "lower",
    "absconded",
    "maximalists",
    "regulate",
    "infamous",
    "tradehigher",
    "tradelower",
    "revival",
    "centralized",
    "decentralized",
    "centralised",
    "decentralised",
    "decentralization",
    "decentralisation",
    "centralization",
    "centralisation",
    "bans",
    "hodl",
    "ambiguity",
    "revolutionize",
    "revolutionise",
    "consolidation",
    "shorts",
    "longs",
    "long",
    "short",
    "shorting",
    "grow",
    "volatile",
    "rally",
    "rallying",
    "noob",
    "noobs",
    "innovation",
    "bottom",
    "top",
    "topped",
    "bottomed",
    "upwards",
    "downwards",
    "invest",
    "raging",
    "rocketing",
    "swing",
    "swinging",
    "stake",
    "whale",
    "whales",
    "lull",
    "moon",
    "choppy",
    "buy",
    "buying",
    "sell",
    "selling",
    "startselling",
    "stopselling",
    "startbuying",
    "stopbuying",
    "bitcoin",
    "btc",
    "eth",
    "xmr",
    "xrp",
    "ripple",
    "block",
    "reward",
    "airdrop",
    "drop",
    "raise",
    "stack",
    "pull",
    "push",
    "token",
    "sale",
    "unhappy",
    "happy",
    "expert",
    "novice",
    "passed",
    "mark",
    "decline",
    "incline",
    "fees",
    "crypto",
    "wallet",
    "price",
    "history",
    "reached",
    "upward",
    "downward",
    "trading",
    "mining",
    "defi",
    "finance",
    "blockchain",
    "interest",
    "alt",
    "alts",
    "fiat",
    "currency",
    "currencies",
    "wealth",
    "hype",
    "hyped",
    "achievement",
    "platform",
    "incremental",
    "increment",
    "decrement",
    "decremental",
    "success",
    "loss",
    "win",
    "lose",
    "worth",
    "strongest",
    "weakest",
    "strong",
    "weak",
    "trade",
    "popping",
    "sucking",
    "shard",
    "sharding",
    "industry",
    "powerful",
    "better",
    "worse",
]


def filterOutTweetsWithNoneWhitelistedWords(text):
    """Return *text* if it mentions any whitelisted keyword, else ``""``.

    Matching is a case-insensitive substring test against ``whitelist``,
    so "BTC" and "Bitcoin" match the lowercase entries. The original text
    is returned unmodified when it matches.

    Args:
        text: The tweet text to check. ``None`` or an empty string is
            treated as "no keywords" rather than raising.

    Returns:
        The unchanged ``text`` when at least one keyword occurs in it,
        otherwise an empty string.
    """
    if not text:
        return ""

    # The whitelist is all lowercase, so fold the tweet once before testing.
    lowered = text.lower()
    if any(word in lowered for word in whitelist):
        return text
    return ""
|
||||
Loading…
x
Reference in New Issue
Block a user