1 /* 2 * libjingle 3 * Copyright 2004 Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #if defined(POSIX) 29 #include <sys/file.h> 30 #endif // POSIX 31 #include <sys/types.h> 32 #include <sys/stat.h> 33 #include <errno.h> 34 #include <string> 35 #include "talk/base/basictypes.h" 36 #include "talk/base/common.h" 37 #include "talk/base/logging.h" 38 #include "talk/base/messagequeue.h" 39 #include "talk/base/stream.h" 40 #include "talk/base/stringencode.h" 41 #include "talk/base/stringutils.h" 42 #include "talk/base/thread.h" 43 #include "talk/base/timeutils.h" 44 45 #ifdef WIN32 46 #include "talk/base/win32.h" 47 #define fileno _fileno 48 #endif 49 50 namespace talk_base { 51 52 /////////////////////////////////////////////////////////////////////////////// 53 // StreamInterface 54 /////////////////////////////////////////////////////////////////////////////// 55 StreamInterface::~StreamInterface() { 56 } 57 58 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 59 size_t* written, int* error) { 60 StreamResult result = SR_SUCCESS; 61 size_t total_written = 0, current_written; 62 while (total_written < data_len) { 63 result = Write(static_cast<const char*>(data) + total_written, 64 data_len - total_written, ¤t_written, error); 65 if (result != SR_SUCCESS) 66 break; 67 total_written += current_written; 68 } 69 if (written) 70 *written = total_written; 71 return result; 72 } 73 74 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 75 size_t* read, int* error) { 76 StreamResult result = SR_SUCCESS; 77 size_t total_read = 0, current_read; 78 while (total_read < buffer_len) { 79 result = Read(static_cast<char*>(buffer) + total_read, 80 buffer_len - total_read, ¤t_read, error); 81 if (result != SR_SUCCESS) 82 break; 83 total_read += current_read; 84 } 85 if (read) 86 *read = total_read; 87 return result; 88 } 89 90 StreamResult StreamInterface::ReadLine(std::string* line) { 91 line->clear(); 92 StreamResult result = SR_SUCCESS; 93 while (true) { 94 char ch; 95 result = Read(&ch, sizeof(ch), NULL, NULL); 96 if (result != SR_SUCCESS) { 97 break; 98 } 99 if (ch == '\n') { 100 break; 101 } 102 line->push_back(ch); 103 } 104 if (!line->empty()) { // give back the line we've collected so far with 105 result = SR_SUCCESS; // a success code. Otherwise return the last code 106 } 107 return result; 108 } 109 110 void StreamInterface::PostEvent(Thread* t, int events, int err) { 111 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err)); 112 } 113 114 void StreamInterface::PostEvent(int events, int err) { 115 PostEvent(Thread::Current(), events, err); 116 } 117 118 StreamInterface::StreamInterface() { 119 } 120 121 void StreamInterface::OnMessage(Message* msg) { 122 if (MSG_POST_EVENT == msg->message_id) { 123 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata); 124 SignalEvent(this, pe->events, pe->error); 125 delete msg->pdata; 126 } 127 } 128 129 /////////////////////////////////////////////////////////////////////////////// 130 // StreamAdapterInterface 131 /////////////////////////////////////////////////////////////////////////////// 132 133 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 134 bool owned) 135 : stream_(stream), owned_(owned) { 136 if (NULL != stream_) 137 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 138 } 139 140 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 141 if (NULL != stream_) 142 stream_->SignalEvent.disconnect(this); 143 if (owned_) 144 delete stream_; 145 stream_ = stream; 146 owned_ = owned; 147 if (NULL != stream_) 148 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 149 } 150 151 StreamInterface* StreamAdapterInterface::Detach() { 152 if (NULL != stream_) 153 stream_->SignalEvent.disconnect(this); 154 StreamInterface* stream = stream_; 155 stream_ = NULL; 156 return stream; 157 } 158 159 StreamAdapterInterface::~StreamAdapterInterface() { 160 if (owned_) 161 delete stream_; 162 } 163 164 /////////////////////////////////////////////////////////////////////////////// 165 // StreamTap 166 /////////////////////////////////////////////////////////////////////////////// 167 168 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 169 : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS), 170 tap_error_(0) { 171 AttachTap(tap); 172 } 173 174 void StreamTap::AttachTap(StreamInterface* tap) { 175 tap_.reset(tap); 176 } 177 178 StreamInterface* StreamTap::DetachTap() { 179 return tap_.release(); 180 } 181 182 StreamResult StreamTap::GetTapResult(int* error) { 183 if (error) { 184 *error = tap_error_; 185 } 186 return tap_result_; 187 } 188 189 StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 190 size_t* read, int* error) { 191 size_t backup_read; 192 if (!read) { 193 read = &backup_read; 194 } 195 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 196 read, error); 197 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 198 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 199 } 200 return res; 201 } 202 203 StreamResult StreamTap::Write(const void* data, size_t data_len, 204 size_t* written, int* error) { 205 size_t backup_written; 206 if (!written) { 207 written = &backup_written; 208 } 209 StreamResult res = StreamAdapterInterface::Write(data, data_len, 210 written, error); 211 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 212 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 213 } 214 return res; 215 } 216 217 /////////////////////////////////////////////////////////////////////////////// 218 // StreamSegment 219 /////////////////////////////////////////////////////////////////////////////// 220 221 StreamSegment::StreamSegment(StreamInterface* stream) 222 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 223 length_(SIZE_UNKNOWN) { 224 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 225 stream->GetPosition(&start_); 226 } 227 228 StreamSegment::StreamSegment(StreamInterface* stream, size_t length) 229 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 230 length_(length) { 231 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 232 stream->GetPosition(&start_); 233 } 234 235 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len, 236 size_t* read, int* error) { 237 if (SIZE_UNKNOWN != length_) { 238 if (pos_ >= length_) 239 return SR_EOS; 240 buffer_len = _min(buffer_len, length_ - pos_); 241 } 242 size_t backup_read; 243 if (!read) { 244 read = &backup_read; 245 } 246 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, 247 read, error); 248 if (SR_SUCCESS == result) { 249 pos_ += *read; 250 } 251 return result; 252 } 253 254 bool StreamSegment::SetPosition(size_t position) { 255 if (SIZE_UNKNOWN == start_) 256 return false; // Not seekable 257 if ((SIZE_UNKNOWN != length_) && (position > length_)) 258 return false; // Seek past end of segment 259 if (!StreamAdapterInterface::SetPosition(start_ + position)) 260 return false; 261 pos_ = position; 262 return true; 263 } 264 265 bool StreamSegment::GetPosition(size_t* position) const { 266 if (SIZE_UNKNOWN == start_) 267 return false; // Not seekable 268 if (!StreamAdapterInterface::GetPosition(position)) 269 return false; 270 if (position) { 271 ASSERT(*position >= start_); 272 *position -= start_; 273 } 274 return true; 275 } 276 277 bool StreamSegment::GetSize(size_t* size) const { 278 if (!StreamAdapterInterface::GetSize(size)) 279 return false; 280 if (size) { 281 if (SIZE_UNKNOWN != start_) { 282 ASSERT(*size >= start_); 283 *size -= start_; 284 } 285 if (SIZE_UNKNOWN != length_) { 286 *size = _min(*size, length_); 287 } 288 } 289 return true; 290 } 291 292 bool StreamSegment::GetAvailable(size_t* size) const { 293 if (!StreamAdapterInterface::GetAvailable(size)) 294 return false; 295 if (size && (SIZE_UNKNOWN != length_)) 296 *size = _min(*size, length_ - pos_); 297 return true; 298 } 299 300 /////////////////////////////////////////////////////////////////////////////// 301 // NullStream 302 /////////////////////////////////////////////////////////////////////////////// 303 304 NullStream::NullStream() { 305 } 306 307 NullStream::~NullStream() { 308 } 309 310 StreamState NullStream::GetState() const { 311 return SS_OPEN; 312 } 313 314 StreamResult NullStream::Read(void* buffer, size_t buffer_len, 315 size_t* read, int* error) { 316 if (error) *error = -1; 317 return SR_ERROR; 318 } 319 320 StreamResult NullStream::Write(const void* data, size_t data_len, 321 size_t* written, int* error) { 322 if (written) *written = data_len; 323 return SR_SUCCESS; 324 } 325 326 void NullStream::Close() { 327 } 328 329 /////////////////////////////////////////////////////////////////////////////// 330 // FileStream 331 /////////////////////////////////////////////////////////////////////////////// 332 333 FileStream::FileStream() : file_(NULL) { 334 } 335 336 FileStream::~FileStream() { 337 FileStream::Close(); 338 } 339 340 bool FileStream::Open(const std::string& filename, const char* mode, 341 int* error) { 342 Close(); 343 #ifdef WIN32 344 std::wstring wfilename; 345 if (Utf8ToWindowsFilename(filename, &wfilename)) { 346 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 347 } else { 348 if (error) { 349 *error = -1; 350 return false; 351 } 352 } 353 #else 354 file_ = fopen(filename.c_str(), mode); 355 #endif 356 if (!file_ && error) { 357 *error = errno; 358 } 359 return (file_ != NULL); 360 } 361 362 bool FileStream::OpenShare(const std::string& filename, const char* mode, 363 int shflag, int* error) { 364 Close(); 365 #ifdef WIN32 366 std::wstring wfilename; 367 if (Utf8ToWindowsFilename(filename, &wfilename)) { 368 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 369 if (!file_ && error) { 370 *error = errno; 371 return false; 372 } 373 return file_ != NULL; 374 } else { 375 if (error) { 376 *error = -1; 377 } 378 return false; 379 } 380 #else 381 return Open(filename, mode, error); 382 #endif 383 } 384 385 bool FileStream::DisableBuffering() { 386 if (!file_) 387 return false; 388 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 389 } 390 391 StreamState FileStream::GetState() const { 392 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 393 } 394 395 StreamResult FileStream::Read(void* buffer, size_t buffer_len, 396 size_t* read, int* error) { 397 if (!file_) 398 return SR_EOS; 399 size_t result = fread(buffer, 1, buffer_len, file_); 400 if ((result == 0) && (buffer_len > 0)) { 401 if (feof(file_)) 402 return SR_EOS; 403 if (error) 404 *error = errno; 405 return SR_ERROR; 406 } 407 if (read) 408 *read = result; 409 return SR_SUCCESS; 410 } 411 412 StreamResult FileStream::Write(const void* data, size_t data_len, 413 size_t* written, int* error) { 414 if (!file_) 415 return SR_EOS; 416 size_t result = fwrite(data, 1, data_len, file_); 417 if ((result == 0) && (data_len > 0)) { 418 if (error) 419 *error = errno; 420 return SR_ERROR; 421 } 422 if (written) 423 *written = result; 424 return SR_SUCCESS; 425 } 426 427 void FileStream::Close() { 428 if (file_) { 429 DoClose(); 430 file_ = NULL; 431 } 432 } 433 434 bool FileStream::SetPosition(size_t position) { 435 if (!file_) 436 return false; 437 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0); 438 } 439 440 bool FileStream::GetPosition(size_t* position) const { 441 ASSERT(NULL != position); 442 if (!file_) 443 return false; 444 long result = ftell(file_); 445 if (result < 0) 446 return false; 447 if (position) 448 *position = result; 449 return true; 450 } 451 452 bool FileStream::GetSize(size_t* size) const { 453 ASSERT(NULL != size); 454 if (!file_) 455 return false; 456 struct stat file_stats; 457 if (fstat(fileno(file_), &file_stats) != 0) 458 return false; 459 if (size) 460 *size = file_stats.st_size; 461 return true; 462 } 463 464 bool FileStream::GetAvailable(size_t* size) const { 465 ASSERT(NULL != size); 466 if (!GetSize(size)) 467 return false; 468 long result = ftell(file_); 469 if (result < 0) 470 return false; 471 if (size) 472 *size -= result; 473 return true; 474 } 475 476 bool FileStream::ReserveSize(size_t size) { 477 // TODO: extend the file to the proper length 478 return true; 479 } 480 481 bool FileStream::GetSize(const std::string& filename, size_t* size) { 482 struct stat file_stats; 483 if (stat(filename.c_str(), &file_stats) != 0) 484 return false; 485 *size = file_stats.st_size; 486 return true; 487 } 488 489 bool FileStream::Flush() { 490 if (file_) { 491 return (0 == fflush(file_)); 492 } 493 // try to flush empty file? 494 ASSERT(false); 495 return false; 496 } 497 498 #if defined(POSIX) 499 500 bool FileStream::TryLock() { 501 if (file_ == NULL) { 502 // Stream not open. 503 ASSERT(false); 504 return false; 505 } 506 507 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 508 } 509 510 bool FileStream::Unlock() { 511 if (file_ == NULL) { 512 // Stream not open. 513 ASSERT(false); 514 return false; 515 } 516 517 return flock(fileno(file_), LOCK_UN) == 0; 518 } 519 520 #endif 521 522 void FileStream::DoClose() { 523 fclose(file_); 524 } 525 526 AsyncWriteStream::~AsyncWriteStream() { 527 write_thread_->Clear(this, 0, NULL); 528 ClearBufferAndWrite(); 529 530 CritScope cs(&crit_stream_); 531 stream_.reset(); 532 } 533 534 // This is needed by some stream writers, such as RtpDumpWriter. 535 bool AsyncWriteStream::GetPosition(size_t* position) const { 536 CritScope cs(&crit_stream_); 537 return stream_->GetPosition(position); 538 } 539 540 // This is needed by some stream writers, such as the plugin log writers. 541 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len, 542 size_t* read, int* error) { 543 CritScope cs(&crit_stream_); 544 return stream_->Read(buffer, buffer_len, read, error); 545 } 546 547 void AsyncWriteStream::Close() { 548 if (state_ == SS_CLOSED) { 549 return; 550 } 551 552 write_thread_->Clear(this, 0, NULL); 553 ClearBufferAndWrite(); 554 555 CritScope cs(&crit_stream_); 556 stream_->Close(); 557 state_ = SS_CLOSED; 558 } 559 560 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len, 561 size_t* written, int* error) { 562 if (state_ == SS_CLOSED) { 563 return SR_ERROR; 564 } 565 566 size_t previous_buffer_length = 0; 567 { 568 CritScope cs(&crit_buffer_); 569 previous_buffer_length = buffer_.length(); 570 buffer_.AppendData(data, data_len); 571 } 572 573 if (previous_buffer_length == 0) { 574 // If there's stuff already in the buffer, then we already called 575 // Post and the write_thread_ hasn't pulled it out yet, so we 576 // don't need to re-Post. 577 write_thread_->Post(this, 0, NULL); 578 } 579 // Return immediately, assuming that it works. 580 if (written) { 581 *written = data_len; 582 } 583 return SR_SUCCESS; 584 } 585 586 void AsyncWriteStream::OnMessage(talk_base::Message* pmsg) { 587 ClearBufferAndWrite(); 588 } 589 590 bool AsyncWriteStream::Flush() { 591 if (state_ == SS_CLOSED) { 592 return false; 593 } 594 595 ClearBufferAndWrite(); 596 597 CritScope cs(&crit_stream_); 598 return stream_->Flush(); 599 } 600 601 void AsyncWriteStream::ClearBufferAndWrite() { 602 Buffer to_write; 603 { 604 CritScope cs_buffer(&crit_buffer_); 605 buffer_.TransferTo(&to_write); 606 } 607 608 if (to_write.length() > 0) { 609 CritScope cs(&crit_stream_); 610 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL); 611 } 612 } 613 614 #ifdef POSIX 615 616 // Have to identically rewrite the FileStream destructor or else it would call 617 // the base class's Close() instead of the sub-class's. 618 POpenStream::~POpenStream() { 619 POpenStream::Close(); 620 } 621 622 bool POpenStream::Open(const std::string& subcommand, 623 const char* mode, 624 int* error) { 625 Close(); 626 file_ = popen(subcommand.c_str(), mode); 627 if (file_ == NULL) { 628 if (error) 629 *error = errno; 630 return false; 631 } 632 return true; 633 } 634 635 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode, 636 int shflag, int* error) { 637 return Open(subcommand, mode, error); 638 } 639 640 void POpenStream::DoClose() { 641 wait_status_ = pclose(file_); 642 } 643 644 #endif 645 646 /////////////////////////////////////////////////////////////////////////////// 647 // MemoryStream 648 /////////////////////////////////////////////////////////////////////////////// 649 650 MemoryStreamBase::MemoryStreamBase() 651 : buffer_(NULL), buffer_length_(0), data_length_(0), 652 seek_position_(0) { 653 } 654 655 StreamState MemoryStreamBase::GetState() const { 656 return SS_OPEN; 657 } 658 659 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 660 size_t* bytes_read, int* error) { 661 if (seek_position_ >= data_length_) { 662 return SR_EOS; 663 } 664 size_t available = data_length_ - seek_position_; 665 if (bytes > available) { 666 // Read partial buffer 667 bytes = available; 668 } 669 memcpy(buffer, &buffer_[seek_position_], bytes); 670 seek_position_ += bytes; 671 if (bytes_read) { 672 *bytes_read = bytes; 673 } 674 return SR_SUCCESS; 675 } 676 677 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 678 size_t* bytes_written, int* error) { 679 size_t available = buffer_length_ - seek_position_; 680 if (0 == available) { 681 // Increase buffer size to the larger of: 682 // a) new position rounded up to next 256 bytes 683 // b) double the previous length 684 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1, 685 buffer_length_ * 2); 686 StreamResult result = DoReserve(new_buffer_length, error); 687 if (SR_SUCCESS != result) { 688 return result; 689 } 690 ASSERT(buffer_length_ >= new_buffer_length); 691 available = buffer_length_ - seek_position_; 692 } 693 694 if (bytes > available) { 695 bytes = available; 696 } 697 memcpy(&buffer_[seek_position_], buffer, bytes); 698 seek_position_ += bytes; 699 if (data_length_ < seek_position_) { 700 data_length_ = seek_position_; 701 } 702 if (bytes_written) { 703 *bytes_written = bytes; 704 } 705 return SR_SUCCESS; 706 } 707 708 void MemoryStreamBase::Close() { 709 // nothing to do 710 } 711 712 bool MemoryStreamBase::SetPosition(size_t position) { 713 if (position > data_length_) 714 return false; 715 seek_position_ = position; 716 return true; 717 } 718 719 bool MemoryStreamBase::GetPosition(size_t* position) const { 720 if (position) 721 *position = seek_position_; 722 return true; 723 } 724 725 bool MemoryStreamBase::GetSize(size_t* size) const { 726 if (size) 727 *size = data_length_; 728 return true; 729 } 730 731 bool MemoryStreamBase::GetAvailable(size_t* size) const { 732 if (size) 733 *size = data_length_ - seek_position_; 734 return true; 735 } 736 737 bool MemoryStreamBase::ReserveSize(size_t size) { 738 return (SR_SUCCESS == DoReserve(size, NULL)); 739 } 740 741 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 742 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; 743 } 744 745 /////////////////////////////////////////////////////////////////////////////// 746 747 MemoryStream::MemoryStream() 748 : buffer_alloc_(NULL) { 749 } 750 751 MemoryStream::MemoryStream(const char* data) 752 : buffer_alloc_(NULL) { 753 SetData(data, strlen(data)); 754 } 755 756 MemoryStream::MemoryStream(const void* data, size_t length) 757 : buffer_alloc_(NULL) { 758 SetData(data, length); 759 } 760 761 MemoryStream::~MemoryStream() { 762 delete [] buffer_alloc_; 763 } 764 765 void MemoryStream::SetData(const void* data, size_t length) { 766 data_length_ = buffer_length_ = length; 767 delete [] buffer_alloc_; 768 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 769 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 770 memcpy(buffer_, data, data_length_); 771 seek_position_ = 0; 772 } 773 774 StreamResult MemoryStream::DoReserve(size_t size, int* error) { 775 if (buffer_length_ >= size) 776 return SR_SUCCESS; 777 778 if (char* new_buffer_alloc = new char[size + kAlignment]) { 779 char* new_buffer = reinterpret_cast<char*>( 780 ALIGNP(new_buffer_alloc, kAlignment)); 781 memcpy(new_buffer, buffer_, data_length_); 782 delete [] buffer_alloc_; 783 buffer_alloc_ = new_buffer_alloc; 784 buffer_ = new_buffer; 785 buffer_length_ = size; 786 return SR_SUCCESS; 787 } 788 789 if (error) { 790 *error = ENOMEM; 791 } 792 return SR_ERROR; 793 } 794 795 /////////////////////////////////////////////////////////////////////////////// 796 797 ExternalMemoryStream::ExternalMemoryStream() { 798 } 799 800 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 801 SetData(data, length); 802 } 803 804 ExternalMemoryStream::~ExternalMemoryStream() { 805 } 806 807 void ExternalMemoryStream::SetData(void* data, size_t length) { 808 data_length_ = buffer_length_ = length; 809 buffer_ = static_cast<char*>(data); 810 seek_position_ = 0; 811 } 812 813 /////////////////////////////////////////////////////////////////////////////// 814 // FifoBuffer 815 /////////////////////////////////////////////////////////////////////////////// 816 817 FifoBuffer::FifoBuffer(size_t size) 818 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 819 data_length_(0), read_position_(0), owner_(Thread::Current()) { 820 // all events are done on the owner_ thread 821 } 822 823 FifoBuffer::FifoBuffer(size_t size, Thread* owner) 824 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 825 data_length_(0), read_position_(0), owner_(owner) { 826 // all events are done on the owner_ thread 827 } 828 829 FifoBuffer::~FifoBuffer() { 830 } 831 832 bool FifoBuffer::GetBuffered(size_t* size) const { 833 CritScope cs(&crit_); 834 *size = data_length_; 835 return true; 836 } 837 838 bool FifoBuffer::SetCapacity(size_t size) { 839 CritScope cs(&crit_); 840 if (data_length_ > size) { 841 return false; 842 } 843 844 if (size != buffer_length_) { 845 char* buffer = new char[size]; 846 const size_t copy = data_length_; 847 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 848 memcpy(buffer, &buffer_[read_position_], tail_copy); 849 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 850 buffer_.reset(buffer); 851 read_position_ = 0; 852 buffer_length_ = size; 853 } 854 return true; 855 } 856 857 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, 858 size_t offset, size_t* bytes_read) { 859 CritScope cs(&crit_); 860 return ReadOffsetLocked(buffer, bytes, offset, bytes_read); 861 } 862 863 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, 864 size_t offset, size_t* bytes_written) { 865 CritScope cs(&crit_); 866 return WriteOffsetLocked(buffer, bytes, offset, bytes_written); 867 } 868 869 StreamState FifoBuffer::GetState() const { 870 return state_; 871 } 872 873 StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 874 size_t* bytes_read, int* error) { 875 CritScope cs(&crit_); 876 const bool was_writable = data_length_ < buffer_length_; 877 size_t copy = 0; 878 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); 879 880 if (result == SR_SUCCESS) { 881 // If read was successful then adjust the read position and number of 882 // bytes buffered. 883 read_position_ = (read_position_ + copy) % buffer_length_; 884 data_length_ -= copy; 885 if (bytes_read) { 886 *bytes_read = copy; 887 } 888 889 // if we were full before, and now we're not, post an event 890 if (!was_writable && copy > 0) { 891 PostEvent(owner_, SE_WRITE, 0); 892 } 893 } 894 return result; 895 } 896 897 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 898 size_t* bytes_written, int* error) { 899 CritScope cs(&crit_); 900 901 const bool was_readable = (data_length_ > 0); 902 size_t copy = 0; 903 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); 904 905 if (result == SR_SUCCESS) { 906 // If write was successful then adjust the number of readable bytes. 907 data_length_ += copy; 908 if (bytes_written) { 909 *bytes_written = copy; 910 } 911 912 // if we didn't have any data to read before, and now we do, post an event 913 if (!was_readable && copy > 0) { 914 PostEvent(owner_, SE_READ, 0); 915 } 916 } 917 return result; 918 } 919 920 void FifoBuffer::Close() { 921 CritScope cs(&crit_); 922 state_ = SS_CLOSED; 923 } 924 925 const void* FifoBuffer::GetReadData(size_t* size) { 926 CritScope cs(&crit_); 927 *size = (read_position_ + data_length_ <= buffer_length_) ? 928 data_length_ : buffer_length_ - read_position_; 929 return &buffer_[read_position_]; 930 } 931 932 void FifoBuffer::ConsumeReadData(size_t size) { 933 CritScope cs(&crit_); 934 ASSERT(size <= data_length_); 935 const bool was_writable = data_length_ < buffer_length_; 936 read_position_ = (read_position_ + size) % buffer_length_; 937 data_length_ -= size; 938 if (!was_writable && size > 0) { 939 PostEvent(owner_, SE_WRITE, 0); 940 } 941 } 942 943 void* FifoBuffer::GetWriteBuffer(size_t* size) { 944 CritScope cs(&crit_); 945 if (state_ == SS_CLOSED) { 946 return NULL; 947 } 948 949 // if empty, reset the write position to the beginning, so we can get 950 // the biggest possible block 951 if (data_length_ == 0) { 952 read_position_ = 0; 953 } 954 955 const size_t write_position = (read_position_ + data_length_) 956 % buffer_length_; 957 *size = (write_position > read_position_ || data_length_ == 0) ? 958 buffer_length_ - write_position : read_position_ - write_position; 959 return &buffer_[write_position]; 960 } 961 962 void FifoBuffer::ConsumeWriteBuffer(size_t size) { 963 CritScope cs(&crit_); 964 ASSERT(size <= buffer_length_ - data_length_); 965 const bool was_readable = (data_length_ > 0); 966 data_length_ += size; 967 if (!was_readable && size > 0) { 968 PostEvent(owner_, SE_READ, 0); 969 } 970 } 971 972 bool FifoBuffer::GetWriteRemaining(size_t* size) const { 973 CritScope cs(&crit_); 974 *size = buffer_length_ - data_length_; 975 return true; 976 } 977 978 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, 979 size_t bytes, 980 size_t offset, 981 size_t* bytes_read) { 982 if (offset >= data_length_) { 983 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; 984 } 985 986 const size_t available = data_length_ - offset; 987 const size_t read_position = (read_position_ + offset) % buffer_length_; 988 const size_t copy = _min(bytes, available); 989 const size_t tail_copy = _min(copy, buffer_length_ - read_position); 990 char* const p = static_cast<char*>(buffer); 991 memcpy(p, &buffer_[read_position], tail_copy); 992 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 993 994 if (bytes_read) { 995 *bytes_read = copy; 996 } 997 return SR_SUCCESS; 998 } 999 1000 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, 1001 size_t bytes, 1002 size_t offset, 1003 size_t* bytes_written) { 1004 if (state_ == SS_CLOSED) { 1005 return SR_EOS; 1006 } 1007 1008 if (data_length_ + offset >= buffer_length_) { 1009 return SR_BLOCK; 1010 } 1011 1012 const size_t available = buffer_length_ - data_length_ - offset; 1013 const size_t write_position = (read_position_ + data_length_ + offset) 1014 % buffer_length_; 1015 const size_t copy = _min(bytes, available); 1016 const size_t tail_copy = _min(copy, buffer_length_ - write_position); 1017 const char* const p = static_cast<const char*>(buffer); 1018 memcpy(&buffer_[write_position], p, tail_copy); 1019 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 1020 1021 if (bytes_written) { 1022 *bytes_written = copy; 1023 } 1024 return SR_SUCCESS; 1025 } 1026 1027 1028 1029 /////////////////////////////////////////////////////////////////////////////// 1030 // LoggingAdapter 1031 /////////////////////////////////////////////////////////////////////////////// 1032 1033 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 1034 const std::string& label, bool hex_mode) 1035 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) { 1036 set_label(label); 1037 } 1038 1039 void LoggingAdapter::set_label(const std::string& label) { 1040 label_.assign("["); 1041 label_.append(label); 1042 label_.append("]"); 1043 } 1044 1045 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 1046 size_t* read, int* error) { 1047 size_t local_read; if (!read) read = &local_read; 1048 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 1049 error); 1050 if (result == SR_SUCCESS) { 1051 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 1052 } 1053 return result; 1054 } 1055 1056 StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 1057 size_t* written, int* error) { 1058 size_t local_written; 1059 if (!written) written = &local_written; 1060 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 1061 error); 1062 if (result == SR_SUCCESS) { 1063 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 1064 &lms_); 1065 } 1066 return result; 1067 } 1068 1069 void LoggingAdapter::Close() { 1070 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1071 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1072 LOG_V(level_) << label_ << " Closed locally"; 1073 StreamAdapterInterface::Close(); 1074 } 1075 1076 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 1077 if (events & SE_OPEN) { 1078 LOG_V(level_) << label_ << " Open"; 1079 } else if (events & SE_CLOSE) { 1080 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1081 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1082 LOG_V(level_) << label_ << " Closed with error: " << err; 1083 } 1084 StreamAdapterInterface::OnEvent(stream, events, err); 1085 } 1086 1087 /////////////////////////////////////////////////////////////////////////////// 1088 // StringStream - Reads/Writes to an external std::string 1089 /////////////////////////////////////////////////////////////////////////////// 1090 1091 StringStream::StringStream(std::string& str) 1092 : str_(str), read_pos_(0), read_only_(false) { 1093 } 1094 1095 StringStream::StringStream(const std::string& str) 1096 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) { 1097 } 1098 1099 StreamState StringStream::GetState() const { 1100 return SS_OPEN; 1101 } 1102 1103 StreamResult StringStream::Read(void* buffer, size_t buffer_len, 1104 size_t* read, int* error) { 1105 size_t available = _min(buffer_len, str_.size() - read_pos_); 1106 if (!available) 1107 return SR_EOS; 1108 memcpy(buffer, str_.data() + read_pos_, available); 1109 read_pos_ += available; 1110 if (read) 1111 *read = available; 1112 return SR_SUCCESS; 1113 } 1114 1115 StreamResult StringStream::Write(const void* data, size_t data_len, 1116 size_t* written, int* error) { 1117 if (read_only_) { 1118 if (error) { 1119 *error = -1; 1120 } 1121 return SR_ERROR; 1122 } 1123 str_.append(static_cast<const char*>(data), 1124 static_cast<const char*>(data) + data_len); 1125 if (written) 1126 *written = data_len; 1127 return SR_SUCCESS; 1128 } 1129 1130 void StringStream::Close() { 1131 } 1132 1133 bool StringStream::SetPosition(size_t position) { 1134 if (position > str_.size()) 1135 return false; 1136 read_pos_ = position; 1137 return true; 1138 } 1139 1140 bool StringStream::GetPosition(size_t* position) const { 1141 if (position) 1142 *position = read_pos_; 1143 return true; 1144 } 1145 1146 bool StringStream::GetSize(size_t* size) const { 1147 if (size) 1148 *size = str_.size(); 1149 return true; 1150 } 1151 1152 bool StringStream::GetAvailable(size_t* size) const { 1153 if (size) 1154 *size = str_.size() - read_pos_; 1155 return true; 1156 } 1157 1158 bool StringStream::ReserveSize(size_t size) { 1159 if (read_only_) 1160 return false; 1161 str_.reserve(size); 1162 return true; 1163 } 1164 1165 /////////////////////////////////////////////////////////////////////////////// 1166 // StreamReference 1167 /////////////////////////////////////////////////////////////////////////////// 1168 1169 StreamReference::StreamReference(StreamInterface* stream) 1170 : StreamAdapterInterface(stream, false) { 1171 // owner set to false so the destructor does not free the stream. 1172 stream_ref_count_ = new StreamRefCount(stream); 1173 } 1174 1175 StreamInterface* StreamReference::NewReference() { 1176 stream_ref_count_->AddReference(); 1177 return new StreamReference(stream_ref_count_, stream()); 1178 } 1179 1180 StreamReference::~StreamReference() { 1181 stream_ref_count_->Release(); 1182 } 1183 1184 StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1185 StreamInterface* stream) 1186 : StreamAdapterInterface(stream, false), 1187 stream_ref_count_(stream_ref_count) { 1188 } 1189 1190 /////////////////////////////////////////////////////////////////////////////// 1191 1192 StreamResult Flow(StreamInterface* source, 1193 char* buffer, size_t buffer_len, 1194 StreamInterface* sink, 1195 size_t* data_len /* = NULL */) { 1196 ASSERT(buffer_len > 0); 1197 1198 StreamResult result; 1199 size_t count, read_pos, write_pos; 1200 if (data_len) { 1201 read_pos = *data_len; 1202 } else { 1203 read_pos = 0; 1204 } 1205 1206 bool end_of_stream = false; 1207 do { 1208 // Read until buffer is full, end of stream, or error 1209 while (!end_of_stream && (read_pos < buffer_len)) { 1210 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1211 &count, NULL); 1212 if (result == SR_EOS) { 1213 end_of_stream = true; 1214 } else if (result != SR_SUCCESS) { 1215 if (data_len) { 1216 *data_len = read_pos; 1217 } 1218 return result; 1219 } else { 1220 read_pos += count; 1221 } 1222 } 1223 1224 // Write until buffer is empty, or error (including end of stream) 1225 write_pos = 0; 1226 while (write_pos < read_pos) { 1227 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1228 &count, NULL); 1229 if (result != SR_SUCCESS) { 1230 if (data_len) { 1231 *data_len = read_pos - write_pos; 1232 if (write_pos > 0) { 1233 memmove(buffer, buffer + write_pos, *data_len); 1234 } 1235 } 1236 return result; 1237 } 1238 write_pos += count; 1239 } 1240 1241 read_pos = 0; 1242 } while (!end_of_stream); 1243 1244 if (data_len) { 1245 *data_len = 0; 1246 } 1247 return SR_SUCCESS; 1248 } 1249 1250 /////////////////////////////////////////////////////////////////////////////// 1251 1252 } // namespace talk_base 1253