1 /* 2 * Copyright (C) 2014 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 package com.android.camera.util; 18 19 import android.os.Handler; 20 import android.util.Pair; 21 22 import com.android.camera.debug.Log.Tag; 23 24 import java.security.InvalidParameterException; 25 import java.util.ArrayList; 26 import java.util.Collections; 27 import java.util.Map; 28 import java.util.TreeMap; 29 import java.util.concurrent.Semaphore; 30 31 /** 32 * Implements a thread-safe fixed-size pool map of integers to objects such that 33 * the least element may be swapped out for a new element at any time. Elements 34 * may be temporarily "pinned" for processing in separate threads, during which 35 * they will not be swapped out. <br> 36 * This class enforces the invariant that a new element can always be swapped 37 * in. Thus, requests to pin an element for a particular task may be denied if 38 * there are not enough unpinned elements which can be removed. <br> 39 */ 40 public class ConcurrentSharedRingBuffer<E> { 41 private static final Tag TAG = new Tag("CncrrntShrdRingBuf"); 42 43 /** 44 * Callback interface for swapping elements at the head of the buffer. 45 */ 46 public static interface SwapTask<E> { 47 /** 48 * Called if the buffer is under-capacity and a new element is being 49 * added. 50 * 51 * @return the new element to add. 52 */ 53 public E create(); 54 55 /** 56 * Called if the buffer is full and an old element must be swapped out 57 * to make room for the new element. 58 * 59 * @param oldElement the element being removed from the buffer. 60 * @return the new element to add. 61 */ 62 public E swap(E oldElement); 63 64 /** 65 * Called if the buffer already has an element with the specified key. 66 * Note that the element may currently be pinned for processing by other 67 * elements. Therefore, implementations must be thread safe with respect 68 * to any other operations which may be applied to pinned tasks. 69 * 70 * @param existingElement the element to be updated. 71 */ 72 public void update(E existingElement); 73 } 74 75 /** 76 * Callback for selecting an element to pin. See 77 * {@link tryPinGreatestSelected}. 78 */ 79 public static interface Selector<E> { 80 /** 81 * @param element The element to select or not select. 82 * @return true if the element should be selected, false otherwise. 83 */ 84 public boolean select(E element); 85 } 86 87 public static interface PinStateListener { 88 /** 89 * Invoked whenever the ability to pin an element for processing 90 * changes. 91 * 92 * @param pinsAvailable If true, requests to pin elements (e.g. calls to 93 * pinGreatest()) are less-likely to fail. If false, they are 94 * more-likely to fail. 95 */ 96 public void onPinStateChange(boolean pinsAvailable); 97 } 98 99 /** 100 * Wraps E with reference counting. 101 */ 102 private static class Pinnable<E> { 103 private E mElement; 104 105 /** Reference-counting for the number of tasks holding this element. */ 106 private int mPins; 107 108 public Pinnable(E element) { 109 mElement = element; 110 mPins = 0; 111 } 112 113 public E getElement() { 114 return mElement; 115 } 116 117 private boolean isPinned() { 118 return mPins > 0; 119 } 120 } 121 122 /** Allow only one swapping operation at a time. */ 123 private final Object mSwapLock = new Object(); 124 /** 125 * Lock all transactions involving mElements, mUnpinnedElements, 126 * mCapacitySemaphore, mPinSemaphore, mClosed, mPinStateHandler, and 127 * mPinStateListener and the state of Pinnable instances. <br> 128 * TODO Replace this with a priority semaphore and allow swapLeast() 129 * operations to run faster at the expense of slower tryPin()/release() 130 * calls. 131 */ 132 private final Object mLock = new Object(); 133 /** Stores all elements. */ 134 private TreeMap<Long, Pinnable<E>> mElements; 135 /** Stores the subset of mElements which is not pinned. */ 136 private TreeMap<Long, Pinnable<E>> mUnpinnedElements; 137 /** Used to acquire space in mElements. */ 138 private final Semaphore mCapacitySemaphore; 139 /** This must be acquired while an element is pinned. */ 140 private final Semaphore mPinSemaphore; 141 private boolean mClosed = false; 142 143 private Handler mPinStateHandler = null; 144 private PinStateListener mPinStateListener = null; 145 146 /** 147 * Constructs a new ring buffer with the specified capacity. 148 * 149 * @param capacity the maximum number of elements to store. 150 */ 151 public ConcurrentSharedRingBuffer(int capacity) { 152 if (capacity <= 0) { 153 throw new IllegalArgumentException("Capacity must be positive."); 154 } 155 156 mElements = new TreeMap<Long, Pinnable<E>>(); 157 mUnpinnedElements = new TreeMap<Long, Pinnable<E>>(); 158 mCapacitySemaphore = new Semaphore(capacity); 159 // Start with -1 permits to pin elements since we must always have at 160 // least one unpinned 161 // element available to swap out as the head of the buffer. 162 mPinSemaphore = new Semaphore(-1); 163 } 164 165 /** 166 * Sets or replaces the listener. 167 * 168 * @param handler The handler on which to invoke the listener. 169 * @param listener The listener to be called whenever the ability to pin an 170 * element changes. 171 */ 172 public void setListener(Handler handler, PinStateListener listener) { 173 synchronized (mLock) { 174 mPinStateHandler = handler; 175 mPinStateListener = listener; 176 } 177 } 178 179 /** 180 * Places a new element in the ring buffer, removing the least (by key) 181 * non-pinned element if necessary. The existing element (or {@code null} if 182 * the buffer is under-capacity) is passed to {@code swapper.swap()} and the 183 * result is saved to the buffer. If an entry with {@code newKey} already 184 * exists in the ring-buffer, then {@code swapper.update()} is called and 185 * may modify the element in-place. See {@link SwapTask}. <br> 186 * Note that this method is the only way to add new elements to the buffer 187 * and will never be blocked on pinned tasks. 188 * 189 * @param newKey the key with which to store the swapped-in element. 190 * @param swapper the callback used to perform the swap. 191 * @return true if the swap was successful and the new element was saved to 192 * the buffer, false if the swap was not possible and the element 193 * was not saved to the buffer. Note that if the swap failed, 194 * {@code swapper.create()} may or may not have been invoked. 195 */ 196 public boolean swapLeast(long newKey, SwapTask<E> swapper) { 197 synchronized (mSwapLock) { 198 Pinnable<E> existingElement = null; 199 200 synchronized (mLock) { 201 if (mClosed) { 202 return false; 203 } 204 existingElement = mElements.get(newKey); 205 } 206 207 if (existingElement != null) { 208 swapper.update(existingElement.getElement()); 209 return true; 210 } 211 212 if (mCapacitySemaphore.tryAcquire()) { 213 // If we are under capacity, insert the new element and return. 214 Pinnable<E> p = new Pinnable<E>(swapper.create()); 215 216 synchronized (mLock) { 217 if (mClosed) { 218 return false; 219 } 220 221 // Add the new element and release another permit to pin 222 // allow pinning another element. 223 mElements.put(newKey, p); 224 mUnpinnedElements.put(newKey, p); 225 mPinSemaphore.release(); 226 if (mPinSemaphore.availablePermits() == 1) { 227 notifyPinStateChange(true); 228 } 229 } 230 231 return true; 232 } else { 233 Pinnable<E> toSwap; 234 235 // Note that this method must be synchronized to avoid 236 // attempting to remove more than one unpinned element at a 237 // time. 238 synchronized (mLock) { 239 if (mClosed) { 240 return false; 241 } 242 243 Map.Entry<Long, Pinnable<E>> toSwapEntry = mUnpinnedElements.pollFirstEntry(); 244 245 if (toSwapEntry == null) { 246 // We should never get here. 247 throw new RuntimeException("No unpinned element available."); 248 } 249 250 toSwap = toSwapEntry.getValue(); 251 252 // We must remove the element from both mElements and 253 // mUnpinnedElements because it must be re-added after the 254 // swap to be placed in the correct order with newKey. 255 mElements.remove(toSwapEntry.getKey()); 256 } 257 258 try { 259 toSwap.mElement = swapper.swap(toSwap.mElement); 260 } finally { 261 synchronized (mLock) { 262 if (mClosed) { 263 return false; 264 } 265 266 mElements.put(newKey, toSwap); 267 mUnpinnedElements.put(newKey, toSwap); 268 } 269 } 270 return true; 271 } 272 } 273 } 274 275 /** 276 * Attempts to pin the element with the given key and return it. <br> 277 * Note that, if a non-null pair is returned, the caller <em>must</em> call 278 * {@link #release} with the key. 279 * 280 * @return the key and object of the pinned element, if one could be pinned, 281 * or null. 282 */ 283 public Pair<Long, E> tryPin(long key) { 284 285 boolean acquiredLastPin = false; 286 Pinnable<E> entry = null; 287 288 synchronized (mLock) { 289 if (mClosed) { 290 return null; 291 } 292 293 if (mElements.isEmpty()) { 294 return null; 295 } 296 297 entry = mElements.get(key); 298 299 if (entry == null) { 300 return null; 301 } 302 303 if (entry.isPinned()) { 304 // If the element is already pinned by another task, simply 305 // increment the pin count. 306 entry.mPins++; 307 } else { 308 // We must ensure that there will still be an unpinned element 309 // after we pin this one. 310 if (mPinSemaphore.tryAcquire()) { 311 mUnpinnedElements.remove(key); 312 entry.mPins++; 313 314 acquiredLastPin = mPinSemaphore.availablePermits() <= 0; 315 } else { 316 return null; 317 } 318 } 319 } 320 321 // If we just grabbed the last permit, we must notify listeners of the 322 // pin 323 // state change. 324 if (acquiredLastPin) { 325 notifyPinStateChange(false); 326 } 327 328 return Pair.create(key, entry.getElement()); 329 } 330 331 public void release(long key) { 332 synchronized (mLock) { 333 // Note that this must proceed even if the buffer has been closed. 334 335 Pinnable<E> element = mElements.get(key); 336 337 if (element == null) { 338 throw new InvalidParameterException("No entry found for the given key."); 339 } 340 341 if (!element.isPinned()) { 342 throw new IllegalArgumentException("Calling release() with unpinned element."); 343 } 344 345 // Unpin the element 346 element.mPins--; 347 348 if (!element.isPinned()) { 349 // If there are now 0 tasks pinning this element... 350 mUnpinnedElements.put(key, element); 351 352 // Allow pinning another element. 353 mPinSemaphore.release(); 354 355 if (mPinSemaphore.availablePermits() == 1) { 356 notifyPinStateChange(true); 357 } 358 } 359 } 360 } 361 362 /** 363 * Attempts to pin the greatest element and return it. <br> 364 * Note that, if a non-null element is returned, the caller <em>must</em> 365 * call {@link #release} with the element. Furthermore, behavior is 366 * undefined if the element's {@code compareTo} behavior changes between 367 * these calls. 368 * 369 * @return the key and object of the pinned element, if one could be pinned, 370 * or null. 371 */ 372 public Pair<Long, E> tryPinGreatest() { 373 synchronized (mLock) { 374 if (mClosed) { 375 return null; 376 } 377 378 if (mElements.isEmpty()) { 379 return null; 380 } 381 382 return tryPin(mElements.lastKey()); 383 } 384 } 385 386 /** 387 * Attempts to pin the greatest element for which {@code selector} returns 388 * true. <br> 389 * 390 * @see #pinGreatest 391 */ 392 public Pair<Long, E> tryPinGreatestSelected(Selector<E> selector) { 393 // (Quickly) get the list of elements to search through. 394 ArrayList<Long> keys = new ArrayList<Long>(); 395 synchronized (mLock) { 396 if (mClosed) { 397 return null; 398 } 399 400 if (mElements.isEmpty()) { 401 return null; 402 } 403 404 keys.addAll(mElements.keySet()); 405 } 406 407 Collections.sort(keys); 408 409 // Pin each element, from greatest key to least, until we find the one 410 // we want (the element with the greatest key for which 411 // selector.selected() returns true). 412 for (int i = keys.size() - 1; i >= 0; i--) { 413 Pair<Long, E> pinnedCandidate = tryPin(keys.get(i)); 414 if (pinnedCandidate != null) { 415 boolean selected = false; 416 417 try { 418 selected = selector.select(pinnedCandidate.second); 419 } finally { 420 // Don't leak pinnedCandidate if the above select() threw an 421 // exception. 422 if (selected) { 423 return pinnedCandidate; 424 } else { 425 release(pinnedCandidate.first); 426 } 427 } 428 } 429 } 430 431 return null; 432 } 433 434 /** 435 * Removes all elements from the buffer, running {@code task} on each one, 436 * and waiting, if necessary, for all pins to be released. 437 * 438 * @param task 439 * @throws InterruptedException 440 */ 441 public void close(Task<E> task) throws InterruptedException { 442 int numPinnedElements; 443 444 // Ensure that any pending swap tasks complete before closing. 445 synchronized (mSwapLock) { 446 synchronized (mLock) { 447 mClosed = true; 448 numPinnedElements = mElements.size() - mUnpinnedElements.size(); 449 } 450 } 451 452 notifyPinStateChange(false); 453 454 // Wait for all pinned tasks to complete. 455 if (numPinnedElements > 0) { 456 mPinSemaphore.acquire(numPinnedElements); 457 } 458 459 for (Pinnable<E> element : mElements.values()) { 460 task.run(element.mElement); 461 } 462 463 mUnpinnedElements.clear(); 464 465 mElements.clear(); 466 } 467 468 private void notifyPinStateChange(final boolean pinsAvailable) { 469 synchronized (mLock) { 470 // We must synchronize on mPinStateHandler and mPinStateListener. 471 if (mPinStateHandler != null) { 472 final PinStateListener listener = mPinStateListener; 473 mPinStateHandler.post(new Runnable() { 474 @Override 475 public void run() { 476 listener.onPinStateChange(pinsAvailable); 477 } 478 }); 479 } 480 } 481 } 482 } 483