server/python/junius/sync.py
author Mark Hammond <mhammond@skippinet.com.au>
Mon, 30 Mar 2009 15:45:42 +1100
branch twisty
changeset 147 c1f0e70ed6d51ba4e5c31195dc41feb210ba03a3
parent 145 b1ab63ee8c478f4f9dd3908a55e5eed56db24ccc
child 149 93d6e2aa1a5068cd67c1d34a450a4d9c2cde07ff
permissions -rw-r--r--
remove commented code

import logging

from twisted.internet import reactor, defer
from twisted.python.failure import Failure
import twisted.web.error
import paisley

from . import proto as proto
from .model import get_db

# Module-level logger, named after this module; SyncConductor instances
# use it as their 'log' attribute.
logger = logging.getLogger(__name__)

# The single shared SyncConductor; it stays None until the first call to
# get_conductor() creates it.
_conductor = None
from .model import get_doc_model

def get_conductor(options=None):
  """Return the shared SyncConductor, creating it on the first call.

  The first call initializes the protocols and builds the conductor with
  the given options.  Every later call must leave `options` as None (this
  is asserted) and simply gets the existing instance back.
  """
  global _conductor
  if _conductor is not None:
    # Already created - the options can no longer be changed.
    assert options is None, 'can only set this at startup'
    return _conductor
  # First use: the protocols must be initialized before the conductor exists.
  proto.init_protocols()
  _conductor = SyncConductor(options)
  return _conductor
  

# XXX - rename this to plain 'Conductor' and move to a different file.
# This 'conducts' synchronization, the work queues and the interactions with
# the extensions and database.
class SyncConductor(object):
  """Conducts the synchronization of the accounts stored in the database.

  It opens the accounts view, starts a sync for every account whose
  protocol is both enabled and known, and tracks the accounts which are
  still synchronizing in `active_accounts`.
  """
  def __init__(self, options):
    """Capture the options, reactor and database used while conducting.

    options -- object whose optional 'protocols' attribute restricts which
               account kinds are synchronized.  May be None, in which case
               no protocol is filtered out.
    """
    self.log = logger
    self.options = options
    # apparently it is now considered 'good form' to pass reactors around, so
    # a future of multiple reactors is possible.
    # We capture it here, and all the things we 'conduct' use this reactor
    # (but later it should be passed to our ctor too)
    self.reactor = reactor

    self.db = get_db()

    # Accounts whose sync has been started but has not yet finished.
    self.active_accounts = []

  def _ohNoes(self, failure, *args, **kwargs):
    """Errback of last resort: log the failure.

    Returns None, so the failure is not passed further down the chain.
    """
    self.log.error('OH NOES! failure! %s', failure)

  def _isProtocolEnabled(self, kind):
    """Return True if accounts using protocol `kind` should be synchronized.

    When there are no options at all (eg, the conductor was created without
    any, as the debugging entry-point of this module does), or the
    'protocols' option is missing or empty, every protocol is enabled.
    """
    enabled = getattr(self.options, 'protocols', None)
    return not enabled or kind in enabled

  def _getAllAccounts(self):
    """Open the view of all accounts and start synchronizing its rows.

    Returns the deferred from the view, with _gotAllAccounts chained as the
    callback and _ohNoes as the errback.
    """
    return self.db.openView('raindrop!accounts!all', 'all'
      ).addCallback(self._gotAllAccounts
      ).addErrback(self._ohNoes)

  def _gotAllAccounts(self, rows):
    """Start a sync for each account row; return a DeferredList of them."""
    # we don't use a cooperator here as we want them all to run in parallel.
    return defer.DeferredList(list(self._genAccountSynchs(rows)))

  def _genAccountSynchs(self, rows):
    """Generate one deferred per account which can be synchronized.

    rows -- sized sequence of view rows; each row's 'value' holds the
            account details, including the 'kind' (protocol name).

    Accounts whose protocol is disabled by the options, or is not in
    proto.protocols, are logged and skipped.  Every other account is added
    to `active_accounts` and has its sync started; the deferred yielded
    calls _cb_sync_finished whether that sync succeeds or fails.
    """
    doc_model = get_doc_model()
    self.log.info("Have %d accounts to synch", len(rows))
    for row in rows:
      account_details = row['value']
      kind = account_details['kind']
      self.log.debug("Found account using protocol %s", kind)
      if not self._isProtocolEnabled(kind):
        self.log.info("Skipping account - protocol '%s' is disabled", kind)
        continue
      if kind not in proto.protocols:
        self.log.error("Don't know what to do with account kind: %s", kind)
        continue
      account = proto.protocols[kind](self.db, account_details)
      self.log.info('Starting sync of %s account: %s',
                    kind, account_details.get('name', '(un-named)'))
      # Must be recorded before the sync starts: _cb_sync_finished asserts
      # the account is in this list when the sync completes.
      self.active_accounts.append(account)
      yield account.startSync(self, doc_model
                ).addBoth(self._cb_sync_finished, account)

  def sync(self, whateva=None):
    """Synchronize all accounts; returns the deferred for the whole run.

    The `whateva` argument is accepted but ignored.
    """
    return self._getAllAccounts()

  def _cb_sync_finished(self, result, account):
    """Log how the sync of `account` ended and stop tracking the account.

    Used with addBoth, so `result` is either the result of the sync or a
    Failure.  Returns None in both cases.
    """
    if isinstance(result, Failure):
      self.log.error("Account %s failed with an error: %s", account, result)
    else:
      self.log.debug("Account %s finished successfully", account)
    assert account in self.active_accounts, (account, self.active_accounts)
    self.active_accounts.remove(account)
    if not self.active_accounts:
      self.log.info("all accounts have finished synchronizing")


if __name__ == '__main__':
  # Debugging aid only - the application itself is the usual entry-point.
  # Send everything from DEBUG upwards to the default logging handler.
  logging.basicConfig()
  root_logger = logging.getLogger()
  root_logger.setLevel(logging.DEBUG)

  # Schedule a sync for as soon as the conductor's reactor is running.
  sync_conductor = get_conductor()
  event_loop = sync_conductor.reactor
  event_loop.callWhenRunning(sync_conductor.sync)

  logger.debug('starting reactor')
  event_loop.run()
  logger.debug('reactor done')