1 /* 2 * Copyright (C) 2014 The Guava Authors 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.google.common.util.concurrent; 18 19 import static com.google.common.util.concurrent.MoreExecutors.directExecutor; 20 21 import com.google.common.util.concurrent.ListenerCallQueue.Callback; 22 23 import junit.framework.TestCase; 24 25 import java.util.concurrent.CountDownLatch; 26 import java.util.concurrent.ExecutorService; 27 import java.util.concurrent.Executors; 28 import java.util.concurrent.atomic.AtomicInteger; 29 30 /** 31 * Tests for {@link ListenerCallQueue}. 32 */ 33 public class ListenerCallQueueTest extends TestCase { 34 35 private static final Callback<Object> THROWING_CALLBACK = new Callback<Object>("throwing()") { 36 @Override public void call(Object object) { 37 throw new RuntimeException(); 38 } 39 }; 40 41 public void testAddAndExecute() { 42 Object listenerInstance = new Object(); 43 ListenerCallQueue<Object> queue = 44 new ListenerCallQueue<Object>(listenerInstance, directExecutor()); 45 46 AtomicInteger counter = new AtomicInteger(); 47 queue.add(incrementingCallback(counter, 1)); 48 queue.add(incrementingCallback(counter, 2)); 49 queue.add(incrementingCallback(counter, 3)); 50 queue.add(incrementingCallback(counter, 4)); 51 assertEquals(0, counter.get()); 52 queue.execute(); 53 assertEquals(4, counter.get()); 54 } 55 56 public void testAddAndExecute_withExceptions() { 57 Object listenerInstance = new Object(); 58 ListenerCallQueue<Object> queue = 59 new ListenerCallQueue<Object>(listenerInstance, directExecutor()); 60 61 AtomicInteger counter = new AtomicInteger(); 62 queue.add(incrementingCallback(counter, 1)); 63 queue.add(THROWING_CALLBACK); 64 queue.add(incrementingCallback(counter, 2)); 65 queue.add(THROWING_CALLBACK); 66 queue.add(incrementingCallback(counter, 3)); 67 queue.add(THROWING_CALLBACK); 68 queue.add(incrementingCallback(counter, 4)); 69 queue.add(THROWING_CALLBACK); 70 assertEquals(0, counter.get()); 71 queue.execute(); 72 assertEquals(4, counter.get()); 73 } 74 75 public void testAddAndExecute_multithreaded() throws InterruptedException { 76 ExecutorService service = Executors.newFixedThreadPool(4); 77 try { 78 ListenerCallQueue<Object> queue = 79 new ListenerCallQueue<Object>(new Object(), service); 80 81 final CountDownLatch latch = new CountDownLatch(1); 82 AtomicInteger counter = new AtomicInteger(); 83 queue.add(incrementingCallback(counter, 1)); 84 queue.add(incrementingCallback(counter, 2)); 85 queue.add(incrementingCallback(counter, 3)); 86 queue.add(incrementingCallback(counter, 4)); 87 queue.add(countDownCallback(latch)); 88 assertEquals(0, counter.get()); 89 queue.execute(); 90 latch.await(); 91 assertEquals(4, counter.get()); 92 } finally { 93 service.shutdown(); 94 } 95 } 96 97 public void testAddAndExecute_multithreaded_withThrowingRunnable() throws InterruptedException { 98 ExecutorService service = Executors.newFixedThreadPool(4); 99 try { 100 ListenerCallQueue<Object> queue = 101 new ListenerCallQueue<Object>(new Object(), service); 102 103 final CountDownLatch latch = new CountDownLatch(1); 104 AtomicInteger counter = new AtomicInteger(); 105 queue.add(incrementingCallback(counter, 1)); 106 queue.add(THROWING_CALLBACK); 107 queue.add(incrementingCallback(counter, 2)); 108 queue.add(THROWING_CALLBACK); 109 queue.add(incrementingCallback(counter, 3)); 110 queue.add(THROWING_CALLBACK); 111 queue.add(incrementingCallback(counter, 4)); 112 queue.add(THROWING_CALLBACK); 113 queue.add(countDownCallback(latch)); 114 assertEquals(0, counter.get()); 115 queue.execute(); 116 latch.await(); 117 assertEquals(4, counter.get()); 118 } finally { 119 service.shutdown(); 120 } 121 } 122 123 private Callback<Object> incrementingCallback(final AtomicInteger counter, final int expected) { 124 return new Callback<Object>("incrementing") { 125 @Override void call(Object listener) { 126 assertEquals(expected, counter.incrementAndGet()); 127 } 128 }; 129 } 130 131 private Callback<Object> countDownCallback(final CountDownLatch latch) { 132 return new Callback<Object>("countDown") { 133 @Override void call(Object listener) { 134 latch.countDown(); 135 } 136 }; 137 } 138 } 139