Home | History | Annotate | Download | only in proxy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
#include "net/proxy/multi_threaded_proxy_resolver.h"

#include <algorithm>

#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/message_loop/message_loop_proxy.h"
#include "base/strings/string_util.h"
#include "base/strings/stringprintf.h"
#include "base/threading/thread.h"
#include "base/threading/thread_restrictions.h"
#include "net/base/net_errors.h"
#include "net/base/net_log.h"
#include "net/proxy/proxy_info.h"
     17 
     18 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
     19 //               data when SetPacScript fails. That will reclaim memory when
     20 //               testing bogus scripts.
     21 
     22 namespace net {
     23 
     24 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
     25 // thread and a synchronous ProxyResolver (which will be operated on said
     26 // thread.)
     27 class MultiThreadedProxyResolver::Executor
     28     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
     29  public:
     30   // |coordinator| must remain valid throughout our lifetime. It is used to
     31   // signal when the executor is ready to receive work by calling
     32   // |coordinator->OnExecutorReady()|.
     33   // The constructor takes ownership of |resolver|.
     34   // |thread_number| is an identifier used when naming the worker thread.
     35   Executor(MultiThreadedProxyResolver* coordinator,
     36            ProxyResolver* resolver,
     37            int thread_number);
     38 
     39   // Submit a job to this executor.
     40   void StartJob(Job* job);
     41 
     42   // Callback for when a job has completed running on the executor's thread.
     43   void OnJobCompleted(Job* job);
     44 
     45   // Cleanup the executor. Cancels all outstanding work, and frees the thread
     46   // and resolver.
     47   void Destroy();
     48 
     49   // Returns the outstanding job, or NULL.
     50   Job* outstanding_job() const { return outstanding_job_.get(); }
     51 
     52   ProxyResolver* resolver() { return resolver_.get(); }
     53 
     54   int thread_number() const { return thread_number_; }
     55 
     56  private:
     57   friend class base::RefCountedThreadSafe<Executor>;
     58   ~Executor();
     59 
     60   MultiThreadedProxyResolver* coordinator_;
     61   const int thread_number_;
     62 
     63   // The currently active job for this executor (either a SetPacScript or
     64   // GetProxyForURL task).
     65   scoped_refptr<Job> outstanding_job_;
     66 
     67   // The synchronous resolver implementation.
     68   scoped_ptr<ProxyResolver> resolver_;
     69 
     70   // The thread where |resolver_| is run on.
     71   // Note that declaration ordering is important here. |thread_| needs to be
     72   // destroyed *before* |resolver_|, in case |resolver_| is currently
     73   // executing on |thread_|.
     74   scoped_ptr<base::Thread> thread_;
     75 };
     76 
     77 // MultiThreadedProxyResolver::Job ---------------------------------------------
     78 
     79 class MultiThreadedProxyResolver::Job
     80     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
     81  public:
     82   // Identifies the subclass of Job (only being used for debugging purposes).
     83   enum Type {
     84     TYPE_GET_PROXY_FOR_URL,
     85     TYPE_SET_PAC_SCRIPT,
     86     TYPE_SET_PAC_SCRIPT_INTERNAL,
     87   };
     88 
     89   Job(Type type, const CompletionCallback& callback)
     90       : type_(type),
     91         callback_(callback),
     92         executor_(NULL),
     93         was_cancelled_(false) {
     94   }
     95 
     96   void set_executor(Executor* executor) {
     97     executor_ = executor;
     98   }
     99 
    100   // The "executor" is the job runner that is scheduling this job. If
    101   // this job has not been submitted to an executor yet, this will be
    102   // NULL (and we know it hasn't started yet).
    103   Executor* executor() {
    104     return executor_;
    105   }
    106 
    107   // Mark the job as having been cancelled.
    108   void Cancel() {
    109     was_cancelled_ = true;
    110   }
    111 
    112   // Returns true if Cancel() has been called.
    113   bool was_cancelled() const { return was_cancelled_; }
    114 
    115   Type type() const { return type_; }
    116 
    117   // Returns true if this job still has a user callback. Some jobs
    118   // do not have a user callback, because they were helper jobs
    119   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
    120   //
    121   // Otherwise jobs that correspond with user-initiated work will
    122   // have a non-null callback up until the callback is run.
    123   bool has_user_callback() const { return !callback_.is_null(); }
    124 
    125   // This method is called when the job is inserted into a wait queue
    126   // because no executors were ready to accept it.
    127   virtual void WaitingForThread() {}
    128 
    129   // This method is called just before the job is posted to the work thread.
    130   virtual void FinishedWaitingForThread() {}
    131 
    132   // This method is called on the worker thread to do the job's work. On
    133   // completion, implementors are expected to call OnJobCompleted() on
    134   // |origin_loop|.
    135   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
    136 
    137  protected:
    138   void OnJobCompleted() {
    139     // |executor_| will be NULL if the executor has already been deleted.
    140     if (executor_)
    141       executor_->OnJobCompleted(this);
    142   }
    143 
    144   void RunUserCallback(int result) {
    145     DCHECK(has_user_callback());
    146     CompletionCallback callback = callback_;
    147     // Reset the callback so has_user_callback() will now return false.
    148     callback_.Reset();
    149     callback.Run(result);
    150   }
    151 
    152   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
    153 
    154   virtual ~Job() {}
    155 
    156  private:
    157   const Type type_;
    158   CompletionCallback callback_;
    159   Executor* executor_;
    160   bool was_cancelled_;
    161 };
    162 
    163 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
    164 
    165 // Runs on the worker thread to call ProxyResolver::SetPacScript.
    166 class MultiThreadedProxyResolver::SetPacScriptJob
    167     : public MultiThreadedProxyResolver::Job {
    168  public:
    169   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
    170                   const CompletionCallback& callback)
    171     : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
    172                                 TYPE_SET_PAC_SCRIPT_INTERNAL,
    173           callback),
    174       script_data_(script_data) {
    175   }
    176 
    177   // Runs on the worker thread.
    178   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
    179     ProxyResolver* resolver = executor()->resolver();
    180     int rv = resolver->SetPacScript(script_data_, CompletionCallback());
    181 
    182     DCHECK_NE(rv, ERR_IO_PENDING);
    183     origin_loop->PostTask(
    184         FROM_HERE,
    185         base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
    186   }
    187 
    188  protected:
    189   virtual ~SetPacScriptJob() {}
    190 
    191  private:
    192   // Runs the completion callback on the origin thread.
    193   void RequestComplete(int result_code) {
    194     // The task may have been cancelled after it was started.
    195     if (!was_cancelled() && has_user_callback()) {
    196       RunUserCallback(result_code);
    197     }
    198     OnJobCompleted();
    199   }
    200 
    201   const scoped_refptr<ProxyResolverScriptData> script_data_;
    202 };
    203 
    204 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
    205 
    206 class MultiThreadedProxyResolver::GetProxyForURLJob
    207     : public MultiThreadedProxyResolver::Job {
    208  public:
    209   // |url|         -- the URL of the query.
    210   // |results|     -- the structure to fill with proxy resolve results.
    211   GetProxyForURLJob(const GURL& url,
    212                     ProxyInfo* results,
    213                     const CompletionCallback& callback,
    214                     const BoundNetLog& net_log)
    215       : Job(TYPE_GET_PROXY_FOR_URL, callback),
    216         results_(results),
    217         net_log_(net_log),
    218         url_(url),
    219         was_waiting_for_thread_(false) {
    220     DCHECK(!callback.is_null());
    221   }
    222 
    223   BoundNetLog* net_log() { return &net_log_; }
    224 
    225   virtual void WaitingForThread() OVERRIDE {
    226     was_waiting_for_thread_ = true;
    227     net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
    228   }
    229 
    230   virtual void FinishedWaitingForThread() OVERRIDE {
    231     DCHECK(executor());
    232 
    233     if (was_waiting_for_thread_) {
    234       net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
    235     }
    236 
    237     net_log_.AddEvent(
    238         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
    239         NetLog::IntegerCallback("thread_number", executor()->thread_number()));
    240   }
    241 
    242   // Runs on the worker thread.
    243   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
    244     ProxyResolver* resolver = executor()->resolver();
    245     int rv = resolver->GetProxyForURL(
    246         url_, &results_buf_, CompletionCallback(), NULL, net_log_);
    247     DCHECK_NE(rv, ERR_IO_PENDING);
    248 
    249     origin_loop->PostTask(
    250         FROM_HERE,
    251         base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
    252   }
    253 
    254  protected:
    255   virtual ~GetProxyForURLJob() {}
    256 
    257  private:
    258   // Runs the completion callback on the origin thread.
    259   void QueryComplete(int result_code) {
    260     // The Job may have been cancelled after it was started.
    261     if (!was_cancelled()) {
    262       if (result_code >= OK) {  // Note: unit-tests use values > 0.
    263         results_->Use(results_buf_);
    264       }
    265       RunUserCallback(result_code);
    266     }
    267     OnJobCompleted();
    268   }
    269 
    270   // Must only be used on the "origin" thread.
    271   ProxyInfo* results_;
    272 
    273   // Can be used on either "origin" or worker thread.
    274   BoundNetLog net_log_;
    275   const GURL url_;
    276 
    277   // Usable from within DoQuery on the worker thread.
    278   ProxyInfo results_buf_;
    279 
    280   bool was_waiting_for_thread_;
    281 };
    282 
    283 // MultiThreadedProxyResolver::Executor ----------------------------------------
    284 
    285 MultiThreadedProxyResolver::Executor::Executor(
    286     MultiThreadedProxyResolver* coordinator,
    287     ProxyResolver* resolver,
    288     int thread_number)
    289     : coordinator_(coordinator),
    290       thread_number_(thread_number),
    291       resolver_(resolver) {
    292   DCHECK(coordinator);
    293   DCHECK(resolver);
    294   // Start up the thread.
    295   thread_.reset(new base::Thread(base::StringPrintf("PAC thread #%d",
    296                                                     thread_number)));
    297   CHECK(thread_->Start());
    298 }
    299 
    300 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
    301   DCHECK(!outstanding_job_.get());
    302   outstanding_job_ = job;
    303 
    304   // Run the job. Once it has completed (regardless of whether it was
    305   // cancelled), it will invoke OnJobCompleted() on this thread.
    306   job->set_executor(this);
    307   job->FinishedWaitingForThread();
    308   thread_->message_loop()->PostTask(
    309       FROM_HERE,
    310       base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
    311 }
    312 
    313 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
    314   DCHECK_EQ(job, outstanding_job_.get());
    315   outstanding_job_ = NULL;
    316   coordinator_->OnExecutorReady(this);
    317 }
    318 
    319 void MultiThreadedProxyResolver::Executor::Destroy() {
    320   DCHECK(coordinator_);
    321 
    322   {
    323     // See http://crbug.com/69710.
    324     base::ThreadRestrictions::ScopedAllowIO allow_io;
    325 
    326     // Join the worker thread.
    327     thread_.reset();
    328   }
    329 
    330   // Cancel any outstanding job.
    331   if (outstanding_job_.get()) {
    332     outstanding_job_->Cancel();
    333     // Orphan the job (since this executor may be deleted soon).
    334     outstanding_job_->set_executor(NULL);
    335   }
    336 
    337   // It is now safe to free the ProxyResolver, since all the tasks that
    338   // were using it on the resolver thread have completed.
    339   resolver_.reset();
    340 
    341   // Null some stuff as a precaution.
    342   coordinator_ = NULL;
    343   outstanding_job_ = NULL;
    344 }
    345 
    346 MultiThreadedProxyResolver::Executor::~Executor() {
    347   // The important cleanup happens as part of Destroy(), which should always be
    348   // called first.
    349   DCHECK(!coordinator_) << "Destroy() was not called";
    350   DCHECK(!thread_.get());
    351   DCHECK(!resolver_.get());
    352   DCHECK(!outstanding_job_.get());
    353 }
    354 
    355 // MultiThreadedProxyResolver --------------------------------------------------
    356 
    357 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
    358     ProxyResolverFactory* resolver_factory,
    359     size_t max_num_threads)
    360     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
    361       resolver_factory_(resolver_factory),
    362       max_num_threads_(max_num_threads) {
    363   DCHECK_GE(max_num_threads, 1u);
    364 }
    365 
    366 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
    367   // We will cancel all outstanding requests.
    368   pending_jobs_.clear();
    369   ReleaseAllExecutors();
    370 }
    371 
    372 int MultiThreadedProxyResolver::GetProxyForURL(
    373     const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
    374     RequestHandle* request, const BoundNetLog& net_log) {
    375   DCHECK(CalledOnValidThread());
    376   DCHECK(!callback.is_null());
    377   DCHECK(current_script_data_.get())
    378       << "Resolver is un-initialized. Must call SetPacScript() first!";
    379 
    380   scoped_refptr<GetProxyForURLJob> job(
    381       new GetProxyForURLJob(url, results, callback, net_log));
    382 
    383   // Completion will be notified through |callback|, unless the caller cancels
    384   // the request using |request|.
    385   if (request)
    386     *request = reinterpret_cast<RequestHandle>(job.get());
    387 
    388   // If there is an executor that is ready to run this request, submit it!
    389   Executor* executor = FindIdleExecutor();
    390   if (executor) {
    391     DCHECK_EQ(0u, pending_jobs_.size());
    392     executor->StartJob(job.get());
    393     return ERR_IO_PENDING;
    394   }
    395 
    396   // Otherwise queue this request. (We will schedule it to a thread once one
    397   // becomes available).
    398   job->WaitingForThread();
    399   pending_jobs_.push_back(job);
    400 
    401   // If we haven't already reached the thread limit, provision a new thread to
    402   // drain the requests more quickly.
    403   if (executors_.size() < max_num_threads_) {
    404     executor = AddNewExecutor();
    405     executor->StartJob(
    406         new SetPacScriptJob(current_script_data_, CompletionCallback()));
    407   }
    408 
    409   return ERR_IO_PENDING;
    410 }
    411 
    412 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
    413   DCHECK(CalledOnValidThread());
    414   DCHECK(req);
    415 
    416   Job* job = reinterpret_cast<Job*>(req);
    417   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
    418 
    419   if (job->executor()) {
    420     // If the job was already submitted to the executor, just mark it
    421     // as cancelled so the user callback isn't run on completion.
    422     job->Cancel();
    423   } else {
    424     // Otherwise the job is just sitting in a queue.
    425     PendingJobsQueue::iterator it =
    426         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
    427     DCHECK(it != pending_jobs_.end());
    428     pending_jobs_.erase(it);
    429   }
    430 }
    431 
    432 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
    433   DCHECK(CalledOnValidThread());
    434   DCHECK(req);
    435   return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
    436 }
    437 
    438 void MultiThreadedProxyResolver::CancelSetPacScript() {
    439   DCHECK(CalledOnValidThread());
    440   DCHECK_EQ(0u, pending_jobs_.size());
    441   DCHECK_EQ(1u, executors_.size());
    442   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
    443             executors_[0]->outstanding_job()->type());
    444 
    445   // Defensively clear some data which shouldn't be getting used
    446   // anymore.
    447   current_script_data_ = NULL;
    448 
    449   ReleaseAllExecutors();
    450 }
    451 
    452 int MultiThreadedProxyResolver::SetPacScript(
    453     const scoped_refptr<ProxyResolverScriptData>& script_data,
    454     const CompletionCallback&callback) {
    455   DCHECK(CalledOnValidThread());
    456   DCHECK(!callback.is_null());
    457 
    458   // Save the script details, so we can provision new executors later.
    459   current_script_data_ = script_data;
    460 
    461   // The user should not have any outstanding requests when they call
    462   // SetPacScript().
    463   CheckNoOutstandingUserRequests();
    464 
    465   // Destroy all of the current threads and their proxy resolvers.
    466   ReleaseAllExecutors();
    467 
    468   // Provision a new executor, and run the SetPacScript request. On completion
    469   // notification will be sent through |callback|.
    470   Executor* executor = AddNewExecutor();
    471   executor->StartJob(new SetPacScriptJob(script_data, callback));
    472   return ERR_IO_PENDING;
    473 }
    474 
    475 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
    476   DCHECK(CalledOnValidThread());
    477   CHECK_EQ(0u, pending_jobs_.size());
    478 
    479   for (ExecutorList::const_iterator it = executors_.begin();
    480        it != executors_.end(); ++it) {
    481     const Executor* executor = it->get();
    482     Job* job = executor->outstanding_job();
    483     // The "has_user_callback()" is to exclude jobs for which the callback
    484     // has already been invoked, or was not user-initiated (as in the case of
    485     // lazy thread provisions). User-initiated jobs may !has_user_callback()
    486     // when the callback has already been run. (Since we only clear the
    487     // outstanding job AFTER the callback has been invoked, it is possible
    488     // for a new request to be started from within the callback).
    489     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
    490   }
    491 }
    492 
    493 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
    494   DCHECK(CalledOnValidThread());
    495   for (ExecutorList::iterator it = executors_.begin();
    496        it != executors_.end(); ++it) {
    497     Executor* executor = it->get();
    498     executor->Destroy();
    499   }
    500   executors_.clear();
    501 }
    502 
    503 MultiThreadedProxyResolver::Executor*
    504 MultiThreadedProxyResolver::FindIdleExecutor() {
    505   DCHECK(CalledOnValidThread());
    506   for (ExecutorList::iterator it = executors_.begin();
    507        it != executors_.end(); ++it) {
    508     Executor* executor = it->get();
    509     if (!executor->outstanding_job())
    510       return executor;
    511   }
    512   return NULL;
    513 }
    514 
    515 MultiThreadedProxyResolver::Executor*
    516 MultiThreadedProxyResolver::AddNewExecutor() {
    517   DCHECK(CalledOnValidThread());
    518   DCHECK_LT(executors_.size(), max_num_threads_);
    519   // The "thread number" is used to give the thread a unique name.
    520   int thread_number = executors_.size();
    521   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
    522   Executor* executor = new Executor(
    523       this, resolver, thread_number);
    524   executors_.push_back(make_scoped_refptr(executor));
    525   return executor;
    526 }
    527 
    528 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
    529   DCHECK(CalledOnValidThread());
    530   if (pending_jobs_.empty())
    531     return;
    532 
    533   // Get the next job to process (FIFO). Transfer it from the pending queue
    534   // to the executor.
    535   scoped_refptr<Job> job = pending_jobs_.front();
    536   pending_jobs_.pop_front();
    537   executor->StartJob(job.get());
    538 }
    539 
    540 }  // namespace net
    541