SageMakerでPyTorchのモデルを学習させる

pythonpytorchmachinelearningaws

AWSの機械学習サービスSageMakerでPyTorchのモデルを学習させる。

コード

まず学習させるモデルとそれを呼び出すエントリーポイントになるコードを書く。全体のコードはGitHubにある。 実際の環境と同じSageMakerのコンテナをローカルで動かしてVSCodeのRemote Developmentで接続して開発すると入っていないパッケージは警告が出たりして良い。

VSCodeのRemote DevelopmentでSageMakerのコンテナ環境でモデルを開発する - sambaiz-net

モデル

以前作ったMNISTのモデルを使う。

PyTorchでMNISTする - sambaiz-net

$ cat model.py
import torch
from torch import nn, cuda
import torch.nn.functional as F
import torch.distributed as dist
import torch.optim as optim


class Model(nn.Module):
    def __init__(self, dropout):
        super(Model, self).__init__()
        self.conv1 = nn.Conv2d(1, 64, 5)  # -> 24x24
        self.pool1 = nn.MaxPool2d(2)  # -> 12x12
        self.conv2 = nn.Conv2d(64, 128, 5)  # -> 8x8
        self.dropout = nn.Dropout(p=dropout)
        self.dense = nn.Linear(128 * 8 * 8, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool1(x)
        x = F.relu(self.conv2(x))
        x = self.dropout(x)
        x = x.view(x.size(0), -1)  # Flatten
        return F.relu(self.dense(x))


def _average_gradients(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
        param.grad.data /= size


def train(model, train_loader, device, is_distributed, lr, momentum):
    optimizer = optim.SGD(model.parameters(), lr=lr, momentum=momentum)
    criterion = nn.CrossEntropyLoss()
    model.train()
    for data, target in train_loader:
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        if is_distributed and not cuda.is_available():
            # average gradients manually for multi-machine cpu case only
            _average_gradients(model)
        optimizer.step()


def test(model, test_loader, device):
    model.eval()
    correct = 0
    loss_sum = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss_sum += F.nll_loss(output, target, reduction='sum').cpu().data  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
    accuracy = correct / len(test_loader.dataset)
    loss = loss_sum / len(test_loader.dataset)
    return accuracy, loss

エントリーポイント

サンプルコードをベースに 環境変数や引数でパラメータを受け取り、GPUや分散学習のための準備を行う。 fit()のinputで渡したS3のファイルは${SM_CHANNEL_XXXX}のパスにダウンロードされていて、${SM_MODEL_DIR}に置いたファイルが保存される。 ホスティング時にEIを用いる場合model.ptという名前のTorchScriptとして保存する必要がある。

SageMakerで学習したPyTorchのモデルをElastic Inferenceを有効にしてデプロイする - sambaiz-net

import logging
import torch
from torchvision import datasets, transforms
import torch.distributed as dist
import os
import model as md
import argparse
import sys
import json

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler(sys.stdout))


def _make_train_loader(batch_size, data_dir, is_distributed, **kwargs):
    dataset = datasets.MNIST(data_dir, train=True, transform=transforms.ToTensor(), download=False)
    train_sampler = torch.utils.data.distributed.DistributedSampler(dataset) if is_distributed else None
    return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=train_sampler is None, sampler=train_sampler, **kwargs)


def _make_test_loader(batch_size, data_dir, **kwargs):
    dataset = datasets.MNIST(data_dir, train=False, transform=transforms.ToTensor(), download=False)
    return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True, **kwargs)


def train(args):
    is_distributed = len(args.hosts) > 1 and args.backend is not None
    use_cuda = args.num_gpus > 0
    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    device = torch.device("cuda" if use_cuda else "cpu")

    if is_distributed:
        # Initialize the distributed environment.
        world_size = len(args.hosts)
        os.environ['WORLD_SIZE'] = str(world_size)
        host_rank = args.hosts.index(args.current_host)
        os.environ['RANK'] = str(host_rank)
        dist.init_process_group(backend=args.backend, rank=host_rank, world_size=world_size)

    # set the seed for generating random numbers
    torch.manual_seed(args.seed)
    if use_cuda:
        torch.cuda.manual_seed(args.seed)

    model = md.Model(args.dropout).to(device)
    if is_distributed and use_cuda:
        # multi-machine multi-gpu case
        model = torch.nn.parallel.DistributedDataParallel(model)
    else:
        # single-machine multi-gpu case or single-machine or multi-machine cpu case
        model = torch.nn.DataParallel(model)

    train_loader = _make_train_loader(args.batch_size, args.data_dir, is_distributed, **kwargs)
    test_loader = _make_test_loader(args.test_batch_size, args.data_dir, **kwargs)

    for epoch in range(1, args.epochs + 1):
        logger.info(f'epoch: {epoch}/{args.epochs}')
        md.train(model, train_loader, device, is_distributed, args.lr, args.momentum)
        test_accuracy, test_loss = md.test(model, test_loader, device)
        logger.info(f'test accuracy: {test_accuracy}, test loss: {test_loss};')
    save_model(model, args.model_dir)


def save_model(model, model_dir):
    path = os.path.join(model_dir, 'model.pt')
    torch.jit.save(torch.jit.script(model.module), path)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # hyperparameters
    parser.add_argument('--batch-size', type=int, default=64, metavar='N', help='input batch size for training (default: 64)')
    parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N', help='input batch size for testing (default: 1000)')
    parser.add_argument('--epochs', type=int, default=10, metavar='N', help='number of epochs to train (default: 10)')
    parser.add_argument('--seed', type=int, default=1, metavar='S', help='random seed (default: 1)')
    parser.add_argument('--backend', type=str, default=None, help='backend for distributed training (tcp, gloo on cpu and gloo, nccl on gpu)')
    # model params
    parser.add_argument('--dropout', type=float, default=0.5, metavar='DROP', help='dropout rate (default: 0.5)')
    # optimizer params
    parser.add_argument('--lr', type=float, default=0.01, metavar='LR', help='learning rate (default: 0.01)')
    parser.add_argument('--momentum', type=float, default=0.5, metavar='M', help='SGD momentum (default: 0.5)')

    # Container environment
    parser.add_argument('--hosts', type=str, default=os.environ.get('SM_HOSTS', '[]'))
    parser.add_argument('--current-host', type=str, default=os.environ.get('SM_CURRENT_HOST'))
    parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR', '.'))
    parser.add_argument('--num-gpus', type=int, default=os.environ.get('SM_NUM_GPUS', 0))

    # fit() inputs (SM_CHANNEL_XXXX)
    parser.add_argument('--data-dir', type=str, default=os.environ.get('SM_CHANNEL_TRAINING', 'mnist'))

    args = parser.parse_args()
    args.hosts = json.loads(args.hosts)

    train(args)

ローカル実行

ローカルで実行できることを確認する。

$ cat Makefile
download_datasets:
	python download_datasets.py

local_run: download_datasets
	python main.py

$ cat download_datasets.py
from torchvision import datasets

if __name__ == '__main__':
    dataset = datasets.MNIST('.', train=True, download=True)
    dataset = datasets.MNIST('.', train=False, download=True)
$ make local_run
epoch: 1/10
test_accuracy: 0.9593, test_loss_avg: -10.719537734985352
...

実行

SageMaker Studioから実行した。

SageMaker Studioへのオンボード

SageMaker Studioを初めて使う場合はユーザー名とS3などにアクセスするためのExecution roleなどを設定する必要がある。QuickstartではIAM認証になるが、Standard setupでSSO認証にすることもできる。

Onboard

StatusがReadyになるとユーザー一覧からOpen Studioできる。

Onboard

前準備

必要なパッケージをインストールし、データセットをS3に上げる。

import sys
!{sys.executable} -m pip install sagemaker-experiments
!{sys.executable} -m pip install torch
!{sys.executable} -m pip install torchvision
!apt-get install -y make

import os
sess = boto3.Session()
account_id = sess.client('sts').get_caller_identity()["Account"]
os.environ['DATASETS_BUCKET'] = f'sagemaker-test-mnist-{sess.region_name}-{account_id}'
os.environ['DATASETS_KEY_PREFIX'] = 'mnist'
!make upload_datasets

学習

PyTorchのEstimatorを作成し、fit()でTraining jobsを開始する。get_execution_role()はNotebookインスタンスのroleのARNを返す関数で、それ以外の環境で実行するとエラーになる。

Training jobs

from sagemaker.pytorch import PyTorch
import sagemaker

estimator = PyTorch(
    source_dir='/root/sagemaker-pytorch-mnist',
    entry_point='main.py',
    output_path=f's3://{os.environ["DATASETS_BUCKET"]}/artifacts',
    role=sagemaker.get_execution_role(),
    framework_version='1.3.1',
    py_version='py3',
    train_instance_count=2,
    train_instance_type='ml.g4dn.xlarge',
    hyperparameters={
        'epochs': 10,
        'backend': 'gloo',
        'dropout': 0.2,
    },
    metric_definitions=[
        {'Name':'test:accuracy', 'Regex':'test accuracy: (.*?),'},
        {'Name':'test:loss', 'Regex':'test loss: (.*?);'}
    ],
    enable_sagemaker_metrics=True,
)

inputs = sagemaker.inputs.s3_input(f's3://{os.environ["DATASETS_BUCKET"]}/{os.environ["DATASETS_KEY_PREFIX"]}')

estimator.fit(inputs={'training': inputs})

出力したログの正規表現で一致した部分の値がメトリクスとしてCloudWatchに送られる。

メトリクス

学習が終わるとoutput_pathにモデルが保存される。

SageMaker Studioの使っていないKernelを自動でシャットダウンするsagemaker-studio-auto-shutdown-extension - sambaiz-net