Home | History | Annotate | Download | only in snappy
      1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved.
      2 
      3 Licensed under the Apache License, Version 2.0 (the "License");
      4 you may not use this file except in compliance with the License.
      5 You may obtain a copy of the License at
      6 
      7     http://www.apache.org/licenses/LICENSE-2.0
      8 
      9 Unless required by applicable law or agreed to in writing, software
     10 distributed under the License is distributed on an "AS IS" BASIS,
     11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     12 See the License for the specific language governing permissions and
     13 limitations under the License.
     14 ==============================================================================*/
     15 
     16 #include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h"
     17 
     18 namespace tensorflow {
     19 namespace io {
     20 
     21 SnappyOutputBuffer::SnappyOutputBuffer(WritableFile* file,
     22                                        int32 input_buffer_bytes,
     23                                        int32 output_buffer_bytes)
     24     : file_(file),
     25       input_buffer_(new char[input_buffer_bytes]),
     26       input_buffer_capacity_(input_buffer_bytes),
     27       next_in_(input_buffer_.get()),
     28       output_buffer_(new char[output_buffer_bytes]),
     29       output_buffer_capacity_(output_buffer_bytes),
     30       next_out_(output_buffer_.get()),
     31       avail_out_(output_buffer_bytes) {}
     32 
     33 Status SnappyOutputBuffer::Write(StringPiece data) {
     34   //
     35   // The deflated output is accumulated in output_buffer_ and gets written to
     36   // file as and when needed.
     37 
     38   size_t bytes_to_write = data.size();
     39 
     40   // If there is sufficient free space in input_buffer_ to fit data we
     41   // add it there and return.
     42   if (bytes_to_write <= AvailableInputSpace()) {
     43     AddToInputBuffer(data);
     44     return Status::OK();
     45   }
     46 
     47   // If there isn't enough available space in the input_buffer_ we empty it
     48   // by uncompressing its contents. If data now fits in input_buffer_
     49   // we add it there else we directly deflate it.
     50   TF_RETURN_IF_ERROR(DeflateBuffered());
     51 
     52   // input_buffer_ should be empty at this point.
     53   if (bytes_to_write <= AvailableInputSpace()) {
     54     AddToInputBuffer(data);
     55     return Status::OK();
     56   }
     57 
     58   // `data` is too large to fit in input buffer so we deflate it directly.
     59   // Note that at this point we have already deflated all existing input so
     60   // we do not need to backup next_in and avail_in.
     61   next_in_ = const_cast<char*>(data.data());
     62   avail_in_ = bytes_to_write;
     63 
     64   TF_RETURN_IF_ERROR(Deflate());
     65 
     66   DCHECK(avail_in_ == 0);  // All input will be used up.
     67 
     68   next_in_ = input_buffer_.get();
     69 
     70   return Status::OK();
     71 }
     72 
     73 Status SnappyOutputBuffer::Flush() {
     74   TF_RETURN_IF_ERROR(DeflateBuffered());
     75   TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
     76   return Status::OK();
     77 }
     78 
     79 int32 SnappyOutputBuffer::AvailableInputSpace() const {
     80   return input_buffer_capacity_ - avail_in_;
     81 }
     82 
     83 void SnappyOutputBuffer::AddToInputBuffer(StringPiece data) {
     84   size_t bytes_to_write = data.size();
     85   DCHECK_LE(bytes_to_write, AvailableInputSpace());
     86 
     87   // Input stream ->
     88   // [....................input_buffer_capacity_...............]
     89   // [<...read_bytes...><...avail_in...>......empty space......]
     90   //  ^                 ^
     91   //  |                 |
     92   //  input_buffer_   next_in
     93   //
     94   // Data in the input stream is sharded as shown above. next_in_ could
     95   // be pointing to some byte in the buffer with avail_in number of bytes
     96   // available to be read.
     97   //
     98   // In order to avoid shifting the avail_in bytes at next_in to the head of
     99   // the buffer we try to fit `data` in the empty space at the tail of the
    100   // input stream.
    101   // TODO(srbs): This could be avoided if we had a circular buffer.
    102   // If it doesn't fit we free the space at the head of the stream and then
    103   // append `data` at the end of existing data.
    104 
    105   const int32 read_bytes = next_in_ - input_buffer_.get();
    106   const int32 unread_bytes = avail_in_;
    107   const int32 free_tail_bytes =
    108       input_buffer_capacity_ - (read_bytes + unread_bytes);
    109 
    110   if (bytes_to_write > free_tail_bytes) {
    111     memmove(input_buffer_.get(), next_in_, avail_in_);
    112     next_in_ = input_buffer_.get();
    113   }
    114   memcpy(next_in_ + avail_in_, data.data(), bytes_to_write);
    115   avail_in_ += bytes_to_write;
    116 }
    117 
    118 Status SnappyOutputBuffer::AddToOutputBuffer(const char* data, size_t length) {
    119   while (length > 0) {
    120     size_t bytes_to_copy = std::min(length, avail_out_);
    121     memcpy(next_out_, data, bytes_to_copy);
    122     data += bytes_to_copy;
    123     next_out_ += bytes_to_copy;
    124     avail_out_ -= bytes_to_copy;
    125     length -= bytes_to_copy;
    126     if (avail_out_ == 0) {
    127       TF_RETURN_IF_ERROR(FlushOutputBufferToFile());
    128     }
    129   }
    130   return Status::OK();
    131 }
    132 
    133 Status SnappyOutputBuffer::DeflateBuffered() {
    134   TF_RETURN_IF_ERROR(Deflate());
    135   DCHECK(avail_in_ == 0);
    136   next_in_ = input_buffer_.get();
    137   return Status::OK();
    138 }
    139 
    140 Status SnappyOutputBuffer::FlushOutputBufferToFile() {
    141   size_t bytes_to_write = output_buffer_capacity_ - avail_out_;
    142   if (bytes_to_write > 0) {
    143     Status s = file_->Append(StringPiece(
    144         reinterpret_cast<char*>(output_buffer_.get()), bytes_to_write));
    145     if (s.ok()) {
    146       next_out_ = output_buffer_.get();
    147       avail_out_ = output_buffer_capacity_;
    148     }
    149     return s;
    150   }
    151   return Status::OK();
    152 }
    153 
    154 Status SnappyOutputBuffer::Deflate() {
    155   if (avail_in_ == 0) {
    156     return Status::OK();
    157   }
    158   string output;
    159   if (!port::Snappy_Compress(next_in_, avail_in_, &output)) {
    160     return errors::DataLoss("Snappy_Compress failed");
    161   }
    162 
    163   // Write length of compressed block to output buffer.
    164   char compressed_length_array[4];
    165   std::fill(compressed_length_array, compressed_length_array + 4, 0);
    166   for (int i = 0; i < 4; i++) {
    167     // Little endian.
    168     compressed_length_array[i] = output.size() >> (8 * (3 - i));
    169   }
    170   TF_RETURN_IF_ERROR(AddToOutputBuffer(compressed_length_array, 4));
    171 
    172   // Write compressed output to buffer.
    173   TF_RETURN_IF_ERROR(AddToOutputBuffer(output.data(), output.size()));
    174   next_in_ += avail_in_;
    175   avail_in_ = 0;
    176 
    177   return Status::OK();
    178 }
    179 
    180 }  // namespace io
    181 }  // namespace tensorflow
    182