Home | History | Annotate | Download | only in slim
      1 # Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 #
      3 # Licensed under the Apache License, Version 2.0 (the "License");
      4 # you may not use this file except in compliance with the License.
      5 # You may obtain a copy of the License at
      6 #
      7 #     http://www.apache.org/licenses/LICENSE-2.0
      8 #
      9 # Unless required by applicable law or agreed to in writing, software
     10 # distributed under the License is distributed on an "AS IS" BASIS,
     11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 # See the License for the specific language governing permissions and
     13 # limitations under the License.
     14 # ==============================================================================
     15 """Contains a helper context for running queue runners.
     16 
     17 @@NestedQueueRunnerError
     18 @@QueueRunners
     19 """
     20 
     21 from __future__ import absolute_import
     22 from __future__ import division
     23 from __future__ import print_function
     24 
     25 from contextlib import contextmanager
     26 import threading
     27 
     28 from tensorflow.python.framework import ops
     29 from tensorflow.python.training import coordinator
     30 
# Names exported by `from <this module> import *`.
__all__ = [
    'NestedQueueRunnerError',
    'QueueRunners',
]

# Module-wide guard used by `QueueRunners` to detect nesting. It is acquired
# without blocking when a context is entered and released when it exits, so at
# most one `QueueRunners` context can be active at a time. Because the lock is
# shared by the whole module (not per thread or per session), a second context
# opened from a different thread is also reported as nested.
_queue_runner_lock = threading.Lock()
     37 
     38 
     39 class NestedQueueRunnerError(Exception):
     40   pass
     41 
     42 
     43 @contextmanager
     44 def QueueRunners(session):
     45   """Creates a context manager that handles starting and stopping queue runners.
     46 
     47   Args:
     48     session: the currently running session.
     49 
     50   Yields:
     51     a context in which queues are run.
     52 
     53   Raises:
     54     NestedQueueRunnerError: if a QueueRunners context is nested within another.
     55   """
     56   if not _queue_runner_lock.acquire(False):
     57     raise NestedQueueRunnerError('QueueRunners cannot be nested')
     58 
     59   coord = coordinator.Coordinator()
     60   threads = []
     61   for qr in ops.get_collection(ops.GraphKeys.QUEUE_RUNNERS):
     62     threads.extend(
     63         qr.create_threads(
     64             session, coord=coord, daemon=True, start=True))
     65   try:
     66     yield
     67   finally:
     68     coord.request_stop()
     69     try:
     70       coord.join(threads, stop_grace_period_secs=120)
     71     except RuntimeError:
     72       session.close()
     73 
     74     _queue_runner_lock.release()
     75