1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef WEBRTC_BASE_THREAD_H_ 12 #define WEBRTC_BASE_THREAD_H_ 13 14 #include <algorithm> 15 #include <list> 16 #include <string> 17 #include <vector> 18 19 #if defined(WEBRTC_POSIX) 20 #include <pthread.h> 21 #endif 22 #include "webrtc/base/constructormagic.h" 23 #include "webrtc/base/event.h" 24 #include "webrtc/base/messagequeue.h" 25 26 #if defined(WEBRTC_WIN) 27 #include "webrtc/base/win32.h" 28 #endif 29 30 namespace rtc { 31 32 class Thread; 33 34 class ThreadManager { 35 public: 36 ThreadManager(); 37 ~ThreadManager(); 38 39 static ThreadManager* Instance(); 40 41 Thread* CurrentThread(); 42 void SetCurrentThread(Thread* thread); 43 44 // Returns a thread object with its thread_ ivar set 45 // to whatever the OS uses to represent the thread. 46 // If there already *is* a Thread object corresponding to this thread, 47 // this method will return that. Otherwise it creates a new Thread 48 // object whose wrapped() method will return true, and whose 49 // handle will, on Win32, be opened with only synchronization privileges - 50 // if you need more privilegs, rather than changing this method, please 51 // write additional code to adjust the privileges, or call a different 52 // factory method of your own devising, because this one gets used in 53 // unexpected contexts (like inside browser plugins) and it would be a 54 // shame to break it. It is also conceivable on Win32 that we won't even 55 // be able to get synchronization privileges, in which case the result 56 // will have a NULL handle. 57 Thread *WrapCurrentThread(); 58 void UnwrapCurrentThread(); 59 60 private: 61 #if defined(WEBRTC_POSIX) 62 pthread_key_t key_; 63 #endif 64 65 #if defined(WEBRTC_WIN) 66 DWORD key_; 67 #endif 68 69 DISALLOW_COPY_AND_ASSIGN(ThreadManager); 70 }; 71 72 struct _SendMessage { 73 _SendMessage() {} 74 Thread *thread; 75 Message msg; 76 bool *ready; 77 }; 78 79 enum ThreadPriority { 80 PRIORITY_IDLE = -1, 81 PRIORITY_NORMAL = 0, 82 PRIORITY_ABOVE_NORMAL = 1, 83 PRIORITY_HIGH = 2, 84 }; 85 86 class Runnable { 87 public: 88 virtual ~Runnable() {} 89 virtual void Run(Thread* thread) = 0; 90 91 protected: 92 Runnable() {} 93 94 private: 95 DISALLOW_COPY_AND_ASSIGN(Runnable); 96 }; 97 98 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS! See ~Thread(). 99 100 class Thread : public MessageQueue { 101 public: 102 explicit Thread(SocketServer* ss = NULL); 103 // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or 104 // guarantee Stop() is explicitly called before the subclass is destroyed). 105 // This is required to avoid a data race between the destructor modifying the 106 // vtable, and the Thread::PreRun calling the virtual method Run(). 107 virtual ~Thread(); 108 109 static Thread* Current(); 110 111 // Used to catch performance regressions. Use this to disallow blocking calls 112 // (Invoke) for a given scope. If a synchronous call is made while this is in 113 // effect, an assert will be triggered. 114 // Note that this is a single threaded class. 115 class ScopedDisallowBlockingCalls { 116 public: 117 ScopedDisallowBlockingCalls(); 118 ~ScopedDisallowBlockingCalls(); 119 private: 120 Thread* const thread_; 121 const bool previous_state_; 122 }; 123 124 bool IsCurrent() const { 125 return Current() == this; 126 } 127 128 // Sleeps the calling thread for the specified number of milliseconds, during 129 // which time no processing is performed. Returns false if sleeping was 130 // interrupted by a signal (POSIX only). 131 static bool SleepMs(int millis); 132 133 // Sets the thread's name, for debugging. Must be called before Start(). 134 // If |obj| is non-NULL, its value is appended to |name|. 135 const std::string& name() const { return name_; } 136 bool SetName(const std::string& name, const void* obj); 137 138 // Sets the thread's priority. Must be called before Start(). 139 ThreadPriority priority() const { return priority_; } 140 bool SetPriority(ThreadPriority priority); 141 142 // Starts the execution of the thread. 143 bool Start(Runnable* runnable = NULL); 144 145 // Tells the thread to stop and waits until it is joined. 146 // Never call Stop on the current thread. Instead use the inherited Quit 147 // function which will exit the base MessageQueue without terminating the 148 // underlying OS thread. 149 virtual void Stop(); 150 151 // By default, Thread::Run() calls ProcessMessages(kForever). To do other 152 // work, override Run(). To receive and dispatch messages, call 153 // ProcessMessages occasionally. 154 virtual void Run(); 155 156 virtual void Send(MessageHandler *phandler, uint32 id = 0, 157 MessageData *pdata = NULL); 158 159 // Convenience method to invoke a functor on another thread. Caller must 160 // provide the |ReturnT| template argument, which cannot (easily) be deduced. 161 // Uses Send() internally, which blocks the current thread until execution 162 // is complete. 163 // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool); 164 // NOTE: This function can only be called when synchronous calls are allowed. 165 // See ScopedDisallowBlockingCalls for details. 166 template <class ReturnT, class FunctorT> 167 ReturnT Invoke(const FunctorT& functor) { 168 FunctorMessageHandler<ReturnT, FunctorT> handler(functor); 169 Send(&handler); 170 return handler.result(); 171 } 172 173 // From MessageQueue 174 virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, 175 MessageList* removed = NULL); 176 virtual void ReceiveSends(); 177 178 // ProcessMessages will process I/O and dispatch messages until: 179 // 1) cms milliseconds have elapsed (returns true) 180 // 2) Stop() is called (returns false) 181 bool ProcessMessages(int cms); 182 183 // Returns true if this is a thread that we created using the standard 184 // constructor, false if it was created by a call to 185 // ThreadManager::WrapCurrentThread(). The main thread of an application 186 // is generally not owned, since the OS representation of the thread 187 // obviously exists before we can get to it. 188 // You cannot call Start on non-owned threads. 189 bool IsOwned(); 190 191 #if defined(WEBRTC_WIN) 192 HANDLE GetHandle() const { 193 return thread_; 194 } 195 DWORD GetId() const { 196 return thread_id_; 197 } 198 #elif defined(WEBRTC_POSIX) 199 pthread_t GetPThread() { 200 return thread_; 201 } 202 #endif 203 204 // Expose private method running() for tests. 205 // 206 // DANGER: this is a terrible public API. Most callers that might want to 207 // call this likely do not have enough control/knowledge of the Thread in 208 // question to guarantee that the returned value remains true for the duration 209 // of whatever code is conditionally executing because of the return value! 210 bool RunningForTest() { return running(); } 211 212 // Sets the per-thread allow-blocking-calls flag and returns the previous 213 // value. 214 bool SetAllowBlockingCalls(bool allow); 215 216 protected: 217 // This method should be called when thread is created using non standard 218 // method, like derived implementation of rtc::Thread and it can not be 219 // started by calling Start(). This will set started flag to true and 220 // owned to false. This must be called from the current thread. 221 bool WrapCurrent(); 222 void UnwrapCurrent(); 223 224 // Same as WrapCurrent except that it never fails as it does not try to 225 // acquire the synchronization access of the thread. The caller should never 226 // call Stop() or Join() on this thread. 227 void SafeWrapCurrent(); 228 229 // Blocks the calling thread until this thread has terminated. 230 void Join(); 231 232 static void AssertBlockingIsAllowedOnCurrentThread(); 233 234 friend class ScopedDisallowBlockingCalls; 235 236 private: 237 static void *PreRun(void *pv); 238 239 // ThreadManager calls this instead WrapCurrent() because 240 // ThreadManager::Instance() cannot be used while ThreadManager is 241 // being created. 242 // The method tries to get synchronization rights of the thread on Windows if 243 // |need_synchronize_access| is true. 244 bool WrapCurrentWithThreadManager(ThreadManager* thread_manager, 245 bool need_synchronize_access); 246 247 // Return true if the thread was started and hasn't yet stopped. 248 bool running() { return running_.Wait(0); } 249 250 // Processes received "Send" requests. If |source| is not NULL, only requests 251 // from |source| are processed, otherwise, all requests are processed. 252 void ReceiveSendsFromThread(const Thread* source); 253 254 // If |source| is not NULL, pops the first "Send" message from |source| in 255 // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|. 256 // The caller must lock |crit_| before calling. 257 // Returns true if there is such a message. 258 bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg); 259 260 std::list<_SendMessage> sendlist_; 261 std::string name_; 262 ThreadPriority priority_; 263 Event running_; // Signalled means running. 264 265 #if defined(WEBRTC_POSIX) 266 pthread_t thread_; 267 #endif 268 269 #if defined(WEBRTC_WIN) 270 HANDLE thread_; 271 DWORD thread_id_; 272 #endif 273 274 bool owned_; 275 bool blocking_calls_allowed_; // By default set to |true|. 276 277 friend class ThreadManager; 278 279 DISALLOW_COPY_AND_ASSIGN(Thread); 280 }; 281 282 // AutoThread automatically installs itself at construction 283 // uninstalls at destruction, if a Thread object is 284 // _not already_ associated with the current OS thread. 285 286 class AutoThread : public Thread { 287 public: 288 explicit AutoThread(SocketServer* ss = 0); 289 virtual ~AutoThread(); 290 291 private: 292 DISALLOW_COPY_AND_ASSIGN(AutoThread); 293 }; 294 295 // Win32 extension for threads that need to use COM 296 #if defined(WEBRTC_WIN) 297 class ComThread : public Thread { 298 public: 299 ComThread() {} 300 virtual ~ComThread() { Stop(); } 301 302 protected: 303 virtual void Run(); 304 305 private: 306 DISALLOW_COPY_AND_ASSIGN(ComThread); 307 }; 308 #endif 309 310 // Provides an easy way to install/uninstall a socketserver on a thread. 311 class SocketServerScope { 312 public: 313 explicit SocketServerScope(SocketServer* ss) { 314 old_ss_ = Thread::Current()->socketserver(); 315 Thread::Current()->set_socketserver(ss); 316 } 317 ~SocketServerScope() { 318 Thread::Current()->set_socketserver(old_ss_); 319 } 320 321 private: 322 SocketServer* old_ss_; 323 324 DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope); 325 }; 326 327 } // namespace rtc 328 329 #endif // WEBRTC_BASE_THREAD_H_ 330