# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

      5 """
      6 This module is designed to report metadata in a separated thread to avoid the
      7 performance overhead of sending data to Elasticsearch using HTTP.
      8 
      9 """
     10 
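# Typical usage from the scheduler process (a minimal sketch; the metadata
# entry shown is illustrative, the actual fields are up to the caller):
#
#     from autotest_lib.site_utils import metadata_reporter
#
#     metadata_reporter.start()
#     metadata_reporter.queue({'hostname': 'host1', 'event': 'job_complete'})
#     ...
#     metadata_reporter.abort()
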
import logging
import Queue
import threading
import time

import common
from autotest_lib.client.common_lib.cros.graphite import autotest_es
from autotest_lib.client.common_lib.cros.graphite import autotest_stats
from autotest_lib.scheduler import email_manager
# The metadata_reporter thread runs inside the scheduler process, so it does
# not need to set up Django. Otherwise, the following import would be needed:
# from autotest_lib.frontend import setup_django_environment
from autotest_lib.site_utils import server_manager_utils


# Number of seconds to wait between checks of the queue for data to upload.
_REPORT_INTERVAL_SECONDS = 5

_MAX_METADATA_QUEUE_SIZE = 1000000
_MAX_UPLOAD_SIZE = 50000
# Number of seconds that uploads can keep failing continuously. After that,
# each upload attempt is limited to _MIN_RETRY_ENTRIES entries.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry per upload attempt after uploads have failed
# continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
_MIN_RETRY_ENTRIES = 10
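# With these defaults, the retry batch grows exponentially once uploads
# recover: 10, 20, 40, ... entries per cycle, capped at _MAX_UPLOAD_SIZE.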
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
_abort = threading.Event()
_queue_full = threading.Event()
# Reference to the reporting thread, used by abort() to wait for its exit.
_reporting_thread = None


def queue(data):
    """Queue metadata to be uploaded by the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call neither blocks nor raises Queue.Full, so it adds no
    performance overhead to the caller, e.g., the scheduler.

    @param data: A metadata entry, which should be a dictionary.
    """
    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _email_alert():
    """Send an email alert that metadata uploads have been failing for
    _MAX_UPLOAD_FAIL_DURATION seconds.
    """
    if not server_manager_utils.use_server_db():
        logging.debug('Server database is not used, email alert is skipped.')
        return
    try:
        server_manager_utils.confirm_server_has_role(hostname='localhost',
                                                     role='scheduler')
    except server_manager_utils.ServerActionError:
        # Only email the alert if this server is a scheduler, not a shard.
        return
    subject = ('Metadata upload has been failing for %d seconds' %
               _MAX_UPLOAD_FAIL_DURATION)
    email_manager.manager.enqueue_notify_email(subject, '')
    email_manager.manager.send_queued_emails()


def _run():
    """Report metadata in the queue until aborted.
    """
    # Time when uploads first started failing. None if the last upload
    # succeeded.
    first_failed_upload = None
    # True if an email alert was sent after uploads had been failing
    # continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
    email_alert = False
    upload_size = _MIN_RETRY_ENTRIES
    try:
        while True:
            start_time = time.time()
            data_list = []
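            # After uploads have failed for a prolonged period, throttle each
            # retry down to a small batch instead of re-posting huge payloads.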
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
                if not email_alert:
                    _email_alert()
                    email_alert = True
            else:
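                # Uploads are succeeding, or have been failing for less than
                # _MAX_UPLOAD_FAIL_DURATION: grow the batch exponentially,
                # capped at _MAX_UPLOAD_SIZE.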
                upload_size = min(upload_size * 2, _MAX_UPLOAD_SIZE)
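            # Drain up to upload_size entries from the queue without blocking.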
            while not metadata_queue.empty() and len(data_list) < upload_size:
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                if autotest_es.bulk_post(data_list=data_list):
                    time_used = time.time() - start_time
                    logging.info('%d entries of metadata uploaded in %s '
                                 'seconds.', len(data_list), time_used)
                    autotest_stats.Timer('metadata_reporter').send(
                            'time_used', time_used)
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_uploaded', len(data_list))
                    first_failed_upload = None
                    email_alert = False
                else:
                    logging.warning(
                            'Failed to upload %d entries of metadata, they '
                            'will be retried later.', len(data_list))
                    autotest_stats.Gauge('metadata_reporter').send(
                            'entries_failed', len(data_list))
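                    # Put the failed entries back for a later retry. Note that
                    # queue() does not block: if the queue is full, entries
                    # are dropped (an error is logged the first time), so data
                    # can be lost under sustained failure.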
                    for data in data_list:
                        queue(data)
                    if not first_failed_upload:
                        first_failed_upload = time.time()
            # Exit after a final drain once an abort has been requested; this
            # gives queued entries one more chance to be uploaded.
            if _abort.is_set():
                break
            sleep_time = _REPORT_INTERVAL_SECONDS - (time.time() - start_time)
            if sleep_time < 0:
                sleep_time = 0.5
            # Event.wait returns as soon as _abort is set, keeping shutdown
            # responsive while still pacing the upload loop.
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.error('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def start():
    """Start the thread to report metadata.
    """
    global _reporting_thread
    # The lock makes sure there is only one reporting thread working. Acquire
    # it without blocking to avoid a race between checking and acquiring.
    if not _report_lock.acquire(False):
        logging.error('There is already a metadata reporter thread.')
        return

    _reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    _reporting_thread.daemon = True
    _reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call waits up to _REPORT_INTERVAL_SECONDS seconds for any queued data
    to be uploaded before the thread exits.
    """
    if not _report_lock.locked():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Join the thread instead of waiting on _abort: waiting on an event that
    # was just set would return immediately, before the thread has exited.
    if _reporting_thread:
        _reporting_thread.join(_REPORT_INTERVAL_SECONDS)