/*
 * Copyright (C) 2008 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.junit.contrib.truth.Truth.ASSERT;

import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;

import junit.framework.TestCase;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Tests for MoreExecutors.
 *
 * @author Kyle Littlefield (klittle)
 */
public class MoreExecutorsTest extends TestCase {

  /**
   * Submits the same increment task from this thread and from a second thread
   * and checks, on each thread, that the returned future is already done and
   * that the submitting thread's own {@link ThreadLocal} counter reads 1.
   */
  public void testSameThreadExecutorServiceInThreadExecution()
      throws Exception {
    final ListeningExecutorService executor =
        MoreExecutors.sameThreadExecutor();
    final ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>() {
      @Override
      protected Integer initialValue() {
        return 0;
      }
    };
    final AtomicReference<Throwable> throwableFromOtherThread =
        new AtomicReference<Throwable>(null);
    final Runnable incrementTask =
        new Runnable() {
          @Override
          public void run() {
            threadLocalCount.set(threadLocalCount.get() + 1);
          }
        };

    Thread otherThread = new Thread(
        new Runnable() {
          @Override
          public void run() {
            try {
              Future<?> future = executor.submit(incrementTask);
              assertTrue(future.isDone());
              assertEquals(1, threadLocalCount.get().intValue());
            } catch (Throwable t) {
              // Failures on this thread are invisible to JUnit; record them so
              // the test thread can report them after join().
              throwableFromOtherThread.set(t);
            }
          }
        });

    otherThread.start();

    ListenableFuture<?> future = executor.submit(incrementTask);
    assertTrue(future.isDone());
    assertListenerRunImmediately(future);
    assertEquals(1, threadLocalCount.get().intValue());
    otherThread.join(1000);
    assertEquals(Thread.State.TERMINATED, otherThread.getState());
    assertNoThrowableFromOtherThread(throwableFromOtherThread);
  }

  /**
   * Invokes ten copies of a counting task through {@code invokeAll} and checks
   * that every returned future is done, that the i-th future yields {@code i},
   * and that the calling thread's {@link ThreadLocal} counter ends at 10.
   */
  public void testSameThreadExecutorInvokeAll() throws Exception {
    final ExecutorService executor = MoreExecutors.sameThreadExecutor();
    final ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>() {
      @Override
      protected Integer initialValue() {
        return 0;
      }
    };

    // Returns the counter value seen before incrementing it.
    final Callable<Integer> incrementTask = new Callable<Integer>() {
      @Override
      public Integer call() {
        int i = threadLocalCount.get();
        threadLocalCount.set(i + 1);
        return i;
      }
    };

    List<Future<Integer>> futures =
        executor.invokeAll(Collections.nCopies(10, incrementTask));

    for (int i = 0; i < 10; i++) {
      Future<Integer> future = futures.get(i);
      assertTrue("Task should have been run before being returned", future.isDone());
      assertEquals(i, future.get().intValue());
    }

    assertEquals(10, threadLocalCount.get().intValue());
  }

  /**
   * Shuts the executor down from the test thread while a task submitted by a
   * second thread is still inside {@code call()}, using a two-party
   * {@link CyclicBarrier} to line the two threads up at three points.
   *
   * <p>Checks that the executor reports shut down but not terminated while the
   * task is in flight, rejects new submissions after {@code shutdown()}, and
   * reports terminated once the task has returned.
   */
  public void testSameThreadExecutorServiceTermination()
      throws Exception {
    final ExecutorService executor = MoreExecutors.sameThreadExecutor();
    final CyclicBarrier barrier = new CyclicBarrier(2);
    final AtomicReference<Throwable> throwableFromOtherThread =
        new AtomicReference<Throwable>(null);
    final Runnable doNothingRunnable = new Runnable() {
      @Override
      public void run() {
      }
    };

    Thread otherThread = new Thread(new Runnable() {
      @Override
      public void run() {
        try {
          Future<?> future = executor.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
              // WAIT #1
              barrier.await(1, TimeUnit.SECONDS);

              // WAIT #2
              barrier.await(1, TimeUnit.SECONDS);
              assertTrue(executor.isShutdown());
              assertFalse(executor.isTerminated());

              // WAIT #3
              barrier.await(1, TimeUnit.SECONDS);
              return null;
            }
          });
          assertTrue(future.isDone());
          // Anything thrown inside call() (a failed assertion, a barrier
          // timeout) is held by the future rather than propagated out of
          // submit(); get() rethrows it so it is recorded below instead of
          // being silently dropped. The future is done, so this cannot block.
          future.get();
          assertTrue(executor.isShutdown());
          assertTrue(executor.isTerminated());
        } catch (Throwable t) {
          // Failures on this thread are invisible to JUnit; record them so
          // the test thread can report them after join().
          throwableFromOtherThread.set(t);
        }
      }
    });

    otherThread.start();

    // WAIT #1
    barrier.await(1, TimeUnit.SECONDS);
    assertFalse(executor.isShutdown());
    assertFalse(executor.isTerminated());

    executor.shutdown();
    assertTrue(executor.isShutdown());
    try {
      executor.submit(doNothingRunnable);
      fail("Should have encountered RejectedExecutionException");
    } catch (RejectedExecutionException ex) {
      // good to go
    }
    assertFalse(executor.isTerminated());

    // WAIT #2
    barrier.await(1, TimeUnit.SECONDS);
    assertFalse(executor.awaitTermination(20, TimeUnit.MILLISECONDS));

    // WAIT #3
    barrier.await(1, TimeUnit.SECONDS);
    assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
    assertTrue(executor.awaitTermination(0, TimeUnit.SECONDS));
    assertTrue(executor.isShutdown());
    try {
      executor.submit(doNothingRunnable);
      fail("Should have encountered RejectedExecutionException");
    } catch (RejectedExecutionException ex) {
      // good to go
    }
    assertTrue(executor.isTerminated());

    otherThread.join(1000);
    assertEquals(Thread.State.TERMINATED, otherThread.getState());
    assertNoThrowableFromOtherThread(throwableFromOtherThread);
  }

  /**
   * Checks that {@code listeningDecorator} returns an already-listening
   * service unchanged, and that both {@code invokeAll} overloads of the
   * decorated service hand back {@link ListenableFutureTask} instances.
   */
  public void testListeningDecorator() throws Exception {
    ListeningExecutorService service =
        listeningDecorator(MoreExecutors.sameThreadExecutor());
    assertSame(service, listeningDecorator(service));
    List<Callable<String>> callables =
        ImmutableList.of(Callables.returning("x"));
    List<Future<String>> results;

    results = service.invokeAll(callables);
    ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);

    results = service.invokeAll(callables, 1, SECONDS);
    ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);

    /*
     * TODO(cpovirk): move ForwardingTestCase somewhere common, and use it to
     * test the forwarded methods
     */
  }

  /**
   * Adds a counting listener to {@code future} with a same-thread executor and
   * asserts that the listener has run exactly once by the time
   * {@code addListener} returns.
   */
  private static void assertListenerRunImmediately(ListenableFuture<?> future) {
    CountingRunnable listener = new CountingRunnable();
    future.addListener(listener, sameThreadExecutor());
    assertEquals(1, listener.count);
  }

  /**
   * Fails, with the recorded stack trace in the message, if the helper thread
   * stored a {@link Throwable} in {@code holder}.
   */
  private static void assertNoThrowableFromOtherThread(
      AtomicReference<Throwable> holder) {
    Throwable throwable = holder.get();
    assertNull("Throwable from other thread: "
        + (throwable == null ? null : Throwables.getStackTraceAsString(throwable)),
        throwable);
  }

  /** Runnable that counts how many times it has been run. */
  private static final class CountingRunnable implements Runnable {
    int count;

    @Override
    public void run() {
      count++;
    }
  }
}