1 // Copyright (c) 2009 The Chromium Authors. All rights reserved. 2 // Use of this source code is governed by a BSD-style license that can be 3 // found in the LICENSE file. 4 5 #include "base/scoped_ptr.h" 6 #include "base/stats_counters.h" 7 8 #include "flip_framer.h" // cross-google3 directory naming. 9 #include "flip_frame_builder.h" 10 #include "flip_bitmasks.h" 11 12 #if defined(USE_SYSTEM_ZLIB) 13 #include <zlib.h> 14 #else 15 #include "third_party/zlib/zlib.h" 16 #endif 17 18 namespace flip { 19 20 // The initial size of the control frame buffer; this is used internally 21 // as we parse through control frames. 22 static const size_t kControlFrameBufferInitialSize = 32 * 1024; 23 // The maximum size of the control frame buffer that we support. 24 // TODO(mbelshe): We should make this stream-based so there are no limits. 25 static const size_t kControlFrameBufferMaxSize = 64 * 1024; 26 27 // By default is compression on or off. 28 bool FlipFramer::compression_default_ = true; 29 30 #ifdef DEBUG_FLIP_STATE_CHANGES 31 #define CHANGE_STATE(newstate) \ 32 { \ 33 do { \ 34 LOG(INFO) << "Changing state from: " \ 35 << StateToString(state_) \ 36 << " to " << StateToString(newstate) << "\n"; \ 37 state_ = newstate; \ 38 } while (false); \ 39 } 40 #else 41 #define CHANGE_STATE(newstate) (state_ = newstate) 42 #endif 43 44 FlipFramer::FlipFramer() 45 : state_(FLIP_RESET), 46 error_code_(FLIP_NO_ERROR), 47 remaining_payload_(0), 48 remaining_control_payload_(0), 49 current_frame_buffer_(NULL), 50 current_frame_len_(0), 51 current_frame_capacity_(0), 52 enable_compression_(compression_default_), 53 visitor_(NULL) { 54 } 55 56 FlipFramer::~FlipFramer() { 57 if (compressor_.get()) { 58 deflateEnd(compressor_.get()); 59 } 60 if (decompressor_.get()) { 61 inflateEnd(decompressor_.get()); 62 } 63 delete [] current_frame_buffer_; 64 } 65 66 void FlipFramer::Reset() { 67 state_ = FLIP_RESET; 68 error_code_ = FLIP_NO_ERROR; 69 remaining_payload_ = 0; 70 remaining_control_payload_ = 0; 71 current_frame_len_ = 0; 72 if (current_frame_capacity_ != kControlFrameBufferInitialSize) { 73 delete [] current_frame_buffer_; 74 current_frame_buffer_ = 0; 75 current_frame_capacity_ = 0; 76 ExpandControlFrameBuffer(kControlFrameBufferInitialSize); 77 } 78 } 79 80 const char* FlipFramer::StateToString(int state) { 81 switch (state) { 82 case FLIP_ERROR: 83 return "ERROR"; 84 case FLIP_DONE: 85 return "DONE"; 86 case FLIP_AUTO_RESET: 87 return "AUTO_RESET"; 88 case FLIP_RESET: 89 return "RESET"; 90 case FLIP_READING_COMMON_HEADER: 91 return "READING_COMMON_HEADER"; 92 case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: 93 return "INTERPRET_CONTROL_FRAME_COMMON_HEADER"; 94 case FLIP_CONTROL_FRAME_PAYLOAD: 95 return "CONTROL_FRAME_PAYLOAD"; 96 case FLIP_IGNORE_REMAINING_PAYLOAD: 97 return "IGNORE_REMAINING_PAYLOAD"; 98 case FLIP_FORWARD_STREAM_FRAME: 99 return "FORWARD_STREAM_FRAME"; 100 } 101 return "UNKNOWN_STATE"; 102 } 103 104 size_t FlipFramer::BytesSafeToRead() const { 105 switch (state_) { 106 case FLIP_ERROR: 107 case FLIP_DONE: 108 case FLIP_AUTO_RESET: 109 case FLIP_RESET: 110 return 0; 111 case FLIP_READING_COMMON_HEADER: 112 DCHECK(current_frame_len_ < FlipFrame::size()); 113 return FlipFrame::size() - current_frame_len_; 114 case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: 115 return 0; 116 case FLIP_CONTROL_FRAME_PAYLOAD: 117 case FLIP_IGNORE_REMAINING_PAYLOAD: 118 case FLIP_FORWARD_STREAM_FRAME: 119 return remaining_payload_; 120 } 121 // We should never get to here. 122 return 0; 123 } 124 125 void FlipFramer::set_error(FlipError error) { 126 DCHECK(visitor_); 127 error_code_ = error; 128 CHANGE_STATE(FLIP_ERROR); 129 visitor_->OnError(this); 130 } 131 132 const char* FlipFramer::ErrorCodeToString(int error_code) { 133 switch (error_code) { 134 case FLIP_NO_ERROR: 135 return "NO_ERROR"; 136 case FLIP_UNKNOWN_CONTROL_TYPE: 137 return "UNKNOWN_CONTROL_TYPE"; 138 case FLIP_INVALID_CONTROL_FRAME: 139 return "INVALID_CONTROL_FRAME"; 140 case FLIP_CONTROL_PAYLOAD_TOO_LARGE: 141 return "CONTROL_PAYLOAD_TOO_LARGE"; 142 case FLIP_ZLIB_INIT_FAILURE: 143 return "ZLIB_INIT_FAILURE"; 144 case FLIP_UNSUPPORTED_VERSION: 145 return "UNSUPPORTED_VERSION"; 146 case FLIP_DECOMPRESS_FAILURE: 147 return "DECOMPRESS_FAILURE"; 148 } 149 return "UNKNOWN_STATE"; 150 } 151 152 size_t FlipFramer::ProcessInput(const char* data, size_t len) { 153 DCHECK(visitor_); 154 DCHECK(data); 155 156 size_t original_len = len; 157 while (len != 0) { 158 switch (state_) { 159 case FLIP_ERROR: 160 case FLIP_DONE: 161 goto bottom; 162 163 case FLIP_AUTO_RESET: 164 case FLIP_RESET: 165 Reset(); 166 CHANGE_STATE(FLIP_READING_COMMON_HEADER); 167 continue; 168 169 case FLIP_READING_COMMON_HEADER: { 170 int bytes_read = ProcessCommonHeader(data, len); 171 len -= bytes_read; 172 data += bytes_read; 173 continue; 174 } 175 176 // Arguably, this case is not necessary, as no bytes are consumed here. 177 // I felt it was a nice partitioning, however (which probably indicates 178 // that it should be refactored into its own function!) 179 case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER: 180 ProcessControlFrameHeader(); 181 continue; 182 183 case FLIP_CONTROL_FRAME_PAYLOAD: { 184 int bytes_read = ProcessControlFramePayload(data, len); 185 len -= bytes_read; 186 data += bytes_read; 187 } 188 // intentional fallthrough 189 case FLIP_IGNORE_REMAINING_PAYLOAD: 190 // control frame has too-large payload 191 // intentional fallthrough 192 case FLIP_FORWARD_STREAM_FRAME: { 193 int bytes_read = ProcessDataFramePayload(data, len); 194 len -= bytes_read; 195 data += bytes_read; 196 continue; 197 } 198 default: 199 break; 200 } 201 } 202 bottom: 203 return original_len - len; 204 } 205 206 size_t FlipFramer::ProcessCommonHeader(const char* data, size_t len) { 207 // This should only be called when we're in the FLIP_READING_COMMON_HEADER 208 // state. 209 DCHECK(state_ == FLIP_READING_COMMON_HEADER); 210 211 int original_len = len; 212 FlipFrame current_frame(current_frame_buffer_, false); 213 214 do { 215 if (current_frame_len_ < FlipFrame::size()) { 216 size_t bytes_desired = FlipFrame::size() - current_frame_len_; 217 size_t bytes_to_append = std::min(bytes_desired, len); 218 char* header_buffer = current_frame_buffer_; 219 memcpy(&header_buffer[current_frame_len_], data, bytes_to_append); 220 current_frame_len_ += bytes_to_append; 221 data += bytes_to_append; 222 len -= bytes_to_append; 223 // Check for an empty data frame. 224 if (current_frame_len_ == FlipFrame::size() && 225 !current_frame.is_control_frame() && 226 current_frame.length() == 0) { 227 if (current_frame.flags() & CONTROL_FLAG_FIN) { 228 FlipDataFrame data_frame(current_frame_buffer_, false); 229 visitor_->OnStreamFrameData(data_frame.stream_id(), NULL, 0); 230 } 231 CHANGE_STATE(FLIP_RESET); 232 } 233 break; 234 } 235 remaining_payload_ = current_frame.length(); 236 237 // This is just a sanity check for help debugging early frame errors. 238 if (remaining_payload_ > 1000000u) { 239 LOG(ERROR) << 240 "Unexpectedly large frame. Flip session is likely corrupt."; 241 } 242 243 // if we're here, then we have the common header all received. 244 if (!current_frame.is_control_frame()) 245 CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME); 246 else 247 CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER); 248 } while (false); 249 250 return original_len - len; 251 } 252 253 void FlipFramer::ProcessControlFrameHeader() { 254 DCHECK_EQ(FLIP_NO_ERROR, error_code_); 255 DCHECK_LE(FlipFrame::size(), current_frame_len_); 256 FlipControlFrame current_control_frame(current_frame_buffer_, false); 257 // Do some sanity checking on the control frame sizes. 258 switch (current_control_frame.type()) { 259 case SYN_STREAM: 260 if (current_control_frame.length() < 261 FlipSynStreamControlFrame::size() - FlipControlFrame::size()) 262 set_error(FLIP_INVALID_CONTROL_FRAME); 263 break; 264 case SYN_REPLY: 265 if (current_control_frame.length() < 266 FlipSynReplyControlFrame::size() - FlipControlFrame::size()) 267 set_error(FLIP_INVALID_CONTROL_FRAME); 268 break; 269 case FIN_STREAM: 270 if (current_control_frame.length() != 271 FlipFinStreamControlFrame::size() - FlipFrame::size()) 272 set_error(FLIP_INVALID_CONTROL_FRAME); 273 break; 274 case NOOP: 275 // NOOP. Swallow it. 276 CHANGE_STATE(FLIP_AUTO_RESET); 277 return; 278 default: 279 set_error(FLIP_UNKNOWN_CONTROL_TYPE); 280 break; 281 } 282 283 // We only support version 1 of this protocol. 284 if (current_control_frame.version() != kFlipProtocolVersion) 285 set_error(FLIP_UNSUPPORTED_VERSION); 286 287 remaining_control_payload_ = current_control_frame.length(); 288 if (remaining_control_payload_ > kControlFrameBufferMaxSize) 289 set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE); 290 291 if (error_code_) 292 return; 293 294 ExpandControlFrameBuffer(remaining_control_payload_); 295 CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD); 296 } 297 298 size_t FlipFramer::ProcessControlFramePayload(const char* data, size_t len) { 299 size_t original_len = len; 300 do { 301 if (remaining_control_payload_) { 302 size_t amount_to_consume = std::min(remaining_control_payload_, len); 303 memcpy(¤t_frame_buffer_[current_frame_len_], data, 304 amount_to_consume); 305 current_frame_len_ += amount_to_consume; 306 data += amount_to_consume; 307 len -= amount_to_consume; 308 remaining_control_payload_ -= amount_to_consume; 309 remaining_payload_ -= amount_to_consume; 310 if (remaining_control_payload_) 311 break; 312 } 313 FlipControlFrame control_frame(current_frame_buffer_, false); 314 visitor_->OnControl(&control_frame); 315 316 // If this is a FIN, tell the caller. 317 if (control_frame.type() == SYN_REPLY && 318 control_frame.flags() & CONTROL_FLAG_FIN) 319 visitor_->OnStreamFrameData(control_frame.stream_id(), NULL, 0); 320 321 CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD); 322 } while (false); 323 return original_len - len; 324 } 325 326 size_t FlipFramer::ProcessDataFramePayload(const char* data, size_t len) { 327 size_t original_len = len; 328 329 FlipDataFrame current_data_frame(current_frame_buffer_, false); 330 if (remaining_payload_) { 331 size_t amount_to_forward = std::min(remaining_payload_, len); 332 if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) { 333 if (current_data_frame.flags() & DATA_FLAG_COMPRESSED) { 334 // TODO(mbelshe): Assert that the decompressor is init'ed. 335 if (!InitializeDecompressor()) 336 return NULL; 337 338 size_t decompressed_max_size = amount_to_forward * 100; 339 scoped_ptr<char> decompressed(new char[decompressed_max_size]); 340 decompressor_->next_in = reinterpret_cast<Bytef*>( 341 const_cast<char*>(data)); 342 decompressor_->avail_in = amount_to_forward; 343 decompressor_->next_out = 344 reinterpret_cast<Bytef*>(decompressed.get()); 345 decompressor_->avail_out = decompressed_max_size; 346 347 int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); 348 if (rv != Z_OK) { 349 set_error(FLIP_DECOMPRESS_FAILURE); 350 return 0; 351 } 352 size_t decompressed_size = decompressed_max_size - 353 decompressor_->avail_out; 354 // Only inform the visitor if there is data. 355 if (decompressed_size) 356 visitor_->OnStreamFrameData(current_data_frame.stream_id(), 357 decompressed.get(), 358 decompressed_size); 359 amount_to_forward -= decompressor_->avail_in; 360 } else { 361 // The data frame was not compressed. 362 // Only inform the visitor if there is data. 363 if (amount_to_forward) 364 visitor_->OnStreamFrameData(current_data_frame.stream_id(), 365 data, amount_to_forward); 366 } 367 } 368 data += amount_to_forward; 369 len -= amount_to_forward; 370 remaining_payload_ -= amount_to_forward; 371 372 // If the FIN flag is set, and there is no more data in this data 373 // frame, inform the visitor of EOF via a 0-length data frame. 374 if (!remaining_payload_ && 375 current_data_frame.flags() & DATA_FLAG_FIN) 376 visitor_->OnStreamFrameData(current_data_frame.stream_id(), NULL, 377 0); 378 } else { 379 CHANGE_STATE(FLIP_AUTO_RESET); 380 } 381 return original_len - len; 382 } 383 384 void FlipFramer::ExpandControlFrameBuffer(size_t size) { 385 DCHECK(size < kControlFrameBufferMaxSize); 386 if (size < current_frame_capacity_) 387 return; 388 389 int alloc_size = size + FlipFrame::size(); 390 char* new_buffer = new char[alloc_size]; 391 memcpy(new_buffer, current_frame_buffer_, current_frame_len_); 392 current_frame_capacity_ = alloc_size; 393 current_frame_buffer_ = new_buffer; 394 } 395 396 bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame, 397 FlipHeaderBlock* block) { 398 FlipControlFrame control_frame(frame->data(), false); 399 uint32 type = control_frame.type(); 400 if (type != SYN_STREAM && type != SYN_REPLY) 401 return false; 402 403 // Find the header data within the control frame. 404 scoped_ptr<FlipFrame> decompressed_frame(DecompressFrame(frame)); 405 if (!decompressed_frame.get()) 406 return false; 407 FlipSynStreamControlFrame syn_frame(decompressed_frame->data(), false); 408 const char *header_data = syn_frame.header_block(); 409 int header_length = syn_frame.header_block_len(); 410 411 FlipFrameBuilder builder(header_data, header_length); 412 void* iter = NULL; 413 uint16 num_headers; 414 if (builder.ReadUInt16(&iter, &num_headers)) { 415 for (int index = 0; index < num_headers; ++index) { 416 std::string name; 417 std::string value; 418 if (!builder.ReadString(&iter, &name)) 419 break; 420 if (!builder.ReadString(&iter, &value)) 421 break; 422 if (block->find(name) == block->end()) { 423 (*block)[name] = value; 424 } else { 425 return false; 426 } 427 } 428 return true; 429 } 430 return false; 431 } 432 433 FlipSynStreamControlFrame* FlipFramer::CreateSynStream( 434 FlipStreamId stream_id, int priority, FlipControlFlags flags, 435 bool compressed, FlipHeaderBlock* headers) { 436 FlipFrameBuilder frame; 437 438 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); 439 frame.WriteUInt16(SYN_STREAM); 440 frame.WriteUInt32(0); // Placeholder for the length and flags 441 frame.WriteUInt32(stream_id); 442 frame.WriteUInt16(ntohs(priority) << 6); // Priority. 443 444 frame.WriteUInt16(headers->size()); // Number of headers. 445 FlipHeaderBlock::iterator it; 446 for (it = headers->begin(); it != headers->end(); ++it) { 447 frame.WriteString(it->first); 448 frame.WriteString(it->second); 449 } 450 451 // Write the length and flags. 452 size_t length = frame.length() - FlipFrame::size(); 453 DCHECK(length < static_cast<size_t>(kLengthMask)); 454 FlagsAndLength flags_length; 455 flags_length.length_ = htonl(static_cast<uint32>(length)); 456 flags_length.flags_[0] = flags; 457 frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length)); 458 459 scoped_ptr<FlipFrame> syn_frame(frame.take()); 460 if (compressed) { 461 return reinterpret_cast<FlipSynStreamControlFrame*>( 462 CompressFrame(syn_frame.get())); 463 } 464 return reinterpret_cast<FlipSynStreamControlFrame*>(syn_frame.release()); 465 } 466 467 /* static */ 468 FlipFinStreamControlFrame* FlipFramer::CreateFinStream(FlipStreamId stream_id, 469 int status) { 470 FlipFrameBuilder frame; 471 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); 472 frame.WriteUInt16(FIN_STREAM); 473 frame.WriteUInt32(8); 474 frame.WriteUInt32(stream_id); 475 frame.WriteUInt32(status); 476 return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take()); 477 } 478 479 FlipSynReplyControlFrame* FlipFramer::CreateSynReply(FlipStreamId stream_id, 480 FlipControlFlags flags, bool compressed, FlipHeaderBlock* headers) { 481 482 FlipFrameBuilder frame; 483 484 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); 485 frame.WriteUInt16(SYN_REPLY); 486 frame.WriteUInt32(0); // Placeholder for the length and flags. 487 frame.WriteUInt32(stream_id); 488 frame.WriteUInt16(0); // Unused 489 490 frame.WriteUInt16(headers->size()); // Number of headers. 491 FlipHeaderBlock::iterator it; 492 for (it = headers->begin(); it != headers->end(); ++it) { 493 // TODO(mbelshe): Headers need to be sorted. 494 frame.WriteString(it->first); 495 frame.WriteString(it->second); 496 } 497 498 // Write the length 499 size_t length = frame.length() - FlipFrame::size(); 500 DCHECK(length < static_cast<size_t>(kLengthMask)); 501 FlagsAndLength flags_length; 502 flags_length.length_ = htonl(static_cast<uint32>(length)); 503 flags_length.flags_[0] = flags; 504 frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length)); 505 506 scoped_ptr<FlipFrame> reply_frame(frame.take()); 507 if (compressed) { 508 return reinterpret_cast<FlipSynReplyControlFrame*>( 509 CompressFrame(reply_frame.get())); 510 } 511 return reinterpret_cast<FlipSynReplyControlFrame*>(reply_frame.release()); 512 } 513 514 FlipDataFrame* FlipFramer::CreateDataFrame(FlipStreamId stream_id, 515 const char* data, 516 uint32 len, FlipDataFlags flags) { 517 FlipFrameBuilder frame; 518 519 frame.WriteUInt32(stream_id); 520 521 DCHECK(len < static_cast<size_t>(kLengthMask)); 522 FlagsAndLength flags_length; 523 flags_length.length_ = htonl(len); 524 flags_length.flags_[0] = flags; 525 frame.WriteBytes(&flags_length, sizeof(flags_length)); 526 527 frame.WriteBytes(data, len); 528 scoped_ptr<FlipFrame> data_frame(frame.take()); 529 if (flags & DATA_FLAG_COMPRESSED) 530 return reinterpret_cast<FlipDataFrame*>(CompressFrame(data_frame.get())); 531 return reinterpret_cast<FlipDataFrame*>(data_frame.release()); 532 } 533 534 /* static */ 535 FlipControlFrame* FlipFramer::CreateNopFrame() { 536 FlipFrameBuilder frame; 537 frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion); 538 frame.WriteUInt16(NOOP); 539 frame.WriteUInt32(0); 540 return reinterpret_cast<FlipControlFrame*>(frame.take()); 541 } 542 543 static const int kCompressorLevel = Z_DEFAULT_COMPRESSION; 544 // This is just a hacked dictionary to use for shrinking HTTP-like headers. 545 // TODO(mbelshe): Use a scientific methodology for computing the dictionary. 546 static const char dictionary[] = 547 "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-" 548 "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi" 549 "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser" 550 "-agent10010120020120220320420520630030130230330430530630740040140240340440" 551 "5406407408409410411412413414415416417500501502503504505accept-rangesageeta" 552 "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic" 553 "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran" 554 "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati" 555 "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo" 556 "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe" 557 "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic" 558 "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1" 559 ".1statusversionurl"; 560 static uLong dictionary_id = 0; 561 562 bool FlipFramer::InitializeCompressor() { 563 if (compressor_.get()) 564 return true; // Already initialized. 565 566 compressor_.reset(new z_stream); 567 memset(compressor_.get(), 0, sizeof(z_stream)); 568 569 int success = deflateInit(compressor_.get(), kCompressorLevel); 570 if (success == Z_OK) 571 success = deflateSetDictionary(compressor_.get(), 572 reinterpret_cast<const Bytef*>(dictionary), 573 sizeof(dictionary)); 574 if (success != Z_OK) 575 compressor_.reset(NULL); 576 return success == Z_OK; 577 } 578 579 bool FlipFramer::InitializeDecompressor() { 580 if (decompressor_.get()) 581 return true; // Already initialized. 582 583 decompressor_.reset(new z_stream); 584 memset(decompressor_.get(), 0, sizeof(z_stream)); 585 586 // Compute the id of our dictionary so that we know we're using the 587 // right one when asked for it. 588 if (dictionary_id == 0) { 589 dictionary_id = adler32(0L, Z_NULL, 0); 590 dictionary_id = adler32(dictionary_id, 591 reinterpret_cast<const Bytef*>(dictionary), 592 sizeof(dictionary)); 593 } 594 595 int success = inflateInit(decompressor_.get()); 596 if (success != Z_OK) 597 decompressor_.reset(NULL); 598 return success == Z_OK; 599 } 600 601 bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame, 602 int* payload_length, 603 int* header_length, 604 const char** payload) const { 605 if (frame->is_control_frame()) { 606 const FlipControlFrame* control_frame = 607 reinterpret_cast<const FlipControlFrame*>(frame); 608 switch (control_frame->type()) { 609 case SYN_STREAM: 610 case SYN_REPLY: 611 { 612 const FlipSynStreamControlFrame *syn_frame = 613 reinterpret_cast<const FlipSynStreamControlFrame*>(frame); 614 *payload_length = syn_frame->header_block_len(); 615 *header_length = syn_frame->size(); 616 *payload = frame->data() + *header_length; 617 } 618 break; 619 default: 620 // TODO(mbelshe): set an error? 621 return false; // We can't compress this frame! 622 } 623 } else { 624 *header_length = FlipFrame::size(); 625 *payload_length = frame->length(); 626 *payload = frame->data() + FlipFrame::size(); 627 } 628 DCHECK(static_cast<size_t>(*header_length) <= 629 FlipFrame::size() + *payload_length); 630 return true; 631 } 632 633 634 FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) { 635 int payload_length; 636 int header_length; 637 const char* payload; 638 639 static StatsCounter pre_compress_bytes("flip.PreCompressSize"); 640 static StatsCounter post_compress_bytes("flip.PostCompressSize"); 641 642 if (!enable_compression_) 643 return DuplicateFrame(frame); 644 645 if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) 646 return NULL; 647 648 if (!InitializeCompressor()) 649 return NULL; 650 651 // TODO(mbelshe): Should we have a zlib header like what http servers do? 652 653 // Create an output frame. 654 int compressed_max_size = deflateBound(compressor_.get(), payload_length); 655 int new_frame_size = header_length + compressed_max_size; 656 FlipFrame* new_frame = new FlipFrame(new_frame_size); 657 memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size()); 658 659 compressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload)); 660 compressor_->avail_in = payload_length; 661 compressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) + 662 header_length; 663 compressor_->avail_out = compressed_max_size; 664 665 // Data packets have a 'compressed flag 666 if (!new_frame->is_control_frame()) { 667 FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame); 668 data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED); 669 } 670 671 int rv = deflate(compressor_.get(), Z_SYNC_FLUSH); 672 if (rv != Z_OK) { // How can we know that it compressed everything? 673 // This shouldn't happen, right? 674 delete new_frame; 675 return NULL; 676 } 677 678 int compressed_size = compressed_max_size - compressor_->avail_out; 679 new_frame->set_length(header_length + compressed_size - FlipFrame::size()); 680 681 pre_compress_bytes.Add(payload_length); 682 post_compress_bytes.Add(new_frame->length()); 683 684 return new_frame; 685 } 686 687 FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) { 688 int payload_length; 689 int header_length; 690 const char* payload; 691 692 static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize"); 693 static StatsCounter post_decompress_bytes("flip.PostDeCompressSize"); 694 695 if (!enable_compression_) 696 return DuplicateFrame(frame); 697 698 if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload)) 699 return NULL; 700 701 if (!frame->is_control_frame()) { 702 const FlipDataFrame* data_frame = 703 reinterpret_cast<const FlipDataFrame*>(frame); 704 if ((data_frame->flags() & DATA_FLAG_COMPRESSED) == 0) 705 return DuplicateFrame(frame); 706 } 707 708 if (!InitializeDecompressor()) 709 return NULL; 710 711 // TODO(mbelshe): Should we have a zlib header like what http servers do? 712 713 // Create an output frame. Assume it does not need to be longer than 714 // the input data. 715 int decompressed_max_size = kControlFrameBufferInitialSize; 716 int new_frame_size = header_length + decompressed_max_size; 717 FlipFrame* new_frame = new FlipFrame(new_frame_size); 718 memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size()); 719 720 decompressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload)); 721 decompressor_->avail_in = payload_length; 722 decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) + 723 header_length; 724 decompressor_->avail_out = decompressed_max_size; 725 726 int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); 727 if (rv == Z_NEED_DICT) { 728 // Need to try again with the right dictionary. 729 if (decompressor_->adler == dictionary_id) { 730 rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary, 731 sizeof(dictionary)); 732 if (rv == Z_OK) 733 rv = inflate(decompressor_.get(), Z_SYNC_FLUSH); 734 } 735 } 736 if (rv != Z_OK) { // How can we know that it decompressed everything? 737 delete new_frame; 738 return NULL; 739 } 740 741 // Unset the compressed flag for data frames. 742 if (!new_frame->is_control_frame()) { 743 FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame); 744 data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED); 745 } 746 747 int decompressed_size = decompressed_max_size - decompressor_->avail_out; 748 new_frame->set_length(header_length + decompressed_size - FlipFrame::size()); 749 750 pre_decompress_bytes.Add(frame->length()); 751 post_decompress_bytes.Add(new_frame->length()); 752 753 return new_frame; 754 } 755 756 FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) { 757 int size = FlipFrame::size() + frame->length(); 758 FlipFrame* new_frame = new FlipFrame(size); 759 memcpy(new_frame->data(), frame->data(), size); 760 return new_frame; 761 } 762 763 void FlipFramer::set_enable_compression(bool value) { 764 enable_compression_ = value; 765 } 766 767 void FlipFramer::set_enable_compression_default(bool value) { 768 compression_default_ = value; 769 } 770 771 } // namespace flip 772 773