# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for layer wrappers (TimeDistributed and Bidirectional)."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

from tensorflow.python.keras._impl import keras
from tensorflow.python.platform import test


class TimeDistributedTest(test.TestCase):
  """Tests for keras.layers.TimeDistributed wrapping various inner layers."""

  def test_timedistributed_dense(self):
    """TimeDistributed(Dense) trains end-to-end and is serializable."""
    # first, test with Dense layer
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2), input_shape=(3, 4)))
      model.compile(optimizer='rmsprop', loss='mse')
      model.fit(
          np.random.random((10, 3, 4)),
          np.random.random((10, 3, 2)),
          epochs=1,
          batch_size=10)

      # test config
      model.get_config()

  def test_timedistributed_static_batch_size(self):
    """Training works when a static batch_size is baked into the input."""
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2), input_shape=(3, 4), batch_size=10))
      model.compile(optimizer='rmsprop', loss='mse')
      model.fit(
          np.random.random((10, 3, 4)),
          np.random.random((10, 3, 2)),
          epochs=1,
          batch_size=10)

  def test_timedistributed_conv2d(self):
    """TimeDistributed(Conv2D) trains and round-trips through JSON config."""
    # test with Conv2D
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Conv2D(5, (2, 2), padding='same'),
              input_shape=(2, 4, 4, 3)))
      model.add(keras.layers.Activation('relu'))
      model.compile(optimizer='rmsprop', loss='mse')
      model.train_on_batch(
          np.random.random((1, 2, 4, 4, 3)), np.random.random((1, 2, 4, 4, 5)))

      model = keras.models.model_from_json(model.to_json())
      model.summary()

  def test_timedistributed_stacked(self):
    """Two stacked TimeDistributed layers compose and train correctly."""
    # test stacked layers
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2), input_shape=(3, 4)))
      model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
      model.add(keras.layers.Activation('relu'))
      model.compile(optimizer='rmsprop', loss='mse')

      model.fit(
          np.random.random((10, 3, 4)),
          np.random.random((10, 3, 3)),
          epochs=1,
          batch_size=10)

  def test_regularizers(self):
    """Regularization losses of the wrapped layer surface on the model."""
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2, kernel_regularizer='l1'),
              input_shape=(3, 4)))
      model.add(keras.layers.Activation('relu'))
      model.compile(optimizer='rmsprop', loss='mse')
      # The single l1 kernel regularizer should contribute exactly one loss.
      self.assertEqual(len(model.losses), 1)

  def test_TimeDistributed_learning_phase(self):
    """training=True is forwarded to the wrapped layer (Dropout here)."""
    with self.test_session():
      # test layers that need learning_phase to be set
      np.random.seed(1234)
      x = keras.layers.Input(shape=(3, 2))
      y = keras.layers.TimeDistributed(
          keras.layers.Dropout(.999))(x, training=True)
      model = keras.models.Model(x, y)
      y = model.predict(np.random.random((10, 3, 2)))
      # With dropout rate .999 and training forced on, nearly every unit is
      # zeroed, so the mean of the output should be close to 0.
      self.assertAllClose(np.mean(y), 0., atol=1e-1, rtol=1e-1)

  def test_TimeDistributed_batchnorm(self):
    """Moving-statistics updates of a wrapped BatchNormalization still run."""
    with self.test_session():
      # test that wrapped BN updates still work.
      model = keras.models.Sequential()
      model.add(keras.layers.TimeDistributed(
          keras.layers.BatchNormalization(center=True, scale=True),
          name='bn',
          input_shape=(10, 2)))
      model.compile(optimizer='rmsprop', loss='mse')
      # Assert that mean and variance are 0 and 1.
      # Weight order for BN here: [gamma, beta, moving_mean, moving_variance].
      td = model.layers[0]
      self.assertAllClose(td.get_weights()[2], np.array([0, 0]))
      assert np.array_equal(td.get_weights()[3], np.array([1, 1]))
      # Train
      model.train_on_batch(np.random.normal(loc=2, scale=2, size=(1, 10, 2)),
                           np.broadcast_to(np.array([0, 1]), (1, 10, 2)))
      # Assert that mean and variance changed.
      assert not np.array_equal(td.get_weights()[2], np.array([0, 0]))
      assert not np.array_equal(td.get_weights()[3], np.array([1, 1]))
      # Verify input_map has one mapping from inputs to reshaped inputs.
      self.assertEqual(len(td._input_map.keys()), 1)

  def test_TimeDistributed_trainable(self):
    """Toggling `trainable` hides/restores updates and trainable weights."""
    # test layers that need learning_phase to be set
    x = keras.layers.Input(shape=(3, 2))
    layer = keras.layers.TimeDistributed(keras.layers.BatchNormalization())
    _ = layer(x)
    # BN contributes 2 moving-statistics updates and 2 trainable weights
    # (gamma, beta); both must vanish while trainable is False.
    assert len(layer.updates) == 2
    assert len(layer.trainable_weights) == 2
    layer.trainable = False
    assert not layer.updates
    assert not layer.trainable_weights
    layer.trainable = True
    assert len(layer.updates) == 2
    assert len(layer.trainable_weights) == 2


class BidirectionalTest(test.TestCase):
  """Tests for keras.layers.Bidirectional across merge modes and RNN types."""

  def test_bidirectional(self):
    """Basic training, output-shape inference and (de)serialization for
    every merge mode."""
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    with self.test_session():
      for mode in ['sum', 'concat', 'ave', 'mul']:
        x = np.random.random((samples, timesteps, dim))
        # 'concat' doubles the feature dimension; the other modes keep it.
        target_dim = 2 * output_dim if mode == 'concat' else output_dim
        y = np.random.random((samples, target_dim))

        # test with Sequential model
        model = keras.models.Sequential()
        model.add(
            keras.layers.Bidirectional(
                rnn(output_dim), merge_mode=mode,
                input_shape=(timesteps, dim)))
        model.compile(loss='mse', optimizer='sgd')
        model.fit(x, y, epochs=1, batch_size=1)

        # test compute output shape
        ref_shape = model.layers[-1].output.get_shape()
        shape = model.layers[-1].compute_output_shape(
            (None, timesteps, dim))
        self.assertListEqual(shape.as_list(), ref_shape.as_list())

        # test config
        model.get_config()
        model = keras.models.model_from_json(model.to_json())
        model.summary()

  def test_bidirectional_weight_loading(self):
    """get_weights/set_weights round-trip leaves predictions unchanged."""
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    with self.test_session():
      x = np.random.random((samples, timesteps, dim))
      model = keras.models.Sequential()
      model.add(
          keras.layers.Bidirectional(
              rnn(output_dim), input_shape=(timesteps, dim)))
      y_ref = model.predict(x)
      weights = model.layers[-1].get_weights()
      model.layers[-1].set_weights(weights)
      y = model.predict(x)
      self.assertAllClose(y, y_ref)

  def test_bidirectional_stacked(self):
    """Stacked Bidirectional layers work in Sequential and functional APIs."""
    # test stacked bidirectional layers
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    mode = 'sum'

    with self.test_session():
      x = np.random.random((samples, timesteps, dim))
      target_dim = 2 * output_dim if mode == 'concat' else output_dim
      y = np.random.random((samples, target_dim))

      model = keras.models.Sequential()
      model.add(
          keras.layers.Bidirectional(
              rnn(output_dim, return_sequences=True),
              merge_mode=mode,
              input_shape=(timesteps, dim)))
      model.add(keras.layers.Bidirectional(rnn(output_dim), merge_mode=mode))
      model.compile(loss='mse', optimizer='sgd')
      model.fit(x, y, epochs=1, batch_size=1)

      # test with functional API
      inputs = keras.layers.Input((timesteps, dim))
      output = keras.layers.Bidirectional(
          rnn(output_dim), merge_mode=mode)(inputs)
      model = keras.models.Model(inputs, output)
      model.compile(loss='mse', optimizer='sgd')
      model.fit(x, y, epochs=1, batch_size=1)

  def test_bidirectional_statefulness(self):
    """A stateful wrapped RNN trains (requires a fixed batch_shape)."""
    # Bidirectional and stateful
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    mode = 'sum'

    with self.test_session():
      x = np.random.random((samples, timesteps, dim))
      target_dim = 2 * output_dim if mode == 'concat' else output_dim
      y = np.random.random((samples, target_dim))

      inputs = keras.layers.Input(batch_shape=(1, timesteps, dim))
      output = keras.layers.Bidirectional(
          rnn(output_dim, stateful=True), merge_mode=mode)(inputs)
      model = keras.models.Model(inputs, output)
      model.compile(loss='mse', optimizer='sgd')
      model.fit(x, y, epochs=1, batch_size=1)

  def test_Bidirectional_merged_value(self):
    """The merged output equals merge_func(forward, reversed backward) for
    every merge mode, with and without return_state."""
    rnn = keras.layers.LSTM
    samples = 2
    dim = 5
    timesteps = 3
    units = 3
    x = [np.random.rand(samples, timesteps, dim)]

    with self.test_session():
      for merge_mode in ['sum', 'mul', 'ave', 'concat', None]:
        # Reference implementation of each merge mode in numpy terms.
        if merge_mode == 'sum':
          merge_func = lambda y, y_rev: y + y_rev
        elif merge_mode == 'mul':
          merge_func = lambda y, y_rev: y * y_rev
        elif merge_mode == 'ave':
          merge_func = lambda y, y_rev: (y + y_rev) / 2
        elif merge_mode == 'concat':
          merge_func = lambda y, y_rev: np.concatenate((y, y_rev), axis=-1)
        else:
          merge_func = lambda y, y_rev: [y, y_rev]

        # basic case
        inputs = keras.Input((timesteps, dim))
        layer = keras.layers.Bidirectional(
            rnn(units, return_sequences=True), merge_mode=merge_mode)
        f_merged = keras.backend.function([inputs], _to_list(layer(inputs)))
        f_forward = keras.backend.function([inputs],
                                           [layer.forward_layer.call(inputs)])
        # The backward layer's output is time-reversed before merging, so
        # reverse it along axis 1 to compare against the merged output.
        f_backward = keras.backend.function(
            [inputs],
            [keras.backend.reverse(layer.backward_layer.call(inputs), 1)])

        y_merged = f_merged(x)
        y_expected = _to_list(merge_func(f_forward(x)[0], f_backward(x)[0]))
        assert len(y_merged) == len(y_expected)
        for x1, x2 in zip(y_merged, y_expected):
          self.assertAllClose(x1, x2, atol=1e-5)

        # test return_state
        inputs = keras.Input((timesteps, dim))
        layer = keras.layers.Bidirectional(
            rnn(units, return_state=True), merge_mode=merge_mode)
        f_merged = keras.backend.function([inputs], layer(inputs))
        f_forward = keras.backend.function([inputs],
                                           layer.forward_layer.call(inputs))
        f_backward = keras.backend.function([inputs],
                                            layer.backward_layer.call(inputs))
        n_states = len(layer.layer.states)

        y_merged = f_merged(x)
        y_forward = f_forward(x)
        y_backward = f_backward(x)
        y_expected = _to_list(merge_func(y_forward[0], y_backward[0]))
        # Merged output carries the states of both directions appended.
        assert len(y_merged) == len(y_expected) + n_states * 2
        for x1, x2 in zip(y_merged, y_expected):
          self.assertAllClose(x1, x2, atol=1e-5)

        # The trailing tensors are the forward states then backward states.
        y_merged = y_merged[-n_states * 2:]
        y_forward = y_forward[-n_states:]
        y_backward = y_backward[-n_states:]
        for state_birnn, state_inner in zip(y_merged, y_forward + y_backward):
          self.assertAllClose(state_birnn, state_inner, atol=1e-5)

  def test_Bidirectional_dropout(self):
    """Learning-phase flags propagate correctly through dropout RNNs, and
    inference predictions are deterministic."""
    rnn = keras.layers.LSTM
    samples = 2
    dim = 5
    timesteps = 3
    units = 3
    merge_mode = 'sum'
    x = [np.random.rand(samples, timesteps, dim)]

    with self.test_session():
      inputs = keras.Input((timesteps, dim))
      wrapped = keras.layers.Bidirectional(
          rnn(units, dropout=0.2, recurrent_dropout=0.2), merge_mode=merge_mode)
      # With training explicitly forced, outputs do not depend on the
      # dynamic learning phase.
      outputs = _to_list(wrapped(inputs, training=True))
      assert all(not getattr(x, '_uses_learning_phase') for x in outputs)

      inputs = keras.Input((timesteps, dim))
      wrapped = keras.layers.Bidirectional(
          rnn(units, dropout=0.2, return_state=True), merge_mode=merge_mode)
      # Without an explicit training flag, all outputs (including states)
      # must be marked as learning-phase dependent.
      outputs = _to_list(wrapped(inputs))
      assert all(x._uses_learning_phase for x in outputs)

      model = keras.Model(inputs, outputs)
      assert model.uses_learning_phase
      # Dropout is inactive at inference, so repeated predicts must match.
      y1 = _to_list(model.predict(x))
      y2 = _to_list(model.predict(x))
      for x1, x2 in zip(y1, y2):
        self.assertAllClose(x1, x2, atol=1e-5)

  def test_Bidirectional_state_reuse(self):
    """States returned by one Bidirectional layer can seed another; a bare
    tensor (instead of a list) as initial_state is rejected."""
    rnn = keras.layers.LSTM
    samples = 2
    dim = 5
    timesteps = 3
    units = 3

    with self.test_session():
      input1 = keras.layers.Input((timesteps, dim))
      layer = keras.layers.Bidirectional(
          rnn(units, return_state=True, return_sequences=True))
      # Element 0 is the sequence output; the rest are the RNN states.
      state = layer(input1)[1:]

      # test passing invalid initial_state: passing a tensor
      input2 = keras.layers.Input((timesteps, dim))
      with self.assertRaises(ValueError):
        output = keras.layers.Bidirectional(
            rnn(units))(input2, initial_state=state[0])

      # test valid usage: passing a list
      output = keras.layers.Bidirectional(rnn(units))(input2,
                                                      initial_state=state)
      model = keras.models.Model([input1, input2], output)
      assert len(model.layers) == 4
      assert isinstance(model.layers[-1].input, list)
      inputs = [np.random.rand(samples, timesteps, dim),
                np.random.rand(samples, timesteps, dim)]
      model.predict(inputs)

  def test_Bidirectional_trainable(self):
    """Toggling `trainable` hides/restores all six SimpleRNN weights
    (kernel, recurrent kernel, bias — per direction)."""
    # test layers that need learning_phase to be set
    with self.test_session():
      x = keras.layers.Input(shape=(3, 2))
      layer = keras.layers.Bidirectional(keras.layers.SimpleRNN(3))
      _ = layer(x)
      assert len(layer.trainable_weights) == 6
      layer.trainable = False
      assert not layer.trainable_weights
      layer.trainable = True
      assert len(layer.trainable_weights) == 6


def _to_list(ls):
  """Return `ls` unchanged if it is a list, else wrap it in a one-item list."""
  if isinstance(ls, list):
    return ls
  else:
    return [ls]


if __name__ == '__main__':
  test.main()