js/src/tests/lib/tasks_win.py
author Terrence Cole <terrence@mozilla.com>
Thu, 18 Jun 2015 10:33:34 -0700
changeset 269163 ceba6484abda5dcf2eed70c46798a609d158e96a
parent 269159 ede9f497a5cc70239a2b7aa9e9f071b46f1089eb
child 269435 cf426328aee818b5a1756123243631dd55193f34
permissions -rw-r--r--
Bug 1175708 - Followup to fix windows jstests.py bustage on a CLOSED TREE; r=meow

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/. */
from __future__ import print_function, unicode_literals, division

import sys
from threading import Thread
from Queue import Queue, Empty


class EndMarker:
    pass


def _do_work(qTasks, qResults, prefix, timeout):
    while True:
        test = qTasks.get(block=True, timeout=sys.maxint)
        if test is EndMarker:
            qResults.put(EndMarker)
            return
        qResults.put(test.run(prefix, timeout))


def run_all_tests_gen(tests, prefix, results, options):
    """
    Uses scatter-gather to a thread-pool to manage children.
    """
    qTasks, qResults = Queue(), Queue()

    workers = []
    for _ in range(options.worker_count):
        worker = Thread(target=_do_work, args=(qTasks, qResults, prefix,
                                               options.timeout))
        worker.setDaemon(True)
        worker.start()
        workers.append(worker)

    # Insert all jobs into the queue, followed by the queue-end
    # marker, one per worker. This will not block on growing the
    # queue, only on waiting for more items in the generator. The
    # workers are already started, however, so this will process as
    # fast as we can produce tests from the filesystem.
    for test in tests:
        qTasks.put(test)
    for _ in workers:
        qTasks.put(EndMarker)

    # Read from the results.
    ended = 0
    while ended < len(workers):
        result = qResults.get(block=True, timeout=sys.maxint)
        if result is EndMarker:
            ended += 1
        else:
            yield result

    # Cleanup and exit.
    for worker in workers:
        worker.join()
    assert qTasks.empty(), "Send queue not drained"
    assert qResults.empty(), "Result queue not drained"


def run_all_tests(tests, prefix, results, options):
    for result in run_all_tests_gen(tests, prefix, results, options):
        results.push(result)
    return True