1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #ifndef TALK_BASE_MESSAGEQUEUE_H_ 29 #define TALK_BASE_MESSAGEQUEUE_H_ 30 31 #include <algorithm> 32 #include <cstring> 33 #include <list> 34 #include <queue> 35 #include <vector> 36 37 #include "talk/base/basictypes.h" 38 #include "talk/base/constructormagic.h" 39 #include "talk/base/criticalsection.h" 40 #include "talk/base/messagehandler.h" 41 #include "talk/base/scoped_ptr.h" 42 #include "talk/base/scoped_ref_ptr.h" 43 #include "talk/base/sigslot.h" 44 #include "talk/base/socketserver.h" 45 #include "talk/base/timeutils.h" 46 47 namespace talk_base { 48 49 struct Message; 50 class MessageQueue; 51 52 // MessageQueueManager does cleanup of of message queues 53 54 class MessageQueueManager { 55 public: 56 static MessageQueueManager* Instance(); 57 58 void Add(MessageQueue *message_queue); 59 void Remove(MessageQueue *message_queue); 60 void Clear(MessageHandler *handler); 61 62 private: 63 MessageQueueManager(); 64 ~MessageQueueManager(); 65 66 static MessageQueueManager* instance_; 67 // This list contains 'active' MessageQueues. 68 std::vector<MessageQueue *> message_queues_; 69 CriticalSection crit_; 70 }; 71 72 // Derive from this for specialized data 73 // App manages lifetime, except when messages are purged 74 75 class MessageData { 76 public: 77 MessageData() {} 78 virtual ~MessageData() {} 79 }; 80 81 template <class T> 82 class TypedMessageData : public MessageData { 83 public: 84 explicit TypedMessageData(const T& data) : data_(data) { } 85 const T& data() const { return data_; } 86 T& data() { return data_; } 87 private: 88 T data_; 89 }; 90 91 // Like TypedMessageData, but for pointers that require a delete. 92 template <class T> 93 class ScopedMessageData : public MessageData { 94 public: 95 explicit ScopedMessageData(T* data) : data_(data) { } 96 const scoped_ptr<T>& data() const { return data_; } 97 scoped_ptr<T>& data() { return data_; } 98 private: 99 scoped_ptr<T> data_; 100 }; 101 102 // Like ScopedMessageData, but for reference counted pointers. 103 template <class T> 104 class ScopedRefMessageData : public MessageData { 105 public: 106 explicit ScopedRefMessageData(T* data) : data_(data) { } 107 const scoped_refptr<T>& data() const { return data_; } 108 scoped_refptr<T>& data() { return data_; } 109 private: 110 scoped_refptr<T> data_; 111 }; 112 113 template<class T> 114 inline MessageData* WrapMessageData(const T& data) { 115 return new TypedMessageData<T>(data); 116 } 117 118 template<class T> 119 inline const T& UseMessageData(MessageData* data) { 120 return static_cast< TypedMessageData<T>* >(data)->data(); 121 } 122 123 template<class T> 124 class DisposeData : public MessageData { 125 public: 126 explicit DisposeData(T* data) : data_(data) { } 127 virtual ~DisposeData() { delete data_; } 128 private: 129 T* data_; 130 }; 131 132 const uint32 MQID_ANY = static_cast<uint32>(-1); 133 const uint32 MQID_DISPOSE = static_cast<uint32>(-2); 134 135 // No destructor 136 137 struct Message { 138 Message() { 139 memset(this, 0, sizeof(*this)); 140 } 141 inline bool Match(MessageHandler* handler, uint32 id) const { 142 return (handler == NULL || handler == phandler) 143 && (id == MQID_ANY || id == message_id); 144 } 145 MessageHandler *phandler; 146 uint32 message_id; 147 MessageData *pdata; 148 uint32 ts_sensitive; 149 }; 150 151 typedef std::list<Message> MessageList; 152 153 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages 154 // with the same trigger time are processed in num_ (FIFO) order. 155 156 class DelayedMessage { 157 public: 158 DelayedMessage(int delay, uint32 trigger, uint32 num, const Message& msg) 159 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) { } 160 161 bool operator< (const DelayedMessage& dmsg) const { 162 return (dmsg.msTrigger_ < msTrigger_) 163 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); 164 } 165 166 int cmsDelay_; // for debugging 167 uint32 msTrigger_; 168 uint32 num_; 169 Message msg_; 170 }; 171 172 class MessageQueue { 173 public: 174 explicit MessageQueue(SocketServer* ss = NULL); 175 virtual ~MessageQueue(); 176 177 SocketServer* socketserver() { return ss_; } 178 void set_socketserver(SocketServer* ss); 179 180 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, 181 // futher Posts and Sends will fail. However, any pending Sends and *ready* 182 // Posts (as opposed to unexpired delayed Posts) will be delivered before 183 // Get (or Peek) returns false. By guaranteeing delivery of those messages, 184 // we eliminate the race condition when an MessageHandler and MessageQueue 185 // may be destroyed independently of each other. 186 virtual void Quit(); 187 virtual bool IsQuitting(); 188 virtual void Restart(); 189 190 // Get() will process I/O until: 191 // 1) A message is available (returns true) 192 // 2) cmsWait seconds have elapsed (returns false) 193 // 3) Stop() is called (returns false) 194 virtual bool Get(Message *pmsg, int cmsWait = kForever, 195 bool process_io = true); 196 virtual bool Peek(Message *pmsg, int cmsWait = 0); 197 virtual void Post(MessageHandler *phandler, uint32 id = 0, 198 MessageData *pdata = NULL, bool time_sensitive = false); 199 virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, 200 uint32 id = 0, MessageData *pdata = NULL) { 201 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 202 } 203 virtual void PostAt(uint32 tstamp, MessageHandler *phandler, 204 uint32 id = 0, MessageData *pdata = NULL) { 205 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 206 } 207 virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, 208 MessageList* removed = NULL); 209 virtual void Dispatch(Message *pmsg); 210 virtual void ReceiveSends(); 211 212 // Amount of time until the next message can be retrieved 213 virtual int GetDelay(); 214 215 bool empty() const { return size() == 0u; } 216 size_t size() const { 217 CritScope cs(&crit_); // msgq_.size() is not thread safe. 218 return msgq_.size() + dmsgq_.size() + (fPeekKeep_ ? 1u : 0u); 219 } 220 221 // Internally posts a message which causes the doomed object to be deleted 222 template<class T> void Dispose(T* doomed) { 223 if (doomed) { 224 Post(NULL, MQID_DISPOSE, new DisposeData<T>(doomed)); 225 } 226 } 227 228 // When this signal is sent out, any references to this queue should 229 // no longer be used. 230 sigslot::signal0<> SignalQueueDestroyed; 231 232 protected: 233 class PriorityQueue : public std::priority_queue<DelayedMessage> { 234 public: 235 container_type& container() { return c; } 236 void reheap() { make_heap(c.begin(), c.end(), comp); } 237 }; 238 239 void EnsureActive(); 240 void DoDelayPost(int cmsDelay, uint32 tstamp, MessageHandler *phandler, 241 uint32 id, MessageData* pdata); 242 243 // The SocketServer is not owned by MessageQueue. 244 SocketServer* ss_; 245 // If a server isn't supplied in the constructor, use this one. 246 scoped_ptr<SocketServer> default_ss_; 247 bool fStop_; 248 bool fPeekKeep_; 249 Message msgPeek_; 250 // A message queue is active if it has ever had a message posted to it. 251 // This also corresponds to being in MessageQueueManager's global list. 252 bool active_; 253 MessageList msgq_; 254 PriorityQueue dmsgq_; 255 uint32 dmsgq_next_num_; 256 mutable CriticalSection crit_; 257 258 private: 259 DISALLOW_COPY_AND_ASSIGN(MessageQueue); 260 }; 261 262 } // namespace talk_base 263 264 #endif // TALK_BASE_MESSAGEQUEUE_H_ 265