Airflow

Important

This tutorial requires soopervisor 0.6.1 or higher

Note

Got questions? Reach out to us on Slack.

This tutorial shows you how to export a Ploomber pipeline to Airflow.

If you encounter any issues with this tutorial, let us know.

Pre-requisites

Building Docker image

We provide a Docker image so you can quickly run this example:

# get repository
git clone https://github.com/ploomber/soopervisor
cd soopervisor/tutorials/airflow

# create a directory to store the pipeline output
export SHARED_DIR=$HOME/ploomber-airflow
mkdir -p $SHARED_DIR

# build image
docker build --tag ploomber-airflow .

# start
docker run -i -t -p 8080:8080 --privileged=true \
    -v /var/run/docker.sock:/var/run/docker.sock \
    --volume $SHARED_DIR:/mnt/shared-folder \
    --env SHARED_DIR \
    --env PLOOMBER_STATS_ENABLED=false \
    ploomber-airflow /bin/bash

Note

We need to run docker run in privileged mode since we’ll be running docker commands inside the container. More on that here.

Create Kubernetes cluster

By default, the Airflow integration exports each task in your pipeline as an Airflow task using the KubernetesPodOperator, so we need to create a Kubernetes cluster to run the example.

The Docker image comes with k3d pre-installed; let’s create a cluster:

# create cluster
k3d cluster create mycluster --volume $SHARED_DIR:/host

# check cluster
kubectl get nodes

Get sample Ploomber pipeline

# get example
ploomber examples -n templates/ml-intermediate -o ml-intermediate
cd ml-intermediate

# soopervisor expects a dependency lock file; for this example, a copy of the spec will do
cp environment.yml environment.lock.yml
# configure development environment
pip install ploomber soopervisor
pip install -r requirements.txt

Configure target platform

# add a new target platform
soopervisor add training --backend airflow

Usually, you’d manually edit soopervisor.yaml to configure your environment; for this example, let’s use one that we already configured, which tells soopervisor to mount a local directory to every pod so we can review results later:

cp ../soopervisor-airflow.yaml soopervisor.yaml
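
For reference, the pre-configured file looks roughly like this (an illustrative sketch; the exact contents are in soopervisor-airflow.yaml in the repository, and the volume spec follows the Kubernetes volume syntax):

training:
  backend: airflow
  mounted_volumes:
    # mount the folder we shared with the k3d cluster into every pod
    - name: shared-folder
      spec:
        hostPath:
          path: /host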

We also need to configure the project to store all outputs in the shared folder, so we copy another pre-configured file:

cp ../env-airflow.yaml env.yaml
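
The env.yaml file parametrizes the pipeline (for example, where products are written). A hypothetical example that points products to the shared folder might look like the snippet below; the actual keys in env-airflow.yaml depend on which placeholders the pipeline’s pipeline.yaml declares:

# hypothetical placeholder used by pipeline.yaml to build product paths
products: /mnt/shared-folder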

Submit pipeline

soopervisor export training --skip-tests --ignore-git

# import image to the cluster
k3d image import ml-intermediate:latest --cluster mycluster

Note

k3d image import is only required if creating the cluster with k3d.

Once the export process finishes, you’ll see a new training/ folder with two files: ml-intermediate.py (Airflow DAG) and ml-intermediate.json (DAG structure).
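
The .py file is a regular Airflow DAG definition that loads the .json spec and creates one Airflow task per pipeline task. A simplified sketch of what it roughly does (the generated file differs in details such as default arguments and the exact operator parameters):

import json
from datetime import datetime
from pathlib import Path

from airflow import DAG
# import path may vary with the cncf-kubernetes provider version
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# load the DAG structure exported by soopervisor (image name, tasks, commands)
spec = json.loads(Path('/root/airflow/dags/ml-intermediate.json').read_text())

dag = DAG('ml-intermediate', start_date=datetime(2022, 1, 1), schedule_interval=None)

tasks = {}

for task in spec['tasks']:
    # each pod runs the task's command inside the ml-intermediate Docker image
    tasks[task['name']] = KubernetesPodOperator(
        image=spec['image'],
        cmds=['bash', '-c', task['command']],
        task_id=task['name'],
        name=task['name'],
        namespace='default',
        dag=dag,
    )

# wire dependencies (assuming the spec lists each task's upstream tasks)
for task in spec['tasks']:
    for name in task.get('upstream', []):
        tasks[name] >> tasks[task['name']]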

Customizing Airflow DAG

The .py file generated by soopervisor export contains the logic to convert our pipeline into an Airflow DAG with basic defaults. However, we can further customize it; in our case, we need to pass a few extra initialization parameters to the generated KubernetesPodOperator tasks. Execute the following command to replace the generated file with one that has the appropriate settings:

cp ../ml-intermediate.py training/ml-intermediate.py
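
The kind of setting that file adds is extra arguments on each KubernetesPodOperator so that every pod mounts the shared folder. An illustrative sketch (assuming a cncf-kubernetes provider version that accepts kubernetes client model objects; the mount path below is hypothetical and must match where the pipeline writes its products):

from kubernetes.client import models as k8s

# hostPath volume pointing at the folder we mounted into the k3d cluster
volume = k8s.V1Volume(
    name='shared-folder',
    host_path=k8s.V1HostPathVolumeSource(path='/host'),
)

# hypothetical mount path inside each pod
volume_mount = k8s.V1VolumeMount(name='shared-folder',
                                 mount_path='/mnt/shared-folder')

# then, pass volumes=[volume] and volume_mounts=[volume_mount] when
# instantiating each KubernetesPodOperator in the generated file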

Submitting pipeline

To execute the pipeline, move the generated files to your AIRFLOW_HOME. For this example, AIRFLOW_HOME is /root/airflow:

mkdir -p /root/airflow/dags
cp training/ml-intermediate.py ~/airflow/dags
cp training/ml-intermediate.json ~/airflow/dags

ls /root/airflow/dags

If everything is working, you should see the ml-intermediate DAG listed:

airflow dags list

Let’s start the airflow UI and scheduler:

bash /start_airflow.sh
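
That script is specific to this Docker image; roughly speaking, it launches the webserver and the scheduler in the background and writes their output to the log files mentioned in the note further below (a sketch, the actual script may differ):

# start the web UI (http://localhost:8080) and the scheduler in the background
airflow webserver --port 8080 > /airflow-webserver.log 2>&1 &
airflow scheduler > /airflow-scheduler.log 2>&1 &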

Let’s unpause the DAG and then trigger a run:

airflow dags unpause ml-intermediate

After unpausing, you should see the following message:

Dag: ml-intermediate, paused: False

If you don’t, the Airflow scheduler likely isn’t ready yet; wait a few seconds and try again.

Trigger execution:

airflow dags trigger ml-intermediate

Congratulations! You just ran Ploomber on Airflow! 🎉

Note

If you encounter issues with Airflow, you can find the logs at /airflow-scheduler.log and /airflow-webserver.log.

Monitoring execution status

You may track execution progress from Airflow’s UI by opening http://localhost:8080 (Username: ploomber, Password: ploomber).

Alternatively, you can check the status with the following command:

airflow dags state ml-intermediate {TIMESTAMP}

The TIMESTAMP is printed when you run airflow dags trigger ml-intermediate; for example, after executing the command, you’ll see something like this in the console:

Created <DagRun ml-intermediate @ 2022-01-02T18:05:19+00:00: manual__2022-01-02T18:05:19+00:00, externally triggered: True>

Then, you can get the execution status with:

airflow dags state ml-intermediate 2022-01-02T18:05:19+00:00

Incremental builds

Try exporting the pipeline again:

soopervisor export training --skip-tests --ignore-git

You’ll see a message like this: Loaded DAG in 'incremental' mode has no tasks to submit. Soopervisor checks the status of your pipeline and only schedules tasks that have changed since the last run; since all your tasks are the same, there is nothing to run!

Let’s now modify one of the tasks and submit again:

# modify the fit.py task, add a print statement
echo -e "\nprint('Hello from Kubernetes')" >> fit.py

# re-build docker image
soopervisor export training --skip-tests --ignore-git

# import image
k3d image import ml-intermediate:latest --cluster mycluster

# copy files to the dags directory
cp training/ml-intermediate.py ~/airflow/dags
cp training/ml-intermediate.json ~/airflow/dags

# trigger execution
airflow dags trigger ml-intermediate

If you open the UI, you’ll see that this time only the fit task ran, because that’s the only task whose source code changed. We call this incremental builds, and they’re a great feature for quickly running experiments in your pipeline, such as changing model hyperparameters or adding new pre-processing methods; they save a lot of time since you don’t have to execute the entire pipeline every time.

Clean up

To delete the cluster:

k3d cluster delete mycluster

Using the DockerOperator

If you prefer, you can swap the KubernetesPodOperator for the DockerOperator. Edit the generated .py file:

# ...
# ...

from airflow.providers.docker.operators.docker import DockerOperator

# ...
# ...

# create one DockerOperator per task in the spec exported by soopervisor
for task in spec['tasks']:
    DockerOperator(image=spec['image'],
                   command=task['command'],
                   dag=dag,
                   task_id=task['name'],
                   # other arguments you may want...
                   )

Attention

Due to a bug in the DockerOperator, we must set enable_xcom_pickling = True in the airflow.cfg file. By default, this file is located at ~/airflow/airflow.cfg.
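
That is, open ~/airflow/airflow.cfg and set the flag under the [core] section:

[core]
enable_xcom_pickling = True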