TensorFlowのRNN(LSTM)のチュートリアルのコードを読む - sambaiz-net
チュートリアルで扱ったのは語彙数分の単語、つまり離散的な値だったが、今回は連続的な値を取る場合のモデルを作る。 全体のコードはここ。
入力
以下の関数によって生成した1次元のデータ列。 これをstrideした最後のデータ、つまり時系列的に次に来るものを予測させる。
def make_time_series_data(size):
data = []
for i in range(size):
data.append(sin(random.normalvariate(i,0.1)*0.1))
return np.reshape(np.array(data, dtype=np.float32), (size,1))
def make_batch(data, batch_size, num_steps, num_dimensions, name=None):
epoch_size = data.size // (batch_size*num_steps*num_dimensions)
data = np.lib.stride_tricks.as_strided(
data,
shape=
(epoch_size,
batch_size,
num_steps+1,
num_dimensions),
strides=(
4*batch_size*num_steps*num_dimensions,
4*num_steps*num_dimensions,
4*num_dimensions,
4 # bytes
),
writeable=False
)
return data[:, :, :-1], data[:, :, 1:]
モデル
input layerでLSTMのhidden_sizeに合わせて、output layerで予測値を得ている。 lossはMSE(Mean squared error)。OptimizerはGradientDecentOptimizerを使っている。
チュートリアルでは自力で各time_stepの値を入れていたが、 今回はdynamic_rnn()に任せている。
class Model(object):
def __init__(self, config, is_training=False):
# config
self.batch_size = config.batch_size
self.num_steps = config.num_steps
self.num_dimensions = config.num_dimensions
self.keep_prob = config.keep_prob
self.hidden_size = config.hidden_size
self.num_layers = config.num_layers
# placeholder
self.input = tf.placeholder(tf.float32, [None, self.num_steps, self.num_dimensions], name="input")
self.input_strided = tf.placeholder(tf.float32, [self.batch_size, self.num_steps, self.num_dimensions], name="input_strided")
self.lr = tf.placeholder(tf.float32, name="learning_rate")
# input layer
input = tf.reshape(self.input, [-1, self.num_dimensions])
input_w = tf.get_variable("input_w", [self.num_dimensions, self.hidden_size], dtype=tf.float32)
input_b = tf.get_variable("input_b", [self.hidden_size], dtype=tf.float32)
input = tf.nn.xw_plus_b(input, input_w, input_b)
input = tf.reshape(input, [self.batch_size, self.num_steps, self.hidden_size])
# LSTM layer
output, state = self._build_rnn_graph(input, is_training)
# output layer
output = tf.reshape(output, [-1, self.hidden_size])
output_w = tf.get_variable("output_w", [self.hidden_size, 1], dtype=tf.float32)
output_b = tf.get_variable("output_b", [1], dtype=tf.float32)
output = tf.nn.xw_plus_b(output, output_w, output_b)
self.output = tf.reshape(output, [self.batch_size, self.num_steps, 1])
self.cost = tf.reduce_mean(tf.square(self.output[:, -1, :] - self.input_strided[:, -1, :]))
self.train_op = tf.train.GradientDescentOptimizer(self.lr).minimize(self.cost, global_step=tf.train.get_or_create_global_step())
def _build_rnn_graph(self, input, is_training):
def make_cell():
cell = tf.contrib.rnn.LSTMBlockCell(
self.hidden_size, forget_bias=0.0)
if is_training and self.keep_prob < 1:
cell = tf.contrib.rnn.DropoutWrapper(
cell, output_keep_prob=self.keep_prob)
return cell
cell = tf.contrib.rnn.MultiRNNCell(
[make_cell() for _ in range(self.num_layers)], state_is_tuple=True)
initial_state = cell.zero_state(self.batch_size, tf.float32)
output, state = tf.nn.dynamic_rnn(cell, input, initial_state=initial_state)
return output, state
def learn(self, session, input, input_strided, learning_rate):
fetches = {
"cost": self.cost,
"train_op": self.train_op
}
feed_dict = {
self.input: input,
self.input_strided: input_strided,
self.lr: learning_rate
}
vals = session.run(fetches, feed_dict=feed_dict)
cost = vals["cost"]
return cost
def predict(self, session, input):
feed_dict = {
self.input: np.reshape(input, (1, input.shape[0], input.shape[1]))
}
return session.run(self.output, feed_dict=feed_dict)[0,-1]
ハイパーパラメータ
調整が難しい。max_epoch は学習率の初期値(learning_rate)で学習し続けるepochの数なわけなんだが、 これを大きくして一気にlossを減らしていこうとしたら案外簡単に収束しなくなってしまった。num_steps、つまり予測するのに見る数は今回のデータの場合そんなに大きくある必要はなくて、むしろ大きくすると平たいグラフになって最大値や最小値との誤差が大きくなった。hidden_size や num_layers は増やしても良さそうだがメモリが足りなかった。
class Config(object):
init_scale = 0.1
learning_rate = 1.0
num_layers = 2
num_steps = 5
hidden_size = 200
max_epoch = 3
batch_size = 3000
num_dimensions = 1
keep_prob = 1.0
lr_decay = 0.5
学習結果
300000個のデータから学習して予測させたところ 予測値は実際の値と比べて振幅が少なくなってしまった。