Home | History | Annotate | Download | only in test
      1 /*
      2  * Copyright (C) 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "test/fake_producer.h"
     18 
     19 #include <condition_variable>
     20 #include <mutex>
     21 
     22 #include "gtest/gtest.h"
     23 #include "perfetto/base/logging.h"
     24 #include "perfetto/base/time.h"
     25 #include "perfetto/base/utils.h"
     26 #include "perfetto/trace/test_event.pbzero.h"
     27 #include "perfetto/trace/trace_packet.pbzero.h"
     28 #include "perfetto/traced/traced.h"
     29 #include "perfetto/tracing/core/trace_packet.h"
     30 #include "perfetto/tracing/core/trace_writer.h"
     31 
     32 namespace perfetto {
     33 
     34 FakeProducer::FakeProducer(const std::string& name) : name_(name) {}
     35 FakeProducer::~FakeProducer() = default;
     36 
     37 void FakeProducer::Connect(
     38     const char* socket_name,
     39     base::TaskRunner* task_runner,
     40     std::function<void()> on_setup_data_source_instance,
     41     std::function<void()> on_create_data_source_instance) {
     42   PERFETTO_DCHECK_THREAD(thread_checker_);
     43   task_runner_ = task_runner;
     44   endpoint_ = ProducerIPCClient::Connect(
     45       socket_name, this, "android.perfetto.FakeProducer", task_runner);
     46   on_setup_data_source_instance_ = std::move(on_setup_data_source_instance);
     47   on_create_data_source_instance_ = std::move(on_create_data_source_instance);
     48 }
     49 
     50 void FakeProducer::OnConnect() {
     51   PERFETTO_DCHECK_THREAD(thread_checker_);
     52   DataSourceDescriptor descriptor;
     53   descriptor.set_name(name_);
     54   endpoint_->RegisterDataSource(descriptor);
     55 }
     56 
     57 void FakeProducer::OnDisconnect() {
     58   PERFETTO_DCHECK_THREAD(thread_checker_);
     59   FAIL() << "Producer unexpectedly disconnected from the service";
     60 }
     61 
     62 void FakeProducer::SetupDataSource(DataSourceInstanceID,
     63                                    const DataSourceConfig&) {
     64   task_runner_->PostTask(on_setup_data_source_instance_);
     65 }
     66 
     67 void FakeProducer::StartDataSource(DataSourceInstanceID,
     68                                    const DataSourceConfig& source_config) {
     69   PERFETTO_DCHECK_THREAD(thread_checker_);
     70   trace_writer_ = endpoint_->CreateTraceWriter(
     71       static_cast<BufferID>(source_config.target_buffer()));
     72   rnd_engine_ = std::minstd_rand0(source_config.for_testing().seed());
     73   message_count_ = source_config.for_testing().message_count();
     74   message_size_ = source_config.for_testing().message_size();
     75   max_messages_per_second_ =
     76       source_config.for_testing().max_messages_per_second();
     77   if (source_config.for_testing().send_batch_on_register()) {
     78     ProduceEventBatch(on_create_data_source_instance_);
     79   } else {
     80     task_runner_->PostTask(on_create_data_source_instance_);
     81   }
     82 }
     83 
     84 void FakeProducer::StopDataSource(DataSourceInstanceID) {
     85   PERFETTO_DCHECK_THREAD(thread_checker_);
     86   trace_writer_.reset();
     87 }
     88 
     89 // Note: this can be called on a different thread.
     90 void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
     91   task_runner_->PostTask([this, callback] {
     92     PERFETTO_CHECK(trace_writer_);
     93     PERFETTO_CHECK(message_size_ > 1);
     94     std::unique_ptr<char, base::FreeDeleter> payload(
     95         static_cast<char*>(malloc(message_size_)));
     96     memset(payload.get(), '.', message_size_);
     97     payload.get()[message_size_ - 1] = 0;
     98 
     99     base::TimeMillis start = base::GetWallTimeMs();
    100     int64_t iterations = 0;
    101     uint32_t messages_to_emit = message_count_;
    102     while (messages_to_emit > 0) {
    103       uint32_t messages_in_minibatch =
    104           max_messages_per_second_ == 0
    105               ? messages_to_emit
    106               : std::min(max_messages_per_second_, messages_to_emit);
    107       PERFETTO_DCHECK(messages_to_emit >= messages_in_minibatch);
    108 
    109       for (uint32_t i = 0; i < messages_in_minibatch; i++) {
    110         auto handle = trace_writer_->NewTracePacket();
    111         handle->set_for_testing()->set_seq_value(
    112             static_cast<uint32_t>(rnd_engine_()));
    113         handle->set_for_testing()->set_str(payload.get(), message_size_);
    114       }
    115       messages_to_emit -= messages_in_minibatch;
    116       iterations++;
    117 
    118       // Pause until the second boundary to make sure that we are adhering to
    119       // the speed limitation.
    120       if (max_messages_per_second_ > 0) {
    121         int64_t expected_time_taken = iterations * 1000;
    122         base::TimeMillis time_taken = base::GetWallTimeMs() - start;
    123         while (time_taken.count() < expected_time_taken) {
    124           usleep(static_cast<useconds_t>(
    125               (expected_time_taken - time_taken.count()) * 1000));
    126           time_taken = base::GetWallTimeMs() - start;
    127         }
    128       }
    129       trace_writer_->Flush(messages_to_emit > 0 ? [] {} : callback);
    130     }
    131   });
    132 }
    133 
    134 void FakeProducer::OnTracingSetup() {}
    135 
    136 void FakeProducer::Flush(FlushRequestID flush_request_id,
    137                          const DataSourceInstanceID*,
    138                          size_t num_data_sources) {
    139   PERFETTO_DCHECK(num_data_sources > 0);
    140   if (trace_writer_)
    141     trace_writer_->Flush();
    142   endpoint_->NotifyFlushComplete(flush_request_id);
    143 }
    144 
    145 }  // namespace perfetto
    146