1 /* Copyright 2015 The TensorFlow Authors. All Rights Reserved. 2 3 Licensed under the Apache License, Version 2.0 (the "License"); 4 you may not use this file except in compliance with the License. 5 You may obtain a copy of the License at 6 7 http://www.apache.org/licenses/LICENSE-2.0 8 9 Unless required by applicable law or agreed to in writing, software 10 distributed under the License is distributed on an "AS IS" BASIS, 11 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 See the License for the specific language governing permissions and 13 limitations under the License. 14 ==============================================================================*/ 15 16 #include "tensorflow/core/lib/io/record_writer.h" 17 18 #include "tensorflow/core/lib/core/coding.h" 19 #include "tensorflow/core/lib/hash/crc32c.h" 20 #include "tensorflow/core/lib/io/compression.h" 21 #include "tensorflow/core/platform/env.h" 22 23 namespace tensorflow { 24 namespace io { 25 namespace { 26 bool IsZlibCompressed(RecordWriterOptions options) { 27 return options.compression_type == RecordWriterOptions::ZLIB_COMPRESSION; 28 } 29 } // namespace 30 31 RecordWriterOptions RecordWriterOptions::CreateRecordWriterOptions( 32 const string& compression_type) { 33 RecordWriterOptions options; 34 if (compression_type == "ZLIB") { 35 options.compression_type = io::RecordWriterOptions::ZLIB_COMPRESSION; 36 #if defined(IS_SLIM_BUILD) 37 LOG(ERROR) << "Compression is not supported but compression_type is set." 38 << " No compression will be used."; 39 #else 40 options.zlib_options = io::ZlibCompressionOptions::DEFAULT(); 41 #endif // IS_SLIM_BUILD 42 } else if (compression_type == compression::kGzip) { 43 options.compression_type = io::RecordWriterOptions::ZLIB_COMPRESSION; 44 #if defined(IS_SLIM_BUILD) 45 LOG(ERROR) << "Compression is not supported but compression_type is set." 46 << " No compression will be used."; 47 #else 48 options.zlib_options = io::ZlibCompressionOptions::GZIP(); 49 #endif // IS_SLIM_BUILD 50 } else if (compression_type != compression::kNone) { 51 LOG(ERROR) << "Unsupported compression_type:" << compression_type 52 << ". No compression will be used."; 53 } 54 return options; 55 } 56 57 RecordWriter::RecordWriter(WritableFile* dest, 58 const RecordWriterOptions& options) 59 : dest_(dest), options_(options) { 60 if (IsZlibCompressed(options)) { 61 // We don't have zlib available on all embedded platforms, so fail. 62 #if defined(IS_SLIM_BUILD) 63 LOG(FATAL) << "Zlib compression is unsupported on mobile platforms."; 64 #else // IS_SLIM_BUILD 65 ZlibOutputBuffer* zlib_output_buffer = new ZlibOutputBuffer( 66 dest, options.zlib_options.input_buffer_size, 67 options.zlib_options.output_buffer_size, options.zlib_options); 68 Status s = zlib_output_buffer->Init(); 69 if (!s.ok()) { 70 LOG(FATAL) << "Failed to initialize Zlib inputbuffer. Error: " 71 << s.ToString(); 72 } 73 dest_ = zlib_output_buffer; 74 #endif // IS_SLIM_BUILD 75 } else if (options.compression_type == RecordWriterOptions::NONE) { 76 // Nothing to do 77 } else { 78 LOG(FATAL) << "Unspecified compression type :" << options.compression_type; 79 } 80 } 81 82 RecordWriter::~RecordWriter() { 83 if (dest_ != nullptr) { 84 Status s = Close(); 85 if (!s.ok()) { 86 LOG(ERROR) << "Could not finish writing file: " << s; 87 } 88 } 89 } 90 91 static uint32 MaskedCrc(const char* data, size_t n) { 92 return crc32c::Mask(crc32c::Value(data, n)); 93 } 94 95 Status RecordWriter::WriteRecord(StringPiece data) { 96 // Format of a single record: 97 // uint64 length 98 // uint32 masked crc of length 99 // byte data[length] 100 // uint32 masked crc of data 101 char header[sizeof(uint64) + sizeof(uint32)]; 102 core::EncodeFixed64(header + 0, data.size()); 103 core::EncodeFixed32(header + sizeof(uint64), 104 MaskedCrc(header, sizeof(uint64))); 105 char footer[sizeof(uint32)]; 106 core::EncodeFixed32(footer, MaskedCrc(data.data(), data.size())); 107 108 TF_RETURN_IF_ERROR(dest_->Append(StringPiece(header, sizeof(header)))); 109 TF_RETURN_IF_ERROR(dest_->Append(data)); 110 return dest_->Append(StringPiece(footer, sizeof(footer))); 111 } 112 113 Status RecordWriter::Close() { 114 #if !defined(IS_SLIM_BUILD) 115 if (IsZlibCompressed(options_)) { 116 Status s = dest_->Close(); 117 delete dest_; 118 dest_ = nullptr; 119 return s; 120 } 121 #endif // IS_SLIM_BUILD 122 return Status::OK(); 123 } 124 125 Status RecordWriter::Flush() { 126 if (IsZlibCompressed(options_)) { 127 return dest_->Flush(); 128 } 129 return Status::OK(); 130 } 131 132 } // namespace io 133 } // namespace tensorflow 134