Home | History | Annotate | Download | only in test
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include <inttypes.h>
     18 #include <unistd.h>
     19 
     20 #include "gmock/gmock.h"
     21 #include "gtest/gtest.h"
     22 #include "perfetto/base/temp_file.h"
     23 #include "perfetto/tracing/core/consumer.h"
     24 #include "perfetto/tracing/core/data_source_config.h"
     25 #include "perfetto/tracing/core/data_source_descriptor.h"
     26 #include "perfetto/tracing/core/producer.h"
     27 #include "perfetto/tracing/core/trace_config.h"
     28 #include "perfetto/tracing/core/trace_packet.h"
     29 #include "perfetto/tracing/core/trace_stats.h"
     30 #include "perfetto/tracing/core/trace_writer.h"
     31 #include "perfetto/tracing/ipc/consumer_ipc_client.h"
     32 #include "perfetto/tracing/ipc/producer_ipc_client.h"
     33 #include "perfetto/tracing/ipc/service_ipc_host.h"
     34 #include "src/base/test/test_task_runner.h"
     35 #include "src/ipc/test/test_socket.h"
     36 #include "src/tracing/core/tracing_service_impl.h"
     37 
     38 #include "perfetto/config/trace_config.pb.h"
     39 #include "perfetto/trace/test_event.pbzero.h"
     40 #include "perfetto/trace/trace.pb.h"
     41 #include "perfetto/trace/trace_packet.pb.h"
     42 #include "perfetto/trace/trace_packet.pbzero.h"
     43 
     44 namespace perfetto {
     45 namespace {
     46 
     47 using testing::Invoke;
     48 using testing::InvokeWithoutArgs;
     49 using testing::_;
     50 
     51 constexpr char kProducerSockName[] = TEST_SOCK_NAME("tracing_test-producer");
     52 constexpr char kConsumerSockName[] = TEST_SOCK_NAME("tracing_test-consumer");
     53 
     54 // TODO(rsavitski): consider using src/tracing/test/mock_producer.h.
     55 class MockProducer : public Producer {
     56  public:
     57   ~MockProducer() override {}
     58 
     59   // Producer implementation.
     60   MOCK_METHOD0(OnConnect, void());
     61   MOCK_METHOD0(OnDisconnect, void());
     62   MOCK_METHOD2(SetupDataSource,
     63                void(DataSourceInstanceID, const DataSourceConfig&));
     64   MOCK_METHOD2(StartDataSource,
     65                void(DataSourceInstanceID, const DataSourceConfig&));
     66   MOCK_METHOD1(StopDataSource, void(DataSourceInstanceID));
     67   MOCK_METHOD0(uid, uid_t());
     68   MOCK_METHOD0(OnTracingSetup, void());
     69   MOCK_METHOD3(Flush,
     70                void(FlushRequestID, const DataSourceInstanceID*, size_t));
     71   MOCK_METHOD2(ClearIncrementalState,
     72                void(const DataSourceInstanceID*, size_t));
     73 };
     74 
     75 class MockConsumer : public Consumer {
     76  public:
     77   ~MockConsumer() override {}
     78 
     79   // Producer implementation.
     80   MOCK_METHOD0(OnConnect, void());
     81   MOCK_METHOD0(OnDisconnect, void());
     82   MOCK_METHOD0(OnTracingDisabled, void());
     83   MOCK_METHOD2(OnTracePackets, void(std::vector<TracePacket>*, bool));
     84   MOCK_METHOD1(OnDetach, void(bool));
     85   MOCK_METHOD2(OnAttach, void(bool, const TraceConfig&));
     86   MOCK_METHOD2(OnTraceStats, void(bool, const TraceStats&));
     87   MOCK_METHOD1(OnObservableEvents, void(const ObservableEvents&));
     88 
     89   // Workaround, gmock doesn't support yet move-only types, passing a pointer.
     90   void OnTraceData(std::vector<TracePacket> packets, bool has_more) {
     91     OnTracePackets(&packets, has_more);
     92   }
     93 };
     94 
     95 void CheckTraceStats(const protos::TracePacket& packet) {
     96   EXPECT_TRUE(packet.has_trace_stats());
     97   EXPECT_GE(packet.trace_stats().producers_seen(), 1u);
     98   EXPECT_EQ(1u, packet.trace_stats().producers_connected());
     99   EXPECT_EQ(1u, packet.trace_stats().data_sources_registered());
    100   EXPECT_EQ(1u, packet.trace_stats().tracing_sessions());
    101   EXPECT_EQ(1u, packet.trace_stats().total_buffers());
    102   EXPECT_EQ(1, packet.trace_stats().buffer_stats_size());
    103 
    104   const auto& buf_stats = packet.trace_stats().buffer_stats(0);
    105   EXPECT_GT(buf_stats.bytes_written(), 0u);
    106   EXPECT_GT(buf_stats.chunks_written(), 0u);
    107   EXPECT_EQ(0u, buf_stats.chunks_overwritten());
    108   EXPECT_EQ(0u, buf_stats.chunks_rewritten());
    109   EXPECT_EQ(0u, buf_stats.chunks_committed_out_of_order());
    110   EXPECT_EQ(0u, buf_stats.write_wrap_count());
    111   EXPECT_EQ(0u, buf_stats.patches_failed());
    112   EXPECT_EQ(0u, buf_stats.readaheads_failed());
    113   EXPECT_EQ(0u, buf_stats.abi_violations());
    114 }
    115 
    116 }  // namespace
    117 
    118 class TracingIntegrationTest : public ::testing::Test {
    119  public:
    120   void SetUp() override {
    121     DESTROY_TEST_SOCK(kProducerSockName);
    122     DESTROY_TEST_SOCK(kConsumerSockName);
    123     task_runner_.reset(new base::TestTaskRunner());
    124 
    125     // Create the service host.
    126     svc_ = ServiceIPCHost::CreateInstance(task_runner_.get());
    127     svc_->Start(kProducerSockName, kConsumerSockName);
    128 
    129     // Create and connect a Producer.
    130     producer_endpoint_ = ProducerIPCClient::Connect(
    131         kProducerSockName, &producer_, "perfetto.mock_producer",
    132         task_runner_.get(), GetProducerSMBScrapingMode());
    133     auto on_producer_connect =
    134         task_runner_->CreateCheckpoint("on_producer_connect");
    135     EXPECT_CALL(producer_, OnConnect()).WillOnce(Invoke(on_producer_connect));
    136     task_runner_->RunUntilCheckpoint("on_producer_connect");
    137 
    138     // Register a data source.
    139     DataSourceDescriptor ds_desc;
    140     ds_desc.set_name("perfetto.test");
    141     producer_endpoint_->RegisterDataSource(ds_desc);
    142 
    143     // Create and connect a Consumer.
    144     consumer_endpoint_ = ConsumerIPCClient::Connect(
    145         kConsumerSockName, &consumer_, task_runner_.get());
    146     auto on_consumer_connect =
    147         task_runner_->CreateCheckpoint("on_consumer_connect");
    148     EXPECT_CALL(consumer_, OnConnect()).WillOnce(Invoke(on_consumer_connect));
    149     task_runner_->RunUntilCheckpoint("on_consumer_connect");
    150 
    151     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
    152     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
    153   }
    154 
    155   void TearDown() override {
    156     // Destroy the service and check that both Producer and Consumer see an
    157     // OnDisconnect() call.
    158 
    159     auto on_producer_disconnect =
    160         task_runner_->CreateCheckpoint("on_producer_disconnect");
    161     EXPECT_CALL(producer_, OnDisconnect())
    162         .WillOnce(Invoke(on_producer_disconnect));
    163 
    164     auto on_consumer_disconnect =
    165         task_runner_->CreateCheckpoint("on_consumer_disconnect");
    166     EXPECT_CALL(consumer_, OnDisconnect())
    167         .WillOnce(Invoke(on_consumer_disconnect));
    168 
    169     svc_.reset();
    170     task_runner_->RunUntilCheckpoint("on_producer_disconnect");
    171     task_runner_->RunUntilCheckpoint("on_consumer_disconnect");
    172 
    173     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&producer_));
    174     ASSERT_TRUE(testing::Mock::VerifyAndClearExpectations(&consumer_));
    175 
    176     task_runner_.reset();
    177     DESTROY_TEST_SOCK(kProducerSockName);
    178     DESTROY_TEST_SOCK(kConsumerSockName);
    179   }
    180 
    181   virtual TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode() {
    182     return TracingService::ProducerSMBScrapingMode::kDefault;
    183   }
    184 
    185   void WaitForTraceWritersChanged(ProducerID producer_id) {
    186     static int i = 0;
    187     auto checkpoint_name = "writers_changed_" + std::to_string(producer_id) +
    188                            "_" + std::to_string(i++);
    189     auto writers_changed = task_runner_->CreateCheckpoint(checkpoint_name);
    190     auto writers = GetWriters(producer_id);
    191     std::function<void()> task;
    192     task = [&task, writers, writers_changed, producer_id, this]() {
    193       if (writers != GetWriters(producer_id)) {
    194         writers_changed();
    195         return;
    196       }
    197       task_runner_->PostDelayedTask(task, 1);
    198     };
    199     task_runner_->PostDelayedTask(task, 1);
    200     task_runner_->RunUntilCheckpoint(checkpoint_name);
    201   }
    202 
    203   const std::map<WriterID, BufferID>& GetWriters(ProducerID producer_id) {
    204     return reinterpret_cast<TracingServiceImpl*>(svc_->service())
    205         ->GetProducer(producer_id)
    206         ->writers_;
    207   }
    208 
    209   ProducerID* last_producer_id() {
    210     return &reinterpret_cast<TracingServiceImpl*>(svc_->service())
    211                 ->last_producer_id_;
    212   }
    213 
    214   std::unique_ptr<base::TestTaskRunner> task_runner_;
    215   std::unique_ptr<ServiceIPCHost> svc_;
    216   std::unique_ptr<TracingService::ProducerEndpoint> producer_endpoint_;
    217   MockProducer producer_;
    218   std::unique_ptr<TracingService::ConsumerEndpoint> consumer_endpoint_;
    219   MockConsumer consumer_;
    220 };
    221 
    222 TEST_F(TracingIntegrationTest, WithIPCTransport) {
    223   // Start tracing.
    224   TraceConfig trace_config;
    225   trace_config.add_buffers()->set_size_kb(4096 * 10);
    226   auto* ds_config = trace_config.add_data_sources()->mutable_config();
    227   ds_config->set_name("perfetto.test");
    228   ds_config->set_target_buffer(0);
    229   consumer_endpoint_->EnableTracing(trace_config);
    230 
    231   // At this point, the Producer should be asked to turn its data source on.
    232   DataSourceInstanceID ds_iid = 0;
    233 
    234   BufferID global_buf_id = 0;
    235   auto on_create_ds_instance =
    236       task_runner_->CreateCheckpoint("on_create_ds_instance");
    237   EXPECT_CALL(producer_, OnTracingSetup());
    238 
    239   // Store the arguments passed to SetupDataSource() and later check that they
    240   // match the ones passed to StartDataSource().
    241   DataSourceInstanceID setup_id;
    242   perfetto::protos::DataSourceConfig setup_cfg_proto;
    243   EXPECT_CALL(producer_, SetupDataSource(_, _))
    244       .WillOnce(
    245           Invoke([&setup_id, &setup_cfg_proto](DataSourceInstanceID id,
    246                                                const DataSourceConfig& cfg) {
    247 
    248             setup_id = id;
    249             cfg.ToProto(&setup_cfg_proto);
    250           }));
    251   EXPECT_CALL(producer_, StartDataSource(_, _))
    252       .WillOnce(
    253           Invoke([on_create_ds_instance, &ds_iid, &global_buf_id, &setup_id,
    254                   &setup_cfg_proto](DataSourceInstanceID id,
    255                                     const DataSourceConfig& cfg) {
    256             // id and config should match the ones passed to SetupDataSource.
    257             ASSERT_EQ(id, setup_id);
    258             perfetto::protos::DataSourceConfig cfg_proto;
    259             cfg.ToProto(&cfg_proto);
    260             ASSERT_EQ(cfg_proto.SerializeAsString(),
    261                       setup_cfg_proto.SerializeAsString());
    262 
    263             ASSERT_NE(0u, id);
    264             ds_iid = id;
    265             ASSERT_EQ("perfetto.test", cfg.name());
    266             global_buf_id = static_cast<BufferID>(cfg.target_buffer());
    267             ASSERT_NE(0u, global_buf_id);
    268             ASSERT_LE(global_buf_id, std::numeric_limits<BufferID>::max());
    269             on_create_ds_instance();
    270           }));
    271   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
    272 
    273   // Now let the data source fill some pages within the same task.
    274   // Doing so should accumulate a bunch of chunks that will be notified by the
    275   // a future task in one batch.
    276   std::unique_ptr<TraceWriter> writer =
    277       producer_endpoint_->CreateTraceWriter(global_buf_id);
    278   ASSERT_TRUE(writer);
    279 
    280   const size_t kNumPackets = 10;
    281   for (size_t i = 0; i < kNumPackets; i++) {
    282     char buf[16];
    283     sprintf(buf, "evt_%zu", i);
    284     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
    285   }
    286 
    287   // Allow the service to see the CommitData() before reading back.
    288   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
    289   writer->Flush(on_data_committed);
    290   task_runner_->RunUntilCheckpoint("on_data_committed");
    291 
    292   // Read the log buffer.
    293   consumer_endpoint_->ReadBuffers();
    294   size_t num_pack_rx = 0;
    295   bool saw_clock_snapshot = false;
    296   bool saw_trace_config = false;
    297   bool saw_trace_stats = false;
    298   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
    299   EXPECT_CALL(consumer_, OnTracePackets(_, _))
    300       .WillRepeatedly(
    301           Invoke([&num_pack_rx, all_packets_rx, &trace_config,
    302                   &saw_clock_snapshot, &saw_trace_config, &saw_trace_stats](
    303                      std::vector<TracePacket>* packets, bool has_more) {
    304 #if PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
    305             const int kExpectedMinNumberOfClocks = 1;
    306 #else
    307             const int kExpectedMinNumberOfClocks = 6;
    308 #endif
    309 
    310             for (auto& encoded_packet : *packets) {
    311               protos::TracePacket packet;
    312               ASSERT_TRUE(encoded_packet.Decode(&packet));
    313               if (packet.has_for_testing()) {
    314                 char buf[8];
    315                 sprintf(buf, "evt_%zu", num_pack_rx++);
    316                 EXPECT_EQ(std::string(buf), packet.for_testing().str());
    317               } else if (packet.has_clock_snapshot()) {
    318                 EXPECT_GE(packet.clock_snapshot().clocks_size(),
    319                           kExpectedMinNumberOfClocks);
    320                 saw_clock_snapshot = true;
    321               } else if (packet.has_trace_config()) {
    322                 protos::TraceConfig config_proto;
    323                 trace_config.ToProto(&config_proto);
    324                 Slice expected_slice = Slice::Allocate(
    325                     static_cast<size_t>(config_proto.ByteSize()));
    326                 config_proto.SerializeWithCachedSizesToArray(
    327                     expected_slice.own_data());
    328                 Slice actual_slice = Slice::Allocate(
    329                     static_cast<size_t>(packet.trace_config().ByteSize()));
    330                 packet.trace_config().SerializeWithCachedSizesToArray(
    331                     actual_slice.own_data());
    332                 EXPECT_EQ(std::string(reinterpret_cast<const char*>(
    333                                           expected_slice.own_data()),
    334                                       expected_slice.size),
    335                           std::string(reinterpret_cast<const char*>(
    336                                           actual_slice.own_data()),
    337                                       actual_slice.size));
    338                 saw_trace_config = true;
    339               } else if (packet.has_trace_stats()) {
    340                 saw_trace_stats = true;
    341                 CheckTraceStats(packet);
    342               }
    343             }
    344             if (!has_more)
    345               all_packets_rx();
    346           }));
    347   task_runner_->RunUntilCheckpoint("all_packets_rx");
    348   ASSERT_EQ(kNumPackets, num_pack_rx);
    349   EXPECT_TRUE(saw_clock_snapshot);
    350   EXPECT_TRUE(saw_trace_config);
    351   EXPECT_TRUE(saw_trace_stats);
    352 
    353   // Disable tracing.
    354   consumer_endpoint_->DisableTracing();
    355 
    356   auto on_tracing_disabled =
    357       task_runner_->CreateCheckpoint("on_tracing_disabled");
    358   EXPECT_CALL(producer_, StopDataSource(_));
    359   EXPECT_CALL(consumer_, OnTracingDisabled())
    360       .WillOnce(Invoke(on_tracing_disabled));
    361   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
    362 }
    363 
    364 TEST_F(TracingIntegrationTest, WriteIntoFile) {
    365   // Start tracing.
    366   TraceConfig trace_config;
    367   trace_config.add_buffers()->set_size_kb(4096 * 10);
    368   auto* ds_config = trace_config.add_data_sources()->mutable_config();
    369   ds_config->set_name("perfetto.test");
    370   ds_config->set_target_buffer(0);
    371   trace_config.set_write_into_file(true);
    372   base::TempFile tmp_file = base::TempFile::CreateUnlinked();
    373   consumer_endpoint_->EnableTracing(trace_config,
    374                                     base::ScopedFile(dup(tmp_file.fd())));
    375 
    376   // At this point, the producer_ should be asked to turn its data source on.
    377   BufferID global_buf_id = 0;
    378   auto on_create_ds_instance =
    379       task_runner_->CreateCheckpoint("on_create_ds_instance");
    380   EXPECT_CALL(producer_, OnTracingSetup());
    381   EXPECT_CALL(producer_, SetupDataSource(_, _));
    382   EXPECT_CALL(producer_, StartDataSource(_, _))
    383       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
    384                            DataSourceInstanceID, const DataSourceConfig& cfg) {
    385         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
    386         on_create_ds_instance();
    387       }));
    388   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
    389 
    390   std::unique_ptr<TraceWriter> writer =
    391       producer_endpoint_->CreateTraceWriter(global_buf_id);
    392   ASSERT_TRUE(writer);
    393 
    394   const size_t kNumPackets = 10;
    395   for (size_t i = 0; i < kNumPackets; i++) {
    396     char buf[16];
    397     sprintf(buf, "evt_%zu", i);
    398     writer->NewTracePacket()->set_for_testing()->set_str(buf, strlen(buf));
    399   }
    400   auto on_data_committed = task_runner_->CreateCheckpoint("on_data_committed");
    401   writer->Flush(on_data_committed);
    402   task_runner_->RunUntilCheckpoint("on_data_committed");
    403 
    404   // Will disable tracing and will force the buffers to be written into the
    405   // file before destroying them.
    406   consumer_endpoint_->FreeBuffers();
    407 
    408   auto on_tracing_disabled =
    409       task_runner_->CreateCheckpoint("on_tracing_disabled");
    410   EXPECT_CALL(producer_, StopDataSource(_));
    411   EXPECT_CALL(consumer_, OnTracingDisabled())
    412       .WillOnce(Invoke(on_tracing_disabled));
    413   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
    414 
    415   // Check that |tmp_file| contains a valid trace.proto message.
    416   ASSERT_EQ(0, lseek(tmp_file.fd(), 0, SEEK_SET));
    417   char tmp_buf[1024];
    418   ssize_t rsize = read(tmp_file.fd(), tmp_buf, sizeof(tmp_buf));
    419   ASSERT_GT(rsize, 0);
    420   protos::Trace tmp_trace;
    421   ASSERT_TRUE(tmp_trace.ParseFromArray(tmp_buf, static_cast<int>(rsize)));
    422   size_t num_test_packet = 0;
    423   size_t num_clock_snapshot_packet = 0;
    424   size_t num_system_info_packet = 0;
    425   bool saw_trace_stats = false;
    426   for (int i = 0; i < tmp_trace.packet_size(); i++) {
    427     const protos::TracePacket& packet = tmp_trace.packet(i);
    428     if (packet.has_for_testing()) {
    429       ASSERT_EQ("evt_" + std::to_string(num_test_packet++),
    430                 packet.for_testing().str());
    431     } else if (packet.has_trace_stats()) {
    432       saw_trace_stats = true;
    433       CheckTraceStats(packet);
    434     } else if (packet.has_clock_snapshot()) {
    435       num_clock_snapshot_packet++;
    436     } else if (packet.has_system_info()) {
    437       num_system_info_packet++;
    438     }
    439   }
    440   ASSERT_TRUE(saw_trace_stats);
    441   ASSERT_GT(num_clock_snapshot_packet, 0u);
    442   ASSERT_GT(num_system_info_packet, 0u);
    443 }
    444 
    445 class TracingIntegrationTestWithSMBScrapingProducer
    446     : public TracingIntegrationTest {
    447  public:
    448   TracingService::ProducerSMBScrapingMode GetProducerSMBScrapingMode()
    449       override {
    450     return TracingService::ProducerSMBScrapingMode::kEnabled;
    451   }
    452 };
    453 
    454 TEST_F(TracingIntegrationTestWithSMBScrapingProducer, ScrapeOnFlush) {
    455   // Start tracing.
    456   TraceConfig trace_config;
    457   trace_config.add_buffers()->set_size_kb(4096 * 10);
    458   auto* ds_config = trace_config.add_data_sources()->mutable_config();
    459   ds_config->set_name("perfetto.test");
    460   ds_config->set_target_buffer(0);
    461   consumer_endpoint_->EnableTracing(trace_config);
    462 
    463   // At this point, the Producer should be asked to turn its data source on.
    464 
    465   BufferID global_buf_id = 0;
    466   auto on_create_ds_instance =
    467       task_runner_->CreateCheckpoint("on_create_ds_instance");
    468   EXPECT_CALL(producer_, OnTracingSetup());
    469 
    470   EXPECT_CALL(producer_, SetupDataSource(_, _));
    471   EXPECT_CALL(producer_, StartDataSource(_, _))
    472       .WillOnce(Invoke([on_create_ds_instance, &global_buf_id](
    473                            DataSourceInstanceID, const DataSourceConfig& cfg) {
    474         global_buf_id = static_cast<BufferID>(cfg.target_buffer());
    475         on_create_ds_instance();
    476       }));
    477   task_runner_->RunUntilCheckpoint("on_create_ds_instance");
    478 
    479   // Create writer, which will post a task to register the writer with the
    480   // service.
    481   std::unique_ptr<TraceWriter> writer =
    482       producer_endpoint_->CreateTraceWriter(global_buf_id);
    483   ASSERT_TRUE(writer);
    484 
    485   // Wait for the writer to be registered.
    486   WaitForTraceWritersChanged(*last_producer_id());
    487 
    488   // Write a few trace packets.
    489   writer->NewTracePacket()->set_for_testing()->set_str("payload1");
    490   writer->NewTracePacket()->set_for_testing()->set_str("payload2");
    491   writer->NewTracePacket()->set_for_testing()->set_str("payload3");
    492 
    493   // Ask the service to flush, but don't flush our trace writer. This should
    494   // cause our uncommitted SMB chunk to be scraped.
    495   auto on_flush_complete = task_runner_->CreateCheckpoint("on_flush_complete");
    496   consumer_endpoint_->Flush(5000, [on_flush_complete](bool success) {
    497     EXPECT_TRUE(success);
    498     on_flush_complete();
    499   });
    500   EXPECT_CALL(producer_, Flush(_, _, _))
    501       .WillOnce(Invoke([this](FlushRequestID flush_req_id,
    502                               const DataSourceInstanceID*, size_t) {
    503         producer_endpoint_->NotifyFlushComplete(flush_req_id);
    504       }));
    505   task_runner_->RunUntilCheckpoint("on_flush_complete");
    506 
    507   // Read the log buffer. We should only see the first two written trace
    508   // packets, because the service can't be sure the last one was written
    509   // completely by the trace writer.
    510   consumer_endpoint_->ReadBuffers();
    511 
    512   size_t num_test_pack_rx = 0;
    513   auto all_packets_rx = task_runner_->CreateCheckpoint("all_packets_rx");
    514   EXPECT_CALL(consumer_, OnTracePackets(_, _))
    515       .WillRepeatedly(
    516           Invoke([&num_test_pack_rx, all_packets_rx](
    517                      std::vector<TracePacket>* packets, bool has_more) {
    518             for (auto& encoded_packet : *packets) {
    519               protos::TracePacket packet;
    520               ASSERT_TRUE(encoded_packet.Decode(&packet));
    521               if (packet.has_for_testing()) {
    522                 num_test_pack_rx++;
    523               }
    524             }
    525             if (!has_more)
    526               all_packets_rx();
    527           }));
    528   task_runner_->RunUntilCheckpoint("all_packets_rx");
    529   ASSERT_EQ(2, num_test_pack_rx);
    530 
    531   // Disable tracing.
    532   consumer_endpoint_->DisableTracing();
    533 
    534   auto on_tracing_disabled =
    535       task_runner_->CreateCheckpoint("on_tracing_disabled");
    536   EXPECT_CALL(producer_, StopDataSource(_));
    537   EXPECT_CALL(consumer_, OnTracingDisabled())
    538       .WillOnce(Invoke(on_tracing_disabled));
    539   task_runner_->RunUntilCheckpoint("on_tracing_disabled");
    540 }
    541 
    542 // TODO(primiano): add tests to cover:
    543 // - unknown fields preserved end-to-end.
    544 // - >1 data source.
    545 // - >1 data consumer sharing the same data source, with different TraceBuffers.
    546 // - >1 consumer with > 1 buffer each.
    547 // - Consumer disconnecting in the middle of a ReadBuffers() call.
    548 // - Multiple calls to DisableTracing.
    549 // - Out of order Enable/Disable/FreeBuffers calls.
    550 // - DisableTracing does actually freeze the buffers.
    551 
    552 }  // namespace perfetto
    553