formalized message-ids and a _by_seq based work-queue
author    Mark Hammond <mhammond@skippinet.com.au>
date      Fri, 03 Apr 2009 16:02:47 +1100
branch    twisty
changeset 153 c109e37a2770ba8cdb51b5cb59334a705848e2dc
parent    152 2eb31d62489c13339c85b9d2d0129c1dce6c25d0
child     154 6ad39a0fb3d061529bc6692d7a2373d039dcefea
push id   1
push user root
push date Wed, 08 Apr 2009 01:46:05 +0000
formalized message-ids and a _by_seq based work-queue
schema/messages/workqueue/by_doc_extension_sequence-map.js
schema/messages/workqueue/by_doc_roots-map.js
schema/messages/workqueue/by_doc_roots-reduce.js
schema/proto/README.txt
schema/proto/imap/seen-map.js
schema/proto/seen/imap-map.js
schema/proto/seen/skype-map.js
schema/proto/seen/twitter-map.js
schema/proto/seen/twitter-reduce.js
schema/proto/skype/seen-map.js
server/python/junius/ext/message/rfc822.py
server/python/junius/model.py
server/python/junius/pipeline.py
server/python/junius/proto/imap.py
server/python/junius/proto/skype.py
server/python/junius/proto/test/__init__.py
server/python/junius/proto/twitter.py
deleted file mode 100644
--- a/schema/messages/workqueue/by_doc_extension_sequence-map.js
+++ /dev/null
@@ -1,5 +0,0 @@
-function(doc) 
-{
-    var root_id = doc._id.split("!", 1)[0];
-    emit([root_id, doc.raindrop_seq], doc.type);
-}
deleted file mode 100644
--- a/schema/messages/workqueue/by_doc_roots-map.js
+++ /dev/null
@@ -1,5 +0,0 @@
-function(doc) 
-{
-    var root_id = doc._id.split("!", 1)[0];
-    emit(root_id, null);
-}
deleted file mode 100644
--- a/schema/messages/workqueue/by_doc_roots-reduce.js
+++ /dev/null
@@ -1,4 +0,0 @@
-function(keys, vals)
-{
-    return vals.length
-}
new file mode 100644
--- /dev/null
+++ b/schema/proto/README.txt
@@ -0,0 +1,9 @@
+These are the views for the protocol implementations.
+
+Note that each protocol doesn't get its own design document - instead, the
+design documents are structured so that views likely to be used together are
+grouped together, so they all get built at once.
+
+For example, the 'seen' design document includes one view per protocol - the
+expectation is that, in general, each protocol will need to process the same
+set of "new" documents to determine what it needs to fetch.
rename from schema/proto/imap/seen-map.js
rename to schema/proto/seen/imap-map.js
rename from schema/proto/skype/seen-map.js
rename to schema/proto/seen/skype-map.js
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/twitter-map.js
@@ -0,0 +1,8 @@
+function(doc) {
+    if (doc.type=='proto/twitter') {
+        // Twitter uses an incrementing ID per user, and allows us to fetch
+        // only tweets since that ID.  This allows us to use map/reduce to
+        // find the exact ID we need per user.
+        emit(doc.twitter_user.toString(), doc.twitter_id);
+    }
+}
new file mode 100644
--- /dev/null
+++ b/schema/proto/seen/twitter-reduce.js
@@ -0,0 +1,10 @@
+function(keys, values)
+{
+    // we just need the largest ID we've ever seen.
+    // *sob* - where is max()?
+    var ret = 0;
+    for each (var v in values)
+        if (v > ret)
+            ret = v;
+    return ret;
+}
--- a/server/python/junius/ext/message/rfc822.py
+++ b/server/python/junius/ext/message/rfc822.py
@@ -61,17 +61,17 @@ def doc_from_bytes(b):
                 logger.error("Failed to fallback decode mail from utf8: %s", exc)
                 body = body_bytes.decode('utf8', 'ignore')
         doc['body'] = body
     return doc
 
 class RFC822Converter(base.ConverterBase):
     def convert(self, doc):
         # I need the binary attachment.
-        return self.doc_model.open_document(doc['_id'], attachment="rfc822"
+        return self.doc_model.open_attachment(doc['_id'], "rfc822",
                   ).addCallback(self._cb_got_attachment, doc)
 
     def _cb_got_attachment(self, body, doc):
         # a 'rfc822' stores 'headers' as a dict
         headers = doc['headers']
         # for now, 'from' etc are all tuples of [identity_type, identity_id]
         # XXX - todo - find one of the multi-part bits to use as the body.
         try:
--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -1,54 +1,61 @@
 import sys
-import os
 import logging
 import time
-import urllib
+from urllib import urlencode, quote
+import base64
 
 import twisted.web.error
 from twisted.internet import defer
 from twisted.python.failure import Failure
 
 try:
     import simplejson as json
 except ImportError:
     import json # Python 2.6
 
 import paisley
-from couchdb import schema
 from .config import get_config
 
 
 config = get_config()
 
 class _NotSpecified:
     pass
 
 logger = logging.getLogger('model')
 
 DBs = {}
 
 # XXXX - this relies on couch giving us some kind of 'sequence id'.  For
 # now we use a timestamp, but that obviously sucks in many scenarios.
-if sys.platform=='win32':
+if sys.platform == 'win32':
     # woeful resolution on windows and equal timestamps are bad.
     clock_start = time.time()
     time.clock() # time.clock starts counting from zero the first time it's called.
     def get_seq():
         return clock_start + time.clock()
 else:
     get_seq = time.time
 
 
+def encode_proto_id(proto_id):
+    # a 'protocol' gives us a 'blob' used to identify the document; we create
+    # a real docid from that protocol_id; we base64-encode what was given to
+    # us to avoid the possibility of a '!' char, and also to better accommodate
+    # truly binary strings (eg, a pickle or something bizarre)
+    return base64.encodestring(proto_id).replace('\n', '')
+
+
 # from the couchdb package; not sure what makes these names special...
 def _encode_options(options):
     retval = {}
     for name, value in options.items():
-        if name in ('key', 'startkey', 'endkey') \
+        if name in ('key', 'startkey', 'endkey', 'include_docs') \
                 or not isinstance(value, basestring):
             value = json.dumps(value, allow_nan=False, ensure_ascii=False)
         retval[name] = value
     return retval
 
 
 class CouchDB(paisley.CouchDB):
     def postob(self, uri, ob):
@@ -132,176 +139,207 @@ class CouchDB(paisley.CouchDB):
             else:
                 raise TypeError('expected dict, got %s' % type(doc))
         url = "/%s/_bulk_docs" % dbName
         body = json.dumps({'docs': docs})
         return self.post(url, body
                     ).addCallback(self.parseResult
                     )
 
+    def listDocsBySeq(self, dbName, reverse=False, startKey=0, limit=-1, **kw):
+        """
+        List all documents in a given database by the document's sequence number
+        """
+        # Response:
+        # {"total_rows":1597,"offset":0,"rows":[
+        # {"id":"test","key":1,"value":{"rev":"4104487645"}},
+        # {"id":"skippyhammond","key":2,"value":{"rev":"121469801"}},
+        # ...
+        uri = "/%s/_all_docs_by_seq" % (dbName,)
+        args = {}
+        if reverse:
+            args["reverse"] = "true"
+        if startKey > 0:
+            args["startkey"] = int(startKey)
+        if limit >= 0:
+            args["limit"] = int(limit)
+        # suck the rest of the kwargs in
+        args.update(_encode_options(kw))
+        if args:
+            uri += "?%s" % (urlencode(args),)
+        return self.get(uri
+            ).addCallback(self.parseResult)
+
     # Hack so our new bound methods work.
     def bindToDB(self, dbName):
         super(CouchDB, self).bindToDB(dbName)
         partial = paisley.partial # it works hard to get this!
-        for methname in ["saveAttachment", "updateDocuments"]:
+        for methname in ["saveAttachment", "updateDocuments",
+                         "listDocsBySeq"]:
             method = getattr(self, methname)
             newMethod = partial(method, dbName)
             setattr(self, methname, newMethod)
 
 
+# XXX - get_db should die as a global/singleton - only our DocumentModel
+# instance should care about that.  Sadly, bootstrap.py is a (small; later)
+# problem here...
 def get_db(couchname="local", dbname=_NotSpecified):
     dbinfo = config.couches[couchname]
     if dbname is _NotSpecified:
         dbname = dbinfo['name']
     key = couchname, dbname
     try:
         return DBs[key]
     except KeyError:
         pass
     logger.info("Connecting to couchdb at %s", dbinfo)
     db = CouchDB(dbinfo['host'], dbinfo['port'], dbname)
     DBs[key] = db
     return db
 
 def quote_id(doc_id):
-    return urllib.quote(doc_id, safe="")
+    return quote(doc_id, safe="")
 
 class DocumentModel(object):
     """The layer between 'documents' and the 'database'.  Responsible for
        creating the unique ID for each document (other than the raw document),
        for fetching documents based on an ID, etc
     """
     def __init__(self, db):
         self.db = db
 
     def open_view(self, *args, **kwargs):
         # A convenience method for completeness so consumers don't hit the
         # DB directly (and to give a consistent 'style').  Is it worth it?
         return self.db.openView(*args, **kwargs)
 
-    def open_document(self, doc_id, **kw):
+    def open_attachment(self, doc_id, attachment, **kw):
+        """Open an attachment for the given docID.  As this is generally done
+        when processing the document itself, so the raw ID of the document
+        itself is known.  For this reason, a docid rather than the parts is
+        used.
+        """
+        return self.db.openDoc(quote_id(doc_id), attachment=attachment, **kw
+                    ).addBoth(self._cb_doc_opened)
+
+    def open_document(self, category, proto_id, ext_type, **kw):
         """Open the specific document, returning None if it doesn't exist"""
-        return self.db.openDoc(quote_id(doc_id), **kw).addBoth(self._cb_doc_opened)
+        docid = '%s!%s!%s' % (category, encode_proto_id(proto_id), ext_type)
+        return self.open_document_by_id(docid, **kw)
+
+    def open_document_by_id(self, doc_id, **kw):
+        """Open a document by the already constructed docid"""
+        logger.debug("attempting to open document %r", doc_id)
+        return self.db.openDoc(quote_id(doc_id), **kw
+                    ).addBoth(self._cb_doc_opened)
 
     def _cb_doc_opened(self, result):
         if isinstance(result, Failure):
             result.trap(twisted.web.error.Error)
             if result.value.status != '404': # not found
                 result.raiseException()
             result = None # indicate no doc exists.
         return result
 
     def _prepare_raw_doc(self, account, docid, doc, doc_type):
         assert '_id' not in doc, doc # that isn't how you specify the ID.
-        assert '!' not in docid, docid # these chars are special.
+        doc['_id'] = docid
         assert 'raindrop_account' not in doc, doc # we look after that!
         doc['raindrop_account'] = account.details['_id']
 
         assert 'type' not in doc, doc # we look after that!
         doc['type'] = doc_type
 
         assert 'raindrop_seq' not in doc, doc # we look after that!
         doc['raindrop_seq'] = get_seq()
 
     def create_raw_documents(self, account, doc_infos):
-        """A high-performance version of 'create_raw_document', but doesn't
-        support attachments yet."""
+        """Entry-point to create raw documents.  The API reflects that
+        creating multiple documents in a batch is faster; if you want to
+        create a single doc, just put it in a list
+        """
         docs = []
         dids = [] # purely for the log :(
         logger.debug('create_raw_documents preparing %d docs', len(doc_infos))
-        for (docid, doc, doc_type) in doc_infos:
+        for (proto_id, doc, doc_type) in doc_infos:
+            docid = 'msg!%s!%s' % (encode_proto_id(proto_id), doc_type)
             self._prepare_raw_doc(account, docid, doc, doc_type)
-            # in a bulk-update, the ID is in the doc itself.
-            doc['_id'] = docid
+            # XXX - this hardcoding is obviously going to be a problem when
+            # we need to add 'contacts' etc :(
             docs.append(doc)
             dids.append(docid)
+        attachments = self._prepare_attachments(docs)
         logger.debug('create_raw_documents saving docs %s', dids)
         return self.db.updateDocuments(docs
-                    ).addCallback(self._cb_saved_multi_docs,
-                    ).addErrback(self._cb_save_multi_failed,
+                    ).addCallback(self._cb_saved_docs, attachments
                     )
 
-    def _cb_saved_multi_docs(self, result):
-        # result: {'ok': True, 'new_revs': [{'rev': 'xxx', 'id': '...'}, ...]}
-        logger.debug("saved multiple docs with result=%(ok)s", result)
-        return result
-
-    def _cb_save_multi_failed(self, failure):
-        logger.error("Failed to save lotsa docs: %s", failure)
-        failure.raiseException()
-
-    def create_raw_document(self, account, docid, doc, doc_type):
-        self._prepare_raw_doc(account, docid, doc, doc_type)
-        # XXX - attachments need more thought - ultimately we need to be able
-        # to 'push' them via a generator or similar to avoid reading them
-        # entirely in memory.  Further, we need some way of the document
-        # knowing if the attachment failed (or vice-versa) given we have
-        # no transactional semantics.
-        # fixup attachments.
-        try:
-            attachments = doc['_attachments']
-            # nuke attachments specified
-            del doc['_attachments']
-        except KeyError:
-            attachments = None
+    def _prepare_attachments(self, docs):
+        # called internally when creating a batch of documents. Returns a list
+        # of attachments which should be saved separately.
 
-        # save the document.
-        logger.debug('create_raw_document saving doc %r', docid)
-        qid = quote_id(docid)
-        return self.db.saveDoc(doc, docId=qid,
-                    ).addCallback(self._cb_saved_document, 'raw-message', docid
-                    ).addErrback(self._cb_save_failed, 'raw-message', docid
-                    ).addCallback(self._cb_save_attachments, attachments
-                    )
-
-    def _cb_saved_document(self, result, what, ids):
-        logger.debug("Saved %s %s", what, result)
-        # XXX - now what?
-        return result
+        # The intent is that later we can optimize this - if the attachment
+        # is small, we can keep it in the document base64 encoded and save
+        # a http connection.  For now though we just do all attachments
+        # separately.
 
-    def _cb_save_failed(self, failure, what, ids):
-        logger.error("Failed to save %s (%r): %s", what, ids, failure)
-        failure.raiseException()
-
-    def prepare_ext_document(self, rootdocid, doc_type, doc):
-        assert '_id' not in doc, doc # We manage IDs for all but 'raw' docs.
-        assert 'type' not in doc, doc # we manage this too!
-        assert 'raindrop_seq' not in doc, doc # we look after that!
-        doc['raindrop_seq'] = get_seq()
-        doc['type'] = doc_type
-        doc['_id'] = rootdocid + "!" + doc['type'] # docs ids need more thought...
-
-    def create_ext_documents(self, rootdocid, docs):
-        # Attachments are all done separately.  We could optimize this -
-        # eg, attachments under a certain size could go in the doc itself,
-        # saving a request but costing a base64 encode.
+        # attachment processing still needs more thought - ultimately we need
+        # to be able to 'push' them via a generator or similar to avoid
+        # reading them entirely into memory. Further, we need some way for the
+        # document to know if the attachment failed (or vice-versa), given we
+        # have no transactional semantics.
         all_attachments = []
         for doc in docs:
             assert '_id' in doc # should have called prepare_ext_document!
             try:
                 all_attachments.append(doc['_attachments'])
                 # nuke attachments specified
                 del doc['_attachments']
             except KeyError:
+                # It is important we put 'None' here so the list of
+                # attachments is exactly parallel with the list of docs.
                 all_attachments.append(None)
+        assert len(all_attachments)==len(docs)
+        return all_attachments
 
+    def prepare_ext_document(self, rootdocid, doc_type, doc):
+        """Called by extensions to setup the raindrop maintained attributes
+           of the documents, including the document ID
+        """
+        assert '_id' not in doc, doc # We manage IDs for all but 'raw' docs.
+        assert 'type' not in doc, doc # we manage this too!
+        assert 'raindrop_seq' not in doc, doc # we look after that!
+        doc['raindrop_seq'] = get_seq()
+        doc['type'] = doc_type
+        doc['_id'] = "msg!" + rootdocid + "!" + doc['type'] # docs ids need more thought...
+
+    def create_ext_documents(self, docs):
+        for doc in docs:
+            assert '_id' in doc # should have called prepare_ext_document!
+        attachments = self._prepare_attachments(docs)
         # save the document.
-        logger.debug('saving %d extension documents for %r', len(docs), rootdocid)
+        logger.debug('saving %d extension documents', len(docs))
         return self.db.updateDocuments(docs,
-                    ).addCallback(self._cb_saved_ext_docs, all_attachments
+                    ).addCallback(self._cb_saved_docs, attachments
                     )
 
-    def _cb_saved_ext_docs(self, result, attachments):
+    def _cb_saved_docs(self, result, attachments):
         # result: {'ok': True, 'new_revs': [{'rev': 'xxx', 'id': '...'}, ...]}
         logger.debug("saved multiple docs with result=%(ok)s", result)
         ds = []
         for dinfo, dattach in zip(result['new_revs'], attachments):
             if dattach:
                 ds.append(self._cb_save_attachments(dinfo, dattach))
-        return defer.DeferredList(ds)
+        def return_orig(ignored_result):
+            return result
+        # XXX - the result set will *not* have the correct _rev if there are
+        # attachments :(
+        return defer.DeferredList(ds
+                    ).addCallback(return_orig)
 
     def _cb_save_attachments(self, saved_doc, attachments):
         if not attachments:
             return saved_doc
         # Each time we save an attachment the doc gets a new revision number.
         # So we need to do them in a chain, passing the result from each to
         # the next.
         remaining = attachments.copy()
@@ -314,43 +352,30 @@ class DocumentModel(object):
         revision = result['rev']
         docid = result['id']
         name, info = remaining.popitem()
         logger.debug('saving attachment %r to doc %r', name, docid)
         return self.db.saveAttachment(quote_id(docid),
                                    quote_id(name), info['data'],
                                    content_type=info['content_type'],
                                    revision=revision,
-                ).addCallback(self._cb_saved_document, 'attachment', (docid, name)
-                ).addErrback(self._cb_save_failed, 'attachment', (docid, name)
+                ).addCallback(self._cb_saved_attachment, (docid, name)
+                ).addErrback(self._cb_save_failed, (docid, name)
                 ).addCallback(self._cb_save_next_attachment, remaining
                 )
 
-    def get_last_ext_for_document(self, doc_id):
-        """Given a base docid, find the most-recent extension to have run.
-        This will differ from the latest extension in the document chain if
-        the document chain has been 'reset' for any reason (eg, change
-        detected at the source of the message, user adding annotations, etc)
-        """
-        startkey = [doc_id]
-        endkey = [doc_id, {}]
-        return self.open_view('raindrop!messages!workqueue',
-                              'by_doc_extension_sequence',
-                              startkey=startkey, endkey=endkey
-                    ).addCallback(self._cb_des_opened, doc_id)
+    def _cb_saved_attachment(self, result, ids):
+        logger.debug("Saved attachment %s", result)
+        # XXX - now what?
+        return result
 
-    def _cb_des_opened(self, result, doc_id):
-        rows = result['rows']
-        if not rows:
-            ret = None, None
-        else:
-            last = rows[-1]
-            ret = last["value"], last["id"]
-        logger.debug("document '%s' has last-extension of %r", doc_id, ret)
-        return ret
+    def _cb_save_failed(self, failure, ids):
+        logger.error("Failed to save attachment (%r): %s", what, ids, failure)
+        failure.raiseException()
+
 
 _doc_model = None
 
 def get_doc_model():
     global _doc_model
     if _doc_model is None:
         _doc_model = DocumentModel(get_db())
     return _doc_model
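
The docid scheme formalized above - 'category!base64(proto_id)!doc_type' -
can be shown with a small self-contained sketch (Python 2, matching the code
in this change; the example proto_id is made up):

    import base64

    def encode_proto_id(proto_id):
        # base64 keeps '!' (and truly binary ids) out of the docid
        return base64.encodestring(proto_id).replace('\n', '')

    def make_docid(category, proto_id, doc_type):
        return '%s!%s!%s' % (category, encode_proto_id(proto_id), doc_type)

    # e.g. a skype chat document:
    #   make_docid('msg', 'skypechat-42', 'proto/skype-chat')
    #   -> 'msg!c2t5cGVjaGF0LTQy!proto/skype-chat'
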
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -1,11 +1,12 @@
-# This is the raindrop pipeline; it moves messages from their most raw
-# form to their most useful form.
-from twisted.internet import defer, reactor, task
+""" This is the raindrop pipeline; it moves messages from their most raw
+form to their most useful form.
+"""
+from twisted.internet import defer, task
 import logging
 
 logger = logging.getLogger(__name__)
 
 # Simple forward chaining.
 chain = [
     # from_type, to_type, transformer)
     ('proto/test',         'raw/message/rfc822',
@@ -51,135 +52,200 @@ class Pipeline(object):
             else:
                 assert not to_type, 'no xformname must mean no to_type'
                 self.forward_chain[from_type] = None
 
     def unprocess(self):
         # A bit of a hack that will suffice until we get better dependency
         # management.  Determines which doc-types are 'derived', then deletes
         # them all.
+        def delete_a_doc(doc, rid):
+            if doc is None:
+                logger.debug("can't delete document %r - it doesn't exist", rid)
+                return None
+            else:
+                logger.info("Deleting document %(_id)r (rev %(_rev)s)", doc)
+                return self.doc_model.db.deleteDoc(doc['_id'], doc['_rev'])
+
         def delete_docs(result, doc_type):
             docs = []
             to_del = [(row['id'], row['value']) for row in result['rows']]
             for id, rev in to_del:
                 docs.append({'_id': id, '_rev': rev, '_deleted': True})
             logger.info('deleting %d messages of type %r', len(docs), doc_type)
             return self.doc_model.db.updateDocuments(docs)
 
         def gen_deleting_docs(doc_types):
             for t in doc_types:
                 yield self.doc_model.open_view('raindrop!messages!by',
                                                'by_doc_type',
                                                key=t,
                             ).addCallback(delete_docs, t)
+            # and our work-queue docs - they aren't seen by the view, so
+            # just delete them by ID.
+            docs = []
+            for rid in ('workqueue!msg',):
+                yield self.doc_model.open_document_by_id(rid
+                        ).addCallback(delete_a_doc, rid)
 
         derived = set()
-        for from_type, to_info in self.forward_chain.iteritems():
+        for _, to_info in self.forward_chain.iteritems():
             if to_info is not None:
                 to_type, inst = to_info
                 derived.add(to_type)
         logger.info("deleting documents with types %r", derived)
         return self.coop.coiterate(gen_deleting_docs(derived))
 
     def start(self):
-        return self.coop.coiterate(self.gen_all_documents())
+        return self.coop.coiterate(self.gen_wq_tasks())
+
+    def gen_wq_tasks(self):
+        # first open our 'state' document
+        def _cb_update_state_doc(result, d):
+            if result is not None:
+                assert d['_id'] == result['_id'], result
+                d.update(result)
+            # else no doc exists - the '_id' remains in the default though.
+
+        state_doc = {'_id': 'workqueue!msg',
+                     'doc_type': "core/workqueue",
+                     'seq': 0}
+        yield self.doc_model.open_document_by_id(state_doc['_id'],
+                    ).addCallback(_cb_update_state_doc, state_doc)
+
+        logger.info("Work queue starting with sequence ID %d",
+                    state_doc['seq'])
+
+        logger.debug('opening by_seq view for work queue...')
+        # We process a small batch of records at a time (see the 'limit'
+        # below), and fetch those documents in the same request; this seems a
+        # reasonable compromise between efficiency and handling large docs
+        # (no attachments come back...)
+        # Another alternative worth benchmarking: use a much larger limit
+        # without returning the documents, in the hope we can deduce more
+        # from the view, avoiding the fetch of many docs at all...
+        self.queue_finished = False
+        self.state_doc_dirty = False
+        while not self.queue_finished:
+            yield self.doc_model.db.listDocsBySeq(limit=15,
+                                                  startkey=state_doc['seq'],
+                                                  include_docs=True,
+                        ).addCallback(self._cb_by_seq_opened, state_doc)
+        if self.state_doc_dirty:
+            logger.debug("flushing state doc at end of run...")
+            yield self.doc_model.create_ext_documents([state_doc]
+                    ).addCallback(self._cb_created_docs, state_doc
+                    )
+        else:
+            logger.debug("no need to flush state doc")
+
+    def _cb_by_seq_opened(self, result, state_doc):
+        rows = result['rows']
+        logger.debug('work queue has %d items to check.', len(rows))
+        if not rows:
+            # no rows left.  There is no guarantee our state doc will be
+            # the last one...
+            logger.info("work queue ran out of rows...")
+            # either way, we are done!
+            self.queue_finished = True
+            return
+        if len(rows)==1 and rows[0]['id'] == state_doc['_id']:
+            logger.info("Work queue got to the end!")
+            self.queue_finished = True
+            return
 
-    def gen_all_documents(self):
-        # check *every* doc in the DB.  This exists until we can derive
-        # a workqueue based on views.
-        while True:
-            self.num_this_process = 0
-            logger.debug('opening view for work queue...')
-            yield self.doc_model.open_view('raindrop!messages!workqueue',
-                                           'by_doc_roots',
-                                           group=True,
-                        ).addCallback(self._cb_roots_opened)
-            logger.info('created %d new documents', self.num_this_process)
-            if self.num_this_process == 0:
-                break
-        logger.debug('finally run out of documents.')
+        # Build a map of what we can deduce from the entire result set (eg,
+        # a single msg may have a number of doc-types in the rows)
+        known = {}
+        for row in rows:
+            rid = row['id']
+            last_seq = row['key']
+            if not rid.startswith("msg!"):
+                continue
+            value = row['value']
+            if value.get('deleted'):
+                logger.debug("skipping deleted message %r", rid)
+                continue
+            try:
+                _, proto_id, doc_type = rid.split("!")
+            except ValueError:
+                logger.warning("skipping malformed message ID %r", rid)
+                continue
+            known.setdefault(proto_id, []).append(
+                                (doc_type, value['rev'], row['doc']))
+
+        state_doc['seq'] = last_seq # only takes effect once saved...
+        logger.debug("Our %d rows gave info about %d messages",
+                     len(rows), len(known))
+
+        # Now take each doc in the list to its "next" type, noting that as we
+        # have a number of docs ahead of us, one of them may already satisfy
+        # the next step.
+        # Ultimately we may need to generate lots of new docs from this one -
+        # but for now we have a single, simple chain moving forwards...
+        def gen_todo():
+            new_docs = []
+            for proto_id, infos in known.iteritems():
+                for (sq, (doc_type, rev, doc)) in enumerate(infos):
+                    try:
+                        xform_info = self.forward_chain[doc_type]
+                    except KeyError:
+                        logger.warning("Can't find transformer for message type %r - skipping %r",
+                                       doc_type, proto_id)
+                        continue
+                    if xform_info is None:
+                        logger.debug("Document %r is at its terminal type of %r",
+                                     proto_id, doc_type)
+                        continue
 
-    def _cb_roots_opened(self, result):
-        rows = result['rows']
-        logger.info('work queue has %d items to check.', len(rows))
-        def gen_todo(todo):
-            for row in todo:
-                did = row['key']
-                logger.debug("Finding last extension point for %s", did)
-                yield self.doc_model.get_last_ext_for_document(did
-                            ).addCallback(self._cb_got_doc_last_ext, did
-                            ).addErrback(self._eb_doc_failed
-                            )
+                    next_type, xformer = xform_info
+                    # See if the next_type is in the rows ahead of us in
+                    # by_seq; if so, this step has already been done.
+                    if any(next_type == check_type
+                           for (check_type, _, _) in infos[sq+1:]):
+                        logger.debug("cool - _by_seq lookahead tells us the doc is already %r",
+                                     next_type)
+                        continue
+                    # OK - need to create this new type.
+                    logger.debug("calling %r to create a %s from %s", xformer,
+                                 next_type, doc['type'])
+                    yield defer.maybeDeferred(xformer.convert, doc
+                                    ).addCallback(self._cb_converted, next_type,
+                                                  proto_id, new_docs)
+            # It isn't unusual to see zero docs to process, and writing our
+            # new state doc each time is going to hurt perf a little.  The
+            # worst case is we die before flushing our state doc, but that's
+            # OK - we should be able to re-skip these messages quickly next
+            # time...
+            if new_docs:
+                # Now write the docs we created, plus our queue 'state' doc.
+                logger.info("work queue finished at sequence %d - %d new documents"
+                            , state_doc['seq'], len(new_docs))
+                new_docs.append(state_doc)
+                self.state_doc_dirty = False # it is about to be written.
+                yield self.doc_model.create_ext_documents(new_docs
+                        ).addCallback(self._cb_created_docs, state_doc
+                        )
+            else:
+                # don't bother writing the state doc now, but record it is
+                # dirty so if we get to the end we *do* write it.
+                self.state_doc_dirty = True
 
-        return self.coop.coiterate(gen_todo(rows))
+        return self.coop.coiterate(gen_todo())
+
+    def _cb_created_docs(self, result, state_doc):
+        # XXX - note that the _ids in this result can't be trusted if there
+        # were attachments :(  fortunately we don't care here...
+        # XXXXXXX - *sob* - we do care once we start hitting conflicts in
+        # messages ahead of us...
+        # XXX - this might not be necessary...
+        new_revs = result['new_revs']
+        last_rev = new_revs[-1]
+        assert last_rev['id'] == state_doc['_id'], last_rev
+        state_doc['_rev'] = last_rev['rev']
 
     def _eb_doc_failed(self, failure):
         logger.error("FAILED to pipe a doc: %s", failure)
         # and we will auto skip to the next doc...
 
-    def _cb_got_doc_last_ext(self, ext_info, rootdocid):
-        last_ext, docid = ext_info
-        if last_ext is None:
-            logger.debug("Document '%s' doesn't appear to be a message; skipping",
-                         rootdocid)
-            return None
-
-        docs_by_type = {}
-        new_docs = []
-        # take the message to it's terminal type in one hit
-
-        def gen_processes():
-            did_ext = last_ext
-            thisdocid = docid
-            while True:
-                logger.debug("Last extension for doc '%s' is '%s'", thisdocid, did_ext)
-                try:
-                    xform_info = self.forward_chain[did_ext]
-                except KeyError:
-                    logger.warning("Can't find transformer for message type %r - skipping %r",
-                                   did_ext, thisdocid)
-                    break
-                if xform_info is None:
-                    logger.debug("Document %r is at its terminal type of %r",
-                                 rootdocid, did_ext)
-                    break
-                dest_type, xformer = xform_info
-                logger.debug("Processing document %r - %s->%s", thisdocid,
-                            did_ext, dest_type)
-                if did_ext in docs_by_type:
-                    logger.debug("already have doc for type %r", did_ext)
-                    yield self._cb_got_last_doc(docs_by_type[did_ext],
-                                                rootdocid, xform_info,
-                                                docs_by_type, new_docs)
-                else:
-                    logger.debug("need to open doc for type %r (%r)", did_ext, thisdocid)
-                    yield self.doc_model.open_document(thisdocid
-                                ).addCallback(self._cb_got_last_doc, rootdocid,
-                                              xform_info, docs_by_type, new_docs
-                                )
-                ld = new_docs[-1]
-                assert ld['type'] == dest_type
-                did_ext = dest_type
-                thisdocid = ld['_id']
-
-        return self.coop.coiterate(gen_processes()
-                    ).addCallback(self._cb_docs_done, rootdocid, new_docs
-                    )
-
-    def _cb_docs_done(self, result, rootdocid, new_docs):
-        if new_docs:
-            self.doc_model.create_ext_documents(rootdocid, new_docs)
-
-    def _cb_got_last_doc(self, doc, rootdocid, xform_info, docs_by_type, new_docs):
-        assert doc is not None, "failed to locate the doc for %r" % rootdocid
-        dest_type, xformer = xform_info
-        logger.debug("calling %r to create a %s from %s", xformer, dest_type,
-                     doc['type'])
-        return defer.maybeDeferred(xformer.convert, doc
-                        ).addCallback(self._cb_converted, dest_type, rootdocid,
-                                      docs_by_type, new_docs)
-
-    def _cb_converted(self, new_doc, dest_type, rootdocid, docs_by_type, new_docs):
-        self.num_this_process += 1
+    def _cb_converted(self, new_doc, dest_type, rootdocid, new_docs):
         self.doc_model.prepare_ext_document(rootdocid, dest_type, new_doc)
-        logger.debug("converter returned new document type %r for %r: %r", dest_type, rootdocid, new_doc['_id'])
-        docs_by_type[dest_type] = new_doc
+        logger.debug("converter returned new document type %r for %r: %r",
+                     dest_type, rootdocid, new_doc['_id'])
         new_docs.append(new_doc)
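
The work-queue loop introduced in gen_wq_tasks() and _cb_by_seq_opened() can
be summarized by this condensed, synchronous sketch (no Twisted; fetch_by_seq,
process_row and save_state are stand-ins for the real deferred-based calls,
and the real code additionally avoids flushing the state doc when no new
documents were created):

    def run_work_queue(fetch_by_seq, process_row, save_state, batch=15):
        state = {'_id': 'workqueue!msg', 'seq': 0}
        while True:
            rows = fetch_by_seq(startkey=state['seq'], limit=batch,
                                include_docs=True)
            if not rows or (len(rows) == 1 and rows[0]['id'] == state['_id']):
                break                      # caught up with the database
            for row in rows:
                state['seq'] = row['key']  # the by_seq key *is* the sequence
                process_row(row)
            save_state(state)              # checkpoint so a restart can resume
        return state['seq']
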
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -62,17 +62,17 @@ class ImapClient(imap4.IMAP4Client):
 
       yield self.examine(name
                  ).addCallback(self._examineFolder, name
                  ).addErrback(self._cantExamineFolder, name)
     logger.debug('imap processing finished.')
 
   def _examineFolder(self, result, folder_path):
     logger.debug('Looking for messages already fetched for folder %s', folder_path)
-    return self.doc_model.open_view('raindrop!proto!imap', 'seen',
+    return self.doc_model.open_view('raindrop!proto!seen', 'imap',
                              startkey=[folder_path], endkey=[folder_path, {}]
               ).addCallback(self._fetchAndProcess, folder_path)
 
   def _fetchAndProcess(self, result, folder_path):
     # XXX - we should look at the flags and update the message if it's not
     # the same - later.
     rows = result['rows']
     logger.debug('_FetchAndProcess says we have seen %d items for %r',
@@ -120,18 +120,18 @@ class ImapClient(imap4.IMAP4Client):
       storage_key=[folder_name, uid],
       imap_flags=flags,
       )
     attachments = {'rfc822' : {'content_type': 'message',
                                'data': content,
                                }
                   }
     doc['_attachments'] = attachments
-    return self.doc_model.create_raw_document(self.account,
-                                              did, doc, 'proto/imap',
+    return self.doc_model.create_raw_documents(self.account,
+                                              [(did, doc, 'proto/imap')],
                 ).addCallback(self._cb_saved_message)
 
   def _cantGetMessage(self, failure):
     logger.error("Failed to fetch message: %s", failure)
 
   def _cb_saved_message(self, result):
     logger.debug("Saved message %s", result)
 
@@ -205,17 +205,17 @@ class ImapClientFactory(protocol.ClientF
     self.backoff = min(self.backoff * 2, 600) # magic number
 
 
 # A 'converter' - takes a proto/imap as input and creates a
 # 'raw/message/rfc822' as output
 class IMAPConverter(base.ConverterBase):
   def convert(self, doc):
     # I need the binary attachment.
-    return self.doc_model.open_document(doc['_id'], attachment="rfc822"
+    return self.doc_model.open_attachment(doc['_id'], "rfc822",
               ).addCallback(self._cb_got_attachment, doc)
 
   def _cb_got_attachment(self, content, doc):
     assert content, "can't locate attachment for %r" % doc['_id']
     # the 'rfc822' module knows what to do...
     return doc_from_bytes(content)
 
 
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -67,20 +67,20 @@ def simple_convert(str_val, typ):
 class TwistySkype(object):
     def __init__(self, account, conductor):
         self.account = account
         self.doc_model = account.doc_model # this is a little confused...
         self.conductor = conductor
         self.skype = Skype4Py.Skype()
 
     def get_docid_for_chat(self, chat):
-        return "skypechat-" + chat.Name.encode('utf8') # hrmph!
+        return chat.Name.encode('utf8') # hrmph!
 
     def get_docid_for_msg(self, msg):
-        return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
+        return "%s-%d" % (self.account.details['username'], msg._Id)
 
     def attach(self):
         logger.info("attaching to skype...")
         d = threads.deferToThread(self.skype.Attach)
         d.addCallback(self.attached)
         return d
 
     def attached(self, status):
@@ -106,17 +106,17 @@ class TwistySkype(object):
     def _cb_got_messages(self, messages, chat):
         logger.debug("chat '%s' has %d message(s) total; looking for new ones",
                      chat.Name, len(messages))
 
         # Finally got all the messages for this chat.  Execute a view to
         # determine which we have seen (note that we obviously could just
         # fetch the *entire* chats+msgs view once - but we do it this way on
         # purpose to ensure we remain scalable...)
-        return self.doc_model.open_view('raindrop!proto!skype', 'seen',
+        return self.doc_model.open_view('raindrop!proto!seen', 'skype',
                                         startkey=[chat.Name],
                                         endkey=[chat.Name, {}]
                     ).addCallback(self._cb_got_seen, chat, messages
                     )
 
     def _cb_got_seen(self, result, chat, messages):
         msgs_by_id = dict((m._Id, m) for m in messages)
         chatname = chat.Name
@@ -192,18 +192,19 @@ class TwistySkype(object):
 
 # A 'converter' - takes a proto/skype-msg as input and creates a
 # 'message' as output (some other intermediate step might end up more
 # appropriate)
 class SkypeConverter(base.ConverterBase):
     def convert(self, doc):
         # We need to open the 'chat' for this Message.  Clearly this is
         # going to be inefficient...
-        chat_id = "skypechat-" + doc['skype_chatname'].encode('utf8') # hrmph!
-        return self.doc_model.open_document(chat_id
+        proto_id = doc['skype_chatname'].encode('utf8') # hrmph!
+        return self.doc_model.open_document('msg', proto_id,
+                                            'proto/skype-chat'
                         ).addCallback(self.finish_convert, doc)
 
     def finish_convert(self, chat_doc, doc):
         if chat_doc is None:
             subject = "<failed to fetch skype chat!>"
         else:
             subject = chat_doc['skype_friendlyname']
         return {'from': ['skype', doc['skype_from_handle']],
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -14,56 +14,42 @@ class TestMessageProvider(object):
 
     def sync_generator(self):
         num_docs = int(self.account.details.get('num_test_docs', 5))
         logger.info("Creating %d test documents", num_docs)
         for i in xrange(num_docs):
             yield self.check_test_message(i)
         if self.bulk_docs:
             yield self.doc_model.create_raw_documents(self.account,
-                                                      self.bulk_docs)
+                                                      self.bulk_docs
+                    ).addCallback(self.saved_bulk_messages, len(self.bulk_docs),
+                    )
 
     def attach(self):
         logger.info("preparing to synch test messages...")
         self.bulk_docs = [] # anything added here will be done in bulk
         # use the cooperator for testing purposes.
         return self.conductor.coop.coiterate(self.sync_generator())
 
     def check_test_message(self, i):
         logger.debug("seeing if message with ID %d exists", i)
-        return self.doc_model.open_document("test.message.%d" % i,
+        return self.doc_model.open_document("msg", str(i), "proto/test"
                         ).addCallback(self.process_test_message, i)
 
     def process_test_message(self, existing_doc, doc_num):
         if existing_doc is None:
-            doc_type = 'proto/test'
             doc = dict(
               storage_key=doc_num,
               )
-            did = "test.message.%d" % doc_num
-            if doc_num % 2 == 0:
-                logger.info("Queueing test message with ID %d for bulk update",
-                            doc_num)
-                self.bulk_docs.append((did, doc, doc_type))
-                return None # we will get these later...
-            else:
-                logger.info("Creating new test message with ID %d", doc_num)
-                return self.doc_model.create_raw_document(self.account,
-                                                          did, doc, doc_type
-                            ).addCallback(self.saved_message, doc
-                            )
+            self.bulk_docs.append((str(doc_num), doc, 'proto/test'))
         else:
             logger.info("Skipping test message with ID %d - already exists",
                         doc_num)
             # we are done.
 
-    def saved_message(self, result, doc):
-        logger.debug("Finished saving test message %r", result)
-        # done
-
     def saved_bulk_messages(self, result, n):
         logger.debug("Finished saving %d test messages in bulk", n)
         # done
 
 # A 'converter' - takes a proto/test as input and creates a
 # 'raw/message/rfc822' as output.
 class TestConverter(base.ConverterBase):
     def convert(self, doc):
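
As a usage sketch of the batched API the test provider now relies on (the
helper name save_test_messages is made up; doc_model and account are the
objects the real providers already hold):

    def save_test_messages(doc_model, account, count=3):
        # (proto_id, document, doc_type) tuples; the model turns each proto_id
        # into 'msg!<base64(proto_id)>!<doc_type>' internally.
        doc_infos = [(str(i), {'storage_key': i}, 'proto/test')
                     for i in range(count)]
        # a single bulk update, even when there is only one document
        return doc_model.create_raw_documents(account, doc_infos)
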
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -20,22 +20,16 @@ from ..proc import base
 # being completed when they say they are.)
 
 # So for now we are going with the blocking twitter package used via
 # deferToThread for all blocking operations...
 import twitter
 
 logger = logging.getLogger(__name__)
 
-TWEET_PROPS = """
-        created_at created_at_in_seconds favorited in_reply_to_screen_name
-        in_reply_to_user_id in_reply_to_status_id truncated
-        source id text relative_created_at user
-""".split()
-
 
 class TwitterProcessor(object):
     def __init__(self, account, conductor):
         self.account = account
         self.doc_model = account.doc_model # this is a little confused...
         self.conductor = conductor
         self.twit = None
         self.seen_tweets = None
@@ -47,90 +41,100 @@ class TwitterProcessor(object):
         return threads.deferToThread(twitter.Api,
                                   username=username, password=pw
                     ).addCallback(self.attached
                     )
 
     def attached(self, twit):
         logger.info("attached to twitter - fetching friends")
         self.twit = twit
+
         # build the list of all users we will fetch tweets from.
         return threads.deferToThread(self.twit.GetFriends
                   ).addCallback(self._cb_got_friends)
 
     def _cb_got_friends(self, friends):
-        return self.conductor.coop.coiterate(self.gen_friends_info(friends))
-        
-    def gen_friends_info(self, friends):
+        # Our 'seen' view is a reduce view, so we only get one result per
+        # friend we pass in.  It is probably safe to assume we don't have so
+        # many friends that we can't get them all in one hit...
+        return self.doc_model.open_view('raindrop!proto!seen', 'twitter',
+                                        group=True
+                  ).addCallback(self._cb_got_seen, friends)
+
+    def _cb_got_seen(self, result, friends):
+        # result -> [{'key': '12449', 'value': 1371372980}, ...]
+        # turn it into a map.
+        rows = result['rows']
+        last_seen_ids = dict((int(r['key']), int(r['value'])) for r in rows)
+        return self.conductor.coop.coiterate(
+                    self.gen_friends_info(friends, last_seen_ids))
+
+    def gen_friends_info(self, friends, last_seen_ids):
         logger.info("apparently I've %d friends", len(friends))
-        def do_fid(fid):
-            return threads.deferToThread(self.twit.GetUserTimeline, fid
-                                ).addCallback(self.got_friend_timeline, fid
-                                ).addErrback(self.err_friend_timeline, fid
-                                ).addCallback(self.finished_friend
-                                )
+        def do_friend(friend):
+            last_this = last_seen_ids.get(friend.id)
+            logger.debug("friend %r (%s) has latest tweet id of %s",
+                         friend.screen_name, friend.id, last_this)
+            return threads.deferToThread(
+                    self.twit.GetUserTimeline, friend.id, since_id=last_this
+                            ).addCallback(self._cb_got_friend_timeline, friend
+                            ).addErrback(self.err_friend_timeline, friend
+                            )
 
-        # None means 'me'
-        yield do_fid(None)
+        # None means 'me' - but we don't really want to use 'None'.  This is
+        # the only way I can work out how to get ourselves!
+        me = self.twit.GetUser(self.twit._username)
+        yield do_friend(me)
         for f in friends:
-            yield do_fid(f.screen_name)
+            yield do_friend(f)
 
         logger.debug("Finished friends")
 
-    def finished_friend(self, result):
-        logger.debug("finished friend: %s", result)
-
-    def err_friend_timeline(self, failure, fid):
-        logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
-
-    def got_friend_timeline(self, timeline, fid):
-        logger.debug("Friend %r has %d items in their timeline", fid,
-                     len(timeline))
-        return self.conductor.coop.coiterate(
-                    self.gen_friend_timeline(timeline, fid))
+    def err_friend_timeline(self, failure, friend):
+        logger.error("Failed to fetch timeline for '%s': %s",
+                     friend.screen_name, failure)
 
-    def gen_friend_timeline(self, timeline, fid):
+    def _cb_got_friend_timeline(self, timeline, friend):
+        return self.conductor.coop.coiterate(
+                    self.gen_friend_timeline(timeline, friend))
+
+    def gen_friend_timeline(self, timeline, friend):
         for tweet in timeline:
-            logger.debug("seeing if tweet %r exists", tweet.id)
-            docid = "tweet.%d" % (tweet.id,)
-            yield self.doc_model.open_document(docid,
-                            ).addCallback(self.maybe_process_tweet, docid, tweet)
-
-    def maybe_process_tweet(self, existing_doc, docid, tweet):
-        if existing_doc is None:
+            tid = tweet.id
             # put the 'raw' document object together and save it.
-            logger.info("New tweet '%s...' (%s)", tweet.text[:25], tweet.id)
+            logger.info("New tweet '%s...' (%s)", tweet.text[:25], tid)
             # create the couch document for the tweet itself.
             doc = {}
-            for name in TWEET_PROPS:
+            for name, val in tweet.AsDict().iteritems():
                 val = getattr(tweet, name)
                 # simple hacks - users just become the user ID.
                 if isinstance(val, twitter.User):
                     val = val.id
                 doc['twitter_'+name.lower()] = val
-            return self.doc_model.create_raw_document(
-                            self.account, docid, doc, 'proto/twitter')
-        else:
-            # Must be a seen one.
-            logger.debug("Skipping seen tweet '%s'", tweet.id)
+            # not clear if the user ID is actually needed.
+            proto_id = "%s#%s" % (tweet.user.id, tid)
+            yield self.doc_model.create_raw_documents(
+                            self.account, [(proto_id, doc, 'proto/twitter')])
 
 
 # A 'converter' - takes a proto/twitter as input and creates a
 # 'message' as output (some other intermediate step might end up more
 # appropriate)
 class TwitterConverter(base.ConverterBase):
     re_tags = re.compile(r'#(\w+)')    
     def convert(self, doc):
         # for now, if a 'proto' can detect tags, it writes them directly
         # to a 'tags' attribute.
         body = doc['twitter_text']
         tags = self.re_tags.findall(body)
         return {'from': ['twitter', doc['twitter_user']],
                 'body': body,
                 'body_preview': body[:128],
                 'tags': tags,
-                'timestamp': int(doc['twitter_created_at_in_seconds'])
+                # We don't have 'in seconds' seeing we came from AsDict() -
+                # but are 'seconds' usable for a timestamp?
+                #'timestamp': int(doc['twitter_created_at_in_seconds'])
                 }
 
 
 class TwitterAccount(base.AccountBase):
   def startSync(self, conductor):
     return TwitterProcessor(self, conductor).attach()
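
To make the incremental fetch concrete, here is a minimal synchronous sketch
(no Twisted; get_timeline stands in for the timeline API call) of how the
grouped 'seen'/'twitter' reduce output drives since_id:

    def fetch_new_tweets(view_rows, friend_ids, get_timeline):
        # view_rows: [{'key': '12449', 'value': 1371372980}, ...] - the
        # largest tweet id already stored, per user.
        last_seen = dict((int(r['key']), int(r['value'])) for r in view_rows)
        for fid in friend_ids:
            # since_id=None simply fetches the default page of recent tweets
            for tweet in get_timeline(fid, since_id=last_seen.get(fid)):
                yield fid, tweet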