1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define USE_LOG SLAndroidLogLevel_Verbose 18 19 #include "sles_allinclusive.h" 20 #include "android_StreamPlayer.h" 21 22 #include <media/IStreamSource.h> 23 #include <media/IMediaPlayerService.h> 24 #include <media/stagefright/foundation/ADebug.h> 25 #include <binder/IPCThreadState.h> 26 27 #include "mpeg2ts/ATSParser.h" 28 29 //-------------------------------------------------------------------------------------------------- 30 namespace android { 31 32 StreamSourceAppProxy::StreamSourceAppProxy( 33 IAndroidBufferQueue *androidBufferQueue, 34 const sp<CallbackProtector> &callbackProtector, 35 // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own 36 // construction. If you pass in a sp<> to 'this' inside a constructor, then first the 37 // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's 38 // destructor to run from inside it's own constructor. 39 StreamPlayer * /* const sp<StreamPlayer> & */ player) : 40 mBuffersHasBeenSet(false), 41 mAndroidBufferQueue(androidBufferQueue), 42 mCallbackProtector(callbackProtector), 43 mPlayer(player) 44 { 45 SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()"); 46 } 47 48 StreamSourceAppProxy::~StreamSourceAppProxy() { 49 SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()"); 50 disconnect(); 51 } 52 53 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = { 54 SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key 55 sizeof(SLuint32), // item size 56 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data 57 }; 58 59 //-------------------------------------------------- 60 // IStreamSource implementation 61 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) { 62 assert(listener != NULL); 63 Mutex::Autolock _l(mLock); 64 assert(mListener == NULL); 65 mListener = listener; 66 } 67 68 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) { 69 Mutex::Autolock _l(mLock); 70 assert(!mBuffersHasBeenSet); 71 mBuffers = buffers; 72 mBuffersHasBeenSet = true; 73 } 74 75 void StreamSourceAppProxy::onBufferAvailable(size_t index) { 76 //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index); 77 78 { 79 Mutex::Autolock _l(mLock); 80 // assert not needed because if not set, size() will be zero and the CHECK_LT will also fail 81 // assert(mBuffersHasBeenSet); 82 CHECK_LT(index, mBuffers.size()); 83 #if 0 // enable if needed for debugging 84 sp<IMemory> mem = mBuffers.itemAt(index); 85 SLAint64 length = (SLAint64) mem->size(); 86 #endif 87 mAvailableBuffers.push_back(index); 88 //SL_LOGD("onBufferAvailable() now %d buffers available in queue", 89 // mAvailableBuffers.size()); 90 } 91 92 // a new shared mem buffer is available: let's try to fill immediately 93 pullFromBuffQueue(); 94 } 95 96 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) { 97 if (mListener != 0) { 98 mListener->issueCommand(cmd, false /* synchronous */, msg); 99 } 100 } 101 102 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) { 103 if (mListener != 0) { 104 mListener->queueBuffer(buffIndex, buffLength); 105 } 106 } 107 108 void StreamSourceAppProxy::disconnect() { 109 Mutex::Autolock _l(mLock); 110 mListener.clear(); 111 // Force binder to push the decremented reference count for sp<IStreamListener>. 112 // mediaserver and client both have sp<> to the other. When you decrement an sp<> 113 // reference count, binder doesn't push that to the other process immediately. 114 IPCThreadState::self()->flushCommands(); 115 mBuffers.clear(); 116 mBuffersHasBeenSet = false; 117 mAvailableBuffers.clear(); 118 } 119 120 //-------------------------------------------------- 121 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server) 122 void StreamSourceAppProxy::pullFromBuffQueue() { 123 124 if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) { 125 126 size_t bufferId; 127 void* bufferLoc; 128 size_t buffSize; 129 130 slAndroidBufferQueueCallback callback = NULL; 131 void* pBufferContext, *pBufferData, *callbackPContext = NULL; 132 AdvancedBufferHeader *oldFront = NULL; 133 uint32_t dataSize /* , dataUsed */; 134 135 // retrieve data from the buffer queue 136 interface_lock_exclusive(mAndroidBufferQueue); 137 138 // can this read operation cause us to call the buffer queue callback 139 // (either because there was a command with no data, or all the data has been consumed) 140 bool queueCallbackCandidate = false; 141 142 if (mAndroidBufferQueue->mState.count != 0) { 143 // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize); 144 assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear); 145 146 oldFront = mAndroidBufferQueue->mFront; 147 AdvancedBufferHeader *newFront = &oldFront[1]; 148 149 // consume events when starting to read data from a buffer for the first time 150 if (oldFront->mDataSizeConsumed == 0) { 151 // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue 152 if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) { 153 receivedCmd_l(IStreamListener::EOS); 154 // EOS has no associated data 155 queueCallbackCandidate = true; 156 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) { 157 receivedCmd_l(IStreamListener::DISCONTINUITY); 158 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) { 159 sp<AMessage> msg = new AMessage(); 160 msg->setInt64(IStreamListener::kKeyResumeAtPTS, 161 (int64_t)oldFront->mItems.mTsCmdData.mPts); 162 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 163 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode 164 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) { 165 sp<AMessage> msg = new AMessage(); 166 msg->setInt32( 167 IStreamListener::kKeyDiscontinuityMask, 168 ATSParser::DISCONTINUITY_FORMATCHANGE); 169 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 170 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode 171 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) { 172 sp<AMessage> msg = new AMessage(); 173 msg->setInt32( 174 IStreamListener::kKeyDiscontinuityMask, 175 ATSParser::DISCONTINUITY_VIDEO_FORMAT); 176 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 177 } 178 // note that here we are intentionally only supporting 179 // ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c 180 181 // some commands may introduce a time discontinuity, reevaluate position if needed 182 if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY | 183 ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) { 184 const sp<StreamPlayer> player(mPlayer.promote()); 185 if (player != NULL) { 186 // FIXME see note at onSeek 187 player->seek(ANDROID_UNKNOWN_TIME); 188 } 189 } 190 oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE; 191 } 192 193 { 194 // we're going to change the shared mem buffer queue, so lock it 195 Mutex::Autolock _l(mLock); 196 if (!mAvailableBuffers.empty()) { 197 bufferId = *mAvailableBuffers.begin(); 198 CHECK_LT(bufferId, mBuffers.size()); 199 sp<IMemory> mem = mBuffers.itemAt(bufferId); 200 bufferLoc = mem->pointer(); 201 buffSize = mem->size(); 202 203 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed; 204 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) { 205 // more available than requested, copy as much as requested 206 // consume data: 1/ copy to given destination 207 memcpy(bufferLoc, pSrc, buffSize); 208 // 2/ keep track of how much has been consumed 209 oldFront->mDataSizeConsumed += buffSize; 210 // 3/ notify shared mem listener that new data is available 211 receivedBuffer_l(bufferId, buffSize); 212 mAvailableBuffers.erase(mAvailableBuffers.begin()); 213 } else { 214 // requested as much available or more: consume the whole of the current 215 // buffer and move to the next 216 size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed; 217 //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed); 218 oldFront->mDataSizeConsumed = oldFront->mDataSize; 219 220 // move queue to next 221 if (newFront == &mAndroidBufferQueue-> 222 mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) { 223 // reached the end, circle back 224 newFront = mAndroidBufferQueue->mBufferArray; 225 } 226 mAndroidBufferQueue->mFront = newFront; 227 mAndroidBufferQueue->mState.count--; 228 mAndroidBufferQueue->mState.index++; 229 230 if (consumed > 0) { 231 // consume data: 1/ copy to given destination 232 memcpy(bufferLoc, pSrc, consumed); 233 // 2/ keep track of how much has been consumed 234 // here nothing to do because we are done with this buffer 235 // 3/ notify StreamPlayer that new data is available 236 receivedBuffer_l(bufferId, consumed); 237 mAvailableBuffers.erase(mAvailableBuffers.begin()); 238 } 239 240 // data has been consumed, and the buffer queue state has been updated 241 // we will notify the client if applicable 242 queueCallbackCandidate = true; 243 } 244 } 245 246 if (queueCallbackCandidate) { 247 if (mAndroidBufferQueue->mCallbackEventsMask & 248 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) { 249 callback = mAndroidBufferQueue->mCallback; 250 // save callback data while under lock 251 callbackPContext = mAndroidBufferQueue->mContext; 252 pBufferContext = (void *)oldFront->mBufferContext; 253 pBufferData = (void *)oldFront->mDataBuffer; 254 dataSize = oldFront->mDataSize; 255 // here a buffer is only dequeued when fully consumed 256 //dataUsed = oldFront->mDataSizeConsumed; 257 } 258 } 259 //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size()); 260 if (!mAvailableBuffers.empty()) { 261 // there is still room in the shared memory, recheck later if we can pull 262 // data from the buffer queue and write it to shared memory 263 const sp<StreamPlayer> player(mPlayer.promote()); 264 if (player != NULL) { 265 player->queueRefilled(); 266 } 267 } 268 } 269 270 } else { // empty queue 271 SL_LOGD("ABQ empty, starving!"); 272 } 273 274 interface_unlock_exclusive(mAndroidBufferQueue); 275 276 // notify client of buffer processed 277 if (NULL != callback) { 278 SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext, 279 pBufferContext, pBufferData, dataSize, 280 dataSize, /* dataUsed */ 281 // no messages during playback other than marking the buffer as processed 282 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */, 283 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ ); 284 if (SL_RESULT_SUCCESS != result) { 285 // Reserved for future use 286 SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result); 287 } 288 } 289 290 mCallbackProtector->exitCb(); 291 } // enterCbIfOk 292 } 293 294 295 //-------------------------------------------------------------------------------------------------- 296 StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo, 297 IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) : 298 GenericMediaPlayer(params, hasVideo), 299 mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)), 300 mStopForDestroyCompleted(false) 301 { 302 SL_LOGD("StreamPlayer::StreamPlayer()"); 303 } 304 305 StreamPlayer::~StreamPlayer() { 306 SL_LOGD("StreamPlayer::~StreamPlayer()"); 307 mAppProxy->disconnect(); 308 } 309 310 311 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) { 312 switch (msg->what()) { 313 case kWhatPullFromAbq: 314 onPullFromAndroidBufferQueue(); 315 break; 316 317 case kWhatStopForDestroy: 318 onStopForDestroy(); 319 break; 320 321 default: 322 GenericMediaPlayer::onMessageReceived(msg); 323 break; 324 } 325 } 326 327 328 void StreamPlayer::preDestroy() { 329 // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper 330 (new AMessage(kWhatStopForDestroy, id()))->post(); 331 { 332 Mutex::Autolock _l(mStopForDestroyLock); 333 while (!mStopForDestroyCompleted) { 334 mStopForDestroyCondition.wait(mStopForDestroyLock); 335 } 336 } 337 // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign 338 GenericMediaPlayer::preDestroy(); 339 } 340 341 342 void StreamPlayer::onStopForDestroy() { 343 if (mPlayer != 0) { 344 mPlayer->stop(); 345 // causes CHECK failure in Nuplayer 346 //mPlayer->setDataSource(NULL); 347 mPlayer->setVideoSurfaceTexture(NULL); 348 mPlayer->disconnect(); 349 mPlayer.clear(); 350 { 351 // FIXME ugh make this a method 352 Mutex::Autolock _l(mPreparedPlayerLock); 353 mPreparedPlayer.clear(); 354 } 355 } 356 { 357 Mutex::Autolock _l(mStopForDestroyLock); 358 mStopForDestroyCompleted = true; 359 } 360 mStopForDestroyCondition.signal(); 361 } 362 363 364 /** 365 * Asynchronously notify the player that the queue is ready to be pulled from. 366 */ 367 void StreamPlayer::queueRefilled() { 368 // async notification that the ABQ was refilled: the player should pull from the ABQ, and 369 // and push to shared memory (to the media server) 370 (new AMessage(kWhatPullFromAbq, id()))->post(); 371 } 372 373 374 void StreamPlayer::appClear_l() { 375 // the user of StreamPlayer has cleared its AndroidBufferQueue: 376 // there's no clear() for the shared memory queue, so this is a no-op 377 } 378 379 380 //-------------------------------------------------- 381 // Event handlers 382 void StreamPlayer::onPrepare() { 383 SL_LOGD("StreamPlayer::onPrepare()"); 384 sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService()); 385 if (mediaPlayerService != NULL) { 386 mPlayer = mediaPlayerService->create(getpid(), mPlayerClient /*IMediaPlayerClient*/, 387 mPlaybackParams.sessionId); 388 if (mPlayer == NULL) { 389 SL_LOGE("media player service failed to create player by app proxy"); 390 } else if (mPlayer->setDataSource(mAppProxy /*IStreamSource*/) != NO_ERROR) { 391 SL_LOGE("setDataSource failed"); 392 mPlayer.clear(); 393 } 394 } 395 if (mPlayer == NULL) { 396 mStateFlags |= kFlagPreparedUnsuccessfully; 397 } 398 GenericMediaPlayer::onPrepare(); 399 SL_LOGD("StreamPlayer::onPrepare() done"); 400 } 401 402 403 void StreamPlayer::onPlay() { 404 SL_LOGD("StreamPlayer::onPlay()"); 405 // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the 406 // player had starved the shared memory) 407 queueRefilled(); 408 409 GenericMediaPlayer::onPlay(); 410 } 411 412 413 void StreamPlayer::onPullFromAndroidBufferQueue() { 414 SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()"); 415 mAppProxy->pullFromBuffQueue(); 416 } 417 418 } // namespace android 419