How to postpone an Airflow DAG until files get uploaded into an S3 bucket
When we have to postpone an Airflow DAG until files get uploaded into S3, we have two options. In this article, I am going to show you both and explain when to use them.
We can use the S3PrefixSensor to detect that at least one file with a given prefix exists. To make it work, we need the bucket name and the prefix of the S3 object key:
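```python
# Import path used by Airflow 1.10; newer releases ship this sensor in the Amazon provider package.
from airflow.sensors.s3_prefix_sensor import S3PrefixSensor

sensor = S3PrefixSensor(
    task_id='wait_for_files',  # every Airflow task needs a task_id; this name is only an example
    bucket_name='some_s3_bucket',
    prefix='key/of/the/object'
)
```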
S3PrefixSensor resolves the prefix starting from the root of the given bucket, so with the configuration above it matches objects stored under key/of/the/object/.
It makes sense to use the S3PrefixSensor when we don't know the exact names of the uploaded files and there is no risk of starting the DAG too early. That risk appears when an external service uploads several files to our S3 bucket: as soon as the first file matching the sensor's prefix arrives, the sensor succeeds, even though more files are still being uploaded. In this situation, we may miss the other files if the downstream tasks start running too soon.
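To avoid this issue, we may put a TimeDeltaSensor after the S3PrefixSensor to leave a few extra minutes for the upload (keep in mind that TimeDeltaSensor counts its delay from the end of the DAG run's schedule interval, not from the moment the upstream sensor succeeds), use more than one S3PrefixSensor to wait for all required files, or wait until a marker file gets uploaded to indicate that the upload has finished.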
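The difference between these conditions can be illustrated without Airflow. The following is a simplified model, not the sensors' actual implementation: each snapshot stands for the object keys visible in the bucket at one poke, and all file names (including the marker name) are hypothetical.

```python
# Simplified model of sensor pokes, not Airflow code.
# All key names below are hypothetical and exist only for this illustration.
PREFIX = "key/of/the/object/"
MARKER = PREFIX + "marker.file"

# Keys visible in the bucket at four consecutive pokes.
snapshots = [
    [],
    [PREFIX + "part-1.csv"],
    [PREFIX + "part-1.csv", PREFIX + "part-2.csv"],
    [PREFIX + "part-1.csv", PREFIX + "part-2.csv", MARKER],
]


def first_poke(snapshots, is_ready):
    """Return the index of the first snapshot for which is_ready(keys) is true."""
    for index, keys in enumerate(snapshots):
        if is_ready(keys):
            return index
    return None


def any_key_with_prefix(keys):
    # Models a single prefix check: one matching object is enough.
    return any(key.startswith(PREFIX) for key in keys)


def all_required_prefixes(keys):
    # Models several prefix checks, one per required file.
    required = [PREFIX + "part-1", PREFIX + "part-2"]
    return all(any(key.startswith(p) for key in keys) for p in required)


def marker_uploaded(keys):
    # Models waiting for a marker that the uploader writes as its last step.
    return MARKER in keys


prefix_poke = first_poke(snapshots, any_key_with_prefix)
required_poke = first_poke(snapshots, all_required_prefixes)
marker_poke = first_poke(snapshots, marker_uploaded)

print("single prefix fires at poke", prefix_poke,
      "with", len(snapshots[prefix_poke]), "file(s) visible")
print("all required prefixes fire at poke", required_poke)
print("marker fires at poke", marker_poke)
```

In this model the single prefix check succeeds while only one of the two data files is present, which is exactly the early start described above. The other two conditions hold back until the data is complete.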
When we use marker files (empty files that exist only to mark a directory as fully processed), we may prefer the S3KeySensor because, in this case, we usually know the exact name and location of the file we expect.
The usage of the S3KeySensor is similar to the prefix sensor:
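```python
from airflow.sensors.s3_key_sensor import S3KeySensor

sensor = S3KeySensor(
    task_id='wait_for_marker',  # every Airflow task needs a task_id; this name is only an example
    # A full s3:// URL carries both the bucket name and the object key.
    bucket_key='s3://some_s3_bucket/key/of/the/object/marker.file'
)
```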
This sensor waits for a file whose key matches the given bucket_key precisely and ignores all other files.
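To make the exact-match behavior concrete, here is a minimal plain-Python sketch. It assumes the bucket_key is given as a full s3:// URL, as in the example above. The helper names and the bucket listing are hypothetical, and the code only mirrors the idea rather than reproducing the sensor's implementation.

```python
from urllib.parse import urlparse


def split_s3_url(url):
    """Split a full s3:// URL into (bucket_name, object_key)."""
    parsed = urlparse(url)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("expected a URL of the form s3://bucket/key")
    return parsed.netloc, parsed.path.lstrip("/")


def exact_key_present(keys, wanted_key):
    # Exact comparison: keys that merely share a prefix do not count.
    return wanted_key in keys


bucket, key = split_s3_url("s3://some_s3_bucket/key/of/the/object/marker.file")

# Hypothetical bucket listing used only for this illustration.
listing = [
    "key/of/the/object/part-1.csv",
    "key/of/the/object/marker.file.tmp",
]
before = exact_key_present(listing, key)   # similar keys are ignored

listing.append("key/of/the/object/marker.file")
after = exact_key_present(listing, key)    # the exact key is now present

print(bucket, key, before, after)
```

Note that a key such as marker.file.tmp does not satisfy the check, which is why a marker file works well as an "upload finished" signal: nothing but the exact key triggers the DAG.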