SageMaker Experimentsは Processing での前処理や Training での学習に用いたパラメータやモデルの精度を記録する機能。 今回は前処理から学習までの流れを Experiments の Run として追跡し、複数の結果を比較できることを確認する。
全体のコードは GitHub にある。
SageMaker Experiments
experiments.Run() で Experiment が存在しない場合は作成し Run を開始する。 以前は sagemaker-experiments というライブラリに別れていたが、現在は sagemaker に統合されている。
from sagemaker import experiments
experiment_name = f'test-experiment-{now.strftime("%Y%m%d%H%M%S")}'
with experiments.Run(experiment_name, run_name="run1") as run:
preprocess(run)
train(run, {'aaa': 0.4, 'bbb': True})
各スクリプト内で load_run() して log_parameter() すると Run にパラメータが記録される。
from sagemaker.experiments import load_run
with load_run() as run:
run.log_parameter("param1", "value1")
メトリクスについても Experiments に記録されるが、こちらは Run ではなく training job の Trial Component に記録される。
metric_definitions=[{'Name': 'test:accuracy', 'Regex': 'test:accuracy=(\d\.\d+)'}],
enable_sagemaker_metrics=True,
SageMaker Processing
SageMaker Studio またはローカルから次のような関数を呼ぶとリソースが立ち上がりスクリプトが実行される。 この例では自前のイメージで動かしているが、 PySparkProcessor や SKLearnProcessor などの SageMaker が用意しているイメージを使うこともできる。
def preprocess(run: experiments.Run):
processor = Processor(
base_job_name=f'preprocess-{run.run_name}',
image_uri=f'{os.getenv("ECR_REPOSITORY_PREPROCESS")}:latest',
role=os.getenv('ROLE_ARN'),
instance_count=1,
instance_type='ml.m5.xlarge',
env={'AWS_DEFAULT_REGION': os.getenv('AWS_DEFAULT_REGION', 'ap-northeast-1')},
)
processor.run(
inputs=[
ProcessingInput(
source=os.getenv('S3_DATA_PATH'),
destination='/opt/ml/processing/input',
# s3_data_distribution_type='ShardedByS3Key',
),
],
outputs=[
ProcessingOutput(
source='/opt/ml/processing/output',
destination=os.getenv('S3_DATA_PATH'),
# s3_upload_mode='Continuous'
),
],
arguments=[
'--xxx', '12345',
],
wait=True,
)
inputs に指定した /opt/ml/processing/ から始まるパスに source にあるオブジェクトがダウンロードされ、 outputs に指定したパスに書き込むとアップロードされる。 デフォルトでは全てのオブジェクトが各インスタンスにダウンロードされるが、 s3_data_distribution_type=‘ShardedByS3Key’ にすると分散されるようになる。 また、s3_upload_mode=‘Continuous’ にするとジョブの終了時ではなく都度アップロードされる。
import argparse
import pandas as pd
from sagemaker.experiments import load_run
def main():
print("- preprocessing started")
parser = argparse.ArgumentParser()
parser.add_argument('--xxx', type=int, required=True)
args = parser.parse_args()
with load_run() as run:
run.log_parameters({'preprocess:arg_xxx': args.xxx})
df = pd.read_csv("/opt/ml/processing/input/raw.csv")
df.iloc[:5,:].to_csv('/opt/ml/processing/output/train.csv')
df.iloc[5:,:].to_csv('/opt/ml/processing/output/test.csv')
if __name__ == "__main__":
main()
SageMaker Training
Processing と同様に次のような関数を呼ぶとリソースが立ち上がり学習が始まる。 各フレームワークの Estimator が用意されているが、今回は自前のイメージで動かしている。
SageMakerでPyTorchのモデルを学習させる - sambaiz-net
SageMakerでTensorFlowのモデルを学習させる - sambaiz-net
def train(run: experiments.Run, hyperparameters: object = {}):
estimator = Estimator(
base_job_name=f'train-{run.run_name}',
image_uri=f'{os.getenv("ECR_REPOSITORY_TRAIN")}:latest',
training_repository_access_mode='Platform',
role=os.getenv('ROLE_ARN'),
instance_count=1,
instance_type='ml.m5.xlarge',
hyperparameters=hyperparameters,
environment={'AWS_DEFAULT_REGION': os.getenv('AWS_DEFAULT_REGION', 'ap-northeast-1')},
# output_path=,
)
data_path = os.getenv('S3_DATA_PATH')
estimator.fit(
inputs={'training': data_path, 'testing': data_path},
wait=True
)
inputs は /opt/ml/input/data 下に、 ハイパーパラメータは /opt/ml/input/config/hyperparameters.json に書き込まれ、 学習したモデルは /opt/ml/model に書き込むとアップロードされる。 sagemaker_training ライブラリでこれらのパスを参照することができる。 output_path を指定しなかった場合、SageMakerのデフォルトBucketである sagemaker-region-account-id にアップロードされる。
import os
from sagemaker.experiments import load_run
from sagemaker_training import environment
import pandas as pd
def train(data_path: str, hyperparameters: dict) -> str:
print(f"data_path: {data_path}")
print(f"hyperparameters: {hyperparameters}")
print(f"training data:\n{pd.read_csv(data_path)}")
return 'trained model'
def test(data_path: str, hyperparameters: dict) -> float:
with load_run() as run:
print(f"test data:\n{pd.read_csv(data_path)}")
run.log_parameters({'test:hp_bbb': hyperparameters.get('bbb')})
for epoch in range(1, 10):
run.log_metric(name="test:accuracy", value=hyperparameters.get('aaa', 0.0) / epoch, step=epoch)
def main():
print("- training started")
env = environment.Environment()
print(f"master_hostname: {env.master_hostname}, current_host: {env.current_host}")
model = train(os.path.join(env.channel_input_dirs['training'], 'train.csv'), env.hyperparameters) # /opt/ml/input/data/training
with open(os.path.join(env.model_dir, 'some_model.dat'), 'w') as f: # /opt/ml/model
f.write(model)
test(os.path.join(env.channel_input_dirs['testing'], 'test.csv'), env.hyperparameters) # /opt/ml/input/data/testing
if __name__ == '__main__':
main()
実験結果
ExperimentAnalytics で Experiments の結果を確認する。
from sagemaker.analytics import ExperimentAnalytics
experiment_analytics = ExperimentAnalytics(experiment_name)df = experiment_analytics.dataframe()
df['Source'] = df['SourceArn'].apply(lambda x: str(x).split(':')[-1])
print(df[[
"TrialComponentName",
"Source",
"preprocess:arg_xxx",
"test:hp_bbb",
"test:accuracy - Last"
]])
log_parameter() した値が含まれている TrialComponent のほかに processing-job や training-job を Source とするものが含まれている。 run1 と run2 を比較することでどちらの精度が高いかをパラメータの値と共に確認できる。
TrialComponentName Source preprocess:arg_xxx test:hp_bbb test:accuracy - Last
0 train-run2-2023-05-02-16-40-40-901-aws-trainin... training-job/train-run2-2023-05-02-16-40-40-901 NaN NaN NaN
1 test-experiment-20230503012814-run2 nan 12345.0 False 0.8
2 preprocess-run2-2023-05-02-16-36-12-041-aws-pr... processing-job/preprocess-run2-2023-05-02-16-3... NaN NaN NaN
3 test-experiment-20230503012814-run1 nan 12345.0 True 0.4
4 train-run1-2023-05-02-16-33-44-735-aws-trainin... training-job/train-run1-2023-05-02-16-33-44-735 NaN NaN NaN
5 preprocess-run1-2023-05-02-16-28-15-055-aws-pr... processing-job/preprocess-run1-2023-05-02-16-2... NaN NaN NaN
SageMaker Studio での可視化
SageMaker Studio 上でも Experiments の結果を確認できる。
Run を選んで Analyze するとStepごとの精度などを可視化できる。