1 // Copyright (c) 2012 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "net/proxy/multi_threaded_proxy_resolver.h" 6 7 #include "base/bind.h" 8 #include "base/bind_helpers.h" 9 #include "base/message_loop/message_loop_proxy.h" 10 #include "base/strings/string_util.h" 11 #include "base/strings/stringprintf.h" 12 #include "base/threading/thread.h" 13 #include "base/threading/thread_restrictions.h" 14 #include "net/base/net_errors.h" 15 #include "net/base/net_log.h" 16 #include "net/proxy/proxy_info.h" 17 18 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script 19 // data when SetPacScript fails. That will reclaim memory when 20 // testing bogus scripts. 21 22 namespace net { 23 24 // An "executor" is a job-runner for PAC requests. It encapsulates a worker 25 // thread and a synchronous ProxyResolver (which will be operated on said 26 // thread.) 27 class MultiThreadedProxyResolver::Executor 28 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > { 29 public: 30 // |coordinator| must remain valid throughout our lifetime. It is used to 31 // signal when the executor is ready to receive work by calling 32 // |coordinator->OnExecutorReady()|. 33 // The constructor takes ownership of |resolver|. 34 // |thread_number| is an identifier used when naming the worker thread. 35 Executor(MultiThreadedProxyResolver* coordinator, 36 ProxyResolver* resolver, 37 int thread_number); 38 39 // Submit a job to this executor. 40 void StartJob(Job* job); 41 42 // Callback for when a job has completed running on the executor's thread. 43 void OnJobCompleted(Job* job); 44 45 // Cleanup the executor. Cancels all outstanding work, and frees the thread 46 // and resolver. 47 void Destroy(); 48 49 // Returns the outstanding job, or NULL. 50 Job* outstanding_job() const { return outstanding_job_.get(); } 51 52 ProxyResolver* resolver() { return resolver_.get(); } 53 54 int thread_number() const { return thread_number_; } 55 56 private: 57 friend class base::RefCountedThreadSafe<Executor>; 58 ~Executor(); 59 60 MultiThreadedProxyResolver* coordinator_; 61 const int thread_number_; 62 63 // The currently active job for this executor (either a SetPacScript or 64 // GetProxyForURL task). 65 scoped_refptr<Job> outstanding_job_; 66 67 // The synchronous resolver implementation. 68 scoped_ptr<ProxyResolver> resolver_; 69 70 // The thread where |resolver_| is run on. 71 // Note that declaration ordering is important here. |thread_| needs to be 72 // destroyed *before* |resolver_|, in case |resolver_| is currently 73 // executing on |thread_|. 74 scoped_ptr<base::Thread> thread_; 75 }; 76 77 // MultiThreadedProxyResolver::Job --------------------------------------------- 78 79 class MultiThreadedProxyResolver::Job 80 : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> { 81 public: 82 // Identifies the subclass of Job (only being used for debugging purposes). 83 enum Type { 84 TYPE_GET_PROXY_FOR_URL, 85 TYPE_SET_PAC_SCRIPT, 86 TYPE_SET_PAC_SCRIPT_INTERNAL, 87 }; 88 89 Job(Type type, const CompletionCallback& callback) 90 : type_(type), 91 callback_(callback), 92 executor_(NULL), 93 was_cancelled_(false) { 94 } 95 96 void set_executor(Executor* executor) { 97 executor_ = executor; 98 } 99 100 // The "executor" is the job runner that is scheduling this job. If 101 // this job has not been submitted to an executor yet, this will be 102 // NULL (and we know it hasn't started yet). 103 Executor* executor() { 104 return executor_; 105 } 106 107 // Mark the job as having been cancelled. 108 void Cancel() { 109 was_cancelled_ = true; 110 } 111 112 // Returns true if Cancel() has been called. 113 bool was_cancelled() const { return was_cancelled_; } 114 115 Type type() const { return type_; } 116 117 // Returns true if this job still has a user callback. Some jobs 118 // do not have a user callback, because they were helper jobs 119 // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL). 120 // 121 // Otherwise jobs that correspond with user-initiated work will 122 // have a non-null callback up until the callback is run. 123 bool has_user_callback() const { return !callback_.is_null(); } 124 125 // This method is called when the job is inserted into a wait queue 126 // because no executors were ready to accept it. 127 virtual void WaitingForThread() {} 128 129 // This method is called just before the job is posted to the work thread. 130 virtual void FinishedWaitingForThread() {} 131 132 // This method is called on the worker thread to do the job's work. On 133 // completion, implementors are expected to call OnJobCompleted() on 134 // |origin_loop|. 135 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0; 136 137 protected: 138 void OnJobCompleted() { 139 // |executor_| will be NULL if the executor has already been deleted. 140 if (executor_) 141 executor_->OnJobCompleted(this); 142 } 143 144 void RunUserCallback(int result) { 145 DCHECK(has_user_callback()); 146 CompletionCallback callback = callback_; 147 // Reset the callback so has_user_callback() will now return false. 148 callback_.Reset(); 149 callback.Run(result); 150 } 151 152 friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>; 153 154 virtual ~Job() {} 155 156 private: 157 const Type type_; 158 CompletionCallback callback_; 159 Executor* executor_; 160 bool was_cancelled_; 161 }; 162 163 // MultiThreadedProxyResolver::SetPacScriptJob --------------------------------- 164 165 // Runs on the worker thread to call ProxyResolver::SetPacScript. 166 class MultiThreadedProxyResolver::SetPacScriptJob 167 : public MultiThreadedProxyResolver::Job { 168 public: 169 SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data, 170 const CompletionCallback& callback) 171 : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT : 172 TYPE_SET_PAC_SCRIPT_INTERNAL, 173 callback), 174 script_data_(script_data) { 175 } 176 177 // Runs on the worker thread. 178 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE { 179 ProxyResolver* resolver = executor()->resolver(); 180 int rv = resolver->SetPacScript(script_data_, CompletionCallback()); 181 182 DCHECK_NE(rv, ERR_IO_PENDING); 183 origin_loop->PostTask( 184 FROM_HERE, 185 base::Bind(&SetPacScriptJob::RequestComplete, this, rv)); 186 } 187 188 protected: 189 virtual ~SetPacScriptJob() {} 190 191 private: 192 // Runs the completion callback on the origin thread. 193 void RequestComplete(int result_code) { 194 // The task may have been cancelled after it was started. 195 if (!was_cancelled() && has_user_callback()) { 196 RunUserCallback(result_code); 197 } 198 OnJobCompleted(); 199 } 200 201 const scoped_refptr<ProxyResolverScriptData> script_data_; 202 }; 203 204 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------ 205 206 class MultiThreadedProxyResolver::GetProxyForURLJob 207 : public MultiThreadedProxyResolver::Job { 208 public: 209 // |url| -- the URL of the query. 210 // |results| -- the structure to fill with proxy resolve results. 211 GetProxyForURLJob(const GURL& url, 212 ProxyInfo* results, 213 const CompletionCallback& callback, 214 const BoundNetLog& net_log) 215 : Job(TYPE_GET_PROXY_FOR_URL, callback), 216 results_(results), 217 net_log_(net_log), 218 url_(url), 219 was_waiting_for_thread_(false) { 220 DCHECK(!callback.is_null()); 221 } 222 223 BoundNetLog* net_log() { return &net_log_; } 224 225 virtual void WaitingForThread() OVERRIDE { 226 was_waiting_for_thread_ = true; 227 net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD); 228 } 229 230 virtual void FinishedWaitingForThread() OVERRIDE { 231 DCHECK(executor()); 232 233 if (was_waiting_for_thread_) { 234 net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD); 235 } 236 237 net_log_.AddEvent( 238 NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD, 239 NetLog::IntegerCallback("thread_number", executor()->thread_number())); 240 } 241 242 // Runs on the worker thread. 243 virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE { 244 ProxyResolver* resolver = executor()->resolver(); 245 int rv = resolver->GetProxyForURL( 246 url_, &results_buf_, CompletionCallback(), NULL, net_log_); 247 DCHECK_NE(rv, ERR_IO_PENDING); 248 249 origin_loop->PostTask( 250 FROM_HERE, 251 base::Bind(&GetProxyForURLJob::QueryComplete, this, rv)); 252 } 253 254 protected: 255 virtual ~GetProxyForURLJob() {} 256 257 private: 258 // Runs the completion callback on the origin thread. 259 void QueryComplete(int result_code) { 260 // The Job may have been cancelled after it was started. 261 if (!was_cancelled()) { 262 if (result_code >= OK) { // Note: unit-tests use values > 0. 263 results_->Use(results_buf_); 264 } 265 RunUserCallback(result_code); 266 } 267 OnJobCompleted(); 268 } 269 270 // Must only be used on the "origin" thread. 271 ProxyInfo* results_; 272 273 // Can be used on either "origin" or worker thread. 274 BoundNetLog net_log_; 275 const GURL url_; 276 277 // Usable from within DoQuery on the worker thread. 278 ProxyInfo results_buf_; 279 280 bool was_waiting_for_thread_; 281 }; 282 283 // MultiThreadedProxyResolver::Executor ---------------------------------------- 284 285 MultiThreadedProxyResolver::Executor::Executor( 286 MultiThreadedProxyResolver* coordinator, 287 ProxyResolver* resolver, 288 int thread_number) 289 : coordinator_(coordinator), 290 thread_number_(thread_number), 291 resolver_(resolver) { 292 DCHECK(coordinator); 293 DCHECK(resolver); 294 // Start up the thread. 295 thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d", 296 thread_number))); 297 CHECK(thread_->Start()); 298 } 299 300 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) { 301 DCHECK(!outstanding_job_.get()); 302 outstanding_job_ = job; 303 304 // Run the job. Once it has completed (regardless of whether it was 305 // cancelled), it will invoke OnJobCompleted() on this thread. 306 job->set_executor(this); 307 job->FinishedWaitingForThread(); 308 thread_->message_loop()->PostTask( 309 FROM_HERE, 310 base::Bind(&Job::Run, job, base::MessageLoopProxy::current())); 311 } 312 313 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) { 314 DCHECK_EQ(job, outstanding_job_.get()); 315 outstanding_job_ = NULL; 316 coordinator_->OnExecutorReady(this); 317 } 318 319 void MultiThreadedProxyResolver::Executor::Destroy() { 320 DCHECK(coordinator_); 321 322 { 323 // See http://crbug.com/69710. 324 base::ThreadRestrictions::ScopedAllowIO allow_io; 325 326 // Join the worker thread. 327 thread_.reset(); 328 } 329 330 // Cancel any outstanding job. 331 if (outstanding_job_.get()) { 332 outstanding_job_->Cancel(); 333 // Orphan the job (since this executor may be deleted soon). 334 outstanding_job_->set_executor(NULL); 335 } 336 337 // It is now safe to free the ProxyResolver, since all the tasks that 338 // were using it on the resolver thread have completed. 339 resolver_.reset(); 340 341 // Null some stuff as a precaution. 342 coordinator_ = NULL; 343 outstanding_job_ = NULL; 344 } 345 346 MultiThreadedProxyResolver::Executor::~Executor() { 347 // The important cleanup happens as part of Destroy(), which should always be 348 // called first. 349 DCHECK(!coordinator_) << "Destroy() was not called"; 350 DCHECK(!thread_.get()); 351 DCHECK(!resolver_.get()); 352 DCHECK(!outstanding_job_.get()); 353 } 354 355 // MultiThreadedProxyResolver -------------------------------------------------- 356 357 MultiThreadedProxyResolver::MultiThreadedProxyResolver( 358 ProxyResolverFactory* resolver_factory, 359 size_t max_num_threads) 360 : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()), 361 resolver_factory_(resolver_factory), 362 max_num_threads_(max_num_threads) { 363 DCHECK_GE(max_num_threads, 1u); 364 } 365 366 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() { 367 // We will cancel all outstanding requests. 368 pending_jobs_.clear(); 369 ReleaseAllExecutors(); 370 } 371 372 int MultiThreadedProxyResolver::GetProxyForURL( 373 const GURL& url, ProxyInfo* results, const CompletionCallback& callback, 374 RequestHandle* request, const BoundNetLog& net_log) { 375 DCHECK(CalledOnValidThread()); 376 DCHECK(!callback.is_null()); 377 DCHECK(current_script_data_.get()) 378 << "Resolver is un-initialized. Must call SetPacScript() first!"; 379 380 scoped_refptr<GetProxyForURLJob> job( 381 new GetProxyForURLJob(url, results, callback, net_log)); 382 383 // Completion will be notified through |callback|, unless the caller cancels 384 // the request using |request|. 385 if (request) 386 *request = reinterpret_cast<RequestHandle>(job.get()); 387 388 // If there is an executor that is ready to run this request, submit it! 389 Executor* executor = FindIdleExecutor(); 390 if (executor) { 391 DCHECK_EQ(0u, pending_jobs_.size()); 392 executor->StartJob(job.get()); 393 return ERR_IO_PENDING; 394 } 395 396 // Otherwise queue this request. (We will schedule it to a thread once one 397 // becomes available). 398 job->WaitingForThread(); 399 pending_jobs_.push_back(job); 400 401 // If we haven't already reached the thread limit, provision a new thread to 402 // drain the requests more quickly. 403 if (executors_.size() < max_num_threads_) { 404 executor = AddNewExecutor(); 405 executor->StartJob( 406 new SetPacScriptJob(current_script_data_, CompletionCallback())); 407 } 408 409 return ERR_IO_PENDING; 410 } 411 412 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) { 413 DCHECK(CalledOnValidThread()); 414 DCHECK(req); 415 416 Job* job = reinterpret_cast<Job*>(req); 417 DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type()); 418 419 if (job->executor()) { 420 // If the job was already submitted to the executor, just mark it 421 // as cancelled so the user callback isn't run on completion. 422 job->Cancel(); 423 } else { 424 // Otherwise the job is just sitting in a queue. 425 PendingJobsQueue::iterator it = 426 std::find(pending_jobs_.begin(), pending_jobs_.end(), job); 427 DCHECK(it != pending_jobs_.end()); 428 pending_jobs_.erase(it); 429 } 430 } 431 432 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const { 433 DCHECK(CalledOnValidThread()); 434 DCHECK(req); 435 return LOAD_STATE_RESOLVING_PROXY_FOR_URL; 436 } 437 438 void MultiThreadedProxyResolver::CancelSetPacScript() { 439 DCHECK(CalledOnValidThread()); 440 DCHECK_EQ(0u, pending_jobs_.size()); 441 DCHECK_EQ(1u, executors_.size()); 442 DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT, 443 executors_[0]->outstanding_job()->type()); 444 445 // Defensively clear some data which shouldn't be getting used 446 // anymore. 447 current_script_data_ = NULL; 448 449 ReleaseAllExecutors(); 450 } 451 452 int MultiThreadedProxyResolver::SetPacScript( 453 const scoped_refptr<ProxyResolverScriptData>& script_data, 454 const CompletionCallback&callback) { 455 DCHECK(CalledOnValidThread()); 456 DCHECK(!callback.is_null()); 457 458 // Save the script details, so we can provision new executors later. 459 current_script_data_ = script_data; 460 461 // The user should not have any outstanding requests when they call 462 // SetPacScript(). 463 CheckNoOutstandingUserRequests(); 464 465 // Destroy all of the current threads and their proxy resolvers. 466 ReleaseAllExecutors(); 467 468 // Provision a new executor, and run the SetPacScript request. On completion 469 // notification will be sent through |callback|. 470 Executor* executor = AddNewExecutor(); 471 executor->StartJob(new SetPacScriptJob(script_data, callback)); 472 return ERR_IO_PENDING; 473 } 474 475 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const { 476 DCHECK(CalledOnValidThread()); 477 CHECK_EQ(0u, pending_jobs_.size()); 478 479 for (ExecutorList::const_iterator it = executors_.begin(); 480 it != executors_.end(); ++it) { 481 const Executor* executor = it->get(); 482 Job* job = executor->outstanding_job(); 483 // The "has_user_callback()" is to exclude jobs for which the callback 484 // has already been invoked, or was not user-initiated (as in the case of 485 // lazy thread provisions). User-initiated jobs may !has_user_callback() 486 // when the callback has already been run. (Since we only clear the 487 // outstanding job AFTER the callback has been invoked, it is possible 488 // for a new request to be started from within the callback). 489 CHECK(!job || job->was_cancelled() || !job->has_user_callback()); 490 } 491 } 492 493 void MultiThreadedProxyResolver::ReleaseAllExecutors() { 494 DCHECK(CalledOnValidThread()); 495 for (ExecutorList::iterator it = executors_.begin(); 496 it != executors_.end(); ++it) { 497 Executor* executor = it->get(); 498 executor->Destroy(); 499 } 500 executors_.clear(); 501 } 502 503 MultiThreadedProxyResolver::Executor* 504 MultiThreadedProxyResolver::FindIdleExecutor() { 505 DCHECK(CalledOnValidThread()); 506 for (ExecutorList::iterator it = executors_.begin(); 507 it != executors_.end(); ++it) { 508 Executor* executor = it->get(); 509 if (!executor->outstanding_job()) 510 return executor; 511 } 512 return NULL; 513 } 514 515 MultiThreadedProxyResolver::Executor* 516 MultiThreadedProxyResolver::AddNewExecutor() { 517 DCHECK(CalledOnValidThread()); 518 DCHECK_LT(executors_.size(), max_num_threads_); 519 // The "thread number" is used to give the thread a unique name. 520 int thread_number = executors_.size(); 521 ProxyResolver* resolver = resolver_factory_->CreateProxyResolver(); 522 Executor* executor = new Executor( 523 this, resolver, thread_number); 524 executors_.push_back(make_scoped_refptr(executor)); 525 return executor; 526 } 527 528 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) { 529 DCHECK(CalledOnValidThread()); 530 if (pending_jobs_.empty()) 531 return; 532 533 // Get the next job to process (FIFO). Transfer it from the pending queue 534 // to the executor. 535 scoped_refptr<Job> job = pending_jobs_.front(); 536 pending_jobs_.pop_front(); 537 executor->StartJob(job.get()); 538 } 539 540 } // namespace net 541