Bug 662885: out of process implementation for pulse publishing, command running. r=dustin
authorChris AtLee <catlee@mozilla.com>
Thu, 08 Sep 2011 09:01:22 -0400
changeset 1786 1d12713b220a3b11c0e898a42e41ea060e27277d
parent 1785 5fee6bc44661ab0d74fc8f37e553c39c5850a0ee
child 1787 d0a86616776ed0c694f4b1a168803c4f98230a2e
push id1255
push usercatlee@mozilla.com
push dateThu, 08 Sep 2011 13:02:19 +0000
reviewersdustin
bugs662885
Bug 662885: out of process implementation for pulse publishing, command running. r=dustin
buildbot-helpers/check_queuedir.py
buildbot-helpers/command_runner.py
buildbot-helpers/pulse_publisher.py
lib/python/buildtools/queuedir.py
new file mode 100755
--- /dev/null
+++ b/buildbot-helpers/check_queuedir.py
@@ -0,0 +1,82 @@
+#!/usr/bin/python
+"""%prog -w <warn_new> -c <crit_new> -t <max_age> queuedir [queuedir...]
+
+nagios plugin to monitor a queuedir"""
+import os, sys, traceback, time
+
OK, WARNING, CRITICAL, UNKNOWN = range(4)

def check_queuedir(d, options):
    """
    Check a single queuedir.

    Returns (status, msgs): status is a nagios exit code (OK/WARNING/
    CRITICAL) and msgs is a list of human-readable problem strings.
    """
    status = OK
    msgs = []

    # Anything in dead/ means an item exhausted its retries: immediate critical
    num_dead = len(os.listdir(os.path.join(d, 'dead')))
    if num_dead:
        status = CRITICAL
        msgs.append("%i dead items" % num_dead)

    # Backlog in new/: warn/critical on count, warn on age of the oldest item
    pending = os.listdir(os.path.join(d, 'new'))
    if pending:
        num_new = len(pending)
        if num_new >= options.crit_new:
            status = CRITICAL
            msgs.append("%i new items" % num_new)
        elif num_new >= options.warn_new:
            status = max(status, WARNING)
            msgs.append("%i new items" % num_new)

        oldest = min(os.path.getmtime(os.path.join(d, 'new', f))
                for f in pending)
        age = int(time.time() - oldest)
        if age > options.max_age:
            status = max(status, WARNING)
            msgs.append("oldest item is %is old" % age)

    return status, msgs
+
+def main():
+    from optparse import OptionParser
+    parser = OptionParser(__doc__)
+    parser.set_defaults(
+            warn_new=5,
+            crit_new=10,
+            max_age=300,
+            )
+    parser.add_option("-w", dest="warn_new", type="int",
+            help="warn when there are more than this number of items in new")
+    parser.add_option("-c", dest="crit_new", type="int",
+            help="critical when there are more than this number of items in new")
+    parser.add_option("-t", dest="max_age", type="int",
+            help="warn when oldest item in new is more than this many seconds old")
+
+    options, args = parser.parse_args()
+
+    if len(args) == 0:
+        print "You must specify at least one queuedir"
+        sys.exit(UNKNOWN)
+
+    try:
+        status = OK
+        msgs = []
+        for d in args:
+            d_status, d_msgs = check_queuedir(d, options)
+            status = max(status, d_status)
+            msgs.extend(d_msgs)
+
+        if not msgs:
+            print "Ok"
+        else:
+            print ";".join(msgs)
+        sys.exit(status)
+    except SystemExit:
+        raise
+    except:
+        print "Unhandled exception"
+        traceback.print_exc()
+        sys.exit(UNKNOWN)
+
+if __name__ == '__main__':
+    main()
new file mode 100755
--- /dev/null
+++ b/buildbot-helpers/command_runner.py
@@ -0,0 +1,183 @@
+#!/usr/bin/env python
+"""
+Runs commands from a queue!
+"""
+import subprocess, os, signal
+import time
+from buildtools.queuedir import QueueDir
+from buildbot.util import json
+import logging
+log = logging.getLogger(__name__)
+
class Job(object):
    """
    A single queued command: wraps a subprocess together with the queuedir
    item id and log file it belongs to.

    `cmd`      - command passed straight to subprocess.Popen
    `item_id`  - id of the queuedir item this command came from
    `log_fp`   - open file object; the command's stdout/stderr and our own
                 status messages are written here
    `max_time` - maximum runtime in seconds before check() starts killing
                 the process. Previously this attribute was never set in
                 __init__ and had to be assigned externally or check() would
                 raise AttributeError; callers may still assign .max_time
                 after construction.
    """
    def __init__(self, cmd, item_id, log_fp, max_time=60):
        self.cmd = cmd
        self.log = log_fp
        self.item_id = item_id
        self.max_time = max_time
        self.started = None
        self.last_signal_time = 0
        self.last_signal = None

        self.proc = None

    def start(self):
        """Launch the command, sending its output to our log file."""
        devnull = open(os.devnull, 'r')
        try:
            self.log.write("Running %s\n" % self.cmd)
            self.log.flush()
            self.proc = subprocess.Popen(self.cmd, close_fds=True,
                    stdin=devnull, stdout=self.log, stderr=self.log)
            self.started = time.time()
        finally:
            # The child has its own copy of the fd (or the spawn failed);
            # either way we don't need our handle any more. The original
            # leaked this file object.
            devnull.close()

    def check(self):
        """
        Poll the command.

        If it has been running longer than max_time, escalate through
        SIGINT -> SIGTERM -> SIGKILL, sending at most one signal per minute.

        Returns the process exit code, or None if it is still running.
        Closes the log file once a result is available.
        """
        now = time.time()
        if now - self.started > self.max_time:
            # Kill stuff off
            if now - self.last_signal_time > 60:
                s = {None: signal.SIGINT, signal.SIGINT: signal.SIGTERM}.get(
                        self.last_signal, signal.SIGKILL)
                log.info("Killing %i with %i", self.proc.pid, s)
                try:
                    self.log.write("Killing with %s\n" % s)
                    os.kill(self.proc.pid, s)
                    self.last_signal = s
                    self.last_signal_time = now
                except OSError:
                    # Process probably exited already; poll() below will
                    # pick up its result
                    log.exception("Failed to kill")

        result = self.proc.poll()
        if result is not None:
            self.log.write("\nResult: %s\n" % result)
            self.log.close()
        return result
+
class CommandRunner(object):
    """
    Pulls json-encoded commands from a QueueDir and runs them as
    subprocesses (wrapped in Job), with a concurrency limit, a per-command
    timeout, and retry-on-failure via the queuedir's requeue mechanism.
    """
    def __init__(self, options):
        # `options` is the OptionParser result from main(); only the
        # attributes read below are required
        self.queuedir = options.queuedir
        self.q = QueueDir('commands', self.queuedir)
        self.concurrency = options.concurrency
        self.retry_time = options.retry_time
        self.max_retries = options.max_retries
        self.max_time = options.max_time

        # Job instances currently running
        self.active = []

        # List of (signal_time, level, proc)
        # NOTE(review): to_kill is never used in this class -- looks
        # vestigial; Job.check() handles kill escalation itself. Confirm
        # before removing.
        self.to_kill = []

    def run(self, job):
        """
        Runs the given job, adding it to the set of active jobs.

        If the subprocess can't be spawned (OSError), the queue item is
        requeued to be retried after retry_time seconds.
        """
        log.info("Running %s", job.cmd)
        try:
            job.start()
            self.active.append(job)
        except OSError:
            job.log.write("\nFailed with OSError; requeuing in %i seconds\n" % self.retry_time)
            # Wait to requeue it
            # If we die, then it's still in cur, and will be moved back into
            # 'new' eventually
            self.q.requeue(job.item_id, self.retry_time, self.max_retries)

    def monitor(self):
        """
        Monitor running jobs: touch their queue items (so queuedir cleanup
        doesn't steal them), reap finished ones, and requeue failures.
        """
        # Iterate over a copy since we remove finished jobs as we go
        for job in self.active[:]:
            self.q.touch(job.item_id)
            result = job.check()

            if result is not None:
                self.active.remove(job)
                if result == 0:
                    self.q.remove(job.item_id)
                else:
                    log.warn("%s failed; requeuing", job.item_id)
                    # Requeue it!
                    self.q.requeue(job.item_id, self.retry_time, self.max_retries)

    def loop(self):
        """
        Main processing loop. Read new items from the queue and run them!
        """
        while True:
            self.monitor()
            if len(self.active) >= self.concurrency:
                # Wait!
                time.sleep(1)
                continue

            while len(self.active) < self.concurrency:
                item = self.q.pop()
                if not item:
                    # Don't wait for very long, since we have to check up on
                    # our children
                    if self.active:
                        self.q.wait(1)
                    else:
                        self.q.wait()
                    break

                item_id, fp = item
                try:
                    command = json.load(fp)
                    job = Job(command, item_id, self.q.getlog(item_id))
                    # Job.check() relies on max_time being set
                    job.max_time = self.max_time
                    self.run(job)
                except ValueError:
                    # Couldn't parse it as json
                    # There's no hope!
                    self.q.log(item_id, "Couldn't load json; murdering")
                    self.q.murder(item_id)
                finally:
                    fp.close()
+
def main():
    """Parse options, configure logging, and run the command loop forever."""
    from optparse import OptionParser
    import logging.handlers

    parser = OptionParser()
    parser.set_defaults(
            concurrency=1,
            max_retries=1,
            retry_time=0,
            verbosity=0,
            logfile=None,
            max_time=60,
            )
    parser.add_option("-q", "--queuedir", dest="queuedir")
    parser.add_option("-j", "--jobs", dest="concurrency", type="int", help="number of commands to run at once")
    parser.add_option("-r", "--max_retries", dest="max_retries", type="int", help="number of times to retry commands")
    parser.add_option("-t", "--retry_time", dest="retry_time", type="int", help="seconds to wait between retries")
    parser.add_option("-v", "--verbose", dest="verbosity", action="count", help="increase verbosity")
    parser.add_option("-l", "--logfile", dest="logfile", help="where to send logs")
    parser.add_option("-m", "--max_time", dest="max_time", type="int", help="maximum time for a command to run")

    options, args = parser.parse_args()

    # -v => INFO, -vv (or more) => DEBUG
    log_level = {0: logging.WARNING, 1: logging.INFO}.get(
            options.verbosity, logging.DEBUG)

    log_format = "%(asctime)s - %(message)s"
    if options.logfile:
        root = logging.getLogger()
        root.setLevel(log_level)
        handler = logging.handlers.RotatingFileHandler(
                options.logfile, maxBytes=1024**2, backupCount=5)
        handler.setFormatter(logging.Formatter(log_format))
        root.addHandler(handler)
    else:
        logging.basicConfig(level=log_level, format=log_format)

    if not options.queuedir:
        parser.error("-q/--queuedir is required")

    CommandRunner(options).loop()
new file mode 100755
--- /dev/null
+++ b/buildbot-helpers/pulse_publisher.py
@@ -0,0 +1,259 @@
+#!/usr/bin/env python
+"""
+Publisher for Pulse events.
+
+Consumes new events being written into a queue directory by the PulseStatus
+plugin
+
+see http://hg.mozilla.org/users/clegnitto_mozilla.com/mozillapulse/ for pulse
+code
+"""
+import time
+from datetime import tzinfo, timedelta, datetime
+
+from mozillapulse.messages.build import BuildMessage
+from buildtools.queuedir import QueueDir
+from buildbot.util import json
+
+import logging
+log = logging.getLogger(__name__)
+
ZERO = timedelta(0)
# NOTE(review): HOUR is not referenced in this file -- possibly kept from
# the stdlib tzinfo example this was based on; confirm before removing.
HOUR = timedelta(hours=1)

class UTC(tzinfo):
    """Fixed tzinfo for UTC: zero offset, no daylight saving."""

    def utcoffset(self, dt):
        # Always zero offset from UTC
        return ZERO

    def dst(self, dt):
        # UTC never observes daylight saving
        return ZERO

    def tzname(self, dt):
        return "UTC"
+
def transform_time(t):
    """Transform an epoch time to a string representation of the form
    YYYY-mm-ddTHH:MM:SS+0000"""
    if t is None:
        return None
    # Already-formatted values are passed through untouched
    if isinstance(t, basestring):
        return t
    return datetime.fromtimestamp(t, UTC()).strftime('%Y-%m-%dT%H:%M:%S%z')
+
def transform_times(event):
    """
    Recursively replace epoch times in `event` with string representations.

    A two-element value under a 'times' key is treated as a pair of epoch
    times and formatted via transform_time(). Dicts and lists are walked
    recursively; any other value is returned unchanged.

    The original version only recursed into dicts, so times nested inside
    lists (e.g. a list of step dicts) were silently left untransformed.
    """
    if isinstance(event, dict):
        retval = {}
        for key, value in event.items():
            if key == 'times' and len(value) == 2:
                retval[key] = [transform_time(t) for t in value]
            else:
                retval[key] = transform_times(value)
        return retval
    elif isinstance(event, list):
        return [transform_times(v) for v in event]
    else:
        return event
+
class PulsePusher(object):
    """
    Publish buildbot events via pulse.

    `queuedir`         - a directory to look for incoming events being written
                         by a buildbot master

    `publisher`        - an instance of mozillapulse.GenericPublisher indicating where
                         these messages should be sent

    `max_idle_time`    - number of seconds since last activity after which we'll
                         disconnect. Set to None/0 to disable

    `max_connect_time` - number of seconds since we last connected after which
                         we'll disconnect. Set to None/0 to disable

    `retry_time`       - time in seconds to wait between retries

    `max_retries`      - how many times to retry
    """
    def __init__(self, queuedir, publisher, max_idle_time=300,
            max_connect_time=600, retry_time=60, max_retries=5):
        self.queuedir = QueueDir('pulse', queuedir)
        self.publisher = publisher
        self.max_idle_time = max_idle_time
        self.max_connect_time = max_connect_time
        self.retry_time = retry_time
        self.max_retries = max_retries

        # When should we next disconnect
        self._disconnect_timer = None
        # When did we last have activity
        self._last_activity = None
        # When did we last connect
        self._last_connection = None

    def send(self, events):
        """
        Send events to pulse.

        `events` - a list of buildbot event dicts

        Also records activity/connection times and computes the next
        disconnect deadline from max_connect_time / max_idle_time.
        """
        if not self._last_connection and self.max_connect_time:
            self._last_connection = time.time()
        log.debug("Sending %i messages", len(events))
        start = time.time()
        for e in events:
            msg = BuildMessage(transform_times(e))
            self.publisher.publish(msg)
        end = time.time()
        log.info("Sent %i messages in %.2fs", len(events), end - start)
        self._last_activity = time.time()

        # Update our disconnect deadline: whichever of the connect-time and
        # idle-time limits expires first
        t = 0
        if self.max_connect_time:
            t = self._last_connection + self.max_connect_time
        if self.max_idle_time:
            if t:
                t = min(t, self._last_activity + self.max_idle_time)
            else:
                t = self._last_activity + self.max_idle_time
        if t:
            self._disconnect_timer = t

    def maybe_disconnect(self):
        "Disconnect from pulse if our timer has expired"
        now = time.time()
        if self._disconnect_timer and now > self._disconnect_timer:
            log.info("Disconnecting")
            self.publisher.disconnect()
            self._disconnect_timer = None
            self._last_connection = None
            self._last_activity = None

    def loop(self):
        """
        Main processing loop. Read new items from the queue, push them to
        pulse, remove processed items, and then wait for more.
        """
        while True:
            self.maybe_disconnect()

            # Grab any new events
            item_ids = []
            events = []
            come_back_soon = False
            try:
                while True:
                    # Cap the batch size. This is checked _before_ popping:
                    # the original popped first and then dropped the extra
                    # item on the floor, stranding it in cur/ with an open
                    # file handle until cleanup eventually requeued it.
                    if len(events) > 50:
                        come_back_soon = True
                        break
                    item = self.queuedir.pop()
                    if not item:
                        break

                    try:
                        item_id, fp = item
                        item_ids.append(item_id)
                        log.debug("Loading %s", item)
                        events.extend(json.load(fp))
                    except Exception:
                        # Re-raise so the whole batch is requeued below
                        log.exception("Error loading %s", item_id)
                        raise
                    finally:
                        fp.close()

                # Only publish (and touch the timers) when we actually have
                # something to send; the original called send([]) on every
                # idle pass, needlessly resetting the idle/connect timers.
                if events:
                    log.info("Loaded %i events", len(events))
                    self.send(events)
                    for item_id in item_ids:
                        log.info("Removing %s", item_id)
                        self.queuedir.remove(item_id)
            except Exception:
                # Was a bare except:, which also swallowed
                # KeyboardInterrupt/SystemExit and made the daemon hard to
                # stop
                log.exception("Error processing messages")
                # Don't try again soon, something has gone horribly wrong!
                come_back_soon = False
                for item_id in item_ids:
                    self.queuedir.requeue(item_id, self.retry_time, self.max_retries)

            if come_back_soon:
                # Let's do more right now!
                log.info("Doing more!")
                continue

            # Wait for more
            # don't wait more than our max_idle/max_connect_time
            now = time.time()
            to_wait = None
            if self._disconnect_timer:
                to_wait = self._disconnect_timer - now
                if to_wait < 0:
                    to_wait = None
            log.info("Waiting for %s", to_wait)
            self.queuedir.wait(to_wait)
+
def main():
    """Parse options, configure logging, build a publisher, and push forever."""
    from optparse import OptionParser
    from mozillapulse.publishers import GenericPublisher
    from mozillapulse.config import PulseConfiguration
    import logging.handlers

    parser = OptionParser()
    parser.set_defaults(
            verbosity=0,
            logfile=None,
            max_retries=1,
            retry_time=0,
            )
    parser.add_option("--passwords", dest="passwords")
    parser.add_option("-q", "--queuedir", dest="queuedir")
    parser.add_option("-v", "--verbose", dest="verbosity", action="count",
            help="increase verbosity")
    parser.add_option("-l", "--logfile", dest="logfile",
            help="where to send logs")
    parser.add_option("-r", "--max_retries", dest="max_retries", type="int",
            help="number of times to retry")
    parser.add_option("-t", "--retry_time", dest="retry_time", type="int",
            help="seconds to wait between retries")

    options, args = parser.parse_args()

    # -v => INFO, -vv (or more) => DEBUG
    log_level = {0: logging.WARNING, 1: logging.INFO}.get(
            options.verbosity, logging.DEBUG)
    log_format = "%(asctime)s - %(message)s"
    if options.logfile:
        root = logging.getLogger()
        root.setLevel(log_level)
        handler = logging.handlers.RotatingFileHandler(
                options.logfile, maxBytes=1024**2, backupCount=5)
        handler.setFormatter(logging.Formatter(log_format))
        root.addHandler(handler)
    else:
        logging.basicConfig(level=log_level, format=log_format)

    if not options.passwords:
        parser.error("--passwords is required")
    if not options.queuedir:
        parser.error("-q/--queuedir is required")

    # The passwords file is executed as python and must define
    # PULSE_USERNAME, PULSE_PASSWORD and PULSE_EXCHANGE
    passwords = {}
    execfile(options.passwords, passwords, passwords)

    config = PulseConfiguration(
            user=passwords['PULSE_USERNAME'],
            password=passwords['PULSE_PASSWORD'],
            )
    publisher = GenericPublisher(config, exchange=passwords['PULSE_EXCHANGE'])

    PulsePusher(options.queuedir, publisher,
            max_retries=options.max_retries,
            retry_time=options.retry_time).loop()
new file mode 100644
--- /dev/null
+++ b/lib/python/buildtools/queuedir.py
@@ -0,0 +1,301 @@
+"""
+Implement an on-disk queue for stuff
+"""
+import os, tempfile, time
+import logging
+log = logging.getLogger(__name__)
+
+try:
+    import pyinotify
+    assert pyinotify
+    class _MovedHandler(pyinotify.ProcessEvent):
+        def process_IN_MOVED_TO(self, event):
+            pass
+except ImportError:
+    pyinotify = None
+
class QueueDir(object):
    """
    An on-disk queue, in the style of a Maildir.

    Layout (all under `queue_dir`):
      tmp/  - items being written out, before being committed to the queue
      new/  - items ready to be processed
      cur/  - items currently being processed
      logs/ - per-item log files
      dead/ - items that failed too many times ("murdered"), kept around
              for inspection
    """
    # How long before things are considered to be "old"
    # Also how long between cleanup jobs
    cleanup_time = 300 # 5 minutes

    # Should the producer do cleanup?
    producer_cleanup = True

    # Mapping of names to QueueDir instances
    _objects = {}

    def __init__(self, name, queue_dir):
        self._objects[name] = self
        self.queue_dir = queue_dir

        # pid / start time / counter feed into unique tmp file names
        self.pid = os.getpid()
        self.started = int(time.time())
        self.count = 0
        self.last_cleanup = 0
        # List of (time, item_id) for items to move from cur back into new
        self.to_requeue = []

        self.tmp_dir = os.path.join(self.queue_dir, 'tmp')
        self.new_dir = os.path.join(self.queue_dir, 'new')
        self.cur_dir = os.path.join(self.queue_dir, 'cur')
        self.log_dir = os.path.join(self.queue_dir, 'logs')
        self.dead_dir = os.path.join(self.queue_dir, 'dead')

        self.setup()

    @classmethod
    def getQueue(cls, name):
        """Return the most recently created QueueDir registered as `name`."""
        return cls._objects[name]

    def setup(self):
        """Create (or fix permissions on) our subdirectories, then clean up."""
        for d in (self.tmp_dir, self.new_dir, self.cur_dir, self.log_dir,
                self.dead_dir):
            if not os.path.exists(d):
                os.makedirs(d, 0o700)
            else:
                os.chmod(d, 0o700)

        self.cleanup()

    def cleanup(self):
        """
        Removes old items from tmp
        Removes old logs from log_dir
        Moves old items from cur into new

        'old' is defined by the cleanup_time property
        """
        now = time.time()
        # Rate-limit: only clean up once per cleanup_time
        if now - self.last_cleanup < self.cleanup_time:
            return
        self.last_cleanup = now
        dirs = [self.tmp_dir, self.log_dir]
        for d in dirs:
            for f in os.listdir(d):
                fn = os.path.join(d, f)
                try:
                    if os.path.getmtime(fn) < now - self.cleanup_time:
                        os.unlink(fn)
                except OSError:
                    # Somebody else removed it; fine
                    pass

        # Items stuck in cur/ belong to a consumer that died; requeue them
        for f in os.listdir(self.cur_dir):
            fn = os.path.join(self.cur_dir, f)
            try:
                if os.path.getmtime(fn) < now - self.cleanup_time:
                    self.requeue(f)
            except OSError:
                pass

    ###
    # For producers
    ###
    def add(self, data):
        """
        Adds a new item to the queue.

        `data` (a byte string) is written to a uniquely-named file in tmp/
        and then atomically rename()d into new/.
        """
        # write data to tmp
        fd, tmp_name = tempfile.mkstemp(prefix="%i-%i-%i" % (self.started,
            self.count, self.pid), dir=self.tmp_dir)
        os.write(fd, data)
        os.close(fd)

        dst_name = os.path.join(self.new_dir, os.path.basename(tmp_name))
        os.rename(tmp_name, dst_name)
        self.count += 1

        if self.producer_cleanup:
            self.cleanup()

    ###
    # For consumers
    ###
    def pop(self, sorted=True):
        """
        Moves an item from new into cur
        Returns (item_id, file object opened for reading), or None if the
        queue is empty.
        If sorted is True, then the earliest item (by mtime) is returned.

        NB: the parameter shadows the `sorted` builtin; the name is kept
        for backwards compatibility with existing callers.
        """
        self._check_to_requeue()
        self.cleanup()
        items = os.listdir(self.new_dir)
        if sorted:
            items.sort(key=lambda f: os.path.getmtime(os.path.join(self.new_dir, f)))
        for item in items:
            try:
                dst_name = os.path.join(self.cur_dir, item)
                os.rename(os.path.join(self.new_dir, item), dst_name)
                os.utime(dst_name, None)
                return item, open(dst_name, 'rb')
            except OSError:
                # Another consumer grabbed it first; try the next one
                pass
        return None

    def peek(self):
        """
        Returns True if there are new items in the queue
        """
        items = os.listdir(self.new_dir)
        return len(items) > 0

    def touch(self, item_id):
        """
        Indicate that we're still working on this item
        """
        fn = os.path.join(self.cur_dir, item_id)
        try:
            os.utime(fn, None)
        except OSError:
            # Somebody else moved this; that's probably ok
            pass

    def getcount(self, item_id):
        """
        Returns how many times this item has been run (the retry counter
        appended by requeue(); 0 if there is none).

        NB: assumes the base item id contains no '.', which holds for names
        generated by add().
        """
        try:
            return int(item_id.split(".")[1])
        except (IndexError, ValueError):
            # No counter suffix (or a malformed one): never retried
            return 0

    def getlogname(self, item_id):
        """Return the path of the log file for this item (counter stripped)."""
        if "." in item_id:
            item_id = item_id.split(".")[0]
        fn = os.path.join(self.log_dir, "%s.log" % item_id)
        return fn

    def getlog(self, item_id):
        """
        Creates and returns a file object for a log file for this item
        """
        return open(self.getlogname(item_id), "a+")

    def log(self, item_id, msg):
        """Append a line to this item's log file."""
        fp = self.getlog(item_id)
        try:
            fp.write(msg + "\n")
        finally:
            # The original never closed this handle explicitly
            fp.close()

    def remove(self, item_id):
        """
        Removes item_id from cur
        """
        os.unlink(os.path.join(self.cur_dir, item_id))

    def _check_to_requeue(self):
        """Requeue any delayed items whose time has come; touch the rest."""
        if not self.to_requeue:
            return
        now = time.time()
        for t, item_id in self.to_requeue[:]:
            if now > t:
                self.requeue(item_id)
                self.to_requeue.remove((t, item_id))
            else:
                # Keep the item's mtime fresh so cleanup() doesn't steal it
                self.touch(item_id)

    def requeue(self, item_id, delay=None, max_retries=None):
        """
        Moves item_id from cur back into new, incrementing the counter at the
        end.

        If delay is set, it is a number of seconds to wait before moving the
        item back into new. It will remain in cur until the appropriate time
        has expired.
        The state for this is kept in the QueueDir instance, so if the
        instance goes away, the item won't be requeued on schedule. It will
        eventually be moved out of cur when the cleanup time expires however.
        You must call pop() at some point in the future for requeued items
        to be processed.
        """
        try:
            bits = item_id.split(".")
            core_item_id = ".".join(bits[:-1])
            count = int(bits[-1]) + 1
        except ValueError:
            # No numeric retry counter yet: first requeue
            core_item_id = item_id
            count = 1

        if max_retries is not None and count > max_retries:
            log.info("Maximum retry count exceeded; murdering %s", item_id)
            self.murder(item_id)
            return

        if delay:
            self.to_requeue.append((time.time() + delay, item_id))
            self.to_requeue.sort()
            return

        dst_name = os.path.join(self.new_dir, "%s.%i" % (core_item_id, count))
        os.rename(os.path.join(self.cur_dir, item_id), dst_name)
        os.utime(dst_name, None)

    def murder(self, item_id):
        """
        Moves item_id and log from cur into dead for future inspection
        """
        dst_name = os.path.join(self.dead_dir, item_id)
        os.rename(os.path.join(self.cur_dir, item_id), dst_name)
        if os.path.exists(self.getlogname(item_id)):
            dst_name = os.path.join(self.dead_dir, "%s.log" % item_id)
            os.rename(self.getlogname(item_id), dst_name)

    def wait(self, timeout=None):
        """
        Waits for new items to arrive in new.
        timeout is in seconds, and is the maximum amount of time to wait. we
        might return before that.

        Uses inotify when pyinotify is available; falls back to polling
        otherwise. (The two previous copies of this method are merged here;
        pyinotify availability is fixed at import time, so a runtime check
        is equivalent.)
        """
        # Don't sleep past the time the first delayed requeue is due
        if self.to_requeue:
            reque_time = self.to_requeue[0][0] - time.time()
            # Need to do something right now!
            if reque_time < 0:
                return
            if timeout:
                timeout = min(reque_time, timeout)
            else:
                timeout = reque_time

        log.debug("Sleeping for %s", timeout)

        if pyinotify:
            wm = pyinotify.WatchManager()
            try:
                wm.add_watch(self.new_dir, pyinotify.IN_MOVED_TO)

                notifier = pyinotify.Notifier(wm, _MovedHandler())
                # pyinotify's check_events timeout is in milliseconds
                if timeout:
                    notifier.check_events(timeout * 1000)
                else:
                    notifier.check_events(timeout)
                notifier.process_events()
            finally:
                wm.close()
        else:
            # Polling fallback. NB: the original also multiplied timeout by
            # 1000 here, but then compared against time.time() deltas in
            # seconds -- so it waited 1000x too long. timeout stays in
            # seconds in this branch.
            start = time.time()
            while True:
                if self.peek():
                    return
                time.sleep(1)
                if timeout and time.time() - start > timeout:
                    return