Email Spam Classification Using Machine Learning
Data Loading
def load_sms_data():
    """Load the tab-separated SMS spam corpus and preprocess each message.

    Returns:
        tuple[list[str], list[str]]: (contents, categories) where contents[i]
        is the cleaned message text and categories[i] its label (e.g. ham/spam).
    """
    categories = []
    contents = []
    # Context manager guarantees the file is closed even if clean_text or the
    # csv reader raises mid-iteration (the original leaked the handle on error).
    with open('../data/SMSSpamCollection', 'r', encoding='utf-8') as messages:
        reader = csv.reader(messages, delimiter='\t')
        for row in reader:
            categories.append(row[0])
            contents.append(clean_text(row[1]))
    return contents, categories
Text Preprocessing
def clean_text(document):
    """Tokenize, filter, and lemmatize one document into a space-joined string.

    Pipeline: sentence/word tokenize -> lowercase -> drop stopwords and tokens
    shorter than 3 chars -> POS-tag -> WordNet-lemmatize.

    Args:
        document: Raw message text.
    Returns:
        str: Cleaned, lemmatized tokens joined by single spaces.
    """
    words = [term for sentence in nltk.sent_tokenize(document)
             for term in nltk.word_tokenize(sentence)]
    # Lowercase BEFORE stopword filtering: NLTK's English stopword list is all
    # lowercase, so comparing raw tokens let capitalized stopwords ("The",
    # "And") slip through the filter. Use a set for O(1) membership tests.
    excluded = set(stopwords.words('english'))
    normalized = [token for token in (word.lower() for word in words)
                  if token not in excluded and len(token) >= 3]
    pos_tags = nltk.pos_tag(normalized)
    lemmatizer = WordNetLemmatizer()
    # zip keeps token and its tag aligned without index arithmetic.
    processed = [lemmatizer.lemmatize(token, pos=get_pos_tag(tag))
                 for token, (_, tag) in zip(normalized, pos_tags)]
    return ' '.join(processed)
Dataset Partitioning
def partition_data(features, labels):
    """Split the corpus into stratified train/test subsets (80/20).

    A fixed random_state makes the split reproducible across runs; stratify
    preserves the ham/spam class ratio in both partitions.

    Returns:
        (train_features, test_features, train_labels, test_labels)
    """
    split = train_test_split(features, labels,
                             test_size=0.2, random_state=0, stratify=labels)
    train_features, test_features, train_labels, test_labels = split
    return train_features, test_features, train_labels, test_labels
Feature Extraction
def extract_tfidf_features(train_texts, test_texts):
    """Vectorize texts with TF-IDF, fitting the vocabulary on training data only.

    The test set is transformed with the already-fitted vocabulary so no
    information leaks from test to train.

    Returns:
        (train_vectors, test_vectors, vectorizer)
    """
    vectorizer = TfidfVectorizer()
    return (vectorizer.fit_transform(train_texts),
            vectorizer.transform(test_texts),
            vectorizer)
def decode_vector_to_text(original_texts, vectorized_texts, model):
    """Show how the first document maps to its TF-IDF vector, and back.

    Args:
        original_texts: Sequence of the (cleaned) input texts.
        vectorized_texts: Sparse TF-IDF matrix aligned with original_texts.
        model: Fitted vectorizer exposing a ``vocabulary_`` dict (word -> column).
    Returns:
        list[str]: Vocabulary words with non-zero weight in the first document
        (also printed). Returning the list makes the inspection testable;
        previous callers that ignored the return value are unaffected.
    """
    # Densify once and reuse — the original called toarray() twice.
    first_vector = vectorized_texts.toarray()[0]
    print("First email vector representation:", first_vector)
    non_zero_indices = np.flatnonzero(first_vector)
    print("Non-zero element positions:", non_zero_indices)
    print("Non-zero element values:", first_vector[non_zero_indices])
    # Set membership is O(1); `index in ndarray` scans the array per word.
    index_set = set(non_zero_indices.tolist())
    matching_words = [word for word, index in model.vocabulary_.items()
                      if index in index_set]
    print("Words corresponding to non-zero elements:", matching_words)
    print("Original email text:", original_texts[0])
    return matching_words
Model Selection
def train_naive_bayes_classifier(train_features, test_features, train_labels, test_labels):
    """Fit a multinomial Naive Bayes model and report raw test-set hit counts.

    Returns:
        Array of predicted labels for test_features.
    """
    model = MultinomialNB()
    model.fit(train_features, train_labels)
    predicted = model.predict(test_features)
    correct = (predicted == test_labels).sum()
    print("Total samples:", len(test_labels))
    print("Correct predictions:", correct)
    return predicted
Model Evaluation
def evaluate_model(predictions, actual_labels):
    """Print confusion matrix, classification report, and overall accuracy."""
    separator = "=" * 50
    matrix = confusion_matrix(actual_labels, predictions)
    print(separator)
    print("Confusion Matrix:\n", matrix)
    print(separator)
    print("Classification Report:\n",
          classification_report(actual_labels, predictions))
    print(separator)
    # Accuracy = (TN + TP) / all samples, read off the 2x2 matrix diagonal.
    correct = matrix[0][0] + matrix[1][1]
    print("Model Accuracy:", correct / np.sum(matrix))
Complete Implementation
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.metrics import confusion_matrix, classification_report
import numpy as np
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import csv
def get_pos_tag(tag):
    """Map a Penn Treebank POS tag to its WordNet lemmatizer constant.

    Only the first letter matters (J/V/N/R); anything else — including an
    empty tag — falls back to NOUN, matching WordNetLemmatizer's own default.
    """
    wordnet = nltk.corpus.wordnet
    mapping = {
        'J': wordnet.ADJ,
        'V': wordnet.VERB,
        'N': wordnet.NOUN,
        'R': wordnet.ADV,
    }
    return mapping.get(tag[:1], wordnet.NOUN)
def clean_text(document):
    """Tokenize, filter, and lemmatize one document into a space-joined string.

    Pipeline: sentence/word tokenize -> lowercase -> drop stopwords and tokens
    shorter than 3 chars -> POS-tag -> WordNet-lemmatize.

    Args:
        document: Raw message text.
    Returns:
        str: Cleaned, lemmatized tokens joined by single spaces.
    """
    words = [term for sentence in nltk.sent_tokenize(document)
             for term in nltk.word_tokenize(sentence)]
    # Lowercase BEFORE stopword filtering: NLTK's English stopword list is all
    # lowercase, so comparing raw tokens let capitalized stopwords ("The",
    # "And") slip through the filter. Use a set for O(1) membership tests.
    excluded = set(stopwords.words('english'))
    normalized = [token for token in (word.lower() for word in words)
                  if token not in excluded and len(token) >= 3]
    pos_tags = nltk.pos_tag(normalized)
    lemmatizer = WordNetLemmatizer()
    # zip keeps token and its tag aligned without index arithmetic.
    processed = [lemmatizer.lemmatize(token, pos=get_pos_tag(tag))
                 for token, (_, tag) in zip(normalized, pos_tags)]
    return ' '.join(processed)
def load_sms_data():
    """Load the tab-separated SMS spam corpus and preprocess each message.

    Returns:
        tuple[list[str], list[str]]: (contents, categories) where contents[i]
        is the cleaned message text and categories[i] its label (e.g. ham/spam).
    """
    categories = []
    contents = []
    # Context manager guarantees the file is closed even if clean_text or the
    # csv reader raises mid-iteration (the original leaked the handle on error).
    with open('../data/SMSSpamCollection', 'r', encoding='utf-8') as messages:
        reader = csv.reader(messages, delimiter='\t')
        for row in reader:
            categories.append(row[0])
            contents.append(clean_text(row[1]))
    return contents, categories
def partition_data(features, labels):
    """Split the corpus into stratified train/test subsets (80/20).

    A fixed random_state makes the split reproducible across runs; stratify
    preserves the ham/spam class ratio in both partitions.

    Returns:
        (train_features, test_features, train_labels, test_labels)
    """
    split = train_test_split(features, labels,
                             test_size=0.2, random_state=0, stratify=labels)
    train_features, test_features, train_labels, test_labels = split
    return train_features, test_features, train_labels, test_labels
def extract_tfidf_features(train_texts, test_texts):
    """Vectorize texts with TF-IDF, fitting the vocabulary on training data only.

    The test set is transformed with the already-fitted vocabulary so no
    information leaks from test to train.

    Returns:
        (train_vectors, test_vectors, vectorizer)
    """
    vectorizer = TfidfVectorizer()
    return (vectorizer.fit_transform(train_texts),
            vectorizer.transform(test_texts),
            vectorizer)
def decode_vector_to_text(original_texts, vectorized_texts, model):
    """Show how the first document maps to its TF-IDF vector, and back.

    Args:
        original_texts: Sequence of the (cleaned) input texts.
        vectorized_texts: Sparse TF-IDF matrix aligned with original_texts.
        model: Fitted vectorizer exposing a ``vocabulary_`` dict (word -> column).
    Returns:
        list[str]: Vocabulary words with non-zero weight in the first document
        (also printed). Returning the list makes the inspection testable;
        previous callers that ignored the return value are unaffected.
    """
    # Densify once and reuse — the original called toarray() twice.
    first_vector = vectorized_texts.toarray()[0]
    print("First email vector representation:", first_vector)
    non_zero_indices = np.flatnonzero(first_vector)
    print("Non-zero element positions:", non_zero_indices)
    print("Non-zero element values:", first_vector[non_zero_indices])
    # Set membership is O(1); `index in ndarray` scans the array per word.
    index_set = set(non_zero_indices.tolist())
    matching_words = [word for word, index in model.vocabulary_.items()
                      if index in index_set]
    print("Words corresponding to non-zero elements:", matching_words)
    print("Original email text:", original_texts[0])
    return matching_words
def train_naive_bayes_classifier(train_features, test_features, train_labels, test_labels):
    """Fit a multinomial Naive Bayes model and report raw test-set hit counts.

    Returns:
        Array of predicted labels for test_features.
    """
    model = MultinomialNB()
    model.fit(train_features, train_labels)
    predicted = model.predict(test_features)
    correct = (predicted == test_labels).sum()
    print("Total samples:", len(test_labels))
    print("Correct predictions:", correct)
    return predicted
def evaluate_model(predictions, actual_labels):
    """Print confusion matrix, classification report, and overall accuracy."""
    separator = "=" * 50
    matrix = confusion_matrix(actual_labels, predictions)
    print(separator)
    print("Confusion Matrix:\n", matrix)
    print(separator)
    print("Classification Report:\n",
          classification_report(actual_labels, predictions))
    print(separator)
    # Accuracy = (TN + TP) / all samples, read off the 2x2 matrix diagonal.
    correct = matrix[0][0] + matrix[1][1]
    print("Model Accuracy:", correct / np.sum(matrix))
if __name__ == '__main__':
    # End-to-end pipeline: load -> split -> vectorize -> inspect -> train -> evaluate.
    texts, labels = load_sms_data()
    X_train, X_test, y_train, y_test = partition_data(texts, labels)
    tfidf_train, tfidf_test, tfidf_model = extract_tfidf_features(X_train, X_test)
    decode_vector_to_text(X_train, tfidf_train, tfidf_model)
    predictions = train_naive_bayes_classifier(tfidf_train, tfidf_test, y_train, y_test)
    evaluate_model(predictions, y_test)
Performance Comparison
When comparing CountVectorizer with TfidfVectorizer for feature extraction, notable differences emerge. CountVectorizer focuses solely on term frequency within documents, implementing a bag-of-words approach. In contrast, TfidfVectorizer considers both local term frequency and global document frequency, reducing the impact of commonly occurring but semantically insignificant terms.
In practice, TfidfVectorizer demonstrates superior performance with larger datasets by emphasizing meaningful features while de-emphasizing common vocabulary. Although CountVectorizer may show slightly higher overall accuracy, TfidfVectorizer provides more balanced individual sample predictions and better captures text semantics, making it particularly suitable for spam detection tasks.