1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #ifndef BASE_OBSERVER_LIST_THREADSAFE_H_ 6 #define BASE_OBSERVER_LIST_THREADSAFE_H_ 7 8 #include <algorithm> 9 #include <map> 10 #include <memory> 11 #include <tuple> 12 13 #include "base/bind.h" 14 #include "base/location.h" 15 #include "base/logging.h" 16 #include "base/macros.h" 17 #include "base/memory/ptr_util.h" 18 #include "base/memory/ref_counted.h" 19 #include "base/observer_list.h" 20 #include "base/single_thread_task_runner.h" 21 #include "base/threading/platform_thread.h" 22 #include "base/threading/thread_task_runner_handle.h" 23 24 /////////////////////////////////////////////////////////////////////////////// 25 // 26 // OVERVIEW: 27 // 28 // A thread-safe container for a list of observers. 29 // This is similar to the observer_list (see observer_list.h), but it 30 // is more robust for multi-threaded situations. 31 // 32 // The following use cases are supported: 33 // * Observers can register for notifications from any thread. 34 // Callbacks to the observer will occur on the same thread where 35 // the observer initially called AddObserver() from. 36 // * Any thread may trigger a notification via Notify(). 37 // * Observers can remove themselves from the observer list inside 38 // of a callback. 39 // * If one thread is notifying observers concurrently with an observer 40 // removing itself from the observer list, the notifications will 41 // be silently dropped. 42 // 43 // The drawback of the threadsafe observer list is that notifications 44 // are not as real-time as the non-threadsafe version of this class. 45 // Notifications will always be done via PostTask() to another thread, 46 // whereas with the non-thread-safe observer_list, notifications happen 47 // synchronously and immediately. 48 // 49 // IMPLEMENTATION NOTES 50 // The ObserverListThreadSafe maintains an ObserverList for each thread 51 // which uses the ThreadSafeObserver. When Notifying the observers, 52 // we simply call PostTask to each registered thread, and then each thread 53 // will notify its regular ObserverList. 54 // 55 /////////////////////////////////////////////////////////////////////////////// 56 57 namespace base { 58 namespace internal { 59 60 template <typename ObserverType, typename Method> 61 struct Dispatcher; 62 63 template <typename ObserverType, typename ReceiverType, typename... Params> 64 struct Dispatcher<ObserverType, void(ReceiverType::*)(Params...)> { 65 static void Run(void(ReceiverType::* m)(Params...), 66 Params... params, ObserverType* obj) { 67 (obj->*m)(std::forward<Params>(params)...); 68 } 69 }; 70 71 } // namespace internal 72 73 template <class ObserverType> 74 class ObserverListThreadSafe 75 : public RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>> { 76 public: 77 using NotificationType = 78 typename ObserverList<ObserverType>::NotificationType; 79 80 ObserverListThreadSafe() 81 : type_(ObserverListBase<ObserverType>::NOTIFY_ALL) {} 82 explicit ObserverListThreadSafe(NotificationType type) : type_(type) {} 83 84 // Add an observer to the list. An observer should not be added to 85 // the same list more than once. 86 void AddObserver(ObserverType* obs) { 87 // If there is no ThreadTaskRunnerHandle, it is impossible to notify on it, 88 // so do not add the observer. 89 if (!ThreadTaskRunnerHandle::IsSet()) 90 return; 91 92 ObserverList<ObserverType>* list = nullptr; 93 PlatformThreadId thread_id = PlatformThread::CurrentId(); 94 { 95 AutoLock lock(list_lock_); 96 if (observer_lists_.find(thread_id) == observer_lists_.end()) { 97 observer_lists_[thread_id] = 98 base::MakeUnique<ObserverListContext>(type_); 99 } 100 list = &(observer_lists_[thread_id]->list); 101 } 102 list->AddObserver(obs); 103 } 104 105 // Remove an observer from the list if it is in the list. 106 // If there are pending notifications in-transit to the observer, they will 107 // be aborted. 108 // If the observer to be removed is in the list, RemoveObserver MUST 109 // be called from the same thread which called AddObserver. 110 void RemoveObserver(ObserverType* obs) { 111 PlatformThreadId thread_id = PlatformThread::CurrentId(); 112 { 113 AutoLock lock(list_lock_); 114 auto it = observer_lists_.find(thread_id); 115 if (it == observer_lists_.end()) { 116 // This will happen if we try to remove an observer on a thread 117 // we never added an observer for. 118 return; 119 } 120 ObserverList<ObserverType>& list = it->second->list; 121 122 list.RemoveObserver(obs); 123 124 // If that was the last observer in the list, remove the ObserverList 125 // entirely. 126 if (list.size() == 0) 127 observer_lists_.erase(it); 128 } 129 } 130 131 // Verifies that the list is currently empty (i.e. there are no observers). 132 void AssertEmpty() const { 133 AutoLock lock(list_lock_); 134 DCHECK(observer_lists_.empty()); 135 } 136 137 // Notify methods. 138 // Make a thread-safe callback to each Observer in the list. 139 // Note, these calls are effectively asynchronous. You cannot assume 140 // that at the completion of the Notify call that all Observers have 141 // been Notified. The notification may still be pending delivery. 142 template <typename Method, typename... Params> 143 void Notify(const tracked_objects::Location& from_here, 144 Method m, Params&&... params) { 145 Callback<void(ObserverType*)> method = 146 Bind(&internal::Dispatcher<ObserverType, Method>::Run, 147 m, std::forward<Params>(params)...); 148 149 AutoLock lock(list_lock_); 150 for (const auto& entry : observer_lists_) { 151 ObserverListContext* context = entry.second.get(); 152 context->task_runner->PostTask( 153 from_here, 154 Bind(&ObserverListThreadSafe<ObserverType>::NotifyWrapper, 155 this, context, method)); 156 } 157 } 158 159 private: 160 friend class RefCountedThreadSafe<ObserverListThreadSafe<ObserverType>>; 161 162 struct ObserverListContext { 163 explicit ObserverListContext(NotificationType type) 164 : task_runner(ThreadTaskRunnerHandle::Get()), list(type) {} 165 166 scoped_refptr<SingleThreadTaskRunner> task_runner; 167 ObserverList<ObserverType> list; 168 169 private: 170 DISALLOW_COPY_AND_ASSIGN(ObserverListContext); 171 }; 172 173 ~ObserverListThreadSafe() { 174 } 175 176 // Wrapper which is called to fire the notifications for each thread's 177 // ObserverList. This function MUST be called on the thread which owns 178 // the unsafe ObserverList. 179 void NotifyWrapper(ObserverListContext* context, 180 const Callback<void(ObserverType*)>& method) { 181 // Check that this list still needs notifications. 182 { 183 AutoLock lock(list_lock_); 184 auto it = observer_lists_.find(PlatformThread::CurrentId()); 185 186 // The ObserverList could have been removed already. In fact, it could 187 // have been removed and then re-added! If the master list's loop 188 // does not match this one, then we do not need to finish this 189 // notification. 190 if (it == observer_lists_.end() || it->second.get() != context) 191 return; 192 } 193 194 for (auto& observer : context->list) { 195 method.Run(&observer); 196 } 197 198 // If there are no more observers on the list, we can now delete it. 199 if (context->list.size() == 0) { 200 { 201 AutoLock lock(list_lock_); 202 // Remove |list| if it's not already removed. 203 // This can happen if multiple observers got removed in a notification. 204 // See http://crbug.com/55725. 205 auto it = observer_lists_.find(PlatformThread::CurrentId()); 206 if (it != observer_lists_.end() && it->second.get() == context) 207 observer_lists_.erase(it); 208 } 209 } 210 } 211 212 mutable Lock list_lock_; // Protects the observer_lists_. 213 214 // Key by PlatformThreadId because in tests, clients can attempt to remove 215 // observers without a SingleThreadTaskRunner. If this were keyed by 216 // SingleThreadTaskRunner, that operation would be silently ignored, leaving 217 // garbage in the ObserverList. 218 std::map<PlatformThreadId, std::unique_ptr<ObserverListContext>> 219 observer_lists_; 220 221 const NotificationType type_; 222 223 DISALLOW_COPY_AND_ASSIGN(ObserverListThreadSafe); 224 }; 225 226 } // namespace base 227 228 #endif // BASE_OBSERVER_LIST_THREADSAFE_H_ 229