1 # Copyright 2012, Google Inc. 2 # All rights reserved. 3 # 4 # Redistribution and use in source and binary forms, with or without 5 # modification, are permitted provided that the following conditions are 6 # met: 7 # 8 # * Redistributions of source code must retain the above copyright 9 # notice, this list of conditions and the following disclaimer. 10 # * Redistributions in binary form must reproduce the above 11 # copyright notice, this list of conditions and the following disclaimer 12 # in the documentation and/or other materials provided with the 13 # distribution. 14 # * Neither the name of Google Inc. nor the names of its 15 # contributors may be used to endorse or promote products derived from 16 # this software without specific prior written permission. 17 # 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 31 """This file provides classes and helper functions for multiplexing extension. 32 33 Specification: 34 http://tools.ietf.org/html/draft-ietf-hybi-websocket-multiplexing-06 35 """ 36 37 38 import collections 39 import copy 40 import email 41 import email.parser 42 import logging 43 import math 44 import struct 45 import threading 46 import traceback 47 48 from mod_pywebsocket import common 49 from mod_pywebsocket import handshake 50 from mod_pywebsocket import util 51 from mod_pywebsocket._stream_base import BadOperationException 52 from mod_pywebsocket._stream_base import ConnectionTerminatedException 53 from mod_pywebsocket._stream_base import InvalidFrameException 54 from mod_pywebsocket._stream_hybi import Frame 55 from mod_pywebsocket._stream_hybi import Stream 56 from mod_pywebsocket._stream_hybi import StreamOptions 57 from mod_pywebsocket._stream_hybi import create_binary_frame 58 from mod_pywebsocket._stream_hybi import create_closing_handshake_body 59 from mod_pywebsocket._stream_hybi import create_header 60 from mod_pywebsocket._stream_hybi import create_length_header 61 from mod_pywebsocket._stream_hybi import parse_frame 62 from mod_pywebsocket.handshake import hybi 63 64 65 _CONTROL_CHANNEL_ID = 0 66 _DEFAULT_CHANNEL_ID = 1 67 68 _MUX_OPCODE_ADD_CHANNEL_REQUEST = 0 69 _MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1 70 _MUX_OPCODE_FLOW_CONTROL = 2 71 _MUX_OPCODE_DROP_CHANNEL = 3 72 _MUX_OPCODE_NEW_CHANNEL_SLOT = 4 73 74 _MAX_CHANNEL_ID = 2 ** 29 - 1 75 76 _INITIAL_NUMBER_OF_CHANNEL_SLOTS = 64 77 _INITIAL_QUOTA_FOR_CLIENT = 8 * 1024 78 79 _HANDSHAKE_ENCODING_IDENTITY = 0 80 _HANDSHAKE_ENCODING_DELTA = 1 81 82 # We need only these status code for now. 83 _HTTP_BAD_RESPONSE_MESSAGES = { 84 common.HTTP_STATUS_BAD_REQUEST: 'Bad Request', 85 } 86 87 # DropChannel reason code 88 # TODO(bashi): Define all reason code defined in -05 draft. 89 _DROP_CODE_NORMAL_CLOSURE = 1000 90 91 _DROP_CODE_INVALID_ENCAPSULATING_MESSAGE = 2001 92 _DROP_CODE_CHANNEL_ID_TRUNCATED = 2002 93 _DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED = 2003 94 _DROP_CODE_UNKNOWN_MUX_OPCODE = 2004 95 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK = 2005 96 _DROP_CODE_CHANNEL_ALREADY_EXISTS = 2006 97 _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION = 2007 98 _DROP_CODE_UNKNOWN_REQUEST_ENCODING = 2010 99 100 _DROP_CODE_SEND_QUOTA_VIOLATION = 3005 101 _DROP_CODE_SEND_QUOTA_OVERFLOW = 3006 102 _DROP_CODE_ACKNOWLEDGED = 3008 103 _DROP_CODE_BAD_FRAGMENTATION = 3009 104 105 106 class MuxUnexpectedException(Exception): 107 """Exception in handling multiplexing extension.""" 108 pass 109 110 111 # Temporary 112 class MuxNotImplementedException(Exception): 113 """Raised when a flow enters unimplemented code path.""" 114 pass 115 116 117 class LogicalConnectionClosedException(Exception): 118 """Raised when logical connection is gracefully closed.""" 119 pass 120 121 122 class PhysicalConnectionError(Exception): 123 """Raised when there is a physical connection error.""" 124 def __init__(self, drop_code, message=''): 125 super(PhysicalConnectionError, self).__init__( 126 'code=%d, message=%r' % (drop_code, message)) 127 self.drop_code = drop_code 128 self.message = message 129 130 131 class LogicalChannelError(Exception): 132 """Raised when there is a logical channel error.""" 133 def __init__(self, channel_id, drop_code, message=''): 134 super(LogicalChannelError, self).__init__( 135 'channel_id=%d, code=%d, message=%r' % ( 136 channel_id, drop_code, message)) 137 self.channel_id = channel_id 138 self.drop_code = drop_code 139 self.message = message 140 141 142 def _encode_channel_id(channel_id): 143 if channel_id < 0: 144 raise ValueError('Channel id %d must not be negative' % channel_id) 145 146 if channel_id < 2 ** 7: 147 return chr(channel_id) 148 if channel_id < 2 ** 14: 149 return struct.pack('!H', 0x8000 + channel_id) 150 if channel_id < 2 ** 21: 151 first = chr(0xc0 + (channel_id >> 16)) 152 return first + struct.pack('!H', channel_id & 0xffff) 153 if channel_id < 2 ** 29: 154 return struct.pack('!L', 0xe0000000 + channel_id) 155 156 raise ValueError('Channel id %d is too large' % channel_id) 157 158 159 def _encode_number(number): 160 return create_length_header(number, False) 161 162 163 def _create_add_channel_response(channel_id, encoded_handshake, 164 encoding=0, rejected=False): 165 if encoding != 0 and encoding != 1: 166 raise ValueError('Invalid encoding %d' % encoding) 167 168 first_byte = ((_MUX_OPCODE_ADD_CHANNEL_RESPONSE << 5) | 169 (rejected << 4) | encoding) 170 block = (chr(first_byte) + 171 _encode_channel_id(channel_id) + 172 _encode_number(len(encoded_handshake)) + 173 encoded_handshake) 174 return block 175 176 177 def _create_drop_channel(channel_id, code=None, message=''): 178 if len(message) > 0 and code is None: 179 raise ValueError('Code must be specified if message is specified') 180 181 first_byte = _MUX_OPCODE_DROP_CHANNEL << 5 182 block = chr(first_byte) + _encode_channel_id(channel_id) 183 if code is None: 184 block += _encode_number(0) # Reason size 185 else: 186 reason = struct.pack('!H', code) + message 187 reason_size = _encode_number(len(reason)) 188 block += reason_size + reason 189 190 return block 191 192 193 def _create_flow_control(channel_id, replenished_quota): 194 first_byte = _MUX_OPCODE_FLOW_CONTROL << 5 195 block = (chr(first_byte) + 196 _encode_channel_id(channel_id) + 197 _encode_number(replenished_quota)) 198 return block 199 200 201 def _create_new_channel_slot(slots, send_quota): 202 if slots < 0 or send_quota < 0: 203 raise ValueError('slots and send_quota must be non-negative.') 204 first_byte = _MUX_OPCODE_NEW_CHANNEL_SLOT << 5 205 block = (chr(first_byte) + 206 _encode_number(slots) + 207 _encode_number(send_quota)) 208 return block 209 210 211 def _create_fallback_new_channel_slot(): 212 first_byte = (_MUX_OPCODE_NEW_CHANNEL_SLOT << 5) | 1 # Set the F flag 213 block = (chr(first_byte) + _encode_number(0) + _encode_number(0)) 214 return block 215 216 217 def _parse_request_text(request_text): 218 request_line, header_lines = request_text.split('\r\n', 1) 219 220 words = request_line.split(' ') 221 if len(words) != 3: 222 raise ValueError('Bad Request-Line syntax %r' % request_line) 223 [command, path, version] = words 224 if version != 'HTTP/1.1': 225 raise ValueError('Bad request version %r' % version) 226 227 # email.parser.Parser() parses RFC 2822 (RFC 822) style headers. 228 # RFC 6455 refers RFC 2616 for handshake parsing, and RFC 2616 refers 229 # RFC 822. 230 headers = email.parser.Parser().parsestr(header_lines) 231 return command, path, version, headers 232 233 234 class _ControlBlock(object): 235 """A structure that holds parsing result of multiplexing control block. 236 Control block specific attributes will be added by _MuxFramePayloadParser. 237 (e.g. encoded_handshake will be added for AddChannelRequest and 238 AddChannelResponse) 239 """ 240 241 def __init__(self, opcode): 242 self.opcode = opcode 243 244 245 class _MuxFramePayloadParser(object): 246 """A class that parses multiplexed frame payload.""" 247 248 def __init__(self, payload): 249 self._data = payload 250 self._read_position = 0 251 self._logger = util.get_class_logger(self) 252 253 def read_channel_id(self): 254 """Reads channel id. 255 256 Raises: 257 ValueError: when the payload doesn't contain 258 valid channel id. 259 """ 260 261 remaining_length = len(self._data) - self._read_position 262 pos = self._read_position 263 if remaining_length == 0: 264 raise ValueError('Invalid channel id format') 265 266 channel_id = ord(self._data[pos]) 267 channel_id_length = 1 268 if channel_id & 0xe0 == 0xe0: 269 if remaining_length < 4: 270 raise ValueError('Invalid channel id format') 271 channel_id = struct.unpack('!L', 272 self._data[pos:pos+4])[0] & 0x1fffffff 273 channel_id_length = 4 274 elif channel_id & 0xc0 == 0xc0: 275 if remaining_length < 3: 276 raise ValueError('Invalid channel id format') 277 channel_id = (((channel_id & 0x1f) << 16) + 278 struct.unpack('!H', self._data[pos+1:pos+3])[0]) 279 channel_id_length = 3 280 elif channel_id & 0x80 == 0x80: 281 if remaining_length < 2: 282 raise ValueError('Invalid channel id format') 283 channel_id = struct.unpack('!H', 284 self._data[pos:pos+2])[0] & 0x3fff 285 channel_id_length = 2 286 self._read_position += channel_id_length 287 288 return channel_id 289 290 def read_inner_frame(self): 291 """Reads an inner frame. 292 293 Raises: 294 PhysicalConnectionError: when the inner frame is invalid. 295 """ 296 297 if len(self._data) == self._read_position: 298 raise PhysicalConnectionError( 299 _DROP_CODE_ENCAPSULATED_FRAME_IS_TRUNCATED) 300 301 bits = ord(self._data[self._read_position]) 302 self._read_position += 1 303 fin = (bits & 0x80) == 0x80 304 rsv1 = (bits & 0x40) == 0x40 305 rsv2 = (bits & 0x20) == 0x20 306 rsv3 = (bits & 0x10) == 0x10 307 opcode = bits & 0xf 308 payload = self.remaining_data() 309 # Consume rest of the message which is payload data of the original 310 # frame. 311 self._read_position = len(self._data) 312 return fin, rsv1, rsv2, rsv3, opcode, payload 313 314 def _read_number(self): 315 if self._read_position + 1 > len(self._data): 316 raise ValueError( 317 'Cannot read the first byte of number field') 318 319 number = ord(self._data[self._read_position]) 320 if number & 0x80 == 0x80: 321 raise ValueError( 322 'The most significant bit of the first byte of number should ' 323 'be unset') 324 self._read_position += 1 325 pos = self._read_position 326 if number == 127: 327 if pos + 8 > len(self._data): 328 raise ValueError('Invalid number field') 329 self._read_position += 8 330 number = struct.unpack('!Q', self._data[pos:pos+8])[0] 331 if number > 0x7FFFFFFFFFFFFFFF: 332 raise ValueError('Encoded number(%d) >= 2^63' % number) 333 if number <= 0xFFFF: 334 raise ValueError( 335 '%d should not be encoded by 9 bytes encoding' % number) 336 return number 337 if number == 126: 338 if pos + 2 > len(self._data): 339 raise ValueError('Invalid number field') 340 self._read_position += 2 341 number = struct.unpack('!H', self._data[pos:pos+2])[0] 342 if number <= 125: 343 raise ValueError( 344 '%d should not be encoded by 3 bytes encoding' % number) 345 return number 346 347 def _read_size_and_contents(self): 348 """Reads data that consists of followings: 349 - the size of the contents encoded the same way as payload length 350 of the WebSocket Protocol with 1 bit padding at the head. 351 - the contents. 352 """ 353 354 try: 355 size = self._read_number() 356 except ValueError, e: 357 raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 358 str(e)) 359 pos = self._read_position 360 if pos + size > len(self._data): 361 raise PhysicalConnectionError( 362 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 363 'Cannot read %d bytes data' % size) 364 365 self._read_position += size 366 return self._data[pos:pos+size] 367 368 def _read_add_channel_request(self, first_byte, control_block): 369 reserved = (first_byte >> 2) & 0x7 370 if reserved != 0: 371 raise PhysicalConnectionError( 372 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 373 'Reserved bits must be unset') 374 375 # Invalid encoding will be handled by MuxHandler. 376 encoding = first_byte & 0x3 377 try: 378 control_block.channel_id = self.read_channel_id() 379 except ValueError, e: 380 raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) 381 control_block.encoding = encoding 382 encoded_handshake = self._read_size_and_contents() 383 control_block.encoded_handshake = encoded_handshake 384 return control_block 385 386 def _read_add_channel_response(self, first_byte, control_block): 387 reserved = (first_byte >> 2) & 0x3 388 if reserved != 0: 389 raise PhysicalConnectionError( 390 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 391 'Reserved bits must be unset') 392 393 control_block.accepted = (first_byte >> 4) & 1 394 control_block.encoding = first_byte & 0x3 395 try: 396 control_block.channel_id = self.read_channel_id() 397 except ValueError, e: 398 raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) 399 control_block.encoded_handshake = self._read_size_and_contents() 400 return control_block 401 402 def _read_flow_control(self, first_byte, control_block): 403 reserved = first_byte & 0x1f 404 if reserved != 0: 405 raise PhysicalConnectionError( 406 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 407 'Reserved bits must be unset') 408 409 try: 410 control_block.channel_id = self.read_channel_id() 411 control_block.send_quota = self._read_number() 412 except ValueError, e: 413 raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 414 str(e)) 415 416 return control_block 417 418 def _read_drop_channel(self, first_byte, control_block): 419 reserved = first_byte & 0x1f 420 if reserved != 0: 421 raise PhysicalConnectionError( 422 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 423 'Reserved bits must be unset') 424 425 try: 426 control_block.channel_id = self.read_channel_id() 427 except ValueError, e: 428 raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK) 429 reason = self._read_size_and_contents() 430 if len(reason) == 0: 431 control_block.drop_code = None 432 control_block.drop_message = '' 433 elif len(reason) >= 2: 434 control_block.drop_code = struct.unpack('!H', reason[:2])[0] 435 control_block.drop_message = reason[2:] 436 else: 437 raise PhysicalConnectionError( 438 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 439 'Received DropChannel that conains only 1-byte reason') 440 return control_block 441 442 def _read_new_channel_slot(self, first_byte, control_block): 443 reserved = first_byte & 0x1e 444 if reserved != 0: 445 raise PhysicalConnectionError( 446 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 447 'Reserved bits must be unset') 448 control_block.fallback = first_byte & 1 449 try: 450 control_block.slots = self._read_number() 451 control_block.send_quota = self._read_number() 452 except ValueError, e: 453 raise PhysicalConnectionError(_DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 454 str(e)) 455 return control_block 456 457 def read_control_blocks(self): 458 """Reads control block(s). 459 460 Raises: 461 PhysicalConnectionError: when the payload contains invalid control 462 block(s). 463 StopIteration: when no control blocks left. 464 """ 465 466 while self._read_position < len(self._data): 467 first_byte = ord(self._data[self._read_position]) 468 self._read_position += 1 469 opcode = (first_byte >> 5) & 0x7 470 control_block = _ControlBlock(opcode=opcode) 471 if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: 472 yield self._read_add_channel_request(first_byte, control_block) 473 elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 474 yield self._read_add_channel_response( 475 first_byte, control_block) 476 elif opcode == _MUX_OPCODE_FLOW_CONTROL: 477 yield self._read_flow_control(first_byte, control_block) 478 elif opcode == _MUX_OPCODE_DROP_CHANNEL: 479 yield self._read_drop_channel(first_byte, control_block) 480 elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 481 yield self._read_new_channel_slot(first_byte, control_block) 482 else: 483 raise PhysicalConnectionError( 484 _DROP_CODE_UNKNOWN_MUX_OPCODE, 485 'Invalid opcode %d' % opcode) 486 487 assert self._read_position == len(self._data) 488 raise StopIteration 489 490 def remaining_data(self): 491 """Returns remaining data.""" 492 493 return self._data[self._read_position:] 494 495 496 class _LogicalRequest(object): 497 """Mimics mod_python request.""" 498 499 def __init__(self, channel_id, command, path, protocol, headers, 500 connection): 501 """Constructs an instance. 502 503 Args: 504 channel_id: the channel id of the logical channel. 505 command: HTTP request command. 506 path: HTTP request path. 507 headers: HTTP headers. 508 connection: _LogicalConnection instance. 509 """ 510 511 self.channel_id = channel_id 512 self.method = command 513 self.uri = path 514 self.protocol = protocol 515 self.headers_in = headers 516 self.connection = connection 517 self.server_terminated = False 518 self.client_terminated = False 519 520 def is_https(self): 521 """Mimics request.is_https(). Returns False because this method is 522 used only by old protocols (hixie and hybi00). 523 """ 524 525 return False 526 527 528 class _LogicalConnection(object): 529 """Mimics mod_python mp_conn.""" 530 531 # For details, see the comment of set_read_state(). 532 STATE_ACTIVE = 1 533 STATE_GRACEFULLY_CLOSED = 2 534 STATE_TERMINATED = 3 535 536 def __init__(self, mux_handler, channel_id): 537 """Constructs an instance. 538 539 Args: 540 mux_handler: _MuxHandler instance. 541 channel_id: channel id of this connection. 542 """ 543 544 self._mux_handler = mux_handler 545 self._channel_id = channel_id 546 self._incoming_data = '' 547 548 # - Protects _waiting_write_completion 549 # - Signals the thread waiting for completion of write by mux handler 550 self._write_condition = threading.Condition() 551 self._waiting_write_completion = False 552 553 self._read_condition = threading.Condition() 554 self._read_state = self.STATE_ACTIVE 555 556 def get_local_addr(self): 557 """Getter to mimic mp_conn.local_addr.""" 558 559 return self._mux_handler.physical_connection.get_local_addr() 560 local_addr = property(get_local_addr) 561 562 def get_remote_addr(self): 563 """Getter to mimic mp_conn.remote_addr.""" 564 565 return self._mux_handler.physical_connection.get_remote_addr() 566 remote_addr = property(get_remote_addr) 567 568 def get_memorized_lines(self): 569 """Gets memorized lines. Not supported.""" 570 571 raise MuxUnexpectedException('_LogicalConnection does not support ' 572 'get_memorized_lines') 573 574 def write(self, data): 575 """Writes data. mux_handler sends data asynchronously. The caller will 576 be suspended until write done. 577 578 Args: 579 data: data to be written. 580 581 Raises: 582 MuxUnexpectedException: when called before finishing the previous 583 write. 584 """ 585 586 try: 587 self._write_condition.acquire() 588 if self._waiting_write_completion: 589 raise MuxUnexpectedException( 590 'Logical connection %d is already waiting the completion ' 591 'of write' % self._channel_id) 592 593 self._waiting_write_completion = True 594 self._mux_handler.send_data(self._channel_id, data) 595 self._write_condition.wait() 596 # TODO(tyoshino): Raise an exception if woke up by on_writer_done. 597 finally: 598 self._write_condition.release() 599 600 def write_control_data(self, data): 601 """Writes data via the control channel. Don't wait finishing write 602 because this method can be called by mux dispatcher. 603 604 Args: 605 data: data to be written. 606 """ 607 608 self._mux_handler.send_control_data(data) 609 610 def on_write_data_done(self): 611 """Called when sending data is completed.""" 612 613 try: 614 self._write_condition.acquire() 615 if not self._waiting_write_completion: 616 raise MuxUnexpectedException( 617 'Invalid call of on_write_data_done for logical ' 618 'connection %d' % self._channel_id) 619 self._waiting_write_completion = False 620 self._write_condition.notify() 621 finally: 622 self._write_condition.release() 623 624 def on_writer_done(self): 625 """Called by the mux handler when the writer thread has finished.""" 626 627 try: 628 self._write_condition.acquire() 629 self._waiting_write_completion = False 630 self._write_condition.notify() 631 finally: 632 self._write_condition.release() 633 634 635 def append_frame_data(self, frame_data): 636 """Appends incoming frame data. Called when mux_handler dispatches 637 frame data to the corresponding application. 638 639 Args: 640 frame_data: incoming frame data. 641 """ 642 643 self._read_condition.acquire() 644 self._incoming_data += frame_data 645 self._read_condition.notify() 646 self._read_condition.release() 647 648 def read(self, length): 649 """Reads data. Blocks until enough data has arrived via physical 650 connection. 651 652 Args: 653 length: length of data to be read. 654 Raises: 655 LogicalConnectionClosedException: when closing handshake for this 656 logical channel has been received. 657 ConnectionTerminatedException: when the physical connection has 658 closed, or an error is caused on the reader thread. 659 """ 660 661 self._read_condition.acquire() 662 while (self._read_state == self.STATE_ACTIVE and 663 len(self._incoming_data) < length): 664 self._read_condition.wait() 665 666 try: 667 if self._read_state == self.STATE_GRACEFULLY_CLOSED: 668 raise LogicalConnectionClosedException( 669 'Logical channel %d has closed.' % self._channel_id) 670 elif self._read_state == self.STATE_TERMINATED: 671 raise ConnectionTerminatedException( 672 'Receiving %d byte failed. Logical channel (%d) closed' % 673 (length, self._channel_id)) 674 675 value = self._incoming_data[:length] 676 self._incoming_data = self._incoming_data[length:] 677 finally: 678 self._read_condition.release() 679 680 return value 681 682 def set_read_state(self, new_state): 683 """Sets the state of this connection. Called when an event for this 684 connection has occurred. 685 686 Args: 687 new_state: state to be set. new_state must be one of followings: 688 - STATE_GRACEFULLY_CLOSED: when closing handshake for this 689 connection has been received. 690 - STATE_TERMINATED: when the physical connection has closed or 691 DropChannel of this connection has received. 692 """ 693 694 self._read_condition.acquire() 695 self._read_state = new_state 696 self._read_condition.notify() 697 self._read_condition.release() 698 699 700 class _InnerMessage(object): 701 """Holds the result of _InnerMessageBuilder.build(). 702 """ 703 704 def __init__(self, opcode, payload): 705 self.opcode = opcode 706 self.payload = payload 707 708 709 class _InnerMessageBuilder(object): 710 """A class that holds the context of inner message fragmentation and 711 builds a message from fragmented inner frame(s). 712 """ 713 714 def __init__(self): 715 self._control_opcode = None 716 self._pending_control_fragments = [] 717 self._message_opcode = None 718 self._pending_message_fragments = [] 719 self._frame_handler = self._handle_first 720 721 def _handle_first(self, frame): 722 if frame.opcode == common.OPCODE_CONTINUATION: 723 raise InvalidFrameException('Sending invalid continuation opcode') 724 725 if common.is_control_opcode(frame.opcode): 726 return self._process_first_fragmented_control(frame) 727 else: 728 return self._process_first_fragmented_message(frame) 729 730 def _process_first_fragmented_control(self, frame): 731 self._control_opcode = frame.opcode 732 self._pending_control_fragments.append(frame.payload) 733 if not frame.fin: 734 self._frame_handler = self._handle_fragmented_control 735 return None 736 return self._reassemble_fragmented_control() 737 738 def _process_first_fragmented_message(self, frame): 739 self._message_opcode = frame.opcode 740 self._pending_message_fragments.append(frame.payload) 741 if not frame.fin: 742 self._frame_handler = self._handle_fragmented_message 743 return None 744 return self._reassemble_fragmented_message() 745 746 def _handle_fragmented_control(self, frame): 747 if frame.opcode != common.OPCODE_CONTINUATION: 748 raise InvalidFrameException( 749 'Sending invalid opcode %d while sending fragmented control ' 750 'message' % frame.opcode) 751 self._pending_control_fragments.append(frame.payload) 752 if not frame.fin: 753 return None 754 return self._reassemble_fragmented_control() 755 756 def _reassemble_fragmented_control(self): 757 opcode = self._control_opcode 758 payload = ''.join(self._pending_control_fragments) 759 self._control_opcode = None 760 self._pending_control_fragments = [] 761 if self._message_opcode is not None: 762 self._frame_handler = self._handle_fragmented_message 763 else: 764 self._frame_handler = self._handle_first 765 return _InnerMessage(opcode, payload) 766 767 def _handle_fragmented_message(self, frame): 768 # Sender can interleave a control message while sending fragmented 769 # messages. 770 if common.is_control_opcode(frame.opcode): 771 if self._control_opcode is not None: 772 raise MuxUnexpectedException( 773 'Should not reach here(Bug in builder)') 774 return self._process_first_fragmented_control(frame) 775 776 if frame.opcode != common.OPCODE_CONTINUATION: 777 raise InvalidFrameException( 778 'Sending invalid opcode %d while sending fragmented message' % 779 frame.opcode) 780 self._pending_message_fragments.append(frame.payload) 781 if not frame.fin: 782 return None 783 return self._reassemble_fragmented_message() 784 785 def _reassemble_fragmented_message(self): 786 opcode = self._message_opcode 787 payload = ''.join(self._pending_message_fragments) 788 self._message_opcode = None 789 self._pending_message_fragments = [] 790 self._frame_handler = self._handle_first 791 return _InnerMessage(opcode, payload) 792 793 def build(self, frame): 794 """Build an inner message. Returns an _InnerMessage instance when 795 the given frame is the last fragmented frame. Returns None otherwise. 796 797 Args: 798 frame: an inner frame. 799 Raises: 800 InvalidFrameException: when received invalid opcode. (e.g. 801 receiving non continuation data opcode but the fin flag of 802 the previous inner frame was not set.) 803 """ 804 805 return self._frame_handler(frame) 806 807 808 class _LogicalStream(Stream): 809 """Mimics the Stream class. This class interprets multiplexed WebSocket 810 frames. 811 """ 812 813 def __init__(self, request, stream_options, send_quota, receive_quota): 814 """Constructs an instance. 815 816 Args: 817 request: _LogicalRequest instance. 818 stream_options: StreamOptions instance. 819 send_quota: Initial send quota. 820 receive_quota: Initial receive quota. 821 """ 822 823 # Physical stream is responsible for masking. 824 stream_options.unmask_receive = False 825 Stream.__init__(self, request, stream_options) 826 827 self._send_closed = False 828 self._send_quota = send_quota 829 # - Protects _send_closed and _send_quota 830 # - Signals the thread waiting for send quota replenished 831 self._send_condition = threading.Condition() 832 833 # The opcode of the first frame in messages. 834 self._message_opcode = common.OPCODE_TEXT 835 # True when the last message was fragmented. 836 self._last_message_was_fragmented = False 837 838 self._receive_quota = receive_quota 839 self._write_inner_frame_semaphore = threading.Semaphore() 840 841 self._inner_message_builder = _InnerMessageBuilder() 842 843 def _create_inner_frame(self, opcode, payload, end=True): 844 frame = Frame(fin=end, opcode=opcode, payload=payload) 845 for frame_filter in self._options.outgoing_frame_filters: 846 frame_filter.filter(frame) 847 848 if len(payload) != len(frame.payload): 849 raise MuxUnexpectedException( 850 'Mux extension must not be used after extensions which change ' 851 ' frame boundary') 852 853 first_byte = ((frame.fin << 7) | (frame.rsv1 << 6) | 854 (frame.rsv2 << 5) | (frame.rsv3 << 4) | frame.opcode) 855 return chr(first_byte) + frame.payload 856 857 def _write_inner_frame(self, opcode, payload, end=True): 858 payload_length = len(payload) 859 write_position = 0 860 861 try: 862 # An inner frame will be fragmented if there is no enough send 863 # quota. This semaphore ensures that fragmented inner frames are 864 # sent in order on the logical channel. 865 # Note that frames that come from other logical channels or 866 # multiplexing control blocks can be inserted between fragmented 867 # inner frames on the physical channel. 868 self._write_inner_frame_semaphore.acquire() 869 870 # Consume an octet quota when this is the first fragmented frame. 871 if opcode != common.OPCODE_CONTINUATION: 872 try: 873 self._send_condition.acquire() 874 while (not self._send_closed) and self._send_quota == 0: 875 self._send_condition.wait() 876 877 if self._send_closed: 878 raise BadOperationException( 879 'Logical connection %d is closed' % 880 self._request.channel_id) 881 882 self._send_quota -= 1 883 finally: 884 self._send_condition.release() 885 886 while write_position < payload_length: 887 try: 888 self._send_condition.acquire() 889 while (not self._send_closed) and self._send_quota == 0: 890 self._logger.debug( 891 'No quota. Waiting FlowControl message for %d.' % 892 self._request.channel_id) 893 self._send_condition.wait() 894 895 if self._send_closed: 896 raise BadOperationException( 897 'Logical connection %d is closed' % 898 self.request._channel_id) 899 900 remaining = payload_length - write_position 901 write_length = min(self._send_quota, remaining) 902 inner_frame_end = ( 903 end and 904 (write_position + write_length == payload_length)) 905 906 inner_frame = self._create_inner_frame( 907 opcode, 908 payload[write_position:write_position+write_length], 909 inner_frame_end) 910 self._send_quota -= write_length 911 self._logger.debug('Consumed quota=%d, remaining=%d' % 912 (write_length, self._send_quota)) 913 finally: 914 self._send_condition.release() 915 916 # Writing data will block the worker so we need to release 917 # _send_condition before writing. 918 self._logger.debug('Sending inner frame: %r' % inner_frame) 919 self._request.connection.write(inner_frame) 920 write_position += write_length 921 922 opcode = common.OPCODE_CONTINUATION 923 924 except ValueError, e: 925 raise BadOperationException(e) 926 finally: 927 self._write_inner_frame_semaphore.release() 928 929 def replenish_send_quota(self, send_quota): 930 """Replenish send quota.""" 931 932 try: 933 self._send_condition.acquire() 934 if self._send_quota + send_quota > 0x7FFFFFFFFFFFFFFF: 935 self._send_quota = 0 936 raise LogicalChannelError( 937 self._request.channel_id, _DROP_CODE_SEND_QUOTA_OVERFLOW) 938 self._send_quota += send_quota 939 self._logger.debug('Replenished send quota for channel id %d: %d' % 940 (self._request.channel_id, self._send_quota)) 941 finally: 942 self._send_condition.notify() 943 self._send_condition.release() 944 945 def consume_receive_quota(self, amount): 946 """Consumes receive quota. Returns False on failure.""" 947 948 if self._receive_quota < amount: 949 self._logger.debug('Violate quota on channel id %d: %d < %d' % 950 (self._request.channel_id, 951 self._receive_quota, amount)) 952 return False 953 self._receive_quota -= amount 954 return True 955 956 def send_message(self, message, end=True, binary=False): 957 """Override Stream.send_message.""" 958 959 if self._request.server_terminated: 960 raise BadOperationException( 961 'Requested send_message after sending out a closing handshake') 962 963 if binary and isinstance(message, unicode): 964 raise BadOperationException( 965 'Message for binary frame must be instance of str') 966 967 if binary: 968 opcode = common.OPCODE_BINARY 969 else: 970 opcode = common.OPCODE_TEXT 971 message = message.encode('utf-8') 972 973 for message_filter in self._options.outgoing_message_filters: 974 message = message_filter.filter(message, end, binary) 975 976 if self._last_message_was_fragmented: 977 if opcode != self._message_opcode: 978 raise BadOperationException('Message types are different in ' 979 'frames for the same message') 980 opcode = common.OPCODE_CONTINUATION 981 else: 982 self._message_opcode = opcode 983 984 self._write_inner_frame(opcode, message, end) 985 self._last_message_was_fragmented = not end 986 987 def _receive_frame(self): 988 """Overrides Stream._receive_frame. 989 990 In addition to call Stream._receive_frame, this method adds the amount 991 of payload to receiving quota and sends FlowControl to the client. 992 We need to do it here because Stream.receive_message() handles 993 control frames internally. 994 """ 995 996 opcode, payload, fin, rsv1, rsv2, rsv3 = Stream._receive_frame(self) 997 amount = len(payload) 998 # Replenish extra one octet when receiving the first fragmented frame. 999 if opcode != common.OPCODE_CONTINUATION: 1000 amount += 1 1001 self._receive_quota += amount 1002 frame_data = _create_flow_control(self._request.channel_id, 1003 amount) 1004 self._logger.debug('Sending flow control for %d, replenished=%d' % 1005 (self._request.channel_id, amount)) 1006 self._request.connection.write_control_data(frame_data) 1007 return opcode, payload, fin, rsv1, rsv2, rsv3 1008 1009 def _get_message_from_frame(self, frame): 1010 """Overrides Stream._get_message_from_frame. 1011 """ 1012 1013 try: 1014 inner_message = self._inner_message_builder.build(frame) 1015 except InvalidFrameException: 1016 raise LogicalChannelError( 1017 self._request.channel_id, _DROP_CODE_BAD_FRAGMENTATION) 1018 1019 if inner_message is None: 1020 return None 1021 self._original_opcode = inner_message.opcode 1022 return inner_message.payload 1023 1024 def receive_message(self): 1025 """Overrides Stream.receive_message.""" 1026 1027 # Just call Stream.receive_message(), but catch 1028 # LogicalConnectionClosedException, which is raised when the logical 1029 # connection has closed gracefully. 1030 try: 1031 return Stream.receive_message(self) 1032 except LogicalConnectionClosedException, e: 1033 self._logger.debug('%s', e) 1034 return None 1035 1036 def _send_closing_handshake(self, code, reason): 1037 """Overrides Stream._send_closing_handshake.""" 1038 1039 body = create_closing_handshake_body(code, reason) 1040 self._logger.debug('Sending closing handshake for %d: (%r, %r)' % 1041 (self._request.channel_id, code, reason)) 1042 self._write_inner_frame(common.OPCODE_CLOSE, body, end=True) 1043 1044 self._request.server_terminated = True 1045 1046 def send_ping(self, body=''): 1047 """Overrides Stream.send_ping""" 1048 1049 self._logger.debug('Sending ping on logical channel %d: %r' % 1050 (self._request.channel_id, body)) 1051 self._write_inner_frame(common.OPCODE_PING, body, end=True) 1052 1053 self._ping_queue.append(body) 1054 1055 def _send_pong(self, body): 1056 """Overrides Stream._send_pong""" 1057 1058 self._logger.debug('Sending pong on logical channel %d: %r' % 1059 (self._request.channel_id, body)) 1060 self._write_inner_frame(common.OPCODE_PONG, body, end=True) 1061 1062 def close_connection(self, code=common.STATUS_NORMAL_CLOSURE, reason=''): 1063 """Overrides Stream.close_connection.""" 1064 1065 # TODO(bashi): Implement 1066 self._logger.debug('Closing logical connection %d' % 1067 self._request.channel_id) 1068 self._request.server_terminated = True 1069 1070 def stop_sending(self): 1071 """Stops accepting new send operation (_write_inner_frame).""" 1072 1073 self._send_condition.acquire() 1074 self._send_closed = True 1075 self._send_condition.notify() 1076 self._send_condition.release() 1077 1078 1079 class _OutgoingData(object): 1080 """A structure that holds data to be sent via physical connection and 1081 origin of the data. 1082 """ 1083 1084 def __init__(self, channel_id, data): 1085 self.channel_id = channel_id 1086 self.data = data 1087 1088 1089 class _PhysicalConnectionWriter(threading.Thread): 1090 """A thread that is responsible for writing data to physical connection. 1091 1092 TODO(bashi): Make sure there is no thread-safety problem when the reader 1093 thread reads data from the same socket at a time. 1094 """ 1095 1096 def __init__(self, mux_handler): 1097 """Constructs an instance. 1098 1099 Args: 1100 mux_handler: _MuxHandler instance. 1101 """ 1102 1103 threading.Thread.__init__(self) 1104 self._logger = util.get_class_logger(self) 1105 self._mux_handler = mux_handler 1106 self.setDaemon(True) 1107 1108 # When set, make this thread stop accepting new data, flush pending 1109 # data and exit. 1110 self._stop_requested = False 1111 # The close code of the physical connection. 1112 self._close_code = common.STATUS_NORMAL_CLOSURE 1113 # Deque for passing write data. It's protected by _deque_condition 1114 # until _stop_requested is set. 1115 self._deque = collections.deque() 1116 # - Protects _deque, _stop_requested and _close_code 1117 # - Signals threads waiting for them to be available 1118 self._deque_condition = threading.Condition() 1119 1120 def put_outgoing_data(self, data): 1121 """Puts outgoing data. 1122 1123 Args: 1124 data: _OutgoingData instance. 1125 1126 Raises: 1127 BadOperationException: when the thread has been requested to 1128 terminate. 1129 """ 1130 1131 try: 1132 self._deque_condition.acquire() 1133 if self._stop_requested: 1134 raise BadOperationException('Cannot write data anymore') 1135 1136 self._deque.append(data) 1137 self._deque_condition.notify() 1138 finally: 1139 self._deque_condition.release() 1140 1141 def _write_data(self, outgoing_data): 1142 message = (_encode_channel_id(outgoing_data.channel_id) + 1143 outgoing_data.data) 1144 try: 1145 self._mux_handler.physical_stream.send_message( 1146 message=message, end=True, binary=True) 1147 except Exception, e: 1148 util.prepend_message_to_exception( 1149 'Failed to send message to %r: ' % 1150 (self._mux_handler.physical_connection.remote_addr,), e) 1151 raise 1152 1153 # TODO(bashi): It would be better to block the thread that sends 1154 # control data as well. 1155 if outgoing_data.channel_id != _CONTROL_CHANNEL_ID: 1156 self._mux_handler.notify_write_data_done(outgoing_data.channel_id) 1157 1158 def run(self): 1159 try: 1160 self._deque_condition.acquire() 1161 while not self._stop_requested: 1162 if len(self._deque) == 0: 1163 self._deque_condition.wait() 1164 continue 1165 1166 outgoing_data = self._deque.popleft() 1167 1168 self._deque_condition.release() 1169 self._write_data(outgoing_data) 1170 self._deque_condition.acquire() 1171 1172 # Flush deque. 1173 # 1174 # At this point, self._deque_condition is always acquired. 1175 try: 1176 while len(self._deque) > 0: 1177 outgoing_data = self._deque.popleft() 1178 self._write_data(outgoing_data) 1179 finally: 1180 self._deque_condition.release() 1181 1182 # Close physical connection. 1183 try: 1184 # Don't wait the response here. The response will be read 1185 # by the reader thread. 1186 self._mux_handler.physical_stream.close_connection( 1187 self._close_code, wait_response=False) 1188 except Exception, e: 1189 util.prepend_message_to_exception( 1190 'Failed to close the physical connection: %r' % e) 1191 raise 1192 finally: 1193 self._mux_handler.notify_writer_done() 1194 1195 def stop(self, close_code=common.STATUS_NORMAL_CLOSURE): 1196 """Stops the writer thread.""" 1197 1198 self._deque_condition.acquire() 1199 self._stop_requested = True 1200 self._close_code = close_code 1201 self._deque_condition.notify() 1202 self._deque_condition.release() 1203 1204 1205 class _PhysicalConnectionReader(threading.Thread): 1206 """A thread that is responsible for reading data from physical connection. 1207 """ 1208 1209 def __init__(self, mux_handler): 1210 """Constructs an instance. 1211 1212 Args: 1213 mux_handler: _MuxHandler instance. 1214 """ 1215 1216 threading.Thread.__init__(self) 1217 self._logger = util.get_class_logger(self) 1218 self._mux_handler = mux_handler 1219 self.setDaemon(True) 1220 1221 def run(self): 1222 while True: 1223 try: 1224 physical_stream = self._mux_handler.physical_stream 1225 message = physical_stream.receive_message() 1226 if message is None: 1227 break 1228 # Below happens only when a data message is received. 1229 opcode = physical_stream.get_last_received_opcode() 1230 if opcode != common.OPCODE_BINARY: 1231 self._mux_handler.fail_physical_connection( 1232 _DROP_CODE_INVALID_ENCAPSULATING_MESSAGE, 1233 'Received a text message on physical connection') 1234 break 1235 1236 except ConnectionTerminatedException, e: 1237 self._logger.debug('%s', e) 1238 break 1239 1240 try: 1241 self._mux_handler.dispatch_message(message) 1242 except PhysicalConnectionError, e: 1243 self._mux_handler.fail_physical_connection( 1244 e.drop_code, e.message) 1245 break 1246 except LogicalChannelError, e: 1247 self._mux_handler.fail_logical_channel( 1248 e.channel_id, e.drop_code, e.message) 1249 except Exception, e: 1250 self._logger.debug(traceback.format_exc()) 1251 break 1252 1253 self._mux_handler.notify_reader_done() 1254 1255 1256 class _Worker(threading.Thread): 1257 """A thread that is responsible for running the corresponding application 1258 handler. 1259 """ 1260 1261 def __init__(self, mux_handler, request): 1262 """Constructs an instance. 1263 1264 Args: 1265 mux_handler: _MuxHandler instance. 1266 request: _LogicalRequest instance. 1267 """ 1268 1269 threading.Thread.__init__(self) 1270 self._logger = util.get_class_logger(self) 1271 self._mux_handler = mux_handler 1272 self._request = request 1273 self.setDaemon(True) 1274 1275 def run(self): 1276 self._logger.debug('Logical channel worker started. (id=%d)' % 1277 self._request.channel_id) 1278 try: 1279 # Non-critical exceptions will be handled by dispatcher. 1280 self._mux_handler.dispatcher.transfer_data(self._request) 1281 except LogicalChannelError, e: 1282 self._mux_handler.fail_logical_channel( 1283 e.channel_id, e.drop_code, e.message) 1284 finally: 1285 self._mux_handler.notify_worker_done(self._request.channel_id) 1286 1287 1288 class _MuxHandshaker(hybi.Handshaker): 1289 """Opening handshake processor for multiplexing.""" 1290 1291 _DUMMY_WEBSOCKET_KEY = 'dGhlIHNhbXBsZSBub25jZQ==' 1292 1293 def __init__(self, request, dispatcher, send_quota, receive_quota): 1294 """Constructs an instance. 1295 Args: 1296 request: _LogicalRequest instance. 1297 dispatcher: Dispatcher instance (dispatch.Dispatcher). 1298 send_quota: Initial send quota. 1299 receive_quota: Initial receive quota. 1300 """ 1301 1302 hybi.Handshaker.__init__(self, request, dispatcher) 1303 self._send_quota = send_quota 1304 self._receive_quota = receive_quota 1305 1306 # Append headers which should not be included in handshake field of 1307 # AddChannelRequest. 1308 # TODO(bashi): Make sure whether we should raise exception when 1309 # these headers are included already. 1310 request.headers_in[common.UPGRADE_HEADER] = ( 1311 common.WEBSOCKET_UPGRADE_TYPE) 1312 request.headers_in[common.SEC_WEBSOCKET_VERSION_HEADER] = ( 1313 str(common.VERSION_HYBI_LATEST)) 1314 request.headers_in[common.SEC_WEBSOCKET_KEY_HEADER] = ( 1315 self._DUMMY_WEBSOCKET_KEY) 1316 1317 def _create_stream(self, stream_options): 1318 """Override hybi.Handshaker._create_stream.""" 1319 1320 self._logger.debug('Creating logical stream for %d' % 1321 self._request.channel_id) 1322 return _LogicalStream( 1323 self._request, stream_options, self._send_quota, 1324 self._receive_quota) 1325 1326 def _create_handshake_response(self, accept): 1327 """Override hybi._create_handshake_response.""" 1328 1329 response = [] 1330 1331 response.append('HTTP/1.1 101 Switching Protocols\r\n') 1332 1333 # Upgrade and Sec-WebSocket-Accept should be excluded. 1334 response.append('%s: %s\r\n' % ( 1335 common.CONNECTION_HEADER, common.UPGRADE_CONNECTION_TYPE)) 1336 if self._request.ws_protocol is not None: 1337 response.append('%s: %s\r\n' % ( 1338 common.SEC_WEBSOCKET_PROTOCOL_HEADER, 1339 self._request.ws_protocol)) 1340 if (self._request.ws_extensions is not None and 1341 len(self._request.ws_extensions) != 0): 1342 response.append('%s: %s\r\n' % ( 1343 common.SEC_WEBSOCKET_EXTENSIONS_HEADER, 1344 common.format_extensions(self._request.ws_extensions))) 1345 response.append('\r\n') 1346 1347 return ''.join(response) 1348 1349 def _send_handshake(self, accept): 1350 """Override hybi.Handshaker._send_handshake.""" 1351 1352 # Don't send handshake response for the default channel 1353 if self._request.channel_id == _DEFAULT_CHANNEL_ID: 1354 return 1355 1356 handshake_response = self._create_handshake_response(accept) 1357 frame_data = _create_add_channel_response( 1358 self._request.channel_id, 1359 handshake_response) 1360 self._logger.debug('Sending handshake response for %d: %r' % 1361 (self._request.channel_id, frame_data)) 1362 self._request.connection.write_control_data(frame_data) 1363 1364 1365 class _LogicalChannelData(object): 1366 """A structure that holds information about logical channel. 1367 """ 1368 1369 def __init__(self, request, worker): 1370 self.request = request 1371 self.worker = worker 1372 self.drop_code = _DROP_CODE_NORMAL_CLOSURE 1373 self.drop_message = '' 1374 1375 1376 class _HandshakeDeltaBase(object): 1377 """A class that holds information for delta-encoded handshake.""" 1378 1379 def __init__(self, headers): 1380 self._headers = headers 1381 1382 def create_headers(self, delta=None): 1383 """Creates request headers for an AddChannelRequest that has 1384 delta-encoded handshake. 1385 1386 Args: 1387 delta: headers should be overridden. 1388 """ 1389 1390 headers = copy.copy(self._headers) 1391 if delta: 1392 for key, value in delta.items(): 1393 # The spec requires that a header with an empty value is 1394 # removed from the delta base. 1395 if len(value) == 0 and headers.has_key(key): 1396 del headers[key] 1397 else: 1398 headers[key] = value 1399 return headers 1400 1401 1402 class _MuxHandler(object): 1403 """Multiplexing handler. When a handler starts, it launches three 1404 threads; the reader thread, the writer thread, and a worker thread. 1405 1406 The reader thread reads data from the physical stream, i.e., the 1407 ws_stream object of the underlying websocket connection. The reader 1408 thread interprets multiplexed frames and dispatches them to logical 1409 channels. Methods of this class are mostly called by the reader thread. 1410 1411 The writer thread sends multiplexed frames which are created by 1412 logical channels via the physical connection. 1413 1414 The worker thread launched at the starting point handles the 1415 "Implicitly Opened Connection". If multiplexing handler receives 1416 an AddChannelRequest and accepts it, the handler will launch a new worker 1417 thread and dispatch the request to it. 1418 """ 1419 1420 def __init__(self, request, dispatcher): 1421 """Constructs an instance. 1422 1423 Args: 1424 request: mod_python request of the physical connection. 1425 dispatcher: Dispatcher instance (dispatch.Dispatcher). 1426 """ 1427 1428 self.original_request = request 1429 self.dispatcher = dispatcher 1430 self.physical_connection = request.connection 1431 self.physical_stream = request.ws_stream 1432 self._logger = util.get_class_logger(self) 1433 self._logical_channels = {} 1434 self._logical_channels_condition = threading.Condition() 1435 # Holds client's initial quota 1436 self._channel_slots = collections.deque() 1437 self._handshake_base = None 1438 self._worker_done_notify_received = False 1439 self._reader = None 1440 self._writer = None 1441 1442 def start(self): 1443 """Starts the handler. 1444 1445 Raises: 1446 MuxUnexpectedException: when the handler already started, or when 1447 opening handshake of the default channel fails. 1448 """ 1449 1450 if self._reader or self._writer: 1451 raise MuxUnexpectedException('MuxHandler already started') 1452 1453 self._reader = _PhysicalConnectionReader(self) 1454 self._writer = _PhysicalConnectionWriter(self) 1455 self._reader.start() 1456 self._writer.start() 1457 1458 # Create "Implicitly Opened Connection". 1459 logical_connection = _LogicalConnection(self, _DEFAULT_CHANNEL_ID) 1460 headers = copy.copy(self.original_request.headers_in) 1461 # Add extensions for logical channel. 1462 headers[common.SEC_WEBSOCKET_EXTENSIONS_HEADER] = ( 1463 common.format_extensions( 1464 self.original_request.mux_processor.extensions())) 1465 self._handshake_base = _HandshakeDeltaBase(headers) 1466 logical_request = _LogicalRequest( 1467 _DEFAULT_CHANNEL_ID, 1468 self.original_request.method, 1469 self.original_request.uri, 1470 self.original_request.protocol, 1471 self._handshake_base.create_headers(), 1472 logical_connection) 1473 # Client's send quota for the implicitly opened connection is zero, 1474 # but we will send FlowControl later so set the initial quota to 1475 # _INITIAL_QUOTA_FOR_CLIENT. 1476 self._channel_slots.append(_INITIAL_QUOTA_FOR_CLIENT) 1477 send_quota = self.original_request.mux_processor.quota() 1478 if not self._do_handshake_for_logical_request( 1479 logical_request, send_quota=send_quota): 1480 raise MuxUnexpectedException( 1481 'Failed handshake on the default channel id') 1482 self._add_logical_channel(logical_request) 1483 1484 # Send FlowControl for the implicitly opened connection. 1485 frame_data = _create_flow_control(_DEFAULT_CHANNEL_ID, 1486 _INITIAL_QUOTA_FOR_CLIENT) 1487 logical_request.connection.write_control_data(frame_data) 1488 1489 def add_channel_slots(self, slots, send_quota): 1490 """Adds channel slots. 1491 1492 Args: 1493 slots: number of slots to be added. 1494 send_quota: initial send quota for slots. 1495 """ 1496 1497 self._channel_slots.extend([send_quota] * slots) 1498 # Send NewChannelSlot to client. 1499 frame_data = _create_new_channel_slot(slots, send_quota) 1500 self.send_control_data(frame_data) 1501 1502 def wait_until_done(self, timeout=None): 1503 """Waits until all workers are done. Returns False when timeout has 1504 occurred. Returns True on success. 1505 1506 Args: 1507 timeout: timeout in sec. 1508 """ 1509 1510 self._logical_channels_condition.acquire() 1511 try: 1512 while len(self._logical_channels) > 0: 1513 self._logger.debug('Waiting workers(%d)...' % 1514 len(self._logical_channels)) 1515 self._worker_done_notify_received = False 1516 self._logical_channels_condition.wait(timeout) 1517 if not self._worker_done_notify_received: 1518 self._logger.debug('Waiting worker(s) timed out') 1519 return False 1520 finally: 1521 self._logical_channels_condition.release() 1522 1523 # Flush pending outgoing data 1524 self._writer.stop() 1525 self._writer.join() 1526 1527 return True 1528 1529 def notify_write_data_done(self, channel_id): 1530 """Called by the writer thread when a write operation has done. 1531 1532 Args: 1533 channel_id: objective channel id. 1534 """ 1535 1536 try: 1537 self._logical_channels_condition.acquire() 1538 if channel_id in self._logical_channels: 1539 channel_data = self._logical_channels[channel_id] 1540 channel_data.request.connection.on_write_data_done() 1541 else: 1542 self._logger.debug('Seems that logical channel for %d has gone' 1543 % channel_id) 1544 finally: 1545 self._logical_channels_condition.release() 1546 1547 def send_control_data(self, data): 1548 """Sends data via the control channel. 1549 1550 Args: 1551 data: data to be sent. 1552 """ 1553 1554 self._writer.put_outgoing_data(_OutgoingData( 1555 channel_id=_CONTROL_CHANNEL_ID, data=data)) 1556 1557 def send_data(self, channel_id, data): 1558 """Sends data via given logical channel. This method is called by 1559 worker threads. 1560 1561 Args: 1562 channel_id: objective channel id. 1563 data: data to be sent. 1564 """ 1565 1566 self._writer.put_outgoing_data(_OutgoingData( 1567 channel_id=channel_id, data=data)) 1568 1569 def _send_drop_channel(self, channel_id, code=None, message=''): 1570 frame_data = _create_drop_channel(channel_id, code, message) 1571 self._logger.debug( 1572 'Sending drop channel for channel id %d' % channel_id) 1573 self.send_control_data(frame_data) 1574 1575 def _send_error_add_channel_response(self, channel_id, status=None): 1576 if status is None: 1577 status = common.HTTP_STATUS_BAD_REQUEST 1578 1579 if status in _HTTP_BAD_RESPONSE_MESSAGES: 1580 message = _HTTP_BAD_RESPONSE_MESSAGES[status] 1581 else: 1582 self._logger.debug('Response message for %d is not found' % status) 1583 message = '???' 1584 1585 response = 'HTTP/1.1 %d %s\r\n\r\n' % (status, message) 1586 frame_data = _create_add_channel_response(channel_id, 1587 encoded_handshake=response, 1588 encoding=0, rejected=True) 1589 self.send_control_data(frame_data) 1590 1591 def _create_logical_request(self, block): 1592 if block.channel_id == _CONTROL_CHANNEL_ID: 1593 # TODO(bashi): Raise PhysicalConnectionError with code 2006 1594 # instead of MuxUnexpectedException. 1595 raise MuxUnexpectedException( 1596 'Received the control channel id (0) as objective channel ' 1597 'id for AddChannel') 1598 1599 if block.encoding > _HANDSHAKE_ENCODING_DELTA: 1600 raise PhysicalConnectionError( 1601 _DROP_CODE_UNKNOWN_REQUEST_ENCODING) 1602 1603 method, path, version, headers = _parse_request_text( 1604 block.encoded_handshake) 1605 if block.encoding == _HANDSHAKE_ENCODING_DELTA: 1606 headers = self._handshake_base.create_headers(headers) 1607 1608 connection = _LogicalConnection(self, block.channel_id) 1609 request = _LogicalRequest(block.channel_id, method, path, version, 1610 headers, connection) 1611 return request 1612 1613 def _do_handshake_for_logical_request(self, request, send_quota=0): 1614 try: 1615 receive_quota = self._channel_slots.popleft() 1616 except IndexError: 1617 raise LogicalChannelError( 1618 request.channel_id, _DROP_CODE_NEW_CHANNEL_SLOT_VIOLATION) 1619 1620 handshaker = _MuxHandshaker(request, self.dispatcher, 1621 send_quota, receive_quota) 1622 try: 1623 handshaker.do_handshake() 1624 except handshake.VersionException, e: 1625 self._logger.info('%s', e) 1626 self._send_error_add_channel_response( 1627 request.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 1628 return False 1629 except handshake.HandshakeException, e: 1630 # TODO(bashi): Should we _Fail the Logical Channel_ with 3001 1631 # instead? 1632 self._logger.info('%s', e) 1633 self._send_error_add_channel_response(request.channel_id, 1634 status=e.status) 1635 return False 1636 except handshake.AbortedByUserException, e: 1637 self._logger.info('%s', e) 1638 self._send_error_add_channel_response(request.channel_id) 1639 return False 1640 1641 return True 1642 1643 def _add_logical_channel(self, logical_request): 1644 try: 1645 self._logical_channels_condition.acquire() 1646 if logical_request.channel_id in self._logical_channels: 1647 self._logger.debug('Channel id %d already exists' % 1648 logical_request.channel_id) 1649 raise PhysicalConnectionError( 1650 _DROP_CODE_CHANNEL_ALREADY_EXISTS, 1651 'Channel id %d already exists' % 1652 logical_request.channel_id) 1653 worker = _Worker(self, logical_request) 1654 channel_data = _LogicalChannelData(logical_request, worker) 1655 self._logical_channels[logical_request.channel_id] = channel_data 1656 worker.start() 1657 finally: 1658 self._logical_channels_condition.release() 1659 1660 def _process_add_channel_request(self, block): 1661 try: 1662 logical_request = self._create_logical_request(block) 1663 except ValueError, e: 1664 self._logger.debug('Failed to create logical request: %r' % e) 1665 self._send_error_add_channel_response( 1666 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 1667 return 1668 if self._do_handshake_for_logical_request(logical_request): 1669 if block.encoding == _HANDSHAKE_ENCODING_IDENTITY: 1670 # Update handshake base. 1671 # TODO(bashi): Make sure this is the right place to update 1672 # handshake base. 1673 self._handshake_base = _HandshakeDeltaBase( 1674 logical_request.headers_in) 1675 self._add_logical_channel(logical_request) 1676 else: 1677 self._send_error_add_channel_response( 1678 block.channel_id, status=common.HTTP_STATUS_BAD_REQUEST) 1679 1680 def _process_flow_control(self, block): 1681 try: 1682 self._logical_channels_condition.acquire() 1683 if not block.channel_id in self._logical_channels: 1684 return 1685 channel_data = self._logical_channels[block.channel_id] 1686 channel_data.request.ws_stream.replenish_send_quota( 1687 block.send_quota) 1688 finally: 1689 self._logical_channels_condition.release() 1690 1691 def _process_drop_channel(self, block): 1692 self._logger.debug( 1693 'DropChannel received for %d: code=%r, reason=%r' % 1694 (block.channel_id, block.drop_code, block.drop_message)) 1695 try: 1696 self._logical_channels_condition.acquire() 1697 if not block.channel_id in self._logical_channels: 1698 return 1699 channel_data = self._logical_channels[block.channel_id] 1700 channel_data.drop_code = _DROP_CODE_ACKNOWLEDGED 1701 1702 # Close the logical channel 1703 channel_data.request.connection.set_read_state( 1704 _LogicalConnection.STATE_TERMINATED) 1705 channel_data.request.ws_stream.stop_sending() 1706 finally: 1707 self._logical_channels_condition.release() 1708 1709 def _process_control_blocks(self, parser): 1710 for control_block in parser.read_control_blocks(): 1711 opcode = control_block.opcode 1712 self._logger.debug('control block received, opcode: %d' % opcode) 1713 if opcode == _MUX_OPCODE_ADD_CHANNEL_REQUEST: 1714 self._process_add_channel_request(control_block) 1715 elif opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE: 1716 raise PhysicalConnectionError( 1717 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 1718 'Received AddChannelResponse') 1719 elif opcode == _MUX_OPCODE_FLOW_CONTROL: 1720 self._process_flow_control(control_block) 1721 elif opcode == _MUX_OPCODE_DROP_CHANNEL: 1722 self._process_drop_channel(control_block) 1723 elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT: 1724 raise PhysicalConnectionError( 1725 _DROP_CODE_INVALID_MUX_CONTROL_BLOCK, 1726 'Received NewChannelSlot') 1727 else: 1728 raise MuxUnexpectedException( 1729 'Unexpected opcode %r' % opcode) 1730 1731 def _process_logical_frame(self, channel_id, parser): 1732 self._logger.debug('Received a frame. channel id=%d' % channel_id) 1733 try: 1734 self._logical_channels_condition.acquire() 1735 if not channel_id in self._logical_channels: 1736 # We must ignore the message for an inactive channel. 1737 return 1738 channel_data = self._logical_channels[channel_id] 1739 fin, rsv1, rsv2, rsv3, opcode, payload = parser.read_inner_frame() 1740 consuming_byte = len(payload) 1741 if opcode != common.OPCODE_CONTINUATION: 1742 consuming_byte += 1 1743 if not channel_data.request.ws_stream.consume_receive_quota( 1744 consuming_byte): 1745 # The client violates quota. Close logical channel. 1746 raise LogicalChannelError( 1747 channel_id, _DROP_CODE_SEND_QUOTA_VIOLATION) 1748 header = create_header(opcode, len(payload), fin, rsv1, rsv2, rsv3, 1749 mask=False) 1750 frame_data = header + payload 1751 channel_data.request.connection.append_frame_data(frame_data) 1752 finally: 1753 self._logical_channels_condition.release() 1754 1755 def dispatch_message(self, message): 1756 """Dispatches message. The reader thread calls this method. 1757 1758 Args: 1759 message: a message that contains encapsulated frame. 1760 Raises: 1761 PhysicalConnectionError: if the message contains physical 1762 connection level errors. 1763 LogicalChannelError: if the message contains logical channel 1764 level errors. 1765 """ 1766 1767 parser = _MuxFramePayloadParser(message) 1768 try: 1769 channel_id = parser.read_channel_id() 1770 except ValueError, e: 1771 raise PhysicalConnectionError(_DROP_CODE_CHANNEL_ID_TRUNCATED) 1772 if channel_id == _CONTROL_CHANNEL_ID: 1773 self._process_control_blocks(parser) 1774 else: 1775 self._process_logical_frame(channel_id, parser) 1776 1777 def notify_worker_done(self, channel_id): 1778 """Called when a worker has finished. 1779 1780 Args: 1781 channel_id: channel id corresponded with the worker. 1782 """ 1783 1784 self._logger.debug('Worker for channel id %d terminated' % channel_id) 1785 try: 1786 self._logical_channels_condition.acquire() 1787 if not channel_id in self._logical_channels: 1788 raise MuxUnexpectedException( 1789 'Channel id %d not found' % channel_id) 1790 channel_data = self._logical_channels.pop(channel_id) 1791 finally: 1792 self._worker_done_notify_received = True 1793 self._logical_channels_condition.notify() 1794 self._logical_channels_condition.release() 1795 1796 if not channel_data.request.server_terminated: 1797 self._send_drop_channel( 1798 channel_id, code=channel_data.drop_code, 1799 message=channel_data.drop_message) 1800 1801 def notify_reader_done(self): 1802 """This method is called by the reader thread when the reader has 1803 finished. 1804 """ 1805 1806 self._logger.debug( 1807 'Termiating all logical connections waiting for incoming data ' 1808 '...') 1809 self._logical_channels_condition.acquire() 1810 for channel_data in self._logical_channels.values(): 1811 try: 1812 channel_data.request.connection.set_read_state( 1813 _LogicalConnection.STATE_TERMINATED) 1814 except Exception: 1815 self._logger.debug(traceback.format_exc()) 1816 self._logical_channels_condition.release() 1817 1818 def notify_writer_done(self): 1819 """This method is called by the writer thread when the writer has 1820 finished. 1821 """ 1822 1823 self._logger.debug( 1824 'Termiating all logical connections waiting for write ' 1825 'completion ...') 1826 self._logical_channels_condition.acquire() 1827 for channel_data in self._logical_channels.values(): 1828 try: 1829 channel_data.request.connection.on_writer_done() 1830 except Exception: 1831 self._logger.debug(traceback.format_exc()) 1832 self._logical_channels_condition.release() 1833 1834 def fail_physical_connection(self, code, message): 1835 """Fail the physical connection. 1836 1837 Args: 1838 code: drop reason code. 1839 message: drop message. 1840 """ 1841 1842 self._logger.debug('Failing the physical connection...') 1843 self._send_drop_channel(_CONTROL_CHANNEL_ID, code, message) 1844 self._writer.stop(common.STATUS_INTERNAL_ENDPOINT_ERROR) 1845 1846 def fail_logical_channel(self, channel_id, code, message): 1847 """Fail a logical channel. 1848 1849 Args: 1850 channel_id: channel id. 1851 code: drop reason code. 1852 message: drop message. 1853 """ 1854 1855 self._logger.debug('Failing logical channel %d...' % channel_id) 1856 try: 1857 self._logical_channels_condition.acquire() 1858 if channel_id in self._logical_channels: 1859 channel_data = self._logical_channels[channel_id] 1860 # Close the logical channel. notify_worker_done() will be 1861 # called later and it will send DropChannel. 1862 channel_data.drop_code = code 1863 channel_data.drop_message = message 1864 1865 channel_data.request.connection.set_read_state( 1866 _LogicalConnection.STATE_TERMINATED) 1867 channel_data.request.ws_stream.stop_sending() 1868 else: 1869 self._send_drop_channel(channel_id, code, message) 1870 finally: 1871 self._logical_channels_condition.release() 1872 1873 1874 def use_mux(request): 1875 return hasattr(request, 'mux_processor') and ( 1876 request.mux_processor.is_active()) 1877 1878 1879 def start(request, dispatcher): 1880 mux_handler = _MuxHandler(request, dispatcher) 1881 mux_handler.start() 1882 1883 mux_handler.add_channel_slots(_INITIAL_NUMBER_OF_CHANNEL_SLOTS, 1884 _INITIAL_QUOTA_FOR_CLIENT) 1885 1886 mux_handler.wait_until_done() 1887 1888 1889 # vi:sts=4 sw=4 et 1890