Home | History | Annotate | Download | only in common_lib
      1 #!/usr/bin/python
      2 
      3 import StringIO
      4 import errno
      5 import logging
      6 import os
      7 import select
      8 import shutil
      9 import socket
     10 import subprocess
     11 import time
     12 import unittest
     13 import urllib2
     14 
     15 import common
     16 from autotest_lib.client.common_lib import autotemp
     17 from autotest_lib.client.common_lib import error
     18 from autotest_lib.client.common_lib import utils
     19 from autotest_lib.client.common_lib.test_utils import mock
     20 
     21 metrics = utils.metrics_mock
     22 
     23 
     24 class test_read_one_line(unittest.TestCase):
     25     def setUp(self):
     26         self.god = mock.mock_god(ut=self)
     27         self.god.stub_function(utils, "open")
     28 
     29 
     30     def tearDown(self):
     31         self.god.unstub_all()
     32 
     33 
     34     def test_ip_to_long(self):
     35         self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
     36         self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
     37         self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
     38         self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)
     39 
     40 
     41     def test_long_to_ip(self):
     42         self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
     43         self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
     44         self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
     45         self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')
     46 
     47 
     48     def test_create_subnet_mask(self):
     49         self.assertEqual(utils.create_subnet_mask(0), 0)
     50         self.assertEqual(utils.create_subnet_mask(32), 4294967295)
     51         self.assertEqual(utils.create_subnet_mask(25), 4294967168)
     52 
     53 
     54     def test_format_ip_with_mask(self):
     55         self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
     56                          '0.0.0.0/0')
     57         self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
     58                          '192.168.0.1/32')
     59         self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
     60                          '192.168.0.0/26')
     61         self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
     62                          '192.168.0.192/26')
     63 
     64 
     65     def create_test_file(self, contents):
     66         test_file = StringIO.StringIO(contents)
     67         utils.open.expect_call("filename", "r").and_return(test_file)
     68 
     69 
     70     def test_reads_one_line_file(self):
     71         self.create_test_file("abc\n")
     72         self.assertEqual("abc", utils.read_one_line("filename"))
     73         self.god.check_playback()
     74 
     75 
     76     def test_strips_read_lines(self):
     77         self.create_test_file("abc   \n")
     78         self.assertEqual("abc   ", utils.read_one_line("filename"))
     79         self.god.check_playback()
     80 
     81 
     82     def test_drops_extra_lines(self):
     83         self.create_test_file("line 1\nline 2\nline 3\n")
     84         self.assertEqual("line 1", utils.read_one_line("filename"))
     85         self.god.check_playback()
     86 
     87 
     88     def test_works_on_empty_file(self):
     89         self.create_test_file("")
     90         self.assertEqual("", utils.read_one_line("filename"))
     91         self.god.check_playback()
     92 
     93 
     94     def test_works_on_file_with_no_newlines(self):
     95         self.create_test_file("line but no newline")
     96         self.assertEqual("line but no newline",
     97                          utils.read_one_line("filename"))
     98         self.god.check_playback()
     99 
    100 
    101     def test_preserves_leading_whitespace(self):
    102         self.create_test_file("   has leading whitespace")
    103         self.assertEqual("   has leading whitespace",
    104                          utils.read_one_line("filename"))
    105 
    106 
    107 class test_write_one_line(unittest.TestCase):
    108     def setUp(self):
    109         self.god = mock.mock_god(ut=self)
    110         self.god.stub_function(utils, "open")
    111 
    112 
    113     def tearDown(self):
    114         self.god.unstub_all()
    115 
    116 
    117     def get_write_one_line_output(self, content):
    118         test_file = mock.SaveDataAfterCloseStringIO()
    119         utils.open.expect_call("filename", "w").and_return(test_file)
    120         utils.write_one_line("filename", content)
    121         self.god.check_playback()
    122         return test_file.final_data
    123 
    124 
    125     def test_writes_one_line_file(self):
    126         self.assertEqual("abc\n", self.get_write_one_line_output("abc"))
    127 
    128 
    129     def test_preserves_existing_newline(self):
    130         self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))
    131 
    132 
    133     def test_preserves_leading_whitespace(self):
    134         self.assertEqual("   abc\n", self.get_write_one_line_output("   abc"))
    135 
    136 
    137     def test_preserves_trailing_whitespace(self):
    138         self.assertEqual("abc   \n", self.get_write_one_line_output("abc   "))
    139 
    140 
    141     def test_handles_empty_input(self):
    142         self.assertEqual("\n", self.get_write_one_line_output(""))
    143 
    144 
    145 class test_open_write_close(unittest.TestCase):
    146     def setUp(self):
    147         self.god = mock.mock_god(ut=self)
    148         self.god.stub_function(utils, "open")
    149 
    150 
    151     def tearDown(self):
    152         self.god.unstub_all()
    153 
    154 
    155     def test_simple_functionality(self):
    156         data = "\n\nwhee\n"
    157         test_file = mock.SaveDataAfterCloseStringIO()
    158         utils.open.expect_call("filename", "w").and_return(test_file)
    159         utils.open_write_close("filename", data)
    160         self.god.check_playback()
    161         self.assertEqual(data, test_file.final_data)
    162 
    163 
    164 class test_read_keyval(unittest.TestCase):
    165     def setUp(self):
    166         self.god = mock.mock_god(ut=self)
    167         self.god.stub_function(utils, "open")
    168         self.god.stub_function(os.path, "isdir")
    169         self.god.stub_function(os.path, "exists")
    170 
    171 
    172     def tearDown(self):
    173         self.god.unstub_all()
    174 
    175 
    176     def create_test_file(self, filename, contents):
    177         test_file = StringIO.StringIO(contents)
    178         os.path.exists.expect_call(filename).and_return(True)
    179         utils.open.expect_call(filename).and_return(test_file)
    180 
    181 
    182     def read_keyval(self, contents):
    183         os.path.isdir.expect_call("file").and_return(False)
    184         self.create_test_file("file", contents)
    185         keyval = utils.read_keyval("file")
    186         self.god.check_playback()
    187         return keyval
    188 
    189 
    190     def test_returns_empty_when_file_doesnt_exist(self):
    191         os.path.isdir.expect_call("file").and_return(False)
    192         os.path.exists.expect_call("file").and_return(False)
    193         self.assertEqual({}, utils.read_keyval("file"))
    194         self.god.check_playback()
    195 
    196 
    197     def test_accesses_files_directly(self):
    198         os.path.isdir.expect_call("file").and_return(False)
    199         self.create_test_file("file", "")
    200         utils.read_keyval("file")
    201         self.god.check_playback()
    202 
    203 
    204     def test_accesses_directories_through_keyval_file(self):
    205         os.path.isdir.expect_call("dir").and_return(True)
    206         self.create_test_file("dir/keyval", "")
    207         utils.read_keyval("dir")
    208         self.god.check_playback()
    209 
    210 
    211     def test_values_are_rstripped(self):
    212         keyval = self.read_keyval("a=b   \n")
    213         self.assertEquals(keyval, {"a": "b"})
    214 
    215 
    216     def test_comments_are_ignored(self):
    217         keyval = self.read_keyval("a=b # a comment\n")
    218         self.assertEquals(keyval, {"a": "b"})
    219 
    220 
    221     def test_integers_become_ints(self):
    222         keyval = self.read_keyval("a=1\n")
    223         self.assertEquals(keyval, {"a": 1})
    224         self.assertEquals(int, type(keyval["a"]))
    225 
    226 
    227     def test_float_values_become_floats(self):
    228         keyval = self.read_keyval("a=1.5\n")
    229         self.assertEquals(keyval, {"a": 1.5})
    230         self.assertEquals(float, type(keyval["a"]))
    231 
    232 
    233     def test_multiple_lines(self):
    234         keyval = self.read_keyval("a=one\nb=two\n")
    235         self.assertEquals(keyval, {"a": "one", "b": "two"})
    236 
    237 
    238     def test_the_last_duplicate_line_is_used(self):
    239         keyval = self.read_keyval("a=one\nb=two\na=three\n")
    240         self.assertEquals(keyval, {"a": "three", "b": "two"})
    241 
    242 
    243     def test_extra_equals_are_included_in_values(self):
    244         keyval = self.read_keyval("a=b=c\n")
    245         self.assertEquals(keyval, {"a": "b=c"})
    246 
    247 
    248     def test_non_alphanumeric_keynames_are_rejected(self):
    249         self.assertRaises(ValueError, self.read_keyval, "a$=one\n")
    250 
    251 
    252     def test_underscores_are_allowed_in_key_names(self):
    253         keyval = self.read_keyval("a_b=value\n")
    254         self.assertEquals(keyval, {"a_b": "value"})
    255 
    256 
    257     def test_dashes_are_allowed_in_key_names(self):
    258         keyval = self.read_keyval("a-b=value\n")
    259         self.assertEquals(keyval, {"a-b": "value"})
    260 
    261     def test_empty_value_is_allowed(self):
    262         keyval = self.read_keyval("a=\n")
    263         self.assertEquals(keyval, {"a": ""})
    264 
    265 
    266 class test_write_keyval(unittest.TestCase):
    267     def setUp(self):
    268         self.god = mock.mock_god(ut=self)
    269         self.god.stub_function(utils, "open")
    270         self.god.stub_function(os.path, "isdir")
    271 
    272 
    273     def tearDown(self):
    274         self.god.unstub_all()
    275 
    276 
    277     def assertHasLines(self, value, lines):
    278         vlines = value.splitlines()
    279         vlines.sort()
    280         self.assertEquals(vlines, sorted(lines))
    281 
    282 
    283     def write_keyval(self, filename, dictionary, expected_filename=None,
    284                      type_tag=None):
    285         if expected_filename is None:
    286             expected_filename = filename
    287         test_file = StringIO.StringIO()
    288         self.god.stub_function(test_file, "close")
    289         utils.open.expect_call(expected_filename, "a").and_return(test_file)
    290         test_file.close.expect_call()
    291         if type_tag is None:
    292             utils.write_keyval(filename, dictionary)
    293         else:
    294             utils.write_keyval(filename, dictionary, type_tag)
    295         return test_file.getvalue()
    296 
    297 
    298     def write_keyval_file(self, dictionary, type_tag=None):
    299         os.path.isdir.expect_call("file").and_return(False)
    300         return self.write_keyval("file", dictionary, type_tag=type_tag)
    301 
    302 
    303     def test_accesses_files_directly(self):
    304         os.path.isdir.expect_call("file").and_return(False)
    305         result = self.write_keyval("file", {"a": "1"})
    306         self.assertEquals(result, "a=1\n")
    307 
    308 
    309     def test_accesses_directories_through_keyval_file(self):
    310         os.path.isdir.expect_call("dir").and_return(True)
    311         result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
    312         self.assertEquals(result, "b=2\n")
    313 
    314 
    315     def test_numbers_are_stringified(self):
    316         result = self.write_keyval_file({"c": 3})
    317         self.assertEquals(result, "c=3\n")
    318 
    319 
    320     def test_type_tags_are_excluded_by_default(self):
    321         result = self.write_keyval_file({"d": "a string"})
    322         self.assertEquals(result, "d=a string\n")
    323         self.assertRaises(ValueError, self.write_keyval_file,
    324                           {"d{perf}": "a string"})
    325 
    326 
    327     def test_perf_tags_are_allowed(self):
    328         result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
    329                                         type_tag="perf")
    330         self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
    331         self.assertRaises(ValueError, self.write_keyval_file,
    332                           {"a": 1, "b": 2}, type_tag="perf")
    333 
    334 
    335     def test_non_alphanumeric_keynames_are_rejected(self):
    336         self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})
    337 
    338 
    339     def test_underscores_are_allowed_in_key_names(self):
    340         result = self.write_keyval_file({"a_b": "value"})
    341         self.assertEquals(result, "a_b=value\n")
    342 
    343 
    344     def test_dashes_are_allowed_in_key_names(self):
    345         result = self.write_keyval_file({"a-b": "value"})
    346         self.assertEquals(result, "a-b=value\n")
    347 
    348 
    349 class test_is_url(unittest.TestCase):
    350     def test_accepts_http(self):
    351         self.assertTrue(utils.is_url("http://example.com"))
    352 
    353 
    354     def test_accepts_ftp(self):
    355         self.assertTrue(utils.is_url("ftp://ftp.example.com"))
    356 
    357 
    358     def test_rejects_local_path(self):
    359         self.assertFalse(utils.is_url("/home/username/file"))
    360 
    361 
    362     def test_rejects_local_filename(self):
    363         self.assertFalse(utils.is_url("filename"))
    364 
    365 
    366     def test_rejects_relative_local_path(self):
    367         self.assertFalse(utils.is_url("somedir/somesubdir/file"))
    368 
    369 
    370     def test_rejects_local_path_containing_url(self):
    371         self.assertFalse(utils.is_url("somedir/http://path/file"))
    372 
    373 
    374 class test_urlopen(unittest.TestCase):
    375     def setUp(self):
    376         self.god = mock.mock_god(ut=self)
    377 
    378 
    379     def tearDown(self):
    380         self.god.unstub_all()
    381 
    382 
    383     def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
    384                                              *expected_args):
    385         expected_args += (None,) * (2 - len(expected_args))
    386         def urlopen(url, data=None):
    387             self.assertEquals(expected_args, (url,data))
    388             test_func(socket.getdefaulttimeout())
    389             return expected_return
    390         self.god.stub_with(urllib2, "urlopen", urlopen)
    391 
    392 
    393     def stub_urlopen_with_timeout_check(self, expected_timeout,
    394                                         expected_return, *expected_args):
    395         def test_func(timeout):
    396             self.assertEquals(timeout, expected_timeout)
    397         self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
    398                                                   *expected_args)
    399 
    400 
    401     def test_timeout_set_during_call(self):
    402         self.stub_urlopen_with_timeout_check(30, "retval", "url")
    403         retval = utils.urlopen("url", timeout=30)
    404         self.assertEquals(retval, "retval")
    405 
    406 
    407     def test_timeout_reset_after_call(self):
    408         old_timeout = socket.getdefaulttimeout()
    409         self.stub_urlopen_with_timeout_check(30, None, "url")
    410         try:
    411             socket.setdefaulttimeout(1234)
    412             utils.urlopen("url", timeout=30)
    413             self.assertEquals(1234, socket.getdefaulttimeout())
    414         finally:
    415             socket.setdefaulttimeout(old_timeout)
    416 
    417 
    418     def test_timeout_set_by_default(self):
    419         def test_func(timeout):
    420             self.assertTrue(timeout is not None)
    421         self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
    422         utils.urlopen("url")
    423 
    424 
    425     def test_args_are_untouched(self):
    426         self.stub_urlopen_with_timeout_check(30, None, "http://url",
    427                                              "POST data")
    428         utils.urlopen("http://url", timeout=30, data="POST data")
    429 
    430 
    431 class test_urlretrieve(unittest.TestCase):
    432     def setUp(self):
    433         self.god = mock.mock_god(ut=self)
    434 
    435 
    436     def tearDown(self):
    437         self.god.unstub_all()
    438 
    439 
    440     def test_urlopen_passed_arguments(self):
    441         self.god.stub_function(utils, "urlopen")
    442         self.god.stub_function(utils.shutil, "copyfileobj")
    443         self.god.stub_function(utils, "open")
    444 
    445         url = "url"
    446         dest = "somefile"
    447         data = object()
    448         timeout = 10
    449 
    450         src_file = self.god.create_mock_class(file, "file")
    451         dest_file = self.god.create_mock_class(file, "file")
    452 
    453         (utils.urlopen.expect_call(url, data=data, timeout=timeout)
    454                 .and_return(src_file))
    455         utils.open.expect_call(dest, "wb").and_return(dest_file)
    456         utils.shutil.copyfileobj.expect_call(src_file, dest_file)
    457         dest_file.close.expect_call()
    458         src_file.close.expect_call()
    459 
    460         utils.urlretrieve(url, dest, data=data, timeout=timeout)
    461         self.god.check_playback()
    462 
    463 
    464 class test_merge_trees(unittest.TestCase):
    465     # a some path-handling helper functions
    466     def src(self, *path_segments):
    467         return os.path.join(self.src_tree.name, *path_segments)
    468 
    469 
    470     def dest(self, *path_segments):
    471         return os.path.join(self.dest_tree.name, *path_segments)
    472 
    473 
    474     def paths(self, *path_segments):
    475         return self.src(*path_segments), self.dest(*path_segments)
    476 
    477 
    478     def assertFileEqual(self, *path_segments):
    479         src, dest = self.paths(*path_segments)
    480         self.assertEqual(True, os.path.isfile(src))
    481         self.assertEqual(True, os.path.isfile(dest))
    482         self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
    483         self.assertEqual(open(src).read(), open(dest).read())
    484 
    485 
    486     def assertFileContents(self, contents, *path_segments):
    487         dest = self.dest(*path_segments)
    488         self.assertEqual(True, os.path.isfile(dest))
    489         self.assertEqual(os.path.getsize(dest), len(contents))
    490         self.assertEqual(contents, open(dest).read())
    491 
    492 
    493     def setUp(self):
    494         self.src_tree = autotemp.tempdir(unique_id='utilsrc')
    495         self.dest_tree = autotemp.tempdir(unique_id='utilsdest')
    496 
    497         # empty subdirs
    498         os.mkdir(self.src("empty"))
    499         os.mkdir(self.dest("empty"))
    500 
    501 
    502     def tearDown(self):
    503         self.src_tree.clean()
    504         self.dest_tree.clean()
    505 
    506 
    507     def test_both_dont_exist(self):
    508         utils.merge_trees(*self.paths("empty"))
    509 
    510 
    511     def test_file_only_at_src(self):
    512         print >> open(self.src("src_only"), "w"), "line 1"
    513         utils.merge_trees(*self.paths("src_only"))
    514         self.assertFileEqual("src_only")
    515 
    516 
    517     def test_file_only_at_dest(self):
    518         print >> open(self.dest("dest_only"), "w"), "line 1"
    519         utils.merge_trees(*self.paths("dest_only"))
    520         self.assertEqual(False, os.path.exists(self.src("dest_only")))
    521         self.assertFileContents("line 1\n", "dest_only")
    522 
    523 
    524     def test_file_at_both(self):
    525         print >> open(self.dest("in_both"), "w"), "line 1"
    526         print >> open(self.src("in_both"), "w"), "line 2"
    527         utils.merge_trees(*self.paths("in_both"))
    528         self.assertFileContents("line 1\nline 2\n", "in_both")
    529 
    530 
    531     def test_directory_with_files_in_both(self):
    532         print >> open(self.dest("in_both"), "w"), "line 1"
    533         print >> open(self.src("in_both"), "w"), "line 3"
    534         utils.merge_trees(*self.paths())
    535         self.assertFileContents("line 1\nline 3\n", "in_both")
    536 
    537 
    538     def test_directory_with_mix_of_files(self):
    539         print >> open(self.dest("in_dest"), "w"), "dest line"
    540         print >> open(self.src("in_src"), "w"), "src line"
    541         utils.merge_trees(*self.paths())
    542         self.assertFileContents("dest line\n", "in_dest")
    543         self.assertFileContents("src line\n", "in_src")
    544 
    545 
    546     def test_directory_with_subdirectories(self):
    547         os.mkdir(self.src("src_subdir"))
    548         print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line"
    549         os.mkdir(self.src("both_subdir"))
    550         os.mkdir(self.dest("both_subdir"))
    551         print >> open(self.src("both_subdir", "subfile"), "w"), "src line"
    552         print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line"
    553         utils.merge_trees(*self.paths())
    554         self.assertFileContents("subdir line\n", "src_subdir", "subfile")
    555         self.assertFileContents("dest line\nsrc line\n", "both_subdir",
    556                                 "subfile")
    557 
    558 
    559 class test_get_relative_path(unittest.TestCase):
    560     def test_not_absolute(self):
    561         self.assertRaises(AssertionError, utils.get_relative_path, "a", "b")
    562 
    563     def test_same_dir(self):
    564         self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c")
    565 
    566     def test_forward_dir(self):
    567         self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")
    568 
    569     def test_previous_dir(self):
    570         self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")
    571 
    572     def test_parallel_dir(self):
    573         self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"),
    574                          "../../../c/d")
    575 
    576 
    577 class test_sh_escape(unittest.TestCase):
    578     def _test_in_shell(self, text):
    579         escaped_text = utils.sh_escape(text)
    580         proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
    581                                 stdin=open(os.devnull, 'r'),
    582                                 stdout=subprocess.PIPE,
    583                                 stderr=open(os.devnull, 'w'))
    584         stdout, _ = proc.communicate()
    585         self.assertEqual(proc.returncode, 0)
    586         self.assertEqual(stdout[:-1], text)
    587 
    588 
    589     def test_normal_string(self):
    590         self._test_in_shell('abcd')
    591 
    592 
    593     def test_spaced_string(self):
    594         self._test_in_shell('abcd efgh')
    595 
    596 
    597     def test_dollar(self):
    598         self._test_in_shell('$')
    599 
    600 
    601     def test_single_quote(self):
    602         self._test_in_shell('\'')
    603 
    604 
    605     def test_single_quoted_string(self):
    606         self._test_in_shell('\'efgh\'')
    607 
    608 
    609     def test_string_with_single_quote(self):
    610         self._test_in_shell("a'b")
    611 
    612 
    613     def test_string_with_escaped_single_quote(self):
    614         self._test_in_shell(r"a\'b")
    615 
    616 
    617     def test_double_quote(self):
    618         self._test_in_shell('"')
    619 
    620 
    621     def test_double_quoted_string(self):
    622         self._test_in_shell('"abcd"')
    623 
    624 
    625     def test_backtick(self):
    626         self._test_in_shell('`')
    627 
    628 
    629     def test_backticked_string(self):
    630         self._test_in_shell('`jklm`')
    631 
    632 
    633     def test_backslash(self):
    634         self._test_in_shell('\\')
    635 
    636 
    637     def test_backslashed_special_characters(self):
    638         self._test_in_shell('\\$')
    639         self._test_in_shell('\\"')
    640         self._test_in_shell('\\\'')
    641         self._test_in_shell('\\`')
    642 
    643 
    644     def test_backslash_codes(self):
    645         self._test_in_shell('\\n')
    646         self._test_in_shell('\\r')
    647         self._test_in_shell('\\t')
    648         self._test_in_shell('\\v')
    649         self._test_in_shell('\\b')
    650         self._test_in_shell('\\a')
    651         self._test_in_shell('\\000')
    652 
    653     def test_real_newline(self):
    654         self._test_in_shell('\n')
    655         self._test_in_shell('\\\n')
    656 
    657 
    658 class test_sh_quote_word(test_sh_escape):
    659     """Run tests on sh_quote_word.
    660 
    661     Inherit from test_sh_escape to get the same tests to run on both.
    662     """
    663 
    664     def _test_in_shell(self, text):
    665         quoted_word = utils.sh_quote_word(text)
    666         echoed_value = subprocess.check_output('echo %s' % quoted_word,
    667                                                shell=True)
    668         self.assertEqual(echoed_value, text + '\n')
    669 
    670 
    671 class test_nested_sh_quote_word(test_sh_quote_word):
    672     """Run nested tests on sh_quote_word.
    673 
    674     Inherit from test_sh_quote_word to get the same tests to run on both.
    675     """
    676 
    677     def _test_in_shell(self, text):
    678         command = 'echo ' + utils.sh_quote_word(text)
    679         nested_command = 'echo ' + utils.sh_quote_word(command)
    680         produced_command = subprocess.check_output(nested_command, shell=True)
    681         echoed_value = subprocess.check_output(produced_command, shell=True)
    682         self.assertEqual(echoed_value, text + '\n')
    683 
    684 
    685 class test_run(unittest.TestCase):
    686     """
    687     Test the utils.run() function.
    688 
    689     Note: This test runs simple external commands to test the utils.run()
    690     API without assuming implementation details.
    691     """
    692 
    # Log levels in ascending severity. setUp() creates one capture buffer
    # per level and __get_logs() reports the buffers in this same order.
    LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
                  logging.CRITICAL]
    696 
    697 
    def setUp(self):
        """Stub the logging calls and capture LoggingFile output per level."""
        self.god = mock.mock_god(ut=self)
        for log_func in ('warning', 'debug'):
            self.god.stub_function(utils.logging, log_func)

        # Log level -> StringIO.StringIO.
        self.logs = dict((level, StringIO.StringIO())
                         for level in self.LOG_LEVELS)

        # Override logging_manager.LoggingFile to return buffers.
        def fake_logging_file(level=None, prefix=None):
            return self.logs[level]

        self.god.stub_with(utils.logging_manager, 'LoggingFile',
                           fake_logging_file)
    712 
    def tearDown(self):
        """Undo every stub installed through self.god."""
        god = self.god
        god.unstub_all()
    715 
    716 
    def __check_result(self, result, command, exit_status=0, stdout='',
                       stderr=''):
        """Assert that a command result carries the expected values.

        @param result: Object exposing command, exit_status, stdout and
            stderr attributes.
        @param command: Expected command string.
        @param exit_status: Expected exit status.
        @param stdout: Expected captured stdout.
        @param stderr: Expected captured stderr.
        """
        expectations = (
            ('command', command),
            ('exit_status', exit_status),
            ('stdout', stdout),
            ('stderr', stderr),
        )
        # Checked in this order; the first mismatch fails the test.
        for attribute, expected in expectations:
            self.assertEqual(getattr(result, attribute), expected)
    723 
    724 
    def __get_logs(self):
        """Collect the text captured in every log buffer.

            @return: List of strings, one per level in self.LOG_LEVELS and
                in the same order, holding what was logged at that level.
        """
        captured = []
        for level in self.LOG_LEVELS:
            captured.append(self.logs[level].getvalue())
        return captured
    732 
    733 
    def test_default_simple(self):
        """A plain successful command returns its stdout and exit status 0."""
        cmd = 'echo "hello world"'
        # Expect some kind of logging.debug() call but don't care about args.
        utils.logging.debug.expect_any_call()
        result = utils.run(cmd)
        self.__check_result(result, cmd, stdout='hello world\n')
    739 
    740 
    def test_default_failure(self):
        """A failing command raises CmdError carrying the command result."""
        cmd = 'exit 11'
        try:
            utils.run(cmd, verbose=False)
        except utils.error.CmdError as err:
            self.__check_result(err.result_obj, cmd, exit_status=11)
        else:
            # Without this branch the test would pass silently whenever
            # utils.run() failed to raise, checking nothing at all.
            self.fail('utils.run(%r) did not raise CmdError' % cmd)
    747 
    748 
    def test_ignore_status(self):
        """With ignore_status=True a failing command returns its result."""
        cmd = 'echo error >&2 && exit 11'
        result = utils.run(cmd, ignore_status=True, verbose=False)
        self.__check_result(result, cmd, exit_status=11, stderr='error\n')
    753 
    754 
    def test_timeout(self):
        """A command exceeding its timeout raises CmdError with its output."""
        # we expect a logging.warning() message, don't care about the contents
        utils.logging.warning.expect_any_call()
        cmd = 'echo -n output && sleep 10'
        try:
            utils.run(cmd, timeout=1, verbose=False)
        except utils.error.CmdError as err:
            self.assertEqual(err.result_obj.stdout, 'output')
        else:
            # Without this branch the test would pass silently whenever
            # the timeout failed to raise, checking nothing at all.
            self.fail('utils.run(%r, timeout=1) did not raise CmdError' % cmd)
    762 
    763 
    def test_stdout_stderr_tee(self):
        """Output is copied to the tee objects as well as to the result."""
        cmd = 'echo output && echo error >&2'
        out_copy = StringIO.StringIO()
        err_copy = StringIO.StringIO()

        result = utils.run(cmd, stdout_tee=out_copy, stderr_tee=err_copy,
                           verbose=False)

        self.__check_result(result, cmd, stdout='output\n', stderr='error\n')
        self.assertEqual(out_copy.getvalue(), 'output\n')
        self.assertEqual(err_copy.getvalue(), 'error\n')
    774 
    775 
    def test_stdin_string(self):
        """A string passed as stdin is fed to the command."""
        cmd = 'cat'
        result = utils.run(cmd, verbose=False, stdin='hi!\n')
        self.__check_result(result, cmd, stdout='hi!\n')
    780 
    781 
    def test_stdout_tee_to_logs_info(self):
        """Test logging stdout at the info level."""
        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.INFO, verbose=False)
        # Order: DEBUG, INFO, WARNING, ERROR, CRITICAL.
        expected_logs = ['', 'output\n', '', '', '']
        self.assertEqual(self.__get_logs(), expected_logs)
    787 
    788 
    def test_stdout_tee_to_logs_warning(self):
        """Test logging stdout at the warning level."""
        utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.WARNING, verbose=False)
        # Order: DEBUG, INFO, WARNING, ERROR, CRITICAL.
        expected_logs = ['', '', 'output\n', '', '']
        self.assertEqual(self.__get_logs(), expected_logs)
    794 
    795 
    def test_stdout_and_stderr_tee_to_logs(self):
        """Test simultaneous stdout and stderr log levels."""
        run_options = dict(stdout_tee=utils.TEE_TO_LOGS,
                           stderr_tee=utils.TEE_TO_LOGS,
                           stdout_level=logging.INFO,
                           stderr_level=logging.ERROR,
                           verbose=False)
        utils.run('echo output && echo error >&2', **run_options)

        captured_logs = self.__get_logs()
        self.assertEqual(captured_logs, ['', 'output\n', '', 'error\n', ''])
    802 
    803 
    def test_default_expected_stderr_log_level(self):
        """Test default expected stderr log level.

        When stderr_is_expected is true and no stderr_level is given,
        stderr should be logged at the same level as stdout.
        """
        run_options = dict(stdout_tee=utils.TEE_TO_LOGS,
                           stderr_tee=utils.TEE_TO_LOGS,
                           stdout_level=logging.INFO,
                           stderr_is_expected=True,
                           verbose=False)
        utils.run('echo output && echo error >&2', **run_options)

        captured_logs = self.__get_logs()
        self.assertEqual(captured_logs, ['', 'output\nerror\n', '', '', ''])
    814 
    815 
    def test_safe_args(self):
        """Pass extra arguments through args= and check the quoted command."""
        # NOTE: The string in expected_quoted_cmd depends on the internal
        # implementation of shell quoting which is used by utils.run(),
        # in this case, sh_quote_word().
        expected_quoted_cmd = "echo 'hello \"world' again"
        extra_args = ('hello "world', 'again')

        result = utils.run('echo', verbose=False, args=extra_args)
        self.__check_result(result, expected_quoted_cmd,
                            stdout='hello "world again\n')
    824 
    825 
    def test_safe_args_given_string(self):
        """A plain string passed as args= must raise TypeError."""
        with self.assertRaises(TypeError):
            utils.run('echo', args='hello')
    828 
    829 
    def test_wait_interrupt(self):
        """Test that we actually select twice if the first one returns EINTR.

        select.select is stubbed so that its first call raises
        select.error(EINTR) and its second call reports the job's stdout and
        stderr as readable. The order of the mock expectations below is part
        of the test and must not be changed.
        """
        utils.logging.debug.expect_any_call()

        # Start a real background job, then force its exit status to 0.
        # NOTE(review): presumably this makes the job look finished to
        # _wait_for_commands -- confirm against utils.
        bg_job = utils.BgJob('echo "hello world"')
        bg_job.result.exit_status = 0
        self.god.stub_function(utils.select, 'select')

        # First select call: interrupted, a warning is expected afterwards.
        utils.select.select.expect_any_call().and_raises(
                select.error(errno.EINTR, 'Select interrupted'))
        utils.logging.warning.expect_any_call()

        # Second select call: both pipes of the job are reported readable.
        utils.select.select.expect_any_call().and_return(
                ([bg_job.sp.stdout, bg_job.sp.stderr], [], None))
        utils.logging.warning.expect_any_call()

        # A false return value is expected from _wait_for_commands here.
        self.assertFalse(
                utils._wait_for_commands([bg_job], time.time(), None))
    848 
    849 
    850 class test_compare_versions(unittest.TestCase):
    851     def test_zerofill(self):
    852         self.assertEqual(utils.compare_versions('1.7', '1.10'), -1)
    853         self.assertEqual(utils.compare_versions('1.222', '1.3'), 1)
    854         self.assertEqual(utils.compare_versions('1.03', '1.3'), 0)
    855 
    856 
    857     def test_unequal_len(self):
    858         self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1)
    859         self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1)
    860 
    861 
    862     def test_dash_delimited(self):
    863         self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1)
    864         self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1)
    865         self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0)
    866 
    867 
    868     def test_alphabets(self):
    869         self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1)
    870         self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1)
    871         self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0)
    872 
    873 
    874     def test_mix_symbols(self):
    875         self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1)
    876         self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1)
    877         self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0)
    878 
    879         self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1)
    880         self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1)
    881         self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0)
    882 
    883 
    884 class test_args_to_dict(unittest.TestCase):
    885     def test_no_args(self):
    886         result = utils.args_to_dict([])
    887         self.assertEqual({}, result)
    888 
    889 
    890     def test_matches(self):
    891         result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
    892                                      'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
    893         self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
    894                                   'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})
    895 
    896 
    897     def test_unmatches(self):
    898         # Temporarily shut warning messages from args_to_dict() when an argument
    899         # doesn't match its pattern.
    900         logger = logging.getLogger()
    901         saved_level = logger.level
    902         logger.setLevel(logging.ERROR)
    903 
    904         try:
    905             result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
    906                                          ':VAL', '=VVV', 'WORD'])
    907             self.assertEqual({}, result)
    908         finally:
    909             # Restore level.
    910             logger.setLevel(saved_level)
    911 
    912 
    913 class test_get_random_port(unittest.TestCase):
    914     def do_bind(self, port, socket_type, socket_proto):
    915         s = socket.socket(socket.AF_INET, socket_type, socket_proto)
    916         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    917         s.bind(('', port))
    918         return s
    919 
    920 
    921     def test_get_port(self):
    922         for _ in xrange(100):
    923             p = utils.get_unused_port()
    924             s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP)
    925             self.assert_(s.getsockname())
    926             s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    927             self.assert_(s.getsockname())
    928 
    929 
def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    """Test global function.

    Does nothing; only its signature (three positional arguments and three
    keyword arguments with defaults 4, 5 and 6) is of interest.
    """
    933 
    934 
class TestClass(object):
    """Test class.

    Holds empty instance, class and static methods that all take the
    arguments arg1..arg6, with defaults 4, 5 and 6 for the last three.
    """

    def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test instance function.

        Does nothing; only the signature is of interest.
        """


    @classmethod
    def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test class function.

        Does nothing; only the signature is of interest.
        """


    @staticmethod
    def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test static function.

        Does nothing; only the signature is of interest.
        """
    954 
    955 
    956 class GetFunctionArgUnittest(unittest.TestCase):
    957     """Tests for method get_function_arg_value."""
    958 
    959     def run_test(self, func, insert_arg):
    960         """Run test.
    961 
    962         @param func: Function being called with given arguments.
    963         @param insert_arg: Set to True to insert an object in the argument list.
    964                            This is to mock instance/class object.
    965         """
    966         if insert_arg:
    967             args = (None, 1, 2, 3)
    968         else:
    969             args = (1, 2, 3)
    970         for i in range(1, 7):
    971             self.assertEquals(utils.get_function_arg_value(
    972                     func, 'arg%d'%i, args, {}), i)
    973 
    974         self.assertEquals(utils.get_function_arg_value(
    975                 func, 'arg7', args, {'arg7': 7}), 7)
    976         self.assertRaises(
    977                 KeyError, utils.get_function_arg_value,
    978                 func, 'arg3', args[:-1], {})
    979 
    980 
    981     def test_global_function(self):
    982         """Test global function.
    983         """
    984         self.run_test(test_function, False)
    985 
    986 
    987     def test_instance_function(self):
    988         """Test instance function.
    989         """
    990         self.run_test(TestClass().test_instance_function, True)
    991 
    992 
    993     def test_class_function(self):
    994         """Test class function.
    995         """
    996         self.run_test(TestClass.test_class_function, True)
    997 
    998 
    999     def test_static_function(self):
   1000         """Test static function.
   1001         """
   1002         self.run_test(TestClass.test_static_function, False)
   1003 
   1004 
   1005 class VersionMatchUnittest(unittest.TestCase):
   1006     """Test version_match function."""
   1007 
   1008     def test_version_match(self):
   1009         """Test version_match function."""
   1010         canary_build = 'lumpy-release/R43-6803.0.0'
   1011         canary_release = '6803.0.0'
   1012         cq_build = 'lumpy-release/R43-6803.0.0-rc1'
   1013         cq_release = '6803.0.0-rc1'
   1014         trybot_paladin_build = 'trybot-lumpy-paladin/R43-6803.0.0-b123'
   1015         trybot_paladin_release = '6803.0.2015_03_12_2103'
   1016         trybot_pre_cq_build = 'trybot-wifi-pre-cq/R43-7000.0.0-b36'
   1017         trybot_pre_cq_release = '7000.0.2016_03_12_2103'
   1018         trybot_toolchain_build = 'trybot-nyan_big-gcc-toolchain/R56-8936.0.0-b14'
   1019         trybot_toolchain_release = '8936.0.2016_10_26_1403'
   1020         cros_cheets_build = 'lumpy-release/R49-6899.0.0-cheetsth'
   1021         cros_cheets_release = '6899.0.0'
   1022         trybot_paladin_cheets_build = 'trybot-lumpy-paladin/R50-6900.0.0-b123-cheetsth'
   1023         trybot_paladin_cheets_release = '6900.0.2015_03_12_2103'
   1024 
   1025         builds = [canary_build, cq_build, trybot_paladin_build,
   1026                   trybot_pre_cq_build, trybot_toolchain_build,
   1027                   cros_cheets_build, trybot_paladin_cheets_build]
   1028         releases = [canary_release, cq_release, trybot_paladin_release,
   1029                     trybot_pre_cq_release, trybot_toolchain_release,
   1030                     cros_cheets_release, trybot_paladin_cheets_release]
   1031         for i in range(len(builds)):
   1032             for j in range(len(releases)):
   1033                 self.assertEqual(
   1034                         utils.version_match(builds[i], releases[j]), i==j,
   1035                         'Build version %s should%s match release version %s.' %
   1036                         (builds[i], '' if i==j else ' not', releases[j]))
   1037 
   1038 
   1039 class IsInSameSubnetUnittest(unittest.TestCase):
   1040     """Test is_in_same_subnet function."""
   1041 
   1042     def test_is_in_same_subnet(self):
   1043         """Test is_in_same_subnet function."""
   1044         self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
   1045                                                 23))
   1046         self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
   1047                                                 24))
   1048         self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255',
   1049                                                 24))
   1050         self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0',
   1051                                                 24))
   1052 
   1053 
   1054 class GetWirelessSsidUnittest(unittest.TestCase):
   1055     """Test get_wireless_ssid function."""
   1056 
   1057     DEFAULT_SSID = 'default'
   1058     SSID_1 = 'ssid_1'
   1059     SSID_2 = 'ssid_2'
   1060     SSID_3 = 'ssid_3'
   1061 
   1062     def test_get_wireless_ssid(self):
   1063         """Test is_in_same_subnet function."""
   1064         god = mock.mock_god()
   1065         god.stub_function_to_return(utils.CONFIG, 'get_config_value',
   1066                                     self.DEFAULT_SSID)
   1067         god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
   1068                                     {'wireless_ssid_1.2.3.4/24': self.SSID_1,
   1069                                      'wireless_ssid_4.3.2.1/16': self.SSID_2,
   1070                                      'wireless_ssid_4.3.2.111/32': self.SSID_3})
   1071         self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
   1072         self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
   1073         self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
   1074         self.assertEqual(self.DEFAULT_SSID,
   1075                          utils.get_wireless_ssid('100.0.0.100'))
   1076 
   1077 
   1078 class LaunchControlBuildParseUnittest(unittest.TestCase):
   1079     """Test various parsing functions related to Launch Control builds and
   1080     devices.
   1081     """
   1082 
   1083     def test_parse_launch_control_target(self):
   1084         """Test parse_launch_control_target function."""
   1085         target_tests = {
   1086                 ('shamu', 'userdebug'): 'shamu-userdebug',
   1087                 ('shamu', 'eng'): 'shamu-eng',
   1088                 ('shamu-board', 'eng'): 'shamu-board-eng',
   1089                 (None, None): 'bad_target',
   1090                 (None, None): 'target'}
   1091         for result, target in target_tests.items():
   1092             self.assertEqual(result, utils.parse_launch_control_target(target))
   1093 
   1094 
   1095 class GetOffloaderUriTest(unittest.TestCase):
   1096     """Test get_offload_gsuri function."""
   1097     _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'
   1098 
   1099     def setUp(self):
   1100         self.god = mock.mock_god()
   1101 
   1102     def tearDown(self):
   1103         self.god.unstub_all()
   1104 
   1105     def test_get_default_lab_offload_gsuri(self):
   1106         """Test default lab offload gsuri ."""
   1107         self.god.mock_up(utils.CONFIG, 'CONFIG')
   1108         self.god.stub_function_to_return(utils, 'is_moblab', False)
   1109         self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
   1110                 utils.get_offload_gsuri())
   1111 
   1112         self.god.check_playback()
   1113 
   1114     def test_get_default_moblab_offload_gsuri(self):
   1115         self.god.mock_up(utils.CONFIG, 'CONFIG')
   1116         self.god.stub_function_to_return(utils, 'is_moblab', True)
   1117         utils.CONFIG.get_config_value.expect_call(
   1118                 'CROS', 'image_storage_server').and_return(
   1119                         self._IMAGE_STORAGE_SERVER)
   1120         self.god.stub_function_to_return(utils,
   1121                 'get_moblab_serial_number', 'test_serial_number')
   1122         self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
   1123         expected_gsuri = '%sresults/%s/%s/' % (
   1124                 self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id')
   1125         cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
   1126         utils.DEFAULT_OFFLOAD_GSURI = None
   1127         gsuri = utils.get_offload_gsuri()
   1128         utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
   1129         self.assertEqual(expected_gsuri, gsuri)
   1130 
   1131         self.god.check_playback()
   1132 
   1133     def test_get_moblab_offload_gsuri(self):
   1134         """Test default lab offload gsuri ."""
   1135         self.god.mock_up(utils.CONFIG, 'CONFIG')
   1136         self.god.stub_function_to_return(utils, 'is_moblab', True)
   1137         self.god.stub_function_to_return(utils,
   1138                 'get_moblab_serial_number', 'test_serial_number')
   1139         self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
   1140         gsuri = '%s%s/%s/' % (
   1141                 utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id')
   1142         self.assertEqual(gsuri, utils.get_offload_gsuri())
   1143 
   1144         self.god.check_playback()
   1145 
   1146 
   1147 
   1148 class  MockMetricsTest(unittest.TestCase):
   1149     """Test metrics mock class can handle various metrics calls."""
   1150 
   1151     def test_Counter(self):
   1152         """Test the mock class can create an instance and call any method.
   1153         """
   1154         c = metrics.Counter('counter')
   1155         c.increment(fields={'key': 1})
   1156 
   1157 
   1158     def test_Context(self):
   1159         """Test the mock class can handle context class.
   1160         """
   1161         test_value = None
   1162         with metrics.SecondsTimer('context') as t:
   1163             test_value = 'called_in_context'
   1164             t['random_key'] = 'pass'
   1165         self.assertEqual('called_in_context', test_value)
   1166 
   1167 
   1168     def test_decorator(self):
   1169         """Test the mock class can handle decorator.
   1170         """
   1171         class TestClass(object):
   1172 
   1173             def __init__(self):
   1174                 self.value = None
   1175 
   1176         test_value = TestClass()
   1177         test_value.value = None
   1178         @metrics.SecondsTimerDecorator('decorator')
   1179         def test(arg):
   1180             arg.value = 'called_in_decorator'
   1181 
   1182         test(test_value)
   1183         self.assertEqual('called_in_decorator', test_value.value)
   1184 
   1185 
   1186     def test_setitem(self):
   1187         """Test the mock class can handle set item call.
   1188         """
   1189         timer = metrics.SecondsTimer('name')
   1190         timer['random_key'] = 'pass'
   1191 
   1192 
   1193 
# Run every test case in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
   1196