Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 # Copyright 2016 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import __builtin__
      7 import Queue
      8 import logging
      9 import os
     10 import shutil
     11 import signal
     12 import stat
     13 import sys
     14 import tempfile
     15 import time
     16 import unittest
     17 
     18 import mock
     19 import mox
     20 
     21 import common
     22 from autotest_lib.client.common_lib import global_config
     23 from autotest_lib.client.common_lib import utils
     24 #For unittest without cloud_client.proto compiled.
     25 try:
     26     from autotest_lib.site_utils import cloud_console_client
     27 except ImportError:
     28     cloud_console_client = None
     29 from autotest_lib.site_utils import gs_offloader
     30 from autotest_lib.site_utils import job_directories
     31 from autotest_lib.site_utils import job_directories_unittest as jd_test
     32 from autotest_lib.tko import models
     33 from autotest_lib.utils import gslib
     34 from autotest_lib.site_utils import pubsub_utils
     35 from chromite.lib import timeout_util
     36 
     37 # Test value to use for `days_old`, if nothing else is required.
     38 _TEST_EXPIRATION_AGE = 7
     39 
     40 
     41 def _get_options(argv):
     42     """Helper function to exercise command line parsing.
     43 
     44     @param argv Value of sys.argv to be parsed.
     45 
     46     """
     47     sys.argv = ['bogus.py'] + argv
     48     return gs_offloader.parse_options()
     49 
     50 
     51 def is_fifo(path):
     52   """Determines whether a path is a fifo.
     53 
     54   @param path: fifo path string.
     55   """
     56   return stat.S_ISFIFO(os.lstat(path).st_mode)
     57 
     58 
     59 class OffloaderOptionsTests(mox.MoxTestBase):
     60     """Tests for the `Offloader` constructor.
     61 
     62     Tests that offloader instance fields are set as expected
     63     for given command line options.
     64 
     65     """
     66 
     67     _REGULAR_ONLY = {job_directories.SwarmingJobDirectory,
     68                      job_directories.RegularJobDirectory}
     69     _SPECIAL_ONLY = {job_directories.SwarmingJobDirectory,
     70                      job_directories.SpecialJobDirectory}
     71     _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY
     72 
     73 
     74     def setUp(self):
     75         super(OffloaderOptionsTests, self).setUp()
     76         self.mox.StubOutWithMock(utils, 'get_offload_gsuri')
     77         gs_offloader.GS_OFFLOADING_ENABLED = True
     78         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
     79 
     80 
     81     def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
     82                                console_client=None, delete_age=0):
     83         """Mock the process of getting the offload_dir function."""
     84         if is_moblab:
     85             expected_gsuri = '%sresults/%s/%s/' % (
     86                     global_config.global_config.get_config_value(
     87                             'CROS', 'image_storage_server'),
     88                     'Fa:ke:ma:c0:12:34', 'rand0m-uu1d')
     89         else:
     90             expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
     91         utils.get_offload_gsuri().AndReturn(expected_gsuri)
     92         sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
     93             multiprocessing, delete_age, console_client)
     94         self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
     95         if cloud_console_client:
     96             self.mox.StubOutWithMock(cloud_console_client,
     97                     'is_cloud_notification_enabled')
     98         if console_client:
     99             cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
    100             gs_offloader.GSOffloader(
    101                     expected_gsuri, multiprocessing, delete_age,
    102                     mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
    103                         sub_offloader)
    104         else:
    105             if cloud_console_client:
    106                 cloud_console_client.is_cloud_notification_enabled().AndReturn(
    107                         False)
    108             gs_offloader.GSOffloader(
    109                 expected_gsuri, multiprocessing, delete_age, None).AndReturn(
    110                     sub_offloader)
    111         self.mox.ReplayAll()
    112         return sub_offloader
    113 
    114 
    115     def test_process_no_options(self):
    116         """Test default offloader options."""
    117         sub_offloader = self._mock_get_sub_offloader(False)
    118         offloader = gs_offloader.Offloader(_get_options([]))
    119         self.assertEqual(set(offloader._jobdir_classes),
    120                          self._REGULAR_ONLY)
    121         self.assertEqual(offloader._processes, 1)
    122         self.assertEqual(offloader._gs_offloader,
    123                          sub_offloader)
    124         self.assertEqual(offloader._upload_age_limit, 0)
    125         self.assertEqual(offloader._delete_age_limit, 0)
    126 
    127 
    128     def test_process_all_option(self):
    129         """Test offloader handling for the --all option."""
    130         sub_offloader = self._mock_get_sub_offloader(False)
    131         offloader = gs_offloader.Offloader(_get_options(['--all']))
    132         self.assertEqual(set(offloader._jobdir_classes), self._BOTH)
    133         self.assertEqual(offloader._processes, 1)
    134         self.assertEqual(offloader._gs_offloader,
    135                          sub_offloader)
    136         self.assertEqual(offloader._upload_age_limit, 0)
    137         self.assertEqual(offloader._delete_age_limit, 0)
    138 
    139 
    140     def test_process_hosts_option(self):
    141         """Test offloader handling for the --hosts option."""
    142         sub_offloader = self._mock_get_sub_offloader(False)
    143         offloader = gs_offloader.Offloader(
    144                 _get_options(['--hosts']))
    145         self.assertEqual(set(offloader._jobdir_classes),
    146                          self._SPECIAL_ONLY)
    147         self.assertEqual(offloader._processes, 1)
    148         self.assertEqual(offloader._gs_offloader,
    149                          sub_offloader)
    150         self.assertEqual(offloader._upload_age_limit, 0)
    151         self.assertEqual(offloader._delete_age_limit, 0)
    152 
    153 
    154     def test_parallelism_option(self):
    155         """Test offloader handling for the --parallelism option."""
    156         sub_offloader = self._mock_get_sub_offloader(False)
    157         offloader = gs_offloader.Offloader(
    158                 _get_options(['--parallelism', '2']))
    159         self.assertEqual(set(offloader._jobdir_classes),
    160                          self._REGULAR_ONLY)
    161         self.assertEqual(offloader._processes, 2)
    162         self.assertEqual(offloader._gs_offloader,
    163                          sub_offloader)
    164         self.assertEqual(offloader._upload_age_limit, 0)
    165         self.assertEqual(offloader._delete_age_limit, 0)
    166 
    167 
    168     def test_delete_only_option(self):
    169         """Test offloader handling for the --delete_only option."""
    170         offloader = gs_offloader.Offloader(
    171                 _get_options(['--delete_only']))
    172         self.assertEqual(set(offloader._jobdir_classes),
    173                          self._REGULAR_ONLY)
    174         self.assertEqual(offloader._processes, 1)
    175         self.assertIsInstance(offloader._gs_offloader,
    176                               gs_offloader.FakeGSOffloader)
    177         self.assertEqual(offloader._upload_age_limit, 0)
    178         self.assertEqual(offloader._delete_age_limit, 0)
    179 
    180 
    181     def test_days_old_option(self):
    182         """Test offloader handling for the --days_old option."""
    183         sub_offloader = self._mock_get_sub_offloader(False, delete_age=7)
    184         offloader = gs_offloader.Offloader(
    185                 _get_options(['--days_old', '7']))
    186         self.assertEqual(set(offloader._jobdir_classes),
    187                          self._REGULAR_ONLY)
    188         self.assertEqual(offloader._processes, 1)
    189         self.assertEqual(offloader._gs_offloader,
    190                          sub_offloader)
    191         self.assertEqual(offloader._upload_age_limit, 7)
    192         self.assertEqual(offloader._delete_age_limit, 7)
    193 
    194 
    195     def test_moblab_gsuri_generation(self):
    196         """Test offloader construction for Moblab."""
    197         sub_offloader = self._mock_get_sub_offloader(True)
    198         offloader = gs_offloader.Offloader(_get_options([]))
    199         self.assertEqual(set(offloader._jobdir_classes),
    200                          self._REGULAR_ONLY)
    201         self.assertEqual(offloader._processes, 1)
    202         self.assertEqual(offloader._gs_offloader,
    203                          sub_offloader)
    204         self.assertEqual(offloader._upload_age_limit, 0)
    205         self.assertEqual(offloader._delete_age_limit, 0)
    206 
    207 
    208     def test_globalconfig_offloading_flag(self):
    209         """Test enabling of --delete_only via global_config."""
    210         gs_offloader.GS_OFFLOADING_ENABLED = False
    211         offloader = gs_offloader.Offloader(
    212                 _get_options([]))
    213         self.assertIsInstance(offloader._gs_offloader,
    214                              gs_offloader.FakeGSOffloader)
    215 
    216     def test_offloader_multiprocessing_flag_set(self):
    217         """Test multiprocessing is set."""
    218         sub_offloader = self._mock_get_sub_offloader(True, True)
    219         offloader = gs_offloader.Offloader(_get_options(['-m']))
    220         self.assertEqual(offloader._gs_offloader,
    221                          sub_offloader)
    222         self.mox.VerifyAll()
    223 
    224     def test_offloader_multiprocessing_flag_not_set_default_false(self):
    225         """Test multiprocessing is set."""
    226         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
    227         sub_offloader = self._mock_get_sub_offloader(True, False)
    228         offloader = gs_offloader.Offloader(_get_options([]))
    229         self.assertEqual(offloader._gs_offloader,
    230                          sub_offloader)
    231         self.mox.VerifyAll()
    232 
    233     def test_offloader_multiprocessing_flag_not_set_default_true(self):
    234         """Test multiprocessing is set."""
    235         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
    236         sub_offloader = self._mock_get_sub_offloader(True, True)
    237         offloader = gs_offloader.Offloader(_get_options([]))
    238         self.assertEqual(offloader._gs_offloader,
    239                          sub_offloader)
    240         self.mox.VerifyAll()
    241 
    242 
    243     def test_offloader_pubsub_enabled(self):
    244         """Test multiprocessing is set."""
    245         if not cloud_console_client:
    246             return
    247         self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
    248         sub_offloader = self._mock_get_sub_offloader(True, False,
    249                 cloud_console_client.PubSubBasedClient())
    250         offloader = gs_offloader.Offloader(_get_options([]))
    251         self.assertEqual(offloader._gs_offloader,
    252                          sub_offloader)
    253         self.mox.VerifyAll()
    254 
    255 
    256 class _MockJobDirectory(job_directories._JobDirectory):
    257     """Subclass of `_JobDirectory` used as a helper for tests."""
    258 
    259     GLOB_PATTERN = '[0-9]*-*'
    260 
    261 
    262     def __init__(self, resultsdir):
    263         """Create new job in initial state."""
    264         super(_MockJobDirectory, self).__init__(resultsdir)
    265         self._timestamp = None
    266         self.queue_args = [resultsdir, os.path.dirname(resultsdir), self._timestamp]
    267 
    268 
    269     def get_timestamp_if_finished(self):
    270         return self._timestamp
    271 
    272 
    273     def set_finished(self, days_old):
    274         """Make this job appear to be finished.
    275 
    276         After calling this function, calls to `enqueue_offload()`
    277         will find this job as finished, but not expired and ready
    278         for offload.  Note that when `days_old` is 0,
    279         `enqueue_offload()` will treat a finished job as eligible
    280         for offload.
    281 
    282         @param days_old The value of the `days_old` parameter that
    283                         will be passed to `enqueue_offload()` for
    284                         testing.
    285 
    286         """
    287         self._timestamp = jd_test.make_timestamp(days_old, False)
    288         self.queue_args[2] = self._timestamp
    289 
    290 
    291     def set_expired(self, days_old):
    292         """Make this job eligible to be offloaded.
    293 
    294         After calling this function, calls to `offload` will attempt
    295         to offload this job.
    296 
    297         @param days_old The value of the `days_old` parameter that
    298                         will be passed to `enqueue_offload()` for
    299                         testing.
    300 
    301         """
    302         self._timestamp = jd_test.make_timestamp(days_old, True)
    303         self.queue_args[2] = self._timestamp
    304 
    305 
    306     def set_incomplete(self):
    307         """Make this job appear to have failed offload just once."""
    308         self.offload_count += 1
    309         self.first_offload_start = time.time()
    310         if not os.path.isdir(self.dirname):
    311             os.mkdir(self.dirname)
    312 
    313 
    314     def set_reportable(self):
    315         """Make this job be reportable."""
    316         self.set_incomplete()
    317         self.offload_count += 1
    318 
    319 
    320     def set_complete(self):
    321         """Make this job be completed."""
    322         self.offload_count += 1
    323         if os.path.isdir(self.dirname):
    324             os.rmdir(self.dirname)
    325 
    326 
    327     def process_gs_instructions(self):
    328         """Always still offload the job directory."""
    329         return True
    330 
    331 
    332 class CommandListTests(unittest.TestCase):
    333     """Tests for `_get_cmd_list()`."""
    334 
    335     def _command_list_assertions(self, job, use_rsync=True, multi=False):
    336         """Call `_get_cmd_list()` and check the return value.
    337 
    338         Check the following assertions:
    339           * The command name (argv[0]) is 'gsutil'.
    340           * '-m' option (argv[1]) is on when the argument, multi, is True.
    341           * The arguments contain the 'cp' subcommand.
    342           * The next-to-last argument (the source directory) is the
    343             job's `queue_args[0]`.
    344           * The last argument (the destination URL) is the job's
    345             'queue_args[1]'.
    346 
    347         @param job A job with properly calculated arguments to
    348                    `_get_cmd_list()`
    349         @param use_rsync True when using 'rsync'. False when using 'cp'.
    350         @param multi True when using '-m' option for gsutil.
    351 
    352         """
    353         test_bucket_uri = 'gs://a-test-bucket'
    354 
    355         gs_offloader.USE_RSYNC_ENABLED = use_rsync
    356 
    357         command = gs_offloader._get_cmd_list(
    358                 multi, job.queue_args[0],
    359                 os.path.join(test_bucket_uri, job.queue_args[1]))
    360 
    361         self.assertEqual(command[0], 'gsutil')
    362         if multi:
    363             self.assertEqual(command[1], '-m')
    364         self.assertEqual(command[-2], job.queue_args[0])
    365 
    366         if use_rsync:
    367             self.assertTrue('rsync' in command)
    368             self.assertEqual(command[-1],
    369                              os.path.join(test_bucket_uri, job.queue_args[0]))
    370         else:
    371             self.assertTrue('cp' in command)
    372             self.assertEqual(command[-1],
    373                              os.path.join(test_bucket_uri, job.queue_args[1]))
    374 
    375 
    376     def test__get_cmd_list_regular(self):
    377         """Test `_get_cmd_list()` as for a regular job."""
    378         job = _MockJobDirectory('118-debug')
    379         self._command_list_assertions(job)
    380 
    381 
    382     def test__get_cmd_list_special(self):
    383         """Test `_get_cmd_list()` as for a special job."""
    384         job = _MockJobDirectory('hosts/host1/118-reset')
    385         self._command_list_assertions(job)
    386 
    387 
    388     def test_get_cmd_list_regular_no_rsync(self):
    389         """Test `_get_cmd_list()` as for a regular job."""
    390         job = _MockJobDirectory('118-debug')
    391         self._command_list_assertions(job, use_rsync=False)
    392 
    393 
    394     def test_get_cmd_list_special_no_rsync(self):
    395         """Test `_get_cmd_list()` as for a special job."""
    396         job = _MockJobDirectory('hosts/host1/118-reset')
    397         self._command_list_assertions(job, use_rsync=False)
    398 
    399 
    400     def test_get_cmd_list_regular_multi(self):
    401         """Test `_get_cmd_list()` as for a regular job with True multi."""
    402         job = _MockJobDirectory('118-debug')
    403         self._command_list_assertions(job, multi=True)
    404 
    405 
    406     def test__get_cmd_list_special_multi(self):
    407         """Test `_get_cmd_list()` as for a special job with True multi."""
    408         job = _MockJobDirectory('hosts/host1/118-reset')
    409         self._command_list_assertions(job, multi=True)
    410 
    411 
    412 class _TempResultsDirTestCase(unittest.TestCase):
    413     """Mixin class for tests using a temporary results directory."""
    414 
    415     REGULAR_JOBLIST = [
    416         '111-fubar', '112-fubar', '113-fubar', '114-snafu']
    417     HOST_LIST = ['host1', 'host2', 'host3']
    418     SPECIAL_JOBLIST = [
    419         'hosts/host1/333-reset', 'hosts/host1/334-reset',
    420         'hosts/host2/444-reset', 'hosts/host3/555-reset']
    421 
    422 
    423     def setUp(self):
    424         super(_TempResultsDirTestCase, self).setUp()
    425         self._resultsroot = tempfile.mkdtemp()
    426         self._cwd = os.getcwd()
    427         os.chdir(self._resultsroot)
    428 
    429 
    430     def tearDown(self):
    431         os.chdir(self._cwd)
    432         shutil.rmtree(self._resultsroot)
    433         super(_TempResultsDirTestCase, self).tearDown()
    434 
    435 
    436     def make_job(self, jobdir):
    437         """Create a job with results in `self._resultsroot`.
    438 
    439         @param jobdir Name of the subdirectory to be created in
    440                       `self._resultsroot`.
    441 
    442         """
    443         os.mkdir(jobdir)
    444         return _MockJobDirectory(jobdir)
    445 
    446 
    447     def make_job_hierarchy(self):
    448         """Create a sample hierarchy of job directories.
    449 
    450         `self.REGULAR_JOBLIST` is a list of directories for regular
    451         jobs to be created; `self.SPECIAL_JOBLIST` is a list of
    452         directories for special jobs to be created.
    453 
    454         """
    455         for d in self.REGULAR_JOBLIST:
    456             os.mkdir(d)
    457         hostsdir = 'hosts'
    458         os.mkdir(hostsdir)
    459         for host in self.HOST_LIST:
    460             os.mkdir(os.path.join(hostsdir, host))
    461         for d in self.SPECIAL_JOBLIST:
    462             os.mkdir(d)
    463 
    464 
    465 class _TempResultsDirTestBase(_TempResultsDirTestCase, mox.MoxTestBase):
    466     """Base Mox test class for tests using a temporary results directory."""
    467 
    468 
    469 class FailedOffloadsLogTest(_TempResultsDirTestBase):
    470     """Test the formatting of failed offloads log file."""
    471     # Below is partial sample of a failed offload log file.  This text is
    472     # deliberately hard-coded and then parsed to create the test data; the idea
    473     # is to make sure the actual text format will be reviewed by a human being.
    474     #
    475     # first offload      count  directory
    476     # --+----1----+----  ----+ ----+----1----+----2----+----3
    477     _SAMPLE_DIRECTORIES_REPORT = '''\
    478     =================== ======  ==============================
    479     2014-03-14 15:09:26      1  118-fubar
    480     2014-03-14 15:19:23      2  117-fubar
    481     2014-03-14 15:29:20      6  116-fubar
    482     2014-03-14 15:39:17     24  115-fubar
    483     2014-03-14 15:49:14    120  114-fubar
    484     2014-03-14 15:59:11    720  113-fubar
    485     2014-03-14 16:09:08   5040  112-fubar
    486     2014-03-14 16:19:05  40320  111-fubar
    487     '''
    488 
    489     def setUp(self):
    490         super(FailedOffloadsLogTest, self).setUp()
    491         self._offloader = gs_offloader.Offloader(_get_options([]))
    492         self._joblist = []
    493         for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]:
    494             date_, time_, count, dir_ = line.split()
    495             job = _MockJobDirectory(dir_)
    496             job.offload_count = int(count)
    497             timestruct = time.strptime("%s %s" % (date_, time_),
    498                                        gs_offloader.FAILED_OFFLOADS_TIME_FORMAT)
    499             job.first_offload_start = time.mktime(timestruct)
    500             # enter the jobs in reverse order, to make sure we
    501             # test that the output will be sorted.
    502             self._joblist.insert(0, job)
    503 
    504 
    505     def assert_report_well_formatted(self, report_file):
    506         """Assert that report file is well formatted.
    507 
    508         @param report_file: Path to report file
    509         """
    510         with open(report_file, 'r') as f:
    511             report_lines = f.read().split()
    512 
    513         for end_of_header_index in range(len(report_lines)):
    514             if report_lines[end_of_header_index].startswith('=='):
    515                 break
    516         self.assertLess(end_of_header_index, len(report_lines),
    517                         'Failed to find end-of-header marker in the report')
    518 
    519         relevant_lines = report_lines[end_of_header_index:]
    520         expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split()
    521         self.assertListEqual(relevant_lines, expected_lines)
    522 
    523 
    524     def test_failed_offload_log_format(self):
    525         """Trigger an e-mail report and check its contents."""
    526         log_file = os.path.join(self._resultsroot, 'failed_log')
    527         report = self._offloader._log_failed_jobs_locally(self._joblist,
    528                                                           log_file=log_file)
    529         self.assert_report_well_formatted(log_file)
    530 
    531 
    532     def test_failed_offload_file_overwrite(self):
    533         """Verify that we can saefly overwrite the log file."""
    534         log_file = os.path.join(self._resultsroot, 'failed_log')
    535         with open(log_file, 'w') as f:
    536             f.write('boohoohoo')
    537         report = self._offloader._log_failed_jobs_locally(self._joblist,
    538                                                           log_file=log_file)
    539         self.assert_report_well_formatted(log_file)
    540 
    541 
    542 class OffloadDirectoryTests(_TempResultsDirTestBase):
    543     """Tests for `offload_dir()`."""
    544 
    545     def setUp(self):
    546         super(OffloadDirectoryTests, self).setUp()
    547         # offload_dir() logs messages; silence them.
    548         self._saved_loglevel = logging.getLogger().getEffectiveLevel()
    549         logging.getLogger().setLevel(logging.CRITICAL+1)
    550         self._job = self.make_job(self.REGULAR_JOBLIST[0])
    551         self.mox.StubOutWithMock(gs_offloader, '_get_cmd_list')
    552         alarm = mock.patch('signal.alarm', return_value=0)
    553         alarm.start()
    554         self.addCleanup(alarm.stop)
    555         self.mox.StubOutWithMock(models.test, 'parse_job_keyval')
    556 
    557 
    558     def tearDown(self):
    559         logging.getLogger().setLevel(self._saved_loglevel)
    560         super(OffloadDirectoryTests, self).tearDown()
    561 
    562     def _mock__upload_cts_testresult(self):
    563         self.mox.StubOutWithMock(gs_offloader, '_upload_cts_testresult')
    564         gs_offloader._upload_cts_testresult(
    565                 mox.IgnoreArg(),mox.IgnoreArg()).AndReturn(None)
    566 
    567     def _mock_create_marker_file(self):
    568         self.mox.StubOutWithMock(__builtin__, 'open')
    569         open(mox.IgnoreArg(), 'a').AndReturn(mock.MagicMock())
    570 
    571 
    572     def _mock_offload_dir_calls(self, command, queue_args,
    573                                 marker_initially_exists=False):
    574         """Mock out the calls needed by `offload_dir()`.
    575 
    576         This covers only the calls made when there is no timeout.
    577 
    578         @param command Command list to be returned by the mocked
    579                        call to `_get_cmd_list()`.
    580 
    581         """
    582         self.mox.StubOutWithMock(os.path, 'isfile')
    583         os.path.isfile(mox.IgnoreArg()).AndReturn(marker_initially_exists)
    584         command.append(queue_args[0])
    585         gs_offloader._get_cmd_list(
    586                 False, queue_args[0],
    587                 '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
    588                           queue_args[1])).AndReturn(command)
    589         self._mock__upload_cts_testresult()
    590 
    591 
    592     def _run_offload_dir(self, should_succeed, delete_age):
    593         """Make one call to `offload_dir()`.
    594 
    595         The caller ensures all mocks are set up already.
    596 
    597         @param should_succeed True iff the call to `offload_dir()`
    598                               is expected to succeed and remove the
    599                               offloaded job directory.
    600 
    601         """
    602         self.mox.ReplayAll()
    603         gs_offloader.GSOffloader(
    604                 utils.DEFAULT_OFFLOAD_GSURI, False, delete_age).offload(
    605                         self._job.queue_args[0],
    606                         self._job.queue_args[1],
    607                         self._job.queue_args[2])
    608         self.mox.VerifyAll()
    609         self.assertEqual(not should_succeed,
    610                          os.path.isdir(self._job.queue_args[0]))
    611 
    612 
    613     def test_offload_success(self):
    614         """Test that `offload_dir()` can succeed correctly."""
    615         self._mock_offload_dir_calls(['test', '-d'],
    616                                      self._job.queue_args)
    617         os.path.isfile(mox.IgnoreArg()).AndReturn(True)
    618         self._mock_create_marker_file()
    619         self._run_offload_dir(True, 0)
    620 
    621 
    622     def test_offload_failure(self):
    623         """Test that `offload_dir()` can fail correctly."""
    624         self._mock_offload_dir_calls(['test', '!', '-d'],
    625                                      self._job.queue_args)
    626         self._run_offload_dir(False, 0)
    627 
    628 
    629     def test_sanitize_dir(self):
    630         """Test that folder/file name with invalid character can be corrected.
    631         """
    632         results_folder = tempfile.mkdtemp()
    633         invalid_chars = '_'.join(['[', ']', '*', '?', '#'])
    634         invalid_files = []
    635         invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars
    636         invalid_folder = os.path.join(
    637                 results_folder,
    638                 invalid_folder_name)
    639         invalid_files.append(os.path.join(
    640                 invalid_folder,
    641                 'invalid_name_file_%s' % invalid_chars))
    642         good_folder =  os.path.join(results_folder, 'valid_name_folder')
    643         good_file = os.path.join(good_folder, 'valid_name_file')
    644         for folder in [invalid_folder, good_folder]:
    645             os.makedirs(folder)
    646         for f in invalid_files + [good_file]:
    647             with open(f, 'w'):
    648                 pass
    649         # check that broken symlinks don't break sanitization
    650         symlink = os.path.join(invalid_folder, 'broken-link')
    651         os.symlink(os.path.join(results_folder, 'no-such-file'),
    652                    symlink)
    653         fifo1 = os.path.join(results_folder, 'test_fifo1')
    654         fifo2 = os.path.join(good_folder, 'test_fifo2')
    655         fifo3 = os.path.join(invalid_folder, 'test_fifo3')
    656         invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars
    657         fifo4 = os.path.join(invalid_folder, invalid_fifo4_name)
    658         os.mkfifo(fifo1)
    659         os.mkfifo(fifo2)
    660         os.mkfifo(fifo3)
    661         os.mkfifo(fifo4)
    662         gs_offloader.sanitize_dir(results_folder)
    663         for _, dirs, files in os.walk(results_folder):
    664             for name in dirs + files:
    665                 self.assertEqual(name, gslib.escape(name))
    666                 for c in name:
    667                     self.assertFalse(c in ['[', ']', '*', '?', '#'])
    668         self.assertTrue(os.path.exists(good_file))
    669 
    670         self.assertTrue(os.path.exists(fifo1))
    671         self.assertFalse(is_fifo(fifo1))
    672         self.assertTrue(os.path.exists(fifo2))
    673         self.assertFalse(is_fifo(fifo2))
    674         corrected_folder = os.path.join(
    675                 results_folder, gslib.escape(invalid_folder_name))
    676         corrected_fifo3 = os.path.join(
    677                 corrected_folder,
    678                 'test_fifo3')
    679         self.assertFalse(os.path.exists(fifo3))
    680         self.assertTrue(os.path.exists(corrected_fifo3))
    681         self.assertFalse(is_fifo(corrected_fifo3))
    682         corrected_fifo4 = os.path.join(
    683                 corrected_folder, gslib.escape(invalid_fifo4_name))
    684         self.assertFalse(os.path.exists(fifo4))
    685         self.assertTrue(os.path.exists(corrected_fifo4))
    686         self.assertFalse(is_fifo(corrected_fifo4))
    687 
    688         corrected_symlink = os.path.join(
    689                 corrected_folder,
    690                 'broken-link')
    691         self.assertFalse(os.path.lexists(symlink))
    692         self.assertTrue(os.path.exists(corrected_symlink))
    693         self.assertFalse(os.path.islink(corrected_symlink))
    694         shutil.rmtree(results_folder)
    695 
    696 
    697     def check_limit_file_count(self, is_test_job=True):
    698         """Test that folder with too many files can be compressed.
    699 
    700         @param is_test_job: True to check the method with test job result
    701                             folder. Set to False for special task folder.
    702         """
    703         results_folder = tempfile.mkdtemp()
    704         host_folder = os.path.join(
    705                 results_folder,
    706                 'lab1-host1' if is_test_job else 'hosts/lab1-host1/1-repair')
    707         debug_folder = os.path.join(host_folder, 'debug')
    708         sysinfo_folder = os.path.join(host_folder, 'sysinfo')
    709         for folder in [debug_folder, sysinfo_folder]:
    710             os.makedirs(folder)
    711             for i in range(10):
    712                 with open(os.path.join(folder, str(i)), 'w') as f:
    713                     f.write('test')
    714 
    715         gs_offloader._MAX_FILE_COUNT = 100
    716         gs_offloader.limit_file_count(
    717                 results_folder if is_test_job else host_folder)
    718         self.assertTrue(os.path.exists(sysinfo_folder))
    719 
    720         gs_offloader._MAX_FILE_COUNT = 10
    721         gs_offloader.limit_file_count(
    722                 results_folder if is_test_job else host_folder)
    723         self.assertFalse(os.path.exists(sysinfo_folder))
    724         self.assertTrue(os.path.exists(sysinfo_folder + '.tgz'))
    725         self.assertTrue(os.path.exists(debug_folder))
    726 
    727         shutil.rmtree(results_folder)
    728 
    729 
    730     def test_limit_file_count(self):
    731         """Test that folder with too many files can be compressed.
    732         """
    733         self.check_limit_file_count(is_test_job=True)
    734         self.check_limit_file_count(is_test_job=False)
    735 
    736 
    737     def test_is_valid_result(self):
    738         """Test _is_valid_result."""
    739         release_build = 'veyron_minnie-cheets-release/R52-8248.0.0'
    740         pfq_build = 'cyan-cheets-android-pfq/R54-8623.0.0-rc1'
    741         trybot_build = 'trybot-samus-release/R54-8640.0.0-b5092'
    742         trybot_2_build = 'trybot-samus-pfq/R54-8640.0.0-b5092'
    743         release_2_build = 'test-trybot-release/R54-8640.0.0-b5092'
    744         self.assertTrue(gs_offloader._is_valid_result(
    745             release_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
    746         self.assertTrue(gs_offloader._is_valid_result(
    747             release_build, gs_offloader.CTS_RESULT_PATTERN, 'test_that_wrapper'))
    748         self.assertTrue(gs_offloader._is_valid_result(
    749             release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-arc'))
    750         self.assertFalse(gs_offloader._is_valid_result(
    751             release_build, gs_offloader.CTS_RESULT_PATTERN, 'bvt-cq'))
    752         self.assertTrue(gs_offloader._is_valid_result(
    753             release_build, gs_offloader.CTS_V2_RESULT_PATTERN, 'arc-gts'))
    754         self.assertFalse(gs_offloader._is_valid_result(
    755             None, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
    756         self.assertFalse(gs_offloader._is_valid_result(
    757             release_build, gs_offloader.CTS_RESULT_PATTERN, None))
    758         self.assertFalse(gs_offloader._is_valid_result(
    759             pfq_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
    760         self.assertFalse(gs_offloader._is_valid_result(
    761             trybot_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
    762         self.assertFalse(gs_offloader._is_valid_result(
    763             trybot_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
    764         self.assertTrue(gs_offloader._is_valid_result(
    765             release_2_build, gs_offloader.CTS_RESULT_PATTERN, 'arc-cts'))
    766 
    767 
    768     def create_results_folder(self):
    769         """Create CTS/GTS results folders."""
    770         results_folder = tempfile.mkdtemp()
    771         host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22')
    772         debug_folder = os.path.join(host_folder, 'debug')
    773         sysinfo_folder = os.path.join(host_folder, 'sysinfo')
    774         cts_result_folder = os.path.join(
    775                 host_folder, 'cheets_CTS.android.dpi', 'results', 'cts-results')
    776         cts_v2_result_folder = os.path.join(host_folder,
    777                 'cheets_CTS_N.CtsGraphicsTestCases', 'results', 'android-cts')
    778         gts_result_folder = os.path.join(
    779                 host_folder, 'cheets_GTS.google.admin', 'results', 'android-gts')
    780         timestamp_str = '2016.04.28_01.41.44'
    781         timestamp_cts_folder = os.path.join(cts_result_folder, timestamp_str)
    782         timestamp_cts_v2_folder = os.path.join(cts_v2_result_folder, timestamp_str)
    783         timestamp_gts_folder = os.path.join(gts_result_folder, timestamp_str)
    784 
    785         # Test results in cts_result_folder with a different time-stamp.
    786         timestamp_str_2 = '2016.04.28_10.41.44'
    787         timestamp_cts_folder_2 = os.path.join(cts_result_folder, timestamp_str_2)
    788 
    789         for folder in [debug_folder, sysinfo_folder, cts_result_folder,
    790                        timestamp_cts_folder, timestamp_cts_folder_2,
    791                        timestamp_cts_v2_folder, timestamp_gts_folder]:
    792             os.makedirs(folder)
    793 
    794         path_pattern_pair = [(timestamp_cts_folder, gs_offloader.CTS_RESULT_PATTERN),
    795                              (timestamp_cts_folder_2, gs_offloader.CTS_RESULT_PATTERN),
    796                              (timestamp_cts_v2_folder, gs_offloader.CTS_V2_RESULT_PATTERN),
    797                              (timestamp_gts_folder, gs_offloader.CTS_V2_RESULT_PATTERN)]
    798 
    799         # Create timestamp.zip file_path.
    800         cts_zip_file = os.path.join(cts_result_folder, timestamp_str + '.zip')
    801         cts_zip_file_2 = os.path.join(cts_result_folder, timestamp_str_2 + '.zip')
    802         cts_v2_zip_file = os.path.join(cts_v2_result_folder, timestamp_str + '.zip')
    803         gts_zip_file = os.path.join(gts_result_folder, timestamp_str + '.zip')
    804 
    805         # Create xml file_path.
    806         cts_result_file = os.path.join(timestamp_cts_folder, 'testResult.xml')
    807         cts_result_file_2 = os.path.join(timestamp_cts_folder_2,
    808                                          'testResult.xml')
    809         gts_result_file = os.path.join(timestamp_gts_folder, 'test_result.xml')
    810         cts_v2_result_file = os.path.join(timestamp_cts_v2_folder,
    811                                          'test_result.xml')
    812 
    813         for file_path in [cts_zip_file, cts_zip_file_2, cts_v2_zip_file,
    814                           gts_zip_file, cts_result_file, cts_result_file_2,
    815                           gts_result_file, cts_v2_result_file]:
    816             with open(file_path, 'w') as f:
    817                 f.write('test')
    818 
    819         return (results_folder, host_folder, path_pattern_pair)
    820 
    821 
    822     def test__upload_cts_testresult(self):
    823         """Test _upload_cts_testresult."""
    824         results_folder, host_folder, path_pattern_pair = self.create_results_folder()
    825 
    826         self.mox.StubOutWithMock(gs_offloader, '_upload_files')
    827         gs_offloader._upload_files(
    828             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
    829                 mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
    830                 ['test', '-d', host_folder])
    831         gs_offloader._upload_files(
    832             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
    833                 mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
    834                 ['test', '-d', host_folder])
    835         gs_offloader._upload_files(
    836             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False,
    837                 mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
    838                 ['test', '-d', host_folder])
    839 
    840         self.mox.ReplayAll()
    841         gs_offloader._upload_cts_testresult(results_folder, False)
    842         self.mox.VerifyAll()
    843         shutil.rmtree(results_folder)
    844 
    845 
    846     def test_upload_files(self):
    847         """Test upload_files"""
    848         results_folder, host_folder, path_pattern_pair = self.create_results_folder()
    849 
    850         for path, pattern in path_pattern_pair:
    851             models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
    852                 'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
    853                 'parent_job_id': 'p_id',
    854                 'suite': 'arc-cts'
    855             })
    856 
    857             gs_offloader._get_cmd_list(
    858                 False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
    859                     ['test', '-d', path])
    860             gs_offloader._get_cmd_list(
    861                 False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
    862                     ['test', '-d', path])
    863 
    864             self.mox.ReplayAll()
    865             gs_offloader._upload_files(host_folder, path, pattern, False,
    866                                        'gs://a-test-bucket/',
    867                                        'gs://a-test-apfe-bucket/')
    868             self.mox.VerifyAll()
    869             self.mox.ResetAll()
    870 
    871         shutil.rmtree(results_folder)
    872 
    873 
    874     def test_get_metrics_fields(self):
    875         """Test method _get_metrics_fields."""
    876         results_folder, host_folder, _ = self.create_results_folder()
    877         models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
    878                 'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
    879                 'parent_job_id': 'p_id',
    880                 'suite': 'arc-cts'
    881             })
    882         try:
    883             self.mox.ReplayAll()
    884             self.assertEqual({'board': 'veyron_minnie-cheets',
    885                               'milestone': 'R52'},
    886                              gs_offloader._get_metrics_fields(host_folder))
    887             self.mox.VerifyAll()
    888         finally:
    889             shutil.rmtree(results_folder)
    890 
    891 
    892 class JobDirectoryOffloadTests(_TempResultsDirTestBase):
    893     """Tests for `_JobDirectory.enqueue_offload()`.
    894 
    895     When testing with a `days_old` parameter of 0, we use
    896     `set_finished()` instead of `set_expired()`.  This causes the
    897     job's timestamp to be set in the future.  This is done so as
    898     to test that when `days_old` is 0, the job is always treated
    899     as eligible for offload, regardless of the timestamp's value.
    900 
    901     Testing covers the following assertions:
    902      A. Each time `enqueue_offload()` is called, a message that
    903         includes the job's directory name will be logged using
    904         `logging.debug()`, regardless of whether the job was
    905         enqueued.  Nothing else is allowed to be logged.
    906      B. If the job is not eligible to be offloaded,
    907         `first_offload_start` and `offload_count` are 0.
    908      C. If the job is not eligible for offload, nothing is
    909         enqueued in `queue`.
    910      D. When the job is offloaded, `offload_count` increments
    911         each time.
    912      E. When the job is offloaded, the appropriate parameters are
    913         enqueued exactly once.
    914      F. The first time a job is offloaded, `first_offload_start` is
    915         set to the current time.
    916      G. `first_offload_start` only changes the first time that the
    917         job is offloaded.
    918 
    919     The test cases below are designed to exercise all of the
    920     meaningful state transitions at least once.
    921 
    922     """
    923 
    924     def setUp(self):
    925         super(JobDirectoryOffloadTests, self).setUp()
    926         self._job = self.make_job(self.REGULAR_JOBLIST[0])
    927         self._queue = Queue.Queue()
    928 
    929 
    930     def _offload_unexpired_job(self, days_old):
    931         """Make calls to `enqueue_offload()` for an unexpired job.
    932 
    933         This method tests assertions B and C that calling
    934         `enqueue_offload()` has no effect.
    935 
    936         """
    937         self.assertEqual(self._job.offload_count, 0)
    938         self.assertEqual(self._job.first_offload_start, 0)
    939         gs_offloader._enqueue_offload(self._job, self._queue, days_old)
    940         gs_offloader._enqueue_offload(self._job, self._queue, days_old)
    941         self.assertTrue(self._queue.empty())
    942         self.assertEqual(self._job.offload_count, 0)
    943         self.assertEqual(self._job.first_offload_start, 0)
    944 
    945 
    946     def _offload_expired_once(self, days_old, count):
    947         """Make one call to `enqueue_offload()` for an expired job.
    948 
    949         This method tests assertions D and E regarding side-effects
    950         expected when a job is offloaded.
    951 
    952         """
    953         gs_offloader._enqueue_offload(self._job, self._queue, days_old)
    954         self.assertEqual(self._job.offload_count, count)
    955         self.assertFalse(self._queue.empty())
    956         v = self._queue.get_nowait()
    957         self.assertTrue(self._queue.empty())
    958         self.assertEqual(v, self._job.queue_args)
    959 
    960 
    961     def _offload_expired_job(self, days_old):
    962         """Make calls to `enqueue_offload()` for a just-expired job.
    963 
    964         This method directly tests assertions F and G regarding
    965         side-effects on `first_offload_start`.
    966 
    967         """
    968         t0 = time.time()
    969         self._offload_expired_once(days_old, 1)
    970         t1 = self._job.first_offload_start
    971         self.assertLessEqual(t1, time.time())
    972         self.assertGreaterEqual(t1, t0)
    973         self._offload_expired_once(days_old, 2)
    974         self.assertEqual(self._job.first_offload_start, t1)
    975         self._offload_expired_once(days_old, 3)
    976         self.assertEqual(self._job.first_offload_start, t1)
    977 
    978 
    979     def test_case_1_no_expiration(self):
    980         """Test a series of `enqueue_offload()` calls with `days_old` of 0.
    981 
    982         This tests that offload works as expected if calls are
    983         made both before and after the job becomes expired.
    984 
    985         """
    986         self._offload_unexpired_job(0)
    987         self._job.set_finished(0)
    988         self._offload_expired_job(0)
    989 
    990 
    991     def test_case_2_no_expiration(self):
    992         """Test a series of `enqueue_offload()` calls with `days_old` of 0.
    993 
    994         This tests that offload works as expected if calls are made
    995         only after the job becomes expired.
    996 
    997         """
    998         self._job.set_finished(0)
    999         self._offload_expired_job(0)
   1000 
   1001 
   1002     def test_case_1_with_expiration(self):
   1003         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1004 
   1005         This tests that offload works as expected if calls are made
   1006         before the job finishes, before the job expires, and after
   1007         the job expires.
   1008 
   1009         """
   1010         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1011         self._job.set_finished(_TEST_EXPIRATION_AGE)
   1012         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1013         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1014         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1015 
   1016 
   1017     def test_case_2_with_expiration(self):
   1018         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1019 
   1020         This tests that offload works as expected if calls are made
   1021         between finishing and expiration, and after the job expires.
   1022 
   1023         """
   1024         self._job.set_finished(_TEST_EXPIRATION_AGE)
   1025         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1026         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1027         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1028 
   1029 
   1030     def test_case_3_with_expiration(self):
   1031         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1032 
   1033         This tests that offload works as expected if calls are made
   1034         only before finishing and after expiration.
   1035 
   1036         """
   1037         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1038         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1039         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1040 
   1041 
   1042     def test_case_4_with_expiration(self):
   1043         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1044 
   1045         This tests that offload works as expected if calls are made
   1046         only after expiration.
   1047 
   1048         """
   1049         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1050         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1051 
   1052 
   1053 class GetJobDirectoriesTests(_TempResultsDirTestBase):
   1054     """Tests for `_JobDirectory.get_job_directories()`."""
   1055 
   1056     def setUp(self):
   1057         super(GetJobDirectoriesTests, self).setUp()
   1058         self.make_job_hierarchy()
   1059         os.mkdir('not-a-job')
   1060         open('not-a-dir', 'w').close()
   1061 
   1062 
   1063     def _run_get_directories(self, cls, expected_list):
   1064         """Test `get_job_directories()` for the given class.
   1065 
   1066         Calls the method, and asserts that the returned list of
   1067         directories matches the expected return value.
   1068 
   1069         @param expected_list Expected return value from the call.
   1070         """
   1071         dirlist = cls.get_job_directories()
   1072         self.assertEqual(set(dirlist), set(expected_list))
   1073 
   1074 
   1075     def test_get_regular_jobs(self):
   1076         """Test `RegularJobDirectory.get_job_directories()`."""
   1077         self._run_get_directories(job_directories.RegularJobDirectory,
   1078                                   self.REGULAR_JOBLIST)
   1079 
   1080 
   1081     def test_get_special_jobs(self):
   1082         """Test `SpecialJobDirectory.get_job_directories()`."""
   1083         self._run_get_directories(job_directories.SpecialJobDirectory,
   1084                                   self.SPECIAL_JOBLIST)
   1085 
   1086 
   1087 class AddJobsTests(_TempResultsDirTestBase):
   1088     """Tests for `Offloader._add_new_jobs()`."""
   1089 
   1090     MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']
   1091 
   1092     def setUp(self):
   1093         super(AddJobsTests, self).setUp()
   1094         self._initial_job_names = (
   1095             set(self.REGULAR_JOBLIST) | set(self.SPECIAL_JOBLIST))
   1096         self.make_job_hierarchy()
   1097         self._offloader = gs_offloader.Offloader(_get_options(['-a']))
   1098         self.mox.StubOutWithMock(logging, 'debug')
   1099 
   1100 
   1101     def _run_add_new_jobs(self, expected_key_set):
   1102         """Basic test assertions for `_add_new_jobs()`.
   1103 
   1104         Asserts the following:
   1105           * The keys in the offloader's `_open_jobs` dictionary
   1106             matches the expected set of keys.
   1107           * For every job in `_open_jobs`, the job has the expected
   1108             directory name.
   1109 
   1110         """
   1111         count = len(expected_key_set) - len(self._offloader._open_jobs)
   1112         logging.debug(mox.IgnoreArg(), count)
   1113         self.mox.ReplayAll()
   1114         self._offloader._add_new_jobs()
   1115         self.assertEqual(expected_key_set,
   1116                          set(self._offloader._open_jobs.keys()))
   1117         for jobkey, job in self._offloader._open_jobs.items():
   1118             self.assertEqual(jobkey, job.dirname)
   1119         self.mox.VerifyAll()
   1120         self.mox.ResetAll()
   1121 
   1122 
   1123     def test_add_jobs_empty(self):
   1124         """Test adding jobs to an empty dictionary.
   1125 
   1126         Calls the offloader's `_add_new_jobs()`, then perform
   1127         the assertions of `self._check_open_jobs()`.
   1128 
   1129         """
   1130         self._run_add_new_jobs(self._initial_job_names)
   1131 
   1132 
   1133     def test_add_jobs_non_empty(self):
   1134         """Test adding jobs to a non-empty dictionary.
   1135 
   1136         Calls the offloader's `_add_new_jobs()` twice; once from
   1137         initial conditions, and then again after adding more
   1138         directories.  After the second call, perform the assertions
   1139         of `self._check_open_jobs()`.  Additionally, assert that
   1140         keys added by the first call still map to their original
   1141         job object after the second call.
   1142 
   1143         """
   1144         self._run_add_new_jobs(self._initial_job_names)
   1145         jobs_copy = self._offloader._open_jobs.copy()
   1146         for d in self.MOREJOBS:
   1147             os.mkdir(d)
   1148         self._run_add_new_jobs(self._initial_job_names |
   1149                                  set(self.MOREJOBS))
   1150         for key in jobs_copy.keys():
   1151             self.assertIs(jobs_copy[key],
   1152                           self._offloader._open_jobs[key])
   1153 
   1154 
   1155 class ReportingTests(_TempResultsDirTestBase):
   1156     """Tests for `Offloader._report_failed_jobs()`."""
   1157 
   1158     def setUp(self):
   1159         super(ReportingTests, self).setUp()
   1160         self._offloader = gs_offloader.Offloader(_get_options([]))
   1161         self.mox.StubOutWithMock(self._offloader, '_log_failed_jobs_locally')
   1162         self.mox.StubOutWithMock(logging, 'debug')
   1163 
   1164 
   1165     def _add_job(self, jobdir):
   1166         """Add a job to the dictionary of unfinished jobs."""
   1167         j = self.make_job(jobdir)
   1168         self._offloader._open_jobs[j.dirname] = j
   1169         return j
   1170 
   1171 
   1172     def _expect_log_message(self, new_open_jobs, with_failures):
   1173         """Mock expected logging calls.
   1174 
   1175         `_report_failed_jobs()` logs one message with the number
   1176         of jobs removed from the open job set and the number of jobs
   1177         still remaining.  Additionally, if there are reportable
   1178         jobs, then it logs the number of jobs that haven't yet
   1179         offloaded.
   1180 
   1181         This sets up the logging calls using `new_open_jobs` to
   1182         figure the job counts.  If `with_failures` is true, then
   1183         the log message is set up assuming that all jobs in
   1184         `new_open_jobs` have offload failures.
   1185 
   1186         @param new_open_jobs New job set for calculating counts
   1187                              in the messages.
   1188         @param with_failures Whether the log message with a
   1189                              failure count is expected.
   1190 
   1191         """
   1192         count = len(self._offloader._open_jobs) - len(new_open_jobs)
   1193         logging.debug(mox.IgnoreArg(), count, len(new_open_jobs))
   1194         if with_failures:
   1195             logging.debug(mox.IgnoreArg(), len(new_open_jobs))
   1196 
   1197 
   1198     def _run_update(self, new_open_jobs):
   1199         """Call `_report_failed_jobs()`.
   1200 
   1201         Initial conditions are set up by the caller.  This calls
   1202         `_report_failed_jobs()` once, and then checks these
   1203         assertions:
   1204           * The offloader's new `_open_jobs` field contains only
   1205             the entries in `new_open_jobs`.
   1206 
   1207         @param new_open_jobs A dictionary representing the expected
   1208                              new value of the offloader's
   1209                              `_open_jobs` field.
   1210         """
   1211         self.mox.ReplayAll()
   1212         self._offloader._report_failed_jobs()
   1213         self._offloader._remove_offloaded_jobs()
   1214         self.assertEqual(self._offloader._open_jobs, new_open_jobs)
   1215         self.mox.VerifyAll()
   1216         self.mox.ResetAll()
   1217 
   1218 
   1219     def _expect_failed_jobs(self, failed_jobs):
   1220         """Mock expected call to log the failed jobs on local disk.
   1221 
   1222         TODO(crbug.com/686904): The fact that we have to mock an internal
   1223         function for this test is evidence that we need to pull out the local
   1224         file formatter in its own object in a future CL.
   1225 
   1226         @param failed_jobs: The list of jobs being logged as failed.
   1227         """
   1228         self._offloader._log_failed_jobs_locally(failed_jobs)
   1229 
   1230 
   1231     def test_no_jobs(self):
   1232         """Test `_report_failed_jobs()` with no open jobs.
   1233 
   1234         Initial conditions are an empty `_open_jobs` list.
   1235         Expected result is an empty `_open_jobs` list.
   1236 
   1237         """
   1238         self._expect_log_message({}, False)
   1239         self._expect_failed_jobs([])
   1240         self._run_update({})
   1241 
   1242 
   1243     def test_all_completed(self):
   1244         """Test `_report_failed_jobs()` with only complete jobs.
   1245 
   1246         Initial conditions are an `_open_jobs` list consisting of only completed
   1247         jobs.
   1248         Expected result is an empty `_open_jobs` list.
   1249 
   1250         """
   1251         for d in self.REGULAR_JOBLIST:
   1252             self._add_job(d).set_complete()
   1253         self._expect_log_message({}, False)
   1254         self._expect_failed_jobs([])
   1255         self._run_update({})
   1256 
   1257 
   1258     def test_none_finished(self):
   1259         """Test `_report_failed_jobs()` with only unfinished jobs.
   1260 
   1261         Initial conditions are an `_open_jobs` list consisting of only
   1262         unfinished jobs.
   1263         Expected result is no change to the `_open_jobs` list.
   1264 
   1265         """
   1266         for d in self.REGULAR_JOBLIST:
   1267             self._add_job(d)
   1268         new_jobs = self._offloader._open_jobs.copy()
   1269         self._expect_log_message(new_jobs, False)
   1270         self._expect_failed_jobs([])
   1271         self._run_update(new_jobs)
   1272 
   1273 
   1274 class GsOffloaderMockTests(_TempResultsDirTestCase):
   1275     """Tests using mock instead of mox."""
   1276 
   1277     def setUp(self):
   1278         super(GsOffloaderMockTests, self).setUp()
   1279         alarm = mock.patch('signal.alarm', return_value=0)
   1280         alarm.start()
   1281         self.addCleanup(alarm.stop)
   1282 
   1283         self._saved_loglevel = logging.getLogger().getEffectiveLevel()
   1284         logging.getLogger().setLevel(logging.CRITICAL + 1)
   1285 
   1286         self._job = self.make_job(self.REGULAR_JOBLIST[0])
   1287 
   1288 
   1289     def test_offload_timeout_early(self):
   1290         """Test that `offload_dir()` times out correctly.
   1291 
   1292         This test triggers timeout at the earliest possible moment,
   1293         at the first call to set the timeout alarm.
   1294 
   1295         """
   1296         signal.alarm.side_effect = [0, timeout_util.TimeoutError('fubar')]
   1297         with mock.patch.object(gs_offloader, '_upload_cts_testresult',
   1298                                autospec=True) as upload:
   1299             upload.return_value = None
   1300             gs_offloader.GSOffloader(
   1301                     utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
   1302                             self._job.queue_args[0],
   1303                             self._job.queue_args[1],
   1304                             self._job.queue_args[2])
   1305             self.assertTrue(os.path.isdir(self._job.queue_args[0]))
   1306 
   1307 
   1308     # TODO(ayatane): This tests passes when run locally, but it fails
   1309     # when run on trybot.  I have no idea why, but the assert isdir
   1310     # fails.
   1311     #
   1312     # This test is also kind of redundant since we are using the timeout
   1313     # from chromite which has its own tests.
   1314     @unittest.skip('This fails on trybot')
   1315     def test_offload_timeout_late(self):
   1316         """Test that `offload_dir()` times out correctly.
   1317 
   1318         This test triggers timeout at the latest possible moment, at
   1319         the call to clear the timeout alarm.
   1320 
   1321         """
   1322         signal.alarm.side_effect = [0, 0, timeout_util.TimeoutError('fubar')]
   1323         with mock.patch.object(gs_offloader, '_upload_cts_testresult',
   1324                                autospec=True) as upload, \
   1325              mock.patch.object(gs_offloader, '_get_cmd_list',
   1326                                autospec=True) as get_cmd_list:
   1327             upload.return_value = None
   1328             get_cmd_list.return_value = ['test', '-d', self._job.queue_args[0]]
   1329             gs_offloader.GSOffloader(
   1330                     utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
   1331                             self._job.queue_args[0],
   1332                             self._job.queue_args[1],
   1333                             self._job.queue_args[2])
   1334             self.assertTrue(os.path.isdir(self._job.queue_args[0]))
   1335 
   1336 
   1337 
   1338 if __name__ == '__main__':
   1339     unittest.main()
   1340