/*
 * Copyright (C) 2017 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include "src/tracing/core/shared_memory_arbiter_impl.h"

#include "perfetto/base/logging.h"
#include "perfetto/base/task_runner.h"
#include "perfetto/tracing/core/commit_data_request.h"
#include "perfetto/tracing/core/shared_memory.h"
#include "src/tracing/core/null_trace_writer.h"
#include "src/tracing/core/trace_writer_impl.h"

#include <unistd.h>

#include <algorithm>
#include <limits>
#include <utility>

namespace perfetto {

using Chunk = SharedMemoryABI::Chunk;

// static
SharedMemoryABI::PageLayout SharedMemoryArbiterImpl::default_page_layout =
    SharedMemoryABI::PageLayout::kPageDiv1;

// static
std::unique_ptr<SharedMemoryArbiter> SharedMemoryArbiter::CreateInstance(
    SharedMemory* shared_memory,
    size_t page_size,
    Service::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner) {
  return std::unique_ptr<SharedMemoryArbiterImpl>(
      new SharedMemoryArbiterImpl(shared_memory->start(), shared_memory->size(),
                                  page_size, producer_endpoint, task_runner));
}
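
// Illustrative usage sketch (not part of this file): a producer-side client is
// expected to create one arbiter per shared memory buffer and then hand out
// TraceWriters from it. The names |shm|, |endpoint|, |task_runner| and
// |target_buffer_id| are assumed to come from the producer's IPC setup; the
// page size is just an example value.
//
//   std::unique_ptr<SharedMemoryArbiter> arbiter =
//       SharedMemoryArbiter::CreateInstance(shm, 4096 /* page_size */,
//                                           endpoint, task_runner);
//   std::unique_ptr<TraceWriter> writer =
//       arbiter->CreateTraceWriter(target_buffer_id);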

SharedMemoryArbiterImpl::SharedMemoryArbiterImpl(
    void* start,
    size_t size,
    size_t page_size,
    Service::ProducerEndpoint* producer_endpoint,
    base::TaskRunner* task_runner)
    : task_runner_(task_runner),
      producer_endpoint_(producer_endpoint),
      shmem_abi_(reinterpret_cast<uint8_t*>(start), size, page_size),
      active_writer_ids_(kMaxWriterID),
      weak_ptr_factory_(this) {}

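// Scans the shared memory buffer (SMB) looking for a free chunk to hand out to
// a TraceWriter. Pages are visited in round-robin order starting from
// |page_idx_|; free pages are partitioned with the default layout before being
// scanned. If no free chunk is found, this method stalls, periodically forcing
// a flush of the pending commit requests and retrying with an exponential
// backoff (see the bottom of the loop below).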
Chunk SharedMemoryArbiterImpl::GetNewChunk(
    const SharedMemoryABI::ChunkHeader& header,
    size_t size_hint) {
  PERFETTO_DCHECK(size_hint == 0);  // Not implemented yet.
  int stall_count = 0;
  useconds_t stall_interval_us = 0;
  static const useconds_t kMaxStallIntervalUs = 100000;
  static const int kLogAfterNStalls = 3;

  for (;;) {
    // TODO(primiano): This lock is probably not strictly required and this
    // code could be rewritten to rely only on the Try* atomic operations in
    // SharedMemoryABI. But let's not be too adventurous for the moment.
    {
      std::lock_guard<std::mutex> scoped_lock(lock_);
      const size_t initial_page_idx = page_idx_;
      for (size_t i = 0; i < shmem_abi_.num_pages(); i++) {
        page_idx_ = (initial_page_idx + i) % shmem_abi_.num_pages();
        bool is_new_page = false;

        // TODO(primiano): make the page layout dynamic.
        auto layout = SharedMemoryArbiterImpl::default_page_layout;

        if (shmem_abi_.is_page_free(page_idx_)) {
          // TODO(primiano): Use the |size_hint| here to decide the layout.
          is_new_page = shmem_abi_.TryPartitionPage(page_idx_, layout);
        }
        uint32_t free_chunks;
        if (is_new_page) {
          free_chunks = (1 << SharedMemoryABI::kNumChunksForLayout[layout]) - 1;
        } else {
          free_chunks = shmem_abi_.GetFreeChunks(page_idx_);
        }

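        // |free_chunks| is a bitmap: bit N set means that the N-th chunk of
        // the page is free. Scan it from the least significant bit and try to
        // acquire the first chunk that is still free.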
        for (uint32_t chunk_idx = 0; free_chunks;
             chunk_idx++, free_chunks >>= 1) {
          if (!(free_chunks & 1))
            continue;
          // We found a free chunk.
          Chunk chunk = shmem_abi_.TryAcquireChunkForWriting(
              page_idx_, chunk_idx, &header);
          if (!chunk.is_valid())
            continue;
          if (stall_count > kLogAfterNStalls) {
            PERFETTO_LOG("Recovered from stall after %d iterations",
                         stall_count);
          }
          return chunk;
        }
      }
    }  // std::lock_guard<std::mutex>

    // All chunks are taken (either kBeingWritten by us or kBeingRead by the
    // Service). TODO: at this point we should return a bankruptcy chunk, not
    // crash the process.
    if (stall_count++ == kLogAfterNStalls) {
      PERFETTO_ELOG("Shared memory buffer overrun! Stalling");

      // TODO(primiano): sending the IPC synchronously is a temporary workaround
      // until the backpressure logic in probes_producer is sorted out. Until
      // then the risk is that we stall the message loop waiting for the
      // tracing service to consume the shared memory buffer (SMB) and, as a
      // result, never run the task that tells the service to purge the SMB.
      FlushPendingCommitDataRequests();
    }
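    // Exponential backoff: the sleep interval grows roughly 8x at each failed
    // attempt (0, 8, 72, 584, ... microseconds) and is capped at
    // kMaxStallIntervalUs (100 ms).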
    usleep(stall_interval_us);
    stall_interval_us =
        std::min(kMaxStallIntervalUs, (stall_interval_us + 1) * 8);
  }
}

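// Takes back a chunk that a TraceWriter has finished writing and batches it
// into |commit_data_req_|, together with any completed patches for previously
// returned chunks. The request is normally sent to the service by a posted
// task; if more than half of the SMB is pending commit, it is flushed
// synchronously instead.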
void SharedMemoryArbiterImpl::ReturnCompletedChunk(Chunk chunk,
                                                   BufferID target_buffer,
                                                   PatchList* patch_list) {
  bool should_post_callback = false;
  bool should_commit_synchronously = false;
  base::WeakPtr<SharedMemoryArbiterImpl> weak_this;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    uint8_t chunk_idx = chunk.chunk_idx();
    const WriterID writer_id = chunk.writer_id();
    bytes_pending_commit_ += chunk.size();
    size_t page_idx = shmem_abi_.ReleaseChunkAsComplete(std::move(chunk));

    // DO NOT access |chunk| after this point: it has been std::move()-d above.

    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());
      weak_this = weak_ptr_factory_.GetWeakPtr();
      should_post_callback = true;
    }
    CommitDataRequest::ChunksToMove* ctm =
        commit_data_req_->add_chunks_to_move();
    ctm->set_page(static_cast<uint32_t>(page_idx));
    ctm->set_chunk(chunk_idx);
    ctm->set_target_buffer(target_buffer);

    // If more than half of the SMB is filled with completed chunks for which
    // we haven't notified the service yet (i.e. they are still enqueued in
    // |commit_data_req_|), force a synchronous CommitDataRequest() to reduce
    // the likelihood of stalling the writer.
    if (bytes_pending_commit_ >= shmem_abi_.size() / 2) {
      should_commit_synchronously = true;
      should_post_callback = false;
    }

    // Move the patches that have been completed for previous chunks out of the
    // |patch_list| and enqueue them in the commit request.
    ChunkID last_chunk_id = 0;  // 0 is irrelevant but keeps the compiler happy.
    CommitDataRequest::ChunkToPatch* last_chunk_req = nullptr;
    while (!patch_list->empty() && patch_list->front().is_patched()) {
      if (!last_chunk_req || last_chunk_id != patch_list->front().chunk_id) {
        last_chunk_req = commit_data_req_->add_chunks_to_patch();
        last_chunk_req->set_writer_id(writer_id);
        last_chunk_id = patch_list->front().chunk_id;
        last_chunk_req->set_chunk_id(last_chunk_id);
        last_chunk_req->set_target_buffer(target_buffer);
      }
      auto* patch_req = last_chunk_req->add_patches();
      patch_req->set_offset(patch_list->front().offset);
      patch_req->set_data(&patch_list->front().size_field[0],
                          patch_list->front().size_field.size());
      patch_list->pop_front();
    }
    // Patches are enqueued in the |patch_list| in order and are notified to
    // the service when the chunk is returned. The only case in which the
    // current patch list is incomplete is if there is an unpatched entry at
    // the head of the |patch_list| that belongs to the same ChunkID as the
    // last one we are about to send to the service.
    if (last_chunk_req && !patch_list->empty() &&
        patch_list->front().chunk_id == last_chunk_id) {
      last_chunk_req->set_has_more_patches(true);
    }
  }  // scoped_lock(lock_)

  if (should_post_callback) {
    PERFETTO_DCHECK(weak_this);
    task_runner_->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }

  if (should_commit_synchronously)
    FlushPendingCommitDataRequests();
}

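// Sends the batched |commit_data_req_| (if any) to the service and resets the
// pending-bytes counter. If |callback| is provided but there is nothing left
// to commit, an empty CommitDataRequest is sent anyway, purely to linearize
// with the service and honor the flush guarantee for the caller.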
// TODO(primiano): this is wrong w.r.t. threading because it will try to send
// an IPC from a different thread than the IPC thread. Right now this works
// because everything is single threaded. It will hit the thread checker
// otherwise. What we really want to do here is to issue this sync IPC only if
// task_runner_.RunsTaskOnCurrentThread(), otherwise PostTask().
void SharedMemoryArbiterImpl::FlushPendingCommitDataRequests(
    std::function<void()> callback) {
  PERFETTO_DCHECK_THREAD(thread_checker_);

  std::unique_ptr<CommitDataRequest> req;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    req = std::move(commit_data_req_);
    bytes_pending_commit_ = 0;
  }
  // |commit_data_req_| can be nullptr here if a forced sync flush already
  // happened in GetNewChunk().
  if (req) {
    producer_endpoint_->CommitData(*req, callback);
  } else if (callback) {
    // If |commit_data_req_| was nullptr, it means that an enqueued deferred
    // commit was executed just before this. At this point send an empty commit
    // request to the service, just to linearize with it and guarantee to the
    // caller that the data has been flushed into the service.
    producer_endpoint_->CommitData(CommitDataRequest(), callback);
  }
}

std::unique_ptr<TraceWriter> SharedMemoryArbiterImpl::CreateTraceWriter(
    BufferID target_buffer) {
  WriterID id;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    id = active_writer_ids_.Allocate();
  }
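  // If all writer IDs are in use, fall back to a NullTraceWriter so that the
  // caller still gets a valid TraceWriter object even when no ID is available.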
  if (!id)
    return std::unique_ptr<TraceWriter>(new NullTraceWriter());
  return std::unique_ptr<TraceWriter>(
      new TraceWriterImpl(this, id, target_buffer));
}

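// Attaches |req_id| to the pending (or a newly created) CommitDataRequest, so
// that the flush completion reaches the service together with the next commit.
// If no commit was pending, a commit task is posted to deliver it.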
void SharedMemoryArbiterImpl::NotifyFlushComplete(FlushRequestID req_id) {
  bool should_post_commit_task = false;
  {
    std::lock_guard<std::mutex> scoped_lock(lock_);
    // If |commit_data_req_| already exists, somebody else has already posted a
    // FlushPendingCommitDataRequests() task.
    if (!commit_data_req_) {
      commit_data_req_.reset(new CommitDataRequest());
      should_post_commit_task = true;
    } else {
      // If the already queued request is also a reply to a flush request,
      // reply with the highest id.
      req_id = std::max(req_id, commit_data_req_->flush_request_id());
    }
    commit_data_req_->set_flush_request_id(req_id);
  }
  if (should_post_commit_task) {
    auto weak_this = weak_ptr_factory_.GetWeakPtr();
    task_runner_->PostTask([weak_this] {
      if (weak_this)
        weak_this->FlushPendingCommitDataRequests();
    });
  }
}

void SharedMemoryArbiterImpl::ReleaseWriterID(WriterID id) {
  std::lock_guard<std::mutex> scoped_lock(lock_);
  active_writer_ids_.Free(id);
}

}  // namespace perfetto