Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 # Copyright 2016 The Chromium OS Authors. All rights reserved.
      3 # Use of this source code is governed by a BSD-style license that can be
      4 # found in the LICENSE file.
      5 
      6 import __builtin__
      7 import Queue
      8 import datetime
      9 import logging
     10 import os
     11 import shutil
     12 import signal
     13 import stat
     14 import sys
     15 import tempfile
     16 import time
     17 import unittest
     18 
     19 import mock
     20 import mox
     21 
     22 import common
     23 from autotest_lib.client.common_lib import global_config
     24 from autotest_lib.client.common_lib import time_utils
     25 from autotest_lib.client.common_lib import utils
     26 # Allow the unit tests to run even when cloud_client.proto is not compiled.
     27 try:
     28     from autotest_lib.site_utils import cloud_console_client
     29 except ImportError:
     30     cloud_console_client = None
     31 from autotest_lib.site_utils import gs_offloader
     32 from autotest_lib.site_utils import job_directories
     33 from autotest_lib.tko import models
     34 from autotest_lib.utils import gslib
     35 from autotest_lib.site_utils import pubsub_utils
     36 from chromite.lib import timeout_util
     37 
     38 # Test value to use for `days_old`, if nothing else is required.
     39 _TEST_EXPIRATION_AGE = 7
     40 
     41 # When constructing sample time values for testing expiration,
     42 # allow this many seconds between the expiration time and the
     43 # current time.
     44 _MARGIN_SECS = 10.0
     45 
     46 
     47 def _get_options(argv):
     48     """Helper function to exercise command line parsing.
     49 
     50     @param argv Value of sys.argv to be parsed.
     51 
     52     """
     53     sys.argv = ['bogus.py'] + argv
     54     return gs_offloader.parse_options()
     55 
     56 
     57 def is_fifo(path):
     58   """Determines whether a path is a fifo.
     59 
     60   @param path: fifo path string.
     61   """
     62   return stat.S_ISFIFO(os.lstat(path).st_mode)
     63 
     64 
     65 class OffloaderOptionsTests(mox.MoxTestBase):
     66     """Tests for the `Offloader` constructor.
     67 
     68     Tests that offloader instance fields are set as expected
     69     for given command line options.
     70 
     71     """
     72 
     73     _REGULAR_ONLY = set([job_directories.RegularJobDirectory])
     74     _SPECIAL_ONLY = set([job_directories.SpecialJobDirectory])
     75     _BOTH = _REGULAR_ONLY | _SPECIAL_ONLY
     76 
     77 
     78     def setUp(self):
     79         super(OffloaderOptionsTests, self).setUp()
     80         self.mox.StubOutWithMock(utils, 'get_offload_gsuri')
     81         gs_offloader.GS_OFFLOADING_ENABLED = True
     82         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
     83 
     84 
     85     def _mock_get_sub_offloader(self, is_moblab, multiprocessing=False,
     86                                console_client=None, delete_age=0):
     87         """Mock the process of getting the offload_dir function."""
     88         if is_moblab:
     89             expected_gsuri = '%sresults/%s/%s/' % (
     90                     global_config.global_config.get_config_value(
     91                             'CROS', 'image_storage_server'),
     92                     'Fa:ke:ma:c0:12:34', 'rand0m-uu1d')
     93         else:
     94             expected_gsuri = utils.DEFAULT_OFFLOAD_GSURI
     95         utils.get_offload_gsuri().AndReturn(expected_gsuri)
     96         sub_offloader = gs_offloader.GSOffloader(expected_gsuri,
     97             multiprocessing, delete_age, console_client)
     98         self.mox.StubOutWithMock(gs_offloader, 'GSOffloader')
     99         if cloud_console_client:
    100             self.mox.StubOutWithMock(cloud_console_client,
    101                     'is_cloud_notification_enabled')
    102         if console_client:
    103             cloud_console_client.is_cloud_notification_enabled().AndReturn(True)
    104             gs_offloader.GSOffloader(
    105                     expected_gsuri, multiprocessing, delete_age,
    106                     mox.IsA(cloud_console_client.PubSubBasedClient)).AndReturn(
    107                         sub_offloader)
    108         else:
    109             if cloud_console_client:
    110                 cloud_console_client.is_cloud_notification_enabled().AndReturn(
    111                         False)
    112             gs_offloader.GSOffloader(
    113                 expected_gsuri, multiprocessing, delete_age, None).AndReturn(
    114                     sub_offloader)
    115         self.mox.ReplayAll()
    116         return sub_offloader
    117 
    118 
    119     def test_process_no_options(self):
    120         """Test default offloader options."""
    121         sub_offloader = self._mock_get_sub_offloader(False)
    122         offloader = gs_offloader.Offloader(_get_options([]))
    123         self.assertEqual(set(offloader._jobdir_classes),
    124                          self._REGULAR_ONLY)
    125         self.assertEqual(offloader._processes, 1)
    126         self.assertEqual(offloader._gs_offloader,
    127                          sub_offloader)
    128         self.assertEqual(offloader._upload_age_limit, 0)
    129         self.assertEqual(offloader._delete_age_limit, 0)
    130 
    131 
    132     def test_process_all_option(self):
    133         """Test offloader handling for the --all option."""
    134         sub_offloader = self._mock_get_sub_offloader(False)
    135         offloader = gs_offloader.Offloader(_get_options(['--all']))
    136         self.assertEqual(set(offloader._jobdir_classes), self._BOTH)
    137         self.assertEqual(offloader._processes, 1)
    138         self.assertEqual(offloader._gs_offloader,
    139                          sub_offloader)
    140         self.assertEqual(offloader._upload_age_limit, 0)
    141         self.assertEqual(offloader._delete_age_limit, 0)
    142 
    143 
    144     def test_process_hosts_option(self):
    145         """Test offloader handling for the --hosts option."""
    146         sub_offloader = self._mock_get_sub_offloader(False)
    147         offloader = gs_offloader.Offloader(
    148                 _get_options(['--hosts']))
    149         self.assertEqual(set(offloader._jobdir_classes),
    150                          self._SPECIAL_ONLY)
    151         self.assertEqual(offloader._processes, 1)
    152         self.assertEqual(offloader._gs_offloader,
    153                          sub_offloader)
    154         self.assertEqual(offloader._upload_age_limit, 0)
    155         self.assertEqual(offloader._delete_age_limit, 0)
    156 
    157 
    158     def test_parallelism_option(self):
    159         """Test offloader handling for the --parallelism option."""
    160         sub_offloader = self._mock_get_sub_offloader(False)
    161         offloader = gs_offloader.Offloader(
    162                 _get_options(['--parallelism', '2']))
    163         self.assertEqual(set(offloader._jobdir_classes),
    164                          self._REGULAR_ONLY)
    165         self.assertEqual(offloader._processes, 2)
    166         self.assertEqual(offloader._gs_offloader,
    167                          sub_offloader)
    168         self.assertEqual(offloader._upload_age_limit, 0)
    169         self.assertEqual(offloader._delete_age_limit, 0)
    170 
    171 
    172     def test_delete_only_option(self):
    173         """Test offloader handling for the --delete_only option."""
    174         offloader = gs_offloader.Offloader(
    175                 _get_options(['--delete_only']))
    176         self.assertEqual(set(offloader._jobdir_classes),
    177                          self._REGULAR_ONLY)
    178         self.assertEqual(offloader._processes, 1)
    179         self.assertIsInstance(offloader._gs_offloader,
    180                               gs_offloader.FakeGSOffloader)
    181         self.assertEqual(offloader._upload_age_limit, 0)
    182         self.assertEqual(offloader._delete_age_limit, 0)
    183 
    184 
    185     def test_days_old_option(self):
    186         """Test offloader handling for the --days_old option."""
    187         sub_offloader = self._mock_get_sub_offloader(False, delete_age=7)
    188         offloader = gs_offloader.Offloader(
    189                 _get_options(['--days_old', '7']))
    190         self.assertEqual(set(offloader._jobdir_classes),
    191                          self._REGULAR_ONLY)
    192         self.assertEqual(offloader._processes, 1)
    193         self.assertEqual(offloader._gs_offloader,
    194                          sub_offloader)
    195         self.assertEqual(offloader._upload_age_limit, 7)
    196         self.assertEqual(offloader._delete_age_limit, 7)
    197 
    198 
    199     def test_moblab_gsuri_generation(self):
    200         """Test offloader construction for Moblab."""
    201         sub_offloader = self._mock_get_sub_offloader(True)
    202         offloader = gs_offloader.Offloader(_get_options([]))
    203         self.assertEqual(set(offloader._jobdir_classes),
    204                          self._REGULAR_ONLY)
    205         self.assertEqual(offloader._processes, 1)
    206         self.assertEqual(offloader._gs_offloader,
    207                          sub_offloader)
    208         self.assertEqual(offloader._upload_age_limit, 0)
    209         self.assertEqual(offloader._delete_age_limit, 0)
    210 
    211 
    212     def test_globalconfig_offloading_flag(self):
    213         """Test enabling of --delete_only via global_config."""
    214         gs_offloader.GS_OFFLOADING_ENABLED = False
    215         offloader = gs_offloader.Offloader(
    216                 _get_options([]))
    217         self.assertIsInstance(offloader._gs_offloader,
    218                              gs_offloader.FakeGSOffloader)
    219 
    220     def test_offloader_multiprocessing_flag_set(self):
    221         """Test multiprocessing is set."""
    222         sub_offloader = self._mock_get_sub_offloader(True, True)
    223         offloader = gs_offloader.Offloader(_get_options(['-m']))
    224         self.assertEqual(offloader._gs_offloader,
    225                          sub_offloader)
    226         self.mox.VerifyAll()
    227 
    228     def test_offloader_multiprocessing_flag_not_set_default_false(self):
    229         """Test multiprocessing is set."""
    230         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = False
    231         sub_offloader = self._mock_get_sub_offloader(True, False)
    232         offloader = gs_offloader.Offloader(_get_options([]))
    233         self.assertEqual(offloader._gs_offloader,
    234                          sub_offloader)
    235         self.mox.VerifyAll()
    236 
    237     def test_offloader_multiprocessing_flag_not_set_default_true(self):
    238         """Test multiprocessing is set."""
    239         gs_offloader.GS_OFFLOADER_MULTIPROCESSING = True
    240         sub_offloader = self._mock_get_sub_offloader(True, True)
    241         offloader = gs_offloader.Offloader(_get_options([]))
    242         self.assertEqual(offloader._gs_offloader,
    243                          sub_offloader)
    244         self.mox.VerifyAll()
    245 
    246 
    247     def test_offloader_pubsub_enabled(self):
    248         """Test multiprocessing is set."""
    249         if not cloud_console_client:
    250             return
    251         self.mox.StubOutWithMock(pubsub_utils, "PubSubClient")
    252         sub_offloader = self._mock_get_sub_offloader(True, False,
    253                 cloud_console_client.PubSubBasedClient())
    254         offloader = gs_offloader.Offloader(_get_options([]))
    255         self.assertEqual(offloader._gs_offloader,
    256                          sub_offloader)
    257         self.mox.VerifyAll()
    258 
    259 
    260 def _make_timestamp(age_limit, is_expired):
    261     """Create a timestamp for use by `job_directories.is_job_expired()`.
    262 
    263     The timestamp will meet the syntactic requirements for
    264     timestamps used as input to `is_job_expired()`.  If
    265     `is_expired` is true, the timestamp will be older than
    266     `age_limit` days before the current time; otherwise, the
    267     date will be younger.
    268 
    269     @param age_limit    The number of days before expiration of the
    270                         target timestamp.
    271     @param is_expired   Whether the timestamp should be expired
    272                         relative to `age_limit`.
    273 
    274     """
    275     seconds = -_MARGIN_SECS
    276     if is_expired:
    277         seconds = -seconds
    278     delta = datetime.timedelta(days=age_limit, seconds=seconds)
    279     reference_time = datetime.datetime.now() - delta
    280     return reference_time.strftime(time_utils.TIME_FMT)
    281 
    282 
    283 class JobExpirationTests(unittest.TestCase):
    284     """Tests to exercise `job_directories.is_job_expired()`."""
    285 
    286     def test_expired(self):
    287         """Test detection of an expired job."""
    288         timestamp = _make_timestamp(_TEST_EXPIRATION_AGE, True)
    289         self.assertTrue(
    290             job_directories.is_job_expired(
    291                 _TEST_EXPIRATION_AGE, timestamp))
    292 
    293 
    294     def test_alive(self):
    295         """Test detection of a job that's not expired."""
    296         # N.B.  This test may fail if its run time exceeds more than
    297         # about _MARGIN_SECS seconds.
    298         timestamp = _make_timestamp(_TEST_EXPIRATION_AGE, False)
    299         self.assertFalse(
    300             job_directories.is_job_expired(
    301                 _TEST_EXPIRATION_AGE, timestamp))
    302 
    303 
    304 class _MockJobDirectory(job_directories._JobDirectory):
    305     """Subclass of `_JobDirectory` used as a helper for tests."""
    306 
    307     GLOB_PATTERN = '[0-9]*-*'
    308 
    309 
    310     def __init__(self, resultsdir):
    311         """Create new job in initial state."""
    312         super(_MockJobDirectory, self).__init__(resultsdir)
    313         self._timestamp = None
    314         self.queue_args = [resultsdir, os.path.dirname(resultsdir), self._timestamp]
    315 
    316 
    317     def get_timestamp_if_finished(self):
    318         return self._timestamp
    319 
    320 
    321     def set_finished(self, days_old):
    322         """Make this job appear to be finished.
    323 
    324         After calling this function, calls to `enqueue_offload()`
    325         will find this job as finished, but not expired and ready
    326         for offload.  Note that when `days_old` is 0,
    327         `enqueue_offload()` will treat a finished job as eligible
    328         for offload.
    329 
    330         @param days_old The value of the `days_old` parameter that
    331                         will be passed to `enqueue_offload()` for
    332                         testing.
    333 
    334         """
    335         self._timestamp = _make_timestamp(days_old, False)
    336         self.queue_args[2] = self._timestamp
    337 
    338 
    339     def set_expired(self, days_old):
    340         """Make this job eligible to be offloaded.
    341 
    342         After calling this function, calls to `offload` will attempt
    343         to offload this job.
    344 
    345         @param days_old The value of the `days_old` parameter that
    346                         will be passed to `enqueue_offload()` for
    347                         testing.
    348 
    349         """
    350         self._timestamp = _make_timestamp(days_old, True)
    351         self.queue_args[2] = self._timestamp
    352 
    353 
    354     def set_incomplete(self):
    355         """Make this job appear to have failed offload just once."""
    356         self.offload_count += 1
    357         self.first_offload_start = time.time()
    358         if not os.path.isdir(self.dirname):
    359             os.mkdir(self.dirname)
    360 
    361 
    362     def set_reportable(self):
    363         """Make this job be reportable."""
    364         self.set_incomplete()
    365         self.offload_count += 1
    366 
    367 
    368     def set_complete(self):
    369         """Make this job be completed."""
    370         self.offload_count += 1
    371         if os.path.isdir(self.dirname):
    372             os.rmdir(self.dirname)
    373 
    374 
    375     def process_gs_instructions(self):
    376         """Always still offload the job directory."""
    377         return True
    378 
    379 
    380 class CommandListTests(unittest.TestCase):
    381     """Tests for `_get_cmd_list()`."""
    382 
    383     def _command_list_assertions(self, job, use_rsync=True, multi=False):
    384         """Call `_get_cmd_list()` and check the return value.
    385 
    386         Check the following assertions:
    387           * The command name (argv[0]) is 'gsutil'.
    388           * '-m' option (argv[1]) is on when the argument, multi, is True.
    389           * The arguments contain the 'cp' subcommand.
    390           * The next-to-last argument (the source directory) is the
    391             job's `queue_args[0]`.
    392           * The last argument (the destination URL) is the job's
    393             'queue_args[1]'.
    394 
    395         @param job A job with properly calculated arguments to
    396                    `_get_cmd_list()`
    397         @param use_rsync True when using 'rsync'. False when using 'cp'.
    398         @param multi True when using '-m' option for gsutil.
    399 
    400         """
    401         test_bucket_uri = 'gs://a-test-bucket'
    402 
    403         gs_offloader.USE_RSYNC_ENABLED = use_rsync
    404 
    405         command = gs_offloader._get_cmd_list(
    406                 multi, job.queue_args[0],
    407                 os.path.join(test_bucket_uri, job.queue_args[1]))
    408 
    409         self.assertEqual(command[0], 'gsutil')
    410         if multi:
    411             self.assertEqual(command[1], '-m')
    412         self.assertEqual(command[-2], job.queue_args[0])
    413 
    414         if use_rsync:
    415             self.assertTrue('rsync' in command)
    416             self.assertEqual(command[-1],
    417                              os.path.join(test_bucket_uri, job.queue_args[0]))
    418         else:
    419             self.assertTrue('cp' in command)
    420             self.assertEqual(command[-1],
    421                              os.path.join(test_bucket_uri, job.queue_args[1]))
    422 
    423 
    424     def test__get_cmd_list_regular(self):
    425         """Test `_get_cmd_list()` as for a regular job."""
    426         job = _MockJobDirectory('118-debug')
    427         self._command_list_assertions(job)
    428 
    429 
    430     def test__get_cmd_list_special(self):
    431         """Test `_get_cmd_list()` as for a special job."""
    432         job = _MockJobDirectory('hosts/host1/118-reset')
    433         self._command_list_assertions(job)
    434 
    435 
    436     def test_get_cmd_list_regular_no_rsync(self):
    437         """Test `_get_cmd_list()` as for a regular job."""
    438         job = _MockJobDirectory('118-debug')
    439         self._command_list_assertions(job, use_rsync=False)
    440 
    441 
    442     def test_get_cmd_list_special_no_rsync(self):
    443         """Test `_get_cmd_list()` as for a special job."""
    444         job = _MockJobDirectory('hosts/host1/118-reset')
    445         self._command_list_assertions(job, use_rsync=False)
    446 
    447 
    448     def test_get_cmd_list_regular_multi(self):
    449         """Test `_get_cmd_list()` as for a regular job with True multi."""
    450         job = _MockJobDirectory('118-debug')
    451         self._command_list_assertions(job, multi=True)
    452 
    453 
    454     def test__get_cmd_list_special_multi(self):
    455         """Test `_get_cmd_list()` as for a special job with True multi."""
    456         job = _MockJobDirectory('hosts/host1/118-reset')
    457         self._command_list_assertions(job, multi=True)
    458 
    459 
    460 class _MockJob(object):
    461     """Class to mock the return value of `AFE.get_jobs()`."""
    462     def __init__(self, created):
    463         self.created_on = created
    464 
    465 
    466 class _MockHostQueueEntry(object):
    467     """Class to mock the return value of `AFE.get_host_queue_entries()`."""
    468     def __init__(self, finished):
    469         self.finished_on = finished
    470 
    471 
    472 class _MockSpecialTask(object):
    473     """Class to mock the return value of `AFE.get_special_tasks()`."""
    474     def __init__(self, finished):
    475         self.time_finished = finished
    476 
    477 
    478 class JobDirectorySubclassTests(mox.MoxTestBase):
    479     """Test specific to RegularJobDirectory and SpecialJobDirectory.
    480 
    481     This provides coverage for the implementation in both
    482     RegularJobDirectory and SpecialJobDirectory.
    483 
    484     """
    485 
    486     def setUp(self):
    487         super(JobDirectorySubclassTests, self).setUp()
    488         self.mox.StubOutWithMock(job_directories._AFE, 'get_jobs')
    489         self.mox.StubOutWithMock(job_directories._AFE,
    490                                  'get_host_queue_entries')
    491         self.mox.StubOutWithMock(job_directories._AFE,
    492                                  'get_special_tasks')
    493 
    494 
    495     def test_regular_job_fields(self):
    496         """Test the constructor for `RegularJobDirectory`.
    497 
    498         Construct a regular job, and assert that the `dirname`
    499         and `_id` attributes are set as expected.
    500 
    501         """
    502         resultsdir = '118-fubar'
    503         job = job_directories.RegularJobDirectory(resultsdir)
    504         self.assertEqual(job.dirname, resultsdir)
    505         self.assertEqual(job._id, 118)
    506 
    507 
    508     def test_special_job_fields(self):
    509         """Test the constructor for `SpecialJobDirectory`.
    510 
    511         Construct a special job, and assert that the `dirname`
    512         and `_id` attributes are set as expected.
    513 
    514         """
    515         destdir = 'hosts/host1'
    516         resultsdir = destdir + '/118-reset'
    517         job = job_directories.SpecialJobDirectory(resultsdir)
    518         self.assertEqual(job.dirname, resultsdir)
    519         self.assertEqual(job._id, 118)
    520 
    521 
    522     def _check_finished_job(self, jobtime, hqetimes, expected):
    523         """Mock and test behavior of a finished job.
    524 
    525         Initialize the mocks for a call to
    526         `get_timestamp_if_finished()`, then simulate one call.
    527         Assert that the returned timestamp matches the passed
    528         in expected value.
    529 
    530         @param jobtime Time used to construct a _MockJob object.
    531         @param hqetimes List of times used to construct
    532                         _MockHostQueueEntry objects.
    533         @param expected Expected time to be returned by
    534                         get_timestamp_if_finished
    535 
    536         """
    537         job = job_directories.RegularJobDirectory('118-fubar')
    538         job_directories._AFE.get_jobs(
    539                 id=job._id, finished=True).AndReturn(
    540                         [_MockJob(jobtime)])
    541         job_directories._AFE.get_host_queue_entries(
    542                 finished_on__isnull=False,
    543                 job_id=job._id).AndReturn(
    544                         [_MockHostQueueEntry(t) for t in hqetimes])
    545         self.mox.ReplayAll()
    546         self.assertEqual(expected, job.get_timestamp_if_finished())
    547         self.mox.VerifyAll()
    548 
    549 
    550     def test_finished_regular_job(self):
    551         """Test getting the timestamp for a finished regular job.
    552 
    553         Tests the return value for
    554         `RegularJobDirectory.get_timestamp_if_finished()` when
    555         the AFE indicates the job is finished.
    556 
    557         """
    558         created_timestamp = _make_timestamp(1, True)
    559         hqe_timestamp = _make_timestamp(0, True)
    560         self._check_finished_job(created_timestamp,
    561                                  [hqe_timestamp],
    562                                  hqe_timestamp)
    563 
    564 
    565     def test_finished_regular_job_multiple_hqes(self):
    566         """Test getting the timestamp for a regular job with multiple hqes.
    567 
    568         Tests the return value for
    569         `RegularJobDirectory.get_timestamp_if_finished()` when
    570         the AFE indicates the job is finished and the job has multiple host
    571         queue entries.
    572 
    573         Tests that the returned timestamp is the latest timestamp in
    574         the list of HQEs, regardless of the returned order.
    575 
    576         """
    577         created_timestamp = _make_timestamp(2, True)
    578         older_hqe_timestamp = _make_timestamp(1, True)
    579         newer_hqe_timestamp = _make_timestamp(0, True)
    580         hqe_list = [older_hqe_timestamp,
    581                     newer_hqe_timestamp]
    582         self._check_finished_job(created_timestamp,
    583                                  hqe_list,
    584                                  newer_hqe_timestamp)
    585         self.mox.ResetAll()
    586         hqe_list.reverse()
    587         self._check_finished_job(created_timestamp,
    588                                  hqe_list,
    589                                  newer_hqe_timestamp)
    590 
    591 
    592     def test_finished_regular_job_null_finished_times(self):
    593         """Test getting the timestamp for an aborted regular job.
    594 
    595         Tests the return value for
    596         `RegularJobDirectory.get_timestamp_if_finished()` when
    597         the AFE indicates the job is finished and the job has aborted host
    598         queue entries.
    599 
    600         """
    601         timestamp = _make_timestamp(0, True)
    602         self._check_finished_job(timestamp, [], timestamp)
    603 
    604 
    605     def test_unfinished_regular_job(self):
    606         """Test getting the timestamp for an unfinished regular job.
    607 
    608         Tests the return value for
    609         `RegularJobDirectory.get_timestamp_if_finished()` when
    610         the AFE indicates the job is not finished.
    611 
    612         """
    613         job = job_directories.RegularJobDirectory('118-fubar')
    614         job_directories._AFE.get_jobs(
    615                 id=job._id, finished=True).AndReturn([])
    616         self.mox.ReplayAll()
    617         self.assertIsNone(job.get_timestamp_if_finished())
    618         self.mox.VerifyAll()
    619 
    620 
    621     def test_finished_special_job(self):
    622         """Test getting the timestamp for a finished special job.
    623 
    624         Tests the return value for
    625         `SpecialJobDirectory.get_timestamp_if_finished()` when
    626         the AFE indicates the job is finished.
    627 
    628         """
    629         job = job_directories.SpecialJobDirectory(
    630                 'hosts/host1/118-reset')
    631         timestamp = _make_timestamp(0, True)
    632         job_directories._AFE.get_special_tasks(
    633                 id=job._id, is_complete=True).AndReturn(
    634                     [_MockSpecialTask(timestamp)])
    635         self.mox.ReplayAll()
    636         self.assertEqual(timestamp,
    637                          job.get_timestamp_if_finished())
    638         self.mox.VerifyAll()
    639 
    640 
    641     def test_unfinished_special_job(self):
    642         """Test getting the timestamp for an unfinished special job.
    643 
    644         Tests the return value for
    645         `SpecialJobDirectory.get_timestamp_if_finished()` when
    646         the AFE indicates the job is not finished.
    647 
    648         """
    649         job = job_directories.SpecialJobDirectory(
    650                 'hosts/host1/118-reset')
    651         job_directories._AFE.get_special_tasks(
    652                 id=job._id, is_complete=True).AndReturn([])
    653         self.mox.ReplayAll()
    654         self.assertIsNone(job.get_timestamp_if_finished())
    655         self.mox.VerifyAll()
    656 
    657 
    658 class _TempResultsDirTestCase(unittest.TestCase):
    659     """Mixin class for tests using a temporary results directory."""
    660 
    661     REGULAR_JOBLIST = [
    662         '111-fubar', '112-fubar', '113-fubar', '114-snafu']
    663     HOST_LIST = ['host1', 'host2', 'host3']
    664     SPECIAL_JOBLIST = [
    665         'hosts/host1/333-reset', 'hosts/host1/334-reset',
    666         'hosts/host2/444-reset', 'hosts/host3/555-reset']
    667 
    668 
    669     def setUp(self):
    670         super(_TempResultsDirTestCase, self).setUp()
    671         self._resultsroot = tempfile.mkdtemp()
    672         self._cwd = os.getcwd()
    673         os.chdir(self._resultsroot)
    674 
    675 
    676     def tearDown(self):
    677         os.chdir(self._cwd)
    678         shutil.rmtree(self._resultsroot)
    679         super(_TempResultsDirTestCase, self).tearDown()
    680 
    681 
    682     def make_job(self, jobdir):
    683         """Create a job with results in `self._resultsroot`.
    684 
    685         @param jobdir Name of the subdirectory to be created in
    686                       `self._resultsroot`.
    687 
    688         """
    689         os.mkdir(jobdir)
    690         return _MockJobDirectory(jobdir)
    691 
    692 
    693     def make_job_hierarchy(self):
    694         """Create a sample hierarchy of job directories.
    695 
    696         `self.REGULAR_JOBLIST` is a list of directories for regular
    697         jobs to be created; `self.SPECIAL_JOBLIST` is a list of
    698         directories for special jobs to be created.
    699 
    700         """
    701         for d in self.REGULAR_JOBLIST:
    702             os.mkdir(d)
    703         hostsdir = 'hosts'
    704         os.mkdir(hostsdir)
    705         for host in self.HOST_LIST:
    706             os.mkdir(os.path.join(hostsdir, host))
    707         for d in self.SPECIAL_JOBLIST:
    708             os.mkdir(d)
    709 
    710 
    711 class _TempResultsDirTestBase(_TempResultsDirTestCase, mox.MoxTestBase):
    712     """Base Mox test class for tests using a temporary results directory."""
    713 
    714 
    715 class FailedOffloadsLogTest(_TempResultsDirTestBase):
    716     """Test the formatting of failed offloads log file."""
    717     # Below is partial sample of a failed offload log file.  This text is
    718     # deliberately hard-coded and then parsed to create the test data; the idea
    719     # is to make sure the actual text format will be reviewed by a human being.
    720     #
    721     # first offload      count  directory
    722     # --+----1----+----  ----+ ----+----1----+----2----+----3
    723     _SAMPLE_DIRECTORIES_REPORT = '''\
    724     =================== ======  ==============================
    725     2014-03-14 15:09:26      1  118-fubar
    726     2014-03-14 15:19:23      2  117-fubar
    727     2014-03-14 15:29:20      6  116-fubar
    728     2014-03-14 15:39:17     24  115-fubar
    729     2014-03-14 15:49:14    120  114-fubar
    730     2014-03-14 15:59:11    720  113-fubar
    731     2014-03-14 16:09:08   5040  112-fubar
    732     2014-03-14 16:19:05  40320  111-fubar
    733     '''
    734 
    735     def setUp(self):
    736         super(FailedOffloadsLogTest, self).setUp()
    737         self._offloader = gs_offloader.Offloader(_get_options([]))
    738         self._joblist = []
    739         for line in self._SAMPLE_DIRECTORIES_REPORT.split('\n')[1 : -1]:
    740             date_, time_, count, dir_ = line.split()
    741             job = _MockJobDirectory(dir_)
    742             job.offload_count = int(count)
    743             timestruct = time.strptime("%s %s" % (date_, time_),
    744                                        gs_offloader.FAILED_OFFLOADS_TIME_FORMAT)
    745             job.first_offload_start = time.mktime(timestruct)
    746             # enter the jobs in reverse order, to make sure we
    747             # test that the output will be sorted.
    748             self._joblist.insert(0, job)
    749 
    750 
    751     def assert_report_well_formatted(self, report_file):
    752         """Assert that report file is well formatted.
    753 
    754         @param report_file: Path to report file
    755         """
    756         with open(report_file, 'r') as f:
    757             report_lines = f.read().split()
    758 
    759         for end_of_header_index in range(len(report_lines)):
    760             if report_lines[end_of_header_index].startswith('=='):
    761                 break
    762         self.assertLess(end_of_header_index, len(report_lines),
    763                         'Failed to find end-of-header marker in the report')
    764 
    765         relevant_lines = report_lines[end_of_header_index:]
    766         expected_lines = self._SAMPLE_DIRECTORIES_REPORT.split()
    767         self.assertListEqual(relevant_lines, expected_lines)
    768 
    769 
    770     def test_failed_offload_log_format(self):
    771         """Trigger an e-mail report and check its contents."""
    772         log_file = os.path.join(self._resultsroot, 'failed_log')
    773         report = self._offloader._log_failed_jobs_locally(self._joblist,
    774                                                           log_file=log_file)
    775         self.assert_report_well_formatted(log_file)
    776 
    777 
    778     def test_failed_offload_file_overwrite(self):
    779         """Verify that we can saefly overwrite the log file."""
    780         log_file = os.path.join(self._resultsroot, 'failed_log')
    781         with open(log_file, 'w') as f:
    782             f.write('boohoohoo')
    783         report = self._offloader._log_failed_jobs_locally(self._joblist,
    784                                                           log_file=log_file)
    785         self.assert_report_well_formatted(log_file)
    786 
    787 
    788 class OffloadDirectoryTests(_TempResultsDirTestBase):
    789     """Tests for `offload_dir()`."""
    790 
    def setUp(self):
        """Silence logging, create one job, and stub out external calls."""
        super(OffloadDirectoryTests, self).setUp()
        # offload_dir() logs messages; raise the root logger's level
        # above CRITICAL, remembering the old level for tearDown().
        root_logger = logging.getLogger()
        self._saved_loglevel = root_logger.getEffectiveLevel()
        root_logger.setLevel(logging.CRITICAL+1)
        self._job = self.make_job(self.REGULAR_JOBLIST[0])
        self.mox.StubOutWithMock(gs_offloader, '_get_cmd_list')
        # Patch signal.alarm() with a mock returning 0 until cleanup.
        alarm_patcher = mock.patch('signal.alarm', return_value=0)
        alarm_patcher.start()
        self.addCleanup(alarm_patcher.stop)
        self.mox.StubOutWithMock(models.test, 'parse_job_keyval')
    802 
    803 
    def tearDown(self):
        """Put back the root logger level recorded by `setUp()`."""
        root_logger = logging.getLogger()
        root_logger.setLevel(self._saved_loglevel)
        super(OffloadDirectoryTests, self).tearDown()
    807 
    def _mock__upload_cts_testresult(self):
        """Stub `gs_offloader._upload_cts_testresult()` and expect one call.

        The recorded call accepts any two arguments and returns None.

        """
        self.mox.StubOutWithMock(gs_offloader, '_upload_cts_testresult')
        expected_call = gs_offloader._upload_cts_testresult(
                mox.IgnoreArg(), mox.IgnoreArg())
        expected_call.AndReturn(None)
    812 
    def _mock_create_marker_file(self):
        """Stub the builtin `open()` and expect one append-mode call.

        The recorded call accepts any path with mode 'a' and returns
        a `mock.MagicMock` in place of a file object.

        """
        self.mox.StubOutWithMock(__builtin__, 'open')
        # `open` now resolves to the mox stub, so this records an
        # expectation instead of opening a file.
        expected_open = open(mox.IgnoreArg(), 'a')
        expected_open.AndReturn(mock.MagicMock())
    816 
    817 
    def _mock_offload_dir_calls(self, command, queue_args,
                                marker_initially_exists=False):
        """Mock out the calls needed by `offload_dir()`.

        This covers only the calls made when there is no timeout.

        @param command Command list to be returned by the mocked
                       call to `_get_cmd_list()`.  `queue_args[0]`
                       is appended to this list in place.
        @param queue_args Sequence whose first element is passed to
                          `_get_cmd_list()` and whose second element
                          is appended to
                          `utils.DEFAULT_OFFLOAD_GSURI` to form the
                          expected destination.
        @param marker_initially_exists Value returned by the first
                                       mocked call to
                                       `os.path.isfile()`.

        """
        self.mox.StubOutWithMock(os.path, 'isfile')
        os.path.isfile(mox.IgnoreArg()).AndReturn(marker_initially_exists)
        job_dir = queue_args[0]
        command.append(job_dir)
        expected_destination = '%s%s' % (utils.DEFAULT_OFFLOAD_GSURI,
                                         queue_args[1])
        expected_call = gs_offloader._get_cmd_list(
                False, job_dir, expected_destination)
        expected_call.AndReturn(command)
        self._mock__upload_cts_testresult()
    836 
    837 
    def _run_offload_dir(self, should_succeed, delete_age):
        """Make one call to `offload_dir()`.

        The caller ensures all mocks are set up already.

        @param should_succeed True iff the call to `offload_dir()`
                              is expected to succeed and remove the
                              offloaded job directory.
        @param delete_age Value passed as the third argument to the
                          `gs_offloader.GSOffloader` constructor.

        """
        self.mox.ReplayAll()
        offloader = gs_offloader.GSOffloader(
                utils.DEFAULT_OFFLOAD_GSURI, False, delete_age)
        offloader.offload(self._job.queue_args[0],
                          self._job.queue_args[1],
                          self._job.queue_args[2])
        self.mox.VerifyAll()
        # The job directory must be gone exactly when offload succeeded.
        directory_remains = os.path.isdir(self._job.queue_args[0])
        self.assertEqual(not should_succeed, directory_remains)
    857 
    858 
    def test_offload_success(self):
        """Test that `offload_dir()` can succeed correctly."""
        # `test -d <dir>` exits 0 for the existing job directory.
        succeeding_command = ['test', '-d']
        self._mock_offload_dir_calls(succeeding_command,
                                     self._job.queue_args)
        os.path.isfile(mox.IgnoreArg()).AndReturn(True)
        self._mock_create_marker_file()
        self._run_offload_dir(should_succeed=True, delete_age=0)
    866 
    867 
    def test_offload_failure(self):
        """Test that `offload_dir()` can fail correctly."""
        # `test ! -d <dir>` exits non-zero for the existing job directory.
        failing_command = ['test', '!', '-d']
        self._mock_offload_dir_calls(failing_command,
                                     self._job.queue_args)
        self._run_offload_dir(should_succeed=False, delete_age=0)
    873 
    874 
    def test_sanitize_dir(self):
        """Test that folder/file name with invalid character can be corrected.

        Builds a scratch tree holding validly and invalidly named
        folders and files, a broken symlink and four fifos, runs
        `gs_offloader.sanitize_dir()` on it, and checks that:
          * every remaining name is unchanged by `gslib.escape()` and
            contains none of the invalid characters;
          * every fifo path (escaped where needed) exists and is no
            longer a fifo;
          * the broken symlink is gone from its original path, and
            its escaped path exists and is not a link.

        The scratch tree is removed even when an assertion fails.

        """
        forbidden_chars = ['[', ']', '*', '?', '#']
        results_folder = tempfile.mkdtemp()
        try:
            invalid_chars = '_'.join(forbidden_chars)
            invalid_folder_name = 'invalid_name_folder_%s' % invalid_chars
            invalid_folder = os.path.join(results_folder,
                                          invalid_folder_name)
            invalid_file = os.path.join(
                    invalid_folder,
                    'invalid_name_file_%s' % invalid_chars)
            good_folder = os.path.join(results_folder, 'valid_name_folder')
            good_file = os.path.join(good_folder, 'valid_name_file')
            for folder in [invalid_folder, good_folder]:
                os.makedirs(folder)
            for f in [invalid_file, good_file]:
                with open(f, 'w'):
                    pass
            # check that broken symlinks don't break sanitization
            symlink = os.path.join(invalid_folder, 'broken-link')
            os.symlink(os.path.join(results_folder, 'no-such-file'),
                       symlink)
            fifo1 = os.path.join(results_folder, 'test_fifo1')
            fifo2 = os.path.join(good_folder, 'test_fifo2')
            fifo3 = os.path.join(invalid_folder, 'test_fifo3')
            invalid_fifo4_name = 'test_fifo4_%s' % invalid_chars
            fifo4 = os.path.join(invalid_folder, invalid_fifo4_name)
            for fifo in (fifo1, fifo2, fifo3, fifo4):
                os.mkfifo(fifo)

            gs_offloader.sanitize_dir(results_folder)

            for _, dirs, files in os.walk(results_folder):
                for name in dirs + files:
                    self.assertEqual(name, gslib.escape(name))
                    for c in name:
                        self.assertNotIn(c, forbidden_chars)
            self.assertTrue(os.path.exists(good_file))

            # Fifos under validly named folders keep their paths.
            self.assertTrue(os.path.exists(fifo1))
            self.assertFalse(is_fifo(fifo1))
            self.assertTrue(os.path.exists(fifo2))
            self.assertFalse(is_fifo(fifo2))

            # Entries under the invalid folder move with its new name.
            corrected_folder = os.path.join(
                    results_folder, gslib.escape(invalid_folder_name))
            corrected_fifo3 = os.path.join(corrected_folder, 'test_fifo3')
            self.assertFalse(os.path.exists(fifo3))
            self.assertTrue(os.path.exists(corrected_fifo3))
            self.assertFalse(is_fifo(corrected_fifo3))
            corrected_fifo4 = os.path.join(
                    corrected_folder, gslib.escape(invalid_fifo4_name))
            self.assertFalse(os.path.exists(fifo4))
            self.assertTrue(os.path.exists(corrected_fifo4))
            self.assertFalse(is_fifo(corrected_fifo4))

            corrected_symlink = os.path.join(corrected_folder,
                                             'broken-link')
            self.assertFalse(os.path.lexists(symlink))
            self.assertTrue(os.path.exists(corrected_symlink))
            self.assertFalse(os.path.islink(corrected_symlink))
        finally:
            shutil.rmtree(results_folder)
    941 
    942 
    def check_limit_file_count(self, is_test_job=True):
        """Test that folder with too many files can be compressed.

        Creates `debug` and `sysinfo` folders holding ten files each
        and calls `gs_offloader.limit_file_count()` twice: first with
        `gs_offloader._MAX_FILE_COUNT` set to 100, after which
        `sysinfo` must still exist, and then with it set to 10, after
        which `sysinfo` must be replaced by `sysinfo.tgz` while
        `debug` still exists.

        The previous value of `gs_offloader._MAX_FILE_COUNT` is
        restored and the scratch folder is removed even when an
        assertion fails, so this check does not affect other tests.

        @param is_test_job: True to check the method with test job result
                            folder. Set to False for special task folder.
        """
        results_folder = tempfile.mkdtemp()
        saved_max_file_count = gs_offloader._MAX_FILE_COUNT
        try:
            host_folder = os.path.join(
                    results_folder,
                    'lab1-host1' if is_test_job
                    else 'hosts/lab1-host1/1-repair')
            debug_folder = os.path.join(host_folder, 'debug')
            sysinfo_folder = os.path.join(host_folder, 'sysinfo')
            for folder in [debug_folder, sysinfo_folder]:
                os.makedirs(folder)
                for i in range(10):
                    with open(os.path.join(folder, str(i)), 'w') as f:
                        f.write('test')

            target_folder = results_folder if is_test_job else host_folder

            gs_offloader._MAX_FILE_COUNT = 100
            gs_offloader.limit_file_count(target_folder)
            self.assertTrue(os.path.exists(sysinfo_folder))

            gs_offloader._MAX_FILE_COUNT = 10
            gs_offloader.limit_file_count(target_folder)
            self.assertFalse(os.path.exists(sysinfo_folder))
            self.assertTrue(os.path.exists(sysinfo_folder + '.tgz'))
            self.assertTrue(os.path.exists(debug_folder))
        finally:
            gs_offloader._MAX_FILE_COUNT = saved_max_file_count
            shutil.rmtree(results_folder)
    974 
    975 
    def test_limit_file_count(self):
        """Run `check_limit_file_count()` for a test job, then a special task.
        """
        for is_test_job in (True, False):
            self.check_limit_file_count(is_test_job=is_test_job)
    981 
    982 
    def test_is_valid_result(self):
        """Test _is_valid_result.

        Each row of the table is `(expected, build, pattern, suite)`;
        the rows are checked in order.

        """
        release_build = 'veyron_minnie-cheets-release/R52-8248.0.0'
        pfq_build = 'cyan-cheets-android-pfq/R54-8623.0.0-rc1'
        trybot_build = 'trybot-samus-release/R54-8640.0.0-b5092'
        trybot_2_build = 'trybot-samus-pfq/R54-8640.0.0-b5092'
        release_2_build = 'test-trybot-release/R54-8640.0.0-b5092'
        cts_pattern = gs_offloader.CTS_RESULT_PATTERN
        cts_v2_pattern = gs_offloader.CTS_V2_RESULT_PATTERN
        cases = [
            (True, release_build, cts_pattern, 'arc-cts'),
            (True, release_build, cts_pattern, 'test_that_wrapper'),
            (False, release_build, cts_pattern, 'bvt-arc'),
            (True, release_build, cts_v2_pattern, 'arc-gts'),
            (False, None, cts_pattern, 'arc-cts'),
            (False, release_build, cts_pattern, None),
            (False, pfq_build, cts_pattern, 'arc-cts'),
            (False, trybot_build, cts_pattern, 'arc-cts'),
            (False, trybot_2_build, cts_pattern, 'arc-cts'),
            (True, release_2_build, cts_pattern, 'arc-cts'),
        ]
        for expected, build, pattern, suite in cases:
            verdict = gs_offloader._is_valid_result(build, pattern, suite)
            if expected:
                self.assertTrue(verdict)
            else:
                self.assertFalse(verdict)
   1010 
   1011 
   1012     def create_results_folder(self):
   1013         """Create CTS/GTS results folders."""
   1014         results_folder = tempfile.mkdtemp()
   1015         host_folder = os.path.join(results_folder, 'chromeos4-row9-rack11-host22')
   1016         debug_folder = os.path.join(host_folder, 'debug')
   1017         sysinfo_folder = os.path.join(host_folder, 'sysinfo')
   1018         cts_result_folder = os.path.join(
   1019                 host_folder, 'cheets_CTS.android.dpi', 'results', 'cts-results')
   1020         cts_v2_result_folder = os.path.join(host_folder,
   1021                 'cheets_CTS_N.CtsGraphicsTestCases', 'results', 'android-cts')
   1022         gts_result_folder = os.path.join(
   1023                 host_folder, 'cheets_GTS.google.admin', 'results', 'android-gts')
   1024         timestamp_str = '2016.04.28_01.41.44'
   1025         timestamp_cts_folder = os.path.join(cts_result_folder, timestamp_str)
   1026         timestamp_cts_v2_folder = os.path.join(cts_v2_result_folder, timestamp_str)
   1027         timestamp_gts_folder = os.path.join(gts_result_folder, timestamp_str)
   1028 
   1029         # Test results in cts_result_folder with a different time-stamp.
   1030         timestamp_str_2 = '2016.04.28_10.41.44'
   1031         timestamp_cts_folder_2 = os.path.join(cts_result_folder, timestamp_str_2)
   1032 
   1033         for folder in [debug_folder, sysinfo_folder, cts_result_folder,
   1034                        timestamp_cts_folder, timestamp_cts_folder_2,
   1035                        timestamp_cts_v2_folder, timestamp_gts_folder]:
   1036             os.makedirs(folder)
   1037 
   1038         path_pattern_pair = [(timestamp_cts_folder, gs_offloader.CTS_RESULT_PATTERN),
   1039                              (timestamp_cts_folder_2, gs_offloader.CTS_RESULT_PATTERN),
   1040                              (timestamp_cts_v2_folder, gs_offloader.CTS_V2_RESULT_PATTERN),
   1041                              (timestamp_gts_folder, gs_offloader.CTS_V2_RESULT_PATTERN)]
   1042 
   1043         # Create timestamp.zip file_path.
   1044         cts_zip_file = os.path.join(cts_result_folder, timestamp_str + '.zip')
   1045         cts_zip_file_2 = os.path.join(cts_result_folder, timestamp_str_2 + '.zip')
   1046         cts_v2_zip_file = os.path.join(cts_v2_result_folder, timestamp_str + '.zip')
   1047         gts_zip_file = os.path.join(gts_result_folder, timestamp_str + '.zip')
   1048 
   1049         # Create xml file_path.
   1050         cts_result_file = os.path.join(timestamp_cts_folder, 'testResult.xml')
   1051         cts_result_file_2 = os.path.join(timestamp_cts_folder_2,
   1052                                          'testResult.xml')
   1053         gts_result_file = os.path.join(timestamp_gts_folder, 'test_result.xml')
   1054         cts_v2_result_file = os.path.join(timestamp_cts_v2_folder,
   1055                                          'test_result.xml')
   1056 
   1057         for file_path in [cts_zip_file, cts_zip_file_2, cts_v2_zip_file,
   1058                           gts_zip_file, cts_result_file, cts_result_file_2,
   1059                           gts_result_file, cts_v2_result_file]:
   1060             with open(file_path, 'w') as f:
   1061                 f.write('test')
   1062 
   1063         return (results_folder, host_folder, path_pattern_pair)
   1064 
   1065 
   1066     def test__upload_cts_testresult(self):
   1067         """Test _upload_cts_testresult."""
   1068         results_folder, host_folder, path_pattern_pair = self.create_results_folder()
   1069 
   1070         self.mox.StubOutWithMock(gs_offloader, '_upload_files')
   1071         gs_offloader._upload_files(
   1072             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False).AndReturn(
   1073                 ['test', '-d', host_folder])
   1074         gs_offloader._upload_files(
   1075             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False).AndReturn(
   1076                 ['test', '-d', host_folder])
   1077         gs_offloader._upload_files(
   1078             mox.IgnoreArg(), mox.IgnoreArg(), mox.IgnoreArg(), False).AndReturn(
   1079                 ['test', '-d', host_folder])
   1080 
   1081         self.mox.ReplayAll()
   1082         gs_offloader._upload_cts_testresult(results_folder, False)
   1083         self.mox.VerifyAll()
   1084         shutil.rmtree(results_folder)
   1085 
   1086 
   1087     def test_upload_files(self):
   1088         """Test upload_files"""
   1089         results_folder, host_folder, path_pattern_pair = self.create_results_folder()
   1090 
   1091         for path, pattern in path_pattern_pair:
   1092             models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
   1093                 'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
   1094                 'parent_job_id': 'p_id',
   1095                 'suite': 'arc-cts'
   1096             })
   1097 
   1098             gs_offloader._get_cmd_list(
   1099                 False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
   1100                     ['test', '-d', path])
   1101             gs_offloader._get_cmd_list(
   1102                 False, mox.IgnoreArg(), mox.IgnoreArg()).AndReturn(
   1103                     ['test', '-d', path])
   1104 
   1105             self.mox.ReplayAll()
   1106             gs_offloader._upload_files(host_folder, path, pattern, False)
   1107             self.mox.VerifyAll()
   1108             self.mox.ResetAll()
   1109 
   1110         shutil.rmtree(results_folder)
   1111 
   1112 
   1113     def test_get_metrics_fields(self):
   1114         """Test method _get_metrics_fields."""
   1115         results_folder, host_folder, _ = self.create_results_folder()
   1116         models.test.parse_job_keyval(mox.IgnoreArg()).AndReturn({
   1117                 'build': 'veyron_minnie-cheets-release/R52-8248.0.0',
   1118                 'parent_job_id': 'p_id',
   1119                 'suite': 'arc-cts'
   1120             })
   1121         try:
   1122             self.mox.ReplayAll()
   1123             self.assertEqual({'board': 'veyron_minnie-cheets',
   1124                               'milestone': 'R52'},
   1125                              gs_offloader._get_metrics_fields(host_folder))
   1126             self.mox.VerifyAll()
   1127         finally:
   1128             shutil.rmtree(results_folder)
   1129 
   1130 
   1131 class JobDirectoryOffloadTests(_TempResultsDirTestBase):
   1132     """Tests for `_JobDirectory.enqueue_offload()`.
   1133 
   1134     When testing with a `days_old` parameter of 0, we use
   1135     `set_finished()` instead of `set_expired()`.  This causes the
   1136     job's timestamp to be set in the future.  This is done so as
   1137     to test that when `days_old` is 0, the job is always treated
   1138     as eligible for offload, regardless of the timestamp's value.
   1139 
   1140     Testing covers the following assertions:
   1141      A. Each time `enqueue_offload()` is called, a message that
   1142         includes the job's directory name will be logged using
   1143         `logging.debug()`, regardless of whether the job was
   1144         enqueued.  Nothing else is allowed to be logged.
   1145      B. If the job is not eligible to be offloaded,
   1146         `first_offload_start` and `offload_count` are 0.
   1147      C. If the job is not eligible for offload, nothing is
   1148         enqueued in `queue`.
   1149      D. When the job is offloaded, `offload_count` increments
   1150         each time.
   1151      E. When the job is offloaded, the appropriate parameters are
   1152         enqueued exactly once.
   1153      F. The first time a job is offloaded, `first_offload_start` is
   1154         set to the current time.
   1155      G. `first_offload_start` only changes the first time that the
   1156         job is offloaded.
   1157 
   1158     The test cases below are designed to exercise all of the
   1159     meaningful state transitions at least once.
   1160 
   1161     """
   1162 
   1163     def setUp(self):
   1164         super(JobDirectoryOffloadTests, self).setUp()
   1165         self._job = self.make_job(self.REGULAR_JOBLIST[0])
   1166         self._queue = Queue.Queue()
   1167 
   1168 
   1169     def _offload_unexpired_job(self, days_old):
   1170         """Make calls to `enqueue_offload()` for an unexpired job.
   1171 
   1172         This method tests assertions B and C that calling
   1173         `enqueue_offload()` has no effect.
   1174 
   1175         """
   1176         self.assertEqual(self._job.offload_count, 0)
   1177         self.assertEqual(self._job.first_offload_start, 0)
   1178         gs_offloader._enqueue_offload(self._job, self._queue, days_old)
   1179         gs_offloader._enqueue_offload(self._job, self._queue, days_old)
   1180         self.assertTrue(self._queue.empty())
   1181         self.assertEqual(self._job.offload_count, 0)
   1182         self.assertEqual(self._job.first_offload_start, 0)
   1183 
   1184 
   1185     def _offload_expired_once(self, days_old, count):
   1186         """Make one call to `enqueue_offload()` for an expired job.
   1187 
   1188         This method tests assertions D and E regarding side-effects
   1189         expected when a job is offloaded.
   1190 
   1191         """
   1192         gs_offloader._enqueue_offload(self._job, self._queue, days_old)
   1193         self.assertEqual(self._job.offload_count, count)
   1194         self.assertFalse(self._queue.empty())
   1195         v = self._queue.get_nowait()
   1196         self.assertTrue(self._queue.empty())
   1197         self.assertEqual(v, self._job.queue_args)
   1198 
   1199 
   1200     def _offload_expired_job(self, days_old):
   1201         """Make calls to `enqueue_offload()` for a just-expired job.
   1202 
   1203         This method directly tests assertions F and G regarding
   1204         side-effects on `first_offload_start`.
   1205 
   1206         """
   1207         t0 = time.time()
   1208         self._offload_expired_once(days_old, 1)
   1209         t1 = self._job.first_offload_start
   1210         self.assertLessEqual(t1, time.time())
   1211         self.assertGreaterEqual(t1, t0)
   1212         self._offload_expired_once(days_old, 2)
   1213         self.assertEqual(self._job.first_offload_start, t1)
   1214         self._offload_expired_once(days_old, 3)
   1215         self.assertEqual(self._job.first_offload_start, t1)
   1216 
   1217 
   1218     def test_case_1_no_expiration(self):
   1219         """Test a series of `enqueue_offload()` calls with `days_old` of 0.
   1220 
   1221         This tests that offload works as expected if calls are
   1222         made both before and after the job becomes expired.
   1223 
   1224         """
   1225         self._offload_unexpired_job(0)
   1226         self._job.set_finished(0)
   1227         self._offload_expired_job(0)
   1228 
   1229 
   1230     def test_case_2_no_expiration(self):
   1231         """Test a series of `enqueue_offload()` calls with `days_old` of 0.
   1232 
   1233         This tests that offload works as expected if calls are made
   1234         only after the job becomes expired.
   1235 
   1236         """
   1237         self._job.set_finished(0)
   1238         self._offload_expired_job(0)
   1239 
   1240 
   1241     def test_case_1_with_expiration(self):
   1242         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1243 
   1244         This tests that offload works as expected if calls are made
   1245         before the job finishes, before the job expires, and after
   1246         the job expires.
   1247 
   1248         """
   1249         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1250         self._job.set_finished(_TEST_EXPIRATION_AGE)
   1251         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1252         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1253         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1254 
   1255 
   1256     def test_case_2_with_expiration(self):
   1257         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1258 
   1259         This tests that offload works as expected if calls are made
   1260         between finishing and expiration, and after the job expires.
   1261 
   1262         """
   1263         self._job.set_finished(_TEST_EXPIRATION_AGE)
   1264         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1265         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1266         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1267 
   1268 
   1269     def test_case_3_with_expiration(self):
   1270         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1271 
   1272         This tests that offload works as expected if calls are made
   1273         only before finishing and after expiration.
   1274 
   1275         """
   1276         self._offload_unexpired_job(_TEST_EXPIRATION_AGE)
   1277         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1278         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1279 
   1280 
   1281     def test_case_4_with_expiration(self):
   1282         """Test a series of `enqueue_offload()` calls with `days_old` non-zero.
   1283 
   1284         This tests that offload works as expected if calls are made
   1285         only after expiration.
   1286 
   1287         """
   1288         self._job.set_expired(_TEST_EXPIRATION_AGE)
   1289         self._offload_expired_job(_TEST_EXPIRATION_AGE)
   1290 
   1291 
   1292 class GetJobDirectoriesTests(_TempResultsDirTestBase):
   1293     """Tests for `_JobDirectory.get_job_directories()`."""
   1294 
   1295     def setUp(self):
   1296         super(GetJobDirectoriesTests, self).setUp()
   1297         self.make_job_hierarchy()
   1298         os.mkdir('not-a-job')
   1299         open('not-a-dir', 'w').close()
   1300 
   1301 
   1302     def _run_get_directories(self, cls, expected_list):
   1303         """Test `get_job_directories()` for the given class.
   1304 
   1305         Calls the method, and asserts that the returned list of
   1306         directories matches the expected return value.
   1307 
   1308         @param expected_list Expected return value from the call.
   1309         """
   1310         dirlist = cls.get_job_directories()
   1311         self.assertEqual(set(dirlist), set(expected_list))
   1312 
   1313 
   1314     def test_get_regular_jobs(self):
   1315         """Test `RegularJobDirectory.get_job_directories()`."""
   1316         self._run_get_directories(job_directories.RegularJobDirectory,
   1317                                   self.REGULAR_JOBLIST)
   1318 
   1319 
   1320     def test_get_special_jobs(self):
   1321         """Test `SpecialJobDirectory.get_job_directories()`."""
   1322         self._run_get_directories(job_directories.SpecialJobDirectory,
   1323                                   self.SPECIAL_JOBLIST)
   1324 
   1325 
   1326 class AddJobsTests(_TempResultsDirTestBase):
   1327     """Tests for `Offloader._add_new_jobs()`."""
   1328 
   1329     MOREJOBS = ['115-fubar', '116-fubar', '117-fubar', '118-snafu']
   1330 
   1331     def setUp(self):
   1332         super(AddJobsTests, self).setUp()
   1333         self._initial_job_names = (
   1334             set(self.REGULAR_JOBLIST) | set(self.SPECIAL_JOBLIST))
   1335         self.make_job_hierarchy()
   1336         self._offloader = gs_offloader.Offloader(_get_options(['-a']))
   1337         self.mox.StubOutWithMock(logging, 'debug')
   1338 
   1339 
   1340     def _run_add_new_jobs(self, expected_key_set):
   1341         """Basic test assertions for `_add_new_jobs()`.
   1342 
   1343         Asserts the following:
   1344           * The keys in the offloader's `_open_jobs` dictionary
   1345             matches the expected set of keys.
   1346           * For every job in `_open_jobs`, the job has the expected
   1347             directory name.
   1348 
   1349         """
   1350         count = len(expected_key_set) - len(self._offloader._open_jobs)
   1351         logging.debug(mox.IgnoreArg(), count)
   1352         self.mox.ReplayAll()
   1353         self._offloader._add_new_jobs()
   1354         self.assertEqual(expected_key_set,
   1355                          set(self._offloader._open_jobs.keys()))
   1356         for jobkey, job in self._offloader._open_jobs.items():
   1357             self.assertEqual(jobkey, job.dirname)
   1358         self.mox.VerifyAll()
   1359         self.mox.ResetAll()
   1360 
   1361 
   1362     def test_add_jobs_empty(self):
   1363         """Test adding jobs to an empty dictionary.
   1364 
   1365         Calls the offloader's `_add_new_jobs()`, then perform
   1366         the assertions of `self._check_open_jobs()`.
   1367 
   1368         """
   1369         self._run_add_new_jobs(self._initial_job_names)
   1370 
   1371 
   1372     def test_add_jobs_non_empty(self):
   1373         """Test adding jobs to a non-empty dictionary.
   1374 
   1375         Calls the offloader's `_add_new_jobs()` twice; once from
   1376         initial conditions, and then again after adding more
   1377         directories.  After the second call, perform the assertions
   1378         of `self._check_open_jobs()`.  Additionally, assert that
   1379         keys added by the first call still map to their original
   1380         job object after the second call.
   1381 
   1382         """
   1383         self._run_add_new_jobs(self._initial_job_names)
   1384         jobs_copy = self._offloader._open_jobs.copy()
   1385         for d in self.MOREJOBS:
   1386             os.mkdir(d)
   1387         self._run_add_new_jobs(self._initial_job_names |
   1388                                  set(self.MOREJOBS))
   1389         for key in jobs_copy.keys():
   1390             self.assertIs(jobs_copy[key],
   1391                           self._offloader._open_jobs[key])
   1392 
   1393 
   1394 class ReportingTests(_TempResultsDirTestBase):
   1395     """Tests for `Offloader._report_failed_jobs()`."""
   1396 
   1397     def setUp(self):
   1398         super(ReportingTests, self).setUp()
   1399         self._offloader = gs_offloader.Offloader(_get_options([]))
   1400         self.mox.StubOutWithMock(self._offloader, '_log_failed_jobs_locally')
   1401         self.mox.StubOutWithMock(logging, 'debug')
   1402 
   1403 
   1404     def _add_job(self, jobdir):
   1405         """Add a job to the dictionary of unfinished jobs."""
   1406         j = self.make_job(jobdir)
   1407         self._offloader._open_jobs[j.dirname] = j
   1408         return j
   1409 
   1410 
   1411     def _expect_log_message(self, new_open_jobs, with_failures):
   1412         """Mock expected logging calls.
   1413 
   1414         `_report_failed_jobs()` logs one message with the number
   1415         of jobs removed from the open job set and the number of jobs
   1416         still remaining.  Additionally, if there are reportable
   1417         jobs, then it logs the number of jobs that haven't yet
   1418         offloaded.
   1419 
   1420         This sets up the logging calls using `new_open_jobs` to
   1421         figure the job counts.  If `with_failures` is true, then
   1422         the log message is set up assuming that all jobs in
   1423         `new_open_jobs` have offload failures.
   1424 
   1425         @param new_open_jobs New job set for calculating counts
   1426                              in the messages.
   1427         @param with_failures Whether the log message with a
   1428                              failure count is expected.
   1429 
   1430         """
   1431         count = len(self._offloader._open_jobs) - len(new_open_jobs)
   1432         logging.debug(mox.IgnoreArg(), count, len(new_open_jobs))
   1433         if with_failures:
   1434             logging.debug(mox.IgnoreArg(), len(new_open_jobs))
   1435 
   1436 
   1437     def _run_update(self, new_open_jobs):
   1438         """Call `_report_failed_jobs()`.
   1439 
   1440         Initial conditions are set up by the caller.  This calls
   1441         `_report_failed_jobs()` once, and then checks these
   1442         assertions:
   1443           * The offloader's new `_open_jobs` field contains only
   1444             the entries in `new_open_jobs`.
   1445 
   1446         @param new_open_jobs A dictionary representing the expected
   1447                              new value of the offloader's
   1448                              `_open_jobs` field.
   1449         """
   1450         self.mox.ReplayAll()
   1451         self._offloader._report_failed_jobs()
   1452         self._offloader._remove_offloaded_jobs()
   1453         self.assertEqual(self._offloader._open_jobs, new_open_jobs)
   1454         self.mox.VerifyAll()
   1455         self.mox.ResetAll()
   1456 
   1457 
   1458     def _expect_failed_jobs(self, failed_jobs):
   1459         """Mock expected call to log the failed jobs on local disk.
   1460 
   1461         TODO(crbug.com/686904): The fact that we have to mock an internal
   1462         function for this test is evidence that we need to pull out the local
   1463         file formatter in its own object in a future CL.
   1464 
   1465         @param failed_jobs: The list of jobs being logged as failed.
   1466         """
   1467         self._offloader._log_failed_jobs_locally(failed_jobs)
   1468 
   1469 
   1470     def test_no_jobs(self):
   1471         """Test `_report_failed_jobs()` with no open jobs.
   1472 
   1473         Initial conditions are an empty `_open_jobs` list.
   1474         Expected result is an empty `_open_jobs` list.
   1475 
   1476         """
   1477         self._expect_log_message({}, False)
   1478         self._expect_failed_jobs([])
   1479         self._run_update({})
   1480 
   1481 
   1482     def test_all_completed(self):
   1483         """Test `_report_failed_jobs()` with only complete jobs.
   1484 
   1485         Initial conditions are an `_open_jobs` list consisting of only completed
   1486         jobs.
   1487         Expected result is an empty `_open_jobs` list.
   1488 
   1489         """
   1490         for d in self.REGULAR_JOBLIST:
   1491             self._add_job(d).set_complete()
   1492         self._expect_log_message({}, False)
   1493         self._expect_failed_jobs([])
   1494         self._run_update({})
   1495 
   1496 
   1497     def test_none_finished(self):
   1498         """Test `_report_failed_jobs()` with only unfinished jobs.
   1499 
   1500         Initial conditions are an `_open_jobs` list consisting of only
   1501         unfinished jobs.
   1502         Expected result is no change to the `_open_jobs` list.
   1503 
   1504         """
   1505         for d in self.REGULAR_JOBLIST:
   1506             self._add_job(d)
   1507         new_jobs = self._offloader._open_jobs.copy()
   1508         self._expect_log_message(new_jobs, False)
   1509         self._expect_failed_jobs([])
   1510         self._run_update(new_jobs)
   1511 
   1512 
   1513 class GsOffloaderMockTests(_TempResultsDirTestCase):
   1514     """Tests using mock instead of mox."""
   1515 
   1516     def setUp(self):
   1517         super(GsOffloaderMockTests, self).setUp()
   1518         alarm = mock.patch('signal.alarm', return_value=0)
   1519         alarm.start()
   1520         self.addCleanup(alarm.stop)
   1521 
   1522         self._saved_loglevel = logging.getLogger().getEffectiveLevel()
   1523         logging.getLogger().setLevel(logging.CRITICAL + 1)
   1524 
   1525         self._job = self.make_job(self.REGULAR_JOBLIST[0])
   1526 
   1527 
   1528     def test_offload_timeout_early(self):
   1529         """Test that `offload_dir()` times out correctly.
   1530 
   1531         This test triggers timeout at the earliest possible moment,
   1532         at the first call to set the timeout alarm.
   1533 
   1534         """
   1535         signal.alarm.side_effect = [0, timeout_util.TimeoutError('fubar')]
   1536         with mock.patch.object(gs_offloader, '_upload_cts_testresult',
   1537                                autospec=True) as upload:
   1538             upload.return_value = None
   1539             gs_offloader.GSOffloader(
   1540                     utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
   1541                             self._job.queue_args[0],
   1542                             self._job.queue_args[1],
   1543                             self._job.queue_args[2])
   1544             self.assertTrue(os.path.isdir(self._job.queue_args[0]))
   1545 
   1546 
   1547     # TODO(ayatane): This tests passes when run locally, but it fails
   1548     # when run on trybot.  I have no idea why, but the assert isdir
   1549     # fails.
   1550     #
   1551     # This test is also kind of redundant since we are using the timeout
   1552     # from chromite which has its own tests.
   1553     @unittest.skip('This fails on trybot')
   1554     def test_offload_timeout_late(self):
   1555         """Test that `offload_dir()` times out correctly.
   1556 
   1557         This test triggers timeout at the latest possible moment, at
   1558         the call to clear the timeout alarm.
   1559 
   1560         """
   1561         signal.alarm.side_effect = [0, 0, timeout_util.TimeoutError('fubar')]
   1562         with mock.patch.object(gs_offloader, '_upload_cts_testresult',
   1563                                autospec=True) as upload, \
   1564              mock.patch.object(gs_offloader, '_get_cmd_list',
   1565                                autospec=True) as get_cmd_list:
   1566             upload.return_value = None
   1567             get_cmd_list.return_value = ['test', '-d', self._job.queue_args[0]]
   1568             gs_offloader.GSOffloader(
   1569                     utils.DEFAULT_OFFLOAD_GSURI, False, 0).offload(
   1570                             self._job.queue_args[0],
   1571                             self._job.queue_args[1],
   1572                             self._job.queue_args[2])
   1573             self.assertTrue(os.path.isdir(self._job.queue_args[0]))
   1574 
   1575 
   1576 
# Run the tests in this module via unittest when executed as a script.
if __name__ == '__main__':
    unittest.main()
   1579