Home | History | Annotate | Download | only in proxy
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "net/proxy/single_threaded_proxy_resolver.h"
      6 
      7 #include "base/thread.h"
      8 #include "net/base/load_log.h"
      9 #include "net/base/net_errors.h"
     10 #include "net/proxy/proxy_info.h"
     11 
     12 namespace net {
     13 
     14 namespace {
     15 
     16 class PurgeMemoryTask : public base::RefCountedThreadSafe<PurgeMemoryTask> {
     17  public:
     18   explicit PurgeMemoryTask(ProxyResolver* resolver) : resolver_(resolver) {}
     19   void PurgeMemory() { resolver_->PurgeMemory(); }
     20  private:
     21   friend class base::RefCountedThreadSafe<PurgeMemoryTask>;
     22   ~PurgeMemoryTask() {}
     23   ProxyResolver* resolver_;
     24 };
     25 
     26 }
     27 
     28 // SingleThreadedProxyResolver::SetPacScriptTask ------------------------------
     29 
     30 // Runs on the worker thread to call ProxyResolver::SetPacScript.
     31 class SingleThreadedProxyResolver::SetPacScriptTask
     32     : public base::RefCountedThreadSafe<
     33         SingleThreadedProxyResolver::SetPacScriptTask> {
     34  public:
     35   SetPacScriptTask(SingleThreadedProxyResolver* coordinator,
     36                    const GURL& pac_url,
     37                    const std::string& pac_bytes,
     38                    CompletionCallback* callback)
     39     : coordinator_(coordinator),
     40       callback_(callback),
     41       pac_bytes_(pac_bytes),
     42       pac_url_(pac_url),
     43       origin_loop_(MessageLoop::current()) {
     44     DCHECK(callback);
     45   }
     46 
     47   // Start the SetPacScript request on the worker thread.
     48   void Start() {
     49     coordinator_->thread()->message_loop()->PostTask(
     50         FROM_HERE, NewRunnableMethod(this, &SetPacScriptTask::DoRequest,
     51         coordinator_->resolver_.get()));
     52   }
     53 
     54   void Cancel() {
     55     // Clear these to inform RequestComplete that it should not try to
     56     // access them.
     57     coordinator_ = NULL;
     58     callback_ = NULL;
     59   }
     60 
     61   // Returns true if Cancel() has been called.
     62   bool was_cancelled() const { return callback_ == NULL; }
     63 
     64  private:
     65   friend class base::RefCountedThreadSafe<
     66         SingleThreadedProxyResolver::SetPacScriptTask>;
     67 
     68   ~SetPacScriptTask() {}
     69 
     70   // Runs on the worker thread.
     71   void DoRequest(ProxyResolver* resolver) {
     72     int rv = resolver->expects_pac_bytes() ?
     73         resolver->SetPacScriptByData(pac_bytes_, NULL) :
     74         resolver->SetPacScriptByUrl(pac_url_, NULL);
     75 
     76     DCHECK_NE(rv, ERR_IO_PENDING);
     77     origin_loop_->PostTask(FROM_HERE,
     78         NewRunnableMethod(this, &SetPacScriptTask::RequestComplete, rv));
     79   }
     80 
     81   // Runs the completion callback on the origin thread.
     82   void RequestComplete(int result_code) {
     83     // The task may have been cancelled after it was started.
     84     if (!was_cancelled()) {
     85       CompletionCallback* callback = callback_;
     86       coordinator_->RemoveOutstandingSetPacScriptTask(this);
     87       callback->Run(result_code);
     88     }
     89   }
     90 
     91   // Must only be used on the "origin" thread.
     92   SingleThreadedProxyResolver* coordinator_;
     93   CompletionCallback* callback_;
     94   std::string pac_bytes_;
     95   GURL pac_url_;
     96 
     97   // Usable from within DoQuery on the worker thread.
     98   MessageLoop* origin_loop_;
     99 };
    100 
    101 // SingleThreadedProxyResolver::Job -------------------------------------------
    102 
    103 class SingleThreadedProxyResolver::Job
    104     : public base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job> {
    105  public:
    106   // |coordinator| -- the SingleThreadedProxyResolver that owns this job.
    107   // |url|         -- the URL of the query.
    108   // |results|     -- the structure to fill with proxy resolve results.
    109   Job(SingleThreadedProxyResolver* coordinator,
    110       const GURL& url,
    111       ProxyInfo* results,
    112       CompletionCallback* callback,
    113       LoadLog* load_log)
    114     : coordinator_(coordinator),
    115       callback_(callback),
    116       results_(results),
    117       load_log_(load_log),
    118       url_(url),
    119       is_started_(false),
    120       origin_loop_(MessageLoop::current()) {
    121     DCHECK(callback);
    122   }
    123 
    124   // Start the resolve proxy request on the worker thread.
    125   void Start() {
    126     is_started_ = true;
    127 
    128     size_t load_log_bound = load_log_ ? load_log_->max_num_entries() : 0;
    129 
    130     coordinator_->thread()->message_loop()->PostTask(
    131         FROM_HERE, NewRunnableMethod(this, &Job::DoQuery,
    132         coordinator_->resolver_.get(),
    133         load_log_bound));
    134   }
    135 
    136   bool is_started() const { return is_started_; }
    137 
    138   void Cancel() {
    139     // Clear these to inform QueryComplete that it should not try to
    140     // access them.
    141     coordinator_ = NULL;
    142     callback_ = NULL;
    143     results_ = NULL;
    144   }
    145 
    146   // Returns true if Cancel() has been called.
    147   bool was_cancelled() const { return callback_ == NULL; }
    148 
    149  private:
    150   friend class base::RefCountedThreadSafe<SingleThreadedProxyResolver::Job>;
    151 
    152   ~Job() {}
    153 
    154   // Runs on the worker thread.
    155   void DoQuery(ProxyResolver* resolver, size_t load_log_bound) {
    156     LoadLog* worker_log = NULL;
    157     if (load_log_bound > 0) {
    158       worker_log = new LoadLog(load_log_bound);
    159       worker_log->AddRef();  // Balanced in QueryComplete.
    160     }
    161 
    162     int rv = resolver->GetProxyForURL(url_, &results_buf_, NULL, NULL,
    163                                       worker_log);
    164     DCHECK_NE(rv, ERR_IO_PENDING);
    165 
    166     origin_loop_->PostTask(FROM_HERE,
    167         NewRunnableMethod(this, &Job::QueryComplete, rv, worker_log));
    168   }
    169 
    170   // Runs the completion callback on the origin thread.
    171   void QueryComplete(int result_code, LoadLog* worker_log) {
    172     // Merge the load log that was generated on the worker thread, into the
    173     // main log.
    174     if (worker_log) {
    175       if (load_log_)
    176         load_log_->Append(worker_log);
    177       worker_log->Release();
    178     }
    179 
    180     // The Job may have been cancelled after it was started.
    181     if (!was_cancelled()) {
    182       if (result_code >= OK) {  // Note: unit-tests use values > 0.
    183         results_->Use(results_buf_);
    184       }
    185       callback_->Run(result_code);
    186 
    187       // We check for cancellation once again, in case the callback deleted
    188       // the owning ProxyService (whose destructor will in turn cancel us).
    189       if (!was_cancelled())
    190         coordinator_->RemoveFrontOfJobsQueueAndStartNext(this);
    191     }
    192   }
    193 
    194   // Must only be used on the "origin" thread.
    195   SingleThreadedProxyResolver* coordinator_;
    196   CompletionCallback* callback_;
    197   ProxyInfo* results_;
    198   scoped_refptr<LoadLog> load_log_;
    199   GURL url_;
    200   bool is_started_;
    201 
    202   // Usable from within DoQuery on the worker thread.
    203   ProxyInfo results_buf_;
    204   MessageLoop* origin_loop_;
    205 };
    206 
    207 // SingleThreadedProxyResolver ------------------------------------------------
    208 
    209 SingleThreadedProxyResolver::SingleThreadedProxyResolver(
    210     ProxyResolver* resolver)
    211     : ProxyResolver(resolver->expects_pac_bytes()),
    212       resolver_(resolver) {
    213 }
    214 
    215 SingleThreadedProxyResolver::~SingleThreadedProxyResolver() {
    216   // Cancel the inprogress job (if any), and free the rest.
    217   for (PendingJobsQueue::iterator it = pending_jobs_.begin();
    218        it != pending_jobs_.end();
    219        ++it) {
    220     (*it)->Cancel();
    221   }
    222 
    223   if (outstanding_set_pac_script_task_)
    224     outstanding_set_pac_script_task_->Cancel();
    225 
    226   // Note that |thread_| is destroyed before |resolver_|. This is important
    227   // since |resolver_| could be running on |thread_|.
    228 }
    229 
    230 int SingleThreadedProxyResolver::GetProxyForURL(const GURL& url,
    231                                                 ProxyInfo* results,
    232                                                 CompletionCallback* callback,
    233                                                 RequestHandle* request,
    234                                                 LoadLog* load_log) {
    235   DCHECK(callback);
    236 
    237   scoped_refptr<Job> job = new Job(this, url, results, callback, load_log);
    238   pending_jobs_.push_back(job);
    239   ProcessPendingJobs();  // Jobs can never finish synchronously.
    240 
    241   // Completion will be notified through |callback|, unless the caller cancels
    242   // the request using |request|.
    243   if (request)
    244     *request = reinterpret_cast<RequestHandle>(job.get());
    245 
    246   return ERR_IO_PENDING;
    247 }
    248 
    249 // There are three states of the request we need to handle:
    250 // (1) Not started (just sitting in the queue).
    251 // (2) Executing Job::DoQuery in the worker thread.
    252 // (3) Waiting for Job::QueryComplete to be run on the origin thread.
    253 void SingleThreadedProxyResolver::CancelRequest(RequestHandle req) {
    254   DCHECK(req);
    255 
    256   Job* job = reinterpret_cast<Job*>(req);
    257 
    258   bool is_active_job = job->is_started() && !pending_jobs_.empty() &&
    259       pending_jobs_.front().get() == job;
    260 
    261   job->Cancel();
    262 
    263   if (is_active_job) {
    264     RemoveFrontOfJobsQueueAndStartNext(job);
    265     return;
    266   }
    267 
    268   // Otherwise just delete the job from the queue.
    269   PendingJobsQueue::iterator it = std::find(
    270       pending_jobs_.begin(), pending_jobs_.end(), job);
    271   DCHECK(it != pending_jobs_.end());
    272   pending_jobs_.erase(it);
    273 }
    274 
    275 void SingleThreadedProxyResolver::CancelSetPacScript() {
    276   DCHECK(outstanding_set_pac_script_task_);
    277   outstanding_set_pac_script_task_->Cancel();
    278   outstanding_set_pac_script_task_ = NULL;
    279 }
    280 
    281 void SingleThreadedProxyResolver::PurgeMemory() {
    282   if (thread_.get()) {
    283     scoped_refptr<PurgeMemoryTask> helper(new PurgeMemoryTask(resolver_.get()));
    284     thread_->message_loop()->PostTask(FROM_HERE,
    285         NewRunnableMethod(helper.get(), &PurgeMemoryTask::PurgeMemory));
    286   }
    287 }
    288 
    289 int SingleThreadedProxyResolver::SetPacScript(
    290     const GURL& pac_url,
    291     const std::string& pac_bytes,
    292     CompletionCallback* callback) {
    293   EnsureThreadStarted();
    294   DCHECK(!outstanding_set_pac_script_task_);
    295 
    296   SetPacScriptTask* task = new SetPacScriptTask(
    297       this, pac_url, pac_bytes, callback);
    298   outstanding_set_pac_script_task_ = task;
    299   task->Start();
    300   return ERR_IO_PENDING;
    301 }
    302 
    303 void SingleThreadedProxyResolver::EnsureThreadStarted() {
    304   if (!thread_.get()) {
    305     thread_.reset(new base::Thread("pac-thread"));
    306     thread_->Start();
    307   }
    308 }
    309 
    310 void SingleThreadedProxyResolver::ProcessPendingJobs() {
    311   if (pending_jobs_.empty())
    312     return;
    313 
    314   // Get the next job to process (FIFO).
    315   Job* job = pending_jobs_.front().get();
    316   if (job->is_started())
    317     return;
    318 
    319   EnsureThreadStarted();
    320   job->Start();
    321 }
    322 
    323 void SingleThreadedProxyResolver::RemoveFrontOfJobsQueueAndStartNext(
    324     Job* expected_job) {
    325   DCHECK_EQ(expected_job, pending_jobs_.front().get());
    326   pending_jobs_.pop_front();
    327 
    328   // Start next work item.
    329   ProcessPendingJobs();
    330 }
    331 
    332 void SingleThreadedProxyResolver::RemoveOutstandingSetPacScriptTask(
    333     SetPacScriptTask* task) {
    334   DCHECK_EQ(outstanding_set_pac_script_task_.get(), task);
    335   outstanding_set_pac_script_task_ = NULL;
    336 }
    337 
    338 }  // namespace net
    339