Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004 Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #ifndef TALK_BASE_THREAD_H_
     29 #define TALK_BASE_THREAD_H_
     30 
     31 #include <algorithm>
     32 #include <list>
     33 #include <string>
     34 #include <vector>
     35 
     36 #ifdef POSIX
     37 #include <pthread.h>
     38 #endif
     39 #include "talk/base/constructormagic.h"
     40 #include "talk/base/messagequeue.h"
     41 
     42 #ifdef WIN32
     43 #include "talk/base/win32.h"
     44 #endif
     45 
     46 namespace talk_base {
     47 
     48 class Thread;
     49 
     50 class ThreadManager {
     51  public:
     52   ThreadManager();
     53   ~ThreadManager();
     54 
     55   static ThreadManager* Instance();
     56 
     57   Thread* CurrentThread();
     58   void SetCurrentThread(Thread* thread);
     59 
     60   // Returns a thread object with its thread_ ivar set
     61   // to whatever the OS uses to represent the thread.
     62   // If there already *is* a Thread object corresponding to this thread,
     63   // this method will return that.  Otherwise it creates a new Thread
     64   // object whose wrapped() method will return true, and whose
     65   // handle will, on Win32, be opened with only synchronization privileges -
     66   // if you need more privilegs, rather than changing this method, please
     67   // write additional code to adjust the privileges, or call a different
     68   // factory method of your own devising, because this one gets used in
     69   // unexpected contexts (like inside browser plugins) and it would be a
     70   // shame to break it.  It is also conceivable on Win32 that we won't even
     71   // be able to get synchronization privileges, in which case the result
     72   // will have a NULL handle.
     73   Thread *WrapCurrentThread();
     74   void UnwrapCurrentThread();
     75 
     76  private:
     77 #ifdef POSIX
     78   pthread_key_t key_;
     79 #endif
     80 
     81 #ifdef WIN32
     82   DWORD key_;
     83 #endif
     84 
     85   DISALLOW_COPY_AND_ASSIGN(ThreadManager);
     86 };
     87 
     88 struct _SendMessage {
     89   _SendMessage() {}
     90   Thread *thread;
     91   Message msg;
     92   bool *ready;
     93 };
     94 
     95 enum ThreadPriority {
     96   PRIORITY_IDLE = -1,
     97   PRIORITY_NORMAL = 0,
     98   PRIORITY_ABOVE_NORMAL = 1,
     99   PRIORITY_HIGH = 2,
    100 };
    101 
    102 class Runnable {
    103  public:
    104   virtual ~Runnable() {}
    105   virtual void Run(Thread* thread) = 0;
    106 
    107  protected:
    108   Runnable() {}
    109 
    110  private:
    111   DISALLOW_COPY_AND_ASSIGN(Runnable);
    112 };
    113 
    114 // WARNING! SUBCLASSES MUST CALL Stop() IN THEIR DESTRUCTORS!  See ~Thread().
    115 
    116 class Thread : public MessageQueue {
    117  public:
    118   explicit Thread(SocketServer* ss = NULL);
    119   // NOTE: ALL SUBCLASSES OF Thread MUST CALL Stop() IN THEIR DESTRUCTORS (or
    120   // guarantee Stop() is explicitly called before the subclass is destroyed).
    121   // This is required to avoid a data race between the destructor modifying the
    122   // vtable, and the Thread::PreRun calling the virtual method Run().
    123   virtual ~Thread();
    124 
    125   static Thread* Current();
    126 
    127   bool IsCurrent() const {
    128     return Current() == this;
    129   }
    130 
    131   // Sleeps the calling thread for the specified number of milliseconds, during
    132   // which time no processing is performed. Returns false if sleeping was
    133   // interrupted by a signal (POSIX only).
    134   static bool SleepMs(int millis);
    135 
    136   // Sets the thread's name, for debugging. Must be called before Start().
    137   // If |obj| is non-NULL, its value is appended to |name|.
    138   const std::string& name() const { return name_; }
    139   bool SetName(const std::string& name, const void* obj);
    140 
    141   // Sets the thread's priority. Must be called before Start().
    142   ThreadPriority priority() const { return priority_; }
    143   bool SetPriority(ThreadPriority priority);
    144 
    145   // Starts the execution of the thread.
    146   bool started() const { return started_; }
    147   bool Start(Runnable* runnable = NULL);
    148 
    149   // Used for fire-and-forget threads.  Deletes this thread object when the
    150   // Run method returns.
    151   void Release() {
    152     delete_self_when_complete_ = true;
    153   }
    154 
    155   // Tells the thread to stop and waits until it is joined.
    156   // Never call Stop on the current thread.  Instead use the inherited Quit
    157   // function which will exit the base MessageQueue without terminating the
    158   // underlying OS thread.
    159   virtual void Stop();
    160 
    161   // By default, Thread::Run() calls ProcessMessages(kForever).  To do other
    162   // work, override Run().  To receive and dispatch messages, call
    163   // ProcessMessages occasionally.
    164   virtual void Run();
    165 
    166   virtual void Send(MessageHandler *phandler, uint32 id = 0,
    167       MessageData *pdata = NULL);
    168 
    169   // Convenience method to invoke a functor on another thread.  Caller must
    170   // provide the |ReturnT| template argument, which cannot (easily) be deduced.
    171   // Uses Send() internally, which blocks the current thread until execution
    172   // is complete.
    173   // Ex: bool result = thread.Invoke<bool>(&MyFunctionReturningBool);
    174   template <class ReturnT, class FunctorT>
    175   ReturnT Invoke(const FunctorT& functor) {
    176     FunctorMessageHandler<ReturnT, FunctorT> handler(functor);
    177     Send(&handler);
    178     return handler.result();
    179   }
    180 
    181   // From MessageQueue
    182   virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY,
    183                      MessageList* removed = NULL);
    184   virtual void ReceiveSends();
    185 
    186   // ProcessMessages will process I/O and dispatch messages until:
    187   //  1) cms milliseconds have elapsed (returns true)
    188   //  2) Stop() is called (returns false)
    189   bool ProcessMessages(int cms);
    190 
    191   // Returns true if this is a thread that we created using the standard
    192   // constructor, false if it was created by a call to
    193   // ThreadManager::WrapCurrentThread().  The main thread of an application
    194   // is generally not owned, since the OS representation of the thread
    195   // obviously exists before we can get to it.
    196   // You cannot call Start on non-owned threads.
    197   bool IsOwned();
    198 
    199 #ifdef WIN32
    200   HANDLE GetHandle() const {
    201     return thread_;
    202   }
    203   DWORD GetId() const {
    204     return thread_id_;
    205   }
    206 #elif POSIX
    207   pthread_t GetPThread() {
    208     return thread_;
    209   }
    210 #endif
    211 
    212   // This method should be called when thread is created using non standard
    213   // method, like derived implementation of talk_base::Thread and it can not be
    214   // started by calling Start(). This will set started flag to true and
    215   // owned to false. This must be called from the current thread.
    216   // NOTE: These methods should be used by the derived classes only, added here
    217   // only for testing.
    218   bool WrapCurrent();
    219   void UnwrapCurrent();
    220 
    221  protected:
    222   // Blocks the calling thread until this thread has terminated.
    223   void Join();
    224 
    225  private:
    226   // Helper class to facilitate executing a functor on a thread.
    227   template <class ReturnT, class FunctorT>
    228   class FunctorMessageHandler : public MessageHandler {
    229    public:
    230     explicit FunctorMessageHandler(const FunctorT& functor)
    231         : functor_(functor) {}
    232     virtual void OnMessage(Message* msg) {
    233       result_ = functor_();
    234     }
    235     const ReturnT& result() const { return result_; }
    236    private:
    237     FunctorT functor_;
    238     ReturnT result_;
    239   };
    240 
    241   // Specialization for ReturnT of void.
    242   template <class FunctorT>
    243   class FunctorMessageHandler<void, FunctorT> : public MessageHandler {
    244    public:
    245     explicit FunctorMessageHandler(const FunctorT& functor)
    246         : functor_(functor) {}
    247     virtual void OnMessage(Message* msg) { functor_(); }
    248     void result() const {}
    249    private:
    250     FunctorT functor_;
    251   };
    252 
    253   static void *PreRun(void *pv);
    254 
    255   // ThreadManager calls this instead WrapCurrent() because
    256   // ThreadManager::Instance() cannot be used while ThreadManager is
    257   // being created.
    258   bool WrapCurrentWithThreadManager(ThreadManager* thread_manager);
    259 
    260   std::list<_SendMessage> sendlist_;
    261   std::string name_;
    262   ThreadPriority priority_;
    263   bool started_;
    264 
    265 #ifdef POSIX
    266   pthread_t thread_;
    267 #endif
    268 
    269 #ifdef WIN32
    270   HANDLE thread_;
    271   DWORD thread_id_;
    272 #endif
    273 
    274   bool owned_;
    275   bool delete_self_when_complete_;
    276 
    277   friend class ThreadManager;
    278 
    279   DISALLOW_COPY_AND_ASSIGN(Thread);
    280 };
    281 
// AutoThread automatically installs itself at construction and
// uninstalls itself at destruction, if a Thread object is
// _not already_ associated with the current OS thread.
    285 
    286 class AutoThread : public Thread {
    287  public:
    288   explicit AutoThread(SocketServer* ss = 0);
    289   virtual ~AutoThread();
    290 
    291  private:
    292   DISALLOW_COPY_AND_ASSIGN(AutoThread);
    293 };
    294 
    295 // Win32 extension for threads that need to use COM
    296 #ifdef WIN32
    297 class ComThread : public Thread {
    298  public:
    299   ComThread() {}
    300   virtual ~ComThread() { Stop(); }
    301 
    302  protected:
    303   virtual void Run();
    304 
    305  private:
    306   DISALLOW_COPY_AND_ASSIGN(ComThread);
    307 };
    308 #endif
    309 
    310 // Provides an easy way to install/uninstall a socketserver on a thread.
    311 class SocketServerScope {
    312  public:
    313   explicit SocketServerScope(SocketServer* ss) {
    314     old_ss_ = Thread::Current()->socketserver();
    315     Thread::Current()->set_socketserver(ss);
    316   }
    317   ~SocketServerScope() {
    318     Thread::Current()->set_socketserver(old_ss_);
    319   }
    320 
    321  private:
    322   SocketServer* old_ss_;
    323 
    324   DISALLOW_IMPLICIT_CONSTRUCTORS(SocketServerScope);
    325 };
    326 
    327 }  // namespace talk_base
    328 
    329 #endif  // TALK_BASE_THREAD_H_
    330