dont call post with zero docs, logging tweaks (branch: twisty)
author: Mark Hammond <mhammond@skippinet.com.au>
Mon, 30 Mar 2009 10:43:46 +1100
branch: twisty
changeset: 135:35658a2c572b76a298c6bd83f30e1b88cfc86f54
parent 134 a6f67d2b235c853adc0b058977df72f27d606972
child 136 7469bf443931f3b11303df00538b2857cd616310
push id: 1
push user: root
push date: Wed, 08 Apr 2009 01:46:05 +0000
dont call post with zero docs, logging tweaks
server/python/junius/pipeline.py
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -56,27 +56,27 @@ class Pipeline(object):
         return self.coop.coiterate(self.gen_all_documents())
 
     def gen_all_documents(self):
         # check *every* doc in the DB.  This exists until we can derive
         # a workqueue based on views.
         self.num_this_process = 0
         while True:
             logger.debug('opening view for work queue...')
-            yield self.doc_model.db.openView('raindrop!messages!by',
+            yield self.doc_model.db.openView('raindrop!messages!workqueue',
                                              'by_doc_roots',
                                              group=True,
                         ).addCallback(self._cb_roots_opened)
-            logger.debug('this time we did %d documents', self.num_this_process)
+            logger.info('processed %d documents', self.num_this_process)
             if self.num_this_process == 0:
                 break
         logger.debug('finally run out of documents.')
 
     def _cb_roots_opened(self, rows):
-        logger.info('work queue has %d items to process.', len(rows))
+        logger.info('work queue has %d items to check.', len(rows))
         def gen_todo(todo):
             for row in todo:
                 did = row['key']
                 logger.debug("Finding last extension point for %s", did)
                 yield self.doc_model.get_last_ext_for_document(did
                             ).addCallback(self._cb_got_doc_last_ext, did
                             ).addErrback(self._eb_doc_failed
                             )
@@ -132,26 +132,26 @@ class Pipeline(object):
                 did_ext = dest_type
                 thisdocid = ld['_id']
 
         return self.coop.coiterate(gen_processes()
                     ).addCallback(self._cb_docs_done, rootdocid, new_docs
                     )
 
     def _cb_docs_done(self, result, rootdocid, new_docs):
-        return self.doc_model.create_ext_documents(rootdocid, new_docs)
+        if new_docs:
+            self.doc_model.create_ext_documents(rootdocid, new_docs)
 
     def _cb_got_last_doc(self, doc, rootdocid, xform_info, docs_by_type, new_docs):
         assert doc is not None, "failed to locate the doc for %r" % rootdocid
         dest_type, xformer = xform_info
         logger.debug("calling %r to create a %s from %s", xformer, dest_type,
                      doc['type'])
         return defer.maybeDeferred(xformer.convert, doc
                         ).addCallback(self._cb_converted, dest_type, rootdocid,
                                       docs_by_type, new_docs)
 
     def _cb_converted(self, new_doc, dest_type, rootdocid, docs_by_type, new_docs):
         self.num_this_process += 1
-        # This should really be in the model, but oh well
         self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
         logger.debug("converter returned new document type %r for %r: %r", dest_type, rootdocid, new_doc['_id'])
         docs_by_type[dest_type] = new_doc
         new_docs.append(new_doc)