// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog (at) gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#if defined(EIGEN_USE_THREADS) && !defined(EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H)
#define EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H

namespace Eigen {

// Use the NonBlockingThreadPool by default. Define EIGEN_USE_SIMPLE_THREAD_POOL
// to fall back to the legacy SimpleThreadPool implementation.
#ifndef EIGEN_USE_SIMPLE_THREAD_POOL
template <typename Env> using ThreadPoolTempl = NonBlockingThreadPoolTempl<Env>;
typedef NonBlockingThreadPool ThreadPool;
#else
template <typename Env> using ThreadPoolTempl = SimpleThreadPoolTempl<Env>;
typedef SimpleThreadPool ThreadPool;
#endif


// Barrier is an object that allows one or more threads to wait until
// Notify has been called a specified number of times.
class Barrier {
 public:
  Barrier(unsigned int count) : state_(count << 1), notified_(false) {
    eigen_assert(((count << 1) >> 1) == count);
  }
  ~Barrier() {
    eigen_assert((state_ >> 1) == 0);
  }

  void Notify() {
    unsigned int v = state_.fetch_sub(2, std::memory_order_acq_rel) - 2;
    if (v != 1) {
      eigen_assert(((v + 2) & ~1) != 0);
      return;  // either count has not dropped to 0, or waiter is not waiting
    }
    std::unique_lock<std::mutex> l(mu_);
    eigen_assert(!notified_);
    notified_ = true;
    cv_.notify_all();
  }

  void Wait() {
    unsigned int v = state_.fetch_or(1, std::memory_order_acq_rel);
    if ((v >> 1) == 0) return;
    std::unique_lock<std::mutex> l(mu_);
    while (!notified_) {
      cv_.wait(l);
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  std::atomic<unsigned int> state_;  // low bit is waiter flag
  bool notified_;
};


// Notification is an object that allows a user to wait for another
// thread to signal a notification that an event has occurred.
//
// Multiple threads can wait on the same Notification object,
// but only one caller must call Notify() on the object.
struct Notification : Barrier {
  Notification() : Barrier(1) {}
};


// Runs an arbitrary function and then calls Notify() on the passed in
// Notification.
template <typename Function, typename... Args> struct FunctionWrapperWithNotification
{
  static void run(Notification* n, Function f, Args... args) {
    f(args...);
    if (n) {
      n->Notify();
    }
  }
};

template <typename Function, typename... Args> struct FunctionWrapperWithBarrier
{
  static void run(Barrier* b, Function f, Args... args) {
    f(args...);
    if (b) {
      b->Notify();
    }
  }
};

template <typename SyncType>
static EIGEN_STRONG_INLINE void wait_until_ready(SyncType* n) {
  if (n) {
    n->Wait();
  }
}
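
// Example (illustrative sketch, not part of this header's interface): a
// Barrier can be used to wait until a fixed number of tasks scheduled on a
// thread pool have called Notify(). The pool size and task count below are
// arbitrary values chosen for the example.
//
//   Eigen::ThreadPool pool(4 /*num threads*/);
//   Eigen::Barrier barrier(3 /*expected Notify() calls*/);
//   for (int i = 0; i < 3; ++i) {
//     pool.Schedule([&barrier]() {
//       // ... do some work ...
//       barrier.Notify();
//     });
//   }
//   barrier.Wait();  // returns once Notify() has been called 3 times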

// Build a thread pool device on top of an existing pool of threads.
struct ThreadPoolDevice {
  // The ownership of the thread pool remains with the caller.
  ThreadPoolDevice(ThreadPoolInterface* pool, int num_cores) : pool_(pool), num_threads_(num_cores) { }

  EIGEN_STRONG_INLINE void* allocate(size_t num_bytes) const {
    return internal::aligned_malloc(num_bytes);
  }

  EIGEN_STRONG_INLINE void deallocate(void* buffer) const {
    internal::aligned_free(buffer);
  }

  EIGEN_STRONG_INLINE void memcpy(void* dst, const void* src, size_t n) const {
    ::memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyHostToDevice(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }
  EIGEN_STRONG_INLINE void memcpyDeviceToHost(void* dst, const void* src, size_t n) const {
    memcpy(dst, src, n);
  }

  EIGEN_STRONG_INLINE void memset(void* buffer, int c, size_t n) const {
    ::memset(buffer, c, n);
  }

  EIGEN_STRONG_INLINE int numThreads() const {
    return num_threads_;
  }

  EIGEN_STRONG_INLINE size_t firstLevelCacheSize() const {
    return l1CacheSize();
  }

  EIGEN_STRONG_INLINE size_t lastLevelCacheSize() const {
    // The L3 cache is shared between all the cores.
    return l3CacheSize() / num_threads_;
  }

  EIGEN_DEVICE_FUNC EIGEN_STRONG_INLINE int majorDeviceVersion() const {
    // Should return an enum that encodes the ISA supported by the CPU
    return 1;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE Notification* enqueue(Function&& f, Args&&... args) const {
    Notification* n = new Notification();
    pool_->Schedule(std::bind(&FunctionWrapperWithNotification<Function, Args...>::run, n, f, args...));
    return n;
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueue_with_barrier(Barrier* b,
                                                Function&& f,
                                                Args&&... args) const {
    pool_->Schedule(std::bind(
        &FunctionWrapperWithBarrier<Function, Args...>::run, b, f, args...));
  }

  template <class Function, class... Args>
  EIGEN_STRONG_INLINE void enqueueNoNotification(Function&& f, Args&&... args) const {
    pool_->Schedule(std::bind(f, args...));
  }

  // Returns a logical thread index between 0 and pool_->NumThreads() - 1 if
  // called from one of the threads in pool_. Returns -1 otherwise.
  EIGEN_STRONG_INLINE int currentThreadId() const {
    return pool_->CurrentThreadId();
  }

  // parallelFor executes f over the index range [0, n) in parallel and waits
  // for completion. f accepts a half-open interval [first, last).
  // Block size is chosen based on the iteration cost and the resulting
  // parallel efficiency. If block_align is not nullptr, it is called to round
  // up the block size.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<Index(Index)> block_align,
                   std::function<void(Index, Index)> f) const {
    typedef TensorCostModel<ThreadPoolDevice> CostModel;
    if (n <= 1 || numThreads() == 1 ||
        CostModel::numThreads(n, cost, static_cast<int>(numThreads())) == 1) {
      f(0, n);
      return;
    }

    // Calculate block size based on (1) the iteration cost and (2) parallel
    // efficiency. We want blocks to be not too small, to mitigate
    // parallelization overheads, and not too large, to mitigate tail effect
    // and potential load imbalance; we also want the number of blocks to be
    // evenly divisible across threads.

    double block_size_f = 1.0 / CostModel::taskSize(1, cost);
    Index block_size = numext::mini(n, numext::maxi<Index>(1, block_size_f));
    const Index max_block_size =
        numext::mini(n, numext::maxi<Index>(1, 2 * block_size_f));
    if (block_align) {
      Index new_block_size = block_align(block_size);
      eigen_assert(new_block_size >= block_size);
      block_size = numext::mini(n, new_block_size);
    }
    Index block_count = divup(n, block_size);
    // Calculate parallel efficiency as fraction of total CPU time used for
    // computations:
    double max_efficiency =
        static_cast<double>(block_count) /
        (divup<int>(block_count, numThreads()) * numThreads());
    // Now try to increase block size up to max_block_size as long as it
    // doesn't decrease parallel efficiency.
    for (Index prev_block_count = block_count; prev_block_count > 1;) {
      // This is the next block size that divides size into a smaller number
      // of blocks than the current block_size.
      Index coarser_block_size = divup(n, prev_block_count - 1);
      if (block_align) {
        Index new_block_size = block_align(coarser_block_size);
        eigen_assert(new_block_size >= coarser_block_size);
        coarser_block_size = numext::mini(n, new_block_size);
      }
      if (coarser_block_size > max_block_size) {
        break;  // Reached max block size. Stop.
      }
      // Recalculate parallel efficiency.
      const Index coarser_block_count = divup(n, coarser_block_size);
      eigen_assert(coarser_block_count < prev_block_count);
      prev_block_count = coarser_block_count;
      const double coarser_efficiency =
          static_cast<double>(coarser_block_count) /
          (divup<int>(coarser_block_count, numThreads()) * numThreads());
      if (coarser_efficiency + 0.01 >= max_efficiency) {
        // Taking it.
        block_size = coarser_block_size;
        block_count = coarser_block_count;
        if (max_efficiency < coarser_efficiency) {
          max_efficiency = coarser_efficiency;
        }
      }
    }

    // Recursively divide size into halves until we reach block_size.
    // Division code rounds mid to block_size, so we are guaranteed to get
    // block_count leaves that do actual computations.
    Barrier barrier(static_cast<unsigned int>(block_count));
    std::function<void(Index, Index)> handleRange;
    handleRange = [=, &handleRange, &barrier, &f](Index first, Index last) {
      if (last - first <= block_size) {
        // Single block or less, execute directly.
        f(first, last);
        barrier.Notify();
        return;
      }
      // Split into halves and submit to the pool.
      Index mid = first + divup((last - first) / 2, block_size) * block_size;
      pool_->Schedule([=, &handleRange]() { handleRange(mid, last); });
      pool_->Schedule([=, &handleRange]() { handleRange(first, mid); });
    };
    handleRange(0, n);
    barrier.Wait();
  }

  // Convenience wrapper for parallelFor that does not align blocks.
  void parallelFor(Index n, const TensorOpCost& cost,
                   std::function<void(Index, Index)> f) const {
    parallelFor(n, cost, nullptr, std::move(f));
  }

 private:
  ThreadPoolInterface* pool_;
  int num_threads_;
};
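
// Example (illustrative sketch, not part of this header's interface): running
// a simple element-wise loop through ThreadPoolDevice::parallelFor. The pool
// size, core count, buffer size, and the per-coefficient TensorOpCost estimate
// below are assumptions made for the example only.
//
//   Eigen::ThreadPool pool(4);
//   Eigen::ThreadPoolDevice device(&pool, 4 /*num_cores*/);
//   std::vector<float> data(1 << 20, 1.0f);
//   // Rough cost per coefficient: one float loaded, one stored, one cycle.
//   Eigen::TensorOpCost cost(/*bytes_loaded=*/sizeof(float),
//                            /*bytes_stored=*/sizeof(float),
//                            /*compute_cycles=*/1);
//   device.parallelFor(data.size(), cost,
//                      [&data](Eigen::Index first, Eigen::Index last) {
//                        for (Eigen::Index i = first; i < last; ++i) {
//                          data[i] += 1.0f;
//                        }
//                      });  // returns after all blocks have completed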

}  // end namespace Eigen

#endif // EIGEN_CXX11_TENSOR_TENSOR_DEVICE_THREAD_POOL_H