1 #include <inttypes.h> 2 3 #include <private/dvr/consumer_queue_channel.h> 4 #include <private/dvr/producer_channel.h> 5 #include <private/dvr/producer_queue_channel.h> 6 7 using android::pdx::ErrorStatus; 8 using android::pdx::Message; 9 using android::pdx::RemoteChannelHandle; 10 using android::pdx::Status; 11 using android::pdx::rpc::DispatchRemoteMethod; 12 13 namespace android { 14 namespace dvr { 15 16 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service, 17 int channel_id, 18 const ProducerQueueConfig& config, 19 const UsagePolicy& usage_policy, 20 int* error) 21 : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType), 22 config_(config), 23 usage_policy_(usage_policy), 24 capacity_(0) { 25 *error = 0; 26 } 27 28 ProducerQueueChannel::~ProducerQueueChannel() { 29 ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d", 30 buffer_id()); 31 for (auto* consumer : consumer_channels_) 32 consumer->OnProducerClosed(); 33 } 34 35 /* static */ 36 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create( 37 BufferHubService* service, int channel_id, 38 const ProducerQueueConfig& config, const UsagePolicy& usage_policy) { 39 // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask| 40 // should be mutually exclusive. 41 if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) { 42 ALOGE( 43 "BufferHubService::OnCreateProducerQueue: illegal usage mask " 44 "configuration: usage_deny_set_mask=%" PRIx64 45 " usage_deny_clear_mask=%" PRIx64, 46 usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask); 47 return ErrorStatus(EINVAL); 48 } 49 50 int error = 0; 51 std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel( 52 service, channel_id, config, usage_policy, &error)); 53 if (error < 0) 54 return ErrorStatus(-error); 55 else 56 return {std::move(producer)}; 57 } 58 59 bool ProducerQueueChannel::HandleMessage(Message& message) { 60 ATRACE_NAME("ProducerQueueChannel::HandleMessage"); 61 switch (message.GetOp()) { 62 case BufferHubRPC::CreateConsumerQueue::Opcode: 63 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>( 64 *this, &ProducerQueueChannel::OnCreateConsumerQueue, message); 65 return true; 66 67 case BufferHubRPC::GetQueueInfo::Opcode: 68 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>( 69 *this, &ProducerQueueChannel::OnGetQueueInfo, message); 70 return true; 71 72 case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode: 73 DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>( 74 *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers, 75 message); 76 return true; 77 78 case BufferHubRPC::ProducerQueueInsertBuffer::Opcode: 79 DispatchRemoteMethod<BufferHubRPC::ProducerQueueInsertBuffer>( 80 *this, &ProducerQueueChannel::OnProducerQueueInsertBuffer, message); 81 return true; 82 83 case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode: 84 DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>( 85 *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message); 86 return true; 87 88 default: 89 return false; 90 } 91 } 92 93 void ProducerQueueChannel::HandleImpulse(Message& /* message */) { 94 ATRACE_NAME("ProducerQueueChannel::HandleImpulse"); 95 } 96 97 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const { 98 return BufferInfo(channel_id(), consumer_channels_.size(), capacity_, 99 usage_policy_); 100 } 101 102 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue( 103 Message& message, bool silent) { 104 ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue"); 105 ALOGD_IF( 106 TRACE, 107 "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d", 108 channel_id(), silent); 109 110 int channel_id; 111 auto status = message.PushChannel(0, nullptr, &channel_id); 112 if (!status) { 113 ALOGE( 114 "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer " 115 "channel: %s", 116 status.GetErrorMessage().c_str()); 117 return ErrorStatus(ENOMEM); 118 } 119 120 auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>( 121 service(), buffer_id(), channel_id, shared_from_this(), silent); 122 123 // Register the existing buffers with the new consumer queue. 124 for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) { 125 if (auto buffer = buffers_[slot].lock()) 126 consumer_queue_channel->RegisterNewBuffer(buffer, slot); 127 } 128 129 const auto channel_status = 130 service()->SetChannel(channel_id, consumer_queue_channel); 131 if (!channel_status) { 132 ALOGE( 133 "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: " 134 "%s", 135 channel_status.GetErrorMessage().c_str()); 136 return ErrorStatus(ENOMEM); 137 } 138 139 return {status.take()}; 140 } 141 142 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) { 143 return {{config_, buffer_id()}}; 144 } 145 146 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> 147 ProducerQueueChannel::OnProducerQueueAllocateBuffers( 148 Message& message, uint32_t width, uint32_t height, uint32_t layer_count, 149 uint32_t format, uint64_t usage, size_t buffer_count) { 150 ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers"); 151 ALOGD_IF(TRACE, 152 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: " 153 "producer_channel_id=%d", 154 channel_id()); 155 156 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; 157 158 // Deny buffer allocation violating preset rules. 159 if (usage & usage_policy_.usage_deny_set_mask) { 160 ALOGE( 161 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64 162 " is not permitted. Violating usage_deny_set_mask, the following bits " 163 "shall not be set: %" PRIx64 ".", 164 usage, usage_policy_.usage_deny_set_mask); 165 return ErrorStatus(EINVAL); 166 } 167 168 if (~usage & usage_policy_.usage_deny_clear_mask) { 169 ALOGE( 170 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64 171 " is not permitted. Violating usage_deny_clear_mask, the following " 172 "bits must be set: %" PRIx64 ".", 173 usage, usage_policy_.usage_deny_clear_mask); 174 return ErrorStatus(EINVAL); 175 } 176 177 // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_| 178 // takes precedence and will overwrite |usage_policy_.usage_clear_mask|. 179 uint64_t effective_usage = 180 (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask; 181 182 for (size_t i = 0; i < buffer_count; i++) { 183 auto status = AllocateBuffer(message, width, height, layer_count, format, 184 effective_usage); 185 if (!status) { 186 ALOGE( 187 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to " 188 "allocate new buffer."); 189 return ErrorStatus(status.error()); 190 } 191 buffer_handles.push_back(status.take()); 192 } 193 194 return {std::move(buffer_handles)}; 195 } 196 197 Status<std::pair<RemoteChannelHandle, size_t>> 198 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width, 199 uint32_t height, uint32_t layer_count, 200 uint32_t format, uint64_t usage) { 201 ATRACE_NAME("ProducerQueueChannel::AllocateBuffer"); 202 ALOGD_IF(TRACE, 203 "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d", 204 channel_id()); 205 206 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) { 207 ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity."); 208 return ErrorStatus(E2BIG); 209 } 210 211 // Here we are creating a new BufferHubBuffer, initialize the producer 212 // channel, and returning its file handle back to the client. 213 // buffer_id is the id of the producer channel of BufferHubBuffer. 214 int buffer_id; 215 auto status = message.PushChannel(0, nullptr, &buffer_id); 216 217 if (!status) { 218 ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s", 219 status.GetErrorMessage().c_str()); 220 return ErrorStatus(status.error()); 221 } 222 223 ALOGD_IF(TRACE, 224 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u " 225 "height=%u layer_count=%u format=%u usage=%" PRIx64, 226 buffer_id, width, height, layer_count, format, usage); 227 auto buffer_handle = status.take(); 228 229 auto producer_channel_status = 230 ProducerChannel::Create(service(), buffer_id, width, height, layer_count, 231 format, usage, config_.user_metadata_size); 232 if (!producer_channel_status) { 233 ALOGE( 234 "ProducerQueueChannel::AllocateBuffer: Failed to create producer " 235 "buffer: %s", 236 producer_channel_status.GetErrorMessage().c_str()); 237 return ErrorStatus(ENOMEM); 238 } 239 auto producer_channel = producer_channel_status.take(); 240 241 ALOGD_IF( 242 TRACE, 243 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d", 244 buffer_id, buffer_handle.value()); 245 246 const auto channel_status = 247 service()->SetChannel(buffer_id, producer_channel); 248 if (!channel_status) { 249 ALOGE( 250 "ProducerQueueChannel::AllocateBuffer: failed to set producer channel " 251 "for new BufferHubBuffer: %s", 252 channel_status.GetErrorMessage().c_str()); 253 return ErrorStatus(ENOMEM); 254 } 255 256 // Register the newly allocated buffer's channel_id into the first empty 257 // buffer slot. 258 size_t slot = 0; 259 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) { 260 if (buffers_[slot].expired()) 261 break; 262 } 263 if (slot == BufferHubRPC::kMaxQueueCapacity) { 264 ALOGE( 265 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new " 266 "buffer allocation."); 267 return ErrorStatus(E2BIG); 268 } 269 270 buffers_[slot] = producer_channel; 271 capacity_++; 272 273 // Notify each consumer channel about the new buffer. 274 for (auto* consumer_channel : consumer_channels_) { 275 ALOGD( 276 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new " 277 "buffer, buffer_id=%d", 278 buffer_id); 279 consumer_channel->RegisterNewBuffer(producer_channel, slot); 280 } 281 282 return {{std::move(buffer_handle), slot}}; 283 } 284 285 Status<size_t> ProducerQueueChannel::OnProducerQueueInsertBuffer( 286 pdx::Message& message, int buffer_cid) { 287 ATRACE_NAME("ProducerQueueChannel::InsertBuffer"); 288 ALOGD_IF(TRACE, 289 "ProducerQueueChannel::InsertBuffer: channel_id=%d, buffer_cid=%d", 290 channel_id(), buffer_cid); 291 292 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) { 293 ALOGE("ProducerQueueChannel::InsertBuffer: reaches kMaxQueueCapacity."); 294 return ErrorStatus(E2BIG); 295 } 296 auto producer_channel = std::static_pointer_cast<ProducerChannel>( 297 service()->GetChannel(buffer_cid)); 298 if (producer_channel == nullptr || 299 producer_channel->channel_type() != BufferHubChannel::kProducerType) { 300 // Rejects the request if the requested buffer channel is invalid and/or 301 // it's not a ProducerChannel. 302 ALOGE( 303 "ProducerQueueChannel::InsertBuffer: Invalid buffer_cid=%d, " 304 "producer_buffer=0x%p, channel_type=%d.", 305 buffer_cid, producer_channel.get(), 306 producer_channel == nullptr ? -1 : producer_channel->channel_type()); 307 return ErrorStatus(EINVAL); 308 } 309 if (producer_channel->GetActiveProcessId() != message.GetProcessId()) { 310 // Rejects the request if the requested buffer channel is not currently 311 // connected to the caller this is IPC request. This effectively prevents 312 // fake buffer_cid from being injected. 313 ALOGE( 314 "ProducerQueueChannel::InsertBuffer: Requested buffer channel " 315 "(buffer_cid=%d) is not connected to the calling process (pid=%d). " 316 "It's connected to a different process (pid=%d).", 317 buffer_cid, message.GetProcessId(), 318 producer_channel->GetActiveProcessId()); 319 return ErrorStatus(EINVAL); 320 } 321 uint64_t buffer_state = producer_channel->buffer_state(); 322 // TODO(b/112007999) add an atomic variable in metadata header in shared 323 // memory to indicate which client is the last producer of the buffer. 324 // Currently, the first client is the only producer to the buffer. 325 // Thus, it checks whether the first client gains the buffer below. 326 if (!BufferHubDefs::isClientGained(buffer_state, 327 BufferHubDefs::kFirstClientBitMask)) { 328 // Rejects the request if the requested buffer is not in Gained state. 329 ALOGE( 330 "ProducerQueueChannel::InsertBuffer: The buffer (cid=%d, " 331 "state=0x%" PRIx64 ") is not in gained state.", 332 buffer_cid, buffer_state); 333 return ErrorStatus(EINVAL); 334 } 335 336 // Register the to-be-inserted buffer's channel_id into the first empty 337 // buffer slot. 338 size_t slot = 0; 339 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) { 340 if (buffers_[slot].expired()) 341 break; 342 } 343 if (slot == BufferHubRPC::kMaxQueueCapacity) { 344 ALOGE( 345 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new " 346 "buffer allocation."); 347 return ErrorStatus(E2BIG); 348 } 349 350 buffers_[slot] = producer_channel; 351 capacity_++; 352 353 // Notify each consumer channel about the new buffer. 354 for (auto* consumer_channel : consumer_channels_) { 355 ALOGD( 356 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new " 357 "buffer, buffer_cid=%d", 358 buffer_cid); 359 consumer_channel->RegisterNewBuffer(producer_channel, slot); 360 } 361 362 return {slot}; 363 } 364 365 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer( 366 Message& /*message*/, size_t slot) { 367 if (buffers_[slot].expired()) { 368 ALOGE( 369 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove " 370 "an invalid buffer producer at slot %zu", 371 slot); 372 return ErrorStatus(EINVAL); 373 } 374 375 if (capacity_ == 0) { 376 ALOGE( 377 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a " 378 "buffer producer while the queue's capacity is already zero."); 379 return ErrorStatus(EINVAL); 380 } 381 382 buffers_[slot].reset(); 383 capacity_--; 384 return {}; 385 } 386 387 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) { 388 consumer_channels_.push_back(channel); 389 } 390 391 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) { 392 consumer_channels_.erase( 393 std::find(consumer_channels_.begin(), consumer_channels_.end(), channel)); 394 } 395 396 } // namespace dvr 397 } // namespace android 398