How to send a customized Slack notification when an Airflow task fails

Our Airflow DAG failed again. Nobody noticed for hours because the email notification got lost among the hundreds of emails from GitHub, Jenkins, and a few other applications that send way too many messages. What can we do about it? Of course, the best option is to limit the overall number of emails, but what if we are not allowed to do that?

Can we get a Slack notification from Airflow? Sure! That is easy!

Slack webhook

The first thing we need is a configured Slack incoming webhook. It is a URL that receives POST requests and forwards every request as a message to the pre-configured Slack channel. Because of that, we must keep the URL secret! Anyone who has access to it can send messages to our Slack workspace!

We can configure the incoming hook using the steps described in the Slack documentation: https://api.slack.com/messaging/webhooks#getting_started.
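To see what the webhook does, we can POST a JSON payload to it directly. Here is a minimal sketch using the requests library; the URL below is a placeholder, not a real webhook:

import requests

# Placeholder URL - replace it with your own (secret!) incoming webhook
WEBHOOK_URL = 'https://hooks.slack.com/services/T000/B000/XXXXXXXX'

# Slack expects a JSON body; the "text" field becomes the message in the channel
response = requests.post(WEBHOOK_URL, json={'text': 'Hello from the webhook!'})
response.raise_for_status()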

Airflow callback

After defining a webhook, we must create a callback function in Airflow. The function receives the Airflow context as its only parameter and does not return anything. Inside this function, we will build the message and send it to the Slack webhook. To communicate with Slack, we will use the SlackWebhookOperator because it encapsulates the HTTP connection, so we don't need to worry about that code:

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator


def alert_slack_channel(context):
    # Keep the webhook URL out of source control - read it from a secret store or configuration
    webhook = 'put here the webhook URL or read it from configuration'

    msg = 'here is the message'  # we will change it in the next step

    # Run the operator directly so the notification is sent right away from the callback
    SlackWebhookOperator(
        task_id='notify_slack_channel',
        http_conn_id=webhook,
        message=msg,
    ).execute(context=None)

After defining the callback function, we have to use it as the “on failure callback” in the Airflow DAG:

dag = DAG(
    ...
    default_args={
        'on_failure_callback': alert_slack_channel
    }
)
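For context, here is a minimal sketch of a complete DAG definition with the callback wired in through default_args; the dag_id, start date, and schedule are made up for illustration:

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id='example_pipeline',          # illustrative name
    start_date=datetime(2021, 1, 1),
    schedule_interval='@daily',
    default_args={
        # applied to every task in the DAG
        'on_failure_callback': alert_slack_channel,
    },
)

Because it goes into default_args, the callback applies to every task in the DAG. We could also pass on_failure_callback to individual operators if we only care about some of them.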

Customize the message

The last step is message customization. We would like to include the name of the Airflow task that failed, a link to the log, and the error message.

First, let’s extract the task name from the DAG context:

# needs: from typing import Optional and from airflow.models import TaskInstance
last_task: Optional[TaskInstance] = context.get('task_instance')
task_name = last_task.task_id

After that, we create a link to the log:

log_link = f"<{last_task.log_url}|{task_name}>"

The last thing we need is the error message or the failure reason:

error_message = context.get('exception') or context.get('reason')

When we send a message using the webhook, we can use Slack's mrkdwn formatting, the same markup available in the Slack application. We can even use emoji!

execution_date = context.get('execution_date')
title = f':red_circle: {task_name} has failed!'
msg_parts = {
    'Execution date': execution_date,
    'Log': log_link,
    'Error': error_message
}
# Join the title and the "*key*: value" lines into a single Slack message
msg = "\n".join([title,
    *[f"*{key}*: {value}" for key, value in msg_parts.items()]
]).strip()

As a result, Slack displays a message which looks like this:

Slack notification
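For reference, here is the whole callback assembled from the snippets above. It is a sketch based on the code in this article: the webhook value is still a placeholder, and the import path matches the Airflow 1.x contrib package used earlier.

from typing import Optional

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
from airflow.models import TaskInstance


def alert_slack_channel(context):
    # Placeholder - read the webhook from configuration in a real deployment
    webhook = 'put here the webhook URL or read it from configuration'

    # Which task failed, and a clickable link to its log
    last_task: Optional[TaskInstance] = context.get('task_instance')
    task_name = last_task.task_id
    log_link = f"<{last_task.log_url}|{task_name}>"

    # The exception (task failure) or the reason (e.g. an SLA miss)
    error_message = context.get('exception') or context.get('reason')

    execution_date = context.get('execution_date')
    title = f':red_circle: {task_name} has failed!'
    msg_parts = {
        'Execution date': execution_date,
        'Log': log_link,
        'Error': error_message,
    }
    msg = "\n".join([title,
        *[f"*{key}*: {value}" for key, value in msg_parts.items()]
    ]).strip()

    SlackWebhookOperator(
        task_id='notify_slack_channel',
        http_conn_id=webhook,
        message=msg,
    ).execute(context=None)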

Did you enjoy reading this article?
Would you like to learn more about software craft in data engineering and MLOps?

Subscribe to the newsletter or add this blog to your RSS reader (does anyone still use them?) to get a notification when I publish a new essay!
