Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2008 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.collect.Iterables.getOnlyElement;
     20 import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
     21 import static com.google.common.util.concurrent.MoreExecutors.sameThreadExecutor;
     22 import static java.util.concurrent.TimeUnit.SECONDS;
     23 import static org.junit.contrib.truth.Truth.ASSERT;
     24 
     25 import com.google.common.base.Throwables;
     26 import com.google.common.collect.ImmutableList;
     27 
     28 import junit.framework.TestCase;
     29 
     30 import java.util.Collections;
     31 import java.util.List;
     32 import java.util.concurrent.Callable;
     33 import java.util.concurrent.CyclicBarrier;
     34 import java.util.concurrent.ExecutorService;
     35 import java.util.concurrent.Future;
     36 import java.util.concurrent.RejectedExecutionException;
     37 import java.util.concurrent.TimeUnit;
     38 import java.util.concurrent.atomic.AtomicReference;
     39 
     40 /**
     41  * Tests for MoreExecutors.
     42  *
     43  * @author Kyle Littlefield (klittle)
     44  */
     45 public class MoreExecutorsTest extends TestCase {
     46 
     47   public void testSameThreadExecutorServiceInThreadExecution()
     48       throws Exception {
     49     final ListeningExecutorService executor =
     50         MoreExecutors.sameThreadExecutor();
     51     final ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>() {
     52       @Override
     53       protected Integer initialValue() {
     54         return 0;
     55       }
     56     };
     57     final AtomicReference<Throwable> throwableFromOtherThread =
     58         new AtomicReference<Throwable>(null);
     59     final Runnable incrementTask =
     60         new Runnable() {
     61           @Override
     62           public void run() {
     63             threadLocalCount.set(threadLocalCount.get() + 1);
     64           }
     65         };
     66 
     67     Thread otherThread = new Thread(
     68         new Runnable() {
     69           @Override
     70           public void run() {
     71             try {
     72               Future<?> future = executor.submit(incrementTask);
     73               assertTrue(future.isDone());
     74               assertEquals(1, threadLocalCount.get().intValue());
     75             } catch (Throwable Throwable) {
     76               throwableFromOtherThread.set(Throwable);
     77             }
     78           }
     79         });
     80 
     81     otherThread.start();
     82 
     83     ListenableFuture<?> future = executor.submit(incrementTask);
     84     assertTrue(future.isDone());
     85     assertListenerRunImmediately(future);
     86     assertEquals(1, threadLocalCount.get().intValue());
     87     otherThread.join(1000);
     88     assertEquals(Thread.State.TERMINATED, otherThread.getState());
     89     Throwable throwable = throwableFromOtherThread.get();
     90     assertNull("Throwable from other thread: "
     91         + (throwable == null ? null : Throwables.getStackTraceAsString(throwable)),
     92         throwableFromOtherThread.get());
     93   }
     94 
     95   public void testSameThreadExecutorInvokeAll() throws Exception {
     96     final ExecutorService executor = MoreExecutors.sameThreadExecutor();
     97     final ThreadLocal<Integer> threadLocalCount = new ThreadLocal<Integer>() {
     98       @Override
     99       protected Integer initialValue() {
    100         return 0;
    101       }
    102     };
    103 
    104     final Callable<Integer> incrementTask = new Callable<Integer>() {
    105       @Override
    106       public Integer call() {
    107         int i = threadLocalCount.get();
    108         threadLocalCount.set(i + 1);
    109         return i;
    110       }
    111     };
    112 
    113     List<Future<Integer>> futures =
    114         executor.invokeAll(Collections.nCopies(10, incrementTask));
    115 
    116     for (int i = 0; i < 10; i++) {
    117       Future<Integer> future = futures.get(i);
    118       assertTrue("Task should have been run before being returned", future.isDone());
    119       assertEquals(i, future.get().intValue());
    120     }
    121 
    122     assertEquals(10, threadLocalCount.get().intValue());
    123   }
    124 
    125   public void testSameThreadExecutorServiceTermination()
    126       throws Exception {
    127     final ExecutorService executor = MoreExecutors.sameThreadExecutor();
    128     final CyclicBarrier barrier = new CyclicBarrier(2);
    129     final AtomicReference<Throwable> throwableFromOtherThread =
    130         new AtomicReference<Throwable>(null);
    131     final Runnable doNothingRunnable = new Runnable() {
    132         @Override public void run() {
    133         }};
    134 
    135     Thread otherThread = new Thread(new Runnable() {
    136       @Override
    137       public void run() {
    138         try {
    139           Future<?> future = executor.submit(new Callable<Void>() {
    140             @Override
    141             public Void call() throws Exception {
    142               // WAIT #1
    143               barrier.await(1, TimeUnit.SECONDS);
    144 
    145               // WAIT #2
    146               barrier.await(1, TimeUnit.SECONDS);
    147               assertTrue(executor.isShutdown());
    148               assertFalse(executor.isTerminated());
    149 
    150               // WAIT #3
    151               barrier.await(1, TimeUnit.SECONDS);
    152               return null;
    153             }
    154           });
    155           assertTrue(future.isDone());
    156           assertTrue(executor.isShutdown());
    157           assertTrue(executor.isTerminated());
    158         } catch (Throwable Throwable) {
    159           throwableFromOtherThread.set(Throwable);
    160         }
    161       }});
    162 
    163     otherThread.start();
    164 
    165     // WAIT #1
    166     barrier.await(1, TimeUnit.SECONDS);
    167     assertFalse(executor.isShutdown());
    168     assertFalse(executor.isTerminated());
    169 
    170     executor.shutdown();
    171     assertTrue(executor.isShutdown());
    172     try {
    173       executor.submit(doNothingRunnable);
    174       fail("Should have encountered RejectedExecutionException");
    175     } catch (RejectedExecutionException ex) {
    176       // good to go
    177     }
    178     assertFalse(executor.isTerminated());
    179 
    180     // WAIT #2
    181     barrier.await(1, TimeUnit.SECONDS);
    182     assertFalse(executor.awaitTermination(20, TimeUnit.MILLISECONDS));
    183 
    184     // WAIT #3
    185     barrier.await(1, TimeUnit.SECONDS);
    186     assertTrue(executor.awaitTermination(1, TimeUnit.SECONDS));
    187     assertTrue(executor.awaitTermination(0, TimeUnit.SECONDS));
    188     assertTrue(executor.isShutdown());
    189     try {
    190       executor.submit(doNothingRunnable);
    191       fail("Should have encountered RejectedExecutionException");
    192     } catch (RejectedExecutionException ex) {
    193       // good to go
    194     }
    195     assertTrue(executor.isTerminated());
    196 
    197     otherThread.join(1000);
    198     assertEquals(Thread.State.TERMINATED, otherThread.getState());
    199     Throwable throwable = throwableFromOtherThread.get();
    200     assertNull("Throwable from other thread: "
    201         + (throwable == null ? null : Throwables.getStackTraceAsString(throwable)),
    202         throwableFromOtherThread.get());
    203   }
    204 
    205   public void testListeningDecorator() throws Exception {
    206     ListeningExecutorService service =
    207         listeningDecorator(MoreExecutors.sameThreadExecutor());
    208     assertSame(service, listeningDecorator(service));
    209     List<Callable<String>> callables =
    210         ImmutableList.of(Callables.returning("x"));
    211     List<Future<String>> results;
    212 
    213     results = service.invokeAll(callables);
    214     ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);
    215 
    216     results = service.invokeAll(callables, 1, SECONDS);
    217     ASSERT.that(getOnlyElement(results)).isA(ListenableFutureTask.class);
    218 
    219     /*
    220      * TODO(cpovirk): move ForwardingTestCase somewhere common, and use it to
    221      * test the forwarded methods
    222      */
    223   }
    224 
    225   private static void assertListenerRunImmediately(ListenableFuture<?> future) {
    226     CountingRunnable listener = new CountingRunnable();
    227     future.addListener(listener, sameThreadExecutor());
    228     assertEquals(1, listener.count);
    229   }
    230 
    231   private static final class CountingRunnable implements Runnable {
    232     int count;
    233 
    234     @Override
    235     public void run() {
    236       count++;
    237     }
    238   }
    239 }
    240