SageMaker Inference Recommender でコスト最適なインスタンスタイプの推論エンドポイントを立てる

awspythonmachinelearning

SageMaker Inference Recommender は、 推論エンドポイントのインスタンスタイプや設定の候補を挙げてくれる機能。

SageMakerで学習したPyTorchのモデルをElastic Inferenceを有効にしてデプロイする - sambaiz-net

以前作成した MNIST の PyTorch モデルに対して Inference Recommender の Job を実行し、最もコストが低いインスタンスタイプで推論エンドポイントを立ててみる。 全体のコードは GitHub にある。

Default

まずは Default JobType で Job を実行してみる。入力にはデフォルトの inference handler が対応している npy にしている。

def start_default_inference_recommendations_job(sagemaker_client, model_name: str):
  job_name = f"{model_name}-default"
  sagemaker_client.create_inference_recommendations_job(
    JobName=job_name,
    JobType='Default',
    RoleArn=os.getenv("SAGEMAKER_JOB_ROLE"),
    InputConfig={
      'ModelName': model_name,
      'ContainerConfig': {
        'Domain': 'COMPUTER_VISION',
        'Framework': 'PYTORCH',
        'FrameworkVersion': '1.13',
        'PayloadConfig': {
          # TODO upload & change this if you need
          'SamplePayloadUrl': f's3://{os.getenv("S3_BUCKET")}/sample_payload.tar.gz',
          'SupportedContentTypes': ['application/x-npy'],
        },
        'Task': 'IMAGE_CLASSIFICATION',
      },
    }
  )

  while True:
    response = sagemaker_client.describe_inference_recommendations_job(JobName=job_name)
    execution_time = datetime.now(timezone.utc) - response['CreationTime']
    print(f"\033[34mDefault job: {response['Status']} ({execution_time})\033[0m")

    if response['Status'] == 'COMPLETED':
      recommendations = [
        {
          'InstanceType': r['EndpointConfiguration']['InstanceType'],
          'InitialInstanceCount': r['EndpointConfiguration']['InitialInstanceCount'],
          'CostPerInference': r['Metrics']['CostPerInference'],
          'ModelLatency': r['Metrics']['ModelLatency'],
          'CpuUtilization': r['Metrics']['CpuUtilization'],
          'MemoryUtilization': r['Metrics']['MemoryUtilization'],
          'MaxInvocationsPerMinute': r['Metrics']['MaxInvocations'],
        } for r in response['InferenceRecommendations']
      ]
      return sorted(recommendations, key=lambda v: v['CostPerInference'])

    elif response['Status'] == 'FAILED':
      raise response

    time.sleep(15)

結果は次のようになった。モデルが小さいこともあって小さなインスタンスタイプで十分なようだ。

Default job: COMPLETED (0:27:45.502514)
--- Default ----
ml.c6i.large x 1: $8.428847309005505e-08/inference
ml.c6i.xlarge x 1: $1.2575290497807146e-07/inference
ml.c5.2xlarge x 1: $2.536017404963786e-07/inference
ml.c6i.4xlarge x 1: $4.970098075318674e-07/inference
ml.c6i.8xlarge x 1: $9.742772135723499e-07/inference
-------

Advanced

Default で挙げられたインスタンスタイプを対象に、独自のトラフィックパターンでロードテストを行える Advanced な Job も実行してみる。

def start_advanced_inference_recommendations_job(
        sagemaker_client, 
        model_name: str, 
        instance_type_options: list[str]):
  job_name = f"{model_name}-advanced"
  sagemaker_client.create_inference_recommendations_job(
    JobName=job_name,
    JobType='Advanced',
    RoleArn=os.getenv("SAGEMAKER_JOB_ROLE"),
    InputConfig={
      'ModelName': model_name,
      'JobDurationInSeconds': 6000,
      'ContainerConfig': {
        'Domain': 'COMPUTER_VISION',
        'Framework': 'PYTORCH',
        'FrameworkVersion': '1.13',
        'PayloadConfig': {
          # TODO upload & change this
          'SamplePayloadUrl': f's3://{os.getenv("S3_BUCKET")}/sample_payload.tar.gz',
          'SupportedContentTypes': ['application/x-npy'],
        },
        'Task': 'IMAGE_CLASSIFICATION',
      },
      'TrafficPattern': {
        'TrafficType': 'PHASES',
        'Phases': [
          {
            'InitialNumberOfUsers': 1,
            'SpawnRate': 1,
            'DurationInSeconds': 120
          },
          {
            'InitialNumberOfUsers': 1,
            'SpawnRate': 1,
            'DurationInSeconds': 120
          }
        ]
      },
      'ResourceLimit': {
        'MaxNumberOfTests': 10,
        'MaxParallelOfTests': 3
      },
      "EndpointConfigurations": [{'InstanceType': instance_type} for instance_type in instance_type_options]
    },
    StoppingConditions={
      "MaxInvocations": 1000,
      "ModelLatencyThresholds": [{"Percentile": "P95", "ValueInMilliseconds": 500}],
    },
  )

  while True:
    response = sagemaker_client.describe_inference_recommendations_job(JobName=job_name)
    elapsed_time = datetime.now(timezone.utc) - response['CreationTime']
    print(f"\033[32mAdvanced job: {response['Status']} ({elapsed_time})\033[0m")

    if response['Status'] == 'COMPLETED':
      recommendations = [
        {
          'InstanceType': r['EndpointConfiguration']['InstanceType'],
          'InitialInstanceCount': r['EndpointConfiguration']['InitialInstanceCount'],
          'CostPerInference': r['Metrics']['CostPerInference'],
          'ModelLatency': r['Metrics']['ModelLatency'],
          'CpuUtilization': r['Metrics']['CpuUtilization'],
          'MemoryUtilization': r['Metrics']['MemoryUtilization'],
          'MaxInvocationsPerMinute': r['Metrics']['MaxInvocations'],
        } for r in response['InferenceRecommendations']
      ]
      return sorted(recommendations, key=lambda v: v['CostPerInference'])

    elif response['Status'] == 'FAILED':
      raise response

    time.sleep(15)

結果は次のようになった。

Advanced job: COMPLETED (0:29:44.586078)
--- Advanced ----
ml.c6i.large x 1: $2.8025144160892523e-07/inference
ml.c6i.xlarge x 1: $5.61164256396296e-07/inference
ml.c5.2xlarge x 1: $1.0954817071251455e-06/inference
ml.c6i.4xlarge x 1: $2.252038939332124e-06/inference
ml.c6i.8xlarge x 1: $4.375719072413631e-06/inference
-------

Auto scaling

Inference Recommender は既存のエンドポイントに対して 実行することもできるが、現状オートスケールには対応していないため分速の最大呼び出し数である Metrics.MaxInvocations (ドキュメントのサンプルレスポンスでは MaximumInvocations になっているので変わったのかもしれない) を参考に設定する。

オートスケールは Application Auto Scaling の ServiceNamespace=“sagemaker” で設定できる。

Application Auto Scalingのcustom-resourceによるKinesis Data Streamsのオートスケール設定 - sambaiz-net

def register_auto_scale_settings(
        sagemaker_client,
        autoscaling_client,
        endpoint_name: str,
        variant_name: str,
        min_capacity: int,
        max_capacity: int,
        invocation_per_instance_minute: int):
  while True:
    endpoint = sagemaker_client.describe_endpoint(EndpointName=endpoint_name)
    elapsed_time = datetime.now(timezone.utc) - endpoint['LastModifiedTime']
    print(f"\033[33mEndpoint status: {endpoint['EndpointStatus']} ({elapsed_time})\033[0m")

    if endpoint['EndpointStatus'] == 'InService':
      break
    elif endpoint['EndpointStatus'] == 'Failed':
      raise endpoint

    time.sleep(15)

  resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
  autoscaling_client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=min_capacity,
    MaxCapacity=max_capacity,
  )

  autoscaling_client.put_scaling_policy(
    PolicyName="Invocations-ScalingPolicy",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
      "TargetValue": invocation_per_instance_minute,
      "PredefinedMetricSpecification": {
        "PredefinedMetricType": "SageMakerVariantInvocationsPerInstance"
      }
    },
  )