1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef WEBRTC_BASE_THREAD_H_ 12 #define WEBRTC_BASE_THREAD_H_ 13 14 #include <algorithm> 15 #include <list> 16 #include <string> 17 #include <vector> 18 19 #if defined(WEBRTC_POSIX) 20 #include <pthread.h> 21 #endif 22 #include "webrtc/base/constructormagic.h" 23 #include "webrtc/base/event.h" 24 #include "webrtc/base/messagequeue.h" 25 26 #if defined(WEBRTC_WIN) 27 #include "webrtc/base/win32.h" 28 #endif 29 30 namespace rtc { 31 32 class Thread; 33 34 class ThreadManager { 35 public: 36 static const int kForever = -1; 37 38 ThreadManager(); 39 ~ThreadManager(); 40 41 static ThreadManager* Instance(); 42 43 Thread* CurrentThread(); 44 void SetCurrentThread(Thread* thread); 45 46 // Returns a thread object with its thread_ ivar set 47 // to whatever the OS uses to represent the thread. 48 // If there already *is* a Thread object corresponding to this thread, 49 // this method will return that. Otherwise it creates a new Thread 50 // object whose wrapped() method will return true, and whose 51 // handle will, on Win32, be opened with only synchronization privileges - 52 // if you need more privilegs, rather than changing this method, please 53 // write additional code to adjust the privileges, or call a different 54 // factory method of your own devising, because this one gets used in 55 // unexpected contexts (like inside browser plugins) and it would be a 56 // shame to break it. It is also conceivable on Win32 that we won't even 57 // be able to get synchronization privileges, in which case the result 58 // will have a NULL handle. 59 Thread *WrapCurrentThread(); 60 void UnwrapCurrentThread(); 61 62 private: 63 #if defined(WEBRTC_POSIX) 64 pthread_key_t key_; 65 #endif 66 67 #if defined(WEBRTC_WIN) 68 DWORD key_; 69 #endif 70 71 RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager); 72 }; 73 74 struct _SendMessage { 75 _SendMessage() {} 76 Thread *thread; 77 Message msg; 78 bool *ready; 79 }; 80 81 class Runnable { 82 public: 83 virtual ~Runnable() {} 84 virtual void Run(Thread* thread) = 0; 85 86 protected: 87 Runnable() {} 88 89 private: 90 RTC_DISALLOW_COPY_AND_ASSIGN(Runnable); 91 }; 92 93 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). 94 95 class Thread : public MessageQueue { 96 public: 97 explicit Thread(SocketServer* ss = NULL); 98 // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or 99 // guarantee Stop() is explicitly called before the subclass is destroyed). 100 // This is required to avoid a data race between the destructor modifying the 101 // vtable, and the Thread::PreRun calling the virtual method Run(). 102 ~Thread() override; 103 104 static Thread* Current(); 105 106 // Used to catch performance regressions. Use this to disallow blocking calls 107 // (Invoke) for a given scope. If a synchronous call is made while this is in 108 // effect, an assert will be triggered. 109 // Note that this is a single threaded class. 110 class ScopedDisallowBlockingCalls { 111 public: 112 ScopedDisallowBlockingCalls(); 113 ~ScopedDisallowBlockingCalls(); 114 private: 115 Thread* const thread_; 116 const bool previous_state_; 117 }; 118 119 bool IsCurrent() const { 120 return Current() == this; 121 } 122 123 // Sleeps the calling thread for the specified number of milliseconds, during 124 // which time no processing is performed. Returns false if sleeping was 125 // interrupted by a signal (POSIX only). 126 static bool SleepMs(int millis); 127 128 // Sets the thread's name, for debugging. Must be called before Start(). 129 // If |obj| is non-NULL, its value is appended to |name|. 130 const std::string& name() const { return name_; } 131 bool SetName(const std::string& name, const void* obj); 132 133 // Starts the execution of the thread. 134 bool Start(Runnable* runnable = NULL); 135 136 // Tells the thread to stop and waits until it is joined. 137 // Never call Stop on the current thread. Instead use the inherited Quit 138 // function which will exit the base MessageQueue without terminating the 139 // underlying OS thread. 140 virtual void Stop(); 141 142 // By default, Thread::Run() calls ProcessMessages(kForever). To do other 143 // work, override Run(). To receive and dispatch messages, call 144 // ProcessMessages occasionally. 145 virtual void Run(); 146 147 virtual void Send(MessageHandler* phandler, 148 uint32_t id = 0, 149 MessageData* pdata = NULL); 150 151 // Convenience method to invoke a functor on another thread. Caller must 152 // provide the |ReturnT| template argument, which cannot (easily) be deduced. 153 // Uses Send() internally, which blocks the current thread until execution 154 // is complete. 155 // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool); 156 // NOTE: This function can only be called when synchronous calls are allowed. 157 // See ScopedDisallowBlockingCalls for details. 158 template <class ReturnT, class FunctorT> 159 ReturnT Invoke(const FunctorT& functor) { 160 InvokeBegin(); 161 FunctorMessageHandler<ReturnT, FunctorT> handler(functor); 162 Send(&handler); 163 InvokeEnd(); 164 return handler.result(); 165 } 166 167 // From MessageQueue 168 void Clear(MessageHandler* phandler, 169 uint32_t id = MQID_ANY, 170 MessageList* removed = NULL) override; 171 void ReceiveSends() override; 172 173 // ProcessMessages will process I/O and dispatch messages until: 174 // 1) cms milliseconds have elapsed (returns true) 175 // 2) Stop() is called (returns false) 176 bool ProcessMessages(int cms); 177 178 // Returns true if this is a thread that we created using the standard 179 // constructor, false if it was created by a call to 180 // ThreadManager::WrapCurrentThread(). The main thread of an application 181 // is generally not owned, since the OS representation of the thread 182 // obviously exists before we can get to it. 183 // You cannot call Start on non-owned threads. 184 bool IsOwned(); 185 186 #if defined(WEBRTC_WIN) 187 HANDLE GetHandle() const { 188 return thread_; 189 } 190 DWORD GetId() const { 191 return thread_id_; 192 } 193 #elif defined(WEBRTC_POSIX) 194 pthread_t GetPThread() { 195 return thread_; 196 } 197 #endif 198 199 // Expose private method running() for tests. 200 // 201 // DANGER: this is a terrible public API. Most callers that might want to 202 // call this likely do not have enough control/knowledge of the Thread in 203 // question to guarantee that the returned value remains true for the duration 204 // of whatever code is conditionally executing because of the return value! 205 bool RunningForTest() { return running(); } 206 207 // Sets the per-thread allow-blocking-calls flag and returns the previous 208 // value. Must be called on this thread. 209 bool SetAllowBlockingCalls(bool allow); 210 211 // These functions are public to avoid injecting test hooks. Don't call them 212 // outside of tests. 213 // This method should be called when thread is created using non standard 214 // method, like derived implementation of rtc::Thread and it can not be 215 // started by calling Start(). This will set started flag to true and 216 // owned to false. This must be called from the current thread. 217 bool WrapCurrent(); 218 void UnwrapCurrent(); 219 220 protected: 221 // Same as WrapCurrent except that it never fails as it does not try to 222 // acquire the synchronization access of the thread. The caller should never 223 // call Stop() or Join() on this thread. 224 void SafeWrapCurrent(); 225 226 // Blocks the calling thread until this thread has terminated. 227 void Join(); 228 229 static void AssertBlockingIsAllowedOnCurrentThread(); 230 231 friend class ScopedDisallowBlockingCalls; 232 233 private: 234 static void *PreRun(void *pv); 235 236 // ThreadManager calls this instead WrapCurrent() because 237 // ThreadManager::Instance() cannot be used while ThreadManager is 238 // being created. 239 // The method tries to get synchronization rights of the thread on Windows if 240 // |need_synchronize_access| is true. 241 bool WrapCurrentWithThreadManager(ThreadManager* thread_manager, 242 bool need_synchronize_access); 243 244 // Return true if the thread was started and hasn't yet stopped. 245 bool running() { return running_.Wait(0); } 246 247 // Processes received "Send" requests. If |source| is not NULL, only requests 248 // from |source| are processed, otherwise, all requests are processed. 249 void ReceiveSendsFromThread(const Thread* source); 250 251 // If |source| is not NULL, pops the first "Send" message from |source| in 252 // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|. 253 // The caller must lock |crit_| before calling. 254 // Returns true if there is such a message. 255 bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg); 256 257 // Used for tracking performance of Invoke calls. 258 void InvokeBegin(); 259 void InvokeEnd(); 260 261 std::list<_SendMessage> sendlist_; 262 std::string name_; 263 Event running_; // Signalled means running. 264 265 #if defined(WEBRTC_POSIX) 266 pthread_t thread_; 267 #endif 268 269 #if defined(WEBRTC_WIN) 270 HANDLE thread_; 271 DWORD thread_id_; 272 #endif 273 274 bool owned_; 275 bool blocking_calls_allowed_; // By default set to |true|. 276 277 friend class ThreadManager; 278 279 RTC_DISALLOW_COPY_AND_ASSIGN(Thread); 280 }; 281 282 // AutoThread automatically installs itself at construction 283 // uninstalls at destruction, if a Thread object is 284 // _not already_ associated with the current OS thread. 285 286 class AutoThread : public Thread { 287 public: 288 explicit AutoThread(SocketServer* ss = 0); 289 ~AutoThread() override; 290 291 private: 292 RTC_DISALLOW_COPY_AND_ASSIGN(AutoThread); 293 }; 294 295 // Win32 extension for threads that need to use COM 296 #if defined(WEBRTC_WIN) 297 class ComThread : public Thread { 298 public: 299 ComThread() {} 300 virtual ~ComThread() { Stop(); } 301 302 protected: 303 virtual void Run(); 304 305 private: 306 RTC_DISALLOW_COPY_AND_ASSIGN(ComThread); 307 }; 308 #endif 309 310 // Provides an easy way to install/uninstall a socketserver on a thread. 311 class SocketServerScope { 312 public: 313 explicit SocketServerScope(SocketServer* ss) { 314 old_ss_ = Thread::Current()->socketserver(); 315 Thread::Current()->set_socketserver(ss); 316 } 317 ~SocketServerScope() { 318 Thread::Current()->set_socketserver(old_ss_); 319 } 320 321 private: 322 SocketServer* old_ss_; 323 324 RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope); 325 }; 326 327 } // namespace rtc 328 329 #endif // WEBRTC_BASE_THREAD_H_ 330