Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Non-blocking Integration of Multiprocessing Queues with Asyncio Event Loops

Tech 1

Standard multiprocessing.Queue and multiprocessing.Event operations are inherently blocking and incompatible with direct use inside asyncio event loops. Calling .get() or .wait() on these primitives suspends the entire OS thread, freezing the event loop and preventing other coroutines from exceuting. This creates a fundamental conflcit between Python's multiprocessing IPC mechanisms and cooperative multitasking.

To resolve this, isolate all multiprocessing interactions within a thread pool, allowing the event loop to continue processing other tasks while waiting for IPC operations to complete.

import asyncio
import multiprocessing
from concurrent.futures import ThreadPoolExecutor
from typing import Any

class AsyncQueueBridge:
    def __init__(self, maxsize: int = 0):
        ctx = multiprocessing.get_context('spawn')
        self._sync_queue = ctx.Queue(maxsize=maxsize)
        self._executor = ThreadPoolExecutor(max_workers=2)
    
    async def fetch(self, loop: asyncio.AbstractEventLoop) -> Any:
        """Non-blocking get from multiprocessing queue"""
        return await loop.run_in_executor(
            self._executor, 
            self._sync_queue.get
        )
    
    async def submit(self, loop: asyncio.AbstractEventLoop, item: Any):
        """Non-blocking put to multiprocessing queue"""
        return await loop.run_in_executor(
            self._executor,
            self._sync_queue.put,
            item
        )
    
    def underlying_queue(self):
        return self._sync_queue

async def coordinate_workers():
    loop = asyncio.get_running_loop()
    bridge = AsyncQueueBridge()
    shutdown_signal = asyncio.Event()
    
    async def ingest_stream():
        """Simulate async data source feeding into multiprocessing"""
        for idx in range(100):
            await bridge.submit(loop, {"id": idx, "payload": idx ** 2})
            await asyncio.sleep(0.01)
        await bridge.submit(loop, None)  # Sentinel value
    
    async def process_results():
        """Retrieve processed data from worker processes"""
        while True:
            result = await bridge.fetch(loop)
            if result is None:
                shutdown_signal.set()
                break
            await handle_result(result)
    
    def handle_result(data):
        # Simulate async I/O operation
        print(f"Processed: {data}")
    
    # Launch background CPU workers in separate processes
    cpu_worker_ctx = multiprocessing.get_context('spawn')
    worker_process = cpu_worker_ctx.Process(
        target=transform_worker,
        args=(bridge.underlying_queue(),)
    )
    worker_process.start()
    
    await asyncio.gather(
        ingest_stream(),
        process_results()
    )
    
    worker_process.join()

def transform_worker(queue):
    """CPU-bound operation running in separate process"""
    while True:
        item = queue.get()
        if item is None:
            queue.put(None)  # Propagate termination
            break
        # Heavy computation
        processed = {**item, "transformed": item["payload"] * 2}
        queue.put(processed)

if __name__ == "__main__":
    asyncio.run(coordinate_workers())

For scenarios requiring multiprocessing.Event synchronization, wrap the blocking .wait() call similarly:

async def await_event(
    mp_event: multiprocessing.Event, 
    loop: asyncio.AbstractEventLoop
):
    return await loop.run_in_executor(
        None,  # Default executor
        mp_event.wait
    )

This architecture maintains full concurrency: the event loop never blocks on IPC, worker processes handle CPU-intensive tasks in parallel, and memory isolation between processes prevents GIL contention.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.