Home | History | Annotate | Download | only in kati
      1 // Copyright 2016 Google Inc. All rights reserved
      2 //
      3 // Licensed under the Apache License, Version 2.0 (the "License");
      4 // you may not use this file except in compliance with the License.
      5 // You may obtain a copy of the License at
      6 //
      7 //      http://www.apache.org/licenses/LICENSE-2.0
      8 //
      9 // Unless required by applicable law or agreed to in writing, software
     10 // distributed under the License is distributed on an "AS IS" BASIS,
     11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 // See the License for the specific language governing permissions and
     13 // limitations under the License.
     14 
     15 // +build ignore
     16 
     17 #include "thread_pool.h"
     18 
     19 #include <condition_variable>
     20 #include <mutex>
     21 #include <stack>
     22 #include <thread>
     23 #include <vector>
     24 
     25 #include "affinity.h"
     26 
     27 class ThreadPoolImpl : public ThreadPool {
     28  public:
     29   explicit ThreadPoolImpl(int num_threads)
     30       : is_waiting_(false) {
     31     SetAffinityForMultiThread();
     32     threads_.reserve(num_threads);
     33     for (int i = 0; i < num_threads; i++) {
     34       threads_.push_back(thread([this]() { Loop(); }));
     35     }
     36   }
     37 
     38   virtual ~ThreadPoolImpl() override {
     39   }
     40 
     41   virtual void Submit(function<void(void)> task) override {
     42     unique_lock<mutex> lock(mu_);
     43     tasks_.push(task);
     44     cond_.notify_one();
     45   }
     46 
     47   virtual void Wait() override {
     48     {
     49       unique_lock<mutex> lock(mu_);
     50       is_waiting_ = true;
     51       cond_.notify_all();
     52     }
     53 
     54     for (thread& th : threads_) {
     55       th.join();
     56     }
     57 
     58     SetAffinityForSingleThread();
     59   }
     60 
     61  private:
     62   void Loop() {
     63     while (true) {
     64       function<void(void)> task;
     65       {
     66         unique_lock<mutex> lock(mu_);
     67         if (tasks_.empty()) {
     68           if (is_waiting_)
     69             return;
     70           cond_.wait(lock);
     71         }
     72 
     73         if (tasks_.empty())
     74           continue;
     75 
     76         task = tasks_.top();
     77         tasks_.pop();
     78       }
     79       task();
     80     }
     81   }
     82 
     83   vector<thread> threads_;
     84   mutex mu_;
     85   condition_variable cond_;
     86   stack<function<void(void)>> tasks_;
     87   bool is_waiting_;
     88 };
     89 
     90 ThreadPool* NewThreadPool(int num_threads) {
     91   return new ThreadPoolImpl(num_threads);
     92 }
     93