Home | History | Annotate | Download | only in tsccm
      1 /*
      2  * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
      3  * $Revision: 677240 $
      4  * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
      5  *
      6  * ====================================================================
      7  *
      8  *  Licensed to the Apache Software Foundation (ASF) under one or more
      9  *  contributor license agreements.  See the NOTICE file distributed with
     10  *  this work for additional information regarding copyright ownership.
     11  *  The ASF licenses this file to You under the Apache License, Version 2.0
     12  *  (the "License"); you may not use this file except in compliance with
     13  *  the License.  You may obtain a copy of the License at
     14  *
     15  *      http://www.apache.org/licenses/LICENSE-2.0
     16  *
     17  *  Unless required by applicable law or agreed to in writing, software
     18  *  distributed under the License is distributed on an "AS IS" BASIS,
     19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     20  *  See the License for the specific language governing permissions and
     21  *  limitations under the License.
     22  * ====================================================================
     23  *
     24  * This software consists of voluntary contributions made by many
     25  * individuals on behalf of the Apache Software Foundation.  For more
     26  * information on the Apache Software Foundation, please see
     27  * <http://www.apache.org/>.
     28  *
     29  */
     30 
     31 package org.apache.http.impl.conn.tsccm;
     32 
     33 import java.util.Date;
     34 import java.util.HashMap;
     35 import java.util.Iterator;
     36 import java.util.Queue;
     37 import java.util.LinkedList;
     38 import java.util.Map;
     39 import java.util.concurrent.locks.Condition;
     40 import java.util.concurrent.TimeUnit;
     41 
     42 import org.apache.commons.logging.Log;
     43 import org.apache.commons.logging.LogFactory;
     44 import org.apache.http.conn.routing.HttpRoute;
     45 import org.apache.http.conn.ClientConnectionOperator;
     46 import org.apache.http.conn.ConnectionPoolTimeoutException;
     47 import org.apache.http.conn.params.ConnPerRoute;
     48 import org.apache.http.conn.params.ConnManagerParams;
     49 import org.apache.http.params.HttpParams;
     50 
     51 
     52 /**
     53  * A connection pool that maintains connections by route.
     54  * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
     55  * in HttpClient 3.x, see there for original authors. It implements the same
     56  * algorithm for connection re-use and connection-per-host enforcement:
     57  * <ul>
     58  * <li>connections are re-used only for the exact same route</li>
     59  * <li>connection limits are enforced per route rather than per host</li>
     60  * </ul>
     61  * Note that access to the pool datastructures is synchronized via the
     62  * {@link AbstractConnPool#poolLock poolLock} in the base class,
     63  * not via <code>synchronized</code> methods.
     64  *
     65  * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
     66  * @author <a href="mailto:becke (at) u.washington.edu">Michael Becke</a>
     67  * @author and others
     68  *
     69  * @deprecated Please use {@link java.net.URL#openConnection} instead.
     70  *     Please visit <a href="http://android-developers.blogspot.com/2011/09/androids-http-clients.html">this webpage</a>
     71  *     for further details.
     72  */
     73 @Deprecated
     74 public class ConnPoolByRoute extends AbstractConnPool {
     75 
     76     private final Log log = LogFactory.getLog(getClass());
     77 
     78     /** Connection operator for this pool */
     79     protected final ClientConnectionOperator operator;
     80 
     81     /** The list of free connections */
     82     protected Queue<BasicPoolEntry> freeConnections;
     83 
     84     /** The list of WaitingThreads waiting for a connection */
     85     protected Queue<WaitingThread> waitingThreads;
     86 
     87     /**
     88      * A map of route-specific pools.
     89      * Keys are of class {@link HttpRoute},
     90      * values of class {@link RouteSpecificPool}.
     91      */
     92     protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
     93 
     94     protected final int maxTotalConnections;
     95 
     96     private final ConnPerRoute connPerRoute;
     97 
     98     /**
     99      * Creates a new connection pool, managed by route.
    100      */
    101     public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
    102         super();
    103         if (operator == null) {
    104             throw new IllegalArgumentException("Connection operator may not be null");
    105         }
    106         this.operator = operator;
    107 
    108         freeConnections = createFreeConnQueue();
    109         waitingThreads  = createWaitingThreadQueue();
    110         routeToPool     = createRouteToPoolMap();
    111         maxTotalConnections = ConnManagerParams
    112             .getMaxTotalConnections(params);
    113         connPerRoute = ConnManagerParams
    114             .getMaxConnectionsPerRoute(params);
    115     }
    116 
    117 
    118     /**
    119      * Creates the queue for {@link #freeConnections}.
    120      * Called once by the constructor.
    121      *
    122      * @return  a queue
    123      */
    124     protected Queue<BasicPoolEntry> createFreeConnQueue() {
    125         return new LinkedList<BasicPoolEntry>();
    126     }
    127 
    128     /**
    129      * Creates the queue for {@link #waitingThreads}.
    130      * Called once by the constructor.
    131      *
    132      * @return  a queue
    133      */
    134     protected Queue<WaitingThread> createWaitingThreadQueue() {
    135         return new LinkedList<WaitingThread>();
    136     }
    137 
    138     /**
    139      * Creates the map for {@link #routeToPool}.
    140      * Called once by the constructor.
    141      *
    142      * @return  a map
    143      */
    144     protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
    145         return new HashMap<HttpRoute, RouteSpecificPool>();
    146     }
    147 
    148 
    149     /**
    150      * Creates a new route-specific pool.
    151      * Called by {@link #getRoutePool} when necessary.
    152      *
    153      * @param route     the route
    154      *
    155      * @return  the new pool
    156      */
    157     protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
    158         return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
    159     }
    160 
    161 
    162     /**
    163      * Creates a new waiting thread.
    164      * Called by {@link #getRoutePool} when necessary.
    165      *
    166      * @param cond      the condition to wait for
    167      * @param rospl     the route specific pool, or <code>null</code>
    168      *
    169      * @return  a waiting thread representation
    170      */
    171     protected WaitingThread newWaitingThread(Condition cond,
    172                                              RouteSpecificPool rospl) {
    173         return new WaitingThread(cond, rospl);
    174     }
    175 
    176 
    177     /**
    178      * Get a route-specific pool of available connections.
    179      *
    180      * @param route   the route
    181      * @param create    whether to create the pool if it doesn't exist
    182      *
    183      * @return  the pool for the argument route,
    184      *     never <code>null</code> if <code>create</code> is <code>true</code>
    185      */
    186     protected RouteSpecificPool getRoutePool(HttpRoute route,
    187                                              boolean create) {
    188         RouteSpecificPool rospl = null;
    189         poolLock.lock();
    190         try {
    191 
    192             rospl = routeToPool.get(route);
    193             if ((rospl == null) && create) {
    194                 // no pool for this route yet (or anymore)
    195                 rospl = newRouteSpecificPool(route);
    196                 routeToPool.put(route, rospl);
    197             }
    198 
    199         } finally {
    200             poolLock.unlock();
    201         }
    202 
    203         return rospl;
    204     }
    205 
    206 
    207     //@@@ consider alternatives for gathering statistics
    208     public int getConnectionsInPool(HttpRoute route) {
    209 
    210         poolLock.lock();
    211         try {
    212             // don't allow a pool to be created here!
    213             RouteSpecificPool rospl = getRoutePool(route, false);
    214             return (rospl != null) ? rospl.getEntryCount() : 0;
    215 
    216         } finally {
    217             poolLock.unlock();
    218         }
    219     }
    220 
    221     @Override
    222     public PoolEntryRequest requestPoolEntry(
    223             final HttpRoute route,
    224             final Object state) {
    225 
    226         final WaitingThreadAborter aborter = new WaitingThreadAborter();
    227 
    228         return new PoolEntryRequest() {
    229 
    230             public void abortRequest() {
    231                 poolLock.lock();
    232                 try {
    233                     aborter.abort();
    234                 } finally {
    235                     poolLock.unlock();
    236                 }
    237             }
    238 
    239             public BasicPoolEntry getPoolEntry(
    240                     long timeout,
    241                     TimeUnit tunit)
    242                         throws InterruptedException, ConnectionPoolTimeoutException {
    243                 return getEntryBlocking(route, state, timeout, tunit, aborter);
    244             }
    245 
    246         };
    247     }
    248 
    249     /**
    250      * Obtains a pool entry with a connection within the given timeout.
    251      * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
    252      * must be called before blocking, to allow the thread to be interrupted.
    253      *
    254      * @param route     the route for which to get the connection
    255      * @param timeout   the timeout, 0 or negative for no timeout
    256      * @param tunit     the unit for the <code>timeout</code>,
    257      *                  may be <code>null</code> only if there is no timeout
    258      * @param aborter   an object which can abort a {@link WaitingThread}.
    259      *
    260      * @return  pool entry holding a connection for the route
    261      *
    262      * @throws ConnectionPoolTimeoutException
    263      *         if the timeout expired
    264      * @throws InterruptedException
    265      *         if the calling thread was interrupted
    266      */
    267     protected BasicPoolEntry getEntryBlocking(
    268                                    HttpRoute route, Object state,
    269                                    long timeout, TimeUnit tunit,
    270                                    WaitingThreadAborter aborter)
    271         throws ConnectionPoolTimeoutException, InterruptedException {
    272 
    273         Date deadline = null;
    274         if (timeout > 0) {
    275             deadline = new Date
    276                 (System.currentTimeMillis() + tunit.toMillis(timeout));
    277         }
    278 
    279         BasicPoolEntry entry = null;
    280         poolLock.lock();
    281         try {
    282 
    283             RouteSpecificPool rospl = getRoutePool(route, true);
    284             WaitingThread waitingThread = null;
    285 
    286             while (entry == null) {
    287 
    288                 if (isShutDown) {
    289                     throw new IllegalStateException
    290                         ("Connection pool shut down.");
    291                 }
    292 
    293                 if (log.isDebugEnabled()) {
    294                     log.debug("Total connections kept alive: " + freeConnections.size());
    295                     log.debug("Total issued connections: " + issuedConnections.size());
    296                     log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
    297                 }
    298 
    299                 // the cases to check for:
    300                 // - have a free connection for that route
    301                 // - allowed to create a free connection for that route
    302                 // - can delete and replace a free connection for another route
    303                 // - need to wait for one of the things above to come true
    304 
    305                 entry = getFreeEntry(rospl, state);
    306                 if (entry != null) {
    307                     break;
    308                 }
    309 
    310                 boolean hasCapacity = rospl.getCapacity() > 0;
    311 
    312                 if (log.isDebugEnabled()) {
    313                     log.debug("Available capacity: " + rospl.getCapacity()
    314                             + " out of " + rospl.getMaxEntries()
    315                             + " [" + route + "][" + state + "]");
    316                 }
    317 
    318                 if (hasCapacity && numConnections < maxTotalConnections) {
    319 
    320                     entry = createEntry(rospl, operator);
    321 
    322                 } else if (hasCapacity && !freeConnections.isEmpty()) {
    323 
    324                     deleteLeastUsedEntry();
    325                     entry = createEntry(rospl, operator);
    326 
    327                 } else {
    328 
    329                     if (log.isDebugEnabled()) {
    330                         log.debug("Need to wait for connection" +
    331                                 " [" + route + "][" + state + "]");
    332                     }
    333 
    334                     if (waitingThread == null) {
    335                         waitingThread =
    336                             newWaitingThread(poolLock.newCondition(), rospl);
    337                         aborter.setWaitingThread(waitingThread);
    338                     }
    339 
    340                     boolean success = false;
    341                     try {
    342                         rospl.queueThread(waitingThread);
    343                         waitingThreads.add(waitingThread);
    344                         success = waitingThread.await(deadline);
    345 
    346                     } finally {
    347                         // In case of 'success', we were woken up by the
    348                         // connection pool and should now have a connection
    349                         // waiting for us, or else we're shutting down.
    350                         // Just continue in the loop, both cases are checked.
    351                         rospl.removeThread(waitingThread);
    352                         waitingThreads.remove(waitingThread);
    353                     }
    354 
    355                     // check for spurious wakeup vs. timeout
    356                     if (!success && (deadline != null) &&
    357                         (deadline.getTime() <= System.currentTimeMillis())) {
    358                         throw new ConnectionPoolTimeoutException
    359                             ("Timeout waiting for connection");
    360                     }
    361                 }
    362             } // while no entry
    363 
    364         } finally {
    365             poolLock.unlock();
    366         }
    367 
    368         return entry;
    369 
    370     } // getEntry
    371 
    372 
    373     // non-javadoc, see base class AbstractConnPool
    374     @Override
    375     public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
    376 
    377         HttpRoute route = entry.getPlannedRoute();
    378         if (log.isDebugEnabled()) {
    379             log.debug("Freeing connection" +
    380                     " [" + route + "][" + entry.getState() + "]");
    381         }
    382 
    383         poolLock.lock();
    384         try {
    385             if (isShutDown) {
    386                 // the pool is shut down, release the
    387                 // connection's resources and get out of here
    388                 closeConnection(entry.getConnection());
    389                 return;
    390             }
    391 
    392             // no longer issued, we keep a hard reference now
    393             issuedConnections.remove(entry.getWeakRef());
    394 
    395             RouteSpecificPool rospl = getRoutePool(route, true);
    396 
    397             if (reusable) {
    398                 rospl.freeEntry(entry);
    399                 freeConnections.add(entry);
    400                 idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
    401             } else {
    402                 rospl.dropEntry();
    403                 numConnections--;
    404             }
    405 
    406             notifyWaitingThread(rospl);
    407 
    408         } finally {
    409             poolLock.unlock();
    410         }
    411 
    412     } // freeEntry
    413 
    414 
    415 
    416     /**
    417      * If available, get a free pool entry for a route.
    418      *
    419      * @param rospl       the route-specific pool from which to get an entry
    420      *
    421      * @return  an available pool entry for the given route, or
    422      *          <code>null</code> if none is available
    423      */
    424     protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
    425 
    426         BasicPoolEntry entry = null;
    427         poolLock.lock();
    428         try {
    429             boolean done = false;
    430             while(!done) {
    431 
    432                 entry = rospl.allocEntry(state);
    433 
    434                 if (entry != null) {
    435                     if (log.isDebugEnabled()) {
    436                         log.debug("Getting free connection"
    437                                 + " [" + rospl.getRoute() + "][" + state + "]");
    438 
    439                     }
    440                     freeConnections.remove(entry);
    441                     boolean valid = idleConnHandler.remove(entry.getConnection());
    442                     if(!valid) {
    443                         // If the free entry isn't valid anymore, get rid of it
    444                         // and loop to find another one that might be valid.
    445                         if(log.isDebugEnabled())
    446                             log.debug("Closing expired free connection"
    447                                     + " [" + rospl.getRoute() + "][" + state + "]");
    448                         closeConnection(entry.getConnection());
    449                         // We use dropEntry instead of deleteEntry because the entry
    450                         // is no longer "free" (we just allocated it), and deleteEntry
    451                         // can only be used to delete free entries.
    452                         rospl.dropEntry();
    453                         numConnections--;
    454                     } else {
    455                         issuedConnections.add(entry.getWeakRef());
    456                         done = true;
    457                     }
    458 
    459                 } else {
    460                     done = true;
    461                     if (log.isDebugEnabled()) {
    462                         log.debug("No free connections"
    463                                 + " [" + rospl.getRoute() + "][" + state + "]");
    464                     }
    465                 }
    466             }
    467         } finally {
    468             poolLock.unlock();
    469         }
    470 
    471         return entry;
    472     }
    473 
    474 
    475     /**
    476      * Creates a new pool entry.
    477      * This method assumes that the new connection will be handed
    478      * out immediately.
    479      *
    480      * @param rospl       the route-specific pool for which to create the entry
    481      * @param op        the operator for creating a connection
    482      *
    483      * @return  the new pool entry for a new connection
    484      */
    485     protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
    486                                          ClientConnectionOperator op) {
    487 
    488         if (log.isDebugEnabled()) {
    489             log.debug("Creating new connection [" + rospl.getRoute() + "]");
    490         }
    491 
    492         // the entry will create the connection when needed
    493         BasicPoolEntry entry =
    494             new BasicPoolEntry(op, rospl.getRoute(), refQueue);
    495 
    496         poolLock.lock();
    497         try {
    498 
    499             rospl.createdEntry(entry);
    500             numConnections++;
    501 
    502             issuedConnections.add(entry.getWeakRef());
    503 
    504         } finally {
    505             poolLock.unlock();
    506         }
    507 
    508         return entry;
    509     }
    510 
    511 
    512     /**
    513      * Deletes a given pool entry.
    514      * This closes the pooled connection and removes all references,
    515      * so that it can be GCed.
    516      *
    517      * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
    518      * It is assumed that the caller has already handled this step.</p>
    519      * <!-- @@@ is that a good idea? or rather fix it? -->
    520      *
    521      * @param entry         the pool entry for the connection to delete
    522      */
    523     protected void deleteEntry(BasicPoolEntry entry) {
    524 
    525         HttpRoute route = entry.getPlannedRoute();
    526 
    527         if (log.isDebugEnabled()) {
    528             log.debug("Deleting connection"
    529                     + " [" + route + "][" + entry.getState() + "]");
    530         }
    531 
    532         poolLock.lock();
    533         try {
    534 
    535             closeConnection(entry.getConnection());
    536 
    537             RouteSpecificPool rospl = getRoutePool(route, true);
    538             rospl.deleteEntry(entry);
    539             numConnections--;
    540             if (rospl.isUnused()) {
    541                 routeToPool.remove(route);
    542             }
    543 
    544             idleConnHandler.remove(entry.getConnection());// not idle, but dead
    545 
    546         } finally {
    547             poolLock.unlock();
    548         }
    549     }
    550 
    551 
    552     /**
    553      * Delete an old, free pool entry to make room for a new one.
    554      * Used to replace pool entries with ones for a different route.
    555      */
    556     protected void deleteLeastUsedEntry() {
    557 
    558         try {
    559             poolLock.lock();
    560 
    561             //@@@ with get() instead of remove, we could
    562             //@@@ leave the removing to deleteEntry()
    563             BasicPoolEntry entry = freeConnections.remove();
    564 
    565             if (entry != null) {
    566                 deleteEntry(entry);
    567             } else if (log.isDebugEnabled()) {
    568                 log.debug("No free connection to delete.");
    569             }
    570 
    571         } finally {
    572             poolLock.unlock();
    573         }
    574     }
    575 
    576 
    577     // non-javadoc, see base class AbstractConnPool
    578     @Override
    579     protected void handleLostEntry(HttpRoute route) {
    580 
    581         poolLock.lock();
    582         try {
    583 
    584             RouteSpecificPool rospl = getRoutePool(route, true);
    585             rospl.dropEntry();
    586             if (rospl.isUnused()) {
    587                 routeToPool.remove(route);
    588             }
    589 
    590             numConnections--;
    591             notifyWaitingThread(rospl);
    592 
    593         } finally {
    594             poolLock.unlock();
    595         }
    596     }
    597 
    598 
    599     /**
    600      * Notifies a waiting thread that a connection is available.
    601      * This will wake a thread waiting in the specific route pool,
    602      * if there is one.
    603      * Otherwise, a thread in the connection pool will be notified.
    604      *
    605      * @param rospl     the pool in which to notify, or <code>null</code>
    606      */
    607     protected void notifyWaitingThread(RouteSpecificPool rospl) {
    608 
    609         //@@@ while this strategy provides for best connection re-use,
    610         //@@@ is it fair? only do this if the connection is open?
    611         // Find the thread we are going to notify. We want to ensure that
    612         // each waiting thread is only interrupted once, so we will remove
    613         // it from all wait queues before interrupting.
    614         WaitingThread waitingThread = null;
    615 
    616         poolLock.lock();
    617         try {
    618 
    619             if ((rospl != null) && rospl.hasThread()) {
    620                 if (log.isDebugEnabled()) {
    621                     log.debug("Notifying thread waiting on pool" +
    622                             " [" + rospl.getRoute() + "]");
    623                 }
    624                 waitingThread = rospl.nextThread();
    625             } else if (!waitingThreads.isEmpty()) {
    626                 if (log.isDebugEnabled()) {
    627                     log.debug("Notifying thread waiting on any pool");
    628                 }
    629                 waitingThread = waitingThreads.remove();
    630             } else if (log.isDebugEnabled()) {
    631                 log.debug("Notifying no-one, there are no waiting threads");
    632             }
    633 
    634             if (waitingThread != null) {
    635                 waitingThread.wakeup();
    636             }
    637 
    638         } finally {
    639             poolLock.unlock();
    640         }
    641     }
    642 
    643 
    644     //@@@ revise this cleanup stuff
    645     //@@@ move method to base class when deleteEntry() is fixed
    646     // non-javadoc, see base class AbstractConnPool
    647     @Override
    648     public void deleteClosedConnections() {
    649 
    650         poolLock.lock();
    651         try {
    652 
    653             Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
    654             while (iter.hasNext()) {
    655                 BasicPoolEntry entry = iter.next();
    656                 if (!entry.getConnection().isOpen()) {
    657                     iter.remove();
    658                     deleteEntry(entry);
    659                 }
    660             }
    661 
    662         } finally {
    663             poolLock.unlock();
    664         }
    665     }
    666 
    667 
    668     // non-javadoc, see base class AbstractConnPool
    669     @Override
    670     public void shutdown() {
    671 
    672         poolLock.lock();
    673         try {
    674 
    675             super.shutdown();
    676 
    677             // close all free connections
    678             //@@@ move this to base class?
    679             Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
    680             while (ibpe.hasNext()) {
    681                 BasicPoolEntry entry = ibpe.next();
    682                 ibpe.remove();
    683                 closeConnection(entry.getConnection());
    684             }
    685 
    686             // wake up all waiting threads
    687             Iterator<WaitingThread> iwth = waitingThreads.iterator();
    688             while (iwth.hasNext()) {
    689                 WaitingThread waiter = iwth.next();
    690                 iwth.remove();
    691                 waiter.wakeup();
    692             }
    693 
    694             routeToPool.clear();
    695 
    696         } finally {
    697             poolLock.unlock();
    698         }
    699     }
    700 
    701 
    702 } // class ConnPoolByRoute
    703 
    704