Dependencies between DAGs: How to wait until another DAG finishes in Airflow?

In this article, I am going to show how to set up dependencies between two DAGs. Imagine that I have a DAG that dumps data from production databases and another DAG that aggregates the raw data and pushes the result into a reporting database.

I want the second DAG to run when the first one finishes, but I don’t want to move its tasks into the first DAG because that would make a mess in the configuration.

Airflow does not let us set up dependencies between DAGs explicitly, but we can use Sensors to postpone the start of the second DAG until the first one successfully finishes.

ExternalTaskSensor

To configure the sensor, we need the identifier of the other DAG (we will wait until that DAG finishes). We can also specify the identifier of a task within that DAG if we want to wait for a single task. If we want to wait for the whole DAG, we must set external_task_id=None.

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag = DAG('dependency_dag', description='DAG with sensor', schedule_interval='* * * * *',
          start_date=datetime(2019, 7, 10))

# Wait for the whole 'another_dag_id' DAG run (external_task_id=None).
# The 'reschedule' mode releases the worker slot between checks instead of blocking it.
sensor = ExternalTaskSensor(task_id='dag_sensor',
                            external_dag_id='another_dag_id',
                            external_task_id=None,
                            mode='reschedule',
                            dag=dag)

task = DummyOperator(task_id='some_task', retries=1, dag=dag)

task.set_upstream(sensor)

By default, the sensor waits for a successful run of the external DAG with the same execution date as the current DAG run. We can also specify the accepted states and a time delta, so it is possible to react when a DAG fails or to depend on a run scheduled an hour earlier. For details, look at the allowed_states and execution_delta parameters in the documentation.
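For example, here is a minimal sketch that uses both parameters, assuming the upstream DAG runs hourly and its runs are scheduled one hour ahead of this DAG's runs (the DAG id, schedule, and states below are illustrative, not a recommendation):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag = DAG('delta_dependency_dag', schedule_interval='@hourly',
          start_date=datetime(2019, 7, 10))

# Illustrative values: treat both 'success' and 'failed' as terminal states,
# and match the external DAG run scheduled one hour before this run.
sensor = ExternalTaskSensor(task_id='delta_dag_sensor',
                            external_dag_id='another_dag_id',
                            external_task_id=None,
                            allowed_states=['success', 'failed'],
                            execution_delta=timedelta(hours=1),
                            mode='reschedule',
                            dag=dag)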


How to use sensors

The sensor is just another type of task, so I create a new DAG which begins with a sensor. In the default configuration, the sensor checks the dependency status every minute.
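If once a minute is too often (or not often enough), the poke_interval parameter controls the check frequency (60 seconds by default), and timeout caps how long the sensor keeps trying (seven days by default). A sketch with illustrative values:

# Check every five minutes and give up after two hours; the numbers are
# illustrative, not recommendations.
sensor = ExternalTaskSensor(task_id='dag_sensor',
                            external_dag_id='another_dag_id',
                            external_task_id=None,
                            poke_interval=5 * 60,  # seconds between checks
                            timeout=2 * 60 * 60,   # fail the sensor after two hours
                            mode='reschedule',
                            dag=dag)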

After I configure the sensor, I should specify the rest of the tasks in the DAG. As I wrote in the previous paragraph, we use sensors like regular tasks, so I connect the task with the sensor using the set_upstream method. I do it in the last line:

task.set_upstream(sensor)
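Equivalently, Airflow supports the bitshift syntax for declaring the same dependency:

sensor >> task  # same as task.set_upstream(sensor)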
