Home | History | Annotate | Download | only in io
      1 /*
      2  *  Licensed to the Apache Software Foundation (ASF) under one or more
      3  *  contributor license agreements.  See the NOTICE file distributed with
      4  *  this work for additional information regarding copyright ownership.
      5  *  The ASF licenses this file to You under the Apache License, Version 2.0
      6  *  (the "License"); you may not use this file except in compliance with
      7  *  the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  */
     17 
     18 package java.io;
     19 
     20 import java.util.Arrays;
     21 import libcore.io.IoUtils;
     22 
     23 /**
     24  * Receives information on a communications pipe. When two threads want to pass
     25  * data back and forth, one creates a piped writer and the other creates a piped
     26  * reader.
     27  *
     28  * @see PipedWriter
     29  */
     30 public class PipedReader extends Reader {
     31 
     32     private Thread lastReader;
     33 
     34     private Thread lastWriter;
     35 
     36     private boolean isClosed;
     37 
     38     /**
     39      * The circular buffer through which data is passed. Data is read from the
     40      * range {@code [out, in)} and written to the range {@code [in, out)}.
     41      * Data in the buffer is either sequential: <pre>
     42      *     { - - - X X X X X X X - - - - - }
     43      *             ^             ^
     44      *             |             |
     45      *            out           in</pre>
     46      * ...or wrapped around the buffer's end: <pre>
     47      *     { X X X X - - - - - - - - X X X }
     48      *               ^               ^
     49      *               |               |
     50      *              in              out</pre>
     51      * When the buffer is empty, {@code in == -1}. Reading when the buffer is
     52      * empty will block until data is available. When the buffer is full,
     53      * {@code in == out}. Writing when the buffer is full will block until free
     54      * space is available.
     55      */
     56     private char[] buffer;
     57 
     58     /**
     59      * The index in {@code buffer} where the next character will be written.
     60      */
     61     private int in = -1;
     62 
     63     /**
     64      * The index in {@code buffer} where the next character will be read.
     65      */
     66     private int out;
     67 
     68     /**
     69      * The size of the default pipe in characters
     70      */
     71     private static final int PIPE_SIZE = 1024;
     72 
     73     /**
     74      * Indicates if this pipe is connected
     75      */
     76     boolean isConnected;
     77 
     78     /**
     79      * Constructs a new unconnected {@code PipedReader}. The resulting reader
     80      * must be connected to a {@code PipedWriter} before data may be read from
     81      * it.
     82      */
     83     public PipedReader() {}
     84 
     85     /**
     86      * Constructs a new {@code PipedReader} connected to the {@link PipedWriter}
     87      * {@code out}. Any data written to the writer can be read from the this
     88      * reader.
     89      *
     90      * @param out
     91      *            the {@code PipedWriter} to connect to.
     92      * @throws IOException
     93      *             if {@code out} is already connected.
     94      */
     95     public PipedReader(PipedWriter out) throws IOException {
     96         connect(out);
     97     }
     98 
     99     /**
    100      * Constructs a new unconnected {@code PipedReader} with the given buffer size.
    101      * The resulting reader must be connected to a {@code PipedWriter} before
    102      * data may be read from it.
    103      *
    104      * @param pipeSize the size of the buffer in chars.
    105      * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
    106      * @since 1.6
    107      */
    108     public PipedReader(int pipeSize) {
    109         if (pipeSize <= 0) {
    110             throw new IllegalArgumentException("pipe size " + pipeSize + " too small");
    111         }
    112         buffer = new char[pipeSize];
    113     }
    114 
    115     /**
    116      * Constructs a new {@code PipedReader} connected to the given {@code PipedWriter},
    117      * with the given buffer size. Any data written to the writer can be read from
    118      * this reader.
    119      *
    120      * @param out the {@code PipedWriter} to connect to.
    121      * @param pipeSize the size of the buffer in chars.
    122      * @throws IOException if an I/O error occurs
    123      * @throws IllegalArgumentException if pipeSize is less than or equal to zero.
    124      * @since 1.6
    125      */
    126     public PipedReader(PipedWriter out, int pipeSize) throws IOException {
    127         this(pipeSize);
    128         connect(out);
    129     }
    130 
    131     /**
    132      * Closes this reader. This implementation releases the buffer used for
    133      * the pipe and notifies all threads waiting to read or write.
    134      *
    135      * @throws IOException
    136      *             if an error occurs while closing this reader.
    137      */
    138     @Override
    139     public synchronized void close() throws IOException {
    140         buffer = null;
    141         isClosed = true;
    142         notifyAll();
    143     }
    144 
    145     /**
    146      * Connects this {@code PipedReader} to a {@link PipedWriter}. Any data
    147      * written to the writer becomes readable in this reader.
    148      *
    149      * @param src
    150      *            the writer to connect to.
    151      * @throws IOException
    152      *             if this reader is closed or already connected, or if {@code
    153      *             src} is already connected.
    154      */
    155     public void connect(PipedWriter src) throws IOException {
    156         src.connect(this);
    157     }
    158 
    159     /**
    160      * Establishes the connection to the PipedWriter.
    161      *
    162      * @throws IOException
    163      *             If this Reader is already connected.
    164      */
    165     synchronized void establishConnection() throws IOException {
    166         if (isConnected) {
    167             throw new IOException("Pipe already connected");
    168         }
    169         if (isClosed) {
    170             throw new IOException("Pipe is closed");
    171         }
    172         if (buffer == null) { // We may already have allocated the buffer.
    173             buffer = new char[PIPE_SIZE];
    174         }
    175         isConnected = true;
    176     }
    177 
    178     /**
    179      * Reads a single character from this reader and returns it as an integer
    180      * with the two higher-order bytes set to 0. Returns -1 if the end of the
    181      * reader has been reached. If there is no data in the pipe, this method
    182      * blocks until data is available, the end of the reader is detected or an
    183      * exception is thrown.
    184      * <p>
    185      * Separate threads should be used to read from a {@code PipedReader} and to
    186      * write to the connected {@link PipedWriter}. If the same thread is used,
    187      * a deadlock may occur.
    188      *
    189      * @return the character read or -1 if the end of the reader has been
    190      *         reached.
    191      * @throws IOException
    192      *             if this reader is closed or some other I/O error occurs.
    193      */
    194     @Override
    195     public int read() throws IOException {
    196         char[] chars = new char[1];
    197         int result = read(chars, 0, 1);
    198         return result != -1 ? chars[0] : result;
    199     }
    200 
    201     /**
    202      * Reads up to {@code count} characters from this reader and stores them
    203      * in the character array {@code buffer} starting at {@code offset}. If
    204      * there is no data in the pipe, this method blocks until at least one byte
    205      * has been read, the end of the reader is detected or an exception is
    206      * thrown.
    207      *
    208      * <p>Separate threads should be used to read from a {@code PipedReader} and to
    209      * write to the connected {@link PipedWriter}. If the same thread is used, a
    210      * deadlock may occur.
    211      *
    212      * <p>Returns the number of characters read or -1 if the end of the reader has
    213      * been reached.
    214      *
    215      * @throws IndexOutOfBoundsException
    216      *     if {@code offset < 0 || count < 0 || offset + count > buffer.length}.
    217      * @throws InterruptedIOException
    218      *             if the thread reading from this reader is interrupted.
    219      * @throws IOException
    220      *             if this reader is closed or not connected to a writer, or if
    221      *             the thread writing to the connected writer is no longer
    222      *             alive.
    223      */
    224     @Override public synchronized int read(char[] buffer, int offset, int count) throws IOException {
    225         if (!isConnected) {
    226             throw new IOException("Pipe not connected");
    227         }
    228         if (this.buffer == null) {
    229             throw new IOException("Pipe is closed");
    230         }
    231         Arrays.checkOffsetAndCount(buffer.length, offset, count);
    232         if (count == 0) {
    233             return 0;
    234         }
    235         /**
    236          * Set the last thread to be reading on this PipedReader. If
    237          * lastReader dies while someone is waiting to write an IOException
    238          * of "Pipe broken" will be thrown in receive()
    239          */
    240         lastReader = Thread.currentThread();
    241         try {
    242             boolean first = true;
    243             while (in == -1) {
    244                 // Are we at end of stream?
    245                 if (isClosed) {
    246                     return -1;
    247                 }
    248                 if (!first && lastWriter != null && !lastWriter.isAlive()) {
    249                     throw new IOException("Pipe broken");
    250                 }
    251                 first = false;
    252                 // Notify callers of receive()
    253                 notifyAll();
    254                 wait(1000);
    255             }
    256         } catch (InterruptedException e) {
    257             IoUtils.throwInterruptedIoException();
    258         }
    259 
    260         int copyLength = 0;
    261         /* Copy chars from out to end of buffer first */
    262         if (out >= in) {
    263             copyLength = count > this.buffer.length - out ? this.buffer.length - out : count;
    264             System.arraycopy(this.buffer, out, buffer, offset, copyLength);
    265             out += copyLength;
    266             if (out == this.buffer.length) {
    267                 out = 0;
    268             }
    269             if (out == in) {
    270                 // empty buffer
    271                 in = -1;
    272                 out = 0;
    273             }
    274         }
    275 
    276         /*
    277          * Did the read fully succeed in the previous copy or is the buffer
    278          * empty?
    279          */
    280         if (copyLength == count || in == -1) {
    281             return copyLength;
    282         }
    283 
    284         int charsCopied = copyLength;
    285         /* Copy bytes from 0 to the number of available bytes */
    286         copyLength = in - out > count - copyLength ? count - copyLength : in - out;
    287         System.arraycopy(this.buffer, out, buffer, offset + charsCopied, copyLength);
    288         out += copyLength;
    289         if (out == in) {
    290             // empty buffer
    291             in = -1;
    292             out = 0;
    293         }
    294         return charsCopied + copyLength;
    295     }
    296 
    297     /**
    298      * Indicates whether this reader is ready to be read without blocking.
    299      * Returns {@code true} if this reader will not block when {@code read} is
    300      * called, {@code false} if unknown or blocking will occur. This
    301      * implementation returns {@code true} if the internal buffer contains
    302      * characters that can be read.
    303      *
    304      * @return always {@code false}.
    305      * @throws IOException
    306      *             if this reader is closed or not connected, or if some other
    307      *             I/O error occurs.
    308      * @see #read()
    309      * @see #read(char[], int, int)
    310      */
    311     @Override
    312     public synchronized boolean ready() throws IOException {
    313         if (!isConnected) {
    314             throw new IOException("Pipe not connected");
    315         }
    316         if (buffer == null) {
    317             throw new IOException("Pipe is closed");
    318         }
    319         return in != -1;
    320     }
    321 
    322     /**
    323      * Receives a char and stores it into the PipedReader. This called by
    324      * PipedWriter.write() when writes occur.
    325      * <P>
    326      * If the buffer is full and the thread sending #receive is interrupted, the
    327      * InterruptedIOException will be thrown.
    328      *
    329      * @param oneChar
    330      *            the char to store into the pipe.
    331      *
    332      * @throws IOException
    333      *             If the stream is already closed or another IOException
    334      *             occurs.
    335      */
    336     synchronized void receive(char oneChar) throws IOException {
    337         if (buffer == null) {
    338             throw new IOException("Pipe is closed");
    339         }
    340         if (lastReader != null && !lastReader.isAlive()) {
    341             throw new IOException("Pipe broken");
    342         }
    343         /*
    344         * Set the last thread to be writing on this PipedWriter. If
    345         * lastWriter dies while someone is waiting to read an IOException
    346         * of "Pipe broken" will be thrown in read()
    347         */
    348         lastWriter = Thread.currentThread();
    349         try {
    350             while (buffer != null && out == in) {
    351                 notifyAll();
    352                 wait(1000);
    353                 if (lastReader != null && !lastReader.isAlive()) {
    354                     throw new IOException("Pipe broken");
    355                 }
    356             }
    357         } catch (InterruptedException e) {
    358             IoUtils.throwInterruptedIoException();
    359         }
    360         if (buffer == null) {
    361             throw new IOException("Pipe is closed");
    362         }
    363         if (in == -1) {
    364             in = 0;
    365         }
    366         buffer[in++] = oneChar;
    367         if (in == buffer.length) {
    368             in = 0;
    369         }
    370     }
    371 
    372     /**
    373      * Receives a char array and stores it into the PipedReader. This called by
    374      * PipedWriter.write() when writes occur.
    375      * <P>
    376      * If the buffer is full and the thread sending #receive is interrupted, the
    377      * InterruptedIOException will be thrown.
    378      *
    379      * @throws IOException
    380      *             If the stream is already closed or another IOException
    381      *             occurs.
    382      */
    383     synchronized void receive(char[] chars, int offset, int count) throws IOException {
    384         Arrays.checkOffsetAndCount(chars.length, offset, count);
    385         if (buffer == null) {
    386             throw new IOException("Pipe is closed");
    387         }
    388         if (lastReader != null && !lastReader.isAlive()) {
    389             throw new IOException("Pipe broken");
    390         }
    391         /**
    392          * Set the last thread to be writing on this PipedWriter. If
    393          * lastWriter dies while someone is waiting to read an IOException
    394          * of "Pipe broken" will be thrown in read()
    395          */
    396         lastWriter = Thread.currentThread();
    397         while (count > 0) {
    398             try {
    399                 while (buffer != null && out == in) {
    400                     notifyAll();
    401                     wait(1000);
    402                     if (lastReader != null && !lastReader.isAlive()) {
    403                         throw new IOException("Pipe broken");
    404                     }
    405                 }
    406             } catch (InterruptedException e) {
    407                 IoUtils.throwInterruptedIoException();
    408             }
    409             if (buffer == null) {
    410                 throw new IOException("Pipe is closed");
    411             }
    412             if (in == -1) {
    413                 in = 0;
    414             }
    415             if (in >= out) {
    416                 int length = buffer.length - in;
    417                 if (count < length) {
    418                     length = count;
    419                 }
    420                 System.arraycopy(chars, offset, buffer, in, length);
    421                 offset += length;
    422                 count -= length;
    423                 in += length;
    424                 if (in == buffer.length) {
    425                     in = 0;
    426                 }
    427             }
    428             if (count > 0 && in != out) {
    429                 int length = out - in;
    430                 if (count < length) {
    431                     length = count;
    432                 }
    433                 System.arraycopy(chars, offset, buffer, in, length);
    434                 offset += length;
    435                 count -= length;
    436                 in += length;
    437             }
    438         }
    439     }
    440 
    441     synchronized void done() {
    442         isClosed = true;
    443         notifyAll();
    444     }
    445 }
    446