1 # Copyright 2012, Google Inc. 2 # All rights reserved. 3 # 4 # Redistribution and use in source and binary forms, with or without 5 # modification, are permitted provided that the following conditions are 6 # met: 7 # 8 # * Redistributions of source code must retain the above copyright 9 # notice, this list of conditions and the following disclaimer. 10 # * Redistributions in binary form must reproduce the above 11 # copyright notice, this list of conditions and the following disclaimer 12 # in the documentation and/or other materials provided with the 13 # distribution. 14 # * Neither the name of Google Inc. nor the names of its 15 # contributors may be used to endorse or promote products derived from 16 # this software without specific prior written permission. 17 # 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 31 from mod_pywebsocket import common 32 from mod_pywebsocket import util 33 from mod_pywebsocket.http_header_util import quote_if_necessary 34 35 36 _available_processors = {} 37 _compression_extension_names = [] 38 39 40 class ExtensionProcessorInterface(object): 41 42 def __init__(self, request): 43 self._request = request 44 self._active = True 45 46 def request(self): 47 return self._request 48 49 def name(self): 50 return None 51 52 def check_consistency_with_other_processors(self, processors): 53 pass 54 55 def set_active(self, active): 56 self._active = active 57 58 def is_active(self): 59 return self._active 60 61 def _get_extension_response_internal(self): 62 return None 63 64 def get_extension_response(self): 65 if self._active: 66 response = self._get_extension_response_internal() 67 if response is None: 68 self._active = False 69 return response 70 return None 71 72 def _setup_stream_options_internal(self, stream_options): 73 pass 74 75 def setup_stream_options(self, stream_options): 76 if self._active: 77 self._setup_stream_options_internal(stream_options) 78 79 80 def _log_outgoing_compression_ratio( 81 logger, original_bytes, filtered_bytes, average_ratio): 82 # Print inf when ratio is not available. 83 ratio = float('inf') 84 if original_bytes != 0: 85 ratio = float(filtered_bytes) / original_bytes 86 87 logger.debug('Outgoing compression ratio: %f (average: %f)' % 88 (ratio, average_ratio)) 89 90 91 def _log_incoming_compression_ratio( 92 logger, received_bytes, filtered_bytes, average_ratio): 93 # Print inf when ratio is not available. 94 ratio = float('inf') 95 if filtered_bytes != 0: 96 ratio = float(received_bytes) / filtered_bytes 97 98 logger.debug('Incoming compression ratio: %f (average: %f)' % 99 (ratio, average_ratio)) 100 101 102 def _parse_window_bits(bits): 103 """Return parsed integer value iff the given string conforms to the 104 grammar of the window bits extension parameters. 105 """ 106 107 if bits is None: 108 raise ValueError('Value is required') 109 110 # For non integer values such as "10.0", ValueError will be raised. 111 int_bits = int(bits) 112 113 # First condition is to drop leading zero case e.g. "08". 114 if bits != str(int_bits) or int_bits < 8 or int_bits > 15: 115 raise ValueError('Invalid value: %r' % bits) 116 117 return int_bits 118 119 120 class _AverageRatioCalculator(object): 121 """Stores total bytes of original and result data, and calculates average 122 result / original ratio. 123 """ 124 125 def __init__(self): 126 self._total_original_bytes = 0 127 self._total_result_bytes = 0 128 129 def add_original_bytes(self, value): 130 self._total_original_bytes += value 131 132 def add_result_bytes(self, value): 133 self._total_result_bytes += value 134 135 def get_average_ratio(self): 136 if self._total_original_bytes != 0: 137 return (float(self._total_result_bytes) / 138 self._total_original_bytes) 139 else: 140 return float('inf') 141 142 143 class DeflateFrameExtensionProcessor(ExtensionProcessorInterface): 144 """deflate-frame extension processor. 145 146 Specification: 147 http://tools.ietf.org/html/draft-tyoshino-hybi-websocket-perframe-deflate 148 """ 149 150 _WINDOW_BITS_PARAM = 'max_window_bits' 151 _NO_CONTEXT_TAKEOVER_PARAM = 'no_context_takeover' 152 153 def __init__(self, request): 154 ExtensionProcessorInterface.__init__(self, request) 155 self._logger = util.get_class_logger(self) 156 157 self._response_window_bits = None 158 self._response_no_context_takeover = False 159 self._bfinal = False 160 161 # Calculates 162 # (Total outgoing bytes supplied to this filter) / 163 # (Total bytes sent to the network after applying this filter) 164 self._outgoing_average_ratio_calculator = _AverageRatioCalculator() 165 166 # Calculates 167 # (Total bytes received from the network) / 168 # (Total incoming bytes obtained after applying this filter) 169 self._incoming_average_ratio_calculator = _AverageRatioCalculator() 170 171 def name(self): 172 return common.DEFLATE_FRAME_EXTENSION 173 174 def _get_extension_response_internal(self): 175 # Any unknown parameter will be just ignored. 176 177 window_bits = None 178 if self._request.has_parameter(self._WINDOW_BITS_PARAM): 179 window_bits = self._request.get_parameter_value( 180 self._WINDOW_BITS_PARAM) 181 try: 182 window_bits = _parse_window_bits(window_bits) 183 except ValueError, e: 184 return None 185 186 no_context_takeover = self._request.has_parameter( 187 self._NO_CONTEXT_TAKEOVER_PARAM) 188 if (no_context_takeover and 189 self._request.get_parameter_value( 190 self._NO_CONTEXT_TAKEOVER_PARAM) is not None): 191 return None 192 193 self._rfc1979_deflater = util._RFC1979Deflater( 194 window_bits, no_context_takeover) 195 196 self._rfc1979_inflater = util._RFC1979Inflater() 197 198 self._compress_outgoing = True 199 200 response = common.ExtensionParameter(self._request.name()) 201 202 if self._response_window_bits is not None: 203 response.add_parameter( 204 self._WINDOW_BITS_PARAM, str(self._response_window_bits)) 205 if self._response_no_context_takeover: 206 response.add_parameter( 207 self._NO_CONTEXT_TAKEOVER_PARAM, None) 208 209 self._logger.debug( 210 'Enable %s extension (' 211 'request: window_bits=%s; no_context_takeover=%r, ' 212 'response: window_wbits=%s; no_context_takeover=%r)' % 213 (self._request.name(), 214 window_bits, 215 no_context_takeover, 216 self._response_window_bits, 217 self._response_no_context_takeover)) 218 219 return response 220 221 def _setup_stream_options_internal(self, stream_options): 222 223 class _OutgoingFilter(object): 224 225 def __init__(self, parent): 226 self._parent = parent 227 228 def filter(self, frame): 229 self._parent._outgoing_filter(frame) 230 231 class _IncomingFilter(object): 232 233 def __init__(self, parent): 234 self._parent = parent 235 236 def filter(self, frame): 237 self._parent._incoming_filter(frame) 238 239 stream_options.outgoing_frame_filters.append( 240 _OutgoingFilter(self)) 241 stream_options.incoming_frame_filters.insert( 242 0, _IncomingFilter(self)) 243 244 def set_response_window_bits(self, value): 245 self._response_window_bits = value 246 247 def set_response_no_context_takeover(self, value): 248 self._response_no_context_takeover = value 249 250 def set_bfinal(self, value): 251 self._bfinal = value 252 253 def enable_outgoing_compression(self): 254 self._compress_outgoing = True 255 256 def disable_outgoing_compression(self): 257 self._compress_outgoing = False 258 259 def _outgoing_filter(self, frame): 260 """Transform outgoing frames. This method is called only by 261 an _OutgoingFilter instance. 262 """ 263 264 original_payload_size = len(frame.payload) 265 self._outgoing_average_ratio_calculator.add_original_bytes( 266 original_payload_size) 267 268 if (not self._compress_outgoing or 269 common.is_control_opcode(frame.opcode)): 270 self._outgoing_average_ratio_calculator.add_result_bytes( 271 original_payload_size) 272 return 273 274 frame.payload = self._rfc1979_deflater.filter( 275 frame.payload, bfinal=self._bfinal) 276 frame.rsv1 = 1 277 278 filtered_payload_size = len(frame.payload) 279 self._outgoing_average_ratio_calculator.add_result_bytes( 280 filtered_payload_size) 281 282 _log_outgoing_compression_ratio( 283 self._logger, 284 original_payload_size, 285 filtered_payload_size, 286 self._outgoing_average_ratio_calculator.get_average_ratio()) 287 288 def _incoming_filter(self, frame): 289 """Transform incoming frames. This method is called only by 290 an _IncomingFilter instance. 291 """ 292 293 received_payload_size = len(frame.payload) 294 self._incoming_average_ratio_calculator.add_result_bytes( 295 received_payload_size) 296 297 if frame.rsv1 != 1 or common.is_control_opcode(frame.opcode): 298 self._incoming_average_ratio_calculator.add_original_bytes( 299 received_payload_size) 300 return 301 302 frame.payload = self._rfc1979_inflater.filter(frame.payload) 303 frame.rsv1 = 0 304 305 filtered_payload_size = len(frame.payload) 306 self._incoming_average_ratio_calculator.add_original_bytes( 307 filtered_payload_size) 308 309 _log_incoming_compression_ratio( 310 self._logger, 311 received_payload_size, 312 filtered_payload_size, 313 self._incoming_average_ratio_calculator.get_average_ratio()) 314 315 316 _available_processors[common.DEFLATE_FRAME_EXTENSION] = ( 317 DeflateFrameExtensionProcessor) 318 _compression_extension_names.append(common.DEFLATE_FRAME_EXTENSION) 319 320 _available_processors[common.X_WEBKIT_DEFLATE_FRAME_EXTENSION] = ( 321 DeflateFrameExtensionProcessor) 322 _compression_extension_names.append(common.X_WEBKIT_DEFLATE_FRAME_EXTENSION) 323 324 325 def _parse_compression_method(data): 326 """Parses the value of "method" extension parameter.""" 327 328 return common.parse_extensions(data, allow_quoted_string=True) 329 330 331 def _create_accepted_method_desc(method_name, method_params): 332 """Creates accepted-method-desc from given method name and parameters""" 333 334 extension = common.ExtensionParameter(method_name) 335 for name, value in method_params: 336 extension.add_parameter(name, value) 337 return common.format_extension(extension) 338 339 340 class CompressionExtensionProcessorBase(ExtensionProcessorInterface): 341 """Base class for perframe-compress and permessage-compress extension.""" 342 343 _METHOD_PARAM = 'method' 344 345 def __init__(self, request): 346 ExtensionProcessorInterface.__init__(self, request) 347 self._logger = util.get_class_logger(self) 348 self._compression_method_name = None 349 self._compression_processor = None 350 self._compression_processor_hook = None 351 352 def name(self): 353 return '' 354 355 def _lookup_compression_processor(self, method_desc): 356 return None 357 358 def _get_compression_processor_response(self): 359 """Looks up the compression processor based on the self._request and 360 returns the compression processor's response. 361 """ 362 363 method_list = self._request.get_parameter_value(self._METHOD_PARAM) 364 if method_list is None: 365 return None 366 methods = _parse_compression_method(method_list) 367 if methods is None: 368 return None 369 comression_processor = None 370 # The current implementation tries only the first method that matches 371 # supported algorithm. Following methods aren't tried even if the 372 # first one is rejected. 373 # TODO(bashi): Need to clarify this behavior. 374 for method_desc in methods: 375 compression_processor = self._lookup_compression_processor( 376 method_desc) 377 if compression_processor is not None: 378 self._compression_method_name = method_desc.name() 379 break 380 if compression_processor is None: 381 return None 382 383 if self._compression_processor_hook: 384 self._compression_processor_hook(compression_processor) 385 386 processor_response = compression_processor.get_extension_response() 387 if processor_response is None: 388 return None 389 self._compression_processor = compression_processor 390 return processor_response 391 392 def _get_extension_response_internal(self): 393 processor_response = self._get_compression_processor_response() 394 if processor_response is None: 395 return None 396 397 response = common.ExtensionParameter(self._request.name()) 398 accepted_method_desc = _create_accepted_method_desc( 399 self._compression_method_name, 400 processor_response.get_parameters()) 401 response.add_parameter(self._METHOD_PARAM, accepted_method_desc) 402 self._logger.debug( 403 'Enable %s extension (method: %s)' % 404 (self._request.name(), self._compression_method_name)) 405 return response 406 407 def _setup_stream_options_internal(self, stream_options): 408 if self._compression_processor is None: 409 return 410 self._compression_processor.setup_stream_options(stream_options) 411 412 def set_compression_processor_hook(self, hook): 413 self._compression_processor_hook = hook 414 415 def get_compression_processor(self): 416 return self._compression_processor 417 418 419 class PerFrameCompressExtensionProcessor(CompressionExtensionProcessorBase): 420 """perframe-compress processor. 421 422 Specification: 423 http://tools.ietf.org/html/draft-ietf-hybi-websocket-perframe-compression 424 """ 425 426 _DEFLATE_METHOD = 'deflate' 427 428 def __init__(self, request): 429 CompressionExtensionProcessorBase.__init__(self, request) 430 431 def name(self): 432 return common.PERFRAME_COMPRESSION_EXTENSION 433 434 def _lookup_compression_processor(self, method_desc): 435 if method_desc.name() == self._DEFLATE_METHOD: 436 return DeflateFrameExtensionProcessor(method_desc) 437 return None 438 439 440 _available_processors[common.PERFRAME_COMPRESSION_EXTENSION] = ( 441 PerFrameCompressExtensionProcessor) 442 _compression_extension_names.append(common.PERFRAME_COMPRESSION_EXTENSION) 443 444 445 class PerMessageDeflateExtensionProcessor(ExtensionProcessorInterface): 446 """permessage-deflate extension processor. It's also used for 447 permessage-compress extension when the deflate method is chosen. 448 449 Specification: 450 http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression-08 451 """ 452 453 _S2C_MAX_WINDOW_BITS_PARAM = 's2c_max_window_bits' 454 _S2C_NO_CONTEXT_TAKEOVER_PARAM = 's2c_no_context_takeover' 455 _C2S_MAX_WINDOW_BITS_PARAM = 'c2s_max_window_bits' 456 _C2S_NO_CONTEXT_TAKEOVER_PARAM = 'c2s_no_context_takeover' 457 458 def __init__(self, request, draft08=True): 459 """Construct PerMessageDeflateExtensionProcessor 460 461 Args: 462 draft08: Follow the constraints on the parameters that were not 463 specified for permessage-compress but are specified for 464 permessage-deflate as on 465 draft-ietf-hybi-permessage-compression-08. 466 """ 467 468 ExtensionProcessorInterface.__init__(self, request) 469 self._logger = util.get_class_logger(self) 470 471 self._preferred_c2s_max_window_bits = None 472 self._c2s_no_context_takeover = False 473 474 self._draft08 = draft08 475 476 def name(self): 477 return 'deflate' 478 479 def _get_extension_response_internal(self): 480 if self._draft08: 481 for name in self._request.get_parameter_names(): 482 if name not in [self._S2C_MAX_WINDOW_BITS_PARAM, 483 self._S2C_NO_CONTEXT_TAKEOVER_PARAM, 484 self._C2S_MAX_WINDOW_BITS_PARAM]: 485 self._logger.debug('Unknown parameter: %r', name) 486 return None 487 else: 488 # Any unknown parameter will be just ignored. 489 pass 490 491 s2c_max_window_bits = None 492 if self._request.has_parameter(self._S2C_MAX_WINDOW_BITS_PARAM): 493 s2c_max_window_bits = self._request.get_parameter_value( 494 self._S2C_MAX_WINDOW_BITS_PARAM) 495 try: 496 s2c_max_window_bits = _parse_window_bits(s2c_max_window_bits) 497 except ValueError, e: 498 self._logger.debug('Bad %s parameter: %r', 499 self._S2C_MAX_WINDOW_BITS_PARAM, 500 e) 501 return None 502 503 s2c_no_context_takeover = self._request.has_parameter( 504 self._S2C_NO_CONTEXT_TAKEOVER_PARAM) 505 if (s2c_no_context_takeover and 506 self._request.get_parameter_value( 507 self._S2C_NO_CONTEXT_TAKEOVER_PARAM) is not None): 508 self._logger.debug('%s parameter must not have a value: %r', 509 self._S2C_NO_CONTEXT_TAKEOVER_PARAM, 510 s2c_no_context_takeover) 511 return None 512 513 # c2s_max_window_bits from a client indicates whether the client can 514 # accept c2s_max_window_bits from a server or not. 515 client_c2s_max_window_bits = self._request.has_parameter( 516 self._C2S_MAX_WINDOW_BITS_PARAM) 517 if (self._draft08 and 518 client_c2s_max_window_bits and 519 self._request.get_parameter_value( 520 self._C2S_MAX_WINDOW_BITS_PARAM) is not None): 521 self._logger.debug('%s parameter must not have a value in a ' 522 'client\'s opening handshake: %r', 523 self._C2S_MAX_WINDOW_BITS_PARAM, 524 client_c2s_max_window_bits) 525 return None 526 527 self._rfc1979_deflater = util._RFC1979Deflater( 528 s2c_max_window_bits, s2c_no_context_takeover) 529 530 # Note that we prepare for incoming messages compressed with window 531 # bits upto 15 regardless of the c2s_max_window_bits value to be sent 532 # to the client. 533 self._rfc1979_inflater = util._RFC1979Inflater() 534 535 self._framer = _PerMessageDeflateFramer( 536 s2c_max_window_bits, s2c_no_context_takeover) 537 self._framer.set_bfinal(False) 538 self._framer.set_compress_outgoing_enabled(True) 539 540 response = common.ExtensionParameter(self._request.name()) 541 542 if s2c_max_window_bits is not None: 543 response.add_parameter( 544 self._S2C_MAX_WINDOW_BITS_PARAM, str(s2c_max_window_bits)) 545 546 if s2c_no_context_takeover: 547 response.add_parameter( 548 self._S2C_NO_CONTEXT_TAKEOVER_PARAM, None) 549 550 if self._preferred_c2s_max_window_bits is not None: 551 if self._draft08 and not client_c2s_max_window_bits: 552 self._logger.debug('Processor is configured to use %s but ' 553 'the client cannot accept it', 554 self._C2S_MAX_WINDOW_BITS_PARAM) 555 return None 556 response.add_parameter( 557 self._C2S_MAX_WINDOW_BITS_PARAM, 558 str(self._preferred_c2s_max_window_bits)) 559 560 if self._c2s_no_context_takeover: 561 response.add_parameter( 562 self._C2S_NO_CONTEXT_TAKEOVER_PARAM, None) 563 564 self._logger.debug( 565 'Enable %s extension (' 566 'request: s2c_max_window_bits=%s; s2c_no_context_takeover=%r, ' 567 'response: c2s_max_window_bits=%s; c2s_no_context_takeover=%r)' % 568 (self._request.name(), 569 s2c_max_window_bits, 570 s2c_no_context_takeover, 571 self._preferred_c2s_max_window_bits, 572 self._c2s_no_context_takeover)) 573 574 return response 575 576 def _setup_stream_options_internal(self, stream_options): 577 self._framer.setup_stream_options(stream_options) 578 579 def set_c2s_max_window_bits(self, value): 580 """If this option is specified, this class adds the c2s_max_window_bits 581 extension parameter to the handshake response, but doesn't reduce the 582 LZ77 sliding window size of its inflater. I.e., you can use this for 583 testing client implementation but cannot reduce memory usage of this 584 class. 585 586 If this method has been called with True and an offer without the 587 c2s_max_window_bits extension parameter is received, 588 - (When processing the permessage-deflate extension) this processor 589 declines the request. 590 - (When processing the permessage-compress extension) this processor 591 accepts the request. 592 """ 593 594 self._preferred_c2s_max_window_bits = value 595 596 def set_c2s_no_context_takeover(self, value): 597 """If this option is specified, this class adds the 598 c2s_no_context_takeover extension parameter to the handshake response, 599 but doesn't reset inflater for each message. I.e., you can use this for 600 testing client implementation but cannot reduce memory usage of this 601 class. 602 """ 603 604 self._c2s_no_context_takeover = value 605 606 def set_bfinal(self, value): 607 self._framer.set_bfinal(value) 608 609 def enable_outgoing_compression(self): 610 self._framer.set_compress_outgoing_enabled(True) 611 612 def disable_outgoing_compression(self): 613 self._framer.set_compress_outgoing_enabled(False) 614 615 616 class _PerMessageDeflateFramer(object): 617 """A framer for extensions with per-message DEFLATE feature.""" 618 619 def __init__(self, deflate_max_window_bits, deflate_no_context_takeover): 620 self._logger = util.get_class_logger(self) 621 622 self._rfc1979_deflater = util._RFC1979Deflater( 623 deflate_max_window_bits, deflate_no_context_takeover) 624 625 self._rfc1979_inflater = util._RFC1979Inflater() 626 627 self._bfinal = False 628 629 self._compress_outgoing_enabled = False 630 631 # True if a message is fragmented and compression is ongoing. 632 self._compress_ongoing = False 633 634 # Calculates 635 # (Total outgoing bytes supplied to this filter) / 636 # (Total bytes sent to the network after applying this filter) 637 self._outgoing_average_ratio_calculator = _AverageRatioCalculator() 638 639 # Calculates 640 # (Total bytes received from the network) / 641 # (Total incoming bytes obtained after applying this filter) 642 self._incoming_average_ratio_calculator = _AverageRatioCalculator() 643 644 def set_bfinal(self, value): 645 self._bfinal = value 646 647 def set_compress_outgoing_enabled(self, value): 648 self._compress_outgoing_enabled = value 649 650 def _process_incoming_message(self, message, decompress): 651 if not decompress: 652 return message 653 654 received_payload_size = len(message) 655 self._incoming_average_ratio_calculator.add_result_bytes( 656 received_payload_size) 657 658 message = self._rfc1979_inflater.filter(message) 659 660 filtered_payload_size = len(message) 661 self._incoming_average_ratio_calculator.add_original_bytes( 662 filtered_payload_size) 663 664 _log_incoming_compression_ratio( 665 self._logger, 666 received_payload_size, 667 filtered_payload_size, 668 self._incoming_average_ratio_calculator.get_average_ratio()) 669 670 return message 671 672 def _process_outgoing_message(self, message, end, binary): 673 if not binary: 674 message = message.encode('utf-8') 675 676 if not self._compress_outgoing_enabled: 677 return message 678 679 original_payload_size = len(message) 680 self._outgoing_average_ratio_calculator.add_original_bytes( 681 original_payload_size) 682 683 message = self._rfc1979_deflater.filter( 684 message, flush=end, bfinal=self._bfinal) 685 686 filtered_payload_size = len(message) 687 self._outgoing_average_ratio_calculator.add_result_bytes( 688 filtered_payload_size) 689 690 _log_outgoing_compression_ratio( 691 self._logger, 692 original_payload_size, 693 filtered_payload_size, 694 self._outgoing_average_ratio_calculator.get_average_ratio()) 695 696 if not self._compress_ongoing: 697 self._outgoing_frame_filter.set_compression_bit() 698 self._compress_ongoing = not end 699 return message 700 701 def _process_incoming_frame(self, frame): 702 if frame.rsv1 == 1 and not common.is_control_opcode(frame.opcode): 703 self._incoming_message_filter.decompress_next_message() 704 frame.rsv1 = 0 705 706 def _process_outgoing_frame(self, frame, compression_bit): 707 if (not compression_bit or 708 common.is_control_opcode(frame.opcode)): 709 return 710 711 frame.rsv1 = 1 712 713 def setup_stream_options(self, stream_options): 714 """Creates filters and sets them to the StreamOptions.""" 715 716 class _OutgoingMessageFilter(object): 717 718 def __init__(self, parent): 719 self._parent = parent 720 721 def filter(self, message, end=True, binary=False): 722 return self._parent._process_outgoing_message( 723 message, end, binary) 724 725 class _IncomingMessageFilter(object): 726 727 def __init__(self, parent): 728 self._parent = parent 729 self._decompress_next_message = False 730 731 def decompress_next_message(self): 732 self._decompress_next_message = True 733 734 def filter(self, message): 735 message = self._parent._process_incoming_message( 736 message, self._decompress_next_message) 737 self._decompress_next_message = False 738 return message 739 740 self._outgoing_message_filter = _OutgoingMessageFilter(self) 741 self._incoming_message_filter = _IncomingMessageFilter(self) 742 stream_options.outgoing_message_filters.append( 743 self._outgoing_message_filter) 744 stream_options.incoming_message_filters.append( 745 self._incoming_message_filter) 746 747 class _OutgoingFrameFilter(object): 748 749 def __init__(self, parent): 750 self._parent = parent 751 self._set_compression_bit = False 752 753 def set_compression_bit(self): 754 self._set_compression_bit = True 755 756 def filter(self, frame): 757 self._parent._process_outgoing_frame( 758 frame, self._set_compression_bit) 759 self._set_compression_bit = False 760 761 class _IncomingFrameFilter(object): 762 763 def __init__(self, parent): 764 self._parent = parent 765 766 def filter(self, frame): 767 self._parent._process_incoming_frame(frame) 768 769 self._outgoing_frame_filter = _OutgoingFrameFilter(self) 770 self._incoming_frame_filter = _IncomingFrameFilter(self) 771 stream_options.outgoing_frame_filters.append( 772 self._outgoing_frame_filter) 773 stream_options.incoming_frame_filters.append( 774 self._incoming_frame_filter) 775 776 stream_options.encode_text_message_to_utf8 = False 777 778 779 _available_processors[common.PERMESSAGE_DEFLATE_EXTENSION] = ( 780 PerMessageDeflateExtensionProcessor) 781 # TODO(tyoshino): Reorganize class names. 782 _compression_extension_names.append('deflate') 783 784 785 class PerMessageCompressExtensionProcessor( 786 CompressionExtensionProcessorBase): 787 """permessage-compress extension processor. 788 789 Specification: 790 http://tools.ietf.org/html/draft-ietf-hybi-permessage-compression 791 """ 792 793 _DEFLATE_METHOD = 'deflate' 794 795 def __init__(self, request): 796 CompressionExtensionProcessorBase.__init__(self, request) 797 798 def name(self): 799 return common.PERMESSAGE_COMPRESSION_EXTENSION 800 801 def _lookup_compression_processor(self, method_desc): 802 if method_desc.name() == self._DEFLATE_METHOD: 803 return PerMessageDeflateExtensionProcessor(method_desc, False) 804 return None 805 806 807 _available_processors[common.PERMESSAGE_COMPRESSION_EXTENSION] = ( 808 PerMessageCompressExtensionProcessor) 809 _compression_extension_names.append(common.PERMESSAGE_COMPRESSION_EXTENSION) 810 811 812 class MuxExtensionProcessor(ExtensionProcessorInterface): 813 """WebSocket multiplexing extension processor.""" 814 815 _QUOTA_PARAM = 'quota' 816 817 def __init__(self, request): 818 ExtensionProcessorInterface.__init__(self, request) 819 self._quota = 0 820 self._extensions = [] 821 822 def name(self): 823 return common.MUX_EXTENSION 824 825 def check_consistency_with_other_processors(self, processors): 826 before_mux = True 827 for processor in processors: 828 name = processor.name() 829 if name == self.name(): 830 before_mux = False 831 continue 832 if not processor.is_active(): 833 continue 834 if before_mux: 835 # Mux extension cannot be used after extensions 836 # that depend on frame boundary, extension data field, or any 837 # reserved bits which are attributed to each frame. 838 if (name == common.PERFRAME_COMPRESSION_EXTENSION or 839 name == common.DEFLATE_FRAME_EXTENSION or 840 name == common.X_WEBKIT_DEFLATE_FRAME_EXTENSION): 841 self.set_active(False) 842 return 843 else: 844 # Mux extension should not be applied before any history-based 845 # compression extension. 846 if (name == common.PERFRAME_COMPRESSION_EXTENSION or 847 name == common.DEFLATE_FRAME_EXTENSION or 848 name == common.X_WEBKIT_DEFLATE_FRAME_EXTENSION or 849 name == common.PERMESSAGE_COMPRESSION_EXTENSION or 850 name == common.X_WEBKIT_PERMESSAGE_COMPRESSION_EXTENSION): 851 self.set_active(False) 852 return 853 854 def _get_extension_response_internal(self): 855 self._active = False 856 quota = self._request.get_parameter_value(self._QUOTA_PARAM) 857 if quota is not None: 858 try: 859 quota = int(quota) 860 except ValueError, e: 861 return None 862 if quota < 0 or quota >= 2 ** 32: 863 return None 864 self._quota = quota 865 866 self._active = True 867 return common.ExtensionParameter(common.MUX_EXTENSION) 868 869 def _setup_stream_options_internal(self, stream_options): 870 pass 871 872 def set_quota(self, quota): 873 self._quota = quota 874 875 def quota(self): 876 return self._quota 877 878 def set_extensions(self, extensions): 879 self._extensions = extensions 880 881 def extensions(self): 882 return self._extensions 883 884 885 _available_processors[common.MUX_EXTENSION] = MuxExtensionProcessor 886 887 888 def get_extension_processor(extension_request): 889 processor_class = _available_processors.get(extension_request.name()) 890 if processor_class is None: 891 return None 892 return processor_class(extension_request) 893 894 895 def is_compression_extension(extension_name): 896 return extension_name in _compression_extension_names 897 898 899 # vi:sts=4 sw=4 et 900