Time Series Feature Engineering for Electricity Demand Forecasting
Core Feature Categories in Time Series Prediction
Effective time series forecasting relies heavily on feature engineering. The following categories represent essential feature types that capture different temporal patterns in electricity demand data.
1. Temporal Decomposition Features
Date-based features can be decomposed into multiple granularity levels:
# Extract date components from timestamp
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['week'] = df['date'].dt.isocalendar().week
df['day_of_week'] = df['date'].dt.dayofweek
df['day_of_year'] = df['date'].dt.dayofyear
df['hour'] = df['date'].dt.hour
2. Cyclical Encoding
Periodic patterns require proper encoding to capture repetition:
import numpy as np
def cyclical_encode(df, col, period):
df[f'{col}_sin'] = np.sin(2 * np.pi * df[col] / period)
df[f'{col}_cos'] = np.cos(2 * np.pi * df[col] / period)
return df
# Encode weekly periodicity (period=7)
df = cyclical_encode(df, 'day_of_week', 7)
# Encode daily periodicity (period=24)
df = cyclical_encode(df, 'hour', 24)
3. Trend Extraction
Long-term trends can be captured through multiple approaches:
# Moving average as trend indicator
df['trend_ma7'] = df.groupby('series_id')['value'].transform(
lambda x: x.rolling(window=7, min_periods=1).mean()
)
df['trend_ma30'] = df.groupby('series_id')['value'].transform(
lambda x: x.rolling(window=30, min_periods=1).mean()
)
# Linear trend coefficient using least squares
from scipy import stats
def compute_trend_slope(group):
x = np.arange(len(group))
slope, _, _, _, _ = stats.linregress(x, group)
return slope
# transform broadcasts each series' scalar slope back to every row of that series
df['trend_slope'] = df.groupby('series_id')['value'].transform(compute_trend_slope)
Advanced Feature Engineering Strategies
Historical Shift Features (Lag Features)
Lag features capture the relationship between past values and current predictions:
def create_lag_features(df, target_col, id_col, max_lag=35, min_lag=10):
for lag in range(min_lag, max_lag + 1):
df[f'{target_col}_lag{lag}'] = df.groupby(id_col)[target_col].shift(lag)
return df
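As a quick usage sketch (the column names 'target' and 'id' are assumed here, not taken from the original pipeline), the helper adds lags 10 through 35 for each series; starting the lags at or beyond the forecast horizon keeps every feature available at prediction time:
# Hypothetical usage: 'target' and 'id' are assumed column names
df = create_lag_features(df, target_col='target', id_col='id', max_lag=35, min_lag=10)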
Difference Features
First-order and higher-order differences capture rate of change:
def create_diff_features(df, base_col, id_col, max_order=3):
for order in range(1, max_order + 1):
diff_col = f'{base_col}_diff{order}'
df[diff_col] = df.groupby(id_col)[base_col].diff(order)
return df
# Usage: creates diff1, diff2, diff3 based on lag10 features
df = create_diff_features(df, 'target_lag10', 'id')
Rolling Window Statistics
Rolling statistics summarize local patterns within time windows:
def create_rolling_stats(df, target_col, id_col, windows=[15, 30, 50, 70]):
    # closed='left' excludes the current row from each window, so the statistics
    # only summarize strictly past values and do not leak the row being predicted
    for win in windows:
        rolled = df.groupby(id_col)[target_col].rolling(
            window=win,
            min_periods=3,
            closed='left'
        )
        # Positional assignment via .values assumes df is sorted by id_col,
        # so the groupby-rolling output lines up row-for-row with df
        df[f'{target_col}_win{win}_mean'] = rolled.mean().values
        df[f'{target_col}_win{win}_max'] = rolled.max().values
        df[f'{target_col}_win{win}_min'] = rolled.min().values
        df[f'{target_col}_win{win}_std'] = rolled.std().values
        df[f'{target_col}_win{win}_median'] = rolled.median().values
    return df
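Continuing the same hedged example, rolling statistics are typically computed on an already-lagged column so that every window summarizes information known at prediction time:
# Hypothetical usage: rolling windows over the lag-10 column created earlier
df = create_rolling_stats(df, 'target_lag10', 'id', windows=[15, 30, 50, 70])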
Model Ensemble Techniques
Weighted Average Ensemble
Predictions from multiple gradient boosting frameworks can be combined with a weighted average. The helper below provides a unified cross-validation interface for LightGBM, XGBoost, and CatBoost; a blending sketch follows it:
import lightgbm as lgb
import xgboost as xgb
from catboost import CatBoostRegressor
from sklearn.model_selection import KFold
from sklearn.metrics import mean_absolute_error
import numpy as np
def cross_validate_model(framework, train_data, train_labels, test_data, model_name, seed=42):
"""
Unified cross-validation interface for multiple ML frameworks.
"""
kfold = KFold(n_splits=5, shuffle=True, random_state=seed)
oof_preds = np.zeros(len(train_data))
test_preds = np.zeros(len(test_data))
fold_scores = []
for fold_idx, (tr_idx, val_idx) in enumerate(kfold.split(train_data)):
X_tr, y_tr = train_data.iloc[tr_idx], train_labels[tr_idx]
X_val, y_val = train_data.iloc[val_idx], train_labels[val_idx]
if model_name == 'lgb':
params = {
'objective': 'regression',
'metric': 'mae',
'num_leaves': 64,
'learning_rate': 0.05,
'feature_fraction': 0.8,
'bagging_fraction': 0.8,
'bagging_freq': 4,
'lambda_l1': 0.1,
'lambda_l2': 10,
'min_child_weight': 6,
'verbose': -1,
'seed': seed
}
train_set = framework.Dataset(X_tr, label=y_tr)
val_set = framework.Dataset(X_val, label=y_val)
model = framework.train(
params, train_set,
num_boost_round=1000,
valid_sets=[train_set, val_set],
callbacks=[lgb.early_stopping(100), lgb.log_evaluation(200)]
)
val_pred = model.predict(X_val, num_iteration=model.best_iteration)
test_pred = model.predict(test_data, num_iteration=model.best_iteration)
elif model_name == 'xgb':
params = {
'objective': 'reg:squarederror',
'eval_metric': 'mae',
'max_depth': 6,
'learning_rate': 0.05,
'subsample': 0.7,
'colsample_bytree': 0.7,
'reg_alpha': 0.1,
'reg_lambda': 10,
'seed': seed
}
dtrain = framework.DMatrix(X_tr, label=y_tr)
dval = framework.DMatrix(X_val, label=y_val)
dtest = framework.DMatrix(test_data)
model = framework.train(
params, dtrain,
num_boost_round=1000,
evals=[(dtrain, 'train'), (dval, 'valid')],
early_stopping_rounds=100,
verbose_eval=200
)
val_pred = model.predict(dval)
test_pred = model.predict(dtest)
elif model_name == 'cat':
params = {
'iterations': 1000,
'learning_rate': 0.05,
'depth': 6,
'loss_function': 'MAE',
'random_seed': seed,
'verbose': 200,
'early_stopping_rounds': 100
}
model = framework(**params)
model.fit(
X_tr, y_tr,
eval_set=(X_val, y_val),
use_best_model=True
)
val_pred = model.predict(X_val)
test_pred = model.predict(test_data)
oof_preds[val_idx] = val_pred
test_preds += test_pred / kfold.n_splits
fold_score = mean_absolute_error(y_val, val_pred)
fold_scores.append(fold_score)
print(f"Fold {fold_idx + 1}: MAE = {fold_score:.4f}")
print(f"\nMean CV MAE: {np.mean(fold_scores):.4f} (+/- {np.std(fold_scores):.4f})")
return oof_preds, test_preds
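With the unified interface in place, the weighted average ensemble itself is a simple blend of the three frameworks' out-of-fold and test predictions. The sketch below assumes X_train and X_test are feature DataFrames and y_train is a NumPy array (names not from the original text); the weights are illustrative rather than tuned values:
# Run cross-validation once per framework (names match the branches above)
lgb_oof, lgb_test = cross_validate_model(lgb, X_train, y_train, X_test, 'lgb')
xgb_oof, xgb_test = cross_validate_model(xgb, X_train, y_train, X_test, 'xgb')
cat_oof, cat_test = cross_validate_model(CatBoostRegressor, X_train, y_train, X_test, 'cat')
# Illustrative fixed weights; in practice they can be chosen by searching over the OOF MAE
weights = [0.4, 0.3, 0.3]
blend_oof = weights[0] * lgb_oof + weights[1] * xgb_oof + weights[2] * cat_oof
blend_test = weights[0] * lgb_test + weights[1] * xgb_test + weights[2] * cat_test
print(f"Blended OOF MAE: {mean_absolute_error(y_train, blend_oof):.4f}")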
Stacking Ensemble
A two-layer stacking approach uses base model predictions as meta-features:
from sklearn.linear_model import Ridge
from sklearn.model_selection import RepeatedKFold
def stacking_ensemble(base_oof_preds, base_test_preds, train_labels, n_splits=5, n_repeats=2):
"""
Second-level meta-learner that combines base model predictions.
"""
train_meta = np.column_stack(base_oof_preds)
test_meta = np.column_stack(base_test_preds)
meta_oof = np.zeros(len(train_meta))
meta_test = np.zeros(len(test_meta))
fold_scores = []
kfold = RepeatedKFold(n_splits=n_splits, n_repeats=n_repeats, random_state=2021)
for fold_idx, (tr_idx, val_idx) in enumerate(kfold.split(train_meta)):
X_tr_meta, y_tr_meta = train_meta[tr_idx], train_labels[tr_idx]
X_val_meta, y_val_meta = train_meta[val_idx], train_labels[val_idx]
meta_model = Ridge(alpha=1.0, random_state=2021)
meta_model.fit(X_tr_meta, y_tr_meta)
meta_oof[val_idx] = meta_model.predict(X_val_meta)
meta_test += meta_model.predict(test_meta) / (n_splits * n_repeats)
fold_score = mean_absolute_error(y_val_meta, meta_oof[val_idx])
fold_scores.append(fold_score)
print(f"Fold {fold_idx + 1}: MAE = {fold_score:.4f}")
print(f"\nStacking Mean MAE: {np.mean(fold_scores):.4f}")
return meta_oof, meta_test
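Feeding the base models' out-of-fold and test predictions into the meta-learner might look like the following, continuing the hedged blending example above:
# Stack the three base models' predictions as meta-features for the Ridge meta-learner
stack_oof, stack_test = stacking_ensemble(
    base_oof_preds=[lgb_oof, xgb_oof, cat_oof],
    base_test_preds=[lgb_test, xgb_test, cat_test],
    train_labels=y_train
)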
Deep Learning Approach: LSTM for Sequence Prediction
Long Short-Term Memory (LSTM) networks can capture longer-range temporal dependencies in electricity demand sequences than fixed lag and window features.
Data Preprocessing Pipeline
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
def prepare_sequences(df, target_col, id_col, lookback=7):
"""
Create sequences for LSTM input with proper feature engineering.
"""
    # Cyclical features for time components
    # ('dt' is assumed to be an integer day index for each observation)
    df['weekly_cycle'] = df['dt'] % 7
    df['monthly_cycle'] = (df['dt'] // 30) % 12
    df['quarterly_cycle'] = (df['dt'] // 90) % 4
# Encode cyclical patterns
for col, period in [('weekly_cycle', 7), ('monthly_cycle', 12), ('quarterly_cycle', 4)]:
df[f'{col}_sin'] = np.sin(2 * np.pi * df[col] / period)
df[f'{col}_cos'] = np.cos(2 * np.pi * df[col] / period)
# Lag features for recent history
for lag in range(1, 8):
df[f'lag_{lag}'] = df.groupby(id_col)[target_col].shift(lag)
    # Rolling statistics (note: these windows include the current row's target value,
    # so shift the result by one step if that value is not known at prediction time)
    for window in [3, 7, 14]:
        rolled = df.groupby(id_col)[target_col].rolling(window=window)
        df[f'roll_mean_{window}'] = rolled.mean().reset_index(0, drop=True)
        df[f'roll_std_{window}'] = rolled.std().reset_index(0, drop=True)
# Fill NaN values
df.fillna(0, inplace=True)
return df
def reshape_to_sequences(features, labels, lookback):
"""
Reshape 2D data into 3D sequences for LSTM (samples, timesteps, features).
"""
sequences = []
for i in range(len(features) - lookback):
sequences.append(features[i:i + lookback])
return np.array(sequences), labels[lookback:]
LSTM Model Architecture
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense, Dropout, BatchNormalization
from tensorflow.keras.callbacks import EarlyStopping
from sklearn.metrics import mean_squared_error
def build_lstm_model(input_shape):
"""
Two-layer LSTM architecture with dropout regularization.
"""
model = Sequential([
LSTM(64, return_sequences=True, input_shape=input_shape),
Dropout(0.2),
BatchNormalization(),
LSTM(32, return_sequences=False),
Dropout(0.2),
BatchNormalization(),
Dense(16, activation='relu'),
Dense(1)
])
model.compile(optimizer='adam', loss='mse', metrics=['mae'])
return model
# Complete training pipeline
def train_lstm_model(train_df, test_df, target_col, id_col, lookback=7, epochs=50, batch_size=32):
# Prepare data
train_processed = prepare_sequences(train_df.copy(), target_col, id_col, lookback)
test_processed = prepare_sequences(test_df.copy(), target_col, id_col, lookback)
# Define feature columns (exclude non-feature columns)
feature_cols = [c for c in train_processed.columns if c not in [target_col, id_col, 'type']]
X_train_full = train_processed[feature_cols].values
y_train_full = train_processed[target_col].values
X_test = test_processed[feature_cols].values
# Standardize features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_full)
X_test_scaled = scaler.transform(X_test)
    # Reshape for LSTM
    X_lstm, y_lstm = reshape_to_sequences(X_train_scaled, y_train_full, lookback)
    # Note: this windowing drops the first `lookback` rows, so test predictions
    # will not cover the earliest test timestamps one-to-one
    X_test_lstm, _ = reshape_to_sequences(X_test_scaled, np.zeros(len(X_test_scaled)), lookback)
# Split train/validation
X_tr, X_val, y_tr, y_val = train_test_split(
X_lstm, y_lstm, test_size=0.2, random_state=42
)
# Build and train model
model = build_lstm_model(input_shape=(X_tr.shape[1], X_tr.shape[2]))
early_stop = EarlyStopping(
monitor='val_loss',
patience=10,
restore_best_weights=True
)
history = model.fit(
X_tr, y_tr,
validation_data=(X_val, y_val),
epochs=epochs,
batch_size=batch_size,
callbacks=[early_stop],
verbose=1
)
# Evaluate on validation set
val_pred = model.predict(X_val)
val_mse = mean_squared_error(y_val, val_pred)
print(f"Validation MSE: {val_mse:.4f}")
# Generate test predictions
test_pred = model.predict(X_test_lstm)
return model, test_pred, history
Feature Selection and Validation
Not all engineered features contribute equally to prediction accuracy. Feature importance analysis helps identify the most impactful ones:
def analyze_feature_importance(model, feature_names, top_n=20):
"""
Extract and rank feature importances from LightGBM model.
"""
importance = pd.DataFrame({
'feature': feature_names,
'importance': model.feature_importance(importance_type='gain')
}).sort_values('importance', ascending=False)
return importance.head(top_n)
def remove_low_importance_features(X_train, y_train, importance_threshold=0.01):
    """
    Filter out features with negligible importance.
    """
    # Train an initial model to get importance scores
    model = lgb.LGBMRegressor(n_estimators=100, verbose=-1)
    model.fit(X_train, y_train)
    importance_df = pd.DataFrame({
        'feature': X_train.columns,
        'importance': model.feature_importances_
    })
    # Keep only features whose importance exceeds the threshold
    selected_features = importance_df[
        importance_df['importance'] > importance_threshold
    ]['feature'].tolist()
    return selected_features
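A possible end-to-end selection pass, assuming X_train is the engineered feature DataFrame and y_train the target array (illustrative names, not from the original text): filter on a quick baseline model, refit on the reduced feature set, then inspect the strongest remaining features.
# Select features with a baseline booster, then refit and rank the survivors
selected = remove_low_importance_features(X_train, y_train, importance_threshold=0.01)
print(f"Kept {len(selected)} of {X_train.shape[1]} features")
final_model = lgb.LGBMRegressor(n_estimators=500, learning_rate=0.05, verbose=-1)
final_model.fit(X_train[selected], y_train)
top_features = analyze_feature_importance(final_model.booster_, selected, top_n=20)
print(top_features)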