1 /* Copyright 2016 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/snappy/snappy_outputbuffer.h" 17 18 namespace tensorflow { 19 namespace io { 20 21 SnappyOutputBuffer::SnappyOutputBuffer(WritableFile* file, 22 int32 input_buffer_bytes, 23 int32 output_buffer_bytes) 24 : file_(file), 25 input_buffer_(new char[input_buffer_bytes]), 26 input_buffer_capacity_(input_buffer_bytes), 27 next_in_(input_buffer_.get()), 28 output_buffer_(new char[output_buffer_bytes]), 29 output_buffer_capacity_(output_buffer_bytes), 30 next_out_(output_buffer_.get()), 31 avail_out_(output_buffer_bytes) {} 32 33 Status SnappyOutputBuffer::Write(StringPiece data) { 34 // 35 // The deflated output is accumulated in output_buffer_ and gets written to 36 // file as and when needed. 37 38 size_t bytes_to_write = data.size(); 39 40 // If there is sufficient free space in input_buffer_ to fit data we 41 // add it there and return. 42 if (bytes_to_write <= AvailableInputSpace()) { 43 AddToInputBuffer(data); 44 return Status::OK(); 45 } 46 47 // If there isn't enough available space in the input_buffer_ we empty it 48 // by uncompressing its contents. If data now fits in input_buffer_ 49 // we add it there else we directly deflate it. 50 TF_RETURN_IF_ERROR(DeflateBuffered()); 51 52 // input_buffer_ should be empty at this point. 53 if (bytes_to_write <= AvailableInputSpace()) { 54 AddToInputBuffer(data); 55 return Status::OK(); 56 } 57 58 // `data` is too large to fit in input buffer so we deflate it directly. 59 // Note that at this point we have already deflated all existing input so 60 // we do not need to backup next_in and avail_in. 61 next_in_ = const_cast<char*>(data.data()); 62 avail_in_ = bytes_to_write; 63 64 TF_RETURN_IF_ERROR(Deflate()); 65 66 DCHECK(avail_in_ == 0); // All input will be used up. 67 68 next_in_ = input_buffer_.get(); 69 70 return Status::OK(); 71 } 72 73 Status SnappyOutputBuffer::Flush() { 74 TF_RETURN_IF_ERROR(DeflateBuffered()); 75 TF_RETURN_IF_ERROR(FlushOutputBufferToFile()); 76 return Status::OK(); 77 } 78 79 int32 SnappyOutputBuffer::AvailableInputSpace() const { 80 return input_buffer_capacity_ - avail_in_; 81 } 82 83 void SnappyOutputBuffer::AddToInputBuffer(StringPiece data) { 84 size_t bytes_to_write = data.size(); 85 DCHECK_LE(bytes_to_write, AvailableInputSpace()); 86 87 // Input stream -> 88 // [....................input_buffer_capacity_...............] 89 // [<...read_bytes...><...avail_in...>......empty space......] 90 // ^ ^ 91 // | | 92 // input_buffer_ next_in 93 // 94 // Data in the input stream is sharded as shown above. next_in_ could 95 // be pointing to some byte in the buffer with avail_in number of bytes 96 // available to be read. 97 // 98 // In order to avoid shifting the avail_in bytes at next_in to the head of 99 // the buffer we try to fit `data` in the empty space at the tail of the 100 // input stream. 101 // TODO(srbs): This could be avoided if we had a circular buffer. 102 // If it doesn't fit we free the space at the head of the stream and then 103 // append `data` at the end of existing data. 104 105 const int32 read_bytes = next_in_ - input_buffer_.get(); 106 const int32 unread_bytes = avail_in_; 107 const int32 free_tail_bytes = 108 input_buffer_capacity_ - (read_bytes + unread_bytes); 109 110 if (bytes_to_write > free_tail_bytes) { 111 memmove(input_buffer_.get(), next_in_, avail_in_); 112 next_in_ = input_buffer_.get(); 113 } 114 memcpy(next_in_ + avail_in_, data.data(), bytes_to_write); 115 avail_in_ += bytes_to_write; 116 } 117 118 Status SnappyOutputBuffer::AddToOutputBuffer(const char* data, size_t length) { 119 while (length > 0) { 120 size_t bytes_to_copy = std::min(length, avail_out_); 121 memcpy(next_out_, data, bytes_to_copy); 122 data += bytes_to_copy; 123 next_out_ += bytes_to_copy; 124 avail_out_ -= bytes_to_copy; 125 length -= bytes_to_copy; 126 if (avail_out_ == 0) { 127 TF_RETURN_IF_ERROR(FlushOutputBufferToFile()); 128 } 129 } 130 return Status::OK(); 131 } 132 133 Status SnappyOutputBuffer::DeflateBuffered() { 134 TF_RETURN_IF_ERROR(Deflate()); 135 DCHECK(avail_in_ == 0); 136 next_in_ = input_buffer_.get(); 137 return Status::OK(); 138 } 139 140 Status SnappyOutputBuffer::FlushOutputBufferToFile() { 141 size_t bytes_to_write = output_buffer_capacity_ - avail_out_; 142 if (bytes_to_write > 0) { 143 Status s = file_->Append(StringPiece( 144 reinterpret_cast<char*>(output_buffer_.get()), bytes_to_write)); 145 if (s.ok()) { 146 next_out_ = output_buffer_.get(); 147 avail_out_ = output_buffer_capacity_; 148 } 149 return s; 150 } 151 return Status::OK(); 152 } 153 154 Status SnappyOutputBuffer::Deflate() { 155 if (avail_in_ == 0) { 156 return Status::OK(); 157 } 158 string output; 159 if (!port::Snappy_Compress(next_in_, avail_in_, &output)) { 160 return errors::DataLoss("Snappy_Compress failed"); 161 } 162 163 // Write length of compressed block to output buffer. 164 char compressed_length_array[4]; 165 std::fill(compressed_length_array, compressed_length_array + 4, 0); 166 for (int i = 0; i < 4; i++) { 167 // Little endian. 168 compressed_length_array[i] = output.size() >> (8 * (3 - i)); 169 } 170 TF_RETURN_IF_ERROR(AddToOutputBuffer(compressed_length_array, 4)); 171 172 // Write compressed output to buffer. 173 TF_RETURN_IF_ERROR(AddToOutputBuffer(output.data(), output.size())); 174 next_in_ += avail_in_; 175 avail_in_ = 0; 176 177 return Status::OK(); 178 } 179 180 } // namespace io 181 } // namespace tensorflow 182