Home | History | Annotate | Download | only in base
      1 /*
      2  * libjingle
      3  * Copyright 2004--2005, Google Inc.
      4  *
      5  * Redistribution and use in source and binary forms, with or without
      6  * modification, are permitted provided that the following conditions are met:
      7  *
      8  *  1. Redistributions of source code must retain the above copyright notice,
      9  *     this list of conditions and the following disclaimer.
     10  *  2. Redistributions in binary form must reproduce the above copyright notice,
     11  *     this list of conditions and the following disclaimer in the documentation
     12  *     and/or other materials provided with the distribution.
     13  *  3. The name of the author may not be used to endorse or promote products
     14  *     derived from this software without specific prior written permission.
     15  *
     16  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR IMPLIED
     17  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
     18  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO
     19  * EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     20  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
     21  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
     22  * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
     23  * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
     24  * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
     25  * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     26  */
     27 
     28 // Copyright 2005 Google Inc.  All Rights Reserved.
     29 //
     30 
     31 
     32 #ifdef WIN32
     33 #include "talk/base/win32.h"
     34 #else  // !WIN32
     35 #define SEC_E_CERT_EXPIRED (-2146893016)
     36 #endif  // !WIN32
     37 
     38 #include "talk/base/common.h"
     39 #include "talk/base/httpbase.h"
     40 #include "talk/base/logging.h"
     41 #include "talk/base/socket.h"
     42 #include "talk/base/stringutils.h"
     43 #include "talk/base/thread.h"
     44 
     45 namespace talk_base {
     46 
     47 //////////////////////////////////////////////////////////////////////
     48 // Helpers
     49 //////////////////////////////////////////////////////////////////////
     50 
     51 bool MatchHeader(const char* str, size_t len, HttpHeader header) {
     52   const char* const header_str = ToString(header);
     53   const size_t header_len = strlen(header_str);
     54   return (len == header_len) && (_strnicmp(str, header_str, header_len) == 0);
     55 }
     56 
     57 enum {
     58   MSG_READ
     59 };
     60 
     61 //////////////////////////////////////////////////////////////////////
     62 // HttpParser
     63 //////////////////////////////////////////////////////////////////////
     64 
     65 HttpParser::HttpParser() {
     66   reset();
     67 }
     68 
     69 HttpParser::~HttpParser() {
     70 }
     71 
     72 void
     73 HttpParser::reset() {
     74   state_ = ST_LEADER;
     75   chunked_ = false;
     76   data_size_ = SIZE_UNKNOWN;
     77 }
     78 
     79 HttpParser::ProcessResult
     80 HttpParser::Process(const char* buffer, size_t len, size_t* processed,
     81                     HttpError* error) {
     82   *processed = 0;
     83   *error = HE_NONE;
     84 
     85   if (state_ >= ST_COMPLETE) {
     86     ASSERT(false);
     87     return PR_COMPLETE;
     88   }
     89 
     90   while (true) {
     91     if (state_ < ST_DATA) {
     92       size_t pos = *processed;
     93       while ((pos < len) && (buffer[pos] != '\n')) {
     94         pos += 1;
     95       }
     96       if (pos >= len) {
     97         break;  // don't have a full header
     98       }
     99       const char* line = buffer + *processed;
    100       size_t len = (pos - *processed);
    101       *processed = pos + 1;
    102       while ((len > 0) && isspace(static_cast<unsigned char>(line[len-1]))) {
    103         len -= 1;
    104       }
    105       ProcessResult result = ProcessLine(line, len, error);
    106       LOG(LS_VERBOSE) << "Processed line, result=" << result;
    107 
    108       if (PR_CONTINUE != result) {
    109         return result;
    110       }
    111     } else if (data_size_ == 0) {
    112       if (chunked_) {
    113         state_ = ST_CHUNKTERM;
    114       } else {
    115         return PR_COMPLETE;
    116       }
    117     } else {
    118       size_t available = len - *processed;
    119       if (available <= 0) {
    120         break; // no more data
    121       }
    122       if ((data_size_ != SIZE_UNKNOWN) && (available > data_size_)) {
    123         available = data_size_;
    124       }
    125       size_t read = 0;
    126       ProcessResult result = ProcessData(buffer + *processed, available, read,
    127                                          error);
    128       LOG(LS_VERBOSE) << "Processed data, result: " << result << " read: "
    129                       << read << " err: " << error;
    130 
    131       if (PR_CONTINUE != result) {
    132         return result;
    133       }
    134       *processed += read;
    135       if (data_size_ != SIZE_UNKNOWN) {
    136         data_size_ -= read;
    137       }
    138     }
    139   }
    140 
    141   return PR_CONTINUE;
    142 }
    143 
    144 HttpParser::ProcessResult
    145 HttpParser::ProcessLine(const char* line, size_t len, HttpError* error) {
    146   LOG_F(LS_VERBOSE) << " state: " << state_ << " line: "
    147                     << std::string(line, len) << " len: " << len << " err: "
    148                     << error;
    149 
    150   switch (state_) {
    151   case ST_LEADER:
    152     state_ = ST_HEADERS;
    153     return ProcessLeader(line, len, error);
    154 
    155   case ST_HEADERS:
    156     if (len > 0) {
    157       const char* value = strchrn(line, len, ':');
    158       if (!value) {
    159         *error = HE_PROTOCOL;
    160         return PR_COMPLETE;
    161       }
    162       size_t nlen = (value - line);
    163       const char* eol = line + len;
    164       do {
    165         value += 1;
    166       } while ((value < eol) && isspace(static_cast<unsigned char>(*value)));
    167       size_t vlen = eol - value;
    168       if (MatchHeader(line, nlen, HH_CONTENT_LENGTH)) {
    169         // sscanf isn't safe with strings that aren't null-terminated, and there
    170         // is no guarantee that |value| is.
    171         // Create a local copy that is null-terminated.
    172         std::string value_str(value, vlen);
    173         unsigned int temp_size;
    174         if (sscanf(value_str.c_str(), "%u", &temp_size) != 1) {
    175           *error = HE_PROTOCOL;
    176           return PR_COMPLETE;
    177         }
    178         data_size_ = static_cast<size_t>(temp_size);
    179       } else if (MatchHeader(line, nlen, HH_TRANSFER_ENCODING)) {
    180         if ((vlen == 7) && (_strnicmp(value, "chunked", 7) == 0)) {
    181           chunked_ = true;
    182         } else if ((vlen == 8) && (_strnicmp(value, "identity", 8) == 0)) {
    183           chunked_ = false;
    184         } else {
    185           *error = HE_PROTOCOL;
    186           return PR_COMPLETE;
    187         }
    188       }
    189       return ProcessHeader(line, nlen, value, vlen, error);
    190     } else {
    191       state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
    192       return ProcessHeaderComplete(chunked_, data_size_, error);
    193     }
    194     break;
    195 
    196   case ST_CHUNKSIZE:
    197     if (len > 0) {
    198       char* ptr = NULL;
    199       data_size_ = strtoul(line, &ptr, 16);
    200       if (ptr != line + len) {
    201         *error = HE_PROTOCOL;
    202         return PR_COMPLETE;
    203       }
    204       state_ = (data_size_ == 0) ? ST_TRAILERS : ST_DATA;
    205     } else {
    206       *error = HE_PROTOCOL;
    207       return PR_COMPLETE;
    208     }
    209     break;
    210 
    211   case ST_CHUNKTERM:
    212     if (len > 0) {
    213       *error = HE_PROTOCOL;
    214       return PR_COMPLETE;
    215     } else {
    216       state_ = chunked_ ? ST_CHUNKSIZE : ST_DATA;
    217     }
    218     break;
    219 
    220   case ST_TRAILERS:
    221     if (len == 0) {
    222       return PR_COMPLETE;
    223     }
    224     // *error = onHttpRecvTrailer();
    225     break;
    226 
    227   default:
    228     ASSERT(false);
    229     break;
    230   }
    231 
    232   return PR_CONTINUE;
    233 }
    234 
    235 bool
    236 HttpParser::is_valid_end_of_input() const {
    237   return (state_ == ST_DATA) && (data_size_ == SIZE_UNKNOWN);
    238 }
    239 
    240 void
    241 HttpParser::complete(HttpError error) {
    242   if (state_ < ST_COMPLETE) {
    243     state_ = ST_COMPLETE;
    244     OnComplete(error);
    245   }
    246 }
    247 
    248 //////////////////////////////////////////////////////////////////////
    249 // HttpBase::DocumentStream
    250 //////////////////////////////////////////////////////////////////////
    251 
    252 class BlockingMemoryStream : public ExternalMemoryStream {
    253 public:
    254   BlockingMemoryStream(char* buffer, size_t size)
    255   : ExternalMemoryStream(buffer, size) { }
    256 
    257   virtual StreamResult DoReserve(size_t size, int* error) {
    258     return (buffer_length_ >= size) ? SR_SUCCESS : SR_BLOCK;
    259   }
    260 };
    261 
    262 class HttpBase::DocumentStream : public StreamInterface {
    263 public:
    264   DocumentStream(HttpBase* base) : base_(base), error_(HE_DEFAULT) { }
    265 
    266   virtual StreamState GetState() const {
    267     if (NULL == base_)
    268       return SS_CLOSED;
    269     if (HM_RECV == base_->mode_)
    270       return SS_OPEN;
    271     return SS_OPENING;
    272   }
    273 
    274   virtual StreamResult Read(void* buffer, size_t buffer_len,
    275                             size_t* read, int* error) {
    276     if (!base_) {
    277       if (error) *error = error_;
    278       return (HE_NONE == error_) ? SR_EOS : SR_ERROR;
    279     }
    280 
    281     if (HM_RECV != base_->mode_) {
    282       return SR_BLOCK;
    283     }
    284 
    285     // DoReceiveLoop writes http document data to the StreamInterface* document
    286     // member of HttpData.  In this case, we want this data to be written
    287     // directly to our buffer.  To accomplish this, we wrap our buffer with a
    288     // StreamInterface, and replace the existing document with our wrapper.
    289     // When the method returns, we restore the old document.  Ideally, we would
    290     // pass our StreamInterface* to DoReceiveLoop, but due to the callbacks
    291     // of HttpParser, we would still need to store the pointer temporarily.
    292     scoped_ptr<StreamInterface>
    293         stream(new BlockingMemoryStream(reinterpret_cast<char*>(buffer),
    294                                         buffer_len));
    295 
    296     // Replace the existing document with our wrapped buffer.
    297     base_->data_->document.swap(stream);
    298 
    299     // Pump the I/O loop.  DoReceiveLoop is guaranteed not to attempt to
    300     // complete the I/O process, which means that our wrapper is not in danger
    301     // of being deleted.  To ensure this, DoReceiveLoop returns true when it
    302     // wants complete to be called.  We make sure to uninstall our wrapper
    303     // before calling complete().
    304     HttpError http_error;
    305     bool complete = base_->DoReceiveLoop(&http_error);
    306 
    307     // Reinstall the original output document.
    308     base_->data_->document.swap(stream);
    309 
    310     // If we reach the end of the receive stream, we disconnect our stream
    311     // adapter from the HttpBase, and further calls to read will either return
    312     // EOS or ERROR, appropriately.  Finally, we call complete().
    313     StreamResult result = SR_BLOCK;
    314     if (complete) {
    315       HttpBase* base = Disconnect(http_error);
    316       if (error) *error = error_;
    317       result = (HE_NONE == error_) ? SR_EOS : SR_ERROR;
    318       base->complete(http_error);
    319     }
    320 
    321     // Even if we are complete, if some data was read we must return SUCCESS.
    322     // Future Reads will return EOS or ERROR based on the error_ variable.
    323     size_t position;
    324     stream->GetPosition(&position);
    325     if (position > 0) {
    326       if (read) *read = position;
    327       result = SR_SUCCESS;
    328     }
    329     return result;
    330   }
    331 
    332   virtual StreamResult Write(const void* data, size_t data_len,
    333                              size_t* written, int* error) {
    334     if (error) *error = -1;
    335     return SR_ERROR;
    336   }
    337 
    338   virtual void Close() {
    339     if (base_) {
    340       HttpBase* base = Disconnect(HE_NONE);
    341       if (HM_RECV == base->mode_ && base->http_stream_) {
    342         // Read I/O could have been stalled on the user of this DocumentStream,
    343         // so restart the I/O process now that we've removed ourselves.
    344         base->http_stream_->PostEvent(SE_READ, 0);
    345       }
    346     }
    347   }
    348 
    349   virtual bool GetAvailable(size_t* size) const {
    350     if (!base_ || HM_RECV != base_->mode_)
    351       return false;
    352     size_t data_size = base_->GetDataRemaining();
    353     if (SIZE_UNKNOWN == data_size)
    354       return false;
    355     if (size)
    356       *size = data_size;
    357     return true;
    358   }
    359 
    360   HttpBase* Disconnect(HttpError error) {
    361     ASSERT(NULL != base_);
    362     ASSERT(NULL != base_->doc_stream_);
    363     HttpBase* base = base_;
    364     base_->doc_stream_ = NULL;
    365     base_ = NULL;
    366     error_ = error;
    367     return base;
    368   }
    369 
    370 private:
    371   HttpBase* base_;
    372   HttpError error_;
    373 };
    374 
    375 //////////////////////////////////////////////////////////////////////
    376 // HttpBase
    377 //////////////////////////////////////////////////////////////////////
    378 
    379 HttpBase::HttpBase() : mode_(HM_NONE), data_(NULL), notify_(NULL),
    380                        http_stream_(NULL), doc_stream_(NULL) {
    381 }
    382 
    383 HttpBase::~HttpBase() {
    384   ASSERT(HM_NONE == mode_);
    385 }
    386 
    387 bool
    388 HttpBase::isConnected() const {
    389   return (http_stream_ != NULL) && (http_stream_->GetState() == SS_OPEN);
    390 }
    391 
    392 bool
    393 HttpBase::attach(StreamInterface* stream) {
    394   if ((mode_ != HM_NONE) || (http_stream_ != NULL) || (stream == NULL)) {
    395     ASSERT(false);
    396     return false;
    397   }
    398   http_stream_ = stream;
    399   http_stream_->SignalEvent.connect(this, &HttpBase::OnHttpStreamEvent);
    400   mode_ = (http_stream_->GetState() == SS_OPENING) ? HM_CONNECT : HM_NONE;
    401   return true;
    402 }
    403 
    404 StreamInterface*
    405 HttpBase::detach() {
    406   ASSERT(HM_NONE == mode_);
    407   if (mode_ != HM_NONE) {
    408     return NULL;
    409   }
    410   StreamInterface* stream = http_stream_;
    411   http_stream_ = NULL;
    412   if (stream) {
    413     stream->SignalEvent.disconnect(this);
    414   }
    415   return stream;
    416 }
    417 
    418 void
    419 HttpBase::send(HttpData* data) {
    420   ASSERT(HM_NONE == mode_);
    421   if (mode_ != HM_NONE) {
    422     return;
    423   } else if (!isConnected()) {
    424     OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
    425     return;
    426   }
    427 
    428   mode_ = HM_SEND;
    429   data_ = data;
    430   len_ = 0;
    431   ignore_data_ = chunk_data_ = false;
    432 
    433   if (data_->document) {
    434     data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
    435   }
    436 
    437   std::string encoding;
    438   if (data_->hasHeader(HH_TRANSFER_ENCODING, &encoding)
    439       && (encoding == "chunked")) {
    440     chunk_data_ = true;
    441   }
    442 
    443   len_ = data_->formatLeader(buffer_, sizeof(buffer_));
    444   len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
    445 
    446   header_ = data_->begin();
    447   if (header_ == data_->end()) {
    448     // We must call this at least once, in the case where there are no headers.
    449     queue_headers();
    450   }
    451 
    452   flush_data();
    453 }
    454 
    455 void
    456 HttpBase::recv(HttpData* data) {
    457   ASSERT(HM_NONE == mode_);
    458   if (mode_ != HM_NONE) {
    459     return;
    460   } else if (!isConnected()) {
    461     OnHttpStreamEvent(http_stream_, SE_CLOSE, HE_DISCONNECTED);
    462     return;
    463   }
    464 
    465   mode_ = HM_RECV;
    466   data_ = data;
    467   len_ = 0;
    468   ignore_data_ = chunk_data_ = false;
    469 
    470   reset();
    471   if (doc_stream_) {
    472     doc_stream_->SignalEvent(doc_stream_, SE_OPEN | SE_READ, 0);
    473   } else {
    474     read_and_process_data();
    475   }
    476 }
    477 
    478 void
    479 HttpBase::abort(HttpError err) {
    480   if (mode_ != HM_NONE) {
    481     if (http_stream_ != NULL) {
    482       http_stream_->Close();
    483     }
    484     do_complete(err);
    485   }
    486 }
    487 
    488 StreamInterface* HttpBase::GetDocumentStream() {
    489   if (doc_stream_)
    490     return NULL;
    491   doc_stream_ = new DocumentStream(this);
    492   return doc_stream_;
    493 }
    494 
    495 HttpError HttpBase::HandleStreamClose(int error) {
    496   if (http_stream_ != NULL) {
    497     http_stream_->Close();
    498   }
    499   if (error == 0) {
    500     if ((mode_ == HM_RECV) && is_valid_end_of_input()) {
    501       return HE_NONE;
    502     } else {
    503       return HE_DISCONNECTED;
    504     }
    505   } else if (error == SOCKET_EACCES) {
    506     return HE_AUTH;
    507   } else if (error == SEC_E_CERT_EXPIRED) {
    508     return HE_CERTIFICATE_EXPIRED;
    509   }
    510   LOG_F(LS_ERROR) << "(" << error << ")";
    511   return (HM_CONNECT == mode_) ? HE_CONNECT_FAILED : HE_SOCKET_ERROR;
    512 }
    513 
    514 bool HttpBase::DoReceiveLoop(HttpError* error) {
    515   ASSERT(HM_RECV == mode_);
    516   ASSERT(NULL != error);
    517 
    518   // Do to the latency between receiving read notifications from
    519   // pseudotcpchannel, we rely on repeated calls to read in order to acheive
    520   // ideal throughput.  The number of reads is limited to prevent starving
    521   // the caller.
    522 
    523   size_t loop_count = 0;
    524   const size_t kMaxReadCount = 20;
    525   bool process_requires_more_data = false;
    526   do {
    527     // The most frequent use of this function is response to new data available
    528     // on http_stream_.  Therefore, we optimize by attempting to read from the
    529     // network first (as opposed to processing existing data first).
    530 
    531     if (len_ < sizeof(buffer_)) {
    532       // Attempt to buffer more data.
    533       size_t read;
    534       int read_error;
    535       StreamResult read_result = http_stream_->Read(buffer_ + len_,
    536                                                     sizeof(buffer_) - len_,
    537                                                     &read, &read_error);
    538       switch (read_result) {
    539       case SR_SUCCESS:
    540         ASSERT(len_ + read <= sizeof(buffer_));
    541         len_ += read;
    542         break;
    543       case SR_BLOCK:
    544         if (process_requires_more_data) {
    545           // We're can't make progress until more data is available.
    546           return false;
    547         }
    548         // Attempt to process the data already in our buffer.
    549         break;
    550       case SR_EOS:
    551         // Clean close, with no error.  Fall through to HandleStreamClose.
    552         read_error = 0;
    553       case SR_ERROR:
    554         *error = HandleStreamClose(read_error);
    555         return true;
    556       }
    557     } else if (process_requires_more_data) {
    558       // We have too much unprocessed data in our buffer.  This should only
    559       // occur when a single HTTP header is longer than the buffer size (32K).
    560       // Anything longer than that is almost certainly an error.
    561       *error = HE_OVERFLOW;
    562       return true;
    563     }
    564 
    565     // Process data in our buffer.  Process is not guaranteed to process all
    566     // the buffered data.  In particular, it will wait until a complete
    567     // protocol element (such as http header, or chunk size) is available,
    568     // before processing it in its entirety.  Also, it is valid and sometimes
    569     // necessary to call Process with an empty buffer, since the state machine
    570     // may have interrupted state transitions to complete.
    571     size_t processed;
    572     ProcessResult process_result = Process(buffer_, len_, &processed,
    573                                             error);
    574     ASSERT(processed <= len_);
    575     len_ -= processed;
    576     memmove(buffer_, buffer_ + processed, len_);
    577     switch (process_result) {
    578     case PR_CONTINUE:
    579       // We need more data to make progress.
    580       process_requires_more_data = true;
    581       break;
    582     case PR_BLOCK:
    583       // We're stalled on writing the processed data.
    584       return false;
    585     case PR_COMPLETE:
    586       // *error already contains the correct code.
    587       return true;
    588     }
    589   } while (++loop_count <= kMaxReadCount);
    590 
    591   LOG_F(LS_WARNING) << "danger of starvation";
    592   return false;
    593 }
    594 
    595 void
    596 HttpBase::read_and_process_data() {
    597   HttpError error;
    598   if (DoReceiveLoop(&error)) {
    599     complete(error);
    600   }
    601 }
    602 
    603 void
    604 HttpBase::flush_data() {
    605   ASSERT(HM_SEND == mode_);
    606 
    607   // When send_required is true, no more buffering can occur without a network
    608   // write.
    609   bool send_required = (len_ >= sizeof(buffer_));
    610 
    611   while (true) {
    612     ASSERT(len_ <= sizeof(buffer_));
    613 
    614     // HTTP is inherently sensitive to round trip latency, since a frequent use
    615     // case is for small requests and responses to be sent back and forth, and
    616     // the lack of pipelining forces a single request to take a minimum of the
    617     // round trip time.  As a result, it is to our benefit to pack as much data
    618     // into each packet as possible.  Thus, we defer network writes until we've
    619     // buffered as much data as possible.
    620 
    621     if (!send_required && (header_ != data_->end())) {
    622       // First, attempt to queue more header data.
    623       send_required = queue_headers();
    624     }
    625 
    626     if (!send_required && data_->document) {
    627       // Next, attempt to queue document data.
    628 
    629       const size_t kChunkDigits = 8;
    630       size_t offset, reserve;
    631       if (chunk_data_) {
    632         // Reserve characters at the start for X-byte hex value and \r\n
    633         offset = len_ + kChunkDigits + 2;
    634         // ... and 2 characters at the end for \r\n
    635         reserve = offset + 2;
    636       } else {
    637         offset = len_;
    638         reserve = offset;
    639       }
    640 
    641       if (reserve >= sizeof(buffer_)) {
    642         send_required = true;
    643       } else {
    644         size_t read;
    645         int error;
    646         StreamResult result = data_->document->Read(buffer_ + offset,
    647                                                     sizeof(buffer_) - reserve,
    648                                                     &read, &error);
    649         if (result == SR_SUCCESS) {
    650           ASSERT(reserve + read <= sizeof(buffer_));
    651           if (chunk_data_) {
    652             // Prepend the chunk length in hex.
    653             // Note: sprintfn appends a null terminator, which is why we can't
    654             // combine it with the line terminator.
    655             sprintfn(buffer_ + len_, kChunkDigits + 1, "%.*x",
    656                      kChunkDigits, read);
    657             // Add line terminator to the chunk length.
    658             memcpy(buffer_ + len_ + kChunkDigits, "\r\n", 2);
    659             // Add line terminator to the end of the chunk.
    660             memcpy(buffer_ + offset + read, "\r\n", 2);
    661           }
    662           len_ = reserve + read;
    663         } else if (result == SR_BLOCK) {
    664           // Nothing to do but flush data to the network.
    665           send_required = true;
    666         } else if (result == SR_EOS) {
    667           if (chunk_data_) {
    668             // Append the empty chunk and empty trailers, then turn off
    669             // chunking.
    670             ASSERT(len_ + 5 <= sizeof(buffer_));
    671             memcpy(buffer_ + len_, "0\r\n\r\n", 5);
    672             len_ += 5;
    673             chunk_data_ = false;
    674           } else if (0 == len_) {
    675             // No more data to read, and no more data to write.
    676             do_complete();
    677             return;
    678           }
    679           // Although we are done reading data, there is still data which needs
    680           // to be flushed to the network.
    681           send_required = true;
    682         } else {
    683           LOG_F(LS_ERROR) << "Read error: " << error;
    684           do_complete(HE_STREAM);
    685           return;
    686         }
    687       }
    688     }
    689 
    690     if (0 == len_) {
    691       // No data currently available to send.
    692       if (!data_->document) {
    693         // If there is no source document, that means we're done.
    694         do_complete();
    695       }
    696       return;
    697     }
    698 
    699     size_t written;
    700     int error;
    701     StreamResult result = http_stream_->Write(buffer_, len_, &written, &error);
    702     if (result == SR_SUCCESS) {
    703       ASSERT(written <= len_);
    704       len_ -= written;
    705       memmove(buffer_, buffer_ + written, len_);
    706       send_required = false;
    707     } else if (result == SR_BLOCK) {
    708       if (send_required) {
    709         // Nothing more we can do until network is writeable.
    710         return;
    711       }
    712     } else {
    713       ASSERT(result == SR_ERROR);
    714       LOG_F(LS_ERROR) << "error";
    715       OnHttpStreamEvent(http_stream_, SE_CLOSE, error);
    716       return;
    717     }
    718   }
    719 
    720   ASSERT(false);
    721 }
    722 
    723 bool
    724 HttpBase::queue_headers() {
    725   ASSERT(HM_SEND == mode_);
    726   while (header_ != data_->end()) {
    727     size_t len = sprintfn(buffer_ + len_, sizeof(buffer_) - len_,
    728                           "%.*s: %.*s\r\n",
    729                           header_->first.size(), header_->first.data(),
    730                           header_->second.size(), header_->second.data());
    731     if (len_ + len < sizeof(buffer_) - 3) {
    732       len_ += len;
    733       ++header_;
    734     } else if (len_ == 0) {
    735       LOG(WARNING) << "discarding header that is too long: " << header_->first;
    736       ++header_;
    737     } else {
    738       // Not enough room for the next header, write to network first.
    739       return true;
    740     }
    741   }
    742   // End of headers
    743   len_ += strcpyn(buffer_ + len_, sizeof(buffer_) - len_, "\r\n");
    744   return false;
    745 }
    746 
    747 void
    748 HttpBase::do_complete(HttpError err) {
    749   ASSERT(mode_ != HM_NONE);
    750   HttpMode mode = mode_;
    751   mode_ = HM_NONE;
    752   if (data_ && data_->document) {
    753     data_->document->SignalEvent.disconnect(this);
    754   }
    755   data_ = NULL;
    756   if ((HM_RECV == mode) && doc_stream_) {
    757     ASSERT(HE_NONE != err);  // We should have Disconnected doc_stream_ already.
    758     DocumentStream* ds = doc_stream_;
    759     ds->Disconnect(err);
    760     ds->SignalEvent(ds, SE_CLOSE, err);
    761   }
    762   if (notify_) {
    763     notify_->onHttpComplete(mode, err);
    764   }
    765 }
    766 
    767 //
    768 // Stream Signals
    769 //
    770 
    771 void
    772 HttpBase::OnHttpStreamEvent(StreamInterface* stream, int events, int error) {
    773   ASSERT(stream == http_stream_);
    774   if ((events & SE_OPEN) && (mode_ == HM_CONNECT)) {
    775     do_complete();
    776     return;
    777   }
    778 
    779   if ((events & SE_WRITE) && (mode_ == HM_SEND)) {
    780     flush_data();
    781     return;
    782   }
    783 
    784   if ((events & SE_READ) && (mode_ == HM_RECV)) {
    785     if (doc_stream_) {
    786       doc_stream_->SignalEvent(doc_stream_, SE_READ, 0);
    787     } else {
    788       read_and_process_data();
    789     }
    790     return;
    791   }
    792 
    793   if ((events & SE_CLOSE) == 0)
    794     return;
    795 
    796   HttpError http_error = HandleStreamClose(error);
    797   if (mode_ == HM_RECV) {
    798     complete(http_error);
    799   } else if (mode_ != HM_NONE) {
    800     do_complete(http_error);
    801   } else if (notify_) {
    802     notify_->onHttpClosed(http_error);
    803   }
    804 }
    805 
    806 void
    807 HttpBase::OnDocumentEvent(StreamInterface* stream, int events, int error) {
    808   ASSERT(stream == data_->document.get());
    809   if ((events & SE_WRITE) && (mode_ == HM_RECV)) {
    810     read_and_process_data();
    811     return;
    812   }
    813 
    814   if ((events & SE_READ) && (mode_ == HM_SEND)) {
    815     flush_data();
    816     return;
    817   }
    818 
    819   if (events & SE_CLOSE) {
    820     LOG_F(LS_ERROR) << "Read error: " << error;
    821     do_complete(HE_STREAM);
    822     return;
    823   }
    824 }
    825 
    826 //
    827 // HttpParser Implementation
    828 //
    829 
    830 HttpParser::ProcessResult
    831 HttpBase::ProcessLeader(const char* line, size_t len, HttpError* error) {
    832   *error = data_->parseLeader(line, len);
    833   return (HE_NONE == *error) ? PR_CONTINUE : PR_COMPLETE;
    834 }
    835 
    836 HttpParser::ProcessResult
    837 HttpBase::ProcessHeader(const char* name, size_t nlen, const char* value,
    838                         size_t vlen, HttpError* error) {
    839   std::string sname(name, nlen), svalue(value, vlen);
    840   data_->addHeader(sname, svalue);
    841   return PR_CONTINUE;
    842 }
    843 
    844 HttpParser::ProcessResult
    845 HttpBase::ProcessHeaderComplete(bool chunked, size_t& data_size,
    846                                 HttpError* error) {
    847   StreamInterface* old_docstream = doc_stream_;
    848   if (notify_) {
    849     *error = notify_->onHttpHeaderComplete(chunked, data_size);
    850     // The request must not be aborted as a result of this callback.
    851     ASSERT(NULL != data_);
    852   }
    853   if ((HE_NONE == *error) && data_->document) {
    854     data_->document->SignalEvent.connect(this, &HttpBase::OnDocumentEvent);
    855   }
    856   if (HE_NONE != *error) {
    857     return PR_COMPLETE;
    858   }
    859   if (old_docstream != doc_stream_) {
    860     // Break out of Process loop, since our I/O model just changed.
    861     return PR_BLOCK;
    862   }
    863   return PR_CONTINUE;
    864 }
    865 
    866 HttpParser::ProcessResult
    867 HttpBase::ProcessData(const char* data, size_t len, size_t& read,
    868                       HttpError* error) {
    869   if (ignore_data_ || !data_->document) {
    870     read = len;
    871     return PR_CONTINUE;
    872   }
    873   int write_error = 0;
    874   switch (data_->document->Write(data, len, &read, &write_error)) {
    875   case SR_SUCCESS:
    876     return PR_CONTINUE;
    877   case SR_BLOCK:
    878     return PR_BLOCK;
    879   case SR_EOS:
    880     LOG_F(LS_ERROR) << "Unexpected EOS";
    881     *error = HE_STREAM;
    882     return PR_COMPLETE;
    883   case SR_ERROR:
    884   default:
    885     LOG_F(LS_ERROR) << "Write error: " << write_error;
    886     *error = HE_STREAM;
    887     return PR_COMPLETE;
    888   }
    889 }
    890 
    891 void
    892 HttpBase::OnComplete(HttpError err) {
    893   LOG_F(LS_VERBOSE);
    894   do_complete(err);
    895 }
    896 
    897 } // namespace talk_base
    898