1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/proxy/multi_threaded_proxy_resolver.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/message_loop/message_loop_proxy.h" 10 #include "base/metrics/histogram.h" 11 #include "base/strings/string_util.h" 12 #include "base/strings/stringprintf.h" 13 #include "base/threading/thread.h" 14 #include "base/threading/thread_restrictions.h" 15 #include "net/base/net_errors.h" 16 #include "net/base/net_log.h" 17 #include "net/proxy/proxy_info.h" 18 19 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script 20 // data when SetPacScript fails. That will reclaim memory when 21 // testing bogus scripts. 22 23 namespace net { 24 25 // An "executor" is a job-runner for PAC requests. It encapsulates a worker 26 // thread and a synchronous ProxyResolver (which will be operated on said 27 // thread.) 28 class MultiThreadedProxyResolver::Executor 29 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { 30 public: 31 // |coordinator| must remain valid throughout our lifetime. It is used to 32 // signal when the executor is ready to receive work by calling 33 // |coordinator->OnExecutorReady()|. 34 // The constructor takes ownership of |resolver|. 35 // |thread_number| is an identifier used when naming the worker thread. 36 Executor(MultiThreadedProxyResolver* coordinator, 37 ProxyResolver* resolver, 38 int thread_number); 39 40 // Submit a job to this executor. 41 void StartJob(Job* job); 42 43 // Callback for when a job has completed running on the executor's thread. 44 void OnJobCompleted(Job* job); 45 46 // Cleanup the executor. Cancels all outstanding work, and frees the thread 47 // and resolver. 48 void Destroy(); 49 50 void PurgeMemory(); 51 52 // Returns the outstanding job, or NULL. 53 Job* outstanding_job() const { return outstanding_job_.get(); } 54 55 ProxyResolver* resolver() { return resolver_.get(); } 56 57 int thread_number() const { return thread_number_; } 58 59 private: 60 friend class base::RefCountedThreadSafe<Executor>; 61 ~Executor(); 62 63 MultiThreadedProxyResolver* coordinator_; 64 const int thread_number_; 65 66 // The currently active job for this executor (either a SetPacScript or 67 // GetProxyForURL task). 68 scoped_refptr<Job> outstanding_job_; 69 70 // The synchronous resolver implementation. 71 scoped_ptr<ProxyResolver> resolver_; 72 73 // The thread where |resolver_| is run on. 74 // Note that declaration ordering is important here. |thread_| needs to be 75 // destroyed *before* |resolver_|, in case |resolver_| is currently 76 // executing on |thread_|. 77 scoped_ptr<base::Thread> thread_; 78 }; 79 80 // MultiThreadedProxyResolver::Job --------------------------------------------- 81 82 class MultiThreadedProxyResolver::Job 83 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { 84 public: 85 // Identifies the subclass of Job (only being used for debugging purposes). 86 enum Type { 87 TYPE_GET_PROXY_FOR_URL, 88 TYPE_SET_PAC_SCRIPT, 89 TYPE_SET_PAC_SCRIPT_INTERNAL, 90 }; 91 92 Job(Type type, const CompletionCallback& callback) 93 : type_(type), 94 callback_(callback), 95 executor_(NULL), 96 was_cancelled_(false) { 97 } 98 99 void set_executor(Executor* executor) { 100 executor_ = executor; 101 } 102 103 // The "executor" is the job runner that is scheduling this job. If 104 // this job has not been submitted to an executor yet, this will be 105 // NULL (and we know it hasn't started yet). 106 Executor* executor() { 107 return executor_; 108 } 109 110 // Mark the job as having been cancelled. 111 void Cancel() { 112 was_cancelled_ = true; 113 } 114 115 // Returns true if Cancel() has been called. 116 bool was_cancelled() const { return was_cancelled_; } 117 118 Type type() const { return type_; } 119 120 // Returns true if this job still has a user callback. Some jobs 121 // do not have a user callback, because they were helper jobs 122 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). 123 // 124 // Otherwise jobs that correspond with user-initiated work will 125 // have a non-null callback up until the callback is run. 126 bool has_user_callback() const { return !callback_.is_null(); } 127 128 // This method is called when the job is inserted into a wait queue 129 // because no executors were ready to accept it. 130 virtual void WaitingForThread() {} 131 132 // This method is called just before the job is posted to the work thread. 133 virtual void FinishedWaitingForThread() {} 134 135 // This method is called on the worker thread to do the job's work. On 136 // completion, implementors are expected to call OnJobCompleted() on 137 // |origin_loop|. 138 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0; 139 140 protected: 141 void OnJobCompleted() { 142 // |executor_| will be NULL if the executor has already been deleted. 143 if (executor_) 144 executor_->OnJobCompleted(this); 145 } 146 147 void RunUserCallback(int result) { 148 DCHECK(has_user_callback()); 149 CompletionCallback callback = callback_; 150 // Reset the callback so has_user_callback() will now return false. 151 callback_.Reset(); 152 callback.Run(result); 153 } 154 155 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; 156 157 virtual ~Job() {} 158 159 private: 160 const Type type_; 161 CompletionCallback callback_; 162 Executor* executor_; 163 bool was_cancelled_; 164 }; 165 166 // MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- 167 168 // Runs on the worker thread to call ProxyResolver::SetPacScript. 169 class MultiThreadedProxyResolver::SetPacScriptJob 170 : public MultiThreadedProxyResolver::Job { 171 public: 172 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data, 173 const CompletionCallback& callback) 174 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT : 175 TYPE_SET_PAC_SCRIPT_INTERNAL, 176 callback), 177 script_data_(script_data) { 178 } 179 180 // Runs on the worker thread. 181 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE { 182 ProxyResolver* resolver = executor()->resolver(); 183 int rv = resolver->SetPacScript(script_data_, CompletionCallback()); 184 185 DCHECK_NE(rv, ERR_IO_PENDING); 186 origin_loop->PostTask( 187 FROM_HERE, 188 base::Bind(&SetPacScriptJob::RequestComplete, this, rv)); 189 } 190 191 protected: 192 virtual ~SetPacScriptJob() {} 193 194 private: 195 // Runs the completion callback on the origin thread. 196 void RequestComplete(int result_code) { 197 // The task may have been cancelled after it was started. 198 if (!was_cancelled() && has_user_callback()) { 199 RunUserCallback(result_code); 200 } 201 OnJobCompleted(); 202 } 203 204 const scoped_refptr<ProxyResolverScriptData> script_data_; 205 }; 206 207 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ 208 209 class MultiThreadedProxyResolver::GetProxyForURLJob 210 : public MultiThreadedProxyResolver::Job { 211 public: 212 // |url| -- the URL of the query. 213 // |results| -- the structure to fill with proxy resolve results. 214 GetProxyForURLJob(const GURL& url, 215 ProxyInfo* results, 216 const CompletionCallback& callback, 217 const BoundNetLog& net_log) 218 : Job(TYPE_GET_PROXY_FOR_URL, callback), 219 results_(results), 220 net_log_(net_log), 221 url_(url), 222 was_waiting_for_thread_(false) { 223 DCHECK(!callback.is_null()); 224 start_time_ = base::TimeTicks::Now(); 225 } 226 227 BoundNetLog* net_log() { return &net_log_; } 228 229 virtual void WaitingForThread() OVERRIDE { 230 was_waiting_for_thread_ = true; 231 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD); 232 } 233 234 virtual void FinishedWaitingForThread() OVERRIDE { 235 DCHECK(executor()); 236 237 submitted_to_thread_time_ = base::TimeTicks::Now(); 238 239 if (was_waiting_for_thread_) { 240 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD); 241 } 242 243 net_log_.AddEvent( 244 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, 245 NetLog::IntegerCallback("thread_number", executor()->thread_number())); 246 } 247 248 // Runs on the worker thread. 249 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE { 250 ProxyResolver* resolver = executor()->resolver(); 251 int rv = resolver->GetProxyForURL( 252 url_, &results_buf_, CompletionCallback(), NULL, net_log_); 253 DCHECK_NE(rv, ERR_IO_PENDING); 254 255 origin_loop->PostTask( 256 FROM_HERE, 257 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv)); 258 } 259 260 protected: 261 virtual ~GetProxyForURLJob() {} 262 263 private: 264 // Runs the completion callback on the origin thread. 265 void QueryComplete(int result_code) { 266 // The Job may have been cancelled after it was started. 267 if (!was_cancelled()) { 268 RecordPerformanceMetrics(); 269 if (result_code >= OK) { // Note: unit-tests use values > 0. 270 results_->Use(results_buf_); 271 } 272 RunUserCallback(result_code); 273 } 274 OnJobCompleted(); 275 } 276 277 void RecordPerformanceMetrics() { 278 DCHECK(!was_cancelled()); 279 280 base::TimeTicks now = base::TimeTicks::Now(); 281 282 // Log the total time the request took to complete. 283 UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time", 284 now - start_time_); 285 286 // Log the time the request was stalled waiting for a thread to free up. 287 UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time", 288 submitted_to_thread_time_ - start_time_); 289 } 290 291 // Must only be used on the "origin" thread. 292 ProxyInfo* results_; 293 294 // Can be used on either "origin" or worker thread. 295 BoundNetLog net_log_; 296 const GURL url_; 297 298 // Usable from within DoQuery on the worker thread. 299 ProxyInfo results_buf_; 300 301 base::TimeTicks start_time_; 302 base::TimeTicks submitted_to_thread_time_; 303 304 bool was_waiting_for_thread_; 305 }; 306 307 // MultiThreadedProxyResolver::Executor ---------------------------------------- 308 309 MultiThreadedProxyResolver::Executor::Executor( 310 MultiThreadedProxyResolver* coordinator, 311 ProxyResolver* resolver, 312 int thread_number) 313 : coordinator_(coordinator), 314 thread_number_(thread_number), 315 resolver_(resolver) { 316 DCHECK(coordinator); 317 DCHECK(resolver); 318 // Start up the thread. 319 // Note that it is safe to pass a temporary C-String to Thread(), as it will 320 // make a copy. 321 std::string thread_name = 322 base::StringPrintf("PAC thread #%d", thread_number); 323 thread_.reset(new base::Thread(thread_name.c_str())); 324 CHECK(thread_->Start()); 325 } 326 327 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { 328 DCHECK(!outstanding_job_.get()); 329 outstanding_job_ = job; 330 331 // Run the job. Once it has completed (regardless of whether it was 332 // cancelled), it will invoke OnJobCompleted() on this thread. 333 job->set_executor(this); 334 job->FinishedWaitingForThread(); 335 thread_->message_loop()->PostTask( 336 FROM_HERE, 337 base::Bind(&Job::Run, job, base::MessageLoopProxy::current())); 338 } 339 340 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { 341 DCHECK_EQ(job, outstanding_job_.get()); 342 outstanding_job_ = NULL; 343 coordinator_->OnExecutorReady(this); 344 } 345 346 void MultiThreadedProxyResolver::Executor::Destroy() { 347 DCHECK(coordinator_); 348 349 { 350 // See http://crbug.com/69710. 351 base::ThreadRestrictions::ScopedAllowIO allow_io; 352 353 // Join the worker thread. 354 thread_.reset(); 355 } 356 357 // Cancel any outstanding job. 358 if (outstanding_job_.get()) { 359 outstanding_job_->Cancel(); 360 // Orphan the job (since this executor may be deleted soon). 361 outstanding_job_->set_executor(NULL); 362 } 363 364 // It is now safe to free the ProxyResolver, since all the tasks that 365 // were using it on the resolver thread have completed. 366 resolver_.reset(); 367 368 // Null some stuff as a precaution. 369 coordinator_ = NULL; 370 outstanding_job_ = NULL; 371 } 372 373 void MultiThreadedProxyResolver::Executor::PurgeMemory() { 374 thread_->message_loop()->PostTask( 375 FROM_HERE, 376 base::Bind(&ProxyResolver::PurgeMemory, 377 base::Unretained(resolver_.get()))); 378 } 379 380 MultiThreadedProxyResolver::Executor::~Executor() { 381 // The important cleanup happens as part of Destroy(), which should always be 382 // called first. 383 DCHECK(!coordinator_) << "Destroy() was not called"; 384 DCHECK(!thread_.get()); 385 DCHECK(!resolver_.get()); 386 DCHECK(!outstanding_job_.get()); 387 } 388 389 // MultiThreadedProxyResolver -------------------------------------------------- 390 391 MultiThreadedProxyResolver::MultiThreadedProxyResolver( 392 ProxyResolverFactory* resolver_factory, 393 size_t max_num_threads) 394 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), 395 resolver_factory_(resolver_factory), 396 max_num_threads_(max_num_threads) { 397 DCHECK_GE(max_num_threads, 1u); 398 } 399 400 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { 401 // We will cancel all outstanding requests. 402 pending_jobs_.clear(); 403 ReleaseAllExecutors(); 404 } 405 406 int MultiThreadedProxyResolver::GetProxyForURL( 407 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, 408 RequestHandle* request, const BoundNetLog& net_log) { 409 DCHECK(CalledOnValidThread()); 410 DCHECK(!callback.is_null()); 411 DCHECK(current_script_data_.get()) 412 << "Resolver is un-initialized. Must call SetPacScript() first!"; 413 414 scoped_refptr<GetProxyForURLJob> job( 415 new GetProxyForURLJob(url, results, callback, net_log)); 416 417 // Completion will be notified through |callback|, unless the caller cancels 418 // the request using |request|. 419 if (request) 420 *request = reinterpret_cast<RequestHandle>(job.get()); 421 422 // If there is an executor that is ready to run this request, submit it! 423 Executor* executor = FindIdleExecutor(); 424 if (executor) { 425 DCHECK_EQ(0u, pending_jobs_.size()); 426 executor->StartJob(job.get()); 427 return ERR_IO_PENDING; 428 } 429 430 // Otherwise queue this request. (We will schedule it to a thread once one 431 // becomes available). 432 job->WaitingForThread(); 433 pending_jobs_.push_back(job); 434 435 // If we haven't already reached the thread limit, provision a new thread to 436 // drain the requests more quickly. 437 if (executors_.size() < max_num_threads_) { 438 executor = AddNewExecutor(); 439 executor->StartJob( 440 new SetPacScriptJob(current_script_data_, CompletionCallback())); 441 } 442 443 return ERR_IO_PENDING; 444 } 445 446 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { 447 DCHECK(CalledOnValidThread()); 448 DCHECK(req); 449 450 Job* job = reinterpret_cast<Job*>(req); 451 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); 452 453 if (job->executor()) { 454 // If the job was already submitted to the executor, just mark it 455 // as cancelled so the user callback isn't run on completion. 456 job->Cancel(); 457 } else { 458 // Otherwise the job is just sitting in a queue. 459 PendingJobsQueue::iterator it = 460 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); 461 DCHECK(it != pending_jobs_.end()); 462 pending_jobs_.erase(it); 463 } 464 } 465 466 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { 467 DCHECK(CalledOnValidThread()); 468 DCHECK(req); 469 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; 470 } 471 472 void MultiThreadedProxyResolver::CancelSetPacScript() { 473 DCHECK(CalledOnValidThread()); 474 DCHECK_EQ(0u, pending_jobs_.size()); 475 DCHECK_EQ(1u, executors_.size()); 476 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, 477 executors_[0]->outstanding_job()->type()); 478 479 // Defensively clear some data which shouldn't be getting used 480 // anymore. 481 current_script_data_ = NULL; 482 483 ReleaseAllExecutors(); 484 } 485 486 void MultiThreadedProxyResolver::PurgeMemory() { 487 DCHECK(CalledOnValidThread()); 488 for (ExecutorList::iterator it = executors_.begin(); 489 it != executors_.end(); ++it) { 490 Executor* executor = it->get(); 491 executor->PurgeMemory(); 492 } 493 } 494 495 int MultiThreadedProxyResolver::SetPacScript( 496 const scoped_refptr<ProxyResolverScriptData>& script_data, 497 const CompletionCallback&callback) { 498 DCHECK(CalledOnValidThread()); 499 DCHECK(!callback.is_null()); 500 501 // Save the script details, so we can provision new executors later. 502 current_script_data_ = script_data; 503 504 // The user should not have any outstanding requests when they call 505 // SetPacScript(). 506 CheckNoOutstandingUserRequests(); 507 508 // Destroy all of the current threads and their proxy resolvers. 509 ReleaseAllExecutors(); 510 511 // Provision a new executor, and run the SetPacScript request. On completion 512 // notification will be sent through |callback|. 513 Executor* executor = AddNewExecutor(); 514 executor->StartJob(new SetPacScriptJob(script_data, callback)); 515 return ERR_IO_PENDING; 516 } 517 518 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { 519 DCHECK(CalledOnValidThread()); 520 CHECK_EQ(0u, pending_jobs_.size()); 521 522 for (ExecutorList::const_iterator it = executors_.begin(); 523 it != executors_.end(); ++it) { 524 const Executor* executor = it->get(); 525 Job* job = executor->outstanding_job(); 526 // The "has_user_callback()" is to exclude jobs for which the callback 527 // has already been invoked, or was not user-initiated (as in the case of 528 // lazy thread provisions). User-initiated jobs may !has_user_callback() 529 // when the callback has already been run. (Since we only clear the 530 // outstanding job AFTER the callback has been invoked, it is possible 531 // for a new request to be started from within the callback). 532 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); 533 } 534 } 535 536 void MultiThreadedProxyResolver::ReleaseAllExecutors() { 537 DCHECK(CalledOnValidThread()); 538 for (ExecutorList::iterator it = executors_.begin(); 539 it != executors_.end(); ++it) { 540 Executor* executor = it->get(); 541 executor->Destroy(); 542 } 543 executors_.clear(); 544 } 545 546 MultiThreadedProxyResolver::Executor* 547 MultiThreadedProxyResolver::FindIdleExecutor() { 548 DCHECK(CalledOnValidThread()); 549 for (ExecutorList::iterator it = executors_.begin(); 550 it != executors_.end(); ++it) { 551 Executor* executor = it->get(); 552 if (!executor->outstanding_job()) 553 return executor; 554 } 555 return NULL; 556 } 557 558 MultiThreadedProxyResolver::Executor* 559 MultiThreadedProxyResolver::AddNewExecutor() { 560 DCHECK(CalledOnValidThread()); 561 DCHECK_LT(executors_.size(), max_num_threads_); 562 // The "thread number" is used to give the thread a unique name. 563 int thread_number = executors_.size(); 564 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); 565 Executor* executor = new Executor( 566 this, resolver, thread_number); 567 executors_.push_back(make_scoped_refptr(executor)); 568 return executor; 569 } 570 571 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { 572 DCHECK(CalledOnValidThread()); 573 if (pending_jobs_.empty()) 574 return; 575 576 // Get the next job to process (FIFO). Transfer it from the pending queue 577 // to the executor. 578 scoped_refptr<Job> job = pending_jobs_.front(); 579 pending_jobs_.pop_front(); 580 executor->StartJob(job.get()); 581 } 582 583 } // namespace net 584