1 /* 2 * Copyright 2009 Mike Cumings 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.kenai.jbosh; 18 19 import com.kenai.jbosh.ComposableBody.Builder; 20 import java.util.ArrayList; 21 import java.util.Iterator; 22 import java.util.LinkedList; 23 import java.util.List; 24 import java.util.Queue; 25 import java.util.Set; 26 import java.util.SortedSet; 27 import java.util.TreeSet; 28 import java.util.concurrent.CopyOnWriteArraySet; 29 import java.util.concurrent.Executors; 30 import java.util.concurrent.RejectedExecutionException; 31 import java.util.concurrent.ScheduledExecutorService; 32 import java.util.concurrent.ScheduledFuture; 33 import java.util.concurrent.TimeUnit; 34 import java.util.concurrent.atomic.AtomicReference; 35 import java.util.concurrent.locks.Condition; 36 import java.util.concurrent.locks.ReentrantLock; 37 import java.util.logging.Level; 38 import java.util.logging.Logger; 39 40 /** 41 * BOSH Client session instance. Each communication session with a remote 42 * connection manager is represented and handled by an instance of this 43 * class. This is the main entry point for client-side communications. 
44 * To create a new session, a client configuration must first be created 45 * and then used to create a client instance: 46 * <pre> 47 * BOSHClientConfig cfg = BOSHClientConfig.Builder.create( 48 * "http://server:1234/httpbind", "jabber.org") 49 * .setFrom("user (at) jabber.org") 50 * .build(); 51 * BOSHClient client = BOSHClient.create(cfg); 52 * </pre> 53 * Additional client configuration options are available. See the 54 * {@code BOSHClientConfig.Builder} class for more information. 55 * <p/> 56 * Once a {@code BOSHClient} instance has been created, communication with 57 * the remote connection manager can begin. No attempt will be made to 58 * establish a connection to the connection manager until the first call 59 * is made to the {@code send(ComposableBody)} method. Note that it is 60 * possible to send an empty body to cause an immediate connection attempt 61 * to the connection manager. Sending an empty message would look like 62 * the following: 63 * <pre> 64 * client.send(ComposableBody.builder().build()); 65 * </pre> 66 * For more information on creating body messages with content, see the 67 * {@code ComposableBody.Builder} class documentation. 68 * <p/> 69 * Once a session has been successfully started, the client instance can be 70 * used to send arbitrary payload data. All aspects of the BOSH 71 * protocol involving setting and processing attributes in the BOSH 72 * namespace will be handled by the client code transparently and behind the 73 * scenes. The user of the client instance can therefore concentrate 74 * entirely on the content of the message payload, leaving the semantics of 75 * the BOSH protocol to the client implementation. 76 * <p/> 77 * To be notified of incoming messages from the remote connection manager, 78 * a {@code BOSHClientResponseListener} should be added to the client instance. 79 * All incoming messages will be published to all response listeners as they 80 * arrive and are processed. 
As with the transmission of payload data via 81 * the {@code send(ComposableBody)} method, there is no need to worry about 82 * handling of the BOSH attributes, since this is handled behind the scenes. 83 * <p/> 84 * If the connection to the remote connection manager is terminated (either 85 * explicitly or due to a terminal condition of some sort), all connection 86 * listeners will be notified. After the connection has been closed, the 87 * client instance is considered dead and a new one must be created in order 88 * to resume communications with the remote server. 89 * <p/> 90 * Instances of this class are thread-safe. 91 * 92 * @see BOSHClientConfig.Builder 93 * @see BOSHClientResponseListener 94 * @see BOSHClientConnListener 95 * @see ComposableBody.Builder 96 */ 97 public final class BOSHClient { 98 99 /** 100 * Logger. 101 */ 102 private static final Logger LOG = Logger.getLogger( 103 BOSHClient.class.getName()); 104 105 /** 106 * Value of the 'type' attribute used for session termination. 107 */ 108 private static final String TERMINATE = "terminate"; 109 110 /** 111 * Value of the 'type' attribute used for recoverable errors. 112 */ 113 private static final String ERROR = "error"; 114 115 /** 116 * Message to use for interrupted exceptions. 117 */ 118 private static final String INTERRUPTED = "Interrupted"; 119 120 /** 121 * Message used for unhandled exceptions. 122 */ 123 private static final String UNHANDLED = "Unhandled Exception"; 124 125 /** 126 * Message used whena null listener is detected. 127 */ 128 private static final String NULL_LISTENER = "Listener may not b enull"; 129 130 /** 131 * Default empty request delay. 132 */ 133 private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100; 134 135 /** 136 * Amount of time to wait before sending an empty request, in 137 * milliseconds. 
138 */ 139 private static final int EMPTY_REQUEST_DELAY = Integer.getInteger( 140 BOSHClient.class.getName() + ".emptyRequestDelay", 141 DEFAULT_EMPTY_REQUEST_DELAY); 142 143 /** 144 * Default value for the pause margin. 145 */ 146 private static final int DEFAULT_PAUSE_MARGIN = 500; 147 148 /** 149 * The amount of time in milliseconds which will be reserved as a 150 * safety margin when scheduling empty requests against a maxpause 151 * value. This should give us enough time to build the message 152 * and transport it to the remote host. 153 */ 154 private static final int PAUSE_MARGIN = Integer.getInteger( 155 BOSHClient.class.getName() + ".pauseMargin", 156 DEFAULT_PAUSE_MARGIN); 157 158 /** 159 * Flag indicating whether or not we want to perform assertions. 160 */ 161 private static final boolean ASSERTIONS; 162 163 /** 164 * Connection listeners. 165 */ 166 private final Set<BOSHClientConnListener> connListeners = 167 new CopyOnWriteArraySet<BOSHClientConnListener>(); 168 169 /** 170 * Request listeners. 171 */ 172 private final Set<BOSHClientRequestListener> requestListeners = 173 new CopyOnWriteArraySet<BOSHClientRequestListener>(); 174 175 /** 176 * Response listeners. 177 */ 178 private final Set<BOSHClientResponseListener> responseListeners = 179 new CopyOnWriteArraySet<BOSHClientResponseListener>(); 180 181 /** 182 * Lock instance. 183 */ 184 private final ReentrantLock lock = new ReentrantLock(); 185 186 /** 187 * Condition indicating that there are messages to be exchanged. 188 */ 189 private final Condition notEmpty = lock.newCondition(); 190 191 /** 192 * Condition indicating that there are available slots for sending 193 * messages. 194 */ 195 private final Condition notFull = lock.newCondition(); 196 197 /** 198 * Condition indicating that there are no outstanding connections. 199 */ 200 private final Condition drained = lock.newCondition(); 201 202 /** 203 * Session configuration. 
204 */ 205 private final BOSHClientConfig cfg; 206 207 /** 208 * Processor thread runnable instance. 209 */ 210 private final Runnable procRunnable = new Runnable() { 211 /** 212 * Process incoming messages. 213 */ 214 public void run() { 215 processMessages(); 216 } 217 }; 218 219 /** 220 * Processor thread runnable instance. 221 */ 222 private final Runnable emptyRequestRunnable = new Runnable() { 223 /** 224 * Process incoming messages. 225 */ 226 public void run() { 227 sendEmptyRequest(); 228 } 229 }; 230 231 /** 232 * HTTPSender instance. 233 */ 234 private final HTTPSender httpSender = 235 new ApacheHTTPSender(); 236 237 /** 238 * Storage for test hook implementation. 239 */ 240 private final AtomicReference<ExchangeInterceptor> exchInterceptor = 241 new AtomicReference<ExchangeInterceptor>(); 242 243 /** 244 * Request ID sequence to use for the session. 245 */ 246 private final RequestIDSequence requestIDSeq = new RequestIDSequence(); 247 248 /** 249 * ScheduledExcecutor to use for deferred tasks. 250 */ 251 private final ScheduledExecutorService schedExec = 252 Executors.newSingleThreadScheduledExecutor(); 253 254 /************************************************************ 255 * The following vars must be accessed via the lock instance. 256 */ 257 258 /** 259 * Thread which is used to process responses from the connection 260 * manager. Becomes null when session is terminated. 261 */ 262 private Thread procThread; 263 264 /** 265 * Future for sending a deferred empty request, if needed. 266 */ 267 private ScheduledFuture emptyRequestFuture; 268 269 /** 270 * Connection Manager session parameters. Only available when in a 271 * connected state. 272 */ 273 private CMSessionParams cmParams; 274 275 /** 276 * List of active/outstanding requests. 277 */ 278 private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>(); 279 280 /** 281 * Set of RIDs which have been received, for the purpose of sending 282 * response acknowledgements. 
283 */ 284 private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>(); 285 286 /** 287 * The highest RID that we've already received a response for. This value 288 * is used to implement response acks. 289 */ 290 private Long responseAck = Long.valueOf(-1L); 291 292 /** 293 * List of requests which have been made but not yet acknowledged. This 294 * list remains unpopulated if the CM is not acking requests. 295 */ 296 private List<ComposableBody> pendingRequestAcks = 297 new ArrayList<ComposableBody>(); 298 299 /////////////////////////////////////////////////////////////////////////// 300 // Classes: 301 302 /** 303 * Class used in testing to dynamically manipulate received exchanges 304 * at test runtime. 305 */ 306 abstract static class ExchangeInterceptor { 307 /** 308 * Limit construction. 309 */ 310 ExchangeInterceptor() { 311 // Empty; 312 } 313 314 /** 315 * Hook to manipulate an HTTPExchange as is is about to be processed. 316 * 317 * @param exch original exchange that would be processed 318 * @return replacement exchange instance, or {@code null} to skip 319 * processing of this exchange 320 */ 321 abstract HTTPExchange interceptExchange(final HTTPExchange exch); 322 } 323 324 /////////////////////////////////////////////////////////////////////////// 325 // Constructors: 326 327 /** 328 * Determine whether or not we should perform assertions. Assertions 329 * can be specified via system property explicitly, or defaulted to 330 * the JVM assertions status. 331 */ 332 static { 333 final String prop = 334 BOSHClient.class.getSimpleName() + ".assertionsEnabled"; 335 boolean enabled = false; 336 if (System.getProperty(prop) == null) { 337 assert enabled = true; 338 } else { 339 enabled = Boolean.getBoolean(prop); 340 } 341 ASSERTIONS = enabled; 342 } 343 344 /** 345 * Prevent direct construction. 
346 */ 347 private BOSHClient(final BOSHClientConfig sessCfg) { 348 cfg = sessCfg; 349 init(); 350 } 351 352 /////////////////////////////////////////////////////////////////////////// 353 // Public methods: 354 355 /** 356 * Create a new BOSH client session using the client configuration 357 * information provided. 358 * 359 * @param clientCfg session configuration 360 * @return BOSH session instance 361 */ 362 public static BOSHClient create(final BOSHClientConfig clientCfg) { 363 if (clientCfg == null) { 364 throw(new IllegalArgumentException( 365 "Client configuration may not be null")); 366 } 367 return new BOSHClient(clientCfg); 368 } 369 370 /** 371 * Get the client configuration that was used to create this client 372 * instance. 373 * 374 * @return client configuration 375 */ 376 public BOSHClientConfig getBOSHClientConfig() { 377 return cfg; 378 } 379 380 /** 381 * Adds a connection listener to the session. 382 * 383 * @param listener connection listener to add, if not already added 384 */ 385 public void addBOSHClientConnListener( 386 final BOSHClientConnListener listener) { 387 if (listener == null) { 388 throw(new IllegalArgumentException(NULL_LISTENER)); 389 } 390 connListeners.add(listener); 391 } 392 393 /** 394 * Removes a connection listener from the session. 395 * 396 * @param listener connection listener to remove, if previously added 397 */ 398 public void removeBOSHClientConnListener( 399 final BOSHClientConnListener listener) { 400 if (listener == null) { 401 throw(new IllegalArgumentException(NULL_LISTENER)); 402 } 403 connListeners.remove(listener); 404 } 405 406 /** 407 * Adds a request message listener to the session. 
408 * 409 * @param listener request listener to add, if not already added 410 */ 411 public void addBOSHClientRequestListener( 412 final BOSHClientRequestListener listener) { 413 if (listener == null) { 414 throw(new IllegalArgumentException(NULL_LISTENER)); 415 } 416 requestListeners.add(listener); 417 } 418 419 /** 420 * Removes a request message listener from the session, if previously 421 * added. 422 * 423 * @param listener instance to remove 424 */ 425 public void removeBOSHClientRequestListener( 426 final BOSHClientRequestListener listener) { 427 if (listener == null) { 428 throw(new IllegalArgumentException(NULL_LISTENER)); 429 } 430 requestListeners.remove(listener); 431 } 432 433 /** 434 * Adds a response message listener to the session. 435 * 436 * @param listener response listener to add, if not already added 437 */ 438 public void addBOSHClientResponseListener( 439 final BOSHClientResponseListener listener) { 440 if (listener == null) { 441 throw(new IllegalArgumentException(NULL_LISTENER)); 442 } 443 responseListeners.add(listener); 444 } 445 446 /** 447 * Removes a response message listener from the session, if previously 448 * added. 449 * 450 * @param listener instance to remove 451 */ 452 public void removeBOSHClientResponseListener( 453 final BOSHClientResponseListener listener) { 454 if (listener == null) { 455 throw(new IllegalArgumentException(NULL_LISTENER)); 456 } 457 responseListeners.remove(listener); 458 } 459 460 /** 461 * Send the provided message data to the remote connection manager. The 462 * provided message body does not need to have any BOSH-specific attribute 463 * information set. It only needs to contain the actual message payload 464 * that should be delivered to the remote server. 465 * <p/> 466 * The first call to this method will result in a connection attempt 467 * to the remote connection manager. 
Subsequent calls to this method 468 * will block until the underlying session state allows for the message 469 * to be transmitted. In certain scenarios - such as when the maximum 470 * number of outbound connections has been reached - calls to this method 471 * will block for short periods of time. 472 * 473 * @param body message data to send to remote server 474 * @throws BOSHException on message transmission failure 475 */ 476 public void send(final ComposableBody body) throws BOSHException { 477 assertUnlocked(); 478 if (body == null) { 479 throw(new IllegalArgumentException( 480 "Message body may not be null")); 481 } 482 483 HTTPExchange exch; 484 CMSessionParams params; 485 lock.lock(); 486 try { 487 blockUntilSendable(body); 488 if (!isWorking() && !isTermination(body)) { 489 throw(new BOSHException( 490 "Cannot send message when session is closed")); 491 } 492 493 long rid = requestIDSeq.getNextRID(); 494 ComposableBody request = body; 495 params = cmParams; 496 if (params == null && exchanges.isEmpty()) { 497 // This is the first message being sent 498 request = applySessionCreationRequest(rid, body); 499 } else { 500 request = applySessionData(rid, body); 501 if (cmParams.isAckingRequests()) { 502 pendingRequestAcks.add(request); 503 } 504 } 505 exch = new HTTPExchange(request); 506 exchanges.add(exch); 507 notEmpty.signalAll(); 508 clearEmptyRequest(); 509 } finally { 510 lock.unlock(); 511 } 512 AbstractBody finalReq = exch.getRequest(); 513 HTTPResponse resp = httpSender.send(params, finalReq); 514 exch.setHTTPResponse(resp); 515 fireRequestSent(finalReq); 516 } 517 518 /** 519 * Attempt to pause the current session. When supported by the remote 520 * connection manager, pausing the session will result in the connection 521 * manager closing out all outstanding requests (including the pause 522 * request) and increases the inactivity timeout of the session. The 523 * exact value of the temporary timeout is dependent upon the connection 524 * manager. 
This method should be used if a client encounters an 525 * exceptional temporary situation during which it will be unable to send 526 * requests to the connection manager for a period of time greater than 527 * the maximum inactivity period. 528 * 529 * The session will revert back to it's normal, unpaused state when the 530 * client sends it's next message. 531 * 532 * @return {@code true} if the connection manager supports session pausing, 533 * {@code false} if the connection manager does not support session 534 * pausing or if the session has not yet been established 535 */ 536 public boolean pause() { 537 assertUnlocked(); 538 lock.lock(); 539 AttrMaxPause maxPause = null; 540 try { 541 if (cmParams == null) { 542 return false; 543 } 544 545 maxPause = cmParams.getMaxPause(); 546 if (maxPause == null) { 547 return false; 548 } 549 } finally { 550 lock.unlock(); 551 } 552 try { 553 send(ComposableBody.builder() 554 .setAttribute(Attributes.PAUSE, maxPause.toString()) 555 .build()); 556 } catch (BOSHException boshx) { 557 LOG.log(Level.FINEST, "Could not send pause", boshx); 558 } 559 return true; 560 } 561 562 /** 563 * End the BOSH session by disconnecting from the remote BOSH connection 564 * manager. 565 * 566 * @throws BOSHException when termination message cannot be sent 567 */ 568 public void disconnect() throws BOSHException { 569 disconnect(ComposableBody.builder().build()); 570 } 571 572 /** 573 * End the BOSH session by disconnecting from the remote BOSH connection 574 * manager, sending the provided content in the final connection 575 * termination message. 
576 * 577 * @param msg final message to send 578 * @throws BOSHException when termination message cannot be sent 579 */ 580 public void disconnect(final ComposableBody msg) throws BOSHException { 581 if (msg == null) { 582 throw(new IllegalArgumentException( 583 "Message body may not be null")); 584 } 585 586 Builder builder = msg.rebuild(); 587 builder.setAttribute(Attributes.TYPE, TERMINATE); 588 send(builder.build()); 589 } 590 591 /** 592 * Forcibly close this client session instance. The preferred mechanism 593 * to close the connection is to send a disconnect message and wait for 594 * organic termination. Calling this method simply shuts down the local 595 * session without sending a termination message, releasing all resources 596 * associated with the session. 597 */ 598 public void close() { 599 dispose(new BOSHException("Session explicitly closed by caller")); 600 } 601 602 /////////////////////////////////////////////////////////////////////////// 603 // Package-private methods: 604 605 /** 606 * Get the current CM session params. 607 * 608 * @return current session params, or {@code null} 609 */ 610 CMSessionParams getCMSessionParams() { 611 lock.lock(); 612 try { 613 return cmParams; 614 } finally { 615 lock.unlock(); 616 } 617 } 618 619 /** 620 * Wait until no more messages are waiting to be processed. 621 */ 622 void drain() { 623 lock.lock(); 624 try { 625 LOG.finest("Waiting while draining..."); 626 while (isWorking() 627 && (emptyRequestFuture == null 628 || emptyRequestFuture.isDone())) { 629 try { 630 drained.await(); 631 } catch (InterruptedException intx) { 632 LOG.log(Level.FINEST, INTERRUPTED, intx); 633 } 634 } 635 LOG.finest("Drained"); 636 } finally { 637 lock.unlock(); 638 } 639 } 640 641 /** 642 * Test method used to forcibly discard next exchange. 
643 * 644 * @param interceptor exchange interceptor 645 */ 646 void setExchangeInterceptor(final ExchangeInterceptor interceptor) { 647 exchInterceptor.set(interceptor); 648 } 649 650 651 /////////////////////////////////////////////////////////////////////////// 652 // Private methods: 653 654 /** 655 * Initialize the session. This initializes the underlying HTTP 656 * transport implementation and starts the receive thread. 657 */ 658 private void init() { 659 assertUnlocked(); 660 661 lock.lock(); 662 try { 663 httpSender.init(cfg); 664 procThread = new Thread(procRunnable); 665 procThread.setDaemon(true); 666 procThread.setName(BOSHClient.class.getSimpleName() 667 + "[" + System.identityHashCode(this) 668 + "]: Receive thread"); 669 procThread.start(); 670 } finally { 671 lock.unlock(); 672 } 673 } 674 675 /** 676 * Destroy this session. 677 * 678 * @param cause the reason for the session termination, or {@code null} 679 * for normal termination 680 */ 681 private void dispose(final Throwable cause) { 682 assertUnlocked(); 683 684 lock.lock(); 685 try { 686 if (procThread == null) { 687 // Already disposed 688 return; 689 } 690 procThread = null; 691 } finally { 692 lock.unlock(); 693 } 694 695 if (cause == null) { 696 fireConnectionClosed(); 697 } else { 698 fireConnectionClosedOnError(cause); 699 } 700 701 lock.lock(); 702 try { 703 clearEmptyRequest(); 704 exchanges = null; 705 cmParams = null; 706 pendingResponseAcks = null; 707 pendingRequestAcks = null; 708 notEmpty.signalAll(); 709 notFull.signalAll(); 710 drained.signalAll(); 711 } finally { 712 lock.unlock(); 713 } 714 715 httpSender.destroy(); 716 schedExec.shutdownNow(); 717 } 718 719 /** 720 * Determines if the message body specified indicates a request to 721 * pause the session. 
722 * 723 * @param msg message to evaluate 724 * @return {@code true} if the message is a pause request, {@code false} 725 * otherwise 726 */ 727 private static boolean isPause(final AbstractBody msg) { 728 return msg.getAttribute(Attributes.PAUSE) != null; 729 } 730 731 /** 732 * Determines if the message body specified indicates a termination of 733 * the session. 734 * 735 * @param msg message to evaluate 736 * @return {@code true} if the message is a session termination, 737 * {@code false} otherwise 738 */ 739 private static boolean isTermination(final AbstractBody msg) { 740 return TERMINATE.equals(msg.getAttribute(Attributes.TYPE)); 741 } 742 743 /** 744 * Evaluates the HTTP response code and response message and returns the 745 * terminal binding condition that it describes, if any. 746 * 747 * @param respCode HTTP response code 748 * @param respBody response body 749 * @return terminal binding condition, or {@code null} if not a terminal 750 * binding condition message 751 */ 752 private TerminalBindingCondition getTerminalBindingCondition( 753 final int respCode, 754 final AbstractBody respBody) { 755 assertLocked(); 756 757 if (isTermination(respBody)) { 758 String str = respBody.getAttribute(Attributes.CONDITION); 759 return TerminalBindingCondition.forString(str); 760 } 761 // Check for deprecated HTTP Error Conditions 762 if (cmParams != null && cmParams.getVersion() == null) { 763 return TerminalBindingCondition.forHTTPResponseCode(respCode); 764 } 765 return null; 766 } 767 768 /** 769 * Determines if the message specified is immediately sendable or if it 770 * needs to block until the session state changes. 
771 * 772 * @param msg message to evaluate 773 * @return {@code true} if the message can be immediately sent, 774 * {@code false} otherwise 775 */ 776 private boolean isImmediatelySendable(final AbstractBody msg) { 777 assertLocked(); 778 779 if (cmParams == null) { 780 // block if we're waiting for a response to our first request 781 return exchanges.isEmpty(); 782 } 783 784 AttrRequests requests = cmParams.getRequests(); 785 if (requests == null) { 786 return true; 787 } 788 int maxRequests = requests.intValue(); 789 if (exchanges.size() < maxRequests) { 790 return true; 791 } 792 if (exchanges.size() == maxRequests 793 && (isTermination(msg) || isPause(msg))) { 794 // One additional terminate or pause message is allowed 795 return true; 796 } 797 return false; 798 } 799 800 /** 801 * Determines whether or not the session is still active. 802 * 803 * @return {@code true} if it is, {@code false} otherwise 804 */ 805 private boolean isWorking() { 806 assertLocked(); 807 808 return procThread != null; 809 } 810 811 /** 812 * Blocks until either the message provided becomes immediately 813 * sendable or until the session is terminated. 814 * 815 * @param msg message to evaluate 816 */ 817 private void blockUntilSendable(final AbstractBody msg) { 818 assertLocked(); 819 820 while (isWorking() && !isImmediatelySendable(msg)) { 821 try { 822 notFull.await(); 823 } catch (InterruptedException intx) { 824 LOG.log(Level.FINEST, INTERRUPTED, intx); 825 } 826 } 827 } 828 829 /** 830 * Modifies the specified body message such that it becomes a new 831 * BOSH session creation request. 
832 * 833 * @param rid request ID to use 834 * @param orig original body to modify 835 * @return modified message which acts as a session creation request 836 */ 837 private ComposableBody applySessionCreationRequest( 838 final long rid, final ComposableBody orig) throws BOSHException { 839 assertLocked(); 840 841 Builder builder = orig.rebuild(); 842 builder.setAttribute(Attributes.TO, cfg.getTo()); 843 builder.setAttribute(Attributes.XML_LANG, cfg.getLang()); 844 builder.setAttribute(Attributes.VER, 845 AttrVersion.getSupportedVersion().toString()); 846 builder.setAttribute(Attributes.WAIT, "60"); 847 builder.setAttribute(Attributes.HOLD, "1"); 848 builder.setAttribute(Attributes.RID, Long.toString(rid)); 849 applyRoute(builder); 850 applyFrom(builder); 851 builder.setAttribute(Attributes.ACK, "1"); 852 853 // Make sure the following are NOT present (i.e., during retries) 854 builder.setAttribute(Attributes.SID, null); 855 return builder.build(); 856 } 857 858 /** 859 * Applies routing information to the request message who's builder has 860 * been provided. 861 * 862 * @param builder builder instance to add routing information to 863 */ 864 private void applyRoute(final Builder builder) { 865 assertLocked(); 866 867 String route = cfg.getRoute(); 868 if (route != null) { 869 builder.setAttribute(Attributes.ROUTE, route); 870 } 871 } 872 873 /** 874 * Applies the local station ID information to the request message who's 875 * builder has been provided. 876 * 877 * @param builder builder instance to add station ID information to 878 */ 879 private void applyFrom(final Builder builder) { 880 assertLocked(); 881 882 String from = cfg.getFrom(); 883 if (from != null) { 884 builder.setAttribute(Attributes.FROM, from); 885 } 886 } 887 888 /** 889 * Applies existing session data to the outbound request, returning the 890 * modified request. 891 * 892 * This method assumes the lock is currently held. 
893 * 894 * @param rid request ID to use 895 * @param orig original/raw request 896 * @return modified request with session information applied 897 */ 898 private ComposableBody applySessionData( 899 final long rid, 900 final ComposableBody orig) throws BOSHException { 901 assertLocked(); 902 903 Builder builder = orig.rebuild(); 904 builder.setAttribute(Attributes.SID, 905 cmParams.getSessionID().toString()); 906 builder.setAttribute(Attributes.RID, Long.toString(rid)); 907 applyResponseAcknowledgement(builder, rid); 908 return builder.build(); 909 } 910 911 /** 912 * Sets the 'ack' attribute of the request to the value of the highest 913 * 'rid' of a request for which it has already received a response in the 914 * case where it has also received all responses associated with lower 915 * 'rid' values. The only exception is that, after its session creation 916 * request, the client SHOULD NOT include an 'ack' attribute in any request 917 * if it has received responses to all its previous requests. 918 * 919 * @param builder message builder 920 * @param rid current request RID 921 */ 922 private void applyResponseAcknowledgement( 923 final Builder builder, 924 final long rid) { 925 assertLocked(); 926 927 if (responseAck.equals(Long.valueOf(-1L))) { 928 // We have not received any responses yet 929 return; 930 } 931 932 Long prevRID = Long.valueOf(rid - 1L); 933 if (responseAck.equals(prevRID)) { 934 // Implicit ack 935 return; 936 } 937 938 builder.setAttribute(Attributes.ACK, responseAck.toString()); 939 } 940 941 /** 942 * While we are "connected", process received responses. 943 * 944 * This method is run in the processing thread. 
945 */ 946 private void processMessages() { 947 LOG.log(Level.FINEST, "Processing thread starting"); 948 try { 949 HTTPExchange exch; 950 do { 951 exch = nextExchange(); 952 if (exch == null) { 953 break; 954 } 955 956 // Test hook to manipulate what the client sees: 957 ExchangeInterceptor interceptor = exchInterceptor.get(); 958 if (interceptor != null) { 959 HTTPExchange newExch = interceptor.interceptExchange(exch); 960 if (newExch == null) { 961 LOG.log(Level.FINE, "Discarding exchange on request " 962 + "of test hook: RID=" 963 + exch.getRequest().getAttribute( 964 Attributes.RID)); 965 lock.lock(); 966 try { 967 exchanges.remove(exch); 968 } finally { 969 lock.unlock(); 970 } 971 continue; 972 } 973 exch = newExch; 974 } 975 976 processExchange(exch); 977 } while (true); 978 } finally { 979 LOG.log(Level.FINEST, "Processing thread exiting"); 980 } 981 982 } 983 984 /** 985 * Get the next message exchange to process, blocking until one becomes 986 * available if nothing is already waiting for processing. 987 * 988 * @return next available exchange to process, or {@code null} if no 989 * exchanges are immediately available 990 */ 991 private HTTPExchange nextExchange() { 992 assertUnlocked(); 993 994 final Thread thread = Thread.currentThread(); 995 HTTPExchange exch = null; 996 lock.lock(); 997 try { 998 do { 999 if (!thread.equals(procThread)) { 1000 break; 1001 } 1002 exch = exchanges.peek(); 1003 if (exch == null) { 1004 try { 1005 notEmpty.await(); 1006 } catch (InterruptedException intx) { 1007 LOG.log(Level.FINEST, INTERRUPTED, intx); 1008 } 1009 } 1010 } while (exch == null); 1011 } finally { 1012 lock.unlock(); 1013 } 1014 return exch; 1015 } 1016 1017 /** 1018 * Process the next, provided exchange. This is the main processing 1019 * method of the receive thread. 
1020 * 1021 * @param exch message exchange to process 1022 */ 1023 private void processExchange(final HTTPExchange exch) { 1024 assertUnlocked(); 1025 1026 HTTPResponse resp; 1027 AbstractBody body; 1028 int respCode; 1029 try { 1030 resp = exch.getHTTPResponse(); 1031 body = resp.getBody(); 1032 respCode = resp.getHTTPStatus(); 1033 } catch (BOSHException boshx) { 1034 LOG.log(Level.FINEST, "Could not obtain response", boshx); 1035 dispose(boshx); 1036 return; 1037 } catch (InterruptedException intx) { 1038 LOG.log(Level.FINEST, INTERRUPTED, intx); 1039 dispose(intx); 1040 return; 1041 } 1042 fireResponseReceived(body); 1043 1044 // Process the message with the current session state 1045 AbstractBody req = exch.getRequest(); 1046 CMSessionParams params; 1047 List<HTTPExchange> toResend = null; 1048 lock.lock(); 1049 try { 1050 // Check for session creation response info, if needed 1051 if (cmParams == null) { 1052 cmParams = CMSessionParams.fromSessionInit(req, body); 1053 1054 // The following call handles the lock. It's not an escape. 
1055 fireConnectionEstablished(); 1056 } 1057 params = cmParams; 1058 1059 checkForTerminalBindingConditions(body, respCode); 1060 if (isTermination(body)) { 1061 // Explicit termination 1062 lock.unlock(); 1063 dispose(null); 1064 return; 1065 } 1066 1067 if (isRecoverableBindingCondition(body)) { 1068 // Retransmit outstanding requests 1069 if (toResend == null) { 1070 toResend = new ArrayList<HTTPExchange>(exchanges.size()); 1071 } 1072 for (HTTPExchange exchange : exchanges) { 1073 HTTPExchange resendExch = 1074 new HTTPExchange(exchange.getRequest()); 1075 toResend.add(resendExch); 1076 } 1077 for (HTTPExchange exchange : toResend) { 1078 exchanges.add(exchange); 1079 } 1080 } else { 1081 // Process message as normal 1082 processRequestAcknowledgements(req, body); 1083 processResponseAcknowledgementData(req); 1084 HTTPExchange resendExch = 1085 processResponseAcknowledgementReport(body); 1086 if (resendExch != null && toResend == null) { 1087 toResend = new ArrayList<HTTPExchange>(1); 1088 toResend.add(resendExch); 1089 exchanges.add(resendExch); 1090 } 1091 } 1092 } catch (BOSHException boshx) { 1093 LOG.log(Level.FINEST, "Could not process response", boshx); 1094 lock.unlock(); 1095 dispose(boshx); 1096 return; 1097 } finally { 1098 if (lock.isHeldByCurrentThread()) { 1099 try { 1100 exchanges.remove(exch); 1101 if (exchanges.isEmpty()) { 1102 scheduleEmptyRequest(processPauseRequest(req)); 1103 } 1104 notFull.signalAll(); 1105 } finally { 1106 lock.unlock(); 1107 } 1108 } 1109 } 1110 1111 if (toResend != null) { 1112 for (HTTPExchange resend : toResend) { 1113 HTTPResponse response = 1114 httpSender.send(params, resend.getRequest()); 1115 resend.setHTTPResponse(response); 1116 fireRequestSent(resend.getRequest()); 1117 } 1118 } 1119 } 1120 1121 /** 1122 * Clears any scheduled empty requests. 
1123 */ 1124 private void clearEmptyRequest() { 1125 assertLocked(); 1126 1127 if (emptyRequestFuture != null) { 1128 emptyRequestFuture.cancel(false); 1129 emptyRequestFuture = null; 1130 } 1131 } 1132 1133 /** 1134 * Calculates the default empty request delay/interval to use for the 1135 * active session. 1136 * 1137 * @return delay in milliseconds 1138 */ 1139 private long getDefaultEmptyRequestDelay() { 1140 assertLocked(); 1141 1142 // Figure out how long we should wait before sending an empty request 1143 AttrPolling polling = cmParams.getPollingInterval(); 1144 long delay; 1145 if (polling == null) { 1146 delay = EMPTY_REQUEST_DELAY; 1147 } else { 1148 delay = polling.getInMilliseconds(); 1149 } 1150 return delay; 1151 } 1152 1153 /** 1154 * Schedule an empty request to be sent if no other requests are 1155 * sent in a reasonable amount of time. 1156 */ 1157 private void scheduleEmptyRequest(long delay) { 1158 assertLocked(); 1159 if (delay < 0L) { 1160 throw(new IllegalArgumentException( 1161 "Empty request delay must be >= 0 (was: " + delay + ")")); 1162 } 1163 1164 clearEmptyRequest(); 1165 if (!isWorking()) { 1166 return; 1167 } 1168 1169 // Schedule the transmission 1170 if (LOG.isLoggable(Level.FINER)) { 1171 LOG.finer("Scheduling empty request in " + delay + "ms"); 1172 } 1173 try { 1174 emptyRequestFuture = schedExec.schedule(emptyRequestRunnable, 1175 delay, TimeUnit.MILLISECONDS); 1176 } catch (RejectedExecutionException rex) { 1177 LOG.log(Level.FINEST, "Could not schedule empty request", rex); 1178 } 1179 drained.signalAll(); 1180 } 1181 1182 /** 1183 * Sends an empty request to maintain session requirements. If a request 1184 * is sent within a reasonable time window, the empty request transmission 1185 * will be cancelled. 
1186 */ 1187 private void sendEmptyRequest() { 1188 assertUnlocked(); 1189 // Send an empty request 1190 LOG.finest("Sending empty request"); 1191 try { 1192 send(ComposableBody.builder().build()); 1193 } catch (BOSHException boshx) { 1194 dispose(boshx); 1195 } 1196 } 1197 1198 /** 1199 * Assert that the internal lock is held. 1200 */ 1201 private void assertLocked() { 1202 if (ASSERTIONS) { 1203 if (!lock.isHeldByCurrentThread()) { 1204 throw(new AssertionError("Lock is not held by current thread")); 1205 } 1206 return; 1207 } 1208 } 1209 1210 /** 1211 * Assert that the internal lock is *not* held. 1212 */ 1213 private void assertUnlocked() { 1214 if (ASSERTIONS) { 1215 if (lock.isHeldByCurrentThread()) { 1216 throw(new AssertionError("Lock is held by current thread")); 1217 } 1218 return; 1219 } 1220 } 1221 1222 /** 1223 * Checks to see if the response indicates a terminal binding condition 1224 * (as per XEP-0124 section 17). If it does, an exception is thrown. 1225 * 1226 * @param body response body to evaluate 1227 * @param code HTTP response code 1228 * @throws BOSHException if a terminal binding condition is detected 1229 */ 1230 private void checkForTerminalBindingConditions( 1231 final AbstractBody body, 1232 final int code) 1233 throws BOSHException { 1234 TerminalBindingCondition cond = 1235 getTerminalBindingCondition(code, body); 1236 if (cond != null) { 1237 throw(new BOSHException( 1238 "Terminal binding condition encountered: " 1239 + cond.getCondition() + " (" 1240 + cond.getMessage() + ")")); 1241 } 1242 } 1243 1244 /** 1245 * Determines whether or not the response indicates a recoverable 1246 * binding condition (as per XEP-0124 section 17). 
1247 * 1248 * @param resp response body 1249 * @return {@code true} if it does, {@code false} otherwise 1250 */ 1251 private static boolean isRecoverableBindingCondition( 1252 final AbstractBody resp) { 1253 return ERROR.equals(resp.getAttribute(Attributes.TYPE)); 1254 } 1255 1256 /** 1257 * Process the request to determine if the empty request delay 1258 * can be determined by looking to see if the request is a pause 1259 * request. If it can, the request's delay is returned, otherwise 1260 * the default delay is returned. 1261 * 1262 * @return delay in milliseconds that should elapse prior to an 1263 * empty message being sent 1264 */ 1265 private long processPauseRequest( 1266 final AbstractBody req) { 1267 assertLocked(); 1268 1269 if (cmParams != null && cmParams.getMaxPause() != null) { 1270 try { 1271 AttrPause pause = AttrPause.createFromString( 1272 req.getAttribute(Attributes.PAUSE)); 1273 if (pause != null) { 1274 long delay = pause.getInMilliseconds() - PAUSE_MARGIN; 1275 if (delay < 0) { 1276 delay = EMPTY_REQUEST_DELAY; 1277 } 1278 return delay; 1279 } 1280 } catch (BOSHException boshx) { 1281 LOG.log(Level.FINEST, "Could not extract", boshx); 1282 } 1283 } 1284 1285 return getDefaultEmptyRequestDelay(); 1286 } 1287 1288 /** 1289 * Check the response for request acknowledgements and take appropriate 1290 * action. 1291 * 1292 * This method assumes the lock is currently held. 
1293 * 1294 * @param req request 1295 * @param resp response 1296 */ 1297 private void processRequestAcknowledgements( 1298 final AbstractBody req, final AbstractBody resp) { 1299 assertLocked(); 1300 1301 if (!cmParams.isAckingRequests()) { 1302 return; 1303 } 1304 1305 // If a report or time attribute is set, we aren't acking anything 1306 if (resp.getAttribute(Attributes.REPORT) != null) { 1307 return; 1308 } 1309 1310 // Figure out what the highest acked RID is 1311 String acked = resp.getAttribute(Attributes.ACK); 1312 Long ackUpTo; 1313 if (acked == null) { 1314 // Implicit ack of all prior requests up until RID 1315 ackUpTo = Long.parseLong(req.getAttribute(Attributes.RID)); 1316 } else { 1317 ackUpTo = Long.parseLong(acked); 1318 } 1319 1320 // Remove the acked requests from the list 1321 if (LOG.isLoggable(Level.FINEST)) { 1322 LOG.finest("Removing pending acks up to: " + ackUpTo); 1323 } 1324 Iterator<ComposableBody> iter = pendingRequestAcks.iterator(); 1325 while (iter.hasNext()) { 1326 AbstractBody pending = iter.next(); 1327 Long pendingRID = Long.parseLong( 1328 pending.getAttribute(Attributes.RID)); 1329 if (pendingRID.compareTo(ackUpTo) <= 0) { 1330 iter.remove(); 1331 } 1332 } 1333 } 1334 1335 /** 1336 * Process the response in order to update the response acknowlegement 1337 * data. 1338 * 1339 * This method assumes the lock is currently held. 
1340 * 1341 * @param req request 1342 */ 1343 private void processResponseAcknowledgementData( 1344 final AbstractBody req) { 1345 assertLocked(); 1346 1347 Long rid = Long.parseLong(req.getAttribute(Attributes.RID)); 1348 if (responseAck.equals(Long.valueOf(-1L))) { 1349 // This is the first request 1350 responseAck = rid; 1351 } else { 1352 pendingResponseAcks.add(rid); 1353 // Remove up until the first missing response (or end of queue) 1354 Long whileVal = responseAck; 1355 while (whileVal.equals(pendingResponseAcks.first())) { 1356 responseAck = whileVal; 1357 pendingResponseAcks.remove(whileVal); 1358 whileVal = Long.valueOf(whileVal.longValue() + 1); 1359 } 1360 } 1361 } 1362 1363 /** 1364 * Process the response in order to check for and respond to any potential 1365 * ack reports. 1366 * 1367 * This method assumes the lock is currently held. 1368 * 1369 * @param resp response 1370 * @return exchange to transmit if a resend is to be performed, or 1371 * {@code null} if no resend is necessary 1372 * @throws BOSHException when a a retry is needed but cannot be performed 1373 */ 1374 private HTTPExchange processResponseAcknowledgementReport( 1375 final AbstractBody resp) 1376 throws BOSHException { 1377 assertLocked(); 1378 1379 String reportStr = resp.getAttribute(Attributes.REPORT); 1380 if (reportStr == null) { 1381 // No report on this message 1382 return null; 1383 } 1384 1385 Long report = Long.parseLong(reportStr); 1386 Long time = Long.parseLong(resp.getAttribute(Attributes.TIME)); 1387 if (LOG.isLoggable(Level.FINE)) { 1388 LOG.fine("Received report of missing request (RID=" 1389 + report + ", time=" + time + "ms)"); 1390 } 1391 1392 // Find the missing request 1393 Iterator<ComposableBody> iter = pendingRequestAcks.iterator(); 1394 AbstractBody req = null; 1395 while (iter.hasNext() && req == null) { 1396 AbstractBody pending = iter.next(); 1397 Long pendingRID = Long.parseLong( 1398 pending.getAttribute(Attributes.RID)); 1399 if 
(report.equals(pendingRID)) { 1400 req = pending; 1401 } 1402 } 1403 1404 if (req == null) { 1405 throw(new BOSHException("Report of missing message with RID '" 1406 + reportStr 1407 + "' but local copy of that request was not found")); 1408 } 1409 1410 // Resend the missing request 1411 HTTPExchange exch = new HTTPExchange(req); 1412 exchanges.add(exch); 1413 notEmpty.signalAll(); 1414 return exch; 1415 } 1416 1417 /** 1418 * Notifies all request listeners that the specified request is being 1419 * sent. 1420 * 1421 * @param request request being sent 1422 */ 1423 private void fireRequestSent(final AbstractBody request) { 1424 assertUnlocked(); 1425 1426 BOSHMessageEvent event = null; 1427 for (BOSHClientRequestListener listener : requestListeners) { 1428 if (event == null) { 1429 event = BOSHMessageEvent.createRequestSentEvent(this, request); 1430 } 1431 try { 1432 listener.requestSent(event); 1433 } catch (Exception ex) { 1434 LOG.log(Level.WARNING, UNHANDLED, ex); 1435 } 1436 } 1437 } 1438 1439 /** 1440 * Notifies all response listeners that the specified response has been 1441 * received. 1442 * 1443 * @param response response received 1444 */ 1445 private void fireResponseReceived(final AbstractBody response) { 1446 assertUnlocked(); 1447 1448 BOSHMessageEvent event = null; 1449 for (BOSHClientResponseListener listener : responseListeners) { 1450 if (event == null) { 1451 event = BOSHMessageEvent.createResponseReceivedEvent( 1452 this, response); 1453 } 1454 try { 1455 listener.responseReceived(event); 1456 } catch (Exception ex) { 1457 LOG.log(Level.WARNING, UNHANDLED, ex); 1458 } 1459 } 1460 } 1461 1462 /** 1463 * Notifies all connection listeners that the session has been successfully 1464 * established. 
*/
    private void fireConnectionEstablished() {
        // Listeners are notified without the lock held.  If the caller
        // holds it, release it for the duration and take it back afterwards.
        final boolean reacquire = lock.isHeldByCurrentThread();
        if (reacquire) {
            lock.unlock();
        }
        try {
            final Iterator<? extends BOSHClientConnListener> listeners =
                    connListeners.iterator();
            if (listeners.hasNext()) {
                // Only build the event when somebody is listening
                final BOSHClientConnEvent event = BOSHClientConnEvent
                        .createConnectionEstablishedEvent(this);
                do {
                    final BOSHClientConnListener target = listeners.next();
                    try {
                        target.connectionEvent(event);
                    } catch (Exception ex) {
                        LOG.log(Level.WARNING, UNHANDLED, ex);
                    }
                } while (listeners.hasNext());
            }
        } finally {
            if (reacquire) {
                lock.lock();
            }
        }
    }

    /**
     * Informs every connection listener that the session has come to a
     * normal end.  Exceptions thrown by a listener are logged and do not
     * prevent the remaining listeners from being notified.
     */
    private void fireConnectionClosed() {
        assertUnlocked();

        final Iterator<? extends BOSHClientConnListener> listeners =
                connListeners.iterator();
        if (!listeners.hasNext()) {
            // Nobody to notify, so no event needs to be created
            return;
        }

        final BOSHClientConnEvent event =
                BOSHClientConnEvent.createConnectionClosedEvent(this);
        do {
            final BOSHClientConnListener target = listeners.next();
            try {
                target.connectionEvent(event);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, UNHANDLED, ex);
            }
        } while (listeners.hasNext());
    }

    /**
     * Informs every connection listener that the session has ended because
     * of the supplied exceptional condition.  The event is created with the
     * requests which are still pending acknowledgement.  Exceptions thrown
     * by a listener are logged and do not prevent the remaining listeners
     * from being notified.
     *
     * @param cause cause of the termination
     */
    private void fireConnectionClosedOnError(
            final Throwable cause) {
        assertUnlocked();

        final Iterator<? extends BOSHClientConnListener> listeners =
                connListeners.iterator();
        if (!listeners.hasNext()) {
            // Nobody to notify, so no event needs to be created
            return;
        }

        final BOSHClientConnEvent event = BOSHClientConnEvent
                .createConnectionClosedOnErrorEvent(
                        this, pendingRequestAcks, cause);
        do {
            final BOSHClientConnListener target = listeners.next();
            try {
                target.connectionEvent(event);
            } catch (Exception ex) {
                LOG.log(Level.WARNING, UNHANDLED, ex);
            }
        } while (listeners.hasNext());
    }

}