Bug 1281062 - Create Action Tasks to schedule new jobs. r=dustin draft
authorKalpesh Krishna <kalpeshk2011@gmail.com>
Sun, 10 Jul 2016 03:30:52 +0530
changeset 386025 5dcd5a31542f861bd02e68ee4f2dcc19e3cbd87f
parent 385906 5ea06a1578db28ff2d614d660e6d203b42c3b306
child 525015 ae604c80799ab54f2ce4f32e89f7aad101bd2b03
push id22596
push userbmo:kalpeshk2011@gmail.com
push dateSat, 09 Jul 2016 22:02:11 +0000
reviewersdustin
bugs1281062
milestone50.0a1
Bug 1281062 - Create Action Tasks to schedule new jobs. r=dustin MozReview-Commit-ID: 6XpMZwvHlcS
taskcluster/docs/taskgraph.rst
taskcluster/mach_commands.py
taskcluster/taskgraph/action.py
taskcluster/taskgraph/action.yml
taskcluster/taskgraph/decision.py
taskcluster/taskgraph/optimize.py
taskcluster/taskgraph/taskgraph.py
taskcluster/taskgraph/test/test_decision.py
taskcluster/taskgraph/test/test_optimize.py
taskcluster/taskgraph/test/test_taskgraph.py
--- a/taskcluster/docs/taskgraph.rst
+++ b/taskcluster/docs/taskgraph.rst
@@ -183,16 +183,20 @@ Each task has the following properties:
 
 ``dependencies``
    The task's in-graph dependencies, represented as an object mapping
    dependency name to label (or to taskId for optimized task graphs)
 
 ``task``
    The task's TaskCluster task definition.
 
+``kind_implementation``
+   The module and the class name which was used to implement this particular task.
+   It is always of the form ``<module-path>:<object-path>``
+
 The task definition may contain "relative datestamps" of the form
 ``{"relative-datestamp": "certain number of seconds/hours/days/years"}``.
 These will be replaced in the last step, while creating tasks.
 The UTC timestamp at that moment is noted, and all the relative datestamps
 are replaced with respect to this timestamp.
 
 The task definition may contain "task references" of the form
 ``{"task-reference": "string containing <task-label>"}``.  These will be
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -148,16 +148,40 @@ class MachCommands(MachCommandBase):
         import taskgraph.decision
         try:
             self.setup_logging()
             return taskgraph.decision.taskgraph_decision(options)
         except Exception:
             traceback.print_exc()
             sys.exit(1)
 
+    @SubCommand('taskgraph', 'action-task',
+                description="Run the action task")
+    @CommandArgument('--root', '-r',
+                     default='taskcluster/ci',
+                     help="root of the taskgraph definition relative to topsrcdir")
+    @CommandArgument('--decision-id',
+                     required=True,
+                     help="Decision Task ID of the reference decision task")
+    @CommandArgument('--task-labels',
+                     required=True,
+                     help='Comma separated list of task labels to be scheduled')
+    def taskgraph_action(self, **options):
+        """Run the action task: Generates a task graph using the set of labels
+        provided in the task-labels parameter. It uses the full-task file of
+        the gecko decision task."""
+
+        import taskgraph.action
+        try:
+            self.setup_logging()
+            return taskgraph.action.taskgraph_action(options)
+        except Exception:
+            traceback.print_exc()
+            sys.exit(1)
+
     def setup_logging(self, quiet=False, verbose=True):
         """
         Set up Python logging for all loggers, sending results to stderr (so
         that command output can be redirected easily) and adding the typical
         mach timestamp.
         """
         # remove the old terminal handler
         self.log_manager.replace_terminal_handler(None)
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/action.py
@@ -0,0 +1,76 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import json
+import logging
+import requests
+
+from .create import create_tasks
+from .decision import write_artifact
+from .parameters import Parameters
+from .optimize import optimize_task_graph
+from .taskgraph import TaskGraph
+
+logger = logging.getLogger(__name__)
+TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task/"
+
+
+def taskgraph_action(options):
+    """
+    Run the action task.  This function implements `mach taskgraph action-task`,
+    and is responsible for
+
+     * creating taskgraph of tasks asked for in parameters with respect to
+     a given gecko decision task and schedule these jobs.
+    """
+
+    parameters = get_action_parameters(options)
+    decision_task_id = parameters['decision_id']
+    # read in the full graph for reference
+    full_task_json = get_artifact(decision_task_id, "public/full-task-graph.json")
+    all_tasks, full_task_graph = TaskGraph.from_json(full_task_json, options['root'])
+
+    target_tasks = set(parameters['task_labels'].split(','))
+    target_graph = full_task_graph.graph.transitive_closure(target_tasks)
+    target_task_graph = TaskGraph(
+        {l: all_tasks[l] for l in target_graph.nodes},
+        target_graph)
+
+    existing_tasks = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+    # We don't want to optimize target tasks since they have been requested by user
+    # Hence we put `target_tasks under` `do_not_optimize`
+    optimized_graph, label_to_taskid = optimize_task_graph(target_task_graph=target_task_graph,
+                                                           do_not_optimize=target_tasks,
+                                                           existing_tasks=existing_tasks)
+
+    # write out the optimized task graph to describe what will actually happen,
+    # and the map of labels to taskids
+    write_artifact('task-graph.json', optimized_graph.to_json())
+    write_artifact('label-to-taskid.json', label_to_taskid)
+    # actually create the graph
+    create_tasks(optimized_graph, label_to_taskid)
+
+
+def get_action_parameters(options):
+    """
+    Load parameters from the command-line options for 'taskgraph action'.
+    """
+    parameters = {n: options[n] for n in [
+        'decision_id',
+        'task_labels',
+    ] if n in options}
+
+    return Parameters(parameters)
+
+
+def get_artifact(task_id, path):
+    url = TASKCLUSTER_QUEUE_URL + task_id + "/artifacts/" + path
+    resp = requests.get(url=url)
+    artifact = json.loads(resp.text)
+    return artifact
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/action.yml
@@ -0,0 +1,72 @@
+---
+created: '{{now}}'
+deadline: '{{#from_now}}1 day{{/from_now}}'
+expires: '{{#from_now}}14 day{{/from_now}}'
+metadata:
+  owner: mozilla-taskcluster-maintenance@mozilla.com
+  source: 'https://hg.mozilla.org/{{project}}/file/{{head_rev}}/taskcluster/taskgraph/action.yml'
+  name: "[tc] Action Task"
+  description: Helps schedule new jobs without new push
+
+workerType: "gecko-decision"
+provisionerId: "aws-provisioner-v1"
+
+tags:
+  createdForUser: {{owner}}
+
+scopes:
+  # Bug 1269443: cache scopes, etc. must be listed explicitly
+  - "docker-worker:cache:level-1-*"
+  - "docker-worker:cache:tooltool-cache"
+  - "secrets:get:project/taskcluster/gecko/hgfingerprint"
+  - "assume:repo:hg.mozilla.org/try:*"
+
+routes:
+  - "tc-treeherder.v2.{{project}}.{{head_rev}}.{{pushlog_id}}"
+  - "tc-treeherder-stage.v2.{{project}}.{{head_rev}}.{{pushlog_id}}"
+
+payload:
+  env:
+    GECKO_BASE_REPOSITORY: 'https://hg.mozilla.org/mozilla-central'
+    GECKO_HEAD_REPOSITORY: '{{{head_repository}}}'
+    GECKO_HEAD_REF: '{{head_ref}}'
+    GECKO_HEAD_REV: '{{head_rev}}'
+
+  cache:
+    level-{{level}}-{{project}}-tc-vcs-public-sources: /home/worker/.tc-vcs/
+    level-{{level}}-{{project}}-gecko-decision: /home/worker/workspace
+
+  features:
+    taskclusterProxy: true
+
+  # Note: This task is built server side without the context or tooling that
+  # exist in tree so we must hard code the version
+  image: 'taskcluster/decision:0.1.0'
+
+  # Virtually no network or other potentially risky operations happen as part
+  # of the task timeout aside from the initial clone. We intentionally have
+  # set this to a lower value _all_ decision tasks should use a root
+  # repository which is cached.
+  maxRunTime: 1800
+
+  command:
+    - /bin/bash
+    - -cx
+    - >
+      mkdir -p /home/worker/artifacts &&
+      checkout-gecko workspace &&
+      cd workspace/gecko &&
+      ln -s /home/worker/artifacts artifacts &&
+      ./mach taskgraph action-task
+      --decision-id='{{decision_task_id}}'
+      --task-labels='{{task_labels}}'
+
+  artifacts:
+    'public':
+      type: 'directory'
+      path: '/home/worker/artifacts'
+      expires: '{{#from_now}}7 days{{/from_now}}'
+
+extra:
+  treeherder:
+    symbol: A
--- a/taskcluster/taskgraph/decision.py
+++ b/taskcluster/taskgraph/decision.py
@@ -11,21 +11,27 @@ import json
 import logging
 import yaml
 
 from .generator import TaskGraphGenerator
 from .create import create_tasks
 from .parameters import Parameters
 from .target_tasks import get_method
 
-logger = logging.getLogger(__name__)
-ARTIFACTS_DIR = 'artifacts'
+from taskgraph.util.templates import Templates
+from taskgraph.util.time import (
+    json_time_from_now,
+    current_json_time,
+)
 
 logger = logging.getLogger(__name__)
 
+ARTIFACTS_DIR = 'artifacts'
+GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
+
 # For each project, this gives a set of parameters specific to the project.
 # See `taskcluster/docs/parameters.rst` for information on parameters.
 PER_PROJECT_PARAMETERS = {
     'try': {
         'target_tasks_method': 'try_option_syntax',
         # for try, if a task was specified as a target, it should
         # not be optimized away
         'optimize_target_tasks': False,
@@ -59,16 +65,19 @@ def taskgraph_decision(options):
     tgg = TaskGraphGenerator(
         root_dir=options['root'],
         parameters=parameters,
         target_tasks_method=target_tasks_method)
 
     # write out the parameters used to generate this graph
     write_artifact('parameters.yml', dict(**parameters))
 
+    # write out the yml file for action tasks
+    write_artifact('action.yml', get_action_yml(parameters))
+
     # write out the full graph for reference
     write_artifact('full-task-graph.json', tgg.full_task_graph.to_json())
 
     # write out the target task set to allow reproducing this as input
     write_artifact('target-tasks.json', tgg.target_task_set.tasks.keys())
 
     # write out the optimized task graph to describe what will actually happen,
     # and the map of labels to taskids
@@ -118,8 +127,20 @@ def write_artifact(filename, data):
     if filename.endswith('.yml'):
         with open(path, 'w') as f:
             yaml.safe_dump(data, f, allow_unicode=True, default_flow_style=False)
     elif filename.endswith('.json'):
         with open(path, 'w') as f:
             json.dump(data, f, sort_keys=True, indent=2, separators=(',', ': '))
     else:
         raise TypeError("Don't know how to write to {}".format(filename))
+
+
+def get_action_yml(parameters):
+    templates = Templates(os.path.join(GECKO, "taskcluster/taskgraph"))
+    action_parameters = parameters.copy()
+    action_parameters.update({
+        "decision_task_id": "{{decision_task_id}}",
+        "task_labels": "{{task_labels}}",
+        "from_now": json_time_from_now,
+        "now": current_json_time()
+    })
+    return templates.load('action.yml', action_parameters)
--- a/taskcluster/taskgraph/optimize.py
+++ b/taskcluster/taskgraph/optimize.py
@@ -9,30 +9,34 @@ import re
 from .graph import Graph
 from .taskgraph import TaskGraph
 from slugid import nice as slugid
 
 logger = logging.getLogger(__name__)
 TASK_REFERENCE_PATTERN = re.compile('<([^>]+)>')
 
 
-def optimize_task_graph(target_task_graph, do_not_optimize):
+def optimize_task_graph(target_task_graph, do_not_optimize, existing_tasks=None):
     """
     Perform task optimization, without optimizing tasks named in
     do_not_optimize.
     """
     named_links_dict = target_task_graph.graph.named_links_dict()
     label_to_taskid = {}
 
     # This proceeds in two phases.  First, mark all optimized tasks (those
     # which will be removed from the graph) as such, including a replacement
     # taskId where applicable.  Second, generate a new task graph containing
     # only the non-optimized tasks, with all task labels resolved to taskIds
     # and with task['dependencies'] populated.
-    annotate_task_graph(target_task_graph, do_not_optimize, named_links_dict, label_to_taskid)
+    annotate_task_graph(target_task_graph=target_task_graph,
+                        do_not_optimize=do_not_optimize,
+                        named_links_dict=named_links_dict,
+                        label_to_taskid=label_to_taskid,
+                        existing_tasks=existing_tasks)
     return get_subgraph(target_task_graph, named_links_dict, label_to_taskid), label_to_taskid
 
 
 def resolve_task_references(label, task_def, taskid_for_edge_name):
     def repl(match):
         key = match.group(1)
         try:
             return taskid_for_edge_name[key]
@@ -50,17 +54,18 @@ def resolve_task_references(label, task_
                 return TASK_REFERENCE_PATTERN.sub(repl, val['task-reference'])
             else:
                 return {k: recurse(v) for k, v in val.iteritems()}
         else:
             return val
     return recurse(task_def)
 
 
-def annotate_task_graph(target_task_graph, do_not_optimize, named_links_dict, label_to_taskid):
+def annotate_task_graph(target_task_graph, do_not_optimize,
+                        named_links_dict, label_to_taskid, existing_tasks):
     """
     Annotate each task in the graph with .optimized (boolean) and .task_id
     (possibly None), following the rules for optimization and calling the task
     kinds' `optimize_task` method.
 
     As a side effect, label_to_taskid is updated with labels for all optimized
     tasks that are replaced with existing tasks.
     """
@@ -81,16 +86,20 @@ def annotate_task_graph(target_task_grap
 
         # if this task is blacklisted, don't even consider optimizing
         replacement_task_id = None
         if label in do_not_optimize:
             optimized = False
         # if any dependencies can't be optimized, this task can't, either
         elif any(not t.optimized for t in dependencies):
             optimized = False
+        # Let's check whether this task has been created before
+        elif existing_tasks is not None and label in existing_tasks:
+            optimized = True
+            replacement_task_id = existing_tasks[label]
         # otherwise, examine the task itself (which may be an expensive operation)
         else:
             optimized, replacement_task_id = task.optimize()
 
         task.optimized = optimized
         task.task_id = replacement_task_id
         if replacement_task_id:
             label_to_taskid[label] = replacement_task_id
--- a/taskcluster/taskgraph/taskgraph.py
+++ b/taskcluster/taskgraph/taskgraph.py
@@ -1,16 +1,15 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import os
-import yaml
 
 from .graph import Graph
 from .util.python_path import find_object
 
 TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task/"
 GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
 
 
@@ -29,21 +28,23 @@ class TaskGraph(object):
 
     def to_json(self):
         "Return a JSON-able object representing the task graph, as documented"
         named_links_dict = self.graph.named_links_dict()
         # this dictionary may be keyed by label or by taskid, so let's just call it 'key'
         tasks = {}
         for key in self.graph.visit_postorder():
             task = self.tasks[key]
+            implementation = task.__class__.__module__ + ":" + task.__class__.__name__
             task_json = {
                 'label': task.label,
                 'attributes': task.attributes,
                 'dependencies': named_links_dict.get(key, {}),
-                'task': task.task
+                'task': task.task,
+                'kind_implementation': implementation
             }
             if task.task_id:
                 task_json['task_id'] = task.task_id
             tasks[key] = task_json
         return tasks
 
     def __getitem__(self, label):
         "Get a task by label"
@@ -63,21 +64,18 @@ class TaskGraph(object):
     def from_json(cls, tasks_dict, root):
         """
         This code is used to generate the a TaskGraph using a dictionary
         which is representative of the TaskGraph.
         """
         tasks = {}
         edges = set()
         for key, value in tasks_dict.iteritems():
-            # We try to find the task implementation using the kind
-            kind = value['attributes']['kind']
-            file_path = os.path.join(GECKO, root, kind, "kind.yml")
-            with open(file_path, 'r') as f:
-                implementation = yaml.load(f)['implementation']
+            # We get the implementation from JSON
+            implementation = value['kind_implementation']
             # Loading the module and creating a Task from a dictionary
             task_kind = find_object(implementation)
             tasks[key] = task_kind.from_json(value)
             if 'task_id' in value:
                 tasks[key].task_id = value['task_id']
             for depname, dep in value['dependencies'].iteritems():
                 edges.add((key, dep, depname))
         task_graph = cls(tasks, Graph(set(tasks), edges))
--- a/taskcluster/taskgraph/test/test_decision.py
+++ b/taskcluster/taskgraph/test/test_decision.py
@@ -24,29 +24,30 @@ class TestDecision(unittest.TestCase):
         tasks = {
             'a': TestTask(label='a', attributes={'attr': 'a-task'}),
             'b': TestTask(label='b', task={'task': 'def'}),
         }
         graph = Graph(nodes=set('ab'), edges={('a', 'b', 'edgelabel')})
         taskgraph = TaskGraph(tasks, graph)
 
         res = taskgraph.to_json()
-
         self.assertEqual(res, {
             'a': {
                 'label': 'a',
                 'attributes': {'attr': 'a-task', 'kind': 'test'},
                 'task': {},
                 'dependencies': {'edgelabel': 'b'},
+                'kind_implementation': 'taskgraph.test.util:TestTask',
             },
             'b': {
                 'label': 'b',
                 'attributes': {'kind': 'test'},
                 'task': {'task': 'def'},
                 'dependencies': {},
+                'kind_implementation': 'taskgraph.test.util:TestTask',
             }
         })
 
     def test_write_artifact_json(self):
         data = [{'some': 'data'}]
         tmpdir = tempfile.mkdtemp()
         try:
             decision.ARTIFACTS_DIR = os.path.join(tmpdir, "artifacts")
--- a/taskcluster/taskgraph/test/test_optimize.py
+++ b/taskcluster/taskgraph/test/test_optimize.py
@@ -88,58 +88,58 @@ class TestOptimize(unittest.TestCase):
         OptimizingTask.optimize = lambda self: (False, None)
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             self.make_task('task3'),
             ('task2', 'task1', 'build'),
             ('task2', 'task3', 'image'),
         )
-        annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {})
+        annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {}, None)
         self.assert_annotations(
             graph,
             task1=(False, None),
             task2=(False, None),
             task3=(False, None)
         )
 
     def test_annotate_task_graph_taskid_without_optimize(self):
         "raises exception if kind returns a taskid without optimizing"
         OptimizingTask.optimize = lambda self: (False, 'some-taskid')
         graph = self.make_graph(self.make_task('task1'))
         self.assertRaises(
             Exception,
-            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {})
+            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {}, None)
         )
 
     def test_annotate_task_graph_optimize_away_dependency(self):
         "raises exception if kind optimizes away a task on which another depends"
         OptimizingTask.optimize = \
             lambda self: (True, None) if self.label == 'task1' else (False, None)
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             ('task2', 'task1', 'build'),
         )
         self.assertRaises(
             Exception,
-            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {})
+            lambda: annotate_task_graph(graph, set(), graph.graph.named_links_dict(), {}, None)
         )
 
     def test_annotate_task_graph_do_not_optimize(self):
         "annotating marks everything as un-optimized if in do_not_optimize"
         OptimizingTask.optimize = lambda self: (True, 'taskid')
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             ('task2', 'task1', 'build'),
         )
         label_to_taskid = {}
         annotate_task_graph(graph, {'task1', 'task2'},
-                            graph.graph.named_links_dict(), label_to_taskid)
+                            graph.graph.named_links_dict(), label_to_taskid, None)
         self.assert_annotations(
             graph,
             task1=(False, None),
             task2=(False, None)
         )
         self.assertEqual
 
     def test_annotate_task_graph_nos_propagate(self):
@@ -149,17 +149,17 @@ class TestOptimize(unittest.TestCase):
         graph = self.make_graph(
             self.make_task('task1'),
             self.make_task('task2'),
             self.make_task('task3'),
             ('task2', 'task1', 'build'),
             ('task2', 'task3', 'image'),
         )
         annotate_task_graph(graph, set(),
-                            graph.graph.named_links_dict(), {})
+                            graph.graph.named_links_dict(), {}, None)
         self.assert_annotations(
             graph,
             task1=(False, None),
             task2=(False, None),  # kind would have returned (True, 'taskid') here
             task3=(True, 'taskid')
         )
 
     def test_get_subgraph_single_dep(self):
--- a/taskcluster/taskgraph/test/test_taskgraph.py
+++ b/taskcluster/taskgraph/test/test_taskgraph.py
@@ -15,17 +15,18 @@ from mozunit import main
 
 class TestTargetTasks(unittest.TestCase):
 
     def test_from_json(self):
         legacy_dict = {
             'attributes': {'kind': 'legacy'},
             'task': {},
             'dependencies': {},
-            'label': 'a'
+            'label': 'a',
+            'kind_implementation': 'taskgraph.kind.legacy:LegacyTask'
         }
         graph = TaskGraph(tasks={
             'a': LegacyTask(kind='legacy',
                             label='a',
                             attributes={},
                             task={},
                             task_dict=legacy_dict),
             'b': DockerImageTask(kind='docker-image',