Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #if defined(WEBRTC_POSIX)
     12 #include <sys/file.h>
     13 #endif  // WEBRTC_POSIX
     14 #include <sys/types.h>
     15 #include <sys/stat.h>
     16 #include <errno.h>
     17 #include <string>
     18 #include "webrtc/base/basictypes.h"
     19 #include "webrtc/base/common.h"
     20 #include "webrtc/base/logging.h"
     21 #include "webrtc/base/messagequeue.h"
     22 #include "webrtc/base/stream.h"
     23 #include "webrtc/base/stringencode.h"
     24 #include "webrtc/base/stringutils.h"
     25 #include "webrtc/base/thread.h"
     26 #include "webrtc/base/timeutils.h"
     27 
     28 #if defined(WEBRTC_WIN)
     29 #include "webrtc/base/win32.h"
     30 #define fileno _fileno
     31 #endif
     32 
     33 namespace rtc {
     34 
     35 ///////////////////////////////////////////////////////////////////////////////
     36 // StreamInterface
     37 ///////////////////////////////////////////////////////////////////////////////
     38 StreamInterface::~StreamInterface() {
     39 }
     40 
     41 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
     42                                        size_t* written, int* error) {
     43   StreamResult result = SR_SUCCESS;
     44   size_t total_written = 0, current_written;
     45   while (total_written < data_len) {
     46     result = Write(static_cast<const char*>(data) + total_written,
     47                    data_len - total_written, &current_written, error);
     48     if (result != SR_SUCCESS)
     49       break;
     50     total_written += current_written;
     51   }
     52   if (written)
     53     *written = total_written;
     54   return result;
     55 }
     56 
     57 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
     58                                       size_t* read, int* error) {
     59   StreamResult result = SR_SUCCESS;
     60   size_t total_read = 0, current_read;
     61   while (total_read < buffer_len) {
     62     result = Read(static_cast<char*>(buffer) + total_read,
     63                   buffer_len - total_read, &current_read, error);
     64     if (result != SR_SUCCESS)
     65       break;
     66     total_read += current_read;
     67   }
     68   if (read)
     69     *read = total_read;
     70   return result;
     71 }
     72 
     73 StreamResult StreamInterface::ReadLine(std::string* line) {
     74   line->clear();
     75   StreamResult result = SR_SUCCESS;
     76   while (true) {
     77     char ch;
     78     result = Read(&ch, sizeof(ch), NULL, NULL);
     79     if (result != SR_SUCCESS) {
     80       break;
     81     }
     82     if (ch == '\n') {
     83       break;
     84     }
     85     line->push_back(ch);
     86   }
     87   if (!line->empty()) {   // give back the line we've collected so far with
     88     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
     89   }
     90   return result;
     91 }
     92 
     93 void StreamInterface::PostEvent(Thread* t, int events, int err) {
     94   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
     95 }
     96 
     97 void StreamInterface::PostEvent(int events, int err) {
     98   PostEvent(Thread::Current(), events, err);
     99 }
    100 
    101 StreamInterface::StreamInterface() {
    102 }
    103 
    104 void StreamInterface::OnMessage(Message* msg) {
    105   if (MSG_POST_EVENT == msg->message_id) {
    106     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
    107     SignalEvent(this, pe->events, pe->error);
    108     delete msg->pdata;
    109   }
    110 }
    111 
    112 ///////////////////////////////////////////////////////////////////////////////
    113 // StreamAdapterInterface
    114 ///////////////////////////////////////////////////////////////////////////////
    115 
    116 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
    117                                                bool owned)
    118     : stream_(stream), owned_(owned) {
    119   if (NULL != stream_)
    120     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    121 }
    122 
    123 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
    124   if (NULL != stream_)
    125     stream_->SignalEvent.disconnect(this);
    126   if (owned_)
    127     delete stream_;
    128   stream_ = stream;
    129   owned_ = owned;
    130   if (NULL != stream_)
    131     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    132 }
    133 
    134 StreamInterface* StreamAdapterInterface::Detach() {
    135   if (NULL != stream_)
    136     stream_->SignalEvent.disconnect(this);
    137   StreamInterface* stream = stream_;
    138   stream_ = NULL;
    139   return stream;
    140 }
    141 
    142 StreamAdapterInterface::~StreamAdapterInterface() {
    143   if (owned_)
    144     delete stream_;
    145 }
    146 
    147 ///////////////////////////////////////////////////////////////////////////////
    148 // StreamTap
    149 ///////////////////////////////////////////////////////////////////////////////
    150 
    151 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
    152     : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
    153         tap_error_(0) {
    154   AttachTap(tap);
    155 }
    156 
    157 void StreamTap::AttachTap(StreamInterface* tap) {
    158   tap_.reset(tap);
    159 }
    160 
    161 StreamInterface* StreamTap::DetachTap() {
    162   return tap_.release();
    163 }
    164 
    165 StreamResult StreamTap::GetTapResult(int* error) {
    166   if (error) {
    167     *error = tap_error_;
    168   }
    169   return tap_result_;
    170 }
    171 
    172 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
    173                              size_t* read, int* error) {
    174   size_t backup_read;
    175   if (!read) {
    176     read = &backup_read;
    177   }
    178   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
    179                                                   read, error);
    180   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    181     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
    182   }
    183   return res;
    184 }
    185 
    186 StreamResult StreamTap::Write(const void* data, size_t data_len,
    187                               size_t* written, int* error) {
    188   size_t backup_written;
    189   if (!written) {
    190     written = &backup_written;
    191   }
    192   StreamResult res = StreamAdapterInterface::Write(data, data_len,
    193                                                    written, error);
    194   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    195     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
    196   }
    197   return res;
    198 }
    199 
    200 ///////////////////////////////////////////////////////////////////////////////
    201 // StreamSegment
    202 ///////////////////////////////////////////////////////////////////////////////
    203 
    204 StreamSegment::StreamSegment(StreamInterface* stream)
    205     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    206     length_(SIZE_UNKNOWN) {
    207   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    208   stream->GetPosition(&start_);
    209 }
    210 
    211 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
    212     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    213     length_(length) {
    214   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    215   stream->GetPosition(&start_);
    216 }
    217 
    218 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
    219                                  size_t* read, int* error) {
    220   if (SIZE_UNKNOWN != length_) {
    221     if (pos_ >= length_)
    222       return SR_EOS;
    223     buffer_len = _min(buffer_len, length_ - pos_);
    224   }
    225   size_t backup_read;
    226   if (!read) {
    227     read = &backup_read;
    228   }
    229   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
    230                                                      read, error);
    231   if (SR_SUCCESS == result) {
    232     pos_ += *read;
    233   }
    234   return result;
    235 }
    236 
    237 bool StreamSegment::SetPosition(size_t position) {
    238   if (SIZE_UNKNOWN == start_)
    239     return false;  // Not seekable
    240   if ((SIZE_UNKNOWN != length_) && (position > length_))
    241     return false;  // Seek past end of segment
    242   if (!StreamAdapterInterface::SetPosition(start_ + position))
    243     return false;
    244   pos_ = position;
    245   return true;
    246 }
    247 
    248 bool StreamSegment::GetPosition(size_t* position) const {
    249   if (SIZE_UNKNOWN == start_)
    250     return false;  // Not seekable
    251   if (!StreamAdapterInterface::GetPosition(position))
    252     return false;
    253   if (position) {
    254     ASSERT(*position >= start_);
    255     *position -= start_;
    256   }
    257   return true;
    258 }
    259 
    260 bool StreamSegment::GetSize(size_t* size) const {
    261   if (!StreamAdapterInterface::GetSize(size))
    262     return false;
    263   if (size) {
    264     if (SIZE_UNKNOWN != start_) {
    265       ASSERT(*size >= start_);
    266       *size -= start_;
    267     }
    268     if (SIZE_UNKNOWN != length_) {
    269       *size = _min(*size, length_);
    270     }
    271   }
    272   return true;
    273 }
    274 
    275 bool StreamSegment::GetAvailable(size_t* size) const {
    276   if (!StreamAdapterInterface::GetAvailable(size))
    277     return false;
    278   if (size && (SIZE_UNKNOWN != length_))
    279     *size = _min(*size, length_ - pos_);
    280   return true;
    281 }
    282 
    283 ///////////////////////////////////////////////////////////////////////////////
    284 // NullStream
    285 ///////////////////////////////////////////////////////////////////////////////
    286 
    287 NullStream::NullStream() {
    288 }
    289 
    290 NullStream::~NullStream() {
    291 }
    292 
    293 StreamState NullStream::GetState() const {
    294   return SS_OPEN;
    295 }
    296 
    297 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
    298                               size_t* read, int* error) {
    299   if (error) *error = -1;
    300   return SR_ERROR;
    301 }
    302 
    303 StreamResult NullStream::Write(const void* data, size_t data_len,
    304                                size_t* written, int* error) {
    305   if (written) *written = data_len;
    306   return SR_SUCCESS;
    307 }
    308 
    309 void NullStream::Close() {
    310 }
    311 
    312 ///////////////////////////////////////////////////////////////////////////////
    313 // FileStream
    314 ///////////////////////////////////////////////////////////////////////////////
    315 
    316 FileStream::FileStream() : file_(NULL) {
    317 }
    318 
    319 FileStream::~FileStream() {
    320   FileStream::Close();
    321 }
    322 
    323 bool FileStream::Open(const std::string& filename, const char* mode,
    324                       int* error) {
    325   Close();
    326 #if defined(WEBRTC_WIN)
    327   std::wstring wfilename;
    328   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    329     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
    330   } else {
    331     if (error) {
    332       *error = -1;
    333       return false;
    334     }
    335   }
    336 #else
    337   file_ = fopen(filename.c_str(), mode);
    338 #endif
    339   if (!file_ && error) {
    340     *error = errno;
    341   }
    342   return (file_ != NULL);
    343 }
    344 
    345 bool FileStream::OpenShare(const std::string& filename, const char* mode,
    346                            int shflag, int* error) {
    347   Close();
    348 #if defined(WEBRTC_WIN)
    349   std::wstring wfilename;
    350   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    351     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
    352     if (!file_ && error) {
    353       *error = errno;
    354       return false;
    355     }
    356     return file_ != NULL;
    357   } else {
    358     if (error) {
    359       *error = -1;
    360     }
    361     return false;
    362   }
    363 #else
    364   return Open(filename, mode, error);
    365 #endif
    366 }
    367 
    368 bool FileStream::DisableBuffering() {
    369   if (!file_)
    370     return false;
    371   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
    372 }
    373 
    374 StreamState FileStream::GetState() const {
    375   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
    376 }
    377 
    378 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
    379                               size_t* read, int* error) {
    380   if (!file_)
    381     return SR_EOS;
    382   size_t result = fread(buffer, 1, buffer_len, file_);
    383   if ((result == 0) && (buffer_len > 0)) {
    384     if (feof(file_))
    385       return SR_EOS;
    386     if (error)
    387       *error = errno;
    388     return SR_ERROR;
    389   }
    390   if (read)
    391     *read = result;
    392   return SR_SUCCESS;
    393 }
    394 
    395 StreamResult FileStream::Write(const void* data, size_t data_len,
    396                                size_t* written, int* error) {
    397   if (!file_)
    398     return SR_EOS;
    399   size_t result = fwrite(data, 1, data_len, file_);
    400   if ((result == 0) && (data_len > 0)) {
    401     if (error)
    402       *error = errno;
    403     return SR_ERROR;
    404   }
    405   if (written)
    406     *written = result;
    407   return SR_SUCCESS;
    408 }
    409 
    410 void FileStream::Close() {
    411   if (file_) {
    412     DoClose();
    413     file_ = NULL;
    414   }
    415 }
    416 
    417 bool FileStream::SetPosition(size_t position) {
    418   if (!file_)
    419     return false;
    420   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
    421 }
    422 
    423 bool FileStream::GetPosition(size_t* position) const {
    424   ASSERT(NULL != position);
    425   if (!file_)
    426     return false;
    427   long result = ftell(file_);
    428   if (result < 0)
    429     return false;
    430   if (position)
    431     *position = result;
    432   return true;
    433 }
    434 
    435 bool FileStream::GetSize(size_t* size) const {
    436   ASSERT(NULL != size);
    437   if (!file_)
    438     return false;
    439   struct stat file_stats;
    440   if (fstat(fileno(file_), &file_stats) != 0)
    441     return false;
    442   if (size)
    443     *size = file_stats.st_size;
    444   return true;
    445 }
    446 
    447 bool FileStream::GetAvailable(size_t* size) const {
    448   ASSERT(NULL != size);
    449   if (!GetSize(size))
    450     return false;
    451   long result = ftell(file_);
    452   if (result < 0)
    453     return false;
    454   if (size)
    455     *size -= result;
    456   return true;
    457 }
    458 
    459 bool FileStream::ReserveSize(size_t size) {
    460   // TODO: extend the file to the proper length
    461   return true;
    462 }
    463 
    464 bool FileStream::GetSize(const std::string& filename, size_t* size) {
    465   struct stat file_stats;
    466   if (stat(filename.c_str(), &file_stats) != 0)
    467     return false;
    468   *size = file_stats.st_size;
    469   return true;
    470 }
    471 
    472 bool FileStream::Flush() {
    473   if (file_) {
    474     return (0 == fflush(file_));
    475   }
    476   // try to flush empty file?
    477   ASSERT(false);
    478   return false;
    479 }
    480 
    481 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
    482 
    483 bool FileStream::TryLock() {
    484   if (file_ == NULL) {
    485     // Stream not open.
    486     ASSERT(false);
    487     return false;
    488   }
    489 
    490   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
    491 }
    492 
    493 bool FileStream::Unlock() {
    494   if (file_ == NULL) {
    495     // Stream not open.
    496     ASSERT(false);
    497     return false;
    498   }
    499 
    500   return flock(fileno(file_), LOCK_UN) == 0;
    501 }
    502 
    503 #endif
    504 
    505 void FileStream::DoClose() {
    506   fclose(file_);
    507 }
    508 
    509 CircularFileStream::CircularFileStream(size_t max_size)
    510   : max_write_size_(max_size),
    511     position_(0),
    512     marked_position_(max_size / 2),
    513     last_write_position_(0),
    514     read_segment_(READ_LATEST),
    515     read_segment_available_(0) {
    516 }
    517 
    518 bool CircularFileStream::Open(
    519     const std::string& filename, const char* mode, int* error) {
    520   if (!FileStream::Open(filename.c_str(), mode, error))
    521     return false;
    522 
    523   if (strchr(mode, "r") != NULL) {  // Opened in read mode.
    524     // Check if the buffer has been overwritten and determine how to read the
    525     // log in time sequence.
    526     size_t file_size;
    527     GetSize(&file_size);
    528     if (file_size == position_) {
    529       // The buffer has not been overwritten yet. Read 0 .. file_size
    530       read_segment_ = READ_LATEST;
    531       read_segment_available_ = file_size;
    532     } else {
    533       // The buffer has been over written. There are three segments: The first
    534       // one is 0 .. marked_position_, which is the marked earliest log. The
    535       // second one is position_ .. file_size, which is the middle log. The
    536       // last one is marked_position_ .. position_, which is the latest log.
    537       read_segment_ = READ_MARKED;
    538       read_segment_available_ = marked_position_;
    539       last_write_position_ = position_;
    540     }
    541 
    542     // Read from the beginning.
    543     position_ = 0;
    544     SetPosition(position_);
    545   }
    546 
    547   return true;
    548 }
    549 
    550 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len,
    551                                       size_t* read, int* error) {
    552   if (read_segment_available_ == 0) {
    553     size_t file_size;
    554     switch (read_segment_) {
    555       case READ_MARKED:  // Finished READ_MARKED and start READ_MIDDLE.
    556         read_segment_ = READ_MIDDLE;
    557         position_ = last_write_position_;
    558         SetPosition(position_);
    559         GetSize(&file_size);
    560         read_segment_available_ = file_size - position_;
    561         break;
    562 
    563       case READ_MIDDLE:  // Finished READ_MIDDLE and start READ_LATEST.
    564         read_segment_ = READ_LATEST;
    565         position_ = marked_position_;
    566         SetPosition(position_);
    567         read_segment_available_ = last_write_position_ - position_;
    568         break;
    569 
    570       default:  // Finished READ_LATEST and return EOS.
    571         return rtc::SR_EOS;
    572     }
    573   }
    574 
    575   size_t local_read;
    576   if (!read) read = &local_read;
    577 
    578   size_t to_read = rtc::_min(buffer_len, read_segment_available_);
    579   rtc::StreamResult result
    580     = rtc::FileStream::Read(buffer, to_read, read, error);
    581   if (result == rtc::SR_SUCCESS) {
    582     read_segment_available_ -= *read;
    583     position_ += *read;
    584   }
    585   return result;
    586 }
    587 
    588 StreamResult CircularFileStream::Write(const void* data, size_t data_len,
    589                                        size_t* written, int* error) {
    590   if (position_ >= max_write_size_) {
    591     ASSERT(position_ == max_write_size_);
    592     position_ = marked_position_;
    593     SetPosition(position_);
    594   }
    595 
    596   size_t local_written;
    597   if (!written) written = &local_written;
    598 
    599   size_t to_eof = max_write_size_ - position_;
    600   size_t to_write = rtc::_min(data_len, to_eof);
    601   rtc::StreamResult result
    602     = rtc::FileStream::Write(data, to_write, written, error);
    603   if (result == rtc::SR_SUCCESS) {
    604     position_ += *written;
    605   }
    606   return result;
    607 }
    608 
    609 AsyncWriteStream::~AsyncWriteStream() {
    610   write_thread_->Clear(this, 0, NULL);
    611   ClearBufferAndWrite();
    612 
    613   CritScope cs(&crit_stream_);
    614   stream_.reset();
    615 }
    616 
    617 // This is needed by some stream writers, such as RtpDumpWriter.
    618 bool AsyncWriteStream::GetPosition(size_t* position) const {
    619   CritScope cs(&crit_stream_);
    620   return stream_->GetPosition(position);
    621 }
    622 
    623 // This is needed by some stream writers, such as the plugin log writers.
    624 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
    625                                     size_t* read, int* error) {
    626   CritScope cs(&crit_stream_);
    627   return stream_->Read(buffer, buffer_len, read, error);
    628 }
    629 
    630 void AsyncWriteStream::Close() {
    631   if (state_ == SS_CLOSED) {
    632     return;
    633   }
    634 
    635   write_thread_->Clear(this, 0, NULL);
    636   ClearBufferAndWrite();
    637 
    638   CritScope cs(&crit_stream_);
    639   stream_->Close();
    640   state_ = SS_CLOSED;
    641 }
    642 
    643 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
    644                                      size_t* written, int* error) {
    645   if (state_ == SS_CLOSED) {
    646     return SR_ERROR;
    647   }
    648 
    649   size_t previous_buffer_length = 0;
    650   {
    651     CritScope cs(&crit_buffer_);
    652     previous_buffer_length = buffer_.length();
    653     buffer_.AppendData(data, data_len);
    654   }
    655 
    656   if (previous_buffer_length == 0) {
    657     // If there's stuff already in the buffer, then we already called
    658     // Post and the write_thread_ hasn't pulled it out yet, so we
    659     // don't need to re-Post.
    660     write_thread_->Post(this, 0, NULL);
    661   }
    662   // Return immediately, assuming that it works.
    663   if (written) {
    664     *written = data_len;
    665   }
    666   return SR_SUCCESS;
    667 }
    668 
    669 void AsyncWriteStream::OnMessage(rtc::Message* pmsg) {
    670   ClearBufferAndWrite();
    671 }
    672 
    673 bool AsyncWriteStream::Flush() {
    674   if (state_ == SS_CLOSED) {
    675     return false;
    676   }
    677 
    678   ClearBufferAndWrite();
    679 
    680   CritScope cs(&crit_stream_);
    681   return stream_->Flush();
    682 }
    683 
    684 void AsyncWriteStream::ClearBufferAndWrite() {
    685   Buffer to_write;
    686   {
    687     CritScope cs_buffer(&crit_buffer_);
    688     buffer_.TransferTo(&to_write);
    689   }
    690 
    691   if (to_write.length() > 0) {
    692     CritScope cs(&crit_stream_);
    693     stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
    694   }
    695 }
    696 
    697 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
    698 
    699 // Have to identically rewrite the FileStream destructor or else it would call
    700 // the base class's Close() instead of the sub-class's.
    701 POpenStream::~POpenStream() {
    702   POpenStream::Close();
    703 }
    704 
    705 bool POpenStream::Open(const std::string& subcommand,
    706                        const char* mode,
    707                        int* error) {
    708   Close();
    709   file_ = popen(subcommand.c_str(), mode);
    710   if (file_ == NULL) {
    711     if (error)
    712       *error = errno;
    713     return false;
    714   }
    715   return true;
    716 }
    717 
    718 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
    719                             int shflag, int* error) {
    720   return Open(subcommand, mode, error);
    721 }
    722 
    723 void POpenStream::DoClose() {
    724   wait_status_ = pclose(file_);
    725 }
    726 
    727 #endif
    728 
    729 ///////////////////////////////////////////////////////////////////////////////
    730 // MemoryStream
    731 ///////////////////////////////////////////////////////////////////////////////
    732 
    733 MemoryStreamBase::MemoryStreamBase()
    734   : buffer_(NULL), buffer_length_(0), data_length_(0),
    735     seek_position_(0) {
    736 }
    737 
    738 StreamState MemoryStreamBase::GetState() const {
    739   return SS_OPEN;
    740 }
    741 
    742 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
    743                                     size_t* bytes_read, int* error) {
    744   if (seek_position_ >= data_length_) {
    745     return SR_EOS;
    746   }
    747   size_t available = data_length_ - seek_position_;
    748   if (bytes > available) {
    749     // Read partial buffer
    750     bytes = available;
    751   }
    752   memcpy(buffer, &buffer_[seek_position_], bytes);
    753   seek_position_ += bytes;
    754   if (bytes_read) {
    755     *bytes_read = bytes;
    756   }
    757   return SR_SUCCESS;
    758 }
    759 
    760 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
    761                                      size_t* bytes_written, int* error) {
    762   size_t available = buffer_length_ - seek_position_;
    763   if (0 == available) {
    764     // Increase buffer size to the larger of:
    765     // a) new position rounded up to next 256 bytes
    766     // b) double the previous length
    767     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
    768                                     buffer_length_ * 2);
    769     StreamResult result = DoReserve(new_buffer_length, error);
    770     if (SR_SUCCESS != result) {
    771       return result;
    772     }
    773     ASSERT(buffer_length_ >= new_buffer_length);
    774     available = buffer_length_ - seek_position_;
    775   }
    776 
    777   if (bytes > available) {
    778     bytes = available;
    779   }
    780   memcpy(&buffer_[seek_position_], buffer, bytes);
    781   seek_position_ += bytes;
    782   if (data_length_ < seek_position_) {
    783     data_length_ = seek_position_;
    784   }
    785   if (bytes_written) {
    786     *bytes_written = bytes;
    787   }
    788   return SR_SUCCESS;
    789 }
    790 
    791 void MemoryStreamBase::Close() {
    792   // nothing to do
    793 }
    794 
    795 bool MemoryStreamBase::SetPosition(size_t position) {
    796   if (position > data_length_)
    797     return false;
    798   seek_position_ = position;
    799   return true;
    800 }
    801 
    802 bool MemoryStreamBase::GetPosition(size_t* position) const {
    803   if (position)
    804     *position = seek_position_;
    805   return true;
    806 }
    807 
    808 bool MemoryStreamBase::GetSize(size_t* size) const {
    809   if (size)
    810     *size = data_length_;
    811   return true;
    812 }
    813 
    814 bool MemoryStreamBase::GetAvailable(size_t* size) const {
    815   if (size)
    816     *size = data_length_ - seek_position_;
    817   return true;
    818 }
    819 
    820 bool MemoryStreamBase::ReserveSize(size_t size) {
    821   return (SR_SUCCESS == DoReserve(size, NULL));
    822 }
    823 
    824 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
    825   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
    826 }
    827 
    828 ///////////////////////////////////////////////////////////////////////////////
    829 
    830 MemoryStream::MemoryStream()
    831   : buffer_alloc_(NULL) {
    832 }
    833 
    834 MemoryStream::MemoryStream(const char* data)
    835   : buffer_alloc_(NULL) {
    836   SetData(data, strlen(data));
    837 }
    838 
    839 MemoryStream::MemoryStream(const void* data, size_t length)
    840   : buffer_alloc_(NULL) {
    841   SetData(data, length);
    842 }
    843 
    844 MemoryStream::~MemoryStream() {
    845   delete [] buffer_alloc_;
    846 }
    847 
    848 void MemoryStream::SetData(const void* data, size_t length) {
    849   data_length_ = buffer_length_ = length;
    850   delete [] buffer_alloc_;
    851   buffer_alloc_ = new char[buffer_length_ + kAlignment];
    852   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
    853   memcpy(buffer_, data, data_length_);
    854   seek_position_ = 0;
    855 }
    856 
    857 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
    858   if (buffer_length_ >= size)
    859     return SR_SUCCESS;
    860 
    861   if (char* new_buffer_alloc = new char[size + kAlignment]) {
    862     char* new_buffer = reinterpret_cast<char*>(
    863         ALIGNP(new_buffer_alloc, kAlignment));
    864     memcpy(new_buffer, buffer_, data_length_);
    865     delete [] buffer_alloc_;
    866     buffer_alloc_ = new_buffer_alloc;
    867     buffer_ = new_buffer;
    868     buffer_length_ = size;
    869     return SR_SUCCESS;
    870   }
    871 
    872   if (error) {
    873     *error = ENOMEM;
    874   }
    875   return SR_ERROR;
    876 }
    877 
    878 ///////////////////////////////////////////////////////////////////////////////
    879 
    880 ExternalMemoryStream::ExternalMemoryStream() {
    881 }
    882 
    883 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
    884   SetData(data, length);
    885 }
    886 
    887 ExternalMemoryStream::~ExternalMemoryStream() {
    888 }
    889 
    890 void ExternalMemoryStream::SetData(void* data, size_t length) {
    891   data_length_ = buffer_length_ = length;
    892   buffer_ = static_cast<char*>(data);
    893   seek_position_ = 0;
    894 }
    895 
    896 ///////////////////////////////////////////////////////////////////////////////
    897 // FifoBuffer
    898 ///////////////////////////////////////////////////////////////////////////////
    899 
    900 FifoBuffer::FifoBuffer(size_t size)
    901     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    902       data_length_(0), read_position_(0), owner_(Thread::Current()) {
    903   // all events are done on the owner_ thread
    904 }
    905 
    906 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
    907     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    908       data_length_(0), read_position_(0), owner_(owner) {
    909   // all events are done on the owner_ thread
    910 }
    911 
    912 FifoBuffer::~FifoBuffer() {
    913 }
    914 
    915 bool FifoBuffer::GetBuffered(size_t* size) const {
    916   CritScope cs(&crit_);
    917   *size = data_length_;
    918   return true;
    919 }
    920 
    921 bool FifoBuffer::SetCapacity(size_t size) {
    922   CritScope cs(&crit_);
    923   if (data_length_ > size) {
    924     return false;
    925   }
    926 
    927   if (size != buffer_length_) {
    928     char* buffer = new char[size];
    929     const size_t copy = data_length_;
    930     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
    931     memcpy(buffer, &buffer_[read_position_], tail_copy);
    932     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
    933     buffer_.reset(buffer);
    934     read_position_ = 0;
    935     buffer_length_ = size;
    936   }
    937   return true;
    938 }
    939 
    940 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
    941                                     size_t offset, size_t* bytes_read) {
    942   CritScope cs(&crit_);
    943   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
    944 }
    945 
    946 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
    947                                      size_t offset, size_t* bytes_written) {
    948   CritScope cs(&crit_);
    949   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
    950 }
    951 
    952 StreamState FifoBuffer::GetState() const {
    953   return state_;
    954 }
    955 
    956 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
    957                               size_t* bytes_read, int* error) {
    958   CritScope cs(&crit_);
    959   const bool was_writable = data_length_ < buffer_length_;
    960   size_t copy = 0;
    961   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
    962 
    963   if (result == SR_SUCCESS) {
    964     // If read was successful then adjust the read position and number of
    965     // bytes buffered.
    966     read_position_ = (read_position_ + copy) % buffer_length_;
    967     data_length_ -= copy;
    968     if (bytes_read) {
    969       *bytes_read = copy;
    970     }
    971 
    972     // if we were full before, and now we're not, post an event
    973     if (!was_writable && copy > 0) {
    974       PostEvent(owner_, SE_WRITE, 0);
    975     }
    976   }
    977   return result;
    978 }
    979 
    980 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
    981                                size_t* bytes_written, int* error) {
    982   CritScope cs(&crit_);
    983 
    984   const bool was_readable = (data_length_ > 0);
    985   size_t copy = 0;
    986   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
    987 
    988   if (result == SR_SUCCESS) {
    989     // If write was successful then adjust the number of readable bytes.
    990     data_length_ += copy;
    991     if (bytes_written) {
    992       *bytes_written = copy;
    993     }
    994 
    995     // if we didn't have any data to read before, and now we do, post an event
    996     if (!was_readable && copy > 0) {
    997       PostEvent(owner_, SE_READ, 0);
    998     }
    999   }
   1000   return result;
   1001 }
   1002 
   1003 void FifoBuffer::Close() {
   1004   CritScope cs(&crit_);
   1005   state_ = SS_CLOSED;
   1006 }
   1007 
   1008 const void* FifoBuffer::GetReadData(size_t* size) {
   1009   CritScope cs(&crit_);
   1010   *size = (read_position_ + data_length_ <= buffer_length_) ?
   1011       data_length_ : buffer_length_ - read_position_;
   1012   return &buffer_[read_position_];
   1013 }
   1014 
   1015 void FifoBuffer::ConsumeReadData(size_t size) {
   1016   CritScope cs(&crit_);
   1017   ASSERT(size <= data_length_);
   1018   const bool was_writable = data_length_ < buffer_length_;
   1019   read_position_ = (read_position_ + size) % buffer_length_;
   1020   data_length_ -= size;
   1021   if (!was_writable && size > 0) {
   1022     PostEvent(owner_, SE_WRITE, 0);
   1023   }
   1024 }
   1025 
   1026 void* FifoBuffer::GetWriteBuffer(size_t* size) {
   1027   CritScope cs(&crit_);
   1028   if (state_ == SS_CLOSED) {
   1029     return NULL;
   1030   }
   1031 
   1032   // if empty, reset the write position to the beginning, so we can get
   1033   // the biggest possible block
   1034   if (data_length_ == 0) {
   1035     read_position_ = 0;
   1036   }
   1037 
   1038   const size_t write_position = (read_position_ + data_length_)
   1039       % buffer_length_;
   1040   *size = (write_position > read_position_ || data_length_ == 0) ?
   1041       buffer_length_ - write_position : read_position_ - write_position;
   1042   return &buffer_[write_position];
   1043 }
   1044 
   1045 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
   1046   CritScope cs(&crit_);
   1047   ASSERT(size <= buffer_length_ - data_length_);
   1048   const bool was_readable = (data_length_ > 0);
   1049   data_length_ += size;
   1050   if (!was_readable && size > 0) {
   1051     PostEvent(owner_, SE_READ, 0);
   1052   }
   1053 }
   1054 
   1055 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
   1056   CritScope cs(&crit_);
   1057   *size = buffer_length_ - data_length_;
   1058   return true;
   1059 }
   1060 
   1061 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
   1062                                           size_t bytes,
   1063                                           size_t offset,
   1064                                           size_t* bytes_read) {
   1065   if (offset >= data_length_) {
   1066     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
   1067   }
   1068 
   1069   const size_t available = data_length_ - offset;
   1070   const size_t read_position = (read_position_ + offset) % buffer_length_;
   1071   const size_t copy = _min(bytes, available);
   1072   const size_t tail_copy = _min(copy, buffer_length_ - read_position);
   1073   char* const p = static_cast<char*>(buffer);
   1074   memcpy(p, &buffer_[read_position], tail_copy);
   1075   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
   1076 
   1077   if (bytes_read) {
   1078     *bytes_read = copy;
   1079   }
   1080   return SR_SUCCESS;
   1081 }
   1082 
   1083 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
   1084                                            size_t bytes,
   1085                                            size_t offset,
   1086                                            size_t* bytes_written) {
   1087   if (state_ == SS_CLOSED) {
   1088     return SR_EOS;
   1089   }
   1090 
   1091   if (data_length_ + offset >= buffer_length_) {
   1092     return SR_BLOCK;
   1093   }
   1094 
   1095   const size_t available = buffer_length_ - data_length_ - offset;
   1096   const size_t write_position = (read_position_ + data_length_ + offset)
   1097       % buffer_length_;
   1098   const size_t copy = _min(bytes, available);
   1099   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
   1100   const char* const p = static_cast<const char*>(buffer);
   1101   memcpy(&buffer_[write_position], p, tail_copy);
   1102   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
   1103 
   1104   if (bytes_written) {
   1105     *bytes_written = copy;
   1106   }
   1107   return SR_SUCCESS;
   1108 }
   1109 
   1110 
   1111 
   1112 ///////////////////////////////////////////////////////////////////////////////
   1113 // LoggingAdapter
   1114 ///////////////////////////////////////////////////////////////////////////////
   1115 
   1116 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
   1117                                const std::string& label, bool hex_mode)
   1118     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
   1119   set_label(label);
   1120 }
   1121 
   1122 void LoggingAdapter::set_label(const std::string& label) {
   1123   label_.assign("[");
   1124   label_.append(label);
   1125   label_.append("]");
   1126 }
   1127 
   1128 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
   1129                                   size_t* read, int* error) {
   1130   size_t local_read; if (!read) read = &local_read;
   1131   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
   1132                                                      error);
   1133   if (result == SR_SUCCESS) {
   1134     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
   1135   }
   1136   return result;
   1137 }
   1138 
   1139 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
   1140                                    size_t* written, int* error) {
   1141   size_t local_written;
   1142   if (!written) written = &local_written;
   1143   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
   1144                                                       error);
   1145   if (result == SR_SUCCESS) {
   1146     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
   1147                  &lms_);
   1148   }
   1149   return result;
   1150 }
   1151 
   1152 void LoggingAdapter::Close() {
   1153   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
   1154   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
   1155   LOG_V(level_) << label_ << " Closed locally";
   1156   StreamAdapterInterface::Close();
   1157 }
   1158 
   1159 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
   1160   if (events & SE_OPEN) {
   1161     LOG_V(level_) << label_ << " Open";
   1162   } else if (events & SE_CLOSE) {
   1163     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
   1164     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
   1165     LOG_V(level_) << label_ << " Closed with error: " << err;
   1166   }
   1167   StreamAdapterInterface::OnEvent(stream, events, err);
   1168 }
   1169 
   1170 ///////////////////////////////////////////////////////////////////////////////
   1171 // StringStream - Reads/Writes to an external std::string
   1172 ///////////////////////////////////////////////////////////////////////////////
   1173 
   1174 StringStream::StringStream(std::string& str)
   1175     : str_(str), read_pos_(0), read_only_(false) {
   1176 }
   1177 
   1178 StringStream::StringStream(const std::string& str)
   1179     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
   1180 }
   1181 
   1182 StreamState StringStream::GetState() const {
   1183   return SS_OPEN;
   1184 }
   1185 
   1186 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
   1187                                       size_t* read, int* error) {
   1188   size_t available = _min(buffer_len, str_.size() - read_pos_);
   1189   if (!available)
   1190     return SR_EOS;
   1191   memcpy(buffer, str_.data() + read_pos_, available);
   1192   read_pos_ += available;
   1193   if (read)
   1194     *read = available;
   1195   return SR_SUCCESS;
   1196 }
   1197 
   1198 StreamResult StringStream::Write(const void* data, size_t data_len,
   1199                                       size_t* written, int* error) {
   1200   if (read_only_) {
   1201     if (error) {
   1202       *error = -1;
   1203     }
   1204     return SR_ERROR;
   1205   }
   1206   str_.append(static_cast<const char*>(data),
   1207               static_cast<const char*>(data) + data_len);
   1208   if (written)
   1209     *written = data_len;
   1210   return SR_SUCCESS;
   1211 }
   1212 
   1213 void StringStream::Close() {
   1214 }
   1215 
   1216 bool StringStream::SetPosition(size_t position) {
   1217   if (position > str_.size())
   1218     return false;
   1219   read_pos_ = position;
   1220   return true;
   1221 }
   1222 
   1223 bool StringStream::GetPosition(size_t* position) const {
   1224   if (position)
   1225     *position = read_pos_;
   1226   return true;
   1227 }
   1228 
   1229 bool StringStream::GetSize(size_t* size) const {
   1230   if (size)
   1231     *size = str_.size();
   1232   return true;
   1233 }
   1234 
   1235 bool StringStream::GetAvailable(size_t* size) const {
   1236   if (size)
   1237     *size = str_.size() - read_pos_;
   1238   return true;
   1239 }
   1240 
   1241 bool StringStream::ReserveSize(size_t size) {
   1242   if (read_only_)
   1243     return false;
   1244   str_.reserve(size);
   1245   return true;
   1246 }
   1247 
   1248 ///////////////////////////////////////////////////////////////////////////////
   1249 // StreamReference
   1250 ///////////////////////////////////////////////////////////////////////////////
   1251 
   1252 StreamReference::StreamReference(StreamInterface* stream)
   1253     : StreamAdapterInterface(stream, false) {
   1254   // owner set to false so the destructor does not free the stream.
   1255   stream_ref_count_ = new StreamRefCount(stream);
   1256 }
   1257 
   1258 StreamInterface* StreamReference::NewReference() {
   1259   stream_ref_count_->AddReference();
   1260   return new StreamReference(stream_ref_count_, stream());
   1261 }
   1262 
   1263 StreamReference::~StreamReference() {
   1264   stream_ref_count_->Release();
   1265 }
   1266 
   1267 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
   1268                                  StreamInterface* stream)
   1269     : StreamAdapterInterface(stream, false),
   1270       stream_ref_count_(stream_ref_count) {
   1271 }
   1272 
   1273 ///////////////////////////////////////////////////////////////////////////////
   1274 
   1275 StreamResult Flow(StreamInterface* source,
   1276                   char* buffer, size_t buffer_len,
   1277                   StreamInterface* sink,
   1278                   size_t* data_len /* = NULL */) {
   1279   ASSERT(buffer_len > 0);
   1280 
   1281   StreamResult result;
   1282   size_t count, read_pos, write_pos;
   1283   if (data_len) {
   1284     read_pos = *data_len;
   1285   } else {
   1286     read_pos = 0;
   1287   }
   1288 
   1289   bool end_of_stream = false;
   1290   do {
   1291     // Read until buffer is full, end of stream, or error
   1292     while (!end_of_stream && (read_pos < buffer_len)) {
   1293       result = source->Read(buffer + read_pos, buffer_len - read_pos,
   1294                             &count, NULL);
   1295       if (result == SR_EOS) {
   1296         end_of_stream = true;
   1297       } else if (result != SR_SUCCESS) {
   1298         if (data_len) {
   1299           *data_len = read_pos;
   1300         }
   1301         return result;
   1302       } else {
   1303         read_pos += count;
   1304       }
   1305     }
   1306 
   1307     // Write until buffer is empty, or error (including end of stream)
   1308     write_pos = 0;
   1309     while (write_pos < read_pos) {
   1310       result = sink->Write(buffer + write_pos, read_pos - write_pos,
   1311                            &count, NULL);
   1312       if (result != SR_SUCCESS) {
   1313         if (data_len) {
   1314           *data_len = read_pos - write_pos;
   1315           if (write_pos > 0) {
   1316             memmove(buffer, buffer + write_pos, *data_len);
   1317           }
   1318         }
   1319         return result;
   1320       }
   1321       write_pos += count;
   1322     }
   1323 
   1324     read_pos = 0;
   1325   } while (!end_of_stream);
   1326 
   1327   if (data_len) {
   1328     *data_len = 0;
   1329   }
   1330   return SR_SUCCESS;
   1331 }
   1332 
   1333 ///////////////////////////////////////////////////////////////////////////////
   1334 
   1335 }  // namespace rtc
   1336