      1 # Copyright 2017 The TensorFlow Authors. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 # ==============================================================================
     15 """Defines ways of splicing and re-arranging time series.
     16 
     17 This file provides methods for reading, parsing, and re-arranging a time
     18 series. The main departure from standard TensorFlow input pipelines is a focus
     19 on "chunking" a time series, i.e. slicing it into small contiguous windows which
     20 are then batched together for training, a form of truncated
     21 backpropagation. This typically provides a significant speedup compared to
     22 looping over the whole series sequentially, by exploiting data parallelism and
     23 by reducing redundant contributions to gradients (due to redundant information
     24 in the series itself).
     25 
     26 A series, consisting of times (an increasing vector of integers) and values (one
     27 or more floating point values for each time) along with any exogenous features,
     28 is stored either in memory or on disk in various formats (e.g. "one record per
     29 timestep" on disk, or as a dictionary of Numpy arrays in memory). The location
     30 and format is specified by configuring a `TimeSeriesReader` object
     31 (e.g. `NumpyReader`, `CSVReader`), which reads the data into the TensorFlow
     32 graph. A `TimeSeriesInputFn` object (typically `RandomWindowInputFn`) then
     33 performs windowing and batching.
     34 
     35 Time series are passed through this pipeline as dictionaries mapping feature
     36 names to their values. For training and evaluation, these require at minimum
     37 `TrainEvalFeatures.TIMES` (scalar integers, one per timestep) and
     38 `TrainEvalFeatures.VALUES` (may be either univariate or multivariate). Exogenous
     39 features may have any shape, but are likewise associated with a timestep. Times
     40 themselves need not be contiguous or regular (although smaller/fewer gaps are
     41 generally better), but each timestep must have all `VALUES` and any exogenous
     42 features (i.e. times may be missing, but given that a time is specified, every
     43 other feature must also be specified for that step; some models may support
     44 making exogenous updates conditional).
     45 
     46 The expected use case of a `TimeSeriesInputFn` is that it is first configured
     47 (for example setting a batch or window size) and passed a reader (a
     48 `TimeSeriesReader` object). The `TimeSeriesInputFn` can then be passed as the
     49 input_fn of an Estimator.
     50 
     51 For example, `RandomWindowInputFn` is useful for creating batches of random
     52 chunks of a series for training:
     53 
     54 ```
     55   # Read data in the default "time,value" CSV format with no header
     56   reader = input_pipeline.CSVReader(csv_file_name)
     57   # Set up windowing and batching for training
     58   train_input_fn = input_pipeline.RandomWindowInputFn(
     59       reader, batch_size=16, window_size=16)
     60   # Fit model parameters to data
     61   estimator.train(input_fn=train_input_fn, steps=150)
     62 ```
     63 
     64 `RandomWindowInputFn` is the primary tool for training and quantitative
     65 evaluation of time series. `WholeDatasetInputFn`, which reads a whole series
     66 into memory, is useful for qualitative evaluation and preparing to make
     67 predictions with `predict_continuation_input_fn`.
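
For example, a rough sketch of evaluating over a full series and then making
out-of-sample predictions, reusing the `reader` from the example above
(`estimator` is assumed to be an already-trained time series `Estimator`):

```
  # Read the whole series into memory for evaluation and state filtering
  evaluation_input_fn = input_pipeline.WholeDatasetInputFn(reader)
  evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
  # Predict 20 steps past the end of the series
  predictions = tuple(estimator.predict(
      input_fn=input_pipeline.predict_continuation_input_fn(
          evaluation, steps=20)))
```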
     68 """
     69 
     70 from __future__ import absolute_import
     71 from __future__ import division
     72 from __future__ import print_function
     73 
     74 import abc
     75 
     76 import numpy
     77 
     78 from tensorflow.contrib.timeseries.python.timeseries import feature_keys
     79 from tensorflow.contrib.timeseries.python.timeseries import model_utils
     80 
     81 from tensorflow.python.estimator import estimator_lib
     82 from tensorflow.python.framework import constant_op
     83 from tensorflow.python.framework import dtypes
     84 from tensorflow.python.framework import ops
     85 from tensorflow.python.framework import tensor_shape
     86 from tensorflow.python.ops import array_ops
     87 from tensorflow.python.ops import control_flow_ops
     88 from tensorflow.python.ops import io_ops
     89 from tensorflow.python.ops import math_ops
     90 from tensorflow.python.ops import nn
     91 from tensorflow.python.ops import parsing_ops
     92 from tensorflow.python.ops import random_ops
     93 from tensorflow.python.ops import state_ops
     94 from tensorflow.python.ops import tensor_array_ops
     95 from tensorflow.python.ops import variable_scope
     96 from tensorflow.python.training import input as input_lib
     97 from tensorflow.python.training import training
     98 from tensorflow.python.util import nest
     99 
    100 
    101 def predict_continuation_input_fn(
    102     evaluation, steps=None, times=None, exogenous_features=None):
    103   """An Estimator input_fn for running predict() after evaluate().
    104 
     105   If the evaluate() call that produced `evaluation` used a batch_size greater
     106   than one, predictions will start after each of those windows (i.e. the
     107   output will have the same batch dimension).
    108 
    109   Args:
    110     evaluation: The dictionary returned by `Estimator.evaluate`, with keys
    111       FilteringResults.STATE_TUPLE and FilteringResults.TIMES.
    112     steps: The number of steps to predict (scalar), starting after the
    113       evaluation. If `times` is specified, `steps` must not be; one is required.
    114     times: A [batch_size x window_size] array of integers (not a Tensor)
    115       indicating times to make predictions for. These times must be after the
    116       corresponding evaluation. If `steps` is specified, `times` must not be;
    117       one is required. If the batch dimension is omitted, it is assumed to be 1.
    118     exogenous_features: Optional dictionary. If specified, indicates exogenous
    119       features for the model to use while making the predictions. Values must
    120       have shape [batch_size x window_size x ...], where `batch_size` matches
    121       the batch dimension used when creating `evaluation`, and `window_size` is
    122       either the `steps` argument or the `window_size` of the `times` argument
    123       (depending on which was specified).
    124   Returns:
    125     An `input_fn` suitable for passing to the `predict` function of a time
    126     series `Estimator`.
    127   Raises:
    128     ValueError: If `times` or `steps` are misspecified.
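
  A rough usage sketch (assuming `evaluation_input_fn` wraps a
  `WholeDatasetInputFn` as in the module docstring; the "rainfall" exogenous
  feature name is hypothetical and must match the model's configuration):

    evaluation = estimator.evaluate(input_fn=evaluation_input_fn, steps=1)
    predict_input_fn = predict_continuation_input_fn(
        evaluation, steps=10,
        exogenous_features={
            "rainfall": numpy.zeros([1, 10], dtype=numpy.float32)})
    predictions = tuple(estimator.predict(input_fn=predict_input_fn))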
    129   """
    130   if exogenous_features is None:
    131     exogenous_features = {}
    132   predict_times = model_utils.canonicalize_times_or_steps_from_output(
    133       times=times, steps=steps, previous_model_output=evaluation)
    134   features = {
    135       feature_keys.PredictionFeatures.STATE_TUPLE:
    136           evaluation[feature_keys.FilteringResults.STATE_TUPLE],
    137       feature_keys.PredictionFeatures.TIMES:
    138           predict_times
    139   }
    140   features.update(exogenous_features)
    141   def _predict_input_fn():
    142     """An input_fn for predict()."""
    143     # Prevents infinite iteration with a constant output in an Estimator's
    144     # predict().
    145     limited_features = {}
    146     for key, values in features.items():
    147       limited_values = nest.map_structure(
    148           lambda value: training.limit_epochs(value, num_epochs=1), values)
    149       limited_features[key] = limited_values
    150     return (limited_features, None)
    151   return _predict_input_fn
    152 
    153 
    154 class TimeSeriesReader(object):
    155   """Reads from and parses a data source for a `TimeSeriesInputFn`.
    156 
     157   This class provides methods that read a few records (`read`) or the full data
     158   set at once (`read_full`), and return them as dictionaries mapping feature
    159   names to feature Tensors. Please see note at the top of the file for the
    160   structure of these dictionaries. The output is generally chunked by a
    161   `TimeSeriesInputFn` before being passed to the model.
    162   """
    163 
    164   def check_dataset_size(self, minimum_dataset_size):
    165     """When possible, raises an error if the dataset is too small.
    166 
    167     This method allows TimeSeriesReaders to raise informative error messages if
    168     the user has selected a window size in their TimeSeriesInputFn which is
    169     larger than the dataset size. However, many TimeSeriesReaders will not have
    170     access to a dataset size, in which case they do not need to override this
    171     method.
    172 
    173     Args:
    174       minimum_dataset_size: The minimum number of records which should be
    175         contained in the dataset. Readers should attempt to raise an error when
    176         possible if an epoch of data contains fewer records.
    177     """
    178     pass
    179 
    180   @abc.abstractmethod
    181   def read(self):
    182     """Parses one or more records into a feature dictionary.
    183 
    184     This method is expected to be called by a `TimeSeriesInputFn` object, and is
    185     not for use with models directly.
    186 
     187     A `TimeSeriesReader` object reads multiple records at once for efficiency;
     188     the size of these batches is an implementation detail internal
    189     to the input pipeline. These records should generally be sequential,
    190     although some out-of-order records due to file wraparounds are expected and
    191     must be handled by callers.
    192 
    193     Returns:
    194       A dictionary mapping feature names to `Tensor` values, each with an
    195       arbitrary batch dimension (for efficiency) as their first dimension.
    196     """
    197     pass
    198 
    199   @abc.abstractmethod
    200   def read_full(self):
    201     """Return the full dataset.
    202 
    203     Largely for interactive use/plotting (or evaluation on small
    204     datasets). Generally not very efficient. Not recommended for training.
    205 
    206     Returns:
    207       Same return type as `read`, but with the full dataset rather than an
    208       arbitrary chunk of it. A dictionary mapping feature names to `Tensor`
    209       values, where the size of the first dimension of each `Tensor` is the
    210       number of samples in the entire dataset. These `Tensor`s should be
    211       constant across graph invocations, assuming that the underlying data
    212       remains constant. Current implementations re-read data on each graph
    213       invocation, although this may change in the future.
    214     """
    215     pass
    216 
    217 
    218 class NumpyReader(TimeSeriesReader):
    219   """A time series parser for feeding Numpy arrays to a `TimeSeriesInputFn`.
    220 
    221   Avoids embedding data in the graph as constants.
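
  For example, a univariate series could be fed as follows (a sketch; the
  arrays here are made up for illustration):

    times = numpy.arange(1000)
    values = numpy.sin(numpy.pi * times / 100.) + numpy.random.normal(
        scale=0.1, size=[1000])
    reader = NumpyReader({
        feature_keys.TrainEvalFeatures.TIMES: times,
        feature_keys.TrainEvalFeatures.VALUES: values})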
    222   """
    223 
    224   def __init__(self, data, read_num_records_hint=4096):
    225     """Numpy array input for a `TimeSeriesInputFn`.
    226 
    227     Args:
    228       data: A dictionary mapping feature names to Numpy arrays, with two
    229         possible shapes (requires keys `TrainEvalFeatures.TIMES` and
    230         `TrainEvalFeatures.VALUES`):
    231           Univariate; `TIMES` and `VALUES` are both vectors of shape [series
    232             length]
    233           Multivariate; `TIMES` is a vector of shape [series length], `VALUES`
    234             has shape [series length x number of features].
    235         In any case, `VALUES` and any exogenous features must have their shapes
    236         prefixed by the shape of the value corresponding to the `TIMES` key.
    237       read_num_records_hint: The maximum number of samples to read at one time,
    238         for efficiency.
    239     """
    240     self._features = _canonicalize_numpy_data(
    241         data, require_single_batch=True)
    242     self._read_num_records_hint = read_num_records_hint
    243 
    244   def check_dataset_size(self, minimum_dataset_size):
    245     """Raise an error if the dataset is too small."""
    246     dataset_size = self._features[feature_keys.TrainEvalFeatures.TIMES].shape[1]
    247     if dataset_size < minimum_dataset_size:
    248       raise ValueError(
    249           ("A TimeSeriesInputFn is configured to create windows of size {}, "
    250            "but only {} records were available in the dataset. Either decrease "
    251            "the window size or provide more records.").format(
    252                minimum_dataset_size, dataset_size))
    253 
    254   def read(self):
    255     """Returns a large chunk of the Numpy arrays for later re-chunking."""
    256     # Remove the batch dimension from all features
    257     features = {key: numpy.squeeze(value, axis=0)
    258                 for key, value in self._features.items()}
    259     return estimator_lib.inputs.numpy_input_fn(
    260         x=features,
    261         # The first dimensions of features are the series length, since we have
    262         # removed the batch dimension above. We now pull out
    263         # self._read_num_records_hint steps of this single time series to pass
    264         # to the TimeSeriesInputFn.
    265         batch_size=self._read_num_records_hint,
    266         num_epochs=None,
    267         shuffle=False)()
    268 
    269   def read_full(self):
    270     """Returns `Tensor` versions of the full Numpy arrays."""
    271     features = estimator_lib.inputs.numpy_input_fn(
    272         x=self._features,
    273         batch_size=1,
    274         num_epochs=None,
    275         queue_capacity=2,  # Each queue element is a full copy of the dataset
    276         shuffle=False)()
     277     # A TimeSeriesInputFn expects just one leading dimension; drop the batch dim of 1.
    278     return {feature_name: array_ops.squeeze(feature_value, axis=0)
    279             for feature_name, feature_value in features.items()}
    280 
    281 
    282 class ReaderBaseTimeSeriesParser(TimeSeriesReader):
    283   """Base for time series readers which wrap a `tf.ReaderBase`."""
    284 
    285   def __init__(self, filenames, read_num_records_hint=4096):
    286     """Configure the time series reader.
    287 
    288     Args:
    289       filenames: A string or list of strings indicating files to read records
    290         from.
    291       read_num_records_hint: When not reading a full dataset, indicates the
    292         number of records to transfer in a single chunk (for efficiency). The
    293         actual number transferred at one time may vary.
    294     """
    295     self._filenames = filenames
    296     self._read_num_records_hint = read_num_records_hint
    297 
    298   @abc.abstractmethod
    299   def _get_reader(self):
    300     """Get an instance of the tf.ReaderBase associated with this class."""
    301     pass
    302 
    303   @abc.abstractmethod
    304   def _process_records(self, lines):
    305     """Given string items, return a processed dictionary of Tensors.
    306 
    307     Args:
    308       lines: A 1-dimensional string Tensor, each representing a record to parse
    309         (source dependent, e.g. a line of a file, or a serialized protocol
    310         buffer).
    311 
    312     Returns:
    313       A dictionary mapping feature names to their values. The batch dimensions
    314       should match the length of `lines`.
    315     """
    316     pass
    317 
    318   def _get_filename_queue(self, epoch_limit):
    319     """Constructs a filename queue with an epoch limit.
    320 
    321     `epoch_limit` is intended as an error checking fallback to prevent a reader
    322     from infinitely looping in its requests for more work items if none are
    323     available in any file. It should be set high enough that it is never reached
    324     assuming at least one record exists in some file.
    325 
    326     Args:
    327       epoch_limit: The maximum number of times to read through the complete list
    328         of files before throwing an OutOfRangeError.
    329     Returns:
    330       A tuple of (filename_queue, epoch_limiter):
    331         filename_queue: A FIFOQueue with filename work items.
    332         epoch_limiter: The local variable used for epoch limitation. This should
    333           be set to zero before a reader is passed `filename_queue` in order to
    334           reset the epoch limiter's state.
    335     """
    336     epoch_limiter = variable_scope.variable(
    337         initial_value=constant_op.constant(0, dtype=dtypes.int64),
    338         name="epoch_limiter",
    339         trainable=False,
    340         collections=[ops.GraphKeys.LOCAL_VARIABLES])
    341     filenames_tensor = array_ops.reshape(
    342         ops.convert_to_tensor(self._filenames), [-1])
    343     # We can't rely on epoch_limiter being initialized, since queue runners are
    344     # started before local variables are initialized. Instead, we ignore epoch
    345     # limits before variable initialization. This means that prior to variable
    346     # initialization, a QueueRunner may cause a reader to enter an un-checked
    347     # infinite loop. However, as soon as local variables are initialized, we
    348     # will start incrementing and checking epoch_limiter, which will interrupt
    349     # any in-progress loops.
    350     conditional_count_up_to = control_flow_ops.cond(
    351         state_ops.is_variable_initialized(epoch_limiter),
    352         lambda: epoch_limiter.count_up_to(epoch_limit),
    353         lambda: constant_op.constant(0, dtype=dtypes.int64))
    354     with ops.control_dependencies([conditional_count_up_to]):
    355       filenames_tensor = array_ops.identity(filenames_tensor)
    356     filename_queue = input_lib.string_input_producer(
    357         filenames_tensor, shuffle=False, capacity=1)
    358     return filename_queue, epoch_limiter
    359 
    360   def read(self):
    361     """Reads a chunk of data from the `tf.ReaderBase` for later re-chunking."""
    362     # Assuming there is at least one item to be read among all of the files in
    363     # self._filenames, we will not need to go through more than
    364     # self._read_num_records_hint epochs to get a batch of
    365     # self._read_num_records_hint records. Setting this limit and resetting it
    366     # before each reader.read_up_to call prevents infinite looping when there
    367     # are no records available in any of the files.
    368     filename_queue, epoch_limiter = self._get_filename_queue(
    369         epoch_limit=self._read_num_records_hint)
    370     reader = self._get_reader()
    371     epoch_reset_op = state_ops.assign(epoch_limiter, 0)
    372     with ops.control_dependencies([epoch_reset_op]):
    373       _, records = reader.read_up_to(
    374           filename_queue, self._read_num_records_hint)
    375     return self._process_records(records)
    376 
    377   def read_full(self):
    378     """Reads a full epoch of data into memory."""
    379     reader = self._get_reader()
    380     # Set a hard limit of 2 epochs through self._filenames. If there are any
    381     # records available, we should only end up reading the first record in the
    382     # second epoch before exiting the while loop and subsequently resetting the
    383     # epoch limit. If there are no records available in any of the files, this
    384     # hard limit prevents the reader.read_up_to call from looping infinitely.
    385     filename_queue, epoch_limiter = self._get_filename_queue(epoch_limit=2)
    386     epoch_reset_op = state_ops.assign(epoch_limiter, 0)
    387     with ops.control_dependencies([epoch_reset_op]):
    388       first_key, first_value = reader.read_up_to(filename_queue, 1)
    389     # Read until we get a duplicate key (one epoch)
    390     def _while_condition(
    391         current_key, current_value, current_index, collected_records):
    392       del current_value, current_index, collected_records  # unused
    393       return math_ops.not_equal(array_ops.squeeze(current_key, axis=0),
    394                                 array_ops.squeeze(first_key, axis=0))
    395 
    396     def _while_body(
    397         current_key, current_value, current_index, collected_records):
    398       del current_key  # unused
    399       new_key, new_value = reader.read_up_to(filename_queue, 1)
    400       new_key.set_shape([1])
    401       new_value.set_shape([1])
    402       return (new_key,
    403               new_value,
    404               current_index + 1,
    405               collected_records.write(current_index, current_value))
    406     _, _, _, records_ta = control_flow_ops.while_loop(
    407         _while_condition,
    408         _while_body,
    409         [constant_op.constant([""]), first_value,
    410          0,  # current_index starting value
    411          tensor_array_ops.TensorArray(  # collected_records
    412              dtype=dtypes.string, size=0, dynamic_size=True)])
    413     records = records_ta.concat()
    414     # Reset the reader when we're done so that subsequent requests for data get
    415     # the dataset in the proper order.
    416     with ops.control_dependencies([records]):
    417       reader_reset_op = reader.reset()
    418     with ops.control_dependencies([reader_reset_op]):
    419       records = array_ops.identity(records)
    420     return self._process_records(records)
    421 
    422 
    423 class CSVReader(ReaderBaseTimeSeriesParser):
    424   """Reads from a collection of CSV-formatted files."""
    425 
    426   def __init__(self,
    427                filenames,
    428                column_names=(feature_keys.TrainEvalFeatures.TIMES,
    429                              feature_keys.TrainEvalFeatures.VALUES),
    430                column_dtypes=None,
    431                skip_header_lines=None,
    432                read_num_records_hint=4096):
    433     """CSV-parsing reader for a `TimeSeriesInputFn`.
    434 
    435     Args:
    436       filenames: A filename or list of filenames to read the time series
    437           from. Each line must have columns corresponding to `column_names`.
    438       column_names: A list indicating names for each
    439           feature. `TrainEvalFeatures.TIMES` and `TrainEvalFeatures.VALUES` are
    440           required; `VALUES` may be repeated to indicate a multivariate series.
    441       column_dtypes: If provided, must be a list with the same length as
    442           `column_names`, indicating dtypes for each column. Defaults to
    443           `tf.int64` for `TrainEvalFeatures.TIMES` and `tf.float32` for
    444           everything else.
    445       skip_header_lines: Passed on to `tf.TextLineReader`; skips this number of
    446           lines at the beginning of each file.
    447       read_num_records_hint: When not reading a full dataset, indicates the
    448           number of records to parse/transfer in a single chunk (for
    449           efficiency). The actual number transferred at one time may be more or
    450           less.
    451     Raises:
    452       ValueError: If required column names are not specified, or if lengths do
    453         not match.
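
    For example, a multivariate series stored as "time,value1,value2" lines
    could be read by repeating the `VALUES` column name (a sketch; the
    `csv_file_name` path is a placeholder):

      reader = CSVReader(
          csv_file_name,
          column_names=(feature_keys.TrainEvalFeatures.TIMES,
                        feature_keys.TrainEvalFeatures.VALUES,
                        feature_keys.TrainEvalFeatures.VALUES))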
    454     """
    455     if feature_keys.TrainEvalFeatures.TIMES not in column_names:
    456       raise ValueError("'{}' is a required column.".format(
    457           feature_keys.TrainEvalFeatures.TIMES))
    458     if feature_keys.TrainEvalFeatures.VALUES not in column_names:
    459       raise ValueError("'{}' is a required column.".format(
    460           feature_keys.TrainEvalFeatures.VALUES))
    461     if column_dtypes is not None and len(column_dtypes) != len(column_names):
    462       raise ValueError(
    463           ("If specified, the length of column_dtypes must match the length of "
    464            "column_names (got column_dtypes={} and column_names={}).").format(
    465                column_dtypes, column_names))
    466     if sum(1 for column_name in column_names
    467            if column_name == feature_keys.TrainEvalFeatures.TIMES) != 1:
    468       raise ValueError(
    469           "Got more than one times column ('{}'), but exactly "
    470           "one is required.".format(feature_keys.TrainEvalFeatures.TIMES))
    471     self._column_names = column_names
    472     self._column_dtypes = column_dtypes
    473     self._skip_header_lines = skip_header_lines
    474     super(CSVReader, self).__init__(
    475         filenames=filenames, read_num_records_hint=read_num_records_hint)
    476 
    477   def _get_reader(self):
    478     return io_ops.TextLineReader(skip_header_lines=self._skip_header_lines)
    479 
    480   def _process_records(self, lines):
    481     """Parse `lines` as CSV records."""
    482     if self._column_dtypes is None:
    483       default_values = [(array_ops.zeros([], dtypes.int64),)
    484                         if column_name == feature_keys.TrainEvalFeatures.TIMES
    485                         else () for column_name in self._column_names]
    486     else:
    487       default_values = [(array_ops.zeros([], dtype),)
    488                         for dtype in self._column_dtypes]
    489     columns = parsing_ops.decode_csv(lines, default_values)
    490     features_lists = {}
    491     for column_name, value in zip(self._column_names, columns):
    492       features_lists.setdefault(column_name, []).append(value)
    493     features = {}
    494     for column_name, values in features_lists.items():
    495       if (len(values) == 1 and
    496           column_name != feature_keys.TrainEvalFeatures.VALUES):
    497         features[column_name] = values[0]
    498       else:
    499         features[column_name] = array_ops.stack(values, axis=1)
    500     return features
    501 
    502 
    503 class TFExampleReader(ReaderBaseTimeSeriesParser):
    504   """Reads and parses `tf.Example`s from a TFRecords file."""
    505 
    506   def __init__(self,
    507                filenames,
    508                features):
    509     """Configure `tf.Example` parsing.
    510 
    511     Args:
     512       filenames: A filename or list of filenames to read the time series
     513           from. Each record must parse as a `tf.Example` with the given `features`.
    514       features: A dictionary mapping from feature keys to `tf.FixedLenFeature`
    515           objects. Must include `TrainEvalFeatures.TIMES` (scalar integer) and
    516           `TrainEvalFeatures.VALUES` (floating point vector) features.
    517     Raises:
    518       ValueError: If required times/values features are not present.
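
    As a rough sketch, a univariate series stored in TFRecord files of
    serialized `tf.Example` protos might be configured like this (the filename
    and feature shapes are illustrative only):

      reader = TFExampleReader(
          filenames="series.tfrecord",
          features={
              feature_keys.TrainEvalFeatures.TIMES:
                  tf.FixedLenFeature([], tf.int64),
              feature_keys.TrainEvalFeatures.VALUES:
                  tf.FixedLenFeature([1], tf.float32)})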
    519     """
    520     if feature_keys.TrainEvalFeatures.TIMES not in features:
    521       raise ValueError("'{}' is a required column.".format(
    522           feature_keys.TrainEvalFeatures.TIMES))
    523     if feature_keys.TrainEvalFeatures.VALUES not in features:
    524       raise ValueError("'{}' is a required column.".format(
    525           feature_keys.TrainEvalFeatures.VALUES))
    526     self._features = features
    527     super(TFExampleReader, self).__init__(filenames=filenames)
    528 
    529   def _get_reader(self):
    530     return io_ops.TFRecordReader()
    531 
    532   def _process_records(self, examples):
    533     """Parse `tf.Example`s into `Tensors`."""
    534     return parsing_ops.parse_example(
    535         serialized=examples, features=self._features)
    536 
    537 
    538 class TimeSeriesInputFn(object):
    539   """Base for classes which create batches of windows from a time series."""
    540 
    541   @abc.abstractmethod
    542   def create_batch(self):
    543     """Creates chunked Tensors from times, values, and other features.
    544 
    545     Suitable for use as the input_fn argument of a tf.estimator.Estimator's
     546     train() or evaluate() method.
    547 
    548     Returns:
    549       A tuple of (features, targets):
    550         features: A dictionary with `TrainEvalFeatures.TIMES` and
    551           `TrainEvalFeatures.VALUES` as keys, `TIMES` having an associated value
    552           with shape [batch size x window length], `VALUES` with shape [batch
    553           size x window length x number of features]. Any other features will
    554           also have shapes prefixed with [batch size x window length].
    555         targets: Not used, but must have a value for compatibility with the
    556           Estimator API. That value should be None.
    557     """
    558     pass
    559 
    560   def __call__(self):
    561     # Allow a TimeSeriesInputFn to be used as an input function directly
    562     return self.create_batch()
    563 
    564 
    565 class WholeDatasetInputFn(TimeSeriesInputFn):
    566   """Supports passing a full time series to a model for evaluation/inference.
    567 
    568   Note that this `TimeSeriesInputFn` is not designed for high throughput, and
    569   should not be used for training. It allows for sequential evaluation on a full
    570   dataset (with sequential in-sample predictions), which then feeds naturally
    571   into `predict_continuation_input_fn` for making out-of-sample
    572   predictions. While this is useful for plotting and interactive use,
    573   `RandomWindowInputFn` is better suited to training and quantitative
    574   evaluation.
    575   """
    576   # TODO(allenl): A SequentialWindowInputFn for getting model end state without
    577   # loading the whole dataset into memory (or for quantitative evaluation of
    578   # sequential models). Note that an Estimator using such a TimeSeriesInputFn
    579   # won't return in-sample predictions for the whole dataset, which means it
    580   # won't be terribly useful for interactive use/plotting (unless the user
    581   # passes in concat metrics). Also need to be careful about state saving for
    582   # sequential models, particularly the gaps between chunks.
    583 
    584   def __init__(self, time_series_reader):
    585     """Initialize the `TimeSeriesInputFn`.
    586 
    587     Args:
    588       time_series_reader: A TimeSeriesReader object.
    589     """
    590     self._reader = time_series_reader
    591     super(WholeDatasetInputFn, self).__init__()
    592 
    593   def create_batch(self):
    594     """A suitable `input_fn` for an `Estimator`'s `evaluate()`.
    595 
    596     Returns:
     597       A (features, None) tuple, where features maps feature names to `Tensors`
     598       whose shapes are prefixed by [1, data set size] (i.e. a batch size of 1).
    599     """
    600     features = self._reader.read_full()
    601     # Add a batch dimension of one to each feature.
    602     return ({feature_name: feature_value[None, ...]
    603              for feature_name, feature_value in features.items()},
    604             None)
    605 
    606 
    607 class RandomWindowInputFn(TimeSeriesInputFn):
    608   """Wraps a `TimeSeriesReader` to create random batches of windows.
    609 
    610   Tensors are first collected into sequential windows (in a windowing queue
    611   created by `tf.train.batch`, based on the order returned from
    612   `time_series_reader`), then these windows are randomly batched (in a
    613   `RandomShuffleQueue`), the Tensors returned by `create_batch` having shapes
    614   prefixed by [`batch_size`, `window_size`].
    615 
    616   This `TimeSeriesInputFn` is useful for both training and quantitative
    617   evaluation (but be sure to run several epochs for sequential models such as
    618   `StructuralEnsembleRegressor` to completely flush stale state left over from
    619   training). For qualitative evaluation or when preparing for predictions, use
    620   `WholeDatasetInputFn`.
    621   """
    622 
    623   def __init__(
    624       self, time_series_reader, window_size, batch_size,
    625       queue_capacity_multiplier=1000, shuffle_min_after_dequeue_multiplier=2,
    626       discard_out_of_order=True, discard_consecutive_batches_limit=1000,
    627       jitter=True, num_threads=2, shuffle_seed=None):
    628     """Configure the RandomWindowInputFn.
    629 
    630     Args:
    631       time_series_reader: A TimeSeriesReader object.
    632       window_size: The number of examples to keep together sequentially. This
    633         controls the length of truncated backpropagation: smaller values mean
    634         less sequential computation, which can lead to faster training, but
    635         create a coarser approximation to the gradient (which would ideally be
    636         computed by a forward pass over the entire sequence in order).
    637       batch_size: The number of windows to place together in a batch. Larger
    638         values will lead to more stable gradients during training.
    639       queue_capacity_multiplier: The capacity for the queues used to create
    640         batches, specified as a multiple of `batch_size` (for
    641         RandomShuffleQueue) and `batch_size * window_size` (for the
    642         FIFOQueue). Controls the maximum number of windows stored. Should be
    643         greater than `shuffle_min_after_dequeue_multiplier`.
    644       shuffle_min_after_dequeue_multiplier: The minimum number of windows in the
    645         RandomShuffleQueue after a dequeue, which controls the amount of entropy
    646         introduced during batching. Specified as a multiple of `batch_size`.
    647       discard_out_of_order: If True, windows of data which have times which
    648         decrease (a higher time followed by a lower time) are discarded. If
    649         False, the window and associated features are instead sorted so that
    650         times are non-decreasing. Discarding is typically faster, as models do
    651         not have to deal with artificial gaps in the data. However, discarding
    652         does create a bias where the beginnings and endings of files are
    653         under-sampled.
    654       discard_consecutive_batches_limit: Raise an OutOfRangeError if more than
    655         this number of batches are discarded without a single non-discarded
    656         window (prevents infinite looping when the dataset is too small).
    657       jitter: If True, randomly discards examples between some windows in order
    658         to avoid deterministic chunking patterns. This is important for models
    659         like AR which may otherwise overfit a fixed chunking.
    660       num_threads: Use this number of threads for queues. Setting a value of 1
    661         removes one source of non-determinism (and in combination with
    662         shuffle_seed should provide deterministic windowing).
    663       shuffle_seed: A seed for window shuffling. The default value of None
    664         provides random behavior. With `shuffle_seed` set and
    665         `num_threads=1`, provides deterministic behavior.
    666     """
    667     self._reader = time_series_reader
    668     self._window_size = window_size
    669     self._reader.check_dataset_size(minimum_dataset_size=self._window_size)
    670     self._batch_size = batch_size
    671     self._queue_capacity_multiplier = queue_capacity_multiplier
    672     self._shuffle_min_after_dequeue_multiplier = (
    673         shuffle_min_after_dequeue_multiplier)
    674     self._discard_out_of_order = discard_out_of_order
    675     self._discard_limit = discard_consecutive_batches_limit
    676     self._jitter = jitter
    677     if num_threads is None:
    678       self._num_threads = self._batch_size
    679     else:
    680       self._num_threads = num_threads
    681     self._shuffle_seed = shuffle_seed
    682     super(RandomWindowInputFn, self).__init__()
    683 
    684   def create_batch(self):
    685     """Create queues to window and batch time series data.
    686 
    687     Returns:
     688       A (features, None) tuple; features is a dictionary of Tensors corresponding
     689       to the output of `self._reader` (from the `time_series_reader` constructor
     690       argument), each with shapes prefixed by [`batch_size`, `window_size`].
    691     """
    692     features = self._reader.read()
    693     if self._jitter:
    694       # TODO(agarwal, allenl): Figure out if more jitter is needed here.
    695       jitter = random_ops.random_uniform(shape=[], maxval=2, dtype=dtypes.int32)
    696     else:
    697       jitter = 0
    698     # To keep things efficient, we pass from the windowing batcher to the
    699     # batch-of-windows batcher in batches. This avoids the need for huge numbers
    700     # of threads, but does mean that jitter is only applied occasionally.
    701     # TODO(allenl): Experiment with different internal passing sizes.
    702     internal_passing_size = self._batch_size
    703     features_windowed = input_lib.batch(
    704         features,
    705         batch_size=self._window_size * internal_passing_size + jitter,
    706         enqueue_many=True,
    707         capacity=(self._queue_capacity_multiplier
    708                   * internal_passing_size * self._window_size),
    709         num_threads=self._num_threads)
    710     raw_features_windowed = features_windowed
    711     if self._jitter:
    712       features_windowed = {
    713           key: value[jitter:]
    714           for key, value in features_windowed.items()}
    715     features_windowed = {
    716         key: array_ops.reshape(
    717             value,
    718             array_ops.concat(
    719                 [[internal_passing_size, self._window_size],
    720                  array_ops.shape(value)[1:]],
    721                 axis=0))
    722         for key, value in features_windowed.items()}
    723     batch_and_window_shape = tensor_shape.TensorShape(
    724         [internal_passing_size, self._window_size])
    725     for key in features_windowed.keys():
    726       features_windowed[key].set_shape(
    727           batch_and_window_shape.concatenate(
    728               raw_features_windowed[key].get_shape()[1:]))
     729     # When switching files, we may end up with windows in which times decrease,
     730     # even if the times within each file are sorted (and even if those
    731     # files are visited in order, when looping back around to the beginning of
    732     # the first file). This is hard for models to deal with, so we either
    733     # discard such examples, creating a bias where the beginning and end of the
    734     # series is under-sampled, or we sort the window, creating large gaps.
    735     times = features_windowed[feature_keys.TrainEvalFeatures.TIMES]
    736     if self._discard_out_of_order:
    737       non_decreasing = math_ops.reduce_all(
    738           times[:, 1:] >= times[:, :-1], axis=1)
    739       # Ensure that no more than self._discard_limit complete batches are
    740       # discarded contiguously (resetting the count when we find a single clean
    741       # window). This prevents infinite looping when the dataset is smaller than
    742       # the window size.
    743       # TODO(allenl): Figure out a way to return informative errors from
    744       # count_up_to.
    745       discarded_windows_limiter = variable_scope.variable(
    746           initial_value=constant_op.constant(0, dtype=dtypes.int64),
    747           name="discarded_windows_limiter",
    748           trainable=False,
    749           collections=[ops.GraphKeys.LOCAL_VARIABLES])
    750       def _initialized_limit_check():
    751         return control_flow_ops.cond(
    752             math_ops.reduce_any(non_decreasing),
    753             lambda: state_ops.assign(discarded_windows_limiter, 0),
    754             lambda: discarded_windows_limiter.count_up_to(self._discard_limit))
    755       discard_limit_op = control_flow_ops.cond(
    756           state_ops.is_variable_initialized(discarded_windows_limiter),
    757           _initialized_limit_check,
    758           lambda: constant_op.constant(0, dtype=dtypes.int64))
    759       with ops.control_dependencies([discard_limit_op]):
    760         non_decreasing = array_ops.identity(non_decreasing)
    761     else:
    762       _, indices_descending = nn.top_k(
    763           times, k=array_ops.shape(times)[-1], sorted=True)
    764       indices = array_ops.reverse(indices_descending, axis=[0])
    765       features_windowed = {
    766           key: array_ops.gather(params=value, indices=indices)
    767           for key, value in features_windowed.items()
    768       }
    769       non_decreasing = True
    770     features_batched = input_lib.maybe_shuffle_batch(
    771         features_windowed,
    772         num_threads=self._num_threads,
    773         seed=self._shuffle_seed,
    774         batch_size=self._batch_size,
    775         capacity=self._queue_capacity_multiplier * self._batch_size,
    776         min_after_dequeue=(self._shuffle_min_after_dequeue_multiplier *
    777                            self._batch_size),
    778         keep_input=non_decreasing,
    779         enqueue_many=True)
    780     return (features_batched, None)
    781 
    782 
    783 def _canonicalize_numpy_data(data, require_single_batch):
    784   """Do basic checking and reshaping for Numpy data.
    785 
    786   Args:
    787     data: A dictionary mapping keys to Numpy arrays, with several possible
    788       shapes (requires keys `TrainEvalFeatures.TIMES` and
    789       `TrainEvalFeatures.VALUES`):
    790         Single example; `TIMES` is a scalar and `VALUES` is either a scalar or a
    791           vector of length [number of features].
    792         Sequence; `TIMES` is a vector of shape [series length], `VALUES` either
    793           has shape [series length] (univariate) or [series length x number of
    794           features] (multivariate).
    795         Batch of sequences; `TIMES` is a vector of shape [batch size x series
    796           length], `VALUES` has shape [batch size x series length] or [batch
    797           size x series length x number of features].
    798       In any case, `VALUES` and any exogenous features must have their shapes
    799       prefixed by the shape of the value corresponding to the `TIMES` key.
    800     require_single_batch: If True, raises an error if the provided data has a
    801       batch dimension > 1.
    802   Returns:
    803     A dictionary with features normalized to have shapes prefixed with [batch
    804     size x series length]. The sizes of dimensions which were omitted in the
    805     inputs are 1.
    806   Raises:
    807     ValueError: If dimensions are incorrect or do not match, or required
    808       features are missing.
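
  For example (a sketch of the shape normalization described above):

    # TIMES has shape [4] and VALUES has shape [4] on input. The canonicalized
    # output has TIMES with shape [1, 4] and VALUES with shape [1, 4, 1]: a
    # trivial batch dimension, plus a feature dimension for VALUES.
    _canonicalize_numpy_data(
        {feature_keys.TrainEvalFeatures.TIMES: numpy.array([0, 1, 2, 3]),
         feature_keys.TrainEvalFeatures.VALUES: numpy.array([1., 2., 3., 4.])},
        require_single_batch=True)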
    809   """
    810   features = {key: numpy.array(value) for key, value in data.items()}
    811   if (feature_keys.TrainEvalFeatures.TIMES not in features or
    812       feature_keys.TrainEvalFeatures.VALUES not in features):
    813     raise ValueError("{} and {} are required features.".format(
    814         feature_keys.TrainEvalFeatures.TIMES,
    815         feature_keys.TrainEvalFeatures.VALUES))
    816   times = features[feature_keys.TrainEvalFeatures.TIMES]
    817   for key, value in features.items():
    818     if value.shape[:len(times.shape)] != times.shape:
    819       raise ValueError(
    820           ("All features must have their shapes prefixed by the shape of the"
    821            " times feature. Got shape {} for feature '{}', but shape {} for"
    822            " '{}'").format(value.shape, key, times.shape,
    823                            feature_keys.TrainEvalFeatures.TIMES))
    824   if not times.shape:  # a single example
    825     if not features[feature_keys.TrainEvalFeatures.VALUES].shape:  # univariate
    826       # Add a feature dimension (with one feature)
    827       features[feature_keys.TrainEvalFeatures.VALUES] = features[
    828           feature_keys.TrainEvalFeatures.VALUES][..., None]
    829     elif len(features[feature_keys.TrainEvalFeatures.VALUES].shape) > 1:
    830       raise ValueError(
    831           ("Got an unexpected number of dimensions for the '{}' feature."
    832            " Was expecting at most 1 dimension"
    833            " ([number of features]) since '{}' does not "
    834            "have a batch or time dimension, but got shape {}").format(
    835                feature_keys.TrainEvalFeatures.VALUES,
    836                feature_keys.TrainEvalFeatures.TIMES,
    837                features[feature_keys.TrainEvalFeatures.VALUES].shape))
    838     # Add trivial batch and time dimensions for every feature
    839     features = {key: value[None, None, ...] for key, value in features.items()}
    840   if len(times.shape) == 1:  # shape [series length]
    841     if len(features[feature_keys.TrainEvalFeatures.VALUES]
    842            .shape) == 1:  # shape [series length]
    843       # Add a feature dimension (with one feature)
    844       features[feature_keys.TrainEvalFeatures.VALUES] = features[
    845           feature_keys.TrainEvalFeatures.VALUES][..., None]
    846     elif len(features[feature_keys.TrainEvalFeatures.VALUES].shape) > 2:
    847       raise ValueError(
    848           ("Got an unexpected number of dimensions for the '{}' feature."
    849            " Was expecting at most 2 dimensions"
    850            " ([series length, number of features]) since '{}' does not "
    851            "have a batch dimension, but got shape {}").format(
    852                feature_keys.TrainEvalFeatures.VALUES,
    853                feature_keys.TrainEvalFeatures.TIMES,
    854                features[feature_keys.TrainEvalFeatures.VALUES].shape))
    855     # Add trivial batch dimensions for every feature
    856     features = {key: value[None, ...] for key, value in features.items()}
    857   elif len(features[feature_keys.TrainEvalFeatures.TIMES]
    858            .shape) != 2:  # shape [batch size, series length]
    859     raise ValueError(
    860         ("Got an unexpected number of dimensions for times. Was expecting at "
    861          "most two ([batch size, series length]), but got shape {}.").format(
    862              times.shape))
    863   if require_single_batch:
    864     # We don't expect input to be already batched; batching is done later
    865     if features[feature_keys.TrainEvalFeatures.TIMES].shape[0] != 1:
    866       raise ValueError("Got batch input, was expecting unbatched input.")
    867   return features
    868