Home | History | Annotate | Download | only in source
      1 /*
      2  *  Copyright (c) 2012 The WebRTC project authors. All Rights Reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #include "webrtc/modules/utility/source/process_thread_impl.h"
     12 
     13 #include "webrtc/base/checks.h"
     14 #include "webrtc/modules/include/module.h"
     15 #include "webrtc/system_wrappers/include/logging.h"
     16 #include "webrtc/system_wrappers/include/tick_util.h"
     17 
     18 namespace webrtc {
     19 namespace {
     20 
     21 // We use this constant internally to signal that a module has requested
     22 // a callback right away.  When this is set, no call to TimeUntilNextProcess
     23 // should be made, but Process() should be called directly.
     24 const int64_t kCallProcessImmediately = -1;
     25 
     26 int64_t GetNextCallbackTime(Module* module, int64_t time_now) {
     27   int64_t interval = module->TimeUntilNextProcess();
     28   if (interval < 0) {
     29     // Falling behind, we should call the callback now.
     30     return time_now;
     31   }
     32   return time_now + interval;
     33 }
     34 }
     35 
     36 ProcessThread::~ProcessThread() {}
     37 
     38 // static
     39 rtc::scoped_ptr<ProcessThread> ProcessThread::Create(
     40     const char* thread_name) {
     41   return rtc::scoped_ptr<ProcessThread>(new ProcessThreadImpl(thread_name));
     42 }
     43 
     44 ProcessThreadImpl::ProcessThreadImpl(const char* thread_name)
     45     : wake_up_(EventWrapper::Create()),
     46       stop_(false),
     47       thread_name_(thread_name) {}
     48 
     49 ProcessThreadImpl::~ProcessThreadImpl() {
     50   RTC_DCHECK(thread_checker_.CalledOnValidThread());
     51   RTC_DCHECK(!thread_.get());
     52   RTC_DCHECK(!stop_);
     53 
     54   while (!queue_.empty()) {
     55     delete queue_.front();
     56     queue_.pop();
     57   }
     58 }
     59 
     60 void ProcessThreadImpl::Start() {
     61   RTC_DCHECK(thread_checker_.CalledOnValidThread());
     62   RTC_DCHECK(!thread_.get());
     63   if (thread_.get())
     64     return;
     65 
     66   RTC_DCHECK(!stop_);
     67 
     68   {
     69     // TODO(tommi): Since DeRegisterModule is currently being called from
     70     // different threads in some cases (ChannelOwner), we need to lock access to
     71     // the modules_ collection even on the controller thread.
     72     // Once we've cleaned up those places, we can remove this lock.
     73     rtc::CritScope lock(&lock_);
     74     for (ModuleCallback& m : modules_)
     75       m.module->ProcessThreadAttached(this);
     76   }
     77 
     78   thread_.reset(
     79       new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_));
     80   thread_->Start();
     81 }
     82 
     83 void ProcessThreadImpl::Stop() {
     84   RTC_DCHECK(thread_checker_.CalledOnValidThread());
     85   if(!thread_.get())
     86     return;
     87 
     88   {
     89     rtc::CritScope lock(&lock_);
     90     stop_ = true;
     91   }
     92 
     93   wake_up_->Set();
     94 
     95   thread_->Stop();
     96   stop_ = false;
     97 
     98   // TODO(tommi): Since DeRegisterModule is currently being called from
     99   // different threads in some cases (ChannelOwner), we need to lock access to
    100   // the modules_ collection even on the controller thread.
    101   // Since DeRegisterModule also checks thread_, we also need to hold the
    102   // lock for the .reset() operation.
    103   // Once we've cleaned up those places, we can remove this lock.
    104   rtc::CritScope lock(&lock_);
    105   thread_.reset();
    106   for (ModuleCallback& m : modules_)
    107     m.module->ProcessThreadAttached(nullptr);
    108 }
    109 
    110 void ProcessThreadImpl::WakeUp(Module* module) {
    111   // Allowed to be called on any thread.
    112   {
    113     rtc::CritScope lock(&lock_);
    114     for (ModuleCallback& m : modules_) {
    115       if (m.module == module)
    116         m.next_callback = kCallProcessImmediately;
    117     }
    118   }
    119   wake_up_->Set();
    120 }
    121 
    122 void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) {
    123   // Allowed to be called on any thread.
    124   {
    125     rtc::CritScope lock(&lock_);
    126     queue_.push(task.release());
    127   }
    128   wake_up_->Set();
    129 }
    130 
    131 void ProcessThreadImpl::RegisterModule(Module* module) {
    132   RTC_DCHECK(thread_checker_.CalledOnValidThread());
    133   RTC_DCHECK(module);
    134 
    135 #if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON))
    136   {
    137     // Catch programmer error.
    138     rtc::CritScope lock(&lock_);
    139     for (const ModuleCallback& mc : modules_)
    140       RTC_DCHECK(mc.module != module);
    141   }
    142 #endif
    143 
    144   // Now that we know the module isn't in the list, we'll call out to notify
    145   // the module that it's attached to the worker thread.  We don't hold
    146   // the lock while we make this call.
    147   if (thread_.get())
    148     module->ProcessThreadAttached(this);
    149 
    150   {
    151     rtc::CritScope lock(&lock_);
    152     modules_.push_back(ModuleCallback(module));
    153   }
    154 
    155   // Wake the thread calling ProcessThreadImpl::Process() to update the
    156   // waiting time. The waiting time for the just registered module may be
    157   // shorter than all other registered modules.
    158   wake_up_->Set();
    159 }
    160 
    161 void ProcessThreadImpl::DeRegisterModule(Module* module) {
    162   // Allowed to be called on any thread.
    163   // TODO(tommi): Disallow this ^^^
    164   RTC_DCHECK(module);
    165 
    166   {
    167     rtc::CritScope lock(&lock_);
    168     modules_.remove_if([&module](const ModuleCallback& m) {
    169         return m.module == module;
    170       });
    171 
    172     // TODO(tommi): we currently need to hold the lock while calling out to
    173     // ProcessThreadAttached.  This is to make sure that the thread hasn't been
    174     // destroyed while we attach the module.  Once we can make sure
    175     // DeRegisterModule isn't being called on arbitrary threads, we can move the
    176     // |if (thread_.get())| check and ProcessThreadAttached() call outside the
    177     // lock scope.
    178 
    179     // Notify the module that it's been detached.
    180     if (thread_.get())
    181       module->ProcessThreadAttached(nullptr);
    182   }
    183 }
    184 
    185 // static
    186 bool ProcessThreadImpl::Run(void* obj) {
    187   return static_cast<ProcessThreadImpl*>(obj)->Process();
    188 }
    189 
    190 bool ProcessThreadImpl::Process() {
    191   int64_t now = TickTime::MillisecondTimestamp();
    192   int64_t next_checkpoint = now + (1000 * 60);
    193 
    194   {
    195     rtc::CritScope lock(&lock_);
    196     if (stop_)
    197       return false;
    198     for (ModuleCallback& m : modules_) {
    199       // TODO(tommi): Would be good to measure the time TimeUntilNextProcess
    200       // takes and dcheck if it takes too long (e.g. >=10ms).  Ideally this
    201       // operation should not require taking a lock, so querying all modules
    202       // should run in a matter of nanoseconds.
    203       if (m.next_callback == 0)
    204         m.next_callback = GetNextCallbackTime(m.module, now);
    205 
    206       if (m.next_callback <= now ||
    207           m.next_callback == kCallProcessImmediately) {
    208         m.module->Process();
    209         // Use a new 'now' reference to calculate when the next callback
    210         // should occur.  We'll continue to use 'now' above for the baseline
    211         // of calculating how long we should wait, to reduce variance.
    212         int64_t new_now = TickTime::MillisecondTimestamp();
    213         m.next_callback = GetNextCallbackTime(m.module, new_now);
    214       }
    215 
    216       if (m.next_callback < next_checkpoint)
    217         next_checkpoint = m.next_callback;
    218     }
    219 
    220     while (!queue_.empty()) {
    221       ProcessTask* task = queue_.front();
    222       queue_.pop();
    223       lock_.Leave();
    224       task->Run();
    225       delete task;
    226       lock_.Enter();
    227     }
    228   }
    229 
    230   int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp();
    231   if (time_to_wait > 0)
    232     wake_up_->Wait(static_cast<unsigned long>(time_to_wait));
    233 
    234   return true;
    235 }
    236 }  // namespace webrtc
    237