1 // Copyright 2014 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ 6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ 7 8 #include <deque> 9 #include <set> 10 11 #include "base/bind.h" 12 #include "base/callback.h" 13 #include "base/gtest_prod_util.h" 14 #include "base/macros.h" 15 #include "base/memory/weak_ptr.h" 16 #include "base/message_loop/message_loop.h" 17 #include "base/threading/non_thread_safe.h" 18 #include "base/time/time.h" 19 #include "base/timer/timer.h" 20 #include "net/base/backoff_entry.h" 21 22 namespace syncer { 23 24 // A queue that dispatches tasks, ignores duplicates, and provides backoff 25 // semantics. 26 // 27 // |T| is the task type. 28 // 29 // For each task added to the queue, the HandleTaskCallback will eventually be 30 // invoked. For each invocation, the user of TaskQueue must call exactly one of 31 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|. 32 // 33 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task). 34 // 35 // Example usage: 36 // 37 // void Handle(const Foo& foo); 38 // ... 39 // TaskQueue<Foo> queue(base::Bind(&Handle), 40 // base::TimeDelta::FromSeconds(1), 41 // base::TimeDelta::FromMinutes(1)); 42 // ... 43 // { 44 // Foo foo; 45 // // Add foo to the queue. At some point, Handle will be invoked in this 46 // // message loop. 47 // queue.AddToQueue(foo); 48 // } 49 // ... 50 // void Handle(const Foo& foo) { 51 // DoSomethingWith(foo); 52 // // We must call one of the three methods to tell the queue how we're 53 // // dealing with foo. Of course, we are free to call in the the context of 54 // // this HandleTaskCallback or outside the context if we so choose. 55 // if (SuccessfullyHandled(foo)) { 56 // queue.MarkAsSucceeded(foo); 57 // } else if (Failed(foo)) { 58 // queue.MarkAsFailed(foo); 59 // if (ShouldRetry(foo)) { 60 // queue.AddToQueue(foo); 61 // } 62 // } else { 63 // Cancel(foo); 64 // } 65 // } 66 // 67 template <typename T> 68 class TaskQueue : base::NonThreadSafe { 69 public: 70 // A callback provided by users of the TaskQueue to handle tasks. 71 // 72 // This callback is invoked by the queue with a task to be handled. The 73 // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|, 74 // or |Cancel| to signify completion of the task. 75 typedef base::Callback<void(const T&)> HandleTaskCallback; 76 77 // Construct a TaskQueue. 78 // 79 // |callback| the callback to be invoked for handling tasks. 80 // 81 // |initial_backoff_delay| the initial amount of time the queue will wait 82 // before dispatching tasks after a failed task (see |MarkAsFailed|). May be 83 // zero. Subsequent failures will increase the delay up to 84 // |max_backoff_delay|. 85 // 86 // |max_backoff_delay| the maximum amount of time the queue will wait before 87 // dispatching tasks. May be zero. Must be greater than or equal to 88 // |initial_backoff_delay|. 89 TaskQueue(const HandleTaskCallback& callback, 90 const base::TimeDelta& initial_backoff_delay, 91 const base::TimeDelta& max_backoff_delay); 92 93 // Add |task| to the end of the queue. 94 // 95 // If |task| is already present (as determined by operator==) it is not added. 96 void AddToQueue(const T& task); 97 98 // Mark |task| as completing successfully. 99 // 100 // Marking a task as completing successfully will reduce or eliminate any 101 // backoff delay in effect. 102 // 103 // May only be called after the HandleTaskCallback has been invoked with 104 // |task|. 105 void MarkAsSucceeded(const T& task); 106 107 // Mark |task| as failed. 108 // 109 // Marking a task as failed will cause a backoff, i.e. a delay in dispatching 110 // of subsequent tasks. Repeated failures will increase the delay. 111 // 112 // May only be called after the HandleTaskCallback has been invoked with 113 // |task|. 114 void MarkAsFailed(const T& task); 115 116 // Cancel |task|. 117 // 118 // |task| is removed from the queue and will not be retried. Does not affect 119 // the backoff delay. 120 // 121 // May only be called after the HandleTaskCallback has been invoked with 122 // |task|. 123 void Cancel(const T& task); 124 125 // Reset any backoff delay and resume dispatching of tasks. 126 // 127 // Useful for when you know the cause of previous failures has been resolved 128 // and you want don't want to wait for the accumulated backoff delay to 129 // elapse. 130 void ResetBackoff(); 131 132 // Use |timer| for scheduled events. 133 // 134 // Used in tests. See also MockTimer. 135 void SetTimerForTest(scoped_ptr<base::Timer> timer); 136 137 private: 138 void FinishTask(const T& task); 139 void ScheduleDispatch(); 140 void Dispatch(); 141 // Return true if we should dispatch tasks. 142 bool ShouldDispatch(); 143 144 const HandleTaskCallback process_callback_; 145 net::BackoffEntry::Policy backoff_policy_; 146 scoped_ptr<net::BackoffEntry> backoff_entry_; 147 // The number of tasks currently being handled. 148 int num_in_progress_; 149 std::deque<T> queue_; 150 // The set of tasks in queue_ or currently being handled. 151 std::set<T> tasks_; 152 base::Closure dispatch_closure_; 153 scoped_ptr<base::Timer> backoff_timer_; 154 base::TimeDelta delay_; 155 156 // Must be last data member. 157 base::WeakPtrFactory<TaskQueue> weak_ptr_factory_; 158 159 DISALLOW_COPY_AND_ASSIGN(TaskQueue); 160 }; 161 162 // The maximum number of tasks that may be concurrently executed. Think 163 // carefully before changing this value. The desired behavior of backoff may 164 // not be obvious when there is more than one concurrent task 165 const int kMaxConcurrentTasks = 1; 166 167 template <typename T> 168 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback, 169 const base::TimeDelta& initial_backoff_delay, 170 const base::TimeDelta& max_backoff_delay) 171 : process_callback_(callback), 172 backoff_policy_({}), 173 num_in_progress_(0), 174 weak_ptr_factory_(this) { 175 DCHECK_LE(initial_backoff_delay.InMicroseconds(), 176 max_backoff_delay.InMicroseconds()); 177 backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds(); 178 backoff_policy_.multiply_factor = 2.0; 179 backoff_policy_.jitter_factor = 0.1; 180 backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds(); 181 backoff_policy_.entry_lifetime_ms = -1; 182 backoff_policy_.always_use_initial_delay = false; 183 backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_)); 184 dispatch_closure_ = 185 base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr()); 186 backoff_timer_.reset(new base::Timer(false, false)); 187 } 188 189 template <typename T> 190 void TaskQueue<T>::AddToQueue(const T& task) { 191 DCHECK(CalledOnValidThread()); 192 // Ignore duplicates. 193 if (tasks_.find(task) == tasks_.end()) { 194 queue_.push_back(task); 195 tasks_.insert(task); 196 } 197 ScheduleDispatch(); 198 } 199 200 template <typename T> 201 void TaskQueue<T>::MarkAsSucceeded(const T& task) { 202 DCHECK(CalledOnValidThread()); 203 FinishTask(task); 204 // The task succeeded. Stop any pending timer, reset (clear) the backoff, and 205 // reschedule a dispatch. 206 backoff_timer_->Stop(); 207 backoff_entry_->Reset(); 208 ScheduleDispatch(); 209 } 210 211 template <typename T> 212 void TaskQueue<T>::MarkAsFailed(const T& task) { 213 DCHECK(CalledOnValidThread()); 214 FinishTask(task); 215 backoff_entry_->InformOfRequest(false); 216 ScheduleDispatch(); 217 } 218 219 template <typename T> 220 void TaskQueue<T>::Cancel(const T& task) { 221 DCHECK(CalledOnValidThread()); 222 FinishTask(task); 223 ScheduleDispatch(); 224 } 225 226 template <typename T> 227 void TaskQueue<T>::ResetBackoff() { 228 backoff_timer_->Stop(); 229 backoff_entry_->Reset(); 230 ScheduleDispatch(); 231 } 232 233 template <typename T> 234 void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) { 235 DCHECK(CalledOnValidThread()); 236 DCHECK(timer.get()); 237 backoff_timer_ = timer.Pass(); 238 } 239 240 template <typename T> 241 void TaskQueue<T>::FinishTask(const T& task) { 242 DCHECK(CalledOnValidThread()); 243 DCHECK_GE(num_in_progress_, 1); 244 --num_in_progress_; 245 const size_t num_erased = tasks_.erase(task); 246 DCHECK_EQ(1U, num_erased); 247 } 248 249 template <typename T> 250 void TaskQueue<T>::ScheduleDispatch() { 251 DCHECK(CalledOnValidThread()); 252 if (backoff_timer_->IsRunning() || !ShouldDispatch()) { 253 return; 254 } 255 256 backoff_timer_->Start( 257 FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_); 258 } 259 260 template <typename T> 261 void TaskQueue<T>::Dispatch() { 262 DCHECK(CalledOnValidThread()); 263 if (!ShouldDispatch()) { 264 return; 265 } 266 267 DCHECK(!queue_.empty()); 268 const T& task = queue_.front(); 269 ++num_in_progress_; 270 DCHECK_LE(num_in_progress_, kMaxConcurrentTasks); 271 base::MessageLoop::current()->PostTask(FROM_HERE, 272 base::Bind(process_callback_, task)); 273 queue_.pop_front(); 274 } 275 276 template <typename T> 277 bool TaskQueue<T>::ShouldDispatch() { 278 return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty(); 279 } 280 281 } // namespace syncer 282 283 #endif // SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_ 284