Building Data Processing Pipelines with Python Generators
Processing large datasets that cannot fit entirely into memory requires a streaming approach. Python's generator functions provide an effective mechanism for constructing data pipelines, in which each stage processes data incrementally.
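To see the idea in miniature before building the full pipeline, consider a single generator stage (the `squares` function here is purely illustrative). Each value is computed only when the consumer asks for it, so no intermediate list is ever materialized:

```python
def squares(numbers):
    """Yield each square as it is computed; no list is built up."""
    for n in numbers:
        yield n * n

stream = squares(range(5))
first = next(stream)  # only the first value has been computed so far
```

The remaining squares are produced one at a time as the consumer keeps iterating over `stream`.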
Consider a scenario where you need to analyze a large collection of compressed log files stored in a directory structure:
logs/
    server-a/
        access-2023-01.gz
        access-2023-02.bz2
    server-b/
        access-2023-03.gz
        access-2023-04
Each file contains entries in a common log format:
192.168.1.10 - alice [15/Mar/2023:14:22:01 -0800] "GET /api/data HTTP/1.1" 200 512
203.0.113.5 - - [15/Mar/2023:14:22:03 -0800] "POST /submit HTTP/1.1" 201 128
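If individual fields are needed later, a regular expression can pull them out of such a line. The pattern and group names below are illustrative choices, not part of the pipeline itself:

```python
import re

# Illustrative pattern for the common log format shown above;
# the group names (host, user, timestamp, ...) are arbitrary.
LOG_PATTERN = re.compile(
    r'(?P<host>\S+) \S+ (?P<user>\S+) '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<request>[^"]*)" '
    r'(?P<status>\d{3}) (?P<size>\S+)'
)

line = ('192.168.1.10 - alice [15/Mar/2023:14:22:01 -0800] '
        '"GET /api/data HTTP/1.1" 200 512')
match = LOG_PATTERN.match(line)
```

Named groups make downstream stages more readable than positional `split()` indexing, at the cost of a slightly stricter format assumption.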
You can implement a pipeline using generator functions that each perform a specific transformation:
import os
import fnmatch
import gzip
import bz2
import re

def find_files(pattern, root_dir):
    """Yield file paths matching pattern within directory tree."""
    for dir_path, _, file_list in os.walk(root_dir):
        for filename in fnmatch.filter(file_list, pattern):
            yield os.path.join(dir_path, filename)

def open_files(file_paths):
    """Yield file objects, automatically closing previous file."""
    for path in file_paths:
        if path.endswith('.gz'):
            file_obj = gzip.open(path, 'rt')
        elif path.endswith('.bz2'):
            file_obj = bz2.open(path, 'rt')
        else:
            file_obj = open(path, 'rt')
        yield file_obj
        file_obj.close()

def merge_streams(streams):
    """Combine multiple iterators into a single sequence."""
    for stream in streams:
        yield from stream

def filter_lines(regex_pattern, text_lines):
    """Yield lines matching the regular expression."""
    compiled_pattern = re.compile(regex_pattern)
    for line_text in text_lines:
        if compiled_pattern.search(line_text):
            yield line_text
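One caveat with `open_files` as written: the `close()` call runs only after the consumer advances past a file, so if the pipeline is abandoned partway through a file, that file is never closed explicitly. A `try`/`finally` variant (sketched here as a hypothetical `open_files_safe`) closes the file even when the generator is shut down early:

```python
import gzip
import bz2

def open_files_safe(file_paths):
    """Like open_files, but guarantees each file is closed even if
    the consumer stops iterating partway through it."""
    for path in file_paths:
        if path.endswith('.gz'):
            file_obj = gzip.open(path, 'rt')
        elif path.endswith('.bz2'):
            file_obj = bz2.open(path, 'rt')
        else:
            file_obj = open(path, 'rt')
        try:
            yield file_obj
        finally:
            file_obj.close()
```

When the consumer discards the generator, Python raises `GeneratorExit` at the suspended `yield`, and the `finally` clause ensures the cleanup still runs.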
These components can be connected to form a processing pipeline. For example, to find all log entries containing "error":
log_files = find_files('access-*', 'logs')
opened_files = open_files(log_files)
all_lines = merge_streams(opened_files)
error_lines = filter_lines('(?i)error', all_lines)

for entry in error_lines:
    print(entry)
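Note that nothing is read from disk until the final `for` loop iterates: each stage pulls one item at a time from the stage before it. A small self-contained sketch, using a hypothetical instrumented source in place of the file stages, shows this demand-driven behavior:

```python
import re

def filter_lines(regex_pattern, text_lines):
    """Same filter stage as in the pipeline above."""
    compiled_pattern = re.compile(regex_pattern)
    for line_text in text_lines:
        if compiled_pattern.search(line_text):
            yield line_text

def instrumented(lines, consumed):
    """Record each line as the pipeline pulls it."""
    for line in lines:
        consumed.append(line)
        yield line

lines = ['error one', 'ok', 'error two']
consumed = []
matches = filter_lines('error', instrumented(lines, consumed))
first = next(matches)  # pulls exactly one line from the source
```

Only one source line has been consumed at this point; the rest are read as the consumer continues iterating.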
The pipeline can be extended with additional processing stages using generator expressions. This example extracts and sums the response sizes from matching entries:
log_files = find_files('access-*', 'logs')
opened_files = open_files(log_files)
all_lines = merge_streams(opened_files)
error_lines = filter_lines('(?i)error', all_lines)
size_fields = (line.split()[-1] for line in error_lines)
sizes = (int(val) for val in size_fields if val.isdigit())
total_size = sum(sizes)
print(f'Total response size: {total_size}')
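As a side note, `merge_streams` duplicates functionality available in the standard library: `itertools.chain.from_iterable` flattens an iterable of iterators lazily in the same way, so either spelling works in the pipeline:

```python
import itertools

def merge_streams(streams):
    """Combine multiple iterators into a single sequence."""
    for stream in streams:
        yield from stream

merged = list(merge_streams([iter('ab'), iter('cd')]))
chained = list(itertools.chain.from_iterable([iter('ab'), iter('cd')]))
```

The hand-written version is arguably clearer in a tutorial, while the `itertools` form saves a definition in production code.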