Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.ArrayList;
     28 import java.util.Arrays;
     29 import java.util.Comparator;
     30 import java.util.Objects;
     31 import java.util.Spliterator;
     32 import java.util.function.IntFunction;
     33 
     34 
     35 /**
     36  * Factory methods for transforming streams into sorted streams.
     37  *
     38  * @since 1.8
     39  */
     40 final class SortedOps {
     41 
     42     private SortedOps() { }
     43 
     44     /**
     45      * Appends a "sorted" operation to the provided stream.
     46      *
     47      * @param <T> the type of both input and output elements
     48      * @param upstream a reference stream with element type T
     49      */
     50     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream) {
     51         return new OfRef<>(upstream);
     52     }
     53 
     54     /**
     55      * Appends a "sorted" operation to the provided stream.
     56      *
     57      * @param <T> the type of both input and output elements
     58      * @param upstream a reference stream with element type T
     59      * @param comparator the comparator to order elements by
     60      */
     61     static <T> Stream<T> makeRef(AbstractPipeline<?, T, ?> upstream,
     62                                 Comparator<? super T> comparator) {
     63         return new OfRef<>(upstream, comparator);
     64     }
     65 
     66     /**
     67      * Appends a "sorted" operation to the provided stream.
     68      *
     69      * @param <T> the type of both input and output elements
     70      * @param upstream a reference stream with element type T
     71      */
     72     static <T> IntStream makeInt(AbstractPipeline<?, Integer, ?> upstream) {
     73         return new OfInt(upstream);
     74     }
     75 
     76     /**
     77      * Appends a "sorted" operation to the provided stream.
     78      *
     79      * @param <T> the type of both input and output elements
     80      * @param upstream a reference stream with element type T
     81      */
     82     static <T> LongStream makeLong(AbstractPipeline<?, Long, ?> upstream) {
     83         return new OfLong(upstream);
     84     }
     85 
     86     /**
     87      * Appends a "sorted" operation to the provided stream.
     88      *
     89      * @param <T> the type of both input and output elements
     90      * @param upstream a reference stream with element type T
     91      */
     92     static <T> DoubleStream makeDouble(AbstractPipeline<?, Double, ?> upstream) {
     93         return new OfDouble(upstream);
     94     }
     95 
     96     /**
     97      * Specialized subtype for sorting reference streams
     98      */
     99     private static final class OfRef<T> extends ReferencePipeline.StatefulOp<T, T> {
    100         /**
    101          * Comparator used for sorting
    102          */
    103         private final boolean isNaturalSort;
    104         private final Comparator<? super T> comparator;
    105 
    106         /**
    107          * Sort using natural order of {@literal <T>} which must be
    108          * {@code Comparable}.
    109          */
    110         OfRef(AbstractPipeline<?, T, ?> upstream) {
    111             super(upstream, StreamShape.REFERENCE,
    112                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    113             this.isNaturalSort = true;
    114             // Will throw CCE when we try to sort if T is not Comparable
    115             @SuppressWarnings("unchecked")
    116             Comparator<? super T> comp = (Comparator<? super T>) Comparator.naturalOrder();
    117             this.comparator = comp;
    118         }
    119 
    120         /**
    121          * Sort using the provided comparator.
    122          *
    123          * @param comparator The comparator to be used to evaluate ordering.
    124          */
    125         OfRef(AbstractPipeline<?, T, ?> upstream, Comparator<? super T> comparator) {
    126             super(upstream, StreamShape.REFERENCE,
    127                   StreamOpFlag.IS_ORDERED | StreamOpFlag.NOT_SORTED);
    128             this.isNaturalSort = false;
    129             this.comparator = Objects.requireNonNull(comparator);
    130         }
    131 
    132         @Override
    133         public Sink<T> opWrapSink(int flags, Sink<T> sink) {
    134             Objects.requireNonNull(sink);
    135 
    136             // If the input is already naturally sorted and this operation
    137             // also naturally sorted then this is a no-op
    138             if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
    139                 return sink;
    140             else if (StreamOpFlag.SIZED.isKnown(flags))
    141                 return new SizedRefSortingSink<>(sink, comparator);
    142             else
    143                 return new RefSortingSink<>(sink, comparator);
    144         }
    145 
    146         @Override
    147         public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
    148                                                  Spliterator<P_IN> spliterator,
    149                                                  IntFunction<T[]> generator) {
    150             // If the input is already naturally sorted and this operation
    151             // naturally sorts then collect the output
    152             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags()) && isNaturalSort) {
    153                 return helper.evaluate(spliterator, false, generator);
    154             }
    155             else {
    156                 // @@@ Weak two-pass parallel implementation; parallel collect, parallel sort
    157                 T[] flattenedData = helper.evaluate(spliterator, true, generator).asArray(generator);
    158                 Arrays.parallelSort(flattenedData, comparator);
    159                 return Nodes.node(flattenedData);
    160             }
    161         }
    162     }
    163 
    164     /**
    165      * Specialized subtype for sorting int streams.
    166      */
    167     private static final class OfInt extends IntPipeline.StatefulOp<Integer> {
    168         OfInt(AbstractPipeline<?, Integer, ?> upstream) {
    169             super(upstream, StreamShape.INT_VALUE,
    170                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    171         }
    172 
    173         @Override
    174         public Sink<Integer> opWrapSink(int flags, Sink<Integer> sink) {
    175             Objects.requireNonNull(sink);
    176 
    177             if (StreamOpFlag.SORTED.isKnown(flags))
    178                 return sink;
    179             else if (StreamOpFlag.SIZED.isKnown(flags))
    180                 return new SizedIntSortingSink(sink);
    181             else
    182                 return new IntSortingSink(sink);
    183         }
    184 
    185         @Override
    186         public <P_IN> Node<Integer> opEvaluateParallel(PipelineHelper<Integer> helper,
    187                                                        Spliterator<P_IN> spliterator,
    188                                                        IntFunction<Integer[]> generator) {
    189             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
    190                 return helper.evaluate(spliterator, false, generator);
    191             }
    192             else {
    193                 Node.OfInt n = (Node.OfInt) helper.evaluate(spliterator, true, generator);
    194 
    195                 int[] content = n.asPrimitiveArray();
    196                 Arrays.parallelSort(content);
    197 
    198                 return Nodes.node(content);
    199             }
    200         }
    201     }
    202 
    203     /**
    204      * Specialized subtype for sorting long streams.
    205      */
    206     private static final class OfLong extends LongPipeline.StatefulOp<Long> {
    207         OfLong(AbstractPipeline<?, Long, ?> upstream) {
    208             super(upstream, StreamShape.LONG_VALUE,
    209                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    210         }
    211 
    212         @Override
    213         public Sink<Long> opWrapSink(int flags, Sink<Long> sink) {
    214             Objects.requireNonNull(sink);
    215 
    216             if (StreamOpFlag.SORTED.isKnown(flags))
    217                 return sink;
    218             else if (StreamOpFlag.SIZED.isKnown(flags))
    219                 return new SizedLongSortingSink(sink);
    220             else
    221                 return new LongSortingSink(sink);
    222         }
    223 
    224         @Override
    225         public <P_IN> Node<Long> opEvaluateParallel(PipelineHelper<Long> helper,
    226                                                     Spliterator<P_IN> spliterator,
    227                                                     IntFunction<Long[]> generator) {
    228             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
    229                 return helper.evaluate(spliterator, false, generator);
    230             }
    231             else {
    232                 Node.OfLong n = (Node.OfLong) helper.evaluate(spliterator, true, generator);
    233 
    234                 long[] content = n.asPrimitiveArray();
    235                 Arrays.parallelSort(content);
    236 
    237                 return Nodes.node(content);
    238             }
    239         }
    240     }
    241 
    242     /**
    243      * Specialized subtype for sorting double streams.
    244      */
    245     private static final class OfDouble extends DoublePipeline.StatefulOp<Double> {
    246         OfDouble(AbstractPipeline<?, Double, ?> upstream) {
    247             super(upstream, StreamShape.DOUBLE_VALUE,
    248                   StreamOpFlag.IS_ORDERED | StreamOpFlag.IS_SORTED);
    249         }
    250 
    251         @Override
    252         public Sink<Double> opWrapSink(int flags, Sink<Double> sink) {
    253             Objects.requireNonNull(sink);
    254 
    255             if (StreamOpFlag.SORTED.isKnown(flags))
    256                 return sink;
    257             else if (StreamOpFlag.SIZED.isKnown(flags))
    258                 return new SizedDoubleSortingSink(sink);
    259             else
    260                 return new DoubleSortingSink(sink);
    261         }
    262 
    263         @Override
    264         public <P_IN> Node<Double> opEvaluateParallel(PipelineHelper<Double> helper,
    265                                                       Spliterator<P_IN> spliterator,
    266                                                       IntFunction<Double[]> generator) {
    267             if (StreamOpFlag.SORTED.isKnown(helper.getStreamAndOpFlags())) {
    268                 return helper.evaluate(spliterator, false, generator);
    269             }
    270             else {
    271                 Node.OfDouble n = (Node.OfDouble) helper.evaluate(spliterator, true, generator);
    272 
    273                 double[] content = n.asPrimitiveArray();
    274                 Arrays.parallelSort(content);
    275 
    276                 return Nodes.node(content);
    277             }
    278         }
    279     }
    280 
    281     /**
    282      * Abstract {@link Sink} for implementing sort on reference streams.
    283      *
    284      * <p>
    285      * Note: documentation below applies to reference and all primitive sinks.
    286      * <p>
    287      * Sorting sinks first accept all elements, buffering then into an array
    288      * or a re-sizable data structure, if the size of the pipeline is known or
    289      * unknown respectively.  At the end of the sink protocol those elements are
    290      * sorted and then pushed downstream.
    291      * This class records if {@link #cancellationRequested} is called.  If so it
    292      * can be inferred that the source pushing source elements into the pipeline
    293      * knows that the pipeline is short-circuiting.  In such cases sub-classes
    294      * pushing elements downstream will preserve the short-circuiting protocol
    295      * by calling {@code downstream.cancellationRequested()} and checking the
    296      * result is {@code false} before an element is pushed.
    297      * <p>
    298      * Note that the above behaviour is an optimization for sorting with
    299      * sequential streams.  It is not an error that more elements, than strictly
    300      * required to produce a result, may flow through the pipeline.  This can
    301      * occur, in general (not restricted to just sorting), for short-circuiting
    302      * parallel pipelines.
    303      */
    304     private static abstract class AbstractRefSortingSink<T> extends Sink.ChainedReference<T, T> {
    305         protected final Comparator<? super T> comparator;
    306         // @@@ could be a lazy final value, if/when support is added
    307         protected boolean cancellationWasRequested;
    308 
    309         AbstractRefSortingSink(Sink<? super T> downstream, Comparator<? super T> comparator) {
    310             super(downstream);
    311             this.comparator = comparator;
    312         }
    313 
    314         /**
    315          * Records is cancellation is requested so short-circuiting behaviour
    316          * can be preserved when the sorted elements are pushed downstream.
    317          *
    318          * @return false, as this sink never short-circuits.
    319          */
    320         @Override
    321         public final boolean cancellationRequested() {
    322             cancellationWasRequested = true;
    323             return false;
    324         }
    325     }
    326 
    327     /**
    328      * {@link Sink} for implementing sort on SIZED reference streams.
    329      */
    330     private static final class SizedRefSortingSink<T> extends AbstractRefSortingSink<T> {
    331         private T[] array;
    332         private int offset;
    333 
    334         SizedRefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
    335             super(sink, comparator);
    336         }
    337 
    338         @Override
    339         @SuppressWarnings("unchecked")
    340         public void begin(long size) {
    341             if (size >= Nodes.MAX_ARRAY_SIZE)
    342                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    343             array = (T[]) new Object[(int) size];
    344         }
    345 
    346         @Override
    347         public void end() {
    348             Arrays.sort(array, 0, offset, comparator);
    349             downstream.begin(offset);
    350             if (!cancellationWasRequested) {
    351                 for (int i = 0; i < offset; i++)
    352                     downstream.accept(array[i]);
    353             }
    354             else {
    355                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
    356                     downstream.accept(array[i]);
    357             }
    358             downstream.end();
    359             array = null;
    360         }
    361 
    362         @Override
    363         public void accept(T t) {
    364             array[offset++] = t;
    365         }
    366     }
    367 
    368     /**
    369      * {@link Sink} for implementing sort on reference streams.
    370      */
    371     private static final class RefSortingSink<T> extends AbstractRefSortingSink<T> {
    372         private ArrayList<T> list;
    373 
    374         RefSortingSink(Sink<? super T> sink, Comparator<? super T> comparator) {
    375             super(sink, comparator);
    376         }
    377 
    378         @Override
    379         public void begin(long size) {
    380             if (size >= Nodes.MAX_ARRAY_SIZE)
    381                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    382             list = (size >= 0) ? new ArrayList<T>((int) size) : new ArrayList<T>();
    383         }
    384 
    385         @Override
    386         public void end() {
    387             list.sort(comparator);
    388             downstream.begin(list.size());
    389             if (!cancellationWasRequested) {
    390                 list.forEach(downstream::accept);
    391             }
    392             else {
    393                 for (T t : list) {
    394                     if (downstream.cancellationRequested()) break;
    395                     downstream.accept(t);
    396                 }
    397             }
    398             downstream.end();
    399             list = null;
    400         }
    401 
    402         @Override
    403         public void accept(T t) {
    404             list.add(t);
    405         }
    406     }
    407 
    408     /**
    409      * Abstract {@link Sink} for implementing sort on int streams.
    410      */
    411     private static abstract class AbstractIntSortingSink extends Sink.ChainedInt<Integer> {
    412         protected boolean cancellationWasRequested;
    413 
    414         AbstractIntSortingSink(Sink<? super Integer> downstream) {
    415             super(downstream);
    416         }
    417 
    418         @Override
    419         public final boolean cancellationRequested() {
    420             cancellationWasRequested = true;
    421             return false;
    422         }
    423     }
    424 
    425     /**
    426      * {@link Sink} for implementing sort on SIZED int streams.
    427      */
    428     private static final class SizedIntSortingSink extends AbstractIntSortingSink {
    429         private int[] array;
    430         private int offset;
    431 
    432         SizedIntSortingSink(Sink<? super Integer> downstream) {
    433             super(downstream);
    434         }
    435 
    436         @Override
    437         public void begin(long size) {
    438             if (size >= Nodes.MAX_ARRAY_SIZE)
    439                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    440             array = new int[(int) size];
    441         }
    442 
    443         @Override
    444         public void end() {
    445             Arrays.sort(array, 0, offset);
    446             downstream.begin(offset);
    447             if (!cancellationWasRequested) {
    448                 for (int i = 0; i < offset; i++)
    449                     downstream.accept(array[i]);
    450             }
    451             else {
    452                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
    453                     downstream.accept(array[i]);
    454             }
    455             downstream.end();
    456             array = null;
    457         }
    458 
    459         @Override
    460         public void accept(int t) {
    461             array[offset++] = t;
    462         }
    463     }
    464 
    465     /**
    466      * {@link Sink} for implementing sort on int streams.
    467      */
    468     private static final class IntSortingSink extends AbstractIntSortingSink {
    469         private SpinedBuffer.OfInt b;
    470 
    471         IntSortingSink(Sink<? super Integer> sink) {
    472             super(sink);
    473         }
    474 
    475         @Override
    476         public void begin(long size) {
    477             if (size >= Nodes.MAX_ARRAY_SIZE)
    478                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    479             b = (size > 0) ? new SpinedBuffer.OfInt((int) size) : new SpinedBuffer.OfInt();
    480         }
    481 
    482         @Override
    483         public void end() {
    484             int[] ints = b.asPrimitiveArray();
    485             Arrays.sort(ints);
    486             downstream.begin(ints.length);
    487             if (!cancellationWasRequested) {
    488                 for (int anInt : ints)
    489                     downstream.accept(anInt);
    490             }
    491             else {
    492                 for (int anInt : ints) {
    493                     if (downstream.cancellationRequested()) break;
    494                     downstream.accept(anInt);
    495                 }
    496             }
    497             downstream.end();
    498         }
    499 
    500         @Override
    501         public void accept(int t) {
    502             b.accept(t);
    503         }
    504     }
    505 
    506     /**
    507      * Abstract {@link Sink} for implementing sort on long streams.
    508      */
    509     private static abstract class AbstractLongSortingSink extends Sink.ChainedLong<Long> {
    510         protected boolean cancellationWasRequested;
    511 
    512         AbstractLongSortingSink(Sink<? super Long> downstream) {
    513             super(downstream);
    514         }
    515 
    516         @Override
    517         public final boolean cancellationRequested() {
    518             cancellationWasRequested = true;
    519             return false;
    520         }
    521     }
    522 
    523     /**
    524      * {@link Sink} for implementing sort on SIZED long streams.
    525      */
    526     private static final class SizedLongSortingSink extends AbstractLongSortingSink {
    527         private long[] array;
    528         private int offset;
    529 
    530         SizedLongSortingSink(Sink<? super Long> downstream) {
    531             super(downstream);
    532         }
    533 
    534         @Override
    535         public void begin(long size) {
    536             if (size >= Nodes.MAX_ARRAY_SIZE)
    537                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    538             array = new long[(int) size];
    539         }
    540 
    541         @Override
    542         public void end() {
    543             Arrays.sort(array, 0, offset);
    544             downstream.begin(offset);
    545             if (!cancellationWasRequested) {
    546                 for (int i = 0; i < offset; i++)
    547                     downstream.accept(array[i]);
    548             }
    549             else {
    550                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
    551                     downstream.accept(array[i]);
    552             }
    553             downstream.end();
    554             array = null;
    555         }
    556 
    557         @Override
    558         public void accept(long t) {
    559             array[offset++] = t;
    560         }
    561     }
    562 
    563     /**
    564      * {@link Sink} for implementing sort on long streams.
    565      */
    566     private static final class LongSortingSink extends AbstractLongSortingSink {
    567         private SpinedBuffer.OfLong b;
    568 
    569         LongSortingSink(Sink<? super Long> sink) {
    570             super(sink);
    571         }
    572 
    573         @Override
    574         public void begin(long size) {
    575             if (size >= Nodes.MAX_ARRAY_SIZE)
    576                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    577             b = (size > 0) ? new SpinedBuffer.OfLong((int) size) : new SpinedBuffer.OfLong();
    578         }
    579 
    580         @Override
    581         public void end() {
    582             long[] longs = b.asPrimitiveArray();
    583             Arrays.sort(longs);
    584             downstream.begin(longs.length);
    585             if (!cancellationWasRequested) {
    586                 for (long aLong : longs)
    587                     downstream.accept(aLong);
    588             }
    589             else {
    590                 for (long aLong : longs) {
    591                     if (downstream.cancellationRequested()) break;
    592                     downstream.accept(aLong);
    593                 }
    594             }
    595             downstream.end();
    596         }
    597 
    598         @Override
    599         public void accept(long t) {
    600             b.accept(t);
    601         }
    602     }
    603 
    604     /**
    605      * Abstract {@link Sink} for implementing sort on long streams.
    606      */
    607     private static abstract class AbstractDoubleSortingSink extends Sink.ChainedDouble<Double> {
    608         protected boolean cancellationWasRequested;
    609 
    610         AbstractDoubleSortingSink(Sink<? super Double> downstream) {
    611             super(downstream);
    612         }
    613 
    614         @Override
    615         public final boolean cancellationRequested() {
    616             cancellationWasRequested = true;
    617             return false;
    618         }
    619     }
    620 
    621     /**
    622      * {@link Sink} for implementing sort on SIZED double streams.
    623      */
    624     private static final class SizedDoubleSortingSink extends AbstractDoubleSortingSink {
    625         private double[] array;
    626         private int offset;
    627 
    628         SizedDoubleSortingSink(Sink<? super Double> downstream) {
    629             super(downstream);
    630         }
    631 
    632         @Override
    633         public void begin(long size) {
    634             if (size >= Nodes.MAX_ARRAY_SIZE)
    635                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    636             array = new double[(int) size];
    637         }
    638 
    639         @Override
    640         public void end() {
    641             Arrays.sort(array, 0, offset);
    642             downstream.begin(offset);
    643             if (!cancellationWasRequested) {
    644                 for (int i = 0; i < offset; i++)
    645                     downstream.accept(array[i]);
    646             }
    647             else {
    648                 for (int i = 0; i < offset && !downstream.cancellationRequested(); i++)
    649                     downstream.accept(array[i]);
    650             }
    651             downstream.end();
    652             array = null;
    653         }
    654 
    655         @Override
    656         public void accept(double t) {
    657             array[offset++] = t;
    658         }
    659     }
    660 
    661     /**
    662      * {@link Sink} for implementing sort on double streams.
    663      */
    664     private static final class DoubleSortingSink extends AbstractDoubleSortingSink {
    665         private SpinedBuffer.OfDouble b;
    666 
    667         DoubleSortingSink(Sink<? super Double> sink) {
    668             super(sink);
    669         }
    670 
    671         @Override
    672         public void begin(long size) {
    673             if (size >= Nodes.MAX_ARRAY_SIZE)
    674                 throw new IllegalArgumentException(Nodes.BAD_SIZE);
    675             b = (size > 0) ? new SpinedBuffer.OfDouble((int) size) : new SpinedBuffer.OfDouble();
    676         }
    677 
    678         @Override
    679         public void end() {
    680             double[] doubles = b.asPrimitiveArray();
    681             Arrays.sort(doubles);
    682             downstream.begin(doubles.length);
    683             if (!cancellationWasRequested) {
    684                 for (double aDouble : doubles)
    685                     downstream.accept(aDouble);
    686             }
    687             else {
    688                 for (double aDouble : doubles) {
    689                     if (downstream.cancellationRequested()) break;
    690                     downstream.accept(aDouble);
    691                 }
    692             }
    693             downstream.end();
    694         }
    695 
    696         @Override
    697         public void accept(double t) {
    698             b.accept(t);
    699         }
    700     }
    701 }
    702