Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler;
     20 import com.google.common.util.concurrent.Service.State;
     21 
     22 import junit.framework.TestCase;
     23 
     24 import java.util.concurrent.CyclicBarrier;
     25 import java.util.concurrent.ExecutionException;
     26 import java.util.concurrent.Executors;
     27 import java.util.concurrent.Future;
     28 import java.util.concurrent.ScheduledExecutorService;
     29 import java.util.concurrent.ScheduledFuture;
     30 import java.util.concurrent.ScheduledThreadPoolExecutor;
     31 import java.util.concurrent.TimeUnit;
     32 import java.util.concurrent.atomic.AtomicBoolean;
     33 import java.util.concurrent.atomic.AtomicInteger;
     34 import java.util.concurrent.atomic.AtomicReference;
     35 
     36 /**
     37  * Unit test for {@link AbstractScheduledService}.
     38  *
     39  * @author Luke Sandberg
     40  */
     41 
     42 public class AbstractScheduledServiceTest extends TestCase {
     43 
     44   volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS);
          // 'configuration' (above) is the schedule handed out by NullService.scheduler() and
          // TestService.scheduler(). 'future' (below) is assigned by the overridden
          // scheduleWithFixedDelay of 'executor', so tests can inspect the scheduled task's handle.
     45   volatile ScheduledFuture<?> future = null;
     46 
          // NOTE(review): the three flags below are neither read nor written anywhere else in this
          // file; they look like leftovers -- confirm before removing.
     47   volatile boolean atFixedRateCalled = false;
     48   volatile boolean withFixedDelayCalled = false;
     49   volatile boolean scheduleCalled = false;
     50 
     51   final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) {
            // Pool (core size 10) returned by NullService.executor() and TestService.executor().
            // The override stores the ScheduledFuture of each fixed-delay task in 'future' before
            // returning it to the caller.
            // NOTE(review): nothing in this file shuts this pool down -- confirm that is intended.
     52     @Override
     53     public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
     54         long delay, TimeUnit unit) {
     55       return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit);
     56     }
     57   };
     58 
     59   public void testServiceStartStop() throws Exception {
            // Starts and stops a no-op service and checks the captured scheduled task: it must not
            // be done while the service is running and must be cancelled once it has terminated.
     60     NullService service = new NullService();
     61     service.startAsync().awaitRunning();
     62     assertFalse(future.isDone());
     63     service.stopAsync().awaitTerminated();
     64     assertTrue(future.isCancelled());
     65   }
     66 
     67   private class NullService extends AbstractScheduledService {
            // Minimal service: each iteration does nothing, while the executor and the schedule
            // are taken from the enclosing test instance.
     68     @Override protected ScheduledExecutorService executor() { return executor; }
     69     @Override protected Scheduler scheduler() { return configuration; }
     70     @Override protected void runOneIteration() throws Exception {}
     71   }
     72 
     73   public void testFailOnExceptionFromRun() throws Exception {
            // An exception thrown by runOneIteration must surface through the scheduled task's
            // future and move the service to FAILED.
     74     TestService service = new TestService();
     75     service.runException = new Exception();
     76     service.startAsync().awaitRunning();
            // Walk the first iteration through both barriers so that it goes on to throw.
     77     service.runFirstBarrier.await();
     78     service.runSecondBarrier.await();
     79     try {
     80       future.get();
     81       fail();
     82     } catch (ExecutionException e) {
     83       // An execution exception holds a runtime exception (from Throwables.propagate) that holds our
     84       // original exception.
     85       assertEquals(service.runException, e.getCause().getCause());
     86     }
            // Expected value first, as in the sibling tests, so a failure message reads correctly.
     87     assertEquals(Service.State.FAILED, service.state());
     88   }
     89 
     90   public void testFailOnExceptionFromStartUp() {
            // A startUp failure must make awaitRunning throw IllegalStateException carrying the
            // original exception as its cause, run no iteration, and leave the service FAILED.
     91     TestService service = new TestService();
     92     service.startUpException = new Exception();
     93     try {
     94       service.startAsync().awaitRunning();
     95       fail();
     96     } catch (IllegalStateException e) {
     97       assertEquals(service.startUpException, e.getCause());
     98     }
     99     assertEquals(0, service.numberOfTimesRunCalled.get());
    100     assertEquals(Service.State.FAILED, service.state());
    101   }
    102 
    103   public void testFailOnExceptionFromShutDown() throws Exception {
            // A shutDown failure must make awaitTerminated throw IllegalStateException carrying the
            // original exception as its cause and leave the service FAILED.
    104     TestService service = new TestService();
    105     service.shutDownException = new Exception();
    106     service.startAsync().awaitRunning();
    107     service.runFirstBarrier.await();
            // The first iteration is now parked between its two barriers; request the stop before
            // releasing it.
    108     service.stopAsync();
    109     service.runSecondBarrier.await();
    110     try {
    111       service.awaitTerminated();
    112       fail();
    113     } catch (IllegalStateException e) {
    114       assertEquals(service.shutDownException, e.getCause());
    115     }
    116     assertEquals(Service.State.FAILED, service.state());
    117   }
    118 
    119   public void testRunOneIterationCalledMultipleTimes() throws Exception {
            // Steps the service through nine iterations via the barriers, checking the iteration
            // counter at each one, then stops it while a further iteration is parked.
    120     TestService service = new TestService();
    121     service.startAsync().awaitRunning();
    122     for (int i = 1; i < 10; i++) {
              // The counter is incremented before the iteration reaches the first barrier.
    123       service.runFirstBarrier.await();
    124       assertEquals(i, service.numberOfTimesRunCalled.get());
    125       service.runSecondBarrier.await();
    126     }
    127     service.runFirstBarrier.await();
    128     service.stopAsync();
    129     service.runSecondBarrier.await();
            // stopAsync() was already requested above; it is called again here only for chaining.
    130     service.stopAsync().awaitTerminated();
    131   }
    132 
    133   public void testExecutorOnlyCalledOnce() throws Exception {
            // Checks via TestService's call counter that executor() is invoked exactly once over
            // the whole lifecycle: start, nine stepped iterations, and stop.
    134     TestService service = new TestService();
    135     service.startAsync().awaitRunning();
    136     // It should be called once during startup.
    137     assertEquals(1, service.numberOfTimesExecutorCalled.get());
    138     for (int i = 1; i < 10; i++) {
    139       service.runFirstBarrier.await();
    140       assertEquals(i, service.numberOfTimesRunCalled.get());
    141       service.runSecondBarrier.await();
    142     }
            // Stop while one more iteration is parked between its barriers, then release it.
    143     service.runFirstBarrier.await();
    144     service.stopAsync();
    145     service.runSecondBarrier.await();
    146     service.stopAsync().awaitTerminated();
    147     // Only called once overall.
    148     assertEquals(1, service.numberOfTimesExecutorCalled.get());
    149   }
    150 
    151   public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception {
            // Captures the executor produced by the inherited executor() implementation and checks
            // that it terminates within 100 ms after the service has been stopped.
    152     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
    153     AbstractScheduledService service = new AbstractScheduledService() {
    154       @Override protected void runOneIteration() throws Exception {}
    155 
    156       @Override protected ScheduledExecutorService executor() {
                // Remember whatever the inherited implementation hands out.
    157         executor.set(super.executor());
    158         return executor.get();
    159       }
    160 
    161       @Override protected Scheduler scheduler() {
    162         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
    163       }
    164     };
    165 
    166     service.startAsync();
            // NOTE(review): this call runs the override above again, which stores the result of
            // another super.executor() call in 'executor'. If the service has already fetched its
            // executor by now and super.executor() returns a fresh instance per call, the final
            // assertion inspects this later instance -- confirm that is intended.
    167     assertFalse(service.executor().isShutdown());
    168     service.awaitRunning();
    169     service.stopAsync();
    170     service.awaitTerminated();
    171     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
    172   }
    173 
    174   public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception {
            // Captures the executor produced by the inherited executor() implementation and checks
            // that it terminates within 100 ms after startUp has failed.
    175     final AtomicReference<ScheduledExecutorService> executor = Atomics.newReference();
    176     AbstractScheduledService service = new AbstractScheduledService() {
    177       @Override protected void startUp() throws Exception {
    178         throw new Exception("Failed");
    179       }
    180 
    181       @Override protected void runOneIteration() throws Exception {}
    182 
    183       @Override protected ScheduledExecutorService executor() {
                // Remember whatever the inherited implementation hands out.
    184         executor.set(super.executor());
    185         return executor.get();
    186       }
    187 
    188       @Override protected Scheduler scheduler() {
    189         return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS);
    190       }
    191     };
    192 
    193     try {
    194       service.startAsync().awaitRunning();
    195       fail("Expected service to fail during startup");
    196     } catch (IllegalStateException expected) {}
    197 
    198     assertTrue(executor.get().awaitTermination(100, TimeUnit.MILLISECONDS));
    199   }
    200 
    201   public void testSchedulerOnlyCalledOnce() throws Exception {
            // Checks via TestService's call counter that scheduler() is invoked exactly once over
            // the whole lifecycle: start, nine stepped iterations, and stop.
    202     TestService service = new TestService();
    203     service.startAsync().awaitRunning();
    204     // It should be called once during startup.
    205     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    206     for (int i = 1; i < 10; i++) {
    207       service.runFirstBarrier.await();
    208       assertEquals(i, service.numberOfTimesRunCalled.get());
    209       service.runSecondBarrier.await();
    210     }
            // Stop while one more iteration is parked between its barriers, then release it.
    211     service.runFirstBarrier.await();
    212     service.stopAsync();
    213     service.runSecondBarrier.await();
    214     service.awaitTerminated();
    215     // Only called once overall.
    216     assertEquals(1, service.numberOfTimesSchedulerCalled.get());
    217   }
    218 
    219   private class TestService extends AbstractScheduledService {
            // Instrumented service: it records lifecycle calls, can throw a configured exception
            // from each lifecycle phase, and parks every iteration on two barriers so that a test
            // can step through iterations one at a time.
    220     volatile Exception startUpException;
    221     volatile Exception runException;
    222     volatile Exception shutDownException;
    223 
    224     volatile boolean startUpCalled;
    225     volatile boolean shutDownCalled;
    226     AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger();
    227     AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger();
    228     AtomicInteger numberOfTimesRunCalled = new AtomicInteger();
            // Two-party barriers: one party is the iteration, the other is the test thread.
    229     CyclicBarrier runFirstBarrier = new CyclicBarrier(2);
    230     CyclicBarrier runSecondBarrier = new CyclicBarrier(2);
    231 
    232     @Override
    233     protected ScheduledExecutorService executor() {
    234       numberOfTimesExecutorCalled.getAndIncrement();
    235       return executor;
    236     }
    237 
    238     @Override
    239     protected Scheduler scheduler() {
    240       numberOfTimesSchedulerCalled.getAndIncrement();
    241       return configuration;
    242     }
    243 
    244     @Override
    245     protected void startUp() throws Exception {
              // Must be the first lifecycle call and must observe the STARTING state.
    246       assertFalse(startUpCalled);
    247       assertFalse(shutDownCalled);
    248       startUpCalled = true;
    249       assertEquals(State.STARTING, state());
    250       if (startUpException != null) {
    251         throw startUpException;
    252       }
    253     }
    254 
    255     @Override
    256     protected void runOneIteration() throws Exception {
              // Only legal between startUp and shutDown, while the service is RUNNING.
    257       assertTrue(startUpCalled);
    258       assertFalse(shutDownCalled);
    259       numberOfTimesRunCalled.getAndIncrement();
    260       assertEquals(State.RUNNING, state());
              // Rendezvous twice with the test thread before finishing (or failing).
    261       runFirstBarrier.await();
    262       runSecondBarrier.await();
    263       if (runException != null) {
    264         throw runException;
    265       }
    266     }
    267 
    268     @Override
    269     protected void shutDown() throws Exception {
              // Must follow startUp and run at most once.
    270       assertTrue(startUpCalled);
    271       assertFalse(shutDownCalled);
    272       shutDownCalled = true;
    273       if (shutDownException != null) {
    274         throw shutDownException;
    275       }
    276     }
    277   }
    278 
    279   public static class SchedulerTest extends TestCase {
    280     // These constants are arbitrary and just used to make sure that the correct method is called
    281     // with the correct parameters.
            // The parameters of assertSingleCallWithCorrectParameters share these names, which is why
            // that helper reads the constants with an explicit SchedulerTest. qualifier.
    282     private static final int initialDelay = 10;
    283     private static final int delay = 20;
    284     private static final TimeUnit unit = TimeUnit.MILLISECONDS;
    285 
    286     // Unique runnable object used for comparison.
    287     final Runnable testRunnable = new Runnable() {@Override public void run() {}};
            // Set to true by assertSingleCallWithCorrectParameters; the schedule tests assert it
            // afterwards to prove that the expected executor method was reached.
    288     boolean called = false;
    289 
    290     private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay,
    291         long delay, TimeUnit unit) {
              // Verifies that this is the first invocation and that the arguments received by the
              // executor match the constants and the runnable given to the Scheduler under test.
    292       boolean alreadyCalled = called;
    293       called = true;
              assertFalse(alreadyCalled);  // only called once.
    294       assertEquals(SchedulerTest.initialDelay, initialDelay);
    295       assertEquals(SchedulerTest.delay, delay);
    296       assertEquals(SchedulerTest.unit, unit);
    297       assertEquals(testRunnable, command);
    298     }
    299 
    300     public void testFixedRateSchedule() {
              // A fixed-rate Scheduler must call scheduleAtFixedRate exactly once, passing on the
              // runnable, initial delay, period and unit it was created with.
    301       Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit);
    302       schedule.schedule(null, new ScheduledThreadPoolExecutor(1) {
    303         @Override
    304         public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
    305             long period, TimeUnit unit) {
                  // Forward the period actually received. The parameter is named 'period', so a bare
                  // 'delay' here would resolve to the SchedulerTest.delay constant and make the
                  // helper compare that constant with itself.
    306           assertSingleCallWithCorrectParameters(command, initialDelay, period, unit);
    307           return null;
    308         }
    309       }, testRunnable);
    310       assertTrue(called);
    311     }
    312 
    313     public void testFixedDelaySchedule() {
              // A fixed-delay Scheduler must call scheduleWithFixedDelay exactly once, passing on
              // the runnable, initial delay, delay and unit it was created with.
    314       Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit);
    315       schedule.schedule(null, new ScheduledThreadPoolExecutor(10) {
    316         @Override
    317         public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
    318             long delay, TimeUnit unit) {
                  // Here the parameters shadow the constants, so the received values are forwarded.
    319           assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit);
    320           return null;
    321         }
    322       }, testRunnable);
    323       assertTrue(called);
    324     }
    325 
    326     private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler {
              // Counts how often the next schedule is requested and always answers with a delay
              // of zero seconds.
    327       public AtomicInteger scheduleCounter = new AtomicInteger();
    328       @Override
    329       protected Schedule getNextSchedule() throws Exception {
    330         scheduleCounter.getAndIncrement();
    331         return new Schedule(0, TimeUnit.SECONDS);
    332       }
    333     }
    334 
    335     public void testCustomSchedule_startStop() throws Exception {
              // Drives a TestCustomScheduler directly (null is passed as the first argument of
              // schedule) and checks that getNextSchedule has been requested once per task run.
    336       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    337       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    338       final AtomicBoolean shouldWait = new AtomicBoolean(true);
              // While shouldWait is set, every run parks on both barriers so the test can step it.
    339       Runnable task = new Runnable() {
    340         @Override public void run() {
    341           try {
    342             if (shouldWait.get()) {
    343               firstBarrier.await();
    344               secondBarrier.await();
    345             }
    346           } catch (Exception e) {
    347             throw new RuntimeException(e);
    348           }
    349         }
    350       };
    351       TestCustomScheduler scheduler = new TestCustomScheduler();
              // NOTE(review): the pool created inline here is never shut down in this method.
    352       Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task);
    353       firstBarrier.await();
    354       assertEquals(1, scheduler.scheduleCounter.get());
    355       secondBarrier.await();
    356       firstBarrier.await();
    357       assertEquals(2, scheduler.scheduleCounter.get());
              // Let later runs pass straight through, release the parked run, then cancel.
    358       shouldWait.set(false);
    359       secondBarrier.await();
    360       future.cancel(false);
    361     }
    362 
    363     public void testCustomSchedulerServiceStop() throws Exception {
              // Stops a custom-scheduled service while its first iteration is parked and checks
              // that no further iteration is started afterwards.
    364       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService();
    365       service.startAsync().awaitRunning();
    366       service.firstBarrier.await();
    367       assertEquals(1, service.numIterations.get());
    368       service.stopAsync();
    369       service.secondBarrier.await();
    370       service.awaitTerminated();
    371       // Sleep for a while just to ensure that our task wasn't called again.
    372       Thread.sleep(unit.toMillis(3 * delay));
    373       assertEquals(1, service.numIterations.get());
    374     }
    375 
    376     public void testBig() throws Exception {
              // Lets a zero-delay custom schedule run freely for about 50 ms, then parks one
              // iteration on the barriers, stops the service, and checks that the iteration count
              // did not grow after the stop was requested.
    377       TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() {
    378         @Override protected Scheduler scheduler() {
    379           return new AbstractScheduledService.CustomScheduler() {
    380             @Override
    381             protected Schedule getNextSchedule() throws Exception {
    382               // Explicitly yield to increase the probability of a pathological scheduling.
    383               Thread.yield();
    384               return new Schedule(0, TimeUnit.SECONDS);
    385             }
    386           };
    387         }
    388       };
              // Iterations skip the barriers until useBarriers is switched back on below.
    389       service.useBarriers = false;
    390       service.startAsync().awaitRunning();
    391       Thread.sleep(50);
    392       service.useBarriers = true;
    393       service.firstBarrier.await();
              // Snapshot taken while an iteration is parked between the two barriers.
    394       int numIterations = service.numIterations.get();
    395       service.stopAsync();
    396       service.secondBarrier.await();
    397       service.awaitTerminated();
    398       assertEquals(numIterations, service.numIterations.get());
    399     }
    400 
    401     private static class TestAbstractScheduledCustomService extends AbstractScheduledService {
              // Custom-scheduled service that counts its iterations and, while useBarriers is set,
              // parks each iteration on two barriers shared with the test thread.
    402       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    403       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    404       volatile boolean useBarriers = true;
    405       final AtomicInteger numIterations = new AtomicInteger();
    406 
    407       @Override protected ScheduledExecutorService executor() {
    408         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
    409         return Executors.newScheduledThreadPool(10);
    410       }
    411 
    412       @Override protected Scheduler scheduler() {
    413         return new CustomScheduler() {
    414           @Override
    415           protected Schedule getNextSchedule() throws Exception {
    416             return new Schedule(delay, unit);
    417           }};
    418       }
    419 
    420       @Override protected void startUp() throws Exception {}
    421       @Override protected void shutDown() throws Exception {}
    422 
    423       @Override protected void runOneIteration() throws Exception {
    424         numIterations.getAndIncrement();
    425         if (!useBarriers) {
    426           return;
    427         }
    428         firstBarrier.await();
    429         secondBarrier.await();
    430       }
    431     }
    432 
    432     public void testCustomSchedulerFailure() throws Exception {
              // Uses a service whose scheduler throws once more than two iterations have been
              // counted, and checks that the service ends up FAILED.
    433       TestFailingCustomScheduledService service = new TestFailingCustomScheduledService();
    434       service.startAsync().awaitRunning();
    435       for (int i = 1; i < 4; i++) {
    436         service.firstBarrier.await();
    437         assertEquals(i, service.numIterations.get());
    438         service.secondBarrier.await();
    439       }
              // NOTE(review): presumably gives the scheduler failure time to reach the service
              // before the stop request below -- confirm.
    440       Thread.sleep(1000);
    441       try {
    442         service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS);
    443         fail();
    444       } catch (IllegalStateException e) {
    445         assertEquals(State.FAILED, service.state());
    446       }
    447     }
    449 
    450     private static class TestFailingCustomScheduledService extends AbstractScheduledService {
              // Custom-scheduled service whose iterations park on two barriers and whose scheduler
              // starts throwing once more than two iterations have been counted.
    451       final CyclicBarrier firstBarrier = new CyclicBarrier(2);
    452       final CyclicBarrier secondBarrier = new CyclicBarrier(2);
    453       final AtomicInteger numIterations = new AtomicInteger();
    454 
    455       @Override protected ScheduledExecutorService executor() {
    456         // use a bunch of threads so that weird overlapping schedules are more likely to happen.
    457         return Executors.newScheduledThreadPool(10);
    458       }
    459 
    460       @Override protected Scheduler scheduler() {
    461         return new CustomScheduler() {
    462           @Override
    463           protected Schedule getNextSchedule() throws Exception {
    464             if (numIterations.get() <= 2) {
    465               return new Schedule(delay, unit);
    466             }
    467             throw new IllegalStateException("Failed");
    468           }};
    469       }
    470 
    471       @Override protected void runOneIteration() throws Exception {
    472         numIterations.getAndIncrement();
    473         firstBarrier.await();
    474         secondBarrier.await();
    475       }
    476     }
    477   }
    478 }
    479