Home | History | Annotate | Download | only in imagedistributor
      1 /*
      2  * Copyright (C) 2014 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.camera.one.v2.sharedimagereader.imagedistributor;
     18 
     19 import com.android.camera.async.BufferQueue;
     20 import com.android.camera.async.BufferQueueController;
     21 import com.android.camera.debug.Log;
     22 import com.android.camera.debug.Logger;
     23 import com.android.camera.one.v2.camera2proxy.ImageProxy;
     24 
     25 import java.util.ArrayList;
     26 import java.util.HashSet;
     27 import java.util.List;
     28 import java.util.Set;
     29 
     30 import javax.annotation.ParametersAreNonnullByDefault;
     31 import javax.annotation.concurrent.GuardedBy;
     32 
     33 /**
     34  * Distributes incoming images to output {@link BufferQueueController}s
     35  * according to their timestamp.
     36  */
     37 @ParametersAreNonnullByDefault
     38 class ImageDistributorImpl implements ImageDistributor {
     39     /**
     40      * An input timestamp stream and an output image stream to receive images
     41      * with timestamps which match those found in the input stream.
     42      */
     43     private static class DispatchRecord {
     44         public final BufferQueue<Long> timestampBufferQueue;
     45         public final BufferQueueController<ImageProxy> imageStream;
     46 
     47         private DispatchRecord(BufferQueue<Long> timestampBufferQueue,
     48                 BufferQueueController<ImageProxy> imageStream) {
     49             this.timestampBufferQueue = timestampBufferQueue;
     50             this.imageStream = imageStream;
     51         }
     52     }
     53 
     54     private final Logger mLogger;
     55 
     56     /**
     57      * Contains pairs mapping {@link BufferQueue}s of timestamps of images to
     58      * the {@link BufferQueueController} to receive images with those
     59      * timestamps.
     60      */
     61     @GuardedBy("mDispatchTable")
     62     private final Set<DispatchRecord> mDispatchTable;
     63 
     64     /**
     65      * A stream to consume timestamps for all images captured by the underlying
     66      * device. This is used as a kind of clock-signal to indicate when timestamp
     67      * streams for entries in the {@link #mDispatchTable} are up-to-date.
     68      */
     69     private final BufferQueue<Long> mGlobalTimestampBufferQueue;
     70 
     71     /*
     72      * @param globalTimestampStream A stream of timestamps for every capture
     73      * processed by the underlying {@link CaptureSession}. This is used to
     74      * synchronize all of the timestamp streams associated with each added
     75      * output stream.
     76      */
     77     public ImageDistributorImpl(Logger.Factory logFactory,
     78             BufferQueue<Long> globalTimestampBufferQueue) {
     79         mLogger = logFactory.create(new Log.Tag("ImgDistributorImpl"));
     80         mGlobalTimestampBufferQueue = globalTimestampBufferQueue;
     81         mDispatchTable = new HashSet<>();
     82     }
     83 
     84     /**
     85      * Distributes the image to all added routes according to timestamp. Note
     86      * that this waits until the global timestamp stream indicates that the next
     87      * image has been captured to ensure that the timestamp streams for all
     88      * routes are up-to-date.
     89      * <p>
     90      * If interrupted, this will close the image and return.
     91      * </p>
     92      * It is assumed that incoming images will have unique, increasing
     93      * timestamps.
     94      *
     95      * @param image The image to distribute.
     96      */
     97     public void distributeImage(ImageProxy image) {
     98         final long timestamp = image.getTimestamp();
     99 
    100         // Wait until the global timestamp stream indicates that either the
    101         // *next* image has been captured, or the stream has been closed. Both
    102         // of these conditions are sufficient to guarantee that all other
    103         // timestamp streams should have an entry for the *current* image's
    104         // timestamp (if the associated image stream needs the image). Note that
    105         // this assumes that {@link #mGlobalImageTimestamp} and each timestamp
    106         // stream associated with a {@link DispatchRecord} are updated on the
    107         // same thread in order.
    108         try {
    109             while (true) {
    110                 if (mGlobalTimestampBufferQueue.getNext() > timestamp) {
    111                     break;
    112                 }
    113             }
    114         } catch (InterruptedException e) {
    115             image.close();
    116             return;
    117         } catch (BufferQueue.BufferQueueClosedException e) {
    118             // If the stream is closed, then all other timestamp streams must be
    119             // up-to-date.
    120         }
    121 
    122         List<BufferQueueController<ImageProxy>> streamsToReceiveImage = new ArrayList<>();
    123         Set<DispatchRecord> deadRecords = new HashSet<>();
    124 
    125         // mDispatchTable may be modified in {@link #addRoute} while iterating,
    126         // so to avoid unnecessary locking, make a copy to iterate over.
    127         Set<DispatchRecord> recordsToProcess;
    128         synchronized (mDispatchTable) {
    129             recordsToProcess = new HashSet<>(mDispatchTable);
    130         }
    131         for (DispatchRecord dispatchRecord : recordsToProcess) {
    132             // If either the input timestampBufferQueue or the output
    133             // imageStream is closed, then the route can be removed.
    134             if (dispatchRecord.timestampBufferQueue.isClosed() ||
    135                     dispatchRecord.imageStream.isClosed()) {
    136                 deadRecords.add(dispatchRecord);
    137             }
    138             Long requestedImageTimestamp = dispatchRecord.timestampBufferQueue.peekNext();
    139             while (requestedImageTimestamp != null && requestedImageTimestamp < timestamp) {
    140                 // This branch should only run if there is an error in the
    141                 // camera framework/driver. (Technically, we could get here if
    142                 // an ImageStream was not registered with the ImageDistributor
    143                 // before the image arrived, or if the timestamp stream was not
    144                 // updated appropriately. Both of these conditions would be
    145                 // serious app-level bugs, however, and are less likely
    146                 // than a framework/driver error.)
    147                 // If the current image is newer than the image requested by a
    148                 // stream in the dispatch table, then the driver must have
    149                 // skipped the requested image.
    150 
    151                 mLogger.e(String.format("Image (%d) expected, but never received!  Instead, " +
    152                         "received (%d)!  This is likely a camera driver error.",
    153                         requestedImageTimestamp, timestamp), new RuntimeException());
    154 
    155                 // TODO There may be threads blocked, waiting to receive the
    156                 // requested image.
    157                 // This should propagate the absent-image through
    158                 // dispatchRecord.imageStream to avoid starvation.
    159 
    160                 dispatchRecord.timestampBufferQueue.discardNext();
    161                 requestedImageTimestamp = dispatchRecord.timestampBufferQueue.peekNext();
    162             }
    163             if (requestedImageTimestamp == null) {
    164                 continue;
    165             }
    166             if (requestedImageTimestamp == timestamp) {
    167                 // Discard the value we just looked at.
    168                 dispatchRecord.timestampBufferQueue.discardNext();
    169                 streamsToReceiveImage.add(dispatchRecord.imageStream);
    170             }
    171         }
    172 
    173         synchronized (mDispatchTable) {
    174             mDispatchTable.removeAll(deadRecords);
    175         }
    176 
    177         int streamsToReceiveImageSize = streamsToReceiveImage.size();
    178         // If nobody needs the image, just close the image.
    179         if (streamsToReceiveImageSize == 0) {
    180             image.close();
    181             return;
    182         }
    183 
    184         RefCountedImageProxy sharedImage = new RefCountedImageProxy(image,
    185                 streamsToReceiveImageSize);
    186         for (BufferQueueController<ImageProxy> outputStream : streamsToReceiveImage) {
    187             // Wrap shared image to ensure that *each* stream must close the
    188             // image before the underlying reference count is decremented,
    189             // regardless of how many times it is closed from each stream.
    190             ImageProxy singleCloseImage = new SingleCloseImageProxy(sharedImage);
    191             outputStream.update(singleCloseImage);
    192         }
    193     }
    194 
    195     /**
    196      * Registers the given output image stream as a destination for images with
    197      * timestamps present in inputTimestampStream. Note that
    198      * inputTimestampStream is assumed to be synchronized with the global
    199      * timestamp stream such that it must always contain a timestamp for a
    200      * requested image before the global timestamp stream provides the timestamp
    201      * for the *next* image.
    202      *
    203      * @param inputTimestampBufferQueue A stream of timestamps of images to be
    204      *            routed to the given output stream. This should be closed by
    205      *            the producer when no more images are expected.
    206      * @param outputStream The image stream on which to add images.
    207      */
    208     @Override
    209     public void addRoute(BufferQueue<Long> inputTimestampBufferQueue,
    210             BufferQueueController<ImageProxy> outputStream) {
    211         synchronized (mDispatchTable) {
    212             mDispatchTable.add(new DispatchRecord(inputTimestampBufferQueue, outputStream));
    213         }
    214     }
    215 }
    216