#!/usr/bin/env python
#
# Copyright 2012, Google Inc.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
#     * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following disclaimer
# in the documentation and/or other materials provided with the
# distribution.
#     * Neither the name of Google Inc. nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


"""WebSocket client utility for testing mux extension.

This code should be independent from mod_pywebsocket. See the comment of
client_for_testing.py.

NOTE: This code is far from robust like client_for_testing.py.
"""


import Queue
import base64
import collections
import email
import email.parser
import logging
import math
import os
import random
import socket
import struct
import threading

from mod_pywebsocket import util

from test import client_for_testing


_CONTROL_CHANNEL_ID = 0
_DEFAULT_CHANNEL_ID = 1

_MUX_OPCODE_ADD_CHANNEL_REQUEST = 0
_MUX_OPCODE_ADD_CHANNEL_RESPONSE = 1
_MUX_OPCODE_FLOW_CONTROL = 2
_MUX_OPCODE_DROP_CHANNEL = 3
_MUX_OPCODE_NEW_CHANNEL_SLOT = 4


class _ControlBlock(object):
    """A parsed mux control block.

    Only the opcode is stored here. _parse_control_blocks() attaches the
    opcode specific attributes (channel_id, send_quota, slots, ...) to the
    instance after construction.
    """

    def __init__(self, opcode):
        self.opcode = opcode


def _parse_handshake_response(response):
    """Parses an encoded HTTP handshake response.

    Args:
        response: status line and header lines separated by CRLF.

    Returns:
        The headers as parsed by email.parser.Parser.

    Raises:
        ValueError: the status line is malformed, the HTTP version is not
            HTTP/1.1 or the status code is not 101.
    """

    status_line, header_lines = response.split('\r\n', 1)

    words = status_line.split(' ')
    if len(words) < 3:
        raise ValueError('Bad Status-Line syntax %r' % status_line)
    [version, response_code] = words[:2]
    if version != 'HTTP/1.1':
        raise ValueError('Bad response version %r' % version)

    if response_code != '101':
        raise ValueError('Bad response code %r ' % response_code)
    headers = email.parser.Parser().parsestr(header_lines)
    return headers


def _parse_channel_id(data, offset=0):
    """Decodes a variable length channel id.

    The high bits of the first byte select the encoded length: 0b0 for 1
    byte, 0b10 for 2 bytes, 0b110 for 3 bytes and 0b111 for 4 bytes.

    Args:
        data: byte string that contains the encoded channel id.
        offset: position of the first byte of the channel id in data.

    Returns:
        A tuple of (channel id, number of bytes consumed).

    Raises:
        Exception: data is too short to hold the channel id.
    """

    length = len(data)
    remaining = length - offset

    if remaining <= 0:
        raise Exception('No channel id found')

    channel_id = ord(data[offset])
    channel_id_length = 1
    # The widest prefix must be tested first because 0xe0 also matches the
    # 0xc0 and 0x80 masks.
    if channel_id & 0xe0 == 0xe0:
        if remaining < 4:
            raise Exception('Invalid channel id format')
        channel_id = struct.unpack('!L',
                                   data[offset:offset+4])[0] & 0x1fffffff
        channel_id_length = 4
    elif channel_id & 0xc0 == 0xc0:
        if remaining < 3:
            raise Exception('Invalid channel id format')
        channel_id = (((channel_id & 0x1f) << 16) +
                      struct.unpack('!H', data[offset+1:offset+3])[0])
        channel_id_length = 3
    elif channel_id & 0x80 == 0x80:
        if remaining < 2:
            raise Exception('Invalid channel id format')
        channel_id = struct.unpack('!H',
                                   data[offset:offset+2])[0] & 0x3fff
        channel_id_length = 2

    return channel_id, channel_id_length


def _read_number(data, size_of_size, offset=0):
    """Reads a big-endian unsigned integer of 1 to 4 bytes.

    Args:
        data: byte string to read from.
        size_of_size: width of the number in bytes (1, 2, 3 or 4).
        offset: position of the first byte of the number in data.

    Returns:
        The decoded number.

    Raises:
        Exception: size_of_size is not in the range 1-4.
    """

    if size_of_size == 1:
        return ord(data[offset])
    elif size_of_size == 2:
        return struct.unpack('!H', data[offset:offset+2])[0]
    elif size_of_size == 3:
        return ((ord(data[offset]) << 16)
                + struct.unpack('!H', data[offset+1:offset+3])[0])
    elif size_of_size == 4:
        return struct.unpack('!L', data[offset:offset+4])[0]
    else:
        raise Exception('Invalid "size of size" in control block')


def _parse_control_block_specific_data(data, size_of_size, offset=0):
    """Reads a length-prefixed chunk from a control block.

    Args:
        data: byte string that contains the control block.
        size_of_size: width in bytes of the length prefix.
        offset: position of the length prefix in data.

    Returns:
        A tuple of (chunk contents, total number of bytes consumed
        including the length prefix).

    Raises:
        Exception: data is too short for the prefix or for the announced
            length.
    """

    remaining = len(data) - offset
    if remaining < size_of_size:
        raise Exception('Invalid control block received')

    size = _read_number(data, size_of_size, offset)

    start_position = offset + size_of_size
    end_position = start_position + size
    if len(data) < end_position:
        raise Exception('Invalid size of control block (%d < %d)' % (
                len(data), end_position))
    return data[start_position:end_position], size_of_size + size


def _parse_control_blocks(data):
    """Parses the payload of a control channel frame.

    Args:
        data: payload of the frame without the leading channel id.

    Returns:
        A list of _ControlBlock objects in the order they appear in data.

    Raises:
        Exception: an unsupported opcode or a malformed block is found.
    """

    blocks = []
    length = len(data)
    pos = 0

    while pos < length:
        first_byte = ord(data[pos])
        pos += 1
        # The upper 3 bits of the first byte hold the opcode.
        opcode = (first_byte >> 5) & 0x7
        block = _ControlBlock(opcode)

        # TODO(bashi): Support more opcode
        if opcode == _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
            block.encode = (first_byte >> 2) & 3
            block.rejected = (first_byte >> 4) & 1

            channel_id, advance = _parse_channel_id(data, pos)
            block.channel_id = channel_id
            pos += advance

            size_of_size = (first_byte & 3) + 1
            encoded_handshake, advance = _parse_control_block_specific_data(
                data, size_of_size, pos)
            block.encoded_handshake = encoded_handshake
            pos += advance
            blocks.append(block)
        elif opcode == _MUX_OPCODE_DROP_CHANNEL:
            block.mux_error = (first_byte >> 4) & 1

            channel_id, channel_id_length = _parse_channel_id(data, pos)
            block.channel_id = channel_id
            pos += channel_id_length

            # NOTE(review): unlike the other opcodes no "+ 1" is applied
            # here, so a block whose low 2 bits are 0 is rejected by
            # _read_number(). Confirm against the mux specification.
            size_of_size = first_byte & 3
            reason, size = _parse_control_block_specific_data(
                data, size_of_size, pos)
            block.reason = reason
            pos += size
            blocks.append(block)
        elif opcode == _MUX_OPCODE_FLOW_CONTROL:
            channel_id, advance = _parse_channel_id(data, pos)
            block.channel_id = channel_id
            pos += advance
            size_of_quota = (first_byte & 3) + 1
            block.send_quota = _read_number(data, size_of_quota, pos)
            pos += size_of_quota
            blocks.append(block)
        elif opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
            size_of_slots = ((first_byte >> 2) & 3) + 1
            size_of_quota = (first_byte & 3) + 1
            block.slots = _read_number(data, size_of_slots, pos)
            pos += size_of_slots
            block.send_quota = _read_number(data, size_of_quota, pos)
            pos += size_of_quota
            blocks.append(block)
        else:
            raise Exception(
                'Unsupported mux opcode %d received' % opcode)

    return blocks


def _encode_channel_id(channel_id):
    """Encodes a channel id into its variable length representation.

    This is the inverse of _parse_channel_id().

    Args:
        channel_id: non-negative integer smaller than 2 ** 29.

    Returns:
        The encoded channel id as a byte string of 1 to 4 bytes.

    Raises:
        ValueError: channel_id is negative or too large.
    """

    if channel_id < 0:
        raise ValueError('Channel id %d must not be negative' % channel_id)

    if channel_id < 2 ** 7:
        return chr(channel_id)
    if channel_id < 2 ** 14:
        return struct.pack('!H', 0x8000 + channel_id)
    if channel_id < 2 ** 21:
        first = chr(0xc0 + (channel_id >> 16))
        return first + struct.pack('!H', channel_id & 0xffff)
    if channel_id < 2 ** 29:
        return struct.pack('!L', 0xe0000000 + channel_id)

    raise ValueError('Channel id %d is too large' % channel_id)


def _size_of_number_in_bytes_minus_1(number):
    """Returns the minimum byte width needed to store number, minus 1.

    Raises:
        ValueError: number is negative or does not fit in 4 bytes.
    """

    # Calculate the minimum number of bytes minus 1 that are required to store
    # the data.
    if number < 0:
        raise ValueError('Invalid number: %d' % number)
    elif number < 2 ** 8:
        return 0
    elif number < 2 ** 16:
        return 1
    elif number < 2 ** 24:
        return 2
    elif number < 2 ** 32:
        return 3
    else:
        raise ValueError('Invalid number %d' % number)


def _encode_number(number):
    """Encodes number as a big-endian integer using as few bytes as possible.

    The width chosen here matches _size_of_number_in_bytes_minus_1(), which
    callers invoke first and which rejects out-of-range values.
    """

    if number < 2 ** 8:
        return chr(number)
    elif number < 2 ** 16:
        return struct.pack('!H', number)
    elif number < 2 ** 24:
        return chr(number >> 16) + struct.pack('!H', number & 0xffff)
    else:
        return struct.pack('!L', number)


def _create_add_channel_request(channel_id, encoded_handshake,
                                encoding=0):
    """Builds an AddChannelRequest control block.

    Args:
        channel_id: id of the logical channel to open.
        encoded_handshake: handshake request to embed in the block.
        encoding: value stored in bits 2-3 of the first byte.

    Returns:
        The control block as a byte string (without the control channel id).
    """

    length = len(encoded_handshake)
    size_of_length = _size_of_number_in_bytes_minus_1(length)

    first_byte = ((_MUX_OPCODE_ADD_CHANNEL_REQUEST << 5) | (encoding << 2) |
                  size_of_length)
    encoded_length = _encode_number(length)

    return (chr(first_byte) + _encode_channel_id(channel_id) +
            encoded_length + encoded_handshake)


def _create_flow_control(channel_id, replenished_quota):
    """Builds a FlowControl control block.

    Args:
        channel_id: id of the logical channel whose quota is replenished.
        replenished_quota: amount of quota to add.

    Returns:
        The control block as a byte string (without the control channel id).
    """

    size_of_quota = _size_of_number_in_bytes_minus_1(replenished_quota)
    first_byte = ((_MUX_OPCODE_FLOW_CONTROL << 5) | size_of_quota)
    return (chr(first_byte) + _encode_channel_id(channel_id) +
            _encode_number(replenished_quota))


class _MuxReaderThread(threading.Thread):
    """Mux reader thread.

    Reads frames and passes them to the mux client. This thread accesses
    private functions/variables of the mux client.
    """

    def __init__(self, mux):
        threading.Thread.__init__(self)
        self.setDaemon(True)
        self._mux = mux
        self._stop_requested = False

    def _receive_message(self):
        """Reads one possibly fragmented message from the physical socket.

        Returns:
            A tuple of (opcode of the first frame, concatenated payload), or
            (None, None) when a stop has been requested.

        Raises:
            Exception: a text frame, an unexpected continuation frame, an
                unsupported opcode or an interleaved message is received.
        """

        first_opcode = None
        pending_payload = []
        while not self._stop_requested:
            fin, rsv1, rsv2, rsv3, opcode, payload_length = (
                client_for_testing.read_frame_header(self._mux._socket))

            if first_opcode is None:
                if opcode == client_for_testing.OPCODE_TEXT:
                    raise Exception('Received a text message on physical '
                                    'connection')
                if opcode == client_for_testing.OPCODE_CONTINUATION:
                    raise Exception('Received an intermediate frame but '
                                    'fragmentation was not started')
                # NOTE(review): the original condition listed OPCODE_PONG
                # twice; presumably a ping opcode was intended. Ping frames
                # are still rejected as undefined here - confirm.
                if opcode in (client_for_testing.OPCODE_BINARY,
                              client_for_testing.OPCODE_PONG,
                              client_for_testing.OPCODE_CLOSE):
                    first_opcode = opcode
                else:
                    raise Exception('Received an undefined opcode frame: %d' %
                                    opcode)

            elif opcode != client_for_testing.OPCODE_CONTINUATION:
                raise Exception('Received a new opcode before '
                                'terminating fragmentation')

            payload = client_for_testing.receive_bytes(
                self._mux._socket, payload_length)

            if self._mux._incoming_frame_filter is not None:
                payload = self._mux._incoming_frame_filter.filter(payload)

            pending_payload.append(payload)

            if fin:
                break

        if self._stop_requested:
            return None, None

        message = ''.join(pending_payload)
        return first_opcode, message

    def request_stop(self):
        """Asks the thread to exit its read loop."""

        self._stop_requested = True

    def run(self):
        """Reads messages until stopped and hands them to the mux client."""

        try:
            while not self._stop_requested:
                # opcode is OPCODE_BINARY or control opcodes when a message
                # is successfully received.
                opcode, message = self._receive_message()
                if not opcode:
                    return
                if opcode == client_for_testing.OPCODE_BINARY:
                    channel_id, advance = _parse_channel_id(message)
                    self._mux._dispatch_frame(channel_id, message[advance:])
                else:
                    self._mux._process_control_message(opcode, message)
        finally:
            # Runs on normal exit and on errors so that the client always
            # closes its socket.
            self._mux._notify_reader_done()


class _InnerFrame(object):
    """An encapsulated frame received on a logical channel."""

    def __init__(self, fin, rsv1, rsv2, rsv3, opcode, payload):
        self.fin = fin
        self.rsv1 = rsv1
        self.rsv2 = rsv2
        self.rsv3 = rsv3
        self.opcode = opcode
        self.payload = payload


class _LogicalChannelData(object):
    """Per logical channel state: received frames and both quotas."""

    def __init__(self):
        self.queue = Queue.Queue()
        self.send_quota = 0
        self.receive_quota = 0


class MuxClient(object):
    """WebSocket mux client.

    Note that this class is NOT thread-safe. Do not access an instance of this
    class from multiple threads at a same time.
    """

    def __init__(self, options):
        self._logger = util.get_class_logger(self)

        self._options = options
        self._options.enable_mux()
        self._stream = None
        self._socket = None
        self._handshake = client_for_testing.WebSocketHandshake(self._options)
        self._incoming_frame_filter = None
        self._outgoing_frame_filter = None

        self._is_active = False
        self._read_thread = None
        self._control_blocks_condition = threading.Condition()
        self._control_blocks = []
        self._channel_slots = collections.deque()
        self._logical_channels_condition = threading.Condition()
        self._logical_channels = {}
        self._timeout = 2
        self._physical_connection_close_event = None
        self._physical_connection_close_message = None

    def _parse_inner_frame(self, data):
        """Splits an encapsulated frame into header bits and payload.

        Args:
            data: the encapsulated frame; its first byte carries
                FIN/RSV1/RSV2/RSV3 in the upper 4 bits and the opcode in the
                lower 4 bits.

        Returns:
            An _InnerFrame instance.

        Raises:
            Exception: data is empty.
        """

        if len(data) == 0:
            raise Exception('Invalid encapsulated frame received')

        first_byte = ord(data[0])
        # The flags live in the most significant bits, so they have to be
        # shifted right (shifting left always yielded 0 for FIN).
        fin = (first_byte >> 7) & 1
        rsv1 = (first_byte >> 6) & 1
        rsv2 = (first_byte >> 5) & 1
        rsv3 = (first_byte >> 4) & 1
        opcode = first_byte & 0xf

        # NOTE(review): the outgoing filter is applied to an incoming
        # payload here; presumably the incoming filter was intended. Both
        # are None unless set externally - confirm before changing.
        if self._outgoing_frame_filter:
            payload = self._outgoing_frame_filter.filter(
                data[1:])
        else:
            payload = data[1:]

        return _InnerFrame(fin, rsv1, rsv2, rsv3, opcode, payload)

    def _process_mux_control_blocks(self):
        """Applies queued FlowControl and NewChannelSlot blocks.

        Must be called with _control_blocks_condition held. Blocks applied
        here are removed from the queue so that they are not applied again
        when the next control frame arrives. AddChannelResponse blocks (and
        any other block not handled here) stay queued; AddChannelResponse
        is consumed by add_channel().

        Raises:
            Exception: a FlowControl block refers to an unknown channel.
        """

        unprocessed = []
        for block in self._control_blocks:
            if block.opcode == _MUX_OPCODE_FLOW_CONTROL:
                self._logical_channels_condition.acquire()
                try:
                    if not block.channel_id in self._logical_channels:
                        raise Exception('Invalid flow control received for '
                                        'channel id %d' % block.channel_id)
                    self._logical_channels[block.channel_id].send_quota += (
                        block.send_quota)
                    self._logical_channels_condition.notify()
                finally:
                    self._logical_channels_condition.release()
            elif block.opcode == _MUX_OPCODE_NEW_CHANNEL_SLOT:
                self._channel_slots.extend([block.send_quota] * block.slots)
            else:
                # AddChannelResponse will be handled in add_channel().
                unprocessed.append(block)
        self._control_blocks[:] = unprocessed

    def _dispatch_frame(self, channel_id, payload):
        """Routes a demultiplexed frame. Called from the reader thread.

        Frames on the control channel are parsed into control blocks and
        processed. Other frames are charged against the receive quota of the
        logical channel and queued for assert_receive() and
        assert_receive_close().

        Raises:
            Exception: the channel is not established or the server exceeds
                the receive quota.
        """

        if channel_id == _CONTROL_CHANNEL_ID:
            self._control_blocks_condition.acquire()
            try:
                self._control_blocks += _parse_control_blocks(payload)
                self._process_mux_control_blocks()
                self._control_blocks_condition.notify()
            finally:
                self._control_blocks_condition.release()
        else:
            self._logical_channels_condition.acquire()
            try:
                if not channel_id in self._logical_channels:
                    raise Exception('Received logical frame on channel id '
                                    '%d, which is not established' %
                                    channel_id)

                inner_frame = self._parse_inner_frame(payload)
                self._logical_channels[channel_id].receive_quota -= (
                        len(inner_frame.payload))
                if self._logical_channels[channel_id].receive_quota < 0:
                    raise Exception('The server violates quota on '
                                    'channel id %d' % channel_id)
            finally:
                self._logical_channels_condition.release()
            self._logical_channels[channel_id].queue.put(inner_frame)

    def _process_control_message(self, opcode, message):
        """Handles a control frame received on the physical connection.

        Only close frames are acted upon: the close body is recorded, a
        close frame is sent back if the client is active, the reader thread
        is asked to stop and any waiter in
        assert_physical_connection_receive_close() is woken up.
        """

        # Ping/Pong are not supported.
        if opcode == client_for_testing.OPCODE_CLOSE:
            self._physical_connection_close_message = message
            if self._is_active:
                self._stream.send_close(
                    code=client_for_testing.STATUS_NORMAL_CLOSURE, reason='')
                self._read_thread.request_stop()

            if self._physical_connection_close_event:
                self._physical_connection_close_event.set()

    def _notify_reader_done(self):
        """Called by the reader thread right before it terminates."""

        self._logger.debug('Read thread terminated.')
        self.close_socket()

    def _assert_channel_slot_available(self):
        """Waits up to the timeout for a channel slot, then checks for one.

        Raises:
            Exception: no channel slot is available after the wait.
        """

        self._control_blocks_condition.acquire()
        try:
            if len(self._channel_slots) == 0:
                # Wait once
                self._control_blocks_condition.wait(timeout=self._timeout)
        finally:
            self._control_blocks_condition.release()

        if len(self._channel_slots) == 0:
            raise Exception('Failed to receive NewChannelSlot')

    def _assert_send_quota_available(self, channel_id):
        """Waits up to the timeout for send quota on channel_id.

        Raises:
            Exception: the send quota is still 0 after the wait.
        """

        self._logical_channels_condition.acquire()
        try:
            if self._logical_channels[channel_id].send_quota == 0:
                # Wait once
                self._logical_channels_condition.wait(timeout=self._timeout)
        finally:
            self._logical_channels_condition.release()

        if self._logical_channels[channel_id].send_quota == 0:
            raise Exception('Failed to receive FlowControl for channel id %d' %
                            channel_id)

    def connect(self):
        """Opens the physical connection and waits for the initial quota.

        Performs the opening handshake, registers the default logical
        channel, starts the reader thread and waits until a channel slot and
        send quota for the default channel are available.

        Raises:
            Exception: no NewChannelSlot or FlowControl arrived in time.
        """

        self._socket = socket.socket()
        self._socket.settimeout(self._options.socket_timeout)

        self._socket.connect((self._options.server_host,
                              self._options.server_port))
        if self._options.use_tls:
            # NOTE(review): _TLSSocket is neither defined nor imported in
            # this module, so this line raises NameError. Presumably the
            # class lives in client_for_testing - confirm and qualify.
            self._socket = _TLSSocket(self._socket)

        self._handshake.handshake(self._socket)
        self._stream = client_for_testing.WebSocketStream(
            self._socket, self._handshake)

        self._logical_channels[_DEFAULT_CHANNEL_ID] = _LogicalChannelData()

        self._read_thread = _MuxReaderThread(self)
        self._read_thread.start()

        self._assert_channel_slot_available()
        self._assert_send_quota_available(_DEFAULT_CHANNEL_ID)

        self._is_active = True
        self._logger.info('Connection established')

    def add_channel(self, channel_id, options):
        """Opens a new logical channel.

        Sends an AddChannelRequest that carries a handshake built from
        options, waits for the matching AddChannelResponse and validates
        the handshake response.

        Args:
            channel_id: id of the logical channel to open.
            options: object providing resource, server_host, server_port,
                origin and extensions for the handshake.

        Raises:
            Exception: the client is not active, the channel already exists,
                no channel slot is left, or the response is rejected or
                invalid.
            ValueError: the status line of the response is invalid.
        """

        if not self._is_active:
            raise Exception('Mux client is not active')

        if channel_id in self._logical_channels:
            raise Exception('Channel id %d already exists' % channel_id)

        try:
            send_quota = self._channel_slots.popleft()
        except IndexError as e:
            raise Exception('No channel slots: %r' % e)

        # Create AddChannel request
        request_line = 'GET %s HTTP/1.1\r\n' % options.resource
        fields = []
        fields.append('Upgrade: websocket\r\n')
        fields.append('Connection: Upgrade\r\n')
        if options.server_port == client_for_testing.DEFAULT_PORT:
            fields.append('Host: %s\r\n' % options.server_host.lower())
        else:
            fields.append('Host: %s:%d\r\n' % (options.server_host.lower(),
                                               options.server_port))
        fields.append('Origin: %s\r\n' % options.origin.lower())

        original_key = os.urandom(16)
        key = base64.b64encode(original_key)
        fields.append('Sec-WebSocket-Key: %s\r\n' % key)

        fields.append('Sec-WebSocket-Version: 13\r\n')

        if len(options.extensions) > 0:
            fields.append('Sec-WebSocket-Extensions: %s\r\n' %
                          ', '.join(options.extensions))

        handshake = request_line + ''.join(fields) + '\r\n'
        add_channel_request = _create_add_channel_request(
            channel_id, handshake)
        payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + add_channel_request
        self._stream.send_binary(payload)

        # Wait AddChannelResponse
        self._logger.debug('Waiting AddChannelResponse for the request...')
        response = None
        self._control_blocks_condition.acquire()
        try:
            while True:
                for block in self._control_blocks:
                    if block.opcode != _MUX_OPCODE_ADD_CHANNEL_RESPONSE:
                        continue
                    if block.channel_id == channel_id:
                        response = block
                        self._control_blocks.remove(response)
                        break
                if response:
                    break
                self._control_blocks_condition.wait(self._timeout)
                # NOTE(review): this only gives up once the connection has
                # been closed; while the client stays active the loop waits
                # indefinitely for the response.
                if not self._is_active:
                    raise Exception('AddChannelRequest timed out')
        finally:
            self._control_blocks_condition.release()

        # Validate AddChannelResponse
        if response.rejected:
            raise Exception('The server rejected AddChannelRequest')

        fields = _parse_handshake_response(response.encoded_handshake)

        if not 'upgrade' in fields:
            raise Exception('No Upgrade header')
        if fields['upgrade'] != 'websocket':
            raise Exception('Wrong Upgrade header')
        if not 'connection' in fields:
            raise Exception('No Connection header')
        if fields['connection'] != 'Upgrade':
            raise Exception('Wrong Connection header')
        if not 'sec-websocket-accept' in fields:
            raise Exception('No Sec-WebSocket-Accept header')

        accept = fields['sec-websocket-accept']
        try:
            decoded_accept = base64.b64decode(accept)
        except TypeError:
            raise Exception(
                'Illegal value for header Sec-WebSocket-Accept: ' + accept)

        if len(decoded_accept) != 20:
            raise Exception(
                'Decoded value of Sec-WebSocket-Accept is not 20-byte long')

        original_expected_accept = util.sha1_hash(
            key + client_for_testing.WEBSOCKET_ACCEPT_UUID).digest()
        expected_accept = base64.b64encode(original_expected_accept)

        if accept != expected_accept:
            raise Exception(
                'Invalid Sec-WebSocket-Accept header: %r (expected) != %r '
                '(actual)' % (expected_accept, accept))

        self._logical_channels_condition.acquire()
        try:
            self._logical_channels[channel_id] = _LogicalChannelData()
            self._logical_channels[channel_id].send_quota = send_quota
        finally:
            self._logical_channels_condition.release()

        self._logger.debug('Logical channel %d established' % channel_id)

    def _check_logical_channel_is_opened(self, channel_id):
        """Raises Exception unless the client is active and channel_id is
        an established logical channel.
        """

        if not self._is_active:
            raise Exception('Mux client is not active')

        if not channel_id in self._logical_channels:
            raise Exception('Logical channel %d is not established.' %
                            channel_id)

    def drop_channel(self, channel_id):
        """Not implemented yet; does nothing."""

        # TODO(bashi): Implement
        pass

    def send_flow_control(self, channel_id, replenished_quota):
        """Grants replenished_quota bytes of receive quota on channel_id.

        The local receive quota is increased before the FlowControl block is
        sent on the control channel.
        """

        self._check_logical_channel_is_opened(channel_id)
        flow_control = _create_flow_control(channel_id, replenished_quota)
        payload = _encode_channel_id(_CONTROL_CHANNEL_ID) + flow_control
        # Replenish receive quota
        self._logical_channels_condition.acquire()
        try:
            self._logical_channels[channel_id].receive_quota += (
                replenished_quota)
        finally:
            self._logical_channels_condition.release()
        self._stream.send_binary(payload)

    def send_message(self, channel_id, message, end=True, binary=False):
        """Sends message as an encapsulated frame on channel_id.

        Args:
            channel_id: id of an established logical channel.
            message: payload; encoded as UTF-8 unless binary is true.
            end: value of the FIN bit of the encapsulated frame.
            binary: send a binary frame instead of a text frame.

        Raises:
            Exception: the channel is not open or the payload is larger
                than the remaining send quota.
        """

        self._check_logical_channel_is_opened(channel_id)

        if binary:
            first_byte = (end << 7) | client_for_testing.OPCODE_BINARY
        else:
            first_byte = (end << 7) | client_for_testing.OPCODE_TEXT
            message = message.encode('utf-8')

        self._logical_channels_condition.acquire()
        try:
            if self._logical_channels[channel_id].send_quota < len(message):
                raise Exception('Send quota violation: %d < %d' % (
                        self._logical_channels[channel_id].send_quota,
                        len(message)))

            self._logical_channels[channel_id].send_quota -= len(message)
        finally:
            self._logical_channels_condition.release()
        payload = _encode_channel_id(channel_id) + chr(first_byte) + message
        self._stream.send_binary(payload)

    def assert_receive(self, channel_id, payload, binary=False):
        """Asserts that the next frame on channel_id carries payload.

        Args:
            channel_id: id of an established logical channel.
            payload: expected payload of the frame.
            binary: expect a binary frame instead of a text frame.

        Raises:
            Exception: no frame arrives within the timeout, or the opcode or
                the payload differs from the expectation.
        """

        self._check_logical_channel_is_opened(channel_id)

        try:
            inner_frame = self._logical_channels[channel_id].queue.get(
                timeout=self._timeout)
        except Queue.Empty:
            raise Exception('Cannot receive message from channel id %d' %
                            channel_id)

        if binary:
            opcode = client_for_testing.OPCODE_BINARY
        else:
            opcode = client_for_testing.OPCODE_TEXT

        if inner_frame.opcode != opcode:
            raise Exception('Unexpected opcode received (%r != %r)' %
                            (opcode, inner_frame.opcode))

        if inner_frame.payload != payload:
            raise Exception('Unexpected payload received')

    def send_close(self, channel_id, code=None, reason=''):
        """Sends a close frame on channel_id.

        The frame body consists of code and the UTF-8 encoded reason. It is
        empty when code is None.
        """

        self._check_logical_channel_is_opened(channel_id)

        if code is not None:
            body = struct.pack('!H', code) + reason.encode('utf-8')
        else:
            body = ''

        first_byte = (1 << 7) | client_for_testing.OPCODE_CLOSE
        payload = _encode_channel_id(channel_id) + chr(first_byte) + body
        self._stream.send_binary(payload)

    def assert_receive_close(self, channel_id):
        """Asserts that the next frame on channel_id is a close frame.

        Raises:
            Exception: no frame arrives within the timeout or the frame is
                not a close frame.
        """

        self._check_logical_channel_is_opened(channel_id)

        try:
            inner_frame = self._logical_channels[channel_id].queue.get(
                timeout=self._timeout)
        except Queue.Empty:
            raise Exception('Cannot receive message from channel id %d' %
                            channel_id)
        if inner_frame.opcode != client_for_testing.OPCODE_CLOSE:
            raise Exception('Didn\'t receive close frame')

    def send_physical_connection_close(self, code=None, reason=''):
        """Sends a close frame on the physical connection.

        Also creates the event that
        assert_physical_connection_receive_close() waits on.
        """

        self._physical_connection_close_event = threading.Event()
        self._stream.send_close(code, reason)

    # This method can be used only after calling
    # send_physical_connection_close().
    def assert_physical_connection_receive_close(
        self, code=client_for_testing.STATUS_NORMAL_CLOSURE, reason=''):
        """Asserts that a close frame arrived on the physical connection.

        code and reason are accepted but not compared with the received
        close frame.

        Raises:
            Exception: no close frame arrives within the timeout.
        """

        self._physical_connection_close_event.wait(timeout=self._timeout)
        # Compare with None so that a close frame with an empty body still
        # counts as received.
        if (not self._physical_connection_close_event.isSet() or
            self._physical_connection_close_message is None):
            raise Exception('Didn\'t receive closing handshake')

    def close_socket(self):
        """Marks the client inactive and closes the socket if one exists."""

        self._is_active = False
        # _socket stays None until connect() is called.
        if self._socket is not None:
            self._socket.close()