Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004 Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #if defined(POSIX)
     29 #include <sys/file.h>
     30 #endif  // POSIX
     31 #include <sys/types.h>
     32 #include <sys/stat.h>
     33 #include <errno.h>
     34 #include <string>
     35 #include "talk/base/basictypes.h"
     36 #include "talk/base/common.h"
     37 #include "talk/base/logging.h"
     38 #include "talk/base/messagequeue.h"
     39 #include "talk/base/stream.h"
     40 #include "talk/base/stringencode.h"
     41 #include "talk/base/stringutils.h"
     42 #include "talk/base/thread.h"
     43 #include "talk/base/timeutils.h"
     44 
     45 #ifdef WIN32
     46 #include "talk/base/win32.h"
     47 #define fileno _fileno
     48 #endif
     49 
     50 namespace talk_base {
     51 
     52 ///////////////////////////////////////////////////////////////////////////////
     53 // StreamInterface
     54 ///////////////////////////////////////////////////////////////////////////////
     55 StreamInterface::~StreamInterface() {
     56 }
     57 
     58 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
     59                                        size_t* written, int* error) {
     60   StreamResult result = SR_SUCCESS;
     61   size_t total_written = 0, current_written;
     62   while (total_written < data_len) {
     63     result = Write(static_cast<const char*>(data) + total_written,
     64                    data_len - total_written, &current_written, error);
     65     if (result != SR_SUCCESS)
     66       break;
     67     total_written += current_written;
     68   }
     69   if (written)
     70     *written = total_written;
     71   return result;
     72 }
     73 
     74 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
     75                                       size_t* read, int* error) {
     76   StreamResult result = SR_SUCCESS;
     77   size_t total_read = 0, current_read;
     78   while (total_read < buffer_len) {
     79     result = Read(static_cast<char*>(buffer) + total_read,
     80                   buffer_len - total_read, &current_read, error);
     81     if (result != SR_SUCCESS)
     82       break;
     83     total_read += current_read;
     84   }
     85   if (read)
     86     *read = total_read;
     87   return result;
     88 }
     89 
     90 StreamResult StreamInterface::ReadLine(std::string* line) {
     91   line->clear();
     92   StreamResult result = SR_SUCCESS;
     93   while (true) {
     94     char ch;
     95     result = Read(&ch, sizeof(ch), NULL, NULL);
     96     if (result != SR_SUCCESS) {
     97       break;
     98     }
     99     if (ch == '\n') {
    100       break;
    101     }
    102     line->push_back(ch);
    103   }
    104   if (!line->empty()) {   // give back the line we've collected so far with
    105     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
    106   }
    107   return result;
    108 }
    109 
    110 void StreamInterface::PostEvent(Thread* t, int events, int err) {
    111   t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err));
    112 }
    113 
    114 void StreamInterface::PostEvent(int events, int err) {
    115   PostEvent(Thread::Current(), events, err);
    116 }
    117 
    118 StreamInterface::StreamInterface() {
    119 }
    120 
    121 void StreamInterface::OnMessage(Message* msg) {
    122   if (MSG_POST_EVENT == msg->message_id) {
    123     StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata);
    124     SignalEvent(this, pe->events, pe->error);
    125     delete msg->pdata;
    126   }
    127 }
    128 
    129 ///////////////////////////////////////////////////////////////////////////////
    130 // StreamAdapterInterface
    131 ///////////////////////////////////////////////////////////////////////////////
    132 
    133 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
    134                                                bool owned)
    135     : stream_(stream), owned_(owned) {
    136   if (NULL != stream_)
    137     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    138 }
    139 
    140 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
    141   if (NULL != stream_)
    142     stream_->SignalEvent.disconnect(this);
    143   if (owned_)
    144     delete stream_;
    145   stream_ = stream;
    146   owned_ = owned;
    147   if (NULL != stream_)
    148     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    149 }
    150 
    151 StreamInterface* StreamAdapterInterface::Detach() {
    152   if (NULL != stream_)
    153     stream_->SignalEvent.disconnect(this);
    154   StreamInterface* stream = stream_;
    155   stream_ = NULL;
    156   return stream;
    157 }
    158 
    159 StreamAdapterInterface::~StreamAdapterInterface() {
    160   if (owned_)
    161     delete stream_;
    162 }
    163 
    164 ///////////////////////////////////////////////////////////////////////////////
    165 // StreamTap
    166 ///////////////////////////////////////////////////////////////////////////////
    167 
    168 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
    169     : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
    170         tap_error_(0) {
    171   AttachTap(tap);
    172 }
    173 
    174 void StreamTap::AttachTap(StreamInterface* tap) {
    175   tap_.reset(tap);
    176 }
    177 
    178 StreamInterface* StreamTap::DetachTap() {
    179   return tap_.release();
    180 }
    181 
    182 StreamResult StreamTap::GetTapResult(int* error) {
    183   if (error) {
    184     *error = tap_error_;
    185   }
    186   return tap_result_;
    187 }
    188 
    189 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
    190                              size_t* read, int* error) {
    191   size_t backup_read;
    192   if (!read) {
    193     read = &backup_read;
    194   }
    195   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
    196                                                   read, error);
    197   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    198     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
    199   }
    200   return res;
    201 }
    202 
    203 StreamResult StreamTap::Write(const void* data, size_t data_len,
    204                               size_t* written, int* error) {
    205   size_t backup_written;
    206   if (!written) {
    207     written = &backup_written;
    208   }
    209   StreamResult res = StreamAdapterInterface::Write(data, data_len,
    210                                                    written, error);
    211   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    212     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
    213   }
    214   return res;
    215 }
    216 
    217 ///////////////////////////////////////////////////////////////////////////////
    218 // StreamSegment
    219 ///////////////////////////////////////////////////////////////////////////////
    220 
    221 StreamSegment::StreamSegment(StreamInterface* stream)
    222     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    223     length_(SIZE_UNKNOWN) {
    224   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    225   stream->GetPosition(&start_);
    226 }
    227 
    228 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
    229     : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    230     length_(length) {
    231   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    232   stream->GetPosition(&start_);
    233 }
    234 
    235 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
    236                                  size_t* read, int* error) {
    237   if (SIZE_UNKNOWN != length_) {
    238     if (pos_ >= length_)
    239       return SR_EOS;
    240     buffer_len = _min(buffer_len, length_ - pos_);
    241   }
    242   size_t backup_read;
    243   if (!read) {
    244     read = &backup_read;
    245   }
    246   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
    247                                                      read, error);
    248   if (SR_SUCCESS == result) {
    249     pos_ += *read;
    250   }
    251   return result;
    252 }
    253 
    254 bool StreamSegment::SetPosition(size_t position) {
    255   if (SIZE_UNKNOWN == start_)
    256     return false;  // Not seekable
    257   if ((SIZE_UNKNOWN != length_) && (position > length_))
    258     return false;  // Seek past end of segment
    259   if (!StreamAdapterInterface::SetPosition(start_ + position))
    260     return false;
    261   pos_ = position;
    262   return true;
    263 }
    264 
    265 bool StreamSegment::GetPosition(size_t* position) const {
    266   if (SIZE_UNKNOWN == start_)
    267     return false;  // Not seekable
    268   if (!StreamAdapterInterface::GetPosition(position))
    269     return false;
    270   if (position) {
    271     ASSERT(*position >= start_);
    272     *position -= start_;
    273   }
    274   return true;
    275 }
    276 
    277 bool StreamSegment::GetSize(size_t* size) const {
    278   if (!StreamAdapterInterface::GetSize(size))
    279     return false;
    280   if (size) {
    281     if (SIZE_UNKNOWN != start_) {
    282       ASSERT(*size >= start_);
    283       *size -= start_;
    284     }
    285     if (SIZE_UNKNOWN != length_) {
    286       *size = _min(*size, length_);
    287     }
    288   }
    289   return true;
    290 }
    291 
    292 bool StreamSegment::GetAvailable(size_t* size) const {
    293   if (!StreamAdapterInterface::GetAvailable(size))
    294     return false;
    295   if (size && (SIZE_UNKNOWN != length_))
    296     *size = _min(*size, length_ - pos_);
    297   return true;
    298 }
    299 
    300 ///////////////////////////////////////////////////////////////////////////////
    301 // NullStream
    302 ///////////////////////////////////////////////////////////////////////////////
    303 
    304 NullStream::NullStream() {
    305 }
    306 
    307 NullStream::~NullStream() {
    308 }
    309 
    310 StreamState NullStream::GetState() const {
    311   return SS_OPEN;
    312 }
    313 
    314 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
    315                               size_t* read, int* error) {
    316   if (error) *error = -1;
    317   return SR_ERROR;
    318 }
    319 
    320 StreamResult NullStream::Write(const void* data, size_t data_len,
    321                                size_t* written, int* error) {
    322   if (written) *written = data_len;
    323   return SR_SUCCESS;
    324 }
    325 
    326 void NullStream::Close() {
    327 }
    328 
    329 ///////////////////////////////////////////////////////////////////////////////
    330 // FileStream
    331 ///////////////////////////////////////////////////////////////////////////////
    332 
    333 FileStream::FileStream() : file_(NULL) {
    334 }
    335 
    336 FileStream::~FileStream() {
    337   FileStream::Close();
    338 }
    339 
    340 bool FileStream::Open(const std::string& filename, const char* mode,
    341                       int* error) {
    342   Close();
    343 #ifdef WIN32
    344   std::wstring wfilename;
    345   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    346     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
    347   } else {
    348     if (error) {
    349       *error = -1;
    350       return false;
    351     }
    352   }
    353 #else
    354   file_ = fopen(filename.c_str(), mode);
    355 #endif
    356   if (!file_ && error) {
    357     *error = errno;
    358   }
    359   return (file_ != NULL);
    360 }
    361 
    362 bool FileStream::OpenShare(const std::string& filename, const char* mode,
    363                            int shflag, int* error) {
    364   Close();
    365 #ifdef WIN32
    366   std::wstring wfilename;
    367   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    368     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
    369     if (!file_ && error) {
    370       *error = errno;
    371       return false;
    372     }
    373     return file_ != NULL;
    374   } else {
    375     if (error) {
    376       *error = -1;
    377     }
    378     return false;
    379   }
    380 #else
    381   return Open(filename, mode, error);
    382 #endif
    383 }
    384 
    385 bool FileStream::DisableBuffering() {
    386   if (!file_)
    387     return false;
    388   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
    389 }
    390 
    391 StreamState FileStream::GetState() const {
    392   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
    393 }
    394 
    395 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
    396                               size_t* read, int* error) {
    397   if (!file_)
    398     return SR_EOS;
    399   size_t result = fread(buffer, 1, buffer_len, file_);
    400   if ((result == 0) && (buffer_len > 0)) {
    401     if (feof(file_))
    402       return SR_EOS;
    403     if (error)
    404       *error = errno;
    405     return SR_ERROR;
    406   }
    407   if (read)
    408     *read = result;
    409   return SR_SUCCESS;
    410 }
    411 
    412 StreamResult FileStream::Write(const void* data, size_t data_len,
    413                                size_t* written, int* error) {
    414   if (!file_)
    415     return SR_EOS;
    416   size_t result = fwrite(data, 1, data_len, file_);
    417   if ((result == 0) && (data_len > 0)) {
    418     if (error)
    419       *error = errno;
    420     return SR_ERROR;
    421   }
    422   if (written)
    423     *written = result;
    424   return SR_SUCCESS;
    425 }
    426 
    427 void FileStream::Close() {
    428   if (file_) {
    429     DoClose();
    430     file_ = NULL;
    431   }
    432 }
    433 
    434 bool FileStream::SetPosition(size_t position) {
    435   if (!file_)
    436     return false;
    437   return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0);
    438 }
    439 
    440 bool FileStream::GetPosition(size_t* position) const {
    441   ASSERT(NULL != position);
    442   if (!file_)
    443     return false;
    444   long result = ftell(file_);
    445   if (result < 0)
    446     return false;
    447   if (position)
    448     *position = result;
    449   return true;
    450 }
    451 
    452 bool FileStream::GetSize(size_t* size) const {
    453   ASSERT(NULL != size);
    454   if (!file_)
    455     return false;
    456   struct stat file_stats;
    457   if (fstat(fileno(file_), &file_stats) != 0)
    458     return false;
    459   if (size)
    460     *size = file_stats.st_size;
    461   return true;
    462 }
    463 
    464 bool FileStream::GetAvailable(size_t* size) const {
    465   ASSERT(NULL != size);
    466   if (!GetSize(size))
    467     return false;
    468   long result = ftell(file_);
    469   if (result < 0)
    470     return false;
    471   if (size)
    472     *size -= result;
    473   return true;
    474 }
    475 
    476 bool FileStream::ReserveSize(size_t size) {
    477   // TODO: extend the file to the proper length
    478   return true;
    479 }
    480 
    481 bool FileStream::GetSize(const std::string& filename, size_t* size) {
    482   struct stat file_stats;
    483   if (stat(filename.c_str(), &file_stats) != 0)
    484     return false;
    485   *size = file_stats.st_size;
    486   return true;
    487 }
    488 
    489 bool FileStream::Flush() {
    490   if (file_) {
    491     return (0 == fflush(file_));
    492   }
    493   // try to flush empty file?
    494   ASSERT(false);
    495   return false;
    496 }
    497 
    498 #if defined(POSIX)
    499 
    500 bool FileStream::TryLock() {
    501   if (file_ == NULL) {
    502     // Stream not open.
    503     ASSERT(false);
    504     return false;
    505   }
    506 
    507   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
    508 }
    509 
    510 bool FileStream::Unlock() {
    511   if (file_ == NULL) {
    512     // Stream not open.
    513     ASSERT(false);
    514     return false;
    515   }
    516 
    517   return flock(fileno(file_), LOCK_UN) == 0;
    518 }
    519 
    520 #endif
    521 
    522 void FileStream::DoClose() {
    523   fclose(file_);
    524 }
    525 
    526 AsyncWriteStream::~AsyncWriteStream() {
    527   write_thread_->Clear(this, 0, NULL);
    528   ClearBufferAndWrite();
    529 
    530   CritScope cs(&crit_stream_);
    531   stream_.reset();
    532 }
    533 
    534 // This is needed by some stream writers, such as RtpDumpWriter.
    535 bool AsyncWriteStream::GetPosition(size_t* position) const {
    536   CritScope cs(&crit_stream_);
    537   return stream_->GetPosition(position);
    538 }
    539 
    540 // This is needed by some stream writers, such as the plugin log writers.
    541 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len,
    542                                     size_t* read, int* error) {
    543   CritScope cs(&crit_stream_);
    544   return stream_->Read(buffer, buffer_len, read, error);
    545 }
    546 
    547 void AsyncWriteStream::Close() {
    548   if (state_ == SS_CLOSED) {
    549     return;
    550   }
    551 
    552   write_thread_->Clear(this, 0, NULL);
    553   ClearBufferAndWrite();
    554 
    555   CritScope cs(&crit_stream_);
    556   stream_->Close();
    557   state_ = SS_CLOSED;
    558 }
    559 
    560 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len,
    561                                      size_t* written, int* error) {
    562   if (state_ == SS_CLOSED) {
    563     return SR_ERROR;
    564   }
    565 
    566   size_t previous_buffer_length = 0;
    567   {
    568     CritScope cs(&crit_buffer_);
    569     previous_buffer_length = buffer_.length();
    570     buffer_.AppendData(data, data_len);
    571   }
    572 
    573   if (previous_buffer_length == 0) {
    574     // If there's stuff already in the buffer, then we already called
    575     // Post and the write_thread_ hasn't pulled it out yet, so we
    576     // don't need to re-Post.
    577     write_thread_->Post(this, 0, NULL);
    578   }
    579   // Return immediately, assuming that it works.
    580   if (written) {
    581     *written = data_len;
    582   }
    583   return SR_SUCCESS;
    584 }
    585 
    586 void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) {
    587   ClearBufferAndWrite();
    588 }
    589 
    590 bool AsyncWriteStream::Flush() {
    591   if (state_ == SS_CLOSED) {
    592     return false;
    593   }
    594 
    595   ClearBufferAndWrite();
    596 
    597   CritScope cs(&crit_stream_);
    598   return stream_->Flush();
    599 }
    600 
    601 void AsyncWriteStream::ClearBufferAndWrite() {
    602   Buffer to_write;
    603   {
    604     CritScope cs_buffer(&crit_buffer_);
    605     buffer_.TransferTo(&to_write);
    606   }
    607 
    608   if (to_write.length() > 0) {
    609     CritScope cs(&crit_stream_);
    610     stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL);
    611   }
    612 }
    613 
    614 #ifdef POSIX
    615 
    616 // Have to identically rewrite the FileStream destructor or else it would call
    617 // the base class's Close() instead of the sub-class's.
    618 POpenStream::~POpenStream() {
    619   POpenStream::Close();
    620 }
    621 
    622 bool POpenStream::Open(const std::string& subcommand,
    623                        const char* mode,
    624                        int* error) {
    625   Close();
    626   file_ = popen(subcommand.c_str(), mode);
    627   if (file_ == NULL) {
    628     if (error)
    629       *error = errno;
    630     return false;
    631   }
    632   return true;
    633 }
    634 
    635 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
    636                             int shflag, int* error) {
    637   return Open(subcommand, mode, error);
    638 }
    639 
    640 void POpenStream::DoClose() {
    641   wait_status_ = pclose(file_);
    642 }
    643 
    644 #endif
    645 
    646 ///////////////////////////////////////////////////////////////////////////////
    647 // MemoryStream
    648 ///////////////////////////////////////////////////////////////////////////////
    649 
    650 MemoryStreamBase::MemoryStreamBase()
    651   : buffer_(NULL), buffer_length_(0), data_length_(0),
    652     seek_position_(0) {
    653 }
    654 
    655 StreamState MemoryStreamBase::GetState() const {
    656   return SS_OPEN;
    657 }
    658 
    659 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
    660                                     size_t* bytes_read, int* error) {
    661   if (seek_position_ >= data_length_) {
    662     return SR_EOS;
    663   }
    664   size_t available = data_length_ - seek_position_;
    665   if (bytes > available) {
    666     // Read partial buffer
    667     bytes = available;
    668   }
    669   memcpy(buffer, &buffer_[seek_position_], bytes);
    670   seek_position_ += bytes;
    671   if (bytes_read) {
    672     *bytes_read = bytes;
    673   }
    674   return SR_SUCCESS;
    675 }
    676 
    677 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
    678                                      size_t* bytes_written, int* error) {
    679   size_t available = buffer_length_ - seek_position_;
    680   if (0 == available) {
    681     // Increase buffer size to the larger of:
    682     // a) new position rounded up to next 256 bytes
    683     // b) double the previous length
    684     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
    685                                     buffer_length_ * 2);
    686     StreamResult result = DoReserve(new_buffer_length, error);
    687     if (SR_SUCCESS != result) {
    688       return result;
    689     }
    690     ASSERT(buffer_length_ >= new_buffer_length);
    691     available = buffer_length_ - seek_position_;
    692   }
    693 
    694   if (bytes > available) {
    695     bytes = available;
    696   }
    697   memcpy(&buffer_[seek_position_], buffer, bytes);
    698   seek_position_ += bytes;
    699   if (data_length_ < seek_position_) {
    700     data_length_ = seek_position_;
    701   }
    702   if (bytes_written) {
    703     *bytes_written = bytes;
    704   }
    705   return SR_SUCCESS;
    706 }
    707 
    708 void MemoryStreamBase::Close() {
    709   // nothing to do
    710 }
    711 
    712 bool MemoryStreamBase::SetPosition(size_t position) {
    713   if (position > data_length_)
    714     return false;
    715   seek_position_ = position;
    716   return true;
    717 }
    718 
    719 bool MemoryStreamBase::GetPosition(size_t* position) const {
    720   if (position)
    721     *position = seek_position_;
    722   return true;
    723 }
    724 
    725 bool MemoryStreamBase::GetSize(size_t* size) const {
    726   if (size)
    727     *size = data_length_;
    728   return true;
    729 }
    730 
    731 bool MemoryStreamBase::GetAvailable(size_t* size) const {
    732   if (size)
    733     *size = data_length_ - seek_position_;
    734   return true;
    735 }
    736 
    737 bool MemoryStreamBase::ReserveSize(size_t size) {
    738   return (SR_SUCCESS == DoReserve(size, NULL));
    739 }
    740 
    741 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
    742   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
    743 }
    744 
    745 ///////////////////////////////////////////////////////////////////////////////
    746 
    747 MemoryStream::MemoryStream()
    748   : buffer_alloc_(NULL) {
    749 }
    750 
    751 MemoryStream::MemoryStream(const char* data)
    752   : buffer_alloc_(NULL) {
    753   SetData(data, strlen(data));
    754 }
    755 
    756 MemoryStream::MemoryStream(const void* data, size_t length)
    757   : buffer_alloc_(NULL) {
    758   SetData(data, length);
    759 }
    760 
    761 MemoryStream::~MemoryStream() {
    762   delete [] buffer_alloc_;
    763 }
    764 
    765 void MemoryStream::SetData(const void* data, size_t length) {
    766   data_length_ = buffer_length_ = length;
    767   delete [] buffer_alloc_;
    768   buffer_alloc_ = new char[buffer_length_ + kAlignment];
    769   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
    770   memcpy(buffer_, data, data_length_);
    771   seek_position_ = 0;
    772 }
    773 
    774 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
    775   if (buffer_length_ >= size)
    776     return SR_SUCCESS;
    777 
    778   if (char* new_buffer_alloc = new char[size + kAlignment]) {
    779     char* new_buffer = reinterpret_cast<char*>(
    780         ALIGNP(new_buffer_alloc, kAlignment));
    781     memcpy(new_buffer, buffer_, data_length_);
    782     delete [] buffer_alloc_;
    783     buffer_alloc_ = new_buffer_alloc;
    784     buffer_ = new_buffer;
    785     buffer_length_ = size;
    786     return SR_SUCCESS;
    787   }
    788 
    789   if (error) {
    790     *error = ENOMEM;
    791   }
    792   return SR_ERROR;
    793 }
    794 
    795 ///////////////////////////////////////////////////////////////////////////////
    796 
    797 ExternalMemoryStream::ExternalMemoryStream() {
    798 }
    799 
    800 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
    801   SetData(data, length);
    802 }
    803 
    804 ExternalMemoryStream::~ExternalMemoryStream() {
    805 }
    806 
    807 void ExternalMemoryStream::SetData(void* data, size_t length) {
    808   data_length_ = buffer_length_ = length;
    809   buffer_ = static_cast<char*>(data);
    810   seek_position_ = 0;
    811 }
    812 
    813 ///////////////////////////////////////////////////////////////////////////////
    814 // FifoBuffer
    815 ///////////////////////////////////////////////////////////////////////////////
    816 
    817 FifoBuffer::FifoBuffer(size_t size)
    818     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    819       data_length_(0), read_position_(0), owner_(Thread::Current()) {
    820   // all events are done on the owner_ thread
    821 }
    822 
    823 FifoBuffer::FifoBuffer(size_t size, Thread* owner)
    824     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    825       data_length_(0), read_position_(0), owner_(owner) {
    826   // all events are done on the owner_ thread
    827 }
    828 
    829 FifoBuffer::~FifoBuffer() {
    830 }
    831 
    832 bool FifoBuffer::GetBuffered(size_t* size) const {
    833   CritScope cs(&crit_);
    834   *size = data_length_;
    835   return true;
    836 }
    837 
    838 bool FifoBuffer::SetCapacity(size_t size) {
    839   CritScope cs(&crit_);
    840   if (data_length_ > size) {
    841     return false;
    842   }
    843 
    844   if (size != buffer_length_) {
    845     char* buffer = new char[size];
    846     const size_t copy = data_length_;
    847     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
    848     memcpy(buffer, &buffer_[read_position_], tail_copy);
    849     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
    850     buffer_.reset(buffer);
    851     read_position_ = 0;
    852     buffer_length_ = size;
    853   }
    854   return true;
    855 }
    856 
    857 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes,
    858                                     size_t offset, size_t* bytes_read) {
    859   CritScope cs(&crit_);
    860   return ReadOffsetLocked(buffer, bytes, offset, bytes_read);
    861 }
    862 
    863 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes,
    864                                      size_t offset, size_t* bytes_written) {
    865   CritScope cs(&crit_);
    866   return WriteOffsetLocked(buffer, bytes, offset, bytes_written);
    867 }
    868 
    869 StreamState FifoBuffer::GetState() const {
    870   return state_;
    871 }
    872 
    873 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
    874                               size_t* bytes_read, int* error) {
    875   CritScope cs(&crit_);
    876   const bool was_writable = data_length_ < buffer_length_;
    877   size_t copy = 0;
    878   StreamResult result = ReadOffsetLocked(buffer, bytes, 0, &copy);
    879 
    880   if (result == SR_SUCCESS) {
    881     // If read was successful then adjust the read position and number of
    882     // bytes buffered.
    883     read_position_ = (read_position_ + copy) % buffer_length_;
    884     data_length_ -= copy;
    885     if (bytes_read) {
    886       *bytes_read = copy;
    887     }
    888 
    889     // if we were full before, and now we're not, post an event
    890     if (!was_writable && copy > 0) {
    891       PostEvent(owner_, SE_WRITE, 0);
    892     }
    893   }
    894   return result;
    895 }
    896 
    897 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
    898                                size_t* bytes_written, int* error) {
    899   CritScope cs(&crit_);
    900 
    901   const bool was_readable = (data_length_ > 0);
    902   size_t copy = 0;
    903   StreamResult result = WriteOffsetLocked(buffer, bytes, 0, &copy);
    904 
    905   if (result == SR_SUCCESS) {
    906     // If write was successful then adjust the number of readable bytes.
    907     data_length_ += copy;
    908     if (bytes_written) {
    909       *bytes_written = copy;
    910     }
    911 
    912     // if we didn't have any data to read before, and now we do, post an event
    913     if (!was_readable && copy > 0) {
    914       PostEvent(owner_, SE_READ, 0);
    915     }
    916   }
    917   return result;
    918 }
    919 
    920 void FifoBuffer::Close() {
    921   CritScope cs(&crit_);
    922   state_ = SS_CLOSED;
    923 }
    924 
    925 const void* FifoBuffer::GetReadData(size_t* size) {
    926   CritScope cs(&crit_);
    927   *size = (read_position_ + data_length_ <= buffer_length_) ?
    928       data_length_ : buffer_length_ - read_position_;
    929   return &buffer_[read_position_];
    930 }
    931 
    932 void FifoBuffer::ConsumeReadData(size_t size) {
    933   CritScope cs(&crit_);
    934   ASSERT(size <= data_length_);
    935   const bool was_writable = data_length_ < buffer_length_;
    936   read_position_ = (read_position_ + size) % buffer_length_;
    937   data_length_ -= size;
    938   if (!was_writable && size > 0) {
    939     PostEvent(owner_, SE_WRITE, 0);
    940   }
    941 }
    942 
    943 void* FifoBuffer::GetWriteBuffer(size_t* size) {
    944   CritScope cs(&crit_);
    945   if (state_ == SS_CLOSED) {
    946     return NULL;
    947   }
    948 
    949   // if empty, reset the write position to the beginning, so we can get
    950   // the biggest possible block
    951   if (data_length_ == 0) {
    952     read_position_ = 0;
    953   }
    954 
    955   const size_t write_position = (read_position_ + data_length_)
    956       % buffer_length_;
    957   *size = (write_position > read_position_ || data_length_ == 0) ?
    958       buffer_length_ - write_position : read_position_ - write_position;
    959   return &buffer_[write_position];
    960 }
    961 
    962 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
    963   CritScope cs(&crit_);
    964   ASSERT(size <= buffer_length_ - data_length_);
    965   const bool was_readable = (data_length_ > 0);
    966   data_length_ += size;
    967   if (!was_readable && size > 0) {
    968     PostEvent(owner_, SE_READ, 0);
    969   }
    970 }
    971 
    972 bool FifoBuffer::GetWriteRemaining(size_t* size) const {
    973   CritScope cs(&crit_);
    974   *size = buffer_length_ - data_length_;
    975   return true;
    976 }
    977 
    978 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer,
    979                                           size_t bytes,
    980                                           size_t offset,
    981                                           size_t* bytes_read) {
    982   if (offset >= data_length_) {
    983     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
    984   }
    985 
    986   const size_t available = data_length_ - offset;
    987   const size_t read_position = (read_position_ + offset) % buffer_length_;
    988   const size_t copy = _min(bytes, available);
    989   const size_t tail_copy = _min(copy, buffer_length_ - read_position);
    990   char* const p = static_cast<char*>(buffer);
    991   memcpy(p, &buffer_[read_position], tail_copy);
    992   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
    993 
    994   if (bytes_read) {
    995     *bytes_read = copy;
    996   }
    997   return SR_SUCCESS;
    998 }
    999 
   1000 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer,
   1001                                            size_t bytes,
   1002                                            size_t offset,
   1003                                            size_t* bytes_written) {
   1004   if (state_ == SS_CLOSED) {
   1005     return SR_EOS;
   1006   }
   1007 
   1008   if (data_length_ + offset >= buffer_length_) {
   1009     return SR_BLOCK;
   1010   }
   1011 
   1012   const size_t available = buffer_length_ - data_length_ - offset;
   1013   const size_t write_position = (read_position_ + data_length_ + offset)
   1014       % buffer_length_;
   1015   const size_t copy = _min(bytes, available);
   1016   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
   1017   const char* const p = static_cast<const char*>(buffer);
   1018   memcpy(&buffer_[write_position], p, tail_copy);
   1019   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
   1020 
   1021   if (bytes_written) {
   1022     *bytes_written = copy;
   1023   }
   1024   return SR_SUCCESS;
   1025 }
   1026 
   1027 
   1028 
   1029 ///////////////////////////////////////////////////////////////////////////////
   1030 // LoggingAdapter
   1031 ///////////////////////////////////////////////////////////////////////////////
   1032 
   1033 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
   1034                                const std::string& label, bool hex_mode)
   1035     : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) {
   1036   set_label(label);
   1037 }
   1038 
   1039 void LoggingAdapter::set_label(const std::string& label) {
   1040   label_.assign("[");
   1041   label_.append(label);
   1042   label_.append("]");
   1043 }
   1044 
   1045 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
   1046                                   size_t* read, int* error) {
   1047   size_t local_read; if (!read) read = &local_read;
   1048   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
   1049                                                      error);
   1050   if (result == SR_SUCCESS) {
   1051     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
   1052   }
   1053   return result;
   1054 }
   1055 
   1056 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
   1057                                    size_t* written, int* error) {
   1058   size_t local_written;
   1059   if (!written) written = &local_written;
   1060   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
   1061                                                       error);
   1062   if (result == SR_SUCCESS) {
   1063     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
   1064                  &lms_);
   1065   }
   1066   return result;
   1067 }
   1068 
   1069 void LoggingAdapter::Close() {
   1070   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
   1071   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
   1072   LOG_V(level_) << label_ << " Closed locally";
   1073   StreamAdapterInterface::Close();
   1074 }
   1075 
   1076 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
   1077   if (events & SE_OPEN) {
   1078     LOG_V(level_) << label_ << " Open";
   1079   } else if (events & SE_CLOSE) {
   1080     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
   1081     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
   1082     LOG_V(level_) << label_ << " Closed with error: " << err;
   1083   }
   1084   StreamAdapterInterface::OnEvent(stream, events, err);
   1085 }
   1086 
   1087 ///////////////////////////////////////////////////////////////////////////////
   1088 // StringStream - Reads/Writes to an external std::string
   1089 ///////////////////////////////////////////////////////////////////////////////
   1090 
   1091 StringStream::StringStream(std::string& str)
   1092     : str_(str), read_pos_(0), read_only_(false) {
   1093 }
   1094 
   1095 StringStream::StringStream(const std::string& str)
   1096     : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) {
   1097 }
   1098 
   1099 StreamState StringStream::GetState() const {
   1100   return SS_OPEN;
   1101 }
   1102 
   1103 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
   1104                                       size_t* read, int* error) {
   1105   size_t available = _min(buffer_len, str_.size() - read_pos_);
   1106   if (!available)
   1107     return SR_EOS;
   1108   memcpy(buffer, str_.data() + read_pos_, available);
   1109   read_pos_ += available;
   1110   if (read)
   1111     *read = available;
   1112   return SR_SUCCESS;
   1113 }
   1114 
   1115 StreamResult StringStream::Write(const void* data, size_t data_len,
   1116                                       size_t* written, int* error) {
   1117   if (read_only_) {
   1118     if (error) {
   1119       *error = -1;
   1120     }
   1121     return SR_ERROR;
   1122   }
   1123   str_.append(static_cast<const char*>(data),
   1124               static_cast<const char*>(data) + data_len);
   1125   if (written)
   1126     *written = data_len;
   1127   return SR_SUCCESS;
   1128 }
   1129 
   1130 void StringStream::Close() {
   1131 }
   1132 
   1133 bool StringStream::SetPosition(size_t position) {
   1134   if (position > str_.size())
   1135     return false;
   1136   read_pos_ = position;
   1137   return true;
   1138 }
   1139 
   1140 bool StringStream::GetPosition(size_t* position) const {
   1141   if (position)
   1142     *position = read_pos_;
   1143   return true;
   1144 }
   1145 
   1146 bool StringStream::GetSize(size_t* size) const {
   1147   if (size)
   1148     *size = str_.size();
   1149   return true;
   1150 }
   1151 
   1152 bool StringStream::GetAvailable(size_t* size) const {
   1153   if (size)
   1154     *size = str_.size() - read_pos_;
   1155   return true;
   1156 }
   1157 
   1158 bool StringStream::ReserveSize(size_t size) {
   1159   if (read_only_)
   1160     return false;
   1161   str_.reserve(size);
   1162   return true;
   1163 }
   1164 
   1165 ///////////////////////////////////////////////////////////////////////////////
   1166 // StreamReference
   1167 ///////////////////////////////////////////////////////////////////////////////
   1168 
   1169 StreamReference::StreamReference(StreamInterface* stream)
   1170     : StreamAdapterInterface(stream, false) {
   1171   // owner set to false so the destructor does not free the stream.
   1172   stream_ref_count_ = new StreamRefCount(stream);
   1173 }
   1174 
   1175 StreamInterface* StreamReference::NewReference() {
   1176   stream_ref_count_->AddReference();
   1177   return new StreamReference(stream_ref_count_, stream());
   1178 }
   1179 
   1180 StreamReference::~StreamReference() {
   1181   stream_ref_count_->Release();
   1182 }
   1183 
   1184 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
   1185                                  StreamInterface* stream)
   1186     : StreamAdapterInterface(stream, false),
   1187       stream_ref_count_(stream_ref_count) {
   1188 }
   1189 
   1190 ///////////////////////////////////////////////////////////////////////////////
   1191 
   1192 StreamResult Flow(StreamInterface* source,
   1193                   char* buffer, size_t buffer_len,
   1194                   StreamInterface* sink,
   1195                   size_t* data_len /* = NULL */) {
   1196   ASSERT(buffer_len > 0);
   1197 
   1198   StreamResult result;
   1199   size_t count, read_pos, write_pos;
   1200   if (data_len) {
   1201     read_pos = *data_len;
   1202   } else {
   1203     read_pos = 0;
   1204   }
   1205 
   1206   bool end_of_stream = false;
   1207   do {
   1208     // Read until buffer is full, end of stream, or error
   1209     while (!end_of_stream && (read_pos < buffer_len)) {
   1210       result = source->Read(buffer + read_pos, buffer_len - read_pos,
   1211                             &count, NULL);
   1212       if (result == SR_EOS) {
   1213         end_of_stream = true;
   1214       } else if (result != SR_SUCCESS) {
   1215         if (data_len) {
   1216           *data_len = read_pos;
   1217         }
   1218         return result;
   1219       } else {
   1220         read_pos += count;
   1221       }
   1222     }
   1223 
   1224     // Write until buffer is empty, or error (including end of stream)
   1225     write_pos = 0;
   1226     while (write_pos < read_pos) {
   1227       result = sink->Write(buffer + write_pos, read_pos - write_pos,
   1228                            &count, NULL);
   1229       if (result != SR_SUCCESS) {
   1230         if (data_len) {
   1231           *data_len = read_pos - write_pos;
   1232           if (write_pos > 0) {
   1233             memmove(buffer, buffer + write_pos, *data_len);
   1234           }
   1235         }
   1236         return result;
   1237       }
   1238       write_pos += count;
   1239     }
   1240 
   1241     read_pos = 0;
   1242   } while (!end_of_stream);
   1243 
   1244   if (data_len) {
   1245     *data_len = 0;
   1246   }
   1247   return SR_SUCCESS;
   1248 }
   1249 
   1250 ///////////////////////////////////////////////////////////////////////////////
   1251 
   1252 }  // namespace talk_base
   1253