Home | History | Annotate | Download | only in proxy
      1 // Copyright (c) 2012 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/proxy/multi_threaded_proxy_resolver.h"
      6 
      7 #include "base/bind.h"
      8 #include "base/bind_helpers.h"
      9 #include "base/message_loop/message_loop_proxy.h"
     10 #include "base/metrics/histogram.h"
     11 #include "base/strings/string_util.h"
     12 #include "base/strings/stringprintf.h"
     13 #include "base/threading/thread.h"
     14 #include "base/threading/thread_restrictions.h"
     15 #include "net/base/net_errors.h"
     16 #include "net/base/net_log.h"
     17 #include "net/proxy/proxy_info.h"
     18 
     19 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
     20 //               data when SetPacScript fails. That will reclaim memory when
     21 //               testing bogus scripts.
     22 
     23 namespace net {
     24 
     25 // An "executor" is a job-runner for PAC requests. It encapsulates a worker
     26 // thread and a synchronous ProxyResolver (which will be operated on said
     27 // thread.)
     28 class MultiThreadedProxyResolver::Executor
     29     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
     30  public:
     31   // |coordinator| must remain valid throughout our lifetime. It is used to
     32   // signal when the executor is ready to receive work by calling
     33   // |coordinator->OnExecutorReady()|.
     34   // The constructor takes ownership of |resolver|.
     35   // |thread_number| is an identifier used when naming the worker thread.
     36   Executor(MultiThreadedProxyResolver* coordinator,
     37            ProxyResolver* resolver,
     38            int thread_number);
     39 
     40   // Submit a job to this executor.
     41   void StartJob(Job* job);
     42 
     43   // Callback for when a job has completed running on the executor's thread.
     44   void OnJobCompleted(Job* job);
     45 
     46   // Cleanup the executor. Cancels all outstanding work, and frees the thread
     47   // and resolver.
     48   void Destroy();
     49 
     50   void PurgeMemory();
     51 
     52   // Returns the outstanding job, or NULL.
     53   Job* outstanding_job() const { return outstanding_job_.get(); }
     54 
     55   ProxyResolver* resolver() { return resolver_.get(); }
     56 
     57   int thread_number() const { return thread_number_; }
     58 
     59  private:
     60   friend class base::RefCountedThreadSafe<Executor>;
     61   ~Executor();
     62 
     63   MultiThreadedProxyResolver* coordinator_;
     64   const int thread_number_;
     65 
     66   // The currently active job for this executor (either a SetPacScript or
     67   // GetProxyForURL task).
     68   scoped_refptr<Job> outstanding_job_;
     69 
     70   // The synchronous resolver implementation.
     71   scoped_ptr<ProxyResolver> resolver_;
     72 
     73   // The thread where |resolver_| is run on.
     74   // Note that declaration ordering is important here. |thread_| needs to be
     75   // destroyed *before* |resolver_|, in case |resolver_| is currently
     76   // executing on |thread_|.
     77   scoped_ptr<base::Thread> thread_;
     78 };
     79 
     80 // MultiThreadedProxyResolver::Job ---------------------------------------------
     81 
     82 class MultiThreadedProxyResolver::Job
     83     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
     84  public:
     85   // Identifies the subclass of Job (only being used for debugging purposes).
     86   enum Type {
     87     TYPE_GET_PROXY_FOR_URL,
     88     TYPE_SET_PAC_SCRIPT,
     89     TYPE_SET_PAC_SCRIPT_INTERNAL,
     90   };
     91 
     92   Job(Type type, const CompletionCallback& callback)
     93       : type_(type),
     94         callback_(callback),
     95         executor_(NULL),
     96         was_cancelled_(false) {
     97   }
     98 
     99   void set_executor(Executor* executor) {
    100     executor_ = executor;
    101   }
    102 
    103   // The "executor" is the job runner that is scheduling this job. If
    104   // this job has not been submitted to an executor yet, this will be
    105   // NULL (and we know it hasn't started yet).
    106   Executor* executor() {
    107     return executor_;
    108   }
    109 
    110   // Mark the job as having been cancelled.
    111   void Cancel() {
    112     was_cancelled_ = true;
    113   }
    114 
    115   // Returns true if Cancel() has been called.
    116   bool was_cancelled() const { return was_cancelled_; }
    117 
    118   Type type() const { return type_; }
    119 
    120   // Returns true if this job still has a user callback. Some jobs
    121   // do not have a user callback, because they were helper jobs
    122   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
    123   //
    124   // Otherwise jobs that correspond with user-initiated work will
    125   // have a non-null callback up until the callback is run.
    126   bool has_user_callback() const { return !callback_.is_null(); }
    127 
    128   // This method is called when the job is inserted into a wait queue
    129   // because no executors were ready to accept it.
    130   virtual void WaitingForThread() {}
    131 
    132   // This method is called just before the job is posted to the work thread.
    133   virtual void FinishedWaitingForThread() {}
    134 
    135   // This method is called on the worker thread to do the job's work. On
    136   // completion, implementors are expected to call OnJobCompleted() on
    137   // |origin_loop|.
    138   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) = 0;
    139 
    140  protected:
    141   void OnJobCompleted() {
    142     // |executor_| will be NULL if the executor has already been deleted.
    143     if (executor_)
    144       executor_->OnJobCompleted(this);
    145   }
    146 
    147   void RunUserCallback(int result) {
    148     DCHECK(has_user_callback());
    149     CompletionCallback callback = callback_;
    150     // Reset the callback so has_user_callback() will now return false.
    151     callback_.Reset();
    152     callback.Run(result);
    153   }
    154 
    155   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
    156 
    157   virtual ~Job() {}
    158 
    159  private:
    160   const Type type_;
    161   CompletionCallback callback_;
    162   Executor* executor_;
    163   bool was_cancelled_;
    164 };
    165 
    166 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
    167 
    168 // Runs on the worker thread to call ProxyResolver::SetPacScript.
    169 class MultiThreadedProxyResolver::SetPacScriptJob
    170     : public MultiThreadedProxyResolver::Job {
    171  public:
    172   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
    173                   const CompletionCallback& callback)
    174     : Job(!callback.is_null() ? TYPE_SET_PAC_SCRIPT :
    175                                 TYPE_SET_PAC_SCRIPT_INTERNAL,
    176           callback),
    177       script_data_(script_data) {
    178   }
    179 
    180   // Runs on the worker thread.
    181   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
    182     ProxyResolver* resolver = executor()->resolver();
    183     int rv = resolver->SetPacScript(script_data_, CompletionCallback());
    184 
    185     DCHECK_NE(rv, ERR_IO_PENDING);
    186     origin_loop->PostTask(
    187         FROM_HERE,
    188         base::Bind(&SetPacScriptJob::RequestComplete, this, rv));
    189   }
    190 
    191  protected:
    192   virtual ~SetPacScriptJob() {}
    193 
    194  private:
    195   // Runs the completion callback on the origin thread.
    196   void RequestComplete(int result_code) {
    197     // The task may have been cancelled after it was started.
    198     if (!was_cancelled() && has_user_callback()) {
    199       RunUserCallback(result_code);
    200     }
    201     OnJobCompleted();
    202   }
    203 
    204   const scoped_refptr<ProxyResolverScriptData> script_data_;
    205 };
    206 
    207 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
    208 
    209 class MultiThreadedProxyResolver::GetProxyForURLJob
    210     : public MultiThreadedProxyResolver::Job {
    211  public:
    212   // |url|         -- the URL of the query.
    213   // |results|     -- the structure to fill with proxy resolve results.
    214   GetProxyForURLJob(const GURL& url,
    215                     ProxyInfo* results,
    216                     const CompletionCallback& callback,
    217                     const BoundNetLog& net_log)
    218       : Job(TYPE_GET_PROXY_FOR_URL, callback),
    219         results_(results),
    220         net_log_(net_log),
    221         url_(url),
    222         was_waiting_for_thread_(false) {
    223     DCHECK(!callback.is_null());
    224     start_time_ = base::TimeTicks::Now();
    225   }
    226 
    227   BoundNetLog* net_log() { return &net_log_; }
    228 
    229   virtual void WaitingForThread() OVERRIDE {
    230     was_waiting_for_thread_ = true;
    231     net_log_.BeginEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
    232   }
    233 
    234   virtual void FinishedWaitingForThread() OVERRIDE {
    235     DCHECK(executor());
    236 
    237     submitted_to_thread_time_ = base::TimeTicks::Now();
    238 
    239     if (was_waiting_for_thread_) {
    240       net_log_.EndEvent(NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD);
    241     }
    242 
    243     net_log_.AddEvent(
    244         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
    245         NetLog::IntegerCallback("thread_number", executor()->thread_number()));
    246   }
    247 
    248   // Runs on the worker thread.
    249   virtual void Run(scoped_refptr<base::MessageLoopProxy> origin_loop) OVERRIDE {
    250     ProxyResolver* resolver = executor()->resolver();
    251     int rv = resolver->GetProxyForURL(
    252         url_, &results_buf_, CompletionCallback(), NULL, net_log_);
    253     DCHECK_NE(rv, ERR_IO_PENDING);
    254 
    255     origin_loop->PostTask(
    256         FROM_HERE,
    257         base::Bind(&GetProxyForURLJob::QueryComplete, this, rv));
    258   }
    259 
    260  protected:
    261   virtual ~GetProxyForURLJob() {}
    262 
    263  private:
    264   // Runs the completion callback on the origin thread.
    265   void QueryComplete(int result_code) {
    266     // The Job may have been cancelled after it was started.
    267     if (!was_cancelled()) {
    268       RecordPerformanceMetrics();
    269       if (result_code >= OK) {  // Note: unit-tests use values > 0.
    270         results_->Use(results_buf_);
    271       }
    272       RunUserCallback(result_code);
    273     }
    274     OnJobCompleted();
    275   }
    276 
    277   void RecordPerformanceMetrics() {
    278     DCHECK(!was_cancelled());
    279 
    280     base::TimeTicks now = base::TimeTicks::Now();
    281 
    282     // Log the total time the request took to complete.
    283     UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Time",
    284                                now - start_time_);
    285 
    286     // Log the time the request was stalled waiting for a thread to free up.
    287     UMA_HISTOGRAM_MEDIUM_TIMES("Net.MTPR_GetProxyForUrl_Thread_Wait_Time",
    288                                submitted_to_thread_time_ - start_time_);
    289   }
    290 
    291   // Must only be used on the "origin" thread.
    292   ProxyInfo* results_;
    293 
    294   // Can be used on either "origin" or worker thread.
    295   BoundNetLog net_log_;
    296   const GURL url_;
    297 
    298   // Usable from within DoQuery on the worker thread.
    299   ProxyInfo results_buf_;
    300 
    301   base::TimeTicks start_time_;
    302   base::TimeTicks submitted_to_thread_time_;
    303 
    304   bool was_waiting_for_thread_;
    305 };
    306 
    307 // MultiThreadedProxyResolver::Executor ----------------------------------------
    308 
    309 MultiThreadedProxyResolver::Executor::Executor(
    310     MultiThreadedProxyResolver* coordinator,
    311     ProxyResolver* resolver,
    312     int thread_number)
    313     : coordinator_(coordinator),
    314       thread_number_(thread_number),
    315       resolver_(resolver) {
    316   DCHECK(coordinator);
    317   DCHECK(resolver);
    318   // Start up the thread.
    319   // Note that it is safe to pass a temporary C-String to Thread(), as it will
    320   // make a copy.
    321   std::string thread_name =
    322       base::StringPrintf("PAC thread #%d", thread_number);
    323   thread_.reset(new base::Thread(thread_name.c_str()));
    324   CHECK(thread_->Start());
    325 }
    326 
    327 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
    328   DCHECK(!outstanding_job_.get());
    329   outstanding_job_ = job;
    330 
    331   // Run the job. Once it has completed (regardless of whether it was
    332   // cancelled), it will invoke OnJobCompleted() on this thread.
    333   job->set_executor(this);
    334   job->FinishedWaitingForThread();
    335   thread_->message_loop()->PostTask(
    336       FROM_HERE,
    337       base::Bind(&Job::Run, job, base::MessageLoopProxy::current()));
    338 }
    339 
    340 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
    341   DCHECK_EQ(job, outstanding_job_.get());
    342   outstanding_job_ = NULL;
    343   coordinator_->OnExecutorReady(this);
    344 }
    345 
    346 void MultiThreadedProxyResolver::Executor::Destroy() {
    347   DCHECK(coordinator_);
    348 
    349   {
    350     // See http://crbug.com/69710.
    351     base::ThreadRestrictions::ScopedAllowIO allow_io;
    352 
    353     // Join the worker thread.
    354     thread_.reset();
    355   }
    356 
    357   // Cancel any outstanding job.
    358   if (outstanding_job_.get()) {
    359     outstanding_job_->Cancel();
    360     // Orphan the job (since this executor may be deleted soon).
    361     outstanding_job_->set_executor(NULL);
    362   }
    363 
    364   // It is now safe to free the ProxyResolver, since all the tasks that
    365   // were using it on the resolver thread have completed.
    366   resolver_.reset();
    367 
    368   // Null some stuff as a precaution.
    369   coordinator_ = NULL;
    370   outstanding_job_ = NULL;
    371 }
    372 
    373 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
    374   thread_->message_loop()->PostTask(
    375       FROM_HERE,
    376       base::Bind(&ProxyResolver::PurgeMemory,
    377                  base::Unretained(resolver_.get())));
    378 }
    379 
    380 MultiThreadedProxyResolver::Executor::~Executor() {
    381   // The important cleanup happens as part of Destroy(), which should always be
    382   // called first.
    383   DCHECK(!coordinator_) << "Destroy() was not called";
    384   DCHECK(!thread_.get());
    385   DCHECK(!resolver_.get());
    386   DCHECK(!outstanding_job_.get());
    387 }
    388 
    389 // MultiThreadedProxyResolver --------------------------------------------------
    390 
    391 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
    392     ProxyResolverFactory* resolver_factory,
    393     size_t max_num_threads)
    394     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
    395       resolver_factory_(resolver_factory),
    396       max_num_threads_(max_num_threads) {
    397   DCHECK_GE(max_num_threads, 1u);
    398 }
    399 
    400 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
    401   // We will cancel all outstanding requests.
    402   pending_jobs_.clear();
    403   ReleaseAllExecutors();
    404 }
    405 
    406 int MultiThreadedProxyResolver::GetProxyForURL(
    407     const GURL& url, ProxyInfo* results, const CompletionCallback& callback,
    408     RequestHandle* request, const BoundNetLog& net_log) {
    409   DCHECK(CalledOnValidThread());
    410   DCHECK(!callback.is_null());
    411   DCHECK(current_script_data_.get())
    412       << "Resolver is un-initialized. Must call SetPacScript() first!";
    413 
    414   scoped_refptr<GetProxyForURLJob> job(
    415       new GetProxyForURLJob(url, results, callback, net_log));
    416 
    417   // Completion will be notified through |callback|, unless the caller cancels
    418   // the request using |request|.
    419   if (request)
    420     *request = reinterpret_cast<RequestHandle>(job.get());
    421 
    422   // If there is an executor that is ready to run this request, submit it!
    423   Executor* executor = FindIdleExecutor();
    424   if (executor) {
    425     DCHECK_EQ(0u, pending_jobs_.size());
    426     executor->StartJob(job.get());
    427     return ERR_IO_PENDING;
    428   }
    429 
    430   // Otherwise queue this request. (We will schedule it to a thread once one
    431   // becomes available).
    432   job->WaitingForThread();
    433   pending_jobs_.push_back(job);
    434 
    435   // If we haven't already reached the thread limit, provision a new thread to
    436   // drain the requests more quickly.
    437   if (executors_.size() < max_num_threads_) {
    438     executor = AddNewExecutor();
    439     executor->StartJob(
    440         new SetPacScriptJob(current_script_data_, CompletionCallback()));
    441   }
    442 
    443   return ERR_IO_PENDING;
    444 }
    445 
    446 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
    447   DCHECK(CalledOnValidThread());
    448   DCHECK(req);
    449 
    450   Job* job = reinterpret_cast<Job*>(req);
    451   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
    452 
    453   if (job->executor()) {
    454     // If the job was already submitted to the executor, just mark it
    455     // as cancelled so the user callback isn't run on completion.
    456     job->Cancel();
    457   } else {
    458     // Otherwise the job is just sitting in a queue.
    459     PendingJobsQueue::iterator it =
    460         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
    461     DCHECK(it != pending_jobs_.end());
    462     pending_jobs_.erase(it);
    463   }
    464 }
    465 
    466 LoadState MultiThreadedProxyResolver::GetLoadState(RequestHandle req) const {
    467   DCHECK(CalledOnValidThread());
    468   DCHECK(req);
    469   return LOAD_STATE_RESOLVING_PROXY_FOR_URL;
    470 }
    471 
    472 void MultiThreadedProxyResolver::CancelSetPacScript() {
    473   DCHECK(CalledOnValidThread());
    474   DCHECK_EQ(0u, pending_jobs_.size());
    475   DCHECK_EQ(1u, executors_.size());
    476   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
    477             executors_[0]->outstanding_job()->type());
    478 
    479   // Defensively clear some data which shouldn't be getting used
    480   // anymore.
    481   current_script_data_ = NULL;
    482 
    483   ReleaseAllExecutors();
    484 }
    485 
    486 void MultiThreadedProxyResolver::PurgeMemory() {
    487   DCHECK(CalledOnValidThread());
    488   for (ExecutorList::iterator it = executors_.begin();
    489        it != executors_.end(); ++it) {
    490     Executor* executor = it->get();
    491     executor->PurgeMemory();
    492   }
    493 }
    494 
    495 int MultiThreadedProxyResolver::SetPacScript(
    496     const scoped_refptr<ProxyResolverScriptData>& script_data,
    497     const CompletionCallback&callback) {
    498   DCHECK(CalledOnValidThread());
    499   DCHECK(!callback.is_null());
    500 
    501   // Save the script details, so we can provision new executors later.
    502   current_script_data_ = script_data;
    503 
    504   // The user should not have any outstanding requests when they call
    505   // SetPacScript().
    506   CheckNoOutstandingUserRequests();
    507 
    508   // Destroy all of the current threads and their proxy resolvers.
    509   ReleaseAllExecutors();
    510 
    511   // Provision a new executor, and run the SetPacScript request. On completion
    512   // notification will be sent through |callback|.
    513   Executor* executor = AddNewExecutor();
    514   executor->StartJob(new SetPacScriptJob(script_data, callback));
    515   return ERR_IO_PENDING;
    516 }
    517 
    518 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
    519   DCHECK(CalledOnValidThread());
    520   CHECK_EQ(0u, pending_jobs_.size());
    521 
    522   for (ExecutorList::const_iterator it = executors_.begin();
    523        it != executors_.end(); ++it) {
    524     const Executor* executor = it->get();
    525     Job* job = executor->outstanding_job();
    526     // The "has_user_callback()" is to exclude jobs for which the callback
    527     // has already been invoked, or was not user-initiated (as in the case of
    528     // lazy thread provisions). User-initiated jobs may !has_user_callback()
    529     // when the callback has already been run. (Since we only clear the
    530     // outstanding job AFTER the callback has been invoked, it is possible
    531     // for a new request to be started from within the callback).
    532     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
    533   }
    534 }
    535 
    536 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
    537   DCHECK(CalledOnValidThread());
    538   for (ExecutorList::iterator it = executors_.begin();
    539        it != executors_.end(); ++it) {
    540     Executor* executor = it->get();
    541     executor->Destroy();
    542   }
    543   executors_.clear();
    544 }
    545 
    546 MultiThreadedProxyResolver::Executor*
    547 MultiThreadedProxyResolver::FindIdleExecutor() {
    548   DCHECK(CalledOnValidThread());
    549   for (ExecutorList::iterator it = executors_.begin();
    550        it != executors_.end(); ++it) {
    551     Executor* executor = it->get();
    552     if (!executor->outstanding_job())
    553       return executor;
    554   }
    555   return NULL;
    556 }
    557 
    558 MultiThreadedProxyResolver::Executor*
    559 MultiThreadedProxyResolver::AddNewExecutor() {
    560   DCHECK(CalledOnValidThread());
    561   DCHECK_LT(executors_.size(), max_num_threads_);
    562   // The "thread number" is used to give the thread a unique name.
    563   int thread_number = executors_.size();
    564   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
    565   Executor* executor = new Executor(
    566       this, resolver, thread_number);
    567   executors_.push_back(make_scoped_refptr(executor));
    568   return executor;
    569 }
    570 
    571 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
    572   DCHECK(CalledOnValidThread());
    573   if (pending_jobs_.empty())
    574     return;
    575 
    576   // Get the next job to process (FIFO). Transfer it from the pending queue
    577   // to the executor.
    578   scoped_refptr<Job> job = pending_jobs_.front();
    579   pending_jobs_.pop_front();
    580   executor->StartJob(job.get());
    581 }
    582 
    583 }  // namespace net
    584