Home | History | Annotate | Download | only in android
      1 /*
      2  * Copyright (C) 2010 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define USE_LOG SLAndroidLogLevel_Verbose
     18 
     19 #include "sles_allinclusive.h"
     20 #include "android_StreamPlayer.h"
     21 
     22 #include <media/IStreamSource.h>
     23 #include <media/IMediaPlayerService.h>
     24 #include <media/stagefright/foundation/ADebug.h>
     25 #include <binder/IPCThreadState.h>
     26 
     27 #include "mpeg2ts/ATSParser.h"
     28 
     29 //--------------------------------------------------------------------------------------------------
     30 namespace android {
     31 
     32 StreamSourceAppProxy::StreamSourceAppProxy(
     33         IAndroidBufferQueue *androidBufferQueue,
     34         const sp<CallbackProtector> &callbackProtector,
     35         // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own
     36         // construction.   If you pass in a sp<> to 'this' inside a constructor, then first the
     37         // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's
     38         // destructor to run from inside it's own constructor.
     39         StreamPlayer * /* const sp<StreamPlayer> & */ player) :
     40     mBuffersHasBeenSet(false),
     41     mAndroidBufferQueue(androidBufferQueue),
     42     mCallbackProtector(callbackProtector),
     43     mPlayer(player)
     44 {
     45     SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()");
     46 }
     47 
     48 StreamSourceAppProxy::~StreamSourceAppProxy() {
     49     SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()");
     50     disconnect();
     51 }
     52 
     53 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = {
     54         SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key
     55         sizeof(SLuint32),                    // item size
     56         SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data
     57 };
     58 
     59 //--------------------------------------------------
     60 // IStreamSource implementation
     61 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) {
     62     assert(listener != NULL);
     63     Mutex::Autolock _l(mLock);
     64     assert(mListener == NULL);
     65     mListener = listener;
     66 }
     67 
     68 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) {
     69     Mutex::Autolock _l(mLock);
     70     assert(!mBuffersHasBeenSet);
     71     mBuffers = buffers;
     72     mBuffersHasBeenSet = true;
     73 }
     74 
     75 void StreamSourceAppProxy::onBufferAvailable(size_t index) {
     76     //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index);
     77 
     78     {
     79         Mutex::Autolock _l(mLock);
     80         // assert not needed because if not set, size() will be zero and the CHECK_LT will also fail
     81         // assert(mBuffersHasBeenSet);
     82         CHECK_LT(index, mBuffers.size());
     83 #if 0   // enable if needed for debugging
     84         sp<IMemory> mem = mBuffers.itemAt(index);
     85         SLAint64 length = (SLAint64) mem->size();
     86 #endif
     87         mAvailableBuffers.push_back(index);
     88         //SL_LOGD("onBufferAvailable() now %d buffers available in queue", mAvailableBuffers.size());
     89     }
     90 
     91     // a new shared mem buffer is available: let's try to fill immediately
     92     pullFromBuffQueue();
     93 }
     94 
     95 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) {
     96     if (mListener != 0) {
     97         mListener->issueCommand(cmd, false /* synchronous */, msg);
     98     }
     99 }
    100 
    101 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) {
    102     if (mListener != 0) {
    103         mListener->queueBuffer(buffIndex, buffLength);
    104     }
    105 }
    106 
    107 void StreamSourceAppProxy::disconnect() {
    108     Mutex::Autolock _l(mLock);
    109     mListener.clear();
    110     // Force binder to push the decremented reference count for sp<IStreamListener>.
    111     // mediaserver and client both have sp<> to the other. When you decrement an sp<>
    112     // reference count, binder doesn't push that to the other process immediately.
    113     IPCThreadState::self()->flushCommands();
    114     mBuffers.clear();
    115     mBuffersHasBeenSet = false;
    116     mAvailableBuffers.clear();
    117 }
    118 
    119 //--------------------------------------------------
    120 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server)
    121 void StreamSourceAppProxy::pullFromBuffQueue() {
    122 
    123   if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) {
    124 
    125     size_t bufferId;
    126     void* bufferLoc;
    127     size_t buffSize;
    128 
    129     slAndroidBufferQueueCallback callback = NULL;
    130     void* pBufferContext, *pBufferData, *callbackPContext = NULL;
    131     AdvancedBufferHeader *oldFront = NULL;
    132     uint32_t dataSize /* , dataUsed */;
    133 
    134     // retrieve data from the buffer queue
    135     interface_lock_exclusive(mAndroidBufferQueue);
    136 
    137     // can this read operation cause us to call the buffer queue callback
    138     // (either because there was a command with no data, or all the data has been consumed)
    139     bool queueCallbackCandidate = false;
    140 
    141     if (mAndroidBufferQueue->mState.count != 0) {
    142         // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize);
    143         assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear);
    144 
    145         oldFront = mAndroidBufferQueue->mFront;
    146         AdvancedBufferHeader *newFront = &oldFront[1];
    147 
    148         // consume events when starting to read data from a buffer for the first time
    149         if (oldFront->mDataSizeConsumed == 0) {
    150             // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue
    151             if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) {
    152                 receivedCmd_l(IStreamListener::EOS);
    153                 // EOS has no associated data
    154                 queueCallbackCandidate = true;
    155             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) {
    156                 receivedCmd_l(IStreamListener::DISCONTINUITY);
    157             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) {
    158                 sp<AMessage> msg = new AMessage();
    159                 msg->setInt64(IStreamListener::kKeyResumeAtPTS,
    160                         (int64_t)oldFront->mItems.mTsCmdData.mPts);
    161                 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
    162             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
    163                     & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) {
    164                 sp<AMessage> msg = new AMessage();
    165                 msg->setInt32(
    166                         IStreamListener::kKeyDiscontinuityMask,
    167                         ATSParser::DISCONTINUITY_FORMATCHANGE);
    168                 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
    169             } else if (oldFront->mItems.mTsCmdData.mTsCmdCode
    170                     & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) {
    171                 sp<AMessage> msg = new AMessage();
    172                 msg->setInt32(
    173                         IStreamListener::kKeyDiscontinuityMask,
    174                         ATSParser::DISCONTINUITY_VIDEO_FORMAT);
    175                 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/);
    176             }
    177             // note that here we are intentionally only supporting
    178             //   ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c
    179 
    180             // some commands may introduce a time discontinuity, reevaluate position if needed
    181             if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY |
    182                     ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) {
    183                 const sp<StreamPlayer> player(mPlayer.promote());
    184                 if (player != NULL) {
    185                     // FIXME see note at onSeek
    186                     player->seek(ANDROID_UNKNOWN_TIME);
    187                 }
    188             }
    189             oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE;
    190         }
    191 
    192         {
    193             // we're going to change the shared mem buffer queue, so lock it
    194             Mutex::Autolock _l(mLock);
    195             if (!mAvailableBuffers.empty()) {
    196                 bufferId = *mAvailableBuffers.begin();
    197                 CHECK_LT(bufferId, mBuffers.size());
    198                 sp<IMemory> mem = mBuffers.itemAt(bufferId);
    199                 bufferLoc = mem->pointer();
    200                 buffSize = mem->size();
    201 
    202                 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed;
    203                 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) {
    204                     // more available than requested, copy as much as requested
    205                     // consume data: 1/ copy to given destination
    206                     memcpy(bufferLoc, pSrc, buffSize);
    207                     //               2/ keep track of how much has been consumed
    208                     oldFront->mDataSizeConsumed += buffSize;
    209                     //               3/ notify shared mem listener that new data is available
    210                     receivedBuffer_l(bufferId, buffSize);
    211                     mAvailableBuffers.erase(mAvailableBuffers.begin());
    212                 } else {
    213                     // requested as much available or more: consume the whole of the current
    214                     //   buffer and move to the next
    215                     size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed;
    216                     //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed);
    217                     oldFront->mDataSizeConsumed = oldFront->mDataSize;
    218 
    219                     // move queue to next
    220                     if (newFront == &mAndroidBufferQueue->
    221                             mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) {
    222                         // reached the end, circle back
    223                         newFront = mAndroidBufferQueue->mBufferArray;
    224                     }
    225                     mAndroidBufferQueue->mFront = newFront;
    226                     mAndroidBufferQueue->mState.count--;
    227                     mAndroidBufferQueue->mState.index++;
    228 
    229                     if (consumed > 0) {
    230                         // consume data: 1/ copy to given destination
    231                         memcpy(bufferLoc, pSrc, consumed);
    232                         //               2/ keep track of how much has been consumed
    233                         // here nothing to do because we are done with this buffer
    234                         //               3/ notify StreamPlayer that new data is available
    235                         receivedBuffer_l(bufferId, consumed);
    236                         mAvailableBuffers.erase(mAvailableBuffers.begin());
    237                     }
    238 
    239                     // data has been consumed, and the buffer queue state has been updated
    240                     // we will notify the client if applicable
    241                     queueCallbackCandidate = true;
    242                 }
    243             }
    244 
    245             if (queueCallbackCandidate) {
    246                 if (mAndroidBufferQueue->mCallbackEventsMask &
    247                         SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) {
    248                     callback = mAndroidBufferQueue->mCallback;
    249                     // save callback data while under lock
    250                     callbackPContext = mAndroidBufferQueue->mContext;
    251                     pBufferContext = (void *)oldFront->mBufferContext;
    252                     pBufferData    = (void *)oldFront->mDataBuffer;
    253                     dataSize       = oldFront->mDataSize;
    254                     // here a buffer is only dequeued when fully consumed
    255                     //dataUsed     = oldFront->mDataSizeConsumed;
    256                 }
    257             }
    258             //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size());
    259             if (!mAvailableBuffers.empty()) {
    260                 // there is still room in the shared memory, recheck later if we can pull
    261                 // data from the buffer queue and write it to shared memory
    262                 const sp<StreamPlayer> player(mPlayer.promote());
    263                 if (player != NULL) {
    264                     player->queueRefilled();
    265                 }
    266             }
    267         }
    268 
    269     } else { // empty queue
    270         SL_LOGD("ABQ empty, starving!");
    271     }
    272 
    273     interface_unlock_exclusive(mAndroidBufferQueue);
    274 
    275     // notify client of buffer processed
    276     if (NULL != callback) {
    277         SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext,
    278                 pBufferContext, pBufferData, dataSize,
    279                 dataSize, /* dataUsed  */
    280                 // no messages during playback other than marking the buffer as processed
    281                 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */,
    282                 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ );
    283         if (SL_RESULT_SUCCESS != result) {
    284             // Reserved for future use
    285             SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result);
    286         }
    287     }
    288 
    289     mCallbackProtector->exitCb();
    290   } // enterCbIfOk
    291 }
    292 
    293 
    294 //--------------------------------------------------------------------------------------------------
    295 StreamPlayer::StreamPlayer(AudioPlayback_Parameters* params, bool hasVideo,
    296         IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) :
    297         GenericMediaPlayer(params, hasVideo),
    298         mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)),
    299         mStopForDestroyCompleted(false)
    300 {
    301     SL_LOGD("StreamPlayer::StreamPlayer()");
    302 
    303     mPlaybackParams = *params;
    304 
    305 }
    306 
    307 StreamPlayer::~StreamPlayer() {
    308     SL_LOGD("StreamPlayer::~StreamPlayer()");
    309     mAppProxy->disconnect();
    310 }
    311 
    312 
    313 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) {
    314     switch (msg->what()) {
    315         case kWhatPullFromAbq:
    316             onPullFromAndroidBufferQueue();
    317             break;
    318 
    319         case kWhatStopForDestroy:
    320             onStopForDestroy();
    321             break;
    322 
    323         default:
    324             GenericMediaPlayer::onMessageReceived(msg);
    325             break;
    326     }
    327 }
    328 
    329 
    330 void StreamPlayer::preDestroy() {
    331     // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper
    332     (new AMessage(kWhatStopForDestroy, id()))->post();
    333     {
    334         Mutex::Autolock _l(mStopForDestroyLock);
    335         while (!mStopForDestroyCompleted) {
    336             mStopForDestroyCondition.wait(mStopForDestroyLock);
    337         }
    338     }
    339     // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign
    340     GenericMediaPlayer::preDestroy();
    341 }
    342 
    343 
    344 void StreamPlayer::onStopForDestroy() {
    345     if (mPlayer != 0) {
    346         mPlayer->stop();
    347         // causes CHECK failure in Nuplayer
    348         //mPlayer->setDataSource(NULL);
    349         mPlayer->setVideoSurfaceTexture(NULL);
    350         mPlayer->disconnect();
    351         mPlayer.clear();
    352         {
    353             // FIXME ugh make this a method
    354             Mutex::Autolock _l(mPreparedPlayerLock);
    355             mPreparedPlayer.clear();
    356         }
    357     }
    358     {
    359         Mutex::Autolock _l(mStopForDestroyLock);
    360         mStopForDestroyCompleted = true;
    361     }
    362     mStopForDestroyCondition.signal();
    363 }
    364 
    365 
    366 /**
    367  * Asynchronously notify the player that the queue is ready to be pulled from.
    368  */
    369 void StreamPlayer::queueRefilled() {
    370     // async notification that the ABQ was refilled: the player should pull from the ABQ, and
    371     //    and push to shared memory (to the media server)
    372     (new AMessage(kWhatPullFromAbq, id()))->post();
    373 }
    374 
    375 
    376 void StreamPlayer::appClear_l() {
    377     // the user of StreamPlayer has cleared its AndroidBufferQueue:
    378     // there's no clear() for the shared memory queue, so this is a no-op
    379 }
    380 
    381 
    382 //--------------------------------------------------
    383 // Event handlers
    384 void StreamPlayer::onPrepare() {
    385     SL_LOGD("StreamPlayer::onPrepare()");
    386         sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService());
    387         if (mediaPlayerService != NULL) {
    388             mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/,
    389                     mPlaybackParams.sessionId);
    390             if (mPlayer == NULL) {
    391                 SL_LOGE("media player service failed to create player by app proxy");
    392             } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) {
    393                 SL_LOGE("setDataSource failed");
    394                 mPlayer.clear();
    395             }
    396         }
    397     if (mPlayer == NULL) {
    398         mStateFlags |= kFlagPreparedUnsuccessfully;
    399     }
    400     GenericMediaPlayer::onPrepare();
    401     SL_LOGD("StreamPlayer::onPrepare() done");
    402 }
    403 
    404 
    405 void StreamPlayer::onPlay() {
    406     SL_LOGD("StreamPlayer::onPlay()");
    407     // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the
    408     // player had starved the shared memory)
    409     queueRefilled();
    410 
    411     GenericMediaPlayer::onPlay();
    412 }
    413 
    414 
    415 void StreamPlayer::onPullFromAndroidBufferQueue() {
    416     SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()");
    417     mAppProxy->pullFromBuffQueue();
    418 }
    419 
    420 } // namespace android
    421