Home | History | Annotate | Download | only in nio
      1 /* Licensed to the Apache Software Foundation (ASF) under one or more
      2  * contributor license agreements.  See the NOTICE file distributed with
      3  * this work for additional information regarding copyright ownership.
      4  * The ASF licenses this file to You under the Apache License, Version 2.0
      5  * (the "License"); you may not use this file except in compliance with
      6  * the License.  You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 package java.nio;
     17 
     18 import java.io.FileDescriptor;
     19 import java.io.IOException;
     20 import java.nio.channels.ClosedSelectorException;
     21 import java.nio.channels.IllegalSelectorException;
     22 import java.nio.channels.SelectableChannel;
     23 import java.nio.channels.SelectionKey;
     24 import static java.nio.channels.SelectionKey.*;
     25 import java.nio.channels.Selector;
     26 import java.nio.channels.SocketChannel;
     27 import java.nio.channels.spi.AbstractSelectableChannel;
     28 import java.nio.channels.spi.AbstractSelectionKey;
     29 import java.nio.channels.spi.AbstractSelector;
     30 import java.nio.channels.spi.SelectorProvider;
     31 import java.util.Arrays;
     32 import java.util.Collection;
     33 import java.util.Collections;
     34 import java.util.HashSet;
     35 import java.util.Iterator;
     36 import java.util.Set;
     37 import java.util.UnsafeArrayList;
     38 import libcore.io.ErrnoException;
     39 import libcore.io.IoBridge;
     40 import libcore.io.IoUtils;
     41 import libcore.io.Libcore;
     42 import libcore.io.StructPollfd;
     43 import libcore.util.EmptyArray;
     44 import static libcore.io.OsConstants.*;
     45 
     46 /*
     47  * Default implementation of java.nio.channels.Selector
     48  */
     49 final class SelectorImpl extends AbstractSelector {
     50 
     51     /**
     52      * Used to synchronize when a key's interest ops change.
     53      */
     54     final Object keysLock = new Object();
     55 
     56     private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>();
     57 
     58     /**
     59      * The unmodifiable set of keys as exposed to the user. This object is used
     60      * for synchronization.
     61      */
     62     private final Set<SelectionKey> unmodifiableKeys = Collections
     63             .<SelectionKey>unmodifiableSet(mutableKeys);
     64 
     65     private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>();
     66 
     67     /**
     68      * The unmodifiable set of selectable keys as seen by the user. This object
     69      * is used for synchronization.
     70      */
     71     private final Set<SelectionKey> selectedKeys
     72             = new UnaddableSet<SelectionKey>(mutableSelectedKeys);
     73 
     74     /**
     75      * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. Each
     76      * time select returns, wakeupIn is drained.
     77      */
     78     private final FileDescriptor wakeupIn;
     79     private final FileDescriptor wakeupOut;
     80 
     81     private final UnsafeArrayList<StructPollfd> pollFds = new UnsafeArrayList<StructPollfd>(StructPollfd.class, 8);
     82 
     83     public SelectorImpl(SelectorProvider selectorProvider) throws IOException {
     84         super(selectorProvider);
     85 
     86         /*
     87          * Create a pipes to trigger wakeup. We can't use a NIO pipe because it
     88          * would be closed if the selecting thread is interrupted. Also
     89          * configure the pipe so we can fully drain it without blocking.
     90          */
     91         try {
     92             FileDescriptor[] pipeFds = Libcore.os.pipe();
     93             wakeupIn = pipeFds[0];
     94             wakeupOut = pipeFds[1];
     95             IoUtils.setBlocking(wakeupIn, false);
     96             pollFds.add(new StructPollfd());
     97             setPollFd(0, wakeupIn, POLLIN, null);
     98         } catch (ErrnoException errnoException) {
     99             throw errnoException.rethrowAsIOException();
    100         }
    101     }
    102 
    103     @Override protected void implCloseSelector() throws IOException {
    104         wakeup();
    105         synchronized (this) {
    106             synchronized (unmodifiableKeys) {
    107                 synchronized (selectedKeys) {
    108                     IoUtils.close(wakeupIn);
    109                     IoUtils.close(wakeupOut);
    110                     doCancel();
    111                     for (SelectionKey sk : mutableKeys) {
    112                         deregister((AbstractSelectionKey) sk);
    113                     }
    114                 }
    115             }
    116         }
    117     }
    118 
    119     @Override protected SelectionKey register(AbstractSelectableChannel channel,
    120             int operations, Object attachment) {
    121         if (!provider().equals(channel.provider())) {
    122             throw new IllegalSelectorException();
    123         }
    124         synchronized (this) {
    125             synchronized (unmodifiableKeys) {
    126                 SelectionKeyImpl selectionKey = new SelectionKeyImpl(channel, operations,
    127                         attachment, this);
    128                 mutableKeys.add(selectionKey);
    129                 ensurePollFdsCapacity();
    130                 return selectionKey;
    131             }
    132         }
    133     }
    134 
    135     @Override public synchronized Set<SelectionKey> keys() {
    136         checkClosed();
    137         return unmodifiableKeys;
    138     }
    139 
    140     private void checkClosed() {
    141         if (!isOpen()) {
    142             throw new ClosedSelectorException();
    143         }
    144     }
    145 
    146     @Override public int select() throws IOException {
    147         // Blocks until some fd is ready.
    148         return selectInternal(-1);
    149     }
    150 
    151     @Override public int select(long timeout) throws IOException {
    152         if (timeout < 0) {
    153             throw new IllegalArgumentException();
    154         }
    155         // Our timeout is interpreted differently to Unix's --- 0 means block. See selectNow.
    156         return selectInternal((timeout == 0) ? -1 : timeout);
    157     }
    158 
    159     @Override public int selectNow() throws IOException {
    160         return selectInternal(0);
    161     }
    162 
    163     private int selectInternal(long timeout) throws IOException {
    164         checkClosed();
    165         synchronized (this) {
    166             synchronized (unmodifiableKeys) {
    167                 synchronized (selectedKeys) {
    168                     doCancel();
    169                     boolean isBlock = (timeout != 0);
    170                     synchronized (keysLock) {
    171                         preparePollFds();
    172                     }
    173                     int rc;
    174                     try {
    175                         if (isBlock) {
    176                             begin();
    177                         }
    178                         try {
    179                             rc = Libcore.os.poll(pollFds.array(), (int) timeout);
    180                         } catch (ErrnoException errnoException) {
    181                             throw errnoException.rethrowAsIOException();
    182                         }
    183                     } finally {
    184                         if (isBlock) {
    185                             end();
    186                         }
    187                     }
    188 
    189                     int readyCount = (rc > 0) ? processPollFds() : 0;
    190                     readyCount -= doCancel();
    191                     return readyCount;
    192                 }
    193             }
    194         }
    195     }
    196 
    197     private void setPollFd(int i, FileDescriptor fd, int events, Object object) {
    198         StructPollfd pollFd = pollFds.get(i);
    199         pollFd.fd = fd;
    200         pollFd.events = (short) events;
    201         pollFd.userData = object;
    202     }
    203 
    204     private void preparePollFds() {
    205         int i = 1; // Our wakeup pipe comes before all the user's fds.
    206         for (SelectionKeyImpl key : mutableKeys) {
    207             int interestOps = key.interestOpsNoCheck();
    208             short eventMask = 0;
    209             if (((OP_ACCEPT | OP_READ) & interestOps) != 0) {
    210                 eventMask |= POLLIN;
    211             }
    212             if (((OP_CONNECT | OP_WRITE) & interestOps) != 0) {
    213                 eventMask |= POLLOUT;
    214             }
    215             if (eventMask != 0) {
    216                 setPollFd(i++, ((FileDescriptorChannel) key.channel()).getFD(), eventMask, key);
    217             }
    218         }
    219     }
    220 
    221     private void ensurePollFdsCapacity() {
    222         // We need one slot for each element of mutableKeys, plus one for the wakeup pipe.
    223         while (pollFds.size() < mutableKeys.size() + 1) {
    224             pollFds.add(new StructPollfd());
    225         }
    226     }
    227 
    228     /**
    229      * Updates the key ready ops and selected key set.
    230      */
    231     private int processPollFds() throws IOException {
    232         if (pollFds.get(0).revents == POLLIN) {
    233             // Read bytes from the wakeup pipe until the pipe is empty.
    234             byte[] buffer = new byte[8];
    235             while (IoBridge.read(wakeupIn, buffer, 0, 1) > 0) {
    236             }
    237         }
    238 
    239         int readyKeyCount = 0;
    240         for (int i = 1; i < pollFds.size(); ++i) {
    241             StructPollfd pollFd = pollFds.get(i);
    242             if (pollFd.revents == 0) {
    243                 continue;
    244             }
    245             if (pollFd.fd == null) {
    246                 break;
    247             }
    248 
    249             SelectionKeyImpl key = (SelectionKeyImpl) pollFd.userData;
    250 
    251             pollFd.fd = null;
    252             pollFd.userData = null;
    253 
    254             int ops = key.interestOpsNoCheck();
    255             int selectedOp = 0;
    256             if ((pollFd.revents & POLLIN) != 0) {
    257                 selectedOp = ops & (OP_ACCEPT | OP_READ);
    258             } else if ((pollFd.revents & POLLOUT) != 0) {
    259                 if (key.isConnected()) {
    260                     selectedOp = ops & OP_WRITE;
    261                 } else {
    262                     selectedOp = ops & OP_CONNECT;
    263                 }
    264             }
    265 
    266             if (selectedOp != 0) {
    267                 boolean wasSelected = mutableSelectedKeys.contains(key);
    268                 if (wasSelected && key.readyOps() != selectedOp) {
    269                     key.setReadyOps(key.readyOps() | selectedOp);
    270                     ++readyKeyCount;
    271                 } else if (!wasSelected) {
    272                     key.setReadyOps(selectedOp);
    273                     mutableSelectedKeys.add(key);
    274                     ++readyKeyCount;
    275                 }
    276             }
    277         }
    278 
    279         return readyKeyCount;
    280     }
    281 
    282     @Override public synchronized Set<SelectionKey> selectedKeys() {
    283         checkClosed();
    284         return selectedKeys;
    285     }
    286 
    287     /**
    288      * Removes cancelled keys from the key set and selected key set, and
    289      * deregisters the corresponding channels. Returns the number of keys
    290      * removed from the selected key set.
    291      */
    292     private int doCancel() {
    293         int deselected = 0;
    294 
    295         Set<SelectionKey> cancelledKeys = cancelledKeys();
    296         synchronized (cancelledKeys) {
    297             if (cancelledKeys.size() > 0) {
    298                 for (SelectionKey currentKey : cancelledKeys) {
    299                     mutableKeys.remove(currentKey);
    300                     deregister((AbstractSelectionKey) currentKey);
    301                     if (mutableSelectedKeys.remove(currentKey)) {
    302                         deselected++;
    303                     }
    304                 }
    305                 cancelledKeys.clear();
    306             }
    307         }
    308 
    309         return deselected;
    310     }
    311 
    312     @Override public Selector wakeup() {
    313         try {
    314             Libcore.os.write(wakeupOut, new byte[] { 1 }, 0, 1);
    315         } catch (ErrnoException ignored) {
    316         }
    317         return this;
    318     }
    319 
    320     private static class UnaddableSet<E> implements Set<E> {
    321 
    322         private final Set<E> set;
    323 
    324         UnaddableSet(Set<E> set) {
    325             this.set = set;
    326         }
    327 
    328         @Override
    329         public boolean equals(Object object) {
    330             return set.equals(object);
    331         }
    332 
    333         @Override
    334         public int hashCode() {
    335             return set.hashCode();
    336         }
    337 
    338         public boolean add(E object) {
    339             throw new UnsupportedOperationException();
    340         }
    341 
    342         public boolean addAll(Collection<? extends E> c) {
    343             throw new UnsupportedOperationException();
    344         }
    345 
    346         public void clear() {
    347             set.clear();
    348         }
    349 
    350         public boolean contains(Object object) {
    351             return set.contains(object);
    352         }
    353 
    354         public boolean containsAll(Collection<?> c) {
    355             return set.containsAll(c);
    356         }
    357 
    358         public boolean isEmpty() {
    359             return set.isEmpty();
    360         }
    361 
    362         public Iterator<E> iterator() {
    363             return set.iterator();
    364         }
    365 
    366         public boolean remove(Object object) {
    367             return set.remove(object);
    368         }
    369 
    370         public boolean removeAll(Collection<?> c) {
    371             return set.removeAll(c);
    372         }
    373 
    374         public boolean retainAll(Collection<?> c) {
    375             return set.retainAll(c);
    376         }
    377 
    378         public int size() {
    379             return set.size();
    380         }
    381 
    382         public Object[] toArray() {
    383             return set.toArray();
    384         }
    385 
    386         public <T> T[] toArray(T[] a) {
    387             return set.toArray(a);
    388         }
    389     }
    390 }
    391