Home | History | Annotate | Download | only in system
      1 # Copyright (C) 2010 Google Inc. All rights reserved.
      2 # Copyright (C) 2009 Daniel Bates (dbates@intudata.com). All rights reserved.
      3 #
      4 # Redistribution and use in source and binary forms, with or without
      5 # modification, are permitted provided that the following conditions are
      6 # met:
      7 #
      8 #    * Redistributions of source code must retain the above copyright
      9 # notice, this list of conditions and the following disclaimer.
     10 #    * Redistributions in binary form must reproduce the above
     11 # copyright notice, this list of conditions and the following disclaimer
     12 # in the documentation and/or other materials provided with the
     13 # distribution.
     14 #    * Neither the name of Google Inc. nor the names of its
     15 # contributors may be used to endorse or promote products derived from
     16 # this software without specific prior written permission.
     17 #
     18 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
     19 # "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
     20 # LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
     21 # A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
     22 # OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
     23 # SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
     24 # LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
     25 # DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
     26 # THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
     27 # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
     28 # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
     29 
     30 import os
     31 import signal
     32 import subprocess
     33 import sys
     34 import unittest
     35 
     36 from webkitpy.common.system.executive import Executive, run_command, ScriptError
     37 from webkitpy.common.system.filesystem_mock import MockFileSystem
     38 from webkitpy.test import cat, echo
     39 
     40 
     41 class ScriptErrorTest(unittest.TestCase):
     42     def test_string_from_args(self):
     43         error = ScriptError()
     44         self.assertEquals(error._string_from_args(None), 'None')
     45         self.assertEquals(error._string_from_args([]), '[]')
     46         self.assertEquals(error._string_from_args(map(str, range(30))), "['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10', '11', '12', '13', '14', '15', '16', '17'...")
     47 
     48     def test_message_with_output(self):
     49         error = ScriptError('My custom message!', '', -1)
     50         self.assertEquals(error.message_with_output(), 'My custom message!')
     51         error = ScriptError('My custom message!', '', -1, 'My output.')
     52         self.assertEquals(error.message_with_output(), 'My custom message!\n\nMy output.')
     53         error = ScriptError('', 'my_command!', -1, 'My output.', '/Users/username/blah')
     54         self.assertEquals(error.message_with_output(), 'Failed to run "my_command!" exit_code: -1 cwd: /Users/username/blah\n\nMy output.')
     55         error = ScriptError('', 'my_command!', -1, 'ab' + '1' * 499)
     56         self.assertEquals(error.message_with_output(), 'Failed to run "my_command!" exit_code: -1\n\nLast 500 characters of output:\nb' + '1' * 499)
     57 
     58 def never_ending_command():
     59     """Arguments for a command that will never end (useful for testing process
     60     killing). It should be a process that is unlikely to already be running
     61     because all instances will be killed."""
     62     if sys.platform == 'win32':
     63         return ['wmic']
     64     return ['yes']
     65 
     66 
     67 class ExecutiveTest(unittest.TestCase):
     68 
     69     def assert_interpreter_for_content(self, intepreter, content):
     70         fs = MockFileSystem()
     71         file_path = None
     72         file_interpreter = None
     73 
     74         tempfile, temp_name = fs.open_binary_tempfile('')
     75         tempfile.write(content)
     76         tempfile.close()
     77         file_interpreter = Executive.interpreter_for_script(temp_name, fs)
     78 
     79         self.assertEqual(file_interpreter, intepreter)
     80 
     81     def test_interpreter_for_script(self):
     82         self.assert_interpreter_for_content(None, '')
     83         self.assert_interpreter_for_content(None, 'abcd\nefgh\nijklm')
     84         self.assert_interpreter_for_content(None, '##/usr/bin/perl')
     85         self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl')
     86         self.assert_interpreter_for_content('perl', '#!/usr/bin/env perl\nfirst\nsecond')
     87         self.assert_interpreter_for_content('perl', '#!/usr/bin/perl')
     88         self.assert_interpreter_for_content('perl', '#!/usr/bin/perl -w')
     89         self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python')
     90         self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/env python\nfirst\nsecond')
     91         self.assert_interpreter_for_content(sys.executable, '#!/usr/bin/python')
     92         self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby')
     93         self.assert_interpreter_for_content('ruby', '#!/usr/bin/env ruby\nfirst\nsecond')
     94         self.assert_interpreter_for_content('ruby', '#!/usr/bin/ruby')
     95 
     96     def test_run_command_with_bad_command(self):
     97         def run_bad_command():
     98             run_command(["foo_bar_command_blah"], error_handler=Executive.ignore_error, return_exit_code=True)
     99         self.failUnlessRaises(OSError, run_bad_command)
    100 
    101     def test_run_command_args_type(self):
    102         executive = Executive()
    103         self.assertRaises(AssertionError, executive.run_command, "echo")
    104         self.assertRaises(AssertionError, executive.run_command, u"echo")
    105         executive.run_command(echo.command_arguments('foo'))
    106         executive.run_command(tuple(echo.command_arguments('foo')))
    107 
    108     def test_run_command_with_unicode(self):
    109         """Validate that it is safe to pass unicode() objects
    110         to Executive.run* methods, and they will return unicode()
    111         objects by default unless decode_output=False"""
    112         unicode_tor_input = u"WebKit \u2661 Tor Arne Vestb\u00F8!"
    113         if sys.platform == 'win32':
    114             encoding = 'mbcs'
    115         else:
    116             encoding = 'utf-8'
    117         encoded_tor = unicode_tor_input.encode(encoding)
    118         # On Windows, we expect the unicode->mbcs->unicode roundtrip to be
    119         # lossy. On other platforms, we expect a lossless roundtrip.
    120         if sys.platform == 'win32':
    121             unicode_tor_output = encoded_tor.decode(encoding)
    122         else:
    123             unicode_tor_output = unicode_tor_input
    124 
    125         executive = Executive()
    126 
    127         output = executive.run_command(cat.command_arguments(), input=unicode_tor_input)
    128         self.assertEquals(output, unicode_tor_output)
    129 
    130         output = executive.run_command(echo.command_arguments("-n", unicode_tor_input))
    131         self.assertEquals(output, unicode_tor_output)
    132 
    133         output = executive.run_command(echo.command_arguments("-n", unicode_tor_input), decode_output=False)
    134         self.assertEquals(output, encoded_tor)
    135 
    136         # Make sure that str() input also works.
    137         output = executive.run_command(cat.command_arguments(), input=encoded_tor, decode_output=False)
    138         self.assertEquals(output, encoded_tor)
    139 
    140         # FIXME: We should only have one run* method to test
    141         output = executive.run_and_throw_if_fail(echo.command_arguments("-n", unicode_tor_input), quiet=True)
    142         self.assertEquals(output, unicode_tor_output)
    143 
    144         output = executive.run_and_throw_if_fail(echo.command_arguments("-n", unicode_tor_input), quiet=True, decode_output=False)
    145         self.assertEquals(output, encoded_tor)
    146 
    147     def test_kill_process(self):
    148         executive = Executive()
    149         process = subprocess.Popen(never_ending_command(), stdout=subprocess.PIPE)
    150         self.assertEqual(process.poll(), None)  # Process is running
    151         executive.kill_process(process.pid)
    152         # Note: Can't use a ternary since signal.SIGKILL is undefined for sys.platform == "win32"
    153         if sys.platform == "win32":
    154             # FIXME: https://bugs.webkit.org/show_bug.cgi?id=54790
    155             # We seem to get either 0 or 1 here for some reason.
    156             self.assertTrue(process.wait() in (0, 1))
    157         else:
    158             expected_exit_code = -signal.SIGKILL
    159             self.assertEqual(process.wait(), expected_exit_code)
    160         # Killing again should fail silently.
    161         executive.kill_process(process.pid)
    162 
    163     def _assert_windows_image_name(self, name, expected_windows_name):
    164         executive = Executive()
    165         windows_name = executive._windows_image_name(name)
    166         self.assertEqual(windows_name, expected_windows_name)
    167 
    168     def test_windows_image_name(self):
    169         self._assert_windows_image_name("foo", "foo.exe")
    170         self._assert_windows_image_name("foo.exe", "foo.exe")
    171         self._assert_windows_image_name("foo.com", "foo.com")
    172         # If the name looks like an extension, even if it isn't
    173         # supposed to, we have no choice but to return the original name.
    174         self._assert_windows_image_name("foo.baz", "foo.baz")
    175         self._assert_windows_image_name("foo.baz.exe", "foo.baz.exe")
    176 
    177     def test_kill_all(self):
    178         executive = Executive()
    179         # We use "yes" because it loops forever.
    180         process = subprocess.Popen(never_ending_command(), stdout=subprocess.PIPE)
    181         self.assertEqual(process.poll(), None)  # Process is running
    182         executive.kill_all(never_ending_command()[0])
    183         # Note: Can't use a ternary since signal.SIGTERM is undefined for sys.platform == "win32"
    184         if sys.platform == "cygwin":
    185             expected_exit_code = 0  # os.kill results in exit(0) for this process.
    186             self.assertEqual(process.wait(), expected_exit_code)
    187         elif sys.platform == "win32":
    188             # FIXME: https://bugs.webkit.org/show_bug.cgi?id=54790
    189             # We seem to get either 0 or 1 here for some reason.
    190             self.assertTrue(process.wait() in (0, 1))
    191         else:
    192             expected_exit_code = -signal.SIGTERM
    193             self.assertEqual(process.wait(), expected_exit_code)
    194         # Killing again should fail silently.
    195         executive.kill_all(never_ending_command()[0])
    196 
    197     def test_check_running_pid(self):
    198         executive = Executive()
    199         self.assertTrue(executive.check_running_pid(os.getpid()))
    200         # Maximum pid number on Linux is 32768 by default
    201         self.assertFalse(executive.check_running_pid(100000))
    202