TensorflowのRNN(Recurrent Neural Networks)のチュートリアルのコードを読む。これは文章のそれまでの単語の履歴から、その次に続く単語を予測することで言語モデルを作るもの。
RNN/LSTMとは
RNNは入力に対して出力のほかに情報を次のステップに渡すことで時系列データで学習できるようにするネットワーク。 展開すると同じネットワークに単語を一つずつ入れていくように表現できる。
これを単純にMLPで実装しようとすると逆誤差伝搬する際に過去の層にも伝搬させる(BPTT: Backpropagation through time)必要があり、 時間を遡るほど活性化関数の微分係数が再帰的に繰り返し掛けられるため勾配が消失や爆発しやすくなってしまう。 また、時系列データのうちに発火したいものと発火したくないものが混在している場合、同じ重みにつながっているため更新を打ち消しあってしまう入力/出力重み衝突という問題もある。
これらを解決するのがLSTM(Long Short Term Memory networks)で、 勾配消失は活性化関数がxで重みが単位行列のニューロンの CEC(Constant Error Carousel) によって常に誤差に掛けられる係数を1にすることで防ぎ、 入力/出力重み衝突は必要な入出力を通したり不必要な情報は忘れさせるために値域(0,1)の値を掛ける input gate、forget gate、output gate によって回避する。gateは入力と前回の出力によって制御される。
TensorflowではいくつかLSTMの実装が用意されていて、 CudnnLSTM や BasicLSTMCell、LSTMBlockCell などがある。 cuDNNというのはNVIDIAのCUDAのDNNライブラリのこと。 LSTMBlockCell はもう少し複雑なLSTMで BasicLSTMCell よりも速い。
動かしてみる
$ git clone https://github.com/tensorflow/models.git
$ cd models/tutorials/rnn/ptb/
$ wget http://www.fit.vutbr.cz/~imikolov/rnnlm/simple-examples.tgz
$ tar xvf simple-examples.tgz
$ python3 -m venv env
$ . ./env/bin/activate
$ pip install numpy tensorflow
$ python ptb_word_lm.py --data_path=simple-examples/data/ --num_gpus=0
Epoch: 1 Learning rate: 1.000
0.004 perplexity: 5534.452 speed: 894 wps
0.104 perplexity: 845.383 speed: 1277 wps
...
0.803 perplexity: 316.808 speed: 1195 wps
0.903 perplexity: 298.087 speed: 1205 wps
Epoch: 1 Train Perplexity: 283.825
Epoch: 1 Valid Perplexity: 182.132
Epoch: 2 Learning rate: 1.000
...
Epoch: 4 Learning rate: 1.000
...
Epoch: 5 Learning rate: 0.500
...
Epoch: 6 Learning rate: 0.250
...
Epoch: 7 Learning rate: 0.125
...
Epoch: 13 Learning rate: 0.002
...
Test Perplexity: 121.759
reader
readerにはテストがあったので、これを使って実際にどんな出力をしているか見てみる。
ptb_raw_data
単語をIDに変換したものと語彙数が返る。
def setUp(self):
self._string_data = "\n".join(
[" hello there i am",
" rain as day",
" want some cheesy puffs ?"])
def testPtbRawData(self):
tmpdir = tf.test.get_temp_dir()
for suffix in "train", "valid", "test":
filename = os.path.join(tmpdir, "ptb.%s.txt" % suffix)
with tf.gfile.GFile(filename, "w") as fh:
fh.write(self._string_data)
# Smoke test
output = reader.ptb_raw_data(tmpdir)
print('output: {0}'.format(output))
# output: (
# [5, 10, 6, 1, 8, 2, 4, 11, 9, 3, 7, 0], # train
# [5, 10, 6, 1, 8, 2, 4, 11, 9, 3, 7, 0], # valid
# [5, 10, 6, 1, 8, 2, 4, 11, 9, 3, 7, 0], # test
# 12 # vocabulary
# )
self.assertEqual(len(output), 4)
print(word_to_id)
=> {'?': 0, 'am<eos>': 1, 'as': 2, 'cheesy': 3, 'day<eos>': 4, 'hello': 5, 'i': 6, 'puffs': 7, 'rain': 8, 'some': 9, 'there': 10, 'want': 11}
ptb_producer
session.runする度に時系列順に[batch_size, num_steps]のTensorを出力する。 二つ目の返り値は一つ右にずらしたもの。
def testPtbProducer(self):
raw_data = [
# t=0↓ t=1↓
4, 3, 2, 1, 0,
5, 6, 1, 1, 1,
1, 0, 3, 4, 1
]
batch_size = 3
num_steps = 2
x, y = reader.ptb_producer(raw_data, batch_size, num_steps)
with self.test_session() as session:
coord = tf.train.Coordinator()
tf.train.start_queue_runners(session, coord=coord)
try:
xval, yval = session.run([x, y])
self.assertAllEqual(xval, [[4, 3], [5, 6], [1, 0]])
self.assertAllEqual(yval, [[3, 2], [6, 1], [0, 3]])
xval, yval = session.run([x, y])
self.assertAllEqual(xval, [[2, 1], [1, 1], [3, 4]])
self.assertAllEqual(yval, [[1, 0], [1, 1], [4, 1]])
finally:
coord.request_stop()
coord.join()
実装はこんな感じ。
i = tf.train.range_input_producer(epoch_size, shuffle=False).dequeue()
x = tf.strided_slice(data, [0, i * num_steps],
[batch_size, (i + 1) * num_steps])
x.set_shape([batch_size, num_steps])
y = tf.strided_slice(data, [0, i * num_steps + 1],
[batch_size, (i + 1) * num_steps + 1])
y.set_shape([batch_size, num_steps])
range_input_producerはその名の通りrangeのように0から値を生成するが、 Threadを調整するCoordinatorを生成し、 start_queue_runnersに渡す必要がある。
# example of range_input_producer
with self.test_session() as session:
i = tf.train.range_input_producer(100, shuffle=False).dequeue()
coord = tf.train.Coordinator()
tf.train.start_queue_runners(session, coord=coord)
try:
print(session.run(i)) # => 0
print(session.run(i)) # => 1
print(session.run(i)) # => 2
finally:
coord.request_stop()
coord.join() # Wait for all the threads to terminate.
Model
入力の準備
embedding_lookupで embeddingから各stepの単語のものを抽出する。
with tf.device("/cpu:0"):
embedding = tf.get_variable(
"embedding", [vocab_size, size], dtype=data_type())
# shape=(batch_size, num_steps, size), dtype=float32
inputs = tf.nn.embedding_lookup(embedding, input_.input_data)
embedding_lookupの挙動はこんな感じ。
# example of embedding_lookup
with tf.Session() as session:
print(session.run(tf.nn.embedding_lookup(
[[0, 1], [2, 3], [4, 5], [6, 7], [8, 9], [10, 11]],
[[0,1,2], [3,4,5], [6,7,8]]
)))
# => [[ 0 2 4]
# [ 6 8 10]
# [ 1 3 5]]
学習中の場合、過学習を防ぐためkeep_prob残してDropoutし、RNNのグラフを作り始める。
if is_training and config.keep_prob < 1:
inputs = tf.nn.dropout(inputs, config.keep_prob)
output, state = self._build_rnn_graph(inputs, config, is_training)
RNNのグラフ
rnn_mode で実装を選べるようになっている。
def _build_rnn_graph(self, inputs, config, is_training):
if config.rnn_mode == CUDNN:
return self._build_rnn_graph_cudnn(inputs, config, is_training)
else:
return self._build_rnn_graph_lstm(inputs, config, is_training)
Cellは LSTMBlockCell を DroopoutWrapper でラップしたもの。 さらにこれをCellの出力が次のCellの入力になる MultiRNNCell で num_layers の数重ねている。
最初にzero_stateの 初期状態から num_steps まわして各stepでのoutputと最後のstateを返す。
def _get_lstm_cell(self, config, is_training):
if config.rnn_mode == BASIC:
return tf.contrib.rnn.BasicLSTMCell(
config.hidden_size, forget_bias=0.0, state_is_tuple=True,
reuse=not is_training)
if config.rnn_mode == BLOCK:
return tf.contrib.rnn.LSTMBlockCell(
config.hidden_size, forget_bias=0.0)
raise ValueError("rnn_mode %s not supported" % config.rnn_mode)
def _build_rnn_graph_lstm(self, inputs, config, is_training):
def make_cell():
cell = self._get_lstm_cell(config, is_training)
if is_training and config.keep_prob < 1:
cell = tf.contrib.rnn.DropoutWrapper(
cell, output_keep_prob=config.keep_prob)
return cell
cell = tf.contrib.rnn.MultiRNNCell(
[make_cell() for _ in range(config.num_layers)], state_is_tuple=True)
self._initial_state = cell.zero_state(config.batch_size, data_type())
state = self._initial_state
# [shape=(batch_size, hidden_size) dtype=float32, ...]
outputs = []
with tf.variable_scope("RNN"):
for time_step in range(self.num_steps):
if time_step > 0: tf.get_variable_scope().reuse_variables()
(cell_output, state) = cell(inputs[:, time_step, :], state)
outputs.append(cell_output)
# shape=(batch_size * num_steps, hidden_size), dtype=float32
output = tf.reshape(tf.concat(outputs, 1), [-1, config.hidden_size])
return output, state
コスト
出力層を通したのをlogits(log(p/(1-p)) (0≦p≦1))のシーケンスとして扱い、 sequence_lossで それぞれ交差エントロピーを求め、その和をコストとする。
output, state = self._build_rnn_graph(inputs, config, is_training)
softmax_w = tf.get_variable(
"softmax_w", [size, vocab_size], dtype=data_type())
softmax_b = tf.get_variable("softmax_b", [vocab_size], dtype=data_type())
logits = tf.nn.xw_plus_b(output, softmax_w, softmax_b)
# shape=(batch_size, num_steps, vocab_size), dtype=float32
logits = tf.reshape(logits, [self.batch_size, self.num_steps, vocab_size])
loss = tf.contrib.seq2seq.sequence_loss(
# logits: [batch_size, sequence_length=num_steps, num_decoder_symbols=vocab_size] and dtype float
# The logits correspond to the prediction across all classes at each timestep.
logits,
# targets: [batch_size, sequence_length=num_steps] and dtype int
# The target represents the true class at each timestep.
input_.targets,
# weights: [batch_size, sequence_length] and dtype float
# When using weights as masking, set all valid timesteps to 1 and all padded timesteps to 0
tf.ones([self.batch_size, self.num_steps], dtype=data_type()),
average_across_timesteps=False,
average_across_batch=True)
# Update the cost
self._cost = tf.reduce_sum(loss)
self._final_state = state
勾配
trainable_variablesで trainable=True (つまり_lr以外)のvariableを取得し、 gradientsで各variableに対しての勾配を求め、 clip_by_global_normで大きさを抑える。 これはgradient clippingと呼ばれる勾配爆発を防ぐための手法。
if not is_training:
return
self._lr = tf.Variable(0.0, trainable=False)
tvars = tf.trainable_variables()
grads, _ = tf.clip_by_global_norm(tf.gradients(self._cost, tvars),
config.max_grad_norm)
# example of trainable_variables
with tf.Session() as session:
a = tf.Variable(10.0, trainable=False)
b = tf.Variable(20.0)
c = tf.get_variable("c", [2, 2])
d = tf.get_variable("d", [3, 3], trainable=False)
session.run(tf.global_variables_initializer())
print(session.run(tf.trainable_variables()))
# [20.0, array([[ 1.10110056, 0.6373167 ],
# [ 0.44673324, -0.11995673]], dtype=float32)]
# example of gradients & clip_by_global_norm
with tf.Session() as session:
xs = tf.Variable([10., 20., 30.])
ys = [xs ** 2 + 123, xs * 5]
grad = tf.gradients(ys,xs)
session.run(tf.global_variables_initializer())
print(session.run(grad)) # [20 + 5, 40 + 5, 60 + 5]
list_clipped, global_norm = session.run(tf.clip_by_global_norm(grad,2))
# global_norm = sqrt(sum([l2norm(t)**2 for t in t_list]))
# = sqrt(25 ** 2 + 45 ** 2 + 65 ** 2)
print(global_norm) # 82.9156
# t_list[i] * clip_norm / max(global_norm, clip_norm)
# = [25, 45, 65] * 2 / global_norm
print(list_clipped) # [0.60302269, 1.08544087, 1.56785905]
Optimize
学習率_lrの GradientDescenetOptimizer でoptimizeする。 apply_gradientsするたびに global_stepがインクリメントされる。
optimizer = tf.train.GradientDescentOptimizer(self._lr)
self._train_op = optimizer.apply_gradients(
zip(grads, tvars),
global_step=tf.train.get_or_create_global_step())
self._new_lr = tf.placeholder(
tf.float32, shape=[], name="new_learning_rate")
self._lr_update = tf.assign(self._lr, self._new_lr)
run_epoch
session.run() する。
fetches = {
"cost": model.cost,
"final_state": model.final_state,
}
if eval_op is not None:
fetches["eval_op"] = eval_op
for step in range(model.input.epoch_size):
feed_dict = {}
for i, (c, h) in enumerate(model.initial_state):
feed_dict[c] = state[i].c
feed_dict[h] = state[i].h
vals = session.run(fetches, feed_dict)
cost = vals["cost"]
state = vals["final_state"]
costs += cost
iters += model.input.num_steps
if verbose and step % (model.input.epoch_size // 10) == 10:
print("%.3f perplexity: %.3f speed: %.0f wps" %
(step * 1.0 / model.input.epoch_size, np.exp(costs / iters),
iters * model.input.batch_size * max(1, FLAGS.num_gpus) /
(time.time() - start_time)))
main
起点。学習率はmax_epochまで初期値で、それ以後のepochでは指数的に減少させていく。
- Threadを調整するCorrdinator
- variableを保存するSaver
- チェックポイントからセッションを再開するSessionManager
のラッパー。
追記(2018-07-01): Supervisorはdeprecatedになったので代わりにMonitoredSessionを使うべき。
with tf.Graph().as_default():
tf.train.import_meta_graph(metagraph)
for model in models.values():
model.import_ops()
sv = tf.train.Supervisor(logdir=FLAGS.save_path)
config_proto = tf.ConfigProto(allow_soft_placement=soft_placement)
with sv.managed_session(config=config_proto) as session:
for i in range(config.max_max_epoch):
lr_decay = config.lr_decay ** max(i + 1 - config.max_epoch, 0.0)
m.assign_lr(session, config.learning_rate * lr_decay)
print("Epoch: %d Learning rate: %.3f" % (i + 1, session.run(m.lr)))
train_perplexity = run_epoch(session, m, eval_op=m.train_op,
verbose=True)
print("Epoch: %d Train Perplexity: %.3f" % (i + 1, train_perplexity))
valid_perplexity = run_epoch(session, mvalid)
print("Epoch: %d Valid Perplexity: %.3f" % (i + 1, valid_perplexity))
test_perplexity = run_epoch(session, mtest)
print("Test Perplexity: %.3f" % test_perplexity)
if FLAGS.save_path:
print("Saving model to %s." % FLAGS.save_path)
sv.saver.save(session, FLAGS.save_path, global_step=sv.global_step)
TensorFlow/RNNで連続的な値を取る時系列データを予測する - sambaiz-net
参考
詳解 ディープラーニング ~TensorFlow・Kerasによる時系列データ処理~