Home | History | Annotate | Download | only in tsccm
      1 /*
      2  * $HeadURL: http://svn.apache.org/repos/asf/httpcomponents/httpclient/trunk/module-client/src/main/java/org/apache/http/impl/conn/tsccm/ConnPoolByRoute.java $
      3  * $Revision: 677240 $
      4  * $Date: 2008-07-16 04:25:47 -0700 (Wed, 16 Jul 2008) $
      5  *
      6  * ====================================================================
      7  *
      8  *  Licensed to the Apache Software Foundation (ASF) under one or more
      9  *  contributor license agreements.  See the NOTICE file distributed with
     10  *  this work for additional information regarding copyright ownership.
     11  *  The ASF licenses this file to You under the Apache License, Version 2.0
     12  *  (the "License"); you may not use this file except in compliance with
     13  *  the License.  You may obtain a copy of the License at
     14  *
     15  *      http://www.apache.org/licenses/LICENSE-2.0
     16  *
     17  *  Unless required by applicable law or agreed to in writing, software
     18  *  distributed under the License is distributed on an "AS IS" BASIS,
     19  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     20  *  See the License for the specific language governing permissions and
     21  *  limitations under the License.
     22  * ====================================================================
     23  *
     24  * This software consists of voluntary contributions made by many
     25  * individuals on behalf of the Apache Software Foundation.  For more
     26  * information on the Apache Software Foundation, please see
     27  * <http://www.apache.org/>.
     28  *
     29  */
     30 
     31 package org.apache.http.impl.conn.tsccm;
     32 
     33 import java.util.Date;
     34 import java.util.HashMap;
     35 import java.util.Iterator;
     36 import java.util.Queue;
     37 import java.util.LinkedList;
     38 import java.util.Map;
     39 import java.util.concurrent.locks.Condition;
     40 import java.util.concurrent.TimeUnit;
     41 
     42 import org.apache.commons.logging.Log;
     43 import org.apache.commons.logging.LogFactory;
     44 import org.apache.http.conn.routing.HttpRoute;
     45 import org.apache.http.conn.ClientConnectionOperator;
     46 import org.apache.http.conn.ConnectionPoolTimeoutException;
     47 import org.apache.http.conn.params.ConnPerRoute;
     48 import org.apache.http.conn.params.ConnManagerParams;
     49 import org.apache.http.params.HttpParams;
     50 
     51 
     52 /**
     53  * A connection pool that maintains connections by route.
     54  * This class is derived from <code>MultiThreadedHttpConnectionManager</code>
     55  * in HttpClient 3.x, see there for original authors. It implements the same
     56  * algorithm for connection re-use and connection-per-host enforcement:
     57  * <ul>
     58  * <li>connections are re-used only for the exact same route</li>
     59  * <li>connection limits are enforced per route rather than per host</li>
     60  * </ul>
     61  * Note that access to the pool datastructures is synchronized via the
     62  * {@link AbstractConnPool#poolLock poolLock} in the base class,
     63  * not via <code>synchronized</code> methods.
     64  *
     65  * @author <a href="mailto:rolandw at apache.org">Roland Weber</a>
     66  * @author <a href="mailto:becke (at) u.washington.edu">Michael Becke</a>
     67  * @author and others
     68  */
     69 public class ConnPoolByRoute extends AbstractConnPool {
     70 
     71     private final Log log = LogFactory.getLog(getClass());
     72 
     73     /** Connection operator for this pool */
     74     protected final ClientConnectionOperator operator;
     75 
     76     /** The list of free connections */
     77     protected Queue<BasicPoolEntry> freeConnections;
     78 
     79     /** The list of WaitingThreads waiting for a connection */
     80     protected Queue<WaitingThread> waitingThreads;
     81 
     82     /**
     83      * A map of route-specific pools.
     84      * Keys are of class {@link HttpRoute},
     85      * values of class {@link RouteSpecificPool}.
     86      */
     87     protected final Map<HttpRoute, RouteSpecificPool> routeToPool;
     88 
     89     protected final int maxTotalConnections;
     90 
     91     private final ConnPerRoute connPerRoute;
     92 
     93     /**
     94      * Creates a new connection pool, managed by route.
     95      */
     96     public ConnPoolByRoute(final ClientConnectionOperator operator, final HttpParams params) {
     97         super();
     98         if (operator == null) {
     99             throw new IllegalArgumentException("Connection operator may not be null");
    100         }
    101         this.operator = operator;
    102 
    103         freeConnections = createFreeConnQueue();
    104         waitingThreads  = createWaitingThreadQueue();
    105         routeToPool     = createRouteToPoolMap();
    106         maxTotalConnections = ConnManagerParams
    107             .getMaxTotalConnections(params);
    108         connPerRoute = ConnManagerParams
    109             .getMaxConnectionsPerRoute(params);
    110     }
    111 
    112 
    113     /**
    114      * Creates the queue for {@link #freeConnections}.
    115      * Called once by the constructor.
    116      *
    117      * @return  a queue
    118      */
    119     protected Queue<BasicPoolEntry> createFreeConnQueue() {
    120         return new LinkedList<BasicPoolEntry>();
    121     }
    122 
    123     /**
    124      * Creates the queue for {@link #waitingThreads}.
    125      * Called once by the constructor.
    126      *
    127      * @return  a queue
    128      */
    129     protected Queue<WaitingThread> createWaitingThreadQueue() {
    130         return new LinkedList<WaitingThread>();
    131     }
    132 
    133     /**
    134      * Creates the map for {@link #routeToPool}.
    135      * Called once by the constructor.
    136      *
    137      * @return  a map
    138      */
    139     protected Map<HttpRoute, RouteSpecificPool> createRouteToPoolMap() {
    140         return new HashMap<HttpRoute, RouteSpecificPool>();
    141     }
    142 
    143 
    144     /**
    145      * Creates a new route-specific pool.
    146      * Called by {@link #getRoutePool} when necessary.
    147      *
    148      * @param route     the route
    149      *
    150      * @return  the new pool
    151      */
    152     protected RouteSpecificPool newRouteSpecificPool(HttpRoute route) {
    153         return new RouteSpecificPool(route, connPerRoute.getMaxForRoute(route));
    154     }
    155 
    156 
    157     /**
    158      * Creates a new waiting thread.
    159      * Called by {@link #getRoutePool} when necessary.
    160      *
    161      * @param cond      the condition to wait for
    162      * @param rospl     the route specific pool, or <code>null</code>
    163      *
    164      * @return  a waiting thread representation
    165      */
    166     protected WaitingThread newWaitingThread(Condition cond,
    167                                              RouteSpecificPool rospl) {
    168         return new WaitingThread(cond, rospl);
    169     }
    170 
    171 
    172     /**
    173      * Get a route-specific pool of available connections.
    174      *
    175      * @param route   the route
    176      * @param create    whether to create the pool if it doesn't exist
    177      *
    178      * @return  the pool for the argument route,
    179      *     never <code>null</code> if <code>create</code> is <code>true</code>
    180      */
    181     protected RouteSpecificPool getRoutePool(HttpRoute route,
    182                                              boolean create) {
    183         RouteSpecificPool rospl = null;
    184         poolLock.lock();
    185         try {
    186 
    187             rospl = routeToPool.get(route);
    188             if ((rospl == null) && create) {
    189                 // no pool for this route yet (or anymore)
    190                 rospl = newRouteSpecificPool(route);
    191                 routeToPool.put(route, rospl);
    192             }
    193 
    194         } finally {
    195             poolLock.unlock();
    196         }
    197 
    198         return rospl;
    199     }
    200 
    201 
    202     //@@@ consider alternatives for gathering statistics
    203     public int getConnectionsInPool(HttpRoute route) {
    204 
    205         poolLock.lock();
    206         try {
    207             // don't allow a pool to be created here!
    208             RouteSpecificPool rospl = getRoutePool(route, false);
    209             return (rospl != null) ? rospl.getEntryCount() : 0;
    210 
    211         } finally {
    212             poolLock.unlock();
    213         }
    214     }
    215 
    216     @Override
    217     public PoolEntryRequest requestPoolEntry(
    218             final HttpRoute route,
    219             final Object state) {
    220 
    221         final WaitingThreadAborter aborter = new WaitingThreadAborter();
    222 
    223         return new PoolEntryRequest() {
    224 
    225             public void abortRequest() {
    226                 poolLock.lock();
    227                 try {
    228                     aborter.abort();
    229                 } finally {
    230                     poolLock.unlock();
    231                 }
    232             }
    233 
    234             public BasicPoolEntry getPoolEntry(
    235                     long timeout,
    236                     TimeUnit tunit)
    237                         throws InterruptedException, ConnectionPoolTimeoutException {
    238                 return getEntryBlocking(route, state, timeout, tunit, aborter);
    239             }
    240 
    241         };
    242     }
    243 
    244     /**
    245      * Obtains a pool entry with a connection within the given timeout.
    246      * If a {@link WaitingThread} is used to block, {@link WaitingThreadAborter#setWaitingThread(WaitingThread)}
    247      * must be called before blocking, to allow the thread to be interrupted.
    248      *
    249      * @param route     the route for which to get the connection
    250      * @param timeout   the timeout, 0 or negative for no timeout
    251      * @param tunit     the unit for the <code>timeout</code>,
    252      *                  may be <code>null</code> only if there is no timeout
    253      * @param aborter   an object which can abort a {@link WaitingThread}.
    254      *
    255      * @return  pool entry holding a connection for the route
    256      *
    257      * @throws ConnectionPoolTimeoutException
    258      *         if the timeout expired
    259      * @throws InterruptedException
    260      *         if the calling thread was interrupted
    261      */
    262     protected BasicPoolEntry getEntryBlocking(
    263                                    HttpRoute route, Object state,
    264                                    long timeout, TimeUnit tunit,
    265                                    WaitingThreadAborter aborter)
    266         throws ConnectionPoolTimeoutException, InterruptedException {
    267 
    268         Date deadline = null;
    269         if (timeout > 0) {
    270             deadline = new Date
    271                 (System.currentTimeMillis() + tunit.toMillis(timeout));
    272         }
    273 
    274         BasicPoolEntry entry = null;
    275         poolLock.lock();
    276         try {
    277 
    278             RouteSpecificPool rospl = getRoutePool(route, true);
    279             WaitingThread waitingThread = null;
    280 
    281             while (entry == null) {
    282 
    283                 if (isShutDown) {
    284                     throw new IllegalStateException
    285                         ("Connection pool shut down.");
    286                 }
    287 
    288                 if (log.isDebugEnabled()) {
    289                     log.debug("Total connections kept alive: " + freeConnections.size());
    290                     log.debug("Total issued connections: " + issuedConnections.size());
    291                     log.debug("Total allocated connection: " + numConnections + " out of " + maxTotalConnections);
    292                 }
    293 
    294                 // the cases to check for:
    295                 // - have a free connection for that route
    296                 // - allowed to create a free connection for that route
    297                 // - can delete and replace a free connection for another route
    298                 // - need to wait for one of the things above to come true
    299 
    300                 entry = getFreeEntry(rospl, state);
    301                 if (entry != null) {
    302                     break;
    303                 }
    304 
    305                 boolean hasCapacity = rospl.getCapacity() > 0;
    306 
    307                 if (log.isDebugEnabled()) {
    308                     log.debug("Available capacity: " + rospl.getCapacity()
    309                             + " out of " + rospl.getMaxEntries()
    310                             + " [" + route + "][" + state + "]");
    311                 }
    312 
    313                 if (hasCapacity && numConnections < maxTotalConnections) {
    314 
    315                     entry = createEntry(rospl, operator);
    316 
    317                 } else if (hasCapacity && !freeConnections.isEmpty()) {
    318 
    319                     deleteLeastUsedEntry();
    320                     entry = createEntry(rospl, operator);
    321 
    322                 } else {
    323 
    324                     if (log.isDebugEnabled()) {
    325                         log.debug("Need to wait for connection" +
    326                                 " [" + route + "][" + state + "]");
    327                     }
    328 
    329                     if (waitingThread == null) {
    330                         waitingThread =
    331                             newWaitingThread(poolLock.newCondition(), rospl);
    332                         aborter.setWaitingThread(waitingThread);
    333                     }
    334 
    335                     boolean success = false;
    336                     try {
    337                         rospl.queueThread(waitingThread);
    338                         waitingThreads.add(waitingThread);
    339                         success = waitingThread.await(deadline);
    340 
    341                     } finally {
    342                         // In case of 'success', we were woken up by the
    343                         // connection pool and should now have a connection
    344                         // waiting for us, or else we're shutting down.
    345                         // Just continue in the loop, both cases are checked.
    346                         rospl.removeThread(waitingThread);
    347                         waitingThreads.remove(waitingThread);
    348                     }
    349 
    350                     // check for spurious wakeup vs. timeout
    351                     if (!success && (deadline != null) &&
    352                         (deadline.getTime() <= System.currentTimeMillis())) {
    353                         throw new ConnectionPoolTimeoutException
    354                             ("Timeout waiting for connection");
    355                     }
    356                 }
    357             } // while no entry
    358 
    359         } finally {
    360             poolLock.unlock();
    361         }
    362 
    363         return entry;
    364 
    365     } // getEntry
    366 
    367 
    368     // non-javadoc, see base class AbstractConnPool
    369     @Override
    370     public void freeEntry(BasicPoolEntry entry, boolean reusable, long validDuration, TimeUnit timeUnit) {
    371 
    372         HttpRoute route = entry.getPlannedRoute();
    373         if (log.isDebugEnabled()) {
    374             log.debug("Freeing connection" +
    375                     " [" + route + "][" + entry.getState() + "]");
    376         }
    377 
    378         poolLock.lock();
    379         try {
    380             if (isShutDown) {
    381                 // the pool is shut down, release the
    382                 // connection's resources and get out of here
    383                 closeConnection(entry.getConnection());
    384                 return;
    385             }
    386 
    387             // no longer issued, we keep a hard reference now
    388             issuedConnections.remove(entry.getWeakRef());
    389 
    390             RouteSpecificPool rospl = getRoutePool(route, true);
    391 
    392             if (reusable) {
    393                 rospl.freeEntry(entry);
    394                 freeConnections.add(entry);
    395                 idleConnHandler.add(entry.getConnection(), validDuration, timeUnit);
    396             } else {
    397                 rospl.dropEntry();
    398                 numConnections--;
    399             }
    400 
    401             notifyWaitingThread(rospl);
    402 
    403         } finally {
    404             poolLock.unlock();
    405         }
    406 
    407     } // freeEntry
    408 
    409 
    410 
    411     /**
    412      * If available, get a free pool entry for a route.
    413      *
    414      * @param rospl       the route-specific pool from which to get an entry
    415      *
    416      * @return  an available pool entry for the given route, or
    417      *          <code>null</code> if none is available
    418      */
    419     protected BasicPoolEntry getFreeEntry(RouteSpecificPool rospl, Object state) {
    420 
    421         BasicPoolEntry entry = null;
    422         poolLock.lock();
    423         try {
    424             boolean done = false;
    425             while(!done) {
    426 
    427                 entry = rospl.allocEntry(state);
    428 
    429                 if (entry != null) {
    430                     if (log.isDebugEnabled()) {
    431                         log.debug("Getting free connection"
    432                                 + " [" + rospl.getRoute() + "][" + state + "]");
    433 
    434                     }
    435                     freeConnections.remove(entry);
    436                     boolean valid = idleConnHandler.remove(entry.getConnection());
    437                     if(!valid) {
    438                         // If the free entry isn't valid anymore, get rid of it
    439                         // and loop to find another one that might be valid.
    440                         if(log.isDebugEnabled())
    441                             log.debug("Closing expired free connection"
    442                                     + " [" + rospl.getRoute() + "][" + state + "]");
    443                         closeConnection(entry.getConnection());
    444                         // We use dropEntry instead of deleteEntry because the entry
    445                         // is no longer "free" (we just allocated it), and deleteEntry
    446                         // can only be used to delete free entries.
    447                         rospl.dropEntry();
    448                         numConnections--;
    449                     } else {
    450                         issuedConnections.add(entry.getWeakRef());
    451                         done = true;
    452                     }
    453 
    454                 } else {
    455                     done = true;
    456                     if (log.isDebugEnabled()) {
    457                         log.debug("No free connections"
    458                                 + " [" + rospl.getRoute() + "][" + state + "]");
    459                     }
    460                 }
    461             }
    462         } finally {
    463             poolLock.unlock();
    464         }
    465 
    466         return entry;
    467     }
    468 
    469 
    470     /**
    471      * Creates a new pool entry.
    472      * This method assumes that the new connection will be handed
    473      * out immediately.
    474      *
    475      * @param rospl       the route-specific pool for which to create the entry
    476      * @param op        the operator for creating a connection
    477      *
    478      * @return  the new pool entry for a new connection
    479      */
    480     protected BasicPoolEntry createEntry(RouteSpecificPool rospl,
    481                                          ClientConnectionOperator op) {
    482 
    483         if (log.isDebugEnabled()) {
    484             log.debug("Creating new connection [" + rospl.getRoute() + "]");
    485         }
    486 
    487         // the entry will create the connection when needed
    488         BasicPoolEntry entry =
    489             new BasicPoolEntry(op, rospl.getRoute(), refQueue);
    490 
    491         poolLock.lock();
    492         try {
    493 
    494             rospl.createdEntry(entry);
    495             numConnections++;
    496 
    497             issuedConnections.add(entry.getWeakRef());
    498 
    499         } finally {
    500             poolLock.unlock();
    501         }
    502 
    503         return entry;
    504     }
    505 
    506 
    507     /**
    508      * Deletes a given pool entry.
    509      * This closes the pooled connection and removes all references,
    510      * so that it can be GCed.
    511      *
    512      * <p><b>Note:</b> Does not remove the entry from the freeConnections list.
    513      * It is assumed that the caller has already handled this step.</p>
    514      * <!-- @@@ is that a good idea? or rather fix it? -->
    515      *
    516      * @param entry         the pool entry for the connection to delete
    517      */
    518     protected void deleteEntry(BasicPoolEntry entry) {
    519 
    520         HttpRoute route = entry.getPlannedRoute();
    521 
    522         if (log.isDebugEnabled()) {
    523             log.debug("Deleting connection"
    524                     + " [" + route + "][" + entry.getState() + "]");
    525         }
    526 
    527         poolLock.lock();
    528         try {
    529 
    530             closeConnection(entry.getConnection());
    531 
    532             RouteSpecificPool rospl = getRoutePool(route, true);
    533             rospl.deleteEntry(entry);
    534             numConnections--;
    535             if (rospl.isUnused()) {
    536                 routeToPool.remove(route);
    537             }
    538 
    539             idleConnHandler.remove(entry.getConnection());// not idle, but dead
    540 
    541         } finally {
    542             poolLock.unlock();
    543         }
    544     }
    545 
    546 
    547     /**
    548      * Delete an old, free pool entry to make room for a new one.
    549      * Used to replace pool entries with ones for a different route.
    550      */
    551     protected void deleteLeastUsedEntry() {
    552 
    553         try {
    554             poolLock.lock();
    555 
    556             //@@@ with get() instead of remove, we could
    557             //@@@ leave the removing to deleteEntry()
    558             BasicPoolEntry entry = freeConnections.remove();
    559 
    560             if (entry != null) {
    561                 deleteEntry(entry);
    562             } else if (log.isDebugEnabled()) {
    563                 log.debug("No free connection to delete.");
    564             }
    565 
    566         } finally {
    567             poolLock.unlock();
    568         }
    569     }
    570 
    571 
    572     // non-javadoc, see base class AbstractConnPool
    573     @Override
    574     protected void handleLostEntry(HttpRoute route) {
    575 
    576         poolLock.lock();
    577         try {
    578 
    579             RouteSpecificPool rospl = getRoutePool(route, true);
    580             rospl.dropEntry();
    581             if (rospl.isUnused()) {
    582                 routeToPool.remove(route);
    583             }
    584 
    585             numConnections--;
    586             notifyWaitingThread(rospl);
    587 
    588         } finally {
    589             poolLock.unlock();
    590         }
    591     }
    592 
    593 
    594     /**
    595      * Notifies a waiting thread that a connection is available.
    596      * This will wake a thread waiting in the specific route pool,
    597      * if there is one.
    598      * Otherwise, a thread in the connection pool will be notified.
    599      *
    600      * @param rospl     the pool in which to notify, or <code>null</code>
    601      */
    602     protected void notifyWaitingThread(RouteSpecificPool rospl) {
    603 
    604         //@@@ while this strategy provides for best connection re-use,
    605         //@@@ is it fair? only do this if the connection is open?
    606         // Find the thread we are going to notify. We want to ensure that
    607         // each waiting thread is only interrupted once, so we will remove
    608         // it from all wait queues before interrupting.
    609         WaitingThread waitingThread = null;
    610 
    611         poolLock.lock();
    612         try {
    613 
    614             if ((rospl != null) && rospl.hasThread()) {
    615                 if (log.isDebugEnabled()) {
    616                     log.debug("Notifying thread waiting on pool" +
    617                             " [" + rospl.getRoute() + "]");
    618                 }
    619                 waitingThread = rospl.nextThread();
    620             } else if (!waitingThreads.isEmpty()) {
    621                 if (log.isDebugEnabled()) {
    622                     log.debug("Notifying thread waiting on any pool");
    623                 }
    624                 waitingThread = waitingThreads.remove();
    625             } else if (log.isDebugEnabled()) {
    626                 log.debug("Notifying no-one, there are no waiting threads");
    627             }
    628 
    629             if (waitingThread != null) {
    630                 waitingThread.wakeup();
    631             }
    632 
    633         } finally {
    634             poolLock.unlock();
    635         }
    636     }
    637 
    638 
    639     //@@@ revise this cleanup stuff
    640     //@@@ move method to base class when deleteEntry() is fixed
    641     // non-javadoc, see base class AbstractConnPool
    642     @Override
    643     public void deleteClosedConnections() {
    644 
    645         poolLock.lock();
    646         try {
    647 
    648             Iterator<BasicPoolEntry>  iter = freeConnections.iterator();
    649             while (iter.hasNext()) {
    650                 BasicPoolEntry entry = iter.next();
    651                 if (!entry.getConnection().isOpen()) {
    652                     iter.remove();
    653                     deleteEntry(entry);
    654                 }
    655             }
    656 
    657         } finally {
    658             poolLock.unlock();
    659         }
    660     }
    661 
    662 
    663     // non-javadoc, see base class AbstractConnPool
    664     @Override
    665     public void shutdown() {
    666 
    667         poolLock.lock();
    668         try {
    669 
    670             super.shutdown();
    671 
    672             // close all free connections
    673             //@@@ move this to base class?
    674             Iterator<BasicPoolEntry> ibpe = freeConnections.iterator();
    675             while (ibpe.hasNext()) {
    676                 BasicPoolEntry entry = ibpe.next();
    677                 ibpe.remove();
    678                 closeConnection(entry.getConnection());
    679             }
    680 
    681             // wake up all waiting threads
    682             Iterator<WaitingThread> iwth = waitingThreads.iterator();
    683             while (iwth.hasNext()) {
    684                 WaitingThread waiter = iwth.next();
    685                 iwth.remove();
    686                 waiter.wakeup();
    687             }
    688 
    689             routeToPool.clear();
    690 
    691         } finally {
    692             poolLock.unlock();
    693         }
    694     }
    695 
    696 
    697 } // class ConnPoolByRoute
    698 
    699