Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2013 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
     20 
     21 import com.google.caliper.AfterExperiment;
     22 import com.google.caliper.BeforeExperiment;
     23 import com.google.caliper.Benchmark;
     24 import com.google.caliper.Param;
     25 import com.google.caliper.api.Footprint;
     26 import com.google.caliper.api.VmOptions;
     27 import com.google.common.base.Preconditions;
     28 import com.google.common.collect.Lists;
     29 
     30 import java.util.Queue;
     31 import java.util.concurrent.ArrayBlockingQueue;
     32 import java.util.concurrent.CountDownLatch;
     33 import java.util.concurrent.Executor;
     34 import java.util.concurrent.ExecutorService;
     35 import java.util.concurrent.ThreadPoolExecutor;
     36 import java.util.concurrent.TimeUnit;
     37 import java.util.concurrent.atomic.AtomicInteger;
     38 import java.util.logging.Level;
     39 import java.util.logging.Logger;
     40 
     41 import javax.annotation.Nullable;
     42 import javax.annotation.concurrent.GuardedBy;
     43 
     44 /**
     45  * Benchmarks for {@link ExecutionList}.
     46  */
     47 @VmOptions({"-Xms3g", "-Xmx3g"})
     48 public class ExecutionListBenchmark {
     49   private static final int NUM_THREADS = 10;  // make a param?
     50 
     51   // simple interface to wrap our two implementations.
     52   interface ExecutionListWrapper {
     53     void add(Runnable runnable, Executor executor);
     54     void execute();
     55     /** Returns the underlying implementation, useful for the Footprint benchmark. */
     56     Object getImpl();
     57   }
     58 
     59   enum Impl {
     60     NEW {
     61       @Override ExecutionListWrapper newExecutionList() {
     62         return new ExecutionListWrapper() {
     63           final ExecutionList list = new ExecutionList();
     64           @Override public void add(Runnable runnable, Executor executor) {
     65             list.add(runnable, executor);
     66           }
     67 
     68           @Override public void execute() {
     69             list.execute();
     70           }
     71 
     72           @Override public Object getImpl() {
     73             return list;
     74           }
     75         };
     76       }
     77     },
     78     NEW_WITH_CAS {
     79       @Override ExecutionListWrapper newExecutionList() {
     80         return new ExecutionListWrapper() {
     81           final ExecutionListCAS list = new ExecutionListCAS();
     82           @Override public void add(Runnable runnable, Executor executor) {
     83             list.add(runnable, executor);
     84           }
     85 
     86           @Override public void execute() {
     87             list.execute();
     88           }
     89 
     90           @Override public Object getImpl() {
     91             return list;
     92           }
     93         };
     94       }
     95     },
     96     NEW_WITH_QUEUE {
     97       @Override ExecutionListWrapper newExecutionList() {
     98         return new ExecutionListWrapper() {
     99           final NewExecutionListQueue list = new NewExecutionListQueue();
    100           @Override public void add(Runnable runnable, Executor executor) {
    101             list.add(runnable, executor);
    102           }
    103 
    104           @Override public void execute() {
    105             list.execute();
    106           }
    107 
    108           @Override public Object getImpl() {
    109             return list;
    110           }
    111         };
    112       }
    113     },
    114     NEW_WITHOUT_REVERSE {
    115       @Override ExecutionListWrapper newExecutionList() {
    116         return new ExecutionListWrapper() {
    117           final NewExecutionListWithoutReverse list = new NewExecutionListWithoutReverse();
    118           @Override public void add(Runnable runnable, Executor executor) {
    119             list.add(runnable, executor);
    120           }
    121 
    122           @Override public void execute() {
    123             list.execute();
    124           }
    125 
    126           @Override public Object getImpl() {
    127             return list;
    128           }
    129         };
    130       }
    131     },
    132     OLD {
    133       @Override ExecutionListWrapper newExecutionList() {
    134         return new ExecutionListWrapper() {
    135           final OldExecutionList list = new OldExecutionList();
    136           @Override public void add(Runnable runnable, Executor executor) {
    137             list.add(runnable, executor);
    138           }
    139 
    140           @Override public void execute() {
    141             list.execute();
    142           }
    143 
    144           @Override public Object getImpl() {
    145             return list;
    146           }
    147         };
    148       }
    149     };
    150     abstract ExecutionListWrapper newExecutionList();
    151   }
    152 
    153   private ExecutorService executorService;
    154   private CountDownLatch listenerLatch;
    155   private ExecutionListWrapper list;
    156 
    157   @Param Impl impl;
    158   @Param({"1", "5", "10"}) int numListeners;
    159 
    160   private final Runnable listener = new Runnable() {
    161     @Override public void run() {
    162       listenerLatch.countDown();
    163     }
    164   };
    165 
    166   @BeforeExperiment void setUp() throws Exception {
    167     executorService = new ThreadPoolExecutor(NUM_THREADS,
    168         NUM_THREADS,
    169         Long.MAX_VALUE,
    170         TimeUnit.SECONDS,
    171         new ArrayBlockingQueue<Runnable>(1000));
    172     final AtomicInteger integer = new AtomicInteger();
    173     // Execute a bunch of tasks to ensure that our threads are allocated and hot
    174     for (int i = 0; i < NUM_THREADS * 10; i++) {
    175       executorService.submit(new Runnable() {
    176         @Override public void run() {
    177           integer.getAndIncrement();
    178         }});
    179     }
    180   }
    181 
    182   @AfterExperiment void tearDown() throws Exception {
    183     executorService.shutdown();
    184   }
    185 
    186   @Footprint(exclude = {Runnable.class, Executor.class})
    187   public Object measureSize() {
    188     list = impl.newExecutionList();
    189     for (int i = 0; i < numListeners; i++) {
    190       list.add(listener, directExecutor());
    191     }
    192     return list.getImpl();
    193   }
    194 
    195   @Benchmark int addThenExecute_singleThreaded(int reps) {
    196     int returnValue = 0;
    197     for (int i = 0; i < reps; i++) {
    198       list = impl.newExecutionList();
    199       listenerLatch = new CountDownLatch(numListeners);
    200       for (int j = 0; j < numListeners; j++) {
    201         list.add(listener, directExecutor());
    202         returnValue += listenerLatch.getCount();
    203       }
    204       list.execute();
    205       returnValue += listenerLatch.getCount();
    206     }
    207     return returnValue;
    208   }
    209 
    210   @Benchmark int executeThenAdd_singleThreaded(int reps) {
    211     int returnValue = 0;
    212     for (int i = 0; i < reps; i++) {
    213       list = impl.newExecutionList();
    214       list.execute();
    215       listenerLatch = new CountDownLatch(numListeners);
    216       for (int j = 0; j < numListeners; j++) {
    217         list.add(listener, directExecutor());
    218         returnValue += listenerLatch.getCount();
    219       }
    220       returnValue += listenerLatch.getCount();
    221     }
    222     return returnValue;
    223   }
    224 
    225   private final Runnable executeTask = new Runnable() {
    226     @Override public void run() {
    227       list.execute();
    228     }
    229   };
    230 
    231   @Benchmark int addThenExecute_multiThreaded(final int reps) throws InterruptedException {
    232     Runnable addTask = new Runnable() {
    233       @Override public void run() {
    234         for (int i = 0; i < numListeners; i++) {
    235           list.add(listener, directExecutor());
    236         }
    237       }
    238     };
    239     int returnValue = 0;
    240     for (int i = 0; i < reps; i++) {
    241       list = impl.newExecutionList();
    242       listenerLatch = new CountDownLatch(numListeners * NUM_THREADS);
    243       for (int j = 0; j < NUM_THREADS; j++) {
    244         executorService.submit(addTask);
    245       }
    246       executorService.submit(executeTask);
    247       returnValue = (int) listenerLatch.getCount();
    248       listenerLatch.await();
    249     }
    250     return returnValue;
    251   }
    252 
    253   @Benchmark int executeThenAdd_multiThreaded(final int reps) throws InterruptedException {
    254     Runnable addTask = new Runnable() {
    255       @Override public void run() {
    256         for (int i = 0; i < numListeners; i++) {
    257           list.add(listener, directExecutor());
    258         }
    259       }
    260     };
    261     int returnValue = 0;
    262     for (int i = 0; i < reps; i++) {
    263       list = impl.newExecutionList();
    264       listenerLatch = new CountDownLatch(numListeners * NUM_THREADS);
    265       executorService.submit(executeTask);
    266       for (int j = 0; j < NUM_THREADS; j++) {
    267         executorService.submit(addTask);
    268       }
    269       returnValue = (int) listenerLatch.getCount();
    270       listenerLatch.await();
    271     }
    272     return returnValue;
    273   }
    274 
    275   // This is the old implementation of ExecutionList using a LinkedList.
    276   private static final class OldExecutionList {
    277     static final Logger log = Logger.getLogger(OldExecutionList.class.getName());
    278     final Queue<OldExecutionList.RunnableExecutorPair> runnables = Lists.newLinkedList();
    279     boolean executed = false;
    280 
    281     public void add(Runnable runnable, Executor executor) {
    282       Preconditions.checkNotNull(runnable, "Runnable was null.");
    283       Preconditions.checkNotNull(executor, "Executor was null.");
    284 
    285       boolean executeImmediate = false;
    286 
    287       synchronized (runnables) {
    288         if (!executed) {
    289           runnables.add(new RunnableExecutorPair(runnable, executor));
    290         } else {
    291           executeImmediate = true;
    292         }
    293       }
    294 
    295       if (executeImmediate) {
    296         new RunnableExecutorPair(runnable, executor).execute();
    297       }
    298     }
    299 
    300     public void execute() {
    301       synchronized (runnables) {
    302         if (executed) {
    303           return;
    304         }
    305         executed = true;
    306       }
    307 
    308       while (!runnables.isEmpty()) {
    309         runnables.poll().execute();
    310       }
    311     }
    312 
    313     private static class RunnableExecutorPair {
    314       final Runnable runnable;
    315       final Executor executor;
    316 
    317       RunnableExecutorPair(Runnable runnable, Executor executor) {
    318         this.runnable = runnable;
    319         this.executor = executor;
    320       }
    321 
    322       void execute() {
    323         try {
    324           executor.execute(runnable);
    325         } catch (RuntimeException e) {
    326           log.log(Level.SEVERE, "RuntimeException while executing runnable "
    327               + runnable + " with executor " + executor, e);
    328         }
    329       }
    330     }
    331   }
    332 
    333   // A version of the execution list that doesn't reverse the stack in execute().
    334   private static final class NewExecutionListWithoutReverse {
    335     static final Logger log = Logger.getLogger(NewExecutionListWithoutReverse.class.getName());
    336 
    337     @GuardedBy("this")
    338     private RunnableExecutorPair runnables;
    339     @GuardedBy("this")
    340     private boolean executed;
    341 
    342     public void add(Runnable runnable, Executor executor) {
    343       Preconditions.checkNotNull(runnable, "Runnable was null.");
    344       Preconditions.checkNotNull(executor, "Executor was null.");
    345 
    346       synchronized (this) {
    347         if (!executed) {
    348           runnables = new RunnableExecutorPair(runnable, executor, runnables);
    349           return;
    350         }
    351       }
    352       executeListener(runnable, executor);
    353     }
    354 
    355     public void execute() {
    356       RunnableExecutorPair list;
    357       synchronized (this) {
    358         if (executed) {
    359           return;
    360         }
    361         executed = true;
    362         list = runnables;
    363         runnables = null;  // allow GC to free listeners even if this stays around for a while.
    364       }
    365       while (list != null) {
    366         executeListener(list.runnable, list.executor);
    367         list = list.next;
    368       }
    369     }
    370 
    371     private static void executeListener(Runnable runnable, Executor executor) {
    372       try {
    373         executor.execute(runnable);
    374       } catch (RuntimeException e) {
    375         log.log(Level.SEVERE, "RuntimeException while executing runnable "
    376             + runnable + " with executor " + executor, e);
    377       }
    378     }
    379 
    380     private static final class RunnableExecutorPair {
    381       final Runnable runnable;
    382       final Executor executor;
    383       @Nullable RunnableExecutorPair next;
    384 
    385       RunnableExecutorPair(Runnable runnable, Executor executor, RunnableExecutorPair next) {
    386         this.runnable = runnable;
    387         this.executor = executor;
    388         this.next = next;
    389       }
    390     }
    391   }
    392 
    393   // A version of the ExecutionList that uses an explicit tail pointer to keep the nodes in order
    394   // rather than flipping the stack in execute().
    395   private static final class NewExecutionListQueue {
    396     static final Logger log = Logger.getLogger(NewExecutionListQueue.class.getName());
    397 
    398     @GuardedBy("this")
    399     private RunnableExecutorPair head;
    400     @GuardedBy("this")
    401     private RunnableExecutorPair tail;
    402     @GuardedBy("this")
    403     private boolean executed;
    404 
    405     public void add(Runnable runnable, Executor executor) {
    406       Preconditions.checkNotNull(runnable, "Runnable was null.");
    407       Preconditions.checkNotNull(executor, "Executor was null.");
    408 
    409       synchronized (this) {
    410         if (!executed) {
    411           RunnableExecutorPair newTail = new RunnableExecutorPair(runnable, executor);
    412           if (head == null) {
    413             head = newTail;
    414             tail = newTail;
    415           } else {
    416             tail.next = newTail;
    417             tail = newTail;
    418           }
    419           return;
    420         }
    421       }
    422       executeListener(runnable, executor);
    423     }
    424 
    425     public void execute() {
    426       RunnableExecutorPair list;
    427       synchronized (this) {
    428         if (executed) {
    429           return;
    430         }
    431         executed = true;
    432         list = head;
    433         head = null;  // allow GC to free listeners even if this stays around for a while.
    434         tail = null;
    435       }
    436       while (list != null) {
    437         executeListener(list.runnable, list.executor);
    438         list = list.next;
    439       }
    440     }
    441 
    442     private static void executeListener(Runnable runnable, Executor executor) {
    443       try {
    444         executor.execute(runnable);
    445       } catch (RuntimeException e) {
    446         log.log(Level.SEVERE, "RuntimeException while executing runnable "
    447             + runnable + " with executor " + executor, e);
    448       }
    449     }
    450 
    451     private static final class RunnableExecutorPair {
    452       Runnable runnable;
    453       Executor executor;
    454       @Nullable RunnableExecutorPair next;
    455 
    456       RunnableExecutorPair(Runnable runnable, Executor executor) {
    457         this.runnable = runnable;
    458         this.executor = executor;
    459       }
    460     }
    461   }
    462 
    463   // A version of the list that uses compare and swap to manage the stack without locks.
    464   private static final class ExecutionListCAS {
    465     static final Logger log = Logger.getLogger(ExecutionListCAS.class.getName());
    466 
    467     private static final sun.misc.Unsafe UNSAFE;
    468     private static final long HEAD_OFFSET;
    469 
    470     /**
    471      * A special instance of {@link RunnableExecutorPair} that is used as a sentinel value for the
    472      * bottom of the stack.
    473      */
    474     private static final RunnableExecutorPair NULL_PAIR = new RunnableExecutorPair(null, null);
    475 
    476     static {
    477       try {
    478         UNSAFE = getUnsafe();
    479         HEAD_OFFSET = UNSAFE.objectFieldOffset(ExecutionListCAS.class.getDeclaredField("head"));
    480       } catch (Exception ex) {
    481         throw new Error(ex);
    482       }
    483     }
    484 
    485     /**
    486      * TODO(user):  This was copied verbatim from Striped64.java... standardize this?
    487      */
    488     private static sun.misc.Unsafe getUnsafe() {
    489         try {
    490             return sun.misc.Unsafe.getUnsafe();
    491         } catch (SecurityException tryReflectionInstead) {}
    492         try {
    493             return java.security.AccessController.doPrivileged
    494             (new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
    495                 @Override public sun.misc.Unsafe run() throws Exception {
    496                     Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
    497                     for (java.lang.reflect.Field f : k.getDeclaredFields()) {
    498                         f.setAccessible(true);
    499                         Object x = f.get(null);
    500                         if (k.isInstance(x))
    501                             return k.cast(x);
    502                     }
    503                     throw new NoSuchFieldError("the Unsafe");
    504                 }});
    505         } catch (java.security.PrivilegedActionException e) {
    506             throw new RuntimeException("Could not initialize intrinsics",
    507                                        e.getCause());
    508         }
    509     }
    510     private volatile RunnableExecutorPair head = NULL_PAIR;
    511 
    512     public void add(Runnable runnable, Executor executor) {
    513       Preconditions.checkNotNull(runnable, "Runnable was null.");
    514       Preconditions.checkNotNull(executor, "Executor was null.");
    515 
    516       RunnableExecutorPair newHead = new RunnableExecutorPair(runnable, executor);
    517       RunnableExecutorPair oldHead;
    518       do {
    519         oldHead = head;
    520         if (oldHead == null) {
    521           // If runnables == null then execute() has been called so we should just execute our
    522           // listener immediately.
    523           newHead.execute();
    524           return;
    525         }
    526         // Try to make newHead the new head of the stack at runnables.
    527         newHead.next = oldHead;
    528       } while (!UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, oldHead, newHead));
    529     }
    530 
    531     public void execute() {
    532       RunnableExecutorPair stack;
    533       do {
    534         stack = head;
    535         if (stack == null) {
    536           // If head == null then execute() has been called so we should just return
    537           return;
    538         }
    539         // try to swap null into head.
    540       } while (!UNSAFE.compareAndSwapObject(this, HEAD_OFFSET, stack, null));
    541 
    542       RunnableExecutorPair reversedStack = null;
    543       while (stack != NULL_PAIR) {
    544         RunnableExecutorPair head = stack;
    545         stack = stack.next;
    546         head.next = reversedStack;
    547         reversedStack = head;
    548       }
    549       stack = reversedStack;
    550       while (stack != null) {
    551         stack.execute();
    552         stack = stack.next;
    553       }
    554     }
    555 
    556     private static class RunnableExecutorPair {
    557       final Runnable runnable;
    558       final Executor executor;
    559       // Volatile because this is written on one thread and read on another with no synchronization.
    560       @Nullable volatile RunnableExecutorPair next;
    561 
    562       RunnableExecutorPair(Runnable runnable, Executor executor) {
    563         this.runnable = runnable;
    564         this.executor = executor;
    565       }
    566 
    567       void execute() {
    568         try {
    569           executor.execute(runnable);
    570         } catch (RuntimeException e) {
    571           log.log(Level.SEVERE, "RuntimeException while executing runnable "
    572               + runnable + " with executor " + executor, e);
    573         }
    574       }
    575     }
    576   }
    577 }
    578