Home | History | Annotate | Download | only in ibb
      1 /**
      2  * All rights reserved. Licensed under the Apache License, Version 2.0 (the "License");
      3  * you may not use this file except in compliance with the License.
      4  * You may obtain a copy of the License at
      5  *
      6  *     http://www.apache.org/licenses/LICENSE-2.0
      7  *
      8  * Unless required by applicable law or agreed to in writing, software
      9  * distributed under the License is distributed on an "AS IS" BASIS,
     10  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     11  * See the License for the specific language governing permissions and
     12  * limitations under the License.
     13  */
     14 package org.jivesoftware.smackx.bytestreams.ibb;
     15 
     16 import java.io.IOException;
     17 import java.io.InputStream;
     18 import java.io.OutputStream;
     19 import java.net.SocketTimeoutException;
     20 import java.util.concurrent.BlockingQueue;
     21 import java.util.concurrent.LinkedBlockingQueue;
     22 import java.util.concurrent.TimeUnit;
     23 
     24 import org.jivesoftware.smack.Connection;
     25 import org.jivesoftware.smack.PacketListener;
     26 import org.jivesoftware.smack.XMPPException;
     27 import org.jivesoftware.smack.filter.AndFilter;
     28 import org.jivesoftware.smack.filter.PacketFilter;
     29 import org.jivesoftware.smack.filter.PacketTypeFilter;
     30 import org.jivesoftware.smack.packet.IQ;
     31 import org.jivesoftware.smack.packet.Message;
     32 import org.jivesoftware.smack.packet.Packet;
     33 import org.jivesoftware.smack.packet.PacketExtension;
     34 import org.jivesoftware.smack.packet.XMPPError;
     35 import org.jivesoftware.smack.util.StringUtils;
     36 import org.jivesoftware.smack.util.SyncPacketSend;
     37 import org.jivesoftware.smackx.bytestreams.BytestreamSession;
     38 import org.jivesoftware.smackx.bytestreams.ibb.packet.Close;
     39 import org.jivesoftware.smackx.bytestreams.ibb.packet.Data;
     40 import org.jivesoftware.smackx.bytestreams.ibb.packet.DataPacketExtension;
     41 import org.jivesoftware.smackx.bytestreams.ibb.packet.Open;
     42 
     43 /**
     44  * InBandBytestreamSession class represents an In-Band Bytestream session.
     45  * <p>
     46  * In-band bytestreams are bidirectional and this session encapsulates the streams for both
     47  * directions.
     48  * <p>
     49  * Note that closing the In-Band Bytestream session will close both streams. If both streams are
     50  * closed individually the session will be closed automatically once the second stream is closed.
     51  * Use the {@link #setCloseBothStreamsEnabled(boolean)} method if both streams should be closed
     52  * automatically if one of them is closed.
     53  *
     54  * @author Henning Staib
     55  */
     56 public class InBandBytestreamSession implements BytestreamSession {
     57 
     58     /* XMPP connection */
     59     private final Connection connection;
     60 
     61     /* the In-Band Bytestream open request for this session */
     62     private final Open byteStreamRequest;
     63 
     64     /*
     65      * the input stream for this session (either IQIBBInputStream or MessageIBBInputStream)
     66      */
     67     private IBBInputStream inputStream;
     68 
     69     /*
     70      * the output stream for this session (either IQIBBOutputStream or MessageIBBOutputStream)
     71      */
     72     private IBBOutputStream outputStream;
     73 
     74     /* JID of the remote peer */
     75     private String remoteJID;
     76 
     77     /* flag to close both streams if one of them is closed */
     78     private boolean closeBothStreamsEnabled = false;
     79 
     80     /* flag to indicate if session is closed */
     81     private boolean isClosed = false;
     82 
     83     /**
     84      * Constructor.
     85      *
     86      * @param connection the XMPP connection
     87      * @param byteStreamRequest the In-Band Bytestream open request for this session
     88      * @param remoteJID JID of the remote peer
     89      */
     90     protected InBandBytestreamSession(Connection connection, Open byteStreamRequest,
     91                     String remoteJID) {
     92         this.connection = connection;
     93         this.byteStreamRequest = byteStreamRequest;
     94         this.remoteJID = remoteJID;
     95 
     96         // initialize streams dependent to the uses stanza type
     97         switch (byteStreamRequest.getStanza()) {
     98         case IQ:
     99             this.inputStream = new IQIBBInputStream();
    100             this.outputStream = new IQIBBOutputStream();
    101             break;
    102         case MESSAGE:
    103             this.inputStream = new MessageIBBInputStream();
    104             this.outputStream = new MessageIBBOutputStream();
    105             break;
    106         }
    107 
    108     }
    109 
    110     public InputStream getInputStream() {
    111         return this.inputStream;
    112     }
    113 
    114     public OutputStream getOutputStream() {
    115         return this.outputStream;
    116     }
    117 
    118     public int getReadTimeout() {
    119         return this.inputStream.readTimeout;
    120     }
    121 
    122     public void setReadTimeout(int timeout) {
    123         if (timeout < 0) {
    124             throw new IllegalArgumentException("Timeout must be >= 0");
    125         }
    126         this.inputStream.readTimeout = timeout;
    127     }
    128 
    129     /**
    130      * Returns whether both streams should be closed automatically if one of the streams is closed.
    131      * Default is <code>false</code>.
    132      *
    133      * @return <code>true</code> if both streams will be closed if one of the streams is closed,
    134      *         <code>false</code> if both streams can be closed independently.
    135      */
    136     public boolean isCloseBothStreamsEnabled() {
    137         return closeBothStreamsEnabled;
    138     }
    139 
    140     /**
    141      * Sets whether both streams should be closed automatically if one of the streams is closed.
    142      * Default is <code>false</code>.
    143      *
    144      * @param closeBothStreamsEnabled <code>true</code> if both streams should be closed if one of
    145      *        the streams is closed, <code>false</code> if both streams should be closed
    146      *        independently
    147      */
    148     public void setCloseBothStreamsEnabled(boolean closeBothStreamsEnabled) {
    149         this.closeBothStreamsEnabled = closeBothStreamsEnabled;
    150     }
    151 
    152     public void close() throws IOException {
    153         closeByLocal(true); // close input stream
    154         closeByLocal(false); // close output stream
    155     }
    156 
    157     /**
    158      * This method is invoked if a request to close the In-Band Bytestream has been received.
    159      *
    160      * @param closeRequest the close request from the remote peer
    161      */
    162     protected void closeByPeer(Close closeRequest) {
    163 
    164         /*
    165          * close streams without flushing them, because stream is already considered closed on the
    166          * remote peers side
    167          */
    168         this.inputStream.closeInternal();
    169         this.inputStream.cleanup();
    170         this.outputStream.closeInternal(false);
    171 
    172         // acknowledge close request
    173         IQ confirmClose = IQ.createResultIQ(closeRequest);
    174         this.connection.sendPacket(confirmClose);
    175 
    176     }
    177 
    178     /**
    179      * This method is invoked if one of the streams has been closed locally, if an error occurred
    180      * locally or if the whole session should be closed.
    181      *
    182      * @throws IOException if an error occurs while sending the close request
    183      */
    184     protected synchronized void closeByLocal(boolean in) throws IOException {
    185         if (this.isClosed) {
    186             return;
    187         }
    188 
    189         if (this.closeBothStreamsEnabled) {
    190             this.inputStream.closeInternal();
    191             this.outputStream.closeInternal(true);
    192         }
    193         else {
    194             if (in) {
    195                 this.inputStream.closeInternal();
    196             }
    197             else {
    198                 // close stream but try to send any data left
    199                 this.outputStream.closeInternal(true);
    200             }
    201         }
    202 
    203         if (this.inputStream.isClosed && this.outputStream.isClosed) {
    204             this.isClosed = true;
    205 
    206             // send close request
    207             Close close = new Close(this.byteStreamRequest.getSessionID());
    208             close.setTo(this.remoteJID);
    209             try {
    210                 SyncPacketSend.getReply(this.connection, close);
    211             }
    212             catch (XMPPException e) {
    213                 throw new IOException("Error while closing stream: " + e.getMessage());
    214             }
    215 
    216             this.inputStream.cleanup();
    217 
    218             // remove session from manager
    219             InBandBytestreamManager.getByteStreamManager(this.connection).getSessions().remove(this);
    220         }
    221 
    222     }
    223 
    224     /**
    225      * IBBInputStream class is the base implementation of an In-Band Bytestream input stream.
    226      * Subclasses of this input stream must provide a packet listener along with a packet filter to
    227      * collect the In-Band Bytestream data packets.
    228      */
    229     private abstract class IBBInputStream extends InputStream {
    230 
    231         /* the data packet listener to fill the data queue */
    232         private final PacketListener dataPacketListener;
    233 
    234         /* queue containing received In-Band Bytestream data packets */
    235         protected final BlockingQueue<DataPacketExtension> dataQueue = new LinkedBlockingQueue<DataPacketExtension>();
    236 
    237         /* buffer containing the data from one data packet */
    238         private byte[] buffer;
    239 
    240         /* pointer to the next byte to read from buffer */
    241         private int bufferPointer = -1;
    242 
    243         /* data packet sequence (range from 0 to 65535) */
    244         private long seq = -1;
    245 
    246         /* flag to indicate if input stream is closed */
    247         private boolean isClosed = false;
    248 
    249         /* flag to indicate if close method was invoked */
    250         private boolean closeInvoked = false;
    251 
    252         /* timeout for read operations */
    253         private int readTimeout = 0;
    254 
    255         /**
    256          * Constructor.
    257          */
    258         public IBBInputStream() {
    259             // add data packet listener to connection
    260             this.dataPacketListener = getDataPacketListener();
    261             connection.addPacketListener(this.dataPacketListener, getDataPacketFilter());
    262         }
    263 
    264         /**
    265          * Returns the packet listener that processes In-Band Bytestream data packets.
    266          *
    267          * @return the data packet listener
    268          */
    269         protected abstract PacketListener getDataPacketListener();
    270 
    271         /**
    272          * Returns the packet filter that accepts In-Band Bytestream data packets.
    273          *
    274          * @return the data packet filter
    275          */
    276         protected abstract PacketFilter getDataPacketFilter();
    277 
    278         public synchronized int read() throws IOException {
    279             checkClosed();
    280 
    281             // if nothing read yet or whole buffer has been read fill buffer
    282             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
    283                 // if no data available and stream was closed return -1
    284                 if (!loadBuffer()) {
    285                     return -1;
    286                 }
    287             }
    288 
    289             // return byte and increment buffer pointer
    290             return ((int) buffer[bufferPointer++]) & 0xff;
    291         }
    292 
    293         public synchronized int read(byte[] b, int off, int len) throws IOException {
    294             if (b == null) {
    295                 throw new NullPointerException();
    296             }
    297             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
    298                             || ((off + len) < 0)) {
    299                 throw new IndexOutOfBoundsException();
    300             }
    301             else if (len == 0) {
    302                 return 0;
    303             }
    304 
    305             checkClosed();
    306 
    307             // if nothing read yet or whole buffer has been read fill buffer
    308             if (bufferPointer == -1 || bufferPointer >= buffer.length) {
    309                 // if no data available and stream was closed return -1
    310                 if (!loadBuffer()) {
    311                     return -1;
    312                 }
    313             }
    314 
    315             // if more bytes wanted than available return all available
    316             int bytesAvailable = buffer.length - bufferPointer;
    317             if (len > bytesAvailable) {
    318                 len = bytesAvailable;
    319             }
    320 
    321             System.arraycopy(buffer, bufferPointer, b, off, len);
    322             bufferPointer += len;
    323             return len;
    324         }
    325 
    326         public synchronized int read(byte[] b) throws IOException {
    327             return read(b, 0, b.length);
    328         }
    329 
    330         /**
    331          * This method blocks until a data packet is received, the stream is closed or the current
    332          * thread is interrupted.
    333          *
    334          * @return <code>true</code> if data was received, otherwise <code>false</code>
    335          * @throws IOException if data packets are out of sequence
    336          */
    337         private synchronized boolean loadBuffer() throws IOException {
    338 
    339             // wait until data is available or stream is closed
    340             DataPacketExtension data = null;
    341             try {
    342                 if (this.readTimeout == 0) {
    343                     while (data == null) {
    344                         if (isClosed && this.dataQueue.isEmpty()) {
    345                             return false;
    346                         }
    347                         data = this.dataQueue.poll(1000, TimeUnit.MILLISECONDS);
    348                     }
    349                 }
    350                 else {
    351                     data = this.dataQueue.poll(this.readTimeout, TimeUnit.MILLISECONDS);
    352                     if (data == null) {
    353                         throw new SocketTimeoutException();
    354                     }
    355                 }
    356             }
    357             catch (InterruptedException e) {
    358                 // Restore the interrupted status
    359                 Thread.currentThread().interrupt();
    360                 return false;
    361             }
    362 
    363             // handle sequence overflow
    364             if (this.seq == 65535) {
    365                 this.seq = -1;
    366             }
    367 
    368             // check if data packets sequence is successor of last seen sequence
    369             long seq = data.getSeq();
    370             if (seq - 1 != this.seq) {
    371                 // packets out of order; close stream/session
    372                 InBandBytestreamSession.this.close();
    373                 throw new IOException("Packets out of sequence");
    374             }
    375             else {
    376                 this.seq = seq;
    377             }
    378 
    379             // set buffer to decoded data
    380             buffer = data.getDecodedData();
    381             bufferPointer = 0;
    382             return true;
    383         }
    384 
    385         /**
    386          * Checks if this stream is closed and throws an IOException if necessary
    387          *
    388          * @throws IOException if stream is closed and no data should be read anymore
    389          */
    390         private void checkClosed() throws IOException {
    391             /* throw no exception if there is data available, but not if close method was invoked */
    392             if ((isClosed && this.dataQueue.isEmpty()) || closeInvoked) {
    393                 // clear data queue in case additional data was received after stream was closed
    394                 this.dataQueue.clear();
    395                 throw new IOException("Stream is closed");
    396             }
    397         }
    398 
    399         public boolean markSupported() {
    400             return false;
    401         }
    402 
    403         public void close() throws IOException {
    404             if (isClosed) {
    405                 return;
    406             }
    407 
    408             this.closeInvoked = true;
    409 
    410             InBandBytestreamSession.this.closeByLocal(true);
    411         }
    412 
    413         /**
    414          * This method sets the close flag and removes the data packet listener.
    415          */
    416         private void closeInternal() {
    417             if (isClosed) {
    418                 return;
    419             }
    420             isClosed = true;
    421         }
    422 
    423         /**
    424          * Invoked if the session is closed.
    425          */
    426         private void cleanup() {
    427             connection.removePacketListener(this.dataPacketListener);
    428         }
    429 
    430     }
    431 
    432     /**
    433      * IQIBBInputStream class implements IBBInputStream to be used with IQ stanzas encapsulating the
    434      * data packets.
    435      */
    436     private class IQIBBInputStream extends IBBInputStream {
    437 
    438         protected PacketListener getDataPacketListener() {
    439             return new PacketListener() {
    440 
    441                 private long lastSequence = -1;
    442 
    443                 public void processPacket(Packet packet) {
    444                     // get data packet extension
    445                     DataPacketExtension data = (DataPacketExtension) packet.getExtension(
    446                                     DataPacketExtension.ELEMENT_NAME,
    447                                     InBandBytestreamManager.NAMESPACE);
    448 
    449                     /*
    450                      * check if sequence was not used already (see XEP-0047 Section 2.2)
    451                      */
    452                     if (data.getSeq() <= this.lastSequence) {
    453                         IQ unexpectedRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
    454                                         XMPPError.Condition.unexpected_request));
    455                         connection.sendPacket(unexpectedRequest);
    456                         return;
    457 
    458                     }
    459 
    460                     // check if encoded data is valid (see XEP-0047 Section 2.2)
    461                     if (data.getDecodedData() == null) {
    462                         // data is invalid; respond with bad-request error
    463                         IQ badRequest = IQ.createErrorResponse((IQ) packet, new XMPPError(
    464                                         XMPPError.Condition.bad_request));
    465                         connection.sendPacket(badRequest);
    466                         return;
    467                     }
    468 
    469                     // data is valid; add to data queue
    470                     dataQueue.offer(data);
    471 
    472                     // confirm IQ
    473                     IQ confirmData = IQ.createResultIQ((IQ) packet);
    474                     connection.sendPacket(confirmData);
    475 
    476                     // set last seen sequence
    477                     this.lastSequence = data.getSeq();
    478                     if (this.lastSequence == 65535) {
    479                         this.lastSequence = -1;
    480                     }
    481 
    482                 }
    483 
    484             };
    485         }
    486 
    487         protected PacketFilter getDataPacketFilter() {
    488             /*
    489              * filter all IQ stanzas having type 'SET' (represented by Data class), containing a
    490              * data packet extension, matching session ID and recipient
    491              */
    492             return new AndFilter(new PacketTypeFilter(Data.class), new IBBDataPacketFilter());
    493         }
    494 
    495     }
    496 
    497     /**
    498      * MessageIBBInputStream class implements IBBInputStream to be used with message stanzas
    499      * encapsulating the data packets.
    500      */
    501     private class MessageIBBInputStream extends IBBInputStream {
    502 
    503         protected PacketListener getDataPacketListener() {
    504             return new PacketListener() {
    505 
    506                 public void processPacket(Packet packet) {
    507                     // get data packet extension
    508                     DataPacketExtension data = (DataPacketExtension) packet.getExtension(
    509                                     DataPacketExtension.ELEMENT_NAME,
    510                                     InBandBytestreamManager.NAMESPACE);
    511 
    512                     // check if encoded data is valid
    513                     if (data.getDecodedData() == null) {
    514                         /*
    515                          * TODO once a majority of XMPP server implementation support XEP-0079
    516                          * Advanced Message Processing the invalid message could be answered with an
    517                          * appropriate error. For now we just ignore the packet. Subsequent packets
    518                          * with an increased sequence will cause the input stream to close the
    519                          * stream/session.
    520                          */
    521                         return;
    522                     }
    523 
    524                     // data is valid; add to data queue
    525                     dataQueue.offer(data);
    526 
    527                     // TODO confirm packet once XMPP servers support XEP-0079
    528                 }
    529 
    530             };
    531         }
    532 
    533         @Override
    534         protected PacketFilter getDataPacketFilter() {
    535             /*
    536              * filter all message stanzas containing a data packet extension, matching session ID
    537              * and recipient
    538              */
    539             return new AndFilter(new PacketTypeFilter(Message.class), new IBBDataPacketFilter());
    540         }
    541 
    542     }
    543 
    544     /**
    545      * IBBDataPacketFilter class filters all packets from the remote peer of this session,
    546      * containing an In-Band Bytestream data packet extension whose session ID matches this sessions
    547      * ID.
    548      */
    549     private class IBBDataPacketFilter implements PacketFilter {
    550 
    551         public boolean accept(Packet packet) {
    552             // sender equals remote peer
    553             if (!packet.getFrom().equalsIgnoreCase(remoteJID)) {
    554                 return false;
    555             }
    556 
    557             // stanza contains data packet extension
    558             PacketExtension packetExtension = packet.getExtension(DataPacketExtension.ELEMENT_NAME,
    559                             InBandBytestreamManager.NAMESPACE);
    560             if (packetExtension == null || !(packetExtension instanceof DataPacketExtension)) {
    561                 return false;
    562             }
    563 
    564             // session ID equals this session ID
    565             DataPacketExtension data = (DataPacketExtension) packetExtension;
    566             if (!data.getSessionID().equals(byteStreamRequest.getSessionID())) {
    567                 return false;
    568             }
    569 
    570             return true;
    571         }
    572 
    573     }
    574 
    575     /**
    576      * IBBOutputStream class is the base implementation of an In-Band Bytestream output stream.
    577      * Subclasses of this output stream must provide a method to send data over XMPP stream.
    578      */
    579     private abstract class IBBOutputStream extends OutputStream {
    580 
    581         /* buffer with the size of this sessions block size */
    582         protected final byte[] buffer;
    583 
    584         /* pointer to next byte to write to buffer */
    585         protected int bufferPointer = 0;
    586 
    587         /* data packet sequence (range from 0 to 65535) */
    588         protected long seq = 0;
    589 
    590         /* flag to indicate if output stream is closed */
    591         protected boolean isClosed = false;
    592 
    593         /**
    594          * Constructor.
    595          */
    596         public IBBOutputStream() {
    597             this.buffer = new byte[(byteStreamRequest.getBlockSize()/4)*3];
    598         }
    599 
    600         /**
    601          * Writes the given data packet to the XMPP stream.
    602          *
    603          * @param data the data packet
    604          * @throws IOException if an I/O error occurred while sending or if the stream is closed
    605          */
    606         protected abstract void writeToXML(DataPacketExtension data) throws IOException;
    607 
    608         public synchronized void write(int b) throws IOException {
    609             if (this.isClosed) {
    610                 throw new IOException("Stream is closed");
    611             }
    612 
    613             // if buffer is full flush buffer
    614             if (bufferPointer >= buffer.length) {
    615                 flushBuffer();
    616             }
    617 
    618             buffer[bufferPointer++] = (byte) b;
    619         }
    620 
    621         public synchronized void write(byte b[], int off, int len) throws IOException {
    622             if (b == null) {
    623                 throw new NullPointerException();
    624             }
    625             else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length)
    626                             || ((off + len) < 0)) {
    627                 throw new IndexOutOfBoundsException();
    628             }
    629             else if (len == 0) {
    630                 return;
    631             }
    632 
    633             if (this.isClosed) {
    634                 throw new IOException("Stream is closed");
    635             }
    636 
    637             // is data to send greater than buffer size
    638             if (len >= buffer.length) {
    639 
    640                 // "byte" off the first chunk to write out
    641                 writeOut(b, off, buffer.length);
    642 
    643                 // recursively call this method with the lesser amount
    644                 write(b, off + buffer.length, len - buffer.length);
    645             }
    646             else {
    647                 writeOut(b, off, len);
    648             }
    649         }
    650 
    651         public synchronized void write(byte[] b) throws IOException {
    652             write(b, 0, b.length);
    653         }
    654 
    655         /**
    656          * Fills the buffer with the given data and sends it over the XMPP stream if the buffers
    657          * capacity has been reached. This method is only called from this class so it is assured
    658          * that the amount of data to send is <= buffer capacity
    659          *
    660          * @param b the data
    661          * @param off the data
    662          * @param len the number of bytes to write
    663          * @throws IOException if an I/O error occurred while sending or if the stream is closed
    664          */
    665         private synchronized void writeOut(byte b[], int off, int len) throws IOException {
    666             if (this.isClosed) {
    667                 throw new IOException("Stream is closed");
    668             }
    669 
    670             // set to 0 in case the next 'if' block is not executed
    671             int available = 0;
    672 
    673             // is data to send greater that buffer space left
    674             if (len > buffer.length - bufferPointer) {
    675                 // fill buffer to capacity and send it
    676                 available = buffer.length - bufferPointer;
    677                 System.arraycopy(b, off, buffer, bufferPointer, available);
    678                 bufferPointer += available;
    679                 flushBuffer();
    680             }
    681 
    682             // copy the data left to buffer
    683             System.arraycopy(b, off + available, buffer, bufferPointer, len - available);
    684             bufferPointer += len - available;
    685         }
    686 
    687         public synchronized void flush() throws IOException {
    688             if (this.isClosed) {
    689                 throw new IOException("Stream is closed");
    690             }
    691             flushBuffer();
    692         }
    693 
    694         private synchronized void flushBuffer() throws IOException {
    695 
    696             // do nothing if no data to send available
    697             if (bufferPointer == 0) {
    698                 return;
    699             }
    700 
    701             // create data packet
    702             String enc = StringUtils.encodeBase64(buffer, 0, bufferPointer, false);
    703             DataPacketExtension data = new DataPacketExtension(byteStreamRequest.getSessionID(),
    704                             this.seq, enc);
    705 
    706             // write to XMPP stream
    707             writeToXML(data);
    708 
    709             // reset buffer pointer
    710             bufferPointer = 0;
    711 
    712             // increment sequence, considering sequence overflow
    713             this.seq = (this.seq + 1 == 65535 ? 0 : this.seq + 1);
    714 
    715         }
    716 
    717         public void close() throws IOException {
    718             if (isClosed) {
    719                 return;
    720             }
    721             InBandBytestreamSession.this.closeByLocal(false);
    722         }
    723 
    724         /**
    725          * Sets the close flag and optionally flushes the stream.
    726          *
    727          * @param flush if <code>true</code> flushes the stream
    728          */
    729         protected void closeInternal(boolean flush) {
    730             if (this.isClosed) {
    731                 return;
    732             }
    733             this.isClosed = true;
    734 
    735             try {
    736                 if (flush) {
    737                     flushBuffer();
    738                 }
    739             }
    740             catch (IOException e) {
    741                 /*
    742                  * ignore, because writeToXML() will not throw an exception if stream is already
    743                  * closed
    744                  */
    745             }
    746         }
    747 
    748     }
    749 
    750     /**
    751      * IQIBBOutputStream class implements IBBOutputStream to be used with IQ stanzas encapsulating
    752      * the data packets.
    753      */
    754     private class IQIBBOutputStream extends IBBOutputStream {
    755 
    756         @Override
    757         protected synchronized void writeToXML(DataPacketExtension data) throws IOException {
    758             // create IQ stanza containing data packet
    759             IQ iq = new Data(data);
    760             iq.setTo(remoteJID);
    761 
    762             try {
    763                 SyncPacketSend.getReply(connection, iq);
    764             }
    765             catch (XMPPException e) {
    766                 // close session unless it is already closed
    767                 if (!this.isClosed) {
    768                     InBandBytestreamSession.this.close();
    769                     throw new IOException("Error while sending Data: " + e.getMessage());
    770                 }
    771             }
    772 
    773         }
    774 
    775     }
    776 
    777     /**
    778      * MessageIBBOutputStream class implements IBBOutputStream to be used with message stanzas
    779      * encapsulating the data packets.
    780      */
    781     private class MessageIBBOutputStream extends IBBOutputStream {
    782 
    783         @Override
    784         protected synchronized void writeToXML(DataPacketExtension data) {
    785             // create message stanza containing data packet
    786             Message message = new Message(remoteJID);
    787             message.addExtension(data);
    788 
    789             connection.sendPacket(message);
    790 
    791         }
    792 
    793     }
    794 
    795 }
    796