
CH03: A first asyncio application

References

Generators and Iterators

  • An iterable implements __iter__ which should return an iterator.
  • An iterator implements __next__.
  • An iterator is also an iterable because it implements __iter__ which returns itself.
  • A generator is a convenient way to create iterators from functions. Every function containing a yield statement is a generator function: calling it returns a generator object, which is an iterator.
  • You can think of it as Python wrapping the function in a class that implements the iterator methods.
  • An important property of generators (and of iterators in general) is that they hold state. For a generator, that state includes the point where its execution stopped, so it can suspend and resume later.
  • PEP-255 introduced generators in Python.
Note

Coroutines were mentioned in the motivation of PEP-255.
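To make the bullets above concrete, here is a small sketch comparing a hand-written iterator class with an equivalent generator function. The names Countdown and countdown are purely illustrative. The class keeps its state in an attribute, while the generator keeps it in a local variable that survives between calls to next.

```python
class Countdown:
    """Hand-written iterator: __iter__ returns self, __next__ produces values."""

    def __init__(self, start):
        self.current = start

    def __iter__(self):
        return self

    def __next__(self):
        if self.current <= 0:
            raise StopIteration
        value = self.current
        self.current -= 1
        return value


def countdown(start):
    """Generator function: same behaviour, state kept in a local variable."""
    current = start
    while current > 0:
        yield current  # execution is suspended here between next() calls
        current -= 1


print(list(Countdown(3)))  # [3, 2, 1]
print(list(countdown(3)))  # [3, 2, 1]
gen = countdown(2)
print(iter(gen) is gen)    # True: a generator is its own iterator
```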

Generators as coroutines

The idea is to have a loop that iterates over scheduled tasks; each task runs until it suspends itself and yields control back to the loop. In the example below, the run_server task is put in the task queue by loop.create_task(run_server()).

The run_server task starts executing when the loop calls next(task) for the first time. It runs until the yield expression, where it is suspended. The trick is to register the socket with the selector (self.selector.register(arg, EVENT_READ, task)) so that the selector can signal when the socket is ready. When that happens, the task is put back in the queue (self.create_task(task)).

Eventually, next is called on the task again, and the task resumes from where it was suspended.

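import socket
from collections import deque
from selectors import DefaultSelector, EVENT_READ


class Loop:
    def __init__(self):
        self.task_queue = deque()
        self.selector = DefaultSelector()

    def create_task(self, task):
        self.task_queue.append(task)

    def run(self):
        while True:
            if self.task_queue:
                task = self.task_queue.popleft()
                try:
                    op, arg = next(task)  # resume the task until its next yield
                except StopIteration:
                    continue  # the task has finished
                if op == "wait_read":
                    # Park the task until the socket becomes readable.
                    self.selector.register(arg, EVENT_READ, task)
            else:
                # Nothing runnable: block until a registered socket is ready.
                for key, _ in self.selector.select():
                    task = key.data
                    sock = key.fileobj
                    self.selector.unregister(sock)
                    self.create_task(task)  # reschedule the suspended task


def run_server(host="localhost", port=8000):  # address chosen for illustration
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((host, port))
    sock.listen()
    while True:
        yield "wait_read", sock  # suspend until a client is waiting
        client_sock, addr = sock.accept()


if __name__ == "__main__":
    loop = Loop()
    loop.create_task(run_server())
    loop.run()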

Not really a coroutine yet

To be considered a coroutine, run_server should also be able to hand control to another coroutine, one that may itself suspend (for example, a helper that waits for the socket and then accepts the connection). We could do that with plain generator primitives, but the code becomes convoluted:

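import socket


def sock_accept(sock):
    yield "wait_read", sock  # suspend until the socket is readable
    return sock.accept()     # delivered to the caller via StopIteration.value


def run_server(host="localhost", port=8000):
    sock = socket.socket()
    sock.bind((host, port))
    sock.listen()
    while True:
        accept = sock_accept(sock)
        try:
            while True:
                # Manually forward each request of the sub-generator to the loop.
                yield next(accept)
        except StopIteration as ex:
            # The sub-generator finished: its return value is in ex.value.
            client_sock, addr = ex.value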

A first step towards solving this was PEP-342, which enhanced generators: it turned the yield statement into a yield expression and added the send(), throw() and close() generator methods.

def g():
    print("Start executing g")
    a = yield 1
    print("Value of a is:", a)
    b = yield 2
    print("Value of b is:", b)
    return 42


def main():
    print("Create generator")
    G = g()
    print("Calling next")
    c = next(G)
    print("Value of c is", c)
    d = G.send(10)
    print("Value of d is", d)
    try:
        G.send(20)
    except StopIteration as ex:
        print("Value of e is", ex.value)


main()

# Create generator
# Calling next
# Start executing g
# Value of c is 1
# Value of a is: 10
# Value of d is 2
# Value of b is: 20
# Value of e is 42

The send method extends next: it resumes the generator exactly as next does, and it also passes a value back into it. When the generator resumes (whenever next or send is called again), the first thing it does is finish the suspended yield expression, whose value is the argument passed to send (in a = yield 1, a receives it). Calling next(G) is equivalent to G.send(None).
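PEP-342 also added throw() and close(), which complement send(). The sketch below uses a hypothetical worker generator to show that throw() raises an exception at the point where the generator is suspended, so the generator can handle it and keep going, while close() raises GeneratorExit there to finish it.

```python
def worker():
    while True:
        try:
            value = yield "ready"
            print("got", value)
        except ValueError as exc:
            # throw() raises the exception right at the suspended yield.
            print("handled", exc)


w = worker()
print(next(w))                      # ready
print(w.send(1))                    # prints "got 1", then ready
print(w.throw(ValueError("oops")))  # prints "handled oops", then ready
w.close()                           # GeneratorExit at the yield ends the generator
```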

[Figure: Caller and Generator. Send and Yield]

Important

Notice that after creating the generator, the first call must be next(G) or G.send(None). Calling send with any other value raises TypeError. That's because the generator has not started yet: no yield expression has been executed, so there is nowhere to deliver the value.
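A common way to avoid forgetting that first step is a priming decorator. This is a sketch with illustrative names (primed, averager and unprimed are not from any library): the decorator performs the first send(None) so callers can start sending values right away, and the last lines show the TypeError raised when that first step is skipped.

```python
import functools


def primed(gen_func):
    """Decorator: create the generator and advance it to its first yield."""
    @functools.wraps(gen_func)
    def wrapper(*args, **kwargs):
        gen = gen_func(*args, **kwargs)
        gen.send(None)  # the only value allowed on the first send; same as next(gen)
        return gen
    return wrapper


@primed
def averager():
    total, count, average = 0.0, 0, None
    while True:
        value = yield average  # receives the value passed to send()
        total += value
        count += 1
        average = total / count


def unprimed():
    received = yield
    return received


avg = averager()
print(avg.send(10))  # 10.0
print(avg.send(20))  # 15.0

try:
    unprimed().send(1)  # no yield has run yet, so there is nowhere to deliver 1
except TypeError as exc:
    print("TypeError:", exc)
```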

The last missing piece is yield from, which lets a generator act as a proxy for another generator. A generator executing yield from G yields every value produced by G, forwards every value passed in via send and throw to G, and, once G finishes, evaluates to G's return value.

Note

See PEP-380 for more details.

def a(i):
    print("**********Start A***********")
    a_1 = yield i*10
    print("a_1:", a_1)
    a_2 = yield i*100
    print("a_2:", a_2)
    return f"A{i}"


def b(i):
    print("**********Start B***********")
    b_1 = yield from a(i)
    print("b_1:", b_1)
    return f"B{i}"


def c(i):
    print("**********Start C***********")
    c_1 = yield from b(i)
    print("c_1:", c_1)
    return f"C{i}"


def main():
    C = c(1)
    m_0 = next(C)
    print("m_0 value is", m_0)

    print("SEND: ", 1)
    m_1 = C.send(1)
    print("m_1 value is", m_1)

    try:
        print("SEND: ", 2)
        C.send(2)
    except StopIteration as ex:
        print("m_2 value is", ex.value)


main()

#**********Start C***********
#**********Start B***********
#**********Start A***********
#m_0 value is 10
#SEND:  1
#a_1: 1
#m_1 value is 100
#SEND:  2
#a_2: 2
#b_1: A1
#c_1: B1
#m_2 value is C1
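To connect this back to the convoluted sock_accept version, here is a sketch comparing yield from with a hand-written forwarding loop. The function names are illustrative, and delegate_by_hand is deliberately simplified: unlike the real yield from described in PEP-380, it does not forward throw() or close(), and it assumes the sub-generator yields at least once.

```python
def inner():
    x = yield "first"
    y = yield "second"
    return (x, y)


def delegate_with_yield_from():
    result = yield from inner()  # forwards yields and sends, returns inner's result
    return result


def delegate_by_hand():
    """Simplified manual version of yield from: no throw()/close() forwarding."""
    sub = inner()
    value = next(sub)  # assumes inner yields at least once
    while True:
        sent = yield value  # pass the sub-generator's value out to our caller
        try:
            value = sub.send(sent)  # pass the caller's value in
        except StopIteration as ex:
            return ex.value  # inner finished: its return value becomes ours


def drive(gen):
    """Act as the caller: prime, send two values, collect the final result."""
    out = [next(gen)]
    try:
        out.append(gen.send("a"))
        gen.send("b")
    except StopIteration as ex:
        out.append(ex.value)
    return out


print(drive(delegate_with_yield_from()))  # ['first', 'second', ('a', 'b')]
print(drive(delegate_by_hand()))          # ['first', 'second', ('a', 'b')]
```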

async and await

We now have all the pieces needed for coroutines in Python. To distinguish a coroutine from a generator, PEP-492 introduced the async and await keywords.

When you define a function with async def f(), calling f() returns a native coroutine object. A native coroutine object works much like a generator: it has send, throw and close methods, but it implements __await__ instead of __next__ and __iter__, so it cannot be driven with next() or iterated over.

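The await keyword does for native coroutines what yield from does for generators: it delegates to another awaitable, forwarding yielded values out and sent values in, and evaluates to that awaitable's return value.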
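A quick way to see the generator-like interface of a native coroutine is to inspect one and drive it by hand with send. The coroutine answer is a made-up example; since it never suspends, the first send(None) runs it to completion and the return value arrives inside StopIteration, exactly as with a generator.

```python
async def answer():
    return 42


coro = answer()
print(hasattr(coro, "send"), hasattr(coro, "__await__"))     # True True
print(hasattr(coro, "__next__"), hasattr(coro, "__iter__"))  # False False
try:
    coro.send(None)  # runs the body; there is nothing to suspend on
except StopIteration as ex:
    print(ex.value)  # 42
```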

Danger

When you use generators as coroutines, every chain of yield from calls must end with a generator that actually executes yield. Likewise, every chain of await calls must end in a yield expression. However, if you put a yield expression in an async def function, you do not get a native coroutine but an asynchronous generator. The final coroutine in the chain should therefore be a plain generator function decorated with @types.coroutine (asyncio's Future plays the same role through its __await__ method, which yields):

>>> import types
>>> @types.coroutine
... def gen_coro():
...     yield 3
... 
>>> async def coro3():
...     await gen_coro()
... 
>>> coro3().send(None)
3
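Extending the REPL session above, the sketch below drives a chain of awaits by hand, which is roughly the job an event loop does. checkpoint, worker and run_by_hand are illustrative names: each send(None) or send(reply) runs the chain down to the @types.coroutine generator, and the value sent back becomes the result of the corresponding await.

```python
import types


@types.coroutine
def checkpoint(label):
    # Bottom of the await chain: a generator-based coroutine that really yields.
    reply = yield label
    return reply


async def worker():
    a = await checkpoint("step 1")
    b = await checkpoint("step 2")
    return a + b


def run_by_hand(coro, replies):
    """Hypothetical helper playing a (very) simplified event loop."""
    yielded = [coro.send(None)]  # start the coroutine
    for reply in replies:
        try:
            yielded.append(coro.send(reply))
        except StopIteration as ex:
            return yielded, ex.value
    raise RuntimeError("coroutine did not finish")


print(run_by_hand(worker(), [1, 2]))  # (['step 1', 'step 2'], 3)
```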

The asyncio event loop protocol for tasks

  1. Coroutines must await other awaitables (coroutines or futures) or yield a value (only if they are generator-based coroutines labeled with @types.coroutine).
  2. The values that reach the Task through this chain must be a Future or None. Any other value is an error.
    1. If it yields None, the same task is rescheduled to run in the next iteration of the event loop (call_soon).
    2. If it yields a Future, the task adds a done callback to it. When the future completes, the callback reschedules the task, which resumes and collects the future's result.
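Awaiting a future eventually reaches Future.__await__. The excerpt below follows the pure-Python version in asyncio/futures.py:

async def future_waiter(some_future):
    res = await some_future  # calls some_future.__await__()


class Future:
    # ... (rest of asyncio.Future omitted)

    def __await__(self):
        if not self.done():
            self._asyncio_future_blocking = True
            yield self  # This tells Task to wait for completion.
        if not self.done():
            raise RuntimeError("await wasn't used with future")
        return self.result()  # May raise too.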
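The protocol above can be reproduced in a few dozen lines. The sketch below is not asyncio's implementation: ToyFuture, ToyTask and ToyLoop are hypothetical names, and everything asyncio needs beyond the protocol (cancellation, exceptions, timers, I/O) is left out. It only shows the three cases a task step handles: a yielded future gets a done callback, a bare yield is rescheduled with call_soon, and StopIteration sets the task's result.

```python
import types
from collections import deque


class ToyFuture:
    """Hypothetical, stripped-down Future: a result plus done callbacks."""

    def __init__(self, loop):
        self._loop = loop
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        return self._result

    def add_done_callback(self, fn):
        self._callbacks.append(fn)  # callbacks take no arguments here, unlike asyncio

    def set_result(self, value):
        self._result, self._done = value, True
        for fn in self._callbacks:
            self._loop.call_soon(fn)  # callbacks run in a later loop step

    def __await__(self):
        if not self.done():
            yield self  # tell the driving task to wait for this future
        return self.result()


class ToyTask(ToyFuture):
    """Hypothetical Task: drives a coroutine following the protocol above."""

    def __init__(self, loop, coro):
        super().__init__(loop)
        self._coro = coro
        loop.call_soon(self._step)  # scheduled immediately, as in asyncio

    def _step(self):
        try:
            yielded = self._coro.send(None)
        except StopIteration as ex:
            self.set_result(ex.value)  # finished: a Task is a Future
            return
        if yielded is None:
            self._loop.call_soon(self._step)  # bare yield: run again soon
        elif isinstance(yielded, ToyFuture):
            yielded.add_done_callback(self._step)  # resume when it is done
        else:
            raise RuntimeError(f"Task got bad yield: {yielded!r}")


class ToyLoop:
    def __init__(self):
        self._ready = deque()

    def call_soon(self, fn):
        self._ready.append(fn)

    def run_until_complete(self, coro):
        task = ToyTask(self, coro)
        while not task.done():
            if not self._ready:
                raise RuntimeError("nothing left to run")
            self._ready.popleft()()  # run one callback
        return task.result()


@types.coroutine
def bare_yield():
    yield  # yields None: the task is rescheduled with call_soon


async def main(loop):
    fut = ToyFuture(loop)
    await bare_yield()                               # case 1: yield None
    loop.call_soon(lambda: fut.set_result("hello"))  # complete fut later
    value = await fut                                # case 2: yield a future
    return value.upper()                             # case 3: StopIteration


loop = ToyLoop()
print(loop.run_until_complete(main(loop)))  # HELLO
```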

Full commented example

"""
Start delay of 2 seconds
Start delay of 1 seconds
End delay of 1 seconds
End delay of 2 seconds
"""

import asyncio

async def delay(seconds: int):
    print(f"Start delay of {seconds} seconds")
    await asyncio.sleep(seconds)
    print(f"End delay of {seconds} seconds")


async def main():
    tasks = [asyncio.create_task(delay(s)) for s in range(2, 0, -1)]
    for t in tasks:
        await t


if __name__ == "__main__":
    asyncio.run(main())
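One way to check that the tasks really overlap is to time a scaled-down version of the example. This is a sketch: the shorter delays (0.2 and 0.1 seconds) and the returned values are changes made for illustration. Awaiting the tasks one after another still takes roughly as long as the longest delay, not the sum, because create_task has already scheduled both of them.

```python
import asyncio
import time


async def delay(seconds: float) -> float:
    await asyncio.sleep(seconds)
    return seconds


async def main():
    start = time.perf_counter()
    tasks = [asyncio.create_task(delay(s)) for s in (0.2, 0.1)]
    results = []
    for t in tasks:
        results.append(await t)  # t may already be finished when we get here
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)                     # [0.2, 0.1]
print(f"elapsed: {elapsed:.2f}s")  # roughly 0.2s; the exact value varies
```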

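To follow the example, keep in mind the relationship between coroutine, task and future: coroutines and futures are both awaitables, and a Task is a subclass of Future that wraps a coroutine.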

flowchart BT
    Coroutine --> Awaitable
    Future --> Awaitable 
    Task --> Future
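The relationships in the diagram can be checked directly. In this sketch, noop and main are illustrative; the checks rely only on the public asyncio API and collections.abc.Awaitable. Every object created is awaited so nothing is left pending.

```python
import asyncio
import collections.abc


async def noop():
    return None


async def main():
    coro = noop()                                     # bare coroutine object
    task = asyncio.create_task(noop())                # coroutine wrapped in a Task
    fut = asyncio.get_running_loop().create_future()  # plain Future
    checks = (
        isinstance(coro, collections.abc.Awaitable),
        isinstance(fut, collections.abc.Awaitable),
        isinstance(task, asyncio.Future),
    )
    fut.set_result(None)
    await coro
    await task
    await fut
    return checks


checks = asyncio.run(main())
print(checks)                                    # (True, True, True)
print(issubclass(asyncio.Task, asyncio.Future))  # True
```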

"""
Creates a new event loop, wraps the coroutine returned by `main()` in a Task
and runs the loop until that Task is done. The Task's first step is executed
in the first iteration of the loop.
"""
asyncio.run(main())

"""
A new Task object is created and immediately scheduled to run.

Scheduling is done in the `__init__` method of Task, which calls the event
loop method `call_soon`.

The Task's `_step` method is passed as the callback argument of `call_soon`.
The event loop will call this callback during its execution.

Notice that the currently running task is main; the delay tasks are scheduled
to run in the next iteration of the event loop.
"""
asyncio.create_task(delay(s))

"""
Recall that the event loop is currently executing the main task. More
precisely, it is executing the `_step` method of the main Task.

In the `_step` method we have the code: result = coro.send(None)

That means: execute until a `yield` is reached. Remember that `await` works
like a `yield from` statement.

Therefore, main runs until the instruction `await t`.

At this point, the task t is not executed directly. Since a Task is a Future,
`await t` calls `Future.__await__`, which yields `t` itself because `t` is not
done yet. So coro.send(None) in the main Task's `_step` returns `t`, main
registers a done callback on `t`, and main is suspended.

The delay coroutine wrapped by `t` runs later, when the event loop calls the
`_step` method of `t` (it was already scheduled by `create_task`).
"""
await t 

"""
asyncio.sleep creates a future that will be completed after the given delay
in seconds.

In asyncio.sleep we have: return await future

All futures share the same __await__ implementation: if the future is not
complete, it simply yields itself.

That means we finally arrive at a yield statement. Let's recap the calls.
"""
await asyncio.sleep(seconds)
asyncio.run(main())
    self._ready.popleft()._run()                        # event_loop:_run_once 
        result=coro.send(None)                          # [main] Task:_step
            t2=create_task(delay(2))                    # main 
            t1=create_task(delay(1))
            await t2
                F = self 
                yield F                                 # Future:__await__
********Sets a done callback when the future F is complete 
        The callback is the [main] Task:_step itself.

        The future yielded here is the Task t2 (delay_2). In the 
        next iteration of the event loop, the t2 task is executed.
        Eventually, this task will be finished and `super().set_result()`
        will be called (look at tasks:_step method).

        This is what happens the next time [main] Task:_step is called

        result=coro.send(None)                          # event_loop:_run_once 
            await t2                                    # [main] Task:_step 
                return self.result()                    # Future:__await__

        The coroutine (generator) is exhausted, and await 
        finally returns.

        This iteration of the event loop is over
********
        self._ready.popleft()._run()                    # event_loop:_run_once 
            result=coro.send(None)                      # [t2] Task:_step 
                await asyncio.sleep                     # delay 
                    loop.call_later                     # asyncio.sleep
                    return await future             
                        yield self                      # Future:__await__
********Sets a done callback when the future F is complete 
        The callback is the [t2] Task:_step itself.
********
        self._ready.popleft()._run()                    # event_loop:_run_once 
            result=coro.send(None)                      # [t1] Task:_step 
                await asyncio.sleep                     # delay 
                    loop.call_later                     # asyncio.sleep
                    return await future             
                        yield self                      # Future:__await__
********Sets a done callback when the future F is complete 
        The callback is the [t1] Task:_step itself.

        This iteration of the event loop is over
********

The coro.send(None) call runs the whole chain of awaits down to the `yield self` statement in Future.__await__.

The _step method expects coro.send(None) to either return a Future or None (a bare yield), or raise StopIteration.

If it is a Future, the Task adds a done callback to it. The callback resumes the Task (in CPython it is the Task's wakeup method, shown as TaskWakeupMethWrapper in the trace below, which calls _step), and it runs in the loop iteration after the future's set_result method is called.

If it is None, the Task is rescheduled to run in the next event loop iteration (call_soon).

If StopIteration is raised, the Task has finished: it sets its own result from the exception's value (recall that a Task is a Future).

Back to the example: inside asyncio.sleep, loop.call_later schedules a callback to run in the first event loop iteration after the given delay (the seconds argument passed to delay). The sole purpose of this callback is to complete the future; in CPython it is _set_result_unless_cancelled, which also shows up in the trace below.

Therefore:

  1. Task:_step:result holds a Future.
  2. The Future is completed after `seconds` seconds.
  3. This triggers the done callback of the future, which calls Task:_step once again.
  4. result=coro.send(None) raises StopIteration because the delay coroutine has run to completion.
  5. The task stores its result, its own done callback wakes up the main task, `await t` completes, and main moves on to the next instruction.
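The same mechanism can be observed with the public asyncio API. In this sketch the future is completed with loop.call_later, which is roughly what asyncio.sleep does (the real code completes it through an internal helper). Done callbacks run in the order they were registered, so the explicit callback runs before the callback that wakes up the awaiting task.

```python
import asyncio


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    events = []
    # Registered first, so it runs before the waiting task is woken up.
    fut.add_done_callback(lambda f: events.append(("callback", f.result())))
    # Roughly what asyncio.sleep does: complete the future after a delay.
    loop.call_later(0.05, fut.set_result, "done")
    result = await fut  # Future.__await__ yields fut; the Task adds its own done callback
    events.append(("awaited", result))
    return events


events = asyncio.run(main())
print(events)  # [('callback', 'done'), ('awaited', 'done')]
```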
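The trace below appears to come from an instrumented run of the example: the START LOOP/END LOOP, RUN, Create Task and Awaiting lines are extra prints that are not part of the code shown above. Each START LOOP/END LOOP pair is one iteration of the event loop, and each RUN line is one callback taken from the ready queue.
"""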
START LOOP
    RUN:  <TaskStepMethWrapper object at 0x791cefeaac10>
    Create Task:  delay_2
    Create Task:  delay_1
    Start Awaiting d2
END LOOP
START LOOP
    RUN:  <TaskStepMethWrapper object at 0x791cefeaaca0>
    Start delay of 2 seconds
    Sleep
    RUN:  <TaskStepMethWrapper object at 0x791cefeaadc0>
    Start delay of 1 seconds
    Sleep
END LOOP
START LOOP
    RUN:  <function _set_result_unless_cancelled at 0x791ceff8baf0>
END LOOP
START LOOP
    RUN:  <TaskWakeupMethWrapper object at 0x791cefeaaeb0>
    End delay of 1 seconds
END LOOP
START LOOP
    RUN:  <function _set_result_unless_cancelled at 0x791ceff8baf0>
END LOOP
START LOOP
    RUN:  <TaskWakeupMethWrapper object at 0x791cefeaae50>
    End delay of 2 seconds
END LOOP
START LOOP
    RUN:  <TaskWakeupMethWrapper object at 0x791cefeaadf0>
    Finished awaiting d2
    Start Awaiting d1
    Finished awaiting d1
END LOOP
START LOOP
    RUN:  <function _run_until_complete_cb at 0x791cefee8700>
END LOOP
START LOOP
    RUN:  <TaskStepMethWrapper object at 0x791cefeaac70>
END LOOP
START LOOP
    RUN:  <function _run_until_complete_cb at 0x791cefee8700>
END LOOP
"""