Home | History | Annotate | Download | only in libbufferhubqueue
      1 #include "include/private/dvr/buffer_hub_queue_client.h"
      2 
      3 #include <inttypes.h>
      4 #include <log/log.h>
      5 #include <poll.h>
      6 #include <sys/epoll.h>
      7 
      8 #include <array>
      9 
     10 #include <pdx/default_transport/client_channel.h>
     11 #include <pdx/default_transport/client_channel_factory.h>
     12 #include <pdx/file_handle.h>
     13 #include <pdx/trace.h>
     14 
     15 #define RETRY_EINTR(fnc_call)                 \
     16   ([&]() -> decltype(fnc_call) {              \
     17     decltype(fnc_call) result;                \
     18     do {                                      \
     19       result = (fnc_call);                    \
     20     } while (result == -1 && errno == EINTR); \
     21     return result;                            \
     22   })()
     23 
     24 using android::pdx::ErrorStatus;
     25 using android::pdx::LocalChannelHandle;
     26 using android::pdx::LocalHandle;
     27 using android::pdx::Status;
     28 
     29 namespace android {
     30 namespace dvr {
     31 
     32 namespace {
     33 
     34 std::pair<int32_t, int32_t> Unstuff(uint64_t value) {
     35   return {static_cast<int32_t>(value >> 32),
     36           static_cast<int32_t>(value & ((1ull << 32) - 1))};
     37 }
     38 
     39 uint64_t Stuff(int32_t a, int32_t b) {
     40   const uint32_t ua = static_cast<uint32_t>(a);
     41   const uint32_t ub = static_cast<uint32_t>(b);
     42   return (static_cast<uint64_t>(ua) << 32) | static_cast<uint64_t>(ub);
     43 }
     44 
     45 }  // anonymous namespace
     46 
     47 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
     48     : Client{pdx::default_transport::ClientChannel::Create(
     49           std::move(channel_handle))} {
     50   Initialize();
     51 }
     52 
     53 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
     54     : Client{
     55           pdx::default_transport::ClientChannelFactory::Create(endpoint_path)} {
     56   Initialize();
     57 }
     58 
     59 void BufferHubQueue::Initialize() {
     60   int ret = epoll_fd_.Create();
     61   if (ret < 0) {
     62     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
     63           strerror(-ret));
     64     return;
     65   }
     66 
     67   epoll_event event = {
     68       .events = EPOLLIN | EPOLLET,
     69       .data = {.u64 = Stuff(-1, BufferHubQueue::kEpollQueueEventIndex)}};
     70   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
     71   if (ret < 0) {
     72     ALOGE("%s: Failed to add event fd to epoll set: %s", __FUNCTION__,
     73           strerror(-ret));
     74   }
     75 }
     76 
     77 Status<void> BufferHubQueue::ImportQueue() {
     78   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
     79   if (!status) {
     80     ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
     81           status.GetErrorMessage().c_str());
     82     return ErrorStatus(status.error());
     83   } else {
     84     SetupQueue(status.get());
     85     return {};
     86   }
     87 }
     88 
     89 void BufferHubQueue::SetupQueue(const QueueInfo& queue_info) {
     90   is_async_ = queue_info.producer_config.is_async;
     91   default_width_ = queue_info.producer_config.default_width;
     92   default_height_ = queue_info.producer_config.default_height;
     93   default_format_ = queue_info.producer_config.default_format;
     94   user_metadata_size_ = queue_info.producer_config.user_metadata_size;
     95   id_ = queue_info.id;
     96 }
     97 
     98 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
     99   if (auto status = CreateConsumerQueueHandle(/*silent*/ false))
    100     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
    101   else
    102     return nullptr;
    103 }
    104 
    105 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
    106   if (auto status = CreateConsumerQueueHandle(/*silent*/ true))
    107     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
    108   else
    109     return nullptr;
    110 }
    111 
    112 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle(
    113     bool silent) {
    114   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>(silent);
    115   if (!status) {
    116     ALOGE(
    117         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
    118         "%s",
    119         status.GetErrorMessage().c_str());
    120     return ErrorStatus(status.error());
    121   }
    122 
    123   return status;
    124 }
    125 
    126 pdx::Status<ConsumerQueueParcelable>
    127 BufferHubQueue::CreateConsumerQueueParcelable(bool silent) {
    128   auto status = CreateConsumerQueueHandle(silent);
    129   if (!status)
    130     return status.error_status();
    131 
    132   // A temporary consumer queue client to pull its channel parcelable.
    133   auto consumer_queue =
    134       std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
    135   ConsumerQueueParcelable queue_parcelable(
    136       consumer_queue->GetChannel()->TakeChannelParcelable());
    137 
    138   if (!queue_parcelable.IsValid()) {
    139     ALOGE("%s: Failed to create consumer queue parcelable.", __FUNCTION__);
    140     return ErrorStatus(EINVAL);
    141   }
    142 
    143   return {std::move(queue_parcelable)};
    144 }
    145 
    146 bool BufferHubQueue::WaitForBuffers(int timeout) {
    147   ATRACE_NAME("BufferHubQueue::WaitForBuffers");
    148   std::array<epoll_event, kMaxEvents> events;
    149 
    150   // Loop at least once to check for hangups.
    151   do {
    152     ALOGD_IF(
    153         TRACE,
    154         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
    155         id(), count(), capacity());
    156 
    157     // If there is already a buffer then just check for hangup without waiting.
    158     const int ret = epoll_fd_.Wait(events.data(), events.size(),
    159                                    count() == 0 ? timeout : 0);
    160 
    161     if (ret == 0) {
    162       ALOGI_IF(TRACE,
    163                "BufferHubQueue::WaitForBuffers: No events before timeout: "
    164                "queue_id=%d",
    165                id());
    166       return count() != 0;
    167     }
    168 
    169     if (ret < 0 && ret != -EINTR) {
    170       ALOGE("%s: Failed to wait for buffers: %s", __FUNCTION__, strerror(-ret));
    171       return false;
    172     }
    173 
    174     const int num_events = ret;
    175 
    176     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
    177     // one for each buffer in the queue, and one extra event for the queue
    178     // client itself.
    179     for (int i = 0; i < num_events; i++) {
    180       int32_t event_fd;
    181       int32_t index;
    182       std::tie(event_fd, index) = Unstuff(events[i].data.u64);
    183 
    184       PDX_TRACE_FORMAT(
    185           "epoll_event|queue_id=%d;num_events=%d;event_index=%d;event_fd=%d;"
    186           "slot=%d|",
    187           id(), num_events, i, event_fd, index);
    188 
    189       ALOGD_IF(TRACE,
    190                "BufferHubQueue::WaitForBuffers: event %d: event_fd=%d index=%d",
    191                i, event_fd, index);
    192 
    193       if (is_buffer_event_index(index)) {
    194         HandleBufferEvent(static_cast<size_t>(index), event_fd,
    195                           events[i].events);
    196       } else if (is_queue_event_index(index)) {
    197         HandleQueueEvent(events[i].events);
    198       } else {
    199         ALOGW(
    200             "BufferHubQueue::WaitForBuffers: Unknown event type event_fd=%d "
    201             "index=%d",
    202             event_fd, index);
    203       }
    204     }
    205   } while (count() == 0 && capacity() > 0 && !hung_up());
    206 
    207   return count() != 0;
    208 }
    209 
    210 Status<void> BufferHubQueue::HandleBufferEvent(size_t slot, int event_fd,
    211                                                int poll_events) {
    212   ATRACE_NAME("BufferHubQueue::HandleBufferEvent");
    213   if (!buffers_[slot]) {
    214     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
    215     return ErrorStatus(ENOENT);
    216   }
    217 
    218   auto status = buffers_[slot]->GetEventMask(poll_events);
    219   if (!status) {
    220     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
    221           status.GetErrorMessage().c_str());
    222     return status.error_status();
    223   }
    224 
    225   const int events = status.get();
    226   PDX_TRACE_FORMAT(
    227       "buffer|queue_id=%d;buffer_id=%d;slot=%zu;event_fd=%d;poll_events=%x;"
    228       "events=%d|",
    229       id(), buffers_[slot]->id(), slot, event_fd, poll_events, events);
    230 
    231   if (events & EPOLLIN) {
    232     return Enqueue({buffers_[slot], slot, buffers_[slot]->GetQueueIndex()});
    233   } else if (events & EPOLLHUP) {
    234     ALOGW(
    235         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP event: slot=%zu "
    236         "event_fd=%d buffer_id=%d",
    237         slot, buffers_[slot]->event_fd(), buffers_[slot]->id());
    238     return RemoveBuffer(slot);
    239   } else {
    240     ALOGW(
    241         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
    242         "events=%d",
    243         slot, events);
    244   }
    245 
    246   return {};
    247 }
    248 
    249 Status<void> BufferHubQueue::HandleQueueEvent(int poll_event) {
    250   ATRACE_NAME("BufferHubQueue::HandleQueueEvent");
    251   auto status = GetEventMask(poll_event);
    252   if (!status) {
    253     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
    254           status.GetErrorMessage().c_str());
    255     return status.error_status();
    256   }
    257 
    258   const int events = status.get();
    259   if (events & EPOLLIN) {
    260     // Note that after buffer imports, if |count()| still returns 0, epoll
    261     // wait will be tried again to acquire the newly imported buffer.
    262     auto buffer_status = OnBufferAllocated();
    263     if (!buffer_status) {
    264       ALOGE("%s: Failed to import buffer: %s", __FUNCTION__,
    265             buffer_status.GetErrorMessage().c_str());
    266     }
    267   } else if (events & EPOLLHUP) {
    268     ALOGD_IF(TRACE, "%s: hang up event!", __FUNCTION__);
    269     hung_up_ = true;
    270   } else {
    271     ALOGW("%s: Unknown epoll events=%x", __FUNCTION__, events);
    272   }
    273 
    274   return {};
    275 }
    276 
    277 Status<void> BufferHubQueue::AddBuffer(
    278     const std::shared_ptr<BufferHubBase>& buffer, size_t slot) {
    279   ALOGD_IF(TRACE, "%s: buffer_id=%d slot=%zu", __FUNCTION__, buffer->id(),
    280            slot);
    281 
    282   if (is_full()) {
    283     ALOGE("%s: queue is at maximum capacity: %zu", __FUNCTION__, capacity_);
    284     return ErrorStatus(E2BIG);
    285   }
    286 
    287   if (buffers_[slot]) {
    288     // Replace the buffer if the slot is occupied. This could happen when the
    289     // producer side replaced the slot with a newly allocated buffer. Remove the
    290     // buffer before setting up with the new one.
    291     auto remove_status = RemoveBuffer(slot);
    292     if (!remove_status)
    293       return remove_status.error_status();
    294   }
    295 
    296   for (const auto& event_source : buffer->GetEventSources()) {
    297     epoll_event event = {.events = event_source.event_mask | EPOLLET,
    298                          .data = {.u64 = Stuff(buffer->event_fd(), slot)}};
    299     const int ret =
    300         epoll_fd_.Control(EPOLL_CTL_ADD, event_source.event_fd, &event);
    301     if (ret < 0) {
    302       ALOGE("%s: Failed to add buffer to epoll set: %s", __FUNCTION__,
    303             strerror(-ret));
    304       return ErrorStatus(-ret);
    305     }
    306   }
    307 
    308   buffers_[slot] = buffer;
    309   capacity_++;
    310   return {};
    311 }
    312 
    313 Status<void> BufferHubQueue::RemoveBuffer(size_t slot) {
    314   ALOGD_IF(TRACE, "%s: slot=%zu", __FUNCTION__, slot);
    315 
    316   if (buffers_[slot]) {
    317     for (const auto& event_source : buffers_[slot]->GetEventSources()) {
    318       const int ret =
    319           epoll_fd_.Control(EPOLL_CTL_DEL, event_source.event_fd, nullptr);
    320       if (ret < 0) {
    321         ALOGE("%s: Failed to remove buffer from epoll set: %s", __FUNCTION__,
    322               strerror(-ret));
    323         return ErrorStatus(-ret);
    324       }
    325     }
    326 
    327     // Trigger OnBufferRemoved callback if registered.
    328     if (on_buffer_removed_)
    329       on_buffer_removed_(buffers_[slot]);
    330 
    331     buffers_[slot] = nullptr;
    332     capacity_--;
    333   }
    334 
    335   return {};
    336 }
    337 
    338 Status<void> BufferHubQueue::Enqueue(Entry entry) {
    339   if (!is_full()) {
    340     // Find and remove the enqueued buffer from unavailable_buffers_slot if
    341     // exist.
    342     auto enqueued_buffer_iter = std::find_if(
    343         unavailable_buffers_slot_.begin(), unavailable_buffers_slot_.end(),
    344         [&entry](size_t slot) -> bool { return slot == entry.slot; });
    345     if (enqueued_buffer_iter != unavailable_buffers_slot_.end()) {
    346       unavailable_buffers_slot_.erase(enqueued_buffer_iter);
    347     }
    348 
    349     available_buffers_.push(std::move(entry));
    350 
    351     // Trigger OnBufferAvailable callback if registered.
    352     if (on_buffer_available_)
    353       on_buffer_available_();
    354 
    355     return {};
    356   } else {
    357     ALOGE("%s: Buffer queue is full!", __FUNCTION__);
    358     return ErrorStatus(E2BIG);
    359   }
    360 }
    361 
    362 Status<std::shared_ptr<BufferHubBase>> BufferHubQueue::Dequeue(int timeout,
    363                                                                size_t* slot) {
    364   ALOGD_IF(TRACE, "%s: count=%zu, timeout=%d", __FUNCTION__, count(), timeout);
    365 
    366   PDX_TRACE_FORMAT("%s|count=%zu|", __FUNCTION__, count());
    367 
    368   if (count() == 0) {
    369     if (!WaitForBuffers(timeout))
    370       return ErrorStatus(ETIMEDOUT);
    371   }
    372 
    373   auto& entry = available_buffers_.top();
    374   PDX_TRACE_FORMAT("buffer|buffer_id=%d;slot=%zu|", entry.buffer->id(),
    375                    entry.slot);
    376 
    377   std::shared_ptr<BufferHubBase> buffer = std::move(entry.buffer);
    378   *slot = entry.slot;
    379 
    380   available_buffers_.pop();
    381   unavailable_buffers_slot_.push_back(*slot);
    382 
    383   return {std::move(buffer)};
    384 }
    385 
    386 void BufferHubQueue::SetBufferAvailableCallback(
    387     BufferAvailableCallback callback) {
    388   on_buffer_available_ = callback;
    389 }
    390 
    391 void BufferHubQueue::SetBufferRemovedCallback(BufferRemovedCallback callback) {
    392   on_buffer_removed_ = callback;
    393 }
    394 
    395 pdx::Status<void> BufferHubQueue::FreeAllBuffers() {
    396   // Clear all available buffers.
    397   while (!available_buffers_.empty())
    398     available_buffers_.pop();
    399 
    400   pdx::Status<void> last_error;  // No error.
    401   // Clear all buffers this producer queue is tracking.
    402   for (size_t slot = 0; slot < BufferHubQueue::kMaxQueueCapacity; slot++) {
    403     if (buffers_[slot] != nullptr) {
    404       auto status = RemoveBuffer(slot);
    405       if (!status) {
    406         ALOGE(
    407             "ProducerQueue::FreeAllBuffers: Failed to remove buffer at "
    408             "slot=%zu.",
    409             slot);
    410         last_error = status.error_status();
    411       }
    412     }
    413   }
    414 
    415   return last_error;
    416 }
    417 
    418 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
    419     : BASE(std::move(handle)) {
    420   auto status = ImportQueue();
    421   if (!status) {
    422     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
    423           status.GetErrorMessage().c_str());
    424     Close(-status.error());
    425   }
    426 }
    427 
    428 ProducerQueue::ProducerQueue(const ProducerQueueConfig& config,
    429                              const UsagePolicy& usage)
    430     : BASE(BufferHubRPC::kClientPath) {
    431   auto status =
    432       InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(config, usage);
    433   if (!status) {
    434     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
    435           status.GetErrorMessage().c_str());
    436     Close(-status.error());
    437     return;
    438   }
    439 
    440   SetupQueue(status.get());
    441 }
    442 
    443 Status<std::vector<size_t>> ProducerQueue::AllocateBuffers(
    444     uint32_t width, uint32_t height, uint32_t layer_count, uint32_t format,
    445     uint64_t usage, size_t buffer_count) {
    446   if (buffer_count == 0) {
    447     return {std::vector<size_t>()};
    448   }
    449 
    450   if (capacity() + buffer_count > kMaxQueueCapacity) {
    451     ALOGE(
    452         "ProducerQueue::AllocateBuffers: queue is at capacity: %zu, cannot "
    453         "allocate %zu more buffer(s).",
    454         capacity(), buffer_count);
    455     return ErrorStatus(E2BIG);
    456   }
    457 
    458   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
    459       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
    460           width, height, layer_count, format, usage, buffer_count);
    461   if (!status) {
    462     ALOGE("ProducerQueue::AllocateBuffers: failed to allocate buffers: %s",
    463           status.GetErrorMessage().c_str());
    464     return status.error_status();
    465   }
    466 
    467   auto buffer_handle_slots = status.take();
    468   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != buffer_count,
    469                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
    470                       "return %zu buffer handle(s), but returned %zu instead.",
    471                       buffer_count, buffer_handle_slots.size());
    472 
    473   std::vector<size_t> buffer_slots;
    474   buffer_slots.reserve(buffer_count);
    475 
    476   // Bookkeeping for each buffer.
    477   for (auto& hs : buffer_handle_slots) {
    478     auto& buffer_handle = hs.first;
    479     size_t buffer_slot = hs.second;
    480 
    481     // Note that import might (though very unlikely) fail. If so, buffer_handle
    482     // will be closed and included in returned buffer_slots.
    483     if (AddBuffer(ProducerBuffer::Import(std::move(buffer_handle)),
    484                   buffer_slot)) {
    485       ALOGD_IF(TRACE, "ProducerQueue::AllocateBuffers: new buffer at slot: %zu",
    486                buffer_slot);
    487       buffer_slots.push_back(buffer_slot);
    488     }
    489   }
    490 
    491   if (buffer_slots.size() != buffer_count) {
    492     // Error out if the count of imported buffer(s) is not correct.
    493     ALOGE(
    494         "ProducerQueue::AllocateBuffers: requested to import %zu "
    495         "buffers, but actually imported %zu buffers.",
    496         buffer_count, buffer_slots.size());
    497     return ErrorStatus(ENOMEM);
    498   }
    499 
    500   return {std::move(buffer_slots)};
    501 }
    502 
    503 Status<size_t> ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
    504                                              uint32_t layer_count,
    505                                              uint32_t format, uint64_t usage) {
    506   // We only allocate one buffer at a time.
    507   constexpr size_t buffer_count = 1;
    508   auto status =
    509       AllocateBuffers(width, height, layer_count, format, usage, buffer_count);
    510   if (!status) {
    511     ALOGE("ProducerQueue::AllocateBuffer: Failed to allocate buffer: %s",
    512           status.GetErrorMessage().c_str());
    513     return status.error_status();
    514   }
    515 
    516   return {status.get()[0]};
    517 }
    518 
    519 Status<void> ProducerQueue::AddBuffer(
    520     const std::shared_ptr<ProducerBuffer>& buffer, size_t slot) {
    521   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
    522            id(), buffer->id(), slot);
    523   // For producer buffer, we need to enqueue the newly added buffer
    524   // immediately. Producer queue starts with all buffers in available state.
    525   auto status = BufferHubQueue::AddBuffer(buffer, slot);
    526   if (!status)
    527     return status;
    528 
    529   return BufferHubQueue::Enqueue({buffer, slot, 0ULL});
    530 }
    531 
    532 Status<size_t> ProducerQueue::InsertBuffer(
    533     const std::shared_ptr<ProducerBuffer>& buffer) {
    534   if (buffer == nullptr ||
    535       !BufferHubDefs::isClientGained(buffer->buffer_state(),
    536                                      buffer->client_state_mask())) {
    537     ALOGE(
    538         "ProducerQueue::InsertBuffer: Can only insert a buffer when it's in "
    539         "gained state.");
    540     return ErrorStatus(EINVAL);
    541   }
    542 
    543   auto status_or_slot =
    544       InvokeRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
    545           buffer->cid());
    546   if (!status_or_slot) {
    547     ALOGE(
    548         "ProducerQueue::InsertBuffer: Failed to insert producer buffer: "
    549         "buffer_cid=%d, error: %s.",
    550         buffer->cid(), status_or_slot.GetErrorMessage().c_str());
    551     return status_or_slot.error_status();
    552   }
    553 
    554   size_t slot = status_or_slot.get();
    555 
    556   // Note that we are calling AddBuffer() from the base class to explicitly
    557   // avoid Enqueue() the ProducerBuffer.
    558   auto status = BufferHubQueue::AddBuffer(buffer, slot);
    559   if (!status) {
    560     ALOGE("ProducerQueue::InsertBuffer: Failed to add buffer: %s.",
    561           status.GetErrorMessage().c_str());
    562     return status.error_status();
    563   }
    564   return {slot};
    565 }
    566 
    567 Status<void> ProducerQueue::RemoveBuffer(size_t slot) {
    568   auto status =
    569       InvokeRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(slot);
    570   if (!status) {
    571     ALOGE("%s: Failed to remove producer buffer: %s", __FUNCTION__,
    572           status.GetErrorMessage().c_str());
    573     return status.error_status();
    574   }
    575 
    576   return BufferHubQueue::RemoveBuffer(slot);
    577 }
    578 
    579 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
    580     int timeout, size_t* slot, LocalHandle* release_fence) {
    581   DvrNativeBufferMetadata canonical_meta;
    582   return Dequeue(timeout, slot, &canonical_meta, release_fence);
    583 }
    584 
    585 pdx::Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::Dequeue(
    586     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
    587     pdx::LocalHandle* release_fence, bool gain_posted_buffer) {
    588   ATRACE_NAME("ProducerQueue::Dequeue");
    589   if (slot == nullptr || out_meta == nullptr || release_fence == nullptr) {
    590     ALOGE("%s: Invalid parameter.", __FUNCTION__);
    591     return ErrorStatus(EINVAL);
    592   }
    593 
    594   std::shared_ptr<ProducerBuffer> buffer;
    595   Status<std::shared_ptr<BufferHubBase>> dequeue_status =
    596       BufferHubQueue::Dequeue(timeout, slot);
    597   if (dequeue_status.ok()) {
    598     buffer = std::static_pointer_cast<ProducerBuffer>(dequeue_status.take());
    599   } else {
    600     if (gain_posted_buffer) {
    601       Status<std::shared_ptr<ProducerBuffer>> dequeue_unacquired_status =
    602           ProducerQueue::DequeueUnacquiredBuffer(slot);
    603       if (!dequeue_unacquired_status.ok()) {
    604         ALOGE("%s: DequeueUnacquiredBuffer returned error: %d", __FUNCTION__,
    605               dequeue_unacquired_status.error());
    606         return dequeue_unacquired_status.error_status();
    607       }
    608       buffer = dequeue_unacquired_status.take();
    609     } else {
    610       return dequeue_status.error_status();
    611     }
    612   }
    613   const int ret =
    614       buffer->GainAsync(out_meta, release_fence, gain_posted_buffer);
    615   if (ret < 0 && ret != -EALREADY)
    616     return ErrorStatus(-ret);
    617 
    618   return {std::move(buffer)};
    619 }
    620 
    621 Status<std::shared_ptr<ProducerBuffer>> ProducerQueue::DequeueUnacquiredBuffer(
    622     size_t* slot) {
    623   if (unavailable_buffers_slot_.size() < 1) {
    624     ALOGE(
    625         "%s: Failed to dequeue un-acquired buffer. All buffer(s) are in "
    626         "acquired state if exist.",
    627         __FUNCTION__);
    628     return ErrorStatus(ENOMEM);
    629   }
    630 
    631   // Find the first buffer that is not in acquired state from
    632   // unavailable_buffers_slot_.
    633   for (auto iter = unavailable_buffers_slot_.begin();
    634        iter != unavailable_buffers_slot_.end(); iter++) {
    635     std::shared_ptr<ProducerBuffer> buffer = ProducerQueue::GetBuffer(*iter);
    636     if (buffer == nullptr) {
    637       ALOGE("%s failed. Buffer slot %d is  null.", __FUNCTION__,
    638             static_cast<int>(*slot));
    639       return ErrorStatus(EIO);
    640     }
    641     if (!BufferHubDefs::isAnyClientAcquired(buffer->buffer_state())) {
    642       *slot = *iter;
    643       unavailable_buffers_slot_.erase(iter);
    644       unavailable_buffers_slot_.push_back(*slot);
    645       ALOGD("%s: Producer queue dequeue unacquired buffer in slot %d",
    646             __FUNCTION__, static_cast<int>(*slot));
    647       return {std::move(buffer)};
    648     }
    649   }
    650   ALOGE(
    651       "%s: Failed to dequeue un-acquired buffer. No un-acquired buffer exist.",
    652       __FUNCTION__);
    653   return ErrorStatus(EBUSY);
    654 }
    655 
    656 pdx::Status<ProducerQueueParcelable> ProducerQueue::TakeAsParcelable() {
    657   if (capacity() != 0) {
    658     ALOGE(
    659         "%s: producer queue can only be taken out as a parcelable when empty. "
    660         "Current queue capacity: %zu",
    661         __FUNCTION__, capacity());
    662     return ErrorStatus(EINVAL);
    663   }
    664 
    665   std::unique_ptr<pdx::ClientChannel> channel = TakeChannel();
    666   ProducerQueueParcelable queue_parcelable(channel->TakeChannelParcelable());
    667 
    668   // Here the queue parcelable is returned and holds the underlying system
    669   // resources backing the queue; while the original client channel of this
    670   // producer queue is destroyed in place so that this client can no longer
    671   // provide producer operations.
    672   return {std::move(queue_parcelable)};
    673 }
    674 
    675 /*static */
    676 std::unique_ptr<ConsumerQueue> ConsumerQueue::Import(
    677     LocalChannelHandle handle) {
    678   return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(std::move(handle)));
    679 }
    680 
    681 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle)
    682     : BufferHubQueue(std::move(handle)) {
    683   auto status = ImportQueue();
    684   if (!status) {
    685     ALOGE("%s: Failed to import queue: %s", __FUNCTION__,
    686           status.GetErrorMessage().c_str());
    687     Close(-status.error());
    688   }
    689 
    690   auto import_status = ImportBuffers();
    691   if (import_status) {
    692     ALOGI("%s: Imported %zu buffers.", __FUNCTION__, import_status.get());
    693   } else {
    694     ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
    695           import_status.GetErrorMessage().c_str());
    696   }
    697 }
    698 
    699 Status<size_t> ConsumerQueue::ImportBuffers() {
    700   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
    701   if (!status) {
    702     if (status.error() == EBADR) {
    703       ALOGI("%s: Queue is silent, no buffers imported.", __FUNCTION__);
    704       return {0};
    705     } else {
    706       ALOGE("%s: Failed to import consumer buffer: %s", __FUNCTION__,
    707             status.GetErrorMessage().c_str());
    708       return status.error_status();
    709     }
    710   }
    711 
    712   int ret;
    713   Status<void> last_error;
    714   size_t imported_buffers_count = 0;
    715 
    716   auto buffer_handle_slots = status.take();
    717   for (auto& buffer_handle_slot : buffer_handle_slots) {
    718     ALOGD_IF(TRACE, ": buffer_handle=%d", __FUNCTION__,
    719              buffer_handle_slot.first.value());
    720 
    721     std::unique_ptr<ConsumerBuffer> consumer_buffer =
    722         ConsumerBuffer::Import(std::move(buffer_handle_slot.first));
    723     if (!consumer_buffer) {
    724       ALOGE("%s: Failed to import buffer: slot=%zu", __FUNCTION__,
    725             buffer_handle_slot.second);
    726       last_error = ErrorStatus(EPIPE);
    727       continue;
    728     }
    729 
    730     auto add_status =
    731         AddBuffer(std::move(consumer_buffer), buffer_handle_slot.second);
    732     if (!add_status) {
    733       ALOGE("%s: Failed to add buffer: %s", __FUNCTION__,
    734             add_status.GetErrorMessage().c_str());
    735       last_error = add_status;
    736     } else {
    737       imported_buffers_count++;
    738     }
    739   }
    740 
    741   if (imported_buffers_count > 0)
    742     return {imported_buffers_count};
    743   else
    744     return last_error.error_status();
    745 }
    746 
    747 Status<void> ConsumerQueue::AddBuffer(
    748     const std::shared_ptr<ConsumerBuffer>& buffer, size_t slot) {
    749   ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu", __FUNCTION__, id(),
    750            buffer->id(), slot);
    751   return BufferHubQueue::AddBuffer(buffer, slot);
    752 }
    753 
    754 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
    755     int timeout, size_t* slot, void* meta, size_t user_metadata_size,
    756     LocalHandle* acquire_fence) {
    757   if (user_metadata_size != user_metadata_size_) {
    758     ALOGE(
    759         "%s: Metadata size (%zu) for the dequeuing buffer does not match "
    760         "metadata size (%zu) for the queue.",
    761         __FUNCTION__, user_metadata_size, user_metadata_size_);
    762     return ErrorStatus(EINVAL);
    763   }
    764 
    765   DvrNativeBufferMetadata canonical_meta;
    766   auto status = Dequeue(timeout, slot, &canonical_meta, acquire_fence);
    767   if (!status)
    768     return status.error_status();
    769 
    770   if (meta && user_metadata_size) {
    771     void* metadata_src =
    772         reinterpret_cast<void*>(canonical_meta.user_metadata_ptr);
    773     if (metadata_src) {
    774       memcpy(meta, metadata_src, user_metadata_size);
    775     } else {
    776       ALOGW("%s: no user-defined metadata.", __FUNCTION__);
    777     }
    778   }
    779 
    780   return status;
    781 }
    782 
    783 Status<std::shared_ptr<ConsumerBuffer>> ConsumerQueue::Dequeue(
    784     int timeout, size_t* slot, DvrNativeBufferMetadata* out_meta,
    785     pdx::LocalHandle* acquire_fence) {
    786   ATRACE_NAME("ConsumerQueue::Dequeue");
    787   if (slot == nullptr || out_meta == nullptr || acquire_fence == nullptr) {
    788     ALOGE("%s: Invalid parameter.", __FUNCTION__);
    789     return ErrorStatus(EINVAL);
    790   }
    791 
    792   auto status = BufferHubQueue::Dequeue(timeout, slot);
    793   if (!status)
    794     return status.error_status();
    795 
    796   auto buffer = std::static_pointer_cast<ConsumerBuffer>(status.take());
    797   const int ret = buffer->AcquireAsync(out_meta, acquire_fence);
    798   if (ret < 0)
    799     return ErrorStatus(-ret);
    800 
    801   return {std::move(buffer)};
    802 }
    803 
    804 Status<void> ConsumerQueue::OnBufferAllocated() {
    805   ALOGD_IF(TRACE, "%s: queue_id=%d", __FUNCTION__, id());
    806 
    807   auto status = ImportBuffers();
    808   if (!status) {
    809     ALOGE("%s: Failed to import buffers: %s", __FUNCTION__,
    810           status.GetErrorMessage().c_str());
    811     return ErrorStatus(status.error());
    812   } else if (status.get() == 0) {
    813     ALOGW("%s: No new buffers allocated!", __FUNCTION__);
    814     return ErrorStatus(ENOBUFS);
    815   } else {
    816     ALOGD_IF(TRACE, "%s: Imported %zu consumer buffers.", __FUNCTION__,
    817              status.get());
    818     return {};
    819   }
    820 }
    821 
    822 }  // namespace dvr
    823 }  // namespace android
    824