Home | History | Annotate | Download | only in io
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/lib/io/buffered_inputstream.h"
     17 
     18 #include "tensorflow/core/lib/io/random_inputstream.h"
     19 
     20 namespace tensorflow {
     21 namespace io {
     22 
     23 BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream,
     24                                          size_t buffer_size,
     25                                          bool owns_input_stream)
     26     : input_stream_(input_stream),
     27       size_(buffer_size),
     28       owns_input_stream_(owns_input_stream) {
     29   buf_.reserve(size_);
     30 }
     31 
     32 BufferedInputStream::BufferedInputStream(RandomAccessFile* file,
     33                                          size_t buffer_size)
     34     : BufferedInputStream(new RandomAccessInputStream(file), buffer_size,
     35                           true) {}
     36 
     37 BufferedInputStream::~BufferedInputStream() {
     38   if (owns_input_stream_) {
     39     delete input_stream_;
     40   }
     41 }
     42 
     43 Status BufferedInputStream::FillBuffer() {
     44   if (!file_status_.ok()) {
     45     pos_ = 0;
     46     limit_ = 0;
     47     return file_status_;
     48   }
     49   Status s = input_stream_->ReadNBytes(size_, &buf_);
     50   pos_ = 0;
     51   limit_ = buf_.size();
     52   if (buf_.empty()) {
     53     DCHECK(!s.ok());
     54     file_status_ = s;
     55   }
     56   return s;
     57 }
     58 
     59 Status BufferedInputStream::ReadLineHelper(string* result, bool include_eol) {
     60   result->clear();
     61   Status s;
     62   while (true) {
     63     if (pos_ == limit_) {
     64       // Get more data into buffer
     65       s = FillBuffer();
     66       if (limit_ == 0) {
     67         break;
     68       }
     69     }
     70     char c = buf_[pos_++];
     71     if (c == '\n') {
     72       if (include_eol) {
     73         *result += c;
     74       }
     75       return Status::OK();
     76     }
     77     // We don't append '\r' to *result
     78     if (c != '\r') {
     79       *result += c;
     80     }
     81   }
     82   if (errors::IsOutOfRange(s) && !result->empty()) {
     83     return Status::OK();
     84   }
     85   return s;
     86 }
     87 
     88 Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
     89   if (bytes_to_read < 0) {
     90     return errors::InvalidArgument("Can't read a negative number of bytes: ",
     91                                    bytes_to_read);
     92   }
     93   result->clear();
     94   if (!file_status_.ok() && bytes_to_read > 0) {
     95     return file_status_;
     96   }
     97   result->reserve(bytes_to_read);
     98 
     99   Status s;
    100   while (result->size() < static_cast<size_t>(bytes_to_read)) {
    101     // Check whether the buffer is fully read or not.
    102     if (pos_ == limit_) {
    103       s = FillBuffer();
    104       // If we didn't read any bytes, we're at the end of the file; break out.
    105       if (limit_ == 0) {
    106         DCHECK(!s.ok());
    107         file_status_ = s;
    108         break;
    109       }
    110     }
    111     const int64 bytes_to_copy =
    112         std::min<int64>(limit_ - pos_, bytes_to_read - result->size());
    113     result->insert(result->size(), buf_, pos_, bytes_to_copy);
    114     pos_ += bytes_to_copy;
    115   }
    116   // Filling the buffer might lead to a situation when we go past the end of
    117   // the file leading to an OutOfRange() status return. But we might have
    118   // obtained enough data to satisfy the function call. Returning OK then.
    119   if (errors::IsOutOfRange(s) &&
    120       (result->size() == static_cast<size_t>(bytes_to_read))) {
    121     return Status::OK();
    122   }
    123   return s;
    124 }
    125 
    126 Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) {
    127   if (bytes_to_skip < 0) {
    128     return errors::InvalidArgument("Can only skip forward, not ",
    129                                    bytes_to_skip);
    130   }
    131   if (pos_ + bytes_to_skip < limit_) {
    132     // If we aren't skipping too much, then we can just move pos_;
    133     pos_ += bytes_to_skip;
    134   } else {
    135     // Otherwise, we already have read limit_ - pos_, so skip the rest. At this
    136     // point we need to get fresh data into the buffer, so reset pos_ and
    137     // limit_.
    138     Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_));
    139     pos_ = 0;
    140     limit_ = 0;
    141     if (errors::IsOutOfRange(s)) {
    142       file_status_ = s;
    143     }
    144     return s;
    145   }
    146   return Status::OK();
    147 }
    148 
    149 int64 BufferedInputStream::Tell() const {
    150   return input_stream_->Tell() - (limit_ - pos_);
    151 }
    152 
    153 Status BufferedInputStream::Seek(int64 position) {
    154   if (position < 0) {
    155     return errors::InvalidArgument("Seeking to a negative position: ",
    156                                    position);
    157   }
    158 
    159   // Position of the buffer within file.
    160   const int64 bufpos = Tell();
    161   if (position < bufpos) {
    162     // Reset input stream and skip 'position' bytes.
    163     TF_RETURN_IF_ERROR(Reset());
    164     return SkipNBytes(position);
    165   }
    166 
    167   return SkipNBytes(position - bufpos);
    168 }
    169 
    170 Status BufferedInputStream::ReadAll(string* result) {
    171   result->clear();
    172   Status status;
    173   while (status.ok()) {
    174     status = FillBuffer();
    175     if (limit_ == 0) {
    176       break;
    177     }
    178     result->append(buf_);
    179     pos_ = limit_;
    180   }
    181 
    182   if (errors::IsOutOfRange(status)) {
    183     file_status_ = status;
    184     return Status::OK();
    185   }
    186   return status;
    187 }
    188 
    189 Status BufferedInputStream::Reset() {
    190   TF_RETURN_IF_ERROR(input_stream_->Reset());
    191   pos_ = 0;
    192   limit_ = 0;
    193   file_status_ = Status::OK();
    194   return Status::OK();
    195 }
    196 
    197 Status BufferedInputStream::ReadLine(string* result) {
    198   return ReadLineHelper(result, false);
    199 }
    200 
    201 string BufferedInputStream::ReadLineAsString() {
    202   string result;
    203   ReadLineHelper(&result, true).IgnoreError();
    204   return result;
    205 }
    206 
    207 }  // namespace io
    208 }  // namespace tensorflow
    209