1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "jingle/glue/thread_wrapper.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/lazy_instance.h" 10 #include "base/threading/thread_local.h" 11 #include "third_party/libjingle/source/talk/base/nullsocketserver.h" 12 13 namespace jingle_glue { 14 15 struct JingleThreadWrapper::PendingSend { 16 PendingSend(const talk_base::Message& message_value) 17 : sending_thread(JingleThreadWrapper::current()), 18 message(message_value), 19 done_event(true, false) { 20 DCHECK(sending_thread); 21 } 22 23 JingleThreadWrapper* sending_thread; 24 talk_base::Message message; 25 base::WaitableEvent done_event; 26 }; 27 28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> > 29 g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER; 30 31 // static 32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() { 33 if (JingleThreadWrapper::current() == NULL) { 34 base::MessageLoop* message_loop = base::MessageLoop::current(); 35 g_jingle_thread_wrapper.Get() 36 .Set(new JingleThreadWrapper(message_loop->message_loop_proxy())); 37 message_loop->AddDestructionObserver(current()); 38 } 39 40 DCHECK_EQ(talk_base::Thread::Current(), current()); 41 } 42 43 // static 44 JingleThreadWrapper* JingleThreadWrapper::current() { 45 return g_jingle_thread_wrapper.Get().Get(); 46 } 47 48 JingleThreadWrapper::JingleThreadWrapper( 49 scoped_refptr<base::SingleThreadTaskRunner> task_runner) 50 : talk_base::Thread(new talk_base::NullSocketServer()), 51 task_runner_(task_runner), 52 send_allowed_(false), 53 last_task_id_(0), 54 pending_send_event_(true, false), 55 weak_ptr_factory_(this) { 56 DCHECK(task_runner->BelongsToCurrentThread()); 57 DCHECK(!talk_base::Thread::Current()); 58 weak_ptr_ = weak_ptr_factory_.GetWeakPtr(); 59 talk_base::MessageQueueManager::Add(this); 60 WrapCurrent(); 61 } 62 63 JingleThreadWrapper::~JingleThreadWrapper() { 64 Clear(NULL, talk_base::MQID_ANY, NULL); 65 } 66 67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() { 68 DCHECK_EQ(talk_base::Thread::Current(), current()); 69 UnwrapCurrent(); 70 g_jingle_thread_wrapper.Get().Set(NULL); 71 talk_base::ThreadManager::Instance()->SetCurrentThread(NULL); 72 talk_base::MessageQueueManager::Remove(this); 73 talk_base::SocketServer* ss = socketserver(); 74 delete this; 75 delete ss; 76 } 77 78 void JingleThreadWrapper::Post( 79 talk_base::MessageHandler* handler, uint32 message_id, 80 talk_base::MessageData* data, bool time_sensitive) { 81 PostTaskInternal(0, handler, message_id, data); 82 } 83 84 void JingleThreadWrapper::PostDelayed( 85 int delay_ms, talk_base::MessageHandler* handler, 86 uint32 message_id, talk_base::MessageData* data) { 87 PostTaskInternal(delay_ms, handler, message_id, data); 88 } 89 90 void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id, 91 talk_base::MessageList* removed) { 92 base::AutoLock auto_lock(lock_); 93 94 for (MessagesQueue::iterator it = messages_.begin(); 95 it != messages_.end();) { 96 MessagesQueue::iterator next = it; 97 ++next; 98 99 if (it->second.Match(handler, id)) { 100 if (removed) { 101 removed->push_back(it->second); 102 } else { 103 delete it->second.pdata; 104 } 105 messages_.erase(it); 106 } 107 108 it = next; 109 } 110 111 for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin(); 112 it != pending_send_messages_.end();) { 113 std::list<PendingSend*>::iterator next = it; 114 ++next; 115 116 if ((*it)->message.Match(handler, id)) { 117 if (removed) { 118 removed ->push_back((*it)->message); 119 } else { 120 delete (*it)->message.pdata; 121 } 122 (*it)->done_event.Signal(); 123 pending_send_messages_.erase(it); 124 } 125 126 it = next; 127 } 128 } 129 130 void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id, 131 talk_base::MessageData *data) { 132 if (fStop_) 133 return; 134 135 JingleThreadWrapper* current_thread = JingleThreadWrapper::current(); 136 DCHECK(current_thread != NULL) << "Send() can be called only from a " 137 "thread that has JingleThreadWrapper."; 138 139 talk_base::Message message; 140 message.phandler = handler; 141 message.message_id = id; 142 message.pdata = data; 143 144 if (current_thread == this) { 145 handler->OnMessage(&message); 146 return; 147 } 148 149 // Send message from a thread different than |this|. 150 151 // Allow inter-thread send only from threads that have 152 // |send_allowed_| flag set. 153 DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous " 154 "messages is not allowed from the current thread."; 155 156 PendingSend pending_send(message); 157 { 158 base::AutoLock auto_lock(lock_); 159 pending_send_messages_.push_back(&pending_send); 160 } 161 162 // Need to signal |pending_send_event_| here in case the thread is 163 // sending message to another thread. 164 pending_send_event_.Signal(); 165 task_runner_->PostTask(FROM_HERE, 166 base::Bind(&JingleThreadWrapper::ProcessPendingSends, 167 weak_ptr_)); 168 169 170 while (!pending_send.done_event.IsSignaled()) { 171 base::WaitableEvent* events[] = {&pending_send.done_event, 172 ¤t_thread->pending_send_event_}; 173 size_t event = base::WaitableEvent::WaitMany(events, arraysize(events)); 174 DCHECK(event == 0 || event == 1); 175 176 if (event == 1) 177 current_thread->ProcessPendingSends(); 178 } 179 } 180 181 void JingleThreadWrapper::ProcessPendingSends() { 182 while (true) { 183 PendingSend* pending_send = NULL; 184 { 185 base::AutoLock auto_lock(lock_); 186 if (!pending_send_messages_.empty()) { 187 pending_send = pending_send_messages_.front(); 188 pending_send_messages_.pop_front(); 189 } else { 190 // Reset the event while |lock_| is still locked. 191 pending_send_event_.Reset(); 192 break; 193 } 194 } 195 if (pending_send) { 196 pending_send->message.phandler->OnMessage(&pending_send->message); 197 pending_send->done_event.Signal(); 198 } 199 } 200 } 201 202 void JingleThreadWrapper::PostTaskInternal( 203 int delay_ms, talk_base::MessageHandler* handler, 204 uint32 message_id, talk_base::MessageData* data) { 205 int task_id; 206 talk_base::Message message; 207 message.phandler = handler; 208 message.message_id = message_id; 209 message.pdata = data; 210 { 211 base::AutoLock auto_lock(lock_); 212 task_id = ++last_task_id_; 213 messages_.insert(std::pair<int, talk_base::Message>(task_id, message)); 214 } 215 216 if (delay_ms <= 0) { 217 task_runner_->PostTask(FROM_HERE, 218 base::Bind(&JingleThreadWrapper::RunTask, 219 weak_ptr_, task_id)); 220 } else { 221 task_runner_->PostDelayedTask(FROM_HERE, 222 base::Bind(&JingleThreadWrapper::RunTask, 223 weak_ptr_, task_id), 224 base::TimeDelta::FromMilliseconds(delay_ms)); 225 } 226 } 227 228 void JingleThreadWrapper::RunTask(int task_id) { 229 bool have_message = false; 230 talk_base::Message message; 231 { 232 base::AutoLock auto_lock(lock_); 233 MessagesQueue::iterator it = messages_.find(task_id); 234 if (it != messages_.end()) { 235 have_message = true; 236 message = it->second; 237 messages_.erase(it); 238 } 239 } 240 241 if (have_message) { 242 if (message.message_id == talk_base::MQID_DISPOSE) { 243 DCHECK(message.phandler == NULL); 244 delete message.pdata; 245 } else { 246 message.phandler->OnMessage(&message); 247 } 248 } 249 } 250 251 // All methods below are marked as not reached. See comments in the 252 // header for more details. 253 void JingleThreadWrapper::Quit() { 254 NOTREACHED(); 255 } 256 257 bool JingleThreadWrapper::IsQuitting() { 258 NOTREACHED(); 259 return false; 260 } 261 262 void JingleThreadWrapper::Restart() { 263 NOTREACHED(); 264 } 265 266 bool JingleThreadWrapper::Get(talk_base::Message*, int, bool) { 267 NOTREACHED(); 268 return false; 269 } 270 271 bool JingleThreadWrapper::Peek(talk_base::Message*, int) { 272 NOTREACHED(); 273 return false; 274 } 275 276 void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*, 277 uint32, talk_base::MessageData*) { 278 NOTREACHED(); 279 } 280 281 void JingleThreadWrapper::Dispatch(talk_base::Message* message) { 282 NOTREACHED(); 283 } 284 285 void JingleThreadWrapper::ReceiveSends() { 286 NOTREACHED(); 287 } 288 289 int JingleThreadWrapper::GetDelay() { 290 NOTREACHED(); 291 return 0; 292 } 293 294 void JingleThreadWrapper::Stop() { 295 NOTREACHED(); 296 } 297 298 void JingleThreadWrapper::Run() { 299 NOTREACHED(); 300 } 301 302 } // namespace jingle_glue 303