Conditionally pick an Airflow DAG branch using an SQL query

In addition to the BranchPythonOperator, which lets us execute a Python function that returns the ids of the tasks to run next, we can also use an SQL query to choose a branch. Of course, we will not do it by querying the database inside the Python function. There is a shorter way.
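For comparison, the BranchPythonOperator version could look roughly like the sketch below. The connection id, table name, and task ids are placeholders, and it assumes a Postgres connection queried through PostgresHook:

from airflow.operators.python import BranchPythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def choose_branch():
    # Run the query manually and return the id of the task that should run next.
    row = PostgresHook(postgres_conn_id="sql_connection_id").get_first(
        "SELECT count(1) FROM a_table"
    )
    if row and row[0] > 0:
        return "task_to_execute_if_count_at_least_one"
    return "task_to_execute_if_count_is_zero"

branching = BranchPythonOperator(
    task_id="choose_branch_manually",
    python_callable=choose_branch,
    dag=dag,
)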

We need to add a BranchSQLOperator to our DAG. This operator works a little differently from the BranchPythonOperator. In the case of the Python operator, the function returns the ids of the tasks to run. The SQL version of the operator expects a boolean value in the first column of the first row. The query may also return a numeric value: every non-zero value is interpreted as True, and zero as False. We can also use a string value with a boolean equivalent (take a look at the documentation for the accepted string values).
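For instance, any of the following queries (with placeholder table and column names) returns a value the operator can interpret:

# The names are placeholders; what matters is the type of the returned value.
sql_boolean = "SELECT EXISTS (SELECT 1 FROM a_table WHERE status = 'ready')"  # True/False
sql_numeric = "SELECT count(1) FROM a_table"  # non-zero -> True, zero -> False
sql_string = "SELECT 'true'"  # a string with a boolean equivalent (see the docs)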

If the value is true, Airflow will execute the tasks specified in the follow_task_ids_if_true parameter. Otherwise, it runs the tasks defined in the follow_task_ids_if_false argument.

# The import path may differ between Airflow versions; in newer releases
# the operator lives in the common SQL provider package.
from airflow.operators.sql import BranchSQLOperator

operator = BranchSQLOperator(
    task_id="some_task_id",
    conn_id="sql_connection_id",
    sql="SELECT count(1) FROM a_table",
    follow_task_ids_if_true="task_to_execute_if_count_at_least_one",
    follow_task_ids_if_false="task_to_execute_if_count_is_zero",
    dag=dag
)
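The ids passed to follow_task_ids_if_true and follow_task_ids_if_false must belong to direct downstream tasks of the branching operator. A minimal sketch of the wiring, using DummyOperator placeholders (called EmptyOperator in newer Airflow versions):

from airflow.operators.dummy import DummyOperator

task_if_true = DummyOperator(task_id="task_to_execute_if_count_at_least_one", dag=dag)
task_if_false = DummyOperator(task_id="task_to_execute_if_count_is_zero", dag=dag)

# Both branches are downstream of the branching operator;
# only the one whose id the query result selects will actually run.
operator >> [task_if_true, task_if_false]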