Home | History | Annotate | Download | only in network
      1 # Copyright (c) 2018 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 import socket
      6 import telnetlib
      7 
      8 from autotest_lib.client.common_lib import error
      9 
     10 SHORT_TIMEOUT = 2
     11 LONG_TIMEOUT = 30
     12 
     13 class TelnetHelper(object):
     14     """Helper class to run basic string commands on a telnet host."""
     15 
     16     def __init__(self, tx_cmd_separator="\n", rx_cmd_separator="\n", prompt=""):
     17         self._tn = None
     18 
     19         self._tx_cmd_separator = tx_cmd_separator
     20         self._rx_cmd_separator = rx_cmd_separator
     21         self._prompt = prompt
     22 
     23     def open(self, hostname, port=22):
     24         """Opens telnet connection to attenuator host.
     25 
     26         @param hostname: Valid hostname
     27         @param port: Optional port number, defaults to 22
     28 
     29         """
     30         if self._tn:
     31             self._tn.close()
     32 
     33         self._tn = telnetlib.Telnet()
     34 
     35         try:
     36             self._tn.open(hostname, port, LONG_TIMEOUT)
     37         except socket.timeout as e:
     38             raise error.TestError("Timed out while opening telnet connection")
     39 
     40     def is_open(self):
     41         """Returns true if telnet connection is open."""
     42         return bool(self._tn)
     43 
     44     def close(self):
     45         """Closes telnet connection."""
     46         if self._tn:
     47             self._tn.close()
     48             self._tn = None
     49 
     50     def cmd(self, cmd_str, wait_ret=True):
     51         """Run command on attenuator.
     52 
     53         @param cmd_str: Command to run
     54         @param wait_ret: Wait for command output or not
     55         @returns command output
     56         """
     57         if not isinstance(cmd_str, str):
     58             raise error.TestError("Invalid command string %s" % cmd_str)
     59 
     60         if not self.is_open():
     61             raise error.TestError("Telnet connection not open for commands")
     62 
     63         cmd_str.strip(self._tx_cmd_separator)
     64         try:
     65             self._tn.read_until(self._prompt, SHORT_TIMEOUT)
     66         except EOFError as e:
     67             raise error.TestError("Connection closed. EOFError (%s)" % e)
     68 
     69         try:
     70             self._tn.write(cmd_str + self._tx_cmd_separator)
     71         except socket.error as e:
     72             raise error.TestError("Connection closed. Socket error (%s)." % e)
     73 
     74         if wait_ret is False:
     75             return None
     76 
     77         try:
     78             match_channel_idx, _, ret_text = \
     79                     self._tn.expect(["\S+" + self._rx_cmd_separator],
     80                                     SHORT_TIMEOUT)
     81         except EOFError as e:
     82             raise error.TestError("Connection closed. EOFError (%s)" % e)
     83 
     84         if match_channel_idx == -1:
     85             raise error.TestError("Telnet command failed to return valid data. "
     86                                   "Data returned: %s" % ret_text)
     87 
     88         ret_text = ret_text.strip()
     89 
     90         return ret_text
     91