move back to views returning the 'raw' result; share a coop; misc others
author     Mark Hammond <mhammond@skippinet.com.au>
date       Tue, 31 Mar 2009 23:35:12 +1100
branch     twisty
changeset  149 93d6e2aa1a5068cd67c1d34a450a4d9c2cde07ff
parent     148 5bbfd86b049a1c6ae78fb7e39e050e6cb48d1bc9
child      150 25d0f986332b4009160017cd2e17b703ee15a3ba
push id    1
push user  root
push date  Wed, 08 Apr 2009 01:46:05 +0000
move back to views returning the 'raw' result; share a coop; misc others
server/python/junius/model.py
server/python/junius/pipeline.py
server/python/junius/proc/base.py
server/python/junius/proto/imap.py
server/python/junius/proto/skype.py
server/python/junius/proto/test/__init__.py
server/python/junius/proto/twitter.py
server/python/junius/sync.py
server/python/run-raindrop.py
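
The gist of the change, before the diffs: paisley's openView() now resolves
with the raw CouchDB response, {'rows': [...], 'total_rows': N}, and each
callback unpacks 'rows' itself rather than relying on the _raw_to_rows()
shim removed below. A minimal sketch of the new consumer pattern
(illustrative names only, not part of the changeset):

    # hypothetical callback for the new open_view()/openView() contract
    def _cb_view_opened(result):
        rows = result['rows']  # raw response: {'rows': [...], 'total_rows': N}
        for row in rows:
            print row['id'], row['value']

    # doc_model.open_view('raindrop!messages!by', 'by_doc_type',
    #                     key='some-doc-type').addCallback(_cb_view_opened)
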
--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -34,24 +34,16 @@ if sys.platform=='win32':
     clock_start = time.time()
     time.clock() # time.clock starts counting from zero the first time it's called.
     def get_seq():
         return clock_start + time.clock()
 else:
     get_seq = time.time
 
 
-def _raw_to_rows(raw):
-    # {'rows': [], 'total_rows': 0} -> the actual rows.
-    ret = raw['rows']
-    # hrmph - on a view with start_key etc params, total_rows will be
-    # greater than the rows.
-    assert 'total_rows' not in raw or len(ret)<=raw['total_rows'], raw
-    return ret
-
 # from the couchdb package; not sure what makes these names special...
 def _encode_options(options):
     retval = {}
     for name, value in options.items():
         if name in ('key', 'startkey', 'endkey') \
                 or not isinstance(value, basestring):
             value = json.dumps(value, allow_nan=False, ensure_ascii=False)
         retval[name] = value
@@ -62,24 +54,19 @@ class CouchDB(paisley.CouchDB):
     def postob(self, uri, ob):
         # This seems to not use keep-alives etc, where using twisted.web
         # directly does?
         body = json.dumps(ob, allow_nan=False,
                           ensure_ascii=False).encode('utf-8')
         return self.post(uri, body)
 
     def openView(self, *args, **kwargs):
-        # The base class of this returns the raw json object - eg:
-        # {'rows': [], 'total_rows': 0}
-        # XXX - Note that paisley isn't interested in this enhancement, so
-        # we need to remove it...
-
-        # *sob* - and it also doesn't handle encoding options...
+        # paisley doesn't handle encoding options...
         return super(CouchDB, self).openView(*args, **_encode_options(kwargs)
-                        ).addCallback(_raw_to_rows)
+                        )
 
     def openDoc(self, dbName, docId, revision=None, full=False, attachment=""):
         # paisley appears to use an old api for attachments?
         if attachment:
             uri = "/%s/%s/%s" % (dbName, docId, attachment)
             return  self.get(uri)
         return super(CouchDB, self).openDoc(dbName, docId, revision, full)
 
@@ -180,16 +167,21 @@ def quote_id(doc_id):
 class DocumentModel(object):
     """The layer between 'documents' and the 'database'.  Responsible for
        creating the unique ID for each document (other than the raw document),
        for fetching documents based on an ID, etc
     """
     def __init__(self, db):
         self.db = db
 
+    def open_view(self, *args, **kwargs):
+        # A convenience method for completeness so consumers don't hit the
+        # DB directly (and to give a consistent 'style').  Is it worth it?
+        return self.db.openView(*args, **kwargs)
+
     def open_document(self, doc_id, **kw):
         """Open the specific document, returning None if it doesn't exist"""
         return self.db.openDoc(quote_id(doc_id), **kw).addBoth(self._cb_doc_opened)
 
     def _cb_doc_opened(self, result):
         if isinstance(result, Failure):
             result.trap(twisted.web.error.Error)
             if result.value.status != '404': # not found
@@ -335,22 +327,23 @@ class DocumentModel(object):
     def get_last_ext_for_document(self, doc_id):
         """Given a base docid, find the most-recent extension to have run.
         This will differ from the latest extension in the document chain if
         the document chain has been 'reset' for any reason (eg, change
         detected at the source of the message, user adding annotations, etc)
         """
         startkey = [doc_id]
         endkey = [doc_id, {}]
-        return self.db.openView('raindrop!messages!workqueue',
-                                'by_doc_extension_sequence',
-                                startkey=startkey, endkey=endkey
+        return self.open_view('raindrop!messages!workqueue',
+                              'by_doc_extension_sequence',
+                              startkey=startkey, endkey=endkey
                     ).addCallback(self._cb_des_opened, doc_id)
 
-    def _cb_des_opened(self, rows, doc_id):
+    def _cb_des_opened(self, result, doc_id):
+        rows = result['rows']
         if not rows:
             ret = None, None
         else:
             last = rows[-1]
             ret = last["value"], last["id"]
         logger.debug("document '%s' has last-extension of %r", doc_id, ret)
         return ret
 
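A note on _encode_options above: the special-cased names are the CouchDB
view query parameters that must be sent JSON-encoded so structured keys
survive the trip; any other non-string value gets JSON-encoded as a side
effect. A worked example (illustrative values, not from the changeset):

    # key/startkey/endkey must be JSON so CouchDB can parse structured keys
    _encode_options({'startkey': ['msg!1'], 'endkey': ['msg!1', {}], 'limit': 10})
    # -> {'startkey': '["msg!1"]', 'endkey': '["msg!1", {}]', 'limit': '10'}
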
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -51,29 +51,29 @@ class Pipeline(object):
             else:
                 assert not to_type, 'no xformname must mean no to_type'
                 self.forward_chain[from_type] = None
 
     def unprocess(self):
         # A bit of a hack that will suffice until we get better dependency
         # management.  Determines which doc-types are 'derived', then deletes
         # them all.
-        def delete_docs(rows, doc_type):
+        def delete_docs(result, doc_type):
             docs = []
-            to_del = [(row['id'], row['value']) for row in rows]
+            to_del = [(row['id'], row['value']) for row in result['rows']]
             for id, rev in to_del:
                 docs.append({'_id': id, '_rev': rev, '_deleted': True})
             logger.info('deleting %d messages of type %r', len(docs), doc_type)
             return self.doc_model.db.updateDocuments(docs)
 
         def gen_deleting_docs(doc_types):
             for t in doc_types:
-                yield self.doc_model.db.openView('raindrop!messages!by',
-                                                 'by_doc_type',
-                                                 key=t,
+                yield self.doc_model.open_view('raindrop!messages!by',
+                                               'by_doc_type',
+                                               key=t,
                             ).addCallback(delete_docs, t)
 
         derived = set()
         for from_type, to_info in self.forward_chain.iteritems():
             if to_info is not None:
                 to_type, inst = to_info
                 derived.add(to_type)
         logger.info("deleting documents with types %r", derived)
@@ -83,26 +83,27 @@ class Pipeline(object):
         return self.coop.coiterate(self.gen_all_documents())
 
     def gen_all_documents(self):
         # check *every* doc in the DB.  This exists until we can derive
         # a workqueue based on views.
         while True:
             self.num_this_process = 0
             logger.debug('opening view for work queue...')
-            yield self.doc_model.db.openView('raindrop!messages!workqueue',
-                                             'by_doc_roots',
-                                             group=True,
+            yield self.doc_model.open_view('raindrop!messages!workqueue',
+                                           'by_doc_roots',
+                                           group=True,
                         ).addCallback(self._cb_roots_opened)
             logger.info('created %d new documents', self.num_this_process)
             if self.num_this_process == 0:
                 break
         logger.debug('finally run out of documents.')
 
-    def _cb_roots_opened(self, rows):
+    def _cb_roots_opened(self, result):
+        rows = result['rows']
         logger.info('work queue has %d items to check.', len(rows))
         def gen_todo(todo):
             for row in todo:
                 did = row['key']
                 logger.debug("Finding last extension point for %s", did)
                 yield self.doc_model.get_last_ext_for_document(did
                             ).addCallback(self._cb_got_doc_last_ext, did
                             ).addErrback(self._eb_doc_failed
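
The deletion trick used by unprocess() above (and by delete_docs in
run-raindrop.py below) is CouchDB's bulk-update form of delete: post stub
documents carrying _deleted: True. A sketch of the payload, with
hypothetical ids and revisions:

    # what updateDocuments() ends up sending to CouchDB's _bulk_docs
    docs = [
        {'_id': 'msg!raw!1', '_rev': '1-abc', '_deleted': True},
        {'_id': 'msg!raw!2', '_rev': '3-def', '_deleted': True},
    ]
    # db.updateDocuments(docs)  # each listed revision is marked deleted
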
--- a/server/python/junius/proc/base.py
+++ b/server/python/junius/proc/base.py
@@ -65,16 +65,20 @@ class Rat(object):
   #: permanent implies the user must take some action to correct the problem
   PERMANENT = 'permanent'
   #: unknown means it either doesn't matter or it could be temporary but the
   #:  user should potentially still be informed
   UNKNOWN = 'unknown'
 
 
 class AccountBase(Rat):
+  def __init__(self, doc_model, details):
+    self.doc_model = doc_model
+    self.details = details
+
   def reportStatus(self, what, state, why=Rat.UNKNOWN,
                    expectedDuration=Rat.UNKNOWN):
     '''
     Report status relating to this account.
 
     Everything is peachy: EVERYTHING GOOD
     Wrong password: ACCOUNT BAD PASSWORD PERMANENT
     (may be temporary if a bad password can mean many things)
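
Since AccountBase now stores doc_model and details itself, the protocol
accounts below (imap, skype, test, twitter) all lose their hand-rolled
__init__. A minimal hypothetical account, sketching the new shape:

    import logging
    from twisted.internet import defer
    from ..proc import base

    logger = logging.getLogger(__name__)

    class NullAccount(base.AccountBase):
      # doc_model and details are stored by the shared AccountBase.__init__
      def startSync(self, conductor):
        logger.info('nothing to synch for %r', self.details.get('name'))
        return defer.succeed(None)
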
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,30 +1,28 @@
-from twisted.internet import protocol, ssl, defer, error, task
+from twisted.internet import protocol, ssl, defer, error
 from twisted.mail import imap4
 import logging
 
 from ..proc import base
-from ..model import get_db
 from ..ext.message.rfc822 import doc_from_bytes
 
 brat = base.Rat
 
 logger = logging.getLogger(__name__)
 
 
 class ImapClient(imap4.IMAP4Client):
   '''
   Much of our logic here should perhaps be in another class that holds a
   reference to the IMAP4Client instance subclass.  Consider refactoring if we
   don't turn out to benefit from the subclassing relationship.
   '''
   def serverGreeting(self, caps):
     logger.debug("IMAP server greeting: capabilities are %s", caps)
-    self.coop = task.Cooperator()
     return self._doAuthenticate(
             ).addCallback(self._reqList
             ).addCallback(self.deferred.callback)
 
   def _doAuthenticate(self):
     if self.account.details.get('crypto') == 'TLS':
       d = self.startTLS(self.factory.ctx)
       d.addErrback(self.accountStatus,
@@ -41,17 +39,17 @@ class ImapClient(imap4.IMAP4Client):
     return self.login(self.account.details['username'],
                       self.account.details['password'])
 
   def _reqList(self, *args, **kwargs):
     self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
     return self.list('', '*').addCallback(self._procList)
 
   def _procList(self, result, *args, **kwargs):
-    return self.coop.coiterate(self.gen_folder_list(result))
+    return self.conductor.coop.coiterate(self.gen_folder_list(result))
 
   def gen_folder_list(self, result):
     for flags, delim, name in result:
       logger.debug('Processing folder %s (flags=%s)', name, flags)
       if r"\Noselect" in flags:
         logger.debug("'%s' is unselectable - skipping", name)
         continue
 
@@ -64,23 +62,24 @@ class ImapClient(imap4.IMAP4Client):
 
       yield self.examine(name
                  ).addCallback(self._examineFolder, name
                  ).addErrback(self._cantExamineFolder, name)
     logger.debug('imap processing finished.')
 
   def _examineFolder(self, result, folder_path):
     logger.debug('Looking for messages already fetched for folder %s', folder_path)
-    return get_db().openView('raindrop!proto!imap', 'seen',
+    return self.doc_model.open_view('raindrop!proto!imap', 'seen',
                              startkey=[folder_path], endkey=[folder_path, {}]
               ).addCallback(self._fetchAndProcess, folder_path)
 
-  def _fetchAndProcess(self, rows, folder_path):
+  def _fetchAndProcess(self, result, folder_path):
     # XXX - we should look at the flags and update the message if it's not
     # the same - later.
+    rows = result['rows']
     logger.debug('_FetchAndProcess says we have seen %d items for %r',
                  len(rows), folder_path)
     seen_uids = set(row['key'][1] for row in rows)
     # now build a list of all messages currently in the folder.
     allMessages = imap4.MessageSet(1, None)
     return self.fetchUID(allMessages, True).addCallback(
             self._gotUIDs, folder_path, seen_uids)
 
@@ -94,17 +93,17 @@ class ImapClient(imap4.IMAP4Client):
         # XXX - we need something to make this truly unique.
         did = "%s#%d" % (folder_name, uid)
         logger.debug("new imap message %r - fetching content", did)
         # grr - we have to get the rfc822 body and the flags in separate requests.
         to_fetch = imap4.MessageSet(uid)
         yield self.fetchMessage(to_fetch, uid=True
                     ).addCallback(self._cb_got_body, uid, did, folder_name, to_fetch
                     )
-    return self.coop.coiterate(gen_remaining(name, remaining))
+    return self.conductor.coop.coiterate(gen_remaining(name, remaining))
 
   def _cb_got_body(self, result, uid, did, folder_name, to_fetch):
     _, result = result.popitem()
     content = result['RFC822']
     # grr - get the flags
     logger.debug("message %r has %d bytes; fetching flags", did, len(content))
     return self.fetchFlags(to_fetch, uid=True
                 ).addCallback(self._cb_got_flags, uid, did, folder_name, content
@@ -144,31 +143,31 @@ class ImapClient(imap4.IMAP4Client):
 
   def accountStatus(self, result, *args):
     return self.account.reportStatus(*args)
 
 
 class ImapClientFactory(protocol.ClientFactory):
   protocol = ImapClient
 
-  def __init__(self, account, conductor, doc_model):
+  def __init__(self, account, conductor):
     # base-class has no __init__
     self.account = account
     self.conductor = conductor
-    self.doc_model = doc_model
+    self.doc_model = account.doc_model # this is a little confused...
 
     self.ctx = ssl.ClientContextFactory()
     self.backoff = 8 # magic number
 
   def buildProtocol(self, addr):
     p = self.protocol(self.ctx)
     p.factory = self
     p.account = self.account
     p.conductor = self.conductor
-    p.doc_model = self.doc_model
+    p.doc_model = self.account.doc_model
     p.deferred = self.deferred # this isn't going to work in reconnect scenarios
     return p
 
   def connect(self):
     details = self.account.details
     logger.debug('attempting to connect to %s:%d (ssl: %s)',
                  details['host'], details['port'], details['ssl'])
     reactor = self.conductor.reactor
@@ -216,15 +215,11 @@ class IMAPConverter(base.ConverterBase):
 
   def _cb_got_attachment(self, content, doc):
     assert content, "can't locate attachment for %r" % doc['_id']
     # the 'rfc822' module knows what to do...
     return doc_from_bytes(content)
 
 
 class IMAPAccount(base.AccountBase):
-  def __init__(self, db, details):
-    self.db = db
-    self.details = details
-
-  def startSync(self, conductor, doc_model):
-    self.factory = ImapClientFactory(self, conductor, doc_model)
+  def startSync(self, conductor):
+    self.factory = ImapClientFactory(self, conductor)
     return self.factory.connect()
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,16 +1,16 @@
 #!/usr/bin/env python
 '''
 Fetch skype contacts and chats.
 '''
 import logging
 
 import twisted.python.log
-from twisted.internet import defer, threads, task
+from twisted.internet import defer, threads
 
 from ..proc import base
 
 import Skype4Py
 
 logger = logging.getLogger(__name__)
 
 # These are the raw properties we fetch from skype.
@@ -60,23 +60,20 @@ def simple_convert(str_val, typ):
         return str_val.split()
     if typ is bool:
         return str_val == "TRUE"
     # all the rest are callables which 'do the right thing'
     return typ(str_val)
 
 
 class TwistySkype(object):
-    def __init__(self, account, conductor, doc_model):
+    def __init__(self, account, conductor):
         self.account = account
+        self.doc_model = account.doc_model # this is a little confused...
         self.conductor = conductor
-        self.doc_model = doc_model
-        # use a cooperator to do the work via a generator.
-        # XXX - should we own the cooperator or use our parents?
-        self.coop = task.Cooperator()
         self.skype = Skype4Py.Skype()
 
     def get_docid_for_chat(self, chat):
         return "skypechat-" + chat.Name.encode('utf8') # hrmph!
 
     def get_docid_for_msg(self, msg):
         return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
 
@@ -99,54 +96,55 @@ class TwistySkype(object):
         # Sadly the skype lib doesn't offer a clean way of doing this.)
         def gen_chats(chats):
             for chat in chats:
                 yield threads.deferToThread(chat._GetMessages
                         ).addCallback(self._cb_got_messages, chat,
                         )
             logger.info("skype has finished processing all chats")
 
-        return self.coop.coiterate(gen_chats(chats))
+        return self.conductor.coop.coiterate(gen_chats(chats))
 
     def _cb_got_messages(self, messages, chat):
         logger.debug("chat '%s' has %d message(s) total; looking for new ones",
                      chat.Name, len(messages))
 
         # Finally got all the messages for this chat.  Execute a view to
         # determine which we have seen (note that we obviously could just
         # fetch the *entire* chats+msgs view once - but we do it this way on
         # purpose to ensure we remain scalable...)
-        return self.doc_model.db.openView('raindrop!proto!skype', 'seen',
-                                          startkey=[chat.Name],
-                                          endkey=[chat.Name, {}]
+        return self.doc_model.open_view('raindrop!proto!skype', 'seen',
+                                        startkey=[chat.Name],
+                                        endkey=[chat.Name, {}]
                     ).addCallback(self._cb_got_seen, chat, messages
                     )
 
     def _cb_got_seen(self, result, chat, messages):
         msgs_by_id = dict((m._Id, m) for m in messages)
         chatname = chat.Name
         # The view gives us a list of [chat_name, msg_id], where msg_id is
         # None if we've seen the chat itself.  Create a set of messages we
         # *haven't* seen - including the [chat_name, None] entry if applic.
         all_keys = [(chatname, None)]
         all_keys.extend((chatname, mid) for mid in msgs_by_id.keys())
-        seen_chats = set([tuple(row['key']) for row in result])
+        seen_chats = set([tuple(row['key']) for row in result['rows']])
         add_bulk = [] # we bulk-update these at the end!
         remaining = set(all_keys)-set(seen_chats)
         # we could just process the empty list as normal, but the logging of
         # an info when we do have items is worthwhile...
         if not remaining:
             logger.debug("Chat %r has no new items to process", chatname)
             return None
         # we have something to do...
         logger.info("Chat %r has %d items to process", chatname,
                     len(remaining))
         logger.debug("we've already seen %d items from this chat",
                      len(seen_chats))
-        return self.coop.coiterate(self.gen_items(chat, remaining, msgs_by_id))
+        return self.conductor.coop.coiterate(
+                    self.gen_items(chat, remaining, msgs_by_id))
 
     def gen_items(self, chat, todo, msgs_by_id):
         tow = []
         for _, msgid in todo:
             if msgid is None:
                 # we haven't seen the chat itself - do that.
                 logger.debug("Creating new skype chat %r", chat.Name)
                 # make a 'deferred list' to fetch each property one at a time.
@@ -213,14 +211,10 @@ class SkypeConverter(base.ConverterBase)
                 'body': doc['skype_body'],
                 'body_preview': doc['skype_body'][:128],
                 'conversation_id': doc['skype_chatname'],
                 'timestamp': doc['skype_timestamp'], # skype's works ok here?
                 }
 
 
 class SkypeAccount(base.AccountBase):
-  def __init__(self, db, details):
-    self.db = db
-    self.details = details
-
-  def startSync(self, conductor, doc_model):
-    return TwistySkype(self, conductor, doc_model).attach()
+  def startSync(self, conductor):
+    return TwistySkype(self, conductor).attach()
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -1,38 +1,36 @@
 # This is an implementation of a 'test' protocol.
 import logging
-from twisted.internet import defer, error, task
+from twisted.internet import defer, error
 
 logger = logging.getLogger(__name__)
 
 from ...proc import base
 
 class TestMessageProvider(object):
-    def __init__(self, account, conductor, doc_model):
+    def __init__(self, account, conductor):
         self.account = account
+        self.doc_model = account.doc_model # this is a little confused...
         self.conductor = conductor
-        self.doc_model = doc_model
 
     def sync_generator(self):
         num_docs = int(self.account.details.get('num_test_docs', 5))
         logger.info("Creating %d test documents", num_docs)
         for i in xrange(num_docs):
             yield self.check_test_message(i)
         if self.bulk_docs:
             yield self.doc_model.create_raw_documents(self.account,
                                                       self.bulk_docs)
 
     def attach(self):
         logger.info("preparing to synch test messages...")
         self.bulk_docs = [] # anything added here will be done in bulk
-        # Experimenting with a 'cooperator'...
-        coop = task.Cooperator()
-        gen = self.sync_generator()
-        return coop.coiterate(gen)
+        # use the cooperator for testing purposes.
+        return self.conductor.coop.coiterate(self.sync_generator())
 
     def check_test_message(self, i):
         logger.debug("seeing if message with ID %d exists", i)
         return self.doc_model.open_document("test.message.%d" % i,
                         ).addCallback(self.process_test_message, i)
 
     def process_test_message(self, existing_doc, doc_num):
         if existing_doc is None:
@@ -82,14 +80,10 @@ class TestConverter(base.ConverterBase):
                                     "data" : 'test\0blob'
                                     }
                       }
         new_doc = dict(headers=headers, body=body, _attachments=attachments)
         return new_doc
 
 
 class TestAccount(base.AccountBase):
-  def __init__(self, db, details):
-    self.db = db
-    self.details = details
-
-  def startSync(self, conductor, doc_model):
-    return TestMessageProvider(self, conductor, doc_model).attach()
+  def startSync(self, conductor):
+    return TestMessageProvider(self, conductor).attach()
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -4,17 +4,17 @@ Fetch twitter raw* objects
 '''
 
 # prevent 'import twitter' finding this module!
 from __future__ import absolute_import
 
 import logging
 import re
 import twisted.python.log
-from twisted.internet import defer, threads, task
+from twisted.internet import defer, threads
 
 from ..proc import base
 
 # See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
 # about getting twisted support into the twitter package.
 # Sadly, the twisty-twitter package has issues of its own (like calling
 # delegates outside the deferred mechanism meaning you can't rely on callbacks
 # being completed when they say they are.)
@@ -28,25 +28,22 @@ logger = logging.getLogger(__name__)
 TWEET_PROPS = """
         created_at created_at_in_seconds favorited in_reply_to_screen_name
         in_reply_to_user_id in_reply_to_status_id truncated
         source id text relative_created_at user
 """.split()
 
 
 class TwitterProcessor(object):
-    def __init__(self, account, conductor, doc_model):
+    def __init__(self, account, conductor):
         self.account = account
+        self.doc_model = account.doc_model # this is a little confused...
         self.conductor = conductor
-        self.doc_model = doc_model
         self.twit = None
         self.seen_tweets = None
-        # use a cooperator to do the work via a generator.
-        # XXX - should we own the cooperator or use our parents?
-        self.coop = task.Cooperator()
 
     def attach(self):
         logger.info("attaching to twitter...")
         username = self.account.details['username']
         pw = self.account.details['password']
         return threads.deferToThread(twitter.Api,
                                   username=username, password=pw
                     ).addCallback(self.attached
@@ -55,17 +52,17 @@ class TwitterProcessor(object):
     def attached(self, twit):
         logger.info("attached to twitter - fetching friends")
         self.twit = twit
         # build the list of all users we will fetch tweets from.
         return threads.deferToThread(self.twit.GetFriends
                   ).addCallback(self._cb_got_friends)
 
     def _cb_got_friends(self, friends):
-        return self.coop.coiterate(self.gen_friends_info(friends))
+        return self.conductor.coop.coiterate(self.gen_friends_info(friends))
         
     def gen_friends_info(self, friends):
         logger.info("apparently I've %d friends", len(friends))
         def do_fid(fid):
             return threads.deferToThread(self.twit.GetUserTimeline, fid
                                 ).addCallback(self.got_friend_timeline, fid
                                 ).addErrback(self.err_friend_timeline, fid
                                 ).addCallback(self.finished_friend
@@ -82,17 +79,18 @@ class TwitterProcessor(object):
         logger.debug("finished friend: %s", result)
 
     def err_friend_timeline(self, failure, fid):
         logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
 
     def got_friend_timeline(self, timeline, fid):
         logger.debug("Friend %r has %d items in their timeline", fid,
                      len(timeline))
-        return self.coop.coiterate(self.gen_friend_timeline(timeline, fid))
+        return self.conductor.coop.coiterate(
+                    self.gen_friend_timeline(timeline, fid))
 
     def gen_friend_timeline(self, timeline, fid):
         for tweet in timeline:
             logger.debug("seeing if tweet %r exists", tweet.id)
             docid = "tweet.%d" % (tweet.id,)
             yield self.doc_model.open_document(docid,
                             ).addCallback(self.maybe_process_tweet, docid, tweet)
 
@@ -129,14 +127,10 @@ class TwitterConverter(base.ConverterBase)
                 'body': body,
                 'body_preview': body[:128],
                 'tags': tags,
                 'timestamp': int(doc['twitter_created_at_in_seconds'])
                 }
 
 
 class TwitterAccount(base.AccountBase):
-  def __init__(self, db, details):
-    self.db = db
-    self.details = details
-
-  def startSync(self, conductor, doc_model):
-    return TwitterProcessor(self, conductor, doc_model).attach()
+  def startSync(self, conductor):
+    return TwitterProcessor(self, conductor).attach()
--- a/server/python/junius/sync.py
+++ b/server/python/junius/sync.py
@@ -1,17 +1,16 @@
 import logging
 
-from twisted.internet import reactor, defer
+from twisted.internet import reactor, defer, task
 from twisted.python.failure import Failure
 import twisted.web.error
 import paisley
 
 from . import proto as proto
-from .model import get_db
 
 logger = logging.getLogger(__name__)
 
 _conductor = None
 from .model import get_doc_model
 
 def get_conductor(options=None):
   global _conductor
@@ -30,48 +29,53 @@ class SyncConductor(object):
   def __init__(self, options):
     self.log = logger
     self.options = options
     # apparently it is now considered 'good form' to pass reactors around, so
     # a future of multiple reactors is possible.
     # We capture it here, and all the things we 'conduct' use this reactor
     # (but later it should be passed to our ctor too)
     self.reactor = reactor
-
-    self.db = get_db()
+    self.coop = task.Cooperator()
+    reactor.addSystemEventTrigger("before", "shutdown", self._kill_coop)
+    self.doc_model = get_doc_model()
 
     self.active_accounts = []
 
+  def _kill_coop(self):
+    logger.debug('stopping the cooperator')
+    self.coop.stop()
+    logger.debug('cooperator stopped')
+
   def _ohNoes(self, failure, *args, **kwargs):
     self.log.error('OH NOES! failure! %s', failure)
 
   def _getAllAccounts(self):
-    return self.db.openView('raindrop!accounts!all', 'all'
+    return self.doc_model.open_view('raindrop!accounts!all', 'all'
       ).addCallback(self._gotAllAccounts
       ).addErrback(self._ohNoes)
 
-  def _gotAllAccounts(self, rows):
-    # we don't use a cooperator here as we want them all to run in parallel.
-    return defer.DeferredList([d for d in self._genAccountSynchs(rows)])
+  def _gotAllAccounts(self, result):
+    # we don't use the cooperator here as we want them all to run in parallel.
+    return defer.DeferredList([d for d in self._genAccountSynchs(result['rows'])])
 
   def _genAccountSynchs(self, rows):
-    doc_model = get_doc_model()
     self.log.info("Have %d accounts to synch", len(rows))
     to_synch = []
     for row in rows:
       account_details = row['value']
       kind = account_details['kind']
       self.log.debug("Found account using protocol %s", kind)
       if not self.options.protocols or kind in self.options.protocols:
         if kind in proto.protocols:
-          account = proto.protocols[kind](self.db, account_details)
+          account = proto.protocols[kind](self.doc_model, account_details)
           self.log.info('Starting sync of %s account: %s',
                         kind, account_details.get('name', '(un-named)'))
           self.active_accounts.append(account)
-          yield account.startSync(self, doc_model
+          yield account.startSync(self
                     ).addBoth(self._cb_sync_finished, account)
         else:
           self.log.error("Don't know what to do with account kind: %s", kind)
       else:
           self.log.info("Skipping account - protocol '%s' is disabled", kind)
 
   def sync(self, whateva=None):
     return self._getAllAccounts()
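
This is the 'share a coop' part of the change: rather than each protocol
creating its own task.Cooperator, the conductor owns one, stops it at
reactor shutdown, and the protocols schedule work through
conductor.coop.coiterate(). A standalone sketch of the pattern, with a
hypothetical generator:

    from twisted.internet import defer, task

    coop = task.Cooperator()

    def gen_work():
        # each yielded deferred is one cooperative unit of work; the
        # cooperator interleaves this generator with any others it runs
        for i in xrange(3):
            yield defer.succeed(i)

    d = coop.coiterate(gen_work())  # fires when the generator is exhausted
    # at shutdown the conductor calls coop.stop() to abandon unfinished work
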
--- a/server/python/run-raindrop.py
+++ b/server/python/run-raindrop.py
@@ -23,16 +23,17 @@ def asynch_command(f):
     f.asynch = True
     return f
 
 # NOTE: All global functions with docstrings are 'commands'
 # They must all return a deferred.
 def nuke_db_and_delete_everything_forever(result, parser, options):
     """Nuke the database AND ALL MESSAGES FOREVER"""
     return model.nuke_db(
+                ).addCallback(model.fab_db
                 ).addCallback(bootstrap.install_accounts)
 
 
 # XXX - install_accounts should die too, but how to make a safe 'fingerprint'
 # so we can do it implicitly? We could create a hash which doesn't include
 # the password, but then the password changing wouldn't be enough to trigger
 # an update.  Including even the *hash* of the password might risk leaking
 # info.  So for now you must install manually.
@@ -60,39 +61,39 @@ def unprocess(result, parser, options):
     """
     def done(result):
         print "unprocess has finished..."
     # XXX - pipeline should probably be a singleton?
     p = pipeline.Pipeline(model.get_doc_model())
     return p.unprocess().addCallback(done)
 
 def delete_docs(result, parser, options):
-    """Delete all documents of a particular type.  Use with caution"""
+    """Delete all documents of a particular type.  Use with caution or see
+       the 'unprocess' command for an alternative.
+    """
     # NOTE: This is for development only, until we get a way to say
     # 'reprocess stuff you've already done' - in the meantime deleting those
     # intermediate docs has the same result...
-    from urllib import quote
-    db = model.get_db()
-
     def _del_docs(to_del):
         docs = []
         for id, rev in to_del:
             docs.append({'_id': id, '_rev': rev, '_deleted': True})
-        return db.updateDocuments(docs)
+        return model.get_doc_model().db.updateDocuments(docs)
 
     def _got_docs(result, dt):
-        to_del = [(row['id'], row['value']) for row in result]
+        to_del = [(row['id'], row['value']) for row in result['rows']]
         logger.info("Deleting %d documents of type %r", len(to_del), dt)
         return to_del
 
     if not options.doctypes:
         parser.error("You must specify one or more --doctype")
     deferreds = []
     for dt in options.doctypes:
-        d = db.openView('raindrop!messages!by', 'by_doc_type', key=dt
+        d = model.get_doc_model().open_view('raindrop!messages!by',
+                                            'by_doc_type', key=dt
                 ).addCallback(_got_docs, dt
                 ).addCallback(_del_docs
                 )
         deferreds.append(d)
     return defer.DeferredList(deferreds)
 
 
 def _setup_logging(options):