1 /* 2 * Copyright (C) 2017 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 17 #include "src/ipc/buffered_frame_deserializer.h" 18 19 #include <inttypes.h> 20 21 #include <algorithm> 22 #include <type_traits> 23 #include <utility> 24 25 #include "google/protobuf/io/zero_copy_stream_impl_lite.h" 26 #include "perfetto/base/logging.h" 27 #include "perfetto/base/utils.h" 28 29 #include "src/ipc/wire_protocol.pb.h" 30 31 namespace perfetto { 32 namespace ipc { 33 34 namespace { 35 36 // The header is just the number of bytes of the Frame protobuf message. 37 constexpr size_t kHeaderSize = sizeof(uint32_t); 38 } // namespace 39 40 BufferedFrameDeserializer::BufferedFrameDeserializer(size_t max_capacity) 41 : capacity_(max_capacity) { 42 PERFETTO_CHECK(max_capacity % base::kPageSize == 0); 43 PERFETTO_CHECK(max_capacity > base::kPageSize); 44 } 45 46 BufferedFrameDeserializer::~BufferedFrameDeserializer() = default; 47 48 BufferedFrameDeserializer::ReceiveBuffer 49 BufferedFrameDeserializer::BeginReceive() { 50 // Upon the first recv initialize the buffer to the max message size but 51 // release the physical memory for all but the first page. The kernel will 52 // automatically give us physical pages back as soon as we page-fault on them. 53 if (!buf_) { 54 PERFETTO_DCHECK(size_ == 0); 55 buf_ = base::PageAllocator::Allocate(capacity_); 56 57 // Surely we are going to use at least the first page. There is very little 58 // point in madvising that as well and immedately after telling the kernel 59 // that we want it back (via recv()). 60 int res = madvise(buf() + base::kPageSize, capacity_ - base::kPageSize, 61 MADV_DONTNEED); 62 PERFETTO_DCHECK(res == 0); 63 } 64 65 PERFETTO_CHECK(capacity_ > size_); 66 return ReceiveBuffer{buf() + size_, capacity_ - size_}; 67 } 68 69 bool BufferedFrameDeserializer::EndReceive(size_t recv_size) { 70 PERFETTO_CHECK(recv_size + size_ <= capacity_); 71 size_ += recv_size; 72 73 // At this point the contents buf_ can contain: 74 // A) Only a fragment of the header (the size of the frame). E.g., 75 // 03 00 00 (the header is 4 bytes, one is missing). 76 // 77 // B) A header and a part of the frame. E.g., 78 // 05 00 00 00 11 22 33 79 // [ header, size=5 ] [ Partial frame ] 80 // 81 // C) One or more complete header+frame. E.g., 82 // 05 00 00 00 11 22 33 44 55 03 00 00 00 AA BB CC 83 // [ header, size=5 ] [ Whole frame ] [ header, size=3 ] [ Whole frame ] 84 // 85 // D) Some complete header+frame(s) and a partial header or frame (C + A/B). 86 // 87 // C Is the more likely case and the one we are optimizing for. A, B, D can 88 // happen because of the streaming nature of the socket. 89 // The invariant of this function is that, when it returns, buf_ is either 90 // empty (we drained all the complete frames) or starts with the header of the 91 // next, still incomplete, frame. 92 93 size_t consumed_size = 0; 94 for (;;) { 95 if (size_ < consumed_size + kHeaderSize) 96 break; // Case A, not enough data to read even the header. 97 98 // Read the header into |payload_size|. 99 uint32_t payload_size = 0; 100 const char* rd_ptr = buf() + consumed_size; 101 memcpy(base::AssumeLittleEndian(&payload_size), rd_ptr, kHeaderSize); 102 103 // Saturate the |payload_size| to prevent overflows. The > capacity_ check 104 // below will abort the parsing. 105 size_t next_frame_size = 106 std::min(static_cast<size_t>(payload_size), capacity_); 107 next_frame_size += kHeaderSize; 108 rd_ptr += kHeaderSize; 109 110 if (size_ < consumed_size + next_frame_size) { 111 // Case B. We got the header but not the whole frame. 112 if (next_frame_size > capacity_) { 113 // The caller is expected to shut down the socket and give up at this 114 // point. If it doesn't do that and insists going on at some point it 115 // will hit the capacity check in BeginReceive(). 116 PERFETTO_DLOG("Frame too large (size %zu)", next_frame_size); 117 return false; 118 } 119 break; 120 } 121 122 // Case C. We got at least one header and whole frame. 123 DecodeFrame(rd_ptr, payload_size); 124 consumed_size += next_frame_size; 125 } 126 127 PERFETTO_DCHECK(consumed_size <= size_); 128 if (consumed_size > 0) { 129 // Shift out the consumed data from the buffer. In the typical case (C) 130 // there is nothing to shift really, just setting size_ = 0 is enough. 131 // Shifting is only for the (unlikely) case D. 132 size_ -= consumed_size; 133 if (size_ > 0) { 134 // Case D. We consumed some frames but there is a leftover at the end of 135 // the buffer. Shift out the consumed bytes, so that on the next round 136 // |buf_| starts with the header of the next unconsumed frame. 137 const char* move_begin = buf() + consumed_size; 138 PERFETTO_CHECK(move_begin > buf()); 139 PERFETTO_CHECK(move_begin + size_ <= buf() + capacity_); 140 memmove(buf(), move_begin, size_); 141 } 142 // If we just finished decoding a large frame that used more than one page, 143 // release the extra memory in the buffer. Large frames should be quite 144 // rare. 145 if (consumed_size > base::kPageSize) { 146 size_t size_rounded_up = (size_ / base::kPageSize + 1) * base::kPageSize; 147 if (size_rounded_up < capacity_) { 148 char* madvise_begin = buf() + size_rounded_up; 149 const size_t madvise_size = capacity_ - size_rounded_up; 150 PERFETTO_CHECK(madvise_begin > buf() + size_); 151 PERFETTO_CHECK(madvise_begin + madvise_size <= buf() + capacity_); 152 int res = madvise(madvise_begin, madvise_size, MADV_DONTNEED); 153 PERFETTO_DCHECK(res == 0); 154 } 155 } 156 } 157 // At this point |size_| == 0 for case C, > 0 for cases A, B, D. 158 return true; 159 } 160 161 std::unique_ptr<Frame> BufferedFrameDeserializer::PopNextFrame() { 162 if (decoded_frames_.empty()) 163 return nullptr; 164 std::unique_ptr<Frame> frame = std::move(decoded_frames_.front()); 165 decoded_frames_.pop_front(); 166 return frame; 167 } 168 169 void BufferedFrameDeserializer::DecodeFrame(const char* data, size_t size) { 170 if (size == 0) 171 return; 172 std::unique_ptr<Frame> frame(new Frame); 173 const int sz = static_cast<int>(size); 174 ::google::protobuf::io::ArrayInputStream stream(data, sz); 175 if (frame->ParseFromBoundedZeroCopyStream(&stream, sz)) 176 decoded_frames_.push_back(std::move(frame)); 177 } 178 179 // static 180 std::string BufferedFrameDeserializer::Serialize(const Frame& frame) { 181 std::string buf; 182 buf.reserve(1024); // Just an educated guess to avoid trivial expansions. 183 buf.insert(0, kHeaderSize, 0); // Reserve the space for the header. 184 frame.AppendToString(&buf); 185 const uint32_t payload_size = static_cast<uint32_t>(buf.size() - kHeaderSize); 186 PERFETTO_DCHECK(payload_size == static_cast<uint32_t>(frame.GetCachedSize())); 187 // Don't send messages larger than what the receiver can handle. 188 PERFETTO_DCHECK(kHeaderSize + payload_size <= kIPCBufferSize); 189 char header[kHeaderSize]; 190 memcpy(header, base::AssumeLittleEndian(&payload_size), kHeaderSize); 191 buf.replace(0, kHeaderSize, header, kHeaderSize); 192 return buf; 193 } 194 195 } // namespace ipc 196 } // namespace perfetto 197