Fading Coder

One Final Commit for the Last Sprint

Home > Tech > Content

Essential asyncio APIs for Python Asynchronous Programming

Tech 1

asyncio.run

Launches a fresh event loop on the current thread and executes the provided coroutine until completion.

# Source code analysis:
def run(main, *, debug=False):
    if events._get_running_loop() is not None:
        # Check if an event loop is already running in the current thread
        # Raises an error if one exists
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    if not coroutines.iscoroutine(main):
        # Verify that the input is actually a coroutine
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    # Instantiate a new event loop
    loop = events.new_event_loop()
    try:
        # Register this loop as the current thread's default loop
        events.set_event_loop(loop)
        loop.set_debug(debug)
        # Begin execution
        return loop.run_until_complete(main)
    finally:
        try:
            # Cancel any remaining tasks before shutdown
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Detach the loop from the thread
            events.set_event_loop(None)
            # Clean up resources
            loop.close()

Key points:

  1. Each thread can only host a single event loop at any given time.
  2. asyncio.run() creates a fresh event loop, executes the coroutine, then cleans up by removing and closing the loop.

asyncio.new_event_loop and asyncio.set_event_loop

my_loop = asyncio.new_event_loop()  # Creates a new event loop in the current thread
asyncio.set_event_loop(my_loop)  # Sets it as the default loop for this thread

# set_event_loop implementation
def set_event_loop(self, loop):
    """Associate an event loop with the current thread."""
    self._local._set_called = True  # Track that a loop has been explicitly set
    assert loop is None or isinstance(loop, AbstractEventLoop)
    self._local._loop = loop  # Store the loop reference

Key points:

  1. These methods are useful for manual event loop management in custom threading scenarios. Always call loop.close() when finished to prevent resource leaks.
  2. Setting a loop doesn't start it—events._get_running_loop() returns None at this stage.
  3. Avoid using asyncio.run() when manually managing loops, as it will create its own loop and overwrite yours.

asyncio.get_event_loop

Retrieves the event loop for the current thread.

# Source code analysis:
## get_event_loop() ##
def get_event_loop():
    # This function is implemented in C (_asynciomodule.c)
    # First, try to get the running loop
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop

    # Fall back to the policy-based approach
    return get_event_loop_policy().get_event_loop()

## get_event_loop_policy().get_event_loop() ##
def get_event_loop(self):
    # Auto-create a loop only if: no existing loop, never set manually,
    # and we're in the main thread
    if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
        self.set_event_loop(self.new_event_loop())

    # Raise error if still no loop found
    if self._local._loop is None:
        raise RuntimeError('There is no current event loop in thread %r.'
                           % threading.current_thread().name)

    return self._local._loop

Key points:

  1. This method works regardless of whether the loop is actively running.
  2. In the main thread, if no loop has ever been set, it automatically creates one.
  3. After asyncio.run() completes, calling asyncio.get_event_loop() will raise an exception because the loop has been closed.

asyncio.wait_for

Wraps a coroutine with timeout handling. The operation fails if it exceeds the specified duration.

# Source code analysis:
async def wait_for(fut, timeout, *, loop=None):
    # Requires an existing event loop; won't create one automatically
    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8,"
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # No timeout constraint
    if timeout is None:
        return await fut

    # Immediate timeout (zero or negative)
    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)
        if fut.done():
            return fut.result()
        fut.cancel()
        raise exceptions.TimeoutError()

    # Create a future to act as a timeout sentinel
    waiter = loop.create_future()
    # Schedule timeout callback
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    # Callback to release waiter when task completes
    cb = functools.partial(_release_waiter, waiter)
    # Wrap the coroutine in a future
    fut = ensure_future(fut, loop=loop)
    # Register completion callback
    fut.add_done_callback(cb)

    try:
        # Wait for either completion or timeout
        try:
            await waiter
        except exceptions.CancelledError:
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # Send cancellation and wait for graceful shutdown
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()
    finally:
        timeout_handle.cancel()

Key points:

  1. Requires an active event loop—does not instantiate one on its own.
  2. On timeout, wait_for attempts to cancel the future and waits for it to terminate cleanly.
  3. Only tracks the outer future; nested tasks created with create_task inside the coroutine must be managed separately. Neglecting them causes resource leaks.
  4. In CPU-intensive sections, inserting await asyncio.sleep(0) yields control, preventing other coroutines from starvation and allowing wait_for to cancel tasks promptly.

asyncio.wait

Accepts a collection of tasks and waits according to a configurable completion strategy. Returns two sets: done tasks and pending tasks.

# Source code analysis:
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')

    # Three completion strategies available:
    # FIRST_COMPLETED: return when any task finishes
    # FIRST_EXCEPTION: return when any task raises an exception
    # ALL_COMPLETED: return when all tasks finish
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        warnings.warn("The loop argument is deprecated since Python 3.8,"
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # Normalize all inputs to futures and schedule them
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return await _wait(fs, timeout, return_when, loop)

asyncio.create_task

Encapsulates a coroutine into a Task object, automatically schedules it in the event loop, and returns the Task for awaiting.

# Source implementation:
def create_task(coro, *, name=None):
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task

Key points:

  1. Tasks are queued for execution but don't run immediately—they execute on the next event loop iteration. The delay is typically negligible.

asyncio.gather

Executes multiple coroutines or Futures concurrently and collects their results.

def gather(*coros_or_futures, loop=None, return_exceptions=False):
    ...

Key points:

  1. The return_exceptions parameter controls error handling behavior.
  2. When False, any task exception propagates to the caller and terminates gather immediately.
  3. When True, exceptions are captured and returned as results, allowing all tasks to complete normally.

Related Articles

Understanding Strong and Weak References in Java

Strong References Strong reference are the most prevalent type of object referencing in Java. When an object has a strong reference pointing to it, the garbage collector will not reclaim its memory. F...

Comprehensive Guide to SSTI Explained with Payload Bypass Techniques

Introduction Server-Side Template Injection (SSTI) is a vulnerability in web applications where user input is improper handled within the template engine and executed on the server. This exploit can r...

Implement Image Upload Functionality for Django Integrated TinyMCE Editor

Django’s Admin panel is highly user-friendly, and pairing it with TinyMCE, an effective rich text editor, simplifies content management significantly. Combining the two is particular useful for bloggi...

Leave a Comment

Anonymous

◎Feel free to join the discussion and share your thoughts.