# Copyright (c) 2014 The Chromium OS Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# This file defines helper functions for putting entries into elasticsearch.

      7 """Utils for sending metadata to elasticsearch
      8 
      9 Elasticsearch is a key-value store NOSQL database.
     10 Source is here: https://github.com/elasticsearch/elasticsearch
     11 We will be using es to store our metadata.
     12 
     13 For example, if we wanted to store the following metadata:
     14 
     15 metadata = {
     16     'host_id': 1
     17     'job_id': 20
     18     'time_start': 100000
     19     'time_recorded': 100006
     20 }
     21 
     22 The following call will send metadata to the default es server.
     23     es_utils.ESMetadata().post(index, metadata)
     24 We can also specify which port and host to use.
     25 
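For example, a minimal sketch (the host, port, and index values here are
illustrative, not project defaults):

    es = es_utils.ESMetadata(use_http=True, host='localhost', port=9200,
                             index='test_index', udp_port=9700)
    es.post(type_str='job_metadata', metadata=metadata)
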
Use in testing: sometimes, when we put entries into a single index, we
want to clear that index of all entries before running our tests. Use
the clear_index function for this (see es_utils_functionaltest.py for
an example).

This file also contains methods for sending queries to es. Currently,
the query (a json dict) we send to es is quite complicated (but flexible).
The _compose_query() method below builds the kinds of queries that have
proven useful.

For example, the query below returns job_id, host_id, and job_start
for all job_ids in [0, 99999] with host_id matching 10.

range_eq_query = {
    'fields': ['job_id', 'host_id', 'job_start'],
    'query': {
        'filtered': {
            'query': {
                'match': {
                    'host_id': 10,
                }
            },
            'filter': {
                'range': {
                    'job_id': {
                        'gte': 0,
                        'lte': 99999,
                    }
                }
            }
        }
    }
}

To send a query once it is created, call execute_query() to send it to the
intended elasticsearch server.

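For example, a sketch reusing the ESMetadata instance `es` constructed in
the earlier example:

    result = es.execute_query(range_eq_query)
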
     63 """
     64 
     65 import collections
     66 import json
     67 import logging
     68 import socket
     69 import time
     70 
try:
    import elasticsearch
    from elasticsearch import helpers as elasticsearch_helpers
except ImportError:
    logging.debug('Failed to import elasticsearch. Mock classes will be used '
                  'and calls to Elasticsearch server will be no-op. Test run '
                  'is not affected by the missing elasticsearch module.')
    import elasticsearch_mock as elasticsearch
    elasticsearch_helpers = elasticsearch.Elasticsearch()


DEFAULT_TIMEOUT = 30


class EsUtilException(Exception):
    """Exception raised when functions here fail."""
    pass


QueryResult = collections.namedtuple('QueryResult', ['total', 'hits'])


class ESMetadata(object):
    """Class handling es connection for metadata."""

    @property
    def es(self):
        """Read-only property, lazily initialized."""
        if not self._es:
            self._es = elasticsearch.Elasticsearch(host=self.host,
                                                   port=self.port,
                                                   timeout=self.timeout)
        return self._es


    def __init__(self, use_http, host, port, index, udp_port,
                 timeout=DEFAULT_TIMEOUT):
        """Initialize ESMetadata object.

        @param use_http: Whether to send data to ES using HTTP.
        @param host: Elasticsearch host.
        @param port: Elasticsearch port.
        @param index: What index the metadata is stored in.
        @param udp_port: What port to use for UDP data.
        @param timeout: How long to wait while connecting to es.
        """
        self.use_http = use_http
        self.host = host
        self.port = port
        self.index = index
        self.udp_port = udp_port
        self.timeout = timeout
        self._es = None


    def _send_data_http(self, type_str, metadata):
        """Sends data to insert into elasticsearch using HTTP.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing metadata.
        """
        try:
            self.es.index(index=self.index, doc_type=type_str, body=metadata)
        except elasticsearch.ElasticsearchException as e:
            # Mute exceptions from metadata reporting to prevent metadata
            # reporting errors from killing the test.
            logging.error(e)


    def _send_data_udp(self, type_str, metadata):
        """Sends data to insert into elasticsearch using UDP.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing metadata.
        """
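        # The es bulk endpoint expects newline-delimited JSON: an action
        # line naming the index and doc type, then the document itself.
        # An illustrative message (field values are made up):
        #   {"index" : {"_index" : "test_index", "_type" : "job"}}
        #   {"job_id" : 20, "time_recorded" : 100006}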
        try:
            # Header.
            message = json.dumps(
                    {'index': {'_index': self.index, '_type': type_str}},
                    separators=(', ', ' : '))
            message += '\n'
            # Metadata.
            message += json.dumps(metadata, separators=(', ', ' : '))
            message += '\n'

            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.sendto(message, (self.host, self.udp_port))
        except socket.error as e:
            logging.warning(e)


    def post(self, type_str, metadata, log_time_recorded=True, **kwargs):
        """Wraps the send_data calls; inserts an entry into elasticsearch.

        @param type_str: Sets the _type field in the elasticsearch db.
        @param metadata: Dictionary object containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields.

        @return: True if the post action succeeded. Otherwise return False.

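        Example, a sketch (the type and field names are illustrative):
            es.post(type_str='job_time', metadata={'job_id': 20}, host_id=1)
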
    173         """
        if not metadata:
            return True

        metadata = metadata.copy()
        metadata.update(kwargs)
        # A '_type' key in metadata overrides the type_str argument; the
        # key itself must not be stored with the entry.
        if '_type' in metadata:
            type_str = metadata['_type']
            del metadata['_type']
        if log_time_recorded:
            metadata['time_recorded'] = time.time()
        try:
            if self.use_http:
                self._send_data_http(type_str, metadata)
            else:
                self._send_data_udp(type_str, metadata)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def bulk_post(self, data_list, log_time_recorded=True, **kwargs):
        """Wraps the bulk helper; inserts a list of entries into elasticsearch.

        @param data_list: A list of dictionary objects containing metadata.
        @param log_time_recorded: Whether to automatically record the time
                                  this metadata is recorded. Default is True.
        @param kwargs: Additional metadata fields.

        @return: True if the post action succeeded. Otherwise return False.

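        Example, a sketch (field names are illustrative):
            es.bulk_post([{'job_id': 20}, {'job_id': 21}], board='lumpy')
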
    206         """
        if not data_list:
            return True

        actions = []
        for metadata in data_list:
            metadata = metadata.copy()
            metadata.update(kwargs)
            if log_time_recorded and 'time_recorded' not in metadata:
                metadata['time_recorded'] = time.time()
            metadata['_index'] = self.index
            actions.append(metadata)

        try:
            elasticsearch_helpers.bulk(self.es, actions)
            return True
        except elasticsearch.ElasticsearchException as e:
            logging.error(e)
            return False


    def _compose_query(self, equality_constraints=(), fields_returned=None,
                       range_constraints=(), size=1000000, sort_specs=None,
                       regex_constraints=(), batch_constraints=()):
        """Creates a dict representing multiple range and/or equality queries.

        Example input:
        _compose_query(
            fields_returned = ['time_recorded', 'hostname',
                               'status', 'dbg_str'],
            equality_constraints = [
                ('_type', 'host_history'),
                ('hostname', '172.22.169.106'),
            ],
            range_constraints = [
                ('time_recorded', 1405628341.904379, 1405700341.904379)
            ],
            size=20,
            sort_specs=[
                'hostname',
                {'time_recorded': 'asc'},
            ]
        )

        Output:
        {
            'fields': ['time_recorded', 'hostname', 'status', 'dbg_str'],
            'query': {
                'bool': {
                    'must': [
                        {
                            'term':  {
                                '_type': 'host_history'
                            }
                        },
                        {
                            'term': {
                                'hostname': '172.22.169.106'
                            }
                        },
                        {
                            'range': {
                                'time_recorded': {
                                    'gte': 1405628341.904379,
                                    'lte': 1405700341.904379
                                }
                            }
                        }
                    ]
                },
            },
            'size': 20,
            'sort': [
                'hostname',
                {'time_recorded': 'asc'},
            ]
        }

        @param equality_constraints: list of tuples of (field, value) pairs
            representing what each field should equal in the query.
            e.g. [ ('field1', 1), ('field2', 'value') ]
        @param fields_returned: list of fields that we should return when
            the query is executed. Set it to None to return all fields. Note
            that the key/vals will be stored in the _source key of the hit
            object, if fields_returned is set to None.
        @param range_constraints: list of tuples of (field, low, high) pairs
            representing what each field should be between (inclusive).
            e.g. [ ('field1', 2, 10), ('field2', -1, 20) ]
            If you want one side to be unbounded, you can use None.
            e.g. [ ('field1', 2, None) ] means value of field1 >= 2.
        @param size: max number of entries to return. Default is 1000000.
        @param sort_specs: A list of fields to sort on; ties are broken by
            the next field(s) in the list.
        @param regex_constraints: A list of regex constraints as tuples of
            (field, value) pairs, e.g., [('field1', '.*value.*')].
        @param batch_constraints: list of tuples of (field, list) pairs
            representing that each field should equal one of the values
            in the list.
            e.g., [ ('job_id', [10, 11, 12, 13]) ]
        @returns: dictionary object that represents the query to es.
        @raises EsUtilException: if there are no equality constraints and no
            range constraints.
        """
        if not equality_constraints and not range_constraints:
            raise EsUtilException('No range or equality constraints specified.')

        # Creates the list of range dictionaries to put in the 'must' list.
        range_list = []
        if range_constraints:
            for key, low, high in range_constraints:
                if low is None and high is None:
                    continue
                temp_dict = {}
                if low is not None:
                    temp_dict['gte'] = low
                if high is not None:
                    temp_dict['lte'] = high
                range_list.append({'range': {key: temp_dict}})

        # Creates the list of term dictionaries to put in the 'must' list.
        eq_list = [{'term': {k: v}} for k, v in equality_constraints if k]
        batch_list = [{'terms': {k: v}} for k, v in batch_constraints if k]
        regex_list = [{'regexp': {k: v}} for k, v in regex_constraints if k]
        constraints = eq_list + batch_list + range_list + regex_list
        query = {
            'query': {
                'bool': {
                    'must': constraints,
                }
            },
        }
        if fields_returned:
            query['fields'] = fields_returned
        query['size'] = size
        if sort_specs:
            query['sort'] = sort_specs
        return query


    def execute_query(self, query):
        """Makes a query on the given index.

        @param query: Query dictionary (see _compose_query).
        @returns: A QueryResult instance describing the result, or None if
                  the index does not exist.

        Example of the raw response es returns, which this method parses:
        {
            "took" : 5,
            "timed_out" : false,
            "_shards" : {
                "total" : 16,
                "successful" : 16,
                "failed" : 0
            },
            "hits" : {
                "total" : 4,
                "max_score" : 1.0,
                "hits" : [ {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "rtntrjgdsafdsfdsfdsfdsfdssssssss",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 1,
                               "job_id": 22,
                               "time_start": 400}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfgfddddddddddddddddddddddhhh",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 2,
                               "job_id": 23,
                               "time_start": 405}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "erwerwerwewtrewgfednvfngfngfrhfd",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 3,
                               "job_id": 24,
                               "time_start": 4098}
                }, {
                    "_index" : "graphite_metrics2",
                    "_type" : "metric",
                    "_id" : "dfherjgwetfrsupbretowegoegheorgsa",
                    "_score" : 1.0,
                    "_source":{"target_type": "timer",
                               "host_id": 22,
                               "job_id": 25,
                               "time_start": 4200}
                } ]
            }
        }

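        The response above would be parsed into (sketch):
            QueryResult(total=4, hits=[{'target_type': 'timer', 'host_id': 1,
                                        'job_id': 22, 'time_start': 400},
                                       ...])
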
    404         """
        if not self.es.indices.exists(index=self.index):
            logging.error('Index (%s) does not exist on %s:%s',
                          self.index, self.host, self.port)
            return None
        result = self.es.search(index=self.index, body=query,
                                timeout=DEFAULT_TIMEOUT,
                                request_timeout=DEFAULT_TIMEOUT)
        # Check whether all matched records were returned; the requested size
        # may have been set too small. Size 1 is special-cased, as it means
        # the query only cares about the first matched entry.
        # TODO: Use pagination in Elasticsearch. This needs a major change in
        #       how query results are iterated.
        size = query.get('size', 1)
        return_count = len(result['hits']['hits'])
        total_match = result['hits']['total']
        if total_match > return_count and size != 1:
            logging.error('There are %d matched records, but only %d entries '
                          'were returned. Query size is set to %d.',
                          total_match, return_count, size)

        # Extract the actual results from the query response.
        output = QueryResult(total_match, [])
        for hit in result['hits']['hits']:
            converted = {}
            if 'fields' in hit:
                for key, value in hit['fields'].items():
                    converted[key] = value[0] if len(value) == 1 else value
            else:
                converted = hit['_source'].copy()
            output.hits.append(converted)
        return output


    def query(self, *args, **kwargs):
        """Composes and executes a query; the arguments are the same as
        _compose_query.
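
        Example, a sketch (field names are illustrative):
            result = es.query(
                    equality_constraints=[('_type', 'host_history')],
                    range_constraints=[('time_recorded', 1405628341, None)],
                    size=20)
        """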
        query = self._compose_query(*args, **kwargs)
        return self.execute_query(query)