1 // Copyright (c) 2010 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/proxy/multi_threaded_proxy_resolver.h" 6 7 #include "base/message_loop.h" 8 #include "base/string_util.h" 9 #include "base/stringprintf.h" 10 #include "base/threading/thread.h" 11 #include "base/threading/thread_restrictions.h" 12 #include "net/base/net_errors.h" 13 #include "net/base/net_log.h" 14 #include "net/proxy/proxy_info.h" 15 16 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script 17 // data when SetPacScript fails. That will reclaim memory when 18 // testing bogus scripts. 19 20 namespace net { 21 22 namespace { 23 24 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> { 25 public: 26 explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {} 27 void PurgeMemory() { resolver_->PurgeMemory(); } 28 private: 29 friend class base::RefCountedThreadSafe<PurgeMemoryTask>; 30 ~PurgeMemoryTask() {} 31 ProxyResolver* resolver_; 32 }; 33 34 } // namespace 35 36 // An "executor" is a job-runner for PAC requests. It encapsulates a worker 37 // thread and a synchronous ProxyResolver (which will be operated on said 38 // thread.) 39 class MultiThreadedProxyResolver::Executor 40 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { 41 public: 42 // |coordinator| must remain valid throughout our lifetime. It is used to 43 // signal when the executor is ready to receive work by calling 44 // |coordinator->OnExecutorReady()|. 45 // The constructor takes ownership of |resolver|. 46 // |thread_number| is an identifier used when naming the worker thread. 47 Executor(MultiThreadedProxyResolver* coordinator, 48 ProxyResolver* resolver, 49 int thread_number); 50 51 // Submit a job to this executor. 52 void StartJob(Job* job); 53 54 // Callback for when a job has completed running on the executor's thread. 55 void OnJobCompleted(Job* job); 56 57 // Cleanup the executor. Cancels all outstanding work, and frees the thread 58 // and resolver. 59 void Destroy(); 60 61 void PurgeMemory(); 62 63 // Returns the outstanding job, or NULL. 64 Job* outstanding_job() const { return outstanding_job_.get(); } 65 66 ProxyResolver* resolver() { return resolver_.get(); } 67 68 int thread_number() const { return thread_number_; } 69 70 private: 71 friend class base::RefCountedThreadSafe<Executor>; 72 ~Executor(); 73 74 MultiThreadedProxyResolver* coordinator_; 75 const int thread_number_; 76 77 // The currently active job for this executor (either a SetPacScript or 78 // GetProxyForURL task). 79 scoped_refptr<Job> outstanding_job_; 80 81 // The synchronous resolver implementation. 82 scoped_ptr<ProxyResolver> resolver_; 83 84 // The thread where |resolver_| is run on. 85 // Note that declaration ordering is important here. |thread_| needs to be 86 // destroyed *before* |resolver_|, in case |resolver_| is currently 87 // executing on |thread_|. 88 scoped_ptr<base::Thread> thread_; 89 }; 90 91 // MultiThreadedProxyResolver::Job --------------------------------------------- 92 93 class MultiThreadedProxyResolver::Job 94 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { 95 public: 96 // Identifies the subclass of Job (only being used for debugging purposes). 97 enum Type { 98 TYPE_GET_PROXY_FOR_URL, 99 TYPE_SET_PAC_SCRIPT, 100 TYPE_SET_PAC_SCRIPT_INTERNAL, 101 }; 102 103 Job(Type type, CompletionCallback* user_callback) 104 : type_(type), 105 user_callback_(user_callback), 106 executor_(NULL), 107 was_cancelled_(false) { 108 } 109 110 void set_executor(Executor* executor) { 111 executor_ = executor; 112 } 113 114 // The "executor" is the job runner that is scheduling this job. If 115 // this job has not been submitted to an executor yet, this will be 116 // NULL (and we know it hasn't started yet). 117 Executor* executor() { 118 return executor_; 119 } 120 121 // Mark the job as having been cancelled. 122 void Cancel() { 123 was_cancelled_ = true; 124 } 125 126 // Returns true if Cancel() has been called. 127 bool was_cancelled() const { return was_cancelled_; } 128 129 Type type() const { return type_; } 130 131 // Returns true if this job still has a user callback. Some jobs 132 // do not have a user callback, because they were helper jobs 133 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). 134 // 135 // Otherwise jobs that correspond with user-initiated work will 136 // have a non-NULL callback up until the callback is run. 137 bool has_user_callback() const { return user_callback_ != NULL; } 138 139 // This method is called when the job is inserted into a wait queue 140 // because no executors were ready to accept it. 141 virtual void WaitingForThread() {} 142 143 // This method is called just before the job is posted to the work thread. 144 virtual void FinishedWaitingForThread() {} 145 146 // This method is called on the worker thread to do the job's work. On 147 // completion, implementors are expected to call OnJobCompleted() on 148 // |origin_loop|. 149 virtual void Run(MessageLoop* origin_loop) = 0; 150 151 protected: 152 void OnJobCompleted() { 153 // |executor_| will be NULL if the executor has already been deleted. 154 if (executor_) 155 executor_->OnJobCompleted(this); 156 } 157 158 void RunUserCallback(int result) { 159 DCHECK(has_user_callback()); 160 CompletionCallback* callback = user_callback_; 161 // Null the callback so has_user_callback() will now return false. 162 user_callback_ = NULL; 163 callback->Run(result); 164 } 165 166 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; 167 168 virtual ~Job() {} 169 170 private: 171 const Type type_; 172 CompletionCallback* user_callback_; 173 Executor* executor_; 174 bool was_cancelled_; 175 }; 176 177 // MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- 178 179 // Runs on the worker thread to call ProxyResolver::SetPacScript. 180 class MultiThreadedProxyResolver::SetPacScriptJob 181 : public MultiThreadedProxyResolver::Job { 182 public: 183 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data, 184 CompletionCallback* callback) 185 : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL, 186 callback), 187 script_data_(script_data) { 188 } 189 190 // Runs on the worker thread. 191 virtual void Run(MessageLoop* origin_loop) { 192 ProxyResolver* resolver = executor()->resolver(); 193 int rv = resolver->SetPacScript(script_data_, NULL); 194 195 DCHECK_NE(rv, ERR_IO_PENDING); 196 origin_loop->PostTask( 197 FROM_HERE, 198 NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv)); 199 } 200 201 private: 202 // Runs the completion callback on the origin thread. 203 void RequestComplete(int result_code) { 204 // The task may have been cancelled after it was started. 205 if (!was_cancelled() && has_user_callback()) { 206 RunUserCallback(result_code); 207 } 208 OnJobCompleted(); 209 } 210 211 const scoped_refptr<ProxyResolverScriptData> script_data_; 212 }; 213 214 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ 215 216 class MultiThreadedProxyResolver::GetProxyForURLJob 217 : public MultiThreadedProxyResolver::Job { 218 public: 219 // |url| -- the URL of the query. 220 // |results| -- the structure to fill with proxy resolve results. 221 GetProxyForURLJob(const GURL& url, 222 ProxyInfo* results, 223 CompletionCallback* callback, 224 const BoundNetLog& net_log) 225 : Job(TYPE_GET_PROXY_FOR_URL, callback), 226 results_(results), 227 net_log_(net_log), 228 url_(url), 229 was_waiting_for_thread_(false) { 230 DCHECK(callback); 231 } 232 233 BoundNetLog* net_log() { return &net_log_; } 234 235 virtual void WaitingForThread() { 236 was_waiting_for_thread_ = true; 237 net_log_.BeginEvent( 238 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); 239 } 240 241 virtual void FinishedWaitingForThread() { 242 DCHECK(executor()); 243 244 if (was_waiting_for_thread_) { 245 net_log_.EndEvent( 246 NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL); 247 } 248 249 net_log_.AddEvent( 250 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, 251 make_scoped_refptr(new NetLogIntegerParameter( 252 "thread_number", executor()->thread_number()))); 253 } 254 255 // Runs on the worker thread. 256 virtual void Run(MessageLoop* origin_loop) { 257 ProxyResolver* resolver = executor()->resolver(); 258 int rv = resolver->GetProxyForURL( 259 url_, &results_buf_, NULL, NULL, net_log_); 260 DCHECK_NE(rv, ERR_IO_PENDING); 261 262 origin_loop->PostTask( 263 FROM_HERE, 264 NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv)); 265 } 266 267 private: 268 // Runs the completion callback on the origin thread. 269 void QueryComplete(int result_code) { 270 // The Job may have been cancelled after it was started. 271 if (!was_cancelled()) { 272 if (result_code >= OK) { // Note: unit-tests use values > 0. 273 results_->Use(results_buf_); 274 } 275 RunUserCallback(result_code); 276 } 277 OnJobCompleted(); 278 } 279 280 // Must only be used on the "origin" thread. 281 ProxyInfo* results_; 282 283 // Can be used on either "origin" or worker thread. 284 BoundNetLog net_log_; 285 const GURL url_; 286 287 // Usable from within DoQuery on the worker thread. 288 ProxyInfo results_buf_; 289 290 bool was_waiting_for_thread_; 291 }; 292 293 // MultiThreadedProxyResolver::Executor ---------------------------------------- 294 295 MultiThreadedProxyResolver::Executor::Executor( 296 MultiThreadedProxyResolver* coordinator, 297 ProxyResolver* resolver, 298 int thread_number) 299 : coordinator_(coordinator), 300 thread_number_(thread_number), 301 resolver_(resolver) { 302 DCHECK(coordinator); 303 DCHECK(resolver); 304 // Start up the thread. 305 // Note that it is safe to pass a temporary C-String to Thread(), as it will 306 // make a copy. 307 std::string thread_name = 308 base::StringPrintf("PAC thread #%d", thread_number); 309 thread_.reset(new base::Thread(thread_name.c_str())); 310 CHECK(thread_->Start()); 311 } 312 313 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { 314 DCHECK(!outstanding_job_); 315 outstanding_job_ = job; 316 317 // Run the job. Once it has completed (regardless of whether it was 318 // cancelled), it will invoke OnJobCompleted() on this thread. 319 job->set_executor(this); 320 job->FinishedWaitingForThread(); 321 thread_->message_loop()->PostTask( 322 FROM_HERE, 323 NewRunnableMethod(job, &Job::Run, MessageLoop::current())); 324 } 325 326 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { 327 DCHECK_EQ(job, outstanding_job_.get()); 328 outstanding_job_ = NULL; 329 coordinator_->OnExecutorReady(this); 330 } 331 332 void MultiThreadedProxyResolver::Executor::Destroy() { 333 DCHECK(coordinator_); 334 335 // Give the resolver an opportunity to shutdown from THIS THREAD before 336 // joining on the resolver thread. This allows certain implementations 337 // to avoid deadlocks. 338 resolver_->Shutdown(); 339 340 { 341 // See http://crbug.com/69710. 342 base::ThreadRestrictions::ScopedAllowIO allow_io; 343 344 // Join the worker thread. 345 thread_.reset(); 346 } 347 348 // Cancel any outstanding job. 349 if (outstanding_job_) { 350 outstanding_job_->Cancel(); 351 // Orphan the job (since this executor may be deleted soon). 352 outstanding_job_->set_executor(NULL); 353 } 354 355 // It is now safe to free the ProxyResolver, since all the tasks that 356 // were using it on the resolver thread have completed. 357 resolver_.reset(); 358 359 // Null some stuff as a precaution. 360 coordinator_ = NULL; 361 outstanding_job_ = NULL; 362 } 363 364 void MultiThreadedProxyResolver::Executor::PurgeMemory() { 365 scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get())); 366 thread_->message_loop()->PostTask( 367 FROM_HERE, 368 NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory)); 369 } 370 371 MultiThreadedProxyResolver::Executor::~Executor() { 372 // The important cleanup happens as part of Destroy(), which should always be 373 // called first. 374 DCHECK(!coordinator_) << "Destroy() was not called"; 375 DCHECK(!thread_.get()); 376 DCHECK(!resolver_.get()); 377 DCHECK(!outstanding_job_); 378 } 379 380 // MultiThreadedProxyResolver -------------------------------------------------- 381 382 MultiThreadedProxyResolver::MultiThreadedProxyResolver( 383 ProxyResolverFactory* resolver_factory, 384 size_t max_num_threads) 385 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), 386 resolver_factory_(resolver_factory), 387 max_num_threads_(max_num_threads) { 388 DCHECK_GE(max_num_threads, 1u); 389 } 390 391 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { 392 // We will cancel all outstanding requests. 393 pending_jobs_.clear(); 394 ReleaseAllExecutors(); 395 } 396 397 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url, 398 ProxyInfo* results, 399 CompletionCallback* callback, 400 RequestHandle* request, 401 const BoundNetLog& net_log) { 402 DCHECK(CalledOnValidThread()); 403 DCHECK(callback); 404 DCHECK(current_script_data_.get()) 405 << "Resolver is un-initialized. Must call SetPacScript() first!"; 406 407 scoped_refptr<GetProxyForURLJob> job( 408 new GetProxyForURLJob(url, results, callback, net_log)); 409 410 // Completion will be notified through |callback|, unless the caller cancels 411 // the request using |request|. 412 if (request) 413 *request = reinterpret_cast<RequestHandle>(job.get()); 414 415 // If there is an executor that is ready to run this request, submit it! 416 Executor* executor = FindIdleExecutor(); 417 if (executor) { 418 DCHECK_EQ(0u, pending_jobs_.size()); 419 executor->StartJob(job); 420 return ERR_IO_PENDING; 421 } 422 423 // Otherwise queue this request. (We will schedule it to a thread once one 424 // becomes available). 425 job->WaitingForThread(); 426 pending_jobs_.push_back(job); 427 428 // If we haven't already reached the thread limit, provision a new thread to 429 // drain the requests more quickly. 430 if (executors_.size() < max_num_threads_) { 431 executor = AddNewExecutor(); 432 executor->StartJob( 433 new SetPacScriptJob(current_script_data_, NULL)); 434 } 435 436 return ERR_IO_PENDING; 437 } 438 439 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { 440 DCHECK(CalledOnValidThread()); 441 DCHECK(req); 442 443 Job* job = reinterpret_cast<Job*>(req); 444 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); 445 446 if (job->executor()) { 447 // If the job was already submitted to the executor, just mark it 448 // as cancelled so the user callback isn't run on completion. 449 job->Cancel(); 450 } else { 451 // Otherwise the job is just sitting in a queue. 452 PendingJobsQueue::iterator it = 453 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); 454 DCHECK(it != pending_jobs_.end()); 455 pending_jobs_.erase(it); 456 } 457 } 458 459 void MultiThreadedProxyResolver::CancelSetPacScript() { 460 DCHECK(CalledOnValidThread()); 461 DCHECK_EQ(0u, pending_jobs_.size()); 462 DCHECK_EQ(1u, executors_.size()); 463 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, 464 executors_[0]->outstanding_job()->type()); 465 466 // Defensively clear some data which shouldn't be getting used 467 // anymore. 468 current_script_data_ = NULL; 469 470 ReleaseAllExecutors(); 471 } 472 473 void MultiThreadedProxyResolver::PurgeMemory() { 474 DCHECK(CalledOnValidThread()); 475 for (ExecutorList::iterator it = executors_.begin(); 476 it != executors_.end(); ++it) { 477 Executor* executor = *it; 478 executor->PurgeMemory(); 479 } 480 } 481 482 int MultiThreadedProxyResolver::SetPacScript( 483 const scoped_refptr<ProxyResolverScriptData>& script_data, 484 CompletionCallback* callback) { 485 DCHECK(CalledOnValidThread()); 486 DCHECK(callback); 487 488 // Save the script details, so we can provision new executors later. 489 current_script_data_ = script_data; 490 491 // The user should not have any outstanding requests when they call 492 // SetPacScript(). 493 CheckNoOutstandingUserRequests(); 494 495 // Destroy all of the current threads and their proxy resolvers. 496 ReleaseAllExecutors(); 497 498 // Provision a new executor, and run the SetPacScript request. On completion 499 // notification will be sent through |callback|. 500 Executor* executor = AddNewExecutor(); 501 executor->StartJob(new SetPacScriptJob(script_data, callback)); 502 return ERR_IO_PENDING; 503 } 504 505 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { 506 DCHECK(CalledOnValidThread()); 507 CHECK_EQ(0u, pending_jobs_.size()); 508 509 for (ExecutorList::const_iterator it = executors_.begin(); 510 it != executors_.end(); ++it) { 511 const Executor* executor = *it; 512 Job* job = executor->outstanding_job(); 513 // The "has_user_callback()" is to exclude jobs for which the callback 514 // has already been invoked, or was not user-initiated (as in the case of 515 // lazy thread provisions). User-initiated jobs may !has_user_callback() 516 // when the callback has already been run. (Since we only clear the 517 // outstanding job AFTER the callback has been invoked, it is possible 518 // for a new request to be started from within the callback). 519 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); 520 } 521 } 522 523 void MultiThreadedProxyResolver::ReleaseAllExecutors() { 524 DCHECK(CalledOnValidThread()); 525 for (ExecutorList::iterator it = executors_.begin(); 526 it != executors_.end(); ++it) { 527 Executor* executor = *it; 528 executor->Destroy(); 529 } 530 executors_.clear(); 531 } 532 533 MultiThreadedProxyResolver::Executor* 534 MultiThreadedProxyResolver::FindIdleExecutor() { 535 DCHECK(CalledOnValidThread()); 536 for (ExecutorList::iterator it = executors_.begin(); 537 it != executors_.end(); ++it) { 538 Executor* executor = *it; 539 if (!executor->outstanding_job()) 540 return executor; 541 } 542 return NULL; 543 } 544 545 MultiThreadedProxyResolver::Executor* 546 MultiThreadedProxyResolver::AddNewExecutor() { 547 DCHECK(CalledOnValidThread()); 548 DCHECK_LT(executors_.size(), max_num_threads_); 549 // The "thread number" is used to give the thread a unique name. 550 int thread_number = executors_.size(); 551 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); 552 Executor* executor = new Executor( 553 this, resolver, thread_number); 554 executors_.push_back(make_scoped_refptr(executor)); 555 return executor; 556 } 557 558 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { 559 DCHECK(CalledOnValidThread()); 560 if (pending_jobs_.empty()) 561 return; 562 563 // Get the next job to process (FIFO). Transfer it from the pending queue 564 // to the executor. 565 scoped_refptr<Job> job = pending_jobs_.front(); 566 pending_jobs_.pop_front(); 567 executor->StartJob(job); 568 } 569 570 } // namespace net 571