Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Optional;
     28 import java.util.OptionalDouble;
     29 import java.util.OptionalInt;
     30 import java.util.OptionalLong;
     31 import java.util.Spliterator;
     32 import java.util.concurrent.CountedCompleter;
     33 import java.util.function.Predicate;
     34 import java.util.function.Supplier;
     35 
     36 /**
     37  * Factory for instances of a short-circuiting {@code TerminalOp} that searches
     38  * for an element in a stream pipeline, and terminates when it finds one.
     39  * Supported variants include find-first (find the first element in the
     40  * encounter order) and find-any (find any element, may not be the first in
     41  * encounter order.)
     42  *
     43  * @since 1.8
     44  */
     45 final class FindOps {
     46 
     47     private FindOps() { }
     48 
     49     /**
     50      * Constructs a {@code TerminalOp} for streams of objects.
     51      *
     52      * @param <T> the type of elements of the stream
     53      * @param mustFindFirst whether the {@code TerminalOp} must produce the
     54      *        first element in the encounter order
     55      * @return a {@code TerminalOp} implementing the find operation
     56      */
     57     public static <T> TerminalOp<T, Optional<T>> makeRef(boolean mustFindFirst) {
     58         return new FindOp<>(mustFindFirst, StreamShape.REFERENCE, Optional.empty(),
     59                             Optional::isPresent, FindSink.OfRef::new);
     60     }
     61 
     62     /**
     63      * Constructs a {@code TerminalOp} for streams of ints.
     64      *
     65      * @param mustFindFirst whether the {@code TerminalOp} must produce the
     66      *        first element in the encounter order
     67      * @return a {@code TerminalOp} implementing the find operation
     68      */
     69     public static TerminalOp<Integer, OptionalInt> makeInt(boolean mustFindFirst) {
     70         return new FindOp<>(mustFindFirst, StreamShape.INT_VALUE, OptionalInt.empty(),
     71                             OptionalInt::isPresent, FindSink.OfInt::new);
     72     }
     73 
     74     /**
     75      * Constructs a {@code TerminalOp} for streams of longs.
     76      *
     77      * @param mustFindFirst whether the {@code TerminalOp} must produce the
     78      *        first element in the encounter order
     79      * @return a {@code TerminalOp} implementing the find operation
     80      */
     81     public static TerminalOp<Long, OptionalLong> makeLong(boolean mustFindFirst) {
     82         return new FindOp<>(mustFindFirst, StreamShape.LONG_VALUE, OptionalLong.empty(),
     83                             OptionalLong::isPresent, FindSink.OfLong::new);
     84     }
     85 
     86     /**
     87      * Constructs a {@code FindOp} for streams of doubles.
     88      *
     89      * @param mustFindFirst whether the {@code TerminalOp} must produce the
     90      *        first element in the encounter order
     91      * @return a {@code TerminalOp} implementing the find operation
     92      */
     93     public static TerminalOp<Double, OptionalDouble> makeDouble(boolean mustFindFirst) {
     94         return new FindOp<>(mustFindFirst, StreamShape.DOUBLE_VALUE, OptionalDouble.empty(),
     95                             OptionalDouble::isPresent, FindSink.OfDouble::new);
     96     }
     97 
     98     /**
     99      * A short-circuiting {@code TerminalOp} that searches for an element in a
    100      * stream pipeline, and terminates when it finds one.  Implements both
    101      * find-first (find the first element in the encounter order) and find-any
    102      * (find any element, may not be the first in encounter order.)
    103      *
    104      * @param <T> the output type of the stream pipeline
    105      * @param <O> the result type of the find operation, typically an optional
    106      *        type
    107      */
    108     private static final class FindOp<T, O> implements TerminalOp<T, O> {
    109         private final StreamShape shape;
    110         final boolean mustFindFirst;
    111         final O emptyValue;
    112         final Predicate<O> presentPredicate;
    113         final Supplier<TerminalSink<T, O>> sinkSupplier;
    114 
    115         /**
    116          * Constructs a {@code FindOp}.
    117          *
    118          * @param mustFindFirst if true, must find the first element in
    119          *        encounter order, otherwise can find any element
    120          * @param shape stream shape of elements to search
    121          * @param emptyValue result value corresponding to "found nothing"
    122          * @param presentPredicate {@code Predicate} on result value
    123          *        corresponding to "found something"
    124          * @param sinkSupplier supplier for a {@code TerminalSink} implementing
    125          *        the matching functionality
    126          */
    127         FindOp(boolean mustFindFirst,
    128                        StreamShape shape,
    129                        O emptyValue,
    130                        Predicate<O> presentPredicate,
    131                        Supplier<TerminalSink<T, O>> sinkSupplier) {
    132             this.mustFindFirst = mustFindFirst;
    133             this.shape = shape;
    134             this.emptyValue = emptyValue;
    135             this.presentPredicate = presentPredicate;
    136             this.sinkSupplier = sinkSupplier;
    137         }
    138 
    139         @Override
    140         public int getOpFlags() {
    141             return StreamOpFlag.IS_SHORT_CIRCUIT | (mustFindFirst ? 0 : StreamOpFlag.NOT_ORDERED);
    142         }
    143 
    144         @Override
    145         public StreamShape inputShape() {
    146             return shape;
    147         }
    148 
    149         @Override
    150         public <S> O evaluateSequential(PipelineHelper<T> helper,
    151                                         Spliterator<S> spliterator) {
    152             O result = helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).get();
    153             return result != null ? result : emptyValue;
    154         }
    155 
    156         @Override
    157         public <P_IN> O evaluateParallel(PipelineHelper<T> helper,
    158                                          Spliterator<P_IN> spliterator) {
    159             return new FindTask<>(this, helper, spliterator).invoke();
    160         }
    161     }
    162 
    163     /**
    164      * Implementation of @{code TerminalSink} that implements the find
    165      * functionality, requesting cancellation when something has been found
    166      *
    167      * @param <T> The type of input element
    168      * @param <O> The result type, typically an optional type
    169      */
    170     private static abstract class FindSink<T, O> implements TerminalSink<T, O> {
    171         boolean hasValue;
    172         T value;
    173 
    174         FindSink() {} // Avoid creation of special accessor
    175 
    176         @Override
    177         public void accept(T value) {
    178             if (!hasValue) {
    179                 hasValue = true;
    180                 this.value = value;
    181             }
    182         }
    183 
    184         @Override
    185         public boolean cancellationRequested() {
    186             return hasValue;
    187         }
    188 
    189         /** Specialization of {@code FindSink} for reference streams */
    190         static final class OfRef<T> extends FindSink<T, Optional<T>> {
    191             @Override
    192             public Optional<T> get() {
    193                 return hasValue ? Optional.of(value) : null;
    194             }
    195         }
    196 
    197         /** Specialization of {@code FindSink} for int streams */
    198         static final class OfInt extends FindSink<Integer, OptionalInt>
    199                 implements Sink.OfInt {
    200             @Override
    201             public void accept(int value) {
    202                 // Boxing is OK here, since few values will actually flow into the sink
    203                 accept((Integer) value);
    204             }
    205 
    206             @Override
    207             public OptionalInt get() {
    208                 return hasValue ? OptionalInt.of(value) : null;
    209             }
    210         }
    211 
    212         /** Specialization of {@code FindSink} for long streams */
    213         static final class OfLong extends FindSink<Long, OptionalLong>
    214                 implements Sink.OfLong {
    215             @Override
    216             public void accept(long value) {
    217                 // Boxing is OK here, since few values will actually flow into the sink
    218                 accept((Long) value);
    219             }
    220 
    221             @Override
    222             public OptionalLong get() {
    223                 return hasValue ? OptionalLong.of(value) : null;
    224             }
    225         }
    226 
    227         /** Specialization of {@code FindSink} for double streams */
    228         static final class OfDouble extends FindSink<Double, OptionalDouble>
    229                 implements Sink.OfDouble {
    230             @Override
    231             public void accept(double value) {
    232                 // Boxing is OK here, since few values will actually flow into the sink
    233                 accept((Double) value);
    234             }
    235 
    236             @Override
    237             public OptionalDouble get() {
    238                 return hasValue ? OptionalDouble.of(value) : null;
    239             }
    240         }
    241     }
    242 
    243     /**
    244      * {@code ForkJoinTask} implementing parallel short-circuiting search
    245      * @param  Input element type to the stream pipeline
    246      * @param  Output element type from the stream pipeline
    247      * @param <O> Result type from the find operation
    248      */
    249     @SuppressWarnings("serial")
    250     private static final class FindTask<P_IN, P_OUT, O>
    251             extends AbstractShortCircuitTask<P_IN, P_OUT, O, FindTask<P_IN, P_OUT, O>> {
    252         private final FindOp<P_OUT, O> op;
    253 
    254         FindTask(FindOp<P_OUT, O> op,
    255                  PipelineHelper<P_OUT> helper,
    256                  Spliterator<P_IN> spliterator) {
    257             super(helper, spliterator);
    258             this.op = op;
    259         }
    260 
    261         FindTask(FindTask<P_IN, P_OUT, O> parent, Spliterator<P_IN> spliterator) {
    262             super(parent, spliterator);
    263             this.op = parent.op;
    264         }
    265 
    266         @Override
    267         protected FindTask<P_IN, P_OUT, O> makeChild(Spliterator<P_IN> spliterator) {
    268             return new FindTask<>(this, spliterator);
    269         }
    270 
    271         @Override
    272         protected O getEmptyResult() {
    273             return op.emptyValue;
    274         }
    275 
    276         private void foundResult(O answer) {
    277             if (isLeftmostNode())
    278                 shortCircuit(answer);
    279             else
    280                 cancelLaterNodes();
    281         }
    282 
    283         @Override
    284         protected O doLeaf() {
    285             O result = helper.wrapAndCopyInto(op.sinkSupplier.get(), spliterator).get();
    286             if (!op.mustFindFirst) {
    287                 if (result != null)
    288                     shortCircuit(result);
    289                 return null;
    290             }
    291             else {
    292                 if (result != null) {
    293                     foundResult(result);
    294                     return result;
    295                 }
    296                 else
    297                     return null;
    298             }
    299         }
    300 
    301         @Override
    302         public void onCompletion(CountedCompleter<?> caller) {
    303             if (op.mustFindFirst) {
    304                     for (FindTask<P_IN, P_OUT, O> child = leftChild, p = null; child != p;
    305                          p = child, child = rightChild) {
    306                     O result = child.getLocalResult();
    307                     if (result != null && op.presentPredicate.test(result)) {
    308                         setLocalResult(result);
    309                         foundResult(result);
    310                         break;
    311                     }
    312                 }
    313             }
    314             super.onCompletion(caller);
    315         }
    316     }
    317 }
    318 
    319