--- a/server/python/junius/model.py
+++ b/server/python/junius/model.py
@@ -34,24 +34,16 @@ if sys.platform=='win32':
clock_start = time.time()
    time.clock() # time.clock starts counting from zero the first time it's called.
def get_seq():
return clock_start + time.clock()
else:
get_seq = time.time
-def _raw_to_rows(raw):
- # {'rows': [], 'total_rows': 0} -> the actual rows.
- ret = raw['rows']
- # hrmph - on a view with start_key etc params, total_rows will be
- # greater than the rows.
- assert 'total_rows' not in raw or len(ret)<=raw['total_rows'], raw
- return ret
-
# from the couchdb package; not sure what makes these names special...
def _encode_options(options):
retval = {}
for name, value in options.items():
if name in ('key', 'startkey', 'endkey') \
or not isinstance(value, basestring):
value = json.dumps(value, allow_nan=False, ensure_ascii=False)
retval[name] = value
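
A note on what _encode_options produces: CouchDB wants these view
parameters JSON-encoded on the query string, so lists and booleans become
their JSON text. A standalone sketch (a copy of the helper plus a toy
call, for illustration only, not part of the patch):

import json

def encode_options(options):
    # standalone copy of _encode_options above, illustration only
    retval = {}
    for name, value in options.items():
        if name in ('key', 'startkey', 'endkey') \
                or not isinstance(value, basestring):
            value = json.dumps(value, allow_nan=False, ensure_ascii=False)
        retval[name] = value
    return retval

print encode_options({'startkey': ['doc-1'], 'group': True})
# -> {'startkey': '["doc-1"]', 'group': 'true'}
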
@@ -62,24 +54,19 @@ class CouchDB(paisley.CouchDB):
def postob(self, uri, ob):
        # This seems not to use keep-alives etc, where using twisted.web
        # directly does?
body = json.dumps(ob, allow_nan=False,
ensure_ascii=False).encode('utf-8')
return self.post(uri, body)
def openView(self, *args, **kwargs):
- # The base class of this returns the raw json object - eg:
- # {'rows': [], 'total_rows': 0}
- # XXX - Note that paisley isn't interested in this enhancement, so
- # we need to remove it...
-
- # *sob* - and it also doesn't handle encoding options...
+ # paisley doesn't handle encoding options...
return super(CouchDB, self).openView(*args, **_encode_options(kwargs)
- ).addCallback(_raw_to_rows)
+ )
def openDoc(self, dbName, docId, revision=None, full=False, attachment=""):
# paisley appears to use an old api for attachments?
if attachment:
uri = "/%s/%s/%s" % (dbName, docId, attachment)
return self.get(uri)
return super(CouchDB, self).openDoc(dbName, docId, revision, full)
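
For attachments, the patched openDoc sidesteps paisley and issues the raw
CouchDB GET itself; the URI is simply /dbName/docId/attachment. A trivial
standalone sketch of that construction:

def attachment_uri(db_name, doc_id, attachment):
    # mirrors the URI built in openDoc above
    return "/%s/%s/%s" % (db_name, doc_id, attachment)

assert attachment_uri('raindrop', 'msg-1', 'rfc822') == '/raindrop/msg-1/rfc822'
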
@@ -180,16 +167,21 @@ def quote_id(doc_id):
class DocumentModel(object):
"""The layer between 'documents' and the 'database'. Responsible for
creating the unique ID for each document (other than the raw document),
for fetching documents based on an ID, etc
"""
def __init__(self, db):
self.db = db
+ def open_view(self, *args, **kwargs):
+ # A convenience method for completeness so consumers don't hit the
+ # DB directly (and to give a consistent 'style'). Is it worth it?
+ return self.db.openView(*args, **kwargs)
+
def open_document(self, doc_id, **kw):
"""Open the specific document, returning None if it doesn't exist"""
return self.db.openDoc(quote_id(doc_id), **kw).addBoth(self._cb_doc_opened)
def _cb_doc_opened(self, result):
if isinstance(result, Failure):
result.trap(twisted.web.error.Error)
if result.value.status != '404': # not found
@@ -335,22 +327,23 @@ class DocumentModel(object):
def get_last_ext_for_document(self, doc_id):
"""Given a base docid, find the most-recent extension to have run.
This will differ from the latest extension in the document chain if
the document chain has been 'reset' for any reason (eg, change
detected at the source of the message, user adding annotations, etc)
"""
startkey = [doc_id]
endkey = [doc_id, {}]
- return self.db.openView('raindrop!messages!workqueue',
- 'by_doc_extension_sequence',
- startkey=startkey, endkey=endkey
+ return self.open_view('raindrop!messages!workqueue',
+ 'by_doc_extension_sequence',
+ startkey=startkey, endkey=endkey
).addCallback(self._cb_des_opened, doc_id)
- def _cb_des_opened(self, rows, doc_id):
+ def _cb_des_opened(self, result, doc_id):
+ rows = result['rows']
if not rows:
ret = None, None
else:
last = rows[-1]
ret = last["value"], last["id"]
logger.debug("document '%s' has last-extension of %r", doc_id, ret)
return ret
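
The startkey=[doc_id] / endkey=[doc_id, {}] pair above leans on CouchDB's
collation rules: an object ({}) sorts after every other JSON value, so the
range covers every compound key that begins with doc_id. A minimal sketch
of the pattern, assuming a doc_model wired up as above:

def rows_for_doc(doc_model, doc_id):
    # every row whose key is [doc_id, <anything>] falls inside this range
    return doc_model.open_view('raindrop!messages!workqueue',
                               'by_doc_extension_sequence',
                               startkey=[doc_id], endkey=[doc_id, {}]
                ).addCallback(lambda result: result['rows'])
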
--- a/server/python/junius/pipeline.py
+++ b/server/python/junius/pipeline.py
@@ -51,29 +51,29 @@ class Pipeline(object):
else:
assert not to_type, 'no xformname must mean no to_type'
self.forward_chain[from_type] = None
def unprocess(self):
# A bit of a hack that will suffice until we get better dependency
# management. Determines which doc-types are 'derived', then deletes
# them all.
- def delete_docs(rows, doc_type):
+ def delete_docs(result, doc_type):
docs = []
- to_del = [(row['id'], row['value']) for row in rows]
+ to_del = [(row['id'], row['value']) for row in result['rows']]
for id, rev in to_del:
docs.append({'_id': id, '_rev': rev, '_deleted': True})
logger.info('deleting %d messages of type %r', len(docs), doc_type)
return self.doc_model.db.updateDocuments(docs)
def gen_deleting_docs(doc_types):
for t in doc_types:
- yield self.doc_model.db.openView('raindrop!messages!by',
- 'by_doc_type',
- key=t,
+ yield self.doc_model.open_view('raindrop!messages!by',
+ 'by_doc_type',
+ key=t,
).addCallback(delete_docs, t)
derived = set()
for from_type, to_info in self.forward_chain.iteritems():
if to_info is not None:
to_type, inst = to_info
derived.add(to_type)
logger.info("deleting documents with types %r", derived)
@@ -83,26 +83,27 @@ class Pipeline(object):
return self.coop.coiterate(self.gen_all_documents())
def gen_all_documents(self):
# check *every* doc in the DB. This exists until we can derive
# a workqueue based on views.
while True:
self.num_this_process = 0
logger.debug('opening view for work queue...')
- yield self.doc_model.db.openView('raindrop!messages!workqueue',
- 'by_doc_roots',
- group=True,
+ yield self.doc_model.open_view('raindrop!messages!workqueue',
+ 'by_doc_roots',
+ group=True,
).addCallback(self._cb_roots_opened)
logger.info('created %d new documents', self.num_this_process)
if self.num_this_process == 0:
break
logger.debug('finally run out of documents.')
- def _cb_roots_opened(self, rows):
+ def _cb_roots_opened(self, result):
+ rows = result['rows']
logger.info('work queue has %d items to check.', len(rows))
def gen_todo(todo):
for row in todo:
did = row['key']
logger.debug("Finding last extension point for %s", did)
yield self.doc_model.get_last_ext_for_document(did
).addCallback(self._cb_got_doc_last_ext, did
).addErrback(self._eb_doc_failed
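
The loop in gen_all_documents is a fixpoint iteration: keep re-opening the
work-queue view and processing until a pass creates no new documents. Its
shape with the deferred plumbing stripped away, using a hypothetical
process_pass callable:

def run_until_fixpoint(process_pass):
    # each pass returns the number of new documents it created
    while True:
        if process_pass() == 0:
            break
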
--- a/server/python/junius/proc/base.py
+++ b/server/python/junius/proc/base.py
@@ -65,16 +65,20 @@ class Rat(object):
#: permanent implies the user must take some action to correct the problem
PERMANENT = 'permanent'
#: unknown means it either doesn't matter or it could be temporary but the
#: user should potentially still be informed
UNKNOWN = 'unknown'
class AccountBase(Rat):
+ def __init__(self, doc_model, details):
+ self.doc_model = doc_model
+ self.details = details
+
def reportStatus(self, what, state, why=Rat.UNKNOWN,
expectedDuration=Rat.UNKNOWN):
'''
Report status relating to this account.
Everything is peachy: EVERYTHING GOOD
Wrong password: ACCOUNT BAD PASSWORD PERMANENT
(may be temporary if a bad password can mean many things)
--- a/server/python/junius/proto/imap.py
+++ b/server/python/junius/proto/imap.py
@@ -1,30 +1,28 @@
-from twisted.internet import protocol, ssl, defer, error, task
+from twisted.internet import protocol, ssl, defer, error
from twisted.mail import imap4
import logging
from ..proc import base
-from ..model import get_db
from ..ext.message.rfc822 import doc_from_bytes
brat = base.Rat
logger = logging.getLogger(__name__)
class ImapClient(imap4.IMAP4Client):
'''
Much of our logic here should perhaps be in another class that holds a
    reference to the IMAP4Client subclass instance. Consider refactoring if we
don't turn out to benefit from the subclassing relationship.
'''
def serverGreeting(self, caps):
logger.debug("IMAP server greeting: capabilities are %s", caps)
- self.coop = task.Cooperator()
return self._doAuthenticate(
).addCallback(self._reqList
).addCallback(self.deferred.callback)
def _doAuthenticate(self):
if self.account.details.get('crypto') == 'TLS':
d = self.startTLS(self.factory.ctx)
d.addErrback(self.accountStatus,
@@ -41,17 +39,17 @@ class ImapClient(imap4.IMAP4Client):
return self.login(self.account.details['username'],
self.account.details['password'])
def _reqList(self, *args, **kwargs):
self.account.reportStatus(brat.EVERYTHING, brat.GOOD)
return self.list('', '*').addCallback(self._procList)
def _procList(self, result, *args, **kwargs):
- return self.coop.coiterate(self.gen_folder_list(result))
+ return self.conductor.coop.coiterate(self.gen_folder_list(result))
def gen_folder_list(self, result):
for flags, delim, name in result:
logger.debug('Processing folder %s (flags=%s)', name, flags)
if r"\Noselect" in flags:
logger.debug("'%s' is unselectable - skipping", name)
continue
@@ -64,23 +62,24 @@ class ImapClient(imap4.IMAP4Client):
yield self.examine(name
).addCallback(self._examineFolder, name
).addErrback(self._cantExamineFolder, name)
logger.debug('imap processing finished.')
def _examineFolder(self, result, folder_path):
logger.debug('Looking for messages already fetched for folder %s', folder_path)
- return get_db().openView('raindrop!proto!imap', 'seen',
+ return self.doc_model.open_view('raindrop!proto!imap', 'seen',
startkey=[folder_path], endkey=[folder_path, {}]
).addCallback(self._fetchAndProcess, folder_path)
- def _fetchAndProcess(self, rows, folder_path):
+ def _fetchAndProcess(self, result, folder_path):
# XXX - we should look at the flags and update the message if it's not
# the same - later.
+ rows = result['rows']
        logger.debug('_fetchAndProcess says we have seen %d items for %r',
len(rows), folder_path)
seen_uids = set(row['key'][1] for row in rows)
        # now build a list of all messages currently in the folder.
allMessages = imap4.MessageSet(1, None)
return self.fetchUID(allMessages, True).addCallback(
self._gotUIDs, folder_path, seen_uids)
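
The 'seen' bookkeeping above is a plain set difference: view rows carry
[folder_path, uid] keys, and any UID present in the folder but absent from
the view still needs fetching. A toy run with made-up data:

rows = [{'key': ['INBOX', 1]}, {'key': ['INBOX', 3]}]  # pretend view rows
seen_uids = set(row['key'][1] for row in rows)
all_uids = set([1, 2, 3, 4])                           # pretend fetchUID result
assert all_uids - seen_uids == set([2, 4])             # only 2 and 4 get fetched
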
@@ -94,17 +93,17 @@ class ImapClient(imap4.IMAP4Client):
# XXX - we need something to make this truly unique.
did = "%s#%d" % (folder_name, uid)
logger.debug("new imap message %r - fetching content", did)
# grr - we have to get the rfc822 body and the flags in separate requests.
to_fetch = imap4.MessageSet(uid)
yield self.fetchMessage(to_fetch, uid=True
).addCallback(self._cb_got_body, uid, did, folder_name, to_fetch
)
- return self.coop.coiterate(gen_remaining(name, remaining))
+ return self.conductor.coop.coiterate(gen_remaining(name, remaining))
def _cb_got_body(self, result, uid, did, folder_name, to_fetch):
_, result = result.popitem()
content = result['RFC822']
# grr - get the flags
logger.debug("message %r has %d bytes; fetching flags", did, len(content))
return self.fetchFlags(to_fetch, uid=True
).addCallback(self._cb_got_flags, uid, did, folder_name, content
@@ -144,31 +143,31 @@ class ImapClient(imap4.IMAP4Client):
def accountStatus(self, result, *args):
return self.account.reportStatus(*args)
class ImapClientFactory(protocol.ClientFactory):
protocol = ImapClient
- def __init__(self, account, conductor, doc_model):
+ def __init__(self, account, conductor):
# base-class has no __init__
self.account = account
self.conductor = conductor
- self.doc_model = doc_model
+ self.doc_model = account.doc_model # this is a little confused...
self.ctx = ssl.ClientContextFactory()
self.backoff = 8 # magic number
def buildProtocol(self, addr):
p = self.protocol(self.ctx)
p.factory = self
p.account = self.account
p.conductor = self.conductor
- p.doc_model = self.doc_model
+ p.doc_model = self.account.doc_model
p.deferred = self.deferred # this isn't going to work in reconnect scenarios
return p
def connect(self):
details = self.account.details
logger.debug('attempting to connect to %s:%d (ssl: %s)',
details['host'], details['port'], details['ssl'])
reactor = self.conductor.reactor
@@ -216,15 +215,11 @@ class IMAPConverter(base.ConverterBase):
def _cb_got_attachment(self, content, doc):
assert content, "can't locate attachment for %r" % doc['_id']
# the 'rfc822' module knows what to do...
return doc_from_bytes(content)
class IMAPAccount(base.AccountBase):
- def __init__(self, db, details):
- self.db = db
- self.details = details
-
- def startSync(self, conductor, doc_model):
- self.factory = ImapClientFactory(self, conductor, doc_model)
+ def startSync(self, conductor):
+ self.factory = ImapClientFactory(self, conductor)
return self.factory.connect()
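
After this refactor every account follows the same convention: it is
constructed with the doc model and its details, and startSync needs only
the conductor. A minimal standalone sketch with a toy subclass (the
AccountBase body copied from proc/base.py above):

from twisted.internet import defer

class AccountBase(object):
    # mirrors proc.base.AccountBase after this patch
    def __init__(self, doc_model, details):
        self.doc_model = doc_model
        self.details = details

class ToyAccount(AccountBase):
    def startSync(self, conductor):
        # a real account starts protocol work and returns a deferred
        return defer.succeed(self.details.get('name'))
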
--- a/server/python/junius/proto/skype.py
+++ b/server/python/junius/proto/skype.py
@@ -1,16 +1,16 @@
#!/usr/bin/env python
'''
Fetch skype contacts and chats.
'''
import logging
import twisted.python.log
-from twisted.internet import defer, threads, task
+from twisted.internet import defer, threads
from ..proc import base
import Skype4Py
logger = logging.getLogger(__name__)
# These are the raw properties we fetch from skype.
@@ -60,23 +60,20 @@ def simple_convert(str_val, typ):
return str_val.split()
if typ is bool:
return str_val == "TRUE"
# all the rest are callables which 'do the right thing'
return typ(str_val)
class TwistySkype(object):
- def __init__(self, account, conductor, doc_model):
+ def __init__(self, account, conductor):
self.account = account
+ self.doc_model = account.doc_model # this is a little confused...
self.conductor = conductor
- self.doc_model = doc_model
- # use a cooperator to do the work via a generator.
- # XXX - should we own the cooperator or use our parents?
- self.coop = task.Cooperator()
self.skype = Skype4Py.Skype()
def get_docid_for_chat(self, chat):
return "skypechat-" + chat.Name.encode('utf8') # hrmph!
def get_docid_for_msg(self, msg):
return "skypemsg-%s-%d" % (self.account.details['username'], msg._Id)
@@ -99,54 +96,55 @@ class TwistySkype(object):
# Sadly the skype lib doesn't offer a clean way of doing this.)
def gen_chats(chats):
for chat in chats:
yield threads.deferToThread(chat._GetMessages
).addCallback(self._cb_got_messages, chat,
)
logger.info("skype has finished processing all chats")
- return self.coop.coiterate(gen_chats(chats))
+ return self.conductor.coop.coiterate(gen_chats(chats))
def _cb_got_messages(self, messages, chat):
logger.debug("chat '%s' has %d message(s) total; looking for new ones",
chat.Name, len(messages))
# Finally got all the messages for this chat. Execute a view to
# determine which we have seen (note that we obviously could just
# fetch the *entire* chats+msgs view once - but we do it this way on
# purpose to ensure we remain scalable...)
- return self.doc_model.db.openView('raindrop!proto!skype', 'seen',
- startkey=[chat.Name],
- endkey=[chat.Name, {}]
+ return self.doc_model.open_view('raindrop!proto!skype', 'seen',
+ startkey=[chat.Name],
+ endkey=[chat.Name, {}]
).addCallback(self._cb_got_seen, chat, messages
)
def _cb_got_seen(self, result, chat, messages):
msgs_by_id = dict((m._Id, m) for m in messages)
chatname = chat.Name
# The view gives us a list of [chat_name, msg_id], where msg_id is
# None if we've seen the chat itself. Create a set of messages we
# *haven't* seen - including the [chat_name, None] entry if applic.
all_keys = [(chatname, None)]
all_keys.extend((chatname, mid) for mid in msgs_by_id.keys())
- seen_chats = set([tuple(row['key']) for row in result])
+ seen_chats = set([tuple(row['key']) for row in result['rows']])
add_bulk = [] # we bulk-update these at the end!
remaining = set(all_keys)-set(seen_chats)
        # we could just process the empty list as normal, but logging an
        # info message when we do have items is worthwhile...
if not remaining:
logger.debug("Chat %r has no new items to process", chatname)
return None
# we have something to do...
logger.info("Chat %r has %d items to process", chatname,
len(remaining))
logger.debug("we've already seen %d items from this chat",
len(seen_chats))
- return self.coop.coiterate(self.gen_items(chat, remaining, msgs_by_id))
+ return self.conductor.coop.coiterate(
+ self.gen_items(chat, remaining, msgs_by_id))
def gen_items(self, chat, todo, msgs_by_id):
tow = []
for _, msgid in todo:
if msgid is None:
# we haven't seen the chat itself - do that.
logger.debug("Creating new skype chat %r", chat.Name)
# make a 'deferred list' to fetch each property one at a time.
@@ -213,14 +211,10 @@ class SkypeConverter(base.ConverterBase)
'body': doc['skype_body'],
'body_preview': doc['skype_body'][:128],
'conversation_id': doc['skype_chatname'],
'timestamp': doc['skype_timestamp'], # skype's works ok here?
}
class SkypeAccount(base.AccountBase):
- def __init__(self, db, details):
- self.db = db
- self.details = details
-
- def startSync(self, conductor, doc_model):
- return TwistySkype(self, conductor, doc_model).attach()
+ def startSync(self, conductor):
+ return TwistySkype(self, conductor).attach()
--- a/server/python/junius/proto/test/__init__.py
+++ b/server/python/junius/proto/test/__init__.py
@@ -1,38 +1,36 @@
# This is an implementation of a 'test' protocol.
import logging
-from twisted.internet import defer, error, task
+from twisted.internet import defer, error
logger = logging.getLogger(__name__)
from ...proc import base
class TestMessageProvider(object):
- def __init__(self, account, conductor, doc_model):
+ def __init__(self, account, conductor):
self.account = account
+ self.doc_model = account.doc_model # this is a little confused...
self.conductor = conductor
- self.doc_model = doc_model
def sync_generator(self):
num_docs = int(self.account.details.get('num_test_docs', 5))
logger.info("Creating %d test documents", num_docs)
for i in xrange(num_docs):
yield self.check_test_message(i)
if self.bulk_docs:
yield self.doc_model.create_raw_documents(self.account,
self.bulk_docs)
def attach(self):
logger.info("preparing to synch test messages...")
self.bulk_docs = [] # anything added here will be done in bulk
- # Experimenting with a 'cooperator'...
- coop = task.Cooperator()
- gen = self.sync_generator()
- return coop.coiterate(gen)
+        # use the conductor's cooperator so the test protocol exercises it too.
+ return self.conductor.coop.coiterate(self.sync_generator())
def check_test_message(self, i):
logger.debug("seeing if message with ID %d exists", i)
return self.doc_model.open_document("test.message.%d" % i,
).addCallback(self.process_test_message, i)
def process_test_message(self, existing_doc, doc_num):
if existing_doc is None:
@@ -82,14 +80,10 @@ class TestConverter(base.ConverterBase):
"data" : 'test\0blob'
}
}
new_doc = dict(headers=headers, body=body, _attachments=attachments)
return new_doc
class TestAccount(base.AccountBase):
- def __init__(self, db, details):
- self.db = db
- self.details = details
-
- def startSync(self, conductor, doc_model):
- return TestMessageProvider(self, conductor, doc_model).attach()
+ def startSync(self, conductor):
+ return TestMessageProvider(self, conductor).attach()
--- a/server/python/junius/proto/twitter.py
+++ b/server/python/junius/proto/twitter.py
@@ -4,17 +4,17 @@ Fetch twitter raw* objects
'''
# prevent 'import twitter' finding this module!
from __future__ import absolute_import
import logging
import re
import twisted.python.log
-from twisted.internet import defer, threads, task
+from twisted.internet import defer, threads
from ..proc import base
# See http://code.google.com/p/python-twitter/issues/detail?id=13 for info
# about getting twisted support into the twitter package.
# Sadly, the twisty-twitter package has issues of its own (like calling
# delegates outside the deferred mechanism meaning you can't rely on callbacks
# being completed when they say they are.)
@@ -28,25 +28,22 @@ logger = logging.getLogger(__name__)
TWEET_PROPS = """
created_at created_at_in_seconds favorited in_reply_to_screen_name
in_reply_to_user_id in_reply_to_status_id truncated
source id text relative_created_at user
""".split()
class TwitterProcessor(object):
- def __init__(self, account, conductor, doc_model):
+ def __init__(self, account, conductor):
self.account = account
+ self.doc_model = account.doc_model # this is a little confused...
self.conductor = conductor
- self.doc_model = doc_model
self.twit = None
self.seen_tweets = None
- # use a cooperator to do the work via a generator.
- # XXX - should we own the cooperator or use our parents?
- self.coop = task.Cooperator()
def attach(self):
logger.info("attaching to twitter...")
username = self.account.details['username']
pw = self.account.details['password']
return threads.deferToThread(twitter.Api,
username=username, password=pw
).addCallback(self.attached
@@ -55,17 +52,17 @@ class TwitterProcessor(object):
def attached(self, twit):
logger.info("attached to twitter - fetching friends")
self.twit = twit
# build the list of all users we will fetch tweets from.
return threads.deferToThread(self.twit.GetFriends
).addCallback(self._cb_got_friends)
def _cb_got_friends(self, friends):
- return self.coop.coiterate(self.gen_friends_info(friends))
+ return self.conductor.coop.coiterate(self.gen_friends_info(friends))
def gen_friends_info(self, friends):
logger.info("apparently I've %d friends", len(friends))
def do_fid(fid):
return threads.deferToThread(self.twit.GetUserTimeline, fid
).addCallback(self.got_friend_timeline, fid
).addErrback(self.err_friend_timeline, fid
).addCallback(self.finished_friend
@@ -82,17 +79,18 @@ class TwitterProcessor(object):
logger.debug("finished friend: %s", result)
def err_friend_timeline(self, failure, fid):
logger.error("Failed to fetch timeline for '%s': %s", fid, failure)
def got_friend_timeline(self, timeline, fid):
logger.debug("Friend %r has %d items in their timeline", fid,
len(timeline))
- return self.coop.coiterate(self.gen_friend_timeline(timeline, fid))
+ return self.conductor.coop.coiterate(
+ self.gen_friend_timeline(timeline, fid))
def gen_friend_timeline(self, timeline, fid):
for tweet in timeline:
logger.debug("seeing if tweet %r exists", tweet.id)
docid = "tweet.%d" % (tweet.id,)
yield self.doc_model.open_document(docid,
).addCallback(self.maybe_process_tweet, docid, tweet)
@@ -129,14 +127,10 @@ class TwitterConverter(base.ConverterBas
'body': body,
'body_preview': body[:128],
'tags': tags,
'timestamp': int(doc['twitter_created_at_in_seconds'])
}
class TwitterAccount(base.AccountBase):
- def __init__(self, db, details):
- self.db = db
- self.details = details
-
- def startSync(self, conductor, doc_model):
- return TwitterProcessor(self, conductor, doc_model).attach()
+ def startSync(self, conductor):
+ return TwitterProcessor(self, conductor).attach()
--- a/server/python/junius/sync.py
+++ b/server/python/junius/sync.py
@@ -1,17 +1,16 @@
import logging
-from twisted.internet import reactor, defer
+from twisted.internet import reactor, defer, task
from twisted.python.failure import Failure
import twisted.web.error
import paisley
from . import proto as proto
-from .model import get_db
logger = logging.getLogger(__name__)
_conductor = None
from .model import get_doc_model
def get_conductor(options=None):
global _conductor
@@ -30,48 +29,53 @@ class SyncConductor(object):
def __init__(self, options):
self.log = logger
self.options = options
# apparently it is now considered 'good form' to pass reactors around, so
# a future of multiple reactors is possible.
# We capture it here, and all the things we 'conduct' use this reactor
# (but later it should be passed to our ctor too)
self.reactor = reactor
-
- self.db = get_db()
+ self.coop = task.Cooperator()
+ reactor.addSystemEventTrigger("before", "shutdown", self._kill_coop)
+ self.doc_model = get_doc_model()
self.active_accounts = []
+ def _kill_coop(self):
+        logger.debug('stopping the cooperator')
+        self.coop.stop()
+        logger.debug('cooperator stopped')
+
def _ohNoes(self, failure, *args, **kwargs):
self.log.error('OH NOES! failure! %s', failure)
def _getAllAccounts(self):
- return self.db.openView('raindrop!accounts!all', 'all'
+ return self.doc_model.open_view('raindrop!accounts!all', 'all'
).addCallback(self._gotAllAccounts
).addErrback(self._ohNoes)
- def _gotAllAccounts(self, rows):
- # we don't use a cooperator here as we want them all to run in parallel.
- return defer.DeferredList([d for d in self._genAccountSynchs(rows)])
+ def _gotAllAccounts(self, result):
+ # we don't use the cooperator here as we want them all to run in parallel.
+ return defer.DeferredList([d for d in self._genAccountSynchs(result['rows'])])
def _genAccountSynchs(self, rows):
- doc_model = get_doc_model()
self.log.info("Have %d accounts to synch", len(rows))
to_synch = []
for row in rows:
account_details = row['value']
kind = account_details['kind']
self.log.debug("Found account using protocol %s", kind)
if not self.options.protocols or kind in self.options.protocols:
if kind in proto.protocols:
- account = proto.protocols[kind](self.db, account_details)
+ account = proto.protocols[kind](self.doc_model, account_details)
self.log.info('Starting sync of %s account: %s',
kind, account_details.get('name', '(un-named)'))
self.active_accounts.append(account)
- yield account.startSync(self, doc_model
+ yield account.startSync(self
).addBoth(self._cb_sync_finished, account)
else:
self.log.error("Don't know what to do with account kind: %s", kind)
else:
self.log.info("Skipping account - protocol '%s' is disabled", kind)
def sync(self, whateva=None):
return self._getAllAccounts()
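
The thread running through the whole patch is this single shared
Cooperator: protocols now feed it generators of deferreds via coiterate()
instead of owning their own, while _gotAllAccounts deliberately bypasses
it with a DeferredList so account syncs run in parallel. A minimal
standalone sketch of both patterns (task.react, from newer Twisted, is
only there to drive the toy):

from twisted.internet import defer, task

def gen_work(n):
    for i in range(n):
        # each yielded deferred must fire before the generator resumes
        yield defer.succeed(i)

def main(reactor):
    coop = task.Cooperator()  # SyncConductor owns one of these
    serial = coop.coiterate(gen_work(3))
    # DeferredList, by contrast, runs everything at once and fires when
    # all of its deferreds have finished.
    parallel = defer.DeferredList([defer.succeed('a'), defer.succeed('b')])
    return defer.DeferredList([serial, parallel])

task.react(main)
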
--- a/server/python/run-raindrop.py
+++ b/server/python/run-raindrop.py
@@ -23,16 +23,17 @@ def asynch_command(f):
f.asynch = True
return f
# NOTE: All global functions with docstrings are 'commands'
# They must all return a deferred.
def nuke_db_and_delete_everything_forever(result, parser, options):
"""Nuke the database AND ALL MESSAGES FOREVER"""
return model.nuke_db(
+ ).addCallback(model.fab_db
).addCallback(bootstrap.install_accounts)
# XXX - install_accounts should die too, but how to make a safe 'fingerprint'
# so we can do it implicitly? We could create a hash which doesn't include
# the password, but then the password changing wouldn't be enough to trigger
# an update. Including even the *hash* of the password might risk leaking
# info. So for now you must install manually.
@@ -60,39 +61,39 @@ def unprocess(result, parser, options):
"""
def done(result):
print "unprocess has finished..."
# XXX - pipeline should probably be a singleton?
p = pipeline.Pipeline(model.get_doc_model())
return p.unprocess().addCallback(done)
def delete_docs(result, parser, options):
- """Delete all documents of a particular type. Use with caution"""
+ """Delete all documents of a particular type. Use with caution or see
+ the 'unprocess' command for an alternative.
+ """
# NOTE: This is for development only, until we get a way to say
# 'reprocess stuff you've already done' - in the meantime deleting those
# intermediate docs has the same result...
- from urllib import quote
- db = model.get_db()
-
def _del_docs(to_del):
docs = []
for id, rev in to_del:
docs.append({'_id': id, '_rev': rev, '_deleted': True})
-        return db.updateDocuments(docs)
+        # the module-level get_db() is gone; go via the doc model's db.
+        return model.get_doc_model().db.updateDocuments(docs)
def _got_docs(result, dt):
- to_del = [(row['id'], row['value']) for row in result]
+ to_del = [(row['id'], row['value']) for row in result['rows']]
logger.info("Deleting %d documents of type %r", len(to_del), dt)
return to_del
if not options.doctypes:
parser.error("You must specify one or more --doctype")
deferreds = []
for dt in options.doctypes:
- d = db.openView('raindrop!messages!by', 'by_doc_type', key=dt
+ d = model.get_doc_model().open_view('raindrop!messages!by',
+ 'by_doc_type', key=dt
).addCallback(_got_docs, dt
).addCallback(_del_docs
)
deferreds.append(d)
return defer.DeferredList(deferreds)
def _setup_logging(options):