Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks draft
authorBrian Stack <bstack@mozilla.com>
Wed, 20 Sep 2017 12:52:29 -0700
changeset 667819 42e7c05cdf35b1d603088de9bc7336a8ffbf1dd1
parent 667790 469eb992a9d166004f2601ce725786f671219054
child 732522 359278cb8c2da385cbf80af357b09c82560001c2
push id80858
push userbstack@mozilla.com
push dateWed, 20 Sep 2017 19:53:01 +0000
bugs1400223
milestone57.0a1
Bug 1400223 - Merge tasks added by action tasks into graphs used for subsequent tasks MozReview-Commit-ID: 7ZTbS5h0vPA
taskcluster/taskgraph/actions/add_new_jobs.py
taskcluster/taskgraph/actions/add_talos.py
taskcluster/taskgraph/actions/mochitest_retrigger.py
taskcluster/taskgraph/actions/registry.py
taskcluster/taskgraph/actions/retrigger.py
taskcluster/taskgraph/actions/run_missing_tests.py
taskcluster/taskgraph/actions/util.py
taskcluster/taskgraph/util/taskcluster.py
--- a/taskcluster/taskgraph/actions/add_new_jobs.py
+++ b/taskcluster/taskgraph/actions/add_new_jobs.py
@@ -3,19 +3,17 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 from .registry import register_callback_action
 
-from .util import (create_tasks, find_decision_task)
-from taskgraph.util.taskcluster import get_artifact
-from taskgraph.taskgraph import TaskGraph
+from .util import (create_tasks, fetch_graph_and_labels)
 
 
 @register_callback_action(
     name='add-new-jobs',
     title='Add new jobs',
     symbol='add-new',
     description="Add new jobs using task labels.",
     order=10000,
@@ -29,21 +27,17 @@ from taskgraph.taskgraph import TaskGrap
                 'items': {
                     'type': 'string'
                 }
             }
         }
     }
 )
 def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     to_run = []
     for elem in input['tasks']:
         if elem in full_task_graph.tasks:
             to_run.append(elem)
         else:
             raise Exception('{} was not found in the task-graph'.format(elem))
 
--- a/taskcluster/taskgraph/actions/add_talos.py
+++ b/taskcluster/taskgraph/actions/add_talos.py
@@ -34,21 +34,17 @@ logger = logging.getLogger(__name__)
                 'title': 'Times',
                 'description': 'How many times to run each task.',
             }
         },
         'additionalProperties': False
     },
 )
 def add_all_talos(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     times = input.get('times', 1)
     for i in xrange(times):
         to_run = [label
                   for label, entry
                   in full_task_graph.tasks.iteritems() if 'talos_try_name' in entry.attributes]
 
         create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
--- a/taskcluster/taskgraph/actions/mochitest_retrigger.py
+++ b/taskcluster/taskgraph/actions/mochitest_retrigger.py
@@ -77,21 +77,17 @@ logger = logging.getLogger(__name__)
                 'additionalProperties': {'type': 'string'}
             }
         },
         'additionalProperties': False,
         'required': ['path']
     }
 )
 def mochitest_retrigger_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     pre_task = full_task_graph.tasks[task['metadata']['name']]
 
     # fix up the task's dependencies, similar to how optimization would
     # have done in the decision
     dependencies = {name: label_to_taskid[label]
                     for name, label in pre_task.dependencies.iteritems()}
     new_task_definition = resolve_task_references(pre_task.label, pre_task.task, dependencies)
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -185,17 +185,17 @@ def register_callback_action(name, title
             repo_scope = 'assume:repo:{}/{}:*'.format(
                 match.group(1), match.group(2))
 
             task_group_id = os.environ.get('TASK_ID', slugid())
 
             return {
                 'created': {'$fromNow': ''},
                 'deadline': {'$fromNow': '12 hours'},
-                'expires': {'$fromNow': '14 days'},
+                'expires': {'$fromNow': '1 year'},
                 'metadata': {
                     'owner': 'mozilla-taskcluster-maintenance@mozilla.com',
                     'source': '{}raw-file/{}/{}'.format(
                         parameters['head_repository'], parameters['head_rev'], source_path,
                     ),
                     'name': 'Action: {}'.format(title),
                     'description': 'Task executing callback for action.\n\n---\n' + description,
                 },
@@ -210,31 +210,40 @@ def register_callback_action(name, title
                     'createdForUser': parameters['owner'],
                     'kind': 'action-callback',
                 },
                 'routes': [
                     'tc-treeherder.v2.{}.{}.{}'.format(
                         parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
                     'tc-treeherder-stage.v2.{}.{}.{}'.format(
                         parameters['project'], parameters['head_rev'], parameters['pushlog_id']),
+                    'index.gecko.v2.{}.pushlog-id.{}.actions.${{ownTaskId}}'.format(
+                        parameters['project'], parameters['pushlog_id'])
                 ],
                 'payload': {
                     'env': {
                         'GECKO_BASE_REPOSITORY': 'https://hg.mozilla.org/mozilla-unified',
                         'GECKO_HEAD_REPOSITORY': parameters['head_repository'],
                         'GECKO_HEAD_REF': parameters['head_ref'],
                         'GECKO_HEAD_REV': parameters['head_rev'],
                         'HG_STORE_PATH': '/builds/worker/checkouts/hg-store',
                         'ACTION_TASK_GROUP_ID': task_group_id,
                         'ACTION_TASK_ID': {'$json': {'$eval': 'taskId'}},
                         'ACTION_TASK': {'$json': {'$eval': 'task'}},
                         'ACTION_INPUT': {'$json': {'$eval': 'input'}},
                         'ACTION_CALLBACK': cb.__name__,
                         'ACTION_PARAMETERS': {'$json': {'$eval': 'parameters'}},
                     },
+                    'artifacts': {
+                        'public': {
+                            'type': 'directory',
+                            'path': '/builds/worker/artifacts',
+                            'expires': {'$fromNow': '1 year'},
+                        },
+                    },
                     'cache': {
                         'level-{}-checkouts'.format(parameters['level']):
                             '/builds/worker/checkouts',
                     },
                     'features': {
                         'taskclusterProxy': True,
                         'chainOfTrust': True,
                     },
--- a/taskcluster/taskgraph/actions/retrigger.py
+++ b/taskcluster/taskgraph/actions/retrigger.py
@@ -46,21 +46,17 @@ logger = logging.getLogger(__name__)
                 'maximum': 6,
                 'title': 'Times',
                 'description': 'How many times to run each task.',
             }
         }
     }
 )
 def retrigger_action(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
 
     label = task['metadata']['name']
     with_downstream = ' '
     to_run = [label]
 
     if input.get('downstream'):
         to_run = full_task_graph.graph.transitive_closure(set(to_run), reverse=True).nodes
         to_run = to_run & set(label_to_taskid.keys())
--- a/taskcluster/taskgraph/actions/run_missing_tests.py
+++ b/taskcluster/taskgraph/actions/run_missing_tests.py
@@ -25,22 +25,18 @@ logger = logging.getLogger(__name__)
         "\n"
         "This action is for use on pushes that will be merged into another branch,"
         "to check that optimization hasn't hidden any failures."
     ),
     order=100,  # Useful for sheriffs, but not top of the list
     context=[],  # Applies to decision task
 )
 def run_missing_tests(parameters, input, task_group_id, task_id, task):
-    decision_task_id = find_decision_task(parameters)
-
-    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
-    _, full_task_graph = TaskGraph.from_json(full_task_graph)
+    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(parameters)
     target_tasks = get_artifact(decision_task_id, "public/target-tasks.json")
-    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
 
     # The idea here is to schedule all tasks of the `test` kind that were
     # targetted but did not appear in the final task-graph -- those were the
     # optimized tasks.
     to_run = []
     already_run = 0
     for label in target_tasks:
         task = full_task_graph.tasks[label]
--- a/taskcluster/taskgraph/actions/util.py
+++ b/taskcluster/taskgraph/actions/util.py
@@ -2,29 +2,50 @@
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 from taskgraph import create
+from taskgraph.decision import write_artifact
 from taskgraph.taskgraph import TaskGraph
 from taskgraph.optimize import optimize_task_graph
-from taskgraph.util.taskcluster import get_session, find_task_id
+from taskgraph.util.taskcluster import get_session, find_task_id, get_artifact, list_tasks
 
 
 def find_decision_task(parameters):
     """Given the parameters for this action, find the taskId of the decision
     task"""
     return find_task_id('gecko.v2.{}.pushlog-id.{}.decision'.format(
         parameters['project'],
         parameters['pushlog_id']))
 
 
+def fetch_graph_and_labels(parameters):
+    decision_task_id = find_decision_task(parameters)
+
+    # First grab the graph and labels generated during the initial decision task
+    full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
+    _, full_task_graph = TaskGraph.from_json(full_task_graph)
+    label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
+
+    # Now fetch any modifications made by action tasks and swap out new tasks
+    # for old ones
+    namespace = 'gecko.v2.{}.pushlog-id.{}.actions'.format(
+        parameters['project'],
+        parameters['pushlog_id'])
+    for action in list_tasks(namespace):
+        run_label_to_id = get_artifact(action, "public/label-to-taskid.json")
+        label_to_taskid.update(run_label_to_id)
+
+    return (decision_task_id, full_task_graph, label_to_taskid)
+
+
 def create_task_from_def(task_id, task_def, level):
     """Create a new task from a definition rather than from a label
     that is already in the full-task-graph. The task definition will
     have {relative-datestamp': '..'} rendered just like in a decision task.
     Use this for entirely new tasks or ones that change internals of the task.
     It is useful if you want to "edit" the full_task_graph and then hand
     it to this function. No dependencies will be scheduled. You must handle
     this yourself. Seeing how create_tasks handles it might prove helpful."""
@@ -44,9 +65,12 @@ def create_tasks(to_run, full_task_graph
     target_graph = full_task_graph.graph.transitive_closure(to_run)
     target_task_graph = TaskGraph(
         {l: full_task_graph[l] for l in target_graph.nodes},
         target_graph)
     optimized_task_graph, label_to_taskid = optimize_task_graph(target_task_graph,
                                                                 params,
                                                                 to_run,
                                                                 label_to_taskid)
+    write_artifact('task-graph.json', optimized_task_graph.to_json())
+    write_artifact('label-to-taskid.json', label_to_taskid)
+    write_artifact('to-run.json', list(to_run))
     create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -1,16 +1,17 @@
 # -*- coding: utf-8 -*-
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
+import datetime
 import functools
 import yaml
 import requests
 from mozbuild.util import memoize
 from requests.packages.urllib3.util.retry import Retry
 from requests.adapters import HTTPAdapter
 
 _TC_ARTIFACT_LOCATION = \
@@ -22,19 +23,22 @@ def get_session():
     session = requests.Session()
     retry = Retry(total=5, backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
     session.mount('http://', HTTPAdapter(max_retries=retry))
     session.mount('https://', HTTPAdapter(max_retries=retry))
     return session
 
 
-def _do_request(url):
+def _do_request(url, content=None):
     session = get_session()
-    response = session.get(url, stream=True)
+    if content is None:
+        response = session.get(url, stream=True)
+    else:
+        response = session.post(url, json=content)
     if response.status_code >= 400:
         # Consume content before raise_for_status, so that the connection can be
         # reused.
         response.content
     response.raise_for_status()
     return response
 
 
@@ -69,35 +73,53 @@ def get_artifact(task_id, path, use_prox
     return _handle_artifact(path, response)
 
 
 def list_artifacts(task_id, use_proxy=False):
     response = _do_request(get_artifact_url(task_id, '', use_proxy).rstrip('/'))
     return response.json()['artifacts']
 
 
-def get_index_url(index_path, use_proxy=False):
+def get_index_url(index_path, use_proxy=False, multiple=False):
     if use_proxy:
-        INDEX_URL = 'http://taskcluster/index/v1/task/{}'
+        INDEX_URL = 'http://taskcluster/index/v1/task{}/{}'
     else:
-        INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
-    return INDEX_URL.format(index_path)
+        INDEX_URL = 'https://index.taskcluster.net/v1/task{}/{}'
+    return INDEX_URL.format('s' if multiple else '', index_path)
 
 
 def find_task_id(index_path, use_proxy=False):
     response = _do_request(get_index_url(index_path, use_proxy))
     return response.json()['taskId']
 
 
 def get_artifact_from_index(index_path, artifact_path, use_proxy=False):
     full_path = index_path + '/artifacts/' + artifact_path
     response = _do_request(get_index_url(full_path, use_proxy))
     return _handle_artifact(full_path, response)
 
 
+def list_tasks(index_path, use_proxy=False):
+    results = []
+    data = {}
+    while True:
+        response = _do_request(get_index_url(index_path, use_proxy, multiple=True), data)
+        response = response.json()
+        results += response['tasks']
+        if response.get('continuationToken'):
+            data = {'continuationToken': response.get('continuationToken')}
+        else:
+            break
+
+    # Every action task is created with the same relative expiry ('1 year'),
+    # so sorting on the absolute expires timestamp orders them from the
+    results.sort(key=lambda t: datetime.datetime.strptime(t['expires'], '%Y-%m-%dT%H:%M:%S.%fZ'))
+    return [t['taskId'] for t in results]
+
+
 def get_task_url(task_id, use_proxy=False):
     if use_proxy:
         TASK_URL = 'http://taskcluster/queue/v1/task/{}'
     else:
         TASK_URL = 'https://queue.taskcluster.net/v1/task/{}'
     return TASK_URL.format(task_id)