1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/buffered_inputstream.h" 17 18 #include "tensorflow/core/lib/io/random_inputstream.h" 19 20 namespace tensorflow { 21 namespace io { 22 23 BufferedInputStream::BufferedInputStream(InputStreamInterface* input_stream, 24 size_t buffer_size, 25 bool owns_input_stream) 26 : input_stream_(input_stream), 27 size_(buffer_size), 28 owns_input_stream_(owns_input_stream) { 29 buf_.reserve(size_); 30 } 31 32 BufferedInputStream::BufferedInputStream(RandomAccessFile* file, 33 size_t buffer_size) 34 : BufferedInputStream(new RandomAccessInputStream(file), buffer_size, 35 true) {} 36 37 BufferedInputStream::~BufferedInputStream() { 38 if (owns_input_stream_) { 39 delete input_stream_; 40 } 41 } 42 43 Status BufferedInputStream::FillBuffer() { 44 if (!file_status_.ok()) { 45 pos_ = 0; 46 limit_ = 0; 47 return file_status_; 48 } 49 Status s = input_stream_->ReadNBytes(size_, &buf_); 50 pos_ = 0; 51 limit_ = buf_.size(); 52 if (buf_.empty()) { 53 DCHECK(!s.ok()); 54 file_status_ = s; 55 } 56 return s; 57 } 58 59 Status BufferedInputStream::ReadLineHelper(string* result, bool include_eol) { 60 result->clear(); 61 Status s; 62 while (true) { 63 if (pos_ == limit_) { 64 // Get more data into buffer 65 s = FillBuffer(); 66 if (limit_ == 0) { 67 break; 68 } 69 } 70 char c = buf_[pos_++]; 71 if (c == '\n') { 72 if (include_eol) { 73 *result += c; 74 } 75 return Status::OK(); 76 } 77 // We don't append '\r' to *result 78 if (c != '\r') { 79 *result += c; 80 } 81 } 82 if (errors::IsOutOfRange(s) && !result->empty()) { 83 return Status::OK(); 84 } 85 return s; 86 } 87 88 Status BufferedInputStream::ReadNBytes(int64 bytes_to_read, string* result) { 89 if (bytes_to_read < 0) { 90 return errors::InvalidArgument("Can't read a negative number of bytes: ", 91 bytes_to_read); 92 } 93 result->clear(); 94 if (!file_status_.ok() && bytes_to_read > 0) { 95 return file_status_; 96 } 97 result->reserve(bytes_to_read); 98 99 Status s; 100 while (result->size() < static_cast<size_t>(bytes_to_read)) { 101 // Check whether the buffer is fully read or not. 102 if (pos_ == limit_) { 103 s = FillBuffer(); 104 // If we didn't read any bytes, we're at the end of the file; break out. 105 if (limit_ == 0) { 106 DCHECK(!s.ok()); 107 file_status_ = s; 108 break; 109 } 110 } 111 const int64 bytes_to_copy = 112 std::min<int64>(limit_ - pos_, bytes_to_read - result->size()); 113 result->insert(result->size(), buf_, pos_, bytes_to_copy); 114 pos_ += bytes_to_copy; 115 } 116 // Filling the buffer might lead to a situation when we go past the end of 117 // the file leading to an OutOfRange() status return. But we might have 118 // obtained enough data to satisfy the function call. Returning OK then. 119 if (errors::IsOutOfRange(s) && 120 (result->size() == static_cast<size_t>(bytes_to_read))) { 121 return Status::OK(); 122 } 123 return s; 124 } 125 126 Status BufferedInputStream::SkipNBytes(int64 bytes_to_skip) { 127 if (bytes_to_skip < 0) { 128 return errors::InvalidArgument("Can only skip forward, not ", 129 bytes_to_skip); 130 } 131 if (pos_ + bytes_to_skip < limit_) { 132 // If we aren't skipping too much, then we can just move pos_; 133 pos_ += bytes_to_skip; 134 } else { 135 // Otherwise, we already have read limit_ - pos_, so skip the rest. At this 136 // point we need to get fresh data into the buffer, so reset pos_ and 137 // limit_. 138 Status s = input_stream_->SkipNBytes(bytes_to_skip - (limit_ - pos_)); 139 pos_ = 0; 140 limit_ = 0; 141 if (errors::IsOutOfRange(s)) { 142 file_status_ = s; 143 } 144 return s; 145 } 146 return Status::OK(); 147 } 148 149 int64 BufferedInputStream::Tell() const { 150 return input_stream_->Tell() - (limit_ - pos_); 151 } 152 153 Status BufferedInputStream::Seek(int64 position) { 154 if (position < 0) { 155 return errors::InvalidArgument("Seeking to a negative position: ", 156 position); 157 } 158 159 // Position of the buffer within file. 160 const int64 bufpos = Tell(); 161 if (position < bufpos) { 162 // Reset input stream and skip 'position' bytes. 163 TF_RETURN_IF_ERROR(Reset()); 164 return SkipNBytes(position); 165 } 166 167 return SkipNBytes(position - bufpos); 168 } 169 170 Status BufferedInputStream::ReadAll(string* result) { 171 result->clear(); 172 Status status; 173 while (status.ok()) { 174 status = FillBuffer(); 175 if (limit_ == 0) { 176 break; 177 } 178 result->append(buf_); 179 pos_ = limit_; 180 } 181 182 if (errors::IsOutOfRange(status)) { 183 file_status_ = status; 184 return Status::OK(); 185 } 186 return status; 187 } 188 189 Status BufferedInputStream::Reset() { 190 TF_RETURN_IF_ERROR(input_stream_->Reset()); 191 pos_ = 0; 192 limit_ = 0; 193 file_status_ = Status::OK(); 194 return Status::OK(); 195 } 196 197 Status BufferedInputStream::ReadLine(string* result) { 198 return ReadLineHelper(result, false); 199 } 200 201 string BufferedInputStream::ReadLineAsString() { 202 string result; 203 ReadLineHelper(&result, true).IgnoreError(); 204 return result; 205 } 206 207 } // namespace io 208 } // namespace tensorflow 209