Home | History | Annotate | Download | only in io
      1 // Protocol Buffers - Google's data interchange format
      2 // Copyright 2008 Google Inc.  All rights reserved.
      3 // https://developers.google.com/protocol-buffers/
      4 //
      5 // Redistribution and use in source and binary forms, with or without
      6 // modification, are permitted provided that the following conditions are
      7 // met:
      8 //
      9 //     * Redistributions of source code must retain the above copyright
     10 // notice, this list of conditions and the following disclaimer.
     11 //     * Redistributions in binary form must reproduce the above
     12 // copyright notice, this list of conditions and the following disclaimer
     13 // in the documentation and/or other materials provided with the
     14 // distribution.
     15 //     * Neither the name of Google Inc. nor the names of its
     16 // contributors may be used to endorse or promote products derived from
     17 // this software without specific prior written permission.
     18 //
     19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     30 
     31 // Author: kenton (at) google.com (Kenton Varda)
     32 //  Based on original Protocol Buffers design by
     33 //  Sanjay Ghemawat, Jeff Dean, and others.
     34 
     35 #ifdef _MSC_VER
     36 #include <io.h>
     37 #else
     38 #include <unistd.h>
     39 #include <sys/types.h>
     40 #include <sys/stat.h>
     41 #include <fcntl.h>
     42 #endif
     43 #include <errno.h>
     44 #include <iostream>
     45 #include <algorithm>
     46 
     47 #include <google/protobuf/io/zero_copy_stream_impl.h>
     48 #include <google/protobuf/stubs/common.h>
     49 #include <google/protobuf/stubs/logging.h>
     50 #include <google/protobuf/stubs/stl_util.h>
     51 
     52 
     53 namespace google {
     54 namespace protobuf {
     55 namespace io {
     56 
#ifdef _WIN32
// Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
// return value is undefined.  We re-define it to always produce an error.
//
// The replacement is a function-like macro that discards all three arguments
// and always evaluates to (off_t)-1.  As a consequence, on Windows
// CopyingFileInputStream::Skip() never seeks and always takes its fallback
// path.
#define lseek(fd, offset, origin) ((off_t)-1)
#endif
     62 
     63 namespace {
     64 
     65 // EINTR sucks.
     66 int close_no_eintr(int fd) {
     67   int result;
     68   do {
     69     result = close(fd);
     70   } while (result < 0 && errno == EINTR);
     71   return result;
     72 }
     73 
     74 }  // namespace
     75 
     76 
     77 // ===================================================================
     78 
     79 FileInputStream::FileInputStream(int file_descriptor, int block_size)
     80   : copying_input_(file_descriptor),
     81     impl_(&copying_input_, block_size) {
     82 }
     83 
     84 FileInputStream::~FileInputStream() {}
     85 
     86 bool FileInputStream::Close() {
     87   return copying_input_.Close();
     88 }
     89 
     90 bool FileInputStream::Next(const void** data, int* size) {
     91   return impl_.Next(data, size);
     92 }
     93 
     94 void FileInputStream::BackUp(int count) {
     95   impl_.BackUp(count);
     96 }
     97 
     98 bool FileInputStream::Skip(int count) {
     99   return impl_.Skip(count);
    100 }
    101 
    102 int64 FileInputStream::ByteCount() const {
    103   return impl_.ByteCount();
    104 }
    105 
    106 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
    107     int file_descriptor)
    108   : file_(file_descriptor),
    109     close_on_delete_(false),
    110     is_closed_(false),
    111     errno_(0),
    112     previous_seek_failed_(false) {
    113 }
    114 
    115 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
    116   if (close_on_delete_) {
    117     if (!Close()) {
    118       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
    119     }
    120   }
    121 }
    122 
    123 bool FileInputStream::CopyingFileInputStream::Close() {
    124   GOOGLE_CHECK(!is_closed_);
    125 
    126   is_closed_ = true;
    127   if (close_no_eintr(file_) != 0) {
    128     // The docs on close() do not specify whether a file descriptor is still
    129     // open after close() fails with EIO.  However, the glibc source code
    130     // seems to indicate that it is not.
    131     errno_ = errno;
    132     return false;
    133   }
    134 
    135   return true;
    136 }
    137 
    138 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
    139   GOOGLE_CHECK(!is_closed_);
    140 
    141   int result;
    142   do {
    143     result = read(file_, buffer, size);
    144   } while (result < 0 && errno == EINTR);
    145 
    146   if (result < 0) {
    147     // Read error (not EOF).
    148     errno_ = errno;
    149   }
    150 
    151   return result;
    152 }
    153 
    154 int FileInputStream::CopyingFileInputStream::Skip(int count) {
    155   GOOGLE_CHECK(!is_closed_);
    156 
    157   if (!previous_seek_failed_ &&
    158       lseek(file_, count, SEEK_CUR) != (off_t)-1) {
    159     // Seek succeeded.
    160     return count;
    161   } else {
    162     // Failed to seek.
    163 
    164     // Note to self:  Don't seek again.  This file descriptor doesn't
    165     // support it.
    166     previous_seek_failed_ = true;
    167 
    168     // Use the default implementation.
    169     return CopyingInputStream::Skip(count);
    170   }
    171 }
    172 
    173 // ===================================================================
    174 
    175 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
    176   : copying_output_(file_descriptor),
    177     impl_(&copying_output_, block_size) {
    178 }
    179 
    180 FileOutputStream::~FileOutputStream() {
    181   impl_.Flush();
    182 }
    183 
    184 bool FileOutputStream::Close() {
    185   bool flush_succeeded = impl_.Flush();
    186   return copying_output_.Close() && flush_succeeded;
    187 }
    188 
    189 bool FileOutputStream::Flush() {
    190   return impl_.Flush();
    191 }
    192 
    193 bool FileOutputStream::Next(void** data, int* size) {
    194   return impl_.Next(data, size);
    195 }
    196 
    197 void FileOutputStream::BackUp(int count) {
    198   impl_.BackUp(count);
    199 }
    200 
    201 int64 FileOutputStream::ByteCount() const {
    202   return impl_.ByteCount();
    203 }
    204 
    205 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
    206     int file_descriptor)
    207   : file_(file_descriptor),
    208     close_on_delete_(false),
    209     is_closed_(false),
    210     errno_(0) {
    211 }
    212 
    213 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
    214   if (close_on_delete_) {
    215     if (!Close()) {
    216       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
    217     }
    218   }
    219 }
    220 
    221 bool FileOutputStream::CopyingFileOutputStream::Close() {
    222   GOOGLE_CHECK(!is_closed_);
    223 
    224   is_closed_ = true;
    225   if (close_no_eintr(file_) != 0) {
    226     // The docs on close() do not specify whether a file descriptor is still
    227     // open after close() fails with EIO.  However, the glibc source code
    228     // seems to indicate that it is not.
    229     errno_ = errno;
    230     return false;
    231   }
    232 
    233   return true;
    234 }
    235 
    236 bool FileOutputStream::CopyingFileOutputStream::Write(
    237     const void* buffer, int size) {
    238   GOOGLE_CHECK(!is_closed_);
    239   int total_written = 0;
    240 
    241   const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
    242 
    243   while (total_written < size) {
    244     int bytes;
    245     do {
    246       bytes = write(file_, buffer_base + total_written, size - total_written);
    247     } while (bytes < 0 && errno == EINTR);
    248 
    249     if (bytes <= 0) {
    250       // Write error.
    251 
    252       // FIXME(kenton):  According to the man page, if write() returns zero,
    253       //   there was no error; write() simply did not write anything.  It's
    254       //   unclear under what circumstances this might happen, but presumably
    255       //   errno won't be set in this case.  I am confused as to how such an
    256       //   event should be handled.  For now I'm treating it as an error, since
    257       //   retrying seems like it could lead to an infinite loop.  I suspect
    258       //   this never actually happens anyway.
    259 
    260       if (bytes < 0) {
    261         errno_ = errno;
    262       }
    263       return false;
    264     }
    265     total_written += bytes;
    266   }
    267 
    268   return true;
    269 }
    270 
    271 // ===================================================================
    272 
    273 IstreamInputStream::IstreamInputStream(istream* input, int block_size)
    274   : copying_input_(input),
    275     impl_(&copying_input_, block_size) {
    276 }
    277 
    278 IstreamInputStream::~IstreamInputStream() {}
    279 
    280 bool IstreamInputStream::Next(const void** data, int* size) {
    281   return impl_.Next(data, size);
    282 }
    283 
    284 void IstreamInputStream::BackUp(int count) {
    285   impl_.BackUp(count);
    286 }
    287 
    288 bool IstreamInputStream::Skip(int count) {
    289   return impl_.Skip(count);
    290 }
    291 
    292 int64 IstreamInputStream::ByteCount() const {
    293   return impl_.ByteCount();
    294 }
    295 
    296 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
    297     istream* input)
    298   : input_(input) {
    299 }
    300 
    301 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
    302 
    303 int IstreamInputStream::CopyingIstreamInputStream::Read(
    304     void* buffer, int size) {
    305   input_->read(reinterpret_cast<char*>(buffer), size);
    306   int result = input_->gcount();
    307   if (result == 0 && input_->fail() && !input_->eof()) {
    308     return -1;
    309   }
    310   return result;
    311 }
    312 
    313 // ===================================================================
    314 
    315 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
    316   : copying_output_(output),
    317     impl_(&copying_output_, block_size) {
    318 }
    319 
    320 OstreamOutputStream::~OstreamOutputStream() {
    321   impl_.Flush();
    322 }
    323 
    324 bool OstreamOutputStream::Next(void** data, int* size) {
    325   return impl_.Next(data, size);
    326 }
    327 
    328 void OstreamOutputStream::BackUp(int count) {
    329   impl_.BackUp(count);
    330 }
    331 
    332 int64 OstreamOutputStream::ByteCount() const {
    333   return impl_.ByteCount();
    334 }
    335 
    336 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
    337     ostream* output)
    338   : output_(output) {
    339 }
    340 
    341 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
    342 }
    343 
    344 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
    345     const void* buffer, int size) {
    346   output_->write(reinterpret_cast<const char*>(buffer), size);
    347   return output_->good();
    348 }
    349 
    350 // ===================================================================
    351 
    352 ConcatenatingInputStream::ConcatenatingInputStream(
    353     ZeroCopyInputStream* const streams[], int count)
    354   : streams_(streams), stream_count_(count), bytes_retired_(0) {
    355 }
    356 
    357 ConcatenatingInputStream::~ConcatenatingInputStream() {
    358 }
    359 
    360 bool ConcatenatingInputStream::Next(const void** data, int* size) {
    361   while (stream_count_ > 0) {
    362     if (streams_[0]->Next(data, size)) return true;
    363 
    364     // That stream is done.  Advance to the next one.
    365     bytes_retired_ += streams_[0]->ByteCount();
    366     ++streams_;
    367     --stream_count_;
    368   }
    369 
    370   // No more streams.
    371   return false;
    372 }
    373 
    374 void ConcatenatingInputStream::BackUp(int count) {
    375   if (stream_count_ > 0) {
    376     streams_[0]->BackUp(count);
    377   } else {
    378     GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
    379   }
    380 }
    381 
    382 bool ConcatenatingInputStream::Skip(int count) {
    383   while (stream_count_ > 0) {
    384     // Assume that ByteCount() can be used to find out how much we actually
    385     // skipped when Skip() fails.
    386     int64 target_byte_count = streams_[0]->ByteCount() + count;
    387     if (streams_[0]->Skip(count)) return true;
    388 
    389     // Hit the end of the stream.  Figure out how many more bytes we still have
    390     // to skip.
    391     int64 final_byte_count = streams_[0]->ByteCount();
    392     GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
    393     count = target_byte_count - final_byte_count;
    394 
    395     // That stream is done.  Advance to the next one.
    396     bytes_retired_ += final_byte_count;
    397     ++streams_;
    398     --stream_count_;
    399   }
    400 
    401   return false;
    402 }
    403 
    404 int64 ConcatenatingInputStream::ByteCount() const {
    405   if (stream_count_ == 0) {
    406     return bytes_retired_;
    407   } else {
    408     return bytes_retired_ + streams_[0]->ByteCount();
    409   }
    410 }
    411 
    412 
    413 // ===================================================================
    414 
    415 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
    416                                          int64 limit)
    417   : input_(input), limit_(limit) {
    418   prior_bytes_read_ = input_->ByteCount();
    419 }
    420 
    421 LimitingInputStream::~LimitingInputStream() {
    422   // If we overshot the limit, back up.
    423   if (limit_ < 0) input_->BackUp(-limit_);
    424 }
    425 
    426 bool LimitingInputStream::Next(const void** data, int* size) {
    427   if (limit_ <= 0) return false;
    428   if (!input_->Next(data, size)) return false;
    429 
    430   limit_ -= *size;
    431   if (limit_ < 0) {
    432     // We overshot the limit.  Reduce *size to hide the rest of the buffer.
    433     *size += limit_;
    434   }
    435   return true;
    436 }
    437 
    438 void LimitingInputStream::BackUp(int count) {
    439   if (limit_ < 0) {
    440     input_->BackUp(count - limit_);
    441     limit_ = count;
    442   } else {
    443     input_->BackUp(count);
    444     limit_ += count;
    445   }
    446 }
    447 
    448 bool LimitingInputStream::Skip(int count) {
    449   if (count > limit_) {
    450     if (limit_ < 0) return false;
    451     input_->Skip(limit_);
    452     limit_ = 0;
    453     return false;
    454   } else {
    455     if (!input_->Skip(count)) return false;
    456     limit_ -= count;
    457     return true;
    458   }
    459 }
    460 
    461 int64 LimitingInputStream::ByteCount() const {
    462   if (limit_ < 0) {
    463     return input_->ByteCount() + limit_ - prior_bytes_read_;
    464   } else {
    465     return input_->ByteCount() - prior_bytes_read_;
    466   }
    467 }
    468 
    469 
    470 // ===================================================================
    471 
    472 }  // namespace io
    473 }  // namespace protobuf
    474 }  // namespace google
    475