How to conditionally skip tasks in an Airflow DAG

The more I use Airflow, the stranger the use cases I find. Recently, I implemented a DAG that we are going to use for data backfills.

The problems started with a simple question: what is the DAG supposed to do when there is nothing to backfill?

Sure, we can manually pause the DAG when we don’t need it, but I am willing to bet that someone will forget to pause it, and it will run when it is not supposed to do anything.

Another way to avoid problems is to make sure that all tasks in the DAG don’t break anything if the output already exists. It’s not ideal for two reasons.

First, the sole purpose of a backfill DAG is to have an easy way to overwrite files when we need to fix something. It is supposed to destroy the existing data and replace it with the freshly calculated values.

Second, at some point, someone is going to add another task to that DAG. I will bet again that the author of that task is not going to read the documentation or the source code of the other tasks before making the change.

Use XCom to store the backfill date

I am not using the backfill feature available in Airflow because I need to pass more parameters than just an execution date.

Also, it annoys me that the backfill feature does not schedule all DAG runs at once. I want to run a single script that sets up the backfill and then do nothing more. Fire and forget. No long-running screen sessions with an SSH connection to Airflow to make sure that the backfill schedules all dates.

Because of that, my backfill DAG reads the execution date from a DynamoDB table. Of course, after reading the backfill parameters, it marks that particular backfill as started in DynamoDB to avoid reprocessing in case of DAG failures.

As you saw in the subsection title, I use the XCom feature to store the backfill parameters. We can do it in two ways. I can return a value from the function running in the PythonOperator, in which case Airflow automatically stores it in XCom under the return_value key of that task. Alternatively, I can explicitly call the xcom_push function inside my function.

Let me show you an example in which I use both methods. Note that I need access to the task instance, so the function run by the PythonOperator takes the Airflow context as a parameter, and I have to set provide_context to True.

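# Airflow 1.10 import path; in Airflow 2.x the module is airflow.operators.python
from airflow.operators.python_operator import PythonOperator

def some_function(**kwargs):
    # some other code
    task_instance = kwargs['task_instance']
    # Explicit push under a custom key
    task_instance.xcom_push(key='backfill_param_1', value='value')
    ...
    # The returned value is stored in XCom under the key 'return_value'
    return backfill_date

PythonOperator(
    task_id='retrieve_next_backfill_date',
    python_callable=some_function,
    provide_context=True,
    dag=dag)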
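To see where both values end up without a running Airflow instance, the function can be exercised with a stand-in task instance. This is only a sketch under assumptions: FakeTaskInstance, run_like_python_operator, and the sample date are hypothetical names and values, and the stub merely mimics the way Airflow stores a non-None return value under the return_value key.

```python
class FakeTaskInstance:
    """Hypothetical in-memory stand-in for a task instance (not an Airflow class)."""

    def __init__(self):
        self.xcom = {}

    def xcom_push(self, key, value):
        self.xcom[key] = value


def some_function(**kwargs):
    task_instance = kwargs['task_instance']
    # Explicit push under a custom key
    task_instance.xcom_push(key='backfill_param_1', value='value')
    # Sample value for illustration; the real function reads it from DynamoDB
    backfill_date = '2020-01-01'
    return backfill_date


def run_like_python_operator(python_callable, task_instance):
    """Mimic the operator: call the function, then store a non-None result as 'return_value'."""
    result = python_callable(task_instance=task_instance)
    if result is not None:
        task_instance.xcom_push(key='return_value', value=result)
    return result


ti = FakeTaskInstance()
run_like_python_operator(some_function, ti)
print(ti.xcom)
```

Both entries land in the same store, only under different keys, which is why the downstream task has to know the key it wants to pull.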

Stopping DAG execution conditionally

Now I have a function that returns the next backfill date, or None if there is nothing to backfill. I am going to read the return_value and check whether it is None. If the backfill date is not defined, I have to stop the current DAG run.

In Airflow, sensors wait until a desired external state is observed before they let the downstream tasks run. In this case, I am going to use the PythonSensor, which runs a Python function and continues running the DAG if the value returned by that function is truthy - boolean True or anything that produces True after being cast to a boolean.

Once again, my function needs access to the Airflow context to use XCom functions. I also need the name of the task that stored the variable in XCom and the variable name.

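# Airflow 1.10 import path; in Airflow 2.x the module is airflow.sensors.python
from airflow.contrib.sensors.python_sensor import PythonSensor

def should_continue_backfill(**kwargs):
    task_instance = kwargs['task_instance']
    # Read the value returned by the retrieve_next_backfill_date task
    backfill_date = task_instance.xcom_pull(
        task_ids='retrieve_next_backfill_date',
        key='return_value'
    )
    # None (nothing to backfill) is falsy, so the sensor does not continue
    return backfill_date

PythonSensor(
    task_id='should_continue_backfill',
    python_callable=should_continue_backfill,
    provide_context=True,
    timeout=10,
    dag=dag)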
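The truthiness rule is easy to check outside Airflow. The following is a minimal sketch that replaces the task instance with a Mock from the standard library; poke_result is a hypothetical helper and the sample date is made up.

```python
from unittest.mock import Mock


def should_continue_backfill(**kwargs):
    task_instance = kwargs['task_instance']
    backfill_date = task_instance.xcom_pull(
        task_ids='retrieve_next_backfill_date',
        key='return_value'
    )
    return backfill_date


def poke_result(stored_value):
    """Hypothetical helper: how the sensor would interpret a given XCom value."""
    task_instance = Mock()
    task_instance.xcom_pull.return_value = stored_value
    return bool(should_continue_backfill(task_instance=task_instance))


print(poke_result('2020-01-01'))  # a backfill date is present
print(poke_result(None))          # nothing to backfill
print(poke_result(''))            # an empty string is falsy as well
```

Note that any falsy value stops the run, not only None, so an empty string stored by mistake would behave like "nothing to backfill".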

Skip DAG instead of failing the run

By default, the sensor either continues the DAG or marks the DAG execution as failed. This is not what I want. If the DAG has nothing to backfill, it should skip all the remaining tasks, not fail the DAG. Fortunately, there is a simple configuration parameter that changes the sensor behavior. When I set soft_fail to True, it skips the tasks instead of marking them as failed.

PythonSensor waits for too long

The default BaseSensorOperator parameters (inherited by PythonSensor) are not ideal in this case. By default, it calls the Python function every minute for a week before it gives up! That would be ok if I were querying the DynamoDB table inside the function called by the PythonSensor. I don’t want to do that, so the sensor must not wait for that long.

To change the maximum waiting time, I set the timeout parameter to 10 seconds. Strangely, the PythonSensor still waits over a minute before it skips the tasks. It turns out that the timeout is checked only after the function is called, so if my function returns False, the task sleeps for 60 seconds before it re-evaluates the condition and notices that the timeout has passed (the sleep time is configurable using the poke_interval parameter, 60 seconds by default).
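The interaction between timeout, poke_interval, and soft_fail can be illustrated with a small simulation. This is a simplified model of the poke loop with a fake clock, not Airflow's actual code; simulate_sensor and nothing_to_backfill are hypothetical names.

```python
def simulate_sensor(poke, timeout, poke_interval, soft_fail=False):
    """Simplified model of a sensor's poke loop using a fake clock (no real sleeping).

    Returns (outcome, elapsed_seconds, number_of_pokes).
    """
    elapsed = 0
    pokes = 0
    while True:
        pokes += 1
        if poke():
            return 'success', elapsed, pokes
        # The timeout is checked only after a failed poke
        if elapsed > timeout:
            return ('skipped' if soft_fail else 'failed'), elapsed, pokes
        # Otherwise the sensor sleeps for a full poke_interval
        elapsed += poke_interval


def nothing_to_backfill():
    return False


# Default poke_interval: the first check happens at 0 s, the next one only at 60 s
print(simulate_sensor(nothing_to_backfill, timeout=10, poke_interval=60, soft_fail=True))
# A shorter poke_interval lets the sensor notice the timeout sooner
print(simulate_sensor(nothing_to_backfill, timeout=10, poke_interval=5, soft_fail=True))
```

In this model, a 10-second timeout combined with the default 60-second poke_interval ends after 60 seconds and two pokes, while a 5-second poke_interval ends after 15 seconds. One option, then, would be to pass a poke_interval smaller than the timeout to the sensor.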
