Unleashing the Power of Python Asyncio’s Queue

Mastering the producer-consumer pattern with asyncio through real-life examples

Unleashing the Power of Python Asyncio’s Queue
Photo Credit: Created by Author, Canva

In this article, I will explain the API usage and application scenarios of various queues in Python asyncio relaxedly.

At the end of the article, I will demonstrate the practical usage of asyncio.Queue in a classic shopping scenario.


Introduction

Why do we need asyncio.Queue

As readers who have read my previous articles know, I love asyncio because it is an almost perfect solution for concurrent programming.

However, in a large-scale, highly concurrent project, a large number of uncontrollable concurrent tasks waiting will occupy system resources, leading to poor performance.

Therefore, it is necessary to control the number of concurrent tasks.

Why can’t we use asyncio.Semaphore

In my previous article on synchronization primitives, I introduced using Semaphore locks to control the number of concurrent tasks running simultaneously.

Mastering Synchronization Primitives in Python Asyncio: A Comprehensive Guide
Best practices for asyncio.Lock, asyncio.Semaphore, asyncio.Event and asyncio.Condition

Set the number of Semaphore locks first, and the tasks that get the lock will be executed, while those who don’t will wait.

However, asyncio.Semaphore can only limit the concurrency of tasks accessing a resource through IO; the number of concurrent tasks cannot be limited.

Therefore, in this scenario, asyncio.Semaphore is not a perfect solution.

asyncio.Queue is the way

Using asyncio.Queue, we can start a fixed number of concurrent tasks when the program starts, and then pass the data to be processed through the queue to these tasks.

This is the well-known producer-consumer pattern. At the same time, like the multiprocessing queue, asyncio.Queue also serves to pass messages between concurrent tasks.


The Magical World of asyncio’s Queue

Why can asyncio.Queue plays such a significant role? In fact, we also encounter similar problems in real life:

Crowds of people in the supermarket.
Crowds of people in the supermarket. Created by Author, Canva

The most typical example is large shopping supermarkets. In such supermarkets, there are always many customers.

After each customer finishesshopping, they need to check out. Checking out takes some time, which can lead to congestion.

A more straightforward solution is to hire more cashiers, preferably one for each customer, for instant checkouts.

However, this is unrealistic because so many cashiers would mean colossal cost pressure (and resource consumption) for the boss.

So, a brilliant person came up with a good solution: have customers line up in a queue, and then have a few cashiers check out customers in turn.

Customers in the supermarket are lining up.
Customers in the supermarket are lining up. Photo by Lisanto 李奕良 on Unsplash

The only cost is that customers need to wait a little longer. At the same time, if the queue is too long, the manager can choose to temporarily add a few more cashiers. In this way, the system can flexibly expand.

By comparing the customer queue to data entering the queue and cashiers to concurrent tasks, we can see the benefits asyncio.Queue brings:

  • It is a good implementation of the producer-consumer pattern.
  • It can control the number of concurrent tasks.
  • Making resource consumption manageable, and the system can also be flexibly expanded.

The Adventures of the Producer-Consumer Pattern in asyncio

What is the producer-consumer pattern

Imagine two types of tasks sharing a queue. Task A produces data and puts it into the queue, while Task B retrieves data from the queue for processing.

This is the producer-consumer pattern, where Task A is the producer, and Task B is the consumer.

In analogy with a supermarket, customers are producers, cashiers are consumers, and the customer queue represents the queue.

Why use the producer-consumer pattern

In high-concurrency programs, producers often generate data quickly, while consumers process data slowly. Thus, producers must wait for consumers to finish processing before continuing to produce data.

Sometimes, consumers process data quickly, while producers generate data slowly. This leads to consumers waiting for producers to generate data before continuing to run.

To balance between producers and consumers, a queue is needed to store the data produced by the producer. The queue acts as a buffer and decouples the producer and consumer.

The Diagram of Producer-Consumer Pattern.
The Diagram of Producer-Consumer Pattern. Image by Author

Implementing the Producer-Consumer Pattern with asyncio’s Queue

Now, let’s implement the supermarket shopping scenario mentioned earlier using asyncio.Queue.

import asyncio
from asyncio import Queue
from random import randrange


class Product:
    def __init__(self, product_name: str, checkout_time: float):
        self.product_name = product_name
        self.checkout_time = checkout_time


class Customer:
    def __init__(self, customer_id: int, products: list[Product]):
        self.customer_id = customer_id
        self.products = products

As shown in the code, we first implement the Customer and Product classes, representing customers and products that need to be checked out. The Product class has a checkout_time attribute, which represents the time required for checking out the product.

After that, we implement a checkout_customer method that acts as a consumer.

async def checkout_customer(queue: Queue, cashier_number: int):
    while not queue.empty():
        customer: Customer = await queue.get()
        print(f"The Cashier_{cashier_number} "
              f"will checkout Customer_{customer.customer_id}")
        for product in customer.products:
            print(f"The Cashier_{cashier_number} "
                  f"will checkout Customer_{customer.customer_id}'s "
                  f"Product_{product.product_name}")
            await asyncio.sleep(product.checkout_time)
        print(f"The Cashier_{cashier_number} "
              f"finished checkout Customer_{customer.customer_id}")
        queue.task_done()

As long as there is data in the queue, this method will continue to loop. During each iteration, it uses a get method to retrieve a Customer instance.

If there is no data in the queue, it will wait.

After retrieving a piece of data (in this case, a Customer instance), it iterates through the products attribute and uses asyncio.sleep to simulate the checkout process.

After finishing processing the data, we use queue.task_done() to tell the queue that the data has been successfully processed.

Next, we implement the generate_customer method as a factory method for producing customers.

We first define a product series and the required checkout time for each product. Then, we place 0 to 10 products in each customer’s shopping cart.

def generate_customer(customer_id: int) -> Customer:
    all_products = [Product('deer', 2),
                    Product('banana', .5),
                    Product('sausage', .2),
                    Product('diapers', .2)]
    products = [all_products[randrange(len(all_products))] for _ in range(randrange(10))]
    return Customer(customer_id, products)

Furthermore, we implement the customer_generation method as a producer. This method generates several customer instances regularly and puts them in the queue. If the queue is full, the put method will wait.

async def customer_generation(queue: Queue):
    customer_count = 0
    while True:
        customers = [generate_customer(the_id)
                     for the_id in range(customer_count, customer_count+randrange(5))]
        for customer in customers:
            print("Waiting to put customer in line....")
            await queue.put(customer)
            print("Customer put in line...")
        customer_count = customer_count + len(customers)
        await asyncio.sleep(.3)

Finally, we use the main method to initialize the queue, producer, and consumer, and start all concurrent tasks.

async def main():
    customer_queue = Queue(2)
    customer_producer = asyncio.create_task(customer_generation(customer_queue))
    cashiers = [checkout_customer(customer_queue, i) for i in range(3)]

    await asyncio.gather(customer_producer, *cashiers)


if __name__ == "__main__":
    asyncio.run(main())
The implementation is successful.
The implementation is successful. Image by Author

As expected, the implementation is successful.


Introducing the PriorityQueue

Why use asyncio.PriorityQueue

The queue mentioned earlier is a First-In-First-Out (FIFO) queue, where the first item to enter the queue is the first to be retrieved. This is suitable when all tasks in the queue have the same priority.

However, consider the following situation:

Suppose there is a queue with tasks waiting in line, each requiring a long processing time.

An error log or VIP user access is a high-priority task that needs immediate attention. What should we do?

An error log or VIP user access is a high-priority task that needs immediate attention.
Photo by Ethan Hu on Unsplash

This is where asyncio.PriorityQueue comes into play.

Briefly describe asyncio.PriorityQueue’s implementation

Unlike FIFO queues based on lists, asyncio.PriorityQueue is based on heaps. It is built using a binary tree structure.

You may be familiar with binary search trees, which ensure that the most minor node is always the leftmost node.

However, the binary tree in asyncio.PriorityQueue ensures that the most minor node is always at the top, so the highest priority node is permanently removed first.

On the left is the binary tree used by PriorityQueue, and on the right is the binary search tree.
On the left is the binary tree used by PriorityQueue, and on the right is the binary search tree. Image by Author

Real-world example with asyncio.PriorityQueue

Let’s illustrate the usage of asyncio.PriorityQueue with a real-world scenario that exists in practice.

Imagine we have an order service API. The API takes time for each order to process, but we can’t keep users waiting too long.

So when a user places an order, the API first puts the order into a queue, allowing a background task to process it asynchronously while immediately returning a message to the user.

This API accepts orders from two types of users: regular users and VIP users. It must ensure that VIP user orders are processed with the highest priority.

VIP orders are processed with the highest priority.
VIP orders are processed with the highest priority. Image by Author

To keep the learning curve low for readers, in this example, we will use aiohttp to implement the server. The specific code is as follows:

import asyncio
from asyncio import PriorityQueue, Task
from dataclasses import dataclass, field
from enum import IntEnum
from random import randrange

from aiohttp import web
from aiohttp.web_app import Application
from aiohttp.web_request import Request
from aiohttp.web_response import Response

app = Application()
routers = web.RouteTableDef()
QUEUE_KEY = "QUEUE_KEY"
TASK_KEY = "TASK_KEY"


class UserType(IntEnum):
    POWER_USER = 1
    NORMAL_USER = 2


@dataclass(order=True)
class WorkItem:
    user_type: UserType
    order_delay: int = field(compare=False)

First, we define an enumeration marking the two categories: regular users and VIP users.

Next, we use dataclass to define a user's order, which contains the user type and order processing duration. The order duration is not considered in priority sorting.

Then we define the consumer method process_order_worker, which retrieves orders from the queue and simulates the order processing.

Don’t forget to use queue.task_done() to tell the queue that we finished processing the order.

async def process_order_worker(worker_id: int, queue: PriorityQueue):
    while True:
        work_item: WorkItem = await queue.get()
        print(f"process_order_worker: Worker_{worker_id} begin to process worker {work_item}")
        await asyncio.sleep(work_item.order_delay)
        print(f"process_order_worker: Worker_{worker_id} finished to process worker {work_item}")
        queue.task_done()

Following that, we implement the order API using aiohttp. This API responds to user requests, generates an order object, and places it in the asyncio.PriorityQueue.

It then immediately returns a response to the user, avoiding user wait time.

@routers.post("/order")
async def order(request: Request) -> Response:
    queue: PriorityQueue = app[QUEUE_KEY]
    body = await request.json()
    user_type = UserType.POWER_USER if body['power_user'] == 'True' else UserType.NORMAL_USER
    work_item = WorkItem(user_type, randrange(5))
    await queue.put(work_item)

    return Response(body="order placed!")

When the program starts, we use create_order_queue to initialize the queue and order consumption tasks.

async def create_order_queue(app: Application):
    print("create_order_queue: Begin to initialize queue and tasks.")
    queue: PriorityQueue = PriorityQueue(10)
    tasks = [asyncio.create_task(process_order_worker(i, queue)) for i in range(3)]
    app[QUEUE_KEY] = queue
    app[TASK_KEY] = tasks
    print("create_order_queue: Initialize queue and tasks success..")

When the program ends, we use destroy_order_queue to ensure that all orders in the queue are processed and the background tasks are closed correctly.

queue.join() will wait for all the data in the queue to be processed. asyncio.wait_for sets a timeout of 20 seconds, after which it will no longer wait queue.join() to complete.

async def destroy_order_queue(app: Application):
    queue: PriorityQueue = app[QUEUE_KEY]
    tasks: list[Task] = app[TASK_KEY]

    try:
        print("destroy_order_queue: Wait for 20 sec to let all work done.")
        await asyncio.wait_for(queue.join(), timeout=20.0)
    except Exception as e:
        print("destroy_order_queue: Cancel all tasks.")
        [task.cancel() for task in tasks]


app.add_routes(routers)
app.on_startup.append(create_order_queue)
app.on_shutdown.append(destroy_order_queue)
web.run_app(app)

We can test this implementation using PyCharm’s HTTP Request:

POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "True"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "False"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "False"}

###
POST http://localhost:8080/order
Content-Type: application/json

{"power_user": "True"}
API prioritizes orders from VIP users whenever possible.
API prioritizes orders from VIP users whenever possible. Image by Author

As you can see, the two high-priority tasks are processed as expected. Perfect!


Conclusion

In this article, I introduced the usage and best practices of asyncio.Queue.

When you need to control the concurrency of a program, I recommend using asyncio.Queue to effectively manage resource consumption.

I introduced the producer-consumer pattern and its benefits:

  1. Balancing between producers and consumers, maximizing resource utilization.
  2. Decoupling the system, allows producers and consumers to scale independently.

Finally, I showed how to use asyncio.PriorityQueue to handle scenarios where tasks require prioritization through a real-world example.

Due to space constraints, I could not cover all aspects of asyncio.Queue. However, I hope this article provided a solid understanding of the basic concepts and helpful examples.

Asynchronous programming in Python is a powerful tool, and the producer-consumer pattern with asyncio.Queue is a versatile approach to handling concurrency and prioritization in your applications.


🎉
Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome—let's discuss in the comments below!