以前PyTorchのモデルを学習させたが、そのTensorFlow版。
SageMakerでPyTorchのモデルを学習させる - sambaiz-net
コード
全体のコードはGitHubにある。
モデル
モデルはTitanicのを使う。
TensorFlow2のKeras APIでTitanicのモデルを作る - sambaiz-net
make_csv_dataset()はbatch_size
が必須になっているが、
これをそのままfilter()
しようとすると、ValueError: predicate return type must be convertible to a scalar boolean tensor.
になってしまうのでunbatch()
している。
SageMakerのServing containerを用いる場合はsave_format="tf"
にしてSavedModel形式で保存する必要がある。
$ cat model.py
import tensorflow as tf
import logging
class Model:
def __init__(self, logger: logging.Logger, dropout: float):
self.logger = logger
self.model = tf.keras.Sequential([
tf.keras.layers.DenseFeatures(self._feature_columns()),
tf.keras.layers.Dense(128, activation=tf.nn.relu),
tf.keras.layers.Dropout(dropout),
tf.keras.layers.Dense(128, activation=tf.nn.relu),
tf.keras.layers.Dense(2, activation=tf.nn.sigmoid),
])
self.model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])
def _feature_columns(self):
return [
tf.feature_column.numeric_column('age'),
tf.feature_column.indicator_column(tf.feature_column.categorical_column_with_identity('sex', 2)),
tf.feature_column.numeric_column('fare')
]
def _fill(self, feature: str, value):
def __fill(x):
if x[feature] == -1.0:
x[feature] = value
return x
return __fill
def _preprocess(self, ds: tf.data.Dataset):
ds_size = len(list(ds.filter(lambda x: x['age'] != -1.0)))
avg_age = ds.filter(lambda x: x['age'] != -1.0).reduce(0.0, lambda x, y: x + y['age']) / ds_size
ds = ds.map(self._fill('age', avg_age))
return ds.shuffle(1000).batch(100).prefetch(5)
def train(self, data_path: str):
ds = tf.data.experimental.make_csv_dataset(data_path, num_epochs=1, batch_size=1).unbatch()
for features in self._preprocess(ds):
self.model.fit(features, tf.one_hot(features['survived'], 2))
loss, accuracy = self.model.evaluate(features, tf.one_hot(features['survived'], 2), verbose=0)
self.logger.info(f'train accuracy: {accuracy}, train loss: {loss};')
def save(self, path: str):
self.model.save(path, save_format="tf")
エントリーポイント
スクリプトモードで動かす。
$ cat training.py
import argparse
import os
from model import Model
import json
import logging
import sys
def _make_logger():
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
return logger
def _parse_args():
parser = argparse.ArgumentParser()
# model params
parser.add_argument('--dropout', type=float, default=0.2, metavar='DROP', help='dropout rate (default: 0.2)')
# Container environment
parser.add_argument('--model_dir', type=str, default=os.environ.get('SM_MODEL_DIR', 'model'))
parser.add_argument('--hosts', type=str, default=os.environ.get('SM_HOSTS', '[]'))
parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))
# fit() inputs (SM_CHANNEL_XXXX)
parser.add_argument('--data-dir', type=str, default=os.environ.get('SM_CHANNEL_TRAINING', 'dataset'))
args = parser.parse_args()
args.hosts = json.loads(args.hosts)
return args
if __name__ == "__main__":
args = _parse_args()
logger = _make_logger()
m = Model(logger, args.dropout)
m.train(os.path.join(args.data_dir, 'titanic.csv'))
if len(args.hosts) == 0 or args.current_host == args.hosts[0]:
m.save(args.model_dir)
実行
distributions
に分散学習でパラメータを共有するためのパラメータサーバーかHorovodの設定を書くとクラスタを作ってくれる。
今回はパラメータサーバーを有効にしている。
Destributed TensorFlowの流れとSavedModelの出力 - sambaiz-net
from sagemaker.tensorflow import TensorFlow
import sagemaker
estimator = TensorFlow(
source_dir='/root/sagemaker-tensorflow-titanic',
entry_point='training.py',
model_dir='/opt/ml/model', # => SM_MODEL_DIR
output_path=f's3://{os.environ["DATASETS_BUCKET"]}/artifacts',
role=sagemaker.get_execution_role(),
framework_version='2.2.0',
py_version='py37',
train_instance_count=2,
train_instance_type='ml.g4dn.xlarge',
distributions={
'parameter_server': { 'enabled': True }
},
hyperparameters={
'dropout': 0.2,
},
metric_definitions=[
{'Name':'test:accuracy', 'Regex':'train accuracy: (.*?),'},
{'Name':'test:loss', 'Regex':'train loss: (.*?);'}
],
enable_sagemaker_metrics=True,
)
inputs = sagemaker.inputs.s3_input(f's3://{os.environ["DATASETS_BUCKET"]}/{os.environ["DATASETS_KEY_PREFIX"]}')
estimator.fit(inputs={'training': inputs})
学習が終わるとoutput_path
にモデルが保存される。