Home | History | Annotate | Download | only in pexpect
      1 """This class extends pexpect.spawn to specialize setting up SSH connections.
      2 This adds methods for login, logout, and expecting the shell prompt.
      3 
      4 PEXPECT LICENSE
      5 
      6     This license is approved by the OSI and FSF as GPL-compatible.
      7         http://opensource.org/licenses/isc-license.txt
      8 
      9     Copyright (c) 2012, Noah Spurrier <noah (at] noah.org>
     10     PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY
     11     PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE
     12     COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES.
     13     THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
     14     WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
     15     MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
     16     ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
     17     WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
     18     ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
     19     OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
     20 
     21 """
     22 
     23 from pexpect import *
     24 import pexpect
     25 import time
     26 import os
     27 
     28 __all__ = ['ExceptionPxssh', 'pxssh']
     29 
     30 # Exception classes used by this module.
     31 class ExceptionPxssh(ExceptionPexpect):
     32     """Raised for pxssh exceptions.
     33     """
     34 
     35 class pxssh (spawn):
     36 
     37     """This class extends pexpect.spawn to specialize setting up SSH
     38     connections. This adds methods for login, logout, and expecting the shell
     39     prompt. It does various tricky things to handle many situations in the SSH
     40     login process. For example, if the session is your first login, then pxssh
     41     automatically accepts the remote certificate; or if you have public key
     42     authentication setup then pxssh won't wait for the password prompt.
     43 
     44     pxssh uses the shell prompt to synchronize output from the remote host. In
     45     order to make this more robust it sets the shell prompt to something more
     46     unique than just $ or #. This should work on most Borne/Bash or Csh style
     47     shells.
     48 
     49     Example that runs a few commands on a remote server and prints the result::
     50 
     51         import pxssh
     52         import getpass
     53         try:
     54             s = pxssh.pxssh()
     55             hostname = raw_input('hostname: ')
     56             username = raw_input('username: ')
     57             password = getpass.getpass('password: ')
     58             s.login (hostname, username, password)
     59             s.sendline ('uptime')  # run a command
     60             s.prompt()             # match the prompt
     61             print s.before         # print everything before the prompt.
     62             s.sendline ('ls -l')
     63             s.prompt()
     64             print s.before
     65             s.sendline ('df')
     66             s.prompt()
     67             print s.before
     68             s.logout()
     69         except pxssh.ExceptionPxssh, e:
     70             print "pxssh failed on login."
     71             print str(e)
     72 
     73     Note that if you have ssh-agent running while doing development with pxssh
     74     then this can lead to a lot of confusion. Many X display managers (xdm,
     75     gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI
     76     dialog box popup asking for a password during development. You should turn
     77     off any key agents during testing. The 'force_password' attribute will turn
     78     off public key authentication. This will only work if the remote SSH server
     79     is configured to allow password logins. Example of using 'force_password'
     80     attribute::
     81 
     82             s = pxssh.pxssh()
     83             s.force_password = True
     84             hostname = raw_input('hostname: ')
     85             username = raw_input('username: ')
     86             password = getpass.getpass('password: ')
     87             s.login (hostname, username, password)
     88     """
     89 
     90     def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None):
     91 
     92         spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env)
     93 
     94         self.name = '<pxssh>'
     95 
     96         #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a
     97         #slightly different string than the regular expression to match it. This
     98         #is because when you set the prompt the command will echo back, but we
     99         #don't want to match the echoed command. So if we make the set command
    100         #slightly different than the regex we eliminate the problem. To make the
    101         #set command different we add a backslash in front of $. The $ doesn't
    102         #need to be escaped, but it doesn't hurt and serves to make the set
    103         #prompt command different than the regex.
    104 
    105         # used to match the command-line prompt
    106         self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] "
    107         self.PROMPT = self.UNIQUE_PROMPT
    108 
    109         # used to set shell command-line prompt to UNIQUE_PROMPT.
    110         self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '"
    111         self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '"
    112         self.SSH_OPTS = ("-o'RSAAuthentication=no'"
    113                 + " -o 'PubkeyAuthentication=no'")
    114 # Disabling host key checking, makes you vulnerable to MITM attacks.
    115 #                + " -o 'StrictHostKeyChecking=no'"
    116 #                + " -o 'UserKnownHostsFile /dev/null' ")
    117         # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from
    118         # displaying a GUI password dialog. I have not figured out how to
    119         # disable only SSH_ASKPASS without also disabling X11 forwarding.
    120         # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying!
    121         #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'"
    122         self.force_password = False
    123         self.auto_prompt_reset = True
    124 
    125     def levenshtein_distance(self, a,b):
    126 
    127         """This calculates the Levenshtein distance between a and b.
    128         """
    129 
    130         n, m = len(a), len(b)
    131         if n > m:
    132             a,b = b,a
    133             n,m = m,n
    134         current = range(n+1)
    135         for i in range(1,m+1):
    136             previous, current = current, [i]+[0]*n
    137             for j in range(1,n+1):
    138                 add, delete = previous[j]+1, current[j-1]+1
    139                 change = previous[j-1]
    140                 if a[j-1] != b[i-1]:
    141                     change = change + 1
    142                 current[j] = min(add, delete, change)
    143         return current[n]
    144 
    145     def sync_original_prompt (self):
    146 
    147         """This attempts to find the prompt. Basically, press enter and record
    148         the response; press enter again and record the response; if the two
    149         responses are similar then assume we are at the original prompt. This
    150         is a slow function. It can take over 10 seconds. """
    151 
    152         # All of these timing pace values are magic.
    153         # I came up with these based on what seemed reliable for
    154         # connecting to a heavily loaded machine I have.
    155         self.sendline()
    156         time.sleep(0.1)
    157         # If latency is worse than these values then this will fail.
    158 
    159         try:
    160             # Clear the buffer before getting the prompt.
    161             self.read_nonblocking(size=10000,timeout=1)
    162         except TIMEOUT:
    163             pass
    164         time.sleep(0.1)
    165         self.sendline()
    166         time.sleep(0.5)
    167         x = self.read_nonblocking(size=1000,timeout=1)
    168         time.sleep(0.1)
    169         self.sendline()
    170         time.sleep(0.5)
    171         a = self.read_nonblocking(size=1000,timeout=1)
    172         time.sleep(0.1)
    173         self.sendline()
    174         time.sleep(0.5)
    175         b = self.read_nonblocking(size=1000,timeout=1)
    176         ld = self.levenshtein_distance(a,b)
    177         len_a = len(a)
    178         if len_a == 0:
    179             return False
    180         if float(ld)/len_a < 0.4:
    181             return True
    182         return False
    183 
    184     ### TODO: This is getting messy and I'm pretty sure this isn't perfect.
    185     ### TODO: I need to draw a flow chart for this.
    186     def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None):
    187 
    188         """This logs the user into the given server. It uses the
    189         'original_prompt' to try to find the prompt right after login. When it
    190         finds the prompt it immediately tries to reset the prompt to something
    191         more easily matched. The default 'original_prompt' is very optimistic
    192         and is easily fooled. It's more reliable to try to match the original
    193         prompt as exactly as possible to prevent false matches by server
    194         strings such as the "Message Of The Day". On many systems you can
    195         disable the MOTD on the remote server by creating a zero-length file
    196         called "~/.hushlogin" on the remote server. If a prompt cannot be found
    197         then this will not necessarily cause the login to fail. In the case of
    198         a timeout when looking for the prompt we assume that the original
    199         prompt was so weird that we could not match it, so we use a few tricks
    200         to guess when we have reached the prompt. Then we hope for the best and
    201         blindly try to reset the prompt to something more unique. If that fails
    202         then login() raises an ExceptionPxssh exception.
    203 
    204         In some situations it is not possible or desirable to reset the
    205         original prompt. In this case, set 'auto_prompt_reset' to False to
    206         inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh
    207         uses a unique prompt in the prompt() method. If the original prompt is
    208         not reset then this will disable the prompt() method unless you
    209         manually set the PROMPT attribute. """
    210 
    211         ssh_options = '-q'
    212         if self.force_password:
    213             ssh_options = ssh_options + ' ' + self.SSH_OPTS
    214         if port is not None:
    215             ssh_options = ssh_options + ' -p %s'%(str(port))
    216         if ssh_key is not None:
    217             try:
    218                 os.path.isfile(ssh_key)
    219             except:
    220                 raise ExceptionPxssh ('private ssh key does not exist')
    221             ssh_options = ssh_options + ' -i %s' % (ssh_key)
    222         cmd = "ssh %s -l %s %s" % (ssh_options, username, server)
    223 
    224         # This does not distinguish between a remote server 'password' prompt
    225         # and a local ssh 'passphrase' prompt (for unlocking a private key).
    226         spawn._spawn(self, cmd)
    227         i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout)
    228 
    229         # First phase
    230         if i==0:
    231             # New certificate -- always accept it.
    232             # This is what you get if SSH does not have the remote host's
    233             # public key stored in the 'known_hosts' cache.
    234             self.sendline("yes")
    235             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
    236         if i==2: # password or passphrase
    237             self.sendline(password)
    238             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
    239         if i==4:
    240             self.sendline(terminal_type)
    241             i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT])
    242 
    243         # Second phase
    244         if i==0:
    245             # This is weird. This should not happen twice in a row.
    246             self.close()
    247             raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.')
    248         elif i==1: # can occur if you have a public key pair set to authenticate.
    249             ### TODO: May NOT be OK if expect() got tricked and matched a false prompt.
    250             pass
    251         elif i==2: # password prompt again
    252             # For incorrect passwords, some ssh servers will
    253             # ask for the password again, others return 'denied' right away.
    254             # If we get the password prompt again then this means
    255             # we didn't get the password right the first time.
    256             self.close()
    257             raise ExceptionPxssh ('password refused')
    258         elif i==3: # permission denied -- password was bad.
    259             self.close()
    260             raise ExceptionPxssh ('permission denied')
    261         elif i==4: # terminal type again? WTF?
    262             self.close()
    263             raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.')
    264         elif i==5: # Timeout
    265             #This is tricky... I presume that we are at the command-line prompt.
    266             #It may be that the shell prompt was so weird that we couldn't match
    267             #it. Or it may be that we couldn't log in for some other reason. I
    268             #can't be sure, but it's safe to guess that we did login because if
    269             #I presume wrong and we are not logged in then this should be caught
    270             #later when I try to set the shell prompt.
    271             pass
    272         elif i==6: # Connection closed by remote host
    273             self.close()
    274             raise ExceptionPxssh ('connection closed')
    275         else: # Unexpected
    276             self.close()
    277             raise ExceptionPxssh ('unexpected login response')
    278         if not self.sync_original_prompt():
    279             self.close()
    280             raise ExceptionPxssh ('could not synchronize with original prompt')
    281         # We appear to be in.
    282         # set shell prompt to something unique.
    283         if auto_prompt_reset:
    284             if not self.set_unique_prompt():
    285                 self.close()
    286                 raise ExceptionPxssh ('could not set shell prompt\n'+self.before)
    287         return True
    288 
    289     def logout (self):
    290 
    291         """This sends exit to the remote shell. If there are stopped jobs then
    292         this automatically sends exit twice. """
    293 
    294         self.sendline("exit")
    295         index = self.expect([EOF, "(?i)there are stopped jobs"])
    296         if index==1:
    297             self.sendline("exit")
    298             self.expect(EOF)
    299         self.close()
    300 
    301     def prompt (self, timeout=-1):
    302 
    303         """This matches the shell prompt. This is little more than a short-cut
    304         to the expect() method. This returns True if the shell prompt was
    305         matched. This returns False if a timeout was raised. Note that if you
    306         called login() with auto_prompt_reset set to False then before calling
    307         prompt() you must set the PROMPT attribute to a regex that prompt()
    308         will use for matching the prompt. Calling prompt() will erase the
    309         contents of the 'before' attribute even if no prompt is ever matched.
    310         If timeout is not given or it is set to -1 then self.timeout is used.
    311         """
    312 
    313         if timeout == -1:
    314             timeout = self.timeout
    315         i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout)
    316         if i==1:
    317             return False
    318         return True
    319 
    320     def set_unique_prompt (self):
    321 
    322         """This sets the remote prompt to something more unique than # or $.
    323         This makes it easier for the prompt() method to match the shell prompt
    324         unambiguously. This method is called automatically by the login()
    325         method, but you may want to call it manually if you somehow reset the
    326         shell prompt. For example, if you 'su' to a different user then you
    327         will need to manually reset the prompt. This sends shell commands to
    328         the remote host to set the prompt, so this assumes the remote host is
    329         ready to receive commands.
    330 
    331         Alternatively, you may use your own prompt pattern. Just set the PROMPT
    332         attribute to a regular expression that matches it. In this case you
    333         should call login() with auto_prompt_reset=False; then set the PROMPT
    334         attribute. After that the prompt() method will try to match your prompt
    335         pattern."""
    336 
    337         self.sendline ("unset PROMPT_COMMAND")
    338         self.sendline (self.PROMPT_SET_SH) # sh-style
    339         i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
    340         if i == 0: # csh-style
    341             self.sendline (self.PROMPT_SET_CSH)
    342             i = self.expect ([TIMEOUT, self.PROMPT], timeout=10)
    343             if i == 0:
    344                 return False
    345         return True
    346 
    347 # vi:ts=4:sw=4:expandtab:ft=python:
    348