Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2014 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
     20 
     21 import com.google.common.util.concurrent.ListenerCallQueue.Callback;
     22 
     23 import junit.framework.TestCase;
     24 
     25 import java.util.concurrent.CountDownLatch;
     26 import java.util.concurrent.ExecutorService;
     27 import java.util.concurrent.Executors;
     28 import java.util.concurrent.atomic.AtomicInteger;
     29 
     30 /**
     31  * Tests for {@link ListenerCallQueue}.
     32  */
     33 public class ListenerCallQueueTest extends TestCase {
     34 
     35   private static final Callback<Object> THROWING_CALLBACK = new Callback<Object>("throwing()") {
     36     @Override public void call(Object object) {
     37       throw new RuntimeException();
     38     }
     39   };
     40 
     41   public void testAddAndExecute() {
     42     Object listenerInstance = new Object();
     43     ListenerCallQueue<Object> queue =
     44         new ListenerCallQueue<Object>(listenerInstance, directExecutor());
     45 
     46     AtomicInteger counter = new AtomicInteger();
     47     queue.add(incrementingCallback(counter, 1));
     48     queue.add(incrementingCallback(counter, 2));
     49     queue.add(incrementingCallback(counter, 3));
     50     queue.add(incrementingCallback(counter, 4));
     51     assertEquals(0, counter.get());
     52     queue.execute();
     53     assertEquals(4, counter.get());
     54   }
     55 
     56   public void testAddAndExecute_withExceptions() {
     57     Object listenerInstance = new Object();
     58     ListenerCallQueue<Object> queue =
     59         new ListenerCallQueue<Object>(listenerInstance, directExecutor());
     60 
     61     AtomicInteger counter = new AtomicInteger();
     62     queue.add(incrementingCallback(counter, 1));
     63     queue.add(THROWING_CALLBACK);
     64     queue.add(incrementingCallback(counter, 2));
     65     queue.add(THROWING_CALLBACK);
     66     queue.add(incrementingCallback(counter, 3));
     67     queue.add(THROWING_CALLBACK);
     68     queue.add(incrementingCallback(counter, 4));
     69     queue.add(THROWING_CALLBACK);
     70     assertEquals(0, counter.get());
     71     queue.execute();
     72     assertEquals(4, counter.get());
     73   }
     74 
     75   public void testAddAndExecute_multithreaded() throws InterruptedException {
     76     ExecutorService service = Executors.newFixedThreadPool(4);
     77     try {
     78       ListenerCallQueue<Object> queue =
     79           new ListenerCallQueue<Object>(new Object(), service);
     80 
     81       final CountDownLatch latch = new CountDownLatch(1);
     82       AtomicInteger counter = new AtomicInteger();
     83       queue.add(incrementingCallback(counter, 1));
     84       queue.add(incrementingCallback(counter, 2));
     85       queue.add(incrementingCallback(counter, 3));
     86       queue.add(incrementingCallback(counter, 4));
     87       queue.add(countDownCallback(latch));
     88       assertEquals(0, counter.get());
     89       queue.execute();
     90       latch.await();
     91       assertEquals(4, counter.get());
     92     } finally {
     93       service.shutdown();
     94     }
     95   }
     96 
     97   public void testAddAndExecute_multithreaded_withThrowingRunnable() throws InterruptedException {
     98     ExecutorService service = Executors.newFixedThreadPool(4);
     99     try {
    100       ListenerCallQueue<Object> queue =
    101           new ListenerCallQueue<Object>(new Object(), service);
    102 
    103       final CountDownLatch latch = new CountDownLatch(1);
    104       AtomicInteger counter = new AtomicInteger();
    105       queue.add(incrementingCallback(counter, 1));
    106       queue.add(THROWING_CALLBACK);
    107       queue.add(incrementingCallback(counter, 2));
    108       queue.add(THROWING_CALLBACK);
    109       queue.add(incrementingCallback(counter, 3));
    110       queue.add(THROWING_CALLBACK);
    111       queue.add(incrementingCallback(counter, 4));
    112       queue.add(THROWING_CALLBACK);
    113       queue.add(countDownCallback(latch));
    114       assertEquals(0, counter.get());
    115       queue.execute();
    116       latch.await();
    117       assertEquals(4, counter.get());
    118     } finally {
    119       service.shutdown();
    120     }
    121   }
    122 
    123   private Callback<Object> incrementingCallback(final AtomicInteger counter, final int expected) {
    124     return new Callback<Object>("incrementing") {
    125       @Override void call(Object listener) {
    126         assertEquals(expected, counter.incrementAndGet());
    127       }
    128     };
    129   }
    130 
    131   private Callback<Object> countDownCallback(final CountDownLatch latch) {
    132     return new Callback<Object>("countDown") {
    133       @Override void call(Object listener) {
    134         latch.countDown();
    135       }
    136     };
    137   }
    138 }
    139