Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Spliterator;
     28 import java.util.concurrent.CountedCompleter;
     29 import java.util.function.IntFunction;
     30 
     31 /**
     32  * Factory for instances of short-circuiting stateful intermediate operations
     33  * that produce subsequences of their input stream.
     34  *
     35  * @since 1.8
     36  */
     37 final class SliceOps {
     38 
     39     // Static-only utility class: the private constructor blocks outside instantiation
     40     private SliceOps() { }
     41 
     42     /**
     43      * Computes the element count left after skipping and limiting an input.
     44      *
     45      * @param size the size of the input; a negative value yields {@code -1}
     46      * @param skip how many leading elements are dropped, assumed to be >= 0
     47      * @param limit the cap on retained elements, assumed to be >= 0
     48      *        ({@code Long.MAX_VALUE} when no cap applies)
     49      * @return {@code min(size - skip, limit)}, but never less than {@code -1}
     50      */
     51     private static long calcSize(long size, long skip, long limit) {
     52         long remaining = (size < 0) ? -1 : Math.min(limit, size - skip);
     53         return Math.max(-1, remaining);
     54     }
     55 
     56     /**
     57      * Computes the slice fence: the index one past the last element that
     58      * belongs to the slice.
     59      * @param skip how many leading elements are dropped, assumed to be >= 0
     60      * @param limit the cap on retained elements; a negative value means
     61      *        there is no limit, which yields {@code Long.MAX_VALUE}
     62      * @return {@code skip + limit}, saturated at {@code Long.MAX_VALUE}
     63      */
     64     private static long calcSliceFence(long skip, long limit) {
     65         if (limit < 0) return Long.MAX_VALUE;
     66         long fence = skip + limit;
     67         return (fence < 0) ? Long.MAX_VALUE : fence;  // a negative sum means the addition overflowed
     68     }
     69 
     70     /**
     71      * Creates a slice spliterator given a stream shape governing the
     72      * spliterator type.  Requires that the underlying Spliterator
     73      * be SUBSIZED.
     74      */
     75     @SuppressWarnings("unchecked")
     76     private static <P_IN> Spliterator<P_IN> sliceSpliterator(StreamShape shape,
     77                                                              Spliterator<P_IN> s,
     78                                                              long skip, long limit) {
     79         assert s.hasCharacteristics(Spliterator.SUBSIZED);
     80         long sliceFence = calcSliceFence(skip, limit);
     81         switch (shape) {
     82             case REFERENCE:
     83                 return new StreamSpliterators
     84                         .SliceSpliterator.OfRef<>(s, skip, sliceFence);
     85             case INT_VALUE:
     86                 return (Spliterator<P_IN>) new StreamSpliterators
     87                         .SliceSpliterator.OfInt((Spliterator.OfInt) s, skip, sliceFence);
     88             case LONG_VALUE:
     89                 return (Spliterator<P_IN>) new StreamSpliterators
     90                         .SliceSpliterator.OfLong((Spliterator.OfLong) s, skip, sliceFence);
     91             case DOUBLE_VALUE:
     92                 return (Spliterator<P_IN>) new StreamSpliterators
     93                         .SliceSpliterator.OfDouble((Spliterator.OfDouble) s, skip, sliceFence);
     94             default:
     95                 throw new IllegalStateException("Unknown shape " + shape);
     96         }
     97     }
     98 
     99     @SuppressWarnings("unchecked")  // the Object[] is deliberately handed out as a T[]
    100     private static <T> IntFunction<T[]> castingArray() {
    101         return length -> (T[]) new Object[length];
    102     }
    103 
    104     /**
    105      * Appends a "slice" operation to the provided stream and returns the new
    106      * stream.  The slice may be skip-only, limit-only, or skip-and-limit.
    107      *
    108      * @param <T> the type of both input and output elements
    109      * @param upstream a reference stream with element type T
    110      * @param skip the number of elements to skip.  Must be >= 0.
    111      * @param limit the maximum size of the resulting stream, or -1 for no limit
    112      * @throws IllegalArgumentException if {@code skip} is negative
    113      */
    114     public static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
    115                                         long skip, long limit) {
    116         if (skip < 0)
    117             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    118 
    119         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
    120                                                       flags(limit)) {
    121             Spliterator<T> unorderedSkipLimitSpliterator(Spliterator<T> s,
    122                                                          long skip, long limit, long sizeIfKnown) {  // slice used when the ORDERED flag is not known
    123                 if (skip <= sizeIfKnown) {  // cannot hold for a negative sizeIfKnown, since skip >= 0
    124                     // Use just the limit if the number of elements
    125                     // to skip is <= the known pipeline size
    126                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
    127                     skip = 0;
    128                 }
    129                 return new StreamSpliterators.UnorderedSliceSpliterator.OfRef<>(s, skip, limit);
    130             }
    131 
    132             @Override
    133             public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    134                 long size = helper.exactOutputSizeIfKnown(spliterator);
    135                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {  // positive known size and a SUBSIZED source: use a SliceSpliterator
    136                     return new StreamSpliterators.SliceSpliterator.OfRef<>(
    137                             helper.wrapSpliterator(spliterator),
    138                             skip,
    139                             calcSliceFence(skip, limit));
    140                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    141                     return unorderedSkipLimitSpliterator(
    142                             helper.wrapSpliterator(spliterator),
    143                             skip, limit, size);
    144                 }
    145                 else {  // otherwise compute the slice with a SliceTask and expose its Node as a spliterator
    146                     // @@@ OOMEs will occur for LongStream.longs().filter(i -> true).limit(n)
    147                     //     regardless of the value of n
    148                     //     Need to adjust the target size of splitting for the
    149                     //     SliceTask from say (size / k) to say min(size / k, 1 << 14)
    150                     //     This will limit the size of the buffers created at the leaf nodes
    151                     //     cancellation will be more aggressive cancelling later tasks
    152                     //     if the target slice size has been reached from a given task,
    153                     //     cancellation should also clear local results if any
    154                     return new SliceTask<>(this, helper, spliterator, castingArray(), skip, limit).
    155                             invoke().spliterator();
    156                 }
    157             }
    158 
    159             @Override
    160             public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
    161                                               Spliterator<P_IN> spliterator,
    162                                               IntFunction<T[]> generator) {
    163                 long size = helper.exactOutputSizeIfKnown(spliterator);
    164                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
    165                     // Because the pipeline is SIZED the slice spliterator
    166                     // can be created from the source, this requires matching
    167                     // to shape of the source, and is potentially more efficient
    168                     // than creating the slice spliterator from the pipeline
    169                     // wrapping spliterator
    170                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
    171                     return Nodes.collect(helper, s, true, generator);
    172                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    173                     Spliterator<T> s =  unorderedSkipLimitSpliterator(
    174                             helper.wrapSpliterator(spliterator),
    175                             skip, limit, size);
    176                     // Collect using this pipeline, which is empty and therefore
    177                     // can be used with the pipeline wrapping spliterator
    178                     // Note that we cannot create a slice spliterator from
    179                     // the source spliterator if the pipeline is not SIZED
    180                     return Nodes.collect(this, s, true, generator);
    181                 }
    182                 else {  // otherwise compute the slice with a SliceTask
    183                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
    184                             invoke();
    185                 }
    186             }
    187 
    188             @Override
    189             public Sink<T> opWrapSink(int flags, Sink<T> sink) {
    190                 return new Sink.ChainedReference<T, T>(sink) {
    191                     long n = skip;  // elements still to be skipped
    192                     long m = limit >= 0 ? limit : Long.MAX_VALUE;  // elements still allowed downstream
    193 
    194                     @Override
    195                     public void begin(long size) {
    196                         downstream.begin(calcSize(size, skip, m));  // announce the size computed by calcSize
    197                     }
    198 
    199                     @Override
    200                     public void accept(T t) {
    201                         if (n == 0) {  // skipping has finished
    202                             if (m > 0) {  // limit not yet used up; otherwise t is dropped
    203                                 m--;
    204                                 downstream.accept(t);
    205                             }
    206                         }
    207                         else {  // still skipping: drop t
    208                             n--;
    209                         }
    210                     }
    211 
    212                     @Override
    213                     public boolean cancellationRequested() {
    214                         return m == 0 || downstream.cancellationRequested();  // limit used up, or downstream asks to stop
    215                     }
    216                 };
    217             }
    218         };
    219     }
    220 
    221     /**
    222      * Appends a "slice" operation to the provided IntStream and returns the new
    223      * stream.  The slice may be skip-only, limit-only, or skip-and-limit.
    224      *
    225      * @param upstream An IntStream
    226      * @param skip The number of elements to skip.  Must be >= 0.
    227      * @param limit The maximum size of the resulting stream, or -1 for no limit
    228      * @throws IllegalArgumentException if {@code skip} is negative
    229      */
    230     public static IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream,
    231                                     long skip, long limit) {
    232         if (skip < 0)
    233             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    234 
    235         return new IntPipeline.StatefulOp<Integer>(upstream, StreamShape.INT_VALUE,
    236                                                    flags(limit)) {
    237             Spliterator.OfInt unorderedSkipLimitSpliterator(
    238                     Spliterator.OfInt s, long skip, long limit, long sizeIfKnown) {  // slice used when the ORDERED flag is not known
    239                 if (skip <= sizeIfKnown) {  // cannot hold for a negative sizeIfKnown, since skip >= 0
    240                     // Use just the limit if the number of elements
    241                     // to skip is <= the known pipeline size
    242                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
    243                     skip = 0;
    244                 }
    245                 return new StreamSpliterators.UnorderedSliceSpliterator.OfInt(s, skip, limit);
    246             }
    247 
    248             @Override
    249             public <P_IN> Spliterator<Integer> opEvaluateParallelLazy(PipelineHelper<Integer> helper,
    250                                                                Spliterator<P_IN> spliterator) {
    251                 long size = helper.exactOutputSizeIfKnown(spliterator);
    252                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {  // positive known size and a SUBSIZED source: use a SliceSpliterator
    253                     return new StreamSpliterators.SliceSpliterator.OfInt(
    254                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
    255                             skip,
    256                             calcSliceFence(skip, limit));
    257                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    258                     return unorderedSkipLimitSpliterator(
    259                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
    260                             skip, limit, size);
    261                 }
    262                 else {  // otherwise compute the slice with a SliceTask and expose its Node as a spliterator
    263                     return new SliceTask<>(this, helper, spliterator, Integer[]::new, skip, limit).
    264                             invoke().spliterator();
    265                 }
    266             }
    267 
    268             @Override
    269             public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
    270                                                     Spliterator<P_IN> spliterator,
    271                                                     IntFunction<Integer[]> generator) {
    272                 long size = helper.exactOutputSizeIfKnown(spliterator);
    273                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
    274                     // Because the pipeline is SIZED the slice spliterator
    275                     // can be created from the source, this requires matching
    276                     // to shape of the source, and is potentially more efficient
    277                     // than creating the slice spliterator from the pipeline
    278                     // wrapping spliterator
    279                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
    280                     return Nodes.collectInt(helper, s, true);
    281                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    282                     Spliterator.OfInt s =  unorderedSkipLimitSpliterator(
    283                             (Spliterator.OfInt) helper.wrapSpliterator(spliterator),
    284                             skip, limit, size);
    285                     // Collect using this pipeline, which is empty and therefore
    286                     // can be used with the pipeline wrapping spliterator
    287                     // Note that we cannot create a slice spliterator from
    288                     // the source spliterator if the pipeline is not SIZED
    289                     return Nodes.collectInt(this, s, true);
    290                 }
    291                 else {  // otherwise compute the slice with a SliceTask
    292                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
    293                             invoke();
    294                 }
    295             }
    296 
    297             @Override
    298             public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
    299                 return new Sink.ChainedInt<Integer>(sink) {
    300                     long n = skip;  // elements still to be skipped
    301                     long m = limit >= 0 ? limit : Long.MAX_VALUE;  // elements still allowed downstream
    302 
    303                     @Override
    304                     public void begin(long size) {
    305                         downstream.begin(calcSize(size, skip, m));  // announce the size computed by calcSize
    306                     }
    307 
    308                     @Override
    309                     public void accept(int t) {
    310                         if (n == 0) {  // skipping has finished
    311                             if (m > 0) {  // limit not yet used up; otherwise t is dropped
    312                                 m--;
    313                                 downstream.accept(t);
    314                             }
    315                         }
    316                         else {  // still skipping: drop t
    317                             n--;
    318                         }
    319                     }
    320 
    321                     @Override
    322                     public boolean cancellationRequested() {
    323                         return m == 0 || downstream.cancellationRequested();  // limit used up, or downstream asks to stop
    324                     }
    325                 };
    326             }
    327         };
    328     }
    329 
    330     /**
    331      * Appends a "slice" operation to the provided LongStream and returns the new
    332      * stream.  The slice may be skip-only, limit-only, or skip-and-limit.
    333      *
    334      * @param upstream A LongStream
    335      * @param skip The number of elements to skip.  Must be >= 0.
    336      * @param limit The maximum size of the resulting stream, or -1 for no limit
    337      * @throws IllegalArgumentException if {@code skip} is negative
    338      */
    339     public static LongStream makeLong(AbstractPipeline<?, Long, ?> upstream,
    340                                       long skip, long limit) {
    341         if (skip < 0)
    342             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    343 
    344         return new LongPipeline.StatefulOp<Long>(upstream, StreamShape.LONG_VALUE,
    345                                                  flags(limit)) {
    346             Spliterator.OfLong unorderedSkipLimitSpliterator(
    347                     Spliterator.OfLong s, long skip, long limit, long sizeIfKnown) {  // slice used when the ORDERED flag is not known
    348                 if (skip <= sizeIfKnown) {  // cannot hold for a negative sizeIfKnown, since skip >= 0
    349                     // Use just the limit if the number of elements
    350                     // to skip is <= the known pipeline size
    351                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
    352                     skip = 0;
    353                 }
    354                 return new StreamSpliterators.UnorderedSliceSpliterator.OfLong(s, skip, limit);
    355             }
    356 
    357             @Override
    358             public <P_IN> Spliterator<Long> opEvaluateParallelLazy(PipelineHelper<Long> helper,
    359                                                             Spliterator<P_IN> spliterator) {
    360                 long size = helper.exactOutputSizeIfKnown(spliterator);
    361                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {  // positive known size and a SUBSIZED source: use a SliceSpliterator
    362                     return new StreamSpliterators.SliceSpliterator.OfLong(
    363                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
    364                             skip,
    365                             calcSliceFence(skip, limit));
    366                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    367                     return unorderedSkipLimitSpliterator(
    368                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
    369                             skip, limit, size);
    370                 }
    371                 else {  // otherwise compute the slice with a SliceTask and expose its Node as a spliterator
    372                     return new SliceTask<>(this, helper, spliterator, Long[]::new, skip, limit).
    373                             invoke().spliterator();
    374                 }
    375             }
    376 
    377             @Override
    378             public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
    379                                                  Spliterator<P_IN> spliterator,
    380                                                  IntFunction<Long[]> generator) {
    381                 long size = helper.exactOutputSizeIfKnown(spliterator);
    382                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
    383                     // Because the pipeline is SIZED the slice spliterator
    384                     // can be created from the source, this requires matching
    385                     // to shape of the source, and is potentially more efficient
    386                     // than creating the slice spliterator from the pipeline
    387                     // wrapping spliterator
    388                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
    389                     return Nodes.collectLong(helper, s, true);
    390                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    391                     Spliterator.OfLong s =  unorderedSkipLimitSpliterator(
    392                             (Spliterator.OfLong) helper.wrapSpliterator(spliterator),
    393                             skip, limit, size);
    394                     // Collect using this pipeline, which is empty and therefore
    395                     // can be used with the pipeline wrapping spliterator
    396                     // Note that we cannot create a slice spliterator from
    397                     // the source spliterator if the pipeline is not SIZED
    398                     return Nodes.collectLong(this, s, true);
    399                 }
    400                 else {  // otherwise compute the slice with a SliceTask
    401                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
    402                             invoke();
    403                 }
    404             }
    405 
    406             @Override
    407             public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
    408                 return new Sink.ChainedLong<Long>(sink) {
    409                     long n = skip;  // elements still to be skipped
    410                     long m = limit >= 0 ? limit : Long.MAX_VALUE;  // elements still allowed downstream
    411 
    412                     @Override
    413                     public void begin(long size) {
    414                         downstream.begin(calcSize(size, skip, m));  // announce the size computed by calcSize
    415                     }
    416 
    417                     @Override
    418                     public void accept(long t) {
    419                         if (n == 0) {  // skipping has finished
    420                             if (m > 0) {  // limit not yet used up; otherwise t is dropped
    421                                 m--;
    422                                 downstream.accept(t);
    423                             }
    424                         }
    425                         else {  // still skipping: drop t
    426                             n--;
    427                         }
    428                     }
    429 
    430                     @Override
    431                     public boolean cancellationRequested() {
    432                         return m == 0 || downstream.cancellationRequested();  // limit used up, or downstream asks to stop
    433                     }
    434                 };
    435             }
    436         };
    437     }
    438 
    439     /**
    440      * Appends a "slice" operation to the provided DoubleStream and returns the
    441      * new stream.  The slice may be skip-only, limit-only, or skip-and-limit.
    442      *
    443      * @param upstream A DoubleStream
    444      * @param skip The number of elements to skip.  Must be >= 0.
    445      * @param limit The maximum size of the resulting stream, or -1 for no limit
    446      * @throws IllegalArgumentException if {@code skip} is negative
    447      */
    448     public static DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream,
    449                                           long skip, long limit) {
    450         if (skip < 0)
    451             throw new IllegalArgumentException("Skip must be non-negative: " + skip);
    452 
    453         return new DoublePipeline.StatefulOp<Double>(upstream, StreamShape.DOUBLE_VALUE,
    454                                                      flags(limit)) {
    455             Spliterator.OfDouble unorderedSkipLimitSpliterator(
    456                     Spliterator.OfDouble s, long skip, long limit, long sizeIfKnown) {  // slice used when the ORDERED flag is not known
    457                 if (skip <= sizeIfKnown) {  // cannot hold for a negative sizeIfKnown, since skip >= 0
    458                     // Use just the limit if the number of elements
    459                     // to skip is <= the known pipeline size
    460                     limit = limit >= 0 ? Math.min(limit, sizeIfKnown - skip) : sizeIfKnown - skip;
    461                     skip = 0;
    462                 }
    463                 return new StreamSpliterators.UnorderedSliceSpliterator.OfDouble(s, skip, limit);
    464             }
    465 
    466             @Override
    467             public <P_IN> Spliterator<Double> opEvaluateParallelLazy(PipelineHelper<Double> helper,
    468                                                               Spliterator<P_IN> spliterator) {
    469                 long size = helper.exactOutputSizeIfKnown(spliterator);
    470                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {  // positive known size and a SUBSIZED source: use a SliceSpliterator
    471                     return new StreamSpliterators.SliceSpliterator.OfDouble(
    472                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
    473                             skip,
    474                             calcSliceFence(skip, limit));
    475                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    476                     return unorderedSkipLimitSpliterator(
    477                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
    478                             skip, limit, size);
    479                 }
    480                 else {  // otherwise compute the slice with a SliceTask and expose its Node as a spliterator
    481                     return new SliceTask<>(this, helper, spliterator, Double[]::new, skip, limit).
    482                             invoke().spliterator();
    483                 }
    484             }
    485 
    486             @Override
    487             public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
    488                                                    Spliterator<P_IN> spliterator,
    489                                                    IntFunction<Double[]> generator) {
    490                 long size = helper.exactOutputSizeIfKnown(spliterator);
    491                 if (size > 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) {
    492                     // Because the pipeline is SIZED the slice spliterator
    493                     // can be created from the source, this requires matching
    494                     // to shape of the source, and is potentially more efficient
    495                     // than creating the slice spliterator from the pipeline
    496                     // wrapping spliterator
    497                     Spliterator<P_IN> s = sliceSpliterator(helper.getSourceShape(), spliterator, skip, limit);
    498                     return Nodes.collectDouble(helper, s, true);
    499                 } else if (!StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {  // ORDERED flag not known: use the unordered slice spliterator
    500                     Spliterator.OfDouble s =  unorderedSkipLimitSpliterator(
    501                             (Spliterator.OfDouble) helper.wrapSpliterator(spliterator),
    502                             skip, limit, size);
    503                     // Collect using this pipeline, which is empty and therefore
    504                     // can be used with the pipeline wrapping spliterator
    505                     // Note that we cannot create a slice spliterator from
    506                     // the source spliterator if the pipeline is not SIZED
    507                     return Nodes.collectDouble(this, s, true);
    508                 }
    509                 else {  // otherwise compute the slice with a SliceTask
    510                     return new SliceTask<>(this, helper, spliterator, generator, skip, limit).
    511                             invoke();
    512                 }
    513             }
    514 
    515             @Override
    516             public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
    517                 return new Sink.ChainedDouble<Double>(sink) {
    518                     long n = skip;  // elements still to be skipped
    519                     long m = limit >= 0 ? limit : Long.MAX_VALUE;  // elements still allowed downstream
    520 
    521                     @Override
    522                     public void begin(long size) {
    523                         downstream.begin(calcSize(size, skip, m));  // announce the size computed by calcSize
    524                     }
    525 
    526                     @Override
    527                     public void accept(double t) {
    528                         if (n == 0) {  // skipping has finished
    529                             if (m > 0) {  // limit not yet used up; otherwise t is dropped
    530                                 m--;
    531                                 downstream.accept(t);
    532                             }
    533                         }
    534                         else {  // still skipping: drop t
    535                             n--;
    536                         }
    537                     }
    538 
    539                     @Override
    540                     public boolean cancellationRequested() {
    541                         return m == 0 || downstream.cancellationRequested();  // limit used up, or downstream asks to stop
    542                     }
    543                 };
    544             }
    545         };
    546     }
    547 
    548     private static int flags(long limit) {  // always NOT_SIZED; IS_SHORT_CIRCUIT too unless limit is -1
    549         return (limit == -1) ? StreamOpFlag.NOT_SIZED : (StreamOpFlag.NOT_SIZED | StreamOpFlag.IS_SHORT_CIRCUIT);
    550     }
    551 
    552     /**
    553      * {@code ForkJoinTask} implementing slice computation.
    554      *
    555      * @param <P_IN> Input element type to the stream pipeline
    556      * @param <P_OUT> Output element type from the stream pipeline
    557      */
    558     @SuppressWarnings("serial")
    559     private static final class SliceTask<P_IN, P_OUT>
    560             extends AbstractShortCircuitTask<P_IN, P_OUT, Node<P_OUT>, SliceTask<P_IN, P_OUT>> {
    561         private final AbstractPipeline<P_OUT, P_OUT, ?> op;
    562         private final IntFunction<P_OUT[]> generator;
    563         private final long targetOffset, targetSize;
    564         private long thisNodeSize;
    565 
    566         private volatile boolean completed;
    567 
    568         SliceTask(AbstractPipeline<P_OUT, P_OUT, ?> op,
    569                   PipelineHelper<P_OUT> helper,
    570                   Spliterator<P_IN> spliterator,
    571                   IntFunction<P_OUT[]> generator,
    572                   long offset, long size) {
    573             super(helper, spliterator);
    574             this.op = op;
    575             this.generator = generator;
    576             this.targetOffset = offset;
    577             this.targetSize = size;
    578         }
    579 
    580         SliceTask(SliceTask<P_IN, P_OUT> parent, Spliterator<P_IN> spliterator) {
    581             super(parent, spliterator);
    582             this.op = parent.op;
    583             this.generator = parent.generator;
    584             this.targetOffset = parent.targetOffset;
    585             this.targetSize = parent.targetSize;
    586         }
    587 
    588         @Override
    589         protected SliceTask<P_IN, P_OUT> makeChild(Spliterator<P_IN> spliterator) {
    590             return new SliceTask<>(this, spliterator);
    591         }
    592 
    593         @Override
    594         protected final Node<P_OUT> getEmptyResult() {
    595             return Nodes.emptyNode(op.getOutputShape());
    596         }
    597 
    598         @Override
    599         protected final Node<P_OUT> doLeaf() {
    600             if (isRoot()) {
    601                 long sizeIfKnown = StreamOpFlag.SIZED.isPreserved(op.sourceOrOpFlags)
    602                                    ? op.exactOutputSizeIfKnown(spliterator)
    603                                    : -1;
    604                 final Node.Builder<P_OUT> nb = op.makeNodeBuilder(sizeIfKnown, generator);
    605                 Sink<P_OUT> opSink = op.opWrapSink(helper.getStreamAndOpFlags(), nb);
    606                 helper.copyIntoWithCancel(helper.wrapSink(opSink), spliterator);
    607                 // There is no need to truncate since the op performs the
    608                 // skipping and limiting of elements
    609                 return nb.build();
    610             }
    611             else {
    612                 Node<P_OUT> node = helper.wrapAndCopyInto(helper.makeNodeBuilder(-1, generator),
    613                                                           spliterator).build();
    614                 thisNodeSize = node.count();
    615                 completed = true;
    616                 spliterator = null;
    617                 return node;
    618             }
    619         }
    620 
    621         @Override
    622         public final void onCompletion(CountedCompleter<?> caller) {
    623             if (!isLeaf()) {
    624                 Node<P_OUT> result;
    625                 thisNodeSize = leftChild.thisNodeSize + rightChild.thisNodeSize;
    626                 if (canceled) {
    627                     thisNodeSize = 0;
    628                     result = getEmptyResult();
    629                 }
    630                 else if (thisNodeSize == 0)
    631                     result = getEmptyResult();
    632                 else if (leftChild.thisNodeSize == 0)
    633                     result = rightChild.getLocalResult();
    634                 else {
    635                     result = Nodes.conc(op.getOutputShape(),
    636                                         leftChild.getLocalResult(), rightChild.getLocalResult());
    637                 }
    638                 setLocalResult(isRoot() ? doTruncate(result) : result);
    639                 completed = true;
    640             }
    641             if (targetSize >= 0
    642                 && !isRoot()
    643                 && isLeftCompleted(targetOffset + targetSize))
    644                     cancelLaterNodes();
    645 
    646             super.onCompletion(caller);
    647         }
    648 
    649         @Override
    650         protected void cancel() {
    651             super.cancel();
    652             if (completed)
    653                 setLocalResult(getEmptyResult());
    654         }
    655 
    656         private Node<P_OUT> doTruncate(Node<P_OUT> input) {
    657             long to = targetSize >= 0 ? Math.min(input.count(), targetOffset + targetSize) : thisNodeSize;
    658             return input.truncate(targetOffset, to, generator);
    659         }
    660 
    661         /**
    662          * Determine if the number of completed elements in this node and nodes
    663          * to the left of this node is greater than or equal to the target size.
    664          *
    665          * @param target the target size
    666          * @return true if the number of elements is greater than or equal to
    667          *         the target size, otherwise false.
    668          */
    669         private boolean isLeftCompleted(long target) {
    670             long size = completed ? thisNodeSize : completedSize(target);
    671             if (size >= target)
    672                 return true;
    673             for (SliceTask<P_IN, P_OUT> parent = getParent(), node = this;
    674                  parent != null;
    675                  node = parent, parent = parent.getParent()) {
    676                 if (node == parent.rightChild) {
    677                     SliceTask<P_IN, P_OUT> left = parent.leftChild;
    678                     if (left != null) {
    679                         size += left.completedSize(target);
    680                         if (size >= target)
    681                             return true;
    682                     }
    683                 }
    684             }
    685             return size >= target;
    686         }
    687 
    688         /**
    689          * Compute the number of completed elements in this node.
    690          * <p>
    691          * Computation terminates if all nodes have been processed or the
    692          * number of completed elements is greater than or equal to the target
    693          * size.
    694          *
    695          * @param target the target size
    696          * @return return the number of completed elements
    697          */
    698         private long completedSize(long target) {
    699             if (completed)
    700                 return thisNodeSize;
    701             else {
    702                 SliceTask<P_IN, P_OUT> left = leftChild;
    703                 SliceTask<P_IN, P_OUT> right = rightChild;
    704                 if (left == null || right == null) {
    705                     // must be completed
    706                     return thisNodeSize;
    707                 }
    708                 else {
    709                     long leftSize = left.completedSize(target);
    710                     return (leftSize >= target) ? leftSize : leftSize + right.completedSize(target);
    711                 }
    712             }
    713         }
    714     }
    715 }
    716