Bug 1388407 - Fix timeouts in action-task graph submission draft
author: Brian Stack <bstack@mozilla.com>
Fri, 11 Aug 2017 15:31:43 -0700
changeset 645180 2b3d0637ec0258b582f61478a7826d4e0dd8cd44
parent 645074 80ff3f300e05f38f96c385b03d1973a966a2bd35
child 725840 7fca2d8553ed421966e741cb02c28d93135a6e20
push id: 73685
push user: bstack@mozilla.com
push date: Fri, 11 Aug 2017 22:32:36 +0000
bugs: 1388407
milestone: 57.0a1
Bug 1388407 - Fix timeouts in action-task graph submission MozReview-Commit-ID: 5hoFSaiYEXD
taskcluster/taskgraph/actions/add_new_jobs.py
taskcluster/taskgraph/actions/backfill.py
taskcluster/taskgraph/actions/mochitest_retrigger.py
taskcluster/taskgraph/actions/registry.py
taskcluster/taskgraph/actions/run_missing_tests.py
taskcluster/taskgraph/actions/util.py
taskcluster/taskgraph/create.py
--- a/taskcluster/taskgraph/actions/add_new_jobs.py
+++ b/taskcluster/taskgraph/actions/add_new_jobs.py
@@ -4,17 +4,17 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 from .registry import register_callback_action
 from slugid import nice as slugid
 
-from .util import (create_task, find_decision_task)
+from .util import (create_tasks, find_decision_task)
 from taskgraph.util.taskcluster import get_artifact
 from taskgraph.util.parameterization import resolve_task_references
 from taskgraph.taskgraph import TaskGraph
 
 
 @register_callback_action(
     name='add-new-jobs',
     title='Add new jobs',
@@ -37,22 +37,16 @@ from taskgraph.taskgraph import TaskGrap
 )
 def add_new_jobs_action(parameters, input, task_group_id, task_id, task):
     decision_task_id = find_decision_task(parameters)
 
     full_task_graph = get_artifact(decision_task_id, "public/full-task-graph.json")
     _, full_task_graph = TaskGraph.from_json(full_task_graph)
     label_to_taskid = get_artifact(decision_task_id, "public/label-to-taskid.json")
 
+    to_run = []
     for elem in input['tasks']:
         if elem in full_task_graph.tasks:
-            task = full_task_graph.tasks[elem]
-
-            # fix up the task's dependencies, similar to how optimization would
-            # have done in the decision
-            dependencies = {name: label_to_taskid[label]
-                            for name, label in task.dependencies.iteritems()}
-            task_def = resolve_task_references(task.label, task.task, dependencies)
-            task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
-            # actually create the new task
-            create_task(slugid(), task_def, parameters['level'])
+            to_run.append(elem)
         else:
             raise Exception('{} was not found in the task-graph'.format(elem))
+
+    create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
--- a/taskcluster/taskgraph/actions/backfill.py
+++ b/taskcluster/taskgraph/actions/backfill.py
@@ -7,22 +7,22 @@
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 
 import requests
 from slugid import nice as slugid
 
 from .registry import register_callback_action
-from .util import create_task
+from .util import find_decision_task, create_tasks
 from taskgraph.util.taskcluster import get_artifact_from_index
 from taskgraph.util.parameterization import resolve_task_references
 from taskgraph.taskgraph import TaskGraph
 
-PUSHLOG_TMPL = '{}json-pushes?version=2&startID={}&endID={}'
+PUSHLOG_TMPL = '{}/json-pushes?version=2&startID={}&endID={}'
 INDEX_TMPL = 'gecko.v2.{}.pushlog-id.{}.decision'
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     title='Backfill',
     name='backfill',
@@ -74,18 +74,17 @@ def backfill_action(parameters, input, t
     for push in pushes:
         full_task_graph = get_artifact_from_index(
                 INDEX_TMPL.format(parameters['project'], push),
                 'public/full-task-graph.json')
         _, full_task_graph = TaskGraph.from_json(full_task_graph)
         label_to_taskid = get_artifact_from_index(
                 INDEX_TMPL.format(parameters['project'], push),
                 'public/label-to-taskid.json')
+        push_params = get_artifact_from_index(
+                INDEX_TMPL.format(parameters['project'], push),
+                'public/parameters.yml')
+        push_decision_task_id = find_decision_task(push_params)
 
         if label in full_task_graph.tasks.keys():
-            task = full_task_graph.tasks[label]
-            dependencies = {name: label_to_taskid[label]
-                            for name, label in task.dependencies.iteritems()}
-            task_def = resolve_task_references(task.label, task.task, dependencies)
-            task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
-            create_task(slugid(), task_def, parameters['level'])
+            create_tasks([label], full_task_graph, label_to_taskid, push_params, push_decision_task_id)
         else:
             logging.info('Could not find {} on {}. Skipping.'.format(label, push))
--- a/taskcluster/taskgraph/actions/mochitest_retrigger.py
+++ b/taskcluster/taskgraph/actions/mochitest_retrigger.py
@@ -6,17 +6,17 @@
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import logging
 
 from slugid import nice as slugid
 
-from .util import (find_decision_task, create_task)
+from .util import (find_decision_task, create_task_from_def)
 from .registry import register_callback_action
 from taskgraph.util.taskcluster import get_artifact
 from taskgraph.util.parameterization import resolve_task_references
 from taskgraph.taskgraph import TaskGraph
 
 TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
 
 logger = logging.getLogger(__name__)
@@ -135,9 +135,9 @@ def mochitest_retrigger_action(parameter
 
     # tweak the treeherder symbol
     new_task_definition['extra']['treeherder']['symbol'] += '-custom'
 
     logging.info("New task definition: %s", new_task_definition)
 
     # actually create the new task
     new_task_id = slugid()
-    create_task(new_task_id, new_task_definition, parameters['level'])
+    create_task_from_def(new_task_id, new_task_definition, parameters['level'])
--- a/taskcluster/taskgraph/actions/registry.py
+++ b/taskcluster/taskgraph/actions/registry.py
@@ -8,19 +8,19 @@ from __future__ import absolute_import, 
 
 import json
 import os
 import inspect
 import re
 from mozbuild.util import memoize
 from types import FunctionType
 from collections import namedtuple
+from taskgraph import create
 from taskgraph.util.docker import docker_image
 from taskgraph.parameters import Parameters
-from . import util
 
 
 GECKO = os.path.realpath(os.path.join(__file__, '..', '..', '..'))
 
 actions = []
 callbacks = {}
 
 Action = namedtuple('Action', [
@@ -304,17 +304,17 @@ def trigger_action_callback(task_group_i
     the action callback in testing mode, without actually creating tasks.
     """
     cb = get_callbacks().get(callback, None)
     if not cb:
         raise Exception('Unknown callback: {}. Known callbacks: {}'.format(
             callback, get_callbacks().keys()))
 
     if test:
-        util.testing = True
+        create.testing = True
 
     cb(Parameters(**parameters), input, task_group_id, task_id, task)
 
 
 @memoize
 def _load():
     # Load all modules from this folder, relying on the side-effects of register_
     # functions to populate the action registry.
--- a/taskcluster/taskgraph/actions/run_missing_tests.py
+++ b/taskcluster/taskgraph/actions/run_missing_tests.py
@@ -5,17 +5,17 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import logging
 from slugid import nice as slugid
 
 from .registry import register_callback_action
-from .util import create_task, find_decision_task
+from .util import create_tasks, find_decision_task
 from taskgraph.util.taskcluster import get_artifact
 from taskgraph.util.parameterization import resolve_task_references
 from taskgraph.taskgraph import TaskGraph
 
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
@@ -46,22 +46,14 @@ def run_missing_tests(parameters, input,
     already_run = 0
     for label in target_tasks:
         task = full_task_graph.tasks[label]
         if task.kind != 'test':
             continue  # not a test
         if label in label_to_taskid:
             already_run += 1
             continue
-        to_run.append(task)
-
-    for task in to_run:
+        to_run.append(label)
 
-        # fix up the task's dependencies, similar to how optimization would
-        # have done in the decision
-        dependencies = {name: label_to_taskid[label]
-                        for name, label in task.dependencies.iteritems()}
-        task_def = resolve_task_references(task.label, task.task, dependencies)
-        task_def.setdefault('dependencies', []).extend(dependencies.itervalues())
-        create_task(slugid(), task_def, parameters['level'])
+    create_tasks(to_run, full_task_graph, label_to_taskid, parameters, decision_task_id)
 
     logger.info('Out of {} test tasks, {} already existed and the action created {}'.format(
         already_run + len(to_run), already_run, len(to_run)))
--- a/taskcluster/taskgraph/actions/util.py
+++ b/taskcluster/taskgraph/actions/util.py
@@ -5,35 +5,47 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import sys
 
 from taskgraph import create
+from taskgraph.taskgraph import TaskGraph
+from taskgraph.optimize import optimize_task_graph
 from taskgraph.util.taskcluster import get_session, find_task_id
 
-# this is set to true for `mach taskgraph action-callback --test`
-testing = False
-
 
 def find_decision_task(parameters):
     """Given the parameters for this action, find the taskId of the decision
     task"""
     return find_task_id('gecko.v2.{}.pushlog-id.{}.decision'.format(
         parameters['project'],
         parameters['pushlog_id']))
 
 
-def create_task(task_id, task_def, level):
-    """Create a new task.  The task definition will have {relative-datestamp':
-    '..'} rendered just like in a decision task.  Action callbacks should use
-    this function to create new tasks, as it has the additional advantage of
-    allowing easy debugging with `mach taskgraph action-callback --test`."""
+def create_task_from_def(task_id, task_def, level):
+    """Create a new task from a definition rather than from a label
+    that is already in the full-task-graph. Use this for entirely
+    new tasks or ones that change internals of the task.
+    No dependencies will be scheduled."""
     task_def['schedulerId'] = 'gecko-level-{}'.format(level)
-    if testing:
-        json.dump([task_id, task_def], sys.stdout,
-                  sort_keys=True, indent=4, separators=(',', ': '))
-        return
     label = task_def['metadata']['name']
     session = get_session()
     create.create_task(session, task_id, label, task_def)
+
+
+def create_tasks(to_run, full_task_graph, label_to_taskid, params, decision_task_id):
+    """Create new tasks.  The task definition will have {relative-datestamp':
+    '..'} rendered just like in a decision task.  Action callbacks should use
+    this function to create new tasks,
+    allowing easy debugging with `mach taskgraph action-callback --test`.
+    This builds up all required tasks to run in order to run the tasks requested."""
+    to_run = set(to_run)
+    target_graph = full_task_graph.graph.transitive_closure(to_run)
+    target_task_graph = TaskGraph(
+        {l: full_task_graph[l] for l in target_graph.nodes},
+        target_graph)
+    optimized_task_graph, label_to_taskid = optimize_task_graph(target_task_graph,
+                                                                params,
+                                                                to_run)
+    create.create_tasks(optimized_task_graph, label_to_taskid, params, decision_task_id)
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -4,42 +4,46 @@
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import concurrent.futures as futures
 import requests
 import requests.adapters
 import json
 import os
+import sys
 import logging
 
 from slugid import nice as slugid
 from taskgraph.util.parameterization import resolve_timestamps
 from taskgraph.util.time import current_json_time
 
 logger = logging.getLogger(__name__)
 
 # the maximum number of parallel createTask calls to make
 CONCURRENCY = 50
 
+# this is set to true for `mach taskgraph action-callback --test`
+testing = False
 
-def create_tasks(taskgraph, label_to_taskid, params):
+
+def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
     taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}
 
     session = requests.Session()
 
     # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
     # that limit. Connections are established as needed, so using a large value
     # should not negatively impact performance.
     http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
                                                  pool_maxsize=CONCURRENCY)
     session.mount('https://', http_adapter)
     session.mount('http://', http_adapter)
 
-    decision_task_id = os.environ.get('TASK_ID')
+    decision_task_id = decision_task_id or os.environ.get('TASK_ID')
 
     # when running as an actual decision task, we use the decision task's
     # taskId as the taskGroupId.  The process that created the decision task
     # helpfully placed it in this same taskGroup.  If there is no $TASK_ID,
     # fall back to a slugid
     task_group_id = decision_task_id or slugid()
     scheduler_id = 'gecko-level-{}'.format(params['level'])
 
@@ -94,16 +98,21 @@ def create_tasks(taskgraph, label_to_tas
 def create_task(session, task_id, label, task_def):
     # create the task using 'http://taskcluster/queue', which is proxied to the queue service
     # with credentials appropriate to this job.
 
     # Resolve timestamps
     now = current_json_time(datetime_format=True)
     task_def = resolve_timestamps(now, task_def)
 
+    if testing:
+        json.dump([task_id, task_def], sys.stdout,
+                  sort_keys=True, indent=4, separators=(',', ': '))
+        return
+
     logger.debug("Creating task with taskId {} for {}".format(task_id, label))
     res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
                       data=json.dumps(task_def))
     if res.status_code != 200:
         try:
             logger.error(res.json()['message'])
         except:
             logger.error(res.text)