Home | History | Annotate | Download | only in system
      1 # Copyright (C) 2010 Google Inc. All rights reserved.
      2 # Copyright (C) 2009 Daniel Bates (dbates (at] intudata.com). All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #    * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #    * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #    * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 import os
     31 import errno
     32 import signal
     33 import subprocess
     34 import sys
     35 import time
     36 import unittest
     37 
     38 # Since we execute this script directly as part of the unit tests, we need to ensure
     39 # that Tools/Scripts and Tools/Scripts/thirdparty are in sys.path for the next imports to work correctly.
     40 script_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
     41 if script_dir not in sys.path:
     42     sys.path.append(script_dir)
     43 third_party_py = os.path.join(script_dir, "webkitpy", "thirdparty")
     44 if third_party_py not in sys.path:
     45     sys.path.append(third_party_py)
     46 
     47 
     48 from webkitpy.common.system.executive import Executive, ScriptError
     49 from webkitpy.common.system.filesystem_mock import MockFileSystem
     50 
     51 
     52 class ScriptErrorTest(unittest.TestCase):
     53     def test_message_with_output(self):
     54         error = ScriptError('My custom message!', '', -1)
     55         self.assertEqual(error.message_with_output(), 'My custom message!')
     56         error = ScriptError('My custom message!', '', -1, 'My output.')
     57         self.assertEqual(error.message_with_output(), 'My custom message!\n\noutput: My output.')
     58         error = ScriptError('', 'my_command!', -1, 'My output.', '/Users/username/blah')
     59         self.assertEqual(error.message_with_output(), 'Failed to run "\'my_command!\'" exit_code: -1 cwd: /Users/username/blah\n\noutput: My output.')
     60         error = ScriptError('', 'my_command!', -1, 'ab' + '1' * 499)
     61         self.assertEqual(error.message_with_output(), 'Failed to run "\'my_command!\'" exit_code: -1\n\noutput: Last 500 characters of output:\nb' + '1' * 499)
     62 
     63     def test_message_with_tuple(self):
     64         error = ScriptError('', ('my', 'command'), -1, 'My output.', '/Users/username/blah')
     65         self.assertEqual(error.message_with_output(), 'Failed to run "(\'my\', \'command\')" exit_code: -1 cwd: /Users/username/blah\n\noutput: My output.')
     66 
     67 def never_ending_command():
     68     """Arguments for a command that will never end (useful for testing process
     69     killing). It should be a process that is unlikely to already be running
     70     because all instances will be killed."""
     71     if sys.platform == 'win32':
     72         return ['wmic']
     73     return ['yes']
     74 
     75 
     76 def command_line(cmd, *args):
     77     return [sys.executable, __file__, '--' + cmd] + list(args)
     78 
     79 
     80 class ExecutiveTest(unittest.TestCase):
     81     def assert_interpreter_for_content(self, intepreter, content):
     82         fs = MockFileSystem()
     83 
     84         tempfile, temp_name = fs.open_binary_tempfile('')
     85         tempfile.write(content)
     86         tempfile.close()
     87         file_interpreter = Executive.interpreter_for_script(temp_name, fs)
     88 
     89         self.assertEqual(file_interpreter, intepreter)
     90 
     91     def test_interpreter_for_script(self):
     92         self.assert_interpreter_for_content(None, '')
     93         self.assert_interpreter_for_content(None, 'abcd\nefgh\nijklm')
     94         self.assert_interpreter_for_content(None, '##/usr/bin/perl')
     95         self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl')
     96         self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl\nfirst\nsecond')
     97         self.assert_interpreter_for_content('perl', '#!/usr/bin/perl')
     98         self.assert_interpreter_for_content('perl', '#!/usr/bin/perl -w')
     99         self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python')
    100         self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python\nfirst\nsecond')
    101         self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/python')
    102         self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby')
    103         self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby\nfirst\nsecond')
    104         self.assert_interpreter_for_content('ruby', '#!/usr/bin/ruby')
    105 
    106     def test_run_command_with_bad_command(self):
    107         def run_bad_command():
    108             Executive().run_command(["foo_bar_command_blah"], error_handler=Executive.ignore_error, return_exit_code=True)
    109         self.assertRaises(OSError, run_bad_command)
    110 
    111     def test_run_command_args_type(self):
    112         executive = Executive()
    113         self.assertRaises(AssertionError, executive.run_command, "echo")
    114         self.assertRaises(AssertionError, executive.run_command, u"echo")
    115         executive.run_command(command_line('echo', 'foo'))
    116         executive.run_command(tuple(command_line('echo', 'foo')))
    117 
    118     def test_auto_stringify_args(self):
    119         executive = Executive()
    120         executive.run_command(command_line('echo', 1))
    121         executive.popen(command_line('echo', 1), stdout=executive.PIPE).wait()
    122         self.assertEqual('echo 1', executive.command_for_printing(['echo', 1]))
    123 
    124     def test_popen_args(self):
    125         executive = Executive()
    126         # Explicitly naming the 'args' argument should not thow an exception.
    127         executive.popen(args=command_line('echo', 1), stdout=executive.PIPE).wait()
    128 
    129     def test_run_command_with_unicode(self):
    130         """Validate that it is safe to pass unicode() objects
    131         to Executive.run* methods, and they will return unicode()
    132         objects by default unless decode_output=False"""
    133         unicode_tor_input = u"WebKit \u2661 Tor Arne Vestb\u00F8!"
    134         if sys.platform == 'win32':
    135             encoding = 'mbcs'
    136         else:
    137             encoding = 'utf-8'
    138         encoded_tor = unicode_tor_input.encode(encoding)
    139         # On Windows, we expect the unicode->mbcs->unicode roundtrip to be
    140         # lossy. On other platforms, we expect a lossless roundtrip.
    141         if sys.platform == 'win32':
    142             unicode_tor_output = encoded_tor.decode(encoding)
    143         else:
    144             unicode_tor_output = unicode_tor_input
    145 
    146         executive = Executive()
    147 
    148         output = executive.run_command(command_line('cat'), input=unicode_tor_input)
    149         self.assertEqual(output, unicode_tor_output)
    150 
    151         output = executive.run_command(command_line('echo', unicode_tor_input))
    152         self.assertEqual(output, unicode_tor_output)
    153 
    154         output = executive.run_command(command_line('echo', unicode_tor_input), decode_output=False)
    155         self.assertEqual(output, encoded_tor)
    156 
    157         # Make sure that str() input also works.
    158         output = executive.run_command(command_line('cat'), input=encoded_tor, decode_output=False)
    159         self.assertEqual(output, encoded_tor)
    160 
    161         # FIXME: We should only have one run* method to test
    162         output = executive.run_and_throw_if_fail(command_line('echo', unicode_tor_input), quiet=True)
    163         self.assertEqual(output, unicode_tor_output)
    164 
    165         output = executive.run_and_throw_if_fail(command_line('echo', unicode_tor_input), quiet=True, decode_output=False)
    166         self.assertEqual(output, encoded_tor)
    167 
    168     def test_kill_process(self):
    169         executive = Executive()
    170         process = subprocess.Popen(never_ending_command(), stdout=subprocess.PIPE)
    171         self.assertEqual(process.poll(), None)  # Process is running
    172         executive.kill_process(process.pid)
    173 
    174         # Killing again should fail silently.
    175         executive.kill_process(process.pid)
    176 
    177     def _assert_windows_image_name(self, name, expected_windows_name):
    178         executive = Executive()
    179         windows_name = executive._windows_image_name(name)
    180         self.assertEqual(windows_name, expected_windows_name)
    181 
    182     def test_windows_image_name(self):
    183         self._assert_windows_image_name("foo", "foo.exe")
    184         self._assert_windows_image_name("foo.exe", "foo.exe")
    185         self._assert_windows_image_name("foo.com", "foo.com")
    186         # If the name looks like an extension, even if it isn't
    187         # supposed to, we have no choice but to return the original name.
    188         self._assert_windows_image_name("foo.baz", "foo.baz")
    189         self._assert_windows_image_name("foo.baz.exe", "foo.baz.exe")
    190 
    191     def test_check_running_pid(self):
    192         executive = Executive()
    193         self.assertTrue(executive.check_running_pid(os.getpid()))
    194         # Maximum pid number on Linux is 32768 by default
    195         self.assertFalse(executive.check_running_pid(100000))
    196 
    197     def test_running_pids(self):
    198         if sys.platform in ("win32", "cygwin"):
    199             return  # This function isn't implemented on Windows yet.
    200 
    201         executive = Executive()
    202         pids = executive.running_pids()
    203         self.assertIn(os.getpid(), pids)
    204 
    205     def test_run_in_parallel_assert_nonempty(self):
    206         self.assertRaises(AssertionError, Executive().run_in_parallel, [])
    207 
    208 
    209 def main(platform, stdin, stdout, cmd, args):
    210     if platform == 'win32' and hasattr(stdout, 'fileno'):
    211         import msvcrt
    212         msvcrt.setmode(stdout.fileno(), os.O_BINARY)
    213     if cmd == '--cat':
    214         stdout.write(stdin.read())
    215     elif cmd == '--echo':
    216         stdout.write(' '.join(args))
    217     return 0
    218 
    219 if __name__ == '__main__' and len(sys.argv) > 1 and sys.argv[1] in ('--cat', '--echo'):
    220     sys.exit(main(sys.platform, sys.stdin, sys.stdout, sys.argv[1], sys.argv[2:]))
    221