Home | History | Annotate | Download | only in mod_pywebsocket
      1 # Copyright 2012, Google Inc.
      2 # All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 
     31 """This file provides classes and helper functions for parsing/building frames
     32 of the WebSocket protocol (RFC 6455).
     33 
     34 Specification:
     35 http://tools.ietf.org/html/rfc6455
     36 """
     37 
     38 
     39 from collections import deque
     40 import logging
     41 import os
     42 import struct
     43 import time
     44 
     45 from mod_pywebsocket import common
     46 from mod_pywebsocket import util
     47 from mod_pywebsocket._stream_base import BadOperationException
     48 from mod_pywebsocket._stream_base import ConnectionTerminatedException
     49 from mod_pywebsocket._stream_base import InvalidFrameException
     50 from mod_pywebsocket._stream_base import InvalidUTF8Exception
     51 from mod_pywebsocket._stream_base import StreamBase
     52 from mod_pywebsocket._stream_base import UnsupportedFrameException
     53 
     54 
     55 _NOOP_MASKER = util.NoopMasker()
     56 
     57 
     58 class Frame(object):
     59 
     60     def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
     61                  opcode=None, payload=''):
     62         self.fin = fin
     63         self.rsv1 = rsv1
     64         self.rsv2 = rsv2
     65         self.rsv3 = rsv3
     66         self.opcode = opcode
     67         self.payload = payload
     68 
     69 
     70 # Helper functions made public to be used for writing unittests for WebSocket
     71 # clients.
     72 
     73 
     74 def create_length_header(length, mask):
     75     """Creates a length header.
     76 
     77     Args:
     78         length: Frame length. Must be less than 2^63.
     79         mask: Mask bit. Must be boolean.
     80 
     81     Raises:
     82         ValueError: when bad data is given.
     83     """
     84 
     85     if mask:
     86         mask_bit = 1 << 7
     87     else:
     88         mask_bit = 0
     89 
     90     if length < 0:
     91         raise ValueError('length must be non negative integer')
     92     elif length <= 125:
     93         return chr(mask_bit | length)
     94     elif length < (1 << 16):
     95         return chr(mask_bit | 126) + struct.pack('!H', length)
     96     elif length < (1 << 63):
     97         return chr(mask_bit | 127) + struct.pack('!Q', length)
     98     else:
     99         raise ValueError('Payload is too big for one frame')
    100 
    101 
    102 def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    103     """Creates a frame header.
    104 
    105     Raises:
    106         Exception: when bad data is given.
    107     """
    108 
    109     if opcode < 0 or 0xf < opcode:
    110         raise ValueError('Opcode out of range')
    111 
    112     if payload_length < 0 or (1 << 63) <= payload_length:
    113         raise ValueError('payload_length out of range')
    114 
    115     if (fin | rsv1 | rsv2 | rsv3) & ~1:
    116         raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
    117 
    118     header = ''
    119 
    120     first_byte = ((fin << 7)
    121                   | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
    122                   | opcode)
    123     header += chr(first_byte)
    124     header += create_length_header(payload_length, mask)
    125 
    126     return header
    127 
    128 
    129 def _build_frame(header, body, mask):
    130     if not mask:
    131         return header + body
    132 
    133     masking_nonce = os.urandom(4)
    134     masker = util.RepeatedXorMasker(masking_nonce)
    135 
    136     return header + masking_nonce + masker.mask(body)
    137 
    138 
    139 def _filter_and_format_frame_object(frame, mask, frame_filters):
    140     for frame_filter in frame_filters:
    141         frame_filter.filter(frame)
    142 
    143     header = create_header(
    144         frame.opcode, len(frame.payload), frame.fin,
    145         frame.rsv1, frame.rsv2, frame.rsv3, mask)
    146     return _build_frame(header, frame.payload, mask)
    147 
    148 
    149 def create_binary_frame(
    150     message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
    151     """Creates a simple binary frame with no extension, reserved bit."""
    152 
    153     frame = Frame(fin=fin, opcode=opcode, payload=message)
    154     return _filter_and_format_frame_object(frame, mask, frame_filters)
    155 
    156 
    157 def create_text_frame(
    158     message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
    159     """Creates a simple text frame with no extension, reserved bit."""
    160 
    161     encoded_message = message.encode('utf-8')
    162     return create_binary_frame(encoded_message, opcode, fin, mask,
    163                                frame_filters)
    164 
    165 
    166 def parse_frame(receive_bytes, logger=None,
    167                 ws_version=common.VERSION_HYBI_LATEST,
    168                 unmask_receive=True):
    169     """Parses a frame. Returns a tuple containing each header field and
    170     payload.
    171 
    172     Args:
    173         receive_bytes: a function that reads frame data from a stream or
    174             something similar. The function takes length of the bytes to be
    175             read. The function must raise ConnectionTerminatedException if
    176             there is not enough data to be read.
    177         logger: a logging object.
    178         ws_version: the version of WebSocket protocol.
    179         unmask_receive: unmask received frames. When received unmasked
    180             frame, raises InvalidFrameException.
    181 
    182     Raises:
    183         ConnectionTerminatedException: when receive_bytes raises it.
    184         InvalidFrameException: when the frame contains invalid data.
    185     """
    186 
    187     if not logger:
    188         logger = logging.getLogger()
    189 
    190     logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')
    191 
    192     received = receive_bytes(2)
    193 
    194     first_byte = ord(received[0])
    195     fin = (first_byte >> 7) & 1
    196     rsv1 = (first_byte >> 6) & 1
    197     rsv2 = (first_byte >> 5) & 1
    198     rsv3 = (first_byte >> 4) & 1
    199     opcode = first_byte & 0xf
    200 
    201     second_byte = ord(received[1])
    202     mask = (second_byte >> 7) & 1
    203     payload_length = second_byte & 0x7f
    204 
    205     logger.log(common.LOGLEVEL_FINE,
    206                'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
    207                'Mask=%s, Payload_length=%s',
    208                fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)
    209 
    210     if (mask == 1) != unmask_receive:
    211         raise InvalidFrameException(
    212             'Mask bit on the received frame did\'nt match masking '
    213             'configuration for received frames')
    214 
    215     # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    216     # into the 8-octet extended payload length field (or 0x0-0xFD in
    217     # 2-octet field).
    218     valid_length_encoding = True
    219     length_encoding_bytes = 1
    220     if payload_length == 127:
    221         logger.log(common.LOGLEVEL_FINE,
    222                    'Receive 8-octet extended payload length')
    223 
    224         extended_payload_length = receive_bytes(8)
    225         payload_length = struct.unpack(
    226             '!Q', extended_payload_length)[0]
    227         if payload_length > 0x7FFFFFFFFFFFFFFF:
    228             raise InvalidFrameException(
    229                 'Extended payload length >= 2^63')
    230         if ws_version >= 13 and payload_length < 0x10000:
    231             valid_length_encoding = False
    232             length_encoding_bytes = 8
    233 
    234         logger.log(common.LOGLEVEL_FINE,
    235                    'Decoded_payload_length=%s', payload_length)
    236     elif payload_length == 126:
    237         logger.log(common.LOGLEVEL_FINE,
    238                    'Receive 2-octet extended payload length')
    239 
    240         extended_payload_length = receive_bytes(2)
    241         payload_length = struct.unpack(
    242             '!H', extended_payload_length)[0]
    243         if ws_version >= 13 and payload_length < 126:
    244             valid_length_encoding = False
    245             length_encoding_bytes = 2
    246 
    247         logger.log(common.LOGLEVEL_FINE,
    248                    'Decoded_payload_length=%s', payload_length)
    249 
    250     if not valid_length_encoding:
    251         logger.warning(
    252             'Payload length is not encoded using the minimal number of '
    253             'bytes (%d is encoded using %d bytes)',
    254             payload_length,
    255             length_encoding_bytes)
    256 
    257     if mask == 1:
    258         logger.log(common.LOGLEVEL_FINE, 'Receive mask')
    259 
    260         masking_nonce = receive_bytes(4)
    261         masker = util.RepeatedXorMasker(masking_nonce)
    262 
    263         logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    264     else:
    265         masker = _NOOP_MASKER
    266 
    267     logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    268     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    269         receive_start = time.time()
    270 
    271     raw_payload_bytes = receive_bytes(payload_length)
    272 
    273     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    274         logger.log(
    275             common.LOGLEVEL_FINE,
    276             'Done receiving payload data at %s MB/s',
    277             payload_length / (time.time() - receive_start) / 1000 / 1000)
    278     logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')
    279 
    280     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    281         unmask_start = time.time()
    282 
    283     unmasked_bytes = masker.mask(raw_payload_bytes)
    284 
    285     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    286         logger.log(
    287             common.LOGLEVEL_FINE,
    288             'Done unmasking payload data at %s MB/s',
    289             payload_length / (time.time() - unmask_start) / 1000 / 1000)
    290 
    291     return opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3
    292 
    293 
    294 class FragmentedFrameBuilder(object):
    295     """A stateful class to send a message as fragments."""
    296 
    297     def __init__(self, mask, frame_filters=[], encode_utf8=True):
    298         """Constructs an instance."""
    299 
    300         self._mask = mask
    301         self._frame_filters = frame_filters
    302         # This is for skipping UTF-8 encoding when building text type frames
    303         # from compressed data.
    304         self._encode_utf8 = encode_utf8
    305 
    306         self._started = False
    307 
    308         # Hold opcode of the first frame in messages to verify types of other
    309         # frames in the message are all the same.
    310         self._opcode = common.OPCODE_TEXT
    311 
    312     def build(self, payload_data, end, binary):
    313         if binary:
    314             frame_type = common.OPCODE_BINARY
    315         else:
    316             frame_type = common.OPCODE_TEXT
    317         if self._started:
    318             if self._opcode != frame_type:
    319                 raise ValueError('Message types are different in frames for '
    320                                  'the same message')
    321             opcode = common.OPCODE_CONTINUATION
    322         else:
    323             opcode = frame_type
    324             self._opcode = frame_type
    325 
    326         if end:
    327             self._started = False
    328             fin = 1
    329         else:
    330             self._started = True
    331             fin = 0
    332 
    333         if binary or not self._encode_utf8:
    334             return create_binary_frame(
    335                 payload_data, opcode, fin, self._mask, self._frame_filters)
    336         else:
    337             return create_text_frame(
    338                 payload_data, opcode, fin, self._mask, self._frame_filters)
    339 
    340 
    341 def _create_control_frame(opcode, body, mask, frame_filters):
    342     frame = Frame(opcode=opcode, payload=body)
    343 
    344     for frame_filter in frame_filters:
    345         frame_filter.filter(frame)
    346 
    347     if len(frame.payload) > 125:
    348         raise BadOperationException(
    349             'Payload data size of control frames must be 125 bytes or less')
    350 
    351     header = create_header(
    352         frame.opcode, len(frame.payload), frame.fin,
    353         frame.rsv1, frame.rsv2, frame.rsv3, mask)
    354     return _build_frame(header, frame.payload, mask)
    355 
    356 
    357 def create_ping_frame(body, mask=False, frame_filters=[]):
    358     return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
    359 
    360 
    361 def create_pong_frame(body, mask=False, frame_filters=[]):
    362     return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
    363 
    364 
    365 def create_close_frame(body, mask=False, frame_filters=[]):
    366     return _create_control_frame(
    367         common.OPCODE_CLOSE, body, mask, frame_filters)
    368 
    369 
    370 def create_closing_handshake_body(code, reason):
    371     body = ''
    372     if code is not None:
    373         if (code > common.STATUS_USER_PRIVATE_MAX or
    374             code < common.STATUS_NORMAL_CLOSURE):
    375             raise BadOperationException('Status code is out of range')
    376         if (code == common.STATUS_NO_STATUS_RECEIVED or
    377             code == common.STATUS_ABNORMAL_CLOSURE or
    378             code == common.STATUS_TLS_HANDSHAKE):
    379             raise BadOperationException('Status code is reserved pseudo '
    380                 'code')
    381         encoded_reason = reason.encode('utf-8')
    382         body = struct.pack('!H', code) + encoded_reason
    383     return body
    384 
    385 
    386 class StreamOptions(object):
    387     """Holds option values to configure Stream objects."""
    388 
    389     def __init__(self):
    390         """Constructs StreamOptions."""
    391 
    392         # Filters applied to frames.
    393         self.outgoing_frame_filters = []
    394         self.incoming_frame_filters = []
    395 
    396         # Filters applied to messages. Control frames are not affected by them.
    397         self.outgoing_message_filters = []
    398         self.incoming_message_filters = []
    399 
    400         self.encode_text_message_to_utf8 = True
    401         self.mask_send = False
    402         self.unmask_receive = True
    403 
    404 
    405 class Stream(StreamBase):
    406     """A class for parsing/building frames of the WebSocket protocol
    407     (RFC 6455).
    408     """
    409 
    410     def __init__(self, request, options):
    411         """Constructs an instance.
    412 
    413         Args:
    414             request: mod_python request.
    415         """
    416 
    417         StreamBase.__init__(self, request)
    418 
    419         self._logger = util.get_class_logger(self)
    420 
    421         self._options = options
    422 
    423         self._request.client_terminated = False
    424         self._request.server_terminated = False
    425 
    426         # Holds body of received fragments.
    427         self._received_fragments = []
    428         # Holds the opcode of the first fragment.
    429         self._original_opcode = None
    430 
    431         self._writer = FragmentedFrameBuilder(
    432             self._options.mask_send, self._options.outgoing_frame_filters,
    433             self._options.encode_text_message_to_utf8)
    434 
    435         self._ping_queue = deque()
    436 
    437     def _receive_frame(self):
    438         """Receives a frame and return data in the frame as a tuple containing
    439         each header field and payload separately.
    440 
    441         Raises:
    442             ConnectionTerminatedException: when read returns empty
    443                 string.
    444             InvalidFrameException: when the frame contains invalid data.
    445         """
    446 
    447         def _receive_bytes(length):
    448             return self.receive_bytes(length)
    449 
    450         return parse_frame(receive_bytes=_receive_bytes,
    451                            logger=self._logger,
    452                            ws_version=self._request.ws_version,
    453                            unmask_receive=self._options.unmask_receive)
    454 
    455     def _receive_frame_as_frame_object(self):
    456         opcode, unmasked_bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
    457 
    458         return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
    459                      opcode=opcode, payload=unmasked_bytes)
    460 
    461     def receive_filtered_frame(self):
    462         """Receives a frame and applies frame filters and message filters.
    463         The frame to be received must satisfy following conditions:
    464         - The frame is not fragmented.
    465         - The opcode of the frame is TEXT or BINARY.
    466 
    467         DO NOT USE this method except for testing purpose.
    468         """
    469 
    470         frame = self._receive_frame_as_frame_object()
    471         if not frame.fin:
    472             raise InvalidFrameException(
    473                 'Segmented frames must not be received via '
    474                 'receive_filtered_frame()')
    475         if (frame.opcode != common.OPCODE_TEXT and
    476             frame.opcode != common.OPCODE_BINARY):
    477             raise InvalidFrameException(
    478                 'Control frames must not be received via '
    479                 'receive_filtered_frame()')
    480 
    481         for frame_filter in self._options.incoming_frame_filters:
    482             frame_filter.filter(frame)
    483         for message_filter in self._options.incoming_message_filters:
    484             frame.payload = message_filter.filter(frame.payload)
    485         return frame
    486 
    487     def send_message(self, message, end=True, binary=False):
    488         """Send message.
    489 
    490         Args:
    491             message: text in unicode or binary in str to send.
    492             binary: send message as binary frame.
    493 
    494         Raises:
    495             BadOperationException: when called on a server-terminated
    496                 connection or called with inconsistent message type or
    497                 binary parameter.
    498         """
    499 
    500         if self._request.server_terminated:
    501             raise BadOperationException(
    502                 'Requested send_message after sending out a closing handshake')
    503 
    504         if binary and isinstance(message, unicode):
    505             raise BadOperationException(
    506                 'Message for binary frame must be instance of str')
    507 
    508         for message_filter in self._options.outgoing_message_filters:
    509             message = message_filter.filter(message, end, binary)
    510 
    511         try:
    512             # Set this to any positive integer to limit maximum size of data in
    513             # payload data of each frame.
    514             MAX_PAYLOAD_DATA_SIZE = -1
    515 
    516             if MAX_PAYLOAD_DATA_SIZE <= 0:
    517                 self._write(self._writer.build(message, end, binary))
    518                 return
    519 
    520             bytes_written = 0
    521             while True:
    522                 end_for_this_frame = end
    523                 bytes_to_write = len(message) - bytes_written
    524                 if (MAX_PAYLOAD_DATA_SIZE > 0 and
    525                     bytes_to_write > MAX_PAYLOAD_DATA_SIZE):
    526                     end_for_this_frame = False
    527                     bytes_to_write = MAX_PAYLOAD_DATA_SIZE
    528 
    529                 frame = self._writer.build(
    530                     message[bytes_written:bytes_written + bytes_to_write],
    531                     end_for_this_frame,
    532                     binary)
    533                 self._write(frame)
    534 
    535                 bytes_written += bytes_to_write
    536 
    537                 # This if must be placed here (the end of while block) so that
    538                 # at least one frame is sent.
    539                 if len(message) <= bytes_written:
    540                     break
    541         except ValueError, e:
    542             raise BadOperationException(e)
    543 
    544     def _get_message_from_frame(self, frame):
    545         """Gets a message from frame. If the message is composed of fragmented
    546         frames and the frame is not the last fragmented frame, this method
    547         returns None. The whole message will be returned when the last
    548         fragmented frame is passed to this method.
    549 
    550         Raises:
    551             InvalidFrameException: when the frame doesn't match defragmentation
    552                 context, or the frame contains invalid data.
    553         """
    554 
    555         if frame.opcode == common.OPCODE_CONTINUATION:
    556             if not self._received_fragments:
    557                 if frame.fin:
    558                     raise InvalidFrameException(
    559                         'Received a termination frame but fragmentation '
    560                         'not started')
    561                 else:
    562                     raise InvalidFrameException(
    563                         'Received an intermediate frame but '
    564                         'fragmentation not started')
    565 
    566             if frame.fin:
    567                 # End of fragmentation frame
    568                 self._received_fragments.append(frame.payload)
    569                 message = ''.join(self._received_fragments)
    570                 self._received_fragments = []
    571                 return message
    572             else:
    573                 # Intermediate frame
    574                 self._received_fragments.append(frame.payload)
    575                 return None
    576         else:
    577             if self._received_fragments:
    578                 if frame.fin:
    579                     raise InvalidFrameException(
    580                         'Received an unfragmented frame without '
    581                         'terminating existing fragmentation')
    582                 else:
    583                     raise InvalidFrameException(
    584                         'New fragmentation started without terminating '
    585                         'existing fragmentation')
    586 
    587             if frame.fin:
    588                 # Unfragmented frame
    589 
    590                 self._original_opcode = frame.opcode
    591                 return frame.payload
    592             else:
    593                 # Start of fragmentation frame
    594 
    595                 if common.is_control_opcode(frame.opcode):
    596                     raise InvalidFrameException(
    597                         'Control frames must not be fragmented')
    598 
    599                 self._original_opcode = frame.opcode
    600                 self._received_fragments.append(frame.payload)
    601                 return None
    602 
    603     def _process_close_message(self, message):
    604         """Processes close message.
    605 
    606         Args:
    607             message: close message.
    608 
    609         Raises:
    610             InvalidFrameException: when the message is invalid.
    611         """
    612 
    613         self._request.client_terminated = True
    614 
    615         # Status code is optional. We can have status reason only if we
    616         # have status code. Status reason can be empty string. So,
    617         # allowed cases are
    618         # - no application data: no code no reason
    619         # - 2 octet of application data: has code but no reason
    620         # - 3 or more octet of application data: both code and reason
    621         if len(message) == 0:
    622             self._logger.debug('Received close frame (empty body)')
    623             self._request.ws_close_code = (
    624                 common.STATUS_NO_STATUS_RECEIVED)
    625         elif len(message) == 1:
    626             raise InvalidFrameException(
    627                 'If a close frame has status code, the length of '
    628                 'status code must be 2 octet')
    629         elif len(message) >= 2:
    630             self._request.ws_close_code = struct.unpack(
    631                 '!H', message[0:2])[0]
    632             self._request.ws_close_reason = message[2:].decode(
    633                 'utf-8', 'replace')
    634             self._logger.debug(
    635                 'Received close frame (code=%d, reason=%r)',
    636                 self._request.ws_close_code,
    637                 self._request.ws_close_reason)
    638 
    639         # As we've received a close frame, no more data is coming over the
    640         # socket. We can now safely close the socket without worrying about
    641         # RST sending.
    642 
    643         if self._request.server_terminated:
    644             self._logger.debug(
    645                 'Received ack for server-initiated closing handshake')
    646             return
    647 
    648         self._logger.debug(
    649             'Received client-initiated closing handshake')
    650 
    651         code = common.STATUS_NORMAL_CLOSURE
    652         reason = ''
    653         if hasattr(self._request, '_dispatcher'):
    654             dispatcher = self._request._dispatcher
    655             code, reason = dispatcher.passive_closing_handshake(
    656                 self._request)
    657             if code is None and reason is not None and len(reason) > 0:
    658                 self._logger.warning(
    659                     'Handler specified reason despite code being None')
    660                 reason = ''
    661             if reason is None:
    662                 reason = ''
    663         self._send_closing_handshake(code, reason)
    664         self._logger.debug(
    665             'Acknowledged closing handshake initiated by the peer '
    666             '(code=%r, reason=%r)', code, reason)
    667 
    668     def _process_ping_message(self, message):
    669         """Processes ping message.
    670 
    671         Args:
    672             message: ping message.
    673         """
    674 
    675         try:
    676             handler = self._request.on_ping_handler
    677             if handler:
    678                 handler(self._request, message)
    679                 return
    680         except AttributeError, e:
    681             pass
    682         self._send_pong(message)
    683 
    684     def _process_pong_message(self, message):
    685         """Processes pong message.
    686 
    687         Args:
    688             message: pong message.
    689         """
    690 
    691         # TODO(tyoshino): Add ping timeout handling.
    692 
    693         inflight_pings = deque()
    694 
    695         while True:
    696             try:
    697                 expected_body = self._ping_queue.popleft()
    698                 if expected_body == message:
    699                     # inflight_pings contains pings ignored by the
    700                     # other peer. Just forget them.
    701                     self._logger.debug(
    702                         'Ping %r is acked (%d pings were ignored)',
    703                         expected_body, len(inflight_pings))
    704                     break
    705                 else:
    706                     inflight_pings.append(expected_body)
    707             except IndexError, e:
    708                 # The received pong was unsolicited pong. Keep the
    709                 # ping queue as is.
    710                 self._ping_queue = inflight_pings
    711                 self._logger.debug('Received a unsolicited pong')
    712                 break
    713 
    714         try:
    715             handler = self._request.on_pong_handler
    716             if handler:
    717                 handler(self._request, message)
    718         except AttributeError, e:
    719             pass
    720 
    721     def receive_message(self):
    722         """Receive a WebSocket frame and return its payload as a text in
    723         unicode or a binary in str.
    724 
    725         Returns:
    726             payload data of the frame
    727             - as unicode instance if received text frame
    728             - as str instance if received binary frame
    729             or None iff received closing handshake.
    730         Raises:
    731             BadOperationException: when called on a client-terminated
    732                 connection.
    733             ConnectionTerminatedException: when read returns empty
    734                 string.
    735             InvalidFrameException: when the frame contains invalid
    736                 data.
    737             UnsupportedFrameException: when the received frame has
    738                 flags, opcode we cannot handle. You can ignore this
    739                 exception and continue receiving the next frame.
    740         """
    741 
    742         if self._request.client_terminated:
    743             raise BadOperationException(
    744                 'Requested receive_message after receiving a closing '
    745                 'handshake')
    746 
    747         while True:
    748             # mp_conn.read will block if no bytes are available.
    749             # Timeout is controlled by TimeOut directive of Apache.
    750 
    751             frame = self._receive_frame_as_frame_object()
    752 
    753             # Check the constraint on the payload size for control frames
    754             # before extension processes the frame.
    755             # See also http://tools.ietf.org/html/rfc6455#section-5.5
    756             if (common.is_control_opcode(frame.opcode) and
    757                 len(frame.payload) > 125):
    758                 raise InvalidFrameException(
    759                     'Payload data size of control frames must be 125 bytes or '
    760                     'less')
    761 
    762             for frame_filter in self._options.incoming_frame_filters:
    763                 frame_filter.filter(frame)
    764 
    765             if frame.rsv1 or frame.rsv2 or frame.rsv3:
    766                 raise UnsupportedFrameException(
    767                     'Unsupported flag is set (rsv = %d%d%d)' %
    768                     (frame.rsv1, frame.rsv2, frame.rsv3))
    769 
    770             message = self._get_message_from_frame(frame)
    771             if message is None:
    772                 continue
    773 
    774             for message_filter in self._options.incoming_message_filters:
    775                 message = message_filter.filter(message)
    776 
    777             if self._original_opcode == common.OPCODE_TEXT:
    778                 # The WebSocket protocol section 4.4 specifies that invalid
    779                 # characters must be replaced with U+fffd REPLACEMENT
    780                 # CHARACTER.
    781                 try:
    782                     return message.decode('utf-8')
    783                 except UnicodeDecodeError, e:
    784                     raise InvalidUTF8Exception(e)
    785             elif self._original_opcode == common.OPCODE_BINARY:
    786                 return message
    787             elif self._original_opcode == common.OPCODE_CLOSE:
    788                 self._process_close_message(message)
    789                 return None
    790             elif self._original_opcode == common.OPCODE_PING:
    791                 self._process_ping_message(message)
    792             elif self._original_opcode == common.OPCODE_PONG:
    793                 self._process_pong_message(message)
    794             else:
    795                 raise UnsupportedFrameException(
    796                     'Opcode %d is not supported' % self._original_opcode)
    797 
    798     def _send_closing_handshake(self, code, reason):
    799         body = create_closing_handshake_body(code, reason)
    800         frame = create_close_frame(
    801             body, mask=self._options.mask_send,
    802             frame_filters=self._options.outgoing_frame_filters)
    803 
    804         self._request.server_terminated = True
    805 
    806         self._write(frame)
    807 
    808     def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason='',
    809                          wait_response=True):
    810         """Closes a WebSocket connection.
    811 
    812         Args:
    813             code: Status code for close frame. If code is None, a close
    814                 frame with empty body will be sent.
    815             reason: string representing close reason.
    816             wait_response: True when caller want to wait the response.
    817         Raises:
    818             BadOperationException: when reason is specified with code None
    819             or reason is not an instance of both str and unicode.
    820         """
    821 
    822         if self._request.server_terminated:
    823             self._logger.debug(
    824                 'Requested close_connection but server is already terminated')
    825             return
    826 
    827         if code is None:
    828             if reason is not None and len(reason) > 0:
    829                 raise BadOperationException(
    830                     'close reason must not be specified if code is None')
    831             reason = ''
    832         else:
    833             if not isinstance(reason, str) and not isinstance(reason, unicode):
    834                 raise BadOperationException(
    835                     'close reason must be an instance of str or unicode')
    836 
    837         self._send_closing_handshake(code, reason)
    838         self._logger.debug(
    839             'Initiated closing handshake (code=%r, reason=%r)',
    840             code, reason)
    841 
    842         if (code == common.STATUS_GOING_AWAY or
    843             code == common.STATUS_PROTOCOL_ERROR) or not wait_response:
    844             # It doesn't make sense to wait for a close frame if the reason is
    845             # protocol error or that the server is going away. For some of
    846             # other reasons, it might not make sense to wait for a close frame,
    847             # but it's not clear, yet.
    848             return
    849 
    850         # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
    851         # or until a server-defined timeout expires.
    852         #
    853         # For now, we expect receiving closing handshake right after sending
    854         # out closing handshake.
    855         message = self.receive_message()
    856         if message is not None:
    857             raise ConnectionTerminatedException(
    858                 'Didn\'t receive valid ack for closing handshake')
    859         # TODO: 3. close the WebSocket connection.
    860         # note: mod_python Connection (mp_conn) doesn't have close method.
    861 
    862     def send_ping(self, body=''):
    863         frame = create_ping_frame(
    864             body,
    865             self._options.mask_send,
    866             self._options.outgoing_frame_filters)
    867         self._write(frame)
    868 
    869         self._ping_queue.append(body)
    870 
    871     def _send_pong(self, body):
    872         frame = create_pong_frame(
    873             body,
    874             self._options.mask_send,
    875             self._options.outgoing_frame_filters)
    876         self._write(frame)
    877 
    878     def get_last_received_opcode(self):
    879         """Returns the opcode of the WebSocket message which the last received
    880         frame belongs to. The return value is valid iff immediately after
    881         receive_message call.
    882         """
    883 
    884         return self._original_opcode
    885 
    886 
    887 # vi:sts=4 sw=4 et
    888