1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package java.io; 19 20 import java.util.Arrays; 21 import libcore.io.IoUtils; 22 23 /** 24 * Receives information on a communications pipe. When two threads want to pass 25 * data back and forth, one creates a piped writer and the other creates a piped 26 * reader. 27 * 28 * @see PipedWriter 29 */ 30 public class PipedReader extends Reader { 31 32 private Thread lastReader; 33 34 private Thread lastWriter; 35 36 private boolean isClosed; 37 38 /** 39 * The circular buffer through which data is passed. Data is read from the 40 * range {@code [out, in)} and written to the range {@code [in, out)}. 41 * Data in the buffer is either sequential: <pre> 42 * { - - - X X X X X X X - - - - - } 43 * ^ ^ 44 * | | 45 * out in</pre> 46 * ...or wrapped around the buffer's end: <pre> 47 * { X X X X - - - - - - - - X X X } 48 * ^ ^ 49 * | | 50 * in out</pre> 51 * When the buffer is empty, {@code in == -1}. Reading when the buffer is 52 * empty will block until data is available. When the buffer is full, 53 * {@code in == out}. Writing when the buffer is full will block until free 54 * space is available. 55 */ 56 private char[] buffer; 57 58 /** 59 * The index in {@code buffer} where the next character will be written. 60 */ 61 private int in = -1; 62 63 /** 64 * The index in {@code buffer} where the next character will be read. 65 */ 66 private int out; 67 68 /** 69 * The size of the default pipe in characters 70 */ 71 private static final int PIPE_SIZE = 1024; 72 73 /** 74 * Indicates if this pipe is connected 75 */ 76 boolean isConnected; 77 78 /** 79 * Constructs a new unconnected {@code PipedReader}. The resulting reader 80 * must be connected to a {@code PipedWriter} before data may be read from 81 * it. 82 */ 83 public PipedReader() {} 84 85 /** 86 * Constructs a new {@code PipedReader} connected to the {@link PipedWriter} 87 * {@code out}. Any data written to the writer can be read from the this 88 * reader. 89 * 90 * @param out 91 * the {@code PipedWriter} to connect to. 92 * @throws IOException 93 * if {@code out} is already connected. 94 */ 95 public PipedReader(PipedWriter out) throws IOException { 96 connect(out); 97 } 98 99 /** 100 * Constructs a new unconnected {@code PipedReader} with the given buffer size. 101 * The resulting reader must be connected to a {@code PipedWriter} before 102 * data may be read from it. 103 * 104 * @param pipeSize the size of the buffer in chars. 105 * @throws IllegalArgumentException if pipeSize is less than or equal to zero. 106 * @since 1.6 107 */ 108 public PipedReader(int pipeSize) { 109 if (pipeSize <= 0) { 110 throw new IllegalArgumentException("pipe size " + pipeSize + " too small"); 111 } 112 buffer = new char[pipeSize]; 113 } 114 115 /** 116 * Constructs a new {@code PipedReader} connected to the given {@code PipedWriter}, 117 * with the given buffer size. Any data written to the writer can be read from 118 * this reader. 119 * 120 * @param out the {@code PipedWriter} to connect to. 121 * @param pipeSize the size of the buffer in chars. 122 * @throws IOException if an I/O error occurs 123 * @throws IllegalArgumentException if pipeSize is less than or equal to zero. 124 * @since 1.6 125 */ 126 public PipedReader(PipedWriter out, int pipeSize) throws IOException { 127 this(pipeSize); 128 connect(out); 129 } 130 131 /** 132 * Closes this reader. This implementation releases the buffer used for 133 * the pipe and notifies all threads waiting to read or write. 134 * 135 * @throws IOException 136 * if an error occurs while closing this reader. 137 */ 138 @Override 139 public synchronized void close() throws IOException { 140 buffer = null; 141 isClosed = true; 142 notifyAll(); 143 } 144 145 /** 146 * Connects this {@code PipedReader} to a {@link PipedWriter}. Any data 147 * written to the writer becomes readable in this reader. 148 * 149 * @param src 150 * the writer to connect to. 151 * @throws IOException 152 * if this reader is closed or already connected, or if {@code 153 * src} is already connected. 154 */ 155 public void connect(PipedWriter src) throws IOException { 156 src.connect(this); 157 } 158 159 /** 160 * Establishes the connection to the PipedWriter. 161 * 162 * @throws IOException 163 * If this Reader is already connected. 164 */ 165 synchronized void establishConnection() throws IOException { 166 if (isConnected) { 167 throw new IOException("Pipe already connected"); 168 } 169 if (isClosed) { 170 throw new IOException("Pipe is closed"); 171 } 172 if (buffer == null) { // We may already have allocated the buffer. 173 buffer = new char[PIPE_SIZE]; 174 } 175 isConnected = true; 176 } 177 178 /** 179 * Reads a single character from this reader and returns it as an integer 180 * with the two higher-order bytes set to 0. Returns -1 if the end of the 181 * reader has been reached. If there is no data in the pipe, this method 182 * blocks until data is available, the end of the reader is detected or an 183 * exception is thrown. 184 * <p> 185 * Separate threads should be used to read from a {@code PipedReader} and to 186 * write to the connected {@link PipedWriter}. If the same thread is used, 187 * a deadlock may occur. 188 * 189 * @return the character read or -1 if the end of the reader has been 190 * reached. 191 * @throws IOException 192 * if this reader is closed or some other I/O error occurs. 193 */ 194 @Override 195 public int read() throws IOException { 196 char[] chars = new char[1]; 197 int result = read(chars, 0, 1); 198 return result != -1 ? chars[0] : result; 199 } 200 201 /** 202 * Reads up to {@code count} characters from this reader and stores them 203 * in the character array {@code buffer} starting at {@code offset}. If 204 * there is no data in the pipe, this method blocks until at least one byte 205 * has been read, the end of the reader is detected or an exception is 206 * thrown. 207 * 208 * <p>Separate threads should be used to read from a {@code PipedReader} and to 209 * write to the connected {@link PipedWriter}. If the same thread is used, a 210 * deadlock may occur. 211 * 212 * <p>Returns the number of characters read or -1 if the end of the reader has 213 * been reached. 214 * 215 * @throws IndexOutOfBoundsException 216 * if {@code offset < 0 || count < 0 || offset + count > buffer.length}. 217 * @throws InterruptedIOException 218 * if the thread reading from this reader is interrupted. 219 * @throws IOException 220 * if this reader is closed or not connected to a writer, or if 221 * the thread writing to the connected writer is no longer 222 * alive. 223 */ 224 @Override public synchronized int read(char[] buffer, int offset, int count) throws IOException { 225 if (!isConnected) { 226 throw new IOException("Pipe not connected"); 227 } 228 if (this.buffer == null) { 229 throw new IOException("Pipe is closed"); 230 } 231 Arrays.checkOffsetAndCount(buffer.length, offset, count); 232 if (count == 0) { 233 return 0; 234 } 235 /** 236 * Set the last thread to be reading on this PipedReader. If 237 * lastReader dies while someone is waiting to write an IOException 238 * of "Pipe broken" will be thrown in receive() 239 */ 240 lastReader = Thread.currentThread(); 241 try { 242 boolean first = true; 243 while (in == -1) { 244 // Are we at end of stream? 245 if (isClosed) { 246 return -1; 247 } 248 if (!first && lastWriter != null && !lastWriter.isAlive()) { 249 throw new IOException("Pipe broken"); 250 } 251 first = false; 252 // Notify callers of receive() 253 notifyAll(); 254 wait(1000); 255 } 256 } catch (InterruptedException e) { 257 IoUtils.throwInterruptedIoException(); 258 } 259 260 int copyLength = 0; 261 /* Copy chars from out to end of buffer first */ 262 if (out >= in) { 263 copyLength = count > this.buffer.length - out ? this.buffer.length - out : count; 264 System.arraycopy(this.buffer, out, buffer, offset, copyLength); 265 out += copyLength; 266 if (out == this.buffer.length) { 267 out = 0; 268 } 269 if (out == in) { 270 // empty buffer 271 in = -1; 272 out = 0; 273 } 274 } 275 276 /* 277 * Did the read fully succeed in the previous copy or is the buffer 278 * empty? 279 */ 280 if (copyLength == count || in == -1) { 281 return copyLength; 282 } 283 284 int charsCopied = copyLength; 285 /* Copy bytes from 0 to the number of available bytes */ 286 copyLength = in - out > count - copyLength ? count - copyLength : in - out; 287 System.arraycopy(this.buffer, out, buffer, offset + charsCopied, copyLength); 288 out += copyLength; 289 if (out == in) { 290 // empty buffer 291 in = -1; 292 out = 0; 293 } 294 return charsCopied + copyLength; 295 } 296 297 /** 298 * Indicates whether this reader is ready to be read without blocking. 299 * Returns {@code true} if this reader will not block when {@code read} is 300 * called, {@code false} if unknown or blocking will occur. This 301 * implementation returns {@code true} if the internal buffer contains 302 * characters that can be read. 303 * 304 * @return always {@code false}. 305 * @throws IOException 306 * if this reader is closed or not connected, or if some other 307 * I/O error occurs. 308 * @see #read() 309 * @see #read(char[], int, int) 310 */ 311 @Override 312 public synchronized boolean ready() throws IOException { 313 if (!isConnected) { 314 throw new IOException("Pipe not connected"); 315 } 316 if (buffer == null) { 317 throw new IOException("Pipe is closed"); 318 } 319 return in != -1; 320 } 321 322 /** 323 * Receives a char and stores it into the PipedReader. This called by 324 * PipedWriter.write() when writes occur. 325 * <P> 326 * If the buffer is full and the thread sending #receive is interrupted, the 327 * InterruptedIOException will be thrown. 328 * 329 * @param oneChar 330 * the char to store into the pipe. 331 * 332 * @throws IOException 333 * If the stream is already closed or another IOException 334 * occurs. 335 */ 336 synchronized void receive(char oneChar) throws IOException { 337 if (buffer == null) { 338 throw new IOException("Pipe is closed"); 339 } 340 if (lastReader != null && !lastReader.isAlive()) { 341 throw new IOException("Pipe broken"); 342 } 343 /* 344 * Set the last thread to be writing on this PipedWriter. If 345 * lastWriter dies while someone is waiting to read an IOException 346 * of "Pipe broken" will be thrown in read() 347 */ 348 lastWriter = Thread.currentThread(); 349 try { 350 while (buffer != null && out == in) { 351 notifyAll(); 352 wait(1000); 353 if (lastReader != null && !lastReader.isAlive()) { 354 throw new IOException("Pipe broken"); 355 } 356 } 357 } catch (InterruptedException e) { 358 IoUtils.throwInterruptedIoException(); 359 } 360 if (buffer == null) { 361 throw new IOException("Pipe is closed"); 362 } 363 if (in == -1) { 364 in = 0; 365 } 366 buffer[in++] = oneChar; 367 if (in == buffer.length) { 368 in = 0; 369 } 370 } 371 372 /** 373 * Receives a char array and stores it into the PipedReader. This called by 374 * PipedWriter.write() when writes occur. 375 * <P> 376 * If the buffer is full and the thread sending #receive is interrupted, the 377 * InterruptedIOException will be thrown. 378 * 379 * @throws IOException 380 * If the stream is already closed or another IOException 381 * occurs. 382 */ 383 synchronized void receive(char[] chars, int offset, int count) throws IOException { 384 Arrays.checkOffsetAndCount(chars.length, offset, count); 385 if (buffer == null) { 386 throw new IOException("Pipe is closed"); 387 } 388 if (lastReader != null && !lastReader.isAlive()) { 389 throw new IOException("Pipe broken"); 390 } 391 /** 392 * Set the last thread to be writing on this PipedWriter. If 393 * lastWriter dies while someone is waiting to read an IOException 394 * of "Pipe broken" will be thrown in read() 395 */ 396 lastWriter = Thread.currentThread(); 397 while (count > 0) { 398 try { 399 while (buffer != null && out == in) { 400 notifyAll(); 401 wait(1000); 402 if (lastReader != null && !lastReader.isAlive()) { 403 throw new IOException("Pipe broken"); 404 } 405 } 406 } catch (InterruptedException e) { 407 IoUtils.throwInterruptedIoException(); 408 } 409 if (buffer == null) { 410 throw new IOException("Pipe is closed"); 411 } 412 if (in == -1) { 413 in = 0; 414 } 415 if (in >= out) { 416 int length = buffer.length - in; 417 if (count < length) { 418 length = count; 419 } 420 System.arraycopy(chars, offset, buffer, in, length); 421 offset += length; 422 count -= length; 423 in += length; 424 if (in == buffer.length) { 425 in = 0; 426 } 427 } 428 if (count > 0 && in != out) { 429 int length = out - in; 430 if (count < length) { 431 length = count; 432 } 433 System.arraycopy(chars, offset, buffer, in, length); 434 offset += length; 435 count -= length; 436 in += length; 437 } 438 } 439 } 440 441 synchronized void done() { 442 isClosed = true; 443 notifyAll(); 444 } 445 } 446