Home | History | Annotate | Download | only in core
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "src/tracing/core/trace_writer_impl.h"
     18 
     19 #include <string.h>
     20 
     21 #include <algorithm>
     22 #include <type_traits>
     23 #include <utility>
     24 
     25 #include "perfetto/base/logging.h"
     26 #include "perfetto/protozero/proto_utils.h"
     27 #include "src/tracing/core/shared_memory_arbiter_impl.h"
     28 
     29 #include "perfetto/trace/trace_packet.pbzero.h"
     30 
     31 using protozero::proto_utils::kMessageLengthFieldSize;
     32 using protozero::proto_utils::WriteRedundantVarInt;
     33 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader;
     34 
     35 namespace perfetto {
     36 
     37 namespace {
     38 constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize;
     39 }  // namespace
     40 
     41 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter,
     42                                  WriterID id,
     43                                  BufferID target_buffer)
     44     : shmem_arbiter_(shmem_arbiter),
     45       id_(id),
     46       target_buffer_(target_buffer),
     47       protobuf_stream_writer_(this) {
     48   // TODO(primiano): we could handle the case of running out of TraceWriterID(s)
     49   // more gracefully and always return a no-op TracePacket in NewTracePacket().
     50   PERFETTO_CHECK(id_ != 0);
     51 
     52   cur_packet_.reset(new protos::pbzero::TracePacket());
     53   cur_packet_->Finalize();  // To avoid the DCHECK in NewTracePacket().
     54 }
     55 
     56 TraceWriterImpl::~TraceWriterImpl() {
     57   if (cur_chunk_.is_valid()) {
     58     cur_packet_->Finalize();
     59     Flush();
     60   }
     61   shmem_arbiter_->ReleaseWriterID(id_);
     62 }
     63 
     64 void TraceWriterImpl::Flush(std::function<void()> callback) {
     65   // Flush() cannot be called in the middle of a TracePacket.
     66   PERFETTO_CHECK(cur_packet_->is_finalized());
     67 
     68   if (cur_chunk_.is_valid()) {
     69     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
     70                                          &patch_list_);
     71     shmem_arbiter_->FlushPendingCommitDataRequests(callback);
     72   } else {
     73     PERFETTO_DCHECK(patch_list_.empty());
     74   }
     75   protobuf_stream_writer_.Reset({nullptr, nullptr});
     76 }
     77 
     78 TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() {
     79   // If we hit this, the caller is calling NewTracePacket() without having
     80   // finalized the previous packet.
     81   PERFETTO_DCHECK(cur_packet_->is_finalized());
     82 
     83   fragmenting_packet_ = false;
     84 
     85   // Reserve space for the size of the message. Note: this call might re-enter
     86   // into this class invoking GetNewBuffer() if there isn't enough space or if
     87   // this is the very first call to NewTracePacket().
     88   static_assert(kPacketHeaderSize == kMessageLengthFieldSize,
     89                 "The packet header must match the Message header size");
     90 
     91   // It doesn't make sense to begin a packet that is going to fragment
     92   // immediately after (8 is just an arbitrary estimation on the minimum size of
     93   // a realistic packet).
     94   if (protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8)
     95     protobuf_stream_writer_.Reset(GetNewBuffer());
     96 
     97   cur_packet_->Reset(&protobuf_stream_writer_);
     98   uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize);
     99   memset(header, 0, kPacketHeaderSize);
    100   cur_packet_->set_size_field(header);
    101   cur_chunk_.IncrementPacketCount();
    102   TracePacketHandle handle(cur_packet_.get());
    103   cur_fragment_start_ = protobuf_stream_writer_.write_ptr();
    104   fragmenting_packet_ = true;
    105   return handle;
    106 }
    107 
    108 // Called by the Message. We can get here in two cases:
    109 // 1. In the middle of writing a Message,
    110 // when |fragmenting_packet_| == true. In this case we want to update the
    111 // chunk header with a partial packet and start a new partial packet in the
    112 // new chunk.
    113 // 2. While calling ReserveBytes() for the packet header in NewTracePacket().
    114 // In this case |fragmenting_packet_| == false and we just want a new chunk
    115 // without creating any fragments.
    116 protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() {
    117   if (fragmenting_packet_) {
    118     uint8_t* const wptr = protobuf_stream_writer_.write_ptr();
    119     PERFETTO_DCHECK(wptr >= cur_fragment_start_);
    120     uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_);
    121     PERFETTO_DCHECK(partial_size < cur_chunk_.size());
    122 
    123     // Backfill the packet header with the fragment size.
    124     PERFETTO_DCHECK(partial_size > 0);
    125     cur_packet_->inc_size_already_written(partial_size);
    126     cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk);
    127     WriteRedundantVarInt(partial_size, cur_packet_->size_field());
    128 
    129     // Descend in the stack of non-finalized nested submessages (if any) and
    130     // detour their |size_field| into the |patch_list_|. At this point we have
    131     // to release the chunk and they cannot write anymore into that.
    132     // TODO(primiano): add tests to cover this logic.
    133     for (auto* nested_msg = cur_packet_->nested_message(); nested_msg;
    134          nested_msg = nested_msg->nested_message()) {
    135       uint8_t* const cur_hdr = nested_msg->size_field();
    136 
    137       // If this is false the protozero Message has already been instructed to
    138       // write, upon Finalize(), its size into the patch list.
    139       bool size_field_points_within_chunk =
    140           cur_hdr >= cur_chunk_.payload_begin() &&
    141           cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end();
    142 
    143       if (size_field_points_within_chunk) {
    144         auto offset =
    145             static_cast<uint16_t>(cur_hdr - cur_chunk_.payload_begin());
    146         const ChunkID cur_chunk_id =
    147             cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed);
    148         Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset);
    149         nested_msg->set_size_field(&patch->size_field[0]);
    150       } else {
    151 #if PERFETTO_DCHECK_IS_ON()
    152         // Ensure that the size field of the message points to an element of the
    153         // patch list.
    154         auto patch_it = std::find_if(
    155             patch_list_.begin(), patch_list_.end(),
    156             [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; });
    157         PERFETTO_DCHECK(patch_it != patch_list_.end());
    158 #endif
    159       }
    160     }  // for(nested_msg
    161   }    // if(fragmenting_packet)
    162 
    163   if (cur_chunk_.is_valid()) {
    164     // ReturnCompletedChunk will consume the first patched entries from
    165     // |patch_list_| and shrink it.
    166     shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_,
    167                                          &patch_list_);
    168   }
    169 
    170   // Start a new chunk.
    171 
    172   ChunkHeader::Packets packets = {};
    173   if (fragmenting_packet_) {
    174     packets.count = 1;
    175     packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk;
    176   }
    177 
    178   // The memory order of the stores below doesn't really matter. This |header|
    179   // is just a local temporary object. The GetNewChunk() call below will copy it
    180   // into the shared buffer with the proper barriers.
    181   ChunkHeader header = {};
    182   header.writer_id.store(id_, std::memory_order_relaxed);
    183   header.chunk_id.store(next_chunk_id_++, std::memory_order_relaxed);
    184   header.packets.store(packets, std::memory_order_relaxed);
    185 
    186   cur_chunk_ = shmem_arbiter_->GetNewChunk(header);
    187   uint8_t* payload_begin = cur_chunk_.payload_begin();
    188   if (fragmenting_packet_) {
    189     cur_packet_->set_size_field(payload_begin);
    190     memset(payload_begin, 0, kPacketHeaderSize);
    191     payload_begin += kPacketHeaderSize;
    192     cur_fragment_start_ = payload_begin;
    193   }
    194 
    195   return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()};
    196 }
    197 
    198 WriterID TraceWriterImpl::writer_id() const {
    199   return id_;
    200 }
    201 
    202 // Base class ctor/dtor definition.
    203 TraceWriter::TraceWriter() = default;
    204 TraceWriter::~TraceWriter() = default;
    205 
    206 }  // namespace perfetto
    207