Remove a directory from S3 using Airflow S3Hook
Even though S3 has no concept of directories, we tend to use / as a delimiter in object keys and think of objects that share a key prefix as files in the same directory. After all, when we open the S3 web console, it looks like a file system with directories.
Because of that, removing files with a common prefix is an everyday use case, as it is the S3 equivalent of removing a directory.
This operation is not trivial for two reasons:
- We have to list all objects in a “directory” because we need exact keys to remove anything
- We cannot remove more than 1000 keys at once
I will do all of that in three stages. First, I have to retrieve all object keys that begin with a given prefix. Note that the
S3Hook uses pagination internally to get all files, so we don’t have to worry about that:
```python
from airflow.hooks.S3_hook import S3Hook

s3_hook = S3Hook('s3_connection_id')
object_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=key_prefix)
```
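The returned `object_keys` is a plain list of key strings, not object metadata. For illustration, with a hypothetical `key_prefix` the result could look like this:

```python
# Hypothetical result for key_prefix = 'reports/2021/' - list_keys returns plain key strings
object_keys = [
    'reports/2021/01/sales.csv',
    'reports/2021/02/sales.csv',
]
```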
I cannot pass all of the keys directly into the `delete_objects` function because there may be more than 1000 keys in the `object_keys` list. To solve that problem, I will define a `chunks` function that splits the list into multiple lists containing no more than the given number of elements:
```python
def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]
```
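As a quick sanity check of how `chunks` behaves (the values here are purely illustrative):

```python
# Splits a 5-element list into batches of at most 2 elements
batches = list(chunks(['a', 'b', 'c', 'd', 'e'], 2))
# batches == [['a', 'b'], ['c', 'd'], ['e']]
```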
Now, I use `chunks` to split the keys into batches of at most 1000 and remove every batch using the `delete_objects` function:
```python
if object_keys:
    batches = chunks(object_keys, 1000)
    for batch in batches:
        s3_hook.delete_objects(bucket=bucket_name, keys=batch)
```
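Putting the three stages together, here is a minimal sketch of a helper that could be called from a PythonOperator. The function name `remove_s3_prefix`, the connection id, and the bucket/prefix arguments are placeholders of mine, not anything defined by Airflow; note also that on Airflow 2 the hook is imported from `airflow.providers.amazon.aws.hooks.s3` instead:

```python
from airflow.hooks.S3_hook import S3Hook


def chunks(lst, n):
    # Split lst into consecutive slices of at most n elements
    for i in range(0, len(lst), n):
        yield lst[i:i + n]


def remove_s3_prefix(bucket_name, key_prefix, conn_id='s3_connection_id'):
    # Hypothetical helper: delete every object whose key starts with key_prefix
    s3_hook = S3Hook(conn_id)
    object_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=key_prefix)
    if object_keys:
        # delete_objects accepts at most 1000 keys per call, so remove them in batches
        for batch in chunks(object_keys, 1000):
            s3_hook.delete_objects(bucket=bucket_name, keys=batch)
```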