Chapter 13
asyncio

In general, most Python applications are sequential applications. That is, they usually run from a defined entry point to a defined exit point, with each execution being a single process from beginning to end.

This stands in contrast to many more asynchronous languages, such as JavaScript and Go. For example, JavaScript relies heavily on asynchronous work, with any web requests happening in the background being called in a separate thread, and relying on callbacks to run correct functions once data has loaded.

There is no right or wrong answer to whether a language should approach most problems sequentially or asynchronously, but cases certainly exist where one model is more useful than the other for particular problems. This is where the asyncio module comes in. It makes it easy to do asynchronous work in Python when the problem warrants it.

Right now, asyncio is a provisional module. While sweeping, backward-incompatible changes are unlikely (because Python shies away from such things once items have been placed in the standard library), it is likely that asyncio may undergo significant revision in the next couple of Python versions.

The asyncio module was introduced in Python 3.4, and is not available in Python 2. If you are on Python 3.3, you can get it from PyPI; it is not yet in the standard library. Therefore, if you want to use the features provided by asyncio, you will be limiting yourself to newer versions of Python. Similarly, the asyncio module has been under active development over the lifetime of Python 3.4, so you will want to be on the newest incremental revision if possible.

Because most Python applications are sequential applications, several concepts may be foreign to you if you have not done a reasonable amount of work outside of sequential languages. This chapter covers these concepts in detail.

The Event Loop

The fundamental way that most asynchronous applications work is via an event loop that runs in the background. When something needs to run, it is registered to the event loop.

Registering a function to an event loop causes it to be made into a task. The event loop is then responsible for running the task as soon as it can get to it. Alternatively, sometimes the event loop is told to wait a certain amount of time, and then run the task.

Although you may not be familiar with writing code that uses event loops, you use programs that depend on them frequently. Almost any server is an event loop. A database server, for example, sits around and waits for connections and queries, and then executes queries as fast as possible. If two different connections provide two different queries, it prioritizes and runs both of them. Desktop applications are also event-driven, displaying a screen that allows input in various places and responding to said inputs. Most video games are also event loops. The game waits for control input and takes action based on it.

A Simple Event Loop

In most cases, you do not need to create an event loop object yourself. You can get a BaseEventLoop object by using the asyncio.get_event_loop() function. What you will actually get will be a subclass; which subclass you get is platform-dependent. You do not need to worry about this implementation detail too much. The API between all of them is the same. However, a few platform-dependent limitations exist.

When you first get the loop object, it will not be running.

>>> loop = asyncio.get_event_loop()
>>> loop.is_running()
False

Running the Loop

The following event loop does not have anything registered to it yet, but you can run it anyway:

>>> loop.run_forever()

There is one minor hitch, however. If you ran this, you just lost control of your Python interpreter, because the loop is running in it forever. Press Ctrl+C to get your interpreter back. (Of course, this will stop the loop.)

Unfortunately, asyncio does not have a “fire and forget” method to run a loop in a separate thread. For most application code, this is actually not a huge hindrance, because you are probably writing a server or daemon where the purpose of the program is to run the loop in the foreground and have other processes issue commands.

For testing or experimenting, however, this presents a serious challenge, because the majority of asyncio methods are not actually thread-safe. For most examples in this chapter, you will get around this by simply not running the loop forever.

Registering Tasks and Running the Loop

Tasks are primarily registered to the loop using call_soon, which operates as a FIFO (“first in, first out”) queue. Therefore, most examples in this chapter will simply include a final task that stops the loop, as shown here:

>>> import functools
>>> def hello_world():
...     print('Hello world!')
...
>>> def stop_loop(loop):
...     print('Stopping loop.')
...     loop.stop()
...
>>> loop.call_soon(hello_world)
Handle(<function hello_world at 0x1003c0b70>, ())
>>> loop.call_soon(functools.partial(stop_loop, loop))
Handle(functools.partial(<function stop_loop at 0x101ccf268>,
     <asyncio.unix_events._UnixSelectorEventLoop
     object at 0x1007399e8>), ())
>>> loop.run_forever()
Hello world!
Stopping loop.
>>> 

In this example, the hello_world function was registered to the loop. Then, the stop_loop function was also registered. When the loop was started (with loop.run_forever()), it ran both tasks, in order. Because the second task stopped the loop, it exited the loop once the task completed.

Delaying Calls

It is also possible to register a task, but indicate that it should not be called until later. You can do this using the call_later method, which takes a delay (in number of seconds) as well as the function to be called.

>>> loop.call_later(10, hello_world)
TimerHandle(60172.411042585, <function hello_world at 0x1003c0b70>, ())
>>> loop.call_later(20, functools.partial(stop_loop, loop))
TimerHandle(60194.829461844, functools.partial(
     <function stop_loop at 0x101ccf268>,
     <asyncio.unix_events._UnixSelectorEventLoop object at 0x1007399e8>),
     ())
>>> loop.run_forever()

Note that it is possible to have two or more delayed calls come up at the same time. If this happens, they may occur in either order.

Partials

You may have also noticed the use of functools.partial in the previous example. Most asyncio methods that take functions only take function objects (or other callables), but not arguments to be sent to those functions once they are called. The functools.partial method is a solution to that problem. The partial method itself takes the arguments and keyword arguments that must be passed to the underlying function when it is called.

For instance, the hello_world function in the previous example is actually entirely unnecessary. It is an analogue to functools.partial(print, 'Hello world!'). Therefore, the previous example could be written as follows:

>>> import functools
>>> def stop_loop(loop):
...     print('Stopping loop.')
...     loop.stop()
...
>>> loop.call_soon(functools.partial(print, 'Hello world! ')
Handle(functools.partial(<built-in function print>, 'Hello world'), ())
>>> loop.call_soon(functools.partial(stop_loop, loop))
Handle(functools.partial(<function stop_loop at 0x101ccf268>,
     <asyncio.unix_events._UnixSelectorEventLoop object
     at 0x1007399e8>), ())
>>> loop.run_forever()
Hello world!
Stopping loop.
>>> 

Why have partials at all? After all, it is usually easy enough to wrap such calls in functions that do not require arguments. The answer is in debugging. The partial object knows what it is calling and with what arguments. This is represented as data to the partial, and the partial uses that data when called to perform the proper function call. By contrast, the hello_world function is just that: a function. The function call within it is code. There is no way to easily inspect the hello_world function and pull out the underlying call.

You can see this difference by creating a partial and then inspecting its underlying function and arguments.

>>> partial = functools.partial(stop_loop, loop)
>>> partial.func
<function stop_loop at 0x10223e488>
>>> partial.args
(<asyncio.unix_events._UnixSelectorEventLoop object at 0x102238b70>,)

Running the Loop until a Task Completes

It is also possible to run the loop until a task completes, as shown here:

>>> @asyncio.coroutine
... def trivial():
...     return 'Hello world!'
...
>>> loop.run_until_complete(trivial())
'Hello world!'

In this example, the @asyncio.coroutine decorator transforms this normal Python function into a coroutine, which is covered in more detail later. When you call run_until_complete, it registers the task and then runs the loop only until the task completes. Because it is the only task in the queue, it completes and exits the loop, returning the result of that task.

Running a Background Loop

It is possible to run an event loop in the background, using the threading module that is available in the Python standard library.

>>> import asyncio
>>> import threading
>>>
>>> def run_loop_forever_in_background(loop):
...     def thread_func(l):
...         asyncio.set_event_loop(l)
...         l.run_forever()
...     thread = threading.Thread(target=thread_func, args=(loop,))
...     thread.start()
...     return thread
...
>>>
>>> loop = asyncio.get_event_loop()
>>> run_loop_forever_in_background(loop)
<Thread(Thread-1, started 4344254464)>
>>>
>>> loop.is_running()
True

Note that this is a useful idiom for getting started, but is almost certainly not what you will want in your final application. (For example, you will have a hard time stopping the loop; loop.stop does not work anymore.) It is fine for learning, though.

This loop is still relatively uninteresting. After all, while it is running, it has nothing to do. You have not registered any tasks to it yet. Consider what happens when you register a trivial task to run as soon as possible.

>>> loop.call_soon_threadsafe(functools.partial(print, 'Hello world'))
Handle(functools.partial(<built-in function print>, 'Hello world'), ())
>>> Hello world

This output might be a bit confusing. First, you called call_soon_threadsafe. This tells the loop to run the given function asynchronously as soon as possible. Note that, in most cases, you will simply use the call_soon function, because you will not be running the event loop in a thread.

The call_soon_threadsafe function returns a Handle object. This is an object with one method: cancel. It is able to cancel the task entirely if appropriate.

Next, you have the >>> prompt (suggesting that the interpreter expects input), followed by Hello world. That was printed from the previous function call, after the prompt was written to the screen.

Because event loops are not thread safe, the remainder of the examples in this chapter use other models to explain the concepts.

Coroutines

Most functions that are used within asyncio should be coroutines. A coroutine is a special kind of function designed to run within an event loop. Additionally, if a coroutine is created but is never run, an error will be issued to the logs.

You can make a function into a coroutine by decorating it with @asyncio.coroutine. Consider this example of running a simple coroutine with the event handler's run_until_complete:

>>> import asyncio
>>> @asyncio.coroutine
... def coro_sum(*args):
...     answer = 0
...     for i in args:
...         answer += i
...     return answer
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(coro_sum(1, 2, 3, 4, 5))
15

The coro_sum function created here is no longer a regular function; it is a coroutine, and it is called by the event loop. It is worth noting that you can no longer call it the regular way and get what you may expect.

>>> coro_sum(1, 2, 3, 4, 5)
<generator object coro at 0x104056e10>

Coroutines are, in fact, special generators that are consumed by the event loop. That is why the run_until_complete method is able to take what appears to be a standard function call. The function is not actually run at that point. The event loop is what consumes the generator and ultimately extracts the result.

What actually happens under the hood essentially looks like this:

>>> try:
...     next(coro_sum(1, 2, 3, 4, 5))
... except StopIteration as ex:
...     ex.value
...
15

The generator does not yield any values. It immediately raises StopIteration. The StopIteration exception is given a value, which is the return value of the function. The event loop is then able to extract this and handle it appropriately.

Nested Coroutines

Coroutines provide a special mechanism to call other coroutines (or Future instances, as discussed shortly) in a fashion that mimics that of sequential programming. By using the yield from statement, a coroutine can run another coroutine, and the statement returns the result. This is one mechanism available to write asynchronous code in a sequential manner.

The following simple coroutine calls another coroutine using yield from:

>>> import asyncio
>>> @asyncio.coroutine
... def nested(*args):
...     print('The 'nested' function ran with args: %r' % (args,))
...     return [i + 1 for i in args]
...
>>> @asyncio.coroutine
... def outer(*args):
...     print('The 'outer' function ran with args: %r' % (args,))
...     answer = yield from nested(*[i * 2 for i in args])
...     return answer
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(outer(2, 3, 5, 8))
The 'outer' function ran with args: (2, 3, 5, 8)
The 'nested' function ran with args: (4, 6, 10, 16)
[5, 7, 11, 17]

Here you have two coroutines, with the outer coroutine calling the nested coroutine using the yield from syntax. You can see from the output to standard out that both coroutines run, and the final result is returned at the end of outer.

Incidentally, what is happening here under the hood is that the outer coroutine is actually suspended when it encounters the yield from statement. The nested coroutine is then placed on the event loop and the event loop runs it. The outer coroutine does not continue until nested completes and a result is available.

A couple things are worth noting. First, the yield from statement returns the result of the coroutine it runs. That is why you see an assignment to a variable in the example.

Second, why would you not simply call the function directly? This would be fine if it were a procedural function, but this is a coroutine. Calling it directly would return a generator rather than the value. You could write nested as a standard function, but consider the following situation where you would also want to be able to assign it to the event loop directly.

>>> loop.run_until_complete(nested(5, 10, 15))
The 'nested' function ran with args: (5, 10, 15)
[6, 11, 16]

The capability to have a coroutine call another coroutine using yield from addresses this. It increases the capability to reuse coroutines.

Futures and Tasks

Because most work using asyncio is done asynchronously, you must contend with how to deal with the results of functions that are run in this manner. The yield from statement provides one way to do this, but sometimes, for example, you want to run asynchronous functions in parallel.

In sequential programming, return values are straightforward. You run a function, and it returns its result. However, in asynchronous programming, while the function returns its result as before, what happens to the result then? There is no clear caller to return the result to.

Futures

A mechanism for dealing with this particular challenge is the Future object. Essentially, a Future is an object that is told about the status of an asynchronous function. This includes the status of the function—whether that function is running, has completed, or was canceled. This also includes the result of the function, or, if the function ended by raising an exception, the exception and traceback.

The Future is a standalone object. It is independent of the actual function that is running. It does nothing but store the state and result information.

Tasks

A Task is a subclass of Future, as well as what you will generally be using when programming with asyncio. Whenever a coroutine is scheduled on the event loop, that coroutine is wrapped in a Task. So, in the previous example, when you called run_until_complete and passed a coroutine, that coroutine was wrapped in a Task class and then executed. It was the Task that stored the result and handled providing it in the yield from statement.

The run_until_complete method is not the only way (or even the primary way) for a coroutine to be wrapped in a class, however. After all, in many applications, your event loop runs forever. How do tasks get placed on the event loop in such a system?

The primary way you do this is by using the asyncio.async method. This method will place a coroutine on the event loop, and return the associated Task.

To demonstrate this, first get the event loop and write a garden-variety coroutine, as shown here:

>>> import asyncio
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     asyncio.get_event_loop().stop()
...     return '%s tea' % variety
...
>>>

This is still a trivial task, but one new thing here that you have not seen yet is that the task actually stops the event loop. This is simply a nice workaround to dodge the fact that when you start the loop (with run_forever), it will run forever.

Next, register the task with the event loop.

>>> task = asyncio.async(make_tea('chamomile'))

This is all you actually need to do to register the task with the loop, but because the loop is not running, the task is not going to execute for now. Indeed, you can inspect the task object using the done and result methods and see this.

>>> task.done()
False
>>> task.result()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/
       futures.py", line 237, in result
    raise InvalidStateError('Result is not ready.')
asyncio.futures.InvalidStateError: Result is not ready.

Next, you must start the loop. It is okay to start the loop with run_forever now; the actual task will stop it as soon as the task completes because of the call to loop.stop().

>>> loop = asyncio.get_event_loop()
>>> loop.run_forever()
Now making chamomile tea.
>>>

Sure enough, the loop starts, runs the task, and then immediately stops. Now if you inspect the task variable, you will get different results.

>>> task.done()
True
>>> task.result()
'chamomile tea'

Whenever you create a Task object with asyncio.async, you will get a Task object back. You can inspect that object at any time to get the status or result of the task.

Callbacks

Another feature of Future objects (and therefore Task objects, because Task subclasses Future) is the capability to register callbacks to the Future. A callback is simply a function (or coroutine) that should execute once the Future is done, and which receives the Future as an argument.

In some ways, callbacks represent a reversal of the yield from model. When a coroutine uses yield from, that coroutine ensures that the nested coroutine runs before or during its execution. When you register a callback, you are working in the opposite direction. The callback is being attached to the original task, to run after the execution of the task.

You can add a callback to any Future object by using that object's add_done_callback method. Callbacks are expected to take a single argument, which is the Future object itself (which will contain the status and result, if applicable, of the underlying task).

Consider the following example of a callback in action:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> def confirm_tea(future):
...     print('The %s is made.' % future.result())
...
>>> task = asyncio.async(make_tea('green'))
>>> task.add_done_callback(confirm_tea)
>>>
>>> loop.run_until_complete(task)
Now making green tea.
The green tea is made.
'green tea'

The first thing that is happening is that you again made a make_tea coroutine, identical to the one in the previous example, except that this one does not stop the loop.

Next, notice the confirm_tea function. This is a plain function; it is not a coroutine. In fact, you cannot send a coroutine as a callback here. It will raise an exception when you run the loop if you try. This function receives the Future object (which is the task variable in this case) that it is registered to once the callback runs. The Future object contains the result of the coroutine—which is that is the 'green tea' string in this case.

Finally, notice the call to add_done_callback. This is where the confirm_tea method is assigned as a callback to the task. Also, notice that it is assigned to the task (a particular invocation of a coroutine), not the coroutine itself. If another task was registered to the loop with asyncio.async that called the same coroutine, it would not have this callback.

The output shows that both functions ran, in the order you expected. The return value is the return value from make_tea, provided to you because that is how run_until_complete works.

No Guarantee of Success

There is one important thing to note. Simply because a Future is done does not guarantee that it ran successfully. This example simply assumes that future.result() will be populated, but that may not be the case. The Task could have ended in an exception, in which case, attempting to access future.result() will raise that exception.

Similarly, it is possible to cancel a task (using the Future.cancel() method or by other means). If this occurs, the task will be marked Cancelled, and the callbacks will be scheduled. In this case, attempting to access future.result() will raise CancelledError.

Under the Hood

Internally, asyncio informs the Future object that it is done. The Future object then takes each of the callbacks registered against it and calls call_soon_threadsafe on each of them.

Be aware that there is no guarantee of order when it comes to callbacks. It is entirely possible (and fine) to register multiple callbacks to the same task. However, you do not have any way of controlling which callbacks will be run in which order.

Callbacks with Arguments

One limitation of the callback system is that, as noted, the callback receives the Future as a positional argument, and accepts no other arguments.

It is possible to send other arguments to a callback through the use of functools.partial. If you do this, however, the callback must still accept the Future as a positional argument. In practice, the Future is appended to the end of the positional arguments list before the callback is called.

Consider the following case of a callback that expects another argument:

>>> import asyncio
>>> import functools
>>>
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> def add_ingredient(ingredient, future):
...     print('Now adding %s to the %s.' % (ingredient, future.result()))
...
>>>
>>> task = asyncio.async(make_tea('herbal'))
>>> task.add_done_callback(functools.partial(add_ingredient, 'honey'))
>>>
>>> loop.run_until_complete(task)
Now making herbal tea.
Now adding honey to the herbal tea.
'herbal tea'

This is mostly similar to the previous example. The only significant difference is in how the callback is registered. Instead of passing the function object directly (as you did in the previous example), you instantiate a functools.partial object with the positional argument you are sending ('honey').

Again, notice that the add_ingredient function is written to accept two positional arguments, but the partial only specifies one argument. The Future object is sent as the last positional argument in cases where a partial is used. The function signature for add_ingredient reflects this.

Task Aggregation

The asyncio module provides a convenient way to aggregate tasks. You have two major reasons to do something like this. The first reason is to take some sort of action once any task in a set of tasks has completed. The second reason is to take some sort of action once all tasks in the set have completed.

Gathering Tasks

The first mechanism that asyncio provides for this purpose is the gather function. The gather function takes a sequence of coroutines or tasks and returns a single task that aggregates all of them (wrapping any coroutines it receives in tasks as appropriate).

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> meta_task = asyncio.gather(
...     make_tea('chamomile'),
...     make_tea('green'),
...     make_tea('herbal')
... )
...
>>> meta_task.done()
False
>>>
>>> loop.run_until_complete(meta_task)
Now making chamomile tea.
Now making herbal tea.
Now making green tea.
['chamomile tea', 'green tea', 'herbal tea']
>>> meta_task.done()
True

In this case, the asyncio.gather function received three coroutine objects. It wrapped them all in tasks under the hood, and returned a single task that serves as an aggregation of all three.

Notice that scheduling the meta_task object effectively schedules the three tasks gathered underneath it. Once you run the loop, the three subtasks all run.

In the case of a task created with asyncio.gather, the result is always a list, and that list contains the results of the individual tasks that were gathered. The order of the list of results is guaranteed to be the same order in which the tasks were gathered (but the tasks are not guaranteed to be run in that order). Therefore, the list of strings you got back are in the same order as the registered coroutines in the asyncio.gather call.

The asyncio.gather paradigm also provides the opportunity to add a callback to the set of tasks as a whole, rather than the individual tasks. What if you only want a callback to run once all of the tasks are completed, but it does not matter to you in which order they complete?

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> def mix(future):
...     print('Mixing the %s together.' % ' and '.join(future.result()))
...
>>> meta_task = asyncio.gather(make_tea('herbal'), make_tea('green'))
>>> meta_task.add_done_callback(mix)
>>>
>>> loop.run_until_complete(meta_task)
Now making green tea.
Now making herbal tea.
Mixing the green tea and herbal tea together.
['green tea', 'herbal tea']

The first thing that happened when you called run_until_complete was that both of the individual tasks gathered into meta_task ran, individually. Finally, the mix function ran, only after both of the individual tasks had run. This is because the meta_task is not considered to be done until after all of its individual tasks are done, so only once both individual tasks complete does it trigger the callback.

You can also see that the Future object that the mix function received was meta_task, not the individual tasks, and, therefore, its result method returned a list of both of the individual results.

Waiting on Tasks

Another tool that the asyncio module provides is the built-in wait coroutine. The asyncio.wait coroutine takes a sequence of coroutines or tasks (wrapping any coroutines in tasks) and returns once they are done. Note that the signature here is distinct from asyncio.gather. gather takes each coroutine or task as a single positional argument, whereas wait expects a list.

Additionally, wait accepts a parameter to return when any of its tasks complete, rather than only returning when all of them do. Regardless of whether this flag is set, the wait method always returns a two-tuple, with the first element being the Future objects that have completed, and the second element being those that are still pending.

Consider the following example that is similar to how you previously used asyncio.gather:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> coro = asyncio.wait([make_tea('chamomile'), make_tea('herbal')])
>>>
>>> loop.run_until_complete(coro)
Now making chamomile tea.
Now making herbal tea.
({Task(<coro>)<result='herbal tea'>, Task(<coro>)<result='chamomile tea'>}, set())

Note a couple of subtle differences here. First, unlike the gather method, the wait method returns a coroutine. This has its value; you can use it in a yield from statement, for example.

On the other hand, you are unable to attach callbacks directly to a coroutine returned from wait. If you want to do this, you must wrap it in a task using asyncio.async.

Also, the result is different. The asyncio.gather function aggregated the results in a list, and returned that. The result for asyncio.wait is a two-tuple containing the actual Future objects (which themselves contain their results). Additionally, the Future objects are reorganized. The asyncio.wait routine places them into two sets—one set for those that are done, and another set for those that are not. Because sets are themselves an unordered structure, that means you must rely on the Future objects to piece together which result corresponds to which task.

Timeouts

It is possible to have the asyncio.wait coroutine return when a specific amount of time has passed, regardless of whether all of the tasks have completed. To do this, you pass the timeout keyword argument to asyncio.wait.

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> coro = asyncio.wait([asyncio.sleep(5), asyncio.sleep(1)], timeout=3)
>>> loop.run_until_complete(coro)
({Task(<sleep>)<result=None>}, {Task(<sleep>)<PENDING>})

In this case, you are just using a coroutine provided by the asyncio module: asyncio.sleep. This simply waits for a given number of seconds, and then returns None. The timing in this example is set up so that one of the tasks (the second one) will complete before the wait function times out, but the other will not.

The first difference to note is that the second element of the two-tuple now has a task in it; the sleep coroutine that failed to complete in time is still pending. The other, however, did complete, and has a result (None).

The use of timeout does not necessitate that the entire time period designated by timeout must elapse. If all of the tasks complete before time expires, the coroutine will complete immediately.

Waiting on Any Task

One of the biggest features of asyncio.wait is the capability to have the coroutine return when any of the Future objects under its care completes. The asyncio.wait function also accepts a return_when keyword argument. By sending it a special constant (asyncio.FIRST_COMPLETED), the coroutine will complete once any task has finished, rather than waiting for every task.

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> coro = asyncio.wait([
...     asyncio.sleep(3),
...     asyncio.sleep(2),
...     asyncio.sleep(1),
... ], return_when=asyncio.FIRST_COMPLETED)
>>>
>>> loop.run_until_complete(coro)
({Task(<sleep>)<result=None>},
 {Task(<sleep>)<PENDING>, Task(<sleep>)<PENDING>})

In this case, the asyncio.wait call is given a list of three asyncio.sleep coroutines, which will sleep for 3, 2, and 1 seconds. Once the coroutine is called, it runs all the tasks underneath it. The asyncio.sleep coroutine that is only asked to wait for 1 second completes first, which completes the wait. Therefore, you get a two-tuple back with one item in the first set (tasks that are complete), and two items in the second set (tasks that are still pending).

Waiting on an Exception

It is also possible to have a call to asyncio.wait complete whenever it encounters a task that completed with an exception, rather than exiting normally. This is a valuable tool in situations where you want to trap the exceptional cases as early as possible and deal with them.

You can trigger this behavior using the return_from keyword argument as before, but by sending the asyncio.FIRST_EXCEPTION constant instead.

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def raise_ex_after(seconds):
...     yield from asyncio.sleep(seconds)
...     raise RuntimeError('Raising an exception.')
...
>>> coro = asyncio.wait([
...     asyncio.sleep(1),
...     raise_ex_after(2),
...     asyncio.sleep(3),
... ], return_when=asyncio.FIRST_EXCEPTION)
>>>
>>> loop.run_until_complete(coro)
({Task(<raise_ex_after>)<exception=RuntimeError('Raising an exception.',)>,
  Task(<sleep>)<result=None>},
 {Task(<sleep>)<PENDING>})

In this case, the asyncio.wait coroutine stopped as soon as a task completed with an exception. This means that the 1-second asyncio.sleep completed successfully, and it is in the first set in the return value. The raise_ex_after coroutine also completed, so it is in the first set also. However, the fact that it raised an exception caused wait to trigger its completion before the 3-second sleep could complete, so it is returned in the second (pending) set.

Sometimes, there may not be any task that actually raises an exception (which is usually a convenient case). In this case, the wait completes once all of the tasks have completed as normal.

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> coro = asyncio.wait([
...     asyncio.sleep(1),
...     asyncio.sleep(2),
... ], return_when=asyncio.FIRST_EXCEPTION)
>>>
>>> loop.run_until_complete(coro)
({Task(<sleep>)<result=None>, Task(<sleep>)<result=None>}, set()) 

Queues

The asyncio module provides several common patterns that are built upon the fundamental building blocks of the event loop and Future objects. One of these is a basic queuing system.

A queue is a collection of tasks to be processed by a task runner. The Python ecosystem includes several third-party task queue utilities, with the most popular of these probably being celery. This is not a fully featured queuing application. Rather, the asyncio module provides simply the fundamental queue itself, which application developers can build on top of.

Why is Queue part of asyncio? This Queue class provides methods to be used in a sequential or an asynchronous context.

Consider first a very simple example of a Queue in action:

>>> import asyncio
>>> queue = asyncio.Queue()
>>> queue.put_nowait('foo')
>>> queue.qsize()
1
>>> queue.get_nowait()
'foo'
>>> queue.qsize()
0

In addition to being trivially simple, there is nothing particularly asynchronous going on here. You did not even bother to get or run the event loop. This is a very direct FIFO queue.

Note the use of the put_nowait and get_nowait methods. These methods are designed to perform the addition or removal of the item to or from the queue immediately. If, for example, you try to call get_nowait on an empty queue, you get a QueueEmpty exception.

>>> queue.get_nowait()
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/
       queues.py", line 206, in get_nowait
    raise QueueEmpty
asyncio.queues.QueueEmpty

The Queue class also provides a method called get. Instead of returning an exception on an empty queue, the get method will patiently wait for an item to be added to the queue, and then retrieve it from the queue and return it immediately. Unlike get_nowait, this method is a coroutine, and runs in an asynchronous context.

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> queue = asyncio.Queue()
>>>
>>> queue.put_nowait('foo')
>>> loop.run_until_complete(queue.get())
'foo'

In this case, an item was already on the queue, so the get method still returns immediately. If there was not an item on the queue yet, a simple call to loop.run_until_complete would never complete, and block your interpreter.

You can use the timeout parameter in asyncio.wait to see this concept in action, though.

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> queue = asyncio.Queue()
>>>
>>> task = asyncio.async(queue.get())
>>> coro = asyncio.wait([task], timeout=1)
>>>
>>> loop.run_until_complete(coro)
(set(), {Task(<get>)<PENDING>})

At this point, there is still nothing on the queue, so the task to get the item off the queue is just continuing indefinitely. You also have the task variable, and can inspect its status.

>>> task.done()
False

Next, place an item on the queue, as shown here:

>>> queue.put_nowait('bar')

You will notice that the task still is not done yet, because the event loop is no longer running. The task is still registered, though, so register a callback to stop the loop once it completes, and it is possible to start it again.

>>> import functools
>>> def stop(l, future):
...     l.stop()
...
>>> task.add_done_callback(functools.partial(stop, loop))
>>>
>>> loop.run_forever()

Now, because there was an item on the queue, the task is done, and the task's result is the item on the queue ('bar').

>>> task.done()
True
>>> task.result()
'bar'

Maximum Size

It is also possible to give a Queue object a maximum size, by setting the maxsize keyword argument when creating the queue.

>>> import asyncio
>>> queue = asyncio.Queue(maxsize=5)

If you do this, the Queue will not allow any more than the maximum number of items onto the queue. A call to the put method will simply wait until a previous item is removed, and then (and only then) will it place the item on the queue. If you call put_nowait and the queue is full, it will raise QueueFull.

Servers

One of the most common uses of the asyncio module is to create services that can run as a daemon and accept commands. The asyncio module defines a Protocol class that is able to fire appropriate events on receiving or losing a connection, and when it receives data.

Additionally, the event loop defines a create_server method that opens a socket, allowing data to be sent to the event loop and on to the protocol.

Consider a simple server that can do nothing but add numbers and shut itself down.

import asyncio

class Shutdown(Exception): pass
class ServerProtocol(asyncio.Protocol): def connection_made(self, transport): self.transport = transport self.write('Welcome.')
def data_received(self, data): # Sanity check: Do nothing on empty commands. if not data: return
# Commands to this server shall be a single word, with # space separated arguments. message = data.decode('ascii') command = message.strip().split(' ')[0].lower() args = message.strip().split(' ')[1:]
# Sanity check: Verify the presence of the appropriate command. if not hasattr(self, 'command_%s' % command): self.write('Invalid command: %s' % command) return
# Run the appropriate command. try: return getattr(self, 'command_%s' % command)(*args) except Exception as ex: self.write('Error: %s\n' % str(ex))
def write(self, msg_string): string += '\n' self.transport.write(msg_string.encode('ascii', 'ignore'))
def command_add(self, *args): args = [int(i) for i in args] self.write('%d' % sum(args))
def command_shutdown(self): self.write('Okay. Shutting down.') raise KeyboardInterrupt

if __name__ == '__main__': loop = asyncio.get_event_loop() coro = loop.create_server(ServerProtocol, '127.0.0.1', 8000) asyncio.async(coro) try: loop.run_forever() except KeyboardInterrupt: pass

This is a somewhat long module, but a few details are worth noting. First, the ServerProtocol class subclasses asyncio.Protocol. The connection_made and data_received methods are defined in the superclass, but do nothing. The other three methods are custom.

Remember that when you make a socket connection between machines, you are essentially always sending bytes, not text strings. The write method here does that conversion in one place, rather than forcing you to convert to a byte string every time you want to write to the transport.

The guts of this are in the data_received method. It takes a line of data and tries to figure out what to do with it. It only understands two basic commands, and anything else is an error.

Finally, the block at the end of the file actually starts up the server, and runs it against the local machine on a particular port. This is all the code you need to start up a server and have it listen for commands.

You can verify that the server receives commands by starting it up and then, in another shell window, using telnet to connect to it.

$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is 'ˆ]'.
Welcome.
add 3 5
8
make_tea
Invalid command: make_tea
shutdown
Okay. Shutting down.
Connection closed by foreign host.

You have a very simple server. It can accept two commands: add and shutdown. It can provide errors if you try to issue a command it does not understand. And, the server is, in fact, able to shut itself down.

Summary

Python is, at its core, a sequential language. It is a sequential language that, with asyncio, is getting budding asynchronous features built in to the standard library.

One thing that makes asyncio valuable is that it enables you to write code that follows sequential patterns, but is actually asynchronous under the hood, by using the yield from statement. However, if you intend to write an asynchronous application, you still must understand the advantages and disadvantages of this paradigm.

As you have seen, many things are different. You may not always know in what order tasks will run. It is possible for tasks to be intentionally canceled. Finally, code may be registered to run using a callback system, rather than through direct sequential function calls. All of these things represent a break from “normal” Python programming.

Still, if you have a robust Python 3 application and need certain asynchronous elements, asyncio may be the right tool for you.

In Chapter 14, you learn about style norms and recommendations in Python.

    Reset