In general, most Python applications are sequential: they run from a defined entry point to a defined exit point, with each execution being a single process from beginning to end.
This stands in contrast to languages with a more asynchronous bent, such as JavaScript and Go. JavaScript, for example, relies heavily on asynchronous work: web requests happen in the background, with callbacks running the appropriate functions once data has loaded.
There is no right or wrong answer to whether a language should approach most problems sequentially or asynchronously, but there are certainly cases where one model is more useful than the other for particular problems. This is where the asyncio module comes in: it makes it easy to do asynchronous work in Python when the problem warrants it.
Right now, asyncio is a provisional module. While sweeping, backward-incompatible changes are unlikely (because Python shies away from such things once items have been placed in the standard library), asyncio may still undergo significant revision over the next couple of Python versions.
The asyncio module was introduced in Python 3.4, and is not available in Python 2. If you are on Python 3.3, you can get it from PyPI; it is not yet in that version's standard library. Therefore, if you want to use the features provided by asyncio, you will be limiting yourself to newer versions of Python. Similarly, the asyncio module has been under active development over the lifetime of Python 3.4, so you will want to be on the newest incremental revision if possible.
Because most Python applications are sequential applications, several concepts may be foreign to you if you have not done a reasonable amount of work outside of sequential languages. This chapter covers these concepts in detail.
The fundamental way that most asynchronous applications work is via an event loop that runs in the background. When something needs to run, it is registered to the event loop.
Registering a function to an event loop causes it to be made into a task. The event loop is then responsible for running the task as soon as it can get to it. Alternatively, sometimes the event loop is told to wait a certain amount of time, and then run the task.
Although you may not be familiar with writing code that uses event loops, you frequently use programs that depend on them. Almost any server runs an event loop. A database server, for example, sits and waits for connections and queries, and then executes those queries as quickly as possible; if two different connections issue two different queries, it prioritizes and runs both. Desktop applications are also event-driven, displaying a screen that accepts input in various places and responding to those inputs. Most video games work the same way: the game waits for control input and takes action based on it.
In most cases, you do not need to create an event loop object yourself. You can get a BaseEventLoop object by using the asyncio.get_event_loop() function. What you actually get is a subclass, and which subclass you get is platform-dependent, but you do not need to worry about this implementation detail too much: the API is the same across all of them, although a few platform-dependent limitations exist.
When you first get the loop object, it will not be running.
>>> loop = asyncio.get_event_loop()
>>> loop.is_running()
False
This event loop does not have anything registered to it yet, but you can run it anyway:
>>> loop.run_forever()
There is one minor hitch, however. If you ran this, you just lost control of your Python interpreter, because the loop is running in it forever. Press Ctrl+C to get your interpreter back. (Of course, this will stop the loop.)
Unfortunately, asyncio does not have a "fire and forget" method to run a loop in a separate thread. For most application code, this is actually not a huge hindrance, because you are probably writing a server or daemon where the purpose of the program is to run the loop in the foreground and have other processes issue commands.
For testing or experimenting, however, this presents a serious challenge, because the majority of asyncio methods are not actually thread-safe. For most examples in this chapter, you will get around this by simply not running the loop forever.
Tasks are primarily registered to the loop using call_soon, which operates as a FIFO ("first in, first out") queue. Therefore, most examples in this chapter will simply include a final task that stops the loop, as shown here:
>>> import functools
>>> def hello_world():
...     print('Hello world!')
...
>>> def stop_loop(loop):
...     print('Stopping loop.')
...     loop.stop()
...
>>> loop.call_soon(hello_world)
Handle(<function hello_world at 0x1003c0b70>, ())
>>> loop.call_soon(functools.partial(stop_loop, loop))
Handle(functools.partial(<function stop_loop at 0x101ccf268>,
<asyncio.unix_events._UnixSelectorEventLoop
object at 0x1007399e8>), ())
>>> loop.run_forever()
Hello world!
Stopping loop.
>>>
In this example, the hello_world function was registered to the loop, and then the stop_loop function was registered after it. When the loop was started (with loop.run_forever()), it ran both tasks, in order. Because the second task stopped the loop, the loop exited once that task completed.
It is also possible to register a task, but indicate that it should not be called until later. You can do this using the call_later method, which takes a delay (in seconds) as well as the function to be called.
>>> loop.call_later(10, hello_world)
TimerHandle(60172.411042585, <function hello_world at 0x1003c0b70>, ())
>>> loop.call_later(20, functools.partial(stop_loop, loop))
TimerHandle(60194.829461844, functools.partial(
<function stop_loop at 0x101ccf268>,
<asyncio.unix_events._UnixSelectorEventLoop object at 0x1007399e8>),
())
>>> loop.run_forever()
Note that it is possible to have two or more delayed calls come up at the same time. If this happens, they may occur in either order.
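The behavior of call_later can be seen in a self-contained script rather than at the interpreter. The following sketch uses a fresh loop from asyncio.new_event_loop() and arbitrary short delays (both assumptions, not from the original example); distinct delays run in order, and a final delayed call stops the loop:

```python
import asyncio
import functools

results = []
loop = asyncio.new_event_loop()

# Distinct delays guarantee the order; identical delays would not.
loop.call_later(0.01, functools.partial(results.append, 'first'))
loop.call_later(0.02, functools.partial(results.append, 'second'))
loop.call_later(0.03, loop.stop)  # the final task stops the loop

loop.run_forever()
loop.close()
print(results)  # ['first', 'second']
```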
You may have also noticed the use of functools.partial in the previous example. Most asyncio methods that take functions accept only function objects (or other callables), not the arguments to be sent to those functions when they are called. functools.partial is a solution to that problem: the partial object itself holds the arguments and keyword arguments that must be passed to the underlying function when it is called.
For instance, the hello_world function in the previous example is actually entirely unnecessary; it is an analogue of functools.partial(print, 'Hello world!'). Therefore, the previous example could be written as follows:
>>> import functools
>>> def stop_loop(loop):
...     print('Stopping loop.')
...     loop.stop()
...
>>> loop.call_soon(functools.partial(print, 'Hello world!'))
Handle(functools.partial(<built-in function print>, 'Hello world!'), ())
>>> loop.call_soon(functools.partial(stop_loop, loop))
Handle(functools.partial(<function stop_loop at 0x101ccf268>,
<asyncio.unix_events._UnixSelectorEventLoop object
at 0x1007399e8>), ())
>>> loop.run_forever()
Hello world!
Stopping loop.
>>>
Why have partials at all? After all, it is usually easy enough to wrap such calls in functions that take no arguments. The answer is debugging. A partial object knows what it is calling and with what arguments; these are stored as data on the partial, which uses that data to perform the proper function call when invoked. By contrast, the hello_world function is just that: a function. The function call within it is code, and there is no easy way to inspect the hello_world function and pull out the underlying call.
You can see this difference by creating a partial and then inspecting its underlying function and arguments.
>>> partial = functools.partial(stop_loop, loop)
>>> partial.func
<function stop_loop at 0x10223e488>
>>> partial.args
(<asyncio.unix_events._UnixSelectorEventLoop object at 0x102238b70>,)
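Keyword arguments are stored as well, under the keywords attribute. A short sketch (the sep argument here is just an arbitrary illustration):

```python
import functools

partial = functools.partial(print, 'Hello', sep=', ')
print(partial.func is print)   # True: the wrapped callable
print(partial.args)            # ('Hello',): stored positional arguments
print(partial.keywords)        # {'sep': ', '}: stored keyword arguments
partial('world')               # calls print('Hello', 'world', sep=', ')
```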
It is also possible to run the loop until a task completes, as shown here:
>>> @asyncio.coroutine
... def trivial():
...     return 'Hello world!'
...
>>> loop.run_until_complete(trivial())
'Hello world!'
In this example, the @asyncio.coroutine decorator transforms this normal Python function into a coroutine, which is covered in more detail later. When you call run_until_complete, it registers the task and then runs the loop only until the task completes. Because this is the only task in the queue, the loop exits once it completes, returning the result of that task.
It is possible to run an event loop in the background, using the threading module from the Python standard library.
>>> import asyncio
>>> import threading
>>>
>>> def run_loop_forever_in_background(loop):
...     def thread_func(l):
...         asyncio.set_event_loop(l)
...         l.run_forever()
...     thread = threading.Thread(target=thread_func, args=(loop,))
...     thread.start()
...     return thread
...
>>>
>>> loop = asyncio.get_event_loop()
>>> run_loop_forever_in_background(loop)
<Thread(Thread-1, started 4344254464)>
>>>
>>> loop.is_running()
True
Note that this is a useful idiom for getting started, but it is almost certainly not what you will want in your final application. (For example, you will have a hard time stopping the loop; calling loop.stop directly no longer works.) It is fine for learning, though.
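For completeness, one thread-safe way to stop such a background loop is to schedule loop.stop from inside the loop itself, rather than calling it directly. A minimal sketch (the structure here is illustrative, not the chapter's idiom):

```python
import asyncio
import threading

loop = asyncio.new_event_loop()
thread = threading.Thread(target=loop.run_forever)
thread.start()

# loop.stop() called from this thread would not wake the running loop;
# call_soon_threadsafe schedules the stop inside the loop's own thread.
loop.call_soon_threadsafe(loop.stop)
thread.join()
print(loop.is_running())  # False
loop.close()
```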
This loop is still relatively uninteresting. After all, while it is running, it has nothing to do. You have not registered any tasks to it yet. Consider what happens when you register a trivial task to run as soon as possible.
>>> loop.call_soon_threadsafe(functools.partial(print, 'Hello world'))
Handle(functools.partial(<built-in function print>, 'Hello world'), ())
>>> Hello world
This output might be a bit confusing. First, you called call_soon_threadsafe, which tells the loop to run the given function asynchronously as soon as possible. Note that, in most cases, you will simply use the call_soon function, because you will not be running the event loop in a thread.
The call_soon_threadsafe function returns a Handle object, which has a single method: cancel. It allows you to cancel the task entirely if appropriate.
Next, you see the >>> prompt (suggesting that the interpreter expects input), followed by Hello world. That was printed by the scheduled function call, after the prompt was written to the screen.
Because event loops are not thread safe, the remainder of the examples in this chapter use other models to explain the concepts.
Most functions used within asyncio should be coroutines. A coroutine is a special kind of function designed to run within an event loop. Additionally, if a coroutine is created but never run, an error will be issued to the logs.
You can make a function into a coroutine by decorating it with @asyncio.coroutine. Consider this example of running a simple coroutine with the event loop's run_until_complete:
>>> import asyncio
>>> @asyncio.coroutine
... def coro_sum(*args):
...     answer = 0
...     for i in args:
...         answer += i
...     return answer
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(coro_sum(1, 2, 3, 4, 5))
15
The coro_sum function created here is no longer a regular function; it is a coroutine, and it is called by the event loop. Note that you can no longer call it the regular way and get what you might expect.
>>> coro_sum(1, 2, 3, 4, 5)
<generator object coro at 0x104056e10>
Coroutines are, in fact, special generators that are consumed by the event loop. That is why the run_until_complete method is able to take what appears to be a standard function call: the function is not actually run at that point. The event loop consumes the generator and ultimately extracts the result.
What actually happens under the hood essentially looks like this:
>>> try:
...     next(coro_sum(1, 2, 3, 4, 5))
... except StopIteration as ex:
...     ex.value
...
15
The generator does not yield any values; it immediately raises StopIteration. The StopIteration exception is given a value, which is the return value of the function, and the event loop is able to extract this value and handle it appropriately.
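You can reproduce this mechanism with a plain generator, entirely outside of asyncio (the function name here is illustrative):

```python
def plain_return():
    return 'Hello world!'
    yield  # never reached; its mere presence makes this a generator

try:
    next(plain_return())
except StopIteration as ex:
    result = ex.value  # the function's return value rides on the exception

print(result)  # Hello world!
```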
Coroutines provide a special mechanism to call other coroutines (or Future instances, as discussed shortly) in a fashion that mimics sequential programming. By using the yield from statement, a coroutine can run another coroutine, and the statement returns the result. This is one mechanism available for writing asynchronous code in a sequential manner.
The following simple coroutine calls another coroutine using yield from:
>>> import asyncio
>>> @asyncio.coroutine
... def nested(*args):
...     print("The 'nested' function ran with args: %r" % (args,))
...     return [i + 1 for i in args]
...
>>> @asyncio.coroutine
... def outer(*args):
...     print("The 'outer' function ran with args: %r" % (args,))
...     answer = yield from nested(*[i * 2 for i in args])
...     return answer
...
>>> loop = asyncio.get_event_loop()
>>> loop.run_until_complete(outer(2, 3, 5, 8))
The 'outer' function ran with args: (2, 3, 5, 8)
The 'nested' function ran with args: (4, 6, 10, 16)
[5, 7, 11, 17]
Here you have two coroutines, with the outer coroutine calling the nested coroutine using the yield from syntax. You can see from the output to standard out that both coroutines run, and the final result is returned at the end of outer.
Incidentally, what happens under the hood is that the outer coroutine is suspended when it encounters the yield from statement. The nested coroutine is then placed on the event loop, and the event loop runs it. The outer coroutine does not continue until nested completes and a result is available.
A couple of things are worth noting. First, the yield from statement returns the result of the coroutine it runs; that is why you see an assignment to a variable in the example.
Second, why not simply call the function directly? That would be fine if it were a procedural function, but this is a coroutine: calling it directly would return a generator rather than the value. You could write nested as a standard function, but consider the following situation, where you also want to be able to assign it to the event loop directly.
>>> loop.run_until_complete(nested(5, 10, 15))
The 'nested' function ran with args: (5, 10, 15)
[6, 11, 16]
The capability for one coroutine to call another using yield from addresses this, and it makes coroutines more reusable.
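The delegation mechanics of yield from can likewise be observed with plain generators, independent of any event loop; the return value of the inner generator becomes the value of the yield from expression (the names and values here are illustrative):

```python
def inner():
    return [1, 2, 3]
    yield  # unreached; marks this function as a generator

def outer():
    answer = yield from inner()  # receives inner's return value
    return [i * 10 for i in answer]

try:
    next(outer())
except StopIteration as ex:
    result = ex.value

print(result)  # [10, 20, 30]
```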
Because most work using asyncio is done asynchronously, you must contend with how to handle the results of functions that are run in this manner. The yield from statement provides one way to do this, but sometimes you want to run asynchronous functions in parallel, for example.
In sequential programming, return values are straightforward. You run a function, and it returns its result. However, in asynchronous programming, while the function returns its result as before, what happens to the result then? There is no clear caller to return the result to.
A mechanism for dealing with this particular challenge is the Future object. Essentially, a Future is an object that is told about the status of an asynchronous function: whether that function is running, has completed, or was canceled. It also holds the result of the function or, if the function ended by raising an exception, the exception and traceback.
The Future is a standalone object, independent of the actual function that is running. It does nothing but store the state and result information.
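The Future class in the concurrent.futures module shares this core API and can illustrate the idea without an event loop; it is a close cousin of asyncio's Future, not the same class:

```python
from concurrent.futures import Future

future = Future()
print(future.done())        # False: no result stored yet

future.set_result('chamomile tea')
print(future.done())        # True: state and result are now stored
print(future.result())      # chamomile tea
```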
A Task is a subclass of Future, and it is what you will generally use when programming with asyncio. Whenever a coroutine is scheduled on the event loop, that coroutine is wrapped in a Task. So, in the previous example, when you called run_until_complete and passed a coroutine, that coroutine was wrapped in a Task and then executed. It was the Task that stored the result and provided it through the yield from statement.
The run_until_complete method is not the only way (or even the primary way) for a coroutine to be wrapped in a Task, however. After all, in many applications your event loop runs forever. How do tasks get placed on the event loop in such a system?
The primary way is the asyncio.async function, which places a coroutine on the event loop and returns the associated Task.
To demonstrate this, first get the event loop and write a garden-variety coroutine, as shown here:
>>> import asyncio
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     asyncio.get_event_loop().stop()
...     return '%s tea' % variety
...
>>>
This is still a trivial task, but one new thing here that you have not seen yet is that the task actually stops the event loop. This is simply a workaround for the fact that when you start the loop (with run_forever), it will run forever.
Next, register the task with the event loop.
>>> task = asyncio.async(make_tea('chamomile'))
This is all you actually need to do to register the task with the loop, but because the loop is not running, the task will not execute for now. Indeed, you can confirm this by inspecting the task object using its done and result methods.
>>> task.done()
False
>>> task.result()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/
futures.py", line 237, in result
raise InvalidStateError('Result is not ready.')
asyncio.futures.InvalidStateError: Result is not ready.
Next, you must start the loop. It is okay to start it with run_forever now; the task itself will stop the loop as soon as it completes, because of the call to loop.stop().
>>> loop = asyncio.get_event_loop()
>>> loop.run_forever()
Now making chamomile tea.
>>>
Sure enough, the loop starts, runs the task, and then immediately stops. If you now inspect the task variable, you will get different results.
>>> task.done()
True
>>> task.result()
'chamomile tea'
Whenever you schedule a coroutine with asyncio.async, you get a Task object back, and you can inspect that object at any time to get the status or result of the task.
Another feature of Future objects (and therefore Task objects, because Task subclasses Future) is the capability to register callbacks. A callback is simply a function (or coroutine) that should execute once the Future is done, and that receives the Future as an argument.
In some ways, callbacks represent a reversal of the yield from model. When a coroutine uses yield from, it ensures that the nested coroutine runs before or during its own execution. When you register a callback, you are working in the opposite direction: the callback is attached to the original task, to run after the task has executed.
You can add a callback to any Future object by using its add_done_callback method. Callbacks are expected to take a single argument: the Future object itself, which contains the status and, if applicable, the result of the underlying task.
Consider the following example of a callback in action:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> def confirm_tea(future):
...     print('The %s is made.' % future.result())
...
>>> task = asyncio.async(make_tea('green'))
>>> task.add_done_callback(confirm_tea)
>>>
>>> loop.run_until_complete(task)
Now making green tea.
The green tea is made.
'green tea'
The first thing to note is that you again made a make_tea coroutine, identical to the one in the previous example, except that this one does not stop the loop.
Next, notice the confirm_tea function. This is a plain function, not a coroutine; in fact, you cannot use a coroutine as a callback here, and trying to do so raises an exception when you run the loop. Once the callback runs, this function receives the Future object it is registered to (the task variable in this case), which contains the result of the coroutine: the 'green tea' string.
Finally, notice the call to add_done_callback. This is where the confirm_tea function is assigned as a callback to the task. Also, notice that it is assigned to the task (a particular invocation of a coroutine), not to the coroutine itself. If another task calling the same coroutine were registered to the loop with asyncio.async, it would not have this callback.
The output shows that both functions ran, in the order you expected. The return value is the return value from make_tea, provided to you because that is how run_until_complete works.
There is one important thing to note: simply because a Future is done does not guarantee that it ran successfully. This example assumes that future.result() will be populated, but that may not be the case; the Task could have ended in an exception, in which case attempting to access future.result() will raise that exception.
Similarly, it is possible to cancel a task (using the Future.cancel() method or by other means). If this occurs, the task is marked as cancelled, and the callbacks are still scheduled; in this case, attempting to access future.result() will raise CancelledError.
Internally, asyncio informs the Future object that it is done. The Future object then takes each of the callbacks registered against it and calls call_soon_threadsafe on each of them.
Be aware that there is no guarantee of order when it comes to callbacks. It is entirely possible (and fine) to register multiple callbacks to the same task. However, you do not have any way of controlling which callbacks will be run in which order.
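For instance, here is a sketch with multiple callbacks on one future, again using the concurrent.futures Future (which, unlike asyncio, happens to document that callbacks run in the order they were added):

```python
from concurrent.futures import Future

calls = []
future = Future()
future.add_done_callback(lambda f: calls.append('callback one: %s' % f.result()))
future.add_done_callback(lambda f: calls.append('callback two: %s' % f.result()))

future.set_result('green tea')  # both callbacks fire once the future is done
print(calls)
```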
One limitation of the callback system is that, as noted, the callback receives the Future as a positional argument and accepts no other arguments.
It is possible to send other arguments to a callback through the use of functools.partial. If you do this, however, the callback must still accept the Future as a positional argument; in practice, the Future is appended to the end of the positional-arguments list before the callback is called.
Consider the following case of a callback that expects another argument:
>>> import asyncio
>>> import functools
>>>
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> def add_ingredient(ingredient, future):
...     print('Now adding %s to the %s.' % (ingredient, future.result()))
...
>>>
>>> task = asyncio.async(make_tea('herbal'))
>>> task.add_done_callback(functools.partial(add_ingredient, 'honey'))
>>>
>>> loop.run_until_complete(task)
Now making herbal tea.
Now adding honey to the herbal tea.
'herbal tea'
This is mostly similar to the previous example. The only significant difference is in how the callback is registered: instead of passing the function object directly (as you did in the previous example), you create a functools.partial object with the positional argument you are sending ('honey').
Again, notice that the add_ingredient function is written to accept two positional arguments, but the partial specifies only one. The Future object is sent as the last positional argument in cases where a partial is used, and the function signature of add_ingredient reflects this.
The asyncio module provides a convenient way to aggregate tasks. There are two major reasons to do this: to take some sort of action once any task in a set of tasks has completed, or to take some sort of action once all tasks in the set have completed.
The first mechanism that asyncio provides for this purpose is the gather function, which takes a sequence of coroutines or tasks and returns a single task that aggregates all of them (wrapping any coroutines it receives in tasks as appropriate).
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> meta_task = asyncio.gather(
...     make_tea('chamomile'),
...     make_tea('green'),
...     make_tea('herbal')
... )
>>> meta_task.done()
False
>>>
>>> loop.run_until_complete(meta_task)
Now making chamomile tea.
Now making herbal tea.
Now making green tea.
['chamomile tea', 'green tea', 'herbal tea']
>>> meta_task.done()
True
In this case, the asyncio.gather function received three coroutine objects, wrapped them all in tasks under the hood, and returned a single task that serves as an aggregation of all three.
Notice that scheduling the meta_task object effectively schedules the three tasks gathered underneath it. Once you run the loop, the three subtasks all run.
In the case of a task created with asyncio.gather, the result is always a list containing the results of the individual gathered tasks. The order of the results is guaranteed to match the order in which the tasks were gathered (but the tasks are not guaranteed to run in that order). Therefore, the list of strings you got back is in the same order as the coroutines registered in the asyncio.gather call.
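You can see the ordering guarantee with asyncio.sleep, which accepts an optional result to return. In this sketch (the short delays and the fresh loop are illustrative assumptions), the slower task finishes last, yet its result still comes first in the list:

```python
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# 'slow' finishes after 'fast', but the results keep the gather order.
meta_task = asyncio.gather(
    asyncio.sleep(0.05, result='slow'),
    asyncio.sleep(0.01, result='fast'),
)
results = loop.run_until_complete(meta_task)
print(results)  # ['slow', 'fast']
loop.close()
```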
The asyncio.gather paradigm also provides the opportunity to add a callback to the set of tasks as a whole, rather than to the individual tasks. What if you want a callback to run only once all of the tasks are completed, and it does not matter to you in which order they complete?
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> def mix(future):
...     print('Mixing the %s together.' % ' and '.join(future.result()))
...
>>> meta_task = asyncio.gather(make_tea('herbal'), make_tea('green'))
>>> meta_task.add_done_callback(mix)
>>>
>>> loop.run_until_complete(meta_task)
Now making green tea.
Now making herbal tea.
Mixing the green tea and herbal tea together.
['green tea', 'herbal tea']
The first thing that happened when you called run_until_complete was that both of the individual tasks gathered into meta_task ran, individually. Only after both had run did the mix function execute: the meta_task is not considered done until all of its individual tasks are done, so only then does it trigger the callback.
You can also see that the Future object the mix function received was meta_task, not one of the individual tasks; its result method therefore returned a list of both individual results.
Another tool that the asyncio module provides is the built-in wait coroutine. The asyncio.wait coroutine takes a sequence of coroutines or tasks (wrapping any coroutines in tasks) and returns once they are done. Note that the signature here is distinct from asyncio.gather: gather takes each coroutine or task as a separate positional argument, whereas wait expects a list.
Additionally, wait accepts a parameter to return when any of its tasks completes, rather than only when all of them do. Regardless of whether this flag is set, wait always returns a two-tuple: the first element is the set of Future objects that have completed, and the second element is the set of those still pending.
Consider the following example, which is similar to how you previously used asyncio.gather:
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def make_tea(variety):
...     print('Now making %s tea.' % variety)
...     return '%s tea' % variety
...
>>> coro = asyncio.wait([make_tea('chamomile'), make_tea('herbal')])
>>>
>>> loop.run_until_complete(coro)
Now making chamomile tea.
Now making herbal tea.
({Task(<coro>)<result='herbal tea'>, Task(<coro>)<result='chamomile tea'>}, set())
Note a couple of subtle differences here. First, unlike the gather function, wait returns a coroutine. This has its value; you can use it in a yield from statement, for example.
On the other hand, you cannot attach callbacks directly to a coroutine returned from wait. If you want to do this, you must wrap it in a task using asyncio.async.
Also, the result is different. The asyncio.gather function aggregated the results into a list and returned that. The result of asyncio.wait is a two-tuple containing the actual Future objects (which themselves contain their results). Additionally, the Future objects are reorganized: the asyncio.wait coroutine places them into two sets, one for those that are done and another for those that are not. Because sets are unordered, you must rely on the Future objects themselves to piece together which result corresponds to which task.
It is possible to have the asyncio.wait coroutine return when a specific amount of time has passed, regardless of whether all of the tasks have completed. To do this, pass the timeout keyword argument to asyncio.wait.
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> coro = asyncio.wait([asyncio.sleep(5), asyncio.sleep(1)], timeout=3)
>>> loop.run_until_complete(coro)
({Task(<sleep>)<result=None>}, {Task(<sleep>)<PENDING>})
In this case, you are just using a coroutine provided by the asyncio module: asyncio.sleep, which simply waits for a given number of seconds and then returns None. The timing in this example is set up so that one of the tasks (the second one) will complete before the wait times out, but the other will not.
The first difference to note is that the second element of the two-tuple now has a task in it: the sleep coroutine that failed to complete in time is still pending. The other did complete, however, and has a result (None).
Using timeout does not mean that the entire designated time period must elapse. If all of the tasks complete before time expires, the coroutine completes immediately.
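A quick sketch shows this (the short sleep and generous timeout are arbitrary; the task is created explicitly here so it is attached to the fresh loop): the task finishes well inside the timeout, so nothing is left pending.

```python
import asyncio

loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)

# The sleep finishes long before the 5-second timeout expires.
task = loop.create_task(asyncio.sleep(0.01))
done, pending = loop.run_until_complete(asyncio.wait([task], timeout=5))
print(len(done), len(pending))  # 1 0
loop.close()
```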
One of the biggest features of asyncio.wait is the capability to have the coroutine return when any of the Future objects under its care completes. The asyncio.wait function accepts a return_when keyword argument for this purpose; by sending it a special constant (asyncio.FIRST_COMPLETED), the coroutine completes once any task has finished, rather than waiting for every task.
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> coro = asyncio.wait([
...     asyncio.sleep(3),
...     asyncio.sleep(2),
...     asyncio.sleep(1),
... ], return_when=asyncio.FIRST_COMPLETED)
>>>
>>> loop.run_until_complete(coro)
({Task(<sleep>)<result=None>},
{Task(<sleep>)<PENDING>, Task(<sleep>)<PENDING>})
In this case, the asyncio.wait call is given a list of three asyncio.sleep coroutines, which sleep for 3, 2, and 1 seconds. Once the coroutine is called, it runs all the tasks underneath it. The sleep coroutine that is only asked to wait for 1 second completes first, which completes the wait. Therefore, you get back a two-tuple with one item in the first set (tasks that are complete) and two items in the second set (tasks that are still pending).
It is also possible to have a call to asyncio.wait complete whenever it encounters a task that finished with an exception, rather than exiting normally. This is a valuable tool in situations where you want to trap the exceptional cases as early as possible and deal with them.
You trigger this behavior using the return_when keyword argument as before, but by sending the asyncio.FIRST_EXCEPTION constant instead.
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> @asyncio.coroutine
... def raise_ex_after(seconds):
...     yield from asyncio.sleep(seconds)
...     raise RuntimeError('Raising an exception.')
...
>>> coro = asyncio.wait([
... asyncio.sleep(1),
... raise_ex_after(2),
... asyncio.sleep(3),
... ], return_when=asyncio.FIRST_EXCEPTION)
>>>
>>> loop.run_until_complete(coro)
({Task(<raise_ex_after>)<exception=RuntimeError('Raising an exception.',)>,
Task(<sleep>)<result=None>},
{Task(<sleep>)<PENDING>})
In this case, the asyncio.wait
coroutine stopped as soon as a task completed with an exception. This means that the 1-second asyncio.sleep
completed successfully, and it is in the first set in the return value. The raise_ex_after
coroutine also completed, so it is in the first set also. However, the fact that it raised an exception caused wait
to trigger its completion before the 3-second sleep could complete, so it is returned in the second (pending) set.
Sometimes, no task actually raises an exception at all (usually the desirable case). If that happens, the wait completes once all of the tasks have completed normally.
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>>
>>> coro = asyncio.wait([
... asyncio.sleep(1),
... asyncio.sleep(2),
... ], return_when=asyncio.FIRST_EXCEPTION)
>>>
>>> loop.run_until_complete(coro)
({Task(<sleep>)<result=None>, Task(<sleep>)<result=None>}, set())
The asyncio
module provides several common patterns that are built upon the fundamental building blocks of the event loop and Future
objects. One of these is a basic queuing system.
A queue is a collection of tasks to be processed by a task runner. The Python ecosystem includes several third-party task queue utilities, the most popular of which is probably celery
. The asyncio
module does not provide a fully featured queuing application. Rather, it provides simply the fundamental queue itself, which application developers can build on top of.
Why is Queue
part of asyncio
? This Queue
class provides methods to be used in a sequential or an asynchronous context.
Consider first a very simple example of a Queue
in action:
>>> import asyncio
>>> queue = asyncio.Queue()
>>> queue.put_nowait('foo')
>>> queue.qsize()
1
>>> queue.get_nowait()
'foo'
>>> queue.qsize()
0
Besides being trivially simple, this example has nothing particularly asynchronous going on; you did not even bother to get or run the event loop. This is a very direct FIFO (first in, first out) queue.
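To underline the FIFO behavior, here is a minimal sketch: items come back out in exactly the order they were put in.

```python
import asyncio

# A plain FIFO queue: the *_nowait methods need no running event loop.
queue = asyncio.Queue()
for item in ['first', 'second', 'third']:
    queue.put_nowait(item)

# Drain the queue; insertion order is preserved.
order = [queue.get_nowait() for _ in range(queue.qsize())]
print(order)  # ['first', 'second', 'third']
```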
Note the use of the put_nowait
and get_nowait
methods. These methods are designed to perform the addition or removal of the item to or from the queue immediately. If, for example, you try to call get_nowait
on an empty queue, you get a QueueEmpty
exception.
>>> queue.get_nowait()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/asyncio/
queues.py", line 206, in get_nowait
raise QueueEmpty
asyncio.queues.QueueEmpty
The Queue
class also provides a method called get
. Instead of returning an exception on an empty queue, the get
method will patiently wait for an item to be added to the queue, and then retrieve it from the queue and return it immediately. Unlike get_nowait
, this method is a coroutine, and runs in an asynchronous context.
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> queue = asyncio.Queue()
>>>
>>> queue.put_nowait('foo')
>>> loop.run_until_complete(queue.get())
'foo'
In this case, an item was already on the queue, so the get
method still returns immediately. If there were no item on the queue yet, a simple call to loop.run_until_complete
would never complete, blocking your interpreter.
You can use the timeout
parameter in asyncio.wait
to see this concept in action, though.
>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> queue = asyncio.Queue()
>>>
>>> task = asyncio.async(queue.get())
>>> coro = asyncio.wait([task], timeout=1)
>>>
>>> loop.run_until_complete(coro)
(set(), {Task(<get>)<PENDING>})
At this point, there is still nothing on the queue, so the task to get an item off the queue remains pending indefinitely. You also have the task
variable, and can inspect its status.
>>> task.done()
False
Next, place an item on the queue, as shown here:
>>> queue.put_nowait('bar')
Notice that the task is still not done, because the event loop is no longer running. The task
is still registered, though, so you can attach a callback that stops the loop once the task completes, and then start the loop again.
>>> import functools
>>> def stop(l, future):
... l.stop()
...
>>> task.add_done_callback(functools.partial(stop, loop))
>>>
>>> loop.run_forever()
Now, because there was an item on the queue, the task is done, and the task's result is the item on the queue ('bar'
).
>>> task.done()
True
>>> task.result()
'bar'
It is also possible to give a Queue
object a maximum size, by setting the maxsize
keyword argument when creating the queue.
>>> import asyncio
>>> queue = asyncio.Queue(maxsize=5)
If you do this, the Queue
will not allow any more than the maximum number of items onto the queue. A call to the put
method will simply wait until a previous item is removed, and then (and only then) will it place the item on the queue. If you call put_nowait
and the queue is full, it will raise QueueFull
.
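A short sketch of both behaviors, written with the async def syntax and asyncio.run (available from Python 3.5 and 3.7, respectively), rather than the yield from style used elsewhere in this chapter:

```python
import asyncio

async def main():
    # A queue that holds at most two items.
    queue = asyncio.Queue(maxsize=2)
    queue.put_nowait('a')
    queue.put_nowait('b')

    # The queue is full, so put_nowait raises immediately.
    try:
        queue.put_nowait('c')
        raised = False
    except asyncio.QueueFull:
        raised = True

    # The put coroutine, by contrast, waits for room instead of raising.
    put_task = asyncio.ensure_future(queue.put('c'))
    await asyncio.sleep(0)      # let the put start; it blocks on the full queue
    assert not put_task.done()
    queue.get_nowait()          # remove an item, making room
    await put_task              # now the pending put completes
    return raised, queue.qsize()

raised, size = asyncio.run(main())
print(raised, size)  # True 2
```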
One of the most common uses of the asyncio
module is to create services that can run as a daemon and accept commands. The asyncio
module defines a Protocol
class that is able to fire appropriate events on receiving or losing a connection, and when it receives data.
Additionally, the event loop defines a create_server
method that opens a socket, allowing data to be sent to the event loop and on to the protocol.
Consider a simple server that can do nothing but add numbers and shut itself down.
import asyncio


class Shutdown(Exception):
    pass


class ServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport
        self.write('Welcome.')

    def data_received(self, data):
        # Sanity check: Do nothing on empty commands.
        if not data:
            return

        # Commands to this server shall be a single word, with
        # space separated arguments.
        message = data.decode('ascii')
        command = message.strip().split(' ')[0].lower()
        args = message.strip().split(' ')[1:]

        # Sanity check: Verify the presence of the appropriate command.
        if not hasattr(self, 'command_%s' % command):
            self.write('Invalid command: %s' % command)
            return

        # Run the appropriate command.
        try:
            return getattr(self, 'command_%s' % command)(*args)
        except Exception as ex:
            self.write('Error: %s\n' % str(ex))

    def write(self, msg_string):
        msg_string += '\n'
        self.transport.write(msg_string.encode('ascii', 'ignore'))

    def command_add(self, *args):
        args = [int(i) for i in args]
        self.write('%d' % sum(args))

    def command_shutdown(self):
        self.write('Okay. Shutting down.')
        raise KeyboardInterrupt


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    coro = loop.create_server(ServerProtocol, '127.0.0.1', 8000)
    asyncio.async(coro)
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
This is a somewhat long module, but a few details are worth noting. First, the ServerProtocol
class subclasses asyncio.Protocol
. The connection_made
and data_received
methods override stubs defined in the superclass, which do nothing by default. The other three methods are custom.
Remember that when you make a socket connection between machines, you are essentially always sending bytes, not text strings. The write
method here does that conversion in one place, rather than forcing you to convert to a byte string every time you want to write to the transport.
The guts of this are in the data_received
method. It takes a line of data and tries to figure out what to do with it. It only understands two basic commands, and anything else is an error.
Finally, the block at the end of the file actually starts up the server, and runs it against the local machine on a particular port. This is all the code you need to start up a server and have it listen for commands.
You can verify that the server receives commands by starting it up and then, in another shell window, using telnet
to connect to it.
$ telnet 127.0.0.1 8000
Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome.
add 3 5
8
make_tea
Invalid command: make_tea
shutdown
Okay. Shutting down.
Connection closed by foreign host.
You have a very simple server. It can accept two commands: add
and shutdown
. It can provide errors if you try to issue a command it does not understand. And, the server is, in fact, able to shut itself down.
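Instead of telnet, you could also drive the server from Python. The following is a minimal client sketch using asyncio.open_connection, again with the async def syntax (Python 3.5+); the host and port are assumed to match the create_server call above:

```python
import asyncio

async def send_command(command, host='127.0.0.1', port=8000):
    # Open a connection to the server and consume its welcome line.
    reader, writer = await asyncio.open_connection(host, port)
    await reader.readline()                    # b'Welcome.\n'

    # Send one command and read back the single-line response.
    writer.write(('%s\n' % command).encode('ascii'))
    await writer.drain()
    response = await reader.readline()
    writer.close()
    return response.decode('ascii').strip()

# With the server above running:
#     asyncio.run(send_command('add 3 5'))  # '8'
```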
Python is, at its core, a sequential language, but with asyncio
it is gaining asynchronous features built in to the standard library.
One thing that makes asyncio
valuable is that it enables you to write code that follows sequential patterns but is actually asynchronous under the hood, through the yield from
syntax. However, if you intend to write an asynchronous application, you still must understand the advantages and disadvantages of this paradigm.
As you have seen, many things are different. You may not always know in what order tasks will run. It is possible for tasks to be intentionally canceled. Finally, code may be registered to run using a callback system, rather than through direct sequential function calls. All of these things represent a break from “normal” Python programming.
Still, if you have a robust Python 3 application and need certain asynchronous elements, asyncio
may be the right tool for you.
In Chapter 14, you learn about style norms and recommendations in Python.