Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Objects;
     28 import java.util.Optional;
     29 import java.util.OptionalDouble;
     30 import java.util.OptionalInt;
     31 import java.util.OptionalLong;
     32 import java.util.Spliterator;
     33 import java.util.concurrent.CountedCompleter;
     34 import java.util.function.BiConsumer;
     35 import java.util.function.BiFunction;
     36 import java.util.function.BinaryOperator;
     37 import java.util.function.DoubleBinaryOperator;
     38 import java.util.function.IntBinaryOperator;
     39 import java.util.function.LongBinaryOperator;
     40 import java.util.function.ObjDoubleConsumer;
     41 import java.util.function.ObjIntConsumer;
     42 import java.util.function.ObjLongConsumer;
     43 import java.util.function.Supplier;
     44 
     45 /**
     46  * Factory for creating instances of {@code TerminalOp} that implement
     47  * reductions.
     48  *
     49  * @since 1.8
     50  */
     51 final class ReduceOps {
     52 
     53     private ReduceOps() { }
     54 
     55     /**
     56      * Constructs a {@code TerminalOp} that implements a functional reduce on
     57      * reference values.
     58      *
     59      * @param <T> the type of the input elements
     60      * @param <U> the type of the result
     61      * @param seed the identity element for the reduction
     62      * @param reducer the accumulating function that incorporates an additional
     63      *        input element into the result
     64      * @param combiner the combining function that combines two intermediate
     65      *        results
     66      * @return a {@code TerminalOp} implementing the reduction
     67      */
     68     public static <T, U> TerminalOp<T, U>
     69     makeRef(U seed, BiFunction<U, ? super T, U> reducer, BinaryOperator<U> combiner) {
     70         Objects.requireNonNull(reducer);
     71         Objects.requireNonNull(combiner);
     72         class ReducingSink extends Box<U> implements AccumulatingSink<T, U, ReducingSink> {
     73             @Override
     74             public void begin(long size) {
     75                 state = seed;
     76             }
     77 
     78             @Override
     79             public void accept(T t) {
     80                 state = reducer.apply(state, t);
     81             }
     82 
     83             @Override
     84             public void combine(ReducingSink other) {
     85                 state = combiner.apply(state, other.state);
     86             }
     87         }
     88         return new ReduceOp<T, U, ReducingSink>(StreamShape.REFERENCE) {
     89             @Override
     90             public ReducingSink makeSink() {
     91                 return new ReducingSink();
     92             }
     93         };
     94     }
     95 
     96     /**
     97      * Constructs a {@code TerminalOp} that implements a functional reduce on
     98      * reference values producing an optional reference result.
     99      *
    100      * @param <T> The type of the input elements, and the type of the result
    101      * @param operator The reducing function
    102      * @return A {@code TerminalOp} implementing the reduction
    103      */
    104     public static <T> TerminalOp<T, Optional<T>>
    105     makeRef(BinaryOperator<T> operator) {
    106         Objects.requireNonNull(operator);
    107         class ReducingSink
    108                 implements AccumulatingSink<T, Optional<T>, ReducingSink> {
    109             private boolean empty;
    110             private T state;
    111 
    112             public void begin(long size) {
    113                 empty = true;
    114                 state = null;
    115             }
    116 
    117             @Override
    118             public void accept(T t) {
    119                 if (empty) {
    120                     empty = false;
    121                     state = t;
    122                 } else {
    123                     state = operator.apply(state, t);
    124                 }
    125             }
    126 
    127             @Override
    128             public Optional<T> get() {
    129                 return empty ? Optional.empty() : Optional.of(state);
    130             }
    131 
    132             @Override
    133             public void combine(ReducingSink other) {
    134                 if (!other.empty)
    135                     accept(other.state);
    136             }
    137         }
    138         return new ReduceOp<T, Optional<T>, ReducingSink>(StreamShape.REFERENCE) {
    139             @Override
    140             public ReducingSink makeSink() {
    141                 return new ReducingSink();
    142             }
    143         };
    144     }
    145 
    146     /**
    147      * Constructs a {@code TerminalOp} that implements a mutable reduce on
    148      * reference values.
    149      *
    150      * @param <T> the type of the input elements
    151      * @param <I> the type of the intermediate reduction result
    152      * @param collector a {@code Collector} defining the reduction
    153      * @return a {@code ReduceOp} implementing the reduction
    154      */
    155     public static <T, I> TerminalOp<T, I>
    156     makeRef(Collector<? super T, I, ?> collector) {
    157         Supplier<I> supplier = Objects.requireNonNull(collector).supplier();
    158         BiConsumer<I, ? super T> accumulator = collector.accumulator();
    159         BinaryOperator<I> combiner = collector.combiner();
    160         class ReducingSink extends Box<I>
    161                 implements AccumulatingSink<T, I, ReducingSink> {
    162             @Override
    163             public void begin(long size) {
    164                 state = supplier.get();
    165             }
    166 
    167             @Override
    168             public void accept(T t) {
    169                 accumulator.accept(state, t);
    170             }
    171 
    172             @Override
    173             public void combine(ReducingSink other) {
    174                 state = combiner.apply(state, other.state);
    175             }
    176         }
    177         return new ReduceOp<T, I, ReducingSink>(StreamShape.REFERENCE) {
    178             @Override
    179             public ReducingSink makeSink() {
    180                 return new ReducingSink();
    181             }
    182 
    183             @Override
    184             public int getOpFlags() {
    185                 return collector.characteristics().contains(Collector.Characteristics.UNORDERED)
    186                        ? StreamOpFlag.NOT_ORDERED
    187                        : 0;
    188             }
    189         };
    190     }
    191 
    192     /**
    193      * Constructs a {@code TerminalOp} that implements a mutable reduce on
    194      * reference values.
    195      *
    196      * @param <T> the type of the input elements
    197      * @param <R> the type of the result
    198      * @param seedFactory a factory to produce a new base accumulator
    199      * @param accumulator a function to incorporate an element into an
    200      *        accumulator
    201      * @param reducer a function to combine an accumulator into another
    202      * @return a {@code TerminalOp} implementing the reduction
    203      */
    204     public static <T, R> TerminalOp<T, R>
    205     makeRef(Supplier<R> seedFactory,
    206             BiConsumer<R, ? super T> accumulator,
    207             BiConsumer<R,R> reducer) {
    208         Objects.requireNonNull(seedFactory);
    209         Objects.requireNonNull(accumulator);
    210         Objects.requireNonNull(reducer);
    211         class ReducingSink extends Box<R>
    212                 implements AccumulatingSink<T, R, ReducingSink> {
    213             @Override
    214             public void begin(long size) {
    215                 state = seedFactory.get();
    216             }
    217 
    218             @Override
    219             public void accept(T t) {
    220                 accumulator.accept(state, t);
    221             }
    222 
    223             @Override
    224             public void combine(ReducingSink other) {
    225                 reducer.accept(state, other.state);
    226             }
    227         }
    228         return new ReduceOp<T, R, ReducingSink>(StreamShape.REFERENCE) {
    229             @Override
    230             public ReducingSink makeSink() {
    231                 return new ReducingSink();
    232             }
    233         };
    234     }
    235 
    236     /**
    237      * Constructs a {@code TerminalOp} that implements a functional reduce on
    238      * {@code int} values.
    239      *
    240      * @param identity the identity for the combining function
    241      * @param operator the combining function
    242      * @return a {@code TerminalOp} implementing the reduction
    243      */
    244     public static TerminalOp<Integer, Integer>
    245     makeInt(int identity, IntBinaryOperator operator) {
    246         Objects.requireNonNull(operator);
    247         class ReducingSink
    248                 implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
    249             private int state;
    250 
    251             @Override
    252             public void begin(long size) {
    253                 state = identity;
    254             }
    255 
    256             @Override
    257             public void accept(int t) {
    258                 state = operator.applyAsInt(state, t);
    259             }
    260 
    261             @Override
    262             public Integer get() {
    263                 return state;
    264             }
    265 
    266             @Override
    267             public void combine(ReducingSink other) {
    268                 accept(other.state);
    269             }
    270         }
    271         return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
    272             @Override
    273             public ReducingSink makeSink() {
    274                 return new ReducingSink();
    275             }
    276         };
    277     }
    278 
    279     /**
    280      * Constructs a {@code TerminalOp} that implements a functional reduce on
    281      * {@code int} values, producing an optional integer result.
    282      *
    283      * @param operator the combining function
    284      * @return a {@code TerminalOp} implementing the reduction
    285      */
    286     public static TerminalOp<Integer, OptionalInt>
    287     makeInt(IntBinaryOperator operator) {
    288         Objects.requireNonNull(operator);
    289         class ReducingSink
    290                 implements AccumulatingSink<Integer, OptionalInt, ReducingSink>, Sink.OfInt {
    291             private boolean empty;
    292             private int state;
    293 
    294             public void begin(long size) {
    295                 empty = true;
    296                 state = 0;
    297             }
    298 
    299             @Override
    300             public void accept(int t) {
    301                 if (empty) {
    302                     empty = false;
    303                     state = t;
    304                 }
    305                 else {
    306                     state = operator.applyAsInt(state, t);
    307                 }
    308             }
    309 
    310             @Override
    311             public OptionalInt get() {
    312                 return empty ? OptionalInt.empty() : OptionalInt.of(state);
    313             }
    314 
    315             @Override
    316             public void combine(ReducingSink other) {
    317                 if (!other.empty)
    318                     accept(other.state);
    319             }
    320         }
    321         return new ReduceOp<Integer, OptionalInt, ReducingSink>(StreamShape.INT_VALUE) {
    322             @Override
    323             public ReducingSink makeSink() {
    324                 return new ReducingSink();
    325             }
    326         };
    327     }
    328 
    329     /**
    330      * Constructs a {@code TerminalOp} that implements a mutable reduce on
    331      * {@code int} values.
    332      *
    333      * @param <R> The type of the result
    334      * @param supplier a factory to produce a new accumulator of the result type
    335      * @param accumulator a function to incorporate an int into an
    336      *        accumulator
    337      * @param combiner a function to combine an accumulator into another
    338      * @return A {@code ReduceOp} implementing the reduction
    339      */
    340     public static <R> TerminalOp<Integer, R>
    341     makeInt(Supplier<R> supplier,
    342             ObjIntConsumer<R> accumulator,
    343             BinaryOperator<R> combiner) {
    344         Objects.requireNonNull(supplier);
    345         Objects.requireNonNull(accumulator);
    346         Objects.requireNonNull(combiner);
    347         class ReducingSink extends Box<R>
    348                 implements AccumulatingSink<Integer, R, ReducingSink>, Sink.OfInt {
    349             @Override
    350             public void begin(long size) {
    351                 state = supplier.get();
    352             }
    353 
    354             @Override
    355             public void accept(int t) {
    356                 accumulator.accept(state, t);
    357             }
    358 
    359             @Override
    360             public void combine(ReducingSink other) {
    361                 state = combiner.apply(state, other.state);
    362             }
    363         }
    364         return new ReduceOp<Integer, R, ReducingSink>(StreamShape.INT_VALUE) {
    365             @Override
    366             public ReducingSink makeSink() {
    367                 return new ReducingSink();
    368             }
    369         };
    370     }
    371 
    372     /**
    373      * Constructs a {@code TerminalOp} that implements a functional reduce on
    374      * {@code long} values.
    375      *
    376      * @param identity the identity for the combining function
    377      * @param operator the combining function
    378      * @return a {@code TerminalOp} implementing the reduction
    379      */
    380     public static TerminalOp<Long, Long>
    381     makeLong(long identity, LongBinaryOperator operator) {
    382         Objects.requireNonNull(operator);
    383         class ReducingSink
    384                 implements AccumulatingSink<Long, Long, ReducingSink>, Sink.OfLong {
    385             private long state;
    386 
    387             @Override
    388             public void begin(long size) {
    389                 state = identity;
    390             }
    391 
    392             @Override
    393             public void accept(long t) {
    394                 state = operator.applyAsLong(state, t);
    395             }
    396 
    397             @Override
    398             public Long get() {
    399                 return state;
    400             }
    401 
    402             @Override
    403             public void combine(ReducingSink other) {
    404                 accept(other.state);
    405             }
    406         }
    407         return new ReduceOp<Long, Long, ReducingSink>(StreamShape.LONG_VALUE) {
    408             @Override
    409             public ReducingSink makeSink() {
    410                 return new ReducingSink();
    411             }
    412         };
    413     }
    414 
    415     /**
    416      * Constructs a {@code TerminalOp} that implements a functional reduce on
    417      * {@code long} values, producing an optional long result.
    418      *
    419      * @param operator the combining function
    420      * @return a {@code TerminalOp} implementing the reduction
    421      */
    422     public static TerminalOp<Long, OptionalLong>
    423     makeLong(LongBinaryOperator operator) {
    424         Objects.requireNonNull(operator);
    425         class ReducingSink
    426                 implements AccumulatingSink<Long, OptionalLong, ReducingSink>, Sink.OfLong {
    427             private boolean empty;
    428             private long state;
    429 
    430             public void begin(long size) {
    431                 empty = true;
    432                 state = 0;
    433             }
    434 
    435             @Override
    436             public void accept(long t) {
    437                 if (empty) {
    438                     empty = false;
    439                     state = t;
    440                 }
    441                 else {
    442                     state = operator.applyAsLong(state, t);
    443                 }
    444             }
    445 
    446             @Override
    447             public OptionalLong get() {
    448                 return empty ? OptionalLong.empty() : OptionalLong.of(state);
    449             }
    450 
    451             @Override
    452             public void combine(ReducingSink other) {
    453                 if (!other.empty)
    454                     accept(other.state);
    455             }
    456         }
    457         return new ReduceOp<Long, OptionalLong, ReducingSink>(StreamShape.LONG_VALUE) {
    458             @Override
    459             public ReducingSink makeSink() {
    460                 return new ReducingSink();
    461             }
    462         };
    463     }
    464 
    465     /**
    466      * Constructs a {@code TerminalOp} that implements a mutable reduce on
    467      * {@code long} values.
    468      *
    469      * @param <R> the type of the result
    470      * @param supplier a factory to produce a new accumulator of the result type
    471      * @param accumulator a function to incorporate an int into an
    472      *        accumulator
    473      * @param combiner a function to combine an accumulator into another
    474      * @return a {@code TerminalOp} implementing the reduction
    475      */
    476     public static <R> TerminalOp<Long, R>
    477     makeLong(Supplier<R> supplier,
    478              ObjLongConsumer<R> accumulator,
    479              BinaryOperator<R> combiner) {
    480         Objects.requireNonNull(supplier);
    481         Objects.requireNonNull(accumulator);
    482         Objects.requireNonNull(combiner);
    483         class ReducingSink extends Box<R>
    484                 implements AccumulatingSink<Long, R, ReducingSink>, Sink.OfLong {
    485             @Override
    486             public void begin(long size) {
    487                 state = supplier.get();
    488             }
    489 
    490             @Override
    491             public void accept(long t) {
    492                 accumulator.accept(state, t);
    493             }
    494 
    495             @Override
    496             public void combine(ReducingSink other) {
    497                 state = combiner.apply(state, other.state);
    498             }
    499         }
    500         return new ReduceOp<Long, R, ReducingSink>(StreamShape.LONG_VALUE) {
    501             @Override
    502             public ReducingSink makeSink() {
    503                 return new ReducingSink();
    504             }
    505         };
    506     }
    507 
    508     /**
    509      * Constructs a {@code TerminalOp} that implements a functional reduce on
    510      * {@code double} values.
    511      *
    512      * @param identity the identity for the combining function
    513      * @param operator the combining function
    514      * @return a {@code TerminalOp} implementing the reduction
    515      */
    516     public static TerminalOp<Double, Double>
    517     makeDouble(double identity, DoubleBinaryOperator operator) {
    518         Objects.requireNonNull(operator);
    519         class ReducingSink
    520                 implements AccumulatingSink<Double, Double, ReducingSink>, Sink.OfDouble {
    521             private double state;
    522 
    523             @Override
    524             public void begin(long size) {
    525                 state = identity;
    526             }
    527 
    528             @Override
    529             public void accept(double t) {
    530                 state = operator.applyAsDouble(state, t);
    531             }
    532 
    533             @Override
    534             public Double get() {
    535                 return state;
    536             }
    537 
    538             @Override
    539             public void combine(ReducingSink other) {
    540                 accept(other.state);
    541             }
    542         }
    543         return new ReduceOp<Double, Double, ReducingSink>(StreamShape.DOUBLE_VALUE) {
    544             @Override
    545             public ReducingSink makeSink() {
    546                 return new ReducingSink();
    547             }
    548         };
    549     }
    550 
    551     /**
    552      * Constructs a {@code TerminalOp} that implements a functional reduce on
    553      * {@code double} values, producing an optional double result.
    554      *
    555      * @param operator the combining function
    556      * @return a {@code TerminalOp} implementing the reduction
    557      */
    558     public static TerminalOp<Double, OptionalDouble>
    559     makeDouble(DoubleBinaryOperator operator) {
    560         Objects.requireNonNull(operator);
    561         class ReducingSink
    562                 implements AccumulatingSink<Double, OptionalDouble, ReducingSink>, Sink.OfDouble {
    563             private boolean empty;
    564             private double state;
    565 
    566             public void begin(long size) {
    567                 empty = true;
    568                 state = 0;
    569             }
    570 
    571             @Override
    572             public void accept(double t) {
    573                 if (empty) {
    574                     empty = false;
    575                     state = t;
    576                 }
    577                 else {
    578                     state = operator.applyAsDouble(state, t);
    579                 }
    580             }
    581 
    582             @Override
    583             public OptionalDouble get() {
    584                 return empty ? OptionalDouble.empty() : OptionalDouble.of(state);
    585             }
    586 
    587             @Override
    588             public void combine(ReducingSink other) {
    589                 if (!other.empty)
    590                     accept(other.state);
    591             }
    592         }
    593         return new ReduceOp<Double, OptionalDouble, ReducingSink>(StreamShape.DOUBLE_VALUE) {
    594             @Override
    595             public ReducingSink makeSink() {
    596                 return new ReducingSink();
    597             }
    598         };
    599     }
    600 
    601     /**
    602      * Constructs a {@code TerminalOp} that implements a mutable reduce on
    603      * {@code double} values.
    604      *
    605      * @param <R> the type of the result
    606      * @param supplier a factory to produce a new accumulator of the result type
    607      * @param accumulator a function to incorporate an int into an
    608      *        accumulator
    609      * @param combiner a function to combine an accumulator into another
    610      * @return a {@code TerminalOp} implementing the reduction
    611      */
    612     public static <R> TerminalOp<Double, R>
    613     makeDouble(Supplier<R> supplier,
    614                ObjDoubleConsumer<R> accumulator,
    615                BinaryOperator<R> combiner) {
    616         Objects.requireNonNull(supplier);
    617         Objects.requireNonNull(accumulator);
    618         Objects.requireNonNull(combiner);
    619         class ReducingSink extends Box<R>
    620                 implements AccumulatingSink<Double, R, ReducingSink>, Sink.OfDouble {
    621             @Override
    622             public void begin(long size) {
    623                 state = supplier.get();
    624             }
    625 
    626             @Override
    627             public void accept(double t) {
    628                 accumulator.accept(state, t);
    629             }
    630 
    631             @Override
    632             public void combine(ReducingSink other) {
    633                 state = combiner.apply(state, other.state);
    634             }
    635         }
    636         return new ReduceOp<Double, R, ReducingSink>(StreamShape.DOUBLE_VALUE) {
    637             @Override
    638             public ReducingSink makeSink() {
    639                 return new ReducingSink();
    640             }
    641         };
    642     }
    643 
    644     /**
    645      * A type of {@code TerminalSink} that implements an associative reducing
    646      * operation on elements of type {@code T} and producing a result of type
    647      * {@code R}.
    648      *
    649      * @param <T> the type of input element to the combining operation
    650      * @param <R> the result type
    651      * @param <K> the type of the {@code AccumulatingSink}.
    652      */
    653     private interface AccumulatingSink<T, R, K extends AccumulatingSink<T, R, K>>
    654             extends TerminalSink<T, R> {
    655         public void combine(K other);
    656     }
    657 
    658     /**
    659      * State box for a single state element, used as a base class for
    660      * {@code AccumulatingSink} instances
    661      *
    662      * @param <U> The type of the state element
    663      */
    664     private static abstract class Box<U> {
    665         U state;
    666 
    667         Box() {} // Avoid creation of special accessor
    668 
    669         public U get() {
    670             return state;
    671         }
    672     }
    673 
    674     /**
    675      * A {@code TerminalOp} that evaluates a stream pipeline and sends the
    676      * output into an {@code AccumulatingSink}, which performs a reduce
    677      * operation. The {@code AccumulatingSink} must represent an associative
    678      * reducing operation.
    679      *
    680      * @param <T> the output type of the stream pipeline
    681      * @param <R> the result type of the reducing operation
    682      * @param <S> the type of the {@code AccumulatingSink}
    683      */
    684     private static abstract class ReduceOp<T, R, S extends AccumulatingSink<T, R, S>>
    685             implements TerminalOp<T, R> {
    686         private final StreamShape inputShape;
    687 
    688         /**
    689          * Create a {@code ReduceOp} of the specified stream shape which uses
    690          * the specified {@code Supplier} to create accumulating sinks.
    691          *
    692          * @param shape The shape of the stream pipeline
    693          */
    694         ReduceOp(StreamShape shape) {
    695             inputShape = shape;
    696         }
    697 
    698         public abstract S makeSink();
    699 
    700         @Override
    701         public StreamShape inputShape() {
    702             return inputShape;
    703         }
    704 
    705         @Override
    706         public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
    707                                            Spliterator<P_IN> spliterator) {
    708             return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    709         }
    710 
    711         @Override
    712         public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
    713                                          Spliterator<P_IN> spliterator) {
    714             return new ReduceTask<>(this, helper, spliterator).invoke().get();
    715         }
    716     }
    717 
    718     /**
    719      * A {@code ForkJoinTask} for performing a parallel reduce operation.
    720      */
    721     @SuppressWarnings("serial")
    722     private static final class ReduceTask<P_IN, P_OUT, R,
    723                                           S extends AccumulatingSink<P_OUT, R, S>>
    724             extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {
    725         private final ReduceOp<P_OUT, R, S> op;
    726 
    727         ReduceTask(ReduceOp<P_OUT, R, S> op,
    728                    PipelineHelper<P_OUT> helper,
    729                    Spliterator<P_IN> spliterator) {
    730             super(helper, spliterator);
    731             this.op = op;
    732         }
    733 
    734         ReduceTask(ReduceTask<P_IN, P_OUT, R, S> parent,
    735                    Spliterator<P_IN> spliterator) {
    736             super(parent, spliterator);
    737             this.op = parent.op;
    738         }
    739 
    740         @Override
    741         protected ReduceTask<P_IN, P_OUT, R, S> makeChild(Spliterator<P_IN> spliterator) {
    742             return new ReduceTask<>(this, spliterator);
    743         }
    744 
    745         @Override
    746         protected S doLeaf() {
    747             return helper.wrapAndCopyInto(op.makeSink(), spliterator);
    748         }
    749 
    750         @Override
    751         public void onCompletion(CountedCompleter<?> caller) {
    752             if (!isLeaf()) {
    753                 S leftResult = leftChild.getLocalResult();
    754                 leftResult.combine(rightChild.getLocalResult());
    755                 setLocalResult(leftResult);
    756             }
    757             // GC spliterator, left and right child
    758             super.onCompletion(caller);
    759         }
    760     }
    761 }
    762