Home | History | Annotate | Download | only in graphite
      1 # Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
      2 # Use of this source code is governed by a BSD-style license that can be
      3 # found in the LICENSE file.
      4 
      5 # This file defines helper functions for putting entries into elasticsearch.
      6 
      7 """Utils for sending metadata to elasticsearch
      8 
      9 Elasticsearch is a key-value store NOSQL database.
     10 Source is here: https://github.com/elasticsearch/elasticsearch
     11 We will be using es to store our metadata.
     12 
For example, if we wanted to store the following metadata:

metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}
     21 
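The following call will send metadata to the default es server.
    autotest_es.post(index=index, metadata=metadata)
Pass these arguments by keyword. The leading positional parameters of post()
are use_http, host, port, timeout, index and udp_port, so a positional call
would bind the values to those parameters instead. We can also specify which
port and host to use.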
     25 
Testing: when tests put entries into a single index, we usually want to clear
that index of all entries before running them. Use the clear_index function
for this.
     29 (see es_utils_functionaltest.py for an example)
     30 
     31 This file also contains methods for sending queries to es. Currently,
     32 the query (json dict) we send to es is quite complicated (but flexible).
     33 
     34 For example, the below query returns job_id, host_id, and job_start
     35 for all job_ids in [0, 99999] and host_id matching 10.
     36 
     37 range_eq_query = {
     38     'fields': ['job_id', 'host_id', 'job_start'],
     39     'query': {
     40         'filtered': {
     41             'query': {
     42                 'match': {
     43                     'host_id': 10,
     44                 }
     45             }
     46             'filter': {
     47                 'range': {
     48                     'job_id': {
     49                         'gte': 0,
     50                         'lte': 99999,
     51                     }
     52                 }
     53             }
     54         }
     55     }
     56 }
     57 
     58 To send a query once it is created, call execute_query() to send it to the
     59 intended elasticsearch server. The query() function can be used to construct a
     60 query with certain parameters and execute it all in one call.
     61 
     62 """
     63 
     64 import es_utils
     65 
     66 import common
     67 from autotest_lib.client.common_lib import global_config
     68 
     69 
     70 # Server and ports for elasticsearch (for metadata use only)
     71 METADATA_ES_SERVER = global_config.global_config.get_config_value(
     72         'CROS', 'ES_HOST', default='localhost')
     73 ES_PORT = global_config.global_config.get_config_value(
     74         'CROS', 'ES_PORT', type=int, default=9200)
     75 ES_UDP_PORT = global_config.global_config.get_config_value(
     76         'CROS', 'ES_UDP_PORT', type=int, default=9700)
     77 # Whether to use http. udp is very little overhead (around 3 ms) compared to
     78 # using http (tcp) takes ~ 500 ms for the first connection and 50-100ms for
     79 # subsequent connections.
     80 ES_USE_HTTP = global_config.global_config.get_config_value(
     81         'CROS', 'ES_USE_HTTP', type=bool, default=False)
     82 
     83 # If CLIENT/metadata_index is not set, INDEX_METADATA falls back to
     84 # autotest instance name (SERVER/hostname).
     85 INDEX_METADATA = global_config.global_config.get_config_value(
     86         'CLIENT', 'metadata_index', type=str, default=None)
     87 if not INDEX_METADATA:
     88     INDEX_METADATA = global_config.global_config.get_config_value(
     89             'SERVER', 'hostname', type=str, default='localhost')
     90 
     91 # 3 Seconds before connection to esdb timeout.
     92 DEFAULT_TIMEOUT = 3
     93 
     94 DEFAULT_BULK_POST_RETRIES = 5
     95 
     96 def post(use_http=ES_USE_HTTP, host=METADATA_ES_SERVER, port=ES_PORT,
     97          timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA, udp_port=ES_UDP_PORT,
     98          *args, **kwargs):
     99     """This function takes a series of arguments which are passed to the
    100     es_utils.ESMetadata constructor, and any other arguments are passed to
    101     its post() function. For an explanation of each, see those functions in
    102     es_utils.
    103     """
    104     esmd = es_utils.ESMetadata(use_http=use_http, host=host, port=port,
    105                                timeout=timeout, index=index, udp_port=udp_port)
    106     return esmd.post(*args, **kwargs)
    107 
    108 
    109 def bulk_post(data_list, host=METADATA_ES_SERVER, port=ES_PORT,
    110               timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA,
    111               retries=DEFAULT_BULK_POST_RETRIES, *args, **kwargs):
    112     """This function takes a series of arguments which are passed to the
    113     es_utils.ESMetadata constructor, and a list of metadata, then upload to
    114     Elasticsearch server using Elasticsearch bulk API. This can greatly nhance
    115     the performance of uploading data using HTTP.
    116     For an explanation of each argument, see those functions in es_utils.
    117     """
    118     esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
    119                                timeout=timeout, index=index,
    120                                udp_port=ES_UDP_PORT)
    121     # bulk post may fail due to the amount of data, retry several times.
    122     for _ in range(retries):
    123         if esmd.bulk_post(data_list, *args, **kwargs):
    124             return True
    125     return False
    126 
    127 
    128 def execute_query(host=METADATA_ES_SERVER, port=ES_PORT,
    129                   timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA,
    130                   *args, **kwargs):
    131     """This function takes a series of arguments which are passed to the
    132     es_utils.ESMetadata constructor, and any other arguments are passed to
    133     its execute_query() function. For an explanation of each, see those
    134     functions in es_utils.
    135     """
    136     esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
    137                                timeout=timeout, index=index, udp_port=0)
    138     return esmd.execute_query(*args, **kwargs)
    139 
    140 
    141 def query(host=METADATA_ES_SERVER, port=ES_PORT, timeout=DEFAULT_TIMEOUT,
    142           index=INDEX_METADATA, *args, **kwargs):
    143     """This function takes a series of arguments which are passed to the
    144     es_utils.ESMetadata constructor, and any other arguments are passed to
    145     its query() function. For an explanation of each, see those functions in
    146     es_utils.
    147     """
    148     esmd = es_utils.ESMetadata(use_http=True, host=host, port=port,
    149                                timeout=timeout, index=index, udp_port=0)
    150     return esmd.query(*args, **kwargs)
    151