get skype messages passing through the pipeline to a real message twisty
authorMark Hammond <mhammond@skippinet.com.au>
Fri, 20 Mar 2009 12:40:53 +1100
branchtwisty
changeset 107 807bf2299712d979768f1c57e0d56e3c7c6ad50b
parent 106 e122e868b8750e05f0f026063ce71e86f84c1368
child 108 55279ec07fb24e5ba4b5af648622dd11c9d4ce34
push id1
push userroot
push dateWed, 08 Apr 2009 01:46:05 +0000
get skype messages passing through the pipeline to a real message
server/python/junius/pipeline.py
server/python/junius/proto/skype.py
server/python/junius/proto/test/__init__.py
server/python/junius/sync.py
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -5,39 +5,49 @@ import logging
 
 logger = logging.getLogger(__name__)
 
 # Simple forward chaining.
 chain = [
     # trom_type, to_type, transformer)
     ('proto/test',         'raw/message/rfc822',
                            'junius.proto.test.TestConverter'),
-    #('proto/skype', 'raw/message/email', 'core.proto.skype'),
+    # skype goes directly to 'message' for now...
+    ('proto/skype-msg',    'message',
+                           'junius.proto.skype.SkypeConverter'),
+    # skype-chat is 'terminal'
+    ('proto/skype-chat', None, None),
     #('proto/imap', 'raw/message/rfc822', 'core.proto.imap'),
     ('raw/message/rfc822', 'raw/message/email',
                            'junius.ext.message.rfc822.RFC822Converter'),
     ('raw/message/email',  'message',
                            'junius.ext.message.email.EmailConverter'),
+    # message is 'terminal'
+    ('message', None, None),
 ]
 
 
 class Pipeline(object):
     def __init__(self, doc_model):
         self.doc_model = doc_model
         self.forward_chain = {}
         for from_type, to_type, xformname in chain:
-            root, tail = xformname.rsplit('.', 1)
-            try:
-                mod = __import__(root, fromlist=[tail])
-                klass = getattr(mod, tail)
-                inst = klass(doc_model)
-            except:
-                logger.exception("Failed to load extension %r", xformname)
+            if xformname:
+                root, tail = xformname.rsplit('.', 1)
+                try:
+                    mod = __import__(root, fromlist=[tail])
+                    klass = getattr(mod, tail)
+                    inst = klass(doc_model)
+                except:
+                    logger.exception("Failed to load extension %r", xformname)
+                else:
+                    self.forward_chain[from_type] = (to_type, inst)
             else:
-                self.forward_chain[from_type] = (to_type, inst)
+                assert not to_type, 'no xformname must mean to no_type'
+                self.forward_chain[from_type] = None
 
     def start(self):
         return defer.maybeDeferred(self.process_all_documents
                     ).addCallback(self._cb_maybe_reprocess_all_documents)
 
     def process_all_documents(self):
         # check *every* doc in the DB.  This exists until we can derive
         # a workqueue based on views.
@@ -63,26 +73,30 @@ class Pipeline(object):
                 self.doc_model.get_last_ext_for_document(did).addCallback(
                     self._cb_got_doc_last_ext, did)
                 )
         return defer.DeferredList(dl)
 
     def _cb_got_doc_last_ext(self, ext_info, rootdocid):
         last_ext, docid = ext_info
         if last_ext is None:
-            logger.debug("Document '%s' doesn't appear to be a message; skipping",
+            logger.debug("Document '%s' doesn't appear to be a message; skipping",
                          rootdocid)
             return None
+        logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
         try:
             xform_info = self.forward_chain[last_ext]
         except KeyError:
             logger.warning("Can't find transformer for message type %r - skipping %r",
                            last_ext, docid)
             return None
-        logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
+        if xform_info is None:
+            logger.debug("Document %r is already at its terminal type of %r",
+                         rootdocid, last_ext)
+            return None
         return self.doc_model.open_document(docid
                     ).addCallback(self._cb_got_last_doc, rootdocid, xform_info
                     )
 
     def _cb_got_last_doc(self, doc, rootdocid, xform_info):
         assert doc is not None, "failed to locate the doc for %r" % rootdocid
         dest_type, xformer = xform_info
         logger.debug("calling %r to create a %s from %s", xformer, dest_type,
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -9,22 +9,23 @@ import time
 import re
 import logging
 from urllib2 import urlopen
 
 import twisted.python.log
 from twisted.internet import defer, threads
 
 from ..proc import base
-from ..model import get_db
 
 import Skype4Py
 
 logger = logging.getLogger(__name__)
 
+max_run = 4 # for the deferred semaphore...
+
 # These are the raw properties we fetch from skype.
 CHAT_PROPS = [
     ('ACTIVEMEMBERS',   list),
     ('ACTIVITY_TIMESTAMP', float),
     ('ADDER', unicode),
     ('APPLICANTS', list),
     #'BLOB'??
     ('BOOKMARKED', bool),
@@ -67,156 +68,149 @@ def simple_convert(str_val, typ):
         return str_val.split()
     if typ is bool:
         return str_val == "TRUE"
     # all the rest are callables which 'do the right thing'
     return typ(str_val)
 
 
 class TwistySkype(object):
-    def __init__(self, account, conductor):
+    def __init__(self, account, conductor, doc_model):
         self.account = account
         self.conductor = conductor
+        self.doc_model = doc_model
         self.skype = Skype4Py.Skype()
 
+    def get_docid_for_chat(self, chat):
+        return "skypechat-" + chat.Name.encode('utf8') # hrmph!
+
+    def get_docid_for_msg(self, msg):
+        return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
+
     def attach(self):
         logger.info("attaching to skype...")
         d = threads.deferToThread(self.skype.Attach)
         d.addCallback(self.attached)
         return d
 
     def attached(self, status):
         logger.info("attached to skype - getting chats")
         return threads.deferToThread(self.skype._GetChats
-                    ).addCallback(self.got_chats
+                    ).addCallback(self._cb_got_chats
                     )
 
-    def got_chats(self, chats):
+    def _cb_got_chats(self, chats):
         logger.debug("skype has %d chat(s) total", len(chats))
-        # work out which ones are 'new'
-        startkey=['skype/chat', self.account.details['_id']]
-        # oh for https://issues.apache.org/jira/browse/COUCHDB-194 to be fixed...
-        endkey=['skype/chat', self.account.details['_id'] + 'ZZZZZZZZZZZZZZZZZZ']
-        # XXX - this isn't quite right - the properties of each chat may have
-        # changed, so we need to re-process the ones we've seen.  Later...
-        get_db().openView('raindrop!messages!by', 'by_storage',
-                          startkey=startkey, endkey=endkey,
-            ).addCallback(self.got_seen_chats, chats)
+        dl = []
+        # *sob* - I'd so like to work out the correct pattern for this.
+        # Using a DeferredList with > xxx elts appears to cause:
+        # | File "...\twisted\internet\selectreactor.py", line 40, in win32select
+        # |  r, w, e = select.select(r, w, w, timeout)
+        # | exceptions.ValueError: too many file descriptors in select()
+        # Tada - DeferredSemaphore's to the rescue!  See
+        # http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html
+        dl = []
+        sem = defer.DeferredSemaphore(max_run)
+        for chat in chats[:5]: ###################################################################
+            dl.append(sem.run(self._cb_maybe_process_chat, chat))
+            # and also process the messages in each chat.
+            dl.append(sem.run(self._cb_process_chat_messages, chat))
 
-    def got_seen_chats(self, rows, all_chats):
-        seen_chats = set([r['key'][2] for r in rows])
-        need = [chat for chat in all_chats if chat.Name not in seen_chats]
-        logger.info("Skype has %d chats(s), %d of which we haven't seen",
-                     len(all_chats), len(need))
-        # Is a 'DeferredList' more appropriate here?
-        d = defer.Deferred()
-        for chat in need:
-            d.addCallback(self.got_chat, chat)
-        # But *every* chat must have its messages processed, new and old.
-        for chat in all_chats:
-            d.addCallback(self.start_processing_messages, chat)
+        return defer.DeferredList(dl).addCallback(self.finished)
 
-        # and when this deferred chain completes, this account is complete.
-        d.addCallback(self.finished)
-
-        return d.callback(None)
-
-    def finished(self, result):
-      return self.conductor.on_synch_finished(self.account, result)
+    def _cb_maybe_process_chat(self, chat):
+        logger.debug("seeing if skype chat %r exists", chat.Name)
+        return self.doc_model.open_document(self.get_docid_for_chat(chat),
+                        ).addCallback(self._cb_process_chat, chat)
 
-    def got_chat(self, result, chat):
-        logger.debug("starting processing of chat '%s'", chat.Name)
-        # make a 'deferred list' to fetch each property one at a time.
-        ds = [threads.deferToThread(chat._Property, p)
-              for p, _ in CHAT_PROPS]
-
-        return defer.DeferredList(ds
-                    ).addCallback(self.got_chat_props, chat)
+    def _cb_process_chat(self, existing_doc, chat):
+        if existing_doc is None:
+            logger.info("Creating new skype chat %r", chat.Name)
+            # make a 'deferred list' to fetch each property one at a time.
+            ds = [threads.deferToThread(chat._Property, p)
+                  for p, _ in CHAT_PROPS]
+            return defer.DeferredList(ds
+                        ).addCallback(self._cb_got_chat_props, chat)
+        else:
+            logger.debug("Skipping skype chat %r - already exists", chat.Name)
+            # we are done.
 
-    def got_chat_props(self, results, chat):
-        # create the couch document for the chat itself.
-        doc = dict(
-          type='rawMessage',
-          subtype='skype/chat',
-          account_id=self.account.details['_id'],
-          storage_key=chat.Name
-          )
+    def _cb_got_chat_props(self, results, chat):
+        logger.debug("got chat %r properties: %s", chat.Name, results)
+        docid = self.get_docid_for_chat(chat)
+        doc ={}
         for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
 
         # 'Name' is a special case that doesn't come via a prop.  We use
         # 'chatname' as that is the equiv attr on the messages themselves.
         doc['skype_chatname'] = chat.Name
-        return get_db().saveDoc(doc
-                ).addCallback(self.saved_chat, chat
-                )
+        return self.doc_model.create_raw_document(docid, doc,
+                                                  'proto/skype-chat',
+                                                  self.account
+                    )
 
-    def saved_chat(self, result, chat):
-        logger.debug("Finished processing of new chat '%s'", chat.Name)
-        # nothing else to do for the chat (the individual messages are
-        # processed by a different path; we are here only for new ones...
+    def finished(self, result):
+        return self.conductor.on_synch_finished(self.account, result)
 
-    def start_processing_messages(self, result, chat):
+    # Processing the messages for each chat...
+    def _cb_process_chat_messages(self, chat):
         # Get each of the messages in the chat
         return threads.deferToThread(chat._GetMessages
-                    ).addCallback(self.got_messages, chat,
+                    ).addCallback(self._cb_got_messages, chat,
                     )
 
-    def got_messages(self, messages, chat):
+    def _cb_got_messages(self, messages, chat):
         logger.debug("chat '%s' has %d message(s) total; looking for new ones",
                      chat.Name, len(messages))
-        startkey=['skype/message', self.account.details['_id'], [chat.Name, 0]]
-        endkey=['skype/message', self.account.details['_id'], [chat.Name, 4000000000]]
-        return get_db().openView('raindrop!messages!by', 'by_storage',
-                                 startkey=startkey, endkey=endkey,
-                    ).addCallback(self.got_seen_messages, messages, chat)
-
-    def got_seen_messages(self, rows, messages, chat):
-        seen_ids = set([r['key'][2][1] for r in rows])
-        need = [msg for msg in messages if msg._Id not in seen_ids]
-        logger.info("Chat '%s' has %d messages, %d of which we haven't seen",
-                     chat.Name, len(messages), len(need))
-        
+        # See *sob* above...
         dl = []
-        for i, msg in enumerate(need):
-            d = defer.Deferred()
-            d.addCallback(self.got_new_msg, chat, msg, i)
-            dl.append(d)
-            d.callback(None)
+        sem = defer.DeferredSemaphore(max_run)
+        for msg in messages:
+            dl.append(sem.run(self._cb_check_message, chat, msg))
         return defer.DeferredList(dl)
 
-    def got_new_msg(self, result, chat, msg, i):
-        logger.debug("Processing message %d from chat '%s'", i, chat.Name)
-        # make a 'deferred list' to fetch each property one at a time.
-        ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
-        return defer.DeferredList(ds
-                    ).addCallback(self.got_msg_props, chat, msg)
+    def _cb_check_message(self, chat, msg):
+        logger.debug("seeing if message %r exists (in chat %r)",
+                     msg._Id, chat.Name)
+        return self.doc_model.open_document(self.get_docid_for_msg(msg)
+                        ).addCallback(self._cb_maybe_process_message, chat, msg)
 
-    def got_msg_props(self, results, chat, msg):
-        # create the couch document for the chat message itself.
-        doc = dict(
-          type='rawMessage',
-          subtype='skype/message',
-          account_id=self.account.details['_id'],
-          storage_key=[chat.Name, msg._Id]
-          )
+    def _cb_maybe_process_message(self, existing_doc, chat, msg):
+        if existing_doc is None:
+            logger.debug("New skype message %d", msg._Id)
+            # make a 'deferred list' to fetch each property one at a time.
+            ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+            return defer.DeferredList(ds
+                        ).addCallback(self._cb_got_msg_props, chat, msg)
+        else:
+            logger.debug("already have raw doc for msg %r; skipping", msg._Id)
+
+    def _cb_got_msg_props(self, results, chat, msg):
+        doc = {}
         for (name, typ), (ok, val) in zip(MSG_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
-        # The 'Id' attribute doesn't come via a property.
-        doc['skype_msgid'] = msg._Id
+        # we include the skype username with the ID as they are unique per user.
+        docid = self.get_docid_for_msg(msg)
+        return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
+                                                  self.account
+                    )
+
 
-        return get_db().saveDoc(doc
-                ).addCallback(self.saved_message, chat
-                )
-
-    def saved_message(self, result, chat):
-        logger.debug('message processing complete')
+# A 'converter' - takes a proto/skype-msg as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appopriate)
+class SkypeConverter(base.ConverterBase):
+    def convert(self, doc):
+        return {'from': ['skype', doc['skype_from_handle']],
+                'subject': 'I gotta get this from the chat itself :(',
+                'body': doc['skype_body']}
 
 
 class SkypeAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
-  def startSync(self, conductor):
-    TwistySkype(self, conductor).attach()
+  def startSync(self, conductor, doc_model):
+    TwistySkype(self, conductor, doc_model).attach()
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -1,24 +1,21 @@
 # This is an implementation of a 'test' protocol.
 import logging
 from twisted.internet import defer, error
 
-
-
 logger = logging.getLogger(__name__)
 
 from ...proc import base
-from ...model import get_doc_model
 
 class TestMessageProvider(object):
-    def __init__(self, account, conductor):
+    def __init__(self, account, conductor, doc_model):
         self.account = account
         self.conductor = conductor
-        self.doc_model = get_doc_model()
+        self.doc_model = doc_model
 
     def attach(self):
         logger.info("preparing to synch test messages...")
         # Is a 'DeferredList' more appropriate here?
         d = defer.Deferred()
         num_docs = int(self.account.details.get('num_test_docs', 5))
         logger.info("Creating %d test documents", num_docs)
         for i in xrange(num_docs):
@@ -30,32 +27,32 @@ class TestMessageProvider(object):
         logger.debug("seeing if message with ID %d exists", i)
         return self.doc_model.open_document("test.message.%d" % i,
                         ).addCallback(self.process_test_message, i)
 
     def process_test_message(self, existing_doc, doc_num):
         if existing_doc is None:
             logger.info("Creating new test message with ID %d", doc_num)
             doc = dict(
-              _id="test.message.%d" % doc_num,
               storage_key=doc_num,
               )
-            return self.doc_model.create_raw_document(doc, 'proto/test',
+            did = "test.message.%d" % doc_num
+            return self.doc_model.create_raw_document(did, doc, 'proto/test',
                                                       self.account
                         ).addCallback(self.saved_message, doc)
         else:
             logger.info("Skipping test message with ID %d - already exists",
                         doc_num)
             # we are done.
 
     def finished(self, result):
       self.conductor.on_synch_finished(self.account, result)
 
     def saved_message(self, result, doc):
-        logger.debug("Finished saving test message (%s)", doc['_id'])
+        logger.debug("Finished saving test message %r", result)
         # done
 
 # A 'converter' - takes a proto/test as input and creates a
 # 'raw/message/rfc822' as output.
 class TestConverter(base.ConverterBase):
     def convert(self, doc):
         me = doc['storage_key']
         headers = """\
@@ -70,10 +67,10 @@ Subject: This is test document %(storage
         return new_doc
 
 
 class TestAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
-  def startSync(self, conductor):
-    TestMessageProvider(self, conductor).attach()
+  def startSync(self, conductor, doc_model):
+    TestMessageProvider(self, conductor, doc_model).attach()
--- a/server/python/junius/sync.py
+++ b/server/python/junius/sync.py
@@ -1,21 +1,22 @@
 import logging
 
 from twisted.internet import reactor, defer
 from twisted.python.failure import Failure
 import twisted.web.error
 import paisley
 
-import junius.proto as proto
-from junius.model import get_db
+from . import proto as proto
+from .model import get_db
 
 logger = logging.getLogger(__name__)
 
 _conductor = None
+from .model import get_doc_model
 
 def get_conductor(options=None):
   global _conductor
   if _conductor is None:
     proto.init_protocols()
     _conductor = SyncConductor(options)
   else:
     assert options is None, 'can only set this at startup'
@@ -44,27 +45,28 @@ class SyncConductor(object):
     #self.reactor.stop()
 
   def _getAllAccounts(self):
     return self.db.openView('raindrop!accounts!all', 'all'
       ).addCallback(self._gotAllAccounts
       ).addErrback(self._ohNoes)
 
   def _gotAllAccounts(self, rows, *args, **kwargs):
+    doc_model = get_doc_model()
     self.log.info("Have %d accounts to synch", len(rows))
     for row in rows:
       account_details = row['value']
       kind = account_details['kind']
       self.log.debug("Found account using protocol %s", kind)
       if not self.options.protocols or kind in self.options.protocols:
         if kind in proto.protocols:
           account = proto.protocols[kind](self.db, account_details)
           self.log.info('Starting sync of %s account: %s',
                         kind, account_details.get('name', '(un-named)'))
-          account.startSync(self)
+          account.startSync(self, doc_model)
           self.active_accounts.append(account)
         else:
           self.log.error("Don't know what to do with account kind: %s", kind)
       else:
           self.log.info("Skipping account - protocol '%s' is disabled", kind)
 
   def sync(self, whateva=None):
     return self._getAllAccounts()