1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "src/tracing/core/trace_writer_impl.h" 18 19 #include <string.h> 20 21 #include <algorithm> 22 #include <type_traits> 23 #include <utility> 24 25 #include "perfetto/base/logging.h" 26 #include "perfetto/protozero/proto_utils.h" 27 #include "src/tracing/core/shared_memory_arbiter_impl.h" 28 29 #include "perfetto/trace/trace_packet.pbzero.h" 30 31 using protozero::proto_utils::kMessageLengthFieldSize; 32 using protozero::proto_utils::WriteRedundantVarInt; 33 using ChunkHeader = perfetto::SharedMemoryABI::ChunkHeader; 34 35 namespace perfetto { 36 37 namespace { 38 constexpr size_t kPacketHeaderSize = SharedMemoryABI::kPacketHeaderSize; 39 } // namespace 40 41 TraceWriterImpl::TraceWriterImpl(SharedMemoryArbiterImpl* shmem_arbiter, 42 WriterID id, 43 BufferID target_buffer) 44 : shmem_arbiter_(shmem_arbiter), 45 id_(id), 46 target_buffer_(target_buffer), 47 protobuf_stream_writer_(this) { 48 // TODO(primiano): we could handle the case of running out of TraceWriterID(s) 49 // more gracefully and always return a no-op TracePacket in NewTracePacket(). 50 PERFETTO_CHECK(id_ != 0); 51 52 cur_packet_.reset(new protos::pbzero::TracePacket()); 53 cur_packet_->Finalize(); // To avoid the DCHECK in NewTracePacket(). 54 } 55 56 TraceWriterImpl::~TraceWriterImpl() { 57 if (cur_chunk_.is_valid()) { 58 cur_packet_->Finalize(); 59 Flush(); 60 } 61 shmem_arbiter_->ReleaseWriterID(id_); 62 } 63 64 void TraceWriterImpl::Flush(std::function<void()> callback) { 65 // Flush() cannot be called in the middle of a TracePacket. 66 PERFETTO_CHECK(cur_packet_->is_finalized()); 67 68 if (cur_chunk_.is_valid()) { 69 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_, 70 &patch_list_); 71 shmem_arbiter_->FlushPendingCommitDataRequests(callback); 72 } else { 73 PERFETTO_DCHECK(patch_list_.empty()); 74 } 75 protobuf_stream_writer_.Reset({nullptr, nullptr}); 76 } 77 78 TraceWriterImpl::TracePacketHandle TraceWriterImpl::NewTracePacket() { 79 // If we hit this, the caller is calling NewTracePacket() without having 80 // finalized the previous packet. 81 PERFETTO_DCHECK(cur_packet_->is_finalized()); 82 83 fragmenting_packet_ = false; 84 85 // Reserve space for the size of the message. Note: this call might re-enter 86 // into this class invoking GetNewBuffer() if there isn't enough space or if 87 // this is the very first call to NewTracePacket(). 88 static_assert(kPacketHeaderSize == kMessageLengthFieldSize, 89 "The packet header must match the Message header size"); 90 91 // It doesn't make sense to begin a packet that is going to fragment 92 // immediately after (8 is just an arbitrary estimation on the minimum size of 93 // a realistic packet). 94 if (protobuf_stream_writer_.bytes_available() < kPacketHeaderSize + 8) 95 protobuf_stream_writer_.Reset(GetNewBuffer()); 96 97 cur_packet_->Reset(&protobuf_stream_writer_); 98 uint8_t* header = protobuf_stream_writer_.ReserveBytes(kPacketHeaderSize); 99 memset(header, 0, kPacketHeaderSize); 100 cur_packet_->set_size_field(header); 101 cur_chunk_.IncrementPacketCount(); 102 TracePacketHandle handle(cur_packet_.get()); 103 cur_fragment_start_ = protobuf_stream_writer_.write_ptr(); 104 fragmenting_packet_ = true; 105 return handle; 106 } 107 108 // Called by the Message. We can get here in two cases: 109 // 1. In the middle of writing a Message, 110 // when |fragmenting_packet_| == true. In this case we want to update the 111 // chunk header with a partial packet and start a new partial packet in the 112 // new chunk. 113 // 2. While calling ReserveBytes() for the packet header in NewTracePacket(). 114 // In this case |fragmenting_packet_| == false and we just want a new chunk 115 // without creating any fragments. 116 protozero::ContiguousMemoryRange TraceWriterImpl::GetNewBuffer() { 117 if (fragmenting_packet_) { 118 uint8_t* const wptr = protobuf_stream_writer_.write_ptr(); 119 PERFETTO_DCHECK(wptr >= cur_fragment_start_); 120 uint32_t partial_size = static_cast<uint32_t>(wptr - cur_fragment_start_); 121 PERFETTO_DCHECK(partial_size < cur_chunk_.size()); 122 123 // Backfill the packet header with the fragment size. 124 PERFETTO_DCHECK(partial_size > 0); 125 cur_packet_->inc_size_already_written(partial_size); 126 cur_chunk_.SetFlag(ChunkHeader::kLastPacketContinuesOnNextChunk); 127 WriteRedundantVarInt(partial_size, cur_packet_->size_field()); 128 129 // Descend in the stack of non-finalized nested submessages (if any) and 130 // detour their |size_field| into the |patch_list_|. At this point we have 131 // to release the chunk and they cannot write anymore into that. 132 // TODO(primiano): add tests to cover this logic. 133 for (auto* nested_msg = cur_packet_->nested_message(); nested_msg; 134 nested_msg = nested_msg->nested_message()) { 135 uint8_t* const cur_hdr = nested_msg->size_field(); 136 137 // If this is false the protozero Message has already been instructed to 138 // write, upon Finalize(), its size into the patch list. 139 bool size_field_points_within_chunk = 140 cur_hdr >= cur_chunk_.payload_begin() && 141 cur_hdr + kMessageLengthFieldSize <= cur_chunk_.end(); 142 143 if (size_field_points_within_chunk) { 144 auto offset = 145 static_cast<uint16_t>(cur_hdr - cur_chunk_.payload_begin()); 146 const ChunkID cur_chunk_id = 147 cur_chunk_.header()->chunk_id.load(std::memory_order_relaxed); 148 Patch* patch = patch_list_.emplace_back(cur_chunk_id, offset); 149 nested_msg->set_size_field(&patch->size_field[0]); 150 } else { 151 #if PERFETTO_DCHECK_IS_ON() 152 // Ensure that the size field of the message points to an element of the 153 // patch list. 154 auto patch_it = std::find_if( 155 patch_list_.begin(), patch_list_.end(), 156 [cur_hdr](const Patch& p) { return &p.size_field[0] == cur_hdr; }); 157 PERFETTO_DCHECK(patch_it != patch_list_.end()); 158 #endif 159 } 160 } // for(nested_msg 161 } // if(fragmenting_packet) 162 163 if (cur_chunk_.is_valid()) { 164 // ReturnCompletedChunk will consume the first patched entries from 165 // |patch_list_| and shrink it. 166 shmem_arbiter_->ReturnCompletedChunk(std::move(cur_chunk_), target_buffer_, 167 &patch_list_); 168 } 169 170 // Start a new chunk. 171 172 ChunkHeader::Packets packets = {}; 173 if (fragmenting_packet_) { 174 packets.count = 1; 175 packets.flags = ChunkHeader::kFirstPacketContinuesFromPrevChunk; 176 } 177 178 // The memory order of the stores below doesn't really matter. This |header| 179 // is just a local temporary object. The GetNewChunk() call below will copy it 180 // into the shared buffer with the proper barriers. 181 ChunkHeader header = {}; 182 header.writer_id.store(id_, std::memory_order_relaxed); 183 header.chunk_id.store(next_chunk_id_++, std::memory_order_relaxed); 184 header.packets.store(packets, std::memory_order_relaxed); 185 186 cur_chunk_ = shmem_arbiter_->GetNewChunk(header); 187 uint8_t* payload_begin = cur_chunk_.payload_begin(); 188 if (fragmenting_packet_) { 189 cur_packet_->set_size_field(payload_begin); 190 memset(payload_begin, 0, kPacketHeaderSize); 191 payload_begin += kPacketHeaderSize; 192 cur_fragment_start_ = payload_begin; 193 } 194 195 return protozero::ContiguousMemoryRange{payload_begin, cur_chunk_.end()}; 196 } 197 198 WriterID TraceWriterImpl::writer_id() const { 199 return id_; 200 } 201 202 // Base class ctor/dtor definition. 203 TraceWriter::TraceWriter() = default; 204 TraceWriter::~TraceWriter() = default; 205 206 } // namespace perfetto 207