1 /* 2 * Copyright (C) 2010 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define USE_LOG SLAndroidLogLevel_Verbose 18 19 #include "sles_allinclusive.h" 20 #include "android_StreamPlayer.h" 21 22 #include <media/IStreamSource.h> 23 #include <media/IMediaPlayerService.h> 24 #include <media/stagefright/foundation/ADebug.h> 25 #include <media/stagefright/foundation/MediaKeys.h> 26 #include <binder/IPCThreadState.h> 27 28 #include <ATSParser.h> 29 30 //-------------------------------------------------------------------------------------------------- 31 namespace android { 32 33 StreamSourceAppProxy::StreamSourceAppProxy( 34 IAndroidBufferQueue *androidBufferQueue, 35 const sp<CallbackProtector> &callbackProtector, 36 // sp<StreamPlayer> would cause StreamPlayer's destructor to run during it's own 37 // construction. If you pass in a sp<> to 'this' inside a constructor, then first the 38 // refcount is increased from 0 to 1, then decreased from 1 to 0, which causes the object's 39 // destructor to run from inside it's own constructor. 40 StreamPlayer * /* const sp<StreamPlayer> & */ player) : 41 mBuffersHasBeenSet(false), 42 mAndroidBufferQueue(androidBufferQueue), 43 mCallbackProtector(callbackProtector), 44 mPlayer(player) 45 { 46 SL_LOGV("StreamSourceAppProxy::StreamSourceAppProxy()"); 47 } 48 49 StreamSourceAppProxy::~StreamSourceAppProxy() { 50 SL_LOGV("StreamSourceAppProxy::~StreamSourceAppProxy()"); 51 disconnect(); 52 } 53 54 const SLuint32 StreamSourceAppProxy::kItemProcessed[NB_BUFFEREVENT_ITEM_FIELDS] = { 55 SL_ANDROID_ITEMKEY_BUFFERQUEUEEVENT, // item key 56 sizeof(SLuint32), // item size 57 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED // item data 58 }; 59 60 //-------------------------------------------------- 61 // IStreamSource implementation 62 void StreamSourceAppProxy::setListener(const sp<IStreamListener> &listener) { 63 assert(listener != NULL); 64 Mutex::Autolock _l(mLock); 65 assert(mListener == NULL); 66 mListener = listener; 67 } 68 69 void StreamSourceAppProxy::setBuffers(const Vector<sp<IMemory> > &buffers) { 70 Mutex::Autolock _l(mLock); 71 assert(!mBuffersHasBeenSet); 72 mBuffers = buffers; 73 mBuffersHasBeenSet = true; 74 } 75 76 void StreamSourceAppProxy::onBufferAvailable(size_t index) { 77 //SL_LOGD("StreamSourceAppProxy::onBufferAvailable(%d)", index); 78 79 { 80 Mutex::Autolock _l(mLock); 81 if (!mBuffersHasBeenSet) { 82 // no buffers available to push data to from the buffer queue, bail 83 return; 84 } 85 CHECK_LT(index, mBuffers.size()); 86 #if 0 // enable if needed for debugging 87 sp<IMemory> mem = mBuffers.itemAt(index); 88 SLAint64 length = (SLAint64) mem->size(); 89 #endif 90 mAvailableBuffers.push_back(index); 91 //SL_LOGD("onBufferAvailable() now %d buffers available in queue", 92 // mAvailableBuffers.size()); 93 } 94 95 // a new shared mem buffer is available: let's try to fill immediately 96 pullFromBuffQueue(); 97 } 98 99 void StreamSourceAppProxy::receivedCmd_l(IStreamListener::Command cmd, const sp<AMessage> &msg) { 100 if (mListener != 0) { 101 mListener->issueCommand(cmd, false /* synchronous */, msg); 102 } 103 } 104 105 void StreamSourceAppProxy::receivedBuffer_l(size_t buffIndex, size_t buffLength) { 106 if (mListener != 0) { 107 mListener->queueBuffer(buffIndex, buffLength); 108 } 109 } 110 111 void StreamSourceAppProxy::disconnect() { 112 Mutex::Autolock _l(mLock); 113 mListener.clear(); 114 // Force binder to push the decremented reference count for sp<IStreamListener>. 115 // mediaserver and client both have sp<> to the other. When you decrement an sp<> 116 // reference count, binder doesn't push that to the other process immediately. 117 IPCThreadState::self()->flushCommands(); 118 mBuffers.clear(); 119 mBuffersHasBeenSet = false; 120 mAvailableBuffers.clear(); 121 } 122 123 //-------------------------------------------------- 124 // consumption from ABQ: pull from the ABQ, and push to shared memory (media server) 125 void StreamSourceAppProxy::pullFromBuffQueue() { 126 127 if (android::CallbackProtector::enterCbIfOk(mCallbackProtector)) { 128 129 size_t bufferId; 130 void* bufferLoc; 131 size_t buffSize; 132 133 slAndroidBufferQueueCallback callback = NULL; 134 void* pBufferContext, *pBufferData, *callbackPContext = NULL; 135 AdvancedBufferHeader *oldFront = NULL; 136 uint32_t dataSize /* , dataUsed */; 137 138 // retrieve data from the buffer queue 139 interface_lock_exclusive(mAndroidBufferQueue); 140 141 // can this read operation cause us to call the buffer queue callback 142 // (either because there was a command with no data, or all the data has been consumed) 143 bool queueCallbackCandidate = false; 144 145 if (mAndroidBufferQueue->mState.count != 0) { 146 // SL_LOGD("nbBuffers in ABQ = %u, buffSize=%u",abq->mState.count, buffSize); 147 assert(mAndroidBufferQueue->mFront != mAndroidBufferQueue->mRear); 148 149 oldFront = mAndroidBufferQueue->mFront; 150 AdvancedBufferHeader *newFront = &oldFront[1]; 151 152 // consume events when starting to read data from a buffer for the first time 153 if (oldFront->mDataSizeConsumed == 0) { 154 // note this code assumes at most one event per buffer; see IAndroidBufferQueue_Enqueue 155 if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_EOS) { 156 receivedCmd_l(IStreamListener::EOS); 157 // EOS has no associated data 158 queueCallbackCandidate = true; 159 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCONTINUITY) { 160 receivedCmd_l(IStreamListener::DISCONTINUITY); 161 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode & ANDROID_MP2TSEVENT_DISCON_NEWPTS) { 162 sp<AMessage> msg = new AMessage(); 163 msg->setInt64(kATSParserKeyResumeAtPTS, 164 (int64_t)oldFront->mItems.mTsCmdData.mPts); 165 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 166 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode 167 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL) { 168 sp<AMessage> msg = new AMessage(); 169 msg->setInt32( 170 kIStreamListenerKeyDiscontinuityMask, 171 ATSParser::DISCONTINUITY_FORMATCHANGE); 172 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 173 } else if (oldFront->mItems.mTsCmdData.mTsCmdCode 174 & ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO) { 175 sp<AMessage> msg = new AMessage(); 176 msg->setInt32( 177 kIStreamListenerKeyDiscontinuityMask, 178 ATSParser::DISCONTINUITY_VIDEO_FORMAT); 179 receivedCmd_l(IStreamListener::DISCONTINUITY, msg /*msg*/); 180 } 181 // note that here we are intentionally only supporting 182 // ANDROID_MP2TSEVENT_FORMAT_CHANGE_VIDEO, see IAndroidBufferQueue.c 183 184 // some commands may introduce a time discontinuity, reevaluate position if needed 185 if (oldFront->mItems.mTsCmdData.mTsCmdCode & (ANDROID_MP2TSEVENT_DISCONTINUITY | 186 ANDROID_MP2TSEVENT_DISCON_NEWPTS | ANDROID_MP2TSEVENT_FORMAT_CHANGE_FULL)) { 187 const sp<StreamPlayer> player(mPlayer.promote()); 188 if (player != NULL) { 189 // FIXME see note at onSeek 190 player->seek(ANDROID_UNKNOWN_TIME); 191 } 192 } 193 oldFront->mItems.mTsCmdData.mTsCmdCode = ANDROID_MP2TSEVENT_NONE; 194 } 195 196 { 197 // we're going to change the shared mem buffer queue, so lock it 198 Mutex::Autolock _l(mLock); 199 if (!mAvailableBuffers.empty()) { 200 bufferId = *mAvailableBuffers.begin(); 201 CHECK_LT(bufferId, mBuffers.size()); 202 sp<IMemory> mem = mBuffers.itemAt(bufferId); 203 bufferLoc = mem->pointer(); 204 buffSize = mem->size(); 205 206 char *pSrc = ((char*)oldFront->mDataBuffer) + oldFront->mDataSizeConsumed; 207 if (oldFront->mDataSizeConsumed + buffSize < oldFront->mDataSize) { 208 // more available than requested, copy as much as requested 209 // consume data: 1/ copy to given destination 210 memcpy(bufferLoc, pSrc, buffSize); 211 // 2/ keep track of how much has been consumed 212 oldFront->mDataSizeConsumed += buffSize; 213 // 3/ notify shared mem listener that new data is available 214 receivedBuffer_l(bufferId, buffSize); 215 mAvailableBuffers.erase(mAvailableBuffers.begin()); 216 } else { 217 // requested as much available or more: consume the whole of the current 218 // buffer and move to the next 219 size_t consumed = oldFront->mDataSize - oldFront->mDataSizeConsumed; 220 //SL_LOGD("consuming rest of buffer: enqueueing=%u", consumed); 221 oldFront->mDataSizeConsumed = oldFront->mDataSize; 222 223 // move queue to next 224 if (newFront == &mAndroidBufferQueue-> 225 mBufferArray[mAndroidBufferQueue->mNumBuffers + 1]) { 226 // reached the end, circle back 227 newFront = mAndroidBufferQueue->mBufferArray; 228 } 229 mAndroidBufferQueue->mFront = newFront; 230 mAndroidBufferQueue->mState.count--; 231 mAndroidBufferQueue->mState.index++; 232 233 if (consumed > 0) { 234 // consume data: 1/ copy to given destination 235 memcpy(bufferLoc, pSrc, consumed); 236 // 2/ keep track of how much has been consumed 237 // here nothing to do because we are done with this buffer 238 // 3/ notify StreamPlayer that new data is available 239 receivedBuffer_l(bufferId, consumed); 240 mAvailableBuffers.erase(mAvailableBuffers.begin()); 241 } 242 243 // data has been consumed, and the buffer queue state has been updated 244 // we will notify the client if applicable 245 queueCallbackCandidate = true; 246 } 247 } 248 249 if (queueCallbackCandidate) { 250 if (mAndroidBufferQueue->mCallbackEventsMask & 251 SL_ANDROIDBUFFERQUEUEEVENT_PROCESSED) { 252 callback = mAndroidBufferQueue->mCallback; 253 // save callback data while under lock 254 callbackPContext = mAndroidBufferQueue->mContext; 255 pBufferContext = (void *)oldFront->mBufferContext; 256 pBufferData = (void *)oldFront->mDataBuffer; 257 dataSize = oldFront->mDataSize; 258 // here a buffer is only dequeued when fully consumed 259 //dataUsed = oldFront->mDataSizeConsumed; 260 } 261 } 262 //SL_LOGD("%d buffers available after reading from queue", mAvailableBuffers.size()); 263 if (!mAvailableBuffers.empty()) { 264 // there is still room in the shared memory, recheck later if we can pull 265 // data from the buffer queue and write it to shared memory 266 const sp<StreamPlayer> player(mPlayer.promote()); 267 if (player != NULL) { 268 player->queueRefilled(); 269 } 270 } 271 } 272 273 } else { // empty queue 274 SL_LOGD("ABQ empty, starving!"); 275 } 276 277 interface_unlock_exclusive(mAndroidBufferQueue); 278 279 // notify client of buffer processed 280 if (NULL != callback) { 281 SLresult result = (*callback)(&mAndroidBufferQueue->mItf, callbackPContext, 282 pBufferContext, pBufferData, dataSize, 283 dataSize, /* dataUsed */ 284 // no messages during playback other than marking the buffer as processed 285 (const SLAndroidBufferItem*)(&kItemProcessed) /* pItems */, 286 NB_BUFFEREVENT_ITEM_FIELDS *sizeof(SLuint32) /* itemsLength */ ); 287 if (SL_RESULT_SUCCESS != result) { 288 // Reserved for future use 289 SL_LOGW("Unsuccessful result %d returned from AndroidBufferQueueCallback", result); 290 } 291 } 292 293 mCallbackProtector->exitCb(); 294 } // enterCbIfOk 295 } 296 297 298 //-------------------------------------------------------------------------------------------------- 299 StreamPlayer::StreamPlayer(const AudioPlayback_Parameters* params, bool hasVideo, 300 IAndroidBufferQueue *androidBufferQueue, const sp<CallbackProtector> &callbackProtector) : 301 GenericMediaPlayer(params, hasVideo), 302 mAppProxy(new StreamSourceAppProxy(androidBufferQueue, callbackProtector, this)), 303 mStopForDestroyCompleted(false) 304 { 305 SL_LOGD("StreamPlayer::StreamPlayer()"); 306 } 307 308 StreamPlayer::~StreamPlayer() { 309 SL_LOGD("StreamPlayer::~StreamPlayer()"); 310 mAppProxy->disconnect(); 311 } 312 313 314 void StreamPlayer::onMessageReceived(const sp<AMessage> &msg) { 315 switch (msg->what()) { 316 case kWhatPullFromAbq: 317 onPullFromAndroidBufferQueue(); 318 break; 319 320 case kWhatStopForDestroy: 321 onStopForDestroy(); 322 break; 323 324 default: 325 GenericMediaPlayer::onMessageReceived(msg); 326 break; 327 } 328 } 329 330 331 void StreamPlayer::preDestroy() { 332 // FIXME NuPlayerDriver is currently not thread-safe, so stop() must be called by looper 333 (new AMessage(kWhatStopForDestroy, this))->post(); 334 { 335 Mutex::Autolock _l(mStopForDestroyLock); 336 while (!mStopForDestroyCompleted) { 337 mStopForDestroyCondition.wait(mStopForDestroyLock); 338 } 339 } 340 // GenericMediaPlayer::preDestroy will repeat some of what we've done, but that's benign 341 GenericMediaPlayer::preDestroy(); 342 } 343 344 345 void StreamPlayer::onStopForDestroy() { 346 if (mPlayer != 0) { 347 mPlayer->stop(); 348 // causes CHECK failure in Nuplayer 349 //mPlayer->setDataSource(NULL); 350 mPlayer->setVideoSurfaceTexture(NULL); 351 mPlayer->disconnect(); 352 mPlayer.clear(); 353 { 354 // FIXME ugh make this a method 355 Mutex::Autolock _l(mPreparedPlayerLock); 356 mPreparedPlayer.clear(); 357 } 358 } 359 { 360 Mutex::Autolock _l(mStopForDestroyLock); 361 mStopForDestroyCompleted = true; 362 } 363 mStopForDestroyCondition.signal(); 364 } 365 366 367 /** 368 * Asynchronously notify the player that the queue is ready to be pulled from. 369 */ 370 void StreamPlayer::queueRefilled() { 371 // async notification that the ABQ was refilled: the player should pull from the ABQ, and 372 // and push to shared memory (to the media server) 373 (new AMessage(kWhatPullFromAbq, this))->post(); 374 } 375 376 377 void StreamPlayer::appClear_l() { 378 // the user of StreamPlayer has cleared its AndroidBufferQueue: 379 // there's no clear() for the shared memory queue, so this is a no-op 380 } 381 382 383 //-------------------------------------------------- 384 // Event handlers 385 void StreamPlayer::onPrepare() { 386 SL_LOGD("StreamPlayer::onPrepare()"); 387 sp<IMediaPlayerService> mediaPlayerService(getMediaPlayerService()); 388 if (mediaPlayerService != NULL) { 389 mPlayer = mediaPlayerService->create(mPlayerClient /*IMediaPlayerClient*/, 390 mPlaybackParams.sessionId); 391 if (mPlayer == NULL) { 392 SL_LOGE("media player service failed to create player by app proxy"); 393 } else if (mPlayer->setDataSource(static_cast<sp<IStreamSource>>(mAppProxy)) != 394 NO_ERROR) { 395 SL_LOGE("setDataSource failed"); 396 mPlayer.clear(); 397 } 398 } 399 if (mPlayer == NULL) { 400 mStateFlags |= kFlagPreparedUnsuccessfully; 401 } 402 GenericMediaPlayer::onPrepare(); 403 SL_LOGD("StreamPlayer::onPrepare() done"); 404 } 405 406 407 void StreamPlayer::onPlay() { 408 SL_LOGD("StreamPlayer::onPlay()"); 409 // enqueue a message that will cause StreamAppProxy to consume from the queue (again if the 410 // player had starved the shared memory) 411 queueRefilled(); 412 413 GenericMediaPlayer::onPlay(); 414 } 415 416 417 void StreamPlayer::onPullFromAndroidBufferQueue() { 418 SL_LOGD("StreamPlayer::onPullFromAndroidBufferQueue()"); 419 mAppProxy->pullFromBuffQueue(); 420 } 421 422 } // namespace android 423