1 """This class extends pexpect.spawn to specialize setting up SSH connections. 2 This adds methods for login, logout, and expecting the shell prompt. 3 4 PEXPECT LICENSE 5 6 This license is approved by the OSI and FSF as GPL-compatible. 7 http://opensource.org/licenses/isc-license.txt 8 9 Copyright (c) 2012, Noah Spurrier <noah (at] noah.org> 10 PERMISSION TO USE, COPY, MODIFY, AND/OR DISTRIBUTE THIS SOFTWARE FOR ANY 11 PURPOSE WITH OR WITHOUT FEE IS HEREBY GRANTED, PROVIDED THAT THE ABOVE 12 COPYRIGHT NOTICE AND THIS PERMISSION NOTICE APPEAR IN ALL COPIES. 13 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES 14 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 15 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR 16 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 17 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 18 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF 19 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 20 21 """ 22 23 from pexpect import * 24 import pexpect 25 import time 26 import os 27 28 __all__ = ['ExceptionPxssh', 'pxssh'] 29 30 # Exception classes used by this module. 31 class ExceptionPxssh(ExceptionPexpect): 32 """Raised for pxssh exceptions. 33 """ 34 35 class pxssh (spawn): 36 37 """This class extends pexpect.spawn to specialize setting up SSH 38 connections. This adds methods for login, logout, and expecting the shell 39 prompt. It does various tricky things to handle many situations in the SSH 40 login process. For example, if the session is your first login, then pxssh 41 automatically accepts the remote certificate; or if you have public key 42 authentication setup then pxssh won't wait for the password prompt. 43 44 pxssh uses the shell prompt to synchronize output from the remote host. In 45 order to make this more robust it sets the shell prompt to something more 46 unique than just $ or #. This should work on most Borne/Bash or Csh style 47 shells. 48 49 Example that runs a few commands on a remote server and prints the result:: 50 51 import pxssh 52 import getpass 53 try: 54 s = pxssh.pxssh() 55 hostname = raw_input('hostname: ') 56 username = raw_input('username: ') 57 password = getpass.getpass('password: ') 58 s.login (hostname, username, password) 59 s.sendline ('uptime') # run a command 60 s.prompt() # match the prompt 61 print s.before # print everything before the prompt. 62 s.sendline ('ls -l') 63 s.prompt() 64 print s.before 65 s.sendline ('df') 66 s.prompt() 67 print s.before 68 s.logout() 69 except pxssh.ExceptionPxssh, e: 70 print "pxssh failed on login." 71 print str(e) 72 73 Note that if you have ssh-agent running while doing development with pxssh 74 then this can lead to a lot of confusion. Many X display managers (xdm, 75 gdm, kdm, etc.) will automatically start a GUI agent. You may see a GUI 76 dialog box popup asking for a password during development. You should turn 77 off any key agents during testing. The 'force_password' attribute will turn 78 off public key authentication. This will only work if the remote SSH server 79 is configured to allow password logins. Example of using 'force_password' 80 attribute:: 81 82 s = pxssh.pxssh() 83 s.force_password = True 84 hostname = raw_input('hostname: ') 85 username = raw_input('username: ') 86 password = getpass.getpass('password: ') 87 s.login (hostname, username, password) 88 """ 89 90 def __init__ (self, timeout=30, maxread=2000, searchwindowsize=None, logfile=None, cwd=None, env=None): 91 92 spawn.__init__(self, None, timeout=timeout, maxread=maxread, searchwindowsize=searchwindowsize, logfile=logfile, cwd=cwd, env=env) 93 94 self.name = '<pxssh>' 95 96 #SUBTLE HACK ALERT! Note that the command that SETS the prompt uses a 97 #slightly different string than the regular expression to match it. This 98 #is because when you set the prompt the command will echo back, but we 99 #don't want to match the echoed command. So if we make the set command 100 #slightly different than the regex we eliminate the problem. To make the 101 #set command different we add a backslash in front of $. The $ doesn't 102 #need to be escaped, but it doesn't hurt and serves to make the set 103 #prompt command different than the regex. 104 105 # used to match the command-line prompt 106 self.UNIQUE_PROMPT = "\[PEXPECT\][\$\#] " 107 self.PROMPT = self.UNIQUE_PROMPT 108 109 # used to set shell command-line prompt to UNIQUE_PROMPT. 110 self.PROMPT_SET_SH = "PS1='[PEXPECT]\$ '" 111 self.PROMPT_SET_CSH = "set prompt='[PEXPECT]\$ '" 112 self.SSH_OPTS = ("-o'RSAAuthentication=no'" 113 + " -o 'PubkeyAuthentication=no'") 114 # Disabling host key checking, makes you vulnerable to MITM attacks. 115 # + " -o 'StrictHostKeyChecking=no'" 116 # + " -o 'UserKnownHostsFile /dev/null' ") 117 # Disabling X11 forwarding gets rid of the annoying SSH_ASKPASS from 118 # displaying a GUI password dialog. I have not figured out how to 119 # disable only SSH_ASKPASS without also disabling X11 forwarding. 120 # Unsetting SSH_ASKPASS on the remote side doesn't disable it! Annoying! 121 #self.SSH_OPTS = "-x -o'RSAAuthentication=no' -o 'PubkeyAuthentication=no'" 122 self.force_password = False 123 self.auto_prompt_reset = True 124 125 def levenshtein_distance(self, a,b): 126 127 """This calculates the Levenshtein distance between a and b. 128 """ 129 130 n, m = len(a), len(b) 131 if n > m: 132 a,b = b,a 133 n,m = m,n 134 current = range(n+1) 135 for i in range(1,m+1): 136 previous, current = current, [i]+[0]*n 137 for j in range(1,n+1): 138 add, delete = previous[j]+1, current[j-1]+1 139 change = previous[j-1] 140 if a[j-1] != b[i-1]: 141 change = change + 1 142 current[j] = min(add, delete, change) 143 return current[n] 144 145 def sync_original_prompt (self): 146 147 """This attempts to find the prompt. Basically, press enter and record 148 the response; press enter again and record the response; if the two 149 responses are similar then assume we are at the original prompt. This 150 is a slow function. It can take over 10 seconds. """ 151 152 # All of these timing pace values are magic. 153 # I came up with these based on what seemed reliable for 154 # connecting to a heavily loaded machine I have. 155 self.sendline() 156 time.sleep(0.1) 157 # If latency is worse than these values then this will fail. 158 159 try: 160 # Clear the buffer before getting the prompt. 161 self.read_nonblocking(size=10000,timeout=1) 162 except TIMEOUT: 163 pass 164 time.sleep(0.1) 165 self.sendline() 166 time.sleep(0.5) 167 x = self.read_nonblocking(size=1000,timeout=1) 168 time.sleep(0.1) 169 self.sendline() 170 time.sleep(0.5) 171 a = self.read_nonblocking(size=1000,timeout=1) 172 time.sleep(0.1) 173 self.sendline() 174 time.sleep(0.5) 175 b = self.read_nonblocking(size=1000,timeout=1) 176 ld = self.levenshtein_distance(a,b) 177 len_a = len(a) 178 if len_a == 0: 179 return False 180 if float(ld)/len_a < 0.4: 181 return True 182 return False 183 184 ### TODO: This is getting messy and I'm pretty sure this isn't perfect. 185 ### TODO: I need to draw a flow chart for this. 186 def login (self,server,username,password='',terminal_type='ansi',original_prompt=r"[#$]",login_timeout=10,port=None,auto_prompt_reset=True,ssh_key=None): 187 188 """This logs the user into the given server. It uses the 189 'original_prompt' to try to find the prompt right after login. When it 190 finds the prompt it immediately tries to reset the prompt to something 191 more easily matched. The default 'original_prompt' is very optimistic 192 and is easily fooled. It's more reliable to try to match the original 193 prompt as exactly as possible to prevent false matches by server 194 strings such as the "Message Of The Day". On many systems you can 195 disable the MOTD on the remote server by creating a zero-length file 196 called "~/.hushlogin" on the remote server. If a prompt cannot be found 197 then this will not necessarily cause the login to fail. In the case of 198 a timeout when looking for the prompt we assume that the original 199 prompt was so weird that we could not match it, so we use a few tricks 200 to guess when we have reached the prompt. Then we hope for the best and 201 blindly try to reset the prompt to something more unique. If that fails 202 then login() raises an ExceptionPxssh exception. 203 204 In some situations it is not possible or desirable to reset the 205 original prompt. In this case, set 'auto_prompt_reset' to False to 206 inhibit setting the prompt to the UNIQUE_PROMPT. Remember that pxssh 207 uses a unique prompt in the prompt() method. If the original prompt is 208 not reset then this will disable the prompt() method unless you 209 manually set the PROMPT attribute. """ 210 211 ssh_options = '-q' 212 if self.force_password: 213 ssh_options = ssh_options + ' ' + self.SSH_OPTS 214 if port is not None: 215 ssh_options = ssh_options + ' -p %s'%(str(port)) 216 if ssh_key is not None: 217 try: 218 os.path.isfile(ssh_key) 219 except: 220 raise ExceptionPxssh ('private ssh key does not exist') 221 ssh_options = ssh_options + ' -i %s' % (ssh_key) 222 cmd = "ssh %s -l %s %s" % (ssh_options, username, server) 223 224 # This does not distinguish between a remote server 'password' prompt 225 # and a local ssh 'passphrase' prompt (for unlocking a private key). 226 spawn._spawn(self, cmd) 227 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT, "(?i)connection closed by remote host"], timeout=login_timeout) 228 229 # First phase 230 if i==0: 231 # New certificate -- always accept it. 232 # This is what you get if SSH does not have the remote host's 233 # public key stored in the 'known_hosts' cache. 234 self.sendline("yes") 235 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) 236 if i==2: # password or passphrase 237 self.sendline(password) 238 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) 239 if i==4: 240 self.sendline(terminal_type) 241 i = self.expect(["(?i)are you sure you want to continue connecting", original_prompt, "(?i)(?:password)|(?:passphrase for key)", "(?i)permission denied", "(?i)terminal type", TIMEOUT]) 242 243 # Second phase 244 if i==0: 245 # This is weird. This should not happen twice in a row. 246 self.close() 247 raise ExceptionPxssh ('Weird error. Got "are you sure" prompt twice.') 248 elif i==1: # can occur if you have a public key pair set to authenticate. 249 ### TODO: May NOT be OK if expect() got tricked and matched a false prompt. 250 pass 251 elif i==2: # password prompt again 252 # For incorrect passwords, some ssh servers will 253 # ask for the password again, others return 'denied' right away. 254 # If we get the password prompt again then this means 255 # we didn't get the password right the first time. 256 self.close() 257 raise ExceptionPxssh ('password refused') 258 elif i==3: # permission denied -- password was bad. 259 self.close() 260 raise ExceptionPxssh ('permission denied') 261 elif i==4: # terminal type again? WTF? 262 self.close() 263 raise ExceptionPxssh ('Weird error. Got "terminal type" prompt twice.') 264 elif i==5: # Timeout 265 #This is tricky... I presume that we are at the command-line prompt. 266 #It may be that the shell prompt was so weird that we couldn't match 267 #it. Or it may be that we couldn't log in for some other reason. I 268 #can't be sure, but it's safe to guess that we did login because if 269 #I presume wrong and we are not logged in then this should be caught 270 #later when I try to set the shell prompt. 271 pass 272 elif i==6: # Connection closed by remote host 273 self.close() 274 raise ExceptionPxssh ('connection closed') 275 else: # Unexpected 276 self.close() 277 raise ExceptionPxssh ('unexpected login response') 278 if not self.sync_original_prompt(): 279 self.close() 280 raise ExceptionPxssh ('could not synchronize with original prompt') 281 # We appear to be in. 282 # set shell prompt to something unique. 283 if auto_prompt_reset: 284 if not self.set_unique_prompt(): 285 self.close() 286 raise ExceptionPxssh ('could not set shell prompt\n'+self.before) 287 return True 288 289 def logout (self): 290 291 """This sends exit to the remote shell. If there are stopped jobs then 292 this automatically sends exit twice. """ 293 294 self.sendline("exit") 295 index = self.expect([EOF, "(?i)there are stopped jobs"]) 296 if index==1: 297 self.sendline("exit") 298 self.expect(EOF) 299 self.close() 300 301 def prompt (self, timeout=-1): 302 303 """This matches the shell prompt. This is little more than a short-cut 304 to the expect() method. This returns True if the shell prompt was 305 matched. This returns False if a timeout was raised. Note that if you 306 called login() with auto_prompt_reset set to False then before calling 307 prompt() you must set the PROMPT attribute to a regex that prompt() 308 will use for matching the prompt. Calling prompt() will erase the 309 contents of the 'before' attribute even if no prompt is ever matched. 310 If timeout is not given or it is set to -1 then self.timeout is used. 311 """ 312 313 if timeout == -1: 314 timeout = self.timeout 315 i = self.expect([self.PROMPT, TIMEOUT], timeout=timeout) 316 if i==1: 317 return False 318 return True 319 320 def set_unique_prompt (self): 321 322 """This sets the remote prompt to something more unique than # or $. 323 This makes it easier for the prompt() method to match the shell prompt 324 unambiguously. This method is called automatically by the login() 325 method, but you may want to call it manually if you somehow reset the 326 shell prompt. For example, if you 'su' to a different user then you 327 will need to manually reset the prompt. This sends shell commands to 328 the remote host to set the prompt, so this assumes the remote host is 329 ready to receive commands. 330 331 Alternatively, you may use your own prompt pattern. Just set the PROMPT 332 attribute to a regular expression that matches it. In this case you 333 should call login() with auto_prompt_reset=False; then set the PROMPT 334 attribute. After that the prompt() method will try to match your prompt 335 pattern.""" 336 337 self.sendline ("unset PROMPT_COMMAND") 338 self.sendline (self.PROMPT_SET_SH) # sh-style 339 i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) 340 if i == 0: # csh-style 341 self.sendline (self.PROMPT_SET_CSH) 342 i = self.expect ([TIMEOUT, self.PROMPT], timeout=10) 343 if i == 0: 344 return False 345 return True 346 347 # vi:ts=4:sw=4:expandtab:ft=python: 348