Bug 1281062 - Create Action Tasks to schedule new jobs. r=dustin
authorKalpesh Krishna <kalpeshk2011@gmail.com>
Mon, 11 Jul 2016 22:43:58 +0530
changeset 304887 d223b3cdee6645c6c062d3b53027a1ec29a76f20
parent 304886 11b0b0c476095b24faa650eb0dcb387dd1f61c3d
child 304888 556c8daaf008275ed6af30a2bd9b09f20181bcdf
push id30445
push usercbook@mozilla.com
push dateThu, 14 Jul 2016 09:43:22 +0000
treeherdermozilla-central@112cc226ba7c [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdustin
bugs1281062
milestone50.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1281062 - Create Action Tasks to schedule new jobs. r=dustin MozReview-Commit-ID: 5MvqLfGrlLC
taskcluster/docs/taskgraph.rst
taskcluster/mach_commands.py
taskcluster/taskgraph/action.py
taskcluster/taskgraph/action.yml
taskcluster/taskgraph/decision.py
taskcluster/taskgraph/optimize.py
taskcluster/taskgraph/test/test_optimize.py
--- a/taskcluster/docs/taskgraph.rst
+++ b/taskcluster/docs/taskgraph.rst
@@ -121,16 +121,36 @@ another, equivalent task, so it generate
 that to search for a matching, existing task.
 
 In some cases, such as try pushes, tasks in the target task set have been
 explicitly requested and are thus excluded from optimization. In other cases,
 the target task set is almost the entire task graph, so targetted tasks are
 considered for optimization.  This behavior is controlled with the
 ``optimize_target_tasks`` parameter.
 
+Action Tasks
+------------
+
+Action Tasks are tasks which help you to schedule new jobs via Treeherder's
+"Add New Jobs" feature. The Decision Task creates a YAML file named
+``action.yml``. An Action Task is scheduled from it by replacing the placeholders
+``{{decision_task_id}}`` and ``{{task_labels}}`` with the decision task ID of the
+push and a comma-separated list of the task labels to be scheduled,
+respectively.
+
+This task invokes ``mach taskgraph action-task``, which builds a task graph of
+the requested tasks. This graph is optimized against the tasks that the decision
+task already created for the same push.
+
+So for instance, if you had already requested a build task in the ``try`` command,
+and you wish to add a test which depends on this build, the original build task
+is re-used.
+
+This feature is only present on ``try`` pushes for now.
+
 Mach commands
 -------------
 
 A number of mach subcommands are available aside from ``mach taskgraph
 decision`` to make this complex system more accesssible to those trying to
 understand or modify it.  They allow you to run portions of the
 graph-generation process and output the results.
 
@@ -155,16 +175,21 @@ such a file on every run, and that is ge
 parameter file.  The parameter keys and values are described in
 :doc:`parameters`.
 
 Finally, the ``mach taskgraph decision`` subcommand performs the entire
 task-graph generation process, then creates the tasks.  This command should
 only be used within a decision task, as it assumes it is running in that
 context.
 
+The ``mach taskgraph action-task`` subcommand is used by Action Tasks to
+create a task graph of the requested jobs and their non-optimized dependencies.
+Action Tasks are currently scheduled by
+`pulse_actions <https://github.com/mozilla/pulse_actions>`_.
+
 Taskgraph JSON Format
 ---------------------
 
 Task graphs -- both the graph artifacts produced by the decision task and those
 output by the ``--json`` option to the ``mach taskgraph`` commands -- are JSON
 objects, keyed by label, or for optimized task graphs, by taskId.  For
 convenience, the decision task also writes out ``label-to-taskid.json``
 containing a mapping from label to taskId.  Each task in the graph is
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -148,16 +148,40 @@ class MachCommands(MachCommandBase):
         import taskgraph.decision
         try:
             self.setup_logging()
             return taskgraph.decision.taskgraph_decision(options)
         except Exception:
             traceback.print_exc()
             sys.exit(1)
 
+    @SubCommand('taskgraph', 'action-task',
+                description="Run the action task")
+    @CommandArgument('--root', '-r',
+                     default='taskcluster/ci',
+                     help="root of the taskgraph definition relative to topsrcdir")
+    @CommandArgument('--decision-id',
+                     required=True,
+                     help="Decision Task ID of the reference decision task")
+    @CommandArgument('--task-labels',
+                     required=True,
+                     help='Comma separated list of task labels to be scheduled')
+    def taskgraph_action(self, **options):
+        """Run the action task: generates a task graph using the set of labels
+        provided in the --task-labels option. It uses the full-task-graph.json
+        artifact of the gecko decision task given by --decision-id."""
+
+        import taskgraph.action
+        try:
+            self.setup_logging()
+            return taskgraph.action.taskgraph_action(options)
+        except Exception:
+            traceback.print_exc()
+            sys.exit(1)
+
     def setup_logging(self, quiet=False, verbose=True):
         """
         Set up Python logging for all loggers, sending results to stderr (so
         that command output can be redirected easily) and adding the typical
         mach timestamp.
         """
         # remove the old terminal handler
         self.log_manager.replace_terminal_handler(None)
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/action.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import json
+import logging
+import requests
+
+from .create import create_tasks
+from .decision import write_artifact
+from .parameters import Parameters
+from .optimize import optimize_task_graph
+from .taskgraph import TaskGraph
+
+logger = logging.getLogger(__name__)
+TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task/"
+
+
+def taskgraph_action(options):
+    """
+    Run the action task.  This function implements `mach taskgraph action-task`,
+    and is responsible for
+
+     * creating a task graph of the tasks requested in the parameters, relative
+       to a given gecko decision task, and scheduling those tasks.
+    """
+
+    parameters = get_action_parameters(options)
+    decision_task_id = parameters['decision_id']
+    # read in the full graph for reference
+    full_task_json = get_artifact(decision_task_id, "public/full-task-graph.json")
+    all_tasks, full_task_graph = TaskGraph.from_json(full_task_json, options['root'])
+
+    target_tasks = set(parameters['task_labels'].split(','))
+    target_graph = full_task_graph.graph.transitive_closure(target_tasks)
+    target_task_graph = TaskGraph(
+        {l: all_tasks[l] for l in target_graph.nodes},
+        target_graph)
+
+    existing_tasks = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+    # We don't want to optimize target tasks since they have been requested by the
+    # user, hence we pass `target_tasks` as `do_not_optimize`.
+    optimized_graph, label_to_taskid = optimize_task_graph(target_task_graph=target_task_graph,
+                                                           do_not_optimize=target_tasks,
+                                                           existing_tasks=existing_tasks)
+
+    # write out the optimized task graph to describe what will actually happen,
+    # and the map of labels to taskids
+    write_artifact('task-graph.json', optimized_graph.to_json())
+    write_artifact('label-to-taskid.json', label_to_taskid)
+    # actually create the graph
+    create_tasks(optimized_graph, label_to_taskid)
+
+
+def get_action_parameters(options):
+    """
+    Load parameters from the command-line options for 'taskgraph action-task'.
+    """
+    parameters = {n: options[n] for n in [
+        'decision_id',
+        'task_labels',
+    ] if n in options}
+
+    return Parameters(parameters)
+
+
+def get_artifact(task_id, path):
+    url = TASKCLUSTER_QUEUE_URL + task_id + "/artifacts/" + path
+    resp = requests.get(url=url)
+    artifact = json.loads(resp.text)
+    return artifact
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/action.yml
@@ -0,0 +1,72 @@
+---
+created: '{{now}}'
+deadline: '{{#from_now}}1 day{{/from_now}}'
+expires: '{{#from_now}}14 day{{/from_now}}'
+metadata:
+  owner: mozilla-taskcluster-maintenance@mozilla.com
+  source: 'https://hg.mozilla.org/{{project}}/file/{{head_rev}}/taskcluster/taskgraph/action.yml'
+  name: "[tc] Action Task"
+  description: Helps schedule new jobs without new push
+
+workerType: "gecko-decision"
+provisionerId: "aws-provisioner-v1"
+
+tags:
+  createdForUser: {{owner}}
+
+scopes:
+  # Bug 1269443: cache scopes, etc. must be listed explicitly
+  - "docker-worker:cache:level-1-*"
+  - "docker-worker:cache:tooltool-cache"
+  - "secrets:get:project/taskcluster/gecko/hgfingerprint"
+  - "assume:repo:hg.mozilla.org/try:*"
+
+routes:
+  - "tc-treeherder.v2.{{project}}.{{head_rev}}.{{pushlog_id}}"
+  - "tc-treeherder-stage.v2.{{project}}.{{head_rev}}.{{pushlog_id}}"
+
+payload:
+  env:
+    GECKO_BASE_REPOSITORY: 'https://hg.mozilla.org/mozilla-central'
+    GECKO_HEAD_REPOSITORY: '{{{head_repository}}}'
+    GECKO_HEAD_REF: '{{head_ref}}'
+    GECKO_HEAD_REV: '{{head_rev}}'
+
+  cache:
+    level-{{level}}-{{project}}-tc-vcs-public-sources: /home/worker/.tc-vcs/
+    level-{{level}}-{{project}}-gecko-decision: /home/worker/workspace
+
+  features:
+    taskclusterProxy: true
+
+  # Note: This task is built server side without the context or tooling that
+  # exist in tree so we must hard code the version
+  image: 'taskcluster/decision:0.1.0'
+
+  # Virtually no network or other potentially risky operations happen as part
+  # of the task aside from the initial clone. We have intentionally set the
+  # timeout to a low value, since _all_ decision tasks should use a root
+  # repository which is cached.
+  maxRunTime: 1800
+
+  command:
+    - /bin/bash
+    - -cx
+    - >
+      mkdir -p /home/worker/artifacts &&
+      checkout-gecko workspace &&
+      cd workspace/gecko &&
+      ln -s /home/worker/artifacts artifacts &&
+      ./mach taskgraph action-task
+      --decision-id='{{decision_task_id}}'
+      --task-labels='{{task_labels}}'
+
+  artifacts:
+    'public':
+      type: 'directory'
+      path: '/home/worker/artifacts'
+      expires: '{{#from_now}}7 days{{/from_now}}'
+
+extra:
+  treeherder:
+    symbol: A
--- a/taskcluster/taskgraph/decision.py
+++ b/taskcluster/taskgraph/decision.py
@@ -11,21 +11,27 @@ import json
 import logging
 import yaml
 
 from .generator import TaskGraphGenerator
 from .create import create_tasks
 from .parameters import Parameters
 from .target_tasks import get_method
 
-logger = logging.getLogger(__name__)
-ARTIFACTS_DIR = 'artifacts'
+from taskgraph.util.templates import Templates
+from taskgraph.util.time import (
+    json_time_from_now,
+    current_json_time,
+)
 
 logger = logging.getLogger(__name__)
 
+ARTIFACTS_DIR = 'artifacts'
+GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
+
 # For each project, this gives a set of parameters specific to the project.
 # See `taskcluster/docs/parameters.rst` for information on parameters.
 PER_PROJECT_PARAMETERS = {
     'try': {
         'target_tasks_method': 'try_option_syntax',
         # for try, if a task was specified as a target, it should
         # not be optimized away
         'optimize_target_tasks': False,
@@ -59,16 +65,19 @@ def taskgraph_decision(options):
     tgg = TaskGraphGenerator(
         root_dir=options['root'],
         parameters=parameters,
         target_tasks_method=target_tasks_method)
 
     # write out the parameters used to generate this graph
     write_artifact('parameters.yml', dict(**parameters))
 
+    # write out the yml file for action tasks
+    write_artifact('action.yml', get_action_yml(parameters))
+
     # write out the full graph for reference
     write_artifact('full-task-graph.json', tgg.full_task_graph.to_json())
 
     # write out the target task set to allow reproducing this as input
     write_artifact('target-tasks.json', tgg.target_task_set.tasks.keys())
 
     # write out the optimized task graph to describe what will actually happen,
     # and the map of labels to taskids
@@ -118,8 +127,20 @@ def write_artifact(filename, data):
     if filename.endswith('.yml'):
         with open(path, 'w') as f:
             yaml.safe_dump(data, f, allow_unicode=True, default_flow_style=False)
     elif filename.endswith('.json'):
         with open(path, 'w') as f:
             json.dump(data, f, sort_keys=True, indent=2, separators=(',', ': '))
     else:
         raise TypeError("Don't know how to write to {}".format(filename))
+
+
+def get_action_yml(parameters):
+    templates = Templates(os.path.join(GECKO, "taskcluster/taskgraph"))
+    action_parameters = parameters.copy()
+    action_parameters.update({
+        "decision_task_id": "{{decision_task_id}}",
+        "task_labels": "{{task_labels}}",
+        "from_now": json_time_from_now,
+        "now": current_json_time()
+    })
+    return templates.load('action.yml', action_parameters)
--- a/taskcluster/taskgraph/optimize.py
+++ b/taskcluster/taskgraph/optimize.py
@@ -9,30 +9,34 @@ import re
 from .graph import Graph
 from .taskgraph import TaskGraph
 from slugid import nice as slugid
 
 logger = logging.getLogger(__name__)
 TASK_REFERENCE_PATTERN = re.compile('<([^>]+)>')
 
 
-def optimize_task_graph(target_task_graph, do_not_optimize):
+def optimize_task_graph(target_task_graph, do_not_optimize, existing_tasks=None):
     """
     Perform task optimization, without optimizing tasks named in
     do_not_optimize.
     """
     named_links_dict = target_task_graph.graph.named_links_dict()
     label_to_taskid = {}
 
     # This proceeds in two phases.  First, mark all optimized tasks (those
     # which will be removed from the graph) as such, including a replacement
     # taskId where applicable.  Second, generate a new task graph containing
     # only the non-optimized tasks, with all task labels resolved to taskIds
     # and with task['dependencies'] populated.
-    annotate_task_graph(target_task_graph, do_not_optimize, named_links_dict, label_to_taskid)
+    annotate_task_graph(target_task_graph=target_task_graph,
+                        do_not_optimize=do_not_optimize,
+                        named_links_dict=named_links_dict,
+                        label_to_taskid=label_to_taskid,
+                        existing_tasks=existing_tasks)
     return get_subgraph(target_task_graph, named_links_dict, label_to_taskid), label_to_taskid
 
 
 def resolve_task_references(label, task_def, taskid_for_edge_name):
     def repl(match):
         key = match.group(1)
         try:
             return taskid_for_edge_name[key]
@@ -50,17 +54,18 @@ def resolve_task_references(label, task_
                 return TASK_REFERENCE_PATTERN.sub(repl, val['task-reference'])
             else:
                 return {k: recurse(v) for k, v in val.iteritems()}
         else:
             return val
     return recurse(task_def)
 
 
-def annotate_task_graph(target_task_graph, do_not_optimize, named_links_dict, label_to_taskid):
+def annotate_task_graph(target_task_graph, do_not_optimize,
+                        named_links_dict, label_to_taskid, existing_tasks):
     """
     Annotate each task in the graph with .optimized (boolean) and .task_id
     (possibly None), following the rules for optimization and calling the task
     kinds' `optimize_task` method.
 
     As a side effect, label_to_taskid is updated with labels for all optimized
     tasks that are replaced with existing tasks.
     """
@@ -81,16 +86,20 @@ def annotate_task_graph(target_task_grap
 
         # if this task is blacklisted, don't even consider optimizing
         replacement_task_id = None
         if label in do_not_optimize:
             optimized = False
         # if any dependencies can't be optimized, this task can't, either
         elif any(not t.optimized for t in dependencies):
             optimized = False
+        # Let's check whether this task has been created before
+        elif existing_tasks is not None and label in existing_tasks:
+            optimized = True
+            replacement_task_id = existing_tasks[label]
         # otherwise, examine the task itself (which may be an expensive operation)
         else:
             optimized, replacement_task_id = task.optimize()
 
         task.optimized = optimized
         task.task_id = replacement_task_id
         if replacement_task_id:
             label_to_taskid[label] = replacement_task_id
--- a/taskcluster/taskgraph/test/test_optimize.py
+++ b/taskcluster/taskgraph/test/test_optimize.py
@@ -88,58 +88,58 @@ class TestOptimize(unittest.TestCase):
         OptimizingTask.optimize = lambda self: (False, None)
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             self.make_task('task3'),
             ('task2', 'task1', 'build'),
             ('task2', 'task3', 'image'),
         )
-        annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {})
+        annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {}, None)
         self.assert_annotations(
             graph,
             task1=(False, None),
             task2=(False, None),
             task3=(False, None)
         )
 
     def test_annotate_task_graph_taskid_without_optimize(self):
         "raises exception if kind returns a taskid without optimizing"
         OptimizingTask.optimize = lambda self: (False, 'some-taskid')
         graph = self.make_graph(self.make_task('task1'))
         self.assertRaises(
             Exception,
-            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {})
+            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {}, None)
         )
 
     def test_annotate_task_graph_optimize_away_dependency(self):
         "raises exception if kind optimizes away a task on which another depends"
         OptimizingTask.optimize = \
             lambda self: (True, None) if self.label == 'task1' else (False, None)
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             ('task2', 'task1', 'build'),
         )
         self.assertRaises(
             Exception,
-            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {})
+            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {}, None)
         )
 
     def test_annotate_task_graph_do_not_optimize(self):
         "annotating marks everything as un-optimized if in do_not_optimize"
         OptimizingTask.optimize = lambda self: (True, 'taskid')
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             ('task2', 'task1', 'build'),
         )
         label_to_taskid = {}
         annotate_task_graph(graph, {'task1', 'task2'},
-                            graph.graph.named_links_dict(), label_to_taskid)
+                            graph.graph.named_links_dict(), label_to_taskid, None)
         self.assert_annotations(
             graph,
             task1=(False, None),
             task2=(False, None)
         )
         self.assertEqual
 
     def test_annotate_task_graph_nos_propagate(self):
@@ -149,17 +149,17 @@ class TestOptimize(unittest.TestCase):
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             self.make_task('task3'),
             ('task2', 'task1', 'build'),
             ('task2', 'task3', 'image'),
         )
         annotate_task_graph(graph, set(),
-                            graph.graph.named_links_dict(), {})
+                            graph.graph.named_links_dict(), {}, None)
         self.assert_annotations(
             graph,
             task1=(False, None),
             task2=(False, None),  # kind would have returned (True, 'taskid') here
             task3=(True, 'taskid')
         )
 
     def test_get_subgraph_single_dep(self):