mozautoeslib/eslib.py
author Jonathan Griffin <jgriffin@mozilla.com>
Tue, 14 Jun 2011 17:16:17 -0700
changeset 35 eb9808033c351e0ac76f131ca761f3cbf2714e7b
parent 34 51afbd7578ffc80ec6c235d5ecce6dbd33ebcb6b
child 36 b87730b654132d4c0bf691c57b34e632beedc95e
permissions -rw-r--r--
Don't assume script is present

import logging
# logging.basicConfig(filename='mozautoeslib.log', level=logging.DEBUG)

from pyes import *
from pyes.facets import QueryFacet

class ESLib(object):
  """Class with convenience methods for making common types of
     ElasticSearch queries.

     Searches and counts go to the read index; add/delete/refresh operations
     go to the write index.  A single index name is used for both.
  """

  def __init__(self, server, index, doc_type=None):
    """Initialize an ESLib object with server address, index, and doc_type.

       server: address handed to pyes.ES (wrapped in a one-element list).
       index: either a single index name, or a list whose first member is
              the read index and whose optional second member is the write
              index (a one-element list uses the same index for both).
       doc_type: a doc type name or a list of names.  It may also be set
                 later via the doc_type property or the doc_type argument
                 accepted by most methods.

       Raises Exception if server or index is missing, or if index is
       neither a string nor a list.
    """

    if not server or not index:
      raise Exception("must specify server and index!")

    self.server = server

    # split the index parameter into read and write indices
    if isinstance(index, list):
      # an empty list was already rejected by 'not index' above
      read_index = index[0]
      write_index = index[1] if len(index) > 1 else index[0]
    elif isinstance(index, basestring):
      read_index = write_index = index
    else:
      # previously this fell through and failed later with AttributeError
      raise Exception("index must be a string or a list of strings")

    self.index = [read_index, write_index]
    self.read_index = read_index
    self.write_index = write_index

    # go through the property setter so the list-wrapping rule lives in
    # exactly one place
    self.doc_type = doc_type

    self.connection = ES([self.server], timeout=30.0)

  @property
  def doc_type(self):
    """The list of doc types used by searches and document operations.

       Raises Exception if no doc type has been set.
    """
    # The setter always wraps values in a list, so an unset doc_type is
    # stored as [None] rather than None; reject both forms.
    if self._doc_type is None or None in self._doc_type:
      raise Exception('doc_type cannot be None')
    return self._doc_type

  @doc_type.setter
  def doc_type(self, value):
    # accept either a single doc type name or a list of names
    if isinstance(value, list):
      self._doc_type = value
    else:
      self._doc_type = [value]

  def _add_fieldlist_to_boolquery(self, boolquery, fieldlist, add_must):
    """Take an existing boolquery, and add a list of fields to it.  Fields
       are added as 'add_must' if add_must is True, otherwise they are added
       as 'add_must_not'.  An empty or None fieldlist adds nothing.

       See the query documentation for information on types of fields.

       Raises Exception if a range (list) value does not have two members.
    """

    if not fieldlist:
      return

    for key, value in fieldlist.items():
      if isinstance(value, list):
        # list: a [low, high] range handed to ESRange
        if len(value) != 2:
          raise Exception("range list must have two members")
        query = RangeQuery(ESRange(key, value[0], value[1]))
      elif isinstance(value, tuple):
        # tuple: the terms are space-joined into a single field query
        query = FieldQuery()
        query.add(key, " ".join(value))
      else:
        query = FieldQuery()
        query.add(key, value)
      if add_must:
        boolquery.add_must(query)
      else:
        boolquery.add_must_not(query)

  def _make_bool_query(self, include=None, exclude=None, sort=None):
    """Generate a simple bool query to include fields in 'include', and
       exclude fields in 'exclude'.  Either may be None or empty.  If sort
       is given it is assigned to the query's 'sort' attribute.
    """

    boolquery = BoolQuery()
    self._add_fieldlist_to_boolquery(boolquery, include or {}, True)
    self._add_fieldlist_to_boolquery(boolquery, exclude or {}, False)
    if sort:
      boolquery.sort = sort

    return boolquery

  def _hits_to_list(self, result, withSource=False):
    """Partially flatten a search response into a list.

       Each entry is the hit's '_source' document, or the whole hit when
       withSource is True.  A response with zero hits yields [].

       Raises Exception if the response lacks ['hits']['hits'], or if a hit
       lacks '_source' when withSource is False.
    """

    if (not result or 'hits' not in result or not result['hits']
        or 'hits' not in result['hits']):
      raise Exception("Key ['hits']['hits'] not found in response data")

    resultlist = []
    for hit in result['hits']['hits'] or []:
      if withSource:
        resultlist.append(hit)
      else:
        if '_source' not in hit:
          raise Exception("Key ['_source'] not found in response hit")
        resultlist.append(hit['_source'])
    return resultlist

  def ORQuery(self, ORItems, size=10000, doc_type=None, useFieldQueries=False):
    """Return a list of hits that match any of the combination of terms
       specified in the ORItems list of dicts.  All key/value pairs within
       one dict must match; any one dict may match.  At most 'size' hits
       are returned, and [] is returned when nothing matches.

       By default each pair becomes a TermFilter.  When useFieldQueries is
       True, each pair instead becomes a FieldQuery wrapped in a QueryFilter.

       Example:
         return hits that match any of the following machine/starttime
         combinations:

       result = eslib.ORQuery([
        {'machine': 'talos-r1', 'starttime': '1306918341'},
        {'machine': 'talos-r2', 'starttime': '1259351812'},
       ])
    """

    if doc_type:
      self.doc_type = doc_type

    orList = []
    for item in ORItems:
      if useFieldQueries:
        andList = [QueryFilter(FieldQuery(FieldParameter(key, value)))
                   for key, value in item.items()]
      else:
        andList = [TermFilter(key, value) for key, value in item.items()]
      orList.append(ANDFilter(andList))

    q = FilteredQuery(MatchAllQuery(), ORFilter(orList))
    result = self.connection.search(query=q,
                                    size=size,
                                    indexes=[self.read_index],
                                    doc_types=self.doc_type)

    return self._hits_to_list(result)

  def query(self, include=None, exclude=None, size=None, doc_type=None, sort=None,
            withSource=False):
    """Return a list of hits which match all the fields in 'include',
       but none of the fields in 'exclude', up to a maximum of 'size' hits,
       or all hits when 'size' is None (a count request is made first to
       find how many that is).  Each entry is the hit's '_source' document,
       or the whole hit when withSource is True; [] is returned when
       nothing matches.

       Each field in 'include' and 'exclude' can be one of three types:
         - string: the field MUST include that string
         - tuple: the field MUST include any of the strings in the tuple
         - list: the field MUST have a value in the range denoted by the list

       For example, the following query returns all hits that represent
       bug 620598, on the mozilla-central OR tracemonkey trees, in the date
       range 2010-12-21 to 2011-01-05:

  result = eslib.query({'bug': '620598',
                        'tree': ('mozilla-central', 'tracemonkey'),
                        'date': ['2010-12-21', '2011-01-05']})
    """

    if doc_type:
      self.doc_type = doc_type

    boolquery = self._make_bool_query(include, exclude)

    if size is not None:
      query_size = size
    else:
      count = self.connection.count(query=boolquery,
                                    indexes=[self.read_index],
                                    doc_types=self.doc_type)
      if 'count' not in count:
        raise Exception("Key ['count'] not found in count response data")
      query_size = count['count']

    # there's no data to return, so don't bother searching
    if query_size == 0:
      return []

    q = Search(query=boolquery, sort=sort, size=query_size)
    result = self.connection.search(query=q,
                                    indexes=[self.read_index],
                                    doc_types=self.doc_type)

    return self._hits_to_list(result, withSource)

  def aggregates(self, include=None, exclude=None, aggregate_by=None, doc_type=None):
    """Return a count of hits that match all possible combinations of fields
       in aggregate_by.

       One query facet is added per combination, named by joining
       "field:value" parts with "_" (e.g. "debug:true_os:linux").  The
       response's 'facets' entry is returned as-is.

       Raises Exception if the response has no 'facets' key.

       Example:

  result = eslib.aggregates(include = {
                                        'tree': ('mozilla-central', 'tracemonkey'),
                                        'date': ['2010-12-21', '2011-01-05']
                                      },
                            aggregate_by = {
                                             'debug': ['true', 'false'],
                                             'os': ['linux', 'linux64', 'windows', 'osx', 'osx64']
                                           }
                           )
    """

    if doc_type:
      self.doc_type = doc_type

    masterquery = self._make_bool_query(include, exclude)
    q = Search(query=masterquery, size=0)

    # Cartesian product: one (field, value) pair per field in aggregate_by.
    combinations = [[]]
    for key, values in (aggregate_by or {}).items():
      combinations = [combo + [(key, value)]
                      for value in values
                      for combo in combinations]

    for combo in combinations:
      nameparts = []
      facetquery = BoolQuery()
      for key, value in combo:
        nameparts.append("%s:%s" % (key, value))
        fieldquery = FieldQuery()
        fieldquery.add(key, value)
        facetquery.add_must(fieldquery)
      facet = QueryFacet(query=facetquery, name="_".join(nameparts))
      q.facet.facets.append(facet)

    result = self.connection.search(query=q,
                                    indexes=[self.read_index],
                                    doc_types=self.doc_type)

    if 'facets' in result:
      return result['facets']

    raise Exception("Key 'facets' not found in response data")

  def frequency(self, include=None, exclude=None, frequency_fields=None, size=30000, doc_type=None):
    """Return a count of the 'size' most frequent terms that are produced
       by a query.

       Each entry of frequency_fields is either a field name (string) or a
       dict {'field': name, 'script': optional script}; one term facet is
       added per entry.  The response's 'facets' entry is returned as-is.

       Raises Exception if the response has no 'facets' key.

       For example, the following produces a list of the top 50 bugs
       in a date range:

  result = eslib.frequency(include = {
                                        'tree': 'mozilla-central',
                                        'date': ['2010-12-21', '2011-01-05']
                                     },
                           frequency_fields = ["bug"],
                           size = 50
                          )

       Note that date fields in frequency queries are always returned
       in ms since epoch; they can be converted to python datetime objects
       using datetime.datetime.utcfromtimestamp(value/1000).
    """

    if doc_type:
      self.doc_type = doc_type

    boolquery = self._make_bool_query(include, exclude)

    q = Search(query=boolquery, size=0)
    for field in frequency_fields or []:
      # isinstance is required: type(x) == basestring is never true
      if isinstance(field, basestring):
        q.facet.add_term_facet(field, size=size)
      else:
        q.facet.add_term_facet(field['field'], size=size, script=field.get('script'))

    result = self.connection.search(query=q,
                                    indexes=[self.read_index],
                                    doc_types=self.doc_type)

    if 'facets' in result:
      return result['facets']

    raise Exception("Key 'facets' not found in response data")

  def delete_doc(self, id, doc_type=None):
    """Delete document 'id' of the first doc type from the write index."""
    if doc_type:
      self.doc_type = doc_type
    self.connection.delete(self.write_index, self.doc_type[0], id)

  def update_field_in_doc(self, id, doc_type, field, value):
    """Set 'field' to 'value' in document 'id' and re-index it.

       The document is read from the read index and written back through
       add_doc (to the write index), which also makes doc_type the default
       for later calls.

       Raises Exception if the fetched document has no '_source'.
    """
    doc = self.connection.get(self.read_index, doc_type, id)
    source = doc.get('_source')
    if source is None:
      raise Exception("Key ['_source'] not found in document %s" % id)
    source[field] = value
    self.add_doc(source, id=id, doc_type=doc_type)

  def add_doc(self, doc, id=None, doc_type=None):
    """Index 'doc' under the first doc type in the write index and return
       the pyes response.  When doc_type is given it becomes the default.
    """
    if doc_type:
      self.doc_type = doc_type
    return self.connection.index(doc, self.write_index, self.doc_type[0], id)

  def delete_index(self):
    """Delete the write index and return the pyes response.

       This is best-effort: any failure is logged at debug level and None
       is returned instead of raising.
    """
    try:
      return self.connection.delete_index(self.write_index)
    except Exception:
      logging.getLogger(__name__).debug(
          "failed to delete index %s", self.write_index, exc_info=True)
      return None

  def refresh_index(self):
    """Ask the server to refresh the write index."""
    self.connection.refresh(indexes=[self.write_index])