1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #ifndef TALK_BASE_MESSAGEQUEUE_H_ 29 #define TALK_BASE_MESSAGEQUEUE_H_ 30 31 #include <algorithm> 32 #include <cstring> 33 #include <list> 34 #include <queue> 35 #include <vector> 36 37 #include "talk/base/basictypes.h" 38 #include "talk/base/constructormagic.h" 39 #include "talk/base/criticalsection.h" 40 #include "talk/base/messagehandler.h" 41 #include "talk/base/scoped_ptr.h" 42 #include "talk/base/scoped_ref_ptr.h" 43 #include "talk/base/sigslot.h" 44 #include "talk/base/socketserver.h" 45 #include "talk/base/timeutils.h" 46 47 namespace talk_base { 48 49 struct Message; 50 class MessageQueue; 51 52 // MessageQueueManager does cleanup of of message queues 53 54 class MessageQueueManager { 55 public: 56 static void Add(MessageQueue *message_queue); 57 static void Remove(MessageQueue *message_queue); 58 static void Clear(MessageHandler *handler); 59 60 // For testing purposes, we expose whether or not the MessageQueueManager 61 // instance has been initialized. It has no other use relative to the rest of 62 // the functions of this class, which auto-initialize the underlying 63 // MessageQueueManager instance when necessary. 64 static bool IsInitialized(); 65 66 private: 67 static MessageQueueManager* Instance(); 68 69 MessageQueueManager(); 70 ~MessageQueueManager(); 71 72 void AddInternal(MessageQueue *message_queue); 73 void RemoveInternal(MessageQueue *message_queue); 74 void ClearInternal(MessageHandler *handler); 75 76 static MessageQueueManager* instance_; 77 // This list contains 'active' MessageQueues. 78 std::vector<MessageQueue *> message_queues_; 79 CriticalSection crit_; 80 }; 81 82 // Derive from this for specialized data 83 // App manages lifetime, except when messages are purged 84 85 class MessageData { 86 public: 87 MessageData() {} 88 virtual ~MessageData() {} 89 }; 90 91 template <class T> 92 class TypedMessageData : public MessageData { 93 public: 94 explicit TypedMessageData(const T& data) : data_(data) { } 95 const T& data() const { return data_; } 96 T& data() { return data_; } 97 private: 98 T data_; 99 }; 100 101 // Like TypedMessageData, but for pointers that require a delete. 102 template <class T> 103 class ScopedMessageData : public MessageData { 104 public: 105 explicit ScopedMessageData(T* data) : data_(data) { } 106 const scoped_ptr<T>& data() const { return data_; } 107 scoped_ptr<T>& data() { return data_; } 108 private: 109 scoped_ptr<T> data_; 110 }; 111 112 // Like ScopedMessageData, but for reference counted pointers. 113 template <class T> 114 class ScopedRefMessageData : public MessageData { 115 public: 116 explicit ScopedRefMessageData(T* data) : data_(data) { } 117 const scoped_refptr<T>& data() const { return data_; } 118 scoped_refptr<T>& data() { return data_; } 119 private: 120 scoped_refptr<T> data_; 121 }; 122 123 template<class T> 124 inline MessageData* WrapMessageData(const T& data) { 125 return new TypedMessageData<T>(data); 126 } 127 128 template<class T> 129 inline const T& UseMessageData(MessageData* data) { 130 return static_cast< TypedMessageData<T>* >(data)->data(); 131 } 132 133 template<class T> 134 class DisposeData : public MessageData { 135 public: 136 explicit DisposeData(T* data) : data_(data) { } 137 virtual ~DisposeData() { delete data_; } 138 private: 139 T* data_; 140 }; 141 142 const uint32 MQID_ANY = static_cast<uint32>(-1); 143 const uint32 MQID_DISPOSE = static_cast<uint32>(-2); 144 145 // No destructor 146 147 struct Message { 148 Message() { 149 memset(this, 0, sizeof(*this)); 150 } 151 inline bool Match(MessageHandler* handler, uint32 id) const { 152 return (handler == NULL || handler == phandler) 153 && (id == MQID_ANY || id == message_id); 154 } 155 MessageHandler *phandler; 156 uint32 message_id; 157 MessageData *pdata; 158 uint32 ts_sensitive; 159 }; 160 161 typedef std::list<Message> MessageList; 162 163 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages 164 // with the same trigger time are processed in num_ (FIFO) order. 165 166 class DelayedMessage { 167 public: 168 DelayedMessage(int delay, uint32 trigger, uint32 num, const Message& msg) 169 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) { } 170 171 bool operator< (const DelayedMessage& dmsg) const { 172 return (dmsg.msTrigger_ < msTrigger_) 173 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); 174 } 175 176 int cmsDelay_; // for debugging 177 uint32 msTrigger_; 178 uint32 num_; 179 Message msg_; 180 }; 181 182 class MessageQueue { 183 public: 184 explicit MessageQueue(SocketServer* ss = NULL); 185 virtual ~MessageQueue(); 186 187 SocketServer* socketserver() { return ss_; } 188 void set_socketserver(SocketServer* ss); 189 190 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, 191 // futher Posts and Sends will fail. However, any pending Sends and *ready* 192 // Posts (as opposed to unexpired delayed Posts) will be delivered before 193 // Get (or Peek) returns false. By guaranteeing delivery of those messages, 194 // we eliminate the race condition when an MessageHandler and MessageQueue 195 // may be destroyed independently of each other. 196 virtual void Quit(); 197 virtual bool IsQuitting(); 198 virtual void Restart(); 199 200 // Get() will process I/O until: 201 // 1) A message is available (returns true) 202 // 2) cmsWait seconds have elapsed (returns false) 203 // 3) Stop() is called (returns false) 204 virtual bool Get(Message *pmsg, int cmsWait = kForever, 205 bool process_io = true); 206 virtual bool Peek(Message *pmsg, int cmsWait = 0); 207 virtual void Post(MessageHandler *phandler, uint32 id = 0, 208 MessageData *pdata = NULL, bool time_sensitive = false); 209 virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, 210 uint32 id = 0, MessageData *pdata = NULL) { 211 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 212 } 213 virtual void PostAt(uint32 tstamp, MessageHandler *phandler, 214 uint32 id = 0, MessageData *pdata = NULL) { 215 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 216 } 217 virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, 218 MessageList* removed = NULL); 219 virtual void Dispatch(Message *pmsg); 220 virtual void ReceiveSends(); 221 222 // Amount of time until the next message can be retrieved 223 virtual int GetDelay(); 224 225 bool empty() const { return size() == 0u; } 226 size_t size() const { 227 CritScope cs(&crit_); // msgq_.size() is not thread safe. 228 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); 229 } 230 231 // Internally posts a message which causes the doomed object to be deleted 232 template<class T> void Dispose(T* doomed) { 233 if (doomed) { 234 Post(NULL, MQID_DISPOSE, new DisposeData<T>(doomed)); 235 } 236 } 237 238 // When this signal is sent out, any references to this queue should 239 // no longer be used. 240 sigslot::signal0<> SignalQueueDestroyed; 241 242 protected: 243 class PriorityQueue : public std::priority_queue<DelayedMessage> { 244 public: 245 container_type& container() { return c; } 246 void reheap() { make_heap(c.begin(), c.end(), comp); } 247 }; 248 249 void EnsureActive(); 250 void DoDelayPost(int cmsDelay, uint32 tstamp, MessageHandler *phandler, 251 uint32 id, MessageData* pdata); 252 253 // The SocketServer is not owned by MessageQueue. 254 SocketServer* ss_; 255 // If a server isn't supplied in the constructor, use this one. 256 scoped_ptr<SocketServer> default_ss_; 257 bool fStop_; 258 bool fPeekKeep_; 259 Message msgPeek_; 260 // A message queue is active if it has ever had a message posted to it. 261 // This also corresponds to being in MessageQueueManager's global list. 262 bool active_; 263 MessageList msgq_; 264 PriorityQueue dmsgq_; 265 uint32 dmsgq_next_num_; 266 mutable CriticalSection crit_; 267 268 private: 269 DISALLOW_COPY_AND_ASSIGN(MessageQueue); 270 }; 271 272 } // namespace talk_base 273 274 #endif // TALK_BASE_MESSAGEQUEUE_H_ 275