1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #if defined(WEBRTC_POSIX) 12 #include <sys/file.h> 13 #endif // WEBRTC_POSIX 14 #include <sys/types.h> 15 #include <sys/stat.h> 16 #include <errno.h> 17 18 #include <algorithm> 19 #include <string> 20 21 #include "webrtc/base/basictypes.h" 22 #include "webrtc/base/common.h" 23 #include "webrtc/base/logging.h" 24 #include "webrtc/base/messagequeue.h" 25 #include "webrtc/base/stream.h" 26 #include "webrtc/base/stringencode.h" 27 #include "webrtc/base/stringutils.h" 28 #include "webrtc/base/thread.h" 29 #include "webrtc/base/timeutils.h" 30 31 #if defined(WEBRTC_WIN) 32 #include "webrtc/base/win32.h" 33 #define fileno _fileno 34 #endif 35 36 namespace rtc { 37 38 /////////////////////////////////////////////////////////////////////////////// 39 // StreamInterface 40 /////////////////////////////////////////////////////////////////////////////// 41 StreamInterface::~StreamInterface() { 42 } 43 44 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 45 size_t* written, int* error) { 46 StreamResult result = SR_SUCCESS; 47 size_t total_written = 0, current_written; 48 while (total_written < data_len) { 49 result = Write(static_cast<const char*>(data) + total_written, 50 data_len - total_written, ¤t_written, error); 51 if (result != SR_SUCCESS) 52 break; 53 total_written += current_written; 54 } 55 if (written) 56 *written = total_written; 57 return result; 58 } 59 60 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 61 size_t* read, int* error) { 62 StreamResult result = SR_SUCCESS; 63 size_t total_read = 0, current_read; 64 while (total_read < buffer_len) { 65 result = Read(static_cast<char*>(buffer) + total_read, 66 buffer_len - total_read, ¤t_read, error); 67 if (result != SR_SUCCESS) 68 break; 69 total_read += current_read; 70 } 71 if (read) 72 *read = total_read; 73 return result; 74 } 75 76 StreamResult StreamInterface::ReadLine(std::string* line) { 77 line->clear(); 78 StreamResult result = SR_SUCCESS; 79 while (true) { 80 char ch; 81 result = Read(&ch, sizeof(ch), NULL, NULL); 82 if (result != SR_SUCCESS) { 83 break; 84 } 85 if (ch == '\n') { 86 break; 87 } 88 line->push_back(ch); 89 } 90 if (!line->empty()) { // give back the line we've collected so far with 91 result = SR_SUCCESS; // a success code. Otherwise return the last code 92 } 93 return result; 94 } 95 96 void StreamInterface::PostEvent(Thread* t, int events, int err) { 97 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err)); 98 } 99 100 void StreamInterface::PostEvent(int events, int err) { 101 PostEvent(Thread::Current(), events, err); 102 } 103 104 const void* StreamInterface::GetReadData(size_t* data_len) { 105 return NULL; 106 } 107 108 void* StreamInterface::GetWriteBuffer(size_t* buf_len) { 109 return NULL; 110 } 111 112 bool StreamInterface::SetPosition(size_t position) { 113 return false; 114 } 115 116 bool StreamInterface::GetPosition(size_t* position) const { 117 return false; 118 } 119 120 bool StreamInterface::GetSize(size_t* size) const { 121 return false; 122 } 123 124 bool StreamInterface::GetAvailable(size_t* size) const { 125 return false; 126 } 127 128 bool StreamInterface::GetWriteRemaining(size_t* size) const { 129 return false; 130 } 131 132 bool StreamInterface::Flush() { 133 return false; 134 } 135 136 bool StreamInterface::ReserveSize(size_t size) { 137 return true; 138 } 139 140 StreamInterface::StreamInterface() { 141 } 142 143 void StreamInterface::OnMessage(Message* msg) { 144 if (MSG_POST_EVENT == msg->message_id) { 145 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata); 146 SignalEvent(this, pe->events, pe->error); 147 delete msg->pdata; 148 } 149 } 150 151 /////////////////////////////////////////////////////////////////////////////// 152 // StreamAdapterInterface 153 /////////////////////////////////////////////////////////////////////////////// 154 155 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 156 bool owned) 157 : stream_(stream), owned_(owned) { 158 if (NULL != stream_) 159 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 160 } 161 162 StreamState StreamAdapterInterface::GetState() const { 163 return stream_->GetState(); 164 } 165 StreamResult StreamAdapterInterface::Read(void* buffer, 166 size_t buffer_len, 167 size_t* read, 168 int* error) { 169 return stream_->Read(buffer, buffer_len, read, error); 170 } 171 StreamResult StreamAdapterInterface::Write(const void* data, 172 size_t data_len, 173 size_t* written, 174 int* error) { 175 return stream_->Write(data, data_len, written, error); 176 } 177 void StreamAdapterInterface::Close() { 178 stream_->Close(); 179 } 180 181 bool StreamAdapterInterface::SetPosition(size_t position) { 182 return stream_->SetPosition(position); 183 } 184 185 bool StreamAdapterInterface::GetPosition(size_t* position) const { 186 return stream_->GetPosition(position); 187 } 188 189 bool StreamAdapterInterface::GetSize(size_t* size) const { 190 return stream_->GetSize(size); 191 } 192 193 bool StreamAdapterInterface::GetAvailable(size_t* size) const { 194 return stream_->GetAvailable(size); 195 } 196 197 bool StreamAdapterInterface::GetWriteRemaining(size_t* size) const { 198 return stream_->GetWriteRemaining(size); 199 } 200 201 bool StreamAdapterInterface::ReserveSize(size_t size) { 202 return stream_->ReserveSize(size); 203 } 204 205 bool StreamAdapterInterface::Flush() { 206 return stream_->Flush(); 207 } 208 209 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 210 if (NULL != stream_) 211 stream_->SignalEvent.disconnect(this); 212 if (owned_) 213 delete stream_; 214 stream_ = stream; 215 owned_ = owned; 216 if (NULL != stream_) 217 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 218 } 219 220 StreamInterface* StreamAdapterInterface::Detach() { 221 if (NULL != stream_) 222 stream_->SignalEvent.disconnect(this); 223 StreamInterface* stream = stream_; 224 stream_ = NULL; 225 return stream; 226 } 227 228 StreamAdapterInterface::~StreamAdapterInterface() { 229 if (owned_) 230 delete stream_; 231 } 232 233 void StreamAdapterInterface::OnEvent(StreamInterface* stream, 234 int events, 235 int err) { 236 SignalEvent(this, events, err); 237 } 238 239 /////////////////////////////////////////////////////////////////////////////// 240 // StreamTap 241 /////////////////////////////////////////////////////////////////////////////// 242 243 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 244 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS), 245 tap_error_(0) { 246 AttachTap(tap); 247 } 248 249 StreamTap::~StreamTap() = default; 250 251 void StreamTap::AttachTap(StreamInterface* tap) { 252 tap_.reset(tap); 253 } 254 255 StreamInterface* StreamTap::DetachTap() { 256 return tap_.release(); 257 } 258 259 StreamResult StreamTap::GetTapResult(int* error) { 260 if (error) { 261 *error = tap_error_; 262 } 263 return tap_result_; 264 } 265 266 StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 267 size_t* read, int* error) { 268 size_t backup_read; 269 if (!read) { 270 read = &backup_read; 271 } 272 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 273 read, error); 274 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 275 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 276 } 277 return res; 278 } 279 280 StreamResult StreamTap::Write(const void* data, size_t data_len, 281 size_t* written, int* error) { 282 size_t backup_written; 283 if (!written) { 284 written = &backup_written; 285 } 286 StreamResult res = StreamAdapterInterface::Write(data, data_len, 287 written, error); 288 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 289 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 290 } 291 return res; 292 } 293 294 /////////////////////////////////////////////////////////////////////////////// 295 // NullStream 296 /////////////////////////////////////////////////////////////////////////////// 297 298 NullStream::NullStream() { 299 } 300 301 NullStream::~NullStream() { 302 } 303 304 StreamState NullStream::GetState() const { 305 return SS_OPEN; 306 } 307 308 StreamResult NullStream::Read(void* buffer, size_t buffer_len, 309 size_t* read, int* error) { 310 if (error) *error = -1; 311 return SR_ERROR; 312 } 313 314 StreamResult NullStream::Write(const void* data, size_t data_len, 315 size_t* written, int* error) { 316 if (written) *written = data_len; 317 return SR_SUCCESS; 318 } 319 320 void NullStream::Close() { 321 } 322 323 /////////////////////////////////////////////////////////////////////////////// 324 // FileStream 325 /////////////////////////////////////////////////////////////////////////////// 326 327 FileStream::FileStream() : file_(NULL) { 328 } 329 330 FileStream::~FileStream() { 331 FileStream::Close(); 332 } 333 334 bool FileStream::Open(const std::string& filename, const char* mode, 335 int* error) { 336 Close(); 337 #if defined(WEBRTC_WIN) 338 std::wstring wfilename; 339 if (Utf8ToWindowsFilename(filename, &wfilename)) { 340 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 341 } else { 342 if (error) { 343 *error = -1; 344 return false; 345 } 346 } 347 #else 348 file_ = fopen(filename.c_str(), mode); 349 #endif 350 if (!file_ && error) { 351 *error = errno; 352 } 353 return (file_ != NULL); 354 } 355 356 bool FileStream::OpenShare(const std::string& filename, const char* mode, 357 int shflag, int* error) { 358 Close(); 359 #if defined(WEBRTC_WIN) 360 std::wstring wfilename; 361 if (Utf8ToWindowsFilename(filename, &wfilename)) { 362 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 363 if (!file_ && error) { 364 *error = errno; 365 return false; 366 } 367 return file_ != NULL; 368 } else { 369 if (error) { 370 *error = -1; 371 } 372 return false; 373 } 374 #else 375 return Open(filename, mode, error); 376 #endif 377 } 378 379 bool FileStream::DisableBuffering() { 380 if (!file_) 381 return false; 382 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 383 } 384 385 StreamState FileStream::GetState() const { 386 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 387 } 388 389 StreamResult FileStream::Read(void* buffer, size_t buffer_len, 390 size_t* read, int* error) { 391 if (!file_) 392 return SR_EOS; 393 size_t result = fread(buffer, 1, buffer_len, file_); 394 if ((result == 0) && (buffer_len > 0)) { 395 if (feof(file_)) 396 return SR_EOS; 397 if (error) 398 *error = errno; 399 return SR_ERROR; 400 } 401 if (read) 402 *read = result; 403 return SR_SUCCESS; 404 } 405 406 StreamResult FileStream::Write(const void* data, size_t data_len, 407 size_t* written, int* error) { 408 if (!file_) 409 return SR_EOS; 410 size_t result = fwrite(data, 1, data_len, file_); 411 if ((result == 0) && (data_len > 0)) { 412 if (error) 413 *error = errno; 414 return SR_ERROR; 415 } 416 if (written) 417 *written = result; 418 return SR_SUCCESS; 419 } 420 421 void FileStream::Close() { 422 if (file_) { 423 DoClose(); 424 file_ = NULL; 425 } 426 } 427 428 bool FileStream::SetPosition(size_t position) { 429 if (!file_) 430 return false; 431 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0); 432 } 433 434 bool FileStream::GetPosition(size_t* position) const { 435 ASSERT(NULL != position); 436 if (!file_) 437 return false; 438 long result = ftell(file_); 439 if (result < 0) 440 return false; 441 if (position) 442 *position = result; 443 return true; 444 } 445 446 bool FileStream::GetSize(size_t* size) const { 447 ASSERT(NULL != size); 448 if (!file_) 449 return false; 450 struct stat file_stats; 451 if (fstat(fileno(file_), &file_stats) != 0) 452 return false; 453 if (size) 454 *size = file_stats.st_size; 455 return true; 456 } 457 458 bool FileStream::GetAvailable(size_t* size) const { 459 ASSERT(NULL != size); 460 if (!GetSize(size)) 461 return false; 462 long result = ftell(file_); 463 if (result < 0) 464 return false; 465 if (size) 466 *size -= result; 467 return true; 468 } 469 470 bool FileStream::ReserveSize(size_t size) { 471 // TODO: extend the file to the proper length 472 return true; 473 } 474 475 bool FileStream::GetSize(const std::string& filename, size_t* size) { 476 struct stat file_stats; 477 if (stat(filename.c_str(), &file_stats) != 0) 478 return false; 479 *size = file_stats.st_size; 480 return true; 481 } 482 483 bool FileStream::Flush() { 484 if (file_) { 485 return (0 == fflush(file_)); 486 } 487 // try to flush empty file? 488 ASSERT(false); 489 return false; 490 } 491 492 #if defined(WEBRTC_POSIX) && !defined(__native_client__) 493 494 bool FileStream::TryLock() { 495 if (file_ == NULL) { 496 // Stream not open. 497 ASSERT(false); 498 return false; 499 } 500 501 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 502 } 503 504 bool FileStream::Unlock() { 505 if (file_ == NULL) { 506 // Stream not open. 507 ASSERT(false); 508 return false; 509 } 510 511 return flock(fileno(file_), LOCK_UN) == 0; 512 } 513 514 #endif 515 516 void FileStream::DoClose() { 517 fclose(file_); 518 } 519 520 /////////////////////////////////////////////////////////////////////////////// 521 // MemoryStream 522 /////////////////////////////////////////////////////////////////////////////// 523 524 MemoryStreamBase::MemoryStreamBase() 525 : buffer_(NULL), buffer_length_(0), data_length_(0), 526 seek_position_(0) { 527 } 528 529 StreamState MemoryStreamBase::GetState() const { 530 return SS_OPEN; 531 } 532 533 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 534 size_t* bytes_read, int* error) { 535 if (seek_position_ >= data_length_) { 536 return SR_EOS; 537 } 538 size_t available = data_length_ - seek_position_; 539 if (bytes > available) { 540 // Read partial buffer 541 bytes = available; 542 } 543 memcpy(buffer, &buffer_[seek_position_], bytes); 544 seek_position_ += bytes; 545 if (bytes_read) { 546 *bytes_read = bytes; 547 } 548 return SR_SUCCESS; 549 } 550 551 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 552 size_t* bytes_written, int* error) { 553 size_t available = buffer_length_ - seek_position_; 554 if (0 == available) { 555 // Increase buffer size to the larger of: 556 // a) new position rounded up to next 256 bytes 557 // b) double the previous length 558 size_t new_buffer_length = 559 std::max(((seek_position_ + bytes) | 0xFF) + 1, buffer_length_ * 2); 560 StreamResult result = DoReserve(new_buffer_length, error); 561 if (SR_SUCCESS != result) { 562 return result; 563 } 564 ASSERT(buffer_length_ >= new_buffer_length); 565 available = buffer_length_ - seek_position_; 566 } 567 568 if (bytes > available) { 569 bytes = available; 570 } 571 memcpy(&buffer_[seek_position_], buffer, bytes); 572 seek_position_ += bytes; 573 if (data_length_ < seek_position_) { 574 data_length_ = seek_position_; 575 } 576 if (bytes_written) { 577 *bytes_written = bytes; 578 } 579 return SR_SUCCESS; 580 } 581 582 void MemoryStreamBase::Close() { 583 // nothing to do 584 } 585 586 bool MemoryStreamBase::SetPosition(size_t position) { 587 if (position > data_length_) 588 return false; 589 seek_position_ = position; 590 return true; 591 } 592 593 bool MemoryStreamBase::GetPosition(size_t* position) const { 594 if (position) 595 *position = seek_position_; 596 return true; 597 } 598 599 bool MemoryStreamBase::GetSize(size_t* size) const { 600 if (size) 601 *size = data_length_; 602 return true; 603 } 604 605 bool MemoryStreamBase::GetAvailable(size_t* size) const { 606 if (size) 607 *size = data_length_ - seek_position_; 608 return true; 609 } 610 611 bool MemoryStreamBase::ReserveSize(size_t size) { 612 return (SR_SUCCESS == DoReserve(size, NULL)); 613 } 614 615 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 616 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; 617 } 618 619 /////////////////////////////////////////////////////////////////////////////// 620 621 MemoryStream::MemoryStream() 622 : buffer_alloc_(NULL) { 623 } 624 625 MemoryStream::MemoryStream(const char* data) 626 : buffer_alloc_(NULL) { 627 SetData(data, strlen(data)); 628 } 629 630 MemoryStream::MemoryStream(const void* data, size_t length) 631 : buffer_alloc_(NULL) { 632 SetData(data, length); 633 } 634 635 MemoryStream::~MemoryStream() { 636 delete [] buffer_alloc_; 637 } 638 639 void MemoryStream::SetData(const void* data, size_t length) { 640 data_length_ = buffer_length_ = length; 641 delete [] buffer_alloc_; 642 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 643 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 644 memcpy(buffer_, data, data_length_); 645 seek_position_ = 0; 646 } 647 648 StreamResult MemoryStream::DoReserve(size_t size, int* error) { 649 if (buffer_length_ >= size) 650 return SR_SUCCESS; 651 652 if (char* new_buffer_alloc = new char[size + kAlignment]) { 653 char* new_buffer = reinterpret_cast<char*>( 654 ALIGNP(new_buffer_alloc, kAlignment)); 655 memcpy(new_buffer, buffer_, data_length_); 656 delete [] buffer_alloc_; 657 buffer_alloc_ = new_buffer_alloc; 658 buffer_ = new_buffer; 659 buffer_length_ = size; 660 return SR_SUCCESS; 661 } 662 663 if (error) { 664 *error = ENOMEM; 665 } 666 return SR_ERROR; 667 } 668 669 /////////////////////////////////////////////////////////////////////////////// 670 671 ExternalMemoryStream::ExternalMemoryStream() { 672 } 673 674 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 675 SetData(data, length); 676 } 677 678 ExternalMemoryStream::~ExternalMemoryStream() { 679 } 680 681 void ExternalMemoryStream::SetData(void* data, size_t length) { 682 data_length_ = buffer_length_ = length; 683 buffer_ = static_cast<char*>(data); 684 seek_position_ = 0; 685 } 686 687 /////////////////////////////////////////////////////////////////////////////// 688 // FifoBuffer 689 /////////////////////////////////////////////////////////////////////////////// 690 691 FifoBuffer::FifoBuffer(size_t size) 692 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 693 data_length_(0), read_position_(0), owner_(Thread::Current()) { 694 // all events are done on the owner_ thread 695 } 696 697 FifoBuffer::FifoBuffer(size_t size, Thread* owner) 698 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 699 data_length_(0), read_position_(0), owner_(owner) { 700 // all events are done on the owner_ thread 701 } 702 703 FifoBuffer::~FifoBuffer() { 704 } 705 706 bool FifoBuffer::GetBuffered(size_t* size) const { 707 CritScope cs(&crit_); 708 *size = data_length_; 709 return true; 710 } 711 712 bool FifoBuffer::SetCapacity(size_t size) { 713 CritScope cs(&crit_); 714 if (data_length_ > size) { 715 return false; 716 } 717 718 if (size != buffer_length_) { 719 char* buffer = new char[size]; 720 const size_t copy = data_length_; 721 const size_t tail_copy = std::min(copy, buffer_length_ - read_position_); 722 memcpy(buffer, &buffer_[read_position_], tail_copy); 723 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 724 buffer_.reset(buffer); 725 read_position_ = 0; 726 buffer_length_ = size; 727 } 728 return true; 729 } 730 731 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, 732 size_t offset, size_t* bytes_read) { 733 CritScope cs(&crit_); 734 return ReadOffsetLocked(buffer, bytes, offset, bytes_read); 735 } 736 737 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, 738 size_t offset, size_t* bytes_written) { 739 CritScope cs(&crit_); 740 return WriteOffsetLocked(buffer, bytes, offset, bytes_written); 741 } 742 743 StreamState FifoBuffer::GetState() const { 744 return state_; 745 } 746 747 StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 748 size_t* bytes_read, int* error) { 749 CritScope cs(&crit_); 750 const bool was_writable = data_length_ < buffer_length_; 751 size_t copy = 0; 752 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); 753 754 if (result == SR_SUCCESS) { 755 // If read was successful then adjust the read position and number of 756 // bytes buffered. 757 read_position_ = (read_position_ + copy) % buffer_length_; 758 data_length_ -= copy; 759 if (bytes_read) { 760 *bytes_read = copy; 761 } 762 763 // if we were full before, and now we're not, post an event 764 if (!was_writable && copy > 0) { 765 PostEvent(owner_, SE_WRITE, 0); 766 } 767 } 768 return result; 769 } 770 771 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 772 size_t* bytes_written, int* error) { 773 CritScope cs(&crit_); 774 775 const bool was_readable = (data_length_ > 0); 776 size_t copy = 0; 777 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); 778 779 if (result == SR_SUCCESS) { 780 // If write was successful then adjust the number of readable bytes. 781 data_length_ += copy; 782 if (bytes_written) { 783 *bytes_written = copy; 784 } 785 786 // if we didn't have any data to read before, and now we do, post an event 787 if (!was_readable && copy > 0) { 788 PostEvent(owner_, SE_READ, 0); 789 } 790 } 791 return result; 792 } 793 794 void FifoBuffer::Close() { 795 CritScope cs(&crit_); 796 state_ = SS_CLOSED; 797 } 798 799 const void* FifoBuffer::GetReadData(size_t* size) { 800 CritScope cs(&crit_); 801 *size = (read_position_ + data_length_ <= buffer_length_) ? 802 data_length_ : buffer_length_ - read_position_; 803 return &buffer_[read_position_]; 804 } 805 806 void FifoBuffer::ConsumeReadData(size_t size) { 807 CritScope cs(&crit_); 808 ASSERT(size <= data_length_); 809 const bool was_writable = data_length_ < buffer_length_; 810 read_position_ = (read_position_ + size) % buffer_length_; 811 data_length_ -= size; 812 if (!was_writable && size > 0) { 813 PostEvent(owner_, SE_WRITE, 0); 814 } 815 } 816 817 void* FifoBuffer::GetWriteBuffer(size_t* size) { 818 CritScope cs(&crit_); 819 if (state_ == SS_CLOSED) { 820 return NULL; 821 } 822 823 // if empty, reset the write position to the beginning, so we can get 824 // the biggest possible block 825 if (data_length_ == 0) { 826 read_position_ = 0; 827 } 828 829 const size_t write_position = (read_position_ + data_length_) 830 % buffer_length_; 831 *size = (write_position > read_position_ || data_length_ == 0) ? 832 buffer_length_ - write_position : read_position_ - write_position; 833 return &buffer_[write_position]; 834 } 835 836 void FifoBuffer::ConsumeWriteBuffer(size_t size) { 837 CritScope cs(&crit_); 838 ASSERT(size <= buffer_length_ - data_length_); 839 const bool was_readable = (data_length_ > 0); 840 data_length_ += size; 841 if (!was_readable && size > 0) { 842 PostEvent(owner_, SE_READ, 0); 843 } 844 } 845 846 bool FifoBuffer::GetWriteRemaining(size_t* size) const { 847 CritScope cs(&crit_); 848 *size = buffer_length_ - data_length_; 849 return true; 850 } 851 852 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, 853 size_t bytes, 854 size_t offset, 855 size_t* bytes_read) { 856 if (offset >= data_length_) { 857 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; 858 } 859 860 const size_t available = data_length_ - offset; 861 const size_t read_position = (read_position_ + offset) % buffer_length_; 862 const size_t copy = std::min(bytes, available); 863 const size_t tail_copy = std::min(copy, buffer_length_ - read_position); 864 char* const p = static_cast<char*>(buffer); 865 memcpy(p, &buffer_[read_position], tail_copy); 866 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 867 868 if (bytes_read) { 869 *bytes_read = copy; 870 } 871 return SR_SUCCESS; 872 } 873 874 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, 875 size_t bytes, 876 size_t offset, 877 size_t* bytes_written) { 878 if (state_ == SS_CLOSED) { 879 return SR_EOS; 880 } 881 882 if (data_length_ + offset >= buffer_length_) { 883 return SR_BLOCK; 884 } 885 886 const size_t available = buffer_length_ - data_length_ - offset; 887 const size_t write_position = (read_position_ + data_length_ + offset) 888 % buffer_length_; 889 const size_t copy = std::min(bytes, available); 890 const size_t tail_copy = std::min(copy, buffer_length_ - write_position); 891 const char* const p = static_cast<const char*>(buffer); 892 memcpy(&buffer_[write_position], p, tail_copy); 893 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 894 895 if (bytes_written) { 896 *bytes_written = copy; 897 } 898 return SR_SUCCESS; 899 } 900 901 902 903 /////////////////////////////////////////////////////////////////////////////// 904 // LoggingAdapter 905 /////////////////////////////////////////////////////////////////////////////// 906 907 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 908 const std::string& label, bool hex_mode) 909 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) { 910 set_label(label); 911 } 912 913 void LoggingAdapter::set_label(const std::string& label) { 914 label_.assign("["); 915 label_.append(label); 916 label_.append("]"); 917 } 918 919 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 920 size_t* read, int* error) { 921 size_t local_read; if (!read) read = &local_read; 922 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 923 error); 924 if (result == SR_SUCCESS) { 925 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 926 } 927 return result; 928 } 929 930 StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 931 size_t* written, int* error) { 932 size_t local_written; 933 if (!written) written = &local_written; 934 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 935 error); 936 if (result == SR_SUCCESS) { 937 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 938 &lms_); 939 } 940 return result; 941 } 942 943 void LoggingAdapter::Close() { 944 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 945 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 946 LOG_V(level_) << label_ << " Closed locally"; 947 StreamAdapterInterface::Close(); 948 } 949 950 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 951 if (events & SE_OPEN) { 952 LOG_V(level_) << label_ << " Open"; 953 } else if (events & SE_CLOSE) { 954 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 955 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 956 LOG_V(level_) << label_ << " Closed with error: " << err; 957 } 958 StreamAdapterInterface::OnEvent(stream, events, err); 959 } 960 961 /////////////////////////////////////////////////////////////////////////////// 962 // StringStream - Reads/Writes to an external std::string 963 /////////////////////////////////////////////////////////////////////////////// 964 965 StringStream::StringStream(std::string* str) 966 : str_(*str), read_pos_(0), read_only_(false) { 967 } 968 969 StringStream::StringStream(const std::string& str) 970 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) { 971 } 972 973 StreamState StringStream::GetState() const { 974 return SS_OPEN; 975 } 976 977 StreamResult StringStream::Read(void* buffer, size_t buffer_len, 978 size_t* read, int* error) { 979 size_t available = std::min(buffer_len, str_.size() - read_pos_); 980 if (!available) 981 return SR_EOS; 982 memcpy(buffer, str_.data() + read_pos_, available); 983 read_pos_ += available; 984 if (read) 985 *read = available; 986 return SR_SUCCESS; 987 } 988 989 StreamResult StringStream::Write(const void* data, size_t data_len, 990 size_t* written, int* error) { 991 if (read_only_) { 992 if (error) { 993 *error = -1; 994 } 995 return SR_ERROR; 996 } 997 str_.append(static_cast<const char*>(data), 998 static_cast<const char*>(data) + data_len); 999 if (written) 1000 *written = data_len; 1001 return SR_SUCCESS; 1002 } 1003 1004 void StringStream::Close() { 1005 } 1006 1007 bool StringStream::SetPosition(size_t position) { 1008 if (position > str_.size()) 1009 return false; 1010 read_pos_ = position; 1011 return true; 1012 } 1013 1014 bool StringStream::GetPosition(size_t* position) const { 1015 if (position) 1016 *position = read_pos_; 1017 return true; 1018 } 1019 1020 bool StringStream::GetSize(size_t* size) const { 1021 if (size) 1022 *size = str_.size(); 1023 return true; 1024 } 1025 1026 bool StringStream::GetAvailable(size_t* size) const { 1027 if (size) 1028 *size = str_.size() - read_pos_; 1029 return true; 1030 } 1031 1032 bool StringStream::ReserveSize(size_t size) { 1033 if (read_only_) 1034 return false; 1035 str_.reserve(size); 1036 return true; 1037 } 1038 1039 /////////////////////////////////////////////////////////////////////////////// 1040 // StreamReference 1041 /////////////////////////////////////////////////////////////////////////////// 1042 1043 StreamReference::StreamReference(StreamInterface* stream) 1044 : StreamAdapterInterface(stream, false) { 1045 // owner set to false so the destructor does not free the stream. 1046 stream_ref_count_ = new StreamRefCount(stream); 1047 } 1048 1049 StreamInterface* StreamReference::NewReference() { 1050 stream_ref_count_->AddReference(); 1051 return new StreamReference(stream_ref_count_, stream()); 1052 } 1053 1054 StreamReference::~StreamReference() { 1055 stream_ref_count_->Release(); 1056 } 1057 1058 StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1059 StreamInterface* stream) 1060 : StreamAdapterInterface(stream, false), 1061 stream_ref_count_(stream_ref_count) { 1062 } 1063 1064 /////////////////////////////////////////////////////////////////////////////// 1065 1066 StreamResult Flow(StreamInterface* source, 1067 char* buffer, size_t buffer_len, 1068 StreamInterface* sink, 1069 size_t* data_len /* = NULL */) { 1070 ASSERT(buffer_len > 0); 1071 1072 StreamResult result; 1073 size_t count, read_pos, write_pos; 1074 if (data_len) { 1075 read_pos = *data_len; 1076 } else { 1077 read_pos = 0; 1078 } 1079 1080 bool end_of_stream = false; 1081 do { 1082 // Read until buffer is full, end of stream, or error 1083 while (!end_of_stream && (read_pos < buffer_len)) { 1084 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1085 &count, NULL); 1086 if (result == SR_EOS) { 1087 end_of_stream = true; 1088 } else if (result != SR_SUCCESS) { 1089 if (data_len) { 1090 *data_len = read_pos; 1091 } 1092 return result; 1093 } else { 1094 read_pos += count; 1095 } 1096 } 1097 1098 // Write until buffer is empty, or error (including end of stream) 1099 write_pos = 0; 1100 while (write_pos < read_pos) { 1101 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1102 &count, NULL); 1103 if (result != SR_SUCCESS) { 1104 if (data_len) { 1105 *data_len = read_pos - write_pos; 1106 if (write_pos > 0) { 1107 memmove(buffer, buffer + write_pos, *data_len); 1108 } 1109 } 1110 return result; 1111 } 1112 write_pos += count; 1113 } 1114 1115 read_pos = 0; 1116 } while (!end_of_stream); 1117 1118 if (data_len) { 1119 *data_len = 0; 1120 } 1121 return SR_SUCCESS; 1122 } 1123 1124 /////////////////////////////////////////////////////////////////////////////// 1125 1126 } // namespace rtc 1127