Home | History | Annotate | Download | only in util
      1 /*
      2  * Copyright (C) 2014 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.camera.util;
     18 
     19 import android.os.Handler;
     20 import android.util.Pair;
     21 
     22 import com.android.camera.debug.Log.Tag;
     23 
     24 import java.security.InvalidParameterException;
     25 import java.util.ArrayList;
     26 import java.util.Collections;
     27 import java.util.Map;
     28 import java.util.TreeMap;
     29 import java.util.concurrent.Semaphore;
     30 
     31 /**
     32  * Implements a thread-safe fixed-size pool map of integers to objects such that
     33  * the least element may be swapped out for a new element at any time. Elements
     34  * may be temporarily "pinned" for processing in separate threads, during which
     35  * they will not be swapped out. <br>
     36  * This class enforces the invariant that a new element can always be swapped
     37  * in. Thus, requests to pin an element for a particular task may be denied if
     38  * there are not enough unpinned elements which can be removed. <br>
     39  */
     40 public class ConcurrentSharedRingBuffer<E> {
     41     private static final Tag TAG = new Tag("CncrrntShrdRingBuf");
     42 
     43     /**
     44      * Callback interface for swapping elements at the head of the buffer.
     45      */
     46     public static interface SwapTask<E> {
     47         /**
     48          * Called if the buffer is under-capacity and a new element is being
     49          * added.
     50          *
     51          * @return the new element to add.
     52          */
     53         public E create();
     54 
     55         /**
     56          * Called if the buffer is full and an old element must be swapped out
     57          * to make room for the new element.
     58          *
     59          * @param oldElement the element being removed from the buffer.
     60          * @return the new element to add.
     61          */
     62         public E swap(E oldElement);
     63 
     64         /**
     65          * Called if the buffer already has an element with the specified key.
     66          * Note that the element may currently be pinned for processing by other
     67          * elements. Therefore, implementations must be thread safe with respect
     68          * to any other operations which may be applied to pinned tasks.
     69          *
     70          * @param existingElement the element to be updated.
     71          */
     72         public void update(E existingElement);
     73     }
     74 
     75     /**
     76      * Callback for selecting an element to pin. See
     77      * {@link tryPinGreatestSelected}.
     78      */
     79     public static interface Selector<E> {
     80         /**
     81          * @param element The element to select or not select.
     82          * @return true if the element should be selected, false otherwise.
     83          */
     84         public boolean select(E element);
     85     }
     86 
     87     public static interface PinStateListener {
     88         /**
     89          * Invoked whenever the ability to pin an element for processing
     90          * changes.
     91          *
     92          * @param pinsAvailable If true, requests to pin elements (e.g. calls to
     93          *            pinGreatest()) are less-likely to fail. If false, they are
     94          *            more-likely to fail.
     95          */
     96         public void onPinStateChange(boolean pinsAvailable);
     97     }
     98 
     99     /**
    100      * Wraps E with reference counting.
    101      */
    102     private static class Pinnable<E> {
    103         private E mElement;
    104 
    105         /** Reference-counting for the number of tasks holding this element. */
    106         private int mPins;
    107 
    108         public Pinnable(E element) {
    109             mElement = element;
    110             mPins = 0;
    111         }
    112 
    113         public E getElement() {
    114             return mElement;
    115         }
    116 
    117         private boolean isPinned() {
    118             return mPins > 0;
    119         }
    120     }
    121 
    122     /** Allow only one swapping operation at a time. */
    123     private final Object mSwapLock = new Object();
    124     /**
    125      * Lock all transactions involving mElements, mUnpinnedElements,
    126      * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and
    127      * mPinStateListener and the state of Pinnable instances. <br>
    128      * TODO Replace this with a priority semaphore and allow swapLeast()
    129      * operations to run faster at the expense of slower tryPin()/release()
    130      * calls.
    131      */
    132     private final Object mLock = new Object();
    133     /** Stores all elements. */
    134     private TreeMap<Long, Pinnable<E>> mElements;
    135     /** Stores the subset of mElements which is not pinned. */
    136     private TreeMap<Long, Pinnable<E>> mUnpinnedElements;
    137     /** Used to acquire space in mElements. */
    138     private final Semaphore mCapacitySemaphore;
    139     /** This must be acquired while an element is pinned. */
    140     private final Semaphore mPinSemaphore;
    141     private boolean mClosed = false;
    142 
    143     private Handler mPinStateHandler = null;
    144     private PinStateListener mPinStateListener = null;
    145 
    146     /**
    147      * Constructs a new ring buffer with the specified capacity.
    148      *
    149      * @param capacity the maximum number of elements to store.
    150      */
    151     public ConcurrentSharedRingBuffer(int capacity) {
    152         if (capacity <= 0) {
    153             throw new IllegalArgumentException("Capacity must be positive.");
    154         }
    155 
    156         mElements = new TreeMap<Long, Pinnable<E>>();
    157         mUnpinnedElements = new TreeMap<Long, Pinnable<E>>();
    158         mCapacitySemaphore = new Semaphore(capacity);
    159         // Start with -1 permits to pin elements since we must always have at
    160         // least one unpinned
    161         // element available to swap out as the head of the buffer.
    162         mPinSemaphore = new Semaphore(-1);
    163     }
    164 
    165     /**
    166      * Sets or replaces the listener.
    167      *
    168      * @param handler The handler on which to invoke the listener.
    169      * @param listener The listener to be called whenever the ability to pin an
    170      *            element changes.
    171      */
    172     public void setListener(Handler handler, PinStateListener listener) {
    173         synchronized (mLock) {
    174             mPinStateHandler = handler;
    175             mPinStateListener = listener;
    176         }
    177     }
    178 
    179     /**
    180      * Places a new element in the ring buffer, removing the least (by key)
    181      * non-pinned element if necessary. The existing element (or {@code null} if
    182      * the buffer is under-capacity) is passed to {@code swapper.swap()} and the
    183      * result is saved to the buffer. If an entry with {@code newKey} already
    184      * exists in the ring-buffer, then {@code swapper.update()} is called and
    185      * may modify the element in-place. See {@link SwapTask}. <br>
    186      * Note that this method is the only way to add new elements to the buffer
    187      * and will never be blocked on pinned tasks.
    188      *
    189      * @param newKey the key with which to store the swapped-in element.
    190      * @param swapper the callback used to perform the swap.
    191      * @return true if the swap was successful and the new element was saved to
    192      *         the buffer, false if the swap was not possible and the element
    193      *         was not saved to the buffer. Note that if the swap failed,
    194      *         {@code swapper.create()} may or may not have been invoked.
    195      */
    196     public boolean swapLeast(long newKey, SwapTask<E> swapper) {
    197         synchronized (mSwapLock) {
    198             Pinnable<E> existingElement = null;
    199 
    200             synchronized (mLock) {
    201                 if (mClosed) {
    202                     return false;
    203                 }
    204                 existingElement = mElements.get(newKey);
    205             }
    206 
    207             if (existingElement != null) {
    208                 swapper.update(existingElement.getElement());
    209                 return true;
    210             }
    211 
    212             if (mCapacitySemaphore.tryAcquire()) {
    213                 // If we are under capacity, insert the new element and return.
    214                 Pinnable<E> p = new Pinnable<E>(swapper.create());
    215 
    216                 synchronized (mLock) {
    217                     if (mClosed) {
    218                         return false;
    219                     }
    220 
    221                     // Add the new element and release another permit to pin
    222                     // allow pinning another element.
    223                     mElements.put(newKey, p);
    224                     mUnpinnedElements.put(newKey, p);
    225                     mPinSemaphore.release();
    226                     if (mPinSemaphore.availablePermits() == 1) {
    227                         notifyPinStateChange(true);
    228                     }
    229                 }
    230 
    231                 return true;
    232             } else {
    233                 Pinnable<E> toSwap;
    234 
    235                 // Note that this method must be synchronized to avoid
    236                 // attempting to remove more than one unpinned element at a
    237                 // time.
    238                 synchronized (mLock) {
    239                     if (mClosed) {
    240                         return false;
    241                     }
    242 
    243                     Map.Entry<Long, Pinnable<E>> toSwapEntry = mUnpinnedElements.pollFirstEntry();
    244 
    245                     if (toSwapEntry == null) {
    246                         // We should never get here.
    247                         throw new RuntimeException("No unpinned element available.");
    248                     }
    249 
    250                     toSwap = toSwapEntry.getValue();
    251 
    252                     // We must remove the element from both mElements and
    253                     // mUnpinnedElements because it must be re-added after the
    254                     // swap to be placed in the correct order with newKey.
    255                     mElements.remove(toSwapEntry.getKey());
    256                 }
    257 
    258                 try {
    259                     toSwap.mElement = swapper.swap(toSwap.mElement);
    260                 } finally {
    261                     synchronized (mLock) {
    262                         if (mClosed) {
    263                             return false;
    264                         }
    265 
    266                         mElements.put(newKey, toSwap);
    267                         mUnpinnedElements.put(newKey, toSwap);
    268                     }
    269                 }
    270                 return true;
    271             }
    272         }
    273     }
    274 
    275     /**
    276      * Attempts to pin the element with the given key and return it. <br>
    277      * Note that, if a non-null pair is returned, the caller <em>must</em> call
    278      * {@link #release} with the key.
    279      *
    280      * @return the key and object of the pinned element, if one could be pinned,
    281      *         or null.
    282      */
    283     public Pair<Long, E> tryPin(long key) {
    284 
    285         boolean acquiredLastPin = false;
    286         Pinnable<E> entry = null;
    287 
    288         synchronized (mLock) {
    289             if (mClosed) {
    290                 return null;
    291             }
    292 
    293             if (mElements.isEmpty()) {
    294                 return null;
    295             }
    296 
    297             entry = mElements.get(key);
    298 
    299             if (entry == null) {
    300                 return null;
    301             }
    302 
    303             if (entry.isPinned()) {
    304                 // If the element is already pinned by another task, simply
    305                 // increment the pin count.
    306                 entry.mPins++;
    307             } else {
    308                 // We must ensure that there will still be an unpinned element
    309                 // after we pin this one.
    310                 if (mPinSemaphore.tryAcquire()) {
    311                     mUnpinnedElements.remove(key);
    312                     entry.mPins++;
    313 
    314                     acquiredLastPin = mPinSemaphore.availablePermits() <= 0;
    315                 } else {
    316                     return null;
    317                 }
    318             }
    319         }
    320 
    321         // If we just grabbed the last permit, we must notify listeners of the
    322         // pin
    323         // state change.
    324         if (acquiredLastPin) {
    325             notifyPinStateChange(false);
    326         }
    327 
    328         return Pair.create(key, entry.getElement());
    329     }
    330 
    331     public void release(long key) {
    332         synchronized (mLock) {
    333             // Note that this must proceed even if the buffer has been closed.
    334 
    335             Pinnable<E> element = mElements.get(key);
    336 
    337             if (element == null) {
    338                 throw new InvalidParameterException("No entry found for the given key.");
    339             }
    340 
    341             if (!element.isPinned()) {
    342                 throw new IllegalArgumentException("Calling release() with unpinned element.");
    343             }
    344 
    345             // Unpin the element
    346             element.mPins--;
    347 
    348             if (!element.isPinned()) {
    349                 // If there are now 0 tasks pinning this element...
    350                 mUnpinnedElements.put(key, element);
    351 
    352                 // Allow pinning another element.
    353                 mPinSemaphore.release();
    354 
    355                 if (mPinSemaphore.availablePermits() == 1) {
    356                     notifyPinStateChange(true);
    357                 }
    358             }
    359         }
    360     }
    361 
    362     /**
    363      * Attempts to pin the greatest element and return it. <br>
    364      * Note that, if a non-null element is returned, the caller <em>must</em>
    365      * call {@link #release} with the element. Furthermore, behavior is
    366      * undefined if the element's {@code compareTo} behavior changes between
    367      * these calls.
    368      *
    369      * @return the key and object of the pinned element, if one could be pinned,
    370      *         or null.
    371      */
    372     public Pair<Long, E> tryPinGreatest() {
    373         synchronized (mLock) {
    374             if (mClosed) {
    375                 return null;
    376             }
    377 
    378             if (mElements.isEmpty()) {
    379                 return null;
    380             }
    381 
    382             return tryPin(mElements.lastKey());
    383         }
    384     }
    385 
    386     /**
    387      * Attempts to pin the greatest element for which {@code selector} returns
    388      * true. <br>
    389      *
    390      * @see #pinGreatest
    391      */
    392     public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) {
    393         // (Quickly) get the list of elements to search through.
    394         ArrayList<Long> keys = new ArrayList<Long>();
    395         synchronized (mLock) {
    396             if (mClosed) {
    397                 return null;
    398             }
    399 
    400             if (mElements.isEmpty()) {
    401                 return null;
    402             }
    403 
    404             keys.addAll(mElements.keySet());
    405         }
    406 
    407         Collections.sort(keys);
    408 
    409         // Pin each element, from greatest key to least, until we find the one
    410         // we want (the element with the greatest key for which
    411         // selector.selected() returns true).
    412         for (int i = keys.size() - 1; i >= 0; i--) {
    413             Pair<Long, E> pinnedCandidate = tryPin(keys.get(i));
    414             if (pinnedCandidate != null) {
    415                 boolean selected = false;
    416 
    417                 try {
    418                     selected = selector.select(pinnedCandidate.second);
    419                 } finally {
    420                     // Don't leak pinnedCandidate if the above select() threw an
    421                     // exception.
    422                     if (selected) {
    423                         return pinnedCandidate;
    424                     } else {
    425                         release(pinnedCandidate.first);
    426                     }
    427                 }
    428             }
    429         }
    430 
    431         return null;
    432     }
    433 
    434     /**
    435      * Removes all elements from the buffer, running {@code task} on each one,
    436      * and waiting, if necessary, for all pins to be released.
    437      *
    438      * @param task
    439      * @throws InterruptedException
    440      */
    441     public void close(Task<E> task) throws InterruptedException {
    442         int numPinnedElements;
    443 
    444         // Ensure that any pending swap tasks complete before closing.
    445         synchronized (mSwapLock) {
    446             synchronized (mLock) {
    447                 mClosed = true;
    448                 numPinnedElements = mElements.size() - mUnpinnedElements.size();
    449             }
    450         }
    451 
    452         notifyPinStateChange(false);
    453 
    454         // Wait for all pinned tasks to complete.
    455         if (numPinnedElements > 0) {
    456             mPinSemaphore.acquire(numPinnedElements);
    457         }
    458 
    459         for (Pinnable<E> element : mElements.values()) {
    460             task.run(element.mElement);
    461         }
    462 
    463         mUnpinnedElements.clear();
    464 
    465         mElements.clear();
    466     }
    467 
    468     private void notifyPinStateChange(final boolean pinsAvailable) {
    469         synchronized (mLock) {
    470             // We must synchronize on mPinStateHandler and mPinStateListener.
    471             if (mPinStateHandler != null) {
    472                 final PinStateListener listener = mPinStateListener;
    473                 mPinStateHandler.post(new Runnable() {
    474                         @Override
    475                     public void run() {
    476                         listener.onPinStateChange(pinsAvailable);
    477                     }
    478                 });
    479             }
    480         }
    481     }
    482 }
    483