How to send a customized Slack notification when an Airflow task fails

Our Airflow DAG failed again. Nobody noticed for hours because the email notification got lost among hundreds of emails from GitHub, Jenkins, and a few other applications that send way too many messages. What can we do about it? Of course, the best option is to limit the overall number of emails, but what if we are not allowed to do that?

Can we get a Slack notification from Airflow? Sure! That is easy!

Slack webhook

The first thing we need is a configured Slack incoming webhook. It is a URL that receives POST requests and posts the content of every request to the pre-configured Slack channel. Because of that, we must keep that URL secret! Anyone who has access to it can send messages to our Slack!

We can configure the incoming webhook using the steps described in the Slack documentation: https://api.slack.com/messaging/webhooks#getting_started.
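
Before wiring the webhook into Airflow, it can help to confirm that it accepts messages. Here is a minimal sketch that uses only the Python standard library. The function names `build_request` and `send_test_message` are made up for this example, and the JSON body with a single `text` field is the simplest payload an incoming webhook accepts:

```python
import json
import urllib.request


def build_request(webhook_url, text):
    """Build the POST request that carries a plain-text Slack message."""
    body = json.dumps({"text": text}).encode("utf-8")
    return urllib.request.Request(
        webhook_url,
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_test_message(webhook_url, text):
    """Send the message and return the HTTP status code.

    urlopen raises urllib.error.HTTPError when Slack rejects the request.
    """
    with urllib.request.urlopen(build_request(webhook_url, text), timeout=10) as response:
        return response.status
```

Calling `send_test_message(webhook_url, 'Hello from Python')` with the real webhook URL should make the message appear in the configured channel.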



Airflow callback

After defining a webhook, we must create a callback function in Airflow. The function receives the Airflow task context (a dictionary) as its only parameter and does not return anything. Inside this function, we will build the message and send it to the Slack webhook. To communicate with Slack, we will use the SlackWebhookOperator because it encapsulates the HTTP connection and we don’t need to worry about that code:

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

def alert_slack_channel(context):
    webhook = 'put here the webhook URL or read it from configuration'

    msg = 'here is the message'  # we will change it in the next step

    # http_conn_id expects the ID of an Airflow connection, not a URL,
    # so the raw webhook URL is passed as webhook_token instead
    SlackWebhookOperator(
        task_id='notify_slack_channel',
        webhook_token=webhook,
        message=msg,
    ).execute(context=context)
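
Because the webhook URL is a secret, the placeholder string in the callback is better replaced with a lookup than with a hard-coded value. Here is a minimal sketch, assuming the URL is exported in an environment variable. The name `SLACK_WEBHOOK_URL` and the helper `get_webhook_url` are hypothetical, not something Airflow or Slack defines:

```python
import os

# Hypothetical variable name; use whatever your deployment provides.
WEBHOOK_ENV_VAR = "SLACK_WEBHOOK_URL"


def get_webhook_url(env=os.environ):
    """Return the Slack webhook URL from the environment or fail loudly."""
    url = env.get(WEBHOOK_ENV_VAR, "").strip()
    if not url:
        # Failing here makes a misconfigured alert visible in the task log.
        raise RuntimeError(f"{WEBHOOK_ENV_VAR} is not set; cannot send Slack alerts")
    return url
```

Inside the callback, `webhook = get_webhook_url()` would then replace the placeholder. Storing the URL in an Airflow Variable or Connection is another option.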

After defining the callback function, we have to use it as the “on failure callback” in the Airflow DAG:

dag = DAG(
    ...
    default_args={
        'on_failure_callback': alert_slack_channel
    }
)

Customize the message

The last step is message customization. We would like to include the name of the Airflow task that failed, a link to the log, and the error message.

First, let’s extract the task name from the DAG context:

from typing import Optional
from airflow.models import TaskInstance
last_task: Optional[TaskInstance] = context.get('task_instance')
task_name = last_task.task_id

After that, we create a link to the log:

log_link = f"<{last_task.log_url}|{task_name}>"

The last thing we need is the error message or the failure reason:

error_message = context.get('exception') or context.get('reason')

When we send a message using the webhook, we can use the same markup language as the one available in the Slack application. We can even use emoji!

execution_date = context.get('execution_date')
title = f':red_circle: {task_name} has failed!'
msg_parts = {
    'Execution date': execution_date,
    'Log': log_link,
    'Error': error_message
}
# join with a real newline character so every part lands on its own line
msg = "\n".join([title,
    *[f"*{key}*: {value}" for key, value in msg_parts.items()]
]).strip()
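
The message-building snippets can be collected into one helper that the callback calls before sending. This is a sketch rather than the only way to structure it. The name `build_failure_message` is made up for this example, and the fallback values for a missing task instance are an added assumption. It needs no Airflow import, so it can be tried with a fake context:

```python
def build_failure_message(context):
    """Assemble the Slack text for a failed task from the callback context."""
    task_instance = context.get('task_instance')
    # Fall back to placeholders if the context carries no task instance.
    task_name = task_instance.task_id if task_instance else 'unknown task'
    log_link = f"<{task_instance.log_url}|{task_name}>" if task_instance else 'n/a'
    error_message = context.get('exception') or context.get('reason')

    title = f':red_circle: {task_name} has failed!'
    msg_parts = {
        'Execution date': context.get('execution_date'),
        'Log': log_link,
        'Error': error_message,
    }
    # A real newline character puts every part on its own line in Slack.
    return "\n".join(
        [title, *[f"*{key}*: {value}" for key, value in msg_parts.items()]]
    ).strip()
```

In the callback, `msg = build_failure_message(context)` would replace the placeholder message.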

As a result, Slack displays a message which looks like this:

[Image: Slack notification]

Bartosz Mikulski
Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group