# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Implements a simple prefetch_queue."""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

from tensorflow.python.framework import ops
from tensorflow.python.ops import data_flow_ops
from tensorflow.python.ops import math_ops
from tensorflow.python.summary import summary
from tensorflow.python.training import queue_runner


def _which_queue(dynamic_pad):
  # Dynamically padded inputs require a PaddingFIFOQueue; fixed-shape inputs
  # can use a plain FIFOQueue.
  return (data_flow_ops.PaddingFIFOQueue if dynamic_pad
          else data_flow_ops.FIFOQueue)


def prefetch_queue(tensors,
                   capacity=8,
                   num_threads=1,
                   dynamic_pad=False,
                   shared_name=None,
                   name=None):
  """Creates a queue to prefetch tensors from `tensors`.

  A queue runner for enqueuing tensors into the prefetch queue is
  automatically added to the TF QueueRunners collection, so it is started
  together with the rest of the input pipeline by
  `tf.train.start_queue_runners()`.

  Example:
  This is useful, for example, to pre-assemble input batches read with
  `tf.train.batch()` and enqueue the pre-assembled batches.  Ops that dequeue
  from the pre-assembled queue do not pay the cost of assembling the batch.

  images, labels = tf.train.batch([image, label], batch_size=32, num_threads=4)
  batch_queue = prefetch_queue([images, labels])
  images, labels = batch_queue.dequeue()
  logits = Net(images)
  loss = Loss(logits, labels)
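
  `tensors` can also be a dictionary, in which case each dequeued element is
  a dictionary with the same keys.  A minimal sketch of the dictionary form
  (`images` and `labels` are placeholders for real input tensors):

  batch_queue = prefetch_queue({'images': images, 'labels': labels})
  batch = batch_queue.dequeue()
  logits = Net(batch['images'])
  loss = Loss(logits, batch['labels'])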

  Args:
    tensors: A list or dictionary of `Tensors` to enqueue in the buffer.
    capacity: An integer. The maximum number of elements in the queue.
    num_threads: An integer. The number of threads running the enqueue op.
    dynamic_pad: Boolean. Whether to allow variable dimensions in the input
      shapes.
    shared_name: (Optional) If set, this queue will be shared under the given
      name across multiple sessions.
    name: (Optional) A name for the operations.

  Returns:
    A queue from which you can dequeue tensors with the same type and shape
    as `tensors`.
  """
  if isinstance(tensors, dict):
    # Sort the keys so the element order is consistent across runs.  sorted()
    # and the list comprehension both return fresh lists, so no extra list()
    # wrapping of the Python 3 dict views is needed.
    names = sorted(tensors.keys())
    tensor_list = [tensors[n] for n in names]
  else:
    names = None
    tensor_list = tensors

  with ops.name_scope(name, "prefetch_queue", tensor_list) as name:
    dtypes = [t.dtype for t in tensor_list]
    shapes = [t.get_shape() for t in tensor_list]
    queue = _which_queue(dynamic_pad)(
        capacity=capacity,
        dtypes=dtypes,
        shapes=shapes,
        names=names,
        shared_name=shared_name)
    # For dictionary inputs the queue was created with `names`, so it accepts
    # the dictionary form of `tensors` directly.
    enqueue_op = queue.enqueue(tensors)
    # Register num_threads copies of the enqueue op; tf.train's queue-runner
    # machinery runs one thread per op.
    queue_runner.add_queue_runner(
        queue_runner.QueueRunner(queue, [enqueue_op] * num_threads))
    # Report how full the queue is as a fraction of its capacity.
    summary.scalar("fraction_of_%d_full" % capacity,
                   math_ops.to_float(queue.size()) * (1. / capacity))
    return queue
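

# A minimal usage sketch, assuming the standard tf.train queue-runner loop
# (`image`, `label`, `Net`, and `Loss` are illustrative placeholders, not
# part of this module):
#
#   images, labels = tf.train.batch([image, label], batch_size=32)
#   batch_queue = prefetch_queue([images, labels])
#   images, labels = batch_queue.dequeue()
#   loss = Loss(Net(images), labels)
#   with tf.Session() as sess:
#     coord = tf.train.Coordinator()
#     threads = tf.train.start_queue_runners(sess=sess, coord=coord)
#     try:
#       while not coord.should_stop():
#         sess.run(loss)
#     except tf.errors.OutOfRangeError:
#       pass
#     finally:
#       coord.request_stop()
#       coord.join(threads)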