1 /* 2 * libjingle 3 * Copyright 2004--2005, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #ifndef TALK_BASE_MESSAGEQUEUE_H_ 29 #define TALK_BASE_MESSAGEQUEUE_H_ 30 31 #include <algorithm> 32 #include <cstring> 33 #include <list> 34 #include <queue> 35 #include <vector> 36 37 #include "talk/base/basictypes.h" 38 #include "talk/base/criticalsection.h" 39 #include "talk/base/messagehandler.h" 40 #include "talk/base/scoped_ptr.h" 41 #include "talk/base/sigslot.h" 42 #include "talk/base/socketserver.h" 43 #include "talk/base/time.h" 44 45 namespace talk_base { 46 47 struct Message; 48 class MessageQueue; 49 50 // MessageQueueManager does cleanup of of message queues 51 52 class MessageQueueManager { 53 public: 54 static MessageQueueManager* Instance(); 55 56 void Add(MessageQueue *message_queue); 57 void Remove(MessageQueue *message_queue); 58 void Clear(MessageHandler *handler); 59 60 private: 61 MessageQueueManager(); 62 ~MessageQueueManager(); 63 64 static MessageQueueManager* instance_; 65 // This list contains 'active' MessageQueues. 66 std::vector<MessageQueue *> message_queues_; 67 CriticalSection crit_; 68 }; 69 70 // Derive from this for specialized data 71 // App manages lifetime, except when messages are purged 72 73 class MessageData { 74 public: 75 MessageData() {} 76 virtual ~MessageData() {} 77 }; 78 79 template <class T> 80 class TypedMessageData : public MessageData { 81 public: 82 explicit TypedMessageData(const T& data) : data_(data) { } 83 const T& data() const { return data_; } 84 T& data() { return data_; } 85 private: 86 T data_; 87 }; 88 89 // Like TypedMessageData, but for pointers that require a delete. 90 template <class T> 91 class ScopedMessageData : public MessageData { 92 public: 93 explicit ScopedMessageData(T* data) : data_(data) { } 94 const scoped_ptr<T>& data() const { return data_; } 95 scoped_ptr<T>& data() { return data_; } 96 private: 97 scoped_ptr<T> data_; 98 }; 99 100 template<class T> 101 inline MessageData* WrapMessageData(const T& data) { 102 return new TypedMessageData<T>(data); 103 } 104 105 template<class T> 106 inline const T& UseMessageData(MessageData* data) { 107 return static_cast< TypedMessageData<T>* >(data)->data(); 108 } 109 110 template<class T> 111 class DisposeData : public MessageData { 112 public: 113 explicit DisposeData(T* data) : data_(data) { } 114 virtual ~DisposeData() { delete data_; } 115 private: 116 T* data_; 117 }; 118 119 const uint32 MQID_ANY = static_cast<uint32>(-1); 120 const uint32 MQID_DISPOSE = static_cast<uint32>(-2); 121 122 // No destructor 123 124 struct Message { 125 Message() { 126 memset(this, 0, sizeof(*this)); 127 } 128 inline bool Match(MessageHandler* handler, uint32 id) const { 129 return (handler == NULL || handler == phandler) 130 && (id == MQID_ANY || id == message_id); 131 } 132 MessageHandler *phandler; 133 uint32 message_id; 134 MessageData *pdata; 135 uint32 ts_sensitive; 136 }; 137 138 typedef std::list<Message> MessageList; 139 140 // DelayedMessage goes into a priority queue, sorted by trigger time. Messages 141 // with the same trigger time are processed in num_ (FIFO) order. 142 143 class DelayedMessage { 144 public: 145 DelayedMessage(int delay, uint32 trigger, uint32 num, const Message& msg) 146 : cmsDelay_(delay), msTrigger_(trigger), num_(num), msg_(msg) { } 147 148 bool operator< (const DelayedMessage& dmsg) const { 149 return (dmsg.msTrigger_ < msTrigger_) 150 || ((dmsg.msTrigger_ == msTrigger_) && (dmsg.num_ < num_)); 151 } 152 153 int cmsDelay_; // for debugging 154 uint32 msTrigger_; 155 uint32 num_; 156 Message msg_; 157 }; 158 159 class MessageQueue { 160 public: 161 explicit MessageQueue(SocketServer* ss = NULL); 162 virtual ~MessageQueue(); 163 164 SocketServer* socketserver() { return ss_; } 165 void set_socketserver(SocketServer* ss); 166 167 // Note: The behavior of MessageQueue has changed. When a MQ is stopped, 168 // futher Posts and Sends will fail. However, any pending Sends and *ready* 169 // Posts (as opposed to unexpired delayed Posts) will be delivered before 170 // Get (or Peek) returns false. By guaranteeing delivery of those messages, 171 // we eliminate the race condition when an MessageHandler and MessageQueue 172 // may be destroyed independently of each other. 173 virtual void Quit(); 174 virtual bool IsQuitting(); 175 virtual void Restart(); 176 177 // Get() will process I/O until: 178 // 1) A message is available (returns true) 179 // 2) cmsWait seconds have elapsed (returns false) 180 // 3) Stop() is called (returns false) 181 virtual bool Get(Message *pmsg, int cmsWait = kForever, 182 bool process_io = true); 183 virtual bool Peek(Message *pmsg, int cmsWait = 0); 184 virtual void Post(MessageHandler *phandler, uint32 id = 0, 185 MessageData *pdata = NULL, bool time_sensitive = false); 186 virtual void PostDelayed(int cmsDelay, MessageHandler *phandler, 187 uint32 id = 0, MessageData *pdata = NULL) { 188 return DoDelayPost(cmsDelay, TimeAfter(cmsDelay), phandler, id, pdata); 189 } 190 virtual void PostAt(uint32 tstamp, MessageHandler *phandler, 191 uint32 id = 0, MessageData *pdata = NULL) { 192 return DoDelayPost(TimeUntil(tstamp), tstamp, phandler, id, pdata); 193 } 194 virtual void Clear(MessageHandler *phandler, uint32 id = MQID_ANY, 195 MessageList* removed = NULL); 196 virtual void Dispatch(Message *pmsg); 197 virtual void ReceiveSends(); 198 199 // Amount of time until the next message can be retrieved 200 virtual int GetDelay(); 201 202 bool empty() const { return msgq_.empty() && dmsgq_.empty() && !fPeekKeep_; } 203 size_t size() const { return msgq_.size() + dmsgq_.size() + fPeekKeep_; } 204 205 // Internally posts a message which causes the doomed object to be deleted 206 template<class T> void Dispose(T* doomed) { 207 if (doomed) { 208 Post(NULL, MQID_DISPOSE, new DisposeData<T>(doomed)); 209 } 210 } 211 212 // When this signal is sent out, any references to this queue should 213 // no longer be used. 214 sigslot::signal0<> SignalQueueDestroyed; 215 216 protected: 217 class PriorityQueue : public std::priority_queue<DelayedMessage> { 218 public: 219 container_type& container() { return c; } 220 void reheap() { make_heap(c.begin(), c.end(), comp); } 221 }; 222 223 void EnsureActive(); 224 void DoDelayPost(int cmsDelay, uint32 tstamp, MessageHandler *phandler, 225 uint32 id, MessageData* pdata); 226 227 // The SocketServer is not owned by MessageQueue. 228 SocketServer* ss_; 229 // If a server isn't supplied in the constructor, use this one. 230 scoped_ptr<SocketServer> default_ss_; 231 bool fStop_; 232 bool fPeekKeep_; 233 Message msgPeek_; 234 // A message queue is active if it has ever had a message posted to it. 235 // This also corresponds to being in MessageQueueManager's global list. 236 bool active_; 237 MessageList msgq_; 238 PriorityQueue dmsgq_; 239 uint32 dmsgq_next_num_; 240 CriticalSection crit_; 241 }; 242 243 } // namespace talk_base 244 245 #endif // TALK_BASE_MESSAGEQUEUE_H_ 246