get the skype protocol using a view to optimize no of requests twisty
author Mark Hammond <mhammond@skippinet.com.au>
Wed, 25 Mar 2009 19:14:16 +1100
branch twisty
changeset 122 6c69a445aee2a2da7ac9f65deaf819942217f47e
parent 121 6675d659f527941a70b1b9b6bfa06c3ae408440c
child 123 ebc4050e4049b80671ed7aedc17177033c3f8e44
push id 1
push user root
push date Wed, 08 Apr 2009 01:46:05 +0000
get the skype protocol using a view to optimize no of requests
schema/proto/skype/seen-map.js
server/python/junius/proto/skype.py
new file mode 100644
--- /dev/null
+++ b/schema/proto/skype/seen-map.js
@@ -0,0 +1,7 @@
+function(doc) {
+    if (doc.type && doc.type=='proto/skype-chat') {
+        emit([doc.skype_chatname, null], null);
+    } else if (doc.type && doc.type=='proto/skype-msg') {
+        emit([doc.skype_chatname, doc.skype_id], null);
+    }
+}
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,30 +1,23 @@
 #!/usr/bin/env python
 '''
 Fetch skype contacts and chats.
 '''
-
-import base64, datetime
-import pprint
-import time
 import logging
-from urllib2 import urlopen
 
 import twisted.python.log
 from twisted.internet import defer, threads
 
 from ..proc import base
 
 import Skype4Py
 
 logger = logging.getLogger(__name__)
 
-max_run = 4 # for the deferred semaphore...
-
 # These are the raw properties we fetch from skype.
 CHAT_PROPS = [
     ('ACTIVEMEMBERS',   list),
     ('ACTIVITY_TIMESTAMP', float),
     ('ADDER', unicode),
     ('APPLICANTS', list),
     #'BLOB'??
     ('BOOKMARKED', bool),
@@ -73,16 +66,19 @@ def simple_convert(str_val, typ):
 
 class TwistySkype(object):
     def __init__(self, account, conductor, doc_model):
         self.account = account
         self.conductor = conductor
         self.doc_model = doc_model
         self.skype = Skype4Py.Skype()
 
+    def finished(self, result):
+        return self.conductor.on_synch_finished(self.account, result)
+
     def get_docid_for_chat(self, chat):
         return "skypechat-" + chat.Name.encode('utf8') # hrmph!
 
     def get_docid_for_msg(self, msg):
         return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
 
     def attach(self):
         logger.info("attaching to skype...")
@@ -93,58 +89,100 @@ class TwistySkype(object):
     def attached(self, status):
         logger.info("attached to skype - getting chats")
         return threads.deferToThread(self.skype._GetChats
                     ).addCallback(self._cb_got_chats
                     )
 
     def _cb_got_chats(self, chats):
         logger.debug("skype has %d chat(s) total", len(chats))
-        # chats is a tuple...
+        # 'chats' is a tuple
         self.remaining_chats = list(chats)
         return self.process_next_chat()
 
+
     def process_next_chat(self):
         if not self.remaining_chats:
             logger.debug("finished processing chats")
             self.finished(None) # not sure this is the right place...
             return
 
         chat = self.remaining_chats.pop()
-        d = defer.Deferred()
-        # process the chat object itself if its not seen before.
-        d.addCallback(self._cb_maybe_process_chat, chat)
-        # process all messages in all chats - there may be new ones even if
-        # we previously have seen the chat itself.
-        d.addCallback(self._cb_process_chat_messages, chat)
-        d.addCallback(self._cb_processed_chat)
-        d.callback(None)
-        return d
+        # fetch all the messages (future optimization; all we need are the
+        # IDs, but we suffer from creating an instance wrapper for each item.
+        # Sadly the skype lib doesn't offer a clean way of doing this.)
+        return threads.deferToThread(chat._GetMessages
+                    ).addCallback(self._cb_got_messages, chat,
+                    )
+
+    def _cb_got_messages(self, messages, chat):
+        logger.debug("chat '%s' has %d message(s) total; looking for new ones",
+                     chat.Name, len(messages))
+
+        # Finally got all the messages for this chat.  Execute a view to
+        # determine which we have seen (note that we obviously could just
+        # fetch the *entire* view once - but we do it this way on purpose
+        # to ensure we remain scalable...)
+        return self.doc_model.db.openView('raindrop!proto!skype', 'seen',
+                                          startkey=[chat.Name],
+                                          endkey=[chat.Name, {}]
+                    ).addCallback(self._cb_got_seen, chat, messages
+                    )
 
-    def _cb_processed_chat(self, result):
-        # the 'pattern' we copied from the imap sample code calls for us
-        # to recurse - but that can recurse too far...
-        return self.conductor.reactor.callLater(0, self.process_next_chat)
+    def _cb_got_seen(self, result, chat, messages):
+        self.msgs_by_id = dict((m._Id, m) for m in messages)
+        self.current_chat = chat
+        chatname = chat.Name
+        # The view gives us a list of [chat_name, msg_id], where msg_id is
+        # None if we've seen the chat itself.  Create a set of messages we
+        # *haven't* seen - including the [chat_name, None] entry if applic.
+        all_keys = [(chatname, None)]
+        all_keys.extend((chatname, mid) for mid in self.msgs_by_id.keys())
+        seen_chats = set([tuple(row['key']) for row in result])
+        self.remaining_items = set(all_keys)-set(seen_chats)
+        # we could just process the empty list as normal, but the logging of
+        # an info when we do have items is worthwhile...
+        if not self.remaining_items:
+            logger.debug("Chat %r has no new items to process", chatname)
+            self.msgs_by_id = None
+            return self.process_next_chat()
+        # we have something to do...
+        logger.info("Chat %r has %d items to process", chatname,
+                    len(self.remaining_items))
+        logger.debug("we've already seen %d items from this chat",
+                     len(seen_chats))
+        return self.process_next_item()
 
-    def _cb_maybe_process_chat(self, result, chat):
-        logger.debug("seeing if skype chat %r exists", chat.Name)
-        return self.doc_model.open_document(self.get_docid_for_chat(chat),
-                        ).addCallback(self._cb_process_chat, chat)
+    def process_next_item(self):
+        if not self.remaining_items:
+            self.msgs_by_id = None
+            logger.debug("finished processing messages for this chat")
+            return self.conductor.reactor.callLater(0, self.process_next_chat)
 
-    def _cb_process_chat(self, existing_doc, chat):
-        if existing_doc is None:
-            logger.info("Creating new skype chat %r", chat.Name)
+        chatname, msgid = self.remaining_items.pop()
+        chat = self.current_chat
+        if msgid is None:
+            # we haven't seen the chat itself - do that.
+            logger.debug("Creating new skype chat %r", chat.Name)
             # make a 'deferred list' to fetch each property one at a time.
             ds = [threads.deferToThread(chat._Property, p)
                   for p, _ in CHAT_PROPS]
-            return defer.DeferredList(ds
+            ret = defer.DeferredList(ds
                         ).addCallback(self._cb_got_chat_props, chat)
         else:
-            logger.debug("Skipping skype chat %r - already exists", chat.Name)
-            # we are done.
+            msg = self.msgs_by_id[msgid]
+            # A new msg in this chat.
+            logger.debug("New skype message %d", msg._Id)
+            # make a 'deferred list' to fetch each property one at a time.
+            ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+            ret = defer.DeferredList(ds
+                        ).addCallback(self._cb_got_msg_props, chat, msg)
+
+        ret.addCallback(self._cb_doc_saved)
+        return ret
 
     def _cb_got_chat_props(self, results, chat):
         logger.debug("got chat %r properties: %s", chat.Name, results)
         docid = self.get_docid_for_chat(chat)
         doc ={}
         for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
@@ -152,74 +190,32 @@ class TwistySkype(object):
         # 'Name' is a special case that doesn't come via a prop.  We use
         # 'chatname' as that is the equiv attr on the messages themselves.
         doc['skype_chatname'] = chat.Name
         return self.doc_model.create_raw_document(docid, doc,
                                                   'proto/skype-chat',
                                                   self.account
                     )
 
-    def finished(self, result):
-        return self.conductor.on_synch_finished(self.account, result)
-
-    # Processing the messages for each chat...
-    def _cb_process_chat_messages(self, result, chat):
-        # Get each of the messages in the chat
-        return threads.deferToThread(chat._GetMessages
-                    ).addCallback(self._cb_got_messages, chat,
-                    )
-
-    def _cb_got_messages(self, messages, chat):
-        logger.debug("chat '%s' has %d message(s) total; looking for new ones",
-                     chat.Name, len(messages))
-        self.remaining_messages = list(messages) # message is a tuple.
-        self.current_chat = chat
-        return self.process_next_message()
-
-    def process_next_message(self):
-        if not self.remaining_messages:
-            logger.debug("finished processing messages for this chat")
-            return
-
-        msg = self.remaining_messages.pop()
-        chat = self.current_chat
-        logger.debug("seeing if message %r exists (in chat %r)",
-                     msg._Id, chat.Name)
-        return self.doc_model.open_document(self.get_docid_for_msg(msg)
-                        ).addCallback(self._cb_maybe_process_message, chat, msg
-                        ).addCallback(self._cb_processed_message
-                        )
-
-    def _cb_processed_message(self, result):
-        return self.conductor.reactor.callLater(0, self.process_next_message)
-
-    def _cb_maybe_process_message(self, existing_doc, chat, msg):
-        if existing_doc is None:
-            logger.debug("New skype message %d", msg._Id)
-            # make a 'deferred list' to fetch each property one at a time.
-            ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
-            return defer.DeferredList(ds
-                        ).addCallback(self._cb_got_msg_props, chat, msg)
-        else:
-            logger.debug("already have raw doc for msg %r; skipping", msg._Id)
-
     def _cb_got_msg_props(self, results, chat, msg):
         doc = {}
         for (name, typ), (ok, val) in zip(MSG_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
+        doc['skype_id'] = msg._Id
         # we include the skype username with the ID as they are unique per user.
         docid = self.get_docid_for_msg(msg)
         return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
                                                   self.account
-                    ).addCallback(self._cb_msg_saved
                     )
 
-    def _cb_msg_saved(self, result):
-        pass
+    def _cb_doc_saved(self, result):
+        # the 'pattern' we copied from the imap sample code calls for us
+        # to recurse - but that can recurse too far...
+        return self.conductor.reactor.callLater(0, self.process_next_item)
 
 # A 'converter' - takes a proto/skype-msg as input and creates a
 # 'message' as output (some other intermediate step might end up more
 # appopriate)
 class SkypeConverter(base.ConverterBase):
     def convert(self, doc):
         # We need to open the 'chat' for this Message.  Clearly this is
         # going to be inefficient...