Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
     20 import com.google.common.util.concurrent.Service.State;
     21 
     22 import junit.framework.TestCase;
     23 
     24 import java.util.concurrent.CountDownLatch;
     25 import java.util.concurrent.CyclicBarrier;
     26 import java.util.concurrent.ExecutionException;
     27 import java.util.concurrent.Executors;
     28 import java.util.concurrent.Future;
     29 import java.util.concurrent.ScheduledExecutorService;
     30 import java.util.concurrent.ScheduledFuture;
     31 import java.util.concurrent.ScheduledThreadPoolExecutor;
     32 import java.util.concurrent.TimeUnit;
     33 import java.util.concurrent.atomic.AtomicBoolean;
     34 import java.util.concurrent.atomic.AtomicInteger;
     35 
     36 /**
     37  * Unit test for {@link AbstractScheduledService}.
     38  *
     39  * @author Luke Sandberg
     40  */
     41 
     42 public class AbstractScheduledServiceTest extends TestCase {
     43 
     44   volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
     45   volatile ScheduledFuture<?> future = null;
     46 
     47   volatile boolean atFixedRateCalled = false;
     48   volatile boolean withFixedDelayCalled = false;
     49   volatile boolean scheduleCalled = false;
     50 
     51   final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
     52     @Override
     53     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
     54         long delay, TimeUnit unit) {
     55       return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
     56     }
     57   };
     58 
     59   public void testServiceStartStop() throws Exception {
     60     NullService service = new NullService();
     61     service.startAsync().awaitRunning();
     62     assertFalse(future.isDone());
     63     service.stopAsync().awaitTerminated();
     64     assertTrue(future.isCancelled());
     65   }
     66 
     67   private class NullService extends AbstractScheduledService {
     68     @Override protected void runOneIteration() throws Exception {}
     69     @Override protected Scheduler scheduler() { return configuration; }
     70     @Override protected ScheduledExecutorService executor() { return executor; }
     71   }
     72 
     73   public void testFailOnExceptionFromRun() throws Exception {
     74     TestService service = new TestService();
     75     service.runException = new Exception();
     76     service.startAsync().awaitRunning();
     77     service.runFirstBarrier.await();
     78     service.runSecondBarrier.await();
     79     try {
     80       future.get();
     81       fail();
     82     } catch (ExecutionException e) {
     83       // An execution exception holds a runtime exception (from throwables.propogate) that holds our
     84       // original exception.
     85       assertEquals(service.runException, e.getCause().getCause());
     86     }
     87     assertEquals(service.state(), Service.State.FAILED);
     88   }
     89 
     90   public void testFailOnExceptionFromStartUp() {
     91     TestService service = new TestService();
     92     service.startUpException = new Exception();
     93     try {
     94       service.startAsync().awaitRunning();
     95       fail();
     96     } catch (IllegalStateException e) {
     97       assertEquals(service.startUpException, e.getCause());
     98     }
     99     assertEquals(0, service.numberOfTimesRunCalled.get());
    100     assertEquals(Service.State.FAILED, service.state());
    101   }
    102 
    103   public void testFailOnExceptionFromShutDown() throws Exception {
    104     TestService service = new TestService();
    105     service.shutDownException = new Exception();
    106     service.startAsync().awaitRunning();
    107     service.runFirstBarrier.await();
    108     service.stopAsync();
    109     service.runSecondBarrier.await();
    110     try {
    111       service.awaitTerminated();
    112       fail();
    113     } catch (IllegalStateException e) {
    114       assertEquals(service.shutDownException, e.getCause());
    115     }
    116     assertEquals(Service.State.FAILED, service.state());
    117   }
    118 
    119   public void testRunOneIterationCalledMultipleTimes() throws Exception {
    120     TestService service = new TestService();
    121     service.startAsync().awaitRunning();
    122     for (int i = 1; i < 10; i++) {
    123       service.runFirstBarrier.await();
    124       assertEquals(i, service.numberOfTimesRunCalled.get());
    125       service.runSecondBarrier.await();
    126     }
    127     service.runFirstBarrier.await();
    128     service.stopAsync();
    129     service.runSecondBarrier.await();
    130     service.stopAsync().awaitTerminated();
    131   }
    132 
    133   public void testExecutorOnlyCalledOnce() throws Exception {
    134     TestService service = new TestService();
    135     service.startAsync().awaitRunning();
    136     // It should be called once during startup.
    137     assertEquals(1, service.numberOfTimesExecutorCalled.get());
    138     for (int i = 1; i < 10; i++) {
    139       service.runFirstBarrier.await();
    140       assertEquals(i, service.numberOfTimesRunCalled.get());
    141       service.runSecondBarrier.await();
    142     }
    143     service.runFirstBarrier.await();
    144     service.stopAsync();
    145     service.runSecondBarrier.await();
    146     service.stopAsync().awaitTerminated();
    147     // Only called once overall.
    148     assertEquals(1, service.numberOfTimesExecutorCalled.get());
    149   }
    150 
    151   public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
    152     final CountDownLatch terminationLatch = new CountDownLatch(1);
    153     AbstractScheduledService service = new AbstractScheduledService() {
    154       volatile ScheduledExecutorService executorService;
    155       @Override protected void runOneIteration() throws Exception {}
    156 
    157       @Override protected ScheduledExecutorService executor() {
    158         if (executorService == null) {
    159           executorService = super.executor();
    160           // Add a listener that will be executed after the listener that shuts down the executor.
    161           addListener(new Listener() {
    162             @Override public void terminated(State from) {
    163               terminationLatch.countDown();
    164             }
    165             }, MoreExecutors.sameThreadExecutor());
    166         }
    167         return executorService;
    168       }
    169 
    170       @Override protected Scheduler scheduler() {
    171         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
    172       }};
    173 
    174       service.startAsync();
    175       assertFalse(service.executor().isShutdown());
    176       service.awaitRunning();
    177       service.stopAsync();
    178       terminationLatch.await();
    179       assertTrue(service.executor().isShutdown());
    180       assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
    181   }
    182 
    183   public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
    184     final CountDownLatch failureLatch = new CountDownLatch(1);
    185     AbstractScheduledService service = new AbstractScheduledService() {
    186       volatile ScheduledExecutorService executorService;
    187       @Override protected void runOneIteration() throws Exception {}
    188 
    189       @Override protected void startUp() throws Exception {
    190         throw new Exception("Failed");
    191       }
    192 
    193       @Override protected ScheduledExecutorService executor() {
    194         if (executorService == null) {
    195           executorService = super.executor();
    196           // Add a listener that will be executed after the listener that shuts down the executor.
    197           addListener(new Listener() {
    198             @Override public void failed(State from, Throwable failure) {
    199               failureLatch.countDown();
    200             }
    201             }, MoreExecutors.sameThreadExecutor());
    202         }
    203         return executorService;
    204       }
    205 
    206       @Override protected Scheduler scheduler() {
    207         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
    208       }};
    209 
    210       try {
    211         service.startAsync().awaitRunning();
    212         fail("Expected service to fail during startup");
    213       } catch (IllegalStateException expected) {}
    214       failureLatch.await();
    215       assertTrue(service.executor().isShutdown());
    216       assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS));
    217   }
    218 
    219   public void testSchedulerOnlyCalledOnce() throws Exception {
    220     TestService service = new TestService();
    221     service.startAsync().awaitRunning();
    222     // It should be called once during startup.
    223     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    224     for (int i = 1; i < 10; i++) {
    225       service.runFirstBarrier.await();
    226       assertEquals(i, service.numberOfTimesRunCalled.get());
    227       service.runSecondBarrier.await();
    228     }
    229     service.runFirstBarrier.await();
    230     service.stopAsync();
    231     service.runSecondBarrier.await();
    232     service.awaitTerminated();
    233     // Only called once overall.
    234     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    235   }
    236 
    237   private class TestService extends AbstractScheduledService {
    238     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
    239     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
    240 
    241     volatile boolean startUpCalled = false;
    242     volatile boolean shutDownCalled = false;
    243     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
    244     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
    245     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
    246     volatile Exception runException = null;
    247     volatile Exception startUpException = null;
    248     volatile Exception shutDownException = null;
    249 
    250     @Override
    251     protected void runOneIteration() throws Exception {
    252       assertTrue(startUpCalled);
    253       assertFalse(shutDownCalled);
    254       numberOfTimesRunCalled.incrementAndGet();
    255       assertEquals(State.RUNNING, state());
    256       runFirstBarrier.await();
    257       runSecondBarrier.await();
    258       if (runException != null) {
    259         throw runException;
    260       }
    261     }
    262 
    263     @Override
    264     protected void startUp() throws Exception {
    265       assertFalse(startUpCalled);
    266       assertFalse(shutDownCalled);
    267       startUpCalled = true;
    268       assertEquals(State.STARTING, state());
    269       if (startUpException != null) {
    270         throw startUpException;
    271       }
    272     }
    273 
    274     @Override
    275     protected void shutDown() throws Exception {
    276       assertTrue(startUpCalled);
    277       assertFalse(shutDownCalled);
    278       shutDownCalled = true;
    279       if (shutDownException != null) {
    280         throw shutDownException;
    281       }
    282     }
    283 
    284     @Override
    285     protected ScheduledExecutorService executor() {
    286       numberOfTimesExecutorCalled.incrementAndGet();
    287       return executor;
    288     }
    289 
    290     @Override
    291     protected Scheduler scheduler() {
    292       numberOfTimesSchedulerCalled.incrementAndGet();
    293       return configuration;
    294     }
    295   }
    296 
    297   public static class SchedulerTest extends TestCase {
    298     // These constants are arbitrary and just used to make sure that the correct method is called
    299     // with the correct parameters.
    300     private static final int initialDelay = 10;
    301     private static final int delay = 20;
    302     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
    303 
    304     // Unique runnable object used for comparison.
    305     final Runnable testRunnable = new Runnable() {@Override public void run() {}};
    306     boolean called = false;
    307 
    308     private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
    309         long delay, TimeUnit unit) {
    310       assertFalse(called);  // only called once.
    311       called = true;
    312       assertEquals(SchedulerTest.initialDelay, initialDelay);
    313       assertEquals(SchedulerTest.delay, delay);
    314       assertEquals(SchedulerTest.unit, unit);
    315       assertEquals(testRunnable, command);
    316     }
    317 
    318     public void testFixedRateSchedule() {
    319       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
    320       schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
    321         @Override
    322         public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
    323             long period, TimeUnit unit) {
    324           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
    325           return null;
    326         }
    327       }, testRunnable);
    328       assertTrue(called);
    329     }
    330 
    331     public void testFixedDelaySchedule() {
    332       Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
    333       schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
    334         @Override
    335         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
    336             long delay, TimeUnit unit) {
    337           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
    338           return null;
    339         }
    340       }, testRunnable);
    341       assertTrue(called);
    342     }
    343 
    344     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
    345       public AtomicInteger scheduleCounter = new AtomicInteger(0);
    346       @Override
    347       protected Schedule getNextSchedule() throws Exception {
    348         scheduleCounter.incrementAndGet();
    349         return new Schedule(0, TimeUnit.SECONDS);
    350       }
    351     }
    352 
    353     public void testCustomSchedule_startStop() throws Exception {
    354       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    355       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    356       final AtomicBoolean shouldWait = new AtomicBoolean(true);
    357       Runnable task = new Runnable() {
    358         @Override public void run() {
    359           try {
    360             if (shouldWait.get()) {
    361               firstBarrier.await();
    362               secondBarrier.await();
    363             }
    364           } catch (Exception e) {
    365             throw new RuntimeException(e);
    366           }
    367         }
    368       };
    369       TestCustomScheduler scheduler = new TestCustomScheduler();
    370       Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
    371       firstBarrier.await();
    372       assertEquals(1, scheduler.scheduleCounter.get());
    373       secondBarrier.await();
    374       firstBarrier.await();
    375       assertEquals(2, scheduler.scheduleCounter.get());
    376       shouldWait.set(false);
    377       secondBarrier.await();
    378       future.cancel(false);
    379     }
    380 
    381     public void testCustomSchedulerServiceStop() throws Exception {
    382       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
    383       service.startAsync().awaitRunning();
    384       service.firstBarrier.await();
    385       assertEquals(1, service.numIterations.get());
    386       service.stopAsync();
    387       service.secondBarrier.await();
    388       service.awaitTerminated();
    389       // Sleep for a while just to ensure that our task wasn't called again.
    390       Thread.sleep(unit.toMillis(3 * delay));
    391       assertEquals(1, service.numIterations.get());
    392     }
    393 
    394     public void testBig() throws Exception {
    395       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    396         @Override protected Scheduler scheduler() {
    397           return new AbstractScheduledService.CustomScheduler() {
    398             @Override
    399             protected Schedule getNextSchedule() throws Exception {
    400               // Explicitly yield to increase the probability of a pathological scheduling.
    401               Thread.yield();
    402               return new Schedule(0, TimeUnit.SECONDS);
    403             }
    404           };
    405         }
    406       };
    407       service.useBarriers = false;
    408       service.startAsync().awaitRunning();
    409       Thread.sleep(50);
    410       service.useBarriers = true;
    411       service.firstBarrier.await();
    412       int numIterations = service.numIterations.get();
    413       service.stopAsync();
    414       service.secondBarrier.await();
    415       service.awaitTerminated();
    416       assertEquals(numIterations, service.numIterations.get());
    417     }
    418 
    419     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
    420       final AtomicInteger numIterations = new AtomicInteger(0);
    421       volatile boolean useBarriers = true;
    422       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    423       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    424 
    425       @Override protected void runOneIteration() throws Exception {
    426         numIterations.incrementAndGet();
    427         if (useBarriers) {
    428           firstBarrier.await();
    429           secondBarrier.await();
    430         }
    431       }
    432 
    433       @Override protected ScheduledExecutorService executor() {
    434         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
    435         return Executors.newScheduledThreadPool(10);
    436       }
    437 
    438       @Override protected void startUp() throws Exception {}
    439 
    440       @Override protected void shutDown() throws Exception {}
    441 
    442       @Override protected Scheduler scheduler() {
    443         return new CustomScheduler() {
    444           @Override
    445           protected Schedule getNextSchedule() throws Exception {
    446             return new Schedule(delay, unit);
    447           }};
    448       }
    449     }
    450 
    451     public void testCustomSchedulerFailure() throws Exception {
    452       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
    453       service.startAsync().awaitRunning();
    454       for (int i = 1; i < 4; i++) {
    455         service.firstBarrier.await();
    456         assertEquals(i, service.numIterations.get());
    457         service.secondBarrier.await();
    458       }
    459       Thread.sleep(1000);
    460       try {
    461         service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
    462         fail();
    463       } catch (IllegalStateException e) {
    464         assertEquals(State.FAILED, service.state());
    465       }
    466     }
    467 
    468     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
    469       final AtomicInteger numIterations = new AtomicInteger(0);
    470       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    471       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    472 
    473       @Override protected void runOneIteration() throws Exception {
    474         numIterations.incrementAndGet();
    475         firstBarrier.await();
    476         secondBarrier.await();
    477       }
    478 
    479       @Override protected ScheduledExecutorService executor() {
    480         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
    481         return Executors.newScheduledThreadPool(10);
    482       }
    483 
    484       @Override protected Scheduler scheduler() {
    485         return new CustomScheduler() {
    486           @Override
    487           protected Schedule getNextSchedule() throws Exception {
    488             if (numIterations.get() > 2) {
    489               throw new IllegalStateException("Failed");
    490             }
    491             return new Schedule(delay, unit);
    492           }};
    493       }
    494     }
    495   }
    496 }
    497