Home | History | Annotate | Download | only in client
      1 //
      2 //  ========================================================================
      3 //  Copyright (c) 1995-2014 Mort Bay Consulting Pty. Ltd.
      4 //  ------------------------------------------------------------------------
      5 //  All rights reserved. This program and the accompanying materials
      6 //  are made available under the terms of the Eclipse Public License v1.0
      7 //  and Apache License v2.0 which accompanies this distribution.
      8 //
      9 //      The Eclipse Public License is available at
     10 //      http://www.eclipse.org/legal/epl-v10.html
     11 //
     12 //      The Apache License v2.0 is available at
     13 //      http://www.opensource.org/licenses/apache2.0.php
     14 //
     15 //  You may elect to redistribute this code under either of these licenses.
     16 //  ========================================================================
     17 //
     18 
     19 package org.eclipse.jetty.client;
     20 
     21 import java.io.IOException;
     22 import java.lang.reflect.Constructor;
     23 import java.net.ProtocolException;
     24 import java.util.ArrayList;
     25 import java.util.LinkedList;
     26 import java.util.List;
     27 import java.util.concurrent.ArrayBlockingQueue;
     28 import java.util.concurrent.BlockingQueue;
     29 import java.util.concurrent.RejectedExecutionException;
     30 
     31 import org.eclipse.jetty.client.HttpClient.Connector;
     32 import org.eclipse.jetty.client.security.Authentication;
     33 import org.eclipse.jetty.client.security.SecurityListener;
     34 import org.eclipse.jetty.http.HttpCookie;
     35 import org.eclipse.jetty.http.HttpHeaders;
     36 import org.eclipse.jetty.http.HttpMethods;
     37 import org.eclipse.jetty.http.HttpStatus;
     38 import org.eclipse.jetty.http.PathMap;
     39 import org.eclipse.jetty.io.Buffer;
     40 import org.eclipse.jetty.io.ByteArrayBuffer;
     41 import org.eclipse.jetty.io.Connection;
     42 import org.eclipse.jetty.io.EndPoint;
     43 import org.eclipse.jetty.util.component.AggregateLifeCycle;
     44 import org.eclipse.jetty.util.component.Dumpable;
     45 import org.eclipse.jetty.util.log.Log;
     46 import org.eclipse.jetty.util.log.Logger;
     47 import org.eclipse.jetty.util.ssl.SslContextFactory;
     48 
     49 /**
     50  * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
     51  */
     52 public class HttpDestination implements Dumpable
     53 {
     54     private static final Logger LOG = Log.getLogger(HttpDestination.class);
     55 
     56     private final List<HttpExchange> _exchanges = new LinkedList<HttpExchange>();
     57     private final List<AbstractHttpConnection> _connections = new LinkedList<AbstractHttpConnection>();
     58     private final BlockingQueue<Object> _reservedConnections = new ArrayBlockingQueue<Object>(10, true);
     59     private final List<AbstractHttpConnection> _idleConnections = new ArrayList<AbstractHttpConnection>();
     60     private final HttpClient _client;
     61     private final Address _address;
     62     private final boolean _ssl;
     63     private final SslContextFactory _sslContextFactory;
     64     private final ByteArrayBuffer _hostHeader;
     65     private volatile int _maxConnections;
     66     private volatile int _maxQueueSize;
     67     private int _pendingConnections = 0;
     68     private int _pendingReservedConnections = 0;
     69     private volatile Address _proxy;
     70     private Authentication _proxyAuthentication;
     71     private PathMap _authorizations;
     72     private List<HttpCookie> _cookies;
     73 
     74     HttpDestination(HttpClient client, Address address, boolean ssl, SslContextFactory sslContextFactory)
     75     {
     76         _client = client;
     77         _address = address;
     78         _ssl = ssl;
     79         _sslContextFactory = sslContextFactory;
     80         _maxConnections = _client.getMaxConnectionsPerAddress();
     81         _maxQueueSize = _client.getMaxQueueSizePerAddress();
     82         String addressString = address.getHost();
     83         if (address.getPort() != (_ssl ? 443 : 80))
     84             addressString += ":" + address.getPort();
     85         _hostHeader = new ByteArrayBuffer(addressString);
     86     }
     87 
     88     public HttpClient getHttpClient()
     89     {
     90         return _client;
     91     }
     92 
     93     public Address getAddress()
     94     {
     95         return _address;
     96     }
     97 
     98     public boolean isSecure()
     99     {
    100         return _ssl;
    101     }
    102 
    103     public SslContextFactory getSslContextFactory()
    104     {
    105         return _sslContextFactory;
    106     }
    107 
    108     public Buffer getHostHeader()
    109     {
    110         return _hostHeader;
    111     }
    112 
    113     public int getMaxConnections()
    114     {
    115         return _maxConnections;
    116     }
    117 
    118     public void setMaxConnections(int maxConnections)
    119     {
    120         this._maxConnections = maxConnections;
    121     }
    122 
    123     public int getMaxQueueSize()
    124     {
    125         return _maxQueueSize;
    126     }
    127 
    128     public void setMaxQueueSize(int maxQueueSize)
    129     {
    130         this._maxQueueSize = maxQueueSize;
    131     }
    132 
    133     public int getConnections()
    134     {
    135         synchronized (this)
    136         {
    137             return _connections.size();
    138         }
    139     }
    140 
    141     public int getIdleConnections()
    142     {
    143         synchronized (this)
    144         {
    145             return _idleConnections.size();
    146         }
    147     }
    148 
    149     public void addAuthorization(String pathSpec, Authentication authorization)
    150     {
    151         synchronized (this)
    152         {
    153             if (_authorizations == null)
    154                 _authorizations = new PathMap();
    155             _authorizations.put(pathSpec, authorization);
    156         }
    157 
    158         // TODO query and remove methods
    159     }
    160 
    161     public void addCookie(HttpCookie cookie)
    162     {
    163         synchronized (this)
    164         {
    165             if (_cookies == null)
    166                 _cookies = new ArrayList<HttpCookie>();
    167             _cookies.add(cookie);
    168         }
    169 
    170         // TODO query, remove and age methods
    171     }
    172 
    173     /**
    174      * Get a connection. We either get an idle connection if one is available, or
    175      * we make a new connection, if we have not yet reached maxConnections. If we
    176      * have reached maxConnections, we wait until the number reduces.
    177      *
    178      * @param timeout max time prepared to block waiting to be able to get a connection
    179      * @return a HttpConnection for this destination
    180      * @throws IOException if an I/O error occurs
    181      */
    182     private AbstractHttpConnection getConnection(long timeout) throws IOException
    183     {
    184         AbstractHttpConnection connection = null;
    185 
    186         while ((connection == null) && (connection = getIdleConnection()) == null && timeout > 0)
    187         {
    188             boolean startConnection = false;
    189             synchronized (this)
    190             {
    191                 int totalConnections = _connections.size() + _pendingConnections;
    192                 if (totalConnections < _maxConnections)
    193                 {
    194                     _pendingReservedConnections++;
    195                     startConnection = true;
    196                 }
    197             }
    198 
    199             if (startConnection)
    200             {
    201                 startNewConnection();
    202                 try
    203                 {
    204                     Object o = _reservedConnections.take();
    205                     if (o instanceof AbstractHttpConnection)
    206                     {
    207                         connection = (AbstractHttpConnection)o;
    208                     }
    209                     else
    210                         throw (IOException)o;
    211                 }
    212                 catch (InterruptedException e)
    213                 {
    214                     LOG.ignore(e);
    215                 }
    216             }
    217             else
    218             {
    219                 try
    220                 {
    221                     Thread.currentThread();
    222                     Thread.sleep(200);
    223                     timeout -= 200;
    224                 }
    225                 catch (InterruptedException e)
    226                 {
    227                     LOG.ignore(e);
    228                 }
    229             }
    230         }
    231         return connection;
    232     }
    233 
    234     public AbstractHttpConnection reserveConnection(long timeout) throws IOException
    235     {
    236         AbstractHttpConnection connection = getConnection(timeout);
    237         if (connection != null)
    238             connection.setReserved(true);
    239         return connection;
    240     }
    241 
    242     public AbstractHttpConnection getIdleConnection() throws IOException
    243     {
    244         AbstractHttpConnection connection = null;
    245         while (true)
    246         {
    247             synchronized (this)
    248             {
    249                 if (connection != null)
    250                 {
    251                     _connections.remove(connection);
    252                     connection.close();
    253                     connection = null;
    254                 }
    255                 if (_idleConnections.size() > 0)
    256                     connection = _idleConnections.remove(_idleConnections.size() - 1);
    257             }
    258 
    259             if (connection == null)
    260             {
    261                 return null;
    262             }
    263 
    264             // Check if the connection was idle,
    265             // but it expired just a moment ago
    266             if (connection.cancelIdleTimeout())
    267             {
    268                 return connection;
    269             }
    270         }
    271     }
    272 
    273     protected void startNewConnection()
    274     {
    275         try
    276         {
    277             synchronized (this)
    278             {
    279                 _pendingConnections++;
    280             }
    281             final Connector connector = _client._connector;
    282             if (connector != null)
    283                 connector.startConnection(this);
    284         }
    285         catch (Exception e)
    286         {
    287             LOG.debug(e);
    288             onConnectionFailed(e);
    289         }
    290     }
    291 
    292     public void onConnectionFailed(Throwable throwable)
    293     {
    294         Throwable connect_failure = null;
    295 
    296         boolean startConnection = false;
    297         synchronized (this)
    298         {
    299             _pendingConnections--;
    300             if (_pendingReservedConnections > 0)
    301             {
    302                 connect_failure = throwable;
    303                 _pendingReservedConnections--;
    304             }
    305             else if (_exchanges.size() > 0)
    306             {
    307                 HttpExchange ex = _exchanges.remove(0);
    308                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
    309                     ex.getEventListener().onConnectionFailed(throwable);
    310 
    311                 // Since an existing connection had failed, we need to create a
    312                 // connection if the  queue is not empty and client is running.
    313                 if (!_exchanges.isEmpty() && _client.isStarted())
    314                     startConnection = true;
    315             }
    316         }
    317 
    318         if (startConnection)
    319             startNewConnection();
    320 
    321         if (connect_failure != null)
    322         {
    323             try
    324             {
    325                 _reservedConnections.put(connect_failure);
    326             }
    327             catch (InterruptedException e)
    328             {
    329                 LOG.ignore(e);
    330             }
    331         }
    332     }
    333 
    334     public void onException(Throwable throwable)
    335     {
    336         synchronized (this)
    337         {
    338             _pendingConnections--;
    339             if (_exchanges.size() > 0)
    340             {
    341                 HttpExchange ex = _exchanges.remove(0);
    342                 if (ex.setStatus(HttpExchange.STATUS_EXCEPTED))
    343                     ex.getEventListener().onException(throwable);
    344             }
    345         }
    346     }
    347 
    348     public void onNewConnection(final AbstractHttpConnection connection) throws IOException
    349     {
    350         Connection reservedConnection = null;
    351 
    352         synchronized (this)
    353         {
    354             _pendingConnections--;
    355             _connections.add(connection);
    356 
    357             if (_pendingReservedConnections > 0)
    358             {
    359                 reservedConnection = connection;
    360                 _pendingReservedConnections--;
    361             }
    362             else
    363             {
    364                 // Establish the tunnel if needed
    365                 EndPoint endPoint = connection.getEndPoint();
    366                 if (isProxied() && endPoint instanceof SelectConnector.UpgradableEndPoint)
    367                 {
    368                     SelectConnector.UpgradableEndPoint proxyEndPoint = (SelectConnector.UpgradableEndPoint)endPoint;
    369                     ConnectExchange connect = new ConnectExchange(getAddress(), proxyEndPoint);
    370                     connect.setAddress(getProxy());
    371                     LOG.debug("Establishing tunnel to {} via {}", getAddress(), getProxy());
    372                     send(connection, connect);
    373                 }
    374                 else
    375                 {
    376                     // Another connection stole the exchange that caused the creation of this connection ?
    377                     if (_exchanges.size() == 0)
    378                     {
    379                         LOG.debug("No exchanges for new connection {}", connection);
    380                         connection.setIdleTimeout();
    381                         _idleConnections.add(connection);
    382                     }
    383                     else
    384                     {
    385                         HttpExchange exchange = _exchanges.remove(0);
    386                         send(connection, exchange);
    387                     }
    388                 }
    389             }
    390         }
    391 
    392         if (reservedConnection != null)
    393         {
    394             try
    395             {
    396                 _reservedConnections.put(reservedConnection);
    397             }
    398             catch (InterruptedException e)
    399             {
    400                 LOG.ignore(e);
    401             }
    402         }
    403     }
    404 
    405     public void returnConnection(AbstractHttpConnection connection, boolean close) throws IOException
    406     {
    407         if (connection.isReserved())
    408             connection.setReserved(false);
    409 
    410         if (close)
    411         {
    412             try
    413             {
    414                 connection.close();
    415             }
    416             catch (IOException e)
    417             {
    418                 LOG.ignore(e);
    419             }
    420         }
    421 
    422         if (!_client.isStarted())
    423             return;
    424 
    425         if (!close && connection.getEndPoint().isOpen())
    426         {
    427             synchronized (this)
    428             {
    429                 if (_exchanges.size() == 0)
    430                 {
    431                     connection.setIdleTimeout();
    432                     _idleConnections.add(connection);
    433                 }
    434                 else
    435                 {
    436                     HttpExchange ex = _exchanges.remove(0);
    437                     send(connection, ex);
    438                 }
    439                 this.notifyAll();
    440             }
    441         }
    442         else
    443         {
    444             boolean startConnection = false;
    445             synchronized (this)
    446             {
    447                 _connections.remove(connection);
    448                 if (!_exchanges.isEmpty())
    449                     startConnection = true;
    450             }
    451 
    452             if (startConnection)
    453                 startNewConnection();
    454         }
    455     }
    456 
    457     public void returnIdleConnection(AbstractHttpConnection connection)
    458     {
    459         // TODO work out the real idle time;
    460         long idleForMs = connection.getEndPoint() != null ? connection.getEndPoint().getMaxIdleTime() : -1;
    461         connection.onIdleExpired(idleForMs);
    462 
    463         boolean startConnection = false;
    464         synchronized (this)
    465         {
    466             _idleConnections.remove(connection);
    467             _connections.remove(connection);
    468 
    469             if (!_exchanges.isEmpty() && _client.isStarted())
    470                 startConnection = true;
    471         }
    472 
    473         if (startConnection)
    474             startNewConnection();
    475     }
    476 
    477     public void send(HttpExchange ex) throws IOException
    478     {
    479         ex.setStatus(HttpExchange.STATUS_WAITING_FOR_CONNECTION);
    480 
    481         LinkedList<String> listeners = _client.getRegisteredListeners();
    482         if (listeners != null)
    483         {
    484             // Add registered listeners, fail if we can't load them
    485             for (int i = listeners.size(); i > 0; --i)
    486             {
    487                 String listenerClass = listeners.get(i - 1);
    488                 try
    489                 {
    490                     Class<?> listener = Class.forName(listenerClass);
    491                     Constructor constructor = listener.getDeclaredConstructor(HttpDestination.class, HttpExchange.class);
    492                     HttpEventListener elistener = (HttpEventListener)constructor.newInstance(this, ex);
    493                     ex.setEventListener(elistener);
    494                 }
    495                 catch (final Exception e)
    496                 {
    497                     throw new IOException("Unable to instantiate registered listener for destination: " + listenerClass)
    498                     {
    499                         {
    500                             initCause(e);
    501                         }
    502                     };
    503                 }
    504             }
    505         }
    506 
    507         // Security is supported by default and should be the first consulted
    508         if (_client.hasRealms())
    509         {
    510             ex.setEventListener(new SecurityListener(this, ex));
    511         }
    512 
    513         doSend(ex);
    514     }
    515 
    516     public void resend(HttpExchange ex) throws IOException
    517     {
    518         ex.getEventListener().onRetry();
    519         ex.reset();
    520         doSend(ex);
    521     }
    522 
    523     protected void doSend(HttpExchange ex) throws IOException
    524     {
    525         // add cookies
    526         // TODO handle max-age etc.
    527         if (_cookies != null)
    528         {
    529             StringBuilder buf = null;
    530             for (HttpCookie cookie : _cookies)
    531             {
    532                 if (buf == null)
    533                     buf = new StringBuilder();
    534                 else
    535                     buf.append("; ");
    536                 buf.append(cookie.getName()); // TODO quotes
    537                 buf.append("=");
    538                 buf.append(cookie.getValue()); // TODO quotes
    539             }
    540             if (buf != null)
    541                 ex.addRequestHeader(HttpHeaders.COOKIE, buf.toString());
    542         }
    543 
    544         // Add any known authorizations
    545         if (_authorizations != null)
    546         {
    547             Authentication auth = (Authentication)_authorizations.match(ex.getRequestURI());
    548             if (auth != null)
    549                 (auth).setCredentials(ex);
    550         }
    551 
    552         // Schedule the timeout here, before we queue the exchange
    553         // so that we count also the queue time in the timeout
    554         ex.scheduleTimeout(this);
    555 
    556         AbstractHttpConnection connection = getIdleConnection();
    557         if (connection != null)
    558         {
    559             send(connection, ex);
    560         }
    561         else
    562         {
    563             boolean startConnection = false;
    564             synchronized (this)
    565             {
    566                 if (_exchanges.size() == _maxQueueSize)
    567                     throw new RejectedExecutionException("Queue full for address " + _address);
    568 
    569                 _exchanges.add(ex);
    570                 if (_connections.size() + _pendingConnections < _maxConnections)
    571                     startConnection = true;
    572             }
    573 
    574             if (startConnection)
    575                 startNewConnection();
    576         }
    577     }
    578 
    579     protected void exchangeExpired(HttpExchange exchange)
    580     {
    581         // The exchange may expire while waiting in the
    582         // destination queue, make sure it is removed
    583         synchronized (this)
    584         {
    585             _exchanges.remove(exchange);
    586         }
    587     }
    588 
    589     protected void send(AbstractHttpConnection connection, HttpExchange exchange) throws IOException
    590     {
    591         synchronized (this)
    592         {
    593             // If server closes the connection, put the exchange back
    594             // to the exchange queue and recycle the connection
    595             if (!connection.send(exchange))
    596             {
    597                 if (exchange.getStatus() <= HttpExchange.STATUS_WAITING_FOR_CONNECTION)
    598                     _exchanges.add(0, exchange);
    599                 returnIdleConnection(connection);
    600             }
    601         }
    602     }
    603 
    604     @Override
    605     public synchronized String toString()
    606     {
    607         return String.format("HttpDestination@%x//%s:%d(%d/%d,%d,%d/%d)%n", hashCode(), _address.getHost(), _address.getPort(), _connections.size(), _maxConnections, _idleConnections.size(), _exchanges.size(), _maxQueueSize);
    608     }
    609 
    610     public synchronized String toDetailString()
    611     {
    612         StringBuilder b = new StringBuilder();
    613         b.append(toString());
    614         b.append('\n');
    615         synchronized (this)
    616         {
    617             for (AbstractHttpConnection connection : _connections)
    618             {
    619                 b.append(connection.toDetailString());
    620                 if (_idleConnections.contains(connection))
    621                     b.append(" IDLE");
    622                 b.append('\n');
    623             }
    624         }
    625         b.append("--");
    626         b.append('\n');
    627 
    628         return b.toString();
    629     }
    630 
    631     public void setProxy(Address proxy)
    632     {
    633         _proxy = proxy;
    634     }
    635 
    636     public Address getProxy()
    637     {
    638         return _proxy;
    639     }
    640 
    641     public Authentication getProxyAuthentication()
    642     {
    643         return _proxyAuthentication;
    644     }
    645 
    646     public void setProxyAuthentication(Authentication authentication)
    647     {
    648         _proxyAuthentication = authentication;
    649     }
    650 
    651     public boolean isProxied()
    652     {
    653         return _proxy != null;
    654     }
    655 
    656     public void close() throws IOException
    657     {
    658         synchronized (this)
    659         {
    660             for (AbstractHttpConnection connection : _connections)
    661             {
    662                 connection.close();
    663             }
    664         }
    665     }
    666 
    667     public String dump()
    668     {
    669         return AggregateLifeCycle.dump(this);
    670     }
    671 
    672     public void dump(Appendable out, String indent) throws IOException
    673     {
    674         synchronized (this)
    675         {
    676             out.append(String.valueOf(this));
    677             out.append("idle=");
    678             out.append(String.valueOf(_idleConnections.size()));
    679             out.append(" pending=");
    680             out.append(String.valueOf(_pendingConnections));
    681             out.append("\n");
    682             AggregateLifeCycle.dump(out, indent, _connections);
    683         }
    684     }
    685 
    686     private class ConnectExchange extends ContentExchange
    687     {
    688         private final SelectConnector.UpgradableEndPoint proxyEndPoint;
    689 
    690         public ConnectExchange(Address serverAddress, SelectConnector.UpgradableEndPoint proxyEndPoint)
    691         {
    692             this.proxyEndPoint = proxyEndPoint;
    693             setMethod(HttpMethods.CONNECT);
    694             String serverHostAndPort = serverAddress.toString();
    695             setRequestURI(serverHostAndPort);
    696             addRequestHeader(HttpHeaders.HOST, serverHostAndPort);
    697             addRequestHeader(HttpHeaders.PROXY_CONNECTION, "keep-alive");
    698             addRequestHeader(HttpHeaders.USER_AGENT, "Jetty-Client");
    699         }
    700 
    701         @Override
    702         protected void onResponseComplete() throws IOException
    703         {
    704             int responseStatus = getResponseStatus();
    705             if (responseStatus == HttpStatus.OK_200)
    706             {
    707                 proxyEndPoint.upgrade();
    708             }
    709             else if (responseStatus == HttpStatus.GATEWAY_TIMEOUT_504)
    710             {
    711                 onExpire();
    712             }
    713             else
    714             {
    715                 onException(new ProtocolException("Proxy: " + proxyEndPoint.getRemoteAddr() + ":" + proxyEndPoint.getRemotePort() + " didn't return http return code 200, but " + responseStatus));
    716             }
    717         }
    718 
    719         @Override
    720         protected void onConnectionFailed(Throwable x)
    721         {
    722             HttpDestination.this.onConnectionFailed(x);
    723         }
    724 
    725         @Override
    726         protected void onException(Throwable x)
    727         {
    728             HttpExchange exchange = null;
    729             synchronized (HttpDestination.this)
    730             {
    731                 if (!_exchanges.isEmpty())
    732                     exchange = _exchanges.remove(0);
    733             }
    734             if (exchange != null && exchange.setStatus(STATUS_EXCEPTED))
    735                 exchange.getEventListener().onException(x);
    736         }
    737 
    738         @Override
    739         protected void onExpire()
    740         {
    741             HttpExchange exchange = null;
    742             synchronized (HttpDestination.this)
    743             {
    744                 if (!_exchanges.isEmpty())
    745                     exchange = _exchanges.remove(0);
    746             }
    747             if (exchange != null && exchange.setStatus(STATUS_EXPIRED))
    748                 exchange.getEventListener().onExpire();
    749         }
    750     }
    751 }
    752