Home | History | Annotate | Download | only in async
      1 /*
      2  * Copyright (C) 2014 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.camera.async;
     18 
     19 import java.util.ArrayList;
     20 import java.util.List;
     21 import java.util.NoSuchElementException;
     22 import java.util.concurrent.BlockingQueue;
     23 import java.util.concurrent.LinkedBlockingQueue;
     24 import java.util.concurrent.TimeUnit;
     25 import java.util.concurrent.TimeoutException;
     26 import java.util.concurrent.atomic.AtomicBoolean;
     27 
     28 import javax.annotation.Nonnull;
     29 
     30 /**
     31  * A {@link BufferQueue} implementation useful for thread-safe producer-consumer
     32  * interactions.<br>
     33  * Unlike a regular {@link java.util.concurrent.BlockingQueue}, this allows
     34  * closing the queue from either the producer or consumer side and enables
     35  * precise accounting of objects which are never read by the consumer. Notably,
     36  * this enables cleanly shutting down producer-consumer interactions without
     37  * leaking managed resources which might otherwise be left dangling in the
     38  * queue.
     39  */
     40 public class ConcurrentBufferQueue<T> implements BufferQueue<T>, BufferQueueController<T>,
     41         SafeCloseable {
     42     /**
     43      * A callback to be invoked with all of the elements of the sequence which
     44      * are added but never retrieved via {@link #getNext}.
     45      */
     46     public static interface UnusedElementProcessor<T> {
     47         /**
     48          * Implementations should properly close the discarded element, if
     49          * necessary.
     50          */
     51         public void process(T element);
     52     }
     53 
     54     /**
     55      * An entry can either be a {@link T} or a special "poison-pill" marker
     56      * indicating that the sequence has been closed.
     57      */
     58     private static class Entry<T> {
     59         private final T mValue;
     60         private final boolean mClosing;
     61 
     62         private Entry(T value, boolean closing) {
     63             mValue = value;
     64             mClosing = closing;
     65         }
     66 
     67         public boolean isClosingMarker() {
     68             return mClosing;
     69         }
     70 
     71         public T getValue() {
     72             return mValue;
     73         }
     74     }
     75     /**
     76      * Lock used for mQueue modification and mClosed.
     77      */
     78     private final Object mLock;
     79     /**
     80      * The queue in which to store elements of the sequence as they arrive.
     81      */
     82     private final BlockingQueue<Entry<T>> mQueue;
     83     /**
     84      * Whether this sequence is closed.
     85      */
     86     private final AtomicBoolean mClosed;
     87     /**
     88      * The callback to use to process all elements which are discarded by the
     89      * queue.
     90      */
     91     private final UnusedElementProcessor<T> mUnusedElementProcessor;
     92 
     93     public ConcurrentBufferQueue(UnusedElementProcessor<T> unusedElementProcessor) {
     94         mUnusedElementProcessor = unusedElementProcessor;
     95         mLock = new Object();
     96         mQueue = new LinkedBlockingQueue<>();
     97         mClosed = new AtomicBoolean();
     98     }
     99 
    100     public ConcurrentBufferQueue() {
    101         // Instantiate with a DiscardedElementProcessor which does nothing.
    102         this(new UnusedElementProcessor<T>() {
    103             @Override
    104             public void process(T element) {
    105             }
    106         });
    107     }
    108 
    109     @Override
    110     public void close() {
    111         List<Entry<T>> remainingElements = new ArrayList<>();
    112         synchronized (mLock) {
    113             // Mark as closed so that no more threads wait in getNext().
    114             // Any additional calls to close() will return immediately.
    115             boolean alreadyClosed = mClosed.getAndSet(true);
    116             if (alreadyClosed) {
    117                 return;
    118             }
    119 
    120             mQueue.drainTo(remainingElements);
    121 
    122             // Keep feeding any currently-waiting consumer threads "poison pill"
    123             // {@link Entry}s indicating that the sequence has ended so they
    124             // wake up. When no more threads are waiting for another value from
    125             // mQueue, the call to peek() from this thread will see a value.
    126             // Note that this also ensures that there is a poison pill in the
    127             // queue
    128             // to keep waking-up any threads which manage to block in getNext()
    129             // even after marking mClosed.
    130             while (mQueue.peek() == null) {
    131                 mQueue.add(makeClosingMarker());
    132             }
    133         }
    134 
    135         for (Entry<T> entry : remainingElements) {
    136             if (!entry.isClosingMarker()) {
    137                 mUnusedElementProcessor.process(entry.getValue());
    138             }
    139         }
    140     }
    141 
    142     @Override
    143     public void update(@Nonnull T element) {
    144         boolean closed = false;
    145         synchronized (mLock) {
    146             closed = mClosed.get();
    147             if (!closed) {
    148                 mQueue.add(makeEntry(element));
    149             }
    150         }
    151         if (closed) {
    152             mUnusedElementProcessor.process(element);
    153         }
    154     }
    155 
    156     private T doWithNextEntry(Entry<T> nextEntry) throws BufferQueueClosedException {
    157         if (nextEntry.isClosingMarker()) {
    158             // Always keep a poison-pill in the queue to avoid a race condition
    159             // in which a thread reaches the mQueue.take() call after close().
    160             mQueue.add(nextEntry);
    161             throw new BufferQueueClosedException();
    162         } else {
    163             return nextEntry.getValue();
    164         }
    165     }
    166 
    167     @Override
    168     public T getNext() throws InterruptedException, BufferQueueClosedException {
    169         Entry<T> nextEntry = mQueue.take();
    170         return doWithNextEntry(nextEntry);
    171     }
    172 
    173     @Override
    174     public T getNext(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException,
    175             BufferQueueClosedException {
    176         Entry<T> nextEntry = mQueue.poll(timeout, unit);
    177         if (nextEntry == null) {
    178             throw new TimeoutException();
    179         }
    180         return doWithNextEntry(nextEntry);
    181     }
    182 
    183     @Override
    184     public T peekNext() {
    185         Entry<T> nextEntry = mQueue.peek();
    186         if (nextEntry == null) {
    187             return null;
    188         } else if (nextEntry.isClosingMarker()) {
    189             return null;
    190         } else {
    191             return nextEntry.getValue();
    192         }
    193     }
    194 
    195     @Override
    196     public void discardNext() {
    197         try {
    198             Entry<T> nextEntry = mQueue.remove();
    199             if (nextEntry.isClosingMarker()) {
    200                 // Always keep a poison-pill in the queue to avoid a race
    201                 // condition in which a thread reaches the mQueue.take() call
    202                 // after close().
    203                 mQueue.add(nextEntry);
    204             } else {
    205                 mUnusedElementProcessor.process(nextEntry.getValue());
    206             }
    207         } catch (NoSuchElementException e) {
    208             // If the queue is already empty, do nothing.
    209             return;
    210         }
    211     }
    212 
    213     @Override
    214     public boolean isClosed() {
    215         return mClosed.get();
    216     }
    217 
    218     private Entry makeEntry(T value) {
    219         return new Entry(value, false);
    220     }
    221 
    222     private Entry makeClosingMarker() {
    223         return new Entry(null, true);
    224     }
    225 }
    226