Home | History | Annotate | Download | only in concurrent
      1 /*
      2  * Copyright (C) 2011 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  * http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.google.common.util.concurrent;
     18 
     19 import com.google.common.annotations.Beta;
     20 import com.google.common.annotations.VisibleForTesting;
     21 import com.google.common.base.MoreObjects;
     22 import com.google.common.base.Preconditions;
     23 import com.google.common.base.Supplier;
     24 import com.google.common.collect.ImmutableList;
     25 import com.google.common.collect.Iterables;
     26 import com.google.common.collect.MapMaker;
     27 import com.google.common.math.IntMath;
     28 import com.google.common.primitives.Ints;
     29 
     30 import java.lang.ref.Reference;
     31 import java.lang.ref.ReferenceQueue;
     32 import java.lang.ref.WeakReference;
     33 import java.math.RoundingMode;
     34 import java.util.Arrays;
     35 import java.util.Collections;
     36 import java.util.List;
     37 import java.util.concurrent.ConcurrentMap;
     38 import java.util.concurrent.Semaphore;
     39 import java.util.concurrent.atomic.AtomicReferenceArray;
     40 import java.util.concurrent.locks.Lock;
     41 import java.util.concurrent.locks.ReadWriteLock;
     42 import java.util.concurrent.locks.ReentrantLock;
     43 import java.util.concurrent.locks.ReentrantReadWriteLock;
     44 
     45 /**
     46  * A striped {@code Lock/Semaphore/ReadWriteLock}. This offers the underlying lock striping
     47  * similar to that of {@code ConcurrentHashMap} in a reusable form, and extends it for
     48  * semaphores and read-write locks. Conceptually, lock striping is the technique of dividing a lock
     49  * into many <i>stripes</i>, increasing the granularity of a single lock and allowing independent
     50  * operations to lock different stripes and proceed concurrently, instead of creating contention
     51  * for a single lock.
     52  *
     53  * <p>The guarantee provided by this class is that equal keys lead to the same lock (or semaphore),
     54  * i.e. {@code if (key1.equals(key2))} then {@code striped.get(key1) == striped.get(key2)}
     55  * (assuming {@link Object#hashCode()} is correctly implemented for the keys). Note
     56  * that if {@code key1} is <strong>not</strong> equal to {@code key2}, it is <strong>not</strong>
     57  * guaranteed that {@code striped.get(key1) != striped.get(key2)}; the elements might nevertheless
     58  * be mapped to the same lock. The lower the number of stripes, the higher the probability of this
     59  * happening.
     60  *
     61  * <p>There are three flavors of this class: {@code Striped<Lock>}, {@code Striped<Semaphore>},
     62  * and {@code Striped<ReadWriteLock>}. For each type, two implementations are offered:
     63  * {@linkplain #lock(int) strong} and {@linkplain #lazyWeakLock(int) weak}
     64  * {@code Striped<Lock>}, {@linkplain #semaphore(int, int) strong} and {@linkplain
     65  * #lazyWeakSemaphore(int, int) weak} {@code Striped<Semaphore>}, and {@linkplain
     66  * #readWriteLock(int) strong} and {@linkplain #lazyWeakReadWriteLock(int) weak}
     67  * {@code Striped<ReadWriteLock>}. <i>Strong</i> means that all stripes (locks/semaphores) are
     68  * initialized eagerly, and are not reclaimed unless {@code Striped} itself is reclaimable.
     69  * <i>Weak</i> means that locks/semaphores are created lazily, and they are allowed to be reclaimed
     70  * if nobody is holding on to them. This is useful, for example, if one wants to create a {@code
     71  * Striped<Lock>} of many locks, but worries that in most cases only a small portion of these
     72  * would be in use.
     73  *
     74  * <p>Prior to this class, one might be tempted to use {@code Map<K, Lock>}, where {@code K}
     75  * represents the task. This maximizes concurrency by having each unique key mapped to a unique
     76  * lock, but also maximizes memory footprint. On the other extreme, one could use a single lock
     77  * for all tasks, which minimizes memory footprint but also minimizes concurrency. Instead of
     78  * choosing either of these extremes, {@code Striped} allows the user to trade between required
     79  * concurrency and memory footprint. For example, if a set of tasks are CPU-bound, one could easily
     80  * create a very compact {@code Striped<Lock>} of {@code availableProcessors() * 4} stripes,
     81  * instead of possibly thousands of locks which could be created in a {@code Map<K, Lock>}
     82  * structure.
     83  *
     84  * @author Dimitris Andreou
     85  * @since 13.0
     86  */
     87 @Beta
     88 public abstract class Striped<L> {
     89   /**
     90    * If there are at least this many stripes, we assume the memory usage of a ConcurrentMap will be
     91    * smaller than a large array.  (This assumes that in the lazy case, most stripes are unused. As
     92    * always, if many stripes are in use, a non-lazy striped makes more sense.)
     93    */
     94   private static final int LARGE_LAZY_CUTOFF = 1024;
     95 
     96   private Striped() {}
     97 
     98   /**
     99    * Returns the stripe that corresponds to the passed key. It is always guaranteed that if
    100    * {@code key1.equals(key2)}, then {@code get(key1) == get(key2)}.
    101    *
    102    * @param key an arbitrary, non-null key
    103    * @return the stripe that the passed key corresponds to
    104    */
    105   public abstract L get(Object key);
    106 
    107   /**
    108    * Returns the stripe at the specified index. Valid indexes are 0, inclusively, to
    109    * {@code size()}, exclusively.
    110    *
    111    * @param index the index of the stripe to return; must be in {@code [0...size())}
    112    * @return the stripe at the specified index
    113    */
    114   public abstract L getAt(int index);
    115 
    116   /**
    117    * Returns the index to which the given key is mapped, so that getAt(indexFor(key)) == get(key).
    118    */
    119   abstract int indexFor(Object key);
    120 
    121   /**
    122    * Returns the total number of stripes in this instance.
    123    */
    124   public abstract int size();
    125 
    126   /**
    127    * Returns the stripes that correspond to the passed objects, in ascending (as per
    128    * {@link #getAt(int)}) order. Thus, threads that use the stripes in the order returned
    129    * by this method are guaranteed to not deadlock each other.
    130    *
    131    * <p>It should be noted that using a {@code Striped<L>} with relatively few stripes, and
    132    * {@code bulkGet(keys)} with a relative large number of keys can cause an excessive number
    133    * of shared stripes (much like the birthday paradox, where much fewer than anticipated birthdays
    134    * are needed for a pair of them to match). Please consider carefully the implications of the
    135    * number of stripes, the intended concurrency level, and the typical number of keys used in a
    136    * {@code bulkGet(keys)} operation. See <a href="http://www.mathpages.com/home/kmath199.htm">Balls
    137    * in Bins model</a> for mathematical formulas that can be used to estimate the probability of
    138    * collisions.
    139    *
    140    * @param keys arbitrary non-null keys
    141    * @return the stripes corresponding to the objects (one per each object, derived by delegating
    142    *         to {@link #get(Object)}; may contain duplicates), in an increasing index order.
    143    */
    144   public Iterable<L> bulkGet(Iterable<?> keys) {
    145     // Initially using the array to store the keys, then reusing it to store the respective L's
    146     final Object[] array = Iterables.toArray(keys, Object.class);
    147     if (array.length == 0) {
    148       return ImmutableList.of();
    149     }
    150     int[] stripes = new int[array.length];
    151     for (int i = 0; i < array.length; i++) {
    152       stripes[i] = indexFor(array[i]);
    153     }
    154     Arrays.sort(stripes);
    155     // optimize for runs of identical stripes
    156     int previousStripe = stripes[0];
    157     array[0] = getAt(previousStripe);
    158     for (int i = 1; i < array.length; i++) {
    159       int currentStripe = stripes[i];
    160       if (currentStripe == previousStripe) {
    161         array[i] = array[i - 1];
    162       } else {
    163         array[i] = getAt(currentStripe);
    164         previousStripe = currentStripe;
    165       }
    166     }
    167     /*
    168      * Note that the returned Iterable holds references to the returned stripes, to avoid
    169      * error-prone code like:
    170      *
    171      * Striped<Lock> stripedLock = Striped.lazyWeakXXX(...)'
    172      * Iterable<Lock> locks = stripedLock.bulkGet(keys);
    173      * for (Lock lock : locks) {
    174      *   lock.lock();
    175      * }
    176      * operation();
    177      * for (Lock lock : locks) {
    178      *   lock.unlock();
    179      * }
    180      *
    181      * If we only held the int[] stripes, translating it on the fly to L's, the original locks
    182      * might be garbage collected after locking them, ending up in a huge mess.
    183      */
    184     @SuppressWarnings("unchecked") // we carefully replaced all keys with their respective L's
    185     List<L> asList = (List<L>) Arrays.asList(array);
    186     return Collections.unmodifiableList(asList);
    187   }
    188 
    189   // Static factories
    190 
    191   /**
    192    * Creates a {@code Striped<Lock>} with eagerly initialized, strongly referenced locks.
    193    * Every lock is reentrant.
    194    *
    195    * @param stripes the minimum number of stripes (locks) required
    196    * @return a new {@code Striped<Lock>}
    197    */
    198   public static Striped<Lock> lock(int stripes) {
    199     return new CompactStriped<Lock>(stripes, new Supplier<Lock>() {
    200       @Override public Lock get() {
    201         return new PaddedLock();
    202       }
    203     });
    204   }
    205 
    206   /**
    207    * Creates a {@code Striped<Lock>} with lazily initialized, weakly referenced locks.
    208    * Every lock is reentrant.
    209    *
    210    * @param stripes the minimum number of stripes (locks) required
    211    * @return a new {@code Striped<Lock>}
    212    */
    213   public static Striped<Lock> lazyWeakLock(int stripes) {
    214     return lazy(stripes, new Supplier<Lock>() {
    215       @Override public Lock get() {
    216         return new ReentrantLock(false);
    217       }
    218     });
    219   }
    220 
    221   private static <L> Striped<L> lazy(int stripes, Supplier<L> supplier) {
    222     return stripes < LARGE_LAZY_CUTOFF
    223         ? new SmallLazyStriped<L>(stripes, supplier)
    224         : new LargeLazyStriped<L>(stripes, supplier);
    225   }
    226 
    227   /**
    228    * Creates a {@code Striped<Semaphore>} with eagerly initialized, strongly referenced semaphores,
    229    * with the specified number of permits.
    230    *
    231    * @param stripes the minimum number of stripes (semaphores) required
    232    * @param permits the number of permits in each semaphore
    233    * @return a new {@code Striped<Semaphore>}
    234    */
    235   public static Striped<Semaphore> semaphore(int stripes, final int permits) {
    236     return new CompactStriped<Semaphore>(stripes, new Supplier<Semaphore>() {
    237       @Override public Semaphore get() {
    238         return new PaddedSemaphore(permits);
    239       }
    240     });
    241   }
    242 
    243   /**
    244    * Creates a {@code Striped<Semaphore>} with lazily initialized, weakly referenced semaphores,
    245    * with the specified number of permits.
    246    *
    247    * @param stripes the minimum number of stripes (semaphores) required
    248    * @param permits the number of permits in each semaphore
    249    * @return a new {@code Striped<Semaphore>}
    250    */
    251   public static Striped<Semaphore> lazyWeakSemaphore(int stripes, final int permits) {
    252     return lazy(stripes, new Supplier<Semaphore>() {
    253       @Override public Semaphore get() {
    254         return new Semaphore(permits, false);
    255       }
    256     });
    257   }
    258 
    259   /**
    260    * Creates a {@code Striped<ReadWriteLock>} with eagerly initialized, strongly referenced
    261    * read-write locks. Every lock is reentrant.
    262    *
    263    * @param stripes the minimum number of stripes (locks) required
    264    * @return a new {@code Striped<ReadWriteLock>}
    265    */
    266   public static Striped<ReadWriteLock> readWriteLock(int stripes) {
    267     return new CompactStriped<ReadWriteLock>(stripes, READ_WRITE_LOCK_SUPPLIER);
    268   }
    269 
    270   /**
    271    * Creates a {@code Striped<ReadWriteLock>} with lazily initialized, weakly referenced
    272    * read-write locks. Every lock is reentrant.
    273    *
    274    * @param stripes the minimum number of stripes (locks) required
    275    * @return a new {@code Striped<ReadWriteLock>}
    276    */
    277   public static Striped<ReadWriteLock> lazyWeakReadWriteLock(int stripes) {
    278     return lazy(stripes, READ_WRITE_LOCK_SUPPLIER);
    279   }
    280 
    281   // ReentrantReadWriteLock is large enough to make padding probably unnecessary
    282   private static final Supplier<ReadWriteLock> READ_WRITE_LOCK_SUPPLIER =
    283       new Supplier<ReadWriteLock>() {
    284     @Override public ReadWriteLock get() {
    285       return new ReentrantReadWriteLock();
    286     }
    287   };
    288 
    289   private abstract static class PowerOfTwoStriped<L> extends Striped<L> {
    290     /** Capacity (power of two) minus one, for fast mod evaluation */
    291     final int mask;
    292 
    293     PowerOfTwoStriped(int stripes) {
    294       Preconditions.checkArgument(stripes > 0, "Stripes must be positive");
    295       this.mask = stripes > Ints.MAX_POWER_OF_TWO ? ALL_SET : ceilToPowerOfTwo(stripes) - 1;
    296     }
    297 
    298     @Override final int indexFor(Object key) {
    299       int hash = smear(key.hashCode());
    300       return hash & mask;
    301     }
    302 
    303     @Override public final L get(Object key) {
    304       return getAt(indexFor(key));
    305     }
    306   }
    307 
    308   /**
    309    * Implementation of Striped where 2^k stripes are represented as an array of the same length,
    310    * eagerly initialized.
    311    */
    312   private static class CompactStriped<L> extends PowerOfTwoStriped<L> {
    313     /** Size is a power of two. */
    314     private final Object[] array;
    315 
    316     private CompactStriped(int stripes, Supplier<L> supplier) {
    317       super(stripes);
    318       Preconditions.checkArgument(stripes <= Ints.MAX_POWER_OF_TWO, "Stripes must be <= 2^30)");
    319 
    320       this.array = new Object[mask + 1];
    321       for (int i = 0; i < array.length; i++) {
    322         array[i] = supplier.get();
    323       }
    324     }
    325 
    326     @SuppressWarnings("unchecked") // we only put L's in the array
    327     @Override public L getAt(int index) {
    328       return (L) array[index];
    329     }
    330 
    331     @Override public int size() {
    332       return array.length;
    333     }
    334   }
    335 
    336   /**
    337    * Implementation of Striped where up to 2^k stripes can be represented, using an
    338    * AtomicReferenceArray of size 2^k. To map a user key into a stripe, we take a k-bit slice of the
    339    * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
    340    */
    341   @VisibleForTesting static class SmallLazyStriped<L> extends PowerOfTwoStriped<L> {
    342     final AtomicReferenceArray<ArrayReference<? extends L>> locks;
    343     final Supplier<L> supplier;
    344     final int size;
    345     final ReferenceQueue<L> queue = new ReferenceQueue<L>();
    346 
    347     SmallLazyStriped(int stripes, Supplier<L> supplier) {
    348       super(stripes);
    349       this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
    350       this.locks = new AtomicReferenceArray<ArrayReference<? extends L>>(size);
    351       this.supplier = supplier;
    352     }
    353 
    354     @Override public L getAt(int index) {
    355       if (size != Integer.MAX_VALUE) {
    356         Preconditions.checkElementIndex(index, size());
    357       } // else no check necessary, all index values are valid
    358       ArrayReference<? extends L> existingRef = locks.get(index);
    359       L existing = existingRef == null ? null : existingRef.get();
    360       if (existing != null) {
    361         return existing;
    362       }
    363       L created = supplier.get();
    364       ArrayReference<L> newRef = new ArrayReference<L>(created, index, queue);
    365       while (!locks.compareAndSet(index, existingRef, newRef)) {
    366         // we raced, we need to re-read and try again
    367         existingRef = locks.get(index);
    368         existing = existingRef == null ? null : existingRef.get();
    369         if (existing != null) {
    370           return existing;
    371         }
    372       }
    373       drainQueue();
    374       return created;
    375     }
    376 
    377     // N.B. Draining the queue is only necessary to ensure that we don't accumulate empty references
    378     // in the array.  We could skip this if we decide we don't care about holding on to Reference
    379     // objects indefinitely.
    380     private void drainQueue() {
    381       Reference<? extends L> ref;
    382       while ((ref = queue.poll()) != null) {
    383         // We only ever register ArrayReferences with the queue so this is always safe.
    384         ArrayReference<? extends L> arrayRef = (ArrayReference<? extends L>) ref;
    385         // Try to clear out the array slot, n.b. if we fail that is fine, in either case the
    386         // arrayRef will be out of the array after this step.
    387         locks.compareAndSet(arrayRef.index, arrayRef, null);
    388       }
    389     }
    390 
    391     @Override public int size() {
    392       return size;
    393     }
    394 
    395     private static final class ArrayReference<L> extends WeakReference<L> {
    396       final int index;
    397 
    398       ArrayReference(L referent, int index, ReferenceQueue<L> queue) {
    399         super(referent, queue);
    400         this.index = index;
    401       }
    402     }
    403   }
    404 
    405   /**
    406    * Implementation of Striped where up to 2^k stripes can be represented, using a ConcurrentMap
    407    * where the key domain is [0..2^k). To map a user key into a stripe, we take a k-bit slice of the
    408    * user key's (smeared) hashCode(). The stripes are lazily initialized and are weakly referenced.
    409    */
    410   @VisibleForTesting static class LargeLazyStriped<L> extends PowerOfTwoStriped<L> {
    411     final ConcurrentMap<Integer, L> locks;
    412     final Supplier<L> supplier;
    413     final int size;
    414 
    415     LargeLazyStriped(int stripes, Supplier<L> supplier) {
    416       super(stripes);
    417       this.size = (mask == ALL_SET) ? Integer.MAX_VALUE : mask + 1;
    418       this.supplier = supplier;
    419       this.locks = new MapMaker().weakValues().makeMap();
    420     }
    421 
    422     @Override public L getAt(int index) {
    423       if (size != Integer.MAX_VALUE) {
    424         Preconditions.checkElementIndex(index, size());
    425       } // else no check necessary, all index values are valid
    426       L existing = locks.get(index);
    427       if (existing != null) {
    428         return existing;
    429       }
    430       L created = supplier.get();
    431       existing = locks.putIfAbsent(index, created);
    432       return MoreObjects.firstNonNull(existing, created);
    433     }
    434 
    435     @Override public int size() {
    436       return size;
    437     }
    438   }
    439 
    440   /**
    441    * A bit mask were all bits are set.
    442    */
    443   private static final int ALL_SET = ~0;
    444 
    445   private static int ceilToPowerOfTwo(int x) {
    446     return 1 << IntMath.log2(x, RoundingMode.CEILING);
    447   }
    448 
    449   /*
    450    * This method was written by Doug Lea with assistance from members of JCP
    451    * JSR-166 Expert Group and released to the public domain, as explained at
    452    * http://creativecommons.org/licenses/publicdomain
    453    *
    454    * As of 2010/06/11, this method is identical to the (package private) hash
    455    * method in OpenJDK 7's java.util.HashMap class.
    456    */
    457   // Copied from java/com/google/common/collect/Hashing.java
    458   private static int smear(int hashCode) {
    459     hashCode ^= (hashCode >>> 20) ^ (hashCode >>> 12);
    460     return hashCode ^ (hashCode >>> 7) ^ (hashCode >>> 4);
    461   }
    462 
    463   private static class PaddedLock extends ReentrantLock {
    464     /*
    465      * Padding from 40 into 64 bytes, same size as cache line. Might be beneficial to add
    466      * a fourth long here, to minimize chance of interference between consecutive locks,
    467      * but I couldn't observe any benefit from that.
    468      */
    469     @SuppressWarnings("unused")
    470     long q1, q2, q3;
    471 
    472     PaddedLock() {
    473       super(false);
    474     }
    475   }
    476 
    477   private static class PaddedSemaphore extends Semaphore {
    478     // See PaddedReentrantLock comment
    479     @SuppressWarnings("unused")
    480     long q1, q2, q3;
    481 
    482     PaddedSemaphore(int permits) {
    483       super(permits, false);
    484     }
    485   }
    486 }
    487