get a pipeline working for test messages; now onto the others...
twisty
get a pipeline working for test messages; now onto the others...
--- a/docs/INSTALL
+++ b/docs/INSTALL
@@ -1,15 +1,18 @@
-Installation Steps
+Late Breaking News/Known Problems
+=================================
-Have Python 2.5 or later installed.
+* The reactor must always be stopped with ctrl+c; it never stops by itself.
- If not 2.6:
- install setup_tools
- easy_install simplejson
+
+Installation Steps
+==================
+
+Have Python 2.6 or later installed.
Prereqs:
easy_install couchdb (0.5 or later)
Install couchdb trunk. This currently means you need to google for
instructions that work only on Linux - Windows doesn't work. If you want to
work on Windows you must install Couch on a Linux box and point your windows
~/.raindrop file at the server.
@@ -19,44 +22,74 @@ Install twisted.
Install 'paisley' from launchpad.
Either install junius:
cd junius/server/python/junius
python setup.py develop
*or* just add it to your PYTHONPATH
set PYTHONPATH=c:\src\path\to\junius\server\python
-configure junuis:
+configure raindrop:
* edit ~/.raindrop
* if your 'local' couchdb isn't on localhost, add a section along the lines of:
[couch-local]
host=hostname
port=port
* Add imap accounts along the lines of
[account-some-account-name] # the 'account-' prefix is important!
type=imap
host=imap.gmail.com
port=993
username=username@gmail.com
password=topsecret
ssl=True
-With Couchdb running:
+ * Add a 'test' account - for the 'test suite' - along the lines of:
+ [account-test]
+ kind=test
+ username=test # not used, but we die without it!
+ num_test_docs=1 # this defaults to 5 - but 1 is useful sometimes!
-Setup the tables:
+With the couchdb server running....
- python run-raindrop.py --nuke-db-and-delete-everything-forever
+Setup the database and upload views and other content:
- WARNING: using this coommand will wipe ALL your databases we may touch.
- USE WITH CAUTION
+ WARNING: using this command will wipe ALL your databases we may touch.
+ USE WITH CAUTION.
+
+ % run-raindrop.py nuke-db-and-delete-everything-forever
+
+ See --help for how to update the various bits without nuking the database.
Go to http://127.0.0.1:5984/_utils/index.html to check that the table(s) are
setup
-Load the mail:
- python run-raindrop.py sync_messages
+Test everything using the 'test suite' (one day it *will* be a test suite :)
+
+ % run-raindrop.py -p test sync-messages process
+
+ THEN repeat the above - sorry about that - the first run didn't quite
+ do everything it was supposed to.
+
+ % run-raindrop.py -p test sync-messages process
+
+ Note the '-p' says to only load the 'test' protocol; without it we
+ will attempt to load all protocols (eg, imap, skype, twitter, etc).
+ But further note that we don't attempt a protocol - even the 'test'
+ protocol - until we have an account of that 'type' set up.
+
+ Add '-l debug' to the above command-line for debug log messages. This
+ can get quite verbose; you can also say -l log.name=debug to set a specific
+ log to debug.
+
+ See --help for more.
+
+
+Get *real* messages:
+
+ % run-raindrop.py sync-messages process
(reload http://127.0.0.1:5984/_utils/index.html to see stuff in the messages view)
Go to http://127.0.0.1:5984/junius/files/index.xhtml and do autocomplete to do
searches.
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/views/by_doc_extension_sequence/map.js
@@ -0,0 +1,5 @@
+function(doc)
+{
+ var root_id = doc._id.split("!", 1)[0];
+ emit([root_id, doc.raindrop_seq], doc.type);
+}
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/views/by_doc_roots/map.js
@@ -0,0 +1,5 @@
+function(doc)
+{
+ var root_id = doc._id.split("!", 1)[0];
+ emit(root_id, null);
+}
new file mode 100644
--- /dev/null
+++ b/schema/messages/by/views/by_doc_roots/reduce.js
@@ -0,0 +1,4 @@
+function(keys, vals)
+{
+ return vals.length
+}
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core extensions
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/message/__init__.py
@@ -0,0 +1,1 @@
+# This is a package for the core message processor extensions.
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/message/email.py
@@ -0,0 +1,15 @@
+# This is an extension which converts a message/raw/email to a 'message'
+import logging
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class EmailConverter(base.ConverterBase):
+ def convert(self, doc):
+ # for now, the email repr has all we need.
+ ret = doc.copy()
+ for n in ret.keys():
+ if n.startswith('_') or n.startswith('raindrop'):
+ del ret[n]
+ return ret
new file mode 100644
--- /dev/null
+++ b/server/python/junius/ext/message/rfc822.py
@@ -0,0 +1,22 @@
+# This is an extension which converts a message/raw/rfc822 to a
+# message/raw/email
+from __future__ import absolute_import # stop 'email' import finding our ext
+
+import logging
+from email.parser import HeaderParser
+from twisted.internet import defer
+
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+
+class RFC822Converter(base.ConverterBase):
+ def __init__(self, *args, **kw):
+ super(RFC822Converter, self).__init__(*args, **kw)
+ self.hdr_parser = HeaderParser()
+ def convert(self, doc):
+ msg = self.hdr_parser.parsestr(doc['headers'])
+ return {'from': msg['from'],
+ 'subject': msg['subject'],
+ 'body': doc['body']}
new file mode 100644
--- /dev/null
+++ b/server/python/junius/pipeline.py
@@ -0,0 +1,96 @@
+# This is the raindrop pipeline; it moves messages from their most raw
+# form to their most useful form.
+from twisted.internet import defer
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Simple forward chaining.
+chain = [
+    # (from_type, to_type, transformer)
+ ('proto/test', 'raw/message/rfc822',
+ 'junius.proto.test.TestConverter'),
+ #('proto/skype', 'raw/message/email', 'core.proto.skype'),
+ #('proto/imap', 'raw/message/rfc822', 'core.proto.imap'),
+ ('raw/message/rfc822', 'raw/message/email',
+ 'junius.ext.message.rfc822.RFC822Converter'),
+ ('raw/message/email', 'message',
+ 'junius.ext.message.email.EmailConverter'),
+]
+
+
+class Pipeline(object):
+ def __init__(self, doc_model):
+ self.doc_model = doc_model
+ self.forward_chain = {}
+ for from_type, to_type, xformname in chain:
+ root, tail = xformname.rsplit('.', 1)
+ try:
+ mod = __import__(root, fromlist=[tail])
+ klass = getattr(mod, tail)
+ inst = klass(doc_model)
+ except:
+ logger.exception("Failed to load extension %r", xformname)
+ else:
+ self.forward_chain[from_type] = (to_type, inst)
+
+ def start(self):
+ return defer.maybeDeferred(self.process_all_documents
+ ).addCallback(self._cb_maybe_reprocess_all_documents)
+
+ def process_all_documents(self):
+ # check *every* doc in the DB. This exists until we can derive
+        # a workqueue based on views.
+ self.num_this_process = 0
+ return self.doc_model.db.openView('raindrop!messages!by',
+ 'by_doc_roots',
+ group=True, limit=100,
+ ).addCallback(self._cb_roots_opened)
+
+ def _cb_maybe_reprocess_all_documents(self, result):
+ if self.num_this_process != 0:
+ logger.debug('pipeline processed %d documents last time; trying again',
+ self.num_this_process)
+ return self.start()
+ logger.info('pipeline is finished.')
+
+ def _cb_roots_opened(self, rows):
+ dl = []
+ for row in rows:
+ did = row['key']
+ logger.debug("Finding last extention point for %s", did)
+ dl.append(
+ self.doc_model.get_last_ext_for_document(did).addCallback(
+ self._cb_got_doc_last_ext, did)
+ )
+ return defer.DeferredList(dl)
+
+ def _cb_got_doc_last_ext(self, ext_info, rootdocid):
+ last_ext, docid = ext_info
+ if last_ext is None:
+ logger.debug("Document '%s' doesn't appear to be a message; skipping",
+ rootdocid)
+ return
+ try:
+ xform_info = self.forward_chain[last_ext]
+ except KeyError:
+ logger.warning("Can't find transformer for message type %r - skipping %r",
+ last_ext, docid)
+ return
+ logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
+ return self.doc_model.open_document(docid
+ ).addCallback(self._cb_got_last_doc, rootdocid, xform_info
+ )
+
+ def _cb_got_last_doc(self, doc, rootdocid, xform_info):
+ assert doc is not None, "failed to locate the doc for %r" % rootdocid
+ dest_type, xformer = xform_info
+ logger.debug("calling %r to create a %s from %s", xformer, dest_type,
+ doc)
+ return defer.maybeDeferred(xformer.convert, doc
+ ).addCallback(self._cb_converted, dest_type, rootdocid)
+
+ def _cb_converted(self, new_doc, dest_type, rootdocid):
+ logger.debug("converter created new document %s", new_doc)
+ self.num_this_process += 1
+ return self.doc_model.create_ext_document(new_doc, dest_type, rootdocid)
--- a/server/python/junius/proc/base.py
+++ b/server/python/junius/proc/base.py
@@ -89,8 +89,12 @@ class AccountBase(Rat):
def sync(self):
pass
def verify(self):
'''
'''
pass
+
+class ConverterBase(object):
+ def __init__(self, doc_model):
+ self.doc_model = doc_model
--- a/server/python/junius/proto/__init__.py
+++ b/server/python/junius/proto/__init__.py
@@ -1,22 +1,25 @@
# this needs to become a 'plugin' mechanism...
_protocol_infos = [
('imap', 'junius.proto.imap', 'IMAPAccount'),
('skype', 'junius.proto.skype', 'SkypeAccount'),
('twitter', 'junius.proto.twitter', 'TwitterAccount'),
]
+if __debug__:
+ _protocol_infos.append(('test', 'junius.proto.test', 'TestAccount'))
protocols = {}
def init_protocols():
import sys, logging
logger = logging.getLogger('raindrop.proto')
for name, mod, factname in _protocol_infos:
try:
+ logger.debug("attempting import of '%s' for '%s'", mod, factname)
__import__(mod)
mod = sys.modules[mod]
fact = getattr(mod, factname)
except ImportError, why:
logger.error("Failed to import '%s' factory: %s", name, why)
except:
logger.exception("Error creating '%s' factory", name)
else:
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,9 +1,9 @@
-from twisted.internet import protocol, ssl, defer, reactor, error
+from twisted.internet import protocol, ssl, defer, error
from twisted.mail import imap4
import logging
from ..proc import base
from ..model import get_db
brat = base.Rat
@@ -18,19 +18,17 @@ class ImapClient(imap4.IMAP4Client):
'''
Much of our logic here should perhaps be in another class that holds a
reference to the IMAP4Client instance subclass. Consider refactoring if we
don't turn out to benefit from the subclassing relationship.
'''
def finished(self, result):
# See bottom of file - it would be good to remove this...
logger.info("Finished synchronizing IMAP folders")
- # XXX - this should be done via callback/errback
- from ..sync import get_conductor
- return get_conductor().accountFinishedSync(self.account, result)
+ self.conductor.on_synch_finished(self.account, result)
def serverGreeting(self, caps):
logger.debug("IMAP server greeting: capabilities are %s", caps)
return self._doAuthenticate(
).addCallback(self._reqList
).addBoth(self.finished)
def _doAuthenticate(self):
@@ -182,32 +180,35 @@ class ImapClient(imap4.IMAP4Client):
def accountStatus(self, result, *args):
return self.account.reportStatus(*args)
class ImapClientFactory(protocol.ClientFactory):
protocol = ImapClient
- def __init__(self, account):
+ def __init__(self, account, conductor):
self.account = account
+ self.conductor = conductor
self.ctx = ssl.ClientContextFactory()
self.backoff = 8 # magic number
def buildProtocol(self, addr):
p = self.protocol(self.ctx)
p.factory = self
p.account = self.account
+ p.conductor = self.conductor
return p
def connect(self):
details = self.account.details
logger.debug('attempting to connect to %s:%d (ssl: %s)',
details['host'], details['port'], details['ssl'])
+ reactor = self.conductor.reactor
if details.get('ssl'):
reactor.connectSSL(details['host'], details['port'], self, self.ctx)
else:
reactor.connectTCP(details['host'], details['port'], self)
def clientConnectionLost(self, connector, reason):
# the flaw in this is that we start from scratch every time; which is why
# most of the logic in the client class should really be pulled out into
@@ -216,33 +217,33 @@ class ImapClientFactory(protocol.ClientF
# this - meaning we retry just to hit the same exception.
if reason.check(error.ConnectionDone):
# only an error if premature
if not self.deferred.called:
self.deferred.errback(reason)
else:
#self.deferred.errback(reason)
logger.debug('lost connection to server, going to reconnect in a bit')
- reactor.callLater(2, self.connect)
+ self.conductor.reactor.callLater(2, self.connect)
def clientConnectionFailed(self, connector, reason):
self.account.reportStatus(brat.SERVER, brat.BAD, brat.UNREACHABLE,
brat.TEMPORARY)
logger.warning('Failed to connect, will retry after %d secs',
self.backoff)
# It occurs that some "account manager" should be reported of the error,
# and *it* asks us to retry later? eg, how do I ask 'ignore backoff -
# try again *now*"?
- reactor.callLater(self.backoff, self.connect)
+ self.conductor.reactor.callLater(self.backoff, self.connect)
self.backoff = min(self.backoff * 2, 600) # magic number
class IMAPAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
def startSync(self, conductor):
- self.factory = ImapClientFactory(self)
+ self.factory = ImapClientFactory(self, conductor)
self.factory.connect()
# XXX - wouldn't it be good to return a deferred here, so the conductor
# can reliably wait for the deferred to complete, rather than forcing
# each deferred to manage 'finished' itself?
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -6,17 +6,17 @@ Fetch skype contacts and chats.
import base64, datetime
import pprint
import time
import re
import logging
from urllib2 import urlopen
import twisted.python.log
-from twisted.internet import reactor, defer, threads
+from twisted.internet import defer, threads
from ..proc import base
from ..model import get_db
import Skype4Py
logger = logging.getLogger(__name__)
@@ -67,19 +67,19 @@ def simple_convert(str_val, typ):
return str_val.split()
if typ is bool:
return str_val == "TRUE"
# all the rest are callables which 'do the right thing'
return typ(str_val)
class TwistySkype(object):
- def __init__(self, account, reactor):
+ def __init__(self, account, conductor):
self.account = account
- self.reactor = reactor
+ self.conductor = conductor
self.skype = Skype4Py.Skype()
def attach(self):
logger.info("attaching to skype...")
d = threads.deferToThread(self.skype.Attach)
d.addCallback(self.attached)
return d
@@ -115,18 +115,17 @@ class TwistySkype(object):
d.addCallback(self.start_processing_messages, chat)
# and when this deferred chain completes, this account is complete.
d.addCallback(self.finished)
return d.callback(None)
def finished(self, result):
- from ..sync import get_conductor
- return get_conductor().accountFinishedSync(self.account, result)
+ return self.conductor.on_synch_finished(self.account, result)
def got_chat(self, result, chat):
logger.debug("starting processing of chat '%s'", chat.Name)
# make a 'deferred list' to fetch each property one at a time.
ds = [threads.deferToThread(chat._Property, p)
for p, _ in CHAT_PROPS]
return defer.DeferredList(ds
@@ -215,9 +214,9 @@ class TwistySkype(object):
class SkypeAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
def startSync(self, conductor):
- TwistySkype(self, reactor).attach()
+ TwistySkype(self, conductor).attach()
new file mode 100644
--- /dev/null
+++ b/server/python/junius/proto/test/__init__.py
@@ -0,0 +1,79 @@
+# This is an implementation of a 'test' protocol.
+import logging
+from twisted.internet import defer, error
+
+
+
+logger = logging.getLogger(__name__)
+
+from ...proc import base
+from ...model import get_doc_model
+
+class TestMessageProvider(object):
+ def __init__(self, account, conductor):
+ self.account = account
+ self.conductor = conductor
+ self.doc_model = get_doc_model()
+
+ def attach(self):
+ logger.info("preparing to synch test messages...")
+ # Is a 'DeferredList' more appropriate here?
+ d = defer.Deferred()
+ num_docs = int(self.account.details.get('num_test_docs', 5))
+ logger.info("Creating %d test documents", num_docs)
+ for i in xrange(num_docs):
+ d.addCallback(self.check_test_message, i)
+ d.addCallback(self.finished)
+ return d.callback(None)
+
+ def check_test_message(self, result, i):
+ logger.debug("seeing if message with ID %d exists", i)
+ return self.doc_model.open_document("test.message.%d" % i,
+ ).addCallback(self.process_test_message, i)
+
+ def process_test_message(self, existing_doc, doc_num):
+ if existing_doc is None:
+ logger.info("Creating new test message with ID %d", doc_num)
+ doc = dict(
+ _id="test.message.%d" % doc_num,
+ storage_key=doc_num,
+ )
+ return self.doc_model.create_raw_document(doc, 'proto/test',
+ self.account
+ ).addCallback(self.saved_message, doc)
+ else:
+ logger.info("Skipping test message with ID %d - already exists",
+ doc_num)
+ # we are done.
+
+ def finished(self, result):
+ self.conductor.on_synch_finished(self.account, result)
+
+ def saved_message(self, result, doc):
+ logger.debug("Finished saving test message (%s)", doc['_id'])
+ # done
+
+# A 'converter' - takes a proto/test as input and creates a
+# 'raw/message/rfc822' as output.
+class TestConverter(base.ConverterBase):
+ def convert(self, doc):
+ me = doc['storage_key']
+ headers = """\
+From: from%(storage_key)d@test.com
+To: from%(storage_key)d@test.com
+Subject: This is test document %(storage_key)d
+""" % doc
+ headers = headers.replace('\n', '\r\n')
+ body = "Hello, this is test message %(storage_key)d" % doc
+
+ new_doc = dict(headers=headers, body=body)
+ return new_doc
+
+
+class TestAccount(base.AccountBase):
+ def __init__(self, db, details):
+ self.db = db
+ self.details = details
+
+ def startSync(self, conductor):
+ TestMessageProvider(self, conductor).attach()
--- a/server/python/junius/sync.py
+++ b/server/python/junius/sync.py
@@ -1,12 +1,13 @@
import logging
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
+import twisted.web.error
import paisley
import junius.proto as proto
from junius.model import get_db
logger = logging.getLogger(__name__)
_conductor = None
@@ -15,70 +16,84 @@ def get_conductor(options=None):
global _conductor
if _conductor is None:
proto.init_protocols()
_conductor = SyncConductor(options)
else:
assert options is None, 'can only set this at startup'
return _conductor
-
+
+# XXX - rename this to plain 'Conductor' and move to a different file.
+# This 'conducts' synchronization, the work queues and the interactions with
+# the extensions and database.
class SyncConductor(object):
def __init__(self, options):
self.log = logger
self.options = options
+ # apparently it is now considered 'good form' to pass reactors around, so
+ # a future of multiple reactors is possible.
+ # We capture it here, and all the things we 'conduct' use this reactor
+ # (but later it should be passed to our ctor too)
+ self.reactor = reactor
self.db = get_db()
self.active_accounts = []
def _ohNoes(self, failure, *args, **kwargs):
self.log.error('OH NOES! failure! %s', failure)
- reactor.stop()
+ #self.reactor.stop()
def _getAllAccounts(self):
return self.db.openView('raindrop!accounts!all', 'all'
).addCallback(self._gotAllAccounts
).addErrback(self._ohNoes)
def _gotAllAccounts(self, rows, *args, **kwargs):
self.log.info("Have %d accounts to synch", len(rows))
for row in rows:
account_details = row['value']
kind = account_details['kind']
+ self.log.debug("Found account using protocol %s", kind)
if not self.options.protocols or kind in self.options.protocols:
if kind in proto.protocols:
account = proto.protocols[kind](self.db, account_details)
self.log.info('Starting sync of %s account: %s',
kind, account_details.get('name', '(un-named)'))
account.startSync(self)
self.active_accounts.append(account)
else:
self.log.error("Don't know what to do with account kind: %s", kind)
else:
self.log.info("Skipping account - protocol '%s' is disabled", kind)
- def accountFinishedSync(self, account, result):
+ def sync(self, whateva=None):
+ return self._getAllAccounts()
+
+ # The callbacks called by the accounts as they do interesting things.
+ # XXX - this kinda sucks - ideally when we start the sync we would get
+ # a deferred, which we could add own callback to to manage this.
+ # The complication is IMAP - ProtocolFactory based clients don't lend
+ # themselves to this.
+ def on_synch_finished(self, account, result):
if isinstance(result, Failure):
self.log.error("Account %s failed with an error: %s", account, result)
else:
self.log.debug("Account %s reports it has finished", account)
assert account in self.active_accounts, (account, self.active_accounts)
self.active_accounts.remove(account)
if not self.active_accounts:
- self.log.info("sync has finished; stopping reactor")
- return reactor.stop()
-
- def sync(self, whateva=None):
- return self._getAllAccounts()
-
+ self.log.info("sync has finished; NOT stopping reactor yet, but I should...")
+ #self.log.info("sync has finished; stopping reactor")
+ #return self.reactor.stop()
if __name__ == '__main__':
# normal entry-point is the app itself; this is purely for debugging...
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
conductor = get_conductor()
- reactor.callWhenRunning(conductor.sync)
+ conductor.reactor.callWhenRunning(conductor.sync)
logger.debug('starting reactor')
- reactor.run()
+ conductor.reactor.run()
logger.debug('reactor done')
--- a/server/python/run-raindrop.py
+++ b/server/python/run-raindrop.py
@@ -3,16 +3,17 @@
import sys
import optparse
import logging
from twisted.internet import reactor, defer
from junius import model
from junius import bootstrap
+from junius import pipeline
from junius.sync import get_conductor
logger = logging.getLogger('raindrop')
class HelpFormatter(optparse.IndentedHelpFormatter):
def format_description(self, description):
return description
@@ -43,16 +44,23 @@ def install_files(result, parser, option
return model.fab_db(update_views=True
).addCallback(bootstrap.install_client_files)
def sync_messages(result, parser, options):
"""Synchronize all messages from all accounts"""
conductor = get_conductor()
return conductor.sync(None)
+def process(result, parser, options):
+ """Process all messages to see if any extensions need running"""
+ def done(result):
+ print "Message pipeline has finished..."
+ p = pipeline.Pipeline(model.get_doc_model())
+ return p.start().addCallback(done)
+
def _setup_logging(options):
init_errors = []
logging.basicConfig()
for val in options.log_level: # a list of all --log-level options...
try:
name, level = val.split("=", 1)
except ValueError:
name = None