Using Tqdm with Asyncio in Python

An efficient way to monitor concurrent tasks’ progress

Using Tqdm with Asyncio in Python
Photo by Jungwoo Hong on Unsplash

Introduction

What’s bothering me

Using concurrent programming in Python for efficiency is not unusual for a data scientist. Watching various sub-processes or concurrent threads in the background to keep my computation or IO-bound tasks in order is always satisfying.

But one thing that still bothers me is that when I’m concurrently processing hundreds or thousands of files or executing hundreds of processes in the background, I’m always worried about whether a few tasks will hang secretly and the whole code will never finish. I also have difficulty knowing where the code is now in execution.

The worst part is that when I’m looking at a blank screen, it’s hard to tell how much longer my code will take to execute or what the ETA is. This is very detrimental to my ability to organize my work schedule.

Therefore, I wanted a way to let me know where the code execution had gotten to.

How it was done in the past

A more traditional approach is to share a memory area between tasks, put a counter in this memory area, let this counter+1 when a task is finished, and then use a thread to keep printing the value of this counter.

This is never a good solution: On the one hand, I need to add a code for counting into your existing business logic, which violates the principle of “low coupling, high cohesion”. On the other hand, I’d have to be very careful with the locking mechanism due to thread-safety issues, which would cause unnecessary performance problems.

tqdm is the way

tqdm uses a progress bar to indicate the progress of your tasks.
tqdm uses a progress bar to indicate the progress of your tasks. Image by Author

One day, I discovered the tqdm library, which uses a progress bar to visualize the progress of my code. Could I use the progress bar to visualize the completion and ETA of my asyncio tasks?

I went ahead and researched, and I succeeded. Then I’m sharing this method with you so that every programmer can have a chance to monitor their concurrent tasks' progress. Let’s go.


Background on asyncio in Python

Before we start, I’d like you to get some background on Python asyncio. My article describes the usage of some of asyncio’s common APIs, which will help us better understand the design of tqdm:

Use These Methods to Make Your Python Concurrent Tasks Perform Better
Best practices for asyncio.gather, asyncio.as_completed, and asyncio.wait

Overview of tqdm

As the official website describes, tqdm is a tool that displays a progress bar for your loops. It is straightforward to use, highly customizable and has a shallow resource footprint.

A typical usage is to pass an iterable object into the tqdm constructor, and you get a progress bar like the following:

from time import sleep
from tqdm import tqdm


def main():
    for _ in tqdm(range(100)):
        # do something in the loop
        sleep(0.1)


if __name__ == "__main__":
    main()

Or you can manually go through and update the progress of the progress bar as the file is being read:

import os
from tqdm import tqdm


def main():
    filename = "../data/large-dataset"
    with (tqdm(total=os.path.getsize(filename)) as bar,
            open(filename, "r", encoding="utf-8") as f):
        for line in f:
            bar.update(len(line))


if __name__ == "__main__":
    main()
Use tqdm to indicate the progress of reading a large dataset.
Use tqdm to indicate the progress of reading a large dataset. Image by Author

Integrating tqdm with asyncio

Overall, tqdm is very easy to use. However, there needs to be more information on GitHub about integrating tqdm with asyncio. So I went digging through the source code to see if tqdm supports asyncio.

Fortunately, the latest version of tqdm provides the package tqdm.asyncio, which provides the Class tqdm_asyncio.

The Class tqdm_asyncio has two related methods. One is tqdm_asyncio.as_completed. As you can see from the source code, it is a wrapper for asyncio.as_completed:

@classmethod
    def as_completed(cls, fs, *, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.as_completed`.
        """
        if total is None:
            total = len(fs)
        kwargs = {}
        if version_info[:2] < (3, 10):
            kwargs['loop'] = loop
        yield from cls(asyncio.as_completed(fs, timeout=timeout, **kwargs),
                       total=total, **tqdm_kwargs)

The other is tqdm_asyncio.gather , which, as seen from the source code, is based on an implementation of tqdm_asyncio.as_completed that emulates the functionality of asyncio.gather:

@classmethod
    async def gather(cls, *fs, loop=None, timeout=None, total=None, **tqdm_kwargs):
        """
        Wrapper for `asyncio.gather`.
        """
        async def wrap_awaitable(i, f):
            return i, await f

        ifs = [wrap_awaitable(i, f) for i, f in enumerate(fs)]
        res = [await f for f in cls.as_completed(ifs, loop=loop, timeout=timeout,
                                                 total=total, **tqdm_kwargs)]
        return [i for _, i in sorted(res)]

So, next, I will describe the usage of these two APIs. Before we start, we also need to do some preparation work. Here, I have written a simple method that simulates a concurrent task with a random sleep time:

import asyncio
import random

from tqdm.asyncio import tqdm_asyncio


class AsyncException(Exception):
    def __int__(self, message):
        super.__init__(self, message)


async def some_coro(simu_exception=False):
    delay = round(random.uniform(1.0, 5.0), 2)

    # We will simulate throwing an exception if simu_exception is True
    if delay > 4 and simu_exception:
        raise AsyncException("something wrong!")

    await asyncio.sleep(delay)

    return delay

Immediately afterward, we will create 2000 concurrent tasks and then use tqdm_asyncio.gather instead of the familiar asyncio.gather method to see if the progress bar works properly:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())
    await tqdm_asyncio.gather(*tasks)

    print(f"All tasks done.")


if __name__ == "__main__":
    asyncio.run(main())
The effect of tqdm_asyncio.gather.
The effect of tqdm_asyncio.gather. Image by Author

Ta-da! I finally know where my task is done. Pretty cool.

Or let’s replace tqdm_asyncio.gather with tqdm_asyncio.as_completed and try again:

async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    for done in tqdm_asyncio.as_completed(tasks):
        await done

    print(f"The tqdm_asyncio.as_completed also works fine.")


if __name__ == "__main__":
    asyncio.run(main())
tqdm_asyncio.as_completed also works fine.
tqdm_asyncio.as_completed also works fine. Image by Author

Great, it still works fine.


Advanced Tips and Tricks

Some common configuration items

tqdm has a rich set of configuration items, so here are some common ones.

  • desc. You can configure a desc parameter to display a title in front of the progress bar, which is useful when distinguishing between multiple groups of tasks.
async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    _ = await tqdm_asyncio.gather(*tasks, desc="The progress of works")

    print(f"The role of the desc configuration item.")


if __name__ == "__main__":
    asyncio.run(main())
The role of the desc configuration item.
The role of the desc configuration item. Image by Author
  • ncols. If the default progress bar is too short, you can make it longer with this parameter.
async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    _ = await tqdm_asyncio.gather(*tasks, desc="The progress of works",
                                  ncols=105)

    print(f"Use ncols to change the width of the bar.")


if __name__ == "__main__":
    asyncio.run(main())
Use ncols to change the width of the bar.
Use ncols to change the width of the bar. Image by Author
  • colour. Pycharm’s cli shows the progress bar in red by default, which is still too harsh, so you can use this parameter to change the bar to another color. But as of writing this article, I still haven’t found a way to change the text to white.
async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    _ = await tqdm_asyncio.gather(*tasks, colour="white")

    print("Use colour to change the color of the bar.")


if __name__ == "__main__":
    asyncio.run(main())
Use colour to change the color of the bar.
Use colour to change the color of the bar. Image by Author
  • bar_format. This option allows you to flexibly control the content and format of the progress bar display. For example, if you want to display an ETA at the top.
async def main():
    tasks = []
    for _ in range(2000):
        tasks.append(some_coro())

    _ = await tqdm_asyncio.gather(*tasks, bar_format="Eta:{eta}.|{bar}{r_bar}")

    print("Use bar_format to customize the content of the progress bar.")


if __name__ == "__main__":
    asyncio.run(main())
Use bar_format to customize the content of the progress bar.
Use bar_format to customize the content of the progress bar. Image by Author

Handling of exceptions

As you can see from the source code, tqdm implements the gather method through the tqdm_asyncio.as_completed method. Therefore, we can’t skip exception-catching by using the return_exceptions parameter.

Which is a pity. But we can still handle exceptions within tqdm_asyncio.as_completed via try…exception in tqdm_asyncio.as_completed:

async def main():
    tasks, errs = [], 0
    for _ in range(2000):
        tasks.append(some_coro(simu_exception=True))

    for done in tqdm_asyncio.as_completed(tasks):
        try:
            _ = await done
        except AsyncException:
            errs += 1

    print(f"All tasks done. {errs} task(s) failed")


if __name__ == "__main__":
    asyncio.run(main())
Handling of exceptions.
Handling of exceptions. Image by Author

Real-World Use Cases

Many code examples for asyncio are used asyncio.sleep to simulate IO-bound cases, which unfortunately oversimplifies the real-world matter. We should use a real-world case to explain using tqdm in asyncio.

However, we cannot use a real case in this chapter for space reasons. In the next chapter, we will demonstrate how the tqdm progress bar works in the real world in an example of using asyncio to implement a map-reduce program to handle large files.

Combining Multiprocessing and Asyncio in Python for Performance Boosts
Combining Multiprocessing and asyncio via run_in_executor unifies the API for concurrent and parallel programming, simplifies our programming process, and allows us to obtain execution results in order of completion. This article will use a Real-world Example to Explain the Code Implementation

Conclusion

Using tqdm to indicate progress in asyncio code has many benefits:

  • We can show progress in the caller’s progress bar without intruding into the business code.
  • All work can be done in the main process without worrying about thread safety and performance issues.
  • The graphical presentation is always much more vivid than boring text descriptions.
  • And all this with just one line of code.

I’ve also tried other libraries for progress bars, such as alive-progress, which is much cooler in presentation, but alive-progress doesn’t support asyncio.

tqdm can also produce some cool effects if set up correctly, but I haven’t delved into it due to time, so feel free to discuss further and leave comments. You may help more interested readers.


🎉
Enjoyed this read? Subscribe now to get more cutting-edge data science tips straight to your inbox! Your feedback and questions are welcome—let's discuss in the comments below!