get a pipeline working for test messages; now onto the others... twisty
authorMark Hammond <mhammond@skippinet.com.au>
Thu, 19 Mar 2009 20:56:14 +1100
branchtwisty
changeset 100 5fafd238a09ffa3654def07d2e903ac97d09e3f7
parent 99 ce4ab1a2a289f1ec238cce72bbf29c2433261bf3
child 101 9a02e3c03c728681386819d7e30f1f5565470434
push id1
push userroot
push dateWed, 08 Apr 2009 01:46:05 +0000
get a pipeline working for test messages; now onto the others...
docs/INSTALL
schema/messages/by/views/by_doc_extension_sequence/map.js
schema/messages/by/views/by_doc_roots/map.js
schema/messages/by/views/by_doc_roots/reduce.js
server/python/junius/ext/__init__.py
server/python/junius/ext/message/__init__.py
server/python/junius/ext/message/email.py
server/python/junius/ext/message/rfc822.py
server/python/junius/pipeline.py
server/python/junius/proc/base.py
server/python/junius/proto/__init__.py
server/python/junius/proto/imap.py
server/python/junius/proto/skype.py
server/python/junius/proto/test/__init__.py
server/python/junius/sync.py
server/python/run-raindrop.py
--- a/docs/INSTALL
+++ b/docs/INSTALL
@@ -1,15 +1,18 @@
-Installation Steps
+Late Breaking News/Known Problems
+=================================
 
-Have Python 2.5 or later installed.
+* The reactor must always be stopped with ctrl+c; it never stops by itself.
 
- If not 2.6: 
-    install setup_tools
-    easy_install simplejson
+
+Installation Steps
+==================
+
+Have Python 2.6 or later installed.
 
  Prereqs:
     easy_install couchdb (0.5 or later)
 
 Install couchdb trunk.  This currently means you need to google for
 instructions that work only on Linux - Windows doesn't work.  If you want to
 work on Windows you must install Couch on a Linux box and point your windows
 ~/.raindrop file at the server.
@@ -19,44 +22,74 @@ Install twisted.
 Install 'paisley' from launchpad.
 
 Either install junius:
   cd junius/server/python/junius
   python setup.py develop
 *or* just add it to your PYTHONPATH
   set PYTHONPATH=c:\src\path\to\junius\server\python
 
-configure junuis:
+configure raindrop:
   * edit ~/.raindrop
 
   * if your 'local' couchdb isn't on localhost, add a section along the lines of:
         [couch-local]
         host=hostname
         port=port
 
   * Add imap accounts along the lines of
         [account-some-account-name] # the 'account-' prefix is important!
         type=imap
         host=imap.gmail.com
         port=993
         username=username@gmail.com
         password=topsecret
         ssl=True
 
-With Couchdb running:
+  * Add a 'test' account - for the 'test suite' - along the lines of:
+        [account-test]
+        kind=test
+        username=test # not used, but we die without it!
+        num_test_docs=1 # this defaults to 5 - but 1 is useful sometimes!
 
-Setup the tables:
+With the couchdb server running....
 
-  python run-raindrop.py --nuke-db-and-delete-everything-forever
+Setup the database and upload views and other content:
 
-  WARNING: using this coommand will wipe ALL your databases we may touch.
-  USE WITH CAUTION
+  WARNING: using this command will wipe ALL your databases we may touch.
+  USE WITH CAUTION.
+
+  % run-raindrop.py nuke-db-and-delete-everything-forever
+
+  See --help for how to update the various bits without nuking the database.
 
 Go to http://127.0.0.1:5984/_utils/index.html to check that the table(s) are 
 setup
 
-Load the mail:
-  python run-raindrop.py sync_messages
+Test everything using the 'test suite' (one day it *will* be a test suite :)
+
+  % run-raindrop.py -p test sync-messages process
+
+  THEN repeat the above - sorry about that - the first run didn't quite
+  do everything it was supposed to.
+
+  % run-raindrop.py -p test sync-messages process
+
+  Note the '-p' says to only load the 'test' protocol; without it we
+  will attempt to load all protocols (eg, imap, skype, twitter, etc).
+  But further note that we don't attempt a protocol - even the 'test' 
+  protocol - until we have an account of that 'type' set up.
+
+  Add '-l debug' to the above command-line for debug log messages.  This
+  can get quite verbose; you can also say -l log.name=debug to set a specific
+  log to debug.
+
+  See --help for more.
+
+
+Get *real* messages:
+
+  % run-raindrop.py sync-messages process
 
 (reload http://127.0.0.1:5984/_utils/index.html to see stuff in the messages view)
 
 Go to http://127.0.0.1:5984/junius/files/index.xhtml and do autocomplete to do
 searches.
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/views/by_doc_extension_sequence/map.js
@@ -0,0 +1,5 @@
+function(doc) 
+{
+    var root_id = doc._id.split("!", 1)[0];
+    emit([root_id, doc.raindrop_seq], doc.type);
+}
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/views/by_doc_roots/map.js
@@ -0,0 +1,5 @@
+function(doc) 
+{
+    var root_id = doc._id.split("!", 1)[0];
+    emit(root_id, null);
+}
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/views/by_doc_roots/reduce.js
@@ -0,0 +1,4 @@
+function(keys, vals)
+{
+    return vals.length
+}
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core extensions
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/message/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core message processor extensions.
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/message/email.py
@@ -0,0 +1,15 @@
+# This is an extension which converts a message/raw/email to a 'message'
+import logging
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class EmailConverter(base.ConverterBase):
+    def convert(self, doc):
+        # for now, the email repr has all we need.
+        ret = doc.copy()
+        for n in ret.keys():
+            if n.startswith('_') or n.startswith('raindrop'):
+                del ret[n]
+        return ret
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/message/rfc822.py
@@ -0,0 +1,22 @@
+# This is an extension which converts a message/raw/rfc822 to a
+# message/raw/email
+from __future__ import absolute_import # stop 'email' import finding our ext
+
+import logging
+from email.parser import HeaderParser
+from twisted.internet import defer
+
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class RFC822Converter(base.ConverterBase):
+    def __init__(self, *args, **kw):
+        super(RFC822Converter, self).__init__(*args, **kw)
+        self.hdr_parser = HeaderParser()
+    def convert(self, doc):
+        msg = self.hdr_parser.parsestr(doc['headers'])
+        return {'from': msg['from'],
+                'subject': msg['subject'],
+                'body': doc['body']}
new file mode 100644
--- /dev/null
+++ b/server/python/junius/pipeline.py
@@ -0,0 +1,96 @@
+# This is the raindrop pipeline; it moves messages from their most raw
+# form to their most useful form.
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Simple forward chaining.
+chain = [
+    # (from_type, to_type, transformer)
+    ('proto/test',         'raw/message/rfc822',
+                           'junius.proto.test.TestConverter'),
+    #('proto/skype', 'raw/message/email', 'core.proto.skype'),
+    #('proto/imap', 'raw/message/rfc822', 'core.proto.imap'),
+    ('raw/message/rfc822', 'raw/message/email',
+                           'junius.ext.message.rfc822.RFC822Converter'),
+    ('raw/message/email',  'message',
+                           'junius.ext.message.email.EmailConverter'),
+]
+
+
+class Pipeline(object):
+    def __init__(self, doc_model):
+        self.doc_model = doc_model
+        self.forward_chain = {}
+        for from_type, to_type, xformname in chain:
+            root, tail = xformname.rsplit('.', 1)
+            try:
+                mod = __import__(root, fromlist=[tail])
+                klass = getattr(mod, tail)
+                inst = klass(doc_model)
+            except:
+                logger.exception("Failed to load extension %r", xformname)
+            else:
+                self.forward_chain[from_type] = (to_type, inst)
+
+    def start(self):
+        return defer.maybeDeferred(self.process_all_documents
+                    ).addCallback(self._cb_maybe_reprocess_all_documents)
+
+    def process_all_documents(self):
+        # check *every* doc in the DB.  This exists until we can derive
+        # a workqueue based on views.
+        self.num_this_process = 0
+        return self.doc_model.db.openView('raindrop!messages!by',
+                                          'by_doc_roots',
+                                          group=True, limit=100,
+                    ).addCallback(self._cb_roots_opened)
+
+    def _cb_maybe_reprocess_all_documents(self, result):
+        if self.num_this_process != 0:
+            logger.debug('pipeline processed %d documents last time; trying again',
+                         self.num_this_process)
+            return self.start()
+        logger.info('pipeline is finished.')
+
+    def _cb_roots_opened(self, rows):
+        dl = []
+        for row in rows:
+            did = row['key']
+            logger.debug("Finding last extention point for %s", did)
+            dl.append(
+                self.doc_model.get_last_ext_for_document(did).addCallback(
+                    self._cb_got_doc_last_ext, did)
+                )
+        return defer.DeferredList(dl)
+
+    def _cb_got_doc_last_ext(self, ext_info, rootdocid):
+        last_ext, docid = ext_info
+        if last_ext is None:
+            logger.debug("Document '%s' doesn't appear to be a message; skipping",
+                        rootdocid)
+            return
+        try:
+            xform_info = self.forward_chain[last_ext]
+        except KeyError:
+            logger.warning("Can't find transformer for message type %r - skipping %r",
+                           last_ext, docid)
+            return
+        logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
+        return self.doc_model.open_document(docid
+                    ).addCallback(self._cb_got_last_doc, rootdocid, xform_info
+                    )
+
+    def _cb_got_last_doc(self, doc, rootdocid, xform_info):
+        assert doc is not None, "failed to locate the doc for %r" % rootdocid
+        dest_type, xformer = xform_info
+        logger.debug("calling %r to create a %s from %s", xformer, dest_type,
+                     doc)
+        return defer.maybeDeferred(xformer.convert, doc
+                        ).addCallback(self._cb_converted, dest_type, rootdocid)
+
+    def _cb_converted(self, new_doc, dest_type, rootdocid):
+        logger.debug("converter created new document %s", new_doc)
+        self.num_this_process += 1
+        return self.doc_model.create_ext_document(new_doc, dest_type, rootdocid)
--- a/server/python/junius/proc/base.py
+++ b/server/python/junius/proc/base.py
@@ -89,8 +89,12 @@ class AccountBase(Rat):
 
   def sync(self):
     pass
 
   def verify(self):
     '''
     '''
     pass
+
+class ConverterBase(object):
+    def __init__(self, doc_model):
+       self.doc_model = doc_model
--- a/server/python/junius/proto/__init__.py
+++ b/server/python/junius/proto/__init__.py
@@ -1,22 +1,25 @@
 # this needs to become a 'plugin' mechanism...
 
 _protocol_infos = [
     ('imap', 'junius.proto.imap', 'IMAPAccount'),
     ('skype', 'junius.proto.skype', 'SkypeAccount'),
     ('twitter', 'junius.proto.twitter', 'TwitterAccount'),
 ]
+if __debug__:
+    _protocol_infos.append(('test', 'junius.proto.test', 'TestAccount'))
 
 protocols = {}
 def init_protocols():
     import sys, logging
     logger = logging.getLogger('raindrop.proto')
     for name, mod, factname in _protocol_infos:
         try:
+            logger.debug("attempting import of '%s' for '%s'", mod, factname)
             __import__(mod)
             mod = sys.modules[mod]
             fact = getattr(mod, factname)
         except ImportError, why:
             logger.error("Failed to import '%s' factory: %s", name, why)
         except:
             logger.exception("Error creating '%s' factory", name)
         else:
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,9 +1,9 @@
-from twisted.internet import protocol, ssl, defer, reactor, error
+from twisted.internet import protocol, ssl, defer, error
 from twisted.mail import imap4
 import logging
 
 from ..proc import base
 from ..model import get_db
 
 brat = base.Rat
 
@@ -18,19 +18,17 @@ class ImapClient(imap4.IMAP4Client):
   '''
   Much of our logic here should perhaps be in another class that holds a
   reference to the IMAP4Client instance subclass.  Consider refactoring if we
   don't turn out to benefit from the subclassing relationship.
   '''
   def finished(self, result):
     # See bottom of file - it would be good to remove this...
     logger.info("Finished synchronizing IMAP folders")
-    # XXX - this should be done via callback/errback
-    from ..sync import get_conductor
-    return get_conductor().accountFinishedSync(self.account, result)
+    self.conductor.on_synch_finished(self.account, result)
 
   def serverGreeting(self, caps):
     logger.debug("IMAP server greeting: capabilities are %s", caps)
     return self._doAuthenticate(
             ).addCallback(self._reqList
             ).addBoth(self.finished)
 
   def _doAuthenticate(self):
@@ -182,32 +180,35 @@ class ImapClient(imap4.IMAP4Client):
 
   def accountStatus(self, result, *args):
     return self.account.reportStatus(*args)
 
 
 class ImapClientFactory(protocol.ClientFactory):
   protocol = ImapClient
 
-  def __init__(self, account):
+  def __init__(self, account, conductor):
     self.account = account
+    self.conductor = conductor
 
     self.ctx = ssl.ClientContextFactory()
     self.backoff = 8 # magic number
 
   def buildProtocol(self, addr):
     p = self.protocol(self.ctx)
     p.factory = self
     p.account = self.account
+    p.conductor = self.conductor
     return p
 
   def connect(self):
     details = self.account.details
     logger.debug('attempting to connect to %s:%d (ssl: %s)',
                  details['host'], details['port'], details['ssl'])
+    reactor = self.conductor.reactor    
     if details.get('ssl'):
       reactor.connectSSL(details['host'], details['port'], self, self.ctx)
     else:
       reactor.connectTCP(details['host'], details['port'], self)
 
   def clientConnectionLost(self, connector, reason):
     # the flaw in this is that we start from scratch every time; which is why
     #  most of the logic in the client class should really be pulled out into
@@ -216,33 +217,33 @@ class ImapClientFactory(protocol.ClientF
     # this - meaning we retry just to hit the same exception.
     if reason.check(error.ConnectionDone):
         # only an error if premature
         if not self.deferred.called:
             self.deferred.errback(reason)
     else:
         #self.deferred.errback(reason)
         logger.debug('lost connection to server, going to reconnect in a bit')
-        reactor.callLater(2, self.connect)
+        self.conductor.reactor.callLater(2, self.connect)
 
   def clientConnectionFailed(self, connector, reason):
     self.account.reportStatus(brat.SERVER, brat.BAD, brat.UNREACHABLE,
                               brat.TEMPORARY)
     logger.warning('Failed to connect, will retry after %d secs',
                    self.backoff)
     # It occurs that some "account manager" should be reported of the error,
     # and *it* asks us to retry later?  eg, how do I ask 'ignore backoff -
     # try again *now*"?
-    reactor.callLater(self.backoff, self.connect)
+    self.conductor.reactor.callLater(self.backoff, self.connect)
     self.backoff = min(self.backoff * 2, 600) # magic number
 
 
 class IMAPAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
   def startSync(self, conductor):
-    self.factory = ImapClientFactory(self)
+    self.factory = ImapClientFactory(self, conductor)
     self.factory.connect()
     # XXX - wouldn't it be good to return a deferred here, so the conductor
     # can reliably wait for the deferred to complete, rather than forcing
     # each deferred to manage 'finished' itself?
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -6,17 +6,17 @@ Fetch skype contacts and chats.
 import base64, datetime
 import pprint
 import time
 import re
 import logging
 from urllib2 import urlopen
 
 import twisted.python.log
-from twisted.internet import reactor, defer, threads
+from twisted.internet import defer, threads
 
 from ..proc import base
 from ..model import get_db
 
 import Skype4Py
 
 logger = logging.getLogger(__name__)
 
@@ -67,19 +67,19 @@ def simple_convert(str_val, typ):
         return str_val.split()
     if typ is bool:
         return str_val == "TRUE"
     # all the rest are callables which 'do the right thing'
     return typ(str_val)
 
 
 class TwistySkype(object):
-    def __init__(self, account, reactor):
+    def __init__(self, account, conductor):
         self.account = account
-        self.reactor = reactor
+        self.conductor = conductor
         self.skype = Skype4Py.Skype()
 
     def attach(self):
         logger.info("attaching to skype...")
         d = threads.deferToThread(self.skype.Attach)
         d.addCallback(self.attached)
         return d
 
@@ -115,18 +115,17 @@ class TwistySkype(object):
             d.addCallback(self.start_processing_messages, chat)
 
         # and when this deferred chain completes, this account is complete.
         d.addCallback(self.finished)
 
         return d.callback(None)
 
     def finished(self, result):
-      from ..sync import get_conductor
-      return get_conductor().accountFinishedSync(self.account, result)
+      return self.conductor.on_synch_finished(self.account, result)
 
     def got_chat(self, result, chat):
         logger.debug("starting processing of chat '%s'", chat.Name)
         # make a 'deferred list' to fetch each property one at a time.
         ds = [threads.deferToThread(chat._Property, p)
               for p, _ in CHAT_PROPS]
 
         return defer.DeferredList(ds
@@ -215,9 +214,9 @@ class TwistySkype(object):
 
 
 class SkypeAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
   def startSync(self, conductor):
-    TwistySkype(self, reactor).attach()
+    TwistySkype(self, conductor).attach()
new file mode 100644
--- /dev/null
+++ b/server/python/junius/proto/test/__init__.py
@@ -0,0 +1,79 @@
+# This is an implementation of a 'test' protocol.
+import logging
+from twisted.internet import defer, error
+
+
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+from ...model import get_doc_model
+
+class TestMessageProvider(object):
+    def __init__(self, account, conductor):
+        self.account = account
+        self.conductor = conductor
+        self.doc_model = get_doc_model()
+
+    def attach(self):
+        logger.info("preparing to synch test messages...")
+        # Is a 'DeferredList' more appropriate here?
+        d = defer.Deferred()
+        num_docs = int(self.account.details.get('num_test_docs', 5))
+        logger.info("Creating %d test documents", num_docs)
+        for i in xrange(num_docs):
+            d.addCallback(self.check_test_message, i)
+        d.addCallback(self.finished)
+        return d.callback(None)
+
+    def check_test_message(self, result, i):
+        logger.debug("seeing if message with ID %d exists", i)
+        return self.doc_model.open_document("test.message.%d" % i,
+                        ).addCallback(self.process_test_message, i)
+
+    def process_test_message(self, existing_doc, doc_num):
+        if existing_doc is None:
+            logger.info("Creating new test message with ID %d", doc_num)
+            doc = dict(
+              _id="test.message.%d" % doc_num,
+              storage_key=doc_num,
+              )
+            return self.doc_model.create_raw_document(doc, 'proto/test',
+                                                      self.account
+                        ).addCallback(self.saved_message, doc)
+        else:
+            logger.info("Skipping test message with ID %d - already exists",
+                        doc_num)
+            # we are done.
+
+    def finished(self, result):
+      self.conductor.on_synch_finished(self.account, result)
+
+    def saved_message(self, result, doc):
+        logger.debug("Finished saving test message (%s)", doc['_id'])
+        # done
+
+# A 'converter' - takes a proto/test as input and creates a
+# 'raw/message/rfc822' as output.
+class TestConverter(base.ConverterBase):
+    def convert(self, doc):
+        me = doc['storage_key']
+        headers = """\
+From: from%(storage_key)d@test.com
+To: from%(storage_key)d@test.com
+Subject: This is test document %(storage_key)d
+""" % doc
+        headers = headers.replace('\n', '\r\n')
+        body = "Hello, this is test message %(storage_key)d" % doc
+
+        new_doc = dict(headers=headers, body=body)
+        return new_doc
+
+
+class TestAccount(base.AccountBase):
+  def __init__(self, db, details):
+    self.db = db
+    self.details = details
+
+  def startSync(self, conductor):
+    TestMessageProvider(self, conductor).attach()
--- a/server/python/junius/sync.py
+++ b/server/python/junius/sync.py
@@ -1,12 +1,13 @@
 import logging
 
 from twisted.internet import reactor, defer
 from twisted.python.failure import Failure
+import twisted.web.error
 import paisley
 
 import junius.proto as proto
 from junius.model import get_db
 
 logger = logging.getLogger(__name__)
 
 _conductor = None
@@ -15,70 +16,84 @@ def get_conductor(options=None):
   global _conductor
   if _conductor is None:
     proto.init_protocols()
     _conductor = SyncConductor(options)
   else:
     assert options is None, 'can only set this at startup'
   return _conductor
   
-  
+
+# XXX - rename this to plain 'Conductor' and move to a different file.
+# This 'conducts' synchronization, the work queues and the interactions with
+# the extensions and database.
 class SyncConductor(object):
   def __init__(self, options):
     self.log = logger
     self.options = options
+    # apparently it is now considered 'good form' to pass reactors around, so
+    # a future of multiple reactors is possible.
+    # We capture it here, and all the things we 'conduct' use this reactor
+    # (but later it should be passed to our ctor too)
+    self.reactor = reactor
 
     self.db = get_db()
 
     self.active_accounts = []
 
   def _ohNoes(self, failure, *args, **kwargs):
     self.log.error('OH NOES! failure! %s', failure)
-    reactor.stop()
+    #self.reactor.stop()
 
   def _getAllAccounts(self):
     return self.db.openView('raindrop!accounts!all', 'all'
       ).addCallback(self._gotAllAccounts
       ).addErrback(self._ohNoes)
 
   def _gotAllAccounts(self, rows, *args, **kwargs):
     self.log.info("Have %d accounts to synch", len(rows))
     for row in rows:
       account_details = row['value']
       kind = account_details['kind']
+      self.log.debug("Found account using protocol %s", kind)
       if not self.options.protocols or kind in self.options.protocols:
         if kind in proto.protocols:
           account = proto.protocols[kind](self.db, account_details)
           self.log.info('Starting sync of %s account: %s',
                         kind, account_details.get('name', '(un-named)'))
           account.startSync(self)
           self.active_accounts.append(account)
         else:
           self.log.error("Don't know what to do with account kind: %s", kind)
       else:
           self.log.info("Skipping account - protocol '%s' is disabled", kind)
 
-  def accountFinishedSync(self, account, result):
+  def sync(self, whateva=None):
+    return self._getAllAccounts()
+
+  # The callbacks called by the accounts as they do interesting things.
+  # XXX - this kinda sucks - ideally when we start the sync we would get
+  # a deferred, which we could add own callback to to manage this.
+  # The complication is IMAP - ProtocolFactory based clients don't lend
+  # themselves to this.
+  def on_synch_finished(self, account, result):
     if isinstance(result, Failure):
       self.log.error("Account %s failed with an error: %s", account, result)
     else:
       self.log.debug("Account %s reports it has finished", account)
     assert account in self.active_accounts, (account, self.active_accounts)
     self.active_accounts.remove(account)
     if not self.active_accounts:
-      self.log.info("sync has finished; stopping reactor")
-      return reactor.stop()
-
-  def sync(self, whateva=None):
-    return self._getAllAccounts()
-
+      self.log.info("sync has finished; NOT stopping reactor yet, but I should...")
+      #self.log.info("sync has finished; stopping reactor")
+      #return self.reactor.stop()
 
 if __name__ == '__main__':
   # normal entry-point is the app itself; this is purely for debugging...
   logging.basicConfig()
   logging.getLogger().setLevel(logging.DEBUG)
 
   conductor = get_conductor()
-  reactor.callWhenRunning(conductor.sync)
+  conductor.reactor.callWhenRunning(conductor.sync)
 
   logger.debug('starting reactor')
-  reactor.run()
+  conductor.reactor.run()
   logger.debug('reactor done')
--- a/server/python/run-raindrop.py
+++ b/server/python/run-raindrop.py
@@ -3,16 +3,17 @@
 import sys
 import optparse
 import logging
 
 from twisted.internet import reactor, defer
 
 from junius import model
 from junius import bootstrap
+from junius import pipeline
 from junius.sync import get_conductor
 
 logger = logging.getLogger('raindrop')
 
 class HelpFormatter(optparse.IndentedHelpFormatter):
     def format_description(self, description):
         return description
 
@@ -43,16 +44,23 @@ def install_files(result, parser, option
     return model.fab_db(update_views=True
             ).addCallback(bootstrap.install_client_files)
 
 def sync_messages(result, parser, options):
     """Synchronize all messages from all accounts"""
     conductor = get_conductor()
     return conductor.sync(None)
 
+def process(result, parser, options):
+    """Process all messages to see if any extensions need running"""
+    def done(result):
+        print "Message pipeline has finished..."
+    p = pipeline.Pipeline(model.get_doc_model())
+    return p.start().addCallback(done)
+
 def _setup_logging(options):
     init_errors = []
     logging.basicConfig()
     for val in options.log_level: # a list of all --log-level options...
         try:
             name, level = val.split("=", 1)
         except ValueError:
             name = None