Implementing a Stock Trend Classifier with PySpark and Random Forest
Objective Definition
The goal of this implementation is to predict the directional movement of a stock's closing price based on daily financial metrics. The target variable is binary: a label of 1 indicates a non-negative price change (Change >= 0), while 0 indicates a negative change (Change < 0).
The input features used for prediction include:
- Daily Highest Price (High)
- Daily Lowest Price (Low)
- Turnover Rate (Turnover_Rate)
- Trading Volume (Volume)
- Day of the Week (to capture weekly seasonality)
- Relationship between the short-term and long-term moving averages (MA_Status, derived from MA5 vs. MA10)
Environment Setup and Imports
Initialize the necessary PySpark modules for SQL operations and machine learning pipelines. We distinguish between the legacy RDD-based API (MLlib) and the modern DataFrame-based API (Spark ML). This implementation utilizes the Spark ML API, which is optimized for structured data and distributed training.
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import DateType, IntegerType, DoubleType
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml import Pipeline
Initializing Spark Session
Create a SparkSession to handle the application context. In a production environment, the master URL would point to a cluster manager; here, we use local threads for demonstration.
spark_session = SparkSession.builder \
    .master("local[*]") \
    .appName("FinancialTrendAnalysis") \
    .getOrCreate()
Data Ingestion and Schema Enforcement
Load historical stock data from a CSV source. It is critical to enforce explicit data types to prevent type mismatch errors during subsequent calculations.
file_path = "file:///path/to/stock_data.csv"
# Load data with headers
df_raw = spark_session.read.option("header", "true").option("encoding", "utf-8").csv(file_path)
# Cast columns to appropriate types
df_typed = df_raw \
    .withColumn("Date", col("Date").cast(DateType())) \
    .withColumn("Open", col("Open").cast(DoubleType())) \
    .withColumn("High", col("High").cast(DoubleType())) \
    .withColumn("Low", col("Low").cast(DoubleType())) \
    .withColumn("Close", col("Close").cast(DoubleType())) \
    .withColumn("Pre_Close", col("Pre_Close").cast(DoubleType())) \
    .withColumn("Change", col("Change").cast(DoubleType())) \
    .withColumn("Turnover_Rate", col("Turnover_Rate").cast(DoubleType())) \
    .withColumn("Volume", col("Volume").cast(IntegerType())) \
    .withColumn("MA5", col("MA5").cast(DoubleType())) \
    .withColumn("MA10", col("MA10").cast(DoubleType()))
df_typed.printSchema()
Feature Engineering via RDD Transformations
While DataFrames are preferred, certain complex row-level logic may be easier to implement using RDD transformations before converting back to a DataFrame. We define helper functions to calculate derived features.
def extend_row_with_data(original_row, new_field, value):
    """Adds a new field to a Row object and returns a new Row."""
    row_dict = original_row.asDict()
    row_dict[new_field] = value
    return Row(**row_dict)  # Row is imported from pyspark.sql above
def get_trend_direction(change_rate):
    """Encodes the price direction: 1 for Up/Flat, 0 for Down."""
    return 1 if change_rate >= 0.0 else 0

def get_ma_status(short_ma, long_ma):
    """Encodes the relationship between moving averages."""
    if short_ma > long_ma:
        return 1
    if short_ma == long_ma:
        return 0
    return -1

def get_weekday_index(date_obj):
    """Returns the weekday index (0=Monday, 6=Sunday)."""
    return date_obj.weekday()
Apply these transformations to the underlying RDD. This step adds features for the day of the week, price trend, and moving average status.
# Convert DataFrame to RDD for custom transformations
rdd_transformed = df_typed.rdd
# Apply mapping functions
rdd_enriched = rdd_transformed.map(
    lambda r: extend_row_with_data(r, "Weekday_Idx", get_weekday_index(r["Date"]))
)
rdd_enriched = rdd_enriched.map(
    lambda r: extend_row_with_data(r, "Trend_Label", get_trend_direction(r["Change"]))
)
rdd_enriched = rdd_enriched.map(
    lambda r: extend_row_with_data(r, "MA_Status", get_ma_status(r["MA5"], r["MA10"]))
)
# Convert back to DataFrame
df_features = spark_session.createDataFrame(rdd_enriched)
df_features.select("Date", "Close", "Change", "Trend_Label", "MA_Status").show(5)
Feature Preparation for Machine Learning
Spark ML algorithms require features to be contained in a single vector column. We use VectorAssembler to combine the selected input columns. Additionally, we rename the target column to label for standard ML compatibility.
# Define the feature columns
feature_columns = ["High", "Low", "Turnover_Rate", "Volume", "Weekday_Idx", "MA_Status"]
# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features_vec")
# Rename target column to the default name expected by Spark ML
df_model = df_features.withColumnRenamed("Trend_Label", "label")
# Transform the data
df_assembled = assembler.transform(df_model)
df_assembled.select("features_vec", "label").show(5, truncate=False)
Model Training and Evaluation
Split the dataset into training and testing subsets, then chain the assembler and the classifier in a Pipeline to streamline the workflow. The split is performed on the pre-assembly DataFrame: the pipeline's assembler stage adds the features_vec column itself and would fail if the column already existed. Note also that a random split ignores time ordering; for a stricter backtest on market data, a chronological split by date is preferable.
# Split data: 80% training, 20% testing (before assembly; the pipeline
# re-runs the assembler on each subset)
train_data, test_data = df_model.randomSplit([0.8, 0.2], seed=42)
# Initialize Random Forest Classifier
rf_classifier = RandomForestClassifier(
    labelCol="label",
    featuresCol="features_vec",
    numTrees=20,  # Number of trees in the forest
    maxDepth=5    # Maximum depth of each tree
)
# Create and execute the pipeline
pipeline = Pipeline(stages=[assembler, rf_classifier])
model = pipeline.fit(train_data)
Prediction and Performance Metrics
Apply the trained model to the test set to generate predictions. We evaluate the performance using the Area Under the ROC Curve (AUC), a standard metric for binary classification.
# Generate predictions
predictions = model.transform(test_data)
# Display sample predictions
predictions.select("Date", "label", "prediction", "probability").show(10)
# Evaluate model performance using AUC
evaluator = BinaryClassificationEvaluator(labelCol="label", metricName="areaUnderROC")
auc_score = evaluator.evaluate(predictions)
print(f"Area Under ROC (AUC): {auc_score}")
# Stop the session
spark_session.stop()