How to run batch inference using Sagemaker Batch Transform Jobs

This article will show you how to run batch machine learning inference on a dataset stored in S3. We’ll use the Sagemaker Batch Transform Jobs and a pre-trained machine learning model.

I assume you have already trained the model, pushed the Docker image to ECR, and registered the model in Sagemaker. If you don’t know how to do it, take a look at one of my previous articles:

Preprocessing the data

We’ll need the input data in CSV files or text files containing JSON objects. In the case of JSON, the file should contain one JSON per line - the jsonl format. Sagemaker batch transform jobs can read uncompressed data and files using gzip compression.

Our example model classifies text and does the entire data preprocessing internally in the Docker container. We send raw data to the endpoint, and the Docker container cleans the input, tokenizes it, and passes the tokenization result to the model. If the model requires two input parameters called: feature_A and feature_B, we create jsonl files looking like this:

{"feature_A": "This is a text", "feature_B": "Some other text"}

In the case of CSV files, we CAN’T use headers, so the Docker container must recognize parameters using their position in the input array instead of feature names.

Running the batch inference

To run the batch inference, we need the identifier of the Sagemaker model we want to use and the location of the input data. We’ll also need to decide where Sagemaker will store the output.

First, we have to configure a Transformer. We’ll use the “assembly with line” mode to combine the output with the input. It’ll make it easier to process the results. However, we create a duplicate of input data, so you may want to use a different mode if you must limit the cost of S3 storage.

We have also specified the input data format application/json and instruct Sagemaker to process data points one by one (SingleRecord strategy). Alternatively, we could use batching and send multiple data points to the Docker container at once, but our code (in the Docker container) must support such an input format. In this example, I reuse the model I was using for real-time inference, so SingleRecord is my only option.

from sagemaker.transformer import Transformer

transformer = Transformer(
    model_name='model_id',
    instance_count=1,
    instance_type='ml.m5.large',
    strategy='SingleRecord',
    output_path='s3://bucket_name/key_prefix',
    base_transform_job_name='transform_job_name_prefix',
    accept='application/json',
    assembly_with='Line'
)

When we have the Transformer, we can start a new batch transform job. Unfortunately, the API requires parameter duplication, so we specify the content_type and the join_source again, even though application/json and Line are the only options we can use with our Transformer (because of the parameters we passed to its constructor).

We can pass the input location in two ways. We can pass the object key or a prefix of multiple object keys as in the example below or pass the object key of a ManifestFile. In the ManifestFile, we list the exact S3 keys of the files we want to process.

transformer.transform(
    's3://bucket/input_file_key_or_prefix',
    content_type='application_json',
    split_type='Line',
    join_source='Input',
    wait=False)

The transform method doesn’t return the job’s identifier, so to check whether we can read the output, we need to find the running job using boto3 and check the job status. We can do it like this:

import boto3
client = boto3.client('sagemaker')
client.list_transform_jobs(NameContains='transform_job_name_prefix')

It’ll find the jobs with the given prefix. It’s the same prefix as the base_transform_job_name we used in the Transformer constructor. Alternatively, we could set the wait parameter to True and wait for the result.

Speeding up the processing

We have only one instance running, so processing the entire file may take some time. We can increase the number of instances using the instance_count parameter to speed it up. We can send multiple requests to the Docker container simultaneously, too. The configure concurrent transformations we must use the max_concurrent_transforms parameter. However, when we use this option, we must also make sure the instance type we selected is powerful enough to handle multiple requests at the same time. It may be an issue when we run a large language processing model.

Processing the output

In the end, we must get access to the output. We’ll find the output files in the location specified in the Transformer constructor. Every line contains the prediction and the input parameters. For example:

{"SageMakerOutput":{"predictions": [[0.76]]}, "feature_A": "This is a text", "feature_B": "Some other text"}

If you didn’t configure the join of the output with the input, the file would contain only the predictions. You can join the files using the line numbers (or the index number after loading the datasets to Pandas).

Older post

How to build maintainable software by abstracting the business rules in data engineering

Are we building the right abstractions in software?

Newer post

Data pipeline documentation without wasting your time

How to document an ETL pipeline or ML inference pipeline without doing useless work