How to use AWSAthenaOperator in Airflow to verify that a DAG finished successfully

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (20/100)

How do we know that an Airflow DAG ran successfully? Of course, we should check the DAG run status and the statuses of all tasks, but is it enough to see that there were no errors? What if everything looks fine only because there was no data in the data source?

This article shows how to use the AWSAthenaOperator to check that an Athena SQL statement returns data. I use it in two places: before running a DAG, to verify that we have input data in a given partition, and after it finishes, to ensure that data analysts can access the output.

First, we need to import the BaseOperator and the AWSAthenaOperator:

from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.models import BaseOperator

Now, we will create a custom Airflow operator:

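class AWSAthenaTableNotEmpty(BaseOperator):
    # Fields that Airflow renders with Jinja before calling execute
    template_fields = ('query', 'database')
    # The trailing comma makes this a tuple; ('.sql') would be a plain string
    template_ext = ('.sql',)

    def __init__(
            self,
            query: str,
            database: str,
            s3_output_location: str,
            *args,
            **kwargs):

        super().__init__(*args, **kwargs)
        # Keep the remaining arguments (task_id, dag, ...) for the inner AWSAthenaOperator
        self.extra_dag_args = args
        self.extra_dag_kwargs = kwargs

        self.query = query
        self.database = database
        # Expects the bucket and prefix without the "s3://" scheme
        self.output_location = "s3://" + s3_output_location

        # Created in execute; kept so that on_kill can stop the running query
        self.run_query_task = None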
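
One detail worth double-checking in the class above is `template_ext`. In Python, parentheses alone do not create a tuple, so `('.sql')` is a plain string and only `('.sql',)` is a one-element tuple. The sketch below is plain Python, not Airflow code: `is_template_file` is a hypothetical helper, and it assumes that a templated value is compared against each element of `template_ext` with `endswith`. Under that assumption, a string gets checked character by character.

```python
def is_template_file(value, template_ext):
    """Hypothetical stand-in for an 'ends with one of the extensions' check."""
    # Iterating over a str yields single characters;
    # iterating over a tuple yields whole extensions.
    return any(value.endswith(ext) for ext in template_ext)


as_string = ('.sql')   # parentheses alone do not create a tuple
as_tuple = ('.sql',)   # the trailing comma does

print(type(as_string).__name__, type(as_tuple).__name__)

# With a tuple, only values ending with ".sql" match.
print(is_template_file('queries/check_input.sql', as_tuple))
print(is_template_file('daily_totals', as_tuple))

# With a string, any value ending with ".", "s", "q", or "l" matches.
print(is_template_file('daily_totals', as_string))
```

If the assumption holds, the string version could make a value such as a database name ending with "s" look like a path to a template file, so the tuple form is the safer choice.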

In the execute function, we will run the query, retrieve the results, and check that we received at least one row.

def execute(self, context):
    # Delegate running the query to the built-in Athena operator
    self.run_query_task = AWSAthenaOperator(
        query=self.query,
        database=self.database,
        output_location=self.output_location,
        *self.extra_dag_args,
        **self.extra_dag_kwargs)
    self.run_query_task.execute(context)

    query_execution_id = self.run_query_task.query_execution_id
    athena_hook = self.run_query_task.get_hook()
    # Make sure the hook has a connection before fetching the results
    athena_hook.get_conn()
    raw_results = athena_hook.get_query_results(query_execution_id)

    # The first row contains the column names, so skip it when counting
    result_count = len(raw_results['ResultSet']['Rows'][1:])

    assert result_count > 0, \
        'There are no rows in the table! ' \
        f'Database: {self.database} ' \
        f'Query: {self.query} '
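
The `[1:]` slice in the row count deserves a short illustration. For a SELECT statement, Athena returns the column names as the first element of `Rows`, so a query that matches nothing still produces one row. The sketch below uses hand-written dictionaries shaped like the query results response; `count_data_rows` and the sample values are made up for this example and do not come from a real query.

```python
def count_data_rows(raw_results):
    """Count the result rows, skipping the header row with column names."""
    rows = raw_results['ResultSet']['Rows']
    return len(rows[1:])


# Hand-written sample rows in the shape of an Athena response.
header = {'Data': [{'VarCharValue': 'year'}, {'VarCharValue': 'month'}]}
data_row = {'Data': [{'VarCharValue': '2020'}, {'VarCharValue': '01'}]}

empty_partition = {'ResultSet': {'Rows': [header]}}
filled_partition = {'ResultSet': {'Rows': [header, data_row]}}

print(count_data_rows(empty_partition))   # only the header row is present
print(count_data_rows(filled_partition))  # header plus one data row
```

Without the slice, the check would pass even for an empty partition, because the header row alone makes the list non-empty.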

We should also make sure that Airflow can stop the task, so we must implement the on_kill function:

def on_kill(self):
    if self.run_query_task:
        self.run_query_task.on_kill()

How to use the AWSAthenaTableNotEmpty operator

To check that we have the input data for an Airflow DAG or that it has produced some results, we should create an instance of the operator and put it either at the beginning or the end of the DAG:

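verify_input = AWSAthenaTableNotEmpty(
        task_id='verify_input_not_empty',
        query="SELECT * FROM some_input_table "
              "WHERE year = '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}' "
              "AND month = '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}' "
              "AND day = '{{ macros.ds_format(ds, '%Y-%m-%d', '%d') }}' "
              "LIMIT 1",
        database='{{ params.database_name }}',
        # Required by the constructor. Placeholder value: use your own bucket
        # and prefix, without the "s3://" scheme (the operator adds it).
        s3_output_location='your-bucket/athena-results')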
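
To see what the templated query above turns into, here is a minimal sketch in plain Python. It assumes that `macros.ds_format` parses the date with the input format and prints it with the output format; the local `ds_format` function and `render_partition_filter` are stand-ins written for this example, not Airflow's own code.

```python
from datetime import datetime


def ds_format(ds, input_format, output_format):
    """Stand-in for macros.ds_format: parse a date string and reformat it."""
    return datetime.strptime(ds, input_format).strftime(output_format)


def render_partition_filter(ds):
    """Build the WHERE clause the same way the templated query does."""
    year = ds_format(ds, '%Y-%m-%d', '%Y')
    month = ds_format(ds, '%Y-%m-%d', '%m')
    day = ds_format(ds, '%Y-%m-%d', '%d')
    return f"WHERE year = '{year}' AND month = '{month}' AND day = '{day}'"


print(render_partition_filter('2020-01-05'))
# WHERE year = '2020' AND month = '01' AND day = '05'
```

Note that `%m` and `%d` produce zero-padded values, so the comparison matches only if the partition values in the table are stored as zero-padded strings too.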


Bartosz Mikulski

  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz