# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

"""Utils for sending metadata to elasticsearch

Elasticsearch is a NoSQL document store and search engine.
Source is here: https://github.com/elasticsearch/elasticsearch
We use es to store our metadata.

For example, if we wanted to store the following metadata:

metadata = {
    'host_id': 1,
    'job_id': 20,
    'time_start': 100000,
    'time_recorded': 100006,
}

The following call will send metadata to the default es server:
    autotest_es.post(index=index, metadata=metadata)
Pass arguments by keyword: the leading parameters of post() are the
connection settings (use_http, host, port, timeout, index, udp_port), so a
positional call such as post(index, metadata) would bind `index` to
`use_http`. Keyword arguments other than the connection settings are
forwarded to es_utils.ESMetadata.post(); check es_utils for the names it
accepts. The host and port to use can also be specified.

Using for testing: Sometimes, when we choose a single index
to put entries into, we want to clear that index of all
entries before running our tests. Use the clear_index function.
(see es_utils_functionaltest.py for an example)

This file also contains methods for sending queries to es. Currently,
the query (json dict) we send to es is quite complicated (but flexible).

For example, the below query returns job_id, host_id, and job_start
for all job_ids in [0, 99999] and host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                }
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    }
                }
            }
        }
    }
}

To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server. The query() function can be used to construct a
query with certain parameters and execute it all in one call.

"""

import es_utils

import common
from autotest_lib.client.common_lib import global_config


# Server and ports for elasticsearch (for metadata use only)
METADATA_ES_SERVER = global_config.global_config.get_config_value(
        'CROS', 'ES_HOST', default='localhost')
ES_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_PORT', type=int, default=9200)
ES_UDP_PORT = global_config.global_config.get_config_value(
        'CROS', 'ES_UDP_PORT', type=int, default=9700)
# Whether to use HTTP by default. UDP has very little overhead (around 3 ms),
# whereas HTTP (TCP) takes ~500 ms for the first connection and 50-100 ms for
# subsequent connections.
ES_USE_HTTP = global_config.global_config.get_config_value(
        'CROS', 'ES_USE_HTTP', type=bool, default=False)

# If CLIENT/metadata_index is not set, INDEX_METADATA falls back to
# autotest instance name (SERVER/hostname).
INDEX_METADATA = global_config.global_config.get_config_value(
        'CLIENT', 'metadata_index', type=str, default=None)
if not INDEX_METADATA:
    INDEX_METADATA = global_config.global_config.get_config_value(
            'SERVER', 'hostname', type=str, default='localhost')

# Timeout, in seconds, for connections to the es server.
DEFAULT_TIMEOUT = 3

DEFAULT_BULK_POST_RETRIES = 5


def _http_metadata(host, port, timeout, index, udp_port=0):
    """Build an es_utils.ESMetadata that talks to es over HTTP.

    Shared by the HTTP-only helpers below so they construct the client
    identically.

    @param host: es server host name.
    @param port: es server HTTP port.
    @param timeout: Connection timeout passed to ESMetadata.
    @param index: es index to read from / write to.
    @param udp_port: UDP port passed to ESMetadata. Each caller passes the
                     same value it always has (0, or ES_UDP_PORT for
                     bulk_post).
    @returns A new es_utils.ESMetadata instance with use_http=True.
    """
    return es_utils.ESMetadata(use_http=True, host=host, port=port,
                               timeout=timeout, index=index,
                               udp_port=udp_port)


def post(use_http=ES_USE_HTTP, host=METADATA_ES_SERVER, port=ES_PORT,
         timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA, udp_port=ES_UDP_PORT,
         *args, **kwargs):
    """Send metadata to es.

    The connection arguments below are passed to the es_utils.ESMetadata
    constructor; every other argument is forwarded to its post() method.
    See es_utils for the meaning of each.

    Because the connection arguments come first positionally, callers
    should pass everything by keyword.

    @param use_http: Send over HTTP if True, otherwise over UDP.
    @param host: es server host name.
    @param port: es server HTTP port.
    @param timeout: Connection timeout passed to ESMetadata.
    @param index: es index to write to.
    @param udp_port: es server UDP port.
    @returns Whatever es_utils.ESMetadata.post() returns.
    """
    esmd = es_utils.ESMetadata(use_http=use_http, host=host, port=port,
                               timeout=timeout, index=index,
                               udp_port=udp_port)
    return esmd.post(*args, **kwargs)


def bulk_post(data_list, host=METADATA_ES_SERVER, port=ES_PORT,
              timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA,
              retries=DEFAULT_BULK_POST_RETRIES, *args, **kwargs):
    """Upload a list of metadata entries with the es bulk API over HTTP.

    The connection arguments are passed to the es_utils.ESMetadata
    constructor; data_list and any other arguments are forwarded to its
    bulk_post() method. Using the bulk API can greatly enhance the
    performance of uploading data over HTTP. See es_utils for details of
    each argument.

    @param data_list: List of metadata to upload.
    @param host: es server host name.
    @param port: es server HTTP port.
    @param timeout: Connection timeout passed to ESMetadata.
    @param index: es index to write to.
    @param retries: Maximum number of upload attempts. Values below 1 are
                    treated as 1, so the data is always sent at least once.
    @returns True as soon as one attempt succeeds, False if every attempt
             failed.
    """
    esmd = _http_metadata(host, port, timeout, index, udp_port=ES_UDP_PORT)
    # A bulk post may fail due to the amount of data, so retry several times.
    # Clamp to one attempt: retries <= 0 used to skip the upload entirely
    # and silently report failure.
    for _ in range(max(retries, 1)):
        if esmd.bulk_post(data_list, *args, **kwargs):
            return True
    return False


def execute_query(host=METADATA_ES_SERVER, port=ES_PORT,
                  timeout=DEFAULT_TIMEOUT, index=INDEX_METADATA,
                  *args, **kwargs):
    """Send an already-built query to es over HTTP.

    The connection arguments are passed to the es_utils.ESMetadata
    constructor; every other argument is forwarded to its execute_query()
    method. See es_utils for the meaning of each.

    @param host: es server host name.
    @param port: es server HTTP port.
    @param timeout: Connection timeout passed to ESMetadata.
    @param index: es index to query.
    @returns Whatever es_utils.ESMetadata.execute_query() returns.
    """
    esmd = _http_metadata(host, port, timeout, index)
    return esmd.execute_query(*args, **kwargs)


def query(host=METADATA_ES_SERVER, port=ES_PORT, timeout=DEFAULT_TIMEOUT,
          index=INDEX_METADATA, *args, **kwargs):
    """Build and run a query against es over HTTP in one call.

    The connection arguments are passed to the es_utils.ESMetadata
    constructor; every other argument is forwarded to its query() method.
    See es_utils for the meaning of each.

    @param host: es server host name.
    @param port: es server HTTP port.
    @param timeout: Connection timeout passed to ESMetadata.
    @param index: es index to query.
    @returns Whatever es_utils.ESMetadata.query() returns.
    """
    esmd = _http_metadata(host, port, timeout, index)
    return esmd.query(*args, **kwargs)