more pipeline tests
authorMark Hammond <mhammond@skippinet.com.au>
Sun, 19 Apr 2009 16:38:26 +1000
changeset 206 1d5df7b9f952fb27701ff674fc085d181e6080bc
parent 205 ac11c66cb0bcf4da074dd41a41c7099b7a8eb4eb
child 207 e315de895c7ac8f2ae0c3e3d1258ac5259cb4617
push id24
push usermhammond@skippinet.com.au
push dateMon, 20 Apr 2009 01:22:13 +0000
more pipeline tests
server/python/raindrop/tests/pipeline/test_simple.py
--- a/server/python/raindrop/tests/pipeline/test_simple.py
+++ b/server/python/raindrop/tests/pipeline/test_simple.py
@@ -23,18 +23,18 @@ class TestPipelineBase(TestCaseWithTestD
 class TestPipeline(TestPipelineBase):
     def test_one_step(self):
         # Test taking a raw message one step along its pipeline.
         test_proto.next_convert_fails = False
 
         def open_target(whateva):
             return dm.open_document('msg', '0', 'raw/message/rfc822')
 
-        def check_target_last(last_by_seq, doc):
-            self.failUnlessEqual(last_by_seq['id'], doc['_id'])
+        def check_target_last(lasts_by_seq, doc):
+            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
             return doc
 
         def check_target(doc):
             # Our doc should be the last written
             self.failUnless(doc)
             return self.get_last_by_seq(
                         ).addCallback(check_target_last, doc
                         )
@@ -47,19 +47,19 @@ class TestPipeline(TestPipelineBase):
                 ).addCallback(check_target
                 )
 
     def test_one_again_does_nothing(self):
         # Test that attempting to process a message which has already been
         # processed is a noop.
         dm = get_doc_model()
 
-        def check_target_same(last, target_b4):
+        def check_target_same(lasts, target_b4):
             # Re-processing should not have modified the target in any way.
-            self.failUnlessEqual(last['doc'], target_b4)
+            self.failUnlessEqual(lasts[0]['doc'], target_b4)
 
         def check_nothing_done(whateva, target_b4):
             return self.get_last_by_seq(
                         ).addCallback(check_target_same, target_b4
                         )
 
         def reprocess(src_doc, target_b4):
             return self.process_doc(src_doc
@@ -72,36 +72,47 @@ class TestPipeline(TestPipelineBase):
 
         return self.test_one_step(
                 ).addCallback(do_it_again
                 )
         
 
     def test_two_steps(self):
         # Test taking a raw message two steps along its pipeline.
-        def check_last_doc(last):
-            self.failUnless(last['id'].endswith("!raw/message/email"), last)
+        def check_last_doc(lasts):
+            self.failUnless(lasts[0]['id'].endswith("!raw/message/email"), lasts)
 
         return self.test_one_step(
                 ).addCallback(self.process_doc
                 ).addCallback(lambda whateva: self.get_last_by_seq()
                 ).addCallback(check_last_doc
                 )
 
+    def test_all_steps(self):
+        def check_last_doc(lasts):
+            self.failUnlessEqual(lasts[0]['id'], 'workqueue!msg')
+            self.failUnless(lasts[1]['id'].endswith("!aggr/tags"), lasts)
+
+        test_proto.next_convert_fails = False
+        return self.get_pipeline().start(
+                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(check_last_doc
+                )
+
 class TestErrors(TestPipelineBase):
     def test_error_stub(self):
         # Test that when a converter fails an appropriate error record is
         # written
         test_proto.next_convert_fails = True
 
         def open_target(whateva):
             return dm.open_document('msg', '0', 'raw/message/rfc822')
 
-        def check_target_last(last_by_seq, doc):
-            self.failUnlessEqual(last_by_seq['id'], doc['_id'])
+        def check_target_last(lasts_by_seq, doc):
+            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
             self.failUnlessEqual(doc['type'], 'core/error/msg')
             self.failUnless('This is a test failure' in doc['error_details'],
                             doc['error_details'])
             return doc
 
         def check_target(doc):
             # Our doc should be the last written and be an error records.
             self.failUnless(doc)
@@ -118,18 +129,18 @@ class TestErrors(TestPipelineBase):
                 )
 
     def test_reprocess_errors(self):
         # Test that reprocessing an error results in the correct thing.
         dm = get_doc_model()
         def open_target(whateva):
             return dm.open_document('msg', '0', 'raw/message/rfc822')
 
-        def check_target_last(last_by_seq, doc):
-            self.failUnlessEqual(last_by_seq['id'], doc['_id'])
+        def check_target_last(lasts_by_seq, doc):
+            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
             return doc
 
         def check_target(doc):
             # Our doc should be the last written
             self.failUnless(doc)
             return self.get_last_by_seq(
                         ).addCallback(check_target_last, doc
                         )
@@ -138,8 +149,21 @@ class TestErrors(TestPipelineBase):
             test_proto.next_convert_fails = True
             return self.get_pipeline().start_retry_errors()
 
         return self.test_error_stub(
                 ).addCallback(start_retry
                 ).addCallback(open_target
                 ).addCallback(check_target
                 )
+
+    def test_all_steps(self):
+        # We test the right thing happens running a 'full' pipeline
+        # when our test converter throws an error.
+        def check_last_doc(lasts):
+            self.failUnlessEqual(lasts[0]['id'], 'workqueue!msg')
+            self.failUnlessEqual(lasts[1]['doc']['type'], 'core/error/msg')
+
+        test_proto.next_convert_fails = True
+        return self.get_pipeline().start(
+                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(check_last_doc
+                )