1 /* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #ifndef ANDROID_QUEUE_WORKER_H_ 18 #define ANDROID_QUEUE_WORKER_H_ 19 20 #include "worker.h" 21 22 #include <queue> 23 24 namespace android { 25 26 template <typename T> 27 class QueueWorker : public Worker { 28 public: 29 static const size_t kDefaultMaxQueueSize = 2; 30 static const int64_t kTimeoutDisabled = -1; 31 32 QueueWorker(const char *name, int priority) 33 : Worker(name, priority), 34 max_queue_size_(kDefaultMaxQueueSize), 35 queue_timeout_ms_(kTimeoutDisabled), 36 idle_timeout_ms_(kTimeoutDisabled), 37 idled_out_(false) { 38 } 39 40 int QueueWork(std::unique_ptr<T> workitem); 41 42 bool IsWorkPending() const { 43 return !queue_.empty(); 44 } 45 bool idle() const { 46 return idled_out_; 47 } 48 49 int64_t idle_timeout() { 50 return idle_timeout_ms_; 51 } 52 void set_idle_timeout(int64_t timeout_ms) { 53 idle_timeout_ms_ = timeout_ms; 54 } 55 56 int64_t queue_timeout() { 57 return queue_timeout_ms_; 58 } 59 void set_queue_timeout(int64_t timeout_ms) { 60 queue_timeout_ms_ = timeout_ms; 61 } 62 63 size_t max_queue_size() const { 64 return max_queue_size_; 65 } 66 void set_max_queue_size(size_t size) { 67 max_queue_size_ = size; 68 } 69 70 protected: 71 virtual void ProcessWork(std::unique_ptr<T> workitem) = 0; 72 virtual void ProcessIdle(){} 73 virtual void Routine(); 74 75 template <typename Predicate> 76 int WaitCond(std::unique_lock<std::mutex> &lock, Predicate pred, 77 int64_t max_msecs); 78 79 private: 80 std::queue<std::unique_ptr<T>> queue_; 81 size_t max_queue_size_; 82 int64_t queue_timeout_ms_; 83 int64_t idle_timeout_ms_; 84 bool idled_out_; 85 }; 86 87 template <typename T> 88 template <typename Predicate> 89 int QueueWorker<T>::WaitCond(std::unique_lock<std::mutex> &lock, Predicate pred, 90 int64_t max_msecs) { 91 bool ret = true; 92 auto wait_func = [&] { return pred() || should_exit(); }; 93 94 if (max_msecs < 0) { 95 cond_.wait(lock, wait_func); 96 } else { 97 auto timeout = std::chrono::milliseconds(max_msecs); 98 ret = cond_.wait_for(lock, timeout, wait_func); 99 } 100 101 if (!ret) 102 return -ETIMEDOUT; 103 else if (should_exit()) 104 return -EINTR; 105 106 return 0; 107 } 108 109 template <typename T> 110 void QueueWorker<T>::Routine() { 111 std::unique_lock<std::mutex> lk(mutex_); 112 std::unique_ptr<T> workitem; 113 114 auto wait_func = [&] { return !queue_.empty(); }; 115 int ret = 116 WaitCond(lk, wait_func, idled_out_ ? kTimeoutDisabled : idle_timeout_ms_); 117 switch (ret) { 118 case 0: 119 break; 120 case -ETIMEDOUT: 121 ProcessIdle(); 122 idled_out_ = true; 123 return; 124 case -EINTR: 125 default: 126 return; 127 } 128 129 if (!queue_.empty()) { 130 workitem = std::move(queue_.front()); 131 queue_.pop(); 132 } 133 lk.unlock(); 134 cond_.notify_all(); 135 136 idled_out_ = false; 137 ProcessWork(std::move(workitem)); 138 } 139 140 template <typename T> 141 int QueueWorker<T>::QueueWork(std::unique_ptr<T> workitem) { 142 std::unique_lock<std::mutex> lk(mutex_); 143 144 auto wait_func = [&] { return queue_.size() < max_queue_size_; }; 145 int ret = WaitCond(lk, wait_func, queue_timeout_ms_); 146 if (ret) 147 return ret; 148 149 queue_.push(std::move(workitem)); 150 lk.unlock(); 151 152 cond_.notify_one(); 153 154 return 0; 155 } 156 }; 157 #endif 158