Aiomultiprocess: Super Easy Integration of Multiprocessing & Asyncio in Python

You don't even need to know much about asyncio and multiprocessing

Aiomultiprocess makes your code fast and easy. Photo Credit: Created by Author, Canva

In this article, I will show how to easily integrate multiprocessing and asyncio using the aiomultiprocess library. The article includes a web scraping project example and best practices for using this library.


Introduction

My colleague Wang came to me today and said that his boss assigned him a new task: to write web scraping code to fetch information from 1,000 books on books.toscrape.com as quickly as possible.

Wang told me: “I’ve read your related articles, and since the boss has performance requirements, why don’t I write one using asyncio? It doesn’t seem too difficult.”

“30.09 seconds,” I said, giving him a number.

“What’s that?” Wang asked.

I said I had just tried it: using concurrent tasks with asyncio alone, the web scraping took that long on my computer. That speed is already relatively fast.

“12.64 seconds,” I said, giving him another number.

The speed more than doubled! Wang was stunned.

That's because I used a mighty library called aiomultiprocess, which makes it easy to integrate multiprocessing and asyncio. Simply rewriting the web scraping code with it, on the same network and the same computer, improved the performance that much.


Multiprocessing and Asyncio: A Quick Recap

Wang said: “That’s amazing! Teach me how to use aiomultiprocess quickly!”

I told him not to rush. Although the library is simple enough that he doesn't need a deep understanding of asyncio and multiprocessing, a bit of theory will help him truly grasp how the library works under the hood.

Key concepts of asyncio

Asyncio was introduced in Python 3.4. At its core, it runs coroutines cooperatively on an event loop in the main thread.

While one task waits for a network call (or a disk read/write) to return, the event loop switches to another task.

Because everything runs in a single thread, there is no thread-switching overhead and no GIL contention, which makes asyncio well suited for IO-bound code.
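
Here is a minimal sketch of that cooperative switching; the fetch coroutine is made up, and asyncio.sleep stands in for a real network call:

import asyncio


async def fetch(name: str) -> str:
    # While this task waits, the event loop switches to the other one.
    await asyncio.sleep(1)
    return f"{name} done"


async def main():
    # Both coroutines run concurrently on one event loop in one thread.
    results = await asyncio.gather(fetch("task-1"), fetch("task-2"))
    print(results)  # finishes in about 1 second, not 2


if __name__ == "__main__":
    asyncio.run(main())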

Key concepts of multiprocessing

Multiprocessing was introduced in Python to handle compute-intensive tasks. It works by running multiple processes, each with its own interpreter and GIL, so different pieces of Python code execute truly in parallel.

It makes full use of the performance of multi-core CPUs, so it is very suitable for running CPU-bound code.
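
A minimal sketch of spreading a made-up CPU-bound function across cores with the standard multiprocessing module (the numbers are arbitrary):

import multiprocessing


def heavy(n: int) -> int:
    # CPU-bound work: each call runs in a separate process on its own core.
    return sum(i * i for i in range(n))


if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        # The four calls are distributed across the available CPU cores.
        print(pool.map(heavy, [10_000_000] * 4))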

Benefits of combining both approaches

However, using asyncio or multiprocessing alone is only ideal in specific situations. In reality, the boundaries between IO-bound and CPU-bound tasks are not so clear.

Take the web scraping scenario as an example:

Web scraping is divided into two parts: fetching the HTML of the page from the network and parsing the required content from the HTML. The former is an IO-bound task, and the latter is a CPU-bound task.

Web scraping contains both IO-bound and CPU-bound tasks. Image by Author

This creates a problem: if we use asyncio alone, a CPU-bound task that holds the CPU for too long blocks the single thread, and overall performance suffers.

If we only use multiprocessing, the number of CPU cores will limit the concurrency.

In a previous article, I introduced a way to integrate asyncio and multiprocessing. Specifically:

In the main process’s event loop, use loop.run_in_executor to start multiple subprocesses.

Then, use asyncio.run in each subprocess to create an event loop individually.

The diagram is as follows:

This diagram shows the way to integrate asyncio and multiprocessing. Image by Author
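
A minimal sketch of this manual approach might look like the following; the fetch coroutine, the URLs, and the batch split are all placeholders, with asyncio.sleep standing in for real network IO:

import asyncio
from concurrent.futures import ProcessPoolExecutor


async def fetch(url: str) -> str:
    await asyncio.sleep(1)  # stands in for a real network call
    return url


def run_batch(urls: list[str]) -> list[str]:
    async def gather_all() -> list[str]:
        return await asyncio.gather(*(fetch(u) for u in urls))

    # Each subprocess creates its own event loop with asyncio.run.
    return asyncio.run(gather_all())


async def main():
    loop = asyncio.get_running_loop()
    batches = [["url-1", "url-2"], ["url-3", "url-4"]]  # manual task split
    with ProcessPoolExecutor() as executor:
        # The main process's event loop starts one subprocess per batch.
        futures = [loop.run_in_executor(executor, run_batch, batch) for batch in batches]
        results = await asyncio.gather(*futures)
    print(results)


if __name__ == "__main__":
    asyncio.run(main())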

However, this method has several problems:

  1. It requires understanding fairly low-level asyncio and multiprocessing APIs, which is impractical for most readers.
  2. You have to decide up front which task runs in which process; there is no flexible task-allocation mechanism.
  3. Because concurrent and parallel mechanisms are mixed, communicating between tasks through queues or other means is challenging.
  4. It is difficult to add new tasks while the code is running.

In summary, this method is too low-level and thus difficult to use.

The aiomultiprocess library solves these problems nicely: it encapsulates the underlying code and exposes only a few high-level interfaces.


Getting Started with Aiomultiprocess

Installation and setup

If you’re using pip:

python -m pip install aiomultiprocess

If you're using Anaconda, the default channel does not include this package, so install it from conda-forge:

conda install -c conda-forge aiomultiprocess

Basic syntax and setup

By examining the source code or referring to the official documentation, we can see that aiomultiprocess exposes just three classes we need to know:

  1. Process: Executes a coroutine task within a subprocess. It's not commonly used directly, but the Worker and Pool classes inherit from it.
  2. Worker: Executes a coroutine task within a subprocess and returns the result. You can use this class to run an existing coroutine function in a subprocess (a short sketch appears after the Pool examples below).
  3. Pool: The core class we'll be using. It launches a process pool and allocates each coroutine task to a subprocess for execution. This class has two methods to master:
  • map: Takes a task function (a coroutine function) and an iterable as arguments. It applies each item in the iterable as an argument to the task function, running it in a subprocess. The method returns an async iterable, so you can use async for to retrieve each result.
import asyncio
import random

import aiomultiprocess


async def coro_func(value: int) -> int:
    await asyncio.sleep(random.randint(1, 3))
    return value * 2


async def main():
    results = []
    async with aiomultiprocess.Pool() as pool:
        async for result in pool.map(coro_func, [1, 2, 3]):
            results.append(result)

        # The results follow the order in which the arguments were passed in,
        # not the order in which the tasks finish
        # Output: [2, 4, 6]
        print(results)


if __name__ == "__main__":
    asyncio.run(main())
  • apply: Takes a task function along with args and kwargs. It combines the task function with args and kwargs, runs it in a subprocess, and returns an awaitable. You can collect all the results with asyncio.gather.
import asyncio
import random

import aiomultiprocess


async def coro_func(value: int) -> int:
    await asyncio.sleep(random.randint(1, 3))
    return value * 2


async def main():
    tasks = []
    async with aiomultiprocess.Pool() as pool:
        tasks.append(pool.apply(coro_func, (1,)))
        tasks.append(pool.apply(coro_func, (2,)))
        tasks.append(pool.apply(coro_func, (3,)))

        results = await asyncio.gather(*tasks)
        print(results)  # Output: [2, 4, 6]


if __name__ == "__main__":
    asyncio.run(main())
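
As a quick aside, here is the Worker class mentioned earlier. This is a minimal sketch based on my reading of the library's documentation; the coroutine and the value are just placeholders:

import asyncio

import aiomultiprocess


async def coro_func(value: int) -> int:
    await asyncio.sleep(1)
    return value * 2


async def main():
    # Worker runs one coroutine call in a child process and returns its result.
    worker = aiomultiprocess.Worker(target=coro_func, args=(21,))
    result = await worker
    print(result)  # Output: 42


if __name__ == "__main__":
    asyncio.run(main())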

Understanding the key components

Before diving into aiomultiprocess examples, we need to understand the implementation principles of the Pool class.

Pool mainly consists of three modules: scheduler, queue, and process. Among them:

  1. The scheduler module is responsible for task allocation. The default scheduler evenly distributes tasks across subprocesses in the order they are received. You can also implement a priority-based scheduler using PriorityQueue.
  2. The queue module contains task and result queues, connecting the scheduler and subprocesses. Both queues are implemented using multiprocessing.Manager().Queue(). It is responsible for passing tasks to the subprocesses and returning results from the subprocesses.
  3. The process module is implemented with the Process class, which creates subprocesses using the spawn start method by default. An event loop is created in each subprocess, and the results of that loop's execution are passed back to the main process through the result queue.

The entire schematic diagram is as follows:

Aiomultiprocess consists of three components: scheduler, queue, and process. Image by Author

Real-world Example: Web Scraping with Aiomultiprocess

After introducing the basic usage and implementation principles of aiomultiprocess, I’ll now fulfill my promise to demonstrate how aiomultiprocess can easily improve existing code and achieve significant performance improvements.

First, let me show you the asyncio version of the web scraping code. The goal of this code is simple: to fetch links to detail pages from 50 list pages on books.toscrape.com, extract the desired book data from the detail pages, and finally write it to a CSV file:

import asyncio
import csv
import time
from string import Template
from urllib.parse import urljoin

from aiohttp import request
from aiomultiprocess import Pool
from bs4 import BeautifulSoup

list_url_t = Template("https://books.toscrape.com/catalogue/category/books_1/page-$page.html")


def get_detail_url(base_url: str, html: str) -> list[str]:
    """ Grab the link to the detail page of each book from the HTML code of the list page """
    result = []
    soup = BeautifulSoup(html, "html.parser")
    a_tags = soup.select("article.product_pod div.image_container a")
    for a_tag in a_tags:
        result.append(urljoin(base_url, a_tag.get("href")))
    return result


def parse_detail_page(html: str) -> dict:
    """ Parse the HTML of the detail page to get the desired book data """
    soup = BeautifulSoup(html, "lxml")
    title = soup.select_one("div.product_main h1").text
    price = soup.select_one("div.product_main p.price_color").text
    description_tag = soup.select_one("div#product_description + p")
    description = description_tag.text if description_tag else ""

    return {"title": title, "price": price, "description": description}


async def fetch_list(url: str) -> list[str]:
    """ Get the URL of each detail page from the list page URL """
    print(f"fetch_list: begin to process url: {url}")
    async with request("GET", url) as response:
        html = await response.text()
        urls = get_detail_url(url, html)
    return urls


async def fetch_detail(url: str) -> dict:
    """ Get the book data on the detail page from the detail page URL """
    async with request("GET", url) as response:
        html = await response.text()
        detail = parse_detail_page(html)
        return detail


def write_to_csv(all_books: list):
    """ Writing data to CSV files """
    print(f"write_to_csv: begin to write books detail to csv.")
    with open("../../raw_data/scraping_result.csv", "w", newline="", encoding="utf-8") as csv_file:
        fieldnames = all_books[0].keys()
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writerows(all_books)


async def asyncio_main():
    """ Implementing web scraping by using asyncio alone """
    start = time.monotonic()
    all_books, detail_urls = [], []
    fetch_list_tasks = [asyncio.create_task(fetch_list(list_url_t.substitute(page=i + 1))) for i in range(50)]
    for urls in asyncio.as_completed(fetch_list_tasks):
        detail_urls.extend(await urls)

    fetch_detail_tasks = [asyncio.create_task(fetch_detail(detail_url)) for detail_url in detail_urls]
    for detail in asyncio.as_completed(fetch_detail_tasks):
        all_books.append(await detail)
    write_to_csv(all_books)
    print(f"All done in {time.monotonic() - start} seconds")


if __name__ == "__main__":
    asyncio.run(asyncio_main())
The result of executing the asyncio version of the code. Screenshot by Author

Next, we only need to rewrite the main function using aiomultiprocess's Pool and its corresponding APIs to drive the web scraping code. The original scraping logic does not need to change:

async def aiomultiprocess_main():
    """
    Integrating multiprocessing and asyncio with the help of aiomultiprocess,
    requires only a simple rewriting of the main function
    """
    start = time.monotonic()
    all_books = []
    async with Pool() as pool:
        detail_urls = []
        async for urls in pool.map(fetch_list,
                                   [list_url_t.substitute(page=i + 1) for i in range(50)]):
            detail_urls.extend(urls)

        async for detail in pool.map(fetch_detail, detail_urls):
            all_books.append(detail)

    write_to_csv(all_books)
    print(f"All done in {time.monotonic() - start} seconds")


if __name__ == "__main__":
    asyncio.run(aiomultiprocess_main())
The result of executing the aiomultiprocess version of the code. Screenshot by Author

This way, a purely asyncio-based code is transformed into a version that integrates multiprocessing and asyncio. Isn’t it super simple?


Best Practices for Using Aiomultiprocess

aiomultiprocess.Pool provides parameters such as processes, queuecount, and childconcurrency for performance tuning. In the following sections, I'll explain how to tune them in different scenarios.
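
As a reference for the scenarios below, here is a sketch of how these parameters are passed to Pool; the task and the parameter values are purely illustrative, not recommendations:

import asyncio

import aiomultiprocess


async def crunch(n: int) -> int:
    # A mostly CPU-bound task with a token await so it stays a coroutine.
    total = sum(i * i for i in range(n))
    await asyncio.sleep(0)
    return total


async def main():
    async with aiomultiprocess.Pool(
        processes=8,          # number of worker processes
        childconcurrency=4,   # concurrent coroutines per process
        queuecount=2,         # task queues; must not exceed processes
    ) as pool:
        results = [result async for result in pool.map(crunch, [1_000_000] * 16)]
        print(len(results))  # Output: 16


if __name__ == "__main__":
    asyncio.run(main())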

Computation-intensive scenarios

In this case, each task consumes significant CPU time, so running many of them concurrently in a single process delays the IO responses.

To optimize, we can reduce childconcurrency and queuecount while moderately increasing processes.

High IO, high latency scenarios

In this scenario, the IO pressure is relatively high, and the process needs to frequently switch between multiple IOs to promptly respond to IO returns. Here, we can increase childconcurrency to enhance each process’s concurrency.

High concurrency, high throughput scenarios

The queue is filled with numerous tasks in this situation, and multiple processes compete for queue resources.

We can increase queuecount (but stay within the number of processes) to ensure that each process has sufficient queues for task allocation.

Uneven task execution time scenarios

Sometimes, some tasks require a lot of time for computation, such as parsing complex web pages. Others need substantial time to wait for IO, like writing massive amounts of data to a database.

This results in task skew, with uneven pressure on processes executing tasks. Some processes quickly complete tasks and enter the waiting state, but others still need considerable time to finish the remaining tasks.

In this case, we can execute different types of tasks separately, completing simple tasks first and tackling the more complex ones.

Alternatively, we can lower the queuecount value to ensure fewer tasks enter the waiting queue, distributing tasks evenly among processes.

We can also implement a custom scheduler to prioritize specific tasks, reducing task skew.


Conclusion

Combining concurrent and parallel code is the best way to maximize performance. However, integrating asyncio and multiprocessing by hand requires interacting with many low-level APIs and writing a lot of boilerplate, which can be daunting for many readers.

aiomultiprocess solves this problem: by rewriting only the main function with a few of its API calls, existing asyncio code can run across multiple processes with barely any other changes, yielding significant performance improvements.

I hope you enjoy coding! If you’re interested in any of the points discussed in this article, please feel free to leave a comment and engage in discussion.


🎉
Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome—let's discuss in the comments below!