Home | History | Annotate | Download | only in cellular_MbimComplianceControlRequest
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 
      7 import common
      8 from autotest_lib.client.bin import utils
      9 from autotest_lib.client.cros.cellular.mbim_compliance import mbim_channel
     10 from autotest_lib.client.cros.cellular.mbim_compliance \
     11         import mbim_command_message
     12 from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
     13 from autotest_lib.client.cros.cellular.mbim_compliance \
     14         import mbim_message_request
     15 from autotest_lib.client.cros.cellular.mbim_compliance \
     16         import mbim_message_response
     17 from autotest_lib.client.cros.cellular.mbim_compliance \
     18         import mbim_test_base
     19 from autotest_lib.client.cros.cellular.mbim_compliance.sequences \
     20         import get_descriptors_sequence
     21 from autotest_lib.client.cros.cellular.mbim_compliance.sequences \
     22         import mbim_open_generic_sequence
     23 
     24 
     25 class cellular_MbimComplianceCM16(mbim_test_base.MbimTestBase):
     26     """
     27     CM_16 Validation of fragmented message transmission in case of multiple
     28     fragmented messages.
     29 
     30     This test verifies that fragmented messages sent from the function are not
     31     intermixed. Note that this test is only applicable for devices that support
     32     multiple outstanding commands.
     33 
     34     Reference:
     35         [1] Universal Serial Bus Communication Class MBIM Compliance Testing: 44
     36         http://www.usb.org/developers/docs/devclass_docs/MBIM-Compliance-1.0.pdf
     37     """
     38     version = 1
     39 
     40     def run_internal(self):
     41         """ Run CM_16 test. """
     42         # Precondition
     43         desc_sequence = get_descriptors_sequence.GetDescriptorsSequence(
     44                 self.device_context)
     45         descriptors = desc_sequence.run()
     46         self.device_context.update_descriptor_cache(descriptors)
     47         open_sequence = mbim_open_generic_sequence.MBIMOpenGenericSequence(
     48                 self.device_context)
     49         open_sequence.run(max_control_transfer_size=64)
     50 
     51         device_context = self.device_context
     52         descriptor_cache = device_context.descriptor_cache
     53         self.channel = mbim_channel.MBIMChannel(
     54                 device_context.device,
     55                 descriptor_cache.mbim_communication_interface.bInterfaceNumber,
     56                 descriptor_cache.interrupt_endpoint.bEndpointAddress,
     57                 device_context.max_control_transfer_size)
     58 
     59         # Step 1
     60         caps_command_message = mbim_command_message.MBIMDeviceCapsQuery()
     61         caps_packets = mbim_message_request.generate_request_packets(
     62                 caps_command_message,
     63                 device_context.max_control_transfer_size)
     64         self.caps_transaction_id = caps_command_message.transaction_id
     65 
     66         # Step 2
     67         services_command_message = (
     68                 mbim_command_message.MBIMDeviceServicesQuery())
     69         services_packets = mbim_message_request.generate_request_packets(
     70                 services_command_message,
     71                 device_context.max_control_transfer_size)
     72         self.services_transaction_id = services_command_message.transaction_id
     73 
     74         # Transmit the messages now
     75         self.channel.unidirectional_transaction(*caps_packets)
     76         self.channel.unidirectional_transaction(*services_packets)
     77 
     78         # Step 3
     79         utils.poll_for_condition(
     80                 self._get_response_packets,
     81                 timeout=5,
     82                 exception=mbim_errors.MBIMComplianceChannelError(
     83                         'Failed to retrieve the response packets to specific '
     84                         'control messages.'))
     85         self.channel.close()
     86 
     87         caps_response_message = self.caps_response
     88         services_response_message = self.services_response
     89         is_caps_message_valid = isinstance(
     90                 caps_response_message,
     91                 mbim_command_message.MBIMDeviceCapsInfo)
     92         is_services_message_valid = isinstance(
     93                 services_response_message,
     94                 mbim_command_message.MBIMDeviceServicesInfo)
     95         if not ((is_caps_message_valid and is_services_message_valid) and
     96                 (caps_response_message.transaction_id ==
     97                  caps_command_message.transaction_id) and
     98                 (caps_response_message.device_service_id ==
     99                  caps_command_message.device_service_id) and
    100                 caps_response_message.cid == caps_command_message.cid and
    101                 (services_command_message.transaction_id ==
    102                  services_response_message.transaction_id) and
    103                 (services_command_message.device_service_id ==
    104                  services_response_message.device_service_id) and
    105                 services_command_message.cid == services_response_message.cid):
    106             mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError,
    107                                       'mbim1.0:9.5#1')
    108 
    109 
    110     def _get_response_packets(self):
    111         """
    112         Condition method for |poll_for_condition| to check the retrieval of
    113         target packets.
    114 
    115         @returns True if both caps response packet and services response packet
    116                 are received, False otherwise.
    117 
    118         """
    119         try:
    120             packets = self.channel.get_outstanding_packets()
    121         except mbim_errors.MBIMComplianceChannelError:
    122             logging.debug("Error in receiving response fragments from the device")
    123             mbim_errors.log_and_raise(mbim_errors.MBIMComplianceAssertionError,
    124                                       'mbim1.0:9.5#1')
    125         self.caps_response = None
    126         self.services_response = None
    127         for packet in packets:
    128             try:
    129                 message_response = mbim_message_response.parse_response_packets(
    130                         packet)
    131             except mbim_errors.MBIMComplianceControlMessageError:
    132                 logging.debug("Error in parsing response fragments from the device")
    133                 mbim_errors.log_and_raise(
    134                         mbim_errors.MBIMComplianceAssertionError,
    135                         'mbim1.0:9.5#1')
    136             if message_response.transaction_id == self.caps_transaction_id:
    137                 self.caps_response = message_response
    138             elif (message_response.transaction_id ==
    139                   self.services_transaction_id):
    140                 self.services_response = message_response
    141             if self.caps_response and self.services_response:
    142                 return True
    143         return False
    144