Understanding Asynchronous Concurrency in Python with asyncio
Processes, Threads, and Coroutines
- A process contains at least one thread and can contain many.
- A thread can accommodate multiple coroutines.
Comparison
- A process represents a resource allocation unit.
- A thread represents the basic unit of OS scheduling.
- Context switching for processes incurs the highest overhead and least efficiency.
- Thread switching overhead is moderate and offers reasonable performance (setting aside the GIL).
- Coroutine context switching is lightweight and highly efficient.
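- Multiple processes can run in parallel on multiple CPU cores. Threads can in principle do the same, but in standard (GIL-enabled) CPython builds the GIL prevents threads from executing Python bytecode in parallel. Coroutines within a single thread achieve concurrency (interleaving), not parallelism.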
Choosing between processes, threads, or coroutines depends on practical needs. Threads and coroutines are commonly favored due to their low resource consumption, with coroutines requiring the fewest resources.
Single Event Loop
import asyncio


async def fetch_page(url: str, delay: float = 1.5) -> None:
    """Simulate downloading *url*.

    No network request is made: ``asyncio.sleep(delay)`` stands in for the
    I/O wait so the example can show how an event loop drives a coroutine.

    Args:
        url: Address being "fetched"; unused by this simulation.
        delay: Seconds to sleep (defaults to the original 1.5 s).
    """
    print("fetch init")
    # Awaiting hands control back to the event loop until the timer fires.
    await asyncio.sleep(delay)
    print("fetch done")


def main(delay: float = 1.5) -> float:
    """Run a single fetch_page() on a dedicated event loop.

    Returns:
        Elapsed loop time in seconds (roughly *delay*).
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        start = loop.time()
        loop.run_until_complete(fetch_page("https://example.com", delay))
        return loop.time() - start
    finally:
        # Always release the loop's resources (even on error) and unset it so
        # a closed loop is not left registered as the current one.
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    print(main())
Concurrent Execution
import asyncio


async def fetch_page(url: str, delay: float = 1.5) -> None:
    """Simulate downloading *url* by sleeping for *delay* seconds.

    Args:
        url: Address being "fetched"; unused by this simulation.
        delay: Seconds to sleep (defaults to the original 1.5 s).
    """
    print("fetch init")
    await asyncio.sleep(delay)
    print("fetch done")


def main(count: int = 8, delay: float = 1.5) -> float:
    """Run *count* fetch_page() coroutines concurrently on one event loop.

    The sleeps overlap, so the elapsed time is close to a single *delay*
    rather than ``count * delay``.

    Returns:
        Elapsed loop time in seconds.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        start = loop.time()
        # asyncio.wait() accepts only tasks/futures: passing bare coroutines
        # was deprecated in Python 3.8 and raises TypeError since 3.11.
        jobs = [
            loop.create_task(fetch_page(f"url_{i}", delay))
            for i in range(count)
        ]
        if jobs:  # asyncio.wait() raises ValueError on an empty collection
            loop.run_until_complete(asyncio.wait(jobs))
        return loop.time() - start
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    print(main())
Retrieving Results from Coroutines
import asyncio


async def fetch_data(source: str, delay: float = 0.8) -> str:
    """Simulate fetching data from *source*.

    Args:
        source: Identifier embedded in the returned string.
        delay: Seconds to sleep (defaults to the original 0.8 s).

    Returns:
        ``"result-<source>"``.
    """
    print("fetch start")
    await asyncio.sleep(delay)
    print("fetch end")
    return f"result-{source}"


def main(delay: float = 0.8) -> list:
    """Show two ways to wrap a coroutine so its return value can be read.

    Returns:
        The two fetch results, in the order they were run.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # ensure_future() wraps the coroutine in a Task scheduled on *loop*.
        future = asyncio.ensure_future(fetch_data("server1", delay), loop=loop)
        loop.run_until_complete(future)
        print(future.result())

        # loop.create_task() schedules the coroutine as a Task on this loop.
        task = loop.create_task(fetch_data("server2", delay))
        loop.run_until_complete(task)
        print(task.result())
        return [future.result(), task.result()]
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
Using Callbacks
import asyncio
from functools import partial


async def fetch_data(source: str, delay: float = 0.8) -> str:
    """Simulate fetching data from *source*; return ``"result-<source>"``."""
    print("fetch start")
    await asyncio.sleep(delay)
    print("fetch end")
    return f"result-{source}"


def notify(host: str, future: asyncio.Future) -> None:
    """Done-callback announcing that the task for *host* has finished.

    ``add_done_callback`` passes only the finished future, so *host* must be
    bound in advance (here with functools.partial).
    """
    print(f"Notification for {host}")


def main(delay: float = 0.8) -> str:
    """Attach a done-callback to a task, run it, and return its result."""
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        task = loop.create_task(fetch_data("server1", delay))
        task.add_done_callback(partial(notify, "server1"))
        loop.run_until_complete(task)
        print(task.result())
        return task.result()
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
Distinction Between gather and wait
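`gather` is the higher-level API. It returns results in the order of its arguments, and its result is itself a future, so awaitables can be grouped and an entire group cancelled with a single `cancel()` call. `wait` is lower-level. It takes tasks/futures and returns `(done, pending)` sets, with options such as `timeout` and `return_when`.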
import asyncio


async def fetch_page(url: str, delay: float = 1.2) -> str:
    """Simulate fetching *url*; return the URL upper-cased as the "page".

    Args:
        url: Address being "fetched".
        delay: Seconds to sleep (defaults to the original 1.2 s).
    """
    print("start", url)
    await asyncio.sleep(delay)
    print("end", url)
    return url.upper()


def main(delay: float = 1.2) -> list:
    """Gather two batches of fetches, cancelling the first batch as a group.

    Returns:
        ``[g1_outcome, g2_results]``: the CancelledError produced by the
        cancelled first group, and the second batch's results in order.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # Create the tasks on *loop* explicitly so gather() never has to look
        # up an implicit "current" loop while no loop is running.
        batch1 = [
            loop.create_task(fetch_page(f"siteA/{i}", delay)) for i in range(4)
        ]
        batch2 = [
            loop.create_task(fetch_page(f"siteB/{i}", delay)) for i in range(4)
        ]
        g1 = asyncio.gather(*batch1)
        g2 = asyncio.gather(*batch2)
        g1.cancel()  # cancel the entire first group
        # return_exceptions=True turns g1's cancellation into an item of the
        # result list instead of aborting the outer gather.
        results = loop.run_until_complete(
            asyncio.gather(g1, g2, return_exceptions=True)
        )
        print(results)
        return results
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
Task Cancellation
import asyncio


async def delayed_task(duration: float) -> None:
    """Sleep for *duration* seconds, printing before and after."""
    print("awaiting")
    await asyncio.sleep(duration)
    print(f"completed after {duration}s")


def main(durations=(2, 3, 3)) -> None:
    """Run one delayed_task per duration; on Ctrl+C cancel what is left.

    The KeyboardInterrupt handler cancels every outstanding task and runs the
    loop once more so those tasks can process their cancellation before the
    loop is closed.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # asyncio.wait() needs tasks, not bare coroutines (TypeError on 3.11+),
        # and raises ValueError when given an empty collection.
        jobs = [loop.create_task(delayed_task(d)) for d in durations]
        if jobs:
            loop.run_until_complete(asyncio.wait(jobs))
    except KeyboardInterrupt:
        pending = asyncio.all_tasks(loop)
        for t in pending:
            t.cancel()
        if pending:
            # Let the cancelled tasks unwind; collect their CancelledErrors
            # instead of raising the first one.
            loop.run_until_complete(
                asyncio.gather(*pending, return_exceptions=True)
            )
    finally:
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
Additional Loop Utilities
import asyncio


def after_delay(seconds):
    """Plain callback for the scheduling APIs; prints the label it was given."""
    print(f"Triggered after {seconds}s")


def stop_loop(loop):
    """Stop *loop* after it finishes its current batch of callbacks."""
    loop.stop()


def main():
    """Demonstrate call_soon / call_later / call_at.

    NOTE: stop_loop is queued with call_soon right after after_delay(1), so
    the loop stops during its very first iteration. Only "Triggered after 1s"
    is printed (immediately, despite its label). The call_later (2 s) and
    call_at (+3 s) callbacks never fire, because the loop is stopped and
    closed before they come due. To see them, schedule the stop later, e.g.
    ``loop.call_later(3.5, stop_loop, loop)``.
    """
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.call_later(2, after_delay, 2)         # relative delay (seconds)
        loop.call_soon(after_delay, 1)             # next iteration, FIFO order
        current = loop.time()
        loop.call_at(current + 3, after_delay, 3)  # absolute loop.time()
        loop.call_soon(stop_loop, loop)
        loop.run_forever()
    finally:
        # Close even if run_forever() is interrupted (e.g. by Ctrl+C).
        asyncio.set_event_loop(None)
        loop.close()


if __name__ == '__main__':
    main()
Reference: https://juejin.cn/post/6971037591952949256