Home | History | Annotate | Download | only in buffer
      1 /*
      2  * Copyright (C) 2016 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 package com.android.tv.tuner.exoplayer.buffer;
     18 
     19 import android.media.MediaCodec;
     20 import android.os.ConditionVariable;
     21 import android.os.Handler;
     22 import android.os.HandlerThread;
     23 import android.os.Message;
     24 import android.util.ArraySet;
     25 import android.util.Log;
     26 import android.util.Pair;
     27 
     28 import com.google.android.exoplayer.MediaFormat;
     29 import com.google.android.exoplayer.SampleHolder;
     30 import com.google.android.exoplayer.util.MimeTypes;
     31 import com.android.tv.common.SoftPreconditions;
     32 import com.android.tv.tuner.exoplayer.buffer.RecordingSampleBuffer.BufferReason;
     33 
     34 import java.io.IOException;
     35 import java.util.LinkedList;
     36 import java.util.List;
     37 import java.util.Set;
     38 import java.util.concurrent.ConcurrentLinkedQueue;
     39 
     40 /**
     41  * Handles all {@link SampleChunk} I/O operations.
     42  * An I/O dedicated thread handles all I/O operations for synchronization.
     43  */
     44 public class SampleChunkIoHelper implements Handler.Callback {
     45     private static final String TAG = "SampleChunkIoHelper";
     46 
     47     private static final int MAX_READ_BUFFER_SAMPLES = 3;
     48     private static final int READ_RESCHEDULING_DELAY_MS = 10;
     49 
     50     private static final int MSG_OPEN_READ = 1;
     51     private static final int MSG_OPEN_WRITE = 2;
     52     private static final int MSG_CLOSE_READ = 3;
     53     private static final int MSG_CLOSE_WRITE = 4;
     54     private static final int MSG_READ = 5;
     55     private static final int MSG_WRITE = 6;
     56     private static final int MSG_RELEASE = 7;
     57 
     58     private final long mSampleChunkDurationUs;
     59     private final int mTrackCount;
     60     private final List<String> mIds;
     61     private final List<MediaFormat> mMediaFormats;
     62     private final @BufferReason int mBufferReason;
     63     private final BufferManager mBufferManager;
     64     private final SamplePool mSamplePool;
     65     private final IoCallback mIoCallback;
     66 
     67     private Handler mIoHandler;
     68     private final ConcurrentLinkedQueue<SampleHolder> mReadSampleBuffers[];
     69     private final ConcurrentLinkedQueue<SampleHolder> mHandlerReadSampleBuffers[];
     70     private final long[] mWriteIndexEndPositionUs;
     71     private final long[] mWriteChunkEndPositionUs;
     72     private final SampleChunk.IoState[] mReadIoStates;
     73     private final SampleChunk.IoState[] mWriteIoStates;
     74     private final Set<Integer> mSelectedTracks = new ArraySet<>();
     75     private long mBufferDurationUs = 0;
     76     private boolean mWriteEnded;
     77     private boolean mErrorNotified;
     78     private boolean mFinished;
     79 
     80     /**
     81      * A Callback for I/O events.
     82      */
     83     public static abstract class IoCallback {
     84 
     85         /**
     86          * Called when there is no sample to read.
     87          */
     88         public void onIoReachedEos() {
     89         }
     90 
     91         /**
     92          * Called when there is an irrecoverable error during I/O.
     93          */
     94         public void onIoError() {
     95         }
     96     }
     97 
     98     private class IoParams {
     99         private final int index;
    100         private final long positionUs;
    101         private final SampleHolder sample;
    102         private final ConditionVariable conditionVariable;
    103         private final ConcurrentLinkedQueue<SampleHolder> readSampleBuffer;
    104 
    105         private IoParams(int index, long positionUs, SampleHolder sample,
    106                 ConditionVariable conditionVariable,
    107                 ConcurrentLinkedQueue<SampleHolder> readSampleBuffer) {
    108             this.index = index;
    109             this.positionUs = positionUs;
    110             this.sample = sample;
    111             this.conditionVariable = conditionVariable;
    112             this.readSampleBuffer = readSampleBuffer;
    113         }
    114     }
    115 
    116     /**
    117      * Creates {@link SampleChunk} I/O handler.
    118      *
    119      * @param ids track names
    120      * @param mediaFormats {@link android.media.MediaFormat} for each track
    121      * @param bufferReason reason to be buffered
    122      * @param bufferManager manager of {@link SampleChunk} collections
    123      * @param samplePool allocator for a sample
    124      * @param ioCallback listeners for I/O events
    125      */
    126     public SampleChunkIoHelper(List<String> ids, List<MediaFormat> mediaFormats,
    127             @BufferReason int bufferReason, BufferManager bufferManager, SamplePool samplePool,
    128             IoCallback ioCallback) {
    129         mTrackCount = ids.size();
    130         mIds = ids;
    131         mMediaFormats = mediaFormats;
    132         mBufferReason = bufferReason;
    133         mBufferManager = bufferManager;
    134         mSamplePool = samplePool;
    135         mIoCallback = ioCallback;
    136 
    137         mReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount];
    138         mHandlerReadSampleBuffers = new ConcurrentLinkedQueue[mTrackCount];
    139         mWriteIndexEndPositionUs = new long[mTrackCount];
    140         mWriteChunkEndPositionUs = new long[mTrackCount];
    141         mReadIoStates = new SampleChunk.IoState[mTrackCount];
    142         mWriteIoStates = new SampleChunk.IoState[mTrackCount];
    143 
    144         // Small chunk duration for live playback will give more fine grained storage usage
    145         // and eviction handling for trickplay.
    146         mSampleChunkDurationUs =
    147                 bufferReason == RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK ?
    148                         RecordingSampleBuffer.MIN_SEEK_DURATION_US :
    149                         RecordingSampleBuffer.RECORDING_CHUNK_DURATION_US;
    150         for (int i = 0; i < mTrackCount; ++i) {
    151             mWriteIndexEndPositionUs[i] = RecordingSampleBuffer.MIN_SEEK_DURATION_US;
    152             mWriteChunkEndPositionUs[i] = mSampleChunkDurationUs;
    153             mReadIoStates[i] = new SampleChunk.IoState();
    154             mWriteIoStates[i] = new SampleChunk.IoState();
    155         }
    156     }
    157 
    158     /**
    159      * Prepares and initializes for I/O operations.
    160      *
    161      * @throws IOException
    162      */
    163     public void init() throws IOException {
    164         HandlerThread handlerThread = new HandlerThread(TAG);
    165         handlerThread.start();
    166         mIoHandler = new Handler(handlerThread.getLooper(), this);
    167         if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDED_PLAYBACK) {
    168             for (int i = 0; i < mTrackCount; ++i) {
    169                 mBufferManager.loadTrackFromStorage(mIds.get(i), mSamplePool);
    170             }
    171             mWriteEnded = true;
    172         } else {
    173             for (int i = 0; i < mTrackCount; ++i) {
    174                 mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_WRITE, i));
    175             }
    176         }
    177     }
    178 
    179     /**
    180      * Reads a sample if it is available.
    181      *
    182      * @param index track index
    183      * @return {@code null} if a sample is not available, otherwise returns a sample
    184      */
    185     public SampleHolder readSample(int index) {
    186         SampleHolder sample = mReadSampleBuffers[index].poll();
    187         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index));
    188         return sample;
    189     }
    190 
    191     /**
    192      * Writes a sample.
    193      *
    194      * @param index track index
    195      * @param sample to write
    196      * @param conditionVariable which will be wait until the write is finished
    197      * @throws IOException
    198      */
    199     public void writeSample(int index, SampleHolder sample,
    200             ConditionVariable conditionVariable) throws IOException {
    201         if (mErrorNotified) {
    202             throw new IOException("Storage I/O error happened");
    203         }
    204         conditionVariable.close();
    205         IoParams params = new IoParams(index, 0, sample, conditionVariable, null);
    206         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_WRITE, params));
    207     }
    208 
    209     /**
    210      * Starts read from the specified position.
    211      *
    212      * @param index track index
    213      * @param positionUs the specified position
    214      */
    215     public void openRead(int index, long positionUs) {
    216         // Old mReadSampleBuffers may have a pending read.
    217         mReadSampleBuffers[index] = new ConcurrentLinkedQueue<>();
    218         IoParams params = new IoParams(index, positionUs, null, null, mReadSampleBuffers[index]);
    219         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_OPEN_READ, params));
    220     }
    221 
    222     /**
    223      * Closes read from the specified track.
    224      *
    225      * @param index track index
    226      */
    227     public void closeRead(int index) {
    228         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_CLOSE_READ, index));
    229     }
    230 
    231     /**
    232      * Notifies writes are finished.
    233      */
    234     public void closeWrite() {
    235         mIoHandler.sendEmptyMessage(MSG_CLOSE_WRITE);
    236     }
    237 
    238     /**
    239      * Finishes I/O operations and releases all the resources.
    240      * @throws IOException
    241      */
    242     public void release() throws IOException {
    243         if (mIoHandler == null) {
    244             return;
    245         }
    246         // Finishes all I/O operations.
    247         ConditionVariable conditionVariable = new ConditionVariable();
    248         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_RELEASE, conditionVariable));
    249         conditionVariable.block();
    250 
    251         for (int i = 0; i < mTrackCount; ++i) {
    252             mBufferManager.unregisterChunkEvictedListener(mIds.get(i));
    253         }
    254         try {
    255             if (mBufferReason == RecordingSampleBuffer.BUFFER_REASON_RECORDING && mTrackCount > 0) {
    256                 // Saves meta information for recording.
    257                 List<BufferManager.TrackFormat> audios = new LinkedList<>();
    258                 List<BufferManager.TrackFormat> videos = new LinkedList<>();
    259                 for (int i = 0; i < mTrackCount; ++i) {
    260                     android.media.MediaFormat format =
    261                             mMediaFormats.get(i).getFrameworkMediaFormatV16();
    262                     format.setLong(android.media.MediaFormat.KEY_DURATION, mBufferDurationUs);
    263                     if (MimeTypes.isAudio(mMediaFormats.get(i).mimeType)) {
    264                         audios.add(new BufferManager.TrackFormat(mIds.get(i), format));
    265                     } else if (MimeTypes.isVideo(mMediaFormats.get(i).mimeType)) {
    266                         videos.add(new BufferManager.TrackFormat(mIds.get(i), format));
    267                     }
    268                 }
    269                 mBufferManager.writeMetaFiles(audios, videos);
    270             }
    271         } finally {
    272             mBufferManager.release();
    273             mIoHandler.getLooper().quitSafely();
    274         }
    275     }
    276 
    277     @Override
    278     public boolean handleMessage(Message message) {
    279         if (mFinished) {
    280             return true;
    281         }
    282         releaseEvictedChunks();
    283         try {
    284             switch (message.what) {
    285                 case MSG_OPEN_READ:
    286                     doOpenRead((IoParams) message.obj);
    287                     return true;
    288                 case MSG_OPEN_WRITE:
    289                     doOpenWrite((int) message.obj);
    290                     return true;
    291                 case MSG_CLOSE_READ:
    292                     doCloseRead((int) message.obj);
    293                     return true;
    294                 case MSG_CLOSE_WRITE:
    295                     doCloseWrite();
    296                     return true;
    297                 case MSG_READ:
    298                     doRead((int) message.obj);
    299                     return true;
    300                 case MSG_WRITE:
    301                     doWrite((IoParams) message.obj);
    302                     // Since only write will increase storage, eviction will be handled here.
    303                     return true;
    304                 case MSG_RELEASE:
    305                     doRelease((ConditionVariable) message.obj);
    306                     return true;
    307             }
    308         } catch (IOException e) {
    309             mIoCallback.onIoError();
    310             mErrorNotified = true;
    311             Log.e(TAG, "IoException happened", e);
    312             return true;
    313         }
    314         return false;
    315     }
    316 
    317     private void doOpenRead(IoParams params) throws IOException {
    318         int index = params.index;
    319         mIoHandler.removeMessages(MSG_READ, index);
    320         Pair<SampleChunk, Integer> readPosition =
    321                 mBufferManager.getReadFile(mIds.get(index), params.positionUs);
    322         if (readPosition == null) {
    323             String errorMessage = "Chunk ID:" + mIds.get(index) + " pos:" + params.positionUs
    324                     + "is not found";
    325             SoftPreconditions.checkNotNull(readPosition, TAG, errorMessage);
    326             throw new IOException(errorMessage);
    327         }
    328         mSelectedTracks.add(index);
    329         mReadIoStates[index].openRead(readPosition.first, (long) readPosition.second);
    330         if (mHandlerReadSampleBuffers[index] != null) {
    331             SampleHolder sample;
    332             while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) {
    333                 mSamplePool.releaseSample(sample);
    334             }
    335         }
    336         mHandlerReadSampleBuffers[index] = params.readSampleBuffer;
    337         mIoHandler.sendMessage(mIoHandler.obtainMessage(MSG_READ, index));
    338     }
    339 
    340     private void doOpenWrite(int index) throws IOException {
    341         SampleChunk chunk = mBufferManager.createNewWriteFileIfNeeded(mIds.get(index), 0,
    342                 mSamplePool, null, 0);
    343         mWriteIoStates[index].openWrite(chunk);
    344     }
    345 
    346     private void doCloseRead(int index) {
    347         mSelectedTracks.remove(index);
    348         if (mHandlerReadSampleBuffers[index] != null) {
    349             SampleHolder sample;
    350             while ((sample = mHandlerReadSampleBuffers[index].poll()) != null) {
    351                 mSamplePool.releaseSample(sample);
    352             }
    353         }
    354         mIoHandler.removeMessages(MSG_READ, index);
    355     }
    356 
    357     private void doRead(int index) throws IOException {
    358         mIoHandler.removeMessages(MSG_READ, index);
    359         if (mHandlerReadSampleBuffers[index].size() >= MAX_READ_BUFFER_SAMPLES) {
    360             // If enough samples are buffered, try again few moments later hoping that
    361             // buffered samples are consumed.
    362             mIoHandler.sendMessageDelayed(
    363                     mIoHandler.obtainMessage(MSG_READ, index), READ_RESCHEDULING_DELAY_MS);
    364         } else {
    365             if (mReadIoStates[index].isReadFinished()) {
    366                 for (int i = 0; i < mTrackCount; ++i) {
    367                     if (!mReadIoStates[i].isReadFinished()) {
    368                         return;
    369                     }
    370                 }
    371                 mIoCallback.onIoReachedEos();
    372                 return;
    373             }
    374             SampleHolder sample = mReadIoStates[index].read();
    375             if (sample != null) {
    376                 mHandlerReadSampleBuffers[index].offer(sample);
    377             } else {
    378                 // Read reached write but write is not finished yet --- wait a few moments to
    379                 // see if another sample is written.
    380                 mIoHandler.sendMessageDelayed(
    381                         mIoHandler.obtainMessage(MSG_READ, index),
    382                         READ_RESCHEDULING_DELAY_MS);
    383             }
    384         }
    385     }
    386 
    387     private void doWrite(IoParams params) throws IOException {
    388         try {
    389             if (mWriteEnded) {
    390                 SoftPreconditions.checkState(false);
    391                 return;
    392             }
    393             int index = params.index;
    394             SampleHolder sample = params.sample;
    395             SampleChunk nextChunk = null;
    396             if ((sample.flags & MediaCodec.BUFFER_FLAG_KEY_FRAME) != 0) {
    397                 if (sample.timeUs > mBufferDurationUs) {
    398                     mBufferDurationUs = sample.timeUs;
    399                 }
    400                 if (sample.timeUs >= mWriteIndexEndPositionUs[index]) {
    401                     SampleChunk currentChunk = sample.timeUs >= mWriteChunkEndPositionUs[index] ?
    402                             null : mWriteIoStates[params.index].getChunk();
    403                     int currentOffset = (int) mWriteIoStates[params.index].getOffset();
    404                     nextChunk = mBufferManager.createNewWriteFileIfNeeded(
    405                             mIds.get(index), mWriteIndexEndPositionUs[index], mSamplePool,
    406                             currentChunk, currentOffset);
    407                     mWriteIndexEndPositionUs[index] =
    408                             ((sample.timeUs / RecordingSampleBuffer.MIN_SEEK_DURATION_US) + 1) *
    409                                     RecordingSampleBuffer.MIN_SEEK_DURATION_US;
    410                     if (nextChunk != null) {
    411                         mWriteChunkEndPositionUs[index] =
    412                                 ((sample.timeUs / mSampleChunkDurationUs) + 1)
    413                                         * mSampleChunkDurationUs;
    414                     }
    415                 }
    416             }
    417             mWriteIoStates[params.index].write(params.sample, nextChunk);
    418         } finally {
    419             params.conditionVariable.open();
    420         }
    421     }
    422 
    423     private void doCloseWrite() throws IOException {
    424         if (mWriteEnded) {
    425             return;
    426         }
    427         mWriteEnded = true;
    428         boolean readFinished = true;
    429         for (int i = 0; i < mTrackCount; ++i) {
    430             readFinished = readFinished && mReadIoStates[i].isReadFinished();
    431             mWriteIoStates[i].closeWrite();
    432         }
    433         if (readFinished) {
    434             mIoCallback.onIoReachedEos();
    435         }
    436     }
    437 
    438     private void doRelease(ConditionVariable conditionVariable) {
    439         mIoHandler.removeCallbacksAndMessages(null);
    440         mFinished = true;
    441         conditionVariable.open();
    442         mSelectedTracks.clear();
    443     }
    444 
    445     private void releaseEvictedChunks() {
    446         if (mBufferReason != RecordingSampleBuffer.BUFFER_REASON_LIVE_PLAYBACK
    447                 || mSelectedTracks.isEmpty()) {
    448             return;
    449         }
    450         long currentStartPositionUs = Long.MAX_VALUE;
    451         for (int trackIndex : mSelectedTracks) {
    452             currentStartPositionUs = Math.min(currentStartPositionUs,
    453                     mReadIoStates[trackIndex].getStartPositionUs());
    454         }
    455         for (int i = 0; i < mTrackCount; ++i) {
    456             long evictEndPositionUs = Math.min(mBufferManager.getStartPositionUs(mIds.get(i)),
    457                     currentStartPositionUs);
    458             mBufferManager.evictChunks(mIds.get(i), evictEndPositionUs);
    459         }
    460     }
    461 }