hooray for generators/cooperators, plus more batch updates twisty
authorMark Hammond <mhammond@skippinet.com.au>
Sun, 29 Mar 2009 00:50:49 +1100
branchtwisty
changeset 131 afbc52ca5ca1597612b5a25588c541da0aa88ab8
parent 130 170cf0cb11a2234bb68fbc11933bc7913a0ca503
child 132 6ad97bca58340ae42fda3081394719e7d874d9f2
push id1
push userroot
push dateWed, 08 Apr 2009 01:46:05 +0000
hooray for generators/cooperators, plus more batch updates
server/python/junius/model.py
server/python/junius/pipeline.py
server/python/junius/proto/imap.py
server/python/junius/proto/skype.py
server/python/junius/proto/test/__init__.py
server/python/junius/proto/twitter.py
--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -192,28 +192,70 @@ class DocumentModel(object):
     def _cb_doc_opened(self, result):
         if isinstance(result, Failure):
             result.trap(twisted.web.error.Error)
             if result.value.status != '404': # not found
                 result.raiseException()
             result = None # indicate no doc exists.
         return result
 
-    def create_raw_document(self, docid, doc, doc_type, account, attachments=None):
-        assert '_id' not in doc # that isn't how you specify the ID.
+    def _prepare_raw_doc(self, account, docid, doc, doc_type):
+        assert '_id' not in doc, doc # that isn't how you specify the ID.
         assert '!' not in docid, docid # these chars are special.
         assert 'raindrop_account' not in doc, doc # we look after that!
         doc['raindrop_account'] = account.details['_id']
 
         assert 'type' not in doc, doc # we look after that!
         doc['type'] = doc_type
 
         assert 'raindrop_seq' not in doc, doc # we look after that!
         doc['raindrop_seq'] = get_seq()
 
+    def create_raw_documents(self, account, doc_infos):
+        """A high-performance version of 'create_raw_document', but doesn't
+        support attachments yet."""
+        docs = []
+        dids = [] # purely for the log :(
+        logger.debug('create_raw_documents preparing %d docs', len(doc_infos))
+        for (docid, doc, doc_type) in doc_infos:
+            self._prepare_raw_doc(account, docid, doc, doc_type)
+            # in a bulk-update, the ID is in the doc itself.
+            doc['_id'] = docid
+            docs.append(doc)
+            dids.append(docid)
+        logger.debug('create_raw_documents saving docs %s', dids)
+        return self.db.updateDocuments(docs
+                    ).addCallback(self._cb_saved_multi_docs,
+                    ).addErrback(self._cb_save_multi_failed,
+                    )
+
+    def _cb_saved_multi_docs(self, result):
+        # result: {'ok': True, 'new_revs': [{'rev': 'xxx', 'id': '...'}, ...]}
+        logger.debug("saved multiple docs with result=%(ok)s", result)
+        return result
+
+    def _cb_save_multi_failed(self, failure):
+        logger.error("Failed to save lotsa docs: %s", failure)
+        failure.raiseException()
+
+    def create_raw_document(self, account, docid, doc, doc_type, attachments=None):
+        self._prepare_raw_doc(account, docid, doc, doc_type)
+        # XXX - attachments need more thought - ultimately we need to be able
+        # to 'push' them via a generator or similar to avoid reading them
+        # entirely in memory.  Further, we need some way of the document
+        # knowing if the attachment failed (or vice-versa) given we have
+        # no transactional semantics.
+        # fixup attachments.
+        try:
+            attachments = doc['_attachments']
+            # nuke attachments specified
+            del doc['_attachments']
+        except KeyError:
+            attachments = None
+
         # save the document.
         logger.debug('create_raw_document saving doc %r', docid)
         qid = quote_id(docid)
         return self.db.saveDoc(doc, docId=qid,
                     ).addCallback(self._cb_saved_document, 'raw-message', docid
                     ).addErrback(self._cb_save_failed, 'raw-message', docid
                     ).addCallback(self._cb_save_attachments, attachments, qid
                     )
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -1,11 +1,11 @@
 # This is the raindrop pipeline; it moves messages from their most raw
 # form to their most useful form.
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, task
 import logging
 
 logger = logging.getLogger(__name__)
 
 # Simple forward chaining.
 chain = [
     # from_type, to_type, transformer)
     ('proto/test',         'raw/message/rfc822',
@@ -29,77 +29,69 @@ chain = [
     ('anno/tags', None, None),
 ]
 
 
 class Pipeline(object):
     def __init__(self, doc_model):
         self.doc_model = doc_model
         self.forward_chain = {}
+        # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parents?
+        self.coop = task.Cooperator()
         for from_type, to_type, xformname in chain:
             if xformname:
                 root, tail = xformname.rsplit('.', 1)
                 try:
                     mod = __import__(root, fromlist=[tail])
                     klass = getattr(mod, tail)
                     inst = klass(doc_model)
                 except:
                     logger.exception("Failed to load extension %r", xformname)
                 else:
                     self.forward_chain[from_type] = (to_type, inst)
             else:
                 assert not to_type, 'no xformname must mean no to_type'
                 self.forward_chain[from_type] = None
 
     def start(self):
-        return defer.maybeDeferred(self.process_all_documents)
+        return self.coop.coiterate(self.gen_all_documents())
 
-    def process_all_documents(self):
+    def gen_all_documents(self):
         # check *every* doc in the DB.  This exists until we can derive
         # a workqueue based on views.
         self.num_this_process = 0
-        return self.doc_model.db.openView('raindrop!messages!by',
-                                          'by_doc_roots',
-                                          group=True,
-                    ).addCallback(self._cb_roots_opened)
-
-    def _cb_maybe_reprocess_all_documents(self, result):
-        if self.num_this_process:
-            logger.debug('pipeline processed %d documents last time; trying again',
-                         self.num_this_process)
-            return self.start()
-        logger.info('pipeline is finished.')
+        while True:
+            logger.debug('opening view for work queue...')
+            yield self.doc_model.db.openView('raindrop!messages!by',
+                                             'by_doc_roots',
+                                             group=True,
+                        ).addCallback(self._cb_roots_opened)
+            logger.debug('this time we did %d documents', self.num_this_process)
+            if self.num_this_process == 0:
+                break
+        logger.debug('finally run out of documents.')
 
     def _cb_roots_opened(self, rows):
-        self.remaining_roots = [r['key'] for r in rows]
-        return self._process_next_root()
+        logger.info('work queue has %d items to process.', len(rows))
+        def gen_todo(todo):
+            for row in todo:
+                did = row['key']
+                logger.debug("Finding last extension point for %s", did)
+                yield self.doc_model.get_last_ext_for_document(did
+                            ).addCallback(self._cb_got_doc_last_ext, did
+                            ).addErrback(self._eb_doc_failed
+                            )
 
-    def _process_next_root(self):
-        if not self.remaining_roots:
-            logger.debug("Finished document roots")
-            d = defer.Deferred()
-            d.addCallback(self._cb_maybe_reprocess_all_documents)
-            d.callback(None)
-            return d
-
-        did = self.remaining_roots.pop()
-        logger.debug("Finding last extension point for %s", did)
-        return self.doc_model.get_last_ext_for_document(did
-                    ).addCallback(self._cb_got_doc_last_ext, did
-                    ).addErrback(self._eb_doc_failed
-                    ).addCallback(self._cb_did_doc_root
-                    )
+        return self.coop.coiterate(gen_todo(rows))
 
     def _eb_doc_failed(self, failure):
         logger.error("FAILED to pipe a doc: %s", failure)
         # and we will auto skip to the next doc...
 
-    def _cb_did_doc_root(self, result):
-        return reactor.callLater(0, self._process_next_root)
-
     def _cb_got_doc_last_ext(self, ext_info, rootdocid):
         last_ext, docid = ext_info
         if last_ext is None:
             logger.debug("Document '%s' doesn't appear to be a message; skipping",
                          rootdocid)
             return None
         logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
         try:
@@ -107,16 +99,17 @@ class Pipeline(object):
         except KeyError:
             logger.warning("Can't find transformer for message type %r - skipping %r",
                            last_ext, docid)
             return None
         if xform_info is None:
             logger.debug("Document %r is already at its terminal type of %r",
                          rootdocid, last_ext)
             return None
+        logger.info("Processing document %r", docid)
         return self.doc_model.open_document(docid
                     ).addCallback(self._cb_got_last_doc, rootdocid, xform_info
                     )
 
     def _cb_got_last_doc(self, doc, rootdocid, xform_info):
         assert doc is not None, "failed to locate the doc for %r" % rootdocid
         dest_type, xformer = xform_info
         logger.debug("calling %r to create a %s from %s", xformer, dest_type,
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,9 +1,9 @@
-from twisted.internet import protocol, ssl, defer, error
+from twisted.internet import protocol, ssl, defer, error, task
 from twisted.mail import imap4
 import logging
 
 from ..proc import base
 from ..model import get_db
 from ..ext.message.rfc822 import doc_from_bytes
 
 brat = base.Rat
@@ -12,26 +12,22 @@ logger = logging.getLogger(__name__)
 
 
 class ImapClient(imap4.IMAP4Client):
   '''
   Much of our logic here should perhaps be in another class that holds a
   reference to the IMAP4Client instance subclass.  Consider refactoring if we
   don't turn out to benefit from the subclassing relationship.
   '''
-  def finished(self, result):
-    # See bottom of file - it would be good to remove this...
-    logger.info("Finished synchronizing IMAP folders")
-    self.conductor.on_synch_finished(self.account, result)
-
   def serverGreeting(self, caps):
     logger.debug("IMAP server greeting: capabilities are %s", caps)
+    self.coop = task.Cooperator()
     return self._doAuthenticate(
             ).addCallback(self._reqList
-            ).addBoth(self.finished)
+            )
 
   def _doAuthenticate(self):
     if self.account.details.get('crypto') == 'TLS':
       d = self.startTLS(self.factory.ctx)
       d.addErrback(self.accountStatus,
                    brat.SERVER, brat.BAD, brat.CRYPTO, brat.PERMANENT)
       d.addCallback(self._doLogin)
     else:
@@ -45,130 +41,112 @@ class ImapClient(imap4.IMAP4Client):
     return self.login(self.account.details['username'],
                       self.account.details['password'])
 
   def _reqList(self, *args, **kwargs):
     self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
     return self.list('', '*').addCallback(self._procList)
 
   def _procList(self, result, *args, **kwargs):
-    # As per http://python.codefetch.com/example/ow/tnpe_code/ch07/imapdownload.py,
-    # We keep a 'stack' of items left to process in an instance variable...
-    # (*sob* - but its not clear why this doesn't suffer from
-    # death-by-recursion like the other impls can demonstrate? Even though we
-    # use a 'stack' of items left to process, each call to
-    # _process_next_folder is recursive?
-    self.folder_infos = result[:]
-    return self._process_next_folder()
+    return self.coop.coiterate(self.gen_folder_list(result))
 
-  def _process_next_folder(self):
-    if not self.folder_infos:
-      # yay - all done!
-      return
+  def gen_folder_list(self, result):
+    for flags, delim, name in result:
+      logger.debug('Processing folder %s (flags=%s)', name, flags)
+      if r"\Noselect" in flags:
+        logger.debug("'%s' is unselectable - skipping", name)
+        continue
 
-    flags, delim, name = self.folder_infos.pop()
-    self.current_folder = name
-    logger.debug('Processing folder %s (flags=%s)', name, flags)
-    if r"\Noselect" in flags:
-      logger.debug("'%s' is unselectable - skipping", name)
-      return self._process_next_folder()
+      # XXX - sob - markh sees:
+      # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
+      # although we obviously have seen them already in the other folders.
+      if name and name.startswith('['):
+        logger.info("'%s' appears special -skipping", name)
+        continue
 
-    # XXX - sob - markh sees:
-    # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
-    # although we obviously have seen them already in the other folders.
-    if name and name.startswith('['):
-      logger.info("'%s' appears special -skipping", name)
-      return self._process_next_folder()
-
-    return self.examine(name
+      yield self.examine(name
                  ).addCallback(self._examineFolder, name
                  ).addErrback(self._cantExamineFolder, name)
+    logger.debug('imap processing finished.')
+    yield self.conductor.on_synch_finished(self.account, None) # XXX - must die!
 
   def _examineFolder(self, result, folder_path):
     logger.debug('Looking for messages already fetched for folder %s', folder_path)
     return get_db().openView('raindrop!proto!imap', 'seen',
                              startkey=[folder_path], endkey=[folder_path, {}]
               ).addCallback(self._fetchAndProcess, folder_path)
 
   def _fetchAndProcess(self, rows, folder_path):
     # XXX - we should look at the flags and update the message if it's not
     # the same - later.
+    logger.debug('_FetchAndProcess says we have seen %d items for %r',
+                 len(rows), folder_path)
     seen_uids = set(row['key'][1] for row in rows)
     # now build a list of all message currently in the folder.
     allMessages = imap4.MessageSet(1, None)
     return self.fetchUID(allMessages, True).addCallback(
             self._gotUIDs, folder_path, seen_uids)
 
   def _gotUIDs(self, uidResults, name, seen_uids):
     all_uids = set(int(result['UID']) for result in uidResults.values())
-    self.messages_remaining = all_uids - seen_uids
+    remaining = all_uids - seen_uids
     logger.info("Folder %s has %d messages, %d new", name,
-                len(all_uids), len(self.messages_remaining))
-    return self._process_next_message()
-
-  def _process_next_message(self):
-    logger.debug("processNextMessage has %d messages to go...",
-                 len(self.messages_remaining))
-    if not self.messages_remaining:
-      return self._process_next_folder()
+                len(all_uids), len(remaining))
+    def gen_remaining(folder_name, reming):
+      for uid in reming:
+        # XXX - we need something to make this truly unique.
+        did = "%s#%d" % (folder_name, uid)
+        logger.debug("new imap message %r - fetching content", did)
+        # grr - we have to get the rfc822 body and the flags in separate requests.
+        to_fetch = imap4.MessageSet(uid)
+        yield self.fetchMessage(to_fetch, uid=True
+                    ).addCallback(self._cb_got_body, uid, did, folder_name, to_fetch
+                    )
+    return self.coop.coiterate(gen_remaining(name, remaining))
 
-    uid = self.messages_remaining.pop()
-    # XXX - we need something to make this truly unique.
-    did = "%s#%d" % (self.current_folder, uid)
-    logger.debug("new imap message %r - fetching content", did)
-    # grr - we have to get the rfc822 body and the flags in separate requests.
-    to_fetch = imap4.MessageSet(uid)
-    return self.fetchMessage(to_fetch, uid=True
-                ).addCallback(self._cb_got_body, uid, did, to_fetch
-                )
-
-  def _cb_got_body(self, result, uid, did, to_fetch):
+  def _cb_got_body(self, result, uid, did, folder_name, to_fetch):
     _, result = result.popitem()
     content = result['RFC822']
     # grr - get the flags
     logger.debug("message %r has %d bytes; fetching flags", did, len(content))
     return self.fetchFlags(to_fetch, uid=True
-                ).addCallback(self._cb_got_flags, uid, did, content
+                ).addCallback(self._cb_got_flags, uid, did, folder_name, content
                 ).addErrback(self._cantGetMessage
                 )
 
-  def _cb_got_flags(self, result, uid, did, content):
+  def _cb_got_flags(self, result, uid, did, folder_name, content):
     logger.debug("flags are %s", result)
     assert len(result)==1, result
     _, result = result.popitem()
     flags = result['FLAGS']
     # put the 'raw' document object together and save it.
     doc = dict(
-      storage_key=[self.current_folder, uid],
+      storage_key=[folder_name, uid],
       imap_flags=flags,
       )
     attachments = {'rfc822' : {'content_type': 'message',
                                'data': content,
                                }
                   }
-    return self.doc_model.create_raw_document(did, doc, 'proto/imap',
-                                              self.account,
+    return self.doc_model.create_raw_document(self.account,
+                                              did, doc, 'proto/imap',
                                               attachments=attachments,
                 ).addCallback(self._cb_saved_message)
-    
+
   def _cantGetMessage(self, failure):
     logger.error("Failed to fetch message: %s", failure)
-    return self._process_next_message()
 
   def _cb_saved_message(self, result):
     logger.debug("Saved message %s", result)
-    return self._process_next_message()
 
   def _cantSaveDocument(self, failure):
     logger.error("Failed to save message: %s", failure)
-    return self._process_next_message()
 
   def _cantExamineFolder(self, failure, name, *args, **kw):
     logger.warning("Failed to examine folder '%s': %s", name, failure)
-    return self._process_next_folder()
 
   def accountStatus(self, result, *args):
     return self.account.reportStatus(*args)
 
 
 class ImapClientFactory(protocol.ClientFactory):
   protocol = ImapClient
 
@@ -242,11 +220,8 @@ class IMAPConverter(base.ConverterBase):
 class IMAPAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
   def startSync(self, conductor, doc_model):
     self.factory = ImapClientFactory(self, conductor, doc_model)
     self.factory.connect()
-    # XXX - wouldn't it be good to return a deferred here, so the conductor
-    # can reliably wait for the deferred to complete, rather than forcing
-    # each deferred to manage 'finished' itself?
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,16 +1,16 @@
 #!/usr/bin/env python
 '''
 Fetch skype contacts and chats.
 '''
 import logging
 
 import twisted.python.log
-from twisted.internet import defer, threads
+from twisted.internet import defer, threads, task
 
 from ..proc import base
 
 import Skype4Py
 
 logger = logging.getLogger(__name__)
 
 # These are the raw properties we fetch from skype.
@@ -64,21 +64,21 @@ def simple_convert(str_val, typ):
     return typ(str_val)
 
 
 class TwistySkype(object):
     def __init__(self, account, conductor, doc_model):
         self.account = account
         self.conductor = conductor
         self.doc_model = doc_model
+        # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parents?
+        self.coop = task.Cooperator()
         self.skype = Skype4Py.Skype()
 
-    def finished(self, result):
-        return self.conductor.on_synch_finished(self.account, result)
-
     def get_docid_for_chat(self, chat):
         return "skypechat-" + chat.Name.encode('utf8') # hrmph!
 
     def get_docid_for_msg(self, msg):
         return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
 
     def attach(self):
         logger.info("attaching to skype...")
@@ -89,133 +89,115 @@ class TwistySkype(object):
     def attached(self, status):
         logger.info("attached to skype - getting chats")
         return threads.deferToThread(self.skype._GetChats
                     ).addCallback(self._cb_got_chats
                     )
 
     def _cb_got_chats(self, chats):
         logger.debug("skype has %d chat(s) total", len(chats))
-        # 'chats' is a tuple
-        self.remaining_chats = list(chats)
-        return self.process_next_chat()
-
-
-    def process_next_chat(self):
-        if not self.remaining_chats:
-            logger.debug("finished processing chats")
-            self.finished(None) # not sure this is the right place...
-            return
-
-        chat = self.remaining_chats.pop()
         # fetch all the messages (future optimization; all we need are the
         # IDs, but we suffer from creating an instance wrapper for each item.
         # Sadly the skype lib doesn't offer a clean way of doing this.)
-        return threads.deferToThread(chat._GetMessages
-                    ).addCallback(self._cb_got_messages, chat,
-                    )
+        def gen_chats(chats):
+            for chat in chats:
+                yield threads.deferToThread(chat._GetMessages
+                        ).addCallback(self._cb_got_messages, chat,
+                        )
+            logger.info("nothing left to do - finished!")
+            # XXX - on_synch_finished must die - caller knows this!!!
+            yield self.conductor.on_synch_finished(self.account, None)
+
+        return self.coop.coiterate(gen_chats(chats))
 
     def _cb_got_messages(self, messages, chat):
         logger.debug("chat '%s' has %d message(s) total; looking for new ones",
                      chat.Name, len(messages))
 
         # Finally got all the messages for this chat.  Execute a view to
         # determine which we have seen (note that we obviously could just
-        # fetch the *entire* view once - but we do it this way on purpose
-        # to ensure we remain scalable...)
+        # fetch the *entire* chats+msgs view once - but we do it this way on
+        # purpose to ensure we remain scalable...)
         return self.doc_model.db.openView('raindrop!proto!skype', 'seen',
                                           startkey=[chat.Name],
                                           endkey=[chat.Name, {}]
                     ).addCallback(self._cb_got_seen, chat, messages
                     )
 
     def _cb_got_seen(self, result, chat, messages):
-        self.msgs_by_id = dict((m._Id, m) for m in messages)
-        self.current_chat = chat
+        msgs_by_id = dict((m._Id, m) for m in messages)
         chatname = chat.Name
         # The view gives us a list of [chat_name, msg_id], where msg_id is
         # None if we've seen the chat itself.  Create a set of messages we
         # *haven't* seen - including the [chat_name, None] entry if applic.
         all_keys = [(chatname, None)]
-        all_keys.extend((chatname, mid) for mid in self.msgs_by_id.keys())
+        all_keys.extend((chatname, mid) for mid in msgs_by_id.keys())
         seen_chats = set([tuple(row['key']) for row in result])
-        self.remaining_items = set(all_keys)-set(seen_chats)
+        add_bulk = [] # we bulk-update these at the end!
+        remaining = set(all_keys)-set(seen_chats)
         # we could just process the empty list as normal, but the logging of
         # an info when we do have items is worthwhile...
-        if not self.remaining_items:
+        if not remaining:
             logger.debug("Chat %r has no new items to process", chatname)
-            self.msgs_by_id = None
-            return self.process_next_chat()
+            return None
         # we have something to do...
         logger.info("Chat %r has %d items to process", chatname,
-                    len(self.remaining_items))
+                    len(remaining))
         logger.debug("we've already seen %d items from this chat",
                      len(seen_chats))
-        return self.process_next_item()
-
-    def process_next_item(self):
-        if not self.remaining_items:
-            self.msgs_by_id = None
-            logger.debug("finished processing messages for this chat")
-            return self.conductor.reactor.callLater(0, self.process_next_chat)
+        return self.coop.coiterate(self.gen_items(chat, remaining, msgs_by_id))
 
-        chatname, msgid = self.remaining_items.pop()
-        chat = self.current_chat
-        if msgid is None:
-            # we haven't seen the chat itself - do that.
-            logger.debug("Creating new skype chat %r", chat.Name)
-            # make a 'deferred list' to fetch each property one at a time.
-            ds = [threads.deferToThread(chat._Property, p)
-                  for p, _ in CHAT_PROPS]
-            ret = defer.DeferredList(ds
-                        ).addCallback(self._cb_got_chat_props, chat)
-        else:
-            msg = self.msgs_by_id[msgid]
-            # A new msg in this chat.
-            logger.debug("New skype message %d", msg._Id)
-            # make a 'deferred list' to fetch each property one at a time.
-            ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
-            ret = defer.DeferredList(ds
-                        ).addCallback(self._cb_got_msg_props, chat, msg)
+    def gen_items(self, chat, todo, msgs_by_id):
+        tow = []
+        for _, msgid in todo:
+            if msgid is None:
+                # we haven't seen the chat itself - do that.
+                logger.debug("Creating new skype chat %r", chat.Name)
+                # make a 'deferred list' to fetch each property one at a time.
+                ds = [threads.deferToThread(chat._Property, p)
+                      for p, _ in CHAT_PROPS]
+                yield defer.DeferredList(ds
+                            ).addCallback(self._cb_got_chat_props, chat, tow)
+            else:
+                msg = msgs_by_id[msgid]
+                # A new msg in this chat.
+                logger.debug("New skype message %d", msg._Id)
+                # make a 'deferred list' to fetch each property one at a time.
+                ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+                yield defer.DeferredList(ds
+                            ).addCallback(self._cb_got_msg_props, chat, msg, tow)
 
-        ret.addCallback(self._cb_doc_saved)
-        return ret
+        if tow:
+            yield self.doc_model.create_raw_documents(self.account, tow)
+        logger.debug("finished processing chat %r", chat.Name)
 
-    def _cb_got_chat_props(self, results, chat):
+    def _cb_got_chat_props(self, results, chat, pending):
         logger.debug("got chat %r properties: %s", chat.Name, results)
         docid = self.get_docid_for_chat(chat)
         doc ={}
         for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
 
         # 'Name' is a special case that doesn't come via a prop.  We use
         # 'chatname' as that is the equiv attr on the messages themselves.
         doc['skype_chatname'] = chat.Name
-        return self.doc_model.create_raw_document(docid, doc,
-                                                  'proto/skype-chat',
-                                                  self.account
-                    )
+        pending.append((docid, doc, 'proto/skype-chat'))
 
-    def _cb_got_msg_props(self, results, chat, msg):
+    def _cb_got_msg_props(self, results, chat, msg, pending):
+        logger.debug("got message properties for %s", msg._Id)
         doc = {}
         for (name, typ), (ok, val) in zip(MSG_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
         doc['skype_id'] = msg._Id
         # we include the skype username with the ID as they are unique per user.
         docid = self.get_docid_for_msg(msg)
-        return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
-                                                  self.account
-                    )
+        pending.append((docid, doc, 'proto/skype-msg'))
 
-    def _cb_doc_saved(self, result):
-        # the 'pattern' we copied from the imap sample code calls for us
-        # to recurse - but that can recurse too far...
-        return self.conductor.reactor.callLater(0, self.process_next_item)
 
 # A 'converter' - takes a proto/skype-msg as input and creates a
 # 'message' as output (some other intermediate step might end up more
 # appopriate)
 class SkypeConverter(base.ConverterBase):
     def convert(self, doc):
         # We need to open the 'chat' for this Message.  Clearly this is
         # going to be inefficient...
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -1,60 +1,76 @@
 # This is an implementation of a 'test' protocol.
 import logging
-from twisted.internet import defer, error
+from twisted.internet import defer, error, task
 
 logger = logging.getLogger(__name__)
 
 from ...proc import base
 
 class TestMessageProvider(object):
     def __init__(self, account, conductor, doc_model):
         self.account = account
         self.conductor = conductor
         self.doc_model = doc_model
 
-    def attach(self):
-        logger.info("preparing to synch test messages...")
-        # Is a 'DeferredList' more appropriate here?
-        d = defer.Deferred()
+    def sync_generator(self):
         num_docs = int(self.account.details.get('num_test_docs', 5))
         logger.info("Creating %d test documents", num_docs)
         for i in xrange(num_docs):
-            d.addCallback(self.check_test_message, i)
-        d.addCallback(self.finished)
-        return d.callback(None)
+            yield self.check_test_message(i)
+        if self.bulk_docs:
+            yield self.doc_model.create_raw_documents(self.account,
+                                                      self.bulk_docs)
+        yield self.conductor.on_synch_finished(self.account, None) # must die!
 
-    def check_test_message(self, result, i):
+    def attach(self):
+        logger.info("preparing to synch test messages...")
+        self.bulk_docs = [] # anything added here will be done in bulk
+        # Experimenting with a 'cooperator'...
+        coop = task.Cooperator()
+        gen = self.sync_generator()
+        return coop.coiterate(gen)
+
+    def check_test_message(self, i):
         logger.debug("seeing if message with ID %d exists", i)
         return self.doc_model.open_document("test.message.%d" % i,
                         ).addCallback(self.process_test_message, i)
 
     def process_test_message(self, existing_doc, doc_num):
         if existing_doc is None:
-            logger.info("Creating new test message with ID %d", doc_num)
+            doc_type = 'proto/test'
             doc = dict(
               storage_key=doc_num,
               )
             did = "test.message.%d" % doc_num
-            return self.doc_model.create_raw_document(did, doc, 'proto/test',
-                                                      self.account
-                        ).addCallback(self.saved_message, doc)
+            if doc_num % 2 == 0:
+                logger.info("Queueing test message with ID %d for bulk update",
+                            doc_num)
+                self.bulk_docs.append((did, doc, doc_type))
+                return None # we will get these later...
+            else:
+                logger.info("Creating new test message with ID %d", doc_num)
+                return self.doc_model.create_raw_document(self.account,
+                                                          did, doc, doc_type
+                            ).addCallback(self.saved_message, doc
+                            )
         else:
             logger.info("Skipping test message with ID %d - already exists",
                         doc_num)
             # we are done.
 
-    def finished(self, result):
-      self.conductor.on_synch_finished(self.account, result)
-
     def saved_message(self, result, doc):
         logger.debug("Finished saving test message %r", result)
         # done
 
+    def saved_bulk_messages(self, result, n):
+        logger.debug("Finished saving %d test messages in bulk", n)
+        # done
+
 # A 'converter' - takes a proto/test as input and creates a
 # 'raw/message/rfc822' as output.
 class TestConverter(base.ConverterBase):
     def convert(self, doc):
         me = doc['storage_key']
         headers = {'from': 'From: from%(storage_key)d@test.com',
                    'subject' : 'This is test document %(storage_key)d',
         }
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -4,17 +4,17 @@ Fetch twitter raw* objects
 '''
 
 # prevent 'import twitter' finding this module!
 from __future__ import absolute_import
 
 import logging
 import re
 import twisted.python.log
-from twisted.internet import defer, threads
+from twisted.internet import defer, threads, task
 
 from ..proc import base
 
 # See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
 # about getting twisted support into the twitter package.
 # Sadly, the twisty-twitter package has issues of its own (like calling
 # delegates outside the deferred mechanism meaning you can't rely on callbacks
 # being completed when they say they are.)
@@ -34,100 +34,91 @@ TWEET_PROPS = """
 
 class TwitterProcessor(object):
     def __init__(self, account, conductor, doc_model):
         self.account = account
         self.conductor = conductor
         self.doc_model = doc_model
         self.twit = None
         self.seen_tweets = None
+        # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parents?
+        self.coop = task.Cooperator()
 
     def attach(self):
         logger.info("attaching to twitter...")
         username = self.account.details['username']
         pw = self.account.details['password']
         return threads.deferToThread(twitter.Api,
                                   username=username, password=pw
                     ).addCallback(self.attached
                     )
 
     def attached(self, twit):
+        logger.info("attached to twitter - fetching friends")
         self.twit = twit
         # build the list of all users we will fetch tweets from.
         return threads.deferToThread(self.twit.GetFriends
-                  ).addCallback(self.got_friends)
-
-    def got_friends(self, friends):
-        # None at the start means 'me'
-        self.friends_remaining = [None] + [f.screen_name for f in friends]
-        return self.process_next_friend()
+                  ).addCallback(self._cb_got_friends)
 
-    def process_next_friend(self):
-        if not self.friends_remaining:
-            logger.debug("Finished processing twitter friends")
-            return defer.maybeDeferred(self.finished, None)
+    def _cb_got_friends(self, friends):
+        return self.coop.coiterate(self.gen_friends_info(friends))
+        
+    def gen_friends_info(self, friends):
+        logger.info("apparently I've %d friends", len(friends))
+        def do_fid(fid):
+            return threads.deferToThread(self.twit.GetUserTimeline, fid
+                                ).addCallback(self.got_friend_timeline, fid
+                                ).addErrback(self.err_friend_timeline, fid
+                                ).addCallback(self.finished_friend
+                                )
 
-        fid = self.friends_remaining.pop()
-        return threads.deferToThread(self.twit.GetUserTimeline, fid
-                            ).addCallback(self.got_friend_timeline, fid
-                            ).addErrback(self.err_friend_timeline, fid
-                            ).addCallback(self.finished_friend
-                            )
+        # None means 'me'
+        yield do_fid(None)
+        for f in friends:
+            yield do_fid(f.screen_name)
+
+        logger.debug("Finished friends")
+        yield self.conductor.on_synch_finished(self.account, None) # Must die
 
     def finished_friend(self, result):
-        self.conductor.reactor.callLater(0, self.process_next_friend)
+        logger.debug("finished friend: %s", result)
 
     def err_friend_timeline(self, failure, fid):
         logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
 
     def got_friend_timeline(self, timeline, fid):
-        self.current_fid = fid
-        self.remaining_tweets = timeline[:]
         logger.debug("Friend %r has %d items in their timeline", fid,
-                     len(self.remaining_tweets))
-        return self.process_next_tweet()
+                     len(timeline))
+        return self.coop.coiterate(self.gen_friend_timeline(timeline, fid))
 
-    def process_next_tweet(self):
-        if not self.remaining_tweets:
-            logger.debug("Finished processing timeline")
-            return
-
-        tweet = self.remaining_tweets.pop()
-
-        logger.debug("seeing if tweet %r exists", tweet.id)
-        docid = "tweet.%d" % (tweet.id,)
-        return self.doc_model.open_document(docid,
-                        ).addCallback(self.maybe_process_tweet, docid, tweet)
+    def gen_friend_timeline(self, timeline, fid):
+        for tweet in timeline:
+            logger.debug("seeing if tweet %r exists", tweet.id)
+            docid = "tweet.%d" % (tweet.id,)
+            yield self.doc_model.open_document(docid,
+                            ).addCallback(self.maybe_process_tweet, docid, tweet)
 
     def maybe_process_tweet(self, existing_doc, docid, tweet):
         if existing_doc is None:
             # put the 'raw' document object together and save it.
             logger.info("New tweet '%s...' (%s)", tweet.text[:25], tweet.id)
             # create the couch document for the tweet itself.
             doc = {}
             for name in TWEET_PROPS:
                 val = getattr(tweet, name)
                 # simple hacks - users just become the user ID.
                 if isinstance(val, twitter.User):
                     val = val.id
                 doc['twitter_'+name.lower()] = val
-            return self.doc_model.create_raw_document(docid, doc, 'proto/twitter',
-                                                      self.account
-                        ).addCallback(self.saved_tweet, tweet)
+            return self.doc_model.create_raw_document(
+                            self.account, docid, doc, 'proto/twitter')
         else:
             # Must be a seen one.
             logger.debug("Skipping seen tweet '%s'", tweet.id)
-            return self.process_next_tweet()
-
-    def saved_tweet(self, result, tweet):
-        logger.debug("Finished processing of new tweet '%s'", tweet.id)
-        return self.process_next_tweet()
-
-    def finished(self, result):
-        return self.conductor.on_synch_finished(self.account, result)
 
 
 # A 'converter' - takes a proto/twitter as input and creates a
 # 'message' as output (some other intermediate step might end up more
 # appropriate)
 class TwitterConverter(base.ConverterBase):
     re_tags = re.compile(r'#(\w+)')    
     def convert(self, doc):