TPU(Tensor Processing Unit)は Google開発のニューラルネットワークの学習に特化したASIC(Application Specific Integrated Circuit)。 一般的なGPUと比べて15~30倍もの性能が出る らしく検索や翻訳などGoogleのサービスでも使われている。
TPUを使える環境として、無料で使えるJupyter NotebooksのGoogle Colabと GCPのCloud TPUがある。ColabのTPUも裏側ではCloud TPUが動いている。 Cloud TPUを直に使うとVMから接続して使うことになるので、TPUの料金に加えてVMの料金もかかる。
モデルのTPU対応
CNNのモデルをTPUEstimatorでTPUに対応させる。
EstimatorはTensorFlowの高レベルAPIで、 train()、 evaluate()、 predict()、 export_saved_model() といったモデルの学習から保存まで必要な機能を一通り提供する。
初めは比較的低レベルのAPIを使おうとしていたが、XLA(Accelerated Linear Algebra)によるコンパイルがうまくいかないなど様々な問題にあたって大変だったので使っておくと良いと思う。 それでもトライアンドエラーの繰り返しで、典型的なものはTroubleshootingにあるが、ないものは調べるなりしてなんとかやっていくしかない。
定数などの定義。BATCH_SIZEはCloud TPUのシャード数の8で割り切れる値にする必要がある。
import pandas as pd
from sklearn.model_selection import train_test_split
import tensorflow as tf
import numpy as np
flags = tf.app.flags
flags.DEFINE_boolean('use_tpu', True, 'use tpu or not')
tf.app.flags.DEFINE_string('f', '', 'kernel')
FLAGS = flags.FLAGS
EPOCH_NUM = 100
BATCH_SIZE = 800 # must be divisible by number of replicas 8
EVAL_BATCH_SIZE = 800
SHARD_NUM = 8 # A single Cloud TPU has 8 shards.
ITERATION_NUM = 100 # Number of training steps to run on the Cloud TPU before returning control.
入力データの準備
入力は関数で渡し、tf.data APIのdatasetを返せばイテレートしてくれる。
TensorFlowのtf.data API - sambaiz-net
batch()でdrop_remainderをTrueにして端数を切り捨てないと、shapeが確定せずコンパイルできない。
from google.colab import auth
from googleapiclient.discovery import build
from io import BytesIO
auth.authenticate_user()
bucket = "<bucket_name>"
gcs_service = build('storage', 'v1')
train_data = gcs_service.objects().get_media(bucket=bucket, object='train.csv').execute()
train = pd.read_csv(BytesIO(train_data))
MODEL_DIR = 'gs://{}/model/tpu'.format(bucket)
(x_train, x_valid, y_train, y_valid) = train_test_split(
train.drop('label', axis=1).values.reshape((-1, 28, 28, 1)).astype(np.float32),
np.identity(10)[train['label']].astype(np.float32),
test_size = 0.1, random_state = 100)
def train_input_fn(params):
dataset = tf.data.Dataset.from_tensor_slices(({'x': x_train}, y_train)) # (features, labels)
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(params['batch_size'], drop_remainder=True)
return dataset
def valid_input_fn(params):
dataset = tf.data.Dataset.from_tensor_slices(({'x': x_valid}, y_valid))
dataset = dataset.batch(params['batch_size'], drop_remainder=True)
return dataset
入出力するファイルはローカルではなくGCSなどに置く必要があるのでCloud TPUからも読み書きできるようにする。
{
"domain": "global",
"reason": "forbidden",
"message": "service-******@cloud-tpu.iam.gserviceaccount.com does not have storage.objects.create access to <bucket_name>."
}
!gsutil acl ch -u service-*****@cloud-tpu.iam.gserviceaccount.com:WRITER gs://<bucket_name>
モデルの作成
Estimatorには次のシグネチャのmodel_fnを渡す。
引数のfeaturesとlabelsはinput_fnの返り値で、
modeはtf.estimator.ModeKeysのTRAIN
、EVAL
、PREDICT
で、
paramsはEstimator生成時に渡せるパラメータ。
optimizerはCrossShardOptimizerでwrapする。
TPUに対応しているopで作る必要がある。
def model_fn(features, labels, mode, params):
def metric_fn(labels, logits):
return {
'accuracy': tf.metrics.accuracy(
labels=tf.argmax(labels, axis=1), predictions=tf.argmax(logits, axis=1))
}
is_training = tf.equal(mode, tf.estimator.ModeKeys.TRAIN)
conv1 = tf.layers.conv2d(
inputs=features['x'],
filters=32,
kernel_size=[5, 5],
padding="same",
activation=tf.nn.relu)
pool1 = tf.layers.max_pooling2d(inputs=conv1, pool_size=[2, 2], strides=2)
conv2 = tf.layers.conv2d(
inputs=pool1,
filters=64,
kernel_size=[5, 5],
padding="same",
activation=tf.nn.relu)
pool2 = tf.layers.max_pooling2d(inputs=conv2, pool_size=[2, 2], strides=2)
pool2_flat = tf.layers.flatten(pool2)
dense = tf.layers.dense(inputs=pool2_flat, units=128, activation=tf.nn.relu)
dropout = tf.layers.dropout(
inputs=dense, rate=0.4, training=mode == tf.estimator.ModeKeys.TRAIN)
logits = tf.layers.dense(inputs=dropout, units=10)
loss = tf.losses.softmax_cross_entropy(
onehot_labels=labels, logits=logits)
if mode == tf.estimator.ModeKeys.EVAL:
return tf.contrib.tpu.TPUEstimatorSpec(mode, loss=loss,
eval_metrics=(metric_fn, [labels, logits]))
optimizer = tf.train.AdamOptimizer(0.01)
if FLAGS.use_tpu:
optimizer = tf.contrib.tpu.CrossShardOptimizer(optimizer)
return tf.contrib.tpu.TPUEstimatorSpec(
mode=mode,
loss=loss,
predictions={
'pred': tf.argmax(logits, axis=1)
},
train_op=optimizer.minimize(loss, tf.train.get_or_create_global_step()))
TPUEstimatorの生成
TPUのアドレスが環境変数COLAB_TPU_ADDRに入るのでこれをmasterとする。
if FLAGS.use_tpu:
master = 'grpc://' + os.environ['COLAB_TPU_ADDR']
run_config = tf.contrib.tpu.RunConfig(
master=master,
session_config=tf.ConfigProto(
allow_soft_placement=True, log_device_placement=True),
tpu_config=tf.contrib.tpu.TPUConfig(ITERATION_NUM, SHARD_NUM))
else:
run_config = tf.contrib.tpu.RunConfig()
classifier = tf.contrib.tpu.TPUEstimator(
model_fn=model_fn,
model_dir=MODEL_DIR,
config=run_config,
params={},
train_batch_size=BATCH_SIZE,
eval_batch_size=EVAL_BATCH_SIZE,
predict_batch_size=BATCH_SIZE,
use_tpu=FLAGS.use_tpu)
学習
for epoch in range(EPOCH_NUM):
max_steps = len(x_train)*(epoch+1)//BATCH_SIZE
valid_steps = len(x_valid)//EVAL_BATCH_SIZE
if FLAGS.use_tpu:
max_steps //= SHARD_NUM
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=max_steps)
eval_spec = tf.estimator.EvalSpec(input_fn=valid_input_fn, steps=valid_steps)
result = tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)
if result[0] is not None:
print('epoch: {}, loss: {} accuracy: {}'.format(epoch+1, result[0]['loss'], result[0]['accuracy']))
else:
print('epoch: {} is already trained'.format(epoch+1))
ColabでTensorBoardを開く
このスクリプトを実行するとTensorBoardを立ち上げてngrokで外に開いてくれる。
!git clone https://github.com/mixuala/colab_utils
import os
import colab_utils.tboard
# set paths
ROOT = %pwd
colab_utils.tboard.launch_tensorboard(bin_dir=ROOT, log_dir=MODEL_DIR)
結果
学習させて実行時間を計測する。計測はセルの頭に%%time
を付けるとできる。
CPU
ベースライン。
CPU times: user 18min 28s, sys: 32.8 s, total: 19min 1s
Wall time: 14min 33s
GPU
ランタイムからアクセラレータをGPUに設定。使われるGPUはNVIDIAのTesla K80。
from tensorflow.python.client import device_lib
device_lib.list_local_devices()
# ...
# physical_device_desc: "device: 0, name: Tesla K80, pci bus id: 0000:00:04.0, compute capability: 3.7"]
結果。
CPU times: user 3min 2s, sys: 24.2 s, total: 3min 26s
Wall time: 8min 6s
TPU
アクセラレータをTPUに変更。 GPUより速くなることを期待したが、むしろCPUよりも遅くなってしまった。その上精度の伸びも遅くて良いところがない。
CPU times: user 3min, sys: 23.8 s, total: 3min 24s
Wall time: 15min
再チャレンジ
エポックの立ち上がりが遅いので学習を切らずに続けて行わせてみる。
def train_input_fn(params):
dataset = tf.data.Dataset.from_tensor_slices(({'x': x_train}, y_train)) # (features, labels)
dataset = dataset.shuffle(buffer_size=1000)
dataset = dataset.batch(params['batch_size'], drop_remainder=True)
dataset = dataset.repeat(EPOCH_NUM)
return dataset
epoch = EPOCH_NUM-1
max_steps = len(x_train)*(epoch+1)/BATCH_SIZE
valid_steps = len(x_valid)//EVAL_BATCH_SIZE
if FLAGS.use_tpu:
max_steps //= SHARD_NUM
train_spec = tf.estimator.TrainSpec(input_fn=train_input_fn, max_steps=max_steps)
eval_spec = tf.estimator.EvalSpec(input_fn=valid_input_fn, steps=valid_steps)
result = tf.estimator.train_and_evaluate(classifier, train_spec, eval_spec)
if result[0] is not None:
print('epoch: {}, loss: {} accuracy: {}'.format(epoch+1, result[0]['loss'], result[0]['accuracy']))
else:
print('epoch: {} is already trained'.format(epoch+1))
これを実行したところGPUと同程度には速くなった。
# GPU
CPU times: user 40.9 s, sys: 7.8 s, total: 48.7 s
Wall time: 1min 43s
# TPU
CPU times: user 35.2 s, sys: 4.46 s, total: 39.7 s
Wall time: 1min 34s
さらにEPOCH_NUMを5から100にして再計測する。
結果
GPU
CPU times: user 3min 27s, sys: 2min, total: 5min 28s
Wall time: 6min 5s
TPU
エポック数を大幅に増やしたのにも関わらずほとんど実行時間が変わらずとても速い。 CPU時間も変わっていないので演算がTPUで完結していてCPU-TPU間のデータの受け渡しが最小限で済んでいるのかもしれない。
CPU times: user 34.5 s, sys: 7.32 s, total: 41.8 s
Wall time: 1min 38s
参考
Google Colab Free GPU Tutorial – Deep Learning Turkey – Medium