Implementing a Stock Trend Classifier with PySpark and Random Forest

Objective Definition

The goal of this implementation is to predict the directional movement of a stock's closing price based on daily financial metrics. The target variable is binary: a label of 1 indicates a non-negative price change (Change >= 0), while 0 indicates a negative change (Change < 0).

The input features used for prediction include:

  • Daily Highest Price (High)
  • Daily Lowest Price (Low)
  • Turnover Rate (Turnover_Rate)
  • Trading Volume (Volume)
  • Day of the Week (to capture weekly seasonality)
  • Relationship between Short-term and Long-term Moving Averages (MA5 vs MA10)

Environment Setup and Imports

Initialize the necessary PySpark modules for SQL operations and machine learning pipelines. PySpark exposes two machine learning APIs: the legacy RDD-based MLlib (pyspark.mllib) and the DataFrame-based Spark ML (pyspark.ml). This implementation uses the Spark ML API, which operates directly on DataFrames, benefits from the Catalyst optimizer, and supports Pipelines for distributed training.

from pyspark.sql import SparkSession, Row
from pyspark.sql.types import DateType, IntegerType, DoubleType
from pyspark.sql.functions import col, dayofweek

from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline

Initializing Spark Session

Create a SparkSession to handle the application context. In a production environment, the master URL would point to a cluster manager; here, we use local threads for demonstration.

spark_session = SparkSession.builder \
    .master("local[*]") \
    .appName("FinancialTrendAnalysis") \
    .getOrCreate()
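For comparison, here is a minimal sketch of what the builder might look like when pointing at a cluster manager instead of local threads. The master URL, memory, and core settings below are placeholders for illustration, not values taken from this project.

# Hypothetical cluster configuration; adjust the master URL and resource
# settings to match your environment.
spark_session = SparkSession.builder \
    .master("spark://cluster-manager-host:7077") \
    .appName("FinancialTrendAnalysis") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "2") \
    .getOrCreate()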

Data Ingestion and Schema Enforcement

Load historical stock data from a CSV source. It is critical to enforce explicit data types to prevent type mismatch errors during subsequent calculations.

file_path = "file:///path/to/stock_data.csv"

# Load data with headers
df_raw = spark_session.read.option("header", "true").option("encoding", "utf-8").csv(file_path)

# Cast columns to appropriate types
df_typed = df_raw \
    .withColumn("Date", col("Date").cast(DateType())) \
    .withColumn("Open", col("Open").cast(DoubleType())) \
    .withColumn("High", col("High").cast(DoubleType())) \
    .withColumn("Low", col("Low").cast(DoubleType())) \
    .withColumn("Close", col("Close").cast(DoubleType())) \
    .withColumn("Pre_Close", col("Pre_Close").cast(DoubleType())) \
    .withColumn("Change", col("Change").cast(DoubleType())) \
    .withColumn("Turnover_Rate", col("Turnover_Rate").cast(DoubleType())) \
    .withColumn("Volume", col("Volume").cast(IntegerType())) \
    .withColumn("MA5", col("MA5").cast(DoubleType())) \
    .withColumn("MA10", col("MA10").cast(DoubleType()))

df_typed.printSchema()
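Alternatively, the schema can be declared up front and passed to the reader, which avoids the chain of casts. This is a sketch that assumes the CSV columns match the names used above and that the Date column is in the default yyyy-MM-dd format.

from pyspark.sql.types import StructType, StructField

# Explicit schema applied at read time (sketch; column names assumed
# to match the CSV header used in this article).
stock_schema = StructType([
    StructField("Date", DateType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Pre_Close", DoubleType(), True),
    StructField("Change", DoubleType(), True),
    StructField("Turnover_Rate", DoubleType(), True),
    StructField("Volume", IntegerType(), True),
    StructField("MA5", DoubleType(), True),
    StructField("MA10", DoubleType(), True),
])

df_typed = spark_session.read \
    .option("header", "true") \
    .option("encoding", "utf-8") \
    .schema(stock_schema) \
    .csv(file_path)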

Feature Engineering via RDD Transformations

While DataFrames are preferred, certain complex row-level logic may be easier to implement using RDD transformations before converting back to a DataFrame. We define helper functions to calculate derived features.

def extend_row_with_data(original_row, new_field, value):
    """Adds a new field to a Row object and returns a new Row."""
    row_dict = original_row.asDict()
    row_dict[new_field] = value
    return Row(**row_dict)

def get_trend_direction(change_rate):
    """Encodes the price direction: 1 for Up/Flat, 0 for Down."""
    return 1 if change_rate >= 0.0 else 0

def get_ma_status(short_ma, long_ma):
    """Encodes the relationship between moving averages."""
    if short_ma > long_ma: return 1
    if short_ma == long_ma: return 0
    return -1

def get_weekday_index(date_obj):
    """Returns the weekday index (0=Monday, 6=Sunday)."""
    return date_obj.weekday()

Apply these transformations to the underlying RDD. This step adds features for the day of the week, price trend, and moving average status.

# Convert DataFrame to RDD for custom transformations
rdd_transformed = df_typed.rdd

# Apply mapping functions
rdd_enriched = rdd_transformed.map(
    lambda r: extend_row_with_data(r, "Weekday_Idx", get_weekday_index(r["Date"]))
)

rdd_enriched = rdd_enriched.map(
    lambda r: extend_row_with_data(r, "Trend_Label", get_trend_direction(r["Change"]))
)

rdd_enriched = rdd_enriched.map(
    lambda r: extend_row_with_data(r, "MA_Status", get_ma_status(r["MA5"], r["MA10"]))
)

# Convert back to DataFrame
df_features = spark_session.createDataFrame(rdd_enriched)

df_features.select("Date", "Close", "Change", "Trend_Label", "MA_Status").show(5)
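The same three features could also be derived without leaving the DataFrame API. The sketch below uses built-in column functions; note that Spark's dayofweek returns 1 for Sunday through 7 for Saturday, so it is shifted to match the 0=Monday convention used above. The variable df_features_alt is just an illustrative name.

from pyspark.sql.functions import when  # col and dayofweek are imported above

# DataFrame-only alternative to the RDD transformations (sketch).
df_features_alt = df_typed \
    .withColumn("Weekday_Idx", (dayofweek(col("Date")) + 5) % 7) \
    .withColumn("Trend_Label", when(col("Change") >= 0, 1).otherwise(0)) \
    .withColumn("MA_Status",
                when(col("MA5") > col("MA10"), 1)
                .when(col("MA5") == col("MA10"), 0)
                .otherwise(-1))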

Feature Preparation for Machine Learning

Spark ML algorithms require features to be contained in a single vector column. We use VectorAssembler to combine the selected input columns. Additionally, we rename the target column to label, the default column name expected by Spark ML estimators.

# Define the feature columns
feature_columns = ["High", "Low", "Turnover_Rate", "Volume", "Weekday_Idx", "MA_Status"]

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_vec")

# Rename target column to the default name expected by Spark ML
df_model = df_features.withColumnRenamed("Trend_Label", "label")

# Transform the data
df_assembled = assembler.transform(df_model)

df_assembled.select("features_vec", "label").show(5, truncate=False)
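A practical note: if the source data happens to contain nulls (for example, missing moving-average values on the earliest trading days), VectorAssembler raises an error by default. One option in that case is to skip such rows:

# Optional: drop rows containing invalid (null/NaN) feature values
# instead of failing (the default handleInvalid setting is "error").
assembler = VectorAssembler(
    inputCols=feature_columns,
    outputCol="features_vec",
    handleInvalid="skip"
)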

Model Training and Evaluation

Split the dataset into training and testing subsets, and use a Pipeline to streamline the workflow by chaining the assembler and the classifier. Note that the split is applied to df_model (before assembly): the assembler runs as the first pipeline stage, and transforming data that already contains a features_vec column would fail because the output column already exists.

# Split data: 80% training, 20% testing (the pipeline's assembler stage handles feature assembly)
train_data, test_data = df_model.randomSplit([0.8, 0.2], seed=42)

# Initialize Random Forest Classifier
rf_classifier = RandomForestClassifier(
    labelCol="label", 
    featuresCol="features_vec", 
    numTrees=20,  # Number of trees in the forest
    maxDepth=5
)

# Create and execute the pipeline
pipeline = Pipeline(stages=[assembler, rf_classifier])
model = pipeline.fit(train_data)
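Once the pipeline is fitted, the trained Random Forest stage can be inspected. For instance, its feature importances give a rough sense of which inputs drive the splits; the short sketch below reuses the feature_columns list defined earlier.

# Inspect the trained Random Forest (last pipeline stage).
rf_model = model.stages[-1]

# featureImportances is aligned with the assembler's input column order.
importances = rf_model.featureImportances.toArray()
for name, importance in zip(feature_columns, importances):
    print(f"{name}: {importance:.4f}")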

Prediction and Performance Metrics

Apply the trained model to the test set to generate predictions. We evaluate the performance using the Area Under the ROC Curve (AUC), a standard metric for binary classification.

# Generate predictions
predictions = model.transform(test_data)

# Display sample predictions
predictions.select("Date", "label", "prediction", "probability").show(10)

# Evaluate model performance using AUC
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc_score = evaluator.evaluate(predictions)

print(f"Area Under ROC (AUC): {auc_score}")

# Stop the session
spark_session.stop()
