1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 #if defined(WEBRTC_POSIX) 12 #include <sys/file.h> 13 #endif // WEBRTC_POSIX 14 #include <sys/types.h> 15 #include <sys/stat.h> 16 #include <errno.h> 17 #include <string> 18 #include "webrtc/base/basictypes.h" 19 #include "webrtc/base/common.h" 20 #include "webrtc/base/logging.h" 21 #include "webrtc/base/messagequeue.h" 22 #include "webrtc/base/stream.h" 23 #include "webrtc/base/stringencode.h" 24 #include "webrtc/base/stringutils.h" 25 #include "webrtc/base/thread.h" 26 #include "webrtc/base/timeutils.h" 27 28 #if defined(WEBRTC_WIN) 29 #include "webrtc/base/win32.h" 30 #define fileno _fileno 31 #endif 32 33 namespace rtc { 34 35 /////////////////////////////////////////////////////////////////////////////// 36 // StreamInterface 37 /////////////////////////////////////////////////////////////////////////////// 38 StreamInterface::~StreamInterface() { 39 } 40 41 StreamResult StreamInterface::WriteAll(const void* data, size_t data_len, 42 size_t* written, int* error) { 43 StreamResult result = SR_SUCCESS; 44 size_t total_written = 0, current_written; 45 while (total_written < data_len) { 46 result = Write(static_cast<const char*>(data) + total_written, 47 data_len - total_written, ¤t_written, error); 48 if (result != SR_SUCCESS) 49 break; 50 total_written += current_written; 51 } 52 if (written) 53 *written = total_written; 54 return result; 55 } 56 57 StreamResult StreamInterface::ReadAll(void* buffer, size_t buffer_len, 58 size_t* read, int* error) { 59 StreamResult result = SR_SUCCESS; 60 size_t total_read = 0, current_read; 61 while (total_read < buffer_len) { 62 result = Read(static_cast<char*>(buffer) + total_read, 63 buffer_len - total_read, ¤t_read, error); 64 if (result != SR_SUCCESS) 65 break; 66 total_read += current_read; 67 } 68 if (read) 69 *read = total_read; 70 return result; 71 } 72 73 StreamResult StreamInterface::ReadLine(std::string* line) { 74 line->clear(); 75 StreamResult result = SR_SUCCESS; 76 while (true) { 77 char ch; 78 result = Read(&ch, sizeof(ch), NULL, NULL); 79 if (result != SR_SUCCESS) { 80 break; 81 } 82 if (ch == '\n') { 83 break; 84 } 85 line->push_back(ch); 86 } 87 if (!line->empty()) { // give back the line we've collected so far with 88 result = SR_SUCCESS; // a success code. Otherwise return the last code 89 } 90 return result; 91 } 92 93 void StreamInterface::PostEvent(Thread* t, int events, int err) { 94 t->Post(this, MSG_POST_EVENT, new StreamEventData(events, err)); 95 } 96 97 void StreamInterface::PostEvent(int events, int err) { 98 PostEvent(Thread::Current(), events, err); 99 } 100 101 StreamInterface::StreamInterface() { 102 } 103 104 void StreamInterface::OnMessage(Message* msg) { 105 if (MSG_POST_EVENT == msg->message_id) { 106 StreamEventData* pe = static_cast<StreamEventData*>(msg->pdata); 107 SignalEvent(this, pe->events, pe->error); 108 delete msg->pdata; 109 } 110 } 111 112 /////////////////////////////////////////////////////////////////////////////// 113 // StreamAdapterInterface 114 /////////////////////////////////////////////////////////////////////////////// 115 116 StreamAdapterInterface::StreamAdapterInterface(StreamInterface* stream, 117 bool owned) 118 : stream_(stream), owned_(owned) { 119 if (NULL != stream_) 120 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 121 } 122 123 void StreamAdapterInterface::Attach(StreamInterface* stream, bool owned) { 124 if (NULL != stream_) 125 stream_->SignalEvent.disconnect(this); 126 if (owned_) 127 delete stream_; 128 stream_ = stream; 129 owned_ = owned; 130 if (NULL != stream_) 131 stream_->SignalEvent.connect(this, &StreamAdapterInterface::OnEvent); 132 } 133 134 StreamInterface* StreamAdapterInterface::Detach() { 135 if (NULL != stream_) 136 stream_->SignalEvent.disconnect(this); 137 StreamInterface* stream = stream_; 138 stream_ = NULL; 139 return stream; 140 } 141 142 StreamAdapterInterface::~StreamAdapterInterface() { 143 if (owned_) 144 delete stream_; 145 } 146 147 /////////////////////////////////////////////////////////////////////////////// 148 // StreamTap 149 /////////////////////////////////////////////////////////////////////////////// 150 151 StreamTap::StreamTap(StreamInterface* stream, StreamInterface* tap) 152 : StreamAdapterInterface(stream), tap_(), tap_result_(SR_SUCCESS), 153 tap_error_(0) { 154 AttachTap(tap); 155 } 156 157 void StreamTap::AttachTap(StreamInterface* tap) { 158 tap_.reset(tap); 159 } 160 161 StreamInterface* StreamTap::DetachTap() { 162 return tap_.release(); 163 } 164 165 StreamResult StreamTap::GetTapResult(int* error) { 166 if (error) { 167 *error = tap_error_; 168 } 169 return tap_result_; 170 } 171 172 StreamResult StreamTap::Read(void* buffer, size_t buffer_len, 173 size_t* read, int* error) { 174 size_t backup_read; 175 if (!read) { 176 read = &backup_read; 177 } 178 StreamResult res = StreamAdapterInterface::Read(buffer, buffer_len, 179 read, error); 180 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 181 tap_result_ = tap_->WriteAll(buffer, *read, NULL, &tap_error_); 182 } 183 return res; 184 } 185 186 StreamResult StreamTap::Write(const void* data, size_t data_len, 187 size_t* written, int* error) { 188 size_t backup_written; 189 if (!written) { 190 written = &backup_written; 191 } 192 StreamResult res = StreamAdapterInterface::Write(data, data_len, 193 written, error); 194 if ((res == SR_SUCCESS) && (tap_result_ == SR_SUCCESS)) { 195 tap_result_ = tap_->WriteAll(data, *written, NULL, &tap_error_); 196 } 197 return res; 198 } 199 200 /////////////////////////////////////////////////////////////////////////////// 201 // StreamSegment 202 /////////////////////////////////////////////////////////////////////////////// 203 204 StreamSegment::StreamSegment(StreamInterface* stream) 205 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 206 length_(SIZE_UNKNOWN) { 207 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 208 stream->GetPosition(&start_); 209 } 210 211 StreamSegment::StreamSegment(StreamInterface* stream, size_t length) 212 : StreamAdapterInterface(stream), start_(SIZE_UNKNOWN), pos_(0), 213 length_(length) { 214 // It's ok for this to fail, in which case start_ is left as SIZE_UNKNOWN. 215 stream->GetPosition(&start_); 216 } 217 218 StreamResult StreamSegment::Read(void* buffer, size_t buffer_len, 219 size_t* read, int* error) { 220 if (SIZE_UNKNOWN != length_) { 221 if (pos_ >= length_) 222 return SR_EOS; 223 buffer_len = _min(buffer_len, length_ - pos_); 224 } 225 size_t backup_read; 226 if (!read) { 227 read = &backup_read; 228 } 229 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, 230 read, error); 231 if (SR_SUCCESS == result) { 232 pos_ += *read; 233 } 234 return result; 235 } 236 237 bool StreamSegment::SetPosition(size_t position) { 238 if (SIZE_UNKNOWN == start_) 239 return false; // Not seekable 240 if ((SIZE_UNKNOWN != length_) && (position > length_)) 241 return false; // Seek past end of segment 242 if (!StreamAdapterInterface::SetPosition(start_ + position)) 243 return false; 244 pos_ = position; 245 return true; 246 } 247 248 bool StreamSegment::GetPosition(size_t* position) const { 249 if (SIZE_UNKNOWN == start_) 250 return false; // Not seekable 251 if (!StreamAdapterInterface::GetPosition(position)) 252 return false; 253 if (position) { 254 ASSERT(*position >= start_); 255 *position -= start_; 256 } 257 return true; 258 } 259 260 bool StreamSegment::GetSize(size_t* size) const { 261 if (!StreamAdapterInterface::GetSize(size)) 262 return false; 263 if (size) { 264 if (SIZE_UNKNOWN != start_) { 265 ASSERT(*size >= start_); 266 *size -= start_; 267 } 268 if (SIZE_UNKNOWN != length_) { 269 *size = _min(*size, length_); 270 } 271 } 272 return true; 273 } 274 275 bool StreamSegment::GetAvailable(size_t* size) const { 276 if (!StreamAdapterInterface::GetAvailable(size)) 277 return false; 278 if (size && (SIZE_UNKNOWN != length_)) 279 *size = _min(*size, length_ - pos_); 280 return true; 281 } 282 283 /////////////////////////////////////////////////////////////////////////////// 284 // NullStream 285 /////////////////////////////////////////////////////////////////////////////// 286 287 NullStream::NullStream() { 288 } 289 290 NullStream::~NullStream() { 291 } 292 293 StreamState NullStream::GetState() const { 294 return SS_OPEN; 295 } 296 297 StreamResult NullStream::Read(void* buffer, size_t buffer_len, 298 size_t* read, int* error) { 299 if (error) *error = -1; 300 return SR_ERROR; 301 } 302 303 StreamResult NullStream::Write(const void* data, size_t data_len, 304 size_t* written, int* error) { 305 if (written) *written = data_len; 306 return SR_SUCCESS; 307 } 308 309 void NullStream::Close() { 310 } 311 312 /////////////////////////////////////////////////////////////////////////////// 313 // FileStream 314 /////////////////////////////////////////////////////////////////////////////// 315 316 FileStream::FileStream() : file_(NULL) { 317 } 318 319 FileStream::~FileStream() { 320 FileStream::Close(); 321 } 322 323 bool FileStream::Open(const std::string& filename, const char* mode, 324 int* error) { 325 Close(); 326 #if defined(WEBRTC_WIN) 327 std::wstring wfilename; 328 if (Utf8ToWindowsFilename(filename, &wfilename)) { 329 file_ = _wfopen(wfilename.c_str(), ToUtf16(mode).c_str()); 330 } else { 331 if (error) { 332 *error = -1; 333 return false; 334 } 335 } 336 #else 337 file_ = fopen(filename.c_str(), mode); 338 #endif 339 if (!file_ && error) { 340 *error = errno; 341 } 342 return (file_ != NULL); 343 } 344 345 bool FileStream::OpenShare(const std::string& filename, const char* mode, 346 int shflag, int* error) { 347 Close(); 348 #if defined(WEBRTC_WIN) 349 std::wstring wfilename; 350 if (Utf8ToWindowsFilename(filename, &wfilename)) { 351 file_ = _wfsopen(wfilename.c_str(), ToUtf16(mode).c_str(), shflag); 352 if (!file_ && error) { 353 *error = errno; 354 return false; 355 } 356 return file_ != NULL; 357 } else { 358 if (error) { 359 *error = -1; 360 } 361 return false; 362 } 363 #else 364 return Open(filename, mode, error); 365 #endif 366 } 367 368 bool FileStream::DisableBuffering() { 369 if (!file_) 370 return false; 371 return (setvbuf(file_, NULL, _IONBF, 0) == 0); 372 } 373 374 StreamState FileStream::GetState() const { 375 return (file_ == NULL) ? SS_CLOSED : SS_OPEN; 376 } 377 378 StreamResult FileStream::Read(void* buffer, size_t buffer_len, 379 size_t* read, int* error) { 380 if (!file_) 381 return SR_EOS; 382 size_t result = fread(buffer, 1, buffer_len, file_); 383 if ((result == 0) && (buffer_len > 0)) { 384 if (feof(file_)) 385 return SR_EOS; 386 if (error) 387 *error = errno; 388 return SR_ERROR; 389 } 390 if (read) 391 *read = result; 392 return SR_SUCCESS; 393 } 394 395 StreamResult FileStream::Write(const void* data, size_t data_len, 396 size_t* written, int* error) { 397 if (!file_) 398 return SR_EOS; 399 size_t result = fwrite(data, 1, data_len, file_); 400 if ((result == 0) && (data_len > 0)) { 401 if (error) 402 *error = errno; 403 return SR_ERROR; 404 } 405 if (written) 406 *written = result; 407 return SR_SUCCESS; 408 } 409 410 void FileStream::Close() { 411 if (file_) { 412 DoClose(); 413 file_ = NULL; 414 } 415 } 416 417 bool FileStream::SetPosition(size_t position) { 418 if (!file_) 419 return false; 420 return (fseek(file_, static_cast<int>(position), SEEK_SET) == 0); 421 } 422 423 bool FileStream::GetPosition(size_t* position) const { 424 ASSERT(NULL != position); 425 if (!file_) 426 return false; 427 long result = ftell(file_); 428 if (result < 0) 429 return false; 430 if (position) 431 *position = result; 432 return true; 433 } 434 435 bool FileStream::GetSize(size_t* size) const { 436 ASSERT(NULL != size); 437 if (!file_) 438 return false; 439 struct stat file_stats; 440 if (fstat(fileno(file_), &file_stats) != 0) 441 return false; 442 if (size) 443 *size = file_stats.st_size; 444 return true; 445 } 446 447 bool FileStream::GetAvailable(size_t* size) const { 448 ASSERT(NULL != size); 449 if (!GetSize(size)) 450 return false; 451 long result = ftell(file_); 452 if (result < 0) 453 return false; 454 if (size) 455 *size -= result; 456 return true; 457 } 458 459 bool FileStream::ReserveSize(size_t size) { 460 // TODO: extend the file to the proper length 461 return true; 462 } 463 464 bool FileStream::GetSize(const std::string& filename, size_t* size) { 465 struct stat file_stats; 466 if (stat(filename.c_str(), &file_stats) != 0) 467 return false; 468 *size = file_stats.st_size; 469 return true; 470 } 471 472 bool FileStream::Flush() { 473 if (file_) { 474 return (0 == fflush(file_)); 475 } 476 // try to flush empty file? 477 ASSERT(false); 478 return false; 479 } 480 481 #if defined(WEBRTC_POSIX) && !defined(__native_client__) 482 483 bool FileStream::TryLock() { 484 if (file_ == NULL) { 485 // Stream not open. 486 ASSERT(false); 487 return false; 488 } 489 490 return flock(fileno(file_), LOCK_EX|LOCK_NB) == 0; 491 } 492 493 bool FileStream::Unlock() { 494 if (file_ == NULL) { 495 // Stream not open. 496 ASSERT(false); 497 return false; 498 } 499 500 return flock(fileno(file_), LOCK_UN) == 0; 501 } 502 503 #endif 504 505 void FileStream::DoClose() { 506 fclose(file_); 507 } 508 509 CircularFileStream::CircularFileStream(size_t max_size) 510 : max_write_size_(max_size), 511 position_(0), 512 marked_position_(max_size / 2), 513 last_write_position_(0), 514 read_segment_(READ_LATEST), 515 read_segment_available_(0) { 516 } 517 518 bool CircularFileStream::Open( 519 const std::string& filename, const char* mode, int* error) { 520 if (!FileStream::Open(filename.c_str(), mode, error)) 521 return false; 522 523 if (strchr(mode, "r") != NULL) { // Opened in read mode. 524 // Check if the buffer has been overwritten and determine how to read the 525 // log in time sequence. 526 size_t file_size; 527 GetSize(&file_size); 528 if (file_size == position_) { 529 // The buffer has not been overwritten yet. Read 0 .. file_size 530 read_segment_ = READ_LATEST; 531 read_segment_available_ = file_size; 532 } else { 533 // The buffer has been over written. There are three segments: The first 534 // one is 0 .. marked_position_, which is the marked earliest log. The 535 // second one is position_ .. file_size, which is the middle log. The 536 // last one is marked_position_ .. position_, which is the latest log. 537 read_segment_ = READ_MARKED; 538 read_segment_available_ = marked_position_; 539 last_write_position_ = position_; 540 } 541 542 // Read from the beginning. 543 position_ = 0; 544 SetPosition(position_); 545 } 546 547 return true; 548 } 549 550 StreamResult CircularFileStream::Read(void* buffer, size_t buffer_len, 551 size_t* read, int* error) { 552 if (read_segment_available_ == 0) { 553 size_t file_size; 554 switch (read_segment_) { 555 case READ_MARKED: // Finished READ_MARKED and start READ_MIDDLE. 556 read_segment_ = READ_MIDDLE; 557 position_ = last_write_position_; 558 SetPosition(position_); 559 GetSize(&file_size); 560 read_segment_available_ = file_size - position_; 561 break; 562 563 case READ_MIDDLE: // Finished READ_MIDDLE and start READ_LATEST. 564 read_segment_ = READ_LATEST; 565 position_ = marked_position_; 566 SetPosition(position_); 567 read_segment_available_ = last_write_position_ - position_; 568 break; 569 570 default: // Finished READ_LATEST and return EOS. 571 return rtc::SR_EOS; 572 } 573 } 574 575 size_t local_read; 576 if (!read) read = &local_read; 577 578 size_t to_read = rtc::_min(buffer_len, read_segment_available_); 579 rtc::StreamResult result 580 = rtc::FileStream::Read(buffer, to_read, read, error); 581 if (result == rtc::SR_SUCCESS) { 582 read_segment_available_ -= *read; 583 position_ += *read; 584 } 585 return result; 586 } 587 588 StreamResult CircularFileStream::Write(const void* data, size_t data_len, 589 size_t* written, int* error) { 590 if (position_ >= max_write_size_) { 591 ASSERT(position_ == max_write_size_); 592 position_ = marked_position_; 593 SetPosition(position_); 594 } 595 596 size_t local_written; 597 if (!written) written = &local_written; 598 599 size_t to_eof = max_write_size_ - position_; 600 size_t to_write = rtc::_min(data_len, to_eof); 601 rtc::StreamResult result 602 = rtc::FileStream::Write(data, to_write, written, error); 603 if (result == rtc::SR_SUCCESS) { 604 position_ += *written; 605 } 606 return result; 607 } 608 609 AsyncWriteStream::~AsyncWriteStream() { 610 write_thread_->Clear(this, 0, NULL); 611 ClearBufferAndWrite(); 612 613 CritScope cs(&crit_stream_); 614 stream_.reset(); 615 } 616 617 // This is needed by some stream writers, such as RtpDumpWriter. 618 bool AsyncWriteStream::GetPosition(size_t* position) const { 619 CritScope cs(&crit_stream_); 620 return stream_->GetPosition(position); 621 } 622 623 // This is needed by some stream writers, such as the plugin log writers. 624 StreamResult AsyncWriteStream::Read(void* buffer, size_t buffer_len, 625 size_t* read, int* error) { 626 CritScope cs(&crit_stream_); 627 return stream_->Read(buffer, buffer_len, read, error); 628 } 629 630 void AsyncWriteStream::Close() { 631 if (state_ == SS_CLOSED) { 632 return; 633 } 634 635 write_thread_->Clear(this, 0, NULL); 636 ClearBufferAndWrite(); 637 638 CritScope cs(&crit_stream_); 639 stream_->Close(); 640 state_ = SS_CLOSED; 641 } 642 643 StreamResult AsyncWriteStream::Write(const void* data, size_t data_len, 644 size_t* written, int* error) { 645 if (state_ == SS_CLOSED) { 646 return SR_ERROR; 647 } 648 649 size_t previous_buffer_length = 0; 650 { 651 CritScope cs(&crit_buffer_); 652 previous_buffer_length = buffer_.length(); 653 buffer_.AppendData(data, data_len); 654 } 655 656 if (previous_buffer_length == 0) { 657 // If there's stuff already in the buffer, then we already called 658 // Post and the write_thread_ hasn't pulled it out yet, so we 659 // don't need to re-Post. 660 write_thread_->Post(this, 0, NULL); 661 } 662 // Return immediately, assuming that it works. 663 if (written) { 664 *written = data_len; 665 } 666 return SR_SUCCESS; 667 } 668 669 void AsyncWriteStream::OnMessage(rtc::Message* pmsg) { 670 ClearBufferAndWrite(); 671 } 672 673 bool AsyncWriteStream::Flush() { 674 if (state_ == SS_CLOSED) { 675 return false; 676 } 677 678 ClearBufferAndWrite(); 679 680 CritScope cs(&crit_stream_); 681 return stream_->Flush(); 682 } 683 684 void AsyncWriteStream::ClearBufferAndWrite() { 685 Buffer to_write; 686 { 687 CritScope cs_buffer(&crit_buffer_); 688 buffer_.TransferTo(&to_write); 689 } 690 691 if (to_write.length() > 0) { 692 CritScope cs(&crit_stream_); 693 stream_->WriteAll(to_write.data(), to_write.length(), NULL, NULL); 694 } 695 } 696 697 #if defined(WEBRTC_POSIX) && !defined(__native_client__) 698 699 // Have to identically rewrite the FileStream destructor or else it would call 700 // the base class's Close() instead of the sub-class's. 701 POpenStream::~POpenStream() { 702 POpenStream::Close(); 703 } 704 705 bool POpenStream::Open(const std::string& subcommand, 706 const char* mode, 707 int* error) { 708 Close(); 709 file_ = popen(subcommand.c_str(), mode); 710 if (file_ == NULL) { 711 if (error) 712 *error = errno; 713 return false; 714 } 715 return true; 716 } 717 718 bool POpenStream::OpenShare(const std::string& subcommand, const char* mode, 719 int shflag, int* error) { 720 return Open(subcommand, mode, error); 721 } 722 723 void POpenStream::DoClose() { 724 wait_status_ = pclose(file_); 725 } 726 727 #endif 728 729 /////////////////////////////////////////////////////////////////////////////// 730 // MemoryStream 731 /////////////////////////////////////////////////////////////////////////////// 732 733 MemoryStreamBase::MemoryStreamBase() 734 : buffer_(NULL), buffer_length_(0), data_length_(0), 735 seek_position_(0) { 736 } 737 738 StreamState MemoryStreamBase::GetState() const { 739 return SS_OPEN; 740 } 741 742 StreamResult MemoryStreamBase::Read(void* buffer, size_t bytes, 743 size_t* bytes_read, int* error) { 744 if (seek_position_ >= data_length_) { 745 return SR_EOS; 746 } 747 size_t available = data_length_ - seek_position_; 748 if (bytes > available) { 749 // Read partial buffer 750 bytes = available; 751 } 752 memcpy(buffer, &buffer_[seek_position_], bytes); 753 seek_position_ += bytes; 754 if (bytes_read) { 755 *bytes_read = bytes; 756 } 757 return SR_SUCCESS; 758 } 759 760 StreamResult MemoryStreamBase::Write(const void* buffer, size_t bytes, 761 size_t* bytes_written, int* error) { 762 size_t available = buffer_length_ - seek_position_; 763 if (0 == available) { 764 // Increase buffer size to the larger of: 765 // a) new position rounded up to next 256 bytes 766 // b) double the previous length 767 size_t new_buffer_length = _max(((seek_position_ + bytes) | 0xFF) + 1, 768 buffer_length_ * 2); 769 StreamResult result = DoReserve(new_buffer_length, error); 770 if (SR_SUCCESS != result) { 771 return result; 772 } 773 ASSERT(buffer_length_ >= new_buffer_length); 774 available = buffer_length_ - seek_position_; 775 } 776 777 if (bytes > available) { 778 bytes = available; 779 } 780 memcpy(&buffer_[seek_position_], buffer, bytes); 781 seek_position_ += bytes; 782 if (data_length_ < seek_position_) { 783 data_length_ = seek_position_; 784 } 785 if (bytes_written) { 786 *bytes_written = bytes; 787 } 788 return SR_SUCCESS; 789 } 790 791 void MemoryStreamBase::Close() { 792 // nothing to do 793 } 794 795 bool MemoryStreamBase::SetPosition(size_t position) { 796 if (position > data_length_) 797 return false; 798 seek_position_ = position; 799 return true; 800 } 801 802 bool MemoryStreamBase::GetPosition(size_t* position) const { 803 if (position) 804 *position = seek_position_; 805 return true; 806 } 807 808 bool MemoryStreamBase::GetSize(size_t* size) const { 809 if (size) 810 *size = data_length_; 811 return true; 812 } 813 814 bool MemoryStreamBase::GetAvailable(size_t* size) const { 815 if (size) 816 *size = data_length_ - seek_position_; 817 return true; 818 } 819 820 bool MemoryStreamBase::ReserveSize(size_t size) { 821 return (SR_SUCCESS == DoReserve(size, NULL)); 822 } 823 824 StreamResult MemoryStreamBase::DoReserve(size_t size, int* error) { 825 return (buffer_length_ >= size) ? SR_SUCCESS : SR_EOS; 826 } 827 828 /////////////////////////////////////////////////////////////////////////////// 829 830 MemoryStream::MemoryStream() 831 : buffer_alloc_(NULL) { 832 } 833 834 MemoryStream::MemoryStream(const char* data) 835 : buffer_alloc_(NULL) { 836 SetData(data, strlen(data)); 837 } 838 839 MemoryStream::MemoryStream(const void* data, size_t length) 840 : buffer_alloc_(NULL) { 841 SetData(data, length); 842 } 843 844 MemoryStream::~MemoryStream() { 845 delete [] buffer_alloc_; 846 } 847 848 void MemoryStream::SetData(const void* data, size_t length) { 849 data_length_ = buffer_length_ = length; 850 delete [] buffer_alloc_; 851 buffer_alloc_ = new char[buffer_length_ + kAlignment]; 852 buffer_ = reinterpret_cast<char*>(ALIGNP(buffer_alloc_, kAlignment)); 853 memcpy(buffer_, data, data_length_); 854 seek_position_ = 0; 855 } 856 857 StreamResult MemoryStream::DoReserve(size_t size, int* error) { 858 if (buffer_length_ >= size) 859 return SR_SUCCESS; 860 861 if (char* new_buffer_alloc = new char[size + kAlignment]) { 862 char* new_buffer = reinterpret_cast<char*>( 863 ALIGNP(new_buffer_alloc, kAlignment)); 864 memcpy(new_buffer, buffer_, data_length_); 865 delete [] buffer_alloc_; 866 buffer_alloc_ = new_buffer_alloc; 867 buffer_ = new_buffer; 868 buffer_length_ = size; 869 return SR_SUCCESS; 870 } 871 872 if (error) { 873 *error = ENOMEM; 874 } 875 return SR_ERROR; 876 } 877 878 /////////////////////////////////////////////////////////////////////////////// 879 880 ExternalMemoryStream::ExternalMemoryStream() { 881 } 882 883 ExternalMemoryStream::ExternalMemoryStream(void* data, size_t length) { 884 SetData(data, length); 885 } 886 887 ExternalMemoryStream::~ExternalMemoryStream() { 888 } 889 890 void ExternalMemoryStream::SetData(void* data, size_t length) { 891 data_length_ = buffer_length_ = length; 892 buffer_ = static_cast<char*>(data); 893 seek_position_ = 0; 894 } 895 896 /////////////////////////////////////////////////////////////////////////////// 897 // FifoBuffer 898 /////////////////////////////////////////////////////////////////////////////// 899 900 FifoBuffer::FifoBuffer(size_t size) 901 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 902 data_length_(0), read_position_(0), owner_(Thread::Current()) { 903 // all events are done on the owner_ thread 904 } 905 906 FifoBuffer::FifoBuffer(size_t size, Thread* owner) 907 : state_(SS_OPEN), buffer_(new char[size]), buffer_length_(size), 908 data_length_(0), read_position_(0), owner_(owner) { 909 // all events are done on the owner_ thread 910 } 911 912 FifoBuffer::~FifoBuffer() { 913 } 914 915 bool FifoBuffer::GetBuffered(size_t* size) const { 916 CritScope cs(&crit_); 917 *size = data_length_; 918 return true; 919 } 920 921 bool FifoBuffer::SetCapacity(size_t size) { 922 CritScope cs(&crit_); 923 if (data_length_ > size) { 924 return false; 925 } 926 927 if (size != buffer_length_) { 928 char* buffer = new char[size]; 929 const size_t copy = data_length_; 930 const size_t tail_copy = _min(copy, buffer_length_ - read_position_); 931 memcpy(buffer, &buffer_[read_position_], tail_copy); 932 memcpy(buffer + tail_copy, &buffer_[0], copy - tail_copy); 933 buffer_.reset(buffer); 934 read_position_ = 0; 935 buffer_length_ = size; 936 } 937 return true; 938 } 939 940 StreamResult FifoBuffer::ReadOffset(void* buffer, size_t bytes, 941 size_t offset, size_t* bytes_read) { 942 CritScope cs(&crit_); 943 return ReadOffsetLocked(buffer, bytes, offset, bytes_read); 944 } 945 946 StreamResult FifoBuffer::WriteOffset(const void* buffer, size_t bytes, 947 size_t offset, size_t* bytes_written) { 948 CritScope cs(&crit_); 949 return WriteOffsetLocked(buffer, bytes, offset, bytes_written); 950 } 951 952 StreamState FifoBuffer::GetState() const { 953 return state_; 954 } 955 956 StreamResult FifoBuffer::Read(void* buffer, size_t bytes, 957 size_t* bytes_read, int* error) { 958 CritScope cs(&crit_); 959 const bool was_writable = data_length_ < buffer_length_; 960 size_t copy = 0; 961 StreamResult result = ReadOffsetLocked(buffer, bytes, 0, ©); 962 963 if (result == SR_SUCCESS) { 964 // If read was successful then adjust the read position and number of 965 // bytes buffered. 966 read_position_ = (read_position_ + copy) % buffer_length_; 967 data_length_ -= copy; 968 if (bytes_read) { 969 *bytes_read = copy; 970 } 971 972 // if we were full before, and now we're not, post an event 973 if (!was_writable && copy > 0) { 974 PostEvent(owner_, SE_WRITE, 0); 975 } 976 } 977 return result; 978 } 979 980 StreamResult FifoBuffer::Write(const void* buffer, size_t bytes, 981 size_t* bytes_written, int* error) { 982 CritScope cs(&crit_); 983 984 const bool was_readable = (data_length_ > 0); 985 size_t copy = 0; 986 StreamResult result = WriteOffsetLocked(buffer, bytes, 0, ©); 987 988 if (result == SR_SUCCESS) { 989 // If write was successful then adjust the number of readable bytes. 990 data_length_ += copy; 991 if (bytes_written) { 992 *bytes_written = copy; 993 } 994 995 // if we didn't have any data to read before, and now we do, post an event 996 if (!was_readable && copy > 0) { 997 PostEvent(owner_, SE_READ, 0); 998 } 999 } 1000 return result; 1001 } 1002 1003 void FifoBuffer::Close() { 1004 CritScope cs(&crit_); 1005 state_ = SS_CLOSED; 1006 } 1007 1008 const void* FifoBuffer::GetReadData(size_t* size) { 1009 CritScope cs(&crit_); 1010 *size = (read_position_ + data_length_ <= buffer_length_) ? 1011 data_length_ : buffer_length_ - read_position_; 1012 return &buffer_[read_position_]; 1013 } 1014 1015 void FifoBuffer::ConsumeReadData(size_t size) { 1016 CritScope cs(&crit_); 1017 ASSERT(size <= data_length_); 1018 const bool was_writable = data_length_ < buffer_length_; 1019 read_position_ = (read_position_ + size) % buffer_length_; 1020 data_length_ -= size; 1021 if (!was_writable && size > 0) { 1022 PostEvent(owner_, SE_WRITE, 0); 1023 } 1024 } 1025 1026 void* FifoBuffer::GetWriteBuffer(size_t* size) { 1027 CritScope cs(&crit_); 1028 if (state_ == SS_CLOSED) { 1029 return NULL; 1030 } 1031 1032 // if empty, reset the write position to the beginning, so we can get 1033 // the biggest possible block 1034 if (data_length_ == 0) { 1035 read_position_ = 0; 1036 } 1037 1038 const size_t write_position = (read_position_ + data_length_) 1039 % buffer_length_; 1040 *size = (write_position > read_position_ || data_length_ == 0) ? 1041 buffer_length_ - write_position : read_position_ - write_position; 1042 return &buffer_[write_position]; 1043 } 1044 1045 void FifoBuffer::ConsumeWriteBuffer(size_t size) { 1046 CritScope cs(&crit_); 1047 ASSERT(size <= buffer_length_ - data_length_); 1048 const bool was_readable = (data_length_ > 0); 1049 data_length_ += size; 1050 if (!was_readable && size > 0) { 1051 PostEvent(owner_, SE_READ, 0); 1052 } 1053 } 1054 1055 bool FifoBuffer::GetWriteRemaining(size_t* size) const { 1056 CritScope cs(&crit_); 1057 *size = buffer_length_ - data_length_; 1058 return true; 1059 } 1060 1061 StreamResult FifoBuffer::ReadOffsetLocked(void* buffer, 1062 size_t bytes, 1063 size_t offset, 1064 size_t* bytes_read) { 1065 if (offset >= data_length_) { 1066 return (state_ != SS_CLOSED) ? SR_BLOCK : SR_EOS; 1067 } 1068 1069 const size_t available = data_length_ - offset; 1070 const size_t read_position = (read_position_ + offset) % buffer_length_; 1071 const size_t copy = _min(bytes, available); 1072 const size_t tail_copy = _min(copy, buffer_length_ - read_position); 1073 char* const p = static_cast<char*>(buffer); 1074 memcpy(p, &buffer_[read_position], tail_copy); 1075 memcpy(p + tail_copy, &buffer_[0], copy - tail_copy); 1076 1077 if (bytes_read) { 1078 *bytes_read = copy; 1079 } 1080 return SR_SUCCESS; 1081 } 1082 1083 StreamResult FifoBuffer::WriteOffsetLocked(const void* buffer, 1084 size_t bytes, 1085 size_t offset, 1086 size_t* bytes_written) { 1087 if (state_ == SS_CLOSED) { 1088 return SR_EOS; 1089 } 1090 1091 if (data_length_ + offset >= buffer_length_) { 1092 return SR_BLOCK; 1093 } 1094 1095 const size_t available = buffer_length_ - data_length_ - offset; 1096 const size_t write_position = (read_position_ + data_length_ + offset) 1097 % buffer_length_; 1098 const size_t copy = _min(bytes, available); 1099 const size_t tail_copy = _min(copy, buffer_length_ - write_position); 1100 const char* const p = static_cast<const char*>(buffer); 1101 memcpy(&buffer_[write_position], p, tail_copy); 1102 memcpy(&buffer_[0], p + tail_copy, copy - tail_copy); 1103 1104 if (bytes_written) { 1105 *bytes_written = copy; 1106 } 1107 return SR_SUCCESS; 1108 } 1109 1110 1111 1112 /////////////////////////////////////////////////////////////////////////////// 1113 // LoggingAdapter 1114 /////////////////////////////////////////////////////////////////////////////// 1115 1116 LoggingAdapter::LoggingAdapter(StreamInterface* stream, LoggingSeverity level, 1117 const std::string& label, bool hex_mode) 1118 : StreamAdapterInterface(stream), level_(level), hex_mode_(hex_mode) { 1119 set_label(label); 1120 } 1121 1122 void LoggingAdapter::set_label(const std::string& label) { 1123 label_.assign("["); 1124 label_.append(label); 1125 label_.append("]"); 1126 } 1127 1128 StreamResult LoggingAdapter::Read(void* buffer, size_t buffer_len, 1129 size_t* read, int* error) { 1130 size_t local_read; if (!read) read = &local_read; 1131 StreamResult result = StreamAdapterInterface::Read(buffer, buffer_len, read, 1132 error); 1133 if (result == SR_SUCCESS) { 1134 LogMultiline(level_, label_.c_str(), true, buffer, *read, hex_mode_, &lms_); 1135 } 1136 return result; 1137 } 1138 1139 StreamResult LoggingAdapter::Write(const void* data, size_t data_len, 1140 size_t* written, int* error) { 1141 size_t local_written; 1142 if (!written) written = &local_written; 1143 StreamResult result = StreamAdapterInterface::Write(data, data_len, written, 1144 error); 1145 if (result == SR_SUCCESS) { 1146 LogMultiline(level_, label_.c_str(), false, data, *written, hex_mode_, 1147 &lms_); 1148 } 1149 return result; 1150 } 1151 1152 void LoggingAdapter::Close() { 1153 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1154 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1155 LOG_V(level_) << label_ << " Closed locally"; 1156 StreamAdapterInterface::Close(); 1157 } 1158 1159 void LoggingAdapter::OnEvent(StreamInterface* stream, int events, int err) { 1160 if (events & SE_OPEN) { 1161 LOG_V(level_) << label_ << " Open"; 1162 } else if (events & SE_CLOSE) { 1163 LogMultiline(level_, label_.c_str(), false, NULL, 0, hex_mode_, &lms_); 1164 LogMultiline(level_, label_.c_str(), true, NULL, 0, hex_mode_, &lms_); 1165 LOG_V(level_) << label_ << " Closed with error: " << err; 1166 } 1167 StreamAdapterInterface::OnEvent(stream, events, err); 1168 } 1169 1170 /////////////////////////////////////////////////////////////////////////////// 1171 // StringStream - Reads/Writes to an external std::string 1172 /////////////////////////////////////////////////////////////////////////////// 1173 1174 StringStream::StringStream(std::string& str) 1175 : str_(str), read_pos_(0), read_only_(false) { 1176 } 1177 1178 StringStream::StringStream(const std::string& str) 1179 : str_(const_cast<std::string&>(str)), read_pos_(0), read_only_(true) { 1180 } 1181 1182 StreamState StringStream::GetState() const { 1183 return SS_OPEN; 1184 } 1185 1186 StreamResult StringStream::Read(void* buffer, size_t buffer_len, 1187 size_t* read, int* error) { 1188 size_t available = _min(buffer_len, str_.size() - read_pos_); 1189 if (!available) 1190 return SR_EOS; 1191 memcpy(buffer, str_.data() + read_pos_, available); 1192 read_pos_ += available; 1193 if (read) 1194 *read = available; 1195 return SR_SUCCESS; 1196 } 1197 1198 StreamResult StringStream::Write(const void* data, size_t data_len, 1199 size_t* written, int* error) { 1200 if (read_only_) { 1201 if (error) { 1202 *error = -1; 1203 } 1204 return SR_ERROR; 1205 } 1206 str_.append(static_cast<const char*>(data), 1207 static_cast<const char*>(data) + data_len); 1208 if (written) 1209 *written = data_len; 1210 return SR_SUCCESS; 1211 } 1212 1213 void StringStream::Close() { 1214 } 1215 1216 bool StringStream::SetPosition(size_t position) { 1217 if (position > str_.size()) 1218 return false; 1219 read_pos_ = position; 1220 return true; 1221 } 1222 1223 bool StringStream::GetPosition(size_t* position) const { 1224 if (position) 1225 *position = read_pos_; 1226 return true; 1227 } 1228 1229 bool StringStream::GetSize(size_t* size) const { 1230 if (size) 1231 *size = str_.size(); 1232 return true; 1233 } 1234 1235 bool StringStream::GetAvailable(size_t* size) const { 1236 if (size) 1237 *size = str_.size() - read_pos_; 1238 return true; 1239 } 1240 1241 bool StringStream::ReserveSize(size_t size) { 1242 if (read_only_) 1243 return false; 1244 str_.reserve(size); 1245 return true; 1246 } 1247 1248 /////////////////////////////////////////////////////////////////////////////// 1249 // StreamReference 1250 /////////////////////////////////////////////////////////////////////////////// 1251 1252 StreamReference::StreamReference(StreamInterface* stream) 1253 : StreamAdapterInterface(stream, false) { 1254 // owner set to false so the destructor does not free the stream. 1255 stream_ref_count_ = new StreamRefCount(stream); 1256 } 1257 1258 StreamInterface* StreamReference::NewReference() { 1259 stream_ref_count_->AddReference(); 1260 return new StreamReference(stream_ref_count_, stream()); 1261 } 1262 1263 StreamReference::~StreamReference() { 1264 stream_ref_count_->Release(); 1265 } 1266 1267 StreamReference::StreamReference(StreamRefCount* stream_ref_count, 1268 StreamInterface* stream) 1269 : StreamAdapterInterface(stream, false), 1270 stream_ref_count_(stream_ref_count) { 1271 } 1272 1273 /////////////////////////////////////////////////////////////////////////////// 1274 1275 StreamResult Flow(StreamInterface* source, 1276 char* buffer, size_t buffer_len, 1277 StreamInterface* sink, 1278 size_t* data_len /* = NULL */) { 1279 ASSERT(buffer_len > 0); 1280 1281 StreamResult result; 1282 size_t count, read_pos, write_pos; 1283 if (data_len) { 1284 read_pos = *data_len; 1285 } else { 1286 read_pos = 0; 1287 } 1288 1289 bool end_of_stream = false; 1290 do { 1291 // Read until buffer is full, end of stream, or error 1292 while (!end_of_stream && (read_pos < buffer_len)) { 1293 result = source->Read(buffer + read_pos, buffer_len - read_pos, 1294 &count, NULL); 1295 if (result == SR_EOS) { 1296 end_of_stream = true; 1297 } else if (result != SR_SUCCESS) { 1298 if (data_len) { 1299 *data_len = read_pos; 1300 } 1301 return result; 1302 } else { 1303 read_pos += count; 1304 } 1305 } 1306 1307 // Write until buffer is empty, or error (including end of stream) 1308 write_pos = 0; 1309 while (write_pos < read_pos) { 1310 result = sink->Write(buffer + write_pos, read_pos - write_pos, 1311 &count, NULL); 1312 if (result != SR_SUCCESS) { 1313 if (data_len) { 1314 *data_len = read_pos - write_pos; 1315 if (write_pos > 0) { 1316 memmove(buffer, buffer + write_pos, *data_len); 1317 } 1318 } 1319 return result; 1320 } 1321 write_pos += count; 1322 } 1323 1324 read_pos = 0; 1325 } while (!end_of_stream); 1326 1327 if (data_len) { 1328 *data_len = 0; 1329 } 1330 return SR_SUCCESS; 1331 } 1332 1333 /////////////////////////////////////////////////////////////////////////////// 1334 1335 } // namespace rtc 1336