1 /* 2 * Copyright (C) 2016 The Android Open Source Project 3 * 4 * Licensed under the Apache License, Version 2.0 (the "License"); 5 * you may not use this file except in compliance with the License. 6 * You may obtain a copy of the License at 7 * 8 * http://www.apache.org/licenses/LICENSE-2.0 9 * 10 * Unless required by applicable law or agreed to in writing, software 11 * distributed under the License is distributed on an "AS IS" BASIS, 12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 * See the License for the specific language governing permissions and 14 * limitations under the License. 15 */ 16 #define DEBUG false 17 #include "Log.h" 18 19 #include "FdBuffer.h" 20 21 #include <log/log.h> 22 #include <utils/SystemClock.h> 23 24 #include <fcntl.h> 25 #include <poll.h> 26 #include <unistd.h> 27 #include <wait.h> 28 29 namespace android { 30 namespace os { 31 namespace incidentd { 32 33 const ssize_t BUFFER_SIZE = 16 * 1024; // 16 KB 34 const ssize_t MAX_BUFFER_COUNT = 6144; // 96 MB max 35 36 FdBuffer::FdBuffer() 37 :mBuffer(new EncodedBuffer(BUFFER_SIZE)), 38 mStartTime(-1), 39 mFinishTime(-1), 40 mTimedOut(false), 41 mTruncated(false) { 42 } 43 44 FdBuffer::~FdBuffer() { 45 } 46 47 status_t FdBuffer::read(int fd, int64_t timeout) { 48 struct pollfd pfds = {.fd = fd, .events = POLLIN}; 49 mStartTime = uptimeMillis(); 50 51 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 52 53 while (true) { 54 if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 55 mTruncated = true; 56 VLOG("Truncating data"); 57 break; 58 } 59 if (mBuffer->writeBuffer() == NULL) { 60 VLOG("No memory"); 61 return NO_MEMORY; 62 } 63 64 int64_t remainingTime = (mStartTime + timeout) - uptimeMillis(); 65 if (remainingTime <= 0) { 66 VLOG("timed out due to long read"); 67 mTimedOut = true; 68 break; 69 } 70 71 int count = TEMP_FAILURE_RETRY(poll(&pfds, 1, remainingTime)); 72 if (count == 0) { 73 VLOG("timed out due to block calling poll"); 74 mTimedOut = true; 75 break; 76 } else if (count < 0) { 77 VLOG("poll failed: %s", strerror(errno)); 78 return -errno; 79 } else { 80 if ((pfds.revents & POLLERR) != 0) { 81 VLOG("return event has error %s", strerror(errno)); 82 return errno != 0 ? -errno : UNKNOWN_ERROR; 83 } else { 84 ssize_t amt = TEMP_FAILURE_RETRY( 85 ::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite())); 86 if (amt < 0) { 87 if (errno == EAGAIN || errno == EWOULDBLOCK) { 88 continue; 89 } else { 90 VLOG("Fail to read %d: %s", fd, strerror(errno)); 91 return -errno; 92 } 93 } else if (amt == 0) { 94 VLOG("Reached EOF of fd=%d", fd); 95 break; 96 } 97 mBuffer->wp()->move(amt); 98 } 99 } 100 } 101 mFinishTime = uptimeMillis(); 102 return NO_ERROR; 103 } 104 105 status_t FdBuffer::readFully(int fd) { 106 mStartTime = uptimeMillis(); 107 108 while (true) { 109 if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 110 // Don't let it get too big. 111 mTruncated = true; 112 VLOG("Truncating data"); 113 break; 114 } 115 if (mBuffer->writeBuffer() == NULL) { 116 VLOG("No memory"); 117 return NO_MEMORY; 118 } 119 120 ssize_t amt = 121 TEMP_FAILURE_RETRY(::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite())); 122 if (amt < 0) { 123 VLOG("Fail to read %d: %s", fd, strerror(errno)); 124 return -errno; 125 } else if (amt == 0) { 126 VLOG("Done reading %zu bytes", mBuffer->size()); 127 // We're done. 128 break; 129 } 130 mBuffer->wp()->move(amt); 131 } 132 133 mFinishTime = uptimeMillis(); 134 return NO_ERROR; 135 } 136 137 status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd, 138 int64_t timeoutMs, const bool isSysfs) { 139 struct pollfd pfds[] = { 140 {.fd = fd, .events = POLLIN}, 141 {.fd = toFd.get(), .events = POLLOUT}, 142 {.fd = fromFd.get(), .events = POLLIN}, 143 }; 144 145 mStartTime = uptimeMillis(); 146 147 // mark all fds non blocking 148 fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK); 149 fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK); 150 fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK); 151 152 // A circular buffer holds data read from fd and writes to parsing process 153 uint8_t cirBuf[BUFFER_SIZE]; 154 size_t cirSize = 0; 155 int rpos = 0, wpos = 0; 156 157 // This is the buffer used to store processed data 158 while (true) { 159 if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) { 160 VLOG("Truncating data"); 161 mTruncated = true; 162 break; 163 } 164 if (mBuffer->writeBuffer() == NULL) { 165 VLOG("No memory"); 166 return NO_MEMORY; 167 } 168 169 int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis(); 170 if (remainingTime <= 0) { 171 VLOG("timed out due to long read"); 172 mTimedOut = true; 173 break; 174 } 175 176 // wait for any pfds to be ready to perform IO 177 int count = TEMP_FAILURE_RETRY(poll(pfds, 3, remainingTime)); 178 if (count == 0) { 179 VLOG("timed out due to block calling poll"); 180 mTimedOut = true; 181 break; 182 } else if (count < 0) { 183 VLOG("Fail to poll: %s", strerror(errno)); 184 return -errno; 185 } 186 187 // make sure no errors occur on any fds 188 for (int i = 0; i < 3; ++i) { 189 if ((pfds[i].revents & POLLERR) != 0) { 190 if (i == 0 && isSysfs) { 191 VLOG("fd %d is sysfs, ignore its POLLERR return value", fd); 192 continue; 193 } 194 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno)); 195 return errno != 0 ? -errno : UNKNOWN_ERROR; 196 } 197 } 198 199 // read from fd 200 if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) { 201 ssize_t amt; 202 if (rpos >= wpos) { 203 amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos)); 204 } else { 205 amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, wpos - rpos)); 206 } 207 if (amt < 0) { 208 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 209 VLOG("Fail to read fd %d: %s", fd, strerror(errno)); 210 return -errno; 211 } // otherwise just continue 212 } else if (amt == 0) { 213 VLOG("Reached EOF of input file %d", fd); 214 pfds[0].fd = -1; // reach EOF so don't have to poll pfds[0]. 215 } else { 216 rpos += amt; 217 cirSize += amt; 218 } 219 } 220 221 // write to parsing process 222 if (cirSize > 0 && pfds[1].fd != -1) { 223 ssize_t amt; 224 if (rpos > wpos) { 225 amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, rpos - wpos)); 226 } else { 227 amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos)); 228 } 229 if (amt < 0) { 230 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 231 VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno)); 232 return -errno; 233 } // otherwise just continue 234 } else { 235 wpos += amt; 236 cirSize -= amt; 237 } 238 } 239 240 // if buffer is empty and fd is closed, close write fd. 241 if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) { 242 VLOG("Close write pipe %d", toFd.get()); 243 toFd.reset(); 244 pfds[1].fd = -1; 245 } 246 247 // circular buffer, reset rpos and wpos 248 if (rpos >= BUFFER_SIZE) { 249 rpos = 0; 250 } 251 if (wpos >= BUFFER_SIZE) { 252 wpos = 0; 253 } 254 255 // read from parsing process 256 ssize_t amt = TEMP_FAILURE_RETRY( 257 ::read(fromFd.get(), mBuffer->writeBuffer(), mBuffer->currentToWrite())); 258 if (amt < 0) { 259 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { 260 VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno)); 261 return -errno; 262 } // otherwise just continue 263 } else if (amt == 0) { 264 VLOG("Reached EOF of fromFd %d", fromFd.get()); 265 break; 266 } else { 267 mBuffer->wp()->move(amt); 268 } 269 } 270 271 mFinishTime = uptimeMillis(); 272 return NO_ERROR; 273 } 274 275 status_t FdBuffer::write(uint8_t const* buf, size_t size) { 276 return mBuffer->writeRaw(buf, size); 277 } 278 279 status_t FdBuffer::write(const sp<ProtoReader>& reader) { 280 return mBuffer->writeRaw(reader); 281 } 282 283 status_t FdBuffer::write(const sp<ProtoReader>& reader, size_t size) { 284 return mBuffer->writeRaw(reader, size); 285 } 286 287 size_t FdBuffer::size() const { 288 return mBuffer->size(); 289 } 290 291 sp<EncodedBuffer> FdBuffer::data() const { 292 return mBuffer; 293 } 294 295 } // namespace incidentd 296 } // namespace os 297 } // namespace android 298