How to postpone Airflow DAG until files get uploaded into an S3 bucket
When we have to postpone an Airflow DAG until files get uploaded into S3, we have two options. In this article, I am going to show you both and explain when to use them.
We can use the S3PrefixSensor to detect that at least one file with a given prefix exists. To make it work, we need the bucket name and the prefix of the S3 object key:
```python
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

sensor = S3PrefixSensor(
    task_id='wait_for_files',  # every Airflow operator needs a task_id
    bucket_name='some_s3_bucket',
    prefix='key/of/the/object'
)
```
S3PrefixSensor starts looking for files at the root of the given bucket, so it matches every object whose key starts with the configured prefix, for example:

- key/of/the/object
- key/of/the/object.csv
- key/of/the/object/some_file.txt
It makes sense to use the S3PrefixSensor when we don't know the exact names of the uploaded files, and there is no risk of starting the DAG too early. Such a risk appears when an external service uploads files to our S3 bucket: it has already uploaded one file that matches the sensor's prefix, but more files are still on the way. In this situation, we may miss the remaining files if Airflow starts running too soon.
To avoid this issue, we may:

- use a TimeDeltaSensor to wait a few minutes after the S3PrefixSensor detects the matching file,
- use more than one S3PrefixSensor to wait for all required files (see the sketch below),
- or wait until a marker file gets uploaded to indicate that the upload has finished.
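Here is a minimal sketch of the multiple-sensor option. The DAG id, bucket name, prefixes, and task ids are made up for illustration; only the S3PrefixSensor import and parameters come from the example above:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

with DAG(dag_id='wait_for_all_uploads',
         start_date=datetime(2021, 1, 1),
         schedule_interval='@daily') as dag:

    # one sensor per required upload; both prefixes are hypothetical
    wait_for_orders = S3PrefixSensor(
        task_id='wait_for_orders',
        bucket_name='some_s3_bucket',
        prefix='input/orders/'
    )

    wait_for_customers = S3PrefixSensor(
        task_id='wait_for_customers',
        bucket_name='some_s3_bucket',
        prefix='input/customers/'
    )

    process_files = DummyOperator(task_id='process_files')

    # the processing task starts only after both sensors succeed
    [wait_for_orders, wait_for_customers] >> process_files
```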
When we use marker files (empty files that exist only to mark a directory as fully processed), we may prefer the S3KeySensor because, in this case, we usually know the exact name of the file and the location where we expect it.
The usage of the S3KeySensor is similar to the prefix sensor:
```python
from airflow.sensors.s3_key_sensor import S3KeySensor

sensor = S3KeySensor(
    task_id='wait_for_marker_file',  # every Airflow operator needs a task_id
    # without a bucket_name, bucket_key must be the full s3:// URL
    bucket_key='s3://some_s3_bucket/key/of/the/object/marker.file'
)
```
This sensor waits for a file whose key matches the given bucket_key exactly and ignores all other files.
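Putting it all together, a complete DAG that waits for the marker file before it starts processing could look like the sketch below. Everything except the S3KeySensor arguments shown earlier is hypothetical:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.s3_key_sensor import S3KeySensor

with DAG(dag_id='wait_for_marker_file',
         start_date=datetime(2021, 1, 1),
         schedule_interval='@daily') as dag:

    wait_for_marker = S3KeySensor(
        task_id='wait_for_marker',
        bucket_key='s3://some_s3_bucket/key/of/the/object/marker.file'
    )

    process_files = DummyOperator(task_id='process_files')

    # the rest of the DAG runs only after the marker file appears
    wait_for_marker >> process_files
```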