// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Use the NonBlockingThreadPool by default. Define EIGEN_USE_SIMPLE_THREAD_POOL
// to fall back to the older SimpleThreadPool implementation.
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool;
#else
template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
typedef SimpleThreadPool ThreadPool;
#endif


// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
 public:
  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
    eigen_assert(((count << 1) >> 1) == count);
  }
  ~Barrier() {
    eigen_assert((state_>>1) == 0);
  }

  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      eigen_assert(((v + 2) & ~1) != 0);
      return;  // either count has not dropped to 0, or waiter is not waiting
    }
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void Wait() {
    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
    if ((v >> 1) == 0) return;
    std::unique_lock<std::mutex> l(mu_);
    while (!notified_) {
      cv_.wait(l);
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<unsigned int> state_;  // low bit is waiter flag
  bool notified_;
};
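
// Example (illustrative only, not part of this header): make the calling
// thread wait for two tasks scheduled on a hypothetical `pool` to finish.
// DoTaskA() and DoTaskB() are placeholders.
//
//   Eigen::Barrier barrier(2);
//   pool.Schedule([&]() { DoTaskA(); barrier.Notify(); });
//   pool.Schedule([&]() { DoTaskB(); barrier.Notify(); });
//   barrier.Wait();  // returns once Notify() has been called twice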


// Notification is an object that allows a user to wait for another
// thread to signal that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but Notify() must be called only once, by a single caller.
struct Notification : Barrier {
  Notification() : Barrier(1) {}
};
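
// Example (illustrative only): block until a worker thread signals completion.
// Produce() is a placeholder for the work being waited on.
//
//   Eigen::Notification done;
//   std::thread worker([&]() { Produce(); done.Notify(); });
//   done.Wait();   // returns once Notify() has been called
//   worker.join();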


// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}


// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache is shared between all the cores, so report a per-core share.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU.
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...));
    return n;
  }
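
  // Example (illustrative only): run a hypothetical DoWork(arg1, arg2)
  // asynchronously and block until it has finished. The Notification is
  // heap-allocated by enqueue(), so the caller is responsible for deleting it.
  //
  //   Eigen::ThreadPoolDevice device(&pool, num_cores);
  //   Eigen::Notification* n = device.enqueue(DoWork, arg1, arg2);
  //   wait_until_ready(n);  // blocks until DoWork(arg1, arg2) has run
  //   delete n;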

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
                                                Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(std::bind(
        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    pool_->Schedule(std::bind(f, args...));
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f over the range [0, n) in parallel and waits for
  // completion. f accepts a half-open interval [first, last).
  // The block size is chosen based on the iteration cost and the resulting
  // parallel efficiency. If block_align is not nullptr, it is called to round
  // up the block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    typedef TensorCostModel<ThreadPoolDevice> CostModel;
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Calculate the block size based on (1) the iteration cost and (2) the
    // parallel efficiency. We want blocks that are not too small, to amortize
    // the parallelization overhead, and not too large, to mitigate tail
    // effects and potential load imbalance; we also want the number of blocks
    // to divide evenly across the threads.

    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f));
    const Index max_block_size =
        numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f));
    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }
    Index block_count = divup(n, block_size);
    // Calculate parallel efficiency as the fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());
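    // For example (hypothetical numbers for illustration): with
    // block_count = 10 and numThreads() = 4, the busiest thread gets
    // divup(10, 4) = 3 blocks, so efficiency is 10 / (3 * 4) ~= 0.83,
    // i.e. roughly 17% of the available CPU time is lost to the uneven split.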
    // Now try to increase the block size up to max_block_size as long as it
    // doesn't decrease the parallel efficiency.
    for (Index prev_block_count = block_count; prev_block_count > 1;) {
      // This is the next block size that divides n into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate the parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    // Recursively divide the range [0, n) into halves until we reach
    // block_size. The division code rounds mid to a multiple of block_size,
    // so we are guaranteed to get block_count leaves that do the actual
    // computations.
    Barrier barrier(static_cast<unsigned int>(block_count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
      if (last - first <= block_size) {
        // Single block or less, execute directly.
        f(first, last);
        barrier.Notify();
        return;
      }
      // Split into halves and submit to the pool.
      Index mid = first + divup((last - first) / 2, block_size) * block_size;
      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
    };
    handleRange(0, n);
    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }
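
  // Example (illustrative only): scale an array in parallel. The per-iteration
  // cost estimate (one float load, one float store, one multiply) is just a
  // guess for the sake of the example.
  //
  //   Eigen::ThreadPoolDevice device(&pool, num_cores);
  //   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
  //                            /*bytes_stored=*/sizeof(float),
  //                            /*compute_cycles=*/1);
  //   device.parallelFor(size, cost,
  //                      [&](Eigen::Index first, Eigen::Index last) {
  //     for (Eigen::Index i = first; i < last; ++i) data[i] *= 2.0f;
  //   });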

 private:
  ThreadPoolInterface* pool_;
  int num_threads_;
};


}  // end namespace Eigen

#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H