1 /* 2 * Copyright 2004 The WebRTC Project Authors. All rights reserved. 3 * 4 * Use of this source code is governed by a BSD-style license 5 * that can be found in the LICENSE file in the root of the source 6 * tree. An additional intellectual property rights grant can be found 7 * in the file PATENTS. All contributing project authors may 8 * be found in the AUTHORS file in the root of the source tree. 9 */ 10 11 12 #if defined(WEBRTC_WIN) 13 #include "webrtc/base/win32.h" 14 #else // !WEBRTC_WIN 15 #define SEC_E_CERT_EXPIRED (-2146893016) 16 #endif // !WEBRTC_WIN 17 18 #include "webrtc/base/common.h" 19 #include "webrtc/base/httpbase.h" 20 #include "webrtc/base/logging.h" 21 #include "webrtc/base/socket.h" 22 #include "webrtc/base/stringutils.h" 23 #include "webrtc/base/thread.h" 24 25 namespace rtc { 26 27 ////////////////////////////////////////////////////////////////////// 28 // Helpers 29 ////////////////////////////////////////////////////////////////////// 30 31 bool MatchHeader(const char* str, size_t len, HttpHeader header) { 32 const char* const header_str = ToString(header); 33 const size_t header_len = strlen(header_str); 34 return (len == header_len) && (_strnicmp(str, header_str, header_len) == 0); 35 } 36 37 enum { 38 MSG_READ 39 }; 40 41 ////////////////////////////////////////////////////////////////////// 42 // HttpParser 43 ////////////////////////////////////////////////////////////////////// 44 45 HttpParser::HttpParser() { 46 reset(); 47 } 48 49 HttpParser::~HttpParser() { 50 } 51 52 void 53 HttpParser::reset() { 54 state_ = ST_LEADER; 55 chunked_ = false; 56 data_size_ = SIZE_UNKNOWN; 57 } 58 59 HttpParser::ProcessResult 60 HttpParser::Process(const char* buffer, size_t len, size_t* processed, 61 HttpError* error) { 62 *processed = 0; 63 *error = HE_NONE; 64 65 if (state_ >= ST_COMPLETE) { 66 ASSERT(false); 67 return PR_COMPLETE; 68 } 69 70 while (true) { 71 if (state_ < ST_DATA) { 72 size_t pos = *processed; 73 while ((pos < len) && (buffer[pos] != '\n')) { 74 pos += 1; 75 } 76 if (pos >= len) { 77 break; // don't have a full header 78 } 79 const char* line = buffer + *processed; 80 size_t len = (pos - *processed); 81 *processed = pos + 1; 82 while ((len > 0) && isspace(static_cast<unsigned char>(line[len-1]))) { 83 len -= 1; 84 } 85 ProcessResult result = ProcessLine(line, len, error); 86 LOG(LS_VERBOSE) << "Processed line, result=" << result; 87 88 if (PR_CONTINUE != result) { 89 return result; 90 } 91 } else if (data_size_ == 0) { 92 if (chunked_) { 93 state_ = ST_CHUNKTERM; 94 } else { 95 return PR_COMPLETE; 96 } 97 } else { 98 size_t available = len - *processed; 99 if (available <= 0) { 100 break; // no more data 101 } 102 if ((data_size_ != SIZE_UNKNOWN) && (available > data_size_)) { 103 available = data_size_; 104 } 105 size_t read = 0; 106 ProcessResult result = ProcessData(buffer + *processed, available, read, 107 error); 108 LOG(LS_VERBOSE) << "Processed data, result: " << result << " read: " 109 << read << " err: " << error; 110 111 if (PR_CONTINUE != result) { 112 return result; 113 } 114 *processed += read; 115 if (data_size_ != SIZE_UNKNOWN) { 116 data_size_ -= read; 117 } 118 } 119 } 120 121 return PR_CONTINUE; 122 } 123 124 HttpParser::ProcessResult 125 HttpParser::ProcessLine(const char* line, size_t len, HttpError* error) { 126 LOG_F(LS_VERBOSE) << " state: " << state_ << " line: " 127 << std::string(line, len) << " len: " << len << " err: " 128 << error; 129 130 switch (state_) { 131 case ST_LEADER: 132 state_ = ST_HEADERS; 133 return ProcessLeader(line, len, error); 134 135 case ST_HEADERS: 136 if (len > 0) { 137 const char* value = strchrn(line, len, ':'); 138 if (!value) { 139 *error = HE_PROTOCOL; 140 return PR_COMPLETE; 141 } 142 size_t nlen = (value - line); 143 const char* eol = line + len; 144 do { 145 value += 1; 146 } while ((value < eol) && isspace(static_cast<unsigned char>(*value))); 147 size_t vlen = eol - value; 148 if (MatchHeader(line, nlen, HH_CONTENT_LENGTH)) { 149 // sscanf isn't safe with strings that aren't null-terminated, and there 150 // is no guarantee that |value| is. 151 // Create a local copy that is null-terminated. 152 std::string value_str(value, vlen); 153 unsigned int temp_size; 154 if (sscanf(value_str.c_str(), "%u", &temp_size) != 1) { 155 *error = HE_PROTOCOL; 156 return PR_COMPLETE; 157 } 158 data_size_ = static_cast<size_t>(temp_size); 159 } else if (MatchHeader(line, nlen, HH_TRANSFER_ENCODING)) { 160 if ((vlen == 7) && (_strnicmp(value, "chunked", 7) == 0)) { 161 chunked_ = true; 162 } else if ((vlen == 8) && (_strnicmp(value, "identity", 8) == 0)) { 163 chunked_ = false; 164 } else { 165 *error = HE_PROTOCOL; 166 return PR_COMPLETE; 167 } 168 } 169 return ProcessHeader(line, nlen, value, vlen, error); 170 } else { 171 state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA; 172 return ProcessHeaderComplete(chunked_, data_size_, error); 173 } 174 break; 175 176 case ST_CHUNKSIZE: 177 if (len > 0) { 178 char* ptr = NULL; 179 data_size_ = strtoul(line, &ptr, 16); 180 if (ptr != line + len) { 181 *error = HE_PROTOCOL; 182 return PR_COMPLETE; 183 } 184 state_ = (data_size_ == 0) ? ST_TRAILERS : ST_DATA; 185 } else { 186 *error = HE_PROTOCOL; 187 return PR_COMPLETE; 188 } 189 break; 190 191 case ST_CHUNKTERM: 192 if (len > 0) { 193 *error = HE_PROTOCOL; 194 return PR_COMPLETE; 195 } else { 196 state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA; 197 } 198 break; 199 200 case ST_TRAILERS: 201 if (len == 0) { 202 return PR_COMPLETE; 203 } 204 // *error = onHttpRecvTrailer(); 205 break; 206 207 default: 208 ASSERT(false); 209 break; 210 } 211 212 return PR_CONTINUE; 213 } 214 215 bool 216 HttpParser::is_valid_end_of_input() const { 217 return (state_ == ST_DATA) && (data_size_ == SIZE_UNKNOWN); 218 } 219 220 void 221 HttpParser::complete(HttpError error) { 222 if (state_ < ST_COMPLETE) { 223 state_ = ST_COMPLETE; 224 OnComplete(error); 225 } 226 } 227 228 ////////////////////////////////////////////////////////////////////// 229 // HttpBase::DocumentStream 230 ////////////////////////////////////////////////////////////////////// 231 232 class BlockingMemoryStream : public ExternalMemoryStream { 233 public: 234 BlockingMemoryStream(char* buffer, size_t size) 235 : ExternalMemoryStream(buffer, size) { } 236 237 StreamResult DoReserve(size_t size, int* error) override { 238 return (buffer_length_ >= size) ? SR_SUCCESS : SR_BLOCK; 239 } 240 }; 241 242 class HttpBase::DocumentStream : public StreamInterface { 243 public: 244 DocumentStream(HttpBase* base) : base_(base), error_(HE_DEFAULT) { } 245 246 StreamState GetState() const override { 247 if (NULL == base_) 248 return SS_CLOSED; 249 if (HM_RECV == base_->mode_) 250 return SS_OPEN; 251 return SS_OPENING; 252 } 253 254 StreamResult Read(void* buffer, 255 size_t buffer_len, 256 size_t* read, 257 int* error) override { 258 if (!base_) { 259 if (error) *error = error_; 260 return (HE_NONE == error_) ? SR_EOS : SR_ERROR; 261 } 262 263 if (HM_RECV != base_->mode_) { 264 return SR_BLOCK; 265 } 266 267 // DoReceiveLoop writes http document data to the StreamInterface* document 268 // member of HttpData. In this case, we want this data to be written 269 // directly to our buffer. To accomplish this, we wrap our buffer with a 270 // StreamInterface, and replace the existing document with our wrapper. 271 // When the method returns, we restore the old document. Ideally, we would 272 // pass our StreamInterface* to DoReceiveLoop, but due to the callbacks 273 // of HttpParser, we would still need to store the pointer temporarily. 274 scoped_ptr<StreamInterface> 275 stream(new BlockingMemoryStream(reinterpret_cast<char*>(buffer), 276 buffer_len)); 277 278 // Replace the existing document with our wrapped buffer. 279 base_->data_->document.swap(stream); 280 281 // Pump the I/O loop. DoReceiveLoop is guaranteed not to attempt to 282 // complete the I/O process, which means that our wrapper is not in danger 283 // of being deleted. To ensure this, DoReceiveLoop returns true when it 284 // wants complete to be called. We make sure to uninstall our wrapper 285 // before calling complete(). 286 HttpError http_error; 287 bool complete = base_->DoReceiveLoop(&http_error); 288 289 // Reinstall the original output document. 290 base_->data_->document.swap(stream); 291 292 // If we reach the end of the receive stream, we disconnect our stream 293 // adapter from the HttpBase, and further calls to read will either return 294 // EOS or ERROR, appropriately. Finally, we call complete(). 295 StreamResult result = SR_BLOCK; 296 if (complete) { 297 HttpBase* base = Disconnect(http_error); 298 if (error) *error = error_; 299 result = (HE_NONE == error_) ? SR_EOS : SR_ERROR; 300 base->complete(http_error); 301 } 302 303 // Even if we are complete, if some data was read we must return SUCCESS. 304 // Future Reads will return EOS or ERROR based on the error_ variable. 305 size_t position; 306 stream->GetPosition(&position); 307 if (position > 0) { 308 if (read) *read = position; 309 result = SR_SUCCESS; 310 } 311 return result; 312 } 313 314 StreamResult Write(const void* data, 315 size_t data_len, 316 size_t* written, 317 int* error) override { 318 if (error) *error = -1; 319 return SR_ERROR; 320 } 321 322 void Close() override { 323 if (base_) { 324 HttpBase* base = Disconnect(HE_NONE); 325 if (HM_RECV == base->mode_ && base->http_stream_) { 326 // Read I/O could have been stalled on the user of this DocumentStream, 327 // so restart the I/O process now that we've removed ourselves. 328 base->http_stream_->PostEvent(SE_READ, 0); 329 } 330 } 331 } 332 333 bool GetAvailable(size_t* size) const override { 334 if (!base_ || HM_RECV != base_->mode_) 335 return false; 336 size_t data_size = base_->GetDataRemaining(); 337 if (SIZE_UNKNOWN == data_size) 338 return false; 339 if (size) 340 *size = data_size; 341 return true; 342 } 343 344 HttpBase* Disconnect(HttpError error) { 345 ASSERT(NULL != base_); 346 ASSERT(NULL != base_->doc_stream_); 347 HttpBase* base = base_; 348 base_->doc_stream_ = NULL; 349 base_ = NULL; 350 error_ = error; 351 return base; 352 } 353 354 private: 355 HttpBase* base_; 356 HttpError error_; 357 }; 358 359 ////////////////////////////////////////////////////////////////////// 360 // HttpBase 361 ////////////////////////////////////////////////////////////////////// 362 363 HttpBase::HttpBase() : mode_(HM_NONE), data_(NULL), notify_(NULL), 364 http_stream_(NULL), doc_stream_(NULL) { 365 } 366 367 HttpBase::~HttpBase() { 368 ASSERT(HM_NONE == mode_); 369 } 370 371 bool 372 HttpBase::isConnected() const { 373 return (http_stream_ != NULL) && (http_stream_->GetState() == SS_OPEN); 374 } 375 376 bool 377 HttpBase::attach(StreamInterface* stream) { 378 if ((mode_ != HM_NONE) || (http_stream_ != NULL) || (stream == NULL)) { 379 ASSERT(false); 380 return false; 381 } 382 http_stream_ = stream; 383 http_stream_->SignalEvent.connect(this, &HttpBase::OnHttpStreamEvent); 384 mode_ = (http_stream_->GetState() == SS_OPENING) ? HM_CONNECT : HM_NONE; 385 return true; 386 } 387 388 StreamInterface* 389 HttpBase::detach() { 390 ASSERT(HM_NONE == mode_); 391 if (mode_ != HM_NONE) { 392 return NULL; 393 } 394 StreamInterface* stream = http_stream_; 395 http_stream_ = NULL; 396 if (stream) { 397 stream->SignalEvent.disconnect(this); 398 } 399 return stream; 400 } 401 402 void 403 HttpBase::send(HttpData* data) { 404 ASSERT(HM_NONE == mode_); 405 if (mode_ != HM_NONE) { 406 return; 407 } else if (!isConnected()) { 408 OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED); 409 return; 410 } 411 412 mode_ = HM_SEND; 413 data_ = data; 414 len_ = 0; 415 ignore_data_ = chunk_data_ = false; 416 417 if (data_->document) { 418 data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent); 419 } 420 421 std::string encoding; 422 if (data_->hasHeader(HH_TRANSFER_ENCODING, &encoding) 423 && (encoding == "chunked")) { 424 chunk_data_ = true; 425 } 426 427 len_ = data_->formatLeader(buffer_, sizeof(buffer_)); 428 len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n"); 429 430 header_ = data_->begin(); 431 if (header_ == data_->end()) { 432 // We must call this at least once, in the case where there are no headers. 433 queue_headers(); 434 } 435 436 flush_data(); 437 } 438 439 void 440 HttpBase::recv(HttpData* data) { 441 ASSERT(HM_NONE == mode_); 442 if (mode_ != HM_NONE) { 443 return; 444 } else if (!isConnected()) { 445 OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED); 446 return; 447 } 448 449 mode_ = HM_RECV; 450 data_ = data; 451 len_ = 0; 452 ignore_data_ = chunk_data_ = false; 453 454 reset(); 455 if (doc_stream_) { 456 doc_stream_->SignalEvent(doc_stream_, SE_OPEN | SE_READ, 0); 457 } else { 458 read_and_process_data(); 459 } 460 } 461 462 void 463 HttpBase::abort(HttpError err) { 464 if (mode_ != HM_NONE) { 465 if (http_stream_ != NULL) { 466 http_stream_->Close(); 467 } 468 do_complete(err); 469 } 470 } 471 472 StreamInterface* HttpBase::GetDocumentStream() { 473 if (doc_stream_) 474 return NULL; 475 doc_stream_ = new DocumentStream(this); 476 return doc_stream_; 477 } 478 479 HttpError HttpBase::HandleStreamClose(int error) { 480 if (http_stream_ != NULL) { 481 http_stream_->Close(); 482 } 483 if (error == 0) { 484 if ((mode_ == HM_RECV) && is_valid_end_of_input()) { 485 return HE_NONE; 486 } else { 487 return HE_DISCONNECTED; 488 } 489 } else if (error == SOCKET_EACCES) { 490 return HE_AUTH; 491 } else if (error == SEC_E_CERT_EXPIRED) { 492 return HE_CERTIFICATE_EXPIRED; 493 } 494 LOG_F(LS_ERROR) << "(" << error << ")"; 495 return (HM_CONNECT == mode_) ? HE_CONNECT_FAILED : HE_SOCKET_ERROR; 496 } 497 498 bool HttpBase::DoReceiveLoop(HttpError* error) { 499 ASSERT(HM_RECV == mode_); 500 ASSERT(NULL != error); 501 502 // Do to the latency between receiving read notifications from 503 // pseudotcpchannel, we rely on repeated calls to read in order to acheive 504 // ideal throughput. The number of reads is limited to prevent starving 505 // the caller. 506 507 size_t loop_count = 0; 508 const size_t kMaxReadCount = 20; 509 bool process_requires_more_data = false; 510 do { 511 // The most frequent use of this function is response to new data available 512 // on http_stream_. Therefore, we optimize by attempting to read from the 513 // network first (as opposed to processing existing data first). 514 515 if (len_ < sizeof(buffer_)) { 516 // Attempt to buffer more data. 517 size_t read; 518 int read_error; 519 StreamResult read_result = http_stream_->Read(buffer_ + len_, 520 sizeof(buffer_) - len_, 521 &read, &read_error); 522 switch (read_result) { 523 case SR_SUCCESS: 524 ASSERT(len_ + read <= sizeof(buffer_)); 525 len_ += read; 526 break; 527 case SR_BLOCK: 528 if (process_requires_more_data) { 529 // We're can't make progress until more data is available. 530 return false; 531 } 532 // Attempt to process the data already in our buffer. 533 break; 534 case SR_EOS: 535 // Clean close, with no error. 536 read_error = 0; 537 FALLTHROUGH(); // Fall through to HandleStreamClose. 538 case SR_ERROR: 539 *error = HandleStreamClose(read_error); 540 return true; 541 } 542 } else if (process_requires_more_data) { 543 // We have too much unprocessed data in our buffer. This should only 544 // occur when a single HTTP header is longer than the buffer size (32K). 545 // Anything longer than that is almost certainly an error. 546 *error = HE_OVERFLOW; 547 return true; 548 } 549 550 // Process data in our buffer. Process is not guaranteed to process all 551 // the buffered data. In particular, it will wait until a complete 552 // protocol element (such as http header, or chunk size) is available, 553 // before processing it in its entirety. Also, it is valid and sometimes 554 // necessary to call Process with an empty buffer, since the state machine 555 // may have interrupted state transitions to complete. 556 size_t processed; 557 ProcessResult process_result = Process(buffer_, len_, &processed, 558 error); 559 ASSERT(processed <= len_); 560 len_ -= processed; 561 memmove(buffer_, buffer_ + processed, len_); 562 switch (process_result) { 563 case PR_CONTINUE: 564 // We need more data to make progress. 565 process_requires_more_data = true; 566 break; 567 case PR_BLOCK: 568 // We're stalled on writing the processed data. 569 return false; 570 case PR_COMPLETE: 571 // *error already contains the correct code. 572 return true; 573 } 574 } while (++loop_count <= kMaxReadCount); 575 576 LOG_F(LS_WARNING) << "danger of starvation"; 577 return false; 578 } 579 580 void 581 HttpBase::read_and_process_data() { 582 HttpError error; 583 if (DoReceiveLoop(&error)) { 584 complete(error); 585 } 586 } 587 588 void 589 HttpBase::flush_data() { 590 ASSERT(HM_SEND == mode_); 591 592 // When send_required is true, no more buffering can occur without a network 593 // write. 594 bool send_required = (len_ >= sizeof(buffer_)); 595 596 while (true) { 597 ASSERT(len_ <= sizeof(buffer_)); 598 599 // HTTP is inherently sensitive to round trip latency, since a frequent use 600 // case is for small requests and responses to be sent back and forth, and 601 // the lack of pipelining forces a single request to take a minimum of the 602 // round trip time. As a result, it is to our benefit to pack as much data 603 // into each packet as possible. Thus, we defer network writes until we've 604 // buffered as much data as possible. 605 606 if (!send_required && (header_ != data_->end())) { 607 // First, attempt to queue more header data. 608 send_required = queue_headers(); 609 } 610 611 if (!send_required && data_->document) { 612 // Next, attempt to queue document data. 613 614 const size_t kChunkDigits = 8; 615 size_t offset, reserve; 616 if (chunk_data_) { 617 // Reserve characters at the start for X-byte hex value and \r\n 618 offset = len_ + kChunkDigits + 2; 619 // ... and 2 characters at the end for \r\n 620 reserve = offset + 2; 621 } else { 622 offset = len_; 623 reserve = offset; 624 } 625 626 if (reserve >= sizeof(buffer_)) { 627 send_required = true; 628 } else { 629 size_t read; 630 int error; 631 StreamResult result = data_->document->Read(buffer_ + offset, 632 sizeof(buffer_) - reserve, 633 &read, &error); 634 if (result == SR_SUCCESS) { 635 ASSERT(reserve + read <= sizeof(buffer_)); 636 if (chunk_data_) { 637 // Prepend the chunk length in hex. 638 // Note: sprintfn appends a null terminator, which is why we can't 639 // combine it with the line terminator. 640 sprintfn(buffer_ + len_, kChunkDigits + 1, "%.*x", 641 kChunkDigits, read); 642 // Add line terminator to the chunk length. 643 memcpy(buffer_ + len_ + kChunkDigits, "\r\n", 2); 644 // Add line terminator to the end of the chunk. 645 memcpy(buffer_ + offset + read, "\r\n", 2); 646 } 647 len_ = reserve + read; 648 } else if (result == SR_BLOCK) { 649 // Nothing to do but flush data to the network. 650 send_required = true; 651 } else if (result == SR_EOS) { 652 if (chunk_data_) { 653 // Append the empty chunk and empty trailers, then turn off 654 // chunking. 655 ASSERT(len_ + 5 <= sizeof(buffer_)); 656 memcpy(buffer_ + len_, "0\r\n\r\n", 5); 657 len_ += 5; 658 chunk_data_ = false; 659 } else if (0 == len_) { 660 // No more data to read, and no more data to write. 661 do_complete(); 662 return; 663 } 664 // Although we are done reading data, there is still data which needs 665 // to be flushed to the network. 666 send_required = true; 667 } else { 668 LOG_F(LS_ERROR) << "Read error: " << error; 669 do_complete(HE_STREAM); 670 return; 671 } 672 } 673 } 674 675 if (0 == len_) { 676 // No data currently available to send. 677 if (!data_->document) { 678 // If there is no source document, that means we're done. 679 do_complete(); 680 } 681 return; 682 } 683 684 size_t written; 685 int error; 686 StreamResult result = http_stream_->Write(buffer_, len_, &written, &error); 687 if (result == SR_SUCCESS) { 688 ASSERT(written <= len_); 689 len_ -= written; 690 memmove(buffer_, buffer_ + written, len_); 691 send_required = false; 692 } else if (result == SR_BLOCK) { 693 if (send_required) { 694 // Nothing more we can do until network is writeable. 695 return; 696 } 697 } else { 698 ASSERT(result == SR_ERROR); 699 LOG_F(LS_ERROR) << "error"; 700 OnHttpStreamEvent(http_stream_, SE_CLOSE, error); 701 return; 702 } 703 } 704 705 ASSERT(false); 706 } 707 708 bool 709 HttpBase::queue_headers() { 710 ASSERT(HM_SEND == mode_); 711 while (header_ != data_->end()) { 712 size_t len = sprintfn(buffer_ + len_, sizeof(buffer_) - len_, 713 "%.*s: %.*s\r\n", 714 header_->first.size(), header_->first.data(), 715 header_->second.size(), header_->second.data()); 716 if (len_ + len < sizeof(buffer_) - 3) { 717 len_ += len; 718 ++header_; 719 } else if (len_ == 0) { 720 LOG(WARNING) << "discarding header that is too long: " << header_->first; 721 ++header_; 722 } else { 723 // Not enough room for the next header, write to network first. 724 return true; 725 } 726 } 727 // End of headers 728 len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n"); 729 return false; 730 } 731 732 void 733 HttpBase::do_complete(HttpError err) { 734 ASSERT(mode_ != HM_NONE); 735 HttpMode mode = mode_; 736 mode_ = HM_NONE; 737 if (data_ && data_->document) { 738 data_->document->SignalEvent.disconnect(this); 739 } 740 data_ = NULL; 741 if ((HM_RECV == mode) && doc_stream_) { 742 ASSERT(HE_NONE != err); // We should have Disconnected doc_stream_ already. 743 DocumentStream* ds = doc_stream_; 744 ds->Disconnect(err); 745 ds->SignalEvent(ds, SE_CLOSE, err); 746 } 747 if (notify_) { 748 notify_->onHttpComplete(mode, err); 749 } 750 } 751 752 // 753 // Stream Signals 754 // 755 756 void 757 HttpBase::OnHttpStreamEvent(StreamInterface* stream, int events, int error) { 758 ASSERT(stream == http_stream_); 759 if ((events & SE_OPEN) && (mode_ == HM_CONNECT)) { 760 do_complete(); 761 return; 762 } 763 764 if ((events & SE_WRITE) && (mode_ == HM_SEND)) { 765 flush_data(); 766 return; 767 } 768 769 if ((events & SE_READ) && (mode_ == HM_RECV)) { 770 if (doc_stream_) { 771 doc_stream_->SignalEvent(doc_stream_, SE_READ, 0); 772 } else { 773 read_and_process_data(); 774 } 775 return; 776 } 777 778 if ((events & SE_CLOSE) == 0) 779 return; 780 781 HttpError http_error = HandleStreamClose(error); 782 if (mode_ == HM_RECV) { 783 complete(http_error); 784 } else if (mode_ != HM_NONE) { 785 do_complete(http_error); 786 } else if (notify_) { 787 notify_->onHttpClosed(http_error); 788 } 789 } 790 791 void 792 HttpBase::OnDocumentEvent(StreamInterface* stream, int events, int error) { 793 ASSERT(stream == data_->document.get()); 794 if ((events & SE_WRITE) && (mode_ == HM_RECV)) { 795 read_and_process_data(); 796 return; 797 } 798 799 if ((events & SE_READ) && (mode_ == HM_SEND)) { 800 flush_data(); 801 return; 802 } 803 804 if (events & SE_CLOSE) { 805 LOG_F(LS_ERROR) << "Read error: " << error; 806 do_complete(HE_STREAM); 807 return; 808 } 809 } 810 811 // 812 // HttpParser Implementation 813 // 814 815 HttpParser::ProcessResult 816 HttpBase::ProcessLeader(const char* line, size_t len, HttpError* error) { 817 *error = data_->parseLeader(line, len); 818 return (HE_NONE == *error) ? PR_CONTINUE : PR_COMPLETE; 819 } 820 821 HttpParser::ProcessResult 822 HttpBase::ProcessHeader(const char* name, size_t nlen, const char* value, 823 size_t vlen, HttpError* error) { 824 std::string sname(name, nlen), svalue(value, vlen); 825 data_->addHeader(sname, svalue); 826 return PR_CONTINUE; 827 } 828 829 HttpParser::ProcessResult 830 HttpBase::ProcessHeaderComplete(bool chunked, size_t& data_size, 831 HttpError* error) { 832 StreamInterface* old_docstream = doc_stream_; 833 if (notify_) { 834 *error = notify_->onHttpHeaderComplete(chunked, data_size); 835 // The request must not be aborted as a result of this callback. 836 ASSERT(NULL != data_); 837 } 838 if ((HE_NONE == *error) && data_->document) { 839 data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent); 840 } 841 if (HE_NONE != *error) { 842 return PR_COMPLETE; 843 } 844 if (old_docstream != doc_stream_) { 845 // Break out of Process loop, since our I/O model just changed. 846 return PR_BLOCK; 847 } 848 return PR_CONTINUE; 849 } 850 851 HttpParser::ProcessResult 852 HttpBase::ProcessData(const char* data, size_t len, size_t& read, 853 HttpError* error) { 854 if (ignore_data_ || !data_->document) { 855 read = len; 856 return PR_CONTINUE; 857 } 858 int write_error = 0; 859 switch (data_->document->Write(data, len, &read, &write_error)) { 860 case SR_SUCCESS: 861 return PR_CONTINUE; 862 case SR_BLOCK: 863 return PR_BLOCK; 864 case SR_EOS: 865 LOG_F(LS_ERROR) << "Unexpected EOS"; 866 *error = HE_STREAM; 867 return PR_COMPLETE; 868 case SR_ERROR: 869 default: 870 LOG_F(LS_ERROR) << "Write error: " << write_error; 871 *error = HE_STREAM; 872 return PR_COMPLETE; 873 } 874 } 875 876 void 877 HttpBase::OnComplete(HttpError err) { 878 LOG_F(LS_VERBOSE); 879 do_complete(err); 880 } 881 882 } // namespace rtc 883