Skip to main content

Argo Workflows Walkthrough

Introduction

Using a workflow manager can help you automate ETL pipelines, schedule regular analysis, or just chain together a sequence of functions. Argo is available on Nebari for workflow management. If you haven't already, check out the introductory documentation on using Argo.

For this tutorial we'll be using the Hera interface to Argo. This will allow us to write a workflow script in Python.

The most basic of all examples

The following workflow is perhaps the simplest possible workflow. There are quite a few bits here so we'll step through it.

Global Configuration

Nebari sets up several environment variables with tokens, etc. that enable us to use Argo more smoothly. However, there are two global configuration settings that we'll need to manually add to each workflow script.

from hera.shared import global_config
import os

global_config_host = f"https://{os.environ['ARGO_SERVER'].rsplit(':')[0]}{os.environ['ARGO_BASE_HREF']}/"
global_config.host = global_config_host
global_config.token = os.environ['HERA_TOKEN']

Workflow Labels

Next, we'll set some labels on our workflows. Because Nebari uses a service account token by default, we need to tell Argo which user we are. We also need to tell Argo to use the Nebari Workflow Controller so that we have access to our Nebari file system and conda environments from within the Argo pod (more information).

The workflow labels must be hexadecimal ASCII while the usernames have no such constraint so we have a helper function sanitize_labels to ensure that our label is valid for Argo.

import re

def sanitize_label(label: str) -> str:
"""
The sanitize_label function converts all characters that are not
alphanumeric or a - to their hexadecimal ASCII equivalent. This is
because kubernetes will complain if certain characters are being
used. This is the same approach taken by Jupyter for sanitizing.

On the Nebari Workflow Controller, there is a `desanitize_label`
function that reverses these changes so we can then perform a user
look up.

>>> sanitize_label("user@email.com")
user-40email-2ecom

Parameters
----------
label: str
Username to be sanitized (typically Jupyterhub username)

Returns
-------
Hexadecimall ASCII equivalent of `label`

"""
label = label.lower()
pattern = r"[^A-Za-z0-9]"
return re.sub(pattern, lambda x: "-" + hex(ord(x.group()))[2:], label)



username = os.environ["JUPYTERHUB_USER"]
labels = {
"workflows.argoproj.io/creator-preferred-username": sanitize_label(username),
'jupyterflow-override': 'true',
}

Time to Live Strategy

By default, Argo does not destroy servers after completion of a workflow. Because this can cause substantial unexpected cloud costs, we highly recommend always setting the "Time to live strategy", or TTLStrategy on every workflow.

note

The Argo UI will only show the workflow details until the TTLStrategy time has elapsed so make sure you have enough time to evaluate logs, etc. before those details are removed.

from hera.workflows.models import TTLStrategy

DEFAULT_TTL = 90
ttl_strategy = TTLStrategy(
seconds_after_completion=DEFAULT_TTL,
seconds_after_success=DEFAULT_TTL,
seconds_after_failure=DEFAULT_TTL,
)

Extra parameters

We also will be setting a node_selector parameter, but it is optional. If you do not include it, Argo will run on your current user instance. If you include it, Argo will run on the server type that you request. These server types correspond to the names in your nebari-config.yml. Nebari Jupyter instances are always in the user group so that's a good place to start, but you may want to use other CPU or GPU configurations that have been specified in your config. Its also important to note that if the node pool is limited to (1) node in the config, Argo will not be able to spin up. Also note that key in this dictionary refers to the cloud-specific kubernetes node selector label. For example, AWS uses "eks.amazonaws.com/nodegroup" while GCP uses "cloud.google.com/gke-nodepool".

The namespace parameter is set to dev by default, but Nebari sets it up as part of the environment variables so we'll pull it from there. The generate_name parameter allows us to give our job a prefix and Argo will add a suffix to ensure uniqueness. Lastly, we'll give the workflow an entrypoint. This parameter needs to match with the name of the Steps parameter you're using (or DAG).

Workflow constructor

Let's put this all together and have a closer look.

from hera.workflows import Steps, script
from hera.workflows import Workflow

@script()
def echo(message: str):
print(message)


with Workflow(
generate_name="hello-user",
entrypoint="steps",
node_selector={"eks.amazonaws.com/nodegroup": "user"},
labels=labels,
namespace=os.environ["ARGO_NAMESPACE"],
ttl_strategy=ttl_strategy,
) as w:
with Steps(name="steps"):
echo(arguments={"message": "hello"})

w.create()

Workflows are essentially managing a series of tasks for us. There are two basic mechanisms to construct these in Argo - Steps and DAGs.

For this example, we've used Steps. When you build your workflow with Steps as a context manager as we've done here, you can add as many methods as you'd like (for example, duplicating the call to echo()) and Argo will separate out these commands into individual DAG points in a series.

For example, if I wanted to run two separate functions, it might look like this

...
with Steps(name="steps"):
echo(arguments={"message": "hello"})
echo(arguments={"message": "goodbye"})

Each step would have its own resource management and logs within Argo. You can also tell Hera that the function calls within the Steps context manager should be run in parallel.

If you'd like even more control over your workflow, for example diamond or branching workflows, the DAG constructor will allow you to specify that level of complexity.

Beyond "Hello World"

We've proven that we can run something on Argo, but what about actually running some Python code? Let's review the requirements.

We've already discussed setting the jupyter-overrides label which tells Nebari to mount our home directory and the conda environments onto our Argo pod. We will also need to use a Docker image which has conda set up and initialized. We'll grab the Nebari Jupyter image. This has the added benefit of bringing parity between running a code on your Nebari instance and running on Argo.

"Argo Assistant" code

As you've seen, we're creating quite a bit of peripheral code and we're about to add even more. Let's bring some structure in to help us organize things. For this, we have a little "Argo Assistant" code that will help us out.

import logging
import os
import subprocess
from pathlib import Path

from hera.workflows import Container, Parameter, Steps, Workflow
from hera.workflows.models import TTLStrategy

LOGGER = logging.getLogger()

DEFAULT_TTL = 90 # seconds
DEFAULT_ARGO_NODE_TYPE = "user"
DEFAULT_K8S_SELECTOR_LABEL = "eks.amazonaws.com/nodegroup"


class NebariWorkflow(Workflow):
"""Hera Workflow object with required/reasonable default for running
on Nebari
"""

def __init__(self, **kwargs):
super().__init__(**kwargs)

if "ttl_strategy" in kwargs.keys():
self.ttl_strategy = kwargs["ttl_strategy"]
else:
self.ttl_strategy = TTLStrategy(
seconds_after_completion=DEFAULT_TTL,
seconds_after_success=DEFAULT_TTL,
seconds_after_failure=DEFAULT_TTL,
)

if "node_selector" in kwargs.keys():
self.node_selector = kwargs["node_selector"]
else:
self.node_selector = {DEFAULT_K8S_SELECTOR_LABEL: DEFAULT_ARGO_NODE_TYPE}

self.namespace = os.environ["ARGO_NAMESPACE"]

self.labels = {
"workflows.argoproj.io/creator-preferred-username": sanitize_label(username),
'jupyterflow-override': 'true',
}


def create_conda_command(
script_path,
conda_env,
stdout_path="stdout.txt",
):
"""Workflows need to be submitted via a bash command that runs a
python script. This function creates a conda run command that
will run a script from a given location using a given conda
environment.

Parameters
----------
script_path: str
Path to the python script (including extension) to be run on Argo
conda_env: str
Conda environment name in which to the run the `script_path`
stdout_path: str
Local Nebari path (for your user) for standard output from
the given script. Defaults to `stdout.txt`.

Returns
-------
String bash command
"""

conda_command = f'conda run -n {conda_env} python "{script_path}" >> {stdout_path}'
return conda_command


def create_bash_container(name="bash-container"):
"""Create a workflow container that is able to receive bash commands"""
bash_container = Container(
name="bash-container",
image="thiswilloverridden",
inputs=[
Parameter(name="bash_command")
], # inform argo that an input called bash_command is coming
command=["bash", "-c"],
args=["{{inputs.parameters.bash_command}}"], # use the input parameter
)
return bash_container


def submit_argo_script(script_path, conda_env, stdout_path="stdout.txt"):
"""Submit a script to be run via Argo in a specific environment"""
validated = validate_submission(script_path, conda_env)

if not validated:
raise RuntimeError("Unable to submit Argo workflow")

conda_command = create_conda_command(script_path, conda_env, stdout_path)

LOGGER.debug("Submitting command {conda_command} to Argo")

with NebariWorkflow(
generate_name="workflow-name-",
entrypoint="steps",
) as w:
bash_container = create_bash_container()
with Steps(
name="steps", # must match Workflow entrypoint
annotations={"go": "here"},
):
bash_container(
name="step-name",
arguments=[Parameter(name="bash_command", value=conda_command)],
)

workflow = w.create()
return workflow

Next, you'll need to create a python script and a conda environment. Then to submit the workflow to Argo you would run the high level command:

path = '/path/to/pyfile.py'
nebari_conda_env = 'analyst-workflow-env'
submit_argo_script(path, nebari_conda_env)

Now you can go to the Argo UI and monitor progress!

Conclusion

Well done! You've learned how to submit a python workflow to Argo and have a few extra tools to help you along.