How to add custom preprocessing code to a Sagemaker Endpoint running a Tensorflow model

This article shows how to add custom preprocessing/postprocessing code to a Sagemaker Endpoint running a Tensorflow model. We’ll do two things: create a Python file with functions used to convert the values and configure Sagemaker Endpoint to use the file.

How Does It Work?

When we call a Sagemaker Endpoint, it passes the input to the input_handler function, which accepts two parameters: the request body and context (which contains HTTP headers). The input handler must return a proper input to the Tensorflow Serving endpoint. Afterward, Sagemaker Endpoint passes the data to Tensorflow, makes the prediction, and converts the Tensorflow Serving output using the output_handler function. The output function also accepts two parameters (data and context) and returns the converted response and the content type.

Defining the Handlers

To define both handlers, let’s create a new Python file in the src directory. If we include the requirements.txt file in the directory, Sagemaker Endpoints will install the dependencies while deploying the endpoint.

In the Python file, we can import dependencies and define the global state. I suggest avoiding updating any mutable state (internal or external) in the handler functions. Those functions exist to convert the data. If you need access control, use IAM to limit access to the endpoint. If you need caching, put an AWS Lambda between the Sagemaker Endpoint and the caller. If you want to log the requests/responses, use the built-in Data Capture feature.

Input Handler

We will import the AutoTokenizer from the transformers library and tokenize the input. In our example, Tensorflow Serving runs a BERT NLP model, so we pass both input_ids and the attention_mask to the model. Therefore, our input_handler function parses the JSON input, tokenizes it, extracts the required parameters from the tokens, and builds the input JSON for Tensorflow serving.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from transformers import AutoTokenizer
import json

max_seq_length = 64

tokenizer = AutoTokenizer.from_pretrained("bert-base-cased", cache_dir="/tmp/tokenizer")

def input_handler(data, context):
    data_str = data.read().decode("utf-8")
    jsonlines = data_str.split("\n")
    text_before_tokenization = json.loads(jsonlines[0])["text"]

    encode_plus_tokens = tokenizer(
        text_before_tokenization,
        add_special_tokens=True,
        truncation=True,
        max_length=max_seq_length,
        padding="max_length",
        return_attention_mask=True,
        return_token_type_ids= False,
        return_tensors="tf"
    )

    input_ids = encode_plus_tokens["input_ids"]
    input_mask = encode_plus_tokens["attention_mask"]

    transformed_instance = {"input_ids": input_ids.numpy().tolist()[0], "input_mask": input_mask.numpy().tolist()[0]}
    transformed_data = {"signature_name": "serving_default", "instances": [transformed_instance]}

    return json.dumps(transformed_data)

Output Handler

We’ll return the output without any modifications. Remember to pass the content type in a tuple with the response!

1
2
3
def output_handler(response, context):
    response_content_type = context.accept_header
    return json.dumps(response.json()), response_content_type

Configuring the Endpoint

To use the input and output handlers in the endpoint configuration, we have to add the entry_point and source_dir parameter to the model configuration:

1
2
3
4
5
6
7
8
9
model = TensorFlowModel(
    name='name',
    role=role,
    entry_point='inference.py',  # replace it with the name of your file containing handlers
    source_dir='src',
    model_data='s3_path',
    framework_version="2.3",
    sagemaker_session=sagemaker_session
)

You can find more information about deploying models using Sagemaker Endpoints in my articles about:

How to Use the Model

When we call the Sagemaker Endpoint from a Python code, we have to configure a boto3 client and call the invoke_endpoint function. Note that the payload passed to the endpoint must match the expected input of the input_handler function:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
payload = json.dumps({"text": text_to_classify})

runtime = boto3.client(
    "runtime.sagemaker",
    aws_access_key_id='',
    aws_secret_access_key='',
    region_name=''
)
response = runtime.invoke_endpoint(
    EndpointName=endpoint_name, ContentType="application/json", Body=payload
)

response = response["Body"].read()
result = json.loads(response.decode("utf-8"))

prediction = result['predictions'][0][0]

What To Do When You Don’t Know the Expected Parameters of the Tensorflow Model?

If you received a gzipped Tensorflow model from a data scientist and you don’t know what parameters you must pass to the model, install Tensorflow as a Python library and run the following command in the directory with the saved_model.pb file:

1
saved_model_cli show --dir . --all

You’ll see something like this:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['input_ids'] tensor_info:
        dtype: DT_INT32
        shape: (-1, 64)
        name: serving_default_input_ids:0
    inputs['input_mask'] tensor_info:
        dtype: DT_INT32
        shape: (-1, 64)
        name: serving_default_input_mask:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['outputs'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 1)
        name: StatefulPartitionedCall:0
  Method name is: tensorflow/serving/predic

What do we do when we see such a signature? Our ML model needs a JSON object with the field signature_name set to serving_default and a field instances containing an array of objects with fields input_ids and input_mask. As the output, we will get a single number (one-element array nested in another array).

Did you enjoy reading this article?
Would you like to learn more about software craft in data engineering and MLOps?

Subscribe to the newsletter or add this blog to your RSS reader (does anyone still use them?) to get a notification when I publish a new essay!

Newsletter

Do you enjoy reading my articles?
Subscribe to the newsletter if you don't want to miss the new content, business offers, and free training materials.

Bartosz Mikulski

Bartosz Mikulski

  • Data/MLOps engineer by day
  • DevRel/copywriter by night
  • Python and data engineering trainer
  • Conference speaker
  • Contributed a chapter to the book "97 Things Every Data Engineer Should Know"
  • Twitter: @mikulskibartosz
Newsletter

Do you enjoy reading my articles?
Subscribe to the newsletter if you don't want to miss the new content, business offers, and free training materials.