1 #include <pdx/channel_handle.h> 2 #include <private/dvr/consumer_queue_channel.h> 3 #include <private/dvr/producer_channel.h> 4 5 using android::pdx::ErrorStatus; 6 using android::pdx::RemoteChannelHandle; 7 using android::pdx::Status; 8 using android::pdx::rpc::DispatchRemoteMethod; 9 using android::pdx::rpc::RemoteMethodError; 10 11 namespace android { 12 namespace dvr { 13 14 ConsumerQueueChannel::ConsumerQueueChannel( 15 BufferHubService* service, int buffer_id, int channel_id, 16 const std::shared_ptr<Channel>& producer, bool silent) 17 : BufferHubChannel(service, buffer_id, channel_id, kConsumerQueueType), 18 producer_(producer), 19 capacity_(0), 20 silent_(silent) { 21 GetProducer()->AddConsumer(this); 22 } 23 24 ConsumerQueueChannel::~ConsumerQueueChannel() { 25 ALOGD_IF(TRACE, "ConsumerQueueChannel::~ConsumerQueueChannel: channel_id=%d", 26 channel_id()); 27 28 if (auto producer = GetProducer()) { 29 producer->RemoveConsumer(this); 30 } 31 } 32 33 bool ConsumerQueueChannel::HandleMessage(Message& message) { 34 ATRACE_NAME("ConsumerQueueChannel::HandleMessage"); 35 auto producer = GetProducer(); 36 if (!producer) { 37 RemoteMethodError(message, EPIPE); 38 return true; 39 } 40 41 switch (message.GetOp()) { 42 case BufferHubRPC::CreateConsumerQueue::Opcode: 43 DispatchRemoteMethod<BufferHubRPC::CreateConsumerQueue>( 44 *producer, &ProducerQueueChannel::OnCreateConsumerQueue, message); 45 return true; 46 47 case BufferHubRPC::GetQueueInfo::Opcode: 48 DispatchRemoteMethod<BufferHubRPC::GetQueueInfo>( 49 *producer, &ProducerQueueChannel::OnGetQueueInfo, message); 50 return true; 51 52 case BufferHubRPC::ConsumerQueueImportBuffers::Opcode: 53 DispatchRemoteMethod<BufferHubRPC::ConsumerQueueImportBuffers>( 54 *this, &ConsumerQueueChannel::OnConsumerQueueImportBuffers, message); 55 return true; 56 57 default: 58 return false; 59 } 60 } 61 62 std::shared_ptr<ProducerQueueChannel> ConsumerQueueChannel::GetProducer() 63 const { 64 return std::static_pointer_cast<ProducerQueueChannel>(producer_.lock()); 65 } 66 67 void ConsumerQueueChannel::HandleImpulse(Message& /* message */) { 68 ATRACE_NAME("ConsumerQueueChannel::HandleImpulse"); 69 } 70 71 BufferHubChannel::BufferInfo ConsumerQueueChannel::GetBufferInfo() const { 72 BufferHubChannel::BufferInfo info; 73 if (auto producer = GetProducer()) { 74 // If producer has not hung up, copy most buffer info from the producer. 75 info = producer->GetBufferInfo(); 76 } 77 info.id = buffer_id(); 78 info.capacity = capacity_; 79 return info; 80 } 81 82 void ConsumerQueueChannel::RegisterNewBuffer( 83 const std::shared_ptr<ProducerChannel>& producer_channel, 84 size_t producer_slot) { 85 ALOGD_IF(TRACE, "%s: queue_id=%d buffer_id=%d slot=%zu silent=%d", 86 __FUNCTION__, buffer_id(), producer_channel->buffer_id(), 87 producer_slot, silent_); 88 // Only register buffers if the queue is not silent. 89 if (silent_) { 90 return; 91 } 92 93 auto status = producer_channel->CreateConsumerStateMask(); 94 if (!status.ok()) { 95 ALOGE("%s: Failed to create consumer state mask: %s", __FUNCTION__, 96 status.GetErrorMessage().c_str()); 97 return; 98 } 99 uint64_t consumer_state_mask = status.get(); 100 101 pending_buffer_slots_.emplace(producer_channel, producer_slot, 102 consumer_state_mask); 103 // Signal the client that there is new buffer available. 104 SignalAvailable(); 105 } 106 107 Status<std::vector<std::pair<RemoteChannelHandle, size_t>>> 108 ConsumerQueueChannel::OnConsumerQueueImportBuffers(Message& message) { 109 std::vector<std::pair<RemoteChannelHandle, size_t>> buffer_handles; 110 ATRACE_NAME(__FUNCTION__); 111 ALOGD_IF(TRACE, "%s: pending_buffer_slots=%zu", __FUNCTION__, 112 pending_buffer_slots_.size()); 113 114 // Indicate this is a silent queue that will not import buffers. 115 if (silent_) 116 return ErrorStatus(EBADR); 117 118 while (!pending_buffer_slots_.empty()) { 119 auto producer_channel = 120 pending_buffer_slots_.front().producer_channel.lock(); 121 size_t producer_slot = pending_buffer_slots_.front().producer_slot; 122 uint64_t consumer_state_mask = 123 pending_buffer_slots_.front().consumer_state_mask; 124 pending_buffer_slots_.pop(); 125 126 // It's possible that the producer channel has expired. When this occurs, 127 // ignore the producer channel. 128 if (producer_channel == nullptr) { 129 ALOGW("%s: producer channel has already been expired.", __FUNCTION__); 130 continue; 131 } 132 133 auto status = 134 producer_channel->CreateConsumer(message, consumer_state_mask); 135 136 // If no buffers are imported successfully, clear available and return an 137 // error. Otherwise, return all consumer handles already imported 138 // successfully, but keep available bits on, so that the client can retry 139 // importing remaining consumer buffers. 140 if (!status) { 141 ALOGE("%s: Failed create consumer: %s", __FUNCTION__, 142 status.GetErrorMessage().c_str()); 143 if (buffer_handles.empty()) { 144 ClearAvailable(); 145 return status.error_status(); 146 } else { 147 return {std::move(buffer_handles)}; 148 } 149 } 150 151 buffer_handles.emplace_back(status.take(), producer_slot); 152 } 153 154 ClearAvailable(); 155 return {std::move(buffer_handles)}; 156 } 157 158 void ConsumerQueueChannel::OnProducerClosed() { 159 ALOGD_IF(TRACE, "ConsumerQueueChannel::OnProducerClosed: queue_id=%d", 160 buffer_id()); 161 producer_.reset(); 162 Hangup(); 163 } 164 165 } // namespace dvr 166 } // namespace android 167