Home | History | Annotate | Download | only in jbosh
      1 /*
      2  * Copyright 2009 Mike Cumings
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *   http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.kenai.jbosh;
     18 
     19 import com.kenai.jbosh.ComposableBody.Builder;
     20 import java.util.ArrayList;
     21 import java.util.Iterator;
     22 import java.util.LinkedList;
     23 import java.util.List;
     24 import java.util.Queue;
     25 import java.util.Set;
     26 import java.util.SortedSet;
     27 import java.util.TreeSet;
     28 import java.util.concurrent.CopyOnWriteArraySet;
     29 import java.util.concurrent.Executors;
     30 import java.util.concurrent.RejectedExecutionException;
     31 import java.util.concurrent.ScheduledExecutorService;
     32 import java.util.concurrent.ScheduledFuture;
     33 import java.util.concurrent.TimeUnit;
     34 import java.util.concurrent.atomic.AtomicReference;
     35 import java.util.concurrent.locks.Condition;
     36 import java.util.concurrent.locks.ReentrantLock;
     37 import java.util.logging.Level;
     38 import java.util.logging.Logger;
     39 
     40 /**
     41  * BOSH Client session instance.  Each communication session with a remote
     42  * connection manager is represented and handled by an instance of this
     43  * class.  This is the main entry point for client-side communications.
     44  * To create a new session, a client configuration must first be created
     45  * and then used to create a client instance:
     46  * <pre>
     47  * BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
     48  *         "http://server:1234/httpbind", "jabber.org")
     49  *     .setFrom("user@jabber.org")
     50  *     .build();
     51  * BOSHClient client = BOSHClient.create(cfg);
     52  * </pre>
     53  * Additional client configuration options are available.  See the
     54  * {@code BOSHClientConfig.Builder} class for more information.
     55  * <p/>
     56  * Once a {@code BOSHClient} instance has been created, communication with
     57  * the remote connection manager can begin.  No attempt will be made to
     58  * establish a connection to the connection manager until the first call
     59  * is made to the {@code send(ComposableBody)} method.  Note that it is
     60  * possible to send an empty body to cause an immediate connection attempt
     61  * to the connection manager.  Sending an empty message would look like
     62  * the following:
     63  * <pre>
     64  * client.send(ComposableBody.builder().build());
     65  * </pre>
     66  * For more information on creating body messages with content, see the
     67  * {@code ComposableBody.Builder} class documentation.
     68  * <p/>
     69  * Once a session has been successfully started, the client instance can be
     70  * used to send arbitrary payload data.  All aspects of the BOSH
     71  * protocol involving setting and processing attributes in the BOSH
     72  * namespace will be handled by the client code transparently and behind the
     73  * scenes.  The user of the client instance can therefore concentrate
     74  * entirely on the content of the message payload, leaving the semantics of
     75  * the BOSH protocol to the client implementation.
     76  * <p/>
     77  * To be notified of incoming messages from the remote connection manager,
     78  * a {@code BOSHClientResponseListener} should be added to the client instance.
     79  * All incoming messages will be published to all response listeners as they
     80  * arrive and are processed.  As with the transmission of payload data via
     81  * the {@code send(ComposableBody)} method, there is no need to worry about
     82  * handling of the BOSH attributes, since this is handled behind the scenes.
     83  * <p/>
     84  * If the connection to the remote connection manager is terminated (either
     85  * explicitly or due to a terminal condition of some sort), all connection
     86  * listeners will be notified.  After the connection has been closed, the
     87  * client instance is considered dead and a new one must be created in order
     88  * to resume communications with the remote server.
     89  * <p/>
     90  * Instances of this class are thread-safe.
     91  *
     92  * @see BOSHClientConfig.Builder
     93  * @see BOSHClientResponseListener
     94  * @see BOSHClientConnListener
     95  * @see ComposableBody.Builder
     96  */
     97 public final class BOSHClient {
     98 
    /**
     * Logger for this class, named after the fully-qualified class name.
     */
    private static final Logger LOG = Logger.getLogger(
            BOSHClient.class.getName());

    /**
     * Value of the 'type' attribute used for session termination.  Compared
     * against message bodies by {@code isTermination(AbstractBody)} and
     * applied to outgoing messages by {@code disconnect(ComposableBody)}.
     */
    private static final String TERMINATE = "terminate";

    /**
     * Value of the 'type' attribute used for recoverable errors.
     */
    private static final String ERROR = "error";

    /**
     * Log message to use when an {@code InterruptedException} is caught.
     */
    private static final String INTERRUPTED = "Interrupted";

    /**
     * Message used for unhandled exceptions.
     */
    private static final String UNHANDLED = "Unhandled Exception";
    124 
    125     /**
    126      * Message used whena null listener is detected.
    127      */
    128     private static final String NULL_LISTENER = "Listener may not b enull";
    129 
    /**
     * Default empty request delay, in milliseconds.  Used when the
     * corresponding system property is not set.
     */
    private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100;

    /**
     * Amount of time to wait before sending an empty request, in
     * milliseconds.  Read once, at class initialization, from the integer
     * system property {@code com.kenai.jbosh.BOSHClient.emptyRequestDelay},
     * falling back to {@code DEFAULT_EMPTY_REQUEST_DELAY}.
     */
    private static final int EMPTY_REQUEST_DELAY = Integer.getInteger(
            BOSHClient.class.getName() + ".emptyRequestDelay",
            DEFAULT_EMPTY_REQUEST_DELAY);

    /**
     * Default value for the pause margin, in milliseconds.  Used when the
     * corresponding system property is not set.
     */
    private static final int DEFAULT_PAUSE_MARGIN = 500;

    /**
     * The amount of time in milliseconds which will be reserved as a
     * safety margin when scheduling empty requests against a maxpause
     * value.   This should give us enough time to build the message
     * and transport it to the remote host.  Read once, at class
     * initialization, from the integer system property
     * {@code com.kenai.jbosh.BOSHClient.pauseMargin}, falling back to
     * {@code DEFAULT_PAUSE_MARGIN}.
     */
    private static final int PAUSE_MARGIN = Integer.getInteger(
            BOSHClient.class.getName() + ".pauseMargin",
            DEFAULT_PAUSE_MARGIN);

    /**
     * Flag indicating whether or not we want to perform assertions.
     * Assigned exactly once, by the static initializer of this class.
     */
    private static final boolean ASSERTIONS;
    162 
    /**
     * Connection listeners.  Held in a copy-on-write set, so registering the
     * same listener twice has no additional effect.
     */
    private final Set<BOSHClientConnListener> connListeners =
            new CopyOnWriteArraySet<BOSHClientConnListener>();

    /**
     * Request listeners.  Held in a copy-on-write set, so registering the
     * same listener twice has no additional effect.
     */
    private final Set<BOSHClientRequestListener> requestListeners =
            new CopyOnWriteArraySet<BOSHClientRequestListener>();

    /**
     * Response listeners.  Held in a copy-on-write set, so registering the
     * same listener twice has no additional effect.
     */
    private final Set<BOSHClientResponseListener> responseListeners =
            new CopyOnWriteArraySet<BOSHClientResponseListener>();

    /**
     * Lock instance guarding the mutable session state of this client.
     * The three conditions below are all created from this lock.
     */
    private final ReentrantLock lock = new ReentrantLock();

    /**
     * Condition indicating that there are messages to be exchanged.
     * Signalled by {@code send(ComposableBody)} after queueing an exchange
     * and by {@code dispose(Throwable)}.
     */
    private final Condition notEmpty = lock.newCondition();

    /**
     * Condition indicating that there are available slots for sending
     * messages.  Awaited by {@code blockUntilSendable(AbstractBody)}.
     */
    private final Condition notFull = lock.newCondition();

    /**
     * Condition indicating that there are no outstanding connections.
     * Awaited by {@code drain()}.
     */
    private final Condition drained = lock.newCondition();

    /**
     * Session configuration, as supplied to {@code create(BOSHClientConfig)}.
     */
    private final BOSHClientConfig cfg;
    206 
    /**
     * Processor thread runnable instance.  Handed to the receive thread
     * created by {@code init()}.
     */
    private final Runnable procRunnable = new Runnable() {
        /**
         * Process incoming messages.
         */
        public void run() {
            processMessages();
        }
    };

    /**
     * Runnable used to send a deferred empty request.
     */
    private final Runnable emptyRequestRunnable = new Runnable() {
        /**
         * Send an empty request.
         */
        public void run() {
            sendEmptyRequest();
        }
    };
    230 
    /**
     * HTTPSender instance.  Initialized with the session configuration by
     * {@code init()} and destroyed by {@code dispose(Throwable)}.
     */
    private final HTTPSender httpSender =
            new ApacheHTTPSender();

    /**
     * Storage for test hook implementation.  Holds {@code null} until
     * {@code setExchangeInterceptor(ExchangeInterceptor)} is called.
     */
    private final AtomicReference<ExchangeInterceptor> exchInterceptor =
            new AtomicReference<ExchangeInterceptor>();

    /**
     * Request ID sequence to use for the session.  Supplies the RID for
     * each message sent via {@code send(ComposableBody)}.
     */
    private final RequestIDSequence requestIDSeq = new RequestIDSequence();

    /**
     * ScheduledExecutor to use for deferred tasks.  Single-threaded; shut
     * down by {@code dispose(Throwable)}.
     */
    private final ScheduledExecutorService schedExec =
            Executors.newSingleThreadScheduledExecutor();
    253 
    254     /************************************************************
    255      * The following vars must be accessed via the lock instance.
    256      */
    257 
    /**
     * Thread which is used to process responses from the connection
     * manager.  Becomes null when session is terminated; a non-null value
     * is what {@code isWorking()} treats as an active session.
     */
    private Thread procThread;
    263 
    264     /**
    265      * Future for sending a deferred empty request, if needed.
    266      */
    267     private ScheduledFuture emptyRequestFuture;
    268 
    /**
     * Connection Manager session parameters.  Only available when in a
     * connected state.  Reset to {@code null} by {@code dispose(Throwable)}.
     */
    private CMSessionParams cmParams;

    /**
     * List of active/outstanding requests.  Set to {@code null} by
     * {@code dispose(Throwable)} once the session has been torn down.
     */
    private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>();

    /**
     * Set of RIDs which have been received, for the purpose of sending
     * response acknowledgements.  Set to {@code null} by
     * {@code dispose(Throwable)}.
     */
    private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>();

    /**
     * The highest RID that we've already received a response for.  This value
     * is used to implement response acks.  Starts out at {@code -1}.
     */
    private Long responseAck = Long.valueOf(-1L);

    /**
     * List of requests which have been made but not yet acknowledged.  This
     * list remains unpopulated if the CM is not acking requests.  Set to
     * {@code null} by {@code dispose(Throwable)}.
     */
    private List<ComposableBody> pendingRequestAcks =
            new ArrayList<ComposableBody>();
    298 
    299     ///////////////////////////////////////////////////////////////////////////
    300     // Classes:
    301 
    302     /**
    303      * Class used in testing to dynamically manipulate received exchanges
    304      * at test runtime.
    305      */
    306     abstract static class ExchangeInterceptor {
    307         /**
    308          * Limit construction.
    309          */
    310         ExchangeInterceptor() {
    311             // Empty;
    312         }
    313 
    314         /**
    315          * Hook to manipulate an HTTPExchange as is is about to be processed.
    316          *
    317          * @param exch original exchange that would be processed
    318          * @return replacement exchange instance, or {@code null} to skip
    319          *  processing of this exchange
    320          */
    321         abstract HTTPExchange interceptExchange(final HTTPExchange exch);
    322     }
    323 
    324     ///////////////////////////////////////////////////////////////////////////
    325     // Constructors:
    326 
    327     /**
    328      * Determine whether or not we should perform assertions.  Assertions
    329      * can be specified via system property explicitly, or defaulted to
    330      * the JVM assertions status.
    331      */
    332     static {
    333         final String prop =
    334                 BOSHClient.class.getSimpleName() + ".assertionsEnabled";
    335         boolean enabled = false;
    336         if (System.getProperty(prop) == null) {
    337             assert enabled = true;
    338         } else {
    339             enabled = Boolean.getBoolean(prop);
    340         }
    341         ASSERTIONS = enabled;
    342     }
    343 
    /**
     * Private constructor; instances are obtained through
     * {@code create(BOSHClientConfig)}.  Stores the configuration and then
     * initializes the session.
     *
     * @param sessCfg session configuration
     */
    private BOSHClient(final BOSHClientConfig sessCfg) {
        this.cfg = sessCfg;
        this.init();
    }
    351 
    352     ///////////////////////////////////////////////////////////////////////////
    353     // Public methods:
    354 
    355     /**
    356      * Create a new BOSH client session using the client configuration
    357      * information provided.
    358      *
    359      * @param clientCfg session configuration
    360      * @return BOSH session instance
    361      */
    362     public static BOSHClient create(final BOSHClientConfig clientCfg) {
    363         if (clientCfg == null) {
    364             throw(new IllegalArgumentException(
    365                     "Client configuration may not be null"));
    366         }
    367         return new BOSHClient(clientCfg);
    368     }
    369 
    370     /**
    371      * Get the client configuration that was used to create this client
    372      * instance.
    373      *
    374      * @return client configuration
    375      */
    376     public BOSHClientConfig getBOSHClientConfig() {
    377         return cfg;
    378     }
    379 
    380     /**
    381      * Adds a connection listener to the session.
    382      *
    383      * @param listener connection listener to add, if not already added
    384      */
    385     public void addBOSHClientConnListener(
    386             final BOSHClientConnListener listener) {
    387         if (listener == null) {
    388             throw(new IllegalArgumentException(NULL_LISTENER));
    389         }
    390         connListeners.add(listener);
    391     }
    392 
    393     /**
    394      * Removes a connection listener from the session.
    395      *
    396      * @param listener connection listener to remove, if previously added
    397      */
    398     public void removeBOSHClientConnListener(
    399             final BOSHClientConnListener listener) {
    400         if (listener == null) {
    401             throw(new IllegalArgumentException(NULL_LISTENER));
    402         }
    403         connListeners.remove(listener);
    404     }
    405 
    406     /**
    407      * Adds a request message listener to the session.
    408      *
    409      * @param listener request listener to add, if not already added
    410      */
    411     public void addBOSHClientRequestListener(
    412             final BOSHClientRequestListener listener) {
    413         if (listener == null) {
    414             throw(new IllegalArgumentException(NULL_LISTENER));
    415         }
    416         requestListeners.add(listener);
    417     }
    418 
    419     /**
    420      * Removes a request message listener from the session, if previously
    421      * added.
    422      *
    423      * @param listener instance to remove
    424      */
    425     public void removeBOSHClientRequestListener(
    426             final BOSHClientRequestListener listener) {
    427         if (listener == null) {
    428             throw(new IllegalArgumentException(NULL_LISTENER));
    429         }
    430         requestListeners.remove(listener);
    431     }
    432 
    433     /**
    434      * Adds a response message listener to the session.
    435      *
    436      * @param listener response listener to add, if not already added
    437      */
    438     public void addBOSHClientResponseListener(
    439             final BOSHClientResponseListener listener) {
    440         if (listener == null) {
    441             throw(new IllegalArgumentException(NULL_LISTENER));
    442         }
    443         responseListeners.add(listener);
    444     }
    445 
    446     /**
    447      * Removes a response message listener from the session, if previously
    448      * added.
    449      *
    450      * @param listener instance to remove
    451      */
    452     public void removeBOSHClientResponseListener(
    453             final BOSHClientResponseListener listener) {
    454         if (listener == null) {
    455             throw(new IllegalArgumentException(NULL_LISTENER));
    456         }
    457         responseListeners.remove(listener);
    458     }
    459 
    460     /**
    461      * Send the provided message data to the remote connection manager.  The
    462      * provided message body does not need to have any BOSH-specific attribute
    463      * information set.  It only needs to contain the actual message payload
    464      * that should be delivered to the remote server.
    465      * <p/>
    466      * The first call to this method will result in a connection attempt
    467      * to the remote connection manager.  Subsequent calls to this method
    468      * will block until the underlying session state allows for the message
    469      * to be transmitted.  In certain scenarios - such as when the maximum
    470      * number of outbound connections has been reached - calls to this method
    471      * will block for short periods of time.
    472      *
    473      * @param body message data to send to remote server
    474      * @throws BOSHException on message transmission failure
    475      */
    476     public void send(final ComposableBody body) throws BOSHException {
    477         assertUnlocked();
    478         if (body == null) {
    479             throw(new IllegalArgumentException(
    480                     "Message body may not be null"));
    481         }
    482 
    483         HTTPExchange exch;
    484         CMSessionParams params;
    485         lock.lock();
    486         try {
    487             blockUntilSendable(body);
    488             if (!isWorking() && !isTermination(body)) {
    489                 throw(new BOSHException(
    490                         "Cannot send message when session is closed"));
    491             }
    492 
    493             long rid = requestIDSeq.getNextRID();
    494             ComposableBody request = body;
    495             params = cmParams;
    496             if (params == null && exchanges.isEmpty()) {
    497                 // This is the first message being sent
    498                 request = applySessionCreationRequest(rid, body);
    499             } else {
    500                 request = applySessionData(rid, body);
    501                 if (cmParams.isAckingRequests()) {
    502                     pendingRequestAcks.add(request);
    503                 }
    504             }
    505             exch = new HTTPExchange(request);
    506             exchanges.add(exch);
    507             notEmpty.signalAll();
    508             clearEmptyRequest();
    509         } finally {
    510             lock.unlock();
    511         }
    512         AbstractBody finalReq = exch.getRequest();
    513         HTTPResponse resp = httpSender.send(params, finalReq);
    514         exch.setHTTPResponse(resp);
    515         fireRequestSent(finalReq);
    516     }
    517 
    518     /**
    519      * Attempt to pause the current session.  When supported by the remote
    520      * connection manager, pausing the session will result in the connection
    521      * manager closing out all outstanding requests (including the pause
    522      * request) and increases the inactivity timeout of the session.  The
    523      * exact value of the temporary timeout is dependent upon the connection
    524      * manager.  This method should be used if a client encounters an
    525      * exceptional temporary situation during which it will be unable to send
    526      * requests to the connection manager for a period of time greater than
    527      * the maximum inactivity period.
    528      *
    529      * The session will revert back to it's normal, unpaused state when the
    530      * client sends it's next message.
    531      *
    532      * @return {@code true} if the connection manager supports session pausing,
    533      *  {@code false} if the connection manager does not support session
    534      *  pausing or if the session has not yet been established
    535      */
    536     public boolean pause() {
    537         assertUnlocked();
    538         lock.lock();
    539         AttrMaxPause maxPause = null;
    540         try {
    541             if (cmParams == null) {
    542                 return false;
    543             }
    544 
    545             maxPause = cmParams.getMaxPause();
    546             if (maxPause == null) {
    547                 return false;
    548             }
    549         } finally {
    550             lock.unlock();
    551         }
    552         try {
    553             send(ComposableBody.builder()
    554                     .setAttribute(Attributes.PAUSE, maxPause.toString())
    555                     .build());
    556         } catch (BOSHException boshx) {
    557             LOG.log(Level.FINEST, "Could not send pause", boshx);
    558         }
    559         return true;
    560     }
    561 
    562     /**
    563      * End the BOSH session by disconnecting from the remote BOSH connection
    564      * manager.
    565      *
    566      * @throws BOSHException when termination message cannot be sent
    567      */
    568     public void disconnect() throws BOSHException {
    569         disconnect(ComposableBody.builder().build());
    570     }
    571 
    572     /**
    573      * End the BOSH session by disconnecting from the remote BOSH connection
    574      * manager, sending the provided content in the final connection
    575      * termination message.
    576      *
    577      * @param msg final message to send
    578      * @throws BOSHException when termination message cannot be sent
    579      */
    580     public void disconnect(final ComposableBody msg) throws BOSHException {
    581         if (msg == null) {
    582             throw(new IllegalArgumentException(
    583                     "Message body may not be null"));
    584         }
    585 
    586         Builder builder = msg.rebuild();
    587         builder.setAttribute(Attributes.TYPE, TERMINATE);
    588         send(builder.build());
    589     }
    590 
    591     /**
    592      * Forcibly close this client session instance.  The preferred mechanism
    593      * to close the connection is to send a disconnect message and wait for
    594      * organic termination.  Calling this method simply shuts down the local
    595      * session without sending a termination message, releasing all resources
    596      * associated with the session.
    597      */
    598     public void close() {
    599         dispose(new BOSHException("Session explicitly closed by caller"));
    600     }
    601 
    602     ///////////////////////////////////////////////////////////////////////////
    603     // Package-private methods:
    604 
    605     /**
    606      * Get the current CM session params.
    607      *
    608      * @return current session params, or {@code null}
    609      */
    610     CMSessionParams getCMSessionParams() {
    611         lock.lock();
    612         try {
    613             return cmParams;
    614         } finally {
    615             lock.unlock();
    616         }
    617     }
    618 
    619     /**
    620      * Wait until no more messages are waiting to be processed.
    621      */
    622     void drain() {
    623         lock.lock();
    624         try {
    625             LOG.finest("Waiting while draining...");
    626             while (isWorking()
    627                     && (emptyRequestFuture == null
    628                     || emptyRequestFuture.isDone())) {
    629                 try {
    630                     drained.await();
    631                 } catch (InterruptedException intx) {
    632                     LOG.log(Level.FINEST, INTERRUPTED, intx);
    633                 }
    634             }
    635             LOG.finest("Drained");
    636         } finally {
    637             lock.unlock();
    638         }
    639     }
    640 
    641     /**
    642      * Test method used to forcibly discard next exchange.
    643      *
    644      * @param interceptor exchange interceptor
    645      */
    646     void setExchangeInterceptor(final ExchangeInterceptor interceptor) {
    647         exchInterceptor.set(interceptor);
    648     }
    649 
    650 
    651     ///////////////////////////////////////////////////////////////////////////
    652     // Private methods:
    653 
    654     /**
    655      * Initialize the session.  This initializes the underlying HTTP
    656      * transport implementation and starts the receive thread.
    657      */
    658     private void init() {
    659         assertUnlocked();
    660 
    661         lock.lock();
    662         try {
    663             httpSender.init(cfg);
    664             procThread = new Thread(procRunnable);
    665             procThread.setDaemon(true);
    666             procThread.setName(BOSHClient.class.getSimpleName()
    667                     + "[" + System.identityHashCode(this)
    668                     + "]: Receive thread");
    669             procThread.start();
    670         } finally {
    671             lock.unlock();
    672         }
    673     }
    674 
    675     /**
    676      * Destroy this session.
    677      *
    678      * @param cause the reason for the session termination, or {@code null}
    679      *  for normal termination
    680      */
    681     private void dispose(final Throwable cause) {
    682         assertUnlocked();
    683 
    684         lock.lock();
    685         try {
    686             if (procThread == null) {
    687                 // Already disposed
    688                 return;
    689             }
    690             procThread = null;
    691         } finally {
    692             lock.unlock();
    693         }
    694 
    695         if (cause == null) {
    696             fireConnectionClosed();
    697         } else {
    698             fireConnectionClosedOnError(cause);
    699         }
    700 
    701         lock.lock();
    702         try {
    703             clearEmptyRequest();
    704             exchanges = null;
    705             cmParams = null;
    706             pendingResponseAcks = null;
    707             pendingRequestAcks = null;
    708             notEmpty.signalAll();
    709             notFull.signalAll();
    710             drained.signalAll();
    711         } finally {
    712             lock.unlock();
    713         }
    714 
    715         httpSender.destroy();
    716         schedExec.shutdownNow();
    717     }
    718 
    719     /**
    720      * Determines if the message body specified indicates a request to
    721      * pause the session.
    722      *
    723      * @param msg message to evaluate
    724      * @return {@code true} if the message is a pause request, {@code false}
    725      *  otherwise
    726      */
    727     private static boolean isPause(final AbstractBody msg) {
    728         return msg.getAttribute(Attributes.PAUSE) != null;
    729     }
    730 
    731     /**
    732      * Determines if the message body specified indicates a termination of
    733      * the session.
    734      *
    735      * @param msg message to evaluate
    736      * @return {@code true} if the message is a session termination,
    737      *  {@code false} otherwise
    738      */
    739     private static boolean isTermination(final AbstractBody msg) {
    740         return TERMINATE.equals(msg.getAttribute(Attributes.TYPE));
    741     }
    742 
    743     /**
    744      * Evaluates the HTTP response code and response message and returns the
    745      * terminal binding condition that it describes, if any.
    746      *
    747      * @param respCode HTTP response code
    748      * @param respBody response body
    749      * @return terminal binding condition, or {@code null} if not a terminal
    750      *  binding condition message
    751      */
    752     private TerminalBindingCondition getTerminalBindingCondition(
    753             final int respCode,
    754             final AbstractBody respBody) {
    755         assertLocked();
    756 
    757         if (isTermination(respBody)) {
    758             String str = respBody.getAttribute(Attributes.CONDITION);
    759             return TerminalBindingCondition.forString(str);
    760         }
    761         // Check for deprecated HTTP Error Conditions
    762         if (cmParams != null && cmParams.getVersion() == null) {
    763             return TerminalBindingCondition.forHTTPResponseCode(respCode);
    764         }
    765         return null;
    766     }
    767 
    768     /**
    769      * Determines if the message specified is immediately sendable or if it
    770      * needs to block until the session state changes.
    771      *
    772      * @param msg message to evaluate
    773      * @return {@code true} if the message can be immediately sent,
    774      *  {@code false} otherwise
    775      */
    776     private boolean isImmediatelySendable(final AbstractBody msg) {
    777         assertLocked();
    778 
    779         if (cmParams == null) {
    780             // block if we're waiting for a response to our first request
    781             return exchanges.isEmpty();
    782         }
    783 
    784         AttrRequests requests = cmParams.getRequests();
    785         if (requests == null) {
    786             return true;
    787         }
    788         int maxRequests = requests.intValue();
    789         if (exchanges.size() < maxRequests) {
    790             return true;
    791         }
    792         if (exchanges.size() == maxRequests
    793                 && (isTermination(msg) || isPause(msg))) {
    794             // One additional terminate or pause message is allowed
    795             return true;
    796         }
    797         return false;
    798     }
    799 
    800     /**
    801      * Determines whether or not the session is still active.
    802      *
    803      * @return {@code true} if it is, {@code false} otherwise
    804      */
    805     private boolean isWorking() {
    806         assertLocked();
    807 
    808         return procThread != null;
    809     }
    810 
    811     /**
    812      * Blocks until either the message provided becomes immediately
    813      * sendable or until the session is terminated.
    814      *
    815      * @param msg message to evaluate
    816      */
    817     private void blockUntilSendable(final AbstractBody msg) {
    818         assertLocked();
    819 
    820         while (isWorking() && !isImmediatelySendable(msg)) {
    821             try {
    822                 notFull.await();
    823             } catch (InterruptedException intx) {
    824                 LOG.log(Level.FINEST, INTERRUPTED, intx);
    825             }
    826         }
    827     }
    828 
    829     /**
    830      * Modifies the specified body message such that it becomes a new
    831      * BOSH session creation request.
    832      *
    833      * @param rid request ID to use
    834      * @param orig original body to modify
    835      * @return modified message which acts as a session creation request
    836      */
    837     private ComposableBody applySessionCreationRequest(
    838             final long rid, final ComposableBody orig) throws BOSHException {
    839         assertLocked();
    840 
    841         Builder builder = orig.rebuild();
    842         builder.setAttribute(Attributes.TO, cfg.getTo());
    843         builder.setAttribute(Attributes.XML_LANG, cfg.getLang());
    844         builder.setAttribute(Attributes.VER,
    845                 AttrVersion.getSupportedVersion().toString());
    846         builder.setAttribute(Attributes.WAIT, "60");
    847         builder.setAttribute(Attributes.HOLD, "1");
    848         builder.setAttribute(Attributes.RID, Long.toString(rid));
    849         applyRoute(builder);
    850         applyFrom(builder);
    851         builder.setAttribute(Attributes.ACK, "1");
    852 
    853         // Make sure the following are NOT present (i.e., during retries)
    854         builder.setAttribute(Attributes.SID, null);
    855         return builder.build();
    856     }
    857 
    858     /**
    859      * Applies routing information to the request message who's builder has
    860      * been provided.
    861      *
    862      * @param builder builder instance to add routing information to
    863      */
    864     private void applyRoute(final Builder builder) {
    865         assertLocked();
    866 
    867         String route = cfg.getRoute();
    868         if (route != null) {
    869             builder.setAttribute(Attributes.ROUTE, route);
    870         }
    871     }
    872 
    873     /**
    874      * Applies the local station ID information to the request message who's
    875      * builder has been provided.
    876      *
    877      * @param builder builder instance to add station ID information to
    878      */
    879     private void applyFrom(final Builder builder) {
    880         assertLocked();
    881 
    882         String from = cfg.getFrom();
    883         if (from != null) {
    884             builder.setAttribute(Attributes.FROM, from);
    885         }
    886     }
    887 
    888     /**
    889      * Applies existing session data to the outbound request, returning the
    890      * modified request.
    891      *
    892      * This method assumes the lock is currently held.
    893      *
    894      * @param rid request ID to use
    895      * @param orig original/raw request
    896      * @return modified request with session information applied
    897      */
    898     private ComposableBody applySessionData(
    899             final long rid,
    900             final ComposableBody orig) throws BOSHException {
    901         assertLocked();
    902 
    903         Builder builder = orig.rebuild();
    904         builder.setAttribute(Attributes.SID,
    905                 cmParams.getSessionID().toString());
    906         builder.setAttribute(Attributes.RID, Long.toString(rid));
    907         applyResponseAcknowledgement(builder, rid);
    908         return builder.build();
    909     }
    910 
    911     /**
    912      * Sets the 'ack' attribute of the request to the value of the highest
    913      * 'rid' of a request for which it has already received a response in the
    914      * case where it has also received all responses associated with lower
    915      * 'rid' values.  The only exception is that, after its session creation
    916      * request, the client SHOULD NOT include an 'ack' attribute in any request
    917      * if it has received responses to all its previous requests.
    918      *
    919      * @param builder message builder
    920      * @param rid current request RID
    921      */
    922     private void applyResponseAcknowledgement(
    923             final Builder builder,
    924             final long rid) {
    925         assertLocked();
    926 
    927         if (responseAck.equals(Long.valueOf(-1L))) {
    928             // We have not received any responses yet
    929             return;
    930         }
    931 
    932         Long prevRID = Long.valueOf(rid - 1L);
    933         if (responseAck.equals(prevRID)) {
    934             // Implicit ack
    935             return;
    936         }
    937 
    938         builder.setAttribute(Attributes.ACK, responseAck.toString());
    939     }
    940 
    941     /**
    942      * While we are "connected", process received responses.
    943      *
    944      * This method is run in the processing thread.
    945      */
    946     private void processMessages() {
    947         LOG.log(Level.FINEST, "Processing thread starting");
    948         try {
    949             HTTPExchange exch;
    950             do {
    951                 exch = nextExchange();
    952                 if (exch == null) {
    953                     break;
    954                 }
    955 
    956                 // Test hook to manipulate what the client sees:
    957                 ExchangeInterceptor interceptor = exchInterceptor.get();
    958                 if (interceptor != null) {
    959                     HTTPExchange newExch = interceptor.interceptExchange(exch);
    960                     if (newExch == null) {
    961                         LOG.log(Level.FINE, "Discarding exchange on request "
    962                                 + "of test hook: RID="
    963                                 + exch.getRequest().getAttribute(
    964                                     Attributes.RID));
    965                         lock.lock();
    966                         try {
    967                             exchanges.remove(exch);
    968                         } finally {
    969                             lock.unlock();
    970                         }
    971                         continue;
    972                     }
    973                     exch = newExch;
    974                 }
    975 
    976                 processExchange(exch);
    977             } while (true);
    978         } finally {
    979             LOG.log(Level.FINEST, "Processing thread exiting");
    980         }
    981 
    982     }
    983 
    984     /**
    985      * Get the next message exchange to process, blocking until one becomes
    986      * available if nothing is already waiting for processing.
    987      *
    988      * @return next available exchange to process, or {@code null} if no
    989      *  exchanges are immediately available
    990      */
    991     private HTTPExchange nextExchange() {
    992         assertUnlocked();
    993 
    994         final Thread thread = Thread.currentThread();
    995         HTTPExchange exch = null;
    996         lock.lock();
    997         try {
    998             do {
    999                 if (!thread.equals(procThread)) {
   1000                     break;
   1001                 }
   1002                 exch = exchanges.peek();
   1003                 if (exch == null) {
   1004                     try {
   1005                         notEmpty.await();
   1006                     } catch (InterruptedException intx) {
   1007                         LOG.log(Level.FINEST, INTERRUPTED, intx);
   1008                     }
   1009                 }
   1010             } while (exch == null);
   1011         } finally {
   1012             lock.unlock();
   1013         }
   1014         return exch;
   1015     }
   1016 
   1017     /**
   1018      * Process the next, provided exchange.  This is the main processing
   1019      * method of the receive thread.
   1020      *
   1021      * @param exch message exchange to process
   1022      */
   1023     private void processExchange(final HTTPExchange exch) {
   1024         assertUnlocked();
   1025 
   1026         HTTPResponse resp;
   1027         AbstractBody body;
   1028         int respCode;
   1029         try {
   1030             resp = exch.getHTTPResponse();
   1031             body = resp.getBody();
   1032             respCode = resp.getHTTPStatus();
   1033         } catch (BOSHException boshx) {
   1034             LOG.log(Level.FINEST, "Could not obtain response", boshx);
   1035             dispose(boshx);
   1036             return;
   1037         } catch (InterruptedException intx) {
   1038             LOG.log(Level.FINEST, INTERRUPTED, intx);
   1039             dispose(intx);
   1040             return;
   1041         }
   1042         fireResponseReceived(body);
   1043 
   1044         // Process the message with the current session state
   1045         AbstractBody req = exch.getRequest();
   1046         CMSessionParams params;
   1047         List<HTTPExchange> toResend = null;
   1048         lock.lock();
   1049         try {
   1050             // Check for session creation response info, if needed
   1051             if (cmParams == null) {
   1052                 cmParams = CMSessionParams.fromSessionInit(req, body);
   1053 
   1054                 // The following call handles the lock. It's not an escape.
   1055                 fireConnectionEstablished();
   1056             }
   1057             params = cmParams;
   1058 
   1059             checkForTerminalBindingConditions(body, respCode);
   1060             if (isTermination(body)) {
   1061                 // Explicit termination
   1062                 lock.unlock();
   1063                 dispose(null);
   1064                 return;
   1065             }
   1066 
   1067             if (isRecoverableBindingCondition(body)) {
   1068                 // Retransmit outstanding requests
   1069                 if (toResend == null) {
   1070                     toResend = new ArrayList<HTTPExchange>(exchanges.size());
   1071                 }
   1072                 for (HTTPExchange exchange : exchanges) {
   1073                     HTTPExchange resendExch =
   1074                             new HTTPExchange(exchange.getRequest());
   1075                     toResend.add(resendExch);
   1076                 }
   1077                 for (HTTPExchange exchange : toResend) {
   1078                     exchanges.add(exchange);
   1079                 }
   1080             } else {
   1081                 // Process message as normal
   1082                 processRequestAcknowledgements(req, body);
   1083                 processResponseAcknowledgementData(req);
   1084                 HTTPExchange resendExch =
   1085                         processResponseAcknowledgementReport(body);
   1086                 if (resendExch != null && toResend == null) {
   1087                     toResend = new ArrayList<HTTPExchange>(1);
   1088                     toResend.add(resendExch);
   1089                     exchanges.add(resendExch);
   1090                 }
   1091             }
   1092         } catch (BOSHException boshx) {
   1093             LOG.log(Level.FINEST, "Could not process response", boshx);
   1094             lock.unlock();
   1095             dispose(boshx);
   1096             return;
   1097         } finally {
   1098             if (lock.isHeldByCurrentThread()) {
   1099                 try {
   1100                     exchanges.remove(exch);
   1101                     if (exchanges.isEmpty()) {
   1102                         scheduleEmptyRequest(processPauseRequest(req));
   1103                     }
   1104                     notFull.signalAll();
   1105                 } finally {
   1106                     lock.unlock();
   1107                 }
   1108             }
   1109         }
   1110 
   1111         if (toResend != null) {
   1112             for (HTTPExchange resend : toResend) {
   1113                 HTTPResponse response =
   1114                         httpSender.send(params, resend.getRequest());
   1115                 resend.setHTTPResponse(response);
   1116                 fireRequestSent(resend.getRequest());
   1117             }
   1118         }
   1119     }
   1120 
   1121     /**
   1122      * Clears any scheduled empty requests.
   1123      */
   1124     private void clearEmptyRequest() {
   1125         assertLocked();
   1126 
   1127         if (emptyRequestFuture != null) {
   1128             emptyRequestFuture.cancel(false);
   1129             emptyRequestFuture = null;
   1130         }
   1131     }
   1132 
   1133     /**
   1134      * Calculates the default empty request delay/interval to use for the
   1135      * active session.
   1136      *
   1137      * @return delay in milliseconds
   1138      */
   1139     private long getDefaultEmptyRequestDelay() {
   1140         assertLocked();
   1141 
   1142         // Figure out how long we should wait before sending an empty request
   1143         AttrPolling polling = cmParams.getPollingInterval();
   1144         long delay;
   1145         if (polling == null) {
   1146             delay = EMPTY_REQUEST_DELAY;
   1147         } else {
   1148             delay = polling.getInMilliseconds();
   1149         }
   1150         return delay;
   1151     }
   1152 
   1153     /**
   1154      * Schedule an empty request to be sent if no other requests are
   1155      * sent in a reasonable amount of time.
   1156      */
   1157     private void scheduleEmptyRequest(long delay) {
   1158         assertLocked();
   1159         if (delay < 0L) {
   1160             throw(new IllegalArgumentException(
   1161                     "Empty request delay must be >= 0 (was: " + delay + ")"));
   1162         }
   1163 
   1164         clearEmptyRequest();
   1165         if (!isWorking()) {
   1166             return;
   1167         }
   1168 
   1169         // Schedule the transmission
   1170         if (LOG.isLoggable(Level.FINER)) {
   1171             LOG.finer("Scheduling empty request in " + delay + "ms");
   1172         }
   1173         try {
   1174             emptyRequestFuture = schedExec.schedule(emptyRequestRunnable,
   1175                     delay, TimeUnit.MILLISECONDS);
   1176         } catch (RejectedExecutionException rex) {
   1177             LOG.log(Level.FINEST, "Could not schedule empty request", rex);
   1178         }
   1179         drained.signalAll();
   1180     }
   1181 
   1182     /**
   1183      * Sends an empty request to maintain session requirements.  If a request
   1184      * is sent within a reasonable time window, the empty request transmission
   1185      * will be cancelled.
   1186      */
   1187     private void sendEmptyRequest() {
   1188         assertUnlocked();
   1189         // Send an empty request
   1190         LOG.finest("Sending empty request");
   1191         try {
   1192             send(ComposableBody.builder().build());
   1193         } catch (BOSHException boshx) {
   1194             dispose(boshx);
   1195         }
   1196     }
   1197 
   1198     /**
   1199      * Assert that the internal lock is held.
   1200      */
   1201     private void assertLocked() {
   1202         if (ASSERTIONS) {
   1203             if (!lock.isHeldByCurrentThread()) {
   1204                 throw(new AssertionError("Lock is not held by current thread"));
   1205             }
   1206             return;
   1207         }
   1208     }
   1209 
   1210     /**
   1211      * Assert that the internal lock is *not* held.
   1212      */
   1213     private void assertUnlocked() {
   1214         if (ASSERTIONS) {
   1215             if (lock.isHeldByCurrentThread()) {
   1216                 throw(new AssertionError("Lock is held by current thread"));
   1217             }
   1218             return;
   1219         }
   1220     }
   1221 
   1222     /**
   1223      * Checks to see if the response indicates a terminal binding condition
   1224      * (as per XEP-0124 section 17).  If it does, an exception is thrown.
   1225      *
   1226      * @param body response body to evaluate
   1227      * @param code HTTP response code
   1228      * @throws BOSHException if a terminal binding condition is detected
   1229      */
   1230     private void checkForTerminalBindingConditions(
   1231             final AbstractBody body,
   1232             final int code)
   1233             throws BOSHException {
   1234         TerminalBindingCondition cond =
   1235                 getTerminalBindingCondition(code, body);
   1236         if (cond != null) {
   1237             throw(new BOSHException(
   1238                     "Terminal binding condition encountered: "
   1239                     + cond.getCondition() + "  ("
   1240                     + cond.getMessage() + ")"));
   1241         }
   1242     }
   1243 
   1244     /**
   1245      * Determines whether or not the response indicates a recoverable
   1246      * binding condition (as per XEP-0124 section 17).
   1247      *
   1248      * @param resp response body
   1249      * @return {@code true} if it does, {@code false} otherwise
   1250      */
   1251     private static boolean isRecoverableBindingCondition(
   1252             final AbstractBody resp) {
   1253         return ERROR.equals(resp.getAttribute(Attributes.TYPE));
   1254     }
   1255 
   1256     /**
   1257      * Process the request to determine if the empty request delay
   1258      * can be determined by looking to see if the request is a pause
   1259      * request.  If it can, the request's delay is returned, otherwise
   1260      * the default delay is returned.
   1261      *
   1262      * @return delay in milliseconds that should elapse prior to an
   1263      *  empty message being sent
   1264      */
   1265     private long processPauseRequest(
   1266             final AbstractBody req) {
   1267         assertLocked();
   1268 
   1269         if (cmParams != null && cmParams.getMaxPause() != null) {
   1270             try {
   1271                 AttrPause pause = AttrPause.createFromString(
   1272                         req.getAttribute(Attributes.PAUSE));
   1273                 if (pause != null) {
   1274                     long delay = pause.getInMilliseconds() - PAUSE_MARGIN;
   1275                     if (delay < 0) {
   1276                         delay = EMPTY_REQUEST_DELAY;
   1277                     }
   1278                     return delay;
   1279                 }
   1280             } catch (BOSHException boshx) {
   1281                 LOG.log(Level.FINEST, "Could not extract", boshx);
   1282             }
   1283         }
   1284 
   1285         return getDefaultEmptyRequestDelay();
   1286     }
   1287 
   1288     /**
   1289      * Check the response for request acknowledgements and take appropriate
   1290      * action.
   1291      *
   1292      * This method assumes the lock is currently held.
   1293      *
   1294      * @param req request
   1295      * @param resp response
   1296      */
   1297     private void processRequestAcknowledgements(
   1298             final AbstractBody req, final AbstractBody resp) {
   1299         assertLocked();
   1300 
   1301         if (!cmParams.isAckingRequests()) {
   1302             return;
   1303         }
   1304 
   1305         // If a report or time attribute is set, we aren't acking anything
   1306         if (resp.getAttribute(Attributes.REPORT) != null) {
   1307             return;
   1308         }
   1309 
   1310         // Figure out what the highest acked RID is
   1311         String acked = resp.getAttribute(Attributes.ACK);
   1312         Long ackUpTo;
   1313         if (acked == null) {
   1314             // Implicit ack of all prior requests up until RID
   1315             ackUpTo = Long.parseLong(req.getAttribute(Attributes.RID));
   1316         } else {
   1317             ackUpTo = Long.parseLong(acked);
   1318         }
   1319 
   1320         // Remove the acked requests from the list
   1321         if (LOG.isLoggable(Level.FINEST)) {
   1322             LOG.finest("Removing pending acks up to: " + ackUpTo);
   1323         }
   1324         Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
   1325         while (iter.hasNext()) {
   1326             AbstractBody pending = iter.next();
   1327             Long pendingRID = Long.parseLong(
   1328                     pending.getAttribute(Attributes.RID));
   1329             if (pendingRID.compareTo(ackUpTo) <= 0) {
   1330                 iter.remove();
   1331             }
   1332         }
   1333     }
   1334 
   1335     /**
   1336      * Process the response in order to update the response acknowlegement
   1337      * data.
   1338      *
   1339      * This method assumes the lock is currently held.
   1340      *
   1341      * @param req request
   1342      */
   1343     private void processResponseAcknowledgementData(
   1344             final AbstractBody req) {
   1345         assertLocked();
   1346 
   1347         Long rid = Long.parseLong(req.getAttribute(Attributes.RID));
   1348         if (responseAck.equals(Long.valueOf(-1L))) {
   1349             // This is the first request
   1350             responseAck = rid;
   1351         } else {
   1352             pendingResponseAcks.add(rid);
   1353             // Remove up until the first missing response (or end of queue)
   1354             Long whileVal = responseAck;
   1355             while (whileVal.equals(pendingResponseAcks.first())) {
   1356                 responseAck = whileVal;
   1357                 pendingResponseAcks.remove(whileVal);
   1358                 whileVal = Long.valueOf(whileVal.longValue() + 1);
   1359             }
   1360         }
   1361     }
   1362 
   1363     /**
   1364      * Process the response in order to check for and respond to any potential
   1365      * ack reports.
   1366      *
   1367      * This method assumes the lock is currently held.
   1368      *
   1369      * @param resp response
   1370      * @return exchange to transmit if a resend is to be performed, or
   1371      *  {@code null} if no resend is necessary
   1372      * @throws BOSHException when a a retry is needed but cannot be performed
   1373      */
   1374     private HTTPExchange processResponseAcknowledgementReport(
   1375             final AbstractBody resp)
   1376             throws BOSHException {
   1377         assertLocked();
   1378 
   1379         String reportStr = resp.getAttribute(Attributes.REPORT);
   1380         if (reportStr == null) {
   1381             // No report on this message
   1382             return null;
   1383         }
   1384 
   1385         Long report = Long.parseLong(reportStr);
   1386         Long time = Long.parseLong(resp.getAttribute(Attributes.TIME));
   1387         if (LOG.isLoggable(Level.FINE)) {
   1388             LOG.fine("Received report of missing request (RID="
   1389                     + report + ", time=" + time + "ms)");
   1390         }
   1391 
   1392         // Find the missing request
   1393         Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
   1394         AbstractBody req = null;
   1395         while (iter.hasNext() && req == null) {
   1396             AbstractBody pending = iter.next();
   1397             Long pendingRID = Long.parseLong(
   1398                     pending.getAttribute(Attributes.RID));
   1399             if (report.equals(pendingRID)) {
   1400                 req = pending;
   1401             }
   1402         }
   1403 
   1404         if (req == null) {
   1405             throw(new BOSHException("Report of missing message with RID '"
   1406                     + reportStr
   1407                     + "' but local copy of that request was not found"));
   1408         }
   1409 
   1410         // Resend the missing request
   1411         HTTPExchange exch = new HTTPExchange(req);
   1412         exchanges.add(exch);
   1413         notEmpty.signalAll();
   1414         return exch;
   1415     }
   1416 
   1417     /**
   1418      * Notifies all request listeners that the specified request is being
   1419      * sent.
   1420      *
   1421      * @param request request being sent
   1422      */
   1423     private void fireRequestSent(final AbstractBody request) {
   1424         assertUnlocked();
   1425 
   1426         BOSHMessageEvent event = null;
   1427         for (BOSHClientRequestListener listener : requestListeners) {
   1428             if (event == null) {
   1429                 event = BOSHMessageEvent.createRequestSentEvent(this, request);
   1430             }
   1431             try {
   1432                 listener.requestSent(event);
   1433             } catch (Exception ex) {
   1434                 LOG.log(Level.WARNING, UNHANDLED, ex);
   1435             }
   1436         }
   1437     }
   1438 
   1439     /**
   1440      * Notifies all response listeners that the specified response has been
   1441      * received.
   1442      *
   1443      * @param response response received
   1444      */
   1445     private void fireResponseReceived(final AbstractBody response) {
   1446         assertUnlocked();
   1447 
   1448         BOSHMessageEvent event = null;
   1449         for (BOSHClientResponseListener listener : responseListeners) {
   1450             if (event == null) {
   1451                 event = BOSHMessageEvent.createResponseReceivedEvent(
   1452                         this, response);
   1453             }
   1454             try {
   1455                 listener.responseReceived(event);
   1456             } catch (Exception ex) {
   1457                 LOG.log(Level.WARNING, UNHANDLED, ex);
   1458             }
   1459         }
   1460     }
   1461 
   1462     /**
   1463      * Notifies all connection listeners that the session has been successfully
   1464      * established.
   1465      */
   1466     private void fireConnectionEstablished() {
   1467         final boolean hadLock = lock.isHeldByCurrentThread();
   1468         if (hadLock) {
   1469             lock.unlock();
   1470         }
   1471         try {
   1472             BOSHClientConnEvent event = null;
   1473             for (BOSHClientConnListener listener : connListeners) {
   1474                 if (event == null) {
   1475                     event = BOSHClientConnEvent
   1476                             .createConnectionEstablishedEvent(this);
   1477                 }
   1478                 try {
   1479                     listener.connectionEvent(event);
   1480                 } catch (Exception ex) {
   1481                     LOG.log(Level.WARNING, UNHANDLED, ex);
   1482                 }
   1483             }
   1484         } finally {
   1485             if (hadLock) {
   1486                 lock.lock();
   1487             }
   1488         }
   1489     }
   1490 
   1491     /**
   1492      * Notifies all connection listeners that the session has been
   1493      * terminated normally.
   1494      */
   1495     private void fireConnectionClosed() {
   1496         assertUnlocked();
   1497 
   1498         BOSHClientConnEvent event = null;
   1499         for (BOSHClientConnListener listener : connListeners) {
   1500             if (event == null) {
   1501                 event = BOSHClientConnEvent.createConnectionClosedEvent(this);
   1502             }
   1503             try {
   1504                 listener.connectionEvent(event);
   1505             } catch (Exception ex) {
   1506                 LOG.log(Level.WARNING, UNHANDLED, ex);
   1507             }
   1508         }
   1509     }
   1510 
   1511     /**
   1512      * Notifies all connection listeners that the session has been
   1513      * terminated due to the exceptional condition provided.
   1514      *
   1515      * @param cause cause of the termination
   1516      */
   1517     private void fireConnectionClosedOnError(
   1518             final Throwable cause) {
   1519         assertUnlocked();
   1520 
   1521         BOSHClientConnEvent event = null;
   1522         for (BOSHClientConnListener listener : connListeners) {
   1523             if (event == null) {
   1524                 event = BOSHClientConnEvent
   1525                         .createConnectionClosedOnErrorEvent(
   1526                         this, pendingRequestAcks, cause);
   1527             }
   1528             try {
   1529                 listener.connectionEvent(event);
   1530             } catch (Exception ex) {
   1531                 LOG.log(Level.WARNING, UNHANDLED, ex);
   1532             }
   1533         }
   1534     }
   1535 
   1536 }
   1537