Home | History | Annotate | Download | only in io
      1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/lib/io/inputbuffer.h"
     17 #include "tensorflow/core/lib/core/errors.h"
     18 #include "tensorflow/core/platform/logging.h"
     19 
     20 namespace tensorflow {
     21 namespace io {
     22 
     23 InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes)
     24     : file_(file),
     25       file_pos_(0),
     26       size_(buffer_bytes),
     27       buf_(new char[size_]),
     28       pos_(buf_),
     29       limit_(buf_) {}
     30 
     31 InputBuffer::~InputBuffer() { delete[] buf_; }
     32 
     33 Status InputBuffer::FillBuffer() {
     34   StringPiece data;
     35   Status s = file_->Read(file_pos_, size_, &data, buf_);
     36   if (data.data() != buf_) {
     37     memmove(buf_, data.data(), data.size());
     38   }
     39   pos_ = buf_;
     40   limit_ = pos_ + data.size();
     41   file_pos_ += data.size();
     42   return s;
     43 }
     44 
     45 Status InputBuffer::ReadLine(string* result) {
     46   result->clear();
     47   Status s;
     48   do {
     49     size_t buf_remain = limit_ - pos_;
     50     char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain));
     51     if (newline != nullptr) {
     52       size_t result_len = newline - pos_;
     53       result->append(pos_, result_len);
     54       pos_ = newline + 1;
     55       if (!result->empty() && result->back() == '\r') {
     56         result->resize(result->size() - 1);
     57       }
     58       return Status::OK();
     59     }
     60     if (buf_remain > 0) result->append(pos_, buf_remain);
     61     // Get more data into buffer
     62     s = FillBuffer();
     63     DCHECK_EQ(pos_, buf_);
     64   } while (limit_ != buf_);
     65   if (!result->empty() && result->back() == '\r') {
     66     result->resize(result->size() - 1);
     67   }
     68   if (errors::IsOutOfRange(s) && !result->empty()) {
     69     return Status::OK();
     70   }
     71   return s;
     72 }
     73 
     74 Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) {
     75   result->clear();
     76   if (bytes_to_read < 0) {
     77     return errors::InvalidArgument("Can't read a negative number of bytes: ",
     78                                    bytes_to_read);
     79   }
     80   result->resize(bytes_to_read);
     81   size_t bytes_read = 0;
     82   Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read);
     83   if (bytes_read < bytes_to_read) result->resize(bytes_read);
     84   return status;
     85 }
     86 
     87 Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result,
     88                                size_t* bytes_read) {
     89   if (bytes_to_read < 0) {
     90     return errors::InvalidArgument("Can't read a negative number of bytes: ",
     91                                    bytes_to_read);
     92   }
     93   Status status;
     94   *bytes_read = 0;
     95   while (*bytes_read < static_cast<size_t>(bytes_to_read)) {
     96     if (pos_ == limit_) {
     97       // Get more data into buffer.
     98       status = FillBuffer();
     99       if (limit_ == buf_) {
    100         break;
    101       }
    102     }
    103     // Do not go over the buffer boundary.
    104     const int64 bytes_to_copy =
    105         std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read);
    106     // Copies buffered data into the destination.
    107     memcpy(result + *bytes_read, pos_, bytes_to_copy);
    108     pos_ += bytes_to_copy;
    109     *bytes_read += bytes_to_copy;
    110   }
    111   if (errors::IsOutOfRange(status) &&
    112       (*bytes_read == static_cast<size_t>(bytes_to_read))) {
    113     return Status::OK();
    114   }
    115   return status;
    116 }
    117 
    118 Status InputBuffer::ReadVarint32Fallback(uint32* result) {
    119   Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes);
    120   if (errors::IsDataLoss(s)) {
    121     return errors::DataLoss("Stored data is too large to be a varint32.");
    122   }
    123   return s;
    124 }
    125 
    126 Status InputBuffer::ReadVarint64Fallback(uint64* result) {
    127   Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes);
    128   if (errors::IsDataLoss(s)) {
    129     return errors::DataLoss("Stored data is too large to be a varint64.");
    130   }
    131   return s;
    132 }
    133 
    134 template <typename T>
    135 Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) {
    136   uint8 scratch = 0;
    137   auto* p = reinterpret_cast<char*>(&scratch);
    138   size_t unused_bytes_read = 0;
    139 
    140   *result = 0;
    141   for (int index = 0; index < max_bytes; index++) {
    142     int shift = 7 * index;
    143     TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read));
    144     *result |= (static_cast<T>(scratch) & 127) << shift;
    145     if (!(scratch & 128)) return Status::OK();
    146   }
    147   return errors::DataLoss("Stored data longer than ", max_bytes, " bytes.");
    148 }
    149 
    150 Status InputBuffer::SkipNBytes(int64 bytes_to_skip) {
    151   if (bytes_to_skip < 0) {
    152     return errors::InvalidArgument("Can only skip forward, not ",
    153                                    bytes_to_skip);
    154   }
    155   int64 bytes_skipped = 0;
    156   Status s;
    157   while (bytes_skipped < bytes_to_skip) {
    158     if (pos_ == limit_) {
    159       // Get more data into buffer
    160       s = FillBuffer();
    161       if (limit_ == buf_) {
    162         break;
    163       }
    164     }
    165     const int64 bytes_to_advance =
    166         std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped);
    167     bytes_skipped += bytes_to_advance;
    168     pos_ += bytes_to_advance;
    169   }
    170   if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) {
    171     return Status::OK();
    172   }
    173   return s;
    174 }
    175 
    176 Status InputBuffer::Seek(int64 position) {
    177   if (position < 0) {
    178     return errors::InvalidArgument("Seeking to a negative position: ",
    179                                    position);
    180   }
    181   // Position of the buffer within file.
    182   const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_);
    183   if (position >= bufpos && position < file_pos_) {
    184     // Seeks to somewhere inside the buffer.
    185     pos_ = buf_ + (position - bufpos);
    186     DCHECK(pos_ >= buf_ && pos_ < limit_);
    187   } else {
    188     // Seeks to somewhere outside.  Discards the buffered data.
    189     pos_ = limit_ = buf_;
    190     file_pos_ = position;
    191   }
    192   return Status::OK();
    193 }
    194 
    195 }  // namespace io
    196 }  // namespace tensorflow
    197