1 # Copyright (C) 2010 Google Inc. All rights reserved. 2 # Copyright (C) 2009 Daniel Bates (dbates (at] intudata.com). All rights reserved. 3 # 4 # Redistribution and use in source and binary forms, with or without 5 # modification, are permitted provided that the following conditions are 6 # met: 7 # 8 # * Redistributions of source code must retain the above copyright 9 # notice, this list of conditions and the following disclaimer. 10 # * Redistributions in binary form must reproduce the above 11 # copyright notice, this list of conditions and the following disclaimer 12 # in the documentation and/or other materials provided with the 13 # distribution. 14 # * Neither the name of Google Inc. nor the names of its 15 # contributors may be used to endorse or promote products derived from 16 # this software without specific prior written permission. 17 # 18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 import os 31 import errno 32 import signal 33 import subprocess 34 import sys 35 import time 36 import unittest 37 38 # Since we execute this script directly as part of the unit tests, we need to ensure 39 # that Tools/Scripts and Tools/Scripts/thirdparty are in sys.path for the next imports to work correctly. 40 script_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))) 41 if script_dir not in sys.path: 42 sys.path.append(script_dir) 43 third_party_py = os.path.join(script_dir, "webkitpy", "thirdparty") 44 if third_party_py not in sys.path: 45 sys.path.append(third_party_py) 46 47 48 from webkitpy.common.system.executive import Executive, ScriptError 49 from webkitpy.common.system.filesystem_mock import MockFileSystem 50 51 52 class ScriptErrorTest(unittest.TestCase): 53 def test_message_with_output(self): 54 error = ScriptError('My custom message!', '', -1) 55 self.assertEqual(error.message_with_output(), 'My custom message!') 56 error = ScriptError('My custom message!', '', -1, 'My output.') 57 self.assertEqual(error.message_with_output(), 'My custom message!\n\noutput: My output.') 58 error = ScriptError('', 'my_command!', -1, 'My output.', '/Users/username/blah') 59 self.assertEqual(error.message_with_output(), 'Failed to run "\'my_command!\'" exit_code: -1 cwd: /Users/username/blah\n\noutput: My output.') 60 error = ScriptError('', 'my_command!', -1, 'ab' + '1' * 499) 61 self.assertEqual(error.message_with_output(), 'Failed to run "\'my_command!\'" exit_code: -1\n\noutput: Last 500 characters of output:\nb' + '1' * 499) 62 63 def test_message_with_tuple(self): 64 error = ScriptError('', ('my', 'command'), -1, 'My output.', '/Users/username/blah') 65 self.assertEqual(error.message_with_output(), 'Failed to run "(\'my\', \'command\')" exit_code: -1 cwd: /Users/username/blah\n\noutput: My output.') 66 67 def never_ending_command(): 68 """Arguments for a command that will never end (useful for testing process 69 killing). It should be a process that is unlikely to already be running 70 because all instances will be killed.""" 71 if sys.platform == 'win32': 72 return ['wmic'] 73 return ['yes'] 74 75 76 def command_line(cmd, *args): 77 return [sys.executable, __file__, '--' + cmd] + list(args) 78 79 80 class ExecutiveTest(unittest.TestCase): 81 def assert_interpreter_for_content(self, intepreter, content): 82 fs = MockFileSystem() 83 84 tempfile, temp_name = fs.open_binary_tempfile('') 85 tempfile.write(content) 86 tempfile.close() 87 file_interpreter = Executive.interpreter_for_script(temp_name, fs) 88 89 self.assertEqual(file_interpreter, intepreter) 90 91 def test_interpreter_for_script(self): 92 self.assert_interpreter_for_content(None, '') 93 self.assert_interpreter_for_content(None, 'abcd\nefgh\nijklm') 94 self.assert_interpreter_for_content(None, '##/usr/bin/perl') 95 self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl') 96 self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl\nfirst\nsecond') 97 self.assert_interpreter_for_content('perl', '#!/usr/bin/perl') 98 self.assert_interpreter_for_content('perl', '#!/usr/bin/perl -w') 99 self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python') 100 self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python\nfirst\nsecond') 101 self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/python') 102 self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby') 103 self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby\nfirst\nsecond') 104 self.assert_interpreter_for_content('ruby', '#!/usr/bin/ruby') 105 106 def test_run_command_with_bad_command(self): 107 def run_bad_command(): 108 Executive().run_command(["foo_bar_command_blah"], error_handler=Executive.ignore_error, return_exit_code=True) 109 self.assertRaises(OSError, run_bad_command) 110 111 def test_run_command_args_type(self): 112 executive = Executive() 113 self.assertRaises(AssertionError, executive.run_command, "echo") 114 self.assertRaises(AssertionError, executive.run_command, u"echo") 115 executive.run_command(command_line('echo', 'foo')) 116 executive.run_command(tuple(command_line('echo', 'foo'))) 117 118 def test_auto_stringify_args(self): 119 executive = Executive() 120 executive.run_command(command_line('echo', 1)) 121 executive.popen(command_line('echo', 1), stdout=executive.PIPE).wait() 122 self.assertEqual('echo 1', executive.command_for_printing(['echo', 1])) 123 124 def test_popen_args(self): 125 executive = Executive() 126 # Explicitly naming the 'args' argument should not thow an exception. 127 executive.popen(args=command_line('echo', 1), stdout=executive.PIPE).wait() 128 129 def test_run_command_with_unicode(self): 130 """Validate that it is safe to pass unicode() objects 131 to Executive.run* methods, and they will return unicode() 132 objects by default unless decode_output=False""" 133 unicode_tor_input = u"WebKit \u2661 Tor Arne Vestb\u00F8!" 134 if sys.platform == 'win32': 135 encoding = 'mbcs' 136 else: 137 encoding = 'utf-8' 138 encoded_tor = unicode_tor_input.encode(encoding) 139 # On Windows, we expect the unicode->mbcs->unicode roundtrip to be 140 # lossy. On other platforms, we expect a lossless roundtrip. 141 if sys.platform == 'win32': 142 unicode_tor_output = encoded_tor.decode(encoding) 143 else: 144 unicode_tor_output = unicode_tor_input 145 146 executive = Executive() 147 148 output = executive.run_command(command_line('cat'), input=unicode_tor_input) 149 self.assertEqual(output, unicode_tor_output) 150 151 output = executive.run_command(command_line('echo', unicode_tor_input)) 152 self.assertEqual(output, unicode_tor_output) 153 154 output = executive.run_command(command_line('echo', unicode_tor_input), decode_output=False) 155 self.assertEqual(output, encoded_tor) 156 157 # Make sure that str() input also works. 158 output = executive.run_command(command_line('cat'), input=encoded_tor, decode_output=False) 159 self.assertEqual(output, encoded_tor) 160 161 # FIXME: We should only have one run* method to test 162 output = executive.run_and_throw_if_fail(command_line('echo', unicode_tor_input), quiet=True) 163 self.assertEqual(output, unicode_tor_output) 164 165 output = executive.run_and_throw_if_fail(command_line('echo', unicode_tor_input), quiet=True, decode_output=False) 166 self.assertEqual(output, encoded_tor) 167 168 def test_kill_process(self): 169 executive = Executive() 170 process = subprocess.Popen(never_ending_command(), stdout=subprocess.PIPE) 171 self.assertEqual(process.poll(), None) # Process is running 172 executive.kill_process(process.pid) 173 174 # Killing again should fail silently. 175 executive.kill_process(process.pid) 176 177 def _assert_windows_image_name(self, name, expected_windows_name): 178 executive = Executive() 179 windows_name = executive._windows_image_name(name) 180 self.assertEqual(windows_name, expected_windows_name) 181 182 def test_windows_image_name(self): 183 self._assert_windows_image_name("foo", "foo.exe") 184 self._assert_windows_image_name("foo.exe", "foo.exe") 185 self._assert_windows_image_name("foo.com", "foo.com") 186 # If the name looks like an extension, even if it isn't 187 # supposed to, we have no choice but to return the original name. 188 self._assert_windows_image_name("foo.baz", "foo.baz") 189 self._assert_windows_image_name("foo.baz.exe", "foo.baz.exe") 190 191 def test_check_running_pid(self): 192 executive = Executive() 193 self.assertTrue(executive.check_running_pid(os.getpid())) 194 # Maximum pid number on Linux is 32768 by default 195 self.assertFalse(executive.check_running_pid(100000)) 196 197 def test_running_pids(self): 198 if sys.platform in ("win32", "cygwin"): 199 return # This function isn't implemented on Windows yet. 200 201 executive = Executive() 202 pids = executive.running_pids() 203 self.assertIn(os.getpid(), pids) 204 205 def test_run_in_parallel_assert_nonempty(self): 206 self.assertRaises(AssertionError, Executive().run_in_parallel, []) 207 208 209 def main(platform, stdin, stdout, cmd, args): 210 if platform == 'win32' and hasattr(stdout, 'fileno'): 211 import msvcrt 212 msvcrt.setmode(stdout.fileno(), os.O_BINARY) 213 if cmd == '--cat': 214 stdout.write(stdin.read()) 215 elif cmd == '--echo': 216 stdout.write(' '.join(args)) 217 return 0 218 219 if __name__ == '__main__' and len(sys.argv) > 1 and sys.argv[1] in ('--cat', '--echo'): 220 sys.exit(main(sys.platform, sys.stdin, sys.stdout, sys.argv[1], sys.argv[2:])) 221