Home | History | Annotate | Download | only in cellular
      1 #!/usr/bin/python
      2 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import cellular_logging
      7 import cellular_system_error
      8 
      9 log = cellular_logging.SetupCellularLogging('scpi_driver')
     10 
     11 
     12 class _ErrorCheckerContext(object):
     13     """Reference-count our error-checking state and only check for
     14     errors when we take the first ref or drop the last ref.
     15 
     16     This way, we can minimize the number of checks; each one takes a
     17     bit of time.  You will likely want to set always_check to True when
     18     debugging new SCPI interactions.
     19 
     20     On first entry, we check for errors, but do not stop if we find
     21     them; these are errors that were accumulated on the device before
     22     this test ran.
     23     """
     24 
     25     def __init__(self, scpi):
     26         self.always_check = True  # True for serious debugging
     27         self.scpi = scpi
     28         self.depth = 0
     29         self.raise_on_error = True
     30 
     31     def __enter__(self):
     32         log.debug('ErrorCheckerContext Depth: %s' % self.depth)
     33         if self.depth == 0 or self.always_check:
     34             errors = self.scpi._WaitAndFetchErrors(
     35                 raise_on_error=False)  # Never raise when clearing old errors
     36         self.depth += 1
     37         return self
     38 
     39     def __exit__(self, type, value, traceback):
     40         self.depth -= 1
     41         if self.depth <= 0 or self.always_check:
     42             self.scpi._WaitAndFetchErrors()
     43         return
     44 
     45 
     46 class Scpi(object):
     47     """Wrapper for SCPI.
     48 
     49     SCPI = "standard commands for programmable instruments",
     50     a relative of GPIB.
     51 
     52     The SCPI driver must export:  Query, Send, Reset and Close
     53     """
     54 
     55     def __init__(self, driver, opc_on_stanza=False):
     56         self.driver = driver
     57         self.opc_on_stanza = opc_on_stanza
     58         self.checker_context = _ErrorCheckerContext(self)
     59 
     60     def Query(self, command):
     61         """Send the SCPI command and return the response."""
     62         response = self.driver.Query(command)
     63         return response
     64 
     65     def Send(self, command):
     66         """Send the SCPI command."""
     67         self.driver.Send(command)
     68 
     69     def Reset(self):
     70         """Tell the device to reset with *RST."""
     71         # Some devices (like the prologix) require special handling for
     72         # reset.
     73         self.driver.Reset()
     74 
     75     def Close(self):
     76         """Close the device."""
     77         self.driver.Close()
     78 
     79     def RetrieveErrors(self):
     80         """Retrieves all SYSTem:ERRor messages from the device."""
     81         errors = []
     82         while True:
     83             error = self.Query('SYSTem:ERRor?')
     84             if '+0,"No error"' in error:
     85                 # We've reached the end of the error stack
     86                 break
     87 
     88             if '-420' in error and 'Query UNTERMINATED' in error:
     89                 # This is benign; the GPIB bridge asked for a response when
     90                 # the device didn't have one to give.
     91 
     92                 # TODO(rochberg): This is a layering violation; we should
     93                 # really only accept -420 if the underlying driver is in a
     94                 # mode that is known to cause this
     95                 continue
     96 
     97             if '+292' in error and 'Data arrived on unknown SAPI' in error:
     98                 # This may be benign; It is known to occur when we do a switch
     99                 # from GPRS to WCDMA
    100                 continue
    101 
    102             errors.append(error)
    103 
    104         self.Send('*CLS')           # Clear status
    105         errors.reverse()
    106         return errors
    107 
    108     def _WaitAndFetchErrors(self, raise_on_error=True):
    109         """Waits for command completion, returns errors."""
    110         self.Query('*OPC?')      # Wait for operation complete
    111         errors = self.RetrieveErrors()
    112         if errors and raise_on_error:
    113             raise cellular_system_error.BadScpiCommand('\n'.join(errors))
    114         return errors
    115 
    116     def SimpleVerify(self, command, arg):
    117         """Sends "command arg", then "command?", expecting arg back.
    118 
    119         Arguments:
    120           command: SCPI command
    121           arg: Argument.  We currently check for exact equality: you should
    122             send strings quoted with " because that's what the 8960 returns.
    123             We also fail if you send 1 and receive +1 back.
    124 
    125         Raises:
    126           Error:  Verification failed
    127         """
    128         self.always_check = False
    129         with self.checker_context:
    130             self.Send('%s %s' % (command, arg))
    131             result = self.Query('%s?' % (command,))
    132             if result != arg:
    133                 raise cellular_system_error.BadScpiCommand(
    134                     'Error on %s: sent %s, got %s' % (command, arg, result))
    135 
    136     def SendStanza(self, commands):
    137         """
    138         Sends a list of commands and verifies that they complete correctly.
    139         """
    140         with self.checker_context:
    141             for c in commands:
    142                 if self.opc_on_stanza:
    143                     self.Send(c)
    144                     self.Query('*OPC?')
    145                 else:
    146                     self.Send(c)
    147