Airflow
=======

.. important::

    This tutorial requires soopervisor ``0.6.1`` or higher

.. note::

    This tutorial exports an Airflow DAG using the ``KubernetesPodOperator``;
    to use alternative Operators, see
    :doc:`Airflow cookbook <../cookbook/airflow>`.

**Got questions?** Reach out to us on Slack.

This tutorial shows you how to export a Ploomber pipeline to Airflow.

If you encounter any issues with this tutorial, let us know.

Pre-requisites
--------------

* docker

Building Docker image
---------------------

We provide a Docker image so you can quickly run this example:

.. code-block:: bash

    # get repository
    git clone https://github.com/ploomber/soopervisor
    cd soopervisor/tutorials/airflow

    # create a directory to store the pipeline output
    export SHARED_DIR=$HOME/ploomber-airflow
    mkdir -p $SHARED_DIR

    # build image
    docker build --tag ploomber-airflow .

    # start
    docker run -i -t -p 8080:8080 --privileged=true \
        -v /var/run/docker.sock:/var/run/docker.sock \
        --volume $SHARED_DIR:/mnt/shared-folder \
        --env SHARED_DIR \
        --env PLOOMBER_STATS_ENABLED=false \
        ploomber-airflow /bin/bash

.. note::

    We need to run ``docker run`` in privileged mode since we'll be running
    ``docker`` commands inside the container.

Create Kubernetes cluster
-------------------------

By default, the Airflow integration exports each task in your pipeline as an
Airflow task using the ``KubernetesPodOperator``, so we need to create a
Kubernetes cluster to run the example.

The Docker image comes with ``k3d`` pre-installed; let's create a cluster:

.. code-block:: bash

    # create cluster
    k3d cluster create mycluster --volume $SHARED_DIR:/host

    # check cluster
    kubectl get nodes

Get sample Ploomber pipeline
-----------------------------

.. code-block:: bash

    # get example
    ploomber examples -n templates/ml-intermediate -o ml-intermediate
    cd ml-intermediate

    cp requirements.txt requirements.lock.txt

    # configure development environment
    pip install ploomber soopervisor
    pip install -r requirements.txt

Configure target platform
-------------------------

.. code-block:: bash

    # add a new target platform
    soopervisor add training --backend airflow

Usually, you'd manually edit ``soopervisor.yaml`` to configure your
environment; for this example, let's use one that we already configured, which
tells soopervisor to mount a local directory to every pod so we can review
results later:

.. code-block:: bash

    cp ../soopervisor-airflow.yaml soopervisor.yaml

We must configure the project to store all outputs in the shared folder, so we
copy the pre-configured file:

.. code-block:: bash

    cp ../env-airflow.yaml env.yaml

Submit pipeline
---------------

.. code-block:: bash

    soopervisor export training --skip-tests --ignore-git

    # import image to the cluster
    k3d image import ml-intermediate:latest --cluster mycluster

.. note::

    ``k3d image import`` is only required if you created the cluster with
    ``k3d``.

Once the export process finishes, you'll see a new ``training/`` folder with
two files: ``ml-intermediate.py`` (Airflow DAG) and ``ml-intermediate.json``
(DAG structure).

Customizing Airflow DAG
-----------------------

The ``.py`` file generated by ``soopervisor export`` contains the logic to
convert our pipeline into an Airflow DAG with basic defaults. However, we can
further customize it. In our case, we need some initialization parameters in
the generated ``KubernetesPodOperator`` tasks. Execute the following command
to replace the generated file with one that has the appropriate settings:

.. code-block:: bash

    cp ../ml-intermediate.py training/ml-intermediate.py

Submitting pipeline
-------------------

To execute the pipeline, move the generated files to your ``AIRFLOW_HOME``.
For this example, ``AIRFLOW_HOME`` is ``/root/airflow``:

.. code-block:: bash

    mkdir -p /root/airflow/dags
    cp training/ml-intermediate.py ~/airflow/dags
    cp training/ml-intermediate.json ~/airflow/dags

    ls /root/airflow/dags

If everything is working, you should see the ``ml-intermediate`` DAG here:

.. code-block:: sh

    airflow dags list

Let's start the Airflow UI and scheduler (this will take a few seconds):

.. NOTE: we're starting airflow until this point because if we start it
.. at the beginning and then add the DAG, Airflow won't pick it up

.. code-block:: bash

    bash /start_airflow.sh

Let's unpause the DAG and then trigger the run:

.. code-block:: sh

    airflow dags unpause ml-intermediate

After unpausing, you should see the following message:
``Dag: ml-intermediate, paused: False``

If you don't, the Airflow scheduler likely isn't ready yet, so wait a few
seconds and try again.

Trigger execution:

.. code-block:: sh

    airflow dags trigger ml-intermediate

**Congratulations! You just ran Ploomber on Airflow! 🎉**

.. note::

    If you encounter issues with Airflow, you can find the logs at
    ``/airflow-scheduler.log`` and ``/airflow-webserver.log``.

Monitoring execution status
---------------------------

You may track execution progress from Airflow's UI by opening
http://localhost:8080 (Username: ``ploomber``, Password: ``ploomber``).

Alternatively, use the following command:

.. skip-next
.. code-block:: sh

    airflow dags state ml-intermediate {TIMESTAMP}

The TIMESTAMP shows after running ``airflow dags trigger ml-intermediate``.
For example, once you execute the ``airflow dags trigger`` command, you'll see
a line starting with ``Created`` in the console.

Then, you can get the execution status with:

.. skip-next
.. code-block:: sh

    airflow dags state ml-intermediate 2022-01-02T18:05:19+00:00

Incremental builds
------------------

Try exporting the pipeline again:

.. code-block:: bash

    soopervisor export training --skip-tests --ignore-git

You'll see a message like this:
``Loaded DAG in 'incremental' mode has no tasks to submit``. Soopervisor checks
the status of your pipeline and only schedules tasks that have changed since
the last run; since all your tasks are the same, there is nothing to run!

Let's now modify one of the tasks and submit again:

.. code-block:: bash

    # modify the fit.py task, add a print statement
    echo -e "\nprint('Hello from Kubernetes')" >> fit.py

    # re-build docker image
    soopervisor export training --skip-tests --ignore-git

    # import image
    k3d image import ml-intermediate:latest --cluster mycluster

    # copy files to the dags directory
    cp training/ml-intermediate.py ~/airflow/dags
    cp training/ml-intermediate.json ~/airflow/dags

    # trigger execution
    airflow dags trigger ml-intermediate

If you open the UI, you'll see that this time only the ``fit`` task ran,
because that's the only task whose source code changed. We call this an
incremental build. Incremental builds are useful for quickly running
experiments in your pipeline, such as changing model hyperparameters or adding
new pre-processing methods: they save a lot of time since you don't have to
execute the entire pipeline every time.

Clean up
--------

To delete the cluster:

.. code-block:: bash

    k3d cluster delete mycluster

Using other Operators
---------------------

If you want to generate Airflow DAGs using other operators, check out the
:doc:`Airflow cookbook <../cookbook/airflow>`.
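
Polling execution status
------------------------

The ``airflow dags state`` check from the monitoring step can be wrapped in a
small polling loop so you don't have to re-run it by hand. The following is a
minimal sketch, not part of soopervisor or Airflow: ``wait_for_dag_run`` is a
hypothetical helper name, and it assumes that ``airflow dags state`` prints the
run state (for example ``running``, ``success`` or ``failed``) on the last line
of its output, which may vary across Airflow versions.

.. code-block:: bash

    #!/usr/bin/env bash
    # Hypothetical helper: poll a DAG run until it succeeds, fails, or we give up.
    # Arguments: DAG id, execution timestamp, seconds between polls (default 10),
    # maximum number of polls (default 60).
    wait_for_dag_run() {
        local dag_id="$1"
        local execution_date="$2"
        local interval="${3:-10}"
        local max_attempts="${4:-60}"
        local attempt state

        for ((attempt = 1; attempt <= max_attempts; attempt++)); do
            # Assumption: the state is on the last line of the command output
            state="$(airflow dags state "$dag_id" "$execution_date" 2>/dev/null | tail -n 1)"

            case "$state" in
                success*)
                    echo "Run finished: $state"
                    return 0
                    ;;
                failed*)
                    echo "Run failed: $state" >&2
                    return 1
                    ;;
                *)
                    echo "Attempt $attempt/$max_attempts: state is '${state:-unknown}'"
                    ;;
            esac

            sleep "$interval"
        done

        echo "Timed out waiting for $dag_id" >&2
        return 2
    }

After sourcing the function, a call could look like
``wait_for_dag_run ml-intermediate 2022-01-02T18:05:19+00:00``, using the
timestamp printed by ``airflow dags trigger``. The distinct return codes
(``0`` success, ``1`` failure, ``2`` timeout) are a design choice of this
sketch so that a calling script can tell the outcomes apart.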