How SageMaker Batch Transform job parameters affect the entrypoint functions and their arguments
SageMaker Batch Transform is a feature that runs a one-off batch inference job. As with inference endpoints, it calls the entrypoint functions of the Model. Here I check how the job parameters change which functions are called and what arguments they receive.
Deploy a PyTorch model trained on SageMaker with Elastic Inference enabled - sambaiz-net
import os

from sagemaker.transformer import Transformer

# model_name is the name of a SageMaker Model defined elsewhere.
transformer = Transformer(
    model_name=model_name,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    output_path=f's3://{os.getenv("S3_BUCKET")}/output/',
    # strategy='SingleRecord',
    # max_payload=1,
    # assemble_with='Line',
    # accept='application/json',
)

transformer.transform(
    data=f's3://{os.getenv("S3_BUCKET")}/batch_input/',
    content_type='application/json',
    # compression_type='Gzip',
    # split_type='Line',
    # output_filter="$['SageMakerOutput','value']",
    # join_source='Input',
)
Looking at the Transformer class in the SageMaker Inference Toolkit, I found that the following functions are called in transform_fn().
import json


class Model:
    warmup = False

    def predict(self, data):
        if not self.warmup:
            raise Exception("Model not warmed up")
        return ["🍣", "🍵"][data % 2]


def pre_model_fn(model_dir, context=None):
    print("pre_model_fn")
    print(model_dir, context)


def model_fn(model_dir, context=None):
    print("model_fn")
    print(model_dir, context)
    return Model()


def model_warmup_fn(model_dir, model, context=None):
    print("model_warmup_fn")
    print(model_dir, model, context)
    model.warmup = True


def input_fn(input_data, content_type, context=None):
    print("input_fn")
    print(input_data, content_type, context)
    return json.loads(input_data)


def predict_fn(input_data, model, context=None):
    print("predict_fn")
    print(input_data, model, context)
    return model.predict(input_data['value'])


def output_fn(prediction, accept, context=None):
    print("output_fn")
    print(prediction, accept, context)
    return json.dumps({"output": f"I want to {prediction}"}, ensure_ascii=False)
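The handlers can be rehearsed locally before a job is submitted. The following is only a minimal sketch: `run_once` is a hypothetical helper, not part of the SageMaker Inference Toolkit, and the order in which it loads and warms up the model is an assumption. It chains input_fn, predict_fn and output_fn in the order the job logs in this post show, and it omits the context object and the print calls.

```python
import json


class Model:
    """Toy model from this post: refuses to predict until warmed up."""
    warmup = False

    def predict(self, data):
        if not self.warmup:
            raise Exception("Model not warmed up")
        return ["🍣", "🍵"][data % 2]


def model_fn(model_dir, context=None):
    return Model()


def model_warmup_fn(model_dir, model, context=None):
    model.warmup = True


def input_fn(input_data, content_type, context=None):
    return json.loads(input_data)


def predict_fn(input_data, model, context=None):
    return model.predict(input_data["value"])


def output_fn(prediction, accept, context=None):
    return json.dumps({"output": f"I want to {prediction}"}, ensure_ascii=False)


def run_once(payload, content_type="application/json",
             accept="application/json", model_dir="/opt/ml/model"):
    """Hypothetical local stand-in for one request; not the toolkit's code."""
    model = model_fn(model_dir)        # assumed: load the model first
    model_warmup_fn(model_dir, model)  # assumed: warm up right after loading
    data = input_fn(payload, content_type)
    prediction = predict_fn(data, model)
    return output_fn(prediction, accept)


if __name__ == "__main__":
    print(run_once('{"value":4}'))
```

Calling `run_once` with a single record mirrors what the handlers see when the payload holds exactly one JSON document.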
The input is two JSON Lines files (1.2 MB each) with 100000 lines, as follows.
$ cat testdata1.jsonl
{"value":1}
{"value":3}
{"value":4}
{"value":15}
{"value":13}
...
{"value":3000}
Case of strategy='SingleRecord' and max_payload=1 (MB)
Without split_type, the whole file content was passed to input_fn() as a single payload. Its size exceeded max_payload, so the job failed with a "Too much data for max payload size" error.
2023-08-13T10:31:04,142 [INFO ] W-9003-model_1.0-stdout MODEL_LOG - input_fn
2023-08-13T10:40:59,040 [INFO ] W-9002-model_1.0-stdout MODEL_LOG - {"value":1}
2023-08-13T10:40:59,040 [INFO ] W-9002-model_1.0-stdout MODEL_LOG - {"value":3}
2023-08-13T10:40:59,040 [INFO ] W-9002-model_1.0-stdout MODEL_LOG - {"value":4}
2023-08-13T10:40:59,040 [INFO ] W-9002-model_1.0-stdout MODEL_LOG - {"value":15}
...
2023-08-13T10:40:59,061 [INFO ] W-9002-model_1.0-stdout MODEL_LOG - {"value":3} application/json <ts.context.Context object at 0x7f4cde8cef10>
...
on <ts.context.Context object at 0x7f4cde8cef10>
2023-08-13T10:40:59,061 [INFO ] W-9002-model_1.0-stdout MODEL_METRICS - PredictionTime.Milliseconds:0.51|#ModelName:model,Level:Model|#hostname:26f3ed57bfc4,requestID:987d258e-61f6-4dd1-9ab9-7cea700d688d,timestamp:1691923258
2023-08-13T10:40:57.646:[sagemaker logs]: MaxConcurrentTransforms=1, MaxPayloadInMB=6, BatchStrategy=SINGLE_RECORD
2023-08-13T10:40:58.380:[sagemaker logs]: ****/batch_input/testdata2.jsonl: Bad HTTP status received from algorithm: 500
2023-08-13T10:40:58.381:[sagemaker logs]: ****/batch_input/testdata2.jsonl:
2023-08-13T10:40:58.381:[sagemaker logs]: ****/batch_input/testdata2.jsonl: Message:
2023-08-13T10:40:58.385:[sagemaker logs]: ****/batch_input/testdata2.jsonl: Extra data: line 2 column 1 (char 12)
...
2023-08-13T10:40:59.069:[sagemaker logs]: ****/batch_input/testdata1.jsonl: return json.loads(input_data)
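The `Extra data: line 2 column 1 (char 12)` message in the log above is what json.loads() raises when it is given more than one JSON document in a single string. This small reproduction runs without SageMaker; `error_message` is a helper name made up for the illustration.

```python
import json

# Two JSON Lines records delivered as one payload.
payload = '{"value":1}\n{"value":3}\n'


def error_message(payload):
    """Return the json.loads() error text for the payload, or None if it parses."""
    try:
        json.loads(payload)
    except json.JSONDecodeError as e:
        return str(e)
    return None


if __name__ == "__main__":
    print(error_message(payload))        # Extra data: line 2 column 1 (char 12)
    print(error_message('{"value":1}'))  # None
```

The first record is 11 characters long and the newline is character 11, so the parser reports the second record starting at character 12.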
Case of split_type='Line', strategy='SingleRecord', max_payload=1 (MB)
With split_type='Line', the input is split by line and input_fn() receives one record per call, but the outputs are written to the output file without any line separator between them.
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - input_fn
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - {"value":8}
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - application/json <ts.context.Context object at 0x7f9caa819d90>
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - predict_fn
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - {'value': 8} <inference.Model object at 0x7f9caa827340> <ts.context.Context object at 0x7f9caa819d90>
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - output_fn
2023-08-13T10:42:50,220 [INFO ] W-9001-model_1.0-stdout MODEL_LOG - 🍣 application/json <ts.context.Context object at 0x7f9caa819d90>
$ cat testdata1.jsonl.out
I want to 🍵I want to 🍵I want to 🍣I want to 🍵I want to 🍵I want to 🍣I want to 🍵...
$ cat testdata2.jsonl.out
I want to 🍵I want to 🍵I want to 🍵I want to 🍣I want to 🍵I want to 🍣...
Case of split_type='Line', strategy='SingleRecord', assemble_with='Line'
Adding assemble_with='Line' puts each output on its own line.
$ cat testdata1.jsonl.out
{"output": "I want to 🍵"}
{"output": "I want to 🍵"}
{"output": "I want to 🍣"}
{"output": "I want to 🍵"}
{"output": "I want to 🍵"}
...
$ cat testdata2.jsonl.out
{"output": "I want to 🍵"}
{"output": "I want to 🍵"}
{"output": "I want to 🍣"}
{"output": "I want to 🍵"}
{"output": "I want to 🍵"}
...
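The effect of split_type and assemble_with in the two cases above can be imitated in a few lines. This is a rough local model and not SageMaker code: `simulate` and `handle` are hypothetical names, and `handle` stands in for the input_fn, predict_fn, output_fn chain applied to one record.

```python
import json

ANSWERS = ["🍣", "🍵"]


def handle(record):
    """Stand-in for input_fn -> predict_fn -> output_fn on a single record."""
    value = json.loads(record)["value"]
    answer = ANSWERS[value % 2]
    return json.dumps({"output": f"I want to {answer}"}, ensure_ascii=False)


def simulate(file_content, assemble_with=None):
    """Hypothetical model of split_type='Line' with strategy='SingleRecord'."""
    # split_type='Line': every non-empty line becomes its own request.
    records = [line for line in file_content.split("\n") if line]
    outputs = [handle(record) for record in records]
    if assemble_with == "Line":
        # assemble_with='Line': a newline is appended after every output.
        return "".join(output + "\n" for output in outputs)
    # No assemble_with: outputs are concatenated as they are.
    return "".join(outputs)


if __name__ == "__main__":
    content = '{"value":1}\n{"value":4}\n'
    print(simulate(content))
    print(simulate(content, assemble_with="Line"), end="")
```

Without assemble_with the outputs run together on one line, which makes the result hard to parse as JSON Lines afterwards.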
Case of split_type='Line', strategy='MultiRecord', max_payload=1
With strategy='MultiRecord', I expected input_fn() to receive an array of records, but it received a single string of newline-separated records, as in the case of split_type=None. Unlike that case, the payload stayed within max_payload, so the job failed with a json.loads() error instead of Too much data.
2023-08-13T14:51:12.281:[sagemaker logs]: ****/batch_input/testdata2.jsonl: Bad HTTP status received from algorithm: 500
2023-08-13T14:51:12.281:[sagemaker logs]: ****/batch_input/testdata2.jsonl:
2023-08-13T14:51:12.281:[sagemaker logs]: ****/batch_input/testdata2.jsonl: Message:
2023-08-13T14:51:12.282:[sagemaker logs]: ****/batch_input/testdata2.jsonl: Extra data: line 2 column 1 (char 12)
...
2023-08-13T14:51:18.004:[sagemaker logs]: ****/batch_input/testdata1.jsonl: return json.loads(input_data)
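One way to picture why a MultiRecord payload holds several lines yet stays under max_payload is to pack records into a request until the next one would no longer fit. The following is a sketch under that assumption only: `pack_records` and its greedy strategy are illustrative, and the batching SageMaker actually performs may differ.

```python
def pack_records(lines, max_payload_bytes):
    """Greedily group newline-terminated records into payloads within a byte limit."""
    payloads = []
    current = []
    size = 0
    for line in lines:
        record = line + "\n"
        n = len(record.encode("utf-8"))
        if n > max_payload_bytes:
            # A single record that can never fit is rejected outright.
            raise ValueError("record larger than max payload")
        if current and size + n > max_payload_bytes:
            # Adding this record would exceed the limit: close the payload.
            payloads.append("".join(current))
            current = []
            size = 0
        current.append(record)
        size += n
    if current:
        payloads.append("".join(current))
    return payloads


if __name__ == "__main__":
    lines = ['{"value":1}', '{"value":3}', '{"value":4}']
    for payload in pack_records(lines, max_payload_bytes=24):
        print(repr(payload))
```

Each payload produced this way is a multi-line string, so a handler that calls json.loads() on the whole payload fails in the way the log above shows.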
Splitting the lines in input_fn() and joining the results in output_fn() produced the same output as the SingleRecord case.
def input_fn(input_data, content_type, context=None):
    print("input_fn")
    print(input_data, content_type, context)
    # The payload holds several newline-separated records; skip empty lines.
    lines = [line for line in input_data.split('\n') if line != '']
    return [json.loads(line) for line in lines]


def predict_fn(input_data, model, context=None):
    print("predict_fn")
    print(input_data, model, context)
    return [model.predict(record['value']) for record in input_data]


def output_fn(prediction, accept, context=None):
    print("output_fn")
    print(prediction, accept, context)
    # One JSON document per line, in the same order as the input records.
    return '\n'.join(
        json.dumps({"output": f"I want to {x}"}, ensure_ascii=False)
        for x in prediction
    )
Case of join_source='Input'
The return value of output_fn() is added to each input record as a SageMakerOutput field. content_type must be the same as accept.
{"SageMakerOutput":{"output":"I want to 🍵"},"value":1}
{"SageMakerOutput":{"output":"I want to 🍵"},"value":3}
{"SageMakerOutput":{"output":"I want to 🍣"},"value":4}
{"SageMakerOutput":{"output":"I want to 🍵"},"value":15}
{"SageMakerOutput":{"output":"I want to 🍵"},"value":13}
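The shape of the joined records above can be reproduced locally for JSON input. This is only a sketch of the resulting structure: `join_input` is a hypothetical function, not something SageMaker exposes, and the compact separators are chosen to match the formatting of the output shown here.

```python
import json


def join_input(input_line, output_line):
    """Hypothetical model of join_source='Input' for one JSON record."""
    record = json.loads(input_line)
    # The model output goes under SageMakerOutput, next to the input fields.
    joined = {"SageMakerOutput": json.loads(output_line), **record}
    return json.dumps(joined, ensure_ascii=False, separators=(",", ":"))


if __name__ == "__main__":
    print(join_input('{"value":1}', '{"output": "I want to 🍵"}'))
```

Keeping the input fields next to the prediction is what makes the commented-out output_filter in the transform() call useful, since it can then select fields from both.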