[09.10.20] Tweet Collector Service

andrewso 2020-10-09 18:11:06 +01:00
parent 1c2b58dcfc
commit da687a0bcc
15 changed files with 589 additions and 70 deletions

Dockerfile Normal file

@@ -0,0 +1,10 @@
FROM python:3.7-alpine
LABEL maintainer="Andrew Sotheran <cryptosky.user@gmail.com>"
RUN apk update && \
    apk add py-pip libc-dev gcc
RUN python -m pip install --upgrade pip
RUN pip install utils pycryptodome && \
    pip install python-dotenv schedule tweepy stomp.py python-json-logger
COPY . /home/tweet-collector/.
EXPOSE 9090
CMD ["python", "/home/tweet-collector/src/main.py"]

configuration/kubernetes/deployment.yaml Normal file

@@ -0,0 +1,134 @@
apiVersion: apps/v1
kind: Deployment
metadata:
  annotations:
    linkerd.io/inject: enabled
  labels:
    name: LABEL
  name: RESOURCE_NAME
  namespace: production
spec:
  replicas: 1
  selector:
    matchLabels:
      app: RESOURCE_NAME
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0
  template:
    metadata:
      annotations:
        linkerd.io/inject: enabled
      labels:
        app: RESOURCE_NAME
    spec:
      containers:
        - image: REPOSITORY/IMAGE
          name: RESOURCE_NAME
          env:
            - name: KUBERNETES_NAMESPACE
              valueFrom:
                fieldRef:
                  fieldPath: metadata.namespace
            - name: CONTAINER_CORE_LIMIT
              valueFrom:
                resourceFieldRef:
                  resource: limits.cpu
            - name: CONTAINER_MAX_MEMORY
              valueFrom:
                resourceFieldRef:
                  resource: limits.memory
            - name: DATABASE_URL
              valueFrom:
                configMapKeyRef:
                  name: endpoints
                  key: dbGateway.url
            - name: AMQ_URL
              valueFrom:
                configMapKeyRef:
                  name: endpoints
                  key: amqStomp.url
            - name: BROKER_USER
              valueFrom:
                secretKeyRef:
                  name: amq
                  key: amq.username
            - name: BROKER_PASSWORD
              valueFrom:
                secretKeyRef:
                  name: amq
                  key: amq.password
            - name: DB_GATEWAY_URL
              valueFrom:
                configMapKeyRef:
                  name: endpoints
                  key: dbGateway.url
            - name: FILTER_URL
              valueFrom:
                configMapKeyRef:
                  name: endpoints
                  key: spamFilter.url
            - name: SENTIMENT_URL
              valueFrom:
                configMapKeyRef:
                  name: endpoints
                  key: sentiment.url
            - name: API_KEY
              valueFrom:
                secretKeyRef:
                  name: twitter
                  key: twitter.api.key
            - name: API_SECRET
              valueFrom:
                secretKeyRef:
                  name: twitter
                  key: twitter.api.secret
            - name: ACCESS_TOKEN
              valueFrom:
                secretKeyRef:
                  name: twitter
                  key: twitter.access.token
            - name: ACCESS_SECRET
              valueFrom:
                secretKeyRef:
                  name: twitter
                  key: twitter.access.secret
          ports:
            - containerPort: 9090
              name: RESOURCE_NAME
          # livenessProbe:
          #   httpGet:
          #     path: /health
          #     port: 9090
          #   initialDelaySeconds: 30
          #   periodSeconds: 30
          #   timeoutSeconds: 1
          #   successThreshold: 1
          #   failureThreshold: 1
          # readinessProbe:
          #   httpGet:
          #     port: 9090
          #     path: /readiness
          #   initialDelaySeconds: 30
          #   periodSeconds: 5
          #   timeoutSeconds: 1
          #   successThreshold: 1
          #   failureThreshold: 1
          imagePullPolicy: Always
          resources:
            requests:
              cpu: 32m
              memory: 32Mi
            limits:
              cpu: 75m
              memory: 64Mi
          securityContext:
            capabilities:
              add:
                - NET_ADMIN
                - NET_RAW
      restartPolicy: Always
      imagePullSecrets:
        - name: registry-cryptosky-image-registry

configuration/kubernetes/service.yaml Normal file

@@ -0,0 +1,16 @@
kind: Service
apiVersion: v1
metadata:
  labels:
    name: LABEL
  name: RESOURCE_NAME
  namespace: production
spec:
  selector:
    app: RESOURCE_NAME
  ports:
    - port: 9090
      protocol: TCP
      targetPort: 9090
  sessionAffinity: None
  type: ClusterIP

Jenkinsfile Normal file

@@ -0,0 +1,95 @@
#!/usr/bin/env groovy

env.APPLICATION_NAME = 'tweet-collector'
env.APPLICATION_LABEL = 'pricing'
env.GIT_BRANCH = 'master'
env.GIT_REPOSITORY_PATH = "github.com/andyjk15/${env.APPLICATION_NAME}.git"
env.GIT_REPOSITORY_URL = "https://${env.GIT_REPOSITORY_PATH}"
env.GITHUB_CREDENTIALS_ID = 'Github'
env.DIGITAL_OCEAN = 'registry.digitalocean.com'
env.DIGITAL_OCEAN_REPO = 'cryptosky-image-registry'
env.DOCKER_BUILDER = 'registry.cryptosky.me'
env.DOCKER_REPOSITORY = "${env.DIGITAL_OCEAN}/${env.DIGITAL_OCEAN_REPO}"
env.DOCKER_REPOSITORY_TCP = "tcp://${env.DOCKER_BUILDER}:4243"
env.NAMESPACE = 'production'
env.SLAVE_LABEL = "cryptosky-aio-build"

String get_application_version() {
    "1.0.0-b${env.BUILD_NUMBER}"
}

String executeShellScript(String shellPath, String arg1 = '', String arg2 = '', String arg3 = '', String arg4 = '', String arg5 = '') {
    sh "./${shellPath} ${arg1} ${arg2} ${arg3} ${arg4} ${arg5}"
}
try {
    timestamps {
        node("${env.SLAVE_LABEL}") {
            stage('Initialise') {
                checkout([$class: 'GitSCM', branches: [[name: 'master']], doGenerateSubmoduleConfigurations: false, extensions: [], submoduleCfg: [], userRemoteConfigs: [[credentialsId: 'Github', url: env.GIT_REPOSITORY_URL]]])
                env.APPLICATION_VERSION = get_application_version()
                withCredentials(
                    [usernamePassword(
                        credentialsId: 'doctl',
                        passwordVariable: 'DOCTL_TOKEN',
                        usernameVariable: 'DOCTL_USERNAME'
                    )]
                ) {
                    sh "doctl auth init --access-token ${DOCTL_TOKEN}"
                    sh "doctl registry login"
                    sh "doctl kubernetes cluster kubeconfig save cryptosky-cluster"
                }
            }
            stage('Build Image') {
                executeShellScript("configuration/scripts/mapVarsToConfigs.sh",
                    env.DIGITAL_OCEAN,
                    env.DIGITAL_OCEAN_REPO,
                    env.APPLICATION_NAME,
                    env.APPLICATION_VERSION,
                    env.APPLICATION_LABEL)
                withDockerServer([uri: "${env.DOCKER_REPOSITORY_TCP}"]) {
                    docker.build("${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}")
                    docker.build("${env.APPLICATION_NAME}:latest")
                    sh "docker tag ${env.APPLICATION_NAME}:${env.APPLICATION_VERSION} ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}"
                    sh "docker tag ${env.APPLICATION_NAME}:latest ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
                    sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:${env.APPLICATION_VERSION}"
                    sh "docker push ${env.DOCKER_REPOSITORY}/${env.APPLICATION_NAME}:latest"
                }
            }
            stage('Tag Repository') {
                withCredentials(
                    [usernamePassword(
                        credentialsId: env.GITHUB_CREDENTIALS_ID,
                        passwordVariable: 'GIT_PASSWORD',
                        usernameVariable: 'GIT_USERNAME'
                    )]
                ) {
                    sh "git tag ${env.APPLICATION_VERSION}"
                    sh "git push https://${GIT_USERNAME}:${GIT_PASSWORD}@${env.GIT_REPOSITORY_PATH} ${env.APPLICATION_VERSION}"
                }
            }
            stage('Deploy') {
                executeShellScript("configuration/scripts/deployToKubernetes.sh",
                    env.APPLICATION_NAME)
            }
        }
    }
} catch (exception) {
    currentBuild.result = 'FAILURE'
    throw exception
} finally {
    // Only mark the build SUCCESS if it has not already failed
    if (currentBuild.result != 'FAILURE') {
        currentBuild.result = 'SUCCESS'
    }
}

configuration/scripts/deployToKubernetes.sh Normal file

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
APPLICATION_NAME=$1
kubectl apply -f configuration/kubernetes/deployment.yaml
kubectl apply -f configuration/kubernetes/service.yaml
kubectl rollout status deployment/${APPLICATION_NAME} --namespace=production

configuration/scripts/mapVarsToConfigs.sh Normal file

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
DIGITAL_OCEAN=$1
DIGITAL_OCEAN_REPO=$2
APPLICATION_NAME=$3
APPLICATION_VERSION=$4
APPLICATION_LABEL=$5
# Escape the slash so the registry path survives the sed substitutions below
DOCKER_REPOSITORY="${DIGITAL_OCEAN}\/${DIGITAL_OCEAN_REPO}"
sed -i "s/REPOSITORY/${DOCKER_REPOSITORY}/g" configuration/kubernetes/deployment.yaml
sed -i "s/IMAGE/${APPLICATION_NAME}:${APPLICATION_VERSION}/g" configuration/kubernetes/deployment.yaml
sed -i "s/RESOURCE_NAME/${APPLICATION_NAME}/g" configuration/kubernetes/deployment.yaml
sed -i "s/LABEL/${APPLICATION_LABEL}/g" configuration/kubernetes/deployment.yaml
sed -i "s/RESOURCE_NAME/${APPLICATION_NAME}/g" configuration/kubernetes/service.yaml
sed -i "s/LABEL/${APPLICATION_LABEL}/g" configuration/kubernetes/service.yaml

src/main.py Normal file

@@ -0,0 +1,30 @@
#!/usr/bin/env python
import sys
sys.path.append('/home/tweet-collector/')  # matches the COPY destination in the Dockerfile
from threading import Thread
from tweets.collector import collectorMain
from src.utils.jsonLogger import setup_logging
from probes.probes import runFlaskProbes

def callCollector(args):
    collectorMain(args)

def callProbes():
    runFlaskProbes()

if __name__ == '__main__':
    setup_logging()
    # Run the health/readiness probes alongside the collector
    Thread(target=callProbes).start()
    hashtags = ["bitcoin"]
    callCollector(hashtags)
    # for i in range(len(currencies)):
    #     Thread(target=callCollector, args=[hashtags[i], hourStack]).start()

src/probes/__init__.py Normal file

src/probes/probes.py Normal file

@@ -0,0 +1,43 @@
#!/usr/bin/env python
from flask import Flask
import json

app = Flask(__name__)

@app.route('/health')
def health():
    return json.dumps({'status': 'UP'}), 200, {'Content-Type': 'application/json'}

@app.route('/readiness')
def readiness():
    # Placeholder readiness check until a real dependency check is wired in
    tweet = 'dsd'
    if tweet != "":
        return json.dumps({
            'status': 'UP',
            'app': {
                'name': 'CryptoSky Tweet Collector',
                'description': 'Streams tweets for tracked hashtags, cleans and spam-filters them, and publishes hourly averaged sentiment scores.',
                'check_status': 'Success',
                'tweet': tweet
            }
        }), 200, {'Content-Type': 'application/json'}
    else:
        return json.dumps({
            'status': 'DOWN',
            'app': {
                'name': 'CryptoSky Tweet Collector',
                'description': 'Streams tweets for tracked hashtags, cleans and spam-filters them, and publishes hourly averaged sentiment scores.',
                'check_status': 'Failed',
                'tweet': tweet
            }
        }), 503, {'Content-Type': 'application/json'}

def runFlaskProbes():
    app.run(port=9090, host="0.0.0.0")

if __name__ == '__main__':
    runFlaskProbes()
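
A quick way to exercise these endpoints locally (a sketch, assuming the Flask app above is running on localhost:9090):

import requests

health = requests.get("http://localhost:9090/health")
print(health.status_code, health.json())  # expect: 200 {'status': 'UP'}

ready = requests.get("http://localhost:9090/readiness")
print(ready.status_code, ready.json()['status'])  # 200/'UP' or 503/'DOWN'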

src/tweets/collector.py

@@ -1,20 +1,24 @@
 #!/usr/bin/env python
-import os, sys
+import os, sys, json
 from datetime import datetime, timedelta
-import time
+from time import sleep, time
-from tweepy import OAuthHandler
-from tweepy import Stream
+import schedule
+from threading import Thread
+from tweepy import Stream, API, OAuthHandler
 from tweepy.streaming import StreamListener
 from dotenv import load_dotenv
 from pathlib import Path  # python3 only
 env_path = Path('.') / 'configuration/twitter.env'
 load_dotenv(dotenv_path=env_path)
+from src.utils.tweetPreprocessing import fixLines, cleanTweet, removeSpacing, checkLength, detectLaguage, remove_non_ascii
+from src.utils.spamFilter import callSpamFilter
+from src.utils.sentimentAnalyser import callSentimentAnalyser
+from src.utils.activemqConnect import activeMQSender
+from src.utils.jsonLogger import log
-from utils.tweetPreprocessing import *
+hourStack = []
 class keys():
@@ -23,95 +27,147 @@ class keys():
         self.api_secret = os.getenv("API_SECRET")
         self.access_token = os.getenv("ACCESS_TOKEN")
         self.access_secret = os.getenv("ACCESS_SECRET")
-        self.currency_hashtags = os.getenv("CURRENCY_HASHTAGS")
+def sendToArtemis(pos, neu, neg, compound):
+    timestamp = datetime.now() + timedelta(hours=1)
+    strippedTimestamp = timestamp.replace(minute=0, second=0, microsecond=0)
+    timestamp = strippedTimestamp.strftime('%Y-%m-%dT%H:%M:%S')
+    message = { "timestamp" : timestamp, "pos" : pos, "neu" : neu, "neg" : neg, "compound" : compound }
+    messageJson = json.dumps(message, indent = 4)
+    log("Sending message to TweetSave queue", 'INFO')
+    log("Message: {}".format(message), 'INFO')
+    activeMQSender(messageJson)
 class Streamer():
     def __init__(self):
         pass
-    def stream_tweets(self, tweets_file, temp_tweets, hashtag, tweetFilter, analyser):
-        listener = Listener(tweets_file, temp_tweets, tweetFilter, analyser)
+    def stream_tweets(self, hashtag):
+        listener = Listener()
         auth = OAuthHandler(keys().api_key, keys().api_secret)
-        print("Console: ", "Authorising with twitter API")
-        sys.stdout.flush()
+        log("Authorising with twitter API...", 'INFO')
         auth.set_access_token(keys().access_token, keys().access_secret)
-        print("Console: ", "Streaming Tweets")
-        sys.stdout.flush()
+        api = API(auth, wait_on_rate_limit=True,
+                  wait_on_rate_limit_notify=True)
+        log("Streaming Tweets", 'INFO')
-        stream = Stream(auth, listener, tweet_mode='extended')
+        stream = Stream(auth=api.auth, listener=listener, tweet_mode='extended')
         stream.filter(languages=["en"], track=hashtag)
 class Listener(StreamListener):
-    def __init__(self, tweets_file, temp_tweets, tweetFilter, analyser, time_limit=3000):
-        self.tweets_file = tweets_file
-        self.temp_tweets = temp_tweets
-        self.tweetFilter = tweetFilter
-        self.analyser = analyser
-        self.stack = {}
-        self.start_time = time.time()
+    def __init__(self, time_limit=3000):
+        self.start_time = time()
         self.limit = time_limit
     def on_data(self, data):
-        if (time.time() - self.start_time) < self.limit:
-            now = datetime.now() + timedelta(hours=1)
+        if (time() - self.start_time) < self.limit:
+            data = json.loads(data)
             try:
                 # Check if the tweet is a retweet
                 if 'retweeted_status' in data:
                     if 'extended_tweet' in data['retweeted_status']:
                         # The tweet is over the 140-character limit
                         text = data['retweeted_status']['extended_tweet']['full_text']
                         print("Uncleaned Tweet:", text)
                         sys.stdout.flush()
+                        self.processTweet(text)
                     else:
                         text = data['retweeted_status']['text']
                         print("Uncleaned Tweet:", text)
                         sys.stdout.flush()
+                        self.processTweet(text)
                 else:
                     # Otherwise a normal tweet
                     if 'extended_tweet' in data:
                         # The tweet is over the 140-character limit
                         text = data['extended_tweet']['full_text']
                         print("Uncleaned Tweet:", text)
                         sys.stdout.flush()
                     else:
                         text = data['text']
                         print("Uncleaned Tweet: ", text)
                         sys.stdout.flush()
-                removedLines = utilityFuncs().fixLines(text)
-                removedSpecialChars = utilityFuncs().cleanTweet(removedLines)
-                removedSpacing = utilityFuncs().removeSpacing(removedSpecialChars[0])
-                tweetLength = utilityFuncs().checkLength(removedSpacing)
+                    self.processTweet(text)
+            except BaseException as exception:
+                log("Error in on_data: {}".format(exception), 'ERR')
+        return True
+    def processTweet(self, text):
+        removedLines = fixLines(text)
+        removedSpecialChars = cleanTweet(removedLines)
+        removedSpacing = removeSpacing(removedSpecialChars[0])
+        tweetLength = checkLength(removedSpacing)
         if tweetLength == True:
-            checkIfEnglish = utilityFuncs().detectLaguage(removedSpecialChars[0])
+            checkIfEnglish = detectLaguage(removedSpecialChars[0])
             if checkIfEnglish == True:
-                tweetText = utilityFuncs().remove_non_ascii(removedSpacing)
+                tweetText = remove_non_ascii(removedSpacing)
                 print("Cleaned Tweet: ", tweetText)
                 sys.stdout.flush()
+                # log("Cleaned Tweet: {}".format(tweetText), 'INFO')
-                cleanedTweet = tweetText+' '+removedSpecialChars[1]
+                cleanedTweet = tweetText + ' ' + removedSpecialChars[1]
+                ## Check with the spam filter
+                # Send the tweet in an HTTP request to the spam filter service
+                if callSpamFilter(cleanedTweet) != 'spam':
+                    pos, neu, neg, compound = callSentimentAnalyser(cleanedTweet)
-class queueListener():
-                ## Check with spam filter
-                classification = self.tweetFilter.testTweet(cleanedTweet)
+                    hourTweet = {'pos': pos, 'neu': neu, 'neg': neg, 'compound': compound}
+                    hourStack.append(hourTweet)
+def collector(hashtag):
+    log("Thread Start...", 'INFO')
+    streamer = Streamer()
+    streamer.stream_tweets(hashtag)
+def timeFunction():
+    global timeF
+    timeF = datetime.now()
+    timeF = timeF + timedelta(hours = 1)
+    timeF = str(timeF)
+    timeF = ":".join(timeF.split(":", 2)[:2])
+    timeF = timeF.split(" ")[1].lstrip().split(" ")[0]
+    return timeF
+def createHourJob():
+    schedule.clear("sendToArtemis")
+    ovPos, ovNeu, ovNeg, ovCompound = 0, 0, 0, 0
+    global timeF
+    timeF = timeFunction()
+    if len(hourStack) != 0:
+        for item in hourStack:
+            ovPos = ovPos + item['pos']
+            ovNeu = ovNeu + item['neu']
+            ovNeg = ovNeg + item['neg']
+            ovCompound = ovCompound + item['compound']
+        pos = round(ovPos/len(hourStack), 3)
+        neu = round(ovNeu/len(hourStack), 3)
+        neg = round(ovNeg/len(hourStack), 3)
+        compound = round(ovCompound/len(hourStack), 3)
+        hourStack.clear()
+        sendToArtemis(pos, neu, neg, compound)
+    schedule.every().hour.at(timeF).do(createHourJob).tag("sendToArtemis")
+def collectorMain(hashtag):
+    log("Starting Tweet Collector", 'INFO')
+    for i in range(len(hashtag)):
+        Thread(target=collector, args=[hashtag[i]]).start()
+    createHourJob()
+    while True:
+        schedule.run_pending()
+        sleep(1)
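
For reference, the hourly roll-up in createHourJob is a plain mean of each score over the tweets collected that hour; a standalone sketch with hypothetical values:

# Two hypothetical tweets collected in one hour
hourStack = [
    {'pos': 0.5, 'neu': 0.3, 'neg': 0.2, 'compound': 0.60},
    {'pos': 0.1, 'neu': 0.7, 'neg': 0.2, 'compound': -0.10},
]
n = len(hourStack)
pos = round(sum(t['pos'] for t in hourStack) / n, 3)            # 0.3
neu = round(sum(t['neu'] for t in hourStack) / n, 3)            # 0.5
neg = round(sum(t['neg'] for t in hourStack) / n, 3)            # 0.2
compound = round(sum(t['compound'] for t in hourStack) / n, 3)  # 0.25
# sendToArtemis(pos, neu, neg, compound) then publishes these four averages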

src/utils/activemqConnect.py Normal file

@@ -0,0 +1,30 @@
#!/usr/bin/env python
import stomp
import os
from src.utils.jsonLogger import log

class keys():
    def __init__(self):
        self.uri = os.getenv("AMQ_URL")
        self.amqU = os.getenv("BROKER_USER")
        self.amqP = os.getenv("BROKER_PASSWORD")
        self.addr = self.uri.split(':')[0]
        self.port = int(self.uri.split(':')[1])  # stomp expects a numeric port

    def returnKeys(self):
        return self.addr, self.port, self.amqU, self.amqP

def activeMQSender(message):
    addr, port, mqUser, mqPass = keys().returnKeys()
    log("Attempting Connection to Artemis...", 'INFO')
    con = stomp.Connection([(addr, port)], auto_content_length=False)
    con.connect(mqUser, mqPass, wait=True)
    con.send("TweetSave", message, content_type="application/json", headers={"Content-Type": "application/json"})
    con.disconnect()
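
Illustrative usage of the sender; it assumes AMQ_URL holds a host:port pair and that BROKER_USER/BROKER_PASSWORD are exported, as wired in by the deployment above (the payload values are hypothetical):

import json
from src.utils.activemqConnect import activeMQSender

payload = json.dumps({"timestamp": "2020-10-09T18:00:00",
                      "pos": 0.3, "neu": 0.5, "neg": 0.2, "compound": 0.25})
activeMQSender(payload)  # connects, sends one message to TweetSave, disconnects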

src/utils/jsonLogger.py Normal file

@@ -0,0 +1,40 @@
#!/usr/bin/env python
import logging
from pythonjsonlogger import jsonlogger
import datetime

class CustomJsonFormatter(jsonlogger.JsonFormatter):
    def add_fields(self, log_record, record, message_dict):
        super(CustomJsonFormatter, self).add_fields(log_record, record, message_dict)
        if not log_record.get('@timestamp'):
            # This doesn't use record.created, so it is slightly off
            now = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%fZ')
            log_record['@timestamp'] = now
        if log_record.get('level'):
            log_record['level'] = log_record['level'].upper()
        else:
            log_record['level'] = record.levelname

def setup_logging(log_level='INFO'):
    logger = logging.getLogger(__name__)
    logger.propagate = 0
    logger.setLevel(log_level)
    logHandler = logging.StreamHandler()
    formatter = CustomJsonFormatter('%(@timestamp)s %(level)s %(name)s %(message)s')
    logHandler.setFormatter(formatter)
    logger.addHandler(logHandler)

def log(message, level):
    logger = logging.getLogger(__name__)
    if level == 'INFO':
        logger.info(message)
    elif level == 'WARN':
        logger.warning(message)  # logger.warn() is deprecated
    elif level == 'ERR':
        logger.error(message)
    elif level == 'DEBUG':
        logger.debug(message)
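
A minimal usage sketch: setup_logging attaches the JSON formatter once, and each log call then emits a single JSON object per line on stdout (the sample output line is illustrative):

from src.utils.jsonLogger import setup_logging, log

setup_logging()
log("Tweet collector started", 'INFO')
# emits something like:
# {"@timestamp": "2020-10-09T17:11:06.123456Z", "level": "INFO", "name": "...", "message": "Tweet collector started"}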

src/utils/sentimentAnalyser.py Normal file

@@ -0,0 +1,23 @@
#!/usr/bin/env python
import json, requests, os
from src.utils.jsonLogger import log

class keys():
    def __init__(self):
        self.sentiment_analyser_uri = os.getenv("SENTIMENT_URL")

def callSentimentAnalyser(tweet):
    log("Calling Sentiment Analyser", 'INFO')
    try:
        uri = keys().sentiment_analyser_uri + "/sentiment"
        # Pass the tweet as a query parameter so it is URL-encoded
        response = requests.get(uri, params={"tweet": tweet})
        scores = response.json()["result"]["Score"]
        return scores["pos"], scores["neu"], scores["neg"], scores["compound"]
    except Exception:
        log("Could not call Sentiment Analyser Service", 'ERR')
        return 0, 0, 0, 0
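
The parsing above implies the sentiment service responds with a body of roughly this shape; an inferred example only, since the service's contract is not part of this commit:

# Hypothetical body for GET {SENTIMENT_URL}/sentiment?tweet=...
expected = {"result": {"Score": {"pos": 0.42, "neu": 0.50, "neg": 0.08, "compound": 0.64}}}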

src/utils/spamFilter.py Normal file

@@ -0,0 +1,20 @@
#!/usr/bin/env python
import json, requests, os
from src.utils.jsonLogger import log

class keys():
    def __init__(self):
        self.spamFilter_uri = os.getenv("FILTER_URL")

def callSpamFilter(tweet):
    try:
        uri = keys().spamFilter_uri + "/predict"
        # Pass the tweet as a query parameter so it is URL-encoded
        response = requests.get(uri, params={"tweet": tweet})
        return response.json()["result"]
    except Exception:
        log("Could not call spam filter service", 'ERR')
        return ""

src/utils/tweetPreprocessing.py

@@ -45,9 +45,6 @@ def detectLaguage(text):
     highest_ratio = max(ratios, key=ratios.get)
     print("Console: Text is - ", highest_ratio)
     sys.stdout.flush()
     if highest_ratio == 'english':
         return True
     else: