/*
 * Copyright (C) 2019 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "perfetto/tracing/core/startup_trace_writer.h"

#include <numeric>

#include "perfetto/base/logging.h"
#include "perfetto/protozero/proto_utils.h"
#include "perfetto/trace/trace_packet.pbzero.h"
#include "perfetto/tracing/core/shared_memory_abi.h"
#include "perfetto/tracing/core/startup_trace_writer_registry.h"
#include "src/tracing/core/patch_list.h"
#include "src/tracing/core/shared_memory_arbiter_impl.h"

using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;

namespace perfetto {

namespace {

SharedMemoryABI::Chunk NewChunk(SharedMemoryArbiterImpl* arbiter,
                                WriterID writer_id,
                                ChunkID chunk_id,
                                bool fragmenting_packet) {
  ChunkHeader::Packets packets = {};
  if (fragmenting_packet) {
    packets.count = 1;
    packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
  }

  // The memory order of the stores below doesn't really matter. This |header|
  // is just a local temporary object. The GetNewChunk() call below will copy it
  // into the shared buffer with the proper barriers.
  ChunkHeader header = {};
  header.writer_id.store(writer_id, std::memory_order_relaxed);
  header.chunk_id.store(chunk_id, std::memory_order_relaxed);
  header.packets.store(packets, std::memory_order_relaxed);

  return arbiter->GetNewChunk(header);
}
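
// Example (a sketch of how the helper above is used by
// CommitLocalBufferChunks() below): a writer's first chunk is requested with
//   SharedMemoryABI::Chunk chunk = NewChunk(arbiter, writer_id, 0, false);
// and, if that chunk ended in the middle of a fragmented packet, the next one
// with
//   chunk = NewChunk(arbiter, writer_id, 1, true);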

class LocalBufferReader {
 public:
  explicit LocalBufferReader(protozero::ScatteredHeapBuffer* buffer)
      : buffer_slices_(buffer->slices()), cur_slice_(buffer_slices_.begin()) {}

  size_t ReadBytes(SharedMemoryABI::Chunk* target_chunk,
                   size_t num_bytes,
                   size_t cur_payload_size) {
    PERFETTO_CHECK(target_chunk->payload_size() >=
                   num_bytes + cur_payload_size);
    uint8_t* target_ptr = target_chunk->payload_begin() + cur_payload_size;
    size_t bytes_read = 0;
    while (bytes_read < num_bytes) {
      if (cur_slice_ == buffer_slices_.end())
        return bytes_read;

      auto cur_slice_range = cur_slice_->GetUsedRange();

      if (cur_slice_range.size() == cur_slice_offset_) {
        cur_slice_offset_ = 0;
        cur_slice_++;
        continue;
      }

      size_t read_size = std::min(num_bytes - bytes_read,
                                  cur_slice_range.size() - cur_slice_offset_);
      memcpy(target_ptr + bytes_read, cur_slice_range.begin + cur_slice_offset_,
             read_size);
      cur_slice_offset_ += read_size;
      bytes_read += read_size;

      // Should have either read to the end of the current slice or completed
      // reading now.
      PERFETTO_DCHECK(cur_slice_offset_ == cur_slice_range.size() ||
                      bytes_read == num_bytes);
    }
    return bytes_read;
  }

  size_t TotalUsedSize() const {
    size_t used_size = 0;
    for (const auto& slice : buffer_slices_) {
      used_size += slice.GetUsedRange().size();
    }
    return used_size;
  }

  bool DidReadAllData() const {
    if (cur_slice_ == buffer_slices_.end())
      return true;

    const auto next_slice = cur_slice_ + 1;
    return next_slice == buffer_slices_.end() &&
           cur_slice_->GetUsedRange().size() == cur_slice_offset_;
  }

 private:
  const std::vector<protozero::ScatteredHeapBuffer::Slice>& buffer_slices_;

  // Iterator pointing to the slice in |buffer_slices_| that we're currently
  // reading from.
  std::vector<protozero::ScatteredHeapBuffer::Slice>::const_iterator cur_slice_;
  // Read offset in the current slice in bytes.
  size_t cur_slice_offset_ = 0;
};
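
// Usage sketch for LocalBufferReader (hypothetical; assumes |chunk| is a valid
// SharedMemoryABI::Chunk with enough payload space for the buffered data):
//
//   protozero::ScatteredHeapBuffer buffer;
//   ...  // write messages into |buffer| via a ScatteredStreamWriter.
//   LocalBufferReader reader(&buffer);
//   size_t copied = reader.ReadBytes(&chunk, reader.TotalUsedSize(), 0u);
//   PERFETTO_DCHECK(copied == reader.TotalUsedSize());
//   PERFETTO_DCHECK(reader.DidReadAllData());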

}  // namespace

StartupTraceWriter::StartupTraceWriter(
    std::shared_ptr<StartupTraceWriterRegistryHandle> registry_handle)
    : registry_handle_(std::move(registry_handle)),
      memory_buffer_(new protozero::ScatteredHeapBuffer()),
      memory_stream_writer_(
          new protozero::ScatteredStreamWriter(memory_buffer_.get())) {
  memory_buffer_->set_writer(memory_stream_writer_.get());
  PERFETTO_DETACH_FROM_THREAD(writer_thread_checker_);
}

StartupTraceWriter::StartupTraceWriter(
    std::unique_ptr<TraceWriter> trace_writer)
    : was_bound_(true), trace_writer_(std::move(trace_writer)) {}

StartupTraceWriter::~StartupTraceWriter() {
  if (registry_handle_)
    registry_handle_->OnWriterDestroyed(this);
}

bool StartupTraceWriter::BindToArbiter(SharedMemoryArbiterImpl* arbiter,
                                       BufferID target_buffer) {
  // Create and destroy the trace writer without holding the lock: creating it
  // posts a task, and task posting may itself emit a trace event, which would
  // deadlock on |lock_|. This may create a few more trace writers than
  // necessary when a concurrent write is in progress (other than some
  // computational overhead, this is not problematic).
  auto trace_writer = arbiter->CreateTraceWriter(target_buffer);

  {
    std::lock_guard<std::mutex> lock(lock_);

    PERFETTO_DCHECK(!trace_writer_);

    // Can't bind while the writer thread is writing.
    if (write_in_progress_)
      return false;

    // If there's a pending trace packet, the writer thread must have completed
    // it before resetting |write_in_progress_|.
    if (cur_packet_) {
      PERFETTO_DCHECK(cur_packet_->is_finalized());
      cur_packet_.reset();
    }

    trace_writer_ = std::move(trace_writer);
    ChunkID next_chunk_id = CommitLocalBufferChunks(
        arbiter, trace_writer_->writer_id(), target_buffer);

    // The real TraceWriter should start writing at the subsequent chunk ID.
    bool success = trace_writer_->SetFirstChunkId(next_chunk_id);
    PERFETTO_DCHECK(success);

    memory_stream_writer_.reset();
    memory_buffer_.reset();
  }

  return true;
}

TraceWriter::TracePacketHandle StartupTraceWriter::NewTracePacket() {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);

  // Check if we are already bound without grabbing the lock. This is an
  // optimization to avoid any locking in the common case where the proxy was
  // bound some time ago.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(!cur_packet_);
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->NewTracePacket();
  }

  // Now grab the lock and safely check whether we are still unbound.
  {
    std::unique_lock<std::mutex> lock(lock_);
    if (trace_writer_) {
      PERFETTO_DCHECK(!cur_packet_);
      // Set the |was_bound_| flag to avoid locking in future calls to
      // NewTracePacket().
      was_bound_ = true;
      // Don't hold the lock while calling NewTracePacket() on |trace_writer_|.
      // This is safe because |trace_writer_| remains valid once set. It also
      // avoids deadlocks that may be caused by holding the lock while waiting
      // for a new SMB chunk in |trace_writer_|.
      lock.unlock();
      return trace_writer_->NewTracePacket();
    }
    // Not bound. Make sure it stays this way until the TracePacketHandle goes
    // out of scope by setting |write_in_progress_|.
    PERFETTO_DCHECK(!write_in_progress_);
    write_in_progress_ = true;
  }

  // Write to the local buffer.
  if (cur_packet_) {
    // If this DCHECK fires, the caller called NewTracePacket() again without
    // finalizing the previous packet first.
    PERFETTO_DCHECK(cur_packet_->is_finalized());
  } else {
    cur_packet_.reset(new protos::pbzero::TracePacket());
  }
  cur_packet_->Reset(memory_stream_writer_.get());
  TraceWriter::TracePacketHandle handle(cur_packet_.get());
  // |this| outlives the packet handle.
  handle.set_finalization_listener(this);
  return handle;
}
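
// Typical caller pattern (a sketch, not part of this file; the field setters
// available depend on the TracePacket proto):
//
//   StartupTraceWriter* writer = ...;
//   {
//     auto packet = writer->NewTracePacket();
//     packet->set_timestamp(42);
//   }  // The handle finalizes the packet when it goes out of scope.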

void StartupTraceWriter::Flush(std::function<void()> callback) {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // It's fine to check |was_bound_| instead of acquiring the lock because
  // |trace_writer_| only needs flushing after the first trace packet was
  // written to it, at which point |was_bound_| is set.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->Flush(std::move(callback));
  }

  // Can't flush while unbound.
  if (callback)
    callback();
}

WriterID StartupTraceWriter::writer_id() const {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // This method is const, so we can't acquire |lock_|. Instead, we only proxy
  // to |trace_writer_| once |was_bound_| is set, i.e. once the writer thread
  // has written its first packet through the bound writer.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->writer_id();
  }
  return 0;
}

uint64_t StartupTraceWriter::written() const {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  // This method is const, so we can't acquire |lock_|. Instead, we only proxy
  // to |trace_writer_| once |was_bound_| is set, i.e. once the writer thread
  // has written its first packet through the bound writer.
  if (PERFETTO_LIKELY(was_bound_)) {
    PERFETTO_DCHECK(trace_writer_);
    return trace_writer_->written();
  }
  return 0;
}

size_t StartupTraceWriter::used_buffer_size() {
  PERFETTO_DCHECK_THREAD(writer_thread_checker_);
  if (PERFETTO_LIKELY(was_bound_))
    return 0;

  std::lock_guard<std::mutex> lock(lock_);
  if (trace_writer_)
    return 0;

  size_t used_size = 0;
  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
  for (const auto& slice : memory_buffer_->slices()) {
    used_size += slice.GetUsedRange().size();
  }
  return used_size;
}

void StartupTraceWriter::OnMessageFinalized(protozero::Message* message) {
  PERFETTO_DCHECK(cur_packet_.get() == message);
  PERFETTO_DCHECK(cur_packet_->is_finalized());
  // The packet is already finalized, so Finalize() is a no-op that merely
  // returns the packet's size.
  uint32_t packet_size = cur_packet_->Finalize();
  packet_sizes_.push_back(packet_size);

  // The write is complete; reset the flag to allow binding.
  std::lock_guard<std::mutex> lock(lock_);
  PERFETTO_DCHECK(write_in_progress_);
  write_in_progress_ = false;
}

ChunkID StartupTraceWriter::CommitLocalBufferChunks(
    SharedMemoryArbiterImpl* arbiter,
    WriterID writer_id,
    BufferID target_buffer) {
  // TODO(eseckler): Write and commit these chunks asynchronously. This would
  // require that the service is informed of the missing initial chunks, e.g. by
  // committing our first chunk here before the new trace writer has a chance to
  // commit its first chunk. Otherwise the service wouldn't know to wait for our
  // chunks.

  if (packet_sizes_.empty() || !writer_id)
    return 0;

  memory_buffer_->AdjustUsedSizeOfCurrentSlice();
  LocalBufferReader local_buffer_reader(memory_buffer_.get());

  PERFETTO_DCHECK(local_buffer_reader.TotalUsedSize() ==
                  std::accumulate(packet_sizes_.begin(), packet_sizes_.end(),
                                  static_cast<size_t>(0u)));

  ChunkID next_chunk_id = 0;
  SharedMemoryABI::Chunk cur_chunk =
      NewChunk(arbiter, writer_id, next_chunk_id++, false);

  size_t max_payload_size = cur_chunk.payload_size();
  size_t cur_payload_size = 0;
  uint16_t cur_num_packets = 0;
  size_t total_num_packets = packet_sizes_.size();
  PatchList empty_patch_list;
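  // Split the locally buffered packets into SMB chunks, fragmenting packets
  // that don't fit into the current chunk's remaining payload. Worked example
  // with hypothetical sizes: given a chunk payload of 100 bytes and buffered
  // packet sizes {150, 20}, the first chunk holds a 96-byte fragment of the
  // first packet (4 bytes go to its packet header); the second chunk holds the
  // remaining 54 bytes, marked as a continuation, followed by the 20-byte
  // second packet.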
  for (size_t packet_idx = 0; packet_idx < total_num_packets; packet_idx++) {
    uint32_t packet_size = packet_sizes_[packet_idx];
    uint32_t remaining_packet_size = packet_size;
    ++cur_num_packets;
    do {
      uint32_t fragment_size = static_cast<uint32_t>(
          std::min(static_cast<size_t>(remaining_packet_size),
                   max_payload_size - cur_payload_size -
                       SharedMemoryABI::kPacketHeaderSize));
      // Write packet header, i.e. the fragment size.
      protozero::proto_utils::WriteRedundantVarInt(
          fragment_size, cur_chunk.payload_begin() + cur_payload_size);
      cur_payload_size += SharedMemoryABI::kPacketHeaderSize;
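      // Note on the encoding (an illustration; the exact width is defined by
      // SharedMemoryABI::kPacketHeaderSize): the "redundant" varint always
      // occupies the full header width by setting the continuation bit on the
      // leading bytes. E.g. with a 4-byte header, a fragment size of 1 is
      // written as 0x81 0x80 0x80 0x00 rather than the minimal 0x01, so the
      // size can later be patched in place without shifting the payload.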

      // Copy packet content into the chunk.
      size_t bytes_read = local_buffer_reader.ReadBytes(
          &cur_chunk, fragment_size, cur_payload_size);
      PERFETTO_DCHECK(bytes_read == fragment_size);

      cur_payload_size += fragment_size;
      remaining_packet_size -= fragment_size;

      bool last_write =
          packet_idx == total_num_packets - 1 && remaining_packet_size == 0;

      // We should return the current chunk if we've filled its payload,
      // reached the maximum number of packets, or written everything we
      // wanted to.
      bool return_chunk =
          cur_payload_size >=
              max_payload_size - SharedMemoryABI::kPacketHeaderSize ||
          cur_num_packets == ChunkHeader::Packets::kMaxCount || last_write;

      if (return_chunk) {
        auto new_packet_count =
            cur_chunk.IncreasePacketCountTo(cur_num_packets);
        PERFETTO_DCHECK(new_packet_count == cur_num_packets);

        bool is_fragmenting = remaining_packet_size > 0;
        if (is_fragmenting) {
          PERFETTO_DCHECK(cur_payload_size == max_payload_size);
          cur_chunk.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
        }

        arbiter->ReturnCompletedChunk(std::move(cur_chunk), target_buffer,
                                      &empty_patch_list);

        // Avoid creating a new chunk after the last write.
        if (!last_write) {
          cur_chunk =
              NewChunk(arbiter, writer_id, next_chunk_id++, is_fragmenting);
          max_payload_size = cur_chunk.payload_size();
          cur_payload_size = 0;
          cur_num_packets = is_fragmenting ? 1 : 0;
        } else {
          PERFETTO_DCHECK(!is_fragmenting);
        }
      }
    } while (remaining_packet_size > 0);
  }

  // The last chunk should have been returned.
  PERFETTO_DCHECK(!cur_chunk.is_valid());
  // We should have read all data from the local buffer.
  PERFETTO_DCHECK(local_buffer_reader.DidReadAllData());

  return next_chunk_id;
}

}  // namespace perfetto