Home | History | Annotate | Download | only in src
      1 /*
      2  * Copyright (C) 2016 The Android Open Source Project
      3  *
      4  * Licensed under the Apache License, Version 2.0 (the "License");
      5  * you may not use this file except in compliance with the License.
      6  * You may obtain a copy of the License at
      7  *
      8  *      http://www.apache.org/licenses/LICENSE-2.0
      9  *
     10  * Unless required by applicable law or agreed to in writing, software
     11  * distributed under the License is distributed on an "AS IS" BASIS,
     12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13  * See the License for the specific language governing permissions and
     14  * limitations under the License.
     15  */
     16 #define DEBUG false
#include "Log.h"

#include "FdBuffer.h"

#include <log/log.h>
#include <utils/SystemClock.h>

#include <fcntl.h>
#include <limits.h>
#include <poll.h>
#include <unistd.h>
#include <wait.h>
     28 
     29 namespace android {
     30 namespace os {
     31 namespace incidentd {
     32 
     33 const ssize_t BUFFER_SIZE = 16 * 1024;  // 16 KB
     34 const ssize_t MAX_BUFFER_COUNT = 6144;   // 96 MB max
     35 
     36 FdBuffer::FdBuffer()
     37         :mBuffer(new EncodedBuffer(BUFFER_SIZE)),
     38          mStartTime(-1),
     39          mFinishTime(-1),
     40          mTimedOut(false),
     41          mTruncated(false) {
     42 }
     43 
     44 FdBuffer::~FdBuffer() {
     45 }
     46 
     47 status_t FdBuffer::read(int fd, int64_t timeout) {
     48     struct pollfd pfds = {.fd = fd, .events = POLLIN};
     49     mStartTime = uptimeMillis();
     50 
     51     fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
     52 
     53     while (true) {
     54         if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
     55             mTruncated = true;
     56             VLOG("Truncating data");
     57             break;
     58         }
     59         if (mBuffer->writeBuffer() == NULL) {
     60             VLOG("No memory");
     61             return NO_MEMORY;
     62         }
     63 
     64         int64_t remainingTime = (mStartTime + timeout) - uptimeMillis();
     65         if (remainingTime <= 0) {
     66             VLOG("timed out due to long read");
     67             mTimedOut = true;
     68             break;
     69         }
     70 
     71         int count = TEMP_FAILURE_RETRY(poll(&pfds, 1, remainingTime));
     72         if (count == 0) {
     73             VLOG("timed out due to block calling poll");
     74             mTimedOut = true;
     75             break;
     76         } else if (count < 0) {
     77             VLOG("poll failed: %s", strerror(errno));
     78             return -errno;
     79         } else {
     80             if ((pfds.revents & POLLERR) != 0) {
     81                 VLOG("return event has error %s", strerror(errno));
     82                 return errno != 0 ? -errno : UNKNOWN_ERROR;
     83             } else {
     84                 ssize_t amt = TEMP_FAILURE_RETRY(
     85                         ::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite()));
     86                 if (amt < 0) {
     87                     if (errno == EAGAIN || errno == EWOULDBLOCK) {
     88                         continue;
     89                     } else {
     90                         VLOG("Fail to read %d: %s", fd, strerror(errno));
     91                         return -errno;
     92                     }
     93                 } else if (amt == 0) {
     94                     VLOG("Reached EOF of fd=%d", fd);
     95                     break;
     96                 }
     97                 mBuffer->wp()->move(amt);
     98             }
     99         }
    100     }
    101     mFinishTime = uptimeMillis();
    102     return NO_ERROR;
    103 }
    104 
    105 status_t FdBuffer::readFully(int fd) {
    106     mStartTime = uptimeMillis();
    107 
    108     while (true) {
    109         if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
    110             // Don't let it get too big.
    111             mTruncated = true;
    112             VLOG("Truncating data");
    113             break;
    114         }
    115         if (mBuffer->writeBuffer() == NULL) {
    116             VLOG("No memory");
    117             return NO_MEMORY;
    118         }
    119 
    120         ssize_t amt =
    121                 TEMP_FAILURE_RETRY(::read(fd, mBuffer->writeBuffer(), mBuffer->currentToWrite()));
    122         if (amt < 0) {
    123             VLOG("Fail to read %d: %s", fd, strerror(errno));
    124             return -errno;
    125         } else if (amt == 0) {
    126             VLOG("Done reading %zu bytes", mBuffer->size());
    127             // We're done.
    128             break;
    129         }
    130         mBuffer->wp()->move(amt);
    131     }
    132 
    133     mFinishTime = uptimeMillis();
    134     return NO_ERROR;
    135 }
    136 
    137 status_t FdBuffer::readProcessedDataInStream(int fd, unique_fd toFd, unique_fd fromFd,
    138                                              int64_t timeoutMs, const bool isSysfs) {
    139     struct pollfd pfds[] = {
    140             {.fd = fd, .events = POLLIN},
    141             {.fd = toFd.get(), .events = POLLOUT},
    142             {.fd = fromFd.get(), .events = POLLIN},
    143     };
    144 
    145     mStartTime = uptimeMillis();
    146 
    147     // mark all fds non blocking
    148     fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
    149     fcntl(toFd.get(), F_SETFL, fcntl(toFd.get(), F_GETFL, 0) | O_NONBLOCK);
    150     fcntl(fromFd.get(), F_SETFL, fcntl(fromFd.get(), F_GETFL, 0) | O_NONBLOCK);
    151 
    152     // A circular buffer holds data read from fd and writes to parsing process
    153     uint8_t cirBuf[BUFFER_SIZE];
    154     size_t cirSize = 0;
    155     int rpos = 0, wpos = 0;
    156 
    157     // This is the buffer used to store processed data
    158     while (true) {
    159         if (mBuffer->size() >= MAX_BUFFER_COUNT * BUFFER_SIZE) {
    160             VLOG("Truncating data");
    161             mTruncated = true;
    162             break;
    163         }
    164         if (mBuffer->writeBuffer() == NULL) {
    165             VLOG("No memory");
    166             return NO_MEMORY;
    167         }
    168 
    169         int64_t remainingTime = (mStartTime + timeoutMs) - uptimeMillis();
    170         if (remainingTime <= 0) {
    171             VLOG("timed out due to long read");
    172             mTimedOut = true;
    173             break;
    174         }
    175 
    176         // wait for any pfds to be ready to perform IO
    177         int count = TEMP_FAILURE_RETRY(poll(pfds, 3, remainingTime));
    178         if (count == 0) {
    179             VLOG("timed out due to block calling poll");
    180             mTimedOut = true;
    181             break;
    182         } else if (count < 0) {
    183             VLOG("Fail to poll: %s", strerror(errno));
    184             return -errno;
    185         }
    186 
    187         // make sure no errors occur on any fds
    188         for (int i = 0; i < 3; ++i) {
    189             if ((pfds[i].revents & POLLERR) != 0) {
    190                 if (i == 0 && isSysfs) {
    191                     VLOG("fd %d is sysfs, ignore its POLLERR return value", fd);
    192                     continue;
    193                 }
    194                 VLOG("fd[%d]=%d returns error events: %s", i, fd, strerror(errno));
    195                 return errno != 0 ? -errno : UNKNOWN_ERROR;
    196             }
    197         }
    198 
    199         // read from fd
    200         if (cirSize != BUFFER_SIZE && pfds[0].fd != -1) {
    201             ssize_t amt;
    202             if (rpos >= wpos) {
    203                 amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, BUFFER_SIZE - rpos));
    204             } else {
    205                 amt = TEMP_FAILURE_RETRY(::read(fd, cirBuf + rpos, wpos - rpos));
    206             }
    207             if (amt < 0) {
    208                 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
    209                     VLOG("Fail to read fd %d: %s", fd, strerror(errno));
    210                     return -errno;
    211                 }  // otherwise just continue
    212             } else if (amt == 0) {
    213                 VLOG("Reached EOF of input file %d", fd);
    214                 pfds[0].fd = -1;  // reach EOF so don't have to poll pfds[0].
    215             } else {
    216                 rpos += amt;
    217                 cirSize += amt;
    218             }
    219         }
    220 
    221         // write to parsing process
    222         if (cirSize > 0 && pfds[1].fd != -1) {
    223             ssize_t amt;
    224             if (rpos > wpos) {
    225                 amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, rpos - wpos));
    226             } else {
    227                 amt = TEMP_FAILURE_RETRY(::write(toFd.get(), cirBuf + wpos, BUFFER_SIZE - wpos));
    228             }
    229             if (amt < 0) {
    230                 if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
    231                     VLOG("Fail to write toFd %d: %s", toFd.get(), strerror(errno));
    232                     return -errno;
    233                 }  // otherwise just continue
    234             } else {
    235                 wpos += amt;
    236                 cirSize -= amt;
    237             }
    238         }
    239 
    240         // if buffer is empty and fd is closed, close write fd.
    241         if (cirSize == 0 && pfds[0].fd == -1 && pfds[1].fd != -1) {
    242             VLOG("Close write pipe %d", toFd.get());
    243             toFd.reset();
    244             pfds[1].fd = -1;
    245         }
    246 
    247         // circular buffer, reset rpos and wpos
    248         if (rpos >= BUFFER_SIZE) {
    249             rpos = 0;
    250         }
    251         if (wpos >= BUFFER_SIZE) {
    252             wpos = 0;
    253         }
    254 
    255         // read from parsing process
    256         ssize_t amt = TEMP_FAILURE_RETRY(
    257                 ::read(fromFd.get(), mBuffer->writeBuffer(), mBuffer->currentToWrite()));
    258         if (amt < 0) {
    259             if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
    260                 VLOG("Fail to read fromFd %d: %s", fromFd.get(), strerror(errno));
    261                 return -errno;
    262             }  // otherwise just continue
    263         } else if (amt == 0) {
    264             VLOG("Reached EOF of fromFd %d", fromFd.get());
    265             break;
    266         } else {
    267             mBuffer->wp()->move(amt);
    268         }
    269     }
    270 
    271     mFinishTime = uptimeMillis();
    272     return NO_ERROR;
    273 }
    274 
    275 status_t FdBuffer::write(uint8_t const* buf, size_t size) {
    276     return mBuffer->writeRaw(buf, size);
    277 }
    278 
    279 status_t FdBuffer::write(const sp<ProtoReader>& reader) {
    280     return mBuffer->writeRaw(reader);
    281 }
    282 
    283 status_t FdBuffer::write(const sp<ProtoReader>& reader, size_t size) {
    284     return mBuffer->writeRaw(reader, size);
    285 }
    286 
    287 size_t FdBuffer::size() const {
    288     return mBuffer->size();
    289 }
    290 
    291 sp<EncodedBuffer> FdBuffer::data() const {
    292     return mBuffer;
    293 }
    294 
    295 }  // namespace incidentd
    296 }  // namespace os
    297 }  // namespace android
    298