Home | History | Annotate | Download | only in ch
      1 /*
      2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 
     26 package sun.nio.ch;
     27 
     28 import java.io.FileDescriptor;
     29 import java.io.IOException;
     30 import java.net.InetSocketAddress;
     31 import java.net.SocketAddress;
     32 import java.nio.ByteBuffer;
     33 import java.nio.channels.AlreadyConnectedException;
     34 import java.nio.channels.AsynchronousChannel;
     35 import java.nio.channels.AsynchronousCloseException;
     36 import java.nio.channels.ClosedChannelException;
     37 import java.nio.channels.CompletionHandler;
     38 import java.nio.channels.ConnectionPendingException;
     39 import java.nio.channels.InterruptedByTimeoutException;
     40 import java.nio.channels.ShutdownChannelGroupException;
     41 import java.security.AccessController;
     42 import java.util.concurrent.Future;
     43 import java.util.concurrent.TimeUnit;
     44 
     45 import dalvik.system.CloseGuard;
     46 import sun.net.NetHooks;
     47 import sun.security.action.GetPropertyAction;
     48 
     49 /**
     50  * Unix implementation of AsynchronousSocketChannel
     51  */
     52 
     53 class UnixAsynchronousSocketChannelImpl
     54     extends AsynchronousSocketChannelImpl implements Port.PollableChannel
     55 {
     56     private final static NativeDispatcher nd = new SocketDispatcher();
     57     private static enum OpType { CONNECT, READ, WRITE };
     58 
     59     private static final boolean disableSynchronousRead;
     60     static {
     61         String propValue = AccessController.doPrivileged(
     62             new GetPropertyAction("sun.nio.ch.disableSynchronousRead", "false"));
     63         disableSynchronousRead = (propValue.length() == 0) ?
     64             true : Boolean.valueOf(propValue);
     65     }
     66 
    // port this channel is registered with; also returned by group()
    private final Port port;
    // integer value of the socket's file descriptor; key used when
    // registering with the port and when (re)starting a poll
    private final int fdVal;

    // used to ensure that the context for I/O operations that complete
    // asynchronously is visible to the pooled threads handling I/O events.
    private final Object updateLock = new Object();

    // pending connect (updateLock)
    // exactly one of connectHandler / connectFuture is set by implConnect
    private boolean connectPending;
    private CompletionHandler<Void,Object> connectHandler;
    private Object connectAttachment;
    private PendingFuture<Void,Object> connectFuture;

    // pending remote address (stateLock)
    // recorded by implConnect and promoted to remoteAddress by setConnected
    private SocketAddress pendingRemote;

    // pending read (updateLock)
    // readBuffer is used for plain reads, readBuffers for scattering reads
    private boolean readPending;
    private boolean isScatteringRead;
    private ByteBuffer readBuffer;
    private ByteBuffer[] readBuffers;
    private CompletionHandler<Number,Object> readHandler;
    private Object readAttachment;
    private PendingFuture<Number,Object> readFuture;
    private Future<?> readTimer;

    // pending write (updateLock)
    // writeBuffer is used for plain writes, writeBuffers for gathering writes
    private boolean writePending;
    private boolean isGatheringWrite;
    private ByteBuffer writeBuffer;
    private ByteBuffer[] writeBuffers;
    private CompletionHandler<Number,Object> writeHandler;
    private Object writeAttachment;
    private PendingFuture<Number,Object> writeFuture;
    private Future<?> writeTimer;

    // Android-changed: Add CloseGuard support.
    // opened at the end of each constructor, closed in implClose and
    // checked in finalize
    private final CloseGuard guard = CloseGuard.get();
    105 
    /**
     * Creates a channel associated with the given port.
     *
     * The inherited file descriptor is switched to non-blocking mode and then
     * registered with the port under its integer value. If the switch to
     * non-blocking mode fails, the descriptor is closed before the exception
     * is rethrown.
     *
     * @param port the port this channel is registered with
     * @throws IOException if thrown by the superclass constructor or when the
     *         descriptor cannot be configured non-blocking
     */
    UnixAsynchronousSocketChannelImpl(Port port)
        throws IOException
    {
        super(port);

        // set non-blocking
        try {
            IOUtil.configureBlocking(fd, false);
        } catch (IOException x) {
            // the descriptor is not handed to anyone else yet, so release it here
            nd.close(fd);
            throw x;
        }

        // NOTE(review): the fields are assigned only after the descriptor is
        // usable; keep this order, finalize() -> close() may run on an object
        // whose constructor failed.
        this.port = port;
        this.fdVal = IOUtil.fdVal(fd);

        // add mapping from file descriptor to this channel
        port.register(fdVal, this);
        // Android-changed: Add CloseGuard support.
        guard.open("close");
    }
    127 
    /**
     * Constructor for sockets created by UnixAsynchronousServerSocketChannelImpl.
     *
     * Wraps an existing descriptor: it is configured non-blocking and
     * registered with the port. A ShutdownChannelGroupException raised by the
     * registration is converted into an IOException. The descriptor is not
     * closed here on failure.
     *
     * @param port   the port this channel is registered with
     * @param fd     the descriptor of the already established socket
     * @param remote the remote address passed on to the superclass
     * @throws IOException if the descriptor cannot be configured non-blocking
     *         or the port rejects the registration because it is shut down
     */
    UnixAsynchronousSocketChannelImpl(Port port,
                                      FileDescriptor fd,
                                      InetSocketAddress remote)
        throws IOException
    {
        super(port, fd, remote);

        this.fdVal = IOUtil.fdVal(fd);
        IOUtil.configureBlocking(fd, false);

        try {
            port.register(fdVal, this);
        } catch (ShutdownChannelGroupException x) {
            // ShutdownChannelGroupException thrown if we attempt to register a
            // new channel after the group is shutdown
            throw new IOException(x);
        }

        // assigned only once registration succeeded
        this.port = port;
        // Android-changed: Add CloseGuard support.
        guard.open("close");
    }
    150 
    151     @Override
    152     public AsynchronousChannelGroupImpl group() {
    153         return port;
    154     }
    155 
    156     // register events for outstanding I/O operations, caller already owns updateLock
    157     private void updateEvents() {
    158         assert Thread.holdsLock(updateLock);
    159         int events = 0;
    160         if (readPending)
    161             events |= Net.POLLIN;
    162         if (connectPending || writePending)
    163             events |= Net.POLLOUT;
    164         if (events != 0)
    165             port.startPoll(fdVal, events);
    166     }
    167 
    168     // register events for outstanding I/O operations
    169     private void lockAndUpdateEvents() {
    170         synchronized (updateLock) {
    171             updateEvents();
    172         }
    173     }
    174 
    175     // invoke to finish read and/or write operations
    176     private void finish(boolean mayInvokeDirect,
    177                         boolean readable,
    178                         boolean writable)
    179     {
    180         boolean finishRead = false;
    181         boolean finishWrite = false;
    182         boolean finishConnect = false;
    183 
    184         // map event to pending result
    185         synchronized (updateLock) {
    186             if (readable && this.readPending) {
    187                 this.readPending = false;
    188                 finishRead = true;
    189             }
    190             if (writable) {
    191                 if (this.writePending) {
    192                     this.writePending = false;
    193                     finishWrite = true;
    194                 } else if (this.connectPending) {
    195                     this.connectPending = false;
    196                     finishConnect = true;
    197                 }
    198             }
    199         }
    200 
    201         // complete the I/O operation. Special case for when channel is
    202         // ready for both reading and writing. In that case, submit task to
    203         // complete write if write operation has a completion handler.
    204         if (finishRead) {
    205             if (finishWrite)
    206                 finishWrite(false);
    207             finishRead(mayInvokeDirect);
    208             return;
    209         }
    210         if (finishWrite) {
    211             finishWrite(mayInvokeDirect);
    212         }
    213         if (finishConnect) {
    214             finishConnect(mayInvokeDirect);
    215         }
    216     }
    217 
    218     /**
    219      * Invoked by event handler thread when file descriptor is polled
    220      */
    221     @Override
    222     public void onEvent(int events, boolean mayInvokeDirect) {
    223         boolean readable = (events & Net.POLLIN) > 0;
    224         boolean writable = (events & Net.POLLOUT) > 0;
    225         if ((events & (Net.POLLERR | Net.POLLHUP)) > 0) {
    226             readable = true;
    227             writable = true;
    228         }
    229         finish(mayInvokeDirect, readable, writable);
    230     }
    231 
    /**
     * Releases the resources of this channel: marks the close guard as
     * closed, removes the descriptor-to-channel mapping from the port, closes
     * the descriptor and finally completes any pending operations.
     *
     * @throws IOException if closing the descriptor fails
     */
    @Override
    void implClose() throws IOException {
        // Android-changed: Add CloseGuard support.
        guard.close();
        // remove the mapping
        port.unregister(fdVal);

        // close file descriptor
        nd.close(fd);

        // All outstanding I/O operations are required to fail
        // (handlers are not invoked directly; both directions are reported ready)
        finish(false, true, true);
    }
    245 
    246     protected void finalize() throws Throwable {
    247         try {
    248             if (guard != null) {
    249                 guard.warnIfOpen();
    250             }
    251             close();
    252         } finally {
    253             super.finalize();
    254         }
    255     }
    256 
    257     @Override
    258     public void onCancel(PendingFuture<?,?> task) {
    259         if (task.getContext() == OpType.CONNECT)
    260             killConnect();
    261         if (task.getContext() == OpType.READ)
    262             killReading();
    263         if (task.getContext() == OpType.WRITE)
    264             killWriting();
    265     }
    266 
    267     // -- connect --
    268 
    /**
     * Marks the channel as connected while holding stateLock: the state
     * becomes ST_CONNECTED, the local address is read from the descriptor and
     * the pending remote address becomes the remote address.
     *
     * @throws IOException if the local address cannot be obtained; note that
     *         the state has already been changed at that point
     */
    private void setConnected() throws IOException {
        synchronized (stateLock) {
            state = ST_CONNECTED;
            localAddress = Net.localAddress(fd);
            // pendingRemote was stored by implConnect
            remoteAddress = (InetSocketAddress)pendingRemote;
        }
    }
    276 
    277     private void finishConnect(boolean mayInvokeDirect) {
    278         Throwable e = null;
    279         try {
    280             begin();
    281             checkConnect(fdVal);
    282             setConnected();
    283         } catch (Throwable x) {
    284             if (x instanceof ClosedChannelException)
    285                 x = new AsynchronousCloseException();
    286             e = x;
    287         } finally {
    288             end();
    289         }
    290         if (e != null) {
    291             // close channel if connection cannot be established
    292             try {
    293                 close();
    294             } catch (Throwable suppressed) {
    295                 e.addSuppressed(suppressed);
    296             }
    297         }
    298 
    299         // invoke handler and set result
    300         CompletionHandler<Void,Object> handler = connectHandler;
    301         Object att = connectAttachment;
    302         PendingFuture<Void,Object> future = connectFuture;
    303         if (handler == null) {
    304             future.setResult(null, e);
    305         } else {
    306             if (mayInvokeDirect) {
    307                 Invoker.invokeUnchecked(handler, att, null, e);
    308             } else {
    309                 Invoker.invokeIndirectly(this, handler, att, null, e);
    310             }
    311         }
    312     }
    313 
    /**
     * Initiates a connection to the given remote address.
     *
     * If the connection is established (or fails) immediately, the outcome is
     * delivered right away; otherwise the operation is recorded as pending
     * and completed later by finishConnect once the descriptor is reported
     * ready. A failed connect closes the channel.
     *
     * @param remote     the address to connect to
     * @param attachment the object passed to the handler; may be null
     * @param handler    the completion handler, or null to get a Future
     * @return a Future for the connect when no handler was given, null otherwise
     * @throws AlreadyConnectedException  if the channel is already connected
     * @throws ConnectionPendingException if a connect is already in progress
     */
    @Override
    @SuppressWarnings("unchecked")
    <A> Future<Void> implConnect(SocketAddress remote,
                                 A attachment,
                                 CompletionHandler<Void,? super A> handler)
    {
        // a closed channel fails the operation without touching any state
        if (!isOpen()) {
            Throwable e = new ClosedChannelException();
            if (handler == null) {
                return CompletedFuture.withFailure(e);
            } else {
                Invoker.invoke(this, handler, attachment, null, e);
                return null;
            }
        }

        InetSocketAddress isa = Net.checkAddress(remote);

        // permission check
        SecurityManager sm = System.getSecurityManager();
        if (sm != null)
            sm.checkConnect(isa.getAddress().getHostAddress(), isa.getPort());

        // check and set state
        boolean notifyBeforeTcpConnect;
        synchronized (stateLock) {
            if (state == ST_CONNECTED)
                throw new AlreadyConnectedException();
            if (state == ST_PENDING)
                throw new ConnectionPendingException();
            state = ST_PENDING;
            pendingRemote = remote;
            // the hook is only notified when the socket has no local address yet
            notifyBeforeTcpConnect = (localAddress == null);
        }

        Throwable e = null;
        try {
            begin();
            // notify hook if unbound
            if (notifyBeforeTcpConnect)
                NetHooks.beforeTcpConnect(fd, isa.getAddress(), isa.getPort());
            int n = Net.connect(fd, isa.getAddress(), isa.getPort());
            if (n == IOStatus.UNAVAILABLE) {
                // connection could not be established immediately
                PendingFuture<Void,A> result = null;
                synchronized (updateLock) {
                    // store either the future or the handler/attachment pair
                    // for finishConnect
                    if (handler == null) {
                        result = new PendingFuture<Void,A>(this, OpType.CONNECT);
                        this.connectFuture = (PendingFuture<Void,Object>)result;
                    } else {
                        this.connectHandler = (CompletionHandler<Void,Object>)handler;
                        this.connectAttachment = attachment;
                    }
                    this.connectPending = true;
                    updateEvents();
                }
                // null when a handler was supplied
                return result;
            }
            setConnected();
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            e = x;
        } finally {
            end();
        }

        // close channel if connect fails
        if (e != null) {
            try {
                close();
            } catch (Throwable suppressed) {
                e.addSuppressed(suppressed);
            }
        }
        // connect completed immediately (successfully or not)
        if (handler == null) {
            return CompletedFuture.withResult(null, e);
        } else {
            Invoker.invoke(this, handler, attachment, null, e);
            return null;
        }
    }
    396 
    397     // -- read --
    398 
    /**
     * Completes a pending read after finish() has claimed it: performs the
     * actual read into the stored buffer(s), re-enables reading, cancels the
     * timeout timer and delivers the result to the future or the handler.
     *
     * If the read reports that no data is available, the operation is marked
     * pending again and nothing is delivered.
     *
     * @param mayInvokeDirect whether the handler may run on the calling thread
     */
    private void finishRead(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after reading is re-enabled.
        boolean scattering = isScatteringRead;
        CompletionHandler<Number,Object> handler = readHandler;
        Object att = readAttachment;
        PendingFuture<Number,Object> future = readFuture;
        Future<?> timeout = readTimer;

        try {
            begin();

            if (scattering) {
                n = (int)IOUtil.read(fd, readBuffers, nd);
            } else {
                n = IOUtil.read(fd, readBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                // re-arm the read; the finally block below restarts the poll
                synchronized (updateLock) {
                    readPending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.readBuffer = null;
            this.readBuffers = null;
            this.readAttachment = null;

            // allow another read to be initiated
            enableReading();

        } catch (Throwable x) {
            enableReading();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent write
            // (skipped when the channel was closed asynchronously)
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timeout != null)
            timeout.cancel(false);

        // create result
        // (null on failure; Long for a scattering read, Integer otherwise)
        Number result = (exc != null) ? null : (scattering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }
    465 
    466     private Runnable readTimeoutTask = new Runnable() {
    467         public void run() {
    468             CompletionHandler<Number,Object> handler = null;
    469             Object att = null;
    470             PendingFuture<Number,Object> future = null;
    471 
    472             synchronized (updateLock) {
    473                 if (!readPending)
    474                     return;
    475                 readPending = false;
    476                 handler = readHandler;
    477                 att = readAttachment;
    478                 future = readFuture;
    479             }
    480 
    481             // kill further reading before releasing waiters
    482             enableReading(true);
    483 
    484             // invoke handler or set result
    485             Exception exc = new InterruptedByTimeoutException();
    486             if (handler == null) {
    487                 future.setFailure(exc);
    488             } else {
    489                 AsynchronousChannel ch = UnixAsynchronousSocketChannelImpl.this;
    490                 Invoker.invokeIndirectly(ch, handler, att, null, exc);
    491             }
    492         }
    493     };
    494 
    /**
     * Initiates a read or scattering read operation.
     *
     * When allowed, the read is first attempted immediately. If no data is
     * available (or no attempt was made), the operation is recorded as
     * pending, an optional timeout task is scheduled and the descriptor is
     * polled for readability; finishRead completes it later.
     *
     * @param isScatteringRead true to read into dsts, false to read into dst
     * @param dst        destination buffer of a plain read
     * @param dsts       destination buffers of a scattering read
     * @param timeout    timeout for a pending read; no timer is scheduled
     *                   unless it is greater than zero
     * @param unit       unit of timeout
     * @param attachment the object passed to the handler; may be null
     * @param handler    the completion handler, or null to get a Future
     * @return a Future for the read when no handler was given, null otherwise
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implRead(boolean isScatteringRead,
                                            ByteBuffer dst,
                                            ByteBuffer[] dsts,
                                            long timeout,
                                            TimeUnit unit,
                                            A attachment,
                                            CompletionHandler<V,? super A> handler)
    {
        // A synchronous read is not attempted if disallowed by system property
        // or, we are using a fixed thread pool and the completion handler may
        // not be invoked directly (because the thread is not a pooled thread or
        // there are too many handlers on the stack).
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount = null;
        boolean invokeDirect = false;
        boolean attemptRead = false;
        if (!disableSynchronousRead) {
            if (handler == null) {
                attemptRead = true;
            } else {
                myGroupAndInvokeCount = Invoker.getGroupAndInvokeCount();
                invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
                // okay to attempt read with user thread pool
                attemptRead = invokeDirect || !port.isFixedThreadPool();
            }
        }

        // UNAVAILABLE doubles as "no attempt made" so both cases go pending
        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptRead) {
                if (isScatteringRead) {
                    n = (int)IOUtil.read(fd, dsts, nd);
                } else {
                    n = IOUtil.read(fd, dst, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                // publish the context of the pending read for finishRead and
                // the timeout task
                synchronized (updateLock) {
                    this.isScatteringRead = isScatteringRead;
                    this.readBuffer = dst;
                    this.readBuffers = dsts;
                    if (handler == null) {
                        this.readHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.READ);
                        this.readFuture = (PendingFuture<Number,Object>)result;
                        this.readAttachment = null;
                    } else {
                        this.readHandler = (CompletionHandler<Number,Object>)handler;
                        this.readAttachment = attachment;
                        this.readFuture = null;
                    }
                    if (timeout > 0L) {
                        this.readTimer = port.schedule(readTimeoutTask, timeout, unit);
                    }
                    this.readPending = true;
                    updateEvents();
                }
                // keeps the finally block from re-enabling reading
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // the read completed or failed immediately: allow another read
            if (!pending)
                enableReading();
            end();
        }

        // null on failure; Long for a scattering read, Integer otherwise
        Number result = (exc != null) ? null : (isScatteringRead) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // read completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }
    591 
    592     // -- write --
    593 
    /**
     * Completes a pending write after finish() has claimed it: performs the
     * actual write from the stored buffer(s), re-enables writing, cancels the
     * timeout timer and delivers the result to the future or the handler.
     *
     * If the write reports that the socket is not ready, the operation is
     * marked pending again and nothing is delivered.
     *
     * @param mayInvokeDirect whether the handler may run on the calling thread
     */
    private void finishWrite(boolean mayInvokeDirect) {
        int n = -1;
        Throwable exc = null;

        // copy fields as we can't access them after writing is re-enabled.
        boolean gathering = this.isGatheringWrite;
        CompletionHandler<Number,Object> handler = this.writeHandler;
        Object att = this.writeAttachment;
        PendingFuture<Number,Object> future = this.writeFuture;
        Future<?> timer = this.writeTimer;

        try {
            begin();

            if (gathering) {
                n = (int)IOUtil.write(fd, writeBuffers, nd);
            } else {
                n = IOUtil.write(fd, writeBuffer, -1, nd);
            }
            if (n == IOStatus.UNAVAILABLE) {
                // spurious wakeup, is this possible?
                // re-arm the write; the finally block below restarts the poll
                synchronized (updateLock) {
                    writePending = true;
                }
                return;
            }

            // allow objects to be GC'ed.
            this.writeBuffer = null;
            this.writeBuffers = null;
            this.writeAttachment = null;

            // allow another write to be initiated
            enableWriting();

        } catch (Throwable x) {
            enableWriting();
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // restart poll in case of concurrent read
            // (skipped when the channel was closed asynchronously)
            if (!(exc instanceof AsynchronousCloseException))
                lockAndUpdateEvents();
            end();
        }

        // cancel the associated timer
        if (timer != null)
            timer.cancel(false);

        // create result
        // (null on failure; Long for a gathering write, Integer otherwise)
        Number result = (exc != null) ? null : (gathering) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // invoke handler or set result
        if (handler == null) {
            future.setResult(result, exc);
        } else {
            if (mayInvokeDirect) {
                Invoker.invokeUnchecked(handler, att, result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, att, result, exc);
            }
        }
    }
    660 
    661     private Runnable writeTimeoutTask = new Runnable() {
    662         public void run() {
    663             CompletionHandler<Number,Object> handler = null;
    664             Object att = null;
    665             PendingFuture<Number,Object> future = null;
    666 
    667             synchronized (updateLock) {
    668                 if (!writePending)
    669                     return;
    670                 writePending = false;
    671                 handler = writeHandler;
    672                 att = writeAttachment;
    673                 future = writeFuture;
    674             }
    675 
    676             // kill further writing before releasing waiters
    677             enableWriting(true);
    678 
    679             // invoke handler or set result
    680             Exception exc = new InterruptedByTimeoutException();
    681             if (handler != null) {
    682                 Invoker.invokeIndirectly(UnixAsynchronousSocketChannelImpl.this,
    683                     handler, att, null, exc);
    684             } else {
    685                 future.setFailure(exc);
    686             }
    687         }
    688     };
    689 
    /**
     * Initiates a write or gathering write operation.
     *
     * When allowed, the write is first attempted immediately. If the socket
     * cannot accept data (or no attempt was made), the operation is recorded
     * as pending, an optional timeout task is scheduled and the descriptor is
     * polled for writability; finishWrite completes it later.
     *
     * @param isGatheringWrite true to write from srcs, false to write from src
     * @param src        source buffer of a plain write
     * @param srcs       source buffers of a gathering write
     * @param timeout    timeout for a pending write; no timer is scheduled
     *                   unless it is greater than zero
     * @param unit       unit of timeout
     * @param attachment the object passed to the handler; may be null
     * @param handler    the completion handler, or null to get a Future
     * @return a Future for the write when no handler was given, null otherwise
     */
    @Override
    @SuppressWarnings("unchecked")
    <V extends Number,A> Future<V> implWrite(boolean isGatheringWrite,
                                             ByteBuffer src,
                                             ByteBuffer[] srcs,
                                             long timeout,
                                             TimeUnit unit,
                                             A attachment,
                                             CompletionHandler<V,? super A> handler)
    {
        // an immediate write is attempted when there is no handler, when the
        // handler may be invoked directly, or when the pool is not fixed
        Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
            Invoker.getGroupAndInvokeCount();
        boolean invokeDirect = Invoker.mayInvokeDirect(myGroupAndInvokeCount, port);
        boolean attemptWrite = (handler == null) || invokeDirect ||
            !port.isFixedThreadPool();  // okay to attempt write with user thread pool

        // UNAVAILABLE doubles as "no attempt made" so both cases go pending
        int n = IOStatus.UNAVAILABLE;
        Throwable exc = null;
        boolean pending = false;

        try {
            begin();

            if (attemptWrite) {
                if (isGatheringWrite) {
                    n = (int)IOUtil.write(fd, srcs, nd);
                } else {
                    n = IOUtil.write(fd, src, -1, nd);
                }
            }

            if (n == IOStatus.UNAVAILABLE) {
                PendingFuture<V,A> result = null;
                // publish the context of the pending write for finishWrite
                // and the timeout task
                synchronized (updateLock) {
                    this.isGatheringWrite = isGatheringWrite;
                    this.writeBuffer = src;
                    this.writeBuffers = srcs;
                    if (handler == null) {
                        this.writeHandler = null;
                        result = new PendingFuture<V,A>(this, OpType.WRITE);
                        this.writeFuture = (PendingFuture<Number,Object>)result;
                        this.writeAttachment = null;
                    } else {
                        this.writeHandler = (CompletionHandler<Number,Object>)handler;
                        this.writeAttachment = attachment;
                        this.writeFuture = null;
                    }
                    if (timeout > 0L) {
                        this.writeTimer = port.schedule(writeTimeoutTask, timeout, unit);
                    }
                    this.writePending = true;
                    updateEvents();
                }
                // keeps the finally block from re-enabling writing
                pending = true;
                return result;
            }
        } catch (Throwable x) {
            if (x instanceof ClosedChannelException)
                x = new AsynchronousCloseException();
            exc = x;
        } finally {
            // the write completed or failed immediately: allow another write
            if (!pending)
                enableWriting();
            end();
        }

        // null on failure; Long for a gathering write, Integer otherwise
        Number result = (exc != null) ? null : (isGatheringWrite) ?
            (Number)Long.valueOf(n) : (Number)Integer.valueOf(n);

        // write completed immediately
        if (handler != null) {
            if (invokeDirect) {
                Invoker.invokeDirect(myGroupAndInvokeCount, handler, attachment, (V)result, exc);
            } else {
                Invoker.invokeIndirectly(this, handler, attachment, (V)result, exc);
            }
            return null;
        } else {
            return CompletedFuture.withResult((V)result, exc);
        }
    }
    774 
    775     // -- Native methods --
    776 
    /**
     * Native check used by finishConnect before the channel is marked
     * connected.
     * NOTE(review): presumably reports the outcome of the non-blocking
     * connect on the descriptor by throwing on failure - confirm against the
     * native implementation.
     *
     * @param fdVal integer value of the socket's file descriptor
     * @throws IOException declared by the native method
     */
    private static native void checkConnect(int fdVal) throws IOException;
    778 }
    779