Bug 1369711 - [mozlint] Refactor concurrent.futures ProcessPoolExecutor code for readability r=gps
author Andrew Halberstadt <ahalberstadt@mozilla.com>
Fri, 23 Feb 2018 09:02:16 -0500
changeset 408701 9712703bd2428c0127e0425c60da22f0b22fe60f
parent 408700 6426e089e5c548dc1c75bc055792d85fe89573ea
child 408702 7fb1a832336d1d78b4f92bdf5505e9f382861565
push id 101011
push user nerli@mozilla.com
push date Sat, 17 Mar 2018 22:28:18 +0000
treeherder mozilla-inbound@efce78e62b6d
reviewers gps
bugs 1369711
milestone 61.0a1
Bug 1369711 - [mozlint] Refactor concurrent.futures ProcessPoolExecutor code for readability r=gps

This commit doesn't change any behaviour; it just attempts to make the code a little more readable. The '_collect_results' done-callback is now invoked for each WorkItem the workers process (either because it finished or because it was cancelled). This also differentiates between setup failures and run failures.

MozReview-Commit-ID: 36Pe3bbUKmk
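For readers unfamiliar with the pattern, here is a minimal standalone sketch of the submit-plus-done-callback approach the patch adopts. It is illustrative only: the square() job, the collect() callback and the result handling are hypothetical stand-ins, not mozlint code, and on Python 2 it needs the 'futures' backport of concurrent.futures.

    from concurrent.futures import ProcessPoolExecutor


    def square(n):
        # Stand-in for the per-job worker function; must be defined at module
        # level so child processes can import (and unpickle) it.
        return n * n


    def main():
        results = []

        def collect(future):
            # Done-callbacks run in the parent process once a future completes.
            if future.cancelled():
                return
            results.append(future.result())

        executor = ProcessPoolExecutor(max_workers=4)
        for n in range(8):
            executor.submit(square, n).add_done_callback(collect)
        executor.shutdown()  # blocks until all submitted jobs have finished
        print(sorted(results))


    if __name__ == '__main__':
        main()

Collecting through a callback rather than looping over each future.result() lets the parent handle every job as it completes, and a single shutdown() call becomes the one blocking point; that is roughly the shape the new roll() implementation below takes.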
python/mozlint/mozlint/cli.py
python/mozlint/mozlint/roller.py
python/mozlint/test/test_cli.py
python/mozlint/test/test_roller.py
--- a/python/mozlint/mozlint/cli.py
+++ b/python/mozlint/mozlint/cli.py
@@ -170,17 +170,17 @@ def run(paths, linters, fmt, outgoing, w
     if edit and results:
         edit_results(results)
         results = lint.roll(results.keys())
 
     formatter = formatters.get(fmt)
 
     # Encode output with 'replace' to avoid UnicodeEncodeErrors on
     # environments that aren't using utf-8.
-    out = formatter(results, failed=lint.failed).encode(
+    out = formatter(results, failed=lint.failed | lint.failed_setup).encode(
                     sys.stdout.encoding or 'ascii', 'replace')
     if out:
         print(out)
     return 1 if results or lint.failed else 0
 
 
 if __name__ == '__main__':
     parser = MozlintParser()
--- a/python/mozlint/mozlint/roller.py
+++ b/python/mozlint/mozlint/roller.py
@@ -17,44 +17,38 @@ from subprocess import CalledProcessErro
 from mozversioncontrol import get_repository_object, MissingUpstreamRepo, InvalidRepoPath
 
 from .errors import LintersNotConfigured
 from .parser import Parser
 from .pathutils import findobject
 from .types import supported_types
 
 
-def _run_linters(config, paths, **lintargs):
+def _run_worker(config, paths, **lintargs):
     results = defaultdict(list)
     failed = []
 
     func = supported_types[config['type']]
-    res = func(paths, config, **lintargs) or []
+    try:
+        res = func(paths, config, **lintargs) or []
+    except Exception:
+        traceback.print_exc()
+        res = 1
+    finally:
+        sys.stdout.flush()
 
     if not isinstance(res, (list, tuple)):
         if res:
             failed.append(config['name'])
     else:
         for r in res:
             results[r.path].append(r)
     return results, failed
 
 
-def _run_worker(*args, **kwargs):
-    try:
-        return _run_linters(*args, **kwargs)
-    except Exception:
-        # multiprocessing seems to munge worker exceptions, print
-        # it here so it isn't lost.
-        traceback.print_exc()
-        raise
-    finally:
-        sys.stdout.flush()
-
-
 class LintRoller(object):
     """Registers and runs linters.
 
     :param root: Path to which relative paths will be joined. If
                  unspecified, root will either be determined from
                  version control or cwd.
     :param lintargs: Arguments to pass to the underlying linter(s).
     """
@@ -66,18 +60,21 @@ class LintRoller(object):
             self.vcs = get_repository_object(root)
         except InvalidRepoPath:
             self.vcs = None
 
         self.linters = []
         self.lintargs = lintargs
         self.lintargs['root'] = root
 
-        # linters that return non-zero
-        self.failed = set()
+        # result state
+        self.failed = None
+        self.failed_setup = None
+        self.results = None
+
         self.root = root
 
     def read(self, paths):
         """Parse one or more linters and add them to the registry.
 
         :param paths: A path or iterable of paths to linter definitions.
         """
         if isinstance(paths, basestring):
@@ -86,58 +83,72 @@ class LintRoller(object):
         for path in paths:
             self.linters.extend(self.parse(path))
 
     def setup(self):
         """Run setup for applicable linters"""
         if not self.linters:
             raise LintersNotConfigured
 
-        failed = set()
+        self.failed_setup = set()
         for linter in self.linters:
             if 'setup' not in linter:
                 continue
 
             try:
                 res = findobject(linter['setup'])(self.root)
             except Exception:
                 traceback.print_exc()
                 res = 1
 
             if res:
-                failed.add(linter['name'])
+                self.failed_setup.add(linter['name'])
 
-        if failed:
-            print("error: problem with lint setup, skipping {}".format(', '.join(sorted(failed))))
-            self.linters = [l for l in self.linters if l['name'] not in failed]
-            self.failed.update(failed)
+        if self.failed_setup:
+            print("error: problem with lint setup, skipping {}".format(
+                    ', '.join(sorted(self.failed_setup))))
+            self.linters = [l for l in self.linters if l['name'] not in self.failed_setup]
             return 1
         return 0
 
     def _generate_jobs(self, paths, num_procs):
         """A job is of the form (<linter:dict>, <paths:list>)."""
         chunk_size = min(self.MAX_PATHS_PER_JOB, int(ceil(float(len(paths)) / num_procs)))
         while paths:
             for linter in self.linters:
                 yield linter, paths[:chunk_size]
             paths = paths[chunk_size:]
 
+    def _collect_results(self, future):
+        if future.cancelled():
+            return
+
+        results, failed = future.result()
+        if failed:
+            self.failed.update(set(failed))
+        for k, v in results.iteritems():
+            self.results[k].extend(v)
+
     def roll(self, paths=None, outgoing=None, workdir=None, num_procs=None):
         """Run all of the registered linters against the specified file paths.
 
         :param paths: An iterable of files and/or directories to lint.
         :param outgoing: Lint files touched by commits that are not on the remote repository.
         :param workdir: Lint all files touched in the working directory.
         :param num_procs: The number of processes to use. Default: cpu count
         :return: A dictionary with file names as the key, and a list of
                  :class:`~result.ResultContainer`s as the value.
         """
         if not self.linters:
             raise LintersNotConfigured
 
+        # reset result state
+        self.results = defaultdict(list)
+        self.failed = set()
+
         # Need to use a set in case vcs operations specify the same file
         # more than once.
         paths = paths or set()
         if isinstance(paths, basestring):
             paths = set([paths])
         elif isinstance(paths, (list, tuple)):
             paths = set(paths)
 
@@ -165,24 +176,27 @@ class LintRoller(object):
 
         paths = paths or ['.']
 
         # This will convert paths back to a list, but that's ok since
         # we're done adding to it.
         paths = map(os.path.abspath, paths)
 
         num_procs = num_procs or cpu_count()
-        all_results = defaultdict(list)
-        with ProcessPoolExecutor(num_procs) as executor:
-            futures = [executor.submit(_run_worker, config, p, **self.lintargs)
-                       for config, p in self._generate_jobs(paths, num_procs)]
-            # ignore SIGINT in parent so we can still get partial results
-            # from child processes. These should shutdown quickly anyway.
-            orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
-            for future in futures:
-                results, failed = future.result()
-                if failed:
-                    self.failed.update(set(failed))
-                for k, v in results.iteritems():
-                    all_results[k].extend(v)
+        jobs = list(self._generate_jobs(paths, num_procs))
+
+        # Make sure we never spawn more processes than we have jobs.
+        num_procs = min(len(jobs), num_procs)
+
+        executor = ProcessPoolExecutor(num_procs)
 
+        # Submit jobs to the worker pool. The _collect_results method will be
+        # called when a job is finished.
+        for job in jobs:
+            future = executor.submit(_run_worker, *job, **self.lintargs)
+            future.add_done_callback(self._collect_results)
+
+        # Ignore SIGINT in parent so we can still get partial results
+        # from child processes. These should shut down quickly anyway.
+        orig_sigint = signal.signal(signal.SIGINT, signal.SIG_IGN)
+        executor.shutdown()  # blocks until all workers have finished
         signal.signal(signal.SIGINT, orig_sigint)
-        return all_results
+        return self.results
--- a/python/mozlint/test/test_cli.py
+++ b/python/mozlint/test/test_cli.py
@@ -44,20 +44,21 @@ def test_cli_run_with_fix(run, capfd):
 @pytest.mark.skipif(not find_executable("echo"), reason="No `echo` executable found.")
 def test_cli_run_with_edit(run, parser, capfd):
     os.environ['EDITOR'] = 'echo'
 
     ret = run(['-f', 'compact', '--edit', '--linter', 'external'])
     out, err = capfd.readouterr()
     out = out.splitlines()
     assert ret == 1
-    assert len(out) == 5
     assert out[0].endswith('foobar.js')  # from the `echo` editor
     assert "foobar.js: line 1, col 1, Error" in out[1]
     assert "foobar.js: line 2, col 1, Error" in out[2]
+    assert "2 problems" in out[-1]
+    assert len(out) == 5
 
     del os.environ['EDITOR']
     with pytest.raises(SystemExit):
         parser.parse_args(['--edit'])
 
 
 def test_cli_run_with_setup(run, capfd):
     # implicitly call setup
--- a/python/mozlint/test/test_roller.py
+++ b/python/mozlint/test/test_roller.py
@@ -7,58 +7,57 @@ from __future__ import absolute_import
 import os
 import platform
 import sys
 
 import mozunit
 import pytest
 
 from mozlint import ResultContainer
-from mozlint.errors import LintersNotConfigured, LintException
+from mozlint.errors import LintersNotConfigured
 
 
 here = os.path.abspath(os.path.dirname(__file__))
 
 
 linters = ('string.yml', 'regex.yml', 'external.yml')
 
 
 def test_roll_no_linters_configured(lint, files):
     with pytest.raises(LintersNotConfigured):
         lint.roll(files)
 
 
 def test_roll_successful(lint, linters, files):
     lint.read(linters)
 
+    assert lint.results is None
     result = lint.roll(files)
     assert len(result) == 1
+    assert lint.results == result
     assert lint.failed == set([])
 
     path = result.keys()[0]
     assert os.path.basename(path) == 'foobar.js'
 
     errors = result[path]
     assert isinstance(errors, list)
     assert len(errors) == 6
 
     container = errors[0]
     assert isinstance(container, ResultContainer)
     assert container.rule == 'no-foobar'
 
 
-def test_roll_catch_exception(lint, lintdir, files):
+def test_roll_catch_exception(lint, lintdir, files, capfd):
     lint.read(os.path.join(lintdir, 'raises.yml'))
 
-    # suppress printed traceback from test output
-    old_stderr = sys.stderr
-    sys.stderr = open(os.devnull, 'w')
-    with pytest.raises(LintException):
-        lint.roll(files)
-    sys.stderr = old_stderr
+    lint.roll(files)  # assert not raises
+    out, err = capfd.readouterr()
+    assert 'LintException' in err
 
 
 def test_roll_with_excluded_path(lint, linters, files):
     lint.lintargs.update({'exclude': ['**/foobar.js']})
 
     lint.read(linters)
     result = lint.roll(files)
 
@@ -71,46 +70,46 @@ def test_roll_with_invalid_extension(lin
     result = lint.roll(os.path.join(filedir, 'foobar.py'))
     assert len(result) == 0
     assert lint.failed == set([])
 
 
 def test_roll_with_failure_code(lint, lintdir, files):
     lint.read(os.path.join(lintdir, 'badreturncode.yml'))
 
-    assert lint.failed == set([])
+    assert lint.failed is None
     result = lint.roll(files, num_procs=1)
     assert len(result) == 0
     assert lint.failed == set(['BadReturnCodeLinter'])
 
 
-def fake_run_linters(config, paths, **lintargs):
+def fake_run_worker(config, paths, **lintargs):
     return {'count': [1]}, []
 
 
 @pytest.mark.skipif(platform.system() == 'Windows',
                     reason="monkeypatch issues with multiprocessing on Windows")
 @pytest.mark.parametrize('num_procs', [1, 4, 8, 16])
 def test_number_of_jobs(monkeypatch, lint, linters, files, num_procs):
-    monkeypatch.setattr(sys.modules[lint.__module__], '_run_linters', fake_run_linters)
+    monkeypatch.setattr(sys.modules[lint.__module__], '_run_worker', fake_run_worker)
 
     lint.read(linters)
     num_jobs = len(lint.roll(files, num_procs=num_procs)['count'])
 
     if len(files) >= num_procs:
         assert num_jobs == num_procs * len(linters)
     else:
         assert num_jobs == len(files) * len(linters)
 
 
 @pytest.mark.skipif(platform.system() == 'Windows',
                     reason="monkeypatch issues with multiprocessing on Windows")
 @pytest.mark.parametrize('max_paths,expected_jobs', [(1, 12), (4, 6), (16, 6)])
 def test_max_paths_per_job(monkeypatch, lint, linters, files, max_paths, expected_jobs):
-    monkeypatch.setattr(sys.modules[lint.__module__], '_run_linters', fake_run_linters)
+    monkeypatch.setattr(sys.modules[lint.__module__], '_run_worker', fake_run_worker)
 
     files = files[:4]
     assert len(files) == 4
 
     linters = linters[:3]
     assert len(linters) == 3
 
     lint.MAX_PATHS_PER_JOB = max_paths
@@ -128,13 +127,13 @@ def test_setup(lint, linters, filedir, c
 
     lint.read(linters)
     lint.setup()
     out, err = capfd.readouterr()
     assert 'setup passed' in out
     assert 'setup failed' in out
     assert 'setup raised' in out
     assert 'error: problem with lint setup, skipping' in out
-    assert lint.failed == set(['SetupFailedLinter', 'SetupRaisedLinter'])
+    assert lint.failed_setup == set(['SetupFailedLinter', 'SetupRaisedLinter'])
 
 
 if __name__ == '__main__':
     mozunit.main()