Home | History | Annotate | Download | only in rf_switch
      1 # Copyright (c) 2017 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 """Interface for SCPI Protocol.
      6 
      7 Helper module to communicate with devices that use the SCPI protocol.
      8 
      9 https://en.wikipedia.org/wiki/Standard_Commands_for_Programmable_Instruments
     10 
     11 This will be used by RF Switch that was designed to connect WiFi AP and
     12 WiFi Clients RF enclosures for interoperability testing.
     13 
     14 """
     15 
     16 import logging
     17 import socket
     18 import sys
     19 
     20 
     21 class ScpiException(Exception):
     22     """Exception for SCPI Errors."""
     23 
     24     def __init__(self, msg=None, cause=None):
     25         messages = []
     26         if msg:
     27             messages.append(msg)
     28         if cause:
     29             messages.append('Wrapping exception: %s: %s' % (
     30                 type(cause).__name__, str(cause)))
     31         super(ScpiException, self).__init__(', '.join(messages))
     32 
     33 
     34 class Scpi(object):
     35     """Controller for devices using SCPI protocol."""
     36 
     37     SCPI_PORT = 5025
     38     DEFAULT_READ_LEN = 4096
     39 
     40     CMD_IDENTITY = '*IDN?'
     41     CMD_RESET = '*RST'
     42     CMD_STATUS = '*STB?'
     43     CMD_ERROR_CHECK = 'SYST:ERR?'
     44 
     45     def __init__(self, host, port=SCPI_PORT):
     46         """
     47         Controller for devices using SCPI protocol.
     48 
     49         @param host: hostname or IP address of device using SCPI protocol
     50         @param port: Int SCPI port number (default 5025)
     51 
     52         @raises SCPIException: on error connecting to device
     53 
     54         """
     55         self.host = host
     56         self.port = port
     57 
     58         # Open a socket connection for communication with chassis.
     59         try:
     60             self.socket = socket.socket()
     61             self.socket.connect((host, port))
     62         except (socket.error, socket.timeout) as e:
     63             logging.error('Error connecting to SCPI device.')
     64             raise ScpiException(cause=e), None, sys.exc_info()[2]
     65 
     66     def close(self):
     67         """Close the connection."""
     68         if hasattr(self, 'socket'):
     69             self.socket.close()
     70             del self.socket
     71 
     72     def write(self, data):
     73         """Send data to socket.
     74 
     75         @param data: Data to send
     76 
     77         @returns number of bytes sent
     78 
     79         """
     80         return self.socket.send(data)
     81 
     82     def read(self, buffer_size=DEFAULT_READ_LEN):
     83         """Safely read the query response.
     84 
     85         @param buffer_size: Int max data to read at once (default 4096)
     86 
     87         @returns String data read from the socket
     88 
     89         """
     90         return str(self.socket.recv(buffer_size))
     91 
     92     def query(self, data, buffer_size=DEFAULT_READ_LEN):
     93         """Send the query and get response.
     94 
     95         @param data: data (Query) to send
     96         @param buffer_size: Int max data to read at once (default 4096)
     97 
     98         @returns String data read from the socket
     99 
    100         """
    101         self.write(data)
    102         return self.read(buffer_size)
    103 
    104     def info(self):
    105         """Get Chassis Info.
    106 
    107         @returns dictionary information of Chassis
    108 
    109         """
    110         # Returns a comma separated text as below converted to dict.
    111         # 'VTI Instruments Corporation,EX7200-S-11539,138454,3.13.8\n'
    112         return dict(
    113             zip(('Manufacturer', 'Model', 'Serial', 'Version'),
    114                 self.query('%s\n' % self.CMD_IDENTITY)
    115                 .strip().split(',', 3)))
    116 
    117     def reset(self):
    118         """Reset the chassis.
    119 
    120         @returns number of bytes sent
    121         """
    122         return self.write('%s\n' % self.CMD_RESET)
    123 
    124     def status(self):
    125         """Get status of relays.
    126 
    127         @returns Int status of relays
    128 
    129         """
    130         return int(self.query('%s\n' % self.CMD_STATUS))
    131 
    132     def error_query(self):
    133         """Check for any error.
    134 
    135         @returns tuple of error code and error message
    136 
    137         """
    138         code, msg = self.query('%s\n' % self.CMD_ERROR_CHECK).split(', ')
    139         return int(code), msg.strip().strip('"')
    140