SageMakerでTensorFlowのモデルを学習させる

pythontensorflowmachinelearningaws

以前PyTorchのモデルを学習させたが、そのTensorFlow版。

SageMakerでPyTorchのモデルを学習させる - sambaiz-net

コード

全体のコードはGitHubにある。

モデル

モデルはTitanicのを使う。

TensorFlow2のKeras APIでTitanicのモデルを作る - sambaiz-net

make_csv_dataset()batch_sizeが必須になっているが、 これをそのままfilter()しようとすると、ValueError: predicate return type must be convertible to a scalar boolean tensor.になってしまうのでunbatch()している。 SageMakerのServing containerを用いる場合はsave_format="tf"にしてSavedModel形式で保存する必要がある。

$ cat model.py
import tensorflow as tf
import logging

class Model:
    def __init__(self, logger: logging.Logger, dropout: float):
        self.logger = logger
        self.model = tf.keras.Sequential([
            tf.keras.layers.DenseFeatures(self._feature_columns()),
            tf.keras.layers.Dense(128, activation=tf.nn.relu),
            tf.keras.layers.Dropout(dropout),
            tf.keras.layers.Dense(128, activation=tf.nn.relu),
            tf.keras.layers.Dense(2, activation=tf.nn.sigmoid),
        ])
        self.model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

    def _feature_columns(self):
        return [
            tf.feature_column.numeric_column('age'),
            tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_identity('sex', 2)),
            tf.feature_column.numeric_column('fare')
        ]

    def _fill(self, feature: str, value):
        def __fill(x):
            if x[feature] == -1.0:
                x[feature] = value
            return x

        return __fill

    def _preprocess(self, ds: tf.data.Dataset):
        ds_size = len(list(ds.filter(lambda x: x['age'] != -1.0)))
        avg_age = ds.filter(lambda x: x['age'] != -1.0).reduce(0.0, lambda x, y: x + y['age']) / ds_size
        ds = ds.map(self._fill('age', avg_age))
        return ds.shuffle(1000).batch(100).prefetch(5)

    def train(self, data_path: str):
        ds = tf.data.experimental.make_csv_dataset(data_path, num_epochs=1, batch_size=1).unbatch()
        for features in self._preprocess(ds):
            self.model.fit(features, tf.one_hot(features['survived'], 2))
            loss, accuracy = self.model.evaluate(features, tf.one_hot(features['survived'], 2), verbose=0)
            self.logger.info(f'train accuracy: {accuracy}, train loss: {loss};')

    def save(self, path: str):
        self.model.save(path, save_format="tf")

エントリーポイント

スクリプトモードで動かす。

$ cat training.py
import argparse
import os
from model import Model
import json
import logging
import sys


def _make_logger():
    logger = logging.getLogger(__name__)
    logger.setLevel(logging.DEBUG)
    logger.addHandler(logging.StreamHandler(sys.stdout))
    return logger


def _parse_args():
    parser = argparse.ArgumentParser()

    # model params
    parser.add_argument('--dropout', type=float, default=0.2, metavar='DROP', help='dropout rate (default: 0.2)')

    # Container environment
    parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR', 'model'))
    parser.add_argument('--hosts', type=str, default=os.environ.get('SM_HOSTS', '[]'))
    parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))

    # fit() inputs (SM_CHANNEL_XXXX)
    parser.add_argument('--data-dir', type=str, default=os.environ.get('SM_CHANNEL_TRAINING', 'dataset'))
    args = parser.parse_args()
    args.hosts = json.loads(args.hosts)
    return args


if __name__ == "__main__":
    args = _parse_args()
    logger = _make_logger()
    m = Model(logger, args.dropout)
    m.train(os.path.join(args.data_dir, 'titanic.csv'))
    if len(args.hosts) == 0 or args.current_host == args.hosts[0]:
        m.save(args.model_dir)

実行

distributions に分散学習でパラメータを共有するためのパラメータサーバーかHorovodの設定を書くとクラスタを作ってくれる。 今回はパラメータサーバーを有効にしている。

Destributed TensorFlowの流れとSavedModelの出力 - sambaiz-net

from sagemaker.tensorflow import TensorFlow
import sagemaker

estimator = TensorFlow(
    source_dir='/root/sagemaker-tensorflow-titanic',
    entry_point='training.py',
    model_dir='/opt/ml/model', # => SM_MODEL_DIR
    output_path=f's3://{os.environ["DATASETS_BUCKET"]}/artifacts',
    role=sagemaker.get_execution_role(),
    framework_version='2.2.0',
    py_version='py37',
    train_instance_count=2,
    train_instance_type='ml.g4dn.xlarge',
    distributions={
        'parameter_server': { 'enabled': True }
    },
    hyperparameters={
        'dropout': 0.2,
    },
    metric_definitions=[
        {'Name':'test:accuracy', 'Regex':'train accuracy: (.*?),'},
        {'Name':'test:loss', 'Regex':'train loss: (.*?);'}
    ],
    enable_sagemaker_metrics=True,
)

inputs = sagemaker.inputs.s3_input(f's3://{os.environ["DATASETS_BUCKET"]}/{os.environ["DATASETS_KEY_PREFIX"]}')

estimator.fit(inputs={'training': inputs})

学習が終わるとoutput_pathにモデルが保存される。