Bug 1629642 - [taskgraph] Implement a 'disperse' platform optimization strategy r=marco
author Andrew Halberstadt <ahalberstadt@mozilla.com>
Tue, 28 Apr 2020 15:05:31 +0000
changeset 526534 c5df25796d9bda24a9807a78ce0da43c84c6d2fc
parent 526533 0414f57713e890be63db69b7f8766d80e61ce886
child 526535 a34695d9b99d1e9098e846b751c9adf1f52ee760
push id 114330
push user ahalberstadt@mozilla.com
push date Tue, 28 Apr 2020 17:25:33 +0000
treeherder autoland@a34695d9b99d
reviewers marco
bugs 1629642
milestone 77.0a1
Differential Revision: https://phabricator.services.mozilla.com/D72465
taskcluster/taskgraph/optimize/__init__.py
taskcluster/taskgraph/optimize/bugbug.py
taskcluster/taskgraph/test/test_optimize_strategies.py
taskcluster/taskgraph/transforms/tests.py
taskcluster/taskgraph/util/schema.py
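
The pattern this patch introduces, in miniature: a composite strategy hands its substrategies the same mutable 'importance' dict for a given task, so the bugbug substrategy can record per-manifest importance and the new disperse substrategy can read it while keeping its own cross-task counters. The sketch below is illustrative only; BugBugLike, DisperseLike and optimize() are made-up stand-ins rather than taskgraph APIs, and it assumes the composite removes a task as soon as any substrategy votes to remove it.

    # Illustrative sketch only, not taskgraph code.
    from collections import defaultdict


    class BugBugLike(object):
        """Stand-in for BugBugPushSchedules: records importance, never removes here."""

        def should_remove_task(self, task, params, importance):
            for manifest, confidence in task['groups'].items():
                importance[manifest] = 'high' if confidence >= 0.9 else 'low'
            return False


    class DisperseLike(object):
        """Stand-in for DisperseGroups: keep a task until each manifest hits its target."""

        TARGETS = {'high': 3, 'low': 1}

        def __init__(self):
            self.count = defaultdict(int)

        def should_remove_task(self, task, params, importance):
            for manifest in task['groups']:
                if self.count[manifest] < self.TARGETS.get(importance.get(manifest), 0):
                    self.count[manifest] += 1
                    return False
            return True


    def optimize(tasks, substrategies):
        # Remove a task if any substrategy votes to remove it (assumed 'Any' semantics).
        kept = []
        for task in tasks:
            importance = {}  # one shared dict per task, mirroring (schedules, {}) below
            if not any(s.should_remove_task(task, {}, importance) for s in substrategies):
                kept.append(task['label'])
        return kept


    tasks = [
        {'label': 'linux-opt', 'groups': {'bar/test.ini': 0.95}},
        {'label': 'win-debug', 'groups': {'bar/test.ini': 0.95}},
    ]
    print(optimize(tasks, [BugBugLike(), DisperseLike()]))  # both kept: 'high' targets 3
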
--- a/taskcluster/taskgraph/optimize/__init__.py
+++ b/taskcluster/taskgraph/optimize/__init__.py
@@ -420,16 +420,25 @@ class experimental(object):
         'test': Any(
             'skip-unless-schedules',
             Any('bugbug', 'platform-debug'),
             split_args=tuple
         ),
     }
     """Restricts tests to debug platforms."""
 
+    bugbug_disperse = {
+        'test': Any(
+            'skip-unless-schedules',
+            Any('bugbug', 'platform-disperse'),
+            split_args=tuple
+        ),
+    }
+    """Disperse tests across platforms, medium confidence threshold."""
+
     bugbug_reduced = {
         'test': Any('skip-unless-schedules', 'bugbug-reduced', split_args=tuple),
     }
     """Use the reduced set of tasks (and no groups) chosen by bugbug."""
 
     bugbug_reduced_high = {
         'test': Any('skip-unless-schedules', 'bugbug-reduced-high', split_args=tuple),
     }
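
split_args=tuple is what connects this alias to the (schedules, {}) tuple the test transform attaches to each task (see the transforms/tests.py hunk further down): the composite appears to hand the first tuple element to 'skip-unless-schedules' and the second, the mutable dict, to the nested bugbug/platform-disperse pair, which share it as-is. A toy illustration of that splitting, with made-up names:

    # Toy illustration, not taskgraph code: with split_args=tuple the per-task
    # arg is unpacked positionally across the substrategies.
    def split_across(substrategy_names, arg, split_args=tuple):
        """Pair each substrategy with its slice of the composite arg."""
        return list(zip(substrategy_names, split_args(arg)))


    schedules = ['gtest', 'mochitest']
    importance = {}  # filled in by 'bugbug', read by 'platform-disperse'

    print(split_across(['skip-unless-schedules', "Any('bugbug', 'platform-disperse')"],
                       (schedules, importance)))
    # [('skip-unless-schedules', ['gtest', 'mochitest']),
    #  ("Any('bugbug', 'platform-disperse')", {})]
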
--- a/taskcluster/taskgraph/optimize/bugbug.py
+++ b/taskcluster/taskgraph/optimize/bugbug.py
@@ -2,16 +2,17 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import logging
 import time
+from collections import defaultdict
 
 import requests
 from mozbuild.util import memoize, memoized_property
 from six.moves.urllib.parse import urlsplit
 
 from taskgraph.optimize import register_strategy, OptimizationStrategy
 from taskgraph.util.taskcluster import requests_retry_session
 
@@ -22,24 +23,16 @@ CT_LOW = 0.5
 CT_MEDIUM = 0.7
 CT_HIGH = 0.9
 
 
 class BugbugTimeoutException(Exception):
     pass
 
 
-@register_strategy("platform-debug")
-class SkipUnlessDebug(OptimizationStrategy):
-    """Only run debug platforms."""
-
-    def should_remove_task(self, task, params, arg):
-        return not (task.attributes.get('build_type') == "debug")
-
-
 @register_strategy("bugbug", args=(CT_MEDIUM,))
 @register_strategy("bugbug-combined-high", args=(CT_HIGH, False, True))
 @register_strategy("bugbug-low", args=(CT_LOW,))
 @register_strategy("bugbug-high", args=(CT_HIGH,))
 @register_strategy("bugbug-reduced", args=(CT_MEDIUM, True))
 @register_strategy("bugbug-reduced-high", args=(CT_HIGH, True))
 class BugBugPushSchedules(OptimizationStrategy):
     """Query the 'bugbug' service to retrieve relevant tasks and manifests.
@@ -88,49 +81,153 @@ class BugBugPushSchedules(OptimizationSt
         logger.debug("Bugbug scheduler service returns:\n{}".format(
                      json.dumps(data, indent=2)))
 
         if r.status_code == 202:
             raise BugbugTimeoutException("Timed out waiting for result from '{}'".format(url))
 
         return data
 
-    def should_remove_task(self, task, params, arg):
+    def should_remove_task(self, task, params, importance):
         branch = urlsplit(params['head_repository']).path.strip('/')
         rev = params['head_rev']
         data = self.run_query('/push/{branch}/{rev}/schedules'.format(branch=branch, rev=rev))
 
-        if not self.use_reduced_tasks:
-            tasks = set(
-                task
-                for task, confidence in data.get('tasks', {}).items()
-                if confidence >= self.confidence_threshold
-            )
-        else:
-            tasks = set(
-                task
-                for task, confidence in data.get('reduced_tasks', {}).items()
-                if confidence >= self.confidence_threshold
-            )
+        key = "reduced_tasks" if self.use_reduced_tasks else "tasks"
+        tasks = set(
+            task
+            for task, confidence in data.get(key, {}).items()
+            if confidence >= self.confidence_threshold
+        )
 
         test_manifests = task.attributes.get('test_manifests')
         if test_manifests is None or self.use_reduced_tasks:
             if task.label not in tasks:
                 return True
 
             return False
 
         # If a task contains more than one group, figure out which confidence
         # threshold to use. If 'self.combine_weights' is set, add up all
         # confidence thresholds. Otherwise just use the max.
         task_confidence = 0
-        for group, confidence in data.get("groups", {}).items():
+        groups = data.get("groups", {})
+        for group, confidence in groups.items():
             if group not in test_manifests:
                 continue
 
             if self.combine_weights:
                 task_confidence = round(
                     task_confidence + confidence - task_confidence * confidence, 2
                 )
             else:
                 task_confidence = max(task_confidence, confidence)
 
-        return task_confidence < self.confidence_threshold
+        if task_confidence < self.confidence_threshold:
+            return True
+
+        # Store group importance so future optimizers can access it.
+        for manifest in test_manifests:
+            if manifest not in groups:
+                continue
+
+            confidence = groups[manifest]
+            if confidence >= CT_HIGH:
+                importance[manifest] = 'high'
+            elif confidence >= CT_MEDIUM:
+                importance[manifest] = 'medium'
+            elif confidence >= CT_LOW:
+                importance[manifest] = 'low'
+            else:
+                importance[manifest] = 'lowest'
+
+        return False
+
+
+@register_strategy("platform-debug")
+class SkipUnlessDebug(OptimizationStrategy):
+    """Only run debug platforms."""
+
+    def should_remove_task(self, task, params, arg):
+        return not (task.attributes.get('build_type') == "debug")
+
+
+@register_strategy("platform-disperse")
+class DisperseGroups(OptimizationStrategy):
+    """Disperse groups across test configs.
+
+    Each task has an associated 'importance' dict passed in via the arg. This
+    is of the form `{<group>: <importance>}`.
+
+    Here, 'group' is a test group id (usually a path to a manifest) and 'importance' is
+    one of `{'lowest', 'low', 'medium', 'high'}`.
+
+    Each importance value has an associated 'count' as defined in
+    `self.target_counts`. The strategy guarantees that each manifest will run
+    in at least 'count' different configurations (assuming there are enough
+    tasks containing that manifest).
+
+    On configurations that haven't been seen before, we'll increase the target
+    count by `self.unseen_modifier` to increase the likelihood of scheduling a
+    task on that configuration.
+
+    Args:
+        target_counts (dict): Override DEFAULT_TARGET_COUNTS with custom counts. This
+            is a dict mapping the importance value ('lowest', 'low', etc) to the
+            minimum number of configurations manifests with this value should run
+            on.
+
+        unseen_modifier (int): Override DEFAULT_UNSEEN_MODIFIER to a custom
+            value. This is the amount we'll increase 'target_count' by for unseen
+            configurations.
+    """
+
+    DEFAULT_TARGET_COUNTS = {
+        'high': 3,
+        'medium': 2,
+        'low': 1,
+        'lowest': 0,
+    }
+    DEFAULT_UNSEEN_MODIFIER = 1
+
+    def __init__(self, target_counts=None, unseen_modifier=DEFAULT_UNSEEN_MODIFIER):
+        self.target_counts = dict(self.DEFAULT_TARGET_COUNTS)  # copy so updates don't mutate the class default
+        if target_counts:
+            self.target_counts.update(target_counts)
+        self.unseen_modifier = unseen_modifier
+
+        self.count = defaultdict(int)
+        self.seen_configurations = set()
+
+    def should_remove_task(self, task, params, importance):
+        test_manifests = task.attributes.get("test_manifests")
+        test_platform = task.attributes.get("test_platform")
+
+        if not importance or not test_manifests or not test_platform:
+            return False
+
+        # Build the test configuration key.
+        key = test_platform
+        if 'unittest_variant' in task.attributes:
+            key += "-" + task.attributes['unittest_variant']
+
+        if not task.attributes["e10s"]:
+            key += "-1proc"
+
+        important_manifests = set(test_manifests) & set(importance)
+        for manifest in important_manifests:
+            target_count = self.target_counts[importance[manifest]]
+
+            # If this configuration hasn't been seen before, increase the
+            # likelihood of scheduling the task.
+            if key not in self.seen_configurations:
+                target_count += self.unseen_modifier
+
+            if self.count[manifest] < target_count:
+                # Update manifest counts and seen configurations.
+                self.seen_configurations.add(key)
+                for manifest in important_manifests:
+                    self.count[manifest] += 1
+                return False
+
+        # Should remove task because all manifests have reached their
+        # importance count (or there were no important manifests).
+        return True
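
Worth spelling out from the hunk above: with combine_weights set, per-group confidences are folded with a probabilistic OR (c = a + b - a*b, rounded to two decimals) rather than summed or maxed, and it is the resulting CT_LOW/CT_MEDIUM/CT_HIGH buckets that DisperseGroups later consumes. A quick standalone check of the folding:

    # Quick check of the combine_weights folding used above (probabilistic OR).
    task_confidence = 0
    for confidence in (0.6, 0.7):
        task_confidence = round(task_confidence + confidence - task_confidence * confidence, 2)

    print(task_confidence)  # 0.88 -- higher than either group alone, still below 1.0
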
--- a/taskcluster/taskgraph/test/test_optimize_strategies.py
+++ b/taskcluster/taskgraph/test/test_optimize_strategies.py
@@ -5,79 +5,139 @@ from __future__ import absolute_import
 
 import time
 
 import pytest
 from datetime import datetime
 from mozunit import main
 from time import mktime
 
-from taskgraph.optimize import registry
 from taskgraph.optimize.backstop import Backstop
-from taskgraph.optimize.bugbug import BugBugPushSchedules, BugbugTimeoutException
+from taskgraph.optimize.bugbug import (
+    BugBugPushSchedules,
+    BugbugTimeoutException,
+    DisperseGroups,
+    SkipUnlessDebug,
+)
 from taskgraph.task import Task
 
 
 @pytest.fixture(scope='module')
 def params():
     return {
         'branch': 'integration/autoland',
         'head_repository': 'https://hg.mozilla.org/integration/autoland',
         'head_rev': 'abcdef',
         'project': 'autoland',
         'pushlog_id': 1,
         'pushdate': mktime(datetime.now().timetuple()),
     }
 
 
-@pytest.fixture(scope='module')
-def generate_tasks():
+def generate_tasks(*tasks):
+    for i, task in enumerate(tasks):
+        task.setdefault('label', 'task-{}'.format(i))
+        task.setdefault('kind', 'test')
+        task.setdefault('task', {})
+        task.setdefault('attributes', {})
+        task['attributes'].setdefault('e10s', True)
 
-    def inner(*tasks):
-        for i, task in enumerate(tasks):
-            task.setdefault('label', 'task-{}'.format(i))
-            task.setdefault('kind', 'test')
-            task.setdefault('attributes', {})
-            task.setdefault('task', {})
+        for attr in ('optimization', 'dependencies', 'soft_dependencies', 'release_artifacts'):
+            task.setdefault(attr, None)
 
-            for attr in ('optimization', 'dependencies', 'soft_dependencies', 'release_artifacts'):
-                task.setdefault(attr, None)
-
-            task['task'].setdefault('label', task['label'])
-            yield Task.from_json(task)
-
-    return inner
+        task['task'].setdefault('label', task['label'])
+        yield Task.from_json(task)
 
 
-@pytest.fixture(scope='module')
-def tasks(generate_tasks):
-    return list(generate_tasks(
-        {'attributes': {'test_manifests': ['foo/test.ini', 'bar/test.ini']}},
-        {'attributes': {'test_manifests': ['bar/test.ini'], 'build_type': 'debug'}},
-        {'attributes': {'build_type': 'debug'}},
-        {'attributes': {'test_manifests': []}},
-        {'attributes': {'build_type': 'opt'}},
-    ))
+# task sets
+
+default_tasks = list(generate_tasks(
+    {'attributes': {'test_manifests': ['foo/test.ini', 'bar/test.ini']}},
+    {'attributes': {'test_manifests': ['bar/test.ini'], 'build_type': 'debug'}},
+    {'attributes': {'build_type': 'debug'}},
+    {'attributes': {'test_manifests': []}},
+    {'attributes': {'build_type': 'opt'}},
+))
+
+
+disperse_tasks = list(generate_tasks(
+    {'attributes': {
+        'test_manifests': ['foo/test.ini', 'bar/test.ini'],
+        'test_platform': 'linux/opt',
+    }},
+    {'attributes': {
+        'test_manifests': ['bar/test.ini'],
+        'test_platform': 'linux/opt',
+    }},
+    {'attributes': {
+        'test_manifests': ['bar/test.ini'],
+        'test_platform': 'windows/debug',
+    }},
+    {'attributes': {
+        'test_manifests': ['bar/test.ini'],
+        'test_platform': 'linux/opt',
+        'unittest_variant': 'fission',
+    }},
+    {'attributes': {
+        'e10s': False,
+        'test_manifests': ['bar/test.ini'],
+        'test_platform': 'linux/opt',
+    }},
+))
 
 
 def idfn(param):
     if isinstance(param, tuple):
         return param[0].__name__
     return None
 
 
-@pytest.mark.parametrize("strategy,expected", [
+@pytest.mark.parametrize("opt,tasks,arg,expected", [
+    # debug
     pytest.param(
-        'platform-debug',
+        SkipUnlessDebug(),
+        default_tasks,
+        None,
         ['task-1', 'task-2'],
     ),
+
+    # disperse with no supplied importance
+    pytest.param(
+        DisperseGroups(),
+        disperse_tasks,
+        None,
+        [t.label for t in disperse_tasks],
+    ),
+
+    # disperse with low importance
+    pytest.param(
+        DisperseGroups(),
+        disperse_tasks,
+        {'bar/test.ini': 'low'},
+        ['task-0', 'task-2'],
+    ),
+
+    # disperse with medium importance
+    pytest.param(
+        DisperseGroups(),
+        disperse_tasks,
+        {'bar/test.ini': 'medium'},
+        ['task-0', 'task-1', 'task-2'],
+    ),
+
+    # disperse with high importance
+    pytest.param(
+        DisperseGroups(),
+        disperse_tasks,
+        {'bar/test.ini': 'high'},
+        ['task-0', 'task-1', 'task-2', 'task-3'],
+    ),
 ], ids=idfn)
-def test_optimization_strategy(responses, params, tasks, strategy, expected):
-    opt = registry[strategy]
-    labels = [t.label for t in tasks if not opt.should_remove_task(t, params, None)]
+def test_optimization_strategy(responses, params, opt, tasks, arg, expected):
+    labels = [t.label for t in tasks if not opt.should_remove_task(t, params, arg)]
     assert sorted(labels) == sorted(expected)
 
 
 @pytest.mark.parametrize("args,data,expected", [
     # empty
     pytest.param(
         (0.1,),
         {},
@@ -139,71 +199,71 @@ def test_optimization_strategy(responses
             'tasks': {'task-2': 0.7, 'task-4': 0.7},
             'reduced_tasks': {'task-4': 0.7},
             'groups': {'foo/test.ini': 0.75, 'bar/test.ini': 0.25}
         },
         ['task-4'],
     ),
 
 ], ids=idfn)
-def test_bugbug_push_schedules(responses, params, tasks, args, data, expected):
+def test_bugbug_push_schedules(responses, params, args, data, expected):
     query = "/push/{branch}/{head_rev}/schedules".format(**params)
     url = BugBugPushSchedules.BUGBUG_BASE_URL + query
 
     responses.add(
         responses.GET,
         url,
         json=data,
         status=200,
     )
 
     opt = BugBugPushSchedules(*args)
-    labels = [t.label for t in tasks if not opt.should_remove_task(t, params, None)]
+    labels = [t.label for t in default_tasks if not opt.should_remove_task(t, params, {})]
     assert sorted(labels) == sorted(expected)
 
 
-def test_bugbug_timeout(monkeypatch, responses, params, tasks):
+def test_bugbug_timeout(monkeypatch, responses, params):
     query = "/push/{branch}/{head_rev}/schedules".format(**params)
     url = BugBugPushSchedules.BUGBUG_BASE_URL + query
     responses.add(
         responses.GET,
         url,
         json={"ready": False},
         status=202,
     )
 
     # Make sure the test runs fast.
     monkeypatch.setattr(time, 'sleep', lambda i: None)
 
     opt = BugBugPushSchedules(0.5)
     with pytest.raises(BugbugTimeoutException):
-        opt.should_remove_task(tasks[0], params, None)
+        opt.should_remove_task(default_tasks[0], params, None)
 
 
-def test_backstop(params, tasks):
-    all_labels = {t.label for t in tasks}
+def test_backstop(params):
+    all_labels = {t.label for t in default_tasks}
     opt = Backstop(10, 60)  # every 10th push or 1 hour
 
     # If there's no previous push date, run tasks
     params['pushlog_id'] = 8
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    scheduled = {t.label for t in default_tasks if not opt.should_remove_task(t, params, None)}
     assert scheduled == all_labels
 
     # Only multiples of 10 schedule tasks. Pushdate from push 8 was cached.
     params['pushlog_id'] = 9
     params['pushdate'] += 3599
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    scheduled = {t.label for t in default_tasks if not opt.should_remove_task(t, params, None)}
     assert scheduled == set()
 
     params['pushlog_id'] = 10
     params['pushdate'] += 1
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    scheduled = {t.label for t in default_tasks if not opt.should_remove_task(t, params, None)}
     assert scheduled == all_labels
 
     # Tasks are also scheduled if an hour has passed.
     params['pushlog_id'] = 11
     params['pushdate'] += 3600
-    scheduled = {t.label for t in tasks if not opt.should_remove_task(t, params, None)}
+    scheduled = {t.label for t in default_tasks if not opt.should_remove_task(t, params, None)}
     assert scheduled == all_labels
 
 
 if __name__ == '__main__':
     main()
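
Why the {'bar/test.ini': 'low'} case above expects only task-0 and task-2: 'low' targets one configuration, but the unseen-configuration modifier bumps the target to two for each configuration not yet seen, so the first linux/opt task and the first windows/debug task are kept, after which bar/test.ini has reached its count. Tracing DisperseGroups() over disperse_tasks:

    task-0  linux/opt          unseen -> target 2, count 0 < 2  -> keep  (count becomes 1)
    task-1  linux/opt          seen   -> target 1, count 1 == 1 -> remove
    task-2  windows/debug      unseen -> target 2, count 1 < 2  -> keep  (count becomes 2)
    task-3  linux/opt-fission  unseen -> target 2, count 2 == 2 -> remove
    task-4  linux/opt-1proc    unseen -> target 2, count 2 == 2 -> remove

To run these tests locally from a mozilla-central checkout, something like `./mach python-test taskcluster/taskgraph/test/test_optimize_strategies.py` should work (mozunit's main() hook also lets the file be run directly under pytest), though the exact invocation may have drifted since this push.
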
--- a/taskcluster/taskgraph/transforms/tests.py
+++ b/taskcluster/taskgraph/transforms/tests.py
@@ -1589,18 +1589,20 @@ def make_job_description(config, tests):
             jobdesc['optimization'] = test['optimization']
         # Pushes generated by `mach try auto` should use the non-try optimizations.
         elif config.params.is_try() and config.params['try_mode'] != 'try_auto':
             jobdesc['optimization'] = {'test-try': schedules}
         elif category in INCLUSIVE_COMPONENTS:
             jobdesc['optimization'] = {'test-inclusive': schedules}
         else:
             # First arg goes to 'skip-unless-schedules', second goes to the
-            # main test strategy.
-            jobdesc['optimization'] = {'test': (schedules, None)}
+            # main test strategy. Using an empty dict allows earlier
+            # substrategies (of a CompositeStrategy) to pass values by reference
+            # to later substrategies.
+            jobdesc['optimization'] = {'test': (schedules, {})}
 
         run = jobdesc['run'] = {}
         run['using'] = 'mozharness-test'
         run['test'] = test
 
         if 'workdir' in test:
             run['workdir'] = test.pop('workdir')
 
--- a/taskcluster/taskgraph/util/schema.py
+++ b/taskcluster/taskgraph/util/schema.py
@@ -209,17 +209,17 @@ OptimizationSchema = voluptuous.Any(
     {'push-interval-25': list(schedules.ALL_COMPONENTS)},
     # consult SETA and skip this task if it is low-value
     {'seta': None},
     # skip this task if none of the given file patterns match
     {'skip-unless-changed': [text_type]},
 # skip this task unless the change files' SCHEDULES contains any of these components
     {'skip-unless-schedules': list(schedules.ALL_COMPONENTS)},
     # optimize strategy aliases for the test kind
-    {'test': (list(schedules.ALL_COMPONENTS), None)},
+    {'test': (list(schedules.ALL_COMPONENTS), dict)},
     {'test-inclusive': list(schedules.ALL_COMPONENTS)},
     {'test-try': list(schedules.ALL_COMPONENTS)},
 )
 
 # shortcut for a string where task references are allowed
 taskref_or_string = voluptuous.Any(
     text_type,
     {voluptuous.Required('task-reference'): text_type},
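
The schema change above swaps None for dict in the second slot of the 'test' entry, matching the transform now passing {} instead of None. A rough, hand-rolled sketch of the intent (deliberately not reproducing voluptuous's actual tuple semantics):

    # Hand-rolled approximation of the tightened 'test' optimization entry.
    def valid_test_optimization(value, all_components=('gtest', 'mochitest')):
        schedules, channel = value                      # still a 2-tuple
        return (all(component in all_components for component in schedules)
                and isinstance(channel, dict))          # this slot used to be None

    assert valid_test_optimization((['gtest'], {}))        # new shape from transforms/tests.py
    assert not valid_test_optimization((['gtest'], None))  # old shape no longer validates
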