# Code-viewer navigation header removed (was not part of the original file).
      1 # Copyright 2017 The TensorFlow Authors. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 # ==============================================================================
     15 """Tests for the time series input pipeline."""
     16 
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import csv
import os
import tempfile

import numpy

from tensorflow.contrib.timeseries.python.timeseries import input_pipeline
from tensorflow.contrib.timeseries.python.timeseries import test_utils
from tensorflow.contrib.timeseries.python.timeseries.feature_keys import TrainEvalFeatures

from tensorflow.core.example import example_pb2
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors
from tensorflow.python.lib.io import tf_record
from tensorflow.python.ops import parsing_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.training import coordinator as coordinator_lib
from tensorflow.python.training import queue_runner_impl
     39 
     40 
     41 def _make_csv_temp_file(to_write, test_tmpdir):
     42   _, data_file = tempfile.mkstemp(dir=test_tmpdir)
     43   with open(data_file, "w") as f:
     44     csvwriter = csv.writer(f)
     45     for record in to_write:
     46       csvwriter.writerow(record)
     47   return data_file
     48 
     49 
     50 def _make_csv_time_series(num_features, num_samples, test_tmpdir):
     51   filename = _make_csv_temp_file(
     52       [[i] + [float(i) * 2. + feature_number
     53               for feature_number in range(num_features)]
     54        for i in range(num_samples)],
     55       test_tmpdir=test_tmpdir)
     56   return filename
     57 
     58 
     59 def _make_tfexample_series(num_features, num_samples, test_tmpdir):
     60   _, data_file = tempfile.mkstemp(dir=test_tmpdir)
     61   with tf_record.TFRecordWriter(data_file) as writer:
     62     for i in range(num_samples):
     63       example = example_pb2.Example()
     64       times = example.features.feature[TrainEvalFeatures.TIMES]
     65       times.int64_list.value.append(i)
     66       values = example.features.feature[TrainEvalFeatures.VALUES]
     67       values.float_list.value.extend(
     68           [float(i) * 2. + feature_number
     69            for feature_number in range(num_features)])
     70       writer.write(example.SerializeToString())
     71   return data_file
     72 
     73 
     74 def _make_numpy_time_series(num_features, num_samples):
     75   times = numpy.arange(num_samples)
     76   values = times[:, None] * 2. + numpy.arange(num_features)[None, :]
     77   return {TrainEvalFeatures.TIMES: times,
     78           TrainEvalFeatures.VALUES: values}
     79 
     80 
class RandomWindowInputFnTests(test.TestCase):
  """Tests for input_pipeline.RandomWindowInputFn over CSV, TFExample and
  numpy readers."""

  def _random_window_input_fn_test_template(
      self, time_series_reader, window_size, batch_size, num_features,
      discard_out_of_order=False):
    """Runs one batch through RandomWindowInputFn and checks its structure.

    Starts queue runners, evaluates a single batch of windows, and verifies:
    shapes of times/values, that times within each window are contiguous,
    and that values match the generator pattern value = 2 * time + feature.

    NOTE(review): `discard_out_of_order` is accepted and passed by several
    callers but never used inside this method -- confirm whether it should
    be forwarded to the reader or input_fn.

    Args:
      time_series_reader: Reader wrapping data from the `_make_*` helpers.
      window_size: Length of each contiguous window of time steps.
      batch_size: Number of windows per batch.
      num_features: Expected size of the values feature dimension.
      discard_out_of_order: Unused (see note above).

    Returns:
      The evaluated feature dictionary for the single batch.
    """
    input_fn = input_pipeline.RandomWindowInputFn(
        time_series_reader=time_series_reader,
        window_size=window_size, batch_size=batch_size)
    result, _ = input_fn()
    init_op = variables.local_variables_initializer()
    with self.test_session() as session:
      coordinator = coordinator_lib.Coordinator()
      queue_runner_impl.start_queue_runners(session, coord=coordinator)
      session.run(init_op)
      features = session.run(result)
      # Stop queue-runner threads before leaving the session.
      coordinator.request_stop()
      coordinator.join()
    self.assertAllEqual([batch_size, window_size],
                        features[TrainEvalFeatures.TIMES].shape)
    for window_position in range(window_size - 1):
      for batch_position in range(batch_size):
        # Checks that all times are contiguous
        self.assertEqual(
            features[TrainEvalFeatures.TIMES][batch_position,
                                              window_position + 1],
            features[TrainEvalFeatures.TIMES][batch_position,
                                              window_position] + 1)
    self.assertAllEqual([batch_size, window_size, num_features],
                        features[TrainEvalFeatures.VALUES].shape)
    # String-vs-dtype comparison relies on numpy's dtype equality.
    self.assertEqual("int64", features[TrainEvalFeatures.TIMES].dtype)
    for feature_number in range(num_features):
      # Values were generated as 2 * time + feature_number by the helpers.
      self.assertAllEqual(
          features[TrainEvalFeatures.TIMES] * 2. + feature_number,
          features[TrainEvalFeatures.VALUES][:, :, feature_number])
    return features

  def _test_out_of_order(self, time_series_reader, discard_out_of_order):
    # Small fixed window/batch configuration shared by the *_out_of_order
    # tests below.
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1, window_size=2, batch_size=5,
        discard_out_of_order=discard_out_of_order)

  def test_csv_sort_out_of_order(self):
    filename = _make_csv_time_series(num_features=1, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    self._test_out_of_order(time_series_reader, discard_out_of_order=False)

  def test_tfexample_sort_out_of_order(self):
    filename = _make_tfexample_series(
        num_features=1, num_samples=50,
        test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.TFExampleReader(
        [filename],
        features={
            TrainEvalFeatures.TIMES: parsing_ops.FixedLenFeature(
                shape=[], dtype=dtypes.int64),
            TrainEvalFeatures.VALUES: parsing_ops.FixedLenFeature(
                shape=[1], dtype=dtypes.float32)})
    self._test_out_of_order(time_series_reader, discard_out_of_order=False)

  def test_numpy_sort_out_of_order(self):
    data = _make_numpy_time_series(num_features=1, num_samples=50)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._test_out_of_order(time_series_reader, discard_out_of_order=False)

  def test_csv_discard_out_of_order(self):
    filename = _make_csv_time_series(num_features=1, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    self._test_out_of_order(time_series_reader, discard_out_of_order=True)

  def test_csv_discard_out_of_order_window_equal(self):
    # window_size == num_samples: exactly one full window is possible.
    filename = _make_csv_time_series(num_features=1, num_samples=3,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1, window_size=3, batch_size=5,
        discard_out_of_order=True)

  def test_csv_discard_out_of_order_window_too_large(self):
    # window_size > num_samples: no window can be formed, surfacing as
    # OutOfRangeError from the input pipeline.
    filename = _make_csv_time_series(num_features=1, num_samples=2,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    with self.assertRaises(errors.OutOfRangeError):
      self._random_window_input_fn_test_template(
          time_series_reader=time_series_reader,
          num_features=1, window_size=3, batch_size=5,
          discard_out_of_order=True)

  def test_csv_no_data(self):
    # An empty CSV file should raise OutOfRangeError rather than hang.
    filename = _make_csv_time_series(num_features=1, num_samples=0,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    with self.assertRaises(errors.OutOfRangeError):
      self._test_out_of_order(time_series_reader, discard_out_of_order=True)

  def test_numpy_discard_out_of_order(self):
    data = _make_numpy_time_series(num_features=1, num_samples=50)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._test_out_of_order(time_series_reader, discard_out_of_order=True)

  def test_numpy_discard_out_of_order_window_equal(self):
    # window_size == num_samples: exactly one full window is possible.
    data = _make_numpy_time_series(num_features=1, num_samples=3)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1, window_size=3, batch_size=5,
        discard_out_of_order=True)

  def test_numpy_discard_out_of_order_window_too_large(self):
    # Unlike the CSV case above, an oversized window raises ValueError here
    # (NumpyReader; see the expected message below).
    data = _make_numpy_time_series(num_features=1, num_samples=2)
    time_series_reader = input_pipeline.NumpyReader(data)
    with self.assertRaisesRegexp(ValueError, "only 2 records were available"):
      self._random_window_input_fn_test_template(
          time_series_reader=time_series_reader,
          num_features=1, window_size=3, batch_size=5,
          discard_out_of_order=True)

  def _test_multivariate(self, time_series_reader, num_features):
    # Shared driver for the multivariate reader tests below.
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=num_features,
        window_size=2,
        batch_size=5)

  def test_csv_multivariate(self):
    filename = _make_csv_time_series(num_features=2, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    # Repeating TrainEvalFeatures.VALUES presumably maps both value columns
    # onto one stacked values feature -- confirm against CSVReader.
    time_series_reader = input_pipeline.CSVReader(
        [filename],
        column_names=(TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES,
                      TrainEvalFeatures.VALUES))
    self._test_multivariate(time_series_reader=time_series_reader,
                            num_features=2)

  def test_tfexample_multivariate(self):
    filename = _make_tfexample_series(
        num_features=2, num_samples=50,
        test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.TFExampleReader(
        [filename],
        features={
            TrainEvalFeatures.TIMES: parsing_ops.FixedLenFeature(
                shape=[], dtype=dtypes.int64),
            TrainEvalFeatures.VALUES: parsing_ops.FixedLenFeature(
                shape=[2], dtype=dtypes.float32)})
    self._test_multivariate(time_series_reader=time_series_reader,
                            num_features=2)

  def test_numpy_multivariate(self):
    data = _make_numpy_time_series(num_features=3, num_samples=50)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._test_multivariate(time_series_reader, num_features=3)

  def test_numpy_withbatch(self):
    # Adds a leading batch dimension of size one to every feature array.
    data_nobatch = _make_numpy_time_series(num_features=4, num_samples=100)
    data = {feature_name: feature_value[None]
            for feature_name, feature_value in data_nobatch.items()}
    time_series_reader = input_pipeline.NumpyReader(data)
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=4,
        window_size=3,
        batch_size=5)

  def test_numpy_nobatch_nofeatures(self):
    # Drops the trailing feature dimension; the template still expects a
    # [batch, window, 1] values shape on output.
    data = _make_numpy_time_series(num_features=1, num_samples=100)
    data[TrainEvalFeatures.VALUES] = data[TrainEvalFeatures.VALUES][:, 0]
    time_series_reader = input_pipeline.NumpyReader(data)
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1,
        window_size=16,
        batch_size=16)
    257 
    258 
class WholeDatasetInputFnTests(test.TestCase):
  """Tests for input_pipeline.WholeDatasetInputFn over CSV, TFExample and
  numpy readers."""

  def _whole_dataset_input_fn_test_template(
      self, time_series_reader, num_features, num_samples):
    """Reads the whole series in one evaluation and verifies its contents.

    Args:
      time_series_reader: Reader wrapping data from the `_make_*` helpers
        (values follow value = 2 * time + feature_number).
      num_features: Expected size of the values feature dimension.
      num_samples: Expected number of time steps in the series.
    """
    result, _ = input_pipeline.WholeDatasetInputFn(time_series_reader)()
    with self.test_session() as session:
      session.run(variables.local_variables_initializer())
      coordinator = coordinator_lib.Coordinator()
      queue_runner_impl.start_queue_runners(session, coord=coordinator)
      features = session.run(result)
      # Stop queue-runner threads before leaving the session.
      coordinator.request_stop()
      coordinator.join()
    self.assertEqual("int64", features[TrainEvalFeatures.TIMES].dtype)
    # The full series arrives as a single batch: times are exactly
    # 0..num_samples-1 behind a leading batch dimension of one.
    self.assertAllEqual(numpy.arange(num_samples, dtype=numpy.int64)[None, :],
                        features[TrainEvalFeatures.TIMES])
    for feature_number in range(num_features):
      self.assertAllEqual(
          features[TrainEvalFeatures.TIMES] * 2. + feature_number,
          features[TrainEvalFeatures.VALUES][:, :, feature_number])

  def test_csv(self):
    filename = _make_csv_time_series(num_features=3, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader(
        [filename],
        column_names=(TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES,
                      TrainEvalFeatures.VALUES, TrainEvalFeatures.VALUES))
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=3, num_samples=50)

  def test_csv_no_data(self):
    # An empty file should raise OutOfRangeError; num_samples=50 here is
    # never reached by the assertions because the error fires first.
    filename = _make_csv_time_series(num_features=1, num_samples=0,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    with self.assertRaises(errors.OutOfRangeError):
      self._whole_dataset_input_fn_test_template(
          time_series_reader=time_series_reader, num_features=1, num_samples=50)

  def test_tfexample(self):
    filename = _make_tfexample_series(
        num_features=4, num_samples=100,
        test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.TFExampleReader(
        [filename],
        features={
            TrainEvalFeatures.TIMES: parsing_ops.FixedLenFeature(
                shape=[], dtype=dtypes.int64),
            TrainEvalFeatures.VALUES: parsing_ops.FixedLenFeature(
                shape=[4], dtype=dtypes.float32)})
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=4, num_samples=100)

  def test_numpy(self):
    data = _make_numpy_time_series(num_features=4, num_samples=100)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=4, num_samples=100)

  def test_numpy_withbatch(self):
    # Adds a leading batch dimension of size one to every feature array.
    data_nobatch = _make_numpy_time_series(num_features=4, num_samples=100)
    data = {feature_name: feature_value[None]
            for feature_name, feature_value in data_nobatch.items()}
    time_series_reader = input_pipeline.NumpyReader(data)
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=4, num_samples=100)

  def test_numpy_nobatch_nofeatures(self):
    # Drops the trailing feature dimension; the template still expects a
    # [batch, time, 1] values shape on output.
    data = _make_numpy_time_series(num_features=1, num_samples=100)
    data[TrainEvalFeatures.VALUES] = data[TrainEvalFeatures.VALUES][:, 0]
    time_series_reader = input_pipeline.NumpyReader(data)
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=1, num_samples=100)
    331 
    332 
class AllWindowInputFnTests(test.TestCase):
  """Tests for test_utils.AllWindowInputFn, which yields every contiguous
  window of the series."""

  def _all_window_input_fn_test_template(
      self, time_series_reader, num_samples, window_size,
      original_numpy_features=None):
    """Runs AllWindowInputFn once and checks that all windows are produced.

    Args:
      time_series_reader: Reader to draw data from.
      num_samples: Total number of time steps in the series.
      window_size: Length of each window.
      original_numpy_features: If set, the raw numpy feature dict the reader
        was built from; enables exact per-window value comparisons.
    """
    input_fn = test_utils.AllWindowInputFn(
        time_series_reader=time_series_reader,
        window_size=window_size)
    features, _ = input_fn()
    init_op = variables.local_variables_initializer()
    with self.test_session() as session:
      coordinator = coordinator_lib.Coordinator()
      queue_runner_impl.start_queue_runners(session, coord=coordinator)
      session.run(init_op)
      chunked_times, chunked_values = session.run(
          [features[TrainEvalFeatures.TIMES],
           features[TrainEvalFeatures.VALUES]])
      # Stop queue-runner threads before leaving the session.
      coordinator.request_stop()
      coordinator.join()
    # One window per valid start position: num_samples - window_size + 1.
    self.assertAllEqual([num_samples - window_size + 1, window_size],
                        chunked_times.shape)
    if original_numpy_features is not None:
      original_times = original_numpy_features[TrainEvalFeatures.TIMES]
      original_values = original_numpy_features[TrainEvalFeatures.VALUES]
      # Every original time step appears in at least one window, and no
      # other times appear.
      self.assertAllEqual(original_times, numpy.unique(chunked_times))
      # Each window's values are exactly the original values at its times.
      self.assertAllEqual(original_values[chunked_times],
                          chunked_values)

  def test_csv(self):
    filename = _make_csv_time_series(num_features=1, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader(
        [filename],
        column_names=(TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES))
    self._all_window_input_fn_test_template(
        time_series_reader=time_series_reader, num_samples=50, window_size=10)

  def test_numpy(self):
    data = _make_numpy_time_series(num_features=2, num_samples=31)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._all_window_input_fn_test_template(
        time_series_reader=time_series_reader, original_numpy_features=data,
        num_samples=31, window_size=5)
    376 
    377 
if __name__ == "__main__":
  # Run all test cases in this module via the TensorFlow test runner.
  test.main()
    380