author Ben Hearsum <>
Tue, 26 Oct 2010 15:11:47 -0400
changeset 95 58225ba726780fbe6ce9adce00690952b5920906
parent 78 42babfd9ed35458f5acc07b4a0302f6b42304c71
child 139 6cf864606526c53c6d68a4ee3d4b1aec5db2c204
permissions -rw-r--r--
bug 600694: Slave disconnect exceptions should auto-retry. r=rail

# Version: MPL 1.1/GPL 2.0/LGPL 2.1
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
# The Original Code is Mozilla-specific Buildbot steps.
# The Initial Developer of the Original Code is
# Mozilla Foundation.
# Portions created by the Initial Developer are Copyright (C) 2009
# the Initial Developer. All Rights Reserved.
# Contributor(s):
#   Brian Warner <>
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
# ***** END LICENSE BLOCK *****

One-at-a-time notification-triggered Deferred event loop. Each such loop has a
'doorbell' named trigger() and a set of processing functions.  The processing
functions are expected to be callables like Scheduler methods, which examine a
database for work to do. The doorbell will be rung by other code that writes
into the database (possibly in a separate process).

At some point after the doorbell is rung, each function will be run in turn,
one at a time.  Each function can return a Deferred, and the next function will
not be run until the previous one's Deferred has fired.  That is, at all times,
at most one processing function will be active. 

If the doorbell is rung during a run, the loop will be run again later.
Multiple rings may be handled by a single run, but the class guarantees that
there will be at least one full run that begins after the last ring.  The
relative order of processing functions within a run is not preserved.  If a
processing function is added to the loop more than once, it will still only be
called once per run.

If the Deferred returned by the processing function fires with a number, the
event loop will call that function again at or after the given time
(expressed as seconds since epoch). This can be used by processing functions
when they want to 'sleep' until some amount of time has passed, such as for a
Scheduler that is waiting for a tree-stable-timer to expire, or a Periodic
scheduler that wants to fire once every six hours. This delayed call will
obey the same one-at-a-time behavior as the run-everything trigger.

Each function's return-value-timer value will replace the previous timer. Any
outstanding timer will be cancelled just before invoking a processing
function. As a result, these functions should basically be idempotent: if the
database says that the Scheduler needs to wake up at 5pm, it should keep
returning '5pm' until it gets called after 5pm, at which point it should
start returning None.

The functions should also add an epsilon (perhaps one second) to their
desired wakeup time, so that rounding errors or low-resolution system timers
don't cause 'OCD Alarm Clock Syndrome' (in which they get woken up a moment
too early and then try to sleep repeatedly for zero seconds). The event loop
will silently impose a 5-second minimum delay time to avoid this.

Any errors in the processing functions are written to log.err and then

from twisted.internet import reactor, defer
from twisted.application import service
from twisted.python import log

from buildbot.util.eventual import eventually
from buildbot import util

class LoopBase(service.MultiService):

    def __init__(self):
        self._loop_running = False
        self._everything_needs_to_run = False
        self._wakeup_timer = None
        self._timers = {}
        self._when_quiet_waiters = set()
        self._start_timer = None
        self._reactor = reactor # seam for tests to use t.i.t.Clock
        self._remaining = []

    def stopService(self):
        if self._start_timer and
        if self._wakeup_timer and
        return service.MultiService.stopService(self)

    def is_quiet(self):
        return not self._loop_running

    def when_quiet(self):
        d = defer.Deferred()
        return d

    def trigger(self):
        # if we're triggered while not running, ignore it.  We'll automatically
        # trigger when the service starts
        if not self.running:
            print "loop triggered while service disabled; ignoring trigger"

    def _mark_runnable(self, run_everything):
        if run_everything:
            self._everything_needs_to_run = True
            # timers are now redundant, so cancel any existing ones
        if self._loop_running:
        self._loop_running = True
        self._start_timer = self._reactor.callLater(0, self._loop_start)

    def get_processors(self):
        raise Exception('subclasses must implement get_processors()')

    def _loop_start(self):
        if self._everything_needs_to_run:
            self._everything_needs_to_run = False
            self._remaining = list(self.get_processors())
            self._remaining = []
            now =
            all_processors = self.get_processors()
            for p in list(self._timers.keys()):
                if self._timers[p] <= now:
                    del self._timers[p]
                    # don't run a processor that was removed while it still
                    # had a timer running
                    if p in all_processors:
                # consider sorting by 'when'

    def _loop_next(self):
        if not self._remaining:
            return self._loop_done()
        p = self._remaining.pop(0)
        self._timers.pop(p, None)
        now =
        d = defer.maybeDeferred(p)
        d.addCallback(self._set_timer, now, p)
        return None # no long Deferred chains

    def _one_done(self, ignored):

    def _loop_done(self):
        if self._everything_needs_to_run:
        self._loop_running = False
        if not self._timers:
            # we're really idle, so notify waiters (used by unit tests)
            while self._when_quiet_waiters:
                d = self._when_quiet_waiters.pop()
                self._reactor.callLater(0, d.callback, None)

    def loop_done(self):
        # this can be overridden by subclasses to do more work when we've
        # finished a pass through the loop and don't need to immediately
        # start a new one

    def _set_timer(self, res, now, p):
        if isinstance(res, (int, float)):
            assert res > now # give me absolute time, not an interval
            # don't wake up right away. By doing this here instead of in
            # _set_wakeup_timer, we avoid penalizing unrelated jobs which
            # want to wake up a few seconds apart
            when = max(res, now+self.OCD_MINIMUM_DELAY)
            self._timers[p] = when

    def _set_wakeup_timer(self):
        if not self._timers:
            if self._wakeup_timer:
                self._wakeup_timer = None
        when = min(self._timers.values())
        # to avoid waking too frequently, this could be:
        #  delay=max(when-now,OCD_MINIMUM_DELAY)
        # but that delays unrelated jobs that want to wake few seconds apart
        delay = max(0, when -
        if self._wakeup_timer:
            self._wakeup_timer = self._reactor.callLater(delay, self._wakeup)

    def _wakeup(self):
        self._wakeup_timer = None

class Loop(LoopBase):
    def __init__(self):
        self.processors = set()

    def add(self, processor):

    def remove(self, processor):

    def get_processors(self):
        return self.processors.copy()

class DelegateLoop(LoopBase):
    def __init__(self, get_processors_function):
        self.get_processors = get_processors_function

class MultiServiceLoop(LoopBase):
    """I am a Loop which gets my processors from my service children. When I
    run, I iterate over each of them, invoking their 'run' method."""

    def get_processors(self):
        return [ for child in self]