Home | History | Annotate | Download | only in ThreadPool
      1 // This file is part of Eigen, a lightweight C++ template library
      2 // for linear algebra.
      3 //
      4 // Copyright (C) 2014 Benoit Steiner <benoit.steiner.goog (at) gmail.com>
      5 //
      6 // This Source Code Form is subject to the terms of the Mozilla
      7 // Public License v. 2.0. If a copy of the MPL was not distributed
      8 // with this file, You can obtain one at http://mozilla.org/MPL/2.0/.
      9 
     10 #ifndef EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
     11 #define EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
     12 
     13 namespace Eigen {
     14 
     15 // The implementation of the ThreadPool type ensures that the Schedule method
     16 // runs the functions it is provided in FIFO order when the scheduling is done
     17 // by a single thread.
     18 // Environment provides a way to create threads and also allows to intercept
     19 // task submission and execution.
     20 template <typename Environment>
     21 class SimpleThreadPoolTempl : public ThreadPoolInterface {
     22  public:
     23   // Construct a pool that contains "num_threads" threads.
     24   explicit SimpleThreadPoolTempl(int num_threads, Environment env = Environment())
     25       : env_(env), threads_(num_threads), waiters_(num_threads) {
     26     for (int i = 0; i < num_threads; i++) {
     27       threads_.push_back(env.CreateThread([this, i]() { WorkerLoop(i); }));
     28     }
     29   }
     30 
     31   // Wait until all scheduled work has finished and then destroy the
     32   // set of threads.
     33   ~SimpleThreadPoolTempl() {
     34     {
     35       // Wait for all work to get done.
     36       std::unique_lock<std::mutex> l(mu_);
     37       while (!pending_.empty()) {
     38         empty_.wait(l);
     39       }
     40       exiting_ = true;
     41 
     42       // Wakeup all waiters.
     43       for (auto w : waiters_) {
     44         w->ready = true;
     45         w->task.f = nullptr;
     46         w->cv.notify_one();
     47       }
     48     }
     49 
     50     // Wait for threads to finish.
     51     for (auto t : threads_) {
     52       delete t;
     53     }
     54   }
     55 
     56   // Schedule fn() for execution in the pool of threads. The functions are
     57   // executed in the order in which they are scheduled.
     58   void Schedule(std::function<void()> fn) final {
     59     Task t = env_.CreateTask(std::move(fn));
     60     std::unique_lock<std::mutex> l(mu_);
     61     if (waiters_.empty()) {
     62       pending_.push_back(std::move(t));
     63     } else {
     64       Waiter* w = waiters_.back();
     65       waiters_.pop_back();
     66       w->ready = true;
     67       w->task = std::move(t);
     68       w->cv.notify_one();
     69     }
     70   }
     71 
     72   int NumThreads() const final {
     73     return static_cast<int>(threads_.size());
     74   }
     75 
     76   int CurrentThreadId() const final {
     77     const PerThread* pt = this->GetPerThread();
     78     if (pt->pool == this) {
     79       return pt->thread_id;
     80     } else {
     81       return -1;
     82     }
     83   }
     84 
     85  protected:
     86   void WorkerLoop(int thread_id) {
     87     std::unique_lock<std::mutex> l(mu_);
     88     PerThread* pt = GetPerThread();
     89     pt->pool = this;
     90     pt->thread_id = thread_id;
     91     Waiter w;
     92     Task t;
     93     while (!exiting_) {
     94       if (pending_.empty()) {
     95         // Wait for work to be assigned to me
     96         w.ready = false;
     97         waiters_.push_back(&w);
     98         while (!w.ready) {
     99           w.cv.wait(l);
    100         }
    101         t = w.task;
    102         w.task.f = nullptr;
    103       } else {
    104         // Pick up pending work
    105         t = std::move(pending_.front());
    106         pending_.pop_front();
    107         if (pending_.empty()) {
    108           empty_.notify_all();
    109         }
    110       }
    111       if (t.f) {
    112         mu_.unlock();
    113         env_.ExecuteTask(t);
    114         t.f = nullptr;
    115         mu_.lock();
    116       }
    117     }
    118   }
    119 
    120  private:
    121   typedef typename Environment::Task Task;
    122   typedef typename Environment::EnvThread Thread;
    123 
    124   struct Waiter {
    125     std::condition_variable cv;
    126     Task task;
    127     bool ready;
    128   };
    129 
    130   struct PerThread {
    131     constexpr PerThread() : pool(NULL), thread_id(-1) { }
    132     SimpleThreadPoolTempl* pool;  // Parent pool, or null for normal threads.
    133     int thread_id;                // Worker thread index in pool.
    134   };
    135 
    136   Environment env_;
    137   std::mutex mu_;
    138   MaxSizeVector<Thread*> threads_;  // All threads
    139   MaxSizeVector<Waiter*> waiters_;  // Stack of waiting threads.
    140   std::deque<Task> pending_;        // Queue of pending work
    141   std::condition_variable empty_;   // Signaled on pending_.empty()
    142   bool exiting_ = false;
    143 
    144   PerThread* GetPerThread() const {
    145     EIGEN_THREAD_LOCAL PerThread per_thread;
    146     return &per_thread;
    147   }
    148 };
    149 
    150 typedef SimpleThreadPoolTempl<StlThreadEnvironment> SimpleThreadPool;
    151 
    152 }  // namespace Eigen
    153 
    154 #endif  // EIGEN_CXX11_THREADPOOL_SIMPLE_THREAD_POOL_H
    155