1 #include "include/private/dvr/buffer_hub_queue_client.h" 2 3 #include <inttypes.h> 4 #include <log/log.h> 5 #include <poll.h> 6 #include <sys/epoll.h> 7 8 #include <array> 9 10 #include <pdx/default_transport/client_channel.h> 11 #include <pdx/default_transport/client_channel_factory.h> 12 #include <pdx/file_handle.h> 13 #include <pdx/trace.h> 14 15 #define RETRY_EINTR(fnc_call) \ 16 ([&]() -> decltype(fnc_call) { \ 17 decltype(fnc_call) result; \ 18 do { \ 19 result = (fnc_call); \ 20 } while (result == -1 && errno == EINTR); \ 21 return result; \ 22 })() 23 24 using android::pdx::ErrorStatus; 25 using android::pdx::LocalChannelHandle; 26 using android::pdx::LocalHandle; 27 using android::pdx::Status; 28 29 namespace android { 30 namespace dvr { 31 32 namespace { 33 34 std::pair<int32_t, int32_t> Unstuff(uint64_t value) { 35 return {static_cast<int32_t>(value >> 32), 36 static_cast<int32_t>(value & ((1ull << 32) - 1))}; 37 } 38 39 uint64_t Stuff(int32_t a, int32_t b) { 40 const uint32_t ua = static_cast<uint32_t>(a); 41 const uint32_t ub = static_cast<uint32_t>(b); 42 return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub); 43 } 44 45 } // anonymous namespace 46 47 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle) 48 : Client{pdx::default_transport::ClientChannel::Create( 49 std::move(channel_handle))} { 50 Initialize(); 51 } 52 53 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path) 54 : Client{ 55 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} { 56 Initialize(); 57 } 58 59 void BufferHubQueue::Initialize() { 60 int ret = epoll_fd_.Create(); 61 if (ret < 0) { 62 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s", 63 strerror(-ret)); 64 return; 65 } 66 67 epoll_event event = { 68 .events = EPOLLIN | EPOLLET, 69 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}}; 70 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); 71 if (ret < 0) { 72 ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__, 73 strerror(-ret)); 74 } 75 } 76 77 Status<void> BufferHubQueue::ImportQueue() { 78 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>(); 79 if (!status) { 80 ALOGE("%s: Failed to import queue: %s", __FUNCTION__, 81 status.GetErrorMessage().c_str()); 82 return ErrorStatus(status.error()); 83 } else { 84 SetupQueue(status.get()); 85 return {}; 86 } 87 } 88 89 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) { 90 is_async_ = queue_info.producer_config.is_async; 91 default_width_ = queue_info.producer_config.default_width; 92 default_height_ = queue_info.producer_config.default_height; 93 default_format_ = queue_info.producer_config.default_format; 94 user_metadata_size_ = queue_info.producer_config.user_metadata_size; 95 id_ = queue_info.id; 96 } 97 98 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() { 99 if (auto status = CreateConsumerQueueHandle(/*silent*/ false)) 100 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 101 else 102 return nullptr; 103 } 104 105 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() { 106 if (auto status = CreateConsumerQueueHandle(/*silent*/ true)) 107 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 108 else 109 return nullptr; 110 } 111 112 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle( 113 bool silent) { 114 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent); 115 if (!status) { 116 ALOGE( 117 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: " 118 "%s", 119 status.GetErrorMessage().c_str()); 120 return ErrorStatus(status.error()); 121 } 122 123 return status; 124 } 125 126 pdx::Status<ConsumerQueueParcelable> 127 BufferHubQueue::CreateConsumerQueueParcelable(bool silent) { 128 auto status = CreateConsumerQueueHandle(silent); 129 if (!status) 130 return status.error_status(); 131 132 // A temporary consumer queue client to pull its channel parcelable. 133 auto consumer_queue = 134 std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 135 ConsumerQueueParcelable queue_parcelable( 136 consumer_queue->GetChannel()->TakeChannelParcelable()); 137 138 if (!queue_parcelable.IsValid()) { 139 ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__); 140 return ErrorStatus(EINVAL); 141 } 142 143 return {std::move(queue_parcelable)}; 144 } 145 146 bool BufferHubQueue::WaitForBuffers(int timeout) { 147 ATRACE_NAME("BufferHubQueue::WaitForBuffers"); 148 std::array<epoll_event, kMaxEvents> events; 149 150 // Loop at least once to check for hangups. 151 do { 152 ALOGD_IF( 153 TRACE, 154 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu", 155 id(), count(), capacity()); 156 157 // If there is already a buffer then just check for hangup without waiting. 158 const int ret = epoll_fd_.Wait(events.data(), events.size(), 159 count() == 0 ? timeout : 0); 160 161 if (ret == 0) { 162 ALOGI_IF(TRACE, 163 "BufferHubQueue::WaitForBuffers: No events before timeout: " 164 "queue_id=%d", 165 id()); 166 return count() != 0; 167 } 168 169 if (ret < 0 && ret != -EINTR) { 170 ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret)); 171 return false; 172 } 173 174 const int num_events = ret; 175 176 // A BufferQueue's epoll fd tracks N+1 events, where there are N events, 177 // one for each buffer in the queue, and one extra event for the queue 178 // client itself. 179 for (int i = 0; i < num_events; i++) { 180 int32_t event_fd; 181 int32_t index; 182 std::tie(event_fd, index) = Unstuff(events[i].data.u64); 183 184 PDX_TRACE_FORMAT( 185 "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;" 186 "slot=%d|", 187 id(), num_events, i, event_fd, index); 188 189 ALOGD_IF(TRACE, 190 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d", 191 i, event_fd, index); 192 193 if (is_buffer_event_index(index)) { 194 HandleBufferEvent(static_cast<size_t>(index), event_fd, 195 events[i].events); 196 } else if (is_queue_event_index(index)) { 197 HandleQueueEvent(events[i].events); 198 } else { 199 ALOGW( 200 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d " 201 "index=%d", 202 event_fd, index); 203 } 204 } 205 } while (count() == 0 && capacity() > 0 && !hung_up()); 206 207 return count() != 0; 208 } 209 210 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd, 211 int poll_events) { 212 ATRACE_NAME("BufferHubQueue::HandleBufferEvent"); 213 if (!buffers_[slot]) { 214 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot); 215 return ErrorStatus(ENOENT); 216 } 217 218 auto status = buffers_[slot]->GetEventMask(poll_events); 219 if (!status) { 220 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s", 221 status.GetErrorMessage().c_str()); 222 return status.error_status(); 223 } 224 225 const int events = status.get(); 226 PDX_TRACE_FORMAT( 227 "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;" 228 "events=%d|", 229 id(), buffers_[slot]->id(), slot, event_fd, poll_events, events); 230 231 if (events & EPOLLIN) { 232 return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()}); 233 } else if (events & EPOLLHUP) { 234 ALOGW( 235 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu " 236 "event_fd=%d buffer_id=%d", 237 slot, buffers_[slot]->event_fd(), buffers_[slot]->id()); 238 return RemoveBuffer(slot); 239 } else { 240 ALOGW( 241 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll " 242 "events=%d", 243 slot, events); 244 } 245 246 return {}; 247 } 248 249 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { 250 ATRACE_NAME("BufferHubQueue::HandleQueueEvent"); 251 auto status = GetEventMask(poll_event); 252 if (!status) { 253 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s", 254 status.GetErrorMessage().c_str()); 255 return status.error_status(); 256 } 257 258 const int events = status.get(); 259 if (events & EPOLLIN) { 260 // Note that after buffer imports, if |count()| still returns 0, epoll 261 // wait will be tried again to acquire the newly imported buffer. 262 auto buffer_status = OnBufferAllocated(); 263 if (!buffer_status) { 264 ALOGE("%s: Failed to import buffer: %s", __FUNCTION__, 265 buffer_status.GetErrorMessage().c_str()); 266 } 267 } else if (events & EPOLLHUP) { 268 ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__); 269 hung_up_ = true; 270 } else { 271 ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events); 272 } 273 274 return {}; 275 } 276 277 Status<void> BufferHubQueue::AddBuffer( 278 const std::shared_ptr<BufferHubBase>& buffer, size_t slot) { 279 ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(), 280 slot); 281 282 if (is_full()) { 283 ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_); 284 return ErrorStatus(E2BIG); 285 } 286 287 if (buffers_[slot]) { 288 // Replace the buffer if the slot is occupied. This could happen when the 289 // producer side replaced the slot with a newly allocated buffer. Remove the 290 // buffer before setting up with the new one. 291 auto remove_status = RemoveBuffer(slot); 292 if (!remove_status) 293 return remove_status.error_status(); 294 } 295 296 for (const auto& event_source : buffer->GetEventSources()) { 297 epoll_event event = {.events = event_source.event_mask | EPOLLET, 298 .data = {.u64 = Stuff(buffer->event_fd(), slot)}}; 299 const int ret = 300 epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); 301 if (ret < 0) { 302 ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__, 303 strerror(-ret)); 304 return ErrorStatus(-ret); 305 } 306 } 307 308 buffers_[slot] = buffer; 309 capacity_++; 310 return {}; 311 } 312 313 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { 314 ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot); 315 316 if (buffers_[slot]) { 317 for (const auto& event_source : buffers_[slot]->GetEventSources()) { 318 const int ret = 319 epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); 320 if (ret < 0) { 321 ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__, 322 strerror(-ret)); 323 return ErrorStatus(-ret); 324 } 325 } 326 327 // Trigger OnBufferRemoved callback if registered. 328 if (on_buffer_removed_) 329 on_buffer_removed_(buffers_[slot]); 330 331 buffers_[slot] = nullptr; 332 capacity_--; 333 } 334 335 return {}; 336 } 337 338 Status<void> BufferHubQueue::Enqueue(Entry entry) { 339 if (!is_full()) { 340 // Find and remove the enqueued buffer from unavailable_buffers_slot if 341 // exist. 342 auto enqueued_buffer_iter = std::find_if( 343 unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(), 344 [&entry](size_t slot) -> bool { return slot == entry.slot; }); 345 if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) { 346 unavailable_buffers_slot_.erase(enqueued_buffer_iter); 347 } 348 349 available_buffers_.push(std::move(entry)); 350 351 // Trigger OnBufferAvailable callback if registered. 352 if (on_buffer_available_) 353 on_buffer_available_(); 354 355 return {}; 356 } else { 357 ALOGE("%s: Buffer queue is full!", __FUNCTION__); 358 return ErrorStatus(E2BIG); 359 } 360 } 361 362 Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout, 363 size_t* slot) { 364 ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout); 365 366 PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count()); 367 368 if (count() == 0) { 369 if (!WaitForBuffers(timeout)) 370 return ErrorStatus(ETIMEDOUT); 371 } 372 373 auto& entry = available_buffers_.top(); 374 PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(), 375 entry.slot); 376 377 std::shared_ptr<BufferHubBase> buffer = std::move(entry.buffer); 378 *slot = entry.slot; 379 380 available_buffers_.pop(); 381 unavailable_buffers_slot_.push_back(*slot); 382 383 return {std::move(buffer)}; 384 } 385 386 void BufferHubQueue::SetBufferAvailableCallback( 387 BufferAvailableCallback callback) { 388 on_buffer_available_ = callback; 389 } 390 391 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) { 392 on_buffer_removed_ = callback; 393 } 394 395 pdx::Status<void> BufferHubQueue::FreeAllBuffers() { 396 // Clear all available buffers. 397 while (!available_buffers_.empty()) 398 available_buffers_.pop(); 399 400 pdx::Status<void> last_error; // No error. 401 // Clear all buffers this producer queue is tracking. 402 for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) { 403 if (buffers_[slot] != nullptr) { 404 auto status = RemoveBuffer(slot); 405 if (!status) { 406 ALOGE( 407 "ProducerQueue::FreeAllBuffers: Failed to remove buffer at " 408 "slot=%zu.", 409 slot); 410 last_error = status.error_status(); 411 } 412 } 413 } 414 415 return last_error; 416 } 417 418 ProducerQueue::ProducerQueue(LocalChannelHandle handle) 419 : BASE(std::move(handle)) { 420 auto status = ImportQueue(); 421 if (!status) { 422 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s", 423 status.GetErrorMessage().c_str()); 424 Close(-status.error()); 425 } 426 } 427 428 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config, 429 const UsagePolicy& usage) 430 : BASE(BufferHubRPC::kClientPath) { 431 auto status = 432 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage); 433 if (!status) { 434 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s", 435 status.GetErrorMessage().c_str()); 436 Close(-status.error()); 437 return; 438 } 439 440 SetupQueue(status.get()); 441 } 442 443 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers( 444 uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, 445 uint64_t usage, size_t buffer_count) { 446 if (buffer_count == 0) { 447 return {std::vector<size_t>()}; 448 } 449 450 if (capacity() + buffer_count > kMaxQueueCapacity) { 451 ALOGE( 452 "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot " 453 "allocate %zu more buffer(s).", 454 capacity(), buffer_count); 455 return ErrorStatus(E2BIG); 456 } 457 458 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status = 459 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>( 460 width, height, layer_count, format, usage, buffer_count); 461 if (!status) { 462 ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s", 463 status.GetErrorMessage().c_str()); 464 return status.error_status(); 465 } 466 467 auto buffer_handle_slots = status.take(); 468 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count, 469 "BufferHubRPC::ProducerQueueAllocateBuffers should " 470 "return %zu buffer handle(s), but returned %zu instead.", 471 buffer_count, buffer_handle_slots.size()); 472 473 std::vector<size_t> buffer_slots; 474 buffer_slots.reserve(buffer_count); 475 476 // Bookkeeping for each buffer. 477 for (auto& hs : buffer_handle_slots) { 478 auto& buffer_handle = hs.first; 479 size_t buffer_slot = hs.second; 480 481 // Note that import might (though very unlikely) fail. If so, buffer_handle 482 // will be closed and included in returned buffer_slots. 483 if (AddBuffer(ProducerBuffer::Import(std::move(buffer_handle)), 484 buffer_slot)) { 485 ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu", 486 buffer_slot); 487 buffer_slots.push_back(buffer_slot); 488 } 489 } 490 491 if (buffer_slots.size() != buffer_count) { 492 // Error out if the count of imported buffer(s) is not correct. 493 ALOGE( 494 "ProducerQueue::AllocateBuffers: requested to import %zu " 495 "buffers, but actually imported %zu buffers.", 496 buffer_count, buffer_slots.size()); 497 return ErrorStatus(ENOMEM); 498 } 499 500 return {std::move(buffer_slots)}; 501 } 502 503 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height, 504 uint32_t layer_count, 505 uint32_t format, uint64_t usage) { 506 // We only allocate one buffer at a time. 507 constexpr size_t buffer_count = 1; 508 auto status = 509 AllocateBuffers(width, height, layer_count, format, usage, buffer_count); 510 if (!status) { 511 ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s", 512 status.GetErrorMessage().c_str()); 513 return status.error_status(); 514 } 515 516 return {status.get()[0]}; 517 } 518 519 Status<void> ProducerQueue::AddBuffer( 520 const std::shared_ptr<ProducerBuffer>& buffer, size_t slot) { 521 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", 522 id(), buffer->id(), slot); 523 // For producer buffer, we need to enqueue the newly added buffer 524 // immediately. Producer queue starts with all buffers in available state. 525 auto status = BufferHubQueue::AddBuffer(buffer, slot); 526 if (!status) 527 return status; 528 529 return BufferHubQueue::Enqueue({buffer, slot, 0ULL}); 530 } 531 532 Status<size_t> ProducerQueue::InsertBuffer( 533 const std::shared_ptr<ProducerBuffer>& buffer) { 534 if (buffer == nullptr || 535 !BufferHubDefs::isClientGained(buffer->buffer_state(), 536 buffer->client_state_mask())) { 537 ALOGE( 538 "ProducerQueue::InsertBuffer: Can only insert a buffer when it's in " 539 "gained state."); 540 return ErrorStatus(EINVAL); 541 } 542 543 auto status_or_slot = 544 InvokeRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>( 545 buffer->cid()); 546 if (!status_or_slot) { 547 ALOGE( 548 "ProducerQueue::InsertBuffer: Failed to insert producer buffer: " 549 "buffer_cid=%d, error: %s.", 550 buffer->cid(), status_or_slot.GetErrorMessage().c_str()); 551 return status_or_slot.error_status(); 552 } 553 554 size_t slot = status_or_slot.get(); 555 556 // Note that we are calling AddBuffer() from the base class to explicitly 557 // avoid Enqueue() the ProducerBuffer. 558 auto status = BufferHubQueue::AddBuffer(buffer, slot); 559 if (!status) { 560 ALOGE("ProducerQueue::InsertBuffer: Failed to add buffer: %s.", 561 status.GetErrorMessage().c_str()); 562 return status.error_status(); 563 } 564 return {slot}; 565 } 566 567 Status<void> ProducerQueue::RemoveBuffer(size_t slot) { 568 auto status = 569 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot); 570 if (!status) { 571 ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__, 572 status.GetErrorMessage().c_str()); 573 return status.error_status(); 574 } 575 576 return BufferHubQueue::RemoveBuffer(slot); 577 } 578 579 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue( 580 int timeout, size_t* slot, LocalHandle* release_fence) { 581 DvrNativeBufferMetadata canonical_meta; 582 return Dequeue(timeout, slot, &canonical_meta, release_fence); 583 } 584 585 pdx::Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue( 586 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, 587 pdx::LocalHandle* release_fence, bool gain_posted_buffer) { 588 ATRACE_NAME("ProducerQueue::Dequeue"); 589 if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { 590 ALOGE("%s: Invalid parameter.", __FUNCTION__); 591 return ErrorStatus(EINVAL); 592 } 593 594 std::shared_ptr<ProducerBuffer> buffer; 595 Status<std::shared_ptr<BufferHubBase>> dequeue_status = 596 BufferHubQueue::Dequeue(timeout, slot); 597 if (dequeue_status.ok()) { 598 buffer = std::static_pointer_cast<ProducerBuffer>(dequeue_status.take()); 599 } else { 600 if (gain_posted_buffer) { 601 Status<std::shared_ptr<ProducerBuffer>> dequeue_unacquired_status = 602 ProducerQueue::DequeueUnacquiredBuffer(slot); 603 if (!dequeue_unacquired_status.ok()) { 604 ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__, 605 dequeue_unacquired_status.error()); 606 return dequeue_unacquired_status.error_status(); 607 } 608 buffer = dequeue_unacquired_status.take(); 609 } else { 610 return dequeue_status.error_status(); 611 } 612 } 613 const int ret = 614 buffer->GainAsync(out_meta, release_fence, gain_posted_buffer); 615 if (ret < 0 && ret != -EALREADY) 616 return ErrorStatus(-ret); 617 618 return {std::move(buffer)}; 619 } 620 621 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::DequeueUnacquiredBuffer( 622 size_t* slot) { 623 if (unavailable_buffers_slot_.size() < 1) { 624 ALOGE( 625 "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in " 626 "acquired state if exist.", 627 __FUNCTION__); 628 return ErrorStatus(ENOMEM); 629 } 630 631 // Find the first buffer that is not in acquired state from 632 // unavailable_buffers_slot_. 633 for (auto iter = unavailable_buffers_slot_.begin(); 634 iter != unavailable_buffers_slot_.end(); iter++) { 635 std::shared_ptr<ProducerBuffer> buffer = ProducerQueue::GetBuffer(*iter); 636 if (buffer == nullptr) { 637 ALOGE("%s failed. Buffer slot %d is null.", __FUNCTION__, 638 static_cast<int>(*slot)); 639 return ErrorStatus(EIO); 640 } 641 if (!BufferHubDefs::isAnyClientAcquired(buffer->buffer_state())) { 642 *slot = *iter; 643 unavailable_buffers_slot_.erase(iter); 644 unavailable_buffers_slot_.push_back(*slot); 645 ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d", 646 __FUNCTION__, static_cast<int>(*slot)); 647 return {std::move(buffer)}; 648 } 649 } 650 ALOGE( 651 "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.", 652 __FUNCTION__); 653 return ErrorStatus(EBUSY); 654 } 655 656 pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() { 657 if (capacity() != 0) { 658 ALOGE( 659 "%s: producer queue can only be taken out as a parcelable when empty. " 660 "Current queue capacity: %zu", 661 __FUNCTION__, capacity()); 662 return ErrorStatus(EINVAL); 663 } 664 665 std::unique_ptr<pdx::ClientChannel> channel = TakeChannel(); 666 ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable()); 667 668 // Here the queue parcelable is returned and holds the underlying system 669 // resources backing the queue; while the original client channel of this 670 // producer queue is destroyed in place so that this client can no longer 671 // provide producer operations. 672 return {std::move(queue_parcelable)}; 673 } 674 675 /*static */ 676 std::unique_ptr<ConsumerQueue> ConsumerQueue::Import( 677 LocalChannelHandle handle) { 678 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle))); 679 } 680 681 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) 682 : BufferHubQueue(std::move(handle)) { 683 auto status = ImportQueue(); 684 if (!status) { 685 ALOGE("%s: Failed to import queue: %s", __FUNCTION__, 686 status.GetErrorMessage().c_str()); 687 Close(-status.error()); 688 } 689 690 auto import_status = ImportBuffers(); 691 if (import_status) { 692 ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get()); 693 } else { 694 ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, 695 import_status.GetErrorMessage().c_str()); 696 } 697 } 698 699 Status<size_t> ConsumerQueue::ImportBuffers() { 700 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); 701 if (!status) { 702 if (status.error() == EBADR) { 703 ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__); 704 return {0}; 705 } else { 706 ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__, 707 status.GetErrorMessage().c_str()); 708 return status.error_status(); 709 } 710 } 711 712 int ret; 713 Status<void> last_error; 714 size_t imported_buffers_count = 0; 715 716 auto buffer_handle_slots = status.take(); 717 for (auto& buffer_handle_slot : buffer_handle_slots) { 718 ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__, 719 buffer_handle_slot.first.value()); 720 721 std::unique_ptr<ConsumerBuffer> consumer_buffer = 722 ConsumerBuffer::Import(std::move(buffer_handle_slot.first)); 723 if (!consumer_buffer) { 724 ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__, 725 buffer_handle_slot.second); 726 last_error = ErrorStatus(EPIPE); 727 continue; 728 } 729 730 auto add_status = 731 AddBuffer(std::move(consumer_buffer), buffer_handle_slot.second); 732 if (!add_status) { 733 ALOGE("%s: Failed to add buffer: %s", __FUNCTION__, 734 add_status.GetErrorMessage().c_str()); 735 last_error = add_status; 736 } else { 737 imported_buffers_count++; 738 } 739 } 740 741 if (imported_buffers_count > 0) 742 return {imported_buffers_count}; 743 else 744 return last_error.error_status(); 745 } 746 747 Status<void> ConsumerQueue::AddBuffer( 748 const std::shared_ptr<ConsumerBuffer>& buffer, size_t slot) { 749 ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(), 750 buffer->id(), slot); 751 return BufferHubQueue::AddBuffer(buffer, slot); 752 } 753 754 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue( 755 int timeout, size_t* slot, void* meta, size_t user_metadata_size, 756 LocalHandle* acquire_fence) { 757 if (user_metadata_size != user_metadata_size_) { 758 ALOGE( 759 "%s: Metadata size (%zu) for the dequeuing buffer does not match " 760 "metadata size (%zu) for the queue.", 761 __FUNCTION__, user_metadata_size, user_metadata_size_); 762 return ErrorStatus(EINVAL); 763 } 764 765 DvrNativeBufferMetadata canonical_meta; 766 auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence); 767 if (!status) 768 return status.error_status(); 769 770 if (meta && user_metadata_size) { 771 void* metadata_src = 772 reinterpret_cast<void*>(canonical_meta.user_metadata_ptr); 773 if (metadata_src) { 774 memcpy(meta, metadata_src, user_metadata_size); 775 } else { 776 ALOGW("%s: no user-defined metadata.", __FUNCTION__); 777 } 778 } 779 780 return status; 781 } 782 783 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue( 784 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, 785 pdx::LocalHandle* acquire_fence) { 786 ATRACE_NAME("ConsumerQueue::Dequeue"); 787 if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { 788 ALOGE("%s: Invalid parameter.", __FUNCTION__); 789 return ErrorStatus(EINVAL); 790 } 791 792 auto status = BufferHubQueue::Dequeue(timeout, slot); 793 if (!status) 794 return status.error_status(); 795 796 auto buffer = std::static_pointer_cast<ConsumerBuffer>(status.take()); 797 const int ret = buffer->AcquireAsync(out_meta, acquire_fence); 798 if (ret < 0) 799 return ErrorStatus(-ret); 800 801 return {std::move(buffer)}; 802 } 803 804 Status<void> ConsumerQueue::OnBufferAllocated() { 805 ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id()); 806 807 auto status = ImportBuffers(); 808 if (!status) { 809 ALOGE("%s: Failed to import buffers: %s", __FUNCTION__, 810 status.GetErrorMessage().c_str()); 811 return ErrorStatus(status.error()); 812 } else if (status.get() == 0) { 813 ALOGW("%s: No new buffers allocated!", __FUNCTION__); 814 return ErrorStatus(ENOBUFS); 815 } else { 816 ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__, 817 status.get()); 818 return {}; 819 } 820 } 821 822 } // namespace dvr 823 } // namespace android 824