# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""This file provides classes and helper functions for parsing/building frames
of the WebSocket protocol (RFC 6455).

Specification:
http://tools.ietf.org/html/rfc6455
"""


from collections import deque
import logging
import os
import struct
import time

from mod_pywebsocket import common
from mod_pywebsocket import util
from mod_pywebsocket._stream_base import BadOperationException
from mod_pywebsocket._stream_base import ConnectionTerminatedException
from mod_pywebsocket._stream_base import InvalidFrameException
from mod_pywebsocket._stream_base import InvalidUTF8Exception
from mod_pywebsocket._stream_base import StreamBase
from mod_pywebsocket._stream_base import UnsupportedFrameException


_NOOP_MASKER = util.NoopMasker()


class Frame(object):
    """Holds the header bits, the opcode and the payload of one frame."""

    def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
                 opcode=None, payload=''):
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.payload = payload


# Helper functions made public to be used for writing unittests for WebSocket
# clients.


def create_length_header(length, mask):
    """Creates a length header.

    Args:
        length: Frame length. Must be less than 2^63.
        mask: Mask bit. Must be boolean.

    Returns:
        The second octet of the frame header (mask bit plus 7-bit length
        field) followed by the 2-octet or 8-octet extended length when the
        length does not fit in the 7-bit field.

    Raises:
        ValueError: when bad data is given.
    """

    if mask:
        mask_bit = 1 << 7
    else:
        mask_bit = 0

    if length < 0:
        raise ValueError('length must be non negative integer')
    elif length <= 125:
        return chr(mask_bit | length)
    elif length < (1 << 16):
        return chr(mask_bit | 126) + struct.pack('!H', length)
    elif length < (1 << 63):
        return chr(mask_bit | 127) + struct.pack('!Q', length)
    else:
        raise ValueError('Payload is too big for one frame')


def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    """Creates a frame header.

    Args:
        opcode: Frame opcode. Must be in the range 0x0-0xf.
        payload_length: Length of the payload. Must be in [0, 2^63).
        fin: FIN bit. Must be 0 or 1.
        rsv1: RSV1 bit. Must be 0 or 1.
        rsv2: RSV2 bit. Must be 0 or 1.
        rsv3: RSV3 bit. Must be 0 or 1.
        mask: Mask bit. Must be boolean.

    Returns:
        The first header octet followed by the length header. The masking
        key, if any, is not included.

    Raises:
        ValueError: when bad data is given.
    """

    if opcode < 0 or 0xf < opcode:
        raise ValueError('Opcode out of range')

    if payload_length < 0 or (1 << 63) <= payload_length:
        raise ValueError('payload_length out of range')

    # Any bit other than the lowest one being set in any of the four flags
    # means that a value other than 0 or 1 was given.
    if (fin | rsv1 | rsv2 | rsv3) & ~1:
        raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')

    header = ''

    first_byte = ((fin << 7)
                  | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
                  | opcode)
    header += chr(first_byte)
    header += create_length_header(payload_length, mask)

    return header


def _build_frame(header, body, mask):
    """Concatenates header and body, masking the body when mask is true.

    When masking, a fresh 4-octet nonce from os.urandom is inserted between
    the header and the masked body.
    """

    if not mask:
        return header + body

    masking_nonce = os.urandom(4)
    masker = util.RepeatedXorMasker(masking_nonce)

    return header + masking_nonce + masker.mask(body)


def _filter_and_format_frame_object(frame, mask, frame_filters):
    """Applies frame_filters to frame in order and serializes the result.

    frame_filters may be None, which is treated as "no filters".
    """

    if frame_filters is None:
        frame_filters = ()

    for frame_filter in frame_filters:
        frame_filter.filter(frame)

    header = create_header(
        frame.opcode, len(frame.payload), frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_binary_frame(
        message, opcode=common.OPCODE_BINARY, fin=1, mask=False,
        frame_filters=None):
    """Creates a simple binary frame with no extension, reserved bit.

    Args:
        message: payload of the frame.
        opcode: opcode to put in the header.
        fin: FIN bit.
        mask: whether to mask the payload.
        frame_filters: filters applied to the frame before serialization.
            None means no filters.
    """

    frame = Frame(fin=fin, opcode=opcode, payload=message)
    return _filter_and_format_frame_object(frame, mask, frame_filters)


def create_text_frame(
        message, opcode=common.OPCODE_TEXT, fin=1, mask=False,
        frame_filters=None):
    """Creates a simple text frame with no extension, reserved bit.

    The message is encoded to UTF-8 before being framed. The other arguments
    are the same as those of create_binary_frame.
    """

    encoded_message = message.encode('utf-8')
    return create_binary_frame(encoded_message, opcode, fin, mask,
                               frame_filters)


def _megabytes_per_second(byte_count, start_time):
    """Returns the throughput since start_time for verbose logging.

    Two consecutive time.time() readings may be equal, so a zero (or
    negative) elapsed time is reported as infinity instead of letting a
    ZeroDivisionError escape from what is only a diagnostic log statement.
    """

    elapsed = time.time() - start_time
    if elapsed <= 0:
        return float('inf')
    return byte_count / elapsed / 1000 / 1000


def parse_frame(receive_bytes, logger=None,
                ws_version=common.VERSION_HYBI_LATEST,
                unmask_receive=True):
    """Parses a frame. Returns a tuple containing each header field and
    payload.

    Args:
        receive_bytes: a function that reads frame data from a stream or
            something similar. The function takes length of the bytes to be
            read. The function must raise ConnectionTerminatedException if
            there is not enough data to be read.
        logger: a logging object.
        ws_version: the version of WebSocket protocol.
        unmask_receive: unmask received frames. When received unmasked
            frame, raises InvalidFrameException.

    Returns:
        A tuple (opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3).

    Raises:
        ConnectionTerminatedException: when receive_bytes raises it.
        InvalidFrameException: when the frame contains invalid data.
    """

    if not logger:
        logger = logging.getLogger()

    # Sample the log level once so that the timing variables below are
    # always bound whenever they are read.
    log_fine = logger.isEnabledFor(common.LOGLEVEL_FINE)

    logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')

    received = receive_bytes(2)

    first_byte = ord(received[0])
    fin = (first_byte >> 7) & 1
    rsv1 = (first_byte >> 6) & 1
    rsv2 = (first_byte >> 5) & 1
    rsv3 = (first_byte >> 4) & 1
    opcode = first_byte & 0xf

    second_byte = ord(received[1])
    mask = (second_byte >> 7) & 1
    payload_length = second_byte & 0x7f

    logger.log(common.LOGLEVEL_FINE,
               'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
               'Mask=%s, Payload_length=%s',
               fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)

    if (mask == 1) != unmask_receive:
        raise InvalidFrameException(
            'Mask bit on the received frame did\'nt match masking '
            'configuration for received frames')

    # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    # into the 8-octet extended payload length field (or 0x0-0x7D in
    # 2-octet field). Such frames are accepted here but reported with a
    # warning.
    valid_length_encoding = True
    length_encoding_bytes = 1
    if payload_length == 127:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 8-octet extended payload length')

        extended_payload_length = receive_bytes(8)
        payload_length = struct.unpack(
            '!Q', extended_payload_length)[0]
        if payload_length > 0x7FFFFFFFFFFFFFFF:
            raise InvalidFrameException(
                'Extended payload length >= 2^63')
        if ws_version >= 13 and payload_length < 0x10000:
            valid_length_encoding = False
            length_encoding_bytes = 8

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)
    elif payload_length == 126:
        logger.log(common.LOGLEVEL_FINE,
                   'Receive 2-octet extended payload length')

        extended_payload_length = receive_bytes(2)
        payload_length = struct.unpack(
            '!H', extended_payload_length)[0]
        if ws_version >= 13 and payload_length < 126:
            valid_length_encoding = False
            length_encoding_bytes = 2

        logger.log(common.LOGLEVEL_FINE,
                   'Decoded_payload_length=%s', payload_length)

    if not valid_length_encoding:
        logger.warning(
            'Payload length is not encoded using the minimal number of '
            'bytes (%d is encoded using %d bytes)',
            payload_length,
            length_encoding_bytes)

    if mask == 1:
        logger.log(common.LOGLEVEL_FINE, 'Receive mask')

        masking_nonce = receive_bytes(4)
        masker = util.RepeatedXorMasker(masking_nonce)

        logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    else:
        masker = _NOOP_MASKER

    logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    if log_fine:
        receive_start = time.time()

    raw_payload_bytes = receive_bytes(payload_length)

    if log_fine:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done receiving payload data at %s MB/s',
            _megabytes_per_second(payload_length, receive_start))
    logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')

    if log_fine:
        unmask_start = time.time()

    unmasked_bytes = masker.mask(raw_payload_bytes)

    if log_fine:
        logger.log(
            common.LOGLEVEL_FINE,
            'Done unmasking payload data at %s MB/s',
            _megabytes_per_second(payload_length, unmask_start))

    return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3


class FragmentedFrameBuilder(object):
    """A stateful class to send a message as fragments."""

    def __init__(self, mask, frame_filters=None, encode_utf8=True):
        """Constructs an instance.

        Args:
            mask: whether to mask the frames built by this instance.
            frame_filters: filters applied to every frame built. The given
                list object itself is kept (not copied). None means no
                filters.
            encode_utf8: whether to encode payload of text frames to UTF-8.
        """

        self._mask = mask
        if frame_filters is None:
            frame_filters = []
        self._frame_filters = frame_filters
        # This is for skipping UTF-8 encoding when building text type frames
        # from compressed data.
        self._encode_utf8 = encode_utf8

        self._started = False

        # Hold opcode of the first frame in messages to verify types of other
        # frames in the message are all the same.
        self._opcode = common.OPCODE_TEXT

    def build(self, payload_data, end, binary):
        """Builds the next frame of the message being sent.

        Args:
            payload_data: payload of this fragment.
            end: True if this is the last fragment of the message.
            binary: True to build a binary frame, False for a text frame.

        Returns:
            The serialized frame.

        Raises:
            ValueError: when binary differs from the one given for the first
                fragment of the message in progress.
        """

        if binary:
            frame_type = common.OPCODE_BINARY
        else:
            frame_type = common.OPCODE_TEXT
        if self._started:
            if self._opcode != frame_type:
                raise ValueError('Message types are different in frames for '
                                 'the same message')
            opcode = common.OPCODE_CONTINUATION
        else:
            opcode = frame_type
            self._opcode = frame_type

        if end:
            self._started = False
            fin = 1
        else:
            self._started = True
            fin = 0

        if binary or not self._encode_utf8:
            return create_binary_frame(
                payload_data, opcode, fin, self._mask, self._frame_filters)
        else:
            return create_text_frame(
                payload_data, opcode, fin, self._mask, self._frame_filters)


def _create_control_frame(opcode, body, mask, frame_filters):
    """Builds a control frame, applying frame_filters before serialization.

    frame_filters may be None, which is treated as "no filters".

    Raises:
        BadOperationException: when the payload is longer than 125 bytes
            after the filters have been applied.
    """

    if frame_filters is None:
        frame_filters = ()

    frame = Frame(opcode=opcode, payload=body)

    for frame_filter in frame_filters:
        frame_filter.filter(frame)

    if len(frame.payload) > 125:
        raise BadOperationException(
            'Payload data size of control frames must be 125 bytes or less')

    header = create_header(
        frame.opcode, len(frame.payload), frame.fin,
        frame.rsv1, frame.rsv2, frame.rsv3, mask)
    return _build_frame(header, frame.payload, mask)


def create_ping_frame(body, mask=False, frame_filters=None):
    """Creates a ping frame carrying body."""

    return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)


def create_pong_frame(body, mask=False, frame_filters=None):
    """Creates a pong frame carrying body."""

    return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)


def create_close_frame(body, mask=False, frame_filters=None):
    """Creates a close frame carrying body."""

    return _create_control_frame(
        common.OPCODE_CLOSE, body, mask, frame_filters)


def create_closing_handshake_body(code, reason):
    """Creates the payload of a close frame.

    Args:
        code: status code, or None to build an empty body.
        reason: close reason. Encoded to UTF-8 and appended after the code.
            Ignored when code is None.

    Raises:
        BadOperationException: when code is out of range or is one of the
            reserved pseudo codes.
    """

    body = ''
    if code is not None:
        if (code > common.STATUS_USER_PRIVATE_MAX or
            code < common.STATUS_NORMAL_CLOSURE):
            raise BadOperationException('Status code is out of range')
        if (code == common.STATUS_NO_STATUS_RECEIVED or
            code == common.STATUS_ABNORMAL_CLOSURE or
            code == common.STATUS_TLS_HANDSHAKE):
            raise BadOperationException('Status code is reserved pseudo '
                                        'code')
        encoded_reason = reason.encode('utf-8')
        body = struct.pack('!H', code) + encoded_reason
    return body


class StreamOptions(object):
    """Holds option values to configure Stream objects."""

    def __init__(self):
        """Constructs StreamOptions."""

        # Filters applied to frames.
        self.outgoing_frame_filters = []
        self.incoming_frame_filters = []

        # Filters applied to messages. Control frames are not affected by them.
        self.outgoing_message_filters = []
        self.incoming_message_filters = []

        self.encode_text_message_to_utf8 = True
        self.mask_send = False
        self.unmask_receive = True


class Stream(StreamBase):
    """A class for parsing/building frames of the WebSocket protocol
    (RFC 6455).
    """

    def __init__(self, request, options):
        """Constructs an instance.

        Args:
            request: mod_python request.
            options: StreamOptions instance configuring masking and filters.
        """

        StreamBase.__init__(self, request)

        self._logger = util.get_class_logger(self)

        self._options = options

        self._request.client_terminated = False
        self._request.server_terminated = False

        # Holds body of received fragments.
        self._received_fragments = []
        # Holds the opcode of the first fragment.
        self._original_opcode = None

        self._writer = FragmentedFrameBuilder(
            self._options.mask_send, self._options.outgoing_frame_filters,
            self._options.encode_text_message_to_utf8)

        # Bodies of the pings sent by send_ping that have not been acked.
        self._ping_queue = deque()

    def _receive_frame(self):
        """Receives a frame and return data in the frame as a tuple containing
        each header field and payload separately.

        Raises:
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid data.
        """

        def _receive_bytes(length):
            return self.receive_bytes(length)

        return parse_frame(receive_bytes=_receive_bytes,
                           logger=self._logger,
                           ws_version=self._request.ws_version,
                           unmask_receive=self._options.unmask_receive)

    def _receive_frame_as_frame_object(self):
        """Receives a frame and returns it as a Frame instance."""

        opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()

        return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
                     opcode=opcode, payload=unmasked_bytes)

    def receive_filtered_frame(self):
        """Receives a frame and applies frame filters and message filters.
        The frame to be received must satisfy following conditions:
        - The frame is not fragmented.
        - The opcode of the frame is TEXT or BINARY.

        DO NOT USE this method except for testing purpose.

        Raises:
            InvalidFrameException: when the received frame doesn't satisfy
                the conditions above.
        """

        frame = self._receive_frame_as_frame_object()
        if not frame.fin:
            raise InvalidFrameException(
                'Segmented frames must not be received via '
                'receive_filtered_frame()')
        if (frame.opcode != common.OPCODE_TEXT and
            frame.opcode != common.OPCODE_BINARY):
            raise InvalidFrameException(
                'Control frames must not be received via '
                'receive_filtered_frame()')

        for frame_filter in self._options.incoming_frame_filters:
            frame_filter.filter(frame)
        for message_filter in self._options.incoming_message_filters:
            frame.payload = message_filter.filter(frame.payload)
        return frame

    def send_message(self, message, end=True, binary=False):
        """Send message.

        Args:
            message: text in unicode or binary in str to send.
            end: True if this is the last fragment of the message.
            binary: send message as binary frame.

        Raises:
            BadOperationException: when called on a server-terminated
                connection or called with inconsistent message type or
                binary parameter.
        """

        if self._request.server_terminated:
            raise BadOperationException(
                'Requested send_message after sending out a closing handshake')

        if binary and isinstance(message, unicode):
            raise BadOperationException(
                'Message for binary frame must be instance of str')

        for message_filter in self._options.outgoing_message_filters:
            message = message_filter.filter(message, end, binary)

        try:
            # Set this to any positive integer to limit maximum size of data in
            # payload data of each frame.
            MAX_PAYLOAD_DATA_SIZE = -1

            if MAX_PAYLOAD_DATA_SIZE <= 0:
                self._write(self._writer.build(message, end, binary))
                return

            bytes_written = 0
            while True:
                end_for_this_frame = end
                bytes_to_write = len(message) - bytes_written
                if (MAX_PAYLOAD_DATA_SIZE > 0 and
                    bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
                    end_for_this_frame = False
                    bytes_to_write = MAX_PAYLOAD_DATA_SIZE

                frame = self._writer.build(
                    message[bytes_written:bytes_written + bytes_to_write],
                    end_for_this_frame,
                    binary)
                self._write(frame)

                bytes_written += bytes_to_write

                # This if must be placed here (the end of while block) so that
                # at least one frame is sent.
                if len(message) <= bytes_written:
                    break
        except ValueError as e:
            # The frame builder reports a message type mismatch (and the
            # header helpers report bad values) with ValueError.
            raise BadOperationException(e)

    def _get_message_from_frame(self, frame):
        """Gets a message from frame. If the message is composed of fragmented
        frames and the frame is not the last fragmented frame, this method
        returns None. The whole message will be returned when the last
        fragmented frame is passed to this method.

        Raises:
            InvalidFrameException: when the frame doesn't match defragmentation
                context, or the frame contains invalid data.
        """

        if frame.opcode == common.OPCODE_CONTINUATION:
            if not self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received a termination frame but fragmentation '
                        'not started')
                else:
                    raise InvalidFrameException(
                        'Received an intermediate frame but '
                        'fragmentation not started')

            if frame.fin:
                # End of fragmentation frame
                self._received_fragments.append(frame.payload)
                message = ''.join(self._received_fragments)
                self._received_fragments = []
                return message
            else:
                # Intermediate frame
                self._received_fragments.append(frame.payload)
                return None
        else:
            # NOTE(review): this branch also rejects control frames that
            # arrive while a fragmented message is in progress, although
            # RFC 6455 section 5.4 allows them to be interleaved. Accepting
            # them would also require not overwriting _original_opcode.
            if self._received_fragments:
                if frame.fin:
                    raise InvalidFrameException(
                        'Received an unfragmented frame without '
                        'terminating existing fragmentation')
                else:
                    raise InvalidFrameException(
                        'New fragmentation started without terminating '
                        'existing fragmentation')

            if frame.fin:
                # Unfragmented frame

                self._original_opcode = frame.opcode
                return frame.payload
            else:
                # Start of fragmentation frame

                if common.is_control_opcode(frame.opcode):
                    raise InvalidFrameException(
                        'Control frames must not be fragmented')

                self._original_opcode = frame.opcode
                self._received_fragments.append(frame.payload)
                return None

    def _process_close_message(self, message):
        """Processes close message.

        Records the close code and reason on the request and, unless the
        closing handshake was initiated by this side, sends back a close
        frame.

        Args:
            message: close message.

        Raises:
            InvalidFrameException: when the message is invalid.
        """

        self._request.client_terminated = True

        # Status code is optional. We can have status reason only if we
        # have status code. Status reason can be empty string. So,
        # allowed cases are
        # - no application data: no code no reason
        # - 2 octet of application data: has code but no reason
        # - 3 or more octet of application data: both code and reason
        if len(message) == 0:
            self._logger.debug('Received close frame (empty body)')
            self._request.ws_close_code = (
                common.STATUS_NO_STATUS_RECEIVED)
        elif len(message) == 1:
            raise InvalidFrameException(
                'If a close frame has status code, the length of '
                'status code must be 2 octet')
        elif len(message) >= 2:
            self._request.ws_close_code = struct.unpack(
                '!H', message[0:2])[0]
            self._request.ws_close_reason = message[2:].decode(
                'utf-8', 'replace')
            self._logger.debug(
                'Received close frame (code=%d, reason=%r)',
                self._request.ws_close_code,
                self._request.ws_close_reason)

        # As we've received a close frame, no more data is coming over the
        # socket. We can now safely close the socket without worrying about
        # RST sending.

        if self._request.server_terminated:
            self._logger.debug(
                'Received ack for server-initiated closing handshake')
            return

        self._logger.debug(
            'Received client-initiated closing handshake')

        code = common.STATUS_NORMAL_CLOSURE
        reason = ''
        if hasattr(self._request, '_dispatcher'):
            dispatcher = self._request._dispatcher
            code, reason = dispatcher.passive_closing_handshake(
                self._request)
            if code is None and reason is not None and len(reason) > 0:
                self._logger.warning(
                    'Handler specified reason despite code being None')
                reason = ''
            if reason is None:
                reason = ''
        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Acknowledged closing handshake initiated by the peer '
            '(code=%r, reason=%r)', code, reason)

    def _process_ping_message(self, message):
        """Processes ping message.

        Calls request.on_ping_handler if it is set to a true value.
        Otherwise, replies with a pong carrying the same body.

        Args:
            message: ping message.
        """

        # Only the attribute lookup is guarded. An AttributeError raised
        # inside the handler itself is a bug in the handler and must not be
        # mistaken for "no handler installed".
        handler = getattr(self._request, 'on_ping_handler', None)
        if handler:
            handler(self._request, message)
            return
        self._send_pong(message)

    def _process_pong_message(self, message):
        """Processes pong message.

        Removes the matching ping, and every ping sent before it, from the
        queue of unacknowledged pings, then calls request.on_pong_handler if
        it is set to a true value.

        Args:
            message: pong message.
        """

        # TODO(tyoshino): Add ping timeout handling.

        inflight_pings = deque()

        while True:
            try:
                expected_body = self._ping_queue.popleft()
            except IndexError:
                # The received pong was unsolicited pong. Keep the
                # ping queue as is.
                self._ping_queue = inflight_pings
                self._logger.debug('Received a unsolicited pong')
                break

            if expected_body == message:
                # inflight_pings contains pings ignored by the
                # other peer. Just forget them.
                self._logger.debug(
                    'Ping %r is acked (%d pings were ignored)',
                    expected_body, len(inflight_pings))
                break

            inflight_pings.append(expected_body)

        # As in _process_ping_message, only a missing attribute is treated
        # as "no handler"; errors raised by the handler propagate.
        handler = getattr(self._request, 'on_pong_handler', None)
        if handler:
            handler(self._request, message)

    def receive_message(self):
        """Receive a WebSocket frame and return its payload as a text in
        unicode or a binary in str.

        Control frames are processed internally. Ping and pong frames don't
        make this method return; it keeps receiving until a data message or
        a close frame arrives.

        Returns:
            payload data of the frame
            - as unicode instance if received text frame
            - as str instance if received binary frame
            or None iff received closing handshake.
        Raises:
            BadOperationException: when called on a client-terminated
                connection.
            ConnectionTerminatedException: when read returns empty
                string.
            InvalidFrameException: when the frame contains invalid
                data.
            InvalidUTF8Exception: when a text message is not valid UTF-8.
            UnsupportedFrameException: when the received frame has
                flags, opcode we cannot handle. You can ignore this
                exception and continue receiving the next frame.
        """

        if self._request.client_terminated:
            raise BadOperationException(
                'Requested receive_message after receiving a closing '
                'handshake')

        while True:
            # mp_conn.read will block if no bytes are available.
            # Timeout is controlled by TimeOut directive of Apache.

            frame = self._receive_frame_as_frame_object()

            # Check the constraint on the payload size for control frames
            # before extension processes the frame.
            # See also http://tools.ietf.org/html/rfc6455#section-5.5
            if (common.is_control_opcode(frame.opcode) and
                len(frame.payload) > 125):
                raise InvalidFrameException(
                    'Payload data size of control frames must be 125 bytes or '
                    'less')

            for frame_filter in self._options.incoming_frame_filters:
                frame_filter.filter(frame)

            if frame.rsv1 or frame.rsv2 or frame.rsv3:
                raise UnsupportedFrameException(
                    'Unsupported flag is set (rsv = %d%d%d)' %
                    (frame.rsv1, frame.rsv2, frame.rsv3))

            message = self._get_message_from_frame(frame)
            if message is None:
                continue

            for message_filter in self._options.incoming_message_filters:
                message = message_filter.filter(message)

            if self._original_opcode == common.OPCODE_TEXT:
                # Invalid UTF-8 in a text message is not replaced with
                # U+FFFD; it is reported to the caller as
                # InvalidUTF8Exception (see RFC 6455 section 8.1).
                try:
                    return message.decode('utf-8')
                except UnicodeDecodeError as e:
                    raise InvalidUTF8Exception(e)
            elif self._original_opcode == common.OPCODE_BINARY:
                return message
            elif self._original_opcode == common.OPCODE_CLOSE:
                self._process_close_message(message)
                return None
            elif self._original_opcode == common.OPCODE_PING:
                self._process_ping_message(message)
            elif self._original_opcode == common.OPCODE_PONG:
                self._process_pong_message(message)
            else:
                raise UnsupportedFrameException(
                    'Opcode %d is not supported' % self._original_opcode)

    def _send_closing_handshake(self, code, reason):
        """Sends a close frame and marks the connection server-terminated."""

        body = create_closing_handshake_body(code, reason)
        frame = create_close_frame(
            body, mask=self._options.mask_send,
            frame_filters=self._options.outgoing_frame_filters)

        # Set before writing so that the flag is up even if the write fails.
        self._request.server_terminated = True

        self._write(frame)

    def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason='',
                         wait_response=True):
        """Closes a WebSocket connection.

        Args:
            code: Status code for close frame. If code is None, a close
                frame with empty body will be sent.
            reason: string representing close reason.
            wait_response: True when caller want to wait the response.
        Raises:
            BadOperationException: when reason is specified with code None
                or reason is neither an instance of str nor of unicode.
            ConnectionTerminatedException: when waiting for the response
                and something other than a close frame is received.
        """

        if self._request.server_terminated:
            self._logger.debug(
                'Requested close_connection but server is already terminated')
            return

        if code is None:
            if reason is not None and len(reason) > 0:
                raise BadOperationException(
                    'close reason must not be specified if code is None')
            reason = ''
        else:
            if not isinstance(reason, (str, unicode)):
                raise BadOperationException(
                    'close reason must be an instance of str or unicode')

        self._send_closing_handshake(code, reason)
        self._logger.debug(
            'Initiated closing handshake (code=%r, reason=%r)',
            code, reason)

        if (code == common.STATUS_GOING_AWAY or
            code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
            # It doesn't make sense to wait for a close frame if the reason is
            # protocol error or that the server is going away. For some of
            # other reasons, it might not make sense to wait for a close frame,
            # but it's not clear, yet.
            return

        # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
        # or until a server-defined timeout expires.
        #
        # For now, we expect receiving closing handshake right after sending
        # out closing handshake.
        message = self.receive_message()
        if message is not None:
            raise ConnectionTerminatedException(
                'Didn\'t receive valid ack for closing handshake')
        # TODO: 3. close the WebSocket connection.
        # note: mod_python Connection (mp_conn) doesn't have close method.

    def send_ping(self, body=''):
        """Sends a ping frame and remembers its body to match the pong."""

        frame = create_ping_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

        self._ping_queue.append(body)

    def _send_pong(self, body):
        """Sends a pong frame carrying body."""

        frame = create_pong_frame(
            body,
            self._options.mask_send,
            self._options.outgoing_frame_filters)
        self._write(frame)

    def get_last_received_opcode(self):
        """Returns the opcode of the WebSocket message which the last received
        frame belongs to. The return value is valid iff immediately after
        receive_message call.
        """

        return self._original_opcode


# vi:sts=4 sw=4 et