Mastering Synchronization Primitives in Python Asyncio: A Comprehensive Guide

Best practices for asyncio.Lock, asyncio.Semaphore, asyncio.Event and asyncio.Condition


In this article, I will explain why you need synchronization primitives in Python’s asyncio and cover best practices for each of them.

In the last part of the article, I will walk you through an example of these primitives in action.


Introduction

Why do you need synchronization primitives in asyncio

Anyone who has used Python multithreading knows that multiple threads share the same memory space.

So when multiple threads perform non-atomic operations on the same data simultaneously, a thread-safety problem occurs.

Since asyncio runs on a single thread, is it free of similar thread-safety issues? Unfortunately, the answer is no.

Concurrent tasks in asyncio are executed asynchronously, which means their execution can interleave over time.

A concurrency bug occurs when one task reads a piece of shared state, suspends while waiting for an IO operation to return, and another task modifies that same state in the meantime.
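Here is a minimal sketch (a hypothetical counter example of my own, not from the scenario below) showing how such an interleaving loses an update: both tasks read the counter, suspend at an await, and then write back a stale value.

import asyncio

counter = 0


async def unsafe_increment():
    global counter
    value = counter         # read shared state
    await asyncio.sleep(0)  # suspension point: another task may run here
    counter = value + 1     # write back a possibly stale value


async def main():
    await asyncio.gather(unsafe_increment(), unsafe_increment())
    print(counter)  # prints 1, not 2: one increment was lost


if __name__ == "__main__":
    asyncio.run(main())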

To avoid such bugs, Python asyncio provides synchronization primitives similar to those in multithreading.

Asyncio’s synchronization primitives can also protect a resource by limiting the number of tasks that access it simultaneously, so it is not overwhelmed by too many concurrent requests.

Next, let’s take a look at what synchronization primitives are available in asyncio.


Python Asyncio’s Synchronization Primitives

Lock

Before we introduce this API, let’s look at a situation:

Suppose we have a concurrent task that needs a copy of the website data. It will first check if it’s in the cache; if it is, it will fetch it from the cache, and if not, it will read it from the website.

Since fetching the website data and updating the cache takes some time, multiple concurrent tasks running at the same time may all conclude that the data is not in the cache and launch remote requests simultaneously, as shown in the following code:

import asyncio
import aiohttp

cache = dict()


async def request_remote():
    print("Will request the website to get status.")
    async with aiohttp.ClientSession() as session:
        response = await session.get("https://www.example.com")
        return response.status


async def get_value(key: str):
    if key not in cache:
        print(f"The value of key {key} is not in cache.")
        value = await request_remote()
        cache[key] = value
    else:
        print(f"The value of key {key} is already in cache.")
        value = cache[key]
    print(f"The value of {key} is {value}")
    return value


async def main():
    task_one = asyncio.create_task(get_value("status"))
    task_two = asyncio.create_task(get_value("status"))

    await asyncio.gather(task_one, task_two)


if __name__ == "__main__":
    asyncio.run(main())
Both tasks think there is no data in the cache, thus accessing the remote site. Image by Author

This is not in line with our original design intent, which is where asyncio.Lock comes in handy.

We can require concurrent tasks to acquire a lock before checking the cache; tasks that do not get the lock will wait.

Only after the task holding the lock has finished updating the cache and released the lock can the other tasks continue to execute.

The entire flowchart is shown below:

The asyncio.Lock's entire flowchart. Image by Author

Let’s see how to write the code:

import asyncio
from asyncio import Lock

import aiohttp

cache = dict()
lock = Lock()


async def request_remote():
    ...  # same as in the previous example


async def get_value(key: str):
    async with lock:
        if key not in cache:
            print(f"The value of key {key} is not in cache.")
            value = await request_remote()
            cache[key] = value
        else:
            print(f"The value of key {key} is already in cache.")
            value = cache[key]
        print(f"The value of {key} is {value}")
        return value


async def main():
    ...  # same as in the previous example


if __name__ == "__main__":
    asyncio.run(main())
Only the first task needs to update the cache. Image by Author

Problem solved, isn’t it simple?

Semaphore

Sometimes, we need to access a resource with limited concurrent requests.

For example, a particular database may only allow five connections to be open simultaneously, or a web API may only support a certain number of concurrent requests depending on your subscription.

In this case, you need to use asyncio.Semaphore. asyncio.Semaphore uses an internal counter that decrements by one each time a Semaphore lock is acquired until it reaches zero.

Semaphore will limit the number of concurrent tasks. Image by Author

When the counter of asyncio.Semaphore is zero, other tasks that need the lock will wait.

When a task finishes and calls the release method, the counter is incremented by one, and the waiting tasks can continue to execute.

The code example is as follows:

import asyncio
from asyncio import Semaphore
from aiohttp import ClientSession


async def get_url(url: str, session: ClientSession, semaphore: Semaphore):
    print('Waiting to acquire semaphore...')
    async with semaphore:
        print('Semaphore acquired, requesting...')
        response = await session.get(url)
        print('Finishing requesting')
        return response.status


async def main():
    # Although we start 1000 tasks, only 10 tasks will be executed at the same time.
    semaphore: Semaphore = Semaphore(10)
    async with ClientSession() as session:
        tasks = [asyncio.create_task(get_url("https://www.example.com", session, semaphore))
                 for _ in range(1000)]
        await asyncio.gather(*tasks)


if __name__ == "__main__":
    asyncio.run(main())

In this way, we can limit the number of connections that can be accessed concurrently.

BoundedSemaphore

Sometimes, due to code limitations, we can’t use async with to manage the acquire and release of semaphore locks, so we might call acquire somewhere and release somewhere else.

What happens if we accidentally call the asyncio.Semaphore release method multiple times?

import asyncio
from asyncio import Semaphore


async def acquire(semaphore: Semaphore):
    print("acquire: Waiting to acquire...")
    async with semaphore:
        print("acquire: Acquired...")
        await asyncio.sleep(5)
    print("acquire: Release...")


async def release(semaphore: Semaphore):
    print("release: Releasing as one off...")
    semaphore.release()
    print("release: Released as one off...")


async def main():
    semaphore = Semaphore(2)

    await asyncio.gather(asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(release(semaphore)))

    await asyncio.gather(asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(acquire(semaphore)),
                         asyncio.create_task(acquire(semaphore)))


if __name__ == "__main__":
    asyncio.run(main())

As the code shows, we intended to allow only two tasks to run simultaneously, but because release was called one extra time, three tasks are able to run at the same time in the next round.

To solve this problem, we can use asyncio.BoundedSemaphore.

As we can see from the source code, a ValueError is thrown if calling release would push the counter above the value set during initialization:

import asyncio
from asyncio import BoundedSemaphore


async def main():
    semaphore = BoundedSemaphore(2)

    await semaphore.acquire()
    semaphore.release()
    semaphore.release()


if __name__ == "__main__":
    asyncio.run(main())
When we call the release method multiple times, a ValueError is thrown. Image by Author

And with that, the problem is solved.

Event

Event maintains an internal boolean variable as a flag. asyncio.Event has three common methods: wait, set, and clear.

When a task reaches event.wait(), it waits. At this point, you can call event.set() to set the internal flag to True, and all the waiting tasks can then continue to execute.

When the work is finished, call the event.clear() method to reset the flag to False, restoring the event to its initial state so it can be reused.

At the end of the article, I will show how to use Event to implement a full event bus; in the meantime, a minimal sketch of these three methods follows.
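In this sketch (the waiter coroutine and the task names are my own, just for illustration), several tasks block on event.wait(), a single event.set() wakes them all at once, and event.clear() resets the flag for reuse:

import asyncio
from asyncio import Event


async def waiter(event: Event, name: str):
    print(f"{name}: waiting for the event...")
    await event.wait()  # suspends until event.set() is called
    print(f"{name}: the event is set, continuing.")


async def main():
    event = Event()
    tasks = [asyncio.create_task(waiter(event, f"task-{i}")) for i in range(3)]

    await asyncio.sleep(1)  # simulate some preparation work
    event.set()             # wake every waiting task at once
    await asyncio.gather(*tasks)

    event.clear()  # reset the flag so the event can be reused later


if __name__ == "__main__":
    asyncio.run(main())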

Condition

asyncio.Condition is similar to asyncio.Lock and asyncio.Event combined.

First, we use async with to ensure that the condition lock is acquired, and then we call condition.wait() to release the lock and make the task wait temporarily.

When condition.wait() returns, we reacquire the condition lock, which ensures that only one task executes at a time.

While a task has released the lock and is waiting inside condition.wait(), another task can acquire the condition lock with async with and wake all waiting tasks by calling the condition.notify_all() method.

The flowchart is shown below:

The workflow of asyncio.Condition. Image by Author

We can demonstrate the effect of asyncio.Condition with a piece of code:

import asyncio
from asyncio import Condition


async def do_work(condition: Condition):
    print("do_work: Acquiring condition lock...")
    async with condition:
        print("do_work: Acquired lock, release and waiting for notify...")
        await condition.wait()
        print("do_work: Condition notified, re-acquire and do work.")
        await asyncio.sleep(1)
        print("do_work: Finished work, release condition lock.")


async def fire_event(condition: Condition):
    await asyncio.sleep(5)
    print("fire_event: Acquiring condition lock....")
    async with condition:
        print("fire_event: Acquired lock, notify all workers.")
        condition.notify_all()
        print("fire_event: Notify finished, release the work...")


async def main():
    condition = Condition()
    asyncio.create_task(fire_event(condition))

    await asyncio.gather(do_work(condition), do_work(condition))


if __name__ == "__main__":
    asyncio.run(main())

Sometimes, we need asyncio.Condition to wait for a specific event to occur before proceeding to the next step. We can call the condition.wait_for() method and pass a method as an argument.

Each time condition.notify_all is called, condition.wait_for calls that method again and checks its result: it ends the wait if the result is True, or continues waiting if it is False.

We can demonstrate the effect of wait_for with an example. In the following code, we will simulate a database connection.

Before executing a SQL statement, the code checks whether the database connection has been initialized; if initialization is complete, the query is executed, otherwise the task waits until the connection has finished initializing:

import asyncio
from asyncio import Condition
from enum import Enum


class ConnectionState(Enum):
    WAIT_INIT = 0
    INITIALIZING = 1
    INITIALIZED = 2


class Connection:
    def __init__(self):
        self._state = ConnectionState.WAIT_INIT
        self._condition = Condition()

    async def initialize(self):
        print("initialize: Preparing initialize the connection.")
        await self._change_state(ConnectionState.INITIALING)
        await asyncio.sleep(5)
        print("initialize: Connection initialized")
        await self._change_state(ConnectionState.INITIALIZED)

    async def execute(self, query: str):
        async with self._condition:
            print("execute: Waiting for connection initialized")
            await self._condition.wait_for(self._is_initialized)
            print(f"execute: Connection initialized, executing query: {query}")
            await asyncio.sleep(5)
            print("execute: Execute finished.")

    async def _change_state(self, state: ConnectionState):
        print(f"_change_state: Will change state from {self._state} to {state}")
        self._state = state
        print("_change_state: Change the state and notify all..")
        async with self._condition:
            self._condition.notify_all()

    def _is_initialized(self):
        if self._state is not ConnectionState.INITIALIZED:
            print("_is_initialized: The connection is not initialized.")
            return False
        print("_is_initialized: The connection is ready.")
        return True


async def main():
    connection = Connection()
    task_one = asyncio.create_task(connection.execute("SELECT * FROM table"))
    task_two = asyncio.create_task(connection.execute("SELECT * FROM other_table"))

    asyncio.create_task(connection.initialize())
    await asyncio.gather(task_one, task_two)


if __name__ == "__main__":
    asyncio.run(main())

Some Tips for Using Synchronization Primitives

Remember to use timeouts or cancellation when needed

When using synchronization primitives, we are generally waiting for the completion of a specific IO operation.

However, due to network fluctuations or other unknown reasons, the IO operation of a task may take longer than others.

In this case, we should set a timeout for the operation, so that when the execution time is too long, we can release the lock and allow other tasks to execute in time.

In another case, a task may run in an endless loop, keeping other tasks waiting in the background and preventing the program from ending properly.

In that situation, remember to cancel the task to terminate its endless execution.
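As a minimal sketch (the slow_fetch coroutine is a hypothetical stand-in for an IO call that may hang), we can wrap the work in asyncio.wait_for so the lock is released after a timeout, and cancel the task if it is still pending when the program wants to exit:

import asyncio
from asyncio import Lock


async def slow_fetch():
    # hypothetical stand-in for an IO call that may hang
    await asyncio.sleep(10)


async def guarded_fetch(lock: Lock):
    async with lock:
        try:
            # give up after 2 seconds so the lock is not held forever
            await asyncio.wait_for(slow_fetch(), timeout=2)
        except asyncio.TimeoutError:
            print("Fetch timed out, releasing the lock for other tasks.")


async def main():
    lock = Lock()
    background = asyncio.create_task(guarded_fetch(lock))

    await asyncio.sleep(5)
    if not background.done():
        background.cancel()  # cancel lingering tasks so the program can exit cleanly
    await asyncio.gather(background, return_exceptions=True)


if __name__ == "__main__":
    asyncio.run(main())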

Avoid synchronization primitives when possible, or lock only the fewest resources

We all know that the advantage of asyncio is that a task can switch to another task to execute while waiting for IO to return.

But an asyncio task often contains both IO-bound operations and CPU-bound operations.

If we put too much of a task’s code under a lock, the task cannot yield to other tasks in time, which hurts performance.

Therefore, avoid synchronization primitives unless necessary, and when you do need a lock, hold it around the smallest possible critical section.
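Here is a sketch of my own illustrating the idea: the slow IO-bound part runs outside the lock, and only the mutation of shared state is guarded. (In this toy case, appending to a list is already safe without a lock because there is no await inside the critical section, but the pattern matters once the critical section grows.)

import asyncio
from asyncio import Lock

results = []
lock = Lock()


async def worker(n: int):
    # IO-bound part: no shared state is touched, so the lock is not held here
    await asyncio.sleep(1)
    value = n * n

    # only the mutation of shared state is guarded by the lock
    async with lock:
        results.append(value)


async def main():
    await asyncio.gather(*(worker(i) for i in range(5)))
    print(results)


if __name__ == "__main__":
    asyncio.run(main())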

Avoid other contentious locking situations

There is no RLock in asyncio, so never try to acquire the same lock again in recursive or nested code.

As with multithreading, asyncio also has the possibility of deadlocks, so try to avoid using multiple locks simultaneously.
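The following sketch (again my own, wrapped in a timeout just so it terminates) shows the re-acquisition trap: because asyncio.Lock is not reentrant, the inner coroutine waits forever for a lock that the same task already holds.

import asyncio
from asyncio import Lock

lock = Lock()


async def inner():
    async with lock:
        print("This line is never reached.")


async def outer():
    async with lock:
        await inner()  # deadlock: inner() waits for a lock this task already holds


async def main():
    try:
        await asyncio.wait_for(outer(), timeout=2)
    except asyncio.TimeoutError:
        print("Deadlocked: asyncio.Lock is not reentrant.")


if __name__ == "__main__":
    asyncio.run(main())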


Advanced Techniques in Action: Asyncio-based Event Bus

After the introduction earlier in the article, I believe you have a clear understanding of how to use asyncio’s synchronization primitives properly.

Next, I will teach you how to use the synchronization primitives in real projects by taking you through the implementation of an event bus.

As usual, the first step as an architect is to design the EventBus API.

import asyncio
from asyncio import Event
import inspect
from typing import Callable


class EventBus:
    def __init__(self):
        self._event_dict = dict()

    async def on(self, event_name: str, fn: Callable):
        pass

    def trigger(self, event_name: str, *args, **kwargs):
        pass

    def _get_event(self, event_name: str):
        pass

Since the EventBus communicates using string event names, and internally I intend to use an asyncio.Event for each of those names, we’ll start by implementing a _get_event method:

    def _get_event(self, event_name: str):
        if event_name in self._event_dict:
            print("event already inited...")
            event = self._event_dict.get(event_name)
        else:
            print(f"need to init a new event for {event_name}")
            event = Event()
            self._event_dict[event_name] = event
        return event

The on method will bind a callback function to a specific event:

    async def on(self, event_name: str, fn: Callable):
        event = self._get_event(event_name)
        while True:
            await event.wait()
            print("event fired")
            result = fn(*event.args, **event.kwargs)
            if inspect.isawaitable(result):
                await result
            
            # Since the callback function is likely a synchronous method, 
            # we must perform an await here to allow other tasks to execute.
            await asyncio.sleep(0.1)
            event.clear()

The trigger method can manually trigger an event and pass in the corresponding data:

    def trigger(self, event_name: str, *args, **kwargs):
        event = self._get_event(event_name)

        event.args = args
        event.kwargs = kwargs
        event.set()

Finally, let’s write a main method to test the effect of EventBus:

def a_sync_callback(data):
    print(f"A sync callback with data {data} is triggered")


async def a_async_callback(data):
    await asyncio.sleep(1)
    print(f"A async callback with data {data} is triggered")


async def main():
    event_bus = EventBus()
    task_one = asyncio.create_task(event_bus.on("some_event", a_async_callback))
    task_two = asyncio.create_task(event_bus.on("some_event", a_sync_callback))

    event_bus.trigger("some_event", {"id": 1})
    await asyncio.wait([task_one, task_two], timeout=20.0)


if __name__ == "__main__":
    asyncio.run(main())

At the end of the main method, remember to use a timeout to prevent the program from running forever, as I warned earlier.

The code is executed as expected. Image by Author

As you can see, the code is executed as expected. Isn’t it easy?


Conclusion

This article first introduced why Python asyncio needs synchronization primitives.

Then, I introduced the best practices for Lock, Semaphore, Event, and Condition, and gave some tips on how to use them correctly.

Finally, I walked through a small hands-on project using asyncio synchronization primitives, which I hope will help you apply them better in real projects.

Feel free to comment, share, or discuss topics about asyncio with me.

