Home | History | Annotate | Download | only in kati
      1 // Copyright 2016 Google Inc. All rights reserved
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //      http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
#include "thread_pool.h"

#include <condition_variable>
#include <functional>
#include <mutex>
#include <stack>
#include <thread>
#include <utility>
#include <vector>

#include "affinity.h"
     24 
     25 class ThreadPoolImpl : public ThreadPool {
     26  public:
     27   explicit ThreadPoolImpl(int num_threads)
     28       : is_waiting_(false) {
     29     SetAffinityForMultiThread();
     30     threads_.reserve(num_threads);
     31     for (int i = 0; i < num_threads; i++) {
     32       threads_.push_back(thread([this]() { Loop(); }));
     33     }
     34   }
     35 
     36   virtual ~ThreadPoolImpl() override {
     37   }
     38 
     39   virtual void Submit(function<void(void)> task) override {
     40     unique_lock<mutex> lock(mu_);
     41     tasks_.push(task);
     42     cond_.notify_one();
     43   }
     44 
     45   virtual void Wait() override {
     46     {
     47       unique_lock<mutex> lock(mu_);
     48       is_waiting_ = true;
     49       cond_.notify_all();
     50     }
     51 
     52     for (thread& th : threads_) {
     53       th.join();
     54     }
     55 
     56     SetAffinityForSingleThread();
     57   }
     58 
     59  private:
     60   void Loop() {
     61     while (true) {
     62       function<void(void)> task;
     63       {
     64         unique_lock<mutex> lock(mu_);
     65         if (tasks_.empty()) {
     66           if (is_waiting_)
     67             return;
     68           cond_.wait(lock);
     69         }
     70 
     71         if (tasks_.empty())
     72           continue;
     73 
     74         task = tasks_.top();
     75         tasks_.pop();
     76       }
     77       task();
     78     }
     79   }
     80 
     81   vector<thread> threads_;
     82   mutex mu_;
     83   condition_variable cond_;
     84   stack<function<void(void)>> tasks_;
     85   bool is_waiting_;
     86 };
     87 
     88 ThreadPool* NewThreadPool(int num_threads) {
     89   return new ThreadPoolImpl(num_threads);
     90 }
     91