Home | History | Annotate | Download | only in libbufferhubqueue
      1 #include "include/private/dvr/buffer_hub_queue_client.h"
      2 
      3 #include <inttypes.h>
      4 #include <log/log.h>
      5 #include <poll.h>
      6 #include <sys/epoll.h>
      7 
      8 #include <array>
      9 
     10 #include <pdx/default_transport/client_channel.h>
     11 #include <pdx/default_transport/client_channel_factory.h>
     12 #include <pdx/file_handle.h>
     13 #include <private/dvr/bufferhub_rpc.h>
     14 
// Evaluates |fnc_call| inside an immediately-invoked lambda and repeats the
// call for as long as it returns -1 with errno equal to EINTR. The result of
// the final call becomes the value of the whole macro expression. The lambda
// captures by reference, so |fnc_call| may refer to locals at the call site.
#define RETRY_EINTR(fnc_call)                 \
  ([&]() -> decltype(fnc_call) {              \
    decltype(fnc_call) result;                \
    do {                                      \
      result = (fnc_call);                    \
    } while (result == -1 && errno == EINTR); \
    return result;                            \
  })()
     23 
     24 using android::pdx::ErrorStatus;
     25 using android::pdx::LocalChannelHandle;
     26 using android::pdx::Status;
     27 
     28 namespace android {
     29 namespace dvr {
     30 
     31 BufferHubQueue::BufferHubQueue(LocalChannelHandle channel_handle)
     32     : Client{pdx::default_transport::ClientChannel::Create(
     33           std::move(channel_handle))},
     34       meta_size_(0),
     35       buffers_(BufferHubQueue::kMaxQueueCapacity),
     36       epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
     37       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
     38       fences_(BufferHubQueue::kMaxQueueCapacity),
     39       capacity_(0),
     40       id_(-1) {
     41   Initialize();
     42 }
     43 
     44 BufferHubQueue::BufferHubQueue(const std::string& endpoint_path)
     45     : Client{pdx::default_transport::ClientChannelFactory::Create(
     46           endpoint_path)},
     47       meta_size_(0),
     48       buffers_(BufferHubQueue::kMaxQueueCapacity),
     49       epollhup_pending_(BufferHubQueue::kMaxQueueCapacity, false),
     50       available_buffers_(BufferHubQueue::kMaxQueueCapacity),
     51       fences_(BufferHubQueue::kMaxQueueCapacity),
     52       capacity_(0),
     53       id_(-1) {
     54   Initialize();
     55 }
     56 
     57 void BufferHubQueue::Initialize() {
     58   int ret = epoll_fd_.Create();
     59   if (ret < 0) {
     60     ALOGE("BufferHubQueue::BufferHubQueue: Failed to create epoll fd: %s",
     61           strerror(-ret));
     62     return;
     63   }
     64 
     65   epoll_event event = {.events = EPOLLIN | EPOLLET,
     66                        .data = {.u64 = static_cast<uint64_t>(
     67                                     BufferHubQueue::kEpollQueueEventIndex)}};
     68   ret = epoll_fd_.Control(EPOLL_CTL_ADD, event_fd(), &event);
     69   if (ret < 0) {
     70     ALOGE("BufferHubQueue::Initialize: Failed to add event fd to epoll set: %s",
     71           strerror(-ret));
     72   }
     73 }
     74 
     75 Status<void> BufferHubQueue::ImportQueue() {
     76   auto status = InvokeRemoteMethod<BufferHubRPC::GetQueueInfo>();
     77   if (!status) {
     78     ALOGE("BufferHubQueue::ImportQueue: Failed to import queue: %s",
     79           status.GetErrorMessage().c_str());
     80     return ErrorStatus(status.error());
     81   } else {
     82     SetupQueue(status.get().meta_size_bytes, status.get().id);
     83     return {};
     84   }
     85 }
     86 
     87 void BufferHubQueue::SetupQueue(size_t meta_size_bytes, int id) {
     88   meta_size_ = meta_size_bytes;
     89   id_ = id;
     90   meta_buffer_tmp_.reset(meta_size_ > 0 ? new uint8_t[meta_size_] : nullptr);
     91 }
     92 
     93 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateConsumerQueue() {
     94   if (auto status = CreateConsumerQueueHandle())
     95     return std::unique_ptr<ConsumerQueue>(new ConsumerQueue(status.take()));
     96   else
     97     return nullptr;
     98 }
     99 
    100 std::unique_ptr<ConsumerQueue> BufferHubQueue::CreateSilentConsumerQueue() {
    101   if (auto status = CreateConsumerQueueHandle())
    102     return std::unique_ptr<ConsumerQueue>(
    103         new ConsumerQueue(status.take(), true));
    104   else
    105     return nullptr;
    106 }
    107 
    108 Status<LocalChannelHandle> BufferHubQueue::CreateConsumerQueueHandle() {
    109   auto status = InvokeRemoteMethod<BufferHubRPC::CreateConsumerQueue>();
    110   if (!status) {
    111     ALOGE(
    112         "BufferHubQueue::CreateConsumerQueue: Failed to create consumer queue: "
    113         "%s",
    114         status.GetErrorMessage().c_str());
    115     return ErrorStatus(status.error());
    116   }
    117 
    118   return status;
    119 }
    120 
    121 bool BufferHubQueue::WaitForBuffers(int timeout) {
    122   std::array<epoll_event, kMaxEvents> events;
    123 
    124   // Loop at least once to check for hangups.
    125   do {
    126     ALOGD_IF(
    127         TRACE,
    128         "BufferHubQueue::WaitForBuffers: queue_id=%d count=%zu capacity=%zu",
    129         id(), count(), capacity());
    130 
    131     // If there is already a buffer then just check for hangup without waiting.
    132     const int ret = epoll_fd_.Wait(events.data(), events.size(),
    133                                    count() == 0 ? timeout : 0);
    134 
    135     if (ret == 0) {
    136       ALOGI_IF(TRACE,
    137                "BufferHubQueue::WaitForBuffers: No events before timeout: "
    138                "queue_id=%d",
    139                id());
    140       return count() != 0;
    141     }
    142 
    143     if (ret < 0 && ret != -EINTR) {
    144       ALOGE("BufferHubQueue::WaitForBuffers: Failed to wait for buffers: %s",
    145             strerror(-ret));
    146       return false;
    147     }
    148 
    149     const int num_events = ret;
    150 
    151     // A BufferQueue's epoll fd tracks N+1 events, where there are N events,
    152     // one for each buffer, in the queue and one extra event for the queue
    153     // client itself.
    154     for (int i = 0; i < num_events; i++) {
    155       int64_t index = static_cast<int64_t>(events[i].data.u64);
    156 
    157       ALOGD_IF(TRACE,
    158                "BufferHubQueue::WaitForBuffers: event %d: index=%" PRId64, i,
    159                index);
    160 
    161       if (is_buffer_event_index(index)) {
    162         HandleBufferEvent(static_cast<size_t>(index), events[i].events);
    163       } else if (is_queue_event_index(index)) {
    164         HandleQueueEvent(events[i].events);
    165       } else {
    166         ALOGW("BufferHubQueue::WaitForBuffers: Unknown event index: %" PRId64,
    167               index);
    168       }
    169     }
    170   } while (count() == 0 && capacity() > 0 && !hung_up());
    171 
    172   return count() != 0;
    173 }
    174 
    175 void BufferHubQueue::HandleBufferEvent(size_t slot, int poll_events) {
    176   auto buffer = buffers_[slot];
    177   if (!buffer) {
    178     ALOGW("BufferHubQueue::HandleBufferEvent: Invalid buffer slot: %zu", slot);
    179     return;
    180   }
    181 
    182   auto status = buffer->GetEventMask(poll_events);
    183   if (!status) {
    184     ALOGW("BufferHubQueue::HandleBufferEvent: Failed to get event mask: %s",
    185           status.GetErrorMessage().c_str());
    186     return;
    187   }
    188 
    189   const int events = status.get();
    190   if (events & EPOLLIN) {
    191     const int ret = OnBufferReady(buffer, &fences_[slot]);
    192     if (ret == 0 || ret == -EALREADY || ret == -EBUSY) {
    193       // Only enqueue the buffer if it moves to or is already in the state
    194       // requested in OnBufferReady(). If the buffer is busy this means that the
    195       // buffer moved from released to posted when a new consumer was created
    196       // before the ProducerQueue had a chance to regain it. This is a valid
    197       // transition that we have to handle because edge triggered poll events
    198       // latch the ready state even if it is later de-asserted -- don't enqueue
    199       // or print an error log in this case.
    200       if (ret != -EBUSY)
    201         Enqueue(buffer, slot);
    202     } else {
    203       ALOGE(
    204           "BufferHubQueue::HandleBufferEvent: Failed to set buffer ready, "
    205           "queue_id=%d buffer_id=%d: %s",
    206           id(), buffer->id(), strerror(-ret));
    207     }
    208   } else if (events & EPOLLHUP) {
    209     // This might be caused by producer replacing an existing buffer slot, or
    210     // when BufferHubQueue is shutting down. For the first case, currently the
    211     // epoll FD is cleaned up when the replacement consumer client is imported,
    212     // we shouldn't detach again if |epollhub_pending_[slot]| is set.
    213     ALOGW(
    214         "BufferHubQueue::HandleBufferEvent: Received EPOLLHUP at slot: %zu, "
    215         "buffer event fd: %d, EPOLLHUP pending: %d",
    216         slot, buffer->event_fd(), int{epollhup_pending_[slot]});
    217     if (epollhup_pending_[slot]) {
    218       epollhup_pending_[slot] = false;
    219     } else {
    220       DetachBuffer(slot);
    221     }
    222   } else {
    223     ALOGW(
    224         "BufferHubQueue::HandleBufferEvent: Unknown event, slot=%zu, epoll "
    225         "events=%d",
    226         slot, events);
    227   }
    228 }
    229 
    230 void BufferHubQueue::HandleQueueEvent(int poll_event) {
    231   auto status = GetEventMask(poll_event);
    232   if (!status) {
    233     ALOGW("BufferHubQueue::HandleQueueEvent: Failed to get event mask: %s",
    234           status.GetErrorMessage().c_str());
    235     return;
    236   }
    237 
    238   const int events = status.get();
    239   if (events & EPOLLIN) {
    240     // Note that after buffer imports, if |count()| still returns 0, epoll
    241     // wait will be tried again to acquire the newly imported buffer.
    242     auto buffer_status = OnBufferAllocated();
    243     if (!buffer_status) {
    244       ALOGE("BufferHubQueue::HandleQueueEvent: Failed to import buffer: %s",
    245             buffer_status.GetErrorMessage().c_str());
    246     }
    247   } else if (events & EPOLLHUP) {
    248     ALOGD_IF(TRACE, "BufferHubQueue::HandleQueueEvent: hang up event!");
    249     hung_up_ = true;
    250   } else {
    251     ALOGW("BufferHubQueue::HandleQueueEvent: Unknown epoll events=%x", events);
    252   }
    253 }
    254 
    255 int BufferHubQueue::AddBuffer(const std::shared_ptr<BufferHubBuffer>& buf,
    256                               size_t slot) {
    257   if (is_full()) {
    258     // TODO(jwcai) Move the check into Producer's AllocateBuffer and consumer's
    259     // import buffer.
    260     ALOGE("BufferHubQueue::AddBuffer queue is at maximum capacity: %zu",
    261           capacity_);
    262     return -E2BIG;
    263   }
    264 
    265   if (buffers_[slot] != nullptr) {
    266     // Replace the buffer if the slot is preoccupied. This could happen when the
    267     // producer side replaced the slot with a newly allocated buffer. Detach the
    268     // buffer before setting up with the new one.
    269     DetachBuffer(slot);
    270     epollhup_pending_[slot] = true;
    271   }
    272 
    273   epoll_event event = {.events = EPOLLIN | EPOLLET, .data = {.u64 = slot}};
    274   const int ret = epoll_fd_.Control(EPOLL_CTL_ADD, buf->event_fd(), &event);
    275   if (ret < 0) {
    276     ALOGE("BufferHubQueue::AddBuffer: Failed to add buffer to epoll set: %s",
    277           strerror(-ret));
    278     return ret;
    279   }
    280 
    281   buffers_[slot] = buf;
    282   capacity_++;
    283   return 0;
    284 }
    285 
    286 int BufferHubQueue::DetachBuffer(size_t slot) {
    287   auto& buf = buffers_[slot];
    288   if (buf == nullptr) {
    289     ALOGE("BufferHubQueue::DetachBuffer: Invalid slot: %zu", slot);
    290     return -EINVAL;
    291   }
    292 
    293   const int ret = epoll_fd_.Control(EPOLL_CTL_DEL, buf->event_fd(), nullptr);
    294   if (ret < 0) {
    295     ALOGE(
    296         "BufferHubQueue::DetachBuffer: Failed to detach buffer from epoll set: "
    297         "%s",
    298         strerror(-ret));
    299     return ret;
    300   }
    301 
    302   buffers_[slot] = nullptr;
    303   capacity_--;
    304   return 0;
    305 }
    306 
    307 void BufferHubQueue::Enqueue(const std::shared_ptr<BufferHubBuffer>& buf,
    308                              size_t slot) {
    309   if (count() == capacity_) {
    310     ALOGE("BufferHubQueue::Enqueue: Buffer queue is full!");
    311     return;
    312   }
    313 
    314   // Set slot buffer back to vector.
    315   // TODO(jwcai) Here have to dynamically allocate BufferInfo::metadata due to
    316   // the limitation of the RingBuffer we are using. Would be better to refactor
    317   // that.
    318   BufferInfo buffer_info(slot, meta_size_);
    319   buffer_info.buffer = buf;
    320   // Swap metadata loaded during onBufferReady into vector.
    321   std::swap(buffer_info.metadata, meta_buffer_tmp_);
    322 
    323   available_buffers_.Append(std::move(buffer_info));
    324 }
    325 
    326 Status<std::shared_ptr<BufferHubBuffer>> BufferHubQueue::Dequeue(
    327     int timeout, size_t* slot, void* meta, LocalHandle* fence) {
    328   ALOGD_IF(TRACE, "Dequeue: count=%zu, timeout=%d", count(), timeout);
    329 
    330   if (!WaitForBuffers(timeout))
    331     return ErrorStatus(ETIMEDOUT);
    332 
    333   std::shared_ptr<BufferHubBuffer> buf;
    334   BufferInfo& buffer_info = available_buffers_.Front();
    335 
    336   *fence = std::move(fences_[buffer_info.slot]);
    337 
    338   // Report current pos as the output slot.
    339   std::swap(buffer_info.slot, *slot);
    340   // Swap buffer from vector to be returned later.
    341   std::swap(buffer_info.buffer, buf);
    342   // Swap metadata from vector into tmp so that we can write out to |meta|.
    343   std::swap(buffer_info.metadata, meta_buffer_tmp_);
    344 
    345   available_buffers_.PopFront();
    346 
    347   if (!buf) {
    348     ALOGE("BufferHubQueue::Dequeue: Buffer to be dequeued is nullptr");
    349     return ErrorStatus(ENOBUFS);
    350   }
    351 
    352   if (meta) {
    353     std::copy(meta_buffer_tmp_.get(), meta_buffer_tmp_.get() + meta_size_,
    354               reinterpret_cast<uint8_t*>(meta));
    355   }
    356 
    357   return {std::move(buf)};
    358 }
    359 
// Convenience constructor: delegates to the five-argument constructor with
// all four usage mask arguments set to zero.
ProducerQueue::ProducerQueue(size_t meta_size)
    : ProducerQueue(meta_size, 0, 0, 0, 0) {}
    362 
    363 ProducerQueue::ProducerQueue(LocalChannelHandle handle)
    364     : BASE(std::move(handle)) {
    365   auto status = ImportQueue();
    366   if (!status) {
    367     ALOGE("ProducerQueue::ProducerQueue: Failed to import queue: %s",
    368           status.GetErrorMessage().c_str());
    369     Close(-status.error());
    370   }
    371 }
    372 
    373 ProducerQueue::ProducerQueue(size_t meta_size, uint64_t usage_set_mask,
    374                              uint64_t usage_clear_mask,
    375                              uint64_t usage_deny_set_mask,
    376                              uint64_t usage_deny_clear_mask)
    377     : BASE(BufferHubRPC::kClientPath) {
    378   auto status = InvokeRemoteMethod<BufferHubRPC::CreateProducerQueue>(
    379       meta_size, UsagePolicy{usage_set_mask, usage_clear_mask,
    380                              usage_deny_set_mask, usage_deny_clear_mask});
    381   if (!status) {
    382     ALOGE("ProducerQueue::ProducerQueue: Failed to create producer queue: %s",
    383           status.GetErrorMessage().c_str());
    384     Close(-status.error());
    385     return;
    386   }
    387 
    388   SetupQueue(status.get().meta_size_bytes, status.get().id);
    389 }
    390 
    391 int ProducerQueue::AllocateBuffer(uint32_t width, uint32_t height,
    392                                   uint32_t layer_count, uint32_t format,
    393                                   uint64_t usage, size_t* out_slot) {
    394   if (out_slot == nullptr) {
    395     ALOGE("ProducerQueue::AllocateBuffer: Parameter out_slot cannot be null.");
    396     return -EINVAL;
    397   }
    398 
    399   if (is_full()) {
    400     ALOGE("ProducerQueue::AllocateBuffer queue is at maximum capacity: %zu",
    401           capacity());
    402     return -E2BIG;
    403   }
    404 
    405   const size_t kBufferCount = 1U;
    406   Status<std::vector<std::pair<LocalChannelHandle, size_t>>> status =
    407       InvokeRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
    408           width, height, layer_count, format, usage, kBufferCount);
    409   if (!status) {
    410     ALOGE("ProducerQueue::AllocateBuffer failed to create producer buffer: %s",
    411           status.GetErrorMessage().c_str());
    412     return -status.error();
    413   }
    414 
    415   auto buffer_handle_slots = status.take();
    416   LOG_ALWAYS_FATAL_IF(buffer_handle_slots.size() != kBufferCount,
    417                       "BufferHubRPC::ProducerQueueAllocateBuffers should "
    418                       "return one and only one buffer handle.");
    419 
    420   // We only allocate one buffer at a time.
    421   auto& buffer_handle = buffer_handle_slots[0].first;
    422   size_t buffer_slot = buffer_handle_slots[0].second;
    423   ALOGD_IF(TRACE,
    424            "ProducerQueue::AllocateBuffer, new buffer, channel_handle: %d",
    425            buffer_handle.value());
    426 
    427   *out_slot = buffer_slot;
    428   return AddBuffer(BufferProducer::Import(std::move(buffer_handle)),
    429                    buffer_slot);
    430 }
    431 
    432 int ProducerQueue::AddBuffer(const std::shared_ptr<BufferProducer>& buf,
    433                              size_t slot) {
    434   ALOGD_IF(TRACE, "ProducerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
    435            id(), buf->id(), slot);
    436   // For producer buffer, we need to enqueue the newly added buffer
    437   // immediately. Producer queue starts with all buffers in available state.
    438   const int ret = BufferHubQueue::AddBuffer(buf, slot);
    439   if (ret < 0)
    440     return ret;
    441 
    442   Enqueue(buf, slot);
    443   return 0;
    444 }
    445 
    446 int ProducerQueue::DetachBuffer(size_t slot) {
    447   auto status =
    448       InvokeRemoteMethod<BufferHubRPC::ProducerQueueDetachBuffer>(slot);
    449   if (!status) {
    450     ALOGE("ProducerQueue::DetachBuffer: Failed to detach producer buffer: %s",
    451           status.GetErrorMessage().c_str());
    452     return -status.error();
    453   }
    454 
    455   return BufferHubQueue::DetachBuffer(slot);
    456 }
    457 
    458 Status<std::shared_ptr<BufferProducer>> ProducerQueue::Dequeue(
    459     int timeout, size_t* slot, LocalHandle* release_fence) {
    460   if (slot == nullptr || release_fence == nullptr) {
    461     ALOGE("ProducerQueue::Dequeue: Invalid parameter: slot=%p release_fence=%p",
    462           slot, release_fence);
    463     return ErrorStatus(EINVAL);
    464   }
    465 
    466   auto buffer_status =
    467       BufferHubQueue::Dequeue(timeout, slot, nullptr, release_fence);
    468   if (!buffer_status)
    469     return buffer_status.error_status();
    470 
    471   return {std::static_pointer_cast<BufferProducer>(buffer_status.take())};
    472 }
    473 
    474 int ProducerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
    475                                  LocalHandle* release_fence) {
    476   ALOGD_IF(TRACE, "ProducerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
    477            id(), buf->id());
    478   auto buffer = std::static_pointer_cast<BufferProducer>(buf);
    479   return buffer->Gain(release_fence);
    480 }
    481 
    482 ConsumerQueue::ConsumerQueue(LocalChannelHandle handle, bool ignore_on_import)
    483     : BufferHubQueue(std::move(handle)), ignore_on_import_(ignore_on_import) {
    484   auto status = ImportQueue();
    485   if (!status) {
    486     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import queue: %s",
    487           status.GetErrorMessage().c_str());
    488     Close(-status.error());
    489   }
    490 
    491   auto import_status = ImportBuffers();
    492   if (import_status) {
    493     ALOGI("ConsumerQueue::ConsumerQueue: Imported %zu buffers.",
    494           import_status.get());
    495   } else {
    496     ALOGE("ConsumerQueue::ConsumerQueue: Failed to import buffers: %s",
    497           import_status.GetErrorMessage().c_str());
    498   }
    499 }
    500 
    501 Status<size_t> ConsumerQueue::ImportBuffers() {
    502   auto status = InvokeRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>();
    503   if (!status) {
    504     ALOGE("ConsumerQueue::ImportBuffers: Failed to import consumer buffer: %s",
    505           status.GetErrorMessage().c_str());
    506     return ErrorStatus(status.error());
    507   }
    508 
    509   int ret;
    510   int last_error = 0;
    511   int imported_buffers = 0;
    512 
    513   auto buffer_handle_slots = status.take();
    514   for (auto& buffer_handle_slot : buffer_handle_slots) {
    515     ALOGD_IF(TRACE, "ConsumerQueue::ImportBuffers: buffer_handle=%d",
    516              buffer_handle_slot.first.value());
    517 
    518     std::unique_ptr<BufferConsumer> buffer_consumer =
    519         BufferConsumer::Import(std::move(buffer_handle_slot.first));
    520 
    521     // Setup ignore state before adding buffer to the queue.
    522     if (ignore_on_import_) {
    523       ALOGD_IF(TRACE,
    524                "ConsumerQueue::ImportBuffers: Setting buffer to ignored state: "
    525                "buffer_id=%d",
    526                buffer_consumer->id());
    527       ret = buffer_consumer->SetIgnore(true);
    528       if (ret < 0) {
    529         ALOGE(
    530             "ConsumerQueue::ImportBuffers: Failed to set ignored state on "
    531             "imported buffer buffer_id=%d: %s",
    532             buffer_consumer->id(), strerror(-ret));
    533         last_error = ret;
    534       }
    535     }
    536 
    537     ret = AddBuffer(std::move(buffer_consumer), buffer_handle_slot.second);
    538     if (ret < 0) {
    539       ALOGE("ConsumerQueue::ImportBuffers: Failed to add buffer: %s",
    540             strerror(-ret));
    541       last_error = ret;
    542       continue;
    543     } else {
    544       imported_buffers++;
    545     }
    546   }
    547 
    548   if (imported_buffers > 0)
    549     return {imported_buffers};
    550   else
    551     return ErrorStatus(-last_error);
    552 }
    553 
    554 int ConsumerQueue::AddBuffer(const std::shared_ptr<BufferConsumer>& buf,
    555                              size_t slot) {
    556   ALOGD_IF(TRACE, "ConsumerQueue::AddBuffer: queue_id=%d buffer_id=%d slot=%zu",
    557            id(), buf->id(), slot);
    558   const int ret = BufferHubQueue::AddBuffer(buf, slot);
    559   if (ret < 0)
    560     return ret;
    561 
    562   // Check to see if the buffer is already signaled. This is necessary to catch
    563   // cases where buffers are already available; epoll edge triggered mode does
    564   // not fire until and edge transition when adding new buffers to the epoll
    565   // set.
    566   const int kTimeoutMs = 0;
    567   pollfd pfd{buf->event_fd(), POLLIN, 0};
    568   const int count = RETRY_EINTR(poll(&pfd, 1, kTimeoutMs));
    569   if (count < 0) {
    570     const int error = errno;
    571     ALOGE("ConsumerQueue::AddBuffer: Failed to poll consumer buffer: %s",
    572           strerror(errno));
    573     return -error;
    574   }
    575 
    576   if (count == 1)
    577     HandleBufferEvent(slot, pfd.revents);
    578 
    579   return 0;
    580 }
    581 
    582 Status<std::shared_ptr<BufferConsumer>> ConsumerQueue::Dequeue(
    583     int timeout, size_t* slot, void* meta, size_t meta_size,
    584     LocalHandle* acquire_fence) {
    585   if (meta_size != meta_size_) {
    586     ALOGE(
    587         "ConsumerQueue::Dequeue: Metadata size (%zu) for the dequeuing buffer "
    588         "does not match metadata size (%zu) for the queue.",
    589         meta_size, meta_size_);
    590     return ErrorStatus(EINVAL);
    591   }
    592 
    593   if (slot == nullptr || acquire_fence == nullptr) {
    594     ALOGE(
    595         "ConsumerQueue::Dequeue: Invalid parameter: slot=%p meta=%p "
    596         "acquire_fence=%p",
    597         slot, meta, acquire_fence);
    598     return ErrorStatus(EINVAL);
    599   }
    600 
    601   auto buffer_status =
    602       BufferHubQueue::Dequeue(timeout, slot, meta, acquire_fence);
    603   if (!buffer_status)
    604     return buffer_status.error_status();
    605 
    606   return {std::static_pointer_cast<BufferConsumer>(buffer_status.take())};
    607 }
    608 
    609 int ConsumerQueue::OnBufferReady(const std::shared_ptr<BufferHubBuffer>& buf,
    610                                  LocalHandle* acquire_fence) {
    611   ALOGD_IF(TRACE, "ConsumerQueue::OnBufferReady: queue_id=%d buffer_id=%d",
    612            id(), buf->id());
    613   auto buffer = std::static_pointer_cast<BufferConsumer>(buf);
    614   return buffer->Acquire(acquire_fence, meta_buffer_tmp_.get(), meta_size_);
    615 }
    616 
    617 Status<void> ConsumerQueue::OnBufferAllocated() {
    618   auto status = ImportBuffers();
    619   if (!status) {
    620     ALOGE("ConsumerQueue::OnBufferAllocated: Failed to import buffers: %s",
    621           status.GetErrorMessage().c_str());
    622     return ErrorStatus(status.error());
    623   } else if (status.get() == 0) {
    624     ALOGW("ConsumerQueue::OnBufferAllocated: No new buffers allocated!");
    625     return ErrorStatus(ENOBUFS);
    626   } else {
    627     ALOGD_IF(TRACE,
    628              "ConsumerQueue::OnBufferAllocated: Imported %zu consumer buffers.",
    629              status.get());
    630     return {};
    631   }
    632 }
    633 
    634 }  // namespace dvr
    635 }  // namespace android
    636