1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #if defined(_MSC_VER) && _MSC_VER < 1300 29 #pragma warning(disable:4786) 30 #endif 31 32 #ifdef POSIX 33 #include <sys/time.h> 34 #endif 35 36 #include "talk/base/common.h" 37 #include "talk/base/logging.h" 38 #include "talk/base/messagequeue.h" 39 #include "talk/base/physicalsocketserver.h" 40 41 42 namespace talk_base { 43 44 const uint32 kMaxMsgLatency = 150; // 150 ms 45 46 //------------------------------------------------------------------ 47 // MessageQueueManager 48 49 MessageQueueManager* MessageQueueManager::instance_; 50 51 MessageQueueManager* MessageQueueManager::Instance() { 52 // Note: This is not thread safe, but it is first called before threads are 53 // spawned. 54 if (!instance_) 55 instance_ = new MessageQueueManager; 56 return instance_; 57 } 58 59 MessageQueueManager::MessageQueueManager() { 60 } 61 62 MessageQueueManager::~MessageQueueManager() { 63 } 64 65 void MessageQueueManager::Add(MessageQueue *message_queue) { 66 // MessageQueueManager methods should be non-reentrant, so we 67 // ASSERT that is the case. If any of these ASSERT, please 68 // contact bpm or jbeda. 69 ASSERT(!crit_.CurrentThreadIsOwner()); 70 CritScope cs(&crit_); 71 message_queues_.push_back(message_queue); 72 } 73 74 void MessageQueueManager::Remove(MessageQueue *message_queue) { 75 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 76 // If this is the last MessageQueue, destroy the manager as well so that 77 // we don't leak this object at program shutdown. As mentioned above, this is 78 // not thread-safe, but this should only happen at program termination (when 79 // the ThreadManager is destroyed, and threads are no longer active). 80 bool destroy = false; 81 { 82 CritScope cs(&crit_); 83 std::vector<MessageQueue *>::iterator iter; 84 iter = std::find(message_queues_.begin(), message_queues_.end(), 85 message_queue); 86 if (iter != message_queues_.end()) { 87 message_queues_.erase(iter); 88 } 89 destroy = message_queues_.empty(); 90 } 91 if (destroy) { 92 instance_ = NULL; 93 delete this; 94 } 95 } 96 97 void MessageQueueManager::Clear(MessageHandler *handler) { 98 ASSERT(!crit_.CurrentThreadIsOwner()); // See note above. 99 CritScope cs(&crit_); 100 std::vector<MessageQueue *>::iterator iter; 101 for (iter = message_queues_.begin(); iter != message_queues_.end(); iter++) 102 (*iter)->Clear(handler); 103 } 104 105 //------------------------------------------------------------------ 106 // MessageQueue 107 108 MessageQueue::MessageQueue(SocketServer* ss) 109 : ss_(ss), fStop_(false), fPeekKeep_(false), active_(false), 110 dmsgq_next_num_(0) { 111 if (!ss_) { 112 // Currently, MessageQueue holds a socket server, and is the base class for 113 // Thread. It seems like it makes more sense for Thread to hold the socket 114 // server, and provide it to the MessageQueue, since the Thread controls 115 // the I/O model, and MQ is agnostic to those details. Anyway, this causes 116 // messagequeue_unittest to depend on network libraries... yuck. 117 default_ss_.reset(new PhysicalSocketServer()); 118 ss_ = default_ss_.get(); 119 } 120 ss_->SetMessageQueue(this); 121 } 122 123 MessageQueue::~MessageQueue() { 124 // The signal is done from here to ensure 125 // that it always gets called when the queue 126 // is going away. 127 SignalQueueDestroyed(); 128 if (active_) { 129 MessageQueueManager::Instance()->Remove(this); 130 Clear(NULL); 131 } 132 if (ss_) { 133 ss_->SetMessageQueue(NULL); 134 } 135 } 136 137 void MessageQueue::set_socketserver(SocketServer* ss) { 138 ss_ = ss ? ss : default_ss_.get(); 139 ss_->SetMessageQueue(this); 140 } 141 142 void MessageQueue::Quit() { 143 fStop_ = true; 144 ss_->WakeUp(); 145 } 146 147 bool MessageQueue::IsQuitting() { 148 return fStop_; 149 } 150 151 void MessageQueue::Restart() { 152 fStop_ = false; 153 } 154 155 bool MessageQueue::Peek(Message *pmsg, int cmsWait) { 156 if (fPeekKeep_) { 157 *pmsg = msgPeek_; 158 return true; 159 } 160 if (!Get(pmsg, cmsWait)) 161 return false; 162 msgPeek_ = *pmsg; 163 fPeekKeep_ = true; 164 return true; 165 } 166 167 bool MessageQueue::Get(Message *pmsg, int cmsWait, bool process_io) { 168 // Return and clear peek if present 169 // Always return the peek if it exists so there is Peek/Get symmetry 170 171 if (fPeekKeep_) { 172 *pmsg = msgPeek_; 173 fPeekKeep_ = false; 174 return true; 175 } 176 177 // Get w/wait + timer scan / dispatch + socket / event multiplexer dispatch 178 179 int cmsTotal = cmsWait; 180 int cmsElapsed = 0; 181 uint32 msStart = Time(); 182 uint32 msCurrent = msStart; 183 while (true) { 184 // Check for sent messages 185 186 ReceiveSends(); 187 188 // Check queues 189 190 int cmsDelayNext = kForever; 191 { 192 CritScope cs(&crit_); 193 194 // Check for delayed messages that have been triggered 195 // Calc the next trigger too 196 197 while (!dmsgq_.empty()) { 198 if (TimeIsLater(msCurrent, dmsgq_.top().msTrigger_)) { 199 cmsDelayNext = TimeDiff(dmsgq_.top().msTrigger_, msCurrent); 200 break; 201 } 202 msgq_.push_back(dmsgq_.top().msg_); 203 dmsgq_.pop(); 204 } 205 206 // Check for posted events 207 208 while (!msgq_.empty()) { 209 *pmsg = msgq_.front(); 210 if (pmsg->ts_sensitive) { 211 long delay = TimeDiff(msCurrent, pmsg->ts_sensitive); 212 if (delay > 0) { 213 LOG_F(LS_WARNING) << "id: " << pmsg->message_id << " delay: " 214 << (delay + kMaxMsgLatency) << "ms"; 215 } 216 } 217 msgq_.pop_front(); 218 if (MQID_DISPOSE == pmsg->message_id) { 219 ASSERT(NULL == pmsg->phandler); 220 delete pmsg->pdata; 221 continue; 222 } 223 return true; 224 } 225 } 226 227 if (fStop_) 228 break; 229 230 // Which is shorter, the delay wait or the asked wait? 231 232 int cmsNext; 233 if (cmsWait == kForever) { 234 cmsNext = cmsDelayNext; 235 } else { 236 cmsNext = _max(0, cmsTotal - cmsElapsed); 237 if ((cmsDelayNext != kForever) && (cmsDelayNext < cmsNext)) 238 cmsNext = cmsDelayNext; 239 } 240 241 // Wait and multiplex in the meantime 242 if (!ss_->Wait(cmsNext, process_io)) 243 return false; 244 245 // If the specified timeout expired, return 246 247 msCurrent = Time(); 248 cmsElapsed = TimeDiff(msCurrent, msStart); 249 if (cmsWait != kForever) { 250 if (cmsElapsed >= cmsWait) 251 return false; 252 } 253 } 254 return false; 255 } 256 257 void MessageQueue::ReceiveSends() { 258 } 259 260 void MessageQueue::Post(MessageHandler *phandler, uint32 id, 261 MessageData *pdata, bool time_sensitive) { 262 if (fStop_) 263 return; 264 265 // Keep thread safe 266 // Add the message to the end of the queue 267 // Signal for the multiplexer to return 268 269 CritScope cs(&crit_); 270 EnsureActive(); 271 Message msg; 272 msg.phandler = phandler; 273 msg.message_id = id; 274 msg.pdata = pdata; 275 if (time_sensitive) { 276 msg.ts_sensitive = Time() + kMaxMsgLatency; 277 } 278 msgq_.push_back(msg); 279 ss_->WakeUp(); 280 } 281 282 void MessageQueue::DoDelayPost(int cmsDelay, uint32 tstamp, 283 MessageHandler *phandler, uint32 id, MessageData* pdata) { 284 if (fStop_) 285 return; 286 287 // Keep thread safe 288 // Add to the priority queue. Gets sorted soonest first. 289 // Signal for the multiplexer to return. 290 291 CritScope cs(&crit_); 292 EnsureActive(); 293 Message msg; 294 msg.phandler = phandler; 295 msg.message_id = id; 296 msg.pdata = pdata; 297 DelayedMessage dmsg(cmsDelay, tstamp, dmsgq_next_num_, msg); 298 dmsgq_.push(dmsg); 299 // If this message queue processes 1 message every millisecond for 50 days, 300 // we will wrap this number. Even then, only messages with identical times 301 // will be misordered, and then only briefly. This is probably ok. 302 VERIFY(0 != ++dmsgq_next_num_); 303 ss_->WakeUp(); 304 } 305 306 int MessageQueue::GetDelay() { 307 CritScope cs(&crit_); 308 309 if (!msgq_.empty()) 310 return 0; 311 312 if (!dmsgq_.empty()) { 313 int delay = TimeUntil(dmsgq_.top().msTrigger_); 314 if (delay < 0) 315 delay = 0; 316 return delay; 317 } 318 319 return kForever; 320 } 321 322 void MessageQueue::Clear(MessageHandler *phandler, uint32 id, 323 MessageList* removed) { 324 CritScope cs(&crit_); 325 326 // Remove messages with phandler 327 328 if (fPeekKeep_ && msgPeek_.Match(phandler, id)) { 329 if (removed) { 330 removed->push_back(msgPeek_); 331 } else { 332 delete msgPeek_.pdata; 333 } 334 fPeekKeep_ = false; 335 } 336 337 // Remove from ordered message queue 338 339 for (MessageList::iterator it = msgq_.begin(); it != msgq_.end();) { 340 if (it->Match(phandler, id)) { 341 if (removed) { 342 removed->push_back(*it); 343 } else { 344 delete it->pdata; 345 } 346 it = msgq_.erase(it); 347 } else { 348 ++it; 349 } 350 } 351 352 // Remove from priority queue. Not directly iterable, so use this approach 353 354 PriorityQueue::container_type::iterator new_end = dmsgq_.container().begin(); 355 for (PriorityQueue::container_type::iterator it = new_end; 356 it != dmsgq_.container().end(); ++it) { 357 if (it->msg_.Match(phandler, id)) { 358 if (removed) { 359 removed->push_back(it->msg_); 360 } else { 361 delete it->msg_.pdata; 362 } 363 } else { 364 *new_end++ = *it; 365 } 366 } 367 dmsgq_.container().erase(new_end, dmsgq_.container().end()); 368 dmsgq_.reheap(); 369 } 370 371 void MessageQueue::Dispatch(Message *pmsg) { 372 pmsg->phandler->OnMessage(pmsg); 373 } 374 375 void MessageQueue::EnsureActive() { 376 ASSERT(crit_.CurrentThreadIsOwner()); 377 if (!active_) { 378 active_ = true; 379 MessageQueueManager::Instance()->Add(this); 380 } 381 } 382 383 } // namespace talk_base 384