How to conditionally skip tasks in an Airflow DAG
The more I use Airflow, the stranger the use cases I find. Recently, I implemented a DAG that we are going to use for data backfills.
The problems started with a simple question: what is the DAG supposed to do when there is nothing to backfill?
Sure, we can manually disable the DAG when we don’t need it, but I am willing to bet that someone will forget to pause it, and it will run when it is not supposed to do anything.
Another way to avoid problems is to make sure that all tasks in the DAG don’t break anything if the output already exists. It’s not ideal for two reasons.
First, the sole purpose of a backfill DAG is to have an easy way to overwrite files when we need to fix something. It is supposed to destroy the existing data and replace it with the freshly calculated values.
Second, at some point, someone is going to add another task to that DAG. I will bet my money again that the author of that task is not going to read the documentation or the other tasks’ source code before making the change.
Use XCom to store the backfill date
I am not using the backfill feature available in Airflow because I need to pass more parameters than just an execution date.
Also, it annoys me that the backfill feature does not schedule all DAG runs at once. I want to run a single script that sets up the backfill and does nothing more. Fire and forget. No long-running screen sessions with an SSH connection to Airflow to make sure that the backfill schedules all dates.
Because of that, my backfill DAG reads the execution date from a DynamoDB table. Of course, after reading the backfill parameters, it marks the particular backfill as started in DynamoDB to avoid reprocessing in case of DAG failures.
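The item layout and attribute names below are my illustrative assumptions; this minimal in-memory sketch stands in for the DynamoDB table and only shows the selection-and-marking logic:

```python
from datetime import date

# Hypothetical stand-in for the DynamoDB table; each item holds the
# backfill parameters plus a status flag (attribute names are assumed).
backfill_queue = [
    {'backfill_date': date(2020, 3, 2), 'status': 'pending'},
    {'backfill_date': date(2020, 3, 1), 'status': 'pending'},
]

def retrieve_next_backfill(queue):
    """Return the earliest pending backfill and mark it as started,
    so a failed DAG run cannot pick up the same date again."""
    pending = [item for item in queue if item['status'] == 'pending']
    if not pending:
        return None  # nothing to backfill
    next_item = min(pending, key=lambda item: item['backfill_date'])
    next_item['status'] = 'started'
    return next_item
```

In the real DAG, the read and the status update would be DynamoDB calls, ideally with a conditional write so two runs cannot claim the same backfill.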
As you saw in the subsection title, I use the XCom feature to store the backfill parameters. We can do it in two ways. I can either return a value from the function running in the `PythonOperator`, which automatically assigns it to the `return_value` variable of the task, or explicitly call the `xcom_push` function inside my function.
Let me show you an example in which I use both methods. Note that I need access to the task instance, so the function run by the `PythonOperator` takes the Airflow context as a parameter, and I have to set `provide_context` to `True`.
```python
def some_function(**kwargs):
    # some other code
    task_instance = kwargs['task_instance']
    task_instance.xcom_push(key='backfill_param_1', value='value')
    ...
    return backfill_date


PythonOperator(
    task_id='retrieve_next_backfill_date',
    python_callable=some_function,
    provide_context=True,
    dag=dag)
```
Stopping DAG execution conditionally
Now, I have the function that returns the next backfill date or None if there is nothing to backfill. I am going to read the `return_value` and check whether it is None. If the backfill date is not defined, I have to stop the current DAG run.
In Airflow, we have Sensors to trigger tasks when we observe a desired external state. In this case, I am going to use the `PythonSensor`, which runs a Python function and continues running the DAG if the value returned by that function is truthy: boolean True or anything that produces True after being cast to a boolean.
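The truthiness rule is worth spelling out, because the sensor function does not have to return a boolean. A sketch (not Airflow internals, just the casting behavior):

```python
def sensor_continues(poke_result):
    # PythonSensor casts the callable's return value to a boolean;
    # the DAG continues only when that result is truthy.
    return bool(poke_result)

# A non-empty backfill date string lets the DAG continue:
assert sensor_continues('2020-03-01')
# None (nothing to backfill) does not:
assert not sensor_continues(None)
```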
Once again, my function needs access to the Airflow context to use XCom functions. I also need the name of the task that stored the variable in XCom and the variable name.
```python
def should_continue_backfill(**kwargs):
    task_instance = kwargs['task_instance']
    backfill_date = task_instance.xcom_pull(
        'retrieve_next_backfill_date',
        key='return_value'
    )
    return backfill_date


PythonSensor(
    task_id='should_continue_backfill',
    python_callable=should_continue_backfill,
    provide_context=True,
    timeout=10,
    dag=dag)
```
Skip DAG instead of failing the run
By default, the sensor either continues the DAG or marks the DAG execution as failed. This is not what I want. If the DAG has nothing to backfill, it should skip all the remaining tasks, not fail the DAG. Fortunately, there is a simple configuration parameter that changes the sensor behavior. When I set `soft_fail` to `True`, it skips the tasks instead of marking them as failed.
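With that flag, the sensor definition from the previous snippet becomes (same task names as above):

```python
PythonSensor(
    task_id='should_continue_backfill',
    python_callable=should_continue_backfill,
    provide_context=True,
    soft_fail=True,  # skip the remaining tasks instead of failing the run
    timeout=10,
    dag=dag)
```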
PythonSensor waits for too long
The `BaseSensorOperator` parameters (inherited by `PythonSensor`) are not ideal in this case. By default, it calls the Python function every minute for a week before it gives up! That would be OK if I were querying the DynamoDB table inside the function called by the `PythonSensor`. I don’t want to do that, so the sensor must not wait that long.
To change the maximum waiting time, I set the `timeout` parameter to 10 seconds. Strangely, the `PythonSensor` still waits over a minute before it skips the tasks. It turns out that the timeout is evaluated after the function is called, so if my function returns False, the task sleeps for 60 seconds before it re-evaluates the condition (configurable using the `poke_interval` parameter, 60 seconds by default).
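A rough model of that behavior (my assumption, based on the observation above, is that the timeout is compared against the elapsed time only after each poke):

```python
def skip_delay(timeout, poke_interval=60):
    """Approximate seconds the sensor runs before giving up: after
    every falsy poke it sleeps a full poke_interval, and only then
    checks whether the timeout has been exceeded."""
    elapsed = 0
    while elapsed <= timeout:   # a poke happens here
        elapsed += poke_interval  # then the sensor sleeps
    return elapsed

# With timeout=10 and the default poke_interval, the sensor still
# waits a full minute: skip_delay(10) == 60.
```

Lowering `poke_interval` alongside `timeout` is what actually shortens the wait.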