// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc. All rights reserved.
// https://developers.google.com/protocol-buffers/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Author: kenton (at) google.com (Kenton Varda)
//  Based on original Protocol Buffers design by
//  Sanjay Ghemawat, Jeff Dean, and others.
34 35 #ifdef _MSC_VER 36 #include <io.h> 37 #else 38 #include <unistd.h> 39 #include <sys/types.h> 40 #include <sys/stat.h> 41 #include <fcntl.h> 42 #endif 43 #include <errno.h> 44 #include <iostream> 45 #include <algorithm> 46 47 #include <google/protobuf/io/zero_copy_stream_impl.h> 48 #include <google/protobuf/stubs/common.h> 49 #include <google/protobuf/stubs/logging.h> 50 #include <google/protobuf/stubs/stl_util.h> 51 52 53 namespace google { 54 namespace protobuf { 55 namespace io { 56 57 #ifdef _WIN32 58 // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its 59 // return value is undefined. We re-define it to always produce an error. 60 #define lseek(fd, offset, origin) ((off_t)-1) 61 #endif 62 63 namespace { 64 65 // EINTR sucks. 66 int close_no_eintr(int fd) { 67 int result; 68 do { 69 result = close(fd); 70 } while (result < 0 && errno == EINTR); 71 return result; 72 } 73 74 } // namespace 75 76 77 // =================================================================== 78 79 FileInputStream::FileInputStream(int file_descriptor, int block_size) 80 : copying_input_(file_descriptor), 81 impl_(©ing_input_, block_size) { 82 } 83 84 FileInputStream::~FileInputStream() {} 85 86 bool FileInputStream::Close() { 87 return copying_input_.Close(); 88 } 89 90 bool FileInputStream::Next(const void** data, int* size) { 91 return impl_.Next(data, size); 92 } 93 94 void FileInputStream::BackUp(int count) { 95 impl_.BackUp(count); 96 } 97 98 bool FileInputStream::Skip(int count) { 99 return impl_.Skip(count); 100 } 101 102 int64 FileInputStream::ByteCount() const { 103 return impl_.ByteCount(); 104 } 105 106 FileInputStream::CopyingFileInputStream::CopyingFileInputStream( 107 int file_descriptor) 108 : file_(file_descriptor), 109 close_on_delete_(false), 110 is_closed_(false), 111 errno_(0), 112 previous_seek_failed_(false) { 113 } 114 115 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() { 116 if (close_on_delete_) { 117 if (!Close()) { 
118 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 119 } 120 } 121 } 122 123 bool FileInputStream::CopyingFileInputStream::Close() { 124 GOOGLE_CHECK(!is_closed_); 125 126 is_closed_ = true; 127 if (close_no_eintr(file_) != 0) { 128 // The docs on close() do not specify whether a file descriptor is still 129 // open after close() fails with EIO. However, the glibc source code 130 // seems to indicate that it is not. 131 errno_ = errno; 132 return false; 133 } 134 135 return true; 136 } 137 138 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) { 139 GOOGLE_CHECK(!is_closed_); 140 141 int result; 142 do { 143 result = read(file_, buffer, size); 144 } while (result < 0 && errno == EINTR); 145 146 if (result < 0) { 147 // Read error (not EOF). 148 errno_ = errno; 149 } 150 151 return result; 152 } 153 154 int FileInputStream::CopyingFileInputStream::Skip(int count) { 155 GOOGLE_CHECK(!is_closed_); 156 157 if (!previous_seek_failed_ && 158 lseek(file_, count, SEEK_CUR) != (off_t)-1) { 159 // Seek succeeded. 160 return count; 161 } else { 162 // Failed to seek. 163 164 // Note to self: Don't seek again. This file descriptor doesn't 165 // support it. 166 previous_seek_failed_ = true; 167 168 // Use the default implementation. 
169 return CopyingInputStream::Skip(count); 170 } 171 } 172 173 // =================================================================== 174 175 FileOutputStream::FileOutputStream(int file_descriptor, int block_size) 176 : copying_output_(file_descriptor), 177 impl_(©ing_output_, block_size) { 178 } 179 180 FileOutputStream::~FileOutputStream() { 181 impl_.Flush(); 182 } 183 184 bool FileOutputStream::Close() { 185 bool flush_succeeded = impl_.Flush(); 186 return copying_output_.Close() && flush_succeeded; 187 } 188 189 bool FileOutputStream::Flush() { 190 return impl_.Flush(); 191 } 192 193 bool FileOutputStream::Next(void** data, int* size) { 194 return impl_.Next(data, size); 195 } 196 197 void FileOutputStream::BackUp(int count) { 198 impl_.BackUp(count); 199 } 200 201 int64 FileOutputStream::ByteCount() const { 202 return impl_.ByteCount(); 203 } 204 205 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream( 206 int file_descriptor) 207 : file_(file_descriptor), 208 close_on_delete_(false), 209 is_closed_(false), 210 errno_(0) { 211 } 212 213 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() { 214 if (close_on_delete_) { 215 if (!Close()) { 216 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 217 } 218 } 219 } 220 221 bool FileOutputStream::CopyingFileOutputStream::Close() { 222 GOOGLE_CHECK(!is_closed_); 223 224 is_closed_ = true; 225 if (close_no_eintr(file_) != 0) { 226 // The docs on close() do not specify whether a file descriptor is still 227 // open after close() fails with EIO. However, the glibc source code 228 // seems to indicate that it is not. 
229 errno_ = errno; 230 return false; 231 } 232 233 return true; 234 } 235 236 bool FileOutputStream::CopyingFileOutputStream::Write( 237 const void* buffer, int size) { 238 GOOGLE_CHECK(!is_closed_); 239 int total_written = 0; 240 241 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer); 242 243 while (total_written < size) { 244 int bytes; 245 do { 246 bytes = write(file_, buffer_base + total_written, size - total_written); 247 } while (bytes < 0 && errno == EINTR); 248 249 if (bytes <= 0) { 250 // Write error. 251 252 // FIXME(kenton): According to the man page, if write() returns zero, 253 // there was no error; write() simply did not write anything. It's 254 // unclear under what circumstances this might happen, but presumably 255 // errno won't be set in this case. I am confused as to how such an 256 // event should be handled. For now I'm treating it as an error, since 257 // retrying seems like it could lead to an infinite loop. I suspect 258 // this never actually happens anyway. 
259 260 if (bytes < 0) { 261 errno_ = errno; 262 } 263 return false; 264 } 265 total_written += bytes; 266 } 267 268 return true; 269 } 270 271 // =================================================================== 272 273 IstreamInputStream::IstreamInputStream(istream* input, int block_size) 274 : copying_input_(input), 275 impl_(©ing_input_, block_size) { 276 } 277 278 IstreamInputStream::~IstreamInputStream() {} 279 280 bool IstreamInputStream::Next(const void** data, int* size) { 281 return impl_.Next(data, size); 282 } 283 284 void IstreamInputStream::BackUp(int count) { 285 impl_.BackUp(count); 286 } 287 288 bool IstreamInputStream::Skip(int count) { 289 return impl_.Skip(count); 290 } 291 292 int64 IstreamInputStream::ByteCount() const { 293 return impl_.ByteCount(); 294 } 295 296 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream( 297 istream* input) 298 : input_(input) { 299 } 300 301 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {} 302 303 int IstreamInputStream::CopyingIstreamInputStream::Read( 304 void* buffer, int size) { 305 input_->read(reinterpret_cast<char*>(buffer), size); 306 int result = input_->gcount(); 307 if (result == 0 && input_->fail() && !input_->eof()) { 308 return -1; 309 } 310 return result; 311 } 312 313 // =================================================================== 314 315 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size) 316 : copying_output_(output), 317 impl_(©ing_output_, block_size) { 318 } 319 320 OstreamOutputStream::~OstreamOutputStream() { 321 impl_.Flush(); 322 } 323 324 bool OstreamOutputStream::Next(void** data, int* size) { 325 return impl_.Next(data, size); 326 } 327 328 void OstreamOutputStream::BackUp(int count) { 329 impl_.BackUp(count); 330 } 331 332 int64 OstreamOutputStream::ByteCount() const { 333 return impl_.ByteCount(); 334 } 335 336 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream( 337 ostream* 
output) 338 : output_(output) { 339 } 340 341 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() { 342 } 343 344 bool OstreamOutputStream::CopyingOstreamOutputStream::Write( 345 const void* buffer, int size) { 346 output_->write(reinterpret_cast<const char*>(buffer), size); 347 return output_->good(); 348 } 349 350 // =================================================================== 351 352 ConcatenatingInputStream::ConcatenatingInputStream( 353 ZeroCopyInputStream* const streams[], int count) 354 : streams_(streams), stream_count_(count), bytes_retired_(0) { 355 } 356 357 ConcatenatingInputStream::~ConcatenatingInputStream() { 358 } 359 360 bool ConcatenatingInputStream::Next(const void** data, int* size) { 361 while (stream_count_ > 0) { 362 if (streams_[0]->Next(data, size)) return true; 363 364 // That stream is done. Advance to the next one. 365 bytes_retired_ += streams_[0]->ByteCount(); 366 ++streams_; 367 --stream_count_; 368 } 369 370 // No more streams. 371 return false; 372 } 373 374 void ConcatenatingInputStream::BackUp(int count) { 375 if (stream_count_ > 0) { 376 streams_[0]->BackUp(count); 377 } else { 378 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next()."; 379 } 380 } 381 382 bool ConcatenatingInputStream::Skip(int count) { 383 while (stream_count_ > 0) { 384 // Assume that ByteCount() can be used to find out how much we actually 385 // skipped when Skip() fails. 386 int64 target_byte_count = streams_[0]->ByteCount() + count; 387 if (streams_[0]->Skip(count)) return true; 388 389 // Hit the end of the stream. Figure out how many more bytes we still have 390 // to skip. 391 int64 final_byte_count = streams_[0]->ByteCount(); 392 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count); 393 count = target_byte_count - final_byte_count; 394 395 // That stream is done. Advance to the next one. 
396 bytes_retired_ += final_byte_count; 397 ++streams_; 398 --stream_count_; 399 } 400 401 return false; 402 } 403 404 int64 ConcatenatingInputStream::ByteCount() const { 405 if (stream_count_ == 0) { 406 return bytes_retired_; 407 } else { 408 return bytes_retired_ + streams_[0]->ByteCount(); 409 } 410 } 411 412 413 // =================================================================== 414 415 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input, 416 int64 limit) 417 : input_(input), limit_(limit) { 418 prior_bytes_read_ = input_->ByteCount(); 419 } 420 421 LimitingInputStream::~LimitingInputStream() { 422 // If we overshot the limit, back up. 423 if (limit_ < 0) input_->BackUp(-limit_); 424 } 425 426 bool LimitingInputStream::Next(const void** data, int* size) { 427 if (limit_ <= 0) return false; 428 if (!input_->Next(data, size)) return false; 429 430 limit_ -= *size; 431 if (limit_ < 0) { 432 // We overshot the limit. Reduce *size to hide the rest of the buffer. 433 *size += limit_; 434 } 435 return true; 436 } 437 438 void LimitingInputStream::BackUp(int count) { 439 if (limit_ < 0) { 440 input_->BackUp(count - limit_); 441 limit_ = count; 442 } else { 443 input_->BackUp(count); 444 limit_ += count; 445 } 446 } 447 448 bool LimitingInputStream::Skip(int count) { 449 if (count > limit_) { 450 if (limit_ < 0) return false; 451 input_->Skip(limit_); 452 limit_ = 0; 453 return false; 454 } else { 455 if (!input_->Skip(count)) return false; 456 limit_ -= count; 457 return true; 458 } 459 } 460 461 int64 LimitingInputStream::ByteCount() const { 462 if (limit_ < 0) { 463 return input_->ByteCount() + limit_ - prior_bytes_read_; 464 } else { 465 return input_->ByteCount() - prior_bytes_read_; 466 } 467 } 468 469 470 // =================================================================== 471 472 } // namespace io 473 } // namespace protobuf 474 } // namespace google 475