Fading Coder

One Final Commit for the Last Sprint


Building Data Processing Pipelines with Python Generators


Processing large datasets that cannot fit entirely into memory requires a streaming approach. Python's generator functions provide an effective mechanism for constructing data pipelines in which each stage processes data incrementally.
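
As a minimal sketch of the idea (using toy numeric stages rather than log data), two generator functions can be chained so that values flow through one at a time:

```python
def squares(numbers):
    """Yield the square of each input value, one at a time."""
    for n in numbers:
        yield n * n

def evens(numbers):
    """Yield only the even values from the input."""
    for n in numbers:
        if n % 2 == 0:
            yield n

# Wiring the stages allocates no intermediate lists, even for a huge input.
pipeline = evens(squares(range(10**9)))
print(next(pipeline))  # → 0
print(next(pipeline))  # → 4
```

Each next() call pulls exactly one value through both stages; the full range is never materialized.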

Consider a scenario where you need to analyze a large collection of compressed log files stored in a directory structure:

logs/
    server-a/
        access-2023-01.gz
        access-2023-02.bz2
    server-b/
        access-2023-03.gz
        access-2023-04

Each file contains entries in a common log format:

192.168.1.10 - alice [15/Mar/2023:14:22:01 -0800] "GET /api/data HTTP/1.1" 200 512
203.0.113.5 - - [15/Mar/2023:14:22:03 -0800] "POST /submit HTTP/1.1" 201 128
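
Because the fields are space-separated (apart from the bracketed timestamp and the quoted request), individual values such as the status code and response size can be pulled out with a plain split(). This is a sketch assuming the common log format shown above, and the later pipeline stages rely on the same trick:

```python
# One entry in the common log format shown above.
line = ('192.168.1.10 - alice [15/Mar/2023:14:22:01 -0800] '
        '"GET /api/data HTTP/1.1" 200 512')

fields = line.split()
host = fields[0]      # client address
status = fields[-2]   # HTTP status code
size = fields[-1]     # response size in bytes
print(host, status, size)  # → 192.168.1.10 200 512
```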

You can implement a pipeline using generator functions that each perform a specific transformation:

import os
import fnmatch
import gzip
import bz2
import re

def find_files(pattern, root_dir):
    """Yield file paths matching pattern within directory tree."""
    for dir_path, _, file_list in os.walk(root_dir):
        for filename in fnmatch.filter(file_list, pattern):
            yield os.path.join(dir_path, filename)

def open_files(file_paths):
    """Yield open file objects, closing each one before moving on."""
    for path in file_paths:
        if path.endswith('.gz'):
            file_obj = gzip.open(path, 'rt')
        elif path.endswith('.bz2'):
            file_obj = bz2.open(path, 'rt')
        else:
            file_obj = open(path, 'rt')
        with file_obj:
            yield file_obj

def merge_streams(streams):
    """Combine multiple iterators into a single sequence."""
    for stream in streams:
        yield from stream

def filter_lines(regex_pattern, text_lines):
    """Yield lines matching the regular expression."""
    compiled_pattern = re.compile(regex_pattern)
    for line_text in text_lines:
        if compiled_pattern.search(line_text):
            yield line_text
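
As an aside, merge_streams behaves the same as the standard library's itertools.chain.from_iterable, which could replace it outright:

```python
import itertools

def merge_streams(streams):
    """Same behavior as the yield-from version, via the standard library."""
    return itertools.chain.from_iterable(streams)

print(list(merge_streams([[1, 2], [3], [4, 5]])))  # → [1, 2, 3, 4, 5]
```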

These components can be connected to form a processing pipeline. For example, to find all log entries containing "error":

log_files = find_files('access-*', 'logs')
opened_files = open_files(log_files)
all_lines = merge_streams(opened_files)
error_lines = filter_lines('(?i)error', all_lines)

for entry in error_lines:
    print(entry)
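
Note that wiring the stages together performs no work by itself; lines are read only when the final for loop pulls values through. A small sketch with a hypothetical tag helper, which records what has actually been consumed, makes this visible:

```python
def tag(items, consumed):
    """Yield items while recording each one as it is actually pulled."""
    for item in items:
        consumed.append(item)
        yield item

consumed = []
stage = tag([1, 2, 3], consumed)  # the generator has not run yet
print(consumed)      # → []
print(list(stage))   # → [1, 2, 3]
print(consumed)      # → [1, 2, 3]
```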

The pipeline can be extended with additional processing stages using generator expressions. This example extracts and sums the response sizes from matching entries:

log_files = find_files('access-*', 'logs')
opened_files = open_files(log_files)
all_lines = merge_streams(opened_files)
error_lines = filter_lines('(?i)error', all_lines)

size_fields = (line.split()[-1] for line in error_lines)
sizes = (int(val) for val in size_fields if val.isdigit())
total_size = sum(sizes)
print(f'Total response size: {total_size}')
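
Further stages follow the same pattern. As a hypothetical extension, using in-memory lines in place of the file-based pipeline, a generator expression over the status field can feed collections.Counter to tally response codes:

```python
from collections import Counter

# Stand-in for the lines that merge_streams() would produce.
lines = [
    '192.168.1.10 - alice [15/Mar/2023:14:22:01 -0800] "GET /api/data HTTP/1.1" 200 512',
    '203.0.113.5 - - [15/Mar/2023:14:22:03 -0800] "POST /submit HTTP/1.1" 201 128',
    '192.168.1.10 - alice [15/Mar/2023:14:25:10 -0800] "GET /missing HTTP/1.1" 404 64',
]

# The second-to-last field is the HTTP status code.
status_fields = (line.split()[-2] for line in lines)
status_counts = Counter(status_fields)
print(status_counts['404'])  # → 1
```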

