1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #if defined(WEBRTC_POSIX) 12 #include <sys/time.h> 13 #endif 14 15 #include <algorithm> 16 17 #include "webrtc/base/common.h" 18 #include "webrtc/base/logging.h" 19 #include "webrtc/base/messagequeue.h" 20 #if defined(__native_client__) 21 #include "webrtc/base/nullsocketserver.h" 22 typedef rtc::NullSocketServer DefaultSocketServer; 23 #else 24 #include "webrtc/base/physicalsocketserver.h" 25 typedef rtc::PhysicalSocketServer DefaultSocketServer; 26 #endif 27 28 namespace rtc { 29 30 const uint32_t kMaxMsgLatency = 150; // 150 ms 31 32 //------------------------------------------------------------------ 33 // MessageQueueManager 34 35 MessageQueueManager* MessageQueueManager::instance_ = NULL; 36 37 MessageQueueManager* MessageQueueManager::Instance() { 38 // Note: This is not thread safe, but it is first called before threads are 39 // spawned. 40 if (!instance_) 41 instance_ = new MessageQueueManager; 42 return instance_; 43 } 44 45 bool MessageQueueManager::IsInitialized() { 46 return instance_ != NULL; 47 } 48 49 MessageQueueManager::MessageQueueManager() { 50 } 51 52 MessageQueueManager::~MessageQueueManager() { 53 } 54 55 void MessageQueueManager::Add(MessageQueue *message_queue) { 56 return Instance()->AddInternal(message_queue); 57 } 58 void MessageQueueManager::AddInternal(MessageQueue *message_queue) { 59 // MessageQueueManager methods should be non-reentrant, so we 60 // ASSERT that is the case. If any of these ASSERT, please 61 // contact bpm or jbeda. 62 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. 63 ASSERT(!crit_.CurrentThreadIsOwner()); 64 #endif 65 CritScope cs(&crit_); 66 message_queues_.push_back(message_queue); 67 } 68 69 void MessageQueueManager::Remove(MessageQueue *message_queue) { 70 // If there isn't a message queue manager instance, then there isn't a queue 71 // to remove. 72 if (!instance_) return; 73 return Instance()->RemoveInternal(message_queue); 74 } 75 void MessageQueueManager::RemoveInternal(MessageQueue *message_queue) { 76 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. 77 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 78 #endif 79 // If this is the last MessageQueue, destroy the manager as well so that 80 // we don't leak this object at program shutdown. As mentioned above, this is 81 // not thread-safe, but this should only happen at program termination (when 82 // the ThreadManager is destroyed, and threads are no longer active). 83 bool destroy = false; 84 { 85 CritScope cs(&crit_); 86 std::vector<MessageQueue *>::iterator iter; 87 iter = std::find(message_queues_.begin(), message_queues_.end(), 88 message_queue); 89 if (iter != message_queues_.end()) { 90 message_queues_.erase(iter); 91 } 92 destroy = message_queues_.empty(); 93 } 94 if (destroy) { 95 instance_ = NULL; 96 delete this; 97 } 98 } 99 100 void MessageQueueManager::Clear(MessageHandler *handler) { 101 // If there isn't a message queue manager instance, then there aren't any 102 // queues to remove this handler from. 103 if (!instance_) return; 104 return Instance()->ClearInternal(handler); 105 } 106 void MessageQueueManager::ClearInternal(MessageHandler *handler) { 107 #if CS_DEBUG_CHECKS // CurrentThreadIsOwner returns true by default. 108 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 109 #endif 110 CritScope cs(&crit_); 111 std::vector<MessageQueue *>::iterator iter; 112 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 113 (*iter)->Clear(handler); 114 } 115 116 //------------------------------------------------------------------ 117 // MessageQueue 118 119 MessageQueue::MessageQueue(SocketServer* ss) 120 : ss_(ss), fStop_(false), fPeekKeep_(false), 121 dmsgq_next_num_(0) { 122 if (!ss_) { 123 // Currently, MessageQueue holds a socket server, and is the base class for 124 // Thread. It seems like it makes more sense for Thread to hold the socket 125 // server, and provide it to the MessageQueue, since the Thread controls 126 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 127 // messagequeue_unittest to depend on network libraries... yuck. 128 default_ss_.reset(new DefaultSocketServer()); 129 ss_ = default_ss_.get(); 130 } 131 ss_->SetMessageQueue(this); 132 MessageQueueManager::Add(this); 133 } 134 135 MessageQueue::~MessageQueue() { 136 // The signal is done from here to ensure 137 // that it always gets called when the queue 138 // is going away. 139 SignalQueueDestroyed(); 140 MessageQueueManager::Remove(this); 141 Clear(NULL); 142 if (ss_) { 143 ss_->SetMessageQueue(NULL); 144 } 145 } 146 147 void MessageQueue::set_socketserver(SocketServer* ss) { 148 ss_ = ss ? ss : default_ss_.get(); 149 ss_->SetMessageQueue(this); 150 } 151 152 void MessageQueue::Quit() { 153 fStop_ = true; 154 ss_->WakeUp(); 155 } 156 157 bool MessageQueue::IsQuitting() { 158 return fStop_; 159 } 160 161 void MessageQueue::Restart() { 162 fStop_ = false; 163 } 164 165 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 166 if (fPeekKeep_) { 167 *pmsg = msgPeek_; 168 return true; 169 } 170 if (!Get(pmsg, cmsWait)) 171 return false; 172 msgPeek_ = *pmsg; 173 fPeekKeep_ = true; 174 return true; 175 } 176 177 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { 178 // Return and clear peek if present 179 // Always return the peek if it exists so there is Peek/Get symmetry 180 181 if (fPeekKeep_) { 182 *pmsg = msgPeek_; 183 fPeekKeep_ = false; 184 return true; 185 } 186 187 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch 188 189 int cmsTotal = cmsWait; 190 int cmsElapsed = 0; 191 uint32_t msStart = Time(); 192 uint32_t msCurrent = msStart; 193 while (true) { 194 // Check for sent messages 195 ReceiveSends(); 196 197 // Check for posted events 198 int cmsDelayNext = kForever; 199 bool first_pass = true; 200 while (true) { 201 // All queue operations need to be locked, but nothing else in this loop 202 // (specifically handling disposed message) can happen inside the crit. 203 // Otherwise, disposed MessageHandlers will cause deadlocks. 204 { 205 CritScope cs(&crit_); 206 // On the first pass, check for delayed messages that have been 207 // triggered and calculate the next trigger time. 208 if (first_pass) { 209 first_pass = false; 210 while (!dmsgq_.empty()) { 211 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { 212 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); 213 break; 214 } 215 msgq_.push_back(dmsgq_.top().msg_); 216 dmsgq_.pop(); 217 } 218 } 219 // Pull a message off the message queue, if available. 220 if (msgq_.empty()) { 221 break; 222 } else { 223 *pmsg = msgq_.front(); 224 msgq_.pop_front(); 225 } 226 } // crit_ is released here. 227 228 // Log a warning for time-sensitive messages that we're late to deliver. 229 if (pmsg->ts_sensitive) { 230 int32_t delay = TimeDiff(msCurrent, pmsg->ts_sensitive); 231 if (delay > 0) { 232 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " 233 << (delay + kMaxMsgLatency) << "ms"; 234 } 235 } 236 // If this was a dispose message, delete it and skip it. 237 if (MQID_DISPOSE == pmsg->message_id) { 238 ASSERT(NULL == pmsg->phandler); 239 delete pmsg->pdata; 240 *pmsg = Message(); 241 continue; 242 } 243 return true; 244 } 245 246 if (fStop_) 247 break; 248 249 // Which is shorter, the delay wait or the asked wait? 250 251 int cmsNext; 252 if (cmsWait == kForever) { 253 cmsNext = cmsDelayNext; 254 } else { 255 cmsNext = std::max(0, cmsTotal - cmsElapsed); 256 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 257 cmsNext = cmsDelayNext; 258 } 259 260 // Wait and multiplex in the meantime 261 if (!ss_->Wait(cmsNext, process_io)) 262 return false; 263 264 // If the specified timeout expired, return 265 266 msCurrent = Time(); 267 cmsElapsed = TimeDiff(msCurrent, msStart); 268 if (cmsWait != kForever) { 269 if (cmsElapsed >= cmsWait) 270 return false; 271 } 272 } 273 return false; 274 } 275 276 void MessageQueue::ReceiveSends() { 277 } 278 279 void MessageQueue::Post(MessageHandler* phandler, 280 uint32_t id, 281 MessageData* pdata, 282 bool time_sensitive) { 283 if (fStop_) 284 return; 285 286 // Keep thread safe 287 // Add the message to the end of the queue 288 // Signal for the multiplexer to return 289 290 CritScope cs(&crit_); 291 Message msg; 292 msg.phandler = phandler; 293 msg.message_id = id; 294 msg.pdata = pdata; 295 if (time_sensitive) { 296 msg.ts_sensitive = Time() + kMaxMsgLatency; 297 } 298 msgq_.push_back(msg); 299 ss_->WakeUp(); 300 } 301 302 void MessageQueue::PostDelayed(int cmsDelay, 303 MessageHandler* phandler, 304 uint32_t id, 305 MessageData* pdata) { 306 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 307 } 308 309 void MessageQueue::PostAt(uint32_t tstamp, 310 MessageHandler* phandler, 311 uint32_t id, 312 MessageData* pdata) { 313 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 314 } 315 316 void MessageQueue::DoDelayPost(int cmsDelay, 317 uint32_t tstamp, 318 MessageHandler* phandler, 319 uint32_t id, 320 MessageData* pdata) { 321 if (fStop_) 322 return; 323 324 // Keep thread safe 325 // Add to the priority queue. Gets sorted soonest first. 326 // Signal for the multiplexer to return. 327 328 CritScope cs(&crit_); 329 Message msg; 330 msg.phandler = phandler; 331 msg.message_id = id; 332 msg.pdata = pdata; 333 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 334 dmsgq_.push(dmsg); 335 // If this message queue processes 1 message every millisecond for 50 days, 336 // we will wrap this number. Even then, only messages with identical times 337 // will be misordered, and then only briefly. This is probably ok. 338 VERIFY(0 != ++dmsgq_next_num_); 339 ss_->WakeUp(); 340 } 341 342 int MessageQueue::GetDelay() { 343 CritScope cs(&crit_); 344 345 if (!msgq_.empty()) 346 return 0; 347 348 if (!dmsgq_.empty()) { 349 int delay = TimeUntil(dmsgq_.top().msTrigger_); 350 if (delay < 0) 351 delay = 0; 352 return delay; 353 } 354 355 return kForever; 356 } 357 358 void MessageQueue::Clear(MessageHandler* phandler, 359 uint32_t id, 360 MessageList* removed) { 361 CritScope cs(&crit_); 362 363 // Remove messages with phandler 364 365 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { 366 if (removed) { 367 removed->push_back(msgPeek_); 368 } else { 369 delete msgPeek_.pdata; 370 } 371 fPeekKeep_ = false; 372 } 373 374 // Remove from ordered message queue 375 376 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { 377 if (it->Match(phandler, id)) { 378 if (removed) { 379 removed->push_back(*it); 380 } else { 381 delete it->pdata; 382 } 383 it = msgq_.erase(it); 384 } else { 385 ++it; 386 } 387 } 388 389 // Remove from priority queue. Not directly iterable, so use this approach 390 391 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); 392 for (PriorityQueue::container_type::iterator it = new_end; 393 it != dmsgq_.container().end(); ++it) { 394 if (it->msg_.Match(phandler, id)) { 395 if (removed) { 396 removed->push_back(it->msg_); 397 } else { 398 delete it->msg_.pdata; 399 } 400 } else { 401 *new_end++ = *it; 402 } 403 } 404 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 405 dmsgq_.reheap(); 406 } 407 408 void MessageQueue::Dispatch(Message *pmsg) { 409 pmsg->phandler->OnMessage(pmsg); 410 } 411 412 } // namespace rtc 413