Home | History | Annotate | Download | only in base
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_
      6 #define BASE_OBSERVER_LIST_THREADSAFE_H_
      7 
      8 #include <algorithm>
      9 #include <map>
     10 #include <memory>
     11 #include <tuple>
     12 
     13 #include "base/bind.h"
     14 #include "base/location.h"
     15 #include "base/logging.h"
     16 #include "base/macros.h"
     17 #include "base/memory/ptr_util.h"
     18 #include "base/memory/ref_counted.h"
     19 #include "base/observer_list.h"
     20 #include "base/single_thread_task_runner.h"
     21 #include "base/threading/platform_thread.h"
     22 #include "base/threading/thread_task_runner_handle.h"
     23 
     24 ///////////////////////////////////////////////////////////////////////////////
     25 //
     26 // OVERVIEW:
     27 //
     28 //   A thread-safe container for a list of observers.
     29 //   This is similar to the observer_list (see observer_list.h), but it
     30 //   is more robust for multi-threaded situations.
     31 //
     32 //   The following use cases are supported:
     33 //    * Observers can register for notifications from any thread.
     34 //      Callbacks to the observer will occur on the same thread where
     35 //      the observer initially called AddObserver() from.
     36 //    * Any thread may trigger a notification via Notify().
     37 //    * Observers can remove themselves from the observer list inside
     38 //      of a callback.
     39 //    * If one thread is notifying observers concurrently with an observer
     40 //      removing itself from the observer list, the notifications will
     41 //      be silently dropped.
     42 //
     43 //   The drawback of the threadsafe observer list is that notifications
     44 //   are not as real-time as the non-threadsafe version of this class.
     45 //   Notifications will always be done via PostTask() to another thread,
     46 //   whereas with the non-thread-safe observer_list, notifications happen
     47 //   synchronously and immediately.
     48 //
     49 //   IMPLEMENTATION NOTES
     50 //   The ObserverListThreadSafe maintains an ObserverList for each thread
     51 //   which uses the ThreadSafeObserver.  When Notifying the observers,
     52 //   we simply call PostTask to each registered thread, and then each thread
     53 //   will notify its regular ObserverList.
     54 //
     55 ///////////////////////////////////////////////////////////////////////////////
     56 
     57 namespace base {
     58 namespace internal {
     59 
     60 template <typename ObserverType, typename Method>
     61 struct Dispatcher;
     62 
     63 template <typename ObserverType, typename ReceiverType, typename... Params>
     64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> {
     65   static void Run(void(ReceiverType::* m)(Params...),
     66                   Params... params, ObserverType* obj) {
     67     (obj->*m)(std::forward<Params>(params)...);
     68   }
     69 };
     70 
     71 }  // namespace internal
     72 
     73 template <class ObserverType>
     74 class ObserverListThreadSafe
     75     : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> {
     76  public:
     77   using NotificationType =
     78       typename ObserverList<ObserverType>::NotificationType;
     79 
     80   ObserverListThreadSafe()
     81       : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {}
     82   explicit ObserverListThreadSafe(NotificationType type) : type_(type) {}
     83 
     84   // Add an observer to the list.  An observer should not be added to
     85   // the same list more than once.
     86   void AddObserver(ObserverType* obs) {
     87     // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it,
     88     // so do not add the observer.
     89     if (!ThreadTaskRunnerHandle::IsSet())
     90       return;
     91 
     92     ObserverList<ObserverType>* list = nullptr;
     93     PlatformThreadId thread_id = PlatformThread::CurrentId();
     94     {
     95       AutoLock lock(list_lock_);
     96       if (observer_lists_.find(thread_id) == observer_lists_.end()) {
     97         observer_lists_[thread_id] =
     98             base::MakeUnique<ObserverListContext>(type_);
     99       }
    100       list = &(observer_lists_[thread_id]->list);
    101     }
    102     list->AddObserver(obs);
    103   }
    104 
    105   // Remove an observer from the list if it is in the list.
    106   // If there are pending notifications in-transit to the observer, they will
    107   // be aborted.
    108   // If the observer to be removed is in the list, RemoveObserver MUST
    109   // be called from the same thread which called AddObserver.
    110   void RemoveObserver(ObserverType* obs) {
    111     PlatformThreadId thread_id = PlatformThread::CurrentId();
    112     {
    113       AutoLock lock(list_lock_);
    114       auto it = observer_lists_.find(thread_id);
    115       if (it == observer_lists_.end()) {
    116         // This will happen if we try to remove an observer on a thread
    117         // we never added an observer for.
    118         return;
    119       }
    120       ObserverList<ObserverType>& list = it->second->list;
    121 
    122       list.RemoveObserver(obs);
    123 
    124       // If that was the last observer in the list, remove the ObserverList
    125       // entirely.
    126       if (list.size() == 0)
    127         observer_lists_.erase(it);
    128     }
    129   }
    130 
    131   // Verifies that the list is currently empty (i.e. there are no observers).
    132   void AssertEmpty() const {
    133     AutoLock lock(list_lock_);
    134     DCHECK(observer_lists_.empty());
    135   }
    136 
    137   // Notify methods.
    138   // Make a thread-safe callback to each Observer in the list.
    139   // Note, these calls are effectively asynchronous.  You cannot assume
    140   // that at the completion of the Notify call that all Observers have
    141   // been Notified.  The notification may still be pending delivery.
    142   template <typename Method, typename... Params>
    143   void Notify(const tracked_objects::Location& from_here,
    144               Method m, Params&&... params) {
    145     Callback<void(ObserverType*)> method =
    146         Bind(&internal::Dispatcher<ObserverType, Method>::Run,
    147              m, std::forward<Params>(params)...);
    148 
    149     AutoLock lock(list_lock_);
    150     for (const auto& entry : observer_lists_) {
    151       ObserverListContext* context = entry.second.get();
    152       context->task_runner->PostTask(
    153           from_here,
    154           Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper,
    155                this, context, method));
    156     }
    157   }
    158 
    159  private:
    160   friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>;
    161 
    162   struct ObserverListContext {
    163     explicit ObserverListContext(NotificationType type)
    164         : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {}
    165 
    166     scoped_refptr<SingleThreadTaskRunner> task_runner;
    167     ObserverList<ObserverType> list;
    168 
    169    private:
    170     DISALLOW_COPY_AND_ASSIGN(ObserverListContext);
    171   };
    172 
    173   ~ObserverListThreadSafe() {
    174   }
    175 
    176   // Wrapper which is called to fire the notifications for each thread's
    177   // ObserverList.  This function MUST be called on the thread which owns
    178   // the unsafe ObserverList.
    179   void NotifyWrapper(ObserverListContext* context,
    180                      const Callback<void(ObserverType*)>& method) {
    181     // Check that this list still needs notifications.
    182     {
    183       AutoLock lock(list_lock_);
    184       auto it = observer_lists_.find(PlatformThread::CurrentId());
    185 
    186       // The ObserverList could have been removed already.  In fact, it could
    187       // have been removed and then re-added!  If the master list's loop
    188       // does not match this one, then we do not need to finish this
    189       // notification.
    190       if (it == observer_lists_.end() || it->second.get() != context)
    191         return;
    192     }
    193 
    194     for (auto& observer : context->list) {
    195       method.Run(&observer);
    196     }
    197 
    198     // If there are no more observers on the list, we can now delete it.
    199     if (context->list.size() == 0) {
    200       {
    201         AutoLock lock(list_lock_);
    202         // Remove |list| if it's not already removed.
    203         // This can happen if multiple observers got removed in a notification.
    204         // See http://crbug.com/55725.
    205         auto it = observer_lists_.find(PlatformThread::CurrentId());
    206         if (it != observer_lists_.end() && it->second.get() == context)
    207           observer_lists_.erase(it);
    208       }
    209     }
    210   }
    211 
    212   mutable Lock list_lock_;  // Protects the observer_lists_.
    213 
    214   // Key by PlatformThreadId because in tests, clients can attempt to remove
    215   // observers without a SingleThreadTaskRunner. If this were keyed by
    216   // SingleThreadTaskRunner, that operation would be silently ignored, leaving
    217   // garbage in the ObserverList.
    218   std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>>
    219       observer_lists_;
    220 
    221   const NotificationType type_;
    222 
    223   DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe);
    224 };
    225 
    226 }  // namespace base
    227 
    228 #endif  // BASE_OBSERVER_LIST_THREADSAFE_H_
    229