Home | History | Annotate | Download | only in bufferhubd
      1 #include "producer_queue_channel.h"
      2 
      3 #include <inttypes.h>
      4 
      5 #include "consumer_queue_channel.h"
      6 #include "producer_channel.h"
      7 
      8 using android::pdx::ErrorStatus;
      9 using android::pdx::Message;
     10 using android::pdx::RemoteChannelHandle;
     11 using android::pdx::Status;
     12 using android::pdx::rpc::DispatchRemoteMethod;
     13 
     14 namespace android {
     15 namespace dvr {
     16 
     17 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
     18                                            int channel_id,
     19                                            const ProducerQueueConfig& config,
     20                                            const UsagePolicy& usage_policy,
     21                                            int* error)
     22     : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
     23       config_(config),
     24       usage_policy_(usage_policy),
     25       capacity_(0) {
     26   *error = 0;
     27 }
     28 
     29 ProducerQueueChannel::~ProducerQueueChannel() {
     30   ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
     31            buffer_id());
     32   for (auto* consumer : consumer_channels_)
     33     consumer->OnProducerClosed();
     34 }
     35 
     36 /* static */
     37 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
     38     BufferHubService* service, int channel_id,
     39     const ProducerQueueConfig& config, const UsagePolicy& usage_policy) {
     40   // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
     41   // should be mutually exclusive.
     42   if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
     43     ALOGE(
     44         "BufferHubService::OnCreateProducerQueue: illegal usage mask "
     45         "configuration: usage_deny_set_mask=%" PRIx64
     46         " usage_deny_clear_mask=%" PRIx64,
     47         usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
     48     return ErrorStatus(EINVAL);
     49   }
     50 
     51   int error = 0;
     52   std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
     53       service, channel_id, config, usage_policy, &error));
     54   if (error < 0)
     55     return ErrorStatus(-error);
     56   else
     57     return {std::move(producer)};
     58 }
     59 
     60 bool ProducerQueueChannel::HandleMessage(Message& message) {
     61   ATRACE_NAME("ProducerQueueChannel::HandleMessage");
     62   switch (message.GetOp()) {
     63     case BufferHubRPC::CreateConsumerQueue::Opcode:
     64       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
     65           *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
     66       return true;
     67 
     68     case BufferHubRPC::GetQueueInfo::Opcode:
     69       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
     70           *this, &ProducerQueueChannel::OnGetQueueInfo, message);
     71       return true;
     72 
     73     case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
     74       DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
     75           *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
     76           message);
     77       return true;
     78 
     79     case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode:
     80       DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(
     81           *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message);
     82       return true;
     83 
     84     default:
     85       return false;
     86   }
     87 }
     88 
     89 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
     90   ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
     91 }
     92 
     93 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
     94   return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
     95                     usage_policy_);
     96 }
     97 
     98 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
     99     Message& message, bool silent) {
    100   ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
    101   ALOGD_IF(
    102       TRACE,
    103       "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
    104       channel_id(), silent);
    105 
    106   int channel_id;
    107   auto status = message.PushChannel(0, nullptr, &channel_id);
    108   if (!status) {
    109     ALOGE(
    110         "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
    111         "channel: %s",
    112         status.GetErrorMessage().c_str());
    113     return ErrorStatus(ENOMEM);
    114   }
    115 
    116   auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
    117       service(), buffer_id(), channel_id, shared_from_this(), silent);
    118 
    119   // Register the existing buffers with the new consumer queue.
    120   for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    121     if (auto buffer = buffers_[slot].lock())
    122       consumer_queue_channel->RegisterNewBuffer(buffer, slot);
    123   }
    124 
    125   const auto channel_status =
    126       service()->SetChannel(channel_id, consumer_queue_channel);
    127   if (!channel_status) {
    128     ALOGE(
    129         "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
    130         "%s",
    131         channel_status.GetErrorMessage().c_str());
    132     return ErrorStatus(ENOMEM);
    133   }
    134 
    135   return {status.take()};
    136 }
    137 
    138 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
    139   return {{config_, buffer_id()}};
    140 }
    141 
    142 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
    143 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
    144     Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
    145     uint32_t format, uint64_t usage, size_t buffer_count) {
    146   ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
    147   ALOGD_IF(TRACE,
    148            "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
    149            "producer_channel_id=%d",
    150            channel_id());
    151 
    152   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
    153 
    154   // Deny buffer allocation violating preset rules.
    155   if (usage & usage_policy_.usage_deny_set_mask) {
    156     ALOGE(
    157         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
    158         " is not permitted. Violating usage_deny_set_mask, the following  bits "
    159         "shall not be set: %" PRIx64 ".",
    160         usage, usage_policy_.usage_deny_set_mask);
    161     return ErrorStatus(EINVAL);
    162   }
    163 
    164   if (~usage & usage_policy_.usage_deny_clear_mask) {
    165     ALOGE(
    166         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
    167         " is not permitted. Violating usage_deny_clear_mask, the following "
    168         "bits must be set: %" PRIx64 ".",
    169         usage, usage_policy_.usage_deny_clear_mask);
    170     return ErrorStatus(EINVAL);
    171   }
    172 
    173   // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
    174   // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
    175   uint64_t effective_usage =
    176       (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
    177 
    178   for (size_t i = 0; i < buffer_count; i++) {
    179     auto status = AllocateBuffer(message, width, height, layer_count, format,
    180                                  effective_usage);
    181     if (!status) {
    182       ALOGE(
    183           "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
    184           "allocate new buffer.");
    185       return ErrorStatus(status.error());
    186     }
    187     buffer_handles.push_back(status.take());
    188   }
    189 
    190   return {std::move(buffer_handles)};
    191 }
    192 
    193 Status<std::pair<RemoteChannelHandle, size_t>>
    194 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
    195                                      uint32_t height, uint32_t layer_count,
    196                                      uint32_t format, uint64_t usage) {
    197   ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
    198   ALOGD_IF(TRACE,
    199            "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
    200            channel_id());
    201 
    202   if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
    203     ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
    204     return ErrorStatus(E2BIG);
    205   }
    206 
    207   // Here we are creating a new BufferHubBuffer, initialize the producer
    208   // channel, and returning its file handle back to the client.
    209   // buffer_id is the id of the producer channel of BufferHubBuffer.
    210   int buffer_id;
    211   auto status = message.PushChannel(0, nullptr, &buffer_id);
    212 
    213   if (!status) {
    214     ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
    215           status.GetErrorMessage().c_str());
    216     return ErrorStatus(status.error());
    217   }
    218 
    219   ALOGD_IF(TRACE,
    220            "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
    221            "height=%u layer_count=%u format=%u usage=%" PRIx64,
    222            buffer_id, width, height, layer_count, format, usage);
    223   auto buffer_handle = status.take();
    224 
    225   auto producer_channel_status =
    226       ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
    227                               format, usage, config_.user_metadata_size);
    228   if (!producer_channel_status) {
    229     ALOGE(
    230         "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
    231         "buffer: %s",
    232         producer_channel_status.GetErrorMessage().c_str());
    233     return ErrorStatus(ENOMEM);
    234   }
    235   auto producer_channel = producer_channel_status.take();
    236 
    237   ALOGD_IF(
    238       TRACE,
    239       "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
    240       buffer_id, buffer_handle.value());
    241 
    242   const auto channel_status =
    243       service()->SetChannel(buffer_id, producer_channel);
    244   if (!channel_status) {
    245     ALOGE(
    246         "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
    247         "for new BufferHubBuffer: %s",
    248         channel_status.GetErrorMessage().c_str());
    249     return ErrorStatus(ENOMEM);
    250   }
    251 
    252   // Register the newly allocated buffer's channel_id into the first empty
    253   // buffer slot.
    254   size_t slot = 0;
    255   for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    256     if (buffers_[slot].expired())
    257       break;
    258   }
    259   if (slot == BufferHubRPC::kMaxQueueCapacity) {
    260     ALOGE(
    261         "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
    262         "buffer allocation.");
    263     return ErrorStatus(E2BIG);
    264   }
    265 
    266   buffers_[slot] = producer_channel;
    267   capacity_++;
    268 
    269   // Notify each consumer channel about the new buffer.
    270   for (auto* consumer_channel : consumer_channels_) {
    271     ALOGD(
    272         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
    273         "buffer, buffer_id=%d",
    274         buffer_id);
    275     consumer_channel->RegisterNewBuffer(producer_channel, slot);
    276   }
    277 
    278   return {{std::move(buffer_handle), slot}};
    279 }
    280 
    281 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer(
    282     Message& /*message*/, size_t slot) {
    283   if (buffers_[slot].expired()) {
    284     ALOGE(
    285         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove "
    286         "an invalid buffer producer at slot %zu",
    287         slot);
    288     return ErrorStatus(EINVAL);
    289   }
    290 
    291   if (capacity_ == 0) {
    292     ALOGE(
    293         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a "
    294         "buffer producer while the queue's capacity is already zero.");
    295     return ErrorStatus(EINVAL);
    296   }
    297 
    298   buffers_[slot].reset();
    299   capacity_--;
    300   return {};
    301 }
    302 
    303 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
    304   consumer_channels_.push_back(channel);
    305 }
    306 
    307 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
    308   consumer_channels_.erase(
    309       std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
    310 }
    311 
    312 }  // namespace dvr
    313 }  // namespace android
    314