1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/record_reader.h" 17 18 #include <limits.h> 19 20 #include "tensorflow/core/lib/core/coding.h" 21 #include "tensorflow/core/lib/core/errors.h" 22 #include "tensorflow/core/lib/hash/crc32c.h" 23 #include "tensorflow/core/lib/io/buffered_inputstream.h" 24 #include "tensorflow/core/lib/io/compression.h" 25 #include "tensorflow/core/lib/io/random_inputstream.h" 26 #include "tensorflow/core/platform/env.h" 27 28 namespace tensorflow { 29 namespace io { 30 31 RecordReaderOptions RecordReaderOptions::CreateRecordReaderOptions( 32 const string& compression_type) { 33 RecordReaderOptions options; 34 if (compression_type == "ZLIB") { 35 options.compression_type = io::RecordReaderOptions::ZLIB_COMPRESSION; 36 #if defined(IS_SLIM_BUILD) 37 LOG(ERROR) << "Compression is not supported but compression_type is set." 38 << " No compression will be used."; 39 #else 40 options.zlib_options = io::ZlibCompressionOptions::DEFAULT(); 41 #endif // IS_SLIM_BUILD 42 } else if (compression_type == compression::kGzip) { 43 options.compression_type = io::RecordReaderOptions::ZLIB_COMPRESSION; 44 #if defined(IS_SLIM_BUILD) 45 LOG(ERROR) << "Compression is not supported but compression_type is set." 46 << " No compression will be used."; 47 #else 48 options.zlib_options = io::ZlibCompressionOptions::GZIP(); 49 #endif // IS_SLIM_BUILD 50 } else if (compression_type != compression::kNone) { 51 LOG(ERROR) << "Unsupported compression_type:" << compression_type 52 << ". No compression will be used."; 53 } 54 return options; 55 } 56 57 RecordReader::RecordReader(RandomAccessFile* file, 58 const RecordReaderOptions& options) 59 : src_(file), options_(options) { 60 if (options.buffer_size > 0) { 61 input_stream_.reset(new BufferedInputStream(file, options.buffer_size)); 62 } else { 63 input_stream_.reset(new RandomAccessInputStream(file)); 64 } 65 if (options.compression_type == RecordReaderOptions::ZLIB_COMPRESSION) { 66 // We don't have zlib available on all embedded platforms, so fail. 67 #if defined(IS_SLIM_BUILD) 68 LOG(FATAL) << "Zlib compression is unsupported on mobile platforms."; 69 #else // IS_SLIM_BUILD 70 zlib_input_stream_.reset(new ZlibInputStream( 71 input_stream_.get(), options.zlib_options.input_buffer_size, 72 options.zlib_options.output_buffer_size, options.zlib_options)); 73 #endif // IS_SLIM_BUILD 74 } else if (options.compression_type == RecordReaderOptions::NONE) { 75 // Nothing to do. 76 } else { 77 LOG(FATAL) << "Unspecified compression type :" << options.compression_type; 78 } 79 } 80 81 // Read n+4 bytes from file, verify that checksum of first n bytes is 82 // stored in the last 4 bytes and store the first n bytes in *result. 83 // May use *storage as backing store. 84 Status RecordReader::ReadChecksummed(uint64 offset, size_t n, 85 StringPiece* result, string* storage) { 86 if (n >= SIZE_MAX - sizeof(uint32)) { 87 return errors::DataLoss("record size too large"); 88 } 89 90 const size_t expected = n + sizeof(uint32); 91 storage->resize(expected); 92 93 #if !defined(IS_SLIM_BUILD) 94 if (zlib_input_stream_) { 95 // If we have a zlib compressed buffer, we assume that the 96 // file is being read sequentially, and we use the underlying 97 // implementation to read the data. 98 // 99 // No checks are done to validate that the file is being read 100 // sequentially. At some point the zlib input buffer may support 101 // seeking, possibly inefficiently. 102 TF_RETURN_IF_ERROR(zlib_input_stream_->ReadNBytes(expected, storage)); 103 104 if (storage->size() != expected) { 105 if (storage->empty()) { 106 return errors::OutOfRange("eof"); 107 } else { 108 return errors::DataLoss("truncated record at ", offset); 109 } 110 } 111 112 uint32 masked_crc = core::DecodeFixed32(storage->data() + n); 113 if (crc32c::Unmask(masked_crc) != crc32c::Value(storage->data(), n)) { 114 return errors::DataLoss("corrupted record at ", offset); 115 } 116 *result = StringPiece(storage->data(), n); 117 } else { 118 #endif // IS_SLIM_BUILD 119 if (options_.buffer_size > 0) { 120 // If we have a buffer, we assume that the file is being read 121 // sequentially, and we use the underlying implementation to read the 122 // data. 123 // 124 // No checks are done to validate that the file is being read 125 // sequentially. 126 TF_RETURN_IF_ERROR(input_stream_->ReadNBytes(expected, storage)); 127 128 if (storage->size() != expected) { 129 if (storage->empty()) { 130 return errors::OutOfRange("eof"); 131 } else { 132 return errors::DataLoss("truncated record at ", offset); 133 } 134 } 135 136 const uint32 masked_crc = core::DecodeFixed32(storage->data() + n); 137 if (crc32c::Unmask(masked_crc) != crc32c::Value(storage->data(), n)) { 138 return errors::DataLoss("corrupted record at ", offset); 139 } 140 *result = StringPiece(storage->data(), n); 141 } else { 142 // This version supports reading from arbitrary offsets 143 // since we are accessing the random access file directly. 144 StringPiece data; 145 TF_RETURN_IF_ERROR(src_->Read(offset, expected, &data, &(*storage)[0])); 146 if (data.size() != expected) { 147 if (data.empty()) { 148 return errors::OutOfRange("eof"); 149 } else { 150 return errors::DataLoss("truncated record at ", offset); 151 } 152 } 153 const uint32 masked_crc = core::DecodeFixed32(data.data() + n); 154 if (crc32c::Unmask(masked_crc) != crc32c::Value(data.data(), n)) { 155 return errors::DataLoss("corrupted record at ", offset); 156 } 157 *result = StringPiece(data.data(), n); 158 } 159 #if !defined(IS_SLIM_BUILD) 160 } 161 #endif // IS_SLIM_BUILD 162 163 return Status::OK(); 164 } 165 166 Status RecordReader::ReadRecord(uint64* offset, string* record) { 167 static const size_t kHeaderSize = sizeof(uint64) + sizeof(uint32); 168 static const size_t kFooterSize = sizeof(uint32); 169 170 // Read header data. 171 StringPiece lbuf; 172 Status s = ReadChecksummed(*offset, sizeof(uint64), &lbuf, record); 173 if (!s.ok()) { 174 return s; 175 } 176 const uint64 length = core::DecodeFixed64(lbuf.data()); 177 178 // Read data 179 StringPiece data; 180 s = ReadChecksummed(*offset + kHeaderSize, length, &data, record); 181 if (!s.ok()) { 182 if (errors::IsOutOfRange(s)) { 183 s = errors::DataLoss("truncated record at ", *offset); 184 } 185 return s; 186 } 187 188 if (record->data() != data.data()) { 189 // RandomAccessFile placed the data in some other location. 190 memmove(&(*record)[0], data.data(), data.size()); 191 } 192 193 record->resize(data.size()); 194 195 *offset += kHeaderSize + length + kFooterSize; 196 return Status::OK(); 197 } 198 199 Status RecordReader::SkipNBytes(uint64 offset) { 200 #if !defined(IS_SLIM_BUILD) 201 if (zlib_input_stream_) { 202 TF_RETURN_IF_ERROR(zlib_input_stream_->SkipNBytes(offset)); 203 } else { 204 #endif 205 if (options_.buffer_size > 0) { 206 TF_RETURN_IF_ERROR(input_stream_->SkipNBytes(offset)); 207 } 208 } 209 return Status::OK(); 210 } // namespace io 211 212 SequentialRecordReader::SequentialRecordReader( 213 RandomAccessFile* file, const RecordReaderOptions& options) 214 : underlying_(file, options), offset_(0) {} 215 216 } // namespace io 217 } // namespace tensorflow 218