Home | History | Annotate | Download | only in mod_pywebsocket
      1 # Copyright 2012, Google Inc.
      2 # All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #     * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #     * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #     * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 
     31 """This file provides classes and helper functions for multiplexing extension.
     32 
     33 Specification:
     34 http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-03
     35 """
     36 
     37 
     38 import collections
     39 import copy
     40 import email
     41 import email.parser
     42 import logging
     43 import math
     44 import struct
     45 import threading
     46 import traceback
     47 
     48 from mod_pywebsocket import common
     49 from mod_pywebsocket import handshake
     50 from mod_pywebsocket import util
     51 from mod_pywebsocket._stream_base import BadOperationException
     52 from mod_pywebsocket._stream_base import ConnectionTerminatedException
     53 from mod_pywebsocket._stream_hybi import Frame
     54 from mod_pywebsocket._stream_hybi import Stream
     55 from mod_pywebsocket._stream_hybi import StreamOptions
     56 from mod_pywebsocket._stream_hybi import create_binary_frame
     57 from mod_pywebsocket._stream_hybi import create_closing_handshake_body
     58 from mod_pywebsocket._stream_hybi import create_header
     59 from mod_pywebsocket._stream_hybi import parse_frame
     60 from mod_pywebsocket.handshake import hybi
     61 
     62 
     63 _CONTROL_CHANNEL_ID = 0
     64 _DEFAULT_CHANNEL_ID = 1
     65 
     66 _MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
     67 _MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
     68 _MUX_OPCODE_FLOW_CONTROL = 2
     69 _MUX_OPCODE_DROP_CHANNEL = 3
     70 _MUX_OPCODE_NEW_CHANNEL_SLOT = 4
     71 
     72 _MAX_CHANNEL_ID = 2 ** 29 - 1
     73 
     74 _INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64
     75 _INITIAL_QUOTA_FOR_CLIENT = 8 * 1024
     76 
     77 # We need only these status code for now.
     78 _HTTP_BAD_RESPONSE_MESSAGES = {
     79     common.HTTP_STATUS_BAD_REQUEST: 'Bad Request',
     80 }
     81 
     82 
     83 class MuxUnexpectedException(Exception):
     84     """Exception in handling multiplexing extension."""
     85     pass
     86 
     87 
     88 # Temporary
     89 class MuxNotImplementedException(Exception):
     90     """Raised when a flow enters unimplemented code path."""
     91     pass
     92 
     93 
     94 class InvalidMuxFrameException(Exception):
     95     """Raised when an invalid multiplexed frame received."""
     96     pass
     97 
     98 
     99 class InvalidMuxControlBlockException(Exception):
    100     """Raised when an invalid multiplexing control block received."""
    101     pass
    102 
    103 
    104 class LogicalConnectionClosedException(Exception):
    105     """Raised when logical connection is gracefully closed."""
    106     pass
    107 
    108 
    109 def _encode_channel_id(channel_id):
    110     if channel_id < 0:
    111         raise ValueError('Channel id %d must not be negative' % channel_id)
    112 
    113     if channel_id < 2 ** 7:
    114         return chr(channel_id)
    115     if channel_id < 2 ** 14:
    116         return struct.pack('!H', 0x8000 + channel_id)
    117     if channel_id < 2 ** 21:
    118         first = chr(0xc0 + (channel_id >> 16))
    119         return first + struct.pack('!H', channel_id & 0xffff)
    120     if channel_id < 2 ** 29:
    121         return struct.pack('!L', 0xe0000000 + channel_id)
    122 
    123     raise ValueError('Channel id %d is too large' % channel_id)
    124 
    125 
    126 def _size_of_number_in_bytes_minus_1(number):
    127     # Calculate the minimum number of bytes minus 1 that are required to store
    128     # the data.
    129     if number < 0:
    130         raise ValueError('Invalid number: %d' % number)
    131     elif number < 2 ** 8:
    132         return 0
    133     elif number < 2 ** 16:
    134         return 1
    135     elif number < 2 ** 24:
    136         return 2
    137     elif number < 2 ** 32:
    138         return 3
    139     else:
    140         raise ValueError('Invalid number %d' % number)
    141 
    142 
    143 def _encode_number(number):
    144     if number < 2 ** 8:
    145         return chr(number)
    146     elif number < 2 ** 16:
    147         return struct.pack('!H', number)
    148     elif number < 2 ** 24:
    149         return chr(number >> 16) + struct.pack('!H', number & 0xffff)
    150     else:
    151         return struct.pack('!L', number)
    152 
    153 
    154 def _create_control_block_length_value(channel_id, opcode, flags, value):
    155     """Creates a control block that consists of objective channel id, opcode,
    156     flags, encoded length of opcode specific value, and the value.
    157     Most of control blocks have this structure.
    158 
    159     Args:
    160         channel_id: objective channel id.
    161         opcode: opcode of the control block.
    162         flags: 3bit opcode specific flags.
    163         value: opcode specific data.
    164     """
    165 
    166     if channel_id < 0 or channel_id > _MAX_CHANNEL_ID:
    167         raise ValueError('Invalid channel id: %d' % channel_id)
    168     if (opcode != _MUX_OPCODE_ADD_CHANNEL_REQUEST and
    169         opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE and
    170         opcode != _MUX_OPCODE_DROP_CHANNEL):
    171         raise ValueError('Invalid opcode: %d' % opcode)
    172     if flags < 0 or flags > 7:
    173         raise ValueError('Invalid flags: %x' % flags)
    174     length = len(value)
    175     if length < 0 or length > 2 ** 32 - 1:
    176         raise ValueError('Invalid length: %d' % length)
    177 
    178     # The first byte consists of opcode, opcode specific flags, and size of
    179     # the size of value in bytes minus 1.
    180     bytes_of_length = _size_of_number_in_bytes_minus_1(length)
    181     first_byte = (opcode << 5) | (flags << 2) | bytes_of_length
    182 
    183     encoded_length = _encode_number(length)
    184 
    185     return (chr(first_byte) + _encode_channel_id(channel_id) +
    186             encoded_length + value)
    187 
    188 
    189 def _create_add_channel_response(channel_id, encoded_handshake,
    190                                  encoding=0, rejected=False,
    191                                  outer_frame_mask=False):
    192     if encoding != 0 and encoding != 1:
    193         raise ValueError('Invalid encoding %d' % encoding)
    194 
    195     flags = (rejected << 2) | encoding
    196     block = _create_control_block_length_value(
    197         channel_id, _MUX_OPCODE_ADD_CHANNEL_RESPONSE, flags, encoded_handshake)
    198     payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block
    199     return create_binary_frame(payload, mask=outer_frame_mask)
    200 
    201 
    202 def _create_drop_channel(channel_id, reason='', mux_error=False,
    203                          outer_frame_mask=False):
    204     if not mux_error and len(reason) > 0:
    205         raise ValueError('Reason must be empty if mux_error is False')
    206 
    207     flags = mux_error << 2
    208     block = _create_control_block_length_value(
    209         channel_id, _MUX_OPCODE_DROP_CHANNEL, flags, reason)
    210     payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + block
    211     return create_binary_frame(payload, mask=outer_frame_mask)
    212 
    213 
    214 def _create_flow_control(channel_id, replenished_quota,
    215                          outer_frame_mask=False):
    216     if replenished_quota < 0 or replenished_quota >= 2 ** 32:
    217         raise ValueError('Invalid quota: %d' % replenished_quota)
    218     first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) |
    219                   _size_of_number_in_bytes_minus_1(replenished_quota))
    220     payload = (_encode_channel_id(_CONTROL_CHANNEL_ID) + chr(first_byte) +
    221                _encode_channel_id(channel_id) +
    222                _encode_number(replenished_quota))
    223     return create_binary_frame(payload, mask=outer_frame_mask)
    224 
    225 
    226 def _create_new_channel_slot(slots, send_quota, outer_frame_mask=False):
    227     if slots < 0 or slots >= 2 ** 32:
    228         raise ValueError('Invalid number of slots: %d' % slots)
    229     if send_quota < 0 or send_quota >= 2 ** 32:
    230         raise ValueError('Invalid send quota: %d' % send_quota)
    231     slots_size = _size_of_number_in_bytes_minus_1(slots)
    232     send_quota_size = _size_of_number_in_bytes_minus_1(send_quota)
    233 
    234     first_byte = ((_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) |
    235                   (slots_size << 2) | send_quota_size)
    236     payload = (_encode_channel_id(_CONTROL_CHANNEL_ID) + chr(first_byte) +
    237                _encode_number(slots) + _encode_number(send_quota))
    238     return create_binary_frame(payload, mask=outer_frame_mask)
    239 
    240 
    241 def _parse_request_text(request_text):
    242     request_line, header_lines = request_text.split('\r\n', 1)
    243 
    244     words = request_line.split(' ')
    245     if len(words) != 3:
    246         raise ValueError('Bad Request-Line syntax %r' % request_line)
    247     [command, path, version] = words
    248     if version != 'HTTP/1.1':
    249         raise ValueError('Bad request version %r' % version)
    250 
    251     # email.parser.Parser() parses RFC 2822 (RFC 822) style headers.
    252     # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers
    253     # RFC 822.
    254     headers = email.parser.Parser().parsestr(header_lines)
    255     return command, path, version, headers
    256 
    257 
    258 class _ControlBlock(object):
    259     """A structure that holds parsing result of multiplexing control block.
    260     Control block specific attributes will be added by _MuxFramePayloadParser.
    261     (e.g. encoded_handshake will be added for AddChannelRequest and
    262     AddChannelResponse)
    263     """
    264 
    265     def __init__(self, opcode):
    266         self.opcode = opcode
    267 
    268 
    269 class _MuxFramePayloadParser(object):
    270     """A class that parses multiplexed frame payload."""
    271 
    272     def __init__(self, payload):
    273         self._data = payload
    274         self._read_position = 0
    275         self._logger = util.get_class_logger(self)
    276 
    277     def read_channel_id(self):
    278         """Reads channel id.
    279 
    280         Raises:
    281             InvalidMuxFrameException: when the payload doesn't contain
    282                 valid channel id.
    283         """
    284 
    285         remaining_length = len(self._data) - self._read_position
    286         pos = self._read_position
    287         if remaining_length == 0:
    288             raise InvalidMuxFrameException('No channel id found')
    289 
    290         channel_id = ord(self._data[pos])
    291         channel_id_length = 1
    292         if channel_id & 0xe0 == 0xe0:
    293             if remaining_length < 4:
    294                 raise InvalidMuxFrameException(
    295                     'Invalid channel id format')
    296             channel_id = struct.unpack('!L',
    297                                        self._data[pos:pos+4])[0] & 0x1fffffff
    298             channel_id_length = 4
    299         elif channel_id & 0xc0 == 0xc0:
    300             if remaining_length < 3:
    301                 raise InvalidMuxFrameException(
    302                     'Invalid channel id format')
    303             channel_id = (((channel_id & 0x1f) << 16) +
    304                           struct.unpack('!H', self._data[pos+1:pos+3])[0])
    305             channel_id_length = 3
    306         elif channel_id & 0x80 == 0x80:
    307             if remaining_length < 2:
    308                 raise InvalidMuxFrameException(
    309                     'Invalid channel id format')
    310             channel_id = struct.unpack('!H',
    311                                        self._data[pos:pos+2])[0] & 0x3fff
    312             channel_id_length = 2
    313         self._read_position += channel_id_length
    314 
    315         return channel_id
    316 
    317     def read_inner_frame(self):
    318         """Reads an inner frame.
    319 
    320         Raises:
    321             InvalidMuxFrameException: when the inner frame is invalid.
    322         """
    323 
    324         if len(self._data) == self._read_position:
    325             raise InvalidMuxFrameException('No inner frame bits found')
    326         bits = ord(self._data[self._read_position])
    327         self._read_position += 1
    328         fin = (bits & 0x80) == 0x80
    329         rsv1 = (bits & 0x40) == 0x40
    330         rsv2 = (bits & 0x20) == 0x20
    331         rsv3 = (bits & 0x10) == 0x10
    332         opcode = bits & 0xf
    333         payload = self.remaining_data()
    334         # Consume rest of the message which is payload data of the original
    335         # frame.
    336         self._read_position = len(self._data)
    337         return fin, rsv1, rsv2, rsv3, opcode, payload
    338 
    339     def _read_number(self, size):
    340         if self._read_position + size > len(self._data):
    341             raise InvalidMuxControlBlock(
    342                 'Cannot read %d byte(s) number' % size)
    343 
    344         pos = self._read_position
    345         if size == 1:
    346             self._read_position += 1
    347             return ord(self._data[pos])
    348         elif size == 2:
    349             self._read_position += 2
    350             return struct.unpack('!H', self._data[pos:pos+2])[0]
    351         elif size == 3:
    352             self._read_position += 3
    353             return ((ord(self._data[pos]) << 16)
    354                     + struct.unpack('!H', self._data[pos+1:pos+3])[0])
    355         elif size == 4:
    356             self._read_position += 4
    357             return struct.unpack('!L', self._data[pos:pos+4])[0]
    358         else:
    359             raise InvalidMuxControlBlockException(
    360                 'Cannot read %d byte(s) number' % size)
    361 
    362     def _read_opcode_specific_data(self, opcode, size_of_size):
    363         """Reads opcode specific data that consists of followings:
    364             - the size of the opcode specific data (1-4 bytes)
    365             - the opcode specific data
    366         AddChannelRequest and DropChannel have this structure.
    367         """
    368 
    369         if self._read_position + size_of_size > len(self._data):
    370             raise InvalidMuxControlBlockException(
    371                 'No size field for opcode %d' % opcode)
    372 
    373         size = self._read_number(size_of_size)
    374 
    375         pos = self._read_position
    376         if pos + size > len(self._data):
    377             raise InvalidMuxControlBlockException(
    378                 'No data field for opcode %d (%d + %d > %d)' %
    379                 (opcode, pos, size, len(self._data)))
    380 
    381         specific_data = self._data[pos:pos+size]
    382         self._read_position += size
    383         return specific_data
    384 
    385     def _read_add_channel_request(self, first_byte, control_block):
    386         reserved = (first_byte >> 4) & 0x1
    387         encoding = (first_byte >> 2) & 0x3
    388         size_of_handshake_size = (first_byte & 0x3) + 1
    389 
    390         control_block.channel_id = self.read_channel_id()
    391         encoded_handshake = self._read_opcode_specific_data(
    392                                 _MUX_OPCODE_ADD_CHANNEL_REQUEST,
    393                                 size_of_handshake_size)
    394         control_block.encoding = encoding
    395         control_block.encoded_handshake = encoded_handshake
    396         return control_block
    397 
    398     def _read_flow_control(self, first_byte, control_block):
    399         quota_size = (first_byte & 0x3) + 1
    400         control_block.channel_id = self.read_channel_id()
    401         control_block.send_quota = self._read_number(quota_size)
    402         return control_block
    403 
    404     def _read_drop_channel(self, first_byte, control_block):
    405         mux_error = (first_byte >> 4) & 0x1
    406         reserved = (first_byte >> 2) & 0x3
    407         size_of_reason_size = (first_byte & 0x3) + 1
    408 
    409         control_block.channel_id = self.read_channel_id()
    410         reason = self._read_opcode_specific_data(
    411                      _MUX_OPCODE_ADD_CHANNEL_RESPONSE,
    412                      size_of_reason_size)
    413         if mux_error and len(reason) > 0:
    414             raise InvalidMuxControlBlockException(
    415                 'Reason must be empty when F bit is set')
    416         control_block.mux_error = mux_error
    417         control_block.reason = reason
    418         return control_block
    419 
    420     def _read_new_channel_slot(self, first_byte, control_block):
    421         # TODO(bashi): Implement
    422         raise MuxNotImplementedException('NewChannelSlot is not implemented')
    423 
    424     def read_control_blocks(self):
    425         """Reads control block(s).
    426 
    427         Raises:
    428            InvalidMuxControlBlock: when the payload contains invalid control
    429                block(s).
    430            StopIteration: when no control blocks left.
    431         """
    432 
    433         while self._read_position < len(self._data):
    434             if self._read_position >= len(self._data):
    435                 raise InvalidMuxControlBlockException(
    436                     'No control opcode found')
    437             first_byte = ord(self._data[self._read_position])
    438             self._read_position += 1
    439             opcode = (first_byte >> 5) & 0x7
    440             control_block = _ControlBlock(opcode=opcode)
    441             if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
    442                 yield self._read_add_channel_request(first_byte, control_block)
    443             elif opcode == _MUX_OPCODE_FLOW_CONTROL:
    444                 yield self._read_flow_control(first_byte, control_block)
    445             elif opcode == _MUX_OPCODE_DROP_CHANNEL:
    446                 yield self._read_drop_channel(first_byte, control_block)
    447             elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
    448                 yield self._read_new_channel_slot(first_byte, control_block)
    449             else:
    450                 raise InvalidMuxControlBlockException(
    451                     'Invalid opcode %d' % opcode)
    452         assert self._read_position == len(self._data)
    453         raise StopIteration
    454 
    455     def remaining_data(self):
    456         """Returns remaining data."""
    457 
    458         return self._data[self._read_position:]
    459 
    460 
    461 class _LogicalRequest(object):
    462     """Mimics mod_python request."""
    463 
    464     def __init__(self, channel_id, command, path, headers, connection):
    465         """Constructs an instance.
    466 
    467         Args:
    468             channel_id: the channel id of the logical channel.
    469             command: HTTP request command.
    470             path: HTTP request path.
    471             headers: HTTP headers.
    472             connection: _LogicalConnection instance.
    473         """
    474 
    475         self.channel_id = channel_id
    476         self.method = command
    477         self.uri = path
    478         self.headers_in = headers
    479         self.connection = connection
    480         self.server_terminated = False
    481         self.client_terminated = False
    482 
    483     def is_https(self):
    484         """Mimics request.is_https(). Returns False because this method is
    485         used only by old protocols (hixie and hybi00).
    486         """
    487 
    488         return False
    489 
    490 
    491 class _LogicalConnection(object):
    492     """Mimics mod_python mp_conn."""
    493 
    494     # For details, see the comment of set_read_state().
    495     STATE_ACTIVE = 1
    496     STATE_GRACEFULLY_CLOSED = 2
    497     STATE_TERMINATED = 3
    498 
    499     def __init__(self, mux_handler, channel_id):
    500         """Constructs an instance.
    501 
    502         Args:
    503             mux_handler: _MuxHandler instance.
    504             channel_id: channel id of this connection.
    505         """
    506 
    507         self._mux_handler = mux_handler
    508         self._channel_id = channel_id
    509         self._incoming_data = ''
    510         self._write_condition = threading.Condition()
    511         self._waiting_write_completion = False
    512         self._read_condition = threading.Condition()
    513         self._read_state = self.STATE_ACTIVE
    514 
    515     def get_local_addr(self):
    516         """Getter to mimic mp_conn.local_addr."""
    517 
    518         return self._mux_handler.physical_connection.get_local_addr()
    519     local_addr = property(get_local_addr)
    520 
    521     def get_remote_addr(self):
    522         """Getter to mimic mp_conn.remote_addr."""
    523 
    524         return self._mux_handler.physical_connection.get_remote_addr()
    525     remote_addr = property(get_remote_addr)
    526 
    527     def get_memorized_lines(self):
    528         """Gets memorized lines. Not supported."""
    529 
    530         raise MuxUnexpectedException('_LogicalConnection does not support '
    531                                      'get_memorized_lines')
    532 
    533     def write(self, data):
    534         """Writes data. mux_handler sends data asynchronously. The caller will
    535         be suspended until write done.
    536 
    537         Args:
    538             data: data to be written.
    539 
    540         Raises:
    541             MuxUnexpectedException: when called before finishing the previous
    542                 write.
    543         """
    544 
    545         try:
    546             self._write_condition.acquire()
    547             if self._waiting_write_completion:
    548                 raise MuxUnexpectedException(
    549                     'Logical connection %d is already waiting the completion '
    550                     'of write' % self._channel_id)
    551 
    552             self._waiting_write_completion = True
    553             self._mux_handler.send_data(self._channel_id, data)
    554             self._write_condition.wait()
    555         finally:
    556             self._write_condition.release()
    557 
    558     def write_control_data(self, data):
    559         """Writes data via the control channel. Don't wait finishing write
    560         because this method can be called by mux dispatcher.
    561 
    562         Args:
    563             data: data to be written.
    564         """
    565 
    566         self._mux_handler.send_control_data(data)
    567 
    568     def notify_write_done(self):
    569         """Called when sending data is completed."""
    570 
    571         try:
    572             self._write_condition.acquire()
    573             if not self._waiting_write_completion:
    574                 raise MuxUnexpectedException(
    575                     'Invalid call of notify_write_done for logical connection'
    576                     ' %d' % self._channel_id)
    577             self._waiting_write_completion = False
    578             self._write_condition.notify()
    579         finally:
    580             self._write_condition.release()
    581 
    582     def append_frame_data(self, frame_data):
    583         """Appends incoming frame data. Called when mux_handler dispatches
    584         frame data to the corresponding application.
    585 
    586         Args:
    587             frame_data: incoming frame data.
    588         """
    589 
    590         self._read_condition.acquire()
    591         self._incoming_data += frame_data
    592         self._read_condition.notify()
    593         self._read_condition.release()
    594 
    595     def read(self, length):
    596         """Reads data. Blocks until enough data has arrived via physical
    597         connection.
    598 
    599         Args:
    600             length: length of data to be read.
    601         Raises:
    602             LogicalConnectionClosedException: when closing handshake for this
    603                 logical channel has been received.
    604             ConnectionTerminatedException: when the physical connection has
    605                 closed, or an error is caused on the reader thread.
    606         """
    607 
    608         self._read_condition.acquire()
    609         while (self._read_state == self.STATE_ACTIVE and
    610                len(self._incoming_data) < length):
    611             self._read_condition.wait()
    612 
    613         try:
    614             if self._read_state == self.STATE_GRACEFULLY_CLOSED:
    615                 raise LogicalConnectionClosedException(
    616                     'Logical channel %d has closed.' % self._channel_id)
    617             elif self._read_state == self.STATE_TERMINATED:
    618                 raise ConnectionTerminatedException(
    619                     'Receiving %d byte failed. Logical channel (%d) closed' %
    620                     (length, self._channel_id))
    621 
    622             value = self._incoming_data[:length]
    623             self._incoming_data = self._incoming_data[length:]
    624         finally:
    625             self._read_condition.release()
    626 
    627         return value
    628 
    629     def set_read_state(self, new_state):
    630         """Sets the state of this connection. Called when an event for this
    631         connection has occurred.
    632 
    633         Args:
    634             new_state: state to be set. new_state must be one of followings:
    635             - STATE_GRACEFULLY_CLOSED: when closing handshake for this
    636                 connection has been received.
    637             - STATE_TERMINATED: when the physical connection has closed or
    638                 DropChannel of this connection has received.
    639         """
    640 
    641         self._read_condition.acquire()
    642         self._read_state = new_state
    643         self._read_condition.notify()
    644         self._read_condition.release()
    645 
    646 
    647 class _LogicalStream(Stream):
    648     """Mimics the Stream class. This class interprets multiplexed WebSocket
    649     frames.
    650     """
    651 
    652     def __init__(self, request, send_quota, receive_quota):
    653         """Constructs an instance.
    654 
    655         Args:
    656             request: _LogicalRequest instance.
    657             send_quota: Initial send quota.
    658             receive_quota: Initial receive quota.
    659         """
    660 
    661         # TODO(bashi): Support frame filters.
    662         stream_options = StreamOptions()
    663         # Physical stream is responsible for masking.
    664         stream_options.unmask_receive = False
    665         # Control frames can be fragmented on logical channel.
    666         stream_options.allow_fragmented_control_frame = True
    667         Stream.__init__(self, request, stream_options)
    668         self._send_quota = send_quota
    669         self._send_quota_condition = threading.Condition()
    670         self._receive_quota = receive_quota
    671         self._write_inner_frame_semaphore = threading.Semaphore()
    672 
    673     def _create_inner_frame(self, opcode, payload, end=True):
    674         # TODO(bashi): Support extensions that use reserved bits.
    675         first_byte = (end << 7) | opcode
    676         return (_encode_channel_id(self._request.channel_id) +
    677                 chr(first_byte) + payload)
    678 
    679     def _write_inner_frame(self, opcode, payload, end=True):
    680         payload_length = len(payload)
    681         write_position = 0
    682 
    683         try:
    684             # An inner frame will be fragmented if there is no enough send
    685             # quota. This semaphore ensures that fragmented inner frames are
    686             # sent in order on the logical channel.
    687             # Note that frames that come from other logical channels or
    688             # multiplexing control blocks can be inserted between fragmented
    689             # inner frames on the physical channel.
    690             self._write_inner_frame_semaphore.acquire()
    691             while write_position < payload_length:
    692                 try:
    693                     self._send_quota_condition.acquire()
    694                     while self._send_quota == 0:
    695                         self._logger.debug(
    696                             'No quota. Waiting FlowControl message for %d.' %
    697                             self._request.channel_id)
    698                         self._send_quota_condition.wait()
    699 
    700                     remaining = payload_length - write_position
    701                     write_length = min(self._send_quota, remaining)
    702                     inner_frame_end = (
    703                         end and
    704                         (write_position + write_length == payload_length))
    705 
    706                     inner_frame = self._create_inner_frame(
    707                         opcode,
    708                         payload[write_position:write_position+write_length],
    709                         inner_frame_end)
    710                     frame_data = self._writer.build(
    711                         inner_frame, end=True, binary=True)
    712                     self._send_quota -= write_length
    713                     self._logger.debug('Consumed quota=%d, remaining=%d' %
    714                                        (write_length, self._send_quota))
    715                 finally:
    716                     self._send_quota_condition.release()
    717 
    718                 # Writing data will block the worker so we need to release
    719                 # _send_quota_condition before writing.
    720                 self._logger.debug('Sending inner frame: %r' % frame_data)
    721                 self._request.connection.write(frame_data)
    722                 write_position += write_length
    723 
    724                 opcode = common.OPCODE_CONTINUATION
    725 
    726         except ValueError, e:
    727             raise BadOperationException(e)
    728         finally:
    729             self._write_inner_frame_semaphore.release()
    730 
    731     def replenish_send_quota(self, send_quota):
    732         """Replenish send quota."""
    733 
    734         self._send_quota_condition.acquire()
    735         self._send_quota += send_quota
    736         self._logger.debug('Replenished send quota for channel id %d: %d' %
    737                            (self._request.channel_id, self._send_quota))
    738         self._send_quota_condition.notify()
    739         self._send_quota_condition.release()
    740 
    741     def consume_receive_quota(self, amount):
    742         """Consumes receive quota. Returns False on failure."""
    743 
    744         if self._receive_quota < amount:
    745             self._logger.debug('Violate quota on channel id %d: %d < %d' %
    746                                (self._request.channel_id,
    747                                 self._receive_quota, amount))
    748             return False
    749         self._receive_quota -= amount
    750         return True
    751 
    752     def send_message(self, message, end=True, binary=False):
    753         """Override Stream.send_message."""
    754 
    755         if self._request.server_terminated:
    756             raise BadOperationException(
    757                 'Requested send_message after sending out a closing handshake')
    758 
    759         if binary and isinstance(message, unicode):
    760             raise BadOperationException(
    761                 'Message for binary frame must be instance of str')
    762 
    763         if binary:
    764             opcode = common.OPCODE_BINARY
    765         else:
    766             opcode = common.OPCODE_TEXT
    767             message = message.encode('utf-8')
    768 
    769         self._write_inner_frame(opcode, message, end)
    770 
    771     def _receive_frame(self):
    772         """Overrides Stream._receive_frame.
    773 
    774         In addition to call Stream._receive_frame, this method adds the amount
    775         of payload to receiving quota and sends FlowControl to the client.
    776         We need to do it here because Stream.receive_message() handles
    777         control frames internally.
    778         """
    779 
    780         opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self)
    781         amount = len(payload)
    782         self._receive_quota += amount
    783         frame_data = _create_flow_control(self._request.channel_id,
    784                                           amount)
    785         self._logger.debug('Sending flow control for %d, replenished=%d' %
    786                            (self._request.channel_id, amount))
    787         self._request.connection.write_control_data(frame_data)
    788         return opcode, payload, fin, rsv1, rsv2, rsv3
    789 
    790     def receive_message(self):
    791         """Overrides Stream.receive_message."""
    792 
    793         # Just call Stream.receive_message(), but catch
    794         # LogicalConnectionClosedException, which is raised when the logical
    795         # connection has closed gracefully.
    796         try:
    797             return Stream.receive_message(self)
    798         except LogicalConnectionClosedException, e:
    799             self._logger.debug('%s', e)
    800             return None
    801 
    802     def _send_closing_handshake(self, code, reason):
    803         """Overrides Stream._send_closing_handshake."""
    804 
    805         body = create_closing_handshake_body(code, reason)
    806         self._logger.debug('Sending closing handshake for %d: (%r, %r)' %
    807                            (self._request.channel_id, code, reason))
    808         self._write_inner_frame(common.OPCODE_CLOSE, body, end=True)
    809 
    810         self._request.server_terminated = True
    811 
    812     def send_ping(self, body=''):
    813         """Overrides Stream.send_ping"""
    814 
    815         self._logger.debug('Sending ping on logical channel %d: %r' %
    816                            (self._request.channel_id, body))
    817         self._write_inner_frame(common.OPCODE_PING, body, end=True)
    818 
    819         self._ping_queue.append(body)
    820 
    821     def _send_pong(self, body):
    822         """Overrides Stream._send_pong"""
    823 
    824         self._logger.debug('Sending pong on logical channel %d: %r' %
    825                            (self._request.channel_id, body))
    826         self._write_inner_frame(common.OPCODE_PONG, body, end=True)
    827 
    828     def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''):
    829         """Overrides Stream.close_connection."""
    830 
    831         # TODO(bashi): Implement
    832         self._logger.debug('Closing logical connection %d' %
    833                            self._request.channel_id)
    834         self._request.server_terminated = True
    835 
    836     def _drain_received_data(self):
    837         """Overrides Stream._drain_received_data. Nothing need to be done for
    838         logical channel.
    839         """
    840 
    841         pass
    842 
    843 
    844 class _OutgoingData(object):
    845     """A structure that holds data to be sent via physical connection and
    846     origin of the data.
    847     """
    848 
    849     def __init__(self, channel_id, data):
    850         self.channel_id = channel_id
    851         self.data = data
    852 
    853 
    854 class _PhysicalConnectionWriter(threading.Thread):
    855     """A thread that is responsible for writing data to physical connection.
    856 
    857     TODO(bashi): Make sure there is no thread-safety problem when the reader
    858     thread reads data from the same socket at a time.
    859     """
    860 
    861     def __init__(self, mux_handler):
    862         """Constructs an instance.
    863 
    864         Args:
    865             mux_handler: _MuxHandler instance.
    866         """
    867 
    868         threading.Thread.__init__(self)
    869         self._logger = util.get_class_logger(self)
    870         self._mux_handler = mux_handler
    871         self.setDaemon(True)
    872         self._stop_requested = False
    873         self._deque = collections.deque()
    874         self._deque_condition = threading.Condition()
    875 
    876     def put_outgoing_data(self, data):
    877         """Puts outgoing data.
    878 
    879         Args:
    880             data: _OutgoingData instance.
    881 
    882         Raises:
    883             BadOperationException: when the thread has been requested to
    884                 terminate.
    885         """
    886 
    887         try:
    888             self._deque_condition.acquire()
    889             if self._stop_requested:
    890                 raise BadOperationException('Cannot write data anymore')
    891 
    892             self._deque.append(data)
    893             self._deque_condition.notify()
    894         finally:
    895             self._deque_condition.release()
    896 
    897     def _write_data(self, outgoing_data):
    898         try:
    899             self._mux_handler.physical_connection.write(outgoing_data.data)
    900         except Exception, e:
    901             util.prepend_message_to_exception(
    902                 'Failed to send message to %r: ' %
    903                 (self._mux_handler.physical_connection.remote_addr,), e)
    904             raise
    905 
    906         # TODO(bashi): It would be better to block the thread that sends
    907         # control data as well.
    908         if outgoing_data.channel_id != _CONTROL_CHANNEL_ID:
    909             self._mux_handler.notify_write_done(outgoing_data.channel_id)
    910 
    911     def run(self):
    912         self._deque_condition.acquire()
    913         while not self._stop_requested:
    914             if len(self._deque) == 0:
    915                 self._deque_condition.wait()
    916                 continue
    917 
    918             outgoing_data = self._deque.popleft()
    919             self._deque_condition.release()
    920             self._write_data(outgoing_data)
    921             self._deque_condition.acquire()
    922 
    923         # Flush deque
    924         try:
    925             while len(self._deque) > 0:
    926                 outgoing_data = self._deque.popleft()
    927                 self._write_data(outgoing_data)
    928         finally:
    929             self._deque_condition.release()
    930 
    931     def stop(self):
    932         """Stops the writer thread."""
    933 
    934         self._deque_condition.acquire()
    935         self._stop_requested = True
    936         self._deque_condition.notify()
    937         self._deque_condition.release()
    938 
    939 
    940 class _PhysicalConnectionReader(threading.Thread):
    941     """A thread that is responsible for reading data from physical connection.
    942     """
    943 
    944     def __init__(self, mux_handler):
    945         """Constructs an instance.
    946 
    947         Args:
    948             mux_handler: _MuxHandler instance.
    949         """
    950 
    951         threading.Thread.__init__(self)
    952         self._logger = util.get_class_logger(self)
    953         self._mux_handler = mux_handler
    954         self.setDaemon(True)
    955 
    956     def run(self):
    957         while True:
    958             try:
    959                 physical_stream = self._mux_handler.physical_stream
    960                 message = physical_stream.receive_message()
    961                 if message is None:
    962                     break
    963                 opcode = physical_stream.get_last_received_opcode()
    964                 if opcode == common.OPCODE_TEXT:
    965                     raise MuxUnexpectedException(
    966                         'Received a text message on physical connection')
    967             except ConnectionTerminatedException, e:
    968                 self._logger.debug('%s', e)
    969                 break
    970 
    971             try:
    972                 self._mux_handler.dispatch_message(message)
    973             except Exception, e:
    974                 self._logger.debug(traceback.format_exc())
    975                 break
    976 
    977         self._mux_handler.notify_reader_done()
    978 
    979 
    980 class _Worker(threading.Thread):
    981     """A thread that is responsible for running the corresponding application
    982     handler.
    983     """
    984 
    985     def __init__(self, mux_handler, request):
    986         """Constructs an instance.
    987 
    988         Args:
    989             mux_handler: _MuxHandler instance.
    990             request: _LogicalRequest instance.
    991         """
    992 
    993         threading.Thread.__init__(self)
    994         self._logger = util.get_class_logger(self)
    995         self._mux_handler = mux_handler
    996         self._request = request
    997         self.setDaemon(True)
    998 
    999     def run(self):
   1000         self._logger.debug('Logical channel worker started. (id=%d)' %
   1001                            self._request.channel_id)
   1002         try:
   1003             # Non-critical exceptions will be handled by dispatcher.
   1004             self._mux_handler.dispatcher.transfer_data(self._request)
   1005         finally:
   1006             self._mux_handler.notify_worker_done(self._request.channel_id)
   1007 
   1008 
   1009 class _MuxHandshaker(hybi.Handshaker):
   1010     """Opening handshake processor for multiplexing."""
   1011 
   1012     def __init__(self, request, dispatcher, send_quota, receive_quota):
   1013         """Constructs an instance.
   1014         Args:
   1015             request: _LogicalRequest instance.
   1016             dispatcher: Dispatcher instance (dispatch.Dispatcher).
   1017             send_quota: Initial send quota.
   1018             receive_quota: Initial receive quota.
   1019         """
   1020 
   1021         hybi.Handshaker.__init__(self, request, dispatcher)
   1022         self._send_quota = send_quota
   1023         self._receive_quota = receive_quota
   1024 
   1025     def _create_stream(self, stream_options):
   1026         """Override hybi.Handshaker._create_stream."""
   1027 
   1028         self._logger.debug('Creating logical stream for %d' %
   1029                            self._request.channel_id)
   1030         return _LogicalStream(self._request, self._send_quota,
   1031                               self._receive_quota)
   1032 
   1033     def _send_handshake(self, accept):
   1034         """Override hybi.Handshaker._send_handshake."""
   1035 
   1036         # Don't send handshake response for the default channel
   1037         if self._request.channel_id == _DEFAULT_CHANNEL_ID:
   1038             return
   1039 
   1040         handshake_response = self._create_handshake_response(accept)
   1041         frame_data = _create_add_channel_response(
   1042                          self._request.channel_id,
   1043                          handshake_response)
   1044         self._logger.debug('Sending handshake response for %d: %r' %
   1045                            (self._request.channel_id, frame_data))
   1046         self._request.connection.write_control_data(frame_data)
   1047 
   1048 
   1049 class _LogicalChannelData(object):
   1050     """A structure that holds information about logical channel.
   1051     """
   1052 
   1053     def __init__(self, request, worker):
   1054         self.request = request
   1055         self.worker = worker
   1056         self.mux_error_occurred = False
   1057         self.mux_error_reason = ''
   1058 
   1059 
   1060 class _MuxHandler(object):
   1061     """Multiplexing handler. When a handler starts, it launches three
   1062     threads; the reader thread, the writer thread, and a worker thread.
   1063 
   1064     The reader thread reads data from the physical stream, i.e., the
   1065     ws_stream object of the underlying websocket connection. The reader
   1066     thread interprets multiplexed frames and dispatches them to logical
   1067     channels. Methods of this class are mostly called by the reader thread.
   1068 
   1069     The writer thread sends multiplexed frames which are created by
   1070     logical channels via the physical connection.
   1071 
   1072     The worker thread launched at the starting point handles the
   1073     "Implicitly Opened Connection". If multiplexing handler receives
   1074     an AddChannelRequest and accepts it, the handler will launch a new worker
   1075     thread and dispatch the request to it.
   1076     """
   1077 
   1078     def __init__(self, request, dispatcher):
   1079         """Constructs an instance.
   1080 
   1081         Args:
   1082             request: mod_python request of the physical connection.
   1083             dispatcher: Dispatcher instance (dispatch.Dispatcher).
   1084         """
   1085 
   1086         self.original_request = request
   1087         self.dispatcher = dispatcher
   1088         self.physical_connection = request.connection
   1089         self.physical_stream = request.ws_stream
   1090         self._logger = util.get_class_logger(self)
   1091         self._logical_channels = {}
   1092         self._logical_channels_condition = threading.Condition()
   1093         # Holds client's initial quota
   1094         self._channel_slots = collections.deque()
   1095         self._worker_done_notify_received = False
   1096         self._reader = None
   1097         self._writer = None
   1098 
   1099     def start(self):
   1100         """Starts the handler.
   1101 
   1102         Raises:
   1103             MuxUnexpectedException: when the handler already started, or when
   1104                 opening handshake of the default channel fails.
   1105         """
   1106 
   1107         if self._reader or self._writer:
   1108             raise MuxUnexpectedException('MuxHandler already started')
   1109 
   1110         self._reader = _PhysicalConnectionReader(self)
   1111         self._writer = _PhysicalConnectionWriter(self)
   1112         self._reader.start()
   1113         self._writer.start()
   1114 
   1115         # Create "Implicitly Opened Connection".
   1116         logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID)
   1117         headers_in = copy.copy(self.original_request.headers_in)
   1118         # TODO(bashi): Support extensions
   1119         headers_in['Sec-WebSocket-Extensions'] = ''
   1120         logical_request = _LogicalRequest(_DEFAULT_CHANNEL_ID,
   1121                                           self.original_request.method,
   1122                                           self.original_request.uri,
   1123                                           headers_in,
   1124                                           logical_connection)
   1125         # Client's send quota for the implicitly opened connection is zero,
   1126         # but we will send FlowControl later so set the initial quota to
   1127         # _INITIAL_QUOTA_FOR_CLIENT.
   1128         self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT)
   1129         if not self._do_handshake_for_logical_request(
   1130             logical_request, send_quota=self.original_request.mux_quota):
   1131             raise MuxUnexpectedException(
   1132                 'Failed handshake on the default channel id')
   1133         self._add_logical_channel(logical_request)
   1134 
   1135         # Send FlowControl for the implicitly opened connection.
   1136         frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID,
   1137                                           _INITIAL_QUOTA_FOR_CLIENT)
   1138         logical_request.connection.write_control_data(frame_data)
   1139 
   1140     def add_channel_slots(self, slots, send_quota):
   1141         """Adds channel slots.
   1142 
   1143         Args:
   1144             slots: number of slots to be added.
   1145             send_quota: initial send quota for slots.
   1146         """
   1147 
   1148         self._channel_slots.extend([send_quota] * slots)
   1149         # Send NewChannelSlot to client.
   1150         frame_data = _create_new_channel_slot(slots, send_quota)
   1151         self.send_control_data(frame_data)
   1152 
   1153     def wait_until_done(self, timeout=None):
   1154         """Waits until all workers are done. Returns False when timeout has
   1155         occurred. Returns True on success.
   1156 
   1157         Args:
   1158             timeout: timeout in sec.
   1159         """
   1160 
   1161         self._logical_channels_condition.acquire()
   1162         try:
   1163             while len(self._logical_channels) > 0:
   1164                 self._logger.debug('Waiting workers(%d)...' %
   1165                                    len(self._logical_channels))
   1166                 self._worker_done_notify_received = False
   1167                 self._logical_channels_condition.wait(timeout)
   1168                 if not self._worker_done_notify_received:
   1169                     self._logger.debug('Waiting worker(s) timed out')
   1170                     return False
   1171 
   1172         finally:
   1173             self._logical_channels_condition.release()
   1174 
   1175         # Flush pending outgoing data
   1176         self._writer.stop()
   1177         self._writer.join()
   1178 
   1179         return True
   1180 
   1181     def notify_write_done(self, channel_id):
   1182         """Called by the writer thread when a write operation has done.
   1183 
   1184         Args:
   1185             channel_id: objective channel id.
   1186         """
   1187 
   1188         try:
   1189             self._logical_channels_condition.acquire()
   1190             if channel_id in self._logical_channels:
   1191                 channel_data = self._logical_channels[channel_id]
   1192                 channel_data.request.connection.notify_write_done()
   1193             else:
   1194                 self._logger.debug('Seems that logical channel for %d has gone'
   1195                                    % channel_id)
   1196         finally:
   1197             self._logical_channels_condition.release()
   1198 
   1199     def send_control_data(self, data):
   1200         """Sends data via the control channel.
   1201 
   1202         Args:
   1203             data: data to be sent.
   1204         """
   1205 
   1206         self._writer.put_outgoing_data(_OutgoingData(
   1207                 channel_id=_CONTROL_CHANNEL_ID, data=data))
   1208 
   1209     def send_data(self, channel_id, data):
   1210         """Sends data via given logical channel. This method is called by
   1211         worker threads.
   1212 
   1213         Args:
   1214             channel_id: objective channel id.
   1215             data: data to be sent.
   1216         """
   1217 
   1218         self._writer.put_outgoing_data(_OutgoingData(
   1219                 channel_id=channel_id, data=data))
   1220 
   1221     def _send_drop_channel(self, channel_id, reason='', mux_error=False):
   1222         frame_data = _create_drop_channel(channel_id, reason, mux_error)
   1223         self._logger.debug(
   1224             'Sending drop channel for channel id %d' % channel_id)
   1225         self.send_control_data(frame_data)
   1226 
   1227     def _send_error_add_channel_response(self, channel_id, status=None):
   1228         if status is None:
   1229             status = common.HTTP_STATUS_BAD_REQUEST
   1230 
   1231         if status in _HTTP_BAD_RESPONSE_MESSAGES:
   1232             message = _HTTP_BAD_RESPONSE_MESSAGES[status]
   1233         else:
   1234             self._logger.debug('Response message for %d is not found' % status)
   1235             message = '???'
   1236 
   1237         response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message)
   1238         frame_data = _create_add_channel_response(channel_id,
   1239                                                   encoded_handshake=response,
   1240                                                   encoding=0, rejected=True)
   1241         self.send_control_data(frame_data)
   1242 
   1243     def _create_logical_request(self, block):
   1244         if block.channel_id == _CONTROL_CHANNEL_ID:
   1245             raise MuxUnexpectedException(
   1246                 'Received the control channel id (0) as objective channel '
   1247                 'id for AddChannel')
   1248 
   1249         if block.encoding != 0:
   1250             raise MuxNotImplementedException(
   1251                 'delta-encoding not supported yet')
   1252         connection = _LogicalConnection(self, block.channel_id)
   1253         command, path, version, headers = _parse_request_text(
   1254                                               block.encoded_handshake)
   1255         request = _LogicalRequest(block.channel_id, command, path,
   1256                                   headers, connection)
   1257 
   1258         return request
   1259 
   1260     def _do_handshake_for_logical_request(self, request, send_quota=0):
   1261         try:
   1262             receive_quota = self._channel_slots.popleft()
   1263         except IndexError:
   1264             raise MuxUnexpectedException('No room in channel pool')
   1265 
   1266         handshaker = _MuxHandshaker(request, self.dispatcher,
   1267                                     send_quota, receive_quota)
   1268         try:
   1269             handshaker.do_handshake()
   1270         except handshake.VersionException, e:
   1271             self._logger.info('%s', e)
   1272             self._send_error_add_channel_response(
   1273                 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
   1274             return False
   1275         except handshake.HandshakeException, e:
   1276             self._logger.info('%s', e)
   1277             self._send_error_add_channel_response(request.channel_id,
   1278                                                   status=e.status)
   1279             return False
   1280         except handshake.AbortedByUserException, e:
   1281             self._logger.info('%s', e)
   1282             self._send_error_add_channel_response(request.channel_id)
   1283             return False
   1284 
   1285         return True
   1286 
   1287     def _add_logical_channel(self, logical_request):
   1288         try:
   1289             self._logical_channels_condition.acquire()
   1290             if logical_request.channel_id in self._logical_channels:
   1291                 raise MuxUnexpectedException('Channel id %d already exists' %
   1292                                              logical_request.channel_id)
   1293             worker = _Worker(self, logical_request)
   1294             channel_data = _LogicalChannelData(logical_request, worker)
   1295             self._logical_channels[logical_request.channel_id] = channel_data
   1296             worker.start()
   1297         finally:
   1298             self._logical_channels_condition.release()
   1299 
   1300     def _process_add_channel_request(self, block):
   1301         try:
   1302             logical_request = self._create_logical_request(block)
   1303         except ValueError, e:
   1304             self._logger.debug('Failed to create logical request: %r' % e)
   1305             self._send_error_add_channel_response(
   1306                 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
   1307             return
   1308         if self._do_handshake_for_logical_request(logical_request):
   1309             self._add_logical_channel(logical_request)
   1310         else:
   1311             self._send_error_add_channel_response(
   1312                 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST)
   1313 
   1314     def _process_flow_control(self, block):
   1315         try:
   1316             self._logical_channels_condition.acquire()
   1317             if not block.channel_id in self._logical_channels:
   1318                 return
   1319             channel_data = self._logical_channels[block.channel_id]
   1320             channel_data.request.ws_stream.replenish_send_quota(
   1321                 block.send_quota)
   1322         finally:
   1323             self._logical_channels_condition.release()
   1324 
   1325     def _process_drop_channel(self, block):
   1326         self._logger.debug('DropChannel received for %d: reason=%r' %
   1327                            (block.channel_id, block.reason))
   1328         try:
   1329             self._logical_channels_condition.acquire()
   1330             if not block.channel_id in self._logical_channels:
   1331                 return
   1332             channel_data = self._logical_channels[block.channel_id]
   1333             if not block.mux_error:
   1334                 channel_data.request.connection.set_read_state(
   1335                     _LogicalConnection.STATE_TERMINATED)
   1336             else:
   1337                 # TODO(bashi): What should we do?
   1338                 channel_data.request.connection.set_read_state(
   1339                     _LogicalConnection.STATE_TERMINATED)
   1340         finally:
   1341             self._logical_channels_condition.release()
   1342 
   1343     def _process_new_channel_slot(self, block):
   1344         raise MuxUnexpectedException('Client should not send NewChannelSlot')
   1345 
   1346     def _process_control_blocks(self, parser):
   1347         for control_block in parser.read_control_blocks():
   1348             opcode = control_block.opcode
   1349             self._logger.debug('control block received, opcode: %d' % opcode)
   1350             if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST:
   1351                 self._process_add_channel_request(control_block)
   1352             elif opcode == _MUX_OPCODE_FLOW_CONTROL:
   1353                 self._process_flow_control(control_block)
   1354             elif opcode == _MUX_OPCODE_DROP_CHANNEL:
   1355                 self._process_drop_channel(control_block)
   1356             elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
   1357                 self._process_new_channel_slot(control_block)
   1358             else:
   1359                 raise InvalidMuxControlBlockException(
   1360                     'Invalid opcode')
   1361 
   1362     def _process_logical_frame(self, channel_id, parser):
   1363         self._logger.debug('Received a frame. channel id=%d' % channel_id)
   1364         try:
   1365             self._logical_channels_condition.acquire()
   1366             if not channel_id in self._logical_channels:
   1367                 raise MuxUnexpectedException(
   1368                     'Channel id %d not found' % channel_id)
   1369             channel_data = self._logical_channels[channel_id]
   1370             fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame()
   1371             if not channel_data.request.ws_stream.consume_receive_quota(
   1372                 len(payload)):
   1373                 # The client violates quota. Close logical channel.
   1374                 channel_data.mux_error_occurred = True
   1375                 channel_data.mux_error_reason = 'Quota violation'
   1376                 channel_data.request.connection.set_read_state(
   1377                     _LogicalConnection.STATE_TERMINATED)
   1378                 return
   1379             header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3,
   1380                                    mask=False)
   1381             frame_data = header + payload
   1382             channel_data.request.connection.append_frame_data(frame_data)
   1383         finally:
   1384             self._logical_channels_condition.release()
   1385 
   1386     def dispatch_message(self, message):
   1387         """Dispatches message. The reader thread calls this method.
   1388 
   1389         Args:
   1390             message: a message that contains encapsulated frame.
   1391         Raises:
   1392             InvalidMuxFrameException: if the message is invalid.
   1393         """
   1394 
   1395         parser = _MuxFramePayloadParser(message)
   1396         channel_id = parser.read_channel_id()
   1397         if channel_id == _CONTROL_CHANNEL_ID:
   1398             self._process_control_blocks(parser)
   1399         else:
   1400             self._process_logical_frame(channel_id, parser)
   1401 
   1402     def notify_worker_done(self, channel_id):
   1403         """Called when a worker has finished.
   1404 
   1405         Args:
   1406             channel_id: channel id corresponded with the worker.
   1407         """
   1408 
   1409         self._logger.debug('Worker for channel id %d terminated' % channel_id)
   1410         try:
   1411             self._logical_channels_condition.acquire()
   1412             if not channel_id in self._logical_channels:
   1413                 raise MuxUnexpectedException(
   1414                     'Channel id %d not found' % channel_id)
   1415             channel_data = self._logical_channels.pop(channel_id)
   1416         finally:
   1417             self._worker_done_notify_received = True
   1418             self._logical_channels_condition.notify()
   1419             self._logical_channels_condition.release()
   1420 
   1421         if not channel_data.request.server_terminated:
   1422             if channel_data.mux_error_occurred:
   1423                 self._send_drop_channel(
   1424                     channel_id, reason=channel_data.mux_error_reason,
   1425                     mux_error=True)
   1426             else:
   1427                 self._send_drop_channel(channel_id)
   1428 
   1429     def notify_reader_done(self):
   1430         """This method is called by the reader thread when the reader has
   1431         finished.
   1432         """
   1433 
   1434         # Terminate all logical connections
   1435         self._logger.debug('termiating all logical connections...')
   1436         self._logical_channels_condition.acquire()
   1437         for channel_data in self._logical_channels.values():
   1438             try:
   1439                 channel_data.request.connection.set_read_state(
   1440                     _LogicalConnection.STATE_TERMINATED)
   1441             except Exception:
   1442                 pass
   1443         self._logical_channels_condition.release()
   1444 
   1445 
   1446 def use_mux(request):
   1447     return hasattr(request, 'mux') and request.mux
   1448 
   1449 
   1450 def start(request, dispatcher):
   1451     mux_handler = _MuxHandler(request, dispatcher)
   1452     mux_handler.start()
   1453 
   1454     mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS,
   1455                                   _INITIAL_QUOTA_FOR_CLIENT)
   1456 
   1457     mux_handler.wait_until_done()
   1458 
   1459 
   1460 # vi:sts=4 sw=4 et
   1461