/*
 * Copyright 2014 Google Inc.
 *
 * Use of this source code is governed by a BSD-style license that can be
 * found in the LICENSE file.
 */

#include "SkOnce.h"
#include "SkSemaphore.h"
#include "SkSpinlock.h"
#include "SkTArray.h"
#include "SkTDArray.h"
#include "SkTaskGroup.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static void query_num_cores(int* num_cores) {
        SYSTEM_INFO sysinfo;
        GetNativeSystemInfo(&sysinfo);
        *num_cores = sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static void query_num_cores(int* num_cores) {
        *num_cores = (int)sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif

// We cache sk_num_cores() so we only query the OS once.
SK_DECLARE_STATIC_ONCE(g_query_num_cores_once);
int sk_num_cores() {
    static int num_cores = 0;
    SkOnce(&g_query_num_cores_once, query_num_cores, &num_cores);
    SkASSERT(num_cores > 0);
    return num_cores;
}

namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
        if (!gGlobal) {
            return fn();
        }
        gGlobal->add(fn, pending);
    }

    static void Batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn(i); }
            return;
        }
        gGlobal->batch(N, fn, pending);
    }

    static void Wait(SkAtomic<int32_t>* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(pending->load(sk_memory_order_relaxed) == 0);
            return;
        }
        // This acquire pairs with the release decrement below or in Loop().
        while (pending->load(sk_memory_order_acquire) > 0) {
            // Lend a hand until our SkTaskGroup of interest is done.
            Work work;
            {
                // We're stealing work opportunistically,
                // so we never call fWorkAvailable.wait(), which could sleep us if there's no work.
                // This means fWorkAvailable is only an upper bound on fWork.count().
                AutoLock lock(&gGlobal->fWorkLock);
                if (gGlobal->fWork.empty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                work = gGlobal->fWork.back();
                gGlobal->fWork.pop_back();
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn();
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkSpinlock* lock) : fLock(lock) { fLock->acquire(); }
        ~AutoLock() { fLock->release(); }
    private:
        SkSpinlock* fLock;
    };

    struct Work {
        std::function<void(void)> fn; // A function to call
        SkAtomic<int32_t>* pending;   // then decrement pending afterwards.
    };

    explicit ThreadPool(int threads) {
        if (threads == -1) {
            threads = sk_num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(new SkThread(&ThreadPool::Loop, this));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.empty());  // All SkTaskGroups should be destroyed by now.

        // Send a poison pill to each thread.
        SkAtomic<int> dummy(0);
        for (int i = 0; i < fThreads.count(); i++) {
            this->add(nullptr, &dummy);
        }
        // Wait for them all to swallow the pill and die.
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.empty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(std::function<void(void)> fn, SkAtomic<int32_t>* pending) {
        Work work = { fn, pending };
        pending->fetch_add(+1, sk_memory_order_relaxed);  // No barrier needed.
        {
            AutoLock lock(&fWorkLock);
            fWork.push_back(work);
        }
        fWorkAvailable.signal(1);
    }

    void batch(int N, std::function<void(int)> fn, SkAtomic<int32_t>* pending) {
        pending->fetch_add(+N, sk_memory_order_relaxed);  // No barrier needed.
        {
            AutoLock lock(&fWorkLock);
            for (int i = 0; i < N; i++) {
                Work work = { [i, fn]() { fn(i); }, pending };
                fWork.push_back(work);
            }
        }
        fWorkAvailable.signal(N);
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            // Sleep until there's work available, and claim one unit of Work as we wake.
            pool->fWorkAvailable.wait();
            {
                AutoLock lock(&pool->fWorkLock);
                if (pool->fWork.empty()) {
                    // Someone in Wait() stole our work (fWorkAvailable is an upper bound).
                    // Well, that's fine, back to sleep for us.
                    continue;
                }
                work = pool->fWork.back();
                pool->fWork.pop_back();
            }
            if (!work.fn) {
                return;  // Poison pill.  Time... to die.
            }
            work.fn();
            work.pending->fetch_add(-1, sk_memory_order_release);  // Pairs with load in Wait().
        }
    }

    // fWorkLock must be held when reading or modifying fWork.
    SkSpinlock      fWorkLock;
    SkTArray<Work>  fWork;
    // A thread-safe upper bound for fWork.count().
    //
    // We'd have it be an exact count but for the loop in Wait():
    // we never want that to block, so it can't call fWorkAvailable.wait(),
    // and that's the only way to decrement fWorkAvailable.
    // So fWorkAvailable may overcount the actual work available.
    // We make do, but this means some worker threads may wake spuriously.
    SkSemaphore fWorkAvailable;

    // These are only changed in a single-threaded context.
    SkTDArray<SkThread*> fThreads;
    static ThreadPool* gGlobal;

    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = nullptr;

}  // namespace
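
// Constructing an Enabler creates the global ThreadPool (threads == -1 means one thread
// per core, threads == 0 means run everything synchronously).  It is expected to outlive
// all SkTaskGroup work: ~ThreadPool() asserts the queue is empty before joining its threads.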
SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == nullptr);
    if (threads != 0) {
        ThreadPool::gGlobal = new ThreadPool(threads);
    }
}

SkTaskGroup::Enabler::~Enabler() { delete ThreadPool::gGlobal; }

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(std::function<void(void)> fn) { ThreadPool::Add(fn, &fPending); }
void SkTaskGroup::batch(int N, std::function<void(int)> fn) {
    ThreadPool::Batch(N, fn, &fPending);
}
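
// A minimal usage sketch (illustrative only; N, results[], and compute() are hypothetical):
// enable the global pool once, fan work out through an SkTaskGroup, then wait.
//
//     SkTaskGroup::Enabler enabler(-1);  // -1: one worker thread per core
//     SkTaskGroup tg;
//     tg.batch(N, [&](int i) { results[i] = compute(i); });
//     tg.wait();                         // lends a hand until the group's pending count hits 0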