Developing a computational pipeline using the asyncio module in Python 3

Asynchronous Programming

"Asynchronous programming" is a vague concept that will likely bring to mind a range of related ideas: multithreading, callbacks, event-driven programming, network I/O, hardware interrupts, and so on. Fundamentally, asynchronous programming refers to a separation between the conceptual control flow of a program or subroutine and its actual flow of execution: there's an input that needs to be processed, but there might be a delay in retrieving that input, and there are other things that need to be done while we're waiting for it to be ready.

This concept of "there's more than one thing to be done" is so common in most applications that we tend to think more in terms of the patterns we use for solving it than of the problem itself. A UI needs callbacks to deal with user-triggered events like "button clicked". A background worker needs its own thread, which (say) wakes up on a timed basis to review its tasks.

But callbacks or explicit worker threads quickly get ugly when maintaining a data processing pipeline. In this context, a set of data might need to be delegated to multiple sub-processes, and only once those processes are finished can the next stage of data processing take place. A serial approach (executing each task one after another) loses performance by not taking advantage of potential parallelism. A callback approach has to juggle passing along all data that might be needed further down the pipeline, and furthermore has to be coordinated by some logical process that knows when all of the data needed for the next step is available. A dedicated-thread approach has to decide intelligently how to delegate tasks to threads: are we okay with creating a new thread for every single subprocess, or could that overwhelm the threading system? If we create a thread pool, are we setting ourselves up for a potential deadlock when a bunch of sleeping threads are waiting on the results of jobs that are still sitting in the queue? Can we properly engineer a solution that will handle the potentially high variation in the number and complexity of jobs that enter the pipeline?
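That thread-pool deadlock is easy to reproduce. As a minimal sketch (the function names are mine, purely for illustration): a job that submits a sub-job to the same pool and then blocks on its result will hang once the pool is saturated. Here a deliberately single-worker pool and a timeout make the effect visible without actually hanging the process:

```python
from concurrent.futures import ThreadPoolExecutor, TimeoutError

pool = ThreadPoolExecutor(max_workers=1)  # deliberately tiny pool

def child():
    return 42

def parent():
    # parent occupies the pool's only worker, then waits on a child job
    # that can never start because no worker is free
    future = pool.submit(child)
    try:
        return future.result(timeout=1)
    except TimeoutError:
        return "deadlocked"

result = pool.submit(parent).result()
print(result)  # prints "deadlocked"
```

With a real (untimed) wait, `parent` would block forever; scale the same pattern up to a deep pipeline and a modest pool and the whole system can wedge.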

Asynchronous Frameworks and the asyncio Module

The hallmark of an asynchronous framework is that it provides the ability to write a pipeline in a "synchronous-ish" looking way while defining the context-switching points where the framework will re-delegate control to another asynchronous method until the data required to continue is available. In Python 3.5 this is provided by the asyncio module and a new type of function called a coroutine, defined using the new "async" and "await" keywords.

Note: In Python 3.4, coroutines were created using a modified form of generators. This is confusing because generators are fundamentally the opposite of coroutines: as explained by David Beazley, generators produce data while coroutines consume data. While it's possible to use the "yield" syntax as a consumer of data and not just a producer, I prefer not to mix the concepts. Any of the following examples can, however, be run in 3.4 by replacing the "async def" syntax with the @asyncio.coroutine decorator and the "await <X>" syntax with "yield from <X>".
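Beazley's distinction is easiest to see with plain generators: a generator produces values that are pulled out with next(), while an old-style coroutine consumes values that are pushed in with .send(). A quick illustration (the function names are mine):

```python
def counter(n):
    # a generator: values are pulled OUT of it with next()
    for i in range(n):
        yield i

def summer():
    # a coroutine in the pre-3.5 sense: values are pushed IN with .send(),
    # and it yields a running total back after each one
    total = 0
    while True:
        value = yield total
        total += value

produced = list(counter(3))                   # [0, 1, 2]
coro = summer()
next(coro)                                    # prime the coroutine to its first yield
consumed = [coro.send(v) for v in produced]   # [0, 1, 3]
```

Same keyword, opposite direction of data flow, which is exactly why mixing the two concepts gets confusing.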

Trivial async example
import asyncio
import logging

logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')

async def print_after(msg, wait_secs):
    await asyncio.sleep(wait_secs)
    logger.warning(msg)

async def test_async():
    await asyncio.gather(
        print_after("One second", 1),
        print_after("Two seconds", 2),
        print_after("Three seconds", 3))
    logger.warning("finished")

def main():
    loop = asyncio.get_event_loop()
    loop.run_until_complete(test_async())
    loop.close()

if __name__ == '__main__':
    main()

[10388]2015-11-22 18:38:50,129: One second
[10388]2015-11-22 18:38:51,130: Two seconds
[10388]2015-11-22 18:38:52,130: Three seconds
[10388]2015-11-22 18:38:52,131: finished

What we've done here is create three asynchronous jobs that print a message after 1, 2, and 3 seconds respectively. The most important thing to note is that there is no threading involved: the asynchronous behavior is mediated by an event loop, which is triggered by an "I/O" event (in this case, just a timed event).

The "gather" call simply groups together a set of coroutines: "awaiting" on the gather returns execution (and, if relevant, results) to the awaiting coroutine once all of those coroutines have finished.
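One detail worth knowing about gather: its results come back in the order the coroutines were passed in, not the order they finished. A small sketch (the helper names are mine):

```python
import asyncio

async def double_after(x, secs):
    await asyncio.sleep(secs)
    return 2 * x

async def demo():
    # the slower coroutine is listed first, but its result still comes first
    return await asyncio.gather(double_after(1, 0.02), double_after(2, 0.01))

loop = asyncio.new_event_loop()
results = loop.run_until_complete(demo())
loop.close()
# results == [2, 4]
```

This is what lets each layer of the pipeline below treat its gathered results as a positionally meaningful tuple.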

Where does the I/O come in?

I'm not a fan of the name "asyncio", simply because to most people I/O implies a very specific set of low-level operations (network I/O and disk I/O in particular), whereas the asynchronous paradigm, while certainly useful in those cases, is much more versatile. Let's jump straight to a mocked-up computational pipeline example.

Demonstration of the asyncio module in Python 3.5

This simulation is composed of three layers, each of which will split up the
data into some different subsets, pass the subsets off to the next layer, wait
for results, and then do some non-trivial processing to return to the previous
layer (in this case, sleeping for a few seconds). The expensive operations are
offloaded to a ThreadPoolExecutor, which maintains a pool of processing
threads, allowing for the utilization of multiple cores (hypothetically).
import asyncio
import logging
import time
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(format="[%(thread)-5d]%(asctime)s: %(message)s")
logger = logging.getLogger('async')

executor = ThreadPoolExecutor(max_workers=10)
loop = asyncio.get_event_loop()

def cpu_bound_op(exec_time, *data):
    """
    Simulation of a long-running CPU-bound operation
    :param exec_time: how long this operation will take
    :param data: data to "process" (sum it up)
    :return: the processed result
    """
    logger.warning("Running cpu-bound op on {} for {} seconds".format(data, exec_time))
    time.sleep(exec_time)
    return sum(data)

async def process_pipeline(data):
    # just pass the data along to level_a and return the results
    results = await level_a(data)
    return results

async def level_a(data):
    # tweak the data a few different ways and pass them each to level b.
    level_b_inputs = data, 2*data, 3*data
    results = await asyncio.gather(*[level_b(val) for val in level_b_inputs])
    # we've now effectively called level_b(...) three times with three inputs,
    # and (once the await returns) they've all finished, so now we'll take
    # the results and pass them along to our own long-running CPU-bound
    # process via the thread pool.
    # Note the signature of run_in_executor: (executor, func, *args)
    # The third argument and beyond will be passed to cpu_bound_op when it is called.
    result = await loop.run_in_executor(executor, cpu_bound_op, 3, *results)
    # level_a processing is now done, pass back the results
    return result

async def level_b(data):
    # similar to level a
    level_c_inputs = data/2, data/4, data/7
    results = await asyncio.gather(*[level_c(val) for val in level_c_inputs])
    result = await loop.run_in_executor(executor, cpu_bound_op, 2, *results)
    return result

async def level_c(data):
    # final level - queue up the long-running CPU-bound process in the
    # thread pool immediately
    result = await loop.run_in_executor(executor, cpu_bound_op, 1, data)
    return result

def main():
    start_time = time.time()
    result = loop.run_until_complete(process_pipeline(2.5))
    logger.warning("Completed ({}) in {} seconds".format(result, time.time() - start_time))

if __name__ == '__main__':
    main()

For this example we've mocked up a three-level pipeline, where each layer modifies the input data and passes it along in multiple forms to a lower layer. Once the lower layer has finished processing the data, it passes the CPU-heavy computation off to a separate (synchronous) function, which might be a numpy/scipy method or a C algorithm (presumably something that releases the GIL so we can take advantage of parallelism). Note that we intentionally use time.sleep here in this mock-up rather than asyncio.sleep, because it is meant to simulate an expensive synchronous operation.
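The core trick, stripped of the pipeline, looks like this: wrapping a blocking function in loop.run_in_executor turns it into an awaitable, and gather lets several of those run in the pool at the same time (the helper names are mine):

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_square(x):
    time.sleep(0.1)  # stand-in for a GIL-releasing computation
    return x * x

async def run_parallel(loop, executor):
    # three blocking calls overlap in the pool instead of running back to back
    return await asyncio.gather(*[
        loop.run_in_executor(executor, blocking_square, n) for n in (1, 2, 3)])

loop = asyncio.new_event_loop()
executor = ThreadPoolExecutor(max_workers=3)
start = time.time()
results = loop.run_until_complete(run_parallel(loop, executor))
elapsed = time.time() - start  # roughly 0.1s, not 0.3s
loop.close()
```

The event loop thread never sleeps through the blocking calls; it only wakes up to collect the three results.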

In this example, data will flow down quickly to the lowest layer, level_c, where processing will begin.

Once we hit level C, CPU-heavy computation (defined to take 1 second for this layer) will be offloaded to the thread pool, which will return execution to the asynchronous level_c method once completed. Without the thread pool, each invocation of level C would be run sequentially; since our computation is an actual CPU-bound computation and not an asynchronous wait event like an I/O read, we need the thread pool in order to achieve parallelism. As these computations complete, results will be bubbled back up to level B (defined to take 2 seconds), which will again offload work and defer execution until those computations complete, and likewise bubble back up to level A.

Since we have 10 workers available in the thread pool, all 9 of the level C invocations will begin "computing" (i.e., sleeping) immediately, while the 10th slot remains unused since there's nothing for it to do. These will all complete after 1 second, at which point all of the level B computations will begin. Likewise, after 2 seconds, those will complete and be passed back up to the A layer (which is defined to take 3 seconds of CPU time). Output (note the timestamps):

[11416]2015-11-23 00:47:14,407: Running cpu-bound op on (0.7142857142857143,) for 1 seconds
[5676 ]2015-11-23 00:47:14,408: Running cpu-bound op on (1.25,) for 1 seconds
[13120]2015-11-23 00:47:14,408: Running cpu-bound op on (2.5,) for 1 seconds
[13624]2015-11-23 00:47:14,408: Running cpu-bound op on (1.0714285714285714,) for 1 seconds
[8740 ]2015-11-23 00:47:14,408: Running cpu-bound op on (1.875,) for 1 seconds
[14820]2015-11-23 00:47:14,408: Running cpu-bound op on (3.75,) for 1 seconds
[1820 ]2015-11-23 00:47:14,409: Running cpu-bound op on (0.625,) for 1 seconds
[11860]2015-11-23 00:47:14,409: Running cpu-bound op on (1.25,) for 1 seconds
[13124]2015-11-23 00:47:14,409: Running cpu-bound op on (0.35714285714285715,) for 1 seconds
[11416]2015-11-23 00:47:15,409: Running cpu-bound op on (2.5, 1.25, 0.7142857142857143) for 2 seconds
[13120]2015-11-23 00:47:15,409: Running cpu-bound op on (3.75, 1.875, 1.0714285714285714) for 2 seconds
[5676 ]2015-11-23 00:47:15,410: Running cpu-bound op on (1.25, 0.625, 0.35714285714285715) for 2 seconds
[5676 ]2015-11-23 00:47:17,411: Running cpu-bound op on (2.232142857142857, 4.464285714285714, 6.696428571428571) for 3 seconds
[13828]2015-11-23 00:47:20,412: Completed (13.392857142857142) in 6.005390644073486 seconds

As expected, total execution time is 6 seconds (1 second for all of level C, 2 seconds for level B, and 3 seconds for level A).
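The final value can be checked by hand, too: each level_b call on input d returns d*(1/2 + 1/4 + 1/7) = 25d/28, and level_a sums three such calls on (d, 2d, 3d), giving 25/28 * 6d = 75d/14:

```python
d = 2.5
expected = 75 * d / 14
# expected is approximately 13.3928571, matching the logged result
```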

Well that's neat

The elegant thing about this solution is that all we really needed to know about the problem is which parts of the pipeline are the computationally expensive bits we want to offload to a thread pool. Otherwise we can handle just about any computational pipeline with the same pattern: there might be 10 or 20 or 100 layers, layer A might invoke layer B a hundred times per call, or A might call B only once and B call C only once. In any case, our worker threads are only ever used for actual computations while active (they never block on another thread), we use them as heavily as the pipeline allows, we never instantiate more threads than needed, and the overall flow of the pipeline is always handled by the main (event loop) thread.

Furthermore, we didn't really have to worry about how the main loop figures out which part of the pipeline is ready to go based on which tasks have completed. For simplicity in this example we made each layer take a pre-defined amount of time, which leads to a pretty predictable ordering of events, but in reality there might be much greater variation in execution time for each processing event. Maybe C6, C1, and C7 finish first, in which case we do not yet have enough data to begin processing anything at layer B. But that's fine: the results from those computations will wait while the other threads continue to chug away, and once enough data has been computed to begin a layer B computation, it will be kicked off immediately (assuming the main event loop isn't busy doing anything else).

In fact, perhaps the biggest downside of this architecture is that it allows you to shoot yourself in the foot by keeping too many portions of the pipeline active at once. Say that instead of passing single floats around, we're processing a large N-dimensional numpy dataset. We could exhaust our memory by jumping around the pipeline too much while holding state for all the different pieces we've started processing, and might instead have to restrict execution at some level. More on that later...
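One way to impose that restriction, sketched here under my own assumptions (asyncio.Semaphore is a real primitive; the bounded wrapper is hypothetical): guard a layer's entry point with a semaphore, so only a bounded number of computations, and hence intermediate results, are ever in flight:

```python
import asyncio

async def main():
    sem = asyncio.Semaphore(4)  # at most 4 level-C jobs in flight at once
    active = peak = 0

    async def bounded_level_c(data):
        nonlocal active, peak
        async with sem:           # blocks here once 4 jobs are running
            active += 1
            peak = max(peak, active)
            await asyncio.sleep(0.01)  # stand-in for the real computation
            active -= 1
            return data

    results = await asyncio.gather(*[bounded_level_c(i) for i in range(10)])
    return results, peak

loop = asyncio.new_event_loop()
results, peak = loop.run_until_complete(main())
loop.close()
# peak never exceeds 4 even though 10 jobs were submitted at once
```

Each layer could carry its own semaphore sized to how much intermediate state it can afford to hold.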

Supplemental Material

David Beazley's slides on coroutines
And his examples
PEP-3156 - Asynchronous IO Support Rebooted: the "asyncio" Module
PEP-0492 - Coroutines with async and await syntax
Guido on Tulip/asyncio