--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,17 +1,16 @@
#!/usr/bin/env python
'''
Fetch skype contacts and chats.
'''
import base64, datetime
import pprint
import time
-import re
import logging
from urllib2 import urlopen
import twisted.python.log
from twisted.internet import defer, threads
from ..proc import base
@@ -94,34 +93,43 @@ class TwistySkype(object):
def attached(self, status):
logger.info("attached to skype - getting chats")
return threads.deferToThread(self.skype._GetChats
).addCallback(self._cb_got_chats
)
def _cb_got_chats(self, chats):
logger.debug("skype has %d chat(s) total", len(chats))
- dl = []
- # *sob* - I'd so like to work out the correct pattern for this.
- # Using a DeferredList with > xxx elts appears to cause:
- # | File "...\twisted\internet\selectreactor.py", line 40, in win32select
- # | r, w, e = select.select(r, w, w, timeout)
- # | exceptions.ValueError: too many file descriptors in select()
- # Tada - DeferredSemaphore's to the rescue! See
- # http://oubiwann.blogspot.com/2008/06/async-batching-with-twisted-walkthrough.html
- dl = []
- sem = defer.DeferredSemaphore(max_run)
- for chat in chats:
- dl.append(sem.run(self._cb_maybe_process_chat, chat))
- # and also process the messages in each chat.
- dl.append(sem.run(self._cb_process_chat_messages, chat))
+ # chats is a tuple...
+ self.remaining_chats = list(chats)
+ return self.process_next_chat()
+
+ def process_next_chat(self):
+ if not self.remaining_chats:
+ logger.debug("finished processing chats")
+ self.finished(None) # not sure this is the right place...
+ return
- return defer.DeferredList(dl).addCallback(self.finished)
+ chat = self.remaining_chats.pop()
+ d = defer.Deferred()
+ # process the chat object itself if its not seen before.
+ d.addCallback(self._cb_maybe_process_chat, chat)
+ # process all messages in all chats - there may be new ones even if
+ # we previously have seen the chat itself.
+ d.addCallback(self._cb_process_chat_messages, chat)
+ d.addCallback(self._cb_processed_chat)
+ d.callback(None)
+ return d
- def _cb_maybe_process_chat(self, chat):
+ def _cb_processed_chat(self, result):
+ # the 'pattern' we copied from the imap sample code calls for us
+ # to recurse - but that can recurse too far...
+ return self.conductor.reactor.callLater(0, self.process_next_chat)
+
+ def _cb_maybe_process_chat(self, result, chat):
logger.debug("seeing if skype chat %r exists", chat.Name)
return self.doc_model.open_document(self.get_docid_for_chat(chat),
).addCallback(self._cb_process_chat, chat)
def _cb_process_chat(self, existing_doc, chat):
if existing_doc is None:
logger.info("Creating new skype chat %r", chat.Name)
# make a 'deferred list' to fetch each property one at a time.
@@ -148,37 +156,45 @@ class TwistySkype(object):
'proto/skype-chat',
self.account
)
def finished(self, result):
return self.conductor.on_synch_finished(self.account, result)
# Processing the messages for each chat...
- def _cb_process_chat_messages(self, chat):
+ def _cb_process_chat_messages(self, result, chat):
# Get each of the messages in the chat
return threads.deferToThread(chat._GetMessages
).addCallback(self._cb_got_messages, chat,
)
def _cb_got_messages(self, messages, chat):
logger.debug("chat '%s' has %d message(s) total; looking for new ones",
chat.Name, len(messages))
- # See *sob* above...
- dl = []
- sem = defer.DeferredSemaphore(max_run)
- for msg in messages:
- dl.append(sem.run(self._cb_check_message, chat, msg))
- return defer.DeferredList(dl)
+ self.remaining_messages = list(messages) # message is a tuple.
+ self.current_chat = chat
+ return self.process_next_message()
- def _cb_check_message(self, chat, msg):
+ def process_next_message(self):
+ if not self.remaining_messages:
+ logger.debug("finished processing messages for this chat")
+ return
+
+ msg = self.remaining_messages.pop()
+ chat = self.current_chat
logger.debug("seeing if message %r exists (in chat %r)",
msg._Id, chat.Name)
return self.doc_model.open_document(self.get_docid_for_msg(msg)
- ).addCallback(self._cb_maybe_process_message, chat, msg)
+ ).addCallback(self._cb_maybe_process_message, chat, msg
+ ).addCallback(self._cb_processed_message
+ )
+
+ def _cb_processed_message(self, result):
+ return self.conductor.reactor.callLater(0, self.process_next_message)
def _cb_maybe_process_message(self, existing_doc, chat, msg):
if existing_doc is None:
logger.debug("New skype message %d", msg._Id)
# make a 'deferred list' to fetch each property one at a time.
ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
return defer.DeferredList(ds
).addCallback(self._cb_got_msg_props, chat, msg)
@@ -189,18 +205,21 @@ class TwistySkype(object):
doc = {}
for (name, typ), (ok, val) in zip(MSG_PROPS, results):
if ok:
doc['skype_'+name.lower()] = simple_convert(val, typ)
# we include the skype username with the ID as they are unique per user.
docid = self.get_docid_for_msg(msg)
return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
self.account
+ ).addCallback(self._cb_msg_saved
)
+ def _cb_msg_saved(self, result):
+ pass
# A 'converter' - takes a proto/skype-msg as input and creates a
# 'message' as output (some other intermediate step might end up more
# appopriate)
class SkypeConverter(base.ConverterBase):
def convert(self, doc):
# We need to open the 'chat' for this Message. Clearly this is
# going to be inefficient...
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -2,16 +2,17 @@
'''
Fetch twitter raw* objects
'''
# prevent 'import twitter' finding this module!
from __future__ import absolute_import
import logging
+import re
import twisted.python.log
from twisted.internet import defer, threads
from ..proc import base
# See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
# about getting twisted support into the twitter package.
# Sadly, the twisty-twitter package has issues of its own (like calling
@@ -41,66 +42,70 @@ class TwitterProcessor(object):
def attach(self):
logger.info("attaching to twitter...")
username = self.account.details['username']
pw = self.account.details['password']
return threads.deferToThread(twitter.Api,
username=username, password=pw
).addCallback(self.attached
- ).addBoth(self.finished
)
def attached(self, twit):
self.twit = twit
# build the list of all users we will fetch tweets from.
return threads.deferToThread(self.twit.GetFriends
).addCallback(self.got_friends)
def got_friends(self, friends):
# None at the start means 'me'
self.friends_remaining = [None] + [f.screen_name for f in friends]
return self.process_next_friend()
def process_next_friend(self):
if not self.friends_remaining:
logger.debug("Finished processing twitter friends")
- return
+ return defer.maybeDeferred(self.finished, None)
fid = self.friends_remaining.pop()
return threads.deferToThread(self.twit.GetUserTimeline, fid
).addCallback(self.got_friend_timeline, fid
).addErrback(self.err_friend_timeline, fid
+ ).addCallback(self.finished_friend
)
+ def finished_friend(self, result):
+ self.conductor.reactor.callLater(0, self.process_next_friend)
+
def err_friend_timeline(self, failure, fid):
logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
- return self.process_next_friend()
def got_friend_timeline(self, timeline, fid):
self.current_fid = fid
self.remaining_tweets = timeline[:]
+ logger.debug("Friend %r has %d items in their timeline", fid,
+ len(self.remaining_tweets))
return self.process_next_tweet()
def process_next_tweet(self):
if not self.remaining_tweets:
logger.debug("Finished processing timeline")
- return self.process_next_friend()
+ return
tweet = self.remaining_tweets.pop()
logger.debug("seeing if tweet %r exists", tweet.id)
docid = "tweet.%d" % (tweet.id,)
return self.doc_model.open_document(docid,
).addCallback(self.maybe_process_tweet, docid, tweet)
def maybe_process_tweet(self, existing_doc, docid, tweet):
if existing_doc is None:
# put the 'raw' document object together and save it.
- logger.info("New tweet %s", tweet.id)
+ logger.info("New tweet '%s...' (%s)", tweet.text[:25], tweet.id)
# create the couch document for the tweet itself.
doc = {}
for name in TWEET_PROPS:
val = getattr(tweet, name)
# simple hacks - users just become the user ID.
if isinstance(val, twitter.User):
val = val.id
doc['twitter_'+name.lower()] = val
@@ -117,21 +122,30 @@ class TwitterProcessor(object):
return self.process_next_tweet()
def finished(self, result):
return self.conductor.on_synch_finished(self.account, result)
# A 'converter' - takes a proto/twitter as input and creates a
# 'message' as output (some other intermediate step might end up more
-# appopriate)
+# appropriate)
class TwitterConverter(base.ConverterBase):
+ re_tags = re.compile(r'#(\w+)')
def convert(self, doc):
+ # for now, if a 'proto' can detect tags, it writes them directly
+ # to a 'tags' attribute.
+ body = doc['twitter_text']
+ tags = self.re_tags.findall(body)
return {'from': ['twitter', doc['twitter_user']],
- 'body': doc['twitter_text']}
+ 'body': body,
+ 'body_preview': body[:128],
+ 'tags': tags,
+ 'timestamp': int(doc['twitter_created_at_in_seconds'])
+ }
class TwitterAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
def startSync(self, conductor, doc_model):