Oct 25, 2023
I built a workflow orchestrator partly to learn Rust and partly because I was curious how existing orchestrators work. In the context of Kubernetes, a workflow orchestrator helps you manage, deploy and chain multiple jobs. You can define a tree of jobs or tasks, where one task is dependent on the output of another task. This is really useful for creating pipelines that crunch and transform large amounts of data or perform intensive computations with several steps.
In this blog we will go over the inner workings of a workflow orchestrator I created, called Flowmium, and my developer experience with Rust while working on this project. We will also take a look at a small and minimal Python framework built on top of Flowmium that lets us define workflows where a Kubernetes job is a plain function and dependencies between jobs are marked with decorators.
Let’s take a look at a simple example flow to get a feel for what our orchestrator can do
name: testing
tasks:
- name: hallo-world
image: debian:latest
depends: []
cmd:
- sh
- -c
- echo $MESSAGE >> /task-output-foo.txt
env:
- name: MESSAGE
value: Hallo world
outputs:
- name: foo-output
path: /task-output-foo.txt
- name: replace-letter-a
image: debian:latest
depends:
- hallo-world
cmd:
- sh
- -c
- cat /task-input.txt | sed 's/a/e/g' > /task-output-replace-letter-a.txt
env: []
inputs:
- from: foo-output
path: /task-input.txt
outputs:
- name: replace-letter-a-output
path: /task-output-replace-letter-a.txt
This flow defines two tasks (defined at line 3 and line 16) that will each run in its own Kubernetes Job. The first task echoes "Hallo world" into /task-output-foo.txt (line 9). This file is declared as the first task's output with the name foo-output (line 14). The second task declares foo-output as an input, made available at the path /task-input.txt (line 26). The second task reads /task-input.txt, replaces the letter "a" with an "e" and writes "Hello world" to /task-output-replace-letter-a.txt (line 23). This file, like before, is declared as the second task's output with the name replace-letter-a-output (line 29).
So we just defined two Kubernetes Jobs that have to run serially one after another, and chained the output of the first Job into the second. Notice the declaration of the dependency at line 19. You can try and run this flow to see it in action. First you will have to set up Flowmium on your machine by following these very straightforward instructions.
Then you can run this flow using the helper CLI tool
flowctl --url http://localhost:8080 submit test-flow.yaml
You can download the output artefact by running
flowctl --url http://localhost:8080 download <flow-id> replace-letter-a-output .
If you open the file, you will see "Hello world". You can also download the other output named foo-output and see "Hallo world".
If you have worked with workflow orchestrators before, this will seem very familiar; otherwise the file transfer between the two Jobs might feel like magic. The orchestrator is doing a number of things for us
- Validating the flow definition and working out which tasks can be deployed in parallel
- Converting each task into a Kubernetes Job manifest and deploying it at the right time
- Moving the outputs of one Job to the inputs of the next through an S3-like storage
- Keeping track of the status of every task and flow so it knows what to deploy next
The next few sections of the blog will go over these in more detail.
The planner component of the orchestrator is responsible for validating the flow and finding out which tasks can be deployed in parallel. To do that, we first need to construct something called an adjacency list. Let's take a more complex flow example.
The illustration shows an example flow definition without all the details, only the dependencies between the tasks. These dependencies can be visualized as a graph. The adjacency list for this graph would look like this
{
"task-a": ["task-b", "task-c", "task-d", "task-e"],
"task-b": ["task-d"],
"task-c": ["task-d"],
"task-d": ["task-e"],
"task-e": []
}
This is very easy to construct from the flow definition, and you can also identify circular dependencies while doing so. We can also use indices instead of task names and make it a list of sets of integers instead. So in Rust, it would look like this
vec![
BTreeSet::from([1, 2, 3, 4]),
BTreeSet::from([3]),
BTreeSet::from([3]),
BTreeSet::from([4]),
BTreeSet::from([]),
]
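To make that concrete, here is a minimal sketch (not the actual planner source) of constructing this from the flow definition, assuming a stripped-down Task type with just a name and a depends list. The cycle check is left to the staging step shown further below.
use std::collections::BTreeSet;

// A stripped down task: just the fields needed to build the adjacency list.
struct Task {
    name: String,
    depends: Vec<String>,
}

// Map every dependency name to the index of the task it refers to.
// A dependency that names an unknown task makes the flow invalid.
fn adjacency_list(tasks: &[Task]) -> Result<Vec<BTreeSet<usize>>, String> {
    tasks
        .iter()
        .map(|task| {
            task.depends
                .iter()
                .map(|dep| {
                    tasks
                        .iter()
                        .position(|other| other.name == *dep)
                        .ok_or_else(|| format!("unknown task {dep}"))
                })
                .collect()
        })
        .collect()
}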
The next step is to figure out a series of deploys (kubectl apply). For our example here, we deploy task-e first, because it is not dependent on any other tasks at all. Once task-e is finished we can deploy task-d. We can't deploy task-b and task-c yet, because they are dependent on task-d. Once task-d is finished, we can deploy task-b and task-c at the same time. We can do this because task-b and task-c are independent of each other. Finally we deploy task-a, which depends on a lot of other tasks. You can think of these deploys as a series of steps, or stages. The representation of this would look like this
{
"stage-0": ["task-e"],
"stage-1": ["task-d"],
"stage-2": ["task-b", "task-c"],
"stage-3": ["task-a"]
}
Again, like before, we can use indices instead of task names and make this representation a list of sets of integers.
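For our example, assuming the same task-to-index mapping as the adjacency list above, that representation would be
vec![
    BTreeSet::from([4]),
    BTreeSet::from([3]),
    BTreeSet::from([1, 2]),
    BTreeSet::from([0]),
]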
This representation is what I call the "plan" or "stages". The plan describes a series of deploys to run a workflow. So the planner first constructs the adjacency list and then transforms the adjacency list into the plan representation. Going from the adjacency list to the plan representation needs careful thinking; you also need to take care of indirect dependencies (for example, task-b is indirectly dependent on task-e via task-d).
The source for the planner can be found here. The planner also performs other validations on the flow definition.
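As a rough illustration only (not the actual Flowmium planner), here is a minimal sketch of turning the adjacency list into stages. It repeatedly pulls out every task whose dependencies have all been planned already, so indirect dependencies are handled by induction, and a round that plans nothing means there is a circular dependency.
use std::collections::BTreeSet;

fn plan_stages(deps: &[BTreeSet<usize>]) -> Result<Vec<BTreeSet<usize>>, String> {
    let mut planned: BTreeSet<usize> = BTreeSet::new();
    let mut stages: Vec<BTreeSet<usize>> = Vec::new();

    while planned.len() < deps.len() {
        // Tasks that are not planned yet but whose dependencies are all planned.
        let stage: BTreeSet<usize> = (0..deps.len())
            .filter(|task| !planned.contains(task))
            .filter(|task| deps[*task].is_subset(&planned))
            .collect();

        // No progress in this round means there is a cycle.
        if stage.is_empty() {
            return Err("flow has a circular dependency".to_string());
        }

        planned.extend(stage.iter().copied());
        stages.push(stage);
    }

    Ok(stages)
}

fn main() {
    let deps = vec![
        BTreeSet::from([1, 2, 3, 4]),
        BTreeSet::from([3]),
        BTreeSet::from([3]),
        BTreeSet::from([4]),
        BTreeSet::from([]),
    ];

    // Prints [{4}, {3}, {1, 2}, {0}], matching the stages above.
    println!("{:?}", plan_stages(&deps).unwrap());
}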
Now that we have a “plan”, we need to start deploying and running the tasks. Each task in the flow definition gets converted to a Kubernetes Job manifest and the orchestrator uses the Kubernetes API to deploy them. We will concentrate on what happens inside the task in this section.
Our orchestrator allows us to declare file paths as inputs and outputs. The input files "magically" appear in the task container and the outputs are "auto-magically" uploaded. This is possible because the tasks are deployed as a Job with an init container and a shared in-memory volume. The init container is used to inject an executable into the task container via the volume.
The command that is specified in the flow definition is not the actual command that is run; that command is instead passed to the injected executable. The injected executable then downloads all the inputs that are necessary for the task, runs the passed-in command as a subprocess and then uploads all the declared outputs. Let's take a look at an example manifest deployed by the orchestrator to get a clearer understanding
apiVersion: batch/v1
kind: Job
metadata:
name: flow-1-task-replace-letter-a
spec:
backoffLimit: 0
template:
metadata:
name: replace-letter-a
labels:
flowmium.io/flow-id: '1'
flowmium.io/task-id: '1'
spec:
initContainers:
- name: init
image: docker.io/shnoo28/flowmium:latest
command:
- /flowmium
- init
- /flowmium
- /var/run/flowmium
volumeMounts:
- mountPath: /var/run
name: executable
containers:
- name: replace-letter-a
image: debian:latest
command:
- /var/run/flowmium
- task
- sh
- -c
- cat /task-input.txt | sed 's/a/e/g' > /task-output-replace-letter-a.txt
env:
- name: FLOWMIUM_INPUT_JSON
value: '[{"from":"foo-output","path":"/task-input.txt"}]'
- name: FLOWMIUM_OUTPUT_JSON
value: '[{"name":"replace-letter-a-output","path":"/task-output-replace-letter-a.txt"}]'
- name: FLOWMIUM_FLOW_ID
value: '1'
- name: FLOWMIUM_ACCESS_KEY
value: minio
- name: FLOWMIUM_SECRET_KEY
value: password
- name: FLOWMIUM_BUCKET_NAME
value: flowmium-test
- name: FLOWMIUM_TASK_STORE_URL
value: http://flowmium-minio-service.default.svc.cluster.local:9000
volumeMounts:
- mountPath: /var/run
name: executable
volumes:
- emptyDir:
medium: Memory
name: executable
restartPolicy: Never
Let's inspect this manifest and note down our observations
- Both the init container and the task container mount a shared in-memory emptyDir volume at /var/run.
- The init container copies the flowmium executable into this volume, so it shows up inside the task container at /var/run/flowmium.
- The init container's command is /flowmium init <src> <dest>, where init is equivalent to cp, and notice that the src is the executable itself.
- The task container's command runs the injected executable (/var/run/flowmium task ...) and the actual command from the flow definition is passed to it as arguments.
- The inputs to download are passed through the FLOWMIUM_INPUT_JSON environment variable. This is just the contents of the inputs key from the flow definition. The executable will go ahead and download all inputs from the S3-like storage.
- Similarly, the outputs to upload are passed through the FLOWMIUM_OUTPUT_JSON environment variable.
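To make the injected executable's role concrete, here is a rough sketch of what it does inside the task container. This is not the actual flowmium source; download_artefact and upload_artefact are made-up placeholders for the calls that talk to the S3-like store using the FLOWMIUM_* credentials.
use std::env;
use std::process::Command;

use serde::Deserialize;

// Mirrors the inputs/outputs entries from the flow definition, as passed in
// through the FLOWMIUM_INPUT_JSON and FLOWMIUM_OUTPUT_JSON environment variables.
#[derive(Deserialize)]
struct Input {
    from: String,
    path: String,
}

#[derive(Deserialize)]
struct Output {
    name: String,
    path: String,
}

// Placeholder: the real executable downloads the named artefact from the
// S3-like store (FLOWMIUM_TASK_STORE_URL, FLOWMIUM_BUCKET_NAME, ...) to `path`.
fn download_artefact(_from: &str, _path: &str) -> Result<(), Box<dyn std::error::Error>> {
    todo!()
}

// Placeholder: the real executable uploads the file at `path` under the given name.
fn upload_artefact(_name: &str, _path: &str) -> Result<(), Box<dyn std::error::Error>> {
    todo!()
}

fn run_task(cmd: &[String]) -> Result<(), Box<dyn std::error::Error>> {
    let inputs: Vec<Input> = serde_json::from_str(&env::var("FLOWMIUM_INPUT_JSON")?)?;
    let outputs: Vec<Output> = serde_json::from_str(&env::var("FLOWMIUM_OUTPUT_JSON")?)?;

    // Download every declared input to its path before the task starts.
    for input in &inputs {
        download_artefact(&input.from, &input.path)?;
    }

    // Run the actual command from the flow definition as a subprocess.
    let status = Command::new(&cmd[0]).args(&cmd[1..]).status()?;
    if !status.success() {
        return Err("task command failed".into());
    }

    // Upload every declared output from its path after the command exits.
    for output in &outputs {
        upload_artefact(&output.name, &output.path)?;
    }

    Ok(())
}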
Now we know how to convert each task in the flow definition into a Kubernetes Job manifest and how it works. Notice that the user controls the image for the task at line 27, which comes from the flow definition. This is the reason why we needed the init container; we can't just ask users to always use an image with our executable already present. This also means that our executable will be run in an unknown environment, so it should be capable of running on any Linux distribution and cannot assume any dependencies to be present.
So the executable cannot be written in a language that requires a runtime like Python or JavaScript; we have to use a compiled language. Even with a compiled language, we will face issues with dynamically linked libraries. What dynamically linked libraries are is beyond the scope of this blog, but the summary is that when you use a compiled language you have the choice of producing a dynamically linked executable or a statically linked executable, and a dynamically linked executable might not run on other distributions. So for example if you compile a program on Ubuntu, it might not run on Debian.
You can confirm whether your executable is statically or dynamically linked by using the ldd command. If the ldd output looks like below, it means it is statically linked (which is what we want)
$ ldd flowmium
not a dynamic executable
Golang and Zig produce statically linked executables by default, so an executable produced by them will run on any distribution. Rust and C++ produce dynamically linked executables by default. So that means we have to figure out how to make Rust produce a statically linked executable instead, which I will go over in a later section.
So because the executable is statically linked and does not require anything from the distribution, the docker image for the init container uses FROM scratch (see here), which means the image only depends on the host's kernel and nothing else. So if you exec into this docker image and ask for a shell, you won't be able to, because the shell does not exist in this image, nothing does, only the executable.
$ docker run -it docker.io/shnoo28/flowmium:latest sh
docker: Error response from daemon: failed to create shim task: OCI runtime create failed: runc create failed: unable to start container process: exec: "sh": executable file not found in $PATH: unknown.
ERRO[0000] error waiting for container: context canceled
We now can convert each task in the flow definition to a deployable Kubernetes Job manifest and we also have a plan to tell us when to deploy each one of those manifests. Next we need something that persists the plan for each flow (in a database), tracks which stage each flow is in, tracks which tasks are running, completed or failed, fetches the next stage once a stage is complete, and so on. That something is what I call the scheduler. The scheduler persists the status of every flow in a table and also acts as storage for past runs.
Let’s go over the scheduler table’s schema and the operations it provides over that table to understand it in more detail
| Column | Type | Description |
|---|---|---|
| id | int | Unique id for the flow |
| plan | json | Plan representation stored as JSON, e.g. "[[0],[1,2],[3]]" |
| current_stage | int | Current stage the flow is in |
| running_tasks | int[] | Indices of tasks that are running |
| finished_tasks | int[] | Indices of tasks that have finished running |
| failed_tasks | int[] | Indices of tasks that have failed |
| flow_status | enum | Status of the flow, one of pending, running, success or failed |
| task_definitions | json | JSON dump of the contents of the tasks key in the flow definition |
| flow_name | text | Name of the flow |
That's the schema. Let's take a look at an example record to understand it more clearly
| Column | Type | Value |
|---|---|---|
| id | int | 1 |
| plan | json | "[[0],[1,2],[3]]" |
| current_stage | int | 1 |
| running_tasks | int[] | [1, 2] |
| finished_tasks | int[] | [0] |
| failed_tasks | int[] | [] |
| flow_status | enum | running |
| task_definitions | json | .. |
| flow_name | text | "test-flow" |
This record tells us that the flow has the plan "[[0],[1,2],[3]]", so it has three stages and four tasks. It is currently in stage 1, which means all tasks in stage 0 have finished successfully. The tasks [1, 2] are running, and tasks [0] have finished (which checks out with the current stage). The status of the flow is running.
Now let's take a look at all the operations that the scheduler provides over the table (signatures are in pseudo-code and bodies in pseudo-query). Task means one of the elements from the tasks key of the flow definition. The Task type can be converted to a Kubernetes Job manifest and deployed as seen in the previous section. Also, task_id is just the index of a task as seen in the plan representation.
create_flow(flow_name: String, plan: Vec<Set<int>>, task_definitions: Vec<Task>)
Create a record in the table with current_stage = 0 and flow_status = pending. The *_tasks columns are set to empty arrays. The tasks key from the flow definition is deserialized and passed in as Vec<Task>.
mark_task_running(flow_id: i32, task_id: i32)
Append the value task_id to running_tasks and set flow_status = running, where id is flow_id.
mark_task_finished(flow_id: i32, task_id: i32)
Remove the value task_id from running_tasks and append it to finished_tasks. After the append, if the number of finished_tasks equals the total number of tasks in task_definitions, set flow_status = success, where id is flow_id.
mark_task_failed(flow_id: i32, task_id: i32)
Remove the value task_id from running_tasks, append it to failed_tasks and set flow_status = failed, where id is flow_id.
get_running_or_pending_flow_ids() -> Vec<(i32, Vec<i32>)>
Return tuples of (id, running_tasks) where flow_status = running OR flow_status = pending.
schedule_tasks(flow_id: i32) -> Option<Vec<(i32, Task)>>
Where id is flow_id:
- If flow_status is failed or success, don't return anything (None).
- If flow_status is pending, return tuples of (task_id, task_definitions[task_id]) for task_id in plan[0] (stage 0).
- If finished_tasks does not contain all elements in plan[current_stage], don't return anything (None).
- If current_stage is the last stage (plan.length - 1), don't return anything (None).
- If flow_status is running, return tuples of (task_id, task_definitions[task_id]) for task_id in plan[current_stage + 1] and increment current_stage += 1 afterwards.
Every task will go through mark_task_running first and end up at either mark_task_finished or mark_task_failed. Notice that the mark_* operations also set flow_status when appropriate.
The operation schedule_tasks returns the set of tasks in the next stage if all tasks in the current stage have finished (see the third and the last conditions). So only if mark_task_finished was called on all the tasks in the current stage will schedule_tasks return the tasks in the next stage. The second condition is for bootstrapping the first stage.
Make sure to go over these operations multiple times, they will really come in handy in the next section! The scheduler is written entirely in SQL, the source can be found here. The last operation is implemented as a single SQL statement, and it really is a head-scratcher.
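To make that last operation a little less of a head-scratcher, here is the same logic sketched in plain Rust over an in-memory record. It returns only the task ids; the real operation also returns the Task definitions and does all of this inside a single SQL statement.
// In-memory mirror of one row of the scheduler table, for illustration only.
#[derive(Debug)]
struct FlowRecord {
    plan: Vec<Vec<i32>>,
    current_stage: usize,
    finished_tasks: Vec<i32>,
    flow_status: FlowStatus,
}

#[derive(Debug)]
enum FlowStatus {
    Pending,
    Running,
    Success,
    Failed,
}

fn schedule_tasks(record: &mut FlowRecord) -> Option<Vec<i32>> {
    match record.flow_status {
        // Nothing left to schedule for flows that have already ended.
        FlowStatus::Success | FlowStatus::Failed => None,
        // Bootstrapping: a pending flow gets stage 0.
        FlowStatus::Pending => Some(record.plan[0].clone()),
        FlowStatus::Running => {
            let current = &record.plan[record.current_stage];

            // Wait until every task in the current stage has finished.
            if !current.iter().all(|t| record.finished_tasks.contains(t)) {
                return None;
            }

            // The last stage has nothing after it.
            if record.current_stage == record.plan.len() - 1 {
                return None;
            }

            // Move on to the next stage and hand out its tasks.
            record.current_stage += 1;
            Some(record.plan[record.current_stage].clone())
        }
    }
}

fn main() {
    // The example record from the table above: stage 0 done, stage 1 running.
    let mut record = FlowRecord {
        plan: vec![vec![0], vec![1, 2], vec![3]],
        current_stage: 1,
        finished_tasks: vec![0],
        flow_status: FlowStatus::Running,
    };

    // Tasks 1 and 2 have not finished yet, so nothing is scheduled.
    assert_eq!(schedule_tasks(&mut record), None);
}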
The last thing that is remaining for our orchestrator is to tie everything together. We have to accept requests to create a new flow, use the Kubernetes API to deploy tasks and poll for their status, and use the scheduler to persist and fetch the tasks to be deployed. Let's call this wiring the executor. Let's take a look at some Rust-like pseudo-code to see how all this works.
fn create_flow(sched: Scheduler, flow_definition: Flow) {
let plan = create_plan(flow_definition)
let task_definitions = flow_definition.tasks
let flow_name = flow_definition.name
sched.create_flow(flow_name, plan, task_definitions)
}
The first function we will look at is create_flow. This function accepts a flow definition and creates a new record in the scheduler by calling create_flow. It also calls the planner to create a plan first before doing so.
fn deploy_tasks(sched: Scheduler, flow_id: i32, tasks: Vec<(i32, Task)>) {
    for (task_id, task) in tasks {
        let manifest = to_job_manifest(flow_id, task_id, task)
        deploy(manifest)
        sched.mark_task_running(flow_id, task_id)
    }
}
The next function is deploy_tasks. This accepts a list of Task along with their task_id (i32), converts each task into a Kubernetes Job manifest and then deploys it. Remember Task is just one of the elements from the tasks key in the flow definition and task_id is its index. The to_job_manifest function also sets flow_id and task_id as labels so the Job can be polled for status later (see lines 11 and 12 in the manifest in the Task section).
fn mark_task(sched: Scheduler, flow_id: i32, task_id: i32) {
let k8s_status = get_status(flow_id, task_id);
match k8s_status {
K8sFinished => sched.mark_task_finished(flow_id, task_id)
K8sFailed => sched.mark_task_failed(flow_id, task_id)
K8sPending | K8sRunning => return
}
}
This function polls Kubernetes for the status of a task and calls the appropriate mark_* function. Notice that we are not calling mark_task_running, because that would have already been called by deploy_tasks and would be redundant here; once a task is running it will stay running until it fails or succeeds. The get_status function polls Kubernetes using the labels set in the manifest by deploy_tasks.
fn schedule_and_run_tasks(sched: Scheduler) {
    let flows = sched.get_running_or_pending_flow_ids();

    for (flow_id, running_tasks) in flows {
        let next_stage_tasks = sched.schedule_tasks(flow_id)

        if let Some(tasks) = next_stage_tasks {
            deploy_tasks(sched, flow_id, tasks)
        } else {
            for task_id in running_tasks {
                mark_task(sched, flow_id, task_id)
            }
        }
    }
}
This function, schedule_and_run_tasks, ties it all together and is expected to be called periodically at a fixed interval in the main loop. Let's go over the lifecycle of a workflow to understand this function.
- First the flow is created by calling create_flow, which creates a new record in the scheduler with flow_status = pending.
- The flow will be picked up at line 2 by get_running_or_pending_flow_ids at some point (remember the function is being called periodically).
- schedule_tasks at line 5 will return the tasks in stage-0.
- All tasks in stage-0 will be deployed at line 8, and all of those tasks will be marked running with mark_task_running and flow_status = running will be set.
- The next time schedule_and_run_tasks is called, schedule_tasks at line 5 will not return anything. This is because all the tasks in the current stage have to be marked finished with mark_task_finished for it to return tasks in the next stage.
- So control will now flow to the next statement at line 10 and all the tasks in the current stage (stage-0) will be polled for status.
- This polling continues as schedule_and_run_tasks is called at a regular interval in a loop, and at some point all tasks in the current stage will be marked finished with mark_task_finished.
- In the next call, schedule_tasks at line 5 will return the tasks in stage-1 because all tasks in stage-0 have been marked finished. Now all tasks in stage-1 will be deployed.
- In the next call to schedule_and_run_tasks, schedule_tasks at line 5 will not return anything until all tasks in stage-1 are complete.
- So control will now flow to the next statement at line 10 and all tasks in the current stage (stage-1) will be polled for status.
- This cycle of marking all tasks in the current stage and deploying tasks in the next stage continues until the final stage is reached.
- When the final stage is reached and all tasks have been marked finished, mark_task_finished on the final task will also set flow_status = success. So in the next call to schedule_and_run_tasks, the flow won't be picked up at line 2. Now the flow is complete.
- If at any point a task fails, mark_task_failed will be called and flow_status = failed will be set, and the flow will not be picked up in subsequent calls at line 2.
And that's it. That is our orchestrator. Also remember that this cycle of scheduling and marking is happening individually for each flow. What we have built is also pretty robust: if at any point our orchestrator is killed, it will pick up right where it left off on restart, because of all of the scheduler's persisted book-keeping in the database.
Now all that is left is to make the main loop, which is trivial.
fn main() {
    let sched = Scheduler::new()

    http_server(|flow_definition_yaml| {
        create_flow(sched, deserialize(flow_definition_yaml))
    })

    loop {
        schedule_and_run_tasks(sched)
        sleep(1000)
    }
}
Besides calling schedule_and_run_tasks periodically at line 9, we also accept requests to create flows at line 5, which will eventually be picked up in subsequent calls to schedule_and_run_tasks. I have also ignored a lot of things here, like handling errors such as being unable to connect to Kubernetes, a Job that was deleted by the admin, etc. You can see the source for the executor here; it looks a bit involved because of all the error handling.
Most workflows and data crunching programs are written in Python. It is possible for us to define each task in Python, package each task into a container and write a YAML flow definition by hand, but that involves a lot of grunt work. It would be nicer if each task were just a plain function and dependencies between tasks could be marked with decorators, so that users can stay in the Python world. This is exactly what the framework does, and it is actually pretty easy to create one; you don't even need to do any FFI.
Let’s take a look at an example flow defined in Python
from flowmium import Flow
flow = Flow("testing")
@flow.task()
def hallo_world() -> str:
return "Hallo world"
@flow.task({"input_str": hallo_world})
def replace_letter_a(input_str: str) -> str:
return input_str.replace("a", "e")
if __name__ == "__main__":
flow.run()
This is the Python version of the flow that you saw at the beginning of this blog. You can see each task is marked with the decorator @flow.task. The second task declares its dependency and inputs by passing a dict to the decorator. In {"input_str": hallo_world}, the key "input_str" is the name of one of the arguments to the function (task) and hallo_world is the source task (function) from which the input for that argument comes. Anything that is returned from the function is considered an output.
The decorator @flow.task replaces the function with a wrapped version of itself. When the wrapped version of the function is called, it deserializes and loads all the inputs it needs from pkl files, runs the actual inner function, then serializes and writes the output as pkl files.
Next this script is packaged into a docker container and uploaded to a registry that is accessible from Kubernetes.
Then you can generate a flow definition by running this script and passing the name of the image uploaded to the registry
python3 my_flow.py \
--image registry:5000/python-script-workflow-test:latest \
--cmd 'python3 my_flow.py' \
--flowmium-server http://localhost:8080 \
--dry-run
This will run the flow.run() call. The flow object is aware of all the tasks that were registered using the @flow.task decorator, and it will generate a flow definition. If you remove the --dry-run argument, it will make a request and submit the flow to the orchestrator. The --cmd argument tells it how to run the script within the container. The generated flow definition for our example looks like this
{
"name": "testing",
"tasks": [
{
"cmd": [
"python3",
"my_flow.py"
],
"depends": [
],
"env": [
{
"name": "FLOWMIUM_FRAMEWORK_TASK_ID",
"value": "0"
}
],
"image": "registry:5000/python-script-workflow-test:latest",
"inputs": [
],
"name": "hallo-world",
"outputs": [
{
"name": "hallo-world-output.pkl",
"path": "task-output-hallo-world.pkl"
}
]
},
{
"cmd": [
"python3",
"my_flow.py"
],
"depends": [
"hallo-world"
],
"env": [
{
"name": "FLOWMIUM_FRAMEWORK_TASK_ID",
"value": "1"
}
],
"image": "registry:5000/python-script-workflow-test:latest",
"inputs": [
{
"from": "hallo-world-output.pkl",
"path": "task-inputs-input_str.pkl"
}
],
"name": "replace-letter-a",
"outputs": [
{
"name": "replace-letter-a-output.pkl",
"path": "task-output-replace-letter-a.pkl"
}
]
}
]
}
You can see that inside the tasks (at line 6), the same script is called, so the same flow.run() will be called inside the task as well, but the environment variable FLOWMIUM_FRAMEWORK_TASK_ID is set (line 13); this tells flow.run() to run one of the task functions instead. Each task function is assigned an index, which becomes the value of the FLOWMIUM_FRAMEWORK_TASK_ID environment variable when the flow is generated.
The source for the framework is here. You can also find the documentation for the framework here and examples here. The framework also includes extras like changing the serializer for inputs and outputs.
We can build similar frameworks for any language, even C, if you are willing to write a CLI tool to parse and generate C code and use comments instead of decorators. Doing this for Rust would be a very good macros exercise.
It is actually impressive how a language can be so low-level that it can be used for "systems programming" and writing firmware, yet high-level enough that we can use web frameworks like actix and an async runtime like tokio, all without a garbage collector and with a great type system. Not that having a garbage collector is bad, just that not many languages pull this off without one. I did not run into any async or memory management ergonomics issues, though the project might not have been big enough to surface them. There are plenty of packages available through cargo and I never felt I had to go out of my way to build something I did not want to. These are some of the highlights of my Rust experience
Error handling in Rust is the best I have seen so far. I used thiserror and I was able to handle and report errors beautifully like this
$ flowctl download 2 replace-letter-a-output.pkl .
request error: error sending request for url (http://localhost:8080/api/v1/artefact/2/replace-letter-a-output.pkl): error trying to connect: tcp connect error: Connection refused (os error 61)
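As a sketch of what I mean (not the actual flowctl source, and assuming a reqwest based client), the error enum looks something like this. Lower level errors are wrapped with #[from] so the ? operator converts them automatically, and every variant carries a readable message.
use thiserror::Error;

#[derive(Error, Debug)]
enum ClientError {
    #[error("request error: {0}")]
    Request(#[from] reqwest::Error),
    #[error("unable to write artefact to disk: {0}")]
    Io(#[from] std::io::Error),
}

// Any reqwest or IO failure bubbles up as a ClientError with a nice message.
async fn fetch_artefact(url: &str, dest: &str) -> Result<(), ClientError> {
    let bytes = reqwest::get(url).await?.bytes().await?;
    std::fs::write(dest, &bytes)?;
    Ok(())
}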
The Result and Option types increased my "confidence" in my application not crashing.
Macros and type inference in Rust sometimes made me forget that I was using a strictly typed language. For example, you get serialization and deserialization of requests for free
#[post("/job")]
async fn create_job(
flow: web::Json<Flow>,
sched: web::Data<Scheduler>,
) -> Result<String, ExecutorError> {
...
}
Here Flow is automatically deserialized, and if deserialization fails a 4xx response is sent back, all done for you automatically. This is very similar to something like FastAPI in Python. Or take the type inference here, where I did not need to pass the type to decode a JSON value
fn print_flow(flow: Flow) {
    println!("Flow is {:?}", flow);
}

fn deserialize_and_print_flow(contents: &'static str) {
    let flow = serde_json::from_str(contents).unwrap();
    print_flow(flow);
}
On line 6, I do not need to write serde_json::from_str::<Flow> with the extra ::<Flow>, or let flow: Flow with the extra : Flow; it just works.
The compile-time checked queries from SQLx were very useful in writing the scheduler; they saved me from a lot of back and forth with SQL-foo. While that was great, the issue with compile-time queries is that you can't publish your project to cargo if you use them. The compile-time checked queries require the user to also have the database running for cargo install to work, otherwise cargo install just fails with compile errors. It would have been nice to have the compile-time checks for the queries but still use runtime queries.
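For context, a compile-time checked query looks roughly like this (a sketch, assuming a flows table matching the schema above with a NOT NULL current_stage column). The query! macro connects to the database named in DATABASE_URL while compiling and verifies the query text and result columns, which is exactly why cargo install needs a live database.
use sqlx::PgPool;

// Checked at compile time against the real database schema.
async fn get_current_stage(pool: &PgPool, flow_id: i32) -> Result<i32, sqlx::Error> {
    let record = sqlx::query!("SELECT current_stage FROM flows WHERE id = $1", flow_id)
        .fetch_one(pool)
        .await?;

    Ok(record.current_stage)
}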
Coming back to producing a statically linked executable in Rust: the only way this worked for me was cargo zigbuild, and it was also the easiest to set up. Other ways, like using musl targets or the compiler flag target-feature=+crt-static, required a lot of setup and ran into obstacles like needing musl set up on your system or the program crashing because it was still dependent on some dynamic lib.