Understanding Asynchronous Concurrency in Python with asyncio

Processes, Threads, and Coroutines

  • A process can contain multiple threads; at minimum, it houses one.
  • A thread can accommodate multiple coroutines.

Comparison

  1. A process represents a resource allocation unit.
  2. A thread represents the basic unit of OS scheduling.
  3. Context switching between processes carries the highest overhead and is the least efficient.
  4. Context switching between threads carries moderate overhead and performs reasonably well (setting aside the GIL).
  5. Coroutine context switching is lightweight and highly efficient.
  6. Multiprocessing and multithreading may achieve parallelism depending on CPU cores, while coroutines within a single thread achieve concurrency.

Choosing between processes, threads, and coroutines depends on practical needs. Threads and coroutines are commonly favored for their low resource consumption, with coroutines requiring the fewest resources.
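
The claim that coroutines share a single thread can be checked with a small sketch. The worker function, its names, and the delays below are made up for illustration; the only point is that both coroutines report the same thread identifier while their execution interleaves.

```python
import asyncio
import threading


async def worker(name, delay, log):
    # Record which OS thread runs this coroutine, then yield to the event loop.
    log.append((name, "start", threading.get_ident()))
    await asyncio.sleep(delay)
    log.append((name, "end", threading.get_ident()))


async def main():
    log = []
    # Both workers are started before either one finishes.
    await asyncio.gather(worker("a", 0.05, log), worker("b", 0.01, log))
    return log


log = asyncio.run(main())
thread_ids = {tid for _, _, tid in log}
print(len(thread_ids))               # number of distinct threads used
print([(n, e) for n, e, _ in log])   # interleaving of start/end events
```

Both coroutines start before either one ends, yet only one thread is involved: that is concurrency without parallelism.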

Single Event Loop

import asyncio


async def fetch_page(url):
    print("fetch init")
    await asyncio.sleep(1.5)
    print("fetch done")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    start = loop.time()
    loop.run_until_complete(fetch_page("https://example.com"))
    print(loop.time() - start)
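
On Python 3.7 and later, asyncio.run() creates a fresh event loop, runs the coroutine to completion, and closes the loop, so the manual loop setup is often unnecessary. A minimal sketch of the same idea, with the delay shortened and a return value added purely for illustration:

```python
import asyncio
import time


async def fetch_page(url):
    # Stand-in for real network I/O; the short delay is an assumption.
    await asyncio.sleep(0.05)
    return url


start = time.perf_counter()
page = asyncio.run(fetch_page("https://example.com"))
elapsed = time.perf_counter() - start
print(page, round(elapsed, 2))
```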

Concurrent Execution

import asyncio


async def fetch_page(url):
    print("fetch init")
    await asyncio.sleep(1.5)
    print("fetch done")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    start = loop.time()
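    # asyncio.wait() no longer accepts bare coroutines (forbidden since Python 3.11),
    # so wrap each one in a Task first.
    jobs = [loop.create_task(fetch_page(f"url_{i}")) for i in range(8)]
    loop.run_until_complete(asyncio.wait(jobs))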
    print(loop.time() - start)
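
The point of running the jobs concurrently is that the total time tracks the longest single delay, not the sum of all delays. A rough sketch that measures this, using asyncio.run() and a shortened delay (both are choices made for this example):

```python
import asyncio
import time

DELAY = 0.1  # per-job delay in seconds, chosen for the sketch


async def fetch_page(url):
    await asyncio.sleep(DELAY)
    return url


async def main():
    # Tasks start running as soon as the loop gets control.
    tasks = [asyncio.create_task(fetch_page(f"url_{i}")) for i in range(8)]
    done, pending = await asyncio.wait(tasks)
    return done, pending


start = time.perf_counter()
done, pending = asyncio.run(main())
elapsed = time.perf_counter() - start
# Eight sequential jobs would need 8 * DELAY seconds.
print(len(done), len(pending), elapsed < 8 * DELAY)
```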

Retrieving Results from Coroutines

import asyncio


async def fetch_data(source):
    print("fetch start")
    await asyncio.sleep(0.8)
    print("fetch end")
    return f"result-{source}"


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    future = asyncio.ensure_future(fetch_data("server1"), loop=loop)
    loop.run_until_complete(future)
    print(future.result())

    task = loop.create_task(fetch_data("server2"))
    loop.run_until_complete(task)
    print(task.result())
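
Inside a coroutine, awaiting a task is usually the simplest way to get its return value. Calling result() before the task has finished raises asyncio.InvalidStateError, as the sketch below shows; the shortened delay is an assumption for the example.

```python
import asyncio


async def fetch_data(source):
    await asyncio.sleep(0.01)
    return f"result-{source}"


async def main():
    task = asyncio.create_task(fetch_data("server1"))
    try:
        task.result()  # the task has not run yet
        early = "ready"
    except asyncio.InvalidStateError:
        early = "not ready"
    value = await task  # suspends until the task finishes
    return early, value, task.result()


early, value, again = asyncio.run(main())
print(early, value, again)
```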

Using Callbacks

import asyncio
from functools import partial


async def fetch_data(source):
    print("fetch start")
    await asyncio.sleep(0.8)
    print("fetch end")
    return f"result-{source}"


def notify(host, future):
    print(f"Notification for {host}")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    task = loop.create_task(fetch_data("server1"))
    task.add_done_callback(partial(notify, "server1"))
    loop.run_until_complete(task)
    print(task.result())
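
The callback receives the finished future as its last positional argument, so it can read the result itself. A small sketch, where the received list is a hypothetical stand-in for whatever the callback would really do with the value:

```python
import asyncio
from functools import partial

received = []  # hypothetical sink for callback output


def notify(host, future):
    # partial() supplies host; the loop supplies the completed future.
    received.append((host, future.result()))


async def fetch_data(source):
    await asyncio.sleep(0.01)
    return f"result-{source}"


async def main():
    task = asyncio.create_task(fetch_data("server1"))
    task.add_done_callback(partial(notify, "server1"))
    await task


asyncio.run(main())
print(received)
```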

Distinction Between gather and wait

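gather is the higher-level API: it accepts coroutines or futures directly, returns their results in the order they were passed in, and lets you group work into batches that can be cancelled as a whole. wait is lower-level: it takes tasks or futures and returns two sets, done and pending, with its return_when argument controlling when it returns.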

import asyncio


async def fetch_page(url):
    print("start", url)
    await asyncio.sleep(1.2)
    print("end", url)
    return url.upper()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    batch1 = [fetch_page(f"siteA/{i}") for i in range(4)]
    batch2 = [fetch_page(f"siteB/{i}") for i in range(4)]

    g1 = asyncio.gather(*batch1)
    g2 = asyncio.gather(*batch2)
    g1.cancel()  # cancel the entire first group

    loop.run_until_complete(asyncio.gather(g1, g2, return_exceptions=True))
    loop.close()
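
For contrast, here is a rough sketch of the two calls side by side. The delay parameter on fetch_page is added for this example only. gather hands back results in argument order regardless of which job finishes first, while wait with return_when=asyncio.FIRST_COMPLETED hands back sets of tasks and leaves the unfinished ones for the caller to deal with.

```python
import asyncio


async def fetch_page(url, delay):
    await asyncio.sleep(delay)
    return url.upper()


async def main():
    # gather: results follow argument order, not completion order.
    ordered = await asyncio.gather(fetch_page("slow", 0.1), fetch_page("fast", 0.01))

    # wait: returns (done, pending) sets of tasks.
    tasks = [
        asyncio.create_task(fetch_page("slow", 0.3)),
        asyncio.create_task(fetch_page("fast", 0.01)),
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]

    # Clean up whatever has not finished yet.
    for t in pending:
        t.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
    return ordered, first, len(pending)


ordered, first, pending_count = asyncio.run(main())
print(ordered, first, pending_count)
```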

Task Cancellation

import asyncio


async def delayed_task(duration):
    print("awaiting")
    await asyncio.sleep(duration)
    print(f"completed after {duration}s")


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    # Wrap the coroutines in Tasks: asyncio.wait() rejects bare coroutines since Python 3.11.
    jobs = [loop.create_task(delayed_task(d)) for d in (2, 3, 3)]
    try:
        loop.run_until_complete(asyncio.wait(jobs))
    except KeyboardInterrupt:
        for t in asyncio.all_tasks(loop=loop):
            t.cancel()
        loop.run_until_complete(asyncio.gather(*asyncio.all_tasks(loop=loop), return_exceptions=True))
    finally:
        loop.close()
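
Cancellation reaches the coroutine as an asyncio.CancelledError raised at its current await, which gives it a chance to clean up. A minimal sketch of that pattern, where the log list and the extra parameter are illustrative additions:

```python
import asyncio


async def delayed_task(duration, log):
    try:
        await asyncio.sleep(duration)
        log.append("completed")
    except asyncio.CancelledError:
        log.append("cleanup")  # release resources here
        raise                  # re-raise so the task is marked cancelled


async def main():
    log = []
    task = asyncio.create_task(delayed_task(10, log))
    await asyncio.sleep(0.01)  # let the task reach its await
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()


log, was_cancelled = asyncio.run(main())
print(log, was_cancelled)
```

Re-raising matters: if the coroutine swallows CancelledError, the task finishes normally and callers cannot tell that it was cancelled.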

Additional Loop Utilities

import asyncio


def after_delay(seconds):
    print(f"Triggered after {seconds}s")


def stop_loop(loop):
    loop.stop()


if __name__ == '__main__':
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    loop.call_later(2, after_delay, 2)
    loop.call_soon(after_delay, 1)
    current = loop.time()
    loop.call_at(current + 3, after_delay, 3)
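    # Stop only after the timed callbacks have fired; stopping via call_soon
    # would end the loop before the 2s and 3s callbacks run.
    loop.call_later(4, stop_loop, loop)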
    loop.run_forever()
    loop.close()
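
The order in which these callbacks fire depends on their scheduled time, not on the order of registration. A short sketch with shortened delays (the fired list is a hypothetical recorder used only for this example):

```python
import asyncio

fired = []  # records the order in which callbacks run

loop = asyncio.new_event_loop()
try:
    loop.call_later(0.05, fired.append, "later")         # relative delay
    loop.call_at(loop.time() + 0.1, fired.append, "at")  # absolute loop time
    loop.call_soon(fired.append, "soon")                 # next loop iteration
    loop.call_later(0.2, loop.stop)                      # stop after the rest
    loop.run_forever()
finally:
    loop.close()

print(fired)
```

Although call_soon is registered third, its callback runs first, followed by the call_later and call_at callbacks in time order.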

Reference: https://juejin.cn/post/6971037591952949256
