--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -192,28 +192,70 @@ class DocumentModel(object):
def _cb_doc_opened(self, result):
if isinstance(result, Failure):
result.trap(twisted.web.error.Error)
if result.value.status != '404': # not found
result.raiseException()
result = None # indicate no doc exists.
return result
- def create_raw_document(self, docid, doc, doc_type, account, attachments=None):
- assert '_id' not in doc # that isn't how you specify the ID.
+ def _prepare_raw_doc(self, account, docid, doc, doc_type):
+ assert '_id' not in doc, doc # that isn't how you specify the ID.
assert '!' not in docid, docid # these chars are special.
assert 'raindrop_account' not in doc, doc # we look after that!
doc['raindrop_account'] = account.details['_id']
assert 'type' not in doc, doc # we look after that!
doc['type'] = doc_type
assert 'raindrop_seq' not in doc, doc # we look after that!
doc['raindrop_seq'] = get_seq()
+ def create_raw_documents(self, account, doc_infos):
+ """A high-performance version of 'create_raw_document', but doesn't
+ support attachments yet."""
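+        # each item in 'doc_infos' is a (docid, doc, doc_type) tuple.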
+ docs = []
+ dids = [] # purely for the log :(
+ logger.debug('create_raw_documents preparing %d docs', len(doc_infos))
+ for (docid, doc, doc_type) in doc_infos:
+ self._prepare_raw_doc(account, docid, doc, doc_type)
+ # in a bulk-update, the ID is in the doc itself.
+ doc['_id'] = docid
+ docs.append(doc)
+ dids.append(docid)
+ logger.debug('create_raw_documents saving docs %s', dids)
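+        # hand the whole batch to the database in a single bulk update
+        # rather than saving each document individually.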
+ return self.db.updateDocuments(docs
+ ).addCallback(self._cb_saved_multi_docs,
+ ).addErrback(self._cb_save_multi_failed,
+ )
+
+ def _cb_saved_multi_docs(self, result):
+ # result: {'ok': True, 'new_revs': [{'rev': 'xxx', 'id': '...'}, ...]}
+ logger.debug("saved multiple docs with result=%(ok)s", result)
+ return result
+
+ def _cb_save_multi_failed(self, failure):
+ logger.error("Failed to save lotsa docs: %s", failure)
+ failure.raiseException()
+
+ def create_raw_document(self, account, docid, doc, doc_type, attachments=None):
+ self._prepare_raw_doc(account, docid, doc, doc_type)
+ # XXX - attachments need more thought - ultimately we need to be able
+ # to 'push' them via a generator or similar to avoid reading them
+ # entirely in memory. Further, we need some way of the document
+ # knowing if the attachment failed (or vice-versa) given we have
+ # no transactional semantics.
+        # fixup attachments - they may be passed in explicitly, or embedded
+        # in the doc itself; an explicitly passed set must not be clobbered.
+        if attachments is None:
+            attachments = doc.get('_attachments')
+        # attachments are saved separately - remove any from the doc itself.
+        doc.pop('_attachments', None)
+
# save the document.
logger.debug('create_raw_document saving doc %r', docid)
qid = quote_id(docid)
return self.db.saveDoc(doc, docId=qid,
).addCallback(self._cb_saved_document, 'raw-message', docid
).addErrback(self._cb_save_failed, 'raw-message', docid
).addCallback(self._cb_save_attachments, attachments, qid
)
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -1,11 +1,11 @@
# This is the raindrop pipeline; it moves messages from their most raw
# form to their most useful form.
-from twisted.internet import defer, reactor
+from twisted.internet import defer, reactor, task
import logging
logger = logging.getLogger(__name__)
# Simple forward chaining.
chain = [
# from_type, to_type, transformer)
('proto/test', 'raw/message/rfc822',
@@ -29,77 +29,69 @@ chain = [
('anno/tags', None, None),
]
class Pipeline(object):
def __init__(self, doc_model):
self.doc_model = doc_model
self.forward_chain = {}
+ # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parent's?
+ self.coop = task.Cooperator()
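+        # (coiterate consumes a generator; each yielded Deferred pauses the
+        # iteration until it fires, so no recursion is involved.)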
for from_type, to_type, xformname in chain:
if xformname:
root, tail = xformname.rsplit('.', 1)
try:
mod = __import__(root, fromlist=[tail])
klass = getattr(mod, tail)
inst = klass(doc_model)
except:
logger.exception("Failed to load extension %r", xformname)
else:
self.forward_chain[from_type] = (to_type, inst)
else:
assert not to_type, 'no xformname must mean no to_type'
self.forward_chain[from_type] = None
def start(self):
- return defer.maybeDeferred(self.process_all_documents)
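+        # the Deferred returned by coiterate fires once the generator is exhausted.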
+ return self.coop.coiterate(self.gen_all_documents())
- def process_all_documents(self):
+ def gen_all_documents(self):
# check *every* doc in the DB. This exists until we can derive
# a workqueue based on views.
-        self.num_this_process = 0
- return self.doc_model.db.openView('raindrop!messages!by',
- 'by_doc_roots',
- group=True,
- ).addCallback(self._cb_roots_opened)
-
- def _cb_maybe_reprocess_all_documents(self, result):
- if self.num_this_process:
- logger.debug('pipeline processed %d documents last time; trying again',
- self.num_this_process)
- return self.start()
- logger.info('pipeline is finished.')
+        while True:
+            self.num_this_process = 0
+            logger.debug('opening view for work queue...')
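+            # re-open the view each pass - processing a document may create
+            # new documents which themselves need processing.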
+ yield self.doc_model.db.openView('raindrop!messages!by',
+ 'by_doc_roots',
+ group=True,
+ ).addCallback(self._cb_roots_opened)
+ logger.debug('this time we did %d documents', self.num_this_process)
+ if self.num_this_process == 0:
+ break
+        logger.debug('finally ran out of documents.')
def _cb_roots_opened(self, rows):
- self.remaining_roots = [r['key'] for r in rows]
- return self._process_next_root()
+ logger.info('work queue has %d items to process.', len(rows))
+ def gen_todo(todo):
+ for row in todo:
+ did = row['key']
+ logger.debug("Finding last extension point for %s", did)
+ yield self.doc_model.get_last_ext_for_document(did
+ ).addCallback(self._cb_got_doc_last_ext, did
+ ).addErrback(self._eb_doc_failed
+ )
- def _process_next_root(self):
- if not self.remaining_roots:
- logger.debug("Finished document roots")
- d = defer.Deferred()
- d.addCallback(self._cb_maybe_reprocess_all_documents)
- d.callback(None)
- return d
-
- did = self.remaining_roots.pop()
- logger.debug("Finding last extension point for %s", did)
- return self.doc_model.get_last_ext_for_document(did
- ).addCallback(self._cb_got_doc_last_ext, did
- ).addErrback(self._eb_doc_failed
- ).addCallback(self._cb_did_doc_root
- )
+ return self.coop.coiterate(gen_todo(rows))
def _eb_doc_failed(self, failure):
logger.error("FAILED to pipe a doc: %s", failure)
# and we will auto skip to the next doc...
- def _cb_did_doc_root(self, result):
- return reactor.callLater(0, self._process_next_root)
-
def _cb_got_doc_last_ext(self, ext_info, rootdocid):
last_ext, docid = ext_info
if last_ext is None:
logger.debug("Document '%s' doesn't appear to be a message; skipping",
rootdocid)
return None
logger.debug("Last extension for doc '%s' is '%s'", docid, last_ext)
try:
@@ -107,16 +99,17 @@ class Pipeline(object):
except KeyError:
logger.warning("Can't find transformer for message type %r - skipping %r",
last_ext, docid)
return None
if xform_info is None:
logger.debug("Document %r is already at its terminal type of %r",
rootdocid, last_ext)
return None
+ logger.info("Processing document %r", docid)
return self.doc_model.open_document(docid
).addCallback(self._cb_got_last_doc, rootdocid, xform_info
)
def _cb_got_last_doc(self, doc, rootdocid, xform_info):
assert doc is not None, "failed to locate the doc for %r" % rootdocid
dest_type, xformer = xform_info
logger.debug("calling %r to create a %s from %s", xformer, dest_type,
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,9 +1,9 @@
-from twisted.internet import protocol, ssl, defer, error
+from twisted.internet import protocol, ssl, defer, error, task
from twisted.mail import imap4
import logging
from ..proc import base
from ..model import get_db
from ..ext.message.rfc822 import doc_from_bytes
brat = base.Rat
@@ -12,26 +12,22 @@ logger = logging.getLogger(__name__)
class ImapClient(imap4.IMAP4Client):
'''
Much of our logic here should perhaps be in another class that holds a
reference to the IMAP4Client instance subclass. Consider refactoring if we
don't turn out to benefit from the subclassing relationship.
'''
- def finished(self, result):
- # See bottom of file - it would be good to remove this...
- logger.info("Finished synchronizing IMAP folders")
- self.conductor.on_synch_finished(self.account, result)
-
def serverGreeting(self, caps):
logger.debug("IMAP server greeting: capabilities are %s", caps)
+ self.coop = task.Cooperator()
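+        # the cooperator drives the folder and message generators below.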
return self._doAuthenticate(
).addCallback(self._reqList
- ).addBoth(self.finished)
+ )
def _doAuthenticate(self):
if self.account.details.get('crypto') == 'TLS':
d = self.startTLS(self.factory.ctx)
d.addErrback(self.accountStatus,
brat.SERVER, brat.BAD, brat.CRYPTO, brat.PERMANENT)
d.addCallback(self._doLogin)
else:
@@ -45,130 +41,112 @@ class ImapClient(imap4.IMAP4Client):
return self.login(self.account.details['username'],
self.account.details['password'])
def _reqList(self, *args, **kwargs):
self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
return self.list('', '*').addCallback(self._procList)
def _procList(self, result, *args, **kwargs):
- # As per http://python.codefetch.com/example/ow/tnpe_code/ch07/imapdownload.py,
- # We keep a 'stack' of items left to process in an instance variable...
- # (*sob* - but its not clear why this doesn't suffer from
- # death-by-recursion like the other impls can demonstrate? Even though we
- # use a 'stack' of items left to process, each call to
- # _process_next_folder is recursive?
- self.folder_infos = result[:]
- return self._process_next_folder()
+ return self.coop.coiterate(self.gen_folder_list(result))
- def _process_next_folder(self):
- if not self.folder_infos:
- # yay - all done!
- return
+ def gen_folder_list(self, result):
+ for flags, delim, name in result:
+ logger.debug('Processing folder %s (flags=%s)', name, flags)
+ if r"\Noselect" in flags:
+ logger.debug("'%s' is unselectable - skipping", name)
+ continue
- flags, delim, name = self.folder_infos.pop()
- self.current_folder = name
- logger.debug('Processing folder %s (flags=%s)', name, flags)
- if r"\Noselect" in flags:
- logger.debug("'%s' is unselectable - skipping", name)
- return self._process_next_folder()
+ # XXX - sob - markh sees:
+ # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
+ # although we obviously have seen them already in the other folders.
+ if name and name.startswith('['):
+                logger.info("'%s' appears special - skipping", name)
+ continue
- # XXX - sob - markh sees:
- # 'Folder [Gmail]/All Mail has 38163 messages, 36391 of which we haven't seen'
- # although we obviously have seen them already in the other folders.
- if name and name.startswith('['):
- logger.info("'%s' appears special -skipping", name)
- return self._process_next_folder()
-
- return self.examine(name
+ yield self.examine(name
).addCallback(self._examineFolder, name
).addErrback(self._cantExamineFolder, name)
+ logger.debug('imap processing finished.')
+ yield self.conductor.on_synch_finished(self.account, None) # XXX - must die!
def _examineFolder(self, result, folder_path):
logger.debug('Looking for messages already fetched for folder %s', folder_path)
return get_db().openView('raindrop!proto!imap', 'seen',
startkey=[folder_path], endkey=[folder_path, {}]
).addCallback(self._fetchAndProcess, folder_path)
def _fetchAndProcess(self, rows, folder_path):
# XXX - we should look at the flags and update the message if it's not
# the same - later.
+        logger.debug('_fetchAndProcess says we have seen %d items for %r',
+ len(rows), folder_path)
seen_uids = set(row['key'][1] for row in rows)
# now build a list of all message currently in the folder.
allMessages = imap4.MessageSet(1, None)
return self.fetchUID(allMessages, True).addCallback(
self._gotUIDs, folder_path, seen_uids)
def _gotUIDs(self, uidResults, name, seen_uids):
all_uids = set(int(result['UID']) for result in uidResults.values())
- self.messages_remaining = all_uids - seen_uids
+ remaining = all_uids - seen_uids
logger.info("Folder %s has %d messages, %d new", name,
- len(all_uids), len(self.messages_remaining))
- return self._process_next_message()
-
- def _process_next_message(self):
- logger.debug("processNextMessage has %d messages to go...",
- len(self.messages_remaining))
- if not self.messages_remaining:
- return self._process_next_folder()
+ len(all_uids), len(remaining))
+        def gen_remaining(folder_name, remaining):
+            for uid in remaining:
+ # XXX - we need something to make this truly unique.
+ did = "%s#%d" % (folder_name, uid)
+ logger.debug("new imap message %r - fetching content", did)
+ # grr - we have to get the rfc822 body and the flags in separate requests.
+ to_fetch = imap4.MessageSet(uid)
+ yield self.fetchMessage(to_fetch, uid=True
+ ).addCallback(self._cb_got_body, uid, did, folder_name, to_fetch
+ )
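+        # the Deferred returned below keeps this folder's work pending until
+        # every new message in the folder has been fetched and saved.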
+ return self.coop.coiterate(gen_remaining(name, remaining))
- uid = self.messages_remaining.pop()
- # XXX - we need something to make this truly unique.
- did = "%s#%d" % (self.current_folder, uid)
- logger.debug("new imap message %r - fetching content", did)
- # grr - we have to get the rfc822 body and the flags in separate requests.
- to_fetch = imap4.MessageSet(uid)
- return self.fetchMessage(to_fetch, uid=True
- ).addCallback(self._cb_got_body, uid, did, to_fetch
- )
-
- def _cb_got_body(self, result, uid, did, to_fetch):
+ def _cb_got_body(self, result, uid, did, folder_name, to_fetch):
_, result = result.popitem()
content = result['RFC822']
# grr - get the flags
logger.debug("message %r has %d bytes; fetching flags", did, len(content))
return self.fetchFlags(to_fetch, uid=True
- ).addCallback(self._cb_got_flags, uid, did, content
+ ).addCallback(self._cb_got_flags, uid, did, folder_name, content
).addErrback(self._cantGetMessage
)
- def _cb_got_flags(self, result, uid, did, content):
+ def _cb_got_flags(self, result, uid, did, folder_name, content):
logger.debug("flags are %s", result)
assert len(result)==1, result
_, result = result.popitem()
flags = result['FLAGS']
# put the 'raw' document object together and save it.
doc = dict(
- storage_key=[self.current_folder, uid],
+ storage_key=[folder_name, uid],
imap_flags=flags,
)
attachments = {'rfc822' : {'content_type': 'message',
'data': content,
}
}
- return self.doc_model.create_raw_document(did, doc, 'proto/imap',
- self.account,
+ return self.doc_model.create_raw_document(self.account,
+ did, doc, 'proto/imap',
attachments=attachments,
).addCallback(self._cb_saved_message)
 
def _cantGetMessage(self, failure):
logger.error("Failed to fetch message: %s", failure)
- return self._process_next_message()
def _cb_saved_message(self, result):
logger.debug("Saved message %s", result)
- return self._process_next_message()
def _cantSaveDocument(self, failure):
logger.error("Failed to save message: %s", failure)
- return self._process_next_message()
def _cantExamineFolder(self, failure, name, *args, **kw):
logger.warning("Failed to examine folder '%s': %s", name, failure)
- return self._process_next_folder()
def accountStatus(self, result, *args):
return self.account.reportStatus(*args)
class ImapClientFactory(protocol.ClientFactory):
protocol = ImapClient
@@ -242,11 +220,8 @@ class IMAPConverter(base.ConverterBase):
class IMAPAccount(base.AccountBase):
def __init__(self, db, details):
self.db = db
self.details = details
def startSync(self, conductor, doc_model):
self.factory = ImapClientFactory(self, conductor, doc_model)
self.factory.connect()
- # XXX - wouldn't it be good to return a deferred here, so the conductor
- # can reliably wait for the deferred to complete, rather than forcing
- # each deferred to manage 'finished' itself?
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,16 +1,16 @@
#!/usr/bin/env python
'''
Fetch skype contacts and chats.
'''
import logging
import twisted.python.log
-from twisted.internet import defer, threads
+from twisted.internet import defer, threads, task
from ..proc import base
import Skype4Py
logger = logging.getLogger(__name__)
# These are the raw properties we fetch from skype.
@@ -64,21 +64,21 @@ def simple_convert(str_val, typ):
return typ(str_val)
class TwistySkype(object):
def __init__(self, account, conductor, doc_model):
self.account = account
self.conductor = conductor
self.doc_model = doc_model
+ # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parent's?
+ self.coop = task.Cooperator()
self.skype = Skype4Py.Skype()
- def finished(self, result):
- return self.conductor.on_synch_finished(self.account, result)
-
def get_docid_for_chat(self, chat):
return "skypechat-" + chat.Name.encode('utf8') # hrmph!
def get_docid_for_msg(self, msg):
return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
def attach(self):
logger.info("attaching to skype...")
@@ -89,133 +89,115 @@ class TwistySkype(object):
def attached(self, status):
logger.info("attached to skype - getting chats")
return threads.deferToThread(self.skype._GetChats
).addCallback(self._cb_got_chats
)
def _cb_got_chats(self, chats):
logger.debug("skype has %d chat(s) total", len(chats))
- # 'chats' is a tuple
- self.remaining_chats = list(chats)
- return self.process_next_chat()
-
-
- def process_next_chat(self):
- if not self.remaining_chats:
- logger.debug("finished processing chats")
- self.finished(None) # not sure this is the right place...
- return
-
- chat = self.remaining_chats.pop()
# fetch all the messages (future optimization; all we need are the
# IDs, but we suffer from creating an instance wrapper for each item.
# Sadly the skype lib doesn't offer a clean way of doing this.)
- return threads.deferToThread(chat._GetMessages
- ).addCallback(self._cb_got_messages, chat,
- )
+ def gen_chats(chats):
+ for chat in chats:
+ yield threads.deferToThread(chat._GetMessages
+ ).addCallback(self._cb_got_messages, chat,
+ )
+ logger.info("nothing left to do - finished!")
+ # XXX - on_synch_finished must die - caller knows this!!!
+ yield self.conductor.on_synch_finished(self.account, None)
+
+ return self.coop.coiterate(gen_chats(chats))
def _cb_got_messages(self, messages, chat):
logger.debug("chat '%s' has %d message(s) total; looking for new ones",
chat.Name, len(messages))
# Finally got all the messages for this chat. Execute a view to
# determine which we have seen (note that we obviously could just
- # fetch the *entire* view once - but we do it this way on purpose
- # to ensure we remain scalable...)
+ # fetch the *entire* chats+msgs view once - but we do it this way on
+ # purpose to ensure we remain scalable...)
return self.doc_model.db.openView('raindrop!proto!skype', 'seen',
startkey=[chat.Name],
endkey=[chat.Name, {}]
).addCallback(self._cb_got_seen, chat, messages
)
def _cb_got_seen(self, result, chat, messages):
- self.msgs_by_id = dict((m._Id, m) for m in messages)
- self.current_chat = chat
+ msgs_by_id = dict((m._Id, m) for m in messages)
chatname = chat.Name
# The view gives us a list of [chat_name, msg_id], where msg_id is
# None if we've seen the chat itself. Create a set of messages we
# *haven't* seen - including the [chat_name, None] entry if applic.
all_keys = [(chatname, None)]
- all_keys.extend((chatname, mid) for mid in self.msgs_by_id.keys())
+ all_keys.extend((chatname, mid) for mid in msgs_by_id.keys())
seen_chats = set([tuple(row['key']) for row in result])
- self.remaining_items = set(all_keys)-set(seen_chats)
+ remaining = set(all_keys)-set(seen_chats)
# we could just process the empty list as normal, but the logging of
# an info when we do have items is worthwhile...
- if not self.remaining_items:
+ if not remaining:
logger.debug("Chat %r has no new items to process", chatname)
- self.msgs_by_id = None
- return self.process_next_chat()
+ return None
# we have something to do...
logger.info("Chat %r has %d items to process", chatname,
- len(self.remaining_items))
+ len(remaining))
logger.debug("we've already seen %d items from this chat",
len(seen_chats))
- return self.process_next_item()
-
- def process_next_item(self):
- if not self.remaining_items:
- self.msgs_by_id = None
- logger.debug("finished processing messages for this chat")
- return self.conductor.reactor.callLater(0, self.process_next_chat)
+ return self.coop.coiterate(self.gen_items(chat, remaining, msgs_by_id))
- chatname, msgid = self.remaining_items.pop()
- chat = self.current_chat
- if msgid is None:
- # we haven't seen the chat itself - do that.
- logger.debug("Creating new skype chat %r", chat.Name)
- # make a 'deferred list' to fetch each property one at a time.
- ds = [threads.deferToThread(chat._Property, p)
- for p, _ in CHAT_PROPS]
- ret = defer.DeferredList(ds
- ).addCallback(self._cb_got_chat_props, chat)
- else:
- msg = self.msgs_by_id[msgid]
- # A new msg in this chat.
- logger.debug("New skype message %d", msg._Id)
- # make a 'deferred list' to fetch each property one at a time.
- ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
- ret = defer.DeferredList(ds
- ).addCallback(self._cb_got_msg_props, chat, msg)
+ def gen_items(self, chat, todo, msgs_by_id):
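+        # the property-fetch callbacks below append each prepared raw doc to
+        # 'pending'; everything is then saved in a single bulk update.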
+        pending = []
+ for _, msgid in todo:
+ if msgid is None:
+ # we haven't seen the chat itself - do that.
+ logger.debug("Creating new skype chat %r", chat.Name)
+ # make a 'deferred list' to fetch each property one at a time.
+ ds = [threads.deferToThread(chat._Property, p)
+ for p, _ in CHAT_PROPS]
+ yield defer.DeferredList(ds
+                        ).addCallback(self._cb_got_chat_props, chat, pending)
+ else:
+ msg = msgs_by_id[msgid]
+ # A new msg in this chat.
+ logger.debug("New skype message %d", msg._Id)
+ # make a 'deferred list' to fetch each property one at a time.
+ ds = [threads.deferToThread(msg._Property, p) for p, _ in MSG_PROPS]
+ yield defer.DeferredList(ds
+                        ).addCallback(self._cb_got_msg_props, chat, msg, pending)
- ret.addCallback(self._cb_doc_saved)
- return ret
+        if pending:
+            yield self.doc_model.create_raw_documents(self.account, pending)
+ logger.debug("finished processing chat %r", chat.Name)
- def _cb_got_chat_props(self, results, chat):
+ def _cb_got_chat_props(self, results, chat, pending):
logger.debug("got chat %r properties: %s", chat.Name, results)
docid = self.get_docid_for_chat(chat)
doc ={}
for (name, typ), (ok, val) in zip(CHAT_PROPS, results):
if ok:
doc['skype_'+name.lower()] = simple_convert(val, typ)
# 'Name' is a special case that doesn't come via a prop. We use
# 'chatname' as that is the equiv attr on the messages themselves.
doc['skype_chatname'] = chat.Name
- return self.doc_model.create_raw_document(docid, doc,
- 'proto/skype-chat',
- self.account
- )
+ pending.append((docid, doc, 'proto/skype-chat'))
- def _cb_got_msg_props(self, results, chat, msg):
+ def _cb_got_msg_props(self, results, chat, msg, pending):
+ logger.debug("got message properties for %s", msg._Id)
doc = {}
for (name, typ), (ok, val) in zip(MSG_PROPS, results):
if ok:
doc['skype_'+name.lower()] = simple_convert(val, typ)
doc['skype_id'] = msg._Id
# we include the skype username with the ID as they are unique per user.
docid = self.get_docid_for_msg(msg)
- return self.doc_model.create_raw_document(docid, doc, 'proto/skype-msg',
- self.account
- )
+ pending.append((docid, doc, 'proto/skype-msg'))
- def _cb_doc_saved(self, result):
- # the 'pattern' we copied from the imap sample code calls for us
- # to recurse - but that can recurse too far...
- return self.conductor.reactor.callLater(0, self.process_next_item)
# A 'converter' - takes a proto/skype-msg as input and creates a
# 'message' as output (some other intermediate step might end up more
# appopriate)
class SkypeConverter(base.ConverterBase):
def convert(self, doc):
# We need to open the 'chat' for this Message. Clearly this is
# going to be inefficient...
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -1,60 +1,76 @@
# This is an implementation of a 'test' protocol.
import logging
-from twisted.internet import defer, error
+from twisted.internet import defer, error, task
logger = logging.getLogger(__name__)
from ...proc import base
class TestMessageProvider(object):
def __init__(self, account, conductor, doc_model):
self.account = account
self.conductor = conductor
self.doc_model = doc_model
- def attach(self):
- logger.info("preparing to synch test messages...")
- # Is a 'DeferredList' more appropriate here?
- d = defer.Deferred()
+ def sync_generator(self):
num_docs = int(self.account.details.get('num_test_docs', 5))
logger.info("Creating %d test documents", num_docs)
for i in xrange(num_docs):
- d.addCallback(self.check_test_message, i)
- d.addCallback(self.finished)
- return d.callback(None)
+ yield self.check_test_message(i)
+        if self.bulk_docs:
+            yield self.doc_model.create_raw_documents(self.account,
+                                                      self.bulk_docs
+                      ).addCallback(self.saved_bulk_messages,
+                                    len(self.bulk_docs))
+ yield self.conductor.on_synch_finished(self.account, None) # must die!
- def check_test_message(self, result, i):
+ def attach(self):
+ logger.info("preparing to synch test messages...")
+ self.bulk_docs = [] # anything added here will be done in bulk
+ # Experimenting with a 'cooperator'...
+ coop = task.Cooperator()
+ gen = self.sync_generator()
+ return coop.coiterate(gen)
+
+ def check_test_message(self, i):
logger.debug("seeing if message with ID %d exists", i)
return self.doc_model.open_document("test.message.%d" % i,
).addCallback(self.process_test_message, i)
def process_test_message(self, existing_doc, doc_num):
if existing_doc is None:
- logger.info("Creating new test message with ID %d", doc_num)
+ doc_type = 'proto/test'
doc = dict(
storage_key=doc_num,
)
did = "test.message.%d" % doc_num
- return self.doc_model.create_raw_document(did, doc, 'proto/test',
- self.account
- ).addCallback(self.saved_message, doc)
+ if doc_num % 2 == 0:
+ logger.info("Queueing test message with ID %d for bulk update",
+ doc_num)
+ self.bulk_docs.append((did, doc, doc_type))
+ return None # we will get these later...
+ else:
+ logger.info("Creating new test message with ID %d", doc_num)
+ return self.doc_model.create_raw_document(self.account,
+ did, doc, doc_type
+ ).addCallback(self.saved_message, doc
+ )
else:
logger.info("Skipping test message with ID %d - already exists",
doc_num)
# we are done.
- def finished(self, result):
- self.conductor.on_synch_finished(self.account, result)
-
def saved_message(self, result, doc):
logger.debug("Finished saving test message %r", result)
# done
+ def saved_bulk_messages(self, result, n):
+ logger.debug("Finished saving %d test messages in bulk", n)
+ # done
+
# A 'converter' - takes a proto/test as input and creates a
# 'raw/message/rfc822' as output.
class TestConverter(base.ConverterBase):
def convert(self, doc):
me = doc['storage_key']
headers = {'from': 'From: from%(storage_key)d@test.com',
'subject' : 'This is test document %(storage_key)d',
}
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -4,17 +4,17 @@ Fetch twitter raw* objects
'''
# prevent 'import twitter' finding this module!
from __future__ import absolute_import
import logging
import re
import twisted.python.log
-from twisted.internet import defer, threads
+from twisted.internet import defer, threads, task
from ..proc import base
# See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
# about getting twisted support into the twitter package.
# Sadly, the twisty-twitter package has issues of its own (like calling
# delegates outside the deferred mechanism meaning you can't rely on callbacks
# being completed when they say they are.)
@@ -34,100 +34,91 @@ TWEET_PROPS = """
class TwitterProcessor(object):
def __init__(self, account, conductor, doc_model):
self.account = account
self.conductor = conductor
self.doc_model = doc_model
self.twit = None
self.seen_tweets = None
+ # use a cooperator to do the work via a generator.
+        # XXX - should we own the cooperator or use our parent's?
+ self.coop = task.Cooperator()
def attach(self):
logger.info("attaching to twitter...")
username = self.account.details['username']
pw = self.account.details['password']
return threads.deferToThread(twitter.Api,
username=username, password=pw
).addCallback(self.attached
)
def attached(self, twit):
+ logger.info("attached to twitter - fetching friends")
self.twit = twit
# build the list of all users we will fetch tweets from.
return threads.deferToThread(self.twit.GetFriends
- ).addCallback(self.got_friends)
-
- def got_friends(self, friends):
- # None at the start means 'me'
- self.friends_remaining = [None] + [f.screen_name for f in friends]
- return self.process_next_friend()
+ ).addCallback(self._cb_got_friends)
- def process_next_friend(self):
- if not self.friends_remaining:
- logger.debug("Finished processing twitter friends")
- return defer.maybeDeferred(self.finished, None)
+ def _cb_got_friends(self, friends):
+ return self.coop.coiterate(self.gen_friends_info(friends))
+
+ def gen_friends_info(self, friends):
+ logger.info("apparently I've %d friends", len(friends))
+ def do_fid(fid):
+ return threads.deferToThread(self.twit.GetUserTimeline, fid
+ ).addCallback(self.got_friend_timeline, fid
+ ).addErrback(self.err_friend_timeline, fid
+ ).addCallback(self.finished_friend
+ )
- fid = self.friends_remaining.pop()
- return threads.deferToThread(self.twit.GetUserTimeline, fid
- ).addCallback(self.got_friend_timeline, fid
- ).addErrback(self.err_friend_timeline, fid
- ).addCallback(self.finished_friend
- )
+ # None means 'me'
+ yield do_fid(None)
+ for f in friends:
+ yield do_fid(f.screen_name)
+
+ logger.debug("Finished friends")
+ yield self.conductor.on_synch_finished(self.account, None) # Must die
def finished_friend(self, result):
- self.conductor.reactor.callLater(0, self.process_next_friend)
+ logger.debug("finished friend: %s", result)
def err_friend_timeline(self, failure, fid):
logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
def got_friend_timeline(self, timeline, fid):
- self.current_fid = fid
- self.remaining_tweets = timeline[:]
logger.debug("Friend %r has %d items in their timeline", fid,
- len(self.remaining_tweets))
- return self.process_next_tweet()
+ len(timeline))
+ return self.coop.coiterate(self.gen_friend_timeline(timeline, fid))
- def process_next_tweet(self):
- if not self.remaining_tweets:
- logger.debug("Finished processing timeline")
- return
-
- tweet = self.remaining_tweets.pop()
-
- logger.debug("seeing if tweet %r exists", tweet.id)
- docid = "tweet.%d" % (tweet.id,)
- return self.doc_model.open_document(docid,
- ).addCallback(self.maybe_process_tweet, docid, tweet)
+ def gen_friend_timeline(self, timeline, fid):
+ for tweet in timeline:
+ logger.debug("seeing if tweet %r exists", tweet.id)
+ docid = "tweet.%d" % (tweet.id,)
+ yield self.doc_model.open_document(docid,
+ ).addCallback(self.maybe_process_tweet, docid, tweet)
def maybe_process_tweet(self, existing_doc, docid, tweet):
if existing_doc is None:
# put the 'raw' document object together and save it.
logger.info("New tweet '%s...' (%s)", tweet.text[:25], tweet.id)
# create the couch document for the tweet itself.
doc = {}
for name in TWEET_PROPS:
val = getattr(tweet, name)
# simple hacks - users just become the user ID.
if isinstance(val, twitter.User):
val = val.id
doc['twitter_'+name.lower()] = val
- return self.doc_model.create_raw_document(docid, doc, 'proto/twitter',
- self.account
- ).addCallback(self.saved_tweet, tweet)
+ return self.doc_model.create_raw_document(
+ self.account, docid, doc, 'proto/twitter')
else:
# Must be a seen one.
logger.debug("Skipping seen tweet '%s'", tweet.id)
- return self.process_next_tweet()
-
- def saved_tweet(self, result, tweet):
- logger.debug("Finished processing of new tweet '%s'", tweet.id)
- return self.process_next_tweet()
-
- def finished(self, result):
- return self.conductor.on_synch_finished(self.account, result)
# A 'converter' - takes a proto/twitter as input and creates a
# 'message' as output (some other intermediate step might end up more
# appropriate)
class TwitterConverter(base.ConverterBase):
re_tags = re.compile(r'#(\w+)')
def convert(self, doc):