Anomaly detection in an Airflow DAG using the Prophet library

What is the most critical question data engineering teams have to answer? “Is it correct?” We get data from external systems that we, in most cases, cannot control, so we should know what values are possible and expected. It is good to know how the data is produced and understand the business process. It is also good to validate the data and check whether it looks “normal.”

This article shows how to implement an Airflow operator to train a Prophet model for time-series forecasting and use the model to detect anomalies in data.

Let’s assume that we have an Airflow DAG processing events from other applications. The first check we want is to verify whether the number of events is correct. Of course, the value varies every day and depends on the day of the week. Most likely, the amount of data also differs on weekends and holidays. Fortunately, the Prophet library handles seasonality and holidays automatically.

Getting the data

Let’s start implementing the Airflow operator. We need a function that returns the historical data for training the model and the number of events on the current day. I don’t know your data source, so I won’t implement the data retrieval, but I want to point out the expected data format. We have to return a Pandas DataFrame containing a “ds” column with the date or datetime (it can be a string) and a “y” column containing the value of the time series - in this case, the number of events generated on a given day.

We also need a DataFrame containing a single row with the datetime of the day to be predicted, and the actual number of events observed on that day as a separate value.

In Prophet, we can also include additional feature columns (regressors). I won’t cover them in detail in this tutorial, but remember to use the add_regressor function when you need more features in the model.
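
A minimal sketch of what that could look like, assuming a hypothetical “is_promotion_day” regressor column (the column name and values are made up for illustration):

from fbprophet import Prophet
import pandas as pd

# A sketch only: "is_promotion_day" is a made-up regressor column used for illustration.
# The extra column must be present both in the training data and in the rows we predict.
historical = pd.DataFrame([
    ["2021-01-01", 287, 0],
    ["2021-01-02", 301, 1],
    ["2021-01-31", 295, 0]
], columns=["ds", "y", "is_promotion_day"])

to_be_predicted = pd.DataFrame([[
    "2021-02-01", 0
]], columns=["ds", "is_promotion_day"])

def create_model_with_regressor():
    model = Prophet()
    model.add_regressor("is_promotion_day")  # register the feature before fit() is called
    return model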

Here is an example function returning constant values in the expected format:

import pandas as pd

def get_data():
    # Historical data used to train the model: one row per day
    historical = pd.DataFrame([
        ["2021-01-01", 287],
        ...
        ["2021-01-31", 295]
    ], columns=["ds", "y"])

    # The day we want to predict
    to_be_predicted = pd.DataFrame([[
        "2021-02-01"
    ]], columns=["ds"])

    # The actual number of events observed on that day
    actual = 291

    return historical, to_be_predicted, actual

Configuring the model

In the next step, we have to create the Prophet model configuration. In this example, I use default values for every parameter, but if you want to tweak the model, you should do that in this function:

from fbprophet import Prophet

def create_model():
    return Prophet()
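
For example, a possible tweaked version with multiplicative seasonality and Prophet’s built-in holiday calendar (the parameter choices below are only an illustration, not something this setup requires) could look like this:

from fbprophet import Prophet

def create_model():
    # Illustrative tweaks, not required by the setup described in this article:
    # multiplicative seasonality and the built-in holiday calendar for a chosen country.
    model = Prophet(seasonality_mode='multiplicative')
    model.add_country_holidays(country_name='US')
    return model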

Sending a notification

There are two ways to handle anomalies. We can either raise an error to stop the DAG or send a notification without stopping it. Let’s implement configurable notifications: if we pass a notification function to the Airflow operator, it calls that function when it detects an anomaly. Otherwise, it raises an error.

Here is a function that sends a Slack notification. Note that we have to add the Slack webhook as a connection in Airflow (SLACK_CONNECTION_ID below is the id of that connection):

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

def send_to_slack(actual_value, lower_bound, upper_bound):
    SlackWebhookOperator(
        task_id='send_to_slack',
        http_conn_id=SLACK_CONNECTION_ID,
        message=f'{actual_value} is not in the expected range: ({lower_bound}, {upper_bound})'
    ).execute(context=None)

Defining the Airflow operator

Now, we have everything we need to implement the Airflow operator:

from airflow.models import BaseOperator

class ProphetAnomalyDetectionOperator(BaseOperator):
    def __init__(
            self,
            input_data_function,
            create_model_function,
            on_anomaly=None,
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)

        self.input_data_function = input_data_function
        self.create_model_function = create_model_function
        self.on_anomaly_function = on_anomaly

    def execute(self, context):
        historical, current_independent_features, current_dependent_feature = self.input_data_function()
        model = self.create_model_function()

        model.fit(historical)
        prediction = model.predict(current_independent_features)

        lower_bound = prediction[['yhat_lower']].values[0][0]
        upper_bound = prediction[['yhat_upper']].values[0][0]

        # Report an anomaly when the actual value falls outside the predicted range
        if current_dependent_feature > upper_bound or current_dependent_feature < lower_bound:
            if self.on_anomaly_function:
                self.on_anomaly_function(current_dependent_feature, lower_bound, upper_bound)
            else:
                raise ValueError(f'{current_dependent_feature} lower or higher than the expected values: ({lower_bound}, {upper_bound})')

This operator uses the historical data to train a Prophet model, makes a prediction using the current value’s independent features, and compares the forecast to the actual value.

By default, Prophet returns a prediction with an 80% uncertainty interval. The upper bound of the interval is in the yhat_upper column of the prediction, and the lower bound is in the yhat_lower column. If the actual value is not within the predicted range, we report it as an anomaly.
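
If the default 80% interval turns out to be too narrow (too many false alarms) or too wide, we can change its width when creating the model. For example (the value below is only an illustration):

from fbprophet import Prophet

def create_model():
    # interval_width=0.95 widens the uncertainty interval from the default 80% to 95%,
    # making the anomaly check less sensitive.
    return Prophet(interval_width=0.95)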

Using the ProphetAnomalyDetectionOperator

Before we use the operator, we have to add all of the dependencies to Airflow. Prophet requires installing development tools (such as the GCC compiler and the python3-dev package) and the pystan Python library. If you use Airflow in a Docker container and the installation fails with error: command 'gcc' failed with exit status 4, increase the RAM available to Docker (you need at least 4GB).

To add the anomaly detection operator to an Airflow DAG, use this code:

anomaly_detection = ProphetAnomalyDetectionOperator(
    task_id="verify_number_of_events",
    input_data_function=get_data,
    create_model_function=create_model,
    on_anomaly=None,  # raise a ValueError when an anomaly is detected
    dag=dag
)

some_task_that_should_run_first >> anomaly_detection
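
If we prefer a Slack message instead of a failed task, we pass the send_to_slack function defined earlier as the on_anomaly parameter:

anomaly_detection = ProphetAnomalyDetectionOperator(
    task_id="verify_number_of_events",
    input_data_function=get_data,
    create_model_function=create_model,
    on_anomaly=send_to_slack,  # notify on Slack instead of raising a ValueError
    dag=dag
)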

Of course, when we use Prophet like this, we use an Airflow worker to train the model. It may take some time, so in future blog posts, I’ll show you how to use Dask and an EMR cluster to train Prophet models.
