Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2008 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.collect.ImmutableList;
     20 import com.google.common.collect.Lists;
     21 import com.google.common.collect.Queues;
     22 
     23 import junit.framework.TestCase;
     24 
     25 import java.util.List;
     26 import java.util.Queue;
     27 import java.util.concurrent.CyclicBarrier;
     28 import java.util.concurrent.Executor;
     29 import java.util.concurrent.ExecutorService;
     30 import java.util.concurrent.Executors;
     31 import java.util.concurrent.RejectedExecutionException;
     32 import java.util.concurrent.TimeUnit;
     33 import java.util.concurrent.atomic.AtomicBoolean;
     34 import java.util.concurrent.atomic.AtomicInteger;
     35 
     36 /**
     37  * Tests {@link SerializingExecutor}.
     38  *
     39  * @author JJ Furman
     40  */
     41 public class SerializingExecutorTest extends TestCase {
     42   private static class FakeExecutor implements Executor {
     43     Queue<Runnable> tasks = Queues.newArrayDeque();
     44     @Override public void execute(Runnable command) {
     45       tasks.add(command);
     46     }
     47 
     48     boolean hasNext() {
     49       return !tasks.isEmpty();
     50     }
     51 
     52     void runNext() {
     53       assertTrue("expected at least one task to run", hasNext());
     54       tasks.remove().run();
     55     }
     56 
     57   }
     58   private FakeExecutor fakePool;
     59   private SerializingExecutor e;
     60 
     61   @Override
     62   public void setUp() {
     63     fakePool = new FakeExecutor();
     64     e = new SerializingExecutor(fakePool);
     65   }
     66 
     67   public void testSerializingNullExecutor_fails() {
     68     try {
     69       new SerializingExecutor(null);
     70       fail("Should have failed with NullPointerException.");
     71     } catch (NullPointerException expected) {
     72     }
     73   }
     74 
     75   public void testBasics() {
     76     final AtomicInteger totalCalls = new AtomicInteger();
     77     Runnable intCounter = new Runnable() {
     78       @Override
     79       public void run() {
     80         totalCalls.incrementAndGet();
     81       }
     82     };
     83 
     84     assertFalse(fakePool.hasNext());
     85     e.execute(intCounter);
     86     assertTrue(fakePool.hasNext());
     87     e.execute(intCounter);
     88     assertEquals(0, totalCalls.get());
     89     fakePool.runNext(); // run just 1 sub task...
     90     assertEquals(2, totalCalls.get());
     91     assertFalse(fakePool.hasNext());
     92 
     93     // Check that execute can be safely repeated
     94     e.execute(intCounter);
     95     e.execute(intCounter);
     96     e.execute(intCounter);
     97     assertEquals(2, totalCalls.get());
     98     fakePool.runNext();
     99     assertEquals(5, totalCalls.get());
    100     assertFalse(fakePool.hasNext());
    101   }
    102 
    103   public void testOrdering() {
    104     final List<Integer> callOrder = Lists.newArrayList();
    105 
    106     class FakeOp implements Runnable {
    107       final int op;
    108 
    109       FakeOp(int op) {
    110         this.op = op;
    111       }
    112 
    113       @Override
    114       public void run() {
    115         callOrder.add(op);
    116       }
    117     }
    118 
    119     e.execute(new FakeOp(0));
    120     e.execute(new FakeOp(1));
    121     e.execute(new FakeOp(2));
    122     fakePool.runNext();
    123 
    124     assertEquals(ImmutableList.of(0, 1, 2), callOrder);
    125   }
    126 
    127   public void testExceptions() {
    128 
    129     final AtomicInteger numCalls = new AtomicInteger();
    130 
    131     Runnable runMe = new Runnable() {
    132       @Override
    133       public void run() {
    134         numCalls.incrementAndGet();
    135         throw new RuntimeException("FAKE EXCEPTION!");
    136       }
    137     };
    138 
    139     e.execute(runMe);
    140     e.execute(runMe);
    141     fakePool.runNext();
    142 
    143     assertEquals(2, numCalls.get());
    144   }
    145 
    146   public void testDelegateRejection() {
    147     final AtomicInteger numCalls = new AtomicInteger();
    148     final AtomicBoolean reject = new AtomicBoolean(true);
    149     final SerializingExecutor executor = new SerializingExecutor(
    150         new Executor() {
    151           @Override public void execute(Runnable r) {
    152             if (reject.get()) {
    153               throw new RejectedExecutionException();
    154             }
    155             r.run();
    156           }
    157         });
    158     Runnable task = new Runnable() {
    159       @Override
    160       public void run() {
    161         numCalls.incrementAndGet();
    162       }
    163     };
    164     try {
    165       executor.execute(task);
    166       fail();
    167     } catch (RejectedExecutionException expected) {}
    168     assertEquals(0, numCalls.get());
    169     reject.set(false);
    170     executor.execute(task);
    171     assertEquals(2, numCalls.get());
    172   }
    173 
    174   public void testTaskThrowsError() throws Exception {
    175     class MyError extends Error {}
    176     final CyclicBarrier barrier = new CyclicBarrier(2);
    177     // we need to make sure the error gets thrown on a different thread.
    178     ExecutorService service = Executors.newSingleThreadExecutor();
    179     try {
    180       final SerializingExecutor executor = new SerializingExecutor(service);
    181       Runnable errorTask = new Runnable() {
    182         @Override
    183         public void run() {
    184           throw new MyError();
    185         }
    186       };
    187       Runnable barrierTask = new Runnable() {
    188         @Override
    189         public void run() {
    190           try {
    191             barrier.await();
    192           } catch (Exception e) {
    193             throw new RuntimeException(e);
    194           }
    195         }
    196       };
    197       executor.execute(errorTask);
    198       service.execute(barrierTask);  // submit directly to the service
    199       // the barrier task runs after the error task so we know that the error has been observed by
    200       // SerializingExecutor by the time the barrier is satified
    201       barrier.await(10, TimeUnit.SECONDS);
    202       executor.execute(barrierTask);
    203       // timeout means the second task wasn't even tried
    204       barrier.await(10, TimeUnit.SECONDS);
    205     } finally {
    206       service.shutdown();
    207     }
    208   }
    209 }
    210