How to add custom preprocessing code to a Sagemaker Endpoint running a Tensorflow model

This article shows how to add custom preprocessing and postprocessing code to a Sagemaker Endpoint running a Tensorflow model. We’ll do two things: create a Python file with the functions that convert the input and output values, and configure the Sagemaker Endpoint to use that file.

How Does It Work?

When we call a Sagemaker Endpoint, it passes the input to the input_handler function, which accepts two parameters: the request body and the context (which contains, among other things, the HTTP headers). The input handler must return a valid input for the Tensorflow Serving endpoint. Afterward, the Sagemaker Endpoint passes that data to Tensorflow Serving, which makes the prediction, and the output_handler function converts the Tensorflow Serving output. The output handler also accepts two parameters (the data and the context) and returns a tuple with the converted response and the content type.
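
In code, the contract looks roughly like this (a minimal sketch of the two functions the container expects; the bodies are placeholders, not the actual conversion logic):

def input_handler(data, context):
    # "data" is a stream with the request body, "context" describes the request
    body = data.read().decode("utf-8")
    # ...convert "body" into a valid Tensorflow Serving request here...
    return body

def output_handler(response, context):
    # "response" is the Tensorflow Serving response
    # return a tuple: (converted body, content type)
    return response.content, context.accept_header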

Defining the Handlers

To define both handlers, let’s create a new Python file in the src directory. If we include the requirements.txt file in the directory, Sagemaker Endpoints will install the dependencies while deploying the endpoint.
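
For example, the source directory used in this article could look like this (inference.py is the file name we will pass to the model configuration later; the requirements.txt content is an assumption and lists only the transformers library needed by the tokenizer):

src/
├── inference.py       # defines input_handler and output_handler
└── requirements.txt   # a single line: transformers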

In the Python file, we can import dependencies and define the global state. I suggest not updating any mutable state (internal or external) in the handler functions; they exist only to convert the data. If you need access control, use IAM to limit access to the endpoint. If you need caching, put an AWS Lambda between the Sagemaker Endpoint and the caller. If you want to log the requests/responses, use the built-in Data Capture feature.

Input Handler

We will import the AutoTokenizer from the transformers library and tokenize the input. In our example, Tensorflow Serving runs a BERT NLP model, so we pass both the input_ids and the attention_mask to the model. Therefore, our input_handler function parses the JSON input, tokenizes the text, extracts the required fields from the tokenizer output, and builds the input JSON for Tensorflow Serving.

from transformers import AutoTokenizer
import json

max_seq_length = 64

# The tokenizer is loaded once, when the endpoint starts, not on every request
tokenizer = AutoTokenizer.from_pretrained("bert-base-cased", cache_dir="/tmp/tokenizer")

def input_handler(data, context):
    # Read the request body and extract the text from the first JSON line
    data_str = data.read().decode("utf-8")
    jsonlines = data_str.split("\n")
    text_before_tokenization = json.loads(jsonlines[0])["text"]

    # Tokenize the text, padding/truncating it to max_seq_length tokens
    encode_plus_tokens = tokenizer(
        text_before_tokenization,
        add_special_tokens=True,
        truncation=True,
        max_length=max_seq_length,
        padding="max_length",
        return_attention_mask=True,
        return_token_type_ids=False,
        return_tensors="tf"
    )

    input_ids = encode_plus_tokens["input_ids"]
    input_mask = encode_plus_tokens["attention_mask"]

    # Build the request body expected by Tensorflow Serving
    transformed_instance = {
        "input_ids": input_ids.numpy().tolist()[0],
        "input_mask": input_mask.numpy().tolist()[0]
    }
    transformed_data = {"signature_name": "serving_default", "instances": [transformed_instance]}

    return json.dumps(transformed_data)

Output Handler

We’ll return the output without any modifications. Remember to return the content type together with the response in a tuple!

def output_handler(response, context):
    # Use the content type requested by the caller (the Accept header)
    response_content_type = context.accept_header
    # Return the Tensorflow Serving response body unchanged, together with the content type
    return json.dumps(response.json()), response_content_type
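
If we wanted to postprocess the response as well, we could unwrap the prediction in the output handler instead of doing it on the client side. A possible sketch, assuming the model returns the predictions array shown at the end of this article:

def output_handler(response, context):
    # Parse the Tensorflow Serving response and extract the single prediction value
    prediction = response.json()["predictions"][0][0]
    # Return a simplified JSON body together with the requested content type
    return json.dumps({"prediction": prediction}), context.accept_header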

Configuring the Endpoint

To use the input and output handlers in the endpoint configuration, we have to add the entry_point and source_dir parameters to the model configuration:

from sagemaker.tensorflow import TensorFlowModel

model = TensorFlowModel(
    name='name',
    role=role,
    entry_point='inference.py',  # replace it with the name of your file containing the handlers
    source_dir='src',            # the directory with the handlers file and requirements.txt
    model_data='s3_path',
    framework_version="2.3",
    sagemaker_session=sagemaker_session
)
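
After creating the model object, we can deploy it as an endpoint. A minimal sketch (the instance type and count below are example values, not taken from this article; pick the ones that fit your workload):

predictor = model.deploy(
    initial_instance_count=1,        # example value
    instance_type="ml.m5.large",     # example instance type
    endpoint_name=endpoint_name      # the name we'll use later with invoke_endpoint
)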

You can find more information about deploying models using Sagemaker Endpoints in my other articles.

How to Use the Model

When we call the Sagemaker Endpoint from Python code, we have to configure a boto3 client and call the invoke_endpoint function. Note that the payload passed to the endpoint must match the expected input of the input_handler function:

import json
import boto3

payload = json.dumps({"text": text_to_classify})

runtime = boto3.client(
    "runtime.sagemaker",
    # fill in your credentials, or drop these arguments to use the default credential chain
    aws_access_key_id='',
    aws_secret_access_key='',
    region_name=''
)
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="application/json", Body=payload
)

# Read and parse the response produced by the output_handler
response = response["Body"].read()
result = json.loads(response.decode("utf-8"))

# The model returns a one-element array nested in another array
prediction = result['predictions'][0][0]

What To Do When You Don’t Know the Expected Parameters of the Tensorflow Model?

If you received a gzipped Tensorflow model from a data scientist and you don’t know what parameters you must pass to the model, install Tensorflow as a Python library (it ships with the saved_model_cli tool) and run the following command in the directory with the saved_model.pb file:

saved_model_cli show --dir . --all

You’ll see something like this:

signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['input_ids'] tensor_info:
        dtype: DT_INT32
        shape: (-1, 64)
        name: serving_default_input_ids:0
    inputs['input_mask'] tensor_info:
        dtype: DT_INT32
        shape: (-1, 64)
        name: serving_default_input_mask:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['outputs'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predict

What do we do when we see such a signature? Our ML model needs a JSON object with the field signature_name set to serving_default and a field instances containing an array of objects with fields input_ids and input_mask. As the output, we will get a single number (one-element array nested in another array).
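
Putting it all together, the request produced by our input_handler and the Tensorflow Serving response look roughly like this (the token ids and the prediction value are made-up examples; each array must contain exactly 64 elements because of the (-1, 64) shape):

# request body sent to Tensorflow Serving
{
  "signature_name": "serving_default",
  "instances": [
    {"input_ids": [101, 7592, 102, 0, ...], "input_mask": [1, 1, 1, 0, ...]}
  ]
}

# response returned to the output_handler
{"predictions": [[0.87]]}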
