1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #ifdef POSIX 29 #include <sys/time.h> 30 #endif 31 32 #include "talk/base/common.h" 33 #include "talk/base/logging.h" 34 #include "talk/base/messagequeue.h" 35 #include "talk/base/physicalsocketserver.h" 36 37 38 namespace talk_base { 39 40 const uint32 kMaxMsgLatency = 150; // 150 ms 41 42 //------------------------------------------------------------------ 43 // MessageQueueManager 44 45 MessageQueueManager* MessageQueueManager::instance_; 46 47 MessageQueueManager* MessageQueueManager::Instance() { 48 // Note: This is not thread safe, but it is first called before threads are 49 // spawned. 50 if (!instance_) 51 instance_ = new MessageQueueManager; 52 return instance_; 53 } 54 55 MessageQueueManager::MessageQueueManager() { 56 } 57 58 MessageQueueManager::~MessageQueueManager() { 59 } 60 61 void MessageQueueManager::Add(MessageQueue *message_queue) { 62 // MessageQueueManager methods should be non-reentrant, so we 63 // ASSERT that is the case. If any of these ASSERT, please 64 // contact bpm or jbeda. 65 ASSERT(!crit_.CurrentThreadIsOwner()); 66 CritScope cs(&crit_); 67 message_queues_.push_back(message_queue); 68 } 69 70 void MessageQueueManager::Remove(MessageQueue *message_queue) { 71 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 72 // If this is the last MessageQueue, destroy the manager as well so that 73 // we don't leak this object at program shutdown. As mentioned above, this is 74 // not thread-safe, but this should only happen at program termination (when 75 // the ThreadManager is destroyed, and threads are no longer active). 76 bool destroy = false; 77 { 78 CritScope cs(&crit_); 79 std::vector<MessageQueue *>::iterator iter; 80 iter = std::find(message_queues_.begin(), message_queues_.end(), 81 message_queue); 82 if (iter != message_queues_.end()) { 83 message_queues_.erase(iter); 84 } 85 destroy = message_queues_.empty(); 86 } 87 if (destroy) { 88 instance_ = NULL; 89 delete this; 90 } 91 } 92 93 void MessageQueueManager::Clear(MessageHandler *handler) { 94 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 95 CritScope cs(&crit_); 96 std::vector<MessageQueue *>::iterator iter; 97 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 98 (*iter)->Clear(handler); 99 } 100 101 //------------------------------------------------------------------ 102 // MessageQueue 103 104 MessageQueue::MessageQueue(SocketServer* ss) 105 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), 106 dmsgq_next_num_(0) { 107 if (!ss_) { 108 // Currently, MessageQueue holds a socket server, and is the base class for 109 // Thread. It seems like it makes more sense for Thread to hold the socket 110 // server, and provide it to the MessageQueue, since the Thread controls 111 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 112 // messagequeue_unittest to depend on network libraries... yuck. 113 default_ss_.reset(new PhysicalSocketServer()); 114 ss_ = default_ss_.get(); 115 } 116 ss_->SetMessageQueue(this); 117 } 118 119 MessageQueue::~MessageQueue() { 120 // The signal is done from here to ensure 121 // that it always gets called when the queue 122 // is going away. 123 SignalQueueDestroyed(); 124 if (active_) { 125 MessageQueueManager::Instance()->Remove(this); 126 Clear(NULL); 127 } 128 if (ss_) { 129 ss_->SetMessageQueue(NULL); 130 } 131 } 132 133 void MessageQueue::set_socketserver(SocketServer* ss) { 134 ss_ = ss ? ss : default_ss_.get(); 135 ss_->SetMessageQueue(this); 136 } 137 138 void MessageQueue::Quit() { 139 fStop_ = true; 140 ss_->WakeUp(); 141 } 142 143 bool MessageQueue::IsQuitting() { 144 return fStop_; 145 } 146 147 void MessageQueue::Restart() { 148 fStop_ = false; 149 } 150 151 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 152 if (fPeekKeep_) { 153 *pmsg = msgPeek_; 154 return true; 155 } 156 if (!Get(pmsg, cmsWait)) 157 return false; 158 msgPeek_ = *pmsg; 159 fPeekKeep_ = true; 160 return true; 161 } 162 163 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { 164 // Return and clear peek if present 165 // Always return the peek if it exists so there is Peek/Get symmetry 166 167 if (fPeekKeep_) { 168 *pmsg = msgPeek_; 169 fPeekKeep_ = false; 170 return true; 171 } 172 173 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch 174 175 int cmsTotal = cmsWait; 176 int cmsElapsed = 0; 177 uint32 msStart = Time(); 178 uint32 msCurrent = msStart; 179 while (true) { 180 // Check for sent messages 181 ReceiveSends(); 182 183 // Check for posted events 184 int cmsDelayNext = kForever; 185 bool first_pass = true; 186 while (true) { 187 // All queue operations need to be locked, but nothing else in this loop 188 // (specifically handling disposed message) can happen inside the crit. 189 // Otherwise, disposed MessageHandlers will cause deadlocks. 190 { 191 CritScope cs(&crit_); 192 // On the first pass, check for delayed messages that have been 193 // triggered and calculate the next trigger time. 194 if (first_pass) { 195 first_pass = false; 196 while (!dmsgq_.empty()) { 197 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { 198 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); 199 break; 200 } 201 msgq_.push_back(dmsgq_.top().msg_); 202 dmsgq_.pop(); 203 } 204 } 205 // Pull a message off the message queue, if available. 206 if (msgq_.empty()) { 207 break; 208 } else { 209 *pmsg = msgq_.front(); 210 msgq_.pop_front(); 211 } 212 } // crit_ is released here. 213 214 // Log a warning for time-sensitive messages that we're late to deliver. 215 if (pmsg->ts_sensitive) { 216 int32 delay = TimeDiff(msCurrent, pmsg->ts_sensitive); 217 if (delay > 0) { 218 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " 219 << (delay + kMaxMsgLatency) << "ms"; 220 } 221 } 222 // If this was a dispose message, delete it and skip it. 223 if (MQID_DISPOSE == pmsg->message_id) { 224 ASSERT(NULL == pmsg->phandler); 225 delete pmsg->pdata; 226 *pmsg = Message(); 227 continue; 228 } 229 return true; 230 } 231 232 if (fStop_) 233 break; 234 235 // Which is shorter, the delay wait or the asked wait? 236 237 int cmsNext; 238 if (cmsWait == kForever) { 239 cmsNext = cmsDelayNext; 240 } else { 241 cmsNext = _max(0, cmsTotal - cmsElapsed); 242 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 243 cmsNext = cmsDelayNext; 244 } 245 246 // Wait and multiplex in the meantime 247 if (!ss_->Wait(cmsNext, process_io)) 248 return false; 249 250 // If the specified timeout expired, return 251 252 msCurrent = Time(); 253 cmsElapsed = TimeDiff(msCurrent, msStart); 254 if (cmsWait != kForever) { 255 if (cmsElapsed >= cmsWait) 256 return false; 257 } 258 } 259 return false; 260 } 261 262 void MessageQueue::ReceiveSends() { 263 } 264 265 void MessageQueue::Post(MessageHandler *phandler, uint32 id, 266 MessageData *pdata, bool time_sensitive) { 267 if (fStop_) 268 return; 269 270 // Keep thread safe 271 // Add the message to the end of the queue 272 // Signal for the multiplexer to return 273 274 CritScope cs(&crit_); 275 EnsureActive(); 276 Message msg; 277 msg.phandler = phandler; 278 msg.message_id = id; 279 msg.pdata = pdata; 280 if (time_sensitive) { 281 msg.ts_sensitive = Time() + kMaxMsgLatency; 282 } 283 msgq_.push_back(msg); 284 ss_->WakeUp(); 285 } 286 287 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, 288 MessageHandler *phandler, uint32 id, MessageData* pdata) { 289 if (fStop_) 290 return; 291 292 // Keep thread safe 293 // Add to the priority queue. Gets sorted soonest first. 294 // Signal for the multiplexer to return. 295 296 CritScope cs(&crit_); 297 EnsureActive(); 298 Message msg; 299 msg.phandler = phandler; 300 msg.message_id = id; 301 msg.pdata = pdata; 302 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 303 dmsgq_.push(dmsg); 304 // If this message queue processes 1 message every millisecond for 50 days, 305 // we will wrap this number. Even then, only messages with identical times 306 // will be misordered, and then only briefly. This is probably ok. 307 VERIFY(0 != ++dmsgq_next_num_); 308 ss_->WakeUp(); 309 } 310 311 int MessageQueue::GetDelay() { 312 CritScope cs(&crit_); 313 314 if (!msgq_.empty()) 315 return 0; 316 317 if (!dmsgq_.empty()) { 318 int delay = TimeUntil(dmsgq_.top().msTrigger_); 319 if (delay < 0) 320 delay = 0; 321 return delay; 322 } 323 324 return kForever; 325 } 326 327 void MessageQueue::Clear(MessageHandler *phandler, uint32 id, 328 MessageList* removed) { 329 CritScope cs(&crit_); 330 331 // Remove messages with phandler 332 333 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { 334 if (removed) { 335 removed->push_back(msgPeek_); 336 } else { 337 delete msgPeek_.pdata; 338 } 339 fPeekKeep_ = false; 340 } 341 342 // Remove from ordered message queue 343 344 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { 345 if (it->Match(phandler, id)) { 346 if (removed) { 347 removed->push_back(*it); 348 } else { 349 delete it->pdata; 350 } 351 it = msgq_.erase(it); 352 } else { 353 ++it; 354 } 355 } 356 357 // Remove from priority queue. Not directly iterable, so use this approach 358 359 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); 360 for (PriorityQueue::container_type::iterator it = new_end; 361 it != dmsgq_.container().end(); ++it) { 362 if (it->msg_.Match(phandler, id)) { 363 if (removed) { 364 removed->push_back(it->msg_); 365 } else { 366 delete it->msg_.pdata; 367 } 368 } else { 369 *new_end++ = *it; 370 } 371 } 372 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 373 dmsgq_.reheap(); 374 } 375 376 void MessageQueue::Dispatch(Message *pmsg) { 377 pmsg->phandler->OnMessage(pmsg); 378 } 379 380 void MessageQueue::EnsureActive() { 381 ASSERT(crit_.CurrentThreadIsOwner()); 382 if (!active_) { 383 active_ = true; 384 MessageQueueManager::Instance()->Add(this); 385 } 386 } 387 388 } // namespace talk_base 389