1 #!/usr/bin/python 2 3 # pylint: disable=missing-docstring 4 5 import StringIO 6 import errno 7 import itertools 8 import logging 9 import os 10 import select 11 import socket 12 import subprocess 13 import time 14 import unittest 15 import urllib2 16 17 import common 18 from autotest_lib.client.common_lib import autotemp 19 from autotest_lib.client.common_lib import utils 20 from autotest_lib.client.common_lib.test_utils import mock 21 22 # mock 1.0.0 (in site-packages/chromite/third_party/mock.py) 23 # which is an ancestor of Python's default library starting from Python 3.3. 24 # See https://docs.python.org/3/library/unittest.mock.html 25 import mock as pymock 26 27 metrics = utils.metrics_mock 28 29 30 class test_read_one_line(unittest.TestCase): 31 def setUp(self): 32 self.god = mock.mock_god(ut=self) 33 self.god.stub_function(utils, "open") 34 35 36 def tearDown(self): 37 self.god.unstub_all() 38 39 40 def test_ip_to_long(self): 41 self.assertEqual(utils.ip_to_long('0.0.0.0'), 0) 42 self.assertEqual(utils.ip_to_long('255.255.255.255'), 4294967295) 43 self.assertEqual(utils.ip_to_long('192.168.0.1'), 3232235521) 44 self.assertEqual(utils.ip_to_long('1.2.4.8'), 16909320) 45 46 47 def test_long_to_ip(self): 48 self.assertEqual(utils.long_to_ip(0), '0.0.0.0') 49 self.assertEqual(utils.long_to_ip(4294967295), '255.255.255.255') 50 self.assertEqual(utils.long_to_ip(3232235521), '192.168.0.1') 51 self.assertEqual(utils.long_to_ip(16909320), '1.2.4.8') 52 53 54 def test_create_subnet_mask(self): 55 self.assertEqual(utils.create_subnet_mask(0), 0) 56 self.assertEqual(utils.create_subnet_mask(32), 4294967295) 57 self.assertEqual(utils.create_subnet_mask(25), 4294967168) 58 59 60 def test_format_ip_with_mask(self): 61 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 0), 62 '0.0.0.0/0') 63 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 32), 64 '192.168.0.1/32') 65 self.assertEqual(utils.format_ip_with_mask('192.168.0.1', 26), 66 '192.168.0.0/26') 67 
self.assertEqual(utils.format_ip_with_mask('192.168.0.255', 26), 68 '192.168.0.192/26') 69 70 71 def create_test_file(self, contents): 72 test_file = StringIO.StringIO(contents) 73 utils.open.expect_call("filename", "r").and_return(test_file) 74 75 76 def test_reads_one_line_file(self): 77 self.create_test_file("abc\n") 78 self.assertEqual("abc", utils.read_one_line("filename")) 79 self.god.check_playback() 80 81 82 def test_strips_read_lines(self): 83 self.create_test_file("abc \n") 84 self.assertEqual("abc ", utils.read_one_line("filename")) 85 self.god.check_playback() 86 87 88 def test_drops_extra_lines(self): 89 self.create_test_file("line 1\nline 2\nline 3\n") 90 self.assertEqual("line 1", utils.read_one_line("filename")) 91 self.god.check_playback() 92 93 94 def test_works_on_empty_file(self): 95 self.create_test_file("") 96 self.assertEqual("", utils.read_one_line("filename")) 97 self.god.check_playback() 98 99 100 def test_works_on_file_with_no_newlines(self): 101 self.create_test_file("line but no newline") 102 self.assertEqual("line but no newline", 103 utils.read_one_line("filename")) 104 self.god.check_playback() 105 106 107 def test_preserves_leading_whitespace(self): 108 self.create_test_file(" has leading whitespace") 109 self.assertEqual(" has leading whitespace", 110 utils.read_one_line("filename")) 111 112 113 class test_write_one_line(unittest.TestCase): 114 def setUp(self): 115 self.god = mock.mock_god(ut=self) 116 self.god.stub_function(utils, "open") 117 118 119 def tearDown(self): 120 self.god.unstub_all() 121 122 123 def get_write_one_line_output(self, content): 124 test_file = mock.SaveDataAfterCloseStringIO() 125 utils.open.expect_call("filename", "w").and_return(test_file) 126 utils.write_one_line("filename", content) 127 self.god.check_playback() 128 return test_file.final_data 129 130 131 def test_writes_one_line_file(self): 132 self.assertEqual("abc\n", self.get_write_one_line_output("abc")) 133 134 135 def 
test_preserves_existing_newline(self): 136 self.assertEqual("abc\n", self.get_write_one_line_output("abc\n")) 137 138 139 def test_preserves_leading_whitespace(self): 140 self.assertEqual(" abc\n", self.get_write_one_line_output(" abc")) 141 142 143 def test_preserves_trailing_whitespace(self): 144 self.assertEqual("abc \n", self.get_write_one_line_output("abc ")) 145 146 147 def test_handles_empty_input(self): 148 self.assertEqual("\n", self.get_write_one_line_output("")) 149 150 151 class test_open_write_close(unittest.TestCase): 152 def setUp(self): 153 self.god = mock.mock_god(ut=self) 154 self.god.stub_function(utils, "open") 155 156 157 def tearDown(self): 158 self.god.unstub_all() 159 160 161 def test_simple_functionality(self): 162 data = "\n\nwhee\n" 163 test_file = mock.SaveDataAfterCloseStringIO() 164 utils.open.expect_call("filename", "w").and_return(test_file) 165 utils.open_write_close("filename", data) 166 self.god.check_playback() 167 self.assertEqual(data, test_file.final_data) 168 169 170 class test_read_keyval(unittest.TestCase): 171 def setUp(self): 172 self.god = mock.mock_god(ut=self) 173 self.god.stub_function(utils, "open") 174 self.god.stub_function(os.path, "isdir") 175 self.god.stub_function(os.path, "exists") 176 177 178 def tearDown(self): 179 self.god.unstub_all() 180 181 182 def create_test_file(self, filename, contents): 183 test_file = StringIO.StringIO(contents) 184 os.path.exists.expect_call(filename).and_return(True) 185 utils.open.expect_call(filename).and_return(test_file) 186 187 188 def read_keyval(self, contents): 189 os.path.isdir.expect_call("file").and_return(False) 190 self.create_test_file("file", contents) 191 keyval = utils.read_keyval("file") 192 self.god.check_playback() 193 return keyval 194 195 196 def test_returns_empty_when_file_doesnt_exist(self): 197 os.path.isdir.expect_call("file").and_return(False) 198 os.path.exists.expect_call("file").and_return(False) 199 self.assertEqual({}, utils.read_keyval("file")) 
200 self.god.check_playback() 201 202 203 def test_accesses_files_directly(self): 204 os.path.isdir.expect_call("file").and_return(False) 205 self.create_test_file("file", "") 206 utils.read_keyval("file") 207 self.god.check_playback() 208 209 210 def test_accesses_directories_through_keyval_file(self): 211 os.path.isdir.expect_call("dir").and_return(True) 212 self.create_test_file("dir/keyval", "") 213 utils.read_keyval("dir") 214 self.god.check_playback() 215 216 217 def test_values_are_rstripped(self): 218 keyval = self.read_keyval("a=b \n") 219 self.assertEquals(keyval, {"a": "b"}) 220 221 222 def test_comments_are_ignored(self): 223 keyval = self.read_keyval("a=b # a comment\n") 224 self.assertEquals(keyval, {"a": "b"}) 225 226 227 def test_integers_become_ints(self): 228 keyval = self.read_keyval("a=1\n") 229 self.assertEquals(keyval, {"a": 1}) 230 self.assertEquals(int, type(keyval["a"])) 231 232 233 def test_float_values_become_floats(self): 234 keyval = self.read_keyval("a=1.5\n") 235 self.assertEquals(keyval, {"a": 1.5}) 236 self.assertEquals(float, type(keyval["a"])) 237 238 239 def test_multiple_lines(self): 240 keyval = self.read_keyval("a=one\nb=two\n") 241 self.assertEquals(keyval, {"a": "one", "b": "two"}) 242 243 244 def test_the_last_duplicate_line_is_used(self): 245 keyval = self.read_keyval("a=one\nb=two\na=three\n") 246 self.assertEquals(keyval, {"a": "three", "b": "two"}) 247 248 249 def test_extra_equals_are_included_in_values(self): 250 keyval = self.read_keyval("a=b=c\n") 251 self.assertEquals(keyval, {"a": "b=c"}) 252 253 254 def test_non_alphanumeric_keynames_are_rejected(self): 255 self.assertRaises(ValueError, self.read_keyval, "a$=one\n") 256 257 258 def test_underscores_are_allowed_in_key_names(self): 259 keyval = self.read_keyval("a_b=value\n") 260 self.assertEquals(keyval, {"a_b": "value"}) 261 262 263 def test_dashes_are_allowed_in_key_names(self): 264 keyval = self.read_keyval("a-b=value\n") 265 self.assertEquals(keyval, {"a-b": 
"value"}) 266 267 def test_empty_value_is_allowed(self): 268 keyval = self.read_keyval("a=\n") 269 self.assertEquals(keyval, {"a": ""}) 270 271 272 class test_write_keyval(unittest.TestCase): 273 def setUp(self): 274 self.god = mock.mock_god(ut=self) 275 self.god.stub_function(utils, "open") 276 self.god.stub_function(os.path, "isdir") 277 278 279 def tearDown(self): 280 self.god.unstub_all() 281 282 283 def assertHasLines(self, value, lines): 284 vlines = value.splitlines() 285 vlines.sort() 286 self.assertEquals(vlines, sorted(lines)) 287 288 289 def write_keyval(self, filename, dictionary, expected_filename=None, 290 type_tag=None): 291 if expected_filename is None: 292 expected_filename = filename 293 test_file = StringIO.StringIO() 294 self.god.stub_function(test_file, "close") 295 utils.open.expect_call(expected_filename, "a").and_return(test_file) 296 test_file.close.expect_call() 297 if type_tag is None: 298 utils.write_keyval(filename, dictionary) 299 else: 300 utils.write_keyval(filename, dictionary, type_tag) 301 return test_file.getvalue() 302 303 304 def write_keyval_file(self, dictionary, type_tag=None): 305 os.path.isdir.expect_call("file").and_return(False) 306 return self.write_keyval("file", dictionary, type_tag=type_tag) 307 308 309 def test_accesses_files_directly(self): 310 os.path.isdir.expect_call("file").and_return(False) 311 result = self.write_keyval("file", {"a": "1"}) 312 self.assertEquals(result, "a=1\n") 313 314 315 def test_accesses_directories_through_keyval_file(self): 316 os.path.isdir.expect_call("dir").and_return(True) 317 result = self.write_keyval("dir", {"b": "2"}, "dir/keyval") 318 self.assertEquals(result, "b=2\n") 319 320 321 def test_numbers_are_stringified(self): 322 result = self.write_keyval_file({"c": 3}) 323 self.assertEquals(result, "c=3\n") 324 325 326 def test_type_tags_are_excluded_by_default(self): 327 result = self.write_keyval_file({"d": "a string"}) 328 self.assertEquals(result, "d=a string\n") 329 
self.assertRaises(ValueError, self.write_keyval_file, 330 {"d{perf}": "a string"}) 331 332 333 def test_perf_tags_are_allowed(self): 334 result = self.write_keyval_file({"a{perf}": 1, "b{perf}": 2}, 335 type_tag="perf") 336 self.assertHasLines(result, ["a{perf}=1", "b{perf}=2"]) 337 self.assertRaises(ValueError, self.write_keyval_file, 338 {"a": 1, "b": 2}, type_tag="perf") 339 340 341 def test_non_alphanumeric_keynames_are_rejected(self): 342 self.assertRaises(ValueError, self.write_keyval_file, {"x$": 0}) 343 344 345 def test_underscores_are_allowed_in_key_names(self): 346 result = self.write_keyval_file({"a_b": "value"}) 347 self.assertEquals(result, "a_b=value\n") 348 349 350 def test_dashes_are_allowed_in_key_names(self): 351 result = self.write_keyval_file({"a-b": "value"}) 352 self.assertEquals(result, "a-b=value\n") 353 354 355 class test_is_url(unittest.TestCase): 356 def test_accepts_http(self): 357 self.assertTrue(utils.is_url("http://example.com")) 358 359 360 def test_accepts_ftp(self): 361 self.assertTrue(utils.is_url("ftp://ftp.example.com")) 362 363 364 def test_rejects_local_path(self): 365 self.assertFalse(utils.is_url("/home/username/file")) 366 367 368 def test_rejects_local_filename(self): 369 self.assertFalse(utils.is_url("filename")) 370 371 372 def test_rejects_relative_local_path(self): 373 self.assertFalse(utils.is_url("somedir/somesubdir/file")) 374 375 376 def test_rejects_local_path_containing_url(self): 377 self.assertFalse(utils.is_url("somedir/http://path/file")) 378 379 380 class test_urlopen(unittest.TestCase): 381 def setUp(self): 382 self.god = mock.mock_god(ut=self) 383 384 385 def tearDown(self): 386 self.god.unstub_all() 387 388 389 def stub_urlopen_with_timeout_comparison(self, test_func, expected_return, 390 *expected_args): 391 expected_args += (None,) * (2 - len(expected_args)) 392 def urlopen(url, data=None): 393 self.assertEquals(expected_args, (url,data)) 394 test_func(socket.getdefaulttimeout()) 395 return 
expected_return 396 self.god.stub_with(urllib2, "urlopen", urlopen) 397 398 399 def stub_urlopen_with_timeout_check(self, expected_timeout, 400 expected_return, *expected_args): 401 def test_func(timeout): 402 self.assertEquals(timeout, expected_timeout) 403 self.stub_urlopen_with_timeout_comparison(test_func, expected_return, 404 *expected_args) 405 406 407 def test_timeout_set_during_call(self): 408 self.stub_urlopen_with_timeout_check(30, "retval", "url") 409 retval = utils.urlopen("url", timeout=30) 410 self.assertEquals(retval, "retval") 411 412 413 def test_timeout_reset_after_call(self): 414 old_timeout = socket.getdefaulttimeout() 415 self.stub_urlopen_with_timeout_check(30, None, "url") 416 try: 417 socket.setdefaulttimeout(1234) 418 utils.urlopen("url", timeout=30) 419 self.assertEquals(1234, socket.getdefaulttimeout()) 420 finally: 421 socket.setdefaulttimeout(old_timeout) 422 423 424 def test_timeout_set_by_default(self): 425 def test_func(timeout): 426 self.assertTrue(timeout is not None) 427 self.stub_urlopen_with_timeout_comparison(test_func, None, "url") 428 utils.urlopen("url") 429 430 431 def test_args_are_untouched(self): 432 self.stub_urlopen_with_timeout_check(30, None, "http://url", 433 "POST data") 434 utils.urlopen("http://url", timeout=30, data="POST data") 435 436 437 class test_urlretrieve(unittest.TestCase): 438 def setUp(self): 439 self.god = mock.mock_god(ut=self) 440 441 442 def tearDown(self): 443 self.god.unstub_all() 444 445 446 def test_urlopen_passed_arguments(self): 447 self.god.stub_function(utils, "urlopen") 448 self.god.stub_function(utils.shutil, "copyfileobj") 449 self.god.stub_function(utils, "open") 450 451 url = "url" 452 dest = "somefile" 453 data = object() 454 timeout = 10 455 456 src_file = self.god.create_mock_class(file, "file") 457 dest_file = self.god.create_mock_class(file, "file") 458 459 (utils.urlopen.expect_call(url, data=data, timeout=timeout) 460 .and_return(src_file)) 461 utils.open.expect_call(dest, 
"wb").and_return(dest_file) 462 utils.shutil.copyfileobj.expect_call(src_file, dest_file) 463 dest_file.close.expect_call() 464 src_file.close.expect_call() 465 466 utils.urlretrieve(url, dest, data=data, timeout=timeout) 467 self.god.check_playback() 468 469 470 class test_merge_trees(unittest.TestCase): 471 # a some path-handling helper functions 472 def src(self, *path_segments): 473 return os.path.join(self.src_tree.name, *path_segments) 474 475 476 def dest(self, *path_segments): 477 return os.path.join(self.dest_tree.name, *path_segments) 478 479 480 def paths(self, *path_segments): 481 return self.src(*path_segments), self.dest(*path_segments) 482 483 484 def assertFileEqual(self, *path_segments): 485 src, dest = self.paths(*path_segments) 486 self.assertEqual(True, os.path.isfile(src)) 487 self.assertEqual(True, os.path.isfile(dest)) 488 self.assertEqual(os.path.getsize(src), os.path.getsize(dest)) 489 self.assertEqual(open(src).read(), open(dest).read()) 490 491 492 def assertFileContents(self, contents, *path_segments): 493 dest = self.dest(*path_segments) 494 self.assertEqual(True, os.path.isfile(dest)) 495 self.assertEqual(os.path.getsize(dest), len(contents)) 496 self.assertEqual(contents, open(dest).read()) 497 498 499 def setUp(self): 500 self.src_tree = autotemp.tempdir(unique_id='utilsrc') 501 self.dest_tree = autotemp.tempdir(unique_id='utilsdest') 502 503 # empty subdirs 504 os.mkdir(self.src("empty")) 505 os.mkdir(self.dest("empty")) 506 507 508 def tearDown(self): 509 self.src_tree.clean() 510 self.dest_tree.clean() 511 512 513 def test_both_dont_exist(self): 514 utils.merge_trees(*self.paths("empty")) 515 516 517 def test_file_only_at_src(self): 518 print >> open(self.src("src_only"), "w"), "line 1" 519 utils.merge_trees(*self.paths("src_only")) 520 self.assertFileEqual("src_only") 521 522 523 def test_file_only_at_dest(self): 524 print >> open(self.dest("dest_only"), "w"), "line 1" 525 utils.merge_trees(*self.paths("dest_only")) 526 
self.assertEqual(False, os.path.exists(self.src("dest_only"))) 527 self.assertFileContents("line 1\n", "dest_only") 528 529 530 def test_file_at_both(self): 531 print >> open(self.dest("in_both"), "w"), "line 1" 532 print >> open(self.src("in_both"), "w"), "line 2" 533 utils.merge_trees(*self.paths("in_both")) 534 self.assertFileContents("line 1\nline 2\n", "in_both") 535 536 537 def test_directory_with_files_in_both(self): 538 print >> open(self.dest("in_both"), "w"), "line 1" 539 print >> open(self.src("in_both"), "w"), "line 3" 540 utils.merge_trees(*self.paths()) 541 self.assertFileContents("line 1\nline 3\n", "in_both") 542 543 544 def test_directory_with_mix_of_files(self): 545 print >> open(self.dest("in_dest"), "w"), "dest line" 546 print >> open(self.src("in_src"), "w"), "src line" 547 utils.merge_trees(*self.paths()) 548 self.assertFileContents("dest line\n", "in_dest") 549 self.assertFileContents("src line\n", "in_src") 550 551 552 def test_directory_with_subdirectories(self): 553 os.mkdir(self.src("src_subdir")) 554 print >> open(self.src("src_subdir", "subfile"), "w"), "subdir line" 555 os.mkdir(self.src("both_subdir")) 556 os.mkdir(self.dest("both_subdir")) 557 print >> open(self.src("both_subdir", "subfile"), "w"), "src line" 558 print >> open(self.dest("both_subdir", "subfile"), "w"), "dest line" 559 utils.merge_trees(*self.paths()) 560 self.assertFileContents("subdir line\n", "src_subdir", "subfile") 561 self.assertFileContents("dest line\nsrc line\n", "both_subdir", 562 "subfile") 563 564 565 class test_get_relative_path(unittest.TestCase): 566 def test_not_absolute(self): 567 self.assertRaises(AssertionError, utils.get_relative_path, "a", "b") 568 569 def test_same_dir(self): 570 self.assertEqual(utils.get_relative_path("/a/b/c", "/a/b"), "c") 571 572 def test_forward_dir(self): 573 self.assertEqual(utils.get_relative_path("/a/b/c/d", "/a/b"), "c/d") 574 575 def test_previous_dir(self): 576 self.assertEqual(utils.get_relative_path("/a/b", 
"/a/b/c/d"), "../..") 577 578 def test_parallel_dir(self): 579 self.assertEqual(utils.get_relative_path("/a/c/d", "/a/b/c/d"), 580 "../../../c/d") 581 582 583 class test_sh_escape(unittest.TestCase): 584 def _test_in_shell(self, text): 585 escaped_text = utils.sh_escape(text) 586 proc = subprocess.Popen('echo "%s"' % escaped_text, shell=True, 587 stdin=open(os.devnull, 'r'), 588 stdout=subprocess.PIPE, 589 stderr=open(os.devnull, 'w')) 590 stdout, _ = proc.communicate() 591 self.assertEqual(proc.returncode, 0) 592 self.assertEqual(stdout[:-1], text) 593 594 595 def test_normal_string(self): 596 self._test_in_shell('abcd') 597 598 599 def test_spaced_string(self): 600 self._test_in_shell('abcd efgh') 601 602 603 def test_dollar(self): 604 self._test_in_shell('$') 605 606 607 def test_single_quote(self): 608 self._test_in_shell('\'') 609 610 611 def test_single_quoted_string(self): 612 self._test_in_shell('\'efgh\'') 613 614 615 def test_string_with_single_quote(self): 616 self._test_in_shell("a'b") 617 618 619 def test_string_with_escaped_single_quote(self): 620 self._test_in_shell(r"a\'b") 621 622 623 def test_double_quote(self): 624 self._test_in_shell('"') 625 626 627 def test_double_quoted_string(self): 628 self._test_in_shell('"abcd"') 629 630 631 def test_backtick(self): 632 self._test_in_shell('`') 633 634 635 def test_backticked_string(self): 636 self._test_in_shell('`jklm`') 637 638 639 def test_backslash(self): 640 self._test_in_shell('\\') 641 642 643 def test_backslashed_special_characters(self): 644 self._test_in_shell('\\$') 645 self._test_in_shell('\\"') 646 self._test_in_shell('\\\'') 647 self._test_in_shell('\\`') 648 649 650 def test_backslash_codes(self): 651 self._test_in_shell('\\n') 652 self._test_in_shell('\\r') 653 self._test_in_shell('\\t') 654 self._test_in_shell('\\v') 655 self._test_in_shell('\\b') 656 self._test_in_shell('\\a') 657 self._test_in_shell('\\000') 658 659 def test_real_newline(self): 660 self._test_in_shell('\n') 661 
self._test_in_shell('\\\n') 662 663 664 class test_sh_quote_word(test_sh_escape): 665 """Run tests on sh_quote_word. 666 667 Inherit from test_sh_escape to get the same tests to run on both. 668 """ 669 670 def _test_in_shell(self, text): 671 quoted_word = utils.sh_quote_word(text) 672 echoed_value = subprocess.check_output('echo %s' % quoted_word, 673 shell=True) 674 self.assertEqual(echoed_value, text + '\n') 675 676 677 class test_nested_sh_quote_word(test_sh_quote_word): 678 """Run nested tests on sh_quote_word. 679 680 Inherit from test_sh_quote_word to get the same tests to run on both. 681 """ 682 683 def _test_in_shell(self, text): 684 command = 'echo ' + utils.sh_quote_word(text) 685 nested_command = 'echo ' + utils.sh_quote_word(command) 686 produced_command = subprocess.check_output(nested_command, shell=True) 687 echoed_value = subprocess.check_output(produced_command, shell=True) 688 self.assertEqual(echoed_value, text + '\n') 689 690 691 class test_run(unittest.TestCase): 692 """ 693 Test the utils.run() function. 694 695 Note: This test runs simple external commands to test the utils.run() 696 API without assuming implementation details. 697 """ 698 699 # Log levels in ascending severity. 700 LOG_LEVELS = [logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, 701 logging.CRITICAL] 702 703 704 def setUp(self): 705 self.god = mock.mock_god(ut=self) 706 self.god.stub_function(utils.logging, 'warning') 707 self.god.stub_function(utils.logging, 'debug') 708 709 # Log level -> StringIO.StringIO. 710 self.logs = {} 711 for level in self.LOG_LEVELS: 712 self.logs[level] = StringIO.StringIO() 713 714 # Override logging_manager.LoggingFile to return buffers. 
715 def logging_file(level=None, prefix=None): 716 return self.logs[level] 717 self.god.stub_with(utils.logging_manager, 'LoggingFile', logging_file) 718 719 def tearDown(self): 720 self.god.unstub_all() 721 722 723 def __check_result(self, result, command, exit_status=0, stdout='', 724 stderr=''): 725 self.assertEquals(result.command, command) 726 self.assertEquals(result.exit_status, exit_status) 727 self.assertEquals(result.stdout, stdout) 728 self.assertEquals(result.stderr, stderr) 729 730 731 def __get_logs(self): 732 """Returns contents of log buffers at all levels. 733 734 @return: 5-element list of strings corresponding to logged messages 735 at the levels in self.LOG_LEVELS. 736 """ 737 return [self.logs[v].getvalue() for v in self.LOG_LEVELS] 738 739 740 def test_default_simple(self): 741 cmd = 'echo "hello world"' 742 # expect some king of logging.debug() call but don't care about args 743 utils.logging.debug.expect_any_call() 744 self.__check_result(utils.run(cmd), cmd, stdout='hello world\n') 745 746 747 def test_default_failure(self): 748 cmd = 'exit 11' 749 try: 750 utils.run(cmd, verbose=False) 751 except utils.error.CmdError, err: 752 self.__check_result(err.result_obj, cmd, exit_status=11) 753 754 755 def test_ignore_status(self): 756 cmd = 'echo error >&2 && exit 11' 757 self.__check_result(utils.run(cmd, ignore_status=True, verbose=False), 758 cmd, exit_status=11, stderr='error\n') 759 760 761 def test_timeout(self): 762 # we expect a logging.warning() message, don't care about the contents 763 utils.logging.warning.expect_any_call() 764 try: 765 utils.run('echo -n output && sleep 10', timeout=1, verbose=False) 766 except utils.error.CmdError, err: 767 self.assertEquals(err.result_obj.stdout, 'output') 768 769 770 def test_stdout_stderr_tee(self): 771 cmd = 'echo output && echo error >&2' 772 stdout_tee = StringIO.StringIO() 773 stderr_tee = StringIO.StringIO() 774 775 self.__check_result(utils.run( 776 cmd, stdout_tee=stdout_tee, 
stderr_tee=stderr_tee, 777 verbose=False), cmd, stdout='output\n', stderr='error\n') 778 self.assertEqual(stdout_tee.getvalue(), 'output\n') 779 self.assertEqual(stderr_tee.getvalue(), 'error\n') 780 781 782 def test_stdin_string(self): 783 cmd = 'cat' 784 self.__check_result(utils.run(cmd, verbose=False, stdin='hi!\n'), 785 cmd, stdout='hi!\n') 786 787 788 def test_stdout_tee_to_logs_info(self): 789 """Test logging stdout at the info level.""" 790 utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS, 791 stdout_level=logging.INFO, verbose=False) 792 self.assertEqual(self.__get_logs(), ['', 'output\n', '', '', '']) 793 794 795 def test_stdout_tee_to_logs_warning(self): 796 """Test logging stdout at the warning level.""" 797 utils.run('echo output', stdout_tee=utils.TEE_TO_LOGS, 798 stdout_level=logging.WARNING, verbose=False) 799 self.assertEqual(self.__get_logs(), ['', '', 'output\n', '', '']) 800 801 802 def test_stdout_and_stderr_tee_to_logs(self): 803 """Test simultaneous stdout and stderr log levels.""" 804 utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS, 805 stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO, 806 stderr_level=logging.ERROR, verbose=False) 807 self.assertEqual(self.__get_logs(), ['', 'output\n', '', 'error\n', '']) 808 809 810 def test_default_expected_stderr_log_level(self): 811 """Test default expected stderr log level. 812 813 stderr should be logged at the same level as stdout when 814 stderr_is_expected is true and stderr_level isn't passed. 
815 """ 816 utils.run('echo output && echo error >&2', stdout_tee=utils.TEE_TO_LOGS, 817 stderr_tee=utils.TEE_TO_LOGS, stdout_level=logging.INFO, 818 stderr_is_expected=True, verbose=False) 819 self.assertEqual(self.__get_logs(), ['', 'output\nerror\n', '', '', '']) 820 821 822 def test_safe_args(self): 823 # NOTE: The string in expected_quoted_cmd depends on the internal 824 # implementation of shell quoting which is used by utils.run(), 825 # in this case, sh_quote_word(). 826 expected_quoted_cmd = "echo 'hello \"world' again" 827 self.__check_result(utils.run( 828 'echo', verbose=False, args=('hello "world', 'again')), 829 expected_quoted_cmd, stdout='hello "world again\n') 830 831 832 def test_safe_args_given_string(self): 833 self.assertRaises(TypeError, utils.run, 'echo', args='hello') 834 835 836 def test_wait_interrupt(self): 837 """Test that we actually select twice if the first one returns EINTR.""" 838 utils.logging.debug.expect_any_call() 839 840 bg_job = utils.BgJob('echo "hello world"') 841 bg_job.result.exit_status = 0 842 self.god.stub_function(utils.select, 'select') 843 844 utils.select.select.expect_any_call().and_raises( 845 select.error(errno.EINTR, 'Select interrupted')) 846 utils.logging.warning.expect_any_call() 847 848 utils.select.select.expect_any_call().and_return( 849 ([bg_job.sp.stdout, bg_job.sp.stderr], [], None)) 850 utils.logging.warning.expect_any_call() 851 852 self.assertFalse( 853 utils._wait_for_commands([bg_job], time.time(), None)) 854 855 856 class test_compare_versions(unittest.TestCase): 857 def test_zerofill(self): 858 self.assertEqual(utils.compare_versions('1.7', '1.10'), -1) 859 self.assertEqual(utils.compare_versions('1.222', '1.3'), 1) 860 self.assertEqual(utils.compare_versions('1.03', '1.3'), 0) 861 862 863 def test_unequal_len(self): 864 self.assertEqual(utils.compare_versions('1.3', '1.3.4'), -1) 865 self.assertEqual(utils.compare_versions('1.3.1', '1.3'), 1) 866 867 868 def test_dash_delimited(self): 869 
self.assertEqual(utils.compare_versions('1-2-3', '1-5-1'), -1) 870 self.assertEqual(utils.compare_versions('1-2-1', '1-1-1'), 1) 871 self.assertEqual(utils.compare_versions('1-2-4', '1-2-4'), 0) 872 873 874 def test_alphabets(self): 875 self.assertEqual(utils.compare_versions('m.l.b', 'n.b.a'), -1) 876 self.assertEqual(utils.compare_versions('n.b.a', 'm.l.b'), 1) 877 self.assertEqual(utils.compare_versions('abc.e', 'abc.e'), 0) 878 879 880 def test_mix_symbols(self): 881 self.assertEqual(utils.compare_versions('k-320.1', 'k-320.3'), -1) 882 self.assertEqual(utils.compare_versions('k-231.5', 'k-231.1'), 1) 883 self.assertEqual(utils.compare_versions('k-231.1', 'k-231.1'), 0) 884 885 self.assertEqual(utils.compare_versions('k.320-1', 'k.320-3'), -1) 886 self.assertEqual(utils.compare_versions('k.231-5', 'k.231-1'), 1) 887 self.assertEqual(utils.compare_versions('k.231-1', 'k.231-1'), 0) 888 889 890 class test_args_to_dict(unittest.TestCase): 891 def test_no_args(self): 892 result = utils.args_to_dict([]) 893 self.assertEqual({}, result) 894 895 896 def test_matches(self): 897 result = utils.args_to_dict(['aBc:DeF', 'SyS=DEf', 'XY_Z:', 898 'F__o0O=', 'B8r:=:=', '_bAZ_=:=:']) 899 self.assertEqual(result, {'abc':'DeF', 'sys':'DEf', 'xy_z':'', 900 'f__o0o':'', 'b8r':'=:=', '_baz_':':=:'}) 901 902 903 def test_unmatches(self): 904 # Temporarily shut warning messages from args_to_dict() when an argument 905 # doesn't match its pattern. 906 logger = logging.getLogger() 907 saved_level = logger.level 908 logger.setLevel(logging.ERROR) 909 910 try: 911 result = utils.args_to_dict(['ab-c:DeF', '--SyS=DEf', 'a*=b', 'a*b', 912 ':VAL', '=VVV', 'WORD']) 913 self.assertEqual({}, result) 914 finally: 915 # Restore level. 
logger.setLevel(saved_level)


class test_get_random_port(unittest.TestCase):
    """Test that ports returned by utils.get_unused_port can be bound."""

    def do_bind(self, port, socket_type, socket_proto):
        """Create an IPv4 socket and bind it to the given port.

        @param port: Port number to bind to.
        @param socket_type: Socket type, e.g. socket.SOCK_STREAM.
        @param socket_proto: Socket protocol, e.g. socket.IPPROTO_TCP.

        @return the bound socket. The caller is responsible for closing it.
        """
        s = socket.socket(socket.AF_INET, socket_type, socket_proto)
        try:
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            s.bind(('', port))
        except Exception:
            # Do not leak the descriptor when the bind fails.
            s.close()
            raise
        return s


    def test_get_port(self):
        """Test that an unused port can be bound for both TCP and UDP."""
        socket_kinds = ((socket.SOCK_STREAM, socket.IPPROTO_TCP),
                        (socket.SOCK_DGRAM, socket.IPPROTO_UDP))
        for _ in xrange(100):
            p = utils.get_unused_port()
            for socket_type, socket_proto in socket_kinds:
                s = self.do_bind(p, socket_type, socket_proto)
                try:
                    self.assertTrue(s.getsockname())
                finally:
                    # Close explicitly instead of relying on garbage
                    # collection to release the bound socket.
                    s.close()


def test_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
    """Test global function.

    Fixture for GetFunctionArgUnittest: three positional arguments followed
    by three arguments with default values.
    """


class TestClass(object):
    """Test class.

    Fixture for GetFunctionArgUnittest. Every method takes three positional
    arguments followed by three arguments with default values.
    """

    def test_instance_function(self, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test instance function.
        """


    @classmethod
    def test_class_function(cls, arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test class function.
        """


    @staticmethod
    def test_static_function(arg1, arg2, arg3, arg4=4, arg5=5, arg6=6):
        """Test static function.
        """


class GetFunctionArgUnittest(unittest.TestCase):
    """Tests for method get_function_arg_value."""

    def run_test(self, func, insert_arg):
        """Run test.

        @param func: Function being called with given arguments.
        @param insert_arg: Set to True to insert an object in the argument list.
                           This is to mock instance/class object.
        """
        if insert_arg:
            args = (None, 1, 2, 3)
        else:
            args = (1, 2, 3)
        # arg1..arg3 come from args, arg4..arg6 from the default values.
        for i in range(1, 7):
            self.assertEqual(utils.get_function_arg_value(
                    func, 'arg%d'%i, args, {}), i)

        # An argument unknown to the signature is looked up in kwargs.
        self.assertEqual(utils.get_function_arg_value(
                func, 'arg7', args, {'arg7': 7}), 7)
        # arg3 is neither passed positionally nor has a default value.
        self.assertRaises(
                KeyError, utils.get_function_arg_value,
                func, 'arg3', args[:-1], {})


    def test_global_function(self):
        """Test global function.
        """
        self.run_test(test_function, False)


    def test_instance_function(self):
        """Test instance function.
        """
        self.run_test(TestClass().test_instance_function, True)


    def test_class_function(self):
        """Test class function.
        """
        self.run_test(TestClass.test_class_function, True)


    def test_static_function(self):
        """Test static function.
        """
        self.run_test(TestClass.test_static_function, False)


class IsInSameSubnetUnittest(unittest.TestCase):
    """Test is_in_same_subnet function."""

    def test_is_in_same_subnet(self):
        """Test is_in_same_subnet function."""
        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
                                                23))
        self.assertFalse(utils.is_in_same_subnet('192.168.0.0', '192.168.1.2',
                                                 24))
        self.assertTrue(utils.is_in_same_subnet('192.168.0.0', '192.168.0.255',
                                                24))
        self.assertFalse(utils.is_in_same_subnet('191.168.0.0', '192.168.0.0',
                                                 24))


class GetWirelessSsidUnittest(unittest.TestCase):
    """Test get_wireless_ssid function."""

    DEFAULT_SSID = 'default'
    SSID_1 = 'ssid_1'
    SSID_2 = 'ssid_2'
    SSID_3 = 'ssid_3'

    def test_get_wireless_ssid(self):
        """Test get_wireless_ssid function."""
        god = mock.mock_god()
        # Restore utils.CONFIG once the test is done so that the stubs do
        # not leak into other tests.
        self.addCleanup(god.unstub_all)
        god.stub_function_to_return(utils.CONFIG, 'get_config_value',
                                    self.DEFAULT_SSID)
        god.stub_function_to_return(utils.CONFIG, 'get_config_value_regex',
                                    {'wireless_ssid_1.2.3.4/24': self.SSID_1,
                                     'wireless_ssid_4.3.2.1/16': self.SSID_2,
                                     'wireless_ssid_4.3.2.111/32': self.SSID_3})
        self.assertEqual(self.SSID_1, utils.get_wireless_ssid('1.2.3.100'))
        self.assertEqual(self.SSID_2, utils.get_wireless_ssid('4.3.2.100'))
        self.assertEqual(self.SSID_3, utils.get_wireless_ssid('4.3.2.111'))
        self.assertEqual(self.DEFAULT_SSID,
                         utils.get_wireless_ssid('100.0.0.100'))


class LaunchControlBuildParseUnittest(unittest.TestCase):
    """Test various parsing functions related to Launch Control builds and
    devices.
    """

    def test_parse_launch_control_target(self):
        """Test parse_launch_control_target function."""
        # A list of (target, expected result) pairs. A dict keyed by the
        # expected result cannot be used here: both invalid targets map to
        # (None, None) and one of them would be silently dropped.
        target_tests = [
                ('shamu-userdebug', ('shamu', 'userdebug')),
                ('shamu-eng', ('shamu', 'eng')),
                ('shamu-board-eng', ('shamu-board', 'eng')),
                ('bad_target', (None, None)),
                ('target', (None, None)),
        ]
        for target, result in target_tests:
            self.assertEqual(result, utils.parse_launch_control_target(target))


class GetOffloaderUriTest(unittest.TestCase):
    """Test get_offload_gsuri function."""
    _IMAGE_STORAGE_SERVER = 'gs://test_image_bucket'

    def setUp(self):
        self.god = mock.mock_god()

    def tearDown(self):
        self.god.unstub_all()

    def test_get_default_lab_offload_gsuri(self):
        """Test default lab offload gsuri ."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', False)
        self.assertEqual(utils.DEFAULT_OFFLOAD_GSURI,
                utils.get_offload_gsuri())

        self.god.check_playback()

    def test_get_default_moblab_offload_gsuri(self):
        """Test moblab offload gsuri when DEFAULT_OFFLOAD_GSURI is None."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', True)
        utils.CONFIG.get_config_value.expect_call(
                'CROS', 'image_storage_server').and_return(
                        self._IMAGE_STORAGE_SERVER)
        self.god.stub_function_to_return(utils,
                'get_moblab_serial_number', 'test_serial_number')
        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
        expected_gsuri = '%sresults/%s/%s/' % (
                self._IMAGE_STORAGE_SERVER, 'test_serial_number', 'test_id')
        cached_gsuri = utils.DEFAULT_OFFLOAD_GSURI
        utils.DEFAULT_OFFLOAD_GSURI = None
        try:
            gsuri = utils.get_offload_gsuri()
        finally:
            # Restore the module level value even if the call above raises,
            # otherwise the following tests would see None.
            utils.DEFAULT_OFFLOAD_GSURI = cached_gsuri
        self.assertEqual(expected_gsuri, gsuri)

        self.god.check_playback()

    def test_get_moblab_offload_gsuri(self):
        """Test moblab offload gsuri built from DEFAULT_OFFLOAD_GSURI."""
        self.god.mock_up(utils.CONFIG, 'CONFIG')
        self.god.stub_function_to_return(utils, 'is_moblab', True)
        self.god.stub_function_to_return(utils,
                'get_moblab_serial_number', 'test_serial_number')
        self.god.stub_function_to_return(utils, 'get_moblab_id', 'test_id')
        gsuri = '%s%s/%s/' % (
                utils.DEFAULT_OFFLOAD_GSURI, 'test_serial_number', 'test_id')
        self.assertEqual(gsuri, utils.get_offload_gsuri())

        self.god.check_playback()



class MockMetricsTest(unittest.TestCase):
    """Test metrics mock class can handle various metrics calls."""

    def test_Counter(self):
        """Test the mock class can create an instance and call any method.
        """
        c = metrics.Counter('counter')
        c.increment(fields={'key': 1})


    def test_Context(self):
        """Test the mock class can handle context class.
        """
        test_value = None
        with metrics.SecondsTimer('context') as t:
            test_value = 'called_in_context'
            t['random_key'] = 'pass'
        self.assertEqual('called_in_context', test_value)


    def test_decorator(self):
        """Test the mock class can handle decorator.
        """
        class TestClass(object):

            def __init__(self):
                self.value = None

        test_value = TestClass()
        test_value.value = None
        @metrics.SecondsTimerDecorator('decorator')
        def test(arg):
            arg.value = 'called_in_decorator'

        test(test_value)
        self.assertEqual('called_in_decorator', test_value.value)


    def test_setitem(self):
        """Test the mock class can handle set item call.
        """
        timer = metrics.SecondsTimer('name')
        timer['random_key'] = 'pass'


class test_background_sample(unittest.TestCase):
    """Test that the background sample can sample as desired.
    """

    def test_can_sample(self):
        """Test that a simple sample will work with no other complications.
        """
        should_be_sampled = 'name'

        def sample_function():
            """Return value of variable stored in method."""
            return should_be_sampled
        still_sampling = True

        t = utils.background_sample_until_condition(
                function=sample_function,
                condition=lambda: still_sampling,
                timeout=5,
                sleep_interval=0.1)
        result = t.finish()
        self.assertIn(should_be_sampled, result)


    def test_samples_multiple_values(self):
        """Test that a sample will work and actually samples at the necessary
        intervals, such that it will pick up changes.
        """
        should_be_sampled = 'name'

        def sample_function():
            """Return value of variable stored in method."""
            return should_be_sampled
        still_sampling = True

        t = utils.background_sample_until_condition(
                function=sample_function,
                condition=lambda: still_sampling,
                timeout=5,
                sleep_interval=0.1)
        # Let it sample values some with the initial value.
        time.sleep(2.5)
        # It should also sample some with the new value.
        should_be_sampled = 'noname'
        result = t.finish()
        self.assertIn('name', result)
        self.assertIn('noname', result)


class FakeTime(object):
    """Provides time() and sleep() for faking time module.

    sleep() does not block; it only advances the value returned by time().
    """

    def __init__(self, start_time):
        self._time = start_time


    def time(self):
        return self._time


    def sleep(self, interval):
        self._time += interval


class TimeModuleMockTestCase(unittest.TestCase):
    """Mocks up utils.time with a FakeTime.

    It substitutes time.time() and time.sleep() with FakeTime.time()
    and FakeTime.sleep(), respectively.
    """

    def setUp(self):
        self.fake_time_begin = 10
        self.fake_time = FakeTime(self.fake_time_begin)
        self.patcher = pymock.patch(
            'autotest_lib.client.common_lib.utils.time')
        self.time_mock = self.patcher.start()
        self.addCleanup(self.patcher.stop)
        self.time_mock.time.side_effect = self.fake_time.time
        self.time_mock.sleep.side_effect = self.fake_time.sleep


def always_raise():
    """A function that raises an exception."""
    raise Exception('always raise')


def fail_n_times(count):
    """Creates a function that returns False for the first count calls.

    @param count: Number of initial calls that return False.

    @return a function that returns False for the first count calls and True
            afterwards.
    """
    # Counts down count, count - 1, ...; the value reaches 0 on call
    # number count + 1.
    counter = itertools.count(count, -1)
    return lambda: next(counter) <= 0


class test_poll_for_condition(TimeModuleMockTestCase):
    """Test poll_for_condition.
    """

    def test_ok(self):
        """Test polling condition that returns True.
        """
        self.assertTrue(utils.poll_for_condition(lambda: True))


    def test_ok_evaluated_as_true(self):
        """Test polling condition whose return value is evaluated as True.
        """
        self.assertEqual(1, utils.poll_for_condition(lambda: 1))

        self.assertEqual('something',
                         utils.poll_for_condition(lambda: 'something'))


    def test_fail(self):
        """Test polling condition that returns False.

        Expect TimeoutError exception as neither customized exception nor
        exception raised from condition().
        """
        with self.assertRaises(utils.TimeoutError):
            utils.poll_for_condition(lambda: False, timeout=3, sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)


    def test_fail_evaluated_as_false(self):
        """Test polling condition whose return value is evaluated as False.

        Expect TimeoutError exception as neither customized exception nor
        exception raised from condition().
        """
        with self.assertRaises(utils.TimeoutError):
            utils.poll_for_condition(lambda: 0, timeout=3, sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)

        with self.assertRaises(utils.TimeoutError):
            utils.poll_for_condition(lambda: None, timeout=3, sleep_interval=1)


    def test_exception_arg(self):
        """Test polling condition always fails.

        Expect exception raised by 'exception' args.
        """
        with self.assertRaisesRegexp(Exception, 'from args'):
            utils.poll_for_condition(lambda: False,
                                     exception=Exception('from args'),
                                     timeout=3, sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)


    def test_exception_from_condition(self):
        """Test polling condition always fails.

        Expect exception raised by condition().
        """
        with self.assertRaisesRegexp(Exception, 'always raise'):
            utils.poll_for_condition(always_raise,
                                     exception=Exception('from args'),
                                     timeout=3, sleep_interval=1)
        # For poll_for_condition, if condition() raises exception, it raises
        # immediately without retry. So sleep() should not be called.
        self.time_mock.sleep.assert_not_called()


    def test_ok_after_retry(self):
        """Test polling a condition which succeeds after retrying twice.
        """
        self.assertTrue(utils.poll_for_condition(fail_n_times(2), timeout=3,
                                                 sleep_interval=1))


    def test_cannot_wait(self):
        """Test polling a condition which fails till timeout.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unnamed condition'):
            utils.poll_for_condition(fail_n_times(4), timeout=3,
                                     sleep_interval=1)
        self.assertEqual(3, self.time_mock.sleep.call_count)


class test_poll_for_condition_ex(TimeModuleMockTestCase):
    """Test poll_for_condition_ex.
    """

    def test_ok(self):
        """Test polling condition that returns True.
        """
        self.assertTrue(utils.poll_for_condition_ex(lambda: True))


    def test_ok_evaluated_as_true(self):
        """Test polling condition whose return value is evaluated as True.
        """
        self.assertEqual(1, utils.poll_for_condition_ex(lambda: 1))

        self.assertEqual('something',
                         utils.poll_for_condition_ex(lambda: 'something'))


    def test_fail(self):
        """Test polling condition that returns False.

        Expect TimeoutError raised.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unamed condition'):
            utils.poll_for_condition_ex(lambda: False, timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


    def test_fail_evaluated_as_false(self):
        """Test polling condition whose return value is evaluated as False.

        Expect TimeoutError raised.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unamed condition'):
            utils.poll_for_condition_ex(lambda: 0, timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)

        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for unamed condition'):
            utils.poll_for_condition_ex(lambda: None, timeout=3,
                                        sleep_interval=1)


    def test_desc_arg(self):
        """Test polling condition always fails with desc.

        Expect TimeoutError with condition description embedded.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'Timed out waiting for always false condition'):
            utils.poll_for_condition_ex(lambda: False,
                                        desc='always false condition',
                                        timeout=3, sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


    def test_exception(self):
        """Test polling condition that raises.

        Expect TimeoutError with condition raised exception embedded.
        """
        # Raw string: the backslashes are regex escapes for the parentheses.
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                r"Reason: Exception\('always raise',\)"):
            utils.poll_for_condition_ex(always_raise, timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


    def test_ok_after_retry(self):
        """Test polling a condition which succeeds after retrying twice.
        """
        self.assertTrue(utils.poll_for_condition_ex(fail_n_times(2), timeout=3,
                                                    sleep_interval=1))


    def test_cannot_wait(self):
        """Test polling a condition which fails till timeout.
        """
        with self.assertRaisesRegexp(
                utils.TimeoutError,
                'condition evaluted as false'):
            utils.poll_for_condition_ex(fail_n_times(3), timeout=3,
                                        sleep_interval=1)
        self.assertEqual(2, self.time_mock.sleep.call_count)


class test_timer(TimeModuleMockTestCase):
    """Test Timer.
    """

    def test_zero_timeout(self):
        """Test Timer with zero timeout.

        Only the first timer.sleep(0) is True.
        """
        timer = utils.Timer(0)
        self.assertTrue(timer.sleep(0))
        self.assertFalse(timer.sleep(0))
        self.time_mock.sleep.assert_not_called()


    def test_sleep(self):
        """Test Timer.sleep()
        """
        timeout = 3
        sleep_interval = 2
        timer = utils.Timer(timeout)

        # Kicks off timer.
        self.assertTrue(timer.sleep(sleep_interval))
        self.assertEqual(self.fake_time_begin + timeout, timer.deadline)
        self.assertTrue(timer.sleep(sleep_interval))
        # now: 12. 12 + 2 > 13, unable to sleep
        self.assertFalse(timer.sleep(sleep_interval))

        self.time_mock.sleep.assert_has_calls([pymock.call(sleep_interval)])


class test_timeout_error(unittest.TestCase):
    """Test TimeoutError.

    Test TimeoutError with three invocations format.
    """

    def test_no_args(self):
        """Create TimeoutError without arguments.
        """
        e = utils.TimeoutError()
        self.assertEqual('', str(e))
        self.assertEqual('TimeoutError()', repr(e))


    def test_with_message(self):
        """Create TimeoutError with text message.
        """
        e = utils.TimeoutError(message='Waiting for condition')
        self.assertEqual('Waiting for condition', str(e))
        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))

        # Positional message argument for backward compatibility.
        e = utils.TimeoutError('Waiting for condition')
        self.assertEqual('Waiting for condition', str(e))
        self.assertEqual("TimeoutError('Waiting for condition',)", repr(e))



    def test_with_reason(self):
        """Create TimeoutError with reason only.
        """
        e = utils.TimeoutError(reason='illegal input')
        self.assertEqual("Reason: 'illegal input'", str(e))
        self.assertEqual("TimeoutError(\"Reason: 'illegal input'\",)", repr(e))
        self.assertEqual('illegal input', e.reason)


    def test_with_message_reason(self):
        """Create TimeoutError with text message and reason.
        """
        e = utils.TimeoutError(message='Waiting for condition',
                               reason='illegal input')
        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
                         str(e))
        self.assertEqual('illegal input', e.reason)

        # Positional message argument for backward compatibility.
        e = utils.TimeoutError('Waiting for condition', reason='illegal input')
        self.assertEqual("Waiting for condition. Reason: 'illegal input'",
                         str(e))
        self.assertEqual('illegal input', e.reason)


    def test_with_message_reason_object(self):
        """Create TimeoutError with text message and reason as exception object.
        """
        e = utils.TimeoutError(message='Waiting for condition',
                               reason=Exception('illegal input'))
        self.assertEqual(
                "Waiting for condition. Reason: Exception('illegal input',)",
                str(e))
        self.assertIsInstance(e.reason, Exception)
        self.assertEqual('illegal input', e.reason.message)

        # Positional message argument for backward compatibility.
        e = utils.TimeoutError('Waiting for condition',
                               reason=Exception('illegal input'))
        self.assertEqual(
                "Waiting for condition. Reason: Exception('illegal input',)",
                str(e))
        self.assertIsInstance(e.reason, Exception)
        self.assertEqual('illegal input', e.reason.message)



if __name__ == "__main__":
    unittest.main()