#include <inttypes.h>
#include <sys/epoll.h>
#include <sys/eventfd.h>
#include <sys/poll.h>

#include <algorithm>
#include <atomic>
#include <thread>

#include <log/log.h>
#include <private/dvr/bufferhub_rpc.h>
#include <private/dvr/consumer_channel.h>
#include <private/dvr/producer_channel.h>
#include <sync/sync.h>
#include <utils/Trace.h>

using android::pdx::BorrowedHandle;
using android::pdx::ErrorStatus;
using android::pdx::Message;
using android::pdx::RemoteChannelHandle;
using android::pdx::Status;
using android::pdx::rpc::BufferWrapper;
using android::pdx::rpc::DispatchRemoteMethod;
using android::pdx::rpc::WrapBuffer;

namespace android {
namespace dvr {

ProducerChannel::ProducerChannel(BufferHubService* service, int buffer_id,
                                 int channel_id, IonBuffer buffer,
                                 IonBuffer metadata_buffer,
                                 size_t user_metadata_size, int* error)
    : BufferHubChannel(service, buffer_id, channel_id, kProducerType),
      buffer_(std::move(buffer)),
      metadata_buffer_(std::move(metadata_buffer)),
      user_metadata_size_(user_metadata_size),
      metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
                         user_metadata_size) {
  if (!buffer_.IsValid()) {
    ALOGE("ProducerChannel::ProducerChannel: Invalid buffer.");
    *error = -EINVAL;
    return;
  }
  if (!metadata_buffer_.IsValid()) {
    ALOGE("ProducerChannel::ProducerChannel: Invalid metadata buffer.");
    *error = -EINVAL;
    return;
  }

  *error = InitializeBuffer();
}

ProducerChannel::ProducerChannel(BufferHubService* service, int channel_id,
                                 uint32_t width, uint32_t height,
                                 uint32_t layer_count, uint32_t format,
                                 uint64_t usage, size_t user_metadata_size,
                                 int* error)
    : BufferHubChannel(service, channel_id, channel_id, kProducerType),
      user_metadata_size_(user_metadata_size),
      metadata_buf_size_(BufferHubDefs::kMetadataHeaderSize +
                         user_metadata_size) {
  if (int ret = buffer_.Alloc(width, height, layer_count, format, usage)) {
    ALOGE("ProducerChannel::ProducerChannel: Failed to allocate buffer: %s",
          strerror(-ret));
    *error = ret;
    return;
  }

  if (int ret = metadata_buffer_.Alloc(metadata_buf_size_, /*height=*/1,
                                       /*layer_count=*/1,
                                       BufferHubDefs::kMetadataFormat,
                                       BufferHubDefs::kMetadataUsage)) {
    ALOGE("ProducerChannel::ProducerChannel: Failed to allocate metadata: %s",
          strerror(-ret));
    *error = ret;
    return;
  }

  *error = InitializeBuffer();
}

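// Maps the metadata buffer, wires up the shared atomics that live in the
// metadata header (buffer state, fence state, active clients bit mask), and
// creates the epoll-based shared acquire/release fences. Returns 0 on success
// or a negative errno on failure.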
int ProducerChannel::InitializeBuffer() {
  void* metadata_ptr = nullptr;
  if (int ret = metadata_buffer_.Lock(BufferHubDefs::kMetadataUsage, /*x=*/0,
                                      /*y=*/0, metadata_buf_size_,
                                      /*height=*/1, &metadata_ptr)) {
    ALOGE("ProducerChannel::InitializeBuffer: Failed to lock metadata.");
    return ret;
  }
  metadata_header_ =
      reinterpret_cast<BufferHubDefs::MetadataHeader*>(metadata_ptr);

  // Use placement new to construct the atomics directly in the shared
  // metadata memory instead of allocating anew, and to zero-initialize them.
  buffer_state_ = new (&metadata_header_->bufferState) std::atomic<uint32_t>(0);
  fence_state_ = new (&metadata_header_->fenceState) std::atomic<uint32_t>(0);
  active_clients_bit_mask_ =
      new (&metadata_header_->activeClientsBitMask) std::atomic<uint32_t>(0);

  // A producer channel is never created after a consumer channel, and a
  // buffer has exactly one fixed producer for now. Thus, it is correct to
  // assume the producer state bit is kFirstClientBitMask.
  active_clients_bit_mask_->store(BufferHubDefs::kFirstClientBitMask,
                                  std::memory_order_release);

  acquire_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
  release_fence_fd_.Reset(epoll_create1(EPOLL_CLOEXEC));
  if (!acquire_fence_fd_ || !release_fence_fd_) {
    ALOGE("ProducerChannel::InitializeBuffer: Failed to create shared fences.");
    return -EIO;
  }

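  // The dummy fence is a non-blocking eventfd registered with the shared
  // release fence epoll set. It is signaled on behalf of consumers that go
  // away without releasing (see RemoveConsumer) and drained again on the next
  // ProducerPost.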
  dummy_fence_fd_.Reset(eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK));
  if (!dummy_fence_fd_) {
    ALOGE("ProducerChannel::InitializeBuffer: Failed to create dummy fence.");
    return -EIO;
  }

  epoll_event event;
  event.events = 0;
  event.data.u32 = 0U;
  if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_ADD, dummy_fence_fd_.Get(),
                &event) < 0) {
    ALOGE(
        "ProducerChannel::InitializeBuffer: Failed to add the dummy fence to "
        "the shared release fence epoll set: %s",
        strerror(errno));
    return -EIO;
  }

  // Success.
  return 0;
}

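// Factory overload that adopts already-allocated buffer and metadata
// IonBuffers; returns nullptr if validation or initialization fails.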
std::unique_ptr<ProducerChannel> ProducerChannel::Create(
    BufferHubService* service, int buffer_id, int channel_id, IonBuffer buffer,
    IonBuffer metadata_buffer, size_t user_metadata_size) {
  int error = 0;
  std::unique_ptr<ProducerChannel> producer(new ProducerChannel(
      service, buffer_id, channel_id, std::move(buffer),
      std::move(metadata_buffer), user_metadata_size, &error));

  if (error < 0)
    return nullptr;
  else
    return producer;
}

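// Factory overload that allocates a new buffer and a metadata buffer with the
// requested parameters; returns an ErrorStatus if allocation or initialization
// fails.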
Status<std::shared_ptr<ProducerChannel>> ProducerChannel::Create(
    BufferHubService* service, int channel_id, uint32_t width, uint32_t height,
    uint32_t layer_count, uint32_t format, uint64_t usage,
    size_t user_metadata_size) {
  int error = 0;
  std::shared_ptr<ProducerChannel> producer(
      new ProducerChannel(service, channel_id, width, height, layer_count,
                          format, usage, user_metadata_size, &error));
  if (error < 0)
    return ErrorStatus(-error);
  else
    return {std::move(producer)};
}

ProducerChannel::~ProducerChannel() {
  ALOGD_IF(TRACE,
           "ProducerChannel::~ProducerChannel: channel_id=%d buffer_id=%d "
           "state=%" PRIx32 ".",
           channel_id(), buffer_id(),
           buffer_state_->load(std::memory_order_acquire));
  for (auto consumer : consumer_channels_) {
    consumer->OnProducerClosed();
  }
  Hangup();
}

BufferHubChannel::BufferInfo ProducerChannel::GetBufferInfo() const {
  // Derive the mask of signaled buffers in this producer / consumer set.
  uint32_t signaled_mask = signaled() ? BufferHubDefs::kFirstClientBitMask : 0;
  for (const ConsumerChannel* consumer : consumer_channels_) {
    signaled_mask |= consumer->signaled() ? consumer->client_state_mask() : 0;
  }

  return BufferInfo(buffer_id(), consumer_channels_.size(), buffer_.width(),
                    buffer_.height(), buffer_.layer_count(), buffer_.format(),
                    buffer_.usage(),
                    buffer_state_->load(std::memory_order_acquire),
                    signaled_mask, metadata_header_->queueIndex);
}

void ProducerChannel::HandleImpulse(Message& message) {
  ATRACE_NAME("ProducerChannel::HandleImpulse");
  switch (message.GetOp()) {
    case BufferHubRPC::ProducerGain::Opcode:
      OnProducerGain(message);
      break;
    case BufferHubRPC::ProducerPost::Opcode:
      OnProducerPost(message, {});
      break;
  }
}

bool ProducerChannel::HandleMessage(Message& message) {
  ATRACE_NAME("ProducerChannel::HandleMessage");
  switch (message.GetOp()) {
    case BufferHubRPC::GetBuffer::Opcode:
      DispatchRemoteMethod<BufferHubRPC::GetBuffer>(
          *this, &ProducerChannel::OnGetBuffer, message);
      return true;

    case BufferHubRPC::NewConsumer::Opcode:
      DispatchRemoteMethod<BufferHubRPC::NewConsumer>(
          *this, &ProducerChannel::OnNewConsumer, message);
      return true;

    case BufferHubRPC::ProducerPost::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ProducerPost>(
          *this, &ProducerChannel::OnProducerPost, message);
      return true;

    case BufferHubRPC::ProducerGain::Opcode:
      DispatchRemoteMethod<BufferHubRPC::ProducerGain>(
          *this, &ProducerChannel::OnProducerGain, message);
      return true;

    default:
      return false;
  }
}

BufferDescription<BorrowedHandle> ProducerChannel::GetBuffer(
    uint32_t client_state_mask) {
  return {buffer_,
          metadata_buffer_,
          buffer_id(),
          channel_id(),
          client_state_mask,
          acquire_fence_fd_.Borrow(),
          release_fence_fd_.Borrow()};
}

Status<BufferDescription<BorrowedHandle>> ProducerChannel::OnGetBuffer(
    Message& /*message*/) {
  ATRACE_NAME("ProducerChannel::OnGetBuffer");
  ALOGD_IF(TRACE, "ProducerChannel::OnGetBuffer: buffer=%d, state=%" PRIx32 ".",
           buffer_id(), buffer_state_->load(std::memory_order_acquire));
  return {GetBuffer(BufferHubDefs::kFirstClientBitMask)};
}

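// Claims a free client state bit for a new consumer by looping on a
// compare-and-swap of active_clients_bit_mask_. Bits belonging to orphaned
// consumers are skipped so they are never reused. It is assumed here that
// findNextAvailableClientStateMask returns 0 when every client slot is taken,
// which is reported back as E2BIG.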
Status<uint32_t> ProducerChannel::CreateConsumerStateMask() {
  // Try to find the next consumer state bit which has not been claimed by any
  // consumer yet.
  // memory_order_acquire is chosen here because all writes in other threads
  // that release active_clients_bit_mask_ need to be visible here.
  uint32_t current_active_clients_bit_mask =
      active_clients_bit_mask_->load(std::memory_order_acquire);
  uint32_t consumer_state_mask =
      BufferHubDefs::findNextAvailableClientStateMask(
          current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
  if (consumer_state_mask == 0U) {
    ALOGE("%s: reached the maximum number of consumers per producer: %d.",
          __FUNCTION__, (BufferHubDefs::kMaxNumberOfClients - 1));
    return ErrorStatus(E2BIG);
  }
  uint32_t updated_active_clients_bit_mask =
      current_active_clients_bit_mask | consumer_state_mask;
  // Set the updated value only if the current value stays the same as what was
  // read before. If the comparison succeeds, the update happens without
  // reordering anything before or after this read-modify-write in the current
  // thread, and the modification is visible to other threads that acquire
  // active_clients_bit_mask_. If the comparison fails, the latest value of
  // active_clients_bit_mask_ is reloaded into current_active_clients_bit_mask.
  // Keep looking for the next available client state mask until either one is
  // claimed successfully or no slot is left.
  while (!active_clients_bit_mask_->compare_exchange_weak(
      current_active_clients_bit_mask, updated_active_clients_bit_mask,
      std::memory_order_acq_rel, std::memory_order_acquire)) {
    ALOGE("%s: active_clients_bit_mask_ changed to %" PRIx32
          " while trying to update it to %" PRIx32
          ". Trying to generate a new client state mask to resolve the race "
          "condition.",
          __FUNCTION__, current_active_clients_bit_mask,
          updated_active_clients_bit_mask);
    consumer_state_mask = BufferHubDefs::findNextAvailableClientStateMask(
        current_active_clients_bit_mask | orphaned_consumer_bit_mask_);
    if (consumer_state_mask == 0U) {
      ALOGE("%s: reached the maximum number of consumers per producer: %d.",
            __FUNCTION__, (BufferHubDefs::kMaxNumberOfClients - 1));
      return ErrorStatus(E2BIG);
    }
    updated_active_clients_bit_mask =
        current_active_clients_bit_mask | consumer_state_mask;
  }

  return {consumer_state_mask};
}

void ProducerChannel::RemoveConsumerClientMask(uint32_t consumer_state_mask) {
  // Clear the buffer state and fence state bits for this client in case
  // anything is already set there, e.g. due to a race between a producer post
  // and a consumer that failed to create its channel.
  buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);
  fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);

  // Return the consumer state bit to the pool and make the change visible to
  // other threads that acquire active_clients_bit_mask_.
  active_clients_bit_mask_->fetch_and(~consumer_state_mask,
                                      std::memory_order_release);
}

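// Pushes a new consumer channel over the given message and registers it with
// the service. If the buffer is currently posted (rather than gained or fully
// released), the new consumer is signaled right away so it can acquire the
// pending frame.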
Status<RemoteChannelHandle> ProducerChannel::CreateConsumer(
    Message& message, uint32_t consumer_state_mask) {
  ATRACE_NAME(__FUNCTION__);
  ALOGD("%s: buffer_id=%d", __FUNCTION__, buffer_id());

  int channel_id;
  auto status = message.PushChannel(0, nullptr, &channel_id);
  if (!status) {
    ALOGE("%s: Failed to push consumer channel: %s", __FUNCTION__,
          status.GetErrorMessage().c_str());
    RemoveConsumerClientMask(consumer_state_mask);
    return ErrorStatus(ENOMEM);
  }

  auto consumer = std::make_shared<ConsumerChannel>(
      service(), buffer_id(), channel_id, consumer_state_mask,
      shared_from_this());
  const auto channel_status = service()->SetChannel(channel_id, consumer);
  if (!channel_status) {
    ALOGE("%s: failed to set new consumer channel: %s.", __FUNCTION__,
          channel_status.GetErrorMessage().c_str());
    RemoveConsumerClientMask(consumer_state_mask);
    return ErrorStatus(ENOMEM);
  }

  uint32_t current_buffer_state =
      buffer_state_->load(std::memory_order_acquire);
  // Return the consumer channel handle without signaling it when the new
  // consumer is added to a buffer that is available to the producer (i.e. a
  // fully-released buffer) or to a gained buffer.
  if (current_buffer_state == 0U ||
      BufferHubDefs::isAnyClientGained(current_buffer_state)) {
    return {status.take()};
  }

  // Signal the new consumer when adding it to a buffer in posted state.
  bool update_buffer_state = true;
  if (!BufferHubDefs::isClientPosted(current_buffer_state,
                                     consumer_state_mask)) {
    uint32_t updated_buffer_state =
        current_buffer_state ^
        (consumer_state_mask & BufferHubDefs::kHighBitsMask);
    while (!buffer_state_->compare_exchange_weak(
        current_buffer_state, updated_buffer_state, std::memory_order_acq_rel,
        std::memory_order_acquire)) {
      ALOGI(
          "%s: Failed to post to the new consumer. "
          "Current buffer state was changed to %" PRIx32
          " when trying to acquire the buffer and modify the buffer state to "
          "%" PRIx32
          ". About to try again if the buffer is still not gained nor fully "
          "released.",
          __FUNCTION__, current_buffer_state, updated_buffer_state);
      if (current_buffer_state == 0U ||
          BufferHubDefs::isAnyClientGained(current_buffer_state)) {
        ALOGI("%s: buffer is gained or fully released, state=%" PRIx32 ".",
              __FUNCTION__, current_buffer_state);
        update_buffer_state = false;
        break;
      }
      updated_buffer_state =
          current_buffer_state ^
          (consumer_state_mask & BufferHubDefs::kHighBitsMask);
    }
  }
  if (update_buffer_state || BufferHubDefs::isClientPosted(
                                 buffer_state_->load(std::memory_order_acquire),
                                 consumer_state_mask)) {
    consumer->OnProducerPosted();
  }

  return {status.take()};
}

Status<RemoteChannelHandle> ProducerChannel::OnNewConsumer(Message& message) {
  ATRACE_NAME("ProducerChannel::OnNewConsumer");
  ALOGD_IF(TRACE, "ProducerChannel::OnNewConsumer: buffer_id=%d", buffer_id());
  auto status = CreateConsumerStateMask();
  if (!status.ok()) {
    return status.error_status();
  }
  return CreateConsumer(message, /*consumer_state_mask=*/status.get());
}

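// Handles ProducerPost: disarms and drains the dummy fence left over from the
// previous release/gain cycle, stores the producer's acquire fence, and
// notifies every connected consumer that a new frame is available.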
Status<void> ProducerChannel::OnProducerPost(Message&,
                                             LocalFence acquire_fence) {
  ATRACE_NAME("ProducerChannel::OnProducerPost");
  ALOGD_IF(TRACE, "%s: buffer_id=%d, state=0x%x", __FUNCTION__, buffer_id(),
           buffer_state_->load(std::memory_order_acquire));

  epoll_event event;
  event.events = 0;
  event.data.u32 = 0U;
  int ret = epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
                      dummy_fence_fd_.Get(), &event);
  ALOGE_IF(ret < 0,
           "ProducerChannel::OnProducerPost: Failed to disarm the dummy fence "
           "in the shared release fence epoll set: %s",
           strerror(errno));

  eventfd_t dummy_fence_count = 0U;
  if (eventfd_read(dummy_fence_fd_.Get(), &dummy_fence_count) < 0) {
    const int error = errno;
    if (error != EAGAIN) {
      ALOGE(
          "ProducerChannel::OnProducerPost: Failed to read dummy fence, "
          "error: %s",
          strerror(error));
      return ErrorStatus(error);
    }
  }

  ALOGW_IF(dummy_fence_count > 0,
           "ProducerChannel::OnProducerPost: %" PRIu64
           " dummy fence(s) were signaled during the last release/gain cycle "
           "buffer_id=%d.",
           dummy_fence_count, buffer_id());

  post_fence_ = std::move(acquire_fence);

  // Signal any interested consumers. If there are none, the buffer will stay
  // in posted state until a consumer comes online. This behavior guarantees
  // that no frame is silently dropped.
  for (auto& consumer : consumer_channels_) {
    consumer->OnProducerPosted();
  }

  return {};
}

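// Handles ProducerGain: drops the stored post fence, tells consumers that the
// producer has regained the buffer, and hands the accumulated release fence
// back to the producer.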
Status<LocalFence> ProducerChannel::OnProducerGain(Message& /*message*/) {
  ATRACE_NAME("ProducerChannel::OnProducerGain");
  ALOGD_IF(TRACE, "%s: buffer_id=%d", __FUNCTION__, buffer_id());

  ClearAvailable();
  post_fence_.close();
  for (auto& consumer : consumer_channels_) {
    consumer->OnProducerGained();
  }
  return {std::move(returned_fence_)};
}

// TODO(b/112338294) Keep here for reference. Remove it after new logic is
// written.
/* Status<RemoteChannelHandle> ProducerChannel::OnProducerDetach(
    Message& message) {
  ATRACE_NAME("ProducerChannel::OnProducerDetach");
  ALOGD_IF(TRACE, "ProducerChannel::OnProducerDetach: buffer_id=%d",
           buffer_id());

  uint32_t buffer_state = buffer_state_->load(std::memory_order_acquire);
  if (!BufferHubDefs::isClientGained(
      buffer_state, BufferHubDefs::kFirstClientStateMask)) {
    // Can only detach a ProducerBuffer when it's in gained state.
    ALOGW(
        "ProducerChannel::OnProducerDetach: The buffer (id=%d, state=%"
        PRIx32
        ") is not in gained state.",
        buffer_id(), buffer_state);
    return {};
  }

  int channel_id;
  auto status = message.PushChannel(0, nullptr, &channel_id);
  if (!status) {
    ALOGE(
        "ProducerChannel::OnProducerDetach: Failed to push detached buffer "
        "channel: %s",
        status.GetErrorMessage().c_str());
    return ErrorStatus(ENOMEM);
  }

  // Make sure we unlock the buffer.
  if (int ret = metadata_buffer_.Unlock()) {
    ALOGE("ProducerChannel::OnProducerDetach: Failed to unlock metadata.");
    return ErrorStatus(-ret);
  };

  std::unique_ptr<BufferChannel> channel =
      BufferChannel::Create(service(), buffer_id(), channel_id,
                            std::move(buffer_), user_metadata_size_);
  if (!channel) {
    ALOGE("ProducerChannel::OnProducerDetach: Invalid buffer.");
    return ErrorStatus(EINVAL);
  }

  const auto channel_status =
      service()->SetChannel(channel_id, std::move(channel));
  if (!channel_status) {
    // Technically, this should never fail, as we just pushed the channel.
    // Note that LOG_FATAL will be stripped out in non-debug build.
    LOG_FATAL(
        "ProducerChannel::OnProducerDetach: Failed to set new detached "
        "buffer channel: %s.", channel_status.GetErrorMessage().c_str());
  }

  return status;
} */

Status<LocalFence> ProducerChannel::OnConsumerAcquire(Message& /*message*/) {
  ATRACE_NAME("ProducerChannel::OnConsumerAcquire");
  ALOGD_IF(TRACE, "ProducerChannel::OnConsumerAcquire: buffer_id=%d",
           buffer_id());

  // Hand the post fence over to the consumer. Only the fence handle needs to
  // be serialized; moving it avoids duplicating the underlying fd.
  return {std::move(post_fence_)};
}

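// Handles ConsumerRelease: merges the consumer's release fence into
// returned_fence_ via sync_merge and, once every active, non-orphaned client
// has released, clears the buffer state and signals availability back to the
// producer.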
Status<void> ProducerChannel::OnConsumerRelease(Message&,
                                                LocalFence release_fence) {
  ATRACE_NAME("ProducerChannel::OnConsumerRelease");
  ALOGD_IF(TRACE, "ProducerChannel::OnConsumerRelease: buffer_id=%d",
           buffer_id());

  // Attempt to merge the fences if necessary.
  if (release_fence) {
    if (returned_fence_) {
      LocalFence merged_fence(sync_merge("bufferhub_merged",
                                         returned_fence_.get_fd(),
                                         release_fence.get_fd()));
      const int error = errno;
      if (!merged_fence) {
        ALOGE("ProducerChannel::OnConsumerRelease: Failed to merge fences: %s",
              strerror(error));
        return ErrorStatus(error);
      }
      returned_fence_ = std::move(merged_fence);
    } else {
      returned_fence_ = std::move(release_fence);
    }
  }

  if (IsBufferReleasedByAllActiveClientsExceptForOrphans()) {
    buffer_state_->store(0U);
    SignalAvailable();
    if (orphaned_consumer_bit_mask_) {
      ALOGW(
          "%s: orphaned buffer detected during this acquire/release cycle: "
          "id=%d orphaned=0x%" PRIx32 " queue_index=%" PRId64 ".",
          __FUNCTION__, buffer_id(), orphaned_consumer_bit_mask_,
          metadata_header_->queueIndex);
      orphaned_consumer_bit_mask_ = 0;
    }
  }

  return {};
}

void ProducerChannel::OnConsumerOrphaned(const uint32_t& consumer_state_mask) {
  // Remember the orphaned consumer so that a newly added consumer won't take
  // the same state mask as this orphaned consumer.
  ALOGE_IF(orphaned_consumer_bit_mask_ & consumer_state_mask,
           "%s: Consumer (consumer_state_mask=%" PRIx32
           ") is already orphaned.",
           __FUNCTION__, consumer_state_mask);
  orphaned_consumer_bit_mask_ |= consumer_state_mask;

  if (IsBufferReleasedByAllActiveClientsExceptForOrphans()) {
    buffer_state_->store(0U);
    SignalAvailable();
  }

  // Atomically clear the fence state bit as an orphaned consumer will never
  // signal a release fence.
  fence_state_->fetch_and(~consumer_state_mask, std::memory_order_release);

  // Atomically clear this consumer's buffer state bits, i.e. mark it as
  // released.
  buffer_state_->fetch_and(~consumer_state_mask, std::memory_order_release);

  ALOGW(
      "%s: detected new orphaned consumer buffer_id=%d "
      "consumer_state_mask=%" PRIx32 " queue_index=%" PRId64
      " buffer_state=%" PRIx32 " fence_state=%" PRIx32 ".",
      __FUNCTION__, buffer_id(), consumer_state_mask,
      metadata_header_->queueIndex,
      buffer_state_->load(std::memory_order_acquire),
      fence_state_->load(std::memory_order_acquire));
}

void ProducerChannel::AddConsumer(ConsumerChannel* channel) {
  consumer_channels_.push_back(channel);
}

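// Called when a consumer channel goes away. Removes the channel, clears the
// consumer's bit from active_clients_bit_mask_, marks the consumer orphaned if
// it disappeared while holding a posted or acquired frame, and signals the
// dummy fence when a release fence was still expected from it.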
void ProducerChannel::RemoveConsumer(ConsumerChannel* channel) {
  consumer_channels_.erase(
      std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
  // Return the consumer state bit to the pool and make the change visible to
  // other threads that acquire active_clients_bit_mask_.
  uint32_t consumer_state_mask = channel->client_state_mask();
  uint32_t current_active_clients_bit_mask =
      active_clients_bit_mask_->load(std::memory_order_acquire);
  uint32_t updated_active_clients_bit_mask =
      current_active_clients_bit_mask & (~consumer_state_mask);
  while (!active_clients_bit_mask_->compare_exchange_weak(
      current_active_clients_bit_mask, updated_active_clients_bit_mask,
      std::memory_order_acq_rel, std::memory_order_acquire)) {
    ALOGI(
        "%s: Failed to remove consumer state mask. Current active clients bit "
        "mask changed to %" PRIx32
        " when trying to acquire and modify it to %" PRIx32
        ". About to try again.",
        __FUNCTION__, current_active_clients_bit_mask,
        updated_active_clients_bit_mask);
    updated_active_clients_bit_mask =
        current_active_clients_bit_mask & (~consumer_state_mask);
  }

  const uint32_t current_buffer_state =
      buffer_state_->load(std::memory_order_acquire);
  if (BufferHubDefs::isClientPosted(current_buffer_state,
                                    consumer_state_mask) ||
      BufferHubDefs::isClientAcquired(current_buffer_state,
                                      consumer_state_mask)) {
    // The consumer client is being destroyed without releasing. This could
    // happen in corner cases when the consumer crashes. Mark it orphaned
    // before removing it from the producer.
    OnConsumerOrphaned(consumer_state_mask);
    return;
  }

  if (BufferHubDefs::isClientReleased(current_buffer_state,
                                      consumer_state_mask) ||
      BufferHubDefs::isAnyClientGained(current_buffer_state)) {
    // The consumer is being closed while it is supposed to signal a release
    // fence. Signal the dummy fence here.
    if (fence_state_->load(std::memory_order_acquire) & consumer_state_mask) {
      epoll_event event;
      event.events = EPOLLIN;
      event.data.u32 = consumer_state_mask;
      if (epoll_ctl(release_fence_fd_.Get(), EPOLL_CTL_MOD,
                    dummy_fence_fd_.Get(), &event) < 0) {
        ALOGE(
            "%s: Failed to arm the dummy fence in the shared release fence "
            "epoll set: %s",
            __FUNCTION__, strerror(errno));
        return;
      }
      ALOGW("%s: signal dummy release fence buffer_id=%d", __FUNCTION__,
            buffer_id());
      eventfd_write(dummy_fence_fd_.Get(), 1);
    }
  }
}

// Returns true if the given parameters match the underlying buffer
// parameters.
bool ProducerChannel::CheckParameters(uint32_t width, uint32_t height,
                                      uint32_t layer_count, uint32_t format,
                                      uint64_t usage,
                                      size_t user_metadata_size) const {
  return user_metadata_size == user_metadata_size_ &&
         buffer_.width() == width && buffer_.height() == height &&
         buffer_.layer_count() == layer_count && buffer_.format() == format &&
         buffer_.usage() == usage;
}

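// True when no active, non-orphaned client still holds a posted or acquired
// state bit, i.e. the buffer can be handed back to the producer.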
bool ProducerChannel::IsBufferReleasedByAllActiveClientsExceptForOrphans()
    const {
  return (buffer_state_->load(std::memory_order_acquire) &
          ~orphaned_consumer_bit_mask_ &
          active_clients_bit_mask_->load(std::memory_order_acquire)) == 0U;
}

}  // namespace dvr
}  // namespace android