Bug 1363428 - Switch wptrunner to use a deque for test groups, r=maja_zf, jdm
☠☠ backed out by fec6f08ea5ff ☠ ☠
authorJames Graham <james@hoppipolla.co.uk>
Sun, 28 May 2017 21:14:28 +0100
changeset 598324 6a7985dda6a142f403e3a0b5dfe93cdc1cf96afc
parent 598323 f09573370d2fb3b9d6aa43a4c0588c22365ae68f
child 598325 a85d3f3544cbc374eae165db68e81b82ed68a3f5
push id65172
push usergpascutto@mozilla.com
push dateWed, 21 Jun 2017 16:21:54 +0000
reviewersmaja_zf, jdm
bugs1363428
milestone56.0a1
Bug 1363428 - Switch wptrunner to use a deque for test groups, r=maja_zf, jdm Initially wptrunner had a single test queue that was shared between all processes. Then for --run-by-dir it changed to a queue of queues. This change makes it a queue of deques, which is simpler, since the test queues themselves are no longer shared between processes. It also changes the implementation when we aren't using --run-by-dir but are using multiple processes to pre-group the tests into N queues rather than sharing a single queue between all processes. This is necessary to use the deque of course, but importantly anticipates a change in which we will pre-compute per queue metdata for each queue; that doesn't work well with one shared queue. The downside of this change is that there is no work stealing, so it may be less efficient if we randomly assign many slow jobs to one particular process. MozReview-Commit-ID: 7e0Odk7yDwr
testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testloader.py
@@ -558,80 +558,73 @@ class TestLoader(object):
                 group = test.url.split("/")[1]
                 groups.add(group)
 
         return groups
 
 
 class TestSource(object):
     __metaclass__ = ABCMeta
+    def __init__(self, test_queue):
+        self.test_queue = test_queue
+        self.current_queue = None
 
     @abstractmethod
-    def queue_tests(self, test_queue):
-        pass
-
-    @abstractmethod
-    def requeue_test(self, test):
+    def queue_tests(self, test_queue, test_type, tests, **kwargs):
         pass
 
     def __enter__(self):
         return self
 
+    def get_queue(self):
+        if not self.current_queue or len(self.current_queue) == 0:
+            try:
+                self.current_queue = self.test_queue.get(block=False)
+            except Empty:
+                return None, None
+        return self.current_queue
+
     def __exit__(self, *args, **kwargs):
         pass
 
 
-class SingleTestSource(TestSource):
-    def __init__(self, test_queue):
-        self.test_queue = test_queue
+class GroupedSource(TestSource):
+    @classmethod
+    def new_group(cls, state, test, **kwargs):
+        raise NotImplementedError
 
     @classmethod
-    def queue_tests(cls, test_queue, test_type, tests):
-        for test in tests[test_type]:
-            test_queue.put(test)
-
-    def get_queue(self):
-        if self.test_queue.empty():
-            return None
-        return self.test_queue
-
-    def requeue_test(self, test):
-        self.test_queue.put(test)
-
-class PathGroupedSource(TestSource):
-    def __init__(self, test_queue):
-        self.test_queue = test_queue
-        self.current_queue = None
-
-    @classmethod
-    def queue_tests(cls, test_queue, test_type, tests, depth=None):
-        if depth is True:
-            depth = None
-
-        prev_path = None
+    def queue_tests(cls, test_queue, test_type, tests, **kwargs):
+        state = {}
         group = None
 
         for test in tests[test_type]:
-            path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
-            if path != prev_path:
-                group = []
+            if cls.new_group(state, test, **kwargs):
+                group = deque([])
                 test_queue.put(group)
-                prev_path = path
 
             group.append(test)
 
-    def get_queue(self):
-        if not self.current_queue or self.current_queue.empty():
-            try:
-                data = self.test_queue.get(block=True, timeout=1)
-                self.current_queue = Queue()
-                for item in data:
-                    self.current_queue.put(item)
-            except Empty:
-                return None
-        return self.current_queue
+
+class SingleTestSource(TestSource):
+    @classmethod
+    def queue_tests(cls, test_queue, test_type, tests, **kwargs):
+        processes = kwargs["processes"]
+        queues = [deque([]) for _ in xrange(processes)]
+        for test in tests[test_type]:
+            idx = hash(test.id) % processes
+            group = queues[idx]
+            group.append(test)
 
-    def requeue_test(self, test):
-        self.current_queue.put(test)
+        for item in queues:
+            test_queue.put(item)
+
 
-    def __exit__(self, *args, **kwargs):
-        if self.current_queue:
-            self.current_queue.close()
+class PathGroupedSource(GroupedSource):
+    @classmethod
+    def new_group(cls, state, test, **kwargs):
+        depth = kwargs.get("depth")
+        if depth is True:
+            depth = None
+        path = urlparse.urlsplit(test.url).path.split("/")[1:-1][:depth]
+        rv = path != state.get("prev_path")
+        state["prev_path"] = path
+        return rv
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/testrunner.py
@@ -142,18 +142,16 @@ manager_count = 0
 
 def next_manager_number():
     global manager_count
     local = manager_count = manager_count + 1
     return local
 
 
 class BrowserManager(object):
-    init_lock = threading.Lock()
-
     def __init__(self, logger, browser, command_queue, no_timeout=False):
         self.logger = logger
         self.browser = browser
         self.no_timeout = no_timeout
         self.browser_settings = None
         self.last_test = None
 
         self.started = False
@@ -175,37 +173,34 @@ class BrowserManager(object):
         # It seems that this lock is helpful to prevent some race that otherwise
         # sometimes stops the spawned processes initalising correctly, and
         # leaves this thread hung
         if self.init_timer is not None:
             self.init_timer.cancel()
 
         self.logger.debug("Init called, starting browser and runner")
 
-        with self.init_lock:
-            # Guard against problems initialising the browser or the browser
-            # remote control method
-            if not self.no_timeout:
-                self.init_timer = threading.Timer(self.browser.init_timeout,
-                                                  self.init_timeout)
-            try:
-                if self.init_timer is not None:
-                    self.init_timer.start()
-                self.logger.debug("Starting browser with settings %r" % self.browser_settings)
-                self.browser.start(**self.browser_settings)
-                self.browser_pid = self.browser.pid()
-            except:
-                self.logger.warning("Failure during init %s" % traceback.format_exc())
-                if self.init_timer is not None:
-                    self.init_timer.cancel()
-                self.logger.error(traceback.format_exc())
-                succeeded = False
-            else:
-                succeeded = True
-                self.started = True
+        if not self.no_timeout:
+            self.init_timer = threading.Timer(self.browser.init_timeout,
+                                              self.init_timeout)
+        try:
+            if self.init_timer is not None:
+                self.init_timer.start()
+            self.logger.debug("Starting browser with settings %r" % self.browser_settings)
+            self.browser.start(**self.browser_settings)
+            self.browser_pid = self.browser.pid()
+        except:
+            self.logger.warning("Failure during init %s" % traceback.format_exc())
+            if self.init_timer is not None:
+                self.init_timer.cancel()
+            self.logger.error(traceback.format_exc())
+            succeeded = False
+        else:
+            succeeded = True
+            self.started = True
 
         return succeeded
 
     def send_message(self, command, *args):
         self.command_queue.put((command, args))
 
     def init_timeout(self):
         # This is called from a seperate thread, so we send a message to the
@@ -244,18 +239,16 @@ class _RunnerManagerState(object):
     error = namedtuple("error", [])
     stop = namedtuple("stop", [])
 
 
 RunnerManagerState = _RunnerManagerState()
 
 
 class TestRunnerManager(threading.Thread):
-    init_lock = threading.Lock()
-
     def __init__(self, suite_name, tests, test_source_cls, browser_cls, browser_kwargs,
                  executor_cls, executor_kwargs, stop_flag, pause_after_test=False,
                  pause_on_unexpected=False, restart_on_unexpected=True, debug_info=None):
         """Thread that owns a single TestRunner process and any processes required
         by the TestRunner (e.g. the Firefox binary).
 
         TestRunnerManagers are responsible for launching the browser process and the
         runner process, and for logging the test progress. The actual test running
@@ -502,22 +495,20 @@ class TestRunnerManager(threading.Thread
     def get_next_test(self, test_queue=None):
         test = None
         while test is None:
             if test_queue is None:
                 test_queue = self.test_source.get_queue()
                 if test_queue is None:
                     self.logger.info("No more tests")
                     return None, None
-            try:
-                # Need to block here just to allow for contention with other processes
-                test = test_queue.get(block=True, timeout=2)
-            except Empty:
-                if test_queue.empty():
+                if len(test_queue) == 0:
                     test_queue = None
+                else:
+                    test = test_queue.popleft()
         return test, test_queue
 
     def run_test(self):
         assert isinstance(self.state, RunnerManagerState.running)
         assert self.state.test is not None
 
         if self.browser.update_settings(self.state.test):
             self.logger.info("Restarting browser for new test environment")
--- a/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
+++ b/testing/web-platform/tests/tools/wptrunner/wptrunner/wptrunner.py
@@ -127,23 +127,23 @@ def run_tests(config, test_paths, produc
             test_loader = kwargs["test_loader"]
         else:
             run_info, test_loader = get_loader(test_paths,
                                                product,
                                                ssl_env,
                                                run_info_extras=run_info_extras(**kwargs),
                                                **kwargs)
 
+        test_source_kwargs = {"processes": kwargs["processes"]}
         if kwargs["run_by_dir"] is False:
             test_source_cls = testloader.SingleTestSource
-            test_source_kwargs = {}
         else:
             # A value of None indicates infinite depth
             test_source_cls = testloader.PathGroupedSource
-            test_source_kwargs = {"depth": kwargs["run_by_dir"]}
+            test_source_kwargs["depth"] = kwargs["run_by_dir"]
 
         logger.info("Using %i client processes" % kwargs["processes"])
 
         unexpected_total = 0
 
         kwargs["pause_after_test"] = get_pause_after_test(test_loader, **kwargs)
 
         with env.TestEnvironment(test_paths,