Home | History | Annotate | Download | only in attachments
      1 // Copyright 2014 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
      6 #define SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
      7 
      8 #include <deque>
      9 #include <set>
     10 
     11 #include "base/bind.h"
     12 #include "base/callback.h"
     13 #include "base/gtest_prod_util.h"
     14 #include "base/macros.h"
     15 #include "base/memory/weak_ptr.h"
     16 #include "base/message_loop/message_loop.h"
     17 #include "base/threading/non_thread_safe.h"
     18 #include "base/time/time.h"
     19 #include "base/timer/timer.h"
     20 #include "net/base/backoff_entry.h"
     21 
     22 namespace syncer {
     23 
     24 // A queue that dispatches tasks, ignores duplicates, and provides backoff
     25 // semantics.
     26 //
     27 // |T| is the task type.
     28 //
     29 // For each task added to the queue, the HandleTaskCallback will eventually be
     30 // invoked.  For each invocation, the user of TaskQueue must call exactly one of
     31 // |MarkAsSucceeded|, |MarkAsFailed|, or |Cancel|.
     32 //
     33 // To retry a failed task, call MarkAsFailed(task) then AddToQueue(task).
     34 //
     35 // Example usage:
     36 //
     37 // void Handle(const Foo& foo);
     38 // ...
     39 // TaskQueue<Foo> queue(base::Bind(&Handle),
     40 //                      base::TimeDelta::FromSeconds(1),
     41 //                      base::TimeDelta::FromMinutes(1));
     42 // ...
     43 // {
     44 //   Foo foo;
     45 //   // Add foo to the queue.  At some point, Handle will be invoked in this
     46 //   // message loop.
     47 //   queue.AddToQueue(foo);
     48 // }
     49 // ...
     50 // void Handle(const Foo& foo) {
     51 //   DoSomethingWith(foo);
     52 //   // We must call one of the three methods to tell the queue how we're
     53 //   // dealing with foo.  Of course, we are free to call in the the context of
     54 //   // this HandleTaskCallback or outside the context if we so choose.
     55 //   if (SuccessfullyHandled(foo)) {
     56 //     queue.MarkAsSucceeded(foo);
     57 //   } else if (Failed(foo)) {
     58 //     queue.MarkAsFailed(foo);
     59 //     if (ShouldRetry(foo)) {
     60 //       queue.AddToQueue(foo);
     61 //     }
     62 //   } else {
     63 //     Cancel(foo);
     64 //   }
     65 // }
     66 //
     67 template <typename T>
     68 class TaskQueue : base::NonThreadSafe {
     69  public:
     70   // A callback provided by users of the TaskQueue to handle tasks.
     71   //
     72   // This callback is invoked by the queue with a task to be handled.  The
     73   // callee is expected to (eventually) call |MarkAsSucceeded|, |MarkAsFailed|,
     74   // or |Cancel| to signify completion of the task.
     75   typedef base::Callback<void(const T&)> HandleTaskCallback;
     76 
     77   // Construct a TaskQueue.
     78   //
     79   // |callback| the callback to be invoked for handling tasks.
     80   //
     81   // |initial_backoff_delay| the initial amount of time the queue will wait
     82   // before dispatching tasks after a failed task (see |MarkAsFailed|).  May be
     83   // zero.  Subsequent failures will increase the delay up to
     84   // |max_backoff_delay|.
     85   //
     86   // |max_backoff_delay| the maximum amount of time the queue will wait before
     87   // dispatching tasks.  May be zero.  Must be greater than or equal to
     88   // |initial_backoff_delay|.
     89   TaskQueue(const HandleTaskCallback& callback,
     90             const base::TimeDelta& initial_backoff_delay,
     91             const base::TimeDelta& max_backoff_delay);
     92 
     93   // Add |task| to the end of the queue.
     94   //
     95   // If |task| is already present (as determined by operator==) it is not added.
     96   void AddToQueue(const T& task);
     97 
     98   // Mark |task| as completing successfully.
     99   //
    100   // Marking a task as completing successfully will reduce or eliminate any
    101   // backoff delay in effect.
    102   //
    103   // May only be called after the HandleTaskCallback has been invoked with
    104   // |task|.
    105   void MarkAsSucceeded(const T& task);
    106 
    107   // Mark |task| as failed.
    108   //
    109   // Marking a task as failed will cause a backoff, i.e. a delay in dispatching
    110   // of subsequent tasks.  Repeated failures will increase the delay.
    111   //
    112   // May only be called after the HandleTaskCallback has been invoked with
    113   // |task|.
    114   void MarkAsFailed(const T& task);
    115 
    116   // Cancel |task|.
    117   //
    118   // |task| is removed from the queue and will not be retried.  Does not affect
    119   // the backoff delay.
    120   //
    121   // May only be called after the HandleTaskCallback has been invoked with
    122   // |task|.
    123   void Cancel(const T& task);
    124 
    125   // Reset any backoff delay and resume dispatching of tasks.
    126   //
    127   // Useful for when you know the cause of previous failures has been resolved
    128   // and you want don't want to wait for the accumulated backoff delay to
    129   // elapse.
    130   void ResetBackoff();
    131 
    132   // Use |timer| for scheduled events.
    133   //
    134   // Used in tests.  See also MockTimer.
    135   void SetTimerForTest(scoped_ptr<base::Timer> timer);
    136 
    137  private:
    138   void FinishTask(const T& task);
    139   void ScheduleDispatch();
    140   void Dispatch();
    141   // Return true if we should dispatch tasks.
    142   bool ShouldDispatch();
    143 
    144   const HandleTaskCallback process_callback_;
    145   net::BackoffEntry::Policy backoff_policy_;
    146   scoped_ptr<net::BackoffEntry> backoff_entry_;
    147   // The number of tasks currently being handled.
    148   int num_in_progress_;
    149   std::deque<T> queue_;
    150   // The set of tasks in queue_ or currently being handled.
    151   std::set<T> tasks_;
    152   base::Closure dispatch_closure_;
    153   scoped_ptr<base::Timer> backoff_timer_;
    154   base::TimeDelta delay_;
    155 
    156   // Must be last data member.
    157   base::WeakPtrFactory<TaskQueue> weak_ptr_factory_;
    158 
    159   DISALLOW_COPY_AND_ASSIGN(TaskQueue);
    160 };
    161 
    162 // The maximum number of tasks that may be concurrently executed.  Think
    163 // carefully before changing this value.  The desired behavior of backoff may
    164 // not be obvious when there is more than one concurrent task
    165 const int kMaxConcurrentTasks = 1;
    166 
    167 template <typename T>
    168 TaskQueue<T>::TaskQueue(const HandleTaskCallback& callback,
    169                         const base::TimeDelta& initial_backoff_delay,
    170                         const base::TimeDelta& max_backoff_delay)
    171     : process_callback_(callback),
    172       backoff_policy_({}),
    173       num_in_progress_(0),
    174       weak_ptr_factory_(this) {
    175   DCHECK_LE(initial_backoff_delay.InMicroseconds(),
    176             max_backoff_delay.InMicroseconds());
    177   backoff_policy_.initial_delay_ms = initial_backoff_delay.InMilliseconds();
    178   backoff_policy_.multiply_factor = 2.0;
    179   backoff_policy_.jitter_factor = 0.1;
    180   backoff_policy_.maximum_backoff_ms = max_backoff_delay.InMilliseconds();
    181   backoff_policy_.entry_lifetime_ms = -1;
    182   backoff_policy_.always_use_initial_delay = false;
    183   backoff_entry_.reset(new net::BackoffEntry(&backoff_policy_));
    184   dispatch_closure_ =
    185       base::Bind(&TaskQueue::Dispatch, weak_ptr_factory_.GetWeakPtr());
    186   backoff_timer_.reset(new base::Timer(false, false));
    187 }
    188 
    189 template <typename T>
    190 void TaskQueue<T>::AddToQueue(const T& task) {
    191   DCHECK(CalledOnValidThread());
    192   // Ignore duplicates.
    193   if (tasks_.find(task) == tasks_.end()) {
    194     queue_.push_back(task);
    195     tasks_.insert(task);
    196   }
    197   ScheduleDispatch();
    198 }
    199 
    200 template <typename T>
    201 void TaskQueue<T>::MarkAsSucceeded(const T& task) {
    202   DCHECK(CalledOnValidThread());
    203   FinishTask(task);
    204   // The task succeeded.  Stop any pending timer, reset (clear) the backoff, and
    205   // reschedule a dispatch.
    206   backoff_timer_->Stop();
    207   backoff_entry_->Reset();
    208   ScheduleDispatch();
    209 }
    210 
    211 template <typename T>
    212 void TaskQueue<T>::MarkAsFailed(const T& task) {
    213   DCHECK(CalledOnValidThread());
    214   FinishTask(task);
    215   backoff_entry_->InformOfRequest(false);
    216   ScheduleDispatch();
    217 }
    218 
    219 template <typename T>
    220 void TaskQueue<T>::Cancel(const T& task) {
    221   DCHECK(CalledOnValidThread());
    222   FinishTask(task);
    223   ScheduleDispatch();
    224 }
    225 
    226 template <typename T>
    227 void TaskQueue<T>::ResetBackoff() {
    228   backoff_timer_->Stop();
    229   backoff_entry_->Reset();
    230   ScheduleDispatch();
    231 }
    232 
    233 template <typename T>
    234 void TaskQueue<T>::SetTimerForTest(scoped_ptr<base::Timer> timer) {
    235   DCHECK(CalledOnValidThread());
    236   DCHECK(timer.get());
    237   backoff_timer_ = timer.Pass();
    238 }
    239 
    240 template <typename T>
    241 void TaskQueue<T>::FinishTask(const T& task) {
    242   DCHECK(CalledOnValidThread());
    243   DCHECK_GE(num_in_progress_, 1);
    244   --num_in_progress_;
    245   const size_t num_erased = tasks_.erase(task);
    246   DCHECK_EQ(1U, num_erased);
    247 }
    248 
    249 template <typename T>
    250 void TaskQueue<T>::ScheduleDispatch() {
    251   DCHECK(CalledOnValidThread());
    252   if (backoff_timer_->IsRunning() || !ShouldDispatch()) {
    253     return;
    254   }
    255 
    256   backoff_timer_->Start(
    257       FROM_HERE, backoff_entry_->GetTimeUntilRelease(), dispatch_closure_);
    258 }
    259 
    260 template <typename T>
    261 void TaskQueue<T>::Dispatch() {
    262   DCHECK(CalledOnValidThread());
    263   if (!ShouldDispatch()) {
    264     return;
    265   }
    266 
    267   DCHECK(!queue_.empty());
    268   const T& task = queue_.front();
    269   ++num_in_progress_;
    270   DCHECK_LE(num_in_progress_, kMaxConcurrentTasks);
    271   base::MessageLoop::current()->PostTask(FROM_HERE,
    272                                          base::Bind(process_callback_, task));
    273   queue_.pop_front();
    274 }
    275 
    276 template <typename T>
    277 bool TaskQueue<T>::ShouldDispatch() {
    278   return num_in_progress_ < kMaxConcurrentTasks && !queue_.empty();
    279 }
    280 
    281 }  // namespace syncer
    282 
    283 #endif  //  SYNC_INTERNAL_API_PUBLIC_ATTACHMENTS_TASK_QUEUE_H_
    284