Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Comparator;
     28 import java.util.Iterator;
     29 import java.util.Objects;
     30 import java.util.Optional;
     31 import java.util.Spliterator;
     32 import java.util.Spliterators;
     33 import java.util.function.BiConsumer;
     34 import java.util.function.BiFunction;
     35 import java.util.function.BinaryOperator;
     36 import java.util.function.Consumer;
     37 import java.util.function.DoubleConsumer;
     38 import java.util.function.Function;
     39 import java.util.function.IntConsumer;
     40 import java.util.function.IntFunction;
     41 import java.util.function.LongConsumer;
     42 import java.util.function.Predicate;
     43 import java.util.function.Supplier;
     44 import java.util.function.ToDoubleFunction;
     45 import java.util.function.ToIntFunction;
     46 import java.util.function.ToLongFunction;
     47 
     48 /**
     49  * Abstract base class for an intermediate pipeline stage or pipeline source
     50  * stage implementing whose elements are of type {@code U}.
     51  *
     52  * @param  type of elements in the upstream source
     53  * @param  type of elements in produced by this stage
     54  *
     55  * @since 1.8
     56  * @hide Visible for CTS testing only (OpenJDK8 tests).
     57  */
     58 public abstract class ReferencePipeline<P_IN, P_OUT>
     59         extends AbstractPipeline<P_IN, P_OUT, Stream<P_OUT>>
     60         implements Stream<P_OUT>  {
     61 
     62     /**
     63      * Constructor for the head of a stream pipeline.
     64      *
     65      * @param source {@code Supplier<Spliterator>} describing the stream source
     66      * @param sourceFlags the source flags for the stream source, described in
     67      *        {@link StreamOpFlag}
     68      * @param parallel {@code true} if the pipeline is parallel
     69      */
     70     ReferencePipeline(Supplier<? extends Spliterator<?>> source,
     71                       int sourceFlags, boolean parallel) {
     72         super(source, sourceFlags, parallel);
     73     }
     74 
     75     /**
     76      * Constructor for the head of a stream pipeline.
     77      *
     78      * @param source {@code Spliterator} describing the stream source
     79      * @param sourceFlags The source flags for the stream source, described in
     80      *        {@link StreamOpFlag}
     81      * @param parallel {@code true} if the pipeline is parallel
     82      */
     83     ReferencePipeline(Spliterator<?> source,
     84                       int sourceFlags, boolean parallel) {
     85         super(source, sourceFlags, parallel);
     86     }
     87 
     88     /**
     89      * Constructor for appending an intermediate operation onto an existing
     90      * pipeline.
     91      *
     92      * @param upstream the upstream element source.
     93      */
     94     ReferencePipeline(AbstractPipeline<?, P_IN, ?> upstream, int opFlags) {
     95         super(upstream, opFlags);
     96     }
     97 
     98     // Shape-specific methods
     99 
    100     @Override
    101     public final StreamShape getOutputShape() {
    102         return StreamShape.REFERENCE;
    103     }
    104 
    105     @Override
    106     public final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper,
    107                                         Spliterator<P_IN> spliterator,
    108                                         boolean flattenTree,
    109                                         IntFunction<P_OUT[]> generator) {
    110         return Nodes.collect(helper, spliterator, flattenTree, generator);
    111     }
    112 
    113     @Override
    114     public final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
    115                                      Supplier<Spliterator<P_IN>> supplier,
    116                                      boolean isParallel) {
    117         return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    118     }
    119 
    120     @Override
    121     public final Spliterator<P_OUT> lazySpliterator(Supplier<? extends Spliterator<P_OUT>> supplier) {
    122         return new StreamSpliterators.DelegatingSpliterator<>(supplier);
    123     }
    124 
    125     @Override
    126     public final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
    127         do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    128     }
    129 
    130     @Override
    131     public final Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown, IntFunction<P_OUT[]> generator) {
    132         return Nodes.builder(exactSizeIfKnown, generator);
    133     }
    134 
    135 
    136     // BaseStream
    137 
    138     @Override
    139     public final Iterator<P_OUT> iterator() {
    140         return Spliterators.iterator(spliterator());
    141     }
    142 
    143 
    144     // Stream
    145 
    146     // Stateless intermediate operations from Stream
    147 
    148     @Override
    149     public Stream<P_OUT> unordered() {
    150         if (!isOrdered())
    151             return this;
    152         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_ORDERED) {
    153             @Override
    154             public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
    155                 return sink;
    156             }
    157         };
    158     }
    159 
    160     @Override
    161     public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    162         Objects.requireNonNull(predicate);
    163         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
    164                                      StreamOpFlag.NOT_SIZED) {
    165             @Override
    166             public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
    167                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
    168                     @Override
    169                     public void begin(long size) {
    170                         downstream.begin(-1);
    171                     }
    172 
    173                     @Override
    174                     public void accept(P_OUT u) {
    175                         if (predicate.test(u))
    176                             downstream.accept(u);
    177                     }
    178                 };
    179             }
    180         };
    181     }
    182 
    183     @Override
    184     @SuppressWarnings("unchecked")
    185     public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
    186         Objects.requireNonNull(mapper);
    187         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
    188                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
    189             @Override
    190             public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
    191                 return new Sink.ChainedReference<P_OUT, R>(sink) {
    192                     @Override
    193                     public void accept(P_OUT u) {
    194                         downstream.accept(mapper.apply(u));
    195                     }
    196                 };
    197             }
    198         };
    199     }
    200 
    201     @Override
    202     public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
    203         Objects.requireNonNull(mapper);
    204         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
    205                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
    206             @Override
    207             public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
    208                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
    209                     @Override
    210                     public void accept(P_OUT u) {
    211                         downstream.accept(mapper.applyAsInt(u));
    212                     }
    213                 };
    214             }
    215         };
    216     }
    217 
    218     @Override
    219     public final LongStream mapToLong(ToLongFunction<? super P_OUT> mapper) {
    220         Objects.requireNonNull(mapper);
    221         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
    222                                       StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
    223             @Override
    224             public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
    225                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
    226                     @Override
    227                     public void accept(P_OUT u) {
    228                         downstream.accept(mapper.applyAsLong(u));
    229                     }
    230                 };
    231             }
    232         };
    233     }
    234 
    235     @Override
    236     public final DoubleStream mapToDouble(ToDoubleFunction<? super P_OUT> mapper) {
    237         Objects.requireNonNull(mapper);
    238         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
    239                                         StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
    240             @Override
    241             public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
    242                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
    243                     @Override
    244                     public void accept(P_OUT u) {
    245                         downstream.accept(mapper.applyAsDouble(u));
    246                     }
    247                 };
    248             }
    249         };
    250     }
    251 
    252     @Override
    253     public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
    254         Objects.requireNonNull(mapper);
    255         // We can do better than this, by polling cancellationRequested when stream is infinite
    256         return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
    257                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
    258             @Override
    259             public Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
    260                 return new Sink.ChainedReference<P_OUT, R>(sink) {
    261                     @Override
    262                     public void begin(long size) {
    263                         downstream.begin(-1);
    264                     }
    265 
    266                     @Override
    267                     public void accept(P_OUT u) {
    268                         try (Stream<? extends R> result = mapper.apply(u)) {
    269                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
    270                             if (result != null)
    271                                 result.sequential().forEach(downstream);
    272                         }
    273                     }
    274                 };
    275             }
    276         };
    277     }
    278 
    279     @Override
    280     public final IntStream flatMapToInt(Function<? super P_OUT, ? extends IntStream> mapper) {
    281         Objects.requireNonNull(mapper);
    282         // We can do better than this, by polling cancellationRequested when stream is infinite
    283         return new IntPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
    284                                               StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
    285             @Override
    286             public Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
    287                 return new Sink.ChainedReference<P_OUT, Integer>(sink) {
    288                     IntConsumer downstreamAsInt = downstream::accept;
    289                     @Override
    290                     public void begin(long size) {
    291                         downstream.begin(-1);
    292                     }
    293 
    294                     @Override
    295                     public void accept(P_OUT u) {
    296                         try (IntStream result = mapper.apply(u)) {
    297                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
    298                             if (result != null)
    299                                 result.sequential().forEach(downstreamAsInt);
    300                         }
    301                     }
    302                 };
    303             }
    304         };
    305     }
    306 
    307     @Override
    308     public final DoubleStream flatMapToDouble(Function<? super P_OUT, ? extends DoubleStream> mapper) {
    309         Objects.requireNonNull(mapper);
    310         // We can do better than this, by polling cancellationRequested when stream is infinite
    311         return new DoublePipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
    312                                                      StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
    313             @Override
    314             public Sink<P_OUT> opWrapSink(int flags, Sink<Double> sink) {
    315                 return new Sink.ChainedReference<P_OUT, Double>(sink) {
    316                     DoubleConsumer downstreamAsDouble = downstream::accept;
    317                     @Override
    318                     public void begin(long size) {
    319                         downstream.begin(-1);
    320                     }
    321 
    322                     @Override
    323                     public void accept(P_OUT u) {
    324                         try (DoubleStream result = mapper.apply(u)) {
    325                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
    326                             if (result != null)
    327                                 result.sequential().forEach(downstreamAsDouble);
    328                         }
    329                     }
    330                 };
    331             }
    332         };
    333     }
    334 
    335     @Override
    336     public final LongStream flatMapToLong(Function<? super P_OUT, ? extends LongStream> mapper) {
    337         Objects.requireNonNull(mapper);
    338         // We can do better than this, by polling cancellationRequested when stream is infinite
    339         return new LongPipeline.StatelessOp<P_OUT>(this, StreamShape.REFERENCE,
    340                                                    StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
    341             @Override
    342             public Sink<P_OUT> opWrapSink(int flags, Sink<Long> sink) {
    343                 return new Sink.ChainedReference<P_OUT, Long>(sink) {
    344                     LongConsumer downstreamAsLong = downstream::accept;
    345                     @Override
    346                     public void begin(long size) {
    347                         downstream.begin(-1);
    348                     }
    349 
    350                     @Override
    351                     public void accept(P_OUT u) {
    352                         try (LongStream result = mapper.apply(u)) {
    353                             // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
    354                             if (result != null)
    355                                 result.sequential().forEach(downstreamAsLong);
    356                         }
    357                     }
    358                 };
    359             }
    360         };
    361     }
    362 
    363     @Override
    364     public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
    365         Objects.requireNonNull(action);
    366         return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
    367                                      0) {
    368             @Override
    369             public Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
    370                 return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
    371                     @Override
    372                     public void accept(P_OUT u) {
    373                         action.accept(u);
    374                         downstream.accept(u);
    375                     }
    376                 };
    377             }
    378         };
    379     }
    380 
    381     // Stateful intermediate operations from Stream
    382 
    383     @Override
    384     public final Stream<P_OUT> distinct() {
    385         return DistinctOps.makeRef(this);
    386     }
    387 
    388     @Override
    389     public final Stream<P_OUT> sorted() {
    390         return SortedOps.makeRef(this);
    391     }
    392 
    393     @Override
    394     public final Stream<P_OUT> sorted(Comparator<? super P_OUT> comparator) {
    395         return SortedOps.makeRef(this, comparator);
    396     }
    397 
    398     @Override
    399     public final Stream<P_OUT> limit(long maxSize) {
    400         if (maxSize < 0)
    401             throw new IllegalArgumentException(Long.toString(maxSize));
    402         return SliceOps.makeRef(this, 0, maxSize);
    403     }
    404 
    405     @Override
    406     public final Stream<P_OUT> skip(long n) {
    407         if (n < 0)
    408             throw new IllegalArgumentException(Long.toString(n));
    409         if (n == 0)
    410             return this;
    411         else
    412             return SliceOps.makeRef(this, n, -1);
    413     }
    414 
    415     // Terminal operations from Stream
    416 
    417     @Override
    418     public void forEach(Consumer<? super P_OUT> action) {
    419         evaluate(ForEachOps.makeRef(action, false));
    420     }
    421 
    422     @Override
    423     public void forEachOrdered(Consumer<? super P_OUT> action) {
    424         evaluate(ForEachOps.makeRef(action, true));
    425     }
    426 
    427     @Override
    428     @SuppressWarnings("unchecked")
    429     public final <A> A[] toArray(IntFunction<A[]> generator) {
    430         // Since A has no relation to U (not possible to declare that A is an upper bound of U)
    431         // there will be no static type checking.
    432         // Therefore use a raw type and assume A == U rather than propagating the separation of A and U
    433         // throughout the code-base.
    434         // The runtime type of U is never checked for equality with the component type of the runtime type of A[].
    435         // Runtime checking will be performed when an element is stored in A[], thus if A is not a
    436         // super type of U an ArrayStoreException will be thrown.
    437         @SuppressWarnings("rawtypes")
    438         IntFunction rawGenerator = (IntFunction) generator;
    439         // TODO(b/29399275): Eclipse compiler requires explicit (Node<A[]>) cast below.
    440         return (A[]) Nodes.flatten((Node<A[]>) evaluateToArrayNode(rawGenerator), rawGenerator)
    441                 .asArray(rawGenerator);
    442     }
    443 
    444     @Override
    445     public final Object[] toArray() {
    446         return toArray(Object[]::new);
    447     }
    448 
    449     @Override
    450     public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
    451         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
    452     }
    453 
    454     @Override
    455     public final boolean allMatch(Predicate<? super P_OUT> predicate) {
    456         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ALL));
    457     }
    458 
    459     @Override
    460     public final boolean noneMatch(Predicate<? super P_OUT> predicate) {
    461         return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.NONE));
    462     }
    463 
    464     @Override
    465     public final Optional<P_OUT> findFirst() {
    466         return evaluate(FindOps.makeRef(true));
    467     }
    468 
    469     @Override
    470     public final Optional<P_OUT> findAny() {
    471         return evaluate(FindOps.makeRef(false));
    472     }
    473 
    474     @Override
    475     public final P_OUT reduce(final P_OUT identity, final BinaryOperator<P_OUT> accumulator) {
    476         return evaluate(ReduceOps.makeRef(identity, accumulator, accumulator));
    477     }
    478 
    479     @Override
    480     public final Optional<P_OUT> reduce(BinaryOperator<P_OUT> accumulator) {
    481         return evaluate(ReduceOps.makeRef(accumulator));
    482     }
    483 
    484     @Override
    485     public final <R> R reduce(R identity, BiFunction<R, ? super P_OUT, R> accumulator, BinaryOperator<R> combiner) {
    486         return evaluate(ReduceOps.makeRef(identity, accumulator, combiner));
    487     }
    488 
    489     @Override
    490     @SuppressWarnings("unchecked")
    491     public final <R, A> R collect(Collector<? super P_OUT, A, R> collector) {
    492         A container;
    493         if (isParallel()
    494                 && (collector.characteristics().contains(Collector.Characteristics.CONCURRENT))
    495                 && (!isOrdered() || collector.characteristics().contains(Collector.Characteristics.UNORDERED))) {
    496             container = collector.supplier().get();
    497             BiConsumer<A, ? super P_OUT> accumulator = collector.accumulator();
    498             forEach(u -> accumulator.accept(container, u));
    499         }
    500         else {
    501             container = evaluate(ReduceOps.makeRef(collector));
    502         }
    503         return collector.characteristics().contains(Collector.Characteristics.IDENTITY_FINISH)
    504                ? (R) container
    505                : collector.finisher().apply(container);
    506     }
    507 
    508     @Override
    509     public final <R> R collect(Supplier<R> supplier,
    510                                BiConsumer<R, ? super P_OUT> accumulator,
    511                                BiConsumer<R, R> combiner) {
    512         return evaluate(ReduceOps.makeRef(supplier, accumulator, combiner));
    513     }
    514 
    515     @Override
    516     public final Optional<P_OUT> max(Comparator<? super P_OUT> comparator) {
    517         return reduce(BinaryOperator.maxBy(comparator));
    518     }
    519 
    520     @Override
    521     public final Optional<P_OUT> min(Comparator<? super P_OUT> comparator) {
    522         return reduce(BinaryOperator.minBy(comparator));
    523 
    524     }
    525 
    526     @Override
    527     public final long count() {
    528         return mapToLong(e -> 1L).sum();
    529     }
    530 
    531 
    532     //
    533 
    534     /**
    535      * Source stage of a ReferencePipeline.
    536      *
    537      * @param  type of elements in the upstream source
    538      * @param  type of elements in produced by this stage
    539      * @since 1.8
    540      * @hide Visible for CTS testing only (OpenJDK8 tests).
    541      */
    542     public static class Head<E_IN, E_OUT> extends ReferencePipeline<E_IN, E_OUT> {
    543         /**
    544          * Constructor for the source stage of a Stream.
    545          *
    546          * @param source {@code Supplier<Spliterator>} describing the stream
    547          *               source
    548          * @param sourceFlags the source flags for the stream source, described
    549          *                    in {@link StreamOpFlag}
    550          */
    551         public Head(Supplier<? extends Spliterator<?>> source,
    552              int sourceFlags, boolean parallel) {
    553             super(source, sourceFlags, parallel);
    554         }
    555 
    556         /**
    557          * Constructor for the source stage of a Stream.
    558          *
    559          * @param source {@code Spliterator} describing the stream source
    560          * @param sourceFlags the source flags for the stream source, described
    561          *                    in {@link StreamOpFlag}
    562          */
    563         public Head(Spliterator<?> source,
    564              int sourceFlags, boolean parallel) {
    565             super(source, sourceFlags, parallel);
    566         }
    567 
    568         @Override
    569         public final boolean opIsStateful() {
    570             throw new UnsupportedOperationException();
    571         }
    572 
    573         @Override
    574         public final Sink<E_IN> opWrapSink(int flags, Sink<E_OUT> sink) {
    575             throw new UnsupportedOperationException();
    576         }
    577 
    578         // Optimized sequential terminal operations for the head of the pipeline
    579 
    580         @Override
    581         public void forEach(Consumer<? super E_OUT> action) {
    582             if (!isParallel()) {
    583                 sourceStageSpliterator().forEachRemaining(action);
    584             }
    585             else {
    586                 super.forEach(action);
    587             }
    588         }
    589 
    590         @Override
    591         public void forEachOrdered(Consumer<? super E_OUT> action) {
    592             if (!isParallel()) {
    593                 sourceStageSpliterator().forEachRemaining(action);
    594             }
    595             else {
    596                 super.forEachOrdered(action);
    597             }
    598         }
    599     }
    600 
    601     /**
    602      * Base class for a stateless intermediate stage of a Stream.
    603      *
    604      * @param  type of elements in the upstream source
    605      * @param  type of elements in produced by this stage
    606      * @since 1.8
    607      * @hide Visible for CTS testing only (OpenJDK8 tests).
    608      */
    609     public abstract static class StatelessOp<E_IN, E_OUT>
    610             extends ReferencePipeline<E_IN, E_OUT> {
    611         /**
    612          * Construct a new Stream by appending a stateless intermediate
    613          * operation to an existing stream.
    614          *
    615          * @param upstream The upstream pipeline stage
    616          * @param inputShape The stream shape for the upstream pipeline stage
    617          * @param opFlags Operation flags for the new stage
    618          */
    619         public StatelessOp(AbstractPipeline<?, E_IN, ?> upstream,
    620                     StreamShape inputShape,
    621                     int opFlags) {
    622             super(upstream, opFlags);
    623             assert upstream.getOutputShape() == inputShape;
    624         }
    625 
    626         @Override
    627         public final boolean opIsStateful() {
    628             return false;
    629         }
    630     }
    631 
    632     /**
    633      * Base class for a stateful intermediate stage of a Stream.
    634      *
    635      * @param  type of elements in the upstream source
    636      * @param  type of elements in produced by this stage
    637      * @since 1.8
    638      * @hide Visible for CTS testing only (OpenJDK8 tests).
    639      */
    640     public abstract static class StatefulOp<E_IN, E_OUT>
    641             extends ReferencePipeline<E_IN, E_OUT> {
    642         /**
    643          * Construct a new Stream by appending a stateful intermediate operation
    644          * to an existing stream.
    645          * @param upstream The upstream pipeline stage
    646          * @param inputShape The stream shape for the upstream pipeline stage
    647          * @param opFlags Operation flags for the new stage
    648          */
    649         public StatefulOp(AbstractPipeline<?, E_IN, ?> upstream,
    650                    StreamShape inputShape,
    651                    int opFlags) {
    652             super(upstream, opFlags);
    653             assert upstream.getOutputShape() == inputShape;
    654         }
    655 
    656         @Override
    657         public final boolean opIsStateful() {
    658             return true;
    659         }
    660 
    661         @Override
    662         public abstract <P_IN> Node<E_OUT> opEvaluateParallel(PipelineHelper<E_OUT> helper,
    663                                                        Spliterator<P_IN> spliterator,
    664                                                        IntFunction<E_OUT[]> generator);
    665     }
    666 }
    667