Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004 Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #if defined(POSIX)
     29 #include <sys/file.h>
     30 #endif  // POSIX
     31 #include <sys/types.h>
     32 #include <sys/stat.h>
     33 #include <errno.h>
     34 #include <string>
     35 #include "talk/base/basictypes.h"
     36 #include "talk/base/common.h"
     37 #include "talk/base/logging.h"
     38 #include "talk/base/messagequeue.h"
     39 #include "talk/base/stream.h"
     40 #include "talk/base/stringencode.h"
     41 #include "talk/base/stringutils.h"
     42 #include "talk/base/thread.h"
     43 #include "talk/base/timeutils.h"
     44 
     45 #ifdef WIN32
     46 #include "talk/base/win32.h"
     47 #define fileno _fileno
     48 #endif
     49 
     50 namespace talk_base {
     51 
     52 ///////////////////////////////////////////////////////////////////////////////
     53 // StreamInterface
     54 ///////////////////////////////////////////////////////////////////////////////
     55 StreamInterface::~StreamInterface() {
     56 }
     57 
     58 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
     59                                        size_t* written, int* error) {
     60   StreamResult result = SR_SUCCESS;
     61   size_t total_written = 0, current_written;
     62   while (total_written < data_len) {
     63     result = Write(static_cast<const char*>(data) + total_written,
     64                    data_len - total_written, &current_written, error);
     65     if (result != SR_SUCCESS)
     66       break;
     67     total_written += current_written;
     68   }
     69   if (written)
     70     *written = total_written;
     71   return result;
     72 }
     73 
     74 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
     75                                       size_t* read, int* error) {
     76   StreamResult result = SR_SUCCESS;
     77   size_t total_read = 0, current_read;
     78   while (total_read < buffer_len) {
     79     result = Read(static_cast<char*>(buffer) + total_read,
     80                   buffer_len - total_read, &current_read, error);
     81     if (result != SR_SUCCESS)
     82       break;
     83     total_read += current_read;
     84   }
     85   if (read)
     86     *read = total_read;
     87   return result;
     88 }
     89 
     90 StreamResult StreamInterface::ReadLine(std::string* line) {
     91   line->clear();
     92   StreamResult result = SR_SUCCESS;
     93   while (true) {
     94     char ch;
     95     result = Read(&ch, sizeof(ch), NULL, NULL);
     96     if (result != SR_SUCCESS) {
     97       break;
     98     }
     99     if (ch == '\n') {
    100       break;
    101     }
    102     line->push_back(ch);
    103   }
    104   if (!line->empty()) {   // give back the line we've collected so far with
    105     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
    106   }
    107   return result;
    108 }
    109 
    110 void StreamInterface::PostEvent(Thread* t, int events, int err) {
    111   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
    112 }
    113 
    114 void StreamInterface::PostEvent(int events, int err) {
    115   PostEvent(Thread::Current(), events, err);
    116 }
    117 
    118 StreamInterface::StreamInterface() {
    119 }
    120 
    121 void StreamInterface::OnMessage(Message* msg) {
    122   if (MSG_POST_EVENT == msg->message_id) {
    123     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
    124     SignalEvent(this, pe->events, pe->error);
    125     delete msg->pdata;
    126   }
    127 }
    128 
    129 ///////////////////////////////////////////////////////////////////////////////
    130 // StreamAdapterInterface
    131 ///////////////////////////////////////////////////////////////////////////////
    132 
    133 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
    134                                                bool owned)
    135     : stream_(stream), owned_(owned) {
    136   if (NULL != stream_)
    137     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    138 }
    139 
    140 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
    141   if (NULL != stream_)
    142     stream_->SignalEvent.disconnect(this);
    143   if (owned_)
    144     delete stream_;
    145   stream_ = stream;
    146   owned_ = owned;
    147   if (NULL != stream_)
    148     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    149 }
    150 
    151 StreamInterface* StreamAdapterInterface::Detach() {
    152   if (NULL != stream_)
    153     stream_->SignalEvent.disconnect(this);
    154   StreamInterface* stream = stream_;
    155   stream_ = NULL;
    156   return stream;
    157 }
    158 
    159 StreamAdapterInterface::~StreamAdapterInterface() {
    160   if (owned_)
    161     delete stream_;
    162 }
    163 
    164 ///////////////////////////////////////////////////////////////////////////////
    165 // StreamTap
    166 ///////////////////////////////////////////////////////////////////////////////
    167 
    168 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
    169     : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
    170         tap_error_(0) {
    171   AttachTap(tap);
    172 }
    173 
    174 void StreamTap::AttachTap(StreamInterface* tap) {
    175   tap_.reset(tap);
    176 }
    177 
    178 StreamInterface* StreamTap::DetachTap() {
    179   return tap_.release();
    180 }
    181 
    182 StreamResult StreamTap::GetTapResult(int* error) {
    183   if (error) {
    184     *error = tap_error_;
    185   }
    186   return tap_result_;
    187 }
    188 
    189 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
    190                              size_t* read, int* error) {
    191   size_t backup_read;
    192   if (!read) {
    193     read = &backup_read;
    194   }
    195   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
    196                                                   read, error);
    197   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    198     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
    199   }
    200   return res;
    201 }
    202 
    203 StreamResult StreamTap::Write(const void* data, size_t data_len,
    204                               size_t* written, int* error) {
    205   size_t backup_written;
    206   if (!written) {
    207     written = &backup_written;
    208   }
    209   StreamResult res = StreamAdapterInterface::Write(data, data_len,
    210                                                    written, error);
    211   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    212     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
    213   }
    214   return res;
    215 }
    216 
    217 ///////////////////////////////////////////////////////////////////////////////
    218 // StreamSegment
    219 ///////////////////////////////////////////////////////////////////////////////
    220 
    221 StreamSegment::StreamSegment(StreamInterface* stream)
    222     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    223     length_(SIZE_UNKNOWN) {
    224   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    225   stream->GetPosition(&start_);
    226 }
    227 
    228 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
    229     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    230     length_(length) {
    231   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    232   stream->GetPosition(&start_);
    233 }
    234 
    235 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
    236                                  size_t* read, int* error) {
    237   if (SIZE_UNKNOWN != length_) {
    238     if (pos_ >= length_)
    239       return SR_EOS;
    240     buffer_len = _min(buffer_len, length_ - pos_);
    241   }
    242   size_t backup_read;
    243   if (!read) {
    244     read = &backup_read;
    245   }
    246   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
    247                                                      read, error);
    248   if (SR_SUCCESS == result) {
    249     pos_ += *read;
    250   }
    251   return result;
    252 }
    253 
    254 bool StreamSegment::SetPosition(size_t position) {
    255   if (SIZE_UNKNOWN == start_)
    256     return false;  // Not seekable
    257   if ((SIZE_UNKNOWN != length_) && (position > length_))
    258     return false;  // Seek past end of segment
    259   if (!StreamAdapterInterface::SetPosition(start_ + position))
    260     return false;
    261   pos_ = position;
    262   return true;
    263 }
    264 
    265 bool StreamSegment::GetPosition(size_t* position) const {
    266   if (SIZE_UNKNOWN == start_)
    267     return false;  // Not seekable
    268   if (!StreamAdapterInterface::GetPosition(position))
    269     return false;
    270   if (position) {
    271     ASSERT(*position >= start_);
    272     *position -= start_;
    273   }
    274   return true;
    275 }
    276 
    277 bool StreamSegment::GetSize(size_t* size) const {
    278   if (!StreamAdapterInterface::GetSize(size))
    279     return false;
    280   if (size) {
    281     if (SIZE_UNKNOWN != start_) {
    282       ASSERT(*size >= start_);
    283       *size -= start_;
    284     }
    285     if (SIZE_UNKNOWN != length_) {
    286       *size = _min(*size, length_);
    287     }
    288   }
    289   return true;
    290 }
    291 
    292 bool StreamSegment::GetAvailable(size_t* size) const {
    293   if (!StreamAdapterInterface::GetAvailable(size))
    294     return false;
    295   if (size && (SIZE_UNKNOWN != length_))
    296     *size = _min(*size, length_ - pos_);
    297   return true;
    298 }
    299 
    300 ///////////////////////////////////////////////////////////////////////////////
    301 // NullStream
    302 ///////////////////////////////////////////////////////////////////////////////
    303 
    304 NullStream::NullStream() {
    305 }
    306 
    307 NullStream::~NullStream() {
    308 }
    309 
    310 StreamState NullStream::GetState() const {
    311   return SS_OPEN;
    312 }
    313 
    314 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
    315                               size_t* read, int* error) {
    316   if (error) *error = -1;
    317   return SR_ERROR;
    318 }
    319 
    320 StreamResult NullStream::Write(const void* data, size_t data_len,
    321                                size_t* written, int* error) {
    322   if (written) *written = data_len;
    323   return SR_SUCCESS;
    324 }
    325 
    326 void NullStream::Close() {
    327 }
    328 
    329 ///////////////////////////////////////////////////////////////////////////////
    330 // FileStream
    331 ///////////////////////////////////////////////////////////////////////////////
    332 
    333 FileStream::FileStream() : file_(NULL) {
    334 }
    335 
    336 FileStream::~FileStream() {
    337   FileStream::Close();
    338 }
    339 
    340 bool FileStream::Open(const std::string& filename, const char* mode,
    341                       int* error) {
    342   Close();
    343 #ifdef WIN32
    344   std::wstring wfilename;
    345   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    346     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
    347   } else {
    348     if (error) {
    349       *error = -1;
    350       return false;
    351     }
    352   }
    353 #else
    354   file_ = fopen(filename.c_str(), mode);
    355 #endif
    356   if (!file_ && error) {
    357     *error = errno;
    358   }
    359   return (file_ != NULL);
    360 }
    361 
    362 bool FileStream::OpenShare(const std::string& filename, const char* mode,
    363                            int shflag, int* error) {
    364   Close();
    365 #ifdef WIN32
    366   std::wstring wfilename;
    367   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    368     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
    369     if (!file_ && error) {
    370       *error = errno;
    371       return false;
    372     }
    373     return file_ != NULL;
    374   } else {
    375     if (error) {
    376       *error = -1;
    377     }
    378     return false;
    379   }
    380 #else
    381   return Open(filename, mode, error);
    382 #endif
    383 }
    384 
    385 bool FileStream::DisableBuffering() {
    386   if (!file_)
    387     return false;
    388   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
    389 }
    390 
    391 StreamState FileStream::GetState() const {
    392   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
    393 }
    394 
    395 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
    396                               size_t* read, int* error) {
    397   if (!file_)
    398     return SR_EOS;
    399   size_t result = fread(buffer, 1, buffer_len, file_);
    400   if ((result == 0) && (buffer_len > 0)) {
    401     if (feof(file_))
    402       return SR_EOS;
    403     if (error)
    404       *error = errno;
    405     return SR_ERROR;
    406   }
    407   if (read)
    408     *read = result;
    409   return SR_SUCCESS;
    410 }
    411 
    412 StreamResult FileStream::Write(const void* data, size_t data_len,
    413                                size_t* written, int* error) {
    414   if (!file_)
    415     return SR_EOS;
    416   size_t result = fwrite(data, 1, data_len, file_);
    417   if ((result == 0) && (data_len > 0)) {
    418     if (error)
    419       *error = errno;
    420     return SR_ERROR;
    421   }
    422   if (written)
    423     *written = result;
    424   return SR_SUCCESS;
    425 }
    426 
    427 void FileStream::Close() {
    428   if (file_) {
    429     DoClose();
    430     file_ = NULL;
    431   }
    432 }
    433 
    434 bool FileStream::SetPosition(size_t position) {
    435   if (!file_)
    436     return false;
    437   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
    438 }
    439 
    440 bool FileStream::GetPosition(size_t* position) const {
    441   ASSERT(NULL != position);
    442   if (!file_)
    443     return false;
    444   long result = ftell(file_);
    445   if (result < 0)
    446     return false;
    447   if (position)
    448     *position = result;
    449   return true;
    450 }
    451 
    452 bool FileStream::GetSize(size_t* size) const {
    453   ASSERT(NULL != size);
    454   if (!file_)
    455     return false;
    456   struct stat file_stats;
    457   if (fstat(fileno(file_), &file_stats) != 0)
    458     return false;
    459   if (size)
    460     *size = file_stats.st_size;
    461   return true;
    462 }
    463 
    464 bool FileStream::GetAvailable(size_t* size) const {
    465   ASSERT(NULL != size);
    466   if (!GetSize(size))
    467     return false;
    468   long result = ftell(file_);
    469   if (result < 0)
    470     return false;
    471   if (size)
    472     *size -= result;
    473   return true;
    474 }
    475 
    476 bool FileStream::ReserveSize(size_t size) {
    477   // TODO: extend the file to the proper length
    478   return true;
    479 }
    480 
    481 bool FileStream::GetSize(const std::string& filename, size_t* size) {
    482   struct stat file_stats;
    483   if (stat(filename.c_str(), &file_stats) != 0)
    484     return false;
    485   *size = file_stats.st_size;
    486   return true;
    487 }
    488 
    489 bool FileStream::Flush() {
    490   if (file_) {
    491     return (0 == fflush(file_));
    492   }
    493   // try to flush empty file?
    494   ASSERT(false);
    495   return false;
    496 }
    497 
    498 #if defined(POSIX)
    499 
    500 bool FileStream::TryLock() {
    501   if (file_ == NULL) {
    502     // Stream not open.
    503     ASSERT(false);
    504     return false;
    505   }
    506 
    507   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
    508 }
    509 
    510 bool FileStream::Unlock() {
    511   if (file_ == NULL) {
    512     // Stream not open.
    513     ASSERT(false);
    514     return false;
    515   }
    516 
    517   return flock(fileno(file_), LOCK_UN) == 0;
    518 }
    519 
    520 #endif
    521 
    522 void FileStream::DoClose() {
    523   fclose(file_);
    524 }
    525 
    526 CircularFileStream::CircularFileStream(size_t max_size)
    527   : max_write_size_(max_size),
    528     position_(0),
    529     marked_position_(max_size / 2),
    530     last_write_position_(0),
    531     read_segment_(READ_LATEST),
    532     read_segment_available_(0) {
    533 }
    534 
    535 bool CircularFileStream::Open(
    536     const std::string& filename, const char* mode, int* error) {
    537   if (!FileStream::Open(filename.c_str(), mode, error))
    538     return false;
    539 
    540   if (strchr(mode, "r") != NULL) {  // Opened in read mode.
    541     // Check if the buffer has been overwritten and determine how to read the
    542     // log in time sequence.
    543     size_t file_size;
    544     GetSize(&file_size);
    545     if (file_size == position_) {
    546       // The buffer has not been overwritten yet. Read 0 .. file_size
    547       read_segment_ = READ_LATEST;
    548       read_segment_available_ = file_size;
    549     } else {
    550       // The buffer has been over written. There are three segments: The first
    551       // one is 0 .. marked_position_, which is the marked earliest log. The
    552       // second one is position_ .. file_size, which is the middle log. The
    553       // last one is marked_position_ .. position_, which is the latest log.
    554       read_segment_ = READ_MARKED;
    555       read_segment_available_ = marked_position_;
    556       last_write_position_ = position_;
    557     }
    558 
    559     // Read from the beginning.
    560     position_ = 0;
    561     SetPosition(position_);
    562   }
    563 
    564   return true;
    565 }
    566 
    567 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
    568                                       size_t* read, int* error) {
    569   if (read_segment_available_ == 0) {
    570     size_t file_size;
    571     switch (read_segment_) {
    572       case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
    573         read_segment_ = READ_MIDDLE;
    574         position_ = last_write_position_;
    575         SetPosition(position_);
    576         GetSize(&file_size);
    577         read_segment_available_ = file_size - position_;
    578         break;
    579 
    580       case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
    581         read_segment_ = READ_LATEST;
    582         position_ = marked_position_;
    583         SetPosition(position_);
    584         read_segment_available_ = last_write_position_ - position_;
    585         break;
    586 
    587       default:  // Finished READ_LATEST and return EOS.
    588         return talk_base::SR_EOS;
    589     }
    590   }
    591 
    592   size_t local_read;
    593   if (!read) read = &local_read;
    594 
    595   size_t to_read = talk_base::_min(buffer_len, read_segment_available_);
    596   talk_base::StreamResult result
    597     = talk_base::FileStream::Read(buffer, to_read, read, error);
    598   if (result == talk_base::SR_SUCCESS) {
    599     read_segment_available_ -= *read;
    600     position_ += *read;
    601   }
    602   return result;
    603 }
    604 
    605 StreamResult CircularFileStream::Write(const void* data, size_t data_len,
    606                                        size_t* written, int* error) {
    607   if (position_ >= max_write_size_) {
    608     ASSERT(position_ == max_write_size_);
    609     position_ = marked_position_;
    610     SetPosition(position_);
    611   }
    612 
    613   size_t local_written;
    614   if (!written) written = &local_written;
    615 
    616   size_t to_eof = max_write_size_ - position_;
    617   size_t to_write = talk_base::_min(data_len, to_eof);
    618   talk_base::StreamResult result
    619     = talk_base::FileStream::Write(data, to_write, written, error);
    620   if (result == talk_base::SR_SUCCESS) {
    621     position_ += *written;
    622   }
    623   return result;
    624 }
    625 
    626 AsyncWriteStream::~AsyncWriteStream() {
    627   write_thread_->Clear(this, 0, NULL);
    628   ClearBufferAndWrite();
    629 
    630   CritScope cs(&crit_stream_);
    631   stream_.reset();
    632 }
    633 
    634 // This is needed by some stream writers, such as RtpDumpWriter.
    635 bool AsyncWriteStream::GetPosition(size_t* position) const {
    636   CritScope cs(&crit_stream_);
    637   return stream_->GetPosition(position);
    638 }
    639 
    640 // This is needed by some stream writers, such as the plugin log writers.
    641 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
    642                                     size_t* read, int* error) {
    643   CritScope cs(&crit_stream_);
    644   return stream_->Read(buffer, buffer_len, read, error);
    645 }
    646 
    647 void AsyncWriteStream::Close() {
    648   if (state_ == SS_CLOSED) {
    649     return;
    650   }
    651 
    652   write_thread_->Clear(this, 0, NULL);
    653   ClearBufferAndWrite();
    654 
    655   CritScope cs(&crit_stream_);
    656   stream_->Close();
    657   state_ = SS_CLOSED;
    658 }
    659 
    660 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
    661                                      size_t* written, int* error) {
    662   if (state_ == SS_CLOSED) {
    663     return SR_ERROR;
    664   }
    665 
    666   size_t previous_buffer_length = 0;
    667   {
    668     CritScope cs(&crit_buffer_);
    669     previous_buffer_length = buffer_.length();
    670     buffer_.AppendData(data, data_len);
    671   }
    672 
    673   if (previous_buffer_length == 0) {
    674     // If there's stuff already in the buffer, then we already called
    675     // Post and the write_thread_ hasn't pulled it out yet, so we
    676     // don't need to re-Post.
    677     write_thread_->Post(this, 0, NULL);
    678   }
    679   // Return immediately, assuming that it works.
    680   if (written) {
    681     *written = data_len;
    682   }
    683   return SR_SUCCESS;
    684 }
    685 
    686 void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
    687   ClearBufferAndWrite();
    688 }
    689 
    690 bool AsyncWriteStream::Flush() {
    691   if (state_ == SS_CLOSED) {
    692     return false;
    693   }
    694 
    695   ClearBufferAndWrite();
    696 
    697   CritScope cs(&crit_stream_);
    698   return stream_->Flush();
    699 }
    700 
    701 void AsyncWriteStream::ClearBufferAndWrite() {
    702   Buffer to_write;
    703   {
    704     CritScope cs_buffer(&crit_buffer_);
    705     buffer_.TransferTo(&to_write);
    706   }
    707 
    708   if (to_write.length() > 0) {
    709     CritScope cs(&crit_stream_);
    710     stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
    711   }
    712 }
    713 
    714 #ifdef POSIX
    715 
    716 // Have to identically rewrite the FileStream destructor or else it would call
    717 // the base class's Close() instead of the sub-class's.
    718 POpenStream::~POpenStream() {
    719   POpenStream::Close();
    720 }
    721 
    722 bool POpenStream::Open(const std::string& subcommand,
    723                        const char* mode,
    724                        int* error) {
    725   Close();
    726   file_ = popen(subcommand.c_str(), mode);
    727   if (file_ == NULL) {
    728     if (error)
    729       *error = errno;
    730     return false;
    731   }
    732   return true;
    733 }
    734 
    735 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
    736                             int shflag, int* error) {
    737   return Open(subcommand, mode, error);
    738 }
    739 
    740 void POpenStream::DoClose() {
    741   wait_status_ = pclose(file_);
    742 }
    743 
    744 #endif
    745 
    746 ///////////////////////////////////////////////////////////////////////////////
    747 // MemoryStream
    748 ///////////////////////////////////////////////////////////////////////////////
    749 
    750 MemoryStreamBase::MemoryStreamBase()
    751   : buffer_(NULL), buffer_length_(0), data_length_(0),
    752     seek_position_(0) {
    753 }
    754 
    755 StreamState MemoryStreamBase::GetState() const {
    756   return SS_OPEN;
    757 }
    758 
    759 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
    760                                     size_t* bytes_read, int* error) {
    761   if (seek_position_ >= data_length_) {
    762     return SR_EOS;
    763   }
    764   size_t available = data_length_ - seek_position_;
    765   if (bytes > available) {
    766     // Read partial buffer
    767     bytes = available;
    768   }
    769   memcpy(buffer, &buffer_[seek_position_], bytes);
    770   seek_position_ += bytes;
    771   if (bytes_read) {
    772     *bytes_read = bytes;
    773   }
    774   return SR_SUCCESS;
    775 }
    776 
    777 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
    778                                      size_t* bytes_written, int* error) {
    779   size_t available = buffer_length_ - seek_position_;
    780   if (0 == available) {
    781     // Increase buffer size to the larger of:
    782     // a) new position rounded up to next 256 bytes
    783     // b) double the previous length
    784     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
    785                                     buffer_length_ * 2);
    786     StreamResult result = DoReserve(new_buffer_length, error);
    787     if (SR_SUCCESS != result) {
    788       return result;
    789     }
    790     ASSERT(buffer_length_ >= new_buffer_length);
    791     available = buffer_length_ - seek_position_;
    792   }
    793 
    794   if (bytes > available) {
    795     bytes = available;
    796   }
    797   memcpy(&buffer_[seek_position_], buffer, bytes);
    798   seek_position_ += bytes;
    799   if (data_length_ < seek_position_) {
    800     data_length_ = seek_position_;
    801   }
    802   if (bytes_written) {
    803     *bytes_written = bytes;
    804   }
    805   return SR_SUCCESS;
    806 }
    807 
    808 void MemoryStreamBase::Close() {
    809   // nothing to do
    810 }
    811 
    812 bool MemoryStreamBase::SetPosition(size_t position) {
    813   if (position > data_length_)
    814     return false;
    815   seek_position_ = position;
    816   return true;
    817 }
    818 
    819 bool MemoryStreamBase::GetPosition(size_t* position) const {
    820   if (position)
    821     *position = seek_position_;
    822   return true;
    823 }
    824 
    825 bool MemoryStreamBase::GetSize(size_t* size) const {
    826   if (size)
    827     *size = data_length_;
    828   return true;
    829 }
    830 
    831 bool MemoryStreamBase::GetAvailable(size_t* size) const {
    832   if (size)
    833     *size = data_length_ - seek_position_;
    834   return true;
    835 }
    836 
    837 bool MemoryStreamBase::ReserveSize(size_t size) {
    838   return (SR_SUCCESS == DoReserve(size, NULL));
    839 }
    840 
    841 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
    842   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
    843 }
    844 
    845 ///////////////////////////////////////////////////////////////////////////////
    846 
    847 MemoryStream::MemoryStream()
    848   : buffer_alloc_(NULL) {
    849 }
    850 
    851 MemoryStream::MemoryStream(const char* data)
    852   : buffer_alloc_(NULL) {
    853   SetData(data, strlen(data));
    854 }
    855 
    856 MemoryStream::MemoryStream(const void* data, size_t length)
    857   : buffer_alloc_(NULL) {
    858   SetData(data, length);
    859 }
    860 
    861 MemoryStream::~MemoryStream() {
    862   delete [] buffer_alloc_;
    863 }
    864 
    865 void MemoryStream::SetData(const void* data, size_t length) {
    866   data_length_ = buffer_length_ = length;
    867   delete [] buffer_alloc_;
    868   buffer_alloc_ = new char[buffer_length_ + kAlignment];
    869   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
    870   memcpy(buffer_, data, data_length_);
    871   seek_position_ = 0;
    872 }
    873 
    874 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
    875   if (buffer_length_ >= size)
    876     return SR_SUCCESS;
    877 
    878   if (char* new_buffer_alloc = new char[size + kAlignment]) {
    879     char* new_buffer = reinterpret_cast<char*>(
    880         ALIGNP(new_buffer_alloc, kAlignment));
    881     memcpy(new_buffer, buffer_, data_length_);
    882     delete [] buffer_alloc_;
    883     buffer_alloc_ = new_buffer_alloc;
    884     buffer_ = new_buffer;
    885     buffer_length_ = size;
    886     return SR_SUCCESS;
    887   }
    888 
    889   if (error) {
    890     *error = ENOMEM;
    891   }
    892   return SR_ERROR;
    893 }
    894 
    895 ///////////////////////////////////////////////////////////////////////////////
    896 
    897 ExternalMemoryStream::ExternalMemoryStream() {
    898 }
    899 
    900 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
    901   SetData(data, length);
    902 }
    903 
    904 ExternalMemoryStream::~ExternalMemoryStream() {
    905 }
    906 
    907 void ExternalMemoryStream::SetData(void* data, size_t length) {
    908   data_length_ = buffer_length_ = length;
    909   buffer_ = static_cast<char*>(data);
    910   seek_position_ = 0;
    911 }
    912 
    913 ///////////////////////////////////////////////////////////////////////////////
    914 // FifoBuffer
    915 ///////////////////////////////////////////////////////////////////////////////
    916 
    917 FifoBuffer::FifoBuffer(size_t size)
    918     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    919       data_length_(0), read_position_(0), owner_(Thread::Current()) {
    920   // all events are done on the owner_ thread
    921 }
    922 
    923 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
    924     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    925       data_length_(0), read_position_(0), owner_(owner) {
    926   // all events are done on the owner_ thread
    927 }
    928 
    929 FifoBuffer::~FifoBuffer() {
    930 }
    931 
    932 bool FifoBuffer::GetBuffered(size_t* size) const {
    933   CritScope cs(&crit_);
    934   *size = data_length_;
    935   return true;
    936 }
    937 
    938 bool FifoBuffer::SetCapacity(size_t size) {
    939   CritScope cs(&crit_);
    940   if (data_length_ > size) {
    941     return false;
    942   }
    943 
    944   if (size != buffer_length_) {
    945     char* buffer = new char[size];
    946     const size_t copy = data_length_;
    947     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
    948     memcpy(buffer, &buffer_[read_position_], tail_copy);
    949     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
    950     buffer_.reset(buffer);
    951     read_position_ = 0;
    952     buffer_length_ = size;
    953   }
    954   return true;
    955 }
    956 
    957 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
    958                                     size_t offset, size_t* bytes_read) {
    959   CritScope cs(&crit_);
    960   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
    961 }
    962 
    963 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
    964                                      size_t offset, size_t* bytes_written) {
    965   CritScope cs(&crit_);
    966   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
    967 }
    968 
    969 StreamState FifoBuffer::GetState() const {
    970   return state_;
    971 }
    972 
    973 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
    974                               size_t* bytes_read, int* error) {
    975   CritScope cs(&crit_);
    976   const bool was_writable = data_length_ < buffer_length_;
    977   size_t copy = 0;
    978   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
    979 
    980   if (result == SR_SUCCESS) {
    981     // If read was successful then adjust the read position and number of
    982     // bytes buffered.
    983     read_position_ = (read_position_ + copy) % buffer_length_;
    984     data_length_ -= copy;
    985     if (bytes_read) {
    986       *bytes_read = copy;
    987     }
    988 
    989     // if we were full before, and now we're not, post an event
    990     if (!was_writable && copy > 0) {
    991       PostEvent(owner_, SE_WRITE, 0);
    992     }
    993   }
    994   return result;
    995 }
    996 
    997 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
    998                                size_t* bytes_written, int* error) {
    999   CritScope cs(&crit_);
   1000 
   1001   const bool was_readable = (data_length_ > 0);
   1002   size_t copy = 0;
   1003   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
   1004 
   1005   if (result == SR_SUCCESS) {
   1006     // If write was successful then adjust the number of readable bytes.
   1007     data_length_ += copy;
   1008     if (bytes_written) {
   1009       *bytes_written = copy;
   1010     }
   1011 
   1012     // if we didn't have any data to read before, and now we do, post an event
   1013     if (!was_readable && copy > 0) {
   1014       PostEvent(owner_, SE_READ, 0);
   1015     }
   1016   }
   1017   return result;
   1018 }
   1019 
   1020 void FifoBuffer::Close() {
   1021   CritScope cs(&crit_);
   1022   state_ = SS_CLOSED;
   1023 }
   1024 
   1025 const void* FifoBuffer::GetReadData(size_t* size) {
   1026   CritScope cs(&crit_);
   1027   *size = (read_position_ + data_length_ <= buffer_length_) ?
   1028       data_length_ : buffer_length_ - read_position_;
   1029   return &buffer_[read_position_];
   1030 }
   1031 
   1032 void FifoBuffer::ConsumeReadData(size_t size) {
   1033   CritScope cs(&crit_);
   1034   ASSERT(size <= data_length_);
   1035   const bool was_writable = data_length_ < buffer_length_;
   1036   read_position_ = (read_position_ + size) % buffer_length_;
   1037   data_length_ -= size;
   1038   if (!was_writable && size > 0) {
   1039     PostEvent(owner_, SE_WRITE, 0);
   1040   }
   1041 }
   1042 
   1043 void* FifoBuffer::GetWriteBuffer(size_t* size) {
   1044   CritScope cs(&crit_);
   1045   if (state_ == SS_CLOSED) {
   1046     return NULL;
   1047   }
   1048 
   1049   // if empty, reset the write position to the beginning, so we can get
   1050   // the biggest possible block
   1051   if (data_length_ == 0) {
   1052     read_position_ = 0;
   1053   }
   1054 
   1055   const size_t write_position = (read_position_ + data_length_)
   1056       % buffer_length_;
   1057   *size = (write_position > read_position_ || data_length_ == 0) ?
   1058       buffer_length_ - write_position : read_position_ - write_position;
   1059   return &buffer_[write_position];
   1060 }
   1061 
   1062 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
   1063   CritScope cs(&crit_);
   1064   ASSERT(size <= buffer_length_ - data_length_);
   1065   const bool was_readable = (data_length_ > 0);
   1066   data_length_ += size;
   1067   if (!was_readable && size > 0) {
   1068     PostEvent(owner_, SE_READ, 0);
   1069   }
   1070 }
   1071 
   1072 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
   1073   CritScope cs(&crit_);
   1074   *size = buffer_length_ - data_length_;
   1075   return true;
   1076 }
   1077 
   1078 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
   1079                                           size_t bytes,
   1080                                           size_t offset,
   1081                                           size_t* bytes_read) {
   1082   if (offset >= data_length_) {
   1083     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
   1084   }
   1085 
   1086   const size_t available = data_length_ - offset;
   1087   const size_t read_position = (read_position_ + offset) % buffer_length_;
   1088   const size_t copy = _min(bytes, available);
   1089   const size_t tail_copy = _min(copy, buffer_length_ - read_position);
   1090   char* const p = static_cast<char*>(buffer);
   1091   memcpy(p, &buffer_[read_position], tail_copy);
   1092   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
   1093 
   1094   if (bytes_read) {
   1095     *bytes_read = copy;
   1096   }
   1097   return SR_SUCCESS;
   1098 }
   1099 
   1100 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
   1101                                            size_t bytes,
   1102                                            size_t offset,
   1103                                            size_t* bytes_written) {
   1104   if (state_ == SS_CLOSED) {
   1105     return SR_EOS;
   1106   }
   1107 
   1108   if (data_length_ + offset >= buffer_length_) {
   1109     return SR_BLOCK;
   1110   }
   1111 
   1112   const size_t available = buffer_length_ - data_length_ - offset;
   1113   const size_t write_position = (read_position_ + data_length_ + offset)
   1114       % buffer_length_;
   1115   const size_t copy = _min(bytes, available);
   1116   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
   1117   const char* const p = static_cast<const char*>(buffer);
   1118   memcpy(&buffer_[write_position], p, tail_copy);
   1119   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
   1120 
   1121   if (bytes_written) {
   1122     *bytes_written = copy;
   1123   }
   1124   return SR_SUCCESS;
   1125 }
   1126 
   1127 
   1128 
   1129 ///////////////////////////////////////////////////////////////////////////////
   1130 // LoggingAdapter
   1131 ///////////////////////////////////////////////////////////////////////////////
   1132 
   1133 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
   1134                                const std::string& label, bool hex_mode)
   1135     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
   1136   set_label(label);
   1137 }
   1138 
   1139 void LoggingAdapter::set_label(const std::string& label) {
   1140   label_.assign("[");
   1141   label_.append(label);
   1142   label_.append("]");
   1143 }
   1144 
   1145 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
   1146                                   size_t* read, int* error) {
   1147   size_t local_read; if (!read) read = &local_read;
   1148   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
   1149                                                      error);
   1150   if (result == SR_SUCCESS) {
   1151     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
   1152   }
   1153   return result;
   1154 }
   1155 
   1156 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
   1157                                    size_t* written, int* error) {
   1158   size_t local_written;
   1159   if (!written) written = &local_written;
   1160   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
   1161                                                       error);
   1162   if (result == SR_SUCCESS) {
   1163     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
   1164                  &lms_);
   1165   }
   1166   return result;
   1167 }
   1168 
   1169 void LoggingAdapter::Close() {
   1170   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
   1171   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
   1172   LOG_V(level_) << label_ << " Closed locally";
   1173   StreamAdapterInterface::Close();
   1174 }
   1175 
   1176 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
   1177   if (events & SE_OPEN) {
   1178     LOG_V(level_) << label_ << " Open";
   1179   } else if (events & SE_CLOSE) {
   1180     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
   1181     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
   1182     LOG_V(level_) << label_ << " Closed with error: " << err;
   1183   }
   1184   StreamAdapterInterface::OnEvent(stream, events, err);
   1185 }
   1186 
   1187 ///////////////////////////////////////////////////////////////////////////////
   1188 // StringStream - Reads/Writes to an external std::string
   1189 ///////////////////////////////////////////////////////////////////////////////
   1190 
   1191 StringStream::StringStream(std::string& str)
   1192     : str_(str), read_pos_(0), read_only_(false) {
   1193 }
   1194 
   1195 StringStream::StringStream(const std::string& str)
   1196     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
   1197 }
   1198 
   1199 StreamState StringStream::GetState() const {
   1200   return SS_OPEN;
   1201 }
   1202 
   1203 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
   1204                                       size_t* read, int* error) {
   1205   size_t available = _min(buffer_len, str_.size() - read_pos_);
   1206   if (!available)
   1207     return SR_EOS;
   1208   memcpy(buffer, str_.data() + read_pos_, available);
   1209   read_pos_ += available;
   1210   if (read)
   1211     *read = available;
   1212   return SR_SUCCESS;
   1213 }
   1214 
   1215 StreamResult StringStream::Write(const void* data, size_t data_len,
   1216                                       size_t* written, int* error) {
   1217   if (read_only_) {
   1218     if (error) {
   1219       *error = -1;
   1220     }
   1221     return SR_ERROR;
   1222   }
   1223   str_.append(static_cast<const char*>(data),
   1224               static_cast<const char*>(data) + data_len);
   1225   if (written)
   1226     *written = data_len;
   1227   return SR_SUCCESS;
   1228 }
   1229 
   1230 void StringStream::Close() {
   1231 }
   1232 
   1233 bool StringStream::SetPosition(size_t position) {
   1234   if (position > str_.size())
   1235     return false;
   1236   read_pos_ = position;
   1237   return true;
   1238 }
   1239 
   1240 bool StringStream::GetPosition(size_t* position) const {
   1241   if (position)
   1242     *position = read_pos_;
   1243   return true;
   1244 }
   1245 
   1246 bool StringStream::GetSize(size_t* size) const {
   1247   if (size)
   1248     *size = str_.size();
   1249   return true;
   1250 }
   1251 
   1252 bool StringStream::GetAvailable(size_t* size) const {
   1253   if (size)
   1254     *size = str_.size() - read_pos_;
   1255   return true;
   1256 }
   1257 
   1258 bool StringStream::ReserveSize(size_t size) {
   1259   if (read_only_)
   1260     return false;
   1261   str_.reserve(size);
   1262   return true;
   1263 }
   1264 
   1265 ///////////////////////////////////////////////////////////////////////////////
   1266 // StreamReference
   1267 ///////////////////////////////////////////////////////////////////////////////
   1268 
   1269 StreamReference::StreamReference(StreamInterface* stream)
   1270     : StreamAdapterInterface(stream, false) {
   1271   // owner set to false so the destructor does not free the stream.
   1272   stream_ref_count_ = new StreamRefCount(stream);
   1273 }
   1274 
   1275 StreamInterface* StreamReference::NewReference() {
   1276   stream_ref_count_->AddReference();
   1277   return new StreamReference(stream_ref_count_, stream());
   1278 }
   1279 
   1280 StreamReference::~StreamReference() {
   1281   stream_ref_count_->Release();
   1282 }
   1283 
   1284 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
   1285                                  StreamInterface* stream)
   1286     : StreamAdapterInterface(stream, false),
   1287       stream_ref_count_(stream_ref_count) {
   1288 }
   1289 
   1290 ///////////////////////////////////////////////////////////////////////////////
   1291 
   1292 StreamResult Flow(StreamInterface* source,
   1293                   char* buffer, size_t buffer_len,
   1294                   StreamInterface* sink,
   1295                   size_t* data_len /* = NULL */) {
   1296   ASSERT(buffer_len > 0);
   1297 
   1298   StreamResult result;
   1299   size_t count, read_pos, write_pos;
   1300   if (data_len) {
   1301     read_pos = *data_len;
   1302   } else {
   1303     read_pos = 0;
   1304   }
   1305 
   1306   bool end_of_stream = false;
   1307   do {
   1308     // Read until buffer is full, end of stream, or error
   1309     while (!end_of_stream && (read_pos < buffer_len)) {
   1310       result = source->Read(buffer + read_pos, buffer_len - read_pos,
   1311                             &count, NULL);
   1312       if (result == SR_EOS) {
   1313         end_of_stream = true;
   1314       } else if (result != SR_SUCCESS) {
   1315         if (data_len) {
   1316           *data_len = read_pos;
   1317         }
   1318         return result;
   1319       } else {
   1320         read_pos += count;
   1321       }
   1322     }
   1323 
   1324     // Write until buffer is empty, or error (including end of stream)
   1325     write_pos = 0;
   1326     while (write_pos < read_pos) {
   1327       result = sink->Write(buffer + write_pos, read_pos - write_pos,
   1328                            &count, NULL);
   1329       if (result != SR_SUCCESS) {
   1330         if (data_len) {
   1331           *data_len = read_pos - write_pos;
   1332           if (write_pos > 0) {
   1333             memmove(buffer, buffer + write_pos, *data_len);
   1334           }
   1335         }
   1336         return result;
   1337       }
   1338       write_pos += count;
   1339     }
   1340 
   1341     read_pos = 0;
   1342   } while (!end_of_stream);
   1343 
   1344   if (data_len) {
   1345     *data_len = 0;
   1346   }
   1347   return SR_SUCCESS;
   1348 }
   1349 
   1350 ///////////////////////////////////////////////////////////////////////////////
   1351 
   1352 }  // namespace talk_base
   1353