/* Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package java.nio;

import java.io.FileDescriptor;
import java.io.IOException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.IllegalSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import static java.nio.channels.SelectionKey.*;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.AbstractSelectableChannel;
import java.nio.channels.spi.AbstractSelectionKey;
import java.nio.channels.spi.AbstractSelector;
import java.nio.channels.spi.SelectorProvider;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.UnsafeArrayList;
import libcore.io.ErrnoException;
import libcore.io.IoBridge;
import libcore.io.IoUtils;
import libcore.io.Libcore;
import libcore.io.StructPollfd;
import libcore.util.EmptyArray;
import static libcore.io.OsConstants.*;

/*
 * Default implementation of java.nio.channels.Selector
 */
final class SelectorImpl extends AbstractSelector {

    /**
     * Used to synchronize when a key's interest ops change.
     */
    final Object keysLock = new Object();

    /** The registered keys; the backing set for {@link #unmodifiableKeys}. */
    private final Set<SelectionKeyImpl> mutableKeys = new HashSet<SelectionKeyImpl>();

    /**
     * The unmodifiable set of keys as exposed to the user. This object is used
     * for synchronization.
     */
    private final Set<SelectionKey> unmodifiableKeys = Collections
            .<SelectionKey>unmodifiableSet(mutableKeys);

    /** The selected keys; the backing set for {@link #selectedKeys}. */
    private final Set<SelectionKey> mutableSelectedKeys = new HashSet<SelectionKey>();

    /**
     * The set of selected keys as seen by the user: elements can be removed
     * but not added. This object is used for synchronization.
     */
    private final Set<SelectionKey> selectedKeys
            = new UnaddableSet<SelectionKey>(mutableSelectedKeys);

    /**
     * The wakeup pipe. To trigger a wakeup, write a byte to wakeupOut. Each
     * time select returns, wakeupIn is drained.
     */
    private final FileDescriptor wakeupIn;
    private final FileDescriptor wakeupOut;

    /**
     * The structures handed to poll. Slot 0 always describes the wakeup pipe;
     * the slots from 1 upwards are refilled by {@link #preparePollFds} before
     * every poll.
     */
    private final UnsafeArrayList<StructPollfd> pollFds
            = new UnsafeArrayList<StructPollfd>(StructPollfd.class, 8);

    /**
     * Creates a selector and the pipe used to wake it up.
     *
     * @param selectorProvider the provider that is creating this selector.
     * @throws IOException if the wakeup pipe cannot be created.
     */
    public SelectorImpl(SelectorProvider selectorProvider) throws IOException {
        super(selectorProvider);

        /*
         * Create a pipe to trigger wakeup. We can't use a NIO pipe because it
         * would be closed if the selecting thread is interrupted. Also
         * configure the pipe so we can fully drain it without blocking.
         */
        try {
            FileDescriptor[] pipeFds = Libcore.os.pipe();
            wakeupIn = pipeFds[0];
            wakeupOut = pipeFds[1];
            IoUtils.setBlocking(wakeupIn, false);
            pollFds.add(new StructPollfd());
            setPollFd(0, wakeupIn, POLLIN, null);
        } catch (ErrnoException errnoException) {
            throw errnoException.rethrowAsIOException();
        }
    }

    /**
     * Wakes up any in-progress select, then closes the wakeup pipe and
     * deregisters every key while holding all three selector locks.
     */
    @Override protected void implCloseSelector() throws IOException {
        wakeup();
        synchronized (this) {
            synchronized (unmodifiableKeys) {
                synchronized (selectedKeys) {
                    IoUtils.close(wakeupIn);
                    IoUtils.close(wakeupOut);
                    doCancel();
                    for (SelectionKey sk : mutableKeys) {
                        deregister((AbstractSelectionKey) sk);
                    }
                }
            }
        }
    }

    /**
     * Creates a key for {@code channel}, adds it to the key set and makes sure
     * there is a poll slot available for it.
     *
     * @throws IllegalSelectorException if the channel was created by a
     *         different provider than this selector.
     */
    @Override protected SelectionKey register(AbstractSelectableChannel channel,
            int operations, Object attachment) {
        if (!provider().equals(channel.provider())) {
            throw new IllegalSelectorException();
        }
        synchronized (this) {
            synchronized (unmodifiableKeys) {
                SelectionKeyImpl selectionKey = new SelectionKeyImpl(channel, operations,
                        attachment, this);
                mutableKeys.add(selectionKey);
                ensurePollFdsCapacity();
                return selectionKey;
            }
        }
    }

    /**
     * Returns the unmodifiable view of the registered keys.
     *
     * @throws ClosedSelectorException if this selector is closed.
     */
    @Override public synchronized Set<SelectionKey> keys() {
        checkClosed();
        return unmodifiableKeys;
    }

    /** Throws {@link ClosedSelectorException} if this selector is no longer open. */
    private void checkClosed() {
        if (!isOpen()) {
            throw new ClosedSelectorException();
        }
    }

    @Override public int select() throws IOException {
        // Blocks until some fd is ready.
        return selectInternal(-1);
    }

    /**
     * @throws IllegalArgumentException if {@code timeout} is negative.
     */
    @Override public int select(long timeout) throws IOException {
        if (timeout < 0) {
            throw new IllegalArgumentException();
        }
        // Our timeout is interpreted differently to Unix's --- 0 means block. See selectNow.
        return selectInternal((timeout == 0) ? -1 : timeout);
    }

    @Override public int selectNow() throws IOException {
        return selectInternal(0);
    }

    /**
     * Performs one selection operation.
     *
     * @param timeout the poll timeout in milliseconds: -1 blocks without a
     *        time limit, 0 returns immediately. Values larger than
     *        {@code Integer.MAX_VALUE} are clamped to it.
     * @return the number of keys reported as updated by
     *         {@link #processPollFds}, minus the number of selected keys
     *         removed by the trailing {@link #doCancel}.
     * @throws ClosedSelectorException if this selector is closed.
     * @throws IOException if poll fails.
     */
    private int selectInternal(long timeout) throws IOException {
        checkClosed();
        synchronized (this) {
            synchronized (unmodifiableKeys) {
                synchronized (selectedKeys) {
                    doCancel();
                    boolean isBlock = (timeout != 0);
                    synchronized (keysLock) {
                        preparePollFds();
                    }
                    // poll takes an int. A plain narrowing cast would wrap large
                    // timeouts to zero or to a negative value, so clamp instead.
                    int pollTimeout = (int) Math.min(timeout, (long) Integer.MAX_VALUE);
                    int rc;
                    try {
                        if (isBlock) {
                            begin();
                        }
                        try {
                            rc = Libcore.os.poll(pollFds.array(), pollTimeout);
                        } catch (ErrnoException errnoException) {
                            throw errnoException.rethrowAsIOException();
                        }
                    } finally {
                        if (isBlock) {
                            end();
                        }
                    }

                    int readyCount = (rc > 0) ? processPollFds() : 0;
                    readyCount -= doCancel();
                    return readyCount;
                }
            }
        }
    }

    /** Overwrites poll slot {@code i} with the given descriptor, event mask and user data. */
    private void setPollFd(int i, FileDescriptor fd, int events, Object object) {
        StructPollfd pollFd = pollFds.get(i);
        pollFd.fd = fd;
        pollFd.events = (short) events;
        pollFd.userData = object;
    }

    /**
     * Fills the poll slots from 1 upwards with one entry per key that has a
     * non-empty interest set, then clears every slot that was not filled.
     */
    private void preparePollFds() {
        int i = 1; // Our wakeup pipe comes before all the user's fds.
        for (SelectionKeyImpl key : mutableKeys) {
            int interestOps = key.interestOpsNoCheck();
            short eventMask = 0;
            if (((OP_ACCEPT | OP_READ) & interestOps) != 0) {
                eventMask |= POLLIN;
            }
            if (((OP_CONNECT | OP_WRITE) & interestOps) != 0) {
                eventMask |= POLLOUT;
            }
            if (eventMask != 0) {
                setPollFd(i++, ((FileDescriptorChannel) key.channel()).getFD(), eventMask, key);
            }
        }

        // The list never shrinks, so slots past the ones just filled may still
        // carry the fd and key of an earlier select (for keys that have since
        // been cancelled or have lost all their interest ops). Clear them so a
        // stale key cannot be reported again; processPollFds treats a null fd
        // as the end of the live entries.
        for (int unused = i; unused < pollFds.size(); ++unused) {
            StructPollfd pollFd = pollFds.get(unused);
            pollFd.fd = null;
            pollFd.userData = null;
        }
    }

    private void ensurePollFdsCapacity() {
        // We need one slot for each element of mutableKeys, plus one for the wakeup pipe.
        while (pollFds.size() < mutableKeys.size() + 1) {
            pollFds.add(new StructPollfd());
        }
    }

    /**
     * Updates the key ready ops and selected key set.
     *
     * @return the number of keys that were added to the selected key set or
     *         whose ready set gained at least one operation.
     */
    private int processPollFds() throws IOException {
        // Test the bit rather than the whole value so that additional flags
        // set alongside POLLIN don't prevent the pipe from being drained.
        if ((pollFds.get(0).revents & POLLIN) != 0) {
            // Read bytes from the wakeup pipe until the pipe is empty.
            byte[] buffer = new byte[8];
            while (IoBridge.read(wakeupIn, buffer, 0, buffer.length) > 0) {
            }
        }

        int readyKeyCount = 0;
        for (int i = 1; i < pollFds.size(); ++i) {
            StructPollfd pollFd = pollFds.get(i);
            if (pollFd.revents == 0) {
                continue;
            }
            if (pollFd.fd == null) {
                break;
            }

            SelectionKeyImpl key = (SelectionKeyImpl) pollFd.userData;

            pollFd.fd = null;
            pollFd.userData = null;

            int ops = key.interestOpsNoCheck();
            int revents = pollFd.revents;
            int selectedOps = 0;
            if ((revents & (POLLHUP | POLLERR)) != 0) {
                // A hang-up or error is reported whatever the key was waiting
                // for; otherwise the key would never be selected and the
                // caller would never find out about the failure.
                selectedOps |= ops;
            }
            // POLLIN and POLLOUT can both be set in one result, so they are
            // tested independently and the results are combined.
            if ((revents & POLLIN) != 0) {
                selectedOps |= ops & (OP_ACCEPT | OP_READ);
            }
            if ((revents & POLLOUT) != 0) {
                selectedOps |= ops & (key.isConnected() ? OP_WRITE : OP_CONNECT);
            }

            if (selectedOps == 0) {
                continue;
            }
            if (mutableSelectedKeys.contains(key)) {
                // Already selected: accumulate, and count the key only if its
                // ready set actually gained an operation.
                int previousOps = key.readyOps();
                int mergedOps = previousOps | selectedOps;
                if (mergedOps != previousOps) {
                    key.setReadyOps(mergedOps);
                    ++readyKeyCount;
                }
            } else {
                key.setReadyOps(selectedOps);
                mutableSelectedKeys.add(key);
                ++readyKeyCount;
            }
        }

        return readyKeyCount;
    }

    /**
     * Returns the selected key set; keys can be removed from it but not added.
     *
     * @throws ClosedSelectorException if this selector is closed.
     */
    @Override public synchronized Set<SelectionKey> selectedKeys() {
        checkClosed();
        return selectedKeys;
    }

    /**
     * Removes cancelled keys from the key set and selected key set, and
     * deregisters the corresponding channels. Returns the number of keys
     * removed from the selected key set.
     */
    private int doCancel() {
        int deselected = 0;

        Set<SelectionKey> cancelledKeys = cancelledKeys();
        synchronized (cancelledKeys) {
            if (!cancelledKeys.isEmpty()) {
                for (SelectionKey currentKey : cancelledKeys) {
                    mutableKeys.remove(currentKey);
                    deregister((AbstractSelectionKey) currentKey);
                    if (mutableSelectedKeys.remove(currentKey)) {
                        deselected++;
                    }
                }
                cancelledKeys.clear();
            }
        }

        return deselected;
    }

    /**
     * Writes a single byte to the wakeup pipe so that a poll watching its read
     * end returns.
     */
    @Override public Selector wakeup() {
        try {
            Libcore.os.write(wakeupOut, new byte[] { 1 }, 0, 1);
        } catch (ErrnoException ignored) {
            // Deliberately ignored: wakeup() has no way to report a failed write.
        }
        return this;
    }

    /**
     * A view of a set that rejects {@code add} and {@code addAll} with
     * {@link UnsupportedOperationException} and forwards every other
     * operation, including removal, to the backing set.
     */
    private static class UnaddableSet<E> implements Set<E> {

        private final Set<E> set;

        UnaddableSet(Set<E> set) {
            this.set = set;
        }

        @Override
        public boolean equals(Object object) {
            return set.equals(object);
        }

        @Override
        public int hashCode() {
            return set.hashCode();
        }

        @Override
        public String toString() {
            return set.toString();
        }

        @Override
        public boolean add(E object) {
            throw new UnsupportedOperationException();
        }

        @Override
        public boolean addAll(Collection<? extends E> c) {
            throw new UnsupportedOperationException();
        }

        @Override
        public void clear() {
            set.clear();
        }

        @Override
        public boolean contains(Object object) {
            return set.contains(object);
        }

        @Override
        public boolean containsAll(Collection<?> c) {
            return set.containsAll(c);
        }

        @Override
        public boolean isEmpty() {
            return set.isEmpty();
        }

        @Override
        public Iterator<E> iterator() {
            return set.iterator();
        }

        @Override
        public boolean remove(Object object) {
            return set.remove(object);
        }

        @Override
        public boolean removeAll(Collection<?> c) {
            return set.removeAll(c);
        }

        @Override
        public boolean retainAll(Collection<?> c) {
            return set.retainAll(c);
        }

        @Override
        public int size() {
            return set.size();
        }

        @Override
        public Object[] toArray() {
            return set.toArray();
        }

        @Override
        public <T> T[] toArray(T[] a) {
            return set.toArray(a);
        }
    }
}