1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/inputbuffer.h" 17 #include "tensorflow/core/lib/core/errors.h" 18 #include "tensorflow/core/platform/logging.h" 19 20 namespace tensorflow { 21 namespace io { 22 23 InputBuffer::InputBuffer(RandomAccessFile* file, size_t buffer_bytes) 24 : file_(file), 25 file_pos_(0), 26 size_(buffer_bytes), 27 buf_(new char[size_]), 28 pos_(buf_), 29 limit_(buf_) {} 30 31 InputBuffer::~InputBuffer() { delete[] buf_; } 32 33 Status InputBuffer::FillBuffer() { 34 StringPiece data; 35 Status s = file_->Read(file_pos_, size_, &data, buf_); 36 if (data.data() != buf_) { 37 memmove(buf_, data.data(), data.size()); 38 } 39 pos_ = buf_; 40 limit_ = pos_ + data.size(); 41 file_pos_ += data.size(); 42 return s; 43 } 44 45 Status InputBuffer::ReadLine(string* result) { 46 result->clear(); 47 Status s; 48 do { 49 size_t buf_remain = limit_ - pos_; 50 char* newline = static_cast<char*>(memchr(pos_, '\n', buf_remain)); 51 if (newline != nullptr) { 52 size_t result_len = newline - pos_; 53 result->append(pos_, result_len); 54 pos_ = newline + 1; 55 if (!result->empty() && result->back() == '\r') { 56 result->resize(result->size() - 1); 57 } 58 return Status::OK(); 59 } 60 if (buf_remain > 0) result->append(pos_, buf_remain); 61 // Get more data into buffer 62 s = FillBuffer(); 63 DCHECK_EQ(pos_, buf_); 64 } while (limit_ != buf_); 65 if (!result->empty() && result->back() == '\r') { 66 result->resize(result->size() - 1); 67 } 68 if (errors::IsOutOfRange(s) && !result->empty()) { 69 return Status::OK(); 70 } 71 return s; 72 } 73 74 Status InputBuffer::ReadNBytes(int64 bytes_to_read, string* result) { 75 result->clear(); 76 if (bytes_to_read < 0) { 77 return errors::InvalidArgument("Can't read a negative number of bytes: ", 78 bytes_to_read); 79 } 80 result->resize(bytes_to_read); 81 size_t bytes_read = 0; 82 Status status = ReadNBytes(bytes_to_read, &(*result)[0], &bytes_read); 83 if (bytes_read < bytes_to_read) result->resize(bytes_read); 84 return status; 85 } 86 87 Status InputBuffer::ReadNBytes(int64 bytes_to_read, char* result, 88 size_t* bytes_read) { 89 if (bytes_to_read < 0) { 90 return errors::InvalidArgument("Can't read a negative number of bytes: ", 91 bytes_to_read); 92 } 93 Status status; 94 *bytes_read = 0; 95 while (*bytes_read < static_cast<size_t>(bytes_to_read)) { 96 if (pos_ == limit_) { 97 // Get more data into buffer. 98 status = FillBuffer(); 99 if (limit_ == buf_) { 100 break; 101 } 102 } 103 // Do not go over the buffer boundary. 104 const int64 bytes_to_copy = 105 std::min<int64>(limit_ - pos_, bytes_to_read - *bytes_read); 106 // Copies buffered data into the destination. 107 memcpy(result + *bytes_read, pos_, bytes_to_copy); 108 pos_ += bytes_to_copy; 109 *bytes_read += bytes_to_copy; 110 } 111 if (errors::IsOutOfRange(status) && 112 (*bytes_read == static_cast<size_t>(bytes_to_read))) { 113 return Status::OK(); 114 } 115 return status; 116 } 117 118 Status InputBuffer::ReadVarint32Fallback(uint32* result) { 119 Status s = ReadVarintFallback(result, core::kMaxVarint32Bytes); 120 if (errors::IsDataLoss(s)) { 121 return errors::DataLoss("Stored data is too large to be a varint32."); 122 } 123 return s; 124 } 125 126 Status InputBuffer::ReadVarint64Fallback(uint64* result) { 127 Status s = ReadVarintFallback(result, core::kMaxVarint64Bytes); 128 if (errors::IsDataLoss(s)) { 129 return errors::DataLoss("Stored data is too large to be a varint64."); 130 } 131 return s; 132 } 133 134 template <typename T> 135 Status InputBuffer::ReadVarintFallback(T* result, int max_bytes) { 136 uint8 scratch = 0; 137 auto* p = reinterpret_cast<char*>(&scratch); 138 size_t unused_bytes_read = 0; 139 140 *result = 0; 141 for (int index = 0; index < max_bytes; index++) { 142 int shift = 7 * index; 143 TF_RETURN_IF_ERROR(ReadNBytes(1, p, &unused_bytes_read)); 144 *result |= (static_cast<T>(scratch) & 127) << shift; 145 if (!(scratch & 128)) return Status::OK(); 146 } 147 return errors::DataLoss("Stored data longer than ", max_bytes, " bytes."); 148 } 149 150 Status InputBuffer::SkipNBytes(int64 bytes_to_skip) { 151 if (bytes_to_skip < 0) { 152 return errors::InvalidArgument("Can only skip forward, not ", 153 bytes_to_skip); 154 } 155 int64 bytes_skipped = 0; 156 Status s; 157 while (bytes_skipped < bytes_to_skip) { 158 if (pos_ == limit_) { 159 // Get more data into buffer 160 s = FillBuffer(); 161 if (limit_ == buf_) { 162 break; 163 } 164 } 165 const int64 bytes_to_advance = 166 std::min<int64>(limit_ - pos_, bytes_to_skip - bytes_skipped); 167 bytes_skipped += bytes_to_advance; 168 pos_ += bytes_to_advance; 169 } 170 if (errors::IsOutOfRange(s) && bytes_skipped == bytes_to_skip) { 171 return Status::OK(); 172 } 173 return s; 174 } 175 176 Status InputBuffer::Seek(int64 position) { 177 if (position < 0) { 178 return errors::InvalidArgument("Seeking to a negative position: ", 179 position); 180 } 181 // Position of the buffer within file. 182 const int64 bufpos = file_pos_ - static_cast<int64>(limit_ - buf_); 183 if (position >= bufpos && position < file_pos_) { 184 // Seeks to somewhere inside the buffer. 185 pos_ = buf_ + (position - bufpos); 186 DCHECK(pos_ >= buf_ && pos_ < limit_); 187 } else { 188 // Seeks to somewhere outside. Discards the buffered data. 189 pos_ = limit_ = buf_; 190 file_pos_ = position; 191 } 192 return Status::OK(); 193 } 194 195 } // namespace io 196 } // namespace tensorflow 197