get imap and twitter working in the new world order (branch: twisty)
author: Mark Hammond <mhammond@skippinet.com.au>
date: Fri, 20 Mar 2009 16:26:49 +1100
branch: twisty
changeset: 110 20979788066de01860cf050a0d0c43498d640624
parent: 109 67c16c20afc0730084a2c65d36073bb53144f896
child: 111 b0340c2821da37ecd100912692f3876ed6fa1806
push id: 1
push user: root
push date: Wed, 08 Apr 2009 01:46:05 +0000
get imap and twitter working in the new world order
server/python/junius/pipeline.py
server/python/junius/proto/imap.py
server/python/junius/proto/twitter.py
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -10,17 +10,20 @@ chain = [
     # trom_type, to_type, transformer)
     ('proto/test',         'raw/message/rfc822',
                            'junius.proto.test.TestConverter'),
     # skype goes directly to 'message' for now...
     ('proto/skype-msg',    'message',
                            'junius.proto.skype.SkypeConverter'),
     # skype-chat is 'terminal'
     ('proto/skype-chat', None, None),
-    #('proto/imap', 'raw/message/rfc822', 'core.proto.imap'),
+    ('proto/imap',         'raw/message/rfc822',
+                           'junius.proto.imap.IMAPConverter'),
+    ('proto/twitter',      'message',
+                           'junius.proto.twitter.TwitterConverter'),
     ('raw/message/rfc822', 'raw/message/email',
                            'junius.ext.message.rfc822.RFC822Converter'),
     ('raw/message/email',  'message',
                            'junius.ext.message.email.EmailConverter'),
     # message is 'terminal'
     ('message', None, None),
 ]
 
@@ -95,16 +98,16 @@ class Pipeline(object):
         return self.doc_model.open_document(docid
                     ).addCallback(self._cb_got_last_doc, rootdocid, xform_info
                     )
 
     def _cb_got_last_doc(self, doc, rootdocid, xform_info):
         assert doc is not None, "failed to locate the doc for %r" % rootdocid
         dest_type, xformer = xform_info
         logger.debug("calling %r to create a %s from %s", xformer, dest_type,
-                     doc)
+                     doc['_id'])
         return defer.maybeDeferred(xformer.convert, doc
                         ).addCallback(self._cb_converted, dest_type, rootdocid)
 
     def _cb_converted(self, new_doc, dest_type, rootdocid):
-        logger.debug("converter created new document %s", new_doc)
+        logger.debug("converter returned new document for %s", rootdocid)
         self.num_this_process += 1
         return self.doc_model.create_ext_document(new_doc, dest_type, rootdocid)
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -51,157 +51,165 @@ class ImapClient(imap4.IMAP4Client):
   def _reqList(self, *args, **kwargs):
     self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
     return self.list('', '*').addCallback(self._procList)
 
   def _procList(self, result, *args, **kwargs):
     # As per http://python.codefetch.com/example/ow/tnpe_code/ch07/imapdownload.py,
     # We keep a 'stack' of items left to process in an instance variable...
     self.folder_infos = result[:]
-    return self._processNextFolder()
+    return self._process_next_folder()
 
-  def _processNextFolder(self):
+  def _process_next_folder(self):
     if not self.folder_infos:
       # yay - all done!
       return
 
     flags, delim, name = self.folder_infos.pop()
-    self.current_folder_path = cfp = name.split(delim)
+    self.current_folder = name
     logger.debug('Processing folder %s (flags=%s)', name, flags)
     if r"\Noselect" in flags:
       logger.debug("'%s' is unselectable - skipping", name)
-      return self._processNextFolder()
+      return self._process_next_folder()
 
     # XXX - sob - markh sees:
     # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
     # although we obviously have seen them already in the other folders.
-    if cfp and cfp[0].startswith('[') and cfp[0].endswith(']'):
+    if name and name.startswith('['):
       logger.info("'%s' appears special -skipping", name)
-      return self._processNextFolder()
+      return self._process_next_folder()
 
     return self.examine(name
-                 ).addCallback(self._examineFolder, cfp
-                 ).addErrback(self._cantExamineFolder, cfp)
+                 ).addCallback(self._examineFolder, name
+                 ).addErrback(self._cantExamineFolder, name)
 
   def _examineFolder(self, result, folder_path):
     logger.debug('Looking for messages already fetched for folder %s', folder_path)
     startkey=['rfc822', self.account.details['_id'], [folder_path, 0]]
     endkey=['rfc822', self.account.details['_id'], [folder_path, 4000000000]]
     return get_db().openView('raindrop!messages!by', 'by_storage',
                              startkey=startkey, endkey=endkey,
               ).addCallback(self._fetchAndProcess, folder_path)
 
   def _fetchAndProcess(self, rows, folder_path):
     allMessages = imap4.MessageSet(1, None)
     seen_ids = [r['key'][2][1] for r in rows]
-    logger.debug("%d messages already exist from %s",
-                 len(seen_ids), folder_path)
+    logger.debug("%d messages exist in %s", len(seen_ids), folder_path)
     return self.fetchUID(allMessages, True).addCallback(
             self._gotUIDs, folder_path, seen_ids)
 
   def _gotUIDs(self, uidResults, name, seen_uids):
-    uids = set([int(result['UID']) for result in uidResults.values()])
-    need = uids - set(seen_uids)
-    logger.info("Folder %s has %d messages, %d of which we haven't seen",
-                 name, len(uids), len(need))
-    self.messages_remaining = need
-    return self._processNextMessage()
+    uids = [int(result['UID']) for result in uidResults.values()]
+    logger.info("Folder %s has %d messages", name, len(uids))
+    self.messages_remaining = uids
+    return self._process_next_message()
 
-  def _processNextMessage(self):
+  def _process_next_message(self):
     logger.debug("processNextMessage has %d messages to go...",
                  len(self.messages_remaining))
     if not self.messages_remaining:
-      return self._processNextFolder()
+      return self._process_next_folder()
 
     uid = self.messages_remaining.pop()
-    to_fetch = imap4.MessageSet(uid)
-    # grr - we have to get the rfc822 body and the flags in separate requests.
-    logger.debug("fetching rfc822 for message %s", to_fetch)
-    return self.fetchMessage(to_fetch, uid=True
-                ).addCallback(self._gotBody, to_fetch
-                )
+    # XXX - we need something to make this truly unique.
+    did = "%s#%d" % (self.current_folder, uid)
+    logger.debug("seeing if imap message %r exists", did)
+    return self.doc_model.open_document(did,
+                    ).addCallback(self._cb_process_message, uid, did
+                    )
 
-  def _gotBody(self, result, to_fetch):
+  def _cb_process_message(self, existing_doc, uid, did):
+    if existing_doc is None:
+      logger.debug("new imap message %r - fetching content", did)
+      # grr - we have to get the rfc822 body and the flags in separate requests.
+      to_fetch = imap4.MessageSet(uid)
+      return self.fetchMessage(to_fetch, uid=True
+                  ).addCallback(self._cb_got_body, uid, did, to_fetch
+                  )
+    else:
+      logger.debug("Skipping message %r - already exists", did)
+      # we are done with this message.
+      return self._process_next_message()
+
+  def _cb_got_body(self, result, uid, did, to_fetch):
     _, result = result.popitem()
     try:
-      body = result['RFC822'].decode(imap_encoding)
+      content = result['RFC822'].decode(imap_encoding)
     except UnicodeError, why:
       logger.error("Failed to decode message "
                    "(but will re-decode ignoring errors) : %s", why)
       # heh - 'ignore' and 'replace' are apparently ignored for the 'utf-7'
       # codecs...
       try:
         body = result['RFC822'].decode(imap_encoding, 'ignore')
       except UnicodeError, why:
+        # XXX - is this possible???
         logger.error("and failed to 'ignore' unicode errors - skipping it: %s",
                      why)
-        return self._processNextMessage()
+        return self._process_next_message()
 
     # grr - get the flags
-    logger.debug("fetching flags for message %s", to_fetch)
+    logger.debug("fetching flags for message %s", did)
     return self.fetchFlags(to_fetch, uid=True
-                ).addCallback(self._gotMessage, body
+                ).addCallback(self._cb_got_flags, uid, did, content
                 ).addErrback(self._cantGetMessage
                 )
 
-  def _gotMessage(self, result, body):
-    # not sure about this - can we ever get more?
+  def _cb_got_flags(self, result, uid, did, content):
     logger.debug("flags are %s", result)
     assert len(result)==1, result
     _, result = result.popitem()
     flags = result['FLAGS']
     # put the 'raw' document object together and save it.
     doc = dict(
-      type='rawMessage',
-      subtype='rfc822',
-      account_id=self.account.details['_id'],
-      storage_key=[self.current_folder_path, int(result['UID'])],
-      rfc822=body,
+      storage_key=[self.current_folder, uid],
       imap_flags=flags,
+      imap_content=content,
       )
-    return get_db().saveDoc(doc
-                ).addCallback(self._savedDocument
-                ).addErrback(self._cantSaveDocument
-                )
-
+    return self.doc_model.create_raw_document(did, doc, 'proto/imap',
+                                              self.account
+                ).addCallback(self._cb_saved_message)
+    
   def _cantGetMessage(self, failure):
     logger.error("Failed to fetch message: %s", failure)
-    return self._processNextMessage()
+    return self._process_next_message()
 
-  def _savedDocument(self, result):
+  def _cb_saved_message(self, result):
     logger.debug("Saved message %s", result)
-    return self._processNextMessage()
+    return self._process_next_message()
 
   def _cantSaveDocument(self, failure):
     logger.error("Failed to save message: %s", failure)
-    return self._processNextMessage()
+    return self._process_next_message()
 
   def _cantExamineFolder(self, failure, name, *args, **kw):
     logger.warning("Failed to examine folder '%s': %s", name, failure)
-    return self._processNextFolder()
+    return self._process_next_folder()
 
   def accountStatus(self, result, *args):
     return self.account.reportStatus(*args)
 
 
 class ImapClientFactory(protocol.ClientFactory):
   protocol = ImapClient
 
-  def __init__(self, account, conductor):
+  def __init__(self, account, conductor, doc_model):
     self.account = account
     self.conductor = conductor
+    self.doc_model = doc_model
 
     self.ctx = ssl.ClientContextFactory()
     self.backoff = 8 # magic number
 
   def buildProtocol(self, addr):
     p = self.protocol(self.ctx)
     p.factory = self
     p.account = self.account
     p.conductor = self.conductor
+    p.doc_model = self.doc_model
     return p
 
   def connect(self):
     details = self.account.details
     logger.debug('attempting to connect to %s:%d (ssl: %s)',
                  details['host'], details['port'], details['ssl'])
     reactor = self.conductor.reactor    
     if details.get('ssl'):
@@ -231,19 +239,27 @@ class ImapClientFactory(protocol.ClientF
                    self.backoff)
     # It occurs that some "account manager" should be reported of the error,
     # and *it* asks us to retry later?  eg, how do I ask 'ignore backoff -
     # try again *now*"?
     self.conductor.reactor.callLater(self.backoff, self.connect)
     self.backoff = min(self.backoff * 2, 600) # magic number
 
 
+# A 'converter' - takes a proto/imap as input and creates a
+# 'raw/message/rfc822' as output
+class IMAPConverter(base.ConverterBase):
+  def convert(self, doc):
+    headers, body = doc['imap_content'].split('\r\n\r\n', 1)
+    return {'headers': headers, 'body': body}
+
+
 class IMAPAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
-  def startSync(self, conductor):
-    self.factory = ImapClientFactory(self, conductor)
+  def startSync(self, conductor, doc_model):
+    self.factory = ImapClientFactory(self, conductor, doc_model)
     self.factory.connect()
     # XXX - wouldn't it be good to return a deferred here, so the conductor
     # can reliably wait for the deferred to complete, rather than forcing
     # each deferred to manage 'finished' itself?
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -3,20 +3,19 @@
 Fetch twitter raw* objects
 '''
 
 # prevent 'import twitter' finding this module!
 from __future__ import absolute_import
 
 import logging
 import twisted.python.log
-from twisted.internet import reactor, defer, threads
+from twisted.internet import defer, threads
 
 from ..proc import base
-from ..model import get_db
 
 # See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
 # about getting twisted support into the twitter package.
 # Sadly, the twisty-twitter package has issues of its own (like calling
 # delegates outside the deferred mechanism meaning you can't rely on callbacks
 # being completed when they say they are.)
 
 # So for now we are going with the blocking twitter package used via
@@ -28,99 +27,112 @@ logger = logging.getLogger(__name__)
 TWEET_PROPS = """
         created_at created_at_in_seconds favorited in_reply_to_screen_name
         in_reply_to_user_id in_reply_to_status_id truncated
         source id text relative_created_at user
 """.split()
 
 
 class TwitterProcessor(object):
-    def __init__(self, account, reactor):
+    def __init__(self, account, conductor, doc_model):
         self.account = account
-        self.reactor = reactor
+        self.conductor = conductor
+        self.doc_model = doc_model
         self.twit = None
         self.seen_tweets = None
 
     def attach(self):
         logger.info("attaching to twitter...")
         username = self.account.details['username']
         pw = self.account.details['password']
         return threads.deferToThread(twitter.Api,
                                   username=username, password=pw
                     ).addCallback(self.attached
                     ).addBoth(self.finished
                     )
 
     def attached(self, twit):
         self.twit = twit
-        # work out which tweets are 'new'
-        startkey=['tweet', self.account.details['_id']]
-        # oh for https://issues.apache.org/jira/browse/COUCHDB-194 to be fixed...
-        endkey=['tweet', self.account.details['_id'] + 'ZZZZZZZZZZZZZZZZZZ']
-        return get_db().openView('raindrop!messages!by', 'by_storage',
-                                 startkey=startkey, endkey=endkey,
-            ).addCallback(self.got_seen_tweets
-            )
-
-    def got_seen_tweets(self, rows):
-        self.seen_tweets = set([r['key'][2] for r in rows])
-        logger.debug("Have %d tweets already in the database",
-                     len(self.seen_tweets))
-
         # build the list of all users we will fetch tweets from.
         return threads.deferToThread(self.twit.GetFriends
                   ).addCallback(self.got_friends)
-        
+
     def got_friends(self, friends):
-        dl = []
-        for fid in [None] + [f.screen_name for f in friends]:
-            dl.append(threads.deferToThread(self.twit.GetUserTimeline, fid
+        # None at the start means 'me'
+        self.friends_remaining = [None] + [f.screen_name for f in friends]
+        return self.process_next_friend()
+
+    def process_next_friend(self):
+        if not self.friends_remaining:
+            logger.debug("Finished processing twitter friends")
+            return
+
+        fid = self.friends_remaining.pop()
+        return threads.deferToThread(self.twit.GetUserTimeline, fid
                             ).addCallback(self.got_friend_timeline, fid
                             ).addErrback(self.err_friend_timeline, fid
                             )
-                      )
-
-        return defer.DeferredList(dl)
 
     def err_friend_timeline(self, failure, fid):
         logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
+        return self.process_next_friend()
 
     def got_friend_timeline(self, timeline, fid):
-        dl = []
-        for tweet in timeline:
-            if tweet.id not in self.seen_tweets:
-                logger.info("New tweet %s", tweet.id)
-                # create the couch document for the tweet itself.
-                doc = dict(
-                  type='rawMessage',
-                  subtype='tweet',
-                  account_id=self.account.details['_id'],
-                  storage_key=tweet.id,
-                )
-                for name in TWEET_PROPS:
-                    val = getattr(tweet, name)
-                    # simple hacks - users just become the user ID.
-                    if isinstance(val, twitter.User):
-                        val = val.id
-                    doc['twitter_'+name.lower()] = val
-                dl.append(get_db().saveDoc(doc
-                            ).addCallback(self.saved_tweet, tweet
-                            ))
-            else:
-                # Must be a seen one.
-                logger.debug("Skipping seen tweet '%s'", tweet.id)
-        return defer.DeferredList(dl)
+        self.current_fid = fid
+        self.remaining_tweets = timeline[:]
+        return self.process_next_tweet()
+
+    def process_next_tweet(self):
+        if not self.remaining_tweets:
+            logger.debug("Finished processing timeline")
+            return self.process_next_friend()
+
+        tweet = self.remaining_tweets.pop()
+
+        logger.debug("seeing if tweet %r exists", tweet.id)
+        docid = "tweet.%d" % (tweet.id,)
+        return self.doc_model.open_document(docid,
+                        ).addCallback(self.maybe_process_tweet, docid, tweet)
+
+    def maybe_process_tweet(self, existing_doc, docid, tweet):
+        if existing_doc is None:
+            # put the 'raw' document object together and save it.
+            logger.info("New tweet %s", tweet.id)
+            # create the couch document for the tweet itself.
+            doc = {}
+            for name in TWEET_PROPS:
+                val = getattr(tweet, name)
+                # simple hacks - users just become the user ID.
+                if isinstance(val, twitter.User):
+                    val = val.id
+                doc['twitter_'+name.lower()] = val
+            return self.doc_model.create_raw_document(docid, doc, 'proto/twitter',
+                                                      self.account
+                        ).addCallback(self.saved_tweet, tweet)
+        else:
+            # Must be a seen one.
+            logger.debug("Skipping seen tweet '%s'", tweet.id)
+            return self.process_next_tweet()
 
     def saved_tweet(self, result, tweet):
         logger.debug("Finished processing of new tweet '%s'", tweet.id)
+        return self.process_next_tweet()
 
     def finished(self, result):
-        from ..sync import get_conductor
-        return get_conductor().accountFinishedSync(self.account, result)
+        return self.conductor.on_synch_finished(self.account, result)
+
+
+# A 'converter' - takes a proto/twitter as input and creates a
+# 'message' as output (some other intermediate step might end up more
+# appropriate)
+class TwitterConverter(base.ConverterBase):
+    def convert(self, doc):
+        return {'from': ['twitter', doc['twitter_user']],
+                'body': doc['twitter_text']}
 
 
 class TwitterAccount(base.AccountBase):
   def __init__(self, db, details):
     self.db = db
     self.details = details
 
-  def startSync(self, conductor):
-    return TwitterProcessor(self, reactor).attach()
+  def startSync(self, conductor, doc_model):
+    return TwitterProcessor(self, conductor, doc_model).attach()