move to a non-recursive twisted model, basic 'tags' support twisty
author Mark Hammond <mhammond@skippinet.com.au>
Mon, 23 Mar 2009 16:48:36 +1100
branch twisty
changeset 114 652cc177430ece2c1c7e026ea3c539dc301ae45b
parent 113 e07ebe3f1e17ed502d2dc052bc753b6146d33d73
child 115 20624eb48d8c2516b80c37697e70caba7b248a83
push id 1
push user root
push date Wed, 08 Apr 2009 01:46:05 +0000
move to a non-recursive twisted model, basic 'tags' support
server/python/junius/proto/skype.py
server/python/junius/proto/twitter.py
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,17 +1,16 @@
 #!/usr/bin/env python
 '''
 Fetch skype contacts and chats.
 '''
 
 import base64, datetime
 import pprint
 import time
-import re
 import logging
 from urllib2 import urlopen
 
 import twisted.python.log
 from twisted.internet import defer, threads
 
 from ..proc import base
 
@@ -94,34 +93,43 @@ class TwistySkype(object):
     def attached(self, status):
         logger.info("attached to skype - getting chats")
         return threads.deferToThread(self.skype._GetChats
                     ).addCallback(self._cb_got_chats
                     )
 
     def _cb_got_chats(self, chats):
         logger.debug("skype has %d chat(s) total", len(chats))
-        dl = []
-        # *sob* - I'd so like to work out the correct pattern for this.
-        # Using a DeferredList with > xxx elts appears to cause:
-        # | File "...\twisted\internet\selectreactor.py", line 40, in win32select
-        # |  r, w, e = select.select(r, w, w, timeout)
-        # | exceptions.ValueError: too many file descriptors in select()
-        # Tada - DeferredSemaphore's to the rescue!  See
-        # http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html
-        dl = []
-        sem = defer.DeferredSemaphore(max_run)
-        for chat in chats:
-            dl.append(sem.run(self._cb_maybe_process_chat, chat))
-            # and also process the messages in each chat.
-            dl.append(sem.run(self._cb_process_chat_messages, chat))
+        # chats is a tuple...
+        self.remaining_chats = list(chats)
+        return self.process_next_chat()
+
+    def process_next_chat(self):
+        if not self.remaining_chats:
+            logger.debug("finished processing chats")
+            self.finished(None) # not sure this is the right place...
+            return
 
-        return defer.DeferredList(dl).addCallback(self.finished)
+        chat = self.remaining_chats.pop()
+        d = defer.Deferred()
+        # process the chat object itself if its not seen before.
+        d.addCallback(self._cb_maybe_process_chat, chat)
+        # process all messages in all chats - there may be new ones even if
+        # we previously have seen the chat itself.
+        d.addCallback(self._cb_process_chat_messages, chat)
+        d.addCallback(self._cb_processed_chat)
+        d.callback(None)
+        return d
 
-    def _cb_maybe_process_chat(self, chat):
+    def _cb_processed_chat(self, result):
+        # the 'pattern' we copied from the imap sample code calls for us
+        # to recurse - but that can recurse too far...
+        return self.conductor.reactor.callLater(0, self.process_next_chat)
+
+    def _cb_maybe_process_chat(self, result, chat):
         logger.debug("seeing if skype chat %r exists", chat.Name)
         return self.doc_model.open_document(self.get_docid_for_chat(chat),
                         ).addCallback(self._cb_process_chat, chat)
 
     def _cb_process_chat(self, existing_doc, chat):
         if existing_doc is None:
             logger.info("Creating new skype chat %r", chat.Name)
             # make a 'deferred list' to fetch each property one at a time.
@@ -148,37 +156,45 @@ class TwistySkype(object):
                                                   'proto/skype-chat',
                                                   self.account
                     )
 
     def finished(self, result):
         return self.conductor.on_synch_finished(self.account, result)
 
     # Processing the messages for each chat...
-    def _cb_process_chat_messages(self, chat):
+    def _cb_process_chat_messages(self, result, chat):
         # Get each of the messages in the chat
         return threads.deferToThread(chat._GetMessages
                     ).addCallback(self._cb_got_messages, chat,
                     )
 
     def _cb_got_messages(self, messages, chat):
         logger.debug("chat '%s' has %d message(s) total; looking for new ones",
                      chat.Name, len(messages))
-        # See *sob* above...
-        dl = []
-        sem = defer.DeferredSemaphore(max_run)
-        for msg in messages:
-            dl.append(sem.run(self._cb_check_message, chat, msg))
-        return defer.DeferredList(dl)
+        self.remaining_messages = list(messages) # message is a tuple.
+        self.current_chat = chat
+        return self.process_next_message()
 
-    def _cb_check_message(self, chat, msg):
+    def process_next_message(self):
+        if not self.remaining_messages:
+            logger.debug("finished processing messages for this chat")
+            return
+
+        msg = self.remaining_messages.pop()
+        chat = self.current_chat
         logger.debug("seeing if message %r exists (in chat %r)",
                      msg._Id, chat.Name)
         return self.doc_model.open_document(self.get_docid_for_msg(msg)
-                        ).addCallback(self._cb_maybe_process_message, chat, msg)
+                        ).addCallback(self._cb_maybe_process_message, chat, msg
+                        ).addCallback(self._cb_processed_message
+                        )
+
+    def _cb_processed_message(self, result):
+        return self.conductor.reactor.callLater(0, self.process_next_message)
 
     def _cb_maybe_process_message(self, existing_doc, chat, msg):
         if existing_doc is None:
             logger.debug("New skype message %d", msg._Id)
             # make a 'deferred list' to fetch each property one at a time.
             ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
             return defer.DeferredList(ds
                         ).addCallback(self._cb_got_msg_props, chat, msg)
@@ -189,18 +205,21 @@ class TwistySkype(object):
         doc = {}
         for (name, typ), (ok, val) in zip(MSG_PROPS, results):
             if ok:
                 doc['skype_'+name.lower()] = simple_convert(val, typ)
         # we include the skype username with the ID as they are unique per user.
         docid = self.get_docid_for_msg(msg)
         return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
                                                   self.account
+                    ).addCallback(self._cb_msg_saved
                     )
 
+    def _cb_msg_saved(self, result):
+        pass
 
 # A 'converter' - takes a proto/skype-msg as input and creates a
 # 'message' as output (some other intermediate step might end up more
 # appopriate)
 class SkypeConverter(base.ConverterBase):
     def convert(self, doc):
         # We need to open the 'chat' for this Message.  Clearly this is
         # going to be inefficient...
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -2,16 +2,17 @@
 '''
 Fetch twitter raw* objects
 '''
 
 # prevent 'import twitter' finding this module!
 from __future__ import absolute_import
 
 import logging
+import re
 import twisted.python.log
 from twisted.internet import defer, threads
 
 from ..proc import base
 
 # See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
 # about getting twisted support into the twitter package.
 # Sadly, the twisty-twitter package has issues of its own (like calling
@@ -41,66 +42,70 @@ class TwitterProcessor(object):
 
     def attach(self):
         logger.info("attaching to twitter...")
         username = self.account.details['username']
         pw = self.account.details['password']
         return threads.deferToThread(twitter.Api,
                                   username=username, password=pw
                     ).addCallback(self.attached
-                    ).addBoth(self.finished
                     )
 
     def attached(self, twit):
         self.twit = twit
         # build the list of all users we will fetch tweets from.
         return threads.deferToThread(self.twit.GetFriends
                   ).addCallback(self.got_friends)
 
     def got_friends(self, friends):
         # None at the start means 'me'
         self.friends_remaining = [None] + [f.screen_name for f in friends]
         return self.process_next_friend()
 
     def process_next_friend(self):
         if not self.friends_remaining:
             logger.debug("Finished processing twitter friends")
-            return
+            return defer.maybeDeferred(self.finished, None)
 
         fid = self.friends_remaining.pop()
         return threads.deferToThread(self.twit.GetUserTimeline, fid
                             ).addCallback(self.got_friend_timeline, fid
                             ).addErrback(self.err_friend_timeline, fid
+                            ).addCallback(self.finished_friend
                             )
 
+    def finished_friend(self, result):
+        self.conductor.reactor.callLater(0, self.process_next_friend)
+
     def err_friend_timeline(self, failure, fid):
         logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
-        return self.process_next_friend()
 
     def got_friend_timeline(self, timeline, fid):
         self.current_fid = fid
         self.remaining_tweets = timeline[:]
+        logger.debug("Friend %r has %d items in their timeline", fid,
+                     len(self.remaining_tweets))
         return self.process_next_tweet()
 
     def process_next_tweet(self):
         if not self.remaining_tweets:
             logger.debug("Finished processing timeline")
-            return self.process_next_friend()
+            return
 
         tweet = self.remaining_tweets.pop()
 
         logger.debug("seeing if tweet %r exists", tweet.id)
         docid = "tweet.%d" % (tweet.id,)
         return self.doc_model.open_document(docid,
                         ).addCallback(self.maybe_process_tweet, docid, tweet)
 
     def maybe_process_tweet(self, existing_doc, docid, tweet):
         if existing_doc is None:
             # put the 'raw' document object together and save it.
-            logger.info("New tweet %s", tweet.id)
+            logger.info("New tweet '%s...' (%s)", tweet.text[:25], tweet.id)
             # create the couch document for the tweet itself.
             doc = {}
             for name in TWEET_PROPS:
                 val = getattr(tweet, name)
                 # simple hacks - users just become the user ID.
                 if isinstance(val, twitter.User):
                     val = val.id
                 doc['twitter_'+name.lower()] = val
@@ -117,21 +122,30 @@ class TwitterProcessor(object):
         return self.process_next_tweet()
 
     def finished(self, result):
         return self.conductor.on_synch_finished(self.account, result)
 
 
 # A 'converter' - takes a proto/twitter as input and creates a
 # 'message' as output (some other intermediate step might end up more
-# appopriate)
+# appropriate)
 class TwitterConverter(base.ConverterBase):
+    re_tags = re.compile(r'#(\w+)')    
     def convert(self, doc):
+        # for now, if a 'proto' can detect tags, it writes them directly
+        # to a 'tags' attribute.
+        body = doc['twitter_text']
+        tags = self.re_tags.findall(body)
         return {'from': ['twitter', doc['twitter_user']],
-                'body': doc['twitter_text']}
+                'body': body,
+                'body_preview': body[:128],
+                'tags': tags,
+                'timestamp': int(doc['twitter_created_at_in_seconds'])
+                }
 
 
 class TwitterAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
   def startSync(self, conductor, doc_model):