Optionally allow caller to specify read/write ES indices
authorJonathan Griffin <jgriffin@mozilla.com>
Fri, 10 Jun 2011 12:27:03 -0700
changeset 32 2edc7518de5abe9251e3b797db1405e6a63d7daf
parent 31 1f649c3998d0721b7152095416f534ff6a75f69f
child 33 ed46547d9991a3b27eb27f235f0f3177399a3c14
push id33
push userjgriffin@mozilla.com
push dateFri, 10 Jun 2011 19:27:10 +0000
Optionally allow caller to specify read/write ES indices
mozautoeslib/eslib.py
--- a/mozautoeslib/eslib.py
+++ b/mozautoeslib/eslib.py
@@ -12,21 +12,35 @@ class ESLib(object):
   def __init__(self, server, index, doc_type=None):
     """Initialize an ESLib object with server address, index, and doc_type.
     """
 
     if not server or not index:
       raise Exception("must specify server and index!")
 
     self.server = server
-    self.index = index
+
+    # split the index parameter into read and write indices
+    if isinstance(index, list):
+      assert(len(list) > 0)
+      if len(index) == 1:
+        self.index = [index[0], index[0]]
+      else:
+        self.index = [index[0], index[1]]
+    elif isinstance(index, basestring):
+      self.index = [index, index]
+
+    self.read_index = self.index[0]
+    self.write_index = self.index[1]
+
     if isinstance(doc_type, list):
       self._doc_type = doc_type
     else:
       self._doc_type = [doc_type]
+
     self.connection = ES([self.server], timeout=30.0)
 
   @property
   def doc_type(self):
     if self._doc_type is None:
       raise Exception('doc_type cannot be None')
     return self._doc_type
 
@@ -102,17 +116,17 @@ class ESLib(object):
         else:
           andList.append(TermFilter(key, item[key]))
       orList.append(ANDFilter(andList))
     orq = ORFilter(orList)
 
     q = FilteredQuery(MatchAllQuery(), orq)
     result = self.connection.search(query=q,
                                     size=size,
-                                    indexes=[self.index],
+                                    indexes=[self.read_index],
                                     doc_types=self.doc_type)
 
     if result and result['hits'] and result['hits']['hits']:
       # partially flatten the data
       for hit in result['hits']['hits']:
         if not '_source' in hit:
           raise Exception("Key ['_source'] not found in response hit")
         resultlist.append(hit['_source'])
@@ -147,29 +161,29 @@ class ESLib(object):
     resultlist = []
 
     boolquery = self._make_bool_query(include, exclude)
 
     if size:
       query_size = size
     else:
       count = self.connection.count(query=boolquery,
-                                    indexes=[self.index],
+                                    indexes=[self.read_index],
                                     doc_types=self.doc_type)
       if not 'count' in count:
         raise Exception("Key ['count'] not found in count response data")
       query_size = count['count']
 
     # there's no data to return, so don't bother searching
     if query_size == 0:
       return []
 
     q = Search(query=boolquery, sort=sort, size=query_size)
     result = self.connection.search(query=q,
-                                    indexes=[self.index],
+                                    indexes=[self.read_index],
                                     doc_types=self.doc_type)
     #print json.dumps(result, indent=2)
 
     if result and result['hits'] and result['hits']['hits']:
       # partially flatten the data
       for hit in result['hits']['hits']:
         if withSource:
           resultlist.append(hit)
@@ -228,17 +242,17 @@ class ESLib(object):
           fieldquery = FieldQuery()
           fieldquery.add(key, y[key])
           facetquery.add_must(fieldquery)
       name = "_".join(nameparts)
       facet = QueryFacet(query = facetquery, name = name)
       q.facet.facets.append(facet)
 
     result = self.connection.search(query=q,
-                                    indexes=[self.index],
+                                    indexes=[self.read_index],
                                     doc_types=self.doc_type)
 
     if 'facets' in result:
       return result['facets']
 
     raise Exception("Key 'facets' not found in response data")
 
   def frequency(self, include={}, exclude={}, frequency_fields=[], size=30000, doc_type=None):
@@ -266,41 +280,41 @@ class ESLib(object):
 
     boolquery = self._make_bool_query(include, exclude)
 
     q = Search(query=boolquery, size=0)
     for field in frequency_fields:
       q.facet.add_term_facet(field, size=size)
 
     result = self.connection.search(query=q,
-                                    indexes=[self.index],
+                                    indexes=[self.read_index],
                                     doc_types=self.doc_type)
 
     if 'facets' in result:
       return result['facets']
 
     raise Exception("Key 'facets' not found in response data")
 
   def delete_doc(self, id, doc_type=None):
     if doc_type:
       self.doc_type = doc_type
-    self.connection.delete(self.index, self.doc_type[0], id)
+    self.connection.delete(self.write_index, self.doc_type[0], id)
 
   def update_field_in_doc(self, id, doc_type, field, value):
-    doc = self.connection.get(self.index, doc_type, id)
+    doc = self.connection.get(self.read_index, doc_type, id)
     doc = doc.get('_source')
     doc[field] = value
     self.add_doc(doc, id=id, doc_type=doc_type)
 
   def add_doc(self, doc, id=None, doc_type=None):
     if doc_type:
       self.doc_type = doc_type
-    return self.connection.index(doc, self.index, self.doc_type[0], id)
+    return self.connection.index(doc, self.write_index, self.doc_type[0], id)
 
   def delete_index(self):
     try:
-      return self.connection.delete_index(self.index)
+      return self.connection.delete_index(self.write_index)
     except Exception:
       pass
 
   def refresh_index(self):
-    self.connection.refresh(indexes=[self.index])
+    self.connection.refresh(indexes=[self.write_index])