Bug 1175636 - Do not spawn a watchdog for each test on Windows; r=sfink
authorTerrence Cole <terrence@mozilla.com>
Wed, 17 Jun 2015 11:54:35 -0700
changeset 280668 cf426328aee818b5a1756123243631dd55193f34
parent 280667 953f8ac1e47ea90b9cb255a294d3f0450b30bcd2
child 280669 43aa1645ee307aa63c414487e708e3a0d3e68e5f
push id4932
push userjlund@mozilla.com
push dateMon, 10 Aug 2015 18:23:06 +0000
treeherdermozilla-beta@6dd5a4f5f745 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssfink
bugs1175636
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1175636 - Do not spawn a watchdog for each test on Windows; r=sfink
js/src/tests/lib/progressbar.py
js/src/tests/lib/results.py
js/src/tests/lib/tasks_win.py
js/src/tests/lib/tests.py
--- a/js/src/tests/lib/progressbar.py
+++ b/js/src/tests/lib/progressbar.py
@@ -10,17 +10,18 @@ else:
     from terminal_unix import Terminal
 
 class NullProgressBar(object):
     def update(self, current, data): pass
     def poke(self): pass
     def finish(self, complete=True): pass
     def beginline(self): pass
     def message(self, msg): sys.stdout.write(msg + '\n')
-    def update_granularity(self): return timedelta.max
+    @staticmethod
+    def update_granularity(): return timedelta.max
 
 class ProgressBar(object):
     def __init__(self, limit, fmt):
         assert self.conservative_isatty()
 
         self.prior = None
         self.atLineStart = True
         self.counters_fmt = fmt # [{str:str}] Describtion of how to lay out each
@@ -32,17 +33,18 @@ class ProgressBar(object):
         # Compute the width of the counters and build the format string.
         self.counters_width = 1 # [
         for layout in self.counters_fmt:
             self.counters_width += self.limit_digits
             self.counters_width += 1 # | (or ']' for the last one)
 
         self.barlen = 64 - self.counters_width
 
-    def update_granularity(self):
+    @staticmethod
+    def update_granularity():
         return timedelta(seconds=0.1)
 
     def update(self, current, data):
         # Record prior for poke.
         self.prior = (current, data)
         self.atLineStart = False
 
         # Build counters string.
--- a/js/src/tests/lib/results.py
+++ b/js/src/tests/lib/results.py
@@ -146,17 +146,17 @@ class ResultsSink:
             else:
                 show_cmd = self.options.show_cmd \
                            and not self.options.failed_only
 
             if show_output or show_cmd:
                 self.pb.beginline()
 
                 if show_output:
-                    print('## {}: rc = {:d}, run time = {:f}'.format(
+                    print('## {}: rc = {:d}, run time = {}'.format(
                         output.test.path, output.rc, output.dt), file=self.fp)
 
                 if show_cmd:
                     print(escape_cmdline(output.cmd), file=self.fp)
 
                 if show_output:
                     self.fp.write(output.out)
                     self.fp.write(output.err)
--- a/js/src/tests/lib/tasks_win.py
+++ b/js/src/tests/lib/tasks_win.py
@@ -1,67 +1,122 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/. */
 from __future__ import print_function, unicode_literals, division
 
+import subprocess
 import sys
+from datetime import datetime, timedelta
+from progressbar import ProgressBar
+from results import TestOutput
 from threading import Thread
 from Queue import Queue, Empty
 
 
 class EndMarker:
     pass
 
 
-def _do_work(qTasks, qResults, prefix, timeout):
+class TaskFinishedMarker:
+    pass
+
+
+def _do_work(qTasks, qResults, qWatch, prefix, timeout):
     while True:
         test = qTasks.get(block=True, timeout=sys.maxint)
         if test is EndMarker:
+            qWatch.put(EndMarker)
             qResults.put(EndMarker)
             return
-        qResults.put(test.run(prefix, timeout))
+
+        # Spawn the test task.
+        cmd = test.get_command(prefix)
+        tStart = datetime.now()
+        proc = subprocess.Popen(cmd,
+                                stdout=subprocess.PIPE,
+                                stderr=subprocess.PIPE)
+
+        # Push the task to the watchdog -- it will kill the task
+        # if it goes over the timeout while we keep its stdout
+        # buffer clear on the "main" worker thread.
+        qWatch.put(proc)
+        out, err = proc.communicate()
+        qWatch.put(TaskFinishedMarker)
+
+        # Create a result record and forward to result processing.
+        dt = datetime.now() - tStart
+        result = TestOutput(test, cmd, out, err, proc.returncode, dt.total_seconds(),
+                            dt > timedelta(seconds=timeout))
+        qResults.put(result)
+
+
+def _do_watch(qWatch, timeout):
+    while True:
+        proc = qWatch.get(True)
+        if proc == EndMarker:
+            return
+        try:
+            fin = qWatch.get(block=True, timeout=timeout)
+            assert fin is TaskFinishedMarker, "invalid finish marker"
+        except Empty:
+            # Timed out, force-kill the test.
+            proc.terminate()
+            fin = qWatch.get(block=True, timeout=sys.maxint)
+            assert fin is TaskFinishedMarker, "invalid finish marker"
 
 
 def run_all_tests_gen(tests, prefix, results, options):
     """
     Uses scatter-gather to a thread-pool to manage children.
     """
     qTasks, qResults = Queue(), Queue()
 
     workers = []
+    watchdogs = []
     for _ in range(options.worker_count):
-        worker = Thread(target=_do_work, args=(qTasks, qResults, prefix,
-                                               options.timeout))
+        qWatch = Queue()
+        watcher = Thread(target=_do_watch, args=(qWatch, options.timeout))
+        watcher.setDaemon(True)
+        watcher.start()
+        watchdogs.append(watcher)
+        worker = Thread(target=_do_work, args=(qTasks, qResults, qWatch,
+                                               prefix, options.timeout))
         worker.setDaemon(True)
         worker.start()
         workers.append(worker)
 
     # Insert all jobs into the queue, followed by the queue-end
     # marker, one per worker. This will not block on growing the
     # queue, only on waiting for more items in the generator. The
     # workers are already started, however, so this will process as
     # fast as we can produce tests from the filesystem.
     for test in tests:
         qTasks.put(test)
     for _ in workers:
         qTasks.put(EndMarker)
 
     # Read from the results.
     ended = 0
+    delay = ProgressBar.update_granularity().total_seconds()
     while ended < len(workers):
-        result = qResults.get(block=True, timeout=sys.maxint)
-        if result is EndMarker:
-            ended += 1
-        else:
-            yield result
+        try:
+            result = qResults.get(block=True, timeout=delay)
+            if result is EndMarker:
+                ended += 1
+            else:
+                yield result
+        except Empty:
+            results.pb.poke()
 
     # Cleanup and exit.
     for worker in workers:
         worker.join()
+    for watcher in watchdogs:
+        watcher.join()
     assert qTasks.empty(), "Send queue not drained"
     assert qResults.empty(), "Result queue not drained"
 
 
 def run_all_tests(tests, prefix, results, options):
     for result in run_all_tests_gen(tests, prefix, results, options):
         results.push(result)
     return True
--- a/js/src/tests/lib/tests.py
+++ b/js/src/tests/lib/tests.py
@@ -40,72 +40,16 @@ JITFLAGS = {
 def get_jitflags(variant, **kwargs):
     if variant not in JITFLAGS:
         print('Invalid jitflag: "{}"'.format(variant))
         sys.exit(1)
     if variant == 'none' and 'none' in kwargs:
         return kwargs['none']
     return JITFLAGS[variant]
 
-def do_run_cmd(cmd):
-    l = [None, None]
-    th_run_cmd(cmd, l)
-    return l[1]
-
-def set_limits():
-    # resource module not supported on all platforms
-    try:
-        import resource
-        GB = 2**30
-        resource.setrlimit(resource.RLIMIT_AS, (2*GB, 2*GB))
-    except:
-        return
-
-def th_run_cmd(cmd, l):
-    t0 = datetime.datetime.now()
-
-    # close_fds and preexec_fn are not supported on Windows and will
-    # cause a ValueError.
-    options = {}
-    if sys.platform != 'win32':
-        options["close_fds"] = True
-        options["preexec_fn"] = set_limits
-    p = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, **options)
-
-    l[0] = p
-    out, err = p.communicate()
-    t1 = datetime.datetime.now()
-    dd = t1-t0
-    dt = dd.seconds + 1e-6 * dd.microseconds
-    l[1] = (out, err, p.returncode, dt)
-
-def run_cmd(cmd, timeout=60.0):
-    if timeout is None:
-        return do_run_cmd(cmd)
-
-    l = [None, None]
-    timed_out = False
-    th = Thread(target=th_run_cmd, args=(cmd, l))
-    th.start()
-    th.join(timeout)
-    while th.isAlive():
-        if l[0] is not None:
-            try:
-                # In Python 3, we could just do l[0].kill().
-                import signal
-                if sys.platform != 'win32':
-                    os.kill(l[0].pid, signal.SIGKILL)
-                time.sleep(.1)
-                timed_out = True
-            except OSError:
-                # Expecting a "No such process" error
-                pass
-    th.join()
-    return l[1] + (timed_out,)
-
 class Test(object):
     """A runnable test."""
     def __init__(self, path):
         self.path = path     # str:  path of JS file relative to tests root dir
         self.options = []    # [str]: Extra options to pass to the shell
         self.jitflags = []   # [str]: JIT flags to pass to the shell
 
     @staticmethod
@@ -119,21 +63,16 @@ class Test(object):
             + ['-f', os.path.join(path, 'shell.js')]
 
     def get_command(self, prefix):
         dirname, filename = os.path.split(self.path)
         cmd = prefix + self.jitflags + self.options \
               + Test.prefix_command(dirname) + ['-f', self.path]
         return cmd
 
-    def run(self, prefix, timeout=30.0):
-        cmd = self.get_command(prefix)
-        out, err, rc, dt, timed_out = run_cmd(cmd, timeout)
-        return TestOutput(self, cmd, out, err, rc, dt, timed_out)
-
 class TestCase(Test):
     """A test case consisting of a test and an expected result."""
     def __init__(self, path):
         Test.__init__(self, path)
         self.enable = True   # bool: True => run test, False => don't run
         self.expect = True   # bool: expected result, True => pass
         self.random = False  # bool: True => ignore output as 'random'
         self.slow = False    # bool: True => test may run slowly