Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #if defined(WEBRTC_POSIX)
     12 #include <sys/time.h>
     13 #endif
     14 
     15 #include <algorithm>
     16 
     17 #include "webrtc/base/common.h"
     18 #include "webrtc/base/logging.h"
     19 #include "webrtc/base/messagequeue.h"
     20 #if defined(__native_client__)
     21 #include "webrtc/base/nullsocketserver.h"
     22 typedef rtc::NullSocketServer DefaultSocketServer;
     23 #else
     24 #include "webrtc/base/physicalsocketserver.h"
     25 typedef rtc::PhysicalSocketServer DefaultSocketServer;
     26 #endif
     27 
     28 namespace rtc {
     29 
     30 const uint32_t kMaxMsgLatency = 150;  // 150 ms
     31 
     32 //------------------------------------------------------------------
     33 // MessageQueueManager
     34 
     35 MessageQueueManager* MessageQueueManager::instance_ = NULL;
     36 
     37 MessageQueueManager* MessageQueueManager::Instance() {
     38   // Note: This is not thread safe, but it is first called before threads are
     39   // spawned.
     40   if (!instance_)
     41     instance_ = new MessageQueueManager;
     42   return instance_;
     43 }
     44 
     45 bool MessageQueueManager::IsInitialized() {
     46   return instance_ != NULL;
     47 }
     48 
     49 MessageQueueManager::MessageQueueManager() {
     50 }
     51 
     52 MessageQueueManager::~MessageQueueManager() {
     53 }
     54 
     55 void MessageQueueManager::Add(MessageQueue *message_queue) {
     56   return Instance()->AddInternal(message_queue);
     57 }
     58 void MessageQueueManager::AddInternal(MessageQueue *message_queue) {
     59   // MessageQueueManager methods should be non-reentrant, so we
     60   // ASSERT that is the case.  If any of these ASSERT, please
     61   // contact bpm or jbeda.
     62 #if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
     63   ASSERT(!crit_.CurrentThreadIsOwner());
     64 #endif
     65   CritScope cs(&crit_);
     66   message_queues_.push_back(message_queue);
     67 }
     68 
     69 void MessageQueueManager::Remove(MessageQueue *message_queue) {
     70   // If there isn't a message queue manager instance, then there isn't a queue
     71   // to remove.
     72   if (!instance_) return;
     73   return Instance()->RemoveInternal(message_queue);
     74 }
     75 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) {
     76 #if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
     77   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
     78 #endif
     79   // If this is the last MessageQueue, destroy the manager as well so that
     80   // we don't leak this object at program shutdown. As mentioned above, this is
     81   // not thread-safe, but this should only happen at program termination (when
     82   // the ThreadManager is destroyed, and threads are no longer active).
     83   bool destroy = false;
     84   {
     85     CritScope cs(&crit_);
     86     std::vector<MessageQueue *>::iterator iter;
     87     iter = std::find(message_queues_.begin(), message_queues_.end(),
     88                      message_queue);
     89     if (iter != message_queues_.end()) {
     90       message_queues_.erase(iter);
     91     }
     92     destroy = message_queues_.empty();
     93   }
     94   if (destroy) {
     95     instance_ = NULL;
     96     delete this;
     97   }
     98 }
     99 
    100 void MessageQueueManager::Clear(MessageHandler *handler) {
    101   // If there isn't a message queue manager instance, then there aren't any
    102   // queues to remove this handler from.
    103   if (!instance_) return;
    104   return Instance()->ClearInternal(handler);
    105 }
    106 void MessageQueueManager::ClearInternal(MessageHandler *handler) {
    107 #if CS_DEBUG_CHECKS  // CurrentThreadIsOwner returns true by default.
    108   ASSERT(!crit_.CurrentThreadIsOwner());  // See note above.
    109 #endif
    110   CritScope cs(&crit_);
    111   std::vector<MessageQueue *>::iterator iter;
    112   for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++)
    113     (*iter)->Clear(handler);
    114 }
    115 
    116 //------------------------------------------------------------------
    117 // MessageQueue
    118 
    119 MessageQueue::MessageQueue(SocketServer* ss)
    120     : ss_(ss), fStop_(false), fPeekKeep_(false),
    121       dmsgq_next_num_(0) {
    122   if (!ss_) {
    123     // Currently, MessageQueue holds a socket server, and is the base class for
    124     // Thread.  It seems like it makes more sense for Thread to hold the socket
    125     // server, and provide it to the MessageQueue, since the Thread controls
    126     // the I/O model, and MQ is agnostic to those details.  Anyway, this causes
    127     // messagequeue_unittest to depend on network libraries... yuck.
    128     default_ss_.reset(new DefaultSocketServer());
    129     ss_ = default_ss_.get();
    130   }
    131   ss_->SetMessageQueue(this);
    132   MessageQueueManager::Add(this);
    133 }
    134 
    135 MessageQueue::~MessageQueue() {
    136   // The signal is done from here to ensure
    137   // that it always gets called when the queue
    138   // is going away.
    139   SignalQueueDestroyed();
    140   MessageQueueManager::Remove(this);
    141   Clear(NULL);
    142   if (ss_) {
    143     ss_->SetMessageQueue(NULL);
    144   }
    145 }
    146 
    147 void MessageQueue::set_socketserver(SocketServer* ss) {
    148   ss_ = ss ? ss : default_ss_.get();
    149   ss_->SetMessageQueue(this);
    150 }
    151 
    152 void MessageQueue::Quit() {
    153   fStop_ = true;
    154   ss_->WakeUp();
    155 }
    156 
    157 bool MessageQueue::IsQuitting() {
    158   return fStop_;
    159 }
    160 
    161 void MessageQueue::Restart() {
    162   fStop_ = false;
    163 }
    164 
    165 bool MessageQueue::Peek(Message *pmsg, int cmsWait) {
    166   if (fPeekKeep_) {
    167     *pmsg = msgPeek_;
    168     return true;
    169   }
    170   if (!Get(pmsg, cmsWait))
    171     return false;
    172   msgPeek_ = *pmsg;
    173   fPeekKeep_ = true;
    174   return true;
    175 }
    176 
    177 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) {
    178   // Return and clear peek if present
    179   // Always return the peek if it exists so there is Peek/Get symmetry
    180 
    181   if (fPeekKeep_) {
    182     *pmsg = msgPeek_;
    183     fPeekKeep_ = false;
    184     return true;
    185   }
    186 
    187   // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch
    188 
    189   int cmsTotal = cmsWait;
    190   int cmsElapsed = 0;
    191   uint32_t msStart = Time();
    192   uint32_t msCurrent = msStart;
    193   while (true) {
    194     // Check for sent messages
    195     ReceiveSends();
    196 
    197     // Check for posted events
    198     int cmsDelayNext = kForever;
    199     bool first_pass = true;
    200     while (true) {
    201       // All queue operations need to be locked, but nothing else in this loop
    202       // (specifically handling disposed message) can happen inside the crit.
    203       // Otherwise, disposed MessageHandlers will cause deadlocks.
    204       {
    205         CritScope cs(&crit_);
    206         // On the first pass, check for delayed messages that have been
    207         // triggered and calculate the next trigger time.
    208         if (first_pass) {
    209           first_pass = false;
    210           while (!dmsgq_.empty()) {
    211             if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) {
    212               cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent);
    213               break;
    214             }
    215             msgq_.push_back(dmsgq_.top().msg_);
    216             dmsgq_.pop();
    217           }
    218         }
    219         // Pull a message off the message queue, if available.
    220         if (msgq_.empty()) {
    221           break;
    222         } else {
    223           *pmsg = msgq_.front();
    224           msgq_.pop_front();
    225         }
    226       }  // crit_ is released here.
    227 
    228       // Log a warning for time-sensitive messages that we're late to deliver.
    229       if (pmsg->ts_sensitive) {
    230         int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive);
    231         if (delay > 0) {
    232           LOG_F(LS_WARNING) << "id: " << pmsg->message_id << "  delay: "
    233                             << (delay + kMaxMsgLatency) << "ms";
    234         }
    235       }
    236       // If this was a dispose message, delete it and skip it.
    237       if (MQID_DISPOSE == pmsg->message_id) {
    238         ASSERT(NULL == pmsg->phandler);
    239         delete pmsg->pdata;
    240         *pmsg = Message();
    241         continue;
    242       }
    243       return true;
    244     }
    245 
    246     if (fStop_)
    247       break;
    248 
    249     // Which is shorter, the delay wait or the asked wait?
    250 
    251     int cmsNext;
    252     if (cmsWait == kForever) {
    253       cmsNext = cmsDelayNext;
    254     } else {
    255       cmsNext = std::max(0, cmsTotal - cmsElapsed);
    256       if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext))
    257         cmsNext = cmsDelayNext;
    258     }
    259 
    260     // Wait and multiplex in the meantime
    261     if (!ss_->Wait(cmsNext, process_io))
    262       return false;
    263 
    264     // If the specified timeout expired, return
    265 
    266     msCurrent = Time();
    267     cmsElapsed = TimeDiff(msCurrent, msStart);
    268     if (cmsWait != kForever) {
    269       if (cmsElapsed >= cmsWait)
    270         return false;
    271     }
    272   }
    273   return false;
    274 }
    275 
    276 void MessageQueue::ReceiveSends() {
    277 }
    278 
    279 void MessageQueue::Post(MessageHandler* phandler,
    280                         uint32_t id,
    281                         MessageData* pdata,
    282                         bool time_sensitive) {
    283   if (fStop_)
    284     return;
    285 
    286   // Keep thread safe
    287   // Add the message to the end of the queue
    288   // Signal for the multiplexer to return
    289 
    290   CritScope cs(&crit_);
    291   Message msg;
    292   msg.phandler = phandler;
    293   msg.message_id = id;
    294   msg.pdata = pdata;
    295   if (time_sensitive) {
    296     msg.ts_sensitive = Time() + kMaxMsgLatency;
    297   }
    298   msgq_.push_back(msg);
    299   ss_->WakeUp();
    300 }
    301 
    302 void MessageQueue::PostDelayed(int cmsDelay,
    303                                MessageHandler* phandler,
    304                                uint32_t id,
    305                                MessageData* pdata) {
    306   return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata);
    307 }
    308 
    309 void MessageQueue::PostAt(uint32_t tstamp,
    310                           MessageHandler* phandler,
    311                           uint32_t id,
    312                           MessageData* pdata) {
    313   return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata);
    314 }
    315 
    316 void MessageQueue::DoDelayPost(int cmsDelay,
    317                                uint32_t tstamp,
    318                                MessageHandler* phandler,
    319                                uint32_t id,
    320                                MessageData* pdata) {
    321   if (fStop_)
    322     return;
    323 
    324   // Keep thread safe
    325   // Add to the priority queue. Gets sorted soonest first.
    326   // Signal for the multiplexer to return.
    327 
    328   CritScope cs(&crit_);
    329   Message msg;
    330   msg.phandler = phandler;
    331   msg.message_id = id;
    332   msg.pdata = pdata;
    333   DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg);
    334   dmsgq_.push(dmsg);
    335   // If this message queue processes 1 message every millisecond for 50 days,
    336   // we will wrap this number.  Even then, only messages with identical times
    337   // will be misordered, and then only briefly.  This is probably ok.
    338   VERIFY(0 != ++dmsgq_next_num_);
    339   ss_->WakeUp();
    340 }
    341 
    342 int MessageQueue::GetDelay() {
    343   CritScope cs(&crit_);
    344 
    345   if (!msgq_.empty())
    346     return 0;
    347 
    348   if (!dmsgq_.empty()) {
    349     int delay = TimeUntil(dmsgq_.top().msTrigger_);
    350     if (delay < 0)
    351       delay = 0;
    352     return delay;
    353   }
    354 
    355   return kForever;
    356 }
    357 
    358 void MessageQueue::Clear(MessageHandler* phandler,
    359                          uint32_t id,
    360                          MessageList* removed) {
    361   CritScope cs(&crit_);
    362 
    363   // Remove messages with phandler
    364 
    365   if (fPeekKeep_ && msgPeek_.Match(phandler, id)) {
    366     if (removed) {
    367       removed->push_back(msgPeek_);
    368     } else {
    369       delete msgPeek_.pdata;
    370     }
    371     fPeekKeep_ = false;
    372   }
    373 
    374   // Remove from ordered message queue
    375 
    376   for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) {
    377     if (it->Match(phandler, id)) {
    378       if (removed) {
    379         removed->push_back(*it);
    380       } else {
    381         delete it->pdata;
    382       }
    383       it = msgq_.erase(it);
    384     } else {
    385       ++it;
    386     }
    387   }
    388 
    389   // Remove from priority queue. Not directly iterable, so use this approach
    390 
    391   PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin();
    392   for (PriorityQueue::container_type::iterator it = new_end;
    393        it != dmsgq_.container().end(); ++it) {
    394     if (it->msg_.Match(phandler, id)) {
    395       if (removed) {
    396         removed->push_back(it->msg_);
    397       } else {
    398         delete it->msg_.pdata;
    399       }
    400     } else {
    401       *new_end++ = *it;
    402     }
    403   }
    404   dmsgq_.container().erase(new_end, dmsgq_.container().end());
    405   dmsgq_.reheap();
    406 }
    407 
    408 void MessageQueue::Dispatch(Message *pmsg) {
    409   pmsg->phandler->OnMessage(pmsg);
    410 }
    411 
    412 }  // namespace rtc
    413