1 /* 2 * Copyright (c) 2012 The WebRTC project authors. All Rights Reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #include "webrtc/modules/utility/source/process_thread_impl.h" 12 13 #include "webrtc/base/checks.h" 14 #include "webrtc/modules/include/module.h" 15 #include "webrtc/system_wrappers/include/logging.h" 16 #include "webrtc/system_wrappers/include/tick_util.h" 17 18 namespace webrtc { 19 namespace { 20 21 // We use this constant internally to signal that a module has requested 22 // a callback right away. When this is set, no call to TimeUntilNextProcess 23 // should be made, but Process() should be called directly. 24 const int64_t kCallProcessImmediately = -1; 25 26 int64_t GetNextCallbackTime(Module* module, int64_t time_now) { 27 int64_t interval = module->TimeUntilNextProcess(); 28 if (interval < 0) { 29 // Falling behind, we should call the callback now. 30 return time_now; 31 } 32 return time_now + interval; 33 } 34 } 35 36 ProcessThread::~ProcessThread() {} 37 38 // static 39 rtc::scoped_ptr<ProcessThread> ProcessThread::Create( 40 const char* thread_name) { 41 return rtc::scoped_ptr<ProcessThread>(new ProcessThreadImpl(thread_name)); 42 } 43 44 ProcessThreadImpl::ProcessThreadImpl(const char* thread_name) 45 : wake_up_(EventWrapper::Create()), 46 stop_(false), 47 thread_name_(thread_name) {} 48 49 ProcessThreadImpl::~ProcessThreadImpl() { 50 RTC_DCHECK(thread_checker_.CalledOnValidThread()); 51 RTC_DCHECK(!thread_.get()); 52 RTC_DCHECK(!stop_); 53 54 while (!queue_.empty()) { 55 delete queue_.front(); 56 queue_.pop(); 57 } 58 } 59 60 void ProcessThreadImpl::Start() { 61 RTC_DCHECK(thread_checker_.CalledOnValidThread()); 62 RTC_DCHECK(!thread_.get()); 63 if (thread_.get()) 64 return; 65 66 RTC_DCHECK(!stop_); 67 68 { 69 // TODO(tommi): Since DeRegisterModule is currently being called from 70 // different threads in some cases (ChannelOwner), we need to lock access to 71 // the modules_ collection even on the controller thread. 72 // Once we've cleaned up those places, we can remove this lock. 73 rtc::CritScope lock(&lock_); 74 for (ModuleCallback& m : modules_) 75 m.module->ProcessThreadAttached(this); 76 } 77 78 thread_.reset( 79 new rtc::PlatformThread(&ProcessThreadImpl::Run, this, thread_name_)); 80 thread_->Start(); 81 } 82 83 void ProcessThreadImpl::Stop() { 84 RTC_DCHECK(thread_checker_.CalledOnValidThread()); 85 if(!thread_.get()) 86 return; 87 88 { 89 rtc::CritScope lock(&lock_); 90 stop_ = true; 91 } 92 93 wake_up_->Set(); 94 95 thread_->Stop(); 96 stop_ = false; 97 98 // TODO(tommi): Since DeRegisterModule is currently being called from 99 // different threads in some cases (ChannelOwner), we need to lock access to 100 // the modules_ collection even on the controller thread. 101 // Since DeRegisterModule also checks thread_, we also need to hold the 102 // lock for the .reset() operation. 103 // Once we've cleaned up those places, we can remove this lock. 104 rtc::CritScope lock(&lock_); 105 thread_.reset(); 106 for (ModuleCallback& m : modules_) 107 m.module->ProcessThreadAttached(nullptr); 108 } 109 110 void ProcessThreadImpl::WakeUp(Module* module) { 111 // Allowed to be called on any thread. 112 { 113 rtc::CritScope lock(&lock_); 114 for (ModuleCallback& m : modules_) { 115 if (m.module == module) 116 m.next_callback = kCallProcessImmediately; 117 } 118 } 119 wake_up_->Set(); 120 } 121 122 void ProcessThreadImpl::PostTask(rtc::scoped_ptr<ProcessTask> task) { 123 // Allowed to be called on any thread. 124 { 125 rtc::CritScope lock(&lock_); 126 queue_.push(task.release()); 127 } 128 wake_up_->Set(); 129 } 130 131 void ProcessThreadImpl::RegisterModule(Module* module) { 132 RTC_DCHECK(thread_checker_.CalledOnValidThread()); 133 RTC_DCHECK(module); 134 135 #if (!defined(NDEBUG) || defined(DCHECK_ALWAYS_ON)) 136 { 137 // Catch programmer error. 138 rtc::CritScope lock(&lock_); 139 for (const ModuleCallback& mc : modules_) 140 RTC_DCHECK(mc.module != module); 141 } 142 #endif 143 144 // Now that we know the module isn't in the list, we'll call out to notify 145 // the module that it's attached to the worker thread. We don't hold 146 // the lock while we make this call. 147 if (thread_.get()) 148 module->ProcessThreadAttached(this); 149 150 { 151 rtc::CritScope lock(&lock_); 152 modules_.push_back(ModuleCallback(module)); 153 } 154 155 // Wake the thread calling ProcessThreadImpl::Process() to update the 156 // waiting time. The waiting time for the just registered module may be 157 // shorter than all other registered modules. 158 wake_up_->Set(); 159 } 160 161 void ProcessThreadImpl::DeRegisterModule(Module* module) { 162 // Allowed to be called on any thread. 163 // TODO(tommi): Disallow this ^^^ 164 RTC_DCHECK(module); 165 166 { 167 rtc::CritScope lock(&lock_); 168 modules_.remove_if([&module](const ModuleCallback& m) { 169 return m.module == module; 170 }); 171 172 // TODO(tommi): we currently need to hold the lock while calling out to 173 // ProcessThreadAttached. This is to make sure that the thread hasn't been 174 // destroyed while we attach the module. Once we can make sure 175 // DeRegisterModule isn't being called on arbitrary threads, we can move the 176 // |if (thread_.get())| check and ProcessThreadAttached() call outside the 177 // lock scope. 178 179 // Notify the module that it's been detached. 180 if (thread_.get()) 181 module->ProcessThreadAttached(nullptr); 182 } 183 } 184 185 // static 186 bool ProcessThreadImpl::Run(void* obj) { 187 return static_cast<ProcessThreadImpl*>(obj)->Process(); 188 } 189 190 bool ProcessThreadImpl::Process() { 191 int64_t now = TickTime::MillisecondTimestamp(); 192 int64_t next_checkpoint = now + (1000 * 60); 193 194 { 195 rtc::CritScope lock(&lock_); 196 if (stop_) 197 return false; 198 for (ModuleCallback& m : modules_) { 199 // TODO(tommi): Would be good to measure the time TimeUntilNextProcess 200 // takes and dcheck if it takes too long (e.g. >=10ms). Ideally this 201 // operation should not require taking a lock, so querying all modules 202 // should run in a matter of nanoseconds. 203 if (m.next_callback == 0) 204 m.next_callback = GetNextCallbackTime(m.module, now); 205 206 if (m.next_callback <= now || 207 m.next_callback == kCallProcessImmediately) { 208 m.module->Process(); 209 // Use a new 'now' reference to calculate when the next callback 210 // should occur. We'll continue to use 'now' above for the baseline 211 // of calculating how long we should wait, to reduce variance. 212 int64_t new_now = TickTime::MillisecondTimestamp(); 213 m.next_callback = GetNextCallbackTime(m.module, new_now); 214 } 215 216 if (m.next_callback < next_checkpoint) 217 next_checkpoint = m.next_callback; 218 } 219 220 while (!queue_.empty()) { 221 ProcessTask* task = queue_.front(); 222 queue_.pop(); 223 lock_.Leave(); 224 task->Run(); 225 delete task; 226 lock_.Enter(); 227 } 228 } 229 230 int64_t time_to_wait = next_checkpoint - TickTime::MillisecondTimestamp(); 231 if (time_to_wait > 0) 232 wake_up_->Wait(static_cast<unsigned long>(time_to_wait)); 233 234 return true; 235 } 236 } // namespace webrtc 237