      1 #!/usr/bin/python
      2 #
      3 # Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
      4 # Use of this source code is governed by a BSD-style license that can be
      5 # found in the LICENSE file.
      6 
      7 """Script to archive old Autotest results to Google Storage.
      8 
      9 Uses gsutil to archive files to the configured Google Storage bucket.
     10 Upon successful copy, the local results directory is deleted.
     11 """
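         # Example invocation (illustrative; see parse_options for all flags):
         #   ./gs_offloader.py --days_old 1 --parallelism 4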
     12 
     13 import abc
     14 import datetime
     15 import errno
     16 import glob
     17 import gzip
     18 import logging
     19 import logging.handlers
     20 import os
     21 import re
     22 import shutil
     23 import socket
     24 import stat
     25 import subprocess
     26 import sys
     27 import tarfile
     28 import tempfile
     29 import time
     30 
     31 from optparse import OptionParser
     32 
     33 import common
     34 from autotest_lib.client.common_lib import file_utils
     35 from autotest_lib.client.common_lib import global_config
     36 from autotest_lib.client.common_lib import utils
     37 from autotest_lib.site_utils import job_directories
     38 # For unittest, the cloud_console.proto is not compiled yet.
     39 try:
     40     from autotest_lib.site_utils import cloud_console_client
     41 except ImportError:
     42     cloud_console_client = None
     43 from autotest_lib.tko import models
     44 from autotest_lib.utils import labellib
     45 from autotest_lib.utils import gslib
     46 from chromite.lib import timeout_util
     47 
     48 # Autotest requires the psutil module from site-packages, so it must be imported
     49 # after "import common".
     50 try:
     51     # Does not exist, nor is needed, on moblab.
     52     import psutil
     53 except ImportError:
     54     psutil = None
     55 
     56 from chromite.lib import parallel
     57 try:
     58     from chromite.lib import metrics
     59     from chromite.lib import ts_mon_config
     60 except ImportError:
     61     metrics = utils.metrics_mock
     62     ts_mon_config = utils.metrics_mock
     63 
     64 
     65 GS_OFFLOADING_ENABLED = global_config.global_config.get_config_value(
     66         'CROS', 'gs_offloading_enabled', type=bool, default=True)
     67 
      68 # Niceness for the process; a higher number means a lower priority.
     69 NICENESS = 10
     70 
     71 # Maximum number of seconds to allow for offloading a single
     72 # directory.
     73 OFFLOAD_TIMEOUT_SECS = 60 * 60
     74 
     75 # Sleep time per loop.
     76 SLEEP_TIME_SECS = 5
     77 
     78 # Minimum number of seconds between e-mail reports.
     79 REPORT_INTERVAL_SECS = 60 * 60
     80 
     81 # Location of Autotest results on disk.
     82 RESULTS_DIR = '/usr/local/autotest/results'
     83 FAILED_OFFLOADS_FILE = os.path.join(RESULTS_DIR, 'FAILED_OFFLOADS')
     84 
     85 FAILED_OFFLOADS_FILE_HEADER = '''
      86 This is the list of jobs that gs_offloader failed to offload.
      87 Last offloader attempt at %s failed to offload %d jobs.
      88 Check http://go/cros-triage-gsoffloader to triage the issue.
     89 
     90 
     91 First failure       Count   Directory name
     92 =================== ======  ==============================
     93 '''
     94 # --+----1----+----  ----+  ----+----1----+----2----+----3
     95 
     96 FAILED_OFFLOADS_LINE_FORMAT = '%19s  %5d  %-1s\n'
     97 FAILED_OFFLOADS_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'
     98 
     99 USE_RSYNC_ENABLED = global_config.global_config.get_config_value(
    100         'CROS', 'gs_offloader_use_rsync', type=bool, default=False)
    101 
    102 LIMIT_FILE_COUNT = global_config.global_config.get_config_value(
    103         'CROS', 'gs_offloader_limit_file_count', type=bool, default=False)
    104 
    105 # Use multiprocessing for gsutil uploading.
    106 GS_OFFLOADER_MULTIPROCESSING = global_config.global_config.get_config_value(
    107         'CROS', 'gs_offloader_multiprocessing', type=bool, default=False)
    108 
    109 D = '[0-9][0-9]'
    110 TIMESTAMP_PATTERN = '%s%s.%s.%s_%s.%s.%s' % (D, D, D, D, D, D, D)
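         # TIMESTAMP_PATTERN matches timestamps such as '2017.01.02_03.04.05'.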
    111 CTS_RESULT_PATTERN = 'testResult.xml'
    112 CTS_V2_RESULT_PATTERN = 'test_result.xml'
    113 # Google Storage bucket URI to store results in.
    114 DEFAULT_CTS_RESULTS_GSURI = global_config.global_config.get_config_value(
    115         'CROS', 'cts_results_server', default='')
    116 DEFAULT_CTS_APFE_GSURI = global_config.global_config.get_config_value(
    117         'CROS', 'cts_apfe_server', default='')
    118 
    119 # metadata type
    120 GS_OFFLOADER_SUCCESS_TYPE = 'gs_offloader_success'
    121 GS_OFFLOADER_FAILURE_TYPE = 'gs_offloader_failure'
    122 
    123 # Autotest test to collect list of CTS tests
    124 TEST_LIST_COLLECTOR = 'tradefed-run-collect-tests-only'
    125 
    126 def _get_metrics_fields(dir_entry):
    127     """Get metrics fields for the given test result directory, including board
    128     and milestone.
    129 
    130     @param dir_entry: Directory entry to offload.
    131     @return A dictionary for the metrics data to be uploaded.
    132     """
    133     fields = {'board': 'unknown',
    134               'milestone': 'unknown'}
    135     if dir_entry:
     136         # There could be multiple hosts in the job directory; use the first one
    137         # available.
    138         for host in glob.glob(os.path.join(dir_entry, '*')):
    139             try:
    140                 keyval = models.test.parse_job_keyval(host)
    141             except ValueError:
    142                 continue
    143             build = keyval.get('build')
    144             if build:
    145                 try:
    146                     cros_version = labellib.parse_cros_version(build)
    147                     fields['board'] = cros_version.board
    148                     fields['milestone'] = cros_version.milestone
    149                     break
    150                 except ValueError:
    151                     # Ignore version parsing error so it won't crash
    152                     # gs_offloader.
    153                     pass
    154 
     155     return fields
    156 
    157 
    158 def _get_es_metadata(dir_entry):
    159     """Get ES metadata for the given test result directory.
    160 
    161     @param dir_entry: Directory entry to offload.
    162     @return A dictionary for the metadata to be uploaded.
    163     """
    164     fields = _get_metrics_fields(dir_entry)
    165     fields['hostname'] = socket.gethostname()
    166     # Include more data about the test job in metadata.
    167     if dir_entry:
    168         fields['dir_entry'] = dir_entry
    169         fields['job_id'] = job_directories.get_job_id_or_task_id(dir_entry)
    170 
    171     return fields
    172 
    173 
    174 def _get_cmd_list(multiprocessing, dir_entry, gs_path):
    175     """Return the command to offload a specified directory.
    176 
    177     @param multiprocessing: True to turn on -m option for gsutil.
     178     @param dir_entry: Directory entry/path for which we need a cmd_list
    179                       to offload.
    180     @param gs_path: Location in google storage where we will
    181                     offload the directory.
    182 
    183     @return A command list to be executed by Popen.
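
             For example (illustrative), with rsync offloading disabled,
             _get_cmd_list(True, '118-fake', 'gs://bucket/') returns
             ['gsutil', '-m', 'cp', '-eR', '118-fake', 'gs://bucket/'].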
    184     """
    185     cmd = ['gsutil']
    186     if multiprocessing:
    187         cmd.append('-m')
    188     if USE_RSYNC_ENABLED:
    189         cmd.append('rsync')
    190         target = os.path.join(gs_path, os.path.basename(dir_entry))
    191     else:
    192         cmd.append('cp')
    193         target = gs_path
    194     cmd += ['-eR', dir_entry, target]
    195     return cmd
    196 
    197 
    198 def sanitize_dir(dirpath):
    199     """Sanitize directory for gs upload.
    200 
     201     Symlinks and FIFOs are converted to regular files so the directory can be
             uploaded (see crbug.com/684122 and crbug.com/692788).
    202 
    203     @param dirpath: Directory entry to be sanitized.
    204     """
    205     if not os.path.exists(dirpath):
    206         return
    207     _escape_rename(dirpath)
    208     _escape_rename_dir_contents(dirpath)
    209     _sanitize_fifos(dirpath)
    210     _sanitize_symlinks(dirpath)
    211 
    212 
    213 def _escape_rename_dir_contents(dirpath):
     214     """Recursively escape filenames under a directory for gs upload.
    215 
    216     @param dirpath: Directory path string.
    217     """
    218     for filename in os.listdir(dirpath):
    219         path = os.path.join(dirpath, filename)
    220         _escape_rename(path)
    221     for filename in os.listdir(dirpath):
    222         path = os.path.join(dirpath, filename)
    223         if os.path.isdir(path):
    224             _escape_rename_dir_contents(path)
    225 
    226 
    227 def _escape_rename(path):
     228     """Rename a file to escape its filename for gs upload.
    229 
    230     @param path: File path string.
    231     """
    232     dirpath, filename = os.path.split(path)
    233     sanitized_filename = gslib.escape(filename)
    234     sanitized_path = os.path.join(dirpath, sanitized_filename)
    235     os.rename(path, sanitized_path)
    236 
    237 
    238 def _sanitize_fifos(dirpath):
    239     """Convert fifos to regular files (fixes crbug.com/684122).
    240 
    241     @param dirpath: Directory path string.
    242     """
    243     for root, _, files in os.walk(dirpath):
    244         for filename in files:
    245             path = os.path.join(root, filename)
    246             file_stat = os.lstat(path)
    247             if stat.S_ISFIFO(file_stat.st_mode):
    248                 _replace_fifo_with_file(path)
    249 
    250 
    251 def _replace_fifo_with_file(path):
    252     """Replace a fifo with a normal file.
    253 
    254     @param path: Fifo path string.
    255     """
    256     logging.debug('Removing fifo %s', path)
    257     os.remove(path)
    258     logging.debug('Creating marker %s', path)
    259     with open(path, 'w') as f:
    260         f.write('<FIFO>')
    261 
    262 
    263 def _sanitize_symlinks(dirpath):
    264     """Convert Symlinks to regular files (fixes crbug.com/692788).
    265 
    266     @param dirpath: Directory path string.
    267     """
    268     for root, _, files in os.walk(dirpath):
    269         for filename in files:
    270             path = os.path.join(root, filename)
    271             file_stat = os.lstat(path)
    272             if stat.S_ISLNK(file_stat.st_mode):
    273                 _replace_symlink_with_file(path)
    274 
    275 
    276 def _replace_symlink_with_file(path):
    277     """Replace a symlink with a normal file.
    278 
    279     @param path: Symlink path string.
    280     """
    281     target = os.readlink(path)
    282     logging.debug('Removing symlink %s', path)
    283     os.remove(path)
    284     logging.debug('Creating marker %s', path)
    285     with open(path, 'w') as f:
    286         f.write('<symlink to %s>' % target)
    287 
    288 
     289 # Maximum file count in a results folder before subfolders are compressed.
    290 _MAX_FILE_COUNT = 500
    291 _FOLDERS_NEVER_ZIP = ['debug', 'ssp_logs', 'autoupdate_logs']
    292 
    293 
    294 def _get_zippable_folders(dir_entry):
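             """Return paths of subfolders of |dir_entry| that may be compressed.

             Regular files and folders named in _FOLDERS_NEVER_ZIP are skipped.

             @param dir_entry: Directory entry to scan.
             """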
    295     folders_list = []
    296     for folder in os.listdir(dir_entry):
    297         folder_path = os.path.join(dir_entry, folder)
    298         if (not os.path.isfile(folder_path) and
     299                 folder not in _FOLDERS_NEVER_ZIP):
    300             folders_list.append(folder_path)
    301     return folders_list
    302 
    303 
    304 def limit_file_count(dir_entry):
    305     """Limit the number of files in given directory.
    306 
    307     The method checks the total number of files in the given directory.
    308     If the number is greater than _MAX_FILE_COUNT, the method will
    309     compress each folder in the given directory, except folders in
    310     _FOLDERS_NEVER_ZIP.
    311 
    312     @param dir_entry: Directory entry to be checked.
    313     """
    314     try:
    315         count = _count_files(dir_entry)
    316     except ValueError:
     317         logging.warning('Failed to get the file count in folder %s.', dir_entry)
    318         return
    319     if count < _MAX_FILE_COUNT:
    320         return
    321 
     322     # For a test job, zip folders at the second level, e.g. 123-debug/host1,
     323     # so that the autoserv debug folder remains directly accessible.
     324     # Special task results do not need to go one level deeper.
    325     is_special_task = re.match(job_directories.SPECIAL_TASK_PATTERN,
    326                                dir_entry)
    327 
    328     folders = _get_zippable_folders(dir_entry)
    329     if not is_special_task:
    330         subfolders = []
    331         for folder in folders:
    332             subfolders.extend(_get_zippable_folders(folder))
    333         folders = subfolders
    334 
    335     for folder in folders:
    336         _make_into_tarball(folder)
    337 
    338 
    339 def _count_files(dirpath):
    340     """Count the number of files in a directory recursively.
    341 
    342     @param dirpath: Directory path string.
    343     """
    344     return sum(len(files) for _path, _dirs, files in os.walk(dirpath))
    345 
    346 
    347 def _make_into_tarball(dirpath):
     348     """Compress a directory into a .tgz tarball and delete the original.
    349 
    350     @param dirpath: Directory path string.
    351     """
    352     tarpath = '%s.tgz' % dirpath
    353     with tarfile.open(tarpath, 'w:gz') as tar:
    354         tar.add(dirpath, arcname=os.path.basename(dirpath))
    355     shutil.rmtree(dirpath)
    356 
    357 
    358 def correct_results_folder_permission(dir_entry):
    359     """Make sure the results folder has the right permission settings.
    360 
     361     For tests running with server-side packaging, the results folder is
     362     owned by root. Ownership must be changed to the user running the
     363     autoserv process so that the parsing job can access the results folder.
    364 
    365     @param dir_entry: Path to the results folder.
    366     """
    367     if not dir_entry:
    368         return
    369 
    370     logging.info('Trying to correct file permission of %s.', dir_entry)
    371     try:
    372         owner = '%s:%s' % (os.getuid(), os.getgid())
    373         subprocess.check_call(
    374                 ['sudo', '-n', 'chown', '-R', owner, dir_entry])
    375         subprocess.check_call(['chmod', '-R', 'u+r', dir_entry])
    376         subprocess.check_call(
    377                 ['find', dir_entry, '-type', 'd',
    378                  '-exec', 'chmod', 'u+x', '{}', ';'])
    379     except subprocess.CalledProcessError as e:
    380         logging.error('Failed to modify permission for %s: %s',
    381                       dir_entry, e)
    382 
    383 
    384 def _upload_cts_testresult(dir_entry, multiprocessing):
    385     """Upload test results to separate gs buckets.
    386 
    387     Upload testResult.xml.gz/test_result.xml.gz file to cts_results_bucket.
    388     Upload timestamp.zip to cts_apfe_bucket.
    389 
    390     @param dir_entry: Path to the results folder.
    391     @param multiprocessing: True to turn on -m option for gsutil.
    392     """
    393     for host in glob.glob(os.path.join(dir_entry, '*')):
    394         cts_path = os.path.join(host, 'cheets_CTS.*', 'results', '*',
    395                                 TIMESTAMP_PATTERN)
    396         cts_v2_path = os.path.join(host, 'cheets_CTS_*', 'results', '*',
    397                                    TIMESTAMP_PATTERN)
    398         gts_v2_path = os.path.join(host, 'cheets_GTS.*', 'results', '*',
    399                                    TIMESTAMP_PATTERN)
    400         for result_path, result_pattern in [(cts_path, CTS_RESULT_PATTERN),
    401                             (cts_v2_path, CTS_V2_RESULT_PATTERN),
    402                             (gts_v2_path, CTS_V2_RESULT_PATTERN)]:
    403             for path in glob.glob(result_path):
    404                 try:
    405                     _upload_files(host, path, result_pattern, multiprocessing)
    406                 except Exception as e:
    407                     logging.error('ERROR uploading test results %s to GS: %s',
    408                                   path, e)
    409 
    410 
    411 def _is_valid_result(build, result_pattern, suite):
    412     """Check if the result should be uploaded to CTS/GTS buckets.
    413 
    414     @param build: Builder name.
    415     @param result_pattern: XML result file pattern.
    416     @param suite: Test suite name.
    417 
     418     @returns: Bool flag indicating whether the result is valid.
    419     """
    420     if build is None or suite is None:
    421         return False
    422 
    423     # Not valid if it's not a release build.
    424     if not re.match(r'(?!trybot-).*-release/.*', build):
    425         return False
    426 
     427     # A CTS/GTS result is only valid if it comes from an 'arc-cts*',
     428     # 'arc-gts*' or 'test_that_wrapper*' suite.
    429     result_patterns = [CTS_RESULT_PATTERN, CTS_V2_RESULT_PATTERN]
    430     if result_pattern in result_patterns and not (
    431             suite.startswith('arc-cts') or suite.startswith('arc-gts') or
    432             suite.startswith('test_that_wrapper')):
    433         return False
    434 
    435     return True
    436 
    437 
    438 def _is_test_collector(package):
     439     """Returns true if the test run is just to collect the list of CTS tests.
    440 
    441     @param package: Autotest package name. e.g. cheets_CTS_N.CtsGraphicsTestCase
    442 
     443     @return Bool flag indicating whether the test package is a CTS list collector.
    444     """
    445     return TEST_LIST_COLLECTOR in package
    446 
    447 
    448 def _upload_files(host, path, result_pattern, multiprocessing):
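             """Upload CTS/GTS results and zips for one host to Google Storage.

             @param host: Path to the host-level results folder that holds the
                     job keyval file.
             @param path: Path to a timestamped CTS/GTS results directory under
                     |host|.
             @param result_pattern: Filename pattern of the XML result file to
                     upload (testResult.xml or test_result.xml).
             @param multiprocessing: True to turn on -m option for gsutil.
             """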
    449     keyval = models.test.parse_job_keyval(host)
    450     build = keyval.get('build')
    451     suite = keyval.get('suite')
    452 
    453     if not _is_valid_result(build, result_pattern, suite):
     454         # No need to upload the current folder; return.
    455         return
    456 
    457     parent_job_id = str(keyval['parent_job_id'])
    458 
    459     folders = path.split(os.sep)
    460     job_id = folders[-6]
    461     package = folders[-4]
    462     timestamp = folders[-1]
    463 
     464     # Results produced by the CTS test list collector are dummy results.
     465     # They don't need to be copied to the APFE bucket, which is mainly used
     466     # for CTS APFE submission.
    467     if not _is_test_collector(package):
    468         # Path: bucket/build/parent_job_id/cheets_CTS.*/job_id_timestamp/
    469         # or bucket/build/parent_job_id/cheets_GTS.*/job_id_timestamp/
    470         cts_apfe_gs_path = os.path.join(
    471                 DEFAULT_CTS_APFE_GSURI, build, parent_job_id,
    472                 package, job_id + '_' + timestamp) + '/'
    473 
     474         for zip_file in glob.glob('%s.zip' % path):
    475             utils.run(' '.join(_get_cmd_list(
    476                     multiprocessing, zip_file, cts_apfe_gs_path)))
     477             logging.debug('Upload %s to %s', zip_file, cts_apfe_gs_path)
    478     else:
    479         logging.debug('%s is a CTS Test collector Autotest test run.', package)
    480         logging.debug('Skipping CTS results upload to APFE gs:// bucket.')
    481 
    482     # Path: bucket/cheets_CTS.*/job_id_timestamp/
    483     # or bucket/cheets_GTS.*/job_id_timestamp/
    484     test_result_gs_path = os.path.join(
    485             DEFAULT_CTS_RESULTS_GSURI, package,
    486             job_id + '_' + timestamp) + '/'
    487 
    488     for test_result_file in glob.glob(os.path.join(path, result_pattern)):
     489         # gzip test_result_file (testResult.xml / test_result.xml).
     490 
     491         test_result_file_gz = '%s.gz' % test_result_file
    492         with open(test_result_file, 'r') as f_in, (
    493                 gzip.open(test_result_file_gz, 'w')) as f_out:
    494             shutil.copyfileobj(f_in, f_out)
    495         utils.run(' '.join(_get_cmd_list(
    496                 multiprocessing, test_result_file_gz, test_result_gs_path)))
    497         logging.debug('Zip and upload %s to %s',
    498                       test_result_file_gz, test_result_gs_path)
    499         # Remove test_result_file_gz(testResult.xml.gz/test_result.xml.gz)
    500         os.remove(test_result_file_gz)
    501 
    502 
    503 def _emit_gs_returncode_metric(returncode):
    504     """Increment the gs_returncode counter based on |returncode|."""
    505     m_gs_returncode = 'chromeos/autotest/gs_offloader/gs_returncode'
    506     rcode = int(returncode)
    507     if rcode < 0 or rcode > 255:
    508         rcode = -1
    509     metrics.Counter(m_gs_returncode).increment(fields={'return_code': rcode})
    510 
    511 
    512 def _handle_dir_os_error(dir_entry, fix_permission=False):
    513     """Try to fix the result directory's permission issue if needed.
    514 
    515     @param dir_entry: Directory entry to offload.
    516     @param fix_permission: True to change the directory's owner to the same one
    517             running gs_offloader.
    518     """
    519     if fix_permission:
    520         correct_results_folder_permission(dir_entry)
    521     m_permission_error = ('chromeos/autotest/errors/gs_offloader/'
    522                           'wrong_permissions_count')
    523     metrics_fields = _get_metrics_fields(dir_entry)
    524     metrics.Counter(m_permission_error).increment(fields=metrics_fields)
    525 
    526 
    527 class BaseGSOffloader(object):
    528 
    529     """Google Storage offloader interface."""
    530 
    531     __metaclass__ = abc.ABCMeta
    532 
    533     def offload(self, dir_entry, dest_path, job_complete_time):
    534         """Safely offload a directory entry to Google Storage.
    535 
    536         This method is responsible for copying the contents of
    537         `dir_entry` to Google storage at `dest_path`.
    538 
    539         When successful, the method must delete all of `dir_entry`.
    540         On failure, `dir_entry` should be left undisturbed, in order
    541         to allow for retry.
    542 
     543         Errors are conveyed simply and solely in two ways:
     544           * At the time of failure, write enough information to the log
     545             to allow later debugging, if necessary.
    546           * Don't delete the content.
    547 
    548         In order to guarantee robustness, this method must not raise any
    549         exceptions.
    550 
    551         @param dir_entry: Directory entry to offload.
    552         @param dest_path: Location in google storage where we will
    553                           offload the directory.
    554         @param job_complete_time: The complete time of the job from the AFE
    555                                   database.
    556         """
    557         try:
    558             self._full_offload(dir_entry, dest_path, job_complete_time)
    559         except Exception as e:
    560             logging.debug('Exception in offload for %s', dir_entry)
    561             logging.debug('Ignoring this error: %s', str(e))
    562 
    563     @abc.abstractmethod
    564     def _full_offload(self, dir_entry, dest_path, job_complete_time):
    565         """Offload a directory entry to Google Storage.
    566 
    567         This method implements the actual offload behavior of its
    568         subclass.  To guarantee effective debug, this method should
    569         catch all exceptions, and perform any reasonable diagnosis
    570         or other handling.
    571 
    572         @param dir_entry: Directory entry to offload.
    573         @param dest_path: Location in google storage where we will
    574                           offload the directory.
    575         @param job_complete_time: The complete time of the job from the AFE
    576                                   database.
    577         """
    578 
    579 
    580 class GSOffloader(BaseGSOffloader):
    581     """Google Storage Offloader."""
    582 
    583     def __init__(self, gs_uri, multiprocessing, delete_age,
    584             console_client=None):
     585         """Initialize the offloader for the given gs_uri.
     586 
     587         @param gs_uri: Google storage bucket uri to offload to.
     588         @param multiprocessing: True to turn on -m option for gsutil.
                 @param delete_age: Minimum job age in days before a result can be
                         removed from local storage.
     589         @param console_client: The cloud console client. If None,
     590           cloud console APIs are not called.
    591         """
    592         self._gs_uri = gs_uri
    593         self._multiprocessing = multiprocessing
    594         self._delete_age = delete_age
    595         self._console_client = console_client
    596 
    597     @metrics.SecondsTimerDecorator(
    598             'chromeos/autotest/gs_offloader/job_offload_duration')
    599     def _full_offload(self, dir_entry, dest_path, job_complete_time):
    600         """Offload the specified directory entry to Google storage.
    601 
    602         @param dir_entry: Directory entry to offload.
    603         @param dest_path: Location in google storage where we will
    604                           offload the directory.
    605         @param job_complete_time: The complete time of the job from the AFE
    606                                   database.
    607         """
    608         with tempfile.TemporaryFile('w+') as stdout_file, \
    609              tempfile.TemporaryFile('w+') as stderr_file:
    610             try:
    611                 try:
    612                     self._try_offload(dir_entry, dest_path, stdout_file,
    613                                       stderr_file)
    614                 except OSError as e:
     615                     # Try to fix the directory's file permission error,
     616                     # then retry the offload below.
    617                     _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
    618                     # Try again after the permission issue is fixed.
    619                     self._try_offload(dir_entry, dest_path, stdout_file,
    620                                       stderr_file)
    621             except _OffloadError as e:
    622                 metrics_fields = _get_metrics_fields(dir_entry)
    623                 m_any_error = 'chromeos/autotest/errors/gs_offloader/any_error'
    624                 metrics.Counter(m_any_error).increment(fields=metrics_fields)
    625 
    626                 # Rewind the log files for stdout and stderr and log
    627                 # their contents.
    628                 stdout_file.seek(0)
    629                 stderr_file.seek(0)
    630                 stderr_content = stderr_file.read()
    631                 logging.warning('Error occurred when offloading %s:', dir_entry)
    632                 logging.warning('Stdout:\n%s \nStderr:\n%s', stdout_file.read(),
    633                                 stderr_content)
    634 
     635                 # Some result files may have the wrong file permissions. Try
     636                 # to correct such errors so that a later retry can succeed.
    637                 # TODO(dshi): The code is added to correct result files
    638                 # with wrong file permission caused by bug 511778. After
    639                 # this code is pushed to lab and run for a while to
    640                 # clean up these files, following code and function
    641                 # correct_results_folder_permission can be deleted.
    642                 if 'CommandException: Error opening file' in stderr_content:
    643                     correct_results_folder_permission(dir_entry)
    644             else:
    645                 self._prune(dir_entry, job_complete_time)
    646 
    647     def _try_offload(self, dir_entry, dest_path,
    648                  stdout_file, stderr_file):
    649         """Offload the specified directory entry to Google storage.
    650 
    651         @param dir_entry: Directory entry to offload.
    652         @param dest_path: Location in google storage where we will
    653                           offload the directory.
     656         @param stdout_file: File object to capture gsutil stdout.
     657         @param stderr_file: File object to capture gsutil stderr.
    658         """
    659         if _is_uploaded(dir_entry):
    660             return
    661         start_time = time.time()
    662         metrics_fields = _get_metrics_fields(dir_entry)
    663         es_metadata = _get_es_metadata(dir_entry)
    664         error_obj = _OffloadError(start_time, es_metadata)
    665         try:
    666             sanitize_dir(dir_entry)
    667             if DEFAULT_CTS_RESULTS_GSURI:
    668                 _upload_cts_testresult(dir_entry, self._multiprocessing)
    669 
    670             if LIMIT_FILE_COUNT:
    671                 limit_file_count(dir_entry)
    672             es_metadata['size_kb'] = file_utils.get_directory_size_kibibytes(dir_entry)
    673 
    674             process = None
    675             with timeout_util.Timeout(OFFLOAD_TIMEOUT_SECS):
    676                 gs_path = '%s%s' % (self._gs_uri, dest_path)
    677                 process = subprocess.Popen(
    678                         _get_cmd_list(self._multiprocessing, dir_entry, gs_path),
    679                         stdout=stdout_file, stderr=stderr_file)
    680                 process.wait()
    681 
    682             _emit_gs_returncode_metric(process.returncode)
    683             if process.returncode != 0:
    684                 raise error_obj
    685             _emit_offload_metrics(dir_entry)
    686 
    687             if self._console_client:
    688                 gcs_uri = os.path.join(gs_path,
    689                         os.path.basename(dir_entry))
    690                 if not self._console_client.send_test_job_offloaded_message(
    691                         gcs_uri):
    692                     raise error_obj
    693 
    694             _mark_uploaded(dir_entry)
    695         except timeout_util.TimeoutError:
    696             m_timeout = 'chromeos/autotest/errors/gs_offloader/timed_out_count'
    697             metrics.Counter(m_timeout).increment(fields=metrics_fields)
    698             # If we finished the call to Popen(), we may need to
    699             # terminate the child process.  We don't bother calling
    700             # process.poll(); that inherently races because the child
    701             # can die any time it wants.
    702             if process:
    703                 try:
    704                     process.terminate()
    705                 except OSError:
    706                     # We don't expect any error other than "No such
    707                     # process".
    708                     pass
    709             logging.error('Offloading %s timed out after waiting %d '
    710                           'seconds.', dir_entry, OFFLOAD_TIMEOUT_SECS)
    711             raise error_obj
    712 
    713     def _prune(self, dir_entry, job_complete_time):
    714         """Prune directory if it is uploaded and expired.
    715 
    716         @param dir_entry: Directory entry to offload.
    717         @param job_complete_time: The complete time of the job from the AFE
    718                                   database.
    719         """
    720         if not (_is_uploaded(dir_entry)
    721                 and job_directories.is_job_expired(self._delete_age,
    722                                                    job_complete_time)):
    723             return
    724         try:
    725             shutil.rmtree(dir_entry)
    726         except OSError as e:
     727             # Wrong file permissions can cause the call to
     728             # `shutil.rmtree(dir_entry)` to raise OSError with message
     729             # 'Permission denied'. Details can be found in crbug.com/536151.
    730             _handle_dir_os_error(dir_entry, e.errno==errno.EACCES)
    731             # Try again after the permission issue is fixed.
    732             shutil.rmtree(dir_entry)
    733 
    734 
    735 class _OffloadError(Exception):
    736     """Google Storage offload failed."""
    737 
    738     def __init__(self, start_time, es_metadata):
    739         super(_OffloadError, self).__init__(start_time, es_metadata)
    740         self.start_time = start_time
    741         self.es_metadata = es_metadata
    742 
    743 
    744 
    745 class FakeGSOffloader(BaseGSOffloader):
    746 
    747     """Fake Google Storage Offloader that only deletes directories."""
    748 
    749     def _full_offload(self, dir_entry, dest_path, job_complete_time):
    750         """Pretend to offload a directory and delete it.
    751 
    752         @param dir_entry: Directory entry to offload.
    753         @param dest_path: Location in google storage where we will
    754                           offload the directory.
    755         @param job_complete_time: The complete time of the job from the AFE
    756                                   database.
    757         """
    758         shutil.rmtree(dir_entry)
    759 
    760 
    761 def _is_expired(job, age_limit):
     762     """Return whether the job directory has expired and may be offloaded.
    763 
    764     @param job: _JobDirectory instance.
    765     @param age_limit:  Minimum age in days at which a job may be offloaded.
    766     """
    767     job_timestamp = job.get_timestamp_if_finished()
    768     if not job_timestamp:
    769         return False
    770     return job_directories.is_job_expired(age_limit, job_timestamp)
    771 
    772 
    773 def _emit_offload_metrics(dirpath):
    774     """Emit gs offload metrics.
    775 
    776     @param dirpath: Offloaded directory path.
    777     """
    778     dir_size = file_utils.get_directory_size_kibibytes(dirpath)
    779     metrics_fields = _get_metrics_fields(dirpath)
    780 
    781     m_offload_count = (
    782             'chromeos/autotest/gs_offloader/jobs_offloaded')
    783     metrics.Counter(m_offload_count).increment(
    784             fields=metrics_fields)
    785     m_offload_size = ('chromeos/autotest/gs_offloader/'
    786                       'kilobytes_transferred')
    787     metrics.Counter(m_offload_size).increment_by(
    788             dir_size, fields=metrics_fields)
    789 
    790 
    791 def _is_uploaded(dirpath):
    792     """Return whether directory has been uploaded.
    793 
    794     @param dirpath: Directory path string.
    795     """
    796     return os.path.isfile(_get_uploaded_marker_file(dirpath))
    797 
    798 
    799 def _mark_uploaded(dirpath):
    800     """Mark directory as uploaded.
    801 
    802     @param dirpath: Directory path string.
    803     """
    804     with open(_get_uploaded_marker_file(dirpath), 'a'):
    805         pass
    806 
    807 
    808 def _get_uploaded_marker_file(dirpath):
    809     """Return path to upload marker file for directory.
    810 
    811     @param dirpath: Directory path string.
    812     """
    813     return '%s/.GS_UPLOADED' % (dirpath,)
    814 
    815 
    816 def _format_job_for_failure_reporting(job):
    817     """Formats a _JobDirectory for reporting / logging.
    818 
    819     @param job: The _JobDirectory to format.
    820     """
    821     d = datetime.datetime.fromtimestamp(job.first_offload_start)
    822     data = (d.strftime(FAILED_OFFLOADS_TIME_FORMAT),
    823             job.offload_count,
    824             job.dirname)
    825     return FAILED_OFFLOADS_LINE_FORMAT % data
    826 
    827 
    828 def wait_for_gs_write_access(gs_uri):
    829     """Verify and wait until we have write access to Google Storage.
    830 
    831     @param gs_uri: The Google Storage URI we are trying to offload to.
    832     """
    833     # TODO (sbasi) Try to use the gsutil command to check write access.
    834     # Ensure we have write access to gs_uri.
    835     dummy_file = tempfile.NamedTemporaryFile()
    836     test_cmd = _get_cmd_list(False, dummy_file.name, gs_uri)
    837     while True:
    838         try:
    839             subprocess.check_call(test_cmd)
    840             subprocess.check_call(
    841                     ['gsutil', 'rm',
    842                      os.path.join(gs_uri,
    843                                   os.path.basename(dummy_file.name))])
    844             break
    845         except subprocess.CalledProcessError:
    846             logging.debug('Unable to offload to %s, sleeping.', gs_uri)
    847             time.sleep(120)
    848 
    849 
    850 class Offloader(object):
    851     """State of the offload process.
    852 
    853     Contains the following member fields:
     854       * _gs_offloader:  BaseGSOffloader to use to offload a job directory.
    855       * _jobdir_classes:  List of classes of job directory to be
    856         offloaded.
    857       * _processes:  Maximum number of outstanding offload processes
    858         to allow during an offload cycle.
     859       * _upload_age_limit:  Minimum age in days at which a job may be
     860         offloaded.
               * _delete_age_limit:  Minimum age in days after which an offloaded
                 job may be removed from local storage.
    861       * _open_jobs: a dictionary mapping directory paths to Job
    862         objects.
    863     """
    864 
    865     def __init__(self, options):
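                 """Initialize the offloader from parsed command line options.

                 @param options: Parsed command line options, as returned by
                         parse_options().
                 """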
    866         self._upload_age_limit = options.age_to_upload
    867         self._delete_age_limit = options.age_to_delete
    868         if options.delete_only:
    869             self._gs_offloader = FakeGSOffloader()
    870         else:
    871             self.gs_uri = utils.get_offload_gsuri()
    872             logging.debug('Offloading to: %s', self.gs_uri)
    873             multiprocessing = False
    874             if options.multiprocessing:
    875                 multiprocessing = True
    876             elif options.multiprocessing is None:
    877                 multiprocessing = GS_OFFLOADER_MULTIPROCESSING
    878             logging.info(
    879                     'Offloader multiprocessing is set to:%r', multiprocessing)
    880             console_client = None
    881             if (cloud_console_client and
    882                     cloud_console_client.is_cloud_notification_enabled()):
    883                 console_client = cloud_console_client.PubSubBasedClient()
    884             self._gs_offloader = GSOffloader(
    885                     self.gs_uri, multiprocessing, self._delete_age_limit,
    886                     console_client)
    887         classlist = []
    888         if options.process_hosts_only or options.process_all:
    889             classlist.append(job_directories.SpecialJobDirectory)
    890         if not options.process_hosts_only:
    891             classlist.append(job_directories.RegularJobDirectory)
    892         self._jobdir_classes = classlist
    893         assert self._jobdir_classes
    894         self._processes = options.parallelism
    895         self._open_jobs = {}
    896         self._pusub_topic = None
    897         self._offload_count_limit = 3
    898 
    899 
    900     def _add_new_jobs(self):
    901         """Find new job directories that need offloading.
    902 
    903         Go through the file system looking for valid job directories
    904         that are currently not in `self._open_jobs`, and add them in.
    905 
    906         """
    907         new_job_count = 0
    908         for cls in self._jobdir_classes:
    909             for resultsdir in cls.get_job_directories():
    910                 if (
    911                         resultsdir in self._open_jobs
    912                         or _is_uploaded(resultsdir)):
    913                     continue
    914                 self._open_jobs[resultsdir] = cls(resultsdir)
    915                 new_job_count += 1
    916         logging.debug('Start of offload cycle - found %d new jobs',
    917                       new_job_count)
    918 
    919 
    920     def _remove_offloaded_jobs(self):
     921         """Remove offloaded jobs from `self._open_jobs`."""
    922         removed_job_count = 0
    923         for jobkey, job in self._open_jobs.items():
    924             if (
    925                     not os.path.exists(job.dirname)
    926                     or _is_uploaded(job.dirname)):
    927                 del self._open_jobs[jobkey]
    928                 removed_job_count += 1
     929         logging.debug('End of offload cycle - cleared %d offloaded jobs, '
    930                       'carrying %d open jobs',
    931                       removed_job_count, len(self._open_jobs))
    932 
    933 
    934     def _report_failed_jobs(self):
    935         """Report status after attempting offload.
    936 
    937         This function processes all jobs in `self._open_jobs`, assuming
    938         an attempt has just been made to offload all of them.
    939 
     940         The number of jobs that failed to offload is reported to monarch,
     941         and the list of failed jobs is written to a local log file (see
     942         `_log_failed_jobs_locally`).
    943 
    944         """
    945         failed_jobs = [j for j in self._open_jobs.values() if
    946                        j.first_offload_start]
    947         self._report_failed_jobs_count(failed_jobs)
    948         self._log_failed_jobs_locally(failed_jobs)
    949 
    950 
    951     def offload_once(self):
    952         """Perform one offload cycle.
    953 
    954         Find all job directories for new jobs that we haven't seen
    955         before.  Then, attempt to offload the directories for any
    956         jobs that have finished running.  Offload of multiple jobs
    957         is done in parallel, up to `self._processes` at a time.
    958 
    959         After we've tried uploading all directories, go through the list
    960         checking the status of all uploaded directories.  If necessary,
     961         report failures to monarch and to the local failure log.
    962 
    963         """
    964         self._add_new_jobs()
    965         self._report_current_jobs_count()
    966         with parallel.BackgroundTaskRunner(
    967                 self._gs_offloader.offload, processes=self._processes) as queue:
    968             for job in self._open_jobs.values():
    969                 _enqueue_offload(job, queue, self._upload_age_limit)
    970         self._give_up_on_jobs_over_limit()
    971         self._remove_offloaded_jobs()
    972         self._report_failed_jobs()
    973 
    974 
    975     def _give_up_on_jobs_over_limit(self):
    976         """Give up on jobs that have gone over the offload limit.
    977 
    978         We mark them as uploaded as we won't try to offload them any more.
    979         """
    980         for job in self._open_jobs.values():
    981             if job.offload_count >= self._offload_count_limit:
    982                 _mark_uploaded(job.dirname)
    983 
    984 
    985     def _log_failed_jobs_locally(self, failed_jobs,
    986                                  log_file=FAILED_OFFLOADS_FILE):
    987         """Updates a local file listing all the failed jobs.
    988 
     989         The generated file can be used by developers to list jobs that we
     990         have failed to upload.
    991 
    992         @param failed_jobs: A list of failed _JobDirectory objects.
    993         @param log_file: The file to log the failed jobs to.
    994         """
    995         now = datetime.datetime.now()
    996         now_str = now.strftime(FAILED_OFFLOADS_TIME_FORMAT)
    997         formatted_jobs = [_format_job_for_failure_reporting(job)
    998                             for job in failed_jobs]
    999         formatted_jobs.sort()
   1000 
   1001         with open(log_file, 'w') as logfile:
   1002             logfile.write(FAILED_OFFLOADS_FILE_HEADER %
   1003                           (now_str, len(failed_jobs)))
   1004             logfile.writelines(formatted_jobs)
   1005 
   1006 
   1007     def _report_current_jobs_count(self):
   1008         """Report the number of outstanding jobs to monarch."""
   1009         metrics.Gauge('chromeos/autotest/gs_offloader/current_jobs_count').set(
   1010                 len(self._open_jobs))
   1011 
   1012 
   1013     def _report_failed_jobs_count(self, failed_jobs):
   1014         """Report the number of outstanding failed offload jobs to monarch.
   1015 
    1016         @param failed_jobs: List of failed jobs.
   1017         """
   1018         metrics.Gauge('chromeos/autotest/gs_offloader/failed_jobs_count').set(
   1019                 len(failed_jobs))
   1020 
   1021 
   1022 def _enqueue_offload(job, queue, age_limit):
   1023     """Enqueue the job for offload, if it's eligible.
   1024 
   1025     The job is eligible for offloading if the database has marked
   1026     it finished, and the job is older than the `age_limit`
   1027     parameter.
   1028 
    1029     If the job is eligible, offload processing is requested by
    1030     passing the `queue` parameter's `put()` method a sequence containing
    1031     the job's `dirname`, its parent directory, and its completion timestamp.
   1032 
   1033     @param job       _JobDirectory instance to offload.
   1034     @param queue     If the job should be offloaded, put the offload
   1035                      parameters into this queue for processing.
   1036     @param age_limit Minimum age for a job to be offloaded.  A value
   1037                      of 0 means that the job will be offloaded as
   1038                      soon as it is finished.
   1039 
   1040     """
   1041     if not job.offload_count:
   1042         if not _is_expired(job, age_limit):
   1043             return
   1044         job.first_offload_start = time.time()
   1045     job.offload_count += 1
   1046     if job.process_gs_instructions():
   1047         timestamp = job.get_timestamp_if_finished()
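                 # These items are consumed by BaseGSOffloader.offload as
                 # (dir_entry, dest_path, job_complete_time).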
   1048         queue.put([job.dirname, os.path.dirname(job.dirname), timestamp])
   1049 
   1050 
   1051 def parse_options():
   1052     """Parse the args passed into gs_offloader."""
   1053     defaults = 'Defaults:\n  Destination: %s\n  Results Path: %s' % (
   1054             utils.DEFAULT_OFFLOAD_GSURI, RESULTS_DIR)
   1055     usage = 'usage: %prog [options]\n' + defaults
   1056     parser = OptionParser(usage)
   1057     parser.add_option('-a', '--all', dest='process_all',
   1058                       action='store_true',
   1059                       help='Offload all files in the results directory.')
   1060     parser.add_option('-s', '--hosts', dest='process_hosts_only',
   1061                       action='store_true',
   1062                       help='Offload only the special tasks result files '
   1063                       'located in the results/hosts subdirectory')
   1064     parser.add_option('-p', '--parallelism', dest='parallelism',
   1065                       type='int', default=1,
   1066                       help='Number of parallel workers to use.')
   1067     parser.add_option('-o', '--delete_only', dest='delete_only',
   1068                       action='store_true',
    1069                       help='GS Offloader will only delete the '
   1070                       'directories and will not offload them to google '
   1071                       'storage. NOTE: If global_config variable '
   1072                       'CROS.gs_offloading_enabled is False, --delete_only '
   1073                       'is automatically True.',
   1074                       default=not GS_OFFLOADING_ENABLED)
   1075     parser.add_option('-d', '--days_old', dest='days_old',
   1076                       help='Minimum job age in days before a result can be '
   1077                       'offloaded.', type='int', default=0)
   1078     parser.add_option('-l', '--log_size', dest='log_size',
   1079                       help='Limit the offloader logs to a specified '
    1080                       'number of megabytes.', type='int', default=0)
   1081     parser.add_option('-m', dest='multiprocessing', action='store_true',
   1082                       help='Turn on -m option for gsutil. If not set, the '
   1083                       'global config setting gs_offloader_multiprocessing '
   1084                       'under CROS section is applied.')
   1085     parser.add_option('-i', '--offload_once', dest='offload_once',
   1086                       action='store_true',
   1087                       help='Upload all available results and then exit.')
   1088     parser.add_option('-y', '--normal_priority', dest='normal_priority',
   1089                       action='store_true',
   1090                       help='Upload using normal process priority.')
   1091     parser.add_option('-u', '--age_to_upload', dest='age_to_upload',
   1092                       help='Minimum job age in days before a result can be '
   1093                       'offloaded, but not removed from local storage',
   1094                       type='int', default=None)
   1095     parser.add_option('-n', '--age_to_delete', dest='age_to_delete',
   1096                       help='Minimum job age in days before a result can be '
   1097                       'removed from local storage',
   1098                       type='int', default=None)
   1099     parser.add_option(
   1100             '--metrics-file',
   1101             help='If provided, drop metrics to this local file instead of '
   1102                  'reporting to ts_mon',
   1103             type=str,
   1104             default=None,
   1105     )
   1106 
   1107     options = parser.parse_args()[0]
   1108     if options.process_all and options.process_hosts_only:
   1109         parser.print_help()
   1110         print ('Cannot process all files and only the hosts '
   1111                'subdirectory. Please remove an argument.')
   1112         sys.exit(1)
   1113 
   1114     if options.days_old and (options.age_to_upload or options.age_to_delete):
   1115         parser.print_help()
   1116         print('Use the days_old option or the age_to_* options but not both')
   1117         sys.exit(1)
   1118 
    1119     if options.age_to_upload is None:
    1120         options.age_to_upload = options.days_old
    1121     if options.age_to_delete is None:
    1122         options.age_to_delete = options.days_old
   1123 
   1124     return options
   1125 
   1126 
   1127 def main():
   1128     """Main method of gs_offloader."""
   1129     options = parse_options()
   1130 
   1131     if options.process_all:
   1132         offloader_type = 'all'
   1133     elif options.process_hosts_only:
   1134         offloader_type = 'hosts'
   1135     else:
   1136         offloader_type = 'jobs'
   1137 
   1138     _setup_logging(options, offloader_type)
   1139 
   1140     # Nice our process (carried to subprocesses) so we don't overload
   1141     # the system.
   1142     if not options.normal_priority:
   1143         logging.debug('Set process to nice value: %d', NICENESS)
   1144         os.nice(NICENESS)
   1145     if psutil:
   1146         proc = psutil.Process()
   1147         logging.debug('Set process to ionice IDLE')
   1148         proc.ionice(psutil.IOPRIO_CLASS_IDLE)
   1149 
   1150     # os.listdir returns relative paths, so change to where we need to
   1151     # be to avoid an os.path.join on each loop.
   1152     logging.debug('Offloading Autotest results in %s', RESULTS_DIR)
   1153     os.chdir(RESULTS_DIR)
   1154 
   1155     service_name = 'gs_offloader(%s)' % offloader_type
   1156     with ts_mon_config.SetupTsMonGlobalState(service_name, indirect=True,
   1157                                              short_lived=False,
   1158                                              debug_file=options.metrics_file):
   1159         with metrics.SuccessCounter('chromeos/autotest/gs_offloader/exit'):
   1160             offloader = Offloader(options)
   1161             if not options.delete_only:
   1162                 wait_for_gs_write_access(offloader.gs_uri)
   1163             while True:
   1164                 offloader.offload_once()
   1165                 if options.offload_once:
   1166                     break
   1167                 time.sleep(SLEEP_TIME_SECS)
   1168 
   1169 
   1170 _LOG_LOCATION = '/usr/local/autotest/logs/'
   1171 _LOG_FILENAME_FORMAT = 'gs_offloader_%s_log_%s.txt'
   1172 _LOG_TIMESTAMP_FORMAT = '%Y%m%d_%H%M%S'
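         # Example (illustrative): gs_offloader_jobs_log_20170102_030405.txt; the
         # timestamp part is omitted when --log_size is set to a positive value.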
   1173 _LOGGING_FORMAT = '%(asctime)s - %(levelname)s - %(message)s'
   1174 
   1175 
   1176 def _setup_logging(options, offloader_type):
   1177     """Set up logging.
   1178 
   1179     @param options: Parsed options.
   1180     @param offloader_type: Type of offloader action as string.
   1181     """
   1182     log_filename = _get_log_filename(options, offloader_type)
   1183     log_formatter = logging.Formatter(_LOGGING_FORMAT)
   1184     # Replace the default logging handler with a RotatingFileHandler. If
   1185     # options.log_size is 0, the file size will not be limited. Keeps
   1186     # one backup just in case.
   1187     handler = logging.handlers.RotatingFileHandler(
   1188             log_filename, maxBytes=1024 * options.log_size, backupCount=1)
   1189     handler.setFormatter(log_formatter)
   1190     logger = logging.getLogger()
   1191     logger.setLevel(logging.DEBUG)
   1192     logger.addHandler(handler)
   1193 
   1194 
   1195 def _get_log_filename(options, offloader_type):
   1196     """Get log filename.
   1197 
   1198     @param options: Parsed options.
   1199     @param offloader_type: Type of offloader action as string.
   1200     """
   1201     if options.log_size > 0:
   1202         log_timestamp = ''
   1203     else:
   1204         log_timestamp = time.strftime(_LOG_TIMESTAMP_FORMAT)
   1205     log_basename = _LOG_FILENAME_FORMAT % (offloader_type, log_timestamp)
   1206     return os.path.join(_LOG_LOCATION, log_basename)
   1207 
   1208 
   1209 if __name__ == '__main__':
   1210     main()
   1211