How to get a notification when a new file is uploaded to an S3 bucket

To get a Slack notification when a new file is uploaded to an S3 bucket, we have to configure the bucket notification feature so that S3 publishes an event to an SNS topic. We can do this using Terraform. In the following example, I assume that you have already defined the bucket and the SNS topic:

resource "aws_s3_bucket_notification" "bucket_notification" {
    bucket = aws_s3_bucket.some-bucket.id

    topic {
        topic_arn = aws_sns_topic.topic-name.arn
        # Every object-creation API: PUT, POST, COPY and multipart upload completion
        events    = ["s3:ObjectCreated:*"]
    }
}
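S3 can only publish to the topic if the topic's access policy allows the S3 service to do so; otherwise AWS rejects the notification configuration when it validates the destination. Below is a minimal sketch of such a policy document, built in Python. The ARNs are hypothetical placeholders, and the printed JSON could be attached with Terraform's aws_sns_topic_policy resource or with boto3's set_topic_attributes call (AttributeName='Policy').

```python
import json


def s3_publish_policy(topic_arn: str, bucket_arn: str) -> dict:
    """Build an SNS topic access policy that lets one S3 bucket publish events.

    The aws:SourceArn condition restricts publishing to the given bucket,
    so other buckets cannot push messages into the topic.
    """
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "AllowS3BucketToPublish",
                "Effect": "Allow",
                "Principal": {"Service": "s3.amazonaws.com"},
                "Action": "SNS:Publish",
                "Resource": topic_arn,
                "Condition": {"ArnLike": {"aws:SourceArn": bucket_arn}},
            }
        ],
    }


if __name__ == "__main__":
    # Hypothetical ARNs: replace them with your own topic and bucket.
    policy = s3_publish_policy(
        "arn:aws:sns:eu-west-1:123456789012:new-file-topic",
        "arn:aws:s3:::some-bucket",
    )
    print(json.dumps(policy, indent=2))
```

Scoping the statement to a single bucket ARN keeps the topic from accepting events from buckets you do not control.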

After that, we define a Lambda function that reacts to the messages sent to the SNS topic. I use the Serverless Framework, so the YAML configuration of my Lambda function looks like this (it is only the part about the function; the whole file is longer):

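plugins:
  - serverless-python-requirements

custom:
  pythonRequirements:
    dockerizePip: non-linux  # build the dependencies in Docker unless deploying from Linux
    layer: true  # package the requirements as a Lambda layer

functions:
  function_name:
    handler: file_name.function_name  # <module>.<function>
    memorySize: 128
    layers:
      - {Ref: PythonRequirementsLambdaLayer}  # the layer created by the plugin
    events:
      - sns: sns_arn  # subscribe the function to the SNS topic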

In the Python function, we have to extract the S3 event from each SNS message and parse its records:

import json
from urllib.parse import unquote_plus

import requests


def _parse_message(raw_message):
    message_records = json.loads(raw_message)
    # The s3:TestEvent message sent when the notification is configured has no 'Records'
    for message in message_records.get('Records', []):
        bucket_name = message['s3']['bucket']['name']
        # S3 URL-encodes object keys in notifications (e.g. a space becomes '+')
        file_name = unquote_plus(message['s3']['object']['key'])
        file_size = message['s3']['object']['size']
        file_size_in_mb = '{:.5f}'.format(file_size / (1024**2))
        yield f"File {file_name} uploaded to bucket {bucket_name} (size: {file_size_in_mb} MB)"


def _extract_file_info(record):
    # SNS delivers the S3 event as a JSON string in Sns.Message
    sns_event = record['Sns']
    raw_message = sns_event['Message']
    message = list(_parse_message(raw_message))
    return message
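To try the parsing logic without uploading anything, it helps to know the shape of the event. SNS delivers the S3 notification as a JSON string in Records[].Sns.Message, and S3 URL-encodes the object key. The sketch below builds a minimal fake event containing only the fields used here and decodes it again. Both helper names, make_sns_s3_event and uploaded_objects, are hypothetical. The decoder also skips the s3:TestEvent message that S3 sends when the notification is configured.

```python
import json
from typing import Iterator, Tuple
from urllib.parse import quote_plus, unquote_plus


def make_sns_s3_event(bucket: str, key: str, size: int) -> dict:
    """Build a minimal, hypothetical SNS-wrapped S3 event for local tests.

    Only the fields the parser reads are included; real events carry more.
    """
    s3_event = {
        "Records": [
            {
                "eventName": "ObjectCreated:Put",
                "s3": {
                    "bucket": {"name": bucket},
                    # Mimic S3: URL-encode the key but keep '/' as-is.
                    "object": {"key": quote_plus(key, safe="/"), "size": size},
                },
            }
        ]
    }
    # SNS wraps the S3 event as a JSON *string* in Sns.Message.
    return {"Records": [{"Sns": {"Message": json.dumps(s3_event)}}]}


def uploaded_objects(event: dict) -> Iterator[Tuple[str, str, int]]:
    """Yield (bucket, decoded key, size) for every S3 record in the event."""
    for record in event["Records"]:
        s3_event = json.loads(record["Sns"]["Message"])
        # s3:TestEvent messages have no "Records" key, so default to [].
        for s3_record in s3_event.get("Records", []):
            yield (
                s3_record["s3"]["bucket"]["name"],
                unquote_plus(s3_record["s3"]["object"]["key"]),
                s3_record["s3"]["object"]["size"],
            )


event = make_sns_s3_event("some-bucket", "reports/March 2024.csv", 2048)
print(list(uploaded_objects(event)))
# [('some-bucket', 'reports/March 2024.csv', 2048)]
```

The same fake event can be passed to the handler locally to check the Slack message text before deploying.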

We use those functions in the handler function to build the message:

def function_name(event, context):
    updated_file = [_extract_file_info(record) for record in event['Records']]
    message = '\n'.join([item for sublist in updated_file for item in sublist])

    ...
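The nested comprehension flattens a list of lists: one inner list per SNS record, with one line per uploaded file. Below is an equivalent sketch using itertools.chain.from_iterable. As a hypothetical convention (build_notification is not part of the original code), it returns None when there is nothing to report. This lets the handler skip posting an empty message to Slack, for example after an s3:TestEvent.

```python
from itertools import chain
from typing import List, Optional


def build_notification(per_record_lines: List[List[str]]) -> Optional[str]:
    """Flatten the per-SNS-record line lists into one Slack message.

    Returns None when no file was reported, so the caller can skip
    the webhook call instead of sending an empty text.
    """
    lines = list(chain.from_iterable(per_record_lines))
    return "\n".join(lines) if lines else None


print(build_notification([["File a.csv uploaded"], ["File b.csv uploaded"]]))
print(build_notification([[]]))
```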

Finally, we send the message to the Slack webhook URL. I assume that the URL is defined as a constant. In a real application, it is better to pass it as an environment variable or read it from AWS Secrets Manager:

    if message:  # skip empty notifications, e.g. after an s3:TestEvent
        requests.post(SLACK_WEBHOOK_URL, json={'text': message, 'link_names': 1}, timeout=10)
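The webhook URL can be read from an environment variable first, with AWS Secrets Manager as a fallback. The sketch below shows that approach and posts with the standard library only, so the requests layer is not needed. The environment variable names SLACK_WEBHOOK_URL and SLACK_WEBHOOK_SECRET_ID are hypothetical. The sketch also assumes the secret stores the URL as a plain string. The secrets client is injectable so the lookup can be tested without AWS credentials.

```python
import json
import os
import urllib.request


def get_slack_webhook_url(secrets_client=None) -> str:
    """Return the webhook URL from the environment or from Secrets Manager.

    SLACK_WEBHOOK_URL and SLACK_WEBHOOK_SECRET_ID are hypothetical
    environment variable names chosen for this sketch.
    """
    url = os.environ.get("SLACK_WEBHOOK_URL")
    if url:
        return url
    if secrets_client is None:
        import boto3  # bundled with the AWS Lambda Python runtimes

        secrets_client = boto3.client("secretsmanager")
    secret = secrets_client.get_secret_value(
        SecretId=os.environ["SLACK_WEBHOOK_SECRET_ID"]
    )
    # Assumes the secret value is the bare URL string.
    return secret["SecretString"]


def post_to_slack(webhook_url: str, message: str) -> None:
    """POST the message as JSON to a Slack incoming webhook."""
    request = urllib.request.Request(
        webhook_url,
        data=json.dumps({"text": message}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # urlopen raises urllib.error.HTTPError on 4xx/5xx responses.
    with urllib.request.urlopen(request, timeout=10):
        pass
```

Reading the secret on every invocation adds latency. If that matters, caching the URL in a module-level variable lets warm Lambda containers reuse it.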