Building a CBOW Model from Scratch with Negative Sampling
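The script below does everything end to end with plain numpy and pandas: it reads a .docx corpus, builds a vocabulary and a 3/4-power negative-sampling distribution, trains CBOW embeddings with negative sampling, and then checks the result with centre-word prediction accuracy and cosine similarity.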

import math
import random
import re

import numpy as np
import pandas as pd
from docx import Document  # provided by the python-docx package

random.seed(0)
pd.options.display.max_rows = None

# --------------------------- Configuration parameters ---------------------------
doc_path = r"simple_word.docx"
learning_rate = 0.01
embedding_dim = 10
total_epochs = 50
window_size = 4  # must be even; actual left/right context = window_size // 2

# --------------------------- Load and tokenise corpus ---------------------------
doc = Document(doc_path)
raw_text = ''
sentences = []
token_pattern = re.compile(r"[ !?.。,,()]")  # split on spaces and punctuation (incl. CJK)
for paragraph in doc.paragraphs:
    text = paragraph.text
    if not text:
        continue
    tokens = [token for token in token_pattern.split(text) if token]
    sentences.append(tokens)
    raw_text += text + ' '  # separator so words at paragraph boundaries don't fuse

token_sequence = [token for token in token_pattern.split(raw_text) if token]
vocabulary = set(token_sequence)
vocab_size = len(vocabulary)

# ----------- Word frequencies raised to the 3/4 power (negative-sampling weights) -----------
freq_series = pd.Series(token_sequence).value_counts()
smoothed_prob = {}
for word, count in freq_series.items():
    smoothed_prob[word] = count ** (3 / 4)  # unnormalised weight, as in word2vec
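
# Quick illustrative check: the 3/4 power flattens the raw counts, so rare
# words are sampled relatively more often as negatives (a 16:1 count ratio
# becomes 8:1, since 16 ** 0.75 == 8).
print("Top negative-sampling weights:")
print(pd.Series(smoothed_prob).sort_values(ascending=False).head())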

# ----------------- Build one-hot encoding for each word -----------------
word_to_onehot = {}
for idx, word in enumerate(vocabulary):
    vec = [0] * vocab_size
    vec[idx] = 1
    word_to_onehot[word] = vec
vocab_onehot_df = pd.DataFrame(word_to_onehot)  # for exposition; not used in training below
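
# Illustrative check: each one-hot column contains exactly one 1.
# >>> vocab_onehot_df.sum(axis=0).unique()
# array([1])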

# ----------------- Initialise word vectors (W) and output vectors (u) -----------------
word_vecs = {}
for word in vocabulary:
    word_vecs[word] = [random.random() for _ in range(embedding_dim)]
W_df = pd.DataFrame(word_vecs)

output_vecs = {}
for word in vocabulary:
    output_vecs[word] = [1.0 for _ in range(embedding_dim)]
u_df = pd.DataFrame(output_vecs)

print("Initial word embedding matrix W:")
print(W_df)
print("Initial output parameter matrix u:")
print(u_df)
# ----------------- Extract all training samples (target, context) -----------------
half_window = window_size // 2
training_samples = []

for sentence in sentences:
    for pos, target_word in enumerate(sentence):
        left_context = sentence[max(0, pos - half_window):pos]
        right_context = sentence[pos + 1:pos + 1 + half_window]
        context_words = left_context + right_context
        if not context_words:
            continue  # a one-word sentence gives the target no context

        # The hidden vector h (the sum of the context embeddings) depends on W,
        # which changes every step, so it is recomputed inside the training loop
        # rather than cached here.
        training_samples.append((target_word, context_words))
# --------------------------- Helper functions ---------------------------
def sigmoid_product(vec_a, vec_b):
    """sigmoid of dot product between vec_a and vec_b"""
    dot_prod = np.dot(vec_a, vec_b)
    # Prevent overflow
    if dot_prod > 700:
        dot_prod = 700
    elif dot_prod < -700:
        dot_prod = -700
    return 1.0 / (1.0 + math.exp(-dot_prod))
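
# Example (illustrative): orthogonal vectors have a dot product of 0,
# and sigmoid(0) = 0.5.
# >>> sigmoid_product([1, 0], [0, 1])
# 0.5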


def draw_negative_samples(num_negatives=6):
    """
    Draw negative words using 3/4-power frequency distribution.
    Cumulative distribution is built once and reused.
    """
    if not hasattr(draw_negative_samples, "cdf_map"):
        total_mass = sum(smoothed_prob.values())
        cdf = 0.0
        draw_negative_samples.cdf_map = []
        for w, prob in smoothed_prob.items():
            cdf += prob / total_mass * 100
            draw_negative_samples.cdf_map.append((w, cdf))
        draw_negative_samples.max_val = cdf

    negatives = []
    for _ in range(num_negatives):
        rand_val = random.uniform(0, draw_negative_samples.max_val)
        for w, limit in draw_negative_samples.cdf_map:
            if rand_val < limit:
                negatives.append(w)
                break
        else:
            negatives.append(w)
    return negatives
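
# Example (illustrative): a draw is just a list of words, with frequent words
# dominating. A draw can also include the current target word by chance;
# common word2vec implementations tolerate this.
# >>> draw_negative_samples(3)
# ['the', 'of', 'model']   # hypothetical output; depends on corpus and RNG state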


def softmax(query_vec, target_word, candidate_dict):
    """
    Softmax probability of target_word given query vector query_vec,
    normalised over all candidate vectors.
    candidate_dict: {word: vector}
    """
    numerator = math.exp(np.dot(query_vec, candidate_dict[target_word]))
    denominator = sum(math.exp(np.dot(query_vec, candidate_dict[word]))
                      for word in candidate_dict)
    return numerator / denominator
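
# Example (illustrative): before training, every entry of output_vecs is the
# same all-ones vector, so the softmax is uniform and returns 1 / vocab_size
# for any word (some_word below is a placeholder).
# >>> softmax([0.1] * embedding_dim, some_word, output_vecs)
# 1.0 / vocab_size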
# ============================= Training with Negative Sampling =============================
# Strategy: for each (target, context) sample, draw negative words once and use
# the same draw for both the output-vector (u) and word-vector (W) updates.
# With error e_j = label_j - sigmoid(u_j . h), the updates are
#   u_j <- u_j + lr * e_j * h              for the target and each negative word j
#   W_c <- W_c + lr * sum_j (e_j * u_j)    for each context word c

for epoch in range(total_epochs):
    print(f"Epoch {epoch + 1}/{total_epochs}")

    for target, context_words in training_samples:
        # Hidden vector h: sum of the *current* context word embeddings
        h_vec = np.array(W_df[context_words].sum(axis=1))

        negative_candidates = draw_negative_samples(6)
        combined_words = negative_candidates + [target]  # ensures the positive sample is included

        # -------- Update output vectors u and accumulate the error signal for W --------
        w_gradient_accum = np.zeros(embedding_dim)

        for word in combined_words:
            label = 1 if word == target else 0
            u_word = np.array(u_df[word])
            error = label - sigmoid_product(u_word, h_vec)
            w_gradient_accum += learning_rate * error * u_word
            u_df[word] = u_word + learning_rate * error * h_vec

        # -------- Update word vectors W for the context words --------
        for context_word in context_words:
            W_df[context_word] = np.array(W_df[context_word]) + w_gradient_accum

print("Training finished.")
print("Updated W:")
print(W_df)
# ============================= Predict center word given context =============================
def predict_center(h_query, output_embedding_df):
    """Return the word whose output vector scores highest against h_query
    (the argmax of the softmax, since softmax is monotonic in the dot product)."""
    best_word = None
    best_score = -math.inf  # dot products can be negative, so don't start at -1
    for candidate in output_embedding_df.columns:
        score = np.dot(h_query, output_embedding_df[candidate])
        if score > best_score:
            best_score = score
            best_word = candidate
    return best_word
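
# Example (illustrative): predict the centre word for one stored sample.
# >>> sample_target, sample_context = training_samples[0]
# >>> predict_center(np.array(W_df[sample_context].sum(axis=1)), u_df)
# ideally equal to sample_target once training has converged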


correct_count = 0
for target, context_words in training_samples:
    h_vec = np.array(W_df[context_words].sum(axis=1))
    predicted = predict_center(h_vec, u_df)
    if predicted == target:
        correct_count += 1

accuracy = correct_count / len(training_samples) * 100
print(f"Prediction accuracy: {accuracy:.2f}%")
# ===================== Compute cosine similarity for similar words =====================
def cosine_similarity(vec_a, vec_b):
    dot = np.dot(vec_a, vec_b)
    norm_a = np.linalg.norm(vec_a)
    norm_b = np.linalg.norm(vec_b)
    return dot / (norm_a * norm_b) if norm_a != 0 and norm_b != 0 else 0.0
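
# Example (illustrative): parallel vectors score 1.0, orthogonal vectors 0.0.
# >>> cosine_similarity([1, 2], [2, 4])
# 1.0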


query_word = "different"  # example query; must appear in the corpus
query_vec = W_df[query_word]
similarity_scores = {}

for word in W_df.columns:
    similarity_scores[word] = cosine_similarity(query_vec, W_df[word])

similarity_series = pd.Series(similarity_scores).sort_values(ascending=False)  # most similar first
print(f"Cosine similarities with '{query_word}':")
print(similarity_series)
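
For reference, each step of the training loop above is stochastic gradient ascent on the standard word2vec negative-sampling objective; a sketch in my own notation, where $t$ is the target word, $\mathcal{N}$ the set of drawn negatives, and $h$ the hidden vector:

$$
\log \sigma\left(u_t^{\top} h\right) + \sum_{j \in \mathcal{N}} \log \sigma\left(-u_j^{\top} h\right),
\qquad h = \sum_{c \in \text{context}} W_c
$$

With labels $t_j \in \{0, 1\}$ and error $e_j = t_j - \sigma(u_j^{\top} h)$, the gradient is $e_j \, h$ for each output vector $u_j$ and $\sum_j e_j \, u_j$ for every context embedding $W_c$, which is exactly what the loop accumulates, scaled by the learning rate.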
