Home | History | Annotate | Download | only in collect
      1 /*
      2  * Copyright (C) 2010 The Guava Authors
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
      5  * in compliance with the License. You may obtain a copy of the License at
      6  *
      7  * http://www.apache.org/licenses/LICENSE-2.0
      8  *
      9  * Unless required by applicable law or agreed to in writing, software distributed under the License
     10  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
     11  * or implied. See the License for the specific language governing permissions and limitations under
     12  * the License.
     13  */
     14 
     15 package com.google.common.collect;
     16 
     17 import static com.google.common.base.Preconditions.checkNotNull;
     18 import static com.google.common.base.Preconditions.checkState;
     19 
     20 import com.google.common.base.Equivalence;
     21 import com.google.common.base.Function;
     22 import com.google.common.base.Throwables;
     23 import com.google.common.collect.MapMaker.RemovalCause;
     24 import com.google.common.collect.MapMaker.RemovalListener;
     25 
     26 import java.io.IOException;
     27 import java.io.ObjectInputStream;
     28 import java.io.ObjectOutputStream;
     29 import java.io.Serializable;
     30 import java.lang.ref.ReferenceQueue;
     31 import java.util.concurrent.ConcurrentMap;
     32 import java.util.concurrent.ExecutionException;
     33 import java.util.concurrent.atomic.AtomicReferenceArray;
     34 
     35 import javax.annotation.Nullable;
     36 import javax.annotation.concurrent.GuardedBy;
     37 
     38 /**
     39  * Adds computing functionality to {@link MapMakerInternalMap}.
     40  *
     41  * @author Bob Lee
     42  * @author Charles Fry
     43  */
     44 class ComputingConcurrentHashMap<K, V> extends MapMakerInternalMap<K, V> {
     45   final Function<? super K, ? extends V> computingFunction;
     46 
     47   /**
     48    * Creates a new, empty map with the specified strategy, initial capacity, load factor and
     49    * concurrency level.
     50    */
     51   ComputingConcurrentHashMap(MapMaker builder,
     52       Function<? super K, ? extends V> computingFunction) {
     53     super(builder);
     54     this.computingFunction = checkNotNull(computingFunction);
     55   }
     56 
     57   @Override
     58   Segment<K, V> createSegment(int initialCapacity, int maxSegmentSize) {
     59     return new ComputingSegment<K, V>(this, initialCapacity, maxSegmentSize);
     60   }
     61 
     62   @Override
     63   ComputingSegment<K, V> segmentFor(int hash) {
     64     return (ComputingSegment<K, V>) super.segmentFor(hash);
     65   }
     66 
     67   V getOrCompute(K key) throws ExecutionException {
     68     int hash = hash(checkNotNull(key));
     69     return segmentFor(hash).getOrCompute(key, hash, computingFunction);
     70   }
     71 
     72   @SuppressWarnings("serial") // This class is never serialized.
     73   static final class ComputingSegment<K, V> extends Segment<K, V> {
     74     ComputingSegment(MapMakerInternalMap<K, V> map, int initialCapacity, int maxSegmentSize) {
     75       super(map, initialCapacity, maxSegmentSize);
     76     }
     77 
     78     V getOrCompute(K key, int hash, Function<? super K, ? extends V> computingFunction)
     79         throws ExecutionException {
     80       try {
     81         outer: while (true) {
     82           // don't call getLiveEntry, which would ignore computing values
     83           ReferenceEntry<K, V> e = getEntry(key, hash);
     84           if (e != null) {
     85             V value = getLiveValue(e);
     86             if (value != null) {
     87               recordRead(e);
     88               return value;
     89             }
     90           }
     91 
     92           // at this point e is either null, computing, or expired;
     93           // avoid locking if it's already computing
     94           if (e == null || !e.getValueReference().isComputingReference()) {
     95             boolean createNewEntry = true;
     96             ComputingValueReference<K, V> computingValueReference = null;
     97             lock();
     98             try {
     99               preWriteCleanup();
    100 
    101               int newCount = this.count - 1;
    102               AtomicReferenceArray<ReferenceEntry<K, V>> table = this.table;
    103               int index = hash & (table.length() - 1);
    104               ReferenceEntry<K, V> first = table.get(index);
    105 
    106               for (e = first; e != null; e = e.getNext()) {
    107                 K entryKey = e.getKey();
    108                 if (e.getHash() == hash && entryKey != null
    109                     && map.keyEquivalence.equivalent(key, entryKey)) {
    110                   ValueReference<K, V> valueReference = e.getValueReference();
    111                   if (valueReference.isComputingReference()) {
    112                     createNewEntry = false;
    113                   } else {
    114                     V value = e.getValueReference().get();
    115                     if (value == null) {
    116                       enqueueNotification(entryKey, hash, value, RemovalCause.COLLECTED);
    117                     } else if (map.expires() && map.isExpired(e)) {
    118                       // This is a duplicate check, as preWriteCleanup already purged expired
    119                       // entries, but let's accomodate an incorrect expiration queue.
    120                       enqueueNotification(entryKey, hash, value, RemovalCause.EXPIRED);
    121                     } else {
    122                       recordLockedRead(e);
    123                       return value;
    124                     }
    125 
    126                     // immediately reuse invalid entries
    127                     evictionQueue.remove(e);
    128                     expirationQueue.remove(e);
    129                     this.count = newCount; // write-volatile
    130                   }
    131                   break;
    132                 }
    133               }
    134 
    135               if (createNewEntry) {
    136                 computingValueReference = new ComputingValueReference<K, V>(computingFunction);
    137 
    138                 if (e == null) {
    139                   e = newEntry(key, hash, first);
    140                   e.setValueReference(computingValueReference);
    141                   table.set(index, e);
    142                 } else {
    143                   e.setValueReference(computingValueReference);
    144                 }
    145               }
    146             } finally {
    147               unlock();
    148               postWriteCleanup();
    149             }
    150 
    151             if (createNewEntry) {
    152               // This thread solely created the entry.
    153               return compute(key, hash, e, computingValueReference);
    154             }
    155           }
    156 
    157           // The entry already exists. Wait for the computation.
    158           checkState(!Thread.holdsLock(e), "Recursive computation");
    159           // don't consider expiration as we're concurrent with computation
    160           V value = e.getValueReference().waitForValue();
    161           if (value != null) {
    162             recordRead(e);
    163             return value;
    164           }
    165           // else computing thread will clearValue
    166           continue outer;
    167         }
    168       } finally {
    169         postReadCleanup();
    170       }
    171     }
    172 
    173     V compute(K key, int hash, ReferenceEntry<K, V> e,
    174         ComputingValueReference<K, V> computingValueReference)
    175         throws ExecutionException {
    176       V value = null;
    177       long start = System.nanoTime();
    178       long end = 0;
    179       try {
    180         // Synchronizes on the entry to allow failing fast when a recursive computation is
    181         // detected. This is not fool-proof since the entry may be copied when the segment
    182         // is written to.
    183         synchronized (e) {
    184           value = computingValueReference.compute(key, hash);
    185           end = System.nanoTime();
    186         }
    187         if (value != null) {
    188           // putIfAbsent
    189           V oldValue = put(key, hash, value, true);
    190           if (oldValue != null) {
    191             // the computed value was already clobbered
    192             enqueueNotification(key, hash, value, RemovalCause.REPLACED);
    193           }
    194         }
    195         return value;
    196       } finally {
    197         if (end == 0) {
    198           end = System.nanoTime();
    199         }
    200         if (value == null) {
    201           clearValue(key, hash, computingValueReference);
    202         }
    203       }
    204     }
    205   }
    206 
    207   /**
    208    * Used to provide computation exceptions to other threads.
    209    */
    210   private static final class ComputationExceptionReference<K, V> implements ValueReference<K, V> {
    211     final Throwable t;
    212 
    213     ComputationExceptionReference(Throwable t) {
    214       this.t = t;
    215     }
    216 
    217     @Override
    218     public V get() {
    219       return null;
    220     }
    221 
    222     @Override
    223     public ReferenceEntry<K, V> getEntry() {
    224       return null;
    225     }
    226 
    227     @Override
    228     public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
    229       return this;
    230     }
    231 
    232     @Override
    233     public boolean isComputingReference() {
    234       return false;
    235     }
    236 
    237     @Override
    238     public V waitForValue() throws ExecutionException {
    239       throw new ExecutionException(t);
    240     }
    241 
    242     @Override
    243     public void clear(ValueReference<K, V> newValue) {}
    244   }
    245 
    246   /**
    247    * Used to provide computation result to other threads.
    248    */
    249   private static final class ComputedReference<K, V> implements ValueReference<K, V> {
    250     final V value;
    251 
    252     ComputedReference(@Nullable V value) {
    253       this.value = value;
    254     }
    255 
    256     @Override
    257     public V get() {
    258       return value;
    259     }
    260 
    261     @Override
    262     public ReferenceEntry<K, V> getEntry() {
    263       return null;
    264     }
    265 
    266     @Override
    267     public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
    268       return this;
    269     }
    270 
    271     @Override
    272     public boolean isComputingReference() {
    273       return false;
    274     }
    275 
    276     @Override
    277     public V waitForValue() {
    278       return get();
    279     }
    280 
    281     @Override
    282     public void clear(ValueReference<K, V> newValue) {}
    283   }
    284 
    285   private static final class ComputingValueReference<K, V> implements ValueReference<K, V> {
    286     final Function<? super K, ? extends V> computingFunction;
    287 
    288     @GuardedBy("ComputingValueReference.this") // writes
    289     volatile ValueReference<K, V> computedReference = unset();
    290 
    291     public ComputingValueReference(Function<? super K, ? extends V> computingFunction) {
    292       this.computingFunction = computingFunction;
    293     }
    294 
    295     @Override
    296     public V get() {
    297       // All computation lookups go through waitForValue. This method thus is
    298       // only used by put, to whom we always want to appear absent.
    299       return null;
    300     }
    301 
    302     @Override
    303     public ReferenceEntry<K, V> getEntry() {
    304       return null;
    305     }
    306 
    307     @Override
    308     public ValueReference<K, V> copyFor(ReferenceQueue<V> queue, ReferenceEntry<K, V> entry) {
    309       return this;
    310     }
    311 
    312     @Override
    313     public boolean isComputingReference() {
    314       return true;
    315     }
    316 
    317     /**
    318      * Waits for a computation to complete. Returns the result of the computation.
    319      */
    320     @Override
    321     public V waitForValue() throws ExecutionException {
    322       if (computedReference == UNSET) {
    323         boolean interrupted = false;
    324         try {
    325           synchronized (this) {
    326             while (computedReference == UNSET) {
    327               try {
    328                 wait();
    329               } catch (InterruptedException ie) {
    330                 interrupted = true;
    331               }
    332             }
    333           }
    334         } finally {
    335           if (interrupted) {
    336             Thread.currentThread().interrupt();
    337           }
    338         }
    339       }
    340       return computedReference.waitForValue();
    341     }
    342 
    343     @Override
    344     public void clear(ValueReference<K, V> newValue) {
    345       // The pending computation was clobbered by a manual write. Unblock all
    346       // pending gets, and have them return the new value.
    347       setValueReference(newValue);
    348 
    349       // TODO(fry): could also cancel computation if we had a thread handle
    350     }
    351 
    352     V compute(K key, int hash) throws ExecutionException {
    353       V value;
    354       try {
    355         value = computingFunction.apply(key);
    356       } catch (Throwable t) {
    357         setValueReference(new ComputationExceptionReference<K, V>(t));
    358         throw new ExecutionException(t);
    359       }
    360 
    361       setValueReference(new ComputedReference<K, V>(value));
    362       return value;
    363     }
    364 
    365     void setValueReference(ValueReference<K, V> valueReference) {
    366       synchronized (this) {
    367         if (computedReference == UNSET) {
    368           computedReference = valueReference;
    369           notifyAll();
    370         }
    371       }
    372     }
    373   }
    374 
    375   /**
    376    * Overrides get() to compute on demand. Also throws an exception when {@code null} is returned
    377    * from a computation.
    378    */
    379   static final class ComputingMapAdapter<K, V>
    380       extends ComputingConcurrentHashMap<K, V> implements Serializable {
    381     private static final long serialVersionUID = 0;
    382 
    383     ComputingMapAdapter(MapMaker mapMaker,
    384         Function<? super K, ? extends V> computingFunction) {
    385       super(mapMaker, computingFunction);
    386     }
    387 
    388     @SuppressWarnings("unchecked") // unsafe, which is one advantage of Cache over Map
    389     @Override
    390     public V get(Object key) {
    391       V value;
    392       try {
    393         value = getOrCompute((K) key);
    394       } catch (ExecutionException e) {
    395         Throwable cause = e.getCause();
    396         Throwables.propagateIfInstanceOf(cause, ComputationException.class);
    397         throw new ComputationException(cause);
    398       }
    399 
    400       if (value == null) {
    401         throw new NullPointerException(computingFunction + " returned null for key " + key + ".");
    402       }
    403       return value;
    404     }
    405   }
    406 
    407   // Serialization Support
    408 
    409   private static final long serialVersionUID = 4;
    410 
    411   @Override
    412   Object writeReplace() {
    413     return new ComputingSerializationProxy<K, V>(keyStrength, valueStrength, keyEquivalence,
    414         valueEquivalence, expireAfterWriteNanos, expireAfterAccessNanos, maximumSize,
    415         concurrencyLevel, removalListener, this, computingFunction);
    416   }
    417 
    418   static final class ComputingSerializationProxy<K, V> extends AbstractSerializationProxy<K, V> {
    419 
    420     final Function<? super K, ? extends V> computingFunction;
    421 
    422     ComputingSerializationProxy(Strength keyStrength, Strength valueStrength,
    423         Equivalence<Object> keyEquivalence, Equivalence<Object> valueEquivalence,
    424         long expireAfterWriteNanos, long expireAfterAccessNanos, int maximumSize,
    425         int concurrencyLevel, RemovalListener<? super K, ? super V> removalListener,
    426         ConcurrentMap<K, V> delegate, Function<? super K, ? extends V> computingFunction) {
    427       super(keyStrength, valueStrength, keyEquivalence, valueEquivalence, expireAfterWriteNanos,
    428           expireAfterAccessNanos, maximumSize, concurrencyLevel, removalListener, delegate);
    429       this.computingFunction = computingFunction;
    430     }
    431 
    432     private void writeObject(ObjectOutputStream out) throws IOException {
    433       out.defaultWriteObject();
    434       writeMapTo(out);
    435     }
    436 
    437     @SuppressWarnings("deprecation") // self-use
    438     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
    439       in.defaultReadObject();
    440       MapMaker mapMaker = readMapMaker(in);
    441       delegate = mapMaker.makeComputingMap(computingFunction);
    442       readEntries(in);
    443     }
    444 
    445     Object readResolve() {
    446       return delegate;
    447     }
    448 
    449     private static final long serialVersionUID = 4;
    450   }
    451 }
    452