Home | History | Annotate | Download | only in nio
      1 /*
      2  *  Licensed to the Apache Software Foundation (ASF) under one or more
      3  *  contributor license agreements.  See the NOTICE file distributed with
      4  *  this work for additional information regarding copyright ownership.
      5  *  The ASF licenses this file to You under the Apache License, Version 2.0
      6  *  (the "License"); you may not use this file except in compliance with
      7  *  the License.  You may obtain a copy of the License at
      8  *
      9  *     http://www.apache.org/licenses/LICENSE-2.0
     10  *
     11  *  Unless required by applicable law or agreed to in writing, software
     12  *  distributed under the License is distributed on an "AS IS" BASIS,
     13  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     14  *  See the License for the specific language governing permissions and
     15  *  limitations under the License.
     16  */
     17 
     18 package java.nio;
     19 
     20 import android.system.ErrnoException;
     21 import java.io.FileDescriptor;
     22 import java.io.InterruptedIOException;
     23 import java.io.IOException;
     24 import java.net.ConnectException;
     25 import java.net.DatagramPacket;
     26 import java.net.DatagramSocket;
     27 import java.net.DatagramSocketImpl;
     28 import java.net.Inet4Address;
     29 import java.net.InetAddress;
     30 import java.net.InetSocketAddress;
     31 import java.net.NetworkInterface;
     32 import java.net.PlainDatagramSocketImpl;
     33 import java.net.SocketAddress;
     34 import java.net.SocketException;
     35 import java.nio.channels.AlreadyConnectedException;
     36 import java.nio.channels.ClosedChannelException;
     37 import java.nio.channels.DatagramChannel;
     38 import java.nio.channels.IllegalBlockingModeException;
     39 import java.nio.channels.NotYetConnectedException;
     40 import java.nio.channels.spi.SelectorProvider;
     41 import java.nio.channels.UnresolvedAddressException;
     42 import java.nio.channels.UnsupportedAddressTypeException;
     43 import java.util.Arrays;
     44 import java.util.Set;
     45 import libcore.io.IoBridge;
     46 import libcore.io.IoUtils;
     47 import libcore.io.Libcore;
     48 import libcore.util.EmptyArray;
     49 
     50 /*
     51  * The default implementation class of java.nio.channels.DatagramChannel.
     52  */
     53 class DatagramChannelImpl extends DatagramChannel implements FileDescriptorChannel {
     54     // The fd to interact with native code
     55     private final FileDescriptor fd;
     56 
     57     // Our internal DatagramSocket.
     58     private DatagramSocket socket;
     59 
     60     // The remote address to be connected.
     61     InetSocketAddress connectAddress;
     62 
     63     // The local address.
     64     InetAddress localAddress;
     65 
     66     // local port
     67     private int localPort;
     68 
     69     // At first, uninitialized.
     70     boolean connected = false;
     71 
     72     // whether the socket is bound
     73     boolean isBound = false;
     74 
     75     private final Object readLock = new Object();
     76     private final Object writeLock = new Object();
     77 
     78     /*
     79      * Constructor
     80      */
     81     protected DatagramChannelImpl(SelectorProvider selectorProvider) throws IOException {
     82         super(selectorProvider);
     83         fd = IoBridge.socket(false);
     84     }
     85 
     86     /*
     87      * for native call
     88      */
     89     @SuppressWarnings("unused")
     90     private DatagramChannelImpl() {
     91         super(SelectorProvider.provider());
     92         fd = new FileDescriptor();
     93         connectAddress = new InetSocketAddress(0);
     94     }
     95 
     96     /*
     97      * Getting the internal DatagramSocket If we have not the socket, we create
     98      * a new one.
     99      */
    100     @Override
    101     synchronized public DatagramSocket socket() {
    102         if (socket == null) {
    103             socket = new DatagramSocketAdapter(new PlainDatagramSocketImpl(fd, localPort), this);
    104         }
    105         return socket;
    106     }
    107 
    108     /**
    109      * Initialise the isBound, localAddress and localPort state from the file descriptor. Used when
    110      * some or all of the bound state has been left to the OS to decide, or when the Socket handled
    111      * bind() or connect().
    112      *
    113      * @param updateSocketState
    114      *        if the associated socket (if present) needs to be updated
    115      * @hide used to sync state, non-private to avoid synthetic method
    116      */
    117     void onBind(boolean updateSocketState) {
    118         SocketAddress sa;
    119         try {
    120             sa = Libcore.os.getsockname(fd);
    121         } catch (ErrnoException errnoException) {
    122             throw new AssertionError(errnoException);
    123         }
    124         isBound = true;
    125         InetSocketAddress localSocketAddress = (InetSocketAddress) sa;
    126         localAddress = localSocketAddress.getAddress();
    127         localPort = localSocketAddress.getPort();
    128         if (updateSocketState && socket != null) {
    129             socket.onBind(localAddress, localPort);
    130         }
    131     }
    132 
    133     @Override
    134     synchronized public boolean isConnected() {
    135         return connected;
    136     }
    137 
    138     @Override
    139     synchronized public DatagramChannel connect(SocketAddress address) throws IOException {
    140         // must be open
    141         checkOpen();
    142         // status must be un-connected.
    143         if (connected) {
    144             throw new IllegalStateException();
    145         }
    146 
    147         // check the address
    148         InetSocketAddress inetSocketAddress = SocketChannelImpl.validateAddress(address);
    149         InetAddress remoteAddress = inetSocketAddress.getAddress();
    150         int remotePort = inetSocketAddress.getPort();
    151         try {
    152             begin();
    153             IoBridge.connect(fd, remoteAddress, remotePort);
    154         } catch (ConnectException e) {
    155             // ConnectException means connect fail, not exception
    156         } finally {
    157             end(true);
    158         }
    159 
    160         // connect() performs a bind() if an explicit bind() was not performed. Keep the local
    161         // address state held by the channel and the socket up to date.
    162         if (!isBound) {
    163             onBind(true /* updateSocketState */);
    164         }
    165 
    166         // Keep the connected state held by the channel and the socket up to date.
    167         onConnect(remoteAddress, remotePort, true /* updateSocketState */);
    168         return this;
    169     }
    170 
    171     /**
    172      * Initialize the state associated with being connected, optionally syncing the socket if there
    173      * is one.
    174      * @hide used to sync state, non-private to avoid synthetic method
    175      */
    176     void onConnect(InetAddress remoteAddress, int remotePort, boolean updateSocketState) {
    177         connected = true;
    178         connectAddress = new InetSocketAddress(remoteAddress, remotePort);
    179         if (updateSocketState && socket != null) {
    180             socket.onConnect(remoteAddress, remotePort);
    181         }
    182     }
    183 
    184     @Override
    185     synchronized public DatagramChannel disconnect() throws IOException {
    186         if (!isConnected() || !isOpen()) {
    187             return this;
    188         }
    189 
    190         // Keep the disconnected state held by the channel and the socket up to date.
    191         onDisconnect(true /* updateSocketState */);
    192 
    193         try {
    194             Libcore.os.connect(fd, InetAddress.UNSPECIFIED, 0);
    195         } catch (ErrnoException errnoException) {
    196             throw errnoException.rethrowAsIOException();
    197         }
    198         return this;
    199     }
    200 
    201     /**
    202      * Initialize the state associated with being disconnected, optionally syncing the socket if
    203      * there is one.
    204      * @hide used to sync state, non-private to avoid synthetic method
    205      */
    206     void onDisconnect(boolean updateSocketState) {
    207         connected = false;
    208         connectAddress = null;
    209         if (updateSocketState && socket != null && socket.isConnected()) {
    210             socket.onDisconnect();
    211         }
    212     }
    213 
    214     @Override
    215     public SocketAddress receive(ByteBuffer target) throws IOException {
    216         target.checkWritable();
    217         checkOpen();
    218 
    219         if (!isBound) {
    220             return null;
    221         }
    222 
    223         SocketAddress retAddr = null;
    224         try {
    225             begin();
    226 
    227             // receive real data packet, (not peek)
    228             synchronized (readLock) {
    229                 boolean loop = isBlocking();
    230                 if (!target.isDirect()) {
    231                     retAddr = receiveImpl(target, loop);
    232                 } else {
    233                     retAddr = receiveDirectImpl(target, loop);
    234                 }
    235             }
    236         } catch (InterruptedIOException e) {
    237             // this line used in Linux
    238             return null;
    239         } finally {
    240             end(retAddr != null);
    241         }
    242         return retAddr;
    243     }
    244 
    245     private SocketAddress receiveImpl(ByteBuffer target, boolean loop) throws IOException {
    246         SocketAddress retAddr = null;
    247         DatagramPacket receivePacket;
    248         int oldposition = target.position();
    249         int received;
    250         // TODO: disallow mapped buffers and lose this conditional?
    251         if (target.hasArray()) {
    252             receivePacket = new DatagramPacket(target.array(), target.position() + target.arrayOffset(), target.remaining());
    253         } else {
    254             receivePacket = new DatagramPacket(new byte[target.remaining()], target.remaining());
    255         }
    256         do {
    257             received = IoBridge.recvfrom(false, fd, receivePacket.getData(), receivePacket.getOffset(), receivePacket.getLength(), 0, receivePacket, isConnected());
    258             if (receivePacket.getAddress() != null) {
    259                 if (received > 0) {
    260                     if (target.hasArray()) {
    261                         target.position(oldposition + received);
    262                     } else {
    263                         // copy the data of received packet
    264                         target.put(receivePacket.getData(), 0, received);
    265                     }
    266                 }
    267                 retAddr = receivePacket.getSocketAddress();
    268                 break;
    269             }
    270         } while (loop);
    271         return retAddr;
    272     }
    273 
    274     private SocketAddress receiveDirectImpl(ByteBuffer target, boolean loop) throws IOException {
    275         SocketAddress retAddr = null;
    276         DatagramPacket receivePacket = new DatagramPacket(EmptyArray.BYTE, 0);
    277         do {
    278             IoBridge.recvfrom(false, fd, target, 0, receivePacket, isConnected());
    279             if (receivePacket.getAddress() != null) {
    280                 retAddr = receivePacket.getSocketAddress();
    281                 break;
    282             }
    283         } while (loop);
    284         return retAddr;
    285     }
    286 
    287     @Override
    288     public int send(ByteBuffer source, SocketAddress socketAddress) throws IOException {
    289         checkNotNull(source);
    290         checkOpen();
    291 
    292         InetSocketAddress isa = (InetSocketAddress) socketAddress;
    293         if (isa.getAddress() == null) {
    294             throw new IOException();
    295         }
    296 
    297         if (isConnected() && !connectAddress.equals(isa)) {
    298             throw new IllegalArgumentException("Connected to " + connectAddress +
    299                                                ", not " + socketAddress);
    300         }
    301 
    302         synchronized (writeLock) {
    303             int sendCount = 0;
    304             try {
    305                 begin();
    306                 sendCount = IoBridge.sendto(fd, source, 0, isa.getAddress(), isa.getPort());
    307                 if (!isBound) {
    308                     onBind(true /* updateSocketState */);
    309                 }
    310             } finally {
    311                 end(sendCount >= 0);
    312             }
    313             return sendCount;
    314         }
    315     }
    316 
    317     @Override
    318     public int read(ByteBuffer target) throws IOException {
    319         target.checkWritable();
    320         checkOpenConnected();
    321 
    322         if (!target.hasRemaining()) {
    323             return 0;
    324         }
    325 
    326         int readCount;
    327         if (target.isDirect() || target.hasArray()) {
    328             readCount = readImpl(target);
    329         } else {
    330             byte[] readArray = new byte[target.remaining()];
    331             ByteBuffer readBuffer = ByteBuffer.wrap(readArray);
    332             readCount = readImpl(readBuffer);
    333             if (readCount > 0) {
    334                 target.put(readArray, 0, readCount);
    335             }
    336         }
    337         return readCount;
    338     }
    339 
    340     @Override
    341     public long read(ByteBuffer[] targets, int offset, int length) throws IOException {
    342         Arrays.checkOffsetAndCount(targets.length, offset, length);
    343 
    344         // status must be open and connected
    345         checkOpenConnected();
    346         int totalCount = FileChannelImpl.calculateTotalRemaining(targets, offset, length, true);
    347         if (totalCount == 0) {
    348             return 0;
    349         }
    350 
    351         // read data to readBuffer, and then transfer data from readBuffer to
    352         // targets.
    353         ByteBuffer readBuffer = ByteBuffer.allocate(totalCount);
    354         int readCount;
    355         readCount = readImpl(readBuffer);
    356         int left = readCount;
    357         int index = offset;
    358         // transfer data from readBuffer to targets
    359         byte[] readArray = readBuffer.array();
    360         while (left > 0) {
    361             int putLength = Math.min(targets[index].remaining(), left);
    362             targets[index].put(readArray, readCount - left, putLength);
    363             index++;
    364             left -= putLength;
    365         }
    366         return readCount;
    367     }
    368 
    369     /*
    370      * read from channel, and store the result in the target.
    371      */
    372     private int readImpl(ByteBuffer dst) throws IOException {
    373         synchronized (readLock) {
    374             int readCount = 0;
    375             try {
    376                 begin();
    377                 readCount = IoBridge.recvfrom(false, fd, dst, 0, null, isConnected());
    378             } catch (InterruptedIOException e) {
    379                 // InterruptedIOException will be thrown when timeout.
    380                 return 0;
    381             } finally {
    382                 end(readCount > 0);
    383             }
    384             return readCount;
    385         }
    386     }
    387 
    388     @Override public int write(ByteBuffer src) throws IOException {
    389         checkNotNull(src);
    390         checkOpenConnected();
    391         if (!src.hasRemaining()) {
    392             return 0;
    393         }
    394 
    395         return writeImpl(src);
    396     }
    397 
    398     /**
    399      * @see java.nio.channels.DatagramChannel#write(java.nio.ByteBuffer[], int,
    400      *      int)
    401      */
    402     @Override
    403     public long write(ByteBuffer[] sources, int offset, int length) throws IOException {
    404         Arrays.checkOffsetAndCount(sources.length, offset, length);
    405 
    406         // status must be open and connected
    407         checkOpenConnected();
    408         int count = FileChannelImpl.calculateTotalRemaining(sources, offset, length, false);
    409         if (count == 0) {
    410             return 0;
    411         }
    412         ByteBuffer writeBuf = ByteBuffer.allocate(count);
    413         for (int val = offset; val < length + offset; val++) {
    414             ByteBuffer source = sources[val];
    415             int oldPosition = source.position();
    416             writeBuf.put(source);
    417             source.position(oldPosition);
    418         }
    419         writeBuf.flip();
    420         int result = writeImpl(writeBuf);
    421         int val = offset;
    422         int written = result;
    423         while (result > 0) {
    424             ByteBuffer source = sources[val];
    425             int gap = Math.min(result, source.remaining());
    426             source.position(source.position() + gap);
    427             val++;
    428             result -= gap;
    429         }
    430         return written;
    431     }
    432 
    433     private int writeImpl(ByteBuffer buf) throws IOException {
    434         synchronized (writeLock) {
    435             int result = 0;
    436             try {
    437                 begin();
    438                 result = IoBridge.sendto(fd, buf, 0, null, 0);
    439             } finally {
    440                 end(result > 0);
    441             }
    442             return result;
    443         }
    444     }
    445 
    446     @Override protected synchronized void implCloseSelectableChannel() throws IOException {
    447         // A closed channel is not connected.
    448         onDisconnect(true /* updateSocketState */);
    449         IoBridge.closeAndSignalBlockedThreads(fd);
    450 
    451         if (socket != null && !socket.isClosed()) {
    452             socket.onClose();
    453         }
    454     }
    455 
    456     @Override protected void implConfigureBlocking(boolean blocking) throws IOException {
    457         IoUtils.setBlocking(fd, blocking);
    458     }
    459 
    460     /*
    461      * Status check, must be open.
    462      */
    463     private void checkOpen() throws ClosedChannelException {
    464         if (!isOpen()) {
    465             throw new ClosedChannelException();
    466         }
    467     }
    468 
    469     /*
    470      * Status check, must be open and connected, for read and write.
    471      */
    472     private void checkOpenConnected() throws IOException {
    473         checkOpen();
    474         if (!isConnected()) {
    475             throw new NotYetConnectedException();
    476         }
    477     }
    478 
    479     /*
    480      * Buffer check, must not null
    481      */
    482     private void checkNotNull(ByteBuffer source) {
    483         if (source == null) {
    484             throw new NullPointerException("source == null");
    485         }
    486     }
    487 
    488     /*
    489      * Get the fd for internal use.
    490      */
    491     public FileDescriptor getFD() {
    492         return fd;
    493     }
    494 
    495     /*
    496      * The adapter class of DatagramSocket
    497      */
    498     private static class DatagramSocketAdapter extends DatagramSocket {
    499 
    500         /*
    501          * The internal datagramChannelImpl.
    502          */
    503         private final DatagramChannelImpl channelImpl;
    504 
    505         /*
    506          * Constructor initialize the datagramSocketImpl and datagramChannelImpl
    507          */
    508         DatagramSocketAdapter(DatagramSocketImpl socketimpl, DatagramChannelImpl channelImpl) {
    509             super(socketimpl);
    510             this.channelImpl = channelImpl;
    511 
    512             // Sync state socket state with the channel it is being created from
    513             if (channelImpl.isBound) {
    514                 onBind(channelImpl.localAddress, channelImpl.localPort);
    515             }
    516             if (channelImpl.connected) {
    517                 onConnect(
    518                         channelImpl.connectAddress.getAddress(),
    519                         channelImpl.connectAddress.getPort());
    520             } else {
    521                 onDisconnect();
    522             }
    523             if (!channelImpl.isOpen()) {
    524                 onClose();
    525             }
    526         }
    527 
    528         /*
    529          * Get the internal datagramChannelImpl
    530          */
    531         @Override
    532         public DatagramChannel getChannel() {
    533             return channelImpl;
    534         }
    535 
    536         @Override
    537         public void bind(SocketAddress localAddr) throws SocketException {
    538             if (channelImpl.isConnected()) {
    539                 throw new AlreadyConnectedException();
    540             }
    541             super.bind(localAddr);
    542             channelImpl.onBind(false /* updateSocketState */);
    543         }
    544 
    545         @Override
    546         public void connect(SocketAddress peer) throws SocketException {
    547             if (isConnected()) {
    548                 // RI compatibility: If the socket is already connected this fails.
    549                 throw new IllegalStateException("Socket is already connected.");
    550             }
    551             super.connect(peer);
    552             // Connect may have performed an implicit bind(). Sync up here.
    553             channelImpl.onBind(false /* updateSocketState */);
    554 
    555             InetSocketAddress inetSocketAddress = (InetSocketAddress) peer;
    556             channelImpl.onConnect(
    557                     inetSocketAddress.getAddress(), inetSocketAddress.getPort(),
    558                     false /* updateSocketState */);
    559         }
    560 
    561         @Override
    562         public void connect(InetAddress address, int port) {
    563             // To avoid implementing connect() twice call this.connect(SocketAddress) in preference
    564             // to super.connect().
    565             try {
    566                 connect(new InetSocketAddress(address, port));
    567             } catch (SocketException e) {
    568                 // Ignored - there is nothing we can report here.
    569             }
    570         }
    571 
    572         @Override
    573         public void receive(DatagramPacket packet) throws IOException {
    574             if (!channelImpl.isBlocking()) {
    575                 throw new IllegalBlockingModeException();
    576             }
    577 
    578             boolean wasBound = isBound();
    579             super.receive(packet);
    580             if (!wasBound) {
    581                 // DatagramSocket.receive() will implicitly bind if it hasn't been done explicitly.
    582                 // Sync the channel state with the socket.
    583                 channelImpl.onBind(false /* updateSocketState */);
    584             }
    585         }
    586 
    587         @Override
    588         public void send(DatagramPacket packet) throws IOException {
    589             if (!channelImpl.isBlocking()) {
    590                 throw new IllegalBlockingModeException();
    591             }
    592 
    593             // DatagramSocket.send() will implicitly bind if it hasn't been done explicitly. Force
    594             // bind() here so that the channel state stays in sync with the socket.
    595             boolean wasBound = isBound();
    596             super.send(packet);
    597             if (!wasBound) {
    598                 // DatagramSocket.send() will implicitly bind if it hasn't been done explicitly.
    599                 // Sync the channel state with the socket.
    600                 channelImpl.onBind(false /* updateSocketState */);
    601             }
    602         }
    603 
    604         @Override
    605         public void close() {
    606             synchronized (channelImpl) {
    607                 super.close();
    608                 if (channelImpl.isOpen()) {
    609                     try {
    610                         channelImpl.close();
    611                     } catch (IOException e) {
    612                         // Ignore
    613                     }
    614                 }
    615             }
    616         }
    617 
    618         @Override
    619         public void disconnect() {
    620             super.disconnect();
    621             channelImpl.onDisconnect(false /* updateSocketState */);
    622         }
    623     }
    624 }
    625