--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -10,17 +10,20 @@ chain = [
- # trom_type, to_type, transformer)
+ # (from_type, to_type, transformer)
('proto/test', 'raw/message/rfc822',
'junius.proto.test.TestConverter'),
# skype goes directly to 'message' for now...
('proto/skype-msg', 'message',
'junius.proto.skype.SkypeConverter'),
# skype-chat is 'terminal'
('proto/skype-chat', None, None),
- #('proto/imap', 'raw/message/rfc822', 'core.proto.imap'),
+ ('proto/imap', 'raw/message/rfc822',
+ 'junius.proto.imap.IMAPConverter'),
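+ # twitter also goes directly to 'message' for now...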
+ ('proto/twitter', 'message',
+ 'junius.proto.twitter.TwitterConverter'),
('raw/message/rfc822', 'raw/message/email',
'junius.ext.message.rfc822.RFC822Converter'),
('raw/message/email', 'message',
'junius.ext.message.email.EmailConverter'),
# message is 'terminal'
('message', None, None),
]
@@ -95,16 +98,16 @@ class Pipeline(object):
return self.doc_model.open_document(docid
).addCallback(self._cb_got_last_doc, rootdocid, xform_info
)
def _cb_got_last_doc(self, doc, rootdocid, xform_info):
assert doc is not None, "failed to locate the doc for %r" % rootdocid
dest_type, xformer = xform_info
logger.debug("calling %r to create a %s from %s", xformer, dest_type,
- doc)
+ doc['_id'])
return defer.maybeDeferred(xformer.convert, doc
).addCallback(self._cb_converted, dest_type, rootdocid)
def _cb_converted(self, new_doc, dest_type, rootdocid):
- logger.debug("converter created new document %s", new_doc)
+ logger.debug("converter returned new document for %s", rootdocid)
self.num_this_process += 1
return self.doc_model.create_ext_document(new_doc, dest_type, rootdocid)
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -51,157 +51,165 @@ class ImapClient(imap4.IMAP4Client):
def _reqList(self, *args, **kwargs):
self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
return self.list('', '*').addCallback(self._procList)
def _procList(self, result, *args, **kwargs):
# As per http://python.codefetch.com/example/ow/tnpe_code/ch07/imapdownload.py,
# We keep a 'stack' of items left to process in an instance variable...
self.folder_infos = result[:]
- return self._processNextFolder()
+ return self._process_next_folder()
- def _processNextFolder(self):
+ def _process_next_folder(self):
if not self.folder_infos:
# yay - all done!
return
flags, delim, name = self.folder_infos.pop()
- self.current_folder_path = cfp = name.split(delim)
+ self.current_folder = name
logger.debug('Processing folder %s (flags=%s)', name, flags)
if r"\Noselect" in flags:
logger.debug("'%s' is unselectable - skipping", name)
- return self._processNextFolder()
+ return self._process_next_folder()
# XXX - sob - markh sees:
# 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
# although we obviously have seen them already in the other folders.
- if cfp and cfp[0].startswith('[') and cfp[0].endswith(']'):
+ if name and name.startswith('['):
logger.info("'%s' appears special -skipping", name)
- return self._processNextFolder()
+ return self._process_next_folder()
return self.examine(name
- ).addCallback(self._examineFolder, cfp
- ).addErrback(self._cantExamineFolder, cfp)
+ ).addCallback(self._examineFolder, name
+ ).addErrback(self._cantExamineFolder, name)
def _examineFolder(self, result, folder_path):
logger.debug('Looking for messages already fetched for folder %s', folder_path)
startkey=['rfc822', self.account.details['_id'], [folder_path, 0]]
endkey=['rfc822', self.account.details['_id'], [folder_path, 4000000000]]
return get_db().openView('raindrop!messages!by', 'by_storage',
startkey=startkey, endkey=endkey,
).addCallback(self._fetchAndProcess, folder_path)
def _fetchAndProcess(self, rows, folder_path):
allMessages = imap4.MessageSet(1, None)
seen_ids = [r['key'][2][1] for r in rows]
- logger.debug("%d messages already exist from %s",
- len(seen_ids), folder_path)
+ logger.debug("%d messages exist in %s", len(seen_ids), folder_path)
return self.fetchUID(allMessages, True).addCallback(
self._gotUIDs, folder_path, seen_ids)
def _gotUIDs(self, uidResults, name, seen_uids):
- uids = set([int(result['UID']) for result in uidResults.values()])
- need = uids - set(seen_uids)
- logger.info("Folder %s has %d messages, %d of which we haven't seen",
- name, len(uids), len(need))
- self.messages_remaining = need
- return self._processNextMessage()
+ uids = [int(result['UID']) for result in uidResults.values()]
+ logger.info("Folder %s has %d messages", name, len(uids))
+ self.messages_remaining = uids
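+ # dedup now happens per-message: _process_next_message asks the doc
+ # model whether each UID's document already exists before fetching it.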
+ return self._process_next_message()
- def _processNextMessage(self):
+ def _process_next_message(self):
logger.debug("processNextMessage has %d messages to go...",
len(self.messages_remaining))
if not self.messages_remaining:
- return self._processNextFolder()
+ return self._process_next_folder()
uid = self.messages_remaining.pop()
- to_fetch = imap4.MessageSet(uid)
- # grr - we have to get the rfc822 body and the flags in separate requests.
- logger.debug("fetching rfc822 for message %s", to_fetch)
- return self.fetchMessage(to_fetch, uid=True
- ).addCallback(self._gotBody, to_fetch
- )
+ # XXX - we need something to make this truly unique.
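+ # (IMAP UIDs are only stable for a given UIDVALIDITY, and the same
+ # folder name can exist in more than one account.)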
+ did = "%s#%d" % (self.current_folder, uid)
+ logger.debug("seeing if imap message %r exists", did)
+ return self.doc_model.open_document(did,
+ ).addCallback(self._cb_process_message, uid, did
+ )
- def _gotBody(self, result, to_fetch):
+ def _cb_process_message(self, existing_doc, uid, did):
+ if existing_doc is None:
+ logger.debug("new imap message %r - fetching content", did)
+ # grr - we have to get the rfc822 body and the flags in separate requests.
+ to_fetch = imap4.MessageSet(uid)
+ return self.fetchMessage(to_fetch, uid=True
+ ).addCallback(self._cb_got_body, uid, did, to_fetch
+ )
+ else:
+ logger.debug("Skipping message %r - already exists", did)
+ # we are done with this message.
+ return self._process_next_message()
+
+ def _cb_got_body(self, result, uid, did, to_fetch):
_, result = result.popitem()
try:
- body = result['RFC822'].decode(imap_encoding)
+ content = result['RFC822'].decode(imap_encoding)
except UnicodeError, why:
logger.error("Failed to decode message "
"(but will re-decode ignoring errors) : %s", why)
# heh - 'ignore' and 'replace' are apparently ignored for the 'utf-7'
# codecs...
try:
- body = result['RFC822'].decode(imap_encoding, 'ignore')
+ content = result['RFC822'].decode(imap_encoding, 'ignore')
except UnicodeError, why:
+ # XXX - is this possible???
logger.error("and failed to 'ignore' unicode errors - skipping it: %s",
why)
- return self._processNextMessage()
+ return self._process_next_message()
# grr - get the flags
- logger.debug("fetching flags for message %s", to_fetch)
+ logger.debug("fetching flags for message %s", did)
return self.fetchFlags(to_fetch, uid=True
- ).addCallback(self._gotMessage, body
+ ).addCallback(self._cb_got_flags, uid, did, content
).addErrback(self._cantGetMessage
)
- def _gotMessage(self, result, body):
- # not sure about this - can we ever get more?
+ def _cb_got_flags(self, result, uid, did, content):
logger.debug("flags are %s", result)
assert len(result)==1, result
_, result = result.popitem()
flags = result['FLAGS']
# put the 'raw' document object together and save it.
doc = dict(
- type='rawMessage',
- subtype='rfc822',
- account_id=self.account.details['_id'],
- storage_key=[self.current_folder_path, int(result['UID'])],
- rfc822=body,
+ storage_key=[self.current_folder, uid],
imap_flags=flags,
+ imap_content=content,
)
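+ # the doc model now stamps the account and document type on the raw
+ # doc, so we no longer set type/subtype/account_id by hand here.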
- return get_db().saveDoc(doc
- ).addCallback(self._savedDocument
- ).addErrback(self._cantSaveDocument
- )
-
+ return self.doc_model.create_raw_document(did, doc, 'proto/imap',
+ self.account
+ ).addCallback(self._cb_saved_message)
+
def _cantGetMessage(self, failure):
logger.error("Failed to fetch message: %s", failure)
- return self._processNextMessage()
+ return self._process_next_message()
- def _savedDocument(self, result):
+ def _cb_saved_message(self, result):
logger.debug("Saved message %s", result)
- return self._processNextMessage()
+ return self._process_next_message()
def _cantSaveDocument(self, failure):
logger.error("Failed to save message: %s", failure)
- return self._processNextMessage()
+ return self._process_next_message()
def _cantExamineFolder(self, failure, name, *args, **kw):
logger.warning("Failed to examine folder '%s': %s", name, failure)
- return self._processNextFolder()
+ return self._process_next_folder()
def accountStatus(self, result, *args):
return self.account.reportStatus(*args)
class ImapClientFactory(protocol.ClientFactory):
protocol = ImapClient
- def __init__(self, account, conductor):
+ def __init__(self, account, conductor, doc_model):
self.account = account
self.conductor = conductor
+ self.doc_model = doc_model
self.ctx = ssl.ClientContextFactory()
self.backoff = 8 # magic number
def buildProtocol(self, addr):
p = self.protocol(self.ctx)
p.factory = self
p.account = self.account
p.conductor = self.conductor
+ p.doc_model = self.doc_model
return p
def connect(self):
details = self.account.details
logger.debug('attempting to connect to %s:%d (ssl: %s)',
details['host'], details['port'], details['ssl'])
reactor = self.conductor.reactor
if details.get('ssl'):
@@ -231,19 +239,27 @@ class ImapClientFactory(protocol.ClientF
self.backoff)
# It occurs that some "account manager" should be reported of the error,
# and *it* asks us to retry later? eg, how do I ask 'ignore backoff -
# try again *now*"?
self.conductor.reactor.callLater(self.backoff, self.connect)
self.backoff = min(self.backoff * 2, 600) # magic number
+# A 'converter' - takes a proto/imap as input and creates a
+# 'raw/message/rfc822' as output
+class IMAPConverter(base.ConverterBase):
+ def convert(self, doc):
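+ # a naive split: assumes CRLF line endings and a blank line between
+ # the headers and the body of the rfc822 message.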
+ headers, body = doc['imap_content'].split('\r\n\r\n', 1)
+ return {'headers': headers, 'body': body}
+
+
class IMAPAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
- def startSync(self, conductor):
- self.factory = ImapClientFactory(self, conductor)
+ def startSync(self, conductor, doc_model):
+ self.factory = ImapClientFactory(self, conductor, doc_model)
self.factory.connect()
# XXX - wouldn't it be good to return a deferred here, so the conductor
# can reliably wait for the deferred to complete, rather than forcing
# each deferred to manage 'finished' itself?
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -3,20 +3,19 @@
Fetch twitter raw* objects
'''
# prevent 'import twitter' finding this module!
from __future__ import absolute_import
import logging
import twisted.python.log
-from twisted.internet import reactor, defer, threads
+from twisted.internet import defer, threads
from ..proc import base
-from ..model import get_db
# See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
# about getting twisted support into the twitter package.
# Sadly, the twisty-twitter package has issues of its own (like calling
# delegates outside the deferred mechanism meaning you can't rely on callbacks
# being completed when they say they are.)
# So for now we are going with the blocking twitter package used via
@@ -28,99 +27,112 @@ logger = logging.getLogger(__name__)
TWEET_PROPS = """
created_at created_at_in_seconds favorited in_reply_to_screen_name
in_reply_to_user_id in_reply_to_status_id truncated
source id text relative_created_at user
""".split()
class TwitterProcessor(object):
- def __init__(self, account, reactor):
+ def __init__(self, account, conductor, doc_model):
self.account = account
- self.reactor = reactor
+ self.conductor = conductor
+ self.doc_model = doc_model
self.twit = None
self.seen_tweets = None
def attach(self):
logger.info("attaching to twitter...")
username = self.account.details['username']
pw = self.account.details['password']
return threads.deferToThread(twitter.Api,
username=username, password=pw
).addCallback(self.attached
).addBoth(self.finished
)
def attached(self, twit):
self.twit = twit
- # work out which tweets are 'new'
- startkey=['tweet', self.account.details['_id']]
- # oh for https://issues.apache.org/jira/browse/COUCHDB-194 to be fixed...
- endkey=['tweet', self.account.details['_id'] + 'ZZZZZZZZZZZZZZZZZZ']
- return get_db().openView('raindrop!messages!by', 'by_storage',
- startkey=startkey, endkey=endkey,
- ).addCallback(self.got_seen_tweets
- )
-
- def got_seen_tweets(self, rows):
- self.seen_tweets = set([r['key'][2] for r in rows])
- logger.debug("Have %d tweets already in the database",
- len(self.seen_tweets))
-
# build the list of all users we will fetch tweets from.
return threads.deferToThread(self.twit.GetFriends
).addCallback(self.got_friends)
-
+
def got_friends(self, friends):
- dl = []
- for fid in [None] + [f.screen_name for f in friends]:
- dl.append(threads.deferToThread(self.twit.GetUserTimeline, fid
+ # None at the start means 'me'
+ self.friends_remaining = [None] + [f.screen_name for f in friends]
+ return self.process_next_friend()
+
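+ # friends are now processed one at a time (the old code kicked off a
+ # DeferredList of every timeline fetch up front); this keeps a single
+ # API call in flight at a time.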
+ def process_next_friend(self):
+ if not self.friends_remaining:
+ logger.debug("Finished processing twitter friends")
+ return
+
+ fid = self.friends_remaining.pop()
+ return threads.deferToThread(self.twit.GetUserTimeline, fid
).addCallback(self.got_friend_timeline, fid
).addErrback(self.err_friend_timeline, fid
)
- )
-
- return defer.DeferredList(dl)
def err_friend_timeline(self, failure, fid):
logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
+ return self.process_next_friend()
def got_friend_timeline(self, timeline, fid):
- dl = []
- for tweet in timeline:
- if tweet.id not in self.seen_tweets:
- logger.info("New tweet %s", tweet.id)
- # create the couch document for the tweet itself.
- doc = dict(
- type='rawMessage',
- subtype='tweet',
- account_id=self.account.details['_id'],
- storage_key=tweet.id,
- )
- for name in TWEET_PROPS:
- val = getattr(tweet, name)
- # simple hacks - users just become the user ID.
- if isinstance(val, twitter.User):
- val = val.id
- doc['twitter_'+name.lower()] = val
- dl.append(get_db().saveDoc(doc
- ).addCallback(self.saved_tweet, tweet
- ))
- else:
- # Must be a seen one.
- logger.debug("Skipping seen tweet '%s'", tweet.id)
- return defer.DeferredList(dl)
+ self.current_fid = fid
+ self.remaining_tweets = timeline[:]
+ return self.process_next_tweet()
+
+ def process_next_tweet(self):
+ if not self.remaining_tweets:
+ logger.debug("Finished processing timeline")
+ return self.process_next_friend()
+
+ tweet = self.remaining_tweets.pop()
+
+ logger.debug("seeing if tweet %r exists", tweet.id)
+ docid = "tweet.%d" % (tweet.id,)
+ return self.doc_model.open_document(docid,
+ ).addCallback(self.maybe_process_tweet, docid, tweet)
+
+ def maybe_process_tweet(self, existing_doc, docid, tweet):
+ if existing_doc is None:
+ # put the 'raw' document object together and save it.
+ logger.info("New tweet %s", tweet.id)
+ # create the couch document for the tweet itself.
+ doc = {}
+ for name in TWEET_PROPS:
+ val = getattr(tweet, name)
+ # simple hacks - users just become the user ID.
+ if isinstance(val, twitter.User):
+ val = val.id
+ doc['twitter_'+name.lower()] = val
+ return self.doc_model.create_raw_document(docid, doc, 'proto/twitter',
+ self.account
+ ).addCallback(self.saved_tweet, tweet)
+ else:
+ # Must be a seen one.
+ logger.debug("Skipping seen tweet '%s'", tweet.id)
+ return self.process_next_tweet()
def saved_tweet(self, result, tweet):
logger.debug("Finished processing of new tweet '%s'", tweet.id)
+ return self.process_next_tweet()
def finished(self, result):
- from ..sync import get_conductor
- return get_conductor().accountFinishedSync(self.account, result)
+ return self.conductor.on_synch_finished(self.account, result)
+
+
+# A 'converter' - takes a proto/twitter as input and creates a
+# 'message' as output (some other intermediate step might end up more
+ # appropriate)
+class TwitterConverter(base.ConverterBase):
+ def convert(self, doc):
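+ # twitter_user holds just the sender's user id - twitter.User
+ # objects were flattened to ids when the raw document was saved.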
+ return {'from': ['twitter', doc['twitter_user']],
+ 'body': doc['twitter_text']}
class TwitterAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
- def startSync(self, conductor):
- return TwitterProcessor(self, reactor).attach()
+ def startSync(self, conductor, doc_model):
+ return TwitterProcessor(self, conductor, doc_model).attach()