1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #ifndef WEBRTC_BASE_MESSAGEQUEUE_H_ 12 #define WEBRTC_BASE_MESSAGEQUEUE_H_ 13 14 #include <string.h> 15 16 #include <algorithm> 17 #include <list> 18 #include <queue> 19 #include <vector> 20 21 #include "webrtc/base/basictypes.h" 22 #include "webrtc/base/constructormagic.h" 23 #include "webrtc/base/criticalsection.h" 24 #include "webrtc/base/messagehandler.h" 25 #include "webrtc/base/scoped_ptr.h" 26 #include "webrtc/base/scoped_ref_ptr.h" 27 #include "webrtc/base/sigslot.h" 28 #include "webrtc/base/socketserver.h" 29 #include "webrtc/base/timeutils.h" 30 31 namespace rtc { 32 33 struct Message; 34 class MessageQueue; 35 36 // MessageQueueManager does cleanup of of message queues 37 38 class MessageQueueManager { 39 public: 40 static void Add(MessageQueue *message_queue); 41 static void Remove(MessageQueue *message_queue); 42 static void Clear(MessageHandler *handler); 43 44 // For testing purposes, we expose whether or not the MessageQueueManager 45 // instance has been initialized. It has no other use relative to the rest of 46 // the functions of this class, which auto-initialize the underlying 47 // MessageQueueManager instance when necessary. 48 static bool IsInitialized(); 49 50 private: 51 static MessageQueueManager* Instance(); 52 53 MessageQueueManager(); 54 ~MessageQueueManager(); 55 56 void AddInternal(MessageQueue *message_queue); 57 void RemoveInternal(MessageQueue *message_queue); 58 void ClearInternal(MessageHandler *handler); 59 60 static MessageQueueManager* instance_; 61 // This list contains all live MessageQueues. 62 std::vector<MessageQueue *> message_queues_; 63 CriticalSection crit_; 64 }; 65 66 // Derive from this for specialized data 67 // App manages lifetime, except when messages are purged 68 69 class MessageData { 70 public: 71 MessageData() {} 72 virtual ~MessageData() {} 73 }; 74 75 template <class T> 76 class TypedMessageData : public MessageData { 77 public: 78 explicit TypedMessageData(const T& data) : data_(data) { } 79 const T& data() const { return data_; } 80 T& data() { return data_; } 81 private: 82 T data_; 83 }; 84 85 // Like TypedMessageData, but for pointers that require a delete. 86 template <class T> 87 class ScopedMessageData : public MessageData { 88 public: 89 explicit ScopedMessageData(T* data) : data_(data) { } 90 const scoped_ptr<T>& data() const { return data_; } 91 scoped_ptr<T>& data() { return data_; } 92 private: 93 scoped_ptr<T> data_; 94 }; 95 96 // Like ScopedMessageData, but for reference counted pointers. 97 template <class T> 98 class ScopedRefMessageData : public MessageData { 99 public: 100 explicit ScopedRefMessageData(T* data) : data_(data) { } 101 const scoped_refptr<T>& data() const { return data_; } 102 scoped_refptr<T>& data() { return data_; } 103 private: 104 scoped_refptr<T> data_; 105 }; 106 107 template<class T> 108 inline MessageData* WrapMessageData(const T& data) { 109 return new TypedMessageData<T>(data); 110 } 111 112 template<class T> 113 inline const T& UseMessageData(MessageData* data) { 114 return static_cast< TypedMessageData<T>* >(data)->data(); 115 } 116 117 template<class T> 118 class DisposeData : public MessageData { 119 public: 120 explicit DisposeData(T* data) : data_(data) { } 121 virtual ~DisposeData() { delete data_; } 122 private: 123 T* data_; 124 }; 125 126 const uint32 MQID_ANY = static_cast<uint32>(-1); 127 const uint32 MQID_DISPOSE = static_cast<uint32>(-2); 128 129 // No destructor 130 131 struct Message { 132 Message() { 133 memset(this, 0, sizeof(*this)); 134 } 135 inline bool Match(MessageHandler* handler, uint32 id) const { 136 return (handler == NULL || handler == phandler) 137 && (id == MQID_ANY || id == message_id); 138 } 139 MessageHandler *phandler; 140 uint32 message_id; 141 MessageData *pdata; 142 uint32 ts_sensitive; 143 }; 144 145 typedef std::list<Message> MessageList; 146 147 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages 148 // with the same trigger time are processed in num_ (FIFO) order. 149 150 class DelayedMessage { 151 public: 152 DelayedMessage(int delay, uint32 trigger, uint32 num, const Message& msg) 153 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) { } 154 155 bool operator< (const DelayedMessage& dmsg) const { 156 return (dmsg.msTrigger_ < msTrigger_) 157 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); 158 } 159 160 int cmsDelay_; // for debugging 161 uint32 msTrigger_; 162 uint32 num_; 163 Message msg_; 164 }; 165 166 class MessageQueue { 167 public: 168 explicit MessageQueue(SocketServer* ss = NULL); 169 virtual ~MessageQueue(); 170 171 SocketServer* socketserver() { return ss_; } 172 void set_socketserver(SocketServer* ss); 173 174 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, 175 // futher Posts and Sends will fail. However, any pending Sends and *ready* 176 // Posts (as opposed to unexpired delayed Posts) will be delivered before 177 // Get (or Peek) returns false. By guaranteeing delivery of those messages, 178 // we eliminate the race condition when an MessageHandler and MessageQueue 179 // may be destroyed independently of each other. 180 virtual void Quit(); 181 virtual bool IsQuitting(); 182 virtual void Restart(); 183 184 // Get() will process I/O until: 185 // 1) A message is available (returns true) 186 // 2) cmsWait seconds have elapsed (returns false) 187 // 3) Stop() is called (returns false) 188 virtual bool Get(Message *pmsg, int cmsWait = kForever, 189 bool process_io = true); 190 virtual bool Peek(Message *pmsg, int cmsWait = 0); 191 virtual void Post(MessageHandler *phandler, uint32 id = 0, 192 MessageData *pdata = NULL, bool time_sensitive = false); 193 virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, 194 uint32 id = 0, MessageData *pdata = NULL) { 195 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 196 } 197 virtual void PostAt(uint32 tstamp, MessageHandler *phandler, 198 uint32 id = 0, MessageData *pdata = NULL) { 199 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 200 } 201 virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, 202 MessageList* removed = NULL); 203 virtual void Dispatch(Message *pmsg); 204 virtual void ReceiveSends(); 205 206 // Amount of time until the next message can be retrieved 207 virtual int GetDelay(); 208 209 bool empty() const { return size() == 0u; } 210 size_t size() const { 211 CritScope cs(&crit_); // msgq_.size() is not thread safe. 212 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); 213 } 214 215 // Internally posts a message which causes the doomed object to be deleted 216 template<class T> void Dispose(T* doomed) { 217 if (doomed) { 218 Post(NULL, MQID_DISPOSE, new DisposeData<T>(doomed)); 219 } 220 } 221 222 // When this signal is sent out, any references to this queue should 223 // no longer be used. 224 sigslot::signal0<> SignalQueueDestroyed; 225 226 protected: 227 class PriorityQueue : public std::priority_queue<DelayedMessage> { 228 public: 229 container_type& container() { return c; } 230 void reheap() { make_heap(c.begin(), c.end(), comp); } 231 }; 232 233 void DoDelayPost(int cmsDelay, uint32 tstamp, MessageHandler *phandler, 234 uint32 id, MessageData* pdata); 235 236 // The SocketServer is not owned by MessageQueue. 237 SocketServer* ss_; 238 // If a server isn't supplied in the constructor, use this one. 239 scoped_ptr<SocketServer> default_ss_; 240 bool fStop_; 241 bool fPeekKeep_; 242 Message msgPeek_; 243 MessageList msgq_; 244 PriorityQueue dmsgq_; 245 uint32 dmsgq_next_num_; 246 mutable CriticalSection crit_; 247 248 private: 249 DISALLOW_COPY_AND_ASSIGN(MessageQueue); 250 }; 251 252 } // namespace rtc 253 254 #endif // WEBRTC_BASE_MESSAGEQUEUE_H_ 255