Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2011, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 #if defined(POSIX)
     29 #include <sys/file.h>
     30 #endif  // POSIX
     31 #include <sys/types.h>
     32 #include <sys/stat.h>
     33 #include <errno.h>
     34 #include <string>
     35 #include "talk/base/basictypes.h"
     36 #include "talk/base/common.h"
     37 #include "talk/base/messagequeue.h"
     38 #include "talk/base/stream.h"
     39 #include "talk/base/stringencode.h"
     40 #include "talk/base/stringutils.h"
     41 #include "talk/base/thread.h"
     42 
     43 #ifdef WIN32
     44 #include "talk/base/win32.h"
     45 #define fileno _fileno
     46 #endif
     47 
     48 namespace talk_base {
     49 
     50 ///////////////////////////////////////////////////////////////////////////////
     51 // StreamInterface
     52 ///////////////////////////////////////////////////////////////////////////////
     53 
     54 enum {
     55   MSG_POST_EVENT = 0xF1F1
     56 };
     57 
     58 StreamInterface::~StreamInterface() {
     59 }
     60 
     61 struct PostEventData : public MessageData {
     62   int events, error;
     63   PostEventData(int ev, int er) : events(ev), error(er) { }
     64 };
     65 
     66 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len,
     67                                        size_t* written, int* error) {
     68   StreamResult result = SR_SUCCESS;
     69   size_t total_written = 0, current_written;
     70   while (total_written < data_len) {
     71     result = Write(static_cast<const char*>(data) + total_written,
     72                    data_len - total_written, &current_written, error);
     73     if (result != SR_SUCCESS)
     74       break;
     75     total_written += current_written;
     76   }
     77   if (written)
     78     *written = total_written;
     79   return result;
     80 }
     81 
     82 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len,
     83                                       size_t* read, int* error) {
     84   StreamResult result = SR_SUCCESS;
     85   size_t total_read = 0, current_read;
     86   while (total_read < buffer_len) {
     87     result = Read(static_cast<char*>(buffer) + total_read,
     88                   buffer_len - total_read, &current_read, error);
     89     if (result != SR_SUCCESS)
     90       break;
     91     total_read += current_read;
     92   }
     93   if (read)
     94     *read = total_read;
     95   return result;
     96 }
     97 
     98 StreamResult StreamInterface::ReadLine(std::string* line) {
     99   line->clear();
    100   StreamResult result = SR_SUCCESS;
    101   while (true) {
    102     char ch;
    103     result = Read(&ch, sizeof(ch), NULL, NULL);
    104     if (result != SR_SUCCESS) {
    105       break;
    106     }
    107     if (ch == '\n') {
    108       break;
    109     }
    110     line->push_back(ch);
    111   }
    112   if (!line->empty()) {   // give back the line we've collected so far with
    113     result = SR_SUCCESS;  // a success code.  Otherwise return the last code
    114   }
    115   return result;
    116 }
    117 
    118 void StreamInterface::PostEvent(Thread* t, int events, int err) {
    119   t->Post(this, MSG_POST_EVENT, new PostEventData(events, err));
    120 }
    121 
    122 void StreamInterface::PostEvent(int events, int err) {
    123   PostEvent(Thread::Current(), events, err);
    124 }
    125 
    126 StreamInterface::StreamInterface() {
    127 }
    128 
    129 void StreamInterface::OnMessage(Message* msg) {
    130   if (MSG_POST_EVENT == msg->message_id) {
    131     PostEventData* pe = static_cast<PostEventData*>(msg->pdata);
    132     SignalEvent(this, pe->events, pe->error);
    133     delete msg->pdata;
    134   }
    135 }
    136 
    137 ///////////////////////////////////////////////////////////////////////////////
    138 // StreamAdapterInterface
    139 ///////////////////////////////////////////////////////////////////////////////
    140 
    141 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream,
    142                                                bool owned)
    143     : stream_(stream), owned_(owned) {
    144   if (NULL != stream_)
    145     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    146 }
    147 
    148 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) {
    149   if (NULL != stream_)
    150     stream_->SignalEvent.disconnect(this);
    151   if (owned_)
    152     delete stream_;
    153   stream_ = stream;
    154   owned_ = owned;
    155   if (NULL != stream_)
    156     stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent);
    157 }
    158 
    159 StreamInterface* StreamAdapterInterface::Detach() {
    160   if (NULL != stream_)
    161     stream_->SignalEvent.disconnect(this);
    162   StreamInterface* stream = stream_;
    163   stream_ = NULL;
    164   return stream;
    165 }
    166 
    167 StreamAdapterInterface::~StreamAdapterInterface() {
    168   if (owned_)
    169     delete stream_;
    170 }
    171 
    172 ///////////////////////////////////////////////////////////////////////////////
    173 // StreamTap
    174 ///////////////////////////////////////////////////////////////////////////////
    175 
    176 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap)
    177 : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS),
    178   tap_error_(0)
    179 {
    180   AttachTap(tap);
    181 }
    182 
    183 void StreamTap::AttachTap(StreamInterface* tap) {
    184   tap_.reset(tap);
    185 }
    186 
    187 StreamInterface* StreamTap::DetachTap() {
    188   return tap_.release();
    189 }
    190 
    191 StreamResult StreamTap::GetTapResult(int* error) {
    192   if (error) {
    193     *error = tap_error_;
    194   }
    195   return tap_result_;
    196 }
    197 
    198 StreamResult StreamTap::Read(void* buffer, size_t buffer_len,
    199                              size_t* read, int* error) {
    200   size_t backup_read;
    201   if (!read) {
    202     read = &backup_read;
    203   }
    204   StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len,
    205                                                   read, error);
    206   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    207     tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_);
    208   }
    209   return res;
    210 }
    211 
    212 StreamResult StreamTap::Write(const void* data, size_t data_len,
    213                               size_t* written, int* error) {
    214   size_t backup_written;
    215   if (!written) {
    216     written = &backup_written;
    217   }
    218   StreamResult res = StreamAdapterInterface::Write(data, data_len,
    219                                                    written, error);
    220   if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) {
    221     tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_);
    222   }
    223   return res;
    224 }
    225 
    226 ///////////////////////////////////////////////////////////////////////////////
    227 // StreamSegment
    228 ///////////////////////////////////////////////////////////////////////////////
    229 
    230 StreamSegment::StreamSegment(StreamInterface* stream)
    231 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    232   length_(SIZE_UNKNOWN)
    233 {
    234   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    235   stream->GetPosition(&start_);
    236 }
    237 
    238 StreamSegment::StreamSegment(StreamInterface* stream, size_t length)
    239 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0),
    240   length_(length)
    241 {
    242   // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN.
    243   stream->GetPosition(&start_);
    244 }
    245 
    246 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len,
    247                                  size_t* read, int* error)
    248 {
    249   if (SIZE_UNKNOWN != length_) {
    250     if (pos_ >= length_)
    251       return SR_EOS;
    252     buffer_len = _min(buffer_len, length_ - pos_);
    253   }
    254   size_t backup_read;
    255   if (!read) {
    256     read = &backup_read;
    257   }
    258   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len,
    259                                                      read, error);
    260   if (SR_SUCCESS == result) {
    261     pos_ += *read;
    262   }
    263   return result;
    264 }
    265 
    266 bool StreamSegment::SetPosition(size_t position) {
    267   if (SIZE_UNKNOWN == start_)
    268     return false;  // Not seekable
    269   if ((SIZE_UNKNOWN != length_) && (position > length_))
    270     return false;  // Seek past end of segment
    271   if (!StreamAdapterInterface::SetPosition(start_ + position))
    272     return false;
    273   pos_ = position;
    274   return true;
    275 }
    276 
    277 bool StreamSegment::GetPosition(size_t* position) const {
    278   if (SIZE_UNKNOWN == start_)
    279     return false;  // Not seekable
    280   if (!StreamAdapterInterface::GetPosition(position))
    281     return false;
    282   if (position) {
    283     ASSERT(*position >= start_);
    284     *position -= start_;
    285   }
    286   return true;
    287 }
    288 
    289 bool StreamSegment::GetSize(size_t* size) const {
    290   if (!StreamAdapterInterface::GetSize(size))
    291     return false;
    292   if (size) {
    293     if (SIZE_UNKNOWN != start_) {
    294       ASSERT(*size >= start_);
    295       *size -= start_;
    296     }
    297     if (SIZE_UNKNOWN != length_) {
    298       *size = _min(*size, length_);
    299     }
    300   }
    301   return true;
    302 }
    303 
    304 bool StreamSegment::GetAvailable(size_t* size) const {
    305   if (!StreamAdapterInterface::GetAvailable(size))
    306     return false;
    307   if (size && (SIZE_UNKNOWN != length_))
    308     *size = _min(*size, length_ - pos_);
    309   return true;
    310 }
    311 
    312 ///////////////////////////////////////////////////////////////////////////////
    313 // NullStream
    314 ///////////////////////////////////////////////////////////////////////////////
    315 
    316 NullStream::NullStream() {
    317 }
    318 
    319 NullStream::~NullStream() {
    320 }
    321 
    322 StreamState NullStream::GetState() const {
    323   return SS_OPEN;
    324 }
    325 
    326 StreamResult NullStream::Read(void* buffer, size_t buffer_len,
    327                               size_t* read, int* error) {
    328   if (error) *error = -1;
    329   return SR_ERROR;
    330 }
    331 
    332 StreamResult NullStream::Write(const void* data, size_t data_len,
    333                                size_t* written, int* error) {
    334   if (written) *written = data_len;
    335   return SR_SUCCESS;
    336 }
    337 
    338 void NullStream::Close() {
    339 }
    340 
    341 ///////////////////////////////////////////////////////////////////////////////
    342 // FileStream
    343 ///////////////////////////////////////////////////////////////////////////////
    344 
    345 FileStream::FileStream() : file_(NULL) {
    346 }
    347 
    348 FileStream::~FileStream() {
    349   FileStream::Close();
    350 }
    351 
    352 bool FileStream::Open(const std::string& filename, const char* mode) {
    353   Close();
    354 #ifdef WIN32
    355   std::wstring wfilename;
    356   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    357     file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str());
    358   } else {
    359     file_ = NULL;
    360   }
    361 #else
    362   file_ = fopen(filename.c_str(), mode);
    363 #endif
    364   return (file_ != NULL);
    365 }
    366 
    367 bool FileStream::OpenShare(const std::string& filename, const char* mode,
    368                            int shflag) {
    369   Close();
    370 #ifdef WIN32
    371   std::wstring wfilename;
    372   if (Utf8ToWindowsFilename(filename, &wfilename)) {
    373     file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag);
    374   } else {
    375     file_ = NULL;
    376   }
    377 #else
    378   return Open(filename, mode);
    379 #endif
    380   return (file_ != NULL);
    381 }
    382 
    383 bool FileStream::DisableBuffering() {
    384   if (!file_)
    385     return false;
    386   return (setvbuf(file_, NULL, _IONBF, 0) == 0);
    387 }
    388 
    389 StreamState FileStream::GetState() const {
    390   return (file_ == NULL) ? SS_CLOSED : SS_OPEN;
    391 }
    392 
    393 StreamResult FileStream::Read(void* buffer, size_t buffer_len,
    394                               size_t* read, int* error) {
    395   if (!file_)
    396     return SR_EOS;
    397   size_t result = fread(buffer, 1, buffer_len, file_);
    398   if ((result == 0) && (buffer_len > 0)) {
    399     if (feof(file_))
    400       return SR_EOS;
    401     if (error)
    402       *error = errno;
    403     return SR_ERROR;
    404   }
    405   if (read)
    406     *read = result;
    407   return SR_SUCCESS;
    408 }
    409 
    410 StreamResult FileStream::Write(const void* data, size_t data_len,
    411                                size_t* written, int* error) {
    412   if (!file_)
    413     return SR_EOS;
    414   size_t result = fwrite(data, 1, data_len, file_);
    415   if ((result == 0) && (data_len > 0)) {
    416     if (error)
    417       *error = errno;
    418     return SR_ERROR;
    419   }
    420   if (written)
    421     *written = result;
    422   return SR_SUCCESS;
    423 }
    424 
    425 void FileStream::Close() {
    426   if (file_) {
    427     DoClose();
    428     file_ = NULL;
    429   }
    430 }
    431 
    432 bool FileStream::SetPosition(size_t position) {
    433   if (!file_)
    434     return false;
    435   return (fseek(file_, position, SEEK_SET) == 0);
    436 }
    437 
    438 bool FileStream::GetPosition(size_t* position) const {
    439   ASSERT(NULL != position);
    440   if (!file_)
    441     return false;
    442   long result = ftell(file_);
    443   if (result < 0)
    444     return false;
    445   if (position)
    446     *position = result;
    447   return true;
    448 }
    449 
    450 bool FileStream::GetSize(size_t* size) const {
    451   ASSERT(NULL != size);
    452   if (!file_)
    453     return false;
    454   struct stat file_stats;
    455   if (fstat(fileno(file_), &file_stats) != 0)
    456     return false;
    457   if (size)
    458     *size = file_stats.st_size;
    459   return true;
    460 }
    461 
    462 bool FileStream::GetAvailable(size_t* size) const {
    463   ASSERT(NULL != size);
    464   if (!GetSize(size))
    465     return false;
    466   long result = ftell(file_);
    467   if (result < 0)
    468     return false;
    469   if (size)
    470     *size -= result;
    471   return true;
    472 }
    473 
    474 bool FileStream::ReserveSize(size_t size) {
    475   // TODO: extend the file to the proper length
    476   return true;
    477 }
    478 
    479 bool FileStream::GetSize(const std::string& filename, size_t* size) {
    480   struct stat file_stats;
    481   if (stat(filename.c_str(), &file_stats) != 0)
    482     return false;
    483   *size = file_stats.st_size;
    484   return true;
    485 }
    486 
    487 bool FileStream::Flush() {
    488   if (file_) {
    489     return (0 == fflush(file_));
    490   }
    491   // try to flush empty file?
    492   ASSERT(false);
    493   return false;
    494 }
    495 
    496 #if defined(POSIX)
    497 
    498 bool FileStream::TryLock() {
    499   if (file_ == NULL) {
    500     // Stream not open.
    501     ASSERT(false);
    502     return false;
    503   }
    504 
    505   return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0;
    506 }
    507 
    508 bool FileStream::Unlock() {
    509   if (file_ == NULL) {
    510     // Stream not open.
    511     ASSERT(false);
    512     return false;
    513   }
    514 
    515   return flock(fileno(file_), LOCK_UN) == 0;
    516 }
    517 
    518 #endif
    519 
    520 void FileStream::DoClose() {
    521   fclose(file_);
    522 }
    523 
    524 #ifdef POSIX
    525 
    526 // Have to identically rewrite the FileStream destructor or else it would call
    527 // the base class's Close() instead of the sub-class's.
    528 POpenStream::~POpenStream() {
    529   POpenStream::Close();
    530 }
    531 
    532 bool POpenStream::Open(const std::string& subcommand, const char* mode) {
    533   Close();
    534   file_ = popen(subcommand.c_str(), mode);
    535   return file_ != NULL;
    536 }
    537 
    538 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode,
    539                             int shflag) {
    540   return Open(subcommand, mode);
    541 }
    542 
    543 void POpenStream::DoClose() {
    544   wait_status_ = pclose(file_);
    545 }
    546 
    547 #endif
    548 
    549 ///////////////////////////////////////////////////////////////////////////////
    550 // MemoryStream
    551 ///////////////////////////////////////////////////////////////////////////////
    552 
    553 MemoryStreamBase::MemoryStreamBase()
    554   : buffer_(NULL), buffer_length_(0), data_length_(0),
    555     seek_position_(0) {
    556 }
    557 
    558 StreamState MemoryStreamBase::GetState() const {
    559   return SS_OPEN;
    560 }
    561 
    562 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes,
    563                                     size_t* bytes_read, int* error) {
    564   if (seek_position_ >= data_length_) {
    565     return SR_EOS;
    566   }
    567   size_t available = data_length_ - seek_position_;
    568   if (bytes > available) {
    569     // Read partial buffer
    570     bytes = available;
    571   }
    572   memcpy(buffer, &buffer_[seek_position_], bytes);
    573   seek_position_ += bytes;
    574   if (bytes_read) {
    575     *bytes_read = bytes;
    576   }
    577   return SR_SUCCESS;
    578 }
    579 
    580 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes,
    581                                      size_t* bytes_written, int* error) {
    582   size_t available = buffer_length_ - seek_position_;
    583   if (0 == available) {
    584     // Increase buffer size to the larger of:
    585     // a) new position rounded up to next 256 bytes
    586     // b) double the previous length
    587     size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1,
    588                                     buffer_length_ * 2);
    589     StreamResult result = DoReserve(new_buffer_length, error);
    590     if (SR_SUCCESS != result) {
    591       return result;
    592     }
    593     ASSERT(buffer_length_ >= new_buffer_length);
    594     available = buffer_length_ - seek_position_;
    595   }
    596 
    597   if (bytes > available) {
    598     bytes = available;
    599   }
    600   memcpy(&buffer_[seek_position_], buffer, bytes);
    601   seek_position_ += bytes;
    602   if (data_length_ < seek_position_) {
    603     data_length_ = seek_position_;
    604   }
    605   if (bytes_written) {
    606     *bytes_written = bytes;
    607   }
    608   return SR_SUCCESS;
    609 }
    610 
    611 void MemoryStreamBase::Close() {
    612   // nothing to do
    613 }
    614 
    615 bool MemoryStreamBase::SetPosition(size_t position) {
    616   if (position > data_length_)
    617     return false;
    618   seek_position_ = position;
    619   return true;
    620 }
    621 
    622 bool MemoryStreamBase::GetPosition(size_t *position) const {
    623   if (position)
    624     *position = seek_position_;
    625   return true;
    626 }
    627 
    628 bool MemoryStreamBase::GetSize(size_t *size) const {
    629   if (size)
    630     *size = data_length_;
    631   return true;
    632 }
    633 
    634 bool MemoryStreamBase::GetAvailable(size_t *size) const {
    635   if (size)
    636     *size = data_length_ - seek_position_;
    637   return true;
    638 }
    639 
    640 bool MemoryStreamBase::ReserveSize(size_t size) {
    641   return (SR_SUCCESS == DoReserve(size, NULL));
    642 }
    643 
    644 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) {
    645   return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS;
    646 }
    647 
    648 ///////////////////////////////////////////////////////////////////////////////
    649 
    650 MemoryStream::MemoryStream()
    651   : buffer_alloc_(NULL) {
    652 }
    653 
    654 MemoryStream::MemoryStream(const char* data)
    655   : buffer_alloc_(NULL) {
    656   SetData(data, strlen(data));
    657 }
    658 
    659 MemoryStream::MemoryStream(const void* data, size_t length)
    660   : buffer_alloc_(NULL) {
    661   SetData(data, length);
    662 }
    663 
    664 MemoryStream::~MemoryStream() {
    665   delete [] buffer_alloc_;
    666 }
    667 
    668 void MemoryStream::SetData(const void* data, size_t length) {
    669   data_length_ = buffer_length_ = length;
    670   delete [] buffer_alloc_;
    671   buffer_alloc_ = new char[buffer_length_ + kAlignment];
    672   buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment));
    673   memcpy(buffer_, data, data_length_);
    674   seek_position_ = 0;
    675 }
    676 
    677 StreamResult MemoryStream::DoReserve(size_t size, int* error) {
    678   if (buffer_length_ >= size)
    679     return SR_SUCCESS;
    680 
    681   if (char* new_buffer_alloc = new char[size + kAlignment]) {
    682     char* new_buffer = reinterpret_cast<char*>(
    683         ALIGNP(new_buffer_alloc, kAlignment));
    684     memcpy(new_buffer, buffer_, data_length_);
    685     delete [] buffer_alloc_;
    686     buffer_alloc_ = new_buffer_alloc;
    687     buffer_ = new_buffer;
    688     buffer_length_ = size;
    689     return SR_SUCCESS;
    690   }
    691 
    692   if (error) {
    693     *error = ENOMEM;
    694   }
    695   return SR_ERROR;
    696 }
    697 
    698 ///////////////////////////////////////////////////////////////////////////////
    699 
    700 ExternalMemoryStream::ExternalMemoryStream() {
    701 }
    702 
    703 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) {
    704   SetData(data, length);
    705 }
    706 
    707 ExternalMemoryStream::~ExternalMemoryStream() {
    708 }
    709 
    710 void ExternalMemoryStream::SetData(void* data, size_t length) {
    711   data_length_ = buffer_length_ = length;
    712   buffer_ = static_cast<char*>(data);
    713   seek_position_ = 0;
    714 }
    715 
    716 ///////////////////////////////////////////////////////////////////////////////
    717 // FifoBuffer
    718 ///////////////////////////////////////////////////////////////////////////////
    719 
    720 FifoBuffer::FifoBuffer(size_t size)
    721     : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size),
    722       data_length_(0), read_position_(0), owner_(Thread::Current()) {
    723   // all events are done on the owner_ thread
    724 }
    725 
    726 FifoBuffer::~FifoBuffer() {
    727 }
    728 
    729 bool FifoBuffer::GetBuffered(size_t* size) const {
    730   CritScope cs(&crit_);
    731   *size = data_length_;
    732   return true;
    733 }
    734 
    735 bool FifoBuffer::SetCapacity(size_t size) {
    736   CritScope cs(&crit_);
    737   if (data_length_ > size) {
    738     return false;
    739   }
    740 
    741   if (size != buffer_length_) {
    742     char* buffer = new char[size];
    743     const size_t copy = data_length_;
    744     const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
    745     memcpy(buffer, &buffer_[read_position_], tail_copy);
    746     memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy);
    747     buffer_.reset(buffer);
    748     read_position_ = 0;
    749     buffer_length_ = size;
    750   }
    751   return true;
    752 }
    753 
    754 StreamState FifoBuffer::GetState() const {
    755   return state_;
    756 }
    757 
    758 StreamResult FifoBuffer::Read(void* buffer, size_t bytes,
    759                               size_t* bytes_read, int* error) {
    760   CritScope cs(&crit_);
    761   const size_t available = data_length_;
    762   if (0 == available) {
    763     return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS;
    764   }
    765 
    766   const bool was_writable = data_length_ < buffer_length_;
    767   const size_t copy = _min(bytes, available);
    768   const size_t tail_copy = _min(copy, buffer_length_ - read_position_);
    769   char* const p = static_cast<char*>(buffer);
    770   memcpy(p, &buffer_[read_position_], tail_copy);
    771   memcpy(p + tail_copy, &buffer_[0], copy - tail_copy);
    772   read_position_ = (read_position_ + copy) % buffer_length_;
    773   data_length_ -= copy;
    774   if (bytes_read) {
    775     *bytes_read = copy;
    776   }
    777   // if we were full before, and now we're not, post an event
    778   if (!was_writable && copy > 0) {
    779     PostEvent(owner_, SE_WRITE, 0);
    780   }
    781 
    782   return SR_SUCCESS;
    783 }
    784 
    785 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes,
    786                                size_t* bytes_written, int* error) {
    787   CritScope cs(&crit_);
    788   if (state_ == SS_CLOSED) {
    789     return SR_EOS;
    790   }
    791 
    792   const size_t available = buffer_length_ - data_length_;
    793   if (0 == available) {
    794     return SR_BLOCK;
    795   }
    796 
    797   const bool was_readable = (data_length_ > 0);
    798   const size_t write_position = (read_position_ + data_length_)
    799       % buffer_length_;
    800   const size_t copy = _min(bytes, available);
    801   const size_t tail_copy = _min(copy, buffer_length_ - write_position);
    802   const char* const p = static_cast<const char*>(buffer);
    803   memcpy(&buffer_[write_position], p, tail_copy);
    804   memcpy(&buffer_[0], p + tail_copy, copy - tail_copy);
    805   data_length_ += copy;
    806   if (bytes_written) {
    807     *bytes_written = copy;
    808   }
    809   // if we didn't have any data to read before, and now we do, post an event
    810   if (!was_readable && copy > 0) {
    811     PostEvent(owner_, SE_READ, 0);
    812   }
    813 
    814   return SR_SUCCESS;
    815 }
    816 
    817 void FifoBuffer::Close() {
    818   CritScope cs(&crit_);
    819   state_ = SS_CLOSED;
    820 }
    821 
    822 const void* FifoBuffer::GetReadData(size_t* size) {
    823   CritScope cs(&crit_);
    824   *size = (read_position_ + data_length_ <= buffer_length_) ?
    825       data_length_ : buffer_length_ - read_position_;
    826   return &buffer_[read_position_];
    827 }
    828 
    829 void FifoBuffer::ConsumeReadData(size_t size) {
    830   CritScope cs(&crit_);
    831   ASSERT(size <= data_length_);
    832   const bool was_writable = data_length_ < buffer_length_;
    833   read_position_ = (read_position_ + size) % buffer_length_;
    834   data_length_ -= size;
    835   if (!was_writable && size > 0) {
    836     PostEvent(owner_, SE_WRITE, 0);
    837   }
    838 }
    839 
    840 void* FifoBuffer::GetWriteBuffer(size_t* size) {
    841   CritScope cs(&crit_);
    842   if (state_ == SS_CLOSED) {
    843     return NULL;
    844   }
    845 
    846   // if empty, reset the write position to the beginning, so we can get
    847   // the biggest possible block
    848   if (data_length_ == 0) {
    849     read_position_ = 0;
    850   }
    851 
    852   const size_t write_position = (read_position_ + data_length_)
    853       % buffer_length_;
    854   *size = (write_position >= read_position_) ?
    855       buffer_length_ - write_position : read_position_ - write_position;
    856   return &buffer_[write_position];
    857 }
    858 
    859 void FifoBuffer::ConsumeWriteBuffer(size_t size) {
    860   CritScope cs(&crit_);
    861   ASSERT(size <= buffer_length_ - data_length_);
    862   const bool was_readable = (data_length_ > 0);
    863   data_length_ += size;
    864   if (!was_readable && size > 0) {
    865     PostEvent(owner_, SE_READ, 0);
    866   }
    867 }
    868 
    869 ///////////////////////////////////////////////////////////////////////////////
    870 // LoggingAdapter
    871 ///////////////////////////////////////////////////////////////////////////////
    872 
    873 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level,
    874                                const std::string& label, bool hex_mode)
    875 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode)
    876 {
    877   set_label(label);
    878 }
    879 
    880 void LoggingAdapter::set_label(const std::string& label) {
    881   label_.assign("[");
    882   label_.append(label);
    883   label_.append("]");
    884 }
    885 
    886 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len,
    887                                   size_t* read, int* error) {
    888   size_t local_read; if (!read) read = &local_read;
    889   StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read,
    890                                                      error);
    891   if (result == SR_SUCCESS) {
    892     LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_);
    893   }
    894   return result;
    895 }
    896 
    897 StreamResult LoggingAdapter::Write(const void* data, size_t data_len,
    898                                    size_t* written, int* error) {
    899   size_t local_written; if (!written) written = &local_written;
    900   StreamResult result = StreamAdapterInterface::Write(data, data_len, written,
    901                                                       error);
    902   if (result == SR_SUCCESS) {
    903     LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_,
    904                  &lms_);
    905   }
    906   return result;
    907 }
    908 
    909 void LoggingAdapter::Close() {
    910   LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
    911   LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
    912   LOG_V(level_) << label_ << " Closed locally";
    913   StreamAdapterInterface::Close();
    914 }
    915 
    916 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) {
    917   if (events & SE_OPEN) {
    918     LOG_V(level_) << label_ << " Open";
    919   } else if (events & SE_CLOSE) {
    920     LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_);
    921     LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_);
    922     LOG_V(level_) << label_ << " Closed with error: " << err;
    923   }
    924   StreamAdapterInterface::OnEvent(stream, events, err);
    925 }
    926 
    927 ///////////////////////////////////////////////////////////////////////////////
    928 // StringStream - Reads/Writes to an external std::string
    929 ///////////////////////////////////////////////////////////////////////////////
    930 
    931 StringStream::StringStream(std::string& str)
    932 : str_(str), read_pos_(0), read_only_(false)
    933 {
    934 }
    935 
    936 StringStream::StringStream(const std::string& str)
    937 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true)
    938 {
    939 }
    940 
    941 StreamState StringStream::GetState() const {
    942   return SS_OPEN;
    943 }
    944 
    945 StreamResult StringStream::Read(void* buffer, size_t buffer_len,
    946                                       size_t* read, int* error) {
    947   size_t available = _min(buffer_len, str_.size() - read_pos_);
    948   if (!available)
    949     return SR_EOS;
    950   memcpy(buffer, str_.data() + read_pos_, available);
    951   read_pos_ += available;
    952   if (read)
    953     *read = available;
    954   return SR_SUCCESS;
    955 }
    956 
    957 StreamResult StringStream::Write(const void* data, size_t data_len,
    958                                       size_t* written, int* error) {
    959   if (read_only_) {
    960     if (error) {
    961       *error = -1;
    962     }
    963     return SR_ERROR;
    964   }
    965   str_.append(static_cast<const char*>(data),
    966               static_cast<const char*>(data) + data_len);
    967   if (written)
    968     *written = data_len;
    969   return SR_SUCCESS;
    970 }
    971 
    972 void StringStream::Close() {
    973 }
    974 
    975 bool StringStream::SetPosition(size_t position) {
    976   if (position > str_.size())
    977     return false;
    978   read_pos_ = position;
    979   return true;
    980 }
    981 
    982 bool StringStream::GetPosition(size_t* position) const {
    983   if (position)
    984     *position = read_pos_;
    985   return true;
    986 }
    987 
    988 bool StringStream::GetSize(size_t* size) const {
    989   if (size)
    990     *size = str_.size();
    991   return true;
    992 }
    993 
    994 bool StringStream::GetAvailable(size_t* size) const {
    995   if (size)
    996     *size = str_.size() - read_pos_;
    997   return true;
    998 }
    999 
   1000 bool StringStream::ReserveSize(size_t size) {
   1001   if (read_only_)
   1002     return false;
   1003   str_.reserve(size);
   1004   return true;
   1005 }
   1006 
   1007 ///////////////////////////////////////////////////////////////////////////////
   1008 // StreamReference
   1009 ///////////////////////////////////////////////////////////////////////////////
   1010 
   1011 StreamReference::StreamReference(StreamInterface* stream)
   1012     : StreamAdapterInterface(stream, false) {
   1013   // owner set to false so the destructor does not free the stream.
   1014   stream_ref_count_ = new StreamRefCount(stream);
   1015 }
   1016 
   1017 StreamInterface* StreamReference::NewReference() {
   1018   stream_ref_count_->AddReference();
   1019   return new StreamReference(stream_ref_count_, stream());
   1020 }
   1021 
   1022 StreamReference::~StreamReference() {
   1023   stream_ref_count_->Release();
   1024 }
   1025 
   1026 StreamReference::StreamReference(StreamRefCount* stream_ref_count,
   1027                                  StreamInterface* stream)
   1028     : StreamAdapterInterface(stream, false),
   1029       stream_ref_count_(stream_ref_count) {
   1030 }
   1031 
   1032 ///////////////////////////////////////////////////////////////////////////////
   1033 
   1034 StreamResult Flow(StreamInterface* source,
   1035                   char* buffer, size_t buffer_len,
   1036                   StreamInterface* sink,
   1037                   size_t* data_len /* = NULL */) {
   1038   ASSERT(buffer_len > 0);
   1039 
   1040   StreamResult result;
   1041   size_t count, read_pos, write_pos;
   1042   if (data_len) {
   1043     read_pos = *data_len;
   1044   } else {
   1045     read_pos = 0;
   1046   }
   1047 
   1048   bool end_of_stream = false;
   1049   do {
   1050     // Read until buffer is full, end of stream, or error
   1051     while (!end_of_stream && (read_pos < buffer_len)) {
   1052       result = source->Read(buffer + read_pos, buffer_len - read_pos,
   1053                             &count, NULL);
   1054       if (result == SR_EOS) {
   1055         end_of_stream = true;
   1056       } else if (result != SR_SUCCESS) {
   1057         if (data_len) {
   1058           *data_len = read_pos;
   1059         }
   1060         return result;
   1061       } else {
   1062         read_pos += count;
   1063       }
   1064     }
   1065 
   1066     // Write until buffer is empty, or error (including end of stream)
   1067     write_pos = 0;
   1068     while (write_pos < read_pos) {
   1069       result = sink->Write(buffer + write_pos, read_pos - write_pos,
   1070                            &count, NULL);
   1071       if (result != SR_SUCCESS) {
   1072         if (data_len) {
   1073           *data_len = read_pos - write_pos;
   1074           if (write_pos > 0) {
   1075             memmove(buffer, buffer + write_pos, *data_len);
   1076           }
   1077         }
   1078         return result;
   1079       }
   1080       write_pos += count;
   1081     }
   1082 
   1083     read_pos = 0;
   1084   } while (!end_of_stream);
   1085 
   1086   if (data_len) {
   1087     *data_len = 0;
   1088   }
   1089   return SR_SUCCESS;
   1090 }
   1091 
   1092 ///////////////////////////////////////////////////////////////////////////////
   1093 
   1094 } // namespace talk_base
   1095