Home | History | Annotate | Download | only in mbim_compliance
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import multiprocessing
      7 import Queue
      8 import struct
      9 import time
     10 
     11 import common
     12 from autotest_lib.client.bin import utils
     13 from autotest_lib.client.cros.cellular.mbim_compliance \
     14         import mbim_channel_endpoint
     15 from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
     16 
     17 
     18 class MBIMChannel(object):
     19     """
     20     Provide synchronous access to the modem with MBIM command level interaction.
     21 
     22     This object should simplify your interaction over the MBIM channel as
     23     follows:
     24     - Use |bidirectional_transaction| to send MBIM packets that are part of a
     25       transaction. This function will block until the transaction completes and
     26       return the MBIM packets received in response.
     27     - |bidirectional_transaction| will filter out packets that do not correspond
     28       to your transaction. This way, you don't have to worry about unsolicited
     29       notifications and/or stale packets when interacting with the modem.
     30     - All filtered out packets can be grabbed using the
     31       |get_outstanding_packets| function. Use this function to receive error
     32       notifications, status notifications, etc.
     33     - Use |unidirectional_transaction| to send MBIM packets for which you don't
     34       expect a response.
     35     - Use |flush| to clean out all pipes before starting a new transaction.
     36 
     37     Note that "MBIM packets" here really means MBIM fragments. This object does
     38     not (de)fragment packets for you. Out of necessity, it does check that
     39     received fragments are contiguous and in-order.
     40 
     41     So, this object houses the minimum information necessary about the MBIM
     42     fragments to provide you a comfortable synchronous packet level channel.
     43 
     44     """
     45 
     46     ENDPOINT_JOIN_TIMEOUT_S = 5
     47     FRAGMENT_TIMEOUT_S = 3
     48     # TODO(pprabhu) Consider allowing each transaction to specify its own
     49     # timeout.
     50     TRANSACTION_TIMEOUT_S = 5
     51 
     52     MESSAGE_HEADER_FORMAT = '<LLL'
     53     FRAGMENT_HEADER_FORMAT = '<LL'
     54     MBIM_FRAGMENTED_MESSAGES = [
     55             0x00000003,  # MBIM_COMMAND_MSG
     56             0x80000003,  # MBIM_COMMAND_DONE
     57             0x80000007]  # MBIM_INDICATE_STATUS
     58 
     59     def __init__(self,
     60                  device,
     61                  interface_number,
     62                  interrupt_endpoint_address,
     63                  in_buffer_size,
     64                  process_class=None):
     65         """
     66         @param device: Device handle returned by PyUSB for the modem to test.
     67         @param interface_number: |bInterfaceNumber| of the MBIM interface.
     68         @param interrupt_endpoint_address: |bEndpointAddress| for the usb
     69                 INTERRUPT IN endpoint for notifications.
     70         @param in_buffer_size: The (fixed) buffer size to use for in control
     71                 transfers.
     72         @param process_class: The class to instantiate to create a subprocess.
     73                 This is used by tests only, to easily mock out the process
     74                 ceation.
     75 
     76         """
     77         self._stop_request_event = multiprocessing.Event()
     78         self._request_queue = multiprocessing.Queue()
     79         self._response_queue = multiprocessing.Queue()
     80         self._outstanding_packets = []
     81         self._last_response = []
     82         self._stashed_first_fragment = None
     83         if process_class is None:
     84             process_class = multiprocessing.Process
     85         self._endpoint_process = process_class(
     86                 target=mbim_channel_endpoint.MBIMChannelEndpoint,
     87                 args=(device,
     88                       interface_number,
     89                       interrupt_endpoint_address,
     90                       in_buffer_size,
     91                       self._request_queue,
     92                       self._response_queue,
     93                       self._stop_request_event))
     94         self._endpoint_process.start()
     95 
     96 
     97     def __del__(self):
     98         """
     99         The destructor.
    100 
    101         Note that it is not guaranteed that |__del__| is called for objects that
    102         exist when the interpreter exits. It is recommended to call |close|
    103         explicitly.
    104 
    105         """
    106         self.close()
    107 
    108 
    109     def close(self):
    110         """
    111         Cleanly close the MBIMChannel.
    112 
    113         MBIMChannel forks a subprocess to communicate with the USB device. It is
    114         recommended that |close| be called explicitly.
    115 
    116         """
    117         if not self._endpoint_process:
    118             return
    119 
    120         if self._endpoint_process.is_alive():
    121             self._stop_request_event.set()
    122             self._endpoint_process.join(self.ENDPOINT_JOIN_TIMEOUT_S)
    123             if self._endpoint_process.is_alive():
    124                 self._endpoint_process.terminate()
    125 
    126         self._endpoint_process = None
    127 
    128 
    129     def bidirectional_transaction(self, *args):
    130         """
    131         Execute a synchronous bidirectional transaction.
    132 
    133         @param *args: Fragments of a single MBIM transaction. An MBIM
    134                 transaction may consist of multiple fragments - each fragment is
    135                 the payload for a USB control message. It should be an
    136                 |array.array| object.  It is your responsibility (and choice) to
    137                 keep the fragments in-order, and to send all the fragments.
    138                 For more details, see "Fragmentation of messages" in the MBIM
    139                 spec.
    140         @returns: A list of fragments in the same order as received that
    141                 correspond to the given transaction. If we receive less
    142                 fragments than claimed, we will return what we get. If we
    143                 receive non-contiguous / out-of-order fragments, we'll complain.
    144         @raises: MBIMComplianceChannelError if received fragments are
    145                 out-of-order or non-contigouos.
    146 
    147         """
    148         self._verify_endpoint_open()
    149         if not args:
    150             mbim_errors.log_and_raise(
    151                     mbim_errors.MBIMComplianceChannelError,
    152                     'No data given to |bidirectional_transaction|.')
    153 
    154         transaction_id, _, _ = self._fragment_metadata(args[0])
    155         for fragment in args:
    156             self._request_queue.put_nowait(fragment)
    157         return self._get_response_fragments(transaction_id)
    158 
    159 
    160     def unidirectional_transaction(self, *args):
    161         """
    162         Execute a synchronous unidirectional transaction. No return value.
    163 
    164         @param *args: Fragments of a single MBIM transaction. An MBIM
    165                 transaction may consist of multiple fragments - each fragment is
    166                 the payload for a USB control message. It should be an
    167                 |array.array| object.  It is your responsibility (and choice) to
    168                 keep the fragments in-order, and to send all the fragments.
    169                 For more details, see "Fragmentation of messages" in the MBIM
    170                 spec.
    171 
    172         """
    173         self._verify_endpoint_open()
    174         if not args:
    175             mbim_errors.log_and_raise(
    176                     mbim_errors.MBIMComplianceChannelError,
    177                     'No data given to |unidirectional_transaction|.')
    178 
    179         for fragment in args:
    180             self._request_queue.put_nowait(fragment)
    181 
    182 
    183     def flush(self):
    184         """
    185         Clean out all queues.
    186 
    187         This waits till all outgoing packets have been sent, and then waits some
    188         more to give the channel time to settle down.
    189 
    190         @raises: MBIMComplianceChannelError if things don't settle down fast
    191                 enough.
    192         """
    193         self._verify_endpoint_open()
    194         num_remaining_fragments = self._request_queue.qsize()
    195         try:
    196             timeout = self.FRAGMENT_TIMEOUT_S * num_remaining_fragments
    197             utils.poll_for_condition(lambda: self._request_queue.empty(),
    198                                      timeout=timeout)
    199         except utils.TimeoutError:
    200             mbim_errors.log_and_raise(
    201                     mbim_errors.MBIMComplianceChannelError,
    202                     'Could not flush request queue.')
    203 
    204         # Now wait for the response queue to settle down.
    205         # In the worst case, each request fragment that was remaining at the
    206         # time flush was called belonged to a different transaction, and each of
    207         # these transactions would serially timeout in |TRANSACTION_TIMEOUT_S|.
    208         # To avoid sleeping for long times, we cap this value arbitrarily to 5
    209         # transactions.
    210         num_remaining_transactions = min(5, num_remaining_fragments)
    211         time.sleep(num_remaining_fragments * self.TRANSACTION_TIMEOUT_S)
    212         extra_packets = self.get_outstanding_packets()
    213         for packet in extra_packets:
    214             logging.debug('flush: discarding packet: %s', packet)
    215 
    216 
    217     def get_outstanding_packets(self):
    218         """
    219         Get all received packets that were not part of an explicit transaction.
    220 
    221         @returns: A list of packets. Each packet is a list of fragments, so you
    222         perhaps want to do something like:
    223             for packet in channel.get_outstanding_packets():
    224                 for fragment in packet:
    225                     # handle fragment.
    226 
    227         """
    228         self._verify_endpoint_open()
    229         # Try to get more packets from the response queue.
    230         # This can block forever if the modem keeps spewing trash at us.
    231         while True:
    232             packet = self._get_packet_fragments()
    233             if not packet:
    234                 break
    235             self._outstanding_packets.append(packet)
    236 
    237         packets = self._outstanding_packets
    238         self._outstanding_packets = []
    239         return packets
    240 
    241 
    242     def _get_response_fragments(self, transaction_id):
    243         """
    244         Get response for the given |transaction_id|.
    245 
    246         @returns: A list of fragments.
    247         @raises: MBIMComplianceChannelError if response is not recieved.
    248 
    249         """
    250         def _poll_response():
    251             packet = self._get_packet_fragments()
    252             if not packet:
    253                 return False
    254             first_fragment = packet[0]
    255             response_id, _, _ = self._fragment_metadata(first_fragment)
    256             if response_id == transaction_id:
    257                 self._last_response = packet
    258                 return True
    259             self._outstanding_packets.append(packet)
    260             return False
    261 
    262         try:
    263             utils.poll_for_condition(
    264                     _poll_response,
    265                     timeout=self.TRANSACTION_TIMEOUT_S)
    266         except utils.TimeoutError:
    267             mbim_errors.log_and_raise(
    268                     mbim_errors.MBIMComplianceChannelError,
    269                     'Did not receive timely reply to transaction %d' %
    270                     transaction_id)
    271         return self._last_response
    272 
    273 
    274     def _get_packet_fragments(self):
    275         """
    276         Get all fragements of the next packet from the modem.
    277 
    278         This function is responsible for putting together fragments of one
    279         packet, and checking that fragments are continguous and in-order.
    280 
    281         """
    282         fragments = []
    283         if self._stashed_first_fragment is not None:
    284             first_fragment = self._stashed_first_fragment
    285             self._stashed_first_fragment = None
    286         else:
    287             try:
    288                 first_fragment = self._response_queue.get(
    289                         True, self.FRAGMENT_TIMEOUT_S)
    290             except Queue.Empty:
    291                 # *Don't fail* Just return nothing.
    292                 return fragments
    293 
    294         transaction_id, total_fragments, current_fragment = (
    295                 self._fragment_metadata(first_fragment))
    296         if current_fragment != 0:
    297             mbim_errors.log_and_raise(
    298                     mbim_errors.MBIMComplianceChannelError,
    299                     'First fragment reports fragment number %d' %
    300                     current_fragment)
    301 
    302         fragments.append(first_fragment)
    303 
    304         last_fragment = 0
    305         while last_fragment < total_fragments - 1:
    306             try:
    307                 fragment = self._response_queue.get(True,
    308                                                     self.FRAGMENT_TIMEOUT_S)
    309             except Queue.Empty:
    310                 # *Don't fail* Just return the fragments we got so far.
    311                 break
    312 
    313             fragment_id, fragment_total, fragment_current = (
    314                     self._fragment_metadata(fragment))
    315             if fragment_id != transaction_id:
    316                 # *Don't fail* Treat a different transaction id as indicating
    317                 # that the next packet has already arrived.
    318                 logging.warning('Recieved only %d out of %d fragments for '
    319                                 'transaction %d.',
    320                                 last_fragment,
    321                                 total_fragments,
    322                                 transaction_id)
    323                 self._stashed_first_fragment = fragment
    324                 break
    325 
    326             if fragment_total != total_fragments:
    327                 mbim_errors.log_and_raise(
    328                         mbim_errors.MBIMComplianceChannelError,
    329                         'Fragment number %d reports incorrect total (%d/%d)' %
    330                         (last_fragment + 1, fragment_total, total_fragments))
    331 
    332             if fragment_current != last_fragment + 1:
    333                 mbim_errors.log_and_raise(
    334                         mbim_errors.MBIMComplianceChannelError,
    335                         'Received reordered fragments. Expected %d, got %d' %
    336                         (last_fragment + 1, fragment_current))
    337 
    338             last_fragment += 1
    339             fragments.append(fragment)
    340 
    341         return fragments
    342 
    343 
    344     def _fragment_metadata(self, fragment):
    345         """ This function houses all the MBIM packet knowledge. """
    346         # All packets have a message header.
    347         if len(fragment) < struct.calcsize(self.MESSAGE_HEADER_FORMAT):
    348             mbim_errors.log_and_raise(
    349                     mbim_errors.MBIMComplianceChannelError,
    350                     'Corrupted fragment |%s| does not have an MBIM header.' %
    351                     fragment)
    352 
    353         message_type, _, transaction_id = struct.unpack_from(
    354                 self.MESSAGE_HEADER_FORMAT,
    355                 fragment)
    356 
    357         if message_type in self.MBIM_FRAGMENTED_MESSAGES:
    358             fragment = fragment[struct.calcsize(self.MESSAGE_HEADER_FORMAT):]
    359             if len(fragment) < struct.calcsize(self.FRAGMENT_HEADER_FORMAT):
    360                 mbim_errors.log_and_raise(
    361                         mbim_errors.MBIMComplianceChannelError,
    362                         'Corrupted fragment |%s| does not have a fragment '
    363                         'header. ' %
    364                         fragment)
    365 
    366             total_fragments, current_fragment = struct.unpack_from(
    367                     self.FRAGMENT_HEADER_FORMAT,
    368                     fragment)
    369         else:
    370             # For other types, there is only one 'fragment'.
    371             total_fragments = 1
    372             current_fragment = 0
    373 
    374         return transaction_id, total_fragments, current_fragment
    375 
    376 
    377     def _verify_endpoint_open(self):
    378         if not self._endpoint_process.is_alive():
    379             mbim_errors.log_and_raise(
    380                     mbim_errors.MBIMComplianceChannelError,
    381                     'MBIMChannelEndpoint died unexpectedly. '
    382                     'The actual exception can be found in log entries from the '
    383                     'subprocess.')
    384