Home | History | Annotate | Download | only in okhttp
      1 /*
      2  * Copyright (C) 2013 Square, Inc.
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package com.squareup.okhttp;
     17 
     18 import com.squareup.okhttp.Call.AsyncCall;
     19 import com.squareup.okhttp.internal.Util;
     20 import com.squareup.okhttp.internal.http.HttpEngine;
     21 import java.util.ArrayDeque;
     22 import java.util.Deque;
     23 import java.util.Iterator;
     24 import java.util.concurrent.ExecutorService;
     25 import java.util.concurrent.SynchronousQueue;
     26 import java.util.concurrent.ThreadPoolExecutor;
     27 import java.util.concurrent.TimeUnit;
     28 
     29 /**
     30  * Policy on when async requests are executed.
     31  *
     32  * <p>Each dispatcher uses an {@link ExecutorService} to run calls internally. If you
     33  * supply your own executor, it should be able to run {@linkplain #getMaxRequests the
     34  * configured maximum} number of calls concurrently.
     35  */
     36 public final class Dispatcher {
     37   private int maxRequests = 64;
     38   private int maxRequestsPerHost = 5;
     39 
     40   /** Executes calls. Created lazily. */
     41   private ExecutorService executorService;
     42 
     43   /** Ready calls in the order they'll be run. */
     44   private final Deque<AsyncCall> readyCalls = new ArrayDeque<>();
     45 
     46   /** Running calls. Includes canceled calls that haven't finished yet. */
     47   private final Deque<AsyncCall> runningCalls = new ArrayDeque<>();
     48 
     49   /** In-flight synchronous calls. Includes canceled calls that haven't finished yet. */
     50   private final Deque<Call> executedCalls = new ArrayDeque<>();
     51 
     52   public Dispatcher(ExecutorService executorService) {
     53     this.executorService = executorService;
     54   }
     55 
     56   public Dispatcher() {
     57   }
     58 
     59   public synchronized ExecutorService getExecutorService() {
     60     if (executorService == null) {
     61       executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
     62           new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
     63     }
     64     return executorService;
     65   }
     66 
     67   /**
     68    * Set the maximum number of requests to execute concurrently. Above this
     69    * requests queue in memory, waiting for the running calls to complete.
     70    *
     71    * <p>If more than {@code maxRequests} requests are in flight when this is
     72    * invoked, those requests will remain in flight.
     73    */
     74   public synchronized void setMaxRequests(int maxRequests) {
     75     if (maxRequests < 1) {
     76       throw new IllegalArgumentException("max < 1: " + maxRequests);
     77     }
     78     this.maxRequests = maxRequests;
     79     promoteCalls();
     80   }
     81 
     82   public synchronized int getMaxRequests() {
     83     return maxRequests;
     84   }
     85 
     86   /**
     87    * Set the maximum number of requests for each host to execute concurrently.
     88    * This limits requests by the URL's host name. Note that concurrent requests
     89    * to a single IP address may still exceed this limit: multiple hostnames may
     90    * share an IP address or be routed through the same HTTP proxy.
     91    *
     92    * <p>If more than {@code maxRequestsPerHost} requests are in flight when this
     93    * is invoked, those requests will remain in flight.
     94    */
     95   public synchronized void setMaxRequestsPerHost(int maxRequestsPerHost) {
     96     if (maxRequestsPerHost < 1) {
     97       throw new IllegalArgumentException("max < 1: " + maxRequestsPerHost);
     98     }
     99     this.maxRequestsPerHost = maxRequestsPerHost;
    100     promoteCalls();
    101   }
    102 
    103   public synchronized int getMaxRequestsPerHost() {
    104     return maxRequestsPerHost;
    105   }
    106 
    107   synchronized void enqueue(AsyncCall call) {
    108     if (runningCalls.size() < maxRequests && runningCallsForHost(call) < maxRequestsPerHost) {
    109       runningCalls.add(call);
    110       getExecutorService().execute(call);
    111     } else {
    112       readyCalls.add(call);
    113     }
    114   }
    115 
    116   /** Cancel all calls with the tag {@code tag}. */
    117   public synchronized void cancel(Object tag) {
    118     for (AsyncCall call : readyCalls) {
    119       if (Util.equal(tag, call.tag())) {
    120         call.cancel();
    121       }
    122     }
    123 
    124     for (AsyncCall call : runningCalls) {
    125       if (Util.equal(tag, call.tag())) {
    126         call.get().canceled = true;
    127         HttpEngine engine = call.get().engine;
    128         if (engine != null) engine.cancel();
    129       }
    130     }
    131 
    132     for (Call call : executedCalls) {
    133       if (Util.equal(tag, call.tag())) {
    134         call.cancel();
    135       }
    136     }
    137   }
    138 
    139   /** Used by {@code AsyncCall#run} to signal completion. */
    140   synchronized void finished(AsyncCall call) {
    141     if (!runningCalls.remove(call)) throw new AssertionError("AsyncCall wasn't running!");
    142     promoteCalls();
    143   }
    144 
    145   private void promoteCalls() {
    146     if (runningCalls.size() >= maxRequests) return; // Already running max capacity.
    147     if (readyCalls.isEmpty()) return; // No ready calls to promote.
    148 
    149     for (Iterator<AsyncCall> i = readyCalls.iterator(); i.hasNext(); ) {
    150       AsyncCall call = i.next();
    151 
    152       if (runningCallsForHost(call) < maxRequestsPerHost) {
    153         i.remove();
    154         runningCalls.add(call);
    155         getExecutorService().execute(call);
    156       }
    157 
    158       if (runningCalls.size() >= maxRequests) return; // Reached max capacity.
    159     }
    160   }
    161 
    162   /** Returns the number of running calls that share a host with {@code call}. */
    163   private int runningCallsForHost(AsyncCall call) {
    164     int result = 0;
    165     for (AsyncCall c : runningCalls) {
    166       if (c.host().equals(call.host())) result++;
    167     }
    168     return result;
    169   }
    170 
    171   /** Used by {@code Call#execute} to signal it is in-flight. */
    172   synchronized void executed(Call call) {
    173     executedCalls.add(call);
    174   }
    175 
    176   /** Used by {@code Call#execute} to signal completion. */
    177   synchronized void finished(Call call) {
    178     if (!executedCalls.remove(call)) throw new AssertionError("Call wasn't in-flight!");
    179   }
    180 
    181   public synchronized int getRunningCallCount() {
    182     return runningCalls.size();
    183   }
    184 
    185   public synchronized int getQueuedCallCount() {
    186     return readyCalls.size();
    187   }
    188 }
    189