Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #ifndef WEBRTC_BASE_THREAD_H_
     12 #define WEBRTC_BASE_THREAD_H_
     13 
     14 #include <algorithm>
     15 #include <list>
     16 #include <string>
     17 #include <vector>
     18 
     19 #if defined(WEBRTC_POSIX)
     20 #include <pthread.h>
     21 #endif
     22 #include "webrtc/base/constructormagic.h"
     23 #include "webrtc/base/event.h"
     24 #include "webrtc/base/messagequeue.h"
     25 
     26 #if defined(WEBRTC_WIN)
     27 #include "webrtc/base/win32.h"
     28 #endif
     29 
     30 namespace rtc {
     31 
     32 class Thread;
     33 
     34 class ThreadManager {
     35  public:
     36   ThreadManager();
     37   ~ThreadManager();
     38 
     39   static ThreadManager* Instance();
     40 
     41   Thread* CurrentThread();
     42   void SetCurrentThread(Thread* thread);
     43 
     44   // Returns a thread object with its thread_ ivar set
     45   // to whatever the OS uses to represent the thread.
     46   // If there already *is* a Thread object corresponding to this thread,
     47   // this method will return that.  Otherwise it creates a new Thread
     48   // object whose wrapped() method will return true, and whose
     49   // handle will, on Win32, be opened with only synchronization privileges -
     50   // if you need more privilegs, rather than changing this method, please
     51   // write additional code to adjust the privileges, or call a different
     52   // factory method of your own devising, because this one gets used in
     53   // unexpected contexts (like inside browser plugins) and it would be a
     54   // shame to break it.  It is also conceivable on Win32 that we won't even
     55   // be able to get synchronization privileges, in which case the result
     56   // will have a NULL handle.
     57   Thread *WrapCurrentThread();
     58   void UnwrapCurrentThread();
     59 
     60  private:
     61 #if defined(WEBRTC_POSIX)
     62   pthread_key_t key_;
     63 #endif
     64 
     65 #if defined(WEBRTC_WIN)
     66   DWORD key_;
     67 #endif
     68 
     69   DISALLOW_COPY_AND_ASSIGN(ThreadManager);
     70 };
     71 
     72 struct _SendMessage {
     73   _SendMessage() {}
     74   Thread *thread;
     75   Message msg;
     76   bool *ready;
     77 };
     78 
     79 enum ThreadPriority {
     80   PRIORITY_IDLE = -1,
     81   PRIORITY_NORMAL = 0,
     82   PRIORITY_ABOVE_NORMAL = 1,
     83   PRIORITY_HIGH = 2,
     84 };
     85 
     86 class Runnable {
     87  public:
     88   virtual ~Runnable() {}
     89   virtual void Run(Thread* thread) = 0;
     90 
     91  protected:
     92   Runnable() {}
     93 
     94  private:
     95   DISALLOW_COPY_AND_ASSIGN(Runnable);
     96 };
     97 
     98 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
     99 
    100 class Thread : public MessageQueue {
    101  public:
    102   explicit Thread(SocketServer* ss = NULL);
    103   // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
    104   // guarantee Stop() is explicitly called before the subclass is destroyed).
    105   // This is required to avoid a data race between the destructor modifying the
    106   // vtable, and the Thread::PreRun calling the virtual method Run().
    107   virtual ~Thread();
    108 
    109   static Thread* Current();
    110 
    111   // Used to catch performance regressions. Use this to disallow blocking calls
    112   // (Invoke) for a given scope.  If a synchronous call is made while this is in
    113   // effect, an assert will be triggered.
    114   // Note that this is a single threaded class.
    115   class ScopedDisallowBlockingCalls {
    116    public:
    117     ScopedDisallowBlockingCalls();
    118     ~ScopedDisallowBlockingCalls();
    119    private:
    120     Thread* const thread_;
    121     const bool previous_state_;
    122   };
    123 
    124   bool IsCurrent() const {
    125     return Current() == this;
    126   }
    127 
    128   // Sleeps the calling thread for the specified number of milliseconds, during
    129   // which time no processing is performed. Returns false if sleeping was
    130   // interrupted by a signal (POSIX only).
    131   static bool SleepMs(int millis);
    132 
    133   // Sets the thread's name, for debugging. Must be called before Start().
    134   // If |obj| is non-NULL, its value is appended to |name|.
    135   const std::string& name() const { return name_; }
    136   bool SetName(const std::string& name, const void* obj);
    137 
    138   // Sets the thread's priority. Must be called before Start().
    139   ThreadPriority priority() const { return priority_; }
    140   bool SetPriority(ThreadPriority priority);
    141 
    142   // Starts the execution of the thread.
    143   bool Start(Runnable* runnable = NULL);
    144 
    145   // Tells the thread to stop and waits until it is joined.
    146   // Never call Stop on the current thread.  Instead use the inherited Quit
    147   // function which will exit the base MessageQueue without terminating the
    148   // underlying OS thread.
    149   virtual void Stop();
    150 
    151   // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
    152   // work, override Run().  To receive and dispatch messages, call
    153   // ProcessMessages occasionally.
    154   virtual void Run();
    155 
    156   virtual void Send(MessageHandler *phandler, uint32 id = 0,
    157       MessageData *pdata = NULL);
    158 
    159   // Convenience method to invoke a functor on another thread.  Caller must
    160   // provide the |ReturnT| template argument, which cannot (easily) be deduced.
    161   // Uses Send() internally, which blocks the current thread until execution
    162   // is complete.
    163   // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool);
    164   // NOTE: This function can only be called when synchronous calls are allowed.
    165   // See ScopedDisallowBlockingCalls for details.
    166   template <class ReturnT, class FunctorT>
    167   ReturnT Invoke(const FunctorT& functor) {
    168     FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
    169     Send(&handler);
    170     return handler.result();
    171   }
    172 
    173   // From MessageQueue
    174   virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY,
    175                      MessageList* removed = NULL);
    176   virtual void ReceiveSends();
    177 
    178   // ProcessMessages will process I/O and dispatch messages until:
    179   //  1) cms milliseconds have elapsed (returns true)
    180   //  2) Stop() is called (returns false)
    181   bool ProcessMessages(int cms);
    182 
    183   // Returns true if this is a thread that we created using the standard
    184   // constructor, false if it was created by a call to
    185   // ThreadManager::WrapCurrentThread().  The main thread of an application
    186   // is generally not owned, since the OS representation of the thread
    187   // obviously exists before we can get to it.
    188   // You cannot call Start on non-owned threads.
    189   bool IsOwned();
    190 
    191 #if defined(WEBRTC_WIN)
    192   HANDLE GetHandle() const {
    193     return thread_;
    194   }
    195   DWORD GetId() const {
    196     return thread_id_;
    197   }
    198 #elif defined(WEBRTC_POSIX)
    199   pthread_t GetPThread() {
    200     return thread_;
    201   }
    202 #endif
    203 
    204   // Expose private method running() for tests.
    205   //
    206   // DANGER: this is a terrible public API.  Most callers that might want to
    207   // call this likely do not have enough control/knowledge of the Thread in
    208   // question to guarantee that the returned value remains true for the duration
    209   // of whatever code is conditionally executing because of the return value!
    210   bool RunningForTest() { return running(); }
    211 
    212   // Sets the per-thread allow-blocking-calls flag and returns the previous
    213   // value.
    214   bool SetAllowBlockingCalls(bool allow);
    215 
    216  protected:
    217   // This method should be called when thread is created using non standard
    218   // method, like derived implementation of rtc::Thread and it can not be
    219   // started by calling Start(). This will set started flag to true and
    220   // owned to false. This must be called from the current thread.
    221   bool WrapCurrent();
    222   void UnwrapCurrent();
    223 
    224   // Same as WrapCurrent except that it never fails as it does not try to
    225   // acquire the synchronization access of the thread. The caller should never
    226   // call Stop() or Join() on this thread.
    227   void SafeWrapCurrent();
    228 
    229   // Blocks the calling thread until this thread has terminated.
    230   void Join();
    231 
    232   static void AssertBlockingIsAllowedOnCurrentThread();
    233 
    234   friend class ScopedDisallowBlockingCalls;
    235 
    236  private:
    237   static void *PreRun(void *pv);
    238 
    239   // ThreadManager calls this instead WrapCurrent() because
    240   // ThreadManager::Instance() cannot be used while ThreadManager is
    241   // being created.
    242   // The method tries to get synchronization rights of the thread on Windows if
    243   // |need_synchronize_access| is true.
    244   bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,
    245                                     bool need_synchronize_access);
    246 
    247   // Return true if the thread was started and hasn't yet stopped.
    248   bool running() { return running_.Wait(0); }
    249 
    250   // Processes received "Send" requests. If |source| is not NULL, only requests
    251   // from |source| are processed, otherwise, all requests are processed.
    252   void ReceiveSendsFromThread(const Thread* source);
    253 
    254   // If |source| is not NULL, pops the first "Send" message from |source| in
    255   // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
    256   // The caller must lock |crit_| before calling.
    257   // Returns true if there is such a message.
    258   bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
    259 
    260   std::list<_SendMessage> sendlist_;
    261   std::string name_;
    262   ThreadPriority priority_;
    263   Event running_;  // Signalled means running.
    264 
    265 #if defined(WEBRTC_POSIX)
    266   pthread_t thread_;
    267 #endif
    268 
    269 #if defined(WEBRTC_WIN)
    270   HANDLE thread_;
    271   DWORD thread_id_;
    272 #endif
    273 
    274   bool owned_;
    275   bool blocking_calls_allowed_;  // By default set to |true|.
    276 
    277   friend class ThreadManager;
    278 
    279   DISALLOW_COPY_AND_ASSIGN(Thread);
    280 };
    281 
    282 // AutoThread automatically installs itself at construction
    283 // uninstalls at destruction, if a Thread object is
    284 // _not already_ associated with the current OS thread.
    285 
    286 class AutoThread : public Thread {
    287  public:
    288   explicit AutoThread(SocketServer* ss = 0);
    289   virtual ~AutoThread();
    290 
    291  private:
    292   DISALLOW_COPY_AND_ASSIGN(AutoThread);
    293 };
    294 
    295 // Win32 extension for threads that need to use COM
    296 #if defined(WEBRTC_WIN)
    297 class ComThread : public Thread {
    298  public:
    299   ComThread() {}
    300   virtual ~ComThread() { Stop(); }
    301 
    302  protected:
    303   virtual void Run();
    304 
    305  private:
    306   DISALLOW_COPY_AND_ASSIGN(ComThread);
    307 };
    308 #endif
    309 
    310 // Provides an easy way to install/uninstall a socketserver on a thread.
    311 class SocketServerScope {
    312  public:
    313   explicit SocketServerScope(SocketServer* ss) {
    314     old_ss_ = Thread::Current()->socketserver();
    315     Thread::Current()->set_socketserver(ss);
    316   }
    317   ~SocketServerScope() {
    318     Thread::Current()->set_socketserver(old_ss_);
    319   }
    320 
    321  private:
    322   SocketServer* old_ss_;
    323 
    324   DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope);
    325 };
    326 
    327 }  // namespace rtc
    328 
    329 #endif  // WEBRTC_BASE_THREAD_H_
    330