Bug 1174262 - Make jstest dispatch via a generator on Windows; r=sfink
authorTerrence Cole <terrence@mozilla.com>
Thu, 11 Jun 2015 18:37:21 -0700
changeset 267548 47ed04af6273d5e79769c49a87bbe5d4efb98a09
parent 267547 ec6773c9131c21c8d28ff48f047d44ee626bb4b5
child 267549 1efbf4e6be470943b5208c2121e5acc520b9a0f2
push id4932
push userjlund@mozilla.com
push dateMon, 10 Aug 2015 18:23:06 +0000
treeherdermozilla-esr52@6dd5a4f5f745 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewerssfink
bugs1174262
milestone41.0a1
Bug 1174262 - Make jstest dispatch via a generator on Windows; r=sfink
js/src/tests/lib/tasks_win.py
--- a/js/src/tests/lib/tasks_win.py
+++ b/js/src/tests/lib/tasks_win.py
@@ -1,96 +1,67 @@
-# Multiprocess activities with a push-driven divide-process-collect model.
-
-from __future__ import print_function
-
-from threading import Thread, Lock
-from Queue import Queue, Empty
-from datetime import datetime
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
+from __future__ import print_function, unicode_literals, division
 
-class Source:
-    def __init__(self, task_list, results, timeout, verbose=False):
-        self.tasks = Queue()
-        for task in task_list:
-            self.tasks.put_nowait(task)
+import sys
+from threading import Thread
+from Queue import Queue, Empty
 
-        self.results = results
-        self.timeout = timeout
-        self.verbose = verbose
 
-    def start(self, worker_count):
-        t0 = datetime.now()
+class EndMarker:
+    pass
+
 
-        sink = Sink(self.results)
-        self.workers = [Worker(_ + 1, self.tasks, sink, self.timeout,
-                               self.verbose)
-                        for _ in range(worker_count)]
-        if self.verbose:
-            print('[P] Starting workers.')
-        for w in self.workers:
-            w.t0 = t0
-            w.start()
-        ans = self.join_workers()
-        if self.verbose:
-            print('[P] Finished.')
-        return ans
+def _do_work(qTasks, qResults, timeout):
+    while True:
+        test = qTasks.get(block=True, timeout=sys.maxint)
+        if test is EndMarker:
+            qResults.put(EndMarker)
+            return
+        qResults.put(test.run(test.js_cmd_prefix, timeout))
 
-    def join_workers(self):
-        try:
-            for w in self.workers:
-                w.join(20000)
-            return True
-        except KeyboardInterrupt:
-            for w in self.workers:
-                w.stop = True
-            return False
+
+def run_all_tests_gen(tests, results, options):
+    """
+    Uses scatter-gather to a thread-pool to manage children.
+    """
+    qTasks, qResults = Queue(), Queue()
 
-class Sink:
-    def __init__(self, results):
-        self.results = results
-        self.lock = Lock()
-
-    def push(self, result):
-        self.lock.acquire()
-        try:
-            self.results.push(result)
-        finally:
-            self.lock.release()
-
-class Worker(Thread):
-    def __init__(self, id, tasks, sink, timeout, verbose):
-        Thread.__init__(self)
-        self.setDaemon(True)
-        self.id = id
-        self.tasks = tasks
-        self.sink = sink
-        self.timeout = timeout
-        self.verbose = verbose
+    workers = []
+    for _ in range(options.worker_count):
+        worker = Thread(target=_do_work, args=(qTasks, qResults, options.timeout))
+        worker.setDaemon(True)
+        worker.start()
+        workers.append(worker)
 
-        self.thread = None
-        self.stop = False
-        self.t0 = 0
-
-    def log(self, msg):
-        if self.verbose:
-            dd = datetime.now() - self.t0
-            dt = dd.seconds + 1e-6 * dd.microseconds
-            print('[W{:d} {:.3f}] {}'.format(self.id, dt, msg))
+    # Insert all jobs into the queue, followed by the queue-end
+    # marker, one per worker. This will not block on growing the
+    # queue, only on waiting for more items in the generator. The
+    # workers are already started, however, so this will process as
+    # fast as we can produce tests from the filesystem.
+    for test in tests:
+        qTasks.put(test)
+    for _ in workers:
+        qTasks.put(EndMarker)
 
-    def run(self):
-        try:
-            while True:
-                if self.stop:
-                    break
-                self.log('Get next task.')
-                task = self.tasks.get(False)
-                self.log('Start task {}.'.format(str(task)))
-                result = task.run(task.js_cmd_prefix, self.timeout)
-                self.log('Finished task.')
-                self.sink.push(result)
-                self.log('Pushed result.')
-        except Empty:
-            pass
+    # Read from the results.
+    ended = 0
+    while ended < len(workers):
+        result = qResults.get(block=True, timeout=sys.maxint)
+        if result is EndMarker:
+            ended += 1
+        else:
+            yield result
+
+    # Cleanup and exit.
+    for worker in workers:
+        worker.join()
+    assert qTasks.empty(), "Send queue not drained"
+    assert qResults.empty(), "Result queue not drained"
+
 
 def run_all_tests(tests, results, options):
-    pipeline = Source(tests, results, options.timeout, False)
-    return pipeline.start(options.worker_count)
+    for result in run_all_tests_gen(tests, results, options):
+        results.push(result)
+    return True