1 #include <thread> 2 3 #include <log/log.h> 4 #include <private/dvr/bufferhub_rpc.h> 5 #include <private/dvr/consumer_channel.h> 6 #include <private/dvr/producer_channel.h> 7 #include <utils/Trace.h> 8 9 using android::pdx::BorrowedHandle; 10 using android::pdx::Channel; 11 using android::pdx::ErrorStatus; 12 using android::pdx::Message; 13 using android::pdx::Status; 14 using android::pdx::rpc::DispatchRemoteMethod; 15 16 namespace android { 17 namespace dvr { 18 19 ConsumerChannel::ConsumerChannel(BufferHubService* service, int buffer_id, 20 int channel_id, uint32_t client_state_mask, 21 const std::shared_ptr<Channel> producer) 22 : BufferHubChannel(service, buffer_id, channel_id, kConsumerType), 23 client_state_mask_(client_state_mask), 24 producer_(producer) { 25 GetProducer()->AddConsumer(this); 26 } 27 28 ConsumerChannel::~ConsumerChannel() { 29 ALOGD_IF(TRACE, 30 "ConsumerChannel::~ConsumerChannel: channel_id=%d buffer_id=%d", 31 channel_id(), buffer_id()); 32 33 if (auto producer = GetProducer()) { 34 producer->RemoveConsumer(this); 35 } 36 } 37 38 BufferHubChannel::BufferInfo ConsumerChannel::GetBufferInfo() const { 39 BufferHubChannel::BufferInfo info; 40 if (auto producer = GetProducer()) { 41 // If producer has not hung up, copy most buffer info from the producer. 42 info = producer->GetBufferInfo(); 43 } else { 44 info.signaled_mask = client_state_mask(); 45 } 46 info.id = buffer_id(); 47 return info; 48 } 49 50 std::shared_ptr<ProducerChannel> ConsumerChannel::GetProducer() const { 51 return std::static_pointer_cast<ProducerChannel>(producer_.lock()); 52 } 53 54 void ConsumerChannel::HandleImpulse(Message& message) { 55 ATRACE_NAME("ConsumerChannel::HandleImpulse"); 56 switch (message.GetOp()) { 57 case BufferHubRPC::ConsumerAcquire::Opcode: 58 OnConsumerAcquire(message); 59 break; 60 case BufferHubRPC::ConsumerRelease::Opcode: 61 OnConsumerRelease(message, {}); 62 break; 63 } 64 } 65 66 bool ConsumerChannel::HandleMessage(Message& message) { 67 ATRACE_NAME("ConsumerChannel::HandleMessage"); 68 auto producer = GetProducer(); 69 if (!producer) 70 REPLY_ERROR_RETURN(message, EPIPE, true); 71 72 switch (message.GetOp()) { 73 case BufferHubRPC::GetBuffer::Opcode: 74 DispatchRemoteMethod<BufferHubRPC::GetBuffer>( 75 *this, &ConsumerChannel::OnGetBuffer, message); 76 return true; 77 78 case BufferHubRPC::NewConsumer::Opcode: 79 DispatchRemoteMethod<BufferHubRPC::NewConsumer>( 80 *producer, &ProducerChannel::OnNewConsumer, message); 81 return true; 82 83 case BufferHubRPC::ConsumerAcquire::Opcode: 84 DispatchRemoteMethod<BufferHubRPC::ConsumerAcquire>( 85 *this, &ConsumerChannel::OnConsumerAcquire, message); 86 return true; 87 88 case BufferHubRPC::ConsumerRelease::Opcode: 89 DispatchRemoteMethod<BufferHubRPC::ConsumerRelease>( 90 *this, &ConsumerChannel::OnConsumerRelease, message); 91 return true; 92 93 default: 94 return false; 95 } 96 } 97 98 Status<BufferDescription<BorrowedHandle>> ConsumerChannel::OnGetBuffer( 99 Message& /*message*/) { 100 ATRACE_NAME("ConsumerChannel::OnGetBuffer"); 101 ALOGD_IF(TRACE, "ConsumerChannel::OnGetBuffer: buffer=%d", buffer_id()); 102 if (auto producer = GetProducer()) { 103 return {producer->GetBuffer(client_state_mask_)}; 104 } else { 105 return ErrorStatus(EPIPE); 106 } 107 } 108 109 Status<LocalFence> ConsumerChannel::OnConsumerAcquire(Message& message) { 110 ATRACE_NAME("ConsumerChannel::OnConsumerAcquire"); 111 auto producer = GetProducer(); 112 if (!producer) 113 return ErrorStatus(EPIPE); 114 115 if (acquired_ || released_) { 116 ALOGE( 117 "ConsumerChannel::OnConsumerAcquire: Acquire when not posted: " 118 "acquired=%d released=%d channel_id=%d buffer_id=%d", 119 acquired_, released_, message.GetChannelId(), producer->buffer_id()); 120 return ErrorStatus(EBUSY); 121 } else { 122 auto status = producer->OnConsumerAcquire(message); 123 if (status) { 124 ClearAvailable(); 125 acquired_ = true; 126 } 127 return status; 128 } 129 } 130 131 Status<void> ConsumerChannel::OnConsumerRelease(Message& message, 132 LocalFence release_fence) { 133 ATRACE_NAME("ConsumerChannel::OnConsumerRelease"); 134 auto producer = GetProducer(); 135 if (!producer) 136 return ErrorStatus(EPIPE); 137 138 if (!acquired_ || released_) { 139 ALOGE( 140 "ConsumerChannel::OnConsumerRelease: Release when not acquired: " 141 "acquired=%d released=%d channel_id=%d buffer_id=%d", 142 acquired_, released_, message.GetChannelId(), producer->buffer_id()); 143 return ErrorStatus(EBUSY); 144 } else { 145 auto status = 146 producer->OnConsumerRelease(message, std::move(release_fence)); 147 if (status) { 148 ClearAvailable(); 149 acquired_ = false; 150 released_ = true; 151 } 152 return status; 153 } 154 } 155 156 void ConsumerChannel::OnProducerGained() { 157 // Clear the signal if exist. There is a possiblity that the signal still 158 // exist in consumer client when producer gains the buffer, e.g. newly added 159 // consumer fail to acquire the previous posted buffer in time. Then, when 160 // producer gains back the buffer, posts the buffer again and signal the 161 // consumer later, there won't be an signal change in eventfd, and thus, 162 // consumer will miss the posted buffer later. Thus, we need to clear the 163 // signal in consumer clients if the signal exist. 164 ClearAvailable(); 165 } 166 167 void ConsumerChannel::OnProducerPosted() { 168 acquired_ = false; 169 released_ = false; 170 SignalAvailable(); 171 } 172 173 void ConsumerChannel::OnProducerClosed() { 174 producer_.reset(); 175 Hangup(); 176 } 177 178 } // namespace dvr 179 } // namespace android 180