CH03: A first asyncio application
References
- Python Concurrency with asyncio live book
- Ten Thousand meters: asyncio
- runners:_run_once
- tasks:_step
- runners:run
- base_events:create_task
- asyncio.sleep
- loop.sock_recv
Generators and Iterators
- An iterable implements `__iter__`, which should return an iterator.
- An iterator implements `__next__`.
- An iterator is also an iterable because it implements `__iter__`, which returns itself.
- A generator is a convenient way to create iterators from functions. Every function with a `yield` statement is considered a generator function.
- You can think of Python as wrapping the function in a class that implements the iterator methods.
- An important thing about generators (and consequently, about iterators too) is that they can hold state. That means they can suspend their execution and resume later.
- PEP-255 introduced generators in Python.
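The points above can be seen with a tiny generator; Python builds the iterator machinery for us (the `countdown` name is just for illustration):

```python
def countdown(n):
    # A function containing `yield` is a generator function.
    # Calling it returns a generator: an iterator that holds state (n).
    while n > 0:
        yield n  # suspend here; resume on the next `next()` call
        n -= 1

gen = countdown(3)
print(iter(gen) is gen)   # True: a generator is its own iterator
print(next(gen))          # 3
print(next(gen))          # 2
print(list(gen))          # [1] -- iteration consumes what is left
```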
Note
Coroutines were mentioned in the motivation of PEP-255.
Generators as coroutines
The idea is to have a loop iterating over scheduled tasks that, once in a
while, suspend themselves and yield control back to the loop. In the example
below, the run_server task is added at the top of the queue (line 28).
The run_server task starts executing when next is called for the first
time (line 6). It runs until the yield instruction, where it is suspended.
The trick is to register a listener on the socket (line 11) so that the
selector can signal whenever the socket is ready. When that happens, we add
the task to the queue again (line 17). Eventually next is called for the
task again, and the task resumes from where it was suspended.
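A stripped-down sketch of that scheduling idea, with no sockets or selector (so the line numbers above refer to the book's full example, not this sketch; the task and loop names here are invented):

```python
from collections import deque

def server_task(name, steps):
    # A generator playing the role of run_server: it does a unit of
    # work, then yields control back to the loop.
    for i in range(steps):
        print(f"{name}: step {i}")
        yield  # suspend; the loop decides when we resume

def loop(tasks):
    queue = deque(tasks)           # scheduled tasks
    while queue:
        task = queue.popleft()
        try:
            next(task)             # resume the task until its next yield
        except StopIteration:
            continue               # task finished; drop it
        queue.append(task)         # in the real loop, a selector event
                                   # would re-enqueue the task instead

loop([server_task("a", 2), server_task("b", 2)])
```

The tasks interleave: each `next` call runs one task up to its next `yield`, and the loop round-robins over whatever is still queued.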
Not really a coroutine yet
In order to be considered a coroutine, run_server should also be able
to yield execution control to another coroutine. We could do that while
still using generator primitives, but the code becomes a bit convoluted.
A first attempt to solve the issue was to enhance generators in PEP-342.
PEP-342 promoted the yield statement to a yield expression, and introduced
the send and throw methods on generators.
The send method extends next: it does the same thing as next and more.
Besides advancing the generator, it allows the caller to send a value back into it.
That means that when the generator resumes (i.e., whenever next or send is called again),
the first thing done is an assignment: the value passed to send becomes
the value of the yield expression inside the generator.
Important
Notice that after creating the generator we must call next or send(None) to
start it. It is an error to call send with a non-None argument before that,
because the generator has not started executing yet: there is no suspended
yield expression to receive the value.
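A small sketch of send in action, including the mandatory priming call (the `echo` name is invented):

```python
def echo():
    received = None
    while True:
        # The value passed to send() becomes the result of the
        # yield expression when the generator resumes.
        received = yield received

gen = echo()
gen.send(None)          # prime: run up to the first yield (same as next(gen))
print(gen.send("hi"))   # "hi": the yield expression evaluates to the sent value
print(gen.send(42))     # 42

try:
    echo().send("boom")  # sending a non-None value before priming
except TypeError as e:
    print("error:", e)   # can't send non-None value to a just-started generator
```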
The last missing component is yield from. It allows a generator to behave like a proxy to
another generator: a generator using yield from G yields the values produced by G and
forwards everything communicated via send and throw to G.
Note
See PEP-380 for more details.
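A small illustration of the proxy behavior, sketched here with invented names:

```python
def inner():
    x = yield "ready"        # receives values sent to the outer generator
    yield f"inner got {x}"
    return "inner result"    # becomes the value of the yield from expression

def outer():
    result = yield from inner()   # transparently forwards next/send/throw
    yield f"outer saw: {result}"

gen = outer()
print(next(gen))          # "ready" -- yielded by inner, through outer
print(gen.send("hello"))  # "inner got hello" -- send() reaches inner
print(next(gen))          # "outer saw: inner result"
```

Note how inner's return value surfaces as the value of the yield from expression in outer; this is the mechanism await later reuses.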
async and await
We have all the pieces to have coroutines in Python, but to distinguish a coroutine
from a generator, PEP-492 introduced the async and await keywords.
When you define a function with async def f, calling f() returns a native coroutine
object. A native coroutine object is pretty much the same thing as a generator,
except that it implements __await__ instead of __next__.
The await keyword does exactly what yield from does but for native coroutines.
Danger
When you use generators as coroutines, you must end every chain of yield
from calls with a generator that actually yields. Similarly, you must end every
chain of await calls with a yield expression. However, if you write a
yield expression inside an async def function, what you get is not a native
coroutine but something called an asynchronous generator. Therefore the final
generator in the chain must be decorated with @types.coroutine so that it can be awaited.
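A minimal sketch of such a chain, driven by hand the way an event loop would (function names are invented):

```python
import types

@types.coroutine
def bare_yield():
    # The only place a real `yield` may live: a generator-based
    # coroutine decorated with @types.coroutine.
    value = yield "suspended"
    return value

async def compute():
    # `await` delegates, like `yield from`, all the way down to the bare yield.
    got = await bare_yield()
    return got * 2

coro = compute()
print(coro.send(None))        # "suspended": ran until the bare yield
try:
    coro.send(21)             # resume; the coroutine runs to completion
except StopIteration as stop:
    print(stop.value)         # 42: the coroutine's return value
```

The driver sees exactly the generator protocol: send resumes, StopIteration carries the result. This is precisely how asyncio's Task drives coroutines.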
The asyncio event loop protocol for tasks
- Coroutines must await other awaitables (coroutines or futures) or yield a value (if properly decorated with `@types.coroutine`).
- Coroutines must yield a `Future` or `None`. Any other type is considered an error.
- If a task's coroutine yields `None`, the task is rescheduled to run in the next iteration of the event loop.
- If it yields a `Future`, a done callback is added to the future. The callback reschedules the task to run once again to collect the result of the future.
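The protocol above can be sketched as a toy task and loop (all names here are invented; the real logic lives in asyncio's tasks.py and base_events.py):

```python
import types
from collections import deque

class ToyFuture:
    def __init__(self):
        self.result, self.done, self._callbacks = None, False, []
    def add_done_callback(self, cb):
        self._callbacks.append(cb)
    def set_result(self, value):
        self.result, self.done = value, True
        for cb in self._callbacks:
            cb()

class ToyLoop:
    def __init__(self):
        self._ready = deque()
    def call_soon(self, cb):
        self._ready.append(cb)
    def run(self):
        while self._ready:
            self._ready.popleft()()

class ToyTask:
    def __init__(self, coro, loop):
        self.coro, self.loop = coro, loop
        loop.call_soon(self._step)                   # schedule the first step
    def _step(self):
        try:
            yielded = self.coro.send(None)           # run until the next yield
        except StopIteration:
            return                                   # coroutine finished
        if yielded is None:
            self.loop.call_soon(self._step)          # bare yield: reschedule
        elif isinstance(yielded, ToyFuture):
            yielded.add_done_callback(self._step)    # resume when complete
        else:
            raise RuntimeError(f"Task got bad yield: {yielded!r}")

@types.coroutine
def wait_for(future):
    if not future.done:
        yield future               # hand the future to ToyTask._step
    return future.result

fut = ToyFuture()

async def main():
    print("got", await wait_for(fut))

loop = ToyLoop()
ToyTask(main(), loop)
loop.call_soon(lambda: fut.set_result(99))  # complete the future later
loop.run()                                  # prints: got 99
```

The task suspends on the yielded future, and completing the future calls `_step` again, which resumes the coroutine and collects the result.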
Full commented example
To follow the example, keep in mind the relationship between coroutine,
task and future.
flowchart BT
Coroutine --> Awaitable
Future --> Awaitable
Task --> Future
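For reference, a plausible reconstruction of the program being traced below; the delay/main names and the task creation order come from the walkthrough and trace output, but the exact code is an assumption. The START LOOP / RUN lines in the trace come from an instrumented event loop, not from this program.

```python
import asyncio

async def delay(seconds):
    print(f"Start delay of {seconds} seconds")
    print("Sleep")
    await asyncio.sleep(seconds)
    print(f"End delay of {seconds} seconds")
    return seconds

async def main():
    t2 = asyncio.create_task(delay(2))
    print("Create Task: delay_2")
    t1 = asyncio.create_task(delay(1))
    print("Create Task: delay_1")
    print("Start Awaiting d2")
    await t2
    print("Finished awaiting d2")
    print("Start Awaiting d1")
    await t1
    print("Finished awaiting d1")

if __name__ == "__main__":
    asyncio.run(main())
```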
"""
Wraps the coroutine returned by `main()` into a Task. At this point, the event
loop is created and a first iteration is executed.
"""
asyncio.run(main())
"""
A new Task object is created and immediately scheduled to run.
Scheduling is done in the `__init__` method of Task by calling the event loop
method `call_soon`.
The method `_step` of the Task is passed as the argument callback of
`call_soon`. The event loop will call this callback during its execution.
Notice that the currently running task is main, but the delay tasks are scheduled
to be run in the next iteration of the event loop.
"""
asyncio.create_task(delay(s))
"""
Recall that event loop is currently executing the main task. More precisely,
the `_step` method of the main Task.
In the `_step` method we have the code: result = coro.send(None)
That means: execute until a `yield` statement is found. Remember that `await` is
implemented as a `yield from` statement.
Therefore, code is executed until the instruction `await t`.
At this point, we start executing the task t. Remember that `t` is also
scheduled to run in the event loop. This is not a problem. Eventually, when the
event loop selects `t` to run it will realize that the task is complete already
and will ignore it.
The `await t` instruction awaits one of the delay tasks.
"""
await t
"""
The asyncio.sleep creates a future that will be completed after the given delay
in seconds.
In asyncio.sleep we have: return await future
All futures share the same __await__ implementation: a future simply yields
itself if it is not yet complete.
That means that we finally arrive at a yield statement. Let's recall the
call chain.
"""
await asyncio.sleep(seconds)
asyncio.run(main())
self._ready.popleft()._run() # event_loop:_run_once
result=coro.send(None) # [main] Task:_step
t2=create_task(delay(2)) # main
t1=create_task(delay(1))
await t2
F = self
yield F # Future:__await__
********Sets a done callback when the future F is complete
The callback is the [main] Task:_step itself.
The future yielded here is the Task t2 (delay_2). In the
next iteration of the event loop, the t2 task is executed.
Eventually, this task will be finished and `super().set_result()`
will be called (look at tasks:_step method).
This is what happens the next time [main] Task:_step is called
result=coro.send(None) # event_loop:_run_once
await t2 # [main] Task:_step
return self.result() # Future:__await__
The coroutine (generator) is exhausted, and await
finally returns.
This iteration of the event loop is over
********
self._ready.popleft()._run() # event_loop:_run_once
result=coro.send(None) # [t2] Task:_step
await asyncio.sleep # delay
loop.call_later # asyncio.sleep
return await future
yield self # Future:__await__
********Sets a done callback when the future F is complete
The callback is the [t2] Task:_step itself.
********
self._ready.popleft()._run() # event_loop:_run_once
result=coro.send(None) # [t1] Task:_step
await asyncio.sleep # delay
loop.call_later # asyncio.sleep
return await future
yield self # Future:__await__
********Sets a done callback when the future F is complete
The callback is the [t1] Task:_step itself.
This iteration of the event loop is over
********
The coro.send(None) triggers the whole sequence of awaits up to the yield
self statement in Future:__await__.
The _step method expects coro.send(None) to return a Future, to return None
(a bare yield), or to raise a StopIteration exception.
If it is a Future, it sets a done callback which is called when the future is
completed. The callback is the _step method of the Task itself, which is
executed once more as soon as the set_result method of Future is called.
If it is a None object, it reschedules the Task to be run in the next event
loop iteration (call_soon).
If we get a StopIteration exception, the Task has finished. We set its
result (recall that a Task is a Future).
Coming back to the example above: within asyncio.sleep, the call
loop.call_later schedules a callback to be run by the first iteration of the
event loop after the given amount of time (the parameter passed to delay).
The sole goal of this callback is to complete the future.
Therefore:
- Task:_step:result holds a Future.
- The Future is completed after delay seconds.
- This triggers the done callback of the future, which calls Task:_step once again.
- result = coro.send(None) raises a StopIteration because the sequence of yields is exhausted.
- The await t is completed, and the next instruction is executed.
"""
START LOOP
RUN: <TaskStepMethWrapper object at 0x791cefeaac10>
Create Task: delay_2
Create Task: delay_1
Start Awaiting d2
END LOOP
START LOOP
RUN: <TaskStepMethWrapper object at 0x791cefeaaca0>
Start delay of 2 seconds
Sleep
RUN: <TaskStepMethWrapper object at 0x791cefeaadc0>
Start delay of 1 seconds
Sleep
END LOOP
START LOOP
RUN: <function _set_result_unless_cancelled at 0x791ceff8baf0>
END LOOP
START LOOP
RUN: <TaskWakeupMethWrapper object at 0x791cefeaaeb0>
End delay of 1 seconds
END LOOP
START LOOP
RUN: <function _set_result_unless_cancelled at 0x791ceff8baf0>
END LOOP
START LOOP
RUN: <TaskWakeupMethWrapper object at 0x791cefeaae50>
End delay of 2 seconds
END LOOP
START LOOP
RUN: <TaskWakeupMethWrapper object at 0x791cefeaadf0>
Finished awaiting d2
Start Awaiting d1
Finished awaiting d1
END LOOP
START LOOP
RUN: <function _run_until_complete_cb at 0x791cefee8700>
END LOOP
START LOOP
RUN: <TaskStepMethWrapper object at 0x791cefeaac70>
END LOOP
START LOOP
RUN: <function _run_until_complete_cb at 0x791cefee8700>
END LOOP
"""