Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #ifndef WEBRTC_BASE_THREAD_H_
     12 #define WEBRTC_BASE_THREAD_H_
     13 
     14 #include <algorithm>
     15 #include <list>
     16 #include <string>
     17 #include <vector>
     18 
     19 #if defined(WEBRTC_POSIX)
     20 #include <pthread.h>
     21 #endif
     22 #include "webrtc/base/constructormagic.h"
     23 #include "webrtc/base/event.h"
     24 #include "webrtc/base/messagequeue.h"
     25 
     26 #if defined(WEBRTC_WIN)
     27 #include "webrtc/base/win32.h"
     28 #endif
     29 
     30 namespace rtc {
     31 
     32 class Thread;
     33 
     34 class ThreadManager {
     35  public:
     36   static const int kForever = -1;
     37 
     38   ThreadManager();
     39   ~ThreadManager();
     40 
     41   static ThreadManager* Instance();
     42 
     43   Thread* CurrentThread();
     44   void SetCurrentThread(Thread* thread);
     45 
     46   // Returns a thread object with its thread_ ivar set
     47   // to whatever the OS uses to represent the thread.
     48   // If there already *is* a Thread object corresponding to this thread,
     49   // this method will return that.  Otherwise it creates a new Thread
     50   // object whose wrapped() method will return true, and whose
     51   // handle will, on Win32, be opened with only synchronization privileges -
     52   // if you need more privilegs, rather than changing this method, please
     53   // write additional code to adjust the privileges, or call a different
     54   // factory method of your own devising, because this one gets used in
     55   // unexpected contexts (like inside browser plugins) and it would be a
     56   // shame to break it.  It is also conceivable on Win32 that we won't even
     57   // be able to get synchronization privileges, in which case the result
     58   // will have a NULL handle.
     59   Thread *WrapCurrentThread();
     60   void UnwrapCurrentThread();
     61 
     62  private:
     63 #if defined(WEBRTC_POSIX)
     64   pthread_key_t key_;
     65 #endif
     66 
     67 #if defined(WEBRTC_WIN)
     68   DWORD key_;
     69 #endif
     70 
     71   RTC_DISALLOW_COPY_AND_ASSIGN(ThreadManager);
     72 };
     73 
     74 struct _SendMessage {
     75   _SendMessage() {}
     76   Thread *thread;
     77   Message msg;
     78   bool *ready;
     79 };
     80 
     81 class Runnable {
     82  public:
     83   virtual ~Runnable() {}
     84   virtual void Run(Thread* thread) = 0;
     85 
     86  protected:
     87   Runnable() {}
     88 
     89  private:
     90   RTC_DISALLOW_COPY_AND_ASSIGN(Runnable);
     91 };
     92 
     93 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
     94 
     95 class Thread : public MessageQueue {
     96  public:
     97   explicit Thread(SocketServer* ss = NULL);
     98   // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
     99   // guarantee Stop() is explicitly called before the subclass is destroyed).
    100   // This is required to avoid a data race between the destructor modifying the
    101   // vtable, and the Thread::PreRun calling the virtual method Run().
    102   ~Thread() override;
    103 
    104   static Thread* Current();
    105 
    106   // Used to catch performance regressions. Use this to disallow blocking calls
    107   // (Invoke) for a given scope.  If a synchronous call is made while this is in
    108   // effect, an assert will be triggered.
    109   // Note that this is a single threaded class.
    110   class ScopedDisallowBlockingCalls {
    111    public:
    112     ScopedDisallowBlockingCalls();
    113     ~ScopedDisallowBlockingCalls();
    114    private:
    115     Thread* const thread_;
    116     const bool previous_state_;
    117   };
    118 
    119   bool IsCurrent() const {
    120     return Current() == this;
    121   }
    122 
    123   // Sleeps the calling thread for the specified number of milliseconds, during
    124   // which time no processing is performed. Returns false if sleeping was
    125   // interrupted by a signal (POSIX only).
    126   static bool SleepMs(int millis);
    127 
    128   // Sets the thread's name, for debugging. Must be called before Start().
    129   // If |obj| is non-NULL, its value is appended to |name|.
    130   const std::string& name() const { return name_; }
    131   bool SetName(const std::string& name, const void* obj);
    132 
    133   // Starts the execution of the thread.
    134   bool Start(Runnable* runnable = NULL);
    135 
    136   // Tells the thread to stop and waits until it is joined.
    137   // Never call Stop on the current thread.  Instead use the inherited Quit
    138   // function which will exit the base MessageQueue without terminating the
    139   // underlying OS thread.
    140   virtual void Stop();
    141 
    142   // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
    143   // work, override Run().  To receive and dispatch messages, call
    144   // ProcessMessages occasionally.
    145   virtual void Run();
    146 
    147   virtual void Send(MessageHandler* phandler,
    148                     uint32_t id = 0,
    149                     MessageData* pdata = NULL);
    150 
    151   // Convenience method to invoke a functor on another thread.  Caller must
    152   // provide the |ReturnT| template argument, which cannot (easily) be deduced.
    153   // Uses Send() internally, which blocks the current thread until execution
    154   // is complete.
    155   // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool);
    156   // NOTE: This function can only be called when synchronous calls are allowed.
    157   // See ScopedDisallowBlockingCalls for details.
    158   template <class ReturnT, class FunctorT>
    159   ReturnT Invoke(const FunctorT& functor) {
    160     InvokeBegin();
    161     FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
    162     Send(&handler);
    163     InvokeEnd();
    164     return handler.result();
    165   }
    166 
    167   // From MessageQueue
    168   void Clear(MessageHandler* phandler,
    169              uint32_t id = MQID_ANY,
    170              MessageList* removed = NULL) override;
    171   void ReceiveSends() override;
    172 
    173   // ProcessMessages will process I/O and dispatch messages until:
    174   //  1) cms milliseconds have elapsed (returns true)
    175   //  2) Stop() is called (returns false)
    176   bool ProcessMessages(int cms);
    177 
    178   // Returns true if this is a thread that we created using the standard
    179   // constructor, false if it was created by a call to
    180   // ThreadManager::WrapCurrentThread().  The main thread of an application
    181   // is generally not owned, since the OS representation of the thread
    182   // obviously exists before we can get to it.
    183   // You cannot call Start on non-owned threads.
    184   bool IsOwned();
    185 
    186 #if defined(WEBRTC_WIN)
    187   HANDLE GetHandle() const {
    188     return thread_;
    189   }
    190   DWORD GetId() const {
    191     return thread_id_;
    192   }
    193 #elif defined(WEBRTC_POSIX)
    194   pthread_t GetPThread() {
    195     return thread_;
    196   }
    197 #endif
    198 
    199   // Expose private method running() for tests.
    200   //
    201   // DANGER: this is a terrible public API.  Most callers that might want to
    202   // call this likely do not have enough control/knowledge of the Thread in
    203   // question to guarantee that the returned value remains true for the duration
    204   // of whatever code is conditionally executing because of the return value!
    205   bool RunningForTest() { return running(); }
    206 
    207   // Sets the per-thread allow-blocking-calls flag and returns the previous
    208   // value. Must be called on this thread.
    209   bool SetAllowBlockingCalls(bool allow);
    210 
    211   // These functions are public to avoid injecting test hooks. Don't call them
    212   // outside of tests.
    213   // This method should be called when thread is created using non standard
    214   // method, like derived implementation of rtc::Thread and it can not be
    215   // started by calling Start(). This will set started flag to true and
    216   // owned to false. This must be called from the current thread.
    217   bool WrapCurrent();
    218   void UnwrapCurrent();
    219 
    220  protected:
    221   // Same as WrapCurrent except that it never fails as it does not try to
    222   // acquire the synchronization access of the thread. The caller should never
    223   // call Stop() or Join() on this thread.
    224   void SafeWrapCurrent();
    225 
    226   // Blocks the calling thread until this thread has terminated.
    227   void Join();
    228 
    229   static void AssertBlockingIsAllowedOnCurrentThread();
    230 
    231   friend class ScopedDisallowBlockingCalls;
    232 
    233  private:
    234   static void *PreRun(void *pv);
    235 
    236   // ThreadManager calls this instead WrapCurrent() because
    237   // ThreadManager::Instance() cannot be used while ThreadManager is
    238   // being created.
    239   // The method tries to get synchronization rights of the thread on Windows if
    240   // |need_synchronize_access| is true.
    241   bool WrapCurrentWithThreadManager(ThreadManager* thread_manager,
    242                                     bool need_synchronize_access);
    243 
    244   // Return true if the thread was started and hasn't yet stopped.
    245   bool running() { return running_.Wait(0); }
    246 
    247   // Processes received "Send" requests. If |source| is not NULL, only requests
    248   // from |source| are processed, otherwise, all requests are processed.
    249   void ReceiveSendsFromThread(const Thread* source);
    250 
    251   // If |source| is not NULL, pops the first "Send" message from |source| in
    252   // |sendlist_|, otherwise, pops the first "Send" message of |sendlist_|.
    253   // The caller must lock |crit_| before calling.
    254   // Returns true if there is such a message.
    255   bool PopSendMessageFromThread(const Thread* source, _SendMessage* msg);
    256 
    257   // Used for tracking performance of Invoke calls.
    258   void InvokeBegin();
    259   void InvokeEnd();
    260 
    261   std::list<_SendMessage> sendlist_;
    262   std::string name_;
    263   Event running_;  // Signalled means running.
    264 
    265 #if defined(WEBRTC_POSIX)
    266   pthread_t thread_;
    267 #endif
    268 
    269 #if defined(WEBRTC_WIN)
    270   HANDLE thread_;
    271   DWORD thread_id_;
    272 #endif
    273 
    274   bool owned_;
    275   bool blocking_calls_allowed_;  // By default set to |true|.
    276 
    277   friend class ThreadManager;
    278 
    279   RTC_DISALLOW_COPY_AND_ASSIGN(Thread);
    280 };
    281 
// AutoThread automatically installs itself at construction and
// uninstalls itself at destruction, if a Thread object is
// _not already_ associated with the current OS thread.
    285 
    286 class AutoThread : public Thread {
    287  public:
    288   explicit AutoThread(SocketServer* ss = 0);
    289   ~AutoThread() override;
    290 
    291  private:
    292   RTC_DISALLOW_COPY_AND_ASSIGN(AutoThread);
    293 };
    294 
    295 // Win32 extension for threads that need to use COM
    296 #if defined(WEBRTC_WIN)
    297 class ComThread : public Thread {
    298  public:
    299   ComThread() {}
    300   virtual ~ComThread() { Stop(); }
    301 
    302  protected:
    303   virtual void Run();
    304 
    305  private:
    306   RTC_DISALLOW_COPY_AND_ASSIGN(ComThread);
    307 };
    308 #endif
    309 
    310 // Provides an easy way to install/uninstall a socketserver on a thread.
    311 class SocketServerScope {
    312  public:
    313   explicit SocketServerScope(SocketServer* ss) {
    314     old_ss_ = Thread::Current()->socketserver();
    315     Thread::Current()->set_socketserver(ss);
    316   }
    317   ~SocketServerScope() {
    318     Thread::Current()->set_socketserver(old_ss_);
    319   }
    320 
    321  private:
    322   SocketServer* old_ss_;
    323 
    324   RTC_DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope);
    325 };
    326 
    327 }  // namespace rtc
    328 
    329 #endif  // WEBRTC_BASE_THREAD_H_
    330