Home | History | Annotate | Download | only in libbufferhubqueue
      1 #include "include/private/dvr/buffer_hub_queue_client.h"
      2 
      3 #include <inttypes.h>
      4 #include <log/log.h>
      5 #include <poll.h>
      6 #include <sys/epoll.h>
      7 
      8 #include <array>
      9 
     10 #include <pdx/default_transport/client_channel.h>
     11 #include <pdx/default_transport/client_channel_factory.h>
     12 #include <pdx/file_handle.h>
     13 #include <pdx/trace.h>
     14 
// Evaluates |fnc_call| and re-evaluates it for as long as it yields -1 with
// errno set to EINTR; the final result is the value of the macro expression.
// The call is wrapped in an immediately-invoked lambda that captures by
// reference, so |fnc_call| may refer to local variables at the call site.
#define RETRY_EINTR(fnc_call)                 \
  ([&]() -> decltype(fnc_call) {              \
    decltype(fnc_call) result;                \
    do {                                      \
      result = (fnc_call);                    \
    } while (result == -1 && errno == EINTR); \
    return result;                            \
  })()
     23 
     24 using android::pdx::ErrorStatus;
     25 using android::pdx::LocalChannelHandle;
     26 using android::pdx::LocalHandle;
     27 using android::pdx::Status;
     28 
     29 namespace android {
     30 namespace dvr {
     31 
     32 namespace {
     33 
     34 // Polls an fd for the given events.
     35 Status<int> PollEvents(int fd, short events) {
     36   const int kTimeoutMs = 0;
     37   pollfd pfd{fd, events, 0};
     38   const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
     39   if (count < 0) {
     40     return ErrorStatus(errno);
     41   } else if (count == 0) {
     42     return ErrorStatus(ETIMEDOUT);
     43   } else {
     44     return {pfd.revents};
     45   }
     46 }
     47 
     48 std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
     49   return {static_cast<int32_t>(value >> 32),
     50           static_cast<int32_t>(value & ((1ull << 32) - 1))};
     51 }
     52 
     53 uint64_t Stuff(int32_t a, int32_t b) {
     54   const uint32_t ua = static_cast<uint32_t>(a);
     55   const uint32_t ub = static_cast<uint32_t>(b);
     56   return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
     57 }
     58 
     59 }  // anonymous namespace
     60 
     61 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
     62     : Client{pdx::default_transport::ClientChannel::Create(
     63           std::move(channel_handle))} {
     64   Initialize();
     65 }
     66 
     67 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
     68     : Client{
     69           pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
     70   Initialize();
     71 }
     72 
     73 void BufferHubQueue::Initialize() {
     74   int ret = epoll_fd_.Create();
     75   if (ret < 0) {
     76     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
     77           strerror(-ret));
     78     return;
     79   }
     80 
     81   epoll_event event = {
     82       .events = EPOLLIN | EPOLLET,
     83       .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
     84   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
     85   if (ret < 0) {
     86     ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
     87           strerror(-ret));
     88   }
     89 }
     90 
     91 Status<void> BufferHubQueue::ImportQueue() {
     92   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
     93   if (!status) {
     94     ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
     95           status.GetErrorMessage().c_str());
     96     return ErrorStatus(status.error());
     97   } else {
     98     SetupQueue(status.get());
     99     return {};
    100   }
    101 }
    102 
    103 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
    104   is_async_ = queue_info.producer_config.is_async;
    105   default_width_ = queue_info.producer_config.default_width;
    106   default_height_ = queue_info.producer_config.default_height;
    107   default_format_ = queue_info.producer_config.default_format;
    108   user_metadata_size_ = queue_info.producer_config.user_metadata_size;
    109   id_ = queue_info.id;
    110 }
    111 
    112 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
    113   if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
    114     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
    115   else
    116     return nullptr;
    117 }
    118 
    119 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
    120   if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
    121     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
    122   else
    123     return nullptr;
    124 }
    125 
    126 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
    127     bool silent) {
    128   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
    129   if (!status) {
    130     ALOGE(
    131         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
    132         "%s",
    133         status.GetErrorMessage().c_str());
    134     return ErrorStatus(status.error());
    135   }
    136 
    137   return status;
    138 }
    139 
    140 bool BufferHubQueue::WaitForBuffers(int timeout) {
    141   ATRACE_NAME("BufferHubQueue::WaitForBuffers");
    142   std::array<epoll_event, kMaxEvents> events;
    143 
    144   // Loop at least once to check for hangups.
    145   do {
    146     ALOGD_IF(
    147         TRACE,
    148         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
    149         id(), count(), capacity());
    150 
    151     // If there is already a buffer then just check for hangup without waiting.
    152     const int ret = epoll_fd_.Wait(events.data(), events.size(),
    153                                    count() == 0 ? timeout : 0);
    154 
    155     if (ret == 0) {
    156       ALOGI_IF(TRACE,
    157                "BufferHubQueue::WaitForBuffers: No events before timeout: "
    158                "queue_id=%d",
    159                id());
    160       return count() != 0;
    161     }
    162 
    163     if (ret < 0 && ret != -EINTR) {
    164       ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
    165             strerror(-ret));
    166       return false;
    167     }
    168 
    169     const int num_events = ret;
    170 
    171     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
    172     // one for each buffer in the queue, and one extra event for the queue
    173     // client itself.
    174     for (int i = 0; i < num_events; i++) {
    175       int32_t event_fd;
    176       int32_t index;
    177       std::tie(event_fd, index) = Unstuff(events[i].data.u64);
    178 
    179       PDX_TRACE_FORMAT(
    180           "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
    181           "slot=%d|",
    182           id(), num_events, i, event_fd, index);
    183 
    184       ALOGD_IF(TRACE,
    185                "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
    186                i, event_fd, index);
    187 
    188       if (is_buffer_event_index(index)) {
    189         HandleBufferEvent(static_cast<size_t>(index), event_fd,
    190                           events[i].events);
    191       } else if (is_queue_event_index(index)) {
    192         HandleQueueEvent(events[i].events);
    193       } else {
    194         ALOGW(
    195             "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
    196             "index=%d",
    197             event_fd, index);
    198       }
    199     }
    200   } while (count() == 0 && capacity() > 0 && !hung_up());
    201 
    202   return count() != 0;
    203 }
    204 
    205 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
    206                                                int poll_events) {
    207   ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
    208   if (!buffers_[slot]) {
    209     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
    210     return ErrorStatus(ENOENT);
    211   }
    212 
    213   auto status = buffers_[slot]->GetEventMask(poll_events);
    214   if (!status) {
    215     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
    216           status.GetErrorMessage().c_str());
    217     return status.error_status();
    218   }
    219 
    220   const int events = status.get();
    221   PDX_TRACE_FORMAT(
    222       "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
    223       "events=%d|",
    224       id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
    225 
    226   if (events & EPOLLIN) {
    227     return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
    228   } else if (events & EPOLLHUP) {
    229     ALOGW(
    230         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
    231         "event_fd=%d buffer_id=%d",
    232         slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
    233     return RemoveBuffer(slot);
    234   } else {
    235     ALOGW(
    236         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
    237         "events=%d",
    238         slot, events);
    239   }
    240 
    241   return {};
    242 }
    243 
    244 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
    245   ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
    246   auto status = GetEventMask(poll_event);
    247   if (!status) {
    248     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
    249           status.GetErrorMessage().c_str());
    250     return status.error_status();
    251   }
    252 
    253   const int events = status.get();
    254   if (events & EPOLLIN) {
    255     // Note that after buffer imports, if |count()| still returns 0, epoll
    256     // wait will be tried again to acquire the newly imported buffer.
    257     auto buffer_status = OnBufferAllocated();
    258     if (!buffer_status) {
    259       ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
    260             buffer_status.GetErrorMessage().c_str());
    261     }
    262   } else if (events & EPOLLHUP) {
    263     ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
    264     hung_up_ = true;
    265   } else {
    266     ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
    267   }
    268 
    269   return {};
    270 }
    271 
    272 Status<void> BufferHubQueue::AddBuffer(
    273     const std::shared_ptr<BufferHubBuffer>& buffer, size_t slot) {
    274   ALOGD_IF(TRACE, "BufferHubQueue::AddBuffer: buffer_id=%d slot=%zu",
    275            buffer->id(), slot);
    276 
    277   if (is_full()) {
    278     ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
    279           capacity_);
    280     return ErrorStatus(E2BIG);
    281   }
    282 
    283   if (buffers_[slot]) {
    284     // Replace the buffer if the slot is occupied. This could happen when the
    285     // producer side replaced the slot with a newly allocated buffer. Remove the
    286     // buffer before setting up with the new one.
    287     auto remove_status = RemoveBuffer(slot);
    288     if (!remove_status)
    289       return remove_status.error_status();
    290   }
    291 
    292   for (const auto& event_source : buffer->GetEventSources()) {
    293     epoll_event event = {.events = event_source.event_mask | EPOLLET,
    294                          .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
    295     const int ret =
    296         epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
    297     if (ret < 0) {
    298       ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
    299             strerror(-ret));
    300       return ErrorStatus(-ret);
    301     }
    302   }
    303 
    304   buffers_[slot] = buffer;
    305   capacity_++;
    306   return {};
    307 }
    308 
    309 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
    310   ALOGD_IF(TRACE, "BufferHubQueue::RemoveBuffer: slot=%zu", slot);
    311 
    312   if (buffers_[slot]) {
    313     for (const auto& event_source : buffers_[slot]->GetEventSources()) {
    314       const int ret =
    315           epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
    316       if (ret < 0) {
    317         ALOGE(
    318             "BufferHubQueue::RemoveBuffer: Failed to remove buffer from epoll "
    319             "set: %s",
    320             strerror(-ret));
    321         return ErrorStatus(-ret);
    322       }
    323     }
    324 
    325     // Trigger OnBufferRemoved callback if registered.
    326     if (on_buffer_removed_)
    327       on_buffer_removed_(buffers_[slot]);
    328 
    329     buffers_[slot] = nullptr;
    330     capacity_--;
    331   }
    332 
    333   return {};
    334 }
    335 
    336 Status<void> BufferHubQueue::Enqueue(Entry entry) {
    337   if (!is_full()) {
    338     available_buffers_.push(std::move(entry));
    339 
    340     // Trigger OnBufferAvailable callback if registered.
    341     if (on_buffer_available_)
    342       on_buffer_available_();
    343 
    344     return {};
    345   } else {
    346     ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    347     return ErrorStatus(E2BIG);
    348   }
    349 }
    350 
    351 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(int timeout,
    352                                                                  size_t* slot) {
    353   ALOGD_IF(TRACE, "BufferHubQueue::Dequeue: count=%zu, timeout=%d", count(),
    354            timeout);
    355 
    356   PDX_TRACE_FORMAT("BufferHubQueue::Dequeue|count=%zu|", count());
    357 
    358   if (count() == 0) {
    359     if (!WaitForBuffers(timeout))
    360       return ErrorStatus(ETIMEDOUT);
    361   }
    362 
    363   auto& entry = available_buffers_.top();
    364   PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
    365                    entry.slot);
    366 
    367   std::shared_ptr<BufferHubBuffer> buffer = std::move(entry.buffer);
    368   *slot = entry.slot;
    369 
    370   available_buffers_.pop();
    371 
    372   return {std::move(buffer)};
    373 }
    374 
    375 void BufferHubQueue::SetBufferAvailableCallback(
    376     BufferAvailableCallback callback) {
    377   on_buffer_available_ = callback;
    378 }
    379 
    380 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
    381   on_buffer_removed_ = callback;
    382 }
    383 
    384 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
    385   // Clear all available buffers.
    386   while (!available_buffers_.empty())
    387     available_buffers_.pop();
    388 
    389   pdx::Status<void> last_error;  // No error.
    390   // Clear all buffers this producer queue is tracking.
    391   for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
    392     if (buffers_[slot] != nullptr) {
    393       auto status = RemoveBuffer(slot);
    394       if (!status) {
    395         ALOGE(
    396             "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
    397             "slot=%zu.",
    398             slot);
    399         last_error = status.error_status();
    400       }
    401     }
    402   }
    403 
    404   return last_error;
    405 }
    406 
    407 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
    408     : BASE(std::move(handle)) {
    409   auto status = ImportQueue();
    410   if (!status) {
    411     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
    412           status.GetErrorMessage().c_str());
    413     Close(-status.error());
    414   }
    415 }
    416 
    417 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
    418                              const UsagePolicy& usage)
    419     : BASE(BufferHubRPC::kClientPath) {
    420   auto status =
    421       InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
    422   if (!status) {
    423     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
    424           status.GetErrorMessage().c_str());
    425     Close(-status.error());
    426     return;
    427   }
    428 
    429   SetupQueue(status.get());
    430 }
    431 
    432 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
    433     uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
    434     uint64_t usage, size_t buffer_count) {
    435   if (capacity() + buffer_count > kMaxQueueCapacity) {
    436     ALOGE(
    437         "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
    438         "allocate %zu more buffer(s).",
    439         capacity(), buffer_count);
    440     return ErrorStatus(E2BIG);
    441   }
    442 
    443   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
    444       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
    445           width, height, layer_count, format, usage, buffer_count);
    446   if (!status) {
    447     ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
    448           status.GetErrorMessage().c_str());
    449     return status.error_status();
    450   }
    451 
    452   auto buffer_handle_slots = status.take();
    453   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
    454                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
    455                       "return %zu buffer handle(s), but returned %zu instead.",
    456                       buffer_count, buffer_handle_slots.size());
    457 
    458   std::vector<size_t> buffer_slots;
    459   buffer_slots.reserve(buffer_count);
    460 
    461   // Bookkeeping for each buffer.
    462   for (auto& hs : buffer_handle_slots) {
    463     auto& buffer_handle = hs.first;
    464     size_t buffer_slot = hs.second;
    465 
    466     // Note that import might (though very unlikely) fail. If so, buffer_handle
    467     // will be closed and included in returned buffer_slots.
    468     if (AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
    469                   buffer_slot)) {
    470       ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
    471                buffer_slot);
    472       buffer_slots.push_back(buffer_slot);
    473     }
    474   }
    475 
    476   if (buffer_slots.size() == 0) {
    477     // Error out if no buffer is allocated and improted.
    478     ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffers: no buffer allocated.");
    479     ErrorStatus(ENOMEM);
    480   }
    481 
    482   return {std::move(buffer_slots)};
    483 }
    484 
    485 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
    486                                              uint32_t layer_count,
    487                                              uint32_t format, uint64_t usage) {
    488   // We only allocate one buffer at a time.
    489   constexpr size_t buffer_count = 1;
    490   auto status =
    491       AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
    492   if (!status) {
    493     ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
    494           status.GetErrorMessage().c_str());
    495     return status.error_status();
    496   }
    497 
    498   if (status.get().size() == 0) {
    499     ALOGE_IF(TRACE, "ProducerQueue::AllocateBuffer: no buffer allocated.");
    500     ErrorStatus(ENOMEM);
    501   }
    502 
    503   return {status.get()[0]};
    504 }
    505 
    506 Status<void> ProducerQueue::AddBuffer(
    507     const std::shared_ptr<BufferProducer>& buffer, size_t slot) {
    508   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
    509            id(), buffer->id(), slot);
    510   // For producer buffer, we need to enqueue the newly added buffer
    511   // immediately. Producer queue starts with all buffers in available state.
    512   auto status = BufferHubQueue::AddBuffer(buffer, slot);
    513   if (!status)
    514     return status;
    515 
    516   return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
    517 }
    518 
    519 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
    520   auto status =
    521       InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
    522   if (!status) {
    523     ALOGE("ProducerQueue::RemoveBuffer: Failed to remove producer buffer: %s",
    524           status.GetErrorMessage().c_str());
    525     return status.error_status();
    526   }
    527 
    528   return BufferHubQueue::RemoveBuffer(slot);
    529 }
    530 
    531 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
    532     int timeout, size_t* slot, LocalHandle* release_fence) {
    533   DvrNativeBufferMetadata canonical_meta;
    534   return Dequeue(timeout, slot, &canonical_meta, release_fence);
    535 }
    536 
    537 pdx::Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
    538     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
    539     pdx::LocalHandle* release_fence) {
    540   ATRACE_NAME("ProducerQueue::Dequeue");
    541   if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
    542     ALOGE("ProducerQueue::Dequeue: Invalid parameter.");
    543     return ErrorStatus(EINVAL);
    544   }
    545 
    546   auto status = BufferHubQueue::Dequeue(timeout, slot);
    547   if (!status)
    548     return status.error_status();
    549 
    550   auto buffer = std::static_pointer_cast<BufferProducer>(status.take());
    551   const int ret = buffer->GainAsync(out_meta, release_fence);
    552   if (ret < 0 && ret != -EALREADY)
    553     return ErrorStatus(-ret);
    554 
    555   return {std::move(buffer)};
    556 }
    557 
    558 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
    559     : BufferHubQueue(std::move(handle)) {
    560   auto status = ImportQueue();
    561   if (!status) {
    562     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
    563           status.GetErrorMessage().c_str());
    564     Close(-status.error());
    565   }
    566 
    567   auto import_status = ImportBuffers();
    568   if (import_status) {
    569     ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
    570           import_status.get());
    571   } else {
    572     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
    573           import_status.GetErrorMessage().c_str());
    574   }
    575 }
    576 
    577 Status<size_t> ConsumerQueue::ImportBuffers() {
    578   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
    579   if (!status) {
    580     if (status.error() == EBADR) {
    581       ALOGI(
    582           "ConsumerQueue::ImportBuffers: Queue is silent, no buffers "
    583           "imported.");
    584       return {0};
    585     } else {
    586       ALOGE(
    587           "ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
    588           status.GetErrorMessage().c_str());
    589       return status.error_status();
    590     }
    591   }
    592 
    593   int ret;
    594   Status<void> last_error;
    595   size_t imported_buffers_count = 0;
    596 
    597   auto buffer_handle_slots = status.take();
    598   for (auto& buffer_handle_slot : buffer_handle_slots) {
    599     ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
    600              buffer_handle_slot.first.value());
    601 
    602     std::unique_ptr<BufferConsumer> buffer_consumer =
    603         BufferConsumer::Import(std::move(buffer_handle_slot.first));
    604     if (!buffer_consumer) {
    605       ALOGE("ConsumerQueue::ImportBuffers: Failed to import buffer: slot=%zu",
    606             buffer_handle_slot.second);
    607       last_error = ErrorStatus(EPIPE);
    608       continue;
    609     }
    610 
    611     auto add_status =
    612         AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    613     if (!add_status) {
    614       ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
    615             add_status.GetErrorMessage().c_str());
    616       last_error = add_status;
    617     } else {
    618       imported_buffers_count++;
    619     }
    620   }
    621 
    622   if (imported_buffers_count > 0)
    623     return {imported_buffers_count};
    624   else
    625     return last_error.error_status();
    626 }
    627 
    628 Status<void> ConsumerQueue::AddBuffer(
    629     const std::shared_ptr<BufferConsumer>& buffer, size_t slot) {
    630   ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
    631            id(), buffer->id(), slot);
    632   auto status = BufferHubQueue::AddBuffer(buffer, slot);
    633   if (!status)
    634     return status;
    635 
    636   // Check to see if the buffer is already signaled. This is necessary to catch
    637   // cases where buffers are already available; epoll edge triggered mode does
    638   // not fire until an edge transition when adding new buffers to the epoll
    639   // set. Note that we only poll the fd events because HandleBufferEvent() takes
    640   // care of checking the translated buffer events.
    641   auto poll_status = PollEvents(buffer->event_fd(), POLLIN);
    642   if (!poll_status && poll_status.error() != ETIMEDOUT) {
    643     ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
    644           poll_status.GetErrorMessage().c_str());
    645     return poll_status.error_status();
    646   }
    647 
    648   // Update accounting if the buffer is available.
    649   if (poll_status)
    650     return HandleBufferEvent(slot, buffer->event_fd(), poll_status.get());
    651   else
    652     return {};
    653 }
    654 
    655 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    656     int timeout, size_t* slot, void* meta, size_t user_metadata_size,
    657     LocalHandle* acquire_fence) {
    658   if (user_metadata_size != user_metadata_size_) {
    659     ALOGE(
    660         "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
    661         "does not match metadata size (%zu) for the queue.",
    662         user_metadata_size, user_metadata_size_);
    663     return ErrorStatus(EINVAL);
    664   }
    665 
    666   DvrNativeBufferMetadata canonical_meta;
    667   auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
    668   if (!status)
    669     return status.error_status();
    670 
    671   if (meta && user_metadata_size) {
    672     void* metadata_src =
    673         reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
    674     if (metadata_src) {
    675       memcpy(meta, metadata_src, user_metadata_size);
    676     } else {
    677       ALOGW("ConsumerQueue::Dequeue: no user-defined metadata.");
    678     }
    679   }
    680 
    681   return status;
    682 }
    683 
    684 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    685     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
    686     pdx::LocalHandle* acquire_fence) {
    687   ATRACE_NAME("ConsumerQueue::Dequeue");
    688   if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
    689     ALOGE("ConsumerQueue::Dequeue: Invalid parameter.");
    690     return ErrorStatus(EINVAL);
    691   }
    692 
    693   auto status = BufferHubQueue::Dequeue(timeout, slot);
    694   if (!status)
    695     return status.error_status();
    696 
    697   auto buffer = std::static_pointer_cast<BufferConsumer>(status.take());
    698   const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
    699   if (ret < 0)
    700     return ErrorStatus(-ret);
    701 
    702   return {std::move(buffer)};
    703 }
    704 
    705 Status<void> ConsumerQueue::OnBufferAllocated() {
    706   ALOGD_IF(TRACE, "ConsumerQueue::OnBufferAllocated: queue_id=%d", id());
    707 
    708   auto status = ImportBuffers();
    709   if (!status) {
    710     ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
    711           status.GetErrorMessage().c_str());
    712     return ErrorStatus(status.error());
    713   } else if (status.get() == 0) {
    714     ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
    715     return ErrorStatus(ENOBUFS);
    716   } else {
    717     ALOGD_IF(TRACE,
    718              "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
    719              status.get());
    720     return {};
    721   }
    722 }
    723 
    724 }  // namespace dvr
    725 }  // namespace android
    726