Home | History | Annotate | Download | only in channels
      1 /*
      2  * Copyright (C) 2014 The Android Open Source Project
      3  * Copyright (c) 2000, 2012, Oracle and/or its affiliates. All rights reserved.
      4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      5  *
      6  * This code is free software; you can redistribute it and/or modify it
      7  * under the terms of the GNU General Public License version 2 only, as
      8  * published by the Free Software Foundation.  Oracle designates this
      9  * particular file as subject to the "Classpath" exception as provided
     10  * by Oracle in the LICENSE file that accompanied this code.
     11  *
     12  * This code is distributed in the hope that it will be useful, but WITHOUT
     13  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     14  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     15  * version 2 for more details (a copy is included in the LICENSE file that
     16  * accompanied this code).
     17  *
     18  * You should have received a copy of the GNU General Public License version
     19  * 2 along with this work; if not, write to the Free Software Foundation,
     20  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     21  *
     22  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     23  * or visit www.oracle.com if you need additional information or have any
     24  * questions.
     25  */
     26 
     27 package java.nio.channels;
     28 
     29 import java.io.FileInputStream;
     30 import java.io.FileOutputStream;
     31 import java.io.InputStream;
     32 import java.io.OutputStream;
     33 import java.io.Reader;
     34 import java.io.Writer;
     35 import java.io.IOException;
     36 import java.nio.ByteBuffer;
     37 import java.nio.charset.Charset;
     38 import java.nio.charset.CharsetDecoder;
     39 import java.nio.charset.CharsetEncoder;
     40 import java.nio.charset.UnsupportedCharsetException;
     41 import java.nio.channels.spi.AbstractInterruptibleChannel;
     42 import java.util.concurrent.ExecutionException;
     43 import sun.nio.ch.ChannelInputStream;
     44 import sun.nio.cs.StreamDecoder;
     45 import sun.nio.cs.StreamEncoder;
     46 
     47 
     48 /**
     49  * Utility methods for channels and streams.
     50  *
     51  * <p> This class defines static methods that support the interoperation of the
     52  * stream classes of the <tt>{@link java.io}</tt> package with the channel
     53  * classes of this package.  </p>
     54  *
     55  *
     56  * @author Mark Reinhold
     57  * @author Mike McCloskey
     58  * @author JSR-51 Expert Group
     59  * @since 1.4
     60  */
     61 
     62 public final class Channels {
     63 
     64     private Channels() { }              // No instantiation
     65 
     66     private static void checkNotNull(Object o, String name) {
     67         if (o == null)
     68             throw new NullPointerException("\"" + name + "\" is null!");
     69     }
     70 
     71     /**
     72      * Write all remaining bytes in buffer to the given channel.
     73      * If the channel is selectable then it must be configured blocking.
     74      */
     75     private static void writeFullyImpl(WritableByteChannel ch, ByteBuffer bb)
     76         throws IOException
     77     {
     78         while (bb.remaining() > 0) {
     79             int n = ch.write(bb);
     80             if (n <= 0)
     81                 throw new RuntimeException("no bytes written");
     82         }
     83     }
     84 
     85     /**
     86      * Write all remaining bytes in buffer to the given channel.
     87      *
     88      * @throws  IllegalBlockingModeException
     89      *          If the channel is selectable and configured non-blocking.
     90      */
     91     private static void writeFully(WritableByteChannel ch, ByteBuffer bb)
     92         throws IOException
     93     {
     94         if (ch instanceof SelectableChannel) {
     95             SelectableChannel sc = (SelectableChannel)ch;
     96             synchronized (sc.blockingLock()) {
     97                 if (!sc.isBlocking())
     98                     throw new IllegalBlockingModeException();
     99                 writeFullyImpl(ch, bb);
    100             }
    101         } else {
    102             writeFullyImpl(ch, bb);
    103         }
    104     }
    105 
    106     // -- Byte streams from channels --
    107 
    108     /**
    109      * Constructs a stream that reads bytes from the given channel.
    110      *
    111      * <p> The <tt>read</tt> methods of the resulting stream will throw an
    112      * {@link IllegalBlockingModeException} if invoked while the underlying
    113      * channel is in non-blocking mode.  The stream will not be buffered, and
    114      * it will not support the {@link InputStream#mark mark} or {@link
    115      * InputStream#reset reset} methods.  The stream will be safe for access by
    116      * multiple concurrent threads.  Closing the stream will in turn cause the
    117      * channel to be closed.  </p>
    118      *
    119      * @param  ch
    120      *         The channel from which bytes will be read
    121      *
    122      * @return  A new input stream
    123      */
    124     public static InputStream newInputStream(ReadableByteChannel ch) {
    125         checkNotNull(ch, "ch");
    126         return new sun.nio.ch.ChannelInputStream(ch);
    127     }
    128 
    129     /**
    130      * Constructs a stream that writes bytes to the given channel.
    131      *
    132      * <p> The <tt>write</tt> methods of the resulting stream will throw an
    133      * {@link IllegalBlockingModeException} if invoked while the underlying
    134      * channel is in non-blocking mode.  The stream will not be buffered.  The
    135      * stream will be safe for access by multiple concurrent threads.  Closing
    136      * the stream will in turn cause the channel to be closed.  </p>
    137      *
    138      * @param  ch
    139      *         The channel to which bytes will be written
    140      *
    141      * @return  A new output stream
    142      */
    143     public static OutputStream newOutputStream(final WritableByteChannel ch) {
    144         checkNotNull(ch, "ch");
    145 
    146         return new OutputStream() {
    147 
    148                 private ByteBuffer bb = null;
    149                 private byte[] bs = null;       // Invoker's previous array
    150                 private byte[] b1 = null;
    151 
    152                 public synchronized void write(int b) throws IOException {
    153                    if (b1 == null)
    154                         b1 = new byte[1];
    155                     b1[0] = (byte)b;
    156                     this.write(b1);
    157                 }
    158 
    159                 public synchronized void write(byte[] bs, int off, int len)
    160                     throws IOException
    161                 {
    162                     if ((off < 0) || (off > bs.length) || (len < 0) ||
    163                         ((off + len) > bs.length) || ((off + len) < 0)) {
    164                         throw new IndexOutOfBoundsException();
    165                     } else if (len == 0) {
    166                         return;
    167                     }
    168                     ByteBuffer bb = ((this.bs == bs)
    169                                      ? this.bb
    170                                      : ByteBuffer.wrap(bs));
    171                     bb.limit(Math.min(off + len, bb.capacity()));
    172                     bb.position(off);
    173                     this.bb = bb;
    174                     this.bs = bs;
    175                     Channels.writeFully(ch, bb);
    176                 }
    177 
    178                 public void close() throws IOException {
    179                     ch.close();
    180                 }
    181 
    182             };
    183     }
    184 
    185     /**
    186      * Constructs a stream that reads bytes from the given channel.
    187      *
    188      * <p> The stream will not be buffered, and it will not support the {@link
    189      * InputStream#mark mark} or {@link InputStream#reset reset} methods.  The
    190      * stream will be safe for access by multiple concurrent threads.  Closing
    191      * the stream will in turn cause the channel to be closed.  </p>
    192      *
    193      * @param  ch
    194      *         The channel from which bytes will be read
    195      *
    196      * @return  A new input stream
    197      *
    198      * @since 1.7
    199      */
    200     public static InputStream newInputStream(final AsynchronousByteChannel ch) {
    201         checkNotNull(ch, "ch");
    202         return new InputStream() {
    203 
    204             private ByteBuffer bb = null;
    205             private byte[] bs = null;           // Invoker's previous array
    206             private byte[] b1 = null;
    207 
    208             @Override
    209             public synchronized int read() throws IOException {
    210                 if (b1 == null)
    211                     b1 = new byte[1];
    212                 int n = this.read(b1);
    213                 if (n == 1)
    214                     return b1[0] & 0xff;
    215                 return -1;
    216             }
    217 
    218             @Override
    219             public synchronized int read(byte[] bs, int off, int len)
    220                 throws IOException
    221             {
    222                 if ((off < 0) || (off > bs.length) || (len < 0) ||
    223                     ((off + len) > bs.length) || ((off + len) < 0)) {
    224                     throw new IndexOutOfBoundsException();
    225                 } else if (len == 0)
    226                     return 0;
    227 
    228                 ByteBuffer bb = ((this.bs == bs)
    229                                  ? this.bb
    230                                  : ByteBuffer.wrap(bs));
    231                 bb.position(off);
    232                 bb.limit(Math.min(off + len, bb.capacity()));
    233                 this.bb = bb;
    234                 this.bs = bs;
    235 
    236                 boolean interrupted = false;
    237                 try {
    238                     for (;;) {
    239                         try {
    240                             return ch.read(bb).get();
    241                         } catch (ExecutionException ee) {
    242                             throw new IOException(ee.getCause());
    243                         } catch (InterruptedException ie) {
    244                             interrupted = true;
    245                         }
    246                     }
    247                 } finally {
    248                     if (interrupted)
    249                         Thread.currentThread().interrupt();
    250                 }
    251             }
    252 
    253             @Override
    254             public void close() throws IOException {
    255                 ch.close();
    256             }
    257         };
    258     }
    259 
    260     /**
    261      * Constructs a stream that writes bytes to the given channel.
    262      *
    263      * <p> The stream will not be buffered. The stream will be safe for access
    264      * by multiple concurrent threads.  Closing the stream will in turn cause
    265      * the channel to be closed.  </p>
    266      *
    267      * @param  ch
    268      *         The channel to which bytes will be written
    269      *
    270      * @return  A new output stream
    271      *
    272      * @since 1.7
    273      */
    274     public static OutputStream newOutputStream(final AsynchronousByteChannel ch) {
    275         checkNotNull(ch, "ch");
    276         return new OutputStream() {
    277 
    278             private ByteBuffer bb = null;
    279             private byte[] bs = null;   // Invoker's previous array
    280             private byte[] b1 = null;
    281 
    282             @Override
    283             public synchronized void write(int b) throws IOException {
    284                if (b1 == null)
    285                     b1 = new byte[1];
    286                 b1[0] = (byte)b;
    287                 this.write(b1);
    288             }
    289 
    290             @Override
    291             public synchronized void write(byte[] bs, int off, int len)
    292                 throws IOException
    293             {
    294                 if ((off < 0) || (off > bs.length) || (len < 0) ||
    295                     ((off + len) > bs.length) || ((off + len) < 0)) {
    296                     throw new IndexOutOfBoundsException();
    297                 } else if (len == 0) {
    298                     return;
    299                 }
    300                 ByteBuffer bb = ((this.bs == bs)
    301                                  ? this.bb
    302                                  : ByteBuffer.wrap(bs));
    303                 bb.limit(Math.min(off + len, bb.capacity()));
    304                 bb.position(off);
    305                 this.bb = bb;
    306                 this.bs = bs;
    307 
    308                 boolean interrupted = false;
    309                 try {
    310                     while (bb.remaining() > 0) {
    311                         try {
    312                             ch.write(bb).get();
    313                         } catch (ExecutionException ee) {
    314                             throw new IOException(ee.getCause());
    315                         } catch (InterruptedException ie) {
    316                             interrupted = true;
    317                         }
    318                     }
    319                 } finally {
    320                     if (interrupted)
    321                         Thread.currentThread().interrupt();
    322                 }
    323             }
    324 
    325             @Override
    326             public void close() throws IOException {
    327                 ch.close();
    328             }
    329         };
    330     }
    331 
    332 
    333     // -- Channels from streams --
    334 
    335     /**
    336      * Constructs a channel that reads bytes from the given stream.
    337      *
    338      * <p> The resulting channel will not be buffered; it will simply redirect
    339      * its I/O operations to the given stream.  Closing the channel will in
    340      * turn cause the stream to be closed.  </p>
    341      *
    342      * @param  in
    343      *         The stream from which bytes are to be read
    344      *
    345      * @return  A new readable byte channel
    346      */
    347     public static ReadableByteChannel newChannel(final InputStream in) {
    348         checkNotNull(in, "in");
    349 
    350         if (in instanceof FileInputStream &&
    351             FileInputStream.class.equals(in.getClass())) {
    352             return ((FileInputStream)in).getChannel();
    353         }
    354 
    355         return new ReadableByteChannelImpl(in);
    356     }
    357 
    358     private static class ReadableByteChannelImpl
    359         extends AbstractInterruptibleChannel    // Not really interruptible
    360         implements ReadableByteChannel
    361     {
    362         InputStream in;
    363         private static final int TRANSFER_SIZE = 8192;
    364         private byte buf[] = new byte[0];
    365         private boolean open = true;
    366         private Object readLock = new Object();
    367 
    368         ReadableByteChannelImpl(InputStream in) {
    369             this.in = in;
    370         }
    371 
    372         public int read(ByteBuffer dst) throws IOException {
    373             int len = dst.remaining();
    374             int totalRead = 0;
    375             int bytesRead = 0;
    376             synchronized (readLock) {
    377                 while (totalRead < len) {
    378                     int bytesToRead = Math.min((len - totalRead),
    379                                                TRANSFER_SIZE);
    380                     if (buf.length < bytesToRead)
    381                         buf = new byte[bytesToRead];
    382                     if ((totalRead > 0) && !(in.available() > 0))
    383                         break; // block at most once
    384                     try {
    385                         begin();
    386                         bytesRead = in.read(buf, 0, bytesToRead);
    387                     } finally {
    388                         end(bytesRead > 0);
    389                     }
    390                     if (bytesRead < 0)
    391                         break;
    392                     else
    393                         totalRead += bytesRead;
    394                     dst.put(buf, 0, bytesRead);
    395                 }
    396                 if ((bytesRead < 0) && (totalRead == 0))
    397                     return -1;
    398 
    399                 return totalRead;
    400             }
    401         }
    402 
    403         protected void implCloseChannel() throws IOException {
    404             in.close();
    405             open = false;
    406         }
    407     }
    408 
    409 
    410     /**
    411      * Constructs a channel that writes bytes to the given stream.
    412      *
    413      * <p> The resulting channel will not be buffered; it will simply redirect
    414      * its I/O operations to the given stream.  Closing the channel will in
    415      * turn cause the stream to be closed.  </p>
    416      *
    417      * @param  out
    418      *         The stream to which bytes are to be written
    419      *
    420      * @return  A new writable byte channel
    421      */
    422     public static WritableByteChannel newChannel(final OutputStream out) {
    423         checkNotNull(out, "out");
    424         return new WritableByteChannelImpl(out);
    425     }
    426 
    427     private static class WritableByteChannelImpl
    428         extends AbstractInterruptibleChannel    // Not really interruptible
    429         implements WritableByteChannel
    430     {
    431         OutputStream out;
    432         private static final int TRANSFER_SIZE = 8192;
    433         private byte buf[] = new byte[0];
    434         private boolean open = true;
    435         private Object writeLock = new Object();
    436 
    437         WritableByteChannelImpl(OutputStream out) {
    438             this.out = out;
    439         }
    440 
    441         public int write(ByteBuffer src) throws IOException {
    442             int len = src.remaining();
    443             int totalWritten = 0;
    444             synchronized (writeLock) {
    445                 while (totalWritten < len) {
    446                     int bytesToWrite = Math.min((len - totalWritten),
    447                                                 TRANSFER_SIZE);
    448                     if (buf.length < bytesToWrite)
    449                         buf = new byte[bytesToWrite];
    450                     src.get(buf, 0, bytesToWrite);
    451                     try {
    452                         begin();
    453                         out.write(buf, 0, bytesToWrite);
    454                     } finally {
    455                         end(bytesToWrite > 0);
    456                     }
    457                     totalWritten += bytesToWrite;
    458                 }
    459                 return totalWritten;
    460             }
    461         }
    462 
    463         protected void implCloseChannel() throws IOException {
    464             out.close();
    465             open = false;
    466         }
    467     }
    468 
    469 
    470     // -- Character streams from channels --
    471 
    472     /**
    473      * Constructs a reader that decodes bytes from the given channel using the
    474      * given decoder.
    475      *
    476      * <p> The resulting stream will contain an internal input buffer of at
    477      * least <tt>minBufferCap</tt> bytes.  The stream's <tt>read</tt> methods
    478      * will, as needed, fill the buffer by reading bytes from the underlying
    479      * channel; if the channel is in non-blocking mode when bytes are to be
    480      * read then an {@link IllegalBlockingModeException} will be thrown.  The
    481      * resulting stream will not otherwise be buffered, and it will not support
    482      * the {@link Reader#mark mark} or {@link Reader#reset reset} methods.
    483      * Closing the stream will in turn cause the channel to be closed.  </p>
    484      *
    485      * @param  ch
    486      *         The channel from which bytes will be read
    487      *
    488      * @param  dec
    489      *         The charset decoder to be used
    490      *
    491      * @param  minBufferCap
    492      *         The minimum capacity of the internal byte buffer,
    493      *         or <tt>-1</tt> if an implementation-dependent
    494      *         default capacity is to be used
    495      *
    496      * @return  A new reader
    497      */
    498     public static Reader newReader(ReadableByteChannel ch,
    499                                    CharsetDecoder dec,
    500                                    int minBufferCap)
    501     {
    502         checkNotNull(ch, "ch");
    503         return StreamDecoder.forDecoder(ch, dec.reset(), minBufferCap);
    504     }
    505 
    506     /**
    507      * Constructs a reader that decodes bytes from the given channel according
    508      * to the named charset.
    509      *
    510      * <p> An invocation of this method of the form
    511      *
    512      * <blockquote><pre>
    513      * Channels.newReader(ch, csname)</pre></blockquote>
    514      *
    515      * behaves in exactly the same way as the expression
    516      *
    517      * <blockquote><pre>
    518      * Channels.newReader(ch,
    519      *                    Charset.forName(csName)
    520      *                        .newDecoder(),
    521      *                    -1);</pre></blockquote>
    522      *
    523      * @param  ch
    524      *         The channel from which bytes will be read
    525      *
    526      * @param  csName
    527      *         The name of the charset to be used
    528      *
    529      * @return  A new reader
    530      *
    531      * @throws  UnsupportedCharsetException
    532      *          If no support for the named charset is available
    533      *          in this instance of the Java virtual machine
    534      */
    535     public static Reader newReader(ReadableByteChannel ch,
    536                                    String csName)
    537     {
    538         checkNotNull(csName, "csName");
    539         return newReader(ch, Charset.forName(csName).newDecoder(), -1);
    540     }
    541 
    542     /**
    543      * Constructs a writer that encodes characters using the given encoder and
    544      * writes the resulting bytes to the given channel.
    545      *
    546      * <p> The resulting stream will contain an internal output buffer of at
    547      * least <tt>minBufferCap</tt> bytes.  The stream's <tt>write</tt> methods
    548      * will, as needed, flush the buffer by writing bytes to the underlying
    549      * channel; if the channel is in non-blocking mode when bytes are to be
    550      * written then an {@link IllegalBlockingModeException} will be thrown.
    551      * The resulting stream will not otherwise be buffered.  Closing the stream
    552      * will in turn cause the channel to be closed.  </p>
    553      *
    554      * @param  ch
    555      *         The channel to which bytes will be written
    556      *
    557      * @param  enc
    558      *         The charset encoder to be used
    559      *
    560      * @param  minBufferCap
    561      *         The minimum capacity of the internal byte buffer,
    562      *         or <tt>-1</tt> if an implementation-dependent
    563      *         default capacity is to be used
    564      *
    565      * @return  A new writer
    566      */
    567     public static Writer newWriter(final WritableByteChannel ch,
    568                                    final CharsetEncoder enc,
    569                                    final int minBufferCap)
    570     {
    571         checkNotNull(ch, "ch");
    572         return StreamEncoder.forEncoder(ch, enc.reset(), minBufferCap);
    573     }
    574 
    575     /**
    576      * Constructs a writer that encodes characters according to the named
    577      * charset and writes the resulting bytes to the given channel.
    578      *
    579      * <p> An invocation of this method of the form
    580      *
    581      * <blockquote><pre>
    582      * Channels.newWriter(ch, csname)</pre></blockquote>
    583      *
    584      * behaves in exactly the same way as the expression
    585      *
    586      * <blockquote><pre>
    587      * Channels.newWriter(ch,
    588      *                    Charset.forName(csName)
    589      *                        .newEncoder(),
    590      *                    -1);</pre></blockquote>
    591      *
    592      * @param  ch
    593      *         The channel to which bytes will be written
    594      *
    595      * @param  csName
    596      *         The name of the charset to be used
    597      *
    598      * @return  A new writer
    599      *
    600      * @throws  UnsupportedCharsetException
    601      *          If no support for the named charset is available
    602      *          in this instance of the Java virtual machine
    603      */
    604     public static Writer newWriter(WritableByteChannel ch,
    605                                    String csName)
    606     {
    607         checkNotNull(csName, "csName");
    608         return newWriter(ch, Charset.forName(csName).newEncoder(), -1);
    609     }
    610 }
    611