Home | History | Annotate | Download | only in proxy
      1 // Copyright (c) 2010 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/proxy/multi_threaded_proxy_resolver.h"
      6 
      7 #include "base/message_loop.h"
      8 #include "base/string_util.h"
      9 #include "base/stringprintf.h"
     10 #include "base/threading/thread.h"
     11 #include "base/threading/thread_restrictions.h"
     12 #include "net/base/net_errors.h"
     13 #include "net/base/net_log.h"
     14 #include "net/proxy/proxy_info.h"
     15 
     16 // TODO(eroman): Have the MultiThreadedProxyResolver clear its PAC script
     17 //               data when SetPacScript fails. That will reclaim memory when
     18 //               testing bogus scripts.
     19 
     20 namespace net {
     21 
     22 namespace {
     23 
     24 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
     25  public:
     26   explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
     27   void PurgeMemory() { resolver_->PurgeMemory(); }
     28  private:
     29   friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
     30   ~PurgeMemoryTask() {}
     31   ProxyResolver* resolver_;
     32 };
     33 
     34 }  // namespace
     35 
// An "executor" is a job-runner for PAC requests. It encapsulates a worker
// thread and a synchronous ProxyResolver (which will be operated on said
// thread.)
//
// An executor runs at most one job at a time (StartJob() DCHECKs that no job
// is outstanding). Destroy() must be called before the last reference is
// released; the destructor DCHECKs that this happened.
class MultiThreadedProxyResolver::Executor
    : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Executor > {
 public:
  // |coordinator| must remain valid throughout our lifetime. It is used to
  // signal when the executor is ready to receive work by calling
  // |coordinator->OnExecutorReady()|.
  // The constructor takes ownership of |resolver|.
  // |thread_number| is an identifier used when naming the worker thread.
  // The worker thread is started by the constructor.
  Executor(MultiThreadedProxyResolver* coordinator,
           ProxyResolver* resolver,
           int thread_number);

  // Submit a job to this executor. The job becomes the outstanding job and
  // its Run() method is posted to the worker thread.
  void StartJob(Job* job);

  // Callback for when a job has completed running on the executor's thread.
  // Clears the outstanding job and notifies the coordinator that this
  // executor is ready for more work.
  void OnJobCompleted(Job* job);

  // Cleanup the executor. Cancels all outstanding work, and frees the thread
  // and resolver.
  void Destroy();

  // Posts a task to the worker thread which calls PurgeMemory() on the
  // resolver.
  void PurgeMemory();

  // Returns the outstanding job, or NULL.
  Job* outstanding_job() const { return outstanding_job_.get(); }

  // Returns the synchronous resolver (NULL once Destroy() has run).
  ProxyResolver* resolver() { return resolver_.get(); }

  // Returns the identifier that was passed to the constructor.
  int thread_number() const { return thread_number_; }

 private:
  friend class base::RefCountedThreadSafe<Executor>;
  ~Executor();

  // Set to NULL by Destroy().
  MultiThreadedProxyResolver* coordinator_;
  const int thread_number_;

  // The currently active job for this executor (either a SetPacScript or
  // GetProxyForURL task).
  scoped_refptr<Job> outstanding_job_;

  // The synchronous resolver implementation.
  scoped_ptr<ProxyResolver> resolver_;

  // The thread where |resolver_| is run on.
  // Note that declaration ordering is important here. |thread_| needs to be
  // destroyed *before* |resolver_|, in case |resolver_| is currently
  // executing on |thread_|.
  scoped_ptr<base::Thread> thread_;
};
     90 
     91 // MultiThreadedProxyResolver::Job ---------------------------------------------
     92 
     93 class MultiThreadedProxyResolver::Job
     94     : public base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job> {
     95  public:
     96   // Identifies the subclass of Job (only being used for debugging purposes).
     97   enum Type {
     98     TYPE_GET_PROXY_FOR_URL,
     99     TYPE_SET_PAC_SCRIPT,
    100     TYPE_SET_PAC_SCRIPT_INTERNAL,
    101   };
    102 
    103   Job(Type type, CompletionCallback* user_callback)
    104       : type_(type),
    105         user_callback_(user_callback),
    106         executor_(NULL),
    107         was_cancelled_(false) {
    108   }
    109 
    110   void set_executor(Executor* executor) {
    111     executor_ = executor;
    112   }
    113 
    114   // The "executor" is the job runner that is scheduling this job. If
    115   // this job has not been submitted to an executor yet, this will be
    116   // NULL (and we know it hasn't started yet).
    117   Executor* executor() {
    118     return executor_;
    119   }
    120 
    121   // Mark the job as having been cancelled.
    122   void Cancel() {
    123     was_cancelled_ = true;
    124   }
    125 
    126   // Returns true if Cancel() has been called.
    127   bool was_cancelled() const { return was_cancelled_; }
    128 
    129   Type type() const { return type_; }
    130 
    131   // Returns true if this job still has a user callback. Some jobs
    132   // do not have a user callback, because they were helper jobs
    133   // scheduled internally (for example TYPE_SET_PAC_SCRIPT_INTERNAL).
    134   //
    135   // Otherwise jobs that correspond with user-initiated work will
    136   // have a non-NULL callback up until the callback is run.
    137   bool has_user_callback() const { return user_callback_ != NULL; }
    138 
    139   // This method is called when the job is inserted into a wait queue
    140   // because no executors were ready to accept it.
    141   virtual void WaitingForThread() {}
    142 
    143   // This method is called just before the job is posted to the work thread.
    144   virtual void FinishedWaitingForThread() {}
    145 
    146   // This method is called on the worker thread to do the job's work. On
    147   // completion, implementors are expected to call OnJobCompleted() on
    148   // |origin_loop|.
    149   virtual void Run(MessageLoop* origin_loop) = 0;
    150 
    151  protected:
    152   void OnJobCompleted() {
    153     // |executor_| will be NULL if the executor has already been deleted.
    154     if (executor_)
    155       executor_->OnJobCompleted(this);
    156   }
    157 
    158   void RunUserCallback(int result) {
    159     DCHECK(has_user_callback());
    160     CompletionCallback* callback = user_callback_;
    161     // Null the callback so has_user_callback() will now return false.
    162     user_callback_ = NULL;
    163     callback->Run(result);
    164   }
    165 
    166   friend class base::RefCountedThreadSafe<MultiThreadedProxyResolver::Job>;
    167 
    168   virtual ~Job() {}
    169 
    170  private:
    171   const Type type_;
    172   CompletionCallback* user_callback_;
    173   Executor* executor_;
    174   bool was_cancelled_;
    175 };
    176 
    177 // MultiThreadedProxyResolver::SetPacScriptJob ---------------------------------
    178 
    179 // Runs on the worker thread to call ProxyResolver::SetPacScript.
    180 class MultiThreadedProxyResolver::SetPacScriptJob
    181     : public MultiThreadedProxyResolver::Job {
    182  public:
    183   SetPacScriptJob(const scoped_refptr<ProxyResolverScriptData>& script_data,
    184                   CompletionCallback* callback)
    185     : Job(callback ? TYPE_SET_PAC_SCRIPT : TYPE_SET_PAC_SCRIPT_INTERNAL,
    186           callback),
    187       script_data_(script_data) {
    188   }
    189 
    190   // Runs on the worker thread.
    191   virtual void Run(MessageLoop* origin_loop) {
    192     ProxyResolver* resolver = executor()->resolver();
    193     int rv = resolver->SetPacScript(script_data_, NULL);
    194 
    195     DCHECK_NE(rv, ERR_IO_PENDING);
    196     origin_loop->PostTask(
    197         FROM_HERE,
    198         NewRunnableMethod(this, &SetPacScriptJob::RequestComplete, rv));
    199   }
    200 
    201  private:
    202   // Runs the completion callback on the origin thread.
    203   void RequestComplete(int result_code) {
    204     // The task may have been cancelled after it was started.
    205     if (!was_cancelled() && has_user_callback()) {
    206       RunUserCallback(result_code);
    207     }
    208     OnJobCompleted();
    209   }
    210 
    211   const scoped_refptr<ProxyResolverScriptData> script_data_;
    212 };
    213 
    214 // MultiThreadedProxyResolver::GetProxyForURLJob ------------------------------
    215 
    216 class MultiThreadedProxyResolver::GetProxyForURLJob
    217     : public MultiThreadedProxyResolver::Job {
    218  public:
    219   // |url|         -- the URL of the query.
    220   // |results|     -- the structure to fill with proxy resolve results.
    221   GetProxyForURLJob(const GURL& url,
    222                     ProxyInfo* results,
    223                     CompletionCallback* callback,
    224                     const BoundNetLog& net_log)
    225       : Job(TYPE_GET_PROXY_FOR_URL, callback),
    226         results_(results),
    227         net_log_(net_log),
    228         url_(url),
    229         was_waiting_for_thread_(false) {
    230     DCHECK(callback);
    231   }
    232 
    233   BoundNetLog* net_log() { return &net_log_; }
    234 
    235   virtual void WaitingForThread() {
    236     was_waiting_for_thread_ = true;
    237     net_log_.BeginEvent(
    238         NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
    239   }
    240 
    241   virtual void FinishedWaitingForThread() {
    242     DCHECK(executor());
    243 
    244     if (was_waiting_for_thread_) {
    245       net_log_.EndEvent(
    246           NetLog::TYPE_WAITING_FOR_PROXY_RESOLVER_THREAD, NULL);
    247     }
    248 
    249     net_log_.AddEvent(
    250         NetLog::TYPE_SUBMITTED_TO_RESOLVER_THREAD,
    251         make_scoped_refptr(new NetLogIntegerParameter(
    252             "thread_number", executor()->thread_number())));
    253   }
    254 
    255   // Runs on the worker thread.
    256   virtual void Run(MessageLoop* origin_loop) {
    257     ProxyResolver* resolver = executor()->resolver();
    258     int rv = resolver->GetProxyForURL(
    259         url_, &results_buf_, NULL, NULL, net_log_);
    260     DCHECK_NE(rv, ERR_IO_PENDING);
    261 
    262     origin_loop->PostTask(
    263         FROM_HERE,
    264         NewRunnableMethod(this, &GetProxyForURLJob::QueryComplete, rv));
    265   }
    266 
    267  private:
    268   // Runs the completion callback on the origin thread.
    269   void QueryComplete(int result_code) {
    270     // The Job may have been cancelled after it was started.
    271     if (!was_cancelled()) {
    272       if (result_code >= OK) {  // Note: unit-tests use values > 0.
    273         results_->Use(results_buf_);
    274       }
    275       RunUserCallback(result_code);
    276     }
    277     OnJobCompleted();
    278   }
    279 
    280   // Must only be used on the "origin" thread.
    281   ProxyInfo* results_;
    282 
    283   // Can be used on either "origin" or worker thread.
    284   BoundNetLog net_log_;
    285   const GURL url_;
    286 
    287   // Usable from within DoQuery on the worker thread.
    288   ProxyInfo results_buf_;
    289 
    290   bool was_waiting_for_thread_;
    291 };
    292 
    293 // MultiThreadedProxyResolver::Executor ----------------------------------------
    294 
    295 MultiThreadedProxyResolver::Executor::Executor(
    296     MultiThreadedProxyResolver* coordinator,
    297     ProxyResolver* resolver,
    298     int thread_number)
    299     : coordinator_(coordinator),
    300       thread_number_(thread_number),
    301       resolver_(resolver) {
    302   DCHECK(coordinator);
    303   DCHECK(resolver);
    304   // Start up the thread.
    305   // Note that it is safe to pass a temporary C-String to Thread(), as it will
    306   // make a copy.
    307   std::string thread_name =
    308       base::StringPrintf("PAC thread #%d", thread_number);
    309   thread_.reset(new base::Thread(thread_name.c_str()));
    310   CHECK(thread_->Start());
    311 }
    312 
    313 void MultiThreadedProxyResolver::Executor::StartJob(Job* job) {
    314   DCHECK(!outstanding_job_);
    315   outstanding_job_ = job;
    316 
    317   // Run the job. Once it has completed (regardless of whether it was
    318   // cancelled), it will invoke OnJobCompleted() on this thread.
    319   job->set_executor(this);
    320   job->FinishedWaitingForThread();
    321   thread_->message_loop()->PostTask(
    322       FROM_HERE,
    323       NewRunnableMethod(job, &Job::Run, MessageLoop::current()));
    324 }
    325 
    326 void MultiThreadedProxyResolver::Executor::OnJobCompleted(Job* job) {
    327   DCHECK_EQ(job, outstanding_job_.get());
    328   outstanding_job_ = NULL;
    329   coordinator_->OnExecutorReady(this);
    330 }
    331 
    332 void MultiThreadedProxyResolver::Executor::Destroy() {
    333   DCHECK(coordinator_);
    334 
    335   // Give the resolver an opportunity to shutdown from THIS THREAD before
    336   // joining on the resolver thread. This allows certain implementations
    337   // to avoid deadlocks.
    338   resolver_->Shutdown();
    339 
    340   {
    341     // See http://crbug.com/69710.
    342     base::ThreadRestrictions::ScopedAllowIO allow_io;
    343 
    344     // Join the worker thread.
    345     thread_.reset();
    346   }
    347 
    348   // Cancel any outstanding job.
    349   if (outstanding_job_) {
    350     outstanding_job_->Cancel();
    351     // Orphan the job (since this executor may be deleted soon).
    352     outstanding_job_->set_executor(NULL);
    353   }
    354 
    355   // It is now safe to free the ProxyResolver, since all the tasks that
    356   // were using it on the resolver thread have completed.
    357   resolver_.reset();
    358 
    359   // Null some stuff as a precaution.
    360   coordinator_ = NULL;
    361   outstanding_job_ = NULL;
    362 }
    363 
    364 void MultiThreadedProxyResolver::Executor::PurgeMemory() {
    365   scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
    366   thread_->message_loop()->PostTask(
    367       FROM_HERE,
    368       NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
    369 }
    370 
    371 MultiThreadedProxyResolver::Executor::~Executor() {
    372   // The important cleanup happens as part of Destroy(), which should always be
    373   // called first.
    374   DCHECK(!coordinator_) << "Destroy() was not called";
    375   DCHECK(!thread_.get());
    376   DCHECK(!resolver_.get());
    377   DCHECK(!outstanding_job_);
    378 }
    379 
    380 // MultiThreadedProxyResolver --------------------------------------------------
    381 
    382 MultiThreadedProxyResolver::MultiThreadedProxyResolver(
    383     ProxyResolverFactory* resolver_factory,
    384     size_t max_num_threads)
    385     : ProxyResolver(resolver_factory->resolvers_expect_pac_bytes()),
    386       resolver_factory_(resolver_factory),
    387       max_num_threads_(max_num_threads) {
    388   DCHECK_GE(max_num_threads, 1u);
    389 }
    390 
    391 MultiThreadedProxyResolver::~MultiThreadedProxyResolver() {
    392   // We will cancel all outstanding requests.
    393   pending_jobs_.clear();
    394   ReleaseAllExecutors();
    395 }
    396 
    397 int MultiThreadedProxyResolver::GetProxyForURL(const GURL& url,
    398                                                ProxyInfo* results,
    399                                                CompletionCallback* callback,
    400                                                RequestHandle* request,
    401                                                const BoundNetLog& net_log) {
    402   DCHECK(CalledOnValidThread());
    403   DCHECK(callback);
    404   DCHECK(current_script_data_.get())
    405       << "Resolver is un-initialized. Must call SetPacScript() first!";
    406 
    407   scoped_refptr<GetProxyForURLJob> job(
    408       new GetProxyForURLJob(url, results, callback, net_log));
    409 
    410   // Completion will be notified through |callback|, unless the caller cancels
    411   // the request using |request|.
    412   if (request)
    413     *request = reinterpret_cast<RequestHandle>(job.get());
    414 
    415   // If there is an executor that is ready to run this request, submit it!
    416   Executor* executor = FindIdleExecutor();
    417   if (executor) {
    418     DCHECK_EQ(0u, pending_jobs_.size());
    419     executor->StartJob(job);
    420     return ERR_IO_PENDING;
    421   }
    422 
    423   // Otherwise queue this request. (We will schedule it to a thread once one
    424   // becomes available).
    425   job->WaitingForThread();
    426   pending_jobs_.push_back(job);
    427 
    428   // If we haven't already reached the thread limit, provision a new thread to
    429   // drain the requests more quickly.
    430   if (executors_.size() < max_num_threads_) {
    431     executor = AddNewExecutor();
    432     executor->StartJob(
    433         new SetPacScriptJob(current_script_data_, NULL));
    434   }
    435 
    436   return ERR_IO_PENDING;
    437 }
    438 
    439 void MultiThreadedProxyResolver::CancelRequest(RequestHandle req) {
    440   DCHECK(CalledOnValidThread());
    441   DCHECK(req);
    442 
    443   Job* job = reinterpret_cast<Job*>(req);
    444   DCHECK_EQ(Job::TYPE_GET_PROXY_FOR_URL, job->type());
    445 
    446   if (job->executor()) {
    447     // If the job was already submitted to the executor, just mark it
    448     // as cancelled so the user callback isn't run on completion.
    449     job->Cancel();
    450   } else {
    451     // Otherwise the job is just sitting in a queue.
    452     PendingJobsQueue::iterator it =
    453         std::find(pending_jobs_.begin(), pending_jobs_.end(), job);
    454     DCHECK(it != pending_jobs_.end());
    455     pending_jobs_.erase(it);
    456   }
    457 }
    458 
    459 void MultiThreadedProxyResolver::CancelSetPacScript() {
    460   DCHECK(CalledOnValidThread());
    461   DCHECK_EQ(0u, pending_jobs_.size());
    462   DCHECK_EQ(1u, executors_.size());
    463   DCHECK_EQ(Job::TYPE_SET_PAC_SCRIPT,
    464             executors_[0]->outstanding_job()->type());
    465 
    466   // Defensively clear some data which shouldn't be getting used
    467   // anymore.
    468   current_script_data_ = NULL;
    469 
    470   ReleaseAllExecutors();
    471 }
    472 
    473 void MultiThreadedProxyResolver::PurgeMemory() {
    474   DCHECK(CalledOnValidThread());
    475   for (ExecutorList::iterator it = executors_.begin();
    476        it != executors_.end(); ++it) {
    477     Executor* executor = *it;
    478     executor->PurgeMemory();
    479   }
    480 }
    481 
    482 int MultiThreadedProxyResolver::SetPacScript(
    483     const scoped_refptr<ProxyResolverScriptData>& script_data,
    484     CompletionCallback* callback) {
    485   DCHECK(CalledOnValidThread());
    486   DCHECK(callback);
    487 
    488   // Save the script details, so we can provision new executors later.
    489   current_script_data_ = script_data;
    490 
    491   // The user should not have any outstanding requests when they call
    492   // SetPacScript().
    493   CheckNoOutstandingUserRequests();
    494 
    495   // Destroy all of the current threads and their proxy resolvers.
    496   ReleaseAllExecutors();
    497 
    498   // Provision a new executor, and run the SetPacScript request. On completion
    499   // notification will be sent through |callback|.
    500   Executor* executor = AddNewExecutor();
    501   executor->StartJob(new SetPacScriptJob(script_data, callback));
    502   return ERR_IO_PENDING;
    503 }
    504 
    505 void MultiThreadedProxyResolver::CheckNoOutstandingUserRequests() const {
    506   DCHECK(CalledOnValidThread());
    507   CHECK_EQ(0u, pending_jobs_.size());
    508 
    509   for (ExecutorList::const_iterator it = executors_.begin();
    510        it != executors_.end(); ++it) {
    511     const Executor* executor = *it;
    512     Job* job = executor->outstanding_job();
    513     // The "has_user_callback()" is to exclude jobs for which the callback
    514     // has already been invoked, or was not user-initiated (as in the case of
    515     // lazy thread provisions). User-initiated jobs may !has_user_callback()
    516     // when the callback has already been run. (Since we only clear the
    517     // outstanding job AFTER the callback has been invoked, it is possible
    518     // for a new request to be started from within the callback).
    519     CHECK(!job || job->was_cancelled() || !job->has_user_callback());
    520   }
    521 }
    522 
    523 void MultiThreadedProxyResolver::ReleaseAllExecutors() {
    524   DCHECK(CalledOnValidThread());
    525   for (ExecutorList::iterator it = executors_.begin();
    526        it != executors_.end(); ++it) {
    527     Executor* executor = *it;
    528     executor->Destroy();
    529   }
    530   executors_.clear();
    531 }
    532 
    533 MultiThreadedProxyResolver::Executor*
    534 MultiThreadedProxyResolver::FindIdleExecutor() {
    535   DCHECK(CalledOnValidThread());
    536   for (ExecutorList::iterator it = executors_.begin();
    537        it != executors_.end(); ++it) {
    538     Executor* executor = *it;
    539     if (!executor->outstanding_job())
    540       return executor;
    541   }
    542   return NULL;
    543 }
    544 
    545 MultiThreadedProxyResolver::Executor*
    546 MultiThreadedProxyResolver::AddNewExecutor() {
    547   DCHECK(CalledOnValidThread());
    548   DCHECK_LT(executors_.size(), max_num_threads_);
    549   // The "thread number" is used to give the thread a unique name.
    550   int thread_number = executors_.size();
    551   ProxyResolver* resolver = resolver_factory_->CreateProxyResolver();
    552   Executor* executor = new Executor(
    553       this, resolver, thread_number);
    554   executors_.push_back(make_scoped_refptr(executor));
    555   return executor;
    556 }
    557 
    558 void MultiThreadedProxyResolver::OnExecutorReady(Executor* executor) {
    559   DCHECK(CalledOnValidThread());
    560   if (pending_jobs_.empty())
    561     return;
    562 
    563   // Get the next job to process (FIFO). Transfer it from the pending queue
    564   // to the executor.
    565   scoped_refptr<Job> job = pending_jobs_.front();
    566   pending_jobs_.pop_front();
    567   executor->StartJob(job);
    568 }
    569 
    570 }  // namespace net
    571