Home | History | Annotate | Download | only in test
      1 /*
      2  * Copyright (C) 2018 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "test/fake_producer.h"
     18 
     19 #include <condition_variable>
     20 #include <mutex>
     21 
     22 #include "gtest/gtest.h"
     23 #include "perfetto/base/logging.h"
     24 #include "perfetto/base/time.h"
     25 #include "perfetto/base/utils.h"
     26 #include "perfetto/trace/test_event.pbzero.h"
     27 #include "perfetto/trace/trace_packet.pbzero.h"
     28 #include "perfetto/traced/traced.h"
     29 #include "perfetto/tracing/core/trace_packet.h"
     30 #include "perfetto/tracing/core/trace_writer.h"
     31 
     32 namespace perfetto {
     33 
     34 FakeProducer::FakeProducer(const std::string& name) : name_(name) {}
     35 FakeProducer::~FakeProducer() = default;
     36 
     37 void FakeProducer::Connect(
     38     const char* socket_name,
     39     base::TaskRunner* task_runner,
     40     std::function<void()> on_create_data_source_instance) {
     41   PERFETTO_DCHECK_THREAD(thread_checker_);
     42   task_runner_ = task_runner;
     43   endpoint_ = ProducerIPCClient::Connect(
     44       socket_name, this, "android.perfetto.FakeProducer", task_runner);
     45   on_create_data_source_instance_ = std::move(on_create_data_source_instance);
     46 }
     47 
     48 void FakeProducer::OnConnect() {
     49   PERFETTO_DCHECK_THREAD(thread_checker_);
     50   DataSourceDescriptor descriptor;
     51   descriptor.set_name(name_);
     52   endpoint_->RegisterDataSource(descriptor);
     53 }
     54 
     55 void FakeProducer::OnDisconnect() {
     56   PERFETTO_DCHECK_THREAD(thread_checker_);
     57   FAIL() << "Producer unexpectedly disconnected from the service";
     58 }
     59 
     60 void FakeProducer::CreateDataSourceInstance(
     61     DataSourceInstanceID,
     62     const DataSourceConfig& source_config) {
     63   PERFETTO_DCHECK_THREAD(thread_checker_);
     64   trace_writer_ = endpoint_->CreateTraceWriter(
     65       static_cast<BufferID>(source_config.target_buffer()));
     66   rnd_engine_ = std::minstd_rand0(source_config.for_testing().seed());
     67   message_count_ = source_config.for_testing().message_count();
     68   message_size_ = source_config.for_testing().message_size();
     69   max_messages_per_second_ =
     70       source_config.for_testing().max_messages_per_second();
     71   if (source_config.for_testing().send_batch_on_register()) {
     72     ProduceEventBatch(on_create_data_source_instance_);
     73   } else {
     74     task_runner_->PostTask(on_create_data_source_instance_);
     75   }
     76 }
     77 
     78 void FakeProducer::TearDownDataSourceInstance(DataSourceInstanceID) {
     79   PERFETTO_DCHECK_THREAD(thread_checker_);
     80   trace_writer_.reset();
     81 }
     82 
     83 // Note: this can be called on a different thread.
     84 void FakeProducer::ProduceEventBatch(std::function<void()> callback) {
     85   task_runner_->PostTask([this, callback] {
     86     PERFETTO_CHECK(trace_writer_);
     87     PERFETTO_CHECK(message_size_ > 1);
     88     std::unique_ptr<char, base::FreeDeleter> payload(
     89         static_cast<char*>(malloc(message_size_)));
     90     memset(payload.get(), '.', message_size_);
     91     payload.get()[message_size_ - 1] = 0;
     92 
     93     base::TimeMillis start = base::GetWallTimeMs();
     94     int64_t iterations = 0;
     95     uint32_t messages_to_emit = message_count_;
     96     while (messages_to_emit > 0) {
     97       uint32_t messages_in_minibatch =
     98           max_messages_per_second_ == 0
     99               ? messages_to_emit
    100               : std::min(max_messages_per_second_, messages_to_emit);
    101       PERFETTO_DCHECK(messages_to_emit >= messages_in_minibatch);
    102 
    103       for (uint32_t i = 0; i < messages_in_minibatch; i++) {
    104         auto handle = trace_writer_->NewTracePacket();
    105         handle->set_for_testing()->set_seq_value(
    106             static_cast<uint32_t>(rnd_engine_()));
    107         handle->set_for_testing()->set_str(payload.get(), message_size_);
    108       }
    109       messages_to_emit -= messages_in_minibatch;
    110 
    111       // Pause until the second boundary to make sure that we are adhering to
    112       // the speed limitation.
    113       if (max_messages_per_second_ > 0) {
    114         int64_t expected_time_taken = ++iterations * 1000;
    115         base::TimeMillis time_taken = base::GetWallTimeMs() - start;
    116         while (time_taken.count() < expected_time_taken) {
    117           usleep(static_cast<useconds_t>(
    118               (expected_time_taken - time_taken.count()) * 1000));
    119           time_taken = base::GetWallTimeMs() - start;
    120         }
    121       }
    122     }
    123     trace_writer_->Flush(callback);
    124   });
    125 }
    126 
    127 void FakeProducer::OnTracingSetup() {}
    128 
    129 void FakeProducer::Flush(FlushRequestID flush_request_id,
    130                          const DataSourceInstanceID*,
    131                          size_t num_data_sources) {
    132   PERFETTO_DCHECK(num_data_sources > 0);
    133   if (trace_writer_)
    134     trace_writer_->Flush();
    135   endpoint_->NotifyFlushComplete(flush_request_id);
    136 }
    137 
    138 }  // namespace perfetto
    139