Home | History | Annotate | Download | only in flip
      1 // Copyright (c) 2009 The Chromium Authors. All rights reserved.
      2 // Use of this source code is governed by a BSD-style license that can be
      3 // found in the LICENSE file.
      4 
      5 #include "base/scoped_ptr.h"
      6 #include "base/stats_counters.h"
      7 
      8 #include "flip_framer.h"  // cross-google3 directory naming.
      9 #include "flip_frame_builder.h"
     10 #include "flip_bitmasks.h"
     11 
     12 #if defined(USE_SYSTEM_ZLIB)
     13 #include <zlib.h>
     14 #else
     15 #include "third_party/zlib/zlib.h"
     16 #endif
     17 
     18 namespace flip {
     19 
     20 // The initial size of the control frame buffer; this is used internally
     21 // as we parse through control frames.
     22 static const size_t kControlFrameBufferInitialSize = 32 * 1024;
     23 // The maximum size of the control frame buffer that we support.
     24 // TODO(mbelshe): We should make this stream-based so there are no limits.
     25 static const size_t kControlFrameBufferMaxSize = 64 * 1024;
     26 
     27 // By default is compression on or off.
     28 bool FlipFramer::compression_default_ = true;
     29 
     30 #ifdef DEBUG_FLIP_STATE_CHANGES
     31 #define CHANGE_STATE(newstate) \
     32 { \
     33   do { \
     34     LOG(INFO) << "Changing state from: " \
     35       << StateToString(state_) \
     36       << " to " << StateToString(newstate) << "\n"; \
     37     state_ = newstate; \
     38   } while (false); \
     39 }
     40 #else
     41 #define CHANGE_STATE(newstate) (state_ = newstate)
     42 #endif
     43 
     44 FlipFramer::FlipFramer()
     45     : state_(FLIP_RESET),
     46       error_code_(FLIP_NO_ERROR),
     47       remaining_payload_(0),
     48       remaining_control_payload_(0),
     49       current_frame_buffer_(NULL),
     50       current_frame_len_(0),
     51       current_frame_capacity_(0),
     52       enable_compression_(compression_default_),
     53       visitor_(NULL) {
     54 }
     55 
     56 FlipFramer::~FlipFramer() {
     57   if (compressor_.get()) {
     58     deflateEnd(compressor_.get());
     59   }
     60   if (decompressor_.get()) {
     61     inflateEnd(decompressor_.get());
     62   }
     63   delete [] current_frame_buffer_;
     64 }
     65 
     66 void FlipFramer::Reset() {
     67   state_ = FLIP_RESET;
     68   error_code_ = FLIP_NO_ERROR;
     69   remaining_payload_ = 0;
     70   remaining_control_payload_ = 0;
     71   current_frame_len_ = 0;
     72   if (current_frame_capacity_ != kControlFrameBufferInitialSize) {
     73     delete [] current_frame_buffer_;
     74     current_frame_buffer_ = 0;
     75     current_frame_capacity_ = 0;
     76     ExpandControlFrameBuffer(kControlFrameBufferInitialSize);
     77   }
     78 }
     79 
     80 const char* FlipFramer::StateToString(int state) {
     81   switch (state) {
     82     case FLIP_ERROR:
     83       return "ERROR";
     84     case FLIP_DONE:
     85       return "DONE";
     86     case FLIP_AUTO_RESET:
     87       return "AUTO_RESET";
     88     case FLIP_RESET:
     89       return "RESET";
     90     case FLIP_READING_COMMON_HEADER:
     91       return "READING_COMMON_HEADER";
     92     case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
     93       return "INTERPRET_CONTROL_FRAME_COMMON_HEADER";
     94     case FLIP_CONTROL_FRAME_PAYLOAD:
     95       return "CONTROL_FRAME_PAYLOAD";
     96     case FLIP_IGNORE_REMAINING_PAYLOAD:
     97       return "IGNORE_REMAINING_PAYLOAD";
     98     case FLIP_FORWARD_STREAM_FRAME:
     99       return "FORWARD_STREAM_FRAME";
    100   }
    101   return "UNKNOWN_STATE";
    102 }
    103 
    104 size_t FlipFramer::BytesSafeToRead() const {
    105   switch (state_) {
    106     case FLIP_ERROR:
    107     case FLIP_DONE:
    108     case FLIP_AUTO_RESET:
    109     case FLIP_RESET:
    110       return 0;
    111     case FLIP_READING_COMMON_HEADER:
    112       DCHECK(current_frame_len_ < FlipFrame::size());
    113       return FlipFrame::size() - current_frame_len_;
    114     case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
    115       return 0;
    116     case FLIP_CONTROL_FRAME_PAYLOAD:
    117     case FLIP_IGNORE_REMAINING_PAYLOAD:
    118     case FLIP_FORWARD_STREAM_FRAME:
    119       return remaining_payload_;
    120   }
    121   // We should never get to here.
    122   return 0;
    123 }
    124 
    125 void FlipFramer::set_error(FlipError error) {
    126   DCHECK(visitor_);
    127   error_code_ = error;
    128   CHANGE_STATE(FLIP_ERROR);
    129   visitor_->OnError(this);
    130 }
    131 
    132 const char* FlipFramer::ErrorCodeToString(int error_code) {
    133   switch (error_code) {
    134     case FLIP_NO_ERROR:
    135       return "NO_ERROR";
    136     case FLIP_UNKNOWN_CONTROL_TYPE:
    137       return "UNKNOWN_CONTROL_TYPE";
    138     case FLIP_INVALID_CONTROL_FRAME:
    139       return "INVALID_CONTROL_FRAME";
    140     case FLIP_CONTROL_PAYLOAD_TOO_LARGE:
    141       return "CONTROL_PAYLOAD_TOO_LARGE";
    142     case FLIP_ZLIB_INIT_FAILURE:
    143       return "ZLIB_INIT_FAILURE";
    144     case FLIP_UNSUPPORTED_VERSION:
    145       return "UNSUPPORTED_VERSION";
    146     case FLIP_DECOMPRESS_FAILURE:
    147       return "DECOMPRESS_FAILURE";
    148   }
    149   return "UNKNOWN_STATE";
    150 }
    151 
    152 size_t FlipFramer::ProcessInput(const char* data, size_t len) {
    153   DCHECK(visitor_);
    154   DCHECK(data);
    155 
    156   size_t original_len = len;
    157   while (len != 0) {
    158     switch (state_) {
    159       case FLIP_ERROR:
    160       case FLIP_DONE:
    161         goto bottom;
    162 
    163       case FLIP_AUTO_RESET:
    164       case FLIP_RESET:
    165         Reset();
    166         CHANGE_STATE(FLIP_READING_COMMON_HEADER);
    167         continue;
    168 
    169       case FLIP_READING_COMMON_HEADER: {
    170         int bytes_read = ProcessCommonHeader(data, len);
    171         len -= bytes_read;
    172         data += bytes_read;
    173         continue;
    174       }
    175 
    176       // Arguably, this case is not necessary, as no bytes are consumed here.
    177       // I felt it was a nice partitioning, however (which probably indicates
    178       // that it should be refactored into its own function!)
    179       case FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER:
    180         ProcessControlFrameHeader();
    181         continue;
    182 
    183       case FLIP_CONTROL_FRAME_PAYLOAD: {
    184         int bytes_read = ProcessControlFramePayload(data, len);
    185         len -= bytes_read;
    186         data += bytes_read;
    187       }
    188         // intentional fallthrough
    189       case FLIP_IGNORE_REMAINING_PAYLOAD:
    190         // control frame has too-large payload
    191         // intentional fallthrough
    192       case FLIP_FORWARD_STREAM_FRAME: {
    193         int bytes_read = ProcessDataFramePayload(data, len);
    194         len -= bytes_read;
    195         data += bytes_read;
    196         continue;
    197       }
    198       default:
    199         break;
    200     }
    201   }
    202  bottom:
    203   return original_len - len;
    204 }
    205 
    206 size_t FlipFramer::ProcessCommonHeader(const char* data, size_t len) {
    207   // This should only be called when we're in the FLIP_READING_COMMON_HEADER
    208   // state.
    209   DCHECK(state_ == FLIP_READING_COMMON_HEADER);
    210 
    211   int original_len = len;
    212   FlipFrame current_frame(current_frame_buffer_, false);
    213 
    214   do {
    215     if (current_frame_len_ < FlipFrame::size()) {
    216       size_t bytes_desired = FlipFrame::size() - current_frame_len_;
    217       size_t bytes_to_append = std::min(bytes_desired, len);
    218       char* header_buffer = current_frame_buffer_;
    219       memcpy(&header_buffer[current_frame_len_], data, bytes_to_append);
    220       current_frame_len_ += bytes_to_append;
    221       data += bytes_to_append;
    222       len -= bytes_to_append;
    223       // Check for an empty data frame.
    224       if (current_frame_len_ == FlipFrame::size() &&
    225           !current_frame.is_control_frame() &&
    226           current_frame.length() == 0) {
    227         if (current_frame.flags() & CONTROL_FLAG_FIN) {
    228           FlipDataFrame data_frame(current_frame_buffer_, false);
    229           visitor_->OnStreamFrameData(data_frame.stream_id(), NULL, 0);
    230         }
    231         CHANGE_STATE(FLIP_RESET);
    232       }
    233       break;
    234     }
    235     remaining_payload_ = current_frame.length();
    236 
    237     // This is just a sanity check for help debugging early frame errors.
    238     if (remaining_payload_ > 1000000u) {
    239       LOG(ERROR) <<
    240           "Unexpectedly large frame.  Flip session is likely corrupt.";
    241     }
    242 
    243     // if we're here, then we have the common header all received.
    244     if (!current_frame.is_control_frame())
    245       CHANGE_STATE(FLIP_FORWARD_STREAM_FRAME);
    246     else
    247       CHANGE_STATE(FLIP_INTERPRET_CONTROL_FRAME_COMMON_HEADER);
    248   } while (false);
    249 
    250   return original_len - len;
    251 }
    252 
    253 void FlipFramer::ProcessControlFrameHeader() {
    254   DCHECK_EQ(FLIP_NO_ERROR, error_code_);
    255   DCHECK_LE(FlipFrame::size(), current_frame_len_);
    256   FlipControlFrame current_control_frame(current_frame_buffer_, false);
    257   // Do some sanity checking on the control frame sizes.
    258   switch (current_control_frame.type()) {
    259     case SYN_STREAM:
    260       if (current_control_frame.length() <
    261           FlipSynStreamControlFrame::size() - FlipControlFrame::size())
    262         set_error(FLIP_INVALID_CONTROL_FRAME);
    263       break;
    264     case SYN_REPLY:
    265       if (current_control_frame.length() <
    266           FlipSynReplyControlFrame::size() - FlipControlFrame::size())
    267         set_error(FLIP_INVALID_CONTROL_FRAME);
    268       break;
    269     case FIN_STREAM:
    270       if (current_control_frame.length() !=
    271           FlipFinStreamControlFrame::size() - FlipFrame::size())
    272         set_error(FLIP_INVALID_CONTROL_FRAME);
    273       break;
    274     case NOOP:
    275       // NOOP.  Swallow it.
    276       CHANGE_STATE(FLIP_AUTO_RESET);
    277       return;
    278     default:
    279       set_error(FLIP_UNKNOWN_CONTROL_TYPE);
    280       break;
    281   }
    282 
    283   // We only support version 1 of this protocol.
    284   if (current_control_frame.version() != kFlipProtocolVersion)
    285     set_error(FLIP_UNSUPPORTED_VERSION);
    286 
    287   remaining_control_payload_ = current_control_frame.length();
    288   if (remaining_control_payload_ > kControlFrameBufferMaxSize)
    289     set_error(FLIP_CONTROL_PAYLOAD_TOO_LARGE);
    290 
    291   if (error_code_)
    292     return;
    293 
    294   ExpandControlFrameBuffer(remaining_control_payload_);
    295   CHANGE_STATE(FLIP_CONTROL_FRAME_PAYLOAD);
    296 }
    297 
    298 size_t FlipFramer::ProcessControlFramePayload(const char* data, size_t len) {
    299   size_t original_len = len;
    300   do {
    301     if (remaining_control_payload_) {
    302       size_t amount_to_consume = std::min(remaining_control_payload_, len);
    303       memcpy(&current_frame_buffer_[current_frame_len_], data,
    304              amount_to_consume);
    305       current_frame_len_ += amount_to_consume;
    306       data += amount_to_consume;
    307       len -= amount_to_consume;
    308       remaining_control_payload_ -= amount_to_consume;
    309       remaining_payload_ -= amount_to_consume;
    310       if (remaining_control_payload_)
    311         break;
    312     }
    313     FlipControlFrame control_frame(current_frame_buffer_, false);
    314     visitor_->OnControl(&control_frame);
    315 
    316     // If this is a FIN, tell the caller.
    317     if (control_frame.type() == SYN_REPLY &&
    318         control_frame.flags() & CONTROL_FLAG_FIN)
    319       visitor_->OnStreamFrameData(control_frame.stream_id(), NULL, 0);
    320 
    321     CHANGE_STATE(FLIP_IGNORE_REMAINING_PAYLOAD);
    322   } while (false);
    323   return original_len - len;
    324 }
    325 
    326 size_t FlipFramer::ProcessDataFramePayload(const char* data, size_t len) {
    327   size_t original_len = len;
    328 
    329   FlipDataFrame current_data_frame(current_frame_buffer_, false);
    330   if (remaining_payload_) {
    331     size_t amount_to_forward = std::min(remaining_payload_, len);
    332     if (amount_to_forward && state_ != FLIP_IGNORE_REMAINING_PAYLOAD) {
    333       if (current_data_frame.flags() & DATA_FLAG_COMPRESSED) {
    334         // TODO(mbelshe): Assert that the decompressor is init'ed.
    335         if (!InitializeDecompressor())
    336           return NULL;
    337 
    338         size_t decompressed_max_size = amount_to_forward * 100;
    339         scoped_ptr<char> decompressed(new char[decompressed_max_size]);
    340         decompressor_->next_in = reinterpret_cast<Bytef*>(
    341             const_cast<char*>(data));
    342         decompressor_->avail_in = amount_to_forward;
    343         decompressor_->next_out =
    344             reinterpret_cast<Bytef*>(decompressed.get());
    345         decompressor_->avail_out = decompressed_max_size;
    346 
    347         int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
    348         if (rv != Z_OK) {
    349           set_error(FLIP_DECOMPRESS_FAILURE);
    350           return 0;
    351         }
    352         size_t decompressed_size = decompressed_max_size -
    353                                    decompressor_->avail_out;
    354         // Only inform the visitor if there is data.
    355         if (decompressed_size)
    356           visitor_->OnStreamFrameData(current_data_frame.stream_id(),
    357                                       decompressed.get(),
    358                                       decompressed_size);
    359         amount_to_forward -= decompressor_->avail_in;
    360       } else {
    361         // The data frame was not compressed.
    362         // Only inform the visitor if there is data.
    363         if (amount_to_forward)
    364           visitor_->OnStreamFrameData(current_data_frame.stream_id(),
    365                                       data, amount_to_forward);
    366       }
    367     }
    368     data += amount_to_forward;
    369     len -= amount_to_forward;
    370     remaining_payload_ -= amount_to_forward;
    371 
    372     // If the FIN flag is set, and there is no more data in this data
    373     // frame, inform the visitor of EOF via a 0-length data frame.
    374     if (!remaining_payload_ &&
    375         current_data_frame.flags() & DATA_FLAG_FIN)
    376       visitor_->OnStreamFrameData(current_data_frame.stream_id(), NULL,
    377                                   0);
    378   } else {
    379     CHANGE_STATE(FLIP_AUTO_RESET);
    380   }
    381   return original_len - len;
    382 }
    383 
    384 void FlipFramer::ExpandControlFrameBuffer(size_t size) {
    385   DCHECK(size < kControlFrameBufferMaxSize);
    386   if (size < current_frame_capacity_)
    387     return;
    388 
    389   int alloc_size = size + FlipFrame::size();
    390   char* new_buffer = new char[alloc_size];
    391   memcpy(new_buffer, current_frame_buffer_, current_frame_len_);
    392   current_frame_capacity_ = alloc_size;
    393   current_frame_buffer_ = new_buffer;
    394 }
    395 
    396 bool FlipFramer::ParseHeaderBlock(const FlipFrame* frame,
    397                                   FlipHeaderBlock* block) {
    398   FlipControlFrame control_frame(frame->data(), false);
    399   uint32 type = control_frame.type();
    400   if (type != SYN_STREAM && type != SYN_REPLY)
    401     return false;
    402 
    403   // Find the header data within the control frame.
    404   scoped_ptr<FlipFrame> decompressed_frame(DecompressFrame(frame));
    405   if (!decompressed_frame.get())
    406     return false;
    407   FlipSynStreamControlFrame syn_frame(decompressed_frame->data(), false);
    408   const char *header_data = syn_frame.header_block();
    409   int header_length = syn_frame.header_block_len();
    410 
    411   FlipFrameBuilder builder(header_data, header_length);
    412   void* iter = NULL;
    413   uint16 num_headers;
    414   if (builder.ReadUInt16(&iter, &num_headers)) {
    415     for (int index = 0; index < num_headers; ++index) {
    416       std::string name;
    417       std::string value;
    418       if (!builder.ReadString(&iter, &name))
    419         break;
    420       if (!builder.ReadString(&iter, &value))
    421         break;
    422       if (block->find(name) == block->end()) {
    423         (*block)[name] = value;
    424       } else {
    425         return false;
    426       }
    427     }
    428     return true;
    429   }
    430   return false;
    431 }
    432 
    433 FlipSynStreamControlFrame* FlipFramer::CreateSynStream(
    434     FlipStreamId stream_id, int priority, FlipControlFlags flags,
    435     bool compressed, FlipHeaderBlock* headers) {
    436   FlipFrameBuilder frame;
    437 
    438   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
    439   frame.WriteUInt16(SYN_STREAM);
    440   frame.WriteUInt32(0);  // Placeholder for the length and flags
    441   frame.WriteUInt32(stream_id);
    442   frame.WriteUInt16(ntohs(priority) << 6);  // Priority.
    443 
    444   frame.WriteUInt16(headers->size());  // Number of headers.
    445   FlipHeaderBlock::iterator it;
    446   for (it = headers->begin(); it != headers->end(); ++it) {
    447     frame.WriteString(it->first);
    448     frame.WriteString(it->second);
    449   }
    450 
    451   // Write the length and flags.
    452   size_t length = frame.length() - FlipFrame::size();
    453   DCHECK(length < static_cast<size_t>(kLengthMask));
    454   FlagsAndLength flags_length;
    455   flags_length.length_ = htonl(static_cast<uint32>(length));
    456   flags_length.flags_[0] = flags;
    457   frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length));
    458 
    459   scoped_ptr<FlipFrame> syn_frame(frame.take());
    460   if (compressed) {
    461     return reinterpret_cast<FlipSynStreamControlFrame*>(
    462         CompressFrame(syn_frame.get()));
    463   }
    464   return reinterpret_cast<FlipSynStreamControlFrame*>(syn_frame.release());
    465 }
    466 
    467 /* static */
    468 FlipFinStreamControlFrame* FlipFramer::CreateFinStream(FlipStreamId stream_id,
    469                                                        int status) {
    470   FlipFrameBuilder frame;
    471   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
    472   frame.WriteUInt16(FIN_STREAM);
    473   frame.WriteUInt32(8);
    474   frame.WriteUInt32(stream_id);
    475   frame.WriteUInt32(status);
    476   return reinterpret_cast<FlipFinStreamControlFrame*>(frame.take());
    477 }
    478 
    479 FlipSynReplyControlFrame* FlipFramer::CreateSynReply(FlipStreamId stream_id,
    480     FlipControlFlags flags, bool compressed, FlipHeaderBlock* headers) {
    481 
    482   FlipFrameBuilder frame;
    483 
    484   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
    485   frame.WriteUInt16(SYN_REPLY);
    486   frame.WriteUInt32(0);  // Placeholder for the length and flags.
    487   frame.WriteUInt32(stream_id);
    488   frame.WriteUInt16(0);  // Unused
    489 
    490   frame.WriteUInt16(headers->size());  // Number of headers.
    491   FlipHeaderBlock::iterator it;
    492   for (it = headers->begin(); it != headers->end(); ++it) {
    493     // TODO(mbelshe): Headers need to be sorted.
    494     frame.WriteString(it->first);
    495     frame.WriteString(it->second);
    496   }
    497 
    498   // Write the length
    499   size_t length = frame.length() - FlipFrame::size();
    500   DCHECK(length < static_cast<size_t>(kLengthMask));
    501   FlagsAndLength flags_length;
    502   flags_length.length_ = htonl(static_cast<uint32>(length));
    503   flags_length.flags_[0] = flags;
    504   frame.WriteBytesToOffset(4, &flags_length, sizeof(flags_length));
    505 
    506   scoped_ptr<FlipFrame> reply_frame(frame.take());
    507   if (compressed) {
    508     return reinterpret_cast<FlipSynReplyControlFrame*>(
    509         CompressFrame(reply_frame.get()));
    510   }
    511   return reinterpret_cast<FlipSynReplyControlFrame*>(reply_frame.release());
    512 }
    513 
    514 FlipDataFrame* FlipFramer::CreateDataFrame(FlipStreamId stream_id,
    515                                            const char* data,
    516                                            uint32 len, FlipDataFlags flags) {
    517   FlipFrameBuilder frame;
    518 
    519   frame.WriteUInt32(stream_id);
    520 
    521   DCHECK(len < static_cast<size_t>(kLengthMask));
    522   FlagsAndLength flags_length;
    523   flags_length.length_ = htonl(len);
    524   flags_length.flags_[0] = flags;
    525   frame.WriteBytes(&flags_length, sizeof(flags_length));
    526 
    527   frame.WriteBytes(data, len);
    528   scoped_ptr<FlipFrame> data_frame(frame.take());
    529   if (flags & DATA_FLAG_COMPRESSED)
    530     return reinterpret_cast<FlipDataFrame*>(CompressFrame(data_frame.get()));
    531   return reinterpret_cast<FlipDataFrame*>(data_frame.release());
    532 }
    533 
    534 /* static */
    535 FlipControlFrame* FlipFramer::CreateNopFrame() {
    536   FlipFrameBuilder frame;
    537   frame.WriteUInt16(kControlFlagMask | kFlipProtocolVersion);
    538   frame.WriteUInt16(NOOP);
    539   frame.WriteUInt32(0);
    540   return reinterpret_cast<FlipControlFrame*>(frame.take());
    541 }
    542 
    543 static const int kCompressorLevel = Z_DEFAULT_COMPRESSION;
    544 // This is just a hacked dictionary to use for shrinking HTTP-like headers.
    545 // TODO(mbelshe): Use a scientific methodology for computing the dictionary.
    546 static const char dictionary[] =
    547   "optionsgetheadpostputdeletetraceacceptaccept-charsetaccept-encodingaccept-"
    548   "languageauthorizationexpectfromhostif-modified-sinceif-matchif-none-matchi"
    549   "f-rangeif-unmodifiedsincemax-forwardsproxy-authorizationrangerefererteuser"
    550   "-agent10010120020120220320420520630030130230330430530630740040140240340440"
    551   "5406407408409410411412413414415416417500501502503504505accept-rangesageeta"
    552   "glocationproxy-authenticatepublicretry-afterservervarywarningwww-authentic"
    553   "ateallowcontent-basecontent-encodingcache-controlconnectiondatetrailertran"
    554   "sfer-encodingupgradeviawarningcontent-languagecontent-lengthcontent-locati"
    555   "oncontent-md5content-rangecontent-typeetagexpireslast-modifiedset-cookieMo"
    556   "ndayTuesdayWednesdayThursdayFridaySaturdaySundayJanFebMarAprMayJunJulAugSe"
    557   "pOctNovDecchunkedtext/htmlimage/pngimage/jpgimage/gifapplication/xmlapplic"
    558   "ation/xhtmltext/plainpublicmax-agecharset=iso-8859-1utf-8gzipdeflateHTTP/1"
    559   ".1statusversionurl";
    560 static uLong dictionary_id = 0;
    561 
    562 bool FlipFramer::InitializeCompressor() {
    563   if (compressor_.get())
    564     return true;  // Already initialized.
    565 
    566   compressor_.reset(new z_stream);
    567   memset(compressor_.get(), 0, sizeof(z_stream));
    568 
    569   int success = deflateInit(compressor_.get(), kCompressorLevel);
    570   if (success == Z_OK)
    571     success = deflateSetDictionary(compressor_.get(),
    572                                    reinterpret_cast<const Bytef*>(dictionary),
    573                                    sizeof(dictionary));
    574   if (success != Z_OK)
    575     compressor_.reset(NULL);
    576   return success == Z_OK;
    577 }
    578 
    579 bool FlipFramer::InitializeDecompressor() {
    580   if (decompressor_.get())
    581     return true;  // Already initialized.
    582 
    583   decompressor_.reset(new z_stream);
    584   memset(decompressor_.get(), 0, sizeof(z_stream));
    585 
    586   // Compute the id of our dictionary so that we know we're using the
    587   // right one when asked for it.
    588   if (dictionary_id == 0) {
    589     dictionary_id = adler32(0L, Z_NULL, 0);
    590     dictionary_id = adler32(dictionary_id,
    591                             reinterpret_cast<const Bytef*>(dictionary),
    592                             sizeof(dictionary));
    593   }
    594 
    595   int success = inflateInit(decompressor_.get());
    596   if (success != Z_OK)
    597     decompressor_.reset(NULL);
    598   return success == Z_OK;
    599 }
    600 
    601 bool FlipFramer::GetFrameBoundaries(const FlipFrame* frame,
    602                                     int* payload_length,
    603                                     int* header_length,
    604                                     const char** payload) const {
    605   if (frame->is_control_frame()) {
    606     const FlipControlFrame* control_frame =
    607         reinterpret_cast<const FlipControlFrame*>(frame);
    608     switch (control_frame->type()) {
    609       case SYN_STREAM:
    610       case SYN_REPLY:
    611         {
    612           const FlipSynStreamControlFrame *syn_frame =
    613               reinterpret_cast<const FlipSynStreamControlFrame*>(frame);
    614           *payload_length = syn_frame->header_block_len();
    615           *header_length = syn_frame->size();
    616           *payload = frame->data() + *header_length;
    617         }
    618         break;
    619       default:
    620         // TODO(mbelshe): set an error?
    621         return false;  // We can't compress this frame!
    622     }
    623   } else {
    624     *header_length = FlipFrame::size();
    625     *payload_length = frame->length();
    626     *payload = frame->data() + FlipFrame::size();
    627   }
    628   DCHECK(static_cast<size_t>(*header_length) <=
    629       FlipFrame::size() + *payload_length);
    630   return true;
    631 }
    632 
    633 
    634 FlipFrame* FlipFramer::CompressFrame(const FlipFrame* frame) {
    635   int payload_length;
    636   int header_length;
    637   const char* payload;
    638 
    639   static StatsCounter pre_compress_bytes("flip.PreCompressSize");
    640   static StatsCounter post_compress_bytes("flip.PostCompressSize");
    641 
    642   if (!enable_compression_)
    643     return DuplicateFrame(frame);
    644 
    645   if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
    646     return NULL;
    647 
    648   if (!InitializeCompressor())
    649     return NULL;
    650 
    651   // TODO(mbelshe): Should we have a zlib header like what http servers do?
    652 
    653   // Create an output frame.
    654   int compressed_max_size = deflateBound(compressor_.get(), payload_length);
    655   int new_frame_size = header_length + compressed_max_size;
    656   FlipFrame* new_frame = new FlipFrame(new_frame_size);
    657   memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size());
    658 
    659   compressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload));
    660   compressor_->avail_in = payload_length;
    661   compressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) +
    662                           header_length;
    663   compressor_->avail_out = compressed_max_size;
    664 
    665   // Data packets have a 'compressed flag
    666   if (!new_frame->is_control_frame()) {
    667     FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
    668     data_frame->set_flags(data_frame->flags() | DATA_FLAG_COMPRESSED);
    669   }
    670 
    671   int rv = deflate(compressor_.get(), Z_SYNC_FLUSH);
    672   if (rv != Z_OK) {  // How can we know that it compressed everything?
    673     // This shouldn't happen, right?
    674     delete new_frame;
    675     return NULL;
    676   }
    677 
    678   int compressed_size = compressed_max_size - compressor_->avail_out;
    679   new_frame->set_length(header_length + compressed_size - FlipFrame::size());
    680 
    681   pre_compress_bytes.Add(payload_length);
    682   post_compress_bytes.Add(new_frame->length());
    683 
    684   return new_frame;
    685 }
    686 
    687 FlipFrame* FlipFramer::DecompressFrame(const FlipFrame* frame) {
    688   int payload_length;
    689   int header_length;
    690   const char* payload;
    691 
    692   static StatsCounter pre_decompress_bytes("flip.PreDeCompressSize");
    693   static StatsCounter post_decompress_bytes("flip.PostDeCompressSize");
    694 
    695   if (!enable_compression_)
    696     return DuplicateFrame(frame);
    697 
    698   if (!GetFrameBoundaries(frame, &payload_length, &header_length, &payload))
    699     return NULL;
    700 
    701   if (!frame->is_control_frame()) {
    702     const FlipDataFrame* data_frame =
    703         reinterpret_cast<const FlipDataFrame*>(frame);
    704     if ((data_frame->flags() & DATA_FLAG_COMPRESSED) == 0)
    705       return DuplicateFrame(frame);
    706   }
    707 
    708   if (!InitializeDecompressor())
    709     return NULL;
    710 
    711   // TODO(mbelshe): Should we have a zlib header like what http servers do?
    712 
    713   // Create an output frame.  Assume it does not need to be longer than
    714   // the input data.
    715   int decompressed_max_size = kControlFrameBufferInitialSize;
    716   int new_frame_size = header_length + decompressed_max_size;
    717   FlipFrame* new_frame = new FlipFrame(new_frame_size);
    718   memcpy(new_frame->data(), frame->data(), frame->length() + FlipFrame::size());
    719 
    720   decompressor_->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(payload));
    721   decompressor_->avail_in = payload_length;
    722   decompressor_->next_out = reinterpret_cast<Bytef*>(new_frame->data()) +
    723       header_length;
    724   decompressor_->avail_out = decompressed_max_size;
    725 
    726   int rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
    727   if (rv == Z_NEED_DICT) {
    728     // Need to try again with the right dictionary.
    729     if (decompressor_->adler == dictionary_id) {
    730       rv = inflateSetDictionary(decompressor_.get(), (const Bytef*)dictionary,
    731                                 sizeof(dictionary));
    732       if (rv == Z_OK)
    733         rv = inflate(decompressor_.get(), Z_SYNC_FLUSH);
    734     }
    735   }
    736   if (rv != Z_OK) {  // How can we know that it decompressed everything?
    737     delete new_frame;
    738     return NULL;
    739   }
    740 
    741   // Unset the compressed flag for data frames.
    742   if (!new_frame->is_control_frame()) {
    743     FlipDataFrame* data_frame = reinterpret_cast<FlipDataFrame*>(new_frame);
    744     data_frame->set_flags(data_frame->flags() & ~DATA_FLAG_COMPRESSED);
    745   }
    746 
    747   int decompressed_size = decompressed_max_size - decompressor_->avail_out;
    748   new_frame->set_length(header_length + decompressed_size - FlipFrame::size());
    749 
    750   pre_decompress_bytes.Add(frame->length());
    751   post_decompress_bytes.Add(new_frame->length());
    752 
    753   return new_frame;
    754 }
    755 
    756 FlipFrame* FlipFramer::DuplicateFrame(const FlipFrame* frame) {
    757   int size = FlipFrame::size() + frame->length();
    758   FlipFrame* new_frame = new FlipFrame(size);
    759   memcpy(new_frame->data(), frame->data(), size);
    760   return new_frame;
    761 }
    762 
    763 void FlipFramer::set_enable_compression(bool value) {
    764   enable_compression_ = value;
    765 }
    766 
    767 void FlipFramer::set_enable_compression_default(bool value) {
    768   compression_default_ = value;
    769 }
    770 
    771 }  // namespace flip
    772 
    773