Home | History | Annotate | Download | only in cellular
      1 #!/usr/bin/python
      2 # Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 
      7 import copy
      8 import mock
      9 import prologix_scpi_driver
     10 import scpi
     11 import unittest
     12 import cellular_logging
     13 import cellular_system_error
     14 
# Logger used by the tests in this module for debug output.
log = cellular_logging.SetupCellularLogging('scpi_test')

# TODO(byronk): A hack for now; this (presumably the hard-coded instrument
# list below) should be looked up in labconfig_data. crbug.com/225108
# TODO(byronk): Replace SystemError with a specific exception.
# crbug.com/225127
     21 
     22 scpi_instruments = [
     23     # Agilent 8960 call box
     24     {'name_part': "8960", 'gpib_addr': '14', 'ip': '172.22.50.118'},
     25     # PXT is called 6621
     26     {'name_part': "6621", 'gpib_addr': '14', 'ip': "172.22.50.244"}
     27 ]
     28 
     29 
     30 class BasicPrologixTest(unittest.TestCase):
     31     """
     32     Basic connection test
     33     """
     34 
     35     def test_bad_ip_address(self):
     36         """
     37         Connect to the wrong port and check for the right error message.
     38         """
     39         instr = copy.copy(scpi_instruments[0])
     40         instr['ip'] = '192.168.0.0'  # str(int(instr['gpib_addr'])+1)
     41         log.debug(instr)
     42         with self.assertRaises(Exception) as ex:
     43             self._get_idns_and_verify(instruments=[instr], opc=True)
     44         self.assertIsInstance(ex.exception,
     45                               cellular_system_error.SocketTimeout)
     46 
     47     def test_ConnectToPortSuccess(self):
     48         """ Make a socket connection """
     49         s = scpi_instruments[0]
     50         prologix_scpi_driver.connect_to_port(s['ip'], 1234, 5)
     51 
     52     def test_ConnectToPortBadIP(self):
     53         """ Make a socket connection """
     54         with self.assertRaises(Exception) as ex:
     55             prologix_scpi_driver.connect_to_port('192.168.255.111', 1234, 1)
     56         self.assertIsInstance(ex.exception,
     57                               cellular_system_error.SocketTimeout)
     58 
     59     def test_BadGpibAddress(self):
     60         """
     61         How does the code behave if we can't connect.
     62         """
     63         instr = copy.copy(scpi_instruments[0])
     64         instr['gpib_addr'] = 9  # str(int(instr['gpib_addr'])+1)
     65         with self.assertRaises(Exception) as ex:
     66             self._get_idns_and_verify(instruments=[instr], opc=True)
     67         self.assertIsInstance(ex.exception,
     68                               cellular_system_error.InstrumentTimeout)
     69 
     70     @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery')
     71     def test_NonClearReadBufferBeforeInit(self, patched_driver):
     72         """
     73         Sometimes the Prologix box will have junk in it's read buffer
     74         There is code to read the junk out until setting the ++addr works.
     75         Test that here.
     76         """
     77         s = scpi_instruments[0]
     78         patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']]
     79         driver = prologix_scpi_driver.PrologixScpiDriver(
     80             hostname=s['ip'],
     81             port=1234,
     82             gpib_address=s['gpib_addr'],
     83             read_timeout_seconds=2)
     84 
     85     def test_Reset(self):
     86         for instr in scpi_instruments:
     87             scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
     88                                                   read_timeout_seconds=20)
     89             scpi_connection.Reset()
     90             self.scpi_connection.Close()
     91 
     92     def test_SimpleVerify(self):
     93         """
     94         call SimpleVerify.
     95         """
     96         # TODO(byronk): make sure this test only runs on the 8960. This
     97         # command doesn't work on other boxes
     98         for instr in scpi_instruments[:1]:
     99             assert instr['name_part'] == '8960'
    100             scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
    101                                                   read_timeout_seconds=2)
    102             # Check to see if the power state is off.
    103             # setting most instrument to off should be ok.
    104             scpi_connection.SimpleVerify('call:ms:pow:targ', '+0')
    105             self.scpi_connection.Close()
    106 
    107     def test_FetchErrors(self):
    108         """
    109         call FetchErrors
    110         """
    111         for instr in scpi_instruments:
    112             scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
    113                                                   read_timeout_seconds=2)
    114             scpi_connection._WaitAndFetchErrors()
    115             self.scpi_connection.Close()
    116 
    117     def test_BadScpiCommand(self):
    118         """
    119         Send a bad command. We should fail gracefully.
    120         """
    121         for instr in scpi_instruments:
    122             scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
    123                                                   read_timeout_seconds=1)
    124             try:
    125                 scpi_connection.Query('*IDN')
    126             except cellular_system_error.InstrumentTimeout:
    127                 assert \
    128                  "Should have raised a Instrument Timeout on a bad SCPI command"
    129 
    130     def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self):
    131         """
    132         Send a stanza, which uses the context manager
    133         """
    134         for instr in scpi_instruments:
    135             scpi_connection = self._open_prologix(instr, opc_on_stanza=False,
    136                                                   read_timeout_seconds=5)
    137             scpi_connection.SendStanza(['*WAI'])
    138             scpi_connection.Close()
    139 
    140     def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self):
    141         """
    142         Send a stanza, which uses the context manager
    143         """
    144         for instr in scpi_instruments:
    145             scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
    146                                                   read_timeout_seconds=5)
    147             scpi_connection.SendStanza(['*WAI'])
    148             scpi_connection.Close()
    149 
    150     def test_GetIdnOpcTrue(self):
    151         """
    152         Test with opc True. OPC= operation complete. Asking this
    153         question *OPC? after commands blocks until the command finishes.
    154         This prevents us from sending commands faster then then the
    155         instrument can handle. True is usually the right setting.
    156 
    157         """
    158         self._get_idns_and_verify(instruments=scpi_instruments, opc=True)
    159 
    160     def test_GetIdnOpcFalse(self):
    161         """
    162         Now with OPC off.
    163         """
    164         self._get_idns_and_verify(instruments=scpi_instruments, opc=False)
    165 
    166     def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2):
    167         """
    168         Build the prologix object.
    169         """
    170         ip_addr = instr['ip']
    171         name_part = instr['name_part']
    172         gpib_addr = instr['gpib_addr']
    173         log.debug("trying %s at %s" % (name_part, ip_addr))
    174         driver = prologix_scpi_driver.PrologixScpiDriver(
    175             hostname=ip_addr,
    176             port=1234,
    177             gpib_address=gpib_addr,
    178             read_timeout_seconds=read_timeout_seconds)
    179         self.scpi_connection = scpi.Scpi(driver)
    180         log.debug("setting opc to %s " % opc_on_stanza)
    181         self.scpi_connection.opc_on_stanza = opc_on_stanza
    182         return self.scpi_connection
    183 
    184     def _get_idns_and_verify(self, instruments, opc=False):
    185         """
    186         Get the idn string from all the instruments, and check that it
    187         contains the desired substring. This is a quick sanity check only.
    188         """
    189         for instr in instruments:
    190             scpi_connection = self._open_prologix(instr, opc_on_stanza=opc)
    191             response = scpi_connection.Query('*IDN?')
    192             log.debug("looking for %s  in response string: %s " %
    193                       (instr['name_part'], response))
    194             assert instr['name_part'] in response
    195             self.scpi_connection.Close()
    196 
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
    199