Home | History | Annotate | Download | only in bufferhubd
      1 #include <pdx/channel_handle.h>
      2 #include <private/dvr/consumer_queue_channel.h>
      3 #include <private/dvr/producer_channel.h>
      4 
      5 using android::pdx::ErrorStatus;
      6 using android::pdx::RemoteChannelHandle;
      7 using android::pdx::Status;
      8 using android::pdx::rpc::DispatchRemoteMethod;
      9 using android::pdx::rpc::RemoteMethodError;
     10 
     11 namespace android {
     12 namespace dvr {
     13 
     14 ConsumerQueueChannel::ConsumerQueueChannel(
     15     BufferHubService* service, int buffer_id, int channel_id,
     16     const std::shared_ptr<Channel>& producer, bool silent)
     17     : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
     18       producer_(producer),
     19       capacity_(0),
     20       silent_(silent) {
     21   GetProducer()->AddConsumer(this);
     22 }
     23 
     24 ConsumerQueueChannel::~ConsumerQueueChannel() {
     25   ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
     26            channel_id());
     27 
     28   if (auto producer = GetProducer()) {
     29     producer->RemoveConsumer(this);
     30   }
     31 }
     32 
     33 bool ConsumerQueueChannel::HandleMessage(Message& message) {
     34   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
     35   auto producer = GetProducer();
     36   if (!producer) {
     37     RemoteMethodError(message, EPIPE);
     38     return true;
     39   }
     40 
     41   switch (message.GetOp()) {
     42     case BufferHubRPC::CreateConsumerQueue::Opcode:
     43       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
     44           *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
     45       return true;
     46 
     47     case BufferHubRPC::GetQueueInfo::Opcode:
     48       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
     49           *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
     50       return true;
     51 
     52     case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
     53       DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
     54           *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
     55       return true;
     56 
     57     default:
     58       return false;
     59   }
     60 }
     61 
     62 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
     63     const {
     64   return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
     65 }
     66 
     67 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
     68   ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
     69 }
     70 
     71 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
     72   BufferHubChannel::BufferInfo info;
     73   if (auto producer = GetProducer()) {
     74     // If producer has not hung up, copy most buffer info from the producer.
     75     info = producer->GetBufferInfo();
     76   }
     77   info.id = buffer_id();
     78   info.capacity = capacity_;
     79   return info;
     80 }
     81 
     82 void ConsumerQueueChannel::RegisterNewBuffer(
     83     const std::shared_ptr<ProducerChannel>& producer_channel,
     84     size_t producer_slot) {
     85   ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d",
     86            __FUNCTION__, buffer_id(), producer_channel->buffer_id(),
     87            producer_slot, silent_);
     88   // Only register buffers if the queue is not silent.
     89   if (silent_) {
     90     return;
     91   }
     92 
     93   auto status = producer_channel->CreateConsumerStateMask();
     94   if (!status.ok()) {
     95     ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__,
     96           status.GetErrorMessage().c_str());
     97     return;
     98   }
     99   uint64_t consumer_state_mask = status.get();
    100 
    101   pending_buffer_slots_.emplace(producer_channel, producer_slot,
    102                                 consumer_state_mask);
    103   // Signal the client that there is new buffer available.
    104   SignalAvailable();
    105 }
    106 
    107 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
    108 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
    109   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
    110   ATRACE_NAME(__FUNCTION__);
    111   ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__,
    112            pending_buffer_slots_.size());
    113 
    114   // Indicate this is a silent queue that will not import buffers.
    115   if (silent_)
    116     return ErrorStatus(EBADR);
    117 
    118   while (!pending_buffer_slots_.empty()) {
    119     auto producer_channel =
    120         pending_buffer_slots_.front().producer_channel.lock();
    121     size_t producer_slot = pending_buffer_slots_.front().producer_slot;
    122     uint64_t consumer_state_mask =
    123         pending_buffer_slots_.front().consumer_state_mask;
    124     pending_buffer_slots_.pop();
    125 
    126     // It's possible that the producer channel has expired. When this occurs,
    127     // ignore the producer channel.
    128     if (producer_channel == nullptr) {
    129       ALOGW("%s: producer channel has already been expired.", __FUNCTION__);
    130       continue;
    131     }
    132 
    133     auto status =
    134         producer_channel->CreateConsumer(message, consumer_state_mask);
    135 
    136     // If no buffers are imported successfully, clear available and return an
    137     // error. Otherwise, return all consumer handles already imported
    138     // successfully, but keep available bits on, so that the client can retry
    139     // importing remaining consumer buffers.
    140     if (!status) {
    141       ALOGE("%s: Failed create consumer: %s", __FUNCTION__,
    142             status.GetErrorMessage().c_str());
    143       if (buffer_handles.empty()) {
    144         ClearAvailable();
    145         return status.error_status();
    146       } else {
    147         return {std::move(buffer_handles)};
    148       }
    149     }
    150 
    151     buffer_handles.emplace_back(status.take(), producer_slot);
    152   }
    153 
    154   ClearAvailable();
    155   return {std::move(buffer_handles)};
    156 }
    157 
    158 void ConsumerQueueChannel::OnProducerClosed() {
    159   ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
    160            buffer_id());
    161   producer_.reset();
    162   Hangup();
    163 }
    164 
    165 }  // namespace dvr
    166 }  // namespace android
    167