1 # Copyright 2015 The Chromium OS Authors. All rights reserved. 2 # Use of this source code is governed by a BSD-style license that can be 3 # found in the LICENSE file. 4 5 import logging 6 7 import common 8 from autotest_lib.client.bin import utils 9 from autotest_lib.client.cros.cellular.mbim_compliance import mbim_channel 10 from autotest_lib.client.cros.cellular.mbim_compliance \ 11 import mbim_command_message 12 from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors 13 from autotest_lib.client.cros.cellular.mbim_compliance \ 14 import mbim_message_request 15 from autotest_lib.client.cros.cellular.mbim_compliance \ 16 import mbim_message_response 17 from autotest_lib.client.cros.cellular.mbim_compliance \ 18 import mbim_test_base 19 from autotest_lib.client.cros.cellular.mbim_compliance.sequences \ 20 import get_descriptors_sequence 21 from autotest_lib.client.cros.cellular.mbim_compliance.sequences \ 22 import mbim_open_generic_sequence 23 24 25 class cellular_MbimComplianceCM16(mbim_test_base.MbimTestBase): 26 """ 27 CM_16 Validation of fragmented message transmission in case of multiple 28 fragmented messages. 29 30 This test verifies that fragmented messages sent from the function are not 31 intermixed. Note that this test is only applicable for devices that support 32 multiple outstanding commands. 33 34 Reference: 35 [1] Universal Serial Bus Communication Class MBIM Compliance Testing: 44 36 http://www.usb.org/developers/docs/devclass_docs/MBIM-Compliance-1.0.pdf 37 """ 38 version = 1 39 40 def run_internal(self): 41 """ Run CM_16 test. """ 42 # Precondition 43 desc_sequence = get_descriptors_sequence.GetDescriptorsSequence( 44 self.device_context) 45 descriptors = desc_sequence.run() 46 self.device_context.update_descriptor_cache(descriptors) 47 open_sequence = mbim_open_generic_sequence.MBIMOpenGenericSequence( 48 self.device_context) 49 open_sequence.run(max_control_transfer_size=64) 50 51 device_context = self.device_context 52 descriptor_cache = device_context.descriptor_cache 53 self.channel = mbim_channel.MBIMChannel( 54 device_context.device, 55 descriptor_cache.mbim_communication_interface.bInterfaceNumber, 56 descriptor_cache.interrupt_endpoint.bEndpointAddress, 57 device_context.max_control_transfer_size) 58 59 # Step 1 60 caps_command_message = mbim_command_message.MBIMDeviceCapsQuery() 61 caps_packets = mbim_message_request.generate_request_packets( 62 caps_command_message, 63 device_context.max_control_transfer_size) 64 self.caps_transaction_id = caps_command_message.transaction_id 65 66 # Step 2 67 services_command_message = ( 68 mbim_command_message.MBIMDeviceServicesQuery()) 69 services_packets = mbim_message_request.generate_request_packets( 70 services_command_message, 71 device_context.max_control_transfer_size) 72 self.services_transaction_id = services_command_message.transaction_id 73 74 # Transmit the messages now 75 self.channel.unidirectional_transaction(*caps_packets) 76 self.channel.unidirectional_transaction(*services_packets) 77 78 # Step 3 79 utils.poll_for_condition( 80 self._get_response_packets, 81 timeout=5, 82 exception=mbim_errors.MBIMComplianceChannelError( 83 'Failed to retrieve the response packets to specific ' 84 'control messages.')) 85 self.channel.close() 86 87 caps_response_message = self.caps_response 88 services_response_message = self.services_response 89 is_caps_message_valid = isinstance( 90 caps_response_message, 91 mbim_command_message.MBIMDeviceCapsInfo) 92 is_services_message_valid = isinstance( 93 services_response_message, 94 mbim_command_message.MBIMDeviceServicesInfo) 95 if not ((is_caps_message_valid and is_services_message_valid) and 96 (caps_response_message.transaction_id == 97 caps_command_message.transaction_id) and 98 (caps_response_message.device_service_id == 99 caps_command_message.device_service_id) and 100 caps_response_message.cid == caps_command_message.cid and 101 (services_command_message.transaction_id == 102 services_response_message.transaction_id) and 103 (services_command_message.device_service_id == 104 services_response_message.device_service_id) and 105 services_command_message.cid == services_response_message.cid): 106 mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError, 107 'mbim1.0:9.5#1') 108 109 110 def _get_response_packets(self): 111 """ 112 Condition method for |poll_for_condition| to check the retrieval of 113 target packets. 114 115 @returns True if both caps response packet and services response packet 116 are received, False otherwise. 117 118 """ 119 try: 120 packets = self.channel.get_outstanding_packets() 121 except mbim_errors.MBIMComplianceChannelError: 122 logging.debug("Error in receiving response fragments from the device") 123 mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError, 124 'mbim1.0:9.5#1') 125 self.caps_response = None 126 self.services_response = None 127 for packet in packets: 128 try: 129 message_response = mbim_message_response.parse_response_packets( 130 packet) 131 except mbim_errors.MBIMComplianceControlMessageError: 132 logging.debug("Error in parsing response fragments from the device") 133 mbim_errors.log_and_raise( 134 mbim_errors.MBIMComplianceAssertionError, 135 'mbim1.0:9.5#1') 136 if message_response.transaction_id == self.caps_transaction_id: 137 self.caps_response = message_response 138 elif (message_response.transaction_id == 139 self.services_transaction_id): 140 self.services_response = message_response 141 if self.caps_response and self.services_response: 142 return True 143 return False 144