--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -35,53 +35,53 @@ class Pipeline(object):
self.forward_chain[from_type] = (to_type, inst)
def start(self):
return defer.maybeDeferred(self.process_all_documents
).addCallback(self._cb_maybe_reprocess_all_documents)
def process_all_documents(self):
# check *every* doc in the DB. This exists until we can derive
- # a workqueue based on views.\
+ # a workqueue based on views.
self.num_this_process = 0
return self.doc_model.db.openView('raindrop!messages!by',
'by_doc_roots',
group=True, limit=100,
).addCallback(self._cb_roots_opened)
def _cb_maybe_reprocess_all_documents(self, result):
- if self.num_this_process != 0:
+ if self.num_this_process:
logger.debug('pipeline processed %d documents last time; trying again',
self.num_this_process)
return self.start()
logger.info('pipeline is finished.')
def _cb_roots_opened(self, rows):
dl = []
for row in rows:
did = row['key']
- logger.debug("Finding last extention point for %s", did)
+ logger.debug("Finding last extension point for %s", did)
dl.append(
self.doc_model.get_last_ext_for_document(did).addCallback(
self._cb_got_doc_last_ext, did)
)
return defer.DeferredList(dl)
def _cb_got_doc_last_ext(self, ext_info, rootdocid):
last_ext, docid = ext_info
if last_ext is None:
- logger.debug("Document '%s' doesn't appear to be a message; skipping",
- rootdocid)
- return
+ logger.debug("Document '%s' doesn't appear to be a message; skipping",
+ rootdocid)
+ return None
try:
xform_info = self.forward_chain[last_ext]
except KeyError:
logger.warning("Can't find transformer for message type %r - skipping %r",
last_ext, docid)
- return
+ return None
logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
return self.doc_model.open_document(docid
).addCallback(self._cb_got_last_doc, rootdocid, xform_info
)
def _cb_got_last_doc(self, doc, rootdocid, xform_info):
assert doc is not None, "failed to locate the doc for %r" % rootdocid
dest_type, xformer = xform_info