1 /* 2 * Copyright (C) 2011 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import com.google.common.util.concurrent.AbstractScheduledService.Scheduler; 20 import com.google.common.util.concurrent.Service.State; 21 22 import junit.framework.TestCase; 23 24 import java.util.concurrent.CountDownLatch; 25 import java.util.concurrent.CyclicBarrier; 26 import java.util.concurrent.ExecutionException; 27 import java.util.concurrent.Executors; 28 import java.util.concurrent.Future; 29 import java.util.concurrent.ScheduledExecutorService; 30 import java.util.concurrent.ScheduledFuture; 31 import java.util.concurrent.ScheduledThreadPoolExecutor; 32 import java.util.concurrent.TimeUnit; 33 import java.util.concurrent.atomic.AtomicBoolean; 34 import java.util.concurrent.atomic.AtomicInteger; 35 36 /** 37 * Unit test for {@link AbstractScheduledService}. 38 * 39 * @author Luke Sandberg 40 */ 41 42 public class AbstractScheduledServiceTest extends TestCase { 43 44 volatile Scheduler configuration = Scheduler.newFixedDelaySchedule(0, 10, TimeUnit.MILLISECONDS); 45 volatile ScheduledFuture<?> future = null; 46 47 volatile boolean atFixedRateCalled = false; 48 volatile boolean withFixedDelayCalled = false; 49 volatile boolean scheduleCalled = false; 50 51 final ScheduledExecutorService executor = new ScheduledThreadPoolExecutor(10) { 52 @Override 53 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 54 long delay, TimeUnit unit) { 55 return future = super.scheduleWithFixedDelay(command, initialDelay, delay, unit); 56 } 57 }; 58 59 public void testServiceStartStop() throws Exception { 60 NullService service = new NullService(); 61 service.startAsync().awaitRunning(); 62 assertFalse(future.isDone()); 63 service.stopAsync().awaitTerminated(); 64 assertTrue(future.isCancelled()); 65 } 66 67 private class NullService extends AbstractScheduledService { 68 @Override protected void runOneIteration() throws Exception {} 69 @Override protected Scheduler scheduler() { return configuration; } 70 @Override protected ScheduledExecutorService executor() { return executor; } 71 } 72 73 public void testFailOnExceptionFromRun() throws Exception { 74 TestService service = new TestService(); 75 service.runException = new Exception(); 76 service.startAsync().awaitRunning(); 77 service.runFirstBarrier.await(); 78 service.runSecondBarrier.await(); 79 try { 80 future.get(); 81 fail(); 82 } catch (ExecutionException e) { 83 // An execution exception holds a runtime exception (from throwables.propogate) that holds our 84 // original exception. 85 assertEquals(service.runException, e.getCause().getCause()); 86 } 87 assertEquals(service.state(), Service.State.FAILED); 88 } 89 90 public void testFailOnExceptionFromStartUp() { 91 TestService service = new TestService(); 92 service.startUpException = new Exception(); 93 try { 94 service.startAsync().awaitRunning(); 95 fail(); 96 } catch (IllegalStateException e) { 97 assertEquals(service.startUpException, e.getCause()); 98 } 99 assertEquals(0, service.numberOfTimesRunCalled.get()); 100 assertEquals(Service.State.FAILED, service.state()); 101 } 102 103 public void testFailOnExceptionFromShutDown() throws Exception { 104 TestService service = new TestService(); 105 service.shutDownException = new Exception(); 106 service.startAsync().awaitRunning(); 107 service.runFirstBarrier.await(); 108 service.stopAsync(); 109 service.runSecondBarrier.await(); 110 try { 111 service.awaitTerminated(); 112 fail(); 113 } catch (IllegalStateException e) { 114 assertEquals(service.shutDownException, e.getCause()); 115 } 116 assertEquals(Service.State.FAILED, service.state()); 117 } 118 119 public void testRunOneIterationCalledMultipleTimes() throws Exception { 120 TestService service = new TestService(); 121 service.startAsync().awaitRunning(); 122 for (int i = 1; i < 10; i++) { 123 service.runFirstBarrier.await(); 124 assertEquals(i, service.numberOfTimesRunCalled.get()); 125 service.runSecondBarrier.await(); 126 } 127 service.runFirstBarrier.await(); 128 service.stopAsync(); 129 service.runSecondBarrier.await(); 130 service.stopAsync().awaitTerminated(); 131 } 132 133 public void testExecutorOnlyCalledOnce() throws Exception { 134 TestService service = new TestService(); 135 service.startAsync().awaitRunning(); 136 // It should be called once during startup. 137 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 138 for (int i = 1; i < 10; i++) { 139 service.runFirstBarrier.await(); 140 assertEquals(i, service.numberOfTimesRunCalled.get()); 141 service.runSecondBarrier.await(); 142 } 143 service.runFirstBarrier.await(); 144 service.stopAsync(); 145 service.runSecondBarrier.await(); 146 service.stopAsync().awaitTerminated(); 147 // Only called once overall. 148 assertEquals(1, service.numberOfTimesExecutorCalled.get()); 149 } 150 151 public void testDefaultExecutorIsShutdownWhenServiceIsStopped() throws Exception { 152 final CountDownLatch terminationLatch = new CountDownLatch(1); 153 AbstractScheduledService service = new AbstractScheduledService() { 154 volatile ScheduledExecutorService executorService; 155 @Override protected void runOneIteration() throws Exception {} 156 157 @Override protected ScheduledExecutorService executor() { 158 if (executorService == null) { 159 executorService = super.executor(); 160 // Add a listener that will be executed after the listener that shuts down the executor. 161 addListener(new Listener() { 162 @Override public void terminated(State from) { 163 terminationLatch.countDown(); 164 } 165 }, MoreExecutors.sameThreadExecutor()); 166 } 167 return executorService; 168 } 169 170 @Override protected Scheduler scheduler() { 171 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 172 }}; 173 174 service.startAsync(); 175 assertFalse(service.executor().isShutdown()); 176 service.awaitRunning(); 177 service.stopAsync(); 178 terminationLatch.await(); 179 assertTrue(service.executor().isShutdown()); 180 assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS)); 181 } 182 183 public void testDefaultExecutorIsShutdownWhenServiceFails() throws Exception { 184 final CountDownLatch failureLatch = new CountDownLatch(1); 185 AbstractScheduledService service = new AbstractScheduledService() { 186 volatile ScheduledExecutorService executorService; 187 @Override protected void runOneIteration() throws Exception {} 188 189 @Override protected void startUp() throws Exception { 190 throw new Exception("Failed"); 191 } 192 193 @Override protected ScheduledExecutorService executor() { 194 if (executorService == null) { 195 executorService = super.executor(); 196 // Add a listener that will be executed after the listener that shuts down the executor. 197 addListener(new Listener() { 198 @Override public void failed(State from, Throwable failure) { 199 failureLatch.countDown(); 200 } 201 }, MoreExecutors.sameThreadExecutor()); 202 } 203 return executorService; 204 } 205 206 @Override protected Scheduler scheduler() { 207 return Scheduler.newFixedDelaySchedule(0, 1, TimeUnit.MILLISECONDS); 208 }}; 209 210 try { 211 service.startAsync().awaitRunning(); 212 fail("Expected service to fail during startup"); 213 } catch (IllegalStateException expected) {} 214 failureLatch.await(); 215 assertTrue(service.executor().isShutdown()); 216 assertTrue(service.executor().awaitTermination(100, TimeUnit.MILLISECONDS)); 217 } 218 219 public void testSchedulerOnlyCalledOnce() throws Exception { 220 TestService service = new TestService(); 221 service.startAsync().awaitRunning(); 222 // It should be called once during startup. 223 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 224 for (int i = 1; i < 10; i++) { 225 service.runFirstBarrier.await(); 226 assertEquals(i, service.numberOfTimesRunCalled.get()); 227 service.runSecondBarrier.await(); 228 } 229 service.runFirstBarrier.await(); 230 service.stopAsync(); 231 service.runSecondBarrier.await(); 232 service.awaitTerminated(); 233 // Only called once overall. 234 assertEquals(1, service.numberOfTimesSchedulerCalled.get()); 235 } 236 237 private class TestService extends AbstractScheduledService { 238 CyclicBarrier runFirstBarrier = new CyclicBarrier(2); 239 CyclicBarrier runSecondBarrier = new CyclicBarrier(2); 240 241 volatile boolean startUpCalled = false; 242 volatile boolean shutDownCalled = false; 243 AtomicInteger numberOfTimesRunCalled = new AtomicInteger(0); 244 AtomicInteger numberOfTimesExecutorCalled = new AtomicInteger(0); 245 AtomicInteger numberOfTimesSchedulerCalled = new AtomicInteger(0); 246 volatile Exception runException = null; 247 volatile Exception startUpException = null; 248 volatile Exception shutDownException = null; 249 250 @Override 251 protected void runOneIteration() throws Exception { 252 assertTrue(startUpCalled); 253 assertFalse(shutDownCalled); 254 numberOfTimesRunCalled.incrementAndGet(); 255 assertEquals(State.RUNNING, state()); 256 runFirstBarrier.await(); 257 runSecondBarrier.await(); 258 if (runException != null) { 259 throw runException; 260 } 261 } 262 263 @Override 264 protected void startUp() throws Exception { 265 assertFalse(startUpCalled); 266 assertFalse(shutDownCalled); 267 startUpCalled = true; 268 assertEquals(State.STARTING, state()); 269 if (startUpException != null) { 270 throw startUpException; 271 } 272 } 273 274 @Override 275 protected void shutDown() throws Exception { 276 assertTrue(startUpCalled); 277 assertFalse(shutDownCalled); 278 shutDownCalled = true; 279 if (shutDownException != null) { 280 throw shutDownException; 281 } 282 } 283 284 @Override 285 protected ScheduledExecutorService executor() { 286 numberOfTimesExecutorCalled.incrementAndGet(); 287 return executor; 288 } 289 290 @Override 291 protected Scheduler scheduler() { 292 numberOfTimesSchedulerCalled.incrementAndGet(); 293 return configuration; 294 } 295 } 296 297 public static class SchedulerTest extends TestCase { 298 // These constants are arbitrary and just used to make sure that the correct method is called 299 // with the correct parameters. 300 private static final int initialDelay = 10; 301 private static final int delay = 20; 302 private static final TimeUnit unit = TimeUnit.MILLISECONDS; 303 304 // Unique runnable object used for comparison. 305 final Runnable testRunnable = new Runnable() {@Override public void run() {}}; 306 boolean called = false; 307 308 private void assertSingleCallWithCorrectParameters(Runnable command, long initialDelay, 309 long delay, TimeUnit unit) { 310 assertFalse(called); // only called once. 311 called = true; 312 assertEquals(SchedulerTest.initialDelay, initialDelay); 313 assertEquals(SchedulerTest.delay, delay); 314 assertEquals(SchedulerTest.unit, unit); 315 assertEquals(testRunnable, command); 316 } 317 318 public void testFixedRateSchedule() { 319 Scheduler schedule = Scheduler.newFixedRateSchedule(initialDelay, delay, unit); 320 schedule.schedule(null, new ScheduledThreadPoolExecutor(1) { 321 @Override 322 public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, 323 long period, TimeUnit unit) { 324 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 325 return null; 326 } 327 }, testRunnable); 328 assertTrue(called); 329 } 330 331 public void testFixedDelaySchedule() { 332 Scheduler schedule = Scheduler.newFixedDelaySchedule(initialDelay, delay, unit); 333 schedule.schedule(null, new ScheduledThreadPoolExecutor(10) { 334 @Override 335 public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, 336 long delay, TimeUnit unit) { 337 assertSingleCallWithCorrectParameters(command, initialDelay, delay, unit); 338 return null; 339 } 340 }, testRunnable); 341 assertTrue(called); 342 } 343 344 private class TestCustomScheduler extends AbstractScheduledService.CustomScheduler { 345 public AtomicInteger scheduleCounter = new AtomicInteger(0); 346 @Override 347 protected Schedule getNextSchedule() throws Exception { 348 scheduleCounter.incrementAndGet(); 349 return new Schedule(0, TimeUnit.SECONDS); 350 } 351 } 352 353 public void testCustomSchedule_startStop() throws Exception { 354 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 355 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 356 final AtomicBoolean shouldWait = new AtomicBoolean(true); 357 Runnable task = new Runnable() { 358 @Override public void run() { 359 try { 360 if (shouldWait.get()) { 361 firstBarrier.await(); 362 secondBarrier.await(); 363 } 364 } catch (Exception e) { 365 throw new RuntimeException(e); 366 } 367 } 368 }; 369 TestCustomScheduler scheduler = new TestCustomScheduler(); 370 Future<?> future = scheduler.schedule(null, Executors.newScheduledThreadPool(10), task); 371 firstBarrier.await(); 372 assertEquals(1, scheduler.scheduleCounter.get()); 373 secondBarrier.await(); 374 firstBarrier.await(); 375 assertEquals(2, scheduler.scheduleCounter.get()); 376 shouldWait.set(false); 377 secondBarrier.await(); 378 future.cancel(false); 379 } 380 381 public void testCustomSchedulerServiceStop() throws Exception { 382 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService(); 383 service.startAsync().awaitRunning(); 384 service.firstBarrier.await(); 385 assertEquals(1, service.numIterations.get()); 386 service.stopAsync(); 387 service.secondBarrier.await(); 388 service.awaitTerminated(); 389 // Sleep for a while just to ensure that our task wasn't called again. 390 Thread.sleep(unit.toMillis(3 * delay)); 391 assertEquals(1, service.numIterations.get()); 392 } 393 394 public void testBig() throws Exception { 395 TestAbstractScheduledCustomService service = new TestAbstractScheduledCustomService() { 396 @Override protected Scheduler scheduler() { 397 return new AbstractScheduledService.CustomScheduler() { 398 @Override 399 protected Schedule getNextSchedule() throws Exception { 400 // Explicitly yield to increase the probability of a pathological scheduling. 401 Thread.yield(); 402 return new Schedule(0, TimeUnit.SECONDS); 403 } 404 }; 405 } 406 }; 407 service.useBarriers = false; 408 service.startAsync().awaitRunning(); 409 Thread.sleep(50); 410 service.useBarriers = true; 411 service.firstBarrier.await(); 412 int numIterations = service.numIterations.get(); 413 service.stopAsync(); 414 service.secondBarrier.await(); 415 service.awaitTerminated(); 416 assertEquals(numIterations, service.numIterations.get()); 417 } 418 419 private static class TestAbstractScheduledCustomService extends AbstractScheduledService { 420 final AtomicInteger numIterations = new AtomicInteger(0); 421 volatile boolean useBarriers = true; 422 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 423 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 424 425 @Override protected void runOneIteration() throws Exception { 426 numIterations.incrementAndGet(); 427 if (useBarriers) { 428 firstBarrier.await(); 429 secondBarrier.await(); 430 } 431 } 432 433 @Override protected ScheduledExecutorService executor() { 434 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 435 return Executors.newScheduledThreadPool(10); 436 } 437 438 @Override protected void startUp() throws Exception {} 439 440 @Override protected void shutDown() throws Exception {} 441 442 @Override protected Scheduler scheduler() { 443 return new CustomScheduler() { 444 @Override 445 protected Schedule getNextSchedule() throws Exception { 446 return new Schedule(delay, unit); 447 }}; 448 } 449 } 450 451 public void testCustomSchedulerFailure() throws Exception { 452 TestFailingCustomScheduledService service = new TestFailingCustomScheduledService(); 453 service.startAsync().awaitRunning(); 454 for (int i = 1; i < 4; i++) { 455 service.firstBarrier.await(); 456 assertEquals(i, service.numIterations.get()); 457 service.secondBarrier.await(); 458 } 459 Thread.sleep(1000); 460 try { 461 service.stopAsync().awaitTerminated(100, TimeUnit.SECONDS); 462 fail(); 463 } catch (IllegalStateException e) { 464 assertEquals(State.FAILED, service.state()); 465 } 466 } 467 468 private static class TestFailingCustomScheduledService extends AbstractScheduledService { 469 final AtomicInteger numIterations = new AtomicInteger(0); 470 final CyclicBarrier firstBarrier = new CyclicBarrier(2); 471 final CyclicBarrier secondBarrier = new CyclicBarrier(2); 472 473 @Override protected void runOneIteration() throws Exception { 474 numIterations.incrementAndGet(); 475 firstBarrier.await(); 476 secondBarrier.await(); 477 } 478 479 @Override protected ScheduledExecutorService executor() { 480 // use a bunch of threads so that weird overlapping schedules are more likely to happen. 481 return Executors.newScheduledThreadPool(10); 482 } 483 484 @Override protected Scheduler scheduler() { 485 return new CustomScheduler() { 486 @Override 487 protected Schedule getNextSchedule() throws Exception { 488 if (numIterations.get() > 2) { 489 throw new IllegalStateException("Failed"); 490 } 491 return new Schedule(delay, unit); 492 }}; 493 } 494 } 495 } 496 } 497