Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks r=dustin
authorBrian Stack <bstack@mozilla.com>
Wed, 20 Sep 2017 12:52:29 -0700
changeset 383502 112de4d9e9e67a7a10cb9d6dc8eb4d4bee6585e2
parent 383501 8547b76f8dfef0e55d8c8c3690fca1cfb19a798c
child 383503 e238860e4943cd1bb4c2c5f7175a36d494f0e91b
push id32594
push userkwierso@gmail.com
push dateThu, 28 Sep 2017 22:49:33 +0000
treeherdermozilla-central@6dea0ee45b66 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdustin
bugs1400223
milestone58.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks r=dustin MozReview-Commit-ID: 7ZTbS5h0vPA
taskcluster/docs/actions.rst
taskcluster/taskgraph/actions/add_new_jobs.py
taskcluster/taskgraph/actions/add_talos.py
taskcluster/taskgraph/actions/mochitest_retrigger.py
taskcluster/taskgraph/actions/registry.py
taskcluster/taskgraph/actions/retrigger.py
taskcluster/taskgraph/actions/run_missing_tests.py
taskcluster/taskgraph/actions/util.py
taskcluster/taskgraph/util/taskcluster.py
--- a/taskcluster/docs/actions.rst
+++ b/taskcluster/docs/actions.rst
@@ -35,17 +35,17 @@ a custom action task can be more efficie
 Creating a Callback Action
 --------------------------
 A *callback action* is an action that calls back into in-tree logic. That is,
 you register the action with name, title, description, context, input schema and a
 python callback. When the action is triggered in a user interface,
 input matching the schema is collected, passed to a new task which then calls
 your python callback, enabling it to do pretty much anything it wants to.
 
-To create a new action you must create a file
+To create a new callback action you must create a file
 ``taskcluster/taskgraph/actions/my-action.py``, that at minimum contains::
 
   from registry import register_callback_action
 
   @register_callback_action(
       name='hello',
       title='Say Hello',
       symbol='hw',  # Show the callback task in treeherder as 'hw'
@@ -53,16 +53,30 @@ To create a new action you must create a
       order=10000,  # Order in which it should appear relative to other actions
   )
   def hello_world_action(parameters, input, task_group_id, task_id, task):
       # parameters is an instance of taskgraph.parameters.Parameters
       # it carries decision task parameters from the original decision task.
       # input, task_id, and task should all be None
       print "Hello was triggered from taskGroupId: " + taskGroupId
 
+Callback actions are configured in-tree to generate 3 artifacts when they run.
+These artifacts are similar to the artifacts generated by decision tasks since
+callback actions are basically mini decision tasks. The artifacts are:
+
+``task-graph.json``:
+  The graph of all tasks created by the action task. Includes tasks
+  created to satisfy requirements.
+``to-run.json``:
+  The set of tasks that the action task requested to build. This does not
+  include the requirements.
+``label-to-taskid.json``:
+  This is the mapping from label to ``taskid`` for all tasks involved in
+  the task-graph. This includes dependencies.
+
 The example above defines an action that is available in the context-menu for
 the entire task-group (result-set or push in Treeherder terminology). To create
 an action that shows up in the context menu for a task we would specify the
 ``context`` parameter.
 
 
 Setting the Action Context
 ..........................
--- a/taskcluster/taskgraph/actions/add_new_jobs.py
+++ b/taskcluster/taskgraph/actions/add_new_jobs.py
@@ -3,19 +3,17 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 from .registry import register_callback_action
 
-from .util import (create_tasks, find_decision_task)
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import (create_tasks, fetch_graph_and_labels)
 
 
 @register_callback_action(
     name='add-new-jobs',
     title='Add new jobs',
     symbol='add-new',
     description="Add new jobs using task labels.",
     order=10000,
@@ -29,21 +27,17 @@ from taskgraph.taskgraph import TaskGrap
                 'items': {
                     'type': 'string'
                 }
             }
         }
     }
 )
 def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     to_run = []
     for elem in input['tasks']:
         if elem in full_task_graph.tasks:
             to_run.append(elem)
         else:
             raise Exception('{} was not found in the task-graph'.format(elem))
 
--- a/taskcluster/taskgraph/actions/add_talos.py
+++ b/taskcluster/taskgraph/actions/add_talos.py
@@ -4,19 +4,17 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 from .registry import register_callback_action
-from .util import create_tasks, find_decision_task
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import create_tasks, fetch_graph_and_labels
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     name='run-all-talos',
     title='Run All Talos Tests',
     symbol='raT',
@@ -34,21 +32,17 @@ logger = logging.getLogger(__name__)
                 'title': 'Times',
                 'description': 'How many times to run each task.',
             }
         },
         'additionalProperties': False
     },
 )
 def add_all_talos(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     times = input.get('times', 1)
     for i in xrange(times):
         to_run = [label
                   for label, entry
                   in full_task_graph.tasks.iteritems() if 'talos_try_name' in entry.attributes]
 
         create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
--- a/taskcluster/taskgraph/actions/mochitest_retrigger.py
+++ b/taskcluster/taskgraph/actions/mochitest_retrigger.py
@@ -6,21 +6,19 @@
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import logging
 
 from slugid import nice as slugid
 
-from .util import (find_decision_task, create_task_from_def)
+from .util import (create_task_from_def, fetch_graph_and_labels)
 from .registry import register_callback_action
-from taskgraph.util.taskcluster import get_artifact
 from taskgraph.util.parameterization import resolve_task_references
-from taskgraph.taskgraph import TaskGraph
 
 TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     name='retrigger-mochitest-reftest-with-options',
@@ -77,21 +75,17 @@ logger = logging.getLogger(__name__)
                 'additionalProperties': {'type': 'string'}
             }
         },
         'additionalProperties': False,
         'required': ['path']
     }
 )
 def mochitest_retrigger_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     pre_task = full_task_graph.tasks[task['metadata']['name']]
 
     # fix up the task's dependencies, similar to how optimization would
     # have done in the decision
     dependencies = {name: label_to_taskid[label]
                     for name, label in pre_task.dependencies.iteritems()}
     new_task_definition = resolve_task_references(pre_task.label, pre_task.task, dependencies)
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -185,17 +185,17 @@ def register_callback_action(name, title
             repo_scope = 'assume:repo:{}/{}:*'.format(
                 match.group(1), match.group(2))
 
             task_group_id = os.environ.get('TASK_ID', slugid())
 
             return {
                 'created': {'$fromNow': ''},
                 'deadline': {'$fromNow': '12 hours'},
-                'expires': {'$fromNow': '14 days'},
+                'expires': {'$fromNow': '1 year'},
                 'metadata': {
                     'owner': 'mozilla-taskcluster-maintenance@mozilla.com',
                     'source': '{}raw-file/{}/{}'.format(
                         parameters['head_repository'], parameters['head_rev'], source_path,
                     ),
                     'name': 'Action: {}'.format(title),
                     'description': 'Task executing callback for action.\n\n---\n' + description,
                 },
@@ -210,31 +210,40 @@ def register_callback_action(name, title
                     'createdForUser': parameters['owner'],
                     'kind': 'action-callback',
                 },
                 'routes': [
                     'tc-treeherder.v2.{}.{}.{}'.format(
                         parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
                     'tc-treeherder-stage.v2.{}.{}.{}'.format(
                         parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
+                    'index.gecko.v2.{}.pushlog-id.{}.actions.${{ownTaskId}}'.format(
+                        parameters['project'], parameters['pushlog_id'])
                 ],
                 'payload': {
                     'env': {
                         'GECKO_BASE_REPOSITORY': 'https://hg.mozilla.org/mozilla-unified',
                         'GECKO_HEAD_REPOSITORY': parameters['head_repository'],
                         'GECKO_HEAD_REF': parameters['head_ref'],
                         'GECKO_HEAD_REV': parameters['head_rev'],
                         'HG_STORE_PATH': '/builds/worker/checkouts/hg-store',
                         'ACTION_TASK_GROUP_ID': task_group_id,
                         'ACTION_TASK_ID': {'$json': {'$eval': 'taskId'}},
                         'ACTION_TASK': {'$json': {'$eval': 'task'}},
                         'ACTION_INPUT': {'$json': {'$eval': 'input'}},
                         'ACTION_CALLBACK': cb.__name__,
                         'ACTION_PARAMETERS': {'$json': {'$eval': 'parameters'}},
                     },
+                    'artifacts': {
+                        'public': {
+                            'type': 'directory',
+                            'path': '/builds/worker/artifacts',
+                            'expires': {'$fromNow': '1 year'},
+                        },
+                    },
                     'cache': {
                         'level-{}-checkouts'.format(parameters['level']):
                             '/builds/worker/checkouts',
                     },
                     'features': {
                         'taskclusterProxy': True,
                         'chainOfTrust': True,
                     },
--- a/taskcluster/taskgraph/actions/retrigger.py
+++ b/taskcluster/taskgraph/actions/retrigger.py
@@ -5,21 +5,19 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 from .util import (
     create_tasks,
-    find_decision_task
+    fetch_graph_and_labels
 )
 from .registry import register_callback_action
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     title='Retrigger',
     name='retrigger',
     symbol='rt',
@@ -46,21 +44,17 @@ logger = logging.getLogger(__name__)
                 'maximum': 6,
                 'title': 'Times',
                 'description': 'How many times to run each task.',
             }
         }
     }
 )
 def retrigger_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     label = task['metadata']['name']
     with_downstream = ' '
     to_run = [label]
 
     if input.get('downstream'):
         to_run = full_task_graph.graph.transitive_closure(set(to_run), reverse=True).nodes
         to_run = to_run & set(label_to_taskid.keys())
--- a/taskcluster/taskgraph/actions/run_missing_tests.py
+++ b/taskcluster/taskgraph/actions/run_missing_tests.py
@@ -4,19 +4,18 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 from .registry import register_callback_action
-from .util import create_tasks, find_decision_task
+from .util import create_tasks, fetch_graph_and_labels
 from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     name='run-missing-tests',
     title='Run Missing Tests',
     symbol='rmt',
@@ -25,22 +24,18 @@ logger = logging.getLogger(__name__)
         "\n"
         "This action is for use on pushes that will be merged into another branch,"
         "to check that optimization hasn't hidden any failures."
     ),
     order=100,  # Useful for sheriffs, but not top of the list
     context=[],  # Applies to decision task
 )
 def run_missing_tests(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
     target_tasks = get_artifact(decision_task_id, "public/target-tasks.json")
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
 
     # The idea here is to schedule all tasks of the `test` kind that were
     # targetted but did not appear in the final task-graph -- those were the
     # optimized tasks.
     to_run = []
     already_run = 0
     for label in target_tasks:
         task = full_task_graph.tasks[label]
--- a/taskcluster/taskgraph/actions/util.py
+++ b/taskcluster/taskgraph/actions/util.py
@@ -1,30 +1,61 @@
 # -*- coding: utf-8 -*-
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import logging
+
+from requests.exceptions import HTTPError
+
 from taskgraph import create
+from taskgraph.decision import write_artifact
 from taskgraph.taskgraph import TaskGraph
 from taskgraph.optimize import optimize_task_graph
-from taskgraph.util.taskcluster import get_session, find_task_id
+from taskgraph.util.taskcluster import get_session, find_task_id, get_artifact, list_tasks
+
+logger = logging.getLogger(__name__)
 
 
 def find_decision_task(parameters):
     """Given the parameters for this action, find the taskId of the decision
     task"""
     return find_task_id('gecko.v2.{}.pushlog-id.{}.decision'.format(
         parameters['project'],
         parameters['pushlog_id']))
 
 
+def fetch_graph_and_labels(parameters):
+    decision_task_id = find_decision_task(parameters)
+
+    # First grab the graph and labels generated during the initial decision task
+    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
+    _, full_task_graph = TaskGraph.from_json(full_task_graph)
+    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+    # Now fetch any modifications made by action tasks and swap out new tasks
+    # for old ones
+    namespace = 'gecko.v2.{}.pushlog-id.{}.actions'.format(
+        parameters['project'],
+        parameters['pushlog_id'])
+    for action in list_tasks(namespace):
+        try:
+            run_label_to_id = get_artifact(action, "public/label-to-taskid.json")
+            label_to_taskid.update(run_label_to_id)
+        except HTTPError as e:
+            logger.info('Skipping {} due to missing artifact! Error: {}'.format(action, e))
+            continue
+
+    return (decision_task_id, full_task_graph, label_to_taskid)
+
+
 def create_task_from_def(task_id, task_def, level):
     """Create a new task from a definition rather than from a label
     that is already in the full-task-graph. The task definition will
     have {relative-datestamp': '..'} rendered just like in a decision task.
     Use this for entirely new tasks or ones that change internals of the task.
     It is useful if you want to "edit" the full_task_graph and then hand
     it to this function. No dependencies will be scheduled. You must handle
     this yourself. Seeing how create_tasks handles it might prove helpful."""
@@ -44,9 +75,12 @@ def create_tasks(to_run, full_task_graph
     target_graph = full_task_graph.graph.transitive_closure(to_run)
     target_task_graph = TaskGraph(
         {l: full_task_graph[l] for l in target_graph.nodes},
         target_graph)
     optimized_task_graph, label_to_taskid = optimize_task_graph(target_task_graph,
                                                                 params,
                                                                 to_run,
                                                                 label_to_taskid)
+    write_artifact('task-graph.json', optimized_task_graph.to_json())
+    write_artifact('label-to-taskid.json', label_to_taskid)
+    write_artifact('to-run.json', list(to_run))
     create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -1,16 +1,17 @@
 # -*- coding: utf-8 -*-
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import datetime
 import functools
 import yaml
 import requests
 from mozbuild.util import memoize
 from requests.packages.urllib3.util.retry import Retry
 from requests.adapters import HTTPAdapter
 
 _TC_ARTIFACT_LOCATION = \
@@ -22,19 +23,22 @@ def get_session():
     session = requests.Session()
     retry = Retry(total=5, backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
     session.mount('http://', HTTPAdapter(max_retries=retry))
     session.mount('https://', HTTPAdapter(max_retries=retry))
     return session
 
 
-def _do_request(url):
+def _do_request(url, content=None):
     session = get_session()
-    response = session.get(url, stream=True)
+    if content is None:
+        response = session.get(url, stream=True)
+    else:
+        response = session.post(url, json=content)
     if response.status_code >= 400:
         # Consume content before raise_for_status, so that the connection can be
         # reused.
         response.content
     response.raise_for_status()
     return response
 
 
@@ -69,22 +73,22 @@ def get_artifact(task_id, path, use_prox
     return _handle_artifact(path, response)
 
 
 def list_artifacts(task_id, use_proxy=False):
     response = _do_request(get_artifact_url(task_id, '', use_proxy).rstrip('/'))
     return response.json()['artifacts']
 
 
-def get_index_url(index_path, use_proxy=False):
+def get_index_url(index_path, use_proxy=False, multiple=False):
     if use_proxy:
-        INDEX_URL = 'http://taskcluster/index/v1/task/{}'
+        INDEX_URL = 'http://taskcluster/index/v1/task{}/{}'
     else:
-        INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
-    return INDEX_URL.format(index_path)
+        INDEX_URL = 'https://index.taskcluster.net/v1/task{}/{}'
+    return INDEX_URL.format('s' if multiple else '', index_path)
 
 
 def find_task_id(index_path, use_proxy=False):
     try:
         response = _do_request(get_index_url(index_path, use_proxy))
     except requests.exceptions.HTTPError as e:
         if e.response.status_code == 404:
             raise KeyError("index path {} not found".format(index_path))
@@ -93,16 +97,40 @@ def find_task_id(index_path, use_proxy=F
 
 
 def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
     full_path = index_path + '/artifacts/' + artifact_path
     response = _do_request(get_index_url(full_path, use_proxy))
     return _handle_artifact(full_path, response)
 
 
+def list_tasks(index_path, use_proxy=False):
+    """
+    Returns a list of task_ids where each task_id is indexed under a path
+    in the index. Results are sorted by expiration date from oldest to newest.
+    """
+    results = []
+    data = {}
+    while True:
+        response = _do_request(get_index_url(index_path, use_proxy, multiple=True), data)
+        response = response.json()
+        results += response['tasks']
+        if response.get('continuationToken'):
+            data = {'continuationToken': response.get('continuationToken')}
+        else:
+            break
+
+    # We can sort on expires because in the general case
+    # all of these tasks should be created with the same expires time so they end up in
+    # order from earliest to latest action. If more correctness is needed, consider
+    # fetching each task and sorting on the created date.
+    results.sort(key=lambda t: datetime.datetime.strptime(t['expires'], '%Y-%m-%dT%H:%M:%S.%fZ'))
+    return [t['taskId'] for t in results]
+
+
 def get_task_url(task_id, use_proxy=False):
     if use_proxy:
         TASK_URL = 'http://taskcluster/queue/v1/task/{}'
     else:
         TASK_URL = 'https://queue.taskcluster.net/v1/task/{}'
     return TASK_URL.format(task_id)