Home | History | Annotate | Download | only in io
      1 // Protocol Buffers - Google's data interchange format
      2 // Copyright 2008 Google Inc.  All rights reserved.
      3 // http://code.google.com/p/protobuf/
      4 //
      5 // Redistribution and use in source and binary forms, with or without
      6 // modification, are permitted provided that the following conditions are
      7 // met:
      8 //
      9 //     * Redistributions of source code must retain the above copyright
     10 // notice, this list of conditions and the following disclaimer.
     11 //     * Redistributions in binary form must reproduce the above
     12 // copyright notice, this list of conditions and the following disclaimer
     13 // in the documentation and/or other materials provided with the
     14 // distribution.
     15 //     * Neither the name of Google Inc. nor the names of its
     16 // contributors may be used to endorse or promote products derived from
     17 // this software without specific prior written permission.
     18 //
     19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     20 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     21 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     22 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     23 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     24 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     25 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     26 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     27 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     28 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     29 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     30 
     31 // Author: kenton (at) google.com (Kenton Varda)
     32 //  Based on original Protocol Buffers design by
     33 //  Sanjay Ghemawat, Jeff Dean, and others.
     34 
     35 #ifdef _MSC_VER
     36 #include <io.h>
     37 #else
     38 #include <unistd.h>
     39 #include <sys/types.h>
     40 #include <sys/stat.h>
     41 #include <fcntl.h>
     42 #endif
     43 #include <errno.h>
     44 #include <iostream>
     45 #include <algorithm>
     46 
     47 #include <google/protobuf/io/zero_copy_stream_impl.h>
     48 #include <google/protobuf/stubs/common.h>
     49 #include <google/protobuf/stubs/stl_util-inl.h>
     50 
     51 namespace google {
     52 namespace protobuf {
     53 namespace io {
     54 
     55 #ifdef _WIN32
     56 // Win32 lseek is broken:  If invoked on a non-seekable file descriptor, its
     57 // return value is undefined.  We re-define it to always produce an error.
     58 #define lseek(fd, offset, origin) ((off_t)-1)
     59 #endif
     60 
     61 namespace {
     62 
     63 // EINTR sucks.
     64 int close_no_eintr(int fd) {
     65   int result;
     66   do {
     67     result = close(fd);
     68   } while (result < 0 && errno == EINTR);
     69   return result;
     70 }
     71 
     72 }  // namespace
     73 
     74 
     75 // ===================================================================
     76 
     77 FileInputStream::FileInputStream(int file_descriptor, int block_size)
     78   : copying_input_(file_descriptor),
     79     impl_(&copying_input_, block_size) {
     80 }
     81 
     82 FileInputStream::~FileInputStream() {}
     83 
     84 bool FileInputStream::Close() {
     85   return copying_input_.Close();
     86 }
     87 
     88 bool FileInputStream::Next(const void** data, int* size) {
     89   return impl_.Next(data, size);
     90 }
     91 
     92 void FileInputStream::BackUp(int count) {
     93   impl_.BackUp(count);
     94 }
     95 
     96 bool FileInputStream::Skip(int count) {
     97   return impl_.Skip(count);
     98 }
     99 
    100 int64 FileInputStream::ByteCount() const {
    101   return impl_.ByteCount();
    102 }
    103 
    104 FileInputStream::CopyingFileInputStream::CopyingFileInputStream(
    105     int file_descriptor)
    106   : file_(file_descriptor),
    107     close_on_delete_(false),
    108     is_closed_(false),
    109     errno_(0),
    110     previous_seek_failed_(false) {
    111 }
    112 
    113 FileInputStream::CopyingFileInputStream::~CopyingFileInputStream() {
    114   if (close_on_delete_) {
    115     if (!Close()) {
    116       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
    117     }
    118   }
    119 }
    120 
    121 bool FileInputStream::CopyingFileInputStream::Close() {
    122   GOOGLE_CHECK(!is_closed_);
    123 
    124   is_closed_ = true;
    125   if (close_no_eintr(file_) != 0) {
    126     // The docs on close() do not specify whether a file descriptor is still
    127     // open after close() fails with EIO.  However, the glibc source code
    128     // seems to indicate that it is not.
    129     errno_ = errno;
    130     return false;
    131   }
    132 
    133   return true;
    134 }
    135 
    136 int FileInputStream::CopyingFileInputStream::Read(void* buffer, int size) {
    137   GOOGLE_CHECK(!is_closed_);
    138 
    139   int result;
    140   do {
    141     result = read(file_, buffer, size);
    142   } while (result < 0 && errno == EINTR);
    143 
    144   if (result < 0) {
    145     // Read error (not EOF).
    146     errno_ = errno;
    147   }
    148 
    149   return result;
    150 }
    151 
    152 int FileInputStream::CopyingFileInputStream::Skip(int count) {
    153   GOOGLE_CHECK(!is_closed_);
    154 
    155   if (!previous_seek_failed_ &&
    156       lseek(file_, count, SEEK_CUR) != (off_t)-1) {
    157     // Seek succeeded.
    158     return count;
    159   } else {
    160     // Failed to seek.
    161 
    162     // Note to self:  Don't seek again.  This file descriptor doesn't
    163     // support it.
    164     previous_seek_failed_ = true;
    165 
    166     // Use the default implementation.
    167     return CopyingInputStream::Skip(count);
    168   }
    169 }
    170 
    171 // ===================================================================
    172 
    173 FileOutputStream::FileOutputStream(int file_descriptor, int block_size)
    174   : copying_output_(file_descriptor),
    175     impl_(&copying_output_, block_size) {
    176 }
    177 
    178 FileOutputStream::~FileOutputStream() {
    179   impl_.Flush();
    180 }
    181 
    182 bool FileOutputStream::Close() {
    183   bool flush_succeeded = impl_.Flush();
    184   return copying_output_.Close() && flush_succeeded;
    185 }
    186 
    187 bool FileOutputStream::Flush() {
    188   return impl_.Flush();
    189 }
    190 
    191 bool FileOutputStream::Next(void** data, int* size) {
    192   return impl_.Next(data, size);
    193 }
    194 
    195 void FileOutputStream::BackUp(int count) {
    196   impl_.BackUp(count);
    197 }
    198 
    199 int64 FileOutputStream::ByteCount() const {
    200   return impl_.ByteCount();
    201 }
    202 
    203 FileOutputStream::CopyingFileOutputStream::CopyingFileOutputStream(
    204     int file_descriptor)
    205   : file_(file_descriptor),
    206     close_on_delete_(false),
    207     is_closed_(false),
    208     errno_(0) {
    209 }
    210 
    211 FileOutputStream::CopyingFileOutputStream::~CopyingFileOutputStream() {
    212   if (close_on_delete_) {
    213     if (!Close()) {
    214       GOOGLE_LOG(ERROR) << "close() failed: " << strerror(errno_);
    215     }
    216   }
    217 }
    218 
    219 bool FileOutputStream::CopyingFileOutputStream::Close() {
    220   GOOGLE_CHECK(!is_closed_);
    221 
    222   is_closed_ = true;
    223   if (close_no_eintr(file_) != 0) {
    224     // The docs on close() do not specify whether a file descriptor is still
    225     // open after close() fails with EIO.  However, the glibc source code
    226     // seems to indicate that it is not.
    227     errno_ = errno;
    228     return false;
    229   }
    230 
    231   return true;
    232 }
    233 
    234 bool FileOutputStream::CopyingFileOutputStream::Write(
    235     const void* buffer, int size) {
    236   GOOGLE_CHECK(!is_closed_);
    237   int total_written = 0;
    238 
    239   const uint8* buffer_base = reinterpret_cast<const uint8*>(buffer);
    240 
    241   while (total_written < size) {
    242     int bytes;
    243     do {
    244       bytes = write(file_, buffer_base + total_written, size - total_written);
    245     } while (bytes < 0 && errno == EINTR);
    246 
    247     if (bytes <= 0) {
    248       // Write error.
    249 
    250       // FIXME(kenton):  According to the man page, if write() returns zero,
    251       //   there was no error; write() simply did not write anything.  It's
    252       //   unclear under what circumstances this might happen, but presumably
    253       //   errno won't be set in this case.  I am confused as to how such an
    254       //   event should be handled.  For now I'm treating it as an error, since
    255       //   retrying seems like it could lead to an infinite loop.  I suspect
    256       //   this never actually happens anyway.
    257 
    258       if (bytes < 0) {
    259         errno_ = errno;
    260       }
    261       return false;
    262     }
    263     total_written += bytes;
    264   }
    265 
    266   return true;
    267 }
    268 
    269 // ===================================================================
    270 
    271 IstreamInputStream::IstreamInputStream(istream* input, int block_size)
    272   : copying_input_(input),
    273     impl_(&copying_input_, block_size) {
    274 }
    275 
    276 IstreamInputStream::~IstreamInputStream() {}
    277 
    278 bool IstreamInputStream::Next(const void** data, int* size) {
    279   return impl_.Next(data, size);
    280 }
    281 
    282 void IstreamInputStream::BackUp(int count) {
    283   impl_.BackUp(count);
    284 }
    285 
    286 bool IstreamInputStream::Skip(int count) {
    287   return impl_.Skip(count);
    288 }
    289 
    290 int64 IstreamInputStream::ByteCount() const {
    291   return impl_.ByteCount();
    292 }
    293 
    294 IstreamInputStream::CopyingIstreamInputStream::CopyingIstreamInputStream(
    295     istream* input)
    296   : input_(input) {
    297 }
    298 
    299 IstreamInputStream::CopyingIstreamInputStream::~CopyingIstreamInputStream() {}
    300 
    301 int IstreamInputStream::CopyingIstreamInputStream::Read(
    302     void* buffer, int size) {
    303   input_->read(reinterpret_cast<char*>(buffer), size);
    304   int result = input_->gcount();
    305   if (result == 0 && input_->fail() && !input_->eof()) {
    306     return -1;
    307   }
    308   return result;
    309 }
    310 
    311 // ===================================================================
    312 
    313 OstreamOutputStream::OstreamOutputStream(ostream* output, int block_size)
    314   : copying_output_(output),
    315     impl_(&copying_output_, block_size) {
    316 }
    317 
    318 OstreamOutputStream::~OstreamOutputStream() {
    319   impl_.Flush();
    320 }
    321 
    322 bool OstreamOutputStream::Next(void** data, int* size) {
    323   return impl_.Next(data, size);
    324 }
    325 
    326 void OstreamOutputStream::BackUp(int count) {
    327   impl_.BackUp(count);
    328 }
    329 
    330 int64 OstreamOutputStream::ByteCount() const {
    331   return impl_.ByteCount();
    332 }
    333 
    334 OstreamOutputStream::CopyingOstreamOutputStream::CopyingOstreamOutputStream(
    335     ostream* output)
    336   : output_(output) {
    337 }
    338 
    339 OstreamOutputStream::CopyingOstreamOutputStream::~CopyingOstreamOutputStream() {
    340 }
    341 
    342 bool OstreamOutputStream::CopyingOstreamOutputStream::Write(
    343     const void* buffer, int size) {
    344   output_->write(reinterpret_cast<const char*>(buffer), size);
    345   return output_->good();
    346 }
    347 
    348 // ===================================================================
    349 
    350 ConcatenatingInputStream::ConcatenatingInputStream(
    351     ZeroCopyInputStream* const streams[], int count)
    352   : streams_(streams), stream_count_(count), bytes_retired_(0) {
    353 }
    354 
    355 ConcatenatingInputStream::~ConcatenatingInputStream() {
    356 }
    357 
    358 bool ConcatenatingInputStream::Next(const void** data, int* size) {
    359   while (stream_count_ > 0) {
    360     if (streams_[0]->Next(data, size)) return true;
    361 
    362     // That stream is done.  Advance to the next one.
    363     bytes_retired_ += streams_[0]->ByteCount();
    364     ++streams_;
    365     --stream_count_;
    366   }
    367 
    368   // No more streams.
    369   return false;
    370 }
    371 
    372 void ConcatenatingInputStream::BackUp(int count) {
    373   if (stream_count_ > 0) {
    374     streams_[0]->BackUp(count);
    375   } else {
    376     GOOGLE_LOG(DFATAL) << "Can't BackUp() after failed Next().";
    377   }
    378 }
    379 
    380 bool ConcatenatingInputStream::Skip(int count) {
    381   while (stream_count_ > 0) {
    382     // Assume that ByteCount() can be used to find out how much we actually
    383     // skipped when Skip() fails.
    384     int64 target_byte_count = streams_[0]->ByteCount() + count;
    385     if (streams_[0]->Skip(count)) return true;
    386 
    387     // Hit the end of the stream.  Figure out how many more bytes we still have
    388     // to skip.
    389     int64 final_byte_count = streams_[0]->ByteCount();
    390     GOOGLE_DCHECK_LT(final_byte_count, target_byte_count);
    391     count = target_byte_count - final_byte_count;
    392 
    393     // That stream is done.  Advance to the next one.
    394     bytes_retired_ += final_byte_count;
    395     ++streams_;
    396     --stream_count_;
    397   }
    398 
    399   return false;
    400 }
    401 
    402 int64 ConcatenatingInputStream::ByteCount() const {
    403   if (stream_count_ == 0) {
    404     return bytes_retired_;
    405   } else {
    406     return bytes_retired_ + streams_[0]->ByteCount();
    407   }
    408 }
    409 
    410 
    411 // ===================================================================
    412 
    413 LimitingInputStream::LimitingInputStream(ZeroCopyInputStream* input,
    414                                          int64 limit)
    415   : input_(input), limit_(limit) {}
    416 
    417 LimitingInputStream::~LimitingInputStream() {
    418   // If we overshot the limit, back up.
    419   if (limit_ < 0) input_->BackUp(-limit_);
    420 }
    421 
    422 bool LimitingInputStream::Next(const void** data, int* size) {
    423   if (limit_ <= 0) return false;
    424   if (!input_->Next(data, size)) return false;
    425 
    426   limit_ -= *size;
    427   if (limit_ < 0) {
    428     // We overshot the limit.  Reduce *size to hide the rest of the buffer.
    429     *size += limit_;
    430   }
    431   return true;
    432 }
    433 
    434 void LimitingInputStream::BackUp(int count) {
    435   if (limit_ < 0) {
    436     input_->BackUp(count - limit_);
    437     limit_ = count;
    438   } else {
    439     input_->BackUp(count);
    440     limit_ += count;
    441   }
    442 }
    443 
    444 bool LimitingInputStream::Skip(int count) {
    445   if (count > limit_) {
    446     if (limit_ < 0) return false;
    447     input_->Skip(limit_);
    448     limit_ = 0;
    449     return false;
    450   } else {
    451     if (!input_->Skip(count)) return false;
    452     limit_ -= count;
    453     return true;
    454   }
    455 }
    456 
    457 int64 LimitingInputStream::ByteCount() const {
    458   if (limit_ < 0) {
    459     return input_->ByteCount() + limit_;
    460   } else {
    461     return input_->ByteCount();
    462   }
    463 }
    464 
    465 
    466 // ===================================================================
    467 
    468 }  // namespace io
    469 }  // namespace protobuf
    470 }  // namespace google
    471