Home | History | Annotate | Download | only in io
      1 /*
      2  *  Licensed to the Apache Software Foundation (ASF) under one or more
      3  *  contributor license agreements.  See the NOTICE file distributed with
      4  *  this work for additional information regarding copyright ownership.
      5  *  The ASF licenses this file to You under the Apache License, Version 2.0
      6  *  (the "License"); you may not use this file except in compliance with
      7  *  the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  */
     17 
     18 package java.io;
     19 
     20 import java.util.Arrays;
     21 import libcore.io.IoUtils;
     22 
     23 /**
     24  * Receives information from a communications pipe. When two threads want to
     25  * pass data back and forth, one creates a piped output stream and the other one
     26  * creates a piped input stream.
     27  *
     28  * @see PipedOutputStream
     29  */
     30 public class PipedInputStream extends InputStream {
     31 
     32     private Thread lastReader;
     33 
     34     private Thread lastWriter;
     35 
     36     private boolean isClosed;
     37 
     38     /**
     39      * The circular buffer through which data is passed. Data is read from the
     40      * range {@code [out, in)} and written to the range {@code [in, out)}.
     41      * Data in the buffer is either sequential: <pre>
     42      *     { - - - X X X X X X X - - - - - }
     43      *             ^             ^
     44      *             |             |
     45      *            out           in</pre>
     46      * ...or wrapped around the buffer's end: <pre>
     47      *     { X X X X - - - - - - - - X X X }
     48      *               ^               ^
     49      *               |               |
     50      *              in              out</pre>
     51      * When the buffer is empty, {@code in == -1}. Reading when the buffer is
     52      * empty will block until data is available. When the buffer is full,
     53      * {@code in == out}. Writing when the buffer is full will block until free
     54      * space is available.
     55      */
     56     protected byte[] buffer;
     57 
     58     /**
     59      * The index in {@code buffer} where the next byte will be written.
     60      */
     61     protected int in = -1;
     62 
     63     /**
     64      * The index in {@code buffer} where the next byte will be read.
     65      */
     66     protected int out;
     67 
     68     /**
     69      * The size of the default pipe in bytes.
     70      */
     71     protected static final int PIPE_SIZE = 1024;
     72 
     73     /**
     74      * Indicates if this pipe is connected.
     75      */
     76     boolean isConnected;
     77 
     78     /**
     79      * Constructs a new unconnected {@code PipedInputStream}. The resulting
     80      * stream must be connected to a {@link PipedOutputStream} before data may
     81      * be read from it.
     82      */
     83     public PipedInputStream() {}
     84 
     85     /**
     86      * Constructs a new {@code PipedInputStream} connected to the
     87      * {@link PipedOutputStream} {@code out}. Any data written to the output
     88      * stream can be read from the this input stream.
     89      *
     90      * @param out
     91      *            the piped output stream to connect to.
     92      * @throws IOException
     93      *             if this stream or {@code out} are already connected.
     94      */
     95     public PipedInputStream(PipedOutputStream out) throws IOException {
     96         connect(out);
     97     }
     98 
     99     /**
    100      * Constructs a new unconnected {@code PipedInputStream} with the given
    101      * buffer size. The resulting stream must be connected to a
    102      * {@code PipedOutputStream} before data may be read from it.
    103      *
    104      * @param pipeSize the size of the buffer in bytes.
    105      * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
    106      * @since 1.6
    107      */
    108     public PipedInputStream(int pipeSize) {
    109         if (pipeSize <= 0) {
    110             throw new IllegalArgumentException("pipe size " + pipeSize + " too small");
    111         }
    112         buffer = new byte[pipeSize];
    113     }
    114 
    115     /**
    116      * Constructs a new {@code PipedInputStream} connected to the given {@code PipedOutputStream},
    117      * with the given buffer size. Any data written to the output stream can be read from this
    118      * input stream.
    119      *
    120      * @param out the {@code PipedOutputStream} to connect to.
    121      * @param pipeSize the size of the buffer in bytes.
    122      * @throws IOException if an I/O error occurs.
    123      * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
    124      * @since 1.6
    125      */
    126     public PipedInputStream(PipedOutputStream out, int pipeSize) throws IOException {
    127         this(pipeSize);
    128         connect(out);
    129     }
    130 
    131     /**
    132      * {@inheritDoc}
    133      *
    134      * <p>Unlike most streams, {@code PipedInputStream} returns 0 rather than throwing
    135      * {@code IOException} if the stream has been closed. Unconnected and broken pipes also
    136      * return 0.
    137      *
    138      * @throws IOException if an I/O error occurs
    139      */
    140     @Override
    141     public synchronized int available() throws IOException {
    142         if (buffer == null || in == -1) {
    143             return 0;
    144         }
    145         return in <= out ? buffer.length - out + in : in - out;
    146     }
    147 
    148     /**
    149      * Closes this stream. This implementation releases the buffer used for the
    150      * pipe and notifies all threads waiting to read or write.
    151      *
    152      * @throws IOException
    153      *             if an error occurs while closing this stream.
    154      */
    155     @Override
    156     public synchronized void close() throws IOException {
    157         buffer = null;
    158         notifyAll();
    159     }
    160 
    161     /**
    162      * Connects this {@code PipedInputStream} to a {@link PipedOutputStream}.
    163      * Any data written to the output stream becomes readable in this input
    164      * stream.
    165      *
    166      * @param src
    167      *            the source output stream.
    168      * @throws IOException
    169      *             if either stream is already connected.
    170      */
    171     public void connect(PipedOutputStream src) throws IOException {
    172         src.connect(this);
    173     }
    174 
    175     /**
    176      * Establishes the connection to the PipedOutputStream.
    177      *
    178      * @throws IOException
    179      *             If this Reader is already connected.
    180      */
    181     synchronized void establishConnection() throws IOException {
    182         if (isConnected) {
    183             throw new IOException("Pipe already connected");
    184         }
    185         if (buffer == null) { // We may already have allocated the buffer.
    186             buffer = new byte[PipedInputStream.PIPE_SIZE];
    187         }
    188         isConnected = true;
    189     }
    190 
    191     /**
    192      * Reads a single byte from this stream and returns it as an integer in the
    193      * range from 0 to 255. Returns -1 if the end of this stream has been
    194      * reached. If there is no data in the pipe, this method blocks until data
    195      * is available, the end of the stream is detected or an exception is
    196      * thrown.
    197      * <p>
    198      * Separate threads should be used to read from a {@code PipedInputStream}
    199      * and to write to the connected {@link PipedOutputStream}. If the same
    200      * thread is used, a deadlock may occur.
    201      *
    202      * @return the byte read or -1 if the end of the source stream has been
    203      *         reached.
    204      * @throws IOException
    205      *             if this stream is closed or not connected to an output
    206      *             stream, or if the thread writing to the connected output
    207      *             stream is no longer alive.
    208      */
    209     @Override
    210     public synchronized int read() throws IOException {
    211         if (!isConnected) {
    212             throw new IOException("Not connected");
    213         }
    214         if (buffer == null) {
    215             throw new IOException("InputStream is closed");
    216         }
    217 
    218         /**
    219          * Set the last thread to be reading on this PipedInputStream. If
    220          * lastReader dies while someone is waiting to write an IOException of
    221          * "Pipe broken" will be thrown in receive()
    222          */
    223         lastReader = Thread.currentThread();
    224         try {
    225             int attempts = 3;
    226             while (in == -1) {
    227                 // Are we at end of stream?
    228                 if (isClosed) {
    229                     return -1;
    230                 }
    231                 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
    232                     throw new IOException("Pipe broken");
    233                 }
    234                 // Notify callers of receive()
    235                 notifyAll();
    236                 wait(1000);
    237             }
    238         } catch (InterruptedException e) {
    239             IoUtils.throwInterruptedIoException();
    240         }
    241 
    242         int result = buffer[out++] & 0xff;
    243         if (out == buffer.length) {
    244             out = 0;
    245         }
    246         if (out == in) {
    247             // empty buffer
    248             in = -1;
    249             out = 0;
    250         }
    251 
    252         // let blocked writers write to the newly available buffer space
    253         notifyAll();
    254 
    255         return result;
    256     }
    257 
    258     /**
    259      * Reads at most {@code byteCount} bytes from this stream and stores them in the
    260      * byte array {@code bytes} starting at {@code offset}. Blocks until at
    261      * least one byte has been read, the end of the stream is detected or an
    262      * exception is thrown.
    263      * <p>
    264      * Separate threads should be used to read from a {@code PipedInputStream}
    265      * and to write to the connected {@link PipedOutputStream}. If the same
    266      * thread is used, a deadlock may occur.
    267      *
    268      * @return the number of bytes actually read or -1 if the end of the stream
    269      *         has been reached.
    270      * @throws IndexOutOfBoundsException
    271      *             if {@code offset < 0} or {@code byteCount < 0}, or if {@code
    272      *             offset + byteCount} is greater than the size of {@code bytes}.
    273      * @throws InterruptedIOException
    274      *             if the thread reading from this stream is interrupted.
    275      * @throws IOException
    276      *             if this stream is closed or not connected to an output
    277      *             stream, or if the thread writing to the connected output
    278      *             stream is no longer alive.
    279      * @throws NullPointerException
    280      *             if {@code bytes} is {@code null}.
    281      */
    282     @Override
    283     public synchronized int read(byte[] bytes, int offset, int byteCount) throws IOException {
    284         Arrays.checkOffsetAndCount(bytes.length, offset, byteCount);
    285         if (byteCount == 0) {
    286             return 0;
    287         }
    288 
    289         if (!isConnected) {
    290             throw new IOException("Not connected");
    291         }
    292 
    293         if (buffer == null) {
    294             throw new IOException("InputStream is closed");
    295         }
    296 
    297         /*
    298          * Set the last thread to be reading on this PipedInputStream. If
    299          * lastReader dies while someone is waiting to write an IOException of
    300          * "Pipe broken" will be thrown in receive()
    301          */
    302         lastReader = Thread.currentThread();
    303         try {
    304             int attempts = 3;
    305             while (in == -1) {
    306                 // Are we at end of stream?
    307                 if (isClosed) {
    308                     return -1;
    309                 }
    310                 if ((attempts-- <= 0) && lastWriter != null && !lastWriter.isAlive()) {
    311                     throw new IOException("Pipe broken");
    312                 }
    313                 // Notify callers of receive()
    314                 notifyAll();
    315                 wait(1000);
    316             }
    317         } catch (InterruptedException e) {
    318             IoUtils.throwInterruptedIoException();
    319         }
    320 
    321         int totalCopied = 0;
    322 
    323         // copy bytes from out thru the end of buffer
    324         if (out >= in) {
    325             int leftInBuffer = buffer.length - out;
    326             int length = leftInBuffer < byteCount ? leftInBuffer : byteCount;
    327             System.arraycopy(buffer, out, bytes, offset, length);
    328             out += length;
    329             if (out == buffer.length) {
    330                 out = 0;
    331             }
    332             if (out == in) {
    333                 // empty buffer
    334                 in = -1;
    335                 out = 0;
    336             }
    337             totalCopied += length;
    338         }
    339 
    340         // copy bytes from out thru in
    341         if (totalCopied < byteCount && in != -1) {
    342             int leftInBuffer = in - out;
    343             int leftToCopy = byteCount - totalCopied;
    344             int length = leftToCopy < leftInBuffer ? leftToCopy : leftInBuffer;
    345             System.arraycopy(buffer, out, bytes, offset + totalCopied, length);
    346             out += length;
    347             if (out == in) {
    348                 // empty buffer
    349                 in = -1;
    350                 out = 0;
    351             }
    352             totalCopied += length;
    353         }
    354 
    355         // let blocked writers write to the newly available buffer space
    356         notifyAll();
    357 
    358         return totalCopied;
    359     }
    360 
    361     /**
    362      * Receives a byte and stores it in this stream's {@code buffer}. This
    363      * method is called by {@link PipedOutputStream#write(int)}. The least
    364      * significant byte of the integer {@code oneByte} is stored at index
    365      * {@code in} in the {@code buffer}.
    366      * <p>
    367      * This method blocks as long as {@code buffer} is full.
    368      *
    369      * @param oneByte
    370      *            the byte to store in this pipe.
    371      * @throws InterruptedIOException
    372      *             if the {@code buffer} is full and the thread that has called
    373      *             this method is interrupted.
    374      * @throws IOException
    375      *             if this stream is closed or the thread that has last read
    376      *             from this stream is no longer alive.
    377      */
    378     protected synchronized void receive(int oneByte) throws IOException {
    379         if (buffer == null || isClosed) {
    380             throw new IOException("Pipe is closed");
    381         }
    382 
    383         /*
    384          * Set the last thread to be writing on this PipedInputStream. If
    385          * lastWriter dies while someone is waiting to read an IOException of
    386          * "Pipe broken" will be thrown in read()
    387          */
    388         lastWriter = Thread.currentThread();
    389         try {
    390             while (buffer != null && out == in) {
    391                 if (lastReader != null && !lastReader.isAlive()) {
    392                     throw new IOException("Pipe broken");
    393                 }
    394                 notifyAll();
    395                 wait(1000);
    396             }
    397         } catch (InterruptedException e) {
    398             IoUtils.throwInterruptedIoException();
    399         }
    400         if (buffer == null) {
    401             throw new IOException("Pipe is closed");
    402         }
    403         if (in == -1) {
    404             in = 0;
    405         }
    406         buffer[in++] = (byte) oneByte;
    407         if (in == buffer.length) {
    408             in = 0;
    409         }
    410 
    411         // let blocked readers read the newly available data
    412         notifyAll();
    413     }
    414 
    415     synchronized void done() {
    416         isClosed = true;
    417         notifyAll();
    418     }
    419 }
    420