Home | History | Annotate | Download | only in producer
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "src/tracing/ipc/producer/producer_ipc_client_impl.h"
     18 
     19 #include <inttypes.h>
     20 #include <string.h>
     21 
     22 #include "perfetto/base/task_runner.h"
     23 #include "perfetto/ipc/client.h"
     24 #include "perfetto/tracing/core/commit_data_request.h"
     25 #include "perfetto/tracing/core/data_source_config.h"
     26 #include "perfetto/tracing/core/data_source_descriptor.h"
     27 #include "perfetto/tracing/core/producer.h"
     28 #include "perfetto/tracing/core/shared_memory_arbiter.h"
     29 #include "perfetto/tracing/core/trace_config.h"
     30 #include "perfetto/tracing/core/trace_writer.h"
     31 #include "src/tracing/ipc/posix_shared_memory.h"
     32 
     33 // TODO(fmayer): think to what happens when ProducerIPCClientImpl gets destroyed
     34 // w.r.t. the Producer pointer. Also think to lifetime of the Producer* during
     35 // the callbacks.
     36 
     37 namespace perfetto {
     38 
     39 // static. (Declared in include/tracing/ipc/producer_ipc_client.h).
     40 std::unique_ptr<TracingService::ProducerEndpoint> ProducerIPCClient::Connect(
     41     const char* service_sock_name,
     42     Producer* producer,
     43     const std::string& producer_name,
     44     base::TaskRunner* task_runner,
     45     TracingService::ProducerSMBScrapingMode smb_scraping_mode) {
     46   return std::unique_ptr<TracingService::ProducerEndpoint>(
     47       new ProducerIPCClientImpl(service_sock_name, producer, producer_name,
     48                                 task_runner, smb_scraping_mode));
     49 }
     50 
     51 ProducerIPCClientImpl::ProducerIPCClientImpl(
     52     const char* service_sock_name,
     53     Producer* producer,
     54     const std::string& producer_name,
     55     base::TaskRunner* task_runner,
     56     TracingService::ProducerSMBScrapingMode smb_scraping_mode)
     57     : producer_(producer),
     58       task_runner_(task_runner),
     59       ipc_channel_(ipc::Client::CreateInstance(service_sock_name, task_runner)),
     60       producer_port_(this /* event_listener */),
     61       name_(producer_name),
     62       smb_scraping_mode_(smb_scraping_mode) {
     63   ipc_channel_->BindService(producer_port_.GetWeakPtr());
     64   PERFETTO_DCHECK_THREAD(thread_checker_);
     65 }
     66 
     67 ProducerIPCClientImpl::~ProducerIPCClientImpl() = default;
     68 
     69 // Called by the IPC layer if the BindService() succeeds.
     70 void ProducerIPCClientImpl::OnConnect() {
     71   PERFETTO_DCHECK_THREAD(thread_checker_);
     72   connected_ = true;
     73 
     74   // The IPC layer guarantees that any outstanding callback will be dropped on
     75   // the floor if producer_port_ is destroyed between the request and the reply.
     76   // Binding |this| is hence safe.
     77   ipc::Deferred<protos::InitializeConnectionResponse> on_init;
     78   on_init.Bind(
     79       [this](ipc::AsyncResult<protos::InitializeConnectionResponse> resp) {
     80         OnConnectionInitialized(resp.success());
     81       });
     82   protos::InitializeConnectionRequest req;
     83   req.set_producer_name(name_);
     84   switch (smb_scraping_mode_) {
     85     case TracingService::ProducerSMBScrapingMode::kDefault:
     86       // No need to set the mode, it defaults to use the service default if
     87       // unspecified.
     88       break;
     89     case TracingService::ProducerSMBScrapingMode::kEnabled:
     90       req.set_smb_scraping_mode(
     91           protos::InitializeConnectionRequest::SMB_SCRAPING_ENABLED);
     92       break;
     93     case TracingService::ProducerSMBScrapingMode::kDisabled:
     94       req.set_smb_scraping_mode(
     95           protos::InitializeConnectionRequest::SMB_SCRAPING_DISABLED);
     96       break;
     97   }
     98   producer_port_.InitializeConnection(req, std::move(on_init));
     99 
    100   // Create the back channel to receive commands from the Service.
    101   ipc::Deferred<protos::GetAsyncCommandResponse> on_cmd;
    102   on_cmd.Bind([this](ipc::AsyncResult<protos::GetAsyncCommandResponse> resp) {
    103     if (!resp)
    104       return;  // The IPC channel was closed and |resp| was auto-rejected.
    105     OnServiceRequest(*resp);
    106   });
    107   producer_port_.GetAsyncCommand(protos::GetAsyncCommandRequest(),
    108                                  std::move(on_cmd));
    109 }
    110 
    111 void ProducerIPCClientImpl::OnDisconnect() {
    112   PERFETTO_DCHECK_THREAD(thread_checker_);
    113   PERFETTO_DLOG("Tracing service connection failure");
    114   connected_ = false;
    115   producer_->OnDisconnect();
    116   data_sources_setup_.clear();
    117 }
    118 
    119 void ProducerIPCClientImpl::OnConnectionInitialized(bool connection_succeeded) {
    120   PERFETTO_DCHECK_THREAD(thread_checker_);
    121   // If connection_succeeded == false, the OnDisconnect() call will follow next
    122   // and there we'll notify the |producer_|. TODO: add a test for this.
    123   if (!connection_succeeded)
    124     return;
    125   producer_->OnConnect();
    126 }
    127 
    128 void ProducerIPCClientImpl::OnServiceRequest(
    129     const protos::GetAsyncCommandResponse& cmd) {
    130   PERFETTO_DCHECK_THREAD(thread_checker_);
    131 
    132   // This message is sent only when connecting to a service running Android Q+.
    133   // See comment below in kStartDataSource.
    134   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupDataSource) {
    135     const auto& req = cmd.setup_data_source();
    136     const DataSourceInstanceID dsid = req.new_instance_id();
    137     DataSourceConfig cfg;
    138     cfg.FromProto(req.config());
    139     data_sources_setup_.insert(dsid);
    140     producer_->SetupDataSource(dsid, cfg);
    141     return;
    142   }
    143 
    144   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStartDataSource) {
    145     const auto& req = cmd.start_data_source();
    146     const DataSourceInstanceID dsid = req.new_instance_id();
    147     DataSourceConfig cfg;
    148     cfg.FromProto(req.config());
    149     if (!data_sources_setup_.count(dsid)) {
    150       // When connecting with an older (Android P) service, the service will not
    151       // send a SetupDataSource message. We synthesize it here in that case.
    152       producer_->SetupDataSource(dsid, cfg);
    153     }
    154     producer_->StartDataSource(dsid, cfg);
    155     return;
    156   }
    157 
    158   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kStopDataSource) {
    159     const DataSourceInstanceID dsid = cmd.stop_data_source().instance_id();
    160     producer_->StopDataSource(dsid);
    161     data_sources_setup_.erase(dsid);
    162     return;
    163   }
    164 
    165   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kSetupTracing) {
    166     base::ScopedFile shmem_fd = ipc_channel_->TakeReceivedFD();
    167     PERFETTO_CHECK(shmem_fd);
    168 
    169     // TODO(primiano): handle mmap failure in case of OOM.
    170     shared_memory_ = PosixSharedMemory::AttachToFd(std::move(shmem_fd));
    171     shared_buffer_page_size_kb_ =
    172         cmd.setup_tracing().shared_buffer_page_size_kb();
    173     shared_memory_arbiter_ = SharedMemoryArbiter::CreateInstance(
    174         shared_memory_.get(), shared_buffer_page_size_kb_ * 1024, this,
    175         task_runner_);
    176     producer_->OnTracingSetup();
    177     return;
    178   }
    179 
    180   if (cmd.cmd_case() == protos::GetAsyncCommandResponse::kFlush) {
    181     // This cast boilerplate is required only because protobuf uses its own
    182     // uint64 and not stdint's uint64_t. On some 64 bit archs they differ on the
    183     // type (long vs long long) even though they have the same size.
    184     const auto* data_source_ids = cmd.flush().data_source_ids().data();
    185     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
    186                   "data_source_ids should be 64-bit");
    187     producer_->Flush(
    188         cmd.flush().request_id(),
    189         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
    190         static_cast<size_t>(cmd.flush().data_source_ids().size()));
    191     return;
    192   }
    193 
    194   if (cmd.cmd_case() ==
    195       protos::GetAsyncCommandResponse::kClearIncrementalState) {
    196     const auto* data_source_ids =
    197         cmd.clear_incremental_state().data_source_ids().data();
    198     static_assert(sizeof(data_source_ids[0]) == sizeof(DataSourceInstanceID),
    199                   "data_source_ids should be 64-bit");
    200     producer_->ClearIncrementalState(
    201         reinterpret_cast<const DataSourceInstanceID*>(data_source_ids),
    202         static_cast<size_t>(
    203             cmd.clear_incremental_state().data_source_ids().size()));
    204     return;
    205   }
    206 
    207   PERFETTO_DFATAL("Unknown async request %d received from tracing service",
    208                   cmd.cmd_case());
    209 }
    210 
    211 void ProducerIPCClientImpl::RegisterDataSource(
    212     const DataSourceDescriptor& descriptor) {
    213   PERFETTO_DCHECK_THREAD(thread_checker_);
    214   if (!connected_) {
    215     PERFETTO_DLOG(
    216         "Cannot RegisterDataSource(), not connected to tracing service");
    217   }
    218   protos::RegisterDataSourceRequest req;
    219   descriptor.ToProto(req.mutable_data_source_descriptor());
    220   ipc::Deferred<protos::RegisterDataSourceResponse> async_response;
    221   async_response.Bind(
    222       [](ipc::AsyncResult<protos::RegisterDataSourceResponse> response) {
    223         if (!response)
    224           PERFETTO_DLOG("RegisterDataSource() failed: connection reset");
    225       });
    226   producer_port_.RegisterDataSource(req, std::move(async_response));
    227 }
    228 
    229 void ProducerIPCClientImpl::UnregisterDataSource(const std::string& name) {
    230   PERFETTO_DCHECK_THREAD(thread_checker_);
    231   if (!connected_) {
    232     PERFETTO_DLOG(
    233         "Cannot UnregisterDataSource(), not connected to tracing service");
    234     return;
    235   }
    236   protos::UnregisterDataSourceRequest req;
    237   req.set_data_source_name(name);
    238   producer_port_.UnregisterDataSource(
    239       req, ipc::Deferred<protos::UnregisterDataSourceResponse>());
    240 }
    241 
    242 void ProducerIPCClientImpl::RegisterTraceWriter(uint32_t writer_id,
    243                                                 uint32_t target_buffer) {
    244   PERFETTO_DCHECK_THREAD(thread_checker_);
    245   if (!connected_) {
    246     PERFETTO_DLOG(
    247         "Cannot RegisterTraceWriter(), not connected to tracing service");
    248     return;
    249   }
    250   protos::RegisterTraceWriterRequest req;
    251   req.set_trace_writer_id(writer_id);
    252   req.set_target_buffer(target_buffer);
    253   producer_port_.RegisterTraceWriter(
    254       req, ipc::Deferred<protos::RegisterTraceWriterResponse>());
    255 }
    256 
    257 void ProducerIPCClientImpl::UnregisterTraceWriter(uint32_t writer_id) {
    258   PERFETTO_DCHECK_THREAD(thread_checker_);
    259   if (!connected_) {
    260     PERFETTO_DLOG(
    261         "Cannot UnregisterTraceWriter(), not connected to tracing service");
    262     return;
    263   }
    264   protos::UnregisterTraceWriterRequest req;
    265   req.set_trace_writer_id(writer_id);
    266   producer_port_.UnregisterTraceWriter(
    267       req, ipc::Deferred<protos::UnregisterTraceWriterResponse>());
    268 }
    269 
    270 void ProducerIPCClientImpl::CommitData(const CommitDataRequest& req,
    271                                        CommitDataCallback callback) {
    272   PERFETTO_DCHECK_THREAD(thread_checker_);
    273   if (!connected_) {
    274     PERFETTO_DLOG("Cannot CommitData(), not connected to tracing service");
    275     return;
    276   }
    277   protos::CommitDataRequest proto_req;
    278   req.ToProto(&proto_req);
    279   ipc::Deferred<protos::CommitDataResponse> async_response;
    280   // TODO(primiano): add a test that destroys ProducerIPCClientImpl soon after
    281   // this call and checks that the callback is dropped.
    282   if (callback) {
    283     async_response.Bind(
    284         [callback](ipc::AsyncResult<protos::CommitDataResponse> response) {
    285           if (!response) {
    286             PERFETTO_DLOG("CommitData() failed: connection reset");
    287             return;
    288           }
    289           callback();
    290         });
    291   }
    292   producer_port_.CommitData(proto_req, std::move(async_response));
    293 }
    294 
    295 void ProducerIPCClientImpl::NotifyDataSourceStarted(DataSourceInstanceID id) {
    296   PERFETTO_DCHECK_THREAD(thread_checker_);
    297   if (!connected_) {
    298     PERFETTO_DLOG(
    299         "Cannot NotifyDataSourceStarted(), not connected to tracing service");
    300     return;
    301   }
    302   protos::NotifyDataSourceStartedRequest req;
    303   req.set_data_source_id(id);
    304   producer_port_.NotifyDataSourceStarted(
    305       req, ipc::Deferred<protos::NotifyDataSourceStartedResponse>());
    306 }
    307 
    308 void ProducerIPCClientImpl::NotifyDataSourceStopped(DataSourceInstanceID id) {
    309   PERFETTO_DCHECK_THREAD(thread_checker_);
    310   if (!connected_) {
    311     PERFETTO_DLOG(
    312         "Cannot NotifyDataSourceStopped(), not connected to tracing service");
    313     return;
    314   }
    315   protos::NotifyDataSourceStoppedRequest req;
    316   req.set_data_source_id(id);
    317   producer_port_.NotifyDataSourceStopped(
    318       req, ipc::Deferred<protos::NotifyDataSourceStoppedResponse>());
    319 }
    320 
    321 void ProducerIPCClientImpl::ActivateTriggers(
    322     const std::vector<std::string>& triggers) {
    323   PERFETTO_DCHECK_THREAD(thread_checker_);
    324   if (!connected_) {
    325     PERFETTO_DLOG(
    326         "Cannot ActivateTriggers(), not connected to tracing service");
    327     return;
    328   }
    329   protos::ActivateTriggersRequest proto_req;
    330   for (const auto& name : triggers) {
    331     *proto_req.add_trigger_names() = name;
    332   }
    333   producer_port_.ActivateTriggers(
    334       proto_req, ipc::Deferred<protos::ActivateTriggersResponse>());
    335 }
    336 
    337 std::unique_ptr<TraceWriter> ProducerIPCClientImpl::CreateTraceWriter(
    338     BufferID target_buffer) {
    339   // This method can be called by different threads. |shared_memory_arbiter_| is
    340   // thread-safe but be aware of accessing any other state in this function.
    341   return shared_memory_arbiter_->CreateTraceWriter(target_buffer);
    342 }
    343 
    344 SharedMemoryArbiter* ProducerIPCClientImpl::GetInProcessShmemArbiter() {
    345   PERFETTO_DLOG("Cannot GetInProcessShmemArbiter() via the IPC layer.");
    346   return nullptr;
    347 }
    348 
    349 void ProducerIPCClientImpl::NotifyFlushComplete(FlushRequestID req_id) {
    350   return shared_memory_arbiter_->NotifyFlushComplete(req_id);
    351 }
    352 
    353 SharedMemory* ProducerIPCClientImpl::shared_memory() const {
    354   return shared_memory_.get();
    355 }
    356 
    357 size_t ProducerIPCClientImpl::shared_buffer_page_size_kb() const {
    358   return shared_buffer_page_size_kb_;
    359 }
    360 
    361 }  // namespace perfetto
    362