Home | History | Annotate | Download | only in base
      1 /*
      2  *  Copyright 2004 The WebRTC Project Authors. All rights reserved.
      3  *
      4  *  Use of this source code is governed by a BSD-style license
      5  *  that can be found in the LICENSE file in the root of the source
      6  *  tree. An additional intellectual property rights grant can be found
      7  *  in the file PATENTS.  All contributing project authors may
      8  *  be found in the AUTHORS file in the root of the source tree.
      9  */
     10 
     11 #if defined(WEBRTC_POSIX)
     12 #include <sys/file.h>
     13 #endif  // WEBRTC_POSIX
     14 #include <sys/types.h>
     15 #include <sys/stat.h>
     16 #include <errno.h>
     17 
     18 #include <algorithm>
     19 #include <string>
     20 
     21 #include "webrtc/base/basictypes.h"
     22 #include "webrtc/base/common.h"
     23 #include "webrtc/base/logging.h"
     24 #include "webrtc/base/messagequeue.h"
     25 #include "webrtc/base/stream.h"
     26 #include "webrtc/base/stringencode.h"
     27 #include "webrtc/base/stringutils.h"
     28 #include "webrtc/base/thread.h"
     29 #include "webrtc/base/timeutils.h"
     30 
     31 #if defined(WEBRTC_WIN)
     32 #include "webrtc/base/win32.h"
     33 #define fileno _fileno
     34 #endif
     35 
     36 namespace rtc {
     37 
     38 ///////////////////////////////////////////////////////////////////////////////
     39 // StreamInterface
     40 ///////////////////////////////////////////////////////////////////////////////
     41 StreamInterface::~StreamInterface() {
     42 }
     43 
     44 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
     45                                        size_t* written, int* error) {
     46   StreamResult result = SR_SUCCESS;
     47   size_t total_written = 0, current_written;
     48   while (total_written < data_len) {
     49     result = Write(static_cast<const char*>(data) + total_written,
     50                    data_len - total_written, &current_written, error);
     51     if (result != SR_SUCCESS)
     52       break;
     53     total_written += current_written;
     54   }
     55   if (written)
     56     *written = total_written;
     57   return result;
     58 }
     59 
     60 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
     61                                       size_t* read, int* error) {
     62   StreamResult result = SR_SUCCESS;
     63   size_t total_read = 0, current_read;
     64   while (total_read < buffer_len) {
     65     result = Read(static_cast<char*>(buffer) + total_read,
     66                   buffer_len - total_read, &current_read, error);
     67     if (result != SR_SUCCESS)
     68       break;
     69     total_read += current_read;
     70   }
     71   if (read)
     72     *read = total_read;
     73   return result;
     74 }
     75 
     76 StreamResult StreamInterface::ReadLine(std::string* line) {
     77   line->clear();
     78   StreamResult result = SR_SUCCESS;
     79   while (true) {
     80     char ch;
     81     result = Read(&ch, sizeof(ch), NULL, NULL);
     82     if (result != SR_SUCCESS) {
     83       break;
     84     }
     85     if (ch == '\n') {
     86       break;
     87     }
     88     line->push_back(ch);
     89   }
     90   if (!line->empty()) {   // give back the line we've collected so far with
     91     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
     92   }
     93   return result;
     94 }
     95 
     96 void StreamInterface::PostEvent(Thread* t, int events, int err) {
     97   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
     98 }
     99 
    100 void StreamInterface::PostEvent(int events, int err) {
    101   PostEvent(Thread::Current(), events, err);
    102 }
    103 
    104 const void* StreamInterface::GetReadData(size_t* data_len) {
    105   return NULL;
    106 }
    107 
    108 void* StreamInterface::GetWriteBuffer(size_t* buf_len) {
    109   return NULL;
    110 }
    111 
    112 bool StreamInterface::SetPosition(size_t position) {
    113   return false;
    114 }
    115 
    116 bool StreamInterface::GetPosition(size_t* position) const {
    117   return false;
    118 }
    119 
    120 bool StreamInterface::GetSize(size_t* size) const {
    121   return false;
    122 }
    123 
    124 bool StreamInterface::GetAvailable(size_t* size) const {
    125   return false;
    126 }
    127 
    128 bool StreamInterface::GetWriteRemaining(size_t* size) const {
    129   return false;
    130 }
    131 
    132 bool StreamInterface::Flush() {
    133   return false;
    134 }
    135 
    136 bool StreamInterface::ReserveSize(size_t size) {
    137   return true;
    138 }
    139 
    140 StreamInterface::StreamInterface() {
    141 }
    142 
    143 void StreamInterface::OnMessage(Message* msg) {
    144   if (MSG_POST_EVENT == msg->message_id) {
    145     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
    146     SignalEvent(this, pe->events, pe->error);
    147     delete msg->pdata;
    148   }
    149 }
    150 
    151 ///////////////////////////////////////////////////////////////////////////////
    152 // StreamAdapterInterface
    153 ///////////////////////////////////////////////////////////////////////////////
    154 
    155 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
    156                                                bool owned)
    157     : stream_(stream), owned_(owned) {
    158   if (NULL != stream_)
    159     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    160 }
    161 
    162 StreamState StreamAdapterInterface::GetState() const {
    163   return stream_->GetState();
    164 }
    165 StreamResult StreamAdapterInterface::Read(void* buffer,
    166                                           size_t buffer_len,
    167                                           size_t* read,
    168                                           int* error) {
    169   return stream_->Read(buffer, buffer_len, read, error);
    170 }
    171 StreamResult StreamAdapterInterface::Write(const void* data,
    172                                            size_t data_len,
    173                                            size_t* written,
    174                                            int* error) {
    175   return stream_->Write(data, data_len, written, error);
    176 }
    177 void StreamAdapterInterface::Close() {
    178   stream_->Close();
    179 }
    180 
    181 bool StreamAdapterInterface::SetPosition(size_t position) {
    182   return stream_->SetPosition(position);
    183 }
    184 
    185 bool StreamAdapterInterface::GetPosition(size_t* position) const {
    186   return stream_->GetPosition(position);
    187 }
    188 
    189 bool StreamAdapterInterface::GetSize(size_t* size) const {
    190   return stream_->GetSize(size);
    191 }
    192 
    193 bool StreamAdapterInterface::GetAvailable(size_t* size) const {
    194   return stream_->GetAvailable(size);
    195 }
    196 
    197 bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const {
    198   return stream_->GetWriteRemaining(size);
    199 }
    200 
    201 bool StreamAdapterInterface::ReserveSize(size_t size) {
    202   return stream_->ReserveSize(size);
    203 }
    204 
    205 bool StreamAdapterInterface::Flush() {
    206   return stream_->Flush();
    207 }
    208 
    209 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
    210   if (NULL != stream_)
    211     stream_->SignalEvent.disconnect(this);
    212   if (owned_)
    213     delete stream_;
    214   stream_ = stream;
    215   owned_ = owned;
    216   if (NULL != stream_)
    217     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    218 }
    219 
    220 StreamInterface* StreamAdapterInterface::Detach() {
    221   if (NULL != stream_)
    222     stream_->SignalEvent.disconnect(this);
    223   StreamInterface* stream = stream_;
    224   stream_ = NULL;
    225   return stream;
    226 }
    227 
    228 StreamAdapterInterface::~StreamAdapterInterface() {
    229   if (owned_)
    230     delete stream_;
    231 }
    232 
    233 void StreamAdapterInterface::OnEvent(StreamInterface* stream,
    234                                      int events,
    235                                      int err) {
    236   SignalEvent(this, events, err);
    237 }
    238 
    239 ///////////////////////////////////////////////////////////////////////////////
    240 // StreamTap
    241 ///////////////////////////////////////////////////////////////////////////////
    242 
    243 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
    244     : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS),
    245         tap_error_(0) {
    246   AttachTap(tap);
    247 }
    248 
    249 StreamTap::~StreamTap() = default;
    250 
    251 void StreamTap::AttachTap(StreamInterface* tap) {
    252   tap_.reset(tap);
    253 }
    254 
    255 StreamInterface* StreamTap::DetachTap() {
    256   return tap_.release();
    257 }
    258 
    259 StreamResult StreamTap::GetTapResult(int* error) {
    260   if (error) {
    261     *error = tap_error_;
    262   }
    263   return tap_result_;
    264 }
    265 
    266 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
    267                              size_t* read, int* error) {
    268   size_t backup_read;
    269   if (!read) {
    270     read = &backup_read;
    271   }
    272   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
    273                                                   read, error);
    274   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    275     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
    276   }
    277   return res;
    278 }
    279 
    280 StreamResult StreamTap::Write(const void* data, size_t data_len,
    281                               size_t* written, int* error) {
    282   size_t backup_written;
    283   if (!written) {
    284     written = &backup_written;
    285   }
    286   StreamResult res = StreamAdapterInterface::Write(data, data_len,
    287                                                    written, error);
    288   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    289     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
    290   }
    291   return res;
    292 }
    293 
    294 ///////////////////////////////////////////////////////////////////////////////
    295 // NullStream
    296 ///////////////////////////////////////////////////////////////////////////////
    297 
    298 NullStream::NullStream() {
    299 }
    300 
    301 NullStream::~NullStream() {
    302 }
    303 
    304 StreamState NullStream::GetState() const {
    305   return SS_OPEN;
    306 }
    307 
    308 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
    309                               size_t* read, int* error) {
    310   if (error) *error = -1;
    311   return SR_ERROR;
    312 }
    313 
    314 StreamResult NullStream::Write(const void* data, size_t data_len,
    315                                size_t* written, int* error) {
    316   if (written) *written = data_len;
    317   return SR_SUCCESS;
    318 }
    319 
    320 void NullStream::Close() {
    321 }
    322 
    323 ///////////////////////////////////////////////////////////////////////////////
    324 // FileStream
    325 ///////////////////////////////////////////////////////////////////////////////
    326 
    327 FileStream::FileStream() : file_(NULL) {
    328 }
    329 
    330 FileStream::~FileStream() {
    331   FileStream::Close();
    332 }
    333 
    334 bool FileStream::Open(const std::string& filename, const char* mode,
    335                       int* error) {
    336   Close();
    337 #if defined(WEBRTC_WIN)
    338   std::wstring wfilename;
    339   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    340     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
    341   } else {
    342     if (error) {
    343       *error = -1;
    344       return false;
    345     }
    346   }
    347 #else
    348   file_ = fopen(filename.c_str(), mode);
    349 #endif
    350   if (!file_ && error) {
    351     *error = errno;
    352   }
    353   return (file_ != NULL);
    354 }
    355 
    356 bool FileStream::OpenShare(const std::string& filename, const char* mode,
    357                            int shflag, int* error) {
    358   Close();
    359 #if defined(WEBRTC_WIN)
    360   std::wstring wfilename;
    361   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    362     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
    363     if (!file_ && error) {
    364       *error = errno;
    365       return false;
    366     }
    367     return file_ != NULL;
    368   } else {
    369     if (error) {
    370       *error = -1;
    371     }
    372     return false;
    373   }
    374 #else
    375   return Open(filename, mode, error);
    376 #endif
    377 }
    378 
    379 bool FileStream::DisableBuffering() {
    380   if (!file_)
    381     return false;
    382   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
    383 }
    384 
    385 StreamState FileStream::GetState() const {
    386   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
    387 }
    388 
    389 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
    390                               size_t* read, int* error) {
    391   if (!file_)
    392     return SR_EOS;
    393   size_t result = fread(buffer, 1, buffer_len, file_);
    394   if ((result == 0) && (buffer_len > 0)) {
    395     if (feof(file_))
    396       return SR_EOS;
    397     if (error)
    398       *error = errno;
    399     return SR_ERROR;
    400   }
    401   if (read)
    402     *read = result;
    403   return SR_SUCCESS;
    404 }
    405 
    406 StreamResult FileStream::Write(const void* data, size_t data_len,
    407                                size_t* written, int* error) {
    408   if (!file_)
    409     return SR_EOS;
    410   size_t result = fwrite(data, 1, data_len, file_);
    411   if ((result == 0) && (data_len > 0)) {
    412     if (error)
    413       *error = errno;
    414     return SR_ERROR;
    415   }
    416   if (written)
    417     *written = result;
    418   return SR_SUCCESS;
    419 }
    420 
    421 void FileStream::Close() {
    422   if (file_) {
    423     DoClose();
    424     file_ = NULL;
    425   }
    426 }
    427 
    428 bool FileStream::SetPosition(size_t position) {
    429   if (!file_)
    430     return false;
    431   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
    432 }
    433 
    434 bool FileStream::GetPosition(size_t* position) const {
    435   ASSERT(NULL != position);
    436   if (!file_)
    437     return false;
    438   long result = ftell(file_);
    439   if (result < 0)
    440     return false;
    441   if (position)
    442     *position = result;
    443   return true;
    444 }
    445 
    446 bool FileStream::GetSize(size_t* size) const {
    447   ASSERT(NULL != size);
    448   if (!file_)
    449     return false;
    450   struct stat file_stats;
    451   if (fstat(fileno(file_), &file_stats) != 0)
    452     return false;
    453   if (size)
    454     *size = file_stats.st_size;
    455   return true;
    456 }
    457 
    458 bool FileStream::GetAvailable(size_t* size) const {
    459   ASSERT(NULL != size);
    460   if (!GetSize(size))
    461     return false;
    462   long result = ftell(file_);
    463   if (result < 0)
    464     return false;
    465   if (size)
    466     *size -= result;
    467   return true;
    468 }
    469 
    470 bool FileStream::ReserveSize(size_t size) {
    471   // TODO: extend the file to the proper length
    472   return true;
    473 }
    474 
    475 bool FileStream::GetSize(const std::string& filename, size_t* size) {
    476   struct stat file_stats;
    477   if (stat(filename.c_str(), &file_stats) != 0)
    478     return false;
    479   *size = file_stats.st_size;
    480   return true;
    481 }
    482 
    483 bool FileStream::Flush() {
    484   if (file_) {
    485     return (0 == fflush(file_));
    486   }
    487   // try to flush empty file?
    488   ASSERT(false);
    489   return false;
    490 }
    491 
    492 #if defined(WEBRTC_POSIX) && !defined(__native_client__)
    493 
    494 bool FileStream::TryLock() {
    495   if (file_ == NULL) {
    496     // Stream not open.
    497     ASSERT(false);
    498     return false;
    499   }
    500 
    501   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
    502 }
    503 
    504 bool FileStream::Unlock() {
    505   if (file_ == NULL) {
    506     // Stream not open.
    507     ASSERT(false);
    508     return false;
    509   }
    510 
    511   return flock(fileno(file_), LOCK_UN) == 0;
    512 }
    513 
    514 #endif
    515 
    516 void FileStream::DoClose() {
    517   fclose(file_);
    518 }
    519 
    520 ///////////////////////////////////////////////////////////////////////////////
    521 // MemoryStream
    522 ///////////////////////////////////////////////////////////////////////////////
    523 
    524 MemoryStreamBase::MemoryStreamBase()
    525   : buffer_(NULL), buffer_length_(0), data_length_(0),
    526     seek_position_(0) {
    527 }
    528 
    529 StreamState MemoryStreamBase::GetState() const {
    530   return SS_OPEN;
    531 }
    532 
    533 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
    534                                     size_t* bytes_read, int* error) {
    535   if (seek_position_ >= data_length_) {
    536     return SR_EOS;
    537   }
    538   size_t available = data_length_ - seek_position_;
    539   if (bytes > available) {
    540     // Read partial buffer
    541     bytes = available;
    542   }
    543   memcpy(buffer, &buffer_[seek_position_], bytes);
    544   seek_position_ += bytes;
    545   if (bytes_read) {
    546     *bytes_read = bytes;
    547   }
    548   return SR_SUCCESS;
    549 }
    550 
    551 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
    552                                      size_t* bytes_written, int* error) {
    553   size_t available = buffer_length_ - seek_position_;
    554   if (0 == available) {
    555     // Increase buffer size to the larger of:
    556     // a) new position rounded up to next 256 bytes
    557     // b) double the previous length
    558     size_t new_buffer_length =
    559         std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2);
    560     StreamResult result = DoReserve(new_buffer_length, error);
    561     if (SR_SUCCESS != result) {
    562       return result;
    563     }
    564     ASSERT(buffer_length_ >= new_buffer_length);
    565     available = buffer_length_ - seek_position_;
    566   }
    567 
    568   if (bytes > available) {
    569     bytes = available;
    570   }
    571   memcpy(&buffer_[seek_position_], buffer, bytes);
    572   seek_position_ += bytes;
    573   if (data_length_ < seek_position_) {
    574     data_length_ = seek_position_;
    575   }
    576   if (bytes_written) {
    577     *bytes_written = bytes;
    578   }
    579   return SR_SUCCESS;
    580 }
    581 
    582 void MemoryStreamBase::Close() {
    583   // nothing to do
    584 }
    585 
    586 bool MemoryStreamBase::SetPosition(size_t position) {
    587   if (position > data_length_)
    588     return false;
    589   seek_position_ = position;
    590   return true;
    591 }
    592 
    593 bool MemoryStreamBase::GetPosition(size_t* position) const {
    594   if (position)
    595     *position = seek_position_;
    596   return true;
    597 }
    598 
    599 bool MemoryStreamBase::GetSize(size_t* size) const {
    600   if (size)
    601     *size = data_length_;
    602   return true;
    603 }
    604 
    605 bool MemoryStreamBase::GetAvailable(size_t* size) const {
    606   if (size)
    607     *size = data_length_ - seek_position_;
    608   return true;
    609 }
    610 
    611 bool MemoryStreamBase::ReserveSize(size_t size) {
    612   return (SR_SUCCESS == DoReserve(size, NULL));
    613 }
    614 
    615 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
    616   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
    617 }
    618 
    619 ///////////////////////////////////////////////////////////////////////////////
    620 
    621 MemoryStream::MemoryStream()
    622   : buffer_alloc_(NULL) {
    623 }
    624 
    625 MemoryStream::MemoryStream(const char* data)
    626   : buffer_alloc_(NULL) {
    627   SetData(data, strlen(data));
    628 }
    629 
    630 MemoryStream::MemoryStream(const void* data, size_t length)
    631   : buffer_alloc_(NULL) {
    632   SetData(data, length);
    633 }
    634 
    635 MemoryStream::~MemoryStream() {
    636   delete [] buffer_alloc_;
    637 }
    638 
    639 void MemoryStream::SetData(const void* data, size_t length) {
    640   data_length_ = buffer_length_ = length;
    641   delete [] buffer_alloc_;
    642   buffer_alloc_ = new char[buffer_length_ + kAlignment];
    643   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
    644   memcpy(buffer_, data, data_length_);
    645   seek_position_ = 0;
    646 }
    647 
    648 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
    649   if (buffer_length_ >= size)
    650     return SR_SUCCESS;
    651 
    652   if (char* new_buffer_alloc = new char[size + kAlignment]) {
    653     char* new_buffer = reinterpret_cast<char*>(
    654         ALIGNP(new_buffer_alloc, kAlignment));
    655     memcpy(new_buffer, buffer_, data_length_);
    656     delete [] buffer_alloc_;
    657     buffer_alloc_ = new_buffer_alloc;
    658     buffer_ = new_buffer;
    659     buffer_length_ = size;
    660     return SR_SUCCESS;
    661   }
    662 
    663   if (error) {
    664     *error = ENOMEM;
    665   }
    666   return SR_ERROR;
    667 }
    668 
    669 ///////////////////////////////////////////////////////////////////////////////
    670 
    671 ExternalMemoryStream::ExternalMemoryStream() {
    672 }
    673 
    674 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
    675   SetData(data, length);
    676 }
    677 
    678 ExternalMemoryStream::~ExternalMemoryStream() {
    679 }
    680 
    681 void ExternalMemoryStream::SetData(void* data, size_t length) {
    682   data_length_ = buffer_length_ = length;
    683   buffer_ = static_cast<char*>(data);
    684   seek_position_ = 0;
    685 }
    686 
    687 ///////////////////////////////////////////////////////////////////////////////
    688 // FifoBuffer
    689 ///////////////////////////////////////////////////////////////////////////////
    690 
    691 FifoBuffer::FifoBuffer(size_t size)
    692     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    693       data_length_(0), read_position_(0), owner_(Thread::Current()) {
    694   // all events are done on the owner_ thread
    695 }
    696 
    697 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
    698     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    699       data_length_(0), read_position_(0), owner_(owner) {
    700   // all events are done on the owner_ thread
    701 }
    702 
    703 FifoBuffer::~FifoBuffer() {
    704 }
    705 
    706 bool FifoBuffer::GetBuffered(size_t* size) const {
    707   CritScope cs(&crit_);
    708   *size = data_length_;
    709   return true;
    710 }
    711 
    712 bool FifoBuffer::SetCapacity(size_t size) {
    713   CritScope cs(&crit_);
    714   if (data_length_ > size) {
    715     return false;
    716   }
    717 
    718   if (size != buffer_length_) {
    719     char* buffer = new char[size];
    720     const size_t copy = data_length_;
    721     const size_t tail_copy = std::min(copy, buffer_length_ - read_position_);
    722     memcpy(buffer, &buffer_[read_position_], tail_copy);
    723     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
    724     buffer_.reset(buffer);
    725     read_position_ = 0;
    726     buffer_length_ = size;
    727   }
    728   return true;
    729 }
    730 
    731 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
    732                                     size_t offset, size_t* bytes_read) {
    733   CritScope cs(&crit_);
    734   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
    735 }
    736 
    737 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
    738                                      size_t offset, size_t* bytes_written) {
    739   CritScope cs(&crit_);
    740   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
    741 }
    742 
    743 StreamState FifoBuffer::GetState() const {
    744   return state_;
    745 }
    746 
    747 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
    748                               size_t* bytes_read, int* error) {
    749   CritScope cs(&crit_);
    750   const bool was_writable = data_length_ < buffer_length_;
    751   size_t copy = 0;
    752   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
    753 
    754   if (result == SR_SUCCESS) {
    755     // If read was successful then adjust the read position and number of
    756     // bytes buffered.
    757     read_position_ = (read_position_ + copy) % buffer_length_;
    758     data_length_ -= copy;
    759     if (bytes_read) {
    760       *bytes_read = copy;
    761     }
    762 
    763     // if we were full before, and now we're not, post an event
    764     if (!was_writable && copy > 0) {
    765       PostEvent(owner_, SE_WRITE, 0);
    766     }
    767   }
    768   return result;
    769 }
    770 
    771 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
    772                                size_t* bytes_written, int* error) {
    773   CritScope cs(&crit_);
    774 
    775   const bool was_readable = (data_length_ > 0);
    776   size_t copy = 0;
    777   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
    778 
    779   if (result == SR_SUCCESS) {
    780     // If write was successful then adjust the number of readable bytes.
    781     data_length_ += copy;
    782     if (bytes_written) {
    783       *bytes_written = copy;
    784     }
    785 
    786     // if we didn't have any data to read before, and now we do, post an event
    787     if (!was_readable && copy > 0) {
    788       PostEvent(owner_, SE_READ, 0);
    789     }
    790   }
    791   return result;
    792 }
    793 
    794 void FifoBuffer::Close() {
    795   CritScope cs(&crit_);
    796   state_ = SS_CLOSED;
    797 }
    798 
    799 const void* FifoBuffer::GetReadData(size_t* size) {
    800   CritScope cs(&crit_);
    801   *size = (read_position_ + data_length_ <= buffer_length_) ?
    802       data_length_ : buffer_length_ - read_position_;
    803   return &buffer_[read_position_];
    804 }
    805 
    806 void FifoBuffer::ConsumeReadData(size_t size) {
    807   CritScope cs(&crit_);
    808   ASSERT(size <= data_length_);
    809   const bool was_writable = data_length_ < buffer_length_;
    810   read_position_ = (read_position_ + size) % buffer_length_;
    811   data_length_ -= size;
    812   if (!was_writable && size > 0) {
    813     PostEvent(owner_, SE_WRITE, 0);
    814   }
    815 }
    816 
    817 void* FifoBuffer::GetWriteBuffer(size_t* size) {
    818   CritScope cs(&crit_);
    819   if (state_ == SS_CLOSED) {
    820     return NULL;
    821   }
    822 
    823   // if empty, reset the write position to the beginning, so we can get
    824   // the biggest possible block
    825   if (data_length_ == 0) {
    826     read_position_ = 0;
    827   }
    828 
    829   const size_t write_position = (read_position_ + data_length_)
    830       % buffer_length_;
    831   *size = (write_position > read_position_ || data_length_ == 0) ?
    832       buffer_length_ - write_position : read_position_ - write_position;
    833   return &buffer_[write_position];
    834 }
    835 
    836 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
    837   CritScope cs(&crit_);
    838   ASSERT(size <= buffer_length_ - data_length_);
    839   const bool was_readable = (data_length_ > 0);
    840   data_length_ += size;
    841   if (!was_readable && size > 0) {
    842     PostEvent(owner_, SE_READ, 0);
    843   }
    844 }
    845 
    846 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
    847   CritScope cs(&crit_);
    848   *size = buffer_length_ - data_length_;
    849   return true;
    850 }
    851 
    852 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
    853                                           size_t bytes,
    854                                           size_t offset,
    855                                           size_t* bytes_read) {
    856   if (offset >= data_length_) {
    857     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
    858   }
    859 
    860   const size_t available = data_length_ - offset;
    861   const size_t read_position = (read_position_ + offset) % buffer_length_;
    862   const size_t copy = std::min(bytes, available);
    863   const size_t tail_copy = std::min(copy, buffer_length_ - read_position);
    864   char* const p = static_cast<char*>(buffer);
    865   memcpy(p, &buffer_[read_position], tail_copy);
    866   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
    867 
    868   if (bytes_read) {
    869     *bytes_read = copy;
    870   }
    871   return SR_SUCCESS;
    872 }
    873 
    874 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
    875                                            size_t bytes,
    876                                            size_t offset,
    877                                            size_t* bytes_written) {
    878   if (state_ == SS_CLOSED) {
    879     return SR_EOS;
    880   }
    881 
    882   if (data_length_ + offset >= buffer_length_) {
    883     return SR_BLOCK;
    884   }
    885 
    886   const size_t available = buffer_length_ - data_length_ - offset;
    887   const size_t write_position = (read_position_ + data_length_ + offset)
    888       % buffer_length_;
    889   const size_t copy = std::min(bytes, available);
    890   const size_t tail_copy = std::min(copy, buffer_length_ - write_position);
    891   const char* const p = static_cast<const char*>(buffer);
    892   memcpy(&buffer_[write_position], p, tail_copy);
    893   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
    894 
    895   if (bytes_written) {
    896     *bytes_written = copy;
    897   }
    898   return SR_SUCCESS;
    899 }
    900 
    901 
    902 
    903 ///////////////////////////////////////////////////////////////////////////////
    904 // LoggingAdapter
    905 ///////////////////////////////////////////////////////////////////////////////
    906 
    907 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
    908                                const std::string& label, bool hex_mode)
    909     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
    910   set_label(label);
    911 }
    912 
    913 void LoggingAdapter::set_label(const std::string& label) {
    914   label_.assign("[");
    915   label_.append(label);
    916   label_.append("]");
    917 }
    918 
    919 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
    920                                   size_t* read, int* error) {
    921   size_t local_read; if (!read) read = &local_read;
    922   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
    923                                                      error);
    924   if (result == SR_SUCCESS) {
    925     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
    926   }
    927   return result;
    928 }
    929 
    930 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
    931                                    size_t* written, int* error) {
    932   size_t local_written;
    933   if (!written) written = &local_written;
    934   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
    935                                                       error);
    936   if (result == SR_SUCCESS) {
    937     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
    938                  &lms_);
    939   }
    940   return result;
    941 }
    942 
    943 void LoggingAdapter::Close() {
    944   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
    945   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
    946   LOG_V(level_) << label_ << " Closed locally";
    947   StreamAdapterInterface::Close();
    948 }
    949 
    950 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
    951   if (events & SE_OPEN) {
    952     LOG_V(level_) << label_ << " Open";
    953   } else if (events & SE_CLOSE) {
    954     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
    955     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
    956     LOG_V(level_) << label_ << " Closed with error: " << err;
    957   }
    958   StreamAdapterInterface::OnEvent(stream, events, err);
    959 }
    960 
    961 ///////////////////////////////////////////////////////////////////////////////
    962 // StringStream - Reads/Writes to an external std::string
    963 ///////////////////////////////////////////////////////////////////////////////
    964 
    965 StringStream::StringStream(std::string* str)
    966     : str_(*str), read_pos_(0), read_only_(false) {
    967 }
    968 
    969 StringStream::StringStream(const std::string& str)
    970     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
    971 }
    972 
    973 StreamState StringStream::GetState() const {
    974   return SS_OPEN;
    975 }
    976 
    977 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
    978                                       size_t* read, int* error) {
    979   size_t available = std::min(buffer_len, str_.size() - read_pos_);
    980   if (!available)
    981     return SR_EOS;
    982   memcpy(buffer, str_.data() + read_pos_, available);
    983   read_pos_ += available;
    984   if (read)
    985     *read = available;
    986   return SR_SUCCESS;
    987 }
    988 
    989 StreamResult StringStream::Write(const void* data, size_t data_len,
    990                                       size_t* written, int* error) {
    991   if (read_only_) {
    992     if (error) {
    993       *error = -1;
    994     }
    995     return SR_ERROR;
    996   }
    997   str_.append(static_cast<const char*>(data),
    998               static_cast<const char*>(data) + data_len);
    999   if (written)
   1000     *written = data_len;
   1001   return SR_SUCCESS;
   1002 }
   1003 
   1004 void StringStream::Close() {
   1005 }
   1006 
   1007 bool StringStream::SetPosition(size_t position) {
   1008   if (position > str_.size())
   1009     return false;
   1010   read_pos_ = position;
   1011   return true;
   1012 }
   1013 
   1014 bool StringStream::GetPosition(size_t* position) const {
   1015   if (position)
   1016     *position = read_pos_;
   1017   return true;
   1018 }
   1019 
   1020 bool StringStream::GetSize(size_t* size) const {
   1021   if (size)
   1022     *size = str_.size();
   1023   return true;
   1024 }
   1025 
   1026 bool StringStream::GetAvailable(size_t* size) const {
   1027   if (size)
   1028     *size = str_.size() - read_pos_;
   1029   return true;
   1030 }
   1031 
   1032 bool StringStream::ReserveSize(size_t size) {
   1033   if (read_only_)
   1034     return false;
   1035   str_.reserve(size);
   1036   return true;
   1037 }
   1038 
   1039 ///////////////////////////////////////////////////////////////////////////////
   1040 // StreamReference
   1041 ///////////////////////////////////////////////////////////////////////////////
   1042 
   1043 StreamReference::StreamReference(StreamInterface* stream)
   1044     : StreamAdapterInterface(stream, false) {
   1045   // owner set to false so the destructor does not free the stream.
   1046   stream_ref_count_ = new StreamRefCount(stream);
   1047 }
   1048 
   1049 StreamInterface* StreamReference::NewReference() {
   1050   stream_ref_count_->AddReference();
   1051   return new StreamReference(stream_ref_count_, stream());
   1052 }
   1053 
   1054 StreamReference::~StreamReference() {
   1055   stream_ref_count_->Release();
   1056 }
   1057 
   1058 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
   1059                                  StreamInterface* stream)
   1060     : StreamAdapterInterface(stream, false),
   1061       stream_ref_count_(stream_ref_count) {
   1062 }
   1063 
   1064 ///////////////////////////////////////////////////////////////////////////////
   1065 
   1066 StreamResult Flow(StreamInterface* source,
   1067                   char* buffer, size_t buffer_len,
   1068                   StreamInterface* sink,
   1069                   size_t* data_len /* = NULL */) {
   1070   ASSERT(buffer_len > 0);
   1071 
   1072   StreamResult result;
   1073   size_t count, read_pos, write_pos;
   1074   if (data_len) {
   1075     read_pos = *data_len;
   1076   } else {
   1077     read_pos = 0;
   1078   }
   1079 
   1080   bool end_of_stream = false;
   1081   do {
   1082     // Read until buffer is full, end of stream, or error
   1083     while (!end_of_stream && (read_pos < buffer_len)) {
   1084       result = source->Read(buffer + read_pos, buffer_len - read_pos,
   1085                             &count, NULL);
   1086       if (result == SR_EOS) {
   1087         end_of_stream = true;
   1088       } else if (result != SR_SUCCESS) {
   1089         if (data_len) {
   1090           *data_len = read_pos;
   1091         }
   1092         return result;
   1093       } else {
   1094         read_pos += count;
   1095       }
   1096     }
   1097 
   1098     // Write until buffer is empty, or error (including end of stream)
   1099     write_pos = 0;
   1100     while (write_pos < read_pos) {
   1101       result = sink->Write(buffer + write_pos, read_pos - write_pos,
   1102                            &count, NULL);
   1103       if (result != SR_SUCCESS) {
   1104         if (data_len) {
   1105           *data_len = read_pos - write_pos;
   1106           if (write_pos > 0) {
   1107             memmove(buffer, buffer + write_pos, *data_len);
   1108           }
   1109         }
   1110         return result;
   1111       }
   1112       write_pos += count;
   1113     }
   1114 
   1115     read_pos = 0;
   1116   } while (!end_of_stream);
   1117 
   1118   if (data_len) {
   1119     *data_len = 0;
   1120   }
   1121   return SR_SUCCESS;
   1122 }
   1123 
   1124 ///////////////////////////////////////////////////////////////////////////////
   1125 
   1126 }  // namespace rtc
   1127