Home | History | Annotate | Download | only in base
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 //#define LOG_NDEBUG 0
     18 #define LOG_TAG "SimpleC2Component"
     19 #include <log/log.h>
     20 
     21 #include <cutils/properties.h>
     22 
     23 #include <inttypes.h>
     24 
     25 #include <C2Config.h>
     26 #include <C2Debug.h>
     27 #include <C2PlatformSupport.h>
     28 #include <SimpleC2Component.h>
     29 
     30 namespace android {
     31 
     32 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() {
     33     std::unique_ptr<C2Work> work = std::move(mQueue.front().work);
     34     mQueue.pop_front();
     35     return work;
     36 }
     37 
     38 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) {
     39     mQueue.push_back({ std::move(work), NO_DRAIN });
     40 }
     41 
     42 bool SimpleC2Component::WorkQueue::empty() const {
     43     return mQueue.empty();
     44 }
     45 
     46 void SimpleC2Component::WorkQueue::clear() {
     47     mQueue.clear();
     48 }
     49 
     50 uint32_t SimpleC2Component::WorkQueue::drainMode() const {
     51     return mQueue.front().drainMode;
     52 }
     53 
     54 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) {
     55     mQueue.push_back({ nullptr, drainMode });
     56 }
     57 
     58 ////////////////////////////////////////////////////////////////////////////////
     59 
     60 namespace {
     61 
     62 struct DummyReadView : public C2ReadView {
     63     DummyReadView() : C2ReadView(C2_NO_INIT) {}
     64 };
     65 
     66 }  // namespace
     67 
     68 SimpleC2Component::SimpleC2Component(
     69         const std::shared_ptr<C2ComponentInterface> &intf)
     70     : mDummyReadView(DummyReadView()),
     71       mIntf(intf) {
     72 }
     73 
     74 c2_status_t SimpleC2Component::setListener_vb(
     75         const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) {
     76     Mutexed<ExecState>::Locked state(mExecState);
     77     if (state->mState == RUNNING) {
     78         if (listener) {
     79             return C2_BAD_STATE;
     80         } else if (!mayBlock) {
     81             return C2_BLOCKING;
     82         }
     83     }
     84     state->mListener = listener;
     85     // TODO: wait for listener change to have taken place before returning
     86     // (e.g. if there is an ongoing listener callback)
     87     return C2_OK;
     88 }
     89 
     90 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) {
     91     {
     92         Mutexed<ExecState>::Locked state(mExecState);
     93         if (state->mState != RUNNING) {
     94             return C2_BAD_STATE;
     95         }
     96     }
     97     {
     98         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
     99         while (!items->empty()) {
    100             queue->push_back(std::move(items->front()));
    101             items->pop_front();
    102         }
    103         queue->mCondition.broadcast();
    104     }
    105     return C2_OK;
    106 }
    107 
    108 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) {
    109     (void)items;
    110     return C2_OMITTED;
    111 }
    112 
    113 c2_status_t SimpleC2Component::flush_sm(
    114         flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) {
    115     (void)flushMode;
    116     {
    117         Mutexed<ExecState>::Locked state(mExecState);
    118         if (state->mState != RUNNING) {
    119             return C2_BAD_STATE;
    120         }
    121     }
    122     {
    123         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    124         queue->incGeneration();
    125         // TODO: queue->splicedBy(flushedWork, flushedWork->end());
    126         while (!queue->empty()) {
    127             std::unique_ptr<C2Work> work = queue->pop_front();
    128             if (work) {
    129                 flushedWork->push_back(std::move(work));
    130             }
    131         }
    132     }
    133     {
    134         Mutexed<PendingWork>::Locked pending(mPendingWork);
    135         while (!pending->empty()) {
    136             flushedWork->push_back(std::move(pending->begin()->second));
    137             pending->erase(pending->begin());
    138         }
    139     }
    140 
    141     return C2_OK;
    142 }
    143 
    144 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) {
    145     if (drainMode == DRAIN_CHAIN) {
    146         return C2_OMITTED;
    147     }
    148     {
    149         Mutexed<ExecState>::Locked state(mExecState);
    150         if (state->mState != RUNNING) {
    151             return C2_BAD_STATE;
    152         }
    153     }
    154     {
    155         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    156         queue->markDrain(drainMode);
    157         queue->mCondition.broadcast();
    158     }
    159 
    160     return C2_OK;
    161 }
    162 
    163 c2_status_t SimpleC2Component::start() {
    164     Mutexed<ExecState>::Locked state(mExecState);
    165     if (state->mState == RUNNING) {
    166         return C2_BAD_STATE;
    167     }
    168     bool needsInit = (state->mState == UNINITIALIZED);
    169     if (needsInit) {
    170         state.unlock();
    171         c2_status_t err = onInit();
    172         if (err != C2_OK) {
    173             return err;
    174         }
    175         state.lock();
    176     }
    177     if (!state->mThread.joinable()) {
    178         mExitRequested = false;
    179         {
    180             Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
    181             monitor->mExited = false;
    182         }
    183         state->mThread = std::thread(
    184                 [](std::weak_ptr<SimpleC2Component> wp) {
    185                     while (true) {
    186                         std::shared_ptr<SimpleC2Component> thiz = wp.lock();
    187                         if (!thiz) {
    188                             return;
    189                         }
    190                         if (thiz->exitRequested()) {
    191                             ALOGV("stop processing");
    192                             thiz->signalExit();
    193                             return;
    194                         }
    195                         thiz->processQueue();
    196                     }
    197                 },
    198                 shared_from_this());
    199     }
    200     state->mState = RUNNING;
    201     return C2_OK;
    202 }
    203 
    204 void SimpleC2Component::signalExit() {
    205     Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
    206     monitor->mExited = true;
    207     monitor->mCondition.broadcast();
    208 }
    209 
    210 void SimpleC2Component::requestExitAndWait(std::function<void()> job) {
    211     {
    212         Mutexed<ExecState>::Locked state(mExecState);
    213         if (!state->mThread.joinable()) {
    214             return;
    215         }
    216     }
    217     mExitRequested = true;
    218     {
    219         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    220         queue->mCondition.broadcast();
    221     }
    222     // TODO: timeout?
    223     {
    224         Mutexed<ExitMonitor>::Locked monitor(mExitMonitor);
    225         while (!monitor->mExited) {
    226             monitor.waitForCondition(monitor->mCondition);
    227         }
    228         job();
    229     }
    230     Mutexed<ExecState>::Locked state(mExecState);
    231     if (state->mThread.joinable()) {
    232         ALOGV("joining the processing thread");
    233         state->mThread.join();
    234         ALOGV("joined the processing thread");
    235     }
    236 }
    237 
    238 c2_status_t SimpleC2Component::stop() {
    239     ALOGV("stop");
    240     {
    241         Mutexed<ExecState>::Locked state(mExecState);
    242         if (state->mState != RUNNING) {
    243             return C2_BAD_STATE;
    244         }
    245         state->mState = STOPPED;
    246     }
    247     {
    248         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    249         queue->clear();
    250     }
    251     {
    252         Mutexed<PendingWork>::Locked pending(mPendingWork);
    253         pending->clear();
    254     }
    255     c2_status_t err;
    256     requestExitAndWait([this, &err]{ err = onStop(); });
    257     if (err != C2_OK) {
    258         return err;
    259     }
    260     return C2_OK;
    261 }
    262 
    263 c2_status_t SimpleC2Component::reset() {
    264     ALOGV("reset");
    265     {
    266         Mutexed<ExecState>::Locked state(mExecState);
    267         state->mState = UNINITIALIZED;
    268     }
    269     {
    270         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    271         queue->clear();
    272     }
    273     {
    274         Mutexed<PendingWork>::Locked pending(mPendingWork);
    275         pending->clear();
    276     }
    277     requestExitAndWait([this]{ onReset(); });
    278     return C2_OK;
    279 }
    280 
    281 c2_status_t SimpleC2Component::release() {
    282     ALOGV("release");
    283     requestExitAndWait([this]{ onRelease(); });
    284     return C2_OK;
    285 }
    286 
    287 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() {
    288     return mIntf;
    289 }
    290 
    291 namespace {
    292 
    293 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) {
    294     std::list<std::unique_ptr<C2Work>> ret;
    295     ret.push_back(std::move(work));
    296     return ret;
    297 }
    298 
    299 }  // namespace
    300 
    301 void SimpleC2Component::finish(
    302         uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) {
    303     std::unique_ptr<C2Work> work;
    304     {
    305         Mutexed<PendingWork>::Locked pending(mPendingWork);
    306         if (pending->count(frameIndex) == 0) {
    307             ALOGW("unknown frame index: %" PRIu64, frameIndex);
    308             return;
    309         }
    310         work = std::move(pending->at(frameIndex));
    311         pending->erase(frameIndex);
    312     }
    313     if (work) {
    314         fillWork(work);
    315         Mutexed<ExecState>::Locked state(mExecState);
    316         std::shared_ptr<C2Component::Listener> listener = state->mListener;
    317         state.unlock();
    318         listener->onWorkDone_nb(shared_from_this(), vec(work));
    319         ALOGV("returning pending work");
    320     }
    321 }
    322 
    323 void SimpleC2Component::processQueue() {
    324     std::unique_ptr<C2Work> work;
    325     uint64_t generation;
    326     int32_t drainMode;
    327     bool isFlushPending = false;
    328     {
    329         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    330         nsecs_t deadline = systemTime() + ms2ns(250);
    331         while (queue->empty()) {
    332             if (exitRequested()) {
    333                 return;
    334             }
    335             nsecs_t now = systemTime();
    336             if (now >= deadline) {
    337                 return;
    338             }
    339             status_t err = queue.waitForConditionRelative(queue->mCondition, deadline - now);
    340             if (err == TIMED_OUT) {
    341                 return;
    342             }
    343         }
    344 
    345         generation = queue->generation();
    346         drainMode = queue->drainMode();
    347         isFlushPending = queue->popPendingFlush();
    348         work = queue->pop_front();
    349     }
    350     if (isFlushPending) {
    351         ALOGV("processing pending flush");
    352         c2_status_t err = onFlush_sm();
    353         if (err != C2_OK) {
    354             ALOGD("flush err: %d", err);
    355             // TODO: error
    356         }
    357     }
    358 
    359     if (!mOutputBlockPool) {
    360         c2_status_t err = [this] {
    361             // TODO: don't use query_vb
    362             C2StreamFormatConfig::output outputFormat(0u);
    363             std::vector<std::unique_ptr<C2Param>> params;
    364             c2_status_t err = intf()->query_vb(
    365                     { &outputFormat },
    366                     { C2PortBlockPoolsTuning::output::PARAM_TYPE },
    367                     C2_DONT_BLOCK,
    368                     &params);
    369             if (err != C2_OK && err != C2_BAD_INDEX) {
    370                 ALOGD("query err = %d", err);
    371                 return err;
    372             }
    373             C2BlockPool::local_id_t poolId =
    374                 outputFormat.value == C2FormatVideo
    375                         ? C2BlockPool::BASIC_GRAPHIC
    376                         : C2BlockPool::BASIC_LINEAR;
    377             if (params.size()) {
    378                 C2PortBlockPoolsTuning::output *outputPools =
    379                     C2PortBlockPoolsTuning::output::From(params[0].get());
    380                 if (outputPools && outputPools->flexCount() >= 1) {
    381                     poolId = outputPools->m.values[0];
    382                 }
    383             }
    384 
    385             err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool);
    386             ALOGD("Using output block pool with poolID %llu => got %llu - %d",
    387                     (unsigned long long)poolId,
    388                     (unsigned long long)(
    389                             mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111),
    390                     err);
    391             return err;
    392         }();
    393         if (err != C2_OK) {
    394             Mutexed<ExecState>::Locked state(mExecState);
    395             std::shared_ptr<C2Component::Listener> listener = state->mListener;
    396             state.unlock();
    397             listener->onError_nb(shared_from_this(), err);
    398             return;
    399         }
    400     }
    401 
    402     if (!work) {
    403         c2_status_t err = drain(drainMode, mOutputBlockPool);
    404         if (err != C2_OK) {
    405             Mutexed<ExecState>::Locked state(mExecState);
    406             std::shared_ptr<C2Component::Listener> listener = state->mListener;
    407             state.unlock();
    408             listener->onError_nb(shared_from_this(), err);
    409         }
    410         return;
    411     }
    412 
    413     process(work, mOutputBlockPool);
    414     ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku());
    415     {
    416         Mutexed<WorkQueue>::Locked queue(mWorkQueue);
    417         if (queue->generation() != generation) {
    418             ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64,
    419                     queue->generation(), generation);
    420             work->result = C2_NOT_FOUND;
    421             queue.unlock();
    422             {
    423                 Mutexed<ExecState>::Locked state(mExecState);
    424                 std::shared_ptr<C2Component::Listener> listener = state->mListener;
    425                 state.unlock();
    426                 listener->onWorkDone_nb(shared_from_this(), vec(work));
    427             }
    428             queue.lock();
    429             return;
    430         }
    431     }
    432     if (work->workletsProcessed != 0u) {
    433         Mutexed<ExecState>::Locked state(mExecState);
    434         ALOGV("returning this work");
    435         std::shared_ptr<C2Component::Listener> listener = state->mListener;
    436         state.unlock();
    437         listener->onWorkDone_nb(shared_from_this(), vec(work));
    438     } else {
    439         ALOGV("queue pending work");
    440         std::unique_ptr<C2Work> unexpected;
    441         {
    442             Mutexed<PendingWork>::Locked pending(mPendingWork);
    443             uint64_t frameIndex = work->input.ordinal.frameIndex.peeku();
    444             if (pending->count(frameIndex) != 0) {
    445                 unexpected = std::move(pending->at(frameIndex));
    446                 pending->erase(frameIndex);
    447             }
    448             (void)pending->insert({ frameIndex, std::move(work) });
    449         }
    450         if (unexpected) {
    451             ALOGD("unexpected pending work");
    452             unexpected->result = C2_CORRUPTED;
    453             Mutexed<ExecState>::Locked state(mExecState);
    454             std::shared_ptr<C2Component::Listener> listener = state->mListener;
    455             state.unlock();
    456             listener->onWorkDone_nb(shared_from_this(), vec(unexpected));
    457         }
    458     }
    459 }
    460 
    461 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
    462         const std::shared_ptr<C2LinearBlock> &block) {
    463     return createLinearBuffer(block, block->offset(), block->size());
    464 }
    465 
    466 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer(
    467         const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) {
    468     return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence()));
    469 }
    470 
    471 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
    472         const std::shared_ptr<C2GraphicBlock> &block) {
    473     return createGraphicBuffer(block, C2Rect(block->width(), block->height()));
    474 }
    475 
    476 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer(
    477         const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) {
    478     return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence()));
    479 }
    480 
    481 } // namespace android
    482