#include "SkTaskGroup.h"

#include "SkCondVar.h"
#include "SkRunnable.h"
#include "SkTDArray.h"
#include "SkThread.h"
#include "SkThreadUtils.h"

#if defined(SK_BUILD_FOR_WIN32)
    static inline int num_cores() {
        SYSTEM_INFO sysinfo;
        GetSystemInfo(&sysinfo);
        return (int) sysinfo.dwNumberOfProcessors;
    }
#else
    #include <unistd.h>
    static inline int num_cores() {
        return (int) sysconf(_SC_NPROCESSORS_ONLN);
    }
#endif
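
// Note: the Win32 branch above uses SYSTEM_INFO and GetSystemInfo() from
// <windows.h>, which is assumed to be pulled in by one of the Sk headers
// included above; if a build configuration does not do that, include it
// here explicitly.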

namespace {

class ThreadPool : SkNoncopyable {
public:
    static void Add(SkRunnable* task, int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, run synchronously.
            return task->run();
        }
        gGlobal->add(&CallRunnable, task, pending);
    }

    static void Add(void (*fn)(void*), void* arg, int32_t* pending) {
        if (!gGlobal) {
            return fn(arg);
        }
        gGlobal->add(fn, arg, pending);
    }

    static void Batch(void (*fn)(void*), void* args, int N, size_t stride, int32_t* pending) {
        if (!gGlobal) {
            for (int i = 0; i < N; i++) { fn((char*)args + i*stride); }
            return;
        }
        gGlobal->batch(fn, args, N, stride, pending);
    }

    static void Wait(int32_t* pending) {
        if (!gGlobal) {  // If we have no threads, the work must already be done.
            SkASSERT(*pending == 0);
            return;
        }
        while (sk_acquire_load(pending) > 0) {  // Pairs with sk_atomic_dec here or in Loop.
            // Lend a hand until our SkTaskGroup of interest is done.
            // Working the queue ourselves instead of blocking also means a task may
            // itself wait() on a nested SkTaskGroup without deadlocking the pool.
            Work work;
            {
                AutoLock lock(&gGlobal->fReady);
                if (gGlobal->fWork.isEmpty()) {
                    // Someone has picked up all the work (including ours).  How nice of them!
                    // (They may still be working on it, so we can't assert *pending == 0 here.)
                    continue;
                }
                gGlobal->fWork.pop(&work);
            }
            // This Work isn't necessarily part of our SkTaskGroup of interest, but that's fine.
            // We threads gotta stick together.  We're always making forward progress.
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with the sk_acquire_load() just above.
        }
    }

private:
    struct AutoLock {
        AutoLock(SkCondVar* c) : fC(c) { fC->lock(); }
        ~AutoLock() { fC->unlock(); }
    private:
        SkCondVar* fC;
    };

    static void CallRunnable(void* arg) { static_cast<SkRunnable*>(arg)->run(); }

    struct Work {
        void (*fn)(void*);  // A function to call,
        void* arg;          // its argument,
        int32_t* pending;   // then sk_atomic_dec(pending) afterwards.
    };

    explicit ThreadPool(int threads) : fDraining(false) {
        if (threads == -1) {
            threads = num_cores();
        }
        for (int i = 0; i < threads; i++) {
            fThreads.push(SkNEW_ARGS(SkThread, (&ThreadPool::Loop, this)));
            fThreads.top()->start();
        }
    }

    ~ThreadPool() {
        SkASSERT(fWork.isEmpty());  // All SkTaskGroups should be destroyed by now.
        {
            AutoLock lock(&fReady);
            fDraining = true;
            fReady.broadcast();
        }
        for (int i = 0; i < fThreads.count(); i++) {
            fThreads[i]->join();
        }
        SkASSERT(fWork.isEmpty());  // Can't hurt to double check.
        fThreads.deleteAll();
    }

    void add(void (*fn)(void*), void* arg, int32_t* pending) {
        Work work = { fn, arg, pending };
        sk_atomic_inc(pending);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            fWork.push(work);
            fReady.signal();
        }
    }

    void batch(void (*fn)(void*), void* arg, int N, size_t stride, int32_t* pending) {
        sk_atomic_add(pending, N);  // No barrier needed.
        {
            AutoLock lock(&fReady);
            Work* batch = fWork.append(N);
            for (int i = 0; i < N; i++) {
                Work work = { fn, (char*)arg + i*stride, pending };
                batch[i] = work;
            }
            fReady.broadcast();  // N new pieces of work: wake every worker, not just one.
        }
    }

    static void Loop(void* arg) {
        ThreadPool* pool = (ThreadPool*)arg;
        Work work;
        while (true) {
            {
                AutoLock lock(&pool->fReady);
                while (pool->fWork.isEmpty()) {
                    if (pool->fDraining) {
                        return;
                    }
                    pool->fReady.wait();
                }
                pool->fWork.pop(&work);
            }
            work.fn(work.arg);
            sk_atomic_dec(work.pending);  // Release pairs with sk_acquire_load() in Wait().
        }
    }

    SkTDArray<Work>      fWork;
    SkTDArray<SkThread*> fThreads;
    SkCondVar            fReady;
    bool                 fDraining;

    static ThreadPool* gGlobal;
    friend struct SkTaskGroup::Enabler;
};
ThreadPool* ThreadPool::gGlobal = NULL;

}  // namespace

SkTaskGroup::Enabler::Enabler(int threads) {
    SkASSERT(ThreadPool::gGlobal == NULL);
    if (threads != 0 && SkCondVar::Supported()) {
        ThreadPool::gGlobal = SkNEW_ARGS(ThreadPool, (threads));
    }
}

SkTaskGroup::Enabler::~Enabler() {
    SkDELETE(ThreadPool::gGlobal);
    ThreadPool::gGlobal = NULL;  // Leave no dangling pointer; later calls fall back to synchronous mode.
}
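
// Notes on Enabler semantics, as implemented above: threads == -1 sizes the
// pool to num_cores(); threads == 0 (or a platform where SkCondVar is not
// supported) creates no pool at all, so add()/batch() run their work
// synchronously and wait() just asserts that everything already ran.  The
// Enabler must outlive every SkTaskGroup that queues work through it.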

SkTaskGroup::SkTaskGroup() : fPending(0) {}

void SkTaskGroup::wait()                            { ThreadPool::Wait(&fPending); }
void SkTaskGroup::add(SkRunnable* task)             { ThreadPool::Add(task, &fPending); }
void SkTaskGroup::add(void (*fn)(void*), void* arg) { ThreadPool::Add(fn, arg, &fPending); }
void SkTaskGroup::batch(void (*fn)(void*), void* args, int N, size_t stride) {
    ThreadPool::Batch(fn, args, N, stride, &fPending);
}
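
// A minimal usage sketch (illustrative only; square_in_place and the
// surrounding main() are hypothetical, not part of Skia):
//
//     static void square_in_place(void* arg) {
//         int* x = static_cast<int*>(arg);
//         *x *= *x;
//     }
//
//     int main() {
//         SkTaskGroup::Enabler enabled(-1);  // One thread per core; pass 0 to run synchronously.
//
//         int values[100];
//         for (int i = 0; i < 100; i++) { values[i] = i; }
//
//         SkTaskGroup tg;
//         tg.batch(square_in_place, values, 100, sizeof(int));  // Queues 100 tasks, one per element.
//         tg.wait();  // Blocks (helping work the queue) until all 100 have run.
//         // Now values[i] == i*i for every i.
//         return 0;
//     }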