How to get a notification when a new file is uploaded to an S3 bucket

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (94/100)

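To get a Slack notification when a new file is uploaded to an S3 bucket, we have to configure S3 event notifications for the bucket. S3 publishes an event to an SNS topic, and a Lambda function subscribed to that topic forwards the information to Slack. We can configure the notification using Terraform. In the following example, I assume that you have already defined the bucket and the SNS topic: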

resource "aws_s3_bucket_notification" "bucket_notification" {
    bucket = aws_s3_bucket.some-bucket.id

    topic {
        # Reference to the SNS topic (an aws_sns_topic resource) defined elsewhere
        topic_arn = aws_sns_topic.topic-name.arn
        events = ["s3:ObjectCreated:*"]
    }
}
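One thing the Terraform snippet does not show: S3 can publish to the topic only if the topic's access policy allows it. As a rough sketch, the policy document looks like the one built below. I wrote it in Python to keep it close to the rest of the code in this article; the function name `build_topic_policy` and the ARNs are placeholders made up for illustration, and in Terraform you would attach the resulting JSON as the policy of the topic.

```python
import json


def build_topic_policy(topic_arn, bucket_arn):
    """Return an SNS access policy that lets one S3 bucket publish to the topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
                # Restrict publishing to events coming from this bucket only.
                "Condition": {"ArnLike": {"aws:SourceArn": bucket_arn}},
            }
        ],
    }


if __name__ == "__main__":
    # Placeholder ARNs, not real resources.
    policy = build_topic_policy(
        "arn:aws:sns:eu-central-1:123456789012:example-topic",
        "arn:aws:s3:::example-bucket",
    )
    print(json.dumps(policy, indent=2))
```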

After that, we have to define a Lambda function that reacts to the messages sent to the SNS topic. I use the Serverless Framework, so the YAML configuration of my Lambda function looks like this (this is only the part that defines the function; the whole file is longer):

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux
    layer: true

functions:
  function_name:
    handler: file_name.function_name
    memorySize: 128
    layers:
      - {Ref: PythonRequirementsLambdaLayer}
    events:
      - sns: sns_arn

In the Python function, we have to extract the S3 event from every SNS message and turn each of its records into a line of text:

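import json
from urllib.parse import unquote_plus

import boto3
import requests


def _parse_message(raw_message):
    # The SNS message body is the S3 event notification serialized as JSON.
    message_records = json.loads(raw_message)
    # S3 sends an s3:TestEvent without 'Records' when the notification is first configured.
    for message in message_records.get('Records', []):
        bucket_name = message['s3']['bucket']['name']
        # Object keys in S3 events are URL-encoded (for example, a space arrives as '+').
        file_name = unquote_plus(message['s3']['object']['key'])
        file_size = message['s3']['object']['size']
        file_size_in_mb = '{:.5f}'.format(file_size / (1024**2))
        yield f"File {file_name} uploaded to bucket {bucket_name} (size: {file_size_in_mb} MB)"


def _extract_file_info(record):
    # Every record of the Lambda event wraps one SNS message.
    sns_event = record['Sns']
    raw_message = sns_event['Message']
    return list(_parse_message(raw_message))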
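To try these functions without uploading anything, it helps to see how the event is nested: Lambda receives a list of SNS records, and the `Message` field of each record is a JSON string that holds the S3 event with its own `Records` list. The helper below is a minimal sketch that builds such an event. The name `make_sns_event` and the sample values are made up for illustration, and a real event carries many more fields.

```python
import json


def make_sns_event(bucket_name, key, size):
    """Build a stripped-down Lambda event: an S3 notification wrapped in an SNS record."""
    s3_event = {
        "Records": [
            {
                "eventName": "ObjectCreated:Put",
                "s3": {
                    "bucket": {"name": bucket_name},
                    "object": {"key": key, "size": size},
                },
            }
        ]
    }
    # SNS delivers the S3 event as a JSON string, not as a nested dictionary.
    return {"Records": [{"Sns": {"Message": json.dumps(s3_event)}}]}


if __name__ == "__main__":
    event = make_sns_event("example-bucket", "reports/2021.csv", 2 * 1024**2)
    inner = json.loads(event["Records"][0]["Sns"]["Message"])
    print(inner["Records"][0]["s3"]["object"])
```

Passing `event['Records'][0]` to `_extract_file_info` should give a one-element list with the formatted line for that file.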

We use those functions in the handler function to build the message:

def function_name(event, context):
    updated_file = [_extract_file_info(record) for record in event['Records']]
    message = '\n'.join([item for sublist in updated_file for item in sublist])
    
    ...
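The nested list comprehension flattens a list of lists: every SNS record yields a list of lines, and the handler joins all of them into one text. If the double `for` is hard to read, `itertools.chain.from_iterable` does the same job. A small sketch with made-up input (`join_lines` is a name I picked for illustration):

```python
from itertools import chain


def join_lines(lines_per_record):
    """Flatten a list of lists of strings and join the items with newlines."""
    return '\n'.join(chain.from_iterable(lines_per_record))


if __name__ == "__main__":
    # Two SNS records: the first describes one file, the second describes two.
    lines_per_record = [
        ["File a.csv uploaded to bucket example-bucket (size: 0.00100 MB)"],
        [
            "File b.csv uploaded to bucket example-bucket (size: 1.00000 MB)",
            "File c.csv uploaded to bucket example-bucket (size: 2.50000 MB)",
        ],
    ]
    print(join_lines(lines_per_record))
```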

Finally, we send the message to the Slack webhook URL. I assume that you have the URL defined as a constant. In a real application, it is best to pass it as an environment variable or read it from Secrets Manager:

    requests.post(SLACK_WEBHOOK_URL, json={'text': message, 'link_names': 1})
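As a sketch of the environment-variable approach, the functions below read the URL at call time and fail loudly when it is missing. The variable name `SLACK_WEBHOOK_URL` and all three function names are my assumptions. I use `urllib.request` from the standard library here instead of `requests` only to keep the example free of dependencies; the payload is the same.

```python
import json
import os
import urllib.request


def get_webhook_url(env=os.environ):
    """Read the webhook URL from the environment (the variable name is an assumption)."""
    url = env.get('SLACK_WEBHOOK_URL')
    if not url:
        raise RuntimeError('SLACK_WEBHOOK_URL is not set')
    return url


def build_slack_request(url, message):
    """Prepare a POST request with the same JSON payload as in the handler."""
    payload = json.dumps({'text': message, 'link_names': 1}).encode('utf-8')
    return urllib.request.Request(
        url,
        data=payload,
        headers={'Content-Type': 'application/json'},
        method='POST',
    )


def send_to_slack(message, timeout=5):
    """Send the message; urlopen raises HTTPError when the response is 4xx or 5xx."""
    request = build_slack_request(get_webhook_url(), message)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

With the Serverless Framework, the variable can be set under the `environment` key of the function, so the URL stays out of the source code.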

Bartosz Mikulski
