--- a/schema/proto/workqueue/source_revs-map.js
+++ b/schema/proto/workqueue/source_revs-map.js
@@ -1,4 +1,4 @@
 function(doc) {
     if (doc.raindrop_sources)
-        emit(doc._id, doc.raindrop_sources);
+        emit(doc._id, [doc._rev, doc.raindrop_sources]);
 }
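
The view change above is what the rest of this patch builds on: by emitting the target document's own _rev alongside its raindrop_sources, a single view lookup now tells the pipeline both whether a target exists and which revision a rewrite would need to supersede. A minimal sketch of consuming the new row shape (the docids and revisions are made-up examples):

    row = {'id': 'msg!abc!raw/message/rfc822',
           'key': 'msg!abc!raw/message/rfc822',
           'value': ['2-def456',                             # the target's _rev
                     [['msg!abc!proto/test', '1-abc123']]]}  # [src_id, src_rev] pairs

    target_rev, all_sources = row['value']
    need_target = ['msg!abc!proto/test', '1-abc123'] not in all_sources
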
--- a/server/python/raindrop/pipeline.py
+++ b/server/python/raindrop/pipeline.py
@@ -154,19 +154,20 @@ class Pipeline(object):
         # XXX - see below - should we reuse self.coop, or is that unsafe?
         coop = task.Cooperator()
         return coop.coiterate(gen_work())
 
     def _cb_got_error_source(self, source):
         # XXX - see below - should we reuse self.coop, or is that unsafe?
         coop = task.Cooperator()
         return coop.coiterate(self.gen_transition_tasks(
-                                  source['_id'], source['_rev']))
+                                  source['_id'], source['_rev'],
+                                  force=True))
 
-    def gen_transition_tasks(self, src_id, src_rev,
+    def gen_transition_tasks(self, src_id, src_rev, force=False,
                              # caller can supply this dict if they care...
                              caller_ret={'num_processed': 0}):
         # A generator which checks if each of its 'targets' (ie, the extension
         # which depends on this document) is up-to-date wrt this document's
         # revision.
 
         logger.debug("generating transition tasks for %r (rev=%s)", src_id,
                      src_rev)
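
gen_transition_tasks is always driven through a twisted task.Cooperator (as in _cb_got_error_source above), so the deferreds it yields run cooperatively rather than being fired all at once. Roughly how a caller uses it; the pipeline instance and docid/rev here are hypothetical stand-ins, and force=True is the new knob that rebuilds targets even when the view says they are current:

    from twisted.internet import task

    caller_ret = {'num_processed': 0}
    coop = task.Cooperator()
    d = coop.coiterate(pipeline.gen_transition_tasks(
            'msg!abc!proto/test', '1-abc123',
            force=True, caller_ret=caller_ret))
    # once d fires, caller_ret['num_processed'] says how many targets were rebuilt
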
@@ -187,51 +188,55 @@ class Pipeline(object):
         for target_info in targets:
             target_cat, target_type = target_info
             assert target_cat==doc_cat, (target_cat, doc_cat)
             target_id = dm.build_docid(target_cat, target_type, proto_id)
             cvtr = converters[(doc_cat, doc_type), target_info]
             logger.debug('looking for existing document %r', target_id)
             yield dm.open_view('raindrop!proto!workqueue', 'source_revs',
-                               key=target_id
+                               key=target_id,
                     ).addCallback(self._cb_check_existing_doc, cvtr,
-                                  proto_id, src_id, src_rev, caller_ret)
+                                  proto_id, src_id, src_rev, force,
+                                  caller_ret)
 
     def _cb_check_existing_doc(self, result, cvtr, proto_id, src_id, src_rev,
-                               caller_ret):
+                               force, caller_ret):
         rows = result['rows']
         if len(rows)==0:
             # no target exists at all - needs to be executed...
             need_target = True
+            target_rev = None
         else:
             assert len(rows)==1, rows # what does more than 1 mean?
-            all_sources = rows[0]['value']
+            val = rows[0]['value']
+            target_rev, all_sources = val
             look = [src_id, src_rev]
-            need_target = look not in all_sources
+            need_target = force or look not in all_sources
             logger.debug("need target=%s (looked for %r in %r)", need_target,
                          look, all_sources)
         if not need_target:
             return None # nothing to do.
-        caller_ret['num_processed'] += 1
-        return self.make_document(cvtr, proto_id)
+        return self._make_document(cvtr, proto_id, target_rev, caller_ret)
 
-    def make_document(self, cvtr, proto_id):
+    def _make_document(self, cvtr, proto_id, target_rev, caller_ret):
         # OK - need to create this new type - locate all dependents in the
         # DB - this will presumably include the document which triggered
         # the process...
+        caller_ret['num_processed'] += 1
         all_deps = []
         dm = self.doc_model
         for src_cat, src_type in cvtr.sources:
             all_deps.append(dm.build_docid(src_cat, src_type, proto_id))
         return dm.db.listDoc(keys=all_deps, include_docs=True,
-                    ).addCallback(self._cb_do_conversion, cvtr, proto_id)
+                    ).addCallback(self._cb_do_conversion, cvtr, proto_id,
+                                  target_rev)
 
-    def _cb_do_conversion(self, result, cvtr, proto_id):
+    def _cb_do_conversion(self, result, cvtr, proto_id, target_rev):
         sources = []
         for r in result['rows']:
             if 'error' in r:
                 # This is usually a simple 'not found' error; it doesn't
                 # mean an 'error record'
                 logger.debug('skipping document %(key)s - %(error)s', r)
                 continue
             if 'deleted' in r['value']:
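
A note on the 'error' check above: when listDoc is asked for keys that don't exist, CouchDB reports them as rows carrying an 'error' key instead of failing the whole request, so _cb_do_conversion just skips them. A hand-written example of such a result (not captured output):

    result = {'rows': [
        {'id': 'msg!abc!proto/test', 'key': 'msg!abc!proto/test',
         'value': {'rev': '1-abc123'},
         'doc': {'_id': 'msg!abc!proto/test', '_rev': '1-abc123'}},
        {'key': 'msg!abc!proto/imap', 'error': 'not_found'},  # no such doc yet
    ]}
    sources = [r['doc'] for r in result['rows']
               if 'error' not in r and 'deleted' not in r['value']]
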
@@ -242,19 +247,20 @@ class Pipeline(object):
logger.debug("calling %r to create a %s from %d docs", cvtr,
cvtr.target_type, len(sources))
if not sources:
# no source documents - that's strange but OK - when sources
# appear we will get here again...
return None
return defer.maybeDeferred(cvtr.convert, sources
).addBoth(self._cb_converted_or_not,
- cvtr, sources, proto_id)
+ cvtr, sources, proto_id, target_rev)
- def _cb_converted_or_not(self, result, target_ext, sources, proto_id):
+ def _cb_converted_or_not(self, result, target_ext, sources, proto_id,
+ target_rev):
# This is both a callBack and an errBack. If a converter fails to
# create a document, we can't just fail, or no later messages in the
# DB will ever get processed!
# So we write a "dummy" record - it has the same docid that the real
# document would have - but the 'type' attribute in the document
# reflects it is actually an error marker.
dest_type = target_ext.target_type[1]
if isinstance(result, Failure):
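
Piecing the comment above together with what the tests assert, an error stub plausibly looks like the dict below. Only _id, the 'core/error/msg' type, and error_details are confirmed by the tests in this patch; the exact field set is illustrative:

    error_doc = {
        '_id': 'msg!abc!raw/message/rfc822',   # docid the real doc would have had
        'type': 'core/error/msg',              # marks it as an error stub
        'error_details': 'This is a test failure',  # str(failure) or similar
    }
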
@@ -278,23 +284,26 @@ class Pipeline(object):
             new_doc = result
             self.doc_model.prepare_ext_document(proto_id, dest_type, new_doc)
             logger.debug("converter returned new document type %r for %r: %r",
                          dest_type, proto_id, new_doc['_id'])
             # In theory, the source ID of each doc that contributed is
             # redundant as it could be deduced. But we need the revision to
             # check if we are up-to-date wrt our 'children'...
             new_doc['raindrop_sources'] = [(s['_id'], s['_rev']) for s in sources]
+            if target_rev is not None:
+                new_doc['_rev'] = target_rev
         return self.doc_model.create_ext_documents([new_doc])
 
 def generate_work_queue(doc_model, transition_gen_factory):
     """generate deferreds which will determine where our processing is up
-    to and walk us through the _all_docs_by_seq view from that point,
-    creating and saving new docs as it goes. When the generator finishes
-    the queue is empty."""
+    to and walk us through the _all_docs_by_seq view from that point. This
+    generator determines which source documents need processing, then passes
+    each of them to a second generator, which actually calls the extensions
+    and saves the resulting docs."""
     def _cb_update_state_doc(result, d):
         if result is not None:
             assert d['_id'] == result['_id'], result
             d.update(result)
         # else no doc exists - the '_id' remains in the default though.
 
     def _cb_by_seq_opened(result, state_doc, convert_result):
         rows = result['rows']
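
The two lines added above (copying target_rev into new_doc) exist because of CouchDB's update rule: a write to an existing docid must quote that document's current _rev, otherwise the server rejects it with a 409 conflict. A sketch of the rule in terms of this code, with made-up values:

    new_doc = {'_id': 'msg!abc!raw/message/rfc822',
               'type': 'raw/message/rfc822'}
    target_rev = '2-def456'    # revision of the stub, from the source_revs view
    if target_rev is not None:
        new_doc['_rev'] = target_rev   # replace that revision instead of conflicting

Without this, retrying an errored conversion would collide with the error stub already sitting at the target docid.
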
@@ -316,17 +325,18 @@ def generate_work_queue(doc_model, trans
         # twisted loops, using our own should be ok though.
         coop = task.Cooperator()
         return coop.coiterate(_gen_tasks_from_seq_rows(rows, convert_result))
 
     def _gen_tasks_from_seq_rows(rows, convert_result):
         for row in rows:
             did = row['id']
             src_rev = row['value']['rev']
-            for task in transition_gen_factory(did, src_rev, convert_result):
+            for task in transition_gen_factory(did, src_rev,
+                                               caller_ret=convert_result):
                 yield task
 
     # first open our 'state' document
     state_doc = {'_id': 'workqueue!msg',
                  'type': u"core/workqueue",
                  'seq': 0}
     yield doc_model.open_document_by_id(state_doc['_id'],
                 ).addCallback(_cb_update_state_doc, state_doc)
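
For orientation: the work queue walks _all_docs_by_seq from the 'seq' checkpointed in the workqueue!msg state document, and each row supplies the (id, rev) pair now passed to transition_gen_factory with convert_result as an explicit caller_ret. A sketch of the row shape being consumed; the values are hand-written:

    state_doc = {'_id': 'workqueue!msg', 'type': u'core/workqueue', 'seq': 0}
    rows = [{'id': 'msg!abc!proto/test',
             'key': 7,                     # the DB update-sequence number
             'value': {'rev': '1-abc123'}}]
    for row in rows:
        # becomes transition_gen_factory(row['id'], row['value']['rev'], ...)
        state_doc['seq'] = row['key']      # checkpoint; a restart resumes here
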
--- a/server/python/raindrop/tests/pipeline/test_simple.py
+++ b/server/python/raindrop/tests/pipeline/test_simple.py
@@ -1,12 +1,11 @@
 # The first raindrop unittest!
-# execute via: path/to/twisted/scripts/trial.py path/to/this_file.py
-from twisted.internet import task
+from twisted.internet import task, defer
 
 from raindrop.tests import TestCaseWithTestDB, FakeOptions
 from raindrop.model import get_doc_model, encode_proto_id
 from raindrop.proto import test as test_proto
 from raindrop import pipeline
 
 
 class TestPipelineBase(TestCaseWithTestDB):
     def get_pipeline(self):
@@ -20,75 +19,80 @@ class TestPipelineBase(TestCaseWithTestD
         return coop.coiterate(
             pl.gen_transition_tasks(doc['_id'], doc['_rev']))
 
 
 class TestPipeline(TestPipelineBase):
     def test_one_step(self):
         # Test taking a raw message one step along its pipeline.
         test_proto.next_convert_fails = False
 
-        def open_target(whateva):
-            return dm.open_document('msg', '0', 'raw/message/rfc822')
+        def check_targets_last(lasts_by_seq, target_types):
+            docs = [row['doc'] for row in lasts_by_seq]
+            db_types = set(doc['type'] for doc in docs)
+            self.failUnlessEqual(db_types, target_types)
+            return docs
 
-        def check_target_last(lasts_by_seq, doc):
-            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
-            return doc
-
-        def check_target(doc):
-            # Our doc should be the last written
-            self.failUnless(doc)
-            return self.get_last_by_seq(
-                ).addCallback(check_target_last, doc
+        def check_targets(result, target_types):
+            # Our targets should be the last written
+            return self.get_last_by_seq(len(target_types),
+                ).addCallback(check_targets_last, target_types
                 )
 
         # open the test document to get its ID and _rev.
+        targets = set(('raw/message/rfc822', 'anno/flags'))
         dm = get_doc_model()
         return dm.open_document('msg', '0', 'proto/test'
                 ).addCallback(self.process_doc
-                ).addCallback(open_target
-                ).addCallback(check_target
+                ).addCallback(check_targets, targets
                 )
 
     def test_one_again_does_nothing(self):
         # Test that attempting to process a message which has already been
         # processed is a noop.
         dm = get_doc_model()
 
-        def check_target_same(lasts, target_b4):
-            # Re-processing should not have modified the target in any way.
-            self.failUnlessEqual(lasts[0]['doc'], target_b4)
+        def check_targets_same(lasts, targets_b4):
+            # Re-processing should not have modified the targets in any way.
+            db_ids = set(row['id'] for row in lasts)
+            expected = set(doc['_id'] for doc in targets_b4)
+            self.failUnlessEqual(db_ids, expected)
 
-        def check_nothing_done(whateva, target_b4):
-            return self.get_last_by_seq(
-                ).addCallback(check_target_same, target_b4
+        def check_nothing_done(whateva, targets_b4):
+            return self.get_last_by_seq(len(targets_b4),
+                ).addCallback(check_targets_same, targets_b4
                 )
 
-        def reprocess(src_doc, target_b4):
+        def reprocess(src_doc, targets_b4):
             return self.process_doc(src_doc
-                ).addCallback(check_nothing_done, target_b4)
+                ).addCallback(check_nothing_done, targets_b4)
 
-        def do_it_again(target_doc):
+        def do_it_again(target_docs):
             return dm.open_document('msg', '0', 'proto/test'
-                ).addCallback(reprocess, target_doc
+                ).addCallback(reprocess, target_docs
                 )
 
         return self.test_one_step(
                 ).addCallback(do_it_again
                 )
 
     def test_two_steps(self):
         # Test taking a raw message two steps along its pipeline.
-        def check_last_doc(lasts):
-            self.failUnless(lasts[0]['id'].endswith("!raw/message/email"), lasts)
+        def check_last_docs(lasts, target_types):
+            db_types = set(row['doc']['type'] for row in lasts)
+            self.failUnlessEqual(db_types, target_types)
 
+        def process_nexts(targets):
+            return defer.DeferredList([self.process_doc(d) for d in targets])
+
+        target_types = set(('raw/message/email', 'aggr/flags'))
         return self.test_one_step(
-                ).addCallback(self.process_doc
-                ).addCallback(lambda whateva: self.get_last_by_seq()
-                ).addCallback(check_last_doc
+                ).addCallback(process_nexts,
+                ).addCallback(lambda whateva: self.get_last_by_seq(len(target_types))
+                ).addCallback(check_last_docs, target_types
                 )
 
     def test_all_steps(self):
         def check_last_doc(lasts):
             self.failUnlessEqual(lasts[0]['id'], 'workqueue!msg')
             self.failUnless(lasts[1]['id'].endswith("!aggr/tags"), lasts)
 
         test_proto.next_convert_fails = False
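
The conversion graph these tests exercise, as inferred from the expected document types (the actual converter registrations live elsewhere in the tree):

    proto/test --> raw/message/rfc822 --> raw/message/email --> ... --> aggr/tags
         \
          +---> anno/flags --> aggr/flags

When the proto/test converter fails, the rfc822 document (and everything downstream of it) cannot be built, while the anno/flags leg is unaffected; the TestErrors cases below rely on exactly that split.
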
@@ -98,72 +102,61 @@ class TestPipeline(TestPipelineBase):
                 )
 
 
 class TestErrors(TestPipelineBase):
     def test_error_stub(self):
         # Test that when a converter fails an appropriate error record is
         # written
         test_proto.next_convert_fails = True
 
-        def open_target(whateva):
-            return dm.open_document('msg', '0', 'raw/message/rfc822')
-
-        def check_target_last(lasts_by_seq, doc):
-            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
-            self.failUnlessEqual(doc['type'], 'core/error/msg')
-            self.failUnless('This is a test failure' in doc['error_details'],
-                            doc['error_details'])
-            return doc
-
-        def check_target(doc):
-            # Our doc should be the last written and be an error records.
-            self.failUnless(doc)
-            return self.get_last_by_seq(
-                ).addCallback(check_target_last, doc
-                )
+        def check_target_last(lasts):
+            # With the current pipeline the 'raw/message/rfc822' conversion
+            # fails (so we see its error stub) but the 'anno/flags' one works...
+            expected = set(('anno/flags', 'core/error/msg'))
+            types = set([row['doc']['type'] for row in lasts])
+            self.failUnlessEqual(types, expected)
 
         # open the test document to get its ID and _rev.
         dm = get_doc_model()
         return dm.open_document('msg', '0', 'proto/test'
                 ).addCallback(self.process_doc
-                ).addCallback(open_target
-                ).addCallback(check_target
+                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(check_target_last
                 )
 
     def test_reprocess_errors(self):
         # Test that reprocessing an error results in the correct thing.
         dm = get_doc_model()
 
-        def open_target(whateva):
-            return dm.open_document('msg', '0', 'raw/message/rfc822')
-        def check_target_last(lasts_by_seq, doc):
-            self.failUnlessEqual(lasts_by_seq[0]['id'], doc['_id'])
-            return doc
-
-        def check_target(doc):
-            # Our doc should be the last written
-            self.failUnless(doc)
-            return self.get_last_by_seq(
-                ).addCallback(check_target_last, doc
-                )
+        def check_target_last(lasts):
+            got = set(row['doc']['type'] for row in lasts)
+            expected = set(('anno/flags', 'raw/message/rfc822'))
+            self.failUnlessEqual(got, expected)
 
         def start_retry(whateva):
-            test_proto.next_convert_fails = True
+            test_proto.next_convert_fails = False
             return self.get_pipeline().start_retry_errors()
 
         return self.test_error_stub(
                 ).addCallback(start_retry
-                ).addCallback(open_target
-                ).addCallback(check_target
+                ).addCallback(lambda whateva: self.get_last_by_seq(2
+                ).addCallback(check_target_last)
                 )
 
     def test_all_steps(self):
         # We test the right thing happens running a 'full' pipeline
         # when our test converter throws an error.
         def check_last_doc(lasts):
-            self.failUnlessEqual(lasts[0]['id'], 'workqueue!msg')
-            self.failUnlessEqual(lasts[1]['doc']['type'], 'core/error/msg')
+            # The tail of the DB should hold exactly these types:
+            expected = set(['core/workqueue', 'aggr/flags',
+                            'anno/flags', 'core/error/msg', 'proto/test'])
+            # Note the 'core/error/msg' is the error stub for the failing
+            # rfc822 conversion; no 'email' record exists as it depends on
+            # that failed conversion. The anno and aggr docs are independent
+            # of the failing message, so they complete.
+            got = set(l['doc']['type'] for l in lasts)
+            self.failUnlessEqual(got, expected)
 
         test_proto.next_convert_fails = True
         return self.get_pipeline().start(
-                ).addCallback(lambda whateva: self.get_last_by_seq(2)
+                ).addCallback(lambda whateva: self.get_last_by_seq(5)
                 ).addCallback(check_last_doc
                 )
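
A closing note on the helper these assertions lean on: get_last_by_seq(n) is not part of this patch, but from its use it evidently returns the newest n rows of the database's update sequence with the documents attached, i.e. roughly the result of

    GET /<testdb>/_all_docs_by_seq?descending=true&limit=n&include_docs=true

so each row carries 'id' and 'doc', which is what the check_* callbacks inspect.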