# Copyright 2015 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
This module is designed to report metadata in a separate thread to avoid the
performance overhead of sending data to Elasticsearch over HTTP.
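
Example usage, as a minimal sketch (the import path and the metadata fields
are assumptions for illustration, not part of this module's contract):

    from autotest_lib.site_utils import metadata_reporter

    metadata_reporter.start()
    metadata_reporter.queue({'_type': 'job_time', 'duration': 12.3})
    metadata_reporter.abort()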
      9 """
     10 
     11 import logging
     12 import Queue
     13 import socket
     14 import time
     15 import threading
     16 
     17 import common
     18 from autotest_lib.client.common_lib import utils
     19 
     20 try:
     21     from chromite.lib import metrics
     22 except ImportError:
     23     metrics = utils.metrics_mock
     24 
     25 
     26 _METADATA_METRICS_PREFIX = 'chromeos/autotest/es_metadata_reporter/'

# Number of seconds to wait before checking the queue again for data to upload.
_REPORT_INTERVAL_SECONDS = 5

_MAX_METADATA_QUEUE_SIZE = 1000000
_MAX_UPLOAD_SIZE = 50000
# If uploads keep failing continuously for this many seconds, the upload batch
# size is reduced to _MIN_RETRY_ENTRIES.
_MAX_UPLOAD_FAIL_DURATION = 600
# Number of entries to retry with after the previous uploads failed
# continuously for _MAX_UPLOAD_FAIL_DURATION seconds.
_MIN_RETRY_ENTRIES = 10
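
# For reference, with the defaults above the batch size doubles on every
# reporting cycle (10 -> 20 -> 40 -> ... -> 40960, then capped at
# _MAX_UPLOAD_SIZE) and is reset to _MIN_RETRY_ENTRIES once uploads have been
# failing continuously for _MAX_UPLOAD_FAIL_DURATION seconds; see the backoff
# logic in _run() below.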
# Queue to buffer metadata to be reported.
metadata_queue = Queue.Queue(_MAX_METADATA_QUEUE_SIZE)

_report_lock = threading.Lock()
# Handle to the reporting thread so that abort() can wait for it to exit.
_reporting_thread = None
_abort = threading.Event()
_queue_full = threading.Event()
_metrics_fields = {}


def _get_metrics_fields():
    """Get the fields information to be uploaded to metrics."""
    if not _metrics_fields:
        _metrics_fields['hostname'] = socket.gethostname()

    return _metrics_fields


def queue(data):
    """Queue metadata to be uploaded in the reporter thread.

    If the queue is full, an error is logged the first time the queue becomes
    full. The call neither blocks nor raises Queue.Full, so it adds no
    performance overhead to the caller, e.g., the scheduler.
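
    Example (the entry below is illustrative; any dictionary can be queued):
        queue({'_type': 'job_time', 'job_id': 1234, 'duration': 12.3})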

    @param data: A metadata entry, which should be a dictionary.
    """
    if not is_running():
        return

    try:
        metadata_queue.put_nowait(data)
        if _queue_full.is_set():
            logging.info('Metadata queue is available to receive new data '
                         'again.')
            _queue_full.clear()
    except Queue.Full:
        if not _queue_full.is_set():
            _queue_full.set()
            logging.error('Metadata queue is full, cannot report data. '
                          'Consider increasing the value of '
                          '_MAX_METADATA_QUEUE_SIZE. Its current value is set '
                          'to %d.', _MAX_METADATA_QUEUE_SIZE)


def _run():
    """Report metadata in the queue until being aborted."""
    # Time when uploads first started failing; None if the last upload
    # succeeded. The Elasticsearch upload itself has been removed (see
    # start()), so the current code never assigns it.
    first_failed_upload = None
    upload_size = _MIN_RETRY_ENTRIES

    try:
        while True:
            start_time = time.time()
            data_list = []
            # Grow the upload batch size exponentially, but fall back to a
            # small batch after uploads have been failing for too long.
            if (first_failed_upload and
                time.time() - first_failed_upload > _MAX_UPLOAD_FAIL_DURATION):
                upload_size = _MIN_RETRY_ENTRIES
            else:
                upload_size = min(upload_size*2, _MAX_UPLOAD_SIZE)
            # Drain up to upload_size entries from the queue.
            while not metadata_queue.empty() and len(data_list) < upload_size:
                data_list.append(metadata_queue.get_nowait())
            if data_list:
                # Uploading to Elasticsearch is deprecated, so only the
                # monitoring metrics are reported and success stays False.
                success = False
                fields = _get_metrics_fields().copy()
                fields['success'] = success
                metrics.Gauge(
                        _METADATA_METRICS_PREFIX + 'upload/batch_sizes').set(
                                len(data_list), fields=fields)
                metrics.Counter(
                        _METADATA_METRICS_PREFIX + 'upload/attempts').increment(
                                fields=fields)

            metrics.Gauge(_METADATA_METRICS_PREFIX + 'queue_size').set(
                    metadata_queue.qsize(), fields=_get_metrics_fields())
            if _abort.is_set() and metadata_queue.empty():
                # Abort was requested and the queue is drained; exit the loop
                # so the finally block below can clean up.
                break
            sleep_time = _REPORT_INTERVAL_SECONDS - time.time() + start_time
            if sleep_time < 0:
                sleep_time = 0.5
            _abort.wait(timeout=sleep_time)
    except Exception as e:
        logging.exception('Metadata reporter thread failed with error: %s', e)
        raise
    finally:
        logging.info('Metadata reporting thread is exiting.')
        _abort.clear()
        _report_lock.release()


def is_running():
    """Check if metadata_reporter is running.

    @return: True if metadata_reporter is running.
    """
    return _report_lock.locked()


def start():
    """Start the thread to report metadata."""
    global _reporting_thread
    # The lock makes sure there is only one reporting thread working.
    if is_running():
        logging.error('There is already a metadata reporter thread.')
        return

    logging.warn('The Elasticsearch DB is deprecated; no metadata will be '
                 'reported.')

    _report_lock.acquire()
    _reporting_thread = threading.Thread(target=_run)
    # Make it a daemon thread so it doesn't need to be closed explicitly.
    _reporting_thread.setDaemon(True)
    _reporting_thread.start()
    logging.info('Metadata reporting thread is started.')


def abort():
    """Abort the thread to report metadata.

    The call will wait up to 5 seconds for existing data to be uploaded.
    """
    if not is_running():
        logging.error('The metadata reporting thread has already exited.')
        return

    _abort.set()
    logging.info('Waiting up to %s seconds for metadata reporting thread to '
                 'complete.', _REPORT_INTERVAL_SECONDS)
    # Wait for the reporting thread to drain the queue and exit.
    if _reporting_thread:
        _reporting_thread.join(_REPORT_INTERVAL_SECONDS)
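

if __name__ == '__main__':
    # A minimal smoke test of the public API, as a sketch for manual runs;
    # the metadata entry below is illustrative, not a required schema.
    logging.basicConfig(level=logging.INFO)
    start()
    queue({'_type': 'example', 'hostname': socket.gethostname()})
    # Give the reporting thread one cycle to drain the queue.
    time.sleep(1)
    abort()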
    166