// This file is part of Eigen, a lightweight C++ template library
// for linear algebra.
//
// Copyright (C) 2016 Dmitry Vyukov <dvyukov@google.com>
//
// This Source Code Form is subject to the terms of the Mozilla
// Public License v. 2.0. If a copy of the MPL was not distributed
// with this file, You can obtain one at http://mozilla.org/MPL/2.0/.

#ifndef EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_
#define EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_


namespace Eigen {

// RunQueue is a fixed-size, partially non-blocking deque of Work items.
// Operations on the front of the queue must be done by a single thread
// (owner), operations on the back of the queue can be done by multiple
// threads concurrently.
//
// Algorithm outline:
// All remote threads operating on the queue back are serialized by a mutex.
// This ensures that at most two threads access state: the owner and one remote
// thread (Size aside). The algorithm ensures that the occupied region of the
// underlying array is logically contiguous (it can wrap around, but has no
// stray occupied elements). The owner operates on one end of this region, the
// remote thread operates on the other end. Synchronization between these
// threads (potential consumption of the last element and occupation of the
// last empty element) happens by means of a state variable in each element.
// States are: empty, busy (in the process of insertion or removal) and ready.
// Threads claim elements (empty->busy and ready->busy transitions) by means of
// a CAS operation. The finishing transitions (busy->empty and busy->ready) are
// done with a plain store, as the element is exclusively owned by the current
// thread.
//
// Note: we could permit only pointers as elements; then we would not need a
// separate state variable, as a null/non-null pointer value would serve as the
// state. But that would require a malloc/free per operation for large, complex
// values (and this is designed to store std::function<()>).
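//
// Illustrative restatement of the element state machine described above
// (the labels match the enum at the bottom of the class; the CAS steps are
// the contended transitions, the plain stores are the uncontended ones):
//
//   push:  kEmpty --CAS--> kBusy --plain store--> kReady
//   pop:   kReady --CAS--> kBusy --plain store--> kEmpty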
template <typename Work, unsigned kSize>
class RunQueue {
 public:
  RunQueue() : front_(0), back_(0) {
    // require power-of-two for fast masking
    eigen_assert((kSize & (kSize - 1)) == 0);
    eigen_assert(kSize > 2);            // why would you do this?
    eigen_assert(kSize <= (64 << 10));  // leave enough space for counter
    for (unsigned i = 0; i < kSize; i++)
      array_[i].state.store(kEmpty, std::memory_order_relaxed);
  }

  ~RunQueue() { eigen_assert(Size() == 0); }

  // PushFront inserts w at the beginning of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed
  // Work.
  Work PushFront(Work w) {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[front & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Advance the front index; the kSize << 1 term also bumps the modification
    // counter kept in the high bits of front_ (see the comment next to
    // front_/back_ below).
    front_.store(front + 1 + (kSize << 1), std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopFront removes and returns the first element in the queue.
  // If the queue was empty, returns a default-constructed Work.
  Work PopFront() {
    unsigned front = front_.load(std::memory_order_relaxed);
    Elem* e = &array_[(front - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Step the rolling index back one slot, preserving the modification
    // counter bits.
    front = ((front - 1) & kMask2) | (front & ~kMask2);
    front_.store(front, std::memory_order_relaxed);
    return w;
  }

  // PushBack adds w at the end of the queue.
  // If the queue is full, returns w; otherwise returns a default-constructed
  // Work.
  Work PushBack(Work w) {
    std::unique_lock<std::mutex> lock(mutex_);
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[(back - 1) & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kEmpty ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return w;
    // Step the rolling index back one slot, preserving the modification
    // counter bits.
    back = ((back - 1) & kMask2) | (back & ~kMask2);
    back_.store(back, std::memory_order_relaxed);
    e->w = std::move(w);
    e->state.store(kReady, std::memory_order_release);
    return Work();
  }

  // PopBack removes and returns the last element in the queue.
  // Can fail spuriously (e.g. if the mutex try-lock fails).
  Work PopBack() {
    if (Empty()) return Work();
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return Work();
    unsigned back = back_.load(std::memory_order_relaxed);
    Elem* e = &array_[back & kMask];
    uint8_t s = e->state.load(std::memory_order_relaxed);
    if (s != kReady ||
        !e->state.compare_exchange_strong(s, kBusy, std::memory_order_acquire))
      return Work();
    Work w = std::move(e->w);
    e->state.store(kEmpty, std::memory_order_release);
    // Advance the back index; the kSize << 1 term also bumps the modification
    // counter.
    back_.store(back + 1 + (kSize << 1), std::memory_order_relaxed);
    return w;
  }

  // PopBackHalf removes and returns roughly half of the last elements in the
  // queue. Returns the number of elements removed. Can also fail spuriously.
  unsigned PopBackHalf(std::vector<Work>* result) {
    if (Empty()) return 0;
    std::unique_lock<std::mutex> lock(mutex_, std::try_to_lock);
    if (!lock) return 0;
    unsigned back = back_.load(std::memory_order_relaxed);
    unsigned size = Size();
    unsigned mid = back;
    if (size > 1) mid = back + (size - 1) / 2;
    unsigned n = 0;
    unsigned start = 0;
    for (; static_cast<int>(mid - back) >= 0; mid--) {
      Elem* e = &array_[mid & kMask];
      uint8_t s = e->state.load(std::memory_order_relaxed);
      if (n == 0) {
        if (s != kReady ||
            !e->state.compare_exchange_strong(s, kBusy,
                                              std::memory_order_acquire))
          continue;
        start = mid;
      } else {
        // Note: no need to store the intermediate kBusy state, we exclusively
        // own these elements.
        eigen_assert(s == kReady);
      }
      result->push_back(std::move(e->w));
      e->state.store(kEmpty, std::memory_order_release);
      n++;
    }
    if (n != 0)
      back_.store(start + 1 + (kSize << 1), std::memory_order_relaxed);
    return n;
  }
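
  // Illustrative example (not from the original source): with 5 ready items
  // and back index b, mid starts at b + 2, so the loop above visits slots
  // b + 2, b + 1 and b -- i.e. it tries to claim ceil(size / 2) items.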

  // Size returns the current queue size.
  // Can be called by any thread at any time.
  unsigned Size() const {
    // Emptiness plays a critical role in thread pool blocking. So we go to
    // great effort to not produce false positives (claim a non-empty queue as
    // empty).
    for (;;) {
      // Capture a consistent snapshot of front/back.
      unsigned front = front_.load(std::memory_order_acquire);
      unsigned back = back_.load(std::memory_order_acquire);
      unsigned front1 = front_.load(std::memory_order_relaxed);
      if (front != front1) continue;
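      // The re-read of front_ makes the loop retry whenever front_ changed
      // between the two loads, so the (front, back) pair used below was
      // observed while front_ was stable.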
      int size = (front & kMask2) - (back & kMask2);
      // Fix overflow.
      if (size < 0) size += 2 * kSize;
      // Order of modification in push/pop is crafted to make the queue look
      // larger than it is during concurrent modifications. E.g. pop can
      // decrement size before the corresponding push has incremented it.
      // So the computed size can be up to kSize + 1; fix it.
      if (size > static_cast<int>(kSize)) size = kSize;
      return size;
    }
  }

  // Empty tests whether the container is empty.
  // Can be called by any thread at any time.
  bool Empty() const { return Size() == 0; }

 private:
  static const unsigned kMask = kSize - 1;
  static const unsigned kMask2 = (kSize << 1) - 1;
  struct Elem {
    std::atomic<uint8_t> state;
    Work w;
  };
  enum {
    kEmpty,
    kBusy,
    kReady,
  };
  std::mutex mutex_;
  // The low log(kSize) + 1 bits in front_ and back_ contain the rolling index
  // of front/back, respectively. The remaining bits contain modification
  // counters that are incremented on Push operations. This allows us to
  // (1) distinguish between empty and full conditions (if we used log(kSize)
  // bits for the position, these conditions would be indistinguishable); and
  // (2) obtain a consistent snapshot of front_/back_ for the Size operation
  // using the modification counters.
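  //
  // Illustrative example (not from the original source): with kSize == 4 the
  // low 3 bits of front_/back_ form a rolling index in [0, 2 * kSize) and the
  // higher bits form the counter. PushFront's "front + 1 + (kSize << 1)"
  // advances the index by one slot and bumps the counter, while PopFront's
  // "((front - 1) & kMask2) | (front & ~kMask2)" rewinds the index without
  // touching the counter; the occupied slots are the rolling indices in
  // [back, front). The constructor's kSize <= (64 << 10) assertion keeps the
  // index within the low 17 bits so that, with the usual 32-bit unsigned,
  // plenty of counter bits remain above it.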
  std::atomic<unsigned> front_;
  std::atomic<unsigned> back_;
  Elem array_[kSize];

  RunQueue(const RunQueue&) = delete;
  void operator=(const RunQueue&) = delete;
};
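
// Illustrative usage sketch (not part of the original source). It assumes the
// queue holds std::function<void()> tasks, which is the kind of Work this was
// designed for, and shows the intended ownership split: only the owner thread
// touches the front, any thread may work on the back.
//
//   RunQueue<std::function<void()>, 1024> q;
//
//   // Owner thread (front):
//   std::function<void()> overflow = q.PushFront([] { /* task */ });
//   if (overflow) { /* queue was full; run or redistribute the task */ }
//   std::function<void()> task = q.PopFront();
//   if (task) task();
//
//   // Any other thread (back), e.g. a work-stealing worker:
//   std::function<void()> stolen = q.PopBack();
//   if (stolen) stolen();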

}  // namespace Eigen

#endif  // EIGEN_CXX11_THREADPOOL_RUNQUEUE_H_