
Time Series Feature Engineering for Electricity Demand Forecasting


Core Feature Categories in Time Series Prediction

Effective time series forecasting relies heavily on feature engineering. The following categories represent essential feature types that capture different temporal patterns in electricity demand data.

1. Temporal Decomposition Features

Date-based features can be decomposed into multiple granularity levels:

# Extract date components from timestamp
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['week'] = df['date'].dt.isocalendar().week
df['day_of_week'] = df['date'].dt.dayofweek
df['day_of_year'] = df['date'].dt.dayofyear
df['hour'] = df['date'].dt.hour

2. Cyclical Encoding

Periodic patterns need an encoding in which the ends of each cycle sit next to each other (hour 23 is adjacent to hour 0); paired sine and cosine transforms capture this repetition:

import numpy as np

def cyclical_encode(df, col, period):
    df[f'{col}_sin'] = np.sin(2 * np.pi * df[col] / period)
    df[f'{col}_cos'] = np.cos(2 * np.pi * df[col] / period)
    return df

# Encode weekly periodicity (period=7)
df = cyclical_encode(df, 'day_of_week', 7)

# Encode daily periodicity (period=24)
df = cyclical_encode(df, 'hour', 24)

3. Trend Extraction

Long-term trends can be captured through multiple approaches:

# Moving average as trend indicator
df['trend_ma7'] = df.groupby('series_id')['value'].transform(
    lambda x: x.rolling(window=7, min_periods=1).mean()
)
df['trend_ma30'] = df.groupby('series_id')['value'].transform(
    lambda x: x.rolling(window=30, min_periods=1).mean()
)

# Linear trend coefficient using least squares
from scipy import stats

def compute_trend_slope(group):
    x = np.arange(len(group))
    slope, _, _, _, _ = stats.linregress(x, group)
    return slope

df['trend_slope'] = df.groupby('series_id')['value'].transform(compute_trend_slope)

Advanced Feature Engineering Strategies

Historical Shift Features (Lag Features)

Lag features capture the relationship between past values and current predictions:

def create_lag_features(df, target_col, id_col, max_lag=35, min_lag=10):
    for lag in range(min_lag, max_lag + 1):
        df[f'{target_col}_lag{lag}'] = df.groupby(id_col)[target_col].shift(lag)
    return df
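
A usage sketch, with a hypothetical 'target' value column and 'id' series key (the same names the difference-feature example below builds on):

# Produces target_lag10 through target_lag35 per series; the 10-step
# minimum lag keeps the features usable over a multi-step forecast horizon
df = create_lag_features(df, 'target', 'id')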

Difference Features

First-order and higher-order differences capture rate of change:

def create_diff_features(df, base_col, id_col, max_order=3):
    for order in range(1, max_order + 1):
        diff_col = f'{base_col}_diff{order}'
        df[diff_col] = df.groupby(id_col)[base_col].diff(order)
    return df

# Usage: creates diff1, diff2, diff3 based on lag10 features
df = create_diff_features(df, 'target_lag10', 'id')

Rolling Window Statistics

Rolling statistics summarize local patterns within time windows:

def create_rolling_stats(df, target_col, id_col, windows=(15, 30, 50, 70)):
    for win in windows:
        rolled = df.groupby(id_col)[target_col].rolling(
            window=win, 
            min_periods=3, 
            closed='left'  # exclude the current row to avoid target leakage
        )
        
        # Drop the group level so each result aligns with the original row index
        df[f'{target_col}_win{win}_mean'] = rolled.mean().reset_index(level=0, drop=True)
        df[f'{target_col}_win{win}_max'] = rolled.max().reset_index(level=0, drop=True)
        df[f'{target_col}_win{win}_min'] = rolled.min().reset_index(level=0, drop=True)
        df[f'{target_col}_win{win}_std'] = rolled.std().reset_index(level=0, drop=True)
        df[f'{target_col}_win{win}_median'] = rolled.median().reset_index(level=0, drop=True)
    
    return df
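
A usage sketch; the frame is sorted by series and date first, since within-group row order drives the rolling windows (the 'id' and 'date' column names are assumptions):

# Each window must only see past observations, so sort by time within series
df = df.sort_values(['id', 'date']).reset_index(drop=True)
df = create_rolling_stats(df, 'target', 'id')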

Model Ensemble Techniques

Weighted Average Ensemble

Combining predictions from multiple gradient boosting models:

import lightgbm as lgb
import xgboost as xgb
from catboost import CatBoostRegressor
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error
import numpy as np

def cross_validate_model(framework, train_data, train_labels, test_data, model_name, seed=42):
    """
    Unified cross-validation interface for multiple ML frameworks.
    """
    kfold = KFold(n_splits=5, shuffle=True, random_state=seed)
    oof_preds = np.zeros(len(train_data))
    test_preds = np.zeros(len(test_data))
    fold_scores = []
    
    for fold_idx, (tr_idx, val_idx) in enumerate(kfold.split(train_data)):
        X_tr, y_tr = train_data.iloc[tr_idx], train_labels[tr_idx]
        X_val, y_val = train_data.iloc[val_idx], train_labels[val_idx]
        
        if model_name == 'lgb':
            params = {
                'objective': 'regression',
                'metric': 'mae',
                'num_leaves': 64,
                'learning_rate': 0.05,
                'feature_fraction': 0.8,
                'bagging_fraction': 0.8,
                'bagging_freq': 4,
                'lambda_l1': 0.1,
                'lambda_l2': 10,
                'min_child_weight': 6,
                'verbose': -1,
                'seed': seed
            }
            train_set = framework.Dataset(X_tr, label=y_tr)
            val_set = framework.Dataset(X_val, label=y_val)
            model = framework.train(
                params, train_set, 
                num_boost_round=1000,
                valid_sets=[train_set, val_set],
                callbacks=[lgb.early_stopping(100), lgb.log_evaluation(200)]
            )
            val_pred = model.predict(X_val, num_iteration=model.best_iteration)
            test_pred = model.predict(test_data, num_iteration=model.best_iteration)
            
        elif model_name == 'xgb':
            params = {
                'objective': 'reg:squarederror',
                'eval_metric': 'mae',
                'max_depth': 6,
                'learning_rate': 0.05,
                'subsample': 0.7,
                'colsample_bytree': 0.7,
                'reg_alpha': 0.1,
                'reg_lambda': 10,
                'seed': seed
            }
            dtrain = framework.DMatrix(X_tr, label=y_tr)
            dval = framework.DMatrix(X_val, label=y_val)
            dtest = framework.DMatrix(test_data)
            
            model = framework.train(
                params, dtrain,
                num_boost_round=1000,
                evals=[(dtrain, 'train'), (dval, 'valid')],
                early_stopping_rounds=100,
                verbose_eval=200
            )
            val_pred = model.predict(dval)
            test_pred = model.predict(dtest)
            
        elif model_name == 'cat':
            params = {
                'iterations': 1000,
                'learning_rate': 0.05,
                'depth': 6,
                'loss_function': 'MAE',
                'random_seed': seed,
                'verbose': 200,
                'early_stopping_rounds': 100
            }
            model = framework(**params)
            model.fit(
                X_tr, y_tr,
                eval_set=(X_val, y_val),
                use_best_model=True
            )
            val_pred = model.predict(X_val)
            test_pred = model.predict(test_data)
        
        oof_preds[val_idx] = val_pred
        test_preds += test_pred / kfold.n_splits
        
        fold_score = mean_absolute_error(y_val, val_pred)
        fold_scores.append(fold_score)
        print(f"Fold {fold_idx + 1}: MAE = {fold_score:.4f}")
    
    print(f"\nMean CV MAE: {np.mean(fold_scores):.4f} (+/- {np.std(fold_scores):.4f})")
    return oof_preds, test_preds
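
The weighted average itself is then a short blend of the three frameworks' outputs. A minimal sketch, assuming X_train / X_test are the prepared feature frames and y_train is a NumPy label array (none of these are defined above); the weights are illustrative and would normally be tuned against the out-of-fold MAE rather than the test score:

# One CV run per framework through the unified interface above
lgb_oof, lgb_test = cross_validate_model(lgb, X_train, y_train, X_test, 'lgb')
xgb_oof, xgb_test = cross_validate_model(xgb, X_train, y_train, X_test, 'xgb')
cat_oof, cat_test = cross_validate_model(CatBoostRegressor, X_train, y_train, X_test, 'cat')

# Illustrative weights; tune them on the out-of-fold predictions
w = [0.4, 0.3, 0.3]
blend_oof = w[0] * lgb_oof + w[1] * xgb_oof + w[2] * cat_oof
blend_test = w[0] * lgb_test + w[1] * xgb_test + w[2] * cat_test
print(f"Blended OOF MAE: {mean_absolute_error(y_train, blend_oof):.4f}")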

Stacking Ensemble

A two-layer stacking approach uses base model predictions as meta-features:

from sklearn.linear_model import Ridge
from sklearn.model_selection import RepeatedKFold

def stacking_ensemble(base_oof_preds, base_test_preds, train_labels, n_splits=5, n_repeats=2):
    """
    Second-level meta-learner that combines base model predictions.
    """
    train_meta = np.column_stack(base_oof_preds)
    test_meta = np.column_stack(base_test_preds)
    
    meta_oof = np.zeros(len(train_meta))
    meta_test = np.zeros(len(test_meta))
    fold_scores = []
    
    kfold = RepeatedKFold(n_splits=n_splits, n_repeats=n_repeats, random_state=2021)
    
    for fold_idx, (tr_idx, val_idx) in enumerate(kfold.split(train_meta)):
        X_tr_meta, y_tr_meta = train_meta[tr_idx], train_labels[tr_idx]
        X_val_meta, y_val_meta = train_meta[val_idx], train_labels[val_idx]
        
        meta_model = Ridge(alpha=1.0, random_state=2021)
        meta_model.fit(X_tr_meta, y_tr_meta)
        
        meta_oof[val_idx] = meta_model.predict(X_val_meta)
        meta_test += meta_model.predict(test_meta) / (n_splits * n_repeats)
        
        fold_score = mean_absolute_error(y_val_meta, meta_oof[val_idx])
        fold_scores.append(fold_score)
        print(f"Fold {fold_idx + 1}: MAE = {fold_score:.4f}")
    
    print(f"\nStacking Mean MAE: {np.mean(fold_scores):.4f}")
    return meta_oof, meta_test
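
Wiring the meta-learner to the base models might look like the following, with variable names carried over from the blending sketch above:

base_oof = [lgb_oof, xgb_oof, cat_oof]    # out-of-fold predictions per base model
base_test = [lgb_test, xgb_test, cat_test]
stack_oof, stack_test = stacking_ensemble(base_oof, base_test, y_train)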

Deep Learning Approach: LSTM for Sequence Prediction

Long Short-Term Memory networks excel at capturing temporal dependencies in electricity demand sequences.

Data Preprocessing Pipeline

import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


def prepare_sequences(df, target_col, id_col, lookback=7):
    """
    Engineer cyclical, lag, and rolling features ahead of reshaping into LSTM
    sequences (lookback is applied later by reshape_to_sequences).
    """
    # Cyclical features for time components
    df['weekly_cycle'] = df['dt'] % 7
    df['monthly_cycle'] = (df['dt'] // 30) % 12
    df['quarterly_cycle'] = (df['dt'] // 90) % 4
    
    # Encode cyclical patterns
    for col, period in [('weekly_cycle', 7), ('monthly_cycle', 12), ('quarterly_cycle', 4)]:
        df[f'{col}_sin'] = np.sin(2 * np.pi * df[col] / period)
        df[f'{col}_cos'] = np.cos(2 * np.pi * df[col] / period)
    
    # Lag features for recent history
    for lag in range(1, 8):
        df[f'lag_{lag}'] = df.groupby(id_col)[target_col].shift(lag)
    
    # Rolling statistics
    for window in [3, 7, 14]:
        rolled = df.groupby(id_col)[target_col].rolling(window=window)
        df[f'roll_mean_{window}'] = rolled.mean().reset_index(0, drop=True)
        df[f'roll_std_{window}'] = rolled.std().reset_index(0, drop=True)
    
    # Fill NaN values
    df.fillna(0, inplace=True)
    
    return df

def reshape_to_sequences(features, labels, lookback):
    """
    Reshape 2D data into 3D sequences for LSTM (samples, timesteps, features).
    """
    sequences = []
    for i in range(len(features) - lookback):
        sequences.append(features[i:i + lookback])
    return np.array(sequences), labels[lookback:]

LSTM Model Architecture

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import mean_squared_error

def build_lstm_model(input_shape):
    """
    Two-layer LSTM architecture with dropout regularization.
    """
    model = Sequential([
        LSTM(64, return_sequences=True, input_shape=input_shape),
        Dropout(0.2),
        BatchNormalization(),
        
        LSTM(32, return_sequences=False),
        Dropout(0.2),
        BatchNormalization(),
        
        Dense(16, activation='relu'),
        Dense(1)
    ])
    
    model.compile(optimizer='adam', loss='mse', metrics=['mae'])
    return model

# Complete training pipeline
def train_lstm_model(train_df, test_df, target_col, id_col, lookback=7, epochs=50, batch_size=32):
    # Prepare data
    train_processed = prepare_sequences(train_df.copy(), target_col, id_col, lookback)
    test_processed = prepare_sequences(test_df.copy(), target_col, id_col, lookback)
    
    # Define feature columns (exclude non-feature columns)
    feature_cols = [c for c in train_processed.columns if c not in [target_col, id_col, 'type']]
    
    X_train_full = train_processed[feature_cols].values
    y_train_full = train_processed[target_col].values
    X_test = test_processed[feature_cols].values
    
    # Standardize features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train_full)
    X_test_scaled = scaler.transform(X_test)
    
    # Reshape for LSTM
    X_lstm, y_lstm = reshape_to_sequences(X_train_scaled, y_train_full, lookback)
    X_test_lstm, _ = reshape_to_sequences(X_test_scaled, np.zeros(len(X_test_scaled)), lookback)
    
    # Split train/validation
    X_tr, X_val, y_tr, y_val = train_test_split(
        X_lstm, y_lstm, test_size=0.2, random_state=42
    )
    
    # Build and train model
    model = build_lstm_model(input_shape=(X_tr.shape[1], X_tr.shape[2]))
    
    early_stop = EarlyStopping(
        monitor='val_loss',
        patience=10,
        restore_best_weights=True
    )
    
    history = model.fit(
        X_tr, y_tr,
        validation_data=(X_val, y_val),
        epochs=epochs,
        batch_size=batch_size,
        callbacks=[early_stop],
        verbose=1
    )
    
    # Evaluate on validation set
    val_pred = model.predict(X_val)
    val_mse = mean_squared_error(y_val, val_pred)
    print(f"Validation MSE: {val_mse:.4f}")
    
    # Generate test predictions
    test_pred = model.predict(X_test_lstm)
    
    return model, test_pred, history
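
End to end, the pipeline can be invoked as below; the value, series_id, and dt column names are assumptions implied by prepare_sequences above:

# train_df / test_df need a 'value' target, a 'series_id' key, and an
# integer day index 'dt' for the cyclical features
model, test_pred, history = train_lstm_model(
    train_df, test_df,
    target_col='value',
    id_col='series_id',
    lookback=7
)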

Feature Selection and Validation

Not all engineered features contribute equally to prediction accuracy. Feature importance analysis helps identify the most impactful ones:

def analyze_feature_importance(model, feature_names, top_n=20):
    """
    Extract and rank feature importances from LightGBM model.
    """
    importance = pd.DataFrame({
        'feature': feature_names,
        'importance': model.feature_importance(importance_type='gain')
    }).sort_values('importance', ascending=False)
    
    return importance.head(top_n)

def remove_low_importance_features(X_train, y_train, importance_threshold=0.01):
    """
    Filter out features with negligible importance.
    """
    # Train an initial model to obtain importance scores
    model = lgb.LGBMRegressor(n_estimators=100, verbose=-1)
    model.fit(X_train, y_train)
    
    importance_df = pd.DataFrame({
        'feature': X_train.columns,
        'importance': model.feature_importances_
    })
    
    selected_features = importance_df[
        importance_df['importance'] > importance_threshold
    ]['feature'].tolist()
    
    return selected_features
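
A short usage sketch tying the two helpers together; booster is assumed to be a trained lgb.Booster from the cross-validation step, and X_train / y_train the same hypothetical matrices as in the ensemble section:

# Rank features by total gain on the trained booster
top_features = analyze_feature_importance(booster, X_train.columns.tolist())
print(top_features)

# Refit later models on the reduced feature set
selected = remove_low_importance_features(X_train, y_train)
X_train_reduced = X_train[selected]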
