Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.HashSet;
     28 import java.util.LinkedHashSet;
     29 import java.util.Objects;
     30 import java.util.Set;
     31 import java.util.Spliterator;
     32 import java.util.concurrent.ConcurrentHashMap;
     33 import java.util.concurrent.atomic.AtomicBoolean;
     34 import java.util.function.IntFunction;
     35 
     36 /**
     37  * Factory methods for transforming streams into duplicate-free streams, using
     38  * {@link Object#equals(Object)} to determine equality.
     39  *
     40  * @since 1.8
     41  */
     42 final class DistinctOps {
     43 
     44     private DistinctOps() { }
     45 
     46     /**
     47      * Appends a "distinct" operation to the provided stream, and returns the
     48      * new stream.
     49      *
     50      * @param <T> the type of both input and output elements
     51      * @param upstream a reference stream with element type T
     52      * @return the new stream
     53      */
     54     static <T> ReferencePipeline<T, T> makeRef(AbstractPipeline<?, T, ?> upstream) {
     55         return new ReferencePipeline.StatefulOp<T, T>(upstream, StreamShape.REFERENCE,
     56                                                       StreamOpFlag.IS_DISTINCT | StreamOpFlag.NOT_SIZED) {
     57 
     58             <P_IN> Node<T> reduce(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
     59                 // If the stream is SORTED then it should also be ORDERED so the following will also
     60                 // preserve the sort order
     61                 TerminalOp<T, LinkedHashSet<T>> reduceOp
     62                         = ReduceOps.<T, LinkedHashSet<T>>makeRef(LinkedHashSet::new, LinkedHashSet::add,
     63                                                                  LinkedHashSet::addAll);
     64                 return Nodes.node(reduceOp.evaluateParallel(helper, spliterator));
     65             }
     66 
     67             @Override
     68             public <P_IN> Node<T> opEvaluateParallel(PipelineHelper<T> helper,
     69                                               Spliterator<P_IN> spliterator,
     70                                               IntFunction<T[]> generator) {
     71                 if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
     72                     // No-op
     73                     return helper.evaluate(spliterator, false, generator);
     74                 }
     75                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
     76                     return reduce(helper, spliterator);
     77                 }
     78                 else {
     79                     // Holder of null state since ConcurrentHashMap does not support null values
     80                     AtomicBoolean seenNull = new AtomicBoolean(false);
     81                     ConcurrentHashMap<T, Boolean> map = new ConcurrentHashMap<>();
     82                     TerminalOp<T, Void> forEachOp = ForEachOps.makeRef(t -> {
     83                         if (t == null)
     84                             seenNull.set(true);
     85                         else
     86                             map.putIfAbsent(t, Boolean.TRUE);
     87                     }, false);
     88                     forEachOp.evaluateParallel(helper, spliterator);
     89 
     90                     // If null has been seen then copy the key set into a HashSet that supports null values
     91                     // and add null
     92                     Set<T> keys = map.keySet();
     93                     if (seenNull.get()) {
     94                         // TODO Implement a more efficient set-union view, rather than copying
     95                         keys = new HashSet<>(keys);
     96                         keys.add(null);
     97                     }
     98                     return Nodes.node(keys);
     99                 }
    100             }
    101 
    102             @Override
    103             public <P_IN> Spliterator<T> opEvaluateParallelLazy(PipelineHelper<T> helper, Spliterator<P_IN> spliterator) {
    104                 if (StreamOpFlag.DISTINCT.isKnown(helper.getStreamAndOpFlags())) {
    105                     // No-op
    106                     return helper.wrapSpliterator(spliterator);
    107                 }
    108                 else if (StreamOpFlag.ORDERED.isKnown(helper.getStreamAndOpFlags())) {
    109                     // Not lazy, barrier required to preserve order
    110                     return reduce(helper, spliterator).spliterator();
    111                 }
    112                 else {
    113                     // Lazy
    114                     return new StreamSpliterators.DistinctSpliterator<>(helper.wrapSpliterator(spliterator));
    115                 }
    116             }
    117 
    118             @Override
    119             public Sink<T> opWrapSink(int flags, Sink<T> sink) {
    120                 Objects.requireNonNull(sink);
    121 
    122                 if (StreamOpFlag.DISTINCT.isKnown(flags)) {
    123                     return sink;
    124                 } else if (StreamOpFlag.SORTED.isKnown(flags)) {
    125                     return new Sink.ChainedReference<T, T>(sink) {
    126                         boolean seenNull;
    127                         T lastSeen;
    128 
    129                         @Override
    130                         public void begin(long size) {
    131                             seenNull = false;
    132                             lastSeen = null;
    133                             downstream.begin(-1);
    134                         }
    135 
    136                         @Override
    137                         public void end() {
    138                             seenNull = false;
    139                             lastSeen = null;
    140                             downstream.end();
    141                         }
    142 
    143                         @Override
    144                         public void accept(T t) {
    145                             if (t == null) {
    146                                 if (!seenNull) {
    147                                     seenNull = true;
    148                                     downstream.accept(lastSeen = null);
    149                                 }
    150                             } else if (lastSeen == null || !t.equals(lastSeen)) {
    151                                 downstream.accept(lastSeen = t);
    152                             }
    153                         }
    154                     };
    155                 } else {
    156                     return new Sink.ChainedReference<T, T>(sink) {
    157                         Set<T> seen;
    158 
    159                         @Override
    160                         public void begin(long size) {
    161                             seen = new HashSet<>();
    162                             downstream.begin(-1);
    163                         }
    164 
    165                         @Override
    166                         public void end() {
    167                             seen = null;
    168                             downstream.end();
    169                         }
    170 
    171                         @Override
    172                         public void accept(T t) {
    173                             if (!seen.contains(t)) {
    174                                 seen.add(t);
    175                                 downstream.accept(t);
    176                             }
    177                         }
    178                     };
    179                 }
    180             }
    181         };
    182     }
    183 }
    184