Creating a workflow orchestrator for Kubernetes

Oct 25, 2023

I built a workflow orchestrator partly to learn Rust and partly because I was curious how existing orchestrators worked. In the context of Kubernetes, a workflow orchestrator helps you manage, deploy and chain multiple jobs. You can define a graph of jobs or tasks, where one task depends on the output of another task. This is really useful for creating pipelines that crunch and transform large amounts of data or perform intensive computations with several steps.

In this blog we will go over the inner workings of Flowmium, the workflow orchestrator I created, as well as my developer experience with Rust while working on this project. We will also take a look at a minimal Python framework built on top of Flowmium that lets us define workflows where each Kubernetes Job is a plain function and dependencies between Jobs are marked with decorators.

Flow definition model

Let’s take a look at a simple example flow to get a feel for what our orchestrator can do

name: testing
tasks:
  - name: hallo-world
    image: debian:latest
    depends: []
    cmd:
      - sh
      - -c
      - echo $MESSAGE  >> /task-output-foo.txt
    env:
      - name: MESSAGE
        value: Hallo world
    outputs:
      - name: foo-output
        path: /task-output-foo.txt
  - name: replace-letter-a
    image: debian:latest
    depends:
      - hallo-world
    cmd:
      - sh
      - -c
      - cat /task-input.txt | sed 's/a/e/g' > /task-output-replace-letter-a.txt
    env: []
    inputs:
      - from: foo-output
        path: /task-input.txt
    outputs:
      - name: replace-letter-a-output
        path: /task-output-replace-letter-a.txt

This flow defines two tasks, hallo-world and replace-letter-a, each of which will run in its own Kubernetes Job.

So we just defined two Kubernetes Jobs that have to run one after another, and chained the output of the first Job into the second. Notice how replace-letter-a declares its dependency on hallo-world under depends, and how its input (from: foo-output) refers to the output of hallo-world. You can try running this flow to see it in action. First you will have to set up Flowmium on your machine by following these very straightforward instructions.

Then you can run this flow using the helper CLI tool

flowctl --url http://localhost:8080 submit test-flow.yaml

You can download the output artefact by running

flowctl --url http://localhost:8080 download <flow-id> replace-letter-a-output .

If you open the file, you will see “Hello world”. You can also download the other output, named foo-output, and see “Hallo world”.

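If you have worked with workflow orchestrators before, this will seem very familiar; otherwise the file transfer between the two Jobs might feel like magic. The orchestrator is doing a number of things for us: working out the order in which the tasks have to run, turning each task into a Kubernetes Job, moving the declared outputs of one task into the inputs of another, and keeping track of the status of every task and flow.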

The next few sections of the blog will go over these in more detail.

Planner

The planner component of the orchestrator is responsible for validating the flow and finding out which tasks can be deployed in parallel. To do that, we first need to construct something called an adjacency list. Let’s take a more complex example flow.

The illustration shows an example flow definition stripped down to just the dependencies between its tasks, which can be visualized as a graph. The adjacency list for this graph, mapping each task to the tasks it depends on, would look like this:

{
    "task-a": ["task-b", "task-c", "task-d", "task-e"],
    "task-b": ["task-d"],
    "task-c": ["task-d"],
    "task-d": ["task-e"],
    "task-e": []
}

This is very easy to construct from the flow definition, and you can also identify circular dependencies while doing so. We can also use indices instead of task names and make it a list of sets of integers instead. In Rust, it would look like this:

vec![
    BTreeSet::from([1, 2, 3, 4]),
    BTreeSet::from([3]),
    BTreeSet::from([3]),
    BTreeSet::from([4]),
    BTreeSet::from([]),
]
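A minimal sketch of building that index-based adjacency list, assuming a hypothetical, trimmed-down `TaskDef` that keeps only `name` and `depends` (the real flow definition has more fields, and Flowmium's own code may look different). Besides mapping names to indices, it rejects duplicate names, unknown dependencies and tasks that depend on themselves; longer cycles need a graph traversal, which the planning step can take care of.

```rust
use std::collections::{BTreeSet, HashMap};

/// Hypothetical, trimmed-down task definition: only the fields the planner needs.
struct TaskDef {
    name: String,
    depends: Vec<String>,
}

/// Builds the index-based adjacency list: entry `i` holds the indices of the
/// tasks that task `i` depends on.
fn adjacency_list(tasks: &[TaskDef]) -> Result<Vec<BTreeSet<usize>>, String> {
    // Map each task name to its position in the flow definition.
    let mut index: HashMap<&str, usize> = HashMap::new();
    for (i, task) in tasks.iter().enumerate() {
        if index.insert(task.name.as_str(), i).is_some() {
            return Err(format!("duplicate task name {}", task.name));
        }
    }

    tasks
        .iter()
        .enumerate()
        .map(|(i, task)| {
            task.depends
                .iter()
                .map(|dep| match index.get(dep.as_str()) {
                    // A task depending on itself is the simplest possible cycle.
                    Some(&j) if j == i => Err(format!("{} depends on itself", task.name)),
                    Some(&j) => Ok(j),
                    None => Err(format!("{} depends on unknown task {}", task.name, dep)),
                })
                .collect::<Result<BTreeSet<usize>, String>>()
        })
        .collect()
}
```

Feeding it the five example tasks (task-a to task-e) produces exactly the `vec![...]` shown above.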

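The next step is to figure out a series of deploys (kubectl apply). For our example, we would first deploy task-e, then task-d once task-e has finished, then task-b and task-c in parallel once task-d has finished, and finally task-a.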

You can think of these deploys as a series of steps, or stages. That representation would look like this:

{
    "stage-0": ["task-e"],
    "stage-1": ["task-d"],
    "stage-2": ["task-b", "task-c"],
    "stage-3": ["task-a"]
}

Again, like before, we can use indices instead of task names and make this representation a list of sets of integers:

vec![
    BTreeSet::from([4]),
    BTreeSet::from([3]),
    BTreeSet::from([2, 1]),
    BTreeSet::from([0]),
]

This representation is what I call the “plan” or “stages”: it describes the series of deploys needed to run a workflow. So the planner first constructs the adjacency list and then transforms it into the plan. Going from the adjacency list to the plan needs careful thinking, because you also have to take care of indirect dependencies (for example, task-b is indirectly dependent on task-e via task-d).
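One way to do this transformation is a level-by-level topological sort (a variant of Kahn's algorithm), sketched below; it is not necessarily how Flowmium's planner is written. A task joins a stage only once all of its direct dependencies sit in earlier stages, and since those dependencies were placed by the same rule, indirect dependencies are respected automatically. If a round places no task at all, the remaining tasks must form a cycle (or point at an index that does not exist).

```rust
use std::collections::BTreeSet;

/// Sketch of one possible planner (not Flowmium's actual implementation).
/// Groups task indices into stages: every task in a stage depends only on
/// tasks from earlier stages, so tasks within one stage can run in parallel.
/// `deps[i]` holds the indices of the tasks that task `i` depends on.
fn create_plan(deps: &[BTreeSet<usize>]) -> Result<Vec<BTreeSet<usize>>, String> {
    let mut placed: BTreeSet<usize> = BTreeSet::new();
    let mut stages = Vec::new();

    while placed.len() < deps.len() {
        // A task is ready once every one of its dependencies sits in an earlier stage.
        let stage: BTreeSet<usize> = (0..deps.len())
            .filter(|i| !placed.contains(i))
            .filter(|&i| deps[i].iter().all(|d| placed.contains(d)))
            .collect();

        // Nothing became ready, so the remaining tasks form a cycle
        // (or depend on an index that does not exist).
        if stage.is_empty() {
            return Err("circular or missing dependency".to_string());
        }

        placed.extend(stage.iter().copied());
        stages.push(stage);
    }

    Ok(stages)
}
```

Fed the adjacency list from the example, `create_plan` returns the stages {4}, {3}, {1, 2}, {0}, which is the plan shown above.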

The source for the planner can be found here. The planner also performs other validations on the flow definition.

Task

Now that we have a “plan”, we need to start deploying and running the tasks. Each task in the flow definition gets converted to a Kubernetes Job manifest and the orchestrator uses the Kubernetes API to deploy them. We will concentrate on what happens inside the task in this section.

Our orchestrator allows us to declare file paths as inputs and outputs. The input files “magically” appear in the task container and the outputs are “auto-magically” uploaded. This is possible because each task is deployed as a Job with an init container and a shared in-memory volume. The init container is used to inject an executable into the task container via that volume.

The command specified in the flow definition is not the command that actually runs; instead, it is passed to the injected executable. The injected executable downloads all the inputs necessary for the task, runs the passed-in command as a subprocess and then uploads all the declared outputs. Let’s take a look at an example manifest deployed by the orchestrator to get a clearer understanding.

apiVersion: batch/v1
kind: Job
metadata:
  name: flow-1-task-replace-letter-a
spec:
  backoffLimit: 0
  template:
    metadata:
      name: replace-letter-a
      labels:
        flowmium.io/flow-id: '1'
        flowmium.io/task-id: '1'
    spec:
      initContainers:
        - name: init
          image: docker.io/shnoo28/flowmium:latest
          command:
            - /flowmium
            - init
            - /flowmium
            - /var/run/flowmium
          volumeMounts:
            - mountPath: /var/run
              name: executable
      containers:
        - name: replace-letter-a
          image: debian:latest
          command:
            - /var/run/flowmium
            - task
            - sh
            - -c
            - cat /task-input.txt | sed 's/a/e/g' > /task-output-replace-letter-a.txt
          env:
            - name: FLOWMIUM_INPUT_JSON
              value: '[{"from":"foo-output","path":"/task-input.txt"}]'
            - name: FLOWMIUM_OUTPUT_JSON
              value: '[{"name":"replace-letter-a-output","path":"/task-output-replace-letter-a.txt"}]'
            - name: FLOWMIUM_FLOW_ID
              value: '1'
            - name: FLOWMIUM_ACCESS_KEY
              value: minio
            - name: FLOWMIUM_SECRET_KEY
              value: password
            - name: FLOWMIUM_BUCKET_NAME
              value: flowmium-test
            - name: FLOWMIUM_TASK_STORE_URL
              value: http://flowmium-minio-service.default.svc.cluster.local:9000
          volumeMounts:
            - mountPath: /var/run
              name: executable
      volumes:
        - emptyDir:
            medium: Memory
          name: executable
      restartPolicy: Never

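Let’s inspect this manifest and note down our observations. The init container runs /flowmium init /flowmium /var/run/flowmium, which copies the executable onto the shared in-memory emptyDir volume mounted at /var/run. The task container then runs that copy, /var/run/flowmium task, with the command from the flow definition appended as its arguments. The declared inputs and outputs are passed to it as JSON in FLOWMIUM_INPUT_JSON and FLOWMIUM_OUTPUT_JSON, along with the flow id and the object store’s URL, bucket name and credentials. The pod carries the flow id and task id as labels, and backoffLimit: 0 together with restartPolicy: Never means a failed task is not retried.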
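To make the download, run, upload sequence concrete, here is a minimal sketch of what the executable's task mode could look like. The `ArtefactStore` trait stands in for the object store client (MinIO in the manifest) and is hypothetical, as is the `Artefact` struct; parsing FLOWMIUM_INPUT_JSON and FLOWMIUM_OUTPUT_JSON is left out to keep the sketch dependency-free. Outputs are uploaded only when the user's command exits successfully.

```rust
use std::io;
use std::process::Command;

/// Hypothetical: one declared input or output (`name` is the artefact name,
/// i.e. `from` for inputs; `path` is the file inside the container).
struct Artefact {
    name: String,
    path: String,
}

/// Hypothetical interface to the object store holding the artefacts.
trait ArtefactStore {
    fn download(&mut self, name: &str, path: &str) -> io::Result<()>;
    fn upload(&mut self, name: &str, path: &str) -> io::Result<()>;
}

/// Fetches every input, runs the user's command as a subprocess and uploads
/// the outputs only if it succeeded. Returns whether the command succeeded.
fn run_task(
    store: &mut impl ArtefactStore,
    inputs: &[Artefact],
    outputs: &[Artefact],
    cmd: &[String],
) -> io::Result<bool> {
    for input in inputs {
        store.download(&input.name, &input.path)?;
    }

    // The first element is the program, the rest are its arguments.
    let (program, args) = cmd
        .split_first()
        .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "empty command"))?;
    let status = Command::new(program).args(args).status()?;
    if !status.success() {
        return Ok(false);
    }

    for output in outputs {
        store.upload(&output.name, &output.path)?;
    }
    Ok(true)
}
```

Keeping the store behind a trait also makes the sequence easy to test without a cluster, by swapping in a fake store that just records calls.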

Now we know how to convert each task in the flow definition into a Kubernetes Job manifest and how it works. Notice that the user controls the image of the task container (debian:latest here), which comes from the flow definition. This is why we need the init container: we can’t ask users to always use an image with our executable already present. It also means that our executable will run in an unknown environment, so it must be able to run on any Linux distribution and cannot assume any dependencies are present.

So the executable cannot be written in a language that requires a runtime, like Python or JavaScript; we have to use a compiled language. Even with a compiled language, we can run into issues with dynamically linked libraries. What dynamically linked libraries are is beyond the scope of this blog, but in short, a compiled language lets you produce either a dynamically linked executable or a statically linked one. A dynamically linked executable expects certain shared libraries (such as the C library) to be present at runtime, so it might not run on other distributions. For example, a program compiled on Ubuntu might not run on Debian.

You can check whether your executable is statically or dynamically linked with the ldd command. If the output of ldd looks like the following, the executable is statically linked (which is what we want):

$ ldd flowmium
    not a dynamic executable

Go (as long as cgo is not involved) and Zig produce statically linked executables by default, so an executable produced by them will run on any distribution. Rust and C++ produce dynamically linked executables by default, which means we have to figure out how to make Rust produce a statically linked executable instead. I will go over that in a later section.

Because the executable is statically linked and does not require anything from the distribution, the Docker image for the init container uses FROM scratch (see here), which means the image contains nothing but the executable and relies only on the host’s kernel. So if you try to run a shell in this image, you won’t be able to, because no shell exists in it; nothing does, except the executable.

$ docker run -it docker.io/shnoo28/flowmium:latest sh
docker: Error response from daemon: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: "sh": executable file not found in $PATH: unknown.
ERRO[0000] error waiting for container: context canceled

Scheduler

We can now convert each task in the flow definition into a deployable Kubernetes Job manifest, and we also have a plan that tells us when to deploy each of those manifests. Next we need something that persists the plan for each flow (presumably in a database), tracks which stage each flow is in, tracks which tasks have completed, failed or are running, fetches the next stage once a stage is complete, and so on. That something is what I call the scheduler. The scheduler persists the status of every flow in a table, which also acts as storage for past runs.

Let’s go over the scheduler table’s schema and the operations it provides over that table to understand it in more detail

| Column           | Type  | Description                                                        |
|------------------|-------|--------------------------------------------------------------------|
| id               | int   | Unique id for the flow                                             |
| plan             | json  | Plan representation stored as JSON, e.g. "[[0],[1,2],[3]]"         |
| current_stage    | int   | Current stage the flow is in                                       |
| running_tasks    | int[] | Indices of tasks that are running                                  |
| finished_tasks   | int[] | Indices of tasks that have finished running                        |
| failed_tasks     | int[] | Indices of tasks that have failed                                  |
| flow_status      | enum  | Status of the flow: one of pending, running, success or failed     |
| task_definitions | json  | JSON dump of the contents of the tasks key in the flow definition |
| flow_name        | text  | Name of the flow                                                   |

That’s the schema; let’s take a look at an example record to understand it more clearly.

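| Column           | Type  | Value             |
|------------------|-------|-------------------|
| id               | int   | 1                 |
| plan             | json  | "[[0],[1,2],[3]]" |
| current_stage    | int   | 1                 |
| running_tasks    | int[] | [1, 2]            |
| finished_tasks   | int[] | [0]               |
| failed_tasks     | int[] | []                |
| flow_status      | enum  | running           |
| task_definitions | json  | ..                |
| flow_name        | text  | "test-flow"       |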

This record tells us that the flow has the plan "[[0],[1,2],[3]]", so it has three stages and four tasks. It is currently in stage 1, which means all tasks in stage 0 have finished successfully. Tasks 1 and 2 are running and task 0 has finished (which checks out with the current stage). The status of the flow is running.

Now let’s take a look at all the operations that the scheduler provides over the table (signatures are in pseudo-code and bodies in pseudo-query). Task means one of the elements of the tasks key in the flow definition. The Task type can be converted to a Kubernetes Job manifest and deployed as seen in the previous section. Also, task_id is just the index of a task as it appears in the plan representation.

Every task goes through mark_task_running first and ends up at either mark_task_finished or mark_task_failed. Notice that the mark_* operations also set flow_status when appropriate.

The operation schedule_tasks returns the set of tasks in the next stage if all tasks in the current stage have finished (see the third and the last conditions). In other words, schedule_tasks returns the tasks of the next stage only once mark_task_finished has been called on every task in the current stage. The second condition is for bootstrapping the first stage.

Make sure to go over these operations a few times; they will come in really handy in the next section! The scheduler is written entirely in SQL, and the source can be found here. The last operation is implemented as a single SQL statement, and it really is a head scratcher.
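The real scheduler is a set of SQL statements over the table above. As a rough mental model only, here is a hypothetical in-memory Rust version of one row and its operations; the exact conditions in Flowmium's queries may differ. `schedule_tasks` bootstraps a pending flow with the first stage and otherwise advances to the next stage only when every task of the current stage is in `finished_tasks`, while the `mark_*` operations update the task sets and `flow_status`.

```rust
use std::collections::BTreeSet;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum FlowStatus {
    Pending,
    Running,
    Success,
    Failed,
}

/// Hypothetical in-memory model of one scheduler row (task_definitions and
/// flow_name omitted). The real scheduler keeps these rows in SQL.
struct FlowRecord {
    plan: Vec<BTreeSet<usize>>,
    current_stage: usize,
    running_tasks: BTreeSet<usize>,
    finished_tasks: BTreeSet<usize>,
    failed_tasks: BTreeSet<usize>,
    flow_status: FlowStatus,
}

impl FlowRecord {
    fn new(plan: Vec<BTreeSet<usize>>) -> Self {
        FlowRecord {
            plan,
            current_stage: 0,
            running_tasks: BTreeSet::new(),
            finished_tasks: BTreeSet::new(),
            failed_tasks: BTreeSet::new(),
            flow_status: FlowStatus::Pending,
        }
    }

    fn mark_task_running(&mut self, task_id: usize) {
        self.running_tasks.insert(task_id);
        self.flow_status = FlowStatus::Running;
    }

    fn mark_task_finished(&mut self, task_id: usize) {
        self.running_tasks.remove(&task_id);
        self.finished_tasks.insert(task_id);
        // The flow succeeds once every task in every stage has finished.
        let total: usize = self.plan.iter().map(|stage| stage.len()).sum();
        if self.finished_tasks.len() == total {
            self.flow_status = FlowStatus::Success;
        }
    }

    fn mark_task_failed(&mut self, task_id: usize) {
        self.running_tasks.remove(&task_id);
        self.failed_tasks.insert(task_id);
        self.flow_status = FlowStatus::Failed;
    }

    /// Returns the tasks to deploy next, if any.
    fn schedule_tasks(&mut self) -> Option<BTreeSet<usize>> {
        match self.flow_status {
            // Bootstrapping: a pending flow starts with the first stage.
            FlowStatus::Pending => self.plan.first().cloned(),
            FlowStatus::Running => {
                let current = self.plan.get(self.current_stage)?;
                let next = self.current_stage + 1;
                // Advance only when every task in the current stage has finished.
                if current.is_subset(&self.finished_tasks) && next < self.plan.len() {
                    self.current_stage = next;
                    Some(self.plan[next].clone())
                } else {
                    None
                }
            }
            FlowStatus::Success | FlowStatus::Failed => None,
        }
    }
}
```

Walking the plan [[0],[1,2],[3]] through these operations, scheduling stage 1 and marking tasks 1 and 2 as running leaves the model in the same state as the example record above.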

Executor

The last remaining piece of our orchestrator is tying everything together. We have to accept requests to create new flows, use the Kubernetes API to deploy tasks and poll their status, and use the scheduler to persist and fetch the tasks to be deployed. Let’s call this wiring the executor. Let’s look at some Rust-like pseudo-code to see how all this works.

fn create_flow(sched: &Scheduler, flow_definition: Flow) {
    let plan = create_plan(&flow_definition);
    let task_definitions = flow_definition.tasks;
    let flow_name = flow_definition.name;

    sched.create_flow(flow_name, plan, task_definitions);
}

The first function we will look at is create_flow. It accepts a flow definition, calls the planner to create a plan, and then creates a new record in the scheduler by calling the scheduler’s create_flow.

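fn deploy_tasks(sched: &Scheduler, flow_id: i32, tasks: Vec<(i32, Task)>) {
    for (task_id, task) in tasks {
        // Labels the pod with flow_id and task_id so it can be polled later
        let manifest = to_job_manifest(flow_id, task_id, task);
        deploy(manifest);

        sched.mark_task_running(flow_id, task_id);
    }
}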

The next function is deploy_tasks. It accepts a list of Tasks along with their task_id (i32), converts each task into a Kubernetes Job manifest and deploys it. Remember, Task is just one of the elements of the tasks key in the flow definition and task_id is its index. to_job_manifest also sets flow_id and task_id as labels (flowmium.io/flow-id and flowmium.io/task-id in the manifest in the Task section) so the task can be polled for its status later.

fn mark_task(sched: &Scheduler, flow_id: i32, task_id: i32) {
    let k8s_status = get_status(flow_id, task_id);

    match k8s_status {
        K8sFinished => sched.mark_task_finished(flow_id, task_id),
        K8sFailed => sched.mark_task_failed(flow_id, task_id),
        K8sPending | K8sRunning => {} // still in progress, check again next round
    }
}

This function polls Kubernetes for the status of a task and calls the appropriate mark_* function. Notice that we don’t call mark_task_running here: deploy_tasks has already called it, and once a task is running it stays running until it fails or succeeds. The get_status function polls Kubernetes using the labels that deploy_tasks set in the manifest.
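The pseudo-code leaves get_status abstract. In the manifest, the flowmium.io/flow-id and flowmium.io/task-id labels sit on the pod template, so one plausible approach (an assumption, not necessarily what Flowmium does) is to find the task’s pod with a label selector and map the pod’s phase onto the statuses mark_task matches on. The sketch below shows only the selector string and that mapping; the Kubernetes API call itself is omitted, and the `selector_matches` helper handles only equality-based `key=value` terms.

```rust
use std::collections::HashMap;

/// Task status as matched on by mark_task in the pseudo-code.
#[derive(Debug, PartialEq)]
enum K8sStatus {
    Pending,
    Running,
    Finished,
    Failed,
}

/// Label selector for the pod of one task, built from the labels that
/// to_job_manifest puts on the pod template.
fn task_selector(flow_id: i32, task_id: i32) -> String {
    format!("flowmium.io/flow-id={flow_id},flowmium.io/task-id={task_id}")
}

/// Minimal matcher for equality-based `key=value,key=value` selectors, only to
/// show what the API server does with the selector (it also supports `!=`
/// and set-based terms).
fn selector_matches(selector: &str, labels: &HashMap<String, String>) -> bool {
    selector.split(',').all(|term| match term.split_once('=') {
        Some((key, value)) => labels.get(key).map(String::as_str) == Some(value),
        None => false,
    })
}

/// Assumption: the task status is read from the pod's `status.phase`.
fn classify(phase: Option<&str>) -> K8sStatus {
    match phase {
        Some("Succeeded") => K8sStatus::Finished,
        Some("Failed") => K8sStatus::Failed,
        Some("Running") => K8sStatus::Running,
        // "Pending", "Unknown", or no matching pod yet: check again next round.
        _ => K8sStatus::Pending,
    }
}
```

With backoffLimit: 0 and restartPolicy: Never there is a single pod per task, so a Failed phase can be treated as a failed task straight away.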

fn schedule_and_run_tasks(sched: &Scheduler) {
    // Each entry pairs a flow id with the ids of its running tasks
    let flows = sched.get_running_or_pending_flow_ids();

    for (flow_id, running_tasks) in flows {
        let next_stage_tasks = sched.schedule_tasks(flow_id);

        if let Some(tasks) = next_stage_tasks {
            deploy_tasks(sched, flow_id, tasks);
        } else {
            for task_id in running_tasks {
                mark_task(sched, flow_id, task_id);
            }
        }
    }
}

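The function schedule_and_run_tasks ties it all together and is expected to be called periodically, at a fixed interval, from the main loop. Let’s go over the lifecycle of a workflow to understand this function. A newly created flow is pending, so the first call to schedule_tasks bootstraps it and returns the tasks of stage 0, which deploy_tasks deploys and marks as running. On the following calls, schedule_tasks returns nothing while that stage is in progress, so mark_task polls Kubernetes for each running task and marks it as finished or failed. Once every task in the stage has finished, schedule_tasks returns the tasks of the next stage and the cycle repeats, until the last stage finishes and the flow is marked as successful, or a task fails and the flow is marked as failed.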

And that’s it, that is our orchestrator. Remember that this cycle of scheduling and marking happens individually for each flow. What we have built is also pretty robust: if our orchestrator is killed at any point, it will pick up right where it left off on restart, because all of the scheduler’s book-keeping is persisted in the database.

Now all that is left is to make the main loop, which is trivial.

fn main() {
    let sched = Scheduler::new();

    http_server(|flow_definition_yaml| {
        create_flow(&sched, deserialize(flow_definition_yaml));
    });

    loop {
        schedule_and_run_tasks(&sched);
        sleep(Duration::from_secs(1));
    }
}

Besides calling schedule_and_run_tasks periodically in the loop, we also accept requests to create flows through the HTTP server, and those flows are eventually picked up by subsequent calls to schedule_and_run_tasks. I have also ignored a lot of things here, like handling errors such as being unable to connect to Kubernetes or a Job being deleted by an admin. You can see the source for the executor here; it looks a bit involved because of all the error handling.

Framework for Python

Most workflows and data crunching programs are written in Python. It is possible to write each task in Python, package each task into a container and write a YAML flow definition by hand, but that involves a lot of grunt work. It would be nice if each task were just a plain function, dependencies between tasks could be marked with decorators, and users could stay in the Python world. This is exactly what the framework does, and it is actually pretty easy to build one; you don’t even need any FFI.

Let’s take a look at an example flow defined in Python

from flowmium import Flow

flow = Flow("testing")

@flow.task()
def hallo_world() -> str:
    return "Hallo world"

@flow.task({"input_str": hallo_world})
def replace_letter_a(input_str: str) -> str:
    return input_str.replace("a", "e")

if __name__ == "__main__":
    flow.run()

This is the Python version of the flow that you saw at the beginning of this blog. Each task is marked with the @flow.task decorator. The second task declares its dependency and inputs by passing a dict to the decorator. In {"input_str": hallo_world}, the key "input_str" is the name of one of the function’s (task’s) arguments and hallo_world is the source task (function) that the argument’s input comes from. Whatever the function returns is considered its output.

The decorator @flow.task replaces the function with a wrapped version of itself. When the wrapped version is called, it loads and deserializes all the inputs it needs from pkl files, runs the actual inner function, and then serializes the return value and writes it out as a pkl file.

Next, this script is packaged into a Docker image and pushed to a registry that is accessible from Kubernetes.

FROM python:bookworm
RUN pip install flowmium
COPY . .

Then you can generate a flow definition by running this script and passing the name of the image uploaded to the registry

python3 my_flow.py \
    --image registry:5000/python-script-workflow-test:latest \
    --cmd 'python3 my_flow.py' \
    --flowmium-server http://localhost:8080 \
    --dry-run

Running the script executes the flow.run() call. The flow object is aware of all the tasks that were registered using the @flow.task decorator, and it generates a flow definition from them. If you remove the --dry-run argument, it will instead make a request and submit the flow to the orchestrator. The --cmd argument tells the generated tasks how to run the script within the container.

{
  "name": "testing",
  "tasks": [
    {
      "cmd": [
        "python3",
        "my_flow.py"
      ],
      "depends": [
      ],
      "env": [
        {
          "name": "FLOWMIUM_FRAMEWORK_TASK_ID",
          "value": "0"
        }
      ],
      "image": "registry:5000/python-script-workflow-test:latest",
      "inputs": [
      ],
      "name": "hallo-world",
      "outputs": [
        {
          "name": "hallo-world-output.pkl",
          "path": "task-output-hallo-world.pkl"
        }
      ]
    },
    {
      "cmd": [
        "python3",
        "my_flow.py"
      ],
      "depends": [
        "hallo-world"
      ],
      "env": [
        {
          "name": "FLOWMIUM_FRAMEWORK_TASK_ID",
          "value": "1"
        }
      ],
      "image": "registry:5000/python-script-workflow-test:latest",
      "inputs": [
        {
          "from": "hallo-world-output.pkl",
          "path": "task-inputs-input_str.pkl"
        }
      ],
      "name": "replace-letter-a",
      "outputs": [
        {
          "name": "replace-letter-a-output.pkl",
          "path": "task-output-replace-letter-a.pkl"
        }
      ]
    }
  ]
}

You can see that the cmd of each task runs the same script, so the same flow.run() is called inside the task too, but with the environment variable FLOWMIUM_FRAMEWORK_TASK_ID set, which tells flow.run() to run one of the task functions instead. Each task function is assigned an index, and that index becomes the value of FLOWMIUM_FRAMEWORK_TASK_ID when the flow definition is generated.
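The same mechanism can be sketched in Rust, in the spirit of the Rust macros exercise suggested at the end of this section, though without any macros. Everything here is hypothetical: `Registered` stands in for what the decorator records, and the naming rules in `to_specs` are simply read off the generated definition above, so the real framework’s rules may differ. `mode` shows the dispatch: no FLOWMIUM_FRAMEWORK_TASK_ID means "generate the flow definition", while a valid index means "run that task".

```rust
/// Hypothetical: what a decorator would record for one task function, i.e. its
/// name and (argument name, source task function) pairs like {"input_str": hallo_world}.
struct Registered {
    func: &'static str,
    inputs: Vec<(&'static str, &'static str)>,
}

/// The parts of a generated task definition that depend on the registry.
#[derive(Debug, PartialEq)]
struct TaskSpec {
    name: String,
    depends: Vec<String>,
    task_id_env: String,            // value of FLOWMIUM_FRAMEWORK_TASK_ID
    inputs: Vec<(String, String)>,  // (from, path)
    outputs: Vec<(String, String)>, // (name, path)
}

/// What the entry point should do, decided by FLOWMIUM_FRAMEWORK_TASK_ID.
#[derive(Debug, PartialEq)]
enum Mode {
    Generate,
    RunTask(usize),
}

/// Function names become task names by swapping underscores for hyphens.
fn task_name(func: &str) -> String {
    func.replace('_', "-")
}

/// Builds one spec per registered task; the index doubles as the task id.
fn to_specs(tasks: &[Registered]) -> Vec<TaskSpec> {
    tasks
        .iter()
        .enumerate()
        .map(|(index, task)| {
            let name = task_name(task.func);
            TaskSpec {
                depends: task.inputs.iter().map(|(_, src)| task_name(src)).collect(),
                task_id_env: index.to_string(),
                inputs: task
                    .inputs
                    .iter()
                    .map(|(arg, src)| {
                        (format!("{}-output.pkl", task_name(src)), format!("task-inputs-{arg}.pkl"))
                    })
                    .collect(),
                outputs: vec![(format!("{name}-output.pkl"), format!("task-output-{name}.pkl"))],
                name,
            }
        })
        .collect()
}

/// No task id means "generate the flow definition"; a valid index means "run that task".
fn mode(task_id: Option<&str>, task_count: usize) -> Result<Mode, String> {
    match task_id {
        None => Ok(Mode::Generate),
        Some(raw) => match raw.parse::<usize>() {
            Ok(index) if index < task_count => Ok(Mode::RunTask(index)),
            _ => Err(format!("invalid FLOWMIUM_FRAMEWORK_TASK_ID: {raw}")),
        },
    }
}
```

At startup, a caller would pass `std::env::var("FLOWMIUM_FRAMEWORK_TASK_ID").ok().as_deref()` as `task_id`.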

The source for the framework is here. You can also find the documentation for the framework here and examples here. The framework also includes extras like changing the serializer for inputs and outputs.

We can build similar frameworks for any language. Even C if you are willing to write a CLI tool to parse and generate C code and use comments instead of decorators. Doing this for Rust is a very good macros exercise.

Rust developer experience

It is actually impressive how a language can be low-level enough to be used for “systems programming” and writing firmware, yet high-level enough that we can use web frameworks like actix and an async runtime like tokio, all without a garbage collector and with a great type system. Not that having a garbage collector is bad; it is just that not many languages manage this combination. I did not run into any async or memory management ergonomics issues, though the project might also not have been big enough to hit them. There are plenty of packages available through cargo, and I never felt I had to go out of my way to build something I did not want to. These are some of the highlights of my Rust experience.

Structured Errors

Error handling in Rust is the best I have seen so far. I used thiserror and was able to handle and report errors beautifully, like this:

$ flowctl download 2 replace-letter-a-output.pkl .
request error: error sending request for url (http://localhost:8080/api/v1/artefact/2/replace-letter-a-output.pkl): error trying to connect: tcp connect error: Connection refused (os error 61)

The Result and the Option types increased my “confidence” in my application not crashing.
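thiserror derives this kind of error type from attributes. For readers who have not used it, the std-only sketch below is roughly the boilerplate it saves: a `Display` message per variant, `source()` for chaining, and a `From` impl so that `?` converts errors automatically (what `#[from]` generates). `ClientError` and the helper functions are hypothetical, not flowctl’s actual error type.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// Hypothetical client error with one variant per failure kind.
#[derive(Debug)]
enum ClientError {
    /// The request could not be sent; wraps the underlying I/O error.
    Request(io::Error),
    /// The server answered with a non-success HTTP status code.
    Status(u16),
}

// With thiserror, these messages would be #[error("...")] attributes.
impl fmt::Display for ClientError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ClientError::Request(e) => write!(f, "request error: {e}"),
            ClientError::Status(code) => write!(f, "server returned status {code}"),
        }
    }
}

// Exposes the wrapped error so callers can walk the chain of causes.
impl Error for ClientError {
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            ClientError::Request(e) => Some(e),
            ClientError::Status(_) => None,
        }
    }
}

// What #[from] generates: lets `?` turn an io::Error into a ClientError.
impl From<io::Error> for ClientError {
    fn from(e: io::Error) -> Self {
        ClientError::Request(e)
    }
}

/// Hypothetical low-level call that fails like a refused connection.
fn open_connection() -> io::Result<()> {
    Err(io::Error::new(io::ErrorKind::ConnectionRefused, "Connection refused"))
}

fn download() -> Result<(), ClientError> {
    open_connection()?; // converted via From<io::Error>
    Ok(())
}
```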

Macros and type inference

Macros and type inference in Rust sometimes made me forget that I was using a statically typed language. For example, requests are serialized and deserialized for free:

#[post("/job")]
async fn create_job(
    flow: web::Json<Flow>,
    sched: web::Data<Scheduler>,
) -> Result<String, ExecutorError> {
    ...
}

Here Flow is automatically deserialized, and if that fails a 4xx response is sent back, all done for you automatically. This is very similar to something like FastAPI in Python. Or take the type inference here, where I did not need to pass the type to decode a JSON value:

fn print_flow(flow: Flow) {
    println!("Flow is {:?}", flow);
}

fn deserialize_and_print_flow(contents: &'static str) {
    let flow = serde_json::from_str(contents).unwrap();
    print_flow(flow);
}

In deserialize_and_print_flow, I do not need to write serde_json::from_str::<Flow> (with the extra <Flow>) or let flow: Flow (with the extra : Flow); the compiler infers the type from the call to print_flow, and it just works.
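The same return-type inference works with the standard library’s `str::parse`, which is generic over its output type just like `serde_json::from_str`, so it can be tried without serde. In this small example (the function names are made up for illustration), `parse` produces a `u64` purely because its result is passed to a function that takes a `u64`.

```rust
/// Made-up stand-in for print_flow: a function whose parameter type is fixed.
fn double(n: u64) -> u64 {
    n * 2
}

/// Made-up stand-in for deserialize_and_print_flow: `str::parse` is generic
/// over its return type, and the compiler infers `u64` from the call to `double`.
fn parse_and_double(contents: &str) -> u64 {
    let n = contents.parse().unwrap(); // no ::<u64> and no `let n: u64` needed
    double(n)
}
```

Calling `parse_and_double("21")` returns 42.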

SQLx compile-time checked queries

The compile-time checked queries from SQLx were very useful in writing the scheduler and saved me from a lot of back and forth with my SQL-fu. While they were great, the issue with compile-time checked queries is that you can’t publish your project to crates.io if you use them: they require the user to also have the database running for cargo install to work, otherwise cargo install just fails with compile errors. It would have been nice to keep the compile-time checks for the queries but still use runtime queries.

Building static binaries

The only approach that worked for me was cargo zigbuild, which was also the easiest to set up. Other approaches, like using musl targets or the compiler flag target-feature=+crt-static, required a lot of setup and ran into obstacles like needing musl set up on your system, or the program crashing because it still depended on some dynamic library.