Home | History | Annotate | Download | only in io
      1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/lib/io/record_reader.h"
     17 
     18 #include <limits.h>
     19 
     20 #include "tensorflow/core/lib/core/coding.h"
     21 #include "tensorflow/core/lib/core/errors.h"
     22 #include "tensorflow/core/lib/hash/crc32c.h"
     23 #include "tensorflow/core/lib/io/buffered_inputstream.h"
     24 #include "tensorflow/core/lib/io/compression.h"
     25 #include "tensorflow/core/lib/io/random_inputstream.h"
     26 #include "tensorflow/core/platform/env.h"
     27 
     28 namespace tensorflow {
     29 namespace io {
     30 
     31 RecordReaderOptions RecordReaderOptions::CreateRecordReaderOptions(
     32     const string& compression_type) {
     33   RecordReaderOptions options;
     34   if (compression_type == "ZLIB") {
     35     options.compression_type = io::RecordReaderOptions::ZLIB_COMPRESSION;
     36 #if defined(IS_SLIM_BUILD)
     37     LOG(ERROR) << "Compression is not supported but compression_type is set."
     38                << " No compression will be used.";
     39 #else
     40     options.zlib_options = io::ZlibCompressionOptions::DEFAULT();
     41 #endif  // IS_SLIM_BUILD
     42   } else if (compression_type == compression::kGzip) {
     43     options.compression_type = io::RecordReaderOptions::ZLIB_COMPRESSION;
     44 #if defined(IS_SLIM_BUILD)
     45     LOG(ERROR) << "Compression is not supported but compression_type is set."
     46                << " No compression will be used.";
     47 #else
     48     options.zlib_options = io::ZlibCompressionOptions::GZIP();
     49 #endif  // IS_SLIM_BUILD
     50   } else if (compression_type != compression::kNone) {
     51     LOG(ERROR) << "Unsupported compression_type:" << compression_type
     52                << ". No compression will be used.";
     53   }
     54   return options;
     55 }
     56 
     57 RecordReader::RecordReader(RandomAccessFile* file,
     58                            const RecordReaderOptions& options)
     59     : src_(file), options_(options) {
     60   if (options.buffer_size > 0) {
     61     input_stream_.reset(new BufferedInputStream(file, options.buffer_size));
     62   } else {
     63     input_stream_.reset(new RandomAccessInputStream(file));
     64   }
     65   if (options.compression_type == RecordReaderOptions::ZLIB_COMPRESSION) {
     66 // We don't have zlib available on all embedded platforms, so fail.
     67 #if defined(IS_SLIM_BUILD)
     68     LOG(FATAL) << "Zlib compression is unsupported on mobile platforms.";
     69 #else   // IS_SLIM_BUILD
     70     zlib_input_stream_.reset(new ZlibInputStream(
     71         input_stream_.get(), options.zlib_options.input_buffer_size,
     72         options.zlib_options.output_buffer_size, options.zlib_options));
     73 #endif  // IS_SLIM_BUILD
     74   } else if (options.compression_type == RecordReaderOptions::NONE) {
     75     // Nothing to do.
     76   } else {
     77     LOG(FATAL) << "Unspecified compression type :" << options.compression_type;
     78   }
     79 }
     80 
     81 // Read n+4 bytes from file, verify that checksum of first n bytes is
     82 // stored in the last 4 bytes and store the first n bytes in *result.
     83 // May use *storage as backing store.
     84 Status RecordReader::ReadChecksummed(uint64 offset, size_t n,
     85                                      StringPiece* result, string* storage) {
     86   if (n >= SIZE_MAX - sizeof(uint32)) {
     87     return errors::DataLoss("record size too large");
     88   }
     89 
     90   const size_t expected = n + sizeof(uint32);
     91   storage->resize(expected);
     92 
     93 #if !defined(IS_SLIM_BUILD)
     94   if (zlib_input_stream_) {
     95     // If we have a zlib compressed buffer, we assume that the
     96     // file is being read sequentially, and we use the underlying
     97     // implementation to read the data.
     98     //
     99     // No checks are done to validate that the file is being read
    100     // sequentially.  At some point the zlib input buffer may support
    101     // seeking, possibly inefficiently.
    102     TF_RETURN_IF_ERROR(zlib_input_stream_->ReadNBytes(expected, storage));
    103 
    104     if (storage->size() != expected) {
    105       if (storage->empty()) {
    106         return errors::OutOfRange("eof");
    107       } else {
    108         return errors::DataLoss("truncated record at ", offset);
    109       }
    110     }
    111 
    112     uint32 masked_crc = core::DecodeFixed32(storage->data() + n);
    113     if (crc32c::Unmask(masked_crc) != crc32c::Value(storage->data(), n)) {
    114       return errors::DataLoss("corrupted record at ", offset);
    115     }
    116     *result = StringPiece(storage->data(), n);
    117   } else {
    118 #endif  // IS_SLIM_BUILD
    119     if (options_.buffer_size > 0) {
    120       // If we have a buffer, we assume that the file is being read
    121       // sequentially, and we use the underlying implementation to read the
    122       // data.
    123       //
    124       // No checks are done to validate that the file is being read
    125       // sequentially.
    126       TF_RETURN_IF_ERROR(input_stream_->ReadNBytes(expected, storage));
    127 
    128       if (storage->size() != expected) {
    129         if (storage->empty()) {
    130           return errors::OutOfRange("eof");
    131         } else {
    132           return errors::DataLoss("truncated record at ", offset);
    133         }
    134       }
    135 
    136       const uint32 masked_crc = core::DecodeFixed32(storage->data() + n);
    137       if (crc32c::Unmask(masked_crc) != crc32c::Value(storage->data(), n)) {
    138         return errors::DataLoss("corrupted record at ", offset);
    139       }
    140       *result = StringPiece(storage->data(), n);
    141     } else {
    142       // This version supports reading from arbitrary offsets
    143       // since we are accessing the random access file directly.
    144       StringPiece data;
    145       TF_RETURN_IF_ERROR(src_->Read(offset, expected, &data, &(*storage)[0]));
    146       if (data.size() != expected) {
    147         if (data.empty()) {
    148           return errors::OutOfRange("eof");
    149         } else {
    150           return errors::DataLoss("truncated record at ", offset);
    151         }
    152       }
    153       const uint32 masked_crc = core::DecodeFixed32(data.data() + n);
    154       if (crc32c::Unmask(masked_crc) != crc32c::Value(data.data(), n)) {
    155         return errors::DataLoss("corrupted record at ", offset);
    156       }
    157       *result = StringPiece(data.data(), n);
    158     }
    159 #if !defined(IS_SLIM_BUILD)
    160   }
    161 #endif  // IS_SLIM_BUILD
    162 
    163   return Status::OK();
    164 }
    165 
    166 Status RecordReader::ReadRecord(uint64* offset, string* record) {
    167   static const size_t kHeaderSize = sizeof(uint64) + sizeof(uint32);
    168   static const size_t kFooterSize = sizeof(uint32);
    169 
    170   // Read header data.
    171   StringPiece lbuf;
    172   Status s = ReadChecksummed(*offset, sizeof(uint64), &lbuf, record);
    173   if (!s.ok()) {
    174     return s;
    175   }
    176   const uint64 length = core::DecodeFixed64(lbuf.data());
    177 
    178   // Read data
    179   StringPiece data;
    180   s = ReadChecksummed(*offset + kHeaderSize, length, &data, record);
    181   if (!s.ok()) {
    182     if (errors::IsOutOfRange(s)) {
    183       s = errors::DataLoss("truncated record at ", *offset);
    184     }
    185     return s;
    186   }
    187 
    188   if (record->data() != data.data()) {
    189     // RandomAccessFile placed the data in some other location.
    190     memmove(&(*record)[0], data.data(), data.size());
    191   }
    192 
    193   record->resize(data.size());
    194 
    195   *offset += kHeaderSize + length + kFooterSize;
    196   return Status::OK();
    197 }
    198 
    199 Status RecordReader::SkipNBytes(uint64 offset) {
    200 #if !defined(IS_SLIM_BUILD)
    201   if (zlib_input_stream_) {
    202     TF_RETURN_IF_ERROR(zlib_input_stream_->SkipNBytes(offset));
    203   } else {
    204 #endif
    205     if (options_.buffer_size > 0) {
    206       TF_RETURN_IF_ERROR(input_stream_->SkipNBytes(offset));
    207     }
    208   }
    209   return Status::OK();
    210 }  // namespace io
    211 
    212 SequentialRecordReader::SequentialRecordReader(
    213     RandomAccessFile* file, const RecordReaderOptions& options)
    214     : underlying_(file, options), offset_(0) {}
    215 
    216 }  // namespace io
    217 }  // namespace tensorflow
    218