Conditionally pick an Airflow DAG branch using an SQL query
In addition to the
BranchPythonOperator, which lets us execute a Python function that returns the ids of the subsequent tasks that should run, we can also use a SQL query to choose a branch. Of course, we will not do it by querying the SQL database in the Python function. There is a shorter way.
We need to add a
BranchSQLOperator to our DAG. This operator is a little bit different than the
BranchPythonOperator. In the case of the Python operator, the function returns the ids of the tasks to run. The SQL version of the operator expects a boolean value in the first column of the first row. Optionally, it can also return a numeric. Every non-zero value is interpreted as True. Zero = False. We can also use one of the string values with a boolean equivalent (take a look at the documentation if you want to use a string value).
If the value is true, Airflow will execute the tasks specified in the
follow_task_ids_if_true parameter. Otherwise, it runs the tasks defined in the
1 2 3 4 5 6 7 8 operator = BranchSQLOperator( task_id="some_task_id", conn_id="sql_connection_id", sql="SELECT count(1) FROM a_table", follow_task_ids_if_true="task_to_execute_if_count_at_least_one", follow_task_ids_if_false="tasks_to_execute_if_counts_is_zero", dag=dag )
Did you enjoy reading this article?
Would you like to learn more about software craft in data engineering and MLOps?
Subscribe to the newsletter or add this blog to your RSS reader (does anyone still use them?) to get a notification when I publish a new essay!
You may also like
- Data/MLOps engineer by day
- DevRel/copywriter by night
- Python and data engineering trainer
- Conference speaker
- Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
- Twitter: @mikulskibartosz