Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2014 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/base/asyncinvoker.h"
     12 
     13 #include "webrtc/base/checks.h"
     14 #include "webrtc/base/logging.h"
     15 
     16 namespace rtc {
     17 
     18 AsyncInvoker::AsyncInvoker() : destroying_(false) {}
     19 
     20 AsyncInvoker::~AsyncInvoker() {
     21   destroying_ = true;
     22   SignalInvokerDestroyed();
     23   // Messages for this need to be cleared *before* our destructor is complete.
     24   MessageQueueManager::Clear(this);
     25 }
     26 
     27 void AsyncInvoker::OnMessage(Message* msg) {
     28   // Get the AsyncClosure shared ptr from this message's data.
     29   ScopedRefMessageData<AsyncClosure>* data =
     30       static_cast<ScopedRefMessageData<AsyncClosure>*>(msg->pdata);
     31   scoped_refptr<AsyncClosure> closure = data->data();
     32   delete msg->pdata;
     33   msg->pdata = NULL;
     34 
     35   // Execute the closure and trigger the return message if needed.
     36   closure->Execute();
     37 }
     38 
     39 void AsyncInvoker::Flush(Thread* thread, uint32_t id /*= MQID_ANY*/) {
     40   if (destroying_) return;
     41 
     42   // Run this on |thread| to reduce the number of context switches.
     43   if (Thread::Current() != thread) {
     44     thread->Invoke<void>(Bind(&AsyncInvoker::Flush, this, thread, id));
     45     return;
     46   }
     47 
     48   MessageList removed;
     49   thread->Clear(this, id, &removed);
     50   for (MessageList::iterator it = removed.begin(); it != removed.end(); ++it) {
     51     // This message was pending on this thread, so run it now.
     52     thread->Send(it->phandler,
     53                  it->message_id,
     54                  it->pdata);
     55   }
     56 }
     57 
     58 void AsyncInvoker::DoInvoke(Thread* thread,
     59                             const scoped_refptr<AsyncClosure>& closure,
     60                             uint32_t id) {
     61   if (destroying_) {
     62     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
     63     return;
     64   }
     65   thread->Post(this, id, new ScopedRefMessageData<AsyncClosure>(closure));
     66 }
     67 
     68 void AsyncInvoker::DoInvokeDelayed(Thread* thread,
     69                                    const scoped_refptr<AsyncClosure>& closure,
     70                                    uint32_t delay_ms,
     71                                    uint32_t id) {
     72   if (destroying_) {
     73     LOG(LS_WARNING) << "Tried to invoke while destroying the invoker.";
     74     return;
     75   }
     76   thread->PostDelayed(delay_ms, this, id,
     77                       new ScopedRefMessageData<AsyncClosure>(closure));
     78 }
     79 
     80 GuardedAsyncInvoker::GuardedAsyncInvoker() : thread_(Thread::Current()) {
     81   thread_->SignalQueueDestroyed.connect(this,
     82                                         &GuardedAsyncInvoker::ThreadDestroyed);
     83 }
     84 
     85 GuardedAsyncInvoker::~GuardedAsyncInvoker() {
     86 }
     87 
     88 bool GuardedAsyncInvoker::Flush(uint32_t id) {
     89   rtc::CritScope cs(&crit_);
     90   if (thread_ == nullptr)
     91     return false;
     92   invoker_.Flush(thread_, id);
     93   return true;
     94 }
     95 
     96 void GuardedAsyncInvoker::ThreadDestroyed() {
     97   rtc::CritScope cs(&crit_);
     98   // We should never get more than one notification about the thread dying.
     99   RTC_DCHECK(thread_ != nullptr);
    100   thread_ = nullptr;
    101 }
    102 
    103 NotifyingAsyncClosureBase::NotifyingAsyncClosureBase(AsyncInvoker* invoker,
    104                                                      Thread* calling_thread)
    105     : invoker_(invoker), calling_thread_(calling_thread) {
    106   calling_thread->SignalQueueDestroyed.connect(
    107       this, &NotifyingAsyncClosureBase::CancelCallback);
    108   invoker->SignalInvokerDestroyed.connect(
    109       this, &NotifyingAsyncClosureBase::CancelCallback);
    110 }
    111 
    112 NotifyingAsyncClosureBase::~NotifyingAsyncClosureBase() {
    113   disconnect_all();
    114 }
    115 
    116 void NotifyingAsyncClosureBase::TriggerCallback() {
    117   CritScope cs(&crit_);
    118   if (!CallbackCanceled() && !callback_.empty()) {
    119     invoker_->AsyncInvoke<void>(calling_thread_, callback_);
    120   }
    121 }
    122 
    123 void NotifyingAsyncClosureBase::CancelCallback() {
    124   // If the callback is triggering when this is called, block the
    125   // destructor of the dying object here by waiting until the callback
    126   // is done triggering.
    127   CritScope cs(&crit_);
    128   // calling_thread_ == NULL means do not trigger the callback.
    129   calling_thread_ = NULL;
    130 }
    131 
    132 }  // namespace rtc
    133