Home | History | Annotate | Download | only in utils
      1 #!/usr/bin/env python
      2 # Copyright 2013 The Chromium Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 """Tests for the cmd_helper module."""
      7 
      8 import unittest
      9 import subprocess
     10 import sys
     11 import time
     12 
     13 from devil import devil_env
     14 from devil.utils import cmd_helper
     15 
     16 with devil_env.SysPath(devil_env.PYMOCK_PATH):
     17   import mock  # pylint: disable=import-error
     18 
     19 
     20 class CmdHelperSingleQuoteTest(unittest.TestCase):
     21 
     22   def testSingleQuote_basic(self):
     23     self.assertEquals('hello',
     24                       cmd_helper.SingleQuote('hello'))
     25 
     26   def testSingleQuote_withSpaces(self):
     27     self.assertEquals("'hello world'",
     28                       cmd_helper.SingleQuote('hello world'))
     29 
     30   def testSingleQuote_withUnsafeChars(self):
     31     self.assertEquals("""'hello'"'"'; rm -rf /'""",
     32                       cmd_helper.SingleQuote("hello'; rm -rf /"))
     33 
     34   def testSingleQuote_dontExpand(self):
     35     test_string = 'hello $TEST_VAR'
     36     cmd = 'TEST_VAR=world; echo %s' % cmd_helper.SingleQuote(test_string)
     37     self.assertEquals(test_string,
     38                       cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
     39 
     40 
     41 class CmdHelperDoubleQuoteTest(unittest.TestCase):
     42 
     43   def testDoubleQuote_basic(self):
     44     self.assertEquals('hello',
     45                       cmd_helper.DoubleQuote('hello'))
     46 
     47   def testDoubleQuote_withSpaces(self):
     48     self.assertEquals('"hello world"',
     49                       cmd_helper.DoubleQuote('hello world'))
     50 
     51   def testDoubleQuote_withUnsafeChars(self):
     52     self.assertEquals('''"hello\\"; rm -rf /"''',
     53                       cmd_helper.DoubleQuote('hello"; rm -rf /'))
     54 
     55   def testSingleQuote_doExpand(self):
     56     test_string = 'hello $TEST_VAR'
     57     cmd = 'TEST_VAR=world; echo %s' % cmd_helper.DoubleQuote(test_string)
     58     self.assertEquals('hello world',
     59                       cmd_helper.GetCmdOutput(cmd, shell=True).rstrip())
     60 
     61 
     62 class CmdHelperShinkToSnippetTest(unittest.TestCase):
     63 
     64   def testShrinkToSnippet_noArgs(self):
     65     self.assertEquals('foo',
     66         cmd_helper.ShrinkToSnippet(['foo'], 'a', 'bar'))
     67     self.assertEquals("'foo foo'",
     68         cmd_helper.ShrinkToSnippet(['foo foo'], 'a', 'bar'))
     69     self.assertEquals('"$a"\' bar\'',
     70         cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'foo'))
     71     self.assertEquals('\'foo \'"$a"',
     72         cmd_helper.ShrinkToSnippet(['foo bar'], 'a', 'bar'))
     73     self.assertEquals('foo"$a"',
     74         cmd_helper.ShrinkToSnippet(['foobar'], 'a', 'bar'))
     75 
     76   def testShrinkToSnippet_singleArg(self):
     77     self.assertEquals("foo ''",
     78         cmd_helper.ShrinkToSnippet(['foo', ''], 'a', 'bar'))
     79     self.assertEquals("foo foo",
     80         cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'bar'))
     81     self.assertEquals('"$a" "$a"',
     82         cmd_helper.ShrinkToSnippet(['foo', 'foo'], 'a', 'foo'))
     83     self.assertEquals('foo "$a""$a"',
     84         cmd_helper.ShrinkToSnippet(['foo', 'barbar'], 'a', 'bar'))
     85     self.assertEquals('foo "$a"\' \'"$a"',
     86         cmd_helper.ShrinkToSnippet(['foo', 'bar bar'], 'a', 'bar'))
     87     self.assertEquals('foo "$a""$a"\' \'',
     88         cmd_helper.ShrinkToSnippet(['foo', 'barbar '], 'a', 'bar'))
     89     self.assertEquals('foo \' \'"$a""$a"\' \'',
     90         cmd_helper.ShrinkToSnippet(['foo', ' barbar '], 'a', 'bar'))
     91 
     92 
# Placeholder default for the select_fds and ts fields of _ProcessOutputEvent.
# _MockProcess replaces it with a concrete value when it processes its output
# sequence.
_DEFAULT = 'DEFAULT'
     94 
     95 
     96 class _ProcessOutputEvent(object):
     97 
     98   def __init__(self, select_fds=_DEFAULT, read_contents=None, ts=_DEFAULT):
     99     self.select_fds = select_fds
    100     self.read_contents = read_contents
    101     self.ts = ts
    102 
    103 
    104 class _MockProcess(object):
    105 
    106   def __init__(self, output_sequence=None, return_value=0):
    107 
    108     # Arbitrary.
    109     fake_stdout_fileno = 25
    110 
    111     self.mock_proc = mock.MagicMock(spec=subprocess.Popen)
    112     self.mock_proc.stdout = mock.MagicMock()
    113     self.mock_proc.stdout.fileno = mock.MagicMock(
    114         return_value=fake_stdout_fileno)
    115     self.mock_proc.returncode = None
    116 
    117     self._return_value = return_value
    118 
    119     # This links the behavior of os.read, select.select, time.time, and
    120     # <process>.poll. The output sequence can be thought of as a list of
    121     # return values for select.select with corresponding return values for
    122     # the other calls at any time between that select call and the following
    123     # one. We iterate through the sequence only on calls to select.select.
    124     #
    125     # os.read is a special case, though, where we only return a given chunk
    126     # of data *once* after a given call to select.
    127 
    128     if not output_sequence:
    129       output_sequence = []
    130 
    131     # Use an leading element to make the iteration logic work.
    132     initial_seq_element = _ProcessOutputEvent(
    133         _DEFAULT, '',
    134         output_sequence[0].ts if output_sequence else _DEFAULT)
    135     output_sequence.insert(0, initial_seq_element)
    136 
    137     for o in output_sequence:
    138       if o.select_fds == _DEFAULT:
    139         if o.read_contents is None:
    140           o.select_fds = []
    141         else:
    142           o.select_fds = [fake_stdout_fileno]
    143       if o.ts == _DEFAULT:
    144         o.ts = time.time()
    145     self._output_sequence = output_sequence
    146 
    147     self._output_seq_index = 0
    148     self._read_flags = [False] * len(output_sequence)
    149 
    150     def read_side_effect(*_args, **_kwargs):
    151       if self._read_flags[self._output_seq_index]:
    152         return None
    153       self._read_flags[self._output_seq_index] = True
    154       return self._output_sequence[self._output_seq_index].read_contents
    155 
    156     def select_side_effect(*_args, **_kwargs):
    157       if self._output_seq_index is None:
    158         self._output_seq_index = 0
    159       else:
    160         self._output_seq_index += 1
    161       if self._output_seq_index < len(self._output_sequence):
    162         return (self._output_sequence[self._output_seq_index].select_fds,
    163                 None, None)
    164       else:
    165         return([], None, None)
    166 
    167     def time_side_effect(*_args, **_kwargs):
    168       return self._output_sequence[self._output_seq_index].ts
    169 
    170     def poll_side_effect(*_args, **_kwargs):
    171       if self._output_seq_index >= len(self._output_sequence) - 1:
    172         self.mock_proc.returncode = self._return_value
    173       return self.mock_proc.returncode
    174 
    175     mock_read = mock.MagicMock(side_effect=read_side_effect)
    176     mock_select = mock.MagicMock(side_effect=select_side_effect)
    177     mock_time = mock.MagicMock(side_effect=time_side_effect)
    178     self.mock_proc.poll = mock.MagicMock(side_effect=poll_side_effect)
    179 
    180     # Set up but *do not start* the mocks.
    181     self._mocks = [
    182       mock.patch('os.read', new=mock_read),
    183       mock.patch('select.select', new=mock_select),
    184       mock.patch('time.time', new=mock_time),
    185     ]
    186     if sys.platform != 'win32':
    187       self._mocks.append(mock.patch('fcntl.fcntl'))
    188 
    189   def __enter__(self):
    190     for m in self._mocks:
    191       m.__enter__()
    192     return self.mock_proc
    193 
    194   def __exit__(self, exc_type, exc_val, exc_tb):
    195     for m in reversed(self._mocks):
    196       m.__exit__(exc_type, exc_val, exc_tb)
    197 
    198 
    199 class CmdHelperIterCmdOutputLinesTest(unittest.TestCase):
    200   """Test IterCmdOutputLines with some calls to the unix 'seq' command."""
    201 
    202   # This calls _IterCmdOutputLines rather than IterCmdOutputLines s.t. it
    203   # can mock the process.
    204   # pylint: disable=protected-access
    205 
    206   _SIMPLE_OUTPUT_SEQUENCE = [
    207     _ProcessOutputEvent(read_contents='1\n2\n'),
    208   ]
    209 
    210   def testIterCmdOutputLines_success(self):
    211     with _MockProcess(
    212         output_sequence=self._SIMPLE_OUTPUT_SEQUENCE) as mock_proc:
    213       for num, line in enumerate(
    214           cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
    215         self.assertEquals(num, int(line))
    216 
    217   def testIterCmdOutputLines_exitStatusFail(self):
    218     with self.assertRaises(subprocess.CalledProcessError):
    219       with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
    220                         return_value=1) as mock_proc:
    221         for num, line in enumerate(
    222             cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
    223           self.assertEquals(num, int(line))
    224         # after reading all the output we get an exit status of 1
    225 
    226   def testIterCmdOutputLines_exitStatusIgnored(self):
    227     with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
    228                       return_value=1) as mock_proc:
    229       for num, line in enumerate(
    230           cmd_helper._IterCmdOutputLines(
    231               mock_proc, 'mock_proc', check_status=False),
    232           1):
    233         self.assertEquals(num, int(line))
    234 
    235   def testIterCmdOutputLines_exitStatusSkipped(self):
    236     with _MockProcess(output_sequence=self._SIMPLE_OUTPUT_SEQUENCE,
    237                       return_value=1) as mock_proc:
    238       for num, line in enumerate(
    239           cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc'), 1):
    240         self.assertEquals(num, int(line))
    241         # no exception will be raised because we don't attempt to read past
    242         # the end of the output and, thus, the status never gets checked
    243         if num == 2:
    244           break
    245 
    246   def testIterCmdOutputLines_delay(self):
    247     output_sequence = [
    248       _ProcessOutputEvent(read_contents='1\n2\n', ts=1),
    249       _ProcessOutputEvent(read_contents=None, ts=2),
    250       _ProcessOutputEvent(read_contents='Awake', ts=10),
    251     ]
    252     with _MockProcess(output_sequence=output_sequence) as mock_proc:
    253       for num, line in enumerate(
    254           cmd_helper._IterCmdOutputLines(mock_proc, 'mock_proc',
    255                                          iter_timeout=5), 1):
    256         if num <= 2:
    257           self.assertEquals(num, int(line))
    258         elif num == 3:
    259           self.assertEquals(None, line)
    260         elif num == 4:
    261           self.assertEquals('Awake', line)
    262         else:
    263           self.fail()
    264 
    265 
# Run every test case in this module when the file is executed directly.
if __name__ == '__main__':
  unittest.main()
    268