Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Spliterator;
     28 import java.util.concurrent.atomic.AtomicReference;
     29 
     30 /**
     31  * Abstract class for fork-join tasks used to implement short-circuiting
     32  * stream ops, which can produce a result without processing all elements of the
     33  * stream.
     34  *
     35  * @param  type of input elements to the pipeline
     36  * @param  type of output elements from the pipeline
     37  * @param <R> type of intermediate result, may be different from operation
     38  *        result type
     39  * @param <K> type of child and sibling tasks
     40  * @since 1.8
     41  */
     42 @SuppressWarnings("serial")
     43 abstract class AbstractShortCircuitTask<P_IN, P_OUT, R,
     44                                         K extends AbstractShortCircuitTask<P_IN, P_OUT, R, K>>
     45         extends AbstractTask<P_IN, P_OUT, R, K> {
     46     /**
     47      * The result for this computation; this is shared among all tasks and set
     48      * exactly once
     49      */
     50     protected final AtomicReference<R> sharedResult;
     51 
     52     /**
     53      * Indicates whether this task has been canceled.  Tasks may cancel other
     54      * tasks in the computation under various conditions, such as in a
     55      * find-first operation, a task that finds a value will cancel all tasks
     56      * that are later in the encounter order.
     57      */
     58     protected volatile boolean canceled;
     59 
     60     /**
     61      * Constructor for root tasks.
     62      *
     63      * @param helper the {@code PipelineHelper} describing the stream pipeline
     64      *               up to this operation
     65      * @param spliterator the {@code Spliterator} describing the source for this
     66      *                    pipeline
     67      */
     68     protected AbstractShortCircuitTask(PipelineHelper<P_OUT> helper,
     69                                        Spliterator<P_IN> spliterator) {
     70         super(helper, spliterator);
     71         sharedResult = new AtomicReference<>(null);
     72     }
     73 
     74     /**
     75      * Constructor for non-root nodes.
     76      *
     77      * @param parent parent task in the computation tree
     78      * @param spliterator the {@code Spliterator} for the portion of the
     79      *                    computation tree described by this task
     80      */
     81     protected AbstractShortCircuitTask(K parent,
     82                                        Spliterator<P_IN> spliterator) {
     83         super(parent, spliterator);
     84         sharedResult = parent.sharedResult;
     85     }
     86 
     87     /**
     88      * Returns the value indicating the computation completed with no task
     89      * finding a short-circuitable result.  For example, for a "find" operation,
     90      * this might be null or an empty {@code Optional}.
     91      *
     92      * @return the result to return when no task finds a result
     93      */
     94     protected abstract R getEmptyResult();
     95 
     96     /**
     97      * Overrides AbstractTask version to include checks for early
     98      * exits while splitting or computing.
     99      */
    100     @Override
    101     public void compute() {
    102         Spliterator<P_IN> rs = spliterator, ls;
    103         long sizeEstimate = rs.estimateSize();
    104         long sizeThreshold = getTargetSize(sizeEstimate);
    105         boolean forkRight = false;
    106         @SuppressWarnings("unchecked") K task = (K) this;
    107         AtomicReference<R> sr = sharedResult;
    108         R result;
    109         while ((result = sr.get()) == null) {
    110             if (task.taskCanceled()) {
    111                 result = task.getEmptyResult();
    112                 break;
    113             }
    114             if (sizeEstimate <= sizeThreshold || (ls = rs.trySplit()) == null) {
    115                 result = task.doLeaf();
    116                 break;
    117             }
    118             K leftChild, rightChild, taskToFork;
    119             task.leftChild  = leftChild = task.makeChild(ls);
    120             task.rightChild = rightChild = task.makeChild(rs);
    121             task.setPendingCount(1);
    122             if (forkRight) {
    123                 forkRight = false;
    124                 rs = ls;
    125                 task = leftChild;
    126                 taskToFork = rightChild;
    127             }
    128             else {
    129                 forkRight = true;
    130                 task = rightChild;
    131                 taskToFork = leftChild;
    132             }
    133             taskToFork.fork();
    134             sizeEstimate = rs.estimateSize();
    135         }
    136         task.setLocalResult(result);
    137         task.tryComplete();
    138     }
    139 
    140 
    141     /**
    142      * Declares that a globally valid result has been found.  If another task has
    143      * not already found the answer, the result is installed in
    144      * {@code sharedResult}.  The {@code compute()} method will check
    145      * {@code sharedResult} before proceeding with computation, so this causes
    146      * the computation to terminate early.
    147      *
    148      * @param result the result found
    149      */
    150     protected void shortCircuit(R result) {
    151         if (result != null)
    152             sharedResult.compareAndSet(null, result);
    153     }
    154 
    155     /**
    156      * Sets a local result for this task.  If this task is the root, set the
    157      * shared result instead (if not already set).
    158      *
    159      * @param localResult The result to set for this task
    160      */
    161     @Override
    162     protected void setLocalResult(R localResult) {
    163         if (isRoot()) {
    164             if (localResult != null)
    165                 sharedResult.compareAndSet(null, localResult);
    166         }
    167         else
    168             super.setLocalResult(localResult);
    169     }
    170 
    171     /**
    172      * Retrieves the local result for this task
    173      */
    174     @Override
    175     public R getRawResult() {
    176         return getLocalResult();
    177     }
    178 
    179     /**
    180      * Retrieves the local result for this task.  If this task is the root,
    181      * retrieves the shared result instead.
    182      */
    183     @Override
    184     public R getLocalResult() {
    185         if (isRoot()) {
    186             R answer = sharedResult.get();
    187             return (answer == null) ? getEmptyResult() : answer;
    188         }
    189         else
    190             return super.getLocalResult();
    191     }
    192 
    193     /**
    194      * Mark this task as canceled
    195      */
    196     protected void cancel() {
    197         canceled = true;
    198     }
    199 
    200     /**
    201      * Queries whether this task is canceled.  A task is considered canceled if
    202      * it or any of its parents have been canceled.
    203      *
    204      * @return {@code true} if this task or any parent is canceled.
    205      */
    206     protected boolean taskCanceled() {
    207         boolean cancel = canceled;
    208         if (!cancel) {
    209             for (K parent = getParent(); !cancel && parent != null; parent = parent.getParent())
    210                 cancel = parent.canceled;
    211         }
    212 
    213         return cancel;
    214     }
    215 
    216     /**
    217      * Cancels all tasks which succeed this one in the encounter order.  This
    218      * includes canceling all the current task's right sibling, as well as the
    219      * later right siblings of all its parents.
    220      */
    221     protected void cancelLaterNodes() {
    222         // Go up the tree, cancel right siblings of this node and all parents
    223         for (@SuppressWarnings("unchecked") K parent = getParent(), node = (K) this;
    224              parent != null;
    225              node = parent, parent = parent.getParent()) {
    226             // If node is a left child of parent, then has a right sibling
    227             if (parent.leftChild == node) {
    228                 K rightSibling = parent.rightChild;
    229                 if (!rightSibling.canceled)
    230                     rightSibling.cancel();
    231             }
    232         }
    233     }
    234 }
    235