1 /* 2 * Copyright (C) 2019 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "perfetto/tracing/core/startup_trace_writer.h" 18 19 #include <numeric> 20 21 #include "perfetto/base/logging.h" 22 #include "perfetto/protozero/proto_utils.h" 23 #include "perfetto/trace/trace_packet.pbzero.h" 24 #include "perfetto/tracing/core/shared_memory_abi.h" 25 #include "perfetto/tracing/core/startup_trace_writer_registry.h" 26 #include "src/tracing/core/patch_list.h" 27 #include "src/tracing/core/shared_memory_arbiter_impl.h" 28 29 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader; 30 31 namespace perfetto { 32 33 namespace { 34 35 SharedMemoryABI::Chunk NewChunk(SharedMemoryArbiterImpl* arbiter, 36 WriterID writer_id, 37 ChunkID chunk_id, 38 bool fragmenting_packet) { 39 ChunkHeader::Packets packets = {}; 40 if (fragmenting_packet) { 41 packets.count = 1; 42 packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk; 43 } 44 45 // The memory order of the stores below doesn't really matter. This |header| 46 // is just a local temporary object. The GetNewChunk() call below will copy it 47 // into the shared buffer with the proper barriers. 48 ChunkHeader header = {}; 49 header.writer_id.store(writer_id, std::memory_order_relaxed); 50 header.chunk_id.store(chunk_id, std::memory_order_relaxed); 51 header.packets.store(packets, std::memory_order_relaxed); 52 53 return arbiter->GetNewChunk(header); 54 } 55 56 class LocalBufferReader { 57 public: 58 LocalBufferReader(protozero::ScatteredHeapBuffer* buffer) 59 : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {} 60 61 size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk, 62 size_t num_bytes, 63 size_t cur_payload_size) { 64 PERFETTO_CHECK(target_chunk->payload_size() >= 65 num_bytes + cur_payload_size); 66 uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size; 67 size_t bytes_read = 0; 68 while (bytes_read < num_bytes) { 69 if (cur_slice_ == buffer_slices_.end()) 70 return bytes_read; 71 72 auto cur_slice_range = cur_slice_->GetUsedRange(); 73 74 if (cur_slice_range.size() == cur_slice_offset_) { 75 cur_slice_offset_ = 0; 76 cur_slice_++; 77 continue; 78 } 79 80 size_t read_size = std::min(num_bytes - bytes_read, 81 cur_slice_range.size() - cur_slice_offset_); 82 memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_, 83 read_size); 84 cur_slice_offset_ += read_size; 85 bytes_read += read_size; 86 87 // Should have either read all of the chunk or completed reading now. 88 PERFETTO_DCHECK(cur_slice_offset_ == cur_slice_range.size() || 89 bytes_read == num_bytes); 90 } 91 return bytes_read; 92 } 93 94 size_t TotalUsedSize() const { 95 size_t used_size = 0; 96 for (const auto& slice : buffer_slices_) { 97 used_size += slice.GetUsedRange().size(); 98 } 99 return used_size; 100 } 101 102 bool DidReadAllData() const { 103 if (cur_slice_ == buffer_slices_.end()) 104 return true; 105 106 const auto next_slice = cur_slice_ + 1; 107 return next_slice == buffer_slices_.end() && 108 cur_slice_->GetUsedRange().size() == cur_slice_offset_; 109 } 110 111 private: 112 const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_; 113 114 // Iterator pointing to slice in |buffer_slices_| that we're currently reading 115 // from. 116 std::vector<protozero::ScatteredHeapBuffer::Slice>::const_iterator cur_slice_; 117 // Read offset in the current slice in bytes. 118 size_t cur_slice_offset_ = 0; 119 }; 120 121 } // namespace 122 123 StartupTraceWriter::StartupTraceWriter( 124 std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle) 125 : registry_handle_(std::move(registry_handle)), 126 memory_buffer_(new protozero::ScatteredHeapBuffer()), 127 memory_stream_writer_( 128 new protozero::ScatteredStreamWriter(memory_buffer_.get())) { 129 memory_buffer_->set_writer(memory_stream_writer_.get()); 130 PERFETTO_DETACH_FROM_THREAD(writer_thread_checker_); 131 } 132 133 StartupTraceWriter::StartupTraceWriter( 134 std::unique_ptr<TraceWriter> trace_writer) 135 : was_bound_(true), trace_writer_(std::move(trace_writer)) {} 136 137 StartupTraceWriter::~StartupTraceWriter() { 138 if (registry_handle_) 139 registry_handle_->OnWriterDestroyed(this); 140 } 141 142 bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter, 143 BufferID target_buffer) { 144 // Create and destroy trace writer without holding lock, since this will post 145 // a task and task posting may trigger a trace event, which would cause a 146 // deadlock. This may create a few more trace writers than necessary in cases 147 // where a concurrent write is in progress (other than causing some 148 // computational overhead, this is not problematic). 149 auto trace_writer = arbiter->CreateTraceWriter(target_buffer); 150 151 { 152 std::lock_guard<std::mutex> lock(lock_); 153 154 PERFETTO_DCHECK(!trace_writer_); 155 156 // Can't bind while the writer thread is writing. 157 if (write_in_progress_) 158 return false; 159 160 // If there's a pending trace packet, it should have been completed by the 161 // writer thread before write_in_progress_ is reset. 162 if (cur_packet_) { 163 PERFETTO_DCHECK(cur_packet_->is_finalized()); 164 cur_packet_.reset(); 165 } 166 167 trace_writer_ = std::move(trace_writer); 168 ChunkID next_chunk_id = CommitLocalBufferChunks( 169 arbiter, trace_writer_->writer_id(), target_buffer); 170 171 // The real TraceWriter should start writing at the subsequent chunk ID. 172 bool success = trace_writer_->SetFirstChunkId(next_chunk_id); 173 PERFETTO_DCHECK(success); 174 175 memory_stream_writer_.reset(); 176 memory_buffer_.reset(); 177 } 178 179 return true; 180 } 181 182 TraceWriter::TracePacketHandle StartupTraceWriter::NewTracePacket() { 183 PERFETTO_DCHECK_THREAD(writer_thread_checker_); 184 185 // Check if we are already bound without grabbing the lock. This is an 186 // optimization to avoid any locking in the common case where the proxy was 187 // bound some time ago. 188 if (PERFETTO_LIKELY(was_bound_)) { 189 PERFETTO_DCHECK(!cur_packet_); 190 PERFETTO_DCHECK(trace_writer_); 191 return trace_writer_->NewTracePacket(); 192 } 193 194 // Now grab the lock and safely check whether we are still unbound. 195 { 196 std::unique_lock<std::mutex> lock(lock_); 197 if (trace_writer_) { 198 PERFETTO_DCHECK(!cur_packet_); 199 // Set the |was_bound_| flag to avoid locking in future calls to 200 // NewTracePacket(). 201 was_bound_ = true; 202 // Don't hold the lock while calling NewTracePacket() on |trace_writer_|. 203 // This is safe because |trace_writer_| remains valid once set. It also 204 // avoids deadlocks that may be caused by holding the lock while waiting 205 // for a new SMB chunk in |trace_writer_|. 206 lock.unlock(); 207 return trace_writer_->NewTracePacket(); 208 } 209 // Not bound. Make sure it stays this way until the TracePacketHandle goes 210 // out of scope by setting |write_in_progress_|. 211 PERFETTO_DCHECK(!write_in_progress_); 212 write_in_progress_ = true; 213 } 214 215 // Write to the local buffer. 216 if (cur_packet_) { 217 // If we hit this, the caller is calling NewTracePacket() without having 218 // finalized the previous packet. 219 PERFETTO_DCHECK(cur_packet_->is_finalized()); 220 } else { 221 cur_packet_.reset(new protos::pbzero::TracePacket()); 222 } 223 cur_packet_->Reset(memory_stream_writer_.get()); 224 TraceWriter::TracePacketHandle handle(cur_packet_.get()); 225 // |this| outlives the packet handle. 226 handle.set_finalization_listener(this); 227 return handle; 228 } 229 230 void StartupTraceWriter::Flush(std::function<void()> callback) { 231 PERFETTO_DCHECK_THREAD(writer_thread_checker_); 232 // It's fine to check |was_bound_| instead of acquiring the lock because 233 // |trace_writer_| will only need flushing after the first trace packet was 234 // written to it and |was_bound_| is set. 235 if (PERFETTO_LIKELY(was_bound_)) { 236 PERFETTO_DCHECK(trace_writer_); 237 return trace_writer_->Flush(std::move(callback)); 238 } 239 240 // Can't flush while unbound. 241 if (callback) 242 callback(); 243 } 244 245 WriterID StartupTraceWriter::writer_id() const { 246 PERFETTO_DCHECK_THREAD(writer_thread_checker_); 247 // We can't acquire the lock because this is a const method. So we'll only 248 // proxy to |trace_writer_| once we have written the first packet to it 249 // instead. 250 if (PERFETTO_LIKELY(was_bound_)) { 251 PERFETTO_DCHECK(trace_writer_); 252 return trace_writer_->writer_id(); 253 } 254 return 0; 255 } 256 257 uint64_t StartupTraceWriter::written() const { 258 PERFETTO_DCHECK_THREAD(writer_thread_checker_); 259 // We can't acquire the lock because this is a const method. So we'll only 260 // proxy to |trace_writer_| once we have written the first packet to it 261 // instead. 262 if (PERFETTO_LIKELY(was_bound_)) { 263 PERFETTO_DCHECK(trace_writer_); 264 return trace_writer_->written(); 265 } 266 return 0; 267 } 268 269 size_t StartupTraceWriter::used_buffer_size() { 270 PERFETTO_DCHECK_THREAD(writer_thread_checker_); 271 if (PERFETTO_LIKELY(was_bound_)) 272 return 0; 273 274 std::lock_guard<std::mutex> lock(lock_); 275 if (trace_writer_) 276 return 0; 277 278 size_t used_size = 0; 279 memory_buffer_->AdjustUsedSizeOfCurrentSlice(); 280 for (const auto& slice : memory_buffer_->slices()) { 281 used_size += slice.GetUsedRange().size(); 282 } 283 return used_size; 284 } 285 286 void StartupTraceWriter::OnMessageFinalized(protozero::Message* message) { 287 PERFETTO_DCHECK(cur_packet_.get() == message); 288 PERFETTO_DCHECK(cur_packet_->is_finalized()); 289 // Finalize() is a no-op because the packet is already finalized. 290 uint32_t packet_size = cur_packet_->Finalize(); 291 packet_sizes_.push_back(packet_size); 292 293 // Write is complete, reset the flag to allow binding. 294 std::lock_guard<std::mutex> lock(lock_); 295 PERFETTO_DCHECK(write_in_progress_); 296 write_in_progress_ = false; 297 } 298 299 ChunkID StartupTraceWriter::CommitLocalBufferChunks( 300 SharedMemoryArbiterImpl* arbiter, 301 WriterID writer_id, 302 BufferID target_buffer) { 303 // TODO(eseckler): Write and commit these chunks asynchronously. This would 304 // require that the service is informed of the missing initial chunks, e.g. by 305 // committing our first chunk here before the new trace writer has a chance to 306 // commit its first chunk. Otherwise the service wouldn't know to wait for our 307 // chunks. 308 309 if (packet_sizes_.empty() || !writer_id) 310 return 0; 311 312 memory_buffer_->AdjustUsedSizeOfCurrentSlice(); 313 LocalBufferReader local_buffer_reader(memory_buffer_.get()); 314 315 PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() == 316 std::accumulate(packet_sizes_.begin(), packet_sizes_.end(), 317 static_cast<size_t>(0u))); 318 319 ChunkID next_chunk_id = 0; 320 SharedMemoryABI::Chunk cur_chunk = 321 NewChunk(arbiter, writer_id, next_chunk_id++, false); 322 323 size_t max_payload_size = cur_chunk.payload_size(); 324 size_t cur_payload_size = 0; 325 uint16_t cur_num_packets = 0; 326 size_t total_num_packets = packet_sizes_.size(); 327 PatchList empty_patch_list; 328 for (size_t packet_idx = 0; packet_idx < total_num_packets; packet_idx++) { 329 uint32_t packet_size = packet_sizes_[packet_idx]; 330 uint32_t remaining_packet_size = packet_size; 331 ++cur_num_packets; 332 do { 333 uint32_t fragment_size = static_cast<uint32_t>( 334 std::min(static_cast<size_t>(remaining_packet_size), 335 max_payload_size - cur_payload_size - 336 SharedMemoryABI::kPacketHeaderSize)); 337 // Write packet header, i.e. the fragment size. 338 protozero::proto_utils::WriteRedundantVarInt( 339 fragment_size, cur_chunk.payload_begin() + cur_payload_size); 340 cur_payload_size += SharedMemoryABI::kPacketHeaderSize; 341 342 // Copy packet content into the chunk. 343 size_t bytes_read = local_buffer_reader.ReadBytes( 344 &cur_chunk, fragment_size, cur_payload_size); 345 PERFETTO_DCHECK(bytes_read == fragment_size); 346 347 cur_payload_size += fragment_size; 348 remaining_packet_size -= fragment_size; 349 350 bool last_write = 351 packet_idx == total_num_packets - 1 && remaining_packet_size == 0; 352 353 // We should return the current chunk if we've filled its payload, reached 354 // the maximum number of packets, or wrote everything we wanted to. 355 bool return_chunk = 356 cur_payload_size >= 357 max_payload_size - SharedMemoryABI::kPacketHeaderSize || 358 cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write; 359 360 if (return_chunk) { 361 auto new_packet_count = 362 cur_chunk.IncreasePacketCountTo(cur_num_packets); 363 PERFETTO_DCHECK(new_packet_count == cur_num_packets); 364 365 bool is_fragmenting = remaining_packet_size > 0; 366 if (is_fragmenting) { 367 PERFETTO_DCHECK(cur_payload_size == max_payload_size); 368 cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk); 369 } 370 371 arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer, 372 &empty_patch_list); 373 374 // Avoid creating a new chunk after the last write. 375 if (!last_write) { 376 cur_chunk = 377 NewChunk(arbiter, writer_id, next_chunk_id++, is_fragmenting); 378 max_payload_size = cur_chunk.payload_size(); 379 cur_payload_size = 0; 380 cur_num_packets = is_fragmenting ? 1 : 0; 381 } else { 382 PERFETTO_DCHECK(!is_fragmenting); 383 } 384 } 385 } while (remaining_packet_size > 0); 386 } 387 388 // The last chunk should have been returned. 389 PERFETTO_DCHECK(!cur_chunk.is_valid()); 390 // We should have read all data from the local buffer. 391 PERFETTO_DCHECK(local_buffer_reader.DidReadAllData()); 392 393 return next_chunk_id; 394 } 395 396 } // namespace perfetto 397