buildbot-helpers/command_runner.py
author ffxbld <release@mozilla.com>
Fri, 02 Dec 2016 00:24:22 -0800
changeset 7192 1b332ac05350d5488cb6fd831f859bb8f2102ee1
parent 3649 4580ea535a5d92dad022bc185b3c8929a6708b54
permissions -rwxr-xr-x
No bug - Tagging tools with FIREFOX_51_0b6_BUILD1_RUNTIME, FIREFOX_51_0b6_RELEASE_RUNTIME a=release DONTBUILD CLOSED TREE

#!/usr/bin/env python
"""
Runs commands from a queue!
"""
import subprocess
import os
import signal
import time
from mozilla_buildtools.queuedir import QueueDir
from buildbot.util import json
import logging
log = logging.getLogger(__name__)


class Job(object):
    def __init__(self, cmd, item_id, log_fp):
        self.cmd = cmd
        self.log = log_fp
        self.item_id = item_id
        self.started = None
        self.last_signal_time = 0
        self.last_signal = None

        self.proc = None

    def start(self):
        devnull = open(os.devnull, 'r')
        self.log.write("Running %s\n" % self.cmd)
        self.log.flush()
        self.proc = subprocess.Popen(self.cmd, close_fds=True, stdin=devnull,
                                     stdout=self.log, stderr=self.log)
        self.started = time.time()

    def check(self):
        now = time.time()
        if now - self.started > self.max_time:
            # Kill stuff off
            if now - self.last_signal_time > 60:
                s = {None: signal.SIGINT, signal.SIGINT:
                     signal.SIGTERM}.get(self.last_signal, signal.SIGKILL)
                log.info("Killing %i with %i", self.proc.pid, s)
                try:
                    self.log.write("Killing with %s\n" % s)
                    os.kill(self.proc.pid, s)
                    self.last_signal = s
                    self.last_signal_time = now
                except OSError:
                    # Ok, process must have exited already
                    log.exception("Failed to kill")
                    pass

        result = self.proc.poll()
        if result is not None:
            self.log.write("\nResult: %s, Elapsed: %1.1f seconds\n" % (result, time.time() - self.started))
            self.log.close()
        return result


class CommandRunner(object):
    def __init__(self, options):
        self.queuedir = options.queuedir
        self.q = QueueDir('commands', self.queuedir)
        self.concurrency = options.concurrency
        self.retry_time = options.retry_time
        self.max_retries = options.max_retries
        self.max_time = options.max_time

        self.active = []

        # List of (signal_time, level, proc)
        self.to_kill = []

    def run(self, job):
        """
        Runs the given job
        """
        log.info("Running %s", job.cmd)
        try:
            job.start()
            self.active.append(job)
        except OSError:
            job.log.write("\nFailed with OSError; requeuing in %i seconds\n" %
                          self.retry_time)
            # Wait to requeue it
            # If we die, then it's still in cur, and will be moved back into
            # 'new' eventually
            self.q.requeue(job.item_id, self.retry_time, self.max_retries)

    def monitor(self):
        """
        Monitor running jobs
        """
        for job in self.active[:]:
            self.q.touch(job.item_id)
            result = job.check()

            if result is not None:
                self.active.remove(job)
                if result == 0:
                    self.q.remove(job.item_id)
                else:
                    log.warn("%s failed; requeuing", job.item_id)
                    # Requeue it!
                    self.q.requeue(
                        job.item_id, self.retry_time, self.max_retries)

    def loop(self):
        """
        Main processing loop. Read new items from the queue and run them!
        """
        while True:
            self.monitor()
            if len(self.active) >= self.concurrency:
                # Wait!
                time.sleep(1)
                continue

            while len(self.active) < self.concurrency:
                item = self.q.pop()
                if not item:
                    # Don't wait for very long, since we have to check up on
                    # our children
                    if self.active:
                        self.q.wait(1)
                    else:
                        self.q.wait()
                    break

                item_id, fp = item
                try:
                    command = json.load(fp)
                    job = Job(command, item_id, self.q.getlog(item_id))
                    job.max_time = self.max_time
                    self.run(job)
                except ValueError:
                    # Couldn't parse it as json
                    # There's no hope!
                    self.q.log(item_id, "Couldn't load json; murdering")
                    self.q.murder(item_id)
                finally:
                    fp.close()


def main():
    from optparse import OptionParser
    import logging.handlers
    parser = OptionParser()
    parser.set_defaults(
        concurrency=1,
        max_retries=1,
        retry_time=0,
        verbosity=0,
        logfile=None,
        max_time=60,
    )
    parser.add_option("-q", "--queuedir", dest="queuedir")
    parser.add_option("-j", "--jobs", dest="concurrency", type="int",
                      help="number of commands to run at once")
    parser.add_option("-r", "--max_retries", dest="max_retries",
                      type="int", help="number of times to retry commands")
    parser.add_option("-t", "--retry_time", dest="retry_time",
                      type="int", help="seconds to wait between retries")
    parser.add_option("-v", "--verbose", dest="verbosity",
                      action="count", help="increase verbosity")
    parser.add_option(
        "-l", "--logfile", dest="logfile", help="where to send logs")
    parser.add_option("-m", "--max_time", dest="max_time", type="int",
                      help="maximum time for a command to run")

    options, args = parser.parse_args()

    # Set up logging
    if options.verbosity == 0:
        log_level = logging.WARNING
    elif options.verbosity == 1:
        log_level = logging.INFO
    else:
        log_level = logging.DEBUG

    if not options.logfile:
        logging.basicConfig(
            level=log_level, format="%(asctime)s - %(message)s")
    else:
        logger = logging.getLogger()
        logger.setLevel(log_level)
        handler = logging.handlers.RotatingFileHandler(
            options.logfile, maxBytes=1024 ** 2, backupCount=5)
        formatter = logging.Formatter("%(asctime)s - %(message)s")
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    if not options.queuedir:
        parser.error("-q/--queuedir is required")

    runner = CommandRunner(options)
    runner.loop()

if __name__ == '__main__':
    main()