1 #include "include/private/dvr/buffer_hub_queue_client.h" 2 3 #include <inttypes.h> 4 #include <log/log.h> 5 #include <poll.h> 6 #include <sys/epoll.h> 7 8 #include <array> 9 10 #include <pdx/default_transport/client_channel.h> 11 #include <pdx/default_transport/client_channel_factory.h> 12 #include <pdx/file_handle.h> 13 #include <pdx/trace.h> 14 15 #define RETRY_EINTR(fnc_call) \ 16 ([&]() -> decltype(fnc_call) { \ 17 decltype(fnc_call) result; \ 18 do { \ 19 result = (fnc_call); \ 20 } while (result == -1 && errno == EINTR); \ 21 return result; \ 22 })() 23 24 using android::pdx::ErrorStatus; 25 using android::pdx::LocalChannelHandle; 26 using android::pdx::LocalHandle; 27 using android::pdx::Status; 28 29 namespace android { 30 namespace dvr { 31 32 namespace { 33 34 // Polls an fd for the given events. 35 Status<int> PollEvents(int fd, short events) { 36 const int kTimeoutMs = 0; 37 pollfd pfd{fd, events, 0}; 38 const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs)); 39 if (count < 0) { 40 return ErrorStatus(errno); 41 } else if (count == 0) { 42 return ErrorStatus(ETIMEDOUT); 43 } else { 44 return {pfd.revents}; 45 } 46 } 47 48 std::pair<int32_t, int32_t> Unstuff(uint64_t value) { 49 return {static_cast<int32_t>(value >> 32), 50 static_cast<int32_t>(value & ((1ull << 32) - 1))}; 51 } 52 53 uint64_t Stuff(int32_t a, int32_t b) { 54 const uint32_t ua = static_cast<uint32_t>(a); 55 const uint32_t ub = static_cast<uint32_t>(b); 56 return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub); 57 } 58 59 } // anonymous namespace 60 61 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle) 62 : Client{pdx::default_transport::ClientChannel::Create( 63 std::move(channel_handle))} { 64 Initialize(); 65 } 66 67 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path) 68 : Client{ 69 pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} { 70 Initialize(); 71 } 72 73 void BufferHubQueue::Initialize() { 74 int ret = epoll_fd_.Create(); 75 if (ret < 0) { 76 ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s", 77 strerror(-ret)); 78 return; 79 } 80 81 epoll_event event = { 82 .events = EPOLLIN | EPOLLET, 83 .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}}; 84 ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event); 85 if (ret < 0) { 86 ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s", 87 strerror(-ret)); 88 } 89 } 90 91 Status<void> BufferHubQueue::ImportQueue() { 92 auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>(); 93 if (!status) { 94 ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s", 95 status.GetErrorMessage().c_str()); 96 return ErrorStatus(status.error()); 97 } else { 98 SetupQueue(status.get()); 99 return {}; 100 } 101 } 102 103 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) { 104 is_async_ = queue_info.producer_config.is_async; 105 default_width_ = queue_info.producer_config.default_width; 106 default_height_ = queue_info.producer_config.default_height; 107 default_format_ = queue_info.producer_config.default_format; 108 user_metadata_size_ = queue_info.producer_config.user_metadata_size; 109 id_ = queue_info.id; 110 } 111 112 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() { 113 if (auto status = CreateConsumerQueueHandle(/*silent*/ false)) 114 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 115 else 116 return nullptr; 117 } 118 119 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() { 120 if (auto status = CreateConsumerQueueHandle(/*silent*/ true)) 121 return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take())); 122 else 123 return nullptr; 124 } 125 126 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle( 127 bool silent) { 128 auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent); 129 if (!status) { 130 ALOGE( 131 "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: " 132 "%s", 133 status.GetErrorMessage().c_str()); 134 return ErrorStatus(status.error()); 135 } 136 137 return status; 138 } 139 140 bool BufferHubQueue::WaitForBuffers(int timeout) { 141 ATRACE_NAME("BufferHubQueue::WaitForBuffers"); 142 std::array<epoll_event, kMaxEvents> events; 143 144 // Loop at least once to check for hangups. 145 do { 146 ALOGD_IF( 147 TRACE, 148 "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu", 149 id(), count(), capacity()); 150 151 // If there is already a buffer then just check for hangup without waiting. 152 const int ret = epoll_fd_.Wait(events.data(), events.size(), 153 count() == 0 ? timeout : 0); 154 155 if (ret == 0) { 156 ALOGI_IF(TRACE, 157 "BufferHubQueue::WaitForBuffers: No events before timeout: " 158 "queue_id=%d", 159 id()); 160 return count() != 0; 161 } 162 163 if (ret < 0 && ret != -EINTR) { 164 ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s", 165 strerror(-ret)); 166 return false; 167 } 168 169 const int num_events = ret; 170 171 // A BufferQueue's epoll fd tracks N+1 events, where there are N events, 172 // one for each buffer in the queue, and one extra event for the queue 173 // client itself. 174 for (int i = 0; i < num_events; i++) { 175 int32_t event_fd; 176 int32_t index; 177 std::tie(event_fd, index) = Unstuff(events[i].data.u64); 178 179 PDX_TRACE_FORMAT( 180 "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;" 181 "slot=%d|", 182 id(), num_events, i, event_fd, index); 183 184 ALOGD_IF(TRACE, 185 "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d", 186 i, event_fd, index); 187 188 if (is_buffer_event_index(index)) { 189 HandleBufferEvent(static_cast<size_t>(index), event_fd, 190 events[i].events); 191 } else if (is_queue_event_index(index)) { 192 HandleQueueEvent(events[i].events); 193 } else { 194 ALOGW( 195 "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d " 196 "index=%d", 197 event_fd, index); 198 } 199 } 200 } while (count() == 0 && capacity() > 0 && !hung_up()); 201 202 return count() != 0; 203 } 204 205 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd, 206 int poll_events) { 207 ATRACE_NAME("BufferHubQueue::HandleBufferEvent"); 208 if (!buffers_[slot]) { 209 ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot); 210 return ErrorStatus(ENOENT); 211 } 212 213 auto status = buffers_[slot]->GetEventMask(poll_events); 214 if (!status) { 215 ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s", 216 status.GetErrorMessage().c_str()); 217 return status.error_status(); 218 } 219 220 const int events = status.get(); 221 PDX_TRACE_FORMAT( 222 "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;" 223 "events=%d|", 224 id(), buffers_[slot]->id(), slot, event_fd, poll_events, events); 225 226 if (events & EPOLLIN) { 227 return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()}); 228 } else if (events & EPOLLHUP) { 229 ALOGW( 230 "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu " 231 "event_fd=%d buffer_id=%d", 232 slot, buffers_[slot]->event_fd(), buffers_[slot]->id()); 233 return RemoveBuffer(slot); 234 } else { 235 ALOGW( 236 "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll " 237 "events=%d", 238 slot, events); 239 } 240 241 return {}; 242 } 243 244 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) { 245 ATRACE_NAME("BufferHubQueue::HandleQueueEvent"); 246 auto status = GetEventMask(poll_event); 247 if (!status) { 248 ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s", 249 status.GetErrorMessage().c_str()); 250 return status.error_status(); 251 } 252 253 const int events = status.get(); 254 if (events & EPOLLIN) { 255 // Note that after buffer imports, if |count()| still returns 0, epoll 256 // wait will be tried again to acquire the newly imported buffer. 257 auto buffer_status = OnBufferAllocated(); 258 if (!buffer_status) { 259 ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s", 260 buffer_status.GetErrorMessage().c_str()); 261 } 262 } else if (events & EPOLLHUP) { 263 ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!"); 264 hung_up_ = true; 265 } else { 266 ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events); 267 } 268 269 return {}; 270 } 271 272 Status<void> BufferHubQueue::AddBuffer( 273 const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) { 274 ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu", 275 buffer->id(), slot); 276 277 if (is_full()) { 278 ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu", 279 capacity_); 280 return ErrorStatus(E2BIG); 281 } 282 283 if (buffers_[slot]) { 284 // Replace the buffer if the slot is occupied. This could happen when the 285 // producer side replaced the slot with a newly allocated buffer. Remove the 286 // buffer before setting up with the new one. 287 auto remove_status = RemoveBuffer(slot); 288 if (!remove_status) 289 return remove_status.error_status(); 290 } 291 292 for (const auto& event_source : buffer->GetEventSources()) { 293 epoll_event event = {.events = event_source.event_mask | EPOLLET, 294 .data = {.u64 = Stuff(buffer->event_fd(), slot)}}; 295 const int ret = 296 epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event); 297 if (ret < 0) { 298 ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s", 299 strerror(-ret)); 300 return ErrorStatus(-ret); 301 } 302 } 303 304 buffers_[slot] = buffer; 305 capacity_++; 306 return {}; 307 } 308 309 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) { 310 ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot); 311 312 if (buffers_[slot]) { 313 for (const auto& event_source : buffers_[slot]->GetEventSources()) { 314 const int ret = 315 epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr); 316 if (ret < 0) { 317 ALOGE( 318 "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll " 319 "set: %s", 320 strerror(-ret)); 321 return ErrorStatus(-ret); 322 } 323 } 324 325 // Trigger OnBufferRemoved callback if registered. 326 if (on_buffer_removed_) 327 on_buffer_removed_(buffers_[slot]); 328 329 buffers_[slot] = nullptr; 330 capacity_--; 331 } 332 333 return {}; 334 } 335 336 Status<void> BufferHubQueue::Enqueue(Entry entry) { 337 if (!is_full()) { 338 available_buffers_.push(std::move(entry)); 339 340 // Trigger OnBufferAvailable callback if registered. 341 if (on_buffer_available_) 342 on_buffer_available_(); 343 344 return {}; 345 } else { 346 ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!"); 347 return ErrorStatus(E2BIG); 348 } 349 } 350 351 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout, 352 size_t* slot) { 353 ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(), 354 timeout); 355 356 PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count()); 357 358 if (count() == 0) { 359 if (!WaitForBuffers(timeout)) 360 return ErrorStatus(ETIMEDOUT); 361 } 362 363 auto& entry = available_buffers_.top(); 364 PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(), 365 entry.slot); 366 367 std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer); 368 *slot = entry.slot; 369 370 available_buffers_.pop(); 371 372 return {std::move(buffer)}; 373 } 374 375 void BufferHubQueue::SetBufferAvailableCallback( 376 BufferAvailableCallback callback) { 377 on_buffer_available_ = callback; 378 } 379 380 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) { 381 on_buffer_removed_ = callback; 382 } 383 384 pdx::Status<void> BufferHubQueue::FreeAllBuffers() { 385 // Clear all available buffers. 386 while (!available_buffers_.empty()) 387 available_buffers_.pop(); 388 389 pdx::Status<void> last_error; // No error. 390 // Clear all buffers this producer queue is tracking. 391 for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) { 392 if (buffers_[slot] != nullptr) { 393 auto status = RemoveBuffer(slot); 394 if (!status) { 395 ALOGE( 396 "ProducerQueue::FreeAllBuffers: Failed to remove buffer at " 397 "slot=%zu.", 398 slot); 399 last_error = status.error_status(); 400 } 401 } 402 } 403 404 return last_error; 405 } 406 407 ProducerQueue::ProducerQueue(LocalChannelHandle handle) 408 : BASE(std::move(handle)) { 409 auto status = ImportQueue(); 410 if (!status) { 411 ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s", 412 status.GetErrorMessage().c_str()); 413 Close(-status.error()); 414 } 415 } 416 417 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config, 418 const UsagePolicy& usage) 419 : BASE(BufferHubRPC::kClientPath) { 420 auto status = 421 InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage); 422 if (!status) { 423 ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s", 424 status.GetErrorMessage().c_str()); 425 Close(-status.error()); 426 return; 427 } 428 429 SetupQueue(status.get()); 430 } 431 432 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers( 433 uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format, 434 uint64_t usage, size_t buffer_count) { 435 if (capacity() + buffer_count > kMaxQueueCapacity) { 436 ALOGE( 437 "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot " 438 "allocate %zu more buffer(s).", 439 capacity(), buffer_count); 440 return ErrorStatus(E2BIG); 441 } 442 443 Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status = 444 InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>( 445 width, height, layer_count, format, usage, buffer_count); 446 if (!status) { 447 ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s", 448 status.GetErrorMessage().c_str()); 449 return status.error_status(); 450 } 451 452 auto buffer_handle_slots = status.take(); 453 LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count, 454 "BufferHubRPC::ProducerQueueAllocateBuffers should " 455 "return %zu buffer handle(s), but returned %zu instead.", 456 buffer_count, buffer_handle_slots.size()); 457 458 std::vector<size_t> buffer_slots; 459 buffer_slots.reserve(buffer_count); 460 461 // Bookkeeping for each buffer. 462 for (auto& hs : buffer_handle_slots) { 463 auto& buffer_handle = hs.first; 464 size_t buffer_slot = hs.second; 465 466 // Note that import might (though very unlikely) fail. If so, buffer_handle 467 // will be closed and included in returned buffer_slots. 468 if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)), 469 buffer_slot)) { 470 ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu", 471 buffer_slot); 472 buffer_slots.push_back(buffer_slot); 473 } 474 } 475 476 if (buffer_slots.size() == 0) { 477 // Error out if no buffer is allocated and improted. 478 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated."); 479 ErrorStatus(ENOMEM); 480 } 481 482 return {std::move(buffer_slots)}; 483 } 484 485 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height, 486 uint32_t layer_count, 487 uint32_t format, uint64_t usage) { 488 // We only allocate one buffer at a time. 489 constexpr size_t buffer_count = 1; 490 auto status = 491 AllocateBuffers(width, height, layer_count, format, usage, buffer_count); 492 if (!status) { 493 ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s", 494 status.GetErrorMessage().c_str()); 495 return status.error_status(); 496 } 497 498 if (status.get().size() == 0) { 499 ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated."); 500 ErrorStatus(ENOMEM); 501 } 502 503 return {status.get()[0]}; 504 } 505 506 Status<void> ProducerQueue::AddBuffer( 507 const std::shared_ptr<BufferProducer>& buffer, size_t slot) { 508 ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", 509 id(), buffer->id(), slot); 510 // For producer buffer, we need to enqueue the newly added buffer 511 // immediately. Producer queue starts with all buffers in available state. 512 auto status = BufferHubQueue::AddBuffer(buffer, slot); 513 if (!status) 514 return status; 515 516 return BufferHubQueue::Enqueue({buffer, slot, 0ULL}); 517 } 518 519 Status<void> ProducerQueue::RemoveBuffer(size_t slot) { 520 auto status = 521 InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot); 522 if (!status) { 523 ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s", 524 status.GetErrorMessage().c_str()); 525 return status.error_status(); 526 } 527 528 return BufferHubQueue::RemoveBuffer(slot); 529 } 530 531 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( 532 int timeout, size_t* slot, LocalHandle* release_fence) { 533 DvrNativeBufferMetadata canonical_meta; 534 return Dequeue(timeout, slot, &canonical_meta, release_fence); 535 } 536 537 pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue( 538 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, 539 pdx::LocalHandle* release_fence) { 540 ATRACE_NAME("ProducerQueue::Dequeue"); 541 if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) { 542 ALOGE("ProducerQueue::Dequeue: Invalid parameter."); 543 return ErrorStatus(EINVAL); 544 } 545 546 auto status = BufferHubQueue::Dequeue(timeout, slot); 547 if (!status) 548 return status.error_status(); 549 550 auto buffer = std::static_pointer_cast<BufferProducer>(status.take()); 551 const int ret = buffer->GainAsync(out_meta, release_fence); 552 if (ret < 0 && ret != -EALREADY) 553 return ErrorStatus(-ret); 554 555 return {std::move(buffer)}; 556 } 557 558 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle) 559 : BufferHubQueue(std::move(handle)) { 560 auto status = ImportQueue(); 561 if (!status) { 562 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s", 563 status.GetErrorMessage().c_str()); 564 Close(-status.error()); 565 } 566 567 auto import_status = ImportBuffers(); 568 if (import_status) { 569 ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.", 570 import_status.get()); 571 } else { 572 ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s", 573 import_status.GetErrorMessage().c_str()); 574 } 575 } 576 577 Status<size_t> ConsumerQueue::ImportBuffers() { 578 auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(); 579 if (!status) { 580 if (status.error() == EBADR) { 581 ALOGI( 582 "ConsumerQueue::ImportBuffers: Queue is silent, no buffers " 583 "imported."); 584 return {0}; 585 } else { 586 ALOGE( 587 "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s", 588 status.GetErrorMessage().c_str()); 589 return status.error_status(); 590 } 591 } 592 593 int ret; 594 Status<void> last_error; 595 size_t imported_buffers_count = 0; 596 597 auto buffer_handle_slots = status.take(); 598 for (auto& buffer_handle_slot : buffer_handle_slots) { 599 ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d", 600 buffer_handle_slot.first.value()); 601 602 std::unique_ptr<BufferConsumer> buffer_consumer = 603 BufferConsumer::Import(std::move(buffer_handle_slot.first)); 604 if (!buffer_consumer) { 605 ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu", 606 buffer_handle_slot.second); 607 last_error = ErrorStatus(EPIPE); 608 continue; 609 } 610 611 auto add_status = 612 AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second); 613 if (!add_status) { 614 ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s", 615 add_status.GetErrorMessage().c_str()); 616 last_error = add_status; 617 } else { 618 imported_buffers_count++; 619 } 620 } 621 622 if (imported_buffers_count > 0) 623 return {imported_buffers_count}; 624 else 625 return last_error.error_status(); 626 } 627 628 Status<void> ConsumerQueue::AddBuffer( 629 const std::shared_ptr<BufferConsumer>& buffer, size_t slot) { 630 ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu", 631 id(), buffer->id(), slot); 632 auto status = BufferHubQueue::AddBuffer(buffer, slot); 633 if (!status) 634 return status; 635 636 // Check to see if the buffer is already signaled. This is necessary to catch 637 // cases where buffers are already available; epoll edge triggered mode does 638 // not fire until an edge transition when adding new buffers to the epoll 639 // set. Note that we only poll the fd events because HandleBufferEvent() takes 640 // care of checking the translated buffer events. 641 auto poll_status = PollEvents(buffer->event_fd(), POLLIN); 642 if (!poll_status && poll_status.error() != ETIMEDOUT) { 643 ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s", 644 poll_status.GetErrorMessage().c_str()); 645 return poll_status.error_status(); 646 } 647 648 // Update accounting if the buffer is available. 649 if (poll_status) 650 return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get()); 651 else 652 return {}; 653 } 654 655 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( 656 int timeout, size_t* slot, void* meta, size_t user_metadata_size, 657 LocalHandle* acquire_fence) { 658 if (user_metadata_size != user_metadata_size_) { 659 ALOGE( 660 "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer " 661 "does not match metadata size (%zu) for the queue.", 662 user_metadata_size, user_metadata_size_); 663 return ErrorStatus(EINVAL); 664 } 665 666 DvrNativeBufferMetadata canonical_meta; 667 auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence); 668 if (!status) 669 return status.error_status(); 670 671 if (meta && user_metadata_size) { 672 void* metadata_src = 673 reinterpret_cast<void*>(canonical_meta.user_metadata_ptr); 674 if (metadata_src) { 675 memcpy(meta, metadata_src, user_metadata_size); 676 } else { 677 ALOGW("ConsumerQueue::Dequeue: no user-defined metadata."); 678 } 679 } 680 681 return status; 682 } 683 684 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue( 685 int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta, 686 pdx::LocalHandle* acquire_fence) { 687 ATRACE_NAME("ConsumerQueue::Dequeue"); 688 if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) { 689 ALOGE("ConsumerQueue::Dequeue: Invalid parameter."); 690 return ErrorStatus(EINVAL); 691 } 692 693 auto status = BufferHubQueue::Dequeue(timeout, slot); 694 if (!status) 695 return status.error_status(); 696 697 auto buffer = std::static_pointer_cast<BufferConsumer>(status.take()); 698 const int ret = buffer->AcquireAsync(out_meta, acquire_fence); 699 if (ret < 0) 700 return ErrorStatus(-ret); 701 702 return {std::move(buffer)}; 703 } 704 705 Status<void> ConsumerQueue::OnBufferAllocated() { 706 ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id()); 707 708 auto status = ImportBuffers(); 709 if (!status) { 710 ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s", 711 status.GetErrorMessage().c_str()); 712 return ErrorStatus(status.error()); 713 } else if (status.get() == 0) { 714 ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!"); 715 return ErrorStatus(ENOBUFS); 716 } else { 717 ALOGD_IF(TRACE, 718 "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.", 719 status.get()); 720 return {}; 721 } 722 } 723 724 } // namespace dvr 725 } // namespace android 726