/**
 * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jivesoftware.smackx.bytestreams.ibb;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.SocketTimeoutException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.jivesoftware.smack.Connection;
import org.jivesoftware.smack.PacketListener;
import org.jivesoftware.smack.XMPPException;
import org.jivesoftware.smack.filter.AndFilter;
import org.jivesoftware.smack.filter.PacketFilter;
import org.jivesoftware.smack.filter.PacketTypeFilter;
import org.jivesoftware.smack.packet.IQ;
import org.jivesoftware.smack.packet.Message;
import org.jivesoftware.smack.packet.Packet;
import org.jivesoftware.smack.packet.PacketExtension;
import org.jivesoftware.smack.packet.XMPPError;
import org.jivesoftware.smack.util.StringUtils;
import org.jivesoftware.smack.util.SyncPacketSend;
import org.jivesoftware.smackx.bytestreams.BytestreamSession;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;

/**
 * InBandBytestreamSession class represents an In-Band Bytestream session.
 * <p>
 * In-band bytestreams are bidirectional and this session encapsulates the streams for both
 * directions.
 * <p>
 * Note that closing the In-Band Bytestream session will close both streams. If both streams are
 * closed individually the session will be closed automatically once the second stream is closed.
 * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
 * automatically if one of them is closed.
 * 
 * @author Henning Staib
 */
public class InBandBytestreamSession implements BytestreamSession {

    /*
     * highest valid data packet sequence number; the sequence counter restarts at 0 after this
     * value has been used
     */
    private static final long MAX_SEQUENCE = 65535;

    /* XMPP connection */
    private final Connection connection;

    /* the In-Band Bytestream open request for this session */
    private final Open byteStreamRequest;

    /*
     * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
     */
    private IBBInputStream inputStream;

    /*
     * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
     */
    private IBBOutputStream outputStream;

    /* JID of the remote peer */
    private String remoteJID;

    /* flag to close both streams if one of them is closed */
    private boolean closeBothStreamsEnabled = false;

    /* flag to indicate if session is closed */
    private boolean isClosed = false;

    /**
     * Constructor.
     * 
     * @param connection the XMPP connection
     * @param byteStreamRequest the In-Band Bytestream open request for this session
     * @param remoteJID JID of the remote peer
     */
    protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
                    String remoteJID) {
        this.connection = connection;
        this.byteStreamRequest = byteStreamRequest;
        this.remoteJID = remoteJID;

        // initialize streams dependent on the used stanza type
        switch (byteStreamRequest.getStanza()) {
        case IQ:
            this.inputStream = new IQIBBInputStream();
            this.outputStream = new IQIBBOutputStream();
            break;
        case MESSAGE:
            this.inputStream = new MessageIBBInputStream();
            this.outputStream = new MessageIBBOutputStream();
            break;
        }

    }

    public InputStream getInputStream() {
        return this.inputStream;
    }

    public OutputStream getOutputStream() {
        return this.outputStream;
    }

    public int getReadTimeout() {
        return this.inputStream.readTimeout;
    }

    public void setReadTimeout(int timeout) {
        if (timeout < 0) {
            throw new IllegalArgumentException("Timeout must be >= 0");
        }
        this.inputStream.readTimeout = timeout;
    }

    /**
     * Returns whether both streams should be closed automatically if one of the streams is closed.
     * Default is <code>false</code>.
     * 
     * @return <code>true</code> if both streams will be closed if one of the streams is closed,
     *         <code>false</code> if both streams can be closed independently.
     */
    public boolean isCloseBothStreamsEnabled() {
        return closeBothStreamsEnabled;
    }

    /**
     * Sets whether both streams should be closed automatically if one of the streams is closed.
     * Default is <code>false</code>.
     * 
     * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
     *        the streams is closed, <code>false</code> if both streams should be closed
     *        independently
     */
    public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
        this.closeBothStreamsEnabled = closeBothStreamsEnabled;
    }

    public void close() throws IOException {
        closeByLocal(true); // close input stream
        closeByLocal(false); // close output stream
    }

    /**
     * This method is invoked if a request to close the In-Band Bytestream has been received.
     * 
     * @param closeRequest the close request from the remote peer
     */
    protected void closeByPeer(Close closeRequest) {

        /*
         * close streams without flushing them, because stream is already considered closed on the
         * remote peers side
         */
        this.inputStream.closeInternal();
        this.inputStream.cleanup();
        this.outputStream.closeInternal(false);

        // acknowledge close request
        IQ confirmClose = IQ.createResultIQ(closeRequest);
        this.connection.sendPacket(confirmClose);

    }

    /**
     * This method is invoked if one of the streams has been closed locally, if an error occurred
     * locally or if the whole session should be closed.
     * <p>
     * Once both streams are closed a close request is sent to the remote peer. The data packet
     * listener is removed and the session is removed from the manager even if sending the close
     * request fails, because the session is marked as closed beforehand and this method would
     * otherwise never get the chance to release them.
     * 
     * @param in <code>true</code> if the input stream should be closed, <code>false</code> if the
     *        output stream should be closed; ignored if closing both streams is enabled
     * @throws IOException if an error occurs while sending the close request
     */
    protected synchronized void closeByLocal(boolean in) throws IOException {
        if (this.isClosed) {
            return;
        }

        if (this.closeBothStreamsEnabled) {
            this.inputStream.closeInternal();
            this.outputStream.closeInternal(true);
        }
        else {
            if (in) {
                this.inputStream.closeInternal();
            }
            else {
                // close stream but try to send any data left
                this.outputStream.closeInternal(true);
            }
        }

        if (this.inputStream.isClosed && this.outputStream.isClosed) {
            this.isClosed = true;

            // send close request
            Close close = new Close(this.byteStreamRequest.getSessionID());
            close.setTo(this.remoteJID);
            try {
                SyncPacketSend.getReply(this.connection, close);
            }
            catch (XMPPException e) {
                // keep the original failure attached as cause for diagnosis
                IOException closeFailure = new IOException("Error while closing stream: "
                                + e.getMessage());
                closeFailure.initCause(e);
                throw closeFailure;
            }
            finally {
                // release resources regardless of the outcome of the close request
                this.inputStream.cleanup();

                // remove session from manager
                // NOTE(review): confirm that the collection returned by getSessions() is keyed by
                // the session object and not by the session ID
                InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(
                                this);
            }
        }

    }

    /**
     * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
     * Subclasses of this input stream must provide a packet listener along with a packet filter to
     * collect the In-Band Bytestream data packets.
     */
    private abstract class IBBInputStream extends InputStream {

        /* the data packet listener to fill the data queue */
        private final PacketListener dataPacketListener;

        /* queue containing received In-Band Bytestream data packets */
        protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();

        /* buffer containing the data from one data packet */
        private byte[] buffer;

        /* pointer to the next byte to read from buffer */
        private int bufferPointer = -1;

        /* data packet sequence (range from 0 to 65535) */
        private long seq = -1;

        /* flag to indicate if input stream is closed */
        private boolean isClosed = false;

        /* flag to indicate if close method was invoked */
        private boolean closeInvoked = false;

        /* timeout for read operations */
        private int readTimeout = 0;

        /**
         * Constructor.
         */
        public IBBInputStream() {
            // add data packet listener to connection
            this.dataPacketListener = getDataPacketListener();
            connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
        }

        /**
         * Returns the packet listener that processes In-Band Bytestream data packets.
         * 
         * @return the data packet listener
         */
        protected abstract PacketListener getDataPacketListener();

        /**
         * Returns the packet filter that accepts In-Band Bytestream data packets.
         * 
         * @return the data packet filter
         */
        protected abstract PacketFilter getDataPacketFilter();

        public synchronized int read() throws IOException {
            checkClosed();

            /*
             * if nothing read yet or whole buffer has been read fill buffer; loop so that a data
             * packet carrying zero bytes is skipped instead of indexing into an empty buffer
             */
            while (bufferPointer == -1 || bufferPointer >= buffer.length) {
                // if no data available and stream was closed return -1
                if (!loadBuffer()) {
                    return -1;
                }
            }

            // return byte and increment buffer pointer
            return buffer[bufferPointer++] & 0xff;
        }

        public synchronized int read(byte[] b, int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            }
            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
                            || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return 0;
            }

            checkClosed();

            /*
             * if nothing read yet or whole buffer has been read fill buffer; loop so that a data
             * packet carrying zero bytes does not result in returning 0 for a non-empty request
             */
            while (bufferPointer == -1 || bufferPointer >= buffer.length) {
                // if no data available and stream was closed return -1
                if (!loadBuffer()) {
                    return -1;
                }
            }

            // if more bytes wanted than available return all available
            int bytesAvailable = buffer.length - bufferPointer;
            if (len > bytesAvailable) {
                len = bytesAvailable;
            }

            System.arraycopy(buffer, bufferPointer, b, off, len);
            bufferPointer += len;
            return len;
        }

        public synchronized int read(byte[] b) throws IOException {
            return read(b, 0, b.length);
        }

        /**
         * This method blocks until a data packet is received, the stream is closed or the current
         * thread is interrupted. If a read timeout is set it waits at most that long and throws a
         * {@link SocketTimeoutException} if no data packet arrived in time.
         * 
         * @return <code>true</code> if data was received, otherwise <code>false</code>
         * @throws IOException if data packets are out of sequence or the read timeout expired
         */
        private synchronized boolean loadBuffer() throws IOException {

            // wait until data is available or stream is closed
            DataPacketExtension data = null;
            try {
                if (this.readTimeout == 0) {
                    // poll in intervals to notice that the stream was closed in the meantime
                    while (data == null) {
                        if (isClosed && this.dataQueue.isEmpty()) {
                            return false;
                        }
                        data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
                    }
                }
                else {
                    data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
                    if (data == null) {
                        throw new SocketTimeoutException();
                    }
                }
            }
            catch (InterruptedException e) {
                // Restore the interrupted status
                Thread.currentThread().interrupt();
                return false;
            }

            // handle sequence overflow
            if (this.seq == MAX_SEQUENCE) {
                this.seq = -1;
            }

            // check if data packets sequence is successor of last seen sequence
            long seq = data.getSeq();
            if (seq - 1 != this.seq) {
                // packets out of order; close stream/session
                InBandBytestreamSession.this.close();
                throw new IOException("Packets out of sequence");
            }
            else {
                this.seq = seq;
            }

            // set buffer to decoded data
            buffer = data.getDecodedData();
            bufferPointer = 0;
            return true;
        }

        /**
         * Checks if this stream is closed and throws an IOException if necessary
         * 
         * @throws IOException if stream is closed and no data should be read anymore
         */
        private void checkClosed() throws IOException {
            /* throw no exception if there is data available, but not if close method was invoked */
            if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
                // clear data queue in case additional data was received after stream was closed
                this.dataQueue.clear();
                throw new IOException("Stream is closed");
            }
        }

        public boolean markSupported() {
            return false;
        }

        public void close() throws IOException {
            if (isClosed) {
                return;
            }

            this.closeInvoked = true;

            InBandBytestreamSession.this.closeByLocal(true);
        }

        /**
         * This method sets the close flag. The data packet listener is kept until
         * {@link #cleanup()} is invoked.
         */
        private void closeInternal() {
            if (isClosed) {
                return;
            }
            isClosed = true;
        }

        /**
         * Invoked if the session is closed. Removes the data packet listener from the connection.
         */
        private void cleanup() {
            connection.removePacketListener(this.dataPacketListener);
        }

    }

    /**
     * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
     * data packets.
     */
    private class IQIBBInputStream extends IBBInputStream {

        protected PacketListener getDataPacketListener() {
            return new PacketListener() {

                private long lastSequence = -1;

                public void processPacket(Packet packet) {
                    // get data packet extension
                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
                                    DataPacketExtension.ELEMENT_NAME,
                                    InBandBytestreamManager.NAMESPACE);

                    /*
                     * check if sequence was not used already (see XEP-0047 Section 2.2)
                     */
                    if (data.getSeq() <= this.lastSequence) {
                        IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
                                        XMPPError.Condition.unexpected_request));
                        connection.sendPacket(unexpectedRequest);
                        return;

                    }

                    // check if encoded data is valid (see XEP-0047 Section 2.2)
                    if (data.getDecodedData() == null) {
                        // data is invalid; respond with bad-request error
                        IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
                                        XMPPError.Condition.bad_request));
                        connection.sendPacket(badRequest);
                        return;
                    }

                    // data is valid; add to data queue
                    dataQueue.offer(data);

                    // confirm IQ
                    IQ confirmData = IQ.createResultIQ((IQ) packet);
                    connection.sendPacket(confirmData);

                    // set last seen sequence, considering sequence overflow
                    this.lastSequence = data.getSeq();
                    if (this.lastSequence == MAX_SEQUENCE) {
                        this.lastSequence = -1;
                    }

                }

            };
        }

        protected PacketFilter getDataPacketFilter() {
            /*
             * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
             * data packet extension, matching session ID and recipient
             */
            return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
        }

    }

    /**
     * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
     * encapsulating the data packets.
     */
    private class MessageIBBInputStream extends IBBInputStream {

        protected PacketListener getDataPacketListener() {
            return new PacketListener() {

                public void processPacket(Packet packet) {
                    // get data packet extension
                    DataPacketExtension data = (DataPacketExtension) packet.getExtension(
                                    DataPacketExtension.ELEMENT_NAME,
                                    InBandBytestreamManager.NAMESPACE);

                    // check if encoded data is valid
                    if (data.getDecodedData() == null) {
                        /*
                         * TODO once a majority of XMPP server implementation support XEP-0079
                         * Advanced Message Processing the invalid message could be answered with an
                         * appropriate error. For now we just ignore the packet. Subsequent packets
                         * with an increased sequence will cause the input stream to close the
                         * stream/session.
                         */
                        return;
                    }

                    // data is valid; add to data queue
                    dataQueue.offer(data);

                    // TODO confirm packet once XMPP servers support XEP-0079
                }

            };
        }

        @Override
        protected PacketFilter getDataPacketFilter() {
            /*
             * filter all message stanzas containing a data packet extension, matching session ID
             * and recipient
             */
            return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
        }

    }

    /**
     * IBBDataPacketFilter class filters all packets from the remote peer of this session,
     * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
     * ID.
     */
    private class IBBDataPacketFilter implements PacketFilter {

        public boolean accept(Packet packet) {
            // sender equals remote peer; stanzas without a sender can not belong to this session
            String sender = packet.getFrom();
            if (sender == null || !sender.equalsIgnoreCase(remoteJID)) {
                return false;
            }

            // stanza contains data packet extension (instanceof also rejects null)
            PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
                            InBandBytestreamManager.NAMESPACE);
            if (!(packetExtension instanceof DataPacketExtension)) {
                return false;
            }

            // session ID equals this session ID
            DataPacketExtension data = (DataPacketExtension) packetExtension;
            if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
                return false;
            }

            return true;
        }

    }

    /**
     * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
     * Subclasses of this output stream must provide a method to send data over XMPP stream.
     */
    private abstract class IBBOutputStream extends OutputStream {

        /* buffer with the size of this sessions block size */
        protected final byte[] buffer;

        /* pointer to next byte to write to buffer */
        protected int bufferPointer = 0;

        /* data packet sequence (range from 0 to 65535) */
        protected long seq = 0;

        /* flag to indicate if output stream is closed */
        protected boolean isClosed = false;

        /**
         * Constructor.
         */
        public IBBOutputStream() {
            // Base64 encodes 3 bytes as 4 characters, so only 3/4 of the block size can be used
            this.buffer = new byte[(byteStreamRequest.getBlockSize()/4)*3];
        }

        /**
         * Writes the given data packet to the XMPP stream.
         * 
         * @param data the data packet
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         */
        protected abstract void writeToXML(DataPacketExtension data) throws IOException;

        public synchronized void write(int b) throws IOException {
            checkWritable();

            // if buffer is full flush buffer
            if (bufferPointer >= buffer.length) {
                flushBuffer();
            }

            buffer[bufferPointer++] = (byte) b;
        }

        public synchronized void write(byte b[], int off, int len) throws IOException {
            if (b == null) {
                throw new NullPointerException();
            }
            else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
                            || ((off + len) < 0)) {
                throw new IndexOutOfBoundsException();
            }
            else if (len == 0) {
                return;
            }

            checkWritable();

            /*
             * hand the data over in chunks of at most the buffer capacity; done iteratively so
             * that the stack depth does not grow with the amount of data to write
             */
            int offset = off;
            int remaining = len;
            while (remaining >= buffer.length) {
                // "byte" off the next chunk to write out
                writeOut(b, offset, buffer.length);
                offset += buffer.length;
                remaining -= buffer.length;
            }

            if (remaining > 0) {
                writeOut(b, offset, remaining);
            }
        }

        public synchronized void write(byte[] b) throws IOException {
            write(b, 0, b.length);
        }

        /**
         * Ensures that data can be written to this stream.
         * 
         * @throws IOException if the stream is closed or if the block size of this session is too
         *         small to hold any data
         */
        private void checkWritable() throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            // a block size below 4 results in a buffer without any capacity
            if (buffer.length == 0) {
                throw new IOException("Block size is too small to send any data");
            }
        }

        /**
         * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
         * capacity has been reached. This method is only called from this class so it is assured
         * that the amount of data to send is <= buffer capacity
         * 
         * @param b the data
         * @param off the start offset in the data
         * @param len the number of bytes to write
         * @throws IOException if an I/O error occurred while sending or if the stream is closed
         */
        private synchronized void writeOut(byte b[], int off, int len) throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }

            // set to 0 in case the next 'if' block is not executed
            int available = 0;

            // is data to send greater that buffer space left
            if (len > buffer.length - bufferPointer) {
                // fill buffer to capacity and send it
                available = buffer.length - bufferPointer;
                System.arraycopy(b, off, buffer, bufferPointer, available);
                bufferPointer += available;
                flushBuffer();
            }

            // copy the data left to buffer
            System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
            bufferPointer += len - available;
        }

        public synchronized void flush() throws IOException {
            if (this.isClosed) {
                throw new IOException("Stream is closed");
            }
            flushBuffer();
        }

        /**
         * Sends the buffered data as one data packet over the XMPP stream and advances the
         * sequence counter. Does nothing if the buffer is empty.
         * 
         * @throws IOException if an I/O error occurred while sending
         */
        private synchronized void flushBuffer() throws IOException {

            // do nothing if no data to send available
            if (bufferPointer == 0) {
                return;
            }

            // create data packet
            String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
            DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
                            this.seq, enc);

            // write to XMPP stream
            writeToXML(data);

            // reset buffer pointer
            bufferPointer = 0;

            /*
             * increment sequence, considering sequence overflow: 65535 is the last valid value
             * and has to be used before starting again at 0, otherwise the receiving side
             * detects a gap in the sequence
             */
            this.seq = (this.seq == MAX_SEQUENCE ? 0 : this.seq + 1);

        }

        public void close() throws IOException {
            if (isClosed) {
                return;
            }
            InBandBytestreamSession.this.closeByLocal(false);
        }

        /**
         * Sets the close flag and optionally flushes the stream.
         * 
         * @param flush if <code>true</code> flushes the stream
         */
        protected void closeInternal(boolean flush) {
            if (this.isClosed) {
                return;
            }
            this.isClosed = true;

            try {
                if (flush) {
                    flushBuffer();
                }
            }
            catch (IOException e) {
                /*
                 * ignore, because writeToXML() will not throw an exception if stream is already
                 * closed
                 */
            }
        }

    }

    /**
     * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
     * the data packets.
     */
    private class IQIBBOutputStream extends IBBOutputStream {

        @Override
        protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
            // create IQ stanza containing data packet
            IQ iq = new Data(data);
            iq.setTo(remoteJID);

            try {
                SyncPacketSend.getReply(connection, iq);
            }
            catch (XMPPException e) {
                // close session unless it is already closed
                if (!this.isClosed) {
                    InBandBytestreamSession.this.close();

                    // keep the original failure attached as cause for diagnosis
                    IOException sendFailure = new IOException("Error while sending Data: "
                                    + e.getMessage());
                    sendFailure.initCause(e);
                    throw sendFailure;
                }
            }

        }

    }

    /**
     * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
     * encapsulating the data packets.
     */
    private class MessageIBBOutputStream extends IBBOutputStream {

        @Override
        protected synchronized void writeToXML(DataPacketExtension data) {
            // create message stanza containing data packet
            Message message = new Message(remoteJID);
            message.addExtension(data);

            connection.sendPacket(message);

        }

    }

}