Home | History | Annotate | Download | only in io
      1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #ifndef TENSORFLOW_LIB_IO_RECORD_READER_H_
     17 #define TENSORFLOW_LIB_IO_RECORD_READER_H_
     18 
#include <memory>

#include "tensorflow/core/lib/core/errors.h"
#include "tensorflow/core/lib/core/status.h"
#include "tensorflow/core/lib/core/stringpiece.h"
#include "tensorflow/core/lib/io/inputstream_interface.h"
#if !defined(IS_SLIM_BUILD)
#include "tensorflow/core/lib/io/zlib_compression_options.h"
#include "tensorflow/core/lib/io/zlib_inputstream.h"
#endif  // IS_SLIM_BUILD
#include "tensorflow/core/platform/macros.h"
#include "tensorflow/core/platform/types.h"
     28 
     29 namespace tensorflow {
     30 
     31 class RandomAccessFile;
     32 
     33 namespace io {
     34 
     35 class RecordReaderOptions {
     36  public:
     37   enum CompressionType { NONE = 0, ZLIB_COMPRESSION = 1 };
     38   CompressionType compression_type = NONE;
     39 
     40   // If buffer_size is non-zero, then all reads must be sequential, and no
     41   // skipping around is permitted. (Note: this is the same behavior as reading
     42   // compressed files.) Consider using SequentialRecordReader.
     43   int64 buffer_size = 0;
     44 
     45   static RecordReaderOptions CreateRecordReaderOptions(
     46       const string& compression_type);
     47 
     48 #if !defined(IS_SLIM_BUILD)
     49   // Options specific to zlib compression.
     50   ZlibCompressionOptions zlib_options;
     51 #endif  // IS_SLIM_BUILD
     52 };
     53 
     54 // Low-level interface to read TFRecord files.
     55 //
     56 // If using compression or buffering, consider using SequentialRecordReader.
     57 //
     58 // Note: this class is not thread safe; external synchronization required.
     59 class RecordReader {
     60  public:
     61   // Create a reader that will return log records from "*file".
     62   // "*file" must remain live while this Reader is in use.
     63   explicit RecordReader(
     64       RandomAccessFile* file,
     65       const RecordReaderOptions& options = RecordReaderOptions());
     66 
     67   virtual ~RecordReader() = default;
     68 
     69   // Read the record at "*offset" into *record and update *offset to
     70   // point to the offset of the next record.  Returns OK on success,
     71   // OUT_OF_RANGE for end of file, or something else for an error.
     72   //
     73   // Note: if buffering is used (with or without compression), access must be
     74   // sequential.
     75   Status ReadRecord(uint64* offset, string* record);
     76 
     77   // Skip the records till "offset". Returns OK on success,
     78   // OUT_OF_RANGE for end of file, or something else for an error.
     79   Status SkipNBytes(uint64 offset);
     80 
     81  private:
     82   Status ReadChecksummed(uint64 offset, size_t n, StringPiece* result,
     83                          string* storage);
     84 
     85   RandomAccessFile* src_;
     86   RecordReaderOptions options_;
     87   std::unique_ptr<InputStreamInterface> input_stream_;
     88 #if !defined(IS_SLIM_BUILD)
     89   std::unique_ptr<ZlibInputStream> zlib_input_stream_;
     90 #endif  // IS_SLIM_BUILD
     91 
     92   TF_DISALLOW_COPY_AND_ASSIGN(RecordReader);
     93 };
     94 
     95 // High-level interface to read TFRecord files.
     96 //
     97 // Note: this class is not thread safe; external synchronization required.
     98 class SequentialRecordReader {
     99  public:
    100   // Create a reader that will return log records from "*file".
    101   // "*file" must remain live while this Reader is in use.
    102   explicit SequentialRecordReader(
    103       RandomAccessFile* file,
    104       const RecordReaderOptions& options = RecordReaderOptions());
    105 
    106   virtual ~SequentialRecordReader() = default;
    107 
    108   // Reads the next record in the file into *record. Returns OK on success,
    109   // OUT_OF_RANGE for end of file, or something else for an error.
    110   Status ReadRecord(string* record) {
    111     return underlying_.ReadRecord(&offset_, record);
    112   }
    113 
    114   // Returns the current offset in the file.
    115   uint64 TellOffset() { return offset_; }
    116 
    117   // Seek to this offset within the file and set this offset as the current
    118   // offset. Trying to seek backward will throw error.
    119   Status SeekOffset(uint64 offset) {
    120     if (offset < offset_)
    121       return errors::InvalidArgument(
    122           "Trying to seek offset: ", offset,
    123           " which is less than the current offset: ", offset_);
    124     TF_RETURN_IF_ERROR(underlying_.SkipNBytes(offset - offset_));
    125     offset_ = offset;
    126     return Status::OK();
    127   }
    128 
    129  private:
    130   RecordReader underlying_;
    131   uint64 offset_ = 0;
    132 };
    133 
    134 }  // namespace io
    135 }  // namespace tensorflow
    136 
    137 #endif  // TENSORFLOW_LIB_IO_RECORD_READER_H_
    138