1 /* 2 * Copyright 2014 Google Inc. 3 * 4 * Use of this source code is governed by a BSD-style license that can be 5 * found in the LICENSE file. 6 */ 7 8 #include "SkOnce.h" 9 #include "SkSemaphore.h" 10 #include "SkSpinlock.h" 11 #include "SkTArray.h" 12 #include "SkTDArray.h" 13 #include "SkTaskGroup.h" 14 #include "SkThreadUtils.h" 15 16 #if defined(SK_BUILD_FOR_WIN32) 17 static void query_num_cores(int* num_cores) { 18 SYSTEM_INFO sysinfo; 19 GetNativeSystemInfo(&sysinfo); 20 *num_cores = sysinfo.dwNumberOfProcessors; 21 } 22 #else 23 #include <unistd.h> 24 static void query_num_cores(int* num_cores) { 25 *num_cores = (int)sysconf(_SC_NPROCESSORS_ONLN); 26 } 27 #endif 28 29 // We cache sk_num_cores() so we only query the OS once. 30 SK_DECLARE_STATIC_ONCE(g_query_num_cores_once); 31 int sk_num_cores() { 32 static int num_cores = 0; 33 SkOnce(&g_query_num_cores_once, query_num_cores, &num_cores); 34 SkASSERT(num_cores > 0); 35 return num_cores; 36 } 37 38 namespace { 39 40 class ThreadPool : SkNoncopyable { 41 public: 42 static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) { 43 if (!gGlobal) { 44 return fn(); 45 } 46 gGlobal->add(fn, pending); 47 } 48 49 static void Batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) { 50 if (!gGlobal) { 51 for (int i = 0; i < N; i++) { fn(i); } 52 return; 53 } 54 gGlobal->batch(N, fn, pending); 55 } 56 57 static void Wait(SkAtomic<int32_t>* pending) { 58 if (!gGlobal) { // If we have no threads, the work must already be done. 59 SkASSERT(pending->load(sk_memory_order_relaxed) == 0); 60 return; 61 } 62 // Acquire pairs with decrement release here or in Loop. 63 while (pending->load(sk_memory_order_acquire) > 0) { 64 // Lend a hand until our SkTaskGroup of interest is done. 65 Work work; 66 { 67 // We're stealing work opportunistically, 68 // so we never call fWorkAvailable.wait(), which could sleep us if there's no work. 69 // This means fWorkAvailable is only an upper bound on fWork.count(). 70 AutoLock lock(&gGlobal->fWorkLock); 71 if (gGlobal->fWork.empty()) { 72 // Someone has picked up all the work (including ours). How nice of them! 73 // (They may still be working on it, so we can't assert *pending == 0 here.) 74 continue; 75 } 76 work = gGlobal->fWork.back(); 77 gGlobal->fWork.pop_back(); 78 } 79 // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine. 80 // We threads gotta stick together. We're always making forward progress. 81 work.fn(); 82 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load above. 83 } 84 } 85 86 private: 87 struct AutoLock { 88 AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); } 89 ~AutoLock() { fLock->release(); } 90 private: 91 SkSpinlock* fLock; 92 }; 93 94 struct Work { 95 std::function<void(void)> fn; // A function to call 96 SkAtomic<int32_t>* pending; // then decrement pending afterwards. 97 }; 98 99 explicit ThreadPool(int threads) { 100 if (threads == -1) { 101 threads = sk_num_cores(); 102 } 103 for (int i = 0; i < threads; i++) { 104 fThreads.push(new SkThread(&ThreadPool::Loop, this)); 105 fThreads.top()->start(); 106 } 107 } 108 109 ~ThreadPool() { 110 SkASSERT(fWork.empty()); // All SkTaskGroups should be destroyed by now. 111 112 // Send a poison pill to each thread. 113 SkAtomic<int> dummy(0); 114 for (int i = 0; i < fThreads.count(); i++) { 115 this->add(nullptr, &dummy); 116 } 117 // Wait for them all to swallow the pill and die. 118 for (int i = 0; i < fThreads.count(); i++) { 119 fThreads[i]->join(); 120 } 121 SkASSERT(fWork.empty()); // Can't hurt to double check. 122 fThreads.deleteAll(); 123 } 124 125 void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) { 126 Work work = { fn, pending }; 127 pending->fetch_add(+1, sk_memory_order_relaxed); // No barrier needed. 128 { 129 AutoLock lock(&fWorkLock); 130 fWork.push_back(work); 131 } 132 fWorkAvailable.signal(1); 133 } 134 135 void batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) { 136 pending->fetch_add(+N, sk_memory_order_relaxed); // No barrier needed. 137 { 138 AutoLock lock(&fWorkLock); 139 for (int i = 0; i < N; i++) { 140 Work work = { [i, fn]() { fn(i); }, pending }; 141 fWork.push_back(work); 142 } 143 } 144 fWorkAvailable.signal(N); 145 } 146 147 static void Loop(void* arg) { 148 ThreadPool* pool = (ThreadPool*)arg; 149 Work work; 150 while (true) { 151 // Sleep until there's work available, and claim one unit of Work as we wake. 152 pool->fWorkAvailable.wait(); 153 { 154 AutoLock lock(&pool->fWorkLock); 155 if (pool->fWork.empty()) { 156 // Someone in Wait() stole our work (fWorkAvailable is an upper bound). 157 // Well, that's fine, back to sleep for us. 158 continue; 159 } 160 work = pool->fWork.back(); 161 pool->fWork.pop_back(); 162 } 163 if (!work.fn) { 164 return; // Poison pill. Time... to die. 165 } 166 work.fn(); 167 work.pending->fetch_add(-1, sk_memory_order_release); // Pairs with load in Wait(). 168 } 169 } 170 171 // fWorkLock must be held when reading or modifying fWork. 172 SkSpinlock fWorkLock; 173 SkTArray<Work> fWork; 174 175 // A thread-safe upper bound for fWork.count(). 176 // 177 // We'd have it be an exact count but for the loop in Wait(): 178 // we never want that to block, so it can't call fWorkAvailable.wait(), 179 // and that's the only way to decrement fWorkAvailable. 180 // So fWorkAvailable may overcount actual the work available. 181 // We make do, but this means some worker threads may wake spuriously. 182 SkSemaphore fWorkAvailable; 183 184 // These are only changed in a single-threaded context. 185 SkTDArray<SkThread*> fThreads; 186 static ThreadPool* gGlobal; 187 188 friend struct SkTaskGroup::Enabler; 189 }; 190 ThreadPool* ThreadPool::gGlobal = nullptr; 191 192 } // namespace 193 194 SkTaskGroup::Enabler::Enabler(int threads) { 195 SkASSERT(ThreadPool::gGlobal == nullptr); 196 if (threads != 0) { 197 ThreadPool::gGlobal = new ThreadPool(threads); 198 } 199 } 200 201 SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; } 202 203 SkTaskGroup::SkTaskGroup() : fPending(0) {} 204 205 void SkTaskGroup::wait() { ThreadPool::Wait(&fPending); } 206 void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); } 207 void SkTaskGroup::batch(int N, std::function<void(int)> fn) { 208 ThreadPool::Batch(N, fn, &fPending); 209 } 210 211