Home | History | Annotate | Download | only in mod_pywebsocket
      1 # Copyright 2012, Google Inc.
      2 # All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 
     31 """This file provides classes and helper functions for multiplexing extension.
     32 
     33 Specification:
     34 http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-06
     35 """
     36 
     37 
     38 import collections
     39 import copy
     40 import email
     41 import email.parser
     42 import logging
     43 import math
     44 import struct
     45 import threading
     46 import traceback
     47 
     48 from mod_pywebsocket import common
     49 from mod_pywebsocket import handshake
     50 from mod_pywebsocket import util
     51 from mod_pywebsocket._stream_base import BadOperationException
     52 from mod_pywebsocket._stream_base import ConnectionTerminatedException
     53 from mod_pywebsocket._stream_base import InvalidFrameException
     54 from mod_pywebsocket._stream_hybi import Frame
     55 from mod_pywebsocket._stream_hybi import Stream
     56 from mod_pywebsocket._stream_hybi import StreamOptions
     57 from mod_pywebsocket._stream_hybi import create_binary_frame
     58 from mod_pywebsocket._stream_hybi import create_closing_handshake_body
     59 from mod_pywebsocket._stream_hybi import create_header
     60 from mod_pywebsocket._stream_hybi import create_length_header
     61 from mod_pywebsocket._stream_hybi import parse_frame
     62 from mod_pywebsocket.handshake import hybi
     63 
     64 
     65 _CONTROL_CHANNEL_ID = 0
     66 _DEFAULT_CHANNEL_ID = 1
     67 
     68 _MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
     69 _MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
     70 _MUX_OPCODE_FLOW_CONTROL = 2
     71 _MUX_OPCODE_DROP_CHANNEL = 3
     72 _MUX_OPCODE_NEW_CHANNEL_SLOT = 4
     73 
     74 _MAX_CHANNEL_ID = 2 ** 29 - 1
     75 
     76 _INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64
     77 _INITIAL_QUOTA_FOR_CLIENT = 8 * 1024
     78 
     79 _HANDSHAKE_ENCODING_IDENTITY = 0
     80 _HANDSHAKE_ENCODING_DELTA = 1
     81 
     82 # We need only these status code for now.
     83 _HTTP_BAD_RESPONSE_MESSAGES = {
     84     common.HTTP_STATUS_BAD_REQUEST: 'Bad Request',
     85 }
     86 
     87 # DropChannel reason code
     88 # TODO(bashi): Define all reason code defined in -05 draft.
     89 _DROP_CODE_NORMAL_CLOSURE = 1000
     90 
     91 _DROP_CODE_INVALID_ENCAPSULATING_MESSAGE = 2001
     92 _DROP_CODE_CHANNEL_ID_TRUNCATED = 2002
     93 _DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED = 2003
     94 _DROP_CODE_UNKNOWN_MUX_OPCODE = 2004
     95 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK = 2005
     96 _DROP_CODE_CHANNEL_ALREADY_EXISTS = 2006
     97 _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION = 2007
     98 _DROP_CODE_UNKNOWN_REQUEST_ENCODING = 2010
     99 
    100 _DROP_CODE_SEND_QUOTA_VIOLATION = 3005
    101 _DROP_CODE_SEND_QUOTA_OVERFLOW = 3006
    102 _DROP_CODE_ACKNOWLEDGED = 3008
    103 _DROP_CODE_BAD_FRAGMENTATION = 3009
    104 
    105 
    106 class MuxUnexpectedException(Exception):
    107     """Exception in handling multiplexing extension."""
    108     pass
    109 
    110 
    111 # Temporary
    112 class MuxNotImplementedException(Exception):
    113     """Raised when a flow enters unimplemented code path."""
    114     pass
    115 
    116 
    117 class LogicalConnectionClosedException(Exception):
    118     """Raised when logical connection is gracefully closed."""
    119     pass
    120 
    121 
    122 class PhysicalConnectionError(Exception):
    123     """Raised when there is a physical connection error."""
    124     def __init__(self, drop_code, message=''):
    125         super(PhysicalConnectionError, self).__init__(
    126             'code=%d, message=%r' % (drop_code, message))
    127         self.drop_code = drop_code
    128         self.message = message
    129 
    130 
    131 class LogicalChannelError(Exception):
    132     """Raised when there is a logical channel error."""
    133     def __init__(self, channel_id, drop_code, message=''):
    134         super(LogicalChannelError, self).__init__(
    135             'channel_id=%d, code=%d, message=%r' % (
    136                 channel_id, drop_code, message))
    137         self.channel_id = channel_id
    138         self.drop_code = drop_code
    139         self.message = message
    140 
    141 
    142 def _encode_channel_id(channel_id):
    143     if channel_id < 0:
    144         raise ValueError('Channel id %d must not be negative' % channel_id)
    145 
    146     if channel_id < 2 ** 7:
    147         return chr(channel_id)
    148     if channel_id < 2 ** 14:
    149         return struct.pack('!H', 0x8000 + channel_id)
    150     if channel_id < 2 ** 21:
    151         first = chr(0xc0 + (channel_id >> 16))
    152         return first + struct.pack('!H', channel_id & 0xffff)
    153     if channel_id < 2 ** 29:
    154         return struct.pack('!L', 0xe0000000 + channel_id)
    155 
    156     raise ValueError('Channel id %d is too large' % channel_id)
    157 
    158 
    159 def _encode_number(number):
    160     return create_length_header(number, False)
    161 
    162 
    163 def _create_add_channel_response(channel_id, encoded_handshake,
    164                                  encoding=0, rejected=False):
    165     if encoding != 0 and encoding != 1:
    166         raise ValueError('Invalid encoding %d' % encoding)
    167 
    168     first_byte = ((_MUX_OPCODE_ADD_CHANNEL_RESPONSE << 5) |
    169                   (rejected << 4) | encoding)
    170     block = (chr(first_byte) +
    171              _encode_channel_id(channel_id) +
    172              _encode_number(len(encoded_handshake)) +
    173              encoded_handshake)
    174     return block
    175 
    176 
    177 def _create_drop_channel(channel_id, code=None, message=''):
    178     if len(message) > 0 and code is None:
    179         raise ValueError('Code must be specified if message is specified')
    180 
    181     first_byte = _MUX_OPCODE_DROP_CHANNEL << 5
    182     block = chr(first_byte) + _encode_channel_id(channel_id)
    183     if code is None:
    184         block += _encode_number(0) # Reason size
    185     else:
    186         reason = struct.pack('!H', code) + message
    187         reason_size = _encode_number(len(reason))
    188         block += reason_size + reason
    189 
    190     return block
    191 
    192 
    193 def _create_flow_control(channel_id, replenished_quota):
    194     first_byte = _MUX_OPCODE_FLOW_CONTROL << 5
    195     block = (chr(first_byte) +
    196              _encode_channel_id(channel_id) +
    197              _encode_number(replenished_quota))
    198     return block
    199 
    200 
    201 def _create_new_channel_slot(slots, send_quota):
    202     if slots < 0 or send_quota < 0:
    203         raise ValueError('slots and send_quota must be non-negative.')
    204     first_byte = _MUX_OPCODE_NEW_CHANNEL_SLOT << 5
    205     block = (chr(first_byte) +
    206              _encode_number(slots) +
    207              _encode_number(send_quota))
    208     return block
    209 
    210 
    211 def _create_fallback_new_channel_slot():
    212     first_byte = (_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 1 # Set the F flag
    213     block = (chr(first_byte) + _encode_number(0) + _encode_number(0))
    214     return block
    215 
    216 
    217 def _parse_request_text(request_text):
    218     request_line, header_lines = request_text.split('\r\n', 1)
    219 
    220     words = request_line.split(' ')
    221     if len(words) != 3:
    222         raise ValueError('Bad Request-Line syntax %r' % request_line)
    223     [command, path, version] = words
    224     if version != 'HTTP/1.1':
    225         raise ValueError('Bad request version %r' % version)
    226 
    227     # email.parser.Parser() parses RFC 2822 (RFC 822) style headers.
    228     # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers
    229     # RFC 822.
    230     headers = email.parser.Parser().parsestr(header_lines)
    231     return command, path, version, headers
    232 
    233 
    234 class _ControlBlock(object):
    235     """A structure that holds parsing result of multiplexing control block.
    236     Control block specific attributes will be added by _MuxFramePayloadParser.
    237     (e.g. encoded_handshake will be added for AddChannelRequest and
    238     AddChannelResponse)
    239     """
    240 
    241     def __init__(self, opcode):
    242         self.opcode = opcode
    243 
    244 
    245 class _MuxFramePayloadParser(object):
    246     """A class that parses multiplexed frame payload."""
    247 
    248     def __init__(self, payload):
    249         self._data = payload
    250         self._read_position = 0
    251         self._logger = util.get_class_logger(self)
    252 
    253     def read_channel_id(self):
    254         """Reads channel id.
    255 
    256         Raises:
    257             ValueError: when the payload doesn't contain
    258                 valid channel id.
    259         """
    260 
    261         remaining_length = len(self._data) - self._read_position
    262         pos = self._read_position
    263         if remaining_length == 0:
    264             raise ValueError('Invalid channel id format')
    265 
    266         channel_id = ord(self._data[pos])
    267         channel_id_length = 1
    268         if channel_id & 0xe0 == 0xe0:
    269             if remaining_length < 4:
    270                 raise ValueError('Invalid channel id format')
    271             channel_id = struct.unpack('!L',
    272                                        self._data[pos:pos+4])[0] & 0x1fffffff
    273             channel_id_length = 4
    274         elif channel_id & 0xc0 == 0xc0:
    275             if remaining_length < 3:
    276                 raise ValueError('Invalid channel id format')
    277             channel_id = (((channel_id & 0x1f) << 16) +
    278                           struct.unpack('!H', self._data[pos+1:pos+3])[0])
    279             channel_id_length = 3
    280         elif channel_id & 0x80 == 0x80:
    281             if remaining_length < 2:
    282                 raise ValueError('Invalid channel id format')
    283             channel_id = struct.unpack('!H',
    284                                        self._data[pos:pos+2])[0] & 0x3fff
    285             channel_id_length = 2
    286         self._read_position += channel_id_length
    287 
    288         return channel_id
    289 
    290     def read_inner_frame(self):
    291         """Reads an inner frame.
    292 
    293         Raises:
    294             PhysicalConnectionError: when the inner frame is invalid.
    295         """
    296 
    297         if len(self._data) == self._read_position:
    298             raise PhysicalConnectionError(
    299                 _DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED)
    300 
    301         bits = ord(self._data[self._read_position])
    302         self._read_position += 1
    303         fin = (bits & 0x80) == 0x80
    304         rsv1 = (bits & 0x40) == 0x40
    305         rsv2 = (bits & 0x20) == 0x20
    306         rsv3 = (bits & 0x10) == 0x10
    307         opcode = bits & 0xf
    308         payload = self.remaining_data()
    309         # Consume rest of the message which is payload data of the original
    310         # frame.
    311         self._read_position = len(self._data)
    312         return fin, rsv1, rsv2, rsv3, opcode, payload
    313 
    314     def _read_number(self):
    315         if self._read_position + 1 > len(self._data):
    316             raise ValueError(
    317                 'Cannot read the first byte of number field')
    318 
    319         number = ord(self._data[self._read_position])
    320         if number & 0x80 == 0x80:
    321             raise ValueError(
    322                 'The most significant bit of the first byte of number should '
    323                 'be unset')
    324         self._read_position += 1
    325         pos = self._read_position
    326         if number == 127:
    327             if pos + 8 > len(self._data):
    328                 raise ValueError('Invalid number field')
    329             self._read_position += 8
    330             number = struct.unpack('!Q', self._data[pos:pos+8])[0]
    331             if number > 0x7FFFFFFFFFFFFFFF:
    332                 raise ValueError('Encoded number(%d) >= 2^63' % number)
    333             if number <= 0xFFFF:
    334                 raise ValueError(
    335                     '%d should not be encoded by 9 bytes encoding' % number)
    336             return number
    337         if number == 126:
    338             if pos + 2 > len(self._data):
    339                 raise ValueError('Invalid number field')
    340             self._read_position += 2
    341             number = struct.unpack('!H', self._data[pos:pos+2])[0]
    342             if number <= 125:
    343                 raise ValueError(
    344                     '%d should not be encoded by 3 bytes encoding' % number)
    345         return number
    346 
    347     def _read_size_and_contents(self):
    348         """Reads data that consists of followings:
    349             - the size of the contents encoded the same way as payload length
    350               of the WebSocket Protocol with 1 bit padding at the head.
    351             - the contents.
    352         """
    353 
    354         try:
    355             size = self._read_number()
    356         except ValueError, e:
    357             raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    358                                           str(e))
    359         pos = self._read_position
    360         if pos + size > len(self._data):
    361             raise PhysicalConnectionError(
    362                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    363                 'Cannot read %d bytes data' % size)
    364 
    365         self._read_position += size
    366         return self._data[pos:pos+size]
    367 
    368     def _read_add_channel_request(self, first_byte, control_block):
    369         reserved = (first_byte >> 2) & 0x7
    370         if reserved != 0:
    371             raise PhysicalConnectionError(
    372                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    373                 'Reserved bits must be unset')
    374 
    375         # Invalid encoding will be handled by MuxHandler.
    376         encoding = first_byte & 0x3
    377         try:
    378             control_block.channel_id = self.read_channel_id()
    379         except ValueError, e:
    380             raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
    381         control_block.encoding = encoding
    382         encoded_handshake = self._read_size_and_contents()
    383         control_block.encoded_handshake = encoded_handshake
    384         return control_block
    385 
    386     def _read_add_channel_response(self, first_byte, control_block):
    387         reserved = (first_byte >> 2) & 0x3
    388         if reserved != 0:
    389             raise PhysicalConnectionError(
    390                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    391                 'Reserved bits must be unset')
    392 
    393         control_block.accepted = (first_byte >> 4) & 1
    394         control_block.encoding = first_byte & 0x3
    395         try:
    396             control_block.channel_id = self.read_channel_id()
    397         except ValueError, e:
    398             raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
    399         control_block.encoded_handshake = self._read_size_and_contents()
    400         return control_block
    401 
    402     def _read_flow_control(self, first_byte, control_block):
    403         reserved = first_byte & 0x1f
    404         if reserved != 0:
    405             raise PhysicalConnectionError(
    406                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    407                 'Reserved bits must be unset')
    408 
    409         try:
    410             control_block.channel_id = self.read_channel_id()
    411             control_block.send_quota = self._read_number()
    412         except ValueError, e:
    413             raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    414                                           str(e))
    415 
    416         return control_block
    417 
    418     def _read_drop_channel(self, first_byte, control_block):
    419         reserved = first_byte & 0x1f
    420         if reserved != 0:
    421             raise PhysicalConnectionError(
    422                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    423                 'Reserved bits must be unset')
    424 
    425         try:
    426             control_block.channel_id = self.read_channel_id()
    427         except ValueError, e:
    428             raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK)
    429         reason = self._read_size_and_contents()
    430         if len(reason) == 0:
    431             control_block.drop_code = None
    432             control_block.drop_message = ''
    433         elif len(reason) >= 2:
    434             control_block.drop_code = struct.unpack('!H', reason[:2])[0]
    435             control_block.drop_message = reason[2:]
    436         else:
    437             raise PhysicalConnectionError(
    438                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    439                 'Received DropChannel that conains only 1-byte reason')
    440         return control_block
    441 
    442     def _read_new_channel_slot(self, first_byte, control_block):
    443         reserved = first_byte & 0x1e
    444         if reserved != 0:
    445             raise PhysicalConnectionError(
    446                 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    447                 'Reserved bits must be unset')
    448         control_block.fallback = first_byte & 1
    449         try:
    450             control_block.slots = self._read_number()
    451             control_block.send_quota = self._read_number()
    452         except ValueError, e:
    453             raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
    454                                           str(e))
    455         return control_block
    456 
    457     def read_control_blocks(self):
    458         """Reads control block(s).
    459 
    460         Raises:
    461            PhysicalConnectionError: when the payload contains invalid control
    462                block(s).
    463            StopIteration: when no control blocks left.
    464         """
    465 
    466         while self._read_position < len(self._data):
    467             first_byte = ord(self._data[self._read_position])
    468             self._read_position += 1
    469             opcode = (first_byte >> 5) & 0x7
    470             control_block = _ControlBlock(opcode=opcode)
    471             if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
    472                 yield self._read_add_channel_request(first_byte, control_block)
    473             elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
    474                 yield self._read_add_channel_response(
    475                     first_byte, control_block)
    476             elif opcode == _MUX_OPCODE_FLOW_CONTROL:
    477                 yield self._read_flow_control(first_byte, control_block)
    478             elif opcode == _MUX_OPCODE_DROP_CHANNEL:
    479                 yield self._read_drop_channel(first_byte, control_block)
    480             elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
    481                 yield self._read_new_channel_slot(first_byte, control_block)
    482             else:
    483                 raise PhysicalConnectionError(
    484                     _DROP_CODE_UNKNOWN_MUX_OPCODE,
    485                     'Invalid opcode %d' % opcode)
    486 
    487         assert self._read_position == len(self._data)
    488         raise StopIteration
    489 
    490     def remaining_data(self):
    491         """Returns remaining data."""
    492 
    493         return self._data[self._read_position:]
    494 
    495 
    496 class _LogicalRequest(object):
    497     """Mimics mod_python request."""
    498 
    499     def __init__(self, channel_id, command, path, protocol, headers,
    500                  connection):
    501         """Constructs an instance.
    502 
    503         Args:
    504             channel_id: the channel id of the logical channel.
    505             command: HTTP request command.
    506             path: HTTP request path.
    507             headers: HTTP headers.
    508             connection: _LogicalConnection instance.
    509         """
    510 
    511         self.channel_id = channel_id
    512         self.method = command
    513         self.uri = path
    514         self.protocol = protocol
    515         self.headers_in = headers
    516         self.connection = connection
    517         self.server_terminated = False
    518         self.client_terminated = False
    519 
    520     def is_https(self):
    521         """Mimics request.is_https(). Returns False because this method is
    522         used only by old protocols (hixie and hybi00).
    523         """
    524 
    525         return False
    526 
    527 
    528 class _LogicalConnection(object):
    529     """Mimics mod_python mp_conn."""
    530 
    531     # For details, see the comment of set_read_state().
    532     STATE_ACTIVE = 1
    533     STATE_GRACEFULLY_CLOSED = 2
    534     STATE_TERMINATED = 3
    535 
    536     def __init__(self, mux_handler, channel_id):
    537         """Constructs an instance.
    538 
    539         Args:
    540             mux_handler: _MuxHandler instance.
    541             channel_id: channel id of this connection.
    542         """
    543 
    544         self._mux_handler = mux_handler
    545         self._channel_id = channel_id
    546         self._incoming_data = ''
    547 
    548         # - Protects _waiting_write_completion
    549         # - Signals the thread waiting for completion of write by mux handler
    550         self._write_condition = threading.Condition()
    551         self._waiting_write_completion = False
    552 
    553         self._read_condition = threading.Condition()
    554         self._read_state = self.STATE_ACTIVE
    555 
    556     def get_local_addr(self):
    557         """Getter to mimic mp_conn.local_addr."""
    558 
    559         return self._mux_handler.physical_connection.get_local_addr()
    560     local_addr = property(get_local_addr)
    561 
    562     def get_remote_addr(self):
    563         """Getter to mimic mp_conn.remote_addr."""
    564 
    565         return self._mux_handler.physical_connection.get_remote_addr()
    566     remote_addr = property(get_remote_addr)
    567 
    568     def get_memorized_lines(self):
    569         """Gets memorized lines. Not supported."""
    570 
    571         raise MuxUnexpectedException('_LogicalConnection does not support '
    572                                      'get_memorized_lines')
    573 
    574     def write(self, data):
    575         """Writes data. mux_handler sends data asynchronously. The caller will
    576         be suspended until write done.
    577 
    578         Args:
    579             data: data to be written.
    580 
    581         Raises:
    582             MuxUnexpectedException: when called before finishing the previous
    583                 write.
    584         """
    585 
    586         try:
    587             self._write_condition.acquire()
    588             if self._waiting_write_completion:
    589                 raise MuxUnexpectedException(
    590                     'Logical connection %d is already waiting the completion '
    591                     'of write' % self._channel_id)
    592 
    593             self._waiting_write_completion = True
    594             self._mux_handler.send_data(self._channel_id, data)
    595             self._write_condition.wait()
    596             # TODO(tyoshino): Raise an exception if woke up by on_writer_done.
    597         finally:
    598             self._write_condition.release()
    599 
    600     def write_control_data(self, data):
    601         """Writes data via the control channel. Don't wait finishing write
    602         because this method can be called by mux dispatcher.
    603 
    604         Args:
    605             data: data to be written.
    606         """
    607 
    608         self._mux_handler.send_control_data(data)
    609 
    610     def on_write_data_done(self):
    611         """Called when sending data is completed."""
    612 
    613         try:
    614             self._write_condition.acquire()
    615             if not self._waiting_write_completion:
    616                 raise MuxUnexpectedException(
    617                     'Invalid call of on_write_data_done for logical '
    618                     'connection %d' % self._channel_id)
    619             self._waiting_write_completion = False
    620             self._write_condition.notify()
    621         finally:
    622             self._write_condition.release()
    623 
    624     def on_writer_done(self):
    625         """Called by the mux handler when the writer thread has finished."""
    626 
    627         try:
    628             self._write_condition.acquire()
    629             self._waiting_write_completion = False
    630             self._write_condition.notify()
    631         finally:
    632             self._write_condition.release()
    633 
    634 
    635     def append_frame_data(self, frame_data):
    636         """Appends incoming frame data. Called when mux_handler dispatches
    637         frame data to the corresponding application.
    638 
    639         Args:
    640             frame_data: incoming frame data.
    641         """
    642 
    643         self._read_condition.acquire()
    644         self._incoming_data += frame_data
    645         self._read_condition.notify()
    646         self._read_condition.release()
    647 
    648     def read(self, length):
    649         """Reads data. Blocks until enough data has arrived via physical
    650         connection.
    651 
    652         Args:
    653             length: length of data to be read.
    654         Raises:
    655             LogicalConnectionClosedException: when closing handshake for this
    656                 logical channel has been received.
    657             ConnectionTerminatedException: when the physical connection has
    658                 closed, or an error is caused on the reader thread.
    659         """
    660 
    661         self._read_condition.acquire()
    662         while (self._read_state == self.STATE_ACTIVE and
    663                len(self._incoming_data) < length):
    664             self._read_condition.wait()
    665 
    666         try:
    667             if self._read_state == self.STATE_GRACEFULLY_CLOSED:
    668                 raise LogicalConnectionClosedException(
    669                     'Logical channel %d has closed.' % self._channel_id)
    670             elif self._read_state == self.STATE_TERMINATED:
    671                 raise ConnectionTerminatedException(
    672                     'Receiving %d byte failed. Logical channel (%d) closed' %
    673                     (length, self._channel_id))
    674 
    675             value = self._incoming_data[:length]
    676             self._incoming_data = self._incoming_data[length:]
    677         finally:
    678             self._read_condition.release()
    679 
    680         return value
    681 
    682     def set_read_state(self, new_state):
    683         """Sets the state of this connection. Called when an event for this
    684         connection has occurred.
    685 
    686         Args:
    687             new_state: state to be set. new_state must be one of followings:
    688             - STATE_GRACEFULLY_CLOSED: when closing handshake for this
    689                 connection has been received.
    690             - STATE_TERMINATED: when the physical connection has closed or
    691                 DropChannel of this connection has received.
    692         """
    693 
    694         self._read_condition.acquire()
    695         self._read_state = new_state
    696         self._read_condition.notify()
    697         self._read_condition.release()
    698 
    699 
    700 class _InnerMessage(object):
    701     """Holds the result of _InnerMessageBuilder.build().
    702     """
    703 
    704     def __init__(self, opcode, payload):
    705         self.opcode = opcode
    706         self.payload = payload
    707 
    708 
    709 class _InnerMessageBuilder(object):
    710     """A class that holds the context of inner message fragmentation and
    711     builds a message from fragmented inner frame(s).
    712     """
    713 
    714     def __init__(self):
    715         self._control_opcode = None
    716         self._pending_control_fragments = []
    717         self._message_opcode = None
    718         self._pending_message_fragments = []
    719         self._frame_handler = self._handle_first
    720 
    721     def _handle_first(self, frame):
    722         if frame.opcode == common.OPCODE_CONTINUATION:
    723             raise InvalidFrameException('Sending invalid continuation opcode')
    724 
    725         if common.is_control_opcode(frame.opcode):
    726             return self._process_first_fragmented_control(frame)
    727         else:
    728             return self._process_first_fragmented_message(frame)
    729 
    730     def _process_first_fragmented_control(self, frame):
    731         self._control_opcode = frame.opcode
    732         self._pending_control_fragments.append(frame.payload)
    733         if not frame.fin:
    734             self._frame_handler = self._handle_fragmented_control
    735             return None
    736         return self._reassemble_fragmented_control()
    737 
    738     def _process_first_fragmented_message(self, frame):
    739         self._message_opcode = frame.opcode
    740         self._pending_message_fragments.append(frame.payload)
    741         if not frame.fin:
    742             self._frame_handler = self._handle_fragmented_message
    743             return None
    744         return self._reassemble_fragmented_message()
    745 
    746     def _handle_fragmented_control(self, frame):
    747         if frame.opcode != common.OPCODE_CONTINUATION:
    748             raise InvalidFrameException(
    749                 'Sending invalid opcode %d while sending fragmented control '
    750                 'message' % frame.opcode)
    751         self._pending_control_fragments.append(frame.payload)
    752         if not frame.fin:
    753             return None
    754         return self._reassemble_fragmented_control()
    755 
    756     def _reassemble_fragmented_control(self):
    757         opcode = self._control_opcode
    758         payload = ''.join(self._pending_control_fragments)
    759         self._control_opcode = None
    760         self._pending_control_fragments = []
    761         if self._message_opcode is not None:
    762             self._frame_handler = self._handle_fragmented_message
    763         else:
    764             self._frame_handler = self._handle_first
    765         return _InnerMessage(opcode, payload)
    766 
    767     def _handle_fragmented_message(self, frame):
    768         # Sender can interleave a control message while sending fragmented
    769         # messages.
    770         if common.is_control_opcode(frame.opcode):
    771             if self._control_opcode is not None:
    772                 raise MuxUnexpectedException(
    773                     'Should not reach here(Bug in builder)')
    774             return self._process_first_fragmented_control(frame)
    775 
    776         if frame.opcode != common.OPCODE_CONTINUATION:
    777             raise InvalidFrameException(
    778                 'Sending invalid opcode %d while sending fragmented message' %
    779                 frame.opcode)
    780         self._pending_message_fragments.append(frame.payload)
    781         if not frame.fin:
    782             return None
    783         return self._reassemble_fragmented_message()
    784 
    785     def _reassemble_fragmented_message(self):
    786         opcode = self._message_opcode
    787         payload = ''.join(self._pending_message_fragments)
    788         self._message_opcode = None
    789         self._pending_message_fragments = []
    790         self._frame_handler = self._handle_first
    791         return _InnerMessage(opcode, payload)
    792 
    793     def build(self, frame):
    794         """Build an inner message. Returns an _InnerMessage instance when
    795         the given frame is the last fragmented frame. Returns None otherwise.
    796 
    797         Args:
    798             frame: an inner frame.
    799         Raises:
    800             InvalidFrameException: when received invalid opcode. (e.g.
    801                 receiving non continuation data opcode but the fin flag of
    802                 the previous inner frame was not set.)
    803         """
    804 
    805         return self._frame_handler(frame)
    806 
    807 
    808 class _LogicalStream(Stream):
    809     """Mimics the Stream class. This class interprets multiplexed WebSocket
    810     frames.
    811     """
    812 
    813     def __init__(self, request, stream_options, send_quota, receive_quota):
    814         """Constructs an instance.
    815 
    816         Args:
    817             request: _LogicalRequest instance.
    818             stream_options: StreamOptions instance.
    819             send_quota: Initial send quota.
    820             receive_quota: Initial receive quota.
    821         """
    822 
    823         # Physical stream is responsible for masking.
    824         stream_options.unmask_receive = False
    825         Stream.__init__(self, request, stream_options)
    826 
    827         self._send_closed = False
    828         self._send_quota = send_quota
    829         # - Protects _send_closed and _send_quota
    830         # - Signals the thread waiting for send quota replenished
    831         self._send_condition = threading.Condition()
    832 
    833         # The opcode of the first frame in messages.
    834         self._message_opcode = common.OPCODE_TEXT
    835         # True when the last message was fragmented.
    836         self._last_message_was_fragmented = False
    837 
    838         self._receive_quota = receive_quota
    839         self._write_inner_frame_semaphore = threading.Semaphore()
    840 
    841         self._inner_message_builder = _InnerMessageBuilder()
    842 
    843     def _create_inner_frame(self, opcode, payload, end=True):
    844         frame = Frame(fin=end, opcode=opcode, payload=payload)
    845         for frame_filter in self._options.outgoing_frame_filters:
    846             frame_filter.filter(frame)
    847 
    848         if len(payload) != len(frame.payload):
    849             raise MuxUnexpectedException(
    850                 'Mux extension must not be used after extensions which change '
    851                 ' frame boundary')
    852 
    853         first_byte = ((frame.fin << 7) | (frame.rsv1 << 6) |
    854                       (frame.rsv2 << 5) | (frame.rsv3 << 4) | frame.opcode)
    855         return chr(first_byte) + frame.payload
    856 
    857     def _write_inner_frame(self, opcode, payload, end=True):
    858         payload_length = len(payload)
    859         write_position = 0
    860 
    861         try:
    862             # An inner frame will be fragmented if there is no enough send
    863             # quota. This semaphore ensures that fragmented inner frames are
    864             # sent in order on the logical channel.
    865             # Note that frames that come from other logical channels or
    866             # multiplexing control blocks can be inserted between fragmented
    867             # inner frames on the physical channel.
    868             self._write_inner_frame_semaphore.acquire()
    869 
    870             # Consume an octet quota when this is the first fragmented frame.
    871             if opcode != common.OPCODE_CONTINUATION:
    872                 try:
    873                     self._send_condition.acquire()
    874                     while (not self._send_closed) and self._send_quota == 0:
    875                         self._send_condition.wait()
    876 
    877                     if self._send_closed:
    878                         raise BadOperationException(
    879                             'Logical connection %d is closed' %
    880                             self._request.channel_id)
    881 
    882                     self._send_quota -= 1
    883                 finally:
    884                     self._send_condition.release()
    885 
    886             while write_position < payload_length:
    887                 try:
    888                     self._send_condition.acquire()
    889                     while (not self._send_closed) and self._send_quota == 0:
    890                         self._logger.debug(
    891                             'No quota. Waiting FlowControl message for %d.' %
    892                             self._request.channel_id)
    893                         self._send_condition.wait()
    894 
    895                     if self._send_closed:
    896                         raise BadOperationException(
    897                             'Logical connection %d is closed' %
    898                             self.request._channel_id)
    899 
    900                     remaining = payload_length - write_position
    901                     write_length = min(self._send_quota, remaining)
    902                     inner_frame_end = (
    903                         end and
    904                         (write_position + write_length == payload_length))
    905 
    906                     inner_frame = self._create_inner_frame(
    907                         opcode,
    908                         payload[write_position:write_position+write_length],
    909                         inner_frame_end)
    910                     self._send_quota -= write_length
    911                     self._logger.debug('Consumed quota=%d, remaining=%d' %
    912                                        (write_length, self._send_quota))
    913                 finally:
    914                     self._send_condition.release()
    915 
    916                 # Writing data will block the worker so we need to release
    917                 # _send_condition before writing.
    918                 self._logger.debug('Sending inner frame: %r' % inner_frame)
    919                 self._request.connection.write(inner_frame)
    920                 write_position += write_length
    921 
    922                 opcode = common.OPCODE_CONTINUATION
    923 
    924         except ValueError, e:
    925             raise BadOperationException(e)
    926         finally:
    927             self._write_inner_frame_semaphore.release()
    928 
    929     def replenish_send_quota(self, send_quota):
    930         """Replenish send quota."""
    931 
    932         try:
    933             self._send_condition.acquire()
    934             if self._send_quota + send_quota > 0x7FFFFFFFFFFFFFFF:
    935                 self._send_quota = 0
    936                 raise LogicalChannelError(
    937                     self._request.channel_id, _DROP_CODE_SEND_QUOTA_OVERFLOW)
    938             self._send_quota += send_quota
    939             self._logger.debug('Replenished send quota for channel id %d: %d' %
    940                                (self._request.channel_id, self._send_quota))
    941         finally:
    942             self._send_condition.notify()
    943             self._send_condition.release()
    944 
    945     def consume_receive_quota(self, amount):
    946         """Consumes receive quota. Returns False on failure."""
    947 
    948         if self._receive_quota < amount:
    949             self._logger.debug('Violate quota on channel id %d: %d < %d' %
    950                                (self._request.channel_id,
    951                                 self._receive_quota, amount))
    952             return False
    953         self._receive_quota -= amount
    954         return True
    955 
    956     def send_message(self, message, end=True, binary=False):
    957         """Override Stream.send_message."""
    958 
    959         if self._request.server_terminated:
    960             raise BadOperationException(
    961                 'Requested send_message after sending out a closing handshake')
    962 
    963         if binary and isinstance(message, unicode):
    964             raise BadOperationException(
    965                 'Message for binary frame must be instance of str')
    966 
    967         if binary:
    968             opcode = common.OPCODE_BINARY
    969         else:
    970             opcode = common.OPCODE_TEXT
    971             message = message.encode('utf-8')
    972 
    973         for message_filter in self._options.outgoing_message_filters:
    974             message = message_filter.filter(message, end, binary)
    975 
    976         if self._last_message_was_fragmented:
    977             if opcode != self._message_opcode:
    978                 raise BadOperationException('Message types are different in '
    979                                             'frames for the same message')
    980             opcode = common.OPCODE_CONTINUATION
    981         else:
    982             self._message_opcode = opcode
    983 
    984         self._write_inner_frame(opcode, message, end)
    985         self._last_message_was_fragmented = not end
    986 
    987     def _receive_frame(self):
    988         """Overrides Stream._receive_frame.
    989 
    990         In addition to call Stream._receive_frame, this method adds the amount
    991         of payload to receiving quota and sends FlowControl to the client.
    992         We need to do it here because Stream.receive_message() handles
    993         control frames internally.
    994         """
    995 
    996         opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self)
    997         amount = len(payload)
    998         # Replenish extra one octet when receiving the first fragmented frame.
    999         if opcode != common.OPCODE_CONTINUATION:
   1000             amount += 1
   1001         self._receive_quota += amount
   1002         frame_data = _create_flow_control(self._request.channel_id,
   1003                                           amount)
   1004         self._logger.debug('Sending flow control for %d, replenished=%d' %
   1005                            (self._request.channel_id, amount))
   1006         self._request.connection.write_control_data(frame_data)
   1007         return opcode, payload, fin, rsv1, rsv2, rsv3
   1008 
   1009     def _get_message_from_frame(self, frame):
   1010         """Overrides Stream._get_message_from_frame.
   1011         """
   1012 
   1013         try:
   1014             inner_message = self._inner_message_builder.build(frame)
   1015         except InvalidFrameException:
   1016             raise LogicalChannelError(
   1017                 self._request.channel_id, _DROP_CODE_BAD_FRAGMENTATION)
   1018 
   1019         if inner_message is None:
   1020             return None
   1021         self._original_opcode = inner_message.opcode
   1022         return inner_message.payload
   1023 
   1024     def receive_message(self):
   1025         """Overrides Stream.receive_message."""
   1026 
   1027         # Just call Stream.receive_message(), but catch
   1028         # LogicalConnectionClosedException, which is raised when the logical
   1029         # connection has closed gracefully.
   1030         try:
   1031             return Stream.receive_message(self)
   1032         except LogicalConnectionClosedException, e:
   1033             self._logger.debug('%s', e)
   1034             return None
   1035 
   1036     def _send_closing_handshake(self, code, reason):
   1037         """Overrides Stream._send_closing_handshake."""
   1038 
   1039         body = create_closing_handshake_body(code, reason)
   1040         self._logger.debug('Sending closing handshake for %d: (%r, %r)' %
   1041                            (self._request.channel_id, code, reason))
   1042         self._write_inner_frame(common.OPCODE_CLOSE, body, end=True)
   1043 
   1044         self._request.server_terminated = True
   1045 
   1046     def send_ping(self, body=''):
   1047         """Overrides Stream.send_ping"""
   1048 
   1049         self._logger.debug('Sending ping on logical channel %d: %r' %
   1050                            (self._request.channel_id, body))
   1051         self._write_inner_frame(common.OPCODE_PING, body, end=True)
   1052 
   1053         self._ping_queue.append(body)
   1054 
   1055     def _send_pong(self, body):
   1056         """Overrides Stream._send_pong"""
   1057 
   1058         self._logger.debug('Sending pong on logical channel %d: %r' %
   1059                            (self._request.channel_id, body))
   1060         self._write_inner_frame(common.OPCODE_PONG, body, end=True)
   1061 
   1062     def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
   1063         """Overrides Stream.close_connection."""
   1064 
   1065         # TODO(bashi): Implement
   1066         self._logger.debug('Closing logical connection %d' %
   1067                            self._request.channel_id)
   1068         self._request.server_terminated = True
   1069 
   1070     def stop_sending(self):
   1071         """Stops accepting new send operation (_write_inner_frame)."""
   1072 
   1073         self._send_condition.acquire()
   1074         self._send_closed = True
   1075         self._send_condition.notify()
   1076         self._send_condition.release()
   1077 
   1078 
   1079 class _OutgoingData(object):
   1080     """A structure that holds data to be sent via physical connection and
   1081     origin of the data.
   1082     """
   1083 
   1084     def __init__(self, channel_id, data):
   1085         self.channel_id = channel_id
   1086         self.data = data
   1087 
   1088 
   1089 class _PhysicalConnectionWriter(threading.Thread):
   1090     """A thread that is responsible for writing data to physical connection.
   1091 
   1092     TODO(bashi): Make sure there is no thread-safety problem when the reader
   1093     thread reads data from the same socket at a time.
   1094     """
   1095 
   1096     def __init__(self, mux_handler):
   1097         """Constructs an instance.
   1098 
   1099         Args:
   1100             mux_handler: _MuxHandler instance.
   1101         """
   1102 
   1103         threading.Thread.__init__(self)
   1104         self._logger = util.get_class_logger(self)
   1105         self._mux_handler = mux_handler
   1106         self.setDaemon(True)
   1107 
   1108         # When set, make this thread stop accepting new data, flush pending
   1109         # data and exit.
   1110         self._stop_requested = False
   1111         # The close code of the physical connection.
   1112         self._close_code = common.STATUS_NORMAL_CLOSURE
   1113         # Deque for passing write data. It's protected by _deque_condition
   1114         # until _stop_requested is set.
   1115         self._deque = collections.deque()
   1116         # - Protects _deque, _stop_requested and _close_code
   1117         # - Signals threads waiting for them to be available
   1118         self._deque_condition = threading.Condition()
   1119 
   1120     def put_outgoing_data(self, data):
   1121         """Puts outgoing data.
   1122 
   1123         Args:
   1124             data: _OutgoingData instance.
   1125 
   1126         Raises:
   1127             BadOperationException: when the thread has been requested to
   1128                 terminate.
   1129         """
   1130 
   1131         try:
   1132             self._deque_condition.acquire()
   1133             if self._stop_requested:
   1134                 raise BadOperationException('Cannot write data anymore')
   1135 
   1136             self._deque.append(data)
   1137             self._deque_condition.notify()
   1138         finally:
   1139             self._deque_condition.release()
   1140 
   1141     def _write_data(self, outgoing_data):
   1142         message = (_encode_channel_id(outgoing_data.channel_id) +
   1143                    outgoing_data.data)
   1144         try:
   1145             self._mux_handler.physical_stream.send_message(
   1146                 message=message, end=True, binary=True)
   1147         except Exception, e:
   1148             util.prepend_message_to_exception(
   1149                 'Failed to send message to %r: ' %
   1150                 (self._mux_handler.physical_connection.remote_addr,), e)
   1151             raise
   1152 
   1153         # TODO(bashi): It would be better to block the thread that sends
   1154         # control data as well.
   1155         if outgoing_data.channel_id != _CONTROL_CHANNEL_ID:
   1156             self._mux_handler.notify_write_data_done(outgoing_data.channel_id)
   1157 
   1158     def run(self):
   1159         try:
   1160             self._deque_condition.acquire()
   1161             while not self._stop_requested:
   1162                 if len(self._deque) == 0:
   1163                     self._deque_condition.wait()
   1164                     continue
   1165 
   1166                 outgoing_data = self._deque.popleft()
   1167 
   1168                 self._deque_condition.release()
   1169                 self._write_data(outgoing_data)
   1170                 self._deque_condition.acquire()
   1171 
   1172             # Flush deque.
   1173             #
   1174             # At this point, self._deque_condition is always acquired.
   1175             try:
   1176                 while len(self._deque) > 0:
   1177                     outgoing_data = self._deque.popleft()
   1178                     self._write_data(outgoing_data)
   1179             finally:
   1180                 self._deque_condition.release()
   1181 
   1182             # Close physical connection.
   1183             try:
   1184                 # Don't wait the response here. The response will be read
   1185                 # by the reader thread.
   1186                 self._mux_handler.physical_stream.close_connection(
   1187                     self._close_code, wait_response=False)
   1188             except Exception, e:
   1189                 util.prepend_message_to_exception(
   1190                     'Failed to close the physical connection: %r' % e)
   1191                 raise
   1192         finally:
   1193             self._mux_handler.notify_writer_done()
   1194 
   1195     def stop(self, close_code=common.STATUS_NORMAL_CLOSURE):
   1196         """Stops the writer thread."""
   1197 
   1198         self._deque_condition.acquire()
   1199         self._stop_requested = True
   1200         self._close_code = close_code
   1201         self._deque_condition.notify()
   1202         self._deque_condition.release()
   1203 
   1204 
   1205 class _PhysicalConnectionReader(threading.Thread):
   1206     """A thread that is responsible for reading data from physical connection.
   1207     """
   1208 
   1209     def __init__(self, mux_handler):
   1210         """Constructs an instance.
   1211 
   1212         Args:
   1213             mux_handler: _MuxHandler instance.
   1214         """
   1215 
   1216         threading.Thread.__init__(self)
   1217         self._logger = util.get_class_logger(self)
   1218         self._mux_handler = mux_handler
   1219         self.setDaemon(True)
   1220 
   1221     def run(self):
   1222         while True:
   1223             try:
   1224                 physical_stream = self._mux_handler.physical_stream
   1225                 message = physical_stream.receive_message()
   1226                 if message is None:
   1227                     break
   1228                 # Below happens only when a data message is received.
   1229                 opcode = physical_stream.get_last_received_opcode()
   1230                 if opcode != common.OPCODE_BINARY:
   1231                     self._mux_handler.fail_physical_connection(
   1232                         _DROP_CODE_INVALID_ENCAPSULATING_MESSAGE,
   1233                         'Received a text message on physical connection')
   1234                     break
   1235 
   1236             except ConnectionTerminatedException, e:
   1237                 self._logger.debug('%s', e)
   1238                 break
   1239 
   1240             try:
   1241                 self._mux_handler.dispatch_message(message)
   1242             except PhysicalConnectionError, e:
   1243                 self._mux_handler.fail_physical_connection(
   1244                     e.drop_code, e.message)
   1245                 break
   1246             except LogicalChannelError, e:
   1247                 self._mux_handler.fail_logical_channel(
   1248                     e.channel_id, e.drop_code, e.message)
   1249             except Exception, e:
   1250                 self._logger.debug(traceback.format_exc())
   1251                 break
   1252 
   1253         self._mux_handler.notify_reader_done()
   1254 
   1255 
   1256 class _Worker(threading.Thread):
   1257     """A thread that is responsible for running the corresponding application
   1258     handler.
   1259     """
   1260 
   1261     def __init__(self, mux_handler, request):
   1262         """Constructs an instance.
   1263 
   1264         Args:
   1265             mux_handler: _MuxHandler instance.
   1266             request: _LogicalRequest instance.
   1267         """
   1268 
   1269         threading.Thread.__init__(self)
   1270         self._logger = util.get_class_logger(self)
   1271         self._mux_handler = mux_handler
   1272         self._request = request
   1273         self.setDaemon(True)
   1274 
   1275     def run(self):
   1276         self._logger.debug('Logical channel worker started. (id=%d)' %
   1277                            self._request.channel_id)
   1278         try:
   1279             # Non-critical exceptions will be handled by dispatcher.
   1280             self._mux_handler.dispatcher.transfer_data(self._request)
   1281         except LogicalChannelError, e:
   1282             self._mux_handler.fail_logical_channel(
   1283                 e.channel_id, e.drop_code, e.message)
   1284         finally:
   1285             self._mux_handler.notify_worker_done(self._request.channel_id)
   1286 
   1287 
   1288 class _MuxHandshaker(hybi.Handshaker):
   1289     """Opening handshake processor for multiplexing."""
   1290 
   1291     _DUMMY_WEBSOCKET_KEY = 'dGhlIHNhbXBsZSBub25jZQ=='
   1292 
   1293     def __init__(self, request, dispatcher, send_quota, receive_quota):
   1294         """Constructs an instance.
   1295         Args:
   1296             request: _LogicalRequest instance.
   1297             dispatcher: Dispatcher instance (dispatch.Dispatcher).
   1298             send_quota: Initial send quota.
   1299             receive_quota: Initial receive quota.
   1300         """
   1301 
   1302         hybi.Handshaker.__init__(self, request, dispatcher)
   1303         self._send_quota = send_quota
   1304         self._receive_quota = receive_quota
   1305 
   1306         # Append headers which should not be included in handshake field of
   1307         # AddChannelRequest.
   1308         # TODO(bashi): Make sure whether we should raise exception when
   1309         #     these headers are included already.
   1310         request.headers_in[common.UPGRADE_HEADER] = (
   1311             common.WEBSOCKET_UPGRADE_TYPE)
   1312         request.headers_in[common.SEC_WEBSOCKET_VERSION_HEADER] = (
   1313             str(common.VERSION_HYBI_LATEST))
   1314         request.headers_in[common.SEC_WEBSOCKET_KEY_HEADER] = (
   1315             self._DUMMY_WEBSOCKET_KEY)
   1316 
   1317     def _create_stream(self, stream_options):
   1318         """Override hybi.Handshaker._create_stream."""
   1319 
   1320         self._logger.debug('Creating logical stream for %d' %
   1321                            self._request.channel_id)
   1322         return _LogicalStream(
   1323             self._request, stream_options, self._send_quota,
   1324             self._receive_quota)
   1325 
   1326     def _create_handshake_response(self, accept):
   1327         """Override hybi._create_handshake_response."""
   1328 
   1329         response = []
   1330 
   1331         response.append('HTTP/1.1 101 Switching Protocols\r\n')
   1332 
   1333         # Upgrade and Sec-WebSocket-Accept should be excluded.
   1334         response.append('%s: %s\r\n' % (
   1335             common.CONNECTION_HEADER, common.UPGRADE_CONNECTION_TYPE))
   1336         if self._request.ws_protocol is not None:
   1337             response.append('%s: %s\r\n' % (
   1338                 common.SEC_WEBSOCKET_PROTOCOL_HEADER,
   1339                 self._request.ws_protocol))
   1340         if (self._request.ws_extensions is not None and
   1341             len(self._request.ws_extensions) != 0):
   1342             response.append('%s: %s\r\n' % (
   1343                 common.SEC_WEBSOCKET_EXTENSIONS_HEADER,
   1344                 common.format_extensions(self._request.ws_extensions)))
   1345         response.append('\r\n')
   1346 
   1347         return ''.join(response)
   1348 
   1349     def _send_handshake(self, accept):
   1350         """Override hybi.Handshaker._send_handshake."""
   1351 
   1352         # Don't send handshake response for the default channel
   1353         if self._request.channel_id == _DEFAULT_CHANNEL_ID:
   1354             return
   1355 
   1356         handshake_response = self._create_handshake_response(accept)
   1357         frame_data = _create_add_channel_response(
   1358                          self._request.channel_id,
   1359                          handshake_response)
   1360         self._logger.debug('Sending handshake response for %d: %r' %
   1361                            (self._request.channel_id, frame_data))
   1362         self._request.connection.write_control_data(frame_data)
   1363 
   1364 
   1365 class _LogicalChannelData(object):
   1366     """A structure that holds information about logical channel.
   1367     """
   1368 
   1369     def __init__(self, request, worker):
   1370         self.request = request
   1371         self.worker = worker
   1372         self.drop_code = _DROP_CODE_NORMAL_CLOSURE
   1373         self.drop_message = ''
   1374 
   1375 
   1376 class _HandshakeDeltaBase(object):
   1377     """A class that holds information for delta-encoded handshake."""
   1378 
   1379     def __init__(self, headers):
   1380         self._headers = headers
   1381 
   1382     def create_headers(self, delta=None):
   1383         """Creates request headers for an AddChannelRequest that has
   1384         delta-encoded handshake.
   1385 
   1386         Args:
   1387             delta: headers should be overridden.
   1388         """
   1389 
   1390         headers = copy.copy(self._headers)
   1391         if delta:
   1392             for key, value in delta.items():
   1393                 # The spec requires that a header with an empty value is
   1394                 # removed from the delta base.
   1395                 if len(value) == 0 and headers.has_key(key):
   1396                     del headers[key]
   1397                 else:
   1398                     headers[key] = value
   1399         return headers
   1400 
   1401 
   1402 class _MuxHandler(object):
   1403     """Multiplexing handler. When a handler starts, it launches three
   1404     threads; the reader thread, the writer thread, and a worker thread.
   1405 
   1406     The reader thread reads data from the physical stream, i.e., the
   1407     ws_stream object of the underlying websocket connection. The reader
   1408     thread interprets multiplexed frames and dispatches them to logical
   1409     channels. Methods of this class are mostly called by the reader thread.
   1410 
   1411     The writer thread sends multiplexed frames which are created by
   1412     logical channels via the physical connection.
   1413 
   1414     The worker thread launched at the starting point handles the
   1415     "Implicitly Opened Connection". If multiplexing handler receives
   1416     an AddChannelRequest and accepts it, the handler will launch a new worker
   1417     thread and dispatch the request to it.
   1418     """
   1419 
   1420     def __init__(self, request, dispatcher):
   1421         """Constructs an instance.
   1422 
   1423         Args:
   1424             request: mod_python request of the physical connection.
   1425             dispatcher: Dispatcher instance (dispatch.Dispatcher).
   1426         """
   1427 
   1428         self.original_request = request
   1429         self.dispatcher = dispatcher
   1430         self.physical_connection = request.connection
   1431         self.physical_stream = request.ws_stream
   1432         self._logger = util.get_class_logger(self)
   1433         self._logical_channels = {}
   1434         self._logical_channels_condition = threading.Condition()
   1435         # Holds client's initial quota
   1436         self._channel_slots = collections.deque()
   1437         self._handshake_base = None
   1438         self._worker_done_notify_received = False
   1439         self._reader = None
   1440         self._writer = None
   1441 
   1442     def start(self):
   1443         """Starts the handler.
   1444 
   1445         Raises:
   1446             MuxUnexpectedException: when the handler already started, or when
   1447                 opening handshake of the default channel fails.
   1448         """
   1449 
   1450         if self._reader or self._writer:
   1451             raise MuxUnexpectedException('MuxHandler already started')
   1452 
   1453         self._reader = _PhysicalConnectionReader(self)
   1454         self._writer = _PhysicalConnectionWriter(self)
   1455         self._reader.start()
   1456         self._writer.start()
   1457 
   1458         # Create "Implicitly Opened Connection".
   1459         logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID)
   1460         headers = copy.copy(self.original_request.headers_in)
   1461         # Add extensions for logical channel.
   1462         headers[common.SEC_WEBSOCKET_EXTENSIONS_HEADER] = (
   1463             common.format_extensions(
   1464                 self.original_request.mux_processor.extensions()))
   1465         self._handshake_base = _HandshakeDeltaBase(headers)
   1466         logical_request = _LogicalRequest(
   1467             _DEFAULT_CHANNEL_ID,
   1468             self.original_request.method,
   1469             self.original_request.uri,
   1470             self.original_request.protocol,
   1471             self._handshake_base.create_headers(),
   1472             logical_connection)
   1473         # Client's send quota for the implicitly opened connection is zero,
   1474         # but we will send FlowControl later so set the initial quota to
   1475         # _INITIAL_QUOTA_FOR_CLIENT.
   1476         self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT)
   1477         send_quota = self.original_request.mux_processor.quota()
   1478         if not self._do_handshake_for_logical_request(
   1479             logical_request, send_quota=send_quota):
   1480             raise MuxUnexpectedException(
   1481                 'Failed handshake on the default channel id')
   1482         self._add_logical_channel(logical_request)
   1483 
   1484         # Send FlowControl for the implicitly opened connection.
   1485         frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID,
   1486                                           _INITIAL_QUOTA_FOR_CLIENT)
   1487         logical_request.connection.write_control_data(frame_data)
   1488 
   1489     def add_channel_slots(self, slots, send_quota):
   1490         """Adds channel slots.
   1491 
   1492         Args:
   1493             slots: number of slots to be added.
   1494             send_quota: initial send quota for slots.
   1495         """
   1496 
   1497         self._channel_slots.extend([send_quota] * slots)
   1498         # Send NewChannelSlot to client.
   1499         frame_data = _create_new_channel_slot(slots, send_quota)
   1500         self.send_control_data(frame_data)
   1501 
   1502     def wait_until_done(self, timeout=None):
   1503         """Waits until all workers are done. Returns False when timeout has
   1504         occurred. Returns True on success.
   1505 
   1506         Args:
   1507             timeout: timeout in sec.
   1508         """
   1509 
   1510         self._logical_channels_condition.acquire()
   1511         try:
   1512             while len(self._logical_channels) > 0:
   1513                 self._logger.debug('Waiting workers(%d)...' %
   1514                                    len(self._logical_channels))
   1515                 self._worker_done_notify_received = False
   1516                 self._logical_channels_condition.wait(timeout)
   1517                 if not self._worker_done_notify_received:
   1518                     self._logger.debug('Waiting worker(s) timed out')
   1519                     return False
   1520         finally:
   1521             self._logical_channels_condition.release()
   1522 
   1523         # Flush pending outgoing data
   1524         self._writer.stop()
   1525         self._writer.join()
   1526 
   1527         return True
   1528 
   1529     def notify_write_data_done(self, channel_id):
   1530         """Called by the writer thread when a write operation has done.
   1531 
   1532         Args:
   1533             channel_id: objective channel id.
   1534         """
   1535 
   1536         try:
   1537             self._logical_channels_condition.acquire()
   1538             if channel_id in self._logical_channels:
   1539                 channel_data = self._logical_channels[channel_id]
   1540                 channel_data.request.connection.on_write_data_done()
   1541             else:
   1542                 self._logger.debug('Seems that logical channel for %d has gone'
   1543                                    % channel_id)
   1544         finally:
   1545             self._logical_channels_condition.release()
   1546 
   1547     def send_control_data(self, data):
   1548         """Sends data via the control channel.
   1549 
   1550         Args:
   1551             data: data to be sent.
   1552         """
   1553 
   1554         self._writer.put_outgoing_data(_OutgoingData(
   1555                 channel_id=_CONTROL_CHANNEL_ID, data=data))
   1556 
   1557     def send_data(self, channel_id, data):
   1558         """Sends data via given logical channel. This method is called by
   1559         worker threads.
   1560 
   1561         Args:
   1562             channel_id: objective channel id.
   1563             data: data to be sent.
   1564         """
   1565 
   1566         self._writer.put_outgoing_data(_OutgoingData(
   1567                 channel_id=channel_id, data=data))
   1568 
   1569     def _send_drop_channel(self, channel_id, code=None, message=''):
   1570         frame_data = _create_drop_channel(channel_id, code, message)
   1571         self._logger.debug(
   1572             'Sending drop channel for channel id %d' % channel_id)
   1573         self.send_control_data(frame_data)
   1574 
   1575     def _send_error_add_channel_response(self, channel_id, status=None):
   1576         if status is None:
   1577             status = common.HTTP_STATUS_BAD_REQUEST
   1578 
   1579         if status in _HTTP_BAD_RESPONSE_MESSAGES:
   1580             message = _HTTP_BAD_RESPONSE_MESSAGES[status]
   1581         else:
   1582             self._logger.debug('Response message for %d is not found' % status)
   1583             message = '???'
   1584 
   1585         response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message)
   1586         frame_data = _create_add_channel_response(channel_id,
   1587                                                   encoded_handshake=response,
   1588                                                   encoding=0, rejected=True)
   1589         self.send_control_data(frame_data)
   1590 
   1591     def _create_logical_request(self, block):
   1592         if block.channel_id == _CONTROL_CHANNEL_ID:
   1593             # TODO(bashi): Raise PhysicalConnectionError with code 2006
   1594             # instead of MuxUnexpectedException.
   1595             raise MuxUnexpectedException(
   1596                 'Received the control channel id (0) as objective channel '
   1597                 'id for AddChannel')
   1598 
   1599         if block.encoding > _HANDSHAKE_ENCODING_DELTA:
   1600             raise PhysicalConnectionError(
   1601                 _DROP_CODE_UNKNOWN_REQUEST_ENCODING)
   1602 
   1603         method, path, version, headers = _parse_request_text(
   1604             block.encoded_handshake)
   1605         if block.encoding == _HANDSHAKE_ENCODING_DELTA:
   1606             headers = self._handshake_base.create_headers(headers)
   1607 
   1608         connection = _LogicalConnection(self, block.channel_id)
   1609         request = _LogicalRequest(block.channel_id, method, path, version,
   1610                                   headers, connection)
   1611         return request
   1612 
   1613     def _do_handshake_for_logical_request(self, request, send_quota=0):
   1614         try:
   1615             receive_quota = self._channel_slots.popleft()
   1616         except IndexError:
   1617             raise LogicalChannelError(
   1618                 request.channel_id, _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION)
   1619 
   1620         handshaker = _MuxHandshaker(request, self.dispatcher,
   1621                                     send_quota, receive_quota)
   1622         try:
   1623             handshaker.do_handshake()
   1624         except handshake.VersionException, e:
   1625             self._logger.info('%s', e)
   1626             self._send_error_add_channel_response(
   1627                 request.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
   1628             return False
   1629         except handshake.HandshakeException, e:
   1630             # TODO(bashi): Should we _Fail the Logical Channel_ with 3001
   1631             # instead?
   1632             self._logger.info('%s', e)
   1633             self._send_error_add_channel_response(request.channel_id,
   1634                                                   status=e.status)
   1635             return False
   1636         except handshake.AbortedByUserException, e:
   1637             self._logger.info('%s', e)
   1638             self._send_error_add_channel_response(request.channel_id)
   1639             return False
   1640 
   1641         return True
   1642 
   1643     def _add_logical_channel(self, logical_request):
   1644         try:
   1645             self._logical_channels_condition.acquire()
   1646             if logical_request.channel_id in self._logical_channels:
   1647                 self._logger.debug('Channel id %d already exists' %
   1648                                    logical_request.channel_id)
   1649                 raise PhysicalConnectionError(
   1650                     _DROP_CODE_CHANNEL_ALREADY_EXISTS,
   1651                     'Channel id %d already exists' %
   1652                     logical_request.channel_id)
   1653             worker = _Worker(self, logical_request)
   1654             channel_data = _LogicalChannelData(logical_request, worker)
   1655             self._logical_channels[logical_request.channel_id] = channel_data
   1656             worker.start()
   1657         finally:
   1658             self._logical_channels_condition.release()
   1659 
   1660     def _process_add_channel_request(self, block):
   1661         try:
   1662             logical_request = self._create_logical_request(block)
   1663         except ValueError, e:
   1664             self._logger.debug('Failed to create logical request: %r' % e)
   1665             self._send_error_add_channel_response(
   1666                 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
   1667             return
   1668         if self._do_handshake_for_logical_request(logical_request):
   1669             if block.encoding == _HANDSHAKE_ENCODING_IDENTITY:
   1670                 # Update handshake base.
   1671                 # TODO(bashi): Make sure this is the right place to update
   1672                 # handshake base.
   1673                 self._handshake_base = _HandshakeDeltaBase(
   1674                     logical_request.headers_in)
   1675             self._add_logical_channel(logical_request)
   1676         else:
   1677             self._send_error_add_channel_response(
   1678                 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
   1679 
   1680     def _process_flow_control(self, block):
   1681         try:
   1682             self._logical_channels_condition.acquire()
   1683             if not block.channel_id in self._logical_channels:
   1684                 return
   1685             channel_data = self._logical_channels[block.channel_id]
   1686             channel_data.request.ws_stream.replenish_send_quota(
   1687                 block.send_quota)
   1688         finally:
   1689             self._logical_channels_condition.release()
   1690 
   1691     def _process_drop_channel(self, block):
   1692         self._logger.debug(
   1693             'DropChannel received for %d: code=%r, reason=%r' %
   1694             (block.channel_id, block.drop_code, block.drop_message))
   1695         try:
   1696             self._logical_channels_condition.acquire()
   1697             if not block.channel_id in self._logical_channels:
   1698                 return
   1699             channel_data = self._logical_channels[block.channel_id]
   1700             channel_data.drop_code = _DROP_CODE_ACKNOWLEDGED
   1701 
   1702             # Close the logical channel
   1703             channel_data.request.connection.set_read_state(
   1704                 _LogicalConnection.STATE_TERMINATED)
   1705             channel_data.request.ws_stream.stop_sending()
   1706         finally:
   1707             self._logical_channels_condition.release()
   1708 
   1709     def _process_control_blocks(self, parser):
   1710         for control_block in parser.read_control_blocks():
   1711             opcode = control_block.opcode
   1712             self._logger.debug('control block received, opcode: %d' % opcode)
   1713             if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
   1714                 self._process_add_channel_request(control_block)
   1715             elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
   1716                 raise PhysicalConnectionError(
   1717                     _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
   1718                     'Received AddChannelResponse')
   1719             elif opcode == _MUX_OPCODE_FLOW_CONTROL:
   1720                 self._process_flow_control(control_block)
   1721             elif opcode == _MUX_OPCODE_DROP_CHANNEL:
   1722                 self._process_drop_channel(control_block)
   1723             elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
   1724                 raise PhysicalConnectionError(
   1725                     _DROP_CODE_INVALID_MUX_CONTROL_BLOCK,
   1726                     'Received NewChannelSlot')
   1727             else:
   1728                 raise MuxUnexpectedException(
   1729                     'Unexpected opcode %r' % opcode)
   1730 
   1731     def _process_logical_frame(self, channel_id, parser):
   1732         self._logger.debug('Received a frame. channel id=%d' % channel_id)
   1733         try:
   1734             self._logical_channels_condition.acquire()
   1735             if not channel_id in self._logical_channels:
   1736                 # We must ignore the message for an inactive channel.
   1737                 return
   1738             channel_data = self._logical_channels[channel_id]
   1739             fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame()
   1740             consuming_byte = len(payload)
   1741             if opcode != common.OPCODE_CONTINUATION:
   1742                 consuming_byte += 1
   1743             if not channel_data.request.ws_stream.consume_receive_quota(
   1744                 consuming_byte):
   1745                 # The client violates quota. Close logical channel.
   1746                 raise LogicalChannelError(
   1747                     channel_id, _DROP_CODE_SEND_QUOTA_VIOLATION)
   1748             header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3,
   1749                                    mask=False)
   1750             frame_data = header + payload
   1751             channel_data.request.connection.append_frame_data(frame_data)
   1752         finally:
   1753             self._logical_channels_condition.release()
   1754 
   1755     def dispatch_message(self, message):
   1756         """Dispatches message. The reader thread calls this method.
   1757 
   1758         Args:
   1759             message: a message that contains encapsulated frame.
   1760         Raises:
   1761             PhysicalConnectionError: if the message contains physical
   1762                 connection level errors.
   1763             LogicalChannelError: if the message contains logical channel
   1764                 level errors.
   1765         """
   1766 
   1767         parser = _MuxFramePayloadParser(message)
   1768         try:
   1769             channel_id = parser.read_channel_id()
   1770         except ValueError, e:
   1771             raise PhysicalConnectionError(_DROP_CODE_CHANNEL_ID_TRUNCATED)
   1772         if channel_id == _CONTROL_CHANNEL_ID:
   1773             self._process_control_blocks(parser)
   1774         else:
   1775             self._process_logical_frame(channel_id, parser)
   1776 
   1777     def notify_worker_done(self, channel_id):
   1778         """Called when a worker has finished.
   1779 
   1780         Args:
   1781             channel_id: channel id corresponded with the worker.
   1782         """
   1783 
   1784         self._logger.debug('Worker for channel id %d terminated' % channel_id)
   1785         try:
   1786             self._logical_channels_condition.acquire()
   1787             if not channel_id in self._logical_channels:
   1788                 raise MuxUnexpectedException(
   1789                     'Channel id %d not found' % channel_id)
   1790             channel_data = self._logical_channels.pop(channel_id)
   1791         finally:
   1792             self._worker_done_notify_received = True
   1793             self._logical_channels_condition.notify()
   1794             self._logical_channels_condition.release()
   1795 
   1796         if not channel_data.request.server_terminated:
   1797             self._send_drop_channel(
   1798                 channel_id, code=channel_data.drop_code,
   1799                 message=channel_data.drop_message)
   1800 
   1801     def notify_reader_done(self):
   1802         """This method is called by the reader thread when the reader has
   1803         finished.
   1804         """
   1805 
   1806         self._logger.debug(
   1807             'Termiating all logical connections waiting for incoming data '
   1808             '...')
   1809         self._logical_channels_condition.acquire()
   1810         for channel_data in self._logical_channels.values():
   1811             try:
   1812                 channel_data.request.connection.set_read_state(
   1813                     _LogicalConnection.STATE_TERMINATED)
   1814             except Exception:
   1815                 self._logger.debug(traceback.format_exc())
   1816         self._logical_channels_condition.release()
   1817 
   1818     def notify_writer_done(self):
   1819         """This method is called by the writer thread when the writer has
   1820         finished.
   1821         """
   1822 
   1823         self._logger.debug(
   1824             'Termiating all logical connections waiting for write '
   1825             'completion ...')
   1826         self._logical_channels_condition.acquire()
   1827         for channel_data in self._logical_channels.values():
   1828             try:
   1829                 channel_data.request.connection.on_writer_done()
   1830             except Exception:
   1831                 self._logger.debug(traceback.format_exc())
   1832         self._logical_channels_condition.release()
   1833 
   1834     def fail_physical_connection(self, code, message):
   1835         """Fail the physical connection.
   1836 
   1837         Args:
   1838             code: drop reason code.
   1839             message: drop message.
   1840         """
   1841 
   1842         self._logger.debug('Failing the physical connection...')
   1843         self._send_drop_channel(_CONTROL_CHANNEL_ID, code, message)
   1844         self._writer.stop(common.STATUS_INTERNAL_ENDPOINT_ERROR)
   1845 
   1846     def fail_logical_channel(self, channel_id, code, message):
   1847         """Fail a logical channel.
   1848 
   1849         Args:
   1850             channel_id: channel id.
   1851             code: drop reason code.
   1852             message: drop message.
   1853         """
   1854 
   1855         self._logger.debug('Failing logical channel %d...' % channel_id)
   1856         try:
   1857             self._logical_channels_condition.acquire()
   1858             if channel_id in self._logical_channels:
   1859                 channel_data = self._logical_channels[channel_id]
   1860                 # Close the logical channel. notify_worker_done() will be
   1861                 # called later and it will send DropChannel.
   1862                 channel_data.drop_code = code
   1863                 channel_data.drop_message = message
   1864 
   1865                 channel_data.request.connection.set_read_state(
   1866                     _LogicalConnection.STATE_TERMINATED)
   1867                 channel_data.request.ws_stream.stop_sending()
   1868             else:
   1869                 self._send_drop_channel(channel_id, code, message)
   1870         finally:
   1871             self._logical_channels_condition.release()
   1872 
   1873 
   1874 def use_mux(request):
   1875     return hasattr(request, 'mux_processor') and (
   1876         request.mux_processor.is_active())
   1877 
   1878 
   1879 def start(request, dispatcher):
   1880     mux_handler = _MuxHandler(request, dispatcher)
   1881     mux_handler.start()
   1882 
   1883     mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS,
   1884                                   _INITIAL_QUOTA_FOR_CLIENT)
   1885 
   1886     mux_handler.wait_until_done()
   1887 
   1888 
   1889 # vi:sts=4 sw=4 et
   1890