Bug 1495007 - Ensure that we capture stdio from executor subprocess r=ato
authorJames Graham <james@hoppipolla.co.uk>
Mon, 01 Oct 2018 06:49:58 +0000
changeset 494673 d47b208203c898cff3172b9c05e72ee8a5dc8a4a
parent 494665 6f59e128ed67ef50a35373bc808166b6c9a71c02
child 494674 3b77ee1b098608adfdd6bd62bed3eab27dfd8f69
push id9984
push userffxbld-merge
push dateMon, 15 Oct 2018 21:07:35 +0000
treeherdermozilla-beta@183d27ea8570 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersato
bugs1495007
milestone64.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1495007 - Ensure that we capture stdio from executor subprocess r=ato Multiprocessing on Linux and OSX implictly inherits the stdio handles into the subprocess. But on Windows that doesn't happen because it doesn't have fork() to abuse in the same way. So instead we need to ensure that we explicitly set up handling of stdio immediately after starting the executor subprocess to avoid gettin non-structured output. Differential Revision: https://phabricator.services.mozilla.com/D7186
testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
@@ -4,16 +4,18 @@ import multiprocessing
 import threading
 import traceback
 from Queue import Empty
 from collections import namedtuple
 from multiprocessing import Process, current_process, Queue
 
 from mozlog import structuredlog
 
+import wptlogging
+
 # Special value used as a sentinal in various commands
 Stop = object()
 
 
 class MessageLogger(object):
     def __init__(self, message_func):
         self.send_message = message_func
 
@@ -35,34 +37,35 @@ def _log_func(level_name):
     return log
 
 # Create all the methods on StructuredLog for debug levels
 for level_name in structuredlog.log_levels:
     setattr(MessageLogger, level_name.lower(), _log_func(level_name))
 
 
 class TestRunner(object):
-    def __init__(self, command_queue, result_queue, executor):
+    def __init__(self, logger, command_queue, result_queue, executor):
         """Class implementing the main loop for running tests.
 
         This class delegates the job of actually running a test to the executor
         that is passed in.
 
+        :param logger: Structured logger
         :param command_queue: subprocess.Queue used to send commands to the
                               process
         :param result_queue: subprocess.Queue used to send results to the
                              parent TestManager process
         :param executor: TestExecutor object that will actually run a test.
         """
         self.command_queue = command_queue
         self.result_queue = result_queue
 
         self.executor = executor
         self.name = current_process().name
-        self.logger = MessageLogger(self.send_message)
+        self.logger = logger
 
     def __enter__(self):
         return self
 
     def __exit__(self, exc_type, exc_value, traceback):
         self.teardown()
 
     def setup(self):
@@ -112,40 +115,41 @@ class TestRunner(object):
 
     def send_message(self, command, *args):
         self.result_queue.put((command, args))
 
 
 def start_runner(runner_command_queue, runner_result_queue,
                  executor_cls, executor_kwargs,
                  executor_browser_cls, executor_browser_kwargs,
-                 stop_flag):
+                 capture_stdio, stop_flag):
     """Launch a TestRunner in a new process"""
-    def log(level, msg):
-        runner_result_queue.put(("log", (level, {"message": msg})))
+
+    def send_message(command, *args):
+        runner_result_queue.put((command, args))
 
     def handle_error(e):
-        log("critical", traceback.format_exc())
+        logger.critical(traceback.format_exc())
         stop_flag.set()
 
-    try:
-        browser = executor_browser_cls(**executor_browser_kwargs)
-        executor = executor_cls(browser, **executor_kwargs)
-        with TestRunner(runner_command_queue, runner_result_queue, executor) as runner:
-            try:
-                runner.run()
-            except KeyboardInterrupt:
-                stop_flag.set()
-            except Exception as e:
-                handle_error(e)
-    except Exception as e:
-        handle_error(e)
-    finally:
-        runner_command_queue = None
-        runner_result_queue = None
+    logger = MessageLogger(send_message)
+
+    with wptlogging.CaptureIO(logger, capture_stdio):
+        try:
+            browser = executor_browser_cls(**executor_browser_kwargs)
+            executor = executor_cls(browser, **executor_kwargs)
+            with TestRunner(logger, runner_command_queue, runner_result_queue, executor) as runner:
+                try:
+                    runner.run()
+                except KeyboardInterrupt:
+                    stop_flag.set()
+                except Exception as e:
+                    handle_error(e)
+        except Exception as e:
+            handle_error(e)
 
 
 manager_count = 0
 
 
 def next_manager_number():
     global manager_count
     local = manager_count = manager_count + 1
@@ -250,17 +254,18 @@ class _RunnerManagerState(object):
 
 
 RunnerManagerState = _RunnerManagerState()
 
 
 class TestRunnerManager(threading.Thread):
     def __init__(self, suite_name, test_queue, test_source_cls, browser_cls, browser_kwargs,
                  executor_cls, executor_kwargs, stop_flag, rerun=1, pause_after_test=False,
-                 pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None):
+                 pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None,
+                 capture_stdio=True):
         """Thread that owns a single TestRunner process and any processes required
         by the TestRunner (e.g. the Firefox binary).
 
         TestRunnerManagers are responsible for launching the browser process and the
         runner process, and for logging the test progress. The actual test running
         is done by the TestRunner. In particular they:
 
         * Start the binary of the program under test
@@ -308,16 +313,18 @@ class TestRunnerManager(threading.Thread
 
         # This may not really be what we want
         self.daemon = True
 
         self.max_restarts = 5
 
         self.browser = None
 
+        self.capture_stdio = capture_stdio
+
     def run(self):
         """Main loop for the TestManager.
 
         TestManagers generally receive commands from their
         TestRunner updating them on the status of a test. They
         may also have a stop flag set by the main thread indicating
         that the manager should shut down the next time the event loop
         spins."""
@@ -474,16 +481,17 @@ class TestRunnerManager(threading.Thread
         executor_browser_cls, executor_browser_kwargs = self.browser.browser.executor_browser()
 
         args = (self.remote_queue,
                 self.command_queue,
                 self.executor_cls,
                 self.executor_kwargs,
                 executor_browser_cls,
                 executor_browser_kwargs,
+                self.capture_stdio,
                 self.child_stop_flag)
         self.test_runner_proc = Process(target=start_runner,
                                         args=args,
                                         name="Thread-TestRunner-%i" % self.manager_number)
         self.test_runner_proc.start()
         self.logger.debug("Test runner started")
         # Now we wait for either an init_succeeded event or an init_failed event
 
@@ -732,31 +740,33 @@ def make_test_queue(tests, test_source_c
 class ManagerGroup(object):
     def __init__(self, suite_name, size, test_source_cls, test_source_kwargs,
                  browser_cls, browser_kwargs,
                  executor_cls, executor_kwargs,
                  rerun=1,
                  pause_after_test=False,
                  pause_on_unexpected=False,
                  restart_on_unexpected=True,
-                 debug_info=None):
+                 debug_info=None,
+                 capture_stdio=True):
         """Main thread object that owns all the TestManager threads."""
         self.suite_name = suite_name
         self.size = size
         self.test_source_cls = test_source_cls
         self.test_source_kwargs = test_source_kwargs
         self.browser_cls = browser_cls
         self.browser_kwargs = browser_kwargs
         self.executor_cls = executor_cls
         self.executor_kwargs = executor_kwargs
         self.pause_after_test = pause_after_test
         self.pause_on_unexpected = pause_on_unexpected
         self.restart_on_unexpected = restart_on_unexpected
         self.debug_info = debug_info
         self.rerun = rerun
+        self.capture_stdio = capture_stdio
 
         self.pool = set()
         # Event that is polled by threads so that they can gracefully exit in the face
         # of sigint
         self.stop_flag = threading.Event()
         self.logger = structuredlog.StructuredLogger(suite_name)
 
     def __enter__(self):
@@ -783,17 +793,18 @@ class ManagerGroup(object):
                                         self.browser_kwargs,
                                         self.executor_cls,
                                         self.executor_kwargs,
                                         self.stop_flag,
                                         self.rerun,
                                         self.pause_after_test,
                                         self.pause_on_unexpected,
                                         self.restart_on_unexpected,
-                                        self.debug_info)
+                                        self.debug_info,
+                                        self.capture_stdio)
             manager.start()
             self.pool.add(manager)
         self.wait()
 
     def is_alive(self):
         """Boolean indicating whether any manager in the group is still alive"""
         return any(manager.is_alive() for manager in self.pool)
 
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
@@ -260,17 +260,18 @@ def run_tests(config, test_paths, produc
                                       browser_cls,
                                       browser_kwargs,
                                       executor_cls,
                                       executor_kwargs,
                                       kwargs["rerun"],
                                       kwargs["pause_after_test"],
                                       kwargs["pause_on_unexpected"],
                                       kwargs["restart_on_unexpected"],
-                                      kwargs["debug_info"]) as manager_group:
+                                      kwargs["debug_info"],
+                                      not kwargs["no_capture_stdio"]) as manager_group:
                         try:
                             manager_group.run(test_type, run_tests)
                         except KeyboardInterrupt:
                             logger.critical("Main thread got signal")
                             manager_group.stop()
                             raise
                     test_count += manager_group.test_count()
                     unexpected_count += manager_group.unexpected_count()