import itertools
import unittest

import Util
from ShCommands import Command, Pipeline, Seq


class ShLexer:
    """Lexer for a small subset of 'sh' command-line syntax.

    Tokens produced by lex() come in three shapes:
      - a plain string for a command argument,
      - a 1-tuple ``(op,)`` for an operator such as '|', '&&' or '>>',
      - a 2-tuple ``(op, fd)`` for a redirection with an explicit numeric
        file descriptor, e.g. '2>' lexes as ``('>', 2)``.
    """

    def __init__(self, data, win32Escapes=False):
        """
        data - The string to lex.
        win32Escapes - If true, a backslash outside of a quoted string is
        kept as a literal character instead of acting as an escape.
        """
        self.data = data
        self.pos = 0
        self.end = len(data)
        self.win32Escapes = win32Escapes

    def eat(self):
        """
        eat() - Consume and return the next character. The caller must ensure
        the lexer is not at the end of the input.
        """
        c = self.data[self.pos]
        self.pos += 1
        return c

    def look(self):
        """
        look() - Return the next character without consuming it. The caller
        must ensure the lexer is not at the end of the input.
        """
        return self.data[self.pos]

    def maybe_eat(self, c):
        """
        maybe_eat(c) - Consume the character c if it is the next character,
        returning True if a character was consumed.
        """
        # Guard against the end of the input, so that an operator which is
        # the last character (e.g. 'a |') does not raise IndexError.
        if self.pos != self.end and self.data[self.pos] == c:
            self.pos += 1
            return True
        return False

    def lex_arg_fast(self, c):
        """
        lex_arg_fast(c) - Try to lex an argument which has no special
        characters. c is the already consumed first character. Returns the
        argument, or None (consuming nothing further) if the slow path is
        required.
        """
        # Get the leading whitespace free section; c has already been
        # consumed, so the argument starts one character back.
        chunk = self.data[self.pos - 1:].split(None, 1)[0]

        # If it has special characters, the fast path failed.
        if ('|' in chunk or '&' in chunk or
            '<' in chunk or '>' in chunk or
            "'" in chunk or '"' in chunk or
            '\\' in chunk):
            return None

        self.pos = self.pos - 1 + len(chunk)
        return chunk

    def lex_arg_slow(self, c):
        """
        lex_arg_slow(c) - Lex an argument one character at a time, handling
        quoting and escapes. c is the already consumed first character.

        Returns the argument string, or a ``(op, fd)`` redirection token when
        the argument is all digits and is immediately followed by '<' or '>'.

        Note that a backslash which is the very first character of the
        argument is kept literally; escape handling only applies to
        subsequent characters.
        """
        if c in "'\"":
            arg = self.lex_arg_quoted(c)
        else:
            arg = c
        while self.pos != self.end:
            c = self.look()
            if c.isspace() or c in "|&":
                break
            elif c in '><':
                # This is an annoying case; we treat '2>' as a single token so
                # we don't have to track whitespace tokens.

                # If the parse string isn't an integer, do the usual thing.
                if not arg.isdigit():
                    break

                # Otherwise, lex the operator and convert to a redirection
                # token.
                num = int(arg)
                tok = self.lex_one_token()
                assert isinstance(tok, tuple) and len(tok) == 1
                return (tok[0], num)
            elif c == '"':
                self.eat()
                arg += self.lex_arg_quoted('"')
            elif c == "'":
                self.eat()
                arg += self.lex_arg_quoted("'")
            elif not self.win32Escapes and c == '\\':
                # Outside of a string, '\\' escapes everything.
                self.eat()
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return arg
                arg += self.eat()
            else:
                arg += self.eat()
        return arg

    def lex_arg_quoted(self, delim):
        """
        lex_arg_quoted(delim) - Lex the body of a quoted string whose opening
        delim has already been consumed, returning the unquoted contents. A
        warning is issued if the closing quote is missing.
        """
        text = ''
        while self.pos != self.end:
            c = self.eat()
            if c == delim:
                return text
            elif c == '\\' and delim == '"':
                # Inside a '"' quoted string, '\\' only escapes the quote
                # character and backslash, otherwise it is preserved.
                if self.pos == self.end:
                    Util.warning("escape at end of quoted argument in: %r" %
                                 self.data)
                    return text
                c = self.eat()
                if c == '"':
                    text += '"'
                elif c == '\\':
                    text += '\\'
                else:
                    text += '\\' + c
            else:
                text += c
        Util.warning("missing quote character in %r" % self.data)
        return text

    def lex_arg_checked(self, c):
        """
        lex_arg_checked(c) - Debugging variant of lex_arg which runs both the
        fast and the slow path and raises ValueError if they disagree on
        either the result or the final position.
        """
        pos = self.pos
        res = self.lex_arg_fast(c)
        end = self.pos

        # Rewind and lex again using the reference implementation.
        self.pos = pos
        reference = self.lex_arg_slow(c)
        if res is not None:
            if res != reference:
                raise ValueError("Fast path failure: %r != %r" %
                                 (res, reference))
            if self.pos != end:
                raise ValueError("Fast path failure: %r != %r" %
                                 (self.pos, end))
        return reference

    def lex_arg(self, c):
        """
        lex_arg(c) - Lex an argument whose first character c has already been
        consumed, trying the fast path first.
        """
        # The fast path returns either None or a non-empty string, so 'or'
        # falls through to the slow path only on failure.
        return self.lex_arg_fast(c) or self.lex_arg_slow(c)

    def lex_one_token(self):
        """
        lex_one_token - Lex a single 'sh' token.
        """

        c = self.eat()
        if c in ';!':
            return (c,)
        if c == '|':
            if self.maybe_eat('|'):
                return ('||',)
            return (c,)
        if c == '&':
            if self.maybe_eat('&'):
                return ('&&',)
            if self.maybe_eat('>'):
                return ('&>',)
            return (c,)
        if c == '>':
            if self.maybe_eat('&'):
                return ('>&',)
            if self.maybe_eat('>'):
                return ('>>',)
            return (c,)
        if c == '<':
            if self.maybe_eat('&'):
                return ('<&',)
            # '<<' is spelled with two '<' characters; '<>' is lexed as two
            # separate tokens.
            if self.maybe_eat('<'):
                return ('<<',)
            return (c,)

        return self.lex_arg(c)

    def lex(self):
        """
        lex() - Generate the tokens of the input, skipping whitespace between
        them.
        """
        while self.pos != self.end:
            if self.look().isspace():
                self.eat()
            else:
                yield self.lex_one_token()

###


class ShParser:
    """Parser which builds Command, Pipeline and Seq objects from the tokens
    produced by ShLexer."""

    def __init__(self, data, win32Escapes=False):
        """
        data - The command string to parse.
        win32Escapes - Passed through to ShLexer.
        """
        self.data = data
        self.tokens = ShLexer(data, win32Escapes=win32Escapes).lex()

    def lex(self):
        """
        lex() - Consume and return the next token, or None at the end of the
        input.
        """
        try:
            return next(self.tokens)
        except StopIteration:
            return None

    def look(self):
        """
        look() - Return the next token without consuming it, or None at the
        end of the input.
        """
        tok = self.lex()
        if tok is not None:
            # Push the token back on the front of the stream.
            self.tokens = itertools.chain([tok], self.tokens)
        return tok

    def parse_command(self):
        """
        parse_command() - Parse a single command: its arguments and any
        redirections, up to (but not including) the next terminator.

        Raises ValueError if the command is empty, starts with an operator,
        or has a redirection without a target.
        """
        tok = self.lex()
        if not tok:
            raise ValueError("empty command!")
        if isinstance(tok, tuple):
            raise ValueError("syntax error near unexpected token %r" % tok[0])

        args = [tok]
        redirects = []
        while True:
            tok = self.look()

            # EOF?
            if tok is None:
                break

            # If this is an argument, just add it to the current command.
            if isinstance(tok, str):
                args.append(self.lex())
                continue

            # Otherwise see if it is a terminator.
            assert isinstance(tok, tuple)
            if tok[0] in ('|', ';', '&', '||', '&&'):
                break

            # Otherwise it must be a redirection.
            op = self.lex()
            arg = self.lex()
            # The target must be an actual argument; reject both a missing
            # target and an operator token in its place.
            if not arg or isinstance(arg, tuple):
                raise ValueError("syntax error near token %r" % op[0])
            redirects.append((op, arg))

        return Command(args, redirects)

    def parse_pipeline(self):
        """
        parse_pipeline() - Parse an optionally negated ('!') sequence of
        commands separated by '|'.
        """
        negate = False
        if self.look() == ('!',):
            self.lex()
            negate = True

        commands = [self.parse_command()]
        while self.look() == ('|',):
            self.lex()
            commands.append(self.parse_command())
        return Pipeline(commands, negate)

    def parse(self):
        """
        parse() - Parse the whole input, combining pipelines left to right
        with the operators that separate them.

        Raises ValueError if an operator has no right hand side.
        """
        lhs = self.parse_pipeline()

        while self.look():
            operator = self.lex()
            assert isinstance(operator, tuple) and len(operator) == 1

            if not self.look():
                raise ValueError("missing argument to operator %r" %
                                 operator[0])

            # FIXME: Operator precedence!!
            lhs = Seq(lhs, operator[0], self.parse_pipeline())

        return lhs

###


class TestShLexer(unittest.TestCase):
    def lex(self, str, *args, **kwargs):
        return list(ShLexer(str, *args, **kwargs).lex())

    def test_basic(self):
        self.assertEqual(self.lex('a|b>c&d<e'),
                         ['a', ('|',), 'b', ('>',), 'c', ('&',), 'd',
                          ('<',), 'e'])

    def test_redirection_tokens(self):
        self.assertEqual(self.lex('a2>c'),
                         ['a2', ('>',), 'c'])
        self.assertEqual(self.lex('a 2>c'),
                         ['a', ('>', 2), 'c'])

    def test_double_angle_tokens(self):
        self.assertEqual(self.lex('a >> b'),
                         ['a', ('>>',), 'b'])
        self.assertEqual(self.lex('a << b'),
                         ['a', ('<<',), 'b'])
        self.assertEqual(self.lex('a <> b'),
                         ['a', ('<',), ('>',), 'b'])

    def test_operator_at_end(self):
        self.assertEqual(self.lex('a|'), ['a', ('|',)])
        self.assertEqual(self.lex('a &'), ['a', ('&',)])
        self.assertEqual(self.lex('a >'), ['a', ('>',)])
        self.assertEqual(self.lex('a <'), ['a', ('<',)])

    def test_quoting(self):
        self.assertEqual(self.lex(""" 'a' """),
                         ['a'])
        self.assertEqual(self.lex(""" "hello\\"world" """),
                         ['hello"world'])
        self.assertEqual(self.lex(""" "hello\\'world" """),
                         ["hello\\'world"])
        self.assertEqual(self.lex(""" "hello\\\\world" """),
                         ["hello\\world"])
        self.assertEqual(self.lex(""" he"llo wo"rld """),
                         ["hello world"])
        self.assertEqual(self.lex(""" a\\ b a\\\\b """),
                         ["a b", "a\\b"])
        self.assertEqual(self.lex(""" "" "" """),
                         ["", ""])
        self.assertEqual(self.lex(""" a\\ b """, win32Escapes = True),
                         ['a\\', 'b'])


class TestShParse(unittest.TestCase):
    def parse(self, str):
        return ShParser(str).parse()

    def test_basic(self):
        self.assertEqual(self.parse('echo hello'),
                         Pipeline([Command(['echo', 'hello'], [])], False))
        self.assertEqual(self.parse('echo ""'),
                         Pipeline([Command(['echo', ''], [])], False))
        self.assertEqual(self.parse("""echo -DFOO='a'"""),
                         Pipeline([Command(['echo', '-DFOO=a'], [])], False))
        self.assertEqual(self.parse('echo -DFOO="a"'),
                         Pipeline([Command(['echo', '-DFOO=a'], [])], False))

    def test_redirection(self):
        self.assertEqual(self.parse('echo hello > c'),
                         Pipeline([Command(['echo', 'hello'],
                                           [(('>',), 'c')])], False))
        self.assertEqual(self.parse('echo hello > c >> d'),
                         Pipeline([Command(['echo', 'hello'], [(('>',), 'c'),
                                                     (('>>',), 'd')])], False))
        self.assertEqual(self.parse('a 2>&1'),
                         Pipeline([Command(['a'], [(('>&', 2), '1')])], False))

    def test_redirection_without_target(self):
        self.assertRaises(ValueError, self.parse, 'echo >')
        self.assertRaises(ValueError, self.parse, 'echo > | b')

    def test_pipeline(self):
        self.assertEqual(self.parse('a | b'),
                         Pipeline([Command(['a'], []),
                                   Command(['b'], [])],
                                  False))

        self.assertEqual(self.parse('a | b | c'),
                         Pipeline([Command(['a'], []),
                                   Command(['b'], []),
                                   Command(['c'], [])],
                                  False))

        self.assertEqual(self.parse('! a'),
                         Pipeline([Command(['a'], [])],
                                  True))

    def test_list(self):
        self.assertEqual(self.parse('a ; b'),
                         Seq(Pipeline([Command(['a'], [])], False),
                             ';',
                             Pipeline([Command(['b'], [])], False)))

        self.assertEqual(self.parse('a & b'),
                         Seq(Pipeline([Command(['a'], [])], False),
                             '&',
                             Pipeline([Command(['b'], [])], False)))

        self.assertEqual(self.parse('a && b'),
                         Seq(Pipeline([Command(['a'], [])], False),
                             '&&',
                             Pipeline([Command(['b'], [])], False)))

        self.assertEqual(self.parse('a || b'),
                         Seq(Pipeline([Command(['a'], [])], False),
                             '||',
                             Pipeline([Command(['b'], [])], False)))

        self.assertEqual(self.parse('a && b || c'),
                         Seq(Seq(Pipeline([Command(['a'], [])], False),
                                 '&&',
                                 Pipeline([Command(['b'], [])], False)),
                             '||',
                             Pipeline([Command(['c'], [])], False)))


if __name__ == '__main__':
    unittest.main()