Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
     20 import com.google.common.util.concurrent.Service.State;
     21 
     22 import junit.framework.TestCase;
     23 
     24 import java.util.concurrent.CyclicBarrier;
     25 import java.util.concurrent.ExecutionException;
     26 import java.util.concurrent.Executors;
     27 import java.util.concurrent.Future;
     28 import java.util.concurrent.ScheduledExecutorService;
     29 import java.util.concurrent.ScheduledFuture;
     30 import java.util.concurrent.ScheduledThreadPoolExecutor;
     31 
     32 import java.util.concurrent.TimeUnit;
     33 import java.util.concurrent.atomic.AtomicBoolean;
     34 import java.util.concurrent.atomic.AtomicInteger;
     35 
     36 /**
     37  * Unit test for {@link AbstractScheduledService}.
     38  *
     39  * @author Luke Sandberg
     40  */
     41 
     42 public class AbstractScheduledServiceTest extends TestCase {
     43 
     44   volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
     45   volatile ScheduledFuture<?> future = null;
     46 
     47   volatile boolean atFixedRateCalled = false;
     48   volatile boolean withFixedDelayCalled = false;
     49   volatile boolean scheduleCalled = false;
     50 
     51   final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
     52     @Override
     53     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
     54         long delay, TimeUnit unit) {
     55       return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
     56     }
     57   };
     58 
     59   public void testServiceStartStop() throws Exception {
     60     NullService service = new NullService();
     61     service.startAndWait();
     62     assertFalse(future.isDone());
     63     service.stopAndWait();
     64     assertTrue(future.isCancelled());
     65   }
     66 
     67   private class NullService extends AbstractScheduledService {
     68     @Override protected void runOneIteration() throws Exception { }
     69     @Override protected void startUp() throws Exception { }
     70     @Override protected void shutDown() throws Exception { }
     71     @Override protected Scheduler scheduler() { return configuration; }
     72     @Override protected ScheduledExecutorService executor() { return executor; }
     73   }
     74 
     75   public void testFailOnExceptionFromRun() throws Exception {
     76     TestService service = new TestService();
     77     service.runException = new Exception();
     78     service.startAndWait();
     79     service.runFirstBarrier.await();
     80     service.runSecondBarrier.await();
     81     try {
     82       future.get();
     83       fail();
     84     } catch (ExecutionException e) {
     85       // An execution exception holds a runtime exception (from throwables.propogate) that holds our
     86       // original exception.
     87       assertEquals(service.runException, e.getCause().getCause());
     88     }
     89     assertEquals(service.state(), Service.State.FAILED);
     90   }
     91 
     92   public void testFailOnExceptionFromStartUp() {
     93     TestService service = new TestService();
     94     service.startUpException = new Exception();
     95     try {
     96       service.startAndWait();
     97       fail();
     98     } catch (UncheckedExecutionException e) {
     99       assertEquals(service.startUpException, e.getCause());
    100     }
    101     assertEquals(0, service.numberOfTimesRunCalled.get());
    102     assertEquals(Service.State.FAILED, service.state());
    103   }
    104 
    105   public void testFailOnExceptionFromShutDown() throws Exception {
    106     TestService service = new TestService();
    107     service.shutDownException = new Exception();
    108     service.startAndWait();
    109     service.runFirstBarrier.await();
    110     ListenableFuture<Service.State> stopHandle = service.stop();
    111     service.runSecondBarrier.await();
    112     try {
    113       stopHandle.get();
    114       fail();
    115     } catch (ExecutionException e) {
    116       assertEquals(service.shutDownException, e.getCause());
    117     }
    118     assertEquals(Service.State.FAILED, service.state());
    119   }
    120 
    121   public void testRunOneIterationCalledMultipleTimes() throws Exception {
    122     TestService service = new TestService();
    123     service.startAndWait();
    124     for (int i = 1; i < 10; i++) {
    125       service.runFirstBarrier.await();
    126       assertEquals(i, service.numberOfTimesRunCalled.get());
    127       service.runSecondBarrier.await();
    128     }
    129     service.runFirstBarrier.await();
    130     service.stop();
    131     service.runSecondBarrier.await();
    132     service.stopAndWait();
    133   }
    134 
    135   public void testExecutorOnlyCalledOnce() throws Exception {
    136     TestService service = new TestService();
    137     service.startAndWait();
    138     // It should be called once during startup.
    139     assertEquals(1, service.numberOfTimesExecutorCalled.get());
    140     for (int i = 1; i < 10; i++) {
    141       service.runFirstBarrier.await();
    142       assertEquals(i, service.numberOfTimesRunCalled.get());
    143       service.runSecondBarrier.await();
    144     }
    145     service.runFirstBarrier.await();
    146     service.stop();
    147     service.runSecondBarrier.await();
    148     service.stopAndWait();
    149     // Only called once overall.
    150     assertEquals(1, service.numberOfTimesExecutorCalled.get());
    151   }
    152 
    153   public void testSchedulerOnlyCalledOnce() throws Exception {
    154     TestService service = new TestService();
    155     service.startAndWait();
    156     // It should be called once during startup.
    157     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    158     for (int i = 1; i < 10; i++) {
    159       service.runFirstBarrier.await();
    160       assertEquals(i, service.numberOfTimesRunCalled.get());
    161       service.runSecondBarrier.await();
    162     }
    163     service.runFirstBarrier.await();
    164     service.stop();
    165     service.runSecondBarrier.await();
    166     service.stopAndWait();
    167     // Only called once overall.
    168     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    169   }
    170 
    171   private class TestService extends AbstractScheduledService {
    172     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
    173     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
    174 
    175     volatile boolean startUpCalled = false;
    176     volatile boolean shutDownCalled = false;
    177     AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0);
    178     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0);
    179     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0);
    180     volatile Exception runException = null;
    181     volatile Exception startUpException = null;
    182     volatile Exception shutDownException = null;
    183 
    184     @Override
    185     protected void runOneIteration() throws Exception {
    186       assertTrue(startUpCalled);
    187       assertFalse(shutDownCalled);
    188       numberOfTimesRunCalled.incrementAndGet();
    189       assertEquals(State.RUNNING, state());
    190       runFirstBarrier.await();
    191       runSecondBarrier.await();
    192       if (runException != null) {
    193         throw runException;
    194       }
    195     }
    196 
    197     @Override
    198     protected void startUp() throws Exception {
    199       assertFalse(startUpCalled);
    200       assertFalse(shutDownCalled);
    201       startUpCalled = true;
    202       assertEquals(State.STARTING, state());
    203       if (startUpException != null) {
    204         throw startUpException;
    205       }
    206     }
    207 
    208     @Override
    209     protected void shutDown() throws Exception {
    210       assertTrue(startUpCalled);
    211       assertFalse(shutDownCalled);
    212       shutDownCalled = true;
    213       if (shutDownException != null) {
    214         throw shutDownException;
    215       }
    216     }
    217 
    218     @Override
    219     protected ScheduledExecutorService executor() {
    220       numberOfTimesExecutorCalled.incrementAndGet();
    221       return executor;
    222     }
    223 
    224     @Override
    225     protected Scheduler scheduler() {
    226       numberOfTimesSchedulerCalled.incrementAndGet();
    227       return configuration;
    228     }
    229   }
    230 
    231   public static class SchedulerTest extends TestCase {
    232     // These constants are arbitrary and just used to make sure that the correct method is called
    233     // with the correct parameters.
    234     private static final int initialDelay = 10;
    235     private static final int delay = 20;
    236     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
    237 
    238     // Unique runnable object used for comparison.
    239     final Runnable testRunnable = new Runnable() {@Override public void run() {}};
    240     boolean called = false;
    241 
    242     private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
    243         long delay, TimeUnit unit) {
    244       assertFalse(called);  // only called once.
    245       called = true;
    246       assertEquals(SchedulerTest.initialDelay, initialDelay);
    247       assertEquals(SchedulerTest.delay, delay);
    248       assertEquals(SchedulerTest.unit, unit);
    249       assertEquals(testRunnable, command);
    250     }
    251 
    252     public void testFixedRateSchedule() {
    253       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
    254       schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
    255         @Override
    256         public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
    257             long period, TimeUnit unit) {
    258           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
    259           return null;
    260         }
    261       }, testRunnable);
    262       assertTrue(called);
    263     }
    264 
    265     public void testFixedDelaySchedule() {
    266       Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
    267       schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
    268         @Override
    269         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
    270             long delay, TimeUnit unit) {
    271           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
    272           return null;
    273         }
    274       }, testRunnable);
    275       assertTrue(called);
    276     }
    277 
    278     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
    279       public AtomicInteger scheduleCounter = new AtomicInteger(0);
    280       @Override
    281       protected Schedule getNextSchedule() throws Exception {
    282         scheduleCounter.incrementAndGet();
    283         return new Schedule(0, TimeUnit.SECONDS);
    284       }
    285     }
    286 
    287     public void testCustomSchedule_startStop() throws Exception {
    288       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    289       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    290       final AtomicBoolean shouldWait = new AtomicBoolean(true);
    291       Runnable task = new Runnable() {
    292         @Override public void run() {
    293           try {
    294             if (shouldWait.get()) {
    295               firstBarrier.await();
    296               secondBarrier.await();
    297             }
    298           } catch (Exception e) {
    299             throw new RuntimeException(e);
    300           }
    301         }
    302       };
    303       TestCustomScheduler scheduler = new TestCustomScheduler();
    304       Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
    305       firstBarrier.await();
    306       assertEquals(1, scheduler.scheduleCounter.get());
    307       secondBarrier.await();
    308       firstBarrier.await();
    309       assertEquals(2, scheduler.scheduleCounter.get());
    310       shouldWait.set(false);
    311       secondBarrier.await();
    312       future.cancel(false);
    313     }
    314 
    315     public void testCustomSchedulerServiceStop() throws Exception {
    316       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
    317       service.startAndWait();
    318       service.firstBarrier.await();
    319       assertEquals(1, service.numIterations.get());
    320       service.stop();
    321       service.secondBarrier.await();
    322       service.stopAndWait();
    323       // Sleep for a while just to ensure that our task wasn't called again.
    324       Thread.sleep(unit.toMillis(3 * delay));
    325       assertEquals(1, service.numIterations.get());
    326     }
    327 
    328     public void testBig() throws Exception {
    329       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    330         @Override protected Scheduler scheduler() {
    331           return new AbstractScheduledService.CustomScheduler(){
    332             @Override
    333             protected Schedule getNextSchedule() throws Exception {
    334               // Explicitly yield to increase the probability of a pathological scheduling.
    335               Thread.yield();
    336               return new Schedule(0, TimeUnit.SECONDS);
    337             }
    338           };
    339         }
    340       };
    341       service.useBarriers = false;
    342       service.startAndWait();
    343       Thread.sleep(50);
    344       service.useBarriers = true;
    345       service.firstBarrier.await();
    346       int numIterations = service.numIterations.get();
    347       service.stop();
    348       service.secondBarrier.await();
    349       service.stopAndWait();
    350       assertEquals(numIterations, service.numIterations.get());
    351     }
    352 
    353     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
    354       final AtomicInteger numIterations = new AtomicInteger(0);
    355       volatile boolean useBarriers = true;
    356       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    357       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    358 
    359       @Override protected void runOneIteration() throws Exception {
    360         numIterations.incrementAndGet();
    361         if (useBarriers) {
    362           firstBarrier.await();
    363           secondBarrier.await();
    364         }
    365       }
    366 
    367       @Override protected ScheduledExecutorService executor() {
    368         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
    369         return Executors.newScheduledThreadPool(10);
    370       }
    371 
    372       @Override protected void startUp() throws Exception { }
    373 
    374       @Override protected void shutDown() throws Exception { }
    375 
    376       @Override protected Scheduler scheduler() {
    377         return new CustomScheduler() {
    378           @Override
    379           protected Schedule getNextSchedule() throws Exception {
    380             return new Schedule(delay, unit);
    381           }};
    382       }
    383     }
    384 
    385     public void testCustomSchedulerFailure() throws Exception {
    386       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
    387       service.startAndWait();
    388       for (int i = 1; i < 4; i++) {
    389         service.firstBarrier.await();
    390         assertEquals(i, service.numIterations.get());
    391         service.secondBarrier.await();
    392       }
    393       Thread.sleep(1000);
    394       try {
    395         service.stop().get(100, TimeUnit.SECONDS);
    396         fail();
    397       } catch (ExecutionException e) {
    398         assertEquals(State.FAILED, service.state());
    399       }
    400     }
    401 
    402     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
    403       final AtomicInteger numIterations = new AtomicInteger(0);
    404       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    405       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    406 
    407       @Override protected void runOneIteration() throws Exception {
    408         numIterations.incrementAndGet();
    409         firstBarrier.await();
    410         secondBarrier.await();
    411       }
    412 
    413       @Override protected ScheduledExecutorService executor() {
    414         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
    415         return Executors.newScheduledThreadPool(10);
    416       }
    417 
    418       @Override protected void startUp() throws Exception { }
    419 
    420       @Override protected void shutDown() throws Exception { }
    421 
    422       @Override protected Scheduler scheduler() {
    423         return new CustomScheduler() {
    424           @Override
    425           protected Schedule getNextSchedule() throws Exception {
    426             if (numIterations.get() > 2) {
    427               throw new IllegalStateException("Failed");
    428             }
    429             return new Schedule(delay, unit);
    430           }};
    431       }
    432     }
    433   }
    434 }
    435