Home | History | Annotate | Download | only in mod_pywebsocket
      1 # Copyright 2012, Google Inc.
      2 # All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 
     31 from mod_pywebsocket import common
     32 from mod_pywebsocket import util
     33 from mod_pywebsocket.http_header_util import quote_if_necessary
     34 
     35 
     36 _available_processors = {}
     37 _compression_extension_names = []
     38 
     39 
     40 class ExtensionProcessorInterface(object):
     41 
     42     def __init__(self, request):
     43         self._request = request
     44         self._active = True
     45 
     46     def request(self):
     47         return self._request
     48 
     49     def name(self):
     50         return None
     51 
     52     def check_consistency_with_other_processors(self, processors):
     53         pass
     54 
     55     def set_active(self, active):
     56         self._active = active
     57 
     58     def is_active(self):
     59         return self._active
     60 
     61     def _get_extension_response_internal(self):
     62         return None
     63 
     64     def get_extension_response(self):
     65         if self._active:
     66             response = self._get_extension_response_internal()
     67             if response is None:
     68                 self._active = False
     69             return response
     70         return None
     71 
     72     def _setup_stream_options_internal(self, stream_options):
     73         pass
     74 
     75     def setup_stream_options(self, stream_options):
     76         if self._active:
     77             self._setup_stream_options_internal(stream_options)
     78 
     79 
     80 def _log_outgoing_compression_ratio(
     81         logger, original_bytes, filtered_bytes, average_ratio):
     82     # Print inf when ratio is not available.
     83     ratio = float('inf')
     84     if original_bytes != 0:
     85         ratio = float(filtered_bytes) / original_bytes
     86 
     87     logger.debug('Outgoing compression ratio: %f (average: %f)' %
     88             (ratio, average_ratio))
     89 
     90 
     91 def _log_incoming_compression_ratio(
     92         logger, received_bytes, filtered_bytes, average_ratio):
     93     # Print inf when ratio is not available.
     94     ratio = float('inf')
     95     if filtered_bytes != 0:
     96         ratio = float(received_bytes) / filtered_bytes
     97 
     98     logger.debug('Incoming compression ratio: %f (average: %f)' %
     99             (ratio, average_ratio))
    100 
    101 
    102 def _parse_window_bits(bits):
    103     """Return parsed integer value iff the given string conforms to the
    104     grammar of the window bits extension parameters.
    105     """
    106 
    107     if bits is None:
    108         raise ValueError('Value is required')
    109 
    110     # For non integer values such as "10.0", ValueError will be raised.
    111     int_bits = int(bits)
    112 
    113     # First condition is to drop leading zero case e.g. "08".
    114     if bits != str(int_bits) or int_bits < 8 or int_bits > 15:
    115         raise ValueError('Invalid value: %r' % bits)
    116 
    117     return int_bits
    118 
    119 
    120 class _AverageRatioCalculator(object):
    121     """Stores total bytes of original and result data, and calculates average
    122     result / original ratio.
    123     """
    124 
    125     def __init__(self):
    126         self._total_original_bytes = 0
    127         self._total_result_bytes = 0
    128 
    129     def add_original_bytes(self, value):
    130         self._total_original_bytes += value
    131 
    132     def add_result_bytes(self, value):
    133         self._total_result_bytes += value
    134 
    135     def get_average_ratio(self):
    136         if self._total_original_bytes != 0:
    137             return (float(self._total_result_bytes) /
    138                     self._total_original_bytes)
    139         else:
    140             return float('inf')
    141 
    142 
    143 class DeflateFrameExtensionProcessor(ExtensionProcessorInterface):
    144     """deflate-frame extension processor.
    145 
    146     Specification:
    147     http://tools.ietf.org/html/draft-tyoshino-hybi-websocket-perframe-deflate
    148     """
    149 
    150     _WINDOW_BITS_PARAM = 'max_window_bits'
    151     _NO_CONTEXT_TAKEOVER_PARAM = 'no_context_takeover'
    152 
    153     def __init__(self, request):
    154         ExtensionProcessorInterface.__init__(self, request)
    155         self._logger = util.get_class_logger(self)
    156 
    157         self._response_window_bits = None
    158         self._response_no_context_takeover = False
    159         self._bfinal = False
    160 
    161         # Calculates
    162         #     (Total outgoing bytes supplied to this filter) /
    163         #     (Total bytes sent to the network after applying this filter)
    164         self._outgoing_average_ratio_calculator = _AverageRatioCalculator()
    165 
    166         # Calculates
    167         #     (Total bytes received from the network) /
    168         #     (Total incoming bytes obtained after applying this filter)
    169         self._incoming_average_ratio_calculator = _AverageRatioCalculator()
    170 
    171     def name(self):
    172         return common.DEFLATE_FRAME_EXTENSION
    173 
    174     def _get_extension_response_internal(self):
    175         # Any unknown parameter will be just ignored.
    176 
    177         window_bits = None
    178         if self._request.has_parameter(self._WINDOW_BITS_PARAM):
    179             window_bits = self._request.get_parameter_value(
    180                 self._WINDOW_BITS_PARAM)
    181             try:
    182                 window_bits = _parse_window_bits(window_bits)
    183             except ValueError, e:
    184                 return None
    185 
    186         no_context_takeover = self._request.has_parameter(
    187             self._NO_CONTEXT_TAKEOVER_PARAM)
    188         if (no_context_takeover and
    189             self._request.get_parameter_value(
    190                 self._NO_CONTEXT_TAKEOVER_PARAM) is not None):
    191             return None
    192 
    193         self._rfc1979_deflater = util._RFC1979Deflater(
    194             window_bits, no_context_takeover)
    195 
    196         self._rfc1979_inflater = util._RFC1979Inflater()
    197 
    198         self._compress_outgoing = True
    199 
    200         response = common.ExtensionParameter(self._request.name())
    201 
    202         if self._response_window_bits is not None:
    203             response.add_parameter(
    204                 self._WINDOW_BITS_PARAM, str(self._response_window_bits))
    205         if self._response_no_context_takeover:
    206             response.add_parameter(
    207                 self._NO_CONTEXT_TAKEOVER_PARAM, None)
    208 
    209         self._logger.debug(
    210             'Enable %s extension ('
    211             'request: window_bits=%s; no_context_takeover=%r, '
    212             'response: window_wbits=%s; no_context_takeover=%r)' %
    213             (self._request.name(),
    214              window_bits,
    215              no_context_takeover,
    216              self._response_window_bits,
    217              self._response_no_context_takeover))
    218 
    219         return response
    220 
    221     def _setup_stream_options_internal(self, stream_options):
    222 
    223         class _OutgoingFilter(object):
    224 
    225             def __init__(self, parent):
    226                 self._parent = parent
    227 
    228             def filter(self, frame):
    229                 self._parent._outgoing_filter(frame)
    230 
    231         class _IncomingFilter(object):
    232 
    233             def __init__(self, parent):
    234                 self._parent = parent
    235 
    236             def filter(self, frame):
    237                 self._parent._incoming_filter(frame)
    238 
    239         stream_options.outgoing_frame_filters.append(
    240             _OutgoingFilter(self))
    241         stream_options.incoming_frame_filters.insert(
    242             0, _IncomingFilter(self))
    243 
    244     def set_response_window_bits(self, value):
    245         self._response_window_bits = value
    246 
    247     def set_response_no_context_takeover(self, value):
    248         self._response_no_context_takeover = value
    249 
    250     def set_bfinal(self, value):
    251         self._bfinal = value
    252 
    253     def enable_outgoing_compression(self):
    254         self._compress_outgoing = True
    255 
    256     def disable_outgoing_compression(self):
    257         self._compress_outgoing = False
    258 
    259     def _outgoing_filter(self, frame):
    260         """Transform outgoing frames. This method is called only by
    261         an _OutgoingFilter instance.
    262         """
    263 
    264         original_payload_size = len(frame.payload)
    265         self._outgoing_average_ratio_calculator.add_original_bytes(
    266                 original_payload_size)
    267 
    268         if (not self._compress_outgoing or
    269             common.is_control_opcode(frame.opcode)):
    270             self._outgoing_average_ratio_calculator.add_result_bytes(
    271                     original_payload_size)
    272             return
    273 
    274         frame.payload = self._rfc1979_deflater.filter(
    275             frame.payload, bfinal=self._bfinal)
    276         frame.rsv1 = 1
    277 
    278         filtered_payload_size = len(frame.payload)
    279         self._outgoing_average_ratio_calculator.add_result_bytes(
    280                 filtered_payload_size)
    281 
    282         _log_outgoing_compression_ratio(
    283                 self._logger,
    284                 original_payload_size,
    285                 filtered_payload_size,
    286                 self._outgoing_average_ratio_calculator.get_average_ratio())
    287 
    288     def _incoming_filter(self, frame):
    289         """Transform incoming frames. This method is called only by
    290         an _IncomingFilter instance.
    291         """
    292 
    293         received_payload_size = len(frame.payload)
    294         self._incoming_average_ratio_calculator.add_result_bytes(
    295                 received_payload_size)
    296 
    297         if frame.rsv1 != 1 or common.is_control_opcode(frame.opcode):
    298             self._incoming_average_ratio_calculator.add_original_bytes(
    299                     received_payload_size)
    300             return
    301 
    302         frame.payload = self._rfc1979_inflater.filter(frame.payload)
    303         frame.rsv1 = 0
    304 
    305         filtered_payload_size = len(frame.payload)
    306         self._incoming_average_ratio_calculator.add_original_bytes(
    307                 filtered_payload_size)
    308 
    309         _log_incoming_compression_ratio(
    310                 self._logger,
    311                 received_payload_size,
    312                 filtered_payload_size,
    313                 self._incoming_average_ratio_calculator.get_average_ratio())
    314 
    315 
    316 _available_processors[common.DEFLATE_FRAME_EXTENSION] = (
    317     DeflateFrameExtensionProcessor)
    318 _compression_extension_names.append(common.DEFLATE_FRAME_EXTENSION)
    319 
    320 _available_processors[common.X_WEBKIT_DEFLATE_FRAME_EXTENSION] = (
    321     DeflateFrameExtensionProcessor)
    322 _compression_extension_names.append(common.X_WEBKIT_DEFLATE_FRAME_EXTENSION)
    323 
    324 
    325 def _parse_compression_method(data):
    326     """Parses the value of "method" extension parameter."""
    327 
    328     return common.parse_extensions(data, allow_quoted_string=True)
    329 
    330 
    331 def _create_accepted_method_desc(method_name, method_params):
    332     """Creates accepted-method-desc from given method name and parameters"""
    333 
    334     extension = common.ExtensionParameter(method_name)
    335     for name, value in method_params:
    336         extension.add_parameter(name, value)
    337     return common.format_extension(extension)
    338 
    339 
    340 class CompressionExtensionProcessorBase(ExtensionProcessorInterface):
    341     """Base class for perframe-compress and permessage-compress extension."""
    342 
    343     _METHOD_PARAM = 'method'
    344 
    345     def __init__(self, request):
    346         ExtensionProcessorInterface.__init__(self, request)
    347         self._logger = util.get_class_logger(self)
    348         self._compression_method_name = None
    349         self._compression_processor = None
    350         self._compression_processor_hook = None
    351 
    352     def name(self):
    353         return ''
    354 
    355     def _lookup_compression_processor(self, method_desc):
    356         return None
    357 
    358     def _get_compression_processor_response(self):
    359         """Looks up the compression processor based on the self._request and
    360            returns the compression processor's response.
    361         """
    362 
    363         method_list = self._request.get_parameter_value(self._METHOD_PARAM)
    364         if method_list is None:
    365             return None
    366         methods = _parse_compression_method(method_list)
    367         if methods is None:
    368             return None
    369         comression_processor = None
    370         # The current implementation tries only the first method that matches
    371         # supported algorithm. Following methods aren't tried even if the
    372         # first one is rejected.
    373         # TODO(bashi): Need to clarify this behavior.
    374         for method_desc in methods:
    375             compression_processor = self._lookup_compression_processor(
    376                 method_desc)
    377             if compression_processor is not None:
    378                 self._compression_method_name = method_desc.name()
    379                 break
    380         if compression_processor is None:
    381             return None
    382 
    383         if self._compression_processor_hook:
    384             self._compression_processor_hook(compression_processor)
    385 
    386         processor_response = compression_processor.get_extension_response()
    387         if processor_response is None:
    388             return None
    389         self._compression_processor = compression_processor
    390         return processor_response
    391 
    392     def _get_extension_response_internal(self):
    393         processor_response = self._get_compression_processor_response()
    394         if processor_response is None:
    395             return None
    396 
    397         response = common.ExtensionParameter(self._request.name())
    398         accepted_method_desc = _create_accepted_method_desc(
    399                                    self._compression_method_name,
    400                                    processor_response.get_parameters())
    401         response.add_parameter(self._METHOD_PARAM, accepted_method_desc)
    402         self._logger.debug(
    403             'Enable %s extension (method: %s)' %
    404             (self._request.name(), self._compression_method_name))
    405         return response
    406 
    407     def _setup_stream_options_internal(self, stream_options):
    408         if self._compression_processor is None:
    409             return
    410         self._compression_processor.setup_stream_options(stream_options)
    411 
    412     def set_compression_processor_hook(self, hook):
    413         self._compression_processor_hook = hook
    414 
    415     def get_compression_processor(self):
    416         return self._compression_processor
    417 
    418 
    419 class PerFrameCompressExtensionProcessor(CompressionExtensionProcessorBase):
    420     """perframe-compress processor.
    421 
    422     Specification:
    423     http://tools.ietf.org/html/draft-ietf-hybi-websocket-perframe-compression
    424     """
    425 
    426     _DEFLATE_METHOD = 'deflate'
    427 
    428     def __init__(self, request):
    429         CompressionExtensionProcessorBase.__init__(self, request)
    430 
    431     def name(self):
    432         return common.PERFRAME_COMPRESSION_EXTENSION
    433 
    434     def _lookup_compression_processor(self, method_desc):
    435         if method_desc.name() == self._DEFLATE_METHOD:
    436             return DeflateFrameExtensionProcessor(method_desc)
    437         return None
    438 
    439 
    440 _available_processors[common.PERFRAME_COMPRESSION_EXTENSION] = (
    441     PerFrameCompressExtensionProcessor)
    442 _compression_extension_names.append(common.PERFRAME_COMPRESSION_EXTENSION)
    443 
    444 
    445 class PerMessageDeflateExtensionProcessor(ExtensionProcessorInterface):
    446     """permessage-deflate extension processor. It's also used for
    447     permessage-compress extension when the deflate method is chosen.
    448 
    449     Specification:
    450     http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-08
    451     """
    452 
    453     _S2C_MAX_WINDOW_BITS_PARAM = 's2c_max_window_bits'
    454     _S2C_NO_CONTEXT_TAKEOVER_PARAM = 's2c_no_context_takeover'
    455     _C2S_MAX_WINDOW_BITS_PARAM = 'c2s_max_window_bits'
    456     _C2S_NO_CONTEXT_TAKEOVER_PARAM = 'c2s_no_context_takeover'
    457 
    458     def __init__(self, request, draft08=True):
    459         """Construct PerMessageDeflateExtensionProcessor
    460 
    461         Args:
    462             draft08: Follow the constraints on the parameters that were not
    463                 specified for permessage-compress but are specified for
    464                 permessage-deflate as on
    465                 draft-ietf-hybi-permessage-compression-08.
    466         """
    467 
    468         ExtensionProcessorInterface.__init__(self, request)
    469         self._logger = util.get_class_logger(self)
    470 
    471         self._preferred_c2s_max_window_bits = None
    472         self._c2s_no_context_takeover = False
    473 
    474         self._draft08 = draft08
    475 
    476     def name(self):
    477         return 'deflate'
    478 
    479     def _get_extension_response_internal(self):
    480         if self._draft08:
    481             for name in self._request.get_parameter_names():
    482                 if name not in [self._S2C_MAX_WINDOW_BITS_PARAM,
    483                                 self._S2C_NO_CONTEXT_TAKEOVER_PARAM,
    484                                 self._C2S_MAX_WINDOW_BITS_PARAM]:
    485                     self._logger.debug('Unknown parameter: %r', name)
    486                     return None
    487         else:
    488             # Any unknown parameter will be just ignored.
    489             pass
    490 
    491         s2c_max_window_bits = None
    492         if self._request.has_parameter(self._S2C_MAX_WINDOW_BITS_PARAM):
    493             s2c_max_window_bits = self._request.get_parameter_value(
    494                     self._S2C_MAX_WINDOW_BITS_PARAM)
    495             try:
    496                 s2c_max_window_bits = _parse_window_bits(s2c_max_window_bits)
    497             except ValueError, e:
    498                 self._logger.debug('Bad %s parameter: %r',
    499                                    self._S2C_MAX_WINDOW_BITS_PARAM,
    500                                    e)
    501                 return None
    502 
    503         s2c_no_context_takeover = self._request.has_parameter(
    504             self._S2C_NO_CONTEXT_TAKEOVER_PARAM)
    505         if (s2c_no_context_takeover and
    506             self._request.get_parameter_value(
    507                 self._S2C_NO_CONTEXT_TAKEOVER_PARAM) is not None):
    508             self._logger.debug('%s parameter must not have a value: %r',
    509                                self._S2C_NO_CONTEXT_TAKEOVER_PARAM,
    510                                s2c_no_context_takeover)
    511             return None
    512 
    513         # c2s_max_window_bits from a client indicates whether the client can
    514         # accept c2s_max_window_bits from a server or not.
    515         client_c2s_max_window_bits = self._request.has_parameter(
    516             self._C2S_MAX_WINDOW_BITS_PARAM)
    517         if (self._draft08 and
    518             client_c2s_max_window_bits and
    519             self._request.get_parameter_value(
    520                 self._C2S_MAX_WINDOW_BITS_PARAM) is not None):
    521             self._logger.debug('%s parameter must not have a value in a '
    522                                'client\'s opening handshake: %r',
    523                                self._C2S_MAX_WINDOW_BITS_PARAM,
    524                                client_c2s_max_window_bits)
    525             return None
    526 
    527         self._rfc1979_deflater = util._RFC1979Deflater(
    528             s2c_max_window_bits, s2c_no_context_takeover)
    529 
    530         # Note that we prepare for incoming messages compressed with window
    531         # bits upto 15 regardless of the c2s_max_window_bits value to be sent
    532         # to the client.
    533         self._rfc1979_inflater = util._RFC1979Inflater()
    534 
    535         self._framer = _PerMessageDeflateFramer(
    536             s2c_max_window_bits, s2c_no_context_takeover)
    537         self._framer.set_bfinal(False)
    538         self._framer.set_compress_outgoing_enabled(True)
    539 
    540         response = common.ExtensionParameter(self._request.name())
    541 
    542         if s2c_max_window_bits is not None:
    543             response.add_parameter(
    544                 self._S2C_MAX_WINDOW_BITS_PARAM, str(s2c_max_window_bits))
    545 
    546         if s2c_no_context_takeover:
    547             response.add_parameter(
    548                 self._S2C_NO_CONTEXT_TAKEOVER_PARAM, None)
    549 
    550         if self._preferred_c2s_max_window_bits is not None:
    551             if self._draft08 and not client_c2s_max_window_bits:
    552                 self._logger.debug('Processor is configured to use %s but '
    553                                    'the client cannot accept it',
    554                                    self._C2S_MAX_WINDOW_BITS_PARAM)
    555                 return None
    556             response.add_parameter(
    557                 self._C2S_MAX_WINDOW_BITS_PARAM,
    558                 str(self._preferred_c2s_max_window_bits))
    559 
    560         if self._c2s_no_context_takeover:
    561             response.add_parameter(
    562                 self._C2S_NO_CONTEXT_TAKEOVER_PARAM, None)
    563 
    564         self._logger.debug(
    565             'Enable %s extension ('
    566             'request: s2c_max_window_bits=%s; s2c_no_context_takeover=%r, '
    567             'response: c2s_max_window_bits=%s; c2s_no_context_takeover=%r)' %
    568             (self._request.name(),
    569              s2c_max_window_bits,
    570              s2c_no_context_takeover,
    571              self._preferred_c2s_max_window_bits,
    572              self._c2s_no_context_takeover))
    573 
    574         return response
    575 
    576     def _setup_stream_options_internal(self, stream_options):
    577         self._framer.setup_stream_options(stream_options)
    578 
    579     def set_c2s_max_window_bits(self, value):
    580         """If this option is specified, this class adds the c2s_max_window_bits
    581         extension parameter to the handshake response, but doesn't reduce the
    582         LZ77 sliding window size of its inflater. I.e., you can use this for
    583         testing client implementation but cannot reduce memory usage of this
    584         class.
    585 
    586         If this method has been called with True and an offer without the
    587         c2s_max_window_bits extension parameter is received,
    588         - (When processing the permessage-deflate extension) this processor
    589           declines the request.
    590         - (When processing the permessage-compress extension) this processor
    591           accepts the request.
    592         """
    593 
    594         self._preferred_c2s_max_window_bits = value
    595 
    596     def set_c2s_no_context_takeover(self, value):
    597         """If this option is specified, this class adds the
    598         c2s_no_context_takeover extension parameter to the handshake response,
    599         but doesn't reset inflater for each message. I.e., you can use this for
    600         testing client implementation but cannot reduce memory usage of this
    601         class.
    602         """
    603 
    604         self._c2s_no_context_takeover = value
    605 
    606     def set_bfinal(self, value):
    607         self._framer.set_bfinal(value)
    608 
    609     def enable_outgoing_compression(self):
    610         self._framer.set_compress_outgoing_enabled(True)
    611 
    612     def disable_outgoing_compression(self):
    613         self._framer.set_compress_outgoing_enabled(False)
    614 
    615 
    616 class _PerMessageDeflateFramer(object):
    617     """A framer for extensions with per-message DEFLATE feature."""
    618 
    619     def __init__(self, deflate_max_window_bits, deflate_no_context_takeover):
    620         self._logger = util.get_class_logger(self)
    621 
    622         self._rfc1979_deflater = util._RFC1979Deflater(
    623             deflate_max_window_bits, deflate_no_context_takeover)
    624 
    625         self._rfc1979_inflater = util._RFC1979Inflater()
    626 
    627         self._bfinal = False
    628 
    629         self._compress_outgoing_enabled = False
    630 
    631         # True if a message is fragmented and compression is ongoing.
    632         self._compress_ongoing = False
    633 
    634         # Calculates
    635         #     (Total outgoing bytes supplied to this filter) /
    636         #     (Total bytes sent to the network after applying this filter)
    637         self._outgoing_average_ratio_calculator = _AverageRatioCalculator()
    638 
    639         # Calculates
    640         #     (Total bytes received from the network) /
    641         #     (Total incoming bytes obtained after applying this filter)
    642         self._incoming_average_ratio_calculator = _AverageRatioCalculator()
    643 
    644     def set_bfinal(self, value):
    645         self._bfinal = value
    646 
    647     def set_compress_outgoing_enabled(self, value):
    648         self._compress_outgoing_enabled = value
    649 
    650     def _process_incoming_message(self, message, decompress):
    651         if not decompress:
    652             return message
    653 
    654         received_payload_size = len(message)
    655         self._incoming_average_ratio_calculator.add_result_bytes(
    656                 received_payload_size)
    657 
    658         message = self._rfc1979_inflater.filter(message)
    659 
    660         filtered_payload_size = len(message)
    661         self._incoming_average_ratio_calculator.add_original_bytes(
    662                 filtered_payload_size)
    663 
    664         _log_incoming_compression_ratio(
    665                 self._logger,
    666                 received_payload_size,
    667                 filtered_payload_size,
    668                 self._incoming_average_ratio_calculator.get_average_ratio())
    669 
    670         return message
    671 
    672     def _process_outgoing_message(self, message, end, binary):
    673         if not binary:
    674             message = message.encode('utf-8')
    675 
    676         if not self._compress_outgoing_enabled:
    677             return message
    678 
    679         original_payload_size = len(message)
    680         self._outgoing_average_ratio_calculator.add_original_bytes(
    681             original_payload_size)
    682 
    683         message = self._rfc1979_deflater.filter(
    684             message, flush=end, bfinal=self._bfinal)
    685 
    686         filtered_payload_size = len(message)
    687         self._outgoing_average_ratio_calculator.add_result_bytes(
    688             filtered_payload_size)
    689 
    690         _log_outgoing_compression_ratio(
    691                 self._logger,
    692                 original_payload_size,
    693                 filtered_payload_size,
    694                 self._outgoing_average_ratio_calculator.get_average_ratio())
    695 
    696         if not self._compress_ongoing:
    697             self._outgoing_frame_filter.set_compression_bit()
    698         self._compress_ongoing = not end
    699         return message
    700 
    701     def _process_incoming_frame(self, frame):
    702         if frame.rsv1 == 1 and not common.is_control_opcode(frame.opcode):
    703             self._incoming_message_filter.decompress_next_message()
    704             frame.rsv1 = 0
    705 
    706     def _process_outgoing_frame(self, frame, compression_bit):
    707         if (not compression_bit or
    708             common.is_control_opcode(frame.opcode)):
    709             return
    710 
    711         frame.rsv1 = 1
    712 
    713     def setup_stream_options(self, stream_options):
    714         """Creates filters and sets them to the StreamOptions."""
    715 
    716         class _OutgoingMessageFilter(object):
    717 
    718             def __init__(self, parent):
    719                 self._parent = parent
    720 
    721             def filter(self, message, end=True, binary=False):
    722                 return self._parent._process_outgoing_message(
    723                     message, end, binary)
    724 
    725         class _IncomingMessageFilter(object):
    726 
    727             def __init__(self, parent):
    728                 self._parent = parent
    729                 self._decompress_next_message = False
    730 
    731             def decompress_next_message(self):
    732                 self._decompress_next_message = True
    733 
    734             def filter(self, message):
    735                 message = self._parent._process_incoming_message(
    736                     message, self._decompress_next_message)
    737                 self._decompress_next_message = False
    738                 return message
    739 
    740         self._outgoing_message_filter = _OutgoingMessageFilter(self)
    741         self._incoming_message_filter = _IncomingMessageFilter(self)
    742         stream_options.outgoing_message_filters.append(
    743             self._outgoing_message_filter)
    744         stream_options.incoming_message_filters.append(
    745             self._incoming_message_filter)
    746 
    747         class _OutgoingFrameFilter(object):
    748 
    749             def __init__(self, parent):
    750                 self._parent = parent
    751                 self._set_compression_bit = False
    752 
    753             def set_compression_bit(self):
    754                 self._set_compression_bit = True
    755 
    756             def filter(self, frame):
    757                 self._parent._process_outgoing_frame(
    758                     frame, self._set_compression_bit)
    759                 self._set_compression_bit = False
    760 
    761         class _IncomingFrameFilter(object):
    762 
    763             def __init__(self, parent):
    764                 self._parent = parent
    765 
    766             def filter(self, frame):
    767                 self._parent._process_incoming_frame(frame)
    768 
    769         self._outgoing_frame_filter = _OutgoingFrameFilter(self)
    770         self._incoming_frame_filter = _IncomingFrameFilter(self)
    771         stream_options.outgoing_frame_filters.append(
    772             self._outgoing_frame_filter)
    773         stream_options.incoming_frame_filters.append(
    774             self._incoming_frame_filter)
    775 
    776         stream_options.encode_text_message_to_utf8 = False
    777 
    778 
    779 _available_processors[common.PERMESSAGE_DEFLATE_EXTENSION] = (
    780         PerMessageDeflateExtensionProcessor)
    781 # TODO(tyoshino): Reorganize class names.
    782 _compression_extension_names.append('deflate')
    783 
    784 
    785 class PerMessageCompressExtensionProcessor(
    786     CompressionExtensionProcessorBase):
    787     """permessage-compress extension processor.
    788 
    789     Specification:
    790     http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression
    791     """
    792 
    793     _DEFLATE_METHOD = 'deflate'
    794 
    795     def __init__(self, request):
    796         CompressionExtensionProcessorBase.__init__(self, request)
    797 
    798     def name(self):
    799         return common.PERMESSAGE_COMPRESSION_EXTENSION
    800 
    801     def _lookup_compression_processor(self, method_desc):
    802         if method_desc.name() == self._DEFLATE_METHOD:
    803             return PerMessageDeflateExtensionProcessor(method_desc, False)
    804         return None
    805 
    806 
    807 _available_processors[common.PERMESSAGE_COMPRESSION_EXTENSION] = (
    808     PerMessageCompressExtensionProcessor)
    809 _compression_extension_names.append(common.PERMESSAGE_COMPRESSION_EXTENSION)
    810 
    811 
    812 class MuxExtensionProcessor(ExtensionProcessorInterface):
    813     """WebSocket multiplexing extension processor."""
    814 
    815     _QUOTA_PARAM = 'quota'
    816 
    817     def __init__(self, request):
    818         ExtensionProcessorInterface.__init__(self, request)
    819         self._quota = 0
    820         self._extensions = []
    821 
    822     def name(self):
    823         return common.MUX_EXTENSION
    824 
    825     def check_consistency_with_other_processors(self, processors):
    826         before_mux = True
    827         for processor in processors:
    828             name = processor.name()
    829             if name == self.name():
    830                 before_mux = False
    831                 continue
    832             if not processor.is_active():
    833                 continue
    834             if before_mux:
    835                 # Mux extension cannot be used after extensions
    836                 # that depend on frame boundary, extension data field, or any
    837                 # reserved bits which are attributed to each frame.
    838                 if (name == common.PERFRAME_COMPRESSION_EXTENSION or
    839                     name == common.DEFLATE_FRAME_EXTENSION or
    840                     name == common.X_WEBKIT_DEFLATE_FRAME_EXTENSION):
    841                     self.set_active(False)
    842                     return
    843             else:
    844                 # Mux extension should not be applied before any history-based
    845                 # compression extension.
    846                 if (name == common.PERFRAME_COMPRESSION_EXTENSION or
    847                     name == common.DEFLATE_FRAME_EXTENSION or
    848                     name == common.X_WEBKIT_DEFLATE_FRAME_EXTENSION or
    849                     name == common.PERMESSAGE_COMPRESSION_EXTENSION or
    850                     name == common.X_WEBKIT_PERMESSAGE_COMPRESSION_EXTENSION):
    851                     self.set_active(False)
    852                     return
    853 
    854     def _get_extension_response_internal(self):
    855         self._active = False
    856         quota = self._request.get_parameter_value(self._QUOTA_PARAM)
    857         if quota is not None:
    858             try:
    859                 quota = int(quota)
    860             except ValueError, e:
    861                 return None
    862             if quota < 0 or quota >= 2 ** 32:
    863                 return None
    864             self._quota = quota
    865 
    866         self._active = True
    867         return common.ExtensionParameter(common.MUX_EXTENSION)
    868 
    869     def _setup_stream_options_internal(self, stream_options):
    870         pass
    871 
    872     def set_quota(self, quota):
    873         self._quota = quota
    874 
    875     def quota(self):
    876         return self._quota
    877 
    878     def set_extensions(self, extensions):
    879         self._extensions = extensions
    880 
    881     def extensions(self):
    882         return self._extensions
    883 
    884 
    885 _available_processors[common.MUX_EXTENSION] = MuxExtensionProcessor
    886 
    887 
    888 def get_extension_processor(extension_request):
    889     processor_class = _available_processors.get(extension_request.name())
    890     if processor_class is None:
    891         return None
    892     return processor_class(extension_request)
    893 
    894 
    895 def is_compression_extension(extension_name):
    896     return extension_name in _compression_extension_names
    897 
    898 
    899 # vi:sts=4 sw=4 et
    900