Home | History | Annotate | Download | only in ipc
      1 /*
      2  * Copyright (C) 2017 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 
     17 #include "src/ipc/buffered_frame_deserializer.h"
     18 
     19 #include <inttypes.h>
     20 
     21 #include <algorithm>
     22 #include <type_traits>
     23 #include <utility>
     24 
     25 #include "google/protobuf/io/zero_copy_stream_impl_lite.h"
     26 #include "perfetto/base/logging.h"
     27 #include "perfetto/base/utils.h"
     28 
     29 #include "src/ipc/wire_protocol.pb.h"
     30 
     31 namespace perfetto {
     32 namespace ipc {
     33 
     34 namespace {
     35 
     36 // The header is just the number of bytes of the Frame protobuf message.
     37 constexpr size_t kHeaderSize = sizeof(uint32_t);
     38 }  // namespace
     39 
     40 BufferedFrameDeserializer::BufferedFrameDeserializer(size_t max_capacity)
     41     : capacity_(max_capacity) {
     42   PERFETTO_CHECK(max_capacity % base::kPageSize == 0);
     43   PERFETTO_CHECK(max_capacity > base::kPageSize);
     44 }
     45 
     46 BufferedFrameDeserializer::~BufferedFrameDeserializer() = default;
     47 
     48 BufferedFrameDeserializer::ReceiveBuffer
     49 BufferedFrameDeserializer::BeginReceive() {
     50   // Upon the first recv initialize the buffer to the max message size but
     51   // release the physical memory for all but the first page. The kernel will
     52   // automatically give us physical pages back as soon as we page-fault on them.
     53   if (!buf_) {
     54     PERFETTO_DCHECK(size_ == 0);
     55     buf_ = base::PageAllocator::Allocate(capacity_);
     56 
     57     // Surely we are going to use at least the first page. There is very little
     58     // point in madvising that as well and immedately after telling the kernel
     59     // that we want it back (via recv()).
     60     int res = madvise(buf() + base::kPageSize, capacity_ - base::kPageSize,
     61                       MADV_DONTNEED);
     62     PERFETTO_DCHECK(res == 0);
     63   }
     64 
     65   PERFETTO_CHECK(capacity_ > size_);
     66   return ReceiveBuffer{buf() + size_, capacity_ - size_};
     67 }
     68 
     69 bool BufferedFrameDeserializer::EndReceive(size_t recv_size) {
     70   PERFETTO_CHECK(recv_size + size_ <= capacity_);
     71   size_ += recv_size;
     72 
     73   // At this point the contents buf_ can contain:
     74   // A) Only a fragment of the header (the size of the frame). E.g.,
     75   //    03 00 00 (the header is 4 bytes, one is missing).
     76   //
     77   // B) A header and a part of the frame. E.g.,
     78   //     05 00 00 00         11 22 33
     79   //    [ header, size=5 ]  [ Partial frame ]
     80   //
     81   // C) One or more complete header+frame. E.g.,
     82   //     05 00 00 00         11 22 33 44 55   03 00 00 00        AA BB CC
     83   //    [ header, size=5 ]  [ Whole frame ]  [ header, size=3 ] [ Whole frame ]
     84   //
     85   // D) Some complete header+frame(s) and a partial header or frame (C + A/B).
     86   //
     87   // C Is the more likely case and the one we are optimizing for. A, B, D can
     88   // happen because of the streaming nature of the socket.
     89   // The invariant of this function is that, when it returns, buf_ is either
     90   // empty (we drained all the complete frames) or starts with the header of the
     91   // next, still incomplete, frame.
     92 
     93   size_t consumed_size = 0;
     94   for (;;) {
     95     if (size_ < consumed_size + kHeaderSize)
     96       break;  // Case A, not enough data to read even the header.
     97 
     98     // Read the header into |payload_size|.
     99     uint32_t payload_size = 0;
    100     const char* rd_ptr = buf() + consumed_size;
    101     memcpy(base::AssumeLittleEndian(&payload_size), rd_ptr, kHeaderSize);
    102 
    103     // Saturate the |payload_size| to prevent overflows. The > capacity_ check
    104     // below will abort the parsing.
    105     size_t next_frame_size =
    106         std::min(static_cast<size_t>(payload_size), capacity_);
    107     next_frame_size += kHeaderSize;
    108     rd_ptr += kHeaderSize;
    109 
    110     if (size_ < consumed_size + next_frame_size) {
    111       // Case B. We got the header but not the whole frame.
    112       if (next_frame_size > capacity_) {
    113         // The caller is expected to shut down the socket and give up at this
    114         // point. If it doesn't do that and insists going on at some point it
    115         // will hit the capacity check in BeginReceive().
    116         PERFETTO_DLOG("Frame too large (size %zu)", next_frame_size);
    117         return false;
    118       }
    119       break;
    120     }
    121 
    122     // Case C. We got at least one header and whole frame.
    123     DecodeFrame(rd_ptr, payload_size);
    124     consumed_size += next_frame_size;
    125   }
    126 
    127   PERFETTO_DCHECK(consumed_size <= size_);
    128   if (consumed_size > 0) {
    129     // Shift out the consumed data from the buffer. In the typical case (C)
    130     // there is nothing to shift really, just setting size_ = 0 is enough.
    131     // Shifting is only for the (unlikely) case D.
    132     size_ -= consumed_size;
    133     if (size_ > 0) {
    134       // Case D. We consumed some frames but there is a leftover at the end of
    135       // the buffer. Shift out the consumed bytes, so that on the next round
    136       // |buf_| starts with the header of the next unconsumed frame.
    137       const char* move_begin = buf() + consumed_size;
    138       PERFETTO_CHECK(move_begin > buf());
    139       PERFETTO_CHECK(move_begin + size_ <= buf() + capacity_);
    140       memmove(buf(), move_begin, size_);
    141     }
    142     // If we just finished decoding a large frame that used more than one page,
    143     // release the extra memory in the buffer. Large frames should be quite
    144     // rare.
    145     if (consumed_size > base::kPageSize) {
    146       size_t size_rounded_up = (size_ / base::kPageSize + 1) * base::kPageSize;
    147       if (size_rounded_up < capacity_) {
    148         char* madvise_begin = buf() + size_rounded_up;
    149         const size_t madvise_size = capacity_ - size_rounded_up;
    150         PERFETTO_CHECK(madvise_begin > buf() + size_);
    151         PERFETTO_CHECK(madvise_begin + madvise_size <= buf() + capacity_);
    152         int res = madvise(madvise_begin, madvise_size, MADV_DONTNEED);
    153         PERFETTO_DCHECK(res == 0);
    154       }
    155     }
    156   }
    157   // At this point |size_| == 0 for case C, > 0 for cases A, B, D.
    158   return true;
    159 }
    160 
    161 std::unique_ptr<Frame> BufferedFrameDeserializer::PopNextFrame() {
    162   if (decoded_frames_.empty())
    163     return nullptr;
    164   std::unique_ptr<Frame> frame = std::move(decoded_frames_.front());
    165   decoded_frames_.pop_front();
    166   return frame;
    167 }
    168 
    169 void BufferedFrameDeserializer::DecodeFrame(const char* data, size_t size) {
    170   if (size == 0)
    171     return;
    172   std::unique_ptr<Frame> frame(new Frame);
    173   const int sz = static_cast<int>(size);
    174   ::google::protobuf::io::ArrayInputStream stream(data, sz);
    175   if (frame->ParseFromBoundedZeroCopyStream(&stream, sz))
    176     decoded_frames_.push_back(std::move(frame));
    177 }
    178 
    179 // static
    180 std::string BufferedFrameDeserializer::Serialize(const Frame& frame) {
    181   std::string buf;
    182   buf.reserve(1024);  // Just an educated guess to avoid trivial expansions.
    183   buf.insert(0, kHeaderSize, 0);  // Reserve the space for the header.
    184   frame.AppendToString(&buf);
    185   const uint32_t payload_size = static_cast<uint32_t>(buf.size() - kHeaderSize);
    186   PERFETTO_DCHECK(payload_size == static_cast<uint32_t>(frame.GetCachedSize()));
    187   // Don't send messages larger than what the receiver can handle.
    188   PERFETTO_DCHECK(kHeaderSize + payload_size <= kIPCBufferSize);
    189   char header[kHeaderSize];
    190   memcpy(header, base::AssumeLittleEndian(&payload_size), kHeaderSize);
    191   buf.replace(0, kHeaderSize, header, kHeaderSize);
    192   return buf;
    193 }
    194 
    195 }  // namespace ipc
    196 }  // namespace perfetto
    197