How to get a notification when a new file is uploaded to an S3 bucket

This article is a part of my "100 data engineering tutorials in 100 days" challenge. (94/100)

To get a Slack notification when a new file is uploaded to an S3 bucket, we first have to configure the bucket's event notifications so that S3 publishes a message to an SNS topic whenever an object is created. We can do this using Terraform. In the following example, I assume that you have already defined the bucket and the SNS topic:

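resource "aws_s3_bucket_notification" "bucket_notification" {
    bucket = aws_s3_bucket.some-bucket.id

    topic {
        # ARN of the SNS topic that receives the events
        topic_arn = aws_sns_topic.topic-name.arn
        # Notify about every kind of object creation (Put, Post, Copy, multipart upload)
        events = ["s3:ObjectCreated:*"]
    }
}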
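
One prerequisite that is easy to miss: S3 can publish to the topic only if the topic's access policy allows it. The sketch below builds such a policy document in Python. The ARNs are hypothetical placeholders, and `build_topic_policy` is a name I made up for illustration, so treat it as a starting point rather than a finished configuration:

```python
import json


def build_topic_policy(topic_arn, bucket_arn):
    """Build an SNS access policy that lets one S3 bucket publish to one topic."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
                # Only accept events coming from this particular bucket.
                "Condition": {"ArnLike": {"aws:SourceArn": bucket_arn}},
            }
        ],
    }


# Hypothetical ARNs; replace them with the ARNs of your topic and bucket.
policy = build_topic_policy(
    "arn:aws:sns:eu-central-1:123456789012:topic-name",
    "arn:aws:s3:::some-bucket",
)
print(json.dumps(policy, indent=2))
```

The printed JSON can be used as the access policy of the topic, for example in the `policy` argument of the `aws_sns_topic` resource.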

After that, we have to define a Lambda function that reacts to the messages sent to the SNS topic. I use the Serverless Framework, so the YAML configuration of my Lambda function looks like this (it is only the part that defines the function; the whole file is longer). In the example, `sns_arn` stands for the ARN of the SNS topic:

plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux
    layer: true

functions:
  function_name:
    handler: file_name.function_name
    memorySize: 128
    layers:
      - {Ref: PythonRequirementsLambdaLayer}
    events:
      - sns: sns_arn

In the Python function, we have to extract the S3 event from every SNS message and turn each of its records into a human-readable line:

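import json
from urllib.parse import unquote_plus

import boto3
import requests


def _parse_message(raw_message):
    # The SNS message body is the S3 event notification serialized as JSON.
    message_records = json.loads(raw_message)
    # S3 sends an s3:TestEvent without 'Records' when the notification is configured.
    for message in message_records.get('Records', []):
        bucket_name = message['s3']['bucket']['name']
        # Object keys are URL-encoded in S3 event notifications.
        file_name = unquote_plus(message['s3']['object']['key'])
        file_size = message['s3']['object']['size']
        file_size_in_mb = '{:.5f}'.format(file_size / (1024**2))
        yield f"File {file_name} uploaded to bucket {bucket_name} (size: {file_size_in_mb} MB)"


def _extract_file_info(record):
    # Every record of the Lambda event wraps a single SNS message.
    sns_event = record['Sns']
    raw_message = sns_event['Message']
    return list(_parse_message(raw_message))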
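
To see what the parsing does, it helps to run it against a sample event. The following is a self-contained sketch: `describe_uploads` is a hypothetical helper that mirrors the logic above, and the event is a heavily trimmed, made-up example (real events carry many more fields). It also URL-decodes the object key, because S3 encodes keys in event notifications:

```python
import json
from urllib.parse import unquote_plus


def describe_uploads(lambda_event):
    """Return one line per uploaded object found in an SNS-wrapped S3 event."""
    lines = []
    for record in lambda_event["Records"]:
        # The S3 notification is a JSON string inside the SNS message.
        s3_event = json.loads(record["Sns"]["Message"])
        for s3_record in s3_event.get("Records", []):
            bucket = s3_record["s3"]["bucket"]["name"]
            key = unquote_plus(s3_record["s3"]["object"]["key"])
            size_mb = s3_record["s3"]["object"]["size"] / (1024 ** 2)
            lines.append(f"File {key} uploaded to bucket {bucket} (size: {size_mb:.5f} MB)")
    return lines


# Hypothetical S3 notification with a single 1 MiB object.
s3_notification = {
    "Records": [
        {
            "s3": {
                "bucket": {"name": "some-bucket"},
                "object": {"key": "reports/2020+q1.csv", "size": 1048576},
            }
        }
    ]
}
sample_event = {"Records": [{"Sns": {"Message": json.dumps(s3_notification)}}]}

print(describe_uploads(sample_event))
# ['File reports/2020 q1.csv uploaded to bucket some-bucket (size: 1.00000 MB)']
```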

We use those functions in the handler function to build the message:

def function_name(event, context):
    updated_file = [_extract_file_info(record) for record in event['Records']]
    message = '\n'.join([item for sublist in updated_file for item in sublist])
    
    ...
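
The nested list comprehension in the handler flattens a list of lists before joining it. If you find that hard to read, the same step can be sketched with `itertools.chain`; `build_message` is a hypothetical helper name, not a part of the original handler:

```python
from itertools import chain


def build_message(lines_per_record):
    """Flatten the per-record lists of lines and join them with newlines."""
    return "\n".join(chain.from_iterable(lines_per_record))


print(build_message([["File a.csv uploaded"], ["File b.csv uploaded", "File c.csv uploaded"]]))
```

When none of the records contain any files, the function returns an empty string, so it may be worth skipping the Slack call in that case.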

In the end, we have to send the message to the Slack webhook URL. I assume that you have the URL defined as a constant. In a real application, it is best to pass it as an environment variable or read it from AWS Secrets Manager:

    requests.post(SLACK_WEBHOOK_URL, json={'text': message, 'link_names': 1})
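
Here is a minimal sketch of the environment variable approach. To keep it free of third-party dependencies, it uses `urllib.request` from the standard library instead of `requests`. The variable name `SLACK_WEBHOOK_URL`, the helper names, and the 5-second timeout are my assumptions:

```python
import json
import os
import urllib.request


def build_slack_request(message, webhook_url=None):
    """Prepare the POST request for the Slack webhook without sending it."""
    # Assumption: the webhook URL is stored in the SLACK_WEBHOOK_URL variable.
    url = webhook_url or os.environ["SLACK_WEBHOOK_URL"]
    payload = json.dumps({"text": message, "link_names": 1}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send_to_slack(message, webhook_url=None, timeout=5):
    """Send the message and return the HTTP status code."""
    request = build_slack_request(message, webhook_url)
    # urlopen raises urllib.error.HTTPError when the server responds with 4xx/5xx.
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Splitting the code into a function that builds the request and a function that sends it makes the payload easy to verify in a unit test without calling Slack.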

Bartosz Mikulski
Bartosz Mikulski * MLOps Engineer / data engineer * conference speaker * co-founder of Software Craft Poznan & Poznan Scala User Group
