Home | History | Annotate | Download | only in ch
      1 /*
      2  * Copyright (c) 2008, 2009, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 
     26 package sun.nio.ch;
     27 
     28 import java.nio.channels.*;
     29 import java.util.concurrent.*;
     30 import java.security.AccessController;
     31 import sun.security.action.GetIntegerAction;
     32 
     33 /**
     34  * Defines static methods to invoke a completion handler or arbitrary task.
     35  */
     36 
     37 class Invoker {
     38     private Invoker() { }
     39 
     40     // maximum number of completion handlers that may be invoked on the current
     41     // thread before it re-directs invocations to the thread pool. This helps
     42     // avoid stack overflow and lessens the risk of starvation.
     43     private static final int maxHandlerInvokeCount = AccessController.doPrivileged(
     44         new GetIntegerAction("sun.nio.ch.maxCompletionHandlersOnStack", 16));
     45 
     46     // Per-thread object with reference to channel group and a counter for
     47     // the number of completion handlers invoked. This should be reset to 0
     48     // when all completion handlers have completed.
     49     static class GroupAndInvokeCount {
     50         private final AsynchronousChannelGroupImpl group;
     51         private int handlerInvokeCount;
     52         GroupAndInvokeCount(AsynchronousChannelGroupImpl group) {
     53             this.group = group;
     54         }
     55         AsynchronousChannelGroupImpl group() {
     56             return group;
     57         }
     58         int invokeCount() {
     59             return handlerInvokeCount;
     60         }
     61         void setInvokeCount(int value) {
     62             handlerInvokeCount = value;
     63         }
     64         void resetInvokeCount() {
     65             handlerInvokeCount = 0;
     66         }
     67         void incrementInvokeCount() {
     68             handlerInvokeCount++;
     69         }
     70     }
     71     private static final ThreadLocal<GroupAndInvokeCount> myGroupAndInvokeCount =
     72         new ThreadLocal<GroupAndInvokeCount>() {
     73             @Override protected GroupAndInvokeCount initialValue() {
     74                 return null;
     75             }
     76         };
     77 
     78     /**
     79      * Binds this thread to the given group
     80      */
     81     static void bindToGroup(AsynchronousChannelGroupImpl group) {
     82         myGroupAndInvokeCount.set(new GroupAndInvokeCount(group));
     83     }
     84 
     85     /**
     86      * Returns the GroupAndInvokeCount object for this thread.
     87      */
     88     static GroupAndInvokeCount getGroupAndInvokeCount() {
     89         return myGroupAndInvokeCount.get();
     90     }
     91 
     92     /**
     93      * Returns true if the current thread is in a channel group's thread pool
     94      */
     95     static boolean isBoundToAnyGroup() {
     96         return myGroupAndInvokeCount.get() != null;
     97     }
     98 
     99     /**
    100      * Returns true if the current thread is in the given channel's thread
    101      * pool and we haven't exceeded the maximum number of handler frames on
    102      * the stack.
    103      */
    104     static boolean mayInvokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
    105                                    AsynchronousChannelGroupImpl group)
    106     {
    107         if ((myGroupAndInvokeCount != null) &&
    108             (myGroupAndInvokeCount.group() == group) &&
    109             (myGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
    110         {
    111             return true;
    112         }
    113         return false;
    114     }
    115 
    116     /**
    117      * Invoke handler without checking the thread identity or number of handlers
    118      * on the thread stack.
    119      */
    120     static <V,A> void invokeUnchecked(CompletionHandler<V,? super A> handler,
    121                                       A attachment,
    122                                       V value,
    123                                       Throwable exc)
    124     {
    125         if (exc == null) {
    126             handler.completed(value, attachment);
    127         } else {
    128             handler.failed(exc, attachment);
    129         }
    130 
    131         // clear interrupt
    132         Thread.interrupted();
    133 
    134         // clear thread locals when in default thread pool
    135         // Android-changed: System.getSecurityManager always returns null.
    136         // if (System.getSecurityManager() != null) {
    137         //    Thread me = Thread.currentThread();
    138         //     if (me instanceof sun.misc.InnocuousThread) {
    139         //        GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
    140         //        ((sun.misc.InnocuousThread)me).eraseThreadLocals();
    141         //        if (thisGroupAndInvokeCount != null) {
    142         //            myGroupAndInvokeCount.set(thisGroupAndInvokeCount);
    143         //        }
    144         //    }
    145         // }
    146     }
    147 
    148     /**
    149      * Invoke handler assuming thread identity already checked
    150      */
    151     static <V,A> void invokeDirect(GroupAndInvokeCount myGroupAndInvokeCount,
    152                                    CompletionHandler<V,? super A> handler,
    153                                    A attachment,
    154                                    V result,
    155                                    Throwable exc)
    156     {
    157         myGroupAndInvokeCount.incrementInvokeCount();
    158         Invoker.invokeUnchecked(handler, attachment, result, exc);
    159     }
    160 
    161     /**
    162      * Invokes the handler. If the current thread is in the channel group's
    163      * thread pool then the handler is invoked directly, otherwise it is
    164      * invoked indirectly.
    165      */
    166     static <V,A> void invoke(AsynchronousChannel channel,
    167                              CompletionHandler<V,? super A> handler,
    168                              A attachment,
    169                              V result,
    170                              Throwable exc)
    171     {
    172         boolean invokeDirect = false;
    173         boolean identityOkay = false;
    174         GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
    175         if (thisGroupAndInvokeCount != null) {
    176             if ((thisGroupAndInvokeCount.group() == ((Groupable)channel).group()))
    177                 identityOkay = true;
    178             if (identityOkay &&
    179                 (thisGroupAndInvokeCount.invokeCount() < maxHandlerInvokeCount))
    180             {
    181                 // group match
    182                 invokeDirect = true;
    183             }
    184         }
    185         if (invokeDirect) {
    186             invokeDirect(thisGroupAndInvokeCount, handler, attachment, result, exc);
    187         } else {
    188             try {
    189                 invokeIndirectly(channel, handler, attachment, result, exc);
    190             } catch (RejectedExecutionException ree) {
    191                 // channel group shutdown; fallback to invoking directly
    192                 // if the current thread has the right identity.
    193                 if (identityOkay) {
    194                     invokeDirect(thisGroupAndInvokeCount,
    195                                  handler, attachment, result, exc);
    196                 } else {
    197                     throw new ShutdownChannelGroupException();
    198                 }
    199             }
    200         }
    201     }
    202 
    203     /**
    204      * Invokes the handler indirectly via the channel group's thread pool.
    205      */
    206     static <V,A> void invokeIndirectly(AsynchronousChannel channel,
    207                                        final CompletionHandler<V,? super A> handler,
    208                                        final A attachment,
    209                                        final V result,
    210                                        final Throwable exc)
    211     {
    212         try {
    213             ((Groupable)channel).group().executeOnPooledThread(new Runnable() {
    214                 public void run() {
    215                     GroupAndInvokeCount thisGroupAndInvokeCount =
    216                         myGroupAndInvokeCount.get();
    217                     if (thisGroupAndInvokeCount != null)
    218                         thisGroupAndInvokeCount.setInvokeCount(1);
    219                     invokeUnchecked(handler, attachment, result, exc);
    220                 }
    221             });
    222         } catch (RejectedExecutionException ree) {
    223             throw new ShutdownChannelGroupException();
    224         }
    225     }
    226 
    227     /**
    228      * Invokes the handler "indirectly" in the given Executor
    229      */
    230     static <V,A> void invokeIndirectly(final CompletionHandler<V,? super A> handler,
    231                                        final A attachment,
    232                                        final V value,
    233                                        final Throwable exc,
    234                                        Executor executor)
    235     {
    236          try {
    237             executor.execute(new Runnable() {
    238                 public void run() {
    239                     invokeUnchecked(handler, attachment, value, exc);
    240                 }
    241             });
    242         } catch (RejectedExecutionException ree) {
    243             throw new ShutdownChannelGroupException();
    244         }
    245     }
    246 
    247     /**
    248      * Invokes the given task on the thread pool associated with the given
    249      * channel. If the current thread is in the thread pool then the task is
    250      * invoked directly.
    251      */
    252     static void invokeOnThreadInThreadPool(Groupable channel,
    253                                            Runnable task)
    254     {
    255         boolean invokeDirect;
    256         GroupAndInvokeCount thisGroupAndInvokeCount = myGroupAndInvokeCount.get();
    257         AsynchronousChannelGroupImpl targetGroup = channel.group();
    258         if (thisGroupAndInvokeCount == null) {
    259             invokeDirect = false;
    260         } else {
    261             invokeDirect = (thisGroupAndInvokeCount.group == targetGroup);
    262         }
    263         try {
    264             if (invokeDirect) {
    265                 task.run();
    266             } else {
    267                 targetGroup.executeOnPooledThread(task);
    268             }
    269         } catch (RejectedExecutionException ree) {
    270             throw new ShutdownChannelGroupException();
    271         }
    272     }
    273 
    274     /**
    275      * Invoke handler with completed result. This method does not check the
    276      * thread identity or the number of handlers on the thread stack.
    277      */
    278     static <V,A> void invokeUnchecked(PendingFuture<V,A> future) {
    279         assert future.isDone();
    280         CompletionHandler<V,? super A> handler = future.handler();
    281         if (handler != null) {
    282             invokeUnchecked(handler,
    283                             future.attachment(),
    284                             future.value(),
    285                             future.exception());
    286         }
    287     }
    288 
    289     /**
    290      * Invoke handler with completed result. If the current thread is in the
    291      * channel group's thread pool then the handler is invoked directly,
    292      * otherwise it is invoked indirectly.
    293      */
    294     static <V,A> void invoke(PendingFuture<V,A> future) {
    295         assert future.isDone();
    296         CompletionHandler<V,? super A> handler = future.handler();
    297         if (handler != null) {
    298             invoke(future.channel(),
    299                    handler,
    300                    future.attachment(),
    301                    future.value(),
    302                    future.exception());
    303         }
    304     }
    305 
    306     /**
    307      * Invoke handler with completed result. The handler is invoked indirectly,
    308      * via the channel group's thread pool.
    309      */
    310     static <V,A> void invokeIndirectly(PendingFuture<V,A> future) {
    311         assert future.isDone();
    312         CompletionHandler<V,? super A> handler = future.handler();
    313         if (handler != null) {
    314             invokeIndirectly(future.channel(),
    315                              handler,
    316                              future.attachment(),
    317                              future.value(),
    318                              future.exception());
    319         }
    320     }
    321 }
    322