# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for layer wrappers."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import numpy as np

from tensorflow.python.keras._impl import keras
from tensorflow.python.platform import test


class TimeDistributedTest(test.TestCase):

  def test_timedistributed_dense(self):
    # Test a Dense layer wrapped in TimeDistributed.
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2), input_shape=(3, 4)))
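      # Dense(2) is applied to each of the 3 timesteps independently,
      # mapping inputs of shape (10, 3, 4) to outputs of shape (10, 3, 2).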
      model.compile(optimizer='rmsprop', loss='mse')
      model.fit(
          np.random.random((10, 3, 4)),
          np.random.random((10, 3, 2)),
          epochs=1,
          batch_size=10)

      # test config
      model.get_config()

  def test_timedistributed_static_batch_size(self):
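    # Passing batch_size fixes the static batch dimension, so the wrapper
    # builds against a fully defined input shape of (10, 3, 4).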
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2), input_shape=(3, 4), batch_size=10))
      model.compile(optimizer='rmsprop', loss='mse')
      model.fit(
          np.random.random((10, 3, 4)),
          np.random.random((10, 3, 2)),
          epochs=1,
          batch_size=10)

  def test_timedistributed_conv2d(self):
    # test with Conv2D
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Conv2D(5, (2, 2), padding='same'),
              input_shape=(2, 4, 4, 3)))
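      # 'same' padding preserves the spatial dimensions, so each of the two
      # timesteps maps (4, 4, 3) -> (4, 4, 5).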
      model.add(keras.layers.Activation('relu'))
      model.compile(optimizer='rmsprop', loss='mse')
      model.train_on_batch(
          np.random.random((1, 2, 4, 4, 3)), np.random.random((1, 2, 4, 4, 5)))

      model = keras.models.model_from_json(model.to_json())
      model.summary()

  def test_timedistributed_stacked(self):
    # test stacked layers
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2), input_shape=(3, 4)))
      model.add(keras.layers.TimeDistributed(keras.layers.Dense(3)))
      model.add(keras.layers.Activation('relu'))
      model.compile(optimizer='rmsprop', loss='mse')

      model.fit(
          np.random.random((10, 3, 4)),
          np.random.random((10, 3, 3)),
          epochs=1,
          batch_size=10)

  def test_regularizers(self):
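    # The wrapped Dense layer's L1 kernel regularizer should surface as
    # exactly one entry in model.losses.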
    with self.test_session():
      model = keras.models.Sequential()
      model.add(
          keras.layers.TimeDistributed(
              keras.layers.Dense(2, kernel_regularizer='l1'),
              input_shape=(3, 4)))
      model.add(keras.layers.Activation('relu'))
      model.compile(optimizer='rmsprop', loss='mse')
      self.assertEqual(len(model.losses), 1)

  def test_TimeDistributed_learning_phase(self):
    with self.test_session():
      # test layers that need learning_phase to be set
      np.random.seed(1234)
      x = keras.layers.Input(shape=(3, 2))
      y = keras.layers.TimeDistributed(
          keras.layers.Dropout(.999))(x, training=True)
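      # With training=True, a dropout rate of .999 zeroes out almost every
      # activation, so the mean output should be close to 0.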
      model = keras.models.Model(x, y)
      y = model.predict(np.random.random((10, 3, 2)))
      self.assertAllClose(np.mean(y), 0., atol=1e-1, rtol=1e-1)

  def test_TimeDistributed_batchnorm(self):
    with self.test_session():
      # test that wrapped BN updates still work.
      model = keras.models.Sequential()
      model.add(keras.layers.TimeDistributed(
          keras.layers.BatchNormalization(center=True, scale=True),
          name='bn',
          input_shape=(10, 2)))
      model.compile(optimizer='rmsprop', loss='mse')
      # For BatchNormalization(center=True, scale=True), get_weights() returns
      # [gamma, beta, moving_mean, moving_variance]; the moving statistics
      # should be initialized to 0 and 1.
      td = model.layers[0]
      self.assertAllClose(td.get_weights()[2], np.array([0, 0]))
      assert np.array_equal(td.get_weights()[3], np.array([1, 1]))
      # Train
      model.train_on_batch(np.random.normal(loc=2, scale=2, size=(1, 10, 2)),
                           np.broadcast_to(np.array([0, 1]), (1, 10, 2)))
      # Assert that mean and variance changed.
      assert not np.array_equal(td.get_weights()[2], np.array([0, 0]))
      assert not np.array_equal(td.get_weights()[3], np.array([1, 1]))
      # Verify input_map has one mapping from inputs to reshaped inputs.
      self.assertEqual(len(td._input_map.keys()), 1)

  def test_TimeDistributed_trainable(self):
    # test toggling the wrapper's `trainable` attribute; BatchNormalization
    # contributes two update ops and two trainable weights (gamma and beta).
    x = keras.layers.Input(shape=(3, 2))
    layer = keras.layers.TimeDistributed(keras.layers.BatchNormalization())
    _ = layer(x)
    assert len(layer.updates) == 2
    assert len(layer.trainable_weights) == 2
    layer.trainable = False
    assert not layer.updates
    assert not layer.trainable_weights
    layer.trainable = True
    assert len(layer.updates) == 2
    assert len(layer.trainable_weights) == 2


class BidirectionalTest(test.TestCase):

  def test_bidirectional(self):
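    # Exercises every merge mode; 'concat' doubles the feature dimension,
    # while the elementwise modes ('sum', 'ave', 'mul') keep it unchanged.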
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    with self.test_session():
      for mode in ['sum', 'concat', 'ave', 'mul']:
        x = np.random.random((samples, timesteps, dim))
        target_dim = 2 * output_dim if mode == 'concat' else output_dim
        y = np.random.random((samples, target_dim))

        # test with Sequential model
        model = keras.models.Sequential()
        model.add(
            keras.layers.Bidirectional(
                rnn(output_dim), merge_mode=mode, input_shape=(timesteps, dim)))
        model.compile(loss='mse', optimizer='sgd')
        model.fit(x, y, epochs=1, batch_size=1)

        # test compute output shape
        ref_shape = model.layers[-1].output.get_shape()
        shape = model.layers[-1].compute_output_shape(
            (None, timesteps, dim))
        self.assertListEqual(shape.as_list(), ref_shape.as_list())

        # test config
        model.get_config()
        model = keras.models.model_from_json(model.to_json())
        model.summary()

  def test_bidirectional_weight_loading(self):
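    # Round-tripping weights through get_weights()/set_weights() should
    # leave the layer's predictions unchanged.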
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    with self.test_session():
      x = np.random.random((samples, timesteps, dim))
      model = keras.models.Sequential()
      model.add(
          keras.layers.Bidirectional(
              rnn(output_dim), input_shape=(timesteps, dim)))
      y_ref = model.predict(x)
      weights = model.layers[-1].get_weights()
      model.layers[-1].set_weights(weights)
      y = model.predict(x)
      self.assertAllClose(y, y_ref)

  def test_bidirectional_stacked(self):
    # test stacked bidirectional layers
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    mode = 'sum'

    with self.test_session():
      x = np.random.random((samples, timesteps, dim))
      target_dim = 2 * output_dim if mode == 'concat' else output_dim
      y = np.random.random((samples, target_dim))

      model = keras.models.Sequential()
      model.add(
          keras.layers.Bidirectional(
              rnn(output_dim, return_sequences=True),
              merge_mode=mode,
              input_shape=(timesteps, dim)))
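      # return_sequences=True keeps the time dimension, so the second
      # Bidirectional layer receives the 3D input it expects.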
      model.add(keras.layers.Bidirectional(rnn(output_dim), merge_mode=mode))
      model.compile(loss='mse', optimizer='sgd')
      model.fit(x, y, epochs=1, batch_size=1)

      # test with functional API
      inputs = keras.layers.Input((timesteps, dim))
      output = keras.layers.Bidirectional(
          rnn(output_dim), merge_mode=mode)(inputs)
      model = keras.models.Model(inputs, output)
      model.compile(loss='mse', optimizer='sgd')
      model.fit(x, y, epochs=1, batch_size=1)

  def test_bidirectional_statefulness(self):
    # Bidirectional and stateful
    rnn = keras.layers.SimpleRNN
    samples = 2
    dim = 2
    timesteps = 2
    output_dim = 2
    mode = 'sum'

    with self.test_session():
      x = np.random.random((samples, timesteps, dim))
      target_dim = 2 * output_dim if mode == 'concat' else output_dim
      y = np.random.random((samples, target_dim))

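      # Stateful RNNs carry state across batches, so the input needs a fully
      # defined batch_shape rather than just a feature shape.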
      inputs = keras.layers.Input(batch_shape=(1, timesteps, dim))
      output = keras.layers.Bidirectional(
          rnn(output_dim, stateful=True), merge_mode=mode)(inputs)
      model = keras.models.Model(inputs, output)
      model.compile(loss='mse', optimizer='sgd')
      model.fit(x, y, epochs=1, batch_size=1)

  def test_Bidirectional_merged_value(self):
    rnn = keras.layers.LSTM
    samples = 2
    dim = 5
    timesteps = 3
    units = 3
    x = [np.random.rand(samples, timesteps, dim)]

    with self.test_session():
      for merge_mode in ['sum', 'mul', 'ave', 'concat', None]:
        if merge_mode == 'sum':
          merge_func = lambda y, y_rev: y + y_rev
        elif merge_mode == 'mul':
          merge_func = lambda y, y_rev: y * y_rev
        elif merge_mode == 'ave':
          merge_func = lambda y, y_rev: (y + y_rev) / 2
        elif merge_mode == 'concat':
          merge_func = lambda y, y_rev: np.concatenate((y, y_rev), axis=-1)
        else:
          merge_func = lambda y, y_rev: [y, y_rev]
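        # Each merge_func replicates in NumPy how Bidirectional combines the
        # forward output with the time-reversed backward output.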

        # basic case
        inputs = keras.Input((timesteps, dim))
        layer = keras.layers.Bidirectional(
            rnn(units, return_sequences=True), merge_mode=merge_mode)
        f_merged = keras.backend.function([inputs], _to_list(layer(inputs)))
        f_forward = keras.backend.function([inputs],
                                           [layer.forward_layer.call(inputs)])
        f_backward = keras.backend.function(
            [inputs],
            [keras.backend.reverse(layer.backward_layer.call(inputs), 1)])

        y_merged = f_merged(x)
        y_expected = _to_list(merge_func(f_forward(x)[0], f_backward(x)[0]))
        assert len(y_merged) == len(y_expected)
        for x1, x2 in zip(y_merged, y_expected):
          self.assertAllClose(x1, x2, atol=1e-5)

        # test return_state
        inputs = keras.Input((timesteps, dim))
        layer = keras.layers.Bidirectional(
            rnn(units, return_state=True), merge_mode=merge_mode)
        f_merged = keras.backend.function([inputs], layer(inputs))
        f_forward = keras.backend.function([inputs],
                                           layer.forward_layer.call(inputs))
        f_backward = keras.backend.function([inputs],
                                            layer.backward_layer.call(inputs))
        n_states = len(layer.layer.states)
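        # For LSTM, each direction carries two states (h and c), so the
        # merged outputs are followed by n_states * 2 state tensors.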

        y_merged = f_merged(x)
        y_forward = f_forward(x)
        y_backward = f_backward(x)
        y_expected = _to_list(merge_func(y_forward[0], y_backward[0]))
        assert len(y_merged) == len(y_expected) + n_states * 2
        for x1, x2 in zip(y_merged, y_expected):
          self.assertAllClose(x1, x2, atol=1e-5)

        y_merged = y_merged[-n_states * 2:]
        y_forward = y_forward[-n_states:]
        y_backward = y_backward[-n_states:]
        for state_birnn, state_inner in zip(y_merged, y_forward + y_backward):
          self.assertAllClose(state_birnn, state_inner, atol=1e-5)

  def test_Bidirectional_dropout(self):
    rnn = keras.layers.LSTM
    samples = 2
    dim = 5
    timesteps = 3
    units = 3
    merge_mode = 'sum'
    x = [np.random.rand(samples, timesteps, dim)]

    with self.test_session():
      inputs = keras.Input((timesteps, dim))
      wrapped = keras.layers.Bidirectional(
          rnn(units, dropout=0.2, recurrent_dropout=0.2), merge_mode=merge_mode)
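      # Because training is passed explicitly, the outputs should not be
      # marked as depending on the Keras learning phase.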
      outputs = _to_list(wrapped(inputs, training=True))
      assert all(not getattr(x, '_uses_learning_phase') for x in outputs)

      inputs = keras.Input((timesteps, dim))
      wrapped = keras.layers.Bidirectional(
          rnn(units, dropout=0.2, return_state=True), merge_mode=merge_mode)
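      # Without an explicit training argument, dropout makes the outputs
      # depend on the learning phase.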
      outputs = _to_list(wrapped(inputs))
      assert all(x._uses_learning_phase for x in outputs)

      model = keras.Model(inputs, outputs)
      assert model.uses_learning_phase
      y1 = _to_list(model.predict(x))
      y2 = _to_list(model.predict(x))
      for x1, x2 in zip(y1, y2):
        self.assertAllClose(x1, x2, atol=1e-5)

  def test_Bidirectional_state_reuse(self):
    rnn = keras.layers.LSTM
    samples = 2
    dim = 5
    timesteps = 3
    units = 3

    with self.test_session():
      input1 = keras.layers.Input((timesteps, dim))
      layer = keras.layers.Bidirectional(
          rnn(units, return_state=True, return_sequences=True))
      state = layer(input1)[1:]
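      # layer(input1) returns the merged sequence output followed by the
      # forward and backward layer states; slicing off the first element
      # keeps only the state tensors.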

      # test invalid usage: passing a single tensor as initial_state
      input2 = keras.layers.Input((timesteps, dim))
      with self.assertRaises(ValueError):
        output = keras.layers.Bidirectional(
            rnn(units))(input2, initial_state=state[0])

      # test valid usage: passing a list of state tensors
      output = keras.layers.Bidirectional(rnn(units))(input2,
                                                      initial_state=state)
      model = keras.models.Model([input1, input2], output)
      assert len(model.layers) == 4
      assert isinstance(model.layers[-1].input, list)
      inputs = [np.random.rand(samples, timesteps, dim),
                np.random.rand(samples, timesteps, dim)]
      model.predict(inputs)

  def test_Bidirectional_trainable(self):
    # test toggling the wrapper's `trainable` attribute; each SimpleRNN
    # direction contributes a kernel, a recurrent kernel, and a bias,
    # giving six trainable weights in total.
    with self.test_session():
      x = keras.layers.Input(shape=(3, 2))
      layer = keras.layers.Bidirectional(keras.layers.SimpleRNN(3))
      _ = layer(x)
      assert len(layer.trainable_weights) == 6
      layer.trainable = False
      assert not layer.trainable_weights
      layer.trainable = True
      assert len(layer.trainable_weights) == 6


def _to_list(ls):
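  """Wraps `ls` in a list if it is not already one."""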
  if isinstance(ls, list):
    return ls
  else:
    return [ls]


if __name__ == '__main__':
  test.main()