Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2013, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.  Oracle designates this
      8  * particular file as subject to the "Classpath" exception as provided
      9  * by Oracle in the LICENSE file that accompanied this code.
     10  *
     11  * This code is distributed in the hope that it will be useful, but WITHOUT
     12  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     13  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     14  * version 2 for more details (a copy is included in the LICENSE file that
     15  * accompanied this code).
     16  *
     17  * You should have received a copy of the GNU General Public License version
     18  * 2 along with this work; if not, write to the Free Software Foundation,
     19  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     20  *
     21  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     22  * or visit www.oracle.com if you need additional information or have any
     23  * questions.
     24  */
     25 package java.util.stream;
     26 
     27 import java.util.Objects;
     28 import java.util.function.Consumer;
     29 import java.util.function.DoubleConsumer;
     30 import java.util.function.IntConsumer;
     31 import java.util.function.LongConsumer;
     32 
     33 /**
     34  * An extension of {@link Consumer} used to conduct values through the stages of
     35  * a stream pipeline, with additional methods to manage size information,
     36  * control flow, etc.  Before calling the {@code accept()} method on a
     37  * {@code Sink} for the first time, you must first call the {@code begin()}
     38  * method to inform it that data is coming (optionally informing the sink how
     39  * much data is coming), and after all data has been sent, you must call the
     40  * {@code end()} method.  After calling {@code end()}, you should not call
     41  * {@code accept()} without again calling {@code begin()}.  {@code Sink} also
     42  * offers a mechanism by which the sink can cooperatively signal that it does
     43  * not wish to receive any more data (the {@code cancellationRequested()}
     44  * method), which a source can poll before sending more data to the
     45  * {@code Sink}.
     46  *
     47  * <p>A sink may be in one of two states: an initial state and an active state.
     48  * It starts out in the initial state; the {@code begin()} method transitions
     49  * it to the active state, and the {@code end()} method transitions it back into
     50  * the initial state, where it can be re-used.  Data-accepting methods (such as
     51  * {@code accept()} are only valid in the active state.
     52  *
     53  * @apiNote
     54  * A stream pipeline consists of a source, zero or more intermediate stages
     55  * (such as filtering or mapping), and a terminal stage, such as reduction or
     56  * for-each.  For concreteness, consider the pipeline:
     57  *
     58  * <pre>{@code
     59  *     int longestStringLengthStartingWithA
     60  *         = strings.stream()
     61  *                  .filter(s -> s.startsWith("A"))
     62  *                  .mapToInt(String::length)
     63  *                  .max();
     64  * }</pre>
     65  *
     66  * <p>Here, we have three stages, filtering, mapping, and reducing.  The
     67  * filtering stage consumes strings and emits a subset of those strings; the
     68  * mapping stage consumes strings and emits ints; the reduction stage consumes
     69  * those ints and computes the maximal value.
     70  *
     71  * <p>A {@code Sink} instance is used to represent each stage of this pipeline,
     72  * whether the stage accepts objects, ints, longs, or doubles.  Sink has entry
     73  * points for {@code accept(Object)}, {@code accept(int)}, etc, so that we do
     74  * not need a specialized interface for each primitive specialization.  (It
     75  * might be called a "kitchen sink" for this omnivorous tendency.)  The entry
     76  * point to the pipeline is the {@code Sink} for the filtering stage, which
     77  * sends some elements "downstream" -- into the {@code Sink} for the mapping
     78  * stage, which in turn sends integral values downstream into the {@code Sink}
     79  * for the reduction stage. The {@code Sink} implementations associated with a
     80  * given stage is expected to know the data type for the next stage, and call
     81  * the correct {@code accept} method on its downstream {@code Sink}.  Similarly,
     82  * each stage must implement the correct {@code accept} method corresponding to
     83  * the data type it accepts.
     84  *
     85  * <p>The specialized subtypes such as {@link Sink.OfInt} override
     86  * {@code accept(Object)} to call the appropriate primitive specialization of
     87  * {@code accept}, implement the appropriate primitive specialization of
     88  * {@code Consumer}, and re-abstract the appropriate primitive specialization of
     89  * {@code accept}.
     90  *
     91  * <p>The chaining subtypes such as {@link ChainedInt} not only implement
     92  * {@code Sink.OfInt}, but also maintain a {@code downstream} field which
     93  * represents the downstream {@code Sink}, and implement the methods
     94  * {@code begin()}, {@code end()}, and {@code cancellationRequested()} to
     95  * delegate to the downstream {@code Sink}.  Most implementations of
     96  * intermediate operations will use these chaining wrappers.  For example, the
     97  * mapping stage in the above example would look like:
     98  *
     99  * <pre>{@code
    100  *     IntSink is = new Sink.ChainedReference<U>(sink) {
    101  *         public void accept(U u) {
    102  *             downstream.accept(mapper.applyAsInt(u));
    103  *         }
    104  *     };
    105  * }</pre>
    106  *
    107  * <p>Here, we implement {@code Sink.ChainedReference<U>}, meaning that we expect
    108  * to receive elements of type {@code U} as input, and pass the downstream sink
    109  * to the constructor.  Because the next stage expects to receive integers, we
    110  * must call the {@code accept(int)} method when emitting values to the downstream.
    111  * The {@code accept()} method applies the mapping function from {@code U} to
    112  * {@code int} and passes the resulting value to the downstream {@code Sink}.
    113  *
    114  * @param <T> type of elements for value streams
    115  * @since 1.8
    116  * @hide Visible for CTS testing only (OpenJDK8 tests).
    117  */
    118 public interface Sink<T> extends Consumer<T> {
    119     /**
    120      * Resets the sink state to receive a fresh data set.  This must be called
    121      * before sending any data to the sink.  After calling {@link #end()},
    122      * you may call this method to reset the sink for another calculation.
    123      * @param size The exact size of the data to be pushed downstream, if
    124      * known or {@code -1} if unknown or infinite.
    125      *
    126      * <p>Prior to this call, the sink must be in the initial state, and after
    127      * this call it is in the active state.
    128      */
    129     default void begin(long size) {}
    130 
    131     /**
    132      * Indicates that all elements have been pushed.  If the {@code Sink} is
    133      * stateful, it should send any stored state downstream at this time, and
    134      * should clear any accumulated state (and associated resources).
    135      *
    136      * <p>Prior to this call, the sink must be in the active state, and after
    137      * this call it is returned to the initial state.
    138      */
    139     default void end() {}
    140 
    141     /**
    142      * Indicates that this {@code Sink} does not wish to receive any more data.
    143      *
    144      * @implSpec The default implementation always returns false.
    145      *
    146      * @return true if cancellation is requested
    147      */
    148     default boolean cancellationRequested() {
    149         return false;
    150     }
    151 
    152     /**
    153      * Accepts an int value.
    154      *
    155      * @implSpec The default implementation throws IllegalStateException.
    156      *
    157      * @throws IllegalStateException if this sink does not accept int values
    158      */
    159     default void accept(int value) {
    160         throw new IllegalStateException("called wrong accept method");
    161     }
    162 
    163     /**
    164      * Accepts a long value.
    165      *
    166      * @implSpec The default implementation throws IllegalStateException.
    167      *
    168      * @throws IllegalStateException if this sink does not accept long values
    169      */
    170     default void accept(long value) {
    171         throw new IllegalStateException("called wrong accept method");
    172     }
    173 
    174     /**
    175      * Accepts a double value.
    176      *
    177      * @implSpec The default implementation throws IllegalStateException.
    178      *
    179      * @throws IllegalStateException if this sink does not accept double values
    180      */
    181     default void accept(double value) {
    182         throw new IllegalStateException("called wrong accept method");
    183     }
    184 
    185     /**
    186      * {@code Sink} that implements {@code Sink<Integer>}, re-abstracts
    187      * {@code accept(int)}, and wires {@code accept(Integer)} to bridge to
    188      * {@code accept(int)}.
    189      */
    190     interface OfInt extends Sink<Integer>, IntConsumer {
    191         @Override
    192         void accept(int value);
    193 
    194         @Override
    195         default void accept(Integer i) {
    196             if (Tripwire.ENABLED)
    197                 Tripwire.trip(getClass(), "{0} calling Sink.OfInt.accept(Integer)");
    198             accept(i.intValue());
    199         }
    200     }
    201 
    202     /**
    203      * {@code Sink} that implements {@code Sink<Long>}, re-abstracts
    204      * {@code accept(long)}, and wires {@code accept(Long)} to bridge to
    205      * {@code accept(long)}.
    206      */
    207     interface OfLong extends Sink<Long>, LongConsumer {
    208         @Override
    209         void accept(long value);
    210 
    211         @Override
    212         default void accept(Long i) {
    213             if (Tripwire.ENABLED)
    214                 Tripwire.trip(getClass(), "{0} calling Sink.OfLong.accept(Long)");
    215             accept(i.longValue());
    216         }
    217     }
    218 
    219     /**
    220      * {@code Sink} that implements {@code Sink<Double>}, re-abstracts
    221      * {@code accept(double)}, and wires {@code accept(Double)} to bridge to
    222      * {@code accept(double)}.
    223      */
    224     interface OfDouble extends Sink<Double>, DoubleConsumer {
    225         @Override
    226         void accept(double value);
    227 
    228         @Override
    229         default void accept(Double i) {
    230             if (Tripwire.ENABLED)
    231                 Tripwire.trip(getClass(), "{0} calling Sink.OfDouble.accept(Double)");
    232             accept(i.doubleValue());
    233         }
    234     }
    235 
    236     /**
    237      * Abstract {@code Sink} implementation for creating chains of
    238      * sinks.  The {@code begin}, {@code end}, and
    239      * {@code cancellationRequested} methods are wired to chain to the
    240      * downstream {@code Sink}.  This implementation takes a downstream
    241      * {@code Sink} of unknown input shape and produces a {@code Sink<T>}.  The
    242      * implementation of the {@code accept()} method must call the correct
    243      * {@code accept()} method on the downstream {@code Sink}.
    244      */
    245     static abstract class ChainedReference<T, E_OUT> implements Sink<T> {
    246         protected final Sink<? super E_OUT> downstream;
    247 
    248         public ChainedReference(Sink<? super E_OUT> downstream) {
    249             this.downstream = Objects.requireNonNull(downstream);
    250         }
    251 
    252         @Override
    253         public void begin(long size) {
    254             downstream.begin(size);
    255         }
    256 
    257         @Override
    258         public void end() {
    259             downstream.end();
    260         }
    261 
    262         @Override
    263         public boolean cancellationRequested() {
    264             return downstream.cancellationRequested();
    265         }
    266     }
    267 
    268     /**
    269      * Abstract {@code Sink} implementation designed for creating chains of
    270      * sinks.  The {@code begin}, {@code end}, and
    271      * {@code cancellationRequested} methods are wired to chain to the
    272      * downstream {@code Sink}.  This implementation takes a downstream
    273      * {@code Sink} of unknown input shape and produces a {@code Sink.OfInt}.
    274      * The implementation of the {@code accept()} method must call the correct
    275      * {@code accept()} method on the downstream {@code Sink}.
    276      */
    277     static abstract class ChainedInt<E_OUT> implements Sink.OfInt {
    278         protected final Sink<? super E_OUT> downstream;
    279 
    280         public ChainedInt(Sink<? super E_OUT> downstream) {
    281             this.downstream = Objects.requireNonNull(downstream);
    282         }
    283 
    284         @Override
    285         public void begin(long size) {
    286             downstream.begin(size);
    287         }
    288 
    289         @Override
    290         public void end() {
    291             downstream.end();
    292         }
    293 
    294         @Override
    295         public boolean cancellationRequested() {
    296             return downstream.cancellationRequested();
    297         }
    298     }
    299 
    300     /**
    301      * Abstract {@code Sink} implementation designed for creating chains of
    302      * sinks.  The {@code begin}, {@code end}, and
    303      * {@code cancellationRequested} methods are wired to chain to the
    304      * downstream {@code Sink}.  This implementation takes a downstream
    305      * {@code Sink} of unknown input shape and produces a {@code Sink.OfLong}.
    306      * The implementation of the {@code accept()} method must call the correct
    307      * {@code accept()} method on the downstream {@code Sink}.
    308      */
    309     static abstract class ChainedLong<E_OUT> implements Sink.OfLong {
    310         protected final Sink<? super E_OUT> downstream;
    311 
    312         public ChainedLong(Sink<? super E_OUT> downstream) {
    313             this.downstream = Objects.requireNonNull(downstream);
    314         }
    315 
    316         @Override
    317         public void begin(long size) {
    318             downstream.begin(size);
    319         }
    320 
    321         @Override
    322         public void end() {
    323             downstream.end();
    324         }
    325 
    326         @Override
    327         public boolean cancellationRequested() {
    328             return downstream.cancellationRequested();
    329         }
    330     }
    331 
    332     /**
    333      * Abstract {@code Sink} implementation designed for creating chains of
    334      * sinks.  The {@code begin}, {@code end}, and
    335      * {@code cancellationRequested} methods are wired to chain to the
    336      * downstream {@code Sink}.  This implementation takes a downstream
    337      * {@code Sink} of unknown input shape and produces a {@code Sink.OfDouble}.
    338      * The implementation of the {@code accept()} method must call the correct
    339      * {@code accept()} method on the downstream {@code Sink}.
    340      */
    341     static abstract class ChainedDouble<E_OUT> implements Sink.OfDouble {
    342         protected final Sink<? super E_OUT> downstream;
    343 
    344         public ChainedDouble(Sink<? super E_OUT> downstream) {
    345             this.downstream = Objects.requireNonNull(downstream);
    346         }
    347 
    348         @Override
    349         public void begin(long size) {
    350             downstream.begin(size);
    351         }
    352 
    353         @Override
    354         public void end() {
    355             downstream.end();
    356         }
    357 
    358         @Override
    359         public boolean cancellationRequested() {
    360             return downstream.cancellationRequested();
    361         }
    362     }
    363 }
    364