1 #include "producer_queue_channel.h" 2 3 #include <inttypes.h> 4 5 #include "consumer_queue_channel.h" 6 #include "producer_channel.h" 7 8 using android::pdx::ErrorStatus; 9 using android::pdx::Message; 10 using android::pdx::RemoteChannelHandle; 11 using android::pdx::Status; 12 using android::pdx::rpc::DispatchRemoteMethod; 13 14 namespace android { 15 namespace dvr { 16 17 ProducerQueueChannel::ProducerQueueChannel(BufferHubService* service, 18 int channel_id, 19 const ProducerQueueConfig& config, 20 const UsagePolicy& usage_policy, 21 int* error) 22 : BufferHubChannel(service, channel_id, channel_id, kProducerQueueType), 23 config_(config), 24 usage_policy_(usage_policy), 25 capacity_(0) { 26 *error = 0; 27 } 28 29 ProducerQueueChannel::~ProducerQueueChannel() { 30 ALOGD_IF(TRACE, "ProducerQueueChannel::~ProducerQueueChannel: queue_id=%d", 31 buffer_id()); 32 for (auto* consumer : consumer_channels_) 33 consumer->OnProducerClosed(); 34 } 35 36 /* static */ 37 Status<std::shared_ptr<ProducerQueueChannel>> ProducerQueueChannel::Create( 38 BufferHubService* service, int channel_id, 39 const ProducerQueueConfig& config, const UsagePolicy& usage_policy) { 40 // Configuration between |usage_deny_set_mask| and |usage_deny_clear_mask| 41 // should be mutually exclusive. 42 if ((usage_policy.usage_deny_set_mask & usage_policy.usage_deny_clear_mask)) { 43 ALOGE( 44 "BufferHubService::OnCreateProducerQueue: illegal usage mask " 45 "configuration: usage_deny_set_mask=%" PRIx64 46 " usage_deny_clear_mask=%" PRIx64, 47 usage_policy.usage_deny_set_mask, usage_policy.usage_deny_clear_mask); 48 return ErrorStatus(EINVAL); 49 } 50 51 int error = 0; 52 std::shared_ptr<ProducerQueueChannel> producer(new ProducerQueueChannel( 53 service, channel_id, config, usage_policy, &error)); 54 if (error < 0) 55 return ErrorStatus(-error); 56 else 57 return {std::move(producer)}; 58 } 59 60 bool ProducerQueueChannel::HandleMessage(Message& message) { 61 ATRACE_NAME("ProducerQueueChannel::HandleMessage"); 62 switch (message.GetOp()) { 63 case BufferHubRPC::CreateConsumerQueue::Opcode: 64 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>( 65 *this, &ProducerQueueChannel::OnCreateConsumerQueue, message); 66 return true; 67 68 case BufferHubRPC::GetQueueInfo::Opcode: 69 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>( 70 *this, &ProducerQueueChannel::OnGetQueueInfo, message); 71 return true; 72 73 case BufferHubRPC::ProducerQueueAllocateBuffers::Opcode: 74 DispatchRemoteMethod<BufferHubRPC::ProducerQueueAllocateBuffers>( 75 *this, &ProducerQueueChannel::OnProducerQueueAllocateBuffers, 76 message); 77 return true; 78 79 case BufferHubRPC::ProducerQueueRemoveBuffer::Opcode: 80 DispatchRemoteMethod<BufferHubRPC::ProducerQueueRemoveBuffer>( 81 *this, &ProducerQueueChannel::OnProducerQueueRemoveBuffer, message); 82 return true; 83 84 default: 85 return false; 86 } 87 } 88 89 void ProducerQueueChannel::HandleImpulse(Message& /* message */) { 90 ATRACE_NAME("ProducerQueueChannel::HandleImpulse"); 91 } 92 93 BufferHubChannel::BufferInfo ProducerQueueChannel::GetBufferInfo() const { 94 return BufferInfo(channel_id(), consumer_channels_.size(), capacity_, 95 usage_policy_); 96 } 97 98 Status<RemoteChannelHandle> ProducerQueueChannel::OnCreateConsumerQueue( 99 Message& message, bool silent) { 100 ATRACE_NAME("ProducerQueueChannel::OnCreateConsumerQueue"); 101 ALOGD_IF( 102 TRACE, 103 "ProducerQueueChannel::OnCreateConsumerQueue: channel_id=%d slient=%d", 104 channel_id(), silent); 105 106 int channel_id; 107 auto status = message.PushChannel(0, nullptr, &channel_id); 108 if (!status) { 109 ALOGE( 110 "ProducerQueueChannel::OnCreateConsumerQueue: failed to push consumer " 111 "channel: %s", 112 status.GetErrorMessage().c_str()); 113 return ErrorStatus(ENOMEM); 114 } 115 116 auto consumer_queue_channel = std::make_shared<ConsumerQueueChannel>( 117 service(), buffer_id(), channel_id, shared_from_this(), silent); 118 119 // Register the existing buffers with the new consumer queue. 120 for (size_t slot = 0; slot < BufferHubRPC::kMaxQueueCapacity; slot++) { 121 if (auto buffer = buffers_[slot].lock()) 122 consumer_queue_channel->RegisterNewBuffer(buffer, slot); 123 } 124 125 const auto channel_status = 126 service()->SetChannel(channel_id, consumer_queue_channel); 127 if (!channel_status) { 128 ALOGE( 129 "ProducerQueueChannel::OnCreateConsumerQueue: Failed to set channel: " 130 "%s", 131 channel_status.GetErrorMessage().c_str()); 132 return ErrorStatus(ENOMEM); 133 } 134 135 return {status.take()}; 136 } 137 138 Status<QueueInfo> ProducerQueueChannel::OnGetQueueInfo(Message&) { 139 return {{config_, buffer_id()}}; 140 } 141 142 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> 143 ProducerQueueChannel::OnProducerQueueAllocateBuffers( 144 Message& message, uint32_t width, uint32_t height, uint32_t layer_count, 145 uint32_t format, uint64_t usage, size_t buffer_count) { 146 ATRACE_NAME("ProducerQueueChannel::OnProducerQueueAllocateBuffers"); 147 ALOGD_IF(TRACE, 148 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: " 149 "producer_channel_id=%d", 150 channel_id()); 151 152 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; 153 154 // Deny buffer allocation violating preset rules. 155 if (usage & usage_policy_.usage_deny_set_mask) { 156 ALOGE( 157 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64 158 " is not permitted. Violating usage_deny_set_mask, the following bits " 159 "shall not be set: %" PRIx64 ".", 160 usage, usage_policy_.usage_deny_set_mask); 161 return ErrorStatus(EINVAL); 162 } 163 164 if (~usage & usage_policy_.usage_deny_clear_mask) { 165 ALOGE( 166 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: usage: %" PRIx64 167 " is not permitted. Violating usage_deny_clear_mask, the following " 168 "bits must be set: %" PRIx64 ".", 169 usage, usage_policy_.usage_deny_clear_mask); 170 return ErrorStatus(EINVAL); 171 } 172 173 // Force set mask and clear mask. Note that |usage_policy_.usage_set_mask_| 174 // takes precedence and will overwrite |usage_policy_.usage_clear_mask|. 175 uint64_t effective_usage = 176 (usage & ~usage_policy_.usage_clear_mask) | usage_policy_.usage_set_mask; 177 178 for (size_t i = 0; i < buffer_count; i++) { 179 auto status = AllocateBuffer(message, width, height, layer_count, format, 180 effective_usage); 181 if (!status) { 182 ALOGE( 183 "ProducerQueueChannel::OnProducerQueueAllocateBuffers: Failed to " 184 "allocate new buffer."); 185 return ErrorStatus(status.error()); 186 } 187 buffer_handles.push_back(status.take()); 188 } 189 190 return {std::move(buffer_handles)}; 191 } 192 193 Status<std::pair<RemoteChannelHandle, size_t>> 194 ProducerQueueChannel::AllocateBuffer(Message& message, uint32_t width, 195 uint32_t height, uint32_t layer_count, 196 uint32_t format, uint64_t usage) { 197 ATRACE_NAME("ProducerQueueChannel::AllocateBuffer"); 198 ALOGD_IF(TRACE, 199 "ProducerQueueChannel::AllocateBuffer: producer_channel_id=%d", 200 channel_id()); 201 202 if (capacity_ >= BufferHubRPC::kMaxQueueCapacity) { 203 ALOGE("ProducerQueueChannel::AllocateBuffer: reaches kMaxQueueCapacity."); 204 return ErrorStatus(E2BIG); 205 } 206 207 // Here we are creating a new BufferHubBuffer, initialize the producer 208 // channel, and returning its file handle back to the client. 209 // buffer_id is the id of the producer channel of BufferHubBuffer. 210 int buffer_id; 211 auto status = message.PushChannel(0, nullptr, &buffer_id); 212 213 if (!status) { 214 ALOGE("ProducerQueueChannel::AllocateBuffer: failed to push channel: %s", 215 status.GetErrorMessage().c_str()); 216 return ErrorStatus(status.error()); 217 } 218 219 ALOGD_IF(TRACE, 220 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d width=%u " 221 "height=%u layer_count=%u format=%u usage=%" PRIx64, 222 buffer_id, width, height, layer_count, format, usage); 223 auto buffer_handle = status.take(); 224 225 auto producer_channel_status = 226 ProducerChannel::Create(service(), buffer_id, width, height, layer_count, 227 format, usage, config_.user_metadata_size); 228 if (!producer_channel_status) { 229 ALOGE( 230 "ProducerQueueChannel::AllocateBuffer: Failed to create producer " 231 "buffer: %s", 232 producer_channel_status.GetErrorMessage().c_str()); 233 return ErrorStatus(ENOMEM); 234 } 235 auto producer_channel = producer_channel_status.take(); 236 237 ALOGD_IF( 238 TRACE, 239 "ProducerQueueChannel::AllocateBuffer: buffer_id=%d, buffer_handle=%d", 240 buffer_id, buffer_handle.value()); 241 242 const auto channel_status = 243 service()->SetChannel(buffer_id, producer_channel); 244 if (!channel_status) { 245 ALOGE( 246 "ProducerQueueChannel::AllocateBuffer: failed to set producer channel " 247 "for new BufferHubBuffer: %s", 248 channel_status.GetErrorMessage().c_str()); 249 return ErrorStatus(ENOMEM); 250 } 251 252 // Register the newly allocated buffer's channel_id into the first empty 253 // buffer slot. 254 size_t slot = 0; 255 for (; slot < BufferHubRPC::kMaxQueueCapacity; slot++) { 256 if (buffers_[slot].expired()) 257 break; 258 } 259 if (slot == BufferHubRPC::kMaxQueueCapacity) { 260 ALOGE( 261 "ProducerQueueChannel::AllocateBuffer: Cannot find empty slot for new " 262 "buffer allocation."); 263 return ErrorStatus(E2BIG); 264 } 265 266 buffers_[slot] = producer_channel; 267 capacity_++; 268 269 // Notify each consumer channel about the new buffer. 270 for (auto* consumer_channel : consumer_channels_) { 271 ALOGD( 272 "ProducerQueueChannel::AllocateBuffer: Notified consumer with new " 273 "buffer, buffer_id=%d", 274 buffer_id); 275 consumer_channel->RegisterNewBuffer(producer_channel, slot); 276 } 277 278 return {{std::move(buffer_handle), slot}}; 279 } 280 281 Status<void> ProducerQueueChannel::OnProducerQueueRemoveBuffer( 282 Message& /*message*/, size_t slot) { 283 if (buffers_[slot].expired()) { 284 ALOGE( 285 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove " 286 "an invalid buffer producer at slot %zu", 287 slot); 288 return ErrorStatus(EINVAL); 289 } 290 291 if (capacity_ == 0) { 292 ALOGE( 293 "ProducerQueueChannel::OnProducerQueueRemoveBuffer: trying to remove a " 294 "buffer producer while the queue's capacity is already zero."); 295 return ErrorStatus(EINVAL); 296 } 297 298 buffers_[slot].reset(); 299 capacity_--; 300 return {}; 301 } 302 303 void ProducerQueueChannel::AddConsumer(ConsumerQueueChannel* channel) { 304 consumer_channels_.push_back(channel); 305 } 306 307 void ProducerQueueChannel::RemoveConsumer(ConsumerQueueChannel* channel) { 308 consumer_channels_.erase( 309 std::find(consumer_channels_.begin(), consumer_channels_.end(), channel)); 310 } 311 312 } // namespace dvr 313 } // namespace android 314