1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/zlib_outputbuffer.h" 17 18 #include "tensorflow/core/lib/core/errors.h" 19 20 namespace tensorflow { 21 namespace io { 22 23 ZlibOutputBuffer::ZlibOutputBuffer( 24 WritableFile* file, 25 int32 input_buffer_bytes, // size of z_stream.next_in buffer 26 int32 output_buffer_bytes, 27 const ZlibCompressionOptions& 28 zlib_options) // size of z_stream.next_out buffer 29 : file_(file), 30 init_status_(), 31 input_buffer_capacity_(input_buffer_bytes), 32 output_buffer_capacity_(output_buffer_bytes), 33 z_stream_input_(new Bytef[input_buffer_bytes]), 34 z_stream_output_(new Bytef[output_buffer_bytes]), 35 zlib_options_(zlib_options), 36 z_stream_(new z_stream) {} 37 38 ZlibOutputBuffer::~ZlibOutputBuffer() { 39 if (z_stream_) { 40 LOG(WARNING) << "ZlibOutputBuffer::Close() not called. Possible data loss"; 41 } 42 } 43 44 Status ZlibOutputBuffer::Init() { 45 // Output buffer size should be greater than 1 because deflation needs atleast 46 // one byte for book keeping etc. 47 if (output_buffer_capacity_ <= 1) { 48 return errors::InvalidArgument( 49 "output_buffer_bytes should be greater than " 50 "1"); 51 } 52 memset(z_stream_.get(), 0, sizeof(z_stream)); 53 z_stream_->zalloc = Z_NULL; 54 z_stream_->zfree = Z_NULL; 55 z_stream_->opaque = Z_NULL; 56 int status = 57 deflateInit2(z_stream_.get(), zlib_options_.compression_level, 58 zlib_options_.compression_method, zlib_options_.window_bits, 59 zlib_options_.mem_level, zlib_options_.compression_strategy); 60 if (status != Z_OK) { 61 z_stream_.reset(nullptr); 62 return errors::InvalidArgument("deflateInit failed with status", status); 63 } 64 z_stream_->next_in = z_stream_input_.get(); 65 z_stream_->next_out = z_stream_output_.get(); 66 z_stream_->avail_in = 0; 67 z_stream_->avail_out = output_buffer_capacity_; 68 return Status::OK(); 69 } 70 71 int32 ZlibOutputBuffer::AvailableInputSpace() const { 72 return input_buffer_capacity_ - z_stream_->avail_in; 73 } 74 75 void ZlibOutputBuffer::AddToInputBuffer(StringPiece data) { 76 size_t bytes_to_write = data.size(); 77 CHECK_LE(bytes_to_write, AvailableInputSpace()); 78 79 // Input stream -> 80 // [....................input_buffer_capacity_...............] 81 // [<...read_bytes...><...avail_in...>......empty space......] 82 // ^ ^ 83 // | | 84 // z_stream_input_ next_in 85 // 86 // Data in the input stream is sharded as show above. z_stream_->next_in could 87 // be pointing to some byte in the buffer with avail_in number of bytes 88 // available to be read. 89 // 90 // In order to avoid shifting the avail_in bytes at next_in to the head of 91 // the buffer we try to fit `data` in the empty space at the tail of the 92 // input stream. 93 // TODO(srbs): This could be avoided if we had a circular buffer. 94 // If it doesn't fit we free the space at the head of the stream and then 95 // append `data` at the end of existing data. 96 97 int32 read_bytes = z_stream_->next_in - z_stream_input_.get(); 98 int32 unread_bytes = z_stream_->avail_in; 99 int32 free_tail_bytes = input_buffer_capacity_ - (read_bytes + unread_bytes); 100 101 if (bytes_to_write > free_tail_bytes) { 102 memmove(z_stream_input_.get(), z_stream_->next_in, z_stream_->avail_in); 103 z_stream_->next_in = z_stream_input_.get(); 104 } 105 memcpy(z_stream_->next_in + z_stream_->avail_in, data.data(), bytes_to_write); 106 z_stream_->avail_in += bytes_to_write; 107 } 108 109 Status ZlibOutputBuffer::DeflateBuffered(bool last) { 110 int flush_mode = last ? Z_FINISH : zlib_options_.flush_mode; 111 do { 112 // From zlib manual (http://www.zlib.net/manual.html): 113 // 114 // "In the case of a Z_FULL_FLUSH or Z_SYNC_FLUSH, make sure that 115 // avail_out is greater than six to avoid repeated flush markers due 116 // to avail_out == 0 on return." 117 // 118 // If above condition is met or if output buffer is full we flush contents 119 // to file. 120 if (z_stream_->avail_out == 0 || 121 (IsSyncOrFullFlush(flush_mode) && z_stream_->avail_out < 6)) { 122 TF_RETURN_IF_ERROR(FlushOutputBufferToFile()); 123 } 124 TF_RETURN_IF_ERROR(Deflate(flush_mode)); 125 } while (z_stream_->avail_out == 0); 126 127 DCHECK(z_stream_->avail_in == 0); 128 z_stream_->next_in = z_stream_input_.get(); 129 return Status::OK(); 130 } 131 132 Status ZlibOutputBuffer::FlushOutputBufferToFile() { 133 uint32 bytes_to_write = output_buffer_capacity_ - z_stream_->avail_out; 134 if (bytes_to_write > 0) { 135 Status s = file_->Append(StringPiece( 136 reinterpret_cast<char*>(z_stream_output_.get()), bytes_to_write)); 137 if (s.ok()) { 138 z_stream_->next_out = z_stream_output_.get(); 139 z_stream_->avail_out = output_buffer_capacity_; 140 } 141 return s; 142 } 143 return Status::OK(); 144 } 145 146 Status ZlibOutputBuffer::Append(const StringPiece& data) { 147 // If there is sufficient free space in z_stream_input_ to fit data we 148 // add it there and return. 149 // If there isn't enough space we deflate the existing contents of 150 // z_input_stream_. If data now fits in z_input_stream_ we add it there 151 // else we directly deflate it. 152 // 153 // The deflated output is accumulated in z_stream_output_ and gets written to 154 // file as and when needed. 155 156 size_t bytes_to_write = data.size(); 157 158 if (bytes_to_write <= AvailableInputSpace()) { 159 AddToInputBuffer(data); 160 return Status::OK(); 161 } 162 163 TF_RETURN_IF_ERROR(DeflateBuffered()); 164 165 // At this point input stream should be empty. 166 if (bytes_to_write <= AvailableInputSpace()) { 167 AddToInputBuffer(data); 168 return Status::OK(); 169 } 170 171 // `data` is too large to fit in input buffer so we deflate it directly. 172 // Note that at this point we have already deflated all existing input so 173 // we do not need to backup next_in and avail_in. 174 z_stream_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data.data())); 175 z_stream_->avail_in = bytes_to_write; 176 177 do { 178 if (z_stream_->avail_out == 0) { 179 // No available output space. 180 // Write output buffer to file. 181 TF_RETURN_IF_ERROR(FlushOutputBufferToFile()); 182 } 183 TF_RETURN_IF_ERROR(Deflate(zlib_options_.flush_mode)); 184 } while (z_stream_->avail_out == 0); 185 186 DCHECK(z_stream_->avail_in == 0); // All input will be used up. 187 188 // Restore z_stream input pointers. 189 z_stream_->next_in = z_stream_input_.get(); 190 191 return Status::OK(); 192 } 193 194 Status ZlibOutputBuffer::Flush() { 195 TF_RETURN_IF_ERROR(DeflateBuffered()); 196 TF_RETURN_IF_ERROR(FlushOutputBufferToFile()); 197 return Status::OK(); 198 } 199 200 Status ZlibOutputBuffer::Sync() { 201 TF_RETURN_IF_ERROR(Flush()); 202 return file_->Sync(); 203 } 204 205 Status ZlibOutputBuffer::Close() { 206 TF_RETURN_IF_ERROR(DeflateBuffered(true)); 207 TF_RETURN_IF_ERROR(FlushOutputBufferToFile()); 208 deflateEnd(z_stream_.get()); 209 z_stream_.reset(nullptr); 210 return Status::OK(); 211 } 212 213 Status ZlibOutputBuffer::Deflate(int flush) { 214 int error = deflate(z_stream_.get(), flush); 215 if (error == Z_OK || error == Z_BUF_ERROR || 216 (error == Z_STREAM_END && flush == Z_FINISH)) { 217 return Status::OK(); 218 } 219 string error_string = strings::StrCat("deflate() failed with error ", error); 220 if (z_stream_->msg != nullptr) { 221 strings::StrAppend(&error_string, ": ", z_stream_->msg); 222 } 223 return errors::DataLoss(error_string); 224 } 225 226 } // namespace io 227 } // namespace tensorflow 228