How to postpone Airflow DAG until files get uploaded into an S3 bucket

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (26/100)

When we have to postpone an Airflow DAG until files get uploaded into S3, we have two options. In this article, I am going to show you both and explain when to use them.

S3PrefixSensor

We can use the S3PrefixSensor to detect that at least one file with a given prefix exists. To make it work, we need the bucket name and the prefix of the S3 object key:

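from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

# Airflow 1.10.x import path. Every task needs a task_id and must belong to a DAG
# (pass dag=... or create the sensor inside a `with DAG(...)` block).
sensor = S3PrefixSensor(
    task_id='wait_for_s3_prefix',
    bucket_name='some_s3_bucket',
    prefix='key/of/the/object'
)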

The S3PrefixSensor matches the prefix from the root of the given bucket (it does not search for it anywhere inside the key), so the following objects will be matched by it:

  • s3://some_s3_bucket/key/of/the/object
  • s3://some_s3_bucket/key/of/the/object/another_one
  • s3://some_s3_bucket/key/of/the/objects
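To make the matching rule concrete, here is a small plain-Python model of it. It is only a sketch that assumes the matching works like a string prefix check on the full object key; it does not call Airflow or S3, and the last key in the list is a hypothetical example. The real sensor delegates to `S3Hook.check_for_prefix` and also takes a `delimiter` parameter (default `'/'`), so it is worth checking how your Airflow version treats keys that do not end at a `/` boundary.

```python
def matches_prefix(key: str, prefix: str) -> bool:
    """Simplified model: a key matches when it starts with the prefix.

    The prefix is compared with the whole object key, i.e. from the root
    of the bucket, so 'key/...' never matches 'other/key/...'.
    """
    return key.startswith(prefix)


prefix = "key/of/the/object"
keys = [
    "key/of/the/object",
    "key/of/the/object/another_one",
    "key/of/the/objects",
    "other/key/of/the/object",  # hypothetical key that does NOT match
]

for key in keys:
    print(f"s3://some_s3_bucket/{key}: {matches_prefix(key, prefix)}")
```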

It makes sense to use the S3PrefixSensor when we don’t know the exact names of the uploaded files and there is no risk of starting the processing too early. That risk appears when an external service uploads several files to our S3 bucket: as soon as it uploads the first file that matches the sensor’s prefix, the sensor succeeds, even though more files are still on the way. If the downstream tasks start at that moment, they may miss the remaining files.

To avoid this issue, we can wait a few more minutes after the S3PrefixSensor succeeds (for example, with the TimeDeltaSensor; keep in mind that it counts its delta from the end of the DAG run’s schedule interval, not from the moment the previous task finished), use more than one S3PrefixSensor to wait for all required files, or wait until a marker file gets uploaded to indicate that the upload has finished.
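The last two options boil down to the same check: look at the keys that already exist and decide whether the upload is complete. The function below is a hypothetical, plain-Python sketch of that decision; `upload_finished`, the key names, and the `_SUCCESS` marker name are illustrative assumptions, not Airflow APIs. In an actual DAG, each required file would typically get its own sensor, and a marker file would be checked with the S3KeySensor.

```python
from typing import Iterable, Optional


def upload_finished(
    keys: Iterable[str],
    required_keys: Iterable[str] = (),
    marker_key: Optional[str] = None,
) -> bool:
    """Decide whether an upload looks complete, given the keys listed so far.

    If marker_key is set, only the marker file matters. Otherwise every key
    in required_keys must be present (an empty requirement never passes).
    """
    present = set(keys)
    if marker_key is not None:
        return marker_key in present
    required = set(required_keys)
    return bool(required) and required <= present


# Hypothetical keys for one day's upload.
required = ["data/2020-01-01/part-0001.csv", "data/2020-01-01/part-0002.csv"]
marker = "data/2020-01-01/_SUCCESS"

listed = ["data/2020-01-01/part-0001.csv"]
print(upload_finished(listed, required_keys=required))  # False: one file missing

listed.append("data/2020-01-01/part-0002.csv")
print(upload_finished(listed, required_keys=required))  # True: all files present
print(upload_finished(listed, marker_key=marker))       # False: no marker yet
```

Waiting for a marker is the more robust of the two, because the producer decides when the upload is done instead of the DAG guessing the full list of files.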



S3KeySensor

When we use marker files (empty files that exist only to mark a directory as complete), we may prefer the S3KeySensor because, in this case, we usually know the exact name and location of the file.

The usage of the S3KeySensor is similar to the prefix sensor:

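from airflow.sensors.s3_key_sensor import S3KeySensor

# Airflow 1.10.x import path. With a full s3:// URL in bucket_key,
# bucket_name can be omitted. Every task needs a task_id.
sensor = S3KeySensor(
    task_id='wait_for_marker_file',
    bucket_key='s3://some_s3_bucket/key/of/the/object/marker.file'
)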

This sensor waits for a file whose key exactly matches the given bucket_key and ignores all other files.
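The difference from the prefix sensor is easiest to see in a plain-Python model of the matching. The sketch below assumes the sensor splits a full s3:// URL into a bucket name and a key (which is why bucket_name can be left out in the example above) and then requires an exact key match. `split_s3_url` and `key_matches` are hypothetical helpers, not Airflow functions, and nothing here talks to S3.

```python
from typing import Tuple
from urllib.parse import urlparse


def split_s3_url(url: str) -> Tuple[str, str]:
    """Split 's3://bucket/path/to/key' into ('bucket', 'path/to/key')."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an s3:// URL: {url!r}")
    return parsed.netloc, parsed.path.lstrip("/")


def key_matches(bucket_key: str, bucket: str, key: str) -> bool:
    """Exact match: both the bucket and the whole key must be equal."""
    expected_bucket, expected_key = split_s3_url(bucket_key)
    return bucket == expected_bucket and key == expected_key


bucket_key = "s3://some_s3_bucket/key/of/the/object/marker.file"
print(split_s3_url(bucket_key))
# ('some_s3_bucket', 'key/of/the/object/marker.file')

for key in [
    "key/of/the/object/marker.file",      # the marker itself
    "key/of/the/object/data.csv",         # hypothetical data file
    "key/of/the/object/marker.file.tmp",  # hypothetical; shares the prefix only
]:
    print(key, key_matches(bucket_key, "some_s3_bucket", key))
```

Only the first key matches; a key that merely starts with the marker’s name is ignored. Exact matching is the default; the S3KeySensor also accepts wildcard_match=True for Unix-style patterns, which this sketch does not model.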




Bartosz Mikulski * data/machine learning engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group

