# Copyright 2017 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Tests for the time series input pipeline."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import csv
import tempfile

import numpy

from tensorflow.contrib.timeseries.python.timeseries import input_pipeline
from tensorflow.contrib.timeseries.python.timeseries import test_utils
from tensorflow.contrib.timeseries.python.timeseries.feature_keys import TrainEvalFeatures

from tensorflow.core.example import example_pb2
from tensorflow.python.framework import dtypes
from tensorflow.python.framework import errors
from tensorflow.python.lib.io import tf_record
from tensorflow.python.ops import parsing_ops
from tensorflow.python.ops import variables
from tensorflow.python.platform import test
from tensorflow.python.training import coordinator as coordinator_lib
from tensorflow.python.training import queue_runner_impl


def _make_csv_temp_file(to_write, test_tmpdir):
  """Writes each record in `to_write` as one CSV row to a temp file.

  Args:
    to_write: An iterable of records, each itself an iterable of CSV fields.
    test_tmpdir: Directory in which to create the temporary file.
  Returns:
    The path of the newly created CSV file.
  """
  _, data_file = tempfile.mkstemp(dir=test_tmpdir)
  with open(data_file, "w") as f:
    csvwriter = csv.writer(f)
    for record in to_write:
      csvwriter.writerow(record)
  return data_file


def _make_csv_time_series(num_features, num_samples, test_tmpdir):
  """Creates a CSV file with rows `[i, 2*i + 0, 2*i + 1, ...]` for each time i.

  Values are a deterministic function of time (value = 2 * time +
  feature_number), which the test templates below rely on to check data
  integrity after it passes through the input pipeline.

  Args:
    num_features: Number of value columns per row (in addition to the time).
    num_samples: Number of rows (time steps) to write.
    test_tmpdir: Directory for the temporary file.
  Returns:
    The path of the CSV file.
  """
  filename = _make_csv_temp_file(
      [[i] + [float(i) * 2. + feature_number
              for feature_number in range(num_features)]
       for i in range(num_samples)],
      test_tmpdir=test_tmpdir)
  return filename


def _make_tfexample_series(num_features, num_samples, test_tmpdir):
  """Creates a TFRecord file of tf.Example protos with the same 2*i+j pattern.

  Each Example has an int64 TIMES feature (the time step i) and a float
  VALUES feature of length `num_features` with value 2*i + feature_number.

  Args:
    num_features: Length of the VALUES float list in each Example.
    num_samples: Number of serialized Examples (time steps) to write.
    test_tmpdir: Directory for the temporary file.
  Returns:
    The path of the TFRecord file.
  """
  _, data_file = tempfile.mkstemp(dir=test_tmpdir)
  with tf_record.TFRecordWriter(data_file) as writer:
    for i in range(num_samples):
      example = example_pb2.Example()
      times = example.features.feature[TrainEvalFeatures.TIMES]
      times.int64_list.value.append(i)
      values = example.features.feature[TrainEvalFeatures.VALUES]
      values.float_list.value.extend(
          [float(i) * 2. + feature_number
           for feature_number in range(num_features)])
      writer.write(example.SerializeToString())
  return data_file


def _make_numpy_time_series(num_features, num_samples):
  """Returns an in-memory {TIMES, VALUES} dict with the same 2*i+j pattern.

  Args:
    num_features: Second dimension of the VALUES array.
    num_samples: Number of time steps (first dimension of both arrays).
  Returns:
    A dict mapping TrainEvalFeatures.TIMES to an int array of shape
    [num_samples] and TrainEvalFeatures.VALUES to a float array of shape
    [num_samples, num_features].
  """
  times = numpy.arange(num_samples)
  # Broadcasting: values[i, j] == 2 * i + j, matching the CSV/TFRecord makers.
  values = times[:, None] * 2. + numpy.arange(num_features)[None, :]
  return {TrainEvalFeatures.TIMES: times,
          TrainEvalFeatures.VALUES: values}


class RandomWindowInputFnTests(test.TestCase):
  """Tests RandomWindowInputFn against each reader type (CSV/TFExample/numpy)."""

  def _random_window_input_fn_test_template(
      self, time_series_reader, window_size, batch_size, num_features,
      discard_out_of_order=False):
    """Runs one batch through RandomWindowInputFn and checks its contents.

    Verifies the batch shapes, that times within each window are contiguous,
    and that values equal 2 * time + feature_number (the pattern produced by
    the _make_*_time_series helpers).

    Note: `discard_out_of_order` is accepted by this signature but is not
    forwarded to RandomWindowInputFn and is otherwise unused in this body —
    callers pass it, but it currently has no effect on the test.

    Args:
      time_series_reader: A reader (CSVReader, TFExampleReader, NumpyReader).
      window_size: Contiguous window length requested from the input_fn.
      batch_size: Number of windows per batch.
      num_features: Expected size of the last VALUES dimension.
      discard_out_of_order: Unused; see note above.
    Returns:
      The evaluated features dict from one session.run of the input_fn.
    """
    input_fn = input_pipeline.RandomWindowInputFn(
        time_series_reader=time_series_reader,
        window_size=window_size, batch_size=batch_size)
    result, _ = input_fn()
    init_op = variables.local_variables_initializer()
    with self.test_session() as session:
      coordinator = coordinator_lib.Coordinator()
      queue_runner_impl.start_queue_runners(session, coord=coordinator)
      session.run(init_op)
      features = session.run(result)
      coordinator.request_stop()
      coordinator.join()
    self.assertAllEqual([batch_size, window_size],
                        features[TrainEvalFeatures.TIMES].shape)
    for window_position in range(window_size - 1):
      for batch_position in range(batch_size):
        # Checks that all times are contiguous
        self.assertEqual(
            features[TrainEvalFeatures.TIMES][batch_position,
                                              window_position + 1],
            features[TrainEvalFeatures.TIMES][batch_position,
                                              window_position] + 1)
    self.assertAllEqual([batch_size, window_size, num_features],
                        features[TrainEvalFeatures.VALUES].shape)
    self.assertEqual("int64", features[TrainEvalFeatures.TIMES].dtype)
    for feature_number in range(num_features):
      # Values must follow the generator pattern: value == 2 * time + feature.
      self.assertAllEqual(
          features[TrainEvalFeatures.TIMES] * 2. + feature_number,
          features[TrainEvalFeatures.VALUES][:, :, feature_number])
    return features

  def _test_out_of_order(self, time_series_reader, discard_out_of_order):
    # Shared driver: small univariate window test, forwarding the
    # (currently unused) discard_out_of_order flag to the template.
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1, window_size=2, batch_size=5,
        discard_out_of_order=discard_out_of_order)

  def test_csv_sort_out_of_order(self):
    filename = _make_csv_time_series(num_features=1, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    self._test_out_of_order(time_series_reader, discard_out_of_order=False)

  def test_tfexample_sort_out_of_order(self):
    filename = _make_tfexample_series(
        num_features=1, num_samples=50,
        test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.TFExampleReader(
        [filename],
        features={
            TrainEvalFeatures.TIMES: parsing_ops.FixedLenFeature(
                shape=[], dtype=dtypes.int64),
            TrainEvalFeatures.VALUES: parsing_ops.FixedLenFeature(
                shape=[1], dtype=dtypes.float32)})
    self._test_out_of_order(time_series_reader, discard_out_of_order=False)

  def test_numpy_sort_out_of_order(self):
    data = _make_numpy_time_series(num_features=1, num_samples=50)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._test_out_of_order(time_series_reader, discard_out_of_order=False)

  def test_csv_discard_out_of_order(self):
    filename = _make_csv_time_series(num_features=1, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    self._test_out_of_order(time_series_reader, discard_out_of_order=True)

  def test_csv_discard_out_of_order_window_equal(self):
    # Window size exactly equals the number of samples: should still succeed.
    filename = _make_csv_time_series(num_features=1, num_samples=3,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1, window_size=3, batch_size=5,
        discard_out_of_order=True)

  def test_csv_discard_out_of_order_window_too_large(self):
    # Window size exceeds the number of samples: the pipeline runs out of data.
    filename = _make_csv_time_series(num_features=1, num_samples=2,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    with self.assertRaises(errors.OutOfRangeError):
      self._random_window_input_fn_test_template(
          time_series_reader=time_series_reader,
          num_features=1, window_size=3, batch_size=5,
          discard_out_of_order=True)

  def test_csv_no_data(self):
    # An empty CSV file should surface as OutOfRangeError, not hang.
    filename = _make_csv_time_series(num_features=1, num_samples=0,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    with self.assertRaises(errors.OutOfRangeError):
      self._test_out_of_order(time_series_reader, discard_out_of_order=True)

  def test_numpy_discard_out_of_order(self):
    data = _make_numpy_time_series(num_features=1, num_samples=50)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._test_out_of_order(time_series_reader, discard_out_of_order=True)

  def test_numpy_discard_out_of_order_window_equal(self):
    # Window size exactly equals the number of samples: should still succeed.
    data = _make_numpy_time_series(num_features=1, num_samples=3)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1, window_size=3, batch_size=5,
        discard_out_of_order=True)

  def test_numpy_discard_out_of_order_window_too_large(self):
    # The numpy reader knows its length up front, so an oversized window is a
    # ValueError at graph construction rather than a runtime OutOfRangeError.
    data = _make_numpy_time_series(num_features=1, num_samples=2)
    time_series_reader = input_pipeline.NumpyReader(data)
    with self.assertRaisesRegexp(ValueError, "only 2 records were available"):
      self._random_window_input_fn_test_template(
          time_series_reader=time_series_reader,
          num_features=1, window_size=3, batch_size=5,
          discard_out_of_order=True)

  def _test_multivariate(self, time_series_reader, num_features):
    # Shared driver for the multivariate (num_features > 1) cases.
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=num_features,
        window_size=2,
        batch_size=5)

  def test_csv_multivariate(self):
    filename = _make_csv_time_series(num_features=2, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    # Two VALUES column names: both value columns map to the VALUES feature.
    time_series_reader = input_pipeline.CSVReader(
        [filename],
        column_names=(TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES,
                      TrainEvalFeatures.VALUES))
    self._test_multivariate(time_series_reader=time_series_reader,
                            num_features=2)

  def test_tfexample_multivariate(self):
    filename = _make_tfexample_series(
        num_features=2, num_samples=50,
        test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.TFExampleReader(
        [filename],
        features={
            TrainEvalFeatures.TIMES: parsing_ops.FixedLenFeature(
                shape=[], dtype=dtypes.int64),
            TrainEvalFeatures.VALUES: parsing_ops.FixedLenFeature(
                shape=[2], dtype=dtypes.float32)})
    self._test_multivariate(time_series_reader=time_series_reader,
                            num_features=2)

  def test_numpy_multivariate(self):
    data = _make_numpy_time_series(num_features=3, num_samples=50)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._test_multivariate(time_series_reader, num_features=3)

  def test_numpy_withbatch(self):
    # Adds a leading batch dimension of 1 to each array; the reader should
    # accept pre-batched numpy input.
    data_nobatch = _make_numpy_time_series(num_features=4, num_samples=100)
    data = {feature_name: feature_value[None]
            for feature_name, feature_value in data_nobatch.items()}
    time_series_reader = input_pipeline.NumpyReader(data)
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=4,
        window_size=3,
        batch_size=5)

  def test_numpy_nobatch_nofeatures(self):
    # Strips the feature dimension from VALUES; the reader should restore it.
    data = _make_numpy_time_series(num_features=1, num_samples=100)
    data[TrainEvalFeatures.VALUES] = data[TrainEvalFeatures.VALUES][:, 0]
    time_series_reader = input_pipeline.NumpyReader(data)
    self._random_window_input_fn_test_template(
        time_series_reader=time_series_reader,
        num_features=1,
        window_size=16,
        batch_size=16)


class WholeDatasetInputFnTests(test.TestCase):
  """Tests WholeDatasetInputFn against each reader type."""

  def _whole_dataset_input_fn_test_template(
      self, time_series_reader, num_features, num_samples):
    """Evaluates WholeDatasetInputFn once and checks the entire series.

    Verifies that TIMES is exactly [0, ..., num_samples - 1] (with a leading
    batch dimension of 1) and that values follow the 2 * time + feature
    pattern from the data-generating helpers.

    Args:
      time_series_reader: A reader wrapping the whole dataset.
      num_features: Expected size of the last VALUES dimension.
      num_samples: Expected number of time steps in the dataset.
    """
    result, _ = input_pipeline.WholeDatasetInputFn(time_series_reader)()
    with self.test_session() as session:
      session.run(variables.local_variables_initializer())
      coordinator = coordinator_lib.Coordinator()
      queue_runner_impl.start_queue_runners(session, coord=coordinator)
      features = session.run(result)
      coordinator.request_stop()
      coordinator.join()
    self.assertEqual("int64", features[TrainEvalFeatures.TIMES].dtype)
    self.assertAllEqual(numpy.arange(num_samples, dtype=numpy.int64)[None, :],
                        features[TrainEvalFeatures.TIMES])
    for feature_number in range(num_features):
      self.assertAllEqual(
          features[TrainEvalFeatures.TIMES] * 2. + feature_number,
          features[TrainEvalFeatures.VALUES][:, :, feature_number])

  def test_csv(self):
    filename = _make_csv_time_series(num_features=3, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader(
        [filename],
        column_names=(TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES,
                      TrainEvalFeatures.VALUES, TrainEvalFeatures.VALUES))
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=3, num_samples=50)

  def test_csv_no_data(self):
    # An empty CSV file should raise OutOfRangeError before any assertions run
    # (num_samples=50 here is never checked against the empty file).
    filename = _make_csv_time_series(num_features=1, num_samples=0,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader([filename])
    with self.assertRaises(errors.OutOfRangeError):
      self._whole_dataset_input_fn_test_template(
          time_series_reader=time_series_reader, num_features=1, num_samples=50)

  def test_tfexample(self):
    filename = _make_tfexample_series(
        num_features=4, num_samples=100,
        test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.TFExampleReader(
        [filename],
        features={
            TrainEvalFeatures.TIMES: parsing_ops.FixedLenFeature(
                shape=[], dtype=dtypes.int64),
            TrainEvalFeatures.VALUES: parsing_ops.FixedLenFeature(
                shape=[4], dtype=dtypes.float32)})
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=4, num_samples=100)

  def test_numpy(self):
    data = _make_numpy_time_series(num_features=4, num_samples=100)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=4, num_samples=100)

  def test_numpy_withbatch(self):
    # Adds a leading batch dimension of 1 to each array; the reader should
    # accept pre-batched numpy input.
    data_nobatch = _make_numpy_time_series(num_features=4, num_samples=100)
    data = {feature_name: feature_value[None]
            for feature_name, feature_value in data_nobatch.items()}
    time_series_reader = input_pipeline.NumpyReader(data)
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=4, num_samples=100)

  def test_numpy_nobatch_nofeatures(self):
    # Strips the feature dimension from VALUES; the reader should restore it.
    data = _make_numpy_time_series(num_features=1, num_samples=100)
    data[TrainEvalFeatures.VALUES] = data[TrainEvalFeatures.VALUES][:, 0]
    time_series_reader = input_pipeline.NumpyReader(data)
    self._whole_dataset_input_fn_test_template(
        time_series_reader=time_series_reader, num_features=1, num_samples=100)


class AllWindowInputFnTests(test.TestCase):
  """Tests test_utils.AllWindowInputFn (every contiguous window, in order)."""

  def _all_window_input_fn_test_template(
      self, time_series_reader, num_samples, window_size,
      original_numpy_features=None):
    """Evaluates AllWindowInputFn once and checks the chunked output.

    Checks that the chunked TIMES tensor has one row per possible window
    (num_samples - window_size + 1 of them). When the original numpy features
    are supplied, also checks that the chunked times cover exactly the
    original times and that chunked values match the original values indexed
    by those times.

    Args:
      time_series_reader: A reader wrapping the dataset.
      num_samples: Number of time steps in the underlying data.
      window_size: Length of each contiguous window.
      original_numpy_features: Optional {TIMES, VALUES} dict to cross-check
        the chunked output against.
    """
    input_fn = test_utils.AllWindowInputFn(
        time_series_reader=time_series_reader,
        window_size=window_size)
    features, _ = input_fn()
    init_op = variables.local_variables_initializer()
    with self.test_session() as session:
      coordinator = coordinator_lib.Coordinator()
      queue_runner_impl.start_queue_runners(session, coord=coordinator)
      session.run(init_op)
      chunked_times, chunked_values = session.run(
          [features[TrainEvalFeatures.TIMES],
           features[TrainEvalFeatures.VALUES]])
      coordinator.request_stop()
      coordinator.join()
    self.assertAllEqual([num_samples - window_size + 1, window_size],
                        chunked_times.shape)
    if original_numpy_features is not None:
      original_times = original_numpy_features[TrainEvalFeatures.TIMES]
      original_values = original_numpy_features[TrainEvalFeatures.VALUES]
      self.assertAllEqual(original_times, numpy.unique(chunked_times))
      self.assertAllEqual(original_values[chunked_times],
                          chunked_values)

  def test_csv(self):
    filename = _make_csv_time_series(num_features=1, num_samples=50,
                                     test_tmpdir=self.get_temp_dir())
    time_series_reader = input_pipeline.CSVReader(
        [filename],
        column_names=(TrainEvalFeatures.TIMES, TrainEvalFeatures.VALUES))
    self._all_window_input_fn_test_template(
        time_series_reader=time_series_reader, num_samples=50, window_size=10)

  def test_numpy(self):
    data = _make_numpy_time_series(num_features=2, num_samples=31)
    time_series_reader = input_pipeline.NumpyReader(data)
    self._all_window_input_fn_test_template(
        time_series_reader=time_series_reader, original_numpy_features=data,
        num_samples=31, window_size=5)


if __name__ == "__main__":
  test.main()