1 # Copyright 2017 The TensorFlow Authors. All Rights Reserved. 2 # 3 # Licensed under the Apache License, Version 2.0 (the "License"); 4 # you may not use this file except in compliance with the License. 5 # You may obtain a copy of the License at 6 # 7 # http://www.apache.org/licenses/LICENSE-2.0 8 # 9 # Unless required by applicable law or agreed to in writing, software 10 # distributed under the License is distributed on an "AS IS" BASIS, 11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 # See the License for the specific language governing permissions and 13 # limitations under the License. 14 # ============================================================================== 15 """Defines ways of splicing and re-arranging time series. 16 17 This file provides methods for reading, parsing, and re-arranging a time 18 series. The main departure from standard TensorFlow input pipelines is a focus 19 on "chunking" a time series, i.e. slicing it into small contiguous windows which 20 are then batched together for training, a form of truncated 21 backpropagation. This typically provides a significant speedup compared to 22 looping over the whole series sequentially, by exploiting data parallelism and 23 by reducing redundant contributions to gradients (due to redundant information 24 in the series itself). 25 26 A series, consisting of times (an increasing vector of integers) and values (one 27 or more floating point values for each time) along with any exogenous features, 28 is stored either in memory or on disk in various formats (e.g. "one record per 29 timestep" on disk, or as a dictionary of Numpy arrays in memory). The location 30 and format is specified by configuring a `TimeSeriesReader` object 31 (e.g. `NumpyReader`, `CSVReader`), which reads the data into the TensorFlow 32 graph. A `TimeSeriesInputFn` object (typically `RandomWindowInputFn`) then 33 performs windowing and batching. 
34 35 Time series are passed through this pipeline as dictionaries mapping feature 36 names to their values. For training and evaluation, these require at minimum 37 `TrainEvalFeatures.TIMES` (scalar integers, one per timestep) and 38 `TrainEvalFeatures.VALUES` (may be either univariate or multivariate). Exogenous 39 features may have any shape, but are likewise associated with a timestep. Times 40 themselves need not be contiguous or regular (although smaller/fewer gaps are 41 generally better), but each timestep must have all `VALUES` and any exogenous 42 features (i.e. times may be missing, but given that a time is specified, every 43 other feature must also be specified for that step; some models may support 44 making exogenous updates conditional). 45 46 The expected use case of a `TimeSeriesInputFn` is that it is first configured 47 (for example setting a batch or window size) and passed a reader (a 48 `TimeSeriesReader` object). The `TimeSeriesInputFn` can then be passed as the 49 input_fn of an Estimator. 50 51 For example, `RandomWindowInputFn` is useful for creating batches of random 52 chunks of a series for training: 53 54 ``` 55 # Read data in the default "time,value" CSV format with no header 56 reader = input_pipeline.CSVReader(csv_file_name) 57 # Set up windowing and batching for training 58 train_input_fn = input_pipeline.RandomWindowInputFn( 59 reader, batch_size=16, window_size=16) 60 # Fit model parameters to data 61 estimator.train(input_fn=train_input_fn, steps=150) 62 ``` 63 64 `RandomWindowInputFn` is the primary tool for training and quantitative 65 evaluation of time series. `WholeDatasetInputFn`, which reads a whole series 66 into memory, is useful for qualitative evaluation and preparing to make 67 predictions with `predict_continuation_input_fn`. 
68 """ 69 70 from __future__ import absolute_import 71 from __future__ import division 72 from __future__ import print_function 73 74 import abc 75 76 import numpy 77 78 from tensorflow.contrib.timeseries.python.timeseries import feature_keys 79 from tensorflow.contrib.timeseries.python.timeseries import model_utils 80 81 from tensorflow.python.estimator import estimator_lib 82 from tensorflow.python.framework import constant_op 83 from tensorflow.python.framework import dtypes 84 from tensorflow.python.framework import ops 85 from tensorflow.python.framework import tensor_shape 86 from tensorflow.python.ops import array_ops 87 from tensorflow.python.ops import control_flow_ops 88 from tensorflow.python.ops import io_ops 89 from tensorflow.python.ops import math_ops 90 from tensorflow.python.ops import nn 91 from tensorflow.python.ops import parsing_ops 92 from tensorflow.python.ops import random_ops 93 from tensorflow.python.ops import state_ops 94 from tensorflow.python.ops import tensor_array_ops 95 from tensorflow.python.ops import variable_scope 96 from tensorflow.python.training import input as input_lib 97 from tensorflow.python.training import training 98 from tensorflow.python.util import nest 99 100 101 def predict_continuation_input_fn( 102 evaluation, steps=None, times=None, exogenous_features=None): 103 """An Estimator input_fn for running predict() after evaluate(). 104 105 If the call to evaluate() we are making predictions based on had a batch_size 106 greater than one, predictions will start after each of these windows 107 (i.e. will have the same batch dimension). 108 109 Args: 110 evaluation: The dictionary returned by `Estimator.evaluate`, with keys 111 FilteringResults.STATE_TUPLE and FilteringResults.TIMES. 112 steps: The number of steps to predict (scalar), starting after the 113 evaluation. If `times` is specified, `steps` must not be; one is required. 
114 times: A [batch_size x window_size] array of integers (not a Tensor) 115 indicating times to make predictions for. These times must be after the 116 corresponding evaluation. If `steps` is specified, `times` must not be; 117 one is required. If the batch dimension is omitted, it is assumed to be 1. 118 exogenous_features: Optional dictionary. If specified, indicates exogenous 119 features for the model to use while making the predictions. Values must 120 have shape [batch_size x window_size x ...], where `batch_size` matches 121 the batch dimension used when creating `evaluation`, and `window_size` is 122 either the `steps` argument or the `window_size` of the `times` argument 123 (depending on which was specified). 124 Returns: 125 An `input_fn` suitable for passing to the `predict` function of a time 126 series `Estimator`. 127 Raises: 128 ValueError: If `times` or `steps` are misspecified. 129 """ 130 if exogenous_features is None: 131 exogenous_features = {} 132 predict_times = model_utils.canonicalize_times_or_steps_from_output( 133 times=times, steps=steps, previous_model_output=evaluation) 134 features = { 135 feature_keys.PredictionFeatures.STATE_TUPLE: 136 evaluation[feature_keys.FilteringResults.STATE_TUPLE], 137 feature_keys.PredictionFeatures.TIMES: 138 predict_times 139 } 140 features.update(exogenous_features) 141 def _predict_input_fn(): 142 """An input_fn for predict().""" 143 # Prevents infinite iteration with a constant output in an Estimator's 144 # predict(). 145 limited_features = {} 146 for key, values in features.items(): 147 limited_values = nest.map_structure( 148 lambda value: training.limit_epochs(value, num_epochs=1), values) 149 limited_features[key] = limited_values 150 return (limited_features, None) 151 return _predict_input_fn 152 153 154 class TimeSeriesReader(object): 155 """Reads from and parses a data source for a `TimeSeriesInputFn`. 
156 157 This class provides methods that read a few records (`read`) or the full data 158 set at once (`read_full`), and returns them as dictionaries mapping feature 159 names to feature Tensors. Please see note at the top of the file for the 160 structure of these dictionaries. The output is generally chunked by a 161 `TimeSeriesInputFn` before being passed to the model. 162 """ 163 164 def check_dataset_size(self, minimum_dataset_size): 165 """When possible, raises an error if the dataset is too small. 166 167 This method allows TimeSeriesReaders to raise informative error messages if 168 the user has selected a window size in their TimeSeriesInputFn which is 169 larger than the dataset size. However, many TimeSeriesReaders will not have 170 access to a dataset size, in which case they do not need to override this 171 method. 172 173 Args: 174 minimum_dataset_size: The minimum number of records which should be 175 contained in the dataset. Readers should attempt to raise an error when 176 possible if an epoch of data contains fewer records. 177 """ 178 pass 179 180 @abc.abstractmethod 181 def read(self): 182 """Parses one or more records into a feature dictionary. 183 184 This method is expected to be called by a `TimeSeriesInputFn` object, and is 185 not for use with models directly. 186 187 A `TimeSeriesReader` object reads multiple records at a single time for 188 efficiency; the size of these batches is an implementation detail internal 189 to the input pipeline. These records should generally be sequential, 190 although some out-of-order records due to file wraparounds are expected and 191 must be handled by callers. 192 193 Returns: 194 A dictionary mapping feature names to `Tensor` values, each with an 195 arbitrary batch dimension (for efficiency) as their first dimension. 196 """ 197 pass 198 199 @abc.abstractmethod 200 def read_full(self): 201 """Return the full dataset. 202 203 Largely for interactive use/plotting (or evaluation on small 204 datasets). 
Generally not very efficient. Not recommended for training. 205 206 Returns: 207 Same return type as `read`, but with the full dataset rather than an 208 arbitrary chunk of it. A dictionary mapping feature names to `Tensor` 209 values, where the size of the first dimension of each `Tensor` is the 210 number of samples in the entire dataset. These `Tensor`s should be 211 constant across graph invocations, assuming that the underlying data 212 remains constant. Current implementations re-read data on each graph 213 invocation, although this may change in the future. 214 """ 215 pass 216 217 218 class NumpyReader(TimeSeriesReader): 219 """A time series parser for feeding Numpy arrays to a `TimeSeriesInputFn`. 220 221 Avoids embedding data in the graph as constants. 222 """ 223 224 def __init__(self, data, read_num_records_hint=4096): 225 """Numpy array input for a `TimeSeriesInputFn`. 226 227 Args: 228 data: A dictionary mapping feature names to Numpy arrays, with two 229 possible shapes (requires keys `TrainEvalFeatures.TIMES` and 230 `TrainEvalFeatures.VALUES`): 231 Univariate; `TIMES` and `VALUES` are both vectors of shape [series 232 length] 233 Multivariate; `TIMES` is a vector of shape [series length], `VALUES` 234 has shape [series length x number of features]. 235 In any case, `VALUES` and any exogenous features must have their shapes 236 prefixed by the shape of the value corresponding to the `TIMES` key. 237 read_num_records_hint: The maximum number of samples to read at one time, 238 for efficiency. 
239 """ 240 self._features = _canonicalize_numpy_data( 241 data, require_single_batch=True) 242 self._read_num_records_hint = read_num_records_hint 243 244 def check_dataset_size(self, minimum_dataset_size): 245 """Raise an error if the dataset is too small.""" 246 dataset_size = self._features[feature_keys.TrainEvalFeatures.TIMES].shape[1] 247 if dataset_size < minimum_dataset_size: 248 raise ValueError( 249 ("A TimeSeriesInputFn is configured to create windows of size {}, " 250 "but only {} records were available in the dataset. Either decrease " 251 "the window size or provide more records.").format( 252 minimum_dataset_size, dataset_size)) 253 254 def read(self): 255 """Returns a large chunk of the Numpy arrays for later re-chunking.""" 256 # Remove the batch dimension from all features 257 features = {key: numpy.squeeze(value, axis=0) 258 for key, value in self._features.items()} 259 return estimator_lib.inputs.numpy_input_fn( 260 x=features, 261 # The first dimensions of features are the series length, since we have 262 # removed the batch dimension above. We now pull out 263 # self._read_num_records_hint steps of this single time series to pass 264 # to the TimeSeriesInputFn. 
265 batch_size=self._read_num_records_hint, 266 num_epochs=None, 267 shuffle=False)() 268 269 def read_full(self): 270 """Returns `Tensor` versions of the full Numpy arrays.""" 271 features = estimator_lib.inputs.numpy_input_fn( 272 x=self._features, 273 batch_size=1, 274 num_epochs=None, 275 queue_capacity=2, # Each queue element is a full copy of the dataset 276 shuffle=False)() 277 # TimeSeriesInputFn expect just a batch dimension 278 return {feature_name: array_ops.squeeze(feature_value, axis=0) 279 for feature_name, feature_value in features.items()} 280 281 282 class ReaderBaseTimeSeriesParser(TimeSeriesReader): 283 """Base for time series readers which wrap a `tf.ReaderBase`.""" 284 285 def __init__(self, filenames, read_num_records_hint=4096): 286 """Configure the time series reader. 287 288 Args: 289 filenames: A string or list of strings indicating files to read records 290 from. 291 read_num_records_hint: When not reading a full dataset, indicates the 292 number of records to transfer in a single chunk (for efficiency). The 293 actual number transferred at one time may vary. 294 """ 295 self._filenames = filenames 296 self._read_num_records_hint = read_num_records_hint 297 298 @abc.abstractmethod 299 def _get_reader(self): 300 """Get an instance of the tf.ReaderBase associated with this class.""" 301 pass 302 303 @abc.abstractmethod 304 def _process_records(self, lines): 305 """Given string items, return a processed dictionary of Tensors. 306 307 Args: 308 lines: A 1-dimensional string Tensor, each representing a record to parse 309 (source dependent, e.g. a line of a file, or a serialized protocol 310 buffer). 311 312 Returns: 313 A dictionary mapping feature names to their values. The batch dimensions 314 should match the length of `lines`. 315 """ 316 pass 317 318 def _get_filename_queue(self, epoch_limit): 319 """Constructs a filename queue with an epoch limit. 
320 321 `epoch_limit` is intended as an error checking fallback to prevent a reader 322 from infinitely looping in its requests for more work items if none are 323 available in any file. It should be set high enough that it is never reached 324 assuming at least one record exists in some file. 325 326 Args: 327 epoch_limit: The maximum number of times to read through the complete list 328 of files before throwing an OutOfRangeError. 329 Returns: 330 A tuple of (filename_queue, epoch_limiter): 331 filename_queue: A FIFOQueue with filename work items. 332 epoch_limiter: The local variable used for epoch limitation. This should 333 be set to zero before a reader is passed `filename_queue` in order to 334 reset the epoch limiter's state. 335 """ 336 epoch_limiter = variable_scope.variable( 337 initial_value=constant_op.constant(0, dtype=dtypes.int64), 338 name="epoch_limiter", 339 trainable=False, 340 collections=[ops.GraphKeys.LOCAL_VARIABLES]) 341 filenames_tensor = array_ops.reshape( 342 ops.convert_to_tensor(self._filenames), [-1]) 343 # We can't rely on epoch_limiter being initialized, since queue runners are 344 # started before local variables are initialized. Instead, we ignore epoch 345 # limits before variable initialization. This means that prior to variable 346 # initialization, a QueueRunner may cause a reader to enter an un-checked 347 # infinite loop. However, as soon as local variables are initialized, we 348 # will start incrementing and checking epoch_limiter, which will interrupt 349 # any in-progress loops. 
350 conditional_count_up_to = control_flow_ops.cond( 351 state_ops.is_variable_initialized(epoch_limiter), 352 lambda: epoch_limiter.count_up_to(epoch_limit), 353 lambda: constant_op.constant(0, dtype=dtypes.int64)) 354 with ops.control_dependencies([conditional_count_up_to]): 355 filenames_tensor = array_ops.identity(filenames_tensor) 356 filename_queue = input_lib.string_input_producer( 357 filenames_tensor, shuffle=False, capacity=1) 358 return filename_queue, epoch_limiter 359 360 def read(self): 361 """Reads a chunk of data from the `tf.ReaderBase` for later re-chunking.""" 362 # Assuming there is at least one item to be read among all of the files in 363 # self._filenames, we will not need to go through more than 364 # self._read_num_records_hint epochs to get a batch of 365 # self._read_num_records_hint records. Setting this limit and resetting it 366 # before each reader.read_up_to call prevents infinite looping when there 367 # are no records available in any of the files. 368 filename_queue, epoch_limiter = self._get_filename_queue( 369 epoch_limit=self._read_num_records_hint) 370 reader = self._get_reader() 371 epoch_reset_op = state_ops.assign(epoch_limiter, 0) 372 with ops.control_dependencies([epoch_reset_op]): 373 _, records = reader.read_up_to( 374 filename_queue, self._read_num_records_hint) 375 return self._process_records(records) 376 377 def read_full(self): 378 """Reads a full epoch of data into memory.""" 379 reader = self._get_reader() 380 # Set a hard limit of 2 epochs through self._filenames. If there are any 381 # records available, we should only end up reading the first record in the 382 # second epoch before exiting the while loop and subsequently resetting the 383 # epoch limit. If there are no records available in any of the files, this 384 # hard limit prevents the reader.read_up_to call from looping infinitely. 
385 filename_queue, epoch_limiter = self._get_filename_queue(epoch_limit=2) 386 epoch_reset_op = state_ops.assign(epoch_limiter, 0) 387 with ops.control_dependencies([epoch_reset_op]): 388 first_key, first_value = reader.read_up_to(filename_queue, 1) 389 # Read until we get a duplicate key (one epoch) 390 def _while_condition( 391 current_key, current_value, current_index, collected_records): 392 del current_value, current_index, collected_records # unused 393 return math_ops.not_equal(array_ops.squeeze(current_key, axis=0), 394 array_ops.squeeze(first_key, axis=0)) 395 396 def _while_body( 397 current_key, current_value, current_index, collected_records): 398 del current_key # unused 399 new_key, new_value = reader.read_up_to(filename_queue, 1) 400 new_key.set_shape([1]) 401 new_value.set_shape([1]) 402 return (new_key, 403 new_value, 404 current_index + 1, 405 collected_records.write(current_index, current_value)) 406 _, _, _, records_ta = control_flow_ops.while_loop( 407 _while_condition, 408 _while_body, 409 [constant_op.constant([""]), first_value, 410 0, # current_index starting value 411 tensor_array_ops.TensorArray( # collected_records 412 dtype=dtypes.string, size=0, dynamic_size=True)]) 413 records = records_ta.concat() 414 # Reset the reader when we're done so that subsequent requests for data get 415 # the dataset in the proper order. 416 with ops.control_dependencies([records]): 417 reader_reset_op = reader.reset() 418 with ops.control_dependencies([reader_reset_op]): 419 records = array_ops.identity(records) 420 return self._process_records(records) 421 422 423 class CSVReader(ReaderBaseTimeSeriesParser): 424 """Reads from a collection of CSV-formatted files.""" 425 426 def __init__(self, 427 filenames, 428 column_names=(feature_keys.TrainEvalFeatures.TIMES, 429 feature_keys.TrainEvalFeatures.VALUES), 430 column_dtypes=None, 431 skip_header_lines=None, 432 read_num_records_hint=4096): 433 """CSV-parsing reader for a `TimeSeriesInputFn`. 
434 435 Args: 436 filenames: A filename or list of filenames to read the time series 437 from. Each line must have columns corresponding to `column_names`. 438 column_names: A list indicating names for each 439 feature. `TrainEvalFeatures.TIMES` and `TrainEvalFeatures.VALUES` are 440 required; `VALUES` may be repeated to indicate a multivariate series. 441 column_dtypes: If provided, must be a list with the same length as 442 `column_names`, indicating dtypes for each column. Defaults to 443 `tf.int64` for `TrainEvalFeatures.TIMES` and `tf.float32` for 444 everything else. 445 skip_header_lines: Passed on to `tf.TextLineReader`; skips this number of 446 lines at the beginning of each file. 447 read_num_records_hint: When not reading a full dataset, indicates the 448 number of records to parse/transfer in a single chunk (for 449 efficiency). The actual number transferred at one time may be more or 450 less. 451 Raises: 452 ValueError: If required column names are not specified, or if lengths do 453 not match. 
454 """ 455 if feature_keys.TrainEvalFeatures.TIMES not in column_names: 456 raise ValueError("'{}' is a required column.".format( 457 feature_keys.TrainEvalFeatures.TIMES)) 458 if feature_keys.TrainEvalFeatures.VALUES not in column_names: 459 raise ValueError("'{}' is a required column.".format( 460 feature_keys.TrainEvalFeatures.VALUES)) 461 if column_dtypes is not None and len(column_dtypes) != len(column_names): 462 raise ValueError( 463 ("If specified, the length of column_dtypes must match the length of " 464 "column_names (got column_dtypes={} and column_names={}).").format( 465 column_dtypes, column_names)) 466 if sum(1 for column_name in column_names 467 if column_name == feature_keys.TrainEvalFeatures.TIMES) != 1: 468 raise ValueError( 469 "Got more than one times column ('{}'), but exactly " 470 "one is required.".format(feature_keys.TrainEvalFeatures.TIMES)) 471 self._column_names = column_names 472 self._column_dtypes = column_dtypes 473 self._skip_header_lines = skip_header_lines 474 super(CSVReader, self).__init__( 475 filenames=filenames, read_num_records_hint=read_num_records_hint) 476 477 def _get_reader(self): 478 return io_ops.TextLineReader(skip_header_lines=self._skip_header_lines) 479 480 def _process_records(self, lines): 481 """Parse `lines` as CSV records.""" 482 if self._column_dtypes is None: 483 default_values = [(array_ops.zeros([], dtypes.int64),) 484 if column_name == feature_keys.TrainEvalFeatures.TIMES 485 else () for column_name in self._column_names] 486 else: 487 default_values = [(array_ops.zeros([], dtype),) 488 for dtype in self._column_dtypes] 489 columns = parsing_ops.decode_csv(lines, default_values) 490 features_lists = {} 491 for column_name, value in zip(self._column_names, columns): 492 features_lists.setdefault(column_name, []).append(value) 493 features = {} 494 for column_name, values in features_lists.items(): 495 if (len(values) == 1 and 496 column_name != feature_keys.TrainEvalFeatures.VALUES): 497 
features[column_name] = values[0] 498 else: 499 features[column_name] = array_ops.stack(values, axis=1) 500 return features 501 502 503 class TFExampleReader(ReaderBaseTimeSeriesParser): 504 """Reads and parses `tf.Example`s from a TFRecords file.""" 505 506 def __init__(self, 507 filenames, 508 features): 509 """Configure `tf.Example` parsing. 510 511 Args: 512 filenames: A filename or list of filenames to read the time series 513 from. Each line must have columns corresponding to `column_names`. 514 features: A dictionary mapping from feature keys to `tf.FixedLenFeature` 515 objects. Must include `TrainEvalFeatures.TIMES` (scalar integer) and 516 `TrainEvalFeatures.VALUES` (floating point vector) features. 517 Raises: 518 ValueError: If required times/values features are not present. 519 """ 520 if feature_keys.TrainEvalFeatures.TIMES not in features: 521 raise ValueError("'{}' is a required column.".format( 522 feature_keys.TrainEvalFeatures.TIMES)) 523 if feature_keys.TrainEvalFeatures.VALUES not in features: 524 raise ValueError("'{}' is a required column.".format( 525 feature_keys.TrainEvalFeatures.VALUES)) 526 self._features = features 527 super(TFExampleReader, self).__init__(filenames=filenames) 528 529 def _get_reader(self): 530 return io_ops.TFRecordReader() 531 532 def _process_records(self, examples): 533 """Parse `tf.Example`s into `Tensors`.""" 534 return parsing_ops.parse_example( 535 serialized=examples, features=self._features) 536 537 538 class TimeSeriesInputFn(object): 539 """Base for classes which create batches of windows from a time series.""" 540 541 @abc.abstractmethod 542 def create_batch(self): 543 """Creates chunked Tensors from times, values, and other features. 544 545 Suitable for use as the input_fn argument of a tf.estimator.Estimator's 546 fit() or evaluate() method. 
547 548 Returns: 549 A tuple of (features, targets): 550 features: A dictionary with `TrainEvalFeatures.TIMES` and 551 `TrainEvalFeatures.VALUES` as keys, `TIMES` having an associated value 552 with shape [batch size x window length], `VALUES` with shape [batch 553 size x window length x number of features]. Any other features will 554 also have shapes prefixed with [batch size x window length]. 555 targets: Not used, but must have a value for compatibility with the 556 Estimator API. That value should be None. 557 """ 558 pass 559 560 def __call__(self): 561 # Allow a TimeSeriesInputFn to be used as an input function directly 562 return self.create_batch() 563 564 565 class WholeDatasetInputFn(TimeSeriesInputFn): 566 """Supports passing a full time series to a model for evaluation/inference. 567 568 Note that this `TimeSeriesInputFn` is not designed for high throughput, and 569 should not be used for training. It allows for sequential evaluation on a full 570 dataset (with sequential in-sample predictions), which then feeds naturally 571 into `predict_continuation_input_fn` for making out-of-sample 572 predictions. While this is useful for plotting and interactive use, 573 `RandomWindowInputFn` is better suited to training and quantitative 574 evaluation. 575 """ 576 # TODO(allenl): A SequentialWindowInputFn for getting model end state without 577 # loading the whole dataset into memory (or for quantitative evaluation of 578 # sequential models). Note that an Estimator using such a TimeSeriesInputFn 579 # won't return in-sample predictions for the whole dataset, which means it 580 # won't be terribly useful for interactive use/plotting (unless the user 581 # passes in concat metrics). Also need to be careful about state saving for 582 # sequential models, particularly the gaps between chunks. 583 584 def __init__(self, time_series_reader): 585 """Initialize the `TimeSeriesInputFn`. 586 587 Args: 588 time_series_reader: A TimeSeriesReader object. 
589 """ 590 self._reader = time_series_reader 591 super(WholeDatasetInputFn, self).__init__() 592 593 def create_batch(self): 594 """A suitable `input_fn` for an `Estimator`'s `evaluate()`. 595 596 Returns: 597 A dictionary mapping feature names to `Tensors`, each shape 598 prefixed by [1, data set size] (i.e. a batch size of 1). 599 """ 600 features = self._reader.read_full() 601 # Add a batch dimension of one to each feature. 602 return ({feature_name: feature_value[None, ...] 603 for feature_name, feature_value in features.items()}, 604 None) 605 606 607 class RandomWindowInputFn(TimeSeriesInputFn): 608 """Wraps a `TimeSeriesReader` to create random batches of windows. 609 610 Tensors are first collected into sequential windows (in a windowing queue 611 created by `tf.train.batch`, based on the order returned from 612 `time_series_reader`), then these windows are randomly batched (in a 613 `RandomShuffleQueue`), the Tensors returned by `create_batch` having shapes 614 prefixed by [`batch_size`, `window_size`]. 615 616 This `TimeSeriesInputFn` is useful for both training and quantitative 617 evaluation (but be sure to run several epochs for sequential models such as 618 `StructuralEnsembleRegressor` to completely flush stale state left over from 619 training). For qualitative evaluation or when preparing for predictions, use 620 `WholeDatasetInputFn`. 621 """ 622 623 def __init__( 624 self, time_series_reader, window_size, batch_size, 625 queue_capacity_multiplier=1000, shuffle_min_after_dequeue_multiplier=2, 626 discard_out_of_order=True, discard_consecutive_batches_limit=1000, 627 jitter=True, num_threads=2, shuffle_seed=None): 628 """Configure the RandomWindowInputFn. 629 630 Args: 631 time_series_reader: A TimeSeriesReader object. 632 window_size: The number of examples to keep together sequentially. 
This 633 controls the length of truncated backpropagation: smaller values mean 634 less sequential computation, which can lead to faster training, but 635 create a coarser approximation to the gradient (which would ideally be 636 computed by a forward pass over the entire sequence in order). 637 batch_size: The number of windows to place together in a batch. Larger 638 values will lead to more stable gradients during training. 639 queue_capacity_multiplier: The capacity for the queues used to create 640 batches, specified as a multiple of `batch_size` (for 641 RandomShuffleQueue) and `batch_size * window_size` (for the 642 FIFOQueue). Controls the maximum number of windows stored. Should be 643 greater than `shuffle_min_after_dequeue_multiplier`. 644 shuffle_min_after_dequeue_multiplier: The minimum number of windows in the 645 RandomShuffleQueue after a dequeue, which controls the amount of entropy 646 introduced during batching. Specified as a multiple of `batch_size`. 647 discard_out_of_order: If True, windows of data which have times which 648 decrease (a higher time followed by a lower time) are discarded. If 649 False, the window and associated features are instead sorted so that 650 times are non-decreasing. Discarding is typically faster, as models do 651 not have to deal with artificial gaps in the data. However, discarding 652 does create a bias where the beginnings and endings of files are 653 under-sampled. 654 discard_consecutive_batches_limit: Raise an OutOfRangeError if more than 655 this number of batches are discarded without a single non-discarded 656 window (prevents infinite looping when the dataset is too small). 657 jitter: If True, randomly discards examples between some windows in order 658 to avoid deterministic chunking patterns. This is important for models 659 like AR which may otherwise overfit a fixed chunking. 660 num_threads: Use this number of threads for queues. 
Setting a value of 1 661 removes one source of non-determinism (and in combination with 662 shuffle_seed should provide deterministic windowing). 663 shuffle_seed: A seed for window shuffling. The default value of None 664 provides random behavior. With `shuffle_seed` set and 665 `num_threads=1`, provides deterministic behavior. 666 """ 667 self._reader = time_series_reader 668 self._window_size = window_size 669 self._reader.check_dataset_size(minimum_dataset_size=self._window_size) 670 self._batch_size = batch_size 671 self._queue_capacity_multiplier = queue_capacity_multiplier 672 self._shuffle_min_after_dequeue_multiplier = ( 673 shuffle_min_after_dequeue_multiplier) 674 self._discard_out_of_order = discard_out_of_order 675 self._discard_limit = discard_consecutive_batches_limit 676 self._jitter = jitter 677 if num_threads is None: 678 self._num_threads = self._batch_size 679 else: 680 self._num_threads = num_threads 681 self._shuffle_seed = shuffle_seed 682 super(RandomWindowInputFn, self).__init__() 683 684 def create_batch(self): 685 """Create queues to window and batch time series data. 686 687 Returns: 688 A dictionary of Tensors corresponding to the output of `self._reader` 689 (from the `time_series_reader` constructor argument), each with shapes 690 prefixed by [`batch_size`, `window_size`]. 691 """ 692 features = self._reader.read() 693 if self._jitter: 694 # TODO(agarwal, allenl): Figure out if more jitter is needed here. 695 jitter = random_ops.random_uniform(shape=[], maxval=2, dtype=dtypes.int32) 696 else: 697 jitter = 0 698 # To keep things efficient, we pass from the windowing batcher to the 699 # batch-of-windows batcher in batches. This avoids the need for huge numbers 700 # of threads, but does mean that jitter is only applied occasionally. 701 # TODO(allenl): Experiment with different internal passing sizes. 
702 internal_passing_size = self._batch_size 703 features_windowed = input_lib.batch( 704 features, 705 batch_size=self._window_size * internal_passing_size + jitter, 706 enqueue_many=True, 707 capacity=(self._queue_capacity_multiplier 708 * internal_passing_size * self._window_size), 709 num_threads=self._num_threads) 710 raw_features_windowed = features_windowed 711 if self._jitter: 712 features_windowed = { 713 key: value[jitter:] 714 for key, value in features_windowed.items()} 715 features_windowed = { 716 key: array_ops.reshape( 717 value, 718 array_ops.concat( 719 [[internal_passing_size, self._window_size], 720 array_ops.shape(value)[1:]], 721 axis=0)) 722 for key, value in features_windowed.items()} 723 batch_and_window_shape = tensor_shape.TensorShape( 724 [internal_passing_size, self._window_size]) 725 for key in features_windowed.keys(): 726 features_windowed[key].set_shape( 727 batch_and_window_shape.concatenate( 728 raw_features_windowed[key].get_shape()[1:])) 729 # When switching files, we may end up with windows where the time is not 730 # decreasing, even if times within each file are sorted (and even if those 731 # files are visited in order, when looping back around to the beginning of 732 # the first file). This is hard for models to deal with, so we either 733 # discard such examples, creating a bias where the beginning and end of the 734 # series is under-sampled, or we sort the window, creating large gaps. 735 times = features_windowed[feature_keys.TrainEvalFeatures.TIMES] 736 if self._discard_out_of_order: 737 non_decreasing = math_ops.reduce_all( 738 times[:, 1:] >= times[:, :-1], axis=1) 739 # Ensure that no more than self._discard_limit complete batches are 740 # discarded contiguously (resetting the count when we find a single clean 741 # window). This prevents infinite looping when the dataset is smaller than 742 # the window size. 743 # TODO(allenl): Figure out a way to return informative errors from 744 # count_up_to. 
def _canonicalize_numpy_data(data, require_single_batch):
  """Validate Numpy features and give them leading [batch, time] dimensions.

  Args:
    data: A dictionary mapping feature names to Numpy arrays (anything
      accepted by `numpy.array`). `TrainEvalFeatures.TIMES` and
      `TrainEvalFeatures.VALUES` are required. Three layouts are accepted:
        Single example; `TIMES` is a scalar and `VALUES` is either a scalar or
          a vector of length [number of features].
        Sequence; `TIMES` has shape [series length] and `VALUES` has shape
          [series length] (univariate) or [series length x number of
          features] (multivariate).
        Batch of sequences; `TIMES` has shape [batch size x series length]
          and `VALUES` has shape [batch size x series length] or [batch size
          x series length x number of features].
      In every layout, the shape of each feature (including `VALUES` and any
      exogenous features) must start with the shape of `TIMES`.
    require_single_batch: If True, raises an error if the provided data has a
      batch dimension > 1.
  Returns:
    A new dictionary of Numpy arrays whose shapes are prefixed with [batch
    size x series length]. Batch and time dimensions omitted from the input
    are added with size 1, and a univariate `VALUES` given without a feature
    dimension (single example or sequence layouts) gets one of size 1.
  Raises:
    ValueError: If dimensions are incorrect or do not match, or required
      features are missing.
  """
  times_key = feature_keys.TrainEvalFeatures.TIMES
  values_key = feature_keys.TrainEvalFeatures.VALUES
  features = {}
  for name, raw_value in data.items():
    features[name] = numpy.array(raw_value)
  if not (times_key in features and values_key in features):
    raise ValueError(
        "{} and {} are required features.".format(times_key, values_key))
  times = features[times_key]
  times_rank = len(times.shape)
  # Every feature must be indexed by the same leading dimensions as the times.
  for name, array in features.items():
    if array.shape[:times_rank] == times.shape:
      continue
    raise ValueError(
        ("All features must have their shapes prefixed by the shape of the"
         " times feature. Got shape {} for feature '{}', but shape {} for"
         " '{}'").format(array.shape, name, times.shape, times_key))
  if times_rank == 0:
    # A single example: neither a batch nor a time dimension was given.
    values_rank = len(features[values_key].shape)
    if values_rank == 0:
      # Univariate; append a feature dimension holding the one feature.
      features[values_key] = features[values_key][..., None]
    elif values_rank > 1:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 1 dimension"
           " ([number of features]) since '{}' does not "
           "have a batch or time dimension, but got shape {}").format(
               values_key, times_key, features[values_key].shape))
    # Prepend size-1 batch and time dimensions to every feature.
    features = {
        name: array[None, None, ...] for name, array in features.items()}
  elif times_rank == 1:
    # A single sequence: a time dimension but no batch dimension.
    values_rank = len(features[values_key].shape)
    if values_rank == 1:
      # Univariate; append a feature dimension holding the one feature.
      features[values_key] = features[values_key][..., None]
    elif values_rank > 2:
      raise ValueError(
          ("Got an unexpected number of dimensions for the '{}' feature."
           " Was expecting at most 2 dimensions"
           " ([series length, number of features]) since '{}' does not "
           "have a batch dimension, but got shape {}").format(
               values_key, times_key, features[values_key].shape))
    # Prepend a size-1 batch dimension to every feature.
    features = {name: array[None, ...] for name, array in features.items()}
  elif times_rank != 2:
    # Anything other than [batch size, series length] is unsupported.
    raise ValueError(
        ("Got an unexpected number of dimensions for times. Was expecting at "
         "most two ([batch size, series length]), but got shape {}.").format(
             times.shape))
  # We don't expect input to be already batched when a single batch is
  # required; batching is done later.
  if require_single_batch and features[times_key].shape[0] != 1:
    raise ValueError("Got batch input, was expecting unbatched input.")
  return features