#!/usr/bin/python
# Copyright (c) 2013 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Tests for the Prologix SCPI driver and the Scpi wrapper built on it.

These tests open connections to the instruments listed in
``scpi_instruments`` below, so they need those devices to be reachable.
"""

import copy
import unittest

import mock

import cellular_logging
import cellular_system_error
import prologix_scpi_driver
import scpi

log = cellular_logging.SetupCellularLogging('scpi_test')

# TODO:(byronk):
# a hack for now. Should look this up in labconfig_data. crbug.com/225108
# TODO:(byronk):
# replace SystemError with a specific exception crbug.com/225127

scpi_instruments = [
    # Agilent 8960 call box
    {'name_part': "8960", 'gpib_addr': '14', 'ip': '172.22.50.118'},
    # PXT is called 6621
    {'name_part': "6621", 'gpib_addr': '14', 'ip': "172.22.50.244"}
]


class BasicPrologixTest(unittest.TestCase):
    """Basic connection and command tests for the instruments listed in
    ``scpi_instruments``.
    """

    def test_bad_ip_address(self):
        """Opening an instrument at an unreachable IP raises SocketTimeout."""
        # Shallow copy so the shared module-level table is not modified.
        instr = copy.copy(scpi_instruments[0])
        instr['ip'] = '192.168.0.0'
        log.debug(instr)
        with self.assertRaises(cellular_system_error.SocketTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    def test_ConnectToPortSuccess(self):
        """Make a socket connection to the first instrument."""
        s = scpi_instruments[0]
        prologix_scpi_driver.connect_to_port(s['ip'], 1234, 5)

    def test_ConnectToPortBadIP(self):
        """A socket connection to an unreachable IP raises SocketTimeout."""
        with self.assertRaises(cellular_system_error.SocketTimeout):
            prologix_scpi_driver.connect_to_port('192.168.255.111', 1234, 1)

    def test_BadGpibAddress(self):
        """A wrong GPIB address raises InstrumentTimeout."""
        # Shallow copy so the shared module-level table is not modified.
        instr = copy.copy(scpi_instruments[0])
        # The table lists '14' for this instrument; 9 is deliberately wrong.
        instr['gpib_addr'] = 9
        with self.assertRaises(cellular_system_error.InstrumentTimeout):
            self._get_idns_and_verify(instruments=[instr], opc=True)

    @mock.patch.object(prologix_scpi_driver.PrologixScpiDriver, '_DirectQuery')
    def test_NonClearReadBufferBeforeInit(self, patched_driver):
        """
        Sometimes the Prologix box will have junk in its read buffer.
        There is code to read the junk out until setting the ++addr works.
        Test that here.
        """
        s = scpi_instruments[0]
        # Two junk replies, then the configured GPIB address.
        patched_driver.side_effect = ['junk1', 'junk2', s['gpib_addr']]
        # Constructing the driver is the test: it passes if no exception
        # is raised while the patched _DirectQuery returns the values above.
        prologix_scpi_driver.PrologixScpiDriver(
            hostname=s['ip'],
            port=1234,
            gpib_address=s['gpib_addr'],
            read_timeout_seconds=2)

    def test_Reset(self):
        """Call Reset on every instrument."""
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=20)
            try:
                scpi_connection.Reset()
            finally:
                # Close even if Reset raises so the connection is not leaked.
                scpi_connection.Close()

    def test_SimpleVerify(self):
        """Call SimpleVerify on the 8960."""
        # TODO(byronk): make sure this test only runs on the 8960. This
        # command doesn't work on other boxes
        for instr in scpi_instruments[:1]:
            self.assertEqual(instr['name_part'], '8960')
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                # Check to see if the power state is off.
                # setting most instrument to off should be ok.
                scpi_connection.SimpleVerify('call:ms:pow:targ', '+0')
            finally:
                scpi_connection.Close()

    def test_FetchErrors(self):
        """Call _WaitAndFetchErrors on every instrument."""
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=2)
            try:
                scpi_connection._WaitAndFetchErrors()
            finally:
                scpi_connection.Close()

    def test_BadScpiCommand(self):
        """Send a bad command and expect an InstrumentTimeout."""
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=1)
            try:
                # '*IDN' lacks the trailing '?' that the other tests use.
                # assertRaises fails the test when no timeout is raised.
                with self.assertRaises(
                        cellular_system_error.InstrumentTimeout):
                    scpi_connection.Query('*IDN')
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcFalse(self):
        """Send a stanza, which uses the context manager, with OPC off."""
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=False,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_ErrorCheckerContextAndStanzaSendingOpcTrue(self):
        """Send a stanza, which uses the context manager, with OPC on."""
        for instr in scpi_instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=True,
                                                  read_timeout_seconds=5)
            try:
                scpi_connection.SendStanza(['*WAI'])
            finally:
                scpi_connection.Close()

    def test_GetIdnOpcTrue(self):
        """
        Test with opc True. OPC = operation complete. Asking this
        question *OPC? after commands blocks until the command finishes.
        This prevents us from sending commands faster than the
        instrument can handle. True is usually the right setting.
        """
        self._get_idns_and_verify(instruments=scpi_instruments, opc=True)

    def test_GetIdnOpcFalse(self):
        """Now with OPC off."""
        self._get_idns_and_verify(instruments=scpi_instruments, opc=False)

    def _open_prologix(self, instr, opc_on_stanza, read_timeout_seconds=2):
        """Build a Scpi object on top of a Prologix driver.

        Args:
            instr: dict with 'ip', 'name_part' and 'gpib_addr' keys, shaped
                like the entries of scpi_instruments.
            opc_on_stanza: value assigned to the connection's opc_on_stanza
                attribute.
            read_timeout_seconds: read timeout passed to the driver.

        Returns:
            The scpi.Scpi connection, which is also stored on
            self.scpi_connection.
        """
        ip_addr = instr['ip']
        name_part = instr['name_part']
        gpib_addr = instr['gpib_addr']
        log.debug("trying %s at %s" % (name_part, ip_addr))
        driver = prologix_scpi_driver.PrologixScpiDriver(
            hostname=ip_addr,
            port=1234,
            gpib_address=gpib_addr,
            read_timeout_seconds=read_timeout_seconds)
        self.scpi_connection = scpi.Scpi(driver)
        log.debug("setting opc to %s " % opc_on_stanza)
        self.scpi_connection.opc_on_stanza = opc_on_stanza
        return self.scpi_connection

    def _get_idns_and_verify(self, instruments, opc=False):
        """
        Get the idn string from all the instruments, and check that it
        contains the desired substring. This is a quick sanity check only.

        Args:
            instruments: iterable of dicts shaped like the entries of
                scpi_instruments.
            opc: passed to _open_prologix as opc_on_stanza.
        """
        for instr in instruments:
            scpi_connection = self._open_prologix(instr, opc_on_stanza=opc)
            try:
                response = scpi_connection.Query('*IDN?')
                log.debug("looking for %s in response string: %s " %
                          (instr['name_part'], response))
                self.assertIn(instr['name_part'], response)
            finally:
                # Close even when the query or the check fails.
                scpi_connection.Close()


if __name__ == '__main__':
    unittest.main()