// Protocol Buffers - Google's data interchange format
// Copyright 2008 Google Inc.  All rights reserved.
// http://code.google.com/p/protobuf/
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
//     * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following disclaimer
// in the documentation and/or other materials provided with the
// distribution.
//     * Neither the name of Google Inc. nor the names of its
// contributors may be used to endorse or promote products derived from
// this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Author: kenton (at) google.com (Kenton Varda)
//  Based on original Protocol Buffers design by
//  Sanjay Ghemawat, Jeff Dean, and others.
34 35 #ifdef _MSC_VER 36 #include <io.h> 37 #else 38 #include <unistd.h> 39 #include <sys/types.h> 40 #include <sys/stat.h> 41 #include <fcntl.h> 42 #endif 43 #include <errno.h> 44 #include <iostream> 45 #include <algorithm> 46 47 #include <google/protobuf/io/zero_copy_stream_impl.h> 48 #include <google/protobuf/stubs/common.h> 49 #include <google/protobuf/stubs/stl_util-inl.h> 50 51 namespace google { 52 namespace protobuf { 53 namespace io { 54 55 #ifdef _WIN32 56 // Win32 lseek is broken: If invoked on a non-seekable file descriptor, its 57 // return value is undefined. We re-define it to always produce an error. 58 #define lseek(fd, offset, origin) ((off_t)-1) 59 #endif 60 61 namespace { 62 63 // EINTR sucks. 64 int close_no_eintr(int fd) { 65 int result; 66 do { 67 result = close(fd); 68 } while (result < 0 && errno == EINTR); 69 return result; 70 } 71 72 } // namespace 73 74 75 // =================================================================== 76 77 FileInputStream::FileInputStream(int file_descriptor, int block_size) 78 : copying_input_(file_descriptor), 79 impl_(©ing_input_, block_size) { 80 } 81 82 FileInputStream::~FileInputStream() {} 83 84 bool FileInputStream::Close() { 85 return copying_input_.Close(); 86 } 87 88 bool FileInputStream::Next(const void** data, int* size) { 89 return impl_.Next(data, size); 90 } 91 92 void FileInputStream::BackUp(int count) { 93 impl_.BackUp(count); 94 } 95 96 bool FileInputStream::Skip(int count) { 97 return impl_.Skip(count); 98 } 99 100 int64 FileInputStream::ByteCount() const { 101 return impl_.ByteCount(); 102 } 103 104 FileInputStream::CopyingFileInputStream::CopyingFileInputStream( 105 int file_descriptor) 106 : file_(file_descriptor), 107 close_on_delete_(false), 108 is_closed_(false), 109 errno_(0), 110 previous_seek_failed_(false) { 111 } 112 113 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() { 114 if (close_on_delete_) { 115 if (!Close()) { 116 GOOGLE_LOG(ERROR) << "close() failed: " << 
strerror(errno_); 117 } 118 } 119 } 120 121 bool FileInputStream::CopyingFileInputStream::Close() { 122 GOOGLE_CHECK(!is_closed_); 123 124 is_closed_ = true; 125 if (close_no_eintr(file_) != 0) { 126 // The docs on close() do not specify whether a file descriptor is still 127 // open after close() fails with EIO. However, the glibc source code 128 // seems to indicate that it is not. 129 errno_ = errno; 130 return false; 131 } 132 133 return true; 134 } 135 136 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) { 137 GOOGLE_CHECK(!is_closed_); 138 139 int result; 140 do { 141 result = read(file_, buffer, size); 142 } while (result < 0 && errno == EINTR); 143 144 if (result < 0) { 145 // Read error (not EOF). 146 errno_ = errno; 147 } 148 149 return result; 150 } 151 152 int FileInputStream::CopyingFileInputStream::Skip(int count) { 153 GOOGLE_CHECK(!is_closed_); 154 155 if (!previous_seek_failed_ && 156 lseek(file_, count, SEEK_CUR) != (off_t)-1) { 157 // Seek succeeded. 158 return count; 159 } else { 160 // Failed to seek. 161 162 // Note to self: Don't seek again. This file descriptor doesn't 163 // support it. 164 previous_seek_failed_ = true; 165 166 // Use the default implementation. 
167 return CopyingInputStream::Skip(count); 168 } 169 } 170 171 // =================================================================== 172 173 FileOutputStream::FileOutputStream(int file_descriptor, int block_size) 174 : copying_output_(file_descriptor), 175 impl_(©ing_output_, block_size) { 176 } 177 178 FileOutputStream::~FileOutputStream() { 179 impl_.Flush(); 180 } 181 182 bool FileOutputStream::Close() { 183 bool flush_succeeded = impl_.Flush(); 184 return copying_output_.Close() && flush_succeeded; 185 } 186 187 bool FileOutputStream::Flush() { 188 return impl_.Flush(); 189 } 190 191 bool FileOutputStream::Next(void** data, int* size) { 192 return impl_.Next(data, size); 193 } 194 195 void FileOutputStream::BackUp(int count) { 196 impl_.BackUp(count); 197 } 198 199 int64 FileOutputStream::ByteCount() const { 200 return impl_.ByteCount(); 201 } 202 203 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream( 204 int file_descriptor) 205 : file_(file_descriptor), 206 close_on_delete_(false), 207 is_closed_(false), 208 errno_(0) { 209 } 210 211 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() { 212 if (close_on_delete_) { 213 if (!Close()) { 214 GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_); 215 } 216 } 217 } 218 219 bool FileOutputStream::CopyingFileOutputStream::Close() { 220 GOOGLE_CHECK(!is_closed_); 221 222 is_closed_ = true; 223 if (close_no_eintr(file_) != 0) { 224 // The docs on close() do not specify whether a file descriptor is still 225 // open after close() fails with EIO. However, the glibc source code 226 // seems to indicate that it is not. 
227 errno_ = errno; 228 return false; 229 } 230 231 return true; 232 } 233 234 bool FileOutputStream::CopyingFileOutputStream::Write( 235 const void* buffer, int size) { 236 GOOGLE_CHECK(!is_closed_); 237 int total_written = 0; 238 239 const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer); 240 241 while (total_written < size) { 242 int bytes; 243 do { 244 bytes = write(file_, buffer_base + total_written, size - total_written); 245 } while (bytes < 0 && errno == EINTR); 246 247 if (bytes <= 0) { 248 // Write error. 249 250 // FIXME(kenton): According to the man page, if write() returns zero, 251 // there was no error; write() simply did not write anything. It's 252 // unclear under what circumstances this might happen, but presumably 253 // errno won't be set in this case. I am confused as to how such an 254 // event should be handled. For now I'm treating it as an error, since 255 // retrying seems like it could lead to an infinite loop. I suspect 256 // this never actually happens anyway. 
257 258 if (bytes < 0) { 259 errno_ = errno; 260 } 261 return false; 262 } 263 total_written += bytes; 264 } 265 266 return true; 267 } 268 269 // =================================================================== 270 271 IstreamInputStream::IstreamInputStream(istream* input, int block_size) 272 : copying_input_(input), 273 impl_(©ing_input_, block_size) { 274 } 275 276 IstreamInputStream::~IstreamInputStream() {} 277 278 bool IstreamInputStream::Next(const void** data, int* size) { 279 return impl_.Next(data, size); 280 } 281 282 void IstreamInputStream::BackUp(int count) { 283 impl_.BackUp(count); 284 } 285 286 bool IstreamInputStream::Skip(int count) { 287 return impl_.Skip(count); 288 } 289 290 int64 IstreamInputStream::ByteCount() const { 291 return impl_.ByteCount(); 292 } 293 294 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream( 295 istream* input) 296 : input_(input) { 297 } 298 299 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {} 300 301 int IstreamInputStream::CopyingIstreamInputStream::Read( 302 void* buffer, int size) { 303 input_->read(reinterpret_cast<char*>(buffer), size); 304 int result = input_->gcount(); 305 if (result == 0 && input_->fail() && !input_->eof()) { 306 return -1; 307 } 308 return result; 309 } 310 311 // =================================================================== 312 313 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size) 314 : copying_output_(output), 315 impl_(©ing_output_, block_size) { 316 } 317 318 OstreamOutputStream::~OstreamOutputStream() { 319 impl_.Flush(); 320 } 321 322 bool OstreamOutputStream::Next(void** data, int* size) { 323 return impl_.Next(data, size); 324 } 325 326 void OstreamOutputStream::BackUp(int count) { 327 impl_.BackUp(count); 328 } 329 330 int64 OstreamOutputStream::ByteCount() const { 331 return impl_.ByteCount(); 332 } 333 334 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream( 335 ostream* 
output) 336 : output_(output) { 337 } 338 339 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() { 340 } 341 342 bool OstreamOutputStream::CopyingOstreamOutputStream::Write( 343 const void* buffer, int size) { 344 output_->write(reinterpret_cast<const char*>(buffer), size); 345 return output_->good(); 346 } 347 348 // =================================================================== 349 350 ConcatenatingInputStream::ConcatenatingInputStream( 351 ZeroCopyInputStream* const streams[], int count) 352 : streams_(streams), stream_count_(count), bytes_retired_(0) { 353 } 354 355 ConcatenatingInputStream::~ConcatenatingInputStream() { 356 } 357 358 bool ConcatenatingInputStream::Next(const void** data, int* size) { 359 while (stream_count_ > 0) { 360 if (streams_[0]->Next(data, size)) return true; 361 362 // That stream is done. Advance to the next one. 363 bytes_retired_ += streams_[0]->ByteCount(); 364 ++streams_; 365 --stream_count_; 366 } 367 368 // No more streams. 369 return false; 370 } 371 372 void ConcatenatingInputStream::BackUp(int count) { 373 if (stream_count_ > 0) { 374 streams_[0]->BackUp(count); 375 } else { 376 GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next()."; 377 } 378 } 379 380 bool ConcatenatingInputStream::Skip(int count) { 381 while (stream_count_ > 0) { 382 // Assume that ByteCount() can be used to find out how much we actually 383 // skipped when Skip() fails. 384 int64 target_byte_count = streams_[0]->ByteCount() + count; 385 if (streams_[0]->Skip(count)) return true; 386 387 // Hit the end of the stream. Figure out how many more bytes we still have 388 // to skip. 389 int64 final_byte_count = streams_[0]->ByteCount(); 390 GOOGLE_DCHECK_LT(final_byte_count, target_byte_count); 391 count = target_byte_count - final_byte_count; 392 393 // That stream is done. Advance to the next one. 
394 bytes_retired_ += final_byte_count; 395 ++streams_; 396 --stream_count_; 397 } 398 399 return false; 400 } 401 402 int64 ConcatenatingInputStream::ByteCount() const { 403 if (stream_count_ == 0) { 404 return bytes_retired_; 405 } else { 406 return bytes_retired_ + streams_[0]->ByteCount(); 407 } 408 } 409 410 411 // =================================================================== 412 413 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input, 414 int64 limit) 415 : input_(input), limit_(limit) {} 416 417 LimitingInputStream::~LimitingInputStream() { 418 // If we overshot the limit, back up. 419 if (limit_ < 0) input_->BackUp(-limit_); 420 } 421 422 bool LimitingInputStream::Next(const void** data, int* size) { 423 if (limit_ <= 0) return false; 424 if (!input_->Next(data, size)) return false; 425 426 limit_ -= *size; 427 if (limit_ < 0) { 428 // We overshot the limit. Reduce *size to hide the rest of the buffer. 429 *size += limit_; 430 } 431 return true; 432 } 433 434 void LimitingInputStream::BackUp(int count) { 435 if (limit_ < 0) { 436 input_->BackUp(count - limit_); 437 limit_ = count; 438 } else { 439 input_->BackUp(count); 440 limit_ += count; 441 } 442 } 443 444 bool LimitingInputStream::Skip(int count) { 445 if (count > limit_) { 446 if (limit_ < 0) return false; 447 input_->Skip(limit_); 448 limit_ = 0; 449 return false; 450 } else { 451 if (!input_->Skip(count)) return false; 452 limit_ -= count; 453 return true; 454 } 455 } 456 457 int64 LimitingInputStream::ByteCount() const { 458 if (limit_ < 0) { 459 return input_->ByteCount() + limit_; 460 } else { 461 return input_->ByteCount(); 462 } 463 } 464 465 466 // =================================================================== 467 468 } // namespace io 469 } // namespace protobuf 470 } // namespace google 471