Home | History | Annotate | Download | only in bufferhubd
      1 #include "consumer_queue_channel.h"
      2 
      3 #include <pdx/channel_handle.h>
      4 
      5 #include "producer_channel.h"
      6 
      7 using android::pdx::ErrorStatus;
      8 using android::pdx::RemoteChannelHandle;
      9 using android::pdx::Status;
     10 using android::pdx::rpc::DispatchRemoteMethod;
     11 using android::pdx::rpc::RemoteMethodError;
     12 
     13 namespace android {
     14 namespace dvr {
     15 
     16 ConsumerQueueChannel::ConsumerQueueChannel(
     17     BufferHubService* service, int buffer_id, int channel_id,
     18     const std::shared_ptr<Channel>& producer, bool silent)
     19     : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType),
     20       producer_(producer),
     21       capacity_(0),
     22       silent_(silent) {
     23   GetProducer()->AddConsumer(this);
     24 }
     25 
     26 ConsumerQueueChannel::~ConsumerQueueChannel() {
     27   ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d",
     28            channel_id());
     29 
     30   if (auto producer = GetProducer()) {
     31     producer->RemoveConsumer(this);
     32   }
     33 }
     34 
     35 bool ConsumerQueueChannel::HandleMessage(Message& message) {
     36   ATRACE_NAME("ConsumerQueueChannel::HandleMessage");
     37   auto producer = GetProducer();
     38   if (!producer) {
     39     RemoteMethodError(message, EPIPE);
     40     return true;
     41   }
     42 
     43   switch (message.GetOp()) {
     44     case BufferHubRPC::CreateConsumerQueue::Opcode:
     45       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
     46           *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message);
     47       return true;
     48 
     49     case BufferHubRPC::GetQueueInfo::Opcode:
     50       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
     51           *producer, &ProducerQueueChannel::OnGetQueueInfo, message);
     52       return true;
     53 
     54     case BufferHubRPC::ConsumerQueueImportBuffers::Opcode:
     55       DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>(
     56           *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message);
     57       return true;
     58 
     59     default:
     60       return false;
     61   }
     62 }
     63 
     64 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer()
     65     const {
     66   return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock());
     67 }
     68 
     69 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) {
     70   ATRACE_NAME("ConsumerQueueChannel::HandleImpulse");
     71 }
     72 
     73 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const {
     74   BufferHubChannel::BufferInfo info;
     75   if (auto producer = GetProducer()) {
     76     // If producer has not hung up, copy most buffer info from the producer.
     77     info = producer->GetBufferInfo();
     78   }
     79   info.id = buffer_id();
     80   info.capacity = capacity_;
     81   return info;
     82 }
     83 
     84 void ConsumerQueueChannel::RegisterNewBuffer(
     85     const std::shared_ptr<ProducerChannel>& producer_channel, size_t slot) {
     86   ALOGD_IF(TRACE,
     87            "ConsumerQueueChannel::RegisterNewBuffer: queue_id=%d buffer_id=%d "
     88            "slot=%zu silent=%d",
     89            buffer_id(), producer_channel->buffer_id(), slot, silent_);
     90   // Only register buffers if the queue is not silent.
     91   if (!silent_) {
     92     pending_buffer_slots_.emplace(producer_channel, slot);
     93 
     94     // Signal the client that there is new buffer available.
     95     SignalAvailable();
     96   }
     97 }
     98 
     99 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
    100 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) {
    101   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
    102   ATRACE_NAME("ConsumerQueueChannel::OnConsumerQueueImportBuffers");
    103   ALOGD_IF(TRACE,
    104            "ConsumerQueueChannel::OnConsumerQueueImportBuffers: "
    105            "pending_buffer_slots=%zu",
    106            pending_buffer_slots_.size());
    107 
    108   // Indicate this is a silent queue that will not import buffers.
    109   if (silent_)
    110     return ErrorStatus(EBADR);
    111 
    112   while (!pending_buffer_slots_.empty()) {
    113     auto producer_channel = pending_buffer_slots_.front().first.lock();
    114     size_t producer_slot = pending_buffer_slots_.front().second;
    115     pending_buffer_slots_.pop();
    116 
    117     // It's possible that the producer channel has expired. When this occurs,
    118     // ignore the producer channel.
    119     if (producer_channel == nullptr) {
    120       ALOGW(
    121           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: producer "
    122           "channel has already been expired.");
    123       continue;
    124     }
    125 
    126     auto status = producer_channel->CreateConsumer(message);
    127 
    128     // If no buffers are imported successfully, clear available and return an
    129     // error. Otherwise, return all consumer handles already imported
    130     // successfully, but keep available bits on, so that the client can retry
    131     // importing remaining consumer buffers.
    132     if (!status) {
    133       ALOGE(
    134           "ConsumerQueueChannel::OnConsumerQueueImportBuffers: Failed create "
    135           "consumer: %s",
    136           status.GetErrorMessage().c_str());
    137       if (buffer_handles.empty()) {
    138         ClearAvailable();
    139         return status.error_status();
    140       } else {
    141         return {std::move(buffer_handles)};
    142       }
    143     }
    144 
    145     buffer_handles.emplace_back(status.take(), producer_slot);
    146   }
    147 
    148   ClearAvailable();
    149   return {std::move(buffer_handles)};
    150 }
    151 
    152 void ConsumerQueueChannel::OnProducerClosed() {
    153   ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d",
    154            buffer_id());
    155   producer_.reset();
    156   Hangup();
    157 }
    158 
    159 }  // namespace dvr
    160 }  // namespace android
    161