Home | History | Annotate | Download | only in common_lib
      1 #!/usr/bin/python
      2 
      3 # pylint: disable=missing-docstring
      4 
      5 import StringIO
      6 import errno
      7 import itertools
      8 import logging
      9 import os
     10 import select
     11 import socket
     12 import subprocess
     13 import time
     14 import unittest
     15 import urllib2
     16 
     17 import common
     18 from autotest_lib.client.common_lib import autotemp
     19 from autotest_lib.client.common_lib import utils
     20 from autotest_lib.client.common_lib.test_utils import mock
     21 
     22 # mock 1.0.0 (in site-packages/chromite/third_party/mock.py)
     23 # which is an ancestor of Python's default library starting from Python 3.3.
     24 # See https://docs.python.org/3/library/unittest.mock.html
     25 import mock as pymock
     26 
# Module-level alias for utils.metrics_mock. It is not referenced in the
# visible part of this file -- presumably used further down; TODO confirm.
metrics = utils.metrics_mock
     28 
     29 
     30 class test_read_one_line(unittest.TestCase):
     31     def setUp(self):
     32         self.god = mock.mock_god(ut=self)
     33         self.god.stub_function(utils, "open")
     34 
     35 
     36     def tearDown(self):
     37         self.god.unstub_all()
     38 
     39 
     40     def test_ip_to_long(self):
     41         self.assertEqual(utils.ip_to_long('0.0.0.0'), 0)
     42         self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295)
     43         self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521)
     44         self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320)
     45 
     46 
     47     def test_long_to_ip(self):
     48         self.assertEqual(utils.long_to_ip(0), '0.0.0.0')
     49         self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255')
     50         self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1')
     51         self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8')
     52 
     53 
     54     def test_create_subnet_mask(self):
     55         self.assertEqual(utils.create_subnet_mask(0), 0)
     56         self.assertEqual(utils.create_subnet_mask(32), 4294967295)
     57         self.assertEqual(utils.create_subnet_mask(25), 4294967168)
     58 
     59 
     60     def test_format_ip_with_mask(self):
     61         self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0),
     62                          '0.0.0.0/0')
     63         self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32),
     64                          '192.168.0.1/32')
     65         self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26),
     66                          '192.168.0.0/26')
     67         self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26),
     68                          '192.168.0.192/26')
     69 
     70 
     71     def create_test_file(self, contents):
     72         test_file = StringIO.StringIO(contents)
     73         utils.open.expect_call("filename", "r").and_return(test_file)
     74 
     75 
     76     def test_reads_one_line_file(self):
     77         self.create_test_file("abc\n")
     78         self.assertEqual("abc", utils.read_one_line("filename"))
     79         self.god.check_playback()
     80 
     81 
     82     def test_strips_read_lines(self):
     83         self.create_test_file("abc   \n")
     84         self.assertEqual("abc   ", utils.read_one_line("filename"))
     85         self.god.check_playback()
     86 
     87 
     88     def test_drops_extra_lines(self):
     89         self.create_test_file("line 1\nline 2\nline 3\n")
     90         self.assertEqual("line 1", utils.read_one_line("filename"))
     91         self.god.check_playback()
     92 
     93 
     94     def test_works_on_empty_file(self):
     95         self.create_test_file("")
     96         self.assertEqual("", utils.read_one_line("filename"))
     97         self.god.check_playback()
     98 
     99 
    100     def test_works_on_file_with_no_newlines(self):
    101         self.create_test_file("line but no newline")
    102         self.assertEqual("line but no newline",
    103                          utils.read_one_line("filename"))
    104         self.god.check_playback()
    105 
    106 
    107     def test_preserves_leading_whitespace(self):
    108         self.create_test_file("   has leading whitespace")
    109         self.assertEqual("   has leading whitespace",
    110                          utils.read_one_line("filename"))
    111 
    112 
    113 class test_write_one_line(unittest.TestCase):
    114     def setUp(self):
    115         self.god = mock.mock_god(ut=self)
    116         self.god.stub_function(utils, "open")
    117 
    118 
    119     def tearDown(self):
    120         self.god.unstub_all()
    121 
    122 
    123     def get_write_one_line_output(self, content):
    124         test_file = mock.SaveDataAfterCloseStringIO()
    125         utils.open.expect_call("filename", "w").and_return(test_file)
    126         utils.write_one_line("filename", content)
    127         self.god.check_playback()
    128         return test_file.final_data
    129 
    130 
    131     def test_writes_one_line_file(self):
    132         self.assertEqual("abc\n", self.get_write_one_line_output("abc"))
    133 
    134 
    135     def test_preserves_existing_newline(self):
    136         self.assertEqual("abc\n", self.get_write_one_line_output("abc\n"))
    137 
    138 
    139     def test_preserves_leading_whitespace(self):
    140         self.assertEqual("   abc\n", self.get_write_one_line_output("   abc"))
    141 
    142 
    143     def test_preserves_trailing_whitespace(self):
    144         self.assertEqual("abc   \n", self.get_write_one_line_output("abc   "))
    145 
    146 
    147     def test_handles_empty_input(self):
    148         self.assertEqual("\n", self.get_write_one_line_output(""))
    149 
    150 
    151 class test_open_write_close(unittest.TestCase):
    152     def setUp(self):
    153         self.god = mock.mock_god(ut=self)
    154         self.god.stub_function(utils, "open")
    155 
    156 
    157     def tearDown(self):
    158         self.god.unstub_all()
    159 
    160 
    161     def test_simple_functionality(self):
    162         data = "\n\nwhee\n"
    163         test_file = mock.SaveDataAfterCloseStringIO()
    164         utils.open.expect_call("filename", "w").and_return(test_file)
    165         utils.open_write_close("filename", data)
    166         self.god.check_playback()
    167         self.assertEqual(data, test_file.final_data)
    168 
    169 
    170 class test_read_keyval(unittest.TestCase):
    171     def setUp(self):
    172         self.god = mock.mock_god(ut=self)
    173         self.god.stub_function(utils, "open")
    174         self.god.stub_function(os.path, "isdir")
    175         self.god.stub_function(os.path, "exists")
    176 
    177 
    178     def tearDown(self):
    179         self.god.unstub_all()
    180 
    181 
    182     def create_test_file(self, filename, contents):
    183         test_file = StringIO.StringIO(contents)
    184         os.path.exists.expect_call(filename).and_return(True)
    185         utils.open.expect_call(filename).and_return(test_file)
    186 
    187 
    188     def read_keyval(self, contents):
    189         os.path.isdir.expect_call("file").and_return(False)
    190         self.create_test_file("file", contents)
    191         keyval = utils.read_keyval("file")
    192         self.god.check_playback()
    193         return keyval
    194 
    195 
    196     def test_returns_empty_when_file_doesnt_exist(self):
    197         os.path.isdir.expect_call("file").and_return(False)
    198         os.path.exists.expect_call("file").and_return(False)
    199         self.assertEqual({}, utils.read_keyval("file"))
    200         self.god.check_playback()
    201 
    202 
    203     def test_accesses_files_directly(self):
    204         os.path.isdir.expect_call("file").and_return(False)
    205         self.create_test_file("file", "")
    206         utils.read_keyval("file")
    207         self.god.check_playback()
    208 
    209 
    210     def test_accesses_directories_through_keyval_file(self):
    211         os.path.isdir.expect_call("dir").and_return(True)
    212         self.create_test_file("dir/keyval", "")
    213         utils.read_keyval("dir")
    214         self.god.check_playback()
    215 
    216 
    217     def test_values_are_rstripped(self):
    218         keyval = self.read_keyval("a=b   \n")
    219         self.assertEquals(keyval, {"a": "b"})
    220 
    221 
    222     def test_comments_are_ignored(self):
    223         keyval = self.read_keyval("a=b # a comment\n")
    224         self.assertEquals(keyval, {"a": "b"})
    225 
    226 
    227     def test_integers_become_ints(self):
    228         keyval = self.read_keyval("a=1\n")
    229         self.assertEquals(keyval, {"a": 1})
    230         self.assertEquals(int, type(keyval["a"]))
    231 
    232 
    233     def test_float_values_become_floats(self):
    234         keyval = self.read_keyval("a=1.5\n")
    235         self.assertEquals(keyval, {"a": 1.5})
    236         self.assertEquals(float, type(keyval["a"]))
    237 
    238 
    239     def test_multiple_lines(self):
    240         keyval = self.read_keyval("a=one\nb=two\n")
    241         self.assertEquals(keyval, {"a": "one", "b": "two"})
    242 
    243 
    244     def test_the_last_duplicate_line_is_used(self):
    245         keyval = self.read_keyval("a=one\nb=two\na=three\n")
    246         self.assertEquals(keyval, {"a": "three", "b": "two"})
    247 
    248 
    249     def test_extra_equals_are_included_in_values(self):
    250         keyval = self.read_keyval("a=b=c\n")
    251         self.assertEquals(keyval, {"a": "b=c"})
    252 
    253 
    254     def test_non_alphanumeric_keynames_are_rejected(self):
    255         self.assertRaises(ValueError, self.read_keyval, "a$=one\n")
    256 
    257 
    258     def test_underscores_are_allowed_in_key_names(self):
    259         keyval = self.read_keyval("a_b=value\n")
    260         self.assertEquals(keyval, {"a_b": "value"})
    261 
    262 
    263     def test_dashes_are_allowed_in_key_names(self):
    264         keyval = self.read_keyval("a-b=value\n")
    265         self.assertEquals(keyval, {"a-b": "value"})
    266 
    267     def test_empty_value_is_allowed(self):
    268         keyval = self.read_keyval("a=\n")
    269         self.assertEquals(keyval, {"a": ""})
    270 
    271 
    272 class test_write_keyval(unittest.TestCase):
    273     def setUp(self):
    274         self.god = mock.mock_god(ut=self)
    275         self.god.stub_function(utils, "open")
    276         self.god.stub_function(os.path, "isdir")
    277 
    278 
    279     def tearDown(self):
    280         self.god.unstub_all()
    281 
    282 
    283     def assertHasLines(self, value, lines):
    284         vlines = value.splitlines()
    285         vlines.sort()
    286         self.assertEquals(vlines, sorted(lines))
    287 
    288 
    289     def write_keyval(self, filename, dictionary, expected_filename=None,
    290                      type_tag=None):
    291         if expected_filename is None:
    292             expected_filename = filename
    293         test_file = StringIO.StringIO()
    294         self.god.stub_function(test_file, "close")
    295         utils.open.expect_call(expected_filename, "a").and_return(test_file)
    296         test_file.close.expect_call()
    297         if type_tag is None:
    298             utils.write_keyval(filename, dictionary)
    299         else:
    300             utils.write_keyval(filename, dictionary, type_tag)
    301         return test_file.getvalue()
    302 
    303 
    304     def write_keyval_file(self, dictionary, type_tag=None):
    305         os.path.isdir.expect_call("file").and_return(False)
    306         return self.write_keyval("file", dictionary, type_tag=type_tag)
    307 
    308 
    309     def test_accesses_files_directly(self):
    310         os.path.isdir.expect_call("file").and_return(False)
    311         result = self.write_keyval("file", {"a": "1"})
    312         self.assertEquals(result, "a=1\n")
    313 
    314 
    315     def test_accesses_directories_through_keyval_file(self):
    316         os.path.isdir.expect_call("dir").and_return(True)
    317         result = self.write_keyval("dir", {"b": "2"}, "dir/keyval")
    318         self.assertEquals(result, "b=2\n")
    319 
    320 
    321     def test_numbers_are_stringified(self):
    322         result = self.write_keyval_file({"c": 3})
    323         self.assertEquals(result, "c=3\n")
    324 
    325 
    326     def test_type_tags_are_excluded_by_default(self):
    327         result = self.write_keyval_file({"d": "a string"})
    328         self.assertEquals(result, "d=a string\n")
    329         self.assertRaises(ValueError, self.write_keyval_file,
    330                           {"d{perf}": "a string"})
    331 
    332 
    333     def test_perf_tags_are_allowed(self):
    334         result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2},
    335                                         type_tag="perf")
    336         self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"])
    337         self.assertRaises(ValueError, self.write_keyval_file,
    338                           {"a": 1, "b": 2}, type_tag="perf")
    339 
    340 
    341     def test_non_alphanumeric_keynames_are_rejected(self):
    342         self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0})
    343 
    344 
    345     def test_underscores_are_allowed_in_key_names(self):
    346         result = self.write_keyval_file({"a_b": "value"})
    347         self.assertEquals(result, "a_b=value\n")
    348 
    349 
    350     def test_dashes_are_allowed_in_key_names(self):
    351         result = self.write_keyval_file({"a-b": "value"})
    352         self.assertEquals(result, "a-b=value\n")
    353 
    354 
    355 class test_is_url(unittest.TestCase):
    356     def test_accepts_http(self):
    357         self.assertTrue(utils.is_url("http://example.com"))
    358 
    359 
    360     def test_accepts_ftp(self):
    361         self.assertTrue(utils.is_url("ftp://ftp.example.com"))
    362 
    363 
    364     def test_rejects_local_path(self):
    365         self.assertFalse(utils.is_url("/home/username/file"))
    366 
    367 
    368     def test_rejects_local_filename(self):
    369         self.assertFalse(utils.is_url("filename"))
    370 
    371 
    372     def test_rejects_relative_local_path(self):
    373         self.assertFalse(utils.is_url("somedir/somesubdir/file"))
    374 
    375 
    376     def test_rejects_local_path_containing_url(self):
    377         self.assertFalse(utils.is_url("somedir/http://path/file"))
    378 
    379 
    380 class test_urlopen(unittest.TestCase):
    381     def setUp(self):
    382         self.god = mock.mock_god(ut=self)
    383 
    384 
    385     def tearDown(self):
    386         self.god.unstub_all()
    387 
    388 
    389     def stub_urlopen_with_timeout_comparison(self, test_func, expected_return,
    390                                              *expected_args):
    391         expected_args += (None,) * (2 - len(expected_args))
    392         def urlopen(url, data=None):
    393             self.assertEquals(expected_args, (url,data))
    394             test_func(socket.getdefaulttimeout())
    395             return expected_return
    396         self.god.stub_with(urllib2, "urlopen", urlopen)
    397 
    398 
    399     def stub_urlopen_with_timeout_check(self, expected_timeout,
    400                                         expected_return, *expected_args):
    401         def test_func(timeout):
    402             self.assertEquals(timeout, expected_timeout)
    403         self.stub_urlopen_with_timeout_comparison(test_func, expected_return,
    404                                                   *expected_args)
    405 
    406 
    407     def test_timeout_set_during_call(self):
    408         self.stub_urlopen_with_timeout_check(30, "retval", "url")
    409         retval = utils.urlopen("url", timeout=30)
    410         self.assertEquals(retval, "retval")
    411 
    412 
    413     def test_timeout_reset_after_call(self):
    414         old_timeout = socket.getdefaulttimeout()
    415         self.stub_urlopen_with_timeout_check(30, None, "url")
    416         try:
    417             socket.setdefaulttimeout(1234)
    418             utils.urlopen("url", timeout=30)
    419             self.assertEquals(1234, socket.getdefaulttimeout())
    420         finally:
    421             socket.setdefaulttimeout(old_timeout)
    422 
    423 
    424     def test_timeout_set_by_default(self):
    425         def test_func(timeout):
    426             self.assertTrue(timeout is not None)
    427         self.stub_urlopen_with_timeout_comparison(test_func, None, "url")
    428         utils.urlopen("url")
    429 
    430 
    431     def test_args_are_untouched(self):
    432         self.stub_urlopen_with_timeout_check(30, None, "http://url",
    433                                              "POST data")
    434         utils.urlopen("http://url", timeout=30, data="POST data")
    435 
    436 
class test_urlretrieve(unittest.TestCase):
    """Tests for utils.urlretrieve using mock_god record/playback stubs."""

    def setUp(self):
        self.god = mock.mock_god(ut=self)


    def tearDown(self):
        self.god.unstub_all()


    def test_urlopen_passed_arguments(self):
        # Stub every collaborator so no network or filesystem access happens.
        self.god.stub_function(utils, "urlopen")
        self.god.stub_function(utils.shutil, "copyfileobj")
        self.god.stub_function(utils, "open")

        url = "url"
        dest = "somefile"
        # Opaque sentinel: only its identity matters when it is passed on.
        data = object()
        timeout = 10

        src_file = self.god.create_mock_class(file, "file")
        dest_file = self.god.create_mock_class(file, "file")

        # Recorded call sequence: open the URL (data and timeout forwarded),
        # open the destination in binary write mode, copy, then close the
        # destination and the source.
        (utils.urlopen.expect_call(url, data=data, timeout=timeout)
                .and_return(src_file))
        utils.open.expect_call(dest, "wb").and_return(dest_file)
        utils.shutil.copyfileobj.expect_call(src_file, dest_file)
        dest_file.close.expect_call()
        src_file.close.expect_call()

        utils.urlretrieve(url, dest, data=data, timeout=timeout)
        self.god.check_playback()
    468 
    469 
    470 class test_merge_trees(unittest.TestCase):
    471     # a some path-handling helper functions
    472     def src(self, *path_segments):
    473         return os.path.join(self.src_tree.name, *path_segments)
    474 
    475 
    476     def dest(self, *path_segments):
    477         return os.path.join(self.dest_tree.name, *path_segments)
    478 
    479 
    480     def paths(self, *path_segments):
    481         return self.src(*path_segments), self.dest(*path_segments)
    482 
    483 
    484     def assertFileEqual(self, *path_segments):
    485         src, dest = self.paths(*path_segments)
    486         self.assertEqual(True, os.path.isfile(src))
    487         self.assertEqual(True, os.path.isfile(dest))
    488         self.assertEqual(os.path.getsize(src), os.path.getsize(dest))
    489         self.assertEqual(open(src).read(), open(dest).read())
    490 
    491 
    492     def assertFileContents(self, contents, *path_segments):
    493         dest = self.dest(*path_segments)
    494         self.assertEqual(True, os.path.isfile(dest))
    495         self.assertEqual(os.path.getsize(dest), len(contents))
    496         self.assertEqual(contents, open(dest).read())
    497 
    498 
    499     def setUp(self):
    500         self.src_tree = autotemp.tempdir(unique_id='utilsrc')
    501         self.dest_tree = autotemp.tempdir(unique_id='utilsdest')
    502 
    503         # empty subdirs
    504         os.mkdir(self.src("empty"))
    505         os.mkdir(self.dest("empty"))
    506 
    507 
    508     def tearDown(self):
    509         self.src_tree.clean()
    510         self.dest_tree.clean()
    511 
    512 
    513     def test_both_dont_exist(self):
    514         utils.merge_trees(*self.paths("empty"))
    515 
    516 
    517     def test_file_only_at_src(self):
    518         print >> open(self.src("src_only"), "w"), "line 1"
    519         utils.merge_trees(*self.paths("src_only"))
    520         self.assertFileEqual("src_only")
    521 
    522 
    523     def test_file_only_at_dest(self):
    524         print >> open(self.dest("dest_only"), "w"), "line 1"
    525         utils.merge_trees(*self.paths("dest_only"))
    526         self.assertEqual(False, os.path.exists(self.src("dest_only")))
    527         self.assertFileContents("line 1\n", "dest_only")
    528 
    529 
    530     def test_file_at_both(self):
    531         print >> open(self.dest("in_both"), "w"), "line 1"
    532         print >> open(self.src("in_both"), "w"), "line 2"
    533         utils.merge_trees(*self.paths("in_both"))
    534         self.assertFileContents("line 1\nline 2\n", "in_both")
    535 
    536 
    537     def test_directory_with_files_in_both(self):
    538         print >> open(self.dest("in_both"), "w"), "line 1"
    539         print >> open(self.src("in_both"), "w"), "line 3"
    540         utils.merge_trees(*self.paths())
    541         self.assertFileContents("line 1\nline 3\n", "in_both")
    542 
    543 
    544     def test_directory_with_mix_of_files(self):
    545         print >> open(self.dest("in_dest"), "w"), "dest line"
    546         print >> open(self.src("in_src"), "w"), "src line"
    547         utils.merge_trees(*self.paths())
    548         self.assertFileContents("dest line\n", "in_dest")
    549         self.assertFileContents("src line\n", "in_src")
    550 
    551 
    552     def test_directory_with_subdirectories(self):
    553         os.mkdir(self.src("src_subdir"))
    554         print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line"
    555         os.mkdir(self.src("both_subdir"))
    556         os.mkdir(self.dest("both_subdir"))
    557         print >> open(self.src("both_subdir", "subfile"), "w"), "src line"
    558         print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line"
    559         utils.merge_trees(*self.paths())
    560         self.assertFileContents("subdir line\n", "src_subdir", "subfile")
    561         self.assertFileContents("dest line\nsrc line\n", "both_subdir",
    562                                 "subfile")
    563 
    564 
    565 class test_get_relative_path(unittest.TestCase):
    566     def test_not_absolute(self):
    567         self.assertRaises(AssertionError, utils.get_relative_path, "a", "b")
    568 
    569     def test_same_dir(self):
    570         self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c")
    571 
    572     def test_forward_dir(self):
    573         self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d")
    574 
    575     def test_previous_dir(self):
    576         self.assertEqual(utils.get_relative_path("/a/b", "/a/b/c/d"), "../..")
    577 
    578     def test_parallel_dir(self):
    579         self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"),
    580                          "../../../c/d")
    581 
    582 
    583 class test_sh_escape(unittest.TestCase):
    584     def _test_in_shell(self, text):
    585         escaped_text = utils.sh_escape(text)
    586         proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True,
    587                                 stdin=open(os.devnull, 'r'),
    588                                 stdout=subprocess.PIPE,
    589                                 stderr=open(os.devnull, 'w'))
    590         stdout, _ = proc.communicate()
    591         self.assertEqual(proc.returncode, 0)
    592         self.assertEqual(stdout[:-1], text)
    593 
    594 
    595     def test_normal_string(self):
    596         self._test_in_shell('abcd')
    597 
    598 
    599     def test_spaced_string(self):
    600         self._test_in_shell('abcd efgh')
    601 
    602 
    603     def test_dollar(self):
    604         self._test_in_shell('$')
    605 
    606 
    607     def test_single_quote(self):
    608         self._test_in_shell('\'')
    609 
    610 
    611     def test_single_quoted_string(self):
    612         self._test_in_shell('\'efgh\'')
    613 
    614 
    615     def test_string_with_single_quote(self):
    616         self._test_in_shell("a'b")
    617 
    618 
    619     def test_string_with_escaped_single_quote(self):
    620         self._test_in_shell(r"a\'b")
    621 
    622 
    623     def test_double_quote(self):
    624         self._test_in_shell('"')
    625 
    626 
    627     def test_double_quoted_string(self):
    628         self._test_in_shell('"abcd"')
    629 
    630 
    631     def test_backtick(self):
    632         self._test_in_shell('`')
    633 
    634 
    635     def test_backticked_string(self):
    636         self._test_in_shell('`jklm`')
    637 
    638 
    639     def test_backslash(self):
    640         self._test_in_shell('\\')
    641 
    642 
    643     def test_backslashed_special_characters(self):
    644         self._test_in_shell('\\$')
    645         self._test_in_shell('\\"')
    646         self._test_in_shell('\\\'')
    647         self._test_in_shell('\\`')
    648 
    649 
    650     def test_backslash_codes(self):
    651         self._test_in_shell('\\n')
    652         self._test_in_shell('\\r')
    653         self._test_in_shell('\\t')
    654         self._test_in_shell('\\v')
    655         self._test_in_shell('\\b')
    656         self._test_in_shell('\\a')
    657         self._test_in_shell('\\000')
    658 
    659     def test_real_newline(self):
    660         self._test_in_shell('\n')
    661         self._test_in_shell('\\\n')
    662 
    663 
    664 class test_sh_quote_word(test_sh_escape):
    665     """Run tests on sh_quote_word.
    666 
    667     Inherit from test_sh_escape to get the same tests to run on both.
    668     """
    669 
    670     def _test_in_shell(self, text):
    671         quoted_word = utils.sh_quote_word(text)
    672         echoed_value = subprocess.check_output('echo %s' % quoted_word,
    673                                                shell=True)
    674         self.assertEqual(echoed_value, text + '\n')
    675 
    676 
    677 class test_nested_sh_quote_word(test_sh_quote_word):
    678     """Run nested tests on sh_quote_word.
    679 
    680     Inherit from test_sh_quote_word to get the same tests to run on both.
    681     """
    682 
    683     def _test_in_shell(self, text):
    684         command = 'echo ' + utils.sh_quote_word(text)
    685         nested_command = 'echo ' + utils.sh_quote_word(command)
    686         produced_command = subprocess.check_output(nested_command, shell=True)
    687         echoed_value = subprocess.check_output(produced_command, shell=True)
    688         self.assertEqual(echoed_value, text + '\n')
    689 
    690 
    691 class test_run(unittest.TestCase):
    692     """
    693     Test the utils.run() function.
    694 
    695     Note: This test runs simple external commands to test the utils.run()
    696     API without assuming implementation details.
    697     """
    698 
    699     # Log levels in ascending severity.
    700     LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR,
    701                   logging.CRITICAL]
    702 
    703 
    def setUp(self):
        self.god = mock.mock_god(ut=self)
        for log_func in ('warning', 'debug'):
            self.god.stub_function(utils.logging, log_func)

        # One in-memory buffer per log level, keyed by the level constant.
        self.logs = dict((level, StringIO.StringIO())
                         for level in self.LOG_LEVELS)

        # Replace logging_manager.LoggingFile so that it hands back the
        # buffer for the requested level; prefix is accepted but unused.
        def fake_logging_file(level=None, prefix=None):
            return self.logs[level]
        self.god.stub_with(utils.logging_manager, 'LoggingFile',
                           fake_logging_file)
    718 
    def tearDown(self):
        # Undo every stub installed through self.god in setUp.
        self.god.unstub_all()
    721 
    722 
    def __check_result(self, result, command, exit_status=0, stdout='',
                       stderr=''):
        # Assert that `result` carries the given command, exit status and
        # captured stdout/stderr. Defaults describe a silent, successful run.
        self.assertEqual(result.command, command)
        self.assertEqual(result.exit_status, exit_status)
        self.assertEqual(result.stdout, stdout)
        self.assertEqual(result.stderr, stderr)
    729 
    730 
    def __get_logs(self):
        """Collect the text captured in each per-level log buffer.

            @return: list of strings, one per entry of self.LOG_LEVELS and
                in the same order, holding what was logged at that level.
        """
        captured = []
        for level in self.LOG_LEVELS:
            captured.append(self.logs[level].getvalue())
        return captured
    738 
    739 
    def test_default_simple(self):
        cmd = 'echo "hello world"'
        # expect some kind of logging.debug() call but don't care about args
        utils.logging.debug.expect_any_call()
        result = utils.run(cmd)
        self.__check_result(result, cmd, stdout='hello world\n')
    745 
    746 
    def test_default_failure(self):
        # A non-zero exit status with ignore_status left unset must raise
        # CmdError carrying the result object.
        cmd = 'exit 11'
        try:
            utils.run(cmd, verbose=False)
        except utils.error.CmdError as err:
            self.__check_result(err.result_obj, cmd, exit_status=11)
        else:
            # Without this branch the test would pass silently if run()
            # returned normally.
            self.fail('utils.run(%r) did not raise CmdError' % cmd)
    753 
    754 
    def test_ignore_status(self):
        # With ignore_status=True a failing command yields a result object
        # instead of an exception.
        cmd = 'echo error >&2 && exit 11'
        result = utils.run(cmd, ignore_status=True, verbose=False)
        self.__check_result(result, cmd, exit_status=11, stderr='error\n')
    759 
    760 
    def test_timeout(self):
        # we expect a logging.warning() message, don't care about the contents
        utils.logging.warning.expect_any_call()
        cmd = 'echo -n output && sleep 10'
        try:
            utils.run(cmd, timeout=1, verbose=False)
        except utils.error.CmdError as err:
            # Output produced before the timeout must still be captured.
            self.assertEqual(err.result_obj.stdout, 'output')
        else:
            # Without this branch the test would pass silently if run()
            # returned normally.
            self.fail('utils.run(%r, timeout=1) did not raise CmdError' % cmd)
    768 
    769 
    def test_stdout_stderr_tee(self):
        # Output must show up both in the result object and in the tee
        # buffers passed by the caller.
        cmd = 'echo output && echo error >&2'
        out_buf = StringIO.StringIO()
        err_buf = StringIO.StringIO()

        result = utils.run(cmd, stdout_tee=out_buf, stderr_tee=err_buf,
                           verbose=False)
        self.__check_result(result, cmd, stdout='output\n', stderr='error\n')
        self.assertEqual(out_buf.getvalue(), 'output\n')
        self.assertEqual(err_buf.getvalue(), 'error\n')
    780 
    781 
    def test_stdin_string(self):
        # A string passed as stdin is fed to the command; cat echoes it back.
        cmd = 'cat'
        result = utils.run(cmd, verbose=False, stdin='hi!\n')
        self.__check_result(result, cmd, stdout='hi!\n')
    786 
    787 
    def test_stdout_tee_to_logs_info(self):
        """Stdout teed to the logs at INFO lands only in the INFO buffer."""
        utils.run('echo output', verbose=False, stdout_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.INFO)
        # Buffer order follows LOG_LEVELS: DEBUG, INFO, WARNING, ERROR,
        # CRITICAL.
        expected = ['', 'output\n', '', '', '']
        self.assertEqual(self.__get_logs(), expected)
    793 
    794 
    def test_stdout_tee_to_logs_warning(self):
        """Stdout teed to the logs at WARNING lands only in that buffer."""
        utils.run('echo output', verbose=False, stdout_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.WARNING)
        # Buffer order follows LOG_LEVELS: DEBUG, INFO, WARNING, ERROR,
        # CRITICAL.
        expected = ['', '', 'output\n', '', '']
        self.assertEqual(self.__get_logs(), expected)
    800 
    801 
    def test_stdout_and_stderr_tee_to_logs(self):
        """Test simultaneous stdout and stderr log levels."""
        cmd = 'echo output && echo error >&2'
        utils.run(cmd, verbose=False,
                  stdout_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO,
                  stderr_tee=utils.TEE_TO_LOGS, stderr_level=logging.ERROR)
        expected_logs = ['', 'output\n', '', 'error\n', '']
        self.assertEqual(self.__get_logs(), expected_logs)
    808 
    809 
    def test_default_expected_stderr_log_level(self):
        """Test default expected stderr log level.

        When stderr_is_expected is true and no stderr_level is given,
        stderr should be logged at the same level as stdout.
        """
        cmd = 'echo output && echo error >&2'
        utils.run(cmd, verbose=False, stderr_is_expected=True,
                  stdout_tee=utils.TEE_TO_LOGS, stderr_tee=utils.TEE_TO_LOGS,
                  stdout_level=logging.INFO)
        expected_logs = ['', 'output\nerror\n', '', '', '']
        self.assertEqual(self.__get_logs(), expected_logs)
    820 
    821 
    def test_safe_args(self):
        """Extra args are shell-quoted and appended to the command."""
        # NOTE: The string in expected_quoted_cmd depends on the internal
        # implementation of shell quoting which is used by utils.run(),
        # in this case, sh_quote_word().
        expected_quoted_cmd = "echo 'hello \"world' again"
        result = utils.run('echo', verbose=False,
                           args=('hello "world', 'again'))
        self.__check_result(result, expected_quoted_cmd,
                            stdout='hello "world again\n')
    830 
    831 
    def test_safe_args_given_string(self):
        """Passing a plain string as args is rejected with TypeError."""
        with self.assertRaises(TypeError):
            utils.run('echo', args='hello')
    834 
    835 
    def test_wait_interrupt(self):
        """Test that we actually select twice if the first one returns EINTR."""
        utils.logging.debug.expect_any_call()

        job = utils.BgJob('echo "hello world"')
        job.result.exit_status = 0
        self.god.stub_function(utils.select, 'select')

        # First select() call fails with EINTR ...
        interrupted = select.error(errno.EINTR, 'Select interrupted')
        utils.select.select.expect_any_call().and_raises(interrupted)
        utils.logging.warning.expect_any_call()

        # ... and the second call returns both pipes of the job.
        ready = ([job.sp.stdout, job.sp.stderr], [], None)
        utils.select.select.expect_any_call().and_return(ready)
        utils.logging.warning.expect_any_call()

        outcome = utils._wait_for_commands([job], time.time(), None)
        self.assertFalse(outcome)
    854 
    855 
    856 class test_compare_versions(unittest.TestCase):
    857     def test_zerofill(self):
    858         self.assertEqual(utils.compare_versions('1.7', '1.10'), -1)
    859         self.assertEqual(utils.compare_versions('1.222', '1.3'), 1)
    860         self.assertEqual(utils.compare_versions('1.03', '1.3'), 0)
    861 
    862 
    863     def test_unequal_len(self):
    864         self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1)
    865         self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1)
    866 
    867 
    868     def test_dash_delimited(self):
    869         self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1)
    870         self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1)
    871         self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0)
    872 
    873 
    874     def test_alphabets(self):
    875         self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1)
    876         self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1)
    877         self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0)
    878 
    879 
    880     def test_mix_symbols(self):
    881         self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1)
    882         self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1)
    883         self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0)
    884 
    885         self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1)
    886         self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1)
    887         self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0)
    888 
    889 
    890 class test_args_to_dict(unittest.TestCase):
    891     def test_no_args(self):
    892         result = utils.args_to_dict([])
    893         self.assertEqual({}, result)
    894 
    895 
    896     def test_matches(self):
    897         result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:',
    898                                      'F__o0O=', 'B8r:=:=', '_bAZ_=:=:'])
    899         self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'',
    900                                   'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'})
    901 
    902 
    903     def test_unmatches(self):
    904         # Temporarily shut warning messages from args_to_dict() when an argument
    905         # doesn't match its pattern.
    906         logger = logging.getLogger()
    907         saved_level = logger.level
    908         logger.setLevel(logging.ERROR)
    909 
    910         try:
    911             result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b',
    912                                          ':VAL', '=VVV', 'WORD'])
    913             self.assertEqual({}, result)
    914         finally:
    915             # Restore level.
    916             logger.setLevel(saved_level)
    917 
    918 
    919 class test_get_random_port(unittest.TestCase):
    920     def do_bind(self, port, socket_type, socket_proto):
    921         s = socket.socket(socket.AF_INET, socket_type, socket_proto)
    922         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    923         s.bind(('', port))
    924         return s
    925 
    926 
    927     def test_get_port(self):
    928         for _ in xrange(100):
    929             p = utils.get_unused_port()
    930             s = self.do_bind(p, socket.SOCK_STREAM, socket.IPPROTO_TCP)
    931             self.assert_(s.getsockname())
    932             s = self.do_bind(p, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
    933             self.assert_(s.getsockname())
    934 
    935 
    936 def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    937     """Test global function.
    938     """
    939 
    940 
    941 class TestClass(object):
    942     """Test class.
    943     """
    944 
    945     def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    946         """Test instance function.
    947         """
    948 
    949 
    950     @classmethod
    951     def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    952         """Test class function.
    953         """
    954 
    955 
    956     @staticmethod
    957     def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    958         """Test static function.
    959         """
    960 
    961 
    962 class GetFunctionArgUnittest(unittest.TestCase):
    963     """Tests for method get_function_arg_value."""
    964 
    965     def run_test(self, func, insert_arg):
    966         """Run test.
    967 
    968         @param func: Function being called with given arguments.
    969         @param insert_arg: Set to True to insert an object in the argument list.
    970                            This is to mock instance/class object.
    971         """
    972         if insert_arg:
    973             args = (None, 1, 2, 3)
    974         else:
    975             args = (1, 2, 3)
    976         for i in range(1, 7):
    977             self.assertEquals(utils.get_function_arg_value(
    978                     func, 'arg%d'%i, args, {}), i)
    979 
    980         self.assertEquals(utils.get_function_arg_value(
    981                 func, 'arg7', args, {'arg7': 7}), 7)
    982         self.assertRaises(
    983                 KeyError, utils.get_function_arg_value,
    984                 func, 'arg3', args[:-1], {})
    985 
    986 
    987     def test_global_function(self):
    988         """Test global function.
    989         """
    990         self.run_test(test_function, False)
    991 
    992 
    993     def test_instance_function(self):
    994         """Test instance function.
    995         """
    996         self.run_test(TestClass().test_instance_function, True)
    997 
    998 
    999     def test_class_function(self):
   1000         """Test class function.
   1001         """
   1002         self.run_test(TestClass.test_class_function, True)
   1003 
   1004 
   1005     def test_static_function(self):
   1006         """Test static function.
   1007         """
   1008         self.run_test(TestClass.test_static_function, False)
   1009 
   1010 
   1011 class IsInSameSubnetUnittest(unittest.TestCase):
   1012     """Test is_in_same_subnet function."""
   1013 
   1014     def test_is_in_same_subnet(self):
   1015         """Test is_in_same_subnet function."""
   1016         self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
   1017                                                 23))
   1018         self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
   1019                                                 24))
   1020         self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255',
   1021                                                 24))
   1022         self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0',
   1023                                                 24))
   1024 
   1025 
   1026 class GetWirelessSsidUnittest(unittest.TestCase):
   1027     """Test get_wireless_ssid function."""
   1028 
   1029     DEFAULT_SSID = 'default'
   1030     SSID_1 = 'ssid_1'
   1031     SSID_2 = 'ssid_2'
   1032     SSID_3 = 'ssid_3'
   1033 
   1034     def test_get_wireless_ssid(self):
   1035         """Test is_in_same_subnet function."""
   1036         god = mock.mock_god()
   1037         god.stub_function_to_return(utils.CONFIG, 'get_config_value',
   1038                                     self.DEFAULT_SSID)
   1039         god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
   1040                                     {'wireless_ssid_1.2.3.4/24': self.SSID_1,
   1041                                      'wireless_ssid_4.3.2.1/16': self.SSID_2,
   1042                                      'wireless_ssid_4.3.2.111/32': self.SSID_3})
   1043         self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
   1044         self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
   1045         self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
   1046         self.assertEqual(self.DEFAULT_SSID,
   1047                          utils.get_wireless_ssid('100.0.0.100'))
   1048 
   1049 
   1050 class LaunchControlBuildParseUnittest(unittest.TestCase):
   1051     """Test various parsing functions related to Launch Control builds and
   1052     devices.
   1053     """
   1054 
   1055     def test_parse_launch_control_target(self):
   1056         """Test parse_launch_control_target function."""
   1057         target_tests = {
   1058                 ('shamu', 'userdebug'): 'shamu-userdebug',
   1059                 ('shamu', 'eng'): 'shamu-eng',
   1060                 ('shamu-board', 'eng'): 'shamu-board-eng',
   1061                 (None, None): 'bad_target',
   1062                 (None, None): 'target'}
   1063         for result, target in target_tests.items():
   1064             self.assertEqual(result, utils.parse_launch_control_target(target))
   1065 
   1066 
   1067 class GetOffloaderUriTest(unittest.TestCase):
   1068     """Test get_offload_gsuri function."""
   1069     _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'
   1070 
   1071     def setUp(self):
   1072         self.god = mock.mock_god()
   1073 
   1074     def tearDown(self):
   1075         self.god.unstub_all()
   1076 
   1077     def test_get_default_lab_offload_gsuri(self):
   1078         """Test default lab offload gsuri ."""
   1079         self.god.mock_up(utils.CONFIG, 'CONFIG')
   1080         self.god.stub_function_to_return(utils, 'is_moblab', False)
   1081         self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
   1082                 utils.get_offload_gsuri())
   1083 
   1084         self.god.check_playback()
   1085 
   1086     def test_get_default_moblab_offload_gsuri(self):
   1087         self.god.mock_up(utils.CONFIG, 'CONFIG')
   1088         self.god.stub_function_to_return(utils, 'is_moblab', True)
   1089         utils.CONFIG.get_config_value.expect_call(
   1090                 'CROS', 'image_storage_server').and_return(
   1091                         self._IMAGE_STORAGE_SERVER)
   1092         self.god.stub_function_to_return(utils,
   1093                 'get_moblab_serial_number', 'test_serial_number')
   1094         self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
   1095         expected_gsuri = '%sresults/%s/%s/' % (
   1096                 self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id')
   1097         cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
   1098         utils.DEFAULT_OFFLOAD_GSURI = None
   1099         gsuri = utils.get_offload_gsuri()
   1100         utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
   1101         self.assertEqual(expected_gsuri, gsuri)
   1102 
   1103         self.god.check_playback()
   1104 
   1105     def test_get_moblab_offload_gsuri(self):
   1106         """Test default lab offload gsuri ."""
   1107         self.god.mock_up(utils.CONFIG, 'CONFIG')
   1108         self.god.stub_function_to_return(utils, 'is_moblab', True)
   1109         self.god.stub_function_to_return(utils,
   1110                 'get_moblab_serial_number', 'test_serial_number')
   1111         self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
   1112         gsuri = '%s%s/%s/' % (
   1113                 utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id')
   1114         self.assertEqual(gsuri, utils.get_offload_gsuri())
   1115 
   1116         self.god.check_playback()
   1117 
   1118 
   1119 
   1120 class  MockMetricsTest(unittest.TestCase):
   1121     """Test metrics mock class can handle various metrics calls."""
   1122 
   1123     def test_Counter(self):
   1124         """Test the mock class can create an instance and call any method.
   1125         """
   1126         c = metrics.Counter('counter')
   1127         c.increment(fields={'key': 1})
   1128 
   1129 
   1130     def test_Context(self):
   1131         """Test the mock class can handle context class.
   1132         """
   1133         test_value = None
   1134         with metrics.SecondsTimer('context') as t:
   1135             test_value = 'called_in_context'
   1136             t['random_key'] = 'pass'
   1137         self.assertEqual('called_in_context', test_value)
   1138 
   1139 
   1140     def test_decorator(self):
   1141         """Test the mock class can handle decorator.
   1142         """
   1143         class TestClass(object):
   1144 
   1145             def __init__(self):
   1146                 self.value = None
   1147 
   1148         test_value = TestClass()
   1149         test_value.value = None
   1150         @metrics.SecondsTimerDecorator('decorator')
   1151         def test(arg):
   1152             arg.value = 'called_in_decorator'
   1153 
   1154         test(test_value)
   1155         self.assertEqual('called_in_decorator', test_value.value)
   1156 
   1157 
   1158     def test_setitem(self):
   1159         """Test the mock class can handle set item call.
   1160         """
   1161         timer = metrics.SecondsTimer('name')
   1162         timer['random_key'] = 'pass'
   1163 
   1164 
   1165 class test_background_sample(unittest.TestCase):
   1166     """Test that the background sample can sample as desired.
   1167     """
   1168 
   1169     def test_can_sample(self):
   1170         """Test that a simple sample will work with no other complications.
   1171         """
   1172         should_be_sampled = 'name'
   1173 
   1174         def sample_function():
   1175             """Return value of variable stored in method."""
   1176             return should_be_sampled
   1177         still_sampling = True
   1178 
   1179         t = utils.background_sample_until_condition(
   1180                 function=sample_function,
   1181                 condition=lambda: still_sampling,
   1182                 timeout=5,
   1183                 sleep_interval=0.1)
   1184         result = t.finish()
   1185         self.assertIn(should_be_sampled, result)
   1186 
   1187 
   1188     def test_samples_multiple_values(self):
   1189         """Test that a sample will work and actually samples at the necessary
   1190         intervals, such that it will pick up changes.
   1191         """
   1192         should_be_sampled = 'name'
   1193 
   1194         def sample_function():
   1195             """Return value of variable stored in method."""
   1196             return should_be_sampled
   1197         still_sampling = True
   1198 
   1199         t = utils.background_sample_until_condition(
   1200                 function=sample_function,
   1201                 condition=lambda: still_sampling,
   1202                 timeout=5,
   1203                 sleep_interval=0.1)
   1204         # Let it sample values some with the initial value.
   1205         time.sleep(2.5)
   1206         # It should also sample some with the new value.
   1207         should_be_sampled = 'noname'
   1208         result = t.finish()
   1209         self.assertIn('name', result)
   1210         self.assertIn('noname', result)
   1211 
   1212 
   1213 class FakeTime(object):
   1214     """Provides time() and sleep() for faking time module.
   1215     """
   1216 
   1217     def __init__(self, start_time):
   1218         self._time = start_time
   1219 
   1220 
   1221     def time(self):
   1222         return self._time
   1223 
   1224 
   1225     def sleep(self, interval):
   1226         self._time += interval
   1227 
   1228 
   1229 class TimeModuleMockTestCase(unittest.TestCase):
   1230     """Mocks up utils.time with a FakeTime.
   1231 
   1232     It substitudes time.time() and time.sleep() with FakeTime.time()
   1233     and FakeTime.sleep(), respectively.
   1234     """
   1235 
   1236     def setUp(self):
   1237         self.fake_time_begin = 10
   1238         self.fake_time = FakeTime(self.fake_time_begin)
   1239         self.patcher = pymock.patch(
   1240             'autotest_lib.client.common_lib.utils.time')
   1241         self.time_mock = self.patcher.start()
   1242         self.addCleanup(self.patcher.stop)
   1243         self.time_mock.time.side_effect = self.fake_time.time
   1244         self.time_mock.sleep.side_effect = self.fake_time.sleep
   1245 
   1246 
   1247 def always_raise():
   1248     """A function that raises an exception."""
   1249     raise Exception('always raise')
   1250 
   1251 
   1252 def fail_n_times(count):
   1253     """Creates a function that returns False for the first count-th calls.
   1254 
   1255     @return a function returns False for the first count-th calls and True
   1256             afterwards.
   1257     """
   1258     counter = itertools.count(count, -1)
   1259     return lambda: next(counter) <= 0
   1260 
   1261 
   1262 class test_poll_for_condition(TimeModuleMockTestCase):
   1263     """Test poll_for_condition.
   1264     """
   1265 
   1266     def test_ok(self):
   1267         """Test polling condition that returns True.
   1268         """
   1269         self.assertTrue(utils.poll_for_condition(lambda: True))
   1270 
   1271 
   1272     def test_ok_evaluated_as_true(self):
   1273         """Test polling condition which's return value is evaluated as True.
   1274         """
   1275         self.assertEqual(1, utils.poll_for_condition(lambda: 1))
   1276 
   1277         self.assertEqual('something',
   1278                          utils.poll_for_condition(lambda: 'something'))
   1279 
   1280 
   1281     def test_fail(self):
   1282         """Test polling condition that returns False.
   1283 
   1284         Expect TimeoutError exception as neither customized exception nor
   1285         exception raised from condition().
   1286         """
   1287         with self.assertRaises(utils.TimeoutError):
   1288             utils.poll_for_condition(lambda: False, timeout=3, sleep_interval=1)
   1289         self.assertEqual(3, self.time_mock.sleep.call_count)
   1290 
   1291 
   1292     def test_fail_evaluated_as_false(self):
   1293         """Test polling condition which's return value is evaluated as False.
   1294 
   1295         Expect TimeoutError exception as neither customized exception nor
   1296         exception raised from condition().
   1297         """
   1298         with self.assertRaises(utils.TimeoutError):
   1299             utils.poll_for_condition(lambda: 0, timeout=3, sleep_interval=1)
   1300         self.assertEqual(3, self.time_mock.sleep.call_count)
   1301 
   1302         with self.assertRaises(utils.TimeoutError):
   1303             utils.poll_for_condition(lambda: None, timeout=3, sleep_interval=1)
   1304 
   1305 
   1306     def test_exception_arg(self):
   1307         """Test polling condition always fails.
   1308 
   1309         Expect exception raised by 'exception' args.
   1310         """
   1311         with self.assertRaisesRegexp(Exception, 'from args'):
   1312             utils.poll_for_condition(lambda: False,
   1313                                      exception=Exception('from args'),
   1314                                      timeout=3, sleep_interval=1)
   1315         self.assertEqual(3, self.time_mock.sleep.call_count)
   1316 
   1317 
   1318     def test_exception_from_condition(self):
   1319         """Test polling condition always fails.
   1320 
   1321         Expect exception raised by condition().
   1322         """
   1323         with self.assertRaisesRegexp(Exception, 'always raise'):
   1324             utils.poll_for_condition(always_raise,
   1325                                      exception=Exception('from args'),
   1326                                      timeout=3, sleep_interval=1)
   1327         # For poll_for_condition, if condition() raises exception, it raises
   1328         # immidiately without retry. So sleep() should not be called.
   1329         self.time_mock.sleep.assert_not_called()
   1330 
   1331 
   1332     def test_ok_after_retry(self):
   1333         """Test polling a condition which is success after retry twice.
   1334         """
   1335         self.assertTrue(utils.poll_for_condition(fail_n_times(2), timeout=3,
   1336                                                  sleep_interval=1))
   1337 
   1338 
   1339     def test_cannot_wait(self):
   1340         """Test polling a condition which fails till timeout.
   1341         """
   1342         with self.assertRaisesRegexp(
   1343                 utils.TimeoutError,
   1344                 'Timed out waiting for unnamed condition'):
   1345             utils.poll_for_condition(fail_n_times(4), timeout=3,
   1346                                      sleep_interval=1)
   1347         self.assertEqual(3, self.time_mock.sleep.call_count)
   1348 
   1349 
   1350 class test_poll_for_condition_ex(TimeModuleMockTestCase):
   1351     """Test poll_for_condition_ex.
   1352     """
   1353 
   1354     def test_ok(self):
   1355         """Test polling condition that returns True.
   1356         """
   1357         self.assertTrue(utils.poll_for_condition_ex(lambda: True))
   1358 
   1359 
   1360     def test_ok_evaluated_as_true(self):
   1361         """Test polling condition which's return value is evaluated as True.
   1362         """
   1363         self.assertEqual(1, utils.poll_for_condition_ex(lambda: 1))
   1364 
   1365         self.assertEqual('something',
   1366                          utils.poll_for_condition_ex(lambda: 'something'))
   1367 
   1368 
   1369     def test_fail(self):
   1370         """Test polling condition that returns False.
   1371 
   1372         Expect TimeoutError raised.
   1373         """
   1374         with self.assertRaisesRegexp(
   1375                 utils.TimeoutError,
   1376                 'Timed out waiting for unamed condition'):
   1377             utils.poll_for_condition_ex(lambda: False, timeout=3,
   1378                                         sleep_interval=1)
   1379         self.assertEqual(2, self.time_mock.sleep.call_count)
   1380 
   1381 
   1382     def test_fail_evaluated_as_false(self):
   1383         """Test polling condition which's return value is evaluated as False.
   1384 
   1385         Expect TimeoutError raised.
   1386         """
   1387         with self.assertRaisesRegexp(
   1388                 utils.TimeoutError,
   1389                 'Timed out waiting for unamed condition'):
   1390             utils.poll_for_condition_ex(lambda: 0, timeout=3,
   1391                                         sleep_interval=1)
   1392         self.assertEqual(2, self.time_mock.sleep.call_count)
   1393 
   1394         with self.assertRaisesRegexp(
   1395                 utils.TimeoutError,
   1396                 'Timed out waiting for unamed condition'):
   1397             utils.poll_for_condition_ex(lambda: None, timeout=3,
   1398                                         sleep_interval=1)
   1399 
   1400 
   1401     def test_desc_arg(self):
   1402         """Test polling condition always fails with desc.
   1403 
   1404         Expect TimeoutError with condition description embedded.
   1405         """
   1406         with self.assertRaisesRegexp(
   1407                 utils.TimeoutError,
   1408                 'Timed out waiting for always false condition'):
   1409             utils.poll_for_condition_ex(lambda: False,
   1410                                         desc='always false condition',
   1411                                         timeout=3, sleep_interval=1)
   1412         self.assertEqual(2, self.time_mock.sleep.call_count)
   1413 
   1414 
   1415     def test_exception(self):
   1416         """Test polling condition that raises.
   1417 
   1418         Expect TimeoutError with condition raised exception embedded.
   1419         """
   1420         with self.assertRaisesRegexp(
   1421                 utils.TimeoutError,
   1422                 "Reason: Exception\('always raise',\)"):
   1423             utils.poll_for_condition_ex(always_raise, timeout=3,
   1424                                         sleep_interval=1)
   1425         self.assertEqual(2, self.time_mock.sleep.call_count)
   1426 
   1427 
   1428     def test_ok_after_retry(self):
   1429         """Test polling a condition which is success after retry twice.
   1430         """
   1431         self.assertTrue(utils.poll_for_condition_ex(fail_n_times(2), timeout=3,
   1432                                                     sleep_interval=1))
   1433 
   1434 
   1435     def test_cannot_wait(self):
   1436         """Test polling a condition which fails till timeout.
   1437         """
   1438         with self.assertRaisesRegexp(
   1439                 utils.TimeoutError,
   1440                 'condition evaluted as false'):
   1441             utils.poll_for_condition_ex(fail_n_times(3), timeout=3,
   1442                                         sleep_interval=1)
   1443         self.assertEqual(2, self.time_mock.sleep.call_count)
   1444 
   1445 
   1446 class test_timer(TimeModuleMockTestCase):
   1447     """Test Timer.
   1448     """
   1449 
   1450     def test_zero_timeout(self):
   1451         """Test Timer with zero timeout.
   1452 
   1453         Only the first timer.sleep(0) is True.
   1454         """
   1455         timer = utils.Timer(0)
   1456         self.assertTrue(timer.sleep(0))
   1457         self.assertFalse(timer.sleep(0))
   1458         self.time_mock.sleep.assert_not_called()
   1459 
   1460 
   1461     def test_sleep(self):
   1462         """Test Timer.sleep()
   1463         """
   1464         timeout = 3
   1465         sleep_interval = 2
   1466         timer = utils.Timer(timeout)
   1467 
   1468         # Kicks off timer.
   1469         self.assertTrue(timer.sleep(sleep_interval))
   1470         self.assertEqual(self.fake_time_begin + timeout, timer.deadline)
   1471         self.assertTrue(timer.sleep(sleep_interval))
   1472         # now: 12. 12 + 2 > 13, unable to sleep
   1473         self.assertFalse(timer.sleep(sleep_interval))
   1474 
   1475         self.time_mock.sleep.assert_has_calls([pymock.call(sleep_interval)])
   1476 
   1477 
   1478 class test_timeout_error(unittest.TestCase):
   1479     """Test TimeoutError.
   1480 
   1481     Test TimeoutError with three invocations format.
   1482     """
   1483 
   1484     def test_no_args(self):
   1485         """Create TimeoutError without arguments.
   1486         """
   1487         e = utils.TimeoutError()
   1488         self.assertEqual('', str(e))
   1489         self.assertEqual('TimeoutError()', repr(e))
   1490 
   1491 
   1492     def test_with_message(self):
   1493         """Create TimeoutError with text message.
   1494         """
   1495         e = utils.TimeoutError(message='Waiting for condition')
   1496         self.assertEqual('Waiting for condition', str(e))
   1497         self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))
   1498 
   1499         # Positional message argument for backward compatibility.
   1500         e = utils.TimeoutError('Waiting for condition')
   1501         self.assertEqual('Waiting for condition', str(e))
   1502         self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))
   1503 
   1504 
   1505 
   1506     def test_with_reason(self):
   1507         """Create TimeoutError with reason only.
   1508         """
   1509         e = utils.TimeoutError(reason='illegal input')
   1510         self.assertEqual("Reason: 'illegal input'", str(e))
   1511         self.assertEqual("TimeoutError(\"Reason: 'illegal input'\",)", repr(e))
   1512         self.assertEqual('illegal input', e.reason)
   1513 
   1514 
   1515     def test_with_message_reason(self):
   1516         """Create TimeoutError with text message and reason.
   1517         """
   1518         e = utils.TimeoutError(message='Waiting for condition',
   1519                                reason='illegal input')
   1520         self.assertEqual("Waiting for condition. Reason: 'illegal input'",
   1521                          str(e))
   1522         self.assertEqual('illegal input', e.reason)
   1523 
   1524         # Positional message argument for backward compatibility.
   1525         e = utils.TimeoutError('Waiting for condition', reason='illegal input')
   1526         self.assertEqual("Waiting for condition. Reason: 'illegal input'",
   1527                          str(e))
   1528         self.assertEqual('illegal input', e.reason)
   1529 
   1530 
   1531     def test_with_message_reason_object(self):
   1532         """Create TimeoutError with text message and reason as exception object.
   1533         """
   1534         e = utils.TimeoutError(message='Waiting for condition',
   1535                                reason=Exception('illegal input'))
   1536         self.assertEqual(
   1537             "Waiting for condition. Reason: Exception('illegal input',)",
   1538             str(e))
   1539         self.assertIsInstance(e.reason, Exception)
   1540         self.assertEqual('illegal input', e.reason.message)
   1541 
   1542         # Positional message argument for backward compatibility.
   1543         e = utils.TimeoutError('Waiting for condition',
   1544                                reason=Exception('illegal input'))
   1545         self.assertEqual(
   1546             "Waiting for condition. Reason: Exception('illegal input',)",
   1547             str(e))
   1548         self.assertIsInstance(e.reason, Exception)
   1549         self.assertEqual('illegal input', e.reason.message)
   1550 
   1551 
   1552 
   1553 if __name__ == "__main__":
   1554     unittest.main()
   1555