# server/python/raindrop/sync.py
# author Mark Hammond <mhammond@skippinet.com.au>
# Mon, 06 Apr 2009 12:20:36 +1000
# branch twisty
# changeset 163 2656685fd6e423b4d9e3537f54a7c94d0f677cdd
# parent 149 server/python/junius/sync.py@93d6e2aa1a5068cd67c1d34a450a4d9c2cde07ff
# child 414 0f41ea799a1dd84e872664092b5dcefa147e8786
# permissions -rw-r--r--
# s/junius/raindrop/

import logging

from twisted.internet import reactor, defer, task
from twisted.python.failure import Failure
import twisted.web.error
import paisley

from . import proto as proto

logger = logging.getLogger(__name__)

_conductor = None
from .model import get_doc_model

def get_conductor(options=None):
  """Return the process-wide SyncConductor singleton.

  The first call creates the conductor (initializing the protocol
  registry first) and may supply *options*; later calls must pass no
  options and simply return the existing instance.
  """
  global _conductor
  if _conductor is not None:
    # The singleton already exists - configuration is fixed at creation.
    assert options is None, 'can only set this at startup'
    return _conductor
  proto.init_protocols()
  _conductor = SyncConductor(options)
  return _conductor
  

# XXX - rename this to plain 'Conductor' and move to a different file.
# This 'conducts' synchronization, the work queues and the interactions with
# the extensions and database.
class SyncConductor(object):
  """'Conducts' the synchronization of all configured accounts.

  Opens the 'raindrop!accounts!all' view, instantiates the protocol
  implementation for each account found, and runs every account's
  sync in parallel, tracking them in self.active_accounts until all
  have finished.
  """
  def __init__(self, options):
    self.log = logger
    self.options = options
    # apparently it is now considered 'good form' to pass reactors around, so
    # a future of multiple reactors is possible.
    # We capture it here, and all the things we 'conduct' use this reactor
    # (but later it should be passed to our ctor too)
    self.reactor = reactor
    self.coop = task.Cooperator()
    # Use the captured reactor consistently (same object as the module
    # global today, but honors the intent of the comment above).
    self.reactor.addSystemEventTrigger("before", "shutdown", self._kill_coop)
    self.doc_model = get_doc_model()

    # Accounts with a sync in flight; emptied by _cb_sync_finished.
    self.active_accounts = []

  def _kill_coop(self):
    # Shutdown hook: stop the cooperator so no more tasks are scheduled.
    logger.debug('stopping the coordinator')
    self.coop.stop()
    logger.debug('coordinator stopped')

  def _ohNoes(self, failure, *args, **kwargs):
    # Last-ditch errback - just log the failure.
    self.log.error('OH NOES! failure! %s', failure)

  def _getAllAccounts(self):
    # Fetch every account document from the view, then start syncing them.
    return self.doc_model.open_view('raindrop!accounts!all', 'all'
      ).addCallback(self._gotAllAccounts
      ).addErrback(self._ohNoes)

  def _gotAllAccounts(self, result):
    # we don't use the cooperator here as we want them all to run in parallel.
    # (list() rather than a redundant [d for d in ...] comprehension.)
    return defer.DeferredList(list(self._genAccountSynchs(result['rows'])))

  def _genAccountSynchs(self, rows):
    """Yield a deferred for each account whose protocol is enabled.

    Skips accounts filtered out by options.protocols and accounts whose
    'kind' has no registered protocol implementation.
    """
    self.log.info("Have %d accounts to synch", len(rows))
    for row in rows:
      account_details = row['value']
      kind = account_details['kind']
      self.log.debug("Found account using protocol %s", kind)
      if not self.options.protocols or kind in self.options.protocols:
        if kind in proto.protocols:
          account = proto.protocols[kind](self.doc_model, account_details)
          self.log.info('Starting sync of %s account: %s',
                        kind, account_details.get('name', '(un-named)'))
          self.active_accounts.append(account)
          # addBoth: _cb_sync_finished handles both success and Failure.
          yield account.startSync(self
                    ).addBoth(self._cb_sync_finished, account)
        else:
          self.log.error("Don't know what to do with account kind: %s", kind)
      else:
        self.log.info("Skipping account - protocol '%s' is disabled", kind)

  def sync(self, whateva=None):
    """Entry-point: synchronize every configured account.

    *whateva* is ignored; it exists so this can be used directly as a
    reactor/deferred callback.
    """
    return self._getAllAccounts()

  def _cb_sync_finished(self, result, account):
    # addBoth callback: result is a Failure on error, else the sync result.
    if isinstance(result, Failure):
      self.log.error("Account %s failed with an error: %s", account, result)
    else:
      self.log.debug("Account %s finished successfully", account)
    assert account in self.active_accounts, (account, self.active_accounts)
    self.active_accounts.remove(account)
    if not self.active_accounts:
      self.log.info("all accounts have finished synchronizing")


if __name__ == '__main__':
  # Debugging convenience only - the real app has its own entry-point.
  # Turn on verbose logging before anything else happens.
  logging.basicConfig()
  logging.getLogger().setLevel(logging.DEBUG)

  c = get_conductor()
  # Schedule the sync to begin as soon as the reactor is running.
  c.reactor.callWhenRunning(c.sync)

  logger.debug('starting reactor')
  c.reactor.run()
  logger.debug('reactor done')