Home | History | Annotate | Download | only in core
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "src/tracing/core/service_impl.h"
     18 
     19 #include <errno.h>
     20 #include <inttypes.h>
     21 #include <limits.h>
     22 #include <string.h>
     23 #include <sys/uio.h>
     24 #include <unistd.h>
     25 
     26 #include <algorithm>
     27 
     28 #include "perfetto/base/build_config.h"
     29 #include "perfetto/base/task_runner.h"
     30 #include "perfetto/base/utils.h"
     31 #include "perfetto/tracing/core/consumer.h"
     32 #include "perfetto/tracing/core/data_source_config.h"
     33 #include "perfetto/tracing/core/producer.h"
     34 #include "perfetto/tracing/core/shared_memory.h"
     35 #include "perfetto/tracing/core/shared_memory_abi.h"
     36 #include "perfetto/tracing/core/trace_packet.h"
     37 #include "perfetto/tracing/core/trace_writer.h"
     38 #include "src/tracing/core/packet_stream_validator.h"
     39 #include "src/tracing/core/shared_memory_arbiter_impl.h"
     40 #include "src/tracing/core/trace_buffer.h"
     41 
     42 #include "perfetto/trace/clock_snapshot.pb.h"
     43 #include "perfetto/trace/trusted_packet.pb.h"
     44 
// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls come
// in the right order or that their arguments are sane / within bounds.
     49 
     50 namespace perfetto {
     51 
     52 namespace {
     53 constexpr size_t kDefaultShmPageSize = base::kPageSize;
     54 constexpr int kMaxBuffersPerConsumer = 128;
     55 constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000);
     56 constexpr base::TimeMillis kStatsSnapshotInterval(10 * 1000);
     57 constexpr int kMinWriteIntoFilePeriodMs = 100;
     58 constexpr int kDefaultWriteIntoFilePeriodMs = 5000;
     59 constexpr int kFlushTimeoutMs = 1000;
     60 constexpr int kMaxConcurrentTracingSessions = 5;
     61 
     62 constexpr uint64_t kMillisPerHour = 3600000;
     63 
     64 // These apply only if enable_extra_guardrails is true.
     65 constexpr uint64_t kMaxTracingDurationMillis = 24 * kMillisPerHour;
     66 constexpr uint64_t kMaxTracingBufferSizeKb = 32 * 1024;
     67 }  // namespace
     68 
     69 // These constants instead are defined in the header because are used by tests.
     70 constexpr size_t ServiceImpl::kDefaultShmSize;
     71 constexpr size_t ServiceImpl::kMaxShmSize;
     72 
     73 // static
     74 std::unique_ptr<Service> Service::CreateInstance(
     75     std::unique_ptr<SharedMemory::Factory> shm_factory,
     76     base::TaskRunner* task_runner) {
     77   return std::unique_ptr<Service>(
     78       new ServiceImpl(std::move(shm_factory), task_runner));
     79 }
     80 
     81 ServiceImpl::ServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory,
     82                          base::TaskRunner* task_runner)
     83     : task_runner_(task_runner),
     84       shm_factory_(std::move(shm_factory)),
     85       uid_(getuid()),
     86       buffer_ids_(kMaxTraceBufferID),
     87       weak_ptr_factory_(this) {
     88   PERFETTO_DCHECK(task_runner_);
     89 }
     90 
     91 ServiceImpl::~ServiceImpl() {
     92   // TODO(fmayer): handle teardown of all Producer.
     93 }
     94 
     95 std::unique_ptr<Service::ProducerEndpoint> ServiceImpl::ConnectProducer(
     96     Producer* producer,
     97     uid_t uid,
     98     const std::string& producer_name,
     99     size_t shared_memory_size_hint_bytes) {
    100   PERFETTO_DCHECK_THREAD(thread_checker_);
    101 
    102   if (lockdown_mode_ && uid != geteuid()) {
    103     PERFETTO_DLOG("Lockdown mode. Rejecting producer with UID %ld",
    104                   static_cast<unsigned long>(uid));
    105     return nullptr;
    106   }
    107 
    108   if (producers_.size() >= kMaxProducerID) {
    109     PERFETTO_DCHECK(false);
    110     return nullptr;
    111   }
    112   const ProducerID id = GetNextProducerID();
    113   PERFETTO_DLOG("Producer %" PRIu16 " connected", id);
    114 
    115   std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl(
    116       id, uid, this, task_runner_, producer, producer_name));
    117   auto it_and_inserted = producers_.emplace(id, endpoint.get());
    118   PERFETTO_DCHECK(it_and_inserted.second);
    119   endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes;
    120   task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_));
    121 
    122   return std::move(endpoint);
    123 }
    124 
    125 void ServiceImpl::DisconnectProducer(ProducerID id) {
    126   PERFETTO_DCHECK_THREAD(thread_checker_);
    127   PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id);
    128   PERFETTO_DCHECK(producers_.count(id));
    129 
    130   for (auto it = data_sources_.begin(); it != data_sources_.end();) {
    131     auto next = it;
    132     next++;
    133     if (it->second.producer_id == id)
    134       UnregisterDataSource(id, it->second.descriptor.name());
    135     it = next;
    136   }
    137 
    138   producers_.erase(id);
    139   UpdateMemoryGuardrail();
    140 }
    141 
    142 ServiceImpl::ProducerEndpointImpl* ServiceImpl::GetProducer(
    143     ProducerID id) const {
    144   PERFETTO_DCHECK_THREAD(thread_checker_);
    145   auto it = producers_.find(id);
    146   if (it == producers_.end())
    147     return nullptr;
    148   return it->second;
    149 }
    150 
    151 std::unique_ptr<Service::ConsumerEndpoint> ServiceImpl::ConnectConsumer(
    152     Consumer* consumer) {
    153   PERFETTO_DCHECK_THREAD(thread_checker_);
    154   PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer));
    155   std::unique_ptr<ConsumerEndpointImpl> endpoint(
    156       new ConsumerEndpointImpl(this, task_runner_, consumer));
    157   auto it_and_inserted = consumers_.emplace(endpoint.get());
    158   PERFETTO_DCHECK(it_and_inserted.second);
    159   task_runner_->PostTask(std::bind(&Consumer::OnConnect, endpoint->consumer_));
    160   return std::move(endpoint);
    161 }
    162 
    163 void ServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) {
    164   PERFETTO_DCHECK_THREAD(thread_checker_);
    165   PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer));
    166   PERFETTO_DCHECK(consumers_.count(consumer));
    167 
    168   // TODO(primiano) : Check that this is safe (what happens if there are
    169   // ReadBuffers() calls posted in the meantime? They need to become noop).
    170   if (consumer->tracing_session_id_)
    171     FreeBuffers(consumer->tracing_session_id_);  // Will also DisableTracing().
    172   consumers_.erase(consumer);
    173 
    174 // At this point no more pointers to |consumer| should be around.
    175 #if PERFETTO_DCHECK_IS_ON()
    176   PERFETTO_DCHECK(!std::any_of(
    177       tracing_sessions_.begin(), tracing_sessions_.end(),
    178       [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) {
    179         return kv.second.consumer == consumer;
    180       }));
    181 #endif
    182 }
    183 
    184 bool ServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer,
    185                                 const TraceConfig& cfg,
    186                                 base::ScopedFile fd) {
    187   PERFETTO_DCHECK_THREAD(thread_checker_);
    188   PERFETTO_DLOG("Enabling tracing for consumer %p",
    189                 reinterpret_cast<void*>(consumer));
    190   if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_SET)
    191     lockdown_mode_ = true;
    192   if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR)
    193     lockdown_mode_ = false;
    194   TracingSession* tracing_session =
    195       GetTracingSession(consumer->tracing_session_id_);
    196   if (tracing_session) {
    197     PERFETTO_DLOG(
    198         "A Consumer is trying to EnableTracing() but another tracing session "
    199         "is already active (forgot a call to FreeBuffers() ?)");
    200     return false;
    201   }
    202 
    203   if (cfg.enable_extra_guardrails()) {
    204     if (cfg.duration_ms() > kMaxTracingDurationMillis) {
    205       PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms  > %" PRIu64
    206                     " ms)",
    207                     cfg.duration_ms(), kMaxTracingDurationMillis);
    208       return false;
    209     }
    210     uint64_t buf_size_sum = 0;
    211     for (const auto& buf : cfg.buffers())
    212       buf_size_sum += buf.size_kb();
    213     if (buf_size_sum > kMaxTracingBufferSizeKb) {
    214       PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64
    215                     "kB  > %" PRIu64 " kB)",
    216                     buf_size_sum, kMaxTracingBufferSizeKb);
    217       return false;
    218     }
    219   }
    220 
    221   if (cfg.buffers_size() > kMaxBuffersPerConsumer) {
    222     PERFETTO_DLOG("Too many buffers configured (%d)", cfg.buffers_size());
    223     return false;
    224   }
    225 
    226   // TODO(primiano): This is a workaround to prevent that a producer gets stuck
    227   // in a state where it stalls by design by having more TraceWriterImpl
    228   // instances than free pages in the buffer. This is really a bug in
    229   // trace_probes and the way it handles stalls in the shmem buffer.
    230   if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) {
    231     PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)",
    232                   tracing_sessions_.size());
    233     return false;
    234   }
    235 
    236   const TracingSessionID tsid = ++last_tracing_session_id_;
    237   tracing_session =
    238       &tracing_sessions_.emplace(tsid, TracingSession(consumer, cfg))
    239            .first->second;
    240 
    241   if (cfg.write_into_file()) {
    242     if (!fd) {
    243       PERFETTO_ELOG(
    244           "The TraceConfig had write_into_file==true but no fd was passed");
    245       return false;
    246     }
    247     tracing_session->write_into_file = std::move(fd);
    248     uint32_t write_period_ms = cfg.file_write_period_ms();
    249     if (write_period_ms == 0)
    250       write_period_ms = kDefaultWriteIntoFilePeriodMs;
    251     if (write_period_ms < kMinWriteIntoFilePeriodMs)
    252       write_period_ms = kMinWriteIntoFilePeriodMs;
    253     tracing_session->write_period_ms = write_period_ms;
    254     tracing_session->max_file_size_bytes = cfg.max_file_size_bytes();
    255     tracing_session->bytes_written_into_file = 0;
    256   }
    257 
    258   // Initialize the log buffers.
    259   bool did_allocate_all_buffers = true;
    260 
    261   // Allocate the trace buffers. Also create a map to translate a consumer
    262   // relative index (TraceConfig.DataSourceConfig.target_buffer) into the
    263   // corresponding BufferID, which is a global ID namespace for the service and
    264   // all producers.
    265   size_t total_buf_size_kb = 0;
    266   const size_t num_buffers = static_cast<size_t>(cfg.buffers_size());
    267   tracing_session->buffers_index.reserve(num_buffers);
    268   for (size_t i = 0; i < num_buffers; i++) {
    269     const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i];
    270     BufferID global_id = buffer_ids_.Allocate();
    271     if (!global_id) {
    272       did_allocate_all_buffers = false;  // We ran out of IDs.
    273       break;
    274     }
    275     tracing_session->buffers_index.push_back(global_id);
    276     const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u;
    277     total_buf_size_kb += buffer_cfg.size_kb();
    278     auto it_and_inserted =
    279         buffers_.emplace(global_id, TraceBuffer::Create(buf_size_bytes));
    280     PERFETTO_DCHECK(it_and_inserted.second);  // buffers_.count(global_id) == 0.
    281     std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second;
    282     if (!trace_buffer) {
    283       did_allocate_all_buffers = false;
    284       break;
    285     }
    286   }
    287 
    288   UpdateMemoryGuardrail();
    289 
    290   // This can happen if either:
    291   // - All the kMaxTraceBufferID slots are taken.
    292   // - OOM, or, more relistically, we exhausted virtual memory.
    293   // In any case, free all the previously allocated buffers and abort.
    294   // TODO(fmayer): add a test to cover this case, this is quite subtle.
    295   if (!did_allocate_all_buffers) {
    296     for (BufferID global_id : tracing_session->buffers_index) {
    297       buffer_ids_.Free(global_id);
    298       buffers_.erase(global_id);
    299     }
    300     tracing_sessions_.erase(tsid);
    301     return false;
    302   }
    303 
    304   consumer->tracing_session_id_ = tsid;
    305 
    306   // Enable the data sources on the producers.
    307   for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) {
    308     // Scan all the registered data sources with a matching name.
    309     auto range = data_sources_.equal_range(cfg_data_source.config().name());
    310     for (auto it = range.first; it != range.second; it++) {
    311       TraceConfig::ProducerConfig producer_config;
    312       for (auto& config : cfg.producers()) {
    313         if (GetProducer(it->second.producer_id)->name_ ==
    314             config.producer_name()) {
    315           producer_config = config;
    316           break;
    317         }
    318       }
    319       CreateDataSourceInstance(cfg_data_source, producer_config, it->second,
    320                                tracing_session);
    321     }
    322   }
    323 
    324   // Trigger delayed task if the trace is time limited.
    325   const uint32_t trace_duration_ms = cfg.duration_ms();
    326   if (trace_duration_ms > 0) {
    327     auto weak_this = weak_ptr_factory_.GetWeakPtr();
    328     task_runner_->PostDelayedTask(
    329         [weak_this, tsid] {
    330           if (weak_this)
    331             weak_this->FlushAndDisableTracing(tsid);
    332         },
    333         trace_duration_ms);
    334   }
    335 
    336   // Start the periodic drain tasks if we should to save the trace into a file.
    337   if (cfg.write_into_file()) {
    338     auto weak_this = weak_ptr_factory_.GetWeakPtr();
    339     task_runner_->PostDelayedTask(
    340         [weak_this, tsid] {
    341           if (weak_this)
    342             weak_this->ReadBuffers(tsid, nullptr);
    343         },
    344         tracing_session->delay_to_next_write_period_ms());
    345   }
    346 
    347   tracing_session->tracing_enabled = true;
    348   PERFETTO_LOG(
    349       "Enabled tracing, #sources:%zu, duration:%d ms, #buffers:%d, total "
    350       "buffer size:%zu KB, total sessions:%zu",
    351       cfg.data_sources().size(), trace_duration_ms, cfg.buffers_size(),
    352       total_buf_size_kb, tracing_sessions_.size());
    353   return true;
    354 }
    355 
    356 // DisableTracing just stops the data sources but doesn't free up any buffer.
    357 // This is to allow the consumer to freeze the buffers (by stopping the trace)
    358 // and then drain the buffers. The actual teardown of the TracingSession happens
    359 // in FreeBuffers().
    360 void ServiceImpl::DisableTracing(TracingSessionID tsid) {
    361   PERFETTO_DCHECK_THREAD(thread_checker_);
    362   TracingSession* tracing_session = GetTracingSession(tsid);
    363   if (!tracing_session) {
    364     // Can happen if the consumer calls this before EnableTracing() or after
    365     // FreeBuffers().
    366     PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid);
    367     return;
    368   }
    369 
    370   for (const auto& data_source_inst : tracing_session->data_source_instances) {
    371     const ProducerID producer_id = data_source_inst.first;
    372     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
    373     ProducerEndpointImpl* producer = GetProducer(producer_id);
    374     producer->TearDownDataSource(ds_inst_id);
    375   }
    376   tracing_session->data_source_instances.clear();
    377 
    378   // If the client requested us to periodically save the buffer into the passed
    379   // file, force a write pass.
    380   if (tracing_session->write_into_file) {
    381     tracing_session->write_period_ms = 0;
    382     ReadBuffers(tsid, nullptr);
    383   }
    384 
    385   if (tracing_session->tracing_enabled) {
    386     tracing_session->tracing_enabled = false;
    387     tracing_session->consumer->NotifyOnTracingDisabled();
    388   }
    389 
    390   // Deliberately NOT removing the session from |tracing_session_|, it's still
    391   // needed to call ReadBuffers(). FreeBuffers() will erase() the session.
    392 }
    393 
    394 void ServiceImpl::Flush(TracingSessionID tsid,
    395                         uint32_t timeout_ms,
    396                         ConsumerEndpoint::FlushCallback callback) {
    397   PERFETTO_DCHECK_THREAD(thread_checker_);
    398   TracingSession* tracing_session = GetTracingSession(tsid);
    399   if (!tracing_session) {
    400     PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid);
    401     return;
    402   }
    403 
    404   if (tracing_session->pending_flushes.size() > 1000) {
    405     PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session",
    406                   tracing_session->pending_flushes.size());
    407     callback(false);
    408     return;
    409   }
    410 
    411   FlushRequestID flush_request_id = ++last_flush_request_id_;
    412   PendingFlush& pending_flush =
    413       tracing_session->pending_flushes
    414           .emplace_hint(tracing_session->pending_flushes.end(),
    415                         flush_request_id, PendingFlush(std::move(callback)))
    416           ->second;
    417 
    418   // Send a flush request to each producer involved in the tracing session. In
    419   // order to issue a flush request we have to build a map of all data source
    420   // instance ids enabled for each producer.
    421   std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map;
    422   for (const auto& data_source_inst : tracing_session->data_source_instances) {
    423     const ProducerID producer_id = data_source_inst.first;
    424     const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id;
    425     flush_map[producer_id].push_back(ds_inst_id);
    426   }
    427 
    428   for (const auto& kv : flush_map) {
    429     ProducerID producer_id = kv.first;
    430     ProducerEndpointImpl* producer = GetProducer(producer_id);
    431     const std::vector<DataSourceInstanceID>& data_sources = kv.second;
    432     producer->Flush(flush_request_id, data_sources);
    433     pending_flush.producers.insert(producer_id);
    434   }
    435 
    436   auto weak_this = weak_ptr_factory_.GetWeakPtr();
    437   task_runner_->PostDelayedTask(
    438       [weak_this, tsid, flush_request_id] {
    439         if (weak_this)
    440           weak_this->OnFlushTimeout(tsid, flush_request_id);
    441       },
    442       timeout_ms);
    443 }
    444 
    445 void ServiceImpl::NotifyFlushDoneForProducer(ProducerID producer_id,
    446                                              FlushRequestID flush_request_id) {
    447   for (auto& kv : tracing_sessions_) {
    448     // Remove all pending flushes <= |flush_request_id| for |producer_id|.
    449     auto& pending_flushes = kv.second.pending_flushes;
    450     auto end_it = pending_flushes.upper_bound(flush_request_id);
    451     for (auto it = pending_flushes.begin(); it != end_it;) {
    452       PendingFlush& pending_flush = it->second;
    453       pending_flush.producers.erase(producer_id);
    454       if (pending_flush.producers.empty()) {
    455         task_runner_->PostTask(
    456             std::bind(std::move(pending_flush.callback), /*success=*/true));
    457         it = pending_flushes.erase(it);
    458       } else {
    459         it++;
    460       }
    461     }  // for (pending_flushes)
    462   }    // for (tracing_session)
    463 }
    464 
    465 void ServiceImpl::OnFlushTimeout(TracingSessionID tsid,
    466                                  FlushRequestID flush_request_id) {
    467   TracingSession* tracing_session = GetTracingSession(tsid);
    468   if (!tracing_session)
    469     return;
    470   auto it = tracing_session->pending_flushes.find(flush_request_id);
    471   if (it == tracing_session->pending_flushes.end())
    472     return;  // Nominal case: flush was completed and acked on time.
    473   auto callback = std::move(it->second.callback);
    474   tracing_session->pending_flushes.erase(it);
    475   callback(/*success=*/false);
    476 }
    477 
    478 void ServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) {
    479   PERFETTO_DCHECK_THREAD(thread_checker_);
    480   auto weak_this = weak_ptr_factory_.GetWeakPtr();
    481   Flush(tsid, kFlushTimeoutMs, [weak_this, tsid](bool success) {
    482     PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64,
    483                   success, tsid);
    484     if (weak_this)
    485       weak_this->DisableTracing(tsid);
    486   });
    487 }
    488 
    489 // Note: when this is called to write into a file passed when starting tracing
    490 // |consumer| will be == nullptr (as opposite to the case of a consumer asking
    491 // to send the trace data back over IPC).
    492 void ServiceImpl::ReadBuffers(TracingSessionID tsid,
    493                               ConsumerEndpointImpl* consumer) {
    494   PERFETTO_DCHECK_THREAD(thread_checker_);
    495   TracingSession* tracing_session = GetTracingSession(tsid);
    496   if (!tracing_session) {
    497     // This will be hit systematically from the PostDelayedTask when directly
    498     // writing into the file (in which case consumer == nullptr). Suppress the
    499     // log in this case as it's just spam.
    500     if (consumer)
    501       PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active");
    502     return;  // TODO(primiano): signal failure?
    503   }
    504 
    505   // This can happen if the file is closed by a previous task because it reaches
    506   // |max_file_size_bytes|.
    507   if (!tracing_session->write_into_file && !consumer)
    508     return;
    509 
    510   if (tracing_session->write_into_file && consumer) {
    511     // If the consumer enabled tracing and asked to save the contents into the
    512     // passed file makes little sense to also try to read the buffers over IPC,
    513     // as that would just steal data from the periodic draining task.
    514     PERFETTO_DCHECK(false);
    515     return;
    516   }
    517 
    518   std::vector<TracePacket> packets;
    519   packets.reserve(1024);  // Just an educated guess to avoid trivial expansions.
    520   MaybeSnapshotClocks(tracing_session, &packets);
    521   MaybeSnapshotStats(tracing_session, &packets);
    522   MaybeEmitTraceConfig(tracing_session, &packets);
    523 
    524   size_t packets_bytes = 0;  // SUM(slice.size() for each slice in |packets|).
    525   size_t total_slices = 0;   // SUM(#slices in |packets|).
    526 
    527   // Add up size for packets added by the Maybe* calls above.
    528   for (const TracePacket& packet : packets) {
    529     packets_bytes += packet.size();
    530     total_slices += packet.slices().size();
    531   }
    532 
    533   // This is a rough threshold to determine how much to read from the buffer in
    534   // each task. This is to avoid executing a single huge sending task for too
    535   // long and risk to hit the watchdog. This is *not* an upper bound: we just
    536   // stop accumulating new packets and PostTask *after* we cross this threshold.
    537   // This constant essentially balances the PostTask and IPC overhead vs the
    538   // responsiveness of the service. An extremely small value will cause one IPC
    539   // and one PostTask for each slice but will keep the service extremely
    540   // responsive. An extremely large value will batch the send for the full
    541   // buffer in one large task, will hit the blocking send() once the socket
    542   // buffers are full and hang the service for a bit (until the consumer
    543   // catches up).
    544   static constexpr size_t kApproxBytesPerTask = 32768;
    545   bool did_hit_threshold = false;
    546 
    547   // TODO(primiano): Extend the ReadBuffers API to allow reading only some
    548   // buffers, not all of them in one go.
    549   for (size_t buf_idx = 0;
    550        buf_idx < tracing_session->num_buffers() && !did_hit_threshold;
    551        buf_idx++) {
    552     auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]);
    553     if (tbuf_iter == buffers_.end()) {
    554       PERFETTO_DCHECK(false);
    555       continue;
    556     }
    557     TraceBuffer& tbuf = *tbuf_iter->second;
    558     tbuf.BeginRead();
    559     while (!did_hit_threshold) {
    560       TracePacket packet;
    561       uid_t producer_uid = kInvalidUid;
    562       if (!tbuf.ReadNextTracePacket(&packet, &producer_uid))
    563         break;
    564       PERFETTO_DCHECK(producer_uid != kInvalidUid);
    565       PERFETTO_DCHECK(packet.size() > 0);
    566       if (!PacketStreamValidator::Validate(packet.slices())) {
    567         PERFETTO_DLOG("Dropping invalid packet");
    568         continue;
    569       }
    570 
    571       // Append a slice with the trusted UID of the producer. This can't
    572       // be spoofed because above we validated that the existing slices
    573       // don't contain any trusted UID fields. For added safety we append
    574       // instead of prepending because according to protobuf semantics, if
    575       // the same field is encountered multiple times the last instance
    576       // takes priority. Note that truncated packets are also rejected, so
    577       // the producer can't give us a partial packet (e.g., a truncated
    578       // string) which only becomes valid when the UID is appended here.
    579       protos::TrustedPacket trusted_packet;
    580       trusted_packet.set_trusted_uid(static_cast<int32_t>(producer_uid));
    581       static constexpr size_t kTrustedBufSize = 16;
    582       Slice slice = Slice::Allocate(kTrustedBufSize);
    583       PERFETTO_CHECK(
    584           trusted_packet.SerializeToArray(slice.own_data(), kTrustedBufSize));
    585       slice.size = static_cast<size_t>(trusted_packet.GetCachedSize());
    586       PERFETTO_DCHECK(slice.size > 0 && slice.size <= kTrustedBufSize);
    587       packet.AddSlice(std::move(slice));
    588 
    589       // Append the packet (inclusive of the trusted uid) to |packets|.
    590       packets_bytes += packet.size();
    591       total_slices += packet.slices().size();
    592       did_hit_threshold = packets_bytes >= kApproxBytesPerTask &&
    593                           !tracing_session->write_into_file;
    594       packets.emplace_back(std::move(packet));
    595     }  // for(packets...)
    596   }    // for(buffers...)
    597 
    598   // If the caller asked us to write into a file by setting
    599   // |write_into_file| == true in the trace config, drain the packets read
    600   // (if any) into the given file descriptor.
    601   if (tracing_session->write_into_file) {
    602     const uint64_t max_size = tracing_session->max_file_size_bytes
    603                                   ? tracing_session->max_file_size_bytes
    604                                   : std::numeric_limits<size_t>::max();
    605 
    606     // When writing into a file, the file should look like a root trace.proto
    607     // message. Each packet should be prepended with a proto preamble stating
    608     // its field id (within trace.proto) and size. Hence the addition below.
    609     const size_t max_iovecs = total_slices + packets.size();
    610 
    611     size_t num_iovecs = 0;
    612     bool stop_writing_into_file = tracing_session->write_period_ms == 0;
    613     std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]);
    614     size_t num_iovecs_at_last_packet = 0;
    615     uint64_t bytes_about_to_be_written = 0;
    616     for (TracePacket& packet : packets) {
    617       std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) =
    618           packet.GetProtoPreamble();
    619       bytes_about_to_be_written += iovecs[num_iovecs].iov_len;
    620       num_iovecs++;
    621       for (const Slice& slice : packet.slices()) {
    622         // writev() doesn't change the passed pointer. However, struct iovec
    623         // take a non-const ptr because it's the same struct used by readv().
    624         // Hence the const_cast here.
    625         char* start = static_cast<char*>(const_cast<void*>(slice.start));
    626         bytes_about_to_be_written += slice.size;
    627         iovecs[num_iovecs++] = {start, slice.size};
    628       }
    629 
    630       if (tracing_session->bytes_written_into_file +
    631               bytes_about_to_be_written >=
    632           max_size) {
    633         stop_writing_into_file = true;
    634         num_iovecs = num_iovecs_at_last_packet;
    635         break;
    636       }
    637 
    638       num_iovecs_at_last_packet = num_iovecs;
    639     }
    640     PERFETTO_DCHECK(num_iovecs <= max_iovecs);
    641     int fd = *tracing_session->write_into_file;
    642 
    643     uint64_t total_wr_size = 0;
    644 
    645     // writev() can take at most IOV_MAX entries per call. Batch them.
    646     constexpr size_t kIOVMax = IOV_MAX;
    647     for (size_t i = 0; i < num_iovecs; i += kIOVMax) {
    648       int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax));
    649       ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size));
    650       if (wr_size <= 0) {
    651         PERFETTO_PLOG("writev() failed");
    652         stop_writing_into_file = true;
    653         break;
    654       }
    655       total_wr_size += static_cast<size_t>(wr_size);
    656     }
    657 
    658     tracing_session->bytes_written_into_file += total_wr_size;
    659 
    660     PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d",
    661                   (total_wr_size + 1023) / 1024, stop_writing_into_file);
    662     if (stop_writing_into_file) {
    663       tracing_session->write_into_file.reset();
    664       tracing_session->write_period_ms = 0;
    665       DisableTracing(tsid);
    666       return;
    667     }
    668 
    669     auto weak_this = weak_ptr_factory_.GetWeakPtr();
    670     task_runner_->PostDelayedTask(
    671         [weak_this, tsid] {
    672           if (weak_this)
    673             weak_this->ReadBuffers(tsid, nullptr);
    674         },
    675         tracing_session->delay_to_next_write_period_ms());
    676     return;
    677   }  // if (tracing_session->write_into_file)
    678 
    679   const bool has_more = did_hit_threshold;
    680   if (has_more) {
    681     auto weak_consumer = consumer->GetWeakPtr();
    682     auto weak_this = weak_ptr_factory_.GetWeakPtr();
    683     task_runner_->PostTask([weak_this, weak_consumer, tsid] {
    684       if (!weak_this || !weak_consumer)
    685         return;
    686       weak_this->ReadBuffers(tsid, weak_consumer.get());
    687     });
    688   }
    689 
    690   // Keep this as tail call, just in case the consumer re-enters.
    691   consumer->consumer_->OnTraceData(std::move(packets), has_more);
    692 }
    693 
    694 void ServiceImpl::FreeBuffers(TracingSessionID tsid) {
    695   PERFETTO_DCHECK_THREAD(thread_checker_);
    696   PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid);
    697   TracingSession* tracing_session = GetTracingSession(tsid);
    698   if (!tracing_session) {
    699     PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid);
    700     return;  // TODO(primiano): signal failure?
    701   }
    702   DisableTracing(tsid);
    703 
    704   for (BufferID buffer_id : tracing_session->buffers_index) {
    705     buffer_ids_.Free(buffer_id);
    706     PERFETTO_DCHECK(buffers_.count(buffer_id) == 1);
    707     buffers_.erase(buffer_id);
    708   }
    709   tracing_sessions_.erase(tsid);
    710   UpdateMemoryGuardrail();
    711 
    712   PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid,
    713                tracing_sessions_.size());
    714 }
    715 
    716 void ServiceImpl::RegisterDataSource(ProducerID producer_id,
    717                                      const DataSourceDescriptor& desc) {
    718   PERFETTO_DCHECK_THREAD(thread_checker_);
    719   PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"",
    720                 producer_id, desc.name().c_str());
    721 
    722   PERFETTO_DCHECK(!desc.name().empty());
    723   auto reg_ds = data_sources_.emplace(desc.name(),
    724                                       RegisteredDataSource{producer_id, desc});
    725 
    726   // If there are existing tracing sessions, we need to check if the new
    727   // data source is enabled by any of them.
    728   if (tracing_sessions_.empty())
    729     return;
    730 
    731   ProducerEndpointImpl* producer = GetProducer(producer_id);
    732   if (!producer) {
    733     PERFETTO_DCHECK(false);
    734     return;
    735   }
    736 
    737   for (auto& iter : tracing_sessions_) {
    738     TracingSession& tracing_session = iter.second;
    739     TraceConfig::ProducerConfig producer_config;
    740     for (auto& config : tracing_session.config.producers()) {
    741       if (producer->name_ == config.producer_name()) {
    742         producer_config = config;
    743         break;
    744       }
    745     }
    746     for (const TraceConfig::DataSource& cfg_data_source :
    747          tracing_session.config.data_sources()) {
    748       if (cfg_data_source.config().name() == desc.name())
    749         CreateDataSourceInstance(cfg_data_source, producer_config,
    750                                  reg_ds->second, &tracing_session);
    751     }
    752   }
    753 }
    754 
    755 void ServiceImpl::UnregisterDataSource(ProducerID producer_id,
    756                                        const std::string& name) {
    757   PERFETTO_DCHECK_THREAD(thread_checker_);
    758   PERFETTO_CHECK(producer_id);
    759   ProducerEndpointImpl* producer = GetProducer(producer_id);
    760   PERFETTO_DCHECK(producer);
    761   for (auto& kv : tracing_sessions_) {
    762     auto& ds_instances = kv.second.data_source_instances;
    763     for (auto it = ds_instances.begin(); it != ds_instances.end();) {
    764       if (it->first == producer_id && it->second.data_source_name == name) {
    765         DataSourceInstanceID ds_inst_id = it->second.instance_id;
    766         producer->TearDownDataSource(ds_inst_id);
    767         it = ds_instances.erase(it);
    768       } else {
    769         ++it;
    770       }
    771     }  // for (data_source_instances)
    772   }    // for (tracing_session)
    773 
    774   for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) {
    775     if (it->second.producer_id == producer_id &&
    776         it->second.descriptor.name() == name) {
    777       data_sources_.erase(it);
    778       return;
    779     }
    780   }
    781 
    782   PERFETTO_DLOG(
    783       "Tried to unregister a non-existent data source \"%s\" for "
    784       "producer %" PRIu16,
    785       name.c_str(), producer_id);
    786   PERFETTO_DCHECK(false);
    787 }
    788 
    789 void ServiceImpl::CreateDataSourceInstance(
    790     const TraceConfig::DataSource& cfg_data_source,
    791     const TraceConfig::ProducerConfig& producer_config,
    792     const RegisteredDataSource& data_source,
    793     TracingSession* tracing_session) {
    794   PERFETTO_DCHECK_THREAD(thread_checker_);
    795   ProducerEndpointImpl* producer = GetProducer(data_source.producer_id);
    796   PERFETTO_DCHECK(producer);
    797   // An existing producer that is not ftrace could have registered itself as
    798   // ftrace, we must not enable it in that case.
    799   if (lockdown_mode_ && producer->uid_ != uid_) {
    800     PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_);
    801     return;
    802   }
    803   // TODO(primiano): Add tests for registration ordering
    804   // (data sources vs consumers).
    805   if (!cfg_data_source.producer_name_filter().empty()) {
    806     if (std::find(cfg_data_source.producer_name_filter().begin(),
    807                   cfg_data_source.producer_name_filter().end(),
    808                   producer->name_) ==
    809         cfg_data_source.producer_name_filter().end()) {
    810       PERFETTO_DLOG("Data source: %s is filtered out for producer: %s",
    811                     cfg_data_source.config().name().c_str(),
    812                     producer->name_.c_str());
    813       return;
    814     }
    815   }
    816 
    817   // Create a copy of the DataSourceConfig specified in the trace config. This
    818   // will be passed to the producer after translating the |target_buffer| id.
    819   // The |target_buffer| parameter passed by the consumer in the trace config is
    820   // relative to the buffers declared in the same trace config. This has to be
    821   // translated to the global BufferID before passing it to the producers, which
    822   // don't know anything about tracing sessions and consumers.
    823 
    824   DataSourceConfig ds_config = cfg_data_source.config();  // Deliberate copy.
    825   ds_config.set_trace_duration_ms(tracing_session->config.duration_ms());
    826   auto relative_buffer_id = ds_config.target_buffer();
    827   if (relative_buffer_id >= tracing_session->num_buffers()) {
    828     PERFETTO_LOG(
    829         "The TraceConfig for DataSource %s specified a target_buffer out of "
    830         "bound (%d). Skipping it.",
    831         ds_config.name().c_str(), relative_buffer_id);
    832     return;
    833   }
    834   BufferID global_id = tracing_session->buffers_index[relative_buffer_id];
    835   PERFETTO_DCHECK(global_id);
    836   ds_config.set_target_buffer(global_id);
    837 
    838   DataSourceInstanceID inst_id = ++last_data_source_instance_id_;
    839   tracing_session->data_source_instances.emplace(
    840       producer->id_,
    841       DataSourceInstance{inst_id, data_source.descriptor.name()});
    842   PERFETTO_DLOG("Starting data source %s with target buffer %" PRIu16,
    843                 ds_config.name().c_str(), global_id);
    844   if (!producer->shared_memory()) {
    845     // Determine the SMB page size. Must be an integer multiple of 4k.
    846     size_t page_size = std::min<size_t>(producer_config.page_size_kb() * 1024,
    847                                         SharedMemoryABI::kMaxPageSize);
    848     if (page_size < base::kPageSize || page_size % base::kPageSize != 0)
    849       page_size = kDefaultShmPageSize;
    850     producer->shared_buffer_page_size_kb_ = page_size / 1024;
    851 
    852     // Determine the SMB size. Must be an integer multiple of the SMB page size.
    853     // The decisional tree is as follows:
    854     // 1. Give priority to what defined in the trace config.
    855     // 2. If unset give priority to the hint passed by the producer.
    856     // 3. Keep within bounds and ensure it's a multiple of the page size.
    857     size_t shm_size = producer_config.shm_size_kb() * 1024;
    858     if (shm_size == 0)
    859       shm_size = producer->shmem_size_hint_bytes_;
    860     shm_size = std::min<size_t>(shm_size, kMaxShmSize);
    861     if (shm_size < page_size || shm_size % page_size)
    862       shm_size = kDefaultShmSize;
    863 
    864     // TODO(primiano): right now Create() will suicide in case of OOM if the
    865     // mmap fails. We should instead gracefully fail the request and tell the
    866     // client to go away.
    867     auto shared_memory = shm_factory_->CreateSharedMemory(shm_size);
    868     producer->SetSharedMemory(std::move(shared_memory));
    869     producer->OnTracingSetup();
    870     UpdateMemoryGuardrail();
    871   }
    872   producer->CreateDataSourceInstance(inst_id, ds_config);
    873 }
    874 
    875 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer
    876 // might be lying / returning garbage contents. |src| and |size| can be trusted
    877 // in terms of being a valid pointer, but not the contents.
    878 void ServiceImpl::CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted,
    879                                                 uid_t producer_uid_trusted,
    880                                                 WriterID writer_id,
    881                                                 ChunkID chunk_id,
    882                                                 BufferID buffer_id,
    883                                                 uint16_t num_fragments,
    884                                                 uint8_t chunk_flags,
    885                                                 const uint8_t* src,
    886                                                 size_t size) {
    887   PERFETTO_DCHECK_THREAD(thread_checker_);
    888   TraceBuffer* buf = GetBufferByID(buffer_id);
    889   if (!buf) {
    890     PERFETTO_DLOG("Could not find target buffer %" PRIu16
    891                   " for producer %" PRIu16,
    892                   buffer_id, producer_id_trusted);
    893     return;
    894   }
    895 
    896   // TODO(primiano): we should have a set<BufferID> |allowed_target_buffers| in
    897   // ProducerEndpointImpl to perform ACL checks and prevent that the Producer
    898   // passes a |target_buffer| which is valid, but that we never asked it to use.
    899   // Essentially we want to prevent a malicious producer to inject data into a
    900   // log buffer that has nothing to do with it.
    901 
    902   buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id,
    903                           chunk_id, num_fragments, chunk_flags, src, size);
    904 }
    905 
    906 void ServiceImpl::ApplyChunkPatches(
    907     ProducerID producer_id_trusted,
    908     const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) {
    909   PERFETTO_DCHECK_THREAD(thread_checker_);
    910 
    911   for (const auto& chunk : chunks_to_patch) {
    912     const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id());
    913     const WriterID writer_id = static_cast<WriterID>(chunk.writer_id());
    914     TraceBuffer* buf =
    915         GetBufferByID(static_cast<BufferID>(chunk.target_buffer()));
    916     static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID,
    917                   "Add a '|| chunk_id > kMaxChunkID' below if this fails");
    918     if (!writer_id || writer_id > kMaxWriterID || !buf) {
    919       PERFETTO_DLOG(
    920           "Received invalid chunks_to_patch request from Producer: %" PRIu16
    921           ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16,
    922           producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id);
    923       continue;
    924     }
    925     // Speculate on the fact that there are going to be a limited amount of
    926     // patches per request, so we can allocate the |patches| array on the stack.
    927     std::array<TraceBuffer::Patch, 1024> patches;  // Uninitialized.
    928     if (chunk.patches().size() > patches.size()) {
    929       PERFETTO_DLOG("Too many patches (%zu) batched in the same request",
    930                     patches.size());
    931       PERFETTO_DCHECK(false);
    932       continue;
    933     }
    934 
    935     size_t i = 0;
    936     for (const auto& patch : chunk.patches()) {
    937       const std::string& patch_data = patch.data();
    938       if (patch_data.size() != patches[i].data.size()) {
    939         PERFETTO_DLOG("Received patch from producer: %" PRIu16
    940                       " of unexpected size %zu",
    941                       producer_id_trusted, patch_data.size());
    942         continue;
    943       }
    944       patches[i].offset_untrusted = patch.offset();
    945       memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size());
    946       i++;
    947     }
    948     buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id,
    949                                &patches[0], i, chunk.has_more_patches());
    950   }
    951 }
    952 
    953 ServiceImpl::TracingSession* ServiceImpl::GetTracingSession(
    954     TracingSessionID tsid) {
    955   PERFETTO_DCHECK_THREAD(thread_checker_);
    956   auto it = tsid ? tracing_sessions_.find(tsid) : tracing_sessions_.end();
    957   if (it == tracing_sessions_.end())
    958     return nullptr;
    959   return &it->second;
    960 }
    961 
    962 ProducerID ServiceImpl::GetNextProducerID() {
    963   PERFETTO_DCHECK_THREAD(thread_checker_);
    964   PERFETTO_CHECK(producers_.size() < kMaxProducerID);
    965   do {
    966     ++last_producer_id_;
    967   } while (producers_.count(last_producer_id_) || last_producer_id_ == 0);
    968   PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID);
    969   return last_producer_id_;
    970 }
    971 
    972 TraceBuffer* ServiceImpl::GetBufferByID(BufferID buffer_id) {
    973   auto buf_iter = buffers_.find(buffer_id);
    974   if (buf_iter == buffers_.end())
    975     return nullptr;
    976   return &*buf_iter->second;
    977 }
    978 
    979 void ServiceImpl::UpdateMemoryGuardrail() {
    980 #if !PERFETTO_BUILDFLAG(PERFETTO_CHROMIUM_BUILD) && \
    981     !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
    982   uint64_t total_buffer_bytes = 0;
    983 
    984   // Sum up all the shared memory buffers.
    985   for (const auto& id_to_producer : producers_) {
    986     if (id_to_producer.second->shared_memory())
    987       total_buffer_bytes += id_to_producer.second->shared_memory()->size();
    988   }
    989 
    990   // Sum up all the trace buffers.
    991   for (const auto& id_to_buffer : buffers_) {
    992     total_buffer_bytes += id_to_buffer.second->size();
    993   }
    994 
    995   // Set the guard rail to 32MB + the sum of all the buffers over a 30 second
    996   // interval.
    997   uint64_t guardrail = 32 * 1024 * 1024 + total_buffer_bytes;
    998   base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000);
    999 #endif
   1000 }
   1001 
   1002 void ServiceImpl::MaybeSnapshotClocks(TracingSession* tracing_session,
   1003                                       std::vector<TracePacket>* packets) {
   1004   base::TimeMillis now = base::GetWallTimeMs();
   1005   if (now < tracing_session->last_clock_snapshot + kClockSnapshotInterval)
   1006     return;
   1007   tracing_session->last_clock_snapshot = now;
   1008 
   1009   protos::TrustedPacket packet;
   1010   protos::ClockSnapshot* clock_snapshot = packet.mutable_clock_snapshot();
   1011 
   1012 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   1013   struct {
   1014     clockid_t id;
   1015     protos::ClockSnapshot::Clock::Type type;
   1016     struct timespec ts;
   1017   } clocks[] = {
   1018       {CLOCK_BOOTTIME, protos::ClockSnapshot::Clock::BOOTTIME, {0, 0}},
   1019       {CLOCK_REALTIME_COARSE,
   1020        protos::ClockSnapshot::Clock::REALTIME_COARSE,
   1021        {0, 0}},
   1022       {CLOCK_MONOTONIC_COARSE,
   1023        protos::ClockSnapshot::Clock::MONOTONIC_COARSE,
   1024        {0, 0}},
   1025       {CLOCK_REALTIME, protos::ClockSnapshot::Clock::REALTIME, {0, 0}},
   1026       {CLOCK_MONOTONIC, protos::ClockSnapshot::Clock::MONOTONIC, {0, 0}},
   1027       {CLOCK_MONOTONIC_RAW,
   1028        protos::ClockSnapshot::Clock::MONOTONIC_RAW,
   1029        {0, 0}},
   1030       {CLOCK_PROCESS_CPUTIME_ID,
   1031        protos::ClockSnapshot::Clock::PROCESS_CPUTIME,
   1032        {0, 0}},
   1033       {CLOCK_THREAD_CPUTIME_ID,
   1034        protos::ClockSnapshot::Clock::THREAD_CPUTIME,
   1035        {0, 0}},
   1036   };
   1037   // First snapshot all the clocks as atomically as we can.
   1038   for (auto& clock : clocks) {
   1039     if (clock_gettime(clock.id, &clock.ts) == -1)
   1040       PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id);
   1041   }
   1042   for (auto& clock : clocks) {
   1043     protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
   1044     c->set_type(clock.type);
   1045     c->set_timestamp(
   1046         static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count()));
   1047   }
   1048 #else   // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   1049   protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks();
   1050   c->set_type(protos::ClockSnapshot::Clock::MONOTONIC);
   1051   c->set_timestamp(static_cast<uint64_t>(base::GetWallTimeNs().count()));
   1052 #endif  // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX)
   1053 
   1054   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   1055   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   1056   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   1057   packets->emplace_back();
   1058   packets->back().AddSlice(std::move(slice));
   1059 }
   1060 
   1061 void ServiceImpl::MaybeSnapshotStats(TracingSession* tracing_session,
   1062                                      std::vector<TracePacket>* packets) {
   1063   base::TimeMillis now = base::GetWallTimeMs();
   1064   if (now < tracing_session->last_stats_snapshot + kStatsSnapshotInterval)
   1065     return;
   1066   tracing_session->last_stats_snapshot = now;
   1067 
   1068   protos::TrustedPacket packet;
   1069   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   1070 
   1071   protos::TraceStats* trace_stats = packet.mutable_trace_stats();
   1072   trace_stats->set_producers_connected(
   1073       static_cast<uint32_t>(producers_.size()));
   1074   trace_stats->set_producers_seen(last_producer_id_);
   1075   trace_stats->set_data_sources_registered(
   1076       static_cast<uint32_t>(data_sources_.size()));
   1077   trace_stats->set_data_sources_seen(last_data_source_instance_id_);
   1078   trace_stats->set_tracing_sessions(
   1079       static_cast<uint32_t>(tracing_sessions_.size()));
   1080   trace_stats->set_total_buffers(static_cast<uint32_t>(buffers_.size()));
   1081 
   1082   for (BufferID buf_id : tracing_session->buffers_index) {
   1083     TraceBuffer* buf = GetBufferByID(buf_id);
   1084     if (!buf) {
   1085       PERFETTO_DCHECK(false);
   1086       continue;
   1087     }
   1088     auto* buf_stats_proto = trace_stats->add_buffer_stats();
   1089     const TraceBuffer::Stats& buf_stats = buf->stats();
   1090     buf_stats_proto->set_bytes_written(buf_stats.bytes_written);
   1091     buf_stats_proto->set_chunks_written(buf_stats.chunks_written);
   1092     buf_stats_proto->set_chunks_overwritten(buf_stats.chunks_overwritten);
   1093     buf_stats_proto->set_write_wrap_count(buf_stats.write_wrap_count);
   1094     buf_stats_proto->set_patches_succeeded(buf_stats.patches_succeeded);
   1095     buf_stats_proto->set_patches_failed(buf_stats.patches_failed);
   1096     buf_stats_proto->set_readaheads_succeeded(buf_stats.readaheads_succeeded);
   1097     buf_stats_proto->set_readaheads_failed(buf_stats.readaheads_failed);
   1098     buf_stats_proto->set_abi_violations(buf_stats.abi_violations);
   1099   }  // for (buf in session).
   1100   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   1101   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   1102   packets->emplace_back();
   1103   packets->back().AddSlice(std::move(slice));
   1104 }
   1105 
   1106 void ServiceImpl::MaybeEmitTraceConfig(TracingSession* tracing_session,
   1107                                        std::vector<TracePacket>* packets) {
   1108   if (tracing_session->did_emit_config)
   1109     return;
   1110   tracing_session->did_emit_config = true;
   1111   protos::TrustedPacket packet;
   1112   tracing_session->config.ToProto(packet.mutable_trace_config());
   1113   packet.set_trusted_uid(static_cast<int32_t>(uid_));
   1114   Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize()));
   1115   PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data()));
   1116   packets->emplace_back();
   1117   packets->back().AddSlice(std::move(slice));
   1118 }
   1119 
   1120 ////////////////////////////////////////////////////////////////////////////////
   1121 // ServiceImpl::ConsumerEndpointImpl implementation
   1122 ////////////////////////////////////////////////////////////////////////////////
   1123 
   1124 ServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl(
   1125     ServiceImpl* service,
   1126     base::TaskRunner* task_runner,
   1127     Consumer* consumer)
   1128     : task_runner_(task_runner),
   1129       service_(service),
   1130       consumer_(consumer),
   1131       weak_ptr_factory_(this) {}
   1132 
   1133 ServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() {
   1134   service_->DisconnectConsumer(this);
   1135   consumer_->OnDisconnect();
   1136 }
   1137 
   1138 void ServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() {
   1139   PERFETTO_DCHECK_THREAD(thread_checker_);
   1140   auto weak_this = GetWeakPtr();
   1141   task_runner_->PostTask([weak_this] {
   1142     if (weak_this)
   1143       weak_this->consumer_->OnTracingDisabled();
   1144   });
   1145 }
   1146 
   1147 void ServiceImpl::ConsumerEndpointImpl::EnableTracing(const TraceConfig& cfg,
   1148                                                       base::ScopedFile fd) {
   1149   PERFETTO_DCHECK_THREAD(thread_checker_);
   1150   if (!service_->EnableTracing(this, cfg, std::move(fd)))
   1151     NotifyOnTracingDisabled();
   1152 }
   1153 
   1154 void ServiceImpl::ConsumerEndpointImpl::DisableTracing() {
   1155   PERFETTO_DCHECK_THREAD(thread_checker_);
   1156   if (!tracing_session_id_) {
   1157     PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active");
   1158     return;
   1159   }
   1160   service_->DisableTracing(tracing_session_id_);
   1161 }
   1162 
   1163 void ServiceImpl::ConsumerEndpointImpl::ReadBuffers() {
   1164   PERFETTO_DCHECK_THREAD(thread_checker_);
   1165   if (!tracing_session_id_) {
   1166     PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active");
   1167     return;
   1168   }
   1169   service_->ReadBuffers(tracing_session_id_, this);
   1170 }
   1171 
   1172 void ServiceImpl::ConsumerEndpointImpl::FreeBuffers() {
   1173   PERFETTO_DCHECK_THREAD(thread_checker_);
   1174   if (!tracing_session_id_) {
   1175     PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active");
   1176     return;
   1177   }
   1178   service_->FreeBuffers(tracing_session_id_);
   1179   tracing_session_id_ = 0;
   1180 }
   1181 
   1182 void ServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms,
   1183                                               FlushCallback callback) {
   1184   PERFETTO_DCHECK_THREAD(thread_checker_);
   1185   if (!tracing_session_id_) {
   1186     PERFETTO_LOG("Consumer called Flush() but tracing was not active");
   1187     return;
   1188   }
   1189   service_->Flush(tracing_session_id_, timeout_ms, callback);
   1190 }
   1191 
   1192 base::WeakPtr<ServiceImpl::ConsumerEndpointImpl>
   1193 ServiceImpl::ConsumerEndpointImpl::GetWeakPtr() {
   1194   PERFETTO_DCHECK_THREAD(thread_checker_);
   1195   return weak_ptr_factory_.GetWeakPtr();
   1196 }
   1197 
   1198 ////////////////////////////////////////////////////////////////////////////////
   1199 // ServiceImpl::ProducerEndpointImpl implementation
   1200 ////////////////////////////////////////////////////////////////////////////////
   1201 
   1202 ServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl(
   1203     ProducerID id,
   1204     uid_t uid,
   1205     ServiceImpl* service,
   1206     base::TaskRunner* task_runner,
   1207     Producer* producer,
   1208     const std::string& producer_name)
   1209     : id_(id),
   1210       uid_(uid),
   1211       service_(service),
   1212       task_runner_(task_runner),
   1213       producer_(producer),
   1214       name_(producer_name),
   1215       weak_ptr_factory_(this) {}
   1216 
   1217 ServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() {
   1218   service_->DisconnectProducer(id_);
   1219   producer_->OnDisconnect();
   1220 }
   1221 
   1222 void ServiceImpl::ProducerEndpointImpl::RegisterDataSource(
   1223     const DataSourceDescriptor& desc) {
   1224   PERFETTO_DCHECK_THREAD(thread_checker_);
   1225   if (desc.name().empty()) {
   1226     PERFETTO_DLOG("Received RegisterDataSource() with empty name");
   1227     return;
   1228   }
   1229 
   1230   service_->RegisterDataSource(id_, desc);
   1231 }
   1232 
   1233 void ServiceImpl::ProducerEndpointImpl::UnregisterDataSource(
   1234     const std::string& name) {
   1235   PERFETTO_DCHECK_THREAD(thread_checker_);
   1236   service_->UnregisterDataSource(id_, name);
   1237 }
   1238 
   1239 void ServiceImpl::ProducerEndpointImpl::CommitData(
   1240     const CommitDataRequest& req_untrusted,
   1241     CommitDataCallback callback) {
   1242   PERFETTO_DCHECK_THREAD(thread_checker_);
   1243 
   1244   if (!shared_memory_) {
   1245     PERFETTO_DLOG(
   1246         "Attempted to commit data before the shared memory was allocated.");
   1247     return;
   1248   }
   1249   PERFETTO_DCHECK(shmem_abi_.is_valid());
   1250   for (const auto& entry : req_untrusted.chunks_to_move()) {
   1251     const uint32_t page_idx = entry.page();
   1252     if (page_idx >= shmem_abi_.num_pages())
   1253       continue;  // A buggy or malicious producer.
   1254 
   1255     SharedMemoryABI::Chunk chunk =
   1256         shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk());
   1257     if (!chunk.is_valid()) {
   1258       PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete",
   1259                     entry.page(), entry.chunk());
   1260       continue;
   1261     }
   1262 
   1263     // TryAcquireChunkForReading() has load-acquire semantics. Once acquired,
   1264     // the ABI contract expects the producer to not touch the chunk anymore
   1265     // (until the service marks that as free). This is why all the reads below
   1266     // are just memory_order_relaxed. Also, the code here assumes that all this
   1267     // data can be malicious and just gives up if anything is malformed.
   1268     BufferID buffer_id = static_cast<BufferID>(entry.target_buffer());
   1269     const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header();
   1270     WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed);
   1271     ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed);
   1272     auto packets = chunk_header.packets.load(std::memory_order_relaxed);
   1273     uint16_t num_fragments = packets.count;
   1274     uint8_t chunk_flags = packets.flags;
   1275 
   1276     service_->CopyProducerPageIntoLogBuffer(
   1277         id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags,
   1278         chunk.payload_begin(), chunk.payload_size());
   1279 
   1280     // This one has release-store semantics.
   1281     shmem_abi_.ReleaseChunkAsFree(std::move(chunk));
   1282   }  // for(chunks_to_move)
   1283 
   1284   service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch());
   1285 
   1286   if (req_untrusted.flush_request_id()) {
   1287     service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id());
   1288   }
   1289 
   1290   // Keep this invocation last. ProducerIPCService::CommitData() relies on this
   1291   // callback being invoked within the same callstack and not posted. If this
   1292   // changes, the code there needs to be changed accordingly.
   1293   if (callback)
   1294     callback();
   1295 }
   1296 
   1297 void ServiceImpl::ProducerEndpointImpl::SetSharedMemory(
   1298     std::unique_ptr<SharedMemory> shared_memory) {
   1299   PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid());
   1300   shared_memory_ = std::move(shared_memory);
   1301   shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()),
   1302                         shared_memory_->size(),
   1303                         shared_buffer_page_size_kb() * 1024);
   1304 }
   1305 
   1306 SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const {
   1307   PERFETTO_DCHECK_THREAD(thread_checker_);
   1308   return shared_memory_.get();
   1309 }
   1310 
   1311 size_t ServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb() const {
   1312   return shared_buffer_page_size_kb_;
   1313 }
   1314 
   1315 void ServiceImpl::ProducerEndpointImpl::TearDownDataSource(
   1316     DataSourceInstanceID ds_inst_id) {
   1317   // TODO(primiano): When we'll support tearing down the SMB, at this point we
   1318   // should send the Producer a TearDownTracing if all its data sources have
   1319   // been disabled (see b/77532839 and aosp/655179 PS1).
   1320   PERFETTO_DCHECK_THREAD(thread_checker_);
   1321   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1322   task_runner_->PostTask([weak_this, ds_inst_id] {
   1323     if (weak_this)
   1324       weak_this->producer_->TearDownDataSourceInstance(ds_inst_id);
   1325   });
   1326 }
   1327 
   1328 SharedMemoryArbiterImpl*
   1329 ServiceImpl::ProducerEndpointImpl::GetOrCreateShmemArbiter() {
   1330   PERFETTO_DCHECK_THREAD(thread_checker_);
   1331   if (!inproc_shmem_arbiter_) {
   1332     inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl(
   1333         shared_memory_->start(), shared_memory_->size(),
   1334         shared_buffer_page_size_kb_ * 1024, this, task_runner_));
   1335   }
   1336   return inproc_shmem_arbiter_.get();
   1337 }
   1338 
   1339 std::unique_ptr<TraceWriter>
   1340 ServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) {
   1341   PERFETTO_DCHECK_THREAD(thread_checker_);
   1342   return GetOrCreateShmemArbiter()->CreateTraceWriter(buf_id);
   1343 }
   1344 
   1345 void ServiceImpl::ProducerEndpointImpl::OnTracingSetup() {
   1346   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1347   task_runner_->PostTask([weak_this] {
   1348     if (weak_this)
   1349       weak_this->producer_->OnTracingSetup();
   1350   });
   1351 }
   1352 
   1353 void ServiceImpl::ProducerEndpointImpl::Flush(
   1354     FlushRequestID flush_request_id,
   1355     const std::vector<DataSourceInstanceID>& data_sources) {
   1356   PERFETTO_DCHECK_THREAD(thread_checker_);
   1357   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1358   task_runner_->PostTask([weak_this, flush_request_id, data_sources] {
   1359     if (weak_this) {
   1360       weak_this->producer_->Flush(flush_request_id, data_sources.data(),
   1361                                   data_sources.size());
   1362     }
   1363   });
   1364 }
   1365 
   1366 void ServiceImpl::ProducerEndpointImpl::CreateDataSourceInstance(
   1367     DataSourceInstanceID ds_id,
   1368     const DataSourceConfig& config) {
   1369   PERFETTO_DCHECK_THREAD(thread_checker_);
   1370   auto weak_this = weak_ptr_factory_.GetWeakPtr();
   1371   task_runner_->PostTask([weak_this, ds_id, config] {
   1372     if (weak_this)
   1373       weak_this->producer_->CreateDataSourceInstance(ds_id, std::move(config));
   1374   });
   1375 }
   1376 
   1377 void ServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(FlushRequestID id) {
   1378   PERFETTO_DCHECK_THREAD(thread_checker_);
   1379   return GetOrCreateShmemArbiter()->NotifyFlushComplete(id);
   1380 }
   1381 
   1382 ////////////////////////////////////////////////////////////////////////////////
   1383 // ServiceImpl::TracingSession implementation
   1384 ////////////////////////////////////////////////////////////////////////////////
   1385 
   1386 ServiceImpl::TracingSession::TracingSession(ConsumerEndpointImpl* consumer_ptr,
   1387                                             const TraceConfig& new_config)
   1388     : consumer(consumer_ptr), config(new_config) {}
   1389 
   1390 }  // namespace perfetto
   1391