Home | History | Annotate | Download | only in sink
      1 /*
      2  * Copyright 2012, The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *     http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "TunnelRenderer"
     19 #include <utils/Log.h>
     20 
     21 #include "TunnelRenderer.h"
     22 
     23 #include "ATSParser.h"
     24 
     25 #include <binder/IMemory.h>
     26 #include <binder/IServiceManager.h>
     27 #include <gui/SurfaceComposerClient.h>
     28 #include <media/IMediaPlayerService.h>
     29 #include <media/IStreamSource.h>
     30 #include <media/stagefright/foundation/ABuffer.h>
     31 #include <media/stagefright/foundation/ADebug.h>
     32 #include <media/stagefright/foundation/AMessage.h>
     33 #include <ui/DisplayInfo.h>
     34 
     35 namespace android {
     36 
     37 struct TunnelRenderer::PlayerClient : public BnMediaPlayerClient {
     38     PlayerClient() {}
     39 
     40     virtual void notify(int msg, int ext1, int ext2, const Parcel *obj) {
     41         ALOGI("notify %d, %d, %d", msg, ext1, ext2);
     42     }
     43 
     44 protected:
     45     virtual ~PlayerClient() {}
     46 
     47 private:
     48     DISALLOW_EVIL_CONSTRUCTORS(PlayerClient);
     49 };
     50 
     51 struct TunnelRenderer::StreamSource : public BnStreamSource {
     52     StreamSource(TunnelRenderer *owner);
     53 
     54     virtual void setListener(const sp<IStreamListener> &listener);
     55     virtual void setBuffers(const Vector<sp<IMemory> > &buffers);
     56 
     57     virtual void onBufferAvailable(size_t index);
     58 
     59     virtual uint32_t flags() const;
     60 
     61     void doSomeWork();
     62 
     63 protected:
     64     virtual ~StreamSource();
     65 
     66 private:
     67     mutable Mutex mLock;
     68 
     69     TunnelRenderer *mOwner;
     70 
     71     sp<IStreamListener> mListener;
     72 
     73     Vector<sp<IMemory> > mBuffers;
     74     List<size_t> mIndicesAvailable;
     75 
     76     size_t mNumDeqeued;
     77 
     78     DISALLOW_EVIL_CONSTRUCTORS(StreamSource);
     79 };
     80 
     81 ////////////////////////////////////////////////////////////////////////////////
     82 
     83 TunnelRenderer::StreamSource::StreamSource(TunnelRenderer *owner)
     84     : mOwner(owner),
     85       mNumDeqeued(0) {
     86 }
     87 
     88 TunnelRenderer::StreamSource::~StreamSource() {
     89 }
     90 
     91 void TunnelRenderer::StreamSource::setListener(
     92         const sp<IStreamListener> &listener) {
     93     mListener = listener;
     94 }
     95 
     96 void TunnelRenderer::StreamSource::setBuffers(
     97         const Vector<sp<IMemory> > &buffers) {
     98     mBuffers = buffers;
     99 }
    100 
    101 void TunnelRenderer::StreamSource::onBufferAvailable(size_t index) {
    102     CHECK_LT(index, mBuffers.size());
    103 
    104     {
    105         Mutex::Autolock autoLock(mLock);
    106         mIndicesAvailable.push_back(index);
    107     }
    108 
    109     doSomeWork();
    110 }
    111 
    112 uint32_t TunnelRenderer::StreamSource::flags() const {
    113     return kFlagAlignedVideoData;
    114 }
    115 
    116 void TunnelRenderer::StreamSource::doSomeWork() {
    117     Mutex::Autolock autoLock(mLock);
    118 
    119     while (!mIndicesAvailable.empty()) {
    120         sp<ABuffer> srcBuffer = mOwner->dequeueBuffer();
    121         if (srcBuffer == NULL) {
    122             break;
    123         }
    124 
    125         ++mNumDeqeued;
    126 
    127         if (mNumDeqeued == 1) {
    128             ALOGI("fixing real time now.");
    129 
    130             sp<AMessage> extra = new AMessage;
    131 
    132             extra->setInt32(
    133                     IStreamListener::kKeyDiscontinuityMask,
    134                     ATSParser::DISCONTINUITY_ABSOLUTE_TIME);
    135 
    136             extra->setInt64("timeUs", ALooper::GetNowUs());
    137 
    138             mListener->issueCommand(
    139                     IStreamListener::DISCONTINUITY,
    140                     false /* synchronous */,
    141                     extra);
    142         }
    143 
    144         ALOGV("dequeue TS packet of size %d", srcBuffer->size());
    145 
    146         size_t index = *mIndicesAvailable.begin();
    147         mIndicesAvailable.erase(mIndicesAvailable.begin());
    148 
    149         sp<IMemory> mem = mBuffers.itemAt(index);
    150         CHECK_LE(srcBuffer->size(), mem->size());
    151         CHECK_EQ((srcBuffer->size() % 188), 0u);
    152 
    153         memcpy(mem->pointer(), srcBuffer->data(), srcBuffer->size());
    154         mListener->queueBuffer(index, srcBuffer->size());
    155     }
    156 }
    157 
    158 ////////////////////////////////////////////////////////////////////////////////
    159 
    160 TunnelRenderer::TunnelRenderer(
    161         const sp<AMessage> &notifyLost,
    162         const sp<ISurfaceTexture> &surfaceTex)
    163     : mNotifyLost(notifyLost),
    164       mSurfaceTex(surfaceTex),
    165       mTotalBytesQueued(0ll),
    166       mLastDequeuedExtSeqNo(-1),
    167       mFirstFailedAttemptUs(-1ll),
    168       mRequestedRetransmission(false) {
    169 }
    170 
    171 TunnelRenderer::~TunnelRenderer() {
    172     destroyPlayer();
    173 }
    174 
    175 void TunnelRenderer::queueBuffer(const sp<ABuffer> &buffer) {
    176     Mutex::Autolock autoLock(mLock);
    177 
    178     mTotalBytesQueued += buffer->size();
    179 
    180     if (mPackets.empty()) {
    181         mPackets.push_back(buffer);
    182         return;
    183     }
    184 
    185     int32_t newExtendedSeqNo = buffer->int32Data();
    186 
    187     List<sp<ABuffer> >::iterator firstIt = mPackets.begin();
    188     List<sp<ABuffer> >::iterator it = --mPackets.end();
    189     for (;;) {
    190         int32_t extendedSeqNo = (*it)->int32Data();
    191 
    192         if (extendedSeqNo == newExtendedSeqNo) {
    193             // Duplicate packet.
    194             return;
    195         }
    196 
    197         if (extendedSeqNo < newExtendedSeqNo) {
    198             // Insert new packet after the one at "it".
    199             mPackets.insert(++it, buffer);
    200             return;
    201         }
    202 
    203         if (it == firstIt) {
    204             // Insert new packet before the first existing one.
    205             mPackets.insert(it, buffer);
    206             return;
    207         }
    208 
    209         --it;
    210     }
    211 }
    212 
    213 sp<ABuffer> TunnelRenderer::dequeueBuffer() {
    214     Mutex::Autolock autoLock(mLock);
    215 
    216     sp<ABuffer> buffer;
    217     int32_t extSeqNo;
    218     while (!mPackets.empty()) {
    219         buffer = *mPackets.begin();
    220         extSeqNo = buffer->int32Data();
    221 
    222         if (mLastDequeuedExtSeqNo < 0 || extSeqNo > mLastDequeuedExtSeqNo) {
    223             break;
    224         }
    225 
    226         // This is a retransmission of a packet we've already returned.
    227 
    228         mTotalBytesQueued -= buffer->size();
    229         buffer.clear();
    230         extSeqNo = -1;
    231 
    232         mPackets.erase(mPackets.begin());
    233     }
    234 
    235     if (mPackets.empty()) {
    236         if (mFirstFailedAttemptUs < 0ll) {
    237             mFirstFailedAttemptUs = ALooper::GetNowUs();
    238             mRequestedRetransmission = false;
    239         } else {
    240             ALOGV("no packets available for %.2f secs",
    241                     (ALooper::GetNowUs() - mFirstFailedAttemptUs) / 1E6);
    242         }
    243 
    244         return NULL;
    245     }
    246 
    247     if (mLastDequeuedExtSeqNo < 0 || extSeqNo == mLastDequeuedExtSeqNo + 1) {
    248         if (mRequestedRetransmission) {
    249             ALOGI("Recovered after requesting retransmission of %d",
    250                   extSeqNo);
    251         }
    252 
    253         mLastDequeuedExtSeqNo = extSeqNo;
    254         mFirstFailedAttemptUs = -1ll;
    255         mRequestedRetransmission = false;
    256 
    257         mPackets.erase(mPackets.begin());
    258 
    259         mTotalBytesQueued -= buffer->size();
    260 
    261         return buffer;
    262     }
    263 
    264     if (mFirstFailedAttemptUs < 0ll) {
    265         mFirstFailedAttemptUs = ALooper::GetNowUs();
    266 
    267         ALOGI("failed to get the correct packet the first time.");
    268         return NULL;
    269     }
    270 
    271     if (mFirstFailedAttemptUs + 50000ll > ALooper::GetNowUs()) {
    272         // We're willing to wait a little while to get the right packet.
    273 
    274         if (!mRequestedRetransmission) {
    275             ALOGI("requesting retransmission of seqNo %d",
    276                   (mLastDequeuedExtSeqNo + 1) & 0xffff);
    277 
    278             sp<AMessage> notify = mNotifyLost->dup();
    279             notify->setInt32("seqNo", (mLastDequeuedExtSeqNo + 1) & 0xffff);
    280             notify->post();
    281 
    282             mRequestedRetransmission = true;
    283         } else {
    284             ALOGI("still waiting for the correct packet to arrive.");
    285         }
    286 
    287         return NULL;
    288     }
    289 
    290     ALOGI("dropping packet. extSeqNo %d didn't arrive in time",
    291             mLastDequeuedExtSeqNo + 1);
    292 
    293     // Permanent failure, we never received the packet.
    294     mLastDequeuedExtSeqNo = extSeqNo;
    295     mFirstFailedAttemptUs = -1ll;
    296     mRequestedRetransmission = false;
    297 
    298     mTotalBytesQueued -= buffer->size();
    299 
    300     mPackets.erase(mPackets.begin());
    301 
    302     return buffer;
    303 }
    304 
    305 void TunnelRenderer::onMessageReceived(const sp<AMessage> &msg) {
    306     switch (msg->what()) {
    307         case kWhatQueueBuffer:
    308         {
    309             sp<ABuffer> buffer;
    310             CHECK(msg->findBuffer("buffer", &buffer));
    311 
    312             queueBuffer(buffer);
    313 
    314             if (mStreamSource == NULL) {
    315                 if (mTotalBytesQueued > 0ll) {
    316                     initPlayer();
    317                 } else {
    318                     ALOGI("Have %lld bytes queued...", mTotalBytesQueued);
    319                 }
    320             } else {
    321                 mStreamSource->doSomeWork();
    322             }
    323             break;
    324         }
    325 
    326         default:
    327             TRESPASS();
    328     }
    329 }
    330 
    331 void TunnelRenderer::initPlayer() {
    332     if (mSurfaceTex == NULL) {
    333         mComposerClient = new SurfaceComposerClient;
    334         CHECK_EQ(mComposerClient->initCheck(), (status_t)OK);
    335 
    336         DisplayInfo info;
    337         SurfaceComposerClient::getDisplayInfo(0, &info);
    338         ssize_t displayWidth = info.w;
    339         ssize_t displayHeight = info.h;
    340 
    341         mSurfaceControl =
    342             mComposerClient->createSurface(
    343                     String8("A Surface"),
    344                     displayWidth,
    345                     displayHeight,
    346                     PIXEL_FORMAT_RGB_565,
    347                     0);
    348 
    349         CHECK(mSurfaceControl != NULL);
    350         CHECK(mSurfaceControl->isValid());
    351 
    352         SurfaceComposerClient::openGlobalTransaction();
    353         CHECK_EQ(mSurfaceControl->setLayer(INT_MAX), (status_t)OK);
    354         CHECK_EQ(mSurfaceControl->show(), (status_t)OK);
    355         SurfaceComposerClient::closeGlobalTransaction();
    356 
    357         mSurface = mSurfaceControl->getSurface();
    358         CHECK(mSurface != NULL);
    359     }
    360 
    361     sp<IServiceManager> sm = defaultServiceManager();
    362     sp<IBinder> binder = sm->getService(String16("media.player"));
    363     sp<IMediaPlayerService> service = interface_cast<IMediaPlayerService>(binder);
    364     CHECK(service.get() != NULL);
    365 
    366     mStreamSource = new StreamSource(this);
    367 
    368     mPlayerClient = new PlayerClient;
    369 
    370     mPlayer = service->create(getpid(), mPlayerClient, 0);
    371     CHECK(mPlayer != NULL);
    372     CHECK_EQ(mPlayer->setDataSource(mStreamSource), (status_t)OK);
    373 
    374     mPlayer->setVideoSurfaceTexture(
    375             mSurfaceTex != NULL ? mSurfaceTex : mSurface->getSurfaceTexture());
    376 
    377     mPlayer->start();
    378 }
    379 
    380 void TunnelRenderer::destroyPlayer() {
    381     mStreamSource.clear();
    382 
    383     mPlayer->stop();
    384     mPlayer.clear();
    385 
    386     if (mSurfaceTex == NULL) {
    387         mSurface.clear();
    388         mSurfaceControl.clear();
    389 
    390         mComposerClient->dispose();
    391         mComposerClient.clear();
    392     }
    393 }
    394 
    395 }  // namespace android
    396 
    397