1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/zlib_inputstream.h" 17 18 #include "tensorflow/core/lib/strings/strcat.h" 19 #include "tensorflow/core/platform/logging.h" 20 21 namespace tensorflow { 22 namespace io { 23 24 ZlibInputStream::ZlibInputStream( 25 InputStreamInterface* input_stream, 26 size_t input_buffer_bytes, // size of z_stream.next_in buffer 27 size_t output_buffer_bytes, // size of z_stream.next_out buffer 28 const ZlibCompressionOptions& zlib_options) 29 : input_stream_(input_stream), 30 input_buffer_capacity_(input_buffer_bytes), 31 output_buffer_capacity_(output_buffer_bytes), 32 z_stream_input_(new Bytef[input_buffer_capacity_]), 33 z_stream_output_(new Bytef[output_buffer_capacity_]), 34 zlib_options_(zlib_options), 35 z_stream_(new z_stream), 36 bytes_read_(0) { 37 InitZlibBuffer(); 38 } 39 40 ZlibInputStream::~ZlibInputStream() { 41 if (z_stream_) { 42 inflateEnd(z_stream_.get()); 43 } 44 } 45 46 Status ZlibInputStream::Reset() { 47 TF_RETURN_IF_ERROR(input_stream_->Reset()); 48 InitZlibBuffer(); 49 bytes_read_ = 0; 50 return Status::OK(); 51 } 52 53 void ZlibInputStream::InitZlibBuffer() { 54 memset(z_stream_.get(), 0, sizeof(z_stream)); 55 56 z_stream_->zalloc = Z_NULL; 57 z_stream_->zfree = Z_NULL; 58 z_stream_->opaque = Z_NULL; 59 z_stream_->next_in = Z_NULL; 60 z_stream_->avail_in = 0; 61 62 int status = inflateInit2(z_stream_.get(), zlib_options_.window_bits); 63 64 CHECK_EQ(status, Z_OK) << "inflateInit failed with status " << status; 65 66 z_stream_->next_in = z_stream_input_.get(); 67 z_stream_->next_out = z_stream_output_.get(); 68 next_unread_byte_ = reinterpret_cast<char*>(z_stream_output_.get()); 69 z_stream_->avail_in = 0; 70 z_stream_->avail_out = output_buffer_capacity_; 71 } 72 73 Status ZlibInputStream::ReadFromStream() { 74 int bytes_to_read = input_buffer_capacity_; 75 char* read_location = reinterpret_cast<char*>(z_stream_input_.get()); 76 77 // If there are unread bytes in the input stream we move them to the head 78 // of the stream to maximize the space available to read new data into. 79 if (z_stream_->avail_in > 0) { 80 uLong read_bytes = z_stream_->next_in - z_stream_input_.get(); 81 // Remove `read_bytes` from the head of the input stream. 82 // Move unread bytes to the head of the input stream. 83 if (read_bytes > 0) { 84 memmove(z_stream_input_.get(), z_stream_->next_in, z_stream_->avail_in); 85 } 86 87 bytes_to_read -= z_stream_->avail_in; 88 read_location += z_stream_->avail_in; 89 } 90 string data; 91 // Try to read enough data to fill up z_stream_input_. 92 // TODO(rohanj): Add a char* version of ReadNBytes to InputStreamInterface 93 // and use that instead to make this more efficient. 94 Status s = input_stream_->ReadNBytes(bytes_to_read, &data); 95 memcpy(read_location, data.data(), data.size()); 96 97 // Since we moved unread data to the head of the input stream we can point 98 // next_in to the head of the input stream. 99 z_stream_->next_in = z_stream_input_.get(); 100 101 // Note: data.size() could be different from bytes_to_read. 102 z_stream_->avail_in += data.size(); 103 104 if (!s.ok() && !errors::IsOutOfRange(s)) { 105 return s; 106 } 107 108 // We throw OutOfRange error iff no new data has been read from stream. 109 // Since we never check how much data is remaining in the stream, it is 110 // possible that on the last read there isn't enough data in the stream to 111 // fill up the buffer in which case input_stream_->ReadNBytes would return an 112 // OutOfRange error. 113 if (data.empty()) { 114 return errors::OutOfRange("EOF reached"); 115 } 116 if (errors::IsOutOfRange(s)) { 117 return Status::OK(); 118 } 119 120 return s; 121 } 122 123 size_t ZlibInputStream::ReadBytesFromCache(size_t bytes_to_read, 124 string* result) { 125 size_t unread_bytes = 126 reinterpret_cast<char*>(z_stream_->next_out) - next_unread_byte_; 127 size_t can_read_bytes = std::min(bytes_to_read, unread_bytes); 128 if (can_read_bytes > 0) { 129 result->append(next_unread_byte_, can_read_bytes); 130 next_unread_byte_ += can_read_bytes; 131 } 132 bytes_read_ += can_read_bytes; 133 return can_read_bytes; 134 } 135 136 size_t ZlibInputStream::NumUnreadBytes() const { 137 size_t read_bytes = 138 next_unread_byte_ - reinterpret_cast<char*>(z_stream_output_.get()); 139 return output_buffer_capacity_ - z_stream_->avail_out - read_bytes; 140 } 141 142 Status ZlibInputStream::ReadNBytes(int64 bytes_to_read, string* result) { 143 result->clear(); 144 // Read as many bytes as possible from cache. 145 bytes_to_read -= ReadBytesFromCache(bytes_to_read, result); 146 147 while (bytes_to_read > 0) { 148 // At this point we can be sure that cache has been emptied. 149 DCHECK_EQ(NumUnreadBytes(), 0); 150 151 // Now that the cache is empty we need to inflate more data. 152 153 // Step 1. Fill up input buffer. 154 // We read from stream only after the previously read contents have been 155 // completely consumed. This is an optimization and can be removed if 156 // it causes problems. `ReadFromStream` is capable of handling partially 157 // filled up buffers. 158 if (z_stream_->avail_in == 0) { 159 TF_RETURN_IF_ERROR(ReadFromStream()); 160 } 161 162 // Step 2. Setup output stream. 163 z_stream_->next_out = z_stream_output_.get(); 164 next_unread_byte_ = reinterpret_cast<char*>(z_stream_output_.get()); 165 z_stream_->avail_out = output_buffer_capacity_; 166 167 // Step 3. Inflate Inflate Inflate! 168 TF_RETURN_IF_ERROR(Inflate()); 169 170 bytes_to_read -= ReadBytesFromCache(bytes_to_read, result); 171 } 172 173 return Status::OK(); 174 } 175 176 int64 ZlibInputStream::Tell() const { return bytes_read_; } 177 178 Status ZlibInputStream::Inflate() { 179 int error = inflate(z_stream_.get(), zlib_options_.flush_mode); 180 if (error != Z_OK && error != Z_STREAM_END) { 181 string error_string = 182 strings::StrCat("inflate() failed with error ", error); 183 if (z_stream_->msg != nullptr) { 184 strings::StrAppend(&error_string, ": ", z_stream_->msg); 185 } 186 return errors::DataLoss(error_string); 187 } 188 return Status::OK(); 189 } 190 191 } // namespace io 192 } // namespace tensorflow 193