Compare commits

..

No commits in common. "master" and "1.0.0-b12" have entirely different histories.

7 changed files with 55 additions and 277 deletions

View File

@ -119,11 +119,11 @@ spec:
imagePullPolicy: Always imagePullPolicy: Always
resources: resources:
requests: requests:
cpu: 100m cpu: 32m
memory: 64Mi memory: 32Mi
limits: limits:
cpu: 500m cpu: 75m
memory: 256Mi memory: 64Mi
securityContext: securityContext:
capabilities: capabilities:
add: add:

View File

@ -1,6 +1,6 @@
#!/usr/bin/env python #!/usr/bin/env python
import os, sys, json, uuid import os, sys, json
from datetime import datetime, timedelta from datetime import datetime, timedelta
from time import sleep, time from time import sleep, time
@ -17,14 +17,8 @@ from src.utils.spamFilter import callSpamFilter
from src.utils.sentimentAnalyser import callSentimentAnalyser from src.utils.sentimentAnalyser import callSentimentAnalyser
from src.utils.activemqConnect import activeMQSender from src.utils.activemqConnect import activeMQSender
from src.utils.jsonLogger import log from src.utils.jsonLogger import log
from src.utils.whitelistedWords import filterOutTweetsWithNoneWhitelistedWords
from http.client import IncompleteRead
from urllib3.exceptions import ProtocolError
hourStack = [] hourStack = []
dumpStack = []
processStack = []
class keys(): class keys():
@ -34,20 +28,20 @@ class keys():
self.access_token = os.getenv("ACCESS_TOKEN") self.access_token = os.getenv("ACCESS_TOKEN")
self.access_secret = os.getenv("ACCESS_SECRET") self.access_secret = os.getenv("ACCESS_SECRET")
def sendToArtemis(syncId, pos, neu, neg, compound, type): def sendToArtemis(pos, neu, neg, compound):
timestamp = datetime.now() + timedelta(hours=1) timestamp = datetime.now() + timedelta(hours=1)
strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0) strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S') timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type } message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound }
messageJson = json.dumps(message, indent = 4) messageJson = json.dumps(message, indent = 4)
log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO') log("Sending message to TweetSave queue", 'INFO')
log("Message: {}".format(message), 'INFO') log("Message: {}".format(message), 'INFO')
activeMQSender(messageJson, syncId) activeMQSender(messageJson)
class Streamer(): class Streamer():
@ -55,8 +49,7 @@ class Streamer():
pass pass
def stream_tweets(self, hashtag): def stream_tweets(self, hashtag):
listener = Listener(hashtag) listener = Listener()
auth = OAuthHandler(keys().api_key, keys().api_secret) auth = OAuthHandler(keys().api_key, keys().api_secret)
log("Authorising with twitter API...", 'INFO') log("Authorising with twitter API...", 'INFO')
@ -67,21 +60,12 @@ class Streamer():
wait_on_rate_limit_notify=True) wait_on_rate_limit_notify=True)
log("Streaming Tweets", 'INFO') log("Streaming Tweets", 'INFO')
while True: stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
try:
stream = Stream(auth, listener=listener, tweet_mode='extended')
stream.filter(languages=["en"], track=hashtag) stream.filter(languages=["en"], track=hashtag)
except IncompleteRead:
log("Incomplete Read Error", 'ERR')
continue
except ProtocolError:
log("Protocol Error", 'ERR')
continue
class Listener(StreamListener): class Listener(StreamListener):
def __init__(self, hashtag, time_limit=3000): def __init__(self, time_limit=3000):
self.hashtag = hashtag
self.start_time = time() self.start_time = time()
self.limit = time_limit self.limit = time_limit
@ -90,44 +74,26 @@ class Listener(StreamListener):
if (time() - self.start_time) < self.limit: if (time() - self.start_time) < self.limit:
data = json.loads(data) data = json.loads(data)
log(len(dumpStack), 'INFO')
# Check if tweet is a retweet # Check if tweet is a retweet
if 'retweeted_status' in data: if 'retweeted_status' in data:
if 'extended_tweet' in data['retweeted_status']: if 'extended_tweet' in data['retweeted_status']:
#if tweet is over the 140 word limit #if tweet is over the 140 word limit
text = data['retweeted_status']['extended_tweet']['full_text'] text = data['retweeted_status']['extended_tweet']['full_text']
text = filterOutTweetsWithNoneWhitelistedWords(text) self.processTweet(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
else: else:
text = data['retweeted_status']['text'] text = data['retweeted_status']['text']
text = filterOutTweetsWithNoneWhitelistedWords(text) self.processTweet(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
else: else:
# Else if a normal Tweet # Else if a normal Tweet
if 'extended_tweet' in data: if 'extended_tweet' in data:
# If tweet is over 140 word limit # If tweet is over 140 word limit
text = data['extended_tweet']['full_text'] text = data['extended_tweet']['full_text']
text = filterOutTweetsWithNoneWhitelistedWords(text) self.processTweet(text)
dumpStack.append({'type': self.hashtag, 'tweet': text})
def processTweet(syncId):
log(len(dumpStack), 'INFO')
processStack = dumpStack.copy()
dumpStack.clear()
# log("Processing [{}] Tweet...".format(text), 'INFO')
if len(processStack) != 0:
for tweet in processStack:
removedLines = fixLines(tweet["tweet"])
def processTweet(self, text):
removedLines = fixLines(text)
removedSpecialChars = cleanTweet(removedLines) removedSpecialChars = cleanTweet(removedLines)
removedSpacing = removeSpacing(removedSpecialChars[0]) removedSpacing = removeSpacing(removedSpecialChars[0])
tweetLength = checkLength(removedSpacing) tweetLength = checkLength(removedSpacing)
if tweetLength == True: if tweetLength == True:
@ -141,17 +107,13 @@ def processTweet(syncId):
cleanedTweet = tweetText + ' ' + removedSpecialChars[1] cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
if callSpamFilter(cleanedTweet, syncId) != 'spam': if callSpamFilter(cleanedTweet) != 'spam':
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId) hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound}
if compound != 0.0 and neu <= 0.6:
hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
hourStack.append(hourTweet) hourStack.append(hourTweet)
processStack.clear()
else:
log("Dump Stack was Empty", 'WARN')
def collector(hashtag): def collector(hashtag):
@ -174,57 +136,36 @@ def timeFunction():
return timeF return timeF
def createHourJob(): def createHourJob():
log("Creating hour job...", 'INFO')
schedule.clear("sendToArtemis") schedule.clear("sendToArtemis")
ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0 ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0
type = ""
global timeF global timeF
timeF = timeFunction() timeF = timeFunction()
syncId = uuid.uuid4() if len(hourStack) != 0:
for item in hourStack:
processTweet(syncId)
processStack = hourStack.copy()
hourStack.clear()
log("Extracting sentiment scores...", 'INFO')
if len(processStack) != 0:
log("Process stack size is :: [{}]".format(len(processStack)), 'INFO')
for item in processStack:
ovPos = ovPos + item['pos'] ovPos = ovPos + item['pos']
ovNeu = ovNeu + item['neu'] ovNeu = ovNeu + item['neu']
ovNeg = ovNeg + item['neg'] ovNeg = ovNeg + item['neg']
ovCompound = ovCompound + item['compound'] ovCompound = ovCompound + item['compound']
type = item["type"]
pos = round(ovPos/len(processStack), 3) pos = round(ovPos/len(hourStack), 3)
neu = round(ovNeu/len(processStack), 3) neu = round(ovNeu/len(hourStack), 3)
neg = round(ovNeg/len(processStack), 3) neg = round(ovNeg/len(hourStack), 3)
compound = round(ovCompound/len(processStack), 3) compound = round(ovCompound/len(hourStack), 3)
if type == "bitcoin": hourStack.clear()
type = 'btc_usd'
processStack.clear() sendToArtemis(pos, neu, neg, compound)
sendToArtemis(syncId, pos, neu, neg, compound, type)
else:
log("Stack is empty", 'WARN')
schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis") schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")
log("Collection will run again at {} every hour".format(timeF), 'INFO')
def collectorMain(hashtag): def collectorMain(hashtag):
log("Starting Tweet Collector", 'INFO') log("Starting Tweet Collector", 'INFO')
for i in range(len(hashtag)): for i in range(len(hashtag)):
Thread(target=collector, args=[hashtag[i]]).start() Thread(target=collector, args=[hashtag[i]]).start()
sleep(2)
createHourJob() createHourJob()
while True: while True:

View File

@ -17,20 +17,14 @@ class keys():
def returnKeys(self): def returnKeys(self):
return self.addr, self.port, self.amqU, self.amqP return self.addr, self.port, self.amqU, self.amqP
def activeMQSender(message, syncId): def activeMQSender(message):
addr, port, mqUser, mqPass = keys().returnKeys() addr, port, mqUser, mqPass = keys().returnKeys()
log("Attempting Connection to Artemis...", 'INFO') log("Attempting Connection to Artemis...", 'INFO')
con = stomp.Connection([(addr, port)], auto_content_length=False) con = stomp.Connection([(addr, port)], auto_content_length=False)
con.connect( mqUser, mqPass, wait=True) con.connect( mqUser, mqPass, wait=True)
con.send("TweetSave", con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
message,
content_type="application/json",
headers={
"Content-Type":"application/json",
"X-CRYPTO-Sync-ID":syncId
})
con.disconnect() con.disconnect()

View File

@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
logHandler.setFormatter(formatter) logHandler.setFormatter(formatter)
logger.addHandler(logHandler) logger.addHandler(logHandler)
def log(message, level, syncId=""): def log(message, level):
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
if level == 'INFO': if level == 'INFO':
logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.info(message)
elif level == 'WARN': elif level == 'WARN':
logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.warn(message)
elif level == 'ERR': elif level == 'ERR':
logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.error(message)
elif level == 'DEBUG': elif level == 'DEBUG':
logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId}) logger.debug(message)

View File

@ -9,20 +9,18 @@ class keys():
def __init__(self): def __init__(self):
self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL") self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
def callSentimentAnalyser(tweet, syncId): def callSentimentAnalyser(tweet):
headers = { log("Calling Sentiment Analyser", 'INFO')
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try: try:
uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
response = requests.request("GET", url=uri, headers=headers) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
print(response["result"]["Score"])
scores = response["result"]["Score"] scores = response["result"]["Score"]
return scores["pos"], scores["neu"], scores["neg"], scores["compound"] return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
except: except:
log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId) log("Could not call Sentiment Analyser Service", 'ERR')
return 0, 0, 0, 0 return 0, 0, 0, 0

View File

@ -9,18 +9,15 @@ class keys():
def __init__(self): def __init__(self):
self.spamFilter_uri = os.getenv("FILTER_URL") self.spamFilter_uri = os.getenv("FILTER_URL")
def callSpamFilter(tweet, syncId): def callSpamFilter(tweet):
headers = {
"content-type":"text",
"X-CRYPTO-Sync-ID" : str(syncId)
}
try: try:
uri = keys().spamFilter_uri + "/predict?tweet="+tweet uri = keys().spamFilter_uri + "/predict?tweet="+tweet
response = requests.request("GET", url=uri, headers=headers) response = requests.request("GET", uri)
response = json.loads(response.text) response = json.loads(response.text)
print(response["result"])
return response["result"] return response["result"]
except: except:
log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId) log("Could not call spam filter service", 'ERR')
return "" return ""

View File

@ -1,152 +0,0 @@
#!/usr/bin/env python
# Keywords a tweet must contain (as a substring) to be treated as
# crypto-related. Checked by filterOutTweetsWithNoneWhitelistedWords.
#
# Fixes vs. the original list:
# - "novice" was missing its trailing comma, so Python's implicit
#   string-literal concatenation silently merged it with the next entry
#   into "novicepassed", dropping both keywords from the whitelist.
# - duplicate entries ("stake", "invest", "fiat") removed; membership
#   behavior is unchanged, only redundancy is gone.
whitelist = [
    "bull",
    "bear",
    "bullish",
    "bearish",
    "up",
    "down",
    "high",
    "low",
    "higher",
    "lower",
    "absconded",
    "maximalists",
    "regulate",
    "infamous",
    "tradehigher",
    "tradelower",
    "revival",
    "centralized",
    "decentralized",
    "centralised",
    "decentralised",
    "decentralization",
    "decentralisation",
    "centralization",
    "centralisation",
    "bans",
    "hodl",
    "ambiguity",
    "revolutionize",
    "revolutionise",
    "consolidation",
    "shorts",
    "longs",
    "long",
    "short",
    "shorting",
    "grow",
    "volatile",
    "rally",
    "rallying",
    "noob",
    "noobs",
    "innovation",
    "bottom",
    "top",
    "topped",
    "bottomed",
    "upwards",
    "downwards",
    "invest",
    "raging",
    "rocketing",
    "swing",
    "swinging",
    "stake",
    "whale",
    "whales",
    "lull",
    "moon",
    "choppy",
    "buy",
    "buying",
    "sell",
    "selling",
    "startselling",
    "stopselling",
    "startbuying",
    "stopbuying",
    "bitcoin",
    "btc",
    "eth",
    "xmr",
    "xrp",
    "ripple",
    "block",
    "reward",
    "airdrop",
    "drop",
    "raise",
    "stack",
    "pull",
    "push",
    "token",
    "sale",
    "unhappy",
    "happy",
    "expert",
    "novice",
    "passed",
    "mark",
    "decline",
    "incline",
    "fees",
    "crypto",
    "wallet",
    "price",
    "history",
    "reached",
    "upward",
    "downward",
    "trading",
    "mining",
    "defi",
    "finance",
    "blockchain",
    "interest",
    "alt",
    "alts",
    "fiat",
    "currency",
    "currencies",
    "wealth",
    "hype",
    "hyped",
    "achievement",
    "platform",
    "incremental",
    "increment",
    "decrement",
    "decremental",
    "success",
    "loss",
    "win",
    "lose",
    "worth",
    "strongest",
    "weakest",
    "strong",
    "weak",
    "trade",
    "popping",
    "sucking",
    "shard",
    "sharding",
    "industry",
    "powerful",
    "better",
    "worse",
]
def filterOutTweetsWithNoneWhitelistedWords(text):
    """Return *text* unchanged if it mentions any whitelisted keyword, else "".

    NOTE(review): matching is plain substring containment, so a keyword
    like "up" also matches inside unrelated words (e.g. "supper") —
    confirm this loose matching is intended.
    """
    for keyword in whitelist:
        if keyword in text:
            return text
    # log("Tweet [{}] did not contain any keywords for it to be considered crypto related".format(text), 'WARN')
    return ""