Home | History | Annotate | Download | only in util
      1 /*
      2  * Copyright (C) 2014 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.camera.util;
     18 
     19 import android.os.Handler;
     20 import android.util.Pair;
     21 
     22 import com.android.camera.debug.Log.Tag;
     23 
     24 import java.security.InvalidParameterException;
     25 import java.util.ArrayList;
     26 import java.util.Collections;
     27 import java.util.Map;
     28 import java.util.TreeMap;
     29 import java.util.concurrent.Semaphore;
     30 
     31 /**
     32  * Implements a thread-safe fixed-size pool map of integers to objects such that
     33  * the least element may be swapped out for a new element at any time. Elements
     34  * may be temporarily "pinned" for processing in separate threads, during which
     35  * they will not be swapped out. <br>
     36  * This class enforces the invariant that a new element can always be swapped
     37  * in. Thus, requests to pin an element for a particular task may be denied if
     38  * there are not enough unpinned elements which can be removed. <br>
     39  */
     40 public class ConcurrentSharedRingBuffer<E> {
     41     private static final Tag TAG = new Tag("CncrrntShrdRingBuf");
     42 
     43     /**
     44      * Callback interface for swapping elements at the head of the buffer.
     45      */
     46     public static interface SwapTask<E> {
     47         /**
     48          * Called if the buffer is under-capacity and a new element is being
     49          * added.
     50          *
     51          * @return the new element to add.
     52          */
     53         public E create();
     54 
     55         /**
     56          * Called if the buffer is full and an old element must be swapped out
     57          * to make room for the new element.
     58          *
     59          * @param oldElement the element being removed from the buffer.
     60          * @return the new element to add.
     61          */
     62         public E swap(E oldElement);
     63 
     64         /**
     65          * Called if the buffer already has an element with the specified key.
     66          * Note that the element may currently be pinned for processing by other
     67          * elements. Therefore, implementations must be thread safe with respect
     68          * to any other operations which may be applied to pinned tasks.
     69          *
     70          * @param existingElement the element to be updated.
     71          */
     72         public void update(E existingElement);
     73 
     74         /**
     75          * Returns the key of the element that the ring buffer should prefer
     76          * when considering a swapping candidate. If the returned key is not an
     77          * unpinned element then ring buffer will replace the element with least
     78          * key.
     79          *
     80          * @return a key of an existing unpinned element or a negative value.
     81          */
     82         public long getSwapKey();
     83     }
     84 
     85     /**
     86      * Callback for selecting an element to pin. See
     87      * {@link tryPinGreatestSelected}.
     88      */
     89     public static interface Selector<E> {
     90         /**
     91          * @param element The element to select or not select.
     92          * @return true if the element should be selected, false otherwise.
     93          */
     94         public boolean select(E element);
     95     }
     96 
     97     public static interface PinStateListener {
     98         /**
     99          * Invoked whenever the ability to pin an element for processing
    100          * changes.
    101          *
    102          * @param pinsAvailable If true, requests to pin elements (e.g. calls to
    103          *            pinGreatest()) are less-likely to fail. If false, they are
    104          *            more-likely to fail.
    105          */
    106         public void onPinStateChange(boolean pinsAvailable);
    107     }
    108 
    109     /**
    110      * Wraps E with reference counting.
    111      */
    112     private static class Pinnable<E> {
    113         private E mElement;
    114 
    115         /** Reference-counting for the number of tasks holding this element. */
    116         private int mPins;
    117 
    118         public Pinnable(E element) {
    119             mElement = element;
    120             mPins = 0;
    121         }
    122 
    123         public E getElement() {
    124             return mElement;
    125         }
    126 
    127         private boolean isPinned() {
    128             return mPins > 0;
    129         }
    130     }
    131 
    132     /**
    133      * A Semaphore that allows to reduce permits to negative values.
    134      */
    135     private static class NegativePermitsSemaphore extends Semaphore {
    136         public NegativePermitsSemaphore(int permits) {
    137             super(permits);
    138         }
    139 
    140         /**
    141          * Reduces the number of permits by <code>permits</code>.
    142          * <p/>
    143          * This method can only be called when number of available permits is
    144          * zero.
    145          */
    146         @Override
    147         public void reducePermits(int permits) {
    148             if (availablePermits() != 0) {
    149                 throw new IllegalStateException("Called without draining the semaphore.");
    150             }
    151             super.reducePermits(permits);
    152         }
    153     }
    154 
    155     /** Allow only one swapping operation at a time. */
    156     private final Object mSwapLock = new Object();
    157     /**
    158      * Lock all transactions involving mElements, mUnpinnedElements,
    159      * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
    160      * mPinStateListener and the state of Pinnable instances. <br>
    161      * TODO Replace this with a priority semaphore and allow swapLeast()
    162      * operations to run faster at the expense of slower tryPin()/release()
    163      * calls.
    164      */
    165     private final Object mLock = new Object();
    166     /** Stores all elements. */
    167     private TreeMap<Long, Pinnable<E>> mElements;
    168     /** Stores the subset of mElements which is not pinned. */
    169     private TreeMap<Long, Pinnable<E>> mUnpinnedElements;
    170     /** Used to acquire space in mElements. */
    171     private final Semaphore mCapacitySemaphore;
    172     /** This must be acquired while an element is pinned. */
    173     private final NegativePermitsSemaphore mPinSemaphore;
    174     private boolean mClosed = false;
    175 
    176     private Handler mPinStateHandler = null;
    177     private PinStateListener mPinStateListener = null;
    178 
    /**
     * Creates an empty ring buffer able to hold up to {@code capacity}
     * elements.
     *
     * @param capacity the maximum number of elements to store.
     * @throws IllegalArgumentException if {@code capacity} is not positive.
     */
    public ConcurrentSharedRingBuffer(int capacity) {
        if (capacity < 1) {
            throw new IllegalArgumentException("Capacity must be positive.");
        }

        // One capacity permit per slot that may hold an element.
        mCapacitySemaphore = new Semaphore(capacity);
        // Pin permits start at -1: at least one element must always stay
        // unpinned so that it can be swapped out as the head of the buffer.
        mPinSemaphore = new NegativePermitsSemaphore(-1);

        mUnpinnedElements = new TreeMap<Long, Pinnable<E>>();
        mElements = new TreeMap<Long, Pinnable<E>>();
    }
    197 
    198     /**
    199      * Sets or replaces the listener.
    200      *
    201      * @param handler The handler on which to invoke the listener.
    202      * @param listener The listener to be called whenever the ability to pin an
    203      *            element changes.
    204      */
    205     public void setListener(Handler handler, PinStateListener listener) {
    206         synchronized (mLock) {
    207             mPinStateHandler = handler;
    208             mPinStateListener = listener;
    209         }
    210     }
    211 
    212     /**
    213      * Places a new element in the ring buffer, removing the least (by key)
    214      * non-pinned element if necessary. The existing element (or {@code null} if
    215      * the buffer is under-capacity) is passed to {@code swapper.swap()} and the
    216      * result is saved to the buffer. If an entry with {@code newKey} already
    217      * exists in the ring-buffer, then {@code swapper.update()} is called and
    218      * may modify the element in-place. See {@link SwapTask}. <br>
    219      * Note that this method is the only way to add new elements to the buffer
    220      * and will never be blocked on pinned tasks.
    221      *
    222      * @param newKey the key with which to store the swapped-in element.
    223      * @param swapper the callback used to perform the swap.
    224      * @return true if the swap was successful and the new element was saved to
    225      *         the buffer, false if the swap was not possible and the element
    226      *         was not saved to the buffer. Note that if the swap failed,
    227      *         {@code swapper.create()} may or may not have been invoked.
    228      */
    229     public boolean swapLeast(long newKey, SwapTask<E> swapper) {
    230         synchronized (mSwapLock) {
    231             Pinnable<E> existingElement = null;
    232 
    233             synchronized (mLock) {
    234                 if (mClosed) {
    235                     return false;
    236                 }
    237                 existingElement = mElements.get(newKey);
    238             }
    239 
    240             if (existingElement != null) {
    241                 swapper.update(existingElement.getElement());
    242                 return true;
    243             }
    244 
    245             if (mCapacitySemaphore.tryAcquire()) {
    246                 // If we are under capacity, insert the new element and return.
    247                 Pinnable<E> p = new Pinnable<E>(swapper.create());
    248 
    249                 synchronized (mLock) {
    250                     if (mClosed) {
    251                         return false;
    252                     }
    253 
    254                     // Add the new element and release another permit to pin
    255                     // allow pinning another element.
    256                     mElements.put(newKey, p);
    257                     mUnpinnedElements.put(newKey, p);
    258                     mPinSemaphore.release();
    259                     if (mPinSemaphore.availablePermits() == 1) {
    260                         notifyPinStateChange(true);
    261                     }
    262                 }
    263 
    264                 return true;
    265             } else {
    266                 Pinnable<E> toSwap;
    267 
    268                 // Note that this method must be synchronized to avoid
    269                 // attempting to remove more than one unpinned element at a
    270                 // time.
    271                 synchronized (mLock) {
    272                     if (mClosed) {
    273                         return false;
    274                     }
    275                     Pair<Long, Pinnable<E>> toSwapEntry = null;
    276                     long swapKey = swapper.getSwapKey();
    277                     // If swapKey is same as the inserted key return early.
    278                     if (swapKey == newKey) {
    279                         return false;
    280                     }
    281 
    282                     if (mUnpinnedElements.containsKey(swapKey)) {
    283                         toSwapEntry = Pair.create(swapKey, mUnpinnedElements.remove(swapKey));
    284                     } else {
    285                         // The returned key from getSwapKey was not found in the
    286                         // unpinned elements use the least entry from the
    287                         // unpinned elements.
    288                         Map.Entry<Long, Pinnable<E>> swapEntry = mUnpinnedElements.pollFirstEntry();
    289                         if (swapEntry != null) {
    290                             toSwapEntry = Pair.create(swapEntry.getKey(), swapEntry.getValue());
    291                         }
    292                     }
    293 
    294                     if (toSwapEntry == null) {
    295                         // We can get here if no unpinned element was found.
    296                         return false;
    297                     }
    298 
    299                     toSwap = toSwapEntry.second;
    300 
    301                     // We must remove the element from both mElements and
    302                     // mUnpinnedElements because it must be re-added after the
    303                     // swap to be placed in the correct order with newKey.
    304                     mElements.remove(toSwapEntry.first);
    305                 }
    306 
    307                 try {
    308                     toSwap.mElement = swapper.swap(toSwap.mElement);
    309                 } finally {
    310                     synchronized (mLock) {
    311                         if (mClosed) {
    312                             return false;
    313                         }
    314 
    315                         mElements.put(newKey, toSwap);
    316                         mUnpinnedElements.put(newKey, toSwap);
    317                     }
    318                 }
    319                 return true;
    320             }
    321         }
    322     }
    323 
    324     /**
    325      * Attempts to pin the element with the given key and return it. <br>
    326      * Note that, if a non-null pair is returned, the caller <em>must</em> call
    327      * {@link #release} with the key.
    328      *
    329      * @return the key and object of the pinned element, if one could be pinned,
    330      *         or null.
    331      */
    332     public Pair<Long, E> tryPin(long key) {
    333 
    334         boolean acquiredLastPin = false;
    335         Pinnable<E> entry = null;
    336 
    337         synchronized (mLock) {
    338             if (mClosed) {
    339                 return null;
    340             }
    341 
    342             if (mElements.isEmpty()) {
    343                 return null;
    344             }
    345 
    346             entry = mElements.get(key);
    347 
    348             if (entry == null) {
    349                 return null;
    350             }
    351 
    352             if (entry.isPinned()) {
    353                 // If the element is already pinned by another task, simply
    354                 // increment the pin count.
    355                 entry.mPins++;
    356             } else {
    357                 // We must ensure that there will still be an unpinned element
    358                 // after we pin this one.
    359                 if (mPinSemaphore.tryAcquire()) {
    360                     mUnpinnedElements.remove(key);
    361                     entry.mPins++;
    362 
    363                     acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
    364                 } else {
    365                     return null;
    366                 }
    367             }
    368         }
    369 
    370         // If we just grabbed the last permit, we must notify listeners of the
    371         // pin
    372         // state change.
    373         if (acquiredLastPin) {
    374             notifyPinStateChange(false);
    375         }
    376 
    377         return Pair.create(key, entry.getElement());
    378     }
    379 
    380     public void release(long key) {
    381         synchronized (mLock) {
    382             // Note that this must proceed even if the buffer has been closed.
    383 
    384             Pinnable<E> element = mElements.get(key);
    385 
    386             if (element == null) {
    387                 throw new InvalidParameterException(
    388                         "No entry found for the given key: " + key + ".");
    389             }
    390 
    391             if (!element.isPinned()) {
    392                 throw new IllegalArgumentException("Calling release() with unpinned element.");
    393             }
    394 
    395             // Unpin the element
    396             element.mPins--;
    397 
    398             if (!element.isPinned()) {
    399                 // If there are now 0 tasks pinning this element...
    400                 mUnpinnedElements.put(key, element);
    401 
    402                 // Allow pinning another element.
    403                 mPinSemaphore.release();
    404 
    405                 if (mPinSemaphore.availablePermits() == 1) {
    406                     notifyPinStateChange(true);
    407                 }
    408             }
    409         }
    410     }
    411 
    412     /**
    413      * Attempts to pin the greatest element and return it. <br>
    414      * Note that, if a non-null element is returned, the caller <em>must</em>
    415      * call {@link #release} with the element. Furthermore, behavior is
    416      * undefined if the element's {@code compareTo} behavior changes between
    417      * these calls.
    418      *
    419      * @return the key and object of the pinned element, if one could be pinned,
    420      *         or null.
    421      */
    422     public Pair<Long, E> tryPinGreatest() {
    423         synchronized (mLock) {
    424             if (mClosed) {
    425                 return null;
    426             }
    427 
    428             if (mElements.isEmpty()) {
    429                 return null;
    430             }
    431 
    432             return tryPin(mElements.lastKey());
    433         }
    434     }
    435 
    436     /**
    437      * Attempts to pin the greatest element for which {@code selector} returns
    438      * true. <br>
    439      *
    440      * @see #pinGreatest
    441      */
    442     public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) {
    443         // (Quickly) get the list of elements to search through.
    444         ArrayList<Long> keys = new ArrayList<Long>();
    445         synchronized (mLock) {
    446             if (mClosed) {
    447                 return null;
    448             }
    449 
    450             if (mElements.isEmpty()) {
    451                 return null;
    452             }
    453 
    454             keys.addAll(mElements.keySet());
    455         }
    456 
    457         Collections.sort(keys);
    458 
    459         // Pin each element, from greatest key to least, until we find the one
    460         // we want (the element with the greatest key for which
    461         // selector.selected() returns true).
    462         for (int i = keys.size() - 1; i >= 0; i--) {
    463             Pair<Long, E> pinnedCandidate = tryPin(keys.get(i));
    464             if (pinnedCandidate != null) {
    465                 boolean selected = false;
    466 
    467                 try {
    468                     selected = selector.select(pinnedCandidate.second);
    469                 } finally {
    470                     // Don't leak pinnedCandidate if the above select() threw an
    471                     // exception.
    472                     if (selected) {
    473                         return pinnedCandidate;
    474                     } else {
    475                         release(pinnedCandidate.first);
    476                     }
    477                 }
    478             }
    479         }
    480 
    481         return null;
    482     }
    483 
    484     /**
    485      * Removes all elements from the buffer, running {@code task} on each one,
    486      * and waiting, if necessary, for all pins to be released.
    487      *
    488      * @param task
    489      * @throws InterruptedException
    490      */
    491     public void close(Task<E> task) throws InterruptedException {
    492         int numPinnedElements;
    493 
    494         // Ensure that any pending swap tasks complete before closing.
    495         synchronized (mSwapLock) {
    496             synchronized (mLock) {
    497                 mClosed = true;
    498                 numPinnedElements = mElements.size() - mUnpinnedElements.size();
    499             }
    500         }
    501 
    502         notifyPinStateChange(false);
    503 
    504         // Wait for all pinned tasks to complete.
    505         if (numPinnedElements > 0) {
    506             mPinSemaphore.acquire(numPinnedElements);
    507         }
    508 
    509         for (Pinnable<E> element : mElements.values()) {
    510             task.run(element.mElement);
    511             // Release the capacity permits.
    512             mCapacitySemaphore.release();
    513         }
    514 
    515         mUnpinnedElements.clear();
    516 
    517         mElements.clear();
    518     }
    519 
    520     /**
    521      * Attempts to get a pinned element for the given key.
    522      *
    523      * @param key the key of the pinned element.
    524      * @return (key, value) pair if found otherwise null.
    525      */
    526     public Pair<Long, E> tryGetPinned(long key) {
    527         synchronized (mLock) {
    528             if (mClosed) {
    529                 return null;
    530             }
    531             for (java.util.Map.Entry<Long, Pinnable<E>> element : mElements.entrySet()) {
    532                 if (element.getKey() == key) {
    533                     if (element.getValue().isPinned()) {
    534                         return Pair.create(element.getKey(), element.getValue().getElement());
    535                     } else {
    536                         return null;
    537                     }
    538                 }
    539             }
    540         }
    541         return null;
    542     }
    543 
    544     /**
    545      * Reopens previously closed buffer.
    546      * <p/>
    547      * Buffer should be closed before calling this method. If called with an
    548      * open buffer an {@link IllegalStateException} is thrown.
    549      *
    550      * @param unpinnedReservedSlotCount a non-negative integer for number of
    551      *            slots to reserve for unpinned elements. These slots can never
    552      *            be pinned and will always be available for swapping.
    553      * @throws InterruptedException
    554      */
    555     public void reopenBuffer(int unpinnedReservedSlotCount)
    556             throws InterruptedException {
    557         if (unpinnedReservedSlotCount < 0
    558                 || unpinnedReservedSlotCount >= mCapacitySemaphore.availablePermits()) {
    559             throw new IllegalArgumentException("Invalid unpinned reserved slot count: " +
    560                     unpinnedReservedSlotCount);
    561         }
    562 
    563         // Ensure that any pending swap tasks complete before closing.
    564         synchronized (mSwapLock) {
    565             synchronized (mLock) {
    566                 if (!mClosed) {
    567                     throw new IllegalStateException(
    568                             "Attempt to reopen the buffer when it is not closed.");
    569                 }
    570 
    571                 mPinSemaphore.drainPermits();
    572                 mPinSemaphore.reducePermits(unpinnedReservedSlotCount);
    573                 mClosed = false;
    574             }
    575         }
    576     }
    577 
    578     /**
    579      * Releases a pinned element for the given key.
    580      * <p/>
    581      * If element is unpinned, it is not released.
    582      *
    583      * @param key the key of the element, if the element is not present an
    584      *            {@link IllegalArgumentException} is thrown.
    585      */
    586     public void releaseIfPinned(long key) {
    587         synchronized (mLock) {
    588             Pinnable<E> element = mElements.get(key);
    589 
    590             if (element == null) {
    591                 throw new IllegalArgumentException("Invalid key." + key);
    592             }
    593 
    594             if (element.isPinned()) {
    595                 release(key);
    596             }
    597         }
    598     }
    599 
    600     /**
    601      * Releases all pinned elements in the buffer.
    602      * <p/>
    603      * Note: it only calls {@link #release(long)} only once on a pinned element.
    604      */
    605     public void releaseAll() {
    606         synchronized (mSwapLock) {
    607             synchronized (mLock) {
    608                 if (mClosed || mElements.isEmpty()
    609                         || mElements.size() == mUnpinnedElements.size()) {
    610                     return;
    611                 }
    612                 for (java.util.Map.Entry<Long, Pinnable<E>> entry : mElements.entrySet()) {
    613                     if (entry.getValue().isPinned()) {
    614                         release(entry.getKey());
    615                     }
    616                 }
    617             }
    618         }
    619     }
    620 
    621     private void notifyPinStateChange(final boolean pinsAvailable) {
    622         synchronized (mLock) {
    623             // We must synchronize on mPinStateHandler and mPinStateListener.
    624             if (mPinStateHandler != null) {
    625                 final PinStateListener listener = mPinStateListener;
    626                 mPinStateHandler.post(new Runnable() {
    627                         @Override
    628                     public void run() {
    629                         listener.onPinStateChange(pinsAvailable);
    630                     }
    631                 });
    632             }
    633         }
    634     }
    635 }
    636