Why my Airflow tasks got stuck in "no_status" and how I fixed it

Recently, I came across an annoying problem. One of our Airflow DAGs was not scheduling tasks. The issue looked strange because it wasn't happening all the time.

Some DAG runs completed normally. In others, Airflow scheduled and ran half of the tasks, but the other half got stuck in the no_status state.

I could trigger the stuck task manually. When I did that, it ran and did its job, but the next task in the DAG still did not get scheduled.

My two mistakes

I tried to find the problem by looking at the code and comparing it with other DAGs that did not have the same issue.

Here, I made my first mistake. I could not spot the difference between the normally running DAGs and the faulty one, even though there was a difference in the DAG configuration.

I SSHed into the Airflow scheduler machine to look at the scheduler log. The scheduler was logging a ValueError because it could not parse the value of an enum in our code.

At this point, I made the second mistake. I noted it down as an issue to fix later, after I had dealt with the broken scheduler, and moved on.

Some failing code which I ignored

I did not pay attention to the error in the log because it occurred in a custom function we wrote to send notifications about errors, SLA misses, etc. to Slack channels. The function takes the type of failure and the DAG owner and figures out which Slack channel should receive the notification.

In addition to the function, we have an Owner enum which looks like this:

from enum import Enum
class Owner(Enum):
    DE_TEAM = 'DE Team'
    # other teams that get notifications from Airflow

The ValueError was raised when we tried to parse the DAG owner value: Owner(dag.owner). It was failing because, for some reason, a few of our DAGs (including the failing one) had their owner set to "Airflow, DE Team".
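
You can reproduce the error with the Owner enum above; any string that is not an exact enum value raises a ValueError:

Owner('DE Team')           # works, returns Owner.DE_TEAM
Owner('Airflow, DE Team')  # raises ValueError: 'Airflow, DE Team' is not a valid Owner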

It was strange, but again, I decided to deal with it later, after fixing the scheduling issue.

Long and fruitless debugging

In the DAG configuration, we intentionally limit the number of concurrent DAG runs and running tasks. We had set max_active_runs to 1, disabled the Airflow "catch up" feature, and limited the task concurrency to 1.
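
For context, the relevant part of the DAG definition looked roughly like this (a minimal sketch, assuming Airflow 1.x parameter names; the dag_id, start date, and schedule are made up):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='example_limited_dag',     # made-up id
    start_date=datetime(2020, 1, 1),
    schedule_interval='@daily',
    max_active_runs=1,                # only one DAG run at a time
    catchup=False,                    # don't backfill missed schedule intervals
    concurrency=1,                    # at most one running task instance for this DAG
)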

Because of that, my debugging attempts focused mostly on figuring out how those three parameters interact with each other and break task scheduling. I was sure it had to be caused by those settings. I just had to find the issue.

When my attempts to tweak the concurrency parameters failed, I started changing the DAG structure. That DAG was fairly complicated. It started with a few tasks running sequentially. After that, the tasks branched out, sharing a common upstream dependency. In the next step, the paths merged again into a common downstream task, ran some additional steps sequentially, and branched out again at the end.

The dependency definition looked like this (note that I changed the variable names; task_5 is a BranchPythonOperator that picks one of two branches, sketched right after the snippet):

first_group = [a_1, a_2, ... a_6]
second_group = [b_1, b_2, ... b_6]

task_1 >> task_2 >> task_3 >> first_group >> task_4 >> task_5 >> [end_task, second_group]
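
For illustration, task_5 looked roughly like this (a sketch with made-up decision logic; only the fact that it is a BranchPythonOperator choosing between the two branches comes from the real DAG):

from airflow.operators.python_operator import BranchPythonOperator

def _pick_branch(**kwargs):
    # BranchPythonOperator runs the task id returned here and skips the other branch
    has_more_work = kwargs['ti'].xcom_pull(task_ids='task_4')  # hypothetical flag
    return 'b_1' if has_more_work else 'end_task'

task_5 = BranchPythonOperator(
    task_id='task_5',
    python_callable=_pick_branch,
    provide_context=True,
    dag=dag_instance,
)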

I concluded that I could define it as a sequence of tasks. After all, I was not going to let multiple tasks run at the same time because that would mess up the result. So I put all of the tasks in a long sequence:

task_1 >> task_2 >> task_3 >> a_1 >> a_2 >> ...

The only branching left was the BranchPythonOperator, but the tasks in the second group were running in a sequence.

It did not solve the problem.

The difference that should not matter

I looked at the differences between the DAGs again. This time, I focused on the DAG owners. For some reason, the faulty DAG was owned by both Airflow and DE Team at the same time. That was strange because we always assign the owner to one of our teams.

I searched for the code that sets Airflow as the DAG owner. I could not find it, so it had to be somewhere in the Airflow configuration. There it was. In the operators section of the airflow.cfg file, I saw default_owner = Airflow.

But why was it using the default owner? When we create a new DAG instance, we explicitly pass the owner's name. There was no "Airflow" anywhere, just "DE Team"! So what was wrong?

I cloned the Airflow source code and searched for the DAG owner. The owner property is defined in the dag.py file and looks like this:

", ".join({t.owner for t in self.tasks})

Why? Why were some of my tasks without an owner? I clicked through every single task in the Airflow UI to check its owner.

At the end of my DAG, in the second_group of tasks, I found the problem: Airflow was the owner. I looked at the source code of my DAG and noticed that all of the tasks assigned to the default owner were missing the dag parameter.

If they were instances of the DummyOperator, the code would look like this:

task = DummyOperator(task_id="some_id")

instead of:

task = DummyOperator(task_id="some_id", dag=dag_instance)

I added the dag parameter to all of the tasks and redeployed the DAG. I did not expect anything to happen, but the previously stuck task instance started running! That was strange, but maybe the redeployment triggered it. I was sure it was going to get stuck again after the currently running task finished, but it did not! The DAG was running!

You gotta be kidding me

How could the DAG owner prevent the scheduler from scheduling the tasks? It could not be the real cause of the problem! After all, the DAG owner had been wrong the whole time, but the DAG wasn't always getting stuck.

I opened the Airflow UI again and looked for differences between the DAG runs that finished successfully and the ones that got stuck. It looked like the faulty DAG runs had more data to process, but how could that affect scheduling, and how was it related to the DAG owner?

I looked at our notification code to see when we send notifications. There were only two cases: when a task fails and when an SLA is missed. I checked the DAG runs again. All of the faulty ones had missed the SLA.
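
For reference, the two hooks look roughly like this (a sketch with placeholder notify_* functions; only the two cases, task failure and SLA miss, come from our setup):

from datetime import timedelta

def notify_about_failure(context):
    # called by Airflow when a task instance fails
    print("Task failed: {}".format(context['task_instance']))

def notify_about_sla_miss(dag, task_list, blocking_task_list, slas, blocking_tis):
    # called by the scheduler when a task misses its SLA
    print("SLA missed in {}".format(dag.dag_id))

default_args = {
    'owner': 'DE Team',
    'on_failure_callback': notify_about_failure,  # case 1: a task fails
    'sla': timedelta(hours=1),                    # per-task SLA
}
# case 2: an SLA is missed -> DAG(..., default_args=default_args, sla_miss_callback=notify_about_sla_miss)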

A series of unfortunate events

When our tasks ran for too long, our code tried to send a notification to the Slack channel. The function was failing because the DAG owner property had an invalid value that did not match any of the Owner enum values. Because of that, Airflow kept retrying the notification, and the ValueError raised there prevented it from ever reaching the code that schedules the remaining tasks.

All of that happened because I did not set the dag property of some tasks, so Airflow assigned them to the default owner.

The hypothesis sounded crazy, but could it be true? How could I test it?

To verify my assumptions, I created a simple test DAG:

from datetime import timedelta
from time import sleep

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# limit how long a single DAG run may take
dag = DAG(..., dagrun_timeout=timedelta(minutes=1))

def sleep_more_than_timeout(**kwargs):
    # deliberately exceed the one-minute limit
    sleep(120)

def print_some_stuff(**kwargs):
    print('some stuff')

task_1 = PythonOperator(
    task_id='sleep',
    python_callable=sleep_more_than_timeout,
    provide_context=True,
    dag=dag)

task_2 = PythonOperator(
    task_id='print',
    python_callable=print_some_stuff,
    provide_context=True,
    dag=dag)

task_1 >> task_2

As expected, this DAG ran without any issues.

In the next test, I removed the dag property of the second task. This time, the first task ran successfully, but it exceeded the SLA. Airflow tried to notify me about it, but the notification code failed with a ValueError caused by the invalid DAG owner. As a consequence, the second task got stuck in the no_status state. It behaved exactly like my production DAG.

Prevention

We do code reviews, but somehow nobody noticed the missing property. More reviews or more procedures will not help us avoid making the same mistake in the future.

I decided that the best solution was to override the notification recipient when we can't determine it from the DAG properties.

In the function that sends notifications, I added a try ... except block that sends an additional notification about the invalid DAG owner configuration to our Slack channel and changes the recipient of the original error message to DE Team:

try:
  owner = Owner(dag.owner)
except ValueError as exc:
  _send_notification_about_invalid_owner(dag, exc)
  owner = Owner.DE_TEAM

# continue sending the original alert