1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 //#define LOG_NDEBUG 0 18 #define LOG_TAG "SimpleC2Component" 19 #include <log/log.h> 20 21 #include <cutils/properties.h> 22 23 #include <inttypes.h> 24 25 #include <C2Config.h> 26 #include <C2Debug.h> 27 #include <C2PlatformSupport.h> 28 #include <SimpleC2Component.h> 29 30 namespace android { 31 32 std::unique_ptr<C2Work> SimpleC2Component::WorkQueue::pop_front() { 33 std::unique_ptr<C2Work> work = std::move(mQueue.front().work); 34 mQueue.pop_front(); 35 return work; 36 } 37 38 void SimpleC2Component::WorkQueue::push_back(std::unique_ptr<C2Work> work) { 39 mQueue.push_back({ std::move(work), NO_DRAIN }); 40 } 41 42 bool SimpleC2Component::WorkQueue::empty() const { 43 return mQueue.empty(); 44 } 45 46 void SimpleC2Component::WorkQueue::clear() { 47 mQueue.clear(); 48 } 49 50 uint32_t SimpleC2Component::WorkQueue::drainMode() const { 51 return mQueue.front().drainMode; 52 } 53 54 void SimpleC2Component::WorkQueue::markDrain(uint32_t drainMode) { 55 mQueue.push_back({ nullptr, drainMode }); 56 } 57 58 //////////////////////////////////////////////////////////////////////////////// 59 60 namespace { 61 62 struct DummyReadView : public C2ReadView { 63 DummyReadView() : C2ReadView(C2_NO_INIT) {} 64 }; 65 66 } // namespace 67 68 SimpleC2Component::SimpleC2Component( 69 const std::shared_ptr<C2ComponentInterface> &intf) 70 : mDummyReadView(DummyReadView()), 71 mIntf(intf) { 72 } 73 74 c2_status_t SimpleC2Component::setListener_vb( 75 const std::shared_ptr<C2Component::Listener> &listener, c2_blocking_t mayBlock) { 76 Mutexed<ExecState>::Locked state(mExecState); 77 if (state->mState == RUNNING) { 78 if (listener) { 79 return C2_BAD_STATE; 80 } else if (!mayBlock) { 81 return C2_BLOCKING; 82 } 83 } 84 state->mListener = listener; 85 // TODO: wait for listener change to have taken place before returning 86 // (e.g. if there is an ongoing listener callback) 87 return C2_OK; 88 } 89 90 c2_status_t SimpleC2Component::queue_nb(std::list<std::unique_ptr<C2Work>> * const items) { 91 { 92 Mutexed<ExecState>::Locked state(mExecState); 93 if (state->mState != RUNNING) { 94 return C2_BAD_STATE; 95 } 96 } 97 { 98 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 99 while (!items->empty()) { 100 queue->push_back(std::move(items->front())); 101 items->pop_front(); 102 } 103 queue->mCondition.broadcast(); 104 } 105 return C2_OK; 106 } 107 108 c2_status_t SimpleC2Component::announce_nb(const std::vector<C2WorkOutline> &items) { 109 (void)items; 110 return C2_OMITTED; 111 } 112 113 c2_status_t SimpleC2Component::flush_sm( 114 flush_mode_t flushMode, std::list<std::unique_ptr<C2Work>>* const flushedWork) { 115 (void)flushMode; 116 { 117 Mutexed<ExecState>::Locked state(mExecState); 118 if (state->mState != RUNNING) { 119 return C2_BAD_STATE; 120 } 121 } 122 { 123 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 124 queue->incGeneration(); 125 // TODO: queue->splicedBy(flushedWork, flushedWork->end()); 126 while (!queue->empty()) { 127 std::unique_ptr<C2Work> work = queue->pop_front(); 128 if (work) { 129 flushedWork->push_back(std::move(work)); 130 } 131 } 132 } 133 { 134 Mutexed<PendingWork>::Locked pending(mPendingWork); 135 while (!pending->empty()) { 136 flushedWork->push_back(std::move(pending->begin()->second)); 137 pending->erase(pending->begin()); 138 } 139 } 140 141 return C2_OK; 142 } 143 144 c2_status_t SimpleC2Component::drain_nb(drain_mode_t drainMode) { 145 if (drainMode == DRAIN_CHAIN) { 146 return C2_OMITTED; 147 } 148 { 149 Mutexed<ExecState>::Locked state(mExecState); 150 if (state->mState != RUNNING) { 151 return C2_BAD_STATE; 152 } 153 } 154 { 155 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 156 queue->markDrain(drainMode); 157 queue->mCondition.broadcast(); 158 } 159 160 return C2_OK; 161 } 162 163 c2_status_t SimpleC2Component::start() { 164 Mutexed<ExecState>::Locked state(mExecState); 165 if (state->mState == RUNNING) { 166 return C2_BAD_STATE; 167 } 168 bool needsInit = (state->mState == UNINITIALIZED); 169 if (needsInit) { 170 state.unlock(); 171 c2_status_t err = onInit(); 172 if (err != C2_OK) { 173 return err; 174 } 175 state.lock(); 176 } 177 if (!state->mThread.joinable()) { 178 mExitRequested = false; 179 { 180 Mutexed<ExitMonitor>::Locked monitor(mExitMonitor); 181 monitor->mExited = false; 182 } 183 state->mThread = std::thread( 184 [](std::weak_ptr<SimpleC2Component> wp) { 185 while (true) { 186 std::shared_ptr<SimpleC2Component> thiz = wp.lock(); 187 if (!thiz) { 188 return; 189 } 190 if (thiz->exitRequested()) { 191 ALOGV("stop processing"); 192 thiz->signalExit(); 193 return; 194 } 195 thiz->processQueue(); 196 } 197 }, 198 shared_from_this()); 199 } 200 state->mState = RUNNING; 201 return C2_OK; 202 } 203 204 void SimpleC2Component::signalExit() { 205 Mutexed<ExitMonitor>::Locked monitor(mExitMonitor); 206 monitor->mExited = true; 207 monitor->mCondition.broadcast(); 208 } 209 210 void SimpleC2Component::requestExitAndWait(std::function<void()> job) { 211 { 212 Mutexed<ExecState>::Locked state(mExecState); 213 if (!state->mThread.joinable()) { 214 return; 215 } 216 } 217 mExitRequested = true; 218 { 219 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 220 queue->mCondition.broadcast(); 221 } 222 // TODO: timeout? 223 { 224 Mutexed<ExitMonitor>::Locked monitor(mExitMonitor); 225 while (!monitor->mExited) { 226 monitor.waitForCondition(monitor->mCondition); 227 } 228 job(); 229 } 230 Mutexed<ExecState>::Locked state(mExecState); 231 if (state->mThread.joinable()) { 232 ALOGV("joining the processing thread"); 233 state->mThread.join(); 234 ALOGV("joined the processing thread"); 235 } 236 } 237 238 c2_status_t SimpleC2Component::stop() { 239 ALOGV("stop"); 240 { 241 Mutexed<ExecState>::Locked state(mExecState); 242 if (state->mState != RUNNING) { 243 return C2_BAD_STATE; 244 } 245 state->mState = STOPPED; 246 } 247 { 248 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 249 queue->clear(); 250 } 251 { 252 Mutexed<PendingWork>::Locked pending(mPendingWork); 253 pending->clear(); 254 } 255 c2_status_t err; 256 requestExitAndWait([this, &err]{ err = onStop(); }); 257 if (err != C2_OK) { 258 return err; 259 } 260 return C2_OK; 261 } 262 263 c2_status_t SimpleC2Component::reset() { 264 ALOGV("reset"); 265 { 266 Mutexed<ExecState>::Locked state(mExecState); 267 state->mState = UNINITIALIZED; 268 } 269 { 270 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 271 queue->clear(); 272 } 273 { 274 Mutexed<PendingWork>::Locked pending(mPendingWork); 275 pending->clear(); 276 } 277 requestExitAndWait([this]{ onReset(); }); 278 return C2_OK; 279 } 280 281 c2_status_t SimpleC2Component::release() { 282 ALOGV("release"); 283 requestExitAndWait([this]{ onRelease(); }); 284 return C2_OK; 285 } 286 287 std::shared_ptr<C2ComponentInterface> SimpleC2Component::intf() { 288 return mIntf; 289 } 290 291 namespace { 292 293 std::list<std::unique_ptr<C2Work>> vec(std::unique_ptr<C2Work> &work) { 294 std::list<std::unique_ptr<C2Work>> ret; 295 ret.push_back(std::move(work)); 296 return ret; 297 } 298 299 } // namespace 300 301 void SimpleC2Component::finish( 302 uint64_t frameIndex, std::function<void(const std::unique_ptr<C2Work> &)> fillWork) { 303 std::unique_ptr<C2Work> work; 304 { 305 Mutexed<PendingWork>::Locked pending(mPendingWork); 306 if (pending->count(frameIndex) == 0) { 307 ALOGW("unknown frame index: %" PRIu64, frameIndex); 308 return; 309 } 310 work = std::move(pending->at(frameIndex)); 311 pending->erase(frameIndex); 312 } 313 if (work) { 314 fillWork(work); 315 Mutexed<ExecState>::Locked state(mExecState); 316 std::shared_ptr<C2Component::Listener> listener = state->mListener; 317 state.unlock(); 318 listener->onWorkDone_nb(shared_from_this(), vec(work)); 319 ALOGV("returning pending work"); 320 } 321 } 322 323 void SimpleC2Component::processQueue() { 324 std::unique_ptr<C2Work> work; 325 uint64_t generation; 326 int32_t drainMode; 327 bool isFlushPending = false; 328 { 329 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 330 nsecs_t deadline = systemTime() + ms2ns(250); 331 while (queue->empty()) { 332 if (exitRequested()) { 333 return; 334 } 335 nsecs_t now = systemTime(); 336 if (now >= deadline) { 337 return; 338 } 339 status_t err = queue.waitForConditionRelative(queue->mCondition, deadline - now); 340 if (err == TIMED_OUT) { 341 return; 342 } 343 } 344 345 generation = queue->generation(); 346 drainMode = queue->drainMode(); 347 isFlushPending = queue->popPendingFlush(); 348 work = queue->pop_front(); 349 } 350 if (isFlushPending) { 351 ALOGV("processing pending flush"); 352 c2_status_t err = onFlush_sm(); 353 if (err != C2_OK) { 354 ALOGD("flush err: %d", err); 355 // TODO: error 356 } 357 } 358 359 if (!mOutputBlockPool) { 360 c2_status_t err = [this] { 361 // TODO: don't use query_vb 362 C2StreamFormatConfig::output outputFormat(0u); 363 std::vector<std::unique_ptr<C2Param>> params; 364 c2_status_t err = intf()->query_vb( 365 { &outputFormat }, 366 { C2PortBlockPoolsTuning::output::PARAM_TYPE }, 367 C2_DONT_BLOCK, 368 ¶ms); 369 if (err != C2_OK && err != C2_BAD_INDEX) { 370 ALOGD("query err = %d", err); 371 return err; 372 } 373 C2BlockPool::local_id_t poolId = 374 outputFormat.value == C2FormatVideo 375 ? C2BlockPool::BASIC_GRAPHIC 376 : C2BlockPool::BASIC_LINEAR; 377 if (params.size()) { 378 C2PortBlockPoolsTuning::output *outputPools = 379 C2PortBlockPoolsTuning::output::From(params[0].get()); 380 if (outputPools && outputPools->flexCount() >= 1) { 381 poolId = outputPools->m.values[0]; 382 } 383 } 384 385 err = GetCodec2BlockPool(poolId, shared_from_this(), &mOutputBlockPool); 386 ALOGD("Using output block pool with poolID %llu => got %llu - %d", 387 (unsigned long long)poolId, 388 (unsigned long long)( 389 mOutputBlockPool ? mOutputBlockPool->getLocalId() : 111000111), 390 err); 391 return err; 392 }(); 393 if (err != C2_OK) { 394 Mutexed<ExecState>::Locked state(mExecState); 395 std::shared_ptr<C2Component::Listener> listener = state->mListener; 396 state.unlock(); 397 listener->onError_nb(shared_from_this(), err); 398 return; 399 } 400 } 401 402 if (!work) { 403 c2_status_t err = drain(drainMode, mOutputBlockPool); 404 if (err != C2_OK) { 405 Mutexed<ExecState>::Locked state(mExecState); 406 std::shared_ptr<C2Component::Listener> listener = state->mListener; 407 state.unlock(); 408 listener->onError_nb(shared_from_this(), err); 409 } 410 return; 411 } 412 413 process(work, mOutputBlockPool); 414 ALOGV("processed frame #%" PRIu64, work->input.ordinal.frameIndex.peeku()); 415 { 416 Mutexed<WorkQueue>::Locked queue(mWorkQueue); 417 if (queue->generation() != generation) { 418 ALOGD("work form old generation: was %" PRIu64 " now %" PRIu64, 419 queue->generation(), generation); 420 work->result = C2_NOT_FOUND; 421 queue.unlock(); 422 { 423 Mutexed<ExecState>::Locked state(mExecState); 424 std::shared_ptr<C2Component::Listener> listener = state->mListener; 425 state.unlock(); 426 listener->onWorkDone_nb(shared_from_this(), vec(work)); 427 } 428 queue.lock(); 429 return; 430 } 431 } 432 if (work->workletsProcessed != 0u) { 433 Mutexed<ExecState>::Locked state(mExecState); 434 ALOGV("returning this work"); 435 std::shared_ptr<C2Component::Listener> listener = state->mListener; 436 state.unlock(); 437 listener->onWorkDone_nb(shared_from_this(), vec(work)); 438 } else { 439 ALOGV("queue pending work"); 440 std::unique_ptr<C2Work> unexpected; 441 { 442 Mutexed<PendingWork>::Locked pending(mPendingWork); 443 uint64_t frameIndex = work->input.ordinal.frameIndex.peeku(); 444 if (pending->count(frameIndex) != 0) { 445 unexpected = std::move(pending->at(frameIndex)); 446 pending->erase(frameIndex); 447 } 448 (void)pending->insert({ frameIndex, std::move(work) }); 449 } 450 if (unexpected) { 451 ALOGD("unexpected pending work"); 452 unexpected->result = C2_CORRUPTED; 453 Mutexed<ExecState>::Locked state(mExecState); 454 std::shared_ptr<C2Component::Listener> listener = state->mListener; 455 state.unlock(); 456 listener->onWorkDone_nb(shared_from_this(), vec(unexpected)); 457 } 458 } 459 } 460 461 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer( 462 const std::shared_ptr<C2LinearBlock> &block) { 463 return createLinearBuffer(block, block->offset(), block->size()); 464 } 465 466 std::shared_ptr<C2Buffer> SimpleC2Component::createLinearBuffer( 467 const std::shared_ptr<C2LinearBlock> &block, size_t offset, size_t size) { 468 return C2Buffer::CreateLinearBuffer(block->share(offset, size, ::C2Fence())); 469 } 470 471 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer( 472 const std::shared_ptr<C2GraphicBlock> &block) { 473 return createGraphicBuffer(block, C2Rect(block->width(), block->height())); 474 } 475 476 std::shared_ptr<C2Buffer> SimpleC2Component::createGraphicBuffer( 477 const std::shared_ptr<C2GraphicBlock> &block, const C2Rect &crop) { 478 return C2Buffer::CreateGraphicBuffer(block->share(crop, ::C2Fence())); 479 } 480 481 } // namespace android 482