AWSの機械学習サービスSageMakerでPyTorchのモデルを学習させる。
コード
まず学習させるモデルとそれを呼び出すエントリーポイントになるコードを書く。全体のコードはGitHubにある。 実際の環境と同じSageMakerのコンテナをローカルで動かしてVSCodeのRemote Developmentで接続して開発すると入っていないパッケージは警告が出たりして良い。
VSCodeのRemote DevelopmentでSageMakerのコンテナ環境でモデルを開発する - sambaiz-net
モデル
以前作ったMNISTのモデルを使う。
$ cat model.py
import torch
from torch import nn, cuda
import torch.nn.functional as F
import torch.distributed as dist
import torch.optim as optim
class Model(nn.Module):
def __init__(self, dropout):
super(Model, self).__init__()
self.conv1 = nn.Conv2d(1, 64, 5) # -> 24x24
self.pool1 = nn.MaxPool2d(2) # -> 12x12
self.conv2 = nn.Conv2d(64, 128, 5) # -> 8x8
self.dropout = nn.Dropout(p=dropout)
self.dense = nn.Linear(128 * 8 * 8, 10)
def forward(self, x):
x = F.relu(self.conv1(x))
x = self.pool1(x)
x = F.relu(self.conv2(x))
x = self.dropout(x)
x = x.view(x.size(0), -1) # Flatten
return F.relu(self.dense(x))
def _average_gradients(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
param.grad.data /= size
def train(model, train_loader, device, is_distributed, lr, momentum):
optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
criterion = nn.CrossEntropyLoss()
model.train()
for data, target in train_loader:
data, target = data.to(device), target.to(device)
optimizer.zero_grad()
output = model(data)
loss = criterion(output, target)
loss.backward()
if is_distributed and not cuda.is_available():
# average gradients manually for multi-machine cpu case only
_average_gradients(model)
optimizer.step()
def test(model, test_loader, device):
model.eval()
correct = 0
loss_sum = 0
with torch.no_grad():
for data, target in test_loader:
data, target = data.to(device), target.to(device)
output = model(data)
loss_sum += F.nll_loss(output, target, reduction='sum').cpu().data # sum up batch loss
pred = output.max(1, keepdim=True)[1] # get the index of the max log-probability
correct += pred.eq(target.view_as(pred)).sum().item()
accuracy = correct / len(test_loader.dataset)
loss = loss_sum / len(test_loader.dataset)
return accuracy, loss
エントリーポイント
サンプルコードをベースに
環境変数や引数でパラメータを受け取り、GPUや分散学習のための準備を行う。
fit()
のinputで渡したS3のファイルは${SM_CHANNEL_XXXX}
のパスにダウンロードされていて、${SM_MODEL_DIR}
に置いたファイルが保存される。
ホスティング時にEIを用いる場合model.pt
という名前のTorchScriptとして保存する必要がある。
SageMakerで学習したPyTorchのモデルをElastic Inferenceを有効にしてデプロイする - sambaiz-net
import logging
import torch
from torchvision import datasets, transforms
import torch.distributed as dist
import os
import model as md
import argparse
import sys
import json
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))
def _make_train_loader(batch_size, data_dir, is_distributed, **kwargs):
dataset = datasets.MNIST(data_dir, train=True, transform=transforms.ToTensor(), download=False)
train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None, sampler=train_sampler, **kwargs)
def _make_test_loader(batch_size, data_dir, **kwargs):
dataset = datasets.MNIST(data_dir, train=False, transform=transforms.ToTensor(), download=False)
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True, **kwargs)
def train(args):
is_distributed = len(args.hosts) > 1 and args.backend is not None
use_cuda = args.num_gpus > 0
kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
device = torch.device("cuda" if use_cuda else "cpu")
if is_distributed:
# Initialize the distributed environment.
world_size = len(args.hosts)
os.environ['WORLD_SIZE'] = str(world_size)
host_rank = args.hosts.index(args.current_host)
os.environ['RANK'] = str(host_rank)
dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)
# set the seed for generating random numbers
torch.manual_seed(args.seed)
if use_cuda:
torch.cuda.manual_seed(args.seed)
model = md.Model(args.dropout).to(device)
if is_distributed and use_cuda:
# multi-machine multi-gpu case
model = torch.nn.parallel.DistributedDataParallel(model)
else:
# single-machine multi-gpu case or single-machine or multi-machine cpu case
model = torch.nn.DataParallel(model)
train_loader = _make_train_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)
test_loader = _make_test_loader(args.test_batch_size, args.data_dir, **kwargs)
for epoch in range(1, args.epochs + 1):
logger.info(f'epoch: {epoch}/{args.epochs}')
md.train(model, train_loader, device, is_distributed, args.lr, args.momentum)
test_accuracy, test_loss = md.test(model, test_loader, device)
logger.info(f'test accuracy: {test_accuracy}, test loss: {test_loss};')
save_model(model, args.model_dir)
def save_model(model, model_dir):
path = os.path.join(model_dir, 'model.pt')
torch.jit.save(torch.jit.script(model.module), path)
if __name__ == '__main__':
parser = argparse.ArgumentParser()
# hyperparameters
parser.add_argument('--batch-size', type=int, default=64, metavar='N', help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N', help='number of epochs to train (default: 10)')
parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed (default: 1)')
parser.add_argument('--backend', type=str, default=None, help='backend for distributed training (tcp, gloo on cpu and gloo, nccl on gpu)')
# model params
parser.add_argument('--dropout', type=float, default=0.5, metavar='DROP', help='dropout rate (default: 0.5)')
# optimizer params
parser.add_argument('--lr', type=float, default=0.01, metavar='LR', help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M', help='SGD momentum (default: 0.5)')
# Container environment
parser.add_argument('--hosts', type=str, default=os.environ.get('SM_HOSTS', '[]'))
parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))
parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR', '.'))
parser.add_argument('--num-gpus', type=int, default=os.environ.get('SM_NUM_GPUS', 0))
# fit() inputs (SM_CHANNEL_XXXX)
parser.add_argument('--data-dir', type=str, default=os.environ.get('SM_CHANNEL_TRAINING', 'mnist'))
args = parser.parse_args()
args.hosts = json.loads(args.hosts)
train(args)
ローカル実行
ローカルで実行できることを確認する。
$ cat Makefile
download_datasets:
python download_datasets.py
local_run: download_datasets
python main.py
$ cat download_datasets.py
from torchvision import datasets
if __name__ == '__main__':
dataset = datasets.MNIST('.', train=True, download=True)
dataset = datasets.MNIST('.', train=False, download=True)
$ make local_run
epoch: 1/10
test_accuracy: 0.9593, test_loss_avg: -10.719537734985352
...
実行
SageMaker Studioから実行した。
SageMaker Studioへのオンボード
SageMaker Studioを初めて使う場合はユーザー名とS3などにアクセスするためのExecution roleなどを設定する必要がある。QuickstartではIAM認証になるが、Standard setupでSSO認証にすることもできる。
StatusがReadyになるとユーザー一覧からOpen Studioできる。
前準備
必要なパッケージをインストールし、データセットをS3に上げる。
import sys
!{sys.executable} -m pip install sagemaker-experiments
!{sys.executable} -m pip install torch
!{sys.executable} -m pip install torchvision
!apt-get install -y make
import os
sess = boto3.Session()
account_id = sess.client('sts').get_caller_identity()["Account"]
os.environ['DATASETS_BUCKET'] = f'sagemaker-test-mnist-{sess.region_name}-{account_id}'
os.environ['DATASETS_KEY_PREFIX'] = 'mnist'
!make upload_datasets
学習
PyTorchのEstimatorを作成し、fit()
でTraining jobsを開始する。get_execution_role()はNotebookインスタンスのroleのARNを返す関数で、それ以外の環境で実行するとエラーになる。
from sagemaker.pytorch import PyTorch
import sagemaker
estimator = PyTorch(
source_dir='/root/sagemaker-pytorch-mnist',
entry_point='main.py',
output_path=f's3://{os.environ["DATASETS_BUCKET"]}/artifacts',
role=sagemaker.get_execution_role(),
framework_version='1.3.1',
py_version='py3',
train_instance_count=2,
train_instance_type='ml.g4dn.xlarge',
hyperparameters={
'epochs': 10,
'backend': 'gloo',
'dropout': 0.2,
},
metric_definitions=[
{'Name':'test:accuracy', 'Regex':'test accuracy: (.*?),'},
{'Name':'test:loss', 'Regex':'test loss: (.*?);'}
],
enable_sagemaker_metrics=True,
)
inputs = sagemaker.inputs.s3_input(f's3://{os.environ["DATASETS_BUCKET"]}/{os.environ["DATASETS_KEY_PREFIX"]}')
estimator.fit(inputs={'training': inputs})
出力したログの正規表現で一致した部分の値がメトリクスとしてCloudWatchに送られる。
学習が終わるとoutput_path
にモデルが保存される。
SageMaker Studioの使っていないKernelを自動でシャットダウンするsagemaker-studio-auto-shutdown-extension - sambaiz-net