Home | History | Annotate | Download | only in mbim_compliance
      1 # Copyright 2015 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import logging
      6 import numpy
      7 
      8 import common
      9 from autotest_lib.client.cros.cellular.mbim_compliance import mbim_errors
     10 
     11 
     12 class MBIMDataChannel(object):
     13     """
     14     Provides access to the data channel of a MBIM modem.
     15 
     16     The object is used to send and receive MBIM frames to/from the modem.
     17     The object uses the BULK-IN endpoint exposed in the data interface for any
     18     reads from the modem to the host.
     19     The object uses the BULK-OUT endpoint exposed in the data interface for any
     20     writes from the host to the modem.
     21     The channel does not deaggregate/aggregate packets into MBIM frames. The
     22     caller is expected to validate/provide MBIM frames to the channel. The
     23     channel is just used to send raw bytes to the device and read raw bytes from
     24     the device.
     25 
     26     """
     27     _READ_TIMEOUT_MS = 10000
     28     _WRITE_TIMEOUT_MS = 10000
     29 
     30     def __init__(self,
     31                  device,
     32                  data_interface_number,
     33                  bulk_in_endpoint_address,
     34                  bulk_out_endpoint_address,
     35                  max_in_buffer_size):
     36         """
     37         @param device: Device handle returned by PyUSB for the modem to test.
     38         @param bulk_in_endpoint_address: |bEndpointAddress| for the usb
     39                 BULK IN endpoint from the data interface.
     40         @param bulk_out_endpoint_address: |bEndpointAddress| for the usb
     41                 BULK OUT endpoint from the data interface.
     42         @param max_in_buffer_size: The (fixed) buffer size to used for in
     43                 data transfers.
     44 
     45         """
     46         self._device = device
     47         self._data_interface_number = data_interface_number
     48         self._bulk_in_endpoint_address = bulk_in_endpoint_address
     49         self._bulk_out_endpoint_address = bulk_out_endpoint_address
     50         self._max_in_buffer_size = max_in_buffer_size
     51 
     52 
     53     def send_ntb(self, ntb):
     54         """
     55         Send the specified payload down to the device using the bulk-out USB
     56         pipe.
     57 
     58         @param ntb: Byte array of complete MBIM NTB to be sent to the device.
     59         @raises MBIMComplianceDataTransferError if the complete |ntb| could not
     60                 be sent.
     61 
     62         """
     63         ntb_length = len(ntb)
     64         written = self._device.write(endpoint=self._bulk_out_endpoint_address,
     65                                      data=ntb,
     66                                      timeout=self._WRITE_TIMEOUT_MS,
     67                                      interface=self._data_interface_number)
     68         numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
     69                                linewidth=1000)
     70         logging.debug('Data Channel: Sent %d bytes out of %d bytes requested. '
     71                       'Payload: %s',
     72                        written, ntb_length, numpy.array(ntb))
     73         if written < ntb_length:
     74             mbim_errors.log_and_raise(
     75                     mbim_errors.MBIMComplianceDataTransferError,
     76                     'Could not send the complete NTB (%d/%d bytes sent)' %
     77                     written, ntb_length)
     78 
     79 
     80     def receive_ntb(self):
     81         """
     82         Receive a payload from the device using the bulk-in USB pipe.
     83 
     84         This API will return any data it receives from the device within
     85         |_READ_TIMEOUT_S| seconds. If nothing is received within this duration,
     86         it returns an empty byte array. The API returns only one MBIM NTB
     87         received per invocation.
     88 
     89         @returns Byte array of complete MBIM NTB received from the device. This
     90                 could be empty if nothing is received from the device.
     91 
     92         """
     93         ntb = self._device.read(endpoint=self._bulk_in_endpoint_address,
     94                                 size=self._max_in_buffer_size,
     95                                 timeout=self._READ_TIMEOUT_MS,
     96                                 interface=self._data_interface_number)
     97         ntb_length = len(ntb)
     98         numpy.set_printoptions(formatter={'int':lambda x: hex(int(x))},
     99                                linewidth=1000)
    100         logging.debug('Data Channel: Received %d bytes response. Payload: %s',
    101                        ntb_length, numpy.array(ntb))
    102         return ntb
    103