Home | History | Annotate | Download | only in bufferhubd
      1 #include <inttypes.h>
      2 
      3 #include <private/dvr/consumer_queue_channel.h>
      4 #include <private/dvr/producer_channel.h>
      5 #include <private/dvr/producer_queue_channel.h>
      6 
      7 using android::pdx::ErrorStatus;
      8 using android::pdx::Message;
      9 using android::pdx::RemoteChannelHandle;
     10 using android::pdx::Status;
     11 using android::pdx::rpc::DispatchRemoteMethod;
     12 
     13 namespace android {
     14 namespace dvr {
     15 
     16 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service,
     17                                            int channel_id,
     18                                            const ProducerQueueConfig& config,
     19                                            const UsagePolicy& usage_policy,
     20                                            int* error)
     21     : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType),
     22       config_(config),
     23       usage_policy_(usage_policy),
     24       capacity_(0) {
     25   *error = 0;
     26 }
     27 
     28 ProducerQueueChannel::~ProducerQueueChannel() {
     29   ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d",
     30            buffer_id());
     31   for (auto* consumer : consumer_channels_)
     32     consumer->OnProducerClosed();
     33 }
     34 
     35 /* static */
     36 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create(
     37     BufferHubService* service, int channel_id,
     38     const ProducerQueueConfig& config, const UsagePolicy& usage_policy) {
     39   // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask|
     40   // should be mutually exclusive.
     41   if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) {
     42     ALOGE(
     43         "BufferHubService::OnCreateProducerQueue: illegal usage mask "
     44         "configuration: usage_deny_set_mask=%" PRIx64
     45         " usage_deny_clear_mask=%" PRIx64,
     46         usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask);
     47     return ErrorStatus(EINVAL);
     48   }
     49 
     50   int error = 0;
     51   std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel(
     52       service, channel_id, config, usage_policy, &error));
     53   if (error < 0)
     54     return ErrorStatus(-error);
     55   else
     56     return {std::move(producer)};
     57 }
     58 
     59 bool ProducerQueueChannel::HandleMessage(Message& message) {
     60   ATRACE_NAME("ProducerQueueChannel::HandleMessage");
     61   switch (message.GetOp()) {
     62     case BufferHubRPC::CreateConsumerQueue::Opcode:
     63       DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>(
     64           *this, &ProducerQueueChannel::OnCreateConsumerQueue, message);
     65       return true;
     66 
     67     case BufferHubRPC::GetQueueInfo::Opcode:
     68       DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>(
     69           *this, &ProducerQueueChannel::OnGetQueueInfo, message);
     70       return true;
     71 
     72     case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode:
     73       DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>(
     74           *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers,
     75           message);
     76       return true;
     77 
     78     case BufferHubRPC::ProducerQueueInsertBuffer::Opcode:
     79       DispatchRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>(
     80           *this, &ProducerQueueChannel::OnProducerQueueInsertBuffer, message);
     81       return true;
     82 
     83     case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode:
     84       DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>(
     85           *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message);
     86       return true;
     87 
     88     default:
     89       return false;
     90   }
     91 }
     92 
     93 void ProducerQueueChannel::HandleImpulse(Message& /* message */) {
     94   ATRACE_NAME("ProducerQueueChannel::HandleImpulse");
     95 }
     96 
     97 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const {
     98   return BufferInfo(channel_id(), consumer_channels_.size(), capacity_,
     99                     usage_policy_);
    100 }
    101 
    102 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue(
    103     Message& message, bool silent) {
    104   ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue");
    105   ALOGD_IF(
    106       TRACE,
    107       "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d",
    108       channel_id(), silent);
    109 
    110   int channel_id;
    111   auto status = message.PushChannel(0, nullptr, &channel_id);
    112   if (!status) {
    113     ALOGE(
    114         "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer "
    115         "channel: %s",
    116         status.GetErrorMessage().c_str());
    117     return ErrorStatus(ENOMEM);
    118   }
    119 
    120   auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>(
    121       service(), buffer_id(), channel_id, shared_from_this(), silent);
    122 
    123   // Register the existing buffers with the new consumer queue.
    124   for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    125     if (auto buffer = buffers_[slot].lock())
    126       consumer_queue_channel->RegisterNewBuffer(buffer, slot);
    127   }
    128 
    129   const auto channel_status =
    130       service()->SetChannel(channel_id, consumer_queue_channel);
    131   if (!channel_status) {
    132     ALOGE(
    133         "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: "
    134         "%s",
    135         channel_status.GetErrorMessage().c_str());
    136     return ErrorStatus(ENOMEM);
    137   }
    138 
    139   return {status.take()};
    140 }
    141 
    142 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) {
    143   return {{config_, buffer_id()}};
    144 }
    145 
    146 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>>
    147 ProducerQueueChannel::OnProducerQueueAllocateBuffers(
    148     Message& message, uint32_t width, uint32_t height, uint32_t layer_count,
    149     uint32_t format, uint64_t usage, size_t buffer_count) {
    150   ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers");
    151   ALOGD_IF(TRACE,
    152            "ProducerQueueChannel::OnProducerQueueAllocateBuffers: "
    153            "producer_channel_id=%d",
    154            channel_id());
    155 
    156   std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles;
    157 
    158   // Deny buffer allocation violating preset rules.
    159   if (usage & usage_policy_.usage_deny_set_mask) {
    160     ALOGE(
    161         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
    162         " is not permitted. Violating usage_deny_set_mask, the following  bits "
    163         "shall not be set: %" PRIx64 ".",
    164         usage, usage_policy_.usage_deny_set_mask);
    165     return ErrorStatus(EINVAL);
    166   }
    167 
    168   if (~usage & usage_policy_.usage_deny_clear_mask) {
    169     ALOGE(
    170         "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64
    171         " is not permitted. Violating usage_deny_clear_mask, the following "
    172         "bits must be set: %" PRIx64 ".",
    173         usage, usage_policy_.usage_deny_clear_mask);
    174     return ErrorStatus(EINVAL);
    175   }
    176 
    177   // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_|
    178   // takes precedence and will overwrite |usage_policy_.usage_clear_mask|.
    179   uint64_t effective_usage =
    180       (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask;
    181 
    182   for (size_t i = 0; i < buffer_count; i++) {
    183     auto status = AllocateBuffer(message, width, height, layer_count, format,
    184                                  effective_usage);
    185     if (!status) {
    186       ALOGE(
    187           "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to "
    188           "allocate new buffer.");
    189       return ErrorStatus(status.error());
    190     }
    191     buffer_handles.push_back(status.take());
    192   }
    193 
    194   return {std::move(buffer_handles)};
    195 }
    196 
    197 Status<std::pair<RemoteChannelHandle, size_t>>
    198 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width,
    199                                      uint32_t height, uint32_t layer_count,
    200                                      uint32_t format, uint64_t usage) {
    201   ATRACE_NAME("ProducerQueueChannel::AllocateBuffer");
    202   ALOGD_IF(TRACE,
    203            "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d",
    204            channel_id());
    205 
    206   if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
    207     ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity.");
    208     return ErrorStatus(E2BIG);
    209   }
    210 
    211   // Here we are creating a new BufferHubBuffer, initialize the producer
    212   // channel, and returning its file handle back to the client.
    213   // buffer_id is the id of the producer channel of BufferHubBuffer.
    214   int buffer_id;
    215   auto status = message.PushChannel(0, nullptr, &buffer_id);
    216 
    217   if (!status) {
    218     ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s",
    219           status.GetErrorMessage().c_str());
    220     return ErrorStatus(status.error());
    221   }
    222 
    223   ALOGD_IF(TRACE,
    224            "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u "
    225            "height=%u layer_count=%u format=%u usage=%" PRIx64,
    226            buffer_id, width, height, layer_count, format, usage);
    227   auto buffer_handle = status.take();
    228 
    229   auto producer_channel_status =
    230       ProducerChannel::Create(service(), buffer_id, width, height, layer_count,
    231                               format, usage, config_.user_metadata_size);
    232   if (!producer_channel_status) {
    233     ALOGE(
    234         "ProducerQueueChannel::AllocateBuffer: Failed to create producer "
    235         "buffer: %s",
    236         producer_channel_status.GetErrorMessage().c_str());
    237     return ErrorStatus(ENOMEM);
    238   }
    239   auto producer_channel = producer_channel_status.take();
    240 
    241   ALOGD_IF(
    242       TRACE,
    243       "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d",
    244       buffer_id, buffer_handle.value());
    245 
    246   const auto channel_status =
    247       service()->SetChannel(buffer_id, producer_channel);
    248   if (!channel_status) {
    249     ALOGE(
    250         "ProducerQueueChannel::AllocateBuffer: failed to set producer channel "
    251         "for new BufferHubBuffer: %s",
    252         channel_status.GetErrorMessage().c_str());
    253     return ErrorStatus(ENOMEM);
    254   }
    255 
    256   // Register the newly allocated buffer's channel_id into the first empty
    257   // buffer slot.
    258   size_t slot = 0;
    259   for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    260     if (buffers_[slot].expired())
    261       break;
    262   }
    263   if (slot == BufferHubRPC::kMaxQueueCapacity) {
    264     ALOGE(
    265         "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
    266         "buffer allocation.");
    267     return ErrorStatus(E2BIG);
    268   }
    269 
    270   buffers_[slot] = producer_channel;
    271   capacity_++;
    272 
    273   // Notify each consumer channel about the new buffer.
    274   for (auto* consumer_channel : consumer_channels_) {
    275     ALOGD(
    276         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
    277         "buffer, buffer_id=%d",
    278         buffer_id);
    279     consumer_channel->RegisterNewBuffer(producer_channel, slot);
    280   }
    281 
    282   return {{std::move(buffer_handle), slot}};
    283 }
    284 
    285 Status<size_t> ProducerQueueChannel::OnProducerQueueInsertBuffer(
    286     pdx::Message& message, int buffer_cid) {
    287   ATRACE_NAME("ProducerQueueChannel::InsertBuffer");
    288   ALOGD_IF(TRACE,
    289            "ProducerQueueChannel::InsertBuffer: channel_id=%d, buffer_cid=%d",
    290            channel_id(), buffer_cid);
    291 
    292   if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) {
    293     ALOGE("ProducerQueueChannel::InsertBuffer: reaches kMaxQueueCapacity.");
    294     return ErrorStatus(E2BIG);
    295   }
    296   auto producer_channel = std::static_pointer_cast<ProducerChannel>(
    297       service()->GetChannel(buffer_cid));
    298   if (producer_channel == nullptr ||
    299       producer_channel->channel_type() != BufferHubChannel::kProducerType) {
    300     // Rejects the request if the requested buffer channel is invalid and/or
    301     // it's not a ProducerChannel.
    302     ALOGE(
    303         "ProducerQueueChannel::InsertBuffer: Invalid buffer_cid=%d, "
    304         "producer_buffer=0x%p, channel_type=%d.",
    305         buffer_cid, producer_channel.get(),
    306         producer_channel == nullptr ? -1 : producer_channel->channel_type());
    307     return ErrorStatus(EINVAL);
    308   }
    309   if (producer_channel->GetActiveProcessId() != message.GetProcessId()) {
    310     // Rejects the request if the requested buffer channel is not currently
    311     // connected to the caller this is IPC request. This effectively prevents
    312     // fake buffer_cid from being injected.
    313     ALOGE(
    314         "ProducerQueueChannel::InsertBuffer: Requested buffer channel "
    315         "(buffer_cid=%d) is not connected to the calling process (pid=%d). "
    316         "It's connected to a different process (pid=%d).",
    317         buffer_cid, message.GetProcessId(),
    318         producer_channel->GetActiveProcessId());
    319     return ErrorStatus(EINVAL);
    320   }
    321   uint64_t buffer_state = producer_channel->buffer_state();
    322   // TODO(b/112007999) add an atomic variable in metadata header in shared
    323   // memory to indicate which client is the last producer of the buffer.
    324   // Currently, the first client is the only producer to the buffer.
    325   // Thus, it checks whether the first client gains the buffer below.
    326   if (!BufferHubDefs::isClientGained(buffer_state,
    327                                      BufferHubDefs::kFirstClientBitMask)) {
    328     // Rejects the request if the requested buffer is not in Gained state.
    329     ALOGE(
    330         "ProducerQueueChannel::InsertBuffer: The buffer (cid=%d, "
    331         "state=0x%" PRIx64 ") is not in gained state.",
    332         buffer_cid, buffer_state);
    333     return ErrorStatus(EINVAL);
    334   }
    335 
    336   // Register the to-be-inserted buffer's channel_id into the first empty
    337   // buffer slot.
    338   size_t slot = 0;
    339   for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) {
    340     if (buffers_[slot].expired())
    341       break;
    342   }
    343   if (slot == BufferHubRPC::kMaxQueueCapacity) {
    344     ALOGE(
    345         "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new "
    346         "buffer allocation.");
    347     return ErrorStatus(E2BIG);
    348   }
    349 
    350   buffers_[slot] = producer_channel;
    351   capacity_++;
    352 
    353   // Notify each consumer channel about the new buffer.
    354   for (auto* consumer_channel : consumer_channels_) {
    355     ALOGD(
    356         "ProducerQueueChannel::AllocateBuffer: Notified consumer with new "
    357         "buffer, buffer_cid=%d",
    358         buffer_cid);
    359     consumer_channel->RegisterNewBuffer(producer_channel, slot);
    360   }
    361 
    362   return {slot};
    363 }
    364 
    365 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer(
    366     Message& /*message*/, size_t slot) {
    367   if (buffers_[slot].expired()) {
    368     ALOGE(
    369         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove "
    370         "an invalid buffer producer at slot %zu",
    371         slot);
    372     return ErrorStatus(EINVAL);
    373   }
    374 
    375   if (capacity_ == 0) {
    376     ALOGE(
    377         "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a "
    378         "buffer producer while the queue's capacity is already zero.");
    379     return ErrorStatus(EINVAL);
    380   }
    381 
    382   buffers_[slot].reset();
    383   capacity_--;
    384   return {};
    385 }
    386 
    387 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) {
    388   consumer_channels_.push_back(channel);
    389 }
    390 
    391 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) {
    392   consumer_channels_.erase(
    393       std::find(consumer_channels_.begin(), consumer_channels_.end(), channel));
    394 }
    395 
    396 }  // namespace dvr
    397 }  // namespace android
    398