fix error retries by passing the old revision info around
author Mark Hammond <mhammond@skippinet.com.au>
Mon, 20 Apr 2009 11:06:46 +1000
changeset 210 9056db5cd8ef1ea86be7232e338c0b3025f250c3
parent 209 ef3b5eaab4ad0450a9e97e65fbf26c4debb6c750
child 211 cec075b725653fd9234bc369e747e744719f4960
push id 24
push user mhammond@skippinet.com.au
push date Mon, 20 Apr 2009 01:22:13 +0000
schema/proto/workqueue/source_revs-map.js
server/python/raindrop/pipeline.py
server/python/raindrop/tests/pipeline/test_simple.py
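
The problem being fixed: when a converter fails, the pipeline writes an
"error stub" document under the same docid the real target would have had.
A later retry then tried to create the target from scratch, but CouchDB
refuses to write to an existing docid unless the update names the current
_rev of the stored document, so every retry died with an update conflict.
The patch threads the existing target revision (plus a 'force' flag for
the retry path) through the callback chain so the replacement document
simply overwrites the stub.  A minimal sketch of the CouchDB behaviour in
question, assuming a local dev server and a made-up docid (none of this
is part of the patch):

    import requests

    DOC = "http://localhost:5984/raindrop/some_target_doc"  # hypothetical

    # First write succeeds and creates revision 1-...
    assert requests.put(DOC, json={"type": "core/error/msg"}).ok

    # A retry that ignores the existing document is rejected.
    r = requests.put(DOC, json={"type": "raw/message/rfc822"})
    assert r.status_code == 409  # {"error": "conflict", ...}

    # Naming the stub's current _rev lets the new document replace it,
    # which is what the target_rev plumbing below achieves.
    rev = requests.get(DOC).json()["_rev"]
    doc = {"type": "raw/message/rfc822", "_rev": rev}
    assert requests.put(DOC, json=doc).ok
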
--- a/schema/proto/workqueue/source_revs-map.js
+++ b/schema/proto/workqueue/source_revs-map.js
@@ -1,4 +1,4 @@
 function(doc) {
     if (doc.raindrop_sources)
-        emit(doc._id, doc.raindrop_sources);
+        emit(doc._id, [doc._rev, doc.raindrop_sources]);
 }
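
With this change each 'source_revs' row carries the target document's own
_rev in addition to the (source_id, source_rev) pairs it was built from,
so a single view lookup answers both "is this target up to date?" and
"what revision must a replacement carry?".  An illustration of the new
row shape as pipeline.py consumes it (docids and revisions are invented):

    # Illustrative only - the ids and revisions here are made up.
    row = {
        "key": "target-doc-id",                     # the target's _id
        "value": [
            "2-abc123",                             # the target's own _rev
            [["source-doc-id", "1-def456"]],        # its (id, rev) sources
        ],
    }
    target_rev, all_sources = row["value"]          # as unpacked below
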
--- a/server/python/raindrop/pipeline.py
+++ b/server/python/raindrop/pipeline.py
@@ -154,19 +154,20 @@ class Pipeline(object):
         # XXX - see below - should we reuse self.coop, or is that unsafe?
         coop = task.Cooperator()
         return coop.coiterate(gen_work())
 
     def _cb_got_error_source(self, source):
         # XXX - see below - should we reuse self.coop, or is that unsafe?
         coop = task.Cooperator()
         return coop.coiterate(self.gen_transition_tasks(
-                                        source['_id'], source['_rev']))
+                                        source['_id'], source['_rev'],
+                                        force=True))
 
-    def gen_transition_tasks(self, src_id, src_rev,
+    def gen_transition_tasks(self, src_id, src_rev, force=False,
                              # caller can supply this dict if they care...
                              caller_ret={'num_processed': 0}):
         # A generator which checks if each of its 'targets' (ie, the extension
         # which depends on this document) is up-to-date wrt this document's
         # revision.
         logger.debug("generating transition tasks for %r (rev=%s)", src_id,
                      src_rev)
 
@@ -187,51 +188,55 @@ class Pipeline(object):
 
         for target_info in targets:
             target_cat, target_type = target_info
             assert target_cat==doc_cat, (target_cat, doc_cat)
             target_id = dm.build_docid(target_cat, target_type, proto_id)
             cvtr = converters[(doc_cat, doc_type), target_info]
             logger.debug('looking for existing document %r', target_id)
             yield dm.open_view('raindrop!proto!workqueue', 'source_revs',
-                               key=target_id
+                            key=target_id
                     ).addCallback(self._cb_check_existing_doc, cvtr,
-                                  proto_id, src_id, src_rev, caller_ret)
+                                  proto_id, src_id, src_rev, force,
+                                  caller_ret)
 
     def _cb_check_existing_doc(self, result, cvtr, proto_id, src_id, src_rev,
-                               caller_ret):
+                               force, caller_ret):
         rows = result['rows']
         if len(rows)==0:
             # no target exists at all - needs to be executed...
             need_target = True
+            target_rev = None
         else:
             assert len(rows)==1, rows # what does more than 1 mean?
-            all_sources = rows[0]['value']
+            val = rows[0]['value']
+            target_rev, all_sources = val
             look = [src_id, src_rev]
-            need_target = look not in all_sources
+            need_target = force or look not in all_sources
             logger.debug("need target=%s (looked for %r in %r)", need_target,
                          look, all_sources)
 
         if not need_target:
             return None # nothing to do.
-        caller_ret['num_processed'] += 1
-        return self.make_document(cvtr, proto_id)
+        return self._make_document(cvtr, proto_id, target_rev, caller_ret)
 
-    def make_document(self, cvtr, proto_id):
+    def _make_document(self, cvtr, proto_id, target_rev, caller_ret):
         # OK - need to create this new type - locate all dependents in the
         # DB - this will presumably include the document which triggered
         # the process...
+        caller_ret['num_processed'] += 1
         all_deps = []
         dm = self.doc_model
         for src_cat, src_type in cvtr.sources:
             all_deps.append(dm.build_docid(src_cat, src_type, proto_id))
         return dm.db.listDoc(keys=all_deps, include_docs=True,
-                    ).addCallback(self._cb_do_conversion, cvtr, proto_id)
+                    ).addCallback(self._cb_do_conversion, cvtr, proto_id,
+                                  target_rev)
 
-    def _cb_do_conversion(self, result, cvtr, proto_id):
+    def _cb_do_conversion(self, result, cvtr, proto_id, target_rev):
         sources = []
         for r in result['rows']:
             if 'error' in r:
                 # This is usually a simple 'not found' error; it doesn't
                 # mean an 'error record'
                 logger.debug('skipping document %(key)s - %(error)s', r)
                 continue
             if 'deleted' in r['value']:
@@ -242,19 +247,20 @@ class Pipeline(object):
         logger.debug("calling %r to create a %s from %d docs", cvtr,
                      cvtr.target_type, len(sources))
         if not sources:
             # no source documents - that's strange but OK - when sources
             # appear we will get here again...
             return None
         return defer.maybeDeferred(cvtr.convert, sources
                         ).addBoth(self._cb_converted_or_not,
-                                  cvtr, sources, proto_id)
+                                  cvtr, sources, proto_id, target_rev)
 
-    def _cb_converted_or_not(self, result, target_ext, sources, proto_id):
+    def _cb_converted_or_not(self, result, target_ext, sources, proto_id,
+                             target_rev):
         # This is both a callBack and an errBack.  If a converter fails to
         # create a document, we can't just fail, or no later messages in the
         # DB will ever get processed!
         # So we write a "dummy" record - it has the same docid that the real
         # document would have - but the 'type' attribute in the document
         # reflects it is actually an error marker.
         dest_type = target_ext.target_type[1]
         if isinstance(result, Failure):
@@ -278,23 +284,26 @@ class Pipeline(object):
             new_doc = result
             self.doc_model.prepare_ext_document(proto_id, dest_type, new_doc)
             logger.debug("converter returned new document type %r for %r: %r",
                          dest_type, proto_id, new_doc['_id'])
         # In theory, the source ID of each doc that contributed is
         # redundant as it could be deduced.  But we need the revision to
         # check if we are up-to-date wrt our 'children'...
         new_doc['raindrop_sources'] = [(s['_id'], s['_rev']) for s in sources]
+        if target_rev is not None:
+            new_doc['_rev'] = target_rev
         return self.doc_model.create_ext_documents([new_doc])
 
 def generate_work_queue(doc_model, transition_gen_factory):
     """generate deferreds which will determine where our processing is up
-    to and walk us through the _all_docs_by_seq view from that point,
-    creating and saving new docs as it goes. When the generator finishes
-    the queue is empty."""
+    to and walk us through the _all_docs_by_seq view from that point.  This
+    generator determines the source documents to process, then passes each
+    of those documents through a second generator, which actually calls the
+    extensions and saves the new documents."""
     def _cb_update_state_doc(result, d):
         if result is not None:
             assert d['_id'] == result['_id'], result
             d.update(result)
         # else no doc exists - the '_id' remains in the default though.
 
     def _cb_by_seq_opened(result, state_doc, convert_result):
         rows = result['rows']
@@ -316,17 +325,18 @@ def generate_work_queue(doc_model, trans
         # twisted loops, using our own should be ok though.
         coop = task.Cooperator()
         return coop.coiterate(_gen_tasks_from_seq_rows(rows, convert_result))
 
     def _gen_tasks_from_seq_rows(rows, convert_result):
         for row in rows:
             did = row['id']
             src_rev = row['value']['rev']
-            for task in transition_gen_factory(did, src_rev, convert_result):
+            for task in transition_gen_factory(did, src_rev,
+                                               caller_ret=convert_result):
                 yield task
 
     # first open our 'state' document
     state_doc = {'_id': 'workqueue!msg',
                  'type': u"core/workqueue",
                  'seq': 0}
     yield doc_model.open_document_by_id(state_doc['_id'],
                 ).addCallback(_cb_update_state_doc, state_doc)
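
To summarise the pipeline changes: the retry path (_cb_got_error_source)
now calls gen_transition_tasks with force=True, and _cb_check_existing_doc
hands the stored target revision down the chain so _cb_converted_or_not
can stamp it onto whatever replaces the old document.  A condensed
restatement of the per-target decision (simplified from the patch, not a
drop-in for the real Deferred-based code):

    # Names follow the patch; 'rows' is the source_revs view result.
    def needs_rebuild(rows, src_id, src_rev, force):
        if not rows:                        # no target exists at all
            return True, None
        target_rev, all_sources = rows[0]["value"]
        stale = [src_id, src_rev] not in all_sources
        return force or stale, target_rev   # retries pass force=True

    # When a rebuild happens, _cb_converted_or_not then does:
    #     if target_rev is not None:
    #         new_doc["_rev"] = target_rev  # overwrite the old/error doc
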
--- a/server/python/raindrop/tests/pipeline/test_simple.py
+++ b/server/python/raindrop/tests/pipeline/test_simple.py
@@ -1,12 +1,11 @@
 # The first raindrop unittest!
-# execute via: path/to/twisted/scripts/trial.py path/to/this_file.py
 
-from twisted.internet import task
+from twisted.internet import task, defer
 
 from raindrop.tests import TestCaseWithTestDB, FakeOptions
 from raindrop.model import get_doc_model, encode_proto_id
 from raindrop.proto import test as test_proto
 from raindrop import pipeline
 
 class TestPipelineBase(TestCaseWithTestDB):
     def get_pipeline(self):
@@ -20,75 +19,80 @@ class TestPipelineBase(TestCaseWithTestD
         return coop.coiterate(
                     pl.gen_transition_tasks(doc['_id'], doc['_rev']))
 
 class TestPipeline(TestPipelineBase):
     def test_one_step(self):
         # Test taking a raw message one step along its pipeline.
         test_proto.next_convert_fails = False
 
-        def open_target(whateva):
-            return dm.open_document('msg', '0', 'raw/message/rfc822')
+        def check_targets_last(lasts_by_seq, target_types):
+            docs = [row['doc'] for row in lasts_by_seq]
+            db_types = set(doc['type'] for doc in docs)
+            self.failUnlessEqual(db_types, target_types)
+            return docs
 
-        def check_target_last(lasts_by_seq, doc):
-            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
-            return doc
-
-        def check_target(doc):
-            # Our doc should be the last written
-            self.failUnless(doc)
-            return self.get_last_by_seq(
-                        ).addCallback(check_target_last, doc
+        def check_targets(result, target_types):
+            # Our targets should be the last written
+            return self.get_last_by_seq(len(target_types),
+                        ).addCallback(check_targets_last, target_types
                         )
 
         # open the test document to get its ID and _rev.
+        targets = set(('raw/message/rfc822', 'anno/flags'))
         dm = get_doc_model()
         return dm.open_document('msg', '0', 'proto/test'
                 ).addCallback(self.process_doc
-                ).addCallback(open_target
-                ).addCallback(check_target
+                ).addCallback(check_targets, targets
                 )
 
     def test_one_again_does_nothing(self):
         # Test that attempting to process a message which has already been
         # processed is a noop.
         dm = get_doc_model()
 
-        def check_target_same(lasts, target_b4):
-            # Re-processing should not have modified the target in any way.
-            self.failUnlessEqual(lasts[0]['doc'], target_b4)
+        def check_targets_same(lasts, targets_b4):
+            # Re-processing should not have modified the targets in any way.
+            db_ids = set(row['id'] for row in lasts)
+            expected = set(doc['_id'] for doc in targets_b4)
+            self.failUnlessEqual(db_ids, expected)
 
-        def check_nothing_done(whateva, target_b4):
-            return self.get_last_by_seq(
-                        ).addCallback(check_target_same, target_b4
+        def check_nothing_done(whateva, targets_b4):
+            return self.get_last_by_seq(len(targets_b4),
+                        ).addCallback(check_targets_same, targets_b4
                         )
 
-        def reprocess(src_doc, target_b4):
+        def reprocess(src_doc, targets_b4):
             return self.process_doc(src_doc
-                        ).addCallback(check_nothing_done, target_b4)
+                        ).addCallback(check_nothing_done, targets_b4)
 
-        def do_it_again(target_doc):
+        def do_it_again(target_docs):
             return dm.open_document('msg', '0', 'proto/test'
-                            ).addCallback(reprocess, target_doc
+                            ).addCallback(reprocess, target_docs
                             )
 
         return self.test_one_step(
                 ).addCallback(do_it_again
                 )
         
 
     def test_two_steps(self):
         # Test taking a raw message two steps along its pipeline.
-        def check_last_doc(lasts):
-            self.failUnless(lasts[0]['id'].endswith("!raw/message/email"), lasts)
+        def check_last_docs(lasts, target_types):
+            db_types = set(row['doc']['type'] for row in lasts)
+            self.failUnlessEqual(db_types, target_types)
 
+        def process_nexts(targets):
+            return defer.DeferredList([self.process_doc(d) for d in targets])
+
+        target_types = set(('raw/message/email', 'aggr/flags'))
         return self.test_one_step(
-                ).addCallback(self.process_doc
-                ).addCallback(lambda whateva: self.get_last_by_seq()
-                ).addCallback(check_last_doc
+                ).addCallback(process_nexts
+                ).addCallback(lambda whateva: self.get_last_by_seq(len(target_types))
+                ).addCallback(check_last_docs, target_types
                 )
 
     def test_all_steps(self):
         def check_last_doc(lasts):
             self.failUnlessEqual(lasts[0]['id'], 'workqueue!msg')
             self.failUnless(lasts[1]['id'].endswith("!aggr/tags"), lasts)
 
         test_proto.next_convert_fails = False
@@ -98,72 +102,61 @@ class TestPipeline(TestPipelineBase):
                 )
 
 class TestErrors(TestPipelineBase):
     def test_error_stub(self):
         # Test that when a converter fails an appropriate error record is
         # written
         test_proto.next_convert_fails = True
 
-        def open_target(whateva):
-            return dm.open_document('msg', '0', 'raw/message/rfc822')
-
-        def check_target_last(lasts_by_seq, doc):
-            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
-            self.failUnlessEqual(doc['type'], 'core/error/msg')
-            self.failUnless('This is a test failure' in doc['error_details'],
-                            doc['error_details'])
-            return doc
-
-        def check_target(doc):
-            # Our doc should be the last written and be an error records.
-            self.failUnless(doc)
-            return self.get_last_by_seq(
-                        ).addCallback(check_target_last, doc
-                        )
+        def check_target_last(lasts):
+            # The current pipeline means that the 'raw/message/rfc822' is an
+            # error stub but the 'anno/flags' works...
+            expected = set(('anno/flags', 'core/error/msg'))
+            types = set([row['doc']['type'] for row in lasts])
+            self.failUnlessEqual(types, expected)
 
         # open the test document to get its ID and _rev.
         dm = get_doc_model()
         return dm.open_document('msg', '0', 'proto/test'
                 ).addCallback(self.process_doc
-                ).addCallback(open_target
-                ).addCallback(check_target
+                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(check_target_last
                 )
 
     def test_reprocess_errors(self):
         # Test that reprocessing an error results in the correct thing.
         dm = get_doc_model()
-        def open_target(whateva):
-            return dm.open_document('msg', '0', 'raw/message/rfc822')
 
-        def check_target_last(lasts_by_seq, doc):
-            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
-            return doc
-
-        def check_target(doc):
-            # Our doc should be the last written
-            self.failUnless(doc)
-            return self.get_last_by_seq(
-                        ).addCallback(check_target_last, doc
-                        )
+        def check_target_last(lasts):
+            got = set(row['doc']['type'] for row in lasts)
+            expected = set(('anno/flags', 'raw/message/rfc822'))
+            self.failUnlessEqual(got, expected)
 
         def start_retry(whateva):
-            test_proto.next_convert_fails = True
+            test_proto.next_convert_fails = False
             return self.get_pipeline().start_retry_errors()
 
         return self.test_error_stub(
                 ).addCallback(start_retry
-                ).addCallback(open_target
-                ).addCallback(check_target
+                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(check_target_last
                 )
 
     def test_all_steps(self):
         # We test the right thing happens running a 'full' pipeline
         # when our test converter throws an error.
         def check_last_doc(lasts):
-            self.failUnlessEqual(lasts[0]['id'], 'workqueue!msg')
-            self.failUnlessEqual(lasts[1]['doc']['type'], 'core/error/msg')
+            # The tail of the DB should be as below:
+            expected = set(['core/workqueue', 'aggr/flags',
+                            'anno/flags', 'core/error/msg', 'proto/test'])
+            # Note the 'core/error/msg' is the failing conversion (ie, the
+            # error stub for the rfc822 message), and no 'email' record exists
+            # as it depends on the failing conversion.  The anno and aggr
+            # are independent of the failing message, so they complete.
+            got = set(l['doc']['type'] for l in lasts)
+            self.failUnlessEqual(got, expected)
 
         test_proto.next_convert_fails = True
         return self.get_pipeline().start(
-                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(lambda whateva: self.get_last_by_seq(5)
                 ).addCallback(check_last_doc
                 )
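
The test changes follow from the pipeline now producing several targets
per source document (e.g. 'anno/flags' alongside the rfc822 conversion),
so the assertions compare sets of document types at the tail of the
by-seq view instead of inspecting a single "last" document, and
test_reprocess_errors now flips next_convert_fails back to False so the
retried conversion can actually succeed.  Roughly, the scenario the error
tests drive is (simplified and synchronous; the real tests chain
Deferreds):

    # Simplified restatement of the retry scenario the tests exercise.
    test_proto.next_convert_fails = True
    pipeline.start()                  # rfc822 conversion fails -> error stub
    test_proto.next_convert_fails = False
    pipeline.start_retry_errors()     # force=True re-runs the converter and
                                      # replaces the stub via its old _rev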