/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/service_impl.h"

#include <errno.h>
#include <inttypes.h>
#include <limits.h>
#include <string.h>
#include <sys/uio.h>
#include <unistd.h>

#include <algorithm>

#include "perfetto/base/build_config.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/base/utils.h"
#include "perfetto/tracing/core/consumer.h"
#include "perfetto/tracing/core/data_source_config.h"
#include "perfetto/tracing/core/producer.h"
#include "perfetto/tracing/core/shared_memory.h"
#include "perfetto/tracing/core/shared_memory_abi.h"
#include "perfetto/tracing/core/trace_packet.h"
#include "perfetto/tracing/core/trace_writer.h"
#include "src/tracing/core/packet_stream_validator.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"
#include "src/tracing/core/trace_buffer.h"

#include "perfetto/trace/clock_snapshot.pb.h"
#include "perfetto/trace/trusted_packet.pb.h"

// General note: this class must assume that Producers are malicious and will
// try to crash / exploit this class. We can trust pointers because they come
// from the IPC layer, but we should never assume that the producer calls
// come in the right order or that their arguments are sane / within bounds.
49 50 namespace perfetto { 51 52 namespace { 53 constexpr size_t kDefaultShmPageSize = base::kPageSize; 54 constexpr int kMaxBuffersPerConsumer = 128; 55 constexpr base::TimeMillis kClockSnapshotInterval(10 * 1000); 56 constexpr base::TimeMillis kStatsSnapshotInterval(10 * 1000); 57 constexpr int kMinWriteIntoFilePeriodMs = 100; 58 constexpr int kDefaultWriteIntoFilePeriodMs = 5000; 59 constexpr int kFlushTimeoutMs = 1000; 60 constexpr int kMaxConcurrentTracingSessions = 5; 61 62 constexpr uint64_t kMillisPerHour = 3600000; 63 64 // These apply only if enable_extra_guardrails is true. 65 constexpr uint64_t kMaxTracingDurationMillis = 24 * kMillisPerHour; 66 constexpr uint64_t kMaxTracingBufferSizeKb = 32 * 1024; 67 } // namespace 68 69 // These constants instead are defined in the header because are used by tests. 70 constexpr size_t ServiceImpl::kDefaultShmSize; 71 constexpr size_t ServiceImpl::kMaxShmSize; 72 73 // static 74 std::unique_ptr<Service> Service::CreateInstance( 75 std::unique_ptr<SharedMemory::Factory> shm_factory, 76 base::TaskRunner* task_runner) { 77 return std::unique_ptr<Service>( 78 new ServiceImpl(std::move(shm_factory), task_runner)); 79 } 80 81 ServiceImpl::ServiceImpl(std::unique_ptr<SharedMemory::Factory> shm_factory, 82 base::TaskRunner* task_runner) 83 : task_runner_(task_runner), 84 shm_factory_(std::move(shm_factory)), 85 uid_(getuid()), 86 buffer_ids_(kMaxTraceBufferID), 87 weak_ptr_factory_(this) { 88 PERFETTO_DCHECK(task_runner_); 89 } 90 91 ServiceImpl::~ServiceImpl() { 92 // TODO(fmayer): handle teardown of all Producer. 93 } 94 95 std::unique_ptr<Service::ProducerEndpoint> ServiceImpl::ConnectProducer( 96 Producer* producer, 97 uid_t uid, 98 const std::string& producer_name, 99 size_t shared_memory_size_hint_bytes) { 100 PERFETTO_DCHECK_THREAD(thread_checker_); 101 102 if (lockdown_mode_ && uid != geteuid()) { 103 PERFETTO_DLOG("Lockdown mode. 
Rejecting producer with UID %ld", 104 static_cast<unsigned long>(uid)); 105 return nullptr; 106 } 107 108 if (producers_.size() >= kMaxProducerID) { 109 PERFETTO_DCHECK(false); 110 return nullptr; 111 } 112 const ProducerID id = GetNextProducerID(); 113 PERFETTO_DLOG("Producer %" PRIu16 " connected", id); 114 115 std::unique_ptr<ProducerEndpointImpl> endpoint(new ProducerEndpointImpl( 116 id, uid, this, task_runner_, producer, producer_name)); 117 auto it_and_inserted = producers_.emplace(id, endpoint.get()); 118 PERFETTO_DCHECK(it_and_inserted.second); 119 endpoint->shmem_size_hint_bytes_ = shared_memory_size_hint_bytes; 120 task_runner_->PostTask(std::bind(&Producer::OnConnect, endpoint->producer_)); 121 122 return std::move(endpoint); 123 } 124 125 void ServiceImpl::DisconnectProducer(ProducerID id) { 126 PERFETTO_DCHECK_THREAD(thread_checker_); 127 PERFETTO_DLOG("Producer %" PRIu16 " disconnected", id); 128 PERFETTO_DCHECK(producers_.count(id)); 129 130 for (auto it = data_sources_.begin(); it != data_sources_.end();) { 131 auto next = it; 132 next++; 133 if (it->second.producer_id == id) 134 UnregisterDataSource(id, it->second.descriptor.name()); 135 it = next; 136 } 137 138 producers_.erase(id); 139 UpdateMemoryGuardrail(); 140 } 141 142 ServiceImpl::ProducerEndpointImpl* ServiceImpl::GetProducer( 143 ProducerID id) const { 144 PERFETTO_DCHECK_THREAD(thread_checker_); 145 auto it = producers_.find(id); 146 if (it == producers_.end()) 147 return nullptr; 148 return it->second; 149 } 150 151 std::unique_ptr<Service::ConsumerEndpoint> ServiceImpl::ConnectConsumer( 152 Consumer* consumer) { 153 PERFETTO_DCHECK_THREAD(thread_checker_); 154 PERFETTO_DLOG("Consumer %p connected", reinterpret_cast<void*>(consumer)); 155 std::unique_ptr<ConsumerEndpointImpl> endpoint( 156 new ConsumerEndpointImpl(this, task_runner_, consumer)); 157 auto it_and_inserted = consumers_.emplace(endpoint.get()); 158 PERFETTO_DCHECK(it_and_inserted.second); 159 
task_runner_->PostTask(std::bind(&Consumer::OnConnect, endpoint->consumer_)); 160 return std::move(endpoint); 161 } 162 163 void ServiceImpl::DisconnectConsumer(ConsumerEndpointImpl* consumer) { 164 PERFETTO_DCHECK_THREAD(thread_checker_); 165 PERFETTO_DLOG("Consumer %p disconnected", reinterpret_cast<void*>(consumer)); 166 PERFETTO_DCHECK(consumers_.count(consumer)); 167 168 // TODO(primiano) : Check that this is safe (what happens if there are 169 // ReadBuffers() calls posted in the meantime? They need to become noop). 170 if (consumer->tracing_session_id_) 171 FreeBuffers(consumer->tracing_session_id_); // Will also DisableTracing(). 172 consumers_.erase(consumer); 173 174 // At this point no more pointers to |consumer| should be around. 175 #if PERFETTO_DCHECK_IS_ON() 176 PERFETTO_DCHECK(!std::any_of( 177 tracing_sessions_.begin(), tracing_sessions_.end(), 178 [consumer](const std::pair<const TracingSessionID, TracingSession>& kv) { 179 return kv.second.consumer == consumer; 180 })); 181 #endif 182 } 183 184 bool ServiceImpl::EnableTracing(ConsumerEndpointImpl* consumer, 185 const TraceConfig& cfg, 186 base::ScopedFile fd) { 187 PERFETTO_DCHECK_THREAD(thread_checker_); 188 PERFETTO_DLOG("Enabling tracing for consumer %p", 189 reinterpret_cast<void*>(consumer)); 190 if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_SET) 191 lockdown_mode_ = true; 192 if (cfg.lockdown_mode() == TraceConfig::LockdownModeOperation::LOCKDOWN_CLEAR) 193 lockdown_mode_ = false; 194 TracingSession* tracing_session = 195 GetTracingSession(consumer->tracing_session_id_); 196 if (tracing_session) { 197 PERFETTO_DLOG( 198 "A Consumer is trying to EnableTracing() but another tracing session " 199 "is already active (forgot a call to FreeBuffers() ?)"); 200 return false; 201 } 202 203 if (cfg.enable_extra_guardrails()) { 204 if (cfg.duration_ms() > kMaxTracingDurationMillis) { 205 PERFETTO_ELOG("Requested too long trace (%" PRIu32 "ms > %" PRIu64 206 " ms)", 207 
cfg.duration_ms(), kMaxTracingDurationMillis); 208 return false; 209 } 210 uint64_t buf_size_sum = 0; 211 for (const auto& buf : cfg.buffers()) 212 buf_size_sum += buf.size_kb(); 213 if (buf_size_sum > kMaxTracingBufferSizeKb) { 214 PERFETTO_ELOG("Requested too large trace buffer (%" PRIu64 215 "kB > %" PRIu64 " kB)", 216 buf_size_sum, kMaxTracingBufferSizeKb); 217 return false; 218 } 219 } 220 221 if (cfg.buffers_size() > kMaxBuffersPerConsumer) { 222 PERFETTO_DLOG("Too many buffers configured (%d)", cfg.buffers_size()); 223 return false; 224 } 225 226 // TODO(primiano): This is a workaround to prevent that a producer gets stuck 227 // in a state where it stalls by design by having more TraceWriterImpl 228 // instances than free pages in the buffer. This is really a bug in 229 // trace_probes and the way it handles stalls in the shmem buffer. 230 if (tracing_sessions_.size() >= kMaxConcurrentTracingSessions) { 231 PERFETTO_ELOG("Too many concurrent tracing sesions (%zu)", 232 tracing_sessions_.size()); 233 return false; 234 } 235 236 const TracingSessionID tsid = ++last_tracing_session_id_; 237 tracing_session = 238 &tracing_sessions_.emplace(tsid, TracingSession(consumer, cfg)) 239 .first->second; 240 241 if (cfg.write_into_file()) { 242 if (!fd) { 243 PERFETTO_ELOG( 244 "The TraceConfig had write_into_file==true but no fd was passed"); 245 return false; 246 } 247 tracing_session->write_into_file = std::move(fd); 248 uint32_t write_period_ms = cfg.file_write_period_ms(); 249 if (write_period_ms == 0) 250 write_period_ms = kDefaultWriteIntoFilePeriodMs; 251 if (write_period_ms < kMinWriteIntoFilePeriodMs) 252 write_period_ms = kMinWriteIntoFilePeriodMs; 253 tracing_session->write_period_ms = write_period_ms; 254 tracing_session->max_file_size_bytes = cfg.max_file_size_bytes(); 255 tracing_session->bytes_written_into_file = 0; 256 } 257 258 // Initialize the log buffers. 259 bool did_allocate_all_buffers = true; 260 261 // Allocate the trace buffers. 
Also create a map to translate a consumer 262 // relative index (TraceConfig.DataSourceConfig.target_buffer) into the 263 // corresponding BufferID, which is a global ID namespace for the service and 264 // all producers. 265 size_t total_buf_size_kb = 0; 266 const size_t num_buffers = static_cast<size_t>(cfg.buffers_size()); 267 tracing_session->buffers_index.reserve(num_buffers); 268 for (size_t i = 0; i < num_buffers; i++) { 269 const TraceConfig::BufferConfig& buffer_cfg = cfg.buffers()[i]; 270 BufferID global_id = buffer_ids_.Allocate(); 271 if (!global_id) { 272 did_allocate_all_buffers = false; // We ran out of IDs. 273 break; 274 } 275 tracing_session->buffers_index.push_back(global_id); 276 const size_t buf_size_bytes = buffer_cfg.size_kb() * 1024u; 277 total_buf_size_kb += buffer_cfg.size_kb(); 278 auto it_and_inserted = 279 buffers_.emplace(global_id, TraceBuffer::Create(buf_size_bytes)); 280 PERFETTO_DCHECK(it_and_inserted.second); // buffers_.count(global_id) == 0. 281 std::unique_ptr<TraceBuffer>& trace_buffer = it_and_inserted.first->second; 282 if (!trace_buffer) { 283 did_allocate_all_buffers = false; 284 break; 285 } 286 } 287 288 UpdateMemoryGuardrail(); 289 290 // This can happen if either: 291 // - All the kMaxTraceBufferID slots are taken. 292 // - OOM, or, more relistically, we exhausted virtual memory. 293 // In any case, free all the previously allocated buffers and abort. 294 // TODO(fmayer): add a test to cover this case, this is quite subtle. 295 if (!did_allocate_all_buffers) { 296 for (BufferID global_id : tracing_session->buffers_index) { 297 buffer_ids_.Free(global_id); 298 buffers_.erase(global_id); 299 } 300 tracing_sessions_.erase(tsid); 301 return false; 302 } 303 304 consumer->tracing_session_id_ = tsid; 305 306 // Enable the data sources on the producers. 307 for (const TraceConfig::DataSource& cfg_data_source : cfg.data_sources()) { 308 // Scan all the registered data sources with a matching name. 
309 auto range = data_sources_.equal_range(cfg_data_source.config().name()); 310 for (auto it = range.first; it != range.second; it++) { 311 TraceConfig::ProducerConfig producer_config; 312 for (auto& config : cfg.producers()) { 313 if (GetProducer(it->second.producer_id)->name_ == 314 config.producer_name()) { 315 producer_config = config; 316 break; 317 } 318 } 319 CreateDataSourceInstance(cfg_data_source, producer_config, it->second, 320 tracing_session); 321 } 322 } 323 324 // Trigger delayed task if the trace is time limited. 325 const uint32_t trace_duration_ms = cfg.duration_ms(); 326 if (trace_duration_ms > 0) { 327 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 328 task_runner_->PostDelayedTask( 329 [weak_this, tsid] { 330 if (weak_this) 331 weak_this->FlushAndDisableTracing(tsid); 332 }, 333 trace_duration_ms); 334 } 335 336 // Start the periodic drain tasks if we should to save the trace into a file. 337 if (cfg.write_into_file()) { 338 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 339 task_runner_->PostDelayedTask( 340 [weak_this, tsid] { 341 if (weak_this) 342 weak_this->ReadBuffers(tsid, nullptr); 343 }, 344 tracing_session->delay_to_next_write_period_ms()); 345 } 346 347 tracing_session->tracing_enabled = true; 348 PERFETTO_LOG( 349 "Enabled tracing, #sources:%zu, duration:%d ms, #buffers:%d, total " 350 "buffer size:%zu KB, total sessions:%zu", 351 cfg.data_sources().size(), trace_duration_ms, cfg.buffers_size(), 352 total_buf_size_kb, tracing_sessions_.size()); 353 return true; 354 } 355 356 // DisableTracing just stops the data sources but doesn't free up any buffer. 357 // This is to allow the consumer to freeze the buffers (by stopping the trace) 358 // and then drain the buffers. The actual teardown of the TracingSession happens 359 // in FreeBuffers(). 
360 void ServiceImpl::DisableTracing(TracingSessionID tsid) { 361 PERFETTO_DCHECK_THREAD(thread_checker_); 362 TracingSession* tracing_session = GetTracingSession(tsid); 363 if (!tracing_session) { 364 // Can happen if the consumer calls this before EnableTracing() or after 365 // FreeBuffers(). 366 PERFETTO_DLOG("DisableTracing() failed, invalid session ID %" PRIu64, tsid); 367 return; 368 } 369 370 for (const auto& data_source_inst : tracing_session->data_source_instances) { 371 const ProducerID producer_id = data_source_inst.first; 372 const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id; 373 ProducerEndpointImpl* producer = GetProducer(producer_id); 374 producer->TearDownDataSource(ds_inst_id); 375 } 376 tracing_session->data_source_instances.clear(); 377 378 // If the client requested us to periodically save the buffer into the passed 379 // file, force a write pass. 380 if (tracing_session->write_into_file) { 381 tracing_session->write_period_ms = 0; 382 ReadBuffers(tsid, nullptr); 383 } 384 385 if (tracing_session->tracing_enabled) { 386 tracing_session->tracing_enabled = false; 387 tracing_session->consumer->NotifyOnTracingDisabled(); 388 } 389 390 // Deliberately NOT removing the session from |tracing_session_|, it's still 391 // needed to call ReadBuffers(). FreeBuffers() will erase() the session. 
392 } 393 394 void ServiceImpl::Flush(TracingSessionID tsid, 395 uint32_t timeout_ms, 396 ConsumerEndpoint::FlushCallback callback) { 397 PERFETTO_DCHECK_THREAD(thread_checker_); 398 TracingSession* tracing_session = GetTracingSession(tsid); 399 if (!tracing_session) { 400 PERFETTO_DLOG("Flush() failed, invalid session ID %" PRIu64, tsid); 401 return; 402 } 403 404 if (tracing_session->pending_flushes.size() > 1000) { 405 PERFETTO_ELOG("Too many flushes (%zu) pending for the tracing session", 406 tracing_session->pending_flushes.size()); 407 callback(false); 408 return; 409 } 410 411 FlushRequestID flush_request_id = ++last_flush_request_id_; 412 PendingFlush& pending_flush = 413 tracing_session->pending_flushes 414 .emplace_hint(tracing_session->pending_flushes.end(), 415 flush_request_id, PendingFlush(std::move(callback))) 416 ->second; 417 418 // Send a flush request to each producer involved in the tracing session. In 419 // order to issue a flush request we have to build a map of all data source 420 // instance ids enabled for each producer. 
421 std::map<ProducerID, std::vector<DataSourceInstanceID>> flush_map; 422 for (const auto& data_source_inst : tracing_session->data_source_instances) { 423 const ProducerID producer_id = data_source_inst.first; 424 const DataSourceInstanceID ds_inst_id = data_source_inst.second.instance_id; 425 flush_map[producer_id].push_back(ds_inst_id); 426 } 427 428 for (const auto& kv : flush_map) { 429 ProducerID producer_id = kv.first; 430 ProducerEndpointImpl* producer = GetProducer(producer_id); 431 const std::vector<DataSourceInstanceID>& data_sources = kv.second; 432 producer->Flush(flush_request_id, data_sources); 433 pending_flush.producers.insert(producer_id); 434 } 435 436 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 437 task_runner_->PostDelayedTask( 438 [weak_this, tsid, flush_request_id] { 439 if (weak_this) 440 weak_this->OnFlushTimeout(tsid, flush_request_id); 441 }, 442 timeout_ms); 443 } 444 445 void ServiceImpl::NotifyFlushDoneForProducer(ProducerID producer_id, 446 FlushRequestID flush_request_id) { 447 for (auto& kv : tracing_sessions_) { 448 // Remove all pending flushes <= |flush_request_id| for |producer_id|. 
449 auto& pending_flushes = kv.second.pending_flushes; 450 auto end_it = pending_flushes.upper_bound(flush_request_id); 451 for (auto it = pending_flushes.begin(); it != end_it;) { 452 PendingFlush& pending_flush = it->second; 453 pending_flush.producers.erase(producer_id); 454 if (pending_flush.producers.empty()) { 455 task_runner_->PostTask( 456 std::bind(std::move(pending_flush.callback), /*success=*/true)); 457 it = pending_flushes.erase(it); 458 } else { 459 it++; 460 } 461 } // for (pending_flushes) 462 } // for (tracing_session) 463 } 464 465 void ServiceImpl::OnFlushTimeout(TracingSessionID tsid, 466 FlushRequestID flush_request_id) { 467 TracingSession* tracing_session = GetTracingSession(tsid); 468 if (!tracing_session) 469 return; 470 auto it = tracing_session->pending_flushes.find(flush_request_id); 471 if (it == tracing_session->pending_flushes.end()) 472 return; // Nominal case: flush was completed and acked on time. 473 auto callback = std::move(it->second.callback); 474 tracing_session->pending_flushes.erase(it); 475 callback(/*success=*/false); 476 } 477 478 void ServiceImpl::FlushAndDisableTracing(TracingSessionID tsid) { 479 PERFETTO_DCHECK_THREAD(thread_checker_); 480 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 481 Flush(tsid, kFlushTimeoutMs, [weak_this, tsid](bool success) { 482 PERFETTO_DLOG("Flush done (success: %d), disabling trace session %" PRIu64, 483 success, tsid); 484 if (weak_this) 485 weak_this->DisableTracing(tsid); 486 }); 487 } 488 489 // Note: when this is called to write into a file passed when starting tracing 490 // |consumer| will be == nullptr (as opposite to the case of a consumer asking 491 // to send the trace data back over IPC). 
492 void ServiceImpl::ReadBuffers(TracingSessionID tsid, 493 ConsumerEndpointImpl* consumer) { 494 PERFETTO_DCHECK_THREAD(thread_checker_); 495 TracingSession* tracing_session = GetTracingSession(tsid); 496 if (!tracing_session) { 497 // This will be hit systematically from the PostDelayedTask when directly 498 // writing into the file (in which case consumer == nullptr). Suppress the 499 // log in this case as it's just spam. 500 if (consumer) 501 PERFETTO_DLOG("Cannot ReadBuffers(): no tracing session is active"); 502 return; // TODO(primiano): signal failure? 503 } 504 505 // This can happen if the file is closed by a previous task because it reaches 506 // |max_file_size_bytes|. 507 if (!tracing_session->write_into_file && !consumer) 508 return; 509 510 if (tracing_session->write_into_file && consumer) { 511 // If the consumer enabled tracing and asked to save the contents into the 512 // passed file makes little sense to also try to read the buffers over IPC, 513 // as that would just steal data from the periodic draining task. 514 PERFETTO_DCHECK(false); 515 return; 516 } 517 518 std::vector<TracePacket> packets; 519 packets.reserve(1024); // Just an educated guess to avoid trivial expansions. 520 MaybeSnapshotClocks(tracing_session, &packets); 521 MaybeSnapshotStats(tracing_session, &packets); 522 MaybeEmitTraceConfig(tracing_session, &packets); 523 524 size_t packets_bytes = 0; // SUM(slice.size() for each slice in |packets|). 525 size_t total_slices = 0; // SUM(#slices in |packets|). 526 527 // Add up size for packets added by the Maybe* calls above. 528 for (const TracePacket& packet : packets) { 529 packets_bytes += packet.size(); 530 total_slices += packet.slices().size(); 531 } 532 533 // This is a rough threshold to determine how much to read from the buffer in 534 // each task. This is to avoid executing a single huge sending task for too 535 // long and risk to hit the watchdog. 
This is *not* an upper bound: we just 536 // stop accumulating new packets and PostTask *after* we cross this threshold. 537 // This constant essentially balances the PostTask and IPC overhead vs the 538 // responsiveness of the service. An extremely small value will cause one IPC 539 // and one PostTask for each slice but will keep the service extremely 540 // responsive. An extremely large value will batch the send for the full 541 // buffer in one large task, will hit the blocking send() once the socket 542 // buffers are full and hang the service for a bit (until the consumer 543 // catches up). 544 static constexpr size_t kApproxBytesPerTask = 32768; 545 bool did_hit_threshold = false; 546 547 // TODO(primiano): Extend the ReadBuffers API to allow reading only some 548 // buffers, not all of them in one go. 549 for (size_t buf_idx = 0; 550 buf_idx < tracing_session->num_buffers() && !did_hit_threshold; 551 buf_idx++) { 552 auto tbuf_iter = buffers_.find(tracing_session->buffers_index[buf_idx]); 553 if (tbuf_iter == buffers_.end()) { 554 PERFETTO_DCHECK(false); 555 continue; 556 } 557 TraceBuffer& tbuf = *tbuf_iter->second; 558 tbuf.BeginRead(); 559 while (!did_hit_threshold) { 560 TracePacket packet; 561 uid_t producer_uid = kInvalidUid; 562 if (!tbuf.ReadNextTracePacket(&packet, &producer_uid)) 563 break; 564 PERFETTO_DCHECK(producer_uid != kInvalidUid); 565 PERFETTO_DCHECK(packet.size() > 0); 566 if (!PacketStreamValidator::Validate(packet.slices())) { 567 PERFETTO_DLOG("Dropping invalid packet"); 568 continue; 569 } 570 571 // Append a slice with the trusted UID of the producer. This can't 572 // be spoofed because above we validated that the existing slices 573 // don't contain any trusted UID fields. For added safety we append 574 // instead of prepending because according to protobuf semantics, if 575 // the same field is encountered multiple times the last instance 576 // takes priority. 
Note that truncated packets are also rejected, so 577 // the producer can't give us a partial packet (e.g., a truncated 578 // string) which only becomes valid when the UID is appended here. 579 protos::TrustedPacket trusted_packet; 580 trusted_packet.set_trusted_uid(static_cast<int32_t>(producer_uid)); 581 static constexpr size_t kTrustedBufSize = 16; 582 Slice slice = Slice::Allocate(kTrustedBufSize); 583 PERFETTO_CHECK( 584 trusted_packet.SerializeToArray(slice.own_data(), kTrustedBufSize)); 585 slice.size = static_cast<size_t>(trusted_packet.GetCachedSize()); 586 PERFETTO_DCHECK(slice.size > 0 && slice.size <= kTrustedBufSize); 587 packet.AddSlice(std::move(slice)); 588 589 // Append the packet (inclusive of the trusted uid) to |packets|. 590 packets_bytes += packet.size(); 591 total_slices += packet.slices().size(); 592 did_hit_threshold = packets_bytes >= kApproxBytesPerTask && 593 !tracing_session->write_into_file; 594 packets.emplace_back(std::move(packet)); 595 } // for(packets...) 596 } // for(buffers...) 597 598 // If the caller asked us to write into a file by setting 599 // |write_into_file| == true in the trace config, drain the packets read 600 // (if any) into the given file descriptor. 601 if (tracing_session->write_into_file) { 602 const uint64_t max_size = tracing_session->max_file_size_bytes 603 ? tracing_session->max_file_size_bytes 604 : std::numeric_limits<size_t>::max(); 605 606 // When writing into a file, the file should look like a root trace.proto 607 // message. Each packet should be prepended with a proto preamble stating 608 // its field id (within trace.proto) and size. Hence the addition below. 
609 const size_t max_iovecs = total_slices + packets.size(); 610 611 size_t num_iovecs = 0; 612 bool stop_writing_into_file = tracing_session->write_period_ms == 0; 613 std::unique_ptr<struct iovec[]> iovecs(new struct iovec[max_iovecs]); 614 size_t num_iovecs_at_last_packet = 0; 615 uint64_t bytes_about_to_be_written = 0; 616 for (TracePacket& packet : packets) { 617 std::tie(iovecs[num_iovecs].iov_base, iovecs[num_iovecs].iov_len) = 618 packet.GetProtoPreamble(); 619 bytes_about_to_be_written += iovecs[num_iovecs].iov_len; 620 num_iovecs++; 621 for (const Slice& slice : packet.slices()) { 622 // writev() doesn't change the passed pointer. However, struct iovec 623 // take a non-const ptr because it's the same struct used by readv(). 624 // Hence the const_cast here. 625 char* start = static_cast<char*>(const_cast<void*>(slice.start)); 626 bytes_about_to_be_written += slice.size; 627 iovecs[num_iovecs++] = {start, slice.size}; 628 } 629 630 if (tracing_session->bytes_written_into_file + 631 bytes_about_to_be_written >= 632 max_size) { 633 stop_writing_into_file = true; 634 num_iovecs = num_iovecs_at_last_packet; 635 break; 636 } 637 638 num_iovecs_at_last_packet = num_iovecs; 639 } 640 PERFETTO_DCHECK(num_iovecs <= max_iovecs); 641 int fd = *tracing_session->write_into_file; 642 643 uint64_t total_wr_size = 0; 644 645 // writev() can take at most IOV_MAX entries per call. Batch them. 
646 constexpr size_t kIOVMax = IOV_MAX; 647 for (size_t i = 0; i < num_iovecs; i += kIOVMax) { 648 int iov_batch_size = static_cast<int>(std::min(num_iovecs - i, kIOVMax)); 649 ssize_t wr_size = PERFETTO_EINTR(writev(fd, &iovecs[i], iov_batch_size)); 650 if (wr_size <= 0) { 651 PERFETTO_PLOG("writev() failed"); 652 stop_writing_into_file = true; 653 break; 654 } 655 total_wr_size += static_cast<size_t>(wr_size); 656 } 657 658 tracing_session->bytes_written_into_file += total_wr_size; 659 660 PERFETTO_DLOG("Draining into file, written: %" PRIu64 " KB, stop: %d", 661 (total_wr_size + 1023) / 1024, stop_writing_into_file); 662 if (stop_writing_into_file) { 663 tracing_session->write_into_file.reset(); 664 tracing_session->write_period_ms = 0; 665 DisableTracing(tsid); 666 return; 667 } 668 669 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 670 task_runner_->PostDelayedTask( 671 [weak_this, tsid] { 672 if (weak_this) 673 weak_this->ReadBuffers(tsid, nullptr); 674 }, 675 tracing_session->delay_to_next_write_period_ms()); 676 return; 677 } // if (tracing_session->write_into_file) 678 679 const bool has_more = did_hit_threshold; 680 if (has_more) { 681 auto weak_consumer = consumer->GetWeakPtr(); 682 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 683 task_runner_->PostTask([weak_this, weak_consumer, tsid] { 684 if (!weak_this || !weak_consumer) 685 return; 686 weak_this->ReadBuffers(tsid, weak_consumer.get()); 687 }); 688 } 689 690 // Keep this as tail call, just in case the consumer re-enters. 691 consumer->consumer_->OnTraceData(std::move(packets), has_more); 692 } 693 694 void ServiceImpl::FreeBuffers(TracingSessionID tsid) { 695 PERFETTO_DCHECK_THREAD(thread_checker_); 696 PERFETTO_DLOG("Freeing buffers for session %" PRIu64, tsid); 697 TracingSession* tracing_session = GetTracingSession(tsid); 698 if (!tracing_session) { 699 PERFETTO_DLOG("FreeBuffers() failed, invalid session ID %" PRIu64, tsid); 700 return; // TODO(primiano): signal failure? 
701 } 702 DisableTracing(tsid); 703 704 for (BufferID buffer_id : tracing_session->buffers_index) { 705 buffer_ids_.Free(buffer_id); 706 PERFETTO_DCHECK(buffers_.count(buffer_id) == 1); 707 buffers_.erase(buffer_id); 708 } 709 tracing_sessions_.erase(tsid); 710 UpdateMemoryGuardrail(); 711 712 PERFETTO_LOG("Tracing session %" PRIu64 " ended, total sessions:%zu", tsid, 713 tracing_sessions_.size()); 714 } 715 716 void ServiceImpl::RegisterDataSource(ProducerID producer_id, 717 const DataSourceDescriptor& desc) { 718 PERFETTO_DCHECK_THREAD(thread_checker_); 719 PERFETTO_DLOG("Producer %" PRIu16 " registered data source \"%s\"", 720 producer_id, desc.name().c_str()); 721 722 PERFETTO_DCHECK(!desc.name().empty()); 723 auto reg_ds = data_sources_.emplace(desc.name(), 724 RegisteredDataSource{producer_id, desc}); 725 726 // If there are existing tracing sessions, we need to check if the new 727 // data source is enabled by any of them. 728 if (tracing_sessions_.empty()) 729 return; 730 731 ProducerEndpointImpl* producer = GetProducer(producer_id); 732 if (!producer) { 733 PERFETTO_DCHECK(false); 734 return; 735 } 736 737 for (auto& iter : tracing_sessions_) { 738 TracingSession& tracing_session = iter.second; 739 TraceConfig::ProducerConfig producer_config; 740 for (auto& config : tracing_session.config.producers()) { 741 if (producer->name_ == config.producer_name()) { 742 producer_config = config; 743 break; 744 } 745 } 746 for (const TraceConfig::DataSource& cfg_data_source : 747 tracing_session.config.data_sources()) { 748 if (cfg_data_source.config().name() == desc.name()) 749 CreateDataSourceInstance(cfg_data_source, producer_config, 750 reg_ds->second, &tracing_session); 751 } 752 } 753 } 754 755 void ServiceImpl::UnregisterDataSource(ProducerID producer_id, 756 const std::string& name) { 757 PERFETTO_DCHECK_THREAD(thread_checker_); 758 PERFETTO_CHECK(producer_id); 759 ProducerEndpointImpl* producer = GetProducer(producer_id); 760 PERFETTO_DCHECK(producer); 761 
for (auto& kv : tracing_sessions_) { 762 auto& ds_instances = kv.second.data_source_instances; 763 for (auto it = ds_instances.begin(); it != ds_instances.end();) { 764 if (it->first == producer_id && it->second.data_source_name == name) { 765 DataSourceInstanceID ds_inst_id = it->second.instance_id; 766 producer->TearDownDataSource(ds_inst_id); 767 it = ds_instances.erase(it); 768 } else { 769 ++it; 770 } 771 } // for (data_source_instances) 772 } // for (tracing_session) 773 774 for (auto it = data_sources_.begin(); it != data_sources_.end(); ++it) { 775 if (it->second.producer_id == producer_id && 776 it->second.descriptor.name() == name) { 777 data_sources_.erase(it); 778 return; 779 } 780 } 781 782 PERFETTO_DLOG( 783 "Tried to unregister a non-existent data source \"%s\" for " 784 "producer %" PRIu16, 785 name.c_str(), producer_id); 786 PERFETTO_DCHECK(false); 787 } 788 789 void ServiceImpl::CreateDataSourceInstance( 790 const TraceConfig::DataSource& cfg_data_source, 791 const TraceConfig::ProducerConfig& producer_config, 792 const RegisteredDataSource& data_source, 793 TracingSession* tracing_session) { 794 PERFETTO_DCHECK_THREAD(thread_checker_); 795 ProducerEndpointImpl* producer = GetProducer(data_source.producer_id); 796 PERFETTO_DCHECK(producer); 797 // An existing producer that is not ftrace could have registered itself as 798 // ftrace, we must not enable it in that case. 799 if (lockdown_mode_ && producer->uid_ != uid_) { 800 PERFETTO_DLOG("Lockdown mode: not enabling producer %hu", producer->id_); 801 return; 802 } 803 // TODO(primiano): Add tests for registration ordering 804 // (data sources vs consumers). 
805 if (!cfg_data_source.producer_name_filter().empty()) { 806 if (std::find(cfg_data_source.producer_name_filter().begin(), 807 cfg_data_source.producer_name_filter().end(), 808 producer->name_) == 809 cfg_data_source.producer_name_filter().end()) { 810 PERFETTO_DLOG("Data source: %s is filtered out for producer: %s", 811 cfg_data_source.config().name().c_str(), 812 producer->name_.c_str()); 813 return; 814 } 815 } 816 817 // Create a copy of the DataSourceConfig specified in the trace config. This 818 // will be passed to the producer after translating the |target_buffer| id. 819 // The |target_buffer| parameter passed by the consumer in the trace config is 820 // relative to the buffers declared in the same trace config. This has to be 821 // translated to the global BufferID before passing it to the producers, which 822 // don't know anything about tracing sessions and consumers. 823 824 DataSourceConfig ds_config = cfg_data_source.config(); // Deliberate copy. 825 ds_config.set_trace_duration_ms(tracing_session->config.duration_ms()); 826 auto relative_buffer_id = ds_config.target_buffer(); 827 if (relative_buffer_id >= tracing_session->num_buffers()) { 828 PERFETTO_LOG( 829 "The TraceConfig for DataSource %s specified a target_buffer out of " 830 "bound (%d). Skipping it.", 831 ds_config.name().c_str(), relative_buffer_id); 832 return; 833 } 834 BufferID global_id = tracing_session->buffers_index[relative_buffer_id]; 835 PERFETTO_DCHECK(global_id); 836 ds_config.set_target_buffer(global_id); 837 838 DataSourceInstanceID inst_id = ++last_data_source_instance_id_; 839 tracing_session->data_source_instances.emplace( 840 producer->id_, 841 DataSourceInstance{inst_id, data_source.descriptor.name()}); 842 PERFETTO_DLOG("Starting data source %s with target buffer %" PRIu16, 843 ds_config.name().c_str(), global_id); 844 if (!producer->shared_memory()) { 845 // Determine the SMB page size. Must be an integer multiple of 4k. 
846 size_t page_size = std::min<size_t>(producer_config.page_size_kb() * 1024, 847 SharedMemoryABI::kMaxPageSize); 848 if (page_size < base::kPageSize || page_size % base::kPageSize != 0) 849 page_size = kDefaultShmPageSize; 850 producer->shared_buffer_page_size_kb_ = page_size / 1024; 851 852 // Determine the SMB size. Must be an integer multiple of the SMB page size. 853 // The decisional tree is as follows: 854 // 1. Give priority to what defined in the trace config. 855 // 2. If unset give priority to the hint passed by the producer. 856 // 3. Keep within bounds and ensure it's a multiple of the page size. 857 size_t shm_size = producer_config.shm_size_kb() * 1024; 858 if (shm_size == 0) 859 shm_size = producer->shmem_size_hint_bytes_; 860 shm_size = std::min<size_t>(shm_size, kMaxShmSize); 861 if (shm_size < page_size || shm_size % page_size) 862 shm_size = kDefaultShmSize; 863 864 // TODO(primiano): right now Create() will suicide in case of OOM if the 865 // mmap fails. We should instead gracefully fail the request and tell the 866 // client to go away. 867 auto shared_memory = shm_factory_->CreateSharedMemory(shm_size); 868 producer->SetSharedMemory(std::move(shared_memory)); 869 producer->OnTracingSetup(); 870 UpdateMemoryGuardrail(); 871 } 872 producer->CreateDataSourceInstance(inst_id, ds_config); 873 } 874 875 // Note: all the fields % *_trusted ones are untrusted, as in, the Producer 876 // might be lying / returning garbage contents. |src| and |size| can be trusted 877 // in terms of being a valid pointer, but not the contents. 
878 void ServiceImpl::CopyProducerPageIntoLogBuffer(ProducerID producer_id_trusted, 879 uid_t producer_uid_trusted, 880 WriterID writer_id, 881 ChunkID chunk_id, 882 BufferID buffer_id, 883 uint16_t num_fragments, 884 uint8_t chunk_flags, 885 const uint8_t* src, 886 size_t size) { 887 PERFETTO_DCHECK_THREAD(thread_checker_); 888 TraceBuffer* buf = GetBufferByID(buffer_id); 889 if (!buf) { 890 PERFETTO_DLOG("Could not find target buffer %" PRIu16 891 " for producer %" PRIu16, 892 buffer_id, producer_id_trusted); 893 return; 894 } 895 896 // TODO(primiano): we should have a set<BufferID> |allowed_target_buffers| in 897 // ProducerEndpointImpl to perform ACL checks and prevent that the Producer 898 // passes a |target_buffer| which is valid, but that we never asked it to use. 899 // Essentially we want to prevent a malicious producer to inject data into a 900 // log buffer that has nothing to do with it. 901 902 buf->CopyChunkUntrusted(producer_id_trusted, producer_uid_trusted, writer_id, 903 chunk_id, num_fragments, chunk_flags, src, size); 904 } 905 906 void ServiceImpl::ApplyChunkPatches( 907 ProducerID producer_id_trusted, 908 const std::vector<CommitDataRequest::ChunkToPatch>& chunks_to_patch) { 909 PERFETTO_DCHECK_THREAD(thread_checker_); 910 911 for (const auto& chunk : chunks_to_patch) { 912 const ChunkID chunk_id = static_cast<ChunkID>(chunk.chunk_id()); 913 const WriterID writer_id = static_cast<WriterID>(chunk.writer_id()); 914 TraceBuffer* buf = 915 GetBufferByID(static_cast<BufferID>(chunk.target_buffer())); 916 static_assert(std::numeric_limits<ChunkID>::max() == kMaxChunkID, 917 "Add a '|| chunk_id > kMaxChunkID' below if this fails"); 918 if (!writer_id || writer_id > kMaxWriterID || !buf) { 919 PERFETTO_DLOG( 920 "Received invalid chunks_to_patch request from Producer: %" PRIu16 921 ", BufferID: %" PRIu32 " ChunkdID: %" PRIu32 " WriterID: %" PRIu16, 922 producer_id_trusted, chunk.target_buffer(), chunk_id, writer_id); 923 continue; 924 } 925 // 
Speculate on the fact that there are going to be a limited amount of 926 // patches per request, so we can allocate the |patches| array on the stack. 927 std::array<TraceBuffer::Patch, 1024> patches; // Uninitialized. 928 if (chunk.patches().size() > patches.size()) { 929 PERFETTO_DLOG("Too many patches (%zu) batched in the same request", 930 patches.size()); 931 PERFETTO_DCHECK(false); 932 continue; 933 } 934 935 size_t i = 0; 936 for (const auto& patch : chunk.patches()) { 937 const std::string& patch_data = patch.data(); 938 if (patch_data.size() != patches[i].data.size()) { 939 PERFETTO_DLOG("Received patch from producer: %" PRIu16 940 " of unexpected size %zu", 941 producer_id_trusted, patch_data.size()); 942 continue; 943 } 944 patches[i].offset_untrusted = patch.offset(); 945 memcpy(&patches[i].data[0], patch_data.data(), patches[i].data.size()); 946 i++; 947 } 948 buf->TryPatchChunkContents(producer_id_trusted, writer_id, chunk_id, 949 &patches[0], i, chunk.has_more_patches()); 950 } 951 } 952 953 ServiceImpl::TracingSession* ServiceImpl::GetTracingSession( 954 TracingSessionID tsid) { 955 PERFETTO_DCHECK_THREAD(thread_checker_); 956 auto it = tsid ? 
tracing_sessions_.find(tsid) : tracing_sessions_.end(); 957 if (it == tracing_sessions_.end()) 958 return nullptr; 959 return &it->second; 960 } 961 962 ProducerID ServiceImpl::GetNextProducerID() { 963 PERFETTO_DCHECK_THREAD(thread_checker_); 964 PERFETTO_CHECK(producers_.size() < kMaxProducerID); 965 do { 966 ++last_producer_id_; 967 } while (producers_.count(last_producer_id_) || last_producer_id_ == 0); 968 PERFETTO_DCHECK(last_producer_id_ > 0 && last_producer_id_ <= kMaxProducerID); 969 return last_producer_id_; 970 } 971 972 TraceBuffer* ServiceImpl::GetBufferByID(BufferID buffer_id) { 973 auto buf_iter = buffers_.find(buffer_id); 974 if (buf_iter == buffers_.end()) 975 return nullptr; 976 return &*buf_iter->second; 977 } 978 979 void ServiceImpl::UpdateMemoryGuardrail() { 980 #if !PERFETTO_BUILDFLAG(PERFETTO_CHROMIUM_BUILD) && \ 981 !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) 982 uint64_t total_buffer_bytes = 0; 983 984 // Sum up all the shared memory buffers. 985 for (const auto& id_to_producer : producers_) { 986 if (id_to_producer.second->shared_memory()) 987 total_buffer_bytes += id_to_producer.second->shared_memory()->size(); 988 } 989 990 // Sum up all the trace buffers. 991 for (const auto& id_to_buffer : buffers_) { 992 total_buffer_bytes += id_to_buffer.second->size(); 993 } 994 995 // Set the guard rail to 32MB + the sum of all the buffers over a 30 second 996 // interval. 
997 uint64_t guardrail = 32 * 1024 * 1024 + total_buffer_bytes; 998 base::Watchdog::GetInstance()->SetMemoryLimit(guardrail, 30 * 1000); 999 #endif 1000 } 1001 1002 void ServiceImpl::MaybeSnapshotClocks(TracingSession* tracing_session, 1003 std::vector<TracePacket>* packets) { 1004 base::TimeMillis now = base::GetWallTimeMs(); 1005 if (now < tracing_session->last_clock_snapshot + kClockSnapshotInterval) 1006 return; 1007 tracing_session->last_clock_snapshot = now; 1008 1009 protos::TrustedPacket packet; 1010 protos::ClockSnapshot* clock_snapshot = packet.mutable_clock_snapshot(); 1011 1012 #if !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) 1013 struct { 1014 clockid_t id; 1015 protos::ClockSnapshot::Clock::Type type; 1016 struct timespec ts; 1017 } clocks[] = { 1018 {CLOCK_BOOTTIME, protos::ClockSnapshot::Clock::BOOTTIME, {0, 0}}, 1019 {CLOCK_REALTIME_COARSE, 1020 protos::ClockSnapshot::Clock::REALTIME_COARSE, 1021 {0, 0}}, 1022 {CLOCK_MONOTONIC_COARSE, 1023 protos::ClockSnapshot::Clock::MONOTONIC_COARSE, 1024 {0, 0}}, 1025 {CLOCK_REALTIME, protos::ClockSnapshot::Clock::REALTIME, {0, 0}}, 1026 {CLOCK_MONOTONIC, protos::ClockSnapshot::Clock::MONOTONIC, {0, 0}}, 1027 {CLOCK_MONOTONIC_RAW, 1028 protos::ClockSnapshot::Clock::MONOTONIC_RAW, 1029 {0, 0}}, 1030 {CLOCK_PROCESS_CPUTIME_ID, 1031 protos::ClockSnapshot::Clock::PROCESS_CPUTIME, 1032 {0, 0}}, 1033 {CLOCK_THREAD_CPUTIME_ID, 1034 protos::ClockSnapshot::Clock::THREAD_CPUTIME, 1035 {0, 0}}, 1036 }; 1037 // First snapshot all the clocks as atomically as we can. 
1038 for (auto& clock : clocks) { 1039 if (clock_gettime(clock.id, &clock.ts) == -1) 1040 PERFETTO_DLOG("clock_gettime failed for clock %d", clock.id); 1041 } 1042 for (auto& clock : clocks) { 1043 protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks(); 1044 c->set_type(clock.type); 1045 c->set_timestamp( 1046 static_cast<uint64_t>(base::FromPosixTimespec(clock.ts).count())); 1047 } 1048 #else // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) 1049 protos::ClockSnapshot::Clock* c = clock_snapshot->add_clocks(); 1050 c->set_type(protos::ClockSnapshot::Clock::MONOTONIC); 1051 c->set_timestamp(static_cast<uint64_t>(base::GetWallTimeNs().count())); 1052 #endif // !PERFETTO_BUILDFLAG(PERFETTO_OS_MACOSX) 1053 1054 packet.set_trusted_uid(static_cast<int32_t>(uid_)); 1055 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize())); 1056 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data())); 1057 packets->emplace_back(); 1058 packets->back().AddSlice(std::move(slice)); 1059 } 1060 1061 void ServiceImpl::MaybeSnapshotStats(TracingSession* tracing_session, 1062 std::vector<TracePacket>* packets) { 1063 base::TimeMillis now = base::GetWallTimeMs(); 1064 if (now < tracing_session->last_stats_snapshot + kStatsSnapshotInterval) 1065 return; 1066 tracing_session->last_stats_snapshot = now; 1067 1068 protos::TrustedPacket packet; 1069 packet.set_trusted_uid(static_cast<int32_t>(uid_)); 1070 1071 protos::TraceStats* trace_stats = packet.mutable_trace_stats(); 1072 trace_stats->set_producers_connected( 1073 static_cast<uint32_t>(producers_.size())); 1074 trace_stats->set_producers_seen(last_producer_id_); 1075 trace_stats->set_data_sources_registered( 1076 static_cast<uint32_t>(data_sources_.size())); 1077 trace_stats->set_data_sources_seen(last_data_source_instance_id_); 1078 trace_stats->set_tracing_sessions( 1079 static_cast<uint32_t>(tracing_sessions_.size())); 1080 trace_stats->set_total_buffers(static_cast<uint32_t>(buffers_.size())); 1081 1082 for 
(BufferID buf_id : tracing_session->buffers_index) { 1083 TraceBuffer* buf = GetBufferByID(buf_id); 1084 if (!buf) { 1085 PERFETTO_DCHECK(false); 1086 continue; 1087 } 1088 auto* buf_stats_proto = trace_stats->add_buffer_stats(); 1089 const TraceBuffer::Stats& buf_stats = buf->stats(); 1090 buf_stats_proto->set_bytes_written(buf_stats.bytes_written); 1091 buf_stats_proto->set_chunks_written(buf_stats.chunks_written); 1092 buf_stats_proto->set_chunks_overwritten(buf_stats.chunks_overwritten); 1093 buf_stats_proto->set_write_wrap_count(buf_stats.write_wrap_count); 1094 buf_stats_proto->set_patches_succeeded(buf_stats.patches_succeeded); 1095 buf_stats_proto->set_patches_failed(buf_stats.patches_failed); 1096 buf_stats_proto->set_readaheads_succeeded(buf_stats.readaheads_succeeded); 1097 buf_stats_proto->set_readaheads_failed(buf_stats.readaheads_failed); 1098 buf_stats_proto->set_abi_violations(buf_stats.abi_violations); 1099 } // for (buf in session). 1100 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize())); 1101 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data())); 1102 packets->emplace_back(); 1103 packets->back().AddSlice(std::move(slice)); 1104 } 1105 1106 void ServiceImpl::MaybeEmitTraceConfig(TracingSession* tracing_session, 1107 std::vector<TracePacket>* packets) { 1108 if (tracing_session->did_emit_config) 1109 return; 1110 tracing_session->did_emit_config = true; 1111 protos::TrustedPacket packet; 1112 tracing_session->config.ToProto(packet.mutable_trace_config()); 1113 packet.set_trusted_uid(static_cast<int32_t>(uid_)); 1114 Slice slice = Slice::Allocate(static_cast<size_t>(packet.ByteSize())); 1115 PERFETTO_CHECK(packet.SerializeWithCachedSizesToArray(slice.own_data())); 1116 packets->emplace_back(); 1117 packets->back().AddSlice(std::move(slice)); 1118 } 1119 1120 //////////////////////////////////////////////////////////////////////////////// 1121 // ServiceImpl::ConsumerEndpointImpl implementation 1122 
//////////////////////////////////////////////////////////////////////////////// 1123 1124 ServiceImpl::ConsumerEndpointImpl::ConsumerEndpointImpl( 1125 ServiceImpl* service, 1126 base::TaskRunner* task_runner, 1127 Consumer* consumer) 1128 : task_runner_(task_runner), 1129 service_(service), 1130 consumer_(consumer), 1131 weak_ptr_factory_(this) {} 1132 1133 ServiceImpl::ConsumerEndpointImpl::~ConsumerEndpointImpl() { 1134 service_->DisconnectConsumer(this); 1135 consumer_->OnDisconnect(); 1136 } 1137 1138 void ServiceImpl::ConsumerEndpointImpl::NotifyOnTracingDisabled() { 1139 PERFETTO_DCHECK_THREAD(thread_checker_); 1140 auto weak_this = GetWeakPtr(); 1141 task_runner_->PostTask([weak_this] { 1142 if (weak_this) 1143 weak_this->consumer_->OnTracingDisabled(); 1144 }); 1145 } 1146 1147 void ServiceImpl::ConsumerEndpointImpl::EnableTracing(const TraceConfig& cfg, 1148 base::ScopedFile fd) { 1149 PERFETTO_DCHECK_THREAD(thread_checker_); 1150 if (!service_->EnableTracing(this, cfg, std::move(fd))) 1151 NotifyOnTracingDisabled(); 1152 } 1153 1154 void ServiceImpl::ConsumerEndpointImpl::DisableTracing() { 1155 PERFETTO_DCHECK_THREAD(thread_checker_); 1156 if (!tracing_session_id_) { 1157 PERFETTO_LOG("Consumer called DisableTracing() but tracing was not active"); 1158 return; 1159 } 1160 service_->DisableTracing(tracing_session_id_); 1161 } 1162 1163 void ServiceImpl::ConsumerEndpointImpl::ReadBuffers() { 1164 PERFETTO_DCHECK_THREAD(thread_checker_); 1165 if (!tracing_session_id_) { 1166 PERFETTO_LOG("Consumer called ReadBuffers() but tracing was not active"); 1167 return; 1168 } 1169 service_->ReadBuffers(tracing_session_id_, this); 1170 } 1171 1172 void ServiceImpl::ConsumerEndpointImpl::FreeBuffers() { 1173 PERFETTO_DCHECK_THREAD(thread_checker_); 1174 if (!tracing_session_id_) { 1175 PERFETTO_LOG("Consumer called FreeBuffers() but tracing was not active"); 1176 return; 1177 } 1178 service_->FreeBuffers(tracing_session_id_); 1179 tracing_session_id_ = 0; 1180 } 
1181 1182 void ServiceImpl::ConsumerEndpointImpl::Flush(uint32_t timeout_ms, 1183 FlushCallback callback) { 1184 PERFETTO_DCHECK_THREAD(thread_checker_); 1185 if (!tracing_session_id_) { 1186 PERFETTO_LOG("Consumer called Flush() but tracing was not active"); 1187 return; 1188 } 1189 service_->Flush(tracing_session_id_, timeout_ms, callback); 1190 } 1191 1192 base::WeakPtr<ServiceImpl::ConsumerEndpointImpl> 1193 ServiceImpl::ConsumerEndpointImpl::GetWeakPtr() { 1194 PERFETTO_DCHECK_THREAD(thread_checker_); 1195 return weak_ptr_factory_.GetWeakPtr(); 1196 } 1197 1198 //////////////////////////////////////////////////////////////////////////////// 1199 // ServiceImpl::ProducerEndpointImpl implementation 1200 //////////////////////////////////////////////////////////////////////////////// 1201 1202 ServiceImpl::ProducerEndpointImpl::ProducerEndpointImpl( 1203 ProducerID id, 1204 uid_t uid, 1205 ServiceImpl* service, 1206 base::TaskRunner* task_runner, 1207 Producer* producer, 1208 const std::string& producer_name) 1209 : id_(id), 1210 uid_(uid), 1211 service_(service), 1212 task_runner_(task_runner), 1213 producer_(producer), 1214 name_(producer_name), 1215 weak_ptr_factory_(this) {} 1216 1217 ServiceImpl::ProducerEndpointImpl::~ProducerEndpointImpl() { 1218 service_->DisconnectProducer(id_); 1219 producer_->OnDisconnect(); 1220 } 1221 1222 void ServiceImpl::ProducerEndpointImpl::RegisterDataSource( 1223 const DataSourceDescriptor& desc) { 1224 PERFETTO_DCHECK_THREAD(thread_checker_); 1225 if (desc.name().empty()) { 1226 PERFETTO_DLOG("Received RegisterDataSource() with empty name"); 1227 return; 1228 } 1229 1230 service_->RegisterDataSource(id_, desc); 1231 } 1232 1233 void ServiceImpl::ProducerEndpointImpl::UnregisterDataSource( 1234 const std::string& name) { 1235 PERFETTO_DCHECK_THREAD(thread_checker_); 1236 service_->UnregisterDataSource(id_, name); 1237 } 1238 1239 void ServiceImpl::ProducerEndpointImpl::CommitData( 1240 const CommitDataRequest& req_untrusted, 
1241 CommitDataCallback callback) { 1242 PERFETTO_DCHECK_THREAD(thread_checker_); 1243 1244 if (!shared_memory_) { 1245 PERFETTO_DLOG( 1246 "Attempted to commit data before the shared memory was allocated."); 1247 return; 1248 } 1249 PERFETTO_DCHECK(shmem_abi_.is_valid()); 1250 for (const auto& entry : req_untrusted.chunks_to_move()) { 1251 const uint32_t page_idx = entry.page(); 1252 if (page_idx >= shmem_abi_.num_pages()) 1253 continue; // A buggy or malicious producer. 1254 1255 SharedMemoryABI::Chunk chunk = 1256 shmem_abi_.TryAcquireChunkForReading(page_idx, entry.chunk()); 1257 if (!chunk.is_valid()) { 1258 PERFETTO_DLOG("Asked to move chunk %d:%d, but it's not complete", 1259 entry.page(), entry.chunk()); 1260 continue; 1261 } 1262 1263 // TryAcquireChunkForReading() has load-acquire semantics. Once acquired, 1264 // the ABI contract expects the producer to not touch the chunk anymore 1265 // (until the service marks that as free). This is why all the reads below 1266 // are just memory_order_relaxed. Also, the code here assumes that all this 1267 // data can be malicious and just gives up if anything is malformed. 1268 BufferID buffer_id = static_cast<BufferID>(entry.target_buffer()); 1269 const SharedMemoryABI::ChunkHeader& chunk_header = *chunk.header(); 1270 WriterID writer_id = chunk_header.writer_id.load(std::memory_order_relaxed); 1271 ChunkID chunk_id = chunk_header.chunk_id.load(std::memory_order_relaxed); 1272 auto packets = chunk_header.packets.load(std::memory_order_relaxed); 1273 uint16_t num_fragments = packets.count; 1274 uint8_t chunk_flags = packets.flags; 1275 1276 service_->CopyProducerPageIntoLogBuffer( 1277 id_, uid_, writer_id, chunk_id, buffer_id, num_fragments, chunk_flags, 1278 chunk.payload_begin(), chunk.payload_size()); 1279 1280 // This one has release-store semantics. 
1281 shmem_abi_.ReleaseChunkAsFree(std::move(chunk)); 1282 } // for(chunks_to_move) 1283 1284 service_->ApplyChunkPatches(id_, req_untrusted.chunks_to_patch()); 1285 1286 if (req_untrusted.flush_request_id()) { 1287 service_->NotifyFlushDoneForProducer(id_, req_untrusted.flush_request_id()); 1288 } 1289 1290 // Keep this invocation last. ProducerIPCService::CommitData() relies on this 1291 // callback being invoked within the same callstack and not posted. If this 1292 // changes, the code there needs to be changed accordingly. 1293 if (callback) 1294 callback(); 1295 } 1296 1297 void ServiceImpl::ProducerEndpointImpl::SetSharedMemory( 1298 std::unique_ptr<SharedMemory> shared_memory) { 1299 PERFETTO_DCHECK(!shared_memory_ && !shmem_abi_.is_valid()); 1300 shared_memory_ = std::move(shared_memory); 1301 shmem_abi_.Initialize(reinterpret_cast<uint8_t*>(shared_memory_->start()), 1302 shared_memory_->size(), 1303 shared_buffer_page_size_kb() * 1024); 1304 } 1305 1306 SharedMemory* ServiceImpl::ProducerEndpointImpl::shared_memory() const { 1307 PERFETTO_DCHECK_THREAD(thread_checker_); 1308 return shared_memory_.get(); 1309 } 1310 1311 size_t ServiceImpl::ProducerEndpointImpl::shared_buffer_page_size_kb() const { 1312 return shared_buffer_page_size_kb_; 1313 } 1314 1315 void ServiceImpl::ProducerEndpointImpl::TearDownDataSource( 1316 DataSourceInstanceID ds_inst_id) { 1317 // TODO(primiano): When we'll support tearing down the SMB, at this point we 1318 // should send the Producer a TearDownTracing if all its data sources have 1319 // been disabled (see b/77532839 and aosp/655179 PS1). 
1320 PERFETTO_DCHECK_THREAD(thread_checker_); 1321 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 1322 task_runner_->PostTask([weak_this, ds_inst_id] { 1323 if (weak_this) 1324 weak_this->producer_->TearDownDataSourceInstance(ds_inst_id); 1325 }); 1326 } 1327 1328 SharedMemoryArbiterImpl* 1329 ServiceImpl::ProducerEndpointImpl::GetOrCreateShmemArbiter() { 1330 PERFETTO_DCHECK_THREAD(thread_checker_); 1331 if (!inproc_shmem_arbiter_) { 1332 inproc_shmem_arbiter_.reset(new SharedMemoryArbiterImpl( 1333 shared_memory_->start(), shared_memory_->size(), 1334 shared_buffer_page_size_kb_ * 1024, this, task_runner_)); 1335 } 1336 return inproc_shmem_arbiter_.get(); 1337 } 1338 1339 std::unique_ptr<TraceWriter> 1340 ServiceImpl::ProducerEndpointImpl::CreateTraceWriter(BufferID buf_id) { 1341 PERFETTO_DCHECK_THREAD(thread_checker_); 1342 return GetOrCreateShmemArbiter()->CreateTraceWriter(buf_id); 1343 } 1344 1345 void ServiceImpl::ProducerEndpointImpl::OnTracingSetup() { 1346 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 1347 task_runner_->PostTask([weak_this] { 1348 if (weak_this) 1349 weak_this->producer_->OnTracingSetup(); 1350 }); 1351 } 1352 1353 void ServiceImpl::ProducerEndpointImpl::Flush( 1354 FlushRequestID flush_request_id, 1355 const std::vector<DataSourceInstanceID>& data_sources) { 1356 PERFETTO_DCHECK_THREAD(thread_checker_); 1357 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 1358 task_runner_->PostTask([weak_this, flush_request_id, data_sources] { 1359 if (weak_this) { 1360 weak_this->producer_->Flush(flush_request_id, data_sources.data(), 1361 data_sources.size()); 1362 } 1363 }); 1364 } 1365 1366 void ServiceImpl::ProducerEndpointImpl::CreateDataSourceInstance( 1367 DataSourceInstanceID ds_id, 1368 const DataSourceConfig& config) { 1369 PERFETTO_DCHECK_THREAD(thread_checker_); 1370 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 1371 task_runner_->PostTask([weak_this, ds_id, config] { 1372 if (weak_this) 1373 
weak_this->producer_->CreateDataSourceInstance(ds_id, std::move(config)); 1374 }); 1375 } 1376 1377 void ServiceImpl::ProducerEndpointImpl::NotifyFlushComplete(FlushRequestID id) { 1378 PERFETTO_DCHECK_THREAD(thread_checker_); 1379 return GetOrCreateShmemArbiter()->NotifyFlushComplete(id); 1380 } 1381 1382 //////////////////////////////////////////////////////////////////////////////// 1383 // ServiceImpl::TracingSession implementation 1384 //////////////////////////////////////////////////////////////////////////////// 1385 1386 ServiceImpl::TracingSession::TracingSession(ConsumerEndpointImpl* consumer_ptr, 1387 const TraceConfig& new_config) 1388 : consumer(consumer_ptr), config(new_config) {} 1389 1390 } // namespace perfetto 1391