Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Spliterator;
     28 import java.util.concurrent.CountedCompleter;
     29 import java.util.concurrent.ForkJoinPool;
     30 
     31 /**
     32  * Abstract base class for most fork-join tasks used to implement stream ops.
     33  * Manages splitting logic, tracking of child tasks, and intermediate results.
     34  * Each task is associated with a {@link Spliterator} that describes the portion
     35  * of the input associated with the subtree rooted at this task.
     36  * Tasks may be leaf nodes (which will traverse the elements of
     37  * the {@code Spliterator}) or internal nodes (which split the
     38  * {@code Spliterator} into multiple child tasks).
     39  *
     40  * @implNote
     41  * <p>This class is based on {@link CountedCompleter}, a form of fork-join task
     42  * where each task has a semaphore-like count of uncompleted children, and the
     43  * task is implicitly completed and notified when its last child completes.
     44  * Internal node tasks will likely override the {@code onCompletion} method from
     45  * {@code CountedCompleter} to merge the results from child tasks into the
     46  * current task's result.
     47  *
     48  * <p>Splitting and setting up the child task links is done by {@code compute()}
     49  * for internal nodes.  At {@code compute()} time for leaf nodes, it is
     50  * guaranteed that the parent's child-related fields (including sibling links
     51  * for the parent's children) will be set up for all children.
     52  *
     53  * <p>For example, a task that performs a reduce would override {@code doLeaf()}
     54  * to perform a reduction on that leaf node's chunk using the
     55  * {@code Spliterator}, and override {@code onCompletion()} to merge the results
     56  * of the child tasks for internal nodes:
     57  *
     58  * <pre>{@code
     59  *     protected S doLeaf() {
     60  *         spliterator.forEach(...);
     61  *         return localReductionResult;
     62  *     }
     63  *
     64  *     public void onCompletion(CountedCompleter caller) {
     65  *         if (!isLeaf()) {
     66  *             ReduceTask<P_IN, P_OUT, T, R> child = children;
     67  *             R result = child.getLocalResult();
     68  *             child = child.nextSibling;
     69  *             for (; child != null; child = child.nextSibling)
     70  *                 result = combine(result, child.getLocalResult());
     71  *             setLocalResult(result);
     72  *         }
     73  *     }
     74  * }</pre>
     75  *
     76  * <p>Serialization is not supported as there is no intention to serialize
     77  * tasks managed by stream ops.
     78  *
     79  * @param  Type of elements input to the pipeline
     80  * @param  Type of elements output from the pipeline
     81  * @param <R> Type of intermediate result, which may be different from operation
     82  *        result type
     83  * @param <K> Type of parent, child and sibling tasks
     84  * @since 1.8
     85  */
     86 @SuppressWarnings("serial")
     87 abstract class AbstractTask<P_IN, P_OUT, R,
     88                             K extends AbstractTask<P_IN, P_OUT, R, K>>
     89         extends CountedCompleter<R> {
     90 
     91     /**
     92      * Default target factor of leaf tasks for parallel decomposition.
     93      * To allow load balancing, we over-partition, currently to approximately
     94      * four tasks per processor, which enables others to help out
     95      * if leaf tasks are uneven or some processors are otherwise busy.
     96      */
     97     static final int LEAF_TARGET = ForkJoinPool.getCommonPoolParallelism() << 2;
     98 
     99     /** The pipeline helper, common to all tasks in a computation */
    100     protected final PipelineHelper<P_OUT> helper;
    101 
    102     /**
    103      * The spliterator for the portion of the input associated with the subtree
    104      * rooted at this task
    105      */
    106     protected Spliterator<P_IN> spliterator;
    107 
    108     /** Target leaf size, common to all tasks in a computation */
    109     protected long targetSize; // may be laziliy initialized
    110 
    111     /**
    112      * The left child.
    113      * null if no children
    114      * if non-null rightChild is non-null
    115      */
    116     protected K leftChild;
    117 
    118     /**
    119      * The right child.
    120      * null if no children
    121      * if non-null leftChild is non-null
    122      */
    123     protected K rightChild;
    124 
    125     /** The result of this node, if completed */
    126     private R localResult;
    127 
    128     /**
    129      * Constructor for root nodes.
    130      *
    131      * @param helper The {@code PipelineHelper} describing the stream pipeline
    132      *               up to this operation
    133      * @param spliterator The {@code Spliterator} describing the source for this
    134      *                    pipeline
    135      */
    136     protected AbstractTask(PipelineHelper<P_OUT> helper,
    137                            Spliterator<P_IN> spliterator) {
    138         super(null);
    139         this.helper = helper;
    140         this.spliterator = spliterator;
    141         this.targetSize = 0L;
    142     }
    143 
    144     /**
    145      * Constructor for non-root nodes.
    146      *
    147      * @param parent this node's parent task
    148      * @param spliterator {@code Spliterator} describing the subtree rooted at
    149      *        this node, obtained by splitting the parent {@code Spliterator}
    150      */
    151     protected AbstractTask(K parent,
    152                            Spliterator<P_IN> spliterator) {
    153         super(parent);
    154         this.spliterator = spliterator;
    155         this.helper = parent.helper;
    156         this.targetSize = parent.targetSize;
    157     }
    158 
    159     /**
    160      * Constructs a new node of type T whose parent is the receiver; must call
    161      * the AbstractTask(T, Spliterator) constructor with the receiver and the
    162      * provided Spliterator.
    163      *
    164      * @param spliterator {@code Spliterator} describing the subtree rooted at
    165      *        this node, obtained by splitting the parent {@code Spliterator}
    166      * @return newly constructed child node
    167      */
    168     protected abstract K makeChild(Spliterator<P_IN> spliterator);
    169 
    170     /**
    171      * Computes the result associated with a leaf node.  Will be called by
    172      * {@code compute()} and the result passed to @{code setLocalResult()}
    173      *
    174      * @return the computed result of a leaf node
    175      */
    176     protected abstract R doLeaf();
    177 
    178     /**
    179      * Returns a suggested target leaf size based on the initial size estimate.
    180      *
    181      * @return suggested target leaf size
    182      */
    183     public static long suggestTargetSize(long sizeEstimate) {
    184         long est = sizeEstimate / LEAF_TARGET;
    185         return est > 0L ? est : 1L;
    186     }
    187 
    188     /**
    189      * Returns the targetSize, initializing it via the supplied
    190      * size estimate if not already initialized.
    191      */
    192     protected final long getTargetSize(long sizeEstimate) {
    193         long s;
    194         return ((s = targetSize) != 0 ? s :
    195                 (targetSize = suggestTargetSize(sizeEstimate)));
    196     }
    197 
    198     /**
    199      * Returns the local result, if any. Subclasses should use
    200      * {@link #setLocalResult(Object)} and {@link #getLocalResult()} to manage
    201      * results.  This returns the local result so that calls from within the
    202      * fork-join framework will return the correct result.
    203      *
    204      * @return local result for this node previously stored with
    205      * {@link #setLocalResult}
    206      */
    207     @Override
    208     public R getRawResult() {
    209         return localResult;
    210     }
    211 
    212     /**
    213      * Does nothing; instead, subclasses should use
    214      * {@link #setLocalResult(Object)}} to manage results.
    215      *
    216      * @param result must be null, or an exception is thrown (this is a safety
    217      *        tripwire to detect when {@code setRawResult()} is being used
    218      *        instead of {@code setLocalResult()}
    219      */
    220     @Override
    221     protected void setRawResult(R result) {
    222         if (result != null)
    223             throw new IllegalStateException();
    224     }
    225 
    226     /**
    227      * Retrieves a result previously stored with {@link #setLocalResult}
    228      *
    229      * @return local result for this node previously stored with
    230      * {@link #setLocalResult}
    231      */
    232     protected R getLocalResult() {
    233         return localResult;
    234     }
    235 
    236     /**
    237      * Associates the result with the task, can be retrieved with
    238      * {@link #getLocalResult}
    239      *
    240      * @param localResult local result for this node
    241      */
    242     protected void setLocalResult(R localResult) {
    243         this.localResult = localResult;
    244     }
    245 
    246     /**
    247      * Indicates whether this task is a leaf node.  (Only valid after
    248      * {@link #compute} has been called on this node).  If the node is not a
    249      * leaf node, then children will be non-null and numChildren will be
    250      * positive.
    251      *
    252      * @return {@code true} if this task is a leaf node
    253      */
    254     protected boolean isLeaf() {
    255         return leftChild == null;
    256     }
    257 
    258     /**
    259      * Indicates whether this task is the root node
    260      *
    261      * @return {@code true} if this task is the root node.
    262      */
    263     protected boolean isRoot() {
    264         return getParent() == null;
    265     }
    266 
    267     /**
    268      * Returns the parent of this task, or null if this task is the root
    269      *
    270      * @return the parent of this task, or null if this task is the root
    271      */
    272     @SuppressWarnings("unchecked")
    273     protected K getParent() {
    274         return (K) getCompleter();
    275     }
    276 
    277     /**
    278      * Decides whether or not to split a task further or compute it
    279      * directly. If computing directly, calls {@code doLeaf} and pass
    280      * the result to {@code setRawResult}. Otherwise splits off
    281      * subtasks, forking one and continuing as the other.
    282      *
    283      * <p> The method is structured to conserve resources across a
    284      * range of uses.  The loop continues with one of the child tasks
    285      * when split, to avoid deep recursion. To cope with spliterators
    286      * that may be systematically biased toward left-heavy or
    287      * right-heavy splits, we alternate which child is forked versus
    288      * continued in the loop.
    289      */
    290     @Override
    291     public void compute() {
    292         Spliterator<P_IN> rs = spliterator, ls; // right, left spliterators
    293         long sizeEstimate = rs.estimateSize();
    294         long sizeThreshold = getTargetSize(sizeEstimate);
    295         boolean forkRight = false;
    296         @SuppressWarnings("unchecked") K task = (K) this;
    297         while (sizeEstimate > sizeThreshold && (ls = rs.trySplit()) != null) {
    298             K leftChild, rightChild, taskToFork;
    299             task.leftChild  = leftChild = task.makeChild(ls);
    300             task.rightChild = rightChild = task.makeChild(rs);
    301             task.setPendingCount(1);
    302             if (forkRight) {
    303                 forkRight = false;
    304                 rs = ls;
    305                 task = leftChild;
    306                 taskToFork = rightChild;
    307             }
    308             else {
    309                 forkRight = true;
    310                 task = rightChild;
    311                 taskToFork = leftChild;
    312             }
    313             taskToFork.fork();
    314             sizeEstimate = rs.estimateSize();
    315         }
    316         task.setLocalResult(task.doLeaf());
    317         task.tryComplete();
    318     }
    319 
    320     /**
    321      * {@inheritDoc}
    322      *
    323      * @implNote
    324      * Clears spliterator and children fields.  Overriders MUST call
    325      * {@code super.onCompletion} as the last thing they do if they want these
    326      * cleared.
    327      */
    328     @Override
    329     public void onCompletion(CountedCompleter<?> caller) {
    330         spliterator = null;
    331         leftChild = rightChild = null;
    332     }
    333 
    334     /**
    335      * Returns whether this node is a "leftmost" node -- whether the path from
    336      * the root to this node involves only traversing leftmost child links.  For
    337      * a leaf node, this means it is the first leaf node in the encounter order.
    338      *
    339      * @return {@code true} if this node is a "leftmost" node
    340      */
    341     protected boolean isLeftmostNode() {
    342         @SuppressWarnings("unchecked")
    343         K node = (K) this;
    344         while (node != null) {
    345             K parent = node.getParent();
    346             if (parent != null && parent.leftChild != node)
    347                 return false;
    348             node = parent;
    349         }
    350         return true;
    351     }
    352 }
    353