Combining Multiprocessing and Asyncio in Python for Performance Boosts

Using a real-world example to demonstrate a map-reduce program

Photo by Mitchell Luo on Unsplash

Introduction

Because of the GIL, using multiple threads to perform CPU-bound tasks has never been a viable option in Python. With the popularity of multicore CPUs, Python offers a multiprocessing solution for CPU-bound tasks. But until now, there have still been some problems with using the multiprocessing-related APIs directly.

Before we start, here is a small piece of code to aid the demonstration:

import time
from multiprocessing import Process


def sum_to_num(final_num: int) -> int:
    start = time.monotonic()

    result = 0
    for i in range(final_num + 1):
        result += i

    print(f"The method with {final_num} completed in {time.monotonic() - start:.2f} second(s).")
    return result

The method takes one argument, accumulates the integers from 0 up to that argument, prints its execution time, and returns the result.

Problems with multiprocessing

def main():
    # We initialize the two processes with two parameters, from largest to smallest
    process_a = Process(target=sum_to_num, args=(200_000_000,))
    process_b = Process(target=sum_to_num, args=(50_000_000,))

    # And then let them start executing
    process_a.start()
    process_b.start()

    # Note that the join method blocks and waits for each process sequentially
    start_a = time.monotonic()
    process_a.join()
    print(f"Process_a completed in {time.monotonic() - start_a:.2f} seconds")

    # Because process_b finished while we were waiting for process_a to join,
    # its own join returns almost immediately, so the timer reads roughly 0 seconds.
    start_b = time.monotonic()
    process_b.join()
    print(f"Process_b completed in {time.monotonic() - start_b:.2f} seconds")

As the code shows, we directly create and start multiple processes and call each process's start and join methods. However, there are some problems here:

  1. The join method cannot return the result of the task's execution (see the Queue sketch after the figures below).
  2. The join method blocks the main process, and the joins execute sequentially.

This happens even if a later task finishes faster than an earlier one, as shown in the following figures:

The screenshot shows the execution sequence of join. Image by Author

Although process_b finishes executing first, it still has to wait for process_a. Image by Author
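
Item 1 above is a real hassle on its own: because join() cannot return anything, the child process has to hand its result back through some shared channel. Here is a minimal, hypothetical sketch using a multiprocessing.Queue (not from the original article) just to show the kind of boilerplate this forces on us:

from multiprocessing import Process, Queue


def sum_into_queue(final_num: int, queue) -> None:
    # join() cannot return the child's result,
    # so the child has to push it into a shared queue explicitly.
    queue.put(sum(range(final_num + 1)))


if __name__ == "__main__":
    queue = Queue()
    process = Process(target=sum_into_queue, args=(50_000_000, queue))
    process.start()
    result = queue.get()  # blocks until the child puts its result
    process.join()
    print(f"The child process produced {result}.")

Compare this with run_in_executor later in the article, which simply gives us an awaitable that yields the return value.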

Problems with using Pool

If we use multiprocessing.Pool, there are also some problems:

from multiprocessing import Pool


def main():
    with Pool() as pool:
        result_a = pool.apply(sum_to_num, args=(200_000_000,))
        result_b = pool.apply(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b}.")

As the code shows, Pool's apply method is synchronous, which means you have to wait for the previous apply call to finish before the next one can start executing.

multiprocessing.Pool.apply method is synchronous. Image by Author

Of course, we can use the apply_async method to create the tasks asynchronously. But again, you need to call the get method to retrieve the result, and get blocks. That brings us back to the same problem as the join method:

def main():
    with Pool() as pool:
        result_a = pool.apply_async(sum_to_num, args=(200_000_000,))
        result_b = pool.apply_async(sum_to_num, args=(50_000_000,))

        print(f"sum_to_num with 200_000_000 got a result of {result_a.get()}.")
        print(f"sum_to_num with 50_000_000 got a result of {result_b.get()}.")

Although apply_async is asynchronous, get will still block and execute sequentially. Image by Author

The problem with using ProcessPoolExecutor directly

So, what if we use concurrent.futures.ProcessPoolExecutor to execute our CPU-bound tasks?

from concurrent.futures import ProcessPoolExecutor


def main():
    with ProcessPoolExecutor() as executor:
        numbers = [200_000_000, 50_000_000]
        for result in executor.map(sum_to_num, numbers):
            print(f"sum_to_num got a result which is {result}.")

As the code shows, everything looks great, and we iterate over the results much as we would with asyncio.as_completed. But look at the results; they are still fetched in the order the tasks were submitted. This is not at all the same as asyncio.as_completed, which yields results in the order in which they complete:

Results are fetched in startup order. Image by Author

The result of the iteration still maintains the call order and blocks. Image by Author

Use asyncio’s run_in_executor to fix it

Fortunately, we can use asyncio to handle IO-bound tasks, and the event loop's run_in_executor method lets us invoke multiprocessing tasks in the same way we invoke asyncio tasks. This not only unifies the concurrent and parallel APIs but also solves the various problems we encountered above:

import asyncio
from concurrent.futures import ProcessPoolExecutor


async def main():
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for number in [200_000_000, 50_000_000]:
            tasks.append(loop.run_in_executor(executor, sum_to_num, number))
        
        # Or we can just use the method asyncio.gather(*tasks)
        for done in asyncio.as_completed(tasks):
            result = await done
            print(f"sum_to_num got a result which is {result}")

Combining asyncio and ProcessPoolExecutor. Image by Author

Since the sample code in the previous article only simulated how we should call the concurrent and parallel APIs, many readers still need help understanding how to use them in actual code. So, now that we understand why we should run CPU-bound parallel tasks inside asyncio, today we will use a real-world example to explain how to use asyncio to handle IO-bound and CPU-bound tasks simultaneously and appreciate the efficiency asyncio brings to our code.

Note: Before continuing, if you are interested in the practice of using asyncio.gather and asyncio.as_completed, you can read this article of mine:

Use These Methods to Make Your Python Concurrent Tasks Perform Better
Best practices for asyncio.gather, asyncio.as_completed, and asyncio.wait

Real-world Case: Concurrent File Reading and Map-reduce Data Processing

In this case today, we will deal with two problems:

  1. How to read multiple datasets concurrently, especially when the datasets are large or numerous, and how to use asyncio to improve efficiency.
  2. How to use asyncio’s run_in_executor method to implement a MapReduce program and process datasets efficiently.

Before we start, I will explain to you how our code is going to be executed using a diagram:

The diagram shows how the entire code works. Image by Author

The yellow part represents our concurrent tasks. Since the CPU can process data from memory faster than IO can read data from disk, we first read all datasets into memory concurrently.

After the initial data merging and slicing, we come to the green part that represents the CPU parallel task. In this part, we will start several processes to map the data.

Finally, we get the intermediate results of all the processes in the main process and then use a reduce program to get the final results.

Of course, if you're looking for an out-of-the-box solution, Aiomultiprocess is what you need. Here's a detailed article about it:

Supercharge Your Python Asyncio With Aiomultiprocess: A Comprehensive Guide
Harness the power of asyncio and multiprocessing to turbocharge your applications

Data Preparation and Installation of Dependencies

Data preparation

In this case, we will use the Google Books Ngram Dataset, which counts the frequency of each string combination in various books by year from 1500 to 2012.

The Google Books Ngram dataset is free to use for any purpose, and today we will use the 1-gram files for the letters a, b, and c (googlebooks-eng-all-1gram-20120701-a, -b, and -c).

We aim to count the cumulative number of occurrences of each word across these datasets.
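
For reference, each line of the 1-gram files is tab-separated, roughly in the form ngram, year, match_count, volume_count. Here is a minimal sketch of how a single line can be parsed; the sample values are made up purely for illustration:

# A fabricated sample line in the assumed tab-separated 1-gram format:
#   ngram \t year \t match_count \t volume_count
sample_line = "Aardvark\t2007\t1460\t915\n"

word, year, match_count, volume_count = sample_line.split("\t")
print(word, int(match_count))  # -> Aardvark 1460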

Dependency installation

To read the files concurrently, we will use the aiofiles library, which supports asynchronous file operations that work with asyncio.

If you are using pip, you can install it as follows:

$ pip install aiofiles

If you are using Anaconda, you can install it as follows:

$ conda install -c anaconda aiofiles

Code Structure Design

Since this case is still relatively simple, for the sake of demonstration, we will use a .py script to do the whole thing here.

As an architect, before you start, you should plan your methods according to the flowchart design and try to follow the “single responsibility principle”, so that each method does only one thing:

import asyncio
from concurrent.futures import ProcessPoolExecutor
import functools
import time

from tqdm import tqdm
from tqdm.asyncio import tqdm_asyncio

from aiofiles import open


async def read_file(filename: str):
    """ We will use the aiofiles API to read the files concurrently """
    pass


async def get_all_file_content(file_names: list[str]):
    """ Start concurrent tasks and join the file contents together """
    pass


def partition(contents: list[str], partition_size: int):
    """ Split the contents into multiple lists of partition_size length and return them as generator """
    pass


def map_resource(chunk: list[str]) -> dict[str, int]:
    """ The method that actually performs the map task
    returns the sum of the counts corresponding to the keywords in the current partition.
    """
    pass


async def map_with_process(chunks: list[list[str]]):
    """ Execute map tasks in parallel and join the results of multiple processes into lists """
    pass


def merge_resource(first: dict[str, int], second: dict[str, int]) -> dict[str, int]:
    """ The actual reduce method: sums the counts of two dicts with the same key """
    pass


def reduce(intermediate_results: list[dict[str, int]]) -> dict[str, int]:
    """ Use the functools.reduce method to combine all the items in the list """
    pass


async def main(partition_size: int):
    """ Entrance to all methods """
    pass


if __name__ == "__main__":
    asyncio.run(main(partition_size=60_000))

Code Implementation

Next, we will implement each method step by step and finally integrate them to run together in the main method.

File reading

Method read_file will implement reading a single file with aiofiles:

async def read_file(filename: str):
    """ We will use the aiofiles API to read the files concurrently """
    async with open(filename, "r", encoding="utf-8") as f:
        return await f.readlines()

Method get_all_file_content will start the file-reading tasks and, after all the files have been read, merge each line of text into a single list and return it.

async def get_all_file_content(file_names: list[str]):
    """ Start concurrent tasks and join the file contents together """
    print("Begin to read files...")
    start = time.monotonic()
    tasks, results = [], []

    for filename in file_names:
        tasks.append(asyncio.create_task(read_file(filename)))
    temp_results = await asyncio.gather(*tasks)

    results = [item for sublist in temp_results for item in sublist]
    print(f"All files are read in {time.monotonic() - start:.2f} second(s)")
    return results

Data grouping

Method partition will split the list into multiple smaller lists of partition_size length, according to the passed partition_size, and return them as a generator to facilitate subsequent iteration:

def partition(contents: list[str], partition_size: int):
    """ Split the contents into multiple lists of partition_size length and return them as generator """
    for i in range(0, len(contents), partition_size):
        yield contents[i: i+partition_size]
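
As a quick sanity check, here is a toy example (not part of the final script) showing how partition behaves:

# Toy example: split five fake lines into chunks of at most two items
chunks = list(partition([f"line {i}" for i in range(5)], partition_size=2))
print(chunks)
# [['line 0', 'line 1'], ['line 2', 'line 3'], ['line 4']]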

Map processing data

Method map_resource is the actual map method. It reads each line of data from the list, uses the word as the key and the accumulated count as the value, and finally returns a dict result.

def map_resource(chunk: list[str]) -> dict[str, int]:
    """ The method that actually performs the map task
    returns the sum of the counts corresponding to the keywords in the current partition.
    """
    result = {}
    for line in chunk:
        word, _, count, _ = line.split('\t')
        if word in result:
            result[word] = result[word] + int(count)
        else:
            result[word] = int(count)

    return result
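
For instance, feeding map_resource a couple of fabricated lines in the same tab-separated format (illustrative values only) would work like this:

# Toy example: two fabricated lines for the same word
sample_chunk = ["apple\t2001\t5\t3\n", "apple\t2002\t7\t4\n"]
print(map_resource(sample_chunk))  # -> {'apple': 12}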

Integrating asyncio with multiprocessing

Method map_with_process creates a ProcessPoolExecutor, which starts a pool of processes based on the number of CPU cores, and calls the event loop's run_in_executor method to execute the map method in those processes in parallel. The final results are merged into a list by the asyncio.gather method.

async def map_with_process(chunks: list[list[str]]):
    """ Execute map tasks in parallel and join the results of multiple processes into lists """
    print("Start parallel execution of map tasks...")
    start = time.monotonic()
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for chunk in chunks:
            tasks.append(loop.run_in_executor(executor, map_resource, chunk))

    print(f"All map tasks are executed in {time.monotonic()-start:.2f} second(s)")
    return await asyncio.gather(*tasks)

Reducing the merged data

Since the map phase ends with a list of dicts produced by multiple processes, we also need a reduce step to merge these dicts into a single final result that records the total frequency of each word. Here we first write the merge method used by the reduce step.

def merge_resource(first: dict[str, int], second: dict[str, int]) -> dict[str, int]:
    """ The actually reduce method sums the counts of two dicts with the same key """
    merged = first
    for key in second:
        if key in merged:
            merged[key] = merged[key] + second[key]
        else:
            merged[key] = second[key]
    return merged

Then we call the functools.reduce method directly to merge the data.

def reduce(intermediate_results: list[dict[str, int]]) -> dict[str, int]:
    """ Use the functools.reduce method to combine all the items in the list """
    return functools.reduce(merge_resource, intermediate_results)
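
As a small illustration with toy dicts (not real data), reducing three partial results works like this:

# Toy example: merge three partial word-count dicts into one
parts = [{"apple": 2}, {"apple": 3, "pear": 1}, {"pear": 4}]
print(reduce(parts))  # -> {'apple': 5, 'pear': 5}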

Finally, implement the main method

Eventually, we integrate all the methods into the main method and call it:

async def main(partition_size: int):
    """ Entrance to all methods """
    file_names = ["../data/googlebooks-eng-all-1gram-20120701-a",
                  "../data/googlebooks-eng-all-1gram-20120701-b",
                  "../data/googlebooks-eng-all-1gram-20120701-c"]
    contents = await get_all_file_content(file_names)
    chunks = partition(contents, partition_size)
    intermediate_results = await map_with_process(chunks)
    final_results = reduce(intermediate_results)

    print(f'Aardvark has appeared {final_results["Aardvark"]} times.')

Great! We get the sum of the frequencies of the word Aardvark in all the datasets. Task complete.

Using tqdm to indicate progress

In the previous article, we explained how to use tqdm to indicate the progress of asyncio tasks.

Using Tqdm with Asyncio in Python
An efficient way to monitor concurrent tasks’ progress

Since processing large datasets in the real world often takes a long time, and we need to track the progress of code execution while it runs, we also need to add tqdm progress bars in the right places.

async def get_all_file_content(file_names: list[str]):
    """ Start concurrent tasks and join the file contents together """
    print("Begin to read files...")
    start = time.monotonic()
    tasks, results = [], []

    for filename in file_names:
        tasks.append(asyncio.create_task(read_file(filename)))
    temp_results = await tqdm_asyncio.gather(*tasks)  # add tqdm asyncio API

    results = [item for sublist in temp_results for item in sublist]
    print(f"All files are read in {time.monotonic() - start:.2f} second(s)")
    return results
    
  
#########################################
async def map_with_process(chunks: list[list[str]]):
    """ Execute map tasks in parallel and join the results of multiple processes into lists """
    print("Start parallel execution of map tasks...")
    start = time.monotonic()
    loop = asyncio.get_running_loop()
    tasks = []

    with ProcessPoolExecutor() as executor:
        for chunk in chunks:
            tasks.append(loop.run_in_executor(executor, map_resource, chunk))

        # Gather the results first, so the elapsed time below reflects the real work
        results = await tqdm_asyncio.gather(*tasks)  # add tqdm asyncio API

    print(f"All map tasks are executed in {time.monotonic()-start:.2f} second(s)")
    return results
    
    
###########################################
def reduce(intermediate_results: list[dict[str, int]]) -> dict[str, int]:
    """ Use the functools.reduce method to combine all the items in the list """
    return functools.reduce(merge_resource, tqdm(intermediate_results))

It looks much more professional now.

The resulting screenshot after adding the tqdm APIs. Image by Author

Conclusion

In today’s article, we explored some of the problems with multi-process code, such as the hassle of getting the result of each process and the inability to get results in the order in which the tasks complete.

We also explored the feasibility of integrating asyncio with ProcessPoolExecutor and the advantages that such integration brings to us. For example, it unifies the API for concurrent and parallel programming, simplifies our programming process, and allows us to obtain execution results in order of completion.

Finally, through a real-world case study, we explained how to alternate between concurrent and parallel programming techniques to execute our data science code efficiently.

Due to my limited ability, there are inevitably some imperfections in this case study, so I welcome your comments and corrections so that we can learn and progress together.

