Bug 1174262 - Make jstest dispatch via a generator on Windows; r=sfink
authorTerrence Cole <terrence@mozilla.com>
Thu, 11 Jun 2015 18:37:21 -0700
changeset 249455 47ed04af6273d5e79769c49a87bbe5d4efb98a09
parent 249454 ec6773c9131c21c8d28ff48f047d44ee626bb4b5
child 249456 1efbf4e6be470943b5208c2121e5acc520b9a0f2
push id28927
push usercbook@mozilla.com
push dateThu, 18 Jun 2015 13:13:33 +0000
treeherdermozilla-central@efe86609e776 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssfink
bugs1174262
milestone41.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1174262 - Make jstest dispatch via a generator on Windows; r=sfink
js/src/tests/lib/tasks_win.py
--- a/js/src/tests/lib/tasks_win.py
+++ b/js/src/tests/lib/tasks_win.py
@@ -1,96 +1,67 @@
-# Multiprocess activities with a push-driven divide-process-collect model.
-
-from __future__ import print_function
-
-from threading import Thread, Lock
-from Queue import Queue, Empty
-from datetime import datetime
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+from __future__ import print_function, unicode_literals, division
 
-class Source:
-    def __init__(self, task_list, results, timeout, verbose=False):
-        self.tasks = Queue()
-        for task in task_list:
-            self.tasks.put_nowait(task)
+import sys
+from threading import Thread
+from Queue import Queue, Empty
 
-        self.results = results
-        self.timeout = timeout
-        self.verbose = verbose
 
-    def start(self, worker_count):
-        t0 = datetime.now()
+class EndMarker:
+    pass
+
 
-        sink = Sink(self.results)
-        self.workers = [Worker(_ + 1, self.tasks, sink, self.timeout,
-                               self.verbose)
-                        for _ in range(worker_count)]
-        if self.verbose:
-            print('[P] Starting workers.')
-        for w in self.workers:
-            w.t0 = t0
-            w.start()
-        ans = self.join_workers()
-        if self.verbose:
-            print('[P] Finished.')
-        return ans
+def _do_work(qTasks, qResults, timeout):
+    while True:
+        test = qTasks.get(block=True, timeout=sys.maxint)
+        if test is EndMarker:
+            qResults.put(EndMarker)
+            return
+        qResults.put(test.run(test.js_cmd_prefix, timeout))
 
-    def join_workers(self):
-        try:
-            for w in self.workers:
-                w.join(20000)
-            return True
-        except KeyboardInterrupt:
-            for w in self.workers:
-                w.stop = True
-            return False
+
+def run_all_tests_gen(tests, results, options):
+    """
+    Uses scatter-gather to a thread-pool to manage children.
+    """
+    qTasks, qResults = Queue(), Queue()
 
-class Sink:
-    def __init__(self, results):
-        self.results = results
-        self.lock = Lock()
-
-    def push(self, result):
-        self.lock.acquire()
-        try:
-            self.results.push(result)
-        finally:
-            self.lock.release()
-
-class Worker(Thread):
-    def __init__(self, id, tasks, sink, timeout, verbose):
-        Thread.__init__(self)
-        self.setDaemon(True)
-        self.id = id
-        self.tasks = tasks
-        self.sink = sink
-        self.timeout = timeout
-        self.verbose = verbose
+    workers = []
+    for _ in range(options.worker_count):
+        worker = Thread(target=_do_work, args=(qTasks, qResults, options.timeout))
+        worker.setDaemon(True)
+        worker.start()
+        workers.append(worker)
 
-        self.thread = None
-        self.stop = False
-        self.t0 = 0
-
-    def log(self, msg):
-        if self.verbose:
-            dd = datetime.now() - self.t0
-            dt = dd.seconds + 1e-6 * dd.microseconds
-            print('[W{:d} {:.3f}] {}'.format(self.id, dt, msg))
+    # Insert all jobs into the queue, followed by the queue-end
+    # marker, one per worker. This will not block on growing the
+    # queue, only on waiting for more items in the generator. The
+    # workers are already started, however, so this will process as
+    # fast as we can produce tests from the filesystem.
+    for test in tests:
+        qTasks.put(test)
+    for _ in workers:
+        qTasks.put(EndMarker)
 
-    def run(self):
-        try:
-            while True:
-                if self.stop:
-                    break
-                self.log('Get next task.')
-                task = self.tasks.get(False)
-                self.log('Start task {}.'.format(str(task)))
-                result = task.run(task.js_cmd_prefix, self.timeout)
-                self.log('Finished task.')
-                self.sink.push(result)
-                self.log('Pushed result.')
-        except Empty:
-            pass
+    # Read from the results.
+    ended = 0
+    while ended < len(workers):
+        result = qResults.get(block=True, timeout=sys.maxint)
+        if result is EndMarker:
+            ended += 1
+        else:
+            yield result
+
+    # Cleanup and exit.
+    for worker in workers:
+        worker.join()
+    assert qTasks.empty(), "Send queue not drained"
+    assert qResults.empty(), "Result queue not drained"
+
 
 def run_all_tests(tests, results, options):
-    pipeline = Source(tests, results, options.timeout, False)
-    return pipeline.start(options.worker_count)
+    for result in run_all_tests_gen(tests, results, options):
+        results.push(result)
+    return True