/* Copyright 2015 The TensorFlow Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
==============================================================================*/

#ifndef TENSORFLOW_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_
#define TENSORFLOW_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_

#include <deque>
#include <vector>
#include "tensorflow/core/framework/log_memory.h"
#include "tensorflow/core/framework/tensor.h"
#include "tensorflow/core/framework/tensor_reference.h"
#include "tensorflow/core/lib/core/notification.h"
#include "tensorflow/core/lib/core/threadpool.h"
#include "tensorflow/core/lib/gtl/inlined_vector.h"
#include "tensorflow/core/platform/mutex.h"
#include "tensorflow/core/platform/stream_executor.h"
#include "tensorflow/core/platform/thread_annotations.h"
#include "tensorflow/core/platform/types.h"

// Forward declarations for the StreamExecutor types referenced by the
// declarations below.
namespace perftools {
namespace gputools {
class Event;
class Stream;
class StreamExecutor;
}  // namespace gputools
}  // namespace perftools

namespace tensorflow {

// Forward declaration; the full definition is only needed by event_mgr.cc.
class GPUOptions;

// An object to keep track of pending Events in the StreamExecutor streams
// and associated Tensors that cannot safely be deleted until the associated
// Events are recorded.
//
// Thread-safety: all mutable state is guarded by mu_; the public entry
// points take the lock themselves, while the private Queue*/Poll* helpers
// require the caller to already hold it (see the lock annotations).
class EventMgr {
 public:
  // "se" is the StreamExecutor whose streams' events this manager tracks;
  // "gpu_options" supplies the polling/threshold configuration read into
  // the const members below.  (Defined in the .cc file.)
  EventMgr(perftools::gputools::StreamExecutor* se,
           const GPUOptions& gpu_options);

  ~EventMgr();

  // Releases the references on the elements of "tensors" as soon as
  // all events currently enqueued on "stream" have completed.
  void ThenDeleteTensors(perftools::gputools::Stream* stream,
                         const TensorReferenceVector& tensors);

  // Describes one raw allocation to be returned to its allocator once the
  // associated event records.
  struct BufRec {
    Allocator* alloc;  // Allocator that owns "buf".
    void* buf;         // Raw buffer passed to alloc->DeallocateRaw().
    // operation and step_id are only populated when
    // LogMemory::IsEnabled() is true.
    string operation;
    int64 step_id;
  };

  // Takes ownership of *bufrec.buf and calls bufrec.alloc->DeallocateRaw()
  // on it as soon as all events currently enqueued on *stream have completed.
  inline void ThenDeleteBuffer(perftools::gputools::Stream* stream,
                               BufRec bufrec) {
    ToFreeVector to_free;
    {
      mutex_lock l(mu_);
      QueueBuffer(stream, bufrec);
      // Opportunistically retire any events that have already recorded
      // (false => this is not the dedicated polling thread).
      PollEvents(false, &to_free);
    }
    // FreeMemory is deliberately called after mu_ is released, apparently
    // so deallocation and threadpool scheduling happen outside the lock.
    FreeMemory(to_free);
  }

  // Runs "func" (on threadpool_) as soon as all events currently enqueued
  // on *stream have completed.
  inline void ThenExecute(perftools::gputools::Stream* stream,
                          std::function<void()> func) {
    ToFreeVector to_free;
    {
      mutex_lock l(mu_);
      QueueFunc(stream, std::move(func));
      // Same opportunistic sweep as in ThenDeleteBuffer().
      PollEvents(false, &to_free);
    }
    // Cleanup outside the lock, as in ThenDeleteBuffer().
    FreeMemory(to_free);
  }

 private:
  friend class TEST_EventMgrHelper;
  // StreamExecutor whose events are created/polled; never reseated.
  perftools::gputools::StreamExecutor* const exec_;
  // Configuration captured from GPUOptions at construction.
  const int64 deferred_bytes_threshold_;
  const int32 polling_active_delay_usecs_;
  const int32 polling_inactive_delay_msecs_;
  mutex mu_;
  condition_variable events_pending_ GUARDED_BY(mu_);

  // Queues an event for any tensors accumulated so far (used with the
  // accumulated_* members below).  Caller must hold mu_.
  void FlushAccumulatedTensors() EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // One pending event plus whichever payload(s) it protects.  Exactly which
  // of mem/bufrec/func is populated depends on which Queue* helper built it.
  struct InUse {
    perftools::gputools::Event* event;  // Event that must record first.
    TensorReferenceVector* mem;         // Tensor refs to Unref, or nullptr.
    BufRec bufrec;                      // Raw buffer to free; bufrec.buf may be null.
    std::function<void()> func;         // Callback to schedule, or empty.
  };

  typedef gtl::InlinedVector<InUse, 4> ToFreeVector;

  // Releases all payloads in "to_free": unrefs and deletes tensor vectors,
  // deallocates raw buffers (with memory logging when enabled), and
  // schedules callbacks on threadpool_.  Call WITHOUT holding mu_.
  void FreeMemory(const ToFreeVector& to_free) {
    for (const auto& iu : to_free) {
      if (iu.mem != nullptr) {
        for (auto& t : *(iu.mem)) {
          t.Unref();
        }
        delete iu.mem;
      }
      if (iu.bufrec.buf) {
        if (LogMemory::IsEnabled()) {
          LogMemory::RecordRawDeallocation(iu.bufrec.operation,
                                           iu.bufrec.step_id, iu.bufrec.buf,
                                           iu.bufrec.alloc, false);
        }
        iu.bufrec.alloc->DeallocateRaw(iu.bufrec.buf);
      }
      // The function must be called in another thread.
      if (iu.func != nullptr) threadpool_.Schedule(iu.func);
    }
  }

  // Stream-enqueue an unused Event and save with it a collection of
  // Tensors and/or a BufRec to be deleted only after the Event
  // records.
  void QueueInUse(perftools::gputools::Stream* stream, InUse in_use)
      EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // Convenience wrapper: queue an event guarding only tensor references.
  void QueueTensors(perftools::gputools::Stream* stream,
                    TensorReferenceVector* tensors)
      EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    QueueInUse(stream, {nullptr, tensors, BufRec(), nullptr});
  }

  // Convenience wrapper: queue an event guarding only a raw buffer.
  void QueueBuffer(perftools::gputools::Stream* stream, BufRec bufrec)
      EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    QueueInUse(stream, {nullptr, nullptr, bufrec, nullptr});
  }

  // Convenience wrapper: queue an event guarding only a callback.
  void QueueFunc(perftools::gputools::Stream* stream,
                 std::function<void()> func) EXCLUSIVE_LOCKS_REQUIRED(mu_) {
    QueueInUse(stream, {nullptr, nullptr, BufRec(), std::move(func)});
  }

  // This function should be called at roughly the same tempo as
  // QueueTensors() to check whether pending events have recorded,
  // and then retire them.  It appends InUse elements that need cleanup
  // to "*to_free".  The caller should call FreeMemory(to_free)
  // when this returns.
  void PollEvents(bool is_dedicated_poller, ToFreeVector* to_free)
      EXCLUSIVE_LOCKS_REQUIRED(mu_);

  // An internal polling loop that runs at a low frequency to clear
  // straggler Events.
  void PollLoop();

  // Setup/Teardown functions for the polling loop.
  void StartPollingLoop();
  void StopPollingLoop();

  // A stack of unused events, recycled to avoid re-creating Event objects.
  std::vector<perftools::gputools::Event*> free_events_ GUARDED_BY(mu_);

  // Buffered list of tensors waiting to have an event queued for deletion
  perftools::gputools::Stream* accumulated_stream_ GUARDED_BY(mu_);
  TensorReferenceVector* accumulated_tensors_ GUARDED_BY(mu_);
  // Sum of the TotalBytes() of the tensors in "accumulated_tensors_"
  int64 accumulated_tensor_bytes_ GUARDED_BY(mu_);

  // A FIFO queue of InUse events and associated tensors.
  std::deque<InUse> used_events_ GUARDED_BY(mu_);

  // Signals used by Start/StopPollingLoop to hand off with PollLoop.
  std::unique_ptr<Notification> stop_polling_;
  std::unique_ptr<Notification> polling_stopped_;

  // The main PollLoop for the event manager runs in this threadpool.
  thread::ThreadPool threadpool_;
};

}  // namespace tensorflow
#endif  // TENSORFLOW_COMMON_RUNTIME_GPU_GPU_EVENT_MGR_H_