Home | History | Annotate | Download | only in site_utils
      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Script to archive old Autotest results to Google Storage.
      8 
      9 Uses gsutil to archive files to the configured Google Storage bucket.
     10 Upon successful copy, the local results directory is deleted.
     11 """
     12 
     13 import abc
     14 try:
     15   import cachetools
     16 except ImportError:
     17   cachetools = None
     18 import datetime
     19 import errno
     20 import glob
     21 import gzip
     22 import logging
     23 import logging.handlers
     24 import os
     25 import re
     26 import shutil
     27 import stat
     28 import subprocess
     29 import sys
     30 import tarfile
     31 import tempfile
     32 import time
     33 
     34 from optparse import OptionParser
     35 
     36 import common
     37 from autotest_lib.client.common_lib import file_utils
     38 from autotest_lib.client.common_lib import global_config
     39 from autotest_lib.client.common_lib import utils
     40 from autotest_lib.site_utils import job_directories
     41 # For unittest, the cloud_console.proto is not compiled yet.
     42 try:
     43     from autotest_lib.site_utils import cloud_console_client
     44 except ImportError:
     45     cloud_console_client = None
     46 from autotest_lib.tko import models
     47 from autotest_lib.utils import labellib
     48 from autotest_lib.utils import gslib
     49 from chromite.lib import timeout_util
     50 
     51 # Autotest requires the psutil module from site-packages, so it must be imported
     52 # after "import common".
     53 try:
     54     # Does not exist, nor is needed, on moblab.
     55     import psutil
     56 except ImportError:
     57     psutil = None
     58 
     59 from chromite.lib import parallel
     60 try:
     61     from chromite.lib import metrics
     62     from chromite.lib import ts_mon_config
     63 except ImportError:
     64     metrics = utils.metrics_mock
     65     ts_mon_config = utils.metrics_mock
     66 
     67 
     68 GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
     69         'CROS', 'gs_offloading_enabled', type=bool, default=True)
     70 
     71 # Nice setting for process, the higher the number the lower the priority.
     72 NICENESS = 10
     73 
     74 # Maximum number of seconds to allow for offloading a single
     75 # directory.
     76 OFFLOAD_TIMEOUT_SECS = 60 * 60
     77 
     78 # Sleep time per loop.
     79 SLEEP_TIME_SECS = 5
     80 
     81 # Minimum number of seconds between e-mail reports.
     82 REPORT_INTERVAL_SECS = 60 * 60
     83 
     84 # Location of Autotest results on disk.
     85 RESULTS_DIR = '/usr/local/autotest/results'
     86 FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')
     87 
     88 FAILED_OFFLOADS_FILE_HEADER = '''
     89 This is the list of gs_offloader failed jobs.
     90 Last offloader attempt at %s failed to offload %d files.
     91 Check http://go/cros-triage-gsoffloader to triage the issue
     92 
     93 
     94 First failure       Count   Directory name
     95 =================== ======  ==============================
     96 '''
     97 # --+----1----+----  ----+  ----+----1----+----2----+----3
     98 
     99 FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
    100 FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
    101 
    102 USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
    103         'CROS', 'gs_offloader_use_rsync', type=bool, default=False)
    104 
    105 LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
    106         'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)
    107 
    108 # Use multiprocessing for gsutil uploading.
    109 GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
    110         'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)
    111 
    112 D = '[0-9][0-9]'
    113 TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
    114 CTS_RESULT_PATTERN = 'testResult.xml'
    115 CTS_V2_RESULT_PATTERN = 'test_result.xml'
    116 # Google Storage bucket URI to store results in.
    117 DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
    118         'CROS', 'cts_results_server', default='')
    119 DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
    120         'CROS', 'cts_apfe_server', default='')
    121 DEFAULT_CTS_DELTA_RESULTS_GSURI = global_config.global_config.get_config_value(
    122         'CROS', 'ctsdelta_results_server', default='')
    123 DEFAULT_CTS_DELTA_APFE_GSURI = global_config.global_config.get_config_value(
    124         'CROS', 'ctsdelta_apfe_server', default='')
    125 DEFAULT_CTS_BVT_APFE_GSURI = global_config.global_config.get_config_value(
    126         'CROS', 'ctsbvt_apfe_server', default='')
    127 
    128 # metadata type
    129 GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
    130 GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'
    131 
    132 # Autotest test to collect list of CTS tests
    133 TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'
    134 
    135 def _get_metrics_fields(dir_entry):
    136     """Get metrics fields for the given test result directory, including board
    137     and milestone.
    138 
    139     @param dir_entry: Directory entry to offload.
    140     @return A dictionary for the metrics data to be uploaded.
    141     """
    142     fields = {'board': 'unknown',
    143               'milestone': 'unknown'}
    144     if dir_entry:
    145         # There could be multiple hosts in the job directory, use the first one
    146         # available.
    147         for host in glob.glob(os.path.join(dir_entry, '*')):
    148             try:
    149                 keyval = models.test.parse_job_keyval(host)
    150             except ValueError:
    151                 continue
    152             build = keyval.get('build')
    153             if build:
    154                 try:
    155                     cros_version = labellib.parse_cros_version(build)
    156                     fields['board'] = cros_version.board
    157                     fields['milestone'] = cros_version.milestone
    158                     break
    159                 except ValueError:
    160                     # Ignore version parsing error so it won't crash
    161                     # gs_offloader.
    162                     pass
    163 
    164     return fields;
    165 
    166 
    167 def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    168     """Return the command to offload a specified directory.
    169 
    170     @param multiprocessing: True to turn on -m option for gsutil.
    171     @param dir_entry: Directory entry/path that which we need a cmd_list
    172                       to offload.
    173     @param gs_path: Location in google storage where we will
    174                     offload the directory.
    175 
    176     @return A command list to be executed by Popen.
    177     """
    178     cmd = ['gsutil']
    179     if multiprocessing:
    180         cmd.append('-m')
    181     if USE_RSYNC_ENABLED:
    182         cmd.append('rsync')
    183         target = os.path.join(gs_path, os.path.basename(dir_entry))
    184     else:
    185         cmd.append('cp')
    186         target = gs_path
    187     cmd += ['-eR', dir_entry, target]
    188     return cmd
    189 
    190 
    191 def sanitize_dir(dirpath):
    192     """Sanitize directory for gs upload.
    193 
    194     Symlinks and FIFOS are converted to regular files to fix bugs.
    195 
    196     @param dirpath: Directory entry to be sanitized.
    197     """
    198     if not os.path.exists(dirpath):
    199         return
    200     _escape_rename(dirpath)
    201     _escape_rename_dir_contents(dirpath)
    202     _sanitize_fifos(dirpath)
    203     _sanitize_symlinks(dirpath)
    204 
    205 
    206 def _escape_rename_dir_contents(dirpath):
    207     """Recursively rename directory to escape filenames for gs upload.
    208 
    209     @param dirpath: Directory path string.
    210     """
    211     for filename in os.listdir(dirpath):
    212         path = os.path.join(dirpath, filename)
    213         _escape_rename(path)
    214     for filename in os.listdir(dirpath):
    215         path = os.path.join(dirpath, filename)
    216         if os.path.isdir(path):
    217             _escape_rename_dir_contents(path)
    218 
    219 
    220 def _escape_rename(path):
    221     """Rename file to escape filenames for gs upload.
    222 
    223     @param path: File path string.
    224     """
    225     dirpath, filename = os.path.split(path)
    226     sanitized_filename = gslib.escape(filename)
    227     sanitized_path = os.path.join(dirpath, sanitized_filename)
    228     os.rename(path, sanitized_path)
    229 
    230 
    231 def _sanitize_fifos(dirpath):
    232     """Convert fifos to regular files (fixes crbug.com/684122).
    233 
    234     @param dirpath: Directory path string.
    235     """
    236     for root, _, files in os.walk(dirpath):
    237         for filename in files:
    238             path = os.path.join(root, filename)
    239             file_stat = os.lstat(path)
    240             if stat.S_ISFIFO(file_stat.st_mode):
    241                 _replace_fifo_with_file(path)
    242 
    243 
    244 def _replace_fifo_with_file(path):
    245     """Replace a fifo with a normal file.
    246 
    247     @param path: Fifo path string.
    248     """
    249     logging.debug('Removing fifo %s', path)
    250     os.remove(path)
    251     logging.debug('Creating fifo marker %s', path)
    252     with open(path, 'w') as f:
    253         f.write('<FIFO>')
    254 
    255 
    256 def _sanitize_symlinks(dirpath):
    257     """Convert Symlinks to regular files (fixes crbug.com/692788).
    258 
    259     @param dirpath: Directory path string.
    260     """
    261     for root, _, files in os.walk(dirpath):
    262         for filename in files:
    263             path = os.path.join(root, filename)
    264             file_stat = os.lstat(path)
    265             if stat.S_ISLNK(file_stat.st_mode):
    266                 _replace_symlink_with_file(path)
    267 
    268 
    269 def _replace_symlink_with_file(path):
    270     """Replace a symlink with a normal file.
    271 
    272     @param path: Symlink path string.
    273     """
    274     target = os.readlink(path)
    275     logging.debug('Removing symlink %s', path)
    276     os.remove(path)
    277     logging.debug('Creating symlink marker %s', path)
    278     with open(path, 'w') as f:
    279         f.write('<symlink to %s>' % target)
    280 
    281 
    282 # Maximum number of files in the folder.
    283 _MAX_FILE_COUNT = 3000
    284 _FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
    285 
    286 
    287 def _get_zippable_folders(dir_entry):
    288     folders_list = []
    289     for folder in os.listdir(dir_entry):
    290         folder_path = os.path.join(dir_entry, folder)
    291         if (not os.path.isfile(folder_path) and
    292                 not folder in _FOLDERS_NEVER_ZIP):
    293             folders_list.append(folder_path)
    294     return folders_list
    295 
    296 
    297 def limit_file_count(dir_entry):
    298     """Limit the number of files in given directory.
    299 
    300     The method checks the total number of files in the given directory.
    301     If the number is greater than _MAX_FILE_COUNT, the method will
    302     compress each folder in the given directory, except folders in
    303     _FOLDERS_NEVER_ZIP.
    304 
    305     @param dir_entry: Directory entry to be checked.
    306     """
    307     try:
    308         count = _count_files(dir_entry)
    309     except ValueError:
    310         logging.warning('Fail to get the file count in folder %s.', dir_entry)
    311         return
    312     if count < _MAX_FILE_COUNT:
    313         return
    314 
    315     # For test job, zip folders in a second level, e.g. 123-debug/host1.
    316     # This is to allow autoserv debug folder still be accessible.
    317     # For special task, it does not need to dig one level deeper.
    318     is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
    319                                dir_entry)
    320 
    321     folders = _get_zippable_folders(dir_entry)
    322     if not is_special_task:
    323         subfolders = []
    324         for folder in folders:
    325             subfolders.extend(_get_zippable_folders(folder))
    326         folders = subfolders
    327 
    328     for folder in folders:
    329         _make_into_tarball(folder)
    330 
    331 
    332 def _count_files(dirpath):
    333     """Count the number of files in a directory recursively.
    334 
    335     @param dirpath: Directory path string.
    336     """
    337     return sum(len(files) for _path, _dirs, files in os.walk(dirpath))
    338 
    339 
    340 def _make_into_tarball(dirpath):
    341     """Make directory into tarball.
    342 
    343     @param dirpath: Directory path string.
    344     """
    345     tarpath = '%s.tgz' % dirpath
    346     with tarfile.open(tarpath, 'w:gz') as tar:
    347         tar.add(dirpath, arcname=os.path.basename(dirpath))
    348     shutil.rmtree(dirpath)
    349 
    350 
    351 def correct_results_folder_permission(dir_entry):
    352     """Make sure the results folder has the right permission settings.
    353 
    354     For tests running with server-side packaging, the results folder has
    355     the owner of root. This must be changed to the user running the
    356     autoserv process, so parsing job can access the results folder.
    357 
    358     @param dir_entry: Path to the results folder.
    359     """
    360     if not dir_entry:
    361         return
    362 
    363     logging.info('Trying to correct file permission of %s.', dir_entry)
    364     try:
    365         owner = '%s:%s' % (os.getuid(), os.getgid())
    366         subprocess.check_call(
    367                 ['sudo', '-n', 'chown', '-R', owner, dir_entry])
    368         subprocess.check_call(['chmod', '-R', 'u+r', dir_entry])
    369         subprocess.check_call(
    370                 ['find', dir_entry, '-type', 'd',
    371                  '-exec', 'chmod', 'u+x', '{}', ';'])
    372     except subprocess.CalledProcessError as e:
    373         logging.error('Failed to modify permission for %s: %s',
    374                       dir_entry, e)
    375 
    376 
    377 def _upload_cts_testresult(dir_entry, multiprocessing):
    378     """Upload test results to separate gs buckets.
    379 
    380     Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    381     Upload timestamp.zip to cts_apfe_bucket.
    382 
    383     @param dir_entry: Path to the results folder.
    384     @param multiprocessing: True to turn on -m option for gsutil.
    385     """
    386     for host in glob.glob(os.path.join(dir_entry, '*')):
    387         cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
    388                                 TIMESTAMP_PATTERN)
    389         cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
    390                                    TIMESTAMP_PATTERN)
    391         gts_v2_path = os.path.join(host, 'cheets_GTS*', 'results', '*',
    392                                    TIMESTAMP_PATTERN)
    393         for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
    394                             (cts_v2_path, CTS_V2_RESULT_PATTERN),
    395                             (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
    396             for path in glob.glob(result_path):
    397                 try:
    398                     # CTS results from bvt-arc suites need to be only uploaded
    399                     # to APFE from its designated gs bucket for early EDI
    400                     # entries in APFE. These results need to copied only into
    401                     # APFE bucket. Copying to results bucket is not required.
    402                     if 'bvt-arc' in path:
    403                         _upload_files(host, path, result_pattern,
    404                                       multiprocessing,
    405                                       None,
    406                                       DEFAULT_CTS_BVT_APFE_GSURI)
    407                         return
    408                     # Non-bvt CTS results need to be uploaded to standard gs
    409                     # buckets.
    410                     _upload_files(host, path, result_pattern,
    411                                   multiprocessing,
    412                                   DEFAULT_CTS_RESULTS_GSURI,
    413                                   DEFAULT_CTS_APFE_GSURI)
    414                     # TODO(rohitbm): make better comparison using regex.
    415                     # plan_follower CTS results go to plan_follower specific
    416                     # gs buckets apart from standard gs buckets.
    417                     if 'plan_follower' in path:
    418                         _upload_files(host, path, result_pattern,
    419                                       multiprocessing,
    420                                       DEFAULT_CTS_DELTA_RESULTS_GSURI,
    421                                       DEFAULT_CTS_DELTA_APFE_GSURI)
    422                 except Exception as e:
    423                     logging.error('ERROR uploading test results %s to GS: %s',
    424                                   path, e)
    425 
    426 
    427 def _is_valid_result(build, result_pattern, suite):
    428     """Check if the result should be uploaded to CTS/GTS buckets.
    429 
    430     @param build: Builder name.
    431     @param result_pattern: XML result file pattern.
    432     @param suite: Test suite name.
    433 
    434     @returns: Bool flag indicating whether a valid result.
    435     """
    436     if build is None or suite is None:
    437         return False
    438 
    439     # Not valid if it's not a release build.
    440     if not re.match(r'(?!trybot-).*-release/.*', build):
    441         return False
    442 
    443     # Not valid if it's cts result but not 'arc-cts*' or 'test_that_wrapper'
    444     # suite.
    445     result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    446     if result_pattern in result_patterns and not (
    447             suite.startswith('arc-cts') or
    448             suite.startswith('arc-gts') or
    449             suite.startswith('bvt-arc') or
    450             suite.startswith('test_that_wrapper')):
    451         return False
    452 
    453     return True
    454 
    455 
    456 def _is_test_collector(package):
    457     """Returns true if the test run is just to collect list of CTS tests.
    458 
    459     @param package: Autotest package name. e.g. cheets_CTS_N.CtsGraphicsTestCase
    460 
    461     @return Bool flag indicating a test package is CTS list generator or not.
    462     """
    463     return TEST_LIST_COLLECTOR in package
    464 
    465 
    466 def _upload_files(host, path, result_pattern, multiprocessing,
    467                   result_gs_bucket, apfe_gs_bucket):
    468     keyval = models.test.parse_job_keyval(host)
    469     build = keyval.get('build')
    470     suite = keyval.get('suite')
    471 
    472     if not _is_valid_result(build, result_pattern, suite):
    473         # No need to upload current folder, return.
    474         return
    475 
    476     parent_job_id = str(keyval['parent_job_id'])
    477 
    478     folders = path.split(os.sep)
    479     job_id = folders[-6]
    480     package = folders[-4]
    481     timestamp = folders[-1]
    482 
    483     # Results produced by CTS test list collector are dummy results.
    484     # They don't need to be copied to APFE bucket which is mainly being used for
    485     # CTS APFE submission.
    486     if not _is_test_collector(package):
    487         # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
    488         # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
    489         cts_apfe_gs_path = os.path.join(
    490                 apfe_gs_bucket, build, parent_job_id,
    491                 package, job_id + '_' + timestamp) + '/'
    492 
    493         for zip_file in glob.glob(os.path.join('%s.zip' % path)):
    494             utils.run(' '.join(_get_cmd_list(
    495                     multiprocessing, zip_file, cts_apfe_gs_path)))
    496             logging.debug('Upload %s to %s ', zip_file, cts_apfe_gs_path)
    497     else:
    498         logging.debug('%s is a CTS Test collector Autotest test run.', package)
    499         logging.debug('Skipping CTS results upload to APFE gs:// bucket.')
    500 
    501     if result_gs_bucket:
    502         # Path: bucket/cheets_CTS.*/job_id_timestamp/
    503         # or bucket/cheets_GTS.*/job_id_timestamp/
    504         test_result_gs_path = os.path.join(
    505                 result_gs_bucket, package, job_id + '_' + timestamp) + '/'
    506 
    507         for test_result_file in glob.glob(os.path.join(path, result_pattern)):
    508             # gzip test_result_file(testResult.xml/test_result.xml)
    509 
    510             test_result_file_gz =  '%s.gz' % test_result_file
    511             with open(test_result_file, 'r') as f_in, (
    512                     gzip.open(test_result_file_gz, 'w')) as f_out:
    513                 shutil.copyfileobj(f_in, f_out)
    514             utils.run(' '.join(_get_cmd_list(
    515                     multiprocessing, test_result_file_gz, test_result_gs_path)))
    516             logging.debug('Zip and upload %s to %s',
    517                           test_result_file_gz, test_result_gs_path)
    518             # Remove test_result_file_gz(testResult.xml.gz/test_result.xml.gz)
    519             os.remove(test_result_file_gz)
    520 
    521 
    522 def _emit_gs_returncode_metric(returncode):
    523     """Increment the gs_returncode counter based on |returncode|."""
    524     m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    525     rcode = int(returncode)
    526     if rcode < 0 or rcode > 255:
    527         rcode = -1
    528     metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})
    529 
    530 
    531 def _handle_dir_os_error(dir_entry, fix_permission=False):
    532     """Try to fix the result directory's permission issue if needed.
    533 
    534     @param dir_entry: Directory entry to offload.
    535     @param fix_permission: True to change the directory's owner to the same one
    536             running gs_offloader.
    537     """
    538     if fix_permission:
    539         correct_results_folder_permission(dir_entry)
    540     m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
    541                           'wrong_permissions_count')
    542     metrics_fields = _get_metrics_fields(dir_entry)
    543     metrics.Counter(m_permission_error).increment(fields=metrics_fields)
    544 
    545 
    546 class BaseGSOffloader(object):
    547 
    548     """Google Storage offloader interface."""
    549 
    550     __metaclass__ = abc.ABCMeta
    551 
    552     def offload(self, dir_entry, dest_path, job_complete_time):
    553         """Safely offload a directory entry to Google Storage.
    554 
    555         This method is responsible for copying the contents of
    556         `dir_entry` to Google storage at `dest_path`.
    557 
    558         When successful, the method must delete all of `dir_entry`.
    559         On failure, `dir_entry` should be left undisturbed, in order
    560         to allow for retry.
    561 
    562         Errors are conveyed simply and solely by two methods:
    563           * At the time of failure, write enough information to the log
    564             to allow later debug, if necessary.
    565           * Don't delete the content.
    566 
    567         In order to guarantee robustness, this method must not raise any
    568         exceptions.
    569 
    570         @param dir_entry: Directory entry to offload.
    571         @param dest_path: Location in google storage where we will
    572                           offload the directory.
    573         @param job_complete_time: The complete time of the job from the AFE
    574                                   database.
    575         """
    576         try:
    577             self._full_offload(dir_entry, dest_path, job_complete_time)
    578         except Exception as e:
    579             logging.debug('Exception in offload for %s', dir_entry)
    580             logging.debug('Ignoring this error: %s', str(e))
    581 
    582     @abc.abstractmethod
    583     def _full_offload(self, dir_entry, dest_path, job_complete_time):
    584         """Offload a directory entry to Google Storage.
    585 
    586         This method implements the actual offload behavior of its
    587         subclass.  To guarantee effective debug, this method should
    588         catch all exceptions, and perform any reasonable diagnosis
    589         or other handling.
    590 
    591         @param dir_entry: Directory entry to offload.
    592         @param dest_path: Location in google storage where we will
    593                           offload the directory.
    594         @param job_complete_time: The complete time of the job from the AFE
    595                                   database.
    596         """
    597 
    598 
    599 class GSOffloader(BaseGSOffloader):
    600     """Google Storage Offloader."""
    601 
    602     def __init__(self, gs_uri, multiprocessing, delete_age,
    603             console_client=None):
    604         """Returns the offload directory function for the given gs_uri
    605 
    606         @param gs_uri: Google storage bucket uri to offload to.
    607         @param multiprocessing: True to turn on -m option for gsutil.
    608         @param console_client: The cloud console client. If None,
    609           cloud console APIs are  not called.
    610         """
    611         self._gs_uri = gs_uri
    612         self._multiprocessing = multiprocessing
    613         self._delete_age = delete_age
    614         self._console_client = console_client
    615 
    616     @metrics.SecondsTimerDecorator(
    617             'chromeos/autotest/gs_offloader/job_offload_duration')
    618     def _full_offload(self, dir_entry, dest_path, job_complete_time):
    619         """Offload the specified directory entry to Google storage.
    620 
    621         @param dir_entry: Directory entry to offload.
    622         @param dest_path: Location in google storage where we will
    623                           offload the directory.
    624         @param job_complete_time: The complete time of the job from the AFE
    625                                   database.
    626         """
    627         with tempfile.TemporaryFile('w+') as stdout_file, \
    628              tempfile.TemporaryFile('w+') as stderr_file:
    629             try:
    630                 try:
    631                     self._try_offload(dir_entry, dest_path, stdout_file,
    632                                       stderr_file)
    633                 except OSError as e:
    634                     # Correct file permission error of the directory, then raise
    635                     # the exception so gs_offloader can retry later.
    636                     _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
    637                     # Try again after the permission issue is fixed.
    638                     self._try_offload(dir_entry, dest_path, stdout_file,
    639                                       stderr_file)
    640             except _OffloadError as e:
    641                 metrics_fields = _get_metrics_fields(dir_entry)
    642                 m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
    643                 metrics.Counter(m_any_error).increment(fields=metrics_fields)
    644 
    645                 # Rewind the log files for stdout and stderr and log
    646                 # their contents.
    647                 stdout_file.seek(0)
    648                 stderr_file.seek(0)
    649                 stderr_content = stderr_file.read()
    650                 logging.warning('Error occurred when offloading %s:', dir_entry)
    651                 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
    652                                 stderr_content)
    653 
    654                 # Some result files may have wrong file permission. Try
    655                 # to correct such error so later try can success.
    656                 # TODO(dshi): The code is added to correct result files
    657                 # with wrong file permission caused by bug 511778. After
    658                 # this code is pushed to lab and run for a while to
    659                 # clean up these files, following code and function
    660                 # correct_results_folder_permission can be deleted.
    661                 if 'CommandException: Error opening file' in stderr_content:
    662                     correct_results_folder_permission(dir_entry)
    663             else:
    664                 self._prune(dir_entry, job_complete_time)
    665 
    666     def _try_offload(self, dir_entry, dest_path,
    667                  stdout_file, stderr_file):
    668         """Offload the specified directory entry to Google storage.
    669 
    670         @param dir_entry: Directory entry to offload.
    671         @param dest_path: Location in google storage where we will
    672                           offload the directory.
    673         @param job_complete_time: The complete time of the job from the AFE
    674                                   database.
    675         @param stdout_file: Log file.
    676         @param stderr_file: Log file.
    677         """
    678         if _is_uploaded(dir_entry):
    679             return
    680         start_time = time.time()
    681         metrics_fields = _get_metrics_fields(dir_entry)
    682         error_obj = _OffloadError(start_time)
    683         try:
    684             sanitize_dir(dir_entry)
    685             if DEFAULT_CTS_RESULTS_GSURI:
    686                 _upload_cts_testresult(dir_entry, self._multiprocessing)
    687 
    688             if LIMIT_FILE_COUNT:
    689                 limit_file_count(dir_entry)
    690 
    691             process = None
    692             with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
    693                 gs_path = '%s%s' % (self._gs_uri, dest_path)
    694                 cmd = _get_cmd_list(self._multiprocessing, dir_entry, gs_path)
    695                 logging.debug('Attempting an offload command %s', cmd)
    696                 process = subprocess.Popen(
    697                     cmd, stdout=stdout_file, stderr=stderr_file)
    698                 process.wait()
    699                 logging.debug('Offload command %s completed.', cmd)
    700 
    701             _emit_gs_returncode_metric(process.returncode)
    702             if process.returncode != 0:
    703                 raise error_obj
    704             _emit_offload_metrics(dir_entry)
    705 
    706             if self._console_client:
    707                 gcs_uri = os.path.join(gs_path,
    708                         os.path.basename(dir_entry))
    709                 if not self._console_client.send_test_job_offloaded_message(
    710                         gcs_uri):
    711                     raise error_obj
    712 
    713             _mark_uploaded(dir_entry)
    714         except timeout_util.TimeoutError:
    715             m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
    716             metrics.Counter(m_timeout).increment(fields=metrics_fields)
    717             # If we finished the call to Popen(), we may need to
    718             # terminate the child process.  We don't bother calling
    719             # process.poll(); that inherently races because the child
    720             # can die any time it wants.
    721             if process:
    722                 try:
    723                     process.terminate()
    724                 except OSError:
    725                     # We don't expect any error other than "No such
    726                     # process".
    727                     pass
    728             logging.error('Offloading %s timed out after waiting %d '
    729                           'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
    730             raise error_obj
    731 
    732     def _prune(self, dir_entry, job_complete_time):
    733         """Prune directory if it is uploaded and expired.
    734 
    735         @param dir_entry: Directory entry to offload.
    736         @param job_complete_time: The complete time of the job from the AFE
    737                                   database.
    738         """
    739         if not (_is_uploaded(dir_entry)
    740                 and job_directories.is_job_expired(self._delete_age,
    741                                                    job_complete_time)):
    742             return
    743         try:
    744             logging.debug('Pruning uploaded directory %s', dir_entry)
    745             shutil.rmtree(dir_entry)
    746             job_timestamp_cache.delete(dir_entry)
    747         except OSError as e:
    748             # The wrong file permission can lead call `shutil.rmtree(dir_entry)`
    749             # to raise OSError with message 'Permission denied'. Details can be
    750             # found in crbug.com/536151
    751             _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
    752             # Try again after the permission issue is fixed.
    753             shutil.rmtree(dir_entry)
    754 
    755 
    756 class _OffloadError(Exception):
    757     """Google Storage offload failed."""
    758 
    759     def __init__(self, start_time):
    760         super(_OffloadError, self).__init__(start_time)
    761         self.start_time = start_time
    762 
    763 
    764 
    765 class FakeGSOffloader(BaseGSOffloader):
    766 
    767     """Fake Google Storage Offloader that only deletes directories."""
    768 
    769     def _full_offload(self, dir_entry, dest_path, job_complete_time):
    770         """Pretend to offload a directory and delete it.
    771 
    772         @param dir_entry: Directory entry to offload.
    773         @param dest_path: Location in google storage where we will
    774                           offload the directory.
    775         @param job_complete_time: The complete time of the job from the AFE
    776                                   database.
    777         """
    778         shutil.rmtree(dir_entry)
    779 
    780 
    781 class OptionalMemoryCache(object):
    782    """Implements memory cache if cachetools module can be loaded.
    783 
    784    If the platform has cachetools available then the cache will
    785    be created, otherwise the get calls will always act as if there
    786    was a cache miss and the set/delete will be no-ops.
    787    """
    788    cache = None
    789 
    790    def setup(self, age_to_delete):
    791        """Set up a TTL cache size based on how long the job will be handled.
    792 
    793        Autotest jobs are handled by gs_offloader until they are deleted from
    794        local storage, base the cache size on how long that is.
    795 
    796        @param age_to_delete: Number of days after which items in the cache
    797                              should expire.
    798        """
    799        if cachetools:
    800            # Min cache is 1000 items for 10 mins. If the age to delete is 0
    801            # days you still want a short / small cache.
    802            # 2000 items is a good approximation for the max number of jobs a
    803            # moblab # can produce in a day, lab offloads immediatly so
    804            # the number of carried jobs should be very small in the normal
    805            # case.
    806            ttl = max(age_to_delete * 24 * 60 * 60, 600)
    807            maxsize = max(age_to_delete * 2000, 1000)
    808            job_timestamp_cache.cache = cachetools.TTLCache(maxsize=maxsize,
    809                                                            ttl=ttl)
    810 
    811    def get(self, key):
    812        """If we have a cache try to retrieve from it."""
    813        if self.cache is not None:
    814            result = self.cache.get(key)
    815            return result
    816        return None
    817 
    818    def add(self, key, value):
    819        """If we have a cache try to store key/value."""
    820        if self.cache is not None:
    821            self.cache[key] = value
    822 
    823    def delete(self, key):
    824        """If we have a cache try to remove a key."""
    825        if self.cache is not None:
    826            return self.cache.delete(key)
    827 
    828 
    829 job_timestamp_cache = OptionalMemoryCache()
    830 
    831 
    832 def _cached_get_timestamp_if_finished(job):
    833     """Retrieve a job finished timestamp from cache or AFE.
    834     @param job       _JobDirectory instance to retrieve
    835                      finished timestamp of..
    836 
    837     @returns: None if the job is not finished, or the
    838               last job finished time recorded by Autotest.
    839     """
    840     job_timestamp = job_timestamp_cache.get(job.dirname)
    841     if not job_timestamp:
    842         job_timestamp = job.get_timestamp_if_finished()
    843         if job_timestamp:
    844             job_timestamp_cache.add(job.dirname, job_timestamp)
    845     return job_timestamp
    846 
    847 
    848 def _is_expired(job, age_limit):
    849     """Return whether job directory is expired for uploading
    850 
    851     @param job: _JobDirectory instance.
    852     @param age_limit:  Minimum age in days at which a job may be offloaded.
    853     """
    854     job_timestamp = _cached_get_timestamp_if_finished(job)
    855     if not job_timestamp:
    856         return False
    857     return job_directories.is_job_expired(age_limit, job_timestamp)
    858 
    859 
    860 def _emit_offload_metrics(dirpath):
    861     """Emit gs offload metrics.
    862 
    863     @param dirpath: Offloaded directory path.
    864     """
    865     dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    866     metrics_fields = _get_metrics_fields(dirpath)
    867 
    868     m_offload_count = (
    869             'chromeos/autotest/gs_offloader/jobs_offloaded')
    870     metrics.Counter(m_offload_count).increment(
    871             fields=metrics_fields)
    872     m_offload_size = ('chromeos/autotest/gs_offloader/'
    873                       'kilobytes_transferred')
    874     metrics.Counter(m_offload_size).increment_by(
    875             dir_size, fields=metrics_fields)
    876 
    877 
    878 def _is_uploaded(dirpath):
    879     """Return whether directory has been uploaded.
    880 
    881     @param dirpath: Directory path string.
    882     """
    883     return os.path.isfile(_get_uploaded_marker_file(dirpath))
    884 
    885 
    886 def _mark_uploaded(dirpath):
    887     """Mark directory as uploaded.
    888 
    889     @param dirpath: Directory path string.
    890     """
    891     logging.debug('Creating uploaded marker for directory %s', dirpath)
    892     with open(_get_uploaded_marker_file(dirpath), 'a'):
    893         pass
    894 
    895 
    896 def _get_uploaded_marker_file(dirpath):
    897     """Return path to upload marker file for directory.
    898 
    899     @param dirpath: Directory path string.
    900     """
    901     return '%s/.GS_UPLOADED' % (dirpath,)
    902 
    903 
    904 def _format_job_for_failure_reporting(job):
    905     """Formats a _JobDirectory for reporting / logging.
    906 
    907     @param job: The _JobDirectory to format.
    908     """
    909     d = datetime.datetime.fromtimestamp(job.first_offload_start)
    910     data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
    911             job.offload_count,
    912             job.dirname)
    913     return FAILED_OFFLOADS_LINE_FORMAT % data
    914 
    915 
    916 def wait_for_gs_write_access(gs_uri):
    917     """Verify and wait until we have write access to Google Storage.
    918 
    919     @param gs_uri: The Google Storage URI we are trying to offload to.
    920     """
    921     # TODO (sbasi) Try to use the gsutil command to check write access.
    922     # Ensure we have write access to gs_uri.
    923     dummy_file = tempfile.NamedTemporaryFile()
    924     test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    925     while True:
    926         logging.debug('Checking for write access with dummy file %s',
    927                       dummy_file.name)
    928         try:
    929             subprocess.check_call(test_cmd)
    930             subprocess.check_call(
    931                     ['gsutil', 'rm',
    932                      os.path.join(gs_uri,
    933                                   os.path.basename(dummy_file.name))])
    934             break
    935         except subprocess.CalledProcessError:
    936             t = 120
    937             logging.debug('Unable to offload dummy file to %s, sleeping for %s '
    938                           'seconds.', gs_uri, t)
    939             time.sleep(t)
    940     logging.debug('Dummy file write check to gs succeeded.')
    941 
    942 
    943 class Offloader(object):
    944     """State of the offload process.
    945 
    946     Contains the following member fields:
    947       * _gs_offloader:  _BaseGSOffloader to use to offload a job directory.
    948       * _jobdir_classes:  List of classes of job directory to be
    949         offloaded.
    950       * _processes:  Maximum number of outstanding offload processes
    951         to allow during an offload cycle.
    952       * _age_limit:  Minimum age in days at which a job may be
    953         offloaded.
    954       * _open_jobs: a dictionary mapping directory paths to Job
    955         objects.
    956     """
    957 
    958     def __init__(self, options):
    959         self._upload_age_limit = options.age_to_upload
    960         self._delete_age_limit = options.age_to_delete
    961         if options.delete_only:
    962             self._gs_offloader = FakeGSOffloader()
    963         else:
    964             self.gs_uri = utils.get_offload_gsuri()
    965             logging.debug('Offloading to: %s', self.gs_uri)
    966             multiprocessing = False
    967             if options.multiprocessing:
    968                 multiprocessing = True
    969             elif options.multiprocessing is None:
    970                 multiprocessing = GS_OFFLOADER_MULTIPROCESSING
    971             logging.info(
    972                     'Offloader multiprocessing is set to:%r', multiprocessing)
    973             console_client = None
    974             if (cloud_console_client and
    975                     cloud_console_client.is_cloud_notification_enabled()):
    976                 console_client = cloud_console_client.PubSubBasedClient()
    977             self._gs_offloader = GSOffloader(
    978                     self.gs_uri, multiprocessing, self._delete_age_limit,
    979                     console_client)
    980         classlist = [
    981                 job_directories.SwarmingJobDirectory,
    982         ]
    983         if options.process_hosts_only or options.process_all:
    984             classlist.append(job_directories.SpecialJobDirectory)
    985         if not options.process_hosts_only:
    986             classlist.append(job_directories.RegularJobDirectory)
    987         self._jobdir_classes = classlist
    988         assert self._jobdir_classes
    989         self._processes = options.parallelism
    990         self._open_jobs = {}
    991         self._pusub_topic = None
    992         self._offload_count_limit = 3
    993 
    994 
    995     def _add_new_jobs(self):
    996         """Find new job directories that need offloading.
    997 
    998         Go through the file system looking for valid job directories
    999         that are currently not in `self._open_jobs`, and add them in.
   1000 
   1001         """
   1002         new_job_count = 0
   1003         for cls in self._jobdir_classes:
   1004             for resultsdir in cls.get_job_directories():
   1005                 if resultsdir in self._open_jobs:
   1006                     continue
   1007                 self._open_jobs[resultsdir] = cls(resultsdir)
   1008                 new_job_count += 1
   1009         logging.debug('Start of offload cycle - found %d new jobs',
   1010                       new_job_count)
   1011 
   1012 
   1013     def _remove_offloaded_jobs(self):
   1014         """Removed offloaded jobs from `self._open_jobs`."""
   1015         removed_job_count = 0
   1016         for jobkey, job in self._open_jobs.items():
   1017             if (
   1018                     not os.path.exists(job.dirname)
   1019                     or _is_uploaded(job.dirname)):
   1020                 del self._open_jobs[jobkey]
   1021                 removed_job_count += 1
   1022         logging.debug('End of offload cycle - cleared %d jobs, '
   1023                       'carrying %d open jobs',
   1024                       removed_job_count, len(self._open_jobs))
   1025 
   1026 
   1027     def _report_failed_jobs(self):
   1028         """Report status after attempting offload.
   1029 
   1030         This function processes all jobs in `self._open_jobs`, assuming
   1031         an attempt has just been made to offload all of them.
   1032 
   1033         If any jobs have reportable errors, and we haven't generated
   1034         an e-mail report in the last `REPORT_INTERVAL_SECS` seconds,
   1035         send new e-mail describing the failures.
   1036 
   1037         """
   1038         failed_jobs = [j for j in self._open_jobs.values() if
   1039                        j.first_offload_start]
   1040         self._report_failed_jobs_count(failed_jobs)
   1041         self._log_failed_jobs_locally(failed_jobs)
   1042 
   1043 
   1044     def offload_once(self):
   1045         """Perform one offload cycle.
   1046 
   1047         Find all job directories for new jobs that we haven't seen
   1048         before.  Then, attempt to offload the directories for any
   1049         jobs that have finished running.  Offload of multiple jobs
   1050         is done in parallel, up to `self._processes` at a time.
   1051 
   1052         After we've tried uploading all directories, go through the list
   1053         checking the status of all uploaded directories.  If necessary,
   1054         report failures via e-mail.
   1055 
   1056         """
   1057         self._add_new_jobs()
   1058         self._report_current_jobs_count()
   1059         with parallel.BackgroundTaskRunner(
   1060                 self._gs_offloader.offload, processes=self._processes) as queue:
   1061             for job in self._open_jobs.values():
   1062                 _enqueue_offload(job, queue, self._upload_age_limit)
   1063         self._give_up_on_jobs_over_limit()
   1064         self._remove_offloaded_jobs()
   1065         self._report_failed_jobs()
   1066 
   1067 
   1068     def _give_up_on_jobs_over_limit(self):
   1069         """Give up on jobs that have gone over the offload limit.
   1070 
   1071         We mark them as uploaded as we won't try to offload them any more.
   1072         """
   1073         for job in self._open_jobs.values():
   1074             if job.offload_count >= self._offload_count_limit:
   1075                 _mark_uploaded(job.dirname)
   1076 
   1077 
   1078     def _log_failed_jobs_locally(self, failed_jobs,
   1079                                  log_file=FAILED_OFFLOADS_FILE):
   1080         """Updates a local file listing all the failed jobs.
   1081 
   1082         The dropped file can be used by the developers to list jobs that we have
   1083         failed to upload.
   1084 
   1085         @param failed_jobs: A list of failed _JobDirectory objects.
   1086         @param log_file: The file to log the failed jobs to.
   1087         """
   1088         now = datetime.datetime.now()
   1089         now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
   1090         formatted_jobs = [_format_job_for_failure_reporting(job)
   1091                             for job in failed_jobs]
   1092         formatted_jobs.sort()
   1093 
   1094         with open(log_file, 'w') as logfile:
   1095             logfile.write(FAILED_OFFLOADS_FILE_HEADER %
   1096                           (now_str, len(failed_jobs)))
   1097             logfile.writelines(formatted_jobs)
   1098 
   1099 
   1100     def _report_current_jobs_count(self):
   1101         """Report the number of outstanding jobs to monarch."""
   1102         metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
   1103                 len(self._open_jobs))
   1104 
   1105 
   1106     def _report_failed_jobs_count(self, failed_jobs):
   1107         """Report the number of outstanding failed offload jobs to monarch.
   1108 
   1109         @param: List of failed jobs.
   1110         """
   1111         metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
   1112                 len(failed_jobs))
   1113 
   1114 
   1115 def _enqueue_offload(job, queue, age_limit):
   1116     """Enqueue the job for offload, if it's eligible.
   1117 
   1118     The job is eligible for offloading if the database has marked
   1119     it finished, and the job is older than the `age_limit`
   1120     parameter.
   1121 
   1122     If the job is eligible, offload processing is requested by
   1123     passing the `queue` parameter's `put()` method a sequence with
   1124     the job's `dirname` attribute and its directory name.
   1125 
   1126     @param job       _JobDirectory instance to offload.
   1127     @param queue     If the job should be offloaded, put the offload
   1128                      parameters into this queue for processing.
   1129     @param age_limit Minimum age for a job to be offloaded.  A value
   1130                      of 0 means that the job will be offloaded as
   1131                      soon as it is finished.
   1132 
   1133     """
   1134     if not job.offload_count:
   1135         if not _is_expired(job, age_limit):
   1136             return
   1137         job.first_offload_start = time.time()
   1138     job.offload_count += 1
   1139     if job.process_gs_instructions():
   1140         timestamp = _cached_get_timestamp_if_finished(job)
   1141         queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])
   1142 
   1143 
   1144 def parse_options():
   1145     """Parse the args passed into gs_offloader."""
   1146     defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
   1147             utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
   1148     usage = 'usage: %prog [options]\n' + defaults
   1149     parser = OptionParser(usage)
   1150     parser.add_option('-a', '--all', dest='process_all',
   1151                       action='store_true',
   1152                       help='Offload all files in the results directory.')
   1153     parser.add_option('-s', '--hosts', dest='process_hosts_only',
   1154                       action='store_true',
   1155                       help='Offload only the special tasks result files '
   1156                       'located in the results/hosts subdirectory')
   1157     parser.add_option('-p', '--parallelism', dest='parallelism',
   1158                       type='int', default=1,
   1159                       help='Number of parallel workers to use.')
   1160     parser.add_option('-o', '--delete_only', dest='delete_only',
   1161                       action='store_true',
   1162                       help='GS Offloader will only the delete the '
   1163                       'directories and will not offload them to google '
   1164                       'storage. NOTE: If global_config variable '
   1165                       'CROS.gs_offloading_enabled is False, --delete_only '
   1166                       'is automatically True.',
   1167                       default=not GS_OFFLOADING_ENABLED)
   1168     parser.add_option('-d', '--days_old', dest='days_old',
   1169                       help='Minimum job age in days before a result can be '
   1170                       'offloaded.', type='int', default=0)
   1171     parser.add_option('-l', '--log_size', dest='log_size',
   1172                       help='Limit the offloader logs to a specified '
   1173                       'number of Mega Bytes.', type='int', default=0)
   1174     parser.add_option('-m', dest='multiprocessing', action='store_true',
   1175                       help='Turn on -m option for gsutil. If not set, the '
   1176                       'global config setting gs_offloader_multiprocessing '
   1177                       'under CROS section is applied.')
   1178     parser.add_option('-i', '--offload_once', dest='offload_once',
   1179                       action='store_true',
   1180                       help='Upload all available results and then exit.')
   1181     parser.add_option('-y', '--normal_priority', dest='normal_priority',
   1182                       action='store_true',
   1183                       help='Upload using normal process priority.')
   1184     parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
   1185                       help='Minimum job age in days before a result can be '
   1186                       'offloaded, but not removed from local storage',
   1187                       type='int', default=None)
   1188     parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
   1189                       help='Minimum job age in days before a result can be '
   1190                       'removed from local storage',
   1191                       type='int', default=None)
   1192     parser.add_option(
   1193             '--metrics-file',
   1194             help='If provided, drop metrics to this local file instead of '
   1195                  'reporting to ts_mon',
   1196             type=str,
   1197             default=None,
   1198     )
   1199     parser.add_option('-t', '--enable_timestamp_cache',
   1200                       dest='enable_timestamp_cache',
   1201                       action='store_true',
   1202                       help='Cache the finished timestamps from AFE.')
   1203 
   1204     options = parser.parse_args()[0]
   1205     if options.process_all and options.process_hosts_only:
   1206         parser.print_help()
   1207         print ('Cannot process all files and only the hosts '
   1208                'subdirectory. Please remove an argument.')
   1209         sys.exit(1)
   1210 
   1211     if options.days_old and (options.age_to_upload or options.age_to_delete):
   1212         parser.print_help()
   1213         print('Use the days_old option or the age_to_* options but not both')
   1214         sys.exit(1)
   1215 
   1216     if options.age_to_upload == None:
   1217         options.age_to_upload = options.days_old
   1218     if options.age_to_delete == None:
   1219         options.age_to_delete = options.days_old
   1220 
   1221     return options
   1222 
   1223 
   1224 def main():
   1225     """Main method of gs_offloader."""
   1226     options = parse_options()
   1227 
   1228     if options.process_all:
   1229         offloader_type = 'all'
   1230     elif options.process_hosts_only:
   1231         offloader_type = 'hosts'
   1232     else:
   1233         offloader_type = 'jobs'
   1234 
   1235     _setup_logging(options, offloader_type)
   1236 
   1237     if options.enable_timestamp_cache:
   1238         # Extend the cache expiry time by another 1% so the timstamps
   1239         # are available as the results are purged.
   1240         job_timestamp_cache.setup(options.age_to_delete * 1.01)
   1241 
   1242     # Nice our process (carried to subprocesses) so we don't overload
   1243     # the system.
   1244     if not options.normal_priority:
   1245         logging.debug('Set process to nice value: %d', NICENESS)
   1246         os.nice(NICENESS)
   1247     if psutil:
   1248         proc = psutil.Process()
   1249         logging.debug('Set process to ionice IDLE')
   1250         proc.ionice(psutil.IOPRIO_CLASS_IDLE)
   1251 
   1252     # os.listdir returns relative paths, so change to where we need to
   1253     # be to avoid an os.path.join on each loop.
   1254     logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
   1255     os.chdir(RESULTS_DIR)
   1256 
   1257     service_name = 'gs_offloader(%s)' % offloader_type
   1258     with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
   1259                                              short_lived=False,
   1260                                              debug_file=options.metrics_file):
   1261         with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
   1262             offloader = Offloader(options)
   1263             if not options.delete_only:
   1264                 wait_for_gs_write_access(offloader.gs_uri)
   1265             while True:
   1266                 offloader.offload_once()
   1267                 if options.offload_once:
   1268                     break
   1269                 time.sleep(SLEEP_TIME_SECS)
   1270 
   1271 
   1272 _LOG_LOCATION = '/usr/local/autotest/logs/'
   1273 _LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
   1274 _LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
   1275 _LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
   1276 
   1277 
   1278 def _setup_logging(options, offloader_type):
   1279     """Set up logging.
   1280 
   1281     @param options: Parsed options.
   1282     @param offloader_type: Type of offloader action as string.
   1283     """
   1284     log_filename = _get_log_filename(options, offloader_type)
   1285     log_formatter = logging.Formatter(_LOGGING_FORMAT)
   1286     # Replace the default logging handler with a RotatingFileHandler. If
   1287     # options.log_size is 0, the file size will not be limited. Keeps
   1288     # one backup just in case.
   1289     handler = logging.handlers.RotatingFileHandler(
   1290             log_filename, maxBytes=1024 * options.log_size, backupCount=1)
   1291     handler.setFormatter(log_formatter)
   1292     logger = logging.getLogger()
   1293     logger.setLevel(logging.DEBUG)
   1294     logger.addHandler(handler)
   1295 
   1296 
   1297 def _get_log_filename(options, offloader_type):
   1298     """Get log filename.
   1299 
   1300     @param options: Parsed options.
   1301     @param offloader_type: Type of offloader action as string.
   1302     """
   1303     if options.log_size > 0:
   1304         log_timestamp = ''
   1305     else:
   1306         log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
   1307     log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
   1308     return os.path.join(_LOG_LOCATION, log_basename)
   1309 
   1310 
   1311 if __name__ == '__main__':
   1312     main()
   1313