Implementation and Optimization of a Plagiarism Detection System Using TF-IDF and Cosine Similarity
The system is a modular Python application for detecting document similarity. It reads a pair of text files, preprocesses and vectorizes their contents, and reports a single similarity score.
Core Architecture
The software is structured into three primary modules, each with distinct responsibilities.
File Handler Module
The FileHandler class manages all file system operations. Key responsibilities include reading and writing text files, validating file paths for cross-platform compatibility, and handling various text encodings. It implements robust error handling for scenarios such as missing files, permission issues, and unsupported character encodings.
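The encoding handling described above can be sketched as follows. This is an illustrative standalone function, not the actual FileHandler method, and the fallback order (UTF-8, then GBK) is an assumption:

```python
import os
from typing import Optional

def read_file(path: str) -> Optional[str]:
    """Read a text file, trying UTF-8 first and falling back to GBK.

    Assumption for illustration: the real FileHandler may try a
    different sequence of encodings.
    """
    if not os.path.isfile(path):
        return None
    for encoding in ("utf-8", "gbk"):
        try:
            with open(path, "r", encoding=encoding) as f:
                return f.read()
        except UnicodeDecodeError:
            continue  # wrong encoding guess; try the next one
        except PermissionError:
            return None
    return None
```

Returning `None` on failure (rather than raising) lets the caller distinguish "unreadable file" from an empty document with a simple check.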
Similarity Calculator Module
The TextSimilarityCalculator class encapsulates the core plagiarism detection logic. It performs text preprocessing, TF-IDF vectorization, and cosine similarity calculations. The class is designed to be reusable and independent of the input/output mechanisms.
Main Application Module
The main function serves as the program's entry point. It orchestrates the workflow by parsing command-line arguments, delegating tasks to the other modules, and outputting the final result to a specified file.
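A minimal sketch of the entry point's argument handling. The argument names here are illustrative; the actual main.py may name or order them differently:

```python
import argparse

def parse_args(argv=None):
    """Parse the three positional arguments the pipeline expects."""
    parser = argparse.ArgumentParser(
        description="Compute a similarity score between two documents."
    )
    parser.add_argument("original", help="path to the original document")
    parser.add_argument("suspect", help="path to the suspected copy")
    parser.add_argument("output", help="path to write the similarity score")
    return parser.parse_args(argv)
```

With `argv=None`, argparse reads `sys.argv[1:]`, so the same function is usable both from the command line and from tests that pass an explicit list.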
Core Algorithm Workflow
The system follows a sequential pipeline to compute document similarity:
- File Reading: The original and suspect documents are loaded into memory.
- Text Preprocessing: Each document's text is cleaned by removing punctuation and special characters. It is then tokenized into words using Jieba for Chinese text. Common stop words are filtered out.
- Feature Vectorization: A TF-IDF (Term Frequency-Inverse Document Frequency) model is constructed. This transforms each document into a numerical vector where each dimension represents the importance of a unique word.
- Similarity Scoring: The cosine similarity is computed between the two TF-IDF vectors. This measures the cosine of the angle between the vectors, producing a score between 0 (completely dissimilar) and 1 (identical).
- Result Output: The calculated similarity score is written to an output file.
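The pipeline above can be sketched in miniature. Whitespace tokenization stands in for Jieba here, and a smoothed IDF variant is assumed so that weights stay positive even in a two-document corpus:

```python
import math
from collections import Counter

def tfidf_cosine(text_a: str, text_b: str) -> float:
    """Toy end-to-end pipeline: tokenize, TF-IDF vectorize, cosine score."""
    docs = [text_a.lower().split(), text_b.lower().split()]
    # Document frequency over the two-document corpus
    df = Counter()
    for tokens in docs:
        df.update(set(tokens))
    n_docs = len(docs)
    # Smoothed IDF keeps weights positive even for terms in every document
    idf = {t: math.log((1 + n_docs) / (1 + d)) + 1 for t, d in df.items()}
    vectors = []
    for tokens in docs:
        tf = Counter(tokens)
        total = len(tokens) or 1
        vectors.append({t: (c / total) * idf[t] for t, c in tf.items()})
    # Cosine similarity over the union of features
    feats = set(vectors[0]) | set(vectors[1])
    dot = sum(vectors[0].get(t, 0.0) * vectors[1].get(t, 0.0) for t in feats)
    norm_a = math.sqrt(sum(v * v for v in vectors[0].values()))
    norm_b = math.sqrt(sum(v * v for v in vectors[1].values()))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)
```

Identical inputs score 1.0, inputs with no shared vocabulary score 0.0, and an empty document short-circuits to 0.0 via the zero-norm guard.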
Implementation Details and Optimizations
Text Preprocessing with Jieba
The tokenization process is optimized for performance on larger texts.
```python
import re

import jieba

class TextSimilarityCalculator:
    def __init__(self):
        self.stopwords = self._load_stopwords()

    def preprocess_document(self, raw_text: str) -> list:
        if not raw_text:
            return []
        # Keep only Chinese characters, ASCII letters, digits, and whitespace
        cleaned_text = re.sub(r'[^\u4e00-\u9fa5a-zA-Z0-9\s]', ' ', raw_text)
        # Enable parallel tokenization for large inputs.
        # Note: jieba's parallel mode is POSIX-only and unavailable on Windows.
        jieba.enable_parallel(4)
        try:
            tokens = jieba.lcut(cleaned_text)
        finally:
            jieba.disable_parallel()
        # Filter out stopwords and whitespace-only tokens (jieba keeps whitespace runs)
        return [t for t in tokens if t.strip() and t not in self.stopwords]
```
Robust File Path Validation
The system includes validation logic to handle various operating system constraints.
```python
import os

class FileHandler:
    def is_valid_path(self, path_string: str) -> bool:
        if not path_string:
            return False
        # Characters that are illegal in file names on common platforms
        forbidden_chars = ['<', '>', '"', '|', '?', '*']
        for char in forbidden_chars:
            if char in path_string:
                return False
        # A colon is only valid as part of a Windows drive letter (e.g. C:),
        # so allow exactly one colon, at index 1, preceded by a letter.
        if ':' in path_string:
            drive_ok = (len(path_string) >= 2
                        and path_string[1] == ':'
                        and path_string[0].isalpha()
                        and path_string.count(':') == 1)
            if not drive_ok:
                return False
        return True
```
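A few illustrative calls, using a standalone copy of the validation logic (condensed into a module-level function purely for demonstration):

```python
def is_valid_path(path_string: str) -> bool:
    """Standalone copy of the path validation logic, for demonstration."""
    if not path_string:
        return False
    if any(c in path_string for c in '<>"|?*'):
        return False
    # Allow a colon only as part of a Windows drive letter (e.g. C:)
    if ':' in path_string:
        if not (len(path_string) >= 2
                and path_string[1] == ':'
                and path_string[0].isalpha()
                and path_string.count(':') == 1):
            return False
    return True

print(is_valid_path("C:/reports/original.txt"))  # True: colon forms a drive letter
print(is_valid_path("draft?.txt"))               # False: '?' is forbidden
print(is_valid_path(""))                         # False: empty path
```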
Efficient TF-IDF and Cosine Similarity Calculation
The vectorizer collects term frequencies and document frequencies in a single pass per document, avoiding redundant counting.
```python
import numpy as np
from collections import Counter

class TextSimilarityCalculator:
    def compute_tfidf_vectors(self, token_lists: list) -> tuple:
        """
        Calculates TF-IDF vectors for a list of tokenized documents.
        Returns a list of TF-IDF vector dictionaries and the IDF dictionary.
        """
        tf_vectors = []
        df_counter = Counter()  # document frequency per term
        # Single pass per document: term frequency plus document frequency
        for tokens in token_lists:
            df_counter.update(set(tokens))
            term_freq = Counter(tokens)
            total_terms = len(tokens) or 1
            # Normalize TF by document length
            tf_vectors.append({term: freq / total_terms
                               for term, freq in term_freq.items()})
        # Smoothed IDF: log((1 + N) / (1 + df)) + 1 stays positive even for
        # terms appearing in every document. The unsmoothed log(N / (df + 1))
        # would go negative for such terms and distort the cosine score.
        total_docs = len(token_lists)
        idf_dict = {term: np.log((1 + total_docs) / (1 + df)) + 1
                    for term, df in df_counter.items()}
        # Combine TF and IDF into the final TF-IDF vectors
        tfidf_vectors = [{term: tf * idf_dict[term] for term, tf in tf_vec.items()}
                         for tf_vec in tf_vectors]
        return tfidf_vectors, idf_dict

    def cosine_similarity_score(self, vector_a: dict, vector_b: dict) -> float:
        """
        Calculates the cosine similarity between two sparse vectors
        represented as dictionaries.
        """
        all_features = sorted(set(vector_a) | set(vector_b))
        if not all_features:
            return 0.0
        # Densify over a fixed feature order so both arrays align
        array_a = np.array([vector_a.get(feat, 0.0) for feat in all_features])
        array_b = np.array([vector_b.get(feat, 0.0) for feat in all_features])
        norm_a = np.linalg.norm(array_a)
        norm_b = np.linalg.norm(array_b)
        if norm_a == 0.0 or norm_b == 0.0:
            return 0.0
        similarity = np.dot(array_a, array_b) / (norm_a * norm_b)
        # Clamp away tiny floating-point excursions outside [0, 1]
        return float(max(0.0, min(1.0, similarity)))
```
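A quick numeric check of the cosine formula: proportional vectors score exactly 1.0, and vectors with no overlapping components score 0.0.

```python
import numpy as np

def cosine(a, b):
    # cos(theta) = (a . b) / (||a|| * ||b||)
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

a = np.array([1.0, 2.0, 0.0])
b = np.array([2.0, 4.0, 0.0])   # b = 2a, so the angle between them is zero
print(cosine(a, b))             # 1.0  (dot = 10, norms sqrt(5) * sqrt(20) = 10)

c = np.array([0.0, 0.0, 3.0])   # no overlap with a
print(cosine(a, c))             # 0.0
```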
Unit Testing Strategy
Comprehensive unit tests ensure the reliability of each module.
Testing the Similarity Calculator
```python
import unittest

class TestTextSimilarityCalculator(unittest.TestCase):
    def setUp(self):
        self.calc = TextSimilarityCalculator()

    def test_preprocess_with_punctuation(self):
        sample = "Hello, world! This is a test (with parentheses)."
        result = self.calc.preprocess_document(sample)
        self.assertIn("Hello", result)
        self.assertIn("world", result)
        self.assertIn("test", result)
        # Punctuation must be stripped during cleaning
        self.assertNotIn("(", result)
        self.assertNotIn(")", result)

    def test_cosine_similarity_for_identical_vectors(self):
        vec1 = {"apple": 0.5, "banana": 0.8, "orange": 0.2}
        vec2 = {"apple": 0.5, "banana": 0.8, "orange": 0.2}
        score = self.calc.cosine_similarity_score(vec1, vec2)
        self.assertAlmostEqual(score, 1.0, places=5)

    def test_cosine_similarity_for_orthogonal_vectors(self):
        vec1 = {"apple": 1.0, "banana": 0.0}
        vec2 = {"apple": 0.0, "banana": 1.0}
        score = self.calc.cosine_similarity_score(vec1, vec2)
        self.assertAlmostEqual(score, 0.0, places=5)

    def test_similarity_with_empty_document(self):
        doc1 = "A meaningful sentence."
        doc2 = ""
        score = self.calc.calculate_similarity(doc1, doc2)
        self.assertEqual(score, 0.0)
```
Integration Test for End-to-End Workflow
```python
import os
import subprocess
import sys
import tempfile
import unittest

class TestIntegration(unittest.TestCase):
    def test_complete_pipeline(self):
        # Create temporary input files
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f1:
            f1.write("The quick brown fox jumps over the lazy dog.")
            file1 = f1.name
        with tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False) as f2:
            f2.write("A quick brown fox leaps over a sleeping dog.")
            file2 = f2.name
        output_file = "test_output.txt"
        try:
            # Run the main script with the current interpreter
            cmd = [sys.executable, 'main.py', file1, file2, output_file]
            result = subprocess.run(cmd, capture_output=True, text=True)
            # Assert the command succeeded
            self.assertEqual(result.returncode, 0,
                             msg=f"Command failed: {result.stderr}")
            # Assert the output file exists and contains a score in [0, 1]
            self.assertTrue(os.path.exists(output_file))
            with open(output_file, 'r') as f:
                similarity = float(f.read().strip())
            self.assertTrue(0.0 <= similarity <= 1.0)
        finally:
            # Cleanup runs regardless of assertion outcomes
            os.unlink(file1)
            os.unlink(file2)
            if os.path.exists(output_file):
                os.unlink(output_file)
```
Error Handling Design
The system implements a multi-layered error handling strategy to gracefully manage failures.
Centralized Error Handling in Main Workflow
```python
import logging
import sys

# INFO level so the progress messages below are actually emitted
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

def execute_plagiarism_check(source_path, target_path, result_path):
    """
    Orchestrates the plagiarism check with comprehensive error handling.
    Returns True on success, False on failure.
    """
    try:
        logger.info(f"Starting check: {source_path} vs {target_path}")
        handler = FileHandler()
        calculator = TextSimilarityCalculator()
        # Read documents
        doc_a_content = handler.read_file(source_path)
        if doc_a_content is None:
            logger.error(f"Could not read source file: {source_path}")
            return False
        doc_b_content = handler.read_file(target_path)
        if doc_b_content is None:
            logger.error(f"Could not read target file: {target_path}")
            return False
        # Calculate similarity
        final_score = calculator.calculate_similarity(doc_a_content, doc_b_content)
        logger.info(f"Calculated similarity score: {final_score}")
        # Write result
        if not handler.write_file(result_path, f"{final_score:.4f}"):
            logger.error(f"Failed to write result to: {result_path}")
            return False
        logger.info("Plagiarism check completed successfully.")
        return True
    except FileNotFoundError as fnf_error:
        logger.error(f"File not found: {fnf_error}")
        print(f"Error: Input file could not be found. {fnf_error}", file=sys.stderr)
    except PermissionError as perm_error:
        logger.error(f"File permission error: {perm_error}")
        print(f"Error: Insufficient permissions to access file. {perm_error}", file=sys.stderr)
    except ValueError as val_error:
        logger.error(f"Data processing error: {val_error}")
        print(f"Error: Invalid data encountered. {val_error}", file=sys.stderr)
    except Exception as generic_error:
        logger.error(f"Unexpected error during execution: {generic_error}", exc_info=True)
        print("An unexpected error occurred. Check the log for details.", file=sys.stderr)
    return False
```