How to postpone an Airflow DAG until files get uploaded into an S3 bucket

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (26/100)

When we have to postpone an Airflow DAG until files get uploaded into S3, we have two options. In this article, I am going to show you both and explain when to use them.

S3PrefixSensor

We can use the S3PrefixSensor to detect that at least one file with a given prefix exists. To make it work, we need the bucket name and the prefix of the S3 object key:

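from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

# Succeeds as soon as at least one object with the given prefix exists in the bucket
sensor = S3PrefixSensor(
    task_id='wait_for_s3_prefix',  # every Airflow task needs a unique task_id
    bucket_name='some_s3_bucket',
    prefix='key/of/the/object'
)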

The S3PrefixSensor interprets the prefix relative to the root of the given bucket, so it matches the following objects:

  • s3://some_s3_bucket/key/of/the/object
  • s3://some_s3_bucket/key/of/the/object/another_one
  • s3://some_s3_bucket/key/of/the/objects
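
To make the matching rule above concrete, here is a minimal pure-Python sketch of plain string prefix matching over a hypothetical list of object keys. It only models the comparison described in the list; the real sensor asks S3 for the keys through Airflow's S3 hook, and the helper name `keys_matching_prefix` is an assumption made for illustration.

```python
def keys_matching_prefix(keys, prefix):
    """Return the keys that start with the given prefix (plain string comparison)."""
    return [key for key in keys if key.startswith(prefix)]


# Hypothetical object keys stored in some_s3_bucket
bucket_keys = [
    "key/of/the/object",
    "key/of/the/object/another_one",
    "key/of/the/objects",
    "key/of/other/object",
]

matched = keys_matching_prefix(bucket_keys, "key/of/the/object")
print(matched)
# ['key/of/the/object', 'key/of/the/object/another_one', 'key/of/the/objects']
```

In this sketch, a bare prefix also matches a sibling key such as `objects`; ending the prefix with `/` would restrict it to keys under that path.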

It makes sense to use the S3PrefixSensor when we don’t know the exact names of the uploaded files and there is no risk of starting the DAG too early. That risk exists when an external service uploads several files to our S3 bucket: if it has already uploaded one file that matches the sensor’s prefix but still has more files to upload, the sensor succeeds immediately, and the DAG may miss the remaining files because Airflow started running too soon.

To avoid this issue, we may:

  • use the TimeDeltaSensor to wait a few minutes after the S3PrefixSensor detects the matching file
  • use more than one S3PrefixSensor to wait for all required files
  • wait until a marker file gets uploaded to indicate that the upload has finished
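
Airflow sensors postpone a DAG by "poking": they re-run a check at a fixed interval until it succeeds or a timeout expires. The following is a minimal pure-Python sketch of that idea applied to the "wait for all required files" and "wait for a marker file" strategies. It does not call S3; the simulated listings stand in for the bucket contents, and all names (`wait_until`, `all_required_present`, `marker_present`, the `part-N` keys) are hypothetical.

```python
import time


def wait_until(check, poke_interval=60.0, timeout=3600.0, sleep=time.sleep, clock=time.monotonic):
    """Call `check` every `poke_interval` seconds until it returns True.

    Raises TimeoutError if `timeout` seconds pass first. `sleep` and `clock`
    are injectable so the loop can be exercised without real waiting.
    """
    deadline = clock() + timeout
    while True:
        if check():
            return True
        if clock() >= deadline:
            raise TimeoutError("condition not met before the timeout")
        sleep(poke_interval)


def all_required_present(existing_keys, required_keys):
    """True when every required key has been uploaded."""
    return set(required_keys).issubset(existing_keys)


def marker_present(existing_keys, marker_key):
    """True when the marker file has been uploaded."""
    return marker_key in existing_keys


# Simulated bucket listings: each poke sees one more uploaded file
snapshots = iter([
    {"key/of/the/object/part-1"},
    {"key/of/the/object/part-1", "key/of/the/object/part-2"},
    {"key/of/the/object/part-1", "key/of/the/object/part-2", "key/of/the/object/marker.file"},
])
pokes = []


def check():
    keys = next(snapshots)
    pokes.append(keys)
    return marker_present(keys, "key/of/the/object/marker.file")


wait_until(check, poke_interval=0, timeout=10, sleep=lambda seconds: None)
print(len(pokes))  # 3
```

Waiting for the marker file succeeds only on the third poke, after both data files exist, whereas a plain prefix check would already have succeeded on the first one.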

S3KeySensor

When we use marker files (empty files that exist only to mark a directory as fully processed), we may prefer the S3KeySensor because, in this case, we usually know the exact name and location of the file.

Using the S3KeySensor is similar to using the prefix sensor:

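from airflow.sensors.s3_key_sensor import S3KeySensor

# Succeeds only when an object with exactly this key exists;
# bucket_name can be omitted because bucket_key is a full s3:// URL
sensor = S3KeySensor(
    task_id='wait_for_marker_file',  # every Airflow task needs a unique task_id
    bucket_key='s3://some_s3_bucket/key/of/the/object/marker.file'
)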

This sensor waits for a file whose key matches the given bucket_key exactly and ignores all other files.
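
To contrast this with the prefix check, here is a minimal sketch of exact-key matching: split the `s3://` URL into a bucket and a key, then compare the key for equality. `parse_s3_url` and `key_exists` are hypothetical helpers, and the in-memory dictionary stands in for S3; the real sensor performs the check against S3 through Airflow's S3 hook.

```python
from urllib.parse import urlparse


def parse_s3_url(s3_url):
    """Split 's3://bucket/path/to/key' into ('bucket', 'path/to/key')."""
    parsed = urlparse(s3_url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URL: {s3_url}")
    return parsed.netloc, parsed.path.lstrip("/")


def key_exists(buckets, s3_url):
    """True only if an object with exactly this key exists in the bucket."""
    bucket, key = parse_s3_url(s3_url)
    return key in buckets.get(bucket, set())


# Hypothetical in-memory stand-in for S3: bucket name -> set of object keys
buckets = {
    "some_s3_bucket": {
        "key/of/the/object/part-1",
        "key/of/the/object/marker.file.tmp",
    }
}

url = "s3://some_s3_bucket/key/of/the/object/marker.file"
print(key_exists(buckets, url))  # False: only a similarly named file exists
buckets["some_s3_bucket"].add("key/of/the/object/marker.file")
print(key_exists(buckets, url))  # True
```

A similarly named file such as `marker.file.tmp` does not satisfy the check, which is what makes an exact key a reliable "upload finished" signal.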

Newsletter

Do you enjoy reading my articles?
Subscribe to the newsletter if you don't want to miss the new content, business offers, and free training materials.

Bartosz Mikulski

  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz