Remove a directory from S3 using Airflow S3Hook

Even though S3 has no concept of catalogs, we tend to put / as delimiters in the object keys and think of files with the same key prefix as files in the same directory. After all, when we open the S3 web interface, it looks like a file system with directories.

Because of that, removing files with a common prefix is an everyday use case, as it is the S3 equivalent of removing a directory.

This operation is not trivial for two reasons:

  • We have to list all objects in a “directory” because we need exact keys to remove anything
  • We cannot remove more than 1000 keys at once

I will do all of that in three stages. First, I have to retrieve all object keys that begin with a given prefix. Note that the S3Hook uses pagination internally to get all files, so we don’t have to worry about that:

from airflow.hooks.S3_hook import S3Hook

s3_hook = S3Hook('s3_connection_id')

object_keys = s3_hook.list_keys(bucket_name=bucket_name, prefix=key_prefix)

I cannot pass all of the keys directly into the delete_objects functions because I may have more than 1000 keys in the object_keys list. To solve that problem, I will define the chunks function that splits the list into multiple lists that contain no more than the given number of elements:

def chunks(lst, n):
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

Now, I use chunks to split the keys into batches and remove every batch using the delete_objects function:

if object_keys:
    batches = chunks(object_keys, 1000)
    for batch in batches:
        s3_hook.delete_objects(bucket=bucket_name, keys=batch)
Older post

Run a command on a remote server using SSH in Airflow

How to use the SSHHook in a PythonOperator to connect to a remote server from Airflow using SSH and execute a command.

Newer post

How to use Virtualenv to prepare a separate environment for Python function running in Airflow

How to use the PythonVirtualenvOperator in Airflow