Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #if defined(WEBRTC_POSIX)
     12 #include <sys/time.h>
     13 #endif
     14 
     15 #include "webrtc/base/common.h"
     16 #include "webrtc/base/logging.h"
     17 #include "webrtc/base/messagequeue.h"
     18 #if defined(__native_client__)
     19 #include "webrtc/base/nullsocketserver.h"
     20 typedef rtc::NullSocketServer DefaultSocketServer;
     21 #else
     22 #include "webrtc/base/physicalsocketserver.h"
     23 typedef rtc::PhysicalSocketServer DefaultSocketServer;
     24 #endif
     25 
     26 namespace rtc {
     27 
// Latency budget, in milliseconds, for time-sensitive messages.
// MessageQueue::Post() stamps a time-sensitive message with the deadline
// Time() + kMaxMsgLatency, and MessageQueue::Get() logs a warning when such a
// message is dequeued after that deadline has passed.
const uint32 kMaxMsgLatency = 150;  // 150 ms
     29 
     30 //------------------------------------------------------------------
     31 // MessageQueueManager
     32 
// The process-wide manager. Created on demand by Instance() and reset to NULL
// by RemoveInternal() when the last registered queue is removed.
MessageQueueManager* MessageQueueManager::instance_ = NULL;
     34 
     35 MessageQueueManager* MessageQueueManager::Instance() {
     36   // Note: This is not thread safe, but it is first called before threads are
     37   // spawned.
     38   if (!instance_)
     39     instance_ = new MessageQueueManager;
     40   return instance_;
     41 }
     42 
     43 bool MessageQueueManager::IsInitialized() {
     44   return instance_ != NULL;
     45 }
     46 
     47 MessageQueueManager::MessageQueueManager() {
     48 }
     49 
     50 MessageQueueManager::~MessageQueueManager() {
     51 }
     52 
     53 void MessageQueueManager::Add(MessageQueue *message_queue) {
     54   return Instance()->AddInternal(message_queue);
     55 }
     56 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
     57   // MessageQueueManager methods should be non-reentrant, so we
     58   // ASSERT that is the case.  If any of these ASSERT, please
     59   // contact bpm or jbeda.
     60   ASSERT(!crit_.CurrentThreadIsOwner());
     61   CritScope cs(&crit_);
     62   message_queues_.push_back(message_queue);
     63 }
     64 
     65 void MessageQueueManager::Remove(MessageQueue *message_queue) {
     66   // If there isn't a message queue manager instance, then there isn't a queue
     67   // to remove.
     68   if (!instance_) return;
     69   return Instance()->RemoveInternal(message_queue);
     70 }
     71 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
     72   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
     73   // If this is the last MessageQueue, destroy the manager as well so that
     74   // we don't leak this object at program shutdown. As mentioned above, this is
     75   // not thread-safe, but this should only happen at program termination (when
     76   // the ThreadManager is destroyed, and threads are no longer active).
     77   bool destroy = false;
     78   {
     79     CritScope cs(&crit_);
     80     std::vector<MessageQueue *>::iterator iter;
     81     iter = std::find(message_queues_.begin(), message_queues_.end(),
     82                      message_queue);
     83     if (iter != message_queues_.end()) {
     84       message_queues_.erase(iter);
     85     }
     86     destroy = message_queues_.empty();
     87   }
     88   if (destroy) {
     89     instance_ = NULL;
     90     delete this;
     91   }
     92 }
     93 
     94 void MessageQueueManager::Clear(MessageHandler *handler) {
     95   // If there isn't a message queue manager instance, then there aren't any
     96   // queues to remove this handler from.
     97   if (!instance_) return;
     98   return Instance()->ClearInternal(handler);
     99 }
    100 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
    101   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
    102   CritScope cs(&crit_);
    103   std::vector<MessageQueue *>::iterator iter;
    104   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
    105     (*iter)->Clear(handler);
    106 }
    107 
    108 //------------------------------------------------------------------
    109 // MessageQueue
    110 
    111 MessageQueue::MessageQueue(SocketServer* ss)
    112     : ss_(ss), fStop_(false), fPeekKeep_(false),
    113       dmsgq_next_num_(0) {
    114   if (!ss_) {
    115     // Currently, MessageQueue holds a socket server, and is the base class for
    116     // Thread.  It seems like it makes more sense for Thread to hold the socket
    117     // server, and provide it to the MessageQueue, since the Thread controls
    118     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
    119     // messagequeue_unittest to depend on network libraries... yuck.
    120     default_ss_.reset(new DefaultSocketServer());
    121     ss_ = default_ss_.get();
    122   }
    123   ss_->SetMessageQueue(this);
    124   MessageQueueManager::Add(this);
    125 }
    126 
    127 MessageQueue::~MessageQueue() {
    128   // The signal is done from here to ensure
    129   // that it always gets called when the queue
    130   // is going away.
    131   SignalQueueDestroyed();
    132   MessageQueueManager::Remove(this);
    133   Clear(NULL);
    134   if (ss_) {
    135     ss_->SetMessageQueue(NULL);
    136   }
    137 }
    138 
    139 void MessageQueue::set_socketserver(SocketServer* ss) {
    140   ss_ = ss ? ss : default_ss_.get();
    141   ss_->SetMessageQueue(this);
    142 }
    143 
    144 void MessageQueue::Quit() {
    145   fStop_ = true;
    146   ss_->WakeUp();
    147 }
    148 
    149 bool MessageQueue::IsQuitting() {
    150   return fStop_;
    151 }
    152 
    153 void MessageQueue::Restart() {
    154   fStop_ = false;
    155 }
    156 
    157 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
    158   if (fPeekKeep_) {
    159     *pmsg = msgPeek_;
    160     return true;
    161   }
    162   if (!Get(pmsg, cmsWait))
    163     return false;
    164   msgPeek_ = *pmsg;
    165   fPeekKeep_ = true;
    166   return true;
    167 }
    168 
    169 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
    170   // Return and clear peek if present
    171   // Always return the peek if it exists so there is Peek/Get symmetry
    172 
    173   if (fPeekKeep_) {
    174     *pmsg = msgPeek_;
    175     fPeekKeep_ = false;
    176     return true;
    177   }
    178 
    179   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
    180 
    181   int cmsTotal = cmsWait;
    182   int cmsElapsed = 0;
    183   uint32 msStart = Time();
    184   uint32 msCurrent = msStart;
    185   while (true) {
    186     // Check for sent messages
    187     ReceiveSends();
    188 
    189     // Check for posted events
    190     int cmsDelayNext = kForever;
    191     bool first_pass = true;
    192     while (true) {
    193       // All queue operations need to be locked, but nothing else in this loop
    194       // (specifically handling disposed message) can happen inside the crit.
    195       // Otherwise, disposed MessageHandlers will cause deadlocks.
    196       {
    197         CritScope cs(&crit_);
    198         // On the first pass, check for delayed messages that have been
    199         // triggered and calculate the next trigger time.
    200         if (first_pass) {
    201           first_pass = false;
    202           while (!dmsgq_.empty()) {
    203             if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
    204               cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
    205               break;
    206             }
    207             msgq_.push_back(dmsgq_.top().msg_);
    208             dmsgq_.pop();
    209           }
    210         }
    211         // Pull a message off the message queue, if available.
    212         if (msgq_.empty()) {
    213           break;
    214         } else {
    215           *pmsg = msgq_.front();
    216           msgq_.pop_front();
    217         }
    218       }  // crit_ is released here.
    219 
    220       // Log a warning for time-sensitive messages that we're late to deliver.
    221       if (pmsg->ts_sensitive) {
    222         int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
    223         if (delay > 0) {
    224           LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
    225                             << (delay + kMaxMsgLatency) << "ms";
    226         }
    227       }
    228       // If this was a dispose message, delete it and skip it.
    229       if (MQID_DISPOSE == pmsg->message_id) {
    230         ASSERT(NULL == pmsg->phandler);
    231         delete pmsg->pdata;
    232         *pmsg = Message();
    233         continue;
    234       }
    235       return true;
    236     }
    237 
    238     if (fStop_)
    239       break;
    240 
    241     // Which is shorter, the delay wait or the asked wait?
    242 
    243     int cmsNext;
    244     if (cmsWait == kForever) {
    245       cmsNext = cmsDelayNext;
    246     } else {
    247       cmsNext = _max(0, cmsTotal - cmsElapsed);
    248       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
    249         cmsNext = cmsDelayNext;
    250     }
    251 
    252     // Wait and multiplex in the meantime
    253     if (!ss_->Wait(cmsNext, process_io))
    254       return false;
    255 
    256     // If the specified timeout expired, return
    257 
    258     msCurrent = Time();
    259     cmsElapsed = TimeDiff(msCurrent, msStart);
    260     if (cmsWait != kForever) {
    261       if (cmsElapsed >= cmsWait)
    262         return false;
    263     }
    264   }
    265   return false;
    266 }
    267 
    268 void MessageQueue::ReceiveSends() {
    269 }
    270 
    271 void MessageQueue::Post(MessageHandler *phandler, uint32 id,
    272     MessageData *pdata, bool time_sensitive) {
    273   if (fStop_)
    274     return;
    275 
    276   // Keep thread safe
    277   // Add the message to the end of the queue
    278   // Signal for the multiplexer to return
    279 
    280   CritScope cs(&crit_);
    281   Message msg;
    282   msg.phandler = phandler;
    283   msg.message_id = id;
    284   msg.pdata = pdata;
    285   if (time_sensitive) {
    286     msg.ts_sensitive = Time() + kMaxMsgLatency;
    287   }
    288   msgq_.push_back(msg);
    289   ss_->WakeUp();
    290 }
    291 
    292 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp,
    293     MessageHandler *phandler, uint32 id, MessageData* pdata) {
    294   if (fStop_)
    295     return;
    296 
    297   // Keep thread safe
    298   // Add to the priority queue. Gets sorted soonest first.
    299   // Signal for the multiplexer to return.
    300 
    301   CritScope cs(&crit_);
    302   Message msg;
    303   msg.phandler = phandler;
    304   msg.message_id = id;
    305   msg.pdata = pdata;
    306   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    307   dmsgq_.push(dmsg);
    308   // If this message queue processes 1 message every millisecond for 50 days,
    309   // we will wrap this number.  Even then, only messages with identical times
    310   // will be misordered, and then only briefly.  This is probably ok.
    311   VERIFY(0 != ++dmsgq_next_num_);
    312   ss_->WakeUp();
    313 }
    314 
    315 int MessageQueue::GetDelay() {
    316   CritScope cs(&crit_);
    317 
    318   if (!msgq_.empty())
    319     return 0;
    320 
    321   if (!dmsgq_.empty()) {
    322     int delay = TimeUntil(dmsgq_.top().msTrigger_);
    323     if (delay < 0)
    324       delay = 0;
    325     return delay;
    326   }
    327 
    328   return kForever;
    329 }
    330 
    331 void MessageQueue::Clear(MessageHandler *phandler, uint32 id,
    332                          MessageList* removed) {
    333   CritScope cs(&crit_);
    334 
    335   // Remove messages with phandler
    336 
    337   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    338     if (removed) {
    339       removed->push_back(msgPeek_);
    340     } else {
    341       delete msgPeek_.pdata;
    342     }
    343     fPeekKeep_ = false;
    344   }
    345 
    346   // Remove from ordered message queue
    347 
    348   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    349     if (it->Match(phandler, id)) {
    350       if (removed) {
    351         removed->push_back(*it);
    352       } else {
    353         delete it->pdata;
    354       }
    355       it = msgq_.erase(it);
    356     } else {
    357       ++it;
    358     }
    359   }
    360 
    361   // Remove from priority queue. Not directly iterable, so use this approach
    362 
    363   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
    364   for (PriorityQueue::container_type::iterator it = new_end;
    365        it != dmsgq_.container().end(); ++it) {
    366     if (it->msg_.Match(phandler, id)) {
    367       if (removed) {
    368         removed->push_back(it->msg_);
    369       } else {
    370         delete it->msg_.pdata;
    371       }
    372     } else {
    373       *new_end++ = *it;
    374     }
    375   }
    376   dmsgq_.container().erase(new_end, dmsgq_.container().end());
    377   dmsgq_.reheap();
    378 }
    379 
    380 void MessageQueue::Dispatch(Message *pmsg) {
    381   pmsg->phandler->OnMessage(pmsg);
    382 }
    383 
    384 }  // namespace rtc
    385