Home | History | Annotate | Download | only in io
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/lib/io/zlib_outputbuffer.h"
     17 
     18 #include "tensorflow/core/lib/core/errors.h"
     19 
     20 namespace tensorflow {
     21 namespace io {
     22 
     23 ZlibOutputBuffer::ZlibOutputBuffer(
     24     WritableFile* file,
     25     int32 input_buffer_bytes,  // size of z_stream.next_in buffer
     26     int32 output_buffer_bytes,
     27     const ZlibCompressionOptions&
     28         zlib_options)  // size of z_stream.next_out buffer
     29     : file_(file),
     30       init_status_(),
     31       input_buffer_capacity_(input_buffer_bytes),
     32       output_buffer_capacity_(output_buffer_bytes),
     33       z_stream_input_(new Bytef[input_buffer_bytes]),
     34       z_stream_output_(new Bytef[output_buffer_bytes]),
     35       zlib_options_(zlib_options),
     36       z_stream_(new z_stream) {}
     37 
     38 ZlibOutputBuffer::~ZlibOutputBuffer() {
     39   if (z_stream_) {
     40     LOG(WARNING) << "ZlibOutputBuffer::Close() not called. Possible data loss";
     41   }
     42 }
     43 
     44 Status ZlibOutputBuffer::Init() {
     45   // Output buffer size should be greater than 1 because deflation needs atleast
     46   // one byte for book keeping etc.
     47   if (output_buffer_capacity_ <= 1) {
     48     return errors::InvalidArgument(
     49         "output_buffer_bytes should be greater than "
     50         "1");
     51   }
     52   memset(z_stream_.get(), 0, sizeof(z_stream));
     53   z_stream_->zalloc = Z_NULL;
     54   z_stream_->zfree = Z_NULL;
     55   z_stream_->opaque = Z_NULL;
     56   int status =
     57       deflateInit2(z_stream_.get(), zlib_options_.compression_level,
     58                    zlib_options_.compression_method, zlib_options_.window_bits,
     59                    zlib_options_.mem_level, zlib_options_.compression_strategy);
     60   if (status != Z_OK) {
     61     z_stream_.reset(nullptr);
     62     return errors::InvalidArgument("deflateInit failed with status", status);
     63   }
     64   z_stream_->next_in = z_stream_input_.get();
     65   z_stream_->next_out = z_stream_output_.get();
     66   z_stream_->avail_in = 0;
     67   z_stream_->avail_out = output_buffer_capacity_;
     68   return Status::OK();
     69 }
     70 
     71 int32 ZlibOutputBuffer::AvailableInputSpace() const {
     72   return input_buffer_capacity_ - z_stream_->avail_in;
     73 }
     74 
     75 void ZlibOutputBuffer::AddToInputBuffer(StringPiece data) {
     76   size_t bytes_to_write = data.size();
     77   CHECK_LE(bytes_to_write, AvailableInputSpace());
     78 
     79   // Input stream ->
     80   // [....................input_buffer_capacity_...............]
     81   // [<...read_bytes...><...avail_in...>......empty space......]
     82   //  ^                 ^
     83   //  |                 |
     84   //  z_stream_input_   next_in
     85   //
     86   // Data in the input stream is sharded as show above. z_stream_->next_in could
     87   // be pointing to some byte in the buffer with avail_in number of bytes
     88   // available to be read.
     89   //
     90   // In order to avoid shifting the avail_in bytes at next_in to the head of
     91   // the buffer we try to fit `data` in the empty space at the tail of the
     92   // input stream.
     93   // TODO(srbs): This could be avoided if we had a circular buffer.
     94   // If it doesn't fit we free the space at the head of the stream and then
     95   // append `data` at the end of existing data.
     96 
     97   int32 read_bytes = z_stream_->next_in - z_stream_input_.get();
     98   int32 unread_bytes = z_stream_->avail_in;
     99   int32 free_tail_bytes = input_buffer_capacity_ - (read_bytes + unread_bytes);
    100 
    101   if (bytes_to_write > free_tail_bytes) {
    102     memmove(z_stream_input_.get(), z_stream_->next_in, z_stream_->avail_in);
    103     z_stream_->next_in = z_stream_input_.get();
    104   }
    105   memcpy(z_stream_->next_in + z_stream_->avail_in, data.data(), bytes_to_write);
    106   z_stream_->avail_in += bytes_to_write;
    107 }
    108 
    109 Status ZlibOutputBuffer::DeflateBuffered(bool last) {
    110   int flush_mode = last ? Z_FINISH : zlib_options_.flush_mode;
    111   do {
    112     // From zlib manual (http://www.zlib.net/manual.html):
    113     //
    114     // "In the case of a Z_FULL_FLUSH or Z_SYNC_FLUSH, make sure that
    115     // avail_out is greater than six to avoid repeated flush markers due
    116     // to avail_out == 0 on return."
    117     //
    118     // If above condition is met or if output buffer is full we flush contents
    119     // to file.
    120     if (z_stream_->avail_out == 0 ||
    121         (IsSyncOrFullFlush(flush_mode) && z_stream_->avail_out < 6)) {
    122       TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
    123     }
    124     TF_RETURN_IF_ERROR(Deflate(flush_mode));
    125   } while (z_stream_->avail_out == 0);
    126 
    127   DCHECK(z_stream_->avail_in == 0);
    128   z_stream_->next_in = z_stream_input_.get();
    129   return Status::OK();
    130 }
    131 
    132 Status ZlibOutputBuffer::FlushOutputBufferToFile() {
    133   uint32 bytes_to_write = output_buffer_capacity_ - z_stream_->avail_out;
    134   if (bytes_to_write > 0) {
    135     Status s = file_->Append(StringPiece(
    136         reinterpret_cast<char*>(z_stream_output_.get()), bytes_to_write));
    137     if (s.ok()) {
    138       z_stream_->next_out = z_stream_output_.get();
    139       z_stream_->avail_out = output_buffer_capacity_;
    140     }
    141     return s;
    142   }
    143   return Status::OK();
    144 }
    145 
    146 Status ZlibOutputBuffer::Append(const StringPiece& data) {
    147   // If there is sufficient free space in z_stream_input_ to fit data we
    148   // add it there and return.
    149   // If there isn't enough space we deflate the existing contents of
    150   // z_input_stream_. If data now fits in z_input_stream_ we add it there
    151   // else we directly deflate it.
    152   //
    153   // The deflated output is accumulated in z_stream_output_ and gets written to
    154   // file as and when needed.
    155 
    156   size_t bytes_to_write = data.size();
    157 
    158   if (bytes_to_write <= AvailableInputSpace()) {
    159     AddToInputBuffer(data);
    160     return Status::OK();
    161   }
    162 
    163   TF_RETURN_IF_ERROR(DeflateBuffered());
    164 
    165   // At this point input stream should be empty.
    166   if (bytes_to_write <= AvailableInputSpace()) {
    167     AddToInputBuffer(data);
    168     return Status::OK();
    169   }
    170 
    171   // `data` is too large to fit in input buffer so we deflate it directly.
    172   // Note that at this point we have already deflated all existing input so
    173   // we do not need to backup next_in and avail_in.
    174   z_stream_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data.data()));
    175   z_stream_->avail_in = bytes_to_write;
    176 
    177   do {
    178     if (z_stream_->avail_out == 0) {
    179       // No available output space.
    180       // Write output buffer to file.
    181       TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
    182     }
    183     TF_RETURN_IF_ERROR(Deflate(zlib_options_.flush_mode));
    184   } while (z_stream_->avail_out == 0);
    185 
    186   DCHECK(z_stream_->avail_in == 0);  // All input will be used up.
    187 
    188   // Restore z_stream input pointers.
    189   z_stream_->next_in = z_stream_input_.get();
    190 
    191   return Status::OK();
    192 }
    193 
    194 Status ZlibOutputBuffer::Flush() {
    195   TF_RETURN_IF_ERROR(DeflateBuffered());
    196   TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
    197   return Status::OK();
    198 }
    199 
    200 Status ZlibOutputBuffer::Sync() {
    201   TF_RETURN_IF_ERROR(Flush());
    202   return file_->Sync();
    203 }
    204 
    205 Status ZlibOutputBuffer::Close() {
    206   TF_RETURN_IF_ERROR(DeflateBuffered(true));
    207   TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
    208   deflateEnd(z_stream_.get());
    209   z_stream_.reset(nullptr);
    210   return Status::OK();
    211 }
    212 
    213 Status ZlibOutputBuffer::Deflate(int flush) {
    214   int error = deflate(z_stream_.get(), flush);
    215   if (error == Z_OK || error == Z_BUF_ERROR ||
    216       (error == Z_STREAM_END && flush == Z_FINISH)) {
    217     return Status::OK();
    218   }
    219   string error_string = strings::StrCat("deflate() failed with error ", error);
    220   if (z_stream_->msg != nullptr) {
    221     strings::StrAppend(&error_string, ": ", z_stream_->msg);
    222   }
    223   return errors::DataLoss(error_string);
    224 }
    225 
    226 }  // namespace io
    227 }  // namespace tensorflow
    228