1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "src/tracing/core/shared_memory_arbiter_impl.h" 18 19 #include "perfetto/base/logging.h" 20 #include "perfetto/base/task_runner.h" 21 #include "perfetto/tracing/core/commit_data_request.h" 22 #include "perfetto/tracing/core/shared_memory.h" 23 #include "src/tracing/core/null_trace_writer.h" 24 #include "src/tracing/core/trace_writer_impl.h" 25 26 #include <limits> 27 #include <utility> 28 29 namespace perfetto { 30 31 using Chunk = SharedMemoryABI::Chunk; 32 33 // static 34 SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout = 35 SharedMemoryABI::PageLayout::kPageDiv1; 36 37 // static 38 std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance( 39 SharedMemory* shared_memory, 40 size_t page_size, 41 Service::ProducerEndpoint* producer_endpoint, 42 base::TaskRunner* task_runner) { 43 return std::unique_ptr<SharedMemoryArbiterImpl>( 44 new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(), 45 page_size, producer_endpoint, task_runner)); 46 } 47 48 SharedMemoryArbiterImpl::SharedMemoryArbiterImpl( 49 void* start, 50 size_t size, 51 size_t page_size, 52 Service::ProducerEndpoint* producer_endpoint, 53 base::TaskRunner* task_runner) 54 : task_runner_(task_runner), 55 producer_endpoint_(producer_endpoint), 56 shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size), 57 active_writer_ids_(kMaxWriterID), 58 weak_ptr_factory_(this) {} 59 60 Chunk SharedMemoryArbiterImpl::GetNewChunk( 61 const SharedMemoryABI::ChunkHeader& header, 62 size_t size_hint) { 63 PERFETTO_DCHECK(size_hint == 0); // Not implemented yet. 64 int stall_count = 0; 65 useconds_t stall_interval_us = 0; 66 static const useconds_t kMaxStallIntervalUs = 100000; 67 static const int kLogAfterNStalls = 3; 68 69 for (;;) { 70 // TODO(primiano): Probably this lock is not really required and this code 71 // could be rewritten leveraging only the Try* atomic operations in 72 // SharedMemoryABI. But let's not be too adventurous for the moment. 73 { 74 std::lock_guard<std::mutex> scoped_lock(lock_); 75 const size_t initial_page_idx = page_idx_; 76 for (size_t i = 0; i < shmem_abi_.num_pages(); i++) { 77 page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages(); 78 bool is_new_page = false; 79 80 // TODO(primiano): make the page layout dynamic. 81 auto layout = SharedMemoryArbiterImpl::default_page_layout; 82 83 if (shmem_abi_.is_page_free(page_idx_)) { 84 // TODO(primiano): Use the |size_hint| here to decide the layout. 85 is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout); 86 } 87 uint32_t free_chunks; 88 if (is_new_page) { 89 free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1; 90 } else { 91 free_chunks = shmem_abi_.GetFreeChunks(page_idx_); 92 } 93 94 for (uint32_t chunk_idx = 0; free_chunks; 95 chunk_idx++, free_chunks >>= 1) { 96 if (!(free_chunks & 1)) 97 continue; 98 // We found a free chunk. 99 Chunk chunk = shmem_abi_.TryAcquireChunkForWriting( 100 page_idx_, chunk_idx, &header); 101 if (!chunk.is_valid()) 102 continue; 103 if (stall_count > kLogAfterNStalls) { 104 PERFETTO_LOG("Recovered from stall after %d iterations", 105 stall_count); 106 } 107 return chunk; 108 } 109 } 110 } // std::lock_guard<std::mutex> 111 112 // All chunks are taken (either kBeingWritten by us or kBeingRead by the 113 // Service). TODO: at this point we should return a bankrupcy chunk, not 114 // crash the process. 115 if (stall_count++ == kLogAfterNStalls) { 116 PERFETTO_ELOG("Shared memory buffer overrun! Stalling"); 117 118 // TODO(primiano): sending the IPC synchronously is a temporary workaround 119 // until the backpressure logic in probes_producer is sorted out. Until 120 // then the risk is that we stall the message loop waiting for the 121 // tracing service to consume the shared memory buffer (SMB) and, for 122 // this reason, never run the task that tells the service to purge the 123 // SMB. 124 FlushPendingCommitDataRequests(); 125 } 126 usleep(stall_interval_us); 127 stall_interval_us = 128 std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8); 129 } 130 } 131 132 void SharedMemoryArbiterImpl::ReturnCompletedChunk(Chunk chunk, 133 BufferID target_buffer, 134 PatchList* patch_list) { 135 bool should_post_callback = false; 136 bool should_commit_synchronously = false; 137 base::WeakPtr<SharedMemoryArbiterImpl> weak_this; 138 { 139 std::lock_guard<std::mutex> scoped_lock(lock_); 140 uint8_t chunk_idx = chunk.chunk_idx(); 141 const WriterID writer_id = chunk.writer_id(); 142 bytes_pending_commit_ += chunk.size(); 143 size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk)); 144 145 // DO NOT access |chunk| after this point, has been std::move()-d above. 146 147 if (!commit_data_req_) { 148 commit_data_req_.reset(new CommitDataRequest()); 149 weak_this = weak_ptr_factory_.GetWeakPtr(); 150 should_post_callback = true; 151 } 152 CommitDataRequest::ChunksToMove* ctm = 153 commit_data_req_->add_chunks_to_move(); 154 ctm->set_page(static_cast<uint32_t>(page_idx)); 155 ctm->set_chunk(chunk_idx); 156 ctm->set_target_buffer(target_buffer); 157 158 // If more than half of the SMB.size() is filled with completed chunks for 159 // which we haven't notified the service yet (i.e. they are still enqueued 160 // in |commit_data_req_|), force a synchronous CommitDataRequest(), to 161 // reduce the likeliness of stalling the writer. 162 if (bytes_pending_commit_ >= shmem_abi_.size() / 2) { 163 should_commit_synchronously = true; 164 should_post_callback = false; 165 } 166 167 // Get the patches completed for the previous chunk from the |patch_list| 168 // and update it. 169 ChunkID last_chunk_id = 0; // 0 is irrelevant but keeps the compiler happy. 170 CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr; 171 while (!patch_list->empty() && patch_list->front().is_patched()) { 172 if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) { 173 last_chunk_req = commit_data_req_->add_chunks_to_patch(); 174 last_chunk_req->set_writer_id(writer_id); 175 last_chunk_id = patch_list->front().chunk_id; 176 last_chunk_req->set_chunk_id(last_chunk_id); 177 last_chunk_req->set_target_buffer(target_buffer); 178 } 179 auto* patch_req = last_chunk_req->add_patches(); 180 patch_req->set_offset(patch_list->front().offset); 181 patch_req->set_data(&patch_list->front().size_field[0], 182 patch_list->front().size_field.size()); 183 patch_list->pop_front(); 184 } 185 // Patches are enqueued in the |patch_list| in order and are notified to 186 // the service when the chunk is returned. The only case when the current 187 // patch list is incomplete is if there is an unpatched entry at the head of 188 // the |patch_list| that belongs to the same ChunkID as the last one we are 189 // about to send to the service. 190 if (last_chunk_req && !patch_list->empty() && 191 patch_list->front().chunk_id == last_chunk_id) { 192 last_chunk_req->set_has_more_patches(true); 193 } 194 } // scoped_lock(lock_) 195 196 if (should_post_callback) { 197 PERFETTO_DCHECK(weak_this); 198 task_runner_->PostTask([weak_this] { 199 if (weak_this) 200 weak_this->FlushPendingCommitDataRequests(); 201 }); 202 } 203 204 if (should_commit_synchronously) 205 FlushPendingCommitDataRequests(); 206 } 207 208 // TODO(primiano): this is wrong w.r.t. threading because it will try to send 209 // an IPC from a different thread than the IPC thread. Right now this works 210 // because everything is single threaded. It will hit the thread checker 211 // otherwise. What we really want to do here is doing this sync IPC only if 212 // task_runner_.RunsTaskOnCurrentThread(), otherwise PostTask(). 213 void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests( 214 std::function<void()> callback) { 215 PERFETTO_DCHECK_THREAD(thread_checker_); 216 217 std::unique_ptr<CommitDataRequest> req; 218 { 219 std::lock_guard<std::mutex> scoped_lock(lock_); 220 req = std::move(commit_data_req_); 221 bytes_pending_commit_ = 0; 222 } 223 // |commit_data_req_| could become nullptr if the forced sync flush happens 224 // in GetNewChunk(). 225 if (req) { 226 producer_endpoint_->CommitData(*req, callback); 227 } else if (callback) { 228 // If |commit_data_req_| was nullptr, it means that an enqueued deferred 229 // commit was executed just before this. At this point send an empty commit 230 // request to the service, just to linearize with it and give the guarantee 231 // to the caller that the data has been flushed into the service. 232 producer_endpoint_->CommitData(CommitDataRequest(), callback); 233 } 234 } 235 236 std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter( 237 BufferID target_buffer) { 238 WriterID id; 239 { 240 std::lock_guard<std::mutex> scoped_lock(lock_); 241 id = active_writer_ids_.Allocate(); 242 } 243 if (!id) 244 return std::unique_ptr<TraceWriter>(new NullTraceWriter()); 245 return std::unique_ptr<TraceWriter>( 246 new TraceWriterImpl(this, id, target_buffer)); 247 } 248 249 void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) { 250 bool should_post_commit_task = false; 251 { 252 std::lock_guard<std::mutex> scoped_lock(lock_); 253 // If a commit_data_req_ exists it means that somebody else already posted a 254 // FlushPendingCommitDataRequests() task. 255 if (!commit_data_req_) { 256 commit_data_req_.reset(new CommitDataRequest()); 257 should_post_commit_task = true; 258 } else { 259 // If there is another request queued and that also contains is a reply 260 // to a flush request, reply with the highest id. 261 req_id = std::max(req_id, commit_data_req_->flush_request_id()); 262 } 263 commit_data_req_->set_flush_request_id(req_id); 264 } 265 if (should_post_commit_task) { 266 auto weak_this = weak_ptr_factory_.GetWeakPtr(); 267 task_runner_->PostTask([weak_this] { 268 if (weak_this) 269 weak_this->FlushPendingCommitDataRequests(); 270 }); 271 } 272 } 273 274 void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) { 275 std::lock_guard<std::mutex> scoped_lock(lock_); 276 active_writer_ids_.Free(id); 277 } 278 279 } // namespace perfetto 280