Home | History | Annotate | Download | only in glue
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "jingle/glue/thread_wrapper.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/bind_helpers.h"
      9 #include "base/lazy_instance.h"
     10 #include "base/threading/thread_local.h"
     11 #include "third_party/libjingle/source/talk/base/nullsocketserver.h"
     12 
     13 namespace jingle_glue {
     14 
     15 struct JingleThreadWrapper::PendingSend {
     16   PendingSend(const talk_base::Message& message_value)
     17       : sending_thread(JingleThreadWrapper::current()),
     18         message(message_value),
     19         done_event(true, false) {
     20     DCHECK(sending_thread);
     21   }
     22 
     23   JingleThreadWrapper* sending_thread;
     24   talk_base::Message message;
     25   base::WaitableEvent done_event;
     26 };
     27 
     28 base::LazyInstance<base::ThreadLocalPointer<JingleThreadWrapper> >
     29     g_jingle_thread_wrapper = LAZY_INSTANCE_INITIALIZER;
     30 
     31 // static
     32 void JingleThreadWrapper::EnsureForCurrentMessageLoop() {
     33   if (JingleThreadWrapper::current() == NULL) {
     34     base::MessageLoop* message_loop = base::MessageLoop::current();
     35     g_jingle_thread_wrapper.Get()
     36         .Set(new JingleThreadWrapper(message_loop->message_loop_proxy()));
     37     message_loop->AddDestructionObserver(current());
     38   }
     39 
     40   DCHECK_EQ(talk_base::Thread::Current(), current());
     41 }
     42 
     43 // static
     44 JingleThreadWrapper* JingleThreadWrapper::current() {
     45   return g_jingle_thread_wrapper.Get().Get();
     46 }
     47 
     48 JingleThreadWrapper::JingleThreadWrapper(
     49     scoped_refptr<base::SingleThreadTaskRunner> task_runner)
     50     : talk_base::Thread(new talk_base::NullSocketServer()),
     51       task_runner_(task_runner),
     52       send_allowed_(false),
     53       last_task_id_(0),
     54       pending_send_event_(true, false),
     55       weak_ptr_factory_(this) {
     56   DCHECK(task_runner->BelongsToCurrentThread());
     57   DCHECK(!talk_base::Thread::Current());
     58   weak_ptr_ = weak_ptr_factory_.GetWeakPtr();
     59   talk_base::MessageQueueManager::Add(this);
     60   WrapCurrent();
     61 }
     62 
     63 JingleThreadWrapper::~JingleThreadWrapper() {
     64   Clear(NULL, talk_base::MQID_ANY, NULL);
     65 }
     66 
     67 void JingleThreadWrapper::WillDestroyCurrentMessageLoop() {
     68   DCHECK_EQ(talk_base::Thread::Current(), current());
     69   UnwrapCurrent();
     70   g_jingle_thread_wrapper.Get().Set(NULL);
     71   talk_base::ThreadManager::Instance()->SetCurrentThread(NULL);
     72   talk_base::MessageQueueManager::Remove(this);
     73   talk_base::SocketServer* ss = socketserver();
     74   delete this;
     75   delete ss;
     76 }
     77 
     78 void JingleThreadWrapper::Post(
     79     talk_base::MessageHandler* handler, uint32 message_id,
     80     talk_base::MessageData* data, bool time_sensitive) {
     81   PostTaskInternal(0, handler, message_id, data);
     82 }
     83 
     84 void JingleThreadWrapper::PostDelayed(
     85     int delay_ms, talk_base::MessageHandler* handler,
     86     uint32 message_id, talk_base::MessageData* data) {
     87   PostTaskInternal(delay_ms, handler, message_id, data);
     88 }
     89 
     90 void JingleThreadWrapper::Clear(talk_base::MessageHandler* handler, uint32 id,
     91                                 talk_base::MessageList* removed) {
     92   base::AutoLock auto_lock(lock_);
     93 
     94   for (MessagesQueue::iterator it = messages_.begin();
     95        it != messages_.end();) {
     96     MessagesQueue::iterator next = it;
     97     ++next;
     98 
     99     if (it->second.Match(handler, id)) {
    100       if (removed) {
    101         removed->push_back(it->second);
    102       } else {
    103         delete it->second.pdata;
    104       }
    105       messages_.erase(it);
    106     }
    107 
    108     it = next;
    109   }
    110 
    111   for (std::list<PendingSend*>::iterator it = pending_send_messages_.begin();
    112        it != pending_send_messages_.end();) {
    113     std::list<PendingSend*>::iterator next = it;
    114     ++next;
    115 
    116     if ((*it)->message.Match(handler, id)) {
    117       if (removed) {
    118         removed ->push_back((*it)->message);
    119       } else {
    120         delete (*it)->message.pdata;
    121       }
    122       (*it)->done_event.Signal();
    123       pending_send_messages_.erase(it);
    124     }
    125 
    126     it = next;
    127   }
    128 }
    129 
    130 void JingleThreadWrapper::Send(talk_base::MessageHandler *handler, uint32 id,
    131                                talk_base::MessageData *data) {
    132   if (fStop_)
    133     return;
    134 
    135   JingleThreadWrapper* current_thread = JingleThreadWrapper::current();
    136   DCHECK(current_thread != NULL) << "Send() can be called only from a "
    137       "thread that has JingleThreadWrapper.";
    138 
    139   talk_base::Message message;
    140   message.phandler = handler;
    141   message.message_id = id;
    142   message.pdata = data;
    143 
    144   if (current_thread == this) {
    145     handler->OnMessage(&message);
    146     return;
    147   }
    148 
    149   // Send message from a thread different than |this|.
    150 
    151   // Allow inter-thread send only from threads that have
    152   // |send_allowed_| flag set.
    153   DCHECK(current_thread->send_allowed_) << "Send()'ing synchronous "
    154       "messages is not allowed from the current thread.";
    155 
    156   PendingSend pending_send(message);
    157   {
    158     base::AutoLock auto_lock(lock_);
    159     pending_send_messages_.push_back(&pending_send);
    160   }
    161 
    162   // Need to signal |pending_send_event_| here in case the thread is
    163   // sending message to another thread.
    164   pending_send_event_.Signal();
    165   task_runner_->PostTask(FROM_HERE,
    166                          base::Bind(&JingleThreadWrapper::ProcessPendingSends,
    167                                     weak_ptr_));
    168 
    169 
    170   while (!pending_send.done_event.IsSignaled()) {
    171     base::WaitableEvent* events[] = {&pending_send.done_event,
    172                                      &current_thread->pending_send_event_};
    173     size_t event = base::WaitableEvent::WaitMany(events, arraysize(events));
    174     DCHECK(event == 0 || event == 1);
    175 
    176     if (event == 1)
    177       current_thread->ProcessPendingSends();
    178   }
    179 }
    180 
    181 void JingleThreadWrapper::ProcessPendingSends() {
    182   while (true) {
    183     PendingSend* pending_send = NULL;
    184     {
    185       base::AutoLock auto_lock(lock_);
    186       if (!pending_send_messages_.empty()) {
    187         pending_send = pending_send_messages_.front();
    188         pending_send_messages_.pop_front();
    189       } else {
    190         // Reset the event while |lock_| is still locked.
    191         pending_send_event_.Reset();
    192         break;
    193       }
    194     }
    195     if (pending_send) {
    196       pending_send->message.phandler->OnMessage(&pending_send->message);
    197       pending_send->done_event.Signal();
    198     }
    199   }
    200 }
    201 
    202 void JingleThreadWrapper::PostTaskInternal(
    203     int delay_ms, talk_base::MessageHandler* handler,
    204     uint32 message_id, talk_base::MessageData* data) {
    205   int task_id;
    206   talk_base::Message message;
    207   message.phandler = handler;
    208   message.message_id = message_id;
    209   message.pdata = data;
    210   {
    211     base::AutoLock auto_lock(lock_);
    212     task_id = ++last_task_id_;
    213     messages_.insert(std::pair<int, talk_base::Message>(task_id, message));
    214   }
    215 
    216   if (delay_ms <= 0) {
    217     task_runner_->PostTask(FROM_HERE,
    218                            base::Bind(&JingleThreadWrapper::RunTask,
    219                                       weak_ptr_, task_id));
    220   } else {
    221     task_runner_->PostDelayedTask(FROM_HERE,
    222                                   base::Bind(&JingleThreadWrapper::RunTask,
    223                                              weak_ptr_, task_id),
    224                                   base::TimeDelta::FromMilliseconds(delay_ms));
    225   }
    226 }
    227 
    228 void JingleThreadWrapper::RunTask(int task_id) {
    229   bool have_message = false;
    230   talk_base::Message message;
    231   {
    232     base::AutoLock auto_lock(lock_);
    233     MessagesQueue::iterator it = messages_.find(task_id);
    234     if (it != messages_.end()) {
    235       have_message = true;
    236       message = it->second;
    237       messages_.erase(it);
    238     }
    239   }
    240 
    241   if (have_message) {
    242     if (message.message_id == talk_base::MQID_DISPOSE) {
    243       DCHECK(message.phandler == NULL);
    244       delete message.pdata;
    245     } else {
    246       message.phandler->OnMessage(&message);
    247     }
    248   }
    249 }
    250 
    251 // All methods below are marked as not reached. See comments in the
    252 // header for more details.
    253 void JingleThreadWrapper::Quit() {
    254   NOTREACHED();
    255 }
    256 
    257 bool JingleThreadWrapper::IsQuitting() {
    258   NOTREACHED();
    259   return false;
    260 }
    261 
    262 void JingleThreadWrapper::Restart() {
    263   NOTREACHED();
    264 }
    265 
    266 bool JingleThreadWrapper::Get(talk_base::Message*, int, bool) {
    267   NOTREACHED();
    268   return false;
    269 }
    270 
    271 bool JingleThreadWrapper::Peek(talk_base::Message*, int) {
    272   NOTREACHED();
    273   return false;
    274 }
    275 
    276 void JingleThreadWrapper::PostAt(uint32, talk_base::MessageHandler*,
    277                                  uint32, talk_base::MessageData*) {
    278   NOTREACHED();
    279 }
    280 
    281 void JingleThreadWrapper::Dispatch(talk_base::Message* message) {
    282   NOTREACHED();
    283 }
    284 
    285 void JingleThreadWrapper::ReceiveSends() {
    286   NOTREACHED();
    287 }
    288 
    289 int JingleThreadWrapper::GetDelay() {
    290   NOTREACHED();
    291   return 0;
    292 }
    293 
    294 void JingleThreadWrapper::Stop() {
    295   NOTREACHED();
    296 }
    297 
    298 void JingleThreadWrapper::Run() {
    299   NOTREACHED();
    300 }
    301 
    302 }  // namespace jingle_glue
    303