Airflow#
Important
This tutorial requires soopervisor 0.6.1 or higher.
Note
This tutorial exports an Airflow DAG using the KubernetesPodOperator; to use alternative Operators, see the Airflow cookbook.
Got questions? Reach out to us on Slack.
This tutorial shows you how to export a Ploomber pipeline to Airflow.
If you encounter any issues with this tutorial, let us know.
Pre-requisites#
Building Docker image#
We provide a Docker image so you can quickly run this example:
# get repository
git clone https://github.com/ploomber/soopervisor
cd soopervisor/tutorials/airflow
# create a directory to store the pipeline output
export SHARED_DIR=$HOME/ploomber-airflow
mkdir -p $SHARED_DIR
# build image
docker build --tag ploomber-airflow .
# start
docker run -i -t -p 8080:8080 --privileged=true \
-v /var/run/docker.sock:/var/run/docker.sock \
--volume $SHARED_DIR:/mnt/shared-folder \
--env SHARED_DIR \
--env PLOOMBER_STATS_ENABLED=false \
ploomber-airflow /bin/bash
Note
We need to run docker run in privileged mode since we’ll be running docker commands inside the container. More on that here.
Create Kubernetes cluster#
By default, the Airflow integration exports each task in your pipeline as an Airflow task using the KubernetesPodOperator, so we need to create a Kubernetes cluster to run the example.
The Docker image comes with k3d pre-installed; let’s create a cluster:
# create cluster
k3d cluster create mycluster --volume $SHARED_DIR:/host
# check cluster
kubectl get nodes
Get sample Ploomber pipeline#
# get example
ploomber examples -n templates/ml-intermediate -o ml-intermediate
cd ml-intermediate
cp requirements.txt requirements.lock.txt
# configure development environment
pip install ploomber soopervisor
pip install -r requirements.txt
Configure target platform#
# add a new target platform
soopervisor add training --backend airflow
Usually, you’d manually edit soopervisor.yaml to configure your environment; for this example, let’s use one that we already configured, which tells soopervisor to mount a local directory to every pod so we can review results later:
cp ../soopervisor-airflow.yaml soopervisor.yaml
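For context, the copied file follows soopervisor’s regular configuration layout. The sketch below is purely illustrative (the key names and values here are assumptions about a typical setup, not the contents of soopervisor-airflow.yaml; treat the copied file as the source of truth):

# hypothetical sketch of a soopervisor.yaml target; the copied file is authoritative
training:
  backend: airflow
  # no registry configured: we import the image into the k3d cluster instead of pushing it
  repository: null
  # mount the cluster's /host folder (mapped to $SHARED_DIR) into every pod
  mounted_volumes:
    - name: shared-folder
      spec:
        hostPath:
          path: /host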
We must configure the project to store all outputs in the shared folder, so we copy the pre-configured file:
cp ../env-airflow.yaml env.yaml
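A Ploomber env.yaml simply assigns values to the {{placeholders}} that pipeline.yaml references; the pre-configured file points the products at the folder mounted in every pod. A purely illustrative sketch (the key names below are hypothetical; the real ones are whatever placeholders the ml-intermediate template declares):

# hypothetical env.yaml: the actual keys depend on ml-intermediate's pipeline.yaml
sample: false
# store every task's products under the folder shared with the host
products: /mnt/shared-folder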
Submit pipeline#
soopervisor export training --skip-tests --ignore-git
# import image to the cluster
k3d image import ml-intermediate:latest --cluster mycluster
Note
k3d image import is only required if creating the cluster with k3d.
Once the export process finishes, you’ll see a new training/ folder with two files: ml-intermediate.py (Airflow DAG) and ml-intermediate.json (DAG structure).
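To give you an idea of what to expect, here is a simplified, hypothetical sketch of the pattern the generated file follows; it is not the exact code soopervisor emits, and the JSON key names ("tasks", "dependencies") and the per-pod ploomber task command are assumptions. The idea is that the .py file reads the DAG structure from the .json file and creates one KubernetesPodOperator per Ploomber task:

# simplified, hypothetical sketch: NOT the exact file generated by soopervisor
import json
from datetime import datetime
from pathlib import Path

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)

# ml-intermediate.json sits next to this file and describes the tasks and
# their upstream dependencies (key names here are assumptions)
spec = json.loads(Path(__file__).with_suffix(".json").read_text())

with DAG("ml-intermediate", start_date=datetime(2022, 1, 1),
         schedule_interval=None, catchup=False):
    # one pod per Ploomber task, all running the same Docker image
    tasks = {
        name: KubernetesPodOperator(
            task_id=name,
            name=name,
            image="ml-intermediate:latest",
            cmds=["ploomber"],
            arguments=["task", name],
        )
        for name in spec["tasks"]
    }

    # wire dependencies: upstream >> downstream
    for name, upstream in spec["dependencies"].items():
        for up in upstream:
            tasks[up] >> tasks[name]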
Customizing Airflow DAG#
The .py file generated by soopervisor export contains the logic to convert our pipeline into an Airflow DAG with basic defaults. However, we can further customize it. In our case, we need some initialization parameters in the generated KubernetesPodOperator tasks. Execute the following command to replace the generated file with one that has the appropriate settings:
cp ../ml-intermediate.py training/ml-intermediate.py
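For reference, this is the kind of tweak such a replacement can make. The sketch below is illustrative only, not the contents of ../ml-intermediate.py: it shows one plausible initialization parameter, mounting the cluster’s /host folder (which k3d maps to $SHARED_DIR) into each pod so tasks can write to the shared directory:

# illustrative sketch only: NOT the contents of ../ml-intermediate.py,
# just an example of passing extra initialization parameters to the operator
from datetime import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import (
    KubernetesPodOperator,
)
from kubernetes.client import models as k8s

# the k3d cluster maps $SHARED_DIR to /host on the node (see cluster creation),
# so mounting /host inside the pod exposes the shared folder to each task
volume = k8s.V1Volume(
    name="shared-folder",
    host_path=k8s.V1HostPathVolumeSource(path="/host"),
)
volume_mount = k8s.V1VolumeMount(name="shared-folder",
                                 mount_path="/mnt/shared-folder")

with DAG("example", start_date=datetime(2022, 1, 1),
         schedule_interval=None, catchup=False):
    fit = KubernetesPodOperator(
        task_id="fit",
        name="fit",
        image="ml-intermediate:latest",
        cmds=["ploomber"],
        arguments=["task", "fit"],
        # extra initialization parameters: mount the shared folder in the pod
        volumes=[volume],
        volume_mounts=[volume_mount],
    )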
Submitting pipeline#
To execute the pipeline, move the generated files to your AIRFLOW_HOME. For this example, AIRFLOW_HOME is /root/airflow:
mkdir -p /root/airflow/dags
cp training/ml-intermediate.py ~/airflow/dags
cp training/ml-intermediate.json ~/airflow/dags
ls /root/airflow/dags
If everything is working, you should see the ml-intermediate DAG here:
airflow dags list
Let’s start the Airflow UI and scheduler (this will take a few seconds):
bash /start_airflow.sh
Let’s unpause the DAG and then trigger the run:
airflow dags unpause ml-intermediate
After unpausing, you should see the following message:
Dag: ml-intermediate, paused: False
If you don’t, the Airflow scheduler likely isn’t ready yet; wait a few seconds and try again.
Trigger execution:
airflow dags trigger ml-intermediate
Congratulations! You just ran Ploomber on Airflow! 🎉
Note
If you encounter issues with Airflow, you can find the logs at /airflow-scheduler.log and /airflow-webserver.log.
Monitoring execution status#
You may track execution progress from Airflow’s UI by opening http://localhost:8080 (Username: ploomber, Password: ploomber).
Alternatively, use the following command:
airflow dags state ml-intermediate {TIMESTAMP}
The TIMESTAMP is printed when you run airflow dags trigger ml-intermediate; for example, once you execute the airflow dags trigger command, you’ll see something like this in the console:
Created <DagRun ml-intermediate @ 2022-01-02T18:05:19+00:00: manual__2022-01-02T18:05:19+00:00, externally triggered: True>
Then, you can get the execution status with:
airflow dags state ml-intermediate 2022-01-02T18:05:19+00:00
Incremental builds#
Try exporting the pipeline again:
soopervisor export training --skip-tests --ignore-git
You’ll see a message like this: Loaded DAG in 'incremental' mode has no tasks to submit.
Soopervisor checks the status of your pipeline and only schedules tasks that have changed
since the last run; since all your tasks are the same, there is nothing to run!
Let’s now modify one of the tasks and submit again:
# modify the fit.py task, add a print statement
echo -e "\nprint('Hello from Kubernetes')" >> fit.py
# re-build docker image
soopervisor export training --skip-tests --ignore-git
# import image
k3d image import ml-intermediate:latest --cluster mycluster
# copy files to the dags directory
cp training/ml-intermediate.py ~/airflow/dags
cp training/ml-intermediate.json ~/airflow/dags
# trigger execution
airflow dags trigger ml-intermediate
If you open the UI, you’ll see that this time, only the fit task ran because that’s the only task whose source code changed. We call this incremental builds: they’re a great feature for quickly running experiments in your pipeline, such as changing model hyperparameters or adding new pre-processing methods, and they save a lot of time since you don’t have to execute the entire pipeline every time.
Clean up#
To delete the cluster:
k3d cluster delete mycluster
Using other Operators#
If you want to generate Airflow DAGs using other operators, check out the Airflow cookbook.