Home | History | Annotate | Download | only in ch
      1 /*
      2  * Copyright (c) 2008, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 
     26 package sun.nio.ch;
     27 
     28 import java.nio.channels.spi.AsynchronousChannelProvider;
     29 import java.io.IOException;
     30 import java.util.concurrent.ArrayBlockingQueue;
     31 import java.util.concurrent.RejectedExecutionException;
     32 import java.util.concurrent.atomic.AtomicInteger;
     33 import static sun.nio.ch.EPoll.*;
     34 
     35 /**
     36  * AsynchronousChannelGroup implementation based on the Linux epoll facility.
     37  */
     38 
     39 final class EPollPort
     40     extends Port
     41 {
     42     // maximum number of events to poll at a time
     43     private static final int MAX_EPOLL_EVENTS = 512;
     44 
     45     // errors
     46     private static final int ENOENT     = 2;
     47 
     48     // epoll file descriptor
     49     private final int epfd;
     50 
     51     // true if epoll closed
     52     private boolean closed;
     53 
     54     // socket pair used for wakeup
     55     private final int sp[];
     56 
     57     // number of wakeups pending
     58     private final AtomicInteger wakeupCount = new AtomicInteger();
     59 
     60     // address of the poll array passed to epoll_wait
     61     private final long address;
     62 
     63     // encapsulates an event for a channel
     64     static class Event {
     65         final PollableChannel channel;
     66         final int events;
     67 
     68         Event(PollableChannel channel, int events) {
     69             this.channel = channel;
     70             this.events = events;
     71         }
     72 
     73         PollableChannel channel()   { return channel; }
     74         int events()                { return events; }
     75     }
     76 
     77     // queue of events for cases that a polling thread dequeues more than one
     78     // event
     79     private final ArrayBlockingQueue<Event> queue;
     80     private final Event NEED_TO_POLL = new Event(null, 0);
     81     private final Event EXECUTE_TASK_OR_SHUTDOWN = new Event(null, 0);
     82 
     83     EPollPort(AsynchronousChannelProvider provider, ThreadPool pool)
     84         throws IOException
     85     {
     86         super(provider, pool);
     87 
     88         // open epoll
     89         this.epfd = epollCreate();
     90 
     91         // create socket pair for wakeup mechanism
     92         int[] sv = new int[2];
     93         try {
     94             socketpair(sv);
     95             // register one end with epoll
     96             epollCtl(epfd, EPOLL_CTL_ADD, sv[0], Net.POLLIN);
     97         } catch (IOException x) {
     98             close0(epfd);
     99             throw x;
    100         }
    101         this.sp = sv;
    102 
    103         // allocate the poll array
    104         this.address = allocatePollArray(MAX_EPOLL_EVENTS);
    105 
    106         // create the queue and offer the special event to ensure that the first
    107         // threads polls
    108         this.queue = new ArrayBlockingQueue<Event>(MAX_EPOLL_EVENTS);
    109         this.queue.offer(NEED_TO_POLL);
    110     }
    111 
    112     EPollPort start() {
    113         startThreads(new EventHandlerTask());
    114         return this;
    115     }
    116 
    117     /**
    118      * Release all resources
    119      */
    120     private void implClose() {
    121         synchronized (this) {
    122             if (closed)
    123                 return;
    124             closed = true;
    125         }
    126         freePollArray(address);
    127         close0(sp[0]);
    128         close0(sp[1]);
    129         close0(epfd);
    130     }
    131 
    132     private void wakeup() {
    133         if (wakeupCount.incrementAndGet() == 1) {
    134             // write byte to socketpair to force wakeup
    135             try {
    136                 interrupt(sp[1]);
    137             } catch (IOException x) {
    138                 throw new AssertionError(x);
    139             }
    140         }
    141     }
    142 
    143     @Override
    144     void executeOnHandlerTask(Runnable task) {
    145         synchronized (this) {
    146             if (closed)
    147                 throw new RejectedExecutionException();
    148             offerTask(task);
    149             wakeup();
    150         }
    151     }
    152 
    153     @Override
    154     void shutdownHandlerTasks() {
    155         /*
    156          * If no tasks are running then just release resources; otherwise
    157          * write to the one end of the socketpair to wakeup any polling threads.
    158          */
    159         int nThreads = threadCount();
    160         if (nThreads == 0) {
    161             implClose();
    162         } else {
    163             // send interrupt to each thread
    164             while (nThreads-- > 0) {
    165                 wakeup();
    166             }
    167         }
    168     }
    169 
    170     // invoke by clients to register a file descriptor
    171     @Override
    172     void startPoll(int fd, int events) {
    173         // update events (or add to epoll on first usage)
    174         int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
    175         if (err == ENOENT)
    176             err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
    177         if (err != 0)
    178             throw new AssertionError();     // should not happen
    179     }
    180 
    181     /*
    182      * Task to process events from epoll and dispatch to the channel's
    183      * onEvent handler.
    184      *
    185      * Events are retreived from epoll in batch and offered to a BlockingQueue
    186      * where they are consumed by handler threads. A special "NEED_TO_POLL"
    187      * event is used to signal one consumer to re-poll when all events have
    188      * been consumed.
    189      */
    190     private class EventHandlerTask implements Runnable {
    191         private Event poll() throws IOException {
    192             try {
    193                 for (;;) {
    194                     int n = epollWait(epfd, address, MAX_EPOLL_EVENTS);
    195                     /*
    196                      * 'n' events have been read. Here we map them to their
    197                      * corresponding channel in batch and queue n-1 so that
    198                      * they can be handled by other handler threads. The last
    199                      * event is handled by this thread (and so is not queued).
    200                      */
    201                     fdToChannelLock.readLock().lock();
    202                     try {
    203                         while (n-- > 0) {
    204                             long eventAddress = getEvent(address, n);
    205                             int fd = getDescriptor(eventAddress);
    206 
    207                             // wakeup
    208                             if (fd == sp[0]) {
    209                                 if (wakeupCount.decrementAndGet() == 0) {
    210                                     // no more wakeups so drain pipe
    211                                     drain1(sp[0]);
    212                                 }
    213 
    214                                 // queue special event if there are more events
    215                                 // to handle.
    216                                 if (n > 0) {
    217                                     queue.offer(EXECUTE_TASK_OR_SHUTDOWN);
    218                                     continue;
    219                                 }
    220                                 return EXECUTE_TASK_OR_SHUTDOWN;
    221                             }
    222 
    223                             PollableChannel channel = fdToChannel.get(fd);
    224                             if (channel != null) {
    225                                 int events = getEvents(eventAddress);
    226                                 Event ev = new Event(channel, events);
    227 
    228                                 // n-1 events are queued; This thread handles
    229                                 // the last one except for the wakeup
    230                                 if (n > 0) {
    231                                     queue.offer(ev);
    232                                 } else {
    233                                     return ev;
    234                                 }
    235                             }
    236                         }
    237                     } finally {
    238                         fdToChannelLock.readLock().unlock();
    239                     }
    240                 }
    241             } finally {
    242                 // to ensure that some thread will poll when all events have
    243                 // been consumed
    244                 queue.offer(NEED_TO_POLL);
    245             }
    246         }
    247 
    248         public void run() {
    249             Invoker.GroupAndInvokeCount myGroupAndInvokeCount =
    250                 Invoker.getGroupAndInvokeCount();
    251             final boolean isPooledThread = (myGroupAndInvokeCount != null);
    252             boolean replaceMe = false;
    253             Event ev;
    254             try {
    255                 for (;;) {
    256                     // reset invoke count
    257                     if (isPooledThread)
    258                         myGroupAndInvokeCount.resetInvokeCount();
    259 
    260                     try {
    261                         replaceMe = false;
    262                         ev = queue.take();
    263 
    264                         // no events and this thread has been "selected" to
    265                         // poll for more.
    266                         if (ev == NEED_TO_POLL) {
    267                             try {
    268                                 ev = poll();
    269                             } catch (IOException x) {
    270                                 x.printStackTrace();
    271                                 return;
    272                             }
    273                         }
    274                     } catch (InterruptedException x) {
    275                         continue;
    276                     }
    277 
    278                     // handle wakeup to execute task or shutdown
    279                     if (ev == EXECUTE_TASK_OR_SHUTDOWN) {
    280                         Runnable task = pollTask();
    281                         if (task == null) {
    282                             // shutdown request
    283                             return;
    284                         }
    285                         // run task (may throw error/exception)
    286                         replaceMe = true;
    287                         task.run();
    288                         continue;
    289                     }
    290 
    291                     // process event
    292                     try {
    293                         ev.channel().onEvent(ev.events(), isPooledThread);
    294                     } catch (Error x) {
    295                         replaceMe = true; throw x;
    296                     } catch (RuntimeException x) {
    297                         replaceMe = true; throw x;
    298                     }
    299                 }
    300             } finally {
    301                 // last handler to exit when shutdown releases resources
    302                 int remaining = threadExit(this, replaceMe);
    303                 if (remaining == 0 && isShutdown()) {
    304                     implClose();
    305                 }
    306             }
    307         }
    308     }
    309 
    310     // -- Native methods --
    311 
    312     private static native void socketpair(int[] sv) throws IOException;
    313 
    314     private static native void interrupt(int fd) throws IOException;
    315 
    316     private static native void drain1(int fd) throws IOException;
    317 
    318     private static native void close0(int fd);
    319 
    320     // Android-removed: Code to load native libraries, doesn't make sense on Android.
    321     /*
    322     static {
    323         IOUtil.load();
    324     }
    325     */
    326 }
    327