Essential asyncio APIs for Python Asynchronous Programming
asyncio.run
Launches a fresh event loop on the current thread and executes the provided coroutine until completion.
# Source code analysis:
def run(main, *, debug=False):
    """Run the coroutine *main* on a brand-new event loop and return its result.

    Args:
        main: The coroutine object to execute.
        debug: Value handed to ``loop.set_debug()`` before *main* starts.

    Returns:
        Whatever ``loop.run_until_complete(main)`` returns.

    Raises:
        RuntimeError: If an event loop is already running in this thread.
        ValueError: If *main* is not a coroutine object.
    """
    # Refuse to nest: if a loop is already running in this thread, bail out.
    if events._get_running_loop() is not None:
        raise RuntimeError(
            "asyncio.run() cannot be called from a running event loop")

    # Verify that the input is actually a coroutine object.
    if not coroutines.iscoroutine(main):
        raise ValueError("a coroutine was expected, got {!r}".format(main))

    # Instantiate a new event loop.
    loop = events.new_event_loop()
    try:
        # Register this loop as the current thread's loop, then run *main*.
        events.set_event_loop(loop)
        loop.set_debug(debug)
        return loop.run_until_complete(main)
    finally:
        # Cleanup runs whether *main* returned or raised.
        try:
            # Cancel any remaining tasks, then shut down async generators.
            _cancel_all_tasks(loop)
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            # Detach the loop from the thread and release its resources,
            # even if the cancellation/shutdown step above failed.
            events.set_event_loop(None)
            loop.close()
Key points:
- Each thread can only have a single running event loop at any given time; `asyncio.run()` raises `RuntimeError` if one is already running.
- `asyncio.run()` creates a fresh event loop, executes the coroutine, then cleans up by detaching the loop from the thread and closing it.
asyncio.new_event_loop and asyncio.set_event_loop
my_loop = asyncio.new_event_loop()  # Create a new event loop object
asyncio.set_event_loop(my_loop)  # Register it as the current loop for this thread
# set_event_loop implementation
def set_event_loop(self, loop):
    """Associate an event loop with the current thread.

    Args:
        loop: The loop to store, or ``None`` to clear the stored loop.

    Raises:
        AssertionError: If *loop* is neither ``None`` nor an
            ``AbstractEventLoop`` instance (only when assertions are enabled).
    """
    # Record that a loop was set explicitly. This happens before the type
    # check, so the flag is set even when the assertion below fails.
    self._local._set_called = True
    assert loop is None or isinstance(loop, AbstractEventLoop)
    # Store the loop reference (or None to clear it).
    self._local._loop = loop
Key points:
- These methods are useful for manual event loop management in custom threading scenarios. Always call `loop.close()` when finished to prevent resource leaks.
- Setting a loop doesn't start it — `events._get_running_loop()` still returns `None` at this stage.
- Avoid using `asyncio.run()` when manually managing loops: it installs its own loop in place of yours and, on exit, resets the thread's current loop to `None`.
asyncio.get_event_loop
Retrieves the event loop for the current thread.
# Source code analysis:
## get_event_loop() ##
def get_event_loop():
    """Return the running event loop, or fall back to the policy's loop.

    Returns:
        The loop reported by ``_get_running_loop()`` when it is not ``None``;
        otherwise the result of ``get_event_loop_policy().get_event_loop()``.
    """
    # This function is implemented in C (_asynciomodule.c)
    # First, try to get the running loop.
    current_loop = _get_running_loop()
    if current_loop is not None:
        return current_loop
    # Fall back to the policy-based approach.
    return get_event_loop_policy().get_event_loop()
## get_event_loop_policy().get_event_loop() ##
def get_event_loop(self):
    """Return the stored event loop, creating one in the main thread if allowed.

    Returns:
        The loop held in ``self._local._loop``.

    Raises:
        RuntimeError: If no loop is stored and none was auto-created.
    """
    # Auto-create a loop only if: no loop is stored, set_event_loop() was
    # never called, and we are in the main thread.
    if (self._local._loop is None and
            not self._local._set_called and
            isinstance(threading.current_thread(), threading._MainThread)):
        self.set_event_loop(self.new_event_loop())

    # Raise an error if there is still no loop.
    if self._local._loop is None:
        raise RuntimeError('There is no current event loop in thread %r.'
                           % threading.current_thread().name)

    return self._local._loop
Key points:
- This method works regardless of whether the loop is actively running.
- In the main thread, if no loop has ever been set, it automatically creates one.
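- After `asyncio.run()` completes, calling `asyncio.get_event_loop()` raises `RuntimeError`: `run()` closes its loop and ends with `set_event_loop(None)`, which clears the thread's loop and marks it as explicitly set, so no replacement loop is auto-created.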
asyncio.wait_for
Wraps a coroutine with timeout handling. The operation fails if it exceeds the specified duration.
# Source code analysis:
async def wait_for(fut, timeout, *, loop=None):
    """Wait for *fut* to complete, giving up after *timeout* seconds.

    Args:
        fut: The awaitable to wait on.
        timeout: Seconds to wait; ``None`` waits with no limit, and a value
            ``<= 0`` only returns a result if *fut* is already done.
        loop: Deprecated. When given, a ``DeprecationWarning`` is emitted.

    Returns:
        The result of *fut*.

    Raises:
        exceptions.TimeoutError: If *fut* did not finish within *timeout*.
        exceptions.CancelledError: Re-raised if the wait itself is cancelled.
    """
    # Requires an existing event loop; won't create one automatically.
    if loop is None:
        loop = events.get_running_loop()
    else:
        # The trailing space keeps the two literals from running together.
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # No timeout constraint: just await the awaitable directly.
    if timeout is None:
        return await fut

    # Immediate timeout (zero or negative): succeed only if already done.
    if timeout <= 0:
        fut = ensure_future(fut, loop=loop)

        if fut.done():
            return fut.result()

        fut.cancel()
        raise exceptions.TimeoutError()

    # Future that acts as a sentinel, released by either the timeout
    # callback or fut's completion callback.
    waiter = loop.create_future()
    timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
    cb = functools.partial(_release_waiter, waiter)

    # Wrap the awaitable in a future and register the completion callback.
    fut = ensure_future(fut, loop=loop)
    fut.add_done_callback(cb)

    try:
        # Wait for either completion or timeout.
        try:
            await waiter
        except exceptions.CancelledError:
            # The wait itself was cancelled: cancel fut too and propagate.
            fut.remove_done_callback(cb)
            fut.cancel()
            raise

        if fut.done():
            return fut.result()
        else:
            fut.remove_done_callback(cb)
            # Timed out: cancel fut and wait on it (via _cancel_and_wait)
            # before reporting the timeout.
            await _cancel_and_wait(fut, loop=loop)
            raise exceptions.TimeoutError()
    finally:
        # Always drop the pending timeout callback.
        timeout_handle.cancel()
Key points:
- Requires an active event loop—does not instantiate one on its own.
- On timeout, `wait_for` attempts to cancel the future and waits for it to terminate cleanly.
- Only tracks the outer future; nested tasks created with `create_task` inside the coroutine must be managed separately. Neglecting them causes resource leaks.
- In CPU-intensive sections, inserting `await asyncio.sleep(0)` yields control, preventing other coroutines from being starved and allowing `wait_for` to cancel tasks promptly.
asyncio.wait
Accepts a collection of tasks and waits according to a configurable completion strategy. Returns two sets: done tasks and pending tasks.
# Source code analysis:
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
    """Wait for the awaitables in *fs* according to *return_when*.

    Args:
        fs: A non-empty collection of futures/coroutines (not a single one).
        loop: Deprecated. When given, a ``DeprecationWarning`` is emitted.
        timeout: Forwarded unchanged to ``_wait``.
        return_when: One of ``FIRST_COMPLETED``, ``FIRST_EXCEPTION`` or
            ``ALL_COMPLETED``.

    Returns:
        The result of ``_wait(fs, timeout, return_when, loop)``.

    Raises:
        TypeError: If *fs* is itself a single future or coroutine.
        ValueError: If *fs* is empty or *return_when* is not a valid strategy.
    """
    if futures.isfuture(fs) or coroutines.iscoroutine(fs):
        raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    if not fs:
        raise ValueError('Set of coroutines/Futures is empty.')
    # Three completion strategies available:
    #   FIRST_COMPLETED: return when any task finishes
    #   FIRST_EXCEPTION: return when any task raises an exception
    #   ALL_COMPLETED: return when all tasks finish
    if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
        raise ValueError(f'Invalid return_when value: {return_when}')

    if loop is None:
        loop = events.get_running_loop()
    else:
        # The trailing space keeps the two literals from running together.
        warnings.warn("The loop argument is deprecated since Python 3.8, "
                      "and scheduled for removal in Python 3.10.",
                      DeprecationWarning, stacklevel=2)

    # De-duplicate the inputs, then normalize each one to a future.
    fs = {ensure_future(f, loop=loop) for f in set(fs)}

    return await _wait(fs, timeout, return_when, loop)
asyncio.create_task
Encapsulates a coroutine into a Task object, automatically schedules it in the event loop, and returns the Task for awaiting.
# Source implementation:
def create_task(coro, *, name=None):
    """Schedule *coro* as a Task on the running event loop and return the Task.

    Args:
        coro: The coroutine to wrap.
        name: Passed to ``_set_task_name`` together with the new task.

    Returns:
        The task object created by ``loop.create_task(coro)``.
    """
    # Uses the running loop only; does not create one.
    loop = events.get_running_loop()
    task = loop.create_task(coro)
    _set_task_name(task, name)
    return task
Key points:
- Tasks are queued for execution but don't run immediately—they execute on the next event loop iteration. The delay is typically negligible.
asyncio.gather
Executes multiple coroutines or Futures concurrently and collects their results.
def gather(*coros_or_futures, loop=None, return_exceptions=False):
    """Signature of ``gather`` as quoted here; the implementation is elided."""
    ...
Key points:
- The `return_exceptions` parameter controls error handling behavior.
- When `False`, the first exception raised by any task propagates immediately to the caller awaiting `gather`; the other awaitables are not cancelled and keep running.
- When `True`, exceptions are captured and returned as results, allowing all tasks to complete normally.