Home | History | Annotate | Download | only in attenuator_lib
      1 
      2 #   Copyright 2016- The Android Open Source Project
      3 #
      4 #   Licensed under the Apache License, Version 2.0 (the "License");
      5 #   you may not use this file except in compliance with the License.
      6 #   You may obtain a copy of the License at
      7 #
      8 #       http://www.apache.org/licenses/LICENSE-2.0
      9 #
     10 #   Unless required by applicable law or agreed to in writing, software
     11 #   distributed under the License is distributed on an "AS IS" BASIS,
     12 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     13 #   See the License for the specific language governing permissions and
     14 #   limitations under the License.
     15 """
     16 Helper module for common telnet capability to communicate with AttenuatorInstrument(s).
     17 
     18 User code shouldn't need to directly access this class.
     19 """
     20 
     21 import telnetlib
     22 from vts.utils.python.controllers import attenuator
     23 
     24 
     25 def _ascii_string(uc_string):
     26     return str(uc_string).encode('ASCII')
     27 
     28 
     29 class _TNHelper():
     30     #This is an internal helper class for Telnet+SCPI command-based instruments.
     31     #It should only be used by those implemention control libraries and not by any user code
     32     # directly
     33 
     34     def __init__(self,
     35                  tx_cmd_separator="\n",
     36                  rx_cmd_separator="\n",
     37                  prompt=""):
     38         self._tn = None
     39 
     40         self.tx_cmd_separator = tx_cmd_separator
     41         self.rx_cmd_separator = rx_cmd_separator
     42         self.prompt = prompt
     43 
     44     def open(self, host, port=23):
     45         if self._tn:
     46             self._tn.close()
     47 
     48         self._tn = telnetlib.Telnet()
     49         self._tn.open(host, port, 10)
     50 
     51     def is_open(self):
     52         return bool(self._tn)
     53 
     54     def close(self):
     55         if self._tn:
     56             self._tn.close()
     57             self._tn = None
     58 
     59     def cmd(self, cmd_str, wait_ret=True):
     60         if not isinstance(cmd_str, str):
     61             raise TypeError("Invalid command string", cmd_str)
     62 
     63         if not self.is_open():
     64             raise attenuator.InvalidOperationError(
     65                 "Telnet connection not open for commands")
     66 
     67         cmd_str.strip(self.tx_cmd_separator)
     68         self._tn.read_until(_ascii_string(self.prompt), 2)
     69         self._tn.write(_ascii_string(cmd_str + self.tx_cmd_separator))
     70 
     71         if wait_ret is False:
     72             return None
     73 
     74         match_idx, match_val, ret_text = \
     75             self._tn.expect([_ascii_string("\S+"+self.rx_cmd_separator)], 1)
     76 
     77         if match_idx == -1:
     78             raise attenuator.InvalidDataError(
     79                 "Telnet command failed to return valid data")
     80 
     81         ret_text = ret_text.decode()
     82         ret_text = ret_text.strip(self.tx_cmd_separator + self.rx_cmd_separator
     83                                   + self.prompt)
     84 
     85         return ret_text
     86