Home | History | Annotate | Download | only in io
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/lib/io/zlib_inputstream.h"
     17 
     18 #include "tensorflow/core/lib/strings/strcat.h"
     19 #include "tensorflow/core/platform/logging.h"
     20 
     21 namespace tensorflow {
     22 namespace io {
     23 
     24 ZlibInputStream::ZlibInputStream(
     25     InputStreamInterface* input_stream,
     26     size_t input_buffer_bytes,   // size of z_stream.next_in buffer
     27     size_t output_buffer_bytes,  // size of z_stream.next_out buffer
     28     const ZlibCompressionOptions& zlib_options)
     29     : input_stream_(input_stream),
     30       input_buffer_capacity_(input_buffer_bytes),
     31       output_buffer_capacity_(output_buffer_bytes),
     32       z_stream_input_(new Bytef[input_buffer_capacity_]),
     33       z_stream_output_(new Bytef[output_buffer_capacity_]),
     34       zlib_options_(zlib_options),
     35       z_stream_(new z_stream),
     36       bytes_read_(0) {
     37   InitZlibBuffer();
     38 }
     39 
     40 ZlibInputStream::~ZlibInputStream() {
     41   if (z_stream_) {
     42     inflateEnd(z_stream_.get());
     43   }
     44 }
     45 
     46 Status ZlibInputStream::Reset() {
     47   TF_RETURN_IF_ERROR(input_stream_->Reset());
     48   InitZlibBuffer();
     49   bytes_read_ = 0;
     50   return Status::OK();
     51 }
     52 
     53 void ZlibInputStream::InitZlibBuffer() {
     54   memset(z_stream_.get(), 0, sizeof(z_stream));
     55 
     56   z_stream_->zalloc = Z_NULL;
     57   z_stream_->zfree = Z_NULL;
     58   z_stream_->opaque = Z_NULL;
     59   z_stream_->next_in = Z_NULL;
     60   z_stream_->avail_in = 0;
     61 
     62   int status = inflateInit2(z_stream_.get(), zlib_options_.window_bits);
     63 
     64   CHECK_EQ(status, Z_OK) << "inflateInit failed with status " << status;
     65 
     66   z_stream_->next_in = z_stream_input_.get();
     67   z_stream_->next_out = z_stream_output_.get();
     68   next_unread_byte_ = reinterpret_cast<char*>(z_stream_output_.get());
     69   z_stream_->avail_in = 0;
     70   z_stream_->avail_out = output_buffer_capacity_;
     71 }
     72 
     73 Status ZlibInputStream::ReadFromStream() {
     74   int bytes_to_read = input_buffer_capacity_;
     75   char* read_location = reinterpret_cast<char*>(z_stream_input_.get());
     76 
     77   // If there are unread bytes in the input stream we move them to the head
     78   // of the stream to maximize the space available to read new data into.
     79   if (z_stream_->avail_in > 0) {
     80     uLong read_bytes = z_stream_->next_in - z_stream_input_.get();
     81     // Remove `read_bytes` from the head of the input stream.
     82     // Move unread bytes to the head of the input stream.
     83     if (read_bytes > 0) {
     84       memmove(z_stream_input_.get(), z_stream_->next_in, z_stream_->avail_in);
     85     }
     86 
     87     bytes_to_read -= z_stream_->avail_in;
     88     read_location += z_stream_->avail_in;
     89   }
     90   string data;
     91   // Try to read enough data to fill up z_stream_input_.
     92   // TODO(rohanj): Add a char* version of ReadNBytes to InputStreamInterface
     93   // and use that instead to make this more efficient.
     94   Status s = input_stream_->ReadNBytes(bytes_to_read, &data);
     95   memcpy(read_location, data.data(), data.size());
     96 
     97   // Since we moved unread data to the head of the input stream we can point
     98   // next_in to the head of the input stream.
     99   z_stream_->next_in = z_stream_input_.get();
    100 
    101   // Note: data.size() could be different from bytes_to_read.
    102   z_stream_->avail_in += data.size();
    103 
    104   if (!s.ok() && !errors::IsOutOfRange(s)) {
    105     return s;
    106   }
    107 
    108   // We throw OutOfRange error iff no new data has been read from stream.
    109   // Since we never check how much data is remaining in the stream, it is
    110   // possible that on the last read there isn't enough data in the stream to
    111   // fill up the buffer in which case input_stream_->ReadNBytes would return an
    112   // OutOfRange error.
    113   if (data.empty()) {
    114     return errors::OutOfRange("EOF reached");
    115   }
    116   if (errors::IsOutOfRange(s)) {
    117     return Status::OK();
    118   }
    119 
    120   return s;
    121 }
    122 
    123 size_t ZlibInputStream::ReadBytesFromCache(size_t bytes_to_read,
    124                                            string* result) {
    125   size_t unread_bytes =
    126       reinterpret_cast<char*>(z_stream_->next_out) - next_unread_byte_;
    127   size_t can_read_bytes = std::min(bytes_to_read, unread_bytes);
    128   if (can_read_bytes > 0) {
    129     result->append(next_unread_byte_, can_read_bytes);
    130     next_unread_byte_ += can_read_bytes;
    131   }
    132   bytes_read_ += can_read_bytes;
    133   return can_read_bytes;
    134 }
    135 
    136 size_t ZlibInputStream::NumUnreadBytes() const {
    137   size_t read_bytes =
    138       next_unread_byte_ - reinterpret_cast<char*>(z_stream_output_.get());
    139   return output_buffer_capacity_ - z_stream_->avail_out - read_bytes;
    140 }
    141 
    142 Status ZlibInputStream::ReadNBytes(int64 bytes_to_read, string* result) {
    143   result->clear();
    144   // Read as many bytes as possible from cache.
    145   bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
    146 
    147   while (bytes_to_read > 0) {
    148     // At this point we can be sure that cache has been emptied.
    149     DCHECK_EQ(NumUnreadBytes(), 0);
    150 
    151     // Now that the cache is empty we need to inflate more data.
    152 
    153     // Step 1. Fill up input buffer.
    154     // We read from stream only after the previously read contents have been
    155     // completely consumed. This is an optimization and can be removed if
    156     // it causes problems. `ReadFromStream` is capable of handling partially
    157     // filled up buffers.
    158     if (z_stream_->avail_in == 0) {
    159       TF_RETURN_IF_ERROR(ReadFromStream());
    160     }
    161 
    162     // Step 2. Setup output stream.
    163     z_stream_->next_out = z_stream_output_.get();
    164     next_unread_byte_ = reinterpret_cast<char*>(z_stream_output_.get());
    165     z_stream_->avail_out = output_buffer_capacity_;
    166 
    167     // Step 3. Inflate Inflate Inflate!
    168     TF_RETURN_IF_ERROR(Inflate());
    169 
    170     bytes_to_read -= ReadBytesFromCache(bytes_to_read, result);
    171   }
    172 
    173   return Status::OK();
    174 }
    175 
    176 int64 ZlibInputStream::Tell() const { return bytes_read_; }
    177 
    178 Status ZlibInputStream::Inflate() {
    179   int error = inflate(z_stream_.get(), zlib_options_.flush_mode);
    180   if (error != Z_OK && error != Z_STREAM_END) {
    181     string error_string =
    182         strings::StrCat("inflate() failed with error ", error);
    183     if (z_stream_->msg != nullptr) {
    184       strings::StrAppend(&error_string, ": ", z_stream_->msg);
    185     }
    186     return errors::DataLoss(error_string);
    187   }
    188   return Status::OK();
    189 }
    190 
    191 }  // namespace io
    192 }  // namespace tensorflow
    193