--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -5,39 +5,49 @@ import logging
logger = logging.getLogger(__name__)
# Simple forward chaining.
chain = [
# trom_type, to_type, transformer)
('proto/test', 'raw/message/rfc822',
'junius.proto.test.TestConverter'),
- #('proto/skype', 'raw/message/email', 'core.proto.skype'),
+ # skype goes directly to 'message' for now...
+ ('proto/skype-msg', 'message',
+ 'junius.proto.skype.SkypeConverter'),
+ # skype-chat is 'terminal'
+ ('proto/skype-chat', None, None),
#('proto/imap', 'raw/message/rfc822', 'core.proto.imap'),
('raw/message/rfc822', 'raw/message/email',
'junius.ext.message.rfc822.RFC822Converter'),
('raw/message/email', 'message',
'junius.ext.message.email.EmailConverter'),
+ # message is 'terminal'
+ ('message', None, None),
]
class Pipeline(object):
def __init__(self, doc_model):
self.doc_model = doc_model
self.forward_chain = {}
for from_type, to_type, xformname in chain:
- root, tail = xformname.rsplit('.', 1)
- try:
- mod = __import__(root, fromlist=[tail])
- klass = getattr(mod, tail)
- inst = klass(doc_model)
- except:
- logger.exception("Failed to load extension %r", xformname)
+ if xformname:
+ root, tail = xformname.rsplit('.', 1)
+ try:
+ mod = __import__(root, fromlist=[tail])
+ klass = getattr(mod, tail)
+ inst = klass(doc_model)
+ except:
+ logger.exception("Failed to load extension %r", xformname)
+ else:
+ self.forward_chain[from_type] = (to_type, inst)
else:
- self.forward_chain[from_type] = (to_type, inst)
+ assert not to_type, 'no xformname must mean to no_type'
+ self.forward_chain[from_type] = None
def start(self):
return defer.maybeDeferred(self.process_all_documents
).addCallback(self._cb_maybe_reprocess_all_documents)
def process_all_documents(self):
# check *every* doc in the DB. This exists until we can derive
# a workqueue based on views.
@@ -63,26 +73,30 @@ class Pipeline(object):
self.doc_model.get_last_ext_for_document(did).addCallback(
self._cb_got_doc_last_ext, did)
)
return defer.DeferredList(dl)
def _cb_got_doc_last_ext(self, ext_info, rootdocid):
last_ext, docid = ext_info
if last_ext is None:
- logger.debug("Document '%s' doesn't appear to be a message; skipping",
+ logger.debug("Document '%s' doesn't appear to be a message; skipping",
rootdocid)
return None
+ logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
try:
xform_info = self.forward_chain[last_ext]
except KeyError:
logger.warning("Can't find transformer for message type %r - skipping %r",
last_ext, docid)
return None
- logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
+ if xform_info is None:
+ logger.debug("Document %r is already at its terminal type of %r",
+ rootdocid, last_ext)
+ return None
return self.doc_model.open_document(docid
).addCallback(self._cb_got_last_doc, rootdocid, xform_info
)
def _cb_got_last_doc(self, doc, rootdocid, xform_info):
assert doc is not None, "failed to locate the doc for %r" % rootdocid
dest_type, xformer = xform_info
logger.debug("calling %r to create a %s from %s", xformer, dest_type,
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -9,22 +9,23 @@ import time
import re
import logging
from urllib2 import urlopen
import twisted.python.log
from twisted.internet import defer, threads
from ..proc import base
-from ..model import get_db
import Skype4Py
logger = logging.getLogger(__name__)
+max_run = 4 # for the deferred semaphore...
+
# These are the raw properties we fetch from skype.
CHAT_PROPS = [
('ACTIVEMEMBERS', list),
('ACTIVITY_TIMESTAMP', float),
('ADDER', unicode),
('APPLICANTS', list),
#'BLOB'??
('BOOKMARKED', bool),
@@ -67,156 +68,149 @@ def simple_convert(str_val, typ):
return str_val.split()
if typ is bool:
return str_val == "TRUE"
# all the rest are callables which 'do the right thing'
return typ(str_val)
class TwistySkype(object):
- def __init__(self, account, conductor):
+ def __init__(self, account, conductor, doc_model):
self.account = account
self.conductor = conductor
+ self.doc_model = doc_model
self.skype = Skype4Py.Skype()
+ def get_docid_for_chat(self, chat):
+ return "skypechat-" + chat.Name.encode('utf8') # hrmph!
+
+ def get_docid_for_msg(self, msg):
+ return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
+
def attach(self):
logger.info("attaching to skype...")
d = threads.deferToThread(self.skype.Attach)
d.addCallback(self.attached)
return d
def attached(self, status):
logger.info("attached to skype - getting chats")
return threads.deferToThread(self.skype._GetChats
- ).addCallback(self.got_chats
+ ).addCallback(self._cb_got_chats
)
- def got_chats(self, chats):
+ def _cb_got_chats(self, chats):
logger.debug("skype has %d chat(s) total", len(chats))
- # work out which ones are 'new'
- startkey=['skype/chat', self.account.details['_id']]
- # oh for https://issues.apache.org/jira/browse/COUCHDB-194 to be fixed...
- endkey=['skype/chat', self.account.details['_id'] + 'ZZZZZZZZZZZZZZZZZZ']
- # XXX - this isn't quite right - the properties of each chat may have
- # changed, so we need to re-process the ones we've seen. Later...
- get_db().openView('raindrop!messages!by', 'by_storage',
- startkey=startkey, endkey=endkey,
- ).addCallback(self.got_seen_chats, chats)
+ dl = []
+ # *sob* - I'd so like to work out the correct pattern for this.
+ # Using a DeferredList with > xxx elts appears to cause:
+ # | File "...\twisted\internet\selectreactor.py", line 40, in win32select
+ # | r, w, e = select.select(r, w, w, timeout)
+ # | exceptions.ValueError: too many file descriptors in select()
+ # Tada - DeferredSemaphore's to the rescue! See
+ # http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html
+ dl = []
+ sem = defer.DeferredSemaphore(max_run)
+ for chat in chats[:5]: ###################################################################
+ dl.append(sem.run(self._cb_maybe_process_chat, chat))
+ # and also process the messages in each chat.
+ dl.append(sem.run(self._cb_process_chat_messages, chat))
- def got_seen_chats(self, rows, all_chats):
- seen_chats = set([r['key'][2] for r in rows])
- need = [chat for chat in all_chats if chat.Name not in seen_chats]
- logger.info("Skype has %d chats(s), %d of which we haven't seen",
- len(all_chats), len(need))
- # Is a 'DeferredList' more appropriate here?
- d = defer.Deferred()
- for chat in need:
- d.addCallback(self.got_chat, chat)
- # But *every* chat must have its messages processed, new and old.
- for chat in all_chats:
- d.addCallback(self.start_processing_messages, chat)
+ return defer.DeferredList(dl).addCallback(self.finished)
- # and when this deferred chain completes, this account is complete.
- d.addCallback(self.finished)
-
- return d.callback(None)
-
- def finished(self, result):
- return self.conductor.on_synch_finished(self.account, result)
+ def _cb_maybe_process_chat(self, chat):
+ logger.debug("seeing if skype chat %r exists", chat.Name)
+ return self.doc_model.open_document(self.get_docid_for_chat(chat),
+ ).addCallback(self._cb_process_chat, chat)
- def got_chat(self, result, chat):
- logger.debug("starting processing of chat '%s'", chat.Name)
- # make a 'deferred list' to fetch each property one at a time.
- ds = [threads.deferToThread(chat._Property, p)
- for p, _ in CHAT_PROPS]
-
- return defer.DeferredList(ds
- ).addCallback(self.got_chat_props, chat)
+ def _cb_process_chat(self, existing_doc, chat):
+ if existing_doc is None:
+ logger.info("Creating new skype chat %r", chat.Name)
+ # make a 'deferred list' to fetch each property one at a time.
+ ds = [threads.deferToThread(chat._Property, p)
+ for p, _ in CHAT_PROPS]
+ return defer.DeferredList(ds
+ ).addCallback(self._cb_got_chat_props, chat)
+ else:
+ logger.debug("Skipping skype chat %r - already exists", chat.Name)
+ # we are done.
- def got_chat_props(self, results, chat):
- # create the couch document for the chat itself.
- doc = dict(
- type='rawMessage',
- subtype='skype/chat',
- account_id=self.account.details['_id'],
- storage_key=chat.Name
- )
+ def _cb_got_chat_props(self, results, chat):
+ logger.debug("got chat %r properties: %s", chat.Name, results)
+ docid = self.get_docid_for_chat(chat)
+ doc ={}
for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
if ok:
doc['skype_'+name.lower()] = simple_convert(val, typ)
# 'Name' is a special case that doesn't come via a prop. We use
# 'chatname' as that is the equiv attr on the messages themselves.
doc['skype_chatname'] = chat.Name
- return get_db().saveDoc(doc
- ).addCallback(self.saved_chat, chat
- )
+ return self.doc_model.create_raw_document(docid, doc,
+ 'proto/skype-chat',
+ self.account
+ )
- def saved_chat(self, result, chat):
- logger.debug("Finished processing of new chat '%s'", chat.Name)
- # nothing else to do for the chat (the individual messages are
- # processed by a different path; we are here only for new ones...
+ def finished(self, result):
+ return self.conductor.on_synch_finished(self.account, result)
- def start_processing_messages(self, result, chat):
+ # Processing the messages for each chat...
+ def _cb_process_chat_messages(self, chat):
# Get each of the messages in the chat
return threads.deferToThread(chat._GetMessages
- ).addCallback(self.got_messages, chat,
+ ).addCallback(self._cb_got_messages, chat,
)
- def got_messages(self, messages, chat):
+ def _cb_got_messages(self, messages, chat):
logger.debug("chat '%s' has %d message(s) total; looking for new ones",
chat.Name, len(messages))
- startkey=['skype/message', self.account.details['_id'], [chat.Name, 0]]
- endkey=['skype/message', self.account.details['_id'], [chat.Name, 4000000000]]
- return get_db().openView('raindrop!messages!by', 'by_storage',
- startkey=startkey, endkey=endkey,
- ).addCallback(self.got_seen_messages, messages, chat)
-
- def got_seen_messages(self, rows, messages, chat):
- seen_ids = set([r['key'][2][1] for r in rows])
- need = [msg for msg in messages if msg._Id not in seen_ids]
- logger.info("Chat '%s' has %d messages, %d of which we haven't seen",
- chat.Name, len(messages), len(need))
-
+ # See *sob* above...
dl = []
- for i, msg in enumerate(need):
- d = defer.Deferred()
- d.addCallback(self.got_new_msg, chat, msg, i)
- dl.append(d)
- d.callback(None)
+ sem = defer.DeferredSemaphore(max_run)
+ for msg in messages:
+ dl.append(sem.run(self._cb_check_message, chat, msg))
return defer.DeferredList(dl)
- def got_new_msg(self, result, chat, msg, i):
- logger.debug("Processing message %d from chat '%s'", i, chat.Name)
- # make a 'deferred list' to fetch each property one at a time.
- ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
- return defer.DeferredList(ds
- ).addCallback(self.got_msg_props, chat, msg)
+ def _cb_check_message(self, chat, msg):
+ logger.debug("seeing if message %r exists (in chat %r)",
+ msg._Id, chat.Name)
+ return self.doc_model.open_document(self.get_docid_for_msg(msg)
+ ).addCallback(self._cb_maybe_process_message, chat, msg)
- def got_msg_props(self, results, chat, msg):
- # create the couch document for the chat message itself.
- doc = dict(
- type='rawMessage',
- subtype='skype/message',
- account_id=self.account.details['_id'],
- storage_key=[chat.Name, msg._Id]
- )
+ def _cb_maybe_process_message(self, existing_doc, chat, msg):
+ if existing_doc is None:
+ logger.debug("New skype message %d", msg._Id)
+ # make a 'deferred list' to fetch each property one at a time.
+ ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+ return defer.DeferredList(ds
+ ).addCallback(self._cb_got_msg_props, chat, msg)
+ else:
+ logger.debug("already have raw doc for msg %r; skipping", msg._Id)
+
+ def _cb_got_msg_props(self, results, chat, msg):
+ doc = {}
for (name, typ), (ok, val) in zip(MSG_PROPS, results):
if ok:
doc['skype_'+name.lower()] = simple_convert(val, typ)
- # The 'Id' attribute doesn't come via a property.
- doc['skype_msgid'] = msg._Id
+ # we include the skype username with the ID as they are unique per user.
+ docid = self.get_docid_for_msg(msg)
+ return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
+ self.account
+ )
+
- return get_db().saveDoc(doc
- ).addCallback(self.saved_message, chat
- )
-
- def saved_message(self, result, chat):
- logger.debug('message processing complete')
+# A 'converter' - takes a proto/skype-msg as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appopriate)
+class SkypeConverter(base.ConverterBase):
+ def convert(self, doc):
+ return {'from': ['skype', doc['skype_from_handle']],
+ 'subject': 'I gotta get this from the chat itself :(',
+ 'body': doc['skype_body']}
class SkypeAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
- def startSync(self, conductor):
- TwistySkype(self, conductor).attach()
+ def startSync(self, conductor, doc_model):
+ TwistySkype(self, conductor, doc_model).attach()
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -1,24 +1,21 @@
# This is an implementation of a 'test' protocol.
import logging
from twisted.internet import defer, error
-
-
logger = logging.getLogger(__name__)
from ...proc import base
-from ...model import get_doc_model
class TestMessageProvider(object):
- def __init__(self, account, conductor):
+ def __init__(self, account, conductor, doc_model):
self.account = account
self.conductor = conductor
- self.doc_model = get_doc_model()
+ self.doc_model = doc_model
def attach(self):
logger.info("preparing to synch test messages...")
# Is a 'DeferredList' more appropriate here?
d = defer.Deferred()
num_docs = int(self.account.details.get('num_test_docs', 5))
logger.info("Creating %d test documents", num_docs)
for i in xrange(num_docs):
@@ -30,32 +27,32 @@ class TestMessageProvider(object):
logger.debug("seeing if message with ID %d exists", i)
return self.doc_model.open_document("test.message.%d" % i,
).addCallback(self.process_test_message, i)
def process_test_message(self, existing_doc, doc_num):
if existing_doc is None:
logger.info("Creating new test message with ID %d", doc_num)
doc = dict(
- _id="test.message.%d" % doc_num,
storage_key=doc_num,
)
- return self.doc_model.create_raw_document(doc, 'proto/test',
+ did = "test.message.%d" % doc_num
+ return self.doc_model.create_raw_document(did, doc, 'proto/test',
self.account
).addCallback(self.saved_message, doc)
else:
logger.info("Skipping test message with ID %d - already exists",
doc_num)
# we are done.
def finished(self, result):
self.conductor.on_synch_finished(self.account, result)
def saved_message(self, result, doc):
- logger.debug("Finished saving test message (%s)", doc['_id'])
+ logger.debug("Finished saving test message %r", result)
# done
# A 'converter' - takes a proto/test as input and creates a
# 'raw/message/rfc822' as output.
class TestConverter(base.ConverterBase):
def convert(self, doc):
me = doc['storage_key']
headers = """\
@@ -70,10 +67,10 @@ Subject: This is test document %(storage
return new_doc
class TestAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
- def startSync(self, conductor):
- TestMessageProvider(self, conductor).attach()
+ def startSync(self, conductor, doc_model):
+ TestMessageProvider(self, conductor, doc_model).attach()
--- a/server/python/junius/sync.py
+++ b/server/python/junius/sync.py
@@ -1,21 +1,22 @@
import logging
from twisted.internet import reactor, defer
from twisted.python.failure import Failure
import twisted.web.error
import paisley
-import junius.proto as proto
-from junius.model import get_db
+from . import proto as proto
+from .model import get_db
logger = logging.getLogger(__name__)
_conductor = None
+from .model import get_doc_model
def get_conductor(options=None):
global _conductor
if _conductor is None:
proto.init_protocols()
_conductor = SyncConductor(options)
else:
assert options is None, 'can only set this at startup'
@@ -44,27 +45,28 @@ class SyncConductor(object):
#self.reactor.stop()
def _getAllAccounts(self):
return self.db.openView('raindrop!accounts!all', 'all'
).addCallback(self._gotAllAccounts
).addErrback(self._ohNoes)
def _gotAllAccounts(self, rows, *args, **kwargs):
+ doc_model = get_doc_model()
self.log.info("Have %d accounts to synch", len(rows))
for row in rows:
account_details = row['value']
kind = account_details['kind']
self.log.debug("Found account using protocol %s", kind)
if not self.options.protocols or kind in self.options.protocols:
if kind in proto.protocols:
account = proto.protocols[kind](self.db, account_details)
self.log.info('Starting sync of %s account: %s',
kind, account_details.get('name', '(un-named)'))
- account.startSync(self)
+ account.startSync(self, doc_model)
self.active_accounts.append(account)
else:
self.log.error("Don't know what to do with account kind: %s", kind)
else:
self.log.info("Skipping account - protocol '%s' is disabled", kind)
def sync(self, whateva=None):
return self._getAllAccounts()