Home | History | Annotate | Download | only in core
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "src/tracing/core/tracing_service_impl.h"
     18 
     19 #include "perfetto/base/build_config.h"
     20 
     21 #include <errno.h>
     22 #include <inttypes.h>
     23 #include <limits.h>
     24 #include <string.h>
     25 #include <regex>
     26 #include <unordered_set>
     27 
     28 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
     29 #include <sys/uio.h>
     30 #include <sys/utsname.h>
     31 #include <unistd.h>
     32 #endif
     33 
     34 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
     35 #include <sys/system_properties.h>
     36 #endif
     37 
     38 #include <algorithm>
     39 
     40 #include "perfetto/base/build_config.h"
     41 #include "perfetto/base/file_utils.h"
     42 #include "perfetto/base/task_runner.h"
     43 #include "perfetto/base/utils.h"
     44 #include "perfetto/tracing/core/consumer.h"
     45 #include "perfetto/tracing/core/data_source_config.h"
     46 #include "perfetto/tracing/core/producer.h"
     47 #include "perfetto/tracing/core/shared_memory.h"
     48 #include "perfetto/tracing/core/shared_memory_abi.h"
     49 #include "perfetto/tracing/core/trace_packet.h"
     50 #include "perfetto/tracing/core/trace_writer.h"
     51 #include "src/tracing/core/packet_stream_validator.h"
     52 #include "src/tracing/core/shared_memory_arbiter_impl.h"
     53 #include "src/tracing/core/trace_buffer.h"
     54 
     55 #include "perfetto/trace/clock_snapshot.pb.h"
     56 #include "perfetto/trace/system_info.pb.h"
     57 #include "perfetto/trace/trusted_packet.pb.h"
     58 
     59 // General note: this class must assume that Producers are malicious and will
     60 // try to crash / exploit this class. We can trust pointers because they come
     61 // from the IPC layer, but we should never assume that that the producer calls
     62 // come in the right order or their arguments are sane / within bounds.
     63 
     64 namespace perfetto {
     65 
     66 namespace {
     67 constexpr size_t kDefaultShmPageSize = base::kPageSize;
     68 constexpr int kMaxBuffersPerConsumer = 128;
     69 constexpr base::TimeMillis kSnapshotsInterval(10 * 1000);
     70 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
     71 constexpr int kMaxConcurrentTracingSessions = 5;
     72 
     73 constexpr uint32_t kMillisPerHour = 3600000;
     74 constexpr uint32_t kMaxTracingDurationMillis = 7 * 24 * kMillisPerHour;
     75 
     76 // These apply only if enable_extra_guardrails is true.
     77 constexpr uint32_t kGuardrailsMaxTracingBufferSizeKb = 32 * 1024;
     78 constexpr uint32_t kGuardrailsMaxTracingDurationMillis = 24 * kMillisPerHour;
     79 
     80 #if PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
     81 struct iovec {
     82   void* iov_base;  // Address
     83   size_t iov_len;  // Block size
     84 };
     85 
     86 // Simple implementation of writev. Note that this does not give the atomicity
     87 // guarantees of a real writev, but we don't depend on these (we aren't writing
     88 // to the same file from another thread).
     89 ssize_t writev(int fd, const struct iovec* iov, int iovcnt) {
     90   ssize_t total_size = 0;
     91   for (int i = 0; i < iovcnt; ++i) {
     92     ssize_t current_size = base::WriteAll(fd, iov[i].iov_base, iov[i].iov_len);
     93     if (current_size != static_cast<ssize_t>(iov[i].iov_len))
     94       return -1;
     95     total_size += current_size;
     96   }
     97   return total_size;
     98 }
     99 
    100 #define IOV_MAX 1024  // Linux compatible limit.
    101 
    102 // uid checking is a NOP on Windows.
    103 uid_t getuid() {
    104   return 0;
    105 }
    106 uid_t geteuid() {
    107   return 0;
    108 }
    109 #endif  // PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
    110 
    111 }  // namespace
    112 
    113 // These constants instead are defined in the header because are used by tests.
    114 constexpr size_t TracingServiceImpl::kDefaultShmSize;
    115 constexpr size_t TracingServiceImpl::kMaxShmSize;
    116 constexpr uint32_t TracingServiceImpl::kDataSourceStopTimeoutMs;
    117 constexpr uint8_t TracingServiceImpl::kSyncMarker[];
    118 
    119 // static
    120 std::unique_ptr<TracingService> TracingService::CreateInstance(
    121     std::unique_ptr<SharedMemory::Factory> shm_factory,
    122     base::TaskRunner* task_runner) {
    123   return std::unique_ptr<TracingService>(
    124       new TracingServiceImpl(std::move(shm_factory), task_runner));
    125 }
    126 
    127 TracingServiceImpl::TracingServiceImpl(
    128     std::unique_ptr<SharedMemory::Factory> shm_factory,
    129     base::TaskRunner* task_runner)
    130     : task_runner_(task_runner),
    131       shm_factory_(std::move(shm_factory)),
    132       uid_(getuid()),
    133       buffer_ids_(kMaxTraceBufferID),
    134       weak_ptr_factory_(this) {
    135   PERFETTO_DCHECK(task_runner_);
    136 }
    137 
    138 TracingServiceImpl::~TracingServiceImpl() {
    139   // TODO(fmayer): handle teardown of all Producer.
    140 }
    141 
    142 std::unique_ptr<TracingService::ProducerEndpoint>
    143 TracingServiceImpl::ConnectProducer(Producer* producer,
    144                                     uid_t uid,
    145                                     const std::string& producer_name,
    146                                     size_t shared_memory_size_hint_bytes,
    147                                     bool in_process,
    148                                     ProducerSMBScrapingMode smb_scraping_mode) {
    149   PERFETTO_DCHECK_THREAD(thread_checker_);
    150 
    151   if (lockdown_mode_ && uid != geteuid()) {
    152     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
    153                   static_cast<unsigned long>(uid));
    154     return nullptr;
    155   }
    156 
    157   if (producers_.size() >= kMaxProducerID) {
    158     PERFETTO_DFATAL("Too many producers.");
    159     return nullptr;
    160   }
    161   const ProducerID id = GetNextProducerID();
    162   PERFETTO_DLOG("Producer %" PRIu16 " connected", id);
    163 
    164   bool smb_scraping_enabled = smb_scraping_enabled_;
    165   switch (smb_scraping_mode) {
    166     case ProducerSMBScrapingMode::kDefault:
    167       break;
    168     case ProducerSMBScrapingMode::kEnabled:
    169       smb_scraping_enabled = true;
    170       break;
    171     case ProducerSMBScrapingMode::kDisabled:
    172       smb_scraping_enabled = false;
    173       break;
    174   }
    175 
    176   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
    177       id, uid, this, task_runner_, producer, producer_name, in_process,
    178       smb_scraping_enabled));
    179   auto it_and_inserted = producers_.emplace(id, endpoint.get());
    180   PERFETTO_DCHECK(it_and_inserted.second);
    181   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
    182   task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
    183 
    184   return std::move(endpoint);
    185 }
    186 
    187 void TracingServiceImpl::DisconnectProducer(ProducerID id) {
    188   PERFETTO_DCHECK_THREAD(thread_checker_);
    189   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
    190   PERFETTO_DCHECK(producers_.count(id));
    191 
    192   // Scrape remaining chunks for this producer to ensure we don't lose data.
    193   if (auto* producer = GetProducer(id)) {
    194     for (auto& session_id_and_session : tracing_sessions_)
    195       ScrapeSharedMemoryBuffers(&session_id_and_session.second, producer);
    196   }
    197 
    198   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
    199     auto next = it;
    200     next++;
    201     if (it->second.producer_id == id)
    202       UnregisterDataSource(id, it->second.descriptor.name());
    203     it = next;
    204   }
    205 
    206   producers_.erase(id);
    207   UpdateMemoryGuardrail();
    208 }
    209 
    210 TracingServiceImpl::ProducerEndpointImpl* TracingServiceImpl::GetProducer(
    211     ProducerID id) const {
    212   PERFETTO_DCHECK_THREAD(thread_checker_);
    213   auto it = producers_.find(id);
    214   if (it == producers_.end())
    215     return nullptr;
    216   return it->second;
    217 }
    218 
    219 std::unique_ptr<TracingService::ConsumerEndpoint>
    220 TracingServiceImpl::ConnectConsumer(Consumer* consumer, uid_t uid) {
    221   PERFETTO_DCHECK_THREAD(thread_checker_);
    222   PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
    223   std::unique_ptr<ConsumerEndpointImpl> endpoint(
    224       new ConsumerEndpointImpl(this, task_runner_, consumer, uid));
    225   auto it_and_inserted = consumers_.emplace(endpoint.get());
    226   PERFETTO_DCHECK(it_and_inserted.second);
    227   task_runner_->PostTask(std::bind(&Consumer::OnConnect, endpoint->consumer_));
    228   return std::move(endpoint);
    229 }
    230 
    231 void TracingServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
    232   PERFETTO_DCHECK_THREAD(thread_checker_);
    233   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
    234   PERFETTO_DCHECK(consumers_.count(consumer));
    235 
    236   // TODO(primiano) : Check that this is safe (what happens if there are
    237   // ReadBuffers() calls posted in the meantime? They need to become noop).
    238   if (consumer->tracing_session_id_)
    239     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
    240   consumers_.erase(consumer);
    241 
    242   // At this point no more pointers to |consumer| should be around.
    243   PERFETTO_DCHECK(!std::any_of(
    244       tracing_sessions_.begin(), tracing_sessions_.end(),
    245       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
    246         return kv.second.consumer_maybe_null == consumer;
    247       }));
    248 }
    249 
    250 bool TracingServiceImpl::DetachConsumer(ConsumerEndpointImpl* consumer,
    251                                         const std::string& key) {
    252   PERFETTO_DCHECK_THREAD(thread_checker_);
    253   PERFETTO_DLOG("Consumer %p detached", reinterpret_cast<void*>(consumer));
    254   PERFETTO_DCHECK(consumers_.count(consumer));
    255 
    256   TracingSessionID tsid = consumer->tracing_session_id_;
    257   TracingSession* tracing_session;
    258   if (!tsid || !(tracing_session = GetTracingSession(tsid)))
    259     return false;
    260 
    261   if (GetDetachedSession(consumer->uid_, key)) {
    262     PERFETTO_ELOG("Another session has been detached with the same key \"%s\"",
    263                   key.c_str());
    264     return false;
    265   }
    266 
    267   PERFETTO_DCHECK(tracing_session->consumer_maybe_null == consumer);
    268   tracing_session->consumer_maybe_null = nullptr;
    269   tracing_session->detach_key = key;
    270   consumer->tracing_session_id_ = 0;
    271   return true;
    272 }
    273 
    274 bool TracingServiceImpl::AttachConsumer(ConsumerEndpointImpl* consumer,
    275                                         const std::string& key) {
    276   PERFETTO_DCHECK_THREAD(thread_checker_);
    277   PERFETTO_DLOG("Consumer %p attaching to session %s",
    278                 reinterpret_cast<void*>(consumer), key.c_str());
    279   PERFETTO_DCHECK(consumers_.count(consumer));
    280 
    281   if (consumer->tracing_session_id_) {
    282     PERFETTO_ELOG(
    283         "Cannot reattach consumer to session %s"
    284         " while it already attached tracing session ID %" PRIu64,
    285         key.c_str(), consumer->tracing_session_id_);
    286     return false;
    287   }
    288 
    289   auto* tracing_session = GetDetachedSession(consumer->uid_, key);
    290   if (!tracing_session) {
    291     PERFETTO_ELOG(
    292         "Failed to attach consumer, session '%s' not found for uid %d",
    293         key.c_str(), static_cast<int>(consumer->uid_));
    294     return false;
    295   }
    296 
    297   consumer->tracing_session_id_ = tracing_session->id;
    298   tracing_session->consumer_maybe_null = consumer;
    299   tracing_session->detach_key.clear();
    300   return true;
    301 }
    302 
    303 bool TracingServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
    304                                        const TraceConfig& cfg,
    305                                        base::ScopedFile fd) {
    306   PERFETTO_DCHECK_THREAD(thread_checker_);
    307   PERFETTO_DLOG("Enabling tracing for consumer %p",
    308                 reinterpret_cast<void*>(consumer));
    309   if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_SET)
    310     lockdown_mode_ = true;
    311   if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR)
    312     lockdown_mode_ = false;
    313   TracingSession* tracing_session =
    314       GetTracingSession(consumer->tracing_session_id_);
    315   if (tracing_session) {
    316     PERFETTO_DLOG(
    317         "A Consumer is trying to EnableTracing() but another tracing session "
    318         "is already active (forgot a call to FreeBuffers() ?)");
    319     return false;
    320   }
    321 
    322   const uint32_t max_duration_ms = cfg.enable_extra_guardrails()
    323                                        ? kGuardrailsMaxTracingDurationMillis
    324                                        : kMaxTracingDurationMillis;
    325   if (cfg.duration_ms() > max_duration_ms) {
    326     PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms  > %" PRIu32 " ms)",
    327                   cfg.duration_ms(), max_duration_ms);
    328     return false;
    329   }
    330 
    331   const bool has_trigger_config = cfg.trigger_config().trigger_mode() !=
    332                                   TraceConfig::TriggerConfig::UNSPECIFIED;
    333   if (has_trigger_config && (cfg.trigger_config().trigger_timeout_ms() == 0 ||
    334                              cfg.trigger_config().trigger_timeout_ms() >
    335                                  kGuardrailsMaxTracingDurationMillis)) {
    336     PERFETTO_ELOG(
    337         "Traces with START_TRACING triggers must provide a positive "
    338         "trigger_timeout_ms < 7 days (received %" PRIu32 "ms)",
    339         cfg.trigger_config().trigger_timeout_ms());
    340     return false;
    341   }
    342 
    343   if (has_trigger_config && cfg.duration_ms() != 0) {
    344     PERFETTO_ELOG(
    345         "duration_ms was set, this must not be set for traces with triggers.");
    346     return false;
    347   }
    348 
    349   std::unordered_set<std::string> triggers;
    350   for (const auto& trigger : cfg.trigger_config().triggers()) {
    351     if (!triggers.insert(trigger.name()).second) {
    352       PERFETTO_ELOG("Duplicate trigger name: %s", trigger.name().c_str());
    353       return false;
    354     }
    355   }
    356 
    357   if (cfg.enable_extra_guardrails()) {
    358     if (cfg.deferred_start()) {
    359       PERFETTO_ELOG(
    360           "deferred_start=true is not supported in unsupervised traces");
    361       return false;
    362     }
    363     uint64_t buf_size_sum = 0;
    364     for (const auto& buf : cfg.buffers())
    365       buf_size_sum += buf.size_kb();
    366     if (buf_size_sum > kGuardrailsMaxTracingBufferSizeKb) {
    367       PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
    368                     "kB  > %" PRIu32 " kB)",
    369                     buf_size_sum, kGuardrailsMaxTracingBufferSizeKb);
    370       return false;
    371     }
    372   }
    373 
    374   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
    375     PERFETTO_DLOG("Too many buffers configured (%d)", cfg.buffers_size());
    376     return false;
    377   }
    378 
    379   if (!cfg.unique_session_name().empty()) {
    380     const std::string& name = cfg.unique_session_name();
    381     for (auto& kv : tracing_sessions_) {
    382       if (kv.second.config.unique_session_name() == name) {
    383         PERFETTO_ELOG(
    384             "A trace wtih this unique session name (%s) already exists",
    385             name.c_str());
    386         return false;
    387       }
    388     }
    389   }
    390 
    391   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
    392   // in a state where it stalls by design by having more TraceWriterImpl
    393   // instances than free pages in the buffer. This is really a bug in
    394   // trace_probes and the way it handles stalls in the shmem buffer.
    395   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
    396     PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)",
    397                   tracing_sessions_.size());
    398     return false;
    399   }
    400 
    401   const TracingSessionID tsid = ++last_tracing_session_id_;
    402   tracing_session =
    403       &tracing_sessions_.emplace(tsid, TracingSession(tsid, consumer, cfg))
    404            .first->second;
    405 
    406   if (cfg.write_into_file()) {
    407     if (!fd) {
    408       PERFETTO_ELOG(
    409           "The TraceConfig had write_into_file==true but no fd was passed");
    410       tracing_sessions_.erase(tsid);
    411       return false;
    412     }
    413     tracing_session->write_into_file = std::move(fd);
    414     uint32_t write_period_ms = cfg.file_write_period_ms();
    415     if (write_period_ms == 0)
    416       write_period_ms = kDefaultWriteIntoFilePeriodMs;
    417     if (write_period_ms < min_write_period_ms_)
    418       write_period_ms = min_write_period_ms_;
    419     tracing_session->write_period_ms = write_period_ms;
    420     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
    421     tracing_session->bytes_written_into_file = 0;
    422   }
    423 
    424   // Initialize the log buffers.
    425   bool did_allocate_all_buffers = true;
    426 
    427   // Allocate the trace buffers. Also create a map to translate a consumer
    428   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
    429   // corresponding BufferID, which is a global ID namespace for the service and
    430   // all producers.
    431   size_t total_buf_size_kb = 0;
    432   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
    433   tracing_session->buffers_index.reserve(num_buffers);
    434   for (size_t i = 0; i < num_buffers; i++) {
    435     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
    436     BufferID global_id = buffer_ids_.Allocate();
    437     if (!global_id) {
    438       did_allocate_all_buffers = false;  // We ran out of IDs.
    439       break;
    440     }
    441     tracing_session->buffers_index.push_back(global_id);
    442     const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
    443     total_buf_size_kb += buffer_cfg.size_kb();
    444     TraceBuffer::OverwritePolicy policy =
    445         buffer_cfg.fill_policy() == TraceConfig::BufferConfig::DISCARD
    446             ? TraceBuffer::kDiscard
    447             : TraceBuffer::kOverwrite;
    448     auto it_and_inserted = buffers_.emplace(
    449         global_id, TraceBuffer::Create(buf_size_bytes, policy));
    450     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
    451     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
    452     if (!trace_buffer) {
    453       did_allocate_all_buffers = false;
    454       break;
    455     }
    456   }
    457 
    458   UpdateMemoryGuardrail();
    459 
    460   // This can happen if either:
    461   // - All the kMaxTraceBufferID slots are taken.
    462   // - OOM, or, more relistically, we exhausted virtual memory.
    463   // In any case, free all the previously allocated buffers and abort.
    464   // TODO(fmayer): add a test to cover this case, this is quite subtle.
    465   if (!did_allocate_all_buffers) {
    466     for (BufferID global_id : tracing_session->buffers_index) {
    467       buffer_ids_.Free(global_id);
    468       buffers_.erase(global_id);
    469     }
    470     tracing_sessions_.erase(tsid);
    471     return false;
    472   }
    473 
    474   consumer->tracing_session_id_ = tsid;
    475 
    476   // Setup the data sources on the producers without starting them.
    477   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
    478     // Scan all the registered data sources with a matching name.
    479     auto range = data_sources_.equal_range(cfg_data_source.config().name());
    480     for (auto it = range.first; it != range.second; it++) {
    481       TraceConfig::ProducerConfig producer_config;
    482       for (auto& config : cfg.producers()) {
    483         if (GetProducer(it->second.producer_id)->name_ ==
    484             config.producer_name()) {
    485           producer_config = config;
    486           break;
    487         }
    488       }
    489       SetupDataSource(cfg_data_source, producer_config, it->second,
    490                       tracing_session);
    491     }
    492   }
    493 
    494   bool has_start_trigger = false;
    495   auto weak_this = weak_ptr_factory_.GetWeakPtr();
    496   switch (cfg.trigger_config().trigger_mode()) {
    497     case TraceConfig::TriggerConfig::UNSPECIFIED:
    498       // no triggers are specified so this isn't a trace that is using triggers.
    499       PERFETTO_DCHECK(!has_trigger_config);
    500       break;
    501     case TraceConfig::TriggerConfig::START_TRACING:
    502       // For traces which use START_TRACE triggers we need to ensure that the
    503       // tracing session will be cleaned up when it times out.
    504       has_start_trigger = true;
    505       task_runner_->PostDelayedTask(
    506           [weak_this, tsid]() {
    507             if (weak_this)
    508               weak_this->OnStartTriggersTimeout(tsid);
    509           },
    510           cfg.trigger_config().trigger_timeout_ms());
    511       break;
    512     case TraceConfig::TriggerConfig::STOP_TRACING:
    513       // Update the tracing_session's duration_ms to ensure that if no trigger
    514       // is received the session will end and be cleaned up equal to the
    515       // timeout.
    516       //
    517       // TODO(nuskos): Refactor this so that rather then modifying the config we
    518       // have a field we look at on the tracing_session.
    519       tracing_session->config.set_duration_ms(
    520           cfg.trigger_config().trigger_timeout_ms());
    521       break;
    522   }
    523 
    524   tracing_session->state = TracingSession::CONFIGURED;
    525   PERFETTO_LOG(
    526       "Configured tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
    527       "buffer size:%zu KB, total sessions:%zu",
    528       cfg.data_sources().size(), tracing_session->config.duration_ms(),
    529       cfg.buffers_size(), total_buf_size_kb, tracing_sessions_.size());
    530 
    531   // Start the data sources, unless this is a case of early setup + fast
    532   // triggering, either through TraceConfig.deferred_start or
    533   // TraceConfig.trigger_config(). If both are specified which ever one occurs
    534   // first will initiate the trace.
    535   if (!cfg.deferred_start() && !has_start_trigger)
    536     return StartTracing(tsid);
    537 
    538   return true;
    539 }
    540 
    541 void TracingServiceImpl::ChangeTraceConfig(ConsumerEndpointImpl* consumer,
    542                                            const TraceConfig& updated_cfg) {
    543   PERFETTO_DCHECK_THREAD(thread_checker_);
    544   TracingSession* tracing_session =
    545       GetTracingSession(consumer->tracing_session_id_);
    546   PERFETTO_DCHECK(tracing_session);
    547 
    548   if ((tracing_session->state != TracingSession::STARTED) &&
    549       (tracing_session->state != TracingSession::CONFIGURED)) {
    550     PERFETTO_ELOG(
    551         "ChangeTraceConfig() was called for a tracing session which isn't "
    552         "running.");
    553     return;
    554   }
    555 
    556   // We only support updating producer_name_filter (and pass-through configs)
    557   // for now; null out any changeable fields and make sure the rest are
    558   // identical.
    559   TraceConfig new_config_copy(updated_cfg);
    560   for (auto& ds_cfg : *new_config_copy.mutable_data_sources()) {
    561     ds_cfg.clear_producer_name_filter();
    562   }
    563 
    564   TraceConfig current_config_copy(tracing_session->config);
    565   for (auto& ds_cfg : *current_config_copy.mutable_data_sources())
    566     ds_cfg.clear_producer_name_filter();
    567 
    568   if (new_config_copy != current_config_copy) {
    569     PERFETTO_LOG(
    570         "ChangeTraceConfig() was called with a config containing unsupported "
    571         "changes; only adding to the producer_name_filter is currently "
    572         "supported and will have an effect.");
    573   }
    574 
    575   for (TraceConfig::DataSource& cfg_data_source :
    576        *tracing_session->config.mutable_data_sources()) {
    577     // Find the updated producer_filter in the new config.
    578     std::vector<std::string> new_producer_name_filter;
    579     bool found_data_source = false;
    580     for (auto it : updated_cfg.data_sources()) {
    581       if (cfg_data_source.config().name() == it.config().name()) {
    582         new_producer_name_filter = it.producer_name_filter();
    583         found_data_source = true;
    584         break;
    585       }
    586     }
    587 
    588     // Bail out if data source not present in the new config.
    589     if (!found_data_source) {
    590       PERFETTO_ELOG(
    591           "ChangeTraceConfig() called without a current data source also "
    592           "present in the new "
    593           "config: %s",
    594           cfg_data_source.config().name().c_str());
    595       continue;
    596     }
    597 
    598     // TODO(oysteine): Just replacing the filter means that if
    599     // there are any filter entries which were present in the original config,
    600     // but removed from the config passed to ChangeTraceConfig, any matching
    601     // producers will keep producing but newly added producers after this
    602     // point will never start.
    603     *cfg_data_source.mutable_producer_name_filter() = new_producer_name_filter;
    604 
    605     // Scan all the registered data sources with a matching name.
    606     auto range = data_sources_.equal_range(cfg_data_source.config().name());
    607     for (auto it = range.first; it != range.second; it++) {
    608       ProducerEndpointImpl* producer = GetProducer(it->second.producer_id);
    609       PERFETTO_DCHECK(producer);
    610 
    611       // Check if the producer name of this data source is present
    612       // in the name filter. We currently only support new filters, not removing
    613       // old ones.
    614       if (!new_producer_name_filter.empty() &&
    615           std::find(new_producer_name_filter.begin(),
    616                     new_producer_name_filter.end(),
    617                     producer->name_) == new_producer_name_filter.end()) {
    618         continue;
    619       }
    620 
    621       bool already_setup = false;
    622       auto& ds_instances = tracing_session->data_source_instances;
    623       for (auto instance_it = ds_instances.begin();
    624            instance_it != ds_instances.end(); ++instance_it) {
    625         if (instance_it->first == it->second.producer_id &&
    626             instance_it->second.data_source_name ==
    627                 cfg_data_source.config().name()) {
    628           already_setup = true;
    629           break;
    630         }
    631       }
    632 
    633       if (already_setup)
    634         continue;
    635 
    636       // If it wasn't previously setup, set it up now.
    637       // (The per-producer config is optional).
    638       TraceConfig::ProducerConfig producer_config;
    639       for (auto& config : tracing_session->config.producers()) {
    640         if (producer->name_ == config.producer_name()) {
    641           producer_config = config;
    642           break;
    643         }
    644       }
    645 
    646       DataSourceInstance* ds_inst = SetupDataSource(
    647           cfg_data_source, producer_config, it->second, tracing_session);
    648 
    649       if (ds_inst && tracing_session->state == TracingSession::STARTED)
    650         StartDataSourceInstance(producer, tracing_session, ds_inst);
    651     }
    652   }
    653 }
    654 
    655 bool TracingServiceImpl::StartTracing(TracingSessionID tsid) {
    656   PERFETTO_DCHECK_THREAD(thread_checker_);
    657   TracingSession* tracing_session = GetTracingSession(tsid);
    658   if (!tracing_session) {
    659     PERFETTO_DLOG("StartTracing() failed, invalid session ID %" PRIu64, tsid);
    660     return false;
    661   }
    662 
    663   if (tracing_session->state != TracingSession::CONFIGURED) {
    664     PERFETTO_DLOG("StartTracing() failed, invalid session state: %d",
    665                   tracing_session->state);
    666     return false;
    667   }
    668 
    669   tracing_session->state = TracingSession::STARTED;
    670 
    671   if (!tracing_session->config.builtin_data_sources()
    672            .disable_clock_snapshotting()) {
    673     SnapshotClocks(&tracing_session->initial_clock_snapshot_,
    674                    /*set_root_timestamp=*/true);
    675   }
    676 
    677   // Trigger delayed task if the trace is time limited.
    678   const uint32_t trace_duration_ms = tracing_session->config.duration_ms();
    679   if (trace_duration_ms > 0) {
    680     auto weak_this = weak_ptr_factory_.GetWeakPtr();
    681     task_runner_->PostDelayedTask(
    682         [weak_this, tsid] {
    683           // Skip entirely the flush if the trace session doesn't exist anymore.
    684           // This is to prevent misleading error messages to be logged.
    685           if (!weak_this)
    686             return;
    687           auto* tracing_session_ptr = weak_this->GetTracingSession(tsid);
    688           if (!tracing_session_ptr)
    689             return;
    690           // If this trace was using STOP_TRACING triggers and we've seen
    691           // one, then the trigger overrides the normal timeout. In this
    692           // case we just return and let the other task clean up this trace.
    693           if (tracing_session_ptr->config.trigger_config().trigger_mode() ==
    694                   TraceConfig::TriggerConfig::STOP_TRACING &&
    695               !tracing_session_ptr->received_triggers.empty())
    696             return;
    697           // In all other cases (START_TRACING or no triggers) we flush
    698           // after |trace_duration_ms| unconditionally.
    699           weak_this->FlushAndDisableTracing(tsid);
    700         },
    701         trace_duration_ms);
    702   }
    703 
    704   // Start the periodic drain tasks if we should to save the trace into a file.
    705   if (tracing_session->config.write_into_file()) {
    706     auto weak_this = weak_ptr_factory_.GetWeakPtr();
    707     task_runner_->PostDelayedTask(
    708         [weak_this, tsid] {
    709           if (weak_this)
    710             weak_this->ReadBuffers(tsid, nullptr);
    711         },
    712         tracing_session->delay_to_next_write_period_ms());
    713   }
    714 
    715   // Start the periodic flush tasks if the config specified a flush period.
    716   if (tracing_session->config.flush_period_ms())
    717     PeriodicFlushTask(tsid, /*post_next_only=*/true);
    718 
    719   // Start the periodic incremental state clear tasks if the config specified a
    720   // period.
    721   if (tracing_session->config.incremental_state_config().clear_period_ms()) {
    722     PeriodicClearIncrementalStateTask(tsid, /*post_next_only=*/true);
    723   }
    724 
    725   for (auto& kv : tracing_session->data_source_instances) {
    726     ProducerID producer_id = kv.first;
    727     DataSourceInstance& data_source = kv.second;
    728     ProducerEndpointImpl* producer = GetProducer(producer_id);
    729     if (!producer) {
    730       PERFETTO_DFATAL("Producer does not exist.");
    731       continue;
    732     }
    733     StartDataSourceInstance(producer, tracing_session, &data_source);
    734   }
    735   return true;
    736 }
    737 
    738 void TracingServiceImpl::StartDataSourceInstance(
    739     ProducerEndpointImpl* producer,
    740     TracingSession* tracing_session,
    741     TracingServiceImpl::DataSourceInstance* instance) {
    742   PERFETTO_DCHECK(instance->state == DataSourceInstance::CONFIGURED);
    743   if (instance->will_notify_on_start) {
    744     instance->state = DataSourceInstance::STARTING;
    745   } else {
    746     instance->state = DataSourceInstance::STARTED;
    747   }
    748   if (tracing_session->consumer_maybe_null) {
    749     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
    750         *producer, *instance);
    751   }
    752   producer->StartDataSource(instance->instance_id, instance->config);
    753 }
    754 
    755 // DisableTracing just stops the data sources but doesn't free up any buffer.
    756 // This is to allow the consumer to freeze the buffers (by stopping the trace)
    757 // and then drain the buffers. The actual teardown of the TracingSession happens
    758 // in FreeBuffers().
    759 void TracingServiceImpl::DisableTracing(TracingSessionID tsid,
    760                                         bool disable_immediately) {
    761   PERFETTO_DCHECK_THREAD(thread_checker_);
    762   TracingSession* tracing_session = GetTracingSession(tsid);
    763   if (!tracing_session) {
    764     // Can happen if the consumer calls this before EnableTracing() or after
    765     // FreeBuffers().
    766     PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    767     return;
    768   }
    769 
    770   switch (tracing_session->state) {
    771     // Spurious call to DisableTracing() while already disabled, nothing to do.
    772     case TracingSession::DISABLED:
    773       PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
    774       return;
    775 
    776     // This is either:
    777     // A) The case of a graceful DisableTracing() call followed by a call to
    778     //    FreeBuffers(), iff |disable_immediately| == true. In this case we want
    779     //    to forcefully transition in the disabled state without waiting for the
    780     //    outstanding acks because the buffers are going to be destroyed soon.
    781     // B) A spurious call, iff |disable_immediately| == false, in which case
    782     //    there is nothing to do.
    783     case TracingSession::DISABLING_WAITING_STOP_ACKS:
    784       PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
    785       if (disable_immediately)
    786         DisableTracingNotifyConsumerAndFlushFile(tracing_session);
    787       return;
    788 
    789     // Continues below.
    790     case TracingSession::CONFIGURED:
    791       // If the session didn't even start there is no need to orchestrate a
    792       // graceful stop of data sources.
    793       disable_immediately = true;
    794       break;
    795 
    796     // This is the nominal case, continues below.
    797     case TracingSession::STARTED:
    798       break;
    799   }
    800 
    801   for (auto& data_source_inst : tracing_session->data_source_instances) {
    802     const ProducerID producer_id = data_source_inst.first;
    803     DataSourceInstance& instance = data_source_inst.second;
    804     const DataSourceInstanceID ds_inst_id = instance.instance_id;
    805     ProducerEndpointImpl* producer = GetProducer(producer_id);
    806     PERFETTO_DCHECK(producer);
    807     PERFETTO_DCHECK(instance.state == DataSourceInstance::CONFIGURED ||
    808                     instance.state == DataSourceInstance::STARTING ||
    809                     instance.state == DataSourceInstance::STARTED);
    810     if (instance.will_notify_on_stop && !disable_immediately) {
    811       instance.state = DataSourceInstance::STOPPING;
    812     } else {
    813       instance.state = DataSourceInstance::STOPPED;
    814     }
    815     if (tracing_session->consumer_maybe_null) {
    816       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
    817           *producer, instance);
    818     }
    819     producer->StopDataSource(ds_inst_id);
    820   }
    821 
    822   // Either this request is flagged with |disable_immediately| or there are no
    823   // data sources that are requesting a final handshake. In both cases just mark
    824   // the session as disabled immediately, notify the consumer and flush the
    825   // trace file (if used).
    826   if (tracing_session->AllDataSourceInstancesStopped())
    827     return DisableTracingNotifyConsumerAndFlushFile(tracing_session);
    828 
    829   tracing_session->state = TracingSession::DISABLING_WAITING_STOP_ACKS;
    830   auto weak_this = weak_ptr_factory_.GetWeakPtr();
    831   auto timeout_ms = override_data_source_test_timeout_ms_for_testing
    832                         ? override_data_source_test_timeout_ms_for_testing
    833                         : kDataSourceStopTimeoutMs;
    834   task_runner_->PostDelayedTask(
    835       [weak_this, tsid] {
    836         if (weak_this)
    837           weak_this->OnDisableTracingTimeout(tsid);
    838       },
    839       timeout_ms);
    840 
    841   // Deliberately NOT removing the session from |tracing_session_|, it's still
    842   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
    843 }
    844 
    845 void TracingServiceImpl::NotifyDataSourceStarted(
    846     ProducerID producer_id,
    847     DataSourceInstanceID instance_id) {
    848   PERFETTO_DCHECK_THREAD(thread_checker_);
    849   for (auto& kv : tracing_sessions_) {
    850     TracingSession& tracing_session = kv.second;
    851     DataSourceInstance* instance =
    852         tracing_session.GetDataSourceInstance(producer_id, instance_id);
    853 
    854     if (!instance)
    855       continue;
    856 
    857     if (instance->state != DataSourceInstance::STARTING) {
    858       PERFETTO_ELOG("Data source instance in incorrect state.");
    859       continue;
    860     }
    861 
    862     instance->state = DataSourceInstance::STARTED;
    863 
    864     ProducerEndpointImpl* producer = GetProducer(producer_id);
    865     PERFETTO_DCHECK(producer);
    866     if (tracing_session.consumer_maybe_null) {
    867       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
    868           *producer, *instance);
    869     }
    870   }  // for (tracing_session)
    871 }
    872 
    873 void TracingServiceImpl::NotifyDataSourceStopped(
    874     ProducerID producer_id,
    875     DataSourceInstanceID instance_id) {
    876   PERFETTO_DCHECK_THREAD(thread_checker_);
    877   for (auto& kv : tracing_sessions_) {
    878     TracingSession& tracing_session = kv.second;
    879     DataSourceInstance* instance =
    880         tracing_session.GetDataSourceInstance(producer_id, instance_id);
    881 
    882     if (!instance)
    883       continue;
    884 
    885     if (instance->state != DataSourceInstance::STOPPING) {
    886       PERFETTO_ELOG("Data source instance in incorrect state.");
    887       continue;
    888     }
    889     PERFETTO_DCHECK(tracing_session.state ==
    890                     TracingSession::DISABLING_WAITING_STOP_ACKS);
    891 
    892     instance->state = DataSourceInstance::STOPPED;
    893 
    894     ProducerEndpointImpl* producer = GetProducer(producer_id);
    895     PERFETTO_DCHECK(producer);
    896     if (tracing_session.consumer_maybe_null) {
    897       tracing_session.consumer_maybe_null->OnDataSourceInstanceStateChange(
    898           *producer, *instance);
    899     }
    900 
    901     if (!tracing_session.AllDataSourceInstancesStopped())
    902       continue;
    903 
    904     // All data sources acked the termination.
    905     DisableTracingNotifyConsumerAndFlushFile(&tracing_session);
    906   }  // for (tracing_session)
    907 }
    908 
    909 void TracingServiceImpl::ActivateTriggers(
    910     ProducerID producer_id,
    911     const std::vector<std::string>& triggers) {
    912   PERFETTO_DCHECK_THREAD(thread_checker_);
    913   auto* producer = GetProducer(producer_id);
    914   PERFETTO_DCHECK(producer);
    915   for (const auto& trigger_name : triggers) {
    916     for (auto& id_and_tracing_session : tracing_sessions_) {
    917       auto& tracing_session = id_and_tracing_session.second;
    918       TracingSessionID tsid = id_and_tracing_session.first;
    919       auto iter = std::find_if(
    920           tracing_session.config.trigger_config().triggers().begin(),
    921           tracing_session.config.trigger_config().triggers().end(),
    922           [&trigger_name](const TraceConfig::TriggerConfig::Trigger& trigger) {
    923             return trigger.name() == trigger_name;
    924           });
    925       if (iter == tracing_session.config.trigger_config().triggers().end()) {
    926         continue;
    927       }
    928 
    929       // If this trigger requires a certain producer to have sent it
    930       // (non-empty producer_name()) ensure the producer who sent this trigger
    931       // matches.
    932       if (!iter->producer_name_regex().empty() &&
    933           !std::regex_match(producer->name_,
    934                             std::regex(iter->producer_name_regex()))) {
    935         continue;
    936       }
    937 
    938       const bool triggers_already_received =
    939           !tracing_session.received_triggers.empty();
    940       tracing_session.received_triggers.push_back(
    941           {static_cast<uint64_t>(base::GetBootTimeNs().count()), iter->name(),
    942            producer->name_, producer->uid_});
    943       auto weak_this = weak_ptr_factory_.GetWeakPtr();
    944       switch (tracing_session.config.trigger_config().trigger_mode()) {
    945         case TraceConfig::TriggerConfig::START_TRACING:
    946           // If the session has already been triggered and moved past
    947           // CONFIGURED then we don't need to repeat StartTracing. This would
    948           // work fine (StartTracing would return false) but would add error
    949           // logs.
    950           if (tracing_session.state != TracingSession::CONFIGURED)
    951             break;
    952 
    953           PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
    954                         " with duration of %" PRIu32 "ms.",
    955                         iter->name().c_str(), tsid, iter->stop_delay_ms());
    956           // We override the trace duration to be the trigger's requested
    957           // value, this ensures that the trace will end after this amount
    958           // of time has passed.
    959           tracing_session.config.set_duration_ms(iter->stop_delay_ms());
    960           StartTracing(tsid);
    961           break;
    962         case TraceConfig::TriggerConfig::STOP_TRACING:
    963           // Only stop the trace once to avoid confusing log messages. I.E.
    964           // when we've already hit the first trigger we've already Posted the
    965           // task to FlushAndDisable. So all future triggers will just break
    966           // out.
    967           if (triggers_already_received)
    968             break;
    969 
    970           PERFETTO_DLOG("Triggering '%s' on tracing session %" PRIu64
    971                         " with duration of %" PRIu32 "ms.",
    972                         iter->name().c_str(), tsid, iter->stop_delay_ms());
    973           // Now that we've seen a trigger we need to stop, flush, and disable
    974           // this session after the configured |stop_delay_ms|.
    975           task_runner_->PostDelayedTask(
    976               [weak_this, tsid] {
    977                 // Skip entirely the flush if the trace session doesn't exist
    978                 // anymore. This is to prevent misleading error messages to be
    979                 // logged.
    980                 if (weak_this && weak_this->GetTracingSession(tsid))
    981                   weak_this->FlushAndDisableTracing(tsid);
    982               },
    983               // If this trigger is zero this will immediately executable and
    984               // will happen shortly.
    985               iter->stop_delay_ms());
    986           break;
    987         case TraceConfig::TriggerConfig::UNSPECIFIED:
    988           PERFETTO_ELOG("Trigger activated but trigger mode unspecified.");
    989           break;
    990       }
    991     }
    992   }
    993 }
    994 
    995 // Always invoked kDataSourceStopTimeoutMs after DisableTracing(). In nominal
    996 // conditions all data sources should have acked the stop and this will early
    997 // out.
    998 void TracingServiceImpl::OnDisableTracingTimeout(TracingSessionID tsid) {
    999   PERFETTO_DCHECK_THREAD(thread_checker_);
   1000   TracingSession* tracing_session = GetTracingSession(tsid);
   1001   if (!tracing_session ||
   1002       tracing_session->state != TracingSession::DISABLING_WAITING_STOP_ACKS) {
   1003     return;  // Tracing session was successfully disabled.
   1004   }
   1005 
   1006   PERFETTO_ILOG("Timeout while waiting for ACKs for tracing session %" PRIu64,
   1007                 tsid);
   1008   PERFETTO_DCHECK(!tracing_session->AllDataSourceInstancesStopped());
   1009   DisableTracingNotifyConsumerAndFlushFile(tracing_session);
   1010 }
   1011 
   1012 void TracingServiceImpl::DisableTracingNotifyConsumerAndFlushFile(
   1013     TracingSession* tracing_session) {
   1014   PERFETTO_DCHECK(tracing_session->state != TracingSession::DISABLED);
   1015   for (auto& inst_kv : tracing_session->data_source_instances) {
   1016     if (inst_kv.second.state == DataSourceInstance::STOPPED)
   1017       continue;
   1018     inst_kv.second.state = DataSourceInstance::STOPPED;
   1019     ProducerEndpointImpl* producer = GetProducer(inst_kv.first);
   1020     PERFETTO_DCHECK(producer);
   1021     if (tracing_session->consumer_maybe_null) {
   1022       tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
   1023           *producer, inst_kv.second);
   1024     }
   1025   }
   1026   tracing_session->state = TracingSession::DISABLED;
   1027 
   1028   // Scrape any remaining chunks that weren't flushed by the producers.
   1029   for (auto& producer_id_and_producer : producers_)
   1030     ScrapeSharedMemoryBuffers(tracing_session, producer_id_and_producer.second);
   1031 
   1032   if (tracing_session->write_into_file) {
   1033     tracing_session->write_period_ms = 0;
   1034     ReadBuffers(tracing_session->id, nullptr);
   1035   }
   1036 
   1037   if (tracing_session->consumer_maybe_null)
   1038     tracing_session->consumer_maybe_null->NotifyOnTracingDisabled();
   1039 }
   1040 
   1041 void TracingServiceImpl::Flush(TracingSessionID tsid,
   1042                                uint32_t timeout_ms,
   1043                                ConsumerEndpoint::FlushCallback callback) {
   1044   PERFETTO_DCHECK_THREAD(thread_checker_);
   1045   TracingSession* tracing_session = GetTracingSession(tsid);
   1046   if (!tracing_session) {
   1047     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
   1048     return;
   1049   }
   1050 
   1051   if (!timeout_ms)
   1052     timeout_ms = tracing_session->flush_timeout_ms();
   1053 
   1054   if (tracing_session->pending_flushes.size() > 1000) {
   1055     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
   1056                   tracing_session->pending_flushes.size());
   1057     callback(false);
   1058     return;
   1059   }
   1060 
   1061   FlushRequestID flush_request_id = ++last_flush_request_id_;
   1062   PendingFlush& pending_flush =
   1063       tracing_session->pending_flushes
   1064           .emplace_hint(tracing_session->pending_flushes.end(),
   1065                         flush_request_id, PendingFlush(std::move(callback)))
   1066           ->second;
   1067 
   1068   // Send a flush request to each producer involved in the tracing session. In
   1069   // order to issue a flush request we have to build a map of all data source
   1070   // instance ids enabled for each producer.
   1071   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
   1072   for (const auto& data_source_inst : tracing_session->data_source_instances) {
   1073     const ProducerID producer_id = data_source_inst.first;
   1074     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
   1075     flush_map[producer_id].push_back(ds_inst_id);
   1076   }
   1077 
   1078   for (const auto& kv : flush_map) {
   1079     ProducerID producer_id = kv.first;
   1080     ProducerEndpointImpl* producer = GetProducer(producer_id);
   1081     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
   1082     producer->Flush(flush_request_id, data_sources);
   1083     pending_flush.producers.insert(producer_id);
   1084   }
   1085 
   1086   // If there are no producers to flush (realistically this happens only in
   1087   // some tests) fire OnFlushTimeout() straight away, without waiting.
   1088   if (flush_map.empty())
   1089     timeout_ms = 0;
   1090 
   1091   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1092   task_runner_->PostDelayedTask(
   1093       [weak_this, tsid, flush_request_id] {
   1094         if (weak_this)
   1095           weak_this->OnFlushTimeout(tsid, flush_request_id);
   1096       },
   1097       timeout_ms);
   1098 }
   1099 
   1100 void TracingServiceImpl::NotifyFlushDoneForProducer(
   1101     ProducerID producer_id,
   1102     FlushRequestID flush_request_id) {
   1103   for (auto& kv : tracing_sessions_) {
   1104     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
   1105     auto& pending_flushes = kv.second.pending_flushes;
   1106     auto end_it = pending_flushes.upper_bound(flush_request_id);
   1107     for (auto it = pending_flushes.begin(); it != end_it;) {
   1108       PendingFlush& pending_flush = it->second;
   1109       pending_flush.producers.erase(producer_id);
   1110       if (pending_flush.producers.empty()) {
   1111         auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1112         TracingSessionID tsid = kv.first;
   1113         auto callback = std::move(pending_flush.callback);
   1114         task_runner_->PostTask([weak_this, tsid, callback]() {
   1115           if (weak_this) {
   1116             weak_this->CompleteFlush(tsid, std::move(callback),
   1117                                      /*success=*/true);
   1118           }
   1119         });
   1120         it = pending_flushes.erase(it);
   1121       } else {
   1122         it++;
   1123       }
   1124     }  // for (pending_flushes)
   1125   }    // for (tracing_session)
   1126 }
   1127 
   1128 void TracingServiceImpl::OnFlushTimeout(TracingSessionID tsid,
   1129                                         FlushRequestID flush_request_id) {
   1130   TracingSession* tracing_session = GetTracingSession(tsid);
   1131   if (!tracing_session)
   1132     return;
   1133   auto it = tracing_session->pending_flushes.find(flush_request_id);
   1134   if (it == tracing_session->pending_flushes.end())
   1135     return;  // Nominal case: flush was completed and acked on time.
   1136 
   1137   // If there were no producers to flush, consider it a success.
   1138   bool success = it->second.producers.empty();
   1139 
   1140   auto callback = std::move(it->second.callback);
   1141   tracing_session->pending_flushes.erase(it);
   1142   CompleteFlush(tsid, std::move(callback), success);
   1143 }
   1144 
   1145 void TracingServiceImpl::CompleteFlush(TracingSessionID tsid,
   1146                                        ConsumerEndpoint::FlushCallback callback,
   1147                                        bool success) {
   1148   TracingSession* tracing_session = GetTracingSession(tsid);
   1149   if (tracing_session) {
   1150     // Producers may not have been able to flush all their data, even if they
   1151     // indicated flush completion. If possible, also collect uncommitted chunks
   1152     // to make sure we have everything they wrote so far.
   1153     for (auto& producer_id_and_producer : producers_) {
   1154       ScrapeSharedMemoryBuffers(tracing_session,
   1155                                 producer_id_and_producer.second);
   1156     }
   1157   }
   1158   callback(success);
   1159 }
   1160 
   1161 void TracingServiceImpl::ScrapeSharedMemoryBuffers(
   1162     TracingSession* tracing_session,
   1163     ProducerEndpointImpl* producer) {
   1164   if (!producer->smb_scraping_enabled_)
   1165     return;
   1166 
   1167   // Can't copy chunks if we don't know about any trace writers.
   1168   if (producer->writers_.empty())
   1169     return;
   1170 
   1171   // Performance optimization: On flush or session disconnect, this method is
   1172   // called for each producer. If the producer doesn't participate in the
   1173   // session, there's no need to scape its chunks right now. We can tell if a
   1174   // producer participates in the session by checking if the producer is allowed
   1175   // to write into the session's log buffers.
   1176   const auto& session_buffers = tracing_session->buffers_index;
   1177   bool producer_in_session =
   1178       std::any_of(session_buffers.begin(), session_buffers.end(),
   1179                   [producer](BufferID buffer_id) {
   1180                     return producer->allowed_target_buffers_.count(buffer_id);
   1181                   });
   1182   if (!producer_in_session)
   1183     return;
   1184 
   1185   PERFETTO_DLOG("Scraping SMB for producer %" PRIu16, producer->id_);
   1186 
   1187   // Find and copy any uncommitted chunks from the SMB.
   1188   //
   1189   // In nominal conditions, the page layout of the used SMB pages should never
   1190   // change because the service is the only one who is supposed to modify used
   1191   // pages (to make them free again).
   1192   //
   1193   // However, the code here needs to deal with the case of a malicious producer
   1194   // altering the SMB in unpredictable ways. Thankfully the SMB size is
   1195   // immutable, so a chunk will always point to some valid memory, even if the
   1196   // producer alters the intended layout and chunk header concurrently.
   1197   // Ultimately a malicious producer altering the SMB's chunk layout while we
   1198   // are iterating in this function is not any different from the case of a
   1199   // malicious producer asking to commit a chunk made of random data, which is
   1200   // something this class has to deal with regardless.
   1201   //
   1202   // The only legitimate mutations that can happen from sane producers,
   1203   // concurrently to this function, are:
   1204   //   A. free pages being partitioned,
   1205   //   B. free chunks being migrated to kChunkBeingWritten,
   1206   //   C. kChunkBeingWritten chunks being migrated to kChunkCompleted.
   1207 
   1208   SharedMemoryABI* abi = &producer->shmem_abi_;
   1209   // num_pages() is immutable after the SMB is initialized and cannot be changed
   1210   // even by a producer even if malicious.
   1211   for (size_t page_idx = 0; page_idx < abi->num_pages(); page_idx++) {
   1212     uint32_t layout = abi->GetPageLayout(page_idx);
   1213 
   1214     uint32_t used_chunks = abi->GetUsedChunks(layout);  // Returns a bitmap.
   1215     // Skip empty pages.
   1216     if (used_chunks == 0)
   1217       continue;
   1218 
   1219     // Scrape the chunks that are currently used. These should be either in
   1220     // state kChunkBeingWritten or kChunkComplete.
   1221     for (uint32_t chunk_idx = 0; used_chunks; chunk_idx++, used_chunks >>= 1) {
   1222       if (!(used_chunks & 1))
   1223         continue;
   1224 
   1225       SharedMemoryABI::ChunkState state =
   1226           SharedMemoryABI::GetChunkStateFromLayout(layout, chunk_idx);
   1227       PERFETTO_DCHECK(state == SharedMemoryABI::kChunkBeingWritten ||
   1228                       state == SharedMemoryABI::kChunkComplete);
   1229       bool chunk_complete = state == SharedMemoryABI::kChunkComplete;
   1230 
   1231       SharedMemoryABI::Chunk chunk =
   1232           abi->GetChunkUnchecked(page_idx, layout, chunk_idx);
   1233 
   1234       uint16_t packet_count;
   1235       uint8_t flags;
   1236       // GetPacketCountAndFlags has acquire_load semantics.
   1237       std::tie(packet_count, flags) = chunk.GetPacketCountAndFlags();
   1238 
   1239       // It only makes sense to copy an incomplete chunk if there's at least
   1240       // one full packet available. (The producer may not have completed the
   1241       // last packet in it yet, so we need at least 2.)
   1242       if (!chunk_complete && packet_count < 2)
   1243         continue;
   1244 
   1245       // At this point, it is safe to access the remaining header fields of
   1246       // the chunk. Even if the chunk was only just transferred from
   1247       // kChunkFree into kChunkBeingWritten state, the header should be
   1248       // written completely once the packet count increased above 1 (it was
   1249       // reset to 0 by the service when the chunk was freed).
   1250 
   1251       WriterID writer_id = chunk.writer_id();
   1252       base::Optional<BufferID> target_buffer_id =
   1253           producer->buffer_id_for_writer(writer_id);
   1254 
   1255       // We can only scrape this chunk if we know which log buffer to copy it
   1256       // into.
   1257       if (!target_buffer_id)
   1258         continue;
   1259 
   1260       // Skip chunks that don't belong to the requested tracing session.
   1261       bool target_buffer_belongs_to_session =
   1262           std::find(session_buffers.begin(), session_buffers.end(),
   1263                     *target_buffer_id) != session_buffers.end();
   1264       if (!target_buffer_belongs_to_session)
   1265         continue;
   1266 
   1267       uint32_t chunk_id =
   1268           chunk.header()->chunk_id.load(std::memory_order_relaxed);
   1269 
   1270       CopyProducerPageIntoLogBuffer(
   1271           producer->id_, producer->uid_, writer_id, chunk_id, *target_buffer_id,
   1272           packet_count, flags, chunk_complete, chunk.payload_begin(),
   1273           chunk.payload_size());
   1274     }
   1275   }
   1276 }
   1277 
   1278 void TracingServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
   1279   PERFETTO_DCHECK_THREAD(thread_checker_);
   1280   PERFETTO_DLOG("Triggering final flush for %" PRIu64, tsid);
   1281   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1282   Flush(tsid, 0, [weak_this, tsid](bool success) {
   1283     PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
   1284                   success, tsid);
   1285     if (!weak_this)
   1286       return;
   1287     TracingSession* session = weak_this->GetTracingSession(tsid);
   1288     if (session->consumer_maybe_null) {
   1289       // If the consumer is still attached, just disable the session but give it
   1290       // a chance to read the contents.
   1291       weak_this->DisableTracing(tsid);
   1292     } else {
   1293       // If the consumer detached, destroy the session. If the consumer did
   1294       // start the session in long-tracing mode, the service will have saved
   1295       // the contents to the passed file. If not, the contents will be
   1296       // destroyed.
   1297       weak_this->FreeBuffers(tsid);
   1298     }
   1299   });
   1300 }
   1301 
   1302 void TracingServiceImpl::PeriodicFlushTask(TracingSessionID tsid,
   1303                                            bool post_next_only) {
   1304   PERFETTO_DCHECK_THREAD(thread_checker_);
   1305   TracingSession* tracing_session = GetTracingSession(tsid);
   1306   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
   1307     return;
   1308 
   1309   uint32_t flush_period_ms = tracing_session->config.flush_period_ms();
   1310   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1311   task_runner_->PostDelayedTask(
   1312       [weak_this, tsid] {
   1313         if (weak_this)
   1314           weak_this->PeriodicFlushTask(tsid, /*post_next_only=*/false);
   1315       },
   1316       flush_period_ms - (base::GetWallTimeMs().count() % flush_period_ms));
   1317 
   1318   if (post_next_only)
   1319     return;
   1320 
   1321   PERFETTO_DLOG("Triggering periodic flush for trace session %" PRIu64, tsid);
   1322   Flush(tsid, 0, [](bool success) {
   1323     if (!success)
   1324       PERFETTO_ELOG("Periodic flush timed out");
   1325   });
   1326 }
   1327 
   1328 void TracingServiceImpl::PeriodicClearIncrementalStateTask(
   1329     TracingSessionID tsid,
   1330     bool post_next_only) {
   1331   PERFETTO_DCHECK_THREAD(thread_checker_);
   1332   TracingSession* tracing_session = GetTracingSession(tsid);
   1333   if (!tracing_session || tracing_session->state != TracingSession::STARTED)
   1334     return;
   1335 
   1336   uint32_t clear_period_ms =
   1337       tracing_session->config.incremental_state_config().clear_period_ms();
   1338   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1339   task_runner_->PostDelayedTask(
   1340       [weak_this, tsid] {
   1341         if (weak_this)
   1342           weak_this->PeriodicClearIncrementalStateTask(
   1343               tsid, /*post_next_only=*/false);
   1344       },
   1345       clear_period_ms - (base::GetWallTimeMs().count() % clear_period_ms));
   1346 
   1347   if (post_next_only)
   1348     return;
   1349 
   1350   PERFETTO_DLOG(
   1351       "Performing periodic incremental state clear for trace session %" PRIu64,
   1352       tsid);
   1353 
   1354   // Queue the IPCs to producers with active data sources that opted in.
   1355   std::map<ProducerID, std::vector<DataSourceInstanceID>> clear_map;
   1356   for (const auto& kv : tracing_session->data_source_instances) {
   1357     ProducerID producer_id = kv.first;
   1358     const DataSourceInstance& data_source = kv.second;
   1359     if (data_source.handles_incremental_state_clear)
   1360       clear_map[producer_id].push_back(data_source.instance_id);
   1361   }
   1362 
   1363   for (const auto& kv : clear_map) {
   1364     ProducerID producer_id = kv.first;
   1365     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
   1366     ProducerEndpointImpl* producer = GetProducer(producer_id);
   1367     if (!producer) {
   1368       PERFETTO_DFATAL("Producer does not exist.");
   1369       continue;
   1370     }
   1371     producer->ClearIncrementalState(data_sources);
   1372   }
   1373 }
   1374 
   1375 // Note: when this is called to write into a file passed when starting tracing
   1376 // |consumer| will be == nullptr (as opposite to the case of a consumer asking
   1377 // to send the trace data back over IPC).
   1378 void TracingServiceImpl::ReadBuffers(TracingSessionID tsid,
   1379                                      ConsumerEndpointImpl* consumer) {
   1380   PERFETTO_DCHECK_THREAD(thread_checker_);
   1381   TracingSession* tracing_session = GetTracingSession(tsid);
   1382   if (!tracing_session) {
   1383     // This will be hit systematically from the PostDelayedTask when directly
   1384     // writing into the file (in which case consumer == nullptr). Suppress the
   1385     // log in this case as it's just spam.
   1386     if (consumer)
   1387       PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
   1388     return;  // TODO(primiano): signal failure?
   1389   }
   1390 
   1391   // When a tracing session is waiting for a trigger it is considered empty. If
   1392   // a tracing session finishes and moves into DISABLED without ever receiving a
   1393   // trigger the trace should never return any data. This includes the synthetic
   1394   // packets like TraceConfig and Clock snapshots. So we bail out early and let
   1395   // the consumer know there is no data.
   1396   if (!tracing_session->config.trigger_config().triggers().empty() &&
   1397       tracing_session->received_triggers.empty()) {
   1398     if (consumer)
   1399       consumer->consumer_->OnTraceData({}, /* has_more = */ false);
   1400     PERFETTO_DLOG(
   1401         "ReadBuffers(): tracing session has not received a trigger yet.");
   1402     return;
   1403   }
   1404 
   1405   // This can happen if the file is closed by a previous task because it reaches
   1406   // |max_file_size_bytes|.
   1407   if (!tracing_session->write_into_file && !consumer)
   1408     return;
   1409 
   1410   if (tracing_session->write_into_file && consumer) {
   1411     // If the consumer enabled tracing and asked to save the contents into the
   1412     // passed file makes little sense to also try to read the buffers over IPC,
   1413     // as that would just steal data from the periodic draining task.
   1414     PERFETTO_DFATAL("Consumer trying to read from write_into_file session.");
   1415     return;
   1416   }
   1417 
   1418   std::vector<TracePacket> packets;
   1419   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
   1420 
   1421   std::move(tracing_session->initial_clock_snapshot_.begin(),
   1422             tracing_session->initial_clock_snapshot_.end(),
   1423             std::back_inserter(packets));
   1424   tracing_session->initial_clock_snapshot_.clear();
   1425 
   1426   base::TimeMillis now = base::GetWallTimeMs();
   1427   if (now >= tracing_session->last_snapshot_time + kSnapshotsInterval) {
   1428     tracing_session->last_snapshot_time = now;
   1429     SnapshotSyncMarker(&packets);
   1430     SnapshotStats(tracing_session, &packets);
   1431 
   1432     if (!tracing_session->config.builtin_data_sources()
   1433              .disable_clock_snapshotting()) {
   1434       // We don't want to put a root timestamp in this snapshot as the packet
   1435       // may be very out of order with respect to the actual trace packets
   1436       // since consuming the trace may happen at any point after it starts.
   1437       SnapshotClocks(&packets, /*set_root_timestamp=*/false);
   1438     }
   1439   }
   1440   if (!tracing_session->config.builtin_data_sources().disable_trace_config()) {
   1441     MaybeEmitTraceConfig(tracing_session, &packets);
   1442     MaybeEmitReceivedTriggers(tracing_session, &packets);
   1443   }
   1444   if (!tracing_session->config.builtin_data_sources().disable_system_info())
   1445     MaybeEmitSystemInfo(tracing_session, &packets);
   1446 
   1447   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
   1448   size_t total_slices = 0;   // SUM(#slices in |packets|).
   1449 
   1450   // Add up size for packets added by the Maybe* calls above.
   1451   for (const TracePacket& packet : packets) {
   1452     packets_bytes += packet.size();
   1453     total_slices += packet.slices().size();
   1454   }
   1455 
   1456   // This is a rough threshold to determine how much to read from the buffer in
   1457   // each task. This is to avoid executing a single huge sending task for too
   1458   // long and risk to hit the watchdog. This is *not* an upper bound: we just
   1459   // stop accumulating new packets and PostTask *after* we cross this threshold.
   1460   // This constant essentially balances the PostTask and IPC overhead vs the
   1461   // responsiveness of the service. An extremely small value will cause one IPC
   1462   // and one PostTask for each slice but will keep the service extremely
   1463   // responsive. An extremely large value will batch the send for the full
   1464   // buffer in one large task, will hit the blocking send() once the socket
   1465   // buffers are full and hang the service for a bit (until the consumer
   1466   // catches up).
   1467   static constexpr size_t kApproxBytesPerTask = 32768;
   1468   bool did_hit_threshold = false;
   1469 
   1470   // TODO(primiano): Extend the ReadBuffers API to allow reading only some
   1471   // buffers, not all of them in one go.
   1472   for (size_t buf_idx = 0;
   1473        buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
   1474        buf_idx++) {
   1475     auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
   1476     if (tbuf_iter == buffers_.end()) {
   1477       PERFETTO_DFATAL("Buffer not found.");
   1478       continue;
   1479     }
   1480     TraceBuffer& tbuf = *tbuf_iter->second;
   1481     tbuf.BeginRead();
   1482     while (!did_hit_threshold) {
   1483       TracePacket packet;
   1484       TraceBuffer::PacketSequenceProperties sequence_properties{};
   1485       bool previous_packet_dropped;
   1486       if (!tbuf.ReadNextTracePacket(&packet, &sequence_properties,
   1487                                     &previous_packet_dropped)) {
   1488         break;
   1489       }
   1490       PERFETTO_DCHECK(sequence_properties.producer_id_trusted != 0);
   1491       PERFETTO_DCHECK(sequence_properties.writer_id != 0);
   1492       PERFETTO_DCHECK(sequence_properties.producer_uid_trusted != kInvalidUid);
   1493       PERFETTO_DCHECK(packet.size() > 0);
   1494       if (!PacketStreamValidator::Validate(packet.slices())) {
   1495         PERFETTO_DLOG("Dropping invalid packet");
   1496         continue;
   1497       }
   1498 
   1499       // Append a slice with the trusted field data. This can't be spoofed
   1500       // because above we validated that the existing slices don't contain any
   1501       // trusted fields. For added safety we append instead of prepending
   1502       // because according to protobuf semantics, if the same field is
   1503       // encountered multiple times the last instance takes priority. Note that
   1504       // truncated packets are also rejected, so the producer can't give us a
   1505       // partial packet (e.g., a truncated string) which only becomes valid when
   1506       // the trusted data is appended here.
   1507       protos::TrustedPacket trusted_packet;
   1508       trusted_packet.set_trusted_uid(
   1509           static_cast<int32_t>(sequence_properties.producer_uid_trusted));
   1510       trusted_packet.set_trusted_packet_sequence_id(
   1511           tracing_session->GetPacketSequenceID(
   1512               sequence_properties.producer_id_trusted,
   1513               sequence_properties.writer_id));
   1514       if (previous_packet_dropped)
   1515         trusted_packet.set_previous_packet_dropped(previous_packet_dropped);
   1516       static constexpr size_t kTrustedBufSize = 16;
   1517       Slice slice = Slice::Allocate(kTrustedBufSize);
   1518       PERFETTO_CHECK(
   1519           trusted_packet.SerializeToArray(slice.own_data(), kTrustedBufSize));
   1520       slice.size = static_cast<size_t>(trusted_packet.GetCachedSize());
   1521       PERFETTO_DCHECK(slice.size > 0 && slice.size <= kTrustedBufSize);
   1522       packet.AddSlice(std::move(slice));
   1523 
   1524       // Append the packet (inclusive of the trusted uid) to |packets|.
   1525       packets_bytes += packet.size();
   1526       total_slices += packet.slices().size();
   1527       did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
   1528                           !tracing_session->write_into_file;
   1529       packets.emplace_back(std::move(packet));
   1530     }  // for(packets...)
   1531   }    // for(buffers...)
   1532 
   1533   // If the caller asked us to write into a file by setting
   1534   // |write_into_file| == true in the trace config, drain the packets read
   1535   // (if any) into the given file descriptor.
   1536   if (tracing_session->write_into_file) {
   1537     const uint64_t max_size = tracing_session->max_file_size_bytes
   1538                                   ? tracing_session->max_file_size_bytes
   1539                                   : std::numeric_limits<size_t>::max();
   1540 
   1541     // When writing into a file, the file should look like a root trace.proto
   1542     // message. Each packet should be prepended with a proto preamble stating
   1543     // its field id (within trace.proto) and size. Hence the addition below.
   1544     const size_t max_iovecs = total_slices + packets.size();
   1545 
   1546     size_t num_iovecs = 0;
   1547     bool stop_writing_into_file = tracing_session->write_period_ms == 0;
   1548     std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
   1549     size_t num_iovecs_at_last_packet = 0;
   1550     uint64_t bytes_about_to_be_written = 0;
   1551     for (TracePacket& packet : packets) {
   1552       std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
   1553           packet.GetProtoPreamble();
   1554       bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
   1555       num_iovecs++;
   1556       for (const Slice& slice : packet.slices()) {
   1557         // writev() doesn't change the passed pointer. However, struct iovec
   1558         // take a non-const ptr because it's the same struct used by readv().
   1559         // Hence the const_cast here.
   1560         char* start = static_cast<char*>(const_cast<void*>(slice.start));
   1561         bytes_about_to_be_written += slice.size;
   1562         iovecs[num_iovecs++] = {start, slice.size};
   1563       }
   1564 
   1565       if (tracing_session->bytes_written_into_file +
   1566               bytes_about_to_be_written >=
   1567           max_size) {
   1568         stop_writing_into_file = true;
   1569         num_iovecs = num_iovecs_at_last_packet;
   1570         break;
   1571       }
   1572 
   1573       num_iovecs_at_last_packet = num_iovecs;
   1574     }
   1575     PERFETTO_DCHECK(num_iovecs <= max_iovecs);
   1576     int fd = *tracing_session->write_into_file;
   1577 
   1578     uint64_t total_wr_size = 0;
   1579 
   1580     // writev() can take at most IOV_MAX entries per call. Batch them.
   1581     constexpr size_t kIOVMax = IOV_MAX;
   1582     for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
   1583       int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
   1584       ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
   1585       if (wr_size <= 0) {
   1586         PERFETTO_PLOG("writev() failed");
   1587         stop_writing_into_file = true;
   1588         break;
   1589       }
   1590       total_wr_size += static_cast<size_t>(wr_size);
   1591     }
   1592 
   1593     tracing_session->bytes_written_into_file += total_wr_size;
   1594 
   1595     PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
   1596                   (total_wr_size + 1023) / 1024, stop_writing_into_file);
   1597     if (stop_writing_into_file) {
   1598       // Ensure all data was written to the file before we close it.
   1599       base::FlushFile(fd);
   1600       tracing_session->write_into_file.reset();
   1601       tracing_session->write_period_ms = 0;
   1602       if (tracing_session->state == TracingSession::STARTED)
   1603         DisableTracing(tsid);
   1604       return;
   1605     }
   1606 
   1607     auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1608     task_runner_->PostDelayedTask(
   1609         [weak_this, tsid] {
   1610           if (weak_this)
   1611             weak_this->ReadBuffers(tsid, nullptr);
   1612         },
   1613         tracing_session->delay_to_next_write_period_ms());
   1614     return;
   1615   }  // if (tracing_session->write_into_file)
   1616 
   1617   const bool has_more = did_hit_threshold;
   1618   if (has_more) {
   1619     auto weak_consumer = consumer->GetWeakPtr();
   1620     auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1621     task_runner_->PostTask([weak_this, weak_consumer, tsid] {
   1622       if (!weak_this || !weak_consumer)
   1623         return;
   1624       weak_this->ReadBuffers(tsid, weak_consumer.get());
   1625     });
   1626   }
   1627 
   1628   // Keep this as tail call, just in case the consumer re-enters.
   1629   consumer->consumer_->OnTraceData(std::move(packets), has_more);
   1630 }
   1631 
   1632 void TracingServiceImpl::FreeBuffers(TracingSessionID tsid) {
   1633   PERFETTO_DCHECK_THREAD(thread_checker_);
   1634   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
   1635   TracingSession* tracing_session = GetTracingSession(tsid);
   1636   if (!tracing_session) {
   1637     PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
   1638     return;  // TODO(primiano): signal failure?
   1639   }
   1640   DisableTracing(tsid, /*disable_immediately=*/true);
   1641 
   1642   PERFETTO_DCHECK(tracing_session->AllDataSourceInstancesStopped());
   1643   tracing_session->data_source_instances.clear();
   1644 
   1645   for (auto& producer_entry : producers_) {
   1646     ProducerEndpointImpl* producer = producer_entry.second;
   1647     producer->OnFreeBuffers(tracing_session->buffers_index);
   1648   }
   1649 
   1650   for (BufferID buffer_id : tracing_session->buffers_index) {
   1651     buffer_ids_.Free(buffer_id);
   1652     PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
   1653     buffers_.erase(buffer_id);
   1654   }
   1655   bool notify_traceur = tracing_session->config.notify_traceur();
   1656   tracing_sessions_.erase(tsid);
   1657   UpdateMemoryGuardrail();
   1658 
   1659   PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
   1660                tracing_sessions_.size());
   1661 
   1662 #if PERFETTO_BUILDFLAG(PERFETTO_OS_ANDROID)
   1663   static const char kTraceurProp[] = "sys.trace.trace_end_signal";
   1664   if (notify_traceur && __system_property_set(kTraceurProp, "1"))
   1665     PERFETTO_ELOG("Failed to setprop %s=1", kTraceurProp);
   1666 #else
   1667   base::ignore_result(notify_traceur);
   1668 #endif
   1669 }
   1670 
   1671 void TracingServiceImpl::RegisterDataSource(ProducerID producer_id,
   1672                                             const DataSourceDescriptor& desc) {
   1673   PERFETTO_DCHECK_THREAD(thread_checker_);
   1674   PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
   1675                 producer_id, desc.name().c_str());
   1676 
   1677   PERFETTO_DCHECK(!desc.name().empty());
   1678   auto reg_ds = data_sources_.emplace(desc.name(),
   1679                                       RegisteredDataSource{producer_id, desc});
   1680 
   1681   // If there are existing tracing sessions, we need to check if the new
   1682   // data source is enabled by any of them.
   1683   if (tracing_sessions_.empty())
   1684     return;
   1685 
   1686   ProducerEndpointImpl* producer = GetProducer(producer_id);
   1687   if (!producer) {
   1688     PERFETTO_DFATAL("Producer not found.");
   1689     return;
   1690   }
   1691 
   1692   for (auto& iter : tracing_sessions_) {
   1693     TracingSession& tracing_session = iter.second;
   1694     if (tracing_session.state != TracingSession::STARTED &&
   1695         tracing_session.state != TracingSession::CONFIGURED) {
   1696       continue;
   1697     }
   1698 
   1699     TraceConfig::ProducerConfig producer_config;
   1700     for (auto& config : tracing_session.config.producers()) {
   1701       if (producer->name_ == config.producer_name()) {
   1702         producer_config = config;
   1703         break;
   1704       }
   1705     }
   1706     for (const TraceConfig::DataSource& cfg_data_source :
   1707          tracing_session.config.data_sources()) {
   1708       if (cfg_data_source.config().name() != desc.name())
   1709         continue;
   1710       DataSourceInstance* ds_inst = SetupDataSource(
   1711           cfg_data_source, producer_config, reg_ds->second, &tracing_session);
   1712       if (ds_inst && tracing_session.state == TracingSession::STARTED)
   1713         StartDataSourceInstance(producer, &tracing_session, ds_inst);
   1714     }
   1715   }
   1716 }
   1717 
   1718 void TracingServiceImpl::UnregisterDataSource(ProducerID producer_id,
   1719                                               const std::string& name) {
   1720   PERFETTO_DCHECK_THREAD(thread_checker_);
   1721   PERFETTO_CHECK(producer_id);
   1722   ProducerEndpointImpl* producer = GetProducer(producer_id);
   1723   PERFETTO_DCHECK(producer);
   1724   for (auto& kv : tracing_sessions_) {
   1725     auto& ds_instances = kv.second.data_source_instances;
   1726     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
   1727       if (it->first == producer_id && it->second.data_source_name == name) {
   1728         DataSourceInstanceID ds_inst_id = it->second.instance_id;
   1729         if (it->second.state != DataSourceInstance::STOPPED) {
   1730           if (it->second.state != DataSourceInstance::STOPPING)
   1731             producer->StopDataSource(ds_inst_id);
   1732           // Mark the instance as stopped immediately, since we are
   1733           // unregistering it below.
   1734           if (it->second.state == DataSourceInstance::STOPPING)
   1735             NotifyDataSourceStopped(producer_id, ds_inst_id);
   1736         }
   1737         it = ds_instances.erase(it);
   1738       } else {
   1739         ++it;
   1740       }
   1741     }  // for (data_source_instances)
   1742   }    // for (tracing_session)
   1743 
   1744   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
   1745     if (it->second.producer_id == producer_id &&
   1746         it->second.descriptor.name() == name) {
   1747       data_sources_.erase(it);
   1748       return;
   1749     }
   1750   }
   1751 
   1752   PERFETTO_DFATAL(
   1753       "Tried to unregister a non-existent data source \"%s\" for "
   1754       "producer %" PRIu16,
   1755       name.c_str(), producer_id);
   1756 }
   1757 
   1758 TracingServiceImpl::DataSourceInstance* TracingServiceImpl::SetupDataSource(
   1759     const TraceConfig::DataSource& cfg_data_source,
   1760     const TraceConfig::ProducerConfig& producer_config,
   1761     const RegisteredDataSource& data_source,
   1762     TracingSession* tracing_session) {
   1763   PERFETTO_DCHECK_THREAD(thread_checker_);
   1764   ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
   1765   PERFETTO_DCHECK(producer);
   1766   // An existing producer that is not ftrace could have registered itself as
   1767   // ftrace, we must not enable it in that case.
   1768   if (lockdown_mode_ && producer->uid_ != uid_) {
   1769     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
   1770     return nullptr;
   1771   }
   1772   // TODO(primiano): Add tests for registration ordering
   1773   // (data sources vs consumers).
   1774   // TODO: This logic is duplicated in ChangeTraceConfig, consider refactoring
   1775   // it. Meanwhile update both.
   1776   if (!cfg_data_source.producer_name_filter().empty()) {
   1777     if (std::find(cfg_data_source.producer_name_filter().begin(),
   1778                   cfg_data_source.producer_name_filter().end(),
   1779                   producer->name_) ==
   1780         cfg_data_source.producer_name_filter().end()) {
   1781       PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
   1782                     cfg_data_source.config().name().c_str(),
   1783                     producer->name_.c_str());
   1784       return nullptr;
   1785     }
   1786   }
   1787 
   1788   auto relative_buffer_id = cfg_data_source.config().target_buffer();
   1789   if (relative_buffer_id >= tracing_session->num_buffers()) {
   1790     PERFETTO_LOG(
   1791         "The TraceConfig for DataSource %s specified a target_buffer out of "
   1792         "bound (%d). Skipping it.",
   1793         cfg_data_source.config().name().c_str(), relative_buffer_id);
   1794     return nullptr;
   1795   }
   1796 
   1797   // Create a copy of the DataSourceConfig specified in the trace config. This
   1798   // will be passed to the producer after translating the |target_buffer| id.
   1799   // The |target_buffer| parameter passed by the consumer in the trace config is
   1800   // relative to the buffers declared in the same trace config. This has to be
   1801   // translated to the global BufferID before passing it to the producers, which
   1802   // don't know anything about tracing sessions and consumers.
   1803 
   1804   DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
   1805   auto insert_iter = tracing_session->data_source_instances.emplace(
   1806       std::piecewise_construct,  //
   1807       std::forward_as_tuple(producer->id_),
   1808       std::forward_as_tuple(
   1809           inst_id,
   1810           cfg_data_source.config(),  //  Deliberate copy.
   1811           data_source.descriptor.name(),
   1812           data_source.descriptor.will_notify_on_start(),
   1813           data_source.descriptor.will_notify_on_stop(),
   1814           data_source.descriptor.handles_incremental_state_clear()));
   1815   DataSourceInstance* ds_instance = &insert_iter->second;
   1816 
   1817   // New data source instance starts out in CONFIGURED state.
   1818   if (tracing_session->consumer_maybe_null) {
   1819     tracing_session->consumer_maybe_null->OnDataSourceInstanceStateChange(
   1820         *producer, *ds_instance);
   1821   }
   1822 
   1823   DataSourceConfig& ds_config = ds_instance->config;
   1824   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
   1825   ds_config.set_enable_extra_guardrails(
   1826       tracing_session->config.enable_extra_guardrails());
   1827   ds_config.set_tracing_session_id(tracing_session->id);
   1828   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
   1829   PERFETTO_DCHECK(global_id);
   1830   ds_config.set_target_buffer(global_id);
   1831 
   1832   PERFETTO_DLOG("Setting up data source %s with target buffer %" PRIu16,
   1833                 ds_config.name().c_str(), global_id);
   1834   if (!producer->shared_memory()) {
   1835     // Determine the SMB page size. Must be an integer multiple of 4k.
   1836     size_t page_size = std::min<size_t>(producer_config.page_size_kb() * 1024,
   1837                                         SharedMemoryABI::kMaxPageSize);
   1838     if (page_size < base::kPageSize || page_size % base::kPageSize != 0)
   1839       page_size = kDefaultShmPageSize;
   1840     producer->shared_buffer_page_size_kb_ = page_size / 1024;
   1841 
   1842     // Determine the SMB size. Must be an integer multiple of the SMB page size.
   1843     // The decisional tree is as follows:
   1844     // 1. Give priority to what defined in the trace config.
   1845     // 2. If unset give priority to the hint passed by the producer.
   1846     // 3. Keep within bounds and ensure it's a multiple of the page size.
   1847     size_t shm_size = producer_config.shm_size_kb() * 1024;
   1848     if (shm_size == 0)
   1849       shm_size = producer->shmem_size_hint_bytes_;
   1850     shm_size = std::min<size_t>(shm_size, kMaxShmSize);
   1851     if (shm_size < page_size || shm_size % page_size)
   1852       shm_size = kDefaultShmSize;
   1853 
   1854     // TODO(primiano): right now Create() will suicide in case of OOM if the
   1855     // mmap fails. We should instead gracefully fail the request and tell the
   1856     // client to go away.
   1857     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
   1858     producer->SetSharedMemory(std::move(shared_memory));
   1859     producer->OnTracingSetup();
   1860     UpdateMemoryGuardrail();
   1861   }
   1862   producer->SetupDataSource(inst_id, ds_config);
   1863   return ds_instance;
   1864 }
   1865 
   1866 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
   1867 // might be lying / returning garbage contents. |src| and |size| can be trusted
   1868 // in terms of being a valid pointer, but not the contents.
   1869 void TracingServiceImpl::CopyProducerPageIntoLogBuffer(
   1870     ProducerID producer_id_trusted,
   1871     uid_t producer_uid_trusted,
   1872     WriterID writer_id,
   1873     ChunkID chunk_id,
   1874     BufferID buffer_id,
   1875     uint16_t num_fragments,
   1876     uint8_t chunk_flags,
   1877     bool chunk_complete,
   1878     const uint8_t* src,
   1879     size_t size) {
   1880   PERFETTO_DCHECK_THREAD(thread_checker_);
   1881 
   1882   ProducerEndpointImpl* producer = GetProducer(producer_id_trusted);
   1883   if (!producer) {
   1884     PERFETTO_DFATAL("Producer not found.");
   1885     chunks_discarded_++;
   1886     return;
   1887   }
   1888 
   1889   TraceBuffer* buf = GetBufferByID(buffer_id);
   1890   if (!buf) {
   1891     PERFETTO_DLOG("Could not find target buffer %" PRIu16
   1892                   " for producer %" PRIu16,
   1893                   buffer_id, producer_id_trusted);
   1894     chunks_discarded_++;
   1895     return;
   1896   }
   1897 
   1898   // Verify that the producer is actually allowed to write into the target
   1899   // buffer specified in the request. This prevents a malicious producer from
   1900   // injecting data into a log buffer that belongs to a tracing session the
   1901   // producer is not part of.
   1902   if (!producer->is_allowed_target_buffer(buffer_id)) {
   1903     PERFETTO_ELOG("Producer %" PRIu16
   1904                   " tried to write into forbidden target buffer %" PRIu16,
   1905                   producer_id_trusted, buffer_id);
   1906     PERFETTO_DFATAL("Forbidden target buffer");
   1907     chunks_discarded_++;
   1908     return;
   1909   }
   1910 
   1911   // If the writer was registered by the producer, it should only write into the
   1912   // buffer it was registered with.
   1913   base::Optional<BufferID> associated_buffer =
   1914       producer->buffer_id_for_writer(writer_id);
   1915   if (associated_buffer && *associated_buffer != buffer_id) {
   1916     PERFETTO_ELOG("Writer %" PRIu16 " of producer %" PRIu16
   1917                   " was registered to write into target buffer %" PRIu16
   1918                   ", but tried to write into buffer %" PRIu16,
   1919                   writer_id, producer_id_trusted, *associated_buffer,
   1920                   buffer_id);
   1921     PERFETTO_DFATAL("Wrong target buffer");
   1922     chunks_discarded_++;
   1923     return;
   1924   }
   1925 
   1926   buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
   1927                           chunk_id, num_fragments, chunk_flags, chunk_complete,
   1928                           src, size);
   1929 }
   1930 
   1931 void TracingServiceImpl::ApplyChunkPatches(
   1932     ProducerID producer_id_trusted,
   1933     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
   1934   PERFETTO_DCHECK_THREAD(thread_checker_);
   1935 
   1936   for (const auto& chunk : chunks_to_patch) {
   1937     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
   1938     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
   1939     TraceBuffer* buf =
   1940         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
   1941     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
   1942                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
   1943     if (!writer_id || writer_id > kMaxWriterID || !buf) {
   1944       PERFETTO_ELOG(
   1945           "Received invalid chunks_to_patch request from Producer: %" PRIu16
   1946           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
   1947           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
   1948       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
   1949       continue;
   1950     }
   1951 
   1952     // Note, there's no need to validate that the producer is allowed to write
   1953     // to the specified buffer ID (or that it's the correct buffer ID for a
   1954     // registered TraceWriter). That's because TraceBuffer uses the producer ID
   1955     // and writer ID to look up the chunk to patch. If the producer specifies an
   1956     // incorrect buffer, this lookup will fail and TraceBuffer will ignore the
   1957     // patches. Because the producer ID is trusted, there's also no way for a
   1958     // malicious producer to patch another producer's data.
   1959 
   1960     // Speculate on the fact that there are going to be a limited amount of
   1961     // patches per request, so we can allocate the |patches| array on the stack.
   1962     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
   1963     if (chunk.patches().size() > patches.size()) {
   1964       PERFETTO_ELOG("Too many patches (%zu) batched in the same request",
   1965                     patches.size());
   1966       PERFETTO_DFATAL("Too many patches");
   1967       patches_discarded_ += static_cast<uint64_t>(chunk.patches_size());
   1968       continue;
   1969     }
   1970 
   1971     size_t i = 0;
   1972     for (const auto& patch : chunk.patches()) {
   1973       const std::string& patch_data = patch.data();
   1974       if (patch_data.size() != patches[i].data.size()) {
   1975         PERFETTO_ELOG("Received patch from producer: %" PRIu16
   1976                       " of unexpected size %zu",
   1977                       producer_id_trusted, patch_data.size());
   1978         patches_discarded_++;
   1979         continue;
   1980       }
   1981       patches[i].offset_untrusted = patch.offset();
   1982       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
   1983       i++;
   1984     }
   1985     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
   1986                                &patches[0], i, chunk.has_more_patches());
   1987   }
   1988 }
   1989 
   1990 TracingServiceImpl::TracingSession* TracingServiceImpl::GetDetachedSession(
   1991     uid_t uid,
   1992     const std::string& key) {
   1993   PERFETTO_DCHECK_THREAD(thread_checker_);
   1994   for (auto& kv : tracing_sessions_) {
   1995     TracingSession* session = &kv.second;
   1996     if (session->consumer_uid == uid && session->detach_key == key) {
   1997       PERFETTO_DCHECK(session->consumer_maybe_null == nullptr);
   1998       return session;
   1999     }
   2000   }
   2001   return nullptr;
   2002 }
   2003 
   2004 TracingServiceImpl::TracingSession* TracingServiceImpl::GetTracingSession(
   2005     TracingSessionID tsid) {
   2006   PERFETTO_DCHECK_THREAD(thread_checker_);
   2007   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
   2008   if (it == tracing_sessions_.end())
   2009     return nullptr;
   2010   return &it->second;
   2011 }
   2012 
   2013 ProducerID TracingServiceImpl::GetNextProducerID() {
   2014   PERFETTO_DCHECK_THREAD(thread_checker_);
   2015   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
   2016   do {
   2017     ++last_producer_id_;
   2018   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
   2019   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
   2020   return last_producer_id_;
   2021 }
   2022 
   2023 TraceBuffer* TracingServiceImpl::GetBufferByID(BufferID buffer_id) {
   2024   auto buf_iter = buffers_.find(buffer_id);
   2025   if (buf_iter == buffers_.end())
   2026     return nullptr;
   2027   return &*buf_iter->second;
   2028 }
   2029 
   2030 void TracingServiceImpl::OnStartTriggersTimeout(TracingSessionID tsid) {
   2031   // Skip entirely the flush if the trace session doesn't exist anymore.
   2032   // This is to prevent misleading error messages to be logged.
   2033   //
   2034   // if the trace has started from the trigger we rely on
   2035   // the |stop_delay_ms| from the trigger so don't flush and
   2036   // disable if we've moved beyond a CONFIGURED state
   2037   auto* tracing_session_ptr = GetTracingSession(tsid);
   2038   if (tracing_session_ptr &&
   2039       tracing_session_ptr->state == TracingSession::CONFIGURED) {
   2040     PERFETTO_DLOG("Disabling TracingSession %" PRIu64
   2041                   " since no triggers activated.",
   2042                   tsid);
   2043     // No data should be returned from ReadBuffers() regardless of if we
   2044     // call FreeBuffers() or DisableTracing(). This is because in
   2045     // STOP_TRACING we need this promise in either case, and using
   2046     // DisableTracing() allows a graceful shutdown. Consumers can follow
   2047     // their normal path and check the buffers through ReadBuffers() and
   2048     // the code won't hang because the tracing session will still be
   2049     // alive just disabled.
   2050     DisableTracing(tsid);
   2051   }
   2052 }
   2053 
   2054 void TracingServiceImpl::UpdateMemoryGuardrail() {
   2055 #if !PERFETTO_BUILDFLAG(PERFETTO_EMBEDDER_BUILD) && \
   2056     !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   2057   uint64_t total_buffer_bytes = 0;
   2058 
   2059   // Sum up all the shared memory buffers.
   2060   for (const auto& id_to_producer : producers_) {
   2061     if (id_to_producer.second->shared_memory())
   2062       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
   2063   }
   2064 
   2065   // Sum up all the trace buffers.
   2066   for (const auto& id_to_buffer : buffers_) {
   2067     total_buffer_bytes += id_to_buffer.second->size();
   2068   }
   2069 
   2070   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
   2071   // interval.
   2072   uint64_t guardrail = 32 * 1024 * 1024 + total_buffer_bytes;
   2073   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
   2074 #endif
   2075 }
   2076 
   2077 void TracingServiceImpl::SnapshotSyncMarker(std::vector<TracePacket>* packets) {
   2078   // The sync markes is used to tokenize large traces efficiently.
   2079   // See description in trace_packet.proto.
   2080   if (sync_marker_packet_size_ == 0) {
   2081     // Serialize the marker and the uid separately to guarantee that the marker
   2082     // is serialzied at the end and is adjacent to the start of the next packet.
   2083     int size_left = static_cast<int>(sizeof(sync_marker_packet_));
   2084     uint8_t* dst = &sync_marker_packet_[0];
   2085     protos::TrustedPacket packet;
   2086     packet.set_trusted_uid(static_cast<int32_t>(uid_));
   2087     packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
   2088     PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
   2089     size_left -= packet.ByteSize();
   2090     sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
   2091     dst += sync_marker_packet_size_;
   2092 
   2093     packet.Clear();
   2094     packet.set_synchronization_marker(kSyncMarker, sizeof(kSyncMarker));
   2095     PERFETTO_CHECK(packet.SerializeToArray(dst, size_left));
   2096     sync_marker_packet_size_ += static_cast<size_t>(packet.ByteSize());
   2097     PERFETTO_CHECK(sync_marker_packet_size_ <= sizeof(sync_marker_packet_));
   2098   };
   2099   packets->emplace_back();
   2100   packets->back().AddSlice(&sync_marker_packet_[0], sync_marker_packet_size_);
   2101 }
   2102 
   2103 void TracingServiceImpl::SnapshotClocks(std::vector<TracePacket>* packets,
   2104                                         bool set_root_timestamp) {
   2105   protos::TrustedPacket packet;
   2106   protos::ClockSnapshot* clock_snapshot = packet.mutable_clock_snapshot();
   2107 
   2108 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) && \
   2109     !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
   2110   struct {
   2111     clockid_t id;
   2112     protos::ClockSnapshot::Clock::Type type;
   2113     struct timespec ts;
   2114   } clocks[] = {
   2115       {CLOCK_BOOTTIME, protos::ClockSnapshot::Clock::BOOTTIME, {0, 0}},
   2116       {CLOCK_REALTIME_COARSE,
   2117        protos::ClockSnapshot::Clock::REALTIME_COARSE,
   2118        {0, 0}},
   2119       {CLOCK_MONOTONIC_COARSE,
   2120        protos::ClockSnapshot::Clock::MONOTONIC_COARSE,
   2121        {0, 0}},
   2122       {CLOCK_REALTIME, protos::ClockSnapshot::Clock::REALTIME, {0, 0}},
   2123       {CLOCK_MONOTONIC, protos::ClockSnapshot::Clock::MONOTONIC, {0, 0}},
   2124       {CLOCK_MONOTONIC_RAW,
   2125        protos::ClockSnapshot::Clock::MONOTONIC_RAW,
   2126        {0, 0}},
   2127       {CLOCK_PROCESS_CPUTIME_ID,
   2128        protos::ClockSnapshot::Clock::PROCESS_CPUTIME,
   2129        {0, 0}},
   2130       {CLOCK_THREAD_CPUTIME_ID,
   2131        protos::ClockSnapshot::Clock::THREAD_CPUTIME,
   2132        {0, 0}},
   2133   };
   2134   // First snapshot all the clocks as atomically as we can.
   2135   for (auto& clock : clocks) {
   2136     if (clock_gettime(clock.id, &clock.ts) == -1)
   2137       PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
   2138   }
   2139   for (auto& clock : clocks) {
   2140     if (set_root_timestamp &&
   2141         clock.type == protos::ClockSnapshot::Clock::BOOTTIME) {
   2142       packet.set_timestamp(
   2143           static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
   2144     };
   2145     protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
   2146     c->set_type(clock.type);
   2147     c->set_timestamp(
   2148         static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
   2149   }
   2150 #else   // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   2151   auto wall_time_ns = static_cast<uint64_t>(base::GetWallTimeNs().count());
   2152   if (set_root_timestamp)
   2153     packet.set_timestamp(wall_time_ns);
   2154   protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
   2155   c->set_type(protos::ClockSnapshot::Clock::MONOTONIC);
   2156   c->set_timestamp(wall_time_ns);
   2157 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   2158 
   2159   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   2160   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
   2161   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   2162   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   2163   packets->emplace_back();
   2164   packets->back().AddSlice(std::move(slice));
   2165 }
   2166 
   2167 void TracingServiceImpl::SnapshotStats(TracingSession* tracing_session,
   2168                                        std::vector<TracePacket>* packets) {
   2169   protos::TrustedPacket packet;
   2170   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   2171   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
   2172 
   2173   protos::TraceStats* trace_stats = packet.mutable_trace_stats();
   2174   GetTraceStats(tracing_session).ToProto(trace_stats);
   2175   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   2176   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   2177   packets->emplace_back();
   2178   packets->back().AddSlice(std::move(slice));
   2179 }
   2180 
   2181 TraceStats TracingServiceImpl::GetTraceStats(TracingSession* tracing_session) {
   2182   TraceStats trace_stats;
   2183   trace_stats.set_producers_connected(static_cast<uint32_t>(producers_.size()));
   2184   trace_stats.set_producers_seen(last_producer_id_);
   2185   trace_stats.set_data_sources_registered(
   2186       static_cast<uint32_t>(data_sources_.size()));
   2187   trace_stats.set_data_sources_seen(last_data_source_instance_id_);
   2188   trace_stats.set_tracing_sessions(
   2189       static_cast<uint32_t>(tracing_sessions_.size()));
   2190   trace_stats.set_total_buffers(static_cast<uint32_t>(buffers_.size()));
   2191   trace_stats.set_chunks_discarded(chunks_discarded_);
   2192   trace_stats.set_patches_discarded(patches_discarded_);
   2193 
   2194   for (BufferID buf_id : tracing_session->buffers_index) {
   2195     TraceBuffer* buf = GetBufferByID(buf_id);
   2196     if (!buf) {
   2197       PERFETTO_DFATAL("Buffer not found.");
   2198       continue;
   2199     }
   2200     *trace_stats.add_buffer_stats() = buf->stats();
   2201   }  // for (buf in session).
   2202   return trace_stats;
   2203 }
   2204 
   2205 void TracingServiceImpl::MaybeEmitTraceConfig(
   2206     TracingSession* tracing_session,
   2207     std::vector<TracePacket>* packets) {
   2208   if (tracing_session->did_emit_config)
   2209     return;
   2210   tracing_session->did_emit_config = true;
   2211   protos::TrustedPacket packet;
   2212   tracing_session->config.ToProto(packet.mutable_trace_config());
   2213   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   2214   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
   2215   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   2216   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   2217   packets->emplace_back();
   2218   packets->back().AddSlice(std::move(slice));
   2219 }
   2220 
   2221 void TracingServiceImpl::MaybeEmitSystemInfo(
   2222     TracingSession* tracing_session,
   2223     std::vector<TracePacket>* packets) {
   2224   if (tracing_session->did_emit_system_info)
   2225     return;
   2226   tracing_session->did_emit_system_info = true;
   2227   protos::TrustedPacket packet;
   2228 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_WIN)
   2229   protos::SystemInfo* info = packet.mutable_system_info();
   2230   struct utsname uname_info;
   2231   if (uname(&uname_info) == 0) {
   2232     protos::Utsname* utsname_info = info->mutable_utsname();
   2233     utsname_info->set_sysname(uname_info.sysname);
   2234     utsname_info->set_version(uname_info.version);
   2235     utsname_info->set_machine(uname_info.machine);
   2236     utsname_info->set_release(uname_info.release);
   2237   }
   2238 #endif
   2239   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   2240   packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
   2241   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   2242   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   2243   packets->emplace_back();
   2244   packets->back().AddSlice(std::move(slice));
   2245 }
   2246 
   2247 void TracingServiceImpl::MaybeEmitReceivedTriggers(
   2248     TracingSession* tracing_session,
   2249     std::vector<TracePacket>* packets) {
   2250   PERFETTO_DCHECK(tracing_session->num_triggers_emitted_into_trace <=
   2251                   tracing_session->received_triggers.size());
   2252   for (size_t i = tracing_session->num_triggers_emitted_into_trace;
   2253        i < tracing_session->received_triggers.size(); ++i) {
   2254     const auto& info = tracing_session->received_triggers[i];
   2255     protos::TrustedPacket packet;
   2256 
   2257     protos::Trigger* trigger = packet.mutable_trigger();
   2258     trigger->set_trigger_name(info.trigger_name);
   2259     trigger->set_producer_name(info.producer_name);
   2260     trigger->set_trusted_producer_uid(static_cast<int32_t>(info.producer_uid));
   2261 
   2262     packet.set_timestamp(info.boot_time_ns);
   2263     packet.set_trusted_uid(static_cast<int32_t>(uid_));
   2264     packet.set_trusted_packet_sequence_id(kServicePacketSequenceID);
   2265     Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   2266     PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   2267     packets->emplace_back();
   2268     packets->back().AddSlice(std::move(slice));
   2269     ++tracing_session->num_triggers_emitted_into_trace;
   2270   }
   2271 }
   2272 
   2273 ////////////////////////////////////////////////////////////////////////////////
   2274 // TracingServiceImpl::ConsumerEndpointImpl implementation
   2275 ////////////////////////////////////////////////////////////////////////////////
   2276 
   2277 TracingServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
   2278     TracingServiceImpl* service,
   2279     base::TaskRunner* task_runner,
   2280     Consumer* consumer,
   2281     uid_t uid)
   2282     : task_runner_(task_runner),
   2283       service_(service),
   2284       consumer_(consumer),
   2285       uid_(uid),
   2286       weak_ptr_factory_(this) {}
   2287 
   2288 TracingServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
   2289   service_->DisconnectConsumer(this);
   2290   consumer_->OnDisconnect();
   2291 }
   2292 
   2293 void TracingServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
   2294   PERFETTO_DCHECK_THREAD(thread_checker_);
   2295   auto weak_this = GetWeakPtr();
   2296   task_runner_->PostTask([weak_this] {
   2297     if (weak_this)
   2298       weak_this->consumer_->OnTracingDisabled();
   2299   });
   2300 }
   2301 
   2302 void TracingServiceImpl::ConsumerEndpointImpl::EnableTracing(
   2303     const TraceConfig& cfg,
   2304     base::ScopedFile fd) {
   2305   PERFETTO_DCHECK_THREAD(thread_checker_);
   2306   if (!service_->EnableTracing(this, cfg, std::move(fd)))
   2307     NotifyOnTracingDisabled();
   2308 }
   2309 
   2310 void TracingServiceImpl::ConsumerEndpointImpl::ChangeTraceConfig(
   2311     const TraceConfig& cfg) {
   2312   if (!tracing_session_id_) {
   2313     PERFETTO_LOG(
   2314         "Consumer called ChangeTraceConfig() but tracing was "
   2315         "not active");
   2316     return;
   2317   }
   2318   service_->ChangeTraceConfig(this, cfg);
   2319 }
   2320 
   2321 void TracingServiceImpl::ConsumerEndpointImpl::StartTracing() {
   2322   PERFETTO_DCHECK_THREAD(thread_checker_);
   2323   if (!tracing_session_id_) {
   2324     PERFETTO_LOG("Consumer called StartTracing() but tracing was not active");
   2325     return;
   2326   }
   2327   service_->StartTracing(tracing_session_id_);
   2328 }
   2329 
   2330 void TracingServiceImpl::ConsumerEndpointImpl::DisableTracing() {
   2331   PERFETTO_DCHECK_THREAD(thread_checker_);
   2332   if (!tracing_session_id_) {
   2333     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
   2334     return;
   2335   }
   2336   service_->DisableTracing(tracing_session_id_);
   2337 }
   2338 
   2339 void TracingServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
   2340   PERFETTO_DCHECK_THREAD(thread_checker_);
   2341   if (!tracing_session_id_) {
   2342     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
   2343     return;
   2344   }
   2345   service_->ReadBuffers(tracing_session_id_, this);
   2346 }
   2347 
   2348 void TracingServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
   2349   PERFETTO_DCHECK_THREAD(thread_checker_);
   2350   if (!tracing_session_id_) {
   2351     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
   2352     return;
   2353   }
   2354   service_->FreeBuffers(tracing_session_id_);
   2355   tracing_session_id_ = 0;
   2356 }
   2357 
   2358 void TracingServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
   2359                                                      FlushCallback callback) {
   2360   PERFETTO_DCHECK_THREAD(thread_checker_);
   2361   if (!tracing_session_id_) {
   2362     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
   2363     return;
   2364   }
   2365   service_->Flush(tracing_session_id_, timeout_ms, callback);
   2366 }
   2367 
   2368 void TracingServiceImpl::ConsumerEndpointImpl::Detach(const std::string& key) {
   2369   PERFETTO_DCHECK_THREAD(thread_checker_);
   2370   bool success = service_->DetachConsumer(this, key);
   2371   auto weak_this = GetWeakPtr();
   2372   task_runner_->PostTask([weak_this, success] {
   2373     if (weak_this)
   2374       weak_this->consumer_->OnDetach(success);
   2375   });
   2376 }
   2377 
   2378 void TracingServiceImpl::ConsumerEndpointImpl::Attach(const std::string& key) {
   2379   PERFETTO_DCHECK_THREAD(thread_checker_);
   2380   bool success = service_->AttachConsumer(this, key);
   2381   auto weak_this = GetWeakPtr();
   2382   task_runner_->PostTask([weak_this, success] {
   2383     if (!weak_this)
   2384       return;
   2385     Consumer* consumer = weak_this->consumer_;
   2386     TracingSession* session =
   2387         weak_this->service_->GetTracingSession(weak_this->tracing_session_id_);
   2388     if (!session) {
   2389       consumer->OnAttach(false, TraceConfig());
   2390       return;
   2391     }
   2392     consumer->OnAttach(success, session->config);
   2393   });
   2394 }
   2395 
   2396 void TracingServiceImpl::ConsumerEndpointImpl::GetTraceStats() {
   2397   PERFETTO_DCHECK_THREAD(thread_checker_);
   2398   bool success = false;
   2399   TraceStats stats;
   2400   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
   2401   if (session) {
   2402     success = true;
   2403     stats = service_->GetTraceStats(session);
   2404   }
   2405   auto weak_this = GetWeakPtr();
   2406   task_runner_->PostTask([weak_this, success, stats] {
   2407     if (weak_this)
   2408       weak_this->consumer_->OnTraceStats(success, stats);
   2409   });
   2410 }
   2411 
   2412 void TracingServiceImpl::ConsumerEndpointImpl::ObserveEvents(
   2413     uint32_t enabled_event_types) {
   2414   PERFETTO_DCHECK_THREAD(thread_checker_);
   2415   enabled_observable_event_types_ = enabled_event_types;
   2416 
   2417   if (enabled_observable_event_types_ == ObservableEventType::kNone)
   2418     return;
   2419 
   2420   PERFETTO_DCHECK(enabled_observable_event_types_ ==
   2421                   ObservableEventType::kDataSourceInstances);
   2422 
   2423   TracingSession* session = service_->GetTracingSession(tracing_session_id_);
   2424   if (!session)
   2425     return;
   2426 
   2427   // Issue initial states
   2428   for (const auto& kv : session->data_source_instances) {
   2429     ProducerEndpointImpl* producer = service_->GetProducer(kv.first);
   2430     PERFETTO_DCHECK(producer);
   2431     OnDataSourceInstanceStateChange(*producer, kv.second);
   2432   }
   2433 }
   2434 
   2435 void TracingServiceImpl::ConsumerEndpointImpl::OnDataSourceInstanceStateChange(
   2436     const ProducerEndpointImpl& producer,
   2437     const DataSourceInstance& instance) {
   2438   if (!(enabled_observable_event_types_ &
   2439         ObservableEventType::kDataSourceInstances)) {
   2440     return;
   2441   }
   2442 
   2443   if (instance.state != DataSourceInstance::CONFIGURED &&
   2444       instance.state != DataSourceInstance::STARTED &&
   2445       instance.state != DataSourceInstance::STOPPED) {
   2446     return;
   2447   }
   2448 
   2449   auto* observable_events = AddObservableEvents();
   2450   auto* change = observable_events->add_instance_state_changes();
   2451   change->set_producer_name(producer.name_);
   2452   change->set_data_source_name(instance.data_source_name);
   2453   if (instance.state == DataSourceInstance::STARTED) {
   2454     change->set_state(ObservableEvents::DataSourceInstanceStateChange::
   2455                           DATA_SOURCE_INSTANCE_STATE_STARTED);
   2456   } else {
   2457     change->set_state(ObservableEvents::DataSourceInstanceStateChange::
   2458                           DATA_SOURCE_INSTANCE_STATE_STOPPED);
   2459   }
   2460 }
   2461 
   2462 base::WeakPtr<TracingServiceImpl::ConsumerEndpointImpl>
   2463 TracingServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
   2464   PERFETTO_DCHECK_THREAD(thread_checker_);
   2465   return weak_ptr_factory_.GetWeakPtr();
   2466 }
   2467 
   2468 ObservableEvents*
   2469 TracingServiceImpl::ConsumerEndpointImpl::AddObservableEvents() {
   2470   PERFETTO_DCHECK_THREAD(thread_checker_);
   2471   if (!observable_events_) {
   2472     observable_events_.reset(new ObservableEvents());
   2473     auto weak_this = GetWeakPtr();
   2474     task_runner_->PostTask([weak_this] {
   2475       if (!weak_this)
   2476         return;
   2477 
   2478       // Move into a temporary to allow reentrancy in OnObservableEvents.
   2479       auto observable_events = std::move(weak_this->observable_events_);
   2480       weak_this->consumer_->OnObservableEvents(*observable_events);
   2481     });
   2482   }
   2483   return observable_events_.get();
   2484 }
   2485 
   2486 ////////////////////////////////////////////////////////////////////////////////
   2487 // TracingServiceImpl::ProducerEndpointImpl implementation
   2488 ////////////////////////////////////////////////////////////////////////////////
   2489 
   2490 TracingServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
   2491     ProducerID id,
   2492     uid_t uid,
   2493     TracingServiceImpl* service,
   2494     base::TaskRunner* task_runner,
   2495     Producer* producer,
   2496     const std::string& producer_name,
   2497     bool in_process,
   2498     bool smb_scraping_enabled)
   2499     : id_(id),
   2500       uid_(uid),
   2501       service_(service),
   2502       task_runner_(task_runner),
   2503       producer_(producer),
   2504       name_(producer_name),
   2505       in_process_(in_process),
   2506       smb_scraping_enabled_(smb_scraping_enabled),
   2507       weak_ptr_factory_(this) {}
   2508 
   2509 TracingServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
   2510   service_->DisconnectProducer(id_);
   2511   producer_->OnDisconnect();
   2512 }
   2513 
   2514 void TracingServiceImpl::ProducerEndpointImpl::RegisterDataSource(
   2515     const DataSourceDescriptor& desc) {
   2516   PERFETTO_DCHECK_THREAD(thread_checker_);
   2517   if (desc.name().empty()) {
   2518     PERFETTO_DLOG("Received RegisterDataSource() with empty name");
   2519     return;
   2520   }
   2521 
   2522   service_->RegisterDataSource(id_, desc);
   2523 }
   2524 
   2525 void TracingServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
   2526     const std::string& name) {
   2527   PERFETTO_DCHECK_THREAD(thread_checker_);
   2528   service_->UnregisterDataSource(id_, name);
   2529 }
   2530 
   2531 void TracingServiceImpl::ProducerEndpointImpl::RegisterTraceWriter(
   2532     uint32_t writer_id,
   2533     uint32_t target_buffer) {
   2534   PERFETTO_DCHECK_THREAD(thread_checker_);
   2535   PERFETTO_DCHECK(!buffer_id_for_writer(static_cast<WriterID>(writer_id)));
   2536   writers_[static_cast<WriterID>(writer_id)] =
   2537       static_cast<BufferID>(target_buffer);
   2538 }
   2539 
   2540 void TracingServiceImpl::ProducerEndpointImpl::UnregisterTraceWriter(
   2541     uint32_t writer_id) {
   2542   PERFETTO_DCHECK_THREAD(thread_checker_);
   2543   PERFETTO_DCHECK(buffer_id_for_writer(static_cast<WriterID>(writer_id)));
   2544   writers_.erase(static_cast<WriterID>(writer_id));
   2545 }
   2546 
   2547 void TracingServiceImpl::ProducerEndpointImpl::CommitData(
   2548     const CommitDataRequest& req_untrusted,
   2549     CommitDataCallback callback) {
   2550   PERFETTO_DCHECK_THREAD(thread_checker_);
   2551 
   2552   if (!shared_memory_) {
   2553     PERFETTO_DLOG(
   2554         "Attempted to commit data before the shared memory was allocated.");
   2555     return;
   2556   }
   2557   PERFETTO_DCHECK(shmem_abi_.is_valid());
   2558   for (const auto& entry : req_untrusted.chunks_to_move()) {
   2559     const uint32_t page_idx = entry.page();
   2560     if (page_idx >= shmem_abi_.num_pages())
   2561       continue;  // A buggy or malicious producer.
   2562 
   2563     SharedMemoryABI::Chunk chunk =
   2564         shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
   2565     if (!chunk.is_valid()) {
   2566       PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
   2567                     entry.page(), entry.chunk());
   2568       continue;
   2569     }
   2570 
   2571     // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
   2572     // the ABI contract expects the producer to not touch the chunk anymore
   2573     // (until the service marks that as free). This is why all the reads below
   2574     // are just memory_order_relaxed. Also, the code here assumes that all this
   2575     // data can be malicious and just gives up if anything is malformed.
   2576     BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
   2577     const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
   2578     WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
   2579     ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
   2580     auto packets = chunk_header.packets.load(std::memory_order_relaxed);
   2581     uint16_t num_fragments = packets.count;
   2582     uint8_t chunk_flags = packets.flags;
   2583 
   2584     service_->CopyProducerPageIntoLogBuffer(
   2585         id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
   2586         /*chunk_complete=*/true, chunk.payload_begin(), chunk.payload_size());
   2587 
   2588     // This one has release-store semantics.
   2589     shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
   2590   }  // for(chunks_to_move)
   2591 
   2592   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
   2593 
   2594   if (req_untrusted.flush_request_id()) {
   2595     service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
   2596   }
   2597 
   2598   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
   2599   // callback being invoked within the same callstack and not posted. If this
   2600   // changes, the code there needs to be changed accordingly.
   2601   if (callback)
   2602     callback();
   2603 }
   2604 
   2605 void TracingServiceImpl::ProducerEndpointImpl::SetSharedMemory(
   2606     std::unique_ptr<SharedMemory> shared_memory) {
   2607   PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
   2608   shared_memory_ = std::move(shared_memory);
   2609   shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
   2610                         shared_memory_->size(),
   2611                         shared_buffer_page_size_kb() * 1024);
   2612   if (in_process_) {
   2613     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
   2614         shared_memory_->start(), shared_memory_->size(),
   2615         shared_buffer_page_size_kb_ * 1024, this, task_runner_));
   2616   }
   2617 }
   2618 
   2619 SharedMemory* TracingServiceImpl::ProducerEndpointImpl::shared_memory() const {
   2620   PERFETTO_DCHECK_THREAD(thread_checker_);
   2621   return shared_memory_.get();
   2622 }
   2623 
   2624 size_t TracingServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb()
   2625     const {
   2626   return shared_buffer_page_size_kb_;
   2627 }
   2628 
   2629 void TracingServiceImpl::ProducerEndpointImpl::ActivateTriggers(
   2630     const std::vector<std::string>& triggers) {
   2631   service_->ActivateTriggers(id_, triggers);
   2632 }
   2633 
   2634 void TracingServiceImpl::ProducerEndpointImpl::StopDataSource(
   2635     DataSourceInstanceID ds_inst_id) {
   2636   // TODO(primiano): When we'll support tearing down the SMB, at this point we
   2637   // should send the Producer a TearDownTracing if all its data sources have
   2638   // been disabled (see b/77532839 and aosp/655179 PS1).
   2639   PERFETTO_DCHECK_THREAD(thread_checker_);
   2640   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   2641   task_runner_->PostTask([weak_this, ds_inst_id] {
   2642     if (weak_this)
   2643       weak_this->producer_->StopDataSource(ds_inst_id);
   2644   });
   2645 }
   2646 
   2647 SharedMemoryArbiter*
   2648 TracingServiceImpl::ProducerEndpointImpl::GetInProcessShmemArbiter() {
   2649   if (!inproc_shmem_arbiter_) {
   2650     PERFETTO_FATAL(
   2651         "The in-process SharedMemoryArbiter can only be used when "
   2652         "CreateProducer has been called with in_process=true and after tracing "
   2653         "has started.");
   2654   }
   2655 
   2656   PERFETTO_DCHECK(in_process_);
   2657   return inproc_shmem_arbiter_.get();
   2658 }
   2659 
   2660 // Can be called on any thread.
   2661 std::unique_ptr<TraceWriter>
   2662 TracingServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) {
   2663   return GetInProcessShmemArbiter()->CreateTraceWriter(buf_id);
   2664 }
   2665 
   2666 void TracingServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(
   2667     FlushRequestID id) {
   2668   PERFETTO_DCHECK_THREAD(thread_checker_);
   2669   return GetInProcessShmemArbiter()->NotifyFlushComplete(id);
   2670 }
   2671 
   2672 void TracingServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
   2673   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   2674   task_runner_->PostTask([weak_this] {
   2675     if (weak_this)
   2676       weak_this->producer_->OnTracingSetup();
   2677   });
   2678 }
   2679 
   2680 void TracingServiceImpl::ProducerEndpointImpl::Flush(
   2681     FlushRequestID flush_request_id,
   2682     const std::vector<DataSourceInstanceID>& data_sources) {
   2683   PERFETTO_DCHECK_THREAD(thread_checker_);
   2684   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   2685   task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
   2686     if (weak_this) {
   2687       weak_this->producer_->Flush(flush_request_id, data_sources.data(),
   2688                                   data_sources.size());
   2689     }
   2690   });
   2691 }
   2692 
   2693 void TracingServiceImpl::ProducerEndpointImpl::SetupDataSource(
   2694     DataSourceInstanceID ds_id,
   2695     const DataSourceConfig& config) {
   2696   PERFETTO_DCHECK_THREAD(thread_checker_);
   2697   allowed_target_buffers_.insert(static_cast<BufferID>(config.target_buffer()));
   2698   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   2699   task_runner_->PostTask([weak_this, ds_id, config] {
   2700     if (weak_this)
   2701       weak_this->producer_->SetupDataSource(ds_id, std::move(config));
   2702   });
   2703 }
   2704 
   2705 void TracingServiceImpl::ProducerEndpointImpl::StartDataSource(
   2706     DataSourceInstanceID ds_id,
   2707     const DataSourceConfig& config) {
   2708   PERFETTO_DCHECK_THREAD(thread_checker_);
   2709   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   2710   task_runner_->PostTask([weak_this, ds_id, config] {
   2711     if (weak_this)
   2712       weak_this->producer_->StartDataSource(ds_id, std::move(config));
   2713   });
   2714 }
   2715 
   2716 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStarted(
   2717     DataSourceInstanceID data_source_id) {
   2718   PERFETTO_DCHECK_THREAD(thread_checker_);
   2719   service_->NotifyDataSourceStarted(id_, data_source_id);
   2720 }
   2721 
   2722 void TracingServiceImpl::ProducerEndpointImpl::NotifyDataSourceStopped(
   2723     DataSourceInstanceID data_source_id) {
   2724   PERFETTO_DCHECK_THREAD(thread_checker_);
   2725   service_->NotifyDataSourceStopped(id_, data_source_id);
   2726 }
   2727 
   2728 void TracingServiceImpl::ProducerEndpointImpl::OnFreeBuffers(
   2729     const std::vector<BufferID>& target_buffers) {
   2730   if (allowed_target_buffers_.empty())
   2731     return;
   2732   for (BufferID buffer : target_buffers)
   2733     allowed_target_buffers_.erase(buffer);
   2734 }
   2735 
   2736 void TracingServiceImpl::ProducerEndpointImpl::ClearIncrementalState(
   2737     const std::vector<DataSourceInstanceID>& data_sources) {
   2738   PERFETTO_DCHECK_THREAD(thread_checker_);
   2739   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   2740   task_runner_->PostTask([weak_this, data_sources] {
   2741     if (weak_this) {
   2742       weak_this->producer_->ClearIncrementalState(data_sources.data(),
   2743                                                   data_sources.size());
   2744     }
   2745   });
   2746 }
   2747 
   2748 ////////////////////////////////////////////////////////////////////////////////
   2749 // TracingServiceImpl::TracingSession implementation
   2750 ////////////////////////////////////////////////////////////////////////////////
   2751 
   2752 TracingServiceImpl::TracingSession::TracingSession(
   2753     TracingSessionID session_id,
   2754     ConsumerEndpointImpl* consumer,
   2755     const TraceConfig& new_config)
   2756     : id(session_id),
   2757       consumer_maybe_null(consumer),
   2758       consumer_uid(consumer->uid_),
   2759       config(new_config) {}
   2760 
   2761 }  // namespace perfetto
   2762