Home | History | Annotate | Download | only in stream
      1 /*
      2  * Copyright (c) 2012, 2015, Oracle and/or its affiliates. All rights reserved.
      3  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
      4  *
      5  * This code is free software; you can redistribute it and/or modify it
      6  * under the terms of the GNU General Public License version 2 only, as
      7  * published by the Free Software Foundation.
      8  *
      9  * This code is distributed in the hope that it will be useful, but WITHOUT
     10  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
     11  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License
     12  * version 2 for more details (a copy is included in the LICENSE file that
     13  * accompanied this code).
     14  *
     15  * You should have received a copy of the GNU General Public License version
     16  * 2 along with this work; if not, write to the Free Software Foundation,
     17  * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
     18  *
     19  * Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
     20  * or visit www.oracle.com if you need additional information or have any
     21  * questions.
     22  */
     23 package java.util.stream;
     24 
     25 import org.testng.Assert;
     26 import org.testng.annotations.Test;
     27 
     28 import java.util.ArrayList;
     29 import java.util.EnumSet;
     30 import java.util.List;
     31 import java.util.function.Supplier;
     32 
     33 import static java.util.stream.LambdaTestHelpers.countTo;
     34 
     35 @Test
     36 public class FlagOpTest extends OpTestCase {
     37 
     38     @Test(dataProvider = "StreamTestData<Integer>", dataProviderClass = StreamTestDataProvider.class)
     39     public void testFlagsPassThrough(String name, TestData<Integer, Stream<Integer>> data) {
     40 
     41         @SuppressWarnings({"unchecked", "rawtypes"})
     42         TestFlagPassThroughOp<Integer>[] ops = new TestFlagPassThroughOp[3];
     43         ops[0] = new TestFlagPassThroughOp<>();
     44         ops[1] = new TestFlagPassThroughOp<>();
     45         ops[2] = new TestFlagPassThroughOp<>();
     46 
     47         ops[0].set(null, ops[1]);
     48         ops[1].set(ops[0], ops[2]);
     49         ops[2].set(ops[1], null);
     50 
     51         withData(data).ops(ops).exercise();
     52     }
     53 
     54     static class TestFlagPassThroughOp<T> extends FlagDeclaringOp<T> {
     55         TestFlagPassThroughOp<T> upstream;
     56         TestFlagPassThroughOp<T> downstream;
     57 
     58         TestFlagPassThroughOp() {
     59             super(0);
     60         }
     61 
     62         void set(TestFlagPassThroughOp<T> upstream, TestFlagPassThroughOp<T> downstream)  {
     63             this.upstream = upstream;
     64             this.downstream = downstream;
     65         }
     66 
     67         int wrapFlags;
     68 
     69         @Override
     70         @SuppressWarnings({"unchecked", "rawtypes"})
     71         public Sink<T> opWrapSink(int flags, boolean parallel, Sink sink) {
     72             this.wrapFlags = flags;
     73 
     74             if (downstream != null) {
     75                 assertTrue(flags == downstream.wrapFlags);
     76             }
     77 
     78             return sink;
     79         }
     80     }
     81 
     82     public void testFlagsClearAllSet() {
     83         int clearAllFlags = 0;
     84         for (StreamOpFlag f : EnumSet.allOf(StreamOpFlag.class)) {
     85             if (f.isStreamFlag()) {
     86                 clearAllFlags |= f.clear();
     87             }
     88         }
     89 
     90         EnumSet<StreamOpFlag> known = EnumSet.noneOf(StreamOpFlag.class);
     91         EnumSet<StreamOpFlag> notKnown = StreamOpFlagTestHelper.allStreamFlags();
     92 
     93         List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
     94         ops.add(new FlagDeclaringOp<>(clearAllFlags));
     95         for (StreamOpFlag f : StreamOpFlagTestHelper.allStreamFlags()) {
     96             if (f.canSet(StreamOpFlag.Type.OP)) {
     97                 ops.add(new TestFlagExpectedOp<>(f.set(),
     98                                              known.clone(),
     99                                              EnumSet.noneOf(StreamOpFlag.class),
    100                                              notKnown.clone()));
    101                 known.add(f);
    102                 notKnown.remove(f);
    103             }
    104         }
    105         ops.add(new TestFlagExpectedOp<>(0,
    106                                          known.clone(),
    107                                          EnumSet.noneOf(StreamOpFlag.class),
    108                                          notKnown.clone()));
    109 
    110         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
    111         @SuppressWarnings("rawtypes")
    112         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
    113 
    114         withData(data).ops(opsArray).
    115                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
    116                 exercise();
    117     }
    118 
    119     public void testFlagsSetAllClear() {
    120         EnumSet<StreamOpFlag> known = StreamOpFlagTestHelper.allStreamFlags();
    121         int setAllFlags = 0;
    122         for (StreamOpFlag f : EnumSet.allOf(StreamOpFlag.class)) {
    123             if (f.isStreamFlag()) {
    124                 if (f.canSet(StreamOpFlag.Type.OP)) {
    125                     setAllFlags |= f.set();
    126                 } else {
    127                     known.remove(f);
    128                 }
    129             }
    130         }
    131 
    132         EnumSet<StreamOpFlag> notKnown = EnumSet.noneOf(StreamOpFlag.class);
    133 
    134         List<FlagDeclaringOp<Integer>> ops = new ArrayList<>();
    135         ops.add(new FlagDeclaringOp<>(setAllFlags));
    136         for (StreamOpFlag f : StreamOpFlagTestHelper.allStreamFlags()) {
    137             ops.add(new TestFlagExpectedOp<>(f.clear(),
    138                                              known.clone(),
    139                                              EnumSet.noneOf(StreamOpFlag.class),
    140                                              notKnown.clone()));
    141             known.remove(f);
    142             notKnown.add(f);
    143         }
    144         ops.add(new TestFlagExpectedOp<>(0,
    145                                          known.clone(),
    146                                          EnumSet.noneOf(StreamOpFlag.class),
    147                                          notKnown.clone()));
    148 
    149         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
    150         @SuppressWarnings("rawtypes")
    151         FlagDeclaringOp[] opsArray = ops.toArray(new FlagDeclaringOp[ops.size()]);
    152 
    153 
    154         withData(data).ops(opsArray).
    155                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
    156                 exercise();
    157     }
    158 
    159     public void testFlagsParallelCollect() {
    160         testFlagsSetSequence(CollectorOps::collector);
    161     }
    162 
    163     private void testFlagsSetSequence(Supplier<StatefulTestOp<Integer>> cf) {
    164         EnumSet<StreamOpFlag> known = EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.SIZED);
    165         EnumSet<StreamOpFlag> preserve = EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED);
    166 
    167         List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
    168         for (StreamOpFlag f : EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED)) {
    169             ops.add(cf.get());
    170             ops.add(new TestFlagExpectedOp<>(f.set(),
    171                                              known.clone(),
    172                                              preserve.clone(),
    173                                              EnumSet.noneOf(StreamOpFlag.class)));
    174             known.add(f);
    175             preserve.remove(f);
    176         }
    177         ops.add(cf.get());
    178         ops.add(new TestFlagExpectedOp<>(0,
    179                                          known.clone(),
    180                                          preserve.clone(),
    181                                          EnumSet.noneOf(StreamOpFlag.class)));
    182 
    183         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
    184         @SuppressWarnings("rawtypes")
    185         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
    186 
    187         withData(data).ops(opsArray).
    188                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
    189                 exercise();
    190     }
    191 
    192 
    193     public void testFlagsClearParallelCollect() {
    194         testFlagsClearSequence(CollectorOps::collector);
    195     }
    196 
    197     protected void testFlagsClearSequence(Supplier<StatefulTestOp<Integer>> cf) {
    198         EnumSet<StreamOpFlag> known = EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.SIZED);
    199         EnumSet<StreamOpFlag> preserve = EnumSet.of(StreamOpFlag.DISTINCT, StreamOpFlag.SORTED);
    200         EnumSet<StreamOpFlag> notKnown = EnumSet.noneOf(StreamOpFlag.class);
    201 
    202         List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
    203         for (StreamOpFlag f : EnumSet.of(StreamOpFlag.ORDERED, StreamOpFlag.DISTINCT, StreamOpFlag.SORTED)) {
    204             ops.add(cf.get());
    205             ops.add(new TestFlagExpectedOp<>(f.clear(),
    206                                              known.clone(),
    207                                              preserve.clone(),
    208                                              notKnown.clone()));
    209             known.remove(f);
    210             preserve.remove(f);
    211             notKnown.add(f);
    212         }
    213         ops.add(cf.get());
    214         ops.add(new TestFlagExpectedOp<>(0,
    215                                          known.clone(),
    216                                          preserve.clone(),
    217                                          notKnown.clone()));
    218 
    219         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
    220         @SuppressWarnings("rawtypes")
    221         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
    222 
    223         withData(data).ops(opsArray).
    224                 without(StreamTestScenario.CLEAR_SIZED_SCENARIOS).
    225                 exercise();
    226     }
    227 
    228     public void testFlagsSizedOrderedParallelCollect() {
    229         EnumSet<StreamOpFlag> parKnown = EnumSet.of(StreamOpFlag.SIZED);
    230         EnumSet<StreamOpFlag> serKnown = parKnown.clone();
    231 
    232         List<IntermediateTestOp<Integer, Integer>> ops = new ArrayList<>();
    233         for (StreamOpFlag f : parKnown) {
    234             ops.add(CollectorOps.collector());
    235             ops.add(new ParSerTestFlagExpectedOp<>(f.clear(),
    236                                              parKnown,
    237                                              serKnown));
    238             serKnown.remove(f);
    239         }
    240         ops.add(CollectorOps.collector());
    241         ops.add(new ParSerTestFlagExpectedOp<>(0,
    242                                          parKnown,
    243                                          EnumSet.noneOf(StreamOpFlag.class)));
    244 
    245         TestData<Integer, Stream<Integer>> data = TestData.Factory.ofArray("Array", countTo(10).toArray(new Integer[0]));
    246         @SuppressWarnings("rawtypes")
    247         IntermediateTestOp[] opsArray = ops.toArray(new IntermediateTestOp[ops.size()]);
    248 
    249         withData(data).ops(opsArray).exercise();
    250     }
    251 
    252     static class ParSerTestFlagExpectedOp<T> extends FlagDeclaringOp<T> {
    253         final EnumSet<StreamOpFlag> parKnown;
    254         final EnumSet<StreamOpFlag> serKnown;
    255 
    256         ParSerTestFlagExpectedOp(int flags, EnumSet<StreamOpFlag> known, EnumSet<StreamOpFlag> serKnown) {
    257             super(flags);
    258             this.parKnown = known;
    259             this.serKnown = serKnown;
    260         }
    261 
    262         @Override
    263         @SuppressWarnings({"unchecked", "rawtypes"})
    264         public Sink<T> opWrapSink(int flags, boolean parallel, Sink upstream) {
    265             assertFlags(flags, parallel);
    266             return upstream;
    267         }
    268 
    269         protected void assertFlags(int flags, boolean parallel) {
    270             if (parallel) {
    271                 for (StreamOpFlag f : parKnown) {
    272                     Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
    273                 }
    274 
    275             } else {
    276                 for (StreamOpFlag f : serKnown) {
    277                     Assert.assertTrue(f.isKnown(flags), String.format("Flag %s is not known, but should be known.", f.toString()));
    278                 }
    279 
    280             }
    281         }
    282     }
    283 }
    284