Implementing Core Classification Algorithms for Machine Learning Systems
The k-Nearest Neighbors (kNN) algorithm operates on a proximity-based classification paradigm. It determines the category of an unlabeled instance by computing geometric distances to all records in a labeled training set and selecting the most frequent class among the closest neighbors. The method is non-parametric, requiring no assumptions about underlying data distributions, and exhibits high tolerance for anomalous values. Its primary trade-offs involve computational expense during inference, as every prediction requires scanning the entire dataset, and significant memory consumption to store training instances. The algorithm supports both continuous and categorical feature types.
The standard workflow consists of data ingestion, numerical normalization, distance computation, proximity ranking, and majority voting. Notably, kNN does not feature an explicit training phase; model construction is deferred entirely to the prediction stage.
import numpy as np
from collections import Counter


def load_reference_samples():
    # Tiny labeled dataset: four 2-D points in two classes.
    coordinates = np.array([[1.0, 1.1], [1.0, 1.0], [0.0, 0.0], [0.0, 0.1]])
    target_labels = ['ClassAlpha', 'ClassAlpha', 'ClassBeta', 'ClassBeta']
    return coordinates, target_labels


def predict_label(query_point, reference_matrix, reference_labels, k_neighbors):
    # Euclidean distance from the query point to every training instance.
    sample_count = reference_matrix.shape[0]
    distance_matrix = np.tile(query_point, (sample_count, 1)) - reference_matrix
    squared_differences = np.sum(distance_matrix ** 2, axis=1)
    euclidean_distances = np.sqrt(squared_differences)
    # Rank neighbors by proximity and tally the labels of the k closest.
    sorted_indices = np.argsort(euclidean_distances)
    neighbor_votes = Counter()
    for i in range(k_neighbors):
        closest_label = reference_labels[sorted_indices[i]]
        neighbor_votes[closest_label] += 1
    return neighbor_votes.most_common(1)[0][0]


def run_basic_test():
    features, classes = load_reference_samples()
    sample_a = np.array([1.2, 1.1])
    result_a = predict_label(sample_a, features, classes, 3)
    sample_b = np.array([0.1, 0.3])
    result_b = predict_label(sample_b, features, classes, 3)
    print(f"Query {sample_a} classified as: {result_a}")
    print(f"Query {sample_b} classified as: {result_b}")


if __name__ == '__main__':
    run_basic_test()
In practical applications, feature scaling is critical when attributes possess varying ranges. Normalization maps all values to a consistent interval, typically [0, 1], preventing high-magnitude features from dominating the distance calculations.
def normalize_dataset(raw_matrix):
    # Per-column min-max scaling to the [0, 1] interval.
    minimums = raw_matrix.min(axis=0)
    maximums = raw_matrix.max(axis=0)
    ranges = maximums - minimums
    row_count = raw_matrix.shape[0]
    scaled_matrix = raw_matrix - np.tile(minimums, (row_count, 1))
    normalized = scaled_matrix / np.tile(ranges, (row_count, 1))
    # Ranges and minimums are returned so future queries can be scaled identically.
    return normalized, ranges, minimums
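A query point must be scaled with the minimums and ranges computed from the training data, never with its own statistics. The following sketch combines normalization with prediction; the feature values are invented to illustrate deliberately mismatched scales, and the two helpers are condensed (broadcasting-based) variants of the functions above:

```python
import numpy as np
from collections import Counter

def normalize_dataset(raw_matrix):
    # Condensed min-max scaler; relies on NumPy broadcasting instead of np.tile.
    minimums = raw_matrix.min(axis=0)
    ranges = raw_matrix.max(axis=0) - minimums
    return (raw_matrix - minimums) / ranges, ranges, minimums

def predict_label(query_point, reference_matrix, reference_labels, k_neighbors):
    # Euclidean distances, proximity ranking, then majority vote over k neighbors.
    distances = np.sqrt(np.sum((reference_matrix - query_point) ** 2, axis=1))
    order = np.argsort(distances)
    votes = Counter(reference_labels[i] for i in order[:k_neighbors])
    return votes.most_common(1)[0][0]

# Illustrative data: the second feature's magnitude would otherwise dominate.
raw_features = np.array([[0.8, 400.0], [1.2, 1100.0], [0.1, 50.0], [0.3, 120.0]])
labels = ['ClassAlpha', 'ClassAlpha', 'ClassBeta', 'ClassBeta']

normalized, ranges, minimums = normalize_dataset(raw_features)

# Scale the query with the *training* statistics before computing distances.
query = np.array([0.9, 500.0])
scaled_query = (query - minimums) / ranges
print(predict_label(scaled_query, normalized, labels, 3))
```

Note that a zero range (a constant column) would cause division by zero here; in practice such features carry no distance information and are usually dropped beforehand.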
Decision trees utilize a hierarchical, rule-based structure to recursively partition the feature space. Each internal node evaluates a specific attribute, outgoing branches represent attribute values, and terminal leaf nodes deliver class predictions. The ID3 variant guides splits using information theory, selecting attributes that maximize information gain and thereby minimize the entropy of the resulting subsets. Shannon entropy measures the impurity of a dataset using the formula: H(S) = -Σ p(x) log₂ p(x), where p(x) is the proportion of instances belonging to class x. While decision trees are computationally efficient and produce highly interpretable models, they are prone to overfitting on noisy data. They handle both nominal data and, after discretization, continuous data effectively.
import math
from collections import Counter


def compute_entropy(dataset):
    # Shannon entropy over the class labels stored in the last column of each row.
    total_records = len(dataset)
    class_frequencies = Counter(row[-1] for row in dataset)
    entropy_value = 0.0
    for count in class_frequencies.values():
        probability = count / total_records
        if probability > 0:
            entropy_value -= probability * math.log2(probability)
    return entropy_value
def partition_data(dataset, feature_idx, split_value):
    # Keep rows matching split_value and drop the consumed feature column.
    subsets = []
    for row in dataset:
        if row[feature_idx] == split_value:
            reduced_row = row[:feature_idx] + row[feature_idx + 1:]
            subsets.append(reduced_row)
    return subsets
def select_optimal_feature(dataset):
    # Choose the feature whose split yields the largest information gain.
    base_entropy = compute_entropy(dataset)
    feature_count = len(dataset[0]) - 1
    max_gain = 0.0
    best_index = -1
    for idx in range(feature_count):
        column_values = [row[idx] for row in dataset]
        unique_vals = set(column_values)
        weighted_entropy = 0.0
        for val in unique_vals:
            subset = partition_data(dataset, idx, val)
            proportion = len(subset) / len(dataset)
            weighted_entropy += proportion * compute_entropy(subset)
        info_gain = base_entropy - weighted_entropy
        if info_gain > max_gain:
            max_gain = info_gain
            best_index = idx
    return best_index
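The selection criterion can be sanity-checked by hand on a tiny dataset. The sketch below uses five invented rows (two binary features plus a class label) and recomputes the base and weighted entropies directly, with a compact helper standing in for compute_entropy:

```python
import math
from collections import Counter

def entropy(rows):
    # Shannon entropy over the class labels in the last column.
    total = len(rows)
    return -sum((c / total) * math.log2(c / total)
                for c in Counter(r[-1] for r in rows).values())

# Invented toy dataset: [feature_0, feature_1, class_label].
data = [[1, 1, 'yes'], [1, 1, 'yes'], [1, 0, 'no'], [0, 1, 'no'], [0, 1, 'no']]

base = entropy(data)  # 2 'yes' vs 3 'no'
gains = []
for idx in range(2):
    weighted = sum(
        (len(subset) / len(data)) * entropy(subset)
        for val in {r[idx] for r in data}
        for subset in [[r for r in data if r[idx] == val]]
    )
    gains.append(base - weighted)
    print(f"feature {idx}: gain = {base - weighted:.3f}")
```

Feature 0 separates the classes more cleanly and therefore has the higher gain, so ID3 would split on it first.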
Recursive tree construction proceeds by repeatedly applying the optimal feature selection until a stopping condition is met: all instances in a subset share the same class label, or no features remain for splitting. When features are exhausted but class labels still vary, majority voting determines the leaf node's prediction.
def resolve_ties(class_list):
    # Majority vote among the remaining class labels.
    return Counter(class_list).most_common(1)[0][0]


def construct_tree(dataset, feature_names):
    labels = [row[-1] for row in dataset]
    # Stopping condition 1: the subset is pure.
    if labels.count(labels[0]) == len(labels):
        return labels[0]
    # Stopping condition 2: no features remain; fall back to majority vote.
    if len(dataset[0]) == 1:
        return resolve_ties(labels)
    optimal_feature = select_optimal_feature(dataset)
    feature_label = feature_names[optimal_feature]
    tree_structure = {feature_label: {}}
    # Drop the consumed feature without mutating the caller's list.
    remaining_names = (feature_names[:optimal_feature]
                      + feature_names[optimal_feature + 1:])
    column_values = [row[optimal_feature] for row in dataset]
    for value in set(column_values):
        subset = partition_data(dataset, optimal_feature, value)
        tree_structure[feature_label][value] = construct_tree(subset, remaining_names)
    return tree_structure
def classify_instance(tree, feature_names, test_vector):
    # The root key names the feature tested at this node.
    root_feature = next(iter(tree))
    feature_idx = feature_names.index(root_feature)
    branches = tree[root_feature]
    for branch_val, sub_tree in branches.items():
        if test_vector[feature_idx] == branch_val:
            if isinstance(sub_tree, dict):
                # Internal node: descend into the matching branch.
                return classify_instance(sub_tree, feature_names, test_vector)
            else:
                # Leaf node: return the stored class label.
                return sub_tree
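The nested-dictionary format can be exercised without rerunning tree construction. The sketch below hand-writes a hypothetical tree in the shape construct_tree returns (the feature names has_fins and lays_eggs are invented for illustration) and traverses it with a condensed classifier; note the explicit None return for feature values never seen during training:

```python
# Hypothetical tree in the dict-of-dicts form construct_tree returns:
# the root key is a feature name, each branch key is a feature value,
# and each branch maps to a sub-tree (dict) or a class label (leaf).
sample_tree = {'has_fins': {0: 'ClassBeta',
                            1: {'lays_eggs': {0: 'ClassBeta',
                                              1: 'ClassAlpha'}}}}

def classify_instance(tree, feature_names, test_vector):
    root_feature = next(iter(tree))
    feature_idx = feature_names.index(root_feature)
    for branch_val, sub_tree in tree[root_feature].items():
        if test_vector[feature_idx] == branch_val:
            if isinstance(sub_tree, dict):
                return classify_instance(sub_tree, feature_names, test_vector)
            return sub_tree
    return None  # feature value unseen during training

names = ['has_fins', 'lays_eggs']
print(classify_instance(sample_tree, names, [1, 1]))  # -> ClassAlpha
print(classify_instance(sample_tree, names, [1, 0]))  # -> ClassBeta
```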
To optimize deployment performance, trained tree structures can be serialized using standard binary serialization libraries. This bypasses reconstruction overhead and allows rapid model loading during inference.
import pickle


def persist_model(tree_obj, file_path):
    # Write the nested-dict tree to disk in binary form.
    with open(file_path, 'wb') as model_file:
        pickle.dump(tree_obj, model_file)


def load_model(file_path):
    with open(file_path, 'rb') as model_file:
        return pickle.load(model_file)
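A quick round trip through a temporary file confirms that a serialized tree survives intact; the tree value below is a hypothetical stand-in for a trained model:

```python
import os
import pickle
import tempfile

def persist_model(tree_obj, file_path):
    with open(file_path, 'wb') as model_file:
        pickle.dump(tree_obj, model_file)

def load_model(file_path):
    with open(file_path, 'rb') as model_file:
        return pickle.load(model_file)

# Hypothetical trained tree in the nested-dict format used above.
tree = {'has_fins': {0: 'ClassBeta', 1: 'ClassAlpha'}}
path = os.path.join(tempfile.mkdtemp(), 'tree.model')

persist_model(tree, path)
restored = load_model(path)
assert restored == tree  # round trip preserves the structure exactly
```

Since pickle executes arbitrary code on load, persisted models should only ever be loaded from trusted sources.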