Home | History | Annotate | Download | only in mod_pywebsocket
      1 # Copyright 2012, Google Inc.
      2 # All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 
     31 """This file provides classes and helper functions for parsing/building frames
     32 of the WebSocket protocol (RFC 6455).
     33 
     34 Specification:
     35 http://tools.ietf.org/html/rfc6455
     36 """
     37 
     38 
     39 from collections import deque
     40 import logging
     41 import os
     42 import struct
     43 import time
     44 
     45 from mod_pywebsocket import common
     46 from mod_pywebsocket import util
     47 from mod_pywebsocket._stream_base import BadOperationException
     48 from mod_pywebsocket._stream_base import ConnectionTerminatedException
     49 from mod_pywebsocket._stream_base import InvalidFrameException
     50 from mod_pywebsocket._stream_base import InvalidUTF8Exception
     51 from mod_pywebsocket._stream_base import StreamBase
     52 from mod_pywebsocket._stream_base import UnsupportedFrameException
     53 
     54 
     55 _NOOP_MASKER = util.NoopMasker()
     56 
     57 
     58 class Frame(object):
     59 
     60     def __init__(self, fin=1, rsv1=0, rsv2=0, rsv3=0,
     61                  opcode=None, payload=''):
     62         self.fin = fin
     63         self.rsv1 = rsv1
     64         self.rsv2 = rsv2
     65         self.rsv3 = rsv3
     66         self.opcode = opcode
     67         self.payload = payload
     68 
     69 
     70 # Helper functions made public to be used for writing unittests for WebSocket
     71 # clients.
     72 
     73 
     74 def create_length_header(length, mask):
     75     """Creates a length header.
     76 
     77     Args:
     78         length: Frame length. Must be less than 2^63.
     79         mask: Mask bit. Must be boolean.
     80 
     81     Raises:
     82         ValueError: when bad data is given.
     83     """
     84 
     85     if mask:
     86         mask_bit = 1 << 7
     87     else:
     88         mask_bit = 0
     89 
     90     if length < 0:
     91         raise ValueError('length must be non negative integer')
     92     elif length <= 125:
     93         return chr(mask_bit | length)
     94     elif length < (1 << 16):
     95         return chr(mask_bit | 126) + struct.pack('!H', length)
     96     elif length < (1 << 63):
     97         return chr(mask_bit | 127) + struct.pack('!Q', length)
     98     else:
     99         raise ValueError('Payload is too big for one frame')
    100 
    101 
    102 def create_header(opcode, payload_length, fin, rsv1, rsv2, rsv3, mask):
    103     """Creates a frame header.
    104 
    105     Raises:
    106         Exception: when bad data is given.
    107     """
    108 
    109     if opcode < 0 or 0xf < opcode:
    110         raise ValueError('Opcode out of range')
    111 
    112     if payload_length < 0 or (1 << 63) <= payload_length:
    113         raise ValueError('payload_length out of range')
    114 
    115     if (fin | rsv1 | rsv2 | rsv3) & ~1:
    116         raise ValueError('FIN bit and Reserved bit parameter must be 0 or 1')
    117 
    118     header = ''
    119 
    120     first_byte = ((fin << 7)
    121                   | (rsv1 << 6) | (rsv2 << 5) | (rsv3 << 4)
    122                   | opcode)
    123     header += chr(first_byte)
    124     header += create_length_header(payload_length, mask)
    125 
    126     return header
    127 
    128 
    129 def _build_frame(header, body, mask):
    130     if not mask:
    131         return header + body
    132 
    133     masking_nonce = os.urandom(4)
    134     masker = util.RepeatedXorMasker(masking_nonce)
    135 
    136     return header + masking_nonce + masker.mask(body)
    137 
    138 
    139 def _filter_and_format_frame_object(frame, mask, frame_filters):
    140     for frame_filter in frame_filters:
    141         frame_filter.filter(frame)
    142 
    143     header = create_header(
    144         frame.opcode, len(frame.payload), frame.fin,
    145         frame.rsv1, frame.rsv2, frame.rsv3, mask)
    146     return _build_frame(header, frame.payload, mask)
    147 
    148 
    149 def create_binary_frame(
    150     message, opcode=common.OPCODE_BINARY, fin=1, mask=False, frame_filters=[]):
    151     """Creates a simple binary frame with no extension, reserved bit."""
    152 
    153     frame = Frame(fin=fin, opcode=opcode, payload=message)
    154     return _filter_and_format_frame_object(frame, mask, frame_filters)
    155 
    156 
    157 def create_text_frame(
    158     message, opcode=common.OPCODE_TEXT, fin=1, mask=False, frame_filters=[]):
    159     """Creates a simple text frame with no extension, reserved bit."""
    160 
    161     encoded_message = message.encode('utf-8')
    162     return create_binary_frame(encoded_message, opcode, fin, mask,
    163                                frame_filters)
    164 
    165 
    166 def parse_frame(receive_bytes, logger=None,
    167                 ws_version=common.VERSION_HYBI_LATEST,
    168                 unmask_receive=True):
    169     """Parses a frame. Returns a tuple containing each header field and
    170     payload.
    171 
    172     Args:
    173         receive_bytes: a function that reads frame data from a stream or
    174             something similar. The function takes length of the bytes to be
    175             read. The function must raise ConnectionTerminatedException if
    176             there is not enough data to be read.
    177         logger: a logging object.
    178         ws_version: the version of WebSocket protocol.
    179         unmask_receive: unmask received frames. When received unmasked
    180             frame, raises InvalidFrameException.
    181 
    182     Raises:
    183         ConnectionTerminatedException: when receive_bytes raises it.
    184         InvalidFrameException: when the frame contains invalid data.
    185     """
    186 
    187     if not logger:
    188         logger = logging.getLogger()
    189 
    190     logger.log(common.LOGLEVEL_FINE, 'Receive the first 2 octets of a frame')
    191 
    192     received = receive_bytes(2)
    193 
    194     first_byte = ord(received[0])
    195     fin = (first_byte >> 7) & 1
    196     rsv1 = (first_byte >> 6) & 1
    197     rsv2 = (first_byte >> 5) & 1
    198     rsv3 = (first_byte >> 4) & 1
    199     opcode = first_byte & 0xf
    200 
    201     second_byte = ord(received[1])
    202     mask = (second_byte >> 7) & 1
    203     payload_length = second_byte & 0x7f
    204 
    205     logger.log(common.LOGLEVEL_FINE,
    206                'FIN=%s, RSV1=%s, RSV2=%s, RSV3=%s, opcode=%s, '
    207                'Mask=%s, Payload_length=%s',
    208                fin, rsv1, rsv2, rsv3, opcode, mask, payload_length)
    209 
    210     if (mask == 1) != unmask_receive:
    211         raise InvalidFrameException(
    212             'Mask bit on the received frame did\'nt match masking '
    213             'configuration for received frames')
    214 
    215     # The HyBi and later specs disallow putting a value in 0x0-0xFFFF
    216     # into the 8-octet extended payload length field (or 0x0-0xFD in
    217     # 2-octet field).
    218     valid_length_encoding = True
    219     length_encoding_bytes = 1
    220     if payload_length == 127:
    221         logger.log(common.LOGLEVEL_FINE,
    222                    'Receive 8-octet extended payload length')
    223 
    224         extended_payload_length = receive_bytes(8)
    225         payload_length = struct.unpack(
    226             '!Q', extended_payload_length)[0]
    227         if payload_length > 0x7FFFFFFFFFFFFFFF:
    228             raise InvalidFrameException(
    229                 'Extended payload length >= 2^63')
    230         if ws_version >= 13 and payload_length < 0x10000:
    231             valid_length_encoding = False
    232             length_encoding_bytes = 8
    233 
    234         logger.log(common.LOGLEVEL_FINE,
    235                    'Decoded_payload_length=%s', payload_length)
    236     elif payload_length == 126:
    237         logger.log(common.LOGLEVEL_FINE,
    238                    'Receive 2-octet extended payload length')
    239 
    240         extended_payload_length = receive_bytes(2)
    241         payload_length = struct.unpack(
    242             '!H', extended_payload_length)[0]
    243         if ws_version >= 13 and payload_length < 126:
    244             valid_length_encoding = False
    245             length_encoding_bytes = 2
    246 
    247         logger.log(common.LOGLEVEL_FINE,
    248                    'Decoded_payload_length=%s', payload_length)
    249 
    250     if not valid_length_encoding:
    251         logger.warning(
    252             'Payload length is not encoded using the minimal number of '
    253             'bytes (%d is encoded using %d bytes)',
    254             payload_length,
    255             length_encoding_bytes)
    256 
    257     if mask == 1:
    258         logger.log(common.LOGLEVEL_FINE, 'Receive mask')
    259 
    260         masking_nonce = receive_bytes(4)
    261         masker = util.RepeatedXorMasker(masking_nonce)
    262 
    263         logger.log(common.LOGLEVEL_FINE, 'Mask=%r', masking_nonce)
    264     else:
    265         masker = _NOOP_MASKER
    266 
    267     logger.log(common.LOGLEVEL_FINE, 'Receive payload data')
    268     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    269         receive_start = time.time()
    270 
    271     raw_payload_bytes = receive_bytes(payload_length)
    272 
    273     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    274         logger.log(
    275             common.LOGLEVEL_FINE,
    276             'Done receiving payload data at %s MB/s',
    277             payload_length / (time.time() - receive_start) / 1000 / 1000)
    278     logger.log(common.LOGLEVEL_FINE, 'Unmask payload data')
    279 
    280     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    281         unmask_start = time.time()
    282 
    283     bytes = masker.mask(raw_payload_bytes)
    284 
    285     if logger.isEnabledFor(common.LOGLEVEL_FINE):
    286         logger.log(
    287             common.LOGLEVEL_FINE,
    288             'Done unmasking payload data at %s MB/s',
    289             payload_length / (time.time() - unmask_start) / 1000 / 1000)
    290 
    291     return opcode, bytes, fin, rsv1, rsv2, rsv3
    292 
    293 
    294 class FragmentedFrameBuilder(object):
    295     """A stateful class to send a message as fragments."""
    296 
    297     def __init__(self, mask, frame_filters=[], encode_utf8=True):
    298         """Constructs an instance."""
    299 
    300         self._mask = mask
    301         self._frame_filters = frame_filters
    302         # This is for skipping UTF-8 encoding when building text type frames
    303         # from compressed data.
    304         self._encode_utf8 = encode_utf8
    305 
    306         self._started = False
    307 
    308         # Hold opcode of the first frame in messages to verify types of other
    309         # frames in the message are all the same.
    310         self._opcode = common.OPCODE_TEXT
    311 
    312     def build(self, message, end, binary):
    313         if binary:
    314             frame_type = common.OPCODE_BINARY
    315         else:
    316             frame_type = common.OPCODE_TEXT
    317         if self._started:
    318             if self._opcode != frame_type:
    319                 raise ValueError('Message types are different in frames for '
    320                                  'the same message')
    321             opcode = common.OPCODE_CONTINUATION
    322         else:
    323             opcode = frame_type
    324             self._opcode = frame_type
    325 
    326         if end:
    327             self._started = False
    328             fin = 1
    329         else:
    330             self._started = True
    331             fin = 0
    332 
    333         if binary or not self._encode_utf8:
    334             return create_binary_frame(
    335                 message, opcode, fin, self._mask, self._frame_filters)
    336         else:
    337             return create_text_frame(
    338                 message, opcode, fin, self._mask, self._frame_filters)
    339 
    340 
    341 def _create_control_frame(opcode, body, mask, frame_filters):
    342     frame = Frame(opcode=opcode, payload=body)
    343 
    344     for frame_filter in frame_filters:
    345         frame_filter.filter(frame)
    346 
    347     if len(frame.payload) > 125:
    348         raise BadOperationException(
    349             'Payload data size of control frames must be 125 bytes or less')
    350 
    351     header = create_header(
    352         frame.opcode, len(frame.payload), frame.fin,
    353         frame.rsv1, frame.rsv2, frame.rsv3, mask)
    354     return _build_frame(header, frame.payload, mask)
    355 
    356 
    357 def create_ping_frame(body, mask=False, frame_filters=[]):
    358     return _create_control_frame(common.OPCODE_PING, body, mask, frame_filters)
    359 
    360 
    361 def create_pong_frame(body, mask=False, frame_filters=[]):
    362     return _create_control_frame(common.OPCODE_PONG, body, mask, frame_filters)
    363 
    364 
    365 def create_close_frame(body, mask=False, frame_filters=[]):
    366     return _create_control_frame(
    367         common.OPCODE_CLOSE, body, mask, frame_filters)
    368 
    369 
    370 def create_closing_handshake_body(code, reason):
    371     body = ''
    372     if code is not None:
    373         if (code > common.STATUS_USER_PRIVATE_MAX or
    374             code < common.STATUS_NORMAL_CLOSURE):
    375             raise BadOperationException('Status code is out of range')
    376         if (code == common.STATUS_NO_STATUS_RECEIVED or
    377             code == common.STATUS_ABNORMAL_CLOSURE or
    378             code == common.STATUS_TLS_HANDSHAKE):
    379             raise BadOperationException('Status code is reserved pseudo '
    380                 'code')
    381         encoded_reason = reason.encode('utf-8')
    382         body = struct.pack('!H', code) + encoded_reason
    383     return body
    384 
    385 
    386 class StreamOptions(object):
    387     """Holds option values to configure Stream objects."""
    388 
    389     def __init__(self):
    390         """Constructs StreamOptions."""
    391 
    392         # Enables deflate-stream extension.
    393         self.deflate_stream = False
    394 
    395         # Filters applied to frames.
    396         self.outgoing_frame_filters = []
    397         self.incoming_frame_filters = []
    398 
    399         # Filters applied to messages. Control frames are not affected by them.
    400         self.outgoing_message_filters = []
    401         self.incoming_message_filters = []
    402 
    403         self.encode_text_message_to_utf8 = True
    404         self.mask_send = False
    405         self.unmask_receive = True
    406         # RFC6455 disallows fragmented control frames, but mux extension
    407         # relaxes the restriction.
    408         self.allow_fragmented_control_frame = False
    409 
    410 
    411 class Stream(StreamBase):
    412     """A class for parsing/building frames of the WebSocket protocol
    413     (RFC 6455).
    414     """
    415 
    416     def __init__(self, request, options):
    417         """Constructs an instance.
    418 
    419         Args:
    420             request: mod_python request.
    421         """
    422 
    423         StreamBase.__init__(self, request)
    424 
    425         self._logger = util.get_class_logger(self)
    426 
    427         self._options = options
    428 
    429         if self._options.deflate_stream:
    430             self._logger.debug('Setup filter for deflate-stream')
    431             self._request = util.DeflateRequest(self._request)
    432 
    433         self._request.client_terminated = False
    434         self._request.server_terminated = False
    435 
    436         # Holds body of received fragments.
    437         self._received_fragments = []
    438         # Holds the opcode of the first fragment.
    439         self._original_opcode = None
    440 
    441         self._writer = FragmentedFrameBuilder(
    442             self._options.mask_send, self._options.outgoing_frame_filters,
    443             self._options.encode_text_message_to_utf8)
    444 
    445         self._ping_queue = deque()
    446 
    447     def _receive_frame(self):
    448         """Receives a frame and return data in the frame as a tuple containing
    449         each header field and payload separately.
    450 
    451         Raises:
    452             ConnectionTerminatedException: when read returns empty
    453                 string.
    454             InvalidFrameException: when the frame contains invalid data.
    455         """
    456 
    457         def _receive_bytes(length):
    458             return self.receive_bytes(length)
    459 
    460         return parse_frame(receive_bytes=_receive_bytes,
    461                            logger=self._logger,
    462                            ws_version=self._request.ws_version,
    463                            unmask_receive=self._options.unmask_receive)
    464 
    465     def _receive_frame_as_frame_object(self):
    466         opcode, bytes, fin, rsv1, rsv2, rsv3 = self._receive_frame()
    467 
    468         return Frame(fin=fin, rsv1=rsv1, rsv2=rsv2, rsv3=rsv3,
    469                      opcode=opcode, payload=bytes)
    470 
    471     def send_message(self, message, end=True, binary=False):
    472         """Send message.
    473 
    474         Args:
    475             message: text in unicode or binary in str to send.
    476             binary: send message as binary frame.
    477 
    478         Raises:
    479             BadOperationException: when called on a server-terminated
    480                 connection or called with inconsistent message type or
    481                 binary parameter.
    482         """
    483 
    484         if self._request.server_terminated:
    485             raise BadOperationException(
    486                 'Requested send_message after sending out a closing handshake')
    487 
    488         if binary and isinstance(message, unicode):
    489             raise BadOperationException(
    490                 'Message for binary frame must be instance of str')
    491 
    492         for message_filter in self._options.outgoing_message_filters:
    493             message = message_filter.filter(message, end, binary)
    494 
    495         try:
    496             self._write(self._writer.build(message, end, binary))
    497         except ValueError, e:
    498             raise BadOperationException(e)
    499 
    500     def _get_message_from_frame(self, frame):
    501         """Gets a message from frame. If the message is composed of fragmented
    502         frames and the frame is not the last fragmented frame, this method
    503         returns None. The whole message will be returned when the last
    504         fragmented frame is passed to this method.
    505 
    506         Raises:
    507             InvalidFrameException: when the frame doesn't match defragmentation
    508                 context, or the frame contains invalid data.
    509         """
    510 
    511         if frame.opcode == common.OPCODE_CONTINUATION:
    512             if not self._received_fragments:
    513                 if frame.fin:
    514                     raise InvalidFrameException(
    515                         'Received a termination frame but fragmentation '
    516                         'not started')
    517                 else:
    518                     raise InvalidFrameException(
    519                         'Received an intermediate frame but '
    520                         'fragmentation not started')
    521 
    522             if frame.fin:
    523                 # End of fragmentation frame
    524                 self._received_fragments.append(frame.payload)
    525                 message = ''.join(self._received_fragments)
    526                 self._received_fragments = []
    527                 return message
    528             else:
    529                 # Intermediate frame
    530                 self._received_fragments.append(frame.payload)
    531                 return None
    532         else:
    533             if self._received_fragments:
    534                 if frame.fin:
    535                     raise InvalidFrameException(
    536                         'Received an unfragmented frame without '
    537                         'terminating existing fragmentation')
    538                 else:
    539                     raise InvalidFrameException(
    540                         'New fragmentation started without terminating '
    541                         'existing fragmentation')
    542 
    543             if frame.fin:
    544                 # Unfragmented frame
    545 
    546                 self._original_opcode = frame.opcode
    547                 return frame.payload
    548             else:
    549                 # Start of fragmentation frame
    550 
    551                 if (not self._options.allow_fragmented_control_frame and
    552                     common.is_control_opcode(frame.opcode)):
    553                     raise InvalidFrameException(
    554                         'Control frames must not be fragmented')
    555 
    556                 self._original_opcode = frame.opcode
    557                 self._received_fragments.append(frame.payload)
    558                 return None
    559 
    560     def _process_close_message(self, message):
    561         """Processes close message.
    562 
    563         Args:
    564             message: close message.
    565 
    566         Raises:
    567             InvalidFrameException: when the message is invalid.
    568         """
    569 
    570         self._request.client_terminated = True
    571 
    572         # Status code is optional. We can have status reason only if we
    573         # have status code. Status reason can be empty string. So,
    574         # allowed cases are
    575         # - no application data: no code no reason
    576         # - 2 octet of application data: has code but no reason
    577         # - 3 or more octet of application data: both code and reason
    578         if len(message) == 0:
    579             self._logger.debug('Received close frame (empty body)')
    580             self._request.ws_close_code = (
    581                 common.STATUS_NO_STATUS_RECEIVED)
    582         elif len(message) == 1:
    583             raise InvalidFrameException(
    584                 'If a close frame has status code, the length of '
    585                 'status code must be 2 octet')
    586         elif len(message) >= 2:
    587             self._request.ws_close_code = struct.unpack(
    588                 '!H', message[0:2])[0]
    589             self._request.ws_close_reason = message[2:].decode(
    590                 'utf-8', 'replace')
    591             self._logger.debug(
    592                 'Received close frame (code=%d, reason=%r)',
    593                 self._request.ws_close_code,
    594                 self._request.ws_close_reason)
    595 
    596         # Drain junk data after the close frame if necessary.
    597         self._drain_received_data()
    598 
    599         if self._request.server_terminated:
    600             self._logger.debug(
    601                 'Received ack for server-initiated closing handshake')
    602             return
    603 
    604         self._logger.debug(
    605             'Received client-initiated closing handshake')
    606 
    607         code = common.STATUS_NORMAL_CLOSURE
    608         reason = ''
    609         if hasattr(self._request, '_dispatcher'):
    610             dispatcher = self._request._dispatcher
    611             code, reason = dispatcher.passive_closing_handshake(
    612                 self._request)
    613             if code is None and reason is not None and len(reason) > 0:
    614                 self._logger.warning(
    615                     'Handler specified reason despite code being None')
    616                 reason = ''
    617             if reason is None:
    618                 reason = ''
    619         self._send_closing_handshake(code, reason)
    620         self._logger.debug(
    621             'Sent ack for client-initiated closing handshake '
    622             '(code=%r, reason=%r)', code, reason)
    623 
    624     def _process_ping_message(self, message):
    625         """Processes ping message.
    626 
    627         Args:
    628             message: ping message.
    629         """
    630 
    631         try:
    632             handler = self._request.on_ping_handler
    633             if handler:
    634                 handler(self._request, message)
    635                 return
    636         except AttributeError, e:
    637             pass
    638         self._send_pong(message)
    639 
    640     def _process_pong_message(self, message):
    641         """Processes pong message.
    642 
    643         Args:
    644             message: pong message.
    645         """
    646 
    647         # TODO(tyoshino): Add ping timeout handling.
    648 
    649         inflight_pings = deque()
    650 
    651         while True:
    652             try:
    653                 expected_body = self._ping_queue.popleft()
    654                 if expected_body == message:
    655                     # inflight_pings contains pings ignored by the
    656                     # other peer. Just forget them.
    657                     self._logger.debug(
    658                         'Ping %r is acked (%d pings were ignored)',
    659                         expected_body, len(inflight_pings))
    660                     break
    661                 else:
    662                     inflight_pings.append(expected_body)
    663             except IndexError, e:
    664                 # The received pong was unsolicited pong. Keep the
    665                 # ping queue as is.
    666                 self._ping_queue = inflight_pings
    667                 self._logger.debug('Received a unsolicited pong')
    668                 break
    669 
    670         try:
    671             handler = self._request.on_pong_handler
    672             if handler:
    673                 handler(self._request, message)
    674         except AttributeError, e:
    675             pass
    676 
    677     def receive_message(self):
    678         """Receive a WebSocket frame and return its payload as a text in
    679         unicode or a binary in str.
    680 
    681         Returns:
    682             payload data of the frame
    683             - as unicode instance if received text frame
    684             - as str instance if received binary frame
    685             or None iff received closing handshake.
    686         Raises:
    687             BadOperationException: when called on a client-terminated
    688                 connection.
    689             ConnectionTerminatedException: when read returns empty
    690                 string.
    691             InvalidFrameException: when the frame contains invalid
    692                 data.
    693             UnsupportedFrameException: when the received frame has
    694                 flags, opcode we cannot handle. You can ignore this
    695                 exception and continue receiving the next frame.
    696         """
    697 
    698         if self._request.client_terminated:
    699             raise BadOperationException(
    700                 'Requested receive_message after receiving a closing '
    701                 'handshake')
    702 
    703         while True:
    704             # mp_conn.read will block if no bytes are available.
    705             # Timeout is controlled by TimeOut directive of Apache.
    706 
    707             frame = self._receive_frame_as_frame_object()
    708 
    709             # Check the constraint on the payload size for control frames
    710             # before extension processes the frame.
    711             # See also http://tools.ietf.org/html/rfc6455#section-5.5
    712             if (common.is_control_opcode(frame.opcode) and
    713                 len(frame.payload) > 125):
    714                 raise InvalidFrameException(
    715                     'Payload data size of control frames must be 125 bytes or '
    716                     'less')
    717 
    718             for frame_filter in self._options.incoming_frame_filters:
    719                 frame_filter.filter(frame)
    720 
    721             if frame.rsv1 or frame.rsv2 or frame.rsv3:
    722                 raise UnsupportedFrameException(
    723                     'Unsupported flag is set (rsv = %d%d%d)' %
    724                     (frame.rsv1, frame.rsv2, frame.rsv3))
    725 
    726             message = self._get_message_from_frame(frame)
    727             if message is None:
    728                 continue
    729 
    730             for message_filter in self._options.incoming_message_filters:
    731                 message = message_filter.filter(message)
    732 
    733             if self._original_opcode == common.OPCODE_TEXT:
    734                 # The WebSocket protocol section 4.4 specifies that invalid
    735                 # characters must be replaced with U+fffd REPLACEMENT
    736                 # CHARACTER.
    737                 try:
    738                     return message.decode('utf-8')
    739                 except UnicodeDecodeError, e:
    740                     raise InvalidUTF8Exception(e)
    741             elif self._original_opcode == common.OPCODE_BINARY:
    742                 return message
    743             elif self._original_opcode == common.OPCODE_CLOSE:
    744                 self._process_close_message(message)
    745                 return None
    746             elif self._original_opcode == common.OPCODE_PING:
    747                 self._process_ping_message(message)
    748             elif self._original_opcode == common.OPCODE_PONG:
    749                 self._process_pong_message(message)
    750             else:
    751                 raise UnsupportedFrameException(
    752                     'Opcode %d is not supported' % self._original_opcode)
    753 
    754     def _send_closing_handshake(self, code, reason):
    755         body = create_closing_handshake_body(code, reason)
    756         frame = create_close_frame(
    757             body, mask=self._options.mask_send,
    758             frame_filters=self._options.outgoing_frame_filters)
    759 
    760         self._request.server_terminated = True
    761 
    762         self._write(frame)
    763 
    764     def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
    765         """Closes a WebSocket connection.
    766 
    767         Args:
    768             code: Status code for close frame. If code is None, a close
    769                 frame with empty body will be sent.
    770             reason: string representing close reason.
    771         Raises:
    772             BadOperationException: when reason is specified with code None
    773             or reason is not an instance of both str and unicode.
    774         """
    775 
    776         if self._request.server_terminated:
    777             self._logger.debug(
    778                 'Requested close_connection but server is already terminated')
    779             return
    780 
    781         if code is None:
    782             if reason is not None and len(reason) > 0:
    783                 raise BadOperationException(
    784                     'close reason must not be specified if code is None')
    785             reason = ''
    786         else:
    787             if not isinstance(reason, str) and not isinstance(reason, unicode):
    788                 raise BadOperationException(
    789                     'close reason must be an instance of str or unicode')
    790 
    791         self._send_closing_handshake(code, reason)
    792         self._logger.debug(
    793             'Sent server-initiated closing handshake (code=%r, reason=%r)',
    794             code, reason)
    795 
    796         if (code == common.STATUS_GOING_AWAY or
    797             code == common.STATUS_PROTOCOL_ERROR):
    798             # It doesn't make sense to wait for a close frame if the reason is
    799             # protocol error or that the server is going away. For some of
    800             # other reasons, it might not make sense to wait for a close frame,
    801             # but it's not clear, yet.
    802             return
    803 
    804         # TODO(ukai): 2. wait until the /client terminated/ flag has been set,
    805         # or until a server-defined timeout expires.
    806         #
    807         # For now, we expect receiving closing handshake right after sending
    808         # out closing handshake.
    809         message = self.receive_message()
    810         if message is not None:
    811             raise ConnectionTerminatedException(
    812                 'Didn\'t receive valid ack for closing handshake')
    813         # TODO: 3. close the WebSocket connection.
    814         # note: mod_python Connection (mp_conn) doesn't have close method.
    815 
    816     def send_ping(self, body=''):
    817         frame = create_ping_frame(
    818             body,
    819             self._options.mask_send,
    820             self._options.outgoing_frame_filters)
    821         self._write(frame)
    822 
    823         self._ping_queue.append(body)
    824 
    825     def _send_pong(self, body):
    826         frame = create_pong_frame(
    827             body,
    828             self._options.mask_send,
    829             self._options.outgoing_frame_filters)
    830         self._write(frame)
    831 
    832     def get_last_received_opcode(self):
    833         """Returns the opcode of the WebSocket message which the last received
    834         frame belongs to. The return value is valid iff immediately after
    835         receive_message call.
    836         """
    837 
    838         return self._original_opcode
    839 
    840     def _drain_received_data(self):
    841         """Drains unread data in the receive buffer to avoid sending out TCP
    842         RST packet. This is because when deflate-stream is enabled, some
    843         DEFLATE block for flushing data may follow a close frame. If any data
    844         remains in the receive buffer of a socket when the socket is closed,
    845         it sends out TCP RST packet to the other peer.
    846 
    847         Since mod_python's mp_conn object doesn't support non-blocking read,
    848         we perform this only when pywebsocket is running in standalone mode.
    849         """
    850 
    851         # If self._options.deflate_stream is true, self._request is
    852         # DeflateRequest, so we can get wrapped request object by
    853         # self._request._request.
    854         #
    855         # Only _StandaloneRequest has _drain_received_data method.
    856         if (self._options.deflate_stream and
    857             ('_drain_received_data' in dir(self._request._request))):
    858             self._request._request._drain_received_data()
    859 
    860 
    861 # vi:sts=4 sw=4 et
    862