Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #ifdef POSIX
     29 #include <sys/time.h>
     30 #endif
     31 
     32 #include "talk/base/common.h"
     33 #include "talk/base/logging.h"
     34 #include "talk/base/messagequeue.h"
     35 #include "talk/base/physicalsocketserver.h"
     36 
     37 
     38 namespace talk_base {
     39 
     40 const uint32 kMaxMsgLatency = 150;  // 150 ms
     41 
     42 //------------------------------------------------------------------
     43 // MessageQueueManager
     44 
     45 MessageQueueManager* MessageQueueManager::instance_;
     46 
     47 MessageQueueManager* MessageQueueManager::Instance() {
     48   // Note: This is not thread safe, but it is first called before threads are
     49   // spawned.
     50   if (!instance_)
     51     instance_ = new MessageQueueManager;
     52   return instance_;
     53 }
     54 
     55 MessageQueueManager::MessageQueueManager() {
     56 }
     57 
     58 MessageQueueManager::~MessageQueueManager() {
     59 }
     60 
     61 void MessageQueueManager::Add(MessageQueue *message_queue) {
     62   // MessageQueueManager methods should be non-reentrant, so we
     63   // ASSERT that is the case.  If any of these ASSERT, please
     64   // contact bpm or jbeda.
     65   ASSERT(!crit_.CurrentThreadIsOwner());
     66   CritScope cs(&crit_);
     67   message_queues_.push_back(message_queue);
     68 }
     69 
     70 void MessageQueueManager::Remove(MessageQueue *message_queue) {
     71   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
     72   // If this is the last MessageQueue, destroy the manager as well so that
     73   // we don't leak this object at program shutdown. As mentioned above, this is
     74   // not thread-safe, but this should only happen at program termination (when
     75   // the ThreadManager is destroyed, and threads are no longer active).
     76   bool destroy = false;
     77   {
     78     CritScope cs(&crit_);
     79     std::vector<MessageQueue *>::iterator iter;
     80     iter = std::find(message_queues_.begin(), message_queues_.end(),
     81                      message_queue);
     82     if (iter != message_queues_.end()) {
     83       message_queues_.erase(iter);
     84     }
     85     destroy = message_queues_.empty();
     86   }
     87   if (destroy) {
     88     instance_ = NULL;
     89     delete this;
     90   }
     91 }
     92 
     93 void MessageQueueManager::Clear(MessageHandler *handler) {
     94   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
     95   CritScope cs(&crit_);
     96   std::vector<MessageQueue *>::iterator iter;
     97   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
     98     (*iter)->Clear(handler);
     99 }
    100 
    101 //------------------------------------------------------------------
    102 // MessageQueue
    103 
    104 MessageQueue::MessageQueue(SocketServer* ss)
    105     : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false),
    106       dmsgq_next_num_(0) {
    107   if (!ss_) {
    108     // Currently, MessageQueue holds a socket server, and is the base class for
    109     // Thread.  It seems like it makes more sense for Thread to hold the socket
    110     // server, and provide it to the MessageQueue, since the Thread controls
    111     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
    112     // messagequeue_unittest to depend on network libraries... yuck.
    113     default_ss_.reset(new PhysicalSocketServer());
    114     ss_ = default_ss_.get();
    115   }
    116   ss_->SetMessageQueue(this);
    117 }
    118 
    119 MessageQueue::~MessageQueue() {
    120   // The signal is done from here to ensure
    121   // that it always gets called when the queue
    122   // is going away.
    123   SignalQueueDestroyed();
    124   if (active_) {
    125     MessageQueueManager::Instance()->Remove(this);
    126     Clear(NULL);
    127   }
    128   if (ss_) {
    129     ss_->SetMessageQueue(NULL);
    130   }
    131 }
    132 
    133 void MessageQueue::set_socketserver(SocketServer* ss) {
    134   ss_ = ss ? ss : default_ss_.get();
    135   ss_->SetMessageQueue(this);
    136 }
    137 
    138 void MessageQueue::Quit() {
    139   fStop_ = true;
    140   ss_->WakeUp();
    141 }
    142 
    143 bool MessageQueue::IsQuitting() {
    144   return fStop_;
    145 }
    146 
    147 void MessageQueue::Restart() {
    148   fStop_ = false;
    149 }
    150 
    151 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
    152   if (fPeekKeep_) {
    153     *pmsg = msgPeek_;
    154     return true;
    155   }
    156   if (!Get(pmsg, cmsWait))
    157     return false;
    158   msgPeek_ = *pmsg;
    159   fPeekKeep_ = true;
    160   return true;
    161 }
    162 
    163 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
    164   // Return and clear peek if present
    165   // Always return the peek if it exists so there is Peek/Get symmetry
    166 
    167   if (fPeekKeep_) {
    168     *pmsg = msgPeek_;
    169     fPeekKeep_ = false;
    170     return true;
    171   }
    172 
    173   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
    174 
    175   int cmsTotal = cmsWait;
    176   int cmsElapsed = 0;
    177   uint32 msStart = Time();
    178   uint32 msCurrent = msStart;
    179   while (true) {
    180     // Check for sent messages
    181     ReceiveSends();
    182 
    183     // Check for posted events
    184     int cmsDelayNext = kForever;
    185     bool first_pass = true;
    186     while (true) {
    187       // All queue operations need to be locked, but nothing else in this loop
    188       // (specifically handling disposed message) can happen inside the crit.
    189       // Otherwise, disposed MessageHandlers will cause deadlocks.
    190       {
    191         CritScope cs(&crit_);
    192         // On the first pass, check for delayed messages that have been
    193         // triggered and calculate the next trigger time.
    194         if (first_pass) {
    195           first_pass = false;
    196           while (!dmsgq_.empty()) {
    197             if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
    198               cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
    199               break;
    200             }
    201             msgq_.push_back(dmsgq_.top().msg_);
    202             dmsgq_.pop();
    203           }
    204         }
    205         // Pull a message off the message queue, if available.
    206         if (msgq_.empty()) {
    207           break;
    208         } else {
    209           *pmsg = msgq_.front();
    210           msgq_.pop_front();
    211         }
    212       }  // crit_ is released here.
    213 
    214       // Log a warning for time-sensitive messages that we're late to deliver.
    215       if (pmsg->ts_sensitive) {
    216         int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
    217         if (delay > 0) {
    218           LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
    219                             << (delay + kMaxMsgLatency) << "ms";
    220         }
    221       }
    222       // If this was a dispose message, delete it and skip it.
    223       if (MQID_DISPOSE == pmsg->message_id) {
    224         ASSERT(NULL == pmsg->phandler);
    225         delete pmsg->pdata;
    226         *pmsg = Message();
    227         continue;
    228       }
    229       return true;
    230     }
    231 
    232     if (fStop_)
    233       break;
    234 
    235     // Which is shorter, the delay wait or the asked wait?
    236 
    237     int cmsNext;
    238     if (cmsWait == kForever) {
    239       cmsNext = cmsDelayNext;
    240     } else {
    241       cmsNext = _max(0, cmsTotal - cmsElapsed);
    242       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
    243         cmsNext = cmsDelayNext;
    244     }
    245 
    246     // Wait and multiplex in the meantime
    247     if (!ss_->Wait(cmsNext, process_io))
    248       return false;
    249 
    250     // If the specified timeout expired, return
    251 
    252     msCurrent = Time();
    253     cmsElapsed = TimeDiff(msCurrent, msStart);
    254     if (cmsWait != kForever) {
    255       if (cmsElapsed >= cmsWait)
    256         return false;
    257     }
    258   }
    259   return false;
    260 }
    261 
    262 void MessageQueue::ReceiveSends() {
    263 }
    264 
    265 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
    266     MessageData *pdata, bool time_sensitive) {
    267   if (fStop_)
    268     return;
    269 
    270   // Keep thread safe
    271   // Add the message to the end of the queue
    272   // Signal for the multiplexer to return
    273 
    274   CritScope cs(&crit_);
    275   EnsureActive();
    276   Message msg;
    277   msg.phandler = phandler;
    278   msg.message_id = id;
    279   msg.pdata = pdata;
    280   if (time_sensitive) {
    281     msg.ts_sensitive = Time() + kMaxMsgLatency;
    282   }
    283   msgq_.push_back(msg);
    284   ss_->WakeUp();
    285 }
    286 
    287 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
    288     MessageHandler *phandler, uint32 id, MessageData* pdata) {
    289   if (fStop_)
    290     return;
    291 
    292   // Keep thread safe
    293   // Add to the priority queue. Gets sorted soonest first.
    294   // Signal for the multiplexer to return.
    295 
    296   CritScope cs(&crit_);
    297   EnsureActive();
    298   Message msg;
    299   msg.phandler = phandler;
    300   msg.message_id = id;
    301   msg.pdata = pdata;
    302   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    303   dmsgq_.push(dmsg);
    304   // If this message queue processes 1 message every millisecond for 50 days,
    305   // we will wrap this number.  Even then, only messages with identical times
    306   // will be misordered, and then only briefly.  This is probably ok.
    307   VERIFY(0 != ++dmsgq_next_num_);
    308   ss_->WakeUp();
    309 }
    310 
    311 int MessageQueue::GetDelay() {
    312   CritScope cs(&crit_);
    313 
    314   if (!msgq_.empty())
    315     return 0;
    316 
    317   if (!dmsgq_.empty()) {
    318     int delay = TimeUntil(dmsgq_.top().msTrigger_);
    319     if (delay < 0)
    320       delay = 0;
    321     return delay;
    322   }
    323 
    324   return kForever;
    325 }
    326 
    327 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
    328                          MessageList* removed) {
    329   CritScope cs(&crit_);
    330 
    331   // Remove messages with phandler
    332 
    333   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    334     if (removed) {
    335       removed->push_back(msgPeek_);
    336     } else {
    337       delete msgPeek_.pdata;
    338     }
    339     fPeekKeep_ = false;
    340   }
    341 
    342   // Remove from ordered message queue
    343 
    344   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    345     if (it->Match(phandler, id)) {
    346       if (removed) {
    347         removed->push_back(*it);
    348       } else {
    349         delete it->pdata;
    350       }
    351       it = msgq_.erase(it);
    352     } else {
    353       ++it;
    354     }
    355   }
    356 
    357   // Remove from priority queue. Not directly iterable, so use this approach
    358 
    359   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
    360   for (PriorityQueue::container_type::iterator it = new_end;
    361        it != dmsgq_.container().end(); ++it) {
    362     if (it->msg_.Match(phandler, id)) {
    363       if (removed) {
    364         removed->push_back(it->msg_);
    365       } else {
    366         delete it->msg_.pdata;
    367       }
    368     } else {
    369       *new_end++ = *it;
    370     }
    371   }
    372   dmsgq_.container().erase(new_end, dmsgq_.container().end());
    373   dmsgq_.reheap();
    374 }
    375 
    376 void MessageQueue::Dispatch(Message *pmsg) {
    377   pmsg->phandler->OnMessage(pmsg);
    378 }
    379 
    380 void MessageQueue::EnsureActive() {
    381   ASSERT(crit_.CurrentThreadIsOwner());
    382   if (!active_) {
    383     active_ = true;
    384     MessageQueueManager::Instance()->Add(this);
    385   }
    386 }
    387 
    388 }  // namespace talk_base
    389