Compare commits

..

22 Commits

Author    SHA1        Message                      Date
andrewso  b3f8ba4cec  [15.10.20] Testing           2020-10-15 20:19:39 +01:00
andrewso  da43817926  [15.10.20] Testing           2020-10-15 20:15:55 +01:00
andrewso  17bd1399cc  [15.10.20] Testing           2020-10-15 18:10:57 +01:00
andrewso  7325a509bd  [15.10.20] Testing           2020-10-15 17:19:44 +01:00
andrewso  affd25379a  [15.10.20] Testing           2020-10-15 14:01:06 +01:00
andrewso  0106ec9044  [15.10.20] Testing           2020-10-15 11:31:50 +01:00
andrewso  5c832a70fe  [15.10.20] Testing           2020-10-15 11:20:50 +01:00
andrewso  3c25310d76  [14.10.20] Testing           2020-10-14 18:51:36 +01:00
andrewso  6626c0864b  [14.10.20] Testing           2020-10-14 18:11:10 +01:00
andrewso  1d459420de  [14.10.20] Testing           2020-10-14 13:17:16 +01:00
andrewso  fba738d690  [14.10.20] Testing           2020-10-14 12:04:08 +01:00
andrewso  73191de94f  [14.10.20] Testing - Limits  2020-10-14 12:00:30 +01:00
andrewso  8e15f8c9f8  [14.10.20] Testing           2020-10-14 09:48:40 +01:00
andrewso  ab669a1ed7  [13.10.20] Testing           2020-10-13 09:36:20 +01:00
andrewso  9baf56f267  [13.10.20] Testing           2020-10-13 09:31:39 +01:00
andrewso  50e2ef40fd  [12.10.20] Testing           2020-10-12 18:05:18 +01:00
andrewso  fe90e08191  [12.10.20] Testing           2020-10-12 17:58:59 +01:00
andrewso  f11b94df52  [12.10.20] Testing           2020-10-12 17:48:57 +01:00
andrewso  ab1288351c  [12.10.20] Testing           2020-10-12 17:41:15 +01:00
andrewso  e2df32e5d6  [12.10.20] Testing           2020-10-12 17:30:37 +01:00
andrewso  dea841345c  [12.10.20] Testing           2020-10-12 17:27:48 +01:00
andrewso  73c4a171ab  [12.10.20] Testing           2020-10-12 17:26:06 +01:00
7 changed files with 238 additions and 62 deletions

View File

@@ -119,10 +119,10 @@ spec:
         imagePullPolicy: Always
         resources:
           requests:
-            cpu: 64m
+            cpu: 100m
             memory: 64Mi
           limits:
-            cpu: 150m
+            cpu: 500m
             memory: 256Mi
         securityContext:
           capabilities:

View File

@@ -1,6 +1,6 @@
 #!/usr/bin/env python
-import os, sys, json
+import os, sys, json, uuid
 from datetime import datetime, timedelta
 from time import sleep, time
@@ -17,12 +17,14 @@ from src.utils.spamFilter import callSpamFilter
 from src.utils.sentimentAnalyser import callSentimentAnalyser
 from src.utils.activemqConnect import activeMQSender
 from src.utils.jsonLogger import log
+from src.utils.whitelistedWords import filterOutTweetsWithNoneWhitelistedWords
 from http.client import IncompleteRead
 from urllib3.exceptions import ProtocolError
 
 hourStack = []
 dumpStack = []
+processStack = []
 
 class keys():
@@ -32,20 +34,20 @@ class keys():
         self.access_token = os.getenv("ACCESS_TOKEN")
         self.access_secret = os.getenv("ACCESS_SECRET")
 
-def sendToArtemis(pos, neu, neg, compound, type):
+def sendToArtemis(syncId, pos, neu, neg, compound, type):
     timestamp = datetime.now() + timedelta(hours=1)
     strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
     timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
-    message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
+    message = { "timestamp" : timestamp, "syncId": str(syncId), "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound, "type": type }
     messageJson = json.dumps(message, indent = 4)
-    log("Sending message to TweetSave queue", 'INFO')
+    log("Sending message to TweetSave queue for SyncId [{}]".format(syncId), 'INFO')
     log("Message: {}".format(message), 'INFO')
-    activeMQSender(messageJson)
+    activeMQSender(messageJson, syncId)
 
 class Streamer():
@@ -54,6 +56,7 @@ class Streamer():
     def stream_tweets(self, hashtag):
         listener = Listener(hashtag)
+
         auth = OAuthHandler(keys().api_key, keys().api_secret)
         log("Authorising with twitter API...", 'INFO')
@@ -66,17 +69,13 @@ class Streamer():
         while True:
             try:
-                stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
+                stream = Stream(auth, listener=listener, tweet_mode='extended')
                 stream.filter(languages=["en"], track=hashtag)
             except IncompleteRead:
                 log("Incomplete Read Error", 'ERR')
-                stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
-                stream.filter(languages=["en"], track=hashtag)
                 continue
             except ProtocolError:
                 log("Protocol Error", 'ERR')
-                stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
-                stream.filter(languages=["en"], track=hashtag)
                 continue
 
 class Listener(StreamListener):
@@ -91,61 +90,68 @@ class Listener(StreamListener):
         if (time() - self.start_time) < self.limit:
             data = json.loads(data)
+            log(len(dumpStack), 'INFO')
 
             # Check if tweet is a retweet
             if 'retweeted_status' in data:
                 if 'extended_tweet' in data['retweeted_status']:
                     #if tweet is over the 140 word limit
                     text = data['retweeted_status']['extended_tweet']['full_text']
-                    processTweet(text, self.hashtag)
-                    # dumpStack.append({'type': self.hashtag, 'tweet': text})
+                    text = filterOutTweetsWithNoneWhitelistedWords(text)
+                    dumpStack.append({'type': self.hashtag, 'tweet': text})
                 else:
                     text = data['retweeted_status']['text']
-                    processTweet(text, self.hashtag)
-                    # dumpStack.append({'type': self.hashtag, 'tweet': text})
+                    text = filterOutTweetsWithNoneWhitelistedWords(text)
+                    dumpStack.append({'type': self.hashtag, 'tweet': text})
             else:
                 # Else if a normal Tweeet
                 if 'extended_tweet' in data:
                     # If tweet is over 140 word limit
                     text = data['extended_tweet']['full_text']
-                    processTweet(text, self.hashtag)
-                    # dumpStack.append({'type': self.hashtag, 'tweet': text})
+                    text = filterOutTweetsWithNoneWhitelistedWords(text)
+                    dumpStack.append({'type': self.hashtag, 'tweet': text})
 
-def processTweet(text, type):
-    log("hourStack size :: [{}]".format(len(hourStack)), "INFO")
-    # processStack = dumpStack.copy()
-    # dumpStack.clear()
-    # log("Processing [{}] Tweets...".format(len(processStack)), 'INFO')
-    # if len(processStack) != 0:
-    #     for tweet in processStack:
-    removedLines = fixLines(text)
-    removedSpecialChars = cleanTweet(removedLines)
-    removedSpacing = removeSpacing(removedSpecialChars[0])
-    tweetLength = checkLength(removedSpacing)
-    if tweetLength == True:
-        checkIfEnglish = detectLaguage(removedSpecialChars[0])
-        if checkIfEnglish == True:
-            tweetText = remove_non_ascii(removedSpacing)
-            # log("Cleaned Tweet: {}".format(tweetText), 'INFO')
-            cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
-            if callSpamFilter(cleanedTweet) != 'spam':
-                pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
-                if compound != 0.0 | neu <= 0.8:
-                    hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': type}
-                    hourStack.append(hourTweet)
-    # processStack.clear()
-    # else:
-    #     log("Dump Stack was Empty", 'WARN')
+def processTweet(syncId):
+    log(len(dumpStack), 'INFO')
+    processStack = dumpStack.copy()
+    dumpStack.clear()
+    # log("Processing [{}] Tweet...".format(text), 'INFO')
+    if len(processStack) != 0:
+        for tweet in processStack:
+            removedLines = fixLines(tweet["tweet"])
+            removedSpecialChars = cleanTweet(removedLines)
+            removedSpacing = removeSpacing(removedSpecialChars[0])
+            tweetLength = checkLength(removedSpacing)
+            if tweetLength == True:
+                checkIfEnglish = detectLaguage(removedSpecialChars[0])
+                if checkIfEnglish == True:
+                    tweetText = remove_non_ascii(removedSpacing)
+                    # log("Cleaned Tweet: {}".format(tweetText), 'INFO')
+                    cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
+                    if callSpamFilter(cleanedTweet, syncId) != 'spam':
+                        pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet, syncId)
+                        if compound != 0.0 and neu <= 0.6:
+                            hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound, 'type': tweet["type"]}
+                            hourStack.append(hourTweet)
+        processStack.clear()
+    else:
+        log("Dump Stack was Empty", 'WARN')
 
 def collector(hashtag):
@@ -171,17 +177,23 @@ def createHourJob():
     log("Creating hour job...", 'INFO')
     schedule.clear("sendToArtemis")
     ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0
+    type = ""
     global timeF
     timeF = timeFunction()
+    syncId = uuid.uuid4()
+    processTweet(syncId)
     processStack = hourStack.copy()
+    hourStack.clear()
     log("Extracting sentiment scores...", 'INFO')
     if len(processStack) != 0:
         log("Process stack size is :: [{}]".format(len(processStack)), 'INFO')
-        for item in hourStack:
+        for item in processStack:
             ovPos = ovPos + item['pos']
             ovNeu = ovNeu + item['neu']
             ovNeg = ovNeg + item['neg']
@@ -196,9 +208,9 @@ def createHourJob():
         if type == "bitcoin":
             type = 'btc_usd'
-        hourStack.clear()
-        sendToArtemis(pos, neu, neg, compound, type)
+        processStack.clear()
+        sendToArtemis(syncId, pos, neu, neg, compound, type)
     else:
         log("Stack is empty", 'WARN')
@@ -212,6 +224,7 @@ def collectorMain(hashtag):
     for i in range(len(hashtag)):
         Thread(target=collector, args=[hashtag[i]]).start()
+        sleep(2)
     createHourJob()
     while True:
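
The reworked processTweet and createHourJob above share a copy-then-clear pattern: the listener threads append raw tweets to dumpStack, and once an hour the batch is snapshotted, the stack is emptied, and the whole batch is scored under a single uuid4 syncId. A minimal standalone sketch of that pattern follows; the names collect and batch_worker, the Lock, and the print call are illustrative and not part of the repository code.

# Minimal sketch of the copy-then-clear batching used by processTweet/createHourJob.
# collect() stands in for Listener.on_data; batch_worker() stands in for the hourly job.
# The Lock is an added assumption -- the project itself relies on plain list appends.
import uuid
from threading import Thread, Lock
from time import sleep

dumpStack = []   # producer threads append incoming items here
lock = Lock()

def collect(tag):
    # stand-in for the Tweepy listener appending filtered tweets
    for i in range(5):
        with lock:
            dumpStack.append({'type': tag, 'tweet': 'tweet {}'.format(i)})
        sleep(0.1)

def batch_worker():
    # snapshot the stack, then clear it, so producers can keep appending
    # while this batch is processed; tag the whole batch with one syncId
    syncId = uuid.uuid4()
    with lock:
        processStack = dumpStack.copy()
        dumpStack.clear()
    print("syncId [{}] -> processing {} tweets".format(syncId, len(processStack)))

if __name__ == "__main__":
    producer = Thread(target=collect, args=["bitcoin"])
    producer.start()
    sleep(0.3)
    batch_worker()
    producer.join()
    batch_worker()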

View File

@@ -17,14 +17,20 @@ class keys():
     def returnKeys(self):
         return self.addr, self.port, self.amqU, self.amqP
 
-def activeMQSender(message):
+def activeMQSender(message, syncId):
     addr, port, mqUser, mqPass = keys().returnKeys()
     log("Attempting Connection to Artemis...", 'INFO')
     con = stomp.Connection([(addr, port)], auto_content_length=False)
     con.connect( mqUser, mqPass, wait=True)
-    con.send("TweetSave", message, content_type="application/json", headers={"Content-Type":"application/json"})
+    con.send("TweetSave",
+             message,
+             content_type="application/json",
+             headers={
+                 "Content-Type":"application/json",
+                 "X-CRYPTO-Sync-ID":syncId
+             })
     con.disconnect()
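
With this change the sync ID rides on each TweetSave message as a custom STOMP header as well as inside the JSON body, so a consumer can correlate a message with the producer's log lines. Below is a hedged consumer sketch assuming stomp.py 8.x (where a listener receives a single frame object); the broker address, credentials, and listener name are illustrative, and only the TweetSave destination and the X-CRYPTO-Sync-ID header come from the diff.

# Hedged sketch: reading the X-CRYPTO-Sync-ID header on the consumer side (stomp.py 8.x assumed).
import time
import stomp

class TweetSaveListener(stomp.ConnectionListener):
    def on_message(self, frame):
        sync_id = frame.headers.get("X-CRYPTO-Sync-ID", "")
        print("TweetSave message for syncId [{}]: {}".format(sync_id, frame.body))

conn = stomp.Connection([("localhost", 61613)])     # illustrative broker address
conn.set_listener("", TweetSaveListener())
conn.connect("artemis", "artemis", wait=True)       # illustrative credentials
conn.subscribe(destination="TweetSave", id=1, ack="auto")
time.sleep(10)                                      # listen briefly, then disconnect
conn.disconnect()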

View File

@@ -28,13 +28,13 @@ def setup_logging(log_level='INFO'):
     logHandler.setFormatter(formatter)
     logger.addHandler(logHandler)
 
-def log(message, level):
+def log(message, level, syncId=""):
     logger = logging.getLogger(__name__)
     if level == 'INFO':
-        logger.info(message)
+        logger.info(message, extra={"X-CRYPTO-Sync-ID" : syncId})
     elif level == 'WARN':
-        logger.warn(message)
+        logger.warn(message, extra={"X-CRYPTO-Sync-ID" : syncId})
     elif level == 'ERR':
-        logger.error(message)
+        logger.error(message, extra={"X-CRYPTO-Sync-ID" : syncId})
     elif level == 'DEBUG':
-        logger.debug(message)
+        logger.debug(message, extra={"X-CRYPTO-Sync-ID" : syncId})
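
The new syncId parameter is passed through extra={...}, which places the key in the LogRecord's __dict__ so the JSON formatter configured in setup_logging can emit it. The project's actual formatter is not shown in this diff; the stdlib-only formatter below is an illustrative sketch of how the field surfaces in the output.

# Illustrative sketch: surfacing the extra={"X-CRYPTO-Sync-ID": ...} field in JSON output.
import json
import logging

class SyncIdJsonFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "timestamp": self.formatTime(record),
            "level": record.levelname,
            "message": record.getMessage(),
            # keys passed via extra= land in record.__dict__, so a hyphenated key is reachable here
            "X-CRYPTO-Sync-ID": record.__dict__.get("X-CRYPTO-Sync-ID", ""),
        }
        return json.dumps(payload)

logger = logging.getLogger("demo")
handler = logging.StreamHandler()
handler.setFormatter(SyncIdJsonFormatter())
logger.addHandler(handler)
logger.setLevel(logging.INFO)

logger.info("Sending message to TweetSave queue", extra={"X-CRYPTO-Sync-ID": "1234-abcd"})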

View File

@@ -9,11 +9,14 @@ class keys():
     def __init__(self):
         self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")
 
-def callSentimentAnalyser(tweet):
-    # log("Calling Sentiment Analyser for [{}]".format(tweet), 'INFO')
+def callSentimentAnalyser(tweet, syncId):
+    headers = {
+        "content-type":"text",
+        "X-CRYPTO-Sync-ID" : str(syncId)
+    }
     try:
         uri = keys().sentiment_analyser_uri + "/sentiment?tweet="+tweet
-        response = requests.request("GET", uri)
+        response = requests.request("GET", url=uri, headers=headers)
         response = json.loads(response.text)
@@ -21,5 +24,5 @@ def callSentimentAnalyser(tweet):
         return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
     except:
-        log("Could not call Sentiment Analyser Service", 'ERR')
+        log("Could not call Sentiment Analyser Service with syncId of [{}]".format(syncId), 'ERR', syncId)
         return 0, 0, 0, 0
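
One thing the diff does not change: the tweet text is still concatenated straight into the query string, so spaces, '#' and '&' in a tweet reach the service unencoded. A hedged sketch using requests' params= (which URL-encodes the value) is below; the /sentiment path and the two headers come from the diff, while the base URL, function name and timeout are illustrative.

# Hedged sketch: letting requests URL-encode the tweet instead of string concatenation.
import requests

def call_sentiment(base_url, tweet, sync_id):
    headers = {
        "content-type": "text",
        "X-CRYPTO-Sync-ID": str(sync_id),
    }
    response = requests.get(
        base_url + "/sentiment",
        params={"tweet": tweet},   # encoded into ?tweet=... automatically
        headers=headers,
        timeout=5,                 # illustrative: the diff sets no timeout
    )
    return response.json()

# e.g. call_sentiment("http://sentiment-analyser:5000", "bitcoin to the moon #btc", "1234-abcd")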

View File

@@ -9,16 +9,18 @@ class keys():
     def __init__(self):
         self.spamFilter_uri = os.getenv("FILTER_URL")
 
-def callSpamFilter(tweet):
+def callSpamFilter(tweet, syncId):
+    headers = {
+        "content-type":"text",
+        "X-CRYPTO-Sync-ID" : str(syncId)
+    }
     try:
         uri = keys().spamFilter_uri + "/predict?tweet="+tweet
-        response = requests.request("GET", uri)
+        response = requests.request("GET", url=uri, headers=headers)
         response = json.loads(response.text)
-        # log("Spam Filter result for [{}] is [{}]".format(tweet, response["result"]), 'INFO')
         return response["result"]
     except:
-        log("Could not call spam filter service", 'ERR')
+        log("Could not call spam filter service with syncId of [{}]".format(syncId), 'ERR', syncId)
         return ""

View File

@@ -0,0 +1,152 @@
#!/usr/bin/env python
whitelist = [
"bull",
"bear",
"bullish",
"bearish",
"up",
"down",
"high",
"low",
"higher",
"lower",
"absconded",
"maximalists",
"regulate",
"infamous",
"tradehigher",
"tradelower",
"revival",
"centralized",
"decentralized",
"centralised",
"decentralised",
"decentralization",
"decentralisation",
"centralization",
"centralisation",
"bans",
"hodl",
"ambiguity",
"revolutionize",
"revolutionise",
"consolidation",
"shorts",
"longs",
"long",
"short",
"shorting",
"grow",
"volatile",
"rally",
"rallying",
"noob",
"noobs",
"innovation",
"bottom",
"top",
"topped",
"bottomed",
"upwards",
"downwards",
"invest",
"raging",
"rocketing",
"swing",
"swinging",
"stake",
"whale",
"whales",
"lull",
"moon",
"choppy",
"buy",
"buying",
"sell",
"selling",
"startselling",
"stopselling",
"startbuying",
"stopbuying",
"bitcoin",
"btc",
"eth",
"xmr",
"xrp",
"ripple",
"block",
"reward",
"airdrop",
"drop",
"raise",
"stack",
"stake",
"invest",
"pull",
"push",
"token",
"sale",
"unhappy",
"happy",
"expert",
"novice"
"passed",
"mark",
"decline",
"incline",
"fees",
"crypto",
"wallet",
"price",
"history",
"reached",
"upward",
"downward",
"trading",
"mining",
"defi",
"finance",
"blockchain",
"interest",
"alt",
"alts",
"fiat",
"fiat",
"currency",
"currencies",
"wealth",
"hype",
"hyped",
"achievement",
"platform",
"incremental",
"increment",
"decrement",
"decremental",
"success",
"loss",
"win",
"lose",
"worth",
"strongest",
"weakest",
"strong",
"weak",
"trade",
"popping",
"sucking",
"shard",
"sharding",
"industry",
"powerful",
"better",
"worse"
]
def filterOutTweetsWithNoneWhitelistedWords(text):
if any(x in text for x in whitelist):
return text
else:
# log("Tweet [{}] did not contain any keywords for it to be considered crypto related".format(text), 'WARN')
return ""