Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
      6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
      7 
      8 #include <algorithm>
      9 #include <map>
     10 #include <tuple>
     11 
     12 #include "base/bind.h"
     13 #include "base/location.h"
     14 #include "base/logging.h"
     15 #include "base/macros.h"
     16 #include "base/memory/ref_counted.h"
     17 #include "base/message_loop/message_loop.h"
     18 #include "base/observer_list.h"
     19 #include "base/single_thread_task_runner.h"
     20 #include "base/stl_util.h"
     21 #include "base/threading/platform_thread.h"
     22 #include "base/threading/thread_task_runner_handle.h"
     23 
     24 ///////////////////////////////////////////////////////////////////////////////
     25 //
     26 // OVERVIEW:
     27 //
     28 //   A thread-safe container for a list of observers.
     29 //   This is similar to the observer_list (see observer_list.h), but it
     30 //   is more robust for multi-threaded situations.
     31 //
     32 //   The following use cases are supported:
     33 //    * Observers can register for notifications from any thread.
     34 //      Callbacks to the observer will occur on the same thread from
     35 //      which the observer initially called AddObserver().
     36 //    * Any thread may trigger a notification via Notify().
     37 //    * Observers can remove themselves from the observer list inside
     38 //      of a callback.
     39 //    * If one thread is notifying observers concurrently with an observer
     40 //      removing itself from the observer list, the notifications will
     41 //      be silently dropped.
     42 //
     43 //   The drawback of the threadsafe observer list is that notifications
     44 //   are not as real-time as the non-threadsafe version of this class.
     45 //   Notifications will always be done via PostTask() to another thread,
     46 //   whereas with the non-thread-safe observer_list, notifications happen
     47 //   synchronously and immediately.
     48 //
     49 //   IMPLEMENTATION NOTES
     50 //   The ObserverListThreadSafe maintains a regular ObserverList for each
     51 //   thread that has registered an observer.  When notifying the observers,
     52 //   we simply call PostTask to each registered thread, and then each thread
     53 //   will notify its own ObserverList.
     54 //
     55 ///////////////////////////////////////////////////////////////////////////////
     56 
     57 namespace base {
     58 
     59 // Forward declaration for ObserverListThreadSafeTraits.
     60 template <class ObserverType>
     61 class ObserverListThreadSafe;
     62 
     63 namespace internal {
     64 
     65 // An UnboundMethod is a wrapper for a method where the actual object is
     66 // provided at Run dispatch time.
     67 template <class T, class Method, class Params>
     68 class UnboundMethod {
     69  public:
     70   UnboundMethod(Method m, const Params& p) : m_(m), p_(p) {
     71     static_assert((internal::ParamsUseScopedRefptrCorrectly<Params>::value),
     72                   "bad unbound method params");
     73   }
     74   void Run(T* obj) const {
     75     DispatchToMethod(obj, m_, p_);
     76   }
     77  private:
     78   Method m_;
     79   Params p_;
     80 };
     81 
     82 }  // namespace internal
     83 
     84 // This class is used to work around VS2005 not accepting:
     85 //
     86 // friend class
     87 //     base::RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
     88 //
     89 // Instead of friending the class, we could friend the actual function
     90 // which calls delete.  However, this ends up being
     91 // RefCountedThreadSafe::DeleteInternal(), which is private.  So we
     92 // define our own templated traits class so we can friend it.
     93 template <class T>
     94 struct ObserverListThreadSafeTraits {
     95   static void Destruct(const ObserverListThreadSafe<T>* x) {
     96     delete x;
     97   }
     98 };
     99 
    100 template <class ObserverType>
    101 class ObserverListThreadSafe
    102     : public RefCountedThreadSafe<
    103         ObserverListThreadSafe<ObserverType>,
    104         ObserverListThreadSafeTraits<ObserverType>> {
    105  public:
    106   typedef typename ObserverList<ObserverType>::NotificationType
    107       NotificationType;
    108 
    109   ObserverListThreadSafe()
    110       : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
    111   explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
    112 
    113   // Add an observer to the list.  An observer should not be added to
    114   // the same list more than once.
    115   void AddObserver(ObserverType* obs) {
    116     // If there is not a current MessageLoop, it is impossible to notify on it,
    117     // so do not add the observer.
    118     if (!MessageLoop::current())
    119       return;
    120 
    121     ObserverList<ObserverType>* list = nullptr;
    122     PlatformThreadId thread_id = PlatformThread::CurrentId();
    123     {
    124       AutoLock lock(list_lock_);
    125       if (observer_lists_.find(thread_id) == observer_lists_.end())
    126         observer_lists_[thread_id] = new ObserverListContext(type_);
    127       list = &(observer_lists_[thread_id]->list);
    128     }
    129     list->AddObserver(obs);
    130   }
    131 
    132   // Remove an observer from the list if it is in the list.
    133   // If there are pending notifications in-transit to the observer, they will
    134   // be aborted.
    135   // If the observer to be removed is in the list, RemoveObserver MUST
    136   // be called from the same thread which called AddObserver.
    137   void RemoveObserver(ObserverType* obs) {
    138     ObserverListContext* context = nullptr;
    139     ObserverList<ObserverType>* list = nullptr;
    140     PlatformThreadId thread_id = PlatformThread::CurrentId();
    141     {
    142       AutoLock lock(list_lock_);
    143       typename ObserversListMap::iterator it = observer_lists_.find(thread_id);
    144       if (it == observer_lists_.end()) {
    145         // This will happen if we try to remove an observer on a thread
    146         // we never added an observer for.
    147         return;
    148       }
    149       context = it->second;
    150       list = &context->list;
    151 
    152       // If we're about to remove the last observer from the list,
    153       // then we can remove this observer_list entirely.
    154       if (list->HasObserver(obs) && list->size() == 1)
    155         observer_lists_.erase(it);
    156     }
    157     list->RemoveObserver(obs);
    158 
    159     // If RemoveObserver is called from a notification, the size will be
    160     // nonzero.  Instead of deleting here, the NotifyWrapper will delete
    161     // when it finishes iterating.
    162     if (list->size() == 0)
    163       delete context;
    164   }
    165 
    166   // Verifies that the list is currently empty (i.e. there are no observers).
    167   void AssertEmpty() const {
    168     AutoLock lock(list_lock_);
    169     DCHECK(observer_lists_.empty());
    170   }
    171 
    172   // Notify methods.
    173   // Make a thread-safe callback to each Observer in the list.
    174   // Note, these calls are effectively asynchronous.  You cannot assume
    175   // that at the completion of the Notify call that all Observers have
    176   // been Notified.  The notification may still be pending delivery.
    177   template <class Method, class... Params>
    178   void Notify(const tracked_objects::Location& from_here,
    179               Method m,
    180               const Params&... params) {
    181     internal::UnboundMethod<ObserverType, Method, std::tuple<Params...>> method(
    182         m, std::make_tuple(params...));
    183 
    184     AutoLock lock(list_lock_);
    185     for (const auto& entry : observer_lists_) {
    186       ObserverListContext* context = entry.second;
    187       context->task_runner->PostTask(
    188           from_here,
    189           Bind(&ObserverListThreadSafe<ObserverType>::template NotifyWrapper<
    190                    Method, std::tuple<Params...>>,
    191                this, context, method));
    192     }
    193   }
    194 
    195  private:
    196   // See comment above ObserverListThreadSafeTraits' definition.
    197   friend struct ObserverListThreadSafeTraits<ObserverType>;
    198 
    199   struct ObserverListContext {
    200     explicit ObserverListContext(NotificationType type)
    201         : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
    202 
    203     scoped_refptr<SingleThreadTaskRunner> task_runner;
    204     ObserverList<ObserverType> list;
    205 
    206    private:
    207     DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
    208   };
    209 
    210   ~ObserverListThreadSafe() {
    211     STLDeleteValues(&observer_lists_);
    212   }
    213 
    214   // Wrapper which is called to fire the notifications for each thread's
    215   // ObserverList.  This function MUST be called on the thread which owns
    216   // the unsafe ObserverList.
    217   template <class Method, class Params>
    218   void NotifyWrapper(
    219       ObserverListContext* context,
    220       const internal::UnboundMethod<ObserverType, Method, Params>& method) {
    221     // Check that this list still needs notifications.
    222     {
    223       AutoLock lock(list_lock_);
    224       typename ObserversListMap::iterator it =
    225           observer_lists_.find(PlatformThread::CurrentId());
    226 
    227       // The ObserverList could have been removed already.  In fact, it could
    228       // have been removed and then re-added!  If the master list's loop
    229       // does not match this one, then we do not need to finish this
    230       // notification.
    231       if (it == observer_lists_.end() || it->second != context)
    232         return;
    233     }
    234 
    235     {
    236       typename ObserverList<ObserverType>::Iterator it(&context->list);
    237       ObserverType* obs;
    238       while ((obs = it.GetNext()) != nullptr)
    239         method.Run(obs);
    240     }
    241 
    242     // If there are no more observers on the list, we can now delete it.
    243     if (context->list.size() == 0) {
    244       {
    245         AutoLock lock(list_lock_);
    246         // Remove |list| if it's not already removed.
    247         // This can happen if multiple observers got removed in a notification.
    248         // See http://crbug.com/55725.
    249         typename ObserversListMap::iterator it =
    250             observer_lists_.find(PlatformThread::CurrentId());
    251         if (it != observer_lists_.end() && it->second == context)
    252           observer_lists_.erase(it);
    253       }
    254       delete context;
    255     }
    256   }
    257 
    258   // Key by PlatformThreadId because in tests, clients can attempt to remove
    259   // observers without a MessageLoop. If this were keyed by MessageLoop, that
    260   // operation would be silently ignored, leaving garbage in the ObserverList.
    261   typedef std::map<PlatformThreadId, ObserverListContext*>
    262       ObserversListMap;
    263 
    264   mutable Lock list_lock_;  // Protects the observer_lists_.
    265   ObserversListMap observer_lists_;
    266   const NotificationType type_;
    267 
    268   DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
    269 };
    270 
    271 }  // namespace base
    272 
    273 #endif  // BASE_OBSERVER_LIST_THREADSAFE_H_
    274