How to use AWSAthenaOperator in Airflow to verify that a DAG finished successfully
How do we know that an Airflow DAG ran successfully? Of course, we should check the DAG run status and the statuses of all its tasks, but is it enough to see that there were no errors? What if everything looks fine only because there was no data in the data source?
This article shows how to use the AWSAthenaOperator to check that an Athena SQL statement returns data. I use it to verify that we have input data in a given partition before running a DAG, and to ensure that data analysts can access the output.
First, we need to import the BaseOperator and the AWSAthenaOperator:
from airflow.contrib.operators.aws_athena_operator import AWSAthenaOperator
from airflow.models import BaseOperator
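A side note: these import paths come from Airflow 1.10, where the operator still lived in airflow.contrib. If you run Airflow 2 with the Amazon provider package installed, the import most likely looks like this instead (recent provider releases renamed the class to AthenaOperator):

# Airflow 2.x with the apache-airflow-providers-amazon package installed
from airflow.providers.amazon.aws.operators.athena import AthenaOperator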
Now, we will create a custom Airflow operator:
class AWSAthenaTableNotEmpty(BaseOperator):

    # Let Airflow render Jinja templates in these fields (and in *.sql files).
    template_fields = ('query', 'database')
    template_ext = ('.sql',)  # must be a tuple, so the trailing comma matters

    def __init__(
            self,
            query: str,
            database: str,
            s3_output_location: str,
            *args,
            **kwargs):
        super().__init__(*args, **kwargs)
        # Keep the extra arguments so we can pass them on to AWSAthenaOperator later.
        self.extra_dag_args = args
        self.extra_dag_kwargs = kwargs
        self.query = query
        self.database = database
        self.output_location = "s3://" + s3_output_location
        self.run_query_task = None
In the execute function, we will run the query, retrieve the results, and check that we received at least one row.
def execute(self, context):
    # Run the query with the built-in AWSAthenaOperator, reusing the extra
    # arguments that were passed to our operator.
    self.run_query_task = AWSAthenaOperator(
        *self.extra_dag_args,
        query=self.query,
        database=self.database,
        output_location=self.output_location,
        **self.extra_dag_kwargs)
    self.run_query_task.execute(context)

    # Fetch the results of the finished query.
    query_execution_id = self.run_query_task.query_execution_id
    athena_hook = self.run_query_task.get_hook()
    athena_hook.get_conn()
    raw_results = athena_hook.get_query_results(query_execution_id)

    # The first row contains the column headers, so we skip it.
    result_count = len(raw_results['ResultSet']['Rows'][1:])
    assert result_count > 0, \
        'There are no rows in the table! ' \
        f'Database: {self.database} ' \
        f'Query: {self.query}'
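Why skip the first element of Rows? In the response of Athena's GetQueryResults API, the first row of ResultSet.Rows contains the column headers, not data. A sketch of the response shape (the values are illustrative):

# Illustrative shape of the get_query_results response:
raw_results = {
    'ResultSet': {
        'Rows': [
            {'Data': [{'VarCharValue': 'user_id'}]},  # header row, skipped by [1:]
            {'Data': [{'VarCharValue': '42'}]},       # the first data row
        ]
    }
}
assert len(raw_results['ResultSet']['Rows'][1:]) == 1  # exactly one data row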
We should also make sure that Airflow can stop the task, so we must implement the on_kill function:
def on_kill(self):
    # Delegate to the wrapped operator, so it can cancel the running Athena query.
    if self.run_query_task:
        self.run_query_task.on_kill()
How to use the AWSAthenaTableNotEmpty operator
To check that we have the input data for an Airflow DAG or that it has produced some results, we should create an instance of the operator and put it either at the beginning or the end of the DAG:
verify_input = AWSAthenaTableNotEmpty(
    task_id='verify_input_not_empty',
    query="SELECT * FROM some_input_table "
          "WHERE year = '{{ macros.ds_format(ds, '%Y-%m-%d', '%Y') }}' "
          "AND month = '{{ macros.ds_format(ds, '%Y-%m-%d', '%m') }}' "
          "AND day = '{{ macros.ds_format(ds, '%Y-%m-%d', '%d') }}' "
          "LIMIT 1",
    database='{{ params.database_name }}',
    # The constructor requires an output location; replace the placeholder
    # with your own bucket.
    s3_output_location='example-bucket/athena-staging')
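For example, a DAG that validates both its input and its output could chain the checks around the main processing task. Here, process_data and verify_output are hypothetical tasks standing in for your pipeline:

verify_output = AWSAthenaTableNotEmpty(
    task_id='verify_output_not_empty',
    query='SELECT * FROM some_output_table LIMIT 1',  # hypothetical output table
    database='{{ params.database_name }}',
    s3_output_location='example-bucket/athena-staging')  # placeholder bucket

# Fail fast when the input partition is empty, and fail loudly
# when the pipeline produced no output.
verify_input >> process_data >> verify_output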