This post is about async Python and gives a brief explanation and a set of tools to get from zero to one. If you are interested in this kind of content, please subscribe:
Code for the post can be found here, together with the Mocking and Faking OpenAI calls posts: https://github.com/xLaszlo/mopenai
I’ve recently started working on various GenAI projects, and a typical aspect of this kind of job is that most of the work is done by other machines. The main program just calls APIs and then waits for them to return. Since Python is effectively single-threaded due to the GIL, your program is mostly idle: it takes a long time to finish while doing almost nothing.
If you have an embarrassingly parallel problem (the same computation must be done on different data independently), you can use multiprocessing. But then, instead of waiting in one process, you are waiting in several. Faster but more expensive. Your cores are still idling, just in parallel…
It would be great if, instead of waiting, the process could immediately start another call, so it can wait for two (or more) API calls simultaneously. Whichever returns first is continued to completion, then on to the next, and so on…
This sounds great but also very hairy to implement from scratch. The good news is that someone took care of it in the form of the asyncio module.
What I described above is called “asynchronous” execution, and it is designed precisely for use cases where other machines do the majority of the work through API calls.
Asyncio
Instead of running the commands directly, in async programming we start an “event loop” (read: asyncio.run(main())). The code is defined as “coroutines” (read: async def instead of just def), and execution happens by calling a coroutine and then waiting for its result to return (read: await). While we are awaiting, execution is handed back to the event loop, which can start another coroutine; then we await that as well.
These awaited functions (sorry, “coroutines”) form a queue; when one returns, execution continues there until it runs into another await.
This has three consequences:
You need many things that can run at the same time
The more coroutines you have, the more granular your execution will be and the faster it will finish.
If you have a lot of non-coroutine code that takes a long time to run, you will have a bad time. The code still runs in a single thread, so long synchronous sections will occupy that lone thread, and all other tasks will be waiting or not even starting (thereby defeating the purpose of this whole exercise).
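To see the difference, here is a small toy example of my own (not from the repo) comparing coroutines that await asyncio.sleep() with coroutines that block the lone thread with time.sleep():

```python
import asyncio
import time

async def cooperative():
    await asyncio.sleep(0.1)  # yields control back to the event loop

async def blocking():
    time.sleep(0.1)  # blocks the lone thread: nothing else can run meanwhile

async def main():
    start = time.perf_counter()
    await asyncio.gather(*(cooperative() for _ in range(5)))
    coop = time.perf_counter() - start

    start = time.perf_counter()
    await asyncio.gather(*(blocking() for _ in range(5)))
    block = time.perf_counter() - start
    return coop, block

coop, block = asyncio.run(main())
# the five awaited sleeps overlap (roughly 0.1s in total); the five
# blocking sleeps run back to back (roughly 0.5s in total)
print(f'awaiting: {coop:.2f}s, blocking: {block:.2f}s')
```

The five awaiting coroutines finish in about the time of a single sleep; the five blocking ones add up, exactly as described above.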
So, to summarise: the best use case for async code is when you have many tasks, and each task is made up of multiple API calls that take a long time.
This is literally GenAI. For example, document processing for RAGs consists of:
Getting the data (waiting for the documents to download)
Chunking and embedding (waiting for the embedding model to return the vectors)
Saving the data into a database and waiting for the transaction to complete
Waiting for summarisation
Waiting for structured extraction
The core program rarely does anything other than orchestrate the above. There is little (heavy) numerical computation (of the kind you see in traditional Machine Learning; more on this later).
Writing async code
Let’s start with the basics (the … in main() indicates details that will come later):
To start the event loop, write a coroutine (put “async” in front of a function definition), import asyncio, and pass a call to the coroutine to asyncio.run(). You can pass parameters to the function just like to a normal Python function.
And that’s it.
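A minimal sketch of what this looks like (the names are my own, not from the repo):

```python
import asyncio

async def main(name: str) -> str:
    # "async def" turns this into a coroutine
    await asyncio.sleep(0.1)  # stand-in for a long-running API call
    return f'hello {name}'

# asyncio.run() starts the event loop, runs the coroutine to completion
# and returns its result; arguments are passed like to any function
print(asyncio.run(main('async')))
```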
It’s even easier if you typically wrap your code with FastAPI, as it initialises the event loop by default, and all route functions can be async. Calling multiple endpoints in quick succession utilises the event loop to execute them quasi-parallel.
AsyncWorker will be a processing service that can schedule the execution of tasks and check if they are finished: a basic exercise that we regularly run into.
To implement it, we first need to have some tasks. We will define them as pydantic classes. As you can see, there is nothing async-specific in this.
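Since the repo’s code isn’t reproduced here, a sketch of what such a task class might look like (the class name Task is from the post; TaskStatus and the field names are my assumptions):

```python
from enum import Enum
from pydantic import BaseModel

class TaskStatus(str, Enum):
    # hypothetical status values for demonstration
    PENDING = 'PENDING'
    FINISHED = 'FINISHED'

class Task(BaseModel):
    id: int
    message: str                             # what to log when the task runs
    seconds: float                           # how long the "work" takes
    status: TaskStatus = TaskStatus.PENDING

# five plain-data tasks; nothing async-specific in any of this
tasks = [Task(id=idx, message=f'task-{idx}', seconds=0.1) for idx in range(5)]
print(len(tasks), tasks[0].status.value)
```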
The tasks are handled by the AsyncWorker, which provides three services:
It holds the tasks to be processed
A task can be scheduled and run
We can check if all tasks are finished
Here, we can see three async elements:
Loop: We get a reference to the event loop, which we can use to create tasks.
Lock: This is used in context managers as “async with”. It ensures that only one coroutine accesses a variable at a time. If a different coroutine runs into another “async with” on the same lock, its execution will block until the first context manager finishes.
Semaphore: This is like many locks. The number indicates that at most N coroutines can be inside the body of the context manager at the same time. Very useful if you have a resource that is limited in the number of requests it can process in unit time. For example, OpenAI’s tokens/minute limits call for semaphores. If you start too many coroutines simultaneously, all of them will call OpenAI at the same time, and you will get HTTP 429 “Too Many Requests” errors.
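A small standalone demonstration of the semaphore (my own toy example, not from the repo): five coroutines contend for a semaphore of size 3, and we record how many are inside the context manager at any moment:

```python
import asyncio

async def limited(semaphore, active, peaks):
    async with semaphore:         # at most 3 coroutines inside this block at once
        active[0] += 1
        peaks.append(active[0])   # record current occupancy
        await asyncio.sleep(0.05) # stand-in for the rate-limited API call
        active[0] -= 1

async def main():
    semaphore = asyncio.Semaphore(3)
    active, peaks = [0], []
    await asyncio.gather(*(limited(semaphore, active, peaks) for _ in range(5)))
    return max(peaks)

# the peak occupancy never exceeds the semaphore's limit
print(asyncio.run(main()))
```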
Let’s look at the implementation of the functions of AsyncWorker:
async def schedule():
As we can see from the “async def” syntax, this is a coroutine. That’s because we will use the “async with” context manager.
Typically, you can only use async syntax (async with or await) inside async functions. So chains of coroutines lead all the way back to where you call asyncio.run(main()).
Here we access the tasks through a lock, as other coroutines can write the same resource, and you don’t want to interleave with them. Whenever you mutate this variable, always use the lock.
The last line demonstrates how to create a new task. Just like in a sync program, the above means self.run(task.id) will be run. The question is just when.
There is no return value, but anything can be returned to the caller, just like in regular Python.
async def is_finished()
As we can see, the is_finished() function is also a coroutine, so we will use the lock again. We check which tasks are finished and return True if all of them are. We don’t want an inconsistent result if something was changed while we were running this function, hence the lock. (Maybe this is excessive here, and in reality, you can’t get an inconsistent result, but it will do for demonstration purposes)
async def run()
This is the actual process, so let’s go through it line by line. The logging will help us later understand the flow of the program.
First, we run into the semaphore. The first 3 (see the construction) of the 5 tasks (see the Task class) will start executing, while the last two will wait until at least one of them exits the context manager.
The lock helps acquire the task; this will wait if the is_finished() coroutine is running. Then, based on the task’s parameters, we display a message and wait a certain number of seconds.
This is what the “await” syntax does. It starts a coroutine and immediately hands execution back to the event loop so something else can be done, and this one goes to the end of the queue. The event loop picks up the next task and executes it until that coroutine runs into an await as well. When the asyncio.sleep surfaces again, the event loop checks if the timer has expired, and if it has, execution continues on the following line (logging the task message as seen above).
Before the routine finishes, we acquire the lock (wait for it to be free) and update the task status.
async def main()
Let’s tie all this together:
First, we create the service.
Then we go through each of the tasks and schedule it. Notice the await: while we are calling the schedule() functions, the event loop is already creating the tasks and potentially running them (see self.loop.create_task(self.run(task.id)) in schedule()). This is why we use the locks: we don’t know what state all the other coroutines will be in.
Once we have scheduled them all, we start checking whether they are finished: this is the “while not is_finished():”. But is_finished() is a coroutine! It needs to be awaited!
So, correctly, we wait in the while loop with “while not await is_finished()”; when the coroutine finally finishes (gets its turn in the event loop, acquires the lock, does the calculation), it returns a plain boolean value like a normal Python function.
One last time, we acquire the lock and display the task statuses.
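Tying the walkthrough together, here is a sketch of what the whole AsyncWorker might look like. The names TaskStatus, seconds and max_concurrency are my reconstructions from the description above, not the repo’s actual code:

```python
import asyncio
from enum import Enum
from pydantic import BaseModel

class TaskStatus(str, Enum):
    PENDING = 'PENDING'
    FINISHED = 'FINISHED'

class Task(BaseModel):
    id: int
    message: str
    seconds: float
    status: TaskStatus = TaskStatus.PENDING

class AsyncWorker:
    def __init__(self, max_concurrency: int = 3):
        self.tasks: dict[int, Task] = {}
        self.loop = asyncio.get_running_loop()  # must be created inside a coroutine
        self.lock = asyncio.Lock()              # guards access to self.tasks
        self.semaphore = asyncio.Semaphore(max_concurrency)

    async def schedule(self, task: Task) -> None:
        async with self.lock:                   # other coroutines mutate tasks too
            self.tasks[task.id] = task
        # queue run(task.id) on the event loop; it starts whenever the loop is free
        self.loop.create_task(self.run(task.id))

    async def is_finished(self) -> bool:
        async with self.lock:                   # consistent snapshot of the statuses
            return all(t.status == TaskStatus.FINISHED for t in self.tasks.values())

    async def run(self, task_id: int) -> None:
        async with self.semaphore:              # at most max_concurrency at once
            async with self.lock:
                task = self.tasks[task_id]
            print(f'Task {task.id}: {task.message}')
            await asyncio.sleep(task.seconds)   # stand-in for the API call
            async with self.lock:
                self.tasks[task_id].status = TaskStatus.FINISHED

async def main():
    worker = AsyncWorker()
    for idx in range(5):
        await worker.schedule(Task(id=idx, message=f'task-{idx}', seconds=0.1))
    while not await worker.is_finished():       # is_finished() must be awaited!
        await asyncio.sleep(0.05)
    async with worker.lock:
        for task in worker.tasks.values():
            print(task.id, task.status.value)

asyncio.run(main())
```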
Logs
Let’s take a look at the logs (comments inline):
If you download the code from https://github.com/xLaszlo/mopenai, you can run it with “uv run python mopenai/main_async.py” to recreate the above. (If you don’t have uv, check it out here, I can assure you it’s worth it. Or just “pip install uv”.)
Simplified execution:
Sometimes, you just want to run a bunch of coroutines at the same time. This is what “gather()” is for. Replace the main() body with:
Of course, there is no monitoring or anything else, but it does the same job.
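A sketch of that simplified main() (again with assumed names): gather() schedules all the coroutines at once and returns their results in the order they were passed in.

```python
import asyncio

async def run_task(idx: int) -> int:
    await asyncio.sleep(0.05)  # stand-in for the long-running API call
    return idx

async def main():
    # run all five coroutines concurrently and collect their results in order
    return await asyncio.gather(*(run_task(idx) for idx in range(5)))

print(asyncio.run(main()))
```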
Real use
In the above example, “asyncio.sleep()” is a stand-in for long-running API calls. If you would like to use it in an actual situation, look for the async version of clients for various APIs.
For example:
“from openai import OpenAI” → “from openai import AsyncOpenAI”
And then:
“chat_completion = client.chat.completions.create()” → “chat_completion = await client.chat.completions.create()”
That’s pretty much it. Just don’t forget that you need to await every coroutine.
As you saw in my previous Mock/Fake articles, it is good practice to wrap the clients in your own wrappers. I then typically add an async_ prefix to coroutines to make them deliberate and explicit.
Closing
I hope you find this post helpful. I wrote it as a reminder to myself of how to implement the most common patterns I’ve encountered recently. I also thought it could be useful for someone who is just getting familiar with async Python.
If you would like to read more similar content, check out my two articles on mocking and faking the OpenAI client:
And, of course: