1 #include "consumer_channel.h" 2 3 #include <log/log.h> 4 #include <utils/Trace.h> 5 6 #include <thread> 7 8 #include <private/dvr/bufferhub_rpc.h> 9 #include "producer_channel.h" 10 11 using android::pdx::ErrorStatus; 12 using android::pdx::BorrowedHandle; 13 using android::pdx::Channel; 14 using android::pdx::Message; 15 using android::pdx::Status; 16 using android::pdx::rpc::DispatchRemoteMethod; 17 18 namespace android { 19 namespace dvr { 20 21 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id, 22 int channel_id, 23 const std::shared_ptr<Channel> producer) 24 : BufferHubChannel(service, buffer_id, channel_id, kConsumerType), 25 handled_(true), 26 ignored_(false), 27 producer_(producer) { 28 GetProducer()->AddConsumer(this); 29 } 30 31 ConsumerChannel::~ConsumerChannel() { 32 ALOGD_IF(TRACE, 33 "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d", 34 channel_id(), buffer_id()); 35 36 if (auto producer = GetProducer()) { 37 if (!handled_) // Producer is waiting for our Release. 38 producer->OnConsumerIgnored(); 39 producer->RemoveConsumer(this); 40 } 41 } 42 43 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const { 44 BufferHubChannel::BufferInfo info; 45 if (auto producer = GetProducer()) { 46 // If producer has not hung up, copy most buffer info from the producer. 47 info = producer->GetBufferInfo(); 48 } 49 info.id = buffer_id(); 50 return info; 51 } 52 53 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const { 54 return std::static_pointer_cast<ProducerChannel>(producer_.lock()); 55 } 56 57 void ConsumerChannel::HandleImpulse(Message& message) { 58 ATRACE_NAME("ConsumerChannel::HandleImpulse"); 59 switch (message.GetOp()) { 60 case BufferHubRPC::ConsumerRelease::Opcode: 61 OnConsumerRelease(message, {}); 62 break; 63 } 64 } 65 66 bool ConsumerChannel::HandleMessage(Message& message) { 67 ATRACE_NAME("ConsumerChannel::HandleMessage"); 68 auto producer = GetProducer(); 69 if (!producer) 70 REPLY_ERROR_RETURN(message, EPIPE, true); 71 72 switch (message.GetOp()) { 73 case BufferHubRPC::GetBuffer::Opcode: 74 DispatchRemoteMethod<BufferHubRPC::GetBuffer>( 75 *producer, &ProducerChannel::OnGetBuffer, message); 76 return true; 77 78 case BufferHubRPC::NewConsumer::Opcode: 79 DispatchRemoteMethod<BufferHubRPC::NewConsumer>( 80 *producer, &ProducerChannel::OnNewConsumer, message); 81 return true; 82 83 case BufferHubRPC::ConsumerAcquire::Opcode: 84 DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>( 85 *this, &ConsumerChannel::OnConsumerAcquire, message); 86 return true; 87 88 case BufferHubRPC::ConsumerRelease::Opcode: 89 DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>( 90 *this, &ConsumerChannel::OnConsumerRelease, message); 91 return true; 92 93 case BufferHubRPC::ConsumerSetIgnore::Opcode: 94 DispatchRemoteMethod<BufferHubRPC::ConsumerSetIgnore>( 95 *this, &ConsumerChannel::OnConsumerSetIgnore, message); 96 return true; 97 98 default: 99 return false; 100 } 101 } 102 103 Status<std::pair<BorrowedFence, ConsumerChannel::MetaData>> 104 ConsumerChannel::OnConsumerAcquire(Message& message, 105 std::size_t metadata_size) { 106 ATRACE_NAME("ConsumerChannel::OnConsumerAcquire"); 107 auto producer = GetProducer(); 108 if (!producer) 109 return ErrorStatus(EPIPE); 110 111 if (ignored_ || handled_) { 112 ALOGE( 113 "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: " 114 "ignored=%d handled=%d channel_id=%d buffer_id=%d", 115 ignored_, handled_, message.GetChannelId(), producer->buffer_id()); 116 return ErrorStatus(EBUSY); 117 } else { 118 ClearAvailable(); 119 return producer->OnConsumerAcquire(message, metadata_size); 120 } 121 } 122 123 Status<void> ConsumerChannel::OnConsumerRelease(Message& message, 124 LocalFence release_fence) { 125 ATRACE_NAME("ConsumerChannel::OnConsumerRelease"); 126 auto producer = GetProducer(); 127 if (!producer) 128 return ErrorStatus(EPIPE); 129 130 if (ignored_ || handled_) { 131 ALOGE( 132 "ConsumerChannel::OnConsumerRelease: Release when not acquired: " 133 "ignored=%d handled=%d channel_id=%d buffer_id=%d", 134 ignored_, handled_, message.GetChannelId(), producer->buffer_id()); 135 return ErrorStatus(EBUSY); 136 } else { 137 ClearAvailable(); 138 auto status = 139 producer->OnConsumerRelease(message, std::move(release_fence)); 140 handled_ = !!status; 141 return status; 142 } 143 } 144 145 Status<void> ConsumerChannel::OnConsumerSetIgnore(Message&, bool ignored) { 146 ATRACE_NAME("ConsumerChannel::OnConsumerSetIgnore"); 147 auto producer = GetProducer(); 148 if (!producer) 149 return ErrorStatus(EPIPE); 150 151 ignored_ = ignored; 152 if (ignored_ && !handled_) { 153 // Update the producer if ignore is set after the consumer acquires the 154 // buffer. 155 ClearAvailable(); 156 producer->OnConsumerIgnored(); 157 handled_ = false; 158 } 159 160 return {}; 161 } 162 163 bool ConsumerChannel::OnProducerPosted() { 164 if (ignored_) { 165 handled_ = true; 166 return false; 167 } else { 168 handled_ = false; 169 SignalAvailable(); 170 return true; 171 } 172 } 173 174 void ConsumerChannel::OnProducerClosed() { 175 producer_.reset(); 176 Hangup(); 177 } 178 179 } // namespace dvr 180 } // namespace android 181