import math
import numpy as np
import pandas as pd
import random
from docx import Document
import re
random.seed(0)
pd.options.display.max_rows = None
# --------------------------- Configuration parameters ---------------------------
doc_path = r"simple_word.docx"
learning_rate = 0.01
embedding_dim = 10
total_epochs = 50
window_size = 4  # total context span; each side gets window_size // 2 words
# --------------------------- Load and tokenise corpus ---------------------------
doc = Document(doc_path)
# Shared tokenisation pattern: split on spaces plus ASCII and full-width (CJK) punctuation.
SPLIT_PATTERN = r' |!|\?|\.|。|,|,|\(|\)'
raw_text = ''
sentences = []
for paragraph in doc.paragraphs:
text = paragraph.text
if len(text) == 0:
continue
    tokens = [token for token in re.split(SPLIT_PATTERN, text) if token]
sentences.append(tokens)
    raw_text += text + ' '  # trailing space keeps tokens from merging across paragraph boundaries
token_sequence = [token for token in re.split(SPLIT_PATTERN, raw_text) if token]
vocabulary = set(token_sequence)
vocab_size = len(vocabulary)
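# Quick check (added, not part of the original script): report corpus size so
# the scale of the vocabulary is visible before training starts.
print(f"Corpus: {len(token_sequence)} tokens, {vocab_size} unique words")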
# ----------------- Word frequency & 3/4-power subsampling probabilities -----------------
freq_series = pd.Series(token_sequence).value_counts()  # avoids the deprecated top-level pd.value_counts
smoothed_prob = {}
for word, count in freq_series.items():
smoothed_prob[word] = count ** (3 / 4)
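# Optional sanity check (added): the 3/4 power flattens the frequency
# distribution, giving rare words relatively more sampling mass. A word seen
# 100 times gets 100**0.75 ≈ 31.6 mass versus 10**0.75 ≈ 5.6 for one seen 10
# times, a ratio of ~5.6 instead of the raw 10.
total_mass_check = sum(smoothed_prob.values())
top_word = freq_series.index[0]  # value_counts() sorts descending, so this is the most frequent word
print(f"Top word '{top_word}': raw share {freq_series.iloc[0] / len(token_sequence):.4f}, "
      f"smoothed share {smoothed_prob[top_word] / total_mass_check:.4f}")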
# ----------------- Build one-hot encoding for each word -----------------
word_to_onehot = {}
for idx, word in enumerate(vocabulary):
vec = [0] * vocab_size
vec[idx] = 1
word_to_onehot[word] = vec
vocab_onehot_df = pd.DataFrame(word_to_onehot)
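# Sanity check (added): each one-hot column should be a unit basis vector, so
# the lookup DataFrame is a square vocab_size x vocab_size matrix.
assert all(sum(vec) == 1 for vec in word_to_onehot.values())
assert vocab_onehot_df.shape == (vocab_size, vocab_size)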
# ----------------- Initialise word vectors (W) and output vectors (u) -----------------
word_vecs = {}
for word in vocabulary:
word_vecs[word] = [random.random() for _ in range(embedding_dim)]
W_df = pd.DataFrame(word_vecs)
output_vecs = {}
for word in vocabulary:
    output_vecs[word] = [1.0 for _ in range(embedding_dim)]  # constant init; only W starts randomised
u_df = pd.DataFrame(output_vecs)
print("Initial word embedding matrix W:")
print(W_df)
print("Initial output parameter matrix u:")
print(u_df)
# ----------------- Extract all training samples (target, context) -----------------
half_window = window_size // 2
training_samples = []
context_h_cache = {}
for sentence in sentences:
for pos, target_word in enumerate(sentence):
left_context = sentence[max(0, pos - half_window):pos]
right_context = sentence[pos + 1:pos + 1 + half_window]
context_words = left_context + right_context
# Compute hidden vector h (sum of context word embeddings)
h_vector = list(W_df[context_words].sum(axis=1))
sample = (target_word, h_vector, context_words)
training_samples.append(sample)
        context_h_cache[target_word] = list(h_vector)  # repeat occurrences overwrite; keeps the most recent h
h_lookup = pd.DataFrame(context_h_cache)
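# Note (added): h_lookup stores one h per word -- the h from that word's last
# occurrence as a target -- so it is an approximation when a word appears in
# several different contexts. Quick shape check: rows are embedding dimensions.
assert h_lookup.shape[0] == embedding_dim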
# --------------------------- Helper functions ---------------------------
def sigmoid_product(vec_a, vec_b):
"""sigmoid of dot product between vec_a and vec_b"""
dot_prod = np.dot(vec_a, vec_b)
# Prevent overflow
if dot_prod > 700:
dot_prod = 700
elif dot_prod < -700:
dot_prod = -700
return 1.0 / (1.0 + math.exp(-dot_prod))
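# Usage example (added): orthogonal vectors have a dot product of 0, and the
# sigmoid of 0 is exactly 0.5.
assert sigmoid_product([1.0, 0.0], [0.0, 1.0]) == 0.5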
def draw_negative_samples(num_negatives=6):
"""
Draw negative words using 3/4-power frequency distribution.
Cumulative distribution is built once and reused.
"""
if not hasattr(draw_negative_samples, "cdf_map"):
total_mass = sum(smoothed_prob.values())
cdf = 0.0
draw_negative_samples.cdf_map = []
for w, prob in smoothed_prob.items():
cdf += prob / total_mass * 100
draw_negative_samples.cdf_map.append((w, cdf))
draw_negative_samples.max_val = cdf
negatives = []
for _ in range(num_negatives):
rand_val = random.uniform(0, draw_negative_samples.max_val)
for w, limit in draw_negative_samples.cdf_map:
if rand_val < limit:
negatives.append(w)
break
        else:
            # for/else: float rounding can leave rand_val just above the final
            # CDF entry; fall back to the last word in the table.
            negatives.append(w)
return negatives
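# Usage check (added): draw a batch and confirm every sample is a known word;
# frequent words should dominate under the smoothed distribution.
_batch = draw_negative_samples(100)
assert all(w in smoothed_prob for w in _batch)
print("Most common words in a batch of 100 negative samples:")
print(pd.Series(_batch).value_counts().head())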
def softmax(query_vec, target_word, candidate_dict):
    """
    Softmax probability of target_word given query vector query_vec,
    normalised over every vector in candidate_dict ({word: vector}).
    """
    numerator = math.exp(np.dot(query_vec, candidate_dict[target_word]))
    denominator = sum(math.exp(np.dot(query_vec, candidate_dict[word])) for word in candidate_dict)
    return numerator / denominator
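# Usage example (added): with unit-basis candidates, a query aligned with 'a'
# should make 'a' the more probable word.
_toy_vecs = {'a': [1.0, 0.0], 'b': [0.0, 1.0]}
assert softmax([1.0, 0.0], 'a', _toy_vecs) > softmax([1.0, 0.0], 'b', _toy_vecs)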
# ============================= Training with Negative Sampling =============================
# Strategy: sample negative words once per target word for both u and w updates.
for epoch in range(total_epochs):
print(f"Epoch {epoch + 1}/{total_epochs}")
for target, h_vec, context_words in training_samples:
negative_candidates = draw_negative_samples(6)
combined_words = negative_candidates.copy()
combined_words.append(target) # ensures positive sample is included
# -------- Update output vector u for the current target word --------
u_old = u_df[target]
u_gradient_accum = np.zeros(embedding_dim)
for word in combined_words:
label = 1 if word == target else 0
            h_word = h_lookup[word]  # cached h from this word's most recent occurrence as a target
error = label - sigmoid_product(u_old, h_word)
u_gradient_accum += learning_rate * error * np.array(h_word)
u_df[target] = np.array(u_old) + u_gradient_accum
# -------- Update word vectors W for the context words --------
w_gradient_accum = np.zeros(embedding_dim)
for word in combined_words:
label = 1 if word == target else 0
u_word = u_df[word]
error = label - sigmoid_product(u_word, h_vec)
w_gradient_accum += learning_rate * error * np.array(u_word)
for context_word in context_words:
W_df[context_word] = np.array(W_df[context_word]) + w_gradient_accum
print("Training finished.")
print("Updated W:")
print(W_df)
# ============================= Predict center word given context =============================
def predict_center(h_query, output_embedding_df):
    """Return the word whose output vector has the largest dot product with
    h_query; the softmax denominator is shared, so this is also the softmax argmax."""
    best_word = None
    best_score = -math.inf  # dot products can be negative, so start below any real score
for candidate in output_embedding_df.columns:
score = np.dot(h_query, output_embedding_df[candidate])
if score > best_score:
best_score = score
best_word = candidate
return best_word
correct_count = 0
for target, h_vec, context_words in training_samples:
predicted = predict_center(h_vec, u_df)
if predicted == target:
correct_count += 1
accuracy = correct_count / len(training_samples) * 100
print(f"Prediction accuracy: {accuracy:.2f}%")
# ===================== Compute cosine similarity for similar words =====================
def cosine_similarity(vec_a, vec_b):
dot = np.dot(vec_a, vec_b)
norm_a = np.linalg.norm(vec_a)
norm_b = np.linalg.norm(vec_b)
return dot / (norm_a * norm_b) if norm_a != 0 and norm_b != 0 else 0.0
query_word = "different"  # example query; assumes this word occurs in the corpus
query_vec = W_df[query_word]
similarity_scores = {}
for word in W_df.columns:
similarity_scores[word] = cosine_similarity(query_vec, W_df[word])
similarity_series = pd.Series(similarity_scores).sort_values(ascending=False)  # most similar first
print(f"Cosine similarities with '{query_word}':")
print(similarity_series)