1 /* 2 * libjingle 3 * Copyright 2011, Google Inc. 4 * 5 * Redistribution and use in source and binary forms, with or without 6 * modification, are permitted provided that the following conditions are met: 7 * 8 * 1. Redistributions of source code must retain the above copyright notice, 9 * this list of conditions and the following disclaimer. 10 * 2. Redistributions in binary form must reproduce the above copyright notice, 11 * this list of conditions and the following disclaimer in the documentation 12 * and/or other materials provided with the distribution. 13 * 3. The name of the author may not be used to endorse or promote products 14 * derived from this software without specific prior written permission. 15 * 16 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED 17 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF 18 * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO 19 * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 20 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, 21 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; 22 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, 23 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR 24 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF 25 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 26 */ 27 28 #if defined(POSIX) 29 #include <sys/file.h> 30 #endif // POSIX 31 #include <sys/types.h> 32 #include <sys/stat.h> 33 #include <errno.h> 34 #include <string> 35 #include "talk/base/basictypes.h" 36 #include "talk/base/common.h" 37 #include "talk/base/messagequeue.h" 38 #include "talk/base/stream.h" 39 #include "talk/base/stringencode.h" 40 #include "talk/base/stringutils.h" 41 #include "talk/base/thread.h" 42 43 #ifdef WIN32 44 #include "talk/base/win32.h" 45 #define fileno _fileno 46 #endif 47 48 namespace talk_base { 49 50 /////////////////////////////////////////////////////////////////////////////// 51 // StreamInterface 52 /////////////////////////////////////////////////////////////////////////////// 53 54 enum { 55 MSG_POST_EVENT = 0xF1F1 56 }; 57 58 StreamInterface::~StreamInterface() { 59 } 60 61 struct PostEventData : public MessageData { 62 int events, error; 63 PostEventData(int ev, int er) : events(ev), error(er) { } 64 }; 65 66 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 67 size_t* written, int* error) { 68 StreamResult result = SR_SUCCESS; 69 size_t total_written = 0, current_written; 70 while (total_written < data_len) { 71 result = Write(static_cast<const char*>(data) + total_written, 72 data_len - total_written, ¤t_written, error); 73 if (result != SR_SUCCESS) 74 break; 75 total_written += current_written; 76 } 77 if (written) 78 *written = total_written; 79 return result; 80 } 81 82 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 83 size_t* read, int* error) { 84 StreamResult result = SR_SUCCESS; 85 size_t total_read = 0, current_read; 86 while (total_read < buffer_len) { 87 result = Read(static_cast<char*>(buffer) + total_read, 88 buffer_len - total_read, ¤t_read, error); 89 if (result != SR_SUCCESS) 90 break; 91 total_read += current_read; 92 } 93 if (read) 94 *read = total_read; 95 return result; 96 } 97 98 StreamResult StreamInterface::ReadLine(std::string* line) { 99 line->clear(); 100 StreamResult result = SR_SUCCESS; 101 while (true) { 102 char ch; 103 result = Read(&ch, sizeof(ch), NULL, NULL); 104 if (result != SR_SUCCESS) { 105 break; 106 } 107 if (ch == '\n') { 108 break; 109 } 110 line->push_back(ch); 111 } 112 if (!line->empty()) { // give back the line we've collected so far with 113 result = SR_SUCCESS; // a success code. Otherwise return the last code 114 } 115 return result; 116 } 117 118 void StreamInterface::PostEvent(Thread* t, int events, int err) { 119 t->Post(this, MSG_POST_EVENT, new PostEventData(events, err)); 120 } 121 122 void StreamInterface::PostEvent(int events, int err) { 123 PostEvent(Thread::Current(), events, err); 124 } 125 126 StreamInterface::StreamInterface() { 127 } 128 129 void StreamInterface::OnMessage(Message* msg) { 130 if (MSG_POST_EVENT == msg->message_id) { 131 PostEventData* pe = static_cast<PostEventData*>(msg->pdata); 132 SignalEvent(this, pe->events, pe->error); 133 delete msg->pdata; 134 } 135 } 136 137 /////////////////////////////////////////////////////////////////////////////// 138 // StreamAdapterInterface 139 /////////////////////////////////////////////////////////////////////////////// 140 141 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 142 bool owned) 143 : stream_(stream), owned_(owned) { 144 if (NULL != stream_) 145 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 146 } 147 148 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 149 if (NULL != stream_) 150 stream_->SignalEvent.disconnect(this); 151 if (owned_) 152 delete stream_; 153 stream_ = stream; 154 owned_ = owned; 155 if (NULL != stream_) 156 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 157 } 158 159 StreamInterface* StreamAdapterInterface::Detach() { 160 if (NULL != stream_) 161 stream_->SignalEvent.disconnect(this); 162 StreamInterface* stream = stream_; 163 stream_ = NULL; 164 return stream; 165 } 166 167 StreamAdapterInterface::~StreamAdapterInterface() { 168 if (owned_) 169 delete stream_; 170 } 171 172 /////////////////////////////////////////////////////////////////////////////// 173 // StreamTap 174 /////////////////////////////////////////////////////////////////////////////// 175 176 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 177 : StreamAdapterInterface(stream), tap_(NULL), tap_result_(SR_SUCCESS), 178 tap_error_(0) 179 { 180 AttachTap(tap); 181 } 182 183 void StreamTap::AttachTap(StreamInterface* tap) { 184 tap_.reset(tap); 185 } 186 187 StreamInterface* StreamTap::DetachTap() { 188 return tap_.release(); 189 } 190 191 StreamResult StreamTap::GetTapResult(int* error) { 192 if (error) { 193 *error = tap_error_; 194 } 195 return tap_result_; 196 } 197 198 StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 199 size_t* read, int* error) { 200 size_t backup_read; 201 if (!read) { 202 read = &backup_read; 203 } 204 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 205 read, error); 206 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 207 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 208 } 209 return res; 210 } 211 212 StreamResult StreamTap::Write(const void* data, size_t data_len, 213 size_t* written, int* error) { 214 size_t backup_written; 215 if (!written) { 216 written = &backup_written; 217 } 218 StreamResult res = StreamAdapterInterface::Write(data, data_len, 219 written, error); 220 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 221 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 222 } 223 return res; 224 } 225 226 /////////////////////////////////////////////////////////////////////////////// 227 // StreamSegment 228 /////////////////////////////////////////////////////////////////////////////// 229 230 StreamSegment::StreamSegment(StreamInterface* stream) 231 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 232 length_(SIZE_UNKNOWN) 233 { 234 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 235 stream->GetPosition(&start_); 236 } 237 238 StreamSegment::StreamSegment(StreamInterface* stream, size_t length) 239 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 240 length_(length) 241 { 242 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 243 stream->GetPosition(&start_); 244 } 245 246 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len, 247 size_t* read, int* error) 248 { 249 if (SIZE_UNKNOWN != length_) { 250 if (pos_ >= length_) 251 return SR_EOS; 252 buffer_len = _min(buffer_len, length_ - pos_); 253 } 254 size_t backup_read; 255 if (!read) { 256 read = &backup_read; 257 } 258 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, 259 read, error); 260 if (SR_SUCCESS == result) { 261 pos_ += *read; 262 } 263 return result; 264 } 265 266 bool StreamSegment::SetPosition(size_t position) { 267 if (SIZE_UNKNOWN == start_) 268 return false; // Not seekable 269 if ((SIZE_UNKNOWN != length_) && (position > length_)) 270 return false; // Seek past end of segment 271 if (!StreamAdapterInterface::SetPosition(start_ + position)) 272 return false; 273 pos_ = position; 274 return true; 275 } 276 277 bool StreamSegment::GetPosition(size_t* position) const { 278 if (SIZE_UNKNOWN == start_) 279 return false; // Not seekable 280 if (!StreamAdapterInterface::GetPosition(position)) 281 return false; 282 if (position) { 283 ASSERT(*position >= start_); 284 *position -= start_; 285 } 286 return true; 287 } 288 289 bool StreamSegment::GetSize(size_t* size) const { 290 if (!StreamAdapterInterface::GetSize(size)) 291 return false; 292 if (size) { 293 if (SIZE_UNKNOWN != start_) { 294 ASSERT(*size >= start_); 295 *size -= start_; 296 } 297 if (SIZE_UNKNOWN != length_) { 298 *size = _min(*size, length_); 299 } 300 } 301 return true; 302 } 303 304 bool StreamSegment::GetAvailable(size_t* size) const { 305 if (!StreamAdapterInterface::GetAvailable(size)) 306 return false; 307 if (size && (SIZE_UNKNOWN != length_)) 308 *size = _min(*size, length_ - pos_); 309 return true; 310 } 311 312 /////////////////////////////////////////////////////////////////////////////// 313 // NullStream 314 /////////////////////////////////////////////////////////////////////////////// 315 316 NullStream::NullStream() { 317 } 318 319 NullStream::~NullStream() { 320 } 321 322 StreamState NullStream::GetState() const { 323 return SS_OPEN; 324 } 325 326 StreamResult NullStream::Read(void* buffer, size_t buffer_len, 327 size_t* read, int* error) { 328 if (error) *error = -1; 329 return SR_ERROR; 330 } 331 332 StreamResult NullStream::Write(const void* data, size_t data_len, 333 size_t* written, int* error) { 334 if (written) *written = data_len; 335 return SR_SUCCESS; 336 } 337 338 void NullStream::Close() { 339 } 340 341 /////////////////////////////////////////////////////////////////////////////// 342 // FileStream 343 /////////////////////////////////////////////////////////////////////////////// 344 345 FileStream::FileStream() : file_(NULL) { 346 } 347 348 FileStream::~FileStream() { 349 FileStream::Close(); 350 } 351 352 bool FileStream::Open(const std::string& filename, const char* mode) { 353 Close(); 354 #ifdef WIN32 355 std::wstring wfilename; 356 if (Utf8ToWindowsFilename(filename, &wfilename)) { 357 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 358 } else { 359 file_ = NULL; 360 } 361 #else 362 file_ = fopen(filename.c_str(), mode); 363 #endif 364 return (file_ != NULL); 365 } 366 367 bool FileStream::OpenShare(const std::string& filename, const char* mode, 368 int shflag) { 369 Close(); 370 #ifdef WIN32 371 std::wstring wfilename; 372 if (Utf8ToWindowsFilename(filename, &wfilename)) { 373 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 374 } else { 375 file_ = NULL; 376 } 377 #else 378 return Open(filename, mode); 379 #endif 380 return (file_ != NULL); 381 } 382 383 bool FileStream::DisableBuffering() { 384 if (!file_) 385 return false; 386 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 387 } 388 389 StreamState FileStream::GetState() const { 390 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 391 } 392 393 StreamResult FileStream::Read(void* buffer, size_t buffer_len, 394 size_t* read, int* error) { 395 if (!file_) 396 return SR_EOS; 397 size_t result = fread(buffer, 1, buffer_len, file_); 398 if ((result == 0) && (buffer_len > 0)) { 399 if (feof(file_)) 400 return SR_EOS; 401 if (error) 402 *error = errno; 403 return SR_ERROR; 404 } 405 if (read) 406 *read = result; 407 return SR_SUCCESS; 408 } 409 410 StreamResult FileStream::Write(const void* data, size_t data_len, 411 size_t* written, int* error) { 412 if (!file_) 413 return SR_EOS; 414 size_t result = fwrite(data, 1, data_len, file_); 415 if ((result == 0) && (data_len > 0)) { 416 if (error) 417 *error = errno; 418 return SR_ERROR; 419 } 420 if (written) 421 *written = result; 422 return SR_SUCCESS; 423 } 424 425 void FileStream::Close() { 426 if (file_) { 427 DoClose(); 428 file_ = NULL; 429 } 430 } 431 432 bool FileStream::SetPosition(size_t position) { 433 if (!file_) 434 return false; 435 return (fseek(file_, position, SEEK_SET) == 0); 436 } 437 438 bool FileStream::GetPosition(size_t* position) const { 439 ASSERT(NULL != position); 440 if (!file_) 441 return false; 442 long result = ftell(file_); 443 if (result < 0) 444 return false; 445 if (position) 446 *position = result; 447 return true; 448 } 449 450 bool FileStream::GetSize(size_t* size) const { 451 ASSERT(NULL != size); 452 if (!file_) 453 return false; 454 struct stat file_stats; 455 if (fstat(fileno(file_), &file_stats) != 0) 456 return false; 457 if (size) 458 *size = file_stats.st_size; 459 return true; 460 } 461 462 bool FileStream::GetAvailable(size_t* size) const { 463 ASSERT(NULL != size); 464 if (!GetSize(size)) 465 return false; 466 long result = ftell(file_); 467 if (result < 0) 468 return false; 469 if (size) 470 *size -= result; 471 return true; 472 } 473 474 bool FileStream::ReserveSize(size_t size) { 475 // TODO: extend the file to the proper length 476 return true; 477 } 478 479 bool FileStream::GetSize(const std::string& filename, size_t* size) { 480 struct stat file_stats; 481 if (stat(filename.c_str(), &file_stats) != 0) 482 return false; 483 *size = file_stats.st_size; 484 return true; 485 } 486 487 bool FileStream::Flush() { 488 if (file_) { 489 return (0 == fflush(file_)); 490 } 491 // try to flush empty file? 492 ASSERT(false); 493 return false; 494 } 495 496 #if defined(POSIX) 497 498 bool FileStream::TryLock() { 499 if (file_ == NULL) { 500 // Stream not open. 501 ASSERT(false); 502 return false; 503 } 504 505 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 506 } 507 508 bool FileStream::Unlock() { 509 if (file_ == NULL) { 510 // Stream not open. 511 ASSERT(false); 512 return false; 513 } 514 515 return flock(fileno(file_), LOCK_UN) == 0; 516 } 517 518 #endif 519 520 void FileStream::DoClose() { 521 fclose(file_); 522 } 523 524 #ifdef POSIX 525 526 // Have to identically rewrite the FileStream destructor or else it would call 527 // the base class's Close() instead of the sub-class's. 528 POpenStream::~POpenStream() { 529 POpenStream::Close(); 530 } 531 532 bool POpenStream::Open(const std::string& subcommand, const char* mode) { 533 Close(); 534 file_ = popen(subcommand.c_str(), mode); 535 return file_ != NULL; 536 } 537 538 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode, 539 int shflag) { 540 return Open(subcommand, mode); 541 } 542 543 void POpenStream::DoClose() { 544 wait_status_ = pclose(file_); 545 } 546 547 #endif 548 549 /////////////////////////////////////////////////////////////////////////////// 550 // MemoryStream 551 /////////////////////////////////////////////////////////////////////////////// 552 553 MemoryStreamBase::MemoryStreamBase() 554 : buffer_(NULL), buffer_length_(0), data_length_(0), 555 seek_position_(0) { 556 } 557 558 StreamState MemoryStreamBase::GetState() const { 559 return SS_OPEN; 560 } 561 562 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 563 size_t* bytes_read, int* error) { 564 if (seek_position_ >= data_length_) { 565 return SR_EOS; 566 } 567 size_t available = data_length_ - seek_position_; 568 if (bytes > available) { 569 // Read partial buffer 570 bytes = available; 571 } 572 memcpy(buffer, &buffer_[seek_position_], bytes); 573 seek_position_ += bytes; 574 if (bytes_read) { 575 *bytes_read = bytes; 576 } 577 return SR_SUCCESS; 578 } 579 580 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 581 size_t* bytes_written, int* error) { 582 size_t available = buffer_length_ - seek_position_; 583 if (0 == available) { 584 // Increase buffer size to the larger of: 585 // a) new position rounded up to next 256 bytes 586 // b) double the previous length 587 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1, 588 buffer_length_ * 2); 589 StreamResult result = DoReserve(new_buffer_length, error); 590 if (SR_SUCCESS != result) { 591 return result; 592 } 593 ASSERT(buffer_length_ >= new_buffer_length); 594 available = buffer_length_ - seek_position_; 595 } 596 597 if (bytes > available) { 598 bytes = available; 599 } 600 memcpy(&buffer_[seek_position_], buffer, bytes); 601 seek_position_ += bytes; 602 if (data_length_ < seek_position_) { 603 data_length_ = seek_position_; 604 } 605 if (bytes_written) { 606 *bytes_written = bytes; 607 } 608 return SR_SUCCESS; 609 } 610 611 void MemoryStreamBase::Close() { 612 // nothing to do 613 } 614 615 bool MemoryStreamBase::SetPosition(size_t position) { 616 if (position > data_length_) 617 return false; 618 seek_position_ = position; 619 return true; 620 } 621 622 bool MemoryStreamBase::GetPosition(size_t *position) const { 623 if (position) 624 *position = seek_position_; 625 return true; 626 } 627 628 bool MemoryStreamBase::GetSize(size_t *size) const { 629 if (size) 630 *size = data_length_; 631 return true; 632 } 633 634 bool MemoryStreamBase::GetAvailable(size_t *size) const { 635 if (size) 636 *size = data_length_ - seek_position_; 637 return true; 638 } 639 640 bool MemoryStreamBase::ReserveSize(size_t size) { 641 return (SR_SUCCESS == DoReserve(size, NULL)); 642 } 643 644 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 645 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; 646 } 647 648 /////////////////////////////////////////////////////////////////////////////// 649 650 MemoryStream::MemoryStream() 651 : buffer_alloc_(NULL) { 652 } 653 654 MemoryStream::MemoryStream(const char* data) 655 : buffer_alloc_(NULL) { 656 SetData(data, strlen(data)); 657 } 658 659 MemoryStream::MemoryStream(const void* data, size_t length) 660 : buffer_alloc_(NULL) { 661 SetData(data, length); 662 } 663 664 MemoryStream::~MemoryStream() { 665 delete [] buffer_alloc_; 666 } 667 668 void MemoryStream::SetData(const void* data, size_t length) { 669 data_length_ = buffer_length_ = length; 670 delete [] buffer_alloc_; 671 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 672 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 673 memcpy(buffer_, data, data_length_); 674 seek_position_ = 0; 675 } 676 677 StreamResult MemoryStream::DoReserve(size_t size, int* error) { 678 if (buffer_length_ >= size) 679 return SR_SUCCESS; 680 681 if (char* new_buffer_alloc = new char[size + kAlignment]) { 682 char* new_buffer = reinterpret_cast<char*>( 683 ALIGNP(new_buffer_alloc, kAlignment)); 684 memcpy(new_buffer, buffer_, data_length_); 685 delete [] buffer_alloc_; 686 buffer_alloc_ = new_buffer_alloc; 687 buffer_ = new_buffer; 688 buffer_length_ = size; 689 return SR_SUCCESS; 690 } 691 692 if (error) { 693 *error = ENOMEM; 694 } 695 return SR_ERROR; 696 } 697 698 /////////////////////////////////////////////////////////////////////////////// 699 700 ExternalMemoryStream::ExternalMemoryStream() { 701 } 702 703 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 704 SetData(data, length); 705 } 706 707 ExternalMemoryStream::~ExternalMemoryStream() { 708 } 709 710 void ExternalMemoryStream::SetData(void* data, size_t length) { 711 data_length_ = buffer_length_ = length; 712 buffer_ = static_cast<char*>(data); 713 seek_position_ = 0; 714 } 715 716 /////////////////////////////////////////////////////////////////////////////// 717 // FifoBuffer 718 /////////////////////////////////////////////////////////////////////////////// 719 720 FifoBuffer::FifoBuffer(size_t size) 721 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 722 data_length_(0), read_position_(0), owner_(Thread::Current()) { 723 // all events are done on the owner_ thread 724 } 725 726 FifoBuffer::~FifoBuffer() { 727 } 728 729 bool FifoBuffer::GetBuffered(size_t* size) const { 730 CritScope cs(&crit_); 731 *size = data_length_; 732 return true; 733 } 734 735 bool FifoBuffer::SetCapacity(size_t size) { 736 CritScope cs(&crit_); 737 if (data_length_ > size) { 738 return false; 739 } 740 741 if (size != buffer_length_) { 742 char* buffer = new char[size]; 743 const size_t copy = data_length_; 744 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 745 memcpy(buffer, &buffer_[read_position_], tail_copy); 746 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 747 buffer_.reset(buffer); 748 read_position_ = 0; 749 buffer_length_ = size; 750 } 751 return true; 752 } 753 754 StreamState FifoBuffer::GetState() const { 755 return state_; 756 } 757 758 StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 759 size_t* bytes_read, int* error) { 760 CritScope cs(&crit_); 761 const size_t available = data_length_; 762 if (0 == available) { 763 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; 764 } 765 766 const bool was_writable = data_length_ < buffer_length_; 767 const size_t copy = _min(bytes, available); 768 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 769 char* const p = static_cast<char*>(buffer); 770 memcpy(p, &buffer_[read_position_], tail_copy); 771 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 772 read_position_ = (read_position_ + copy) % buffer_length_; 773 data_length_ -= copy; 774 if (bytes_read) { 775 *bytes_read = copy; 776 } 777 // if we were full before, and now we're not, post an event 778 if (!was_writable && copy > 0) { 779 PostEvent(owner_, SE_WRITE, 0); 780 } 781 782 return SR_SUCCESS; 783 } 784 785 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 786 size_t* bytes_written, int* error) { 787 CritScope cs(&crit_); 788 if (state_ == SS_CLOSED) { 789 return SR_EOS; 790 } 791 792 const size_t available = buffer_length_ - data_length_; 793 if (0 == available) { 794 return SR_BLOCK; 795 } 796 797 const bool was_readable = (data_length_ > 0); 798 const size_t write_position = (read_position_ + data_length_) 799 % buffer_length_; 800 const size_t copy = _min(bytes, available); 801 const size_t tail_copy = _min(copy, buffer_length_ - write_position); 802 const char* const p = static_cast<const char*>(buffer); 803 memcpy(&buffer_[write_position], p, tail_copy); 804 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 805 data_length_ += copy; 806 if (bytes_written) { 807 *bytes_written = copy; 808 } 809 // if we didn't have any data to read before, and now we do, post an event 810 if (!was_readable && copy > 0) { 811 PostEvent(owner_, SE_READ, 0); 812 } 813 814 return SR_SUCCESS; 815 } 816 817 void FifoBuffer::Close() { 818 CritScope cs(&crit_); 819 state_ = SS_CLOSED; 820 } 821 822 const void* FifoBuffer::GetReadData(size_t* size) { 823 CritScope cs(&crit_); 824 *size = (read_position_ + data_length_ <= buffer_length_) ? 825 data_length_ : buffer_length_ - read_position_; 826 return &buffer_[read_position_]; 827 } 828 829 void FifoBuffer::ConsumeReadData(size_t size) { 830 CritScope cs(&crit_); 831 ASSERT(size <= data_length_); 832 const bool was_writable = data_length_ < buffer_length_; 833 read_position_ = (read_position_ + size) % buffer_length_; 834 data_length_ -= size; 835 if (!was_writable && size > 0) { 836 PostEvent(owner_, SE_WRITE, 0); 837 } 838 } 839 840 void* FifoBuffer::GetWriteBuffer(size_t* size) { 841 CritScope cs(&crit_); 842 if (state_ == SS_CLOSED) { 843 return NULL; 844 } 845 846 // if empty, reset the write position to the beginning, so we can get 847 // the biggest possible block 848 if (data_length_ == 0) { 849 read_position_ = 0; 850 } 851 852 const size_t write_position = (read_position_ + data_length_) 853 % buffer_length_; 854 *size = (write_position >= read_position_) ? 855 buffer_length_ - write_position : read_position_ - write_position; 856 return &buffer_[write_position]; 857 } 858 859 void FifoBuffer::ConsumeWriteBuffer(size_t size) { 860 CritScope cs(&crit_); 861 ASSERT(size <= buffer_length_ - data_length_); 862 const bool was_readable = (data_length_ > 0); 863 data_length_ += size; 864 if (!was_readable && size > 0) { 865 PostEvent(owner_, SE_READ, 0); 866 } 867 } 868 869 /////////////////////////////////////////////////////////////////////////////// 870 // LoggingAdapter 871 /////////////////////////////////////////////////////////////////////////////// 872 873 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 874 const std::string& label, bool hex_mode) 875 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) 876 { 877 set_label(label); 878 } 879 880 void LoggingAdapter::set_label(const std::string& label) { 881 label_.assign("["); 882 label_.append(label); 883 label_.append("]"); 884 } 885 886 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 887 size_t* read, int* error) { 888 size_t local_read; if (!read) read = &local_read; 889 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 890 error); 891 if (result == SR_SUCCESS) { 892 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 893 } 894 return result; 895 } 896 897 StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 898 size_t* written, int* error) { 899 size_t local_written; if (!written) written = &local_written; 900 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 901 error); 902 if (result == SR_SUCCESS) { 903 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 904 &lms_); 905 } 906 return result; 907 } 908 909 void LoggingAdapter::Close() { 910 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 911 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 912 LOG_V(level_) << label_ << " Closed locally"; 913 StreamAdapterInterface::Close(); 914 } 915 916 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 917 if (events & SE_OPEN) { 918 LOG_V(level_) << label_ << " Open"; 919 } else if (events & SE_CLOSE) { 920 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 921 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 922 LOG_V(level_) << label_ << " Closed with error: " << err; 923 } 924 StreamAdapterInterface::OnEvent(stream, events, err); 925 } 926 927 /////////////////////////////////////////////////////////////////////////////// 928 // StringStream - Reads/Writes to an external std::string 929 /////////////////////////////////////////////////////////////////////////////// 930 931 StringStream::StringStream(std::string& str) 932 : str_(str), read_pos_(0), read_only_(false) 933 { 934 } 935 936 StringStream::StringStream(const std::string& str) 937 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) 938 { 939 } 940 941 StreamState StringStream::GetState() const { 942 return SS_OPEN; 943 } 944 945 StreamResult StringStream::Read(void* buffer, size_t buffer_len, 946 size_t* read, int* error) { 947 size_t available = _min(buffer_len, str_.size() - read_pos_); 948 if (!available) 949 return SR_EOS; 950 memcpy(buffer, str_.data() + read_pos_, available); 951 read_pos_ += available; 952 if (read) 953 *read = available; 954 return SR_SUCCESS; 955 } 956 957 StreamResult StringStream::Write(const void* data, size_t data_len, 958 size_t* written, int* error) { 959 if (read_only_) { 960 if (error) { 961 *error = -1; 962 } 963 return SR_ERROR; 964 } 965 str_.append(static_cast<const char*>(data), 966 static_cast<const char*>(data) + data_len); 967 if (written) 968 *written = data_len; 969 return SR_SUCCESS; 970 } 971 972 void StringStream::Close() { 973 } 974 975 bool StringStream::SetPosition(size_t position) { 976 if (position > str_.size()) 977 return false; 978 read_pos_ = position; 979 return true; 980 } 981 982 bool StringStream::GetPosition(size_t* position) const { 983 if (position) 984 *position = read_pos_; 985 return true; 986 } 987 988 bool StringStream::GetSize(size_t* size) const { 989 if (size) 990 *size = str_.size(); 991 return true; 992 } 993 994 bool StringStream::GetAvailable(size_t* size) const { 995 if (size) 996 *size = str_.size() - read_pos_; 997 return true; 998 } 999 1000 bool StringStream::ReserveSize(size_t size) { 1001 if (read_only_) 1002 return false; 1003 str_.reserve(size); 1004 return true; 1005 } 1006 1007 /////////////////////////////////////////////////////////////////////////////// 1008 // StreamReference 1009 /////////////////////////////////////////////////////////////////////////////// 1010 1011 StreamReference::StreamReference(StreamInterface* stream) 1012 : StreamAdapterInterface(stream, false) { 1013 // owner set to false so the destructor does not free the stream. 1014 stream_ref_count_ = new StreamRefCount(stream); 1015 } 1016 1017 StreamInterface* StreamReference::NewReference() { 1018 stream_ref_count_->AddReference(); 1019 return new StreamReference(stream_ref_count_, stream()); 1020 } 1021 1022 StreamReference::~StreamReference() { 1023 stream_ref_count_->Release(); 1024 } 1025 1026 StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1027 StreamInterface* stream) 1028 : StreamAdapterInterface(stream, false), 1029 stream_ref_count_(stream_ref_count) { 1030 } 1031 1032 /////////////////////////////////////////////////////////////////////////////// 1033 1034 StreamResult Flow(StreamInterface* source, 1035 char* buffer, size_t buffer_len, 1036 StreamInterface* sink, 1037 size_t* data_len /* = NULL */) { 1038 ASSERT(buffer_len > 0); 1039 1040 StreamResult result; 1041 size_t count, read_pos, write_pos; 1042 if (data_len) { 1043 read_pos = *data_len; 1044 } else { 1045 read_pos = 0; 1046 } 1047 1048 bool end_of_stream = false; 1049 do { 1050 // Read until buffer is full, end of stream, or error 1051 while (!end_of_stream && (read_pos < buffer_len)) { 1052 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1053 &count, NULL); 1054 if (result == SR_EOS) { 1055 end_of_stream = true; 1056 } else if (result != SR_SUCCESS) { 1057 if (data_len) { 1058 *data_len = read_pos; 1059 } 1060 return result; 1061 } else { 1062 read_pos += count; 1063 } 1064 } 1065 1066 // Write until buffer is empty, or error (including end of stream) 1067 write_pos = 0; 1068 while (write_pos < read_pos) { 1069 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1070 &count, NULL); 1071 if (result != SR_SUCCESS) { 1072 if (data_len) { 1073 *data_len = read_pos - write_pos; 1074 if (write_pos > 0) { 1075 memmove(buffer, buffer + write_pos, *data_len); 1076 } 1077 } 1078 return result; 1079 } 1080 write_pos += count; 1081 } 1082 1083 read_pos = 0; 1084 } while (!end_of_stream); 1085 1086 if (data_len) { 1087 *data_len = 0; 1088 } 1089 return SR_SUCCESS; 1090 } 1091 1092 /////////////////////////////////////////////////////////////////////////////// 1093 1094 } // namespace talk_base 1095