1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; 20 import com.google.common.util.concurrent.Service.State; 21 22 import junit.framework.TestCase; 23 24 import java.util.concurrent.CyclicBarrier; 25 import java.util.concurrent.ExecutionException; 26 import java.util.concurrent.Executors; 27 import java.util.concurrent.Future; 28 import java.util.concurrent.ScheduledExecutorService; 29 import java.util.concurrent.ScheduledFuture; 30 import java.util.concurrent.ScheduledThreadPoolExecutor; 31 32 import java.util.concurrent.TimeUnit; 33 import java.util.concurrent.atomic.AtomicBoolean; 34 import java.util.concurrent.atomic.AtomicInteger; 35 36 /** 37 * Unit test for {@link AbstractScheduledService}. 38 * 39 * @author Luke Sandberg 40 */ 41 42 public class AbstractScheduledServiceTest extends TestCase { 43 44 volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS); 45 volatile ScheduledFuture<?> future = null; 46 47 volatile boolean atFixedRateCalled = false; 48 volatile boolean withFixedDelayCalled = false; 49 volatile boolean scheduleCalled = false; 50 51 final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) { 52 @Override 53 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 54 long delay, TimeUnit unit) { 55 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit); 56 } 57 }; 58 59 public void testServiceStartStop() throws Exception { 60 NullService service = new NullService(); 61 service.startAndWait(); 62 assertFalse(future.isDone()); 63 service.stopAndWait(); 64 assertTrue(future.isCancelled()); 65 } 66 67 private class NullService extends AbstractScheduledService { 68 @Override protected void runOneIteration() throws Exception { } 69 @Override protected void startUp() throws Exception { } 70 @Override protected void shutDown() throws Exception { } 71 @Override protected Scheduler scheduler() { return configuration; } 72 @Override protected ScheduledExecutorService executor() { return executor; } 73 } 74 75 public void testFailOnExceptionFromRun() throws Exception { 76 TestService service = new TestService(); 77 service.runException = new Exception(); 78 service.startAndWait(); 79 service.runFirstBarrier.await(); 80 service.runSecondBarrier.await(); 81 try { 82 future.get(); 83 fail(); 84 } catch (ExecutionException e) { 85 // An execution exception holds a runtime exception (from throwables.propogate) that holds our 86 // original exception. 87 assertEquals(service.runException, e.getCause().getCause()); 88 } 89 assertEquals(service.state(), Service.State.FAILED); 90 } 91 92 public void testFailOnExceptionFromStartUp() { 93 TestService service = new TestService(); 94 service.startUpException = new Exception(); 95 try { 96 service.startAndWait(); 97 fail(); 98 } catch (UncheckedExecutionException e) { 99 assertEquals(service.startUpException, e.getCause()); 100 } 101 assertEquals(0, service.numberOfTimesRunCalled.get()); 102 assertEquals(Service.State.FAILED, service.state()); 103 } 104 105 public void testFailOnExceptionFromShutDown() throws Exception { 106 TestService service = new TestService(); 107 service.shutDownException = new Exception(); 108 service.startAndWait(); 109 service.runFirstBarrier.await(); 110 ListenableFuture<Service.State> stopHandle = service.stop(); 111 service.runSecondBarrier.await(); 112 try { 113 stopHandle.get(); 114 fail(); 115 } catch (ExecutionException e) { 116 assertEquals(service.shutDownException, e.getCause()); 117 } 118 assertEquals(Service.State.FAILED, service.state()); 119 } 120 121 public void testRunOneIterationCalledMultipleTimes() throws Exception { 122 TestService service = new TestService(); 123 service.startAndWait(); 124 for (int i = 1; i < 10; i++) { 125 service.runFirstBarrier.await(); 126 assertEquals(i, service.numberOfTimesRunCalled.get()); 127 service.runSecondBarrier.await(); 128 } 129 service.runFirstBarrier.await(); 130 service.stop(); 131 service.runSecondBarrier.await(); 132 service.stopAndWait(); 133 } 134 135 public void testExecutorOnlyCalledOnce() throws Exception { 136 TestService service = new TestService(); 137 service.startAndWait(); 138 // It should be called once during startup. 139 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 140 for (int i = 1; i < 10; i++) { 141 service.runFirstBarrier.await(); 142 assertEquals(i, service.numberOfTimesRunCalled.get()); 143 service.runSecondBarrier.await(); 144 } 145 service.runFirstBarrier.await(); 146 service.stop(); 147 service.runSecondBarrier.await(); 148 service.stopAndWait(); 149 // Only called once overall. 150 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 151 } 152 153 public void testSchedulerOnlyCalledOnce() throws Exception { 154 TestService service = new TestService(); 155 service.startAndWait(); 156 // It should be called once during startup. 157 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 158 for (int i = 1; i < 10; i++) { 159 service.runFirstBarrier.await(); 160 assertEquals(i, service.numberOfTimesRunCalled.get()); 161 service.runSecondBarrier.await(); 162 } 163 service.runFirstBarrier.await(); 164 service.stop(); 165 service.runSecondBarrier.await(); 166 service.stopAndWait(); 167 // Only called once overall. 168 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 169 } 170 171 private class TestService extends AbstractScheduledService { 172 CyclicBarrier runFirstBarrier = new CyclicBarrier(2); 173 CyclicBarrier runSecondBarrier = new CyclicBarrier(2); 174 175 volatile boolean startUpCalled = false; 176 volatile boolean shutDownCalled = false; 177 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0); 178 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0); 179 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0); 180 volatile Exception runException = null; 181 volatile Exception startUpException = null; 182 volatile Exception shutDownException = null; 183 184 @Override 185 protected void runOneIteration() throws Exception { 186 assertTrue(startUpCalled); 187 assertFalse(shutDownCalled); 188 numberOfTimesRunCalled.incrementAndGet(); 189 assertEquals(State.RUNNING, state()); 190 runFirstBarrier.await(); 191 runSecondBarrier.await(); 192 if (runException != null) { 193 throw runException; 194 } 195 } 196 197 @Override 198 protected void startUp() throws Exception { 199 assertFalse(startUpCalled); 200 assertFalse(shutDownCalled); 201 startUpCalled = true; 202 assertEquals(State.STARTING, state()); 203 if (startUpException != null) { 204 throw startUpException; 205 } 206 } 207 208 @Override 209 protected void shutDown() throws Exception { 210 assertTrue(startUpCalled); 211 assertFalse(shutDownCalled); 212 shutDownCalled = true; 213 if (shutDownException != null) { 214 throw shutDownException; 215 } 216 } 217 218 @Override 219 protected ScheduledExecutorService executor() { 220 numberOfTimesExecutorCalled.incrementAndGet(); 221 return executor; 222 } 223 224 @Override 225 protected Scheduler scheduler() { 226 numberOfTimesSchedulerCalled.incrementAndGet(); 227 return configuration; 228 } 229 } 230 231 public static class SchedulerTest extends TestCase { 232 // These constants are arbitrary and just used to make sure that the correct method is called 233 // with the correct parameters. 234 private static final int initialDelay = 10; 235 private static final int delay = 20; 236 private static final TimeUnit unit = TimeUnit.MILLISECONDS; 237 238 // Unique runnable object used for comparison. 239 final Runnable testRunnable = new Runnable() {@Override public void run() {}}; 240 boolean called = false; 241 242 private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, 243 long delay, TimeUnit unit) { 244 assertFalse(called); // only called once. 245 called = true; 246 assertEquals(SchedulerTest.initialDelay, initialDelay); 247 assertEquals(SchedulerTest.delay, delay); 248 assertEquals(SchedulerTest.unit, unit); 249 assertEquals(testRunnable, command); 250 } 251 252 public void testFixedRateSchedule() { 253 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); 254 schedule.schedule(null, new ScheduledThreadPoolExecutor(1) { 255 @Override 256 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 257 long period, TimeUnit unit) { 258 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 259 return null; 260 } 261 }, testRunnable); 262 assertTrue(called); 263 } 264 265 public void testFixedDelaySchedule() { 266 Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit); 267 schedule.schedule(null, new ScheduledThreadPoolExecutor(10) { 268 @Override 269 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 270 long delay, TimeUnit unit) { 271 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 272 return null; 273 } 274 }, testRunnable); 275 assertTrue(called); 276 } 277 278 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { 279 public AtomicInteger scheduleCounter = new AtomicInteger(0); 280 @Override 281 protected Schedule getNextSchedule() throws Exception { 282 scheduleCounter.incrementAndGet(); 283 return new Schedule(0, TimeUnit.SECONDS); 284 } 285 } 286 287 public void testCustomSchedule_startStop() throws Exception { 288 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 289 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 290 final AtomicBoolean shouldWait = new AtomicBoolean(true); 291 Runnable task = new Runnable() { 292 @Override public void run() { 293 try { 294 if (shouldWait.get()) { 295 firstBarrier.await(); 296 secondBarrier.await(); 297 } 298 } catch (Exception e) { 299 throw new RuntimeException(e); 300 } 301 } 302 }; 303 TestCustomScheduler scheduler = new TestCustomScheduler(); 304 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task); 305 firstBarrier.await(); 306 assertEquals(1, scheduler.scheduleCounter.get()); 307 secondBarrier.await(); 308 firstBarrier.await(); 309 assertEquals(2, scheduler.scheduleCounter.get()); 310 shouldWait.set(false); 311 secondBarrier.await(); 312 future.cancel(false); 313 } 314 315 public void testCustomSchedulerServiceStop() throws Exception { 316 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); 317 service.startAndWait(); 318 service.firstBarrier.await(); 319 assertEquals(1, service.numIterations.get()); 320 service.stop(); 321 service.secondBarrier.await(); 322 service.stopAndWait(); 323 // Sleep for a while just to ensure that our task wasn't called again. 324 Thread.sleep(unit.toMillis(3 * delay)); 325 assertEquals(1, service.numIterations.get()); 326 } 327 328 public void testBig() throws Exception { 329 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() { 330 @Override protected Scheduler scheduler() { 331 return new AbstractScheduledService.CustomScheduler(){ 332 @Override 333 protected Schedule getNextSchedule() throws Exception { 334 // Explicitly yield to increase the probability of a pathological scheduling. 335 Thread.yield(); 336 return new Schedule(0, TimeUnit.SECONDS); 337 } 338 }; 339 } 340 }; 341 service.useBarriers = false; 342 service.startAndWait(); 343 Thread.sleep(50); 344 service.useBarriers = true; 345 service.firstBarrier.await(); 346 int numIterations = service.numIterations.get(); 347 service.stop(); 348 service.secondBarrier.await(); 349 service.stopAndWait(); 350 assertEquals(numIterations, service.numIterations.get()); 351 } 352 353 private static class TestAbstractScheduledCustomService extends AbstractScheduledService { 354 final AtomicInteger numIterations = new AtomicInteger(0); 355 volatile boolean useBarriers = true; 356 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 357 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 358 359 @Override protected void runOneIteration() throws Exception { 360 numIterations.incrementAndGet(); 361 if (useBarriers) { 362 firstBarrier.await(); 363 secondBarrier.await(); 364 } 365 } 366 367 @Override protected ScheduledExecutorService executor() { 368 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 369 return Executors.newScheduledThreadPool(10); 370 } 371 372 @Override protected void startUp() throws Exception { } 373 374 @Override protected void shutDown() throws Exception { } 375 376 @Override protected Scheduler scheduler() { 377 return new CustomScheduler() { 378 @Override 379 protected Schedule getNextSchedule() throws Exception { 380 return new Schedule(delay, unit); 381 }}; 382 } 383 } 384 385 public void testCustomSchedulerFailure() throws Exception { 386 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService(); 387 service.startAndWait(); 388 for (int i = 1; i < 4; i++) { 389 service.firstBarrier.await(); 390 assertEquals(i, service.numIterations.get()); 391 service.secondBarrier.await(); 392 } 393 Thread.sleep(1000); 394 try { 395 service.stop().get(100, TimeUnit.SECONDS); 396 fail(); 397 } catch (ExecutionException e) { 398 assertEquals(State.FAILED, service.state()); 399 } 400 } 401 402 private static class TestFailingCustomScheduledService extends AbstractScheduledService { 403 final AtomicInteger numIterations = new AtomicInteger(0); 404 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 405 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 406 407 @Override protected void runOneIteration() throws Exception { 408 numIterations.incrementAndGet(); 409 firstBarrier.await(); 410 secondBarrier.await(); 411 } 412 413 @Override protected ScheduledExecutorService executor() { 414 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 415 return Executors.newScheduledThreadPool(10); 416 } 417 418 @Override protected void startUp() throws Exception { } 419 420 @Override protected void shutDown() throws Exception { } 421 422 @Override protected Scheduler scheduler() { 423 return new CustomScheduler() { 424 @Override 425 protected Schedule getNextSchedule() throws Exception { 426 if (numIterations.get() > 2) { 427 throw new IllegalStateException("Failed"); 428 } 429 return new Schedule(delay, unit); 430 }}; 431 } 432 } 433 } 434 } 435