Home | History | Annotate | Download | only in ch
      1 /*
      2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 
     26 package sun.nio.ch;
     27 
     28 import java.nio.channels.Channel;
     29 import java.nio.channels.AsynchronousChannelGroup;
     30 import java.nio.channels.spi.AsynchronousChannelProvider;
     31 import java.io.IOException;
     32 import java.io.FileDescriptor;
     33 import java.util.Queue;
     34 import java.util.concurrent.*;
     35 import java.util.concurrent.atomic.AtomicInteger;
     36 import java.util.concurrent.atomic.AtomicBoolean;
     37 import java.security.PrivilegedAction;
     38 import java.security.AccessController;
     39 import java.security.AccessControlContext;
     40 import sun.security.action.GetIntegerAction;
     41 
     42 /**
     43  * Base implementation of AsynchronousChannelGroup
     44  */
     45 
     46 abstract class AsynchronousChannelGroupImpl
     47     extends AsynchronousChannelGroup implements Executor
     48 {
     49     // number of internal threads handling I/O events when using an unbounded
     50     // thread pool. Internal threads do not dispatch to completion handlers.
     51     private static final int internalThreadCount = AccessController.doPrivileged(
     52         new GetIntegerAction("sun.nio.ch.internalThreadPoolSize", 1));
     53 
     54     // associated thread pool
     55     private final ThreadPool pool;
     56 
     57     // number of tasks running (including internal)
     58     private final AtomicInteger threadCount = new AtomicInteger();
     59 
     60     // associated Executor for timeouts
     61     private ScheduledThreadPoolExecutor timeoutExecutor;
     62 
     63     // task queue for when using a fixed thread pool. In that case, thread
     64     // waiting on I/O events must be awokon to poll tasks from this queue.
     65     private final Queue<Runnable> taskQueue;
     66 
     67     // group shutdown
     68     private final AtomicBoolean shutdown = new AtomicBoolean();
     69     private final Object shutdownNowLock = new Object();
     70     private volatile boolean terminateInitiated;
     71 
     72     AsynchronousChannelGroupImpl(AsynchronousChannelProvider provider,
     73                                  ThreadPool pool)
     74     {
     75         super(provider);
     76         this.pool = pool;
     77 
     78         if (pool.isFixedThreadPool()) {
     79             taskQueue = new ConcurrentLinkedQueue<Runnable>();
     80         } else {
     81             taskQueue = null;   // not used
     82         }
     83 
     84         // use default thread factory as thread should not be visible to
     85         // application (it doesn't execute completion handlers).
     86         this.timeoutExecutor = (ScheduledThreadPoolExecutor)
     87             Executors.newScheduledThreadPool(1, ThreadPool.defaultThreadFactory());
     88         this.timeoutExecutor.setRemoveOnCancelPolicy(true);
     89     }
     90 
     91     final ExecutorService executor() {
     92         return pool.executor();
     93     }
     94 
     95     final boolean isFixedThreadPool() {
     96         return pool.isFixedThreadPool();
     97     }
     98 
     99     final int fixedThreadCount() {
    100         if (isFixedThreadPool()) {
    101             return pool.poolSize();
    102         } else {
    103             return pool.poolSize() + internalThreadCount;
    104         }
    105     }
    106 
    107     private Runnable bindToGroup(final Runnable task) {
    108         final AsynchronousChannelGroupImpl thisGroup = this;
    109         return new Runnable() {
    110             public void run() {
    111                 Invoker.bindToGroup(thisGroup);
    112                 task.run();
    113             }
    114         };
    115     }
    116 
    117     private void startInternalThread(final Runnable task) {
    118         AccessController.doPrivileged(new PrivilegedAction<Void>() {
    119             @Override
    120             public Void run() {
    121                 // internal threads should not be visible to application so
    122                 // cannot use user-supplied thread factory
    123                 ThreadPool.defaultThreadFactory().newThread(task).start();
    124                 return null;
    125             }
    126          });
    127     }
    128 
    129     protected final void startThreads(Runnable task) {
    130         if (!isFixedThreadPool()) {
    131             for (int i=0; i<internalThreadCount; i++) {
    132                 startInternalThread(task);
    133                 threadCount.incrementAndGet();
    134             }
    135         }
    136         if (pool.poolSize() > 0) {
    137             task = bindToGroup(task);
    138             try {
    139                 for (int i=0; i<pool.poolSize(); i++) {
    140                     pool.executor().execute(task);
    141                     threadCount.incrementAndGet();
    142                 }
    143             } catch (RejectedExecutionException  x) {
    144                 // nothing we can do
    145             }
    146         }
    147     }
    148 
    149     final int threadCount() {
    150         return threadCount.get();
    151     }
    152 
    153     /**
    154      * Invoked by tasks as they terminate
    155      */
    156     final int threadExit(Runnable task, boolean replaceMe) {
    157         if (replaceMe) {
    158             try {
    159                 if (Invoker.isBoundToAnyGroup()) {
    160                     // submit new task to replace this thread
    161                     pool.executor().execute(bindToGroup(task));
    162                 } else {
    163                     // replace internal thread
    164                     startInternalThread(task);
    165                 }
    166                 return threadCount.get();
    167             } catch (RejectedExecutionException x) {
    168                 // unable to replace
    169             }
    170         }
    171         return threadCount.decrementAndGet();
    172     }
    173 
    174     /**
    175      * Wakes up a thread waiting for I/O events to execute the given task.
    176      */
    177     abstract void executeOnHandlerTask(Runnable task);
    178 
    179     /**
    180      * For a fixed thread pool the task is queued to a thread waiting on I/O
    181      * events. For other thread pools we simply submit the task to the thread
    182      * pool.
    183      */
    184     final void executeOnPooledThread(Runnable task) {
    185         if (isFixedThreadPool()) {
    186             executeOnHandlerTask(task);
    187         } else {
    188             pool.executor().execute(bindToGroup(task));
    189         }
    190     }
    191 
    192     final void offerTask(Runnable task) {
    193         taskQueue.offer(task);
    194     }
    195 
    196     final Runnable pollTask() {
    197         return (taskQueue == null) ? null : taskQueue.poll();
    198     }
    199 
    200     final Future<?> schedule(Runnable task, long timeout, TimeUnit unit) {
    201         try {
    202             return timeoutExecutor.schedule(task, timeout, unit);
    203         } catch (RejectedExecutionException rej) {
    204             if (terminateInitiated) {
    205                 // no timeout scheduled as group is terminating
    206                 return null;
    207             }
    208             throw new AssertionError(rej);
    209         }
    210     }
    211 
    212     @Override
    213     public final boolean isShutdown() {
    214         return shutdown.get();
    215     }
    216 
    217     @Override
    218     public final boolean isTerminated()  {
    219         return pool.executor().isTerminated();
    220     }
    221 
    222     /**
    223      * Returns true if there are no channels in the group
    224      */
    225     abstract boolean isEmpty();
    226 
    227     /**
    228      * Attaches a foreign channel to this group.
    229      */
    230     abstract Object attachForeignChannel(Channel channel, FileDescriptor fdo)
    231         throws IOException;
    232 
    233     /**
    234      * Detaches a foreign channel from this group.
    235      */
    236     abstract void detachForeignChannel(Object key);
    237 
    238     /**
    239      * Closes all channels in the group
    240      */
    241     abstract void closeAllChannels() throws IOException;
    242 
    243     /**
    244      * Shutdown all tasks waiting for I/O events.
    245      */
    246     abstract void shutdownHandlerTasks();
    247 
    248     private void shutdownExecutors() {
    249         AccessController.doPrivileged(new PrivilegedAction<Void>() {
    250             public Void run() {
    251                 pool.executor().shutdown();
    252                 timeoutExecutor.shutdown();
    253                 return null;
    254             }
    255         });
    256     }
    257 
    258     @Override
    259     public final void shutdown() {
    260         if (shutdown.getAndSet(true)) {
    261             // already shutdown
    262             return;
    263         }
    264         // if there are channels in the group then shutdown will continue
    265         // when the last channel is closed
    266         if (!isEmpty()) {
    267             return;
    268         }
    269         // initiate termination (acquire shutdownNowLock to ensure that other
    270         // threads invoking shutdownNow will block).
    271         synchronized (shutdownNowLock) {
    272             if (!terminateInitiated) {
    273                 terminateInitiated = true;
    274                 shutdownHandlerTasks();
    275                 shutdownExecutors();
    276             }
    277         }
    278     }
    279 
    280     @Override
    281     public final void shutdownNow() throws IOException {
    282         shutdown.set(true);
    283         synchronized (shutdownNowLock) {
    284             if (!terminateInitiated) {
    285                 terminateInitiated = true;
    286                 closeAllChannels();
    287                 shutdownHandlerTasks();
    288                 shutdownExecutors();
    289             }
    290         }
    291     }
    292 
    293     /**
    294      * For use by AsynchronousFileChannel to release resources without shutting
    295      * down the thread pool.
    296      */
    297     final void detachFromThreadPool() {
    298         if (shutdown.getAndSet(true))
    299             throw new AssertionError("Already shutdown");
    300         if (!isEmpty())
    301             throw new AssertionError("Group not empty");
    302         shutdownHandlerTasks();
    303     }
    304 
    305     @Override
    306     public final boolean awaitTermination(long timeout, TimeUnit unit)
    307         throws InterruptedException
    308     {
    309         return pool.executor().awaitTermination(timeout, unit);
    310     }
    311 
    312     /**
    313      * Executes the given command on one of the channel group's pooled threads.
    314      */
    315     @Override
    316     public final void execute(Runnable task) {
    317         SecurityManager sm = System.getSecurityManager();
    318         if (sm != null) {
    319             // when a security manager is installed then the user's task
    320             // must be run with the current calling context
    321             final AccessControlContext acc = AccessController.getContext();
    322             final Runnable delegate = task;
    323             task = new Runnable() {
    324                 @Override
    325                 public void run() {
    326                     AccessController.doPrivileged(new PrivilegedAction<Void>() {
    327                         @Override
    328                         public Void run() {
    329                             delegate.run();
    330                             return null;
    331                         }
    332                     }, acc);
    333                 }
    334             };
    335         }
    336         executeOnPooledThread(task);
    337     }
    338 }
    339