Building a mental model for async programs

Nov 16, 2024

In my experience writing async programs in Dart and Rust, I have built up a model in my head to avoid bugs caused by async. These bugs are usually hard to find and reproduce, so there is a lot of value in taking preventive measures to avoid them. I hope to improve this model by writing this post and to share it with anyone who might find it helpful.

We will be building a mental model that applies to any async language. This post will not focus on the exact implementation details of the async runtime of a specific language. Instead, it concentrates on thinking about async programs in a way that helps uncover bugs.

Call graph

A call graph is a visual representation of a program that shows the trace of function calls during the execution flow of a program.

Above is a simple program that calculates either the maximum or the average of a given array of numbers depending on whether its length is even or odd. The program flow can take two possible paths, either (A) or (B) depending on the input argument to calculate.
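A program of the shape described could look like the sketch below. Only the name calculate comes from the text. The helper names maximum and average, and the choice of which branch handles even lengths, are assumptions made for illustration.

```typescript
// Hypothetical helper: largest value in the array
function maximum(numbers: number[]): number {
    let result = -Infinity
    for (const n of numbers) {
        if (n > result) result = n
    }
    return result
}

// Hypothetical helper: arithmetic mean of the array
function average(numbers: number[]): number {
    let total = 0
    for (const n of numbers) {
        total += n
    }
    return total / numbers.length
}

// Two possible paths through the call graph, chosen by the input length.
// Assumption: even length takes the maximum path, odd length the average path.
function calculate(numbers: number[]): number {
    if (numbers.length % 2 === 0) {
        return maximum(numbers)
    }
    return average(numbers)
}

console.log(calculate([1, 5, 3, 2])) // even length, prints 5
console.log(calculate([1, 2, 3])) // odd length, prints 2
```

Each call to calculate traces exactly one of the two paths, which is what a call graph for a single run records.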

In path (A) the following happens. You would observe this if you put a breakpoint at the beginning of calculate and stepped through the program line by line using the debugger.

Usually call graphs are represented as trees, but I have chosen this block representation for this post. Call graphs usually come up when you profile your program. The above example is tiny and very contrived. Call graphs for real programs are much larger and have higher levels of nesting.

Tasks

A task is an instance of a call graph with resumable yield points. This statement will become clearer in the next section. All languages with an async runtime have their own way of creating or spawning a task.

Co-operative scheduling

In this section we will find out what exactly happens during the execution of an async program and what “resumable yield points” means. Let’s inspect a very simple async program in Javascript (Deno runtime):

async function sleep(ms: number) {
    await new Promise((resolve, _) => setTimeout(resolve, ms))
}

async function bar() {
    console.log('bar start')
    await sleep(2)
    console.log('bar end')
}

async function foo1() {
    console.log('foo1 start')
    await bar()
    console.log('foo1 between')
    await sleep(3)
    console.log('foo1 end')
}

async function foo2() {
    console.log('foo2 start')
    await sleep(3)
    console.log('foo2 between')
    await sleep(3)
    console.log('foo2 end')
}

function main() {
    foo1()
    foo2()
}

main()

No matter how many times you run the above program, it will always produce the following output:

foo1 start
bar start
foo2 start
bar end
foo1 between
foo2 between
foo1 end
foo2 end

Let’s figure out why by constructing the call graph.

First we have the initial call graph (“start”) at the beginning of the program, where the function main is being called at line 32. main is a synchronous function, so that is just a normal call.

Inside main, two more functions are called, foo1 and foo2. Both are async functions being called without the await keyword, so the call graphs of foo1 and foo2 are spawned as tasks by main’s call graph. You can see that foo1’s and foo2’s call graphs are not part of the initial call graph they are called from. Instead, each one is spawned off as its own call graph, as a task.

Within foo1 the function bar is being called with the await keyword, so no task spawning happens and bar is part of foo1’s call graph. The same applies to the call to sleep within bar.

If we keep going through the program and uncovering function calls or task spawns, we can build a call graph like this. After constructing the call graph, we will observe that the await at the innermost nesting level is only ever on OS calls such as time, file IO or network IO [1][2]. In our example here, it was await on a Promise wrapping setTimeout. It could also be something like await fetch (network IO) or await writeFile (file IO).

Our next step is to split these tasks into “atomic executable units”. To do that, we first mark these innermost time/IO await points.

And then we split the tasks into executable units along the lines that mark the innermost time/IO await points.

The job of the language’s async runtime is to run these executable units in some order on a given set of CPU core(s). Most languages only support using a single core for async (Python, Javascript, Dart), so we will look at that first.

One unit has to completely finish before executing another unit. The runtime maintains a list of units it has to execute. This list will get bigger as the runtime discovers more units to execute and the list gets smaller as the runtime finishes executing a unit.

Below is a generic illustration of what happens in the runtime for our example program. It will be helpful to keep it open in another tab for reference.

Let’s break down the illustration.

This example is very simple. Real programs may have hundreds or even millions of such units. Real programs may also have more spawning. In this example all of the task spawning happened at the beginning, in the start unit, but it could also happen in any of the following units. Conveniently, only one unit from the list became runnable after each wait. It is possible for multiple units to become runnable at once, and then the runtime has to do some sort of prioritization. We also always picked the runnable unit from the top of the list (for example, picking foo1 (part A) over foo2 (part A) at T2), but some other order is also possible.

Let’s contrast the async execution we saw here with a synchronous one. If this program were synchronous, a call to sleep(ms: number) would actually make the program wait and do nothing, forcing the runtime to be idle for ms milliseconds. In an async program, a call to sleep(ms: number) marks a point where a task is broken into executable units, and the runtime attempts to execute some other runnable unit instead of doing nothing (like you saw at T3).

Actual implementations of async runtimes don’t keep track of units, they keep track of tasks. When the runtime runs the task, the task “pauses” itself or yields when it reaches the end of a unit. It then asks the runtime to resume it from the next unit when the operation between the two units that broke up the task is complete (remember that tasks are broken up into units along the innermost nested time/IO await points).

Tasks willingly give up execution so that some other task can run, hence the name “co-operative scheduling”. The runtime runs the task until the task pauses itself or “yields” and resumes it when it becomes runnable. Hence “resumable yield points”.
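To make the idea of units and yield points concrete, here is a toy single-core runtime. It is a sketch under stated assumptions, not how Deno or any real runtime is implemented: tasks are written as generators, each yield stands for an innermost sleep, and time is a virtual clock so nothing actually waits. The names Task, runAll and output are made up for this sketch.

```typescript
// A task is a generator. Each `yield ms` is a resumable yield point ("sleep for ms").
type Task = Generator<number, void, void>

const output: string[] = []

function* bar(): Task {
    output.push('bar start')
    yield 2
    output.push('bar end')
}

function* foo1(): Task {
    output.push('foo1 start')
    yield* bar() // like `await bar()`: bar stays part of foo1's call graph
    output.push('foo1 between')
    yield 3
    output.push('foo1 end')
}

function* foo2(): Task {
    output.push('foo2 start')
    yield 3
    output.push('foo2 between')
    yield 3
    output.push('foo2 end')
}

// Toy runtime: runs one unit at a time, always to completion
function runAll(tasks: Task[]): void {
    // Every spawned task is runnable at virtual time 0
    const pendingTasks = tasks.map((task) => ({ task, wakeAt: 0 }))
    while (pendingTasks.length > 0) {
        // Pick the task that becomes runnable first; ties go to the top of the list
        let next = 0
        for (let i = 1; i < pendingTasks.length; i += 1) {
            if (pendingTasks[i].wakeAt < pendingTasks[next].wakeAt) next = i
        }
        const entry = pendingTasks[next]
        // Resume the task: it runs exactly one unit, up to its next yield or its end
        const step = entry.task.next()
        if (step.done) {
            pendingTasks.splice(next, 1)
        } else {
            entry.wakeAt += step.value
        }
    }
}

runAll([foo1(), foo2()])
console.log(output.join('\n'))
```

Running it prints the same eight lines, in the same order, as the async program at the start of this section. The runtime never interrupts a unit. Control only changes hands when a task reaches a yield.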

IO Operations

As you can see from the illustration, the two tasks were co-operatively sharing execution. The key that made this possible was the ability to keep track of time while simultaneously some unit was running and to get notified when the time has reached a certain value. In this example at T3, someone (the operating system) has to keep track of time while foo2 (part A) is running and notify the runtime at T3 + 2 so foo1 (part B) can be run.

In this example we only saw calls to sleep and time as being the condition or constraints for a unit to become runnable. It can also be file IO or network IO (something like fetch or writeFile). For example a unit could only become runnable if a write operation to some file completes, or if we receive some packets on the network.

Just like how time can be tracked while some unit is running, we can also have a unit running while an IO operation on a network or a file is ongoing. The language’s async runtime uses an API to start IO operations and to get notified when they are complete. This API [3] is provided by the operating system and is usually called the IO event notification API. So async will only help if you have lots of IO operations within your tasks. If you don’t, there is no opportunity for another task to run and take advantage of the IO event notification API.

You don’t have to use the IO event notification API to perform IO. This API is only used by async programs. A synchronous program uses a completely different set of APIs to perform IO, also provided by the operating system. I will refer to these as non-async IO APIs from here on.

Tasks vs Threads

There is another way to run the code in foo1 and foo2 at the same time on a single core instead of async. For that we make foo1 and foo2 separate synchronous programs and run them as two separate processes from our terminal [4].

foo1.ts file

function sleepSync(ms: number) {
    Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms)
}

function bar() {
    console.log('bar start')
    sleepSync(2)
    console.log('bar end')
}

function foo1() {
    console.log('foo1 start')
    bar()
    console.log('foo1 between')
    sleepSync(3)
    console.log('foo1 end')
}

foo1()

foo2.ts file

function sleepSync(ms: number) {
    Atomics.wait(new Int32Array(new SharedArrayBuffer(4)), 0, 0, ms)
}

function foo2() {
    console.log('foo2 start')
    sleepSync(3)
    console.log('foo2 between')
    sleepSync(3)
    console.log('foo2 end')
}

foo2()

Don’t worry about the implementation of sleepSync here. All it does is make the program do nothing for ms milliseconds, and it is not async (it forces the runtime to go idle).

If we run them as two separate processes from our terminal like so

deno foo1.ts & deno foo2.ts

You get an output like below (your result might be different)

foo2 start
foo1 start
bar start
bar end
foo1 between
foo2 between
foo1 end
foo2 end

Looking at the output, you can see the two programs do run at the same time. The test was done on a single core machine (you likely don’t have one), but if you can get your hands on one (for example a VM), the result will be similar.

We can also do the same experiment in Python, using threading and time.sleep to start synchronous versions of foo1 and foo2 as separate threads, instead of async def and asyncio.sleep. For our purposes, processes and threads are the same except that threads can share memory.

Here is the async version that starts foo1 and foo2 as separate async tasks:

import asyncio

async def sleep(ms: int):
    await asyncio.sleep(ms / 1000)

async def bar():
    print("bar start")
    await sleep(2)
    print("bar end")

async def foo1():
    print("foo1 start")
    await bar()
    print("foo1 between")
    await sleep(3)
    print("foo1 end")

async def foo2():
    print("foo2 start")
    await sleep(3)
    print("foo2 between")
    await sleep(3)
    print("foo2 end")

async def main():
    await asyncio.gather(foo1(), foo2())

asyncio.run(main())

Here is the sync version that uses threads instead of async tasks. Here synchronous versions of foo1 and foo2 are started as two separate threads

import threading
import time

def sleep(ms: int):
    time.sleep(ms / 1000)

def bar():
    print("bar start")
    sleep(2)
    print("bar end")

def foo1():
    print("foo1 start")
    bar()
    print("foo1 between")
    sleep(3)
    print("foo1 end")

def foo2():
    print("foo2 start")
    sleep(3)
    print("foo2 between")
    sleep(3)
    print("foo2 end")

def main():
    thread1 = threading.Thread(target=foo1)
    thread2 = threading.Thread(target=foo2)

    thread1.start()
    thread2.start()

    thread1.join()
    thread2.join()


main()

Your operating system is scheduling these two processes or threads to run at the same time, very similar to how our language’s async runtime was scheduling async tasks to run at the same time. In fact, an async language runtime and its tasks are kind of like the OS’s scheduler and its processes (at least from the programmer’s perspective).

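So why async tasks when we can use multiple processes or threads? Tasks are cheaper than processes and threads: creating a task and switching between tasks happens inside the language runtime, without asking the operating system to create or schedule anything.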

Typical HTTP backend

In a typical HTTP backend, your framework will create one task per request. This task creation is hidden from you. Usually there is an “app” object which has methods to add or register request handlers (closures), and “app.run” would start an infinite loop which listens for requests and creates a task every time it gets a request.

Here is how it would look in an imaginary language. This is what you would see as the user of a web framework.

import { App, Request } from 'web_framework'

let app = App()

app.get('/hello', (request: Request) => {
    return 'hello'
})

app.run()

Here app.get would register the closure for the given endpoint in something like a map. Inside app.run will be the task creation, which when oversimplified would look like below. This is hidden from you within the run method inside the framework.

fn run(this: App) {
    while true {
        let request = listen("0.0.0.0")

        // Spawns a task, no await keyword
        this.parseAndHandleRequest(request)
    }
}

The parseAndHandleRequest method parses the HTTP request, request body, query parameters etc. It then finds the corresponding registered handler (closure or function) for the URL in the parsed request and calls that handler with the parsed request.
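The registration map and the dispatch step can be sketched in TypeScript without any real networking. This is a minimal illustration, not a real framework: the HttpRequest and Handler types, the "METHOD path" map key and the 404 string are all assumptions, and the raw request is reduced to its first line.

```typescript
// Hypothetical request shape, reduced to what the sketch needs
type HttpRequest = { method: string; path: string }
type Handler = (request: HttpRequest) => string | Promise<string>

class App {
    // Registered handlers, keyed by "METHOD path"
    private handlers = new Map<string, Handler>()

    get(path: string, handler: Handler): void {
        this.handlers.set(`GET ${path}`, handler)
    }

    // Parses a request line such as "GET /hello HTTP/1.1" and calls the matching handler
    async parseAndHandleRequest(raw: string): Promise<string> {
        const [method, path] = raw.split(' ')
        const handler = this.handlers.get(`${method} ${path}`)
        if (handler === undefined) return '404 Not Found'
        return await handler({ method, path })
    }
}

const app = new App()
app.get('/hello', () => 'hello')

// Called without `await`, so every request becomes its own task
const responses = Promise.all([
    app.parseAndHandleRequest('GET /hello HTTP/1.1'),
    app.parseAndHandleRequest('GET /missing HTTP/1.1'),
])

responses.then((bodies) => console.log(bodies))
```

In a real framework the two calls would come from the listening loop inside run, one per incoming request, rather than from a hard-coded list.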

Uncovering Bugs

Before looking at an example that has a bug, let’s take a look at an example that does not have a bug

let counter = 0

async function increment() {
    counter += 1
}

async function main() {
    let promises: Promise<void>[] = []

    for (let i = 0; i < 1000; i += 1) {
        promises.push(increment())
    }

    await Promise.all(promises)

    console.log(counter)
}

main()

We start the increment function (line 3) as a task 1000 times at line 11. Each one of these tasks increments the counter by one, and line 16 will print 1000 as expected. Line 8 and line 14 are needed so that we print the value of the counter after all tasks have finished executing.

If we look at how these tasks are scheduled, we can see each task executes one by one and increments the counter. Remember that we are still looking at a single core, and as you can see each task is executed serially. This serial behavior is because there are no yield points in the task (the await keyword).

Now let’s make the counter a file,

import { writeFile } from 'node:fs/promises'
import { readFile } from 'node:fs/promises'

async function increment() {
    const contents = await readFile('counter.txt', { encoding: 'utf-8' })
    const value = parseInt(contents) || 0
    const valueUpdated = value + 1
    await writeFile('counter.txt', valueUpdated.toString())
}

async function main() {
    for (let i = 0; i < 1000; i += 1) {
        increment()
    }
}

main()

If you make a file called counter.txt and run the program, you will see that the file only contains 1 (or some other very low number) despite starting 1000 tasks at line 13.

If you look at how these tasks are scheduled, all 1000 tasks read the 0 and all of them write back a 1.
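The same lost update can be reproduced without touching the file system, which makes the scheduling easier to see. In this sketch the variable stored stands in for the contents of counter.txt, and readValue and writeValue are hypothetical stand-ins for readFile and writeFile. Each of them contains one yield point.

```typescript
let stored = 0 // in-memory stand-in for the contents of counter.txt

async function readValue(): Promise<number> {
    await Promise.resolve() // yield point, like `await readFile(...)`
    return stored
}

async function writeValue(value: number): Promise<void> {
    await Promise.resolve() // yield point, like `await writeFile(...)`
    stored = value
}

async function increment(): Promise<void> {
    const value = await readValue()
    // Every other task gets to run its read before this write happens
    await writeValue(value + 1)
}

async function runBuggy(): Promise<number> {
    const tasks: Promise<void>[] = []
    for (let i = 0; i < 1000; i += 1) {
        tasks.push(increment())
    }
    await Promise.all(tasks)
    return stored
}

const buggyResult = runBuggy()
buggyResult.then((value) => console.log(value)) // prints 1, not 1000
```

All 1000 reads are queued before the first write gets a chance to run, so every task sees 0 and writes 1.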

As you can see, bugs in async can happen when multiple tasks attempt to modify a common resource across yield points (the await keyword, line 5 and line 8 in this case). Let’s look at some mitigation strategies to prevent this bug.

Using a lock

A lock forces serial execution on tasks.

let lock = new AsyncLock()

async function increment() {
    await lock.acquire()

    const contents = await readFile('counter.txt', { encoding: 'utf-8' })
    const value = parseInt(contents) || 0
    const valueUpdated = value + 1
    await writeFile('counter.txt', valueUpdated.toString())

    lock.release()
}

async function main() {
    for (let i = 0; i < 1000; i += 1) {
        increment()
    }
}

main()

Here, for the first increment task at i = 0, lock.acquire() at line 4 resolves immediately. Every other task has to wait until the task currently holding the lock calls lock.release() at line 11. Only one task at a time can be between acquire and release, which forces serial execution.

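There are several ways to implement AsyncLock. Usually a lock is part of the language’s standard library, but we have to implement it ourselves in Javascript. Here is one way to implement it. Note that this particular implementation hands the lock to the most recent waiter first (last in, first out), so the waiting tasks do not run in the order they were started.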

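class AsyncLock {
    // true while some task holds the lock
    locked: boolean
    // called by the task holding the lock, hands the lock to a waiter if there is one
    release: () => void

    constructor() {
        this.locked = false
        // with no waiters, releasing simply unlocks
        this.release = () => {
            this.locked = false
        }
    }

    acquire() {
        // lock is free: take it and continue immediately
        if (!this.locked) {
            this.locked = true
            return Promise.resolve()
        }

        // lock is taken: wait until a release() call resolves this promise
        return new Promise<void>((resolve) => {
            const prevRelease = this.release
            // the next release() wakes this waiter and restores the previous
            // release function, so `locked` stays true while waiters remain
            this.release = () => {
                this.release = prevRelease
                resolve()
            }
        })
    }
}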
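A different way to get the same serial behavior is to chain each critical section onto the previous one with Promise.then. The sketch below is not the AsyncLock above but an alternative technique. ChainLock, run and the in-memory stored counter are hypothetical names used only for illustration.

```typescript
// Alternative sketch: queue critical sections on a promise chain
class ChainLock {
    private tail: Promise<void> = Promise.resolve()

    // Runs `fn` only after every previously queued `fn` has settled (first in, first out)
    run<T>(fn: () => Promise<T>): Promise<T> {
        const result = this.tail.then(fn)
        // Ignore the outcome here so one failing section does not block later ones
        this.tail = result.then(() => undefined, () => undefined)
        return result
    }
}

let stored = 0 // in-memory stand-in for counter.txt

async function increment(): Promise<void> {
    const value = stored
    await Promise.resolve() // yield point between the read and the write
    stored = value + 1
}

const chainLock = new ChainLock()

async function runLocked(): Promise<number> {
    const tasks: Promise<void>[] = []
    for (let i = 0; i < 1000; i += 1) {
        tasks.push(chainLock.run(increment))
    }
    await Promise.all(tasks)
    return stored
}

const lockedResult = runLocked()
lockedResult.then((value) => console.log(value)) // prints 1000
```

Because the critical section is passed in as a function, there is no separate release call to forget, and a section that throws still lets the next one run.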

WAL files

Another way to fix this bug would be to make counter.txt a log file instead. Here each task appends a new line to the file, and we can then count the number of lines in the file to get the count.

import { appendFile } from 'node:fs/promises'

async function increment() {
    await appendFile('counter.txt', '+1\n')
}

async function main() {
    for (let i = 0; i < 1000; i += 1) {
        increment()
    }
}

main()

In this example, after running the program, you will see 1000 lines of "+1" in the file. This solution assumes each appendFile call is guaranteed to be atomic by the operating system.
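Reading the counter back then means counting log lines. A minimal sketch, assuming the file contents have already been read into a string (for example with readFile); countIncrements is a hypothetical helper name.

```typescript
// Counts the "+1" entries in the log contents, ignoring blank or unknown lines
function countIncrements(contents: string): number {
    return contents.split('\n').filter((line) => line === '+1').length
}

console.log(countIncrements('+1\n+1\n+1\n')) // prints 3
console.log(countIncrements('')) // prints 0
```

There is no read-modify-write across a yield point here: each task only appends, and the total is derived from the log afterwards.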

Using synchronous code

We can also fall back to using synchronous code within an async function. This strategy should be used very sparingly: sync code forces the runtime to go idle until the operation completes, even if other runnable units are present. This strategy can be used if you have an application with very low IO load, e.g. mobile apps. If you have a backend application handling lots of requests, you probably should not do this.

import { readFileSync, writeFileSync } from 'node:fs'

async function increment() {
    const contents = readFileSync('counter.txt', { encoding: 'utf-8' })
    const value = parseInt(contents) || 0
    const valueUpdated = value + 1
    writeFileSync('counter.txt', valueUpdated.toString())
}

async function main() {
    for (let i = 0; i < 1000; i += 1) {
        increment()
    }
}

main()

After running the sync program, you will see that the value stored in counter.txt always increases by exactly 1000.

Multi-core and work stealing

Now let’s talk about how we can take advantage of multi-core CPUs. For languages that support only a single core, multi-core is achieved by running multiple instances (workers) of the program and having a master process that manages and distributes load across the instances. Let’s take the example of a Javascript backend.

Here the master process is Nginx that takes care of distributing the load. If you take the example of FastAPI and Python, the master process becomes uvicorn and the distribution of load is done by the operating system by having all of the workers share the same network socket for incoming requests.

The biggest disadvantage of this type of multi-core setup is that one worker can be unlucky enough to get hit with all the heavy requests. In that case that one worker will have lots of tasks in its queue that are waiting to complete. Other workers cannot help the unlucky worker, because in this setup all workers are independent, each with its own task queue, and they don’t share memory.

This is where work stealing comes in. In this model the language runtime supports running multiple workers that share [5] the same task queue, so workers can steal tasks and help out other workers. Here there is no master process distributing the load. A task listens for requests and creates new tasks, and the new tasks can then be picked up by any worker. The task that is listening for requests, or any other ongoing task, can also move to another worker when it yields and resumes. This model also goes by the name M workers N tasks scheduling in the Golang world.
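The difference between the two setups can be illustrated with a small simulation. This is a deliberately oversimplified sketch: tasks are reduced to a single cost number, independent workers receive requests round robin, and the shared queue hands the next task to whichever worker is least busy. Real work stealing schedulers are more involved, and the function names are made up for this sketch.

```typescript
// Independent queues: each request is assigned to a worker up front, round robin
function finishTimeIndependent(costs: number[], workers: number): number {
    const busy = new Array<number>(workers).fill(0)
    costs.forEach((cost, i) => {
        busy[i % workers] += cost
    })
    return Math.max(...busy)
}

// Shared queue: the worker that is free first takes the next task
function finishTimeShared(costs: number[], workers: number): number {
    const busy = new Array<number>(workers).fill(0)
    for (const cost of costs) {
        const freeFirst = busy.indexOf(Math.min(...busy))
        busy[freeFirst] += cost
    }
    return Math.max(...busy)
}

// Heavy and light requests alternate, so worker 0 is the unlucky one
const costs = [10, 1, 10, 1, 10, 1, 10, 1]

console.log(finishTimeIndependent(costs, 2)) // prints 40
console.log(finishTimeShared(costs, 2)) // prints 22
```

With independent queues the unlucky worker receives every heavy request while the other sits mostly idle. With a shared queue the load evens out.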

There is a catch: if you have a work stealing executor, you have to make sure all of your code can run concurrently across multiple threads. Take the first example in the Uncovering Bugs section, where we incremented a global variable, and port it to Golang.

package main

import (
    "fmt"
    "sync"
)

var counter int = 0

func increment(wg *sync.WaitGroup) {
    counter += 1
    wg.Done()
}

func main() {
    var wg sync.WaitGroup

    wg.Add(1000)
    for range 1000 {
        go increment(&wg)
    }
    wg.Wait()

    fmt.Println(counter)
}

Notice we are using a WaitGroup, which plays the role of Promise.all. This was not a bug in Javascript because Javascript does not have a multi-core work stealing executor. In Golang this is a data race, and you can get a value lower than 1000, because multiple threads are now trying to modify the global variable at the same time.

You will have to use a Mutex (or atomics) now to prevent the bug.

package main

import (
    "fmt"
    "sync"
)

var counter int = 0
var mu sync.Mutex

func increment(wg *sync.WaitGroup) {
    mu.Lock()
    counter += 1
    mu.Unlock()
    wg.Done()
}

func main() {
    var wg sync.WaitGroup

    wg.Add(1000)
    for range 1000 {
        go increment(&wg)
    }
    wg.Wait()

    fmt.Println(counter)
}

Explicit vs Automatic Future types

In some languages, a promise immediately starts running the moment you instantiate it, while in other languages, an explicit await is required for the promise to run. In other words, the promise is not “discovered” by the runtime without an explicit await.

Let’s see this behavior in action. Javascript is an example of having automatic promises (futures), so in the below example the promise assigned to p runs and prints “Hello” even without an explicit await on it.

async function sleep(ms: number) {
    await new Promise((resolve, _) => setTimeout(resolve, ms))
}

async function sayHello() {
    await sleep(100)
    console.log('Hello')
}

async function main() {
    // instantiate a promise and assign it to `p`
    const p = sayHello()

    await sleep(200)
}

main()

If you port this example to Rust, you can see the promise will NOT print “Hello” without an explicit await.

use std::time::Duration;

async fn say_hello() {
    tokio::time::sleep(Duration::from_millis(100)).await;
    println!("Hello");
}

#[tokio::main]
async fn main() {
    // instantiate a promise and assign it to `p`
    let p = say_hello();

    // `p` will NOT run, uncomment for `p` to run
    // p.await;

    tokio::time::sleep(Duration::from_millis(200)).await;
}

Python is similar to Rust here. The interpreter even warns you if the coroutine is never awaited.

import asyncio


async def say_hello():
    await asyncio.sleep(0.01)
    print("Hello")


async def main():
    # instantiate a promise and assign it to `p`
    p = say_hello()

    # `p` will NOT run, uncomment for `p` to run
    # await p

    await asyncio.sleep(0.02)


asyncio.run(main())
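In a language with automatic promises, the explicit behavior can be approximated by wrapping the call in a function, so that nothing starts until the function is invoked. The sketch below shows both side by side. The defer helper and the printed array are hypothetical names for illustration, and this is only an approximation of what Rust and Python do.

```typescript
const printed: string[] = []

async function sayHello(): Promise<void> {
    printed.push('Hello')
}

// A "lazy future": holds the function and runs nothing until start() is called
function defer<T>(fn: () => Promise<T>): { start: () => Promise<T> } {
    return { start: fn }
}

// Automatic: calling the async function starts it right away, no await needed
sayHello()
const afterEager = printed.length

// Explicit (emulated): creating the lazy future does not run sayHello
const lazy = defer(sayHello)
const afterDefer = printed.length

// Only an explicit start runs it
lazy.start()
const afterStart = printed.length

console.log(afterEager, afterDefer, afterStart) // prints 1 1 2
```

The count goes up as soon as sayHello is called directly, stays the same when the lazy future is created, and only goes up again once start is called.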

Comparing async implementations

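All (major) async languages so far differ by the factors below: syntax type, support for work stealing, automatic or explicit futures, and where local variables are stored.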

Let’s compare some languages based on these factors

Language   | Syntax type         | Work stealing                  | Automatic or Explicit | Storage for local variables
Javascript | async and await     | No                             | Automatic             | Heap
Dart       | async and await     | No                             | Automatic             | Heap
Python     | async and await     | No                             | Explicit              | Heap
Rust       | async and await     | Yes and No (runtime dependent) | Explicit              | Mixed and runtime dependent [6]
Golang     | go or spawn keyword | Yes                            | Not applicable        | Dynamic stack [7] (heap for collection types as decided by the compiler)

Conclusion

This post was an attempt to put everything I know about async into a single post. The information in this post is what I use today to understand and debug async programs in the wild and to write bug free async programs.


  1. Assuming you have simplified all of your .then (Javascript, Dart) or add_done_callback (Python) calls to use await.

  2. Assuming you have simplified any complex nested construction of new Promise or Future into await and/or one-liner Promises on something like setTimeout (time, file IO or network IO).

  3. Also known as system calls, since they are provided and implemented by the operating system. In this blog, API will mean system calls.

  4. You can also use the worker_threads API.

  5. The sharing strategy in reality is complex and varies between languages. The blog shows an oversimplified picture.

  6. See discussion on Reddit.

  7. “Dynamic stack” is beyond the scope of this blog, but there are plenty of other resources explaining this.