Create a cost-optimized real-time inference endpoint with SageMaker Inference Recommender
aws / python / machine-learning — SageMaker Inference Recommender is a feature that recommends an endpoint's instance types and settings.
SageMakerで学習したPyTorchのモデルをElastic Inferenceを有効にしてデプロイする - sambaiz-net
I try running an Inference Recommender job for a PyTorch model created in the past and setting up an inference endpoint with the lowest cost instance type. The full code is on GitHub.
Default
First, run a job with Default JobType and npy input that default inference handler supports.
def start_default_inference_recommendations_job(sagemaker_client, model_name: str):
    """Run a 'Default' Inference Recommender job and wait for the result.

    Creates a SageMaker Inference Recommender job of JobType 'Default' for
    *model_name*, polls its status every 15 seconds, and returns the
    recommended endpoint configurations sorted by cost per inference
    (cheapest first).

    Args:
        sagemaker_client: boto3 SageMaker client.
        model_name: Name of the SageMaker model to get recommendations for.

    Returns:
        list[dict]: One dict per recommendation (instance type, initial
        instance count, and key metrics), ascending by 'CostPerInference'.

    Raises:
        RuntimeError: If the job ends in a FAILED or STOPPED state.
    """
    job_name = f"{model_name}-default"
    sagemaker_client.create_inference_recommendations_job(
        JobName=job_name,
        JobType='Default',
        RoleArn=os.getenv("SAGEMAKER_JOB_ROLE"),
        InputConfig={
            'ModelName': model_name,
            'ContainerConfig': {
                'Domain': 'COMPUTER_VISION',
                'Framework': 'PYTORCH',
                'FrameworkVersion': '1.13',
                'PayloadConfig': {
                    # TODO upload & change this if you need
                    'SamplePayloadUrl': f's3://{os.getenv("S3_BUCKET")}/sample_payload.tar.gz',
                    'SupportedContentTypes': ['application/x-npy'],
                },
                'Task': 'IMAGE_CLASSIFICATION',
            },
        }
    )
    while True:
        response = sagemaker_client.describe_inference_recommendations_job(JobName=job_name)
        execution_time = datetime.now(timezone.utc) - response['CreationTime']
        print(f"\033[34mDefault job: {response['Status']} ({execution_time})\033[0m")
        if response['Status'] == 'COMPLETED':
            recommendations = [
                {
                    'InstanceType': r['EndpointConfiguration']['InstanceType'],
                    'InitialInstanceCount': r['EndpointConfiguration']['InitialInstanceCount'],
                    'CostPerInference': r['Metrics']['CostPerInference'],
                    'ModelLatency': r['Metrics']['ModelLatency'],
                    'CpuUtilization': r['Metrics']['CpuUtilization'],
                    'MemoryUtilization': r['Metrics']['MemoryUtilization'],
                    'MaxInvocationsPerMinute': r['Metrics']['MaxInvocations'],
                } for r in response['InferenceRecommendations']
            ]
            return sorted(recommendations, key=lambda v: v['CostPerInference'])
        elif response['Status'] in ('FAILED', 'STOPPED'):
            # The original `raise response` raised a dict, which itself fails
            # with "TypeError: exceptions must derive from BaseException".
            # STOPPED is also terminal; without it the loop would poll forever.
            raise RuntimeError(
                f"Inference Recommender job {job_name} ended with status "
                f"{response['Status']}: {response.get('FailureReason', 'unknown reason')}"
            )
        time.sleep(15)
The result is as follows. The model is small, so the smallest instance type seems to be enough.
Default job: COMPLETED (0:27:45.502514)
--- Default ----
ml.c6i.large x 1: $8.428847309005505e-08/inference
ml.c6i.xlarge x 1: $1.2575290497807146e-07/inference
ml.c5.2xlarge x 1: $2.536017404963786e-07/inference
ml.c6i.4xlarge x 1: $4.970098075318674e-07/inference
ml.c6i.8xlarge x 1: $9.742772135723499e-07/inference
-------
Advanced
Next, run an Advanced JobType, which allows you to run load tests with custom traffic patterns, for the instance types recommended by the Default Job.
def start_advanced_inference_recommendations_job(
        sagemaker_client,
        model_name: str,
        instance_type_options: list[str]):
    """Run an 'Advanced' Inference Recommender job and wait for the result.

    Creates a SageMaker Inference Recommender job of JobType 'Advanced'
    (a load test with a custom PHASES traffic pattern) restricted to the
    given candidate instance types, polls every 15 seconds, and returns the
    recommendations sorted by cost per inference (cheapest first).

    Args:
        sagemaker_client: boto3 SageMaker client.
        model_name: Name of the SageMaker model to load-test.
        instance_type_options: Candidate instance types to benchmark
            (e.g. the output of the Default job).

    Returns:
        list[dict]: One dict per recommendation (instance type, initial
        instance count, and key metrics), ascending by 'CostPerInference'.

    Raises:
        RuntimeError: If the job ends in a FAILED or STOPPED state.
    """
    job_name = f"{model_name}-advanced"
    sagemaker_client.create_inference_recommendations_job(
        JobName=job_name,
        JobType='Advanced',
        RoleArn=os.getenv("SAGEMAKER_JOB_ROLE"),
        InputConfig={
            'ModelName': model_name,
            'JobDurationInSeconds': 6000,
            'ContainerConfig': {
                'Domain': 'COMPUTER_VISION',
                'Framework': 'PYTORCH',
                'FrameworkVersion': '1.13',
                'PayloadConfig': {
                    # TODO upload & change this
                    'SamplePayloadUrl': f's3://{os.getenv("S3_BUCKET")}/sample_payload.tar.gz',
                    'SupportedContentTypes': ['application/x-npy'],
                },
                'Task': 'IMAGE_CLASSIFICATION',
            },
            'TrafficPattern': {
                'TrafficType': 'PHASES',
                'Phases': [
                    {
                        'InitialNumberOfUsers': 1,
                        'SpawnRate': 1,
                        'DurationInSeconds': 120
                    },
                    {
                        'InitialNumberOfUsers': 1,
                        'SpawnRate': 1,
                        'DurationInSeconds': 120
                    }
                ]
            },
            'ResourceLimit': {
                'MaxNumberOfTests': 10,
                'MaxParallelOfTests': 3
            },
            "EndpointConfigurations": [{'InstanceType': instance_type} for instance_type in instance_type_options]
        },
        StoppingConditions={
            "MaxInvocations": 1000,
            "ModelLatencyThresholds": [{"Percentile": "P95", "ValueInMilliseconds": 500}],
        },
    )
    while True:
        response = sagemaker_client.describe_inference_recommendations_job(JobName=job_name)
        elapsed_time = datetime.now(timezone.utc) - response['CreationTime']
        print(f"\033[32mAdvanced job: {response['Status']} ({elapsed_time})\033[0m")
        if response['Status'] == 'COMPLETED':
            recommendations = [
                {
                    'InstanceType': r['EndpointConfiguration']['InstanceType'],
                    'InitialInstanceCount': r['EndpointConfiguration']['InitialInstanceCount'],
                    'CostPerInference': r['Metrics']['CostPerInference'],
                    'ModelLatency': r['Metrics']['ModelLatency'],
                    'CpuUtilization': r['Metrics']['CpuUtilization'],
                    'MemoryUtilization': r['Metrics']['MemoryUtilization'],
                    'MaxInvocationsPerMinute': r['Metrics']['MaxInvocations'],
                } for r in response['InferenceRecommendations']
            ]
            return sorted(recommendations, key=lambda v: v['CostPerInference'])
        elif response['Status'] in ('FAILED', 'STOPPED'):
            # The original `raise response` raised a dict, which itself fails
            # with "TypeError: exceptions must derive from BaseException".
            # STOPPED is also terminal; without it the loop would poll forever.
            raise RuntimeError(
                f"Inference Recommender job {job_name} ended with status "
                f"{response['Status']}: {response.get('FailureReason', 'unknown reason')}"
            )
        time.sleep(15)
The result is as follows.
Advanced job: COMPLETED (0:29:44.586078)
--- Advanced ----
ml.c6i.large x 1: $2.8025144160892523e-07/inference
ml.c6i.xlarge x 1: $5.61164256396296e-07/inference
ml.c5.2xlarge x 1: $1.0954817071251455e-06/inference
ml.c6i.4xlarge x 1: $2.252038939332124e-06/inference
ml.c6i.8xlarge x 1: $4.375719072413631e-06/inference
-------
Auto scaling
Inference Recommender can also be run against existing endpoints, but it does not currently take auto scaling into account. The scaling target should therefore be set based on MaxInvocations, the maximum number of invocations per minute. (The sample response in the documentation uses the key MaximumInvocations instead, so the field name may change.)
Auto scaling can be set with Application Auto Scaling’s ServiceNamespace=“sagemaker”.
Application Auto Scalingのcustom-resourceによるKinesis Data Streamsのオートスケール設定 - sambaiz-net
def register_auto_scale_settings(
        sagemaker_client,
        autoscaling_client,
        endpoint_name: str,
        variant_name: str,
        min_capacity: int,
        max_capacity: int,
        invocation_per_instance_minute: int):
    """Attach target-tracking auto scaling to a SageMaker endpoint variant.

    Waits (polling every 15 seconds) until the endpoint is InService, then
    registers the variant as a scalable target with Application Auto Scaling
    and creates a target-tracking policy on the
    SageMakerVariantInvocationsPerInstance metric.

    Args:
        sagemaker_client: boto3 SageMaker client.
        autoscaling_client: boto3 Application Auto Scaling client.
        endpoint_name: Name of the deployed endpoint.
        variant_name: Production variant to scale.
        min_capacity: Minimum number of instances.
        max_capacity: Maximum number of instances.
        invocation_per_instance_minute: Target invocations per instance per
            minute (e.g. derived from the recommender's MaxInvocations).

    Raises:
        RuntimeError: If the endpoint ends up in the Failed state.
    """
    while True:
        endpoint = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
        elapsed_time = datetime.now(timezone.utc) - endpoint['LastModifiedTime']
        print(f"\033[33mEndpoint status: {endpoint['EndpointStatus']} ({elapsed_time})\033[0m")
        if endpoint['EndpointStatus'] == 'InService':
            break
        elif endpoint['EndpointStatus'] == 'Failed':
            # The original `raise endpoint` raised a dict, which itself fails
            # with "TypeError: exceptions must derive from BaseException".
            raise RuntimeError(
                f"Endpoint {endpoint_name} failed: "
                f"{endpoint.get('FailureReason', 'unknown reason')}"
            )
        time.sleep(15)
    resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
    autoscaling_client.register_scalable_target(
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
        MinCapacity=min_capacity,
        MaxCapacity=max_capacity,
    )
    autoscaling_client.put_scaling_policy(
        PolicyName="Invocations-ScalingPolicy",
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
        PolicyType="TargetTrackingScaling",
        TargetTrackingScalingPolicyConfiguration={
            "TargetValue": invocation_per_instance_minute,
            "PredefinedMetricSpecification": {
                "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
            }
        },
    )