Add error handling to the message pipeline.
author Mark Hammond <mhammond@skippinet.com.au>
Mon, 06 Apr 2009 11:22:45 +1000
branch twisty
changeset 159 d181fa823fbbdcb27eb72395bd4e29ed98eb7e95
parent 158 37a8ed4fa05c0175fa774cf2bf60de098645a338
child 160 e6d1979b6580724c8c77cc44ce0353c58c9d726e
push id 1
push user root
push date Wed, 08 Apr 2009 01:46:05 +0000
Add error handling to the message pipeline. If an error occurs creating a message in the pipeline, a 'dummy' error record is written, and the pipeline continues to the next message. A new 'retry-errors' command causes us to attempt to reprocess these messages - presumably after the code in error has been fixed.
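For reference, a sketch of what such a 'dummy' error record looks like, pieced together from the _cb_converted_or_not handler below. The field names come from the patch; the values and the exact document ID shape are illustrative only (the real ID is whatever prepare_ext_document assigns):

    # Illustrative only - field names are taken from the patch below, values
    # are made up.  The ID is assumed to follow the three-part
    # "<prefix>!<proto_id>!<doc_type>" convention the pipeline splits on.
    error_doc = {
        '_id': 'msg!<proto_id>!<dest_type>',
        'type': 'core/error/msg',        # overrides the normal target type
        'error_details': u'<repr of the twisted Failure>',
        # [source doc id, source doc _rev] pairs - a single entry for now
        'raindrop_sources': [['<source_id>', '<source_rev>']],
        # presumably filled in by prepare_ext_document; the errors-map.js
        # view below emits it as the key.
        'raindrop_seq': 123,
    }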
schema/proto/errors/errors-map.js
server/python/junius/pipeline.py
server/python/run-raindrop.py
new file mode 100644
--- /dev/null
+++ b/schema/proto/errors/errors-map.js
@@ -0,0 +1,5 @@
+function(doc) {
+  if (doc.type=='core/error/msg')
+    // we use include_docs on this view, so get the ID magically...
+    emit(doc.raindrop_seq, null);
+}
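Because the view is queried with include_docs=True, each row carries both the raindrop_seq key and the full error document. A minimal sketch of walking such a result, mirroring what _cb_errorq_opened does in pipeline.py below ('result' is assumed to be the decoded view response):

    # result['rows'] is assumed to be the decoded response of the
    # 'raindrop!proto!errors'/'errors' view queried with include_docs=True.
    for row in result['rows']:
        seq = row['key']        # doc.raindrop_seq, used as the next startkey
        err_doc = row['doc']    # the full 'core/error/msg' document
        source_id, source_rev = err_doc['raindrop_sources'][0]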
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -1,12 +1,13 @@
 """ This is the raindrop pipeline; it moves messages from their most raw
 form to their most useful form.
 """
 from twisted.internet import defer, task
+from twisted.python.failure import Failure
 import logging
 
 logger = logging.getLogger(__name__)
 
 # Simple forward chaining.
 chain = [
     # from_type, to_type, transformer)
     ('proto/test',         'raw/message/rfc822',
@@ -27,22 +28,23 @@ chain = [
     ('message',            'anno/tags',
                            'junius.ext.message.message.MessageAnnotator'),
     # anno/tags is 'terminal'
     ('anno/tags', None, None),
 ]
 
 
 class Pipeline(object):
-    def __init__(self, doc_model):
+    def __init__(self, doc_model, options):
         self.doc_model = doc_model
-        self.forward_chain = {}
+        self.options = options
         # use a cooperator to do the work via a generator.
         # XXX - should we own the cooperator or use our parents?
         self.coop = task.Cooperator()
+        self.forward_chain = {}
         for from_type, to_type, xformname in chain:
             if xformname:
                 root, tail = xformname.rsplit('.', 1)
                 try:
                     mod = __import__(root, fromlist=[tail])
                     klass = getattr(mod, tail)
                     inst = klass(doc_model)
                 except:
@@ -69,78 +71,195 @@ class Pipeline(object):
             docs = []
             to_del = [(row['id'], row['value']) for row in result['rows']]
             for id, rev in to_del:
                 docs.append({'_id': id, '_rev': rev, '_deleted': True})
             logger.info('deleting %d messages of type %r', len(docs), doc_type)
             return self.doc_model.db.updateDocuments(docs)
 
         def gen_deleting_docs(doc_types):
-            for t in doc_types:
+            for dt in doc_types:
                 yield self.doc_model.open_view('raindrop!messages!by',
                                                'by_doc_type',
-                                               key=t,
-                            ).addCallback(delete_docs, t)
+                                               key=dt,
+                            ).addCallback(delete_docs, dt)
             # and our work-queue docs - they aren't seen by the view, so
             # just delete them by ID.
             docs = []
             for rid in ('workqueue!msg',):
                 yield self.doc_model.open_document_by_id(rid
                         ).addCallback(delete_a_doc, rid)
 
         derived = set()
         for _, to_info in self.forward_chain.iteritems():
             if to_info is not None:
                 to_type, inst = to_info
                 derived.add(to_type)
+        # error records are always 'derived'
+        derived.add('core/error/msg')
         logger.info("deleting documents with types %r", derived)
         return self.coop.coiterate(gen_deleting_docs(derived))
 
     def start(self):
         return self.coop.coiterate(self.gen_wq_tasks())
 
+    def start_retry_errors(self):
+        """Attempt to re-process all messages for which our previous
+        attempt generated an error.
+        """
+        # Later we may need a 'state doc' just for errors??
+        # For now use this dict to save the state during processing; we
+        # don't persist it yet.
+        def gen_work():
+            state_doc = {'raindrop_seq': 0}
+            while True:
+                self.num_errors_found = 0
+                # error docs are quite small - fetch 50 at a time...
+                logger.debug("starting error processing at %(raindrop_seq)r",
+                             state_doc)
+                yield self.doc_model.open_view(
+                                'raindrop!proto!errors', 'errors',
+                                startkey=state_doc['raindrop_seq'],
+                                include_docs=True,
+                        ).addCallback(self._cb_errorq_opened, state_doc)
+                if not self.num_errors_found:
+                    logger.info("No errors remain")
+                    break
+        return self.coop.coiterate(gen_work())
+
+    def _cb_errorq_opened(self, result, state_doc):
+        def gen_work():
+            for row in result['rows']:
+                self.num_errors_found += 1
+                err_doc = row['doc']
+                logger.debug("processing error document %(_id)r", err_doc)
+                # Open original source doc
+                source_infos = err_doc['raindrop_sources']
+                assert len(source_infos)==1, "only simple fwd chaining!"
+                source_id = source_infos[0][0]
+                yield self.doc_model.open_document_by_id(source_id
+                        ).addCallback(self._cb_got_error_source, err_doc)
+                state_doc['raindrop_seq'] = row['key']
+
+        # XXX - see below - should we reuse self.coop, or is that unsafe?
+        coop = task.Cooperator()
+        return coop.coiterate(gen_work())
+
+    def _cb_got_error_source(self, result, err_doc):
+        # build the infos dict used by the sub-generator.
+        try:
+            _, proto_id, doc_type = err_doc['_id'].split("!")
+        except ValueError:
+            logger.warning("skipping malformed ID %(_id)r", err_doc)
+            return
+
+        infos = {proto_id: [result]}
+        # Although we only have 1 item to process, the queue is set up to
+        # handle many, so we need to use a generator
+        def gen_my_doc():
+            new_docs = []
+            for whateva in self.gen_work_tasks(infos, new_docs):
+                yield whateva
+            # should only be 1 new doc, and even on error there should be a
+            # new error doc.
+            assert len(new_docs)==1, new_docs
+            # either way, we are writing the new record replacing the one
+            # we have.
+            new_docs[0]['_rev'] = err_doc['_rev']
+            yield self.doc_model.create_ext_documents(new_docs)
+
+        # XXX - see below - should we reuse self.coop, or is that unsafe?
+        coop = task.Cooperator()
+        return coop.coiterate(gen_my_doc())
+
     def gen_wq_tasks(self):
+        """generate deferreds which will determine where our processing is up
+        to and walk us through the _all_docs_by_seq view from that point,
+        creating and saving new docs as it goes. When the generator finishes
+        the queue is empty. """
         # first open our 'state' document
         def _cb_update_state_doc(result, d):
             if result is not None:
                 assert d['_id'] == result['_id'], result
                 d.update(result)
             # else no doc exists - the '_id' remains in the default though.
 
         state_doc = {'_id': 'workqueue!msg',
-                     'doc_type': "core/workqueue",
+                     'doc_type': u"core/workqueue",
                      'seq': 0}
         yield self.doc_model.open_document_by_id(state_doc['_id'],
                     ).addCallback(_cb_update_state_doc, state_doc)
 
         logger.info("Work queue starting with sequence ID %d",
                     state_doc['seq'])
 
         logger.debug('opening by_seq view for work queue...')
-        # We process 5 records at a time, and fetch those 5 documents in
-        # the same request; this seems a reasonable compromize between
-        # efficiency and handling large docs (no attachments come back...)
-        # Another alternative worth benchmarking; use a much larger limit
-        # without returning the documents, in the hope we can deduce more
-        # from the view, avoiding the fetch of many docs at all...
+        num_per_batch = 15 # configurable????
+        # We process num_per_batch records at a time, and fetch all those
+        # documents in the same request; this seems a reasonable compromise
+        # between efficiency and handling large docs (no attachments come
+        # back...) Another alternative worth benchmarking; use a much larger
+        # limit without returning the documents, in the hope we can deduce
+        # more from the view, avoiding the fetch of many docs at all...
         self.queue_finished = False
         self.state_doc_dirty = False
         while not self.queue_finished:
-            yield self.doc_model.db.listDocsBySeq(limit=15,
+            yield self.doc_model.db.listDocsBySeq(limit=num_per_batch,
                                                   startkey=state_doc['seq'],
                                                   include_docs=True,
                         ).addCallback(self._cb_by_seq_opened, state_doc)
         if self.state_doc_dirty:
             logger.debug("flushing state doc at end of run...")
             yield self.doc_model.create_ext_documents([state_doc]
                     ).addCallback(self._cb_created_docs, state_doc
                     )
         else:
             logger.debug("no need to flush state doc")
 
+    def gen_work_tasks(self, doc_infos, new_docs):
+        # A generator which takes each doc in the list to its "next" type,
+        # noting that one of the docs ahead of us in the batch may already
+        # satisfy that step.
+        # ultimately we may need to generate lots of new docs from this one -
+        # but for now we have a single, simple chain moving forwards...
+
+        # Results aren't written - that is the caller's job - new docs are
+        # appended to the passed list.
+
+        # doc_infos is a dict keyed by proto_id.  Each value is a list of the
+        # documents themselves, in sequence order.
+        for proto_id, infos in doc_infos.iteritems():
+            for sq, doc in enumerate(infos):
+                doc_type = doc['type']
+                try:
+                    xform_info = self.forward_chain[doc_type]
+                except KeyError:
+                    logger.warning("Can't find transformer for message type %r - skipping %r",
+                                   doc_type, proto_id)
+                    continue
+                if xform_info is None:
+                    logger.debug("Document %r is at its terminal type of %r",
+                                 proto_id, doc_type)
+                    continue
+
+                next_type, xformer = xform_info
+                # See if the next_type is in the rows ahead of us in by_seq
+                if any(cd['type'] == next_type for cd in infos[sq+1:]):
+                    logger.debug("cool - _by_seq lookahead tells us the doc is already %r",
+                                 next_type)
+                    continue
+                # OK - need to create this new type.
+                logger.debug("calling %r to create a %s from %s", xformer,
+                             next_type, doc['type'])
+                yield defer.maybeDeferred(xformer.convert, doc
+                            ).addBoth(self._cb_converted_or_not,
+                                      next_type, proto_id, doc, new_docs)
+
     def _cb_by_seq_opened(self, result, state_doc):
         rows = result['rows']
         logger.debug('work queue has %d items to check.', len(rows))
         if not rows:
             # no rows left.  There is no guarantee our state doc will be
             # the last one...
             logger.info("work queue ran out of rows...")
             # either way, we are done!
@@ -164,56 +283,33 @@ class Pipeline(object):
             if value.get('deleted'):
                 logger.debug("skipping deleted message %r", rid)
                 continue
             try:
                 _, proto_id, doc_type = rid.split("!")
             except ValueError:
                 logger.warning("skipping malformed message ID %r", rid)
                 continue
-            known.setdefault(proto_id, []).append(
-                                (doc_type, value['rev'], row['doc']))
+            doc = row['doc']
+            real_type = doc.get('type')
+            if real_type != doc_type: # probably a core/error/msg
+                logger.info("message isn't of expected type (expected %r "
+                            "but got %r) - skipping", doc_type, real_type)
+                continue
+            known.setdefault(proto_id, []).append(doc)
 
         state_doc['seq'] = last_seq # only takes effect once saved...
         logger.debug("Our %d rows gave info about %d messages",
                      len(rows), len(known))
 
-        # Now take each doc in the list to its "next" type, but noting that
-        # as we have a number of docs ahead of us, one of them may satisfy
-        # us..
-        # ultimately we may need to generate lots of new docs from this one -
-        # but for now we have a single, simple chain moving forwards...
-        def gen_todo():
+        def gen_my_work_tasks():
             new_docs = []
-            for proto_id, infos in known.iteritems():
-                for (sq, (doc_type, rev, doc)) in enumerate(infos):
-                    try:
-                        xform_info = self.forward_chain[doc_type]
-                    except KeyError:
-                        logger.warning("Can't find transformer for message type %r - skipping %r",
-                                       doc_type, proto_id)
-                        continue
-                    if xform_info is None:
-                        logger.debug("Document %r is at its terminal type of %r",
-                                     proto_id, doc_type)
-                        continue
+            for whateva in self.gen_work_tasks(known, new_docs):
+                yield whateva
 
-                    next_type, xformer = xform_info
-                    # See if the next_type is in the rows ahead of us in by_seq
-                    for (check_type, _, _) in infos[sq+1:]:
-                        if next_type == check_type:
-                            logger.debug("cool - _by_seq lookahead tells us the doc is already %r",
-                                         next_type)
-                            continue
-                    # OK - need to create this new type.
-                    logger.debug("calling %r to create a %s from %s", xformer,
-                                 next_type, doc['type'])
-                    yield defer.maybeDeferred(xformer.convert, doc
-                                    ).addCallback(self._cb_converted, next_type,
-                                                  proto_id, new_docs)
             # It isn't unusual to see zero docs to process, and writing our
             # new state doc each time is going to hurt perf a little.  The
             # worst case is we die before flushing our state doc, but that's
             # OK - we should be able to re-skip these messages quickly next
             # time...
             if new_docs:
                 # Now write the docs we created, plus our queue 'state' doc.
                 logger.info("work queue finished at sequence %d - %d new documents"
@@ -223,29 +319,62 @@ class Pipeline(object):
                 yield self.doc_model.create_ext_documents(new_docs
                         ).addCallback(self._cb_created_docs, state_doc
                         )
             else:
                 # don't bother writing the state doc now, but record it is
                 # dirty so if we get to the end we *do* write it.
                 self.state_doc_dirty = True
 
-        return self.coop.coiterate(gen_todo())
+        # I *think* that if we share the same cooperator as the task itself,
+        # we risk having the next chunk of sequence IDs processed before we
+        # are done here.
+        # OTOH, I'm not sure about that.....
+        # As we are really only using a coiterator as a handy way of managing
+        # twisted loops, using our own should be ok though.
+        coop = task.Cooperator()
+        return coop.coiterate(gen_my_work_tasks())
 
     def _cb_created_docs(self, new_revs, state_doc):
         # XXX - note that the _ids in this result can't be trusted if there
         # were attachments :(  fortunately we don't care here...
         # XXXXXXX - *sob* - we do care once we start hitting conflicts in
         # messages ahead of us...
         # XXX - this might not be necessary...
         last_rev = new_revs[-1]
         assert last_rev['id'] == state_doc['_id'], last_rev
         state_doc['_rev'] = last_rev['rev']
 
-    def _eb_doc_failed(self, failure):
-        logger.error("FAILED to pipe a doc: %s", failure)
-        # and we will auto skip to the next doc...
-
-    def _cb_converted(self, new_doc, dest_type, rootdocid, new_docs):
-        self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
-        logger.debug("converter returned new document type %r for %r: %r",
-                     dest_type, rootdocid, new_doc['_id'])
+    def _cb_converted_or_not(self, result, dest_type, rootdocid, existing_doc,
+                             new_docs):
+        # This is both a callBack and an errBack.  If a converter fails to
+        # create a document, we can't just fail, or no later messages in the
+        # DB will ever get processed!
+        # So we write a "dummy" record - it has the same docid that the real
+        # document would have - but the 'type' attribute in the document
+        # reflects it is actually an error marker.
+        if isinstance(result, Failure):
+            logger.warn("Failed to convert a document: %s", result)
+            if self.options.stop_on_error:
+                logger.info("--stop-on-error specified - re-throwing error")
+                result.raiseException()
+            # and make a dummy doc.
+            new_doc = {'error_details': unicode(result)}
+            self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
+            # and patch the 'type' attribute to reflect that it's really an error.
+            new_doc['type'] = 'core/error/msg'
+            # In theory, the source ID of each doc that contributed is
+            # redundant as it could be deduced once we get backward chaining.
+            # However, we probably will always need to track the _rev of those
+            # docs so we can detect 'stale' errors (XXX - probably not - the
+            # 'chain' doesn't go any further on error).
+            # Also in theory, we don't need this in the non-error case, as
+            # our _by_seq processing will ensure the right thing happens.
+            # We keep a list of sources as one day we will support more than one!
+            new_doc['raindrop_sources'] = [[existing_doc['_id'],
+                                            existing_doc['_rev']]
+                                          ]
+        else:
+            new_doc = result
+            self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
+            logger.debug("converter returned new document type %r for %r: %r",
+                         dest_type, rootdocid, new_doc['_id'])
         new_docs.append(new_doc)
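Taken together, the retry path added above boils down to the loop sketched here. This is a condensed, synchronous approximation for orientation only - the real code drives everything through deferreds and task.Cooperator, and the helper names stand in for the corresponding doc_model/pipeline methods:

    # Rough shape of start_retry_errors -> _cb_errorq_opened ->
    # _cb_got_error_source, with the deferred plumbing stripped out.
    seq = 0
    while True:
        result = open_view('raindrop!proto!errors', 'errors',
                           startkey=seq, include_docs=True)
        if not result['rows']:
            break                                   # "No errors remain"
        for row in result['rows']:
            err_doc = row['doc']
            _, proto_id, _ = err_doc['_id'].split('!')
            source_doc = open_document_by_id(err_doc['raindrop_sources'][0][0])
            new_docs = []
            # re-run the normal conversion for just this one source document
            for _ in gen_work_tasks({proto_id: [source_doc]}, new_docs):
                pass
            # replace the error record (or write a fresh error) in place
            new_docs[0]['_rev'] = err_doc['_rev']
            create_ext_documents(new_docs)
            seq = row['key']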
--- a/server/python/run-raindrop.py
+++ b/server/python/run-raindrop.py
@@ -54,19 +54,27 @@ def sync_messages(result, parser, option
     conductor = get_conductor()
     return conductor.sync(None)
 
 @asynch_command
 def process(result, parser, options):
     """Process all messages to see if any extensions need running"""
     def done(result):
         print "Message pipeline has finished..."
-    p = pipeline.Pipeline(model.get_doc_model())
+    p = pipeline.Pipeline(model.get_doc_model(), options)
     return p.start().addCallback(done)
 
+@asynch_command
+def retry_errors(result, parser, options):
+    """Reprocess all conversions which previously resulted in an error."""
+    def done(result):
+        print "Error retry pipeline has finished..."
+    p = pipeline.Pipeline(model.get_doc_model(), options)
+    return p.start_retry_errors().addCallback(done)
+
 @allargs_command
 def show_view(result, parser, options, args):
     """Pretty-print the result of executing a view.
 
     All arguments after this command are URLs to be executed as the view.  If
     the view name starts with '/', the URL will be used as-is.  This is
     suitable for builtin views - eg:
 
@@ -117,17 +125,17 @@ def show_view(result, parser, options, a
 
 
 def unprocess(result, parser, options):
     """Delete all documents which can be re-generated by the 'process' command
     """
     def done(result):
         print "unprocess has finished..."
     # XXX - pipeline should probably be a singleton?
-    p = pipeline.Pipeline(model.get_doc_model())
+    p = pipeline.Pipeline(model.get_doc_model(), options)
     return p.unprocess().addCallback(done)
 
 def delete_docs(result, parser, options):
     """Delete all documents of a particular type.  Use with caution or see
        the 'unprocess' command for an alternative.
     """
     # NOTE: This is for development only, until we get a way to say
     # 'reprocess stuff you've already done' - in the meantime deleting those
@@ -215,16 +223,21 @@ def main():
     parser.add_option("", "--doctype", action="append", dest='doctypes',
                       help="Specifies the document types to use for some "
                            "operations.")
 
     parser.add_option("", "--force", action="store_true",
                       help="Forces some operations which would otherwise not "
                            "be done.")
 
+    parser.add_option("-s", "--stop-on-error", action="store_true",
+                      help="Causes (some) operations which would normally "
+                           "handle an error and continue to stop when an "
+                           "error occurs.")
+
     options, args = parser.parse_args()
 
     _setup_logging(options)
 
     # do this very early just to set the options
     get_conductor(options)
 
     if args and args[0]=='help':
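Usage note (assumed from the command and option names above): once the failing converter has been fixed, the new pass would presumably be run as 'run-raindrop.py retry-errors', while 'run-raindrop.py process --stop-on-error' re-raises the first conversion failure instead of writing an error record, which is handy while debugging an extension.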