#!/usr/bin/env python
import sys
import math  # use math.log explicitly so it is not shadowed by jsonLogger's log

import pandas as pd
import numpy as np
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import PorterStemmer
from sklearn.naive_bayes import MultinomialNB
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from src.utils.jsonLogger import log

# Global metrics (populated by metrics() below)
HB_NB_Precision = 0
HB_NB_Recall = 0
HB_NB_F_Score = 0
HB_NB_Accuracy = 0


## Logic
def processTweet(tweet, gram=2):
    tweet = tweet.lower()  # Lower-case words
    words = word_tokenize(tweet)  # Tokenise words in text
    words = [w for w in words if len(w) > 2]  # Drop short tokens
    if gram > 2:  # Increasing grams can increase accuracy
        w = []
        for i in range(len(words) - gram + 1):
            w += [' '.join(words[i:i + gram])]
        return w
    # Remove stopwords
    sw = stopwords.words('english')
    words = [word for word in words if word not in sw]
    # Stem words
    stemmer = PorterStemmer()
    words = [stemmer.stem(word) for word in words]
    return words


class classifier(object):
    def __init__(self, trainData):
        self.tweet = trainData['tweet']
        self.labels = trainData['class']

    def train(self):
        self.TF_and_IDF()  # Bag of words
        self.TF_IDF()      # Term frequencies

    def TF_and_IDF(self):
        noTweets = self.tweet.shape[0]
        self.spam = self.labels.value_counts()[1]
        self.ham = self.labels.value_counts()[0]
        self.total = self.spam + self.ham
        # Initialise spam/ham counters
        self.spamCount = 0
        self.hamCount = 0
        self.tfSpam = dict()
        self.tfHam = dict()
        self.idfSpam = dict()
        self.idfHam = dict()
        ## Logic
        for entry in range(noTweets):
            processed = processTweet(self.tweet[entry])
            count = list()  # Tracks whether the word has occurred in this message (IDF count)
            for word in processed:
                if self.labels[entry]:
                    self.tfSpam[word] = self.tfSpam.get(word, 0) + 1
                    self.spamCount += 1
                else:
                    self.tfHam[word] = self.tfHam.get(word, 0) + 1
                    self.hamCount += 1
                if word not in count:
                    count += [word]
            for word in count:
                if self.labels[entry]:
                    self.idfSpam[word] = self.idfSpam.get(word, 0) + 1
                else:
                    self.idfHam[word] = self.idfHam.get(word, 0) + 1

    def TF_IDF(self):
        self.probSpam = dict()
        self.probHam = dict()
        self.sumSpam = 0
        self.sumHam = 0
        for word in self.tfSpam:
            self.probSpam[word] = self.tfSpam[word] * math.log(
                (self.spam + self.ham) / (self.idfSpam[word] + self.idfHam.get(word, 0)))
            self.sumSpam += self.probSpam[word]
        for word in self.tfSpam:
            # Additive (Laplace) smoothing
            self.probSpam[word] = (self.probSpam[word] + 1) / (self.sumSpam + len(self.probSpam))
        for word in self.tfHam:
            self.probHam[word] = self.tfHam[word] * math.log(
                (self.spam + self.ham) / (self.idfSpam.get(word, 0) + self.idfHam[word]))
            self.sumHam += self.probHam[word]
        for word in self.tfHam:
            self.probHam[word] = (self.probHam[word] + 1) / (self.sumHam + len(self.probHam))
        self.probSpamTotal, self.probHamTotal = self.spam / self.total, self.ham / self.total

    def classify(self, processed):
        pSpam, pHam = 0, 0
        for word in processed:
            if word in self.probSpam:
                pSpam += math.log(self.probSpam[word])
            else:
                pSpam -= math.log(self.sumSpam + len(self.probSpam))
            if word in self.probHam:
                pHam += math.log(self.probHam[word])
            else:
                pHam -= math.log(self.sumHam + len(self.probHam))
        pSpam += math.log(self.probSpamTotal)
        pHam += math.log(self.probHamTotal)
        return pSpam >= pHam

    def predict(self, testData):
        result = dict()
        for (i, tweet) in enumerate(testData):
            processed = processTweet(tweet)
            result[i] = int(self.classify(processed))
        return result
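
def _demoClassifier():
    # Hypothetical smoke test for the hand-rolled classifier above; the helper
    # name and the two-row dataset are illustrative only, not part of the
    # original pipeline. It assumes a DataFrame with the expected 'tweet' and
    # 'class' columns, where class is 1 for spam and 0 for ham.
    train_df = pd.DataFrame({'tweet': ["free btc now", "markets closed flat"],
                             'class': [1, 0]})
    clf = classifier(train_df)
    clf.train()
    # "free" and "btc" only appear in the spam tweet, so this should lean spam
    return clf.classify(processTweet("free btc giveaway"))  # expected: True (spam)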
def metrics(labels, predictions):
    global HB_NB_Precision, HB_NB_Recall, HB_NB_F_Score, HB_NB_Accuracy
    true_pos, true_neg, false_pos, false_neg = 0, 0, 0, 0
    for i in range(len(labels)):
        true_pos += int(labels[i] == 1 and predictions[i] == 1)
        true_neg += int(labels[i] == 0 and predictions[i] == 0)
        false_pos += int(labels[i] == 0 and predictions[i] == 1)
        false_neg += int(labels[i] == 1 and predictions[i] == 0)
    HB_NB_Precision = true_pos / (true_pos + false_pos)
    HB_NB_Recall = true_pos / (true_pos + false_neg)
    HB_NB_F_Score = 2 * HB_NB_Precision * HB_NB_Recall / (HB_NB_Precision + HB_NB_Recall)
    HB_NB_Accuracy = (true_pos + true_neg) / (true_pos + true_neg + false_pos + false_neg)
    print("HB Precision: ", HB_NB_Precision)
    print("HB Recall: ", HB_NB_Recall)
    print("HB F-score: ", HB_NB_F_Score)
    print("HB Accuracy: ", HB_NB_Accuracy)


class filterSpam(object):
    def __init__(self, training_set):
        self.training_set = training_set

    def trainFilter(self):
        self.dataset()
        self.train()

    def dataset(self):
        self.data = pd.read_csv(self.training_set)
        self.data['class'] = self.data['classes'].map({'ham': 0, 'spam': 1})
        self.data.drop(['classes'], axis=1, inplace=True)
        # Random 75/25 train/test split
        self.trainIndex, self.testIndex = list(), list()
        for i in range(self.data.shape[0]):
            if np.random.uniform(0, 1) < 0.75:
                self.trainIndex += [i]
            else:
                self.testIndex += [i]
        self.trainData = self.data.loc[self.trainIndex]
        self.testData = self.data.loc[self.testIndex]
        self.trainData.reset_index(inplace=True)
        self.testData.reset_index(inplace=True)
        self.trainData.drop(['index'], axis=1, inplace=True)
        self.testData.drop(['index'], axis=1, inplace=True)

    def train(self):
        self.spamFilter = classifier(self.trainData)
        self.spamFilter.train()

    def testData_Prediction(self):
        prediction = self.spamFilter.predict(self.testData['tweet'])
        return prediction

    def testPrediction(self):
        # Test spam/ham tweets - should return True and False respectively
        spam = processTweet("Earn more than 0015 btc free No deposit No investment Free Bitcoins - Earn $65 free btc in 5 minutes bitcoin freebtc getbtc")
        ham = processTweet("Bitcoin closed with some gains in month of February")
        hamTweet = self.spamFilter.classify(ham)
        spamTweet = self.spamFilter.classify(spam)
        print("Console: ", "Spam Tweet -- ", spamTweet)
        sys.stdout.flush()
        print("Console: ", "Ham Tweet -- ", hamTweet)
        sys.stdout.flush()

    def filterStatistics(self, prediction):
        metrics(self.testData['class'], prediction)

    def filterTweet(self, tweet):
        processed = processTweet(tweet)
        classified = self.spamFilter.classify(processed)
        return classified
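
def _demoFilterSpam():
    # Hypothetical end-to-end run of the self-coded filter above (helper name
    # is illustrative, not part of the original pipeline); assumes the same CSV
    # layout ('classes' and 'tweet' columns) used by the sklearn path below.
    hb = filterSpam("src/resources/tweet_spam_ham.csv")
    hb.trainFilter()
    prediction = hb.testData_Prediction()
    hb.filterStatistics(prediction)  # prints HB precision/recall/F-score/accuracy
    return hb.filterTweet("Bitcoin closed with some gains in month of February")  # expected: False (ham)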
class multinomialNaiveBayes(object):
    def __init__(self, training_set):
        self.training_set = training_set

    def trainFilter(self):
        self.dataset()
        self.train()
        self.predictTest()

    def dataset(self):
        log("Creating Training and Test datasets", 'INFO')
        self.data = pd.read_csv(self.training_set)
        self.data.drop_duplicates(inplace=True)
        self.data['class'] = self.data['classes'].map({'ham': 0, 'spam': 1})
        self.cv = CountVectorizer(analyzer=processTweet)
        messages_bow = self.cv.fit_transform(self.data['tweet'])
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            messages_bow, self.data['class'], test_size=0.20, random_state=0)

    def train(self):
        self.classifier = MultinomialNB()
        log("Fitting Split Train datasets against Bayes Classifier", 'INFO')
        self.classifier.fit(self.X_train, self.y_train)

    def predictTest(self):
        log("Testing Prediction against X Test Dataset", 'INFO')
        self.pred = self.classifier.predict(self.X_test)
        log('Accuracy Prediction against Y Test Dataset :: {}'.format(accuracy_score(self.y_test, self.pred)), 'INFO')

    def predict(self, tweet):
        message = self.cv.transform([tweet]).toarray()
        return self.classifier.predict(message)


class tweetFilter(object):
    def __init__(self):
        pass

    def tweetFilterTrain(self):
        self.Filter = multinomialNaiveBayes("src/resources/tweet_spam_ham.csv")
        log("Training Filter", 'INFO')
        self.Filter.trainFilter()
        ### The self-coded NB reaches around 75-85% accuracy (not as good as sklearn's)
        # Filter.trainFilter()
        #
        # prediction = Filter.testData_Prediction()
        # Filter.filterStatistics(prediction)
        #
        # Filter.testPrediction()

    def tweetFilterPredit(self, tweet):
        df = pd.DataFrame(self.Filter.predict(tweet))
        df[0] = df[0].map({0: 'ham', 1: 'spam'})
        log("Classification of tweet as {}".format(df[0][0]), 'INFO')
        return df[0][0]
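
if __name__ == "__main__":
    # Minimal usage sketch; the guard itself is illustrative, not original code.
    # Assumes the module is run from the repo root so the relative CSV path
    # "src/resources/tweet_spam_ham.csv" resolves.
    tf = tweetFilter()
    tf.tweetFilterTrain()  # trains the sklearn MultinomialNB pipeline
    print(tf.tweetFilterPredit("Earn free btc in 5 minutes no deposit"))  # -> 'spam' or 'ham'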