Bug 1258497: first draft of mach taskgraph and subcommands draft
authorDustin J. Mitchell <dustin@mozilla.com>
Mon, 02 May 2016 15:14:31 +0000
changeset 358381 9c24d54c92a4c05312f0bf370ea4d59a3db6b6e2
parent 357902 01067412db32dd8eee38f64e6d66942b437fa924
child 358382 e437ab047f993f04e0f596914475bedeaa087daf
push id16995
push userdmitchell@mozilla.com
push dateMon, 02 May 2016 18:47:33 +0000
bugs1258497
milestone49.0a1
Bug 1258497: first draft of mach taskgraph and subcommands This adds a new suite of mach commands, `mach taskgraph`, along with an implementation of a task-graph generation algorithm, tests, and documentation. MozReview-Commit-ID: 1d02LGjGfMd
build/mach_bootstrap.py
config/mozunit.py
moz.build
taskcluster/PARAMETERS.md
taskcluster/ci/legacy/kind.yml
taskcluster/mach_commands.py
taskcluster/taskgraph/__init__.py
taskcluster/taskgraph/generator.py
taskcluster/taskgraph/graph.py
taskcluster/taskgraph/kind/__init__.py
taskcluster/taskgraph/kind/base.py
taskcluster/taskgraph/kind/legacy.py
taskcluster/taskgraph/test/__init__.py
taskcluster/taskgraph/test/test_generator.py
taskcluster/taskgraph/test/test_graph.py
taskcluster/taskgraph/types.py
testing/taskcluster/mach_commands.py
testing/taskcluster/taskcluster_graph/mach_util.py
--- a/build/mach_bootstrap.py
+++ b/build/mach_bootstrap.py
@@ -71,16 +71,17 @@ SEARCH_PATHS = [
     'python/slugid',
     'build',
     'config',
     'dom/bindings',
     'dom/bindings/parser',
     'dom/media/test/external',
     'layout/tools/reftest',
     'other-licenses/ply',
+    'taskcluster',
     'testing',
     'testing/firefox-ui/harness',
     'testing/firefox-ui/tests',
     'testing/luciddream',
     'testing/marionette/harness',
     'testing/marionette/harness/marionette/runner/mixins/browsermob-proxy-py',
     'testing/marionette/client',
     'testing/mozbase/mozcrash',
@@ -123,16 +124,17 @@ MACH_MODULES = [
     'python/mach/mach/commands/settings.py',
     'python/compare-locales/mach_commands.py',
     'python/mozboot/mozboot/mach_commands.py',
     'python/mozbuild/mozbuild/mach_commands.py',
     'python/mozbuild/mozbuild/backend/mach_commands.py',
     'python/mozbuild/mozbuild/compilation/codecomplete.py',
     'python/mozbuild/mozbuild/frontend/mach_commands.py',
     'services/common/tests/mach_commands.py',
+    'taskcluster/mach_commands.py',
     'testing/firefox-ui/mach_commands.py',
     'testing/luciddream/mach_commands.py',
     'testing/mach_commands.py',
     'testing/marionette/mach_commands.py',
     'testing/mochitest/mach_commands.py',
     'testing/mozharness/mach_commands.py',
     'testing/talos/mach_commands.py',
     'testing/taskcluster/mach_commands.py',
--- a/config/mozunit.py
+++ b/config/mozunit.py
@@ -162,10 +162,10 @@ class MockedOpen(object):
             return True
 
         abspath = os.path.abspath(p)
         if abspath in self.files:
             return True
 
         return self._orig_path_exists(p)
 
-def main(*args):
-    unittest.main(testRunner=MozTestRunner(),*args)
+def main(*args, **kwargs):
+    unittest.main(testRunner=MozTestRunner(), *args, **kwargs)
--- a/moz.build
+++ b/moz.build
@@ -17,16 +17,17 @@ CONFIGURE_SUBST_FILES += [
 
 if CONFIG['ENABLE_CLANG_PLUGIN']:
     DIRS += ['build/clang-plugin']
 
 DIRS += [
     'config',
     'python',
     'testing',
+    'taskcluster',
 ]
 
 if not CONFIG['JS_STANDALONE']:
     CONFIGURE_SUBST_FILES += [
         'tools/update-packaging/Makefile',
     ]
     CONFIGURE_DEFINE_FILES += [
         'mozilla-config.h',
new file mode 100644
--- /dev/null
+++ b/taskcluster/PARAMETERS.md
@@ -0,0 +1,76 @@
+Task-Graph Generation Parameters
+================================
+
+Task-graph generation takes a collection of parameters as input, in the form of
+a JSON object.
+
+During decision-task processing, some of these parameters are supplied on the
+command line or by environment variables.  It helpfully produces a full
+parameters file as one of its output artifacts.  The other `mach taskgraph`
+commands can take this file as input.  This can be very helpful when working on
+a change to the task graph.
+
+The properties of the parameters object are described here, divided roughly by
+topic.
+
+Push Information
+----------------
+
+* `base_repository` - the repository from which to do an initial clone, utilizing
+  any available caching.
+
+* `head_repository` - the repository containing the changeset to be built.  This
+  may differ from `base_repository` in cases where `base_repository` is likely
+  to be cached and only a few additional commits are needed from
+  `head_repository`.
+
+* `head_rev` - the revision to check out; this can be a short revision string
+
+* `head_ref` - for Mercurial repositories, this is the same as `head_rev`.  For
+  git repositories, which do not allow pulling explicit revisions, this gives
+  the symbolic ref containing `head_rev` that should be pulled from
+  `head_repository`.
+
+* `revision_hash` - the Treeherder revision hash to which results should be
+  attached; this is distinct from `head_rev`, which identifies the source
+  revision itself
+
+* `owner` - email address indicating the person who made the push.  Note that this
+  value may be forged and *must not* be relied on for authentication.
+
+* `message` (optional) - the commit message
+
+* `pushlog_id` (optional) - the ID from the `hg.mozilla.org` pushlog
+
+Tree Information
+----------------
+
+* `project` - another name for what may otherwise be called tree or branch or
+  repository.  This is the unqualified name, such as `mozilla-central` or
+  `cedar`.
+
+* `level` - the SCM level associated with this tree.  This dictates the names
+  of resources used in the generated tasks, and those tasks will fail if it
+  is incorrect.
+
+Target Set
+----------
+
+The "target set" is the set of task labels which must be included in a task
+graph.  The task graph generation process will include any tasks required by
+those in the target set, recursively.  In a decision task, this set can be
+specified programmatically using one of a variety of methods (e.g., parsing try
+syntax or reading a project-specific configuration file).
+
+The decision task writes its target task set to the `target_tasks.json`
+artifact, and this can be copied into `parameters['target_tasks']` for
+debugging with other `mach taskgraph` commands.
+
+* `target_tasks_method` (optional) - the method to use to determine the target
+  task set.  This is the suffix of one of the `target_tasks_xxx` methods in
+  `taskcluster/mach_commands.py`.  If omitted, all tasks are targeted.
+
+* `target_tasks` (optional) - the target tasks method `from_parameters` reads
+  the target set, as a list of task labels, from this parameter.  The `mach
+  taskgraph` commands other than `decision` read from this value automatically,
+  if it is set.
new file mode 100644
--- /dev/null
+++ b/taskcluster/ci/legacy/kind.yml
@@ -0,0 +1,2 @@
+implementation: 'taskgraph.kind.legacy:LegacyKind'
+legacy_path: '../../../testing/taskcluster'
new file mode 100644
--- /dev/null
+++ b/taskcluster/mach_commands.py
@@ -0,0 +1,290 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import logging
+import os
+import json
+import copy
+import re
+import sys
+import time
+import textwrap
+from collections import namedtuple
+
+from mach.decorators import (
+    CommandArgument,
+    CommandProvider,
+    Command,
+    SubCommand,
+)
+
+from mozbuild.base import MachCommandBase
+
+ROOT = os.path.dirname(os.path.realpath(__file__))
+TOPSRCDIR = os.path.realpath(os.path.join(ROOT, '..'))
+
+
+class TaskGraphSubCommand(SubCommand):
+    """A SubCommand with TaskGraph-specific arguments"""
+
+    def __call__(self, func):
+        after = SubCommand.__call__(self, func)
+        args = [
+            CommandArgument('--root', '-r', default='taskcluster/ci',
+                            help="root of the taskgraph definition relative to topsrcdir"),
+            CommandArgument('--parameters', '-p', required=False,
+                            help="parameters file (.yml or .json; see "
+                                 "`taskcluster/PARAMETERS.md`)"),
+            CommandArgument('--no-optimize', dest="optimize", action="store_false",
+                            default=True,  # boolean, not the string "true"
+                            help="do not remove tasks from the graph that are found in the "
+                            "index (a.k.a. optimize the graph)"),
+        ]
+        for arg in args:
+            after = arg(after)
+        return after
+
+
+@CommandProvider
+class MachCommands(MachCommandBase):
+
+    @Command('taskgraph', category="ci",
+             description="Manipulate TaskCluster task graphs defined in-tree")
+    def taskgraph(self):
+        """The taskgraph subcommands all relate to the generation of task graphs
+        for Gecko continuous integration.  A task graph is a set of tasks linked
+        by dependencies: for example, a binary must be built before it is tested,
+        and that build may further depend on various toolchains, libraries, etc.
+        """
+
+    @SubCommand('taskgraph', 'python-tests',
+                'Run the taskgraph unit tests')
+    def taskgraph_python_tests(self, **options):
+        import unittest
+        suite = unittest.defaultTestLoader.discover('taskgraph.test')
+        runner = unittest.TextTestRunner(verbosity=2)
+        result = runner.run(suite)
+        if not result.wasSuccessful():
+            sys.exit(1)
+
+    @TaskGraphSubCommand('taskgraph', 'tasks',
+                         "Show all tasks in the taskgraph")
+    def taskgraph_tasks(self, **options):
+        parameters = self.load_parameters_file(options)
+        tgg = self.get_taskgraph_generator(options, parameters)
+        self.show_taskgraph(tgg.full_task_set, options)
+
+    @TaskGraphSubCommand('taskgraph', 'full',
+                         "Show the full taskgraph")
+    def taskgraph_full(self, **options):
+        parameters = self.load_parameters_file(options)
+        tgg = self.get_taskgraph_generator(options, parameters)
+        self.show_taskgraph(tgg.full_task_graph, options)
+
+    @TaskGraphSubCommand('taskgraph', 'target',
+                         "Show the target task set")
+    def taskgraph_target(self, **options):
+        parameters = self.load_parameters_file(options)
+        tgg = self.get_taskgraph_generator(options, parameters)
+        self.set_target_tasks(tgg, parameters)
+        self.show_taskgraph(tgg.target_task_set, options)
+
+    @TaskGraphSubCommand('taskgraph', 'target-graph',
+                         "Show the target taskgraph (the target set with its dependencies)")
+    def taskgraph_target_taskgraph(self, **options):
+        parameters = self.load_parameters_file(options)
+        tgg = self.get_taskgraph_generator(options, parameters)
+        self.set_target_tasks(tgg, parameters)
+        self.show_taskgraph(tgg.target_task_graph, options)
+
+    @TaskGraphSubCommand('taskgraph', 'optimized',
+                         "Show the optimized taskgraph")
+    def taskgraph_optimized(self, **options):
+        parameters = self.load_parameters_file(options)
+        tgg = self.get_taskgraph_generator(options, parameters)
+        self.set_target_tasks(tgg, parameters)
+        self.show_taskgraph(tgg.optimized_task_graph, options)
+
+    @TaskGraphSubCommand('taskgraph', 'decision', textwrap.dedent("""\
+                         Decision task: generate a task graph and submit to TaskCluster.
+                         """))
+    @CommandArgument('--base-repository',
+        default=os.environ.get('GECKO_BASE_REPOSITORY'),
+        help='URL for "base" repository to clone ($GECKO_BASE_REPOSITORY)')
+    @CommandArgument('--head-repository',
+        default=os.environ.get('GECKO_HEAD_REPOSITORY'),
+        help='URL for "head" repository to fetch revision from '
+             '($GECKO_HEAD_REPOSITORY)')
+    @CommandArgument('--head-ref',
+        default=os.environ.get('GECKO_HEAD_REF'),
+        help='Reference (this is same as rev usually for hg) ($GECKO_HEAD_REF)')
+    @CommandArgument('--head-rev',
+        default=os.environ.get('GECKO_HEAD_REV'),
+        help='Commit revision to use from head repository ($GECKO_HEAD_REV)')
+    @CommandArgument('--message',
+        help='Commit message to be parsed. Example: "try: -b do -p all -u all"')
+    @CommandArgument('--revision-hash',
+            required=False,
+            help='Treeherder revision hash to attach results to')
+    @CommandArgument('--project',
+        required=True,
+        help='Project to use for creating task graph. Example: --project=try')
+    @CommandArgument('--pushlog-id',
+        dest='pushlog_id',
+        required=False,
+        default=0)
+    @CommandArgument('--owner',
+        required=True,
+        help='email address of who owns this graph')
+    @CommandArgument('--level',
+        default="1",
+        help='SCM level of this repository')
+    def taskgraph_decision(self, **options):
+        # load parameters from env vars, command line, etc.
+        parameters = self.get_decision_parameters(options)
+
+        # create a TaskGraphGenerator instance
+        import taskgraph.generator
+        tgg = taskgraph.generator.TaskGraphGenerator(
+            root=options['root'],
+            log=self.log,
+            parameters=parameters,
+            optimization_finder=None)  # XXX
+
+        # produce some artifacts
+        def write_artifact(filename, data):
+            self.log(logging.INFO, 'writing-artifact', {
+                'filename': filename,
+            }, 'writing artifact file `{filename}`')
+            if not os.path.isdir('artifacts'):
+                os.mkdir('artifacts')
+            path = os.path.join('artifacts', filename)
+            if filename.endswith('.yml'):
+                import yaml
+                yaml.dump(data, open(path, 'w'), allow_unicode=True, default_flow_style=False)
+            elif filename.endswith('.json'):
+                json.dump(data, open(path, 'w'),
+                          sort_keys=True, indent=2, separators=(',', ': '))
+            else:
+                raise TypeError("Don't know how to write to {}".format(filename))
+
+        # generate the target_tasks list and write it as an artifact in case someone
+        # wants to reproduce this run
+        target_tasks = self.set_target_tasks(tgg, parameters)
+        if target_tasks:
+            write_artifact('target_tasks.json', target_tasks)
+
+        # write out the parameters used to generate this graph
+        write_artifact('parameters.yml', parameters)
+
+        # write out the full graph for reference
+        write_artifact('full-task-graph.json',
+                       self.taskgraph_to_json(tgg.full_task_graph))
+
+        # write out the optimized task graph to describe what will happen
+        write_artifact('task-graph.json',
+                       self.taskgraph_to_json(tgg.optimized_task_graph))
+
+        # TODO: call the taskcluster API to create the tasks in the optimized graph
+
+    ##
+    # Parameter handling
+
+    def load_parameters_file(self, options):
+        filename = options['parameters']
+        if not filename:
+            return {}
+        file = open(filename)
+        if filename.endswith('.yml'):
+            import yaml
+            return yaml.load(file)
+        elif filename.endswith('.json'):
+            return json.load(file)
+        else:
+            print("Parameters file `{}` is not JSON or YAML".format(filename))
+            sys.exit(1)
+
+    def get_decision_parameters(self, options):
+        parameters = self.load_parameters_file(options)
+
+        # override some parameters from command-line options
+        option_names = [
+            'base_repository',
+            'head_repository',
+            'head_rev',
+            'head_ref',
+            'revision_hash',
+            'message',
+            'project',
+            'pushlog_id',
+            'owner',
+            'level',
+        ]
+        for n in option_names:
+            if options[n]:
+                parameters[n] = options[n]
+
+        return parameters
+
+    ##
+    # Target tasks methods
+
+    def set_target_tasks(self, tgg, parameters):
+        """If params['target_tasks_method'] is set, use it to determine the
+        target task set, update the task graph with that set, and return it.  Note
+        that as a side-effect, this generates the full task set."""
+        target_tasks_method = parameters.get('target_tasks_method')
+        if target_tasks_method:
+            meth = getattr(self, 'target_tasks_' + target_tasks_method)
+            target_tasks = meth(tgg.full_task_graph, parameters)
+            tgg.set_target_tasks(target_tasks)
+            return target_tasks
+
+    def target_tasks_from_parameters(self, full_task_graph, parameters):
+        """Get the target task set from parameters['target_tasks'].  This is
+        useful for re-running a decision task with the same target set as in an
+        earlier run, by copying `target_tasks.json` into `parameters.yml`."""
+        return parameters['target_tasks']
+
+    def target_tasks_try(self, full_task_graph, parameters):
+        pass
+
+    ##
+    # Utilities
+
+    def get_taskgraph_generator(self, options, parameters):
+        import taskgraph.generator
+        if options['optimize']:
+            optimization_finder = None  # XXX function that searches index
+        else:
+            # null optimization finder
+            optimization_finder = lambda keys: {}
+        tgg = taskgraph.generator.TaskGraphGenerator(
+            root=options['root'],
+            log=self.log,
+            parameters=parameters,
+            optimization_finder=optimization_finder)
+        if 'target_tasks' in parameters:
+            tgg.set_target_tasks(parameters['target_tasks'])
+        return tgg
+
+    def show_taskgraph(self, taskgraph, options):
+        # TODO: optionally output in dot format, select fields, filter, etc.
+        for label in taskgraph.graph.visit_postorder():
+            print(taskgraph.tasks[label])
+
+    def taskgraph_to_json(self, taskgraph):
+        dep_links = taskgraph.graph.links_dict()
+        tasks = taskgraph.tasks
+        def tojson(task):
+            return {
+                'task': task.task,
+                'attributes': list(task.attributes),
+                'dependencies': list(dep_links[task.label]), # XXX
+            }
+        return {label: tojson(tasks[label]) for label in taskgraph.graph.nodes}
new file mode 100644
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/generator.py
@@ -0,0 +1,205 @@
+from __future__ import print_function
+import logging
+import os
+import yaml
+import functools
+
+from taskgraph.graph import Graph
+from taskgraph.types import Task, TaskGraph
+
+class TaskGraphGenerator(object):
+    """
+    The central controller for taskgraph.  This handles all phases of graph
+    generation.
+
+    Access to the results of this generation, at various phases, is available
+    via properties.  This encourages the provision of all generation inputs at
+    instance construction time.  The exception is `target_tasks`, which can be
+    set at any time until `target_task_set` is accessed; this allows the target
+    tasks to be determined based on `full_task_graph`.
+    """
+
+    # Task-graph generation is implemented as a Python generator that yields
+    # each "phase" of generation.  This allows some mach subcommands to short-
+    # circuit generation of the entire graph by never completing the generator.
+
+    def __init__(self, root, log, parameters, optimization_finder):
+        self.root = root
+        self.log = log
+        self.parameters = parameters
+        self.optimization_finder = optimization_finder
+
+        # this can be set up until the time the target task set is generated;
+        # it defaults to parameters['target_tasks']
+        self._target_tasks = parameters.get('target_tasks')
+
+        # start the generator
+        self._run = self._run()
+        self._run_results = {}
+
+    @property
+    def full_task_set(self):
+        """
+        The full task set: all tasks defined by any kind (a graph without edges)
+
+        @type: TaskGraph
+        """
+        return self._run_until('full_task_set')
+
+
+    @property
+    def full_task_graph(self):
+        """
+        The full task graph: the full task set, with edges representing
+        dependencies.
+
+        @type: TaskGraph
+        """
+        return self._run_until('full_task_graph')
+
+    def set_target_tasks(self, target_tasks):
+        if 'target_task_set' in self._run_results:
+            raise RuntimeError("target_task_set has already been generated; "
+                               "it's too late to set target_tasks.")
+        self._target_tasks = target_tasks
+
+    @property
+    def target_task_set(self):
+        """
+        The set of targeted tasks (a graph without edges)
+
+        @type: TaskGraph
+        """
+        return self._run_until('target_task_set')
+
+    @property
+    def target_task_graph(self):
+        """
+        The set of targeted tasks and all of their dependencies
+
+        @type: TaskGraph
+        """
+        return self._run_until('target_task_graph')
+
+    @property
+    def optimized_task_graph(self):
+        """
+        The set of targeted tasks and all of their dependencies; tasks that
+        have been optimized out are either omitted or replaced with a Task
+        instance containing only a task_id.
+
+        @type: TaskGraph
+        """
+        return self._run_until('optimized_task_graph')
+
+    def _load_kinds(self):
+        for path in os.listdir(self.root):
+            path = os.path.join(self.root, path)
+            if not os.path.isdir(path):
+                continue
+            name = os.path.basename(path)
+            self.log(logging.DEBUG, 'loading-kind', {
+                'name': name,
+                'path': path,
+            }, "loading kind `{name}` from {path}")
+
+            kind_yml = os.path.join(path, 'kind.yml')
+            config = yaml.load(open(kind_yml))
+
+            # load the class defined by implementation
+            try:
+                impl = config['implementation']
+            except KeyError:
+                raise KeyError("{!r} does not define implementation".format(kind_yml))
+            if impl.count(':') != 1:
+                raise TypeError('{!r} implementation does not have the form "module:object"'
+                                .format(kind_yml))
+
+            impl_module, impl_object = impl.split(':')
+            impl_class = __import__(impl_module)
+            for a in impl_module.split('.')[1:]:
+                impl_class = getattr(impl_class, a)
+            for a in impl_object.split('.'):
+                impl_class = getattr(impl_class, a)
+
+            yield impl_class(path, config, self.log)
+
+    def _run(self):
+        all_tasks = {}
+        for kind in self._load_kinds():
+            for task in kind.load_tasks(self.parameters):
+                if task.label in all_tasks:
+                    raise KeyError("duplicate tasks with label " + task.label)
+                all_tasks[task.label] = task
+
+        full_task_set = TaskGraph(all_tasks, Graph(set(all_tasks), set()))
+        yield 'full_task_set', full_task_set
+
+        edges = set()
+        for t in full_task_set:
+            for dep, depname in t.kind.get_task_dependencies(t, full_task_set):
+                edges.add((t.label, dep, depname))
+
+        full_task_graph = TaskGraph(all_tasks,
+                                    Graph(full_task_set.graph.nodes, edges))
+        yield 'full_task_graph', full_task_graph
+
+        if self._target_tasks is not None:
+            target_tasks = set(self._target_tasks)
+        else:
+            # target everything (this is not the common case)
+            target_tasks = full_task_graph.graph.nodes
+
+        target_task_set = TaskGraph(
+            {l: all_tasks[l] for l in target_tasks},
+            Graph(target_tasks, set()))
+        yield 'target_task_set', target_task_set
+
+        target_graph = full_task_graph.graph.transitive_closure(target_tasks)
+        target_task_graph = TaskGraph(
+            {l: all_tasks[l] for l in target_graph.nodes},
+            target_graph)
+        yield 'target_task_graph', target_task_graph
+
+        # TODO: optimization
+#        def visit(task):
+#            task.optimization_key = task.kind.get_task_optimization_key(task, target_task_graph)
+#        target_task_graph.depth_first(visit)
+#
+#        # look up all tasks in the index at once
+#        optimization_keys = filter(None, (t.optimization_key for t in target_task_graph))
+#        optimizations = self.optimization_finder(optimization_keys)
+#
+#        # assign the taskId for each task that has been successfully optimized away
+#        for task in target_task_graph:
+#            task_id = optimizations.get(task.optimization_key)
+#            if task_id:
+#                task.task_id = task_id
+#
+#        # now replace any dependencies on tasks that have been optimized away with
+#        # a direct dependency on the taskId TODO link this properly, rather than relying
+#        # on KeyError
+#        def replace(label, name):
+#            task_id = target_task_graph[label].task_id
+#            if task_id:
+#                return task_id, name
+#            else:
+#                return label, name
+#        for task in target_task_graph:
+#            task.dependencies = [replace(l, n) for l, n in task.dependencies]
+#
+#        # re-compute the transitive closure of the subset of the graph.  Note
+#        # that this may include optimized tasks if they are in the
+#        # target_task_set.
+#        optimized_task_graph = target_task_graph.transitive_closure(self.target_task_set)
+#
+        yield 'optimized_task_graph', target_task_graph
+
+    def _run_until(self, name):
+        while name not in self._run_results:
+            try:
+                k, v = self._run.next()
+            except StopIteration:
+                raise AttributeError("No such run result {}".format(name))
+            self._run_results[k] = v
+        return self._run_results[name]
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/graph.py
@@ -0,0 +1,105 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import collections
+
+class Graph(object):
+    """
+    Generic representation of a directed acyclic graph with labeled edges.
+    Graph operations are implemented in a functional manner, so the data
+    structure is immutable.
+
+    It permits at most one edge of a given name between any set of nodes.  The
+    graph is not checked for cycles, and methods may hang or otherwise fail if
+    given a cyclic graph.
+
+    The `nodes` and `edges` attributes may be accessed in a read-only fashion.
+    The `nodes` attribute is a set of node names, while `edges` is a set of
+    `(start, end, label)` tuples.
+    """
+
+    __slots__ = ['nodes', 'edges']
+
+    def __init__(self, nodes, edges):
+        """
+        Create a graph.  Nodes is a set of node names, while edges is a set of
+        (start, end, name) tuples.  Both values are used by reference,
+        and should not be modified after building a graph.
+        """
+        assert isinstance(nodes, set)
+        assert isinstance(edges, set)
+        self.nodes = nodes
+        self.edges = edges
+
+    def __eq__(self, other):
+        return self.nodes == other.nodes and self.edges == other.edges
+
+    def __repr__(self):
+        return "<Graph nodes={!r} edges={!r}>".format(self.nodes, self.edges)
+
+    def transitive_closure(self, nodes):
+        """
+        Return the transitive closure of <nodes>: the graph containing all
+        specified nodes as well as any nodes reachable from them, and any
+        intervening edges.
+        """
+        assert isinstance(nodes, set)
+        assert nodes <= self.nodes
+
+        # generate a new graph by expanding along edges until reaching a fixed
+        # point
+        new_nodes, new_edges = nodes, set()
+        nodes, edges = set(), set()
+        while (new_nodes, new_edges) != (nodes, edges):
+            nodes, edges = new_nodes, new_edges
+            add_edges = set((l, r, n) for (l, r, n) in self.edges if l in nodes)
+            add_nodes = set(r for (l, r, n) in add_edges)
+            new_nodes = nodes | add_nodes
+            new_edges = edges | add_edges
+        return Graph(new_nodes, new_edges)
+
+    def visit_postorder(self):
+        """
+        Generate a sequence of nodes in postorder, such that every node is
+        visited *after* any nodes it links to.
+
+        Behavior is undefined (read: it will hang) if the graph contains a
+        cycle.
+        """
+        queue = collections.deque(self.nodes)
+        links_by_node = self.links_dict()
+        seen = set()
+        while queue:
+            node = queue.popleft()
+            if node in seen:
+                continue
+            links = links_by_node[node]
+            if all((n in seen) for n in links):
+                seen.add(node)
+                yield node
+            else:
+                for n in links:
+                    queue.append(n)
+                queue.append(node)
+
+    def links_dict(self):
+        """
+        Return a dictionary mapping each node to a set of its downstream
+        nodes (omitting edge names)
+        """
+        links = collections.defaultdict(lambda: set())
+        for l, r, n in self.edges:
+            links[l].add(r)
+        return links
+
+    def reverse_links_dict(self):
+        """
+        Return a dictionary mapping each node to a set of its upstream
+        nodes (omitting edge names)
+        """
+        links = collections.defaultdict(lambda: set())
+        for l, r, n in self.edges:
+            links[r].add(l)
+        return links
new file mode 100644
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/kind/base.py
@@ -0,0 +1,54 @@
+import os
+import abc
+
+class Kind(object):
+    """
+    A kind represents a collection of tasks that share common characteristics.
+    For example, all build jobs.  Each instance of a kind is initialized with a
+    path from which it draws its task configuration.  The instance is free to
+    store as much local state as it needs.
+    """
+    __metaclass__ = abc.ABCMeta
+
+    def __init__(self, path, config, log):
+        self.name = os.path.basename(path)
+        self.path = path
+        self.config = config
+        self.log = log
+
+    @abc.abstractmethod
+    def load_tasks(self, parameters):
+        """
+        Get the set of tasks of this kind.
+
+        The `parameters` give details on which to base the task generation.
+        See `taskcluster/PARAMETERS.md` for details.
+
+        The return value is a list of Task instances.
+        """
+
+    @abc.abstractmethod
+    def get_task_dependencies(self, task, taskgraph):
+        """
+        Get the set of task labels this task depends on, by querying the task graph.
+
+        Returns a list of (task_label, dependency_name) pairs describing the
+        dependencies.
+        """
+
+    @abc.abstractmethod
+    def get_task_optimization_key(self, task, taskgraph):
+        """
+        Get the *optimization key* for the given task.  When called, all
+        dependencies of this task will already have their `optimization_key`
+        attribute set.
+
+        The optimization key is a unique identifier covering all inputs to this
+        task.  If another task with the same optimization key has already been
+        performed, it will be used directly instead of executing the task
+        again.
+
+        Returns a string suitable for inclusion in a TaskCluster index
+        namespace (generally of the form `<optimizationName>.<hash>`), or None
+        if this task cannot be optimized.
+        """
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/kind/legacy.py
@@ -0,0 +1,371 @@
+import time
+import os
+import sys
+import json
+import copy
+import logging
+from . import base
+from taskgraph.types import Task
+
+from functools import partial
+
+from mozpack.path import match as mozpackmatch
+
+from slugid import nice as slugid
+
+from taskcluster_graph.mach_util import (
+    merge_dicts,
+    gaia_info,
+    configure_dependent_task,
+    set_interactive_task,
+    remove_caches_from_task,
+    query_vcs_info
+)
+import taskcluster_graph.transform.routes as routes_transform
+import taskcluster_graph.transform.treeherder as treeherder_transform
+from taskcluster_graph.commit_parser import parse_commit
+from taskcluster_graph.image_builder import (
+    docker_image,
+    normalize_image_details,
+    task_id_for_image
+)
+from taskcluster_graph.from_now import (
+    json_time_from_now,
+    current_json_time,
+)
+from taskcluster_graph.templates import Templates
+import taskcluster_graph.build_task
+
+ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
+DEFINE_TASK = 'queue:define-task:aws-provisioner-v1/{}'
+DEFAULT_TRY = 'try: -b do -p all -u all'
+DEFAULT_JOB_PATH = os.path.join(
+    'tasks', 'branches', 'base_jobs.yml'
+)
+
+
+class LegacyKind(base.Kind):
+
+    def load_tasks(self, params):
+        root = os.path.abspath(os.path.join(self.path, self.config['legacy_path']))
+
+        project = params['project']
+        message = params.get('message', '') if project == 'try' else DEFAULT_TRY
+
+        templates = Templates(root)
+
+        job_path = os.path.join(root, 'tasks', 'branches', project, 'job_flags.yml')
+        job_path = job_path if os.path.exists(job_path) else \
+            os.path.join(root, DEFAULT_JOB_PATH)
+
+        jobs = templates.load(job_path, {})
+
+        job_graph, trigger_tests = parse_commit(message, jobs)
+
+        cmdline_interactive = params.get('interactive', False)
+
+        # Default to current time if querying the head rev fails
+        pushdate = time.strftime('%Y%m%d%H%M%S', time.gmtime())
+        vcs_info = query_vcs_info(params['head_repository'], params['head_rev'])
+        changed_files = set()
+        if vcs_info:
+            pushdate = time.strftime('%Y%m%d%H%M%S', time.gmtime(vcs_info.pushdate))
+
+            self.log(logging.DEBUG, 'vcs-info', {},
+                     '%d commits influencing task scheduling:\n' % len(vcs_info.changesets))
+            for c in vcs_info.changesets:
+                self.log(logging.DEBUG, 'vcs-relevant-commit', {
+                    'cset': c['node'][0:12],
+                    'desc': c['desc'].splitlines()[0].encode('ascii', 'ignore'),
+                }, "{cset} {desc}")
+                changed_files |= set(c['files'])
+
+        # Template parameters used when expanding the graph
+        seen_images = {}
+        parameters = dict(gaia_info().items() + {
+            'index': 'index',
+            'project': project,
+            'pushlog_id': params.get('pushlog_id', 0),
+            'docker_image': docker_image,
+            'task_id_for_image': partial(task_id_for_image, seen_images, project),
+            'base_repository': params['base_repository'] or
+            params['head_repository'],
+            'head_repository': params['head_repository'],
+            'head_ref': params['head_ref'] or params['head_rev'],
+            'head_rev': params['head_rev'],
+            'pushdate': pushdate,
+            'pushtime': pushdate[8:],
+            'year': pushdate[0:4],
+            'month': pushdate[4:6],
+            'day': pushdate[6:8],
+            'owner': params['owner'],
+            'level': params['level'],
+            'from_now': json_time_from_now,
+            'now': current_json_time(),
+            'revision_hash': params['revision_hash']
+        }.items())
+
+        treeherder_route = '{}.{}'.format(
+            params['project'],
+            params.get('revision_hash', '')
+        )
+
+        routes_file = os.path.join(root, 'routes.json')
+        with open(routes_file) as f:
+            contents = json.load(f)
+            json_routes = contents['routes']
+            # TODO: Nightly and/or l10n routes
+
+        # Task graph we are generating for taskcluster...
+        graph = {
+            'tasks': [],
+            'scopes': set(),
+        }
+
+        if params['revision_hash']:
+            for env in routes_transform.TREEHERDER_ROUTES:
+                route = 'queue:route:{}.{}'.format(
+                    routes_transform.TREEHERDER_ROUTES[env],
+                    treeherder_route)
+                graph['scopes'].add(route)
+
+        graph['metadata'] = {
+            'source': '{repo}file/{rev}/testing/taskcluster/mach_commands.py'.format(repo=params['head_repository'], rev=params['head_rev']),
+            'owner': params['owner'],
+            # TODO: Add full mach commands to this example?
+            'description': 'Task graph generated via ./mach taskcluster-graph',
+            'name': 'task graph local'
+        }
+
+        # Filter the job graph according to conditions met by this invocation run.
+        def should_run(task):
+            # Old style build or test task that doesn't define conditions. Always runs.
+            if 'when' not in task:
+                return True
+
+            when = task['when']
+
+            # If the task defines file patterns and we have a set of changed
+            # files to compare against, only run if a file pattern matches one
+            # of the changed files.
+            file_patterns = when.get('file_patterns', None)
+            if file_patterns and changed_files:
+                for pattern in file_patterns:
+                    for path in changed_files:
+                        if mozpackmatch(path, pattern):
+                            self.log(logging.DEBUG, 'schedule-task', {
+                                'schedule': True,
+                                'task': task['task'],
+                                'pattern': pattern,
+                                'path': path,
+                            }, 'scheduling {task} because pattern {pattern} '
+                                'matches {path}')
+                            return True
+
+                # No file patterns matched. Discard task.
+                self.log(logging.DEBUG, 'schedule-task', {
+                    'schedule': False,
+                    'task': task['task'],
+                }, 'discarding {task} because no relevant files changed')
+                return False
+
+            return True
+
+        job_graph = filter(should_run, job_graph)
+
+        all_routes = {}
+
+        for build in job_graph:
+            self.log(logging.DEBUG, 'load-task', {
+                'task': build['task'],
+            }, 'loading task {task}')
+            interactive = cmdline_interactive or build["interactive"]
+            build_parameters = merge_dicts(parameters, build['additional-parameters'])
+            build_parameters['build_slugid'] = slugid()
+            build_parameters['source'] = '{repo}file/{rev}/testing/taskcluster/{file}'.format(repo=params['head_repository'], rev=params['head_rev'], file=build['task'])
+            build_task = templates.load(build['task'], build_parameters)
+
+            # Copy build_* attributes to expose them to post-build tasks
+            # as well as json routes and tests
+            task_extra = build_task['task']['extra']
+            build_parameters['build_name'] = task_extra['build_name']
+            build_parameters['build_type'] = task_extra['build_type']
+            build_parameters['build_product'] = task_extra['build_product']
+
+            normalize_image_details(graph,
+                                    build_task,
+                                    seen_images,
+                                    build_parameters,
+                                    os.environ.get('TASK_ID', None))
+            set_interactive_task(build_task, interactive)
+
+            # try builds don't use cache
+            if project == "try":
+                remove_caches_from_task(build_task)
+
+            if params['revision_hash']:
+                treeherder_transform.add_treeherder_revision_info(build_task['task'],
+                                                                  params['head_rev'],
+                                                                  params['revision_hash'])
+                routes_transform.decorate_task_treeherder_routes(build_task['task'],
+                                                                 treeherder_route)
+                routes_transform.decorate_task_json_routes(build_task['task'],
+                                                           json_routes,
+                                                           build_parameters)
+
+            # Ensure each build graph is valid after construction.
+            taskcluster_graph.build_task.validate(build_task)
+            graph['tasks'].append(build_task)
+
+            for location in build_task['task']['extra'].get('locations', {}):
+                build_parameters['{}_url'.format(location)] = ARTIFACT_URL.format(
+                    build_parameters['build_slugid'],
+                    build_task['task']['extra']['locations'][location]
+                )
+
+            for url in build_task['task']['extra'].get('url', {}):
+                build_parameters['{}_url'.format(url)] = \
+                    build_task['task']['extra']['url'][url]
+
+            define_task = DEFINE_TASK.format(build_task['task']['workerType'])
+
+            for route in build_task['task'].get('routes', []):
+                if route.startswith('index.gecko.v2') and route in all_routes:
+                    raise Exception("Error: route '%s' is in use by multiple tasks: '%s' and '%s'" % (
+                        route,
+                        build_task['task']['metadata']['name'],
+                        all_routes[route],
+                    ))
+                all_routes[route] = build_task['task']['metadata']['name']
+
+            graph['scopes'].add(define_task)
+            graph['scopes'] |= set(build_task['task'].get('scopes', []))
+            route_scopes = map(lambda route: 'queue:route:' + route, build_task['task'].get('routes', []))
+            graph['scopes'] |= set(route_scopes)
+
+            # Treeherder symbol configuration for the graph required for each
+            # build so tests know which platform they belong to.
+            build_treeherder_config = build_task['task']['extra']['treeherder']
+
+            if 'machine' not in build_treeherder_config:
+                message = '({}), extra.treeherder.machine required for all builds'
+                raise ValueError(message.format(build['task']))
+
+            if 'build' not in build_treeherder_config:
+                build_treeherder_config['build'] = \
+                    build_treeherder_config['machine']
+
+            if 'collection' not in build_treeherder_config:
+                build_treeherder_config['collection'] = {'opt': True}
+
+            if len(build_treeherder_config['collection'].keys()) != 1:
+                message = '({}), extra.treeherder.collection must contain one type'
+                raise ValueError(message.fomrat(build['task']))
+
+            for post_build in build['post-build']:
+                # copy over the old parameters to update the template
+                # TODO additional-parameters is currently not an option, only
+                # enabled for build tasks
+                post_parameters = merge_dicts(build_parameters,
+                                              post_build.get('additional-parameters', {}))
+                post_task = configure_dependent_task(post_build['task'],
+                                                     post_parameters,
+                                                     slugid(),
+                                                     templates,
+                                                     build_treeherder_config)
+                normalize_image_details(graph,
+                                        post_task,
+                                        seen_images,
+                                        build_parameters,
+                                        os.environ.get('TASK_ID', None))
+                set_interactive_task(post_task, interactive)
+                treeherder_transform.add_treeherder_revision_info(post_task['task'],
+                                                                  params['head_rev'],
+                                                                  params['revision_hash'])
+                graph['tasks'].append(post_task)
+
+            for test in build['dependents']:
+                test = test['allowed_build_tasks'][build['task']]
+                # TODO additional-parameters is currently not an option, only
+                # enabled for build tasks
+                test_parameters = merge_dicts(build_parameters,
+                                              test.get('additional-parameters', {}))
+                test_parameters = copy.copy(build_parameters)
+
+                test_definition = templates.load(test['task'], {})['task']
+                chunk_config = test_definition['extra'].get('chunks', {})
+
+                # Allow branch configs to override task level chunking...
+                if 'chunks' in test:
+                    chunk_config['total'] = test['chunks']
+
+                chunked = 'total' in chunk_config
+                if chunked:
+                    test_parameters['total_chunks'] = chunk_config['total']
+
+                if 'suite' in test_definition['extra']:
+                    suite_config = test_definition['extra']['suite']
+                    test_parameters['suite'] = suite_config['name']
+                    test_parameters['flavor'] = suite_config.get('flavor', '')
+
+                for chunk in range(1, chunk_config.get('total', 1) + 1):
+                    if 'only_chunks' in test and chunked and \
+                            chunk not in test['only_chunks']:
+                        continue
+
+                    if chunked:
+                        test_parameters['chunk'] = chunk
+                    test_task = configure_dependent_task(test['task'],
+                                                         test_parameters,
+                                                         slugid(),
+                                                         templates,
+                                                         build_treeherder_config)
+                    normalize_image_details(graph,
+                                            test_task,
+                                            seen_images,
+                                            build_parameters,
+                                            os.environ.get('TASK_ID', None))
+                    set_interactive_task(test_task, interactive)
+
+                    if params['revision_hash']:
+                        treeherder_transform.add_treeherder_revision_info(test_task['task'],
+                                                                          params['head_rev'],
+                                                                          params['revision_hash'])
+                        routes_transform.decorate_task_treeherder_routes(
+                            test_task['task'],
+                            treeherder_route
+                        )
+
+                    # This will schedule test jobs N times
+                    for i in range(0, trigger_tests):
+                        graph['tasks'].append(test_task)
+                        # If we're scheduling more tasks each have to be unique
+                        test_task = copy.deepcopy(test_task)
+                        test_task['taskId'] = slugid()
+
+                    define_task = DEFINE_TASK.format(
+                        test_task['task']['workerType']
+                    )
+
+                    graph['scopes'].add(define_task)
+                    graph['scopes'] |= set(test_task['task'].get('scopes', []))
+
+        graph['scopes'] = sorted(graph['scopes'])
+
+        # save the graph for later, when taskgraph asks for additional information
+        # such as dependencies
+        self.graph = graph
+        self.tasks_by_label = {t['taskId']: t for t in self.graph['tasks']}
+
+        # Convert to a dictionary of tasks.  The process above has invented a
+        # taskId for each task, and we use those as the *labels* for the tasks;
+        # taskgraph will later assign them new taskIds.
+        return [Task(self, t['taskId'], task=t['task']) for t in self.graph['tasks']]
+
+    def get_task_dependencies(self, task, taskgraph):
+        # fetch dependency information from the cached graph
+        taskdict = self.tasks_by_label[task.label]
+        return [(label, 'legacy') for label in taskdict.get('requires', [])]
+
+    def get_task_optimization_key(self, task, taskgraph):
+        pass
new file mode 100644
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/test/test_generator.py
@@ -0,0 +1,95 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from taskgraph.generator import TaskGraphGenerator
+from taskgraph import types
+from taskgraph import graph
+
+from mozunit import main
+
+
+class FakeKind(object):
+
+    def maketask(self, i):
+        return types.Task(
+            self,
+            label='t-{}'.format(i),
+            attributes={'tasknum': str(i)},
+            task={},
+            i=i)
+
+    def load_tasks(self, parameters):
+        self.tasks = [self.maketask(i) for i in range(3)]
+        return self.tasks
+
+    def get_task_dependencies(self, task, full_task_set):
+        i = task.extra['i']
+        if i > 0:
+            return [('t-{}'.format(i-1), 'prev')]
+        else:
+            return []
+
+
+class WithFakeKind(TaskGraphGenerator):
+    """TaskGraphGenerator wired to load only FakeKind, avoiding disk I/O."""
+
+    def _load_kinds(self):
+        # override the kind-discovery hook to yield the in-memory fake kind
+        yield FakeKind()
+
+
+class TestGraph(unittest.TestCase):
+
+    def setUp(self):
+        def log(level, name, data, message):
+            pass
+        self.tgg = WithFakeKind('/root', log, {}, None)
+
+    def test_full_task_set(self):
+        "The full_task_set property has all tasks"
+        self.assertEqual(self.tgg.full_task_set.graph,
+                         graph.Graph(set(['t-0', 't-1', 't-2']), set()))
+        self.assertEqual(self.tgg.full_task_set.tasks.keys(),
+                         ['t-0', 't-1', 't-2'])
+
+    def test_full_task_graph(self):
+        "The full_task_graph property has all tasks, and links"
+        self.assertEqual(self.tgg.full_task_graph.graph,
+                         graph.Graph(set(['t-0', 't-1', 't-2']),
+                                     set([
+                                         ('t-1', 't-0', 'prev'),
+                                         ('t-2', 't-1', 'prev'),
+                                     ])))
+        self.assertEqual(self.tgg.full_task_graph.tasks.keys(),
+                         ['t-0', 't-1', 't-2'])
+
+    def test_target_task_set(self):
+        "The target_task_set property has the targeted tasks"
+        self.tgg.set_target_tasks(['t-1'])
+        self.assertEqual(self.tgg.target_task_set.graph,
+                         graph.Graph(set(['t-1']), set()))
+        self.assertEqual(self.tgg.target_task_set.tasks.keys(),
+                         ['t-1'])
+
+    def test_target_task_graph(self):
+        "The target_task_graph property has the targeted tasks and deps"
+        self.tgg.set_target_tasks(['t-1'])
+        self.assertEqual(self.tgg.target_task_graph.graph,
+                         graph.Graph(set(['t-0', 't-1']),
+                                     set([
+                                         ('t-1', 't-0', 'prev'),
+                                     ])))
+        self.assertEqual(self.tgg.target_task_graph.tasks.keys(),
+                         ['t-0', 't-1'])
+
+    def test_optimized_task_graph(self):
+        "The optimized task graph is the target task graph (for now)"
+        self.tgg.set_target_tasks(['t-1'])
+        self.assertEqual(self.tgg.optimized_task_graph.graph,
+                         self.tgg.target_task_graph.graph)
+
+if __name__ == '__main__':
+    # allow running this test module directly, outside the mach test runner
+    main()
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/test/test_graph.py
@@ -0,0 +1,120 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from taskgraph.graph import Graph
+
+from mozunit import main
+
+class TestGraph(unittest.TestCase):
+
+    tree = Graph(set('abcdefg'), set([
+        ('a', 'b', 'L'),
+        ('a', 'c', 'L'),
+        ('b', 'd', 'K'),
+        ('b', 'e', 'K'),
+        ('c', 'f', 'N'),
+        ('c', 'g', 'N'),
+    ]))
+
+    linear = Graph(set('1234'), set([
+        ('1', '2', 'L'),
+        ('2', '3', 'L'),
+        ('3', '4', 'L'),
+    ]))
+
+    diamonds = Graph(set('ABCDEFGHIJ'), set(tuple(x) for x in 
+        'AFL ADL BDL BEL CEL CHL DFL DGL EGL EHL FIL GIL GJL HJL'.split()
+    ))
+
+    multi_edges = Graph(set('1234'), set([
+        ('2', '1', 'red'),
+        ('2', '1', 'blue'),
+        ('3', '1', 'red'),
+        ('3', '2', 'blue'),
+        ('3', '2', 'green'),
+        ('4', '3', 'green'),
+    ]))
+
+    def test_transitive_closure_empty(self):
+        "transitive closure of an empty set is an empty graph"
+        g = Graph(set('abc'), set([('a', 'b', 'L'), ('a', 'c', 'L')]))
+        self.assertEqual(g.transitive_closure(set()), Graph(set(), set()))
+
+    def test_transitive_closure_disjoint(self):
+        "transitive closure of a disjoint set is a subset"
+        g = Graph(set('abc'), set())
+        self.assertEqual(g.transitive_closure(set('ac')), Graph(set('ac'), set()))
+
+    def test_transitive_closure_trees(self):
+        "transitive closure of a tree, at two non-root nodes, is the two subtrees"
+        self.assertEqual(self.tree.transitive_closure(set('bc')),
+            Graph(set('bcdefg'), set([
+                ('b', 'd', 'K'),
+                ('b', 'e', 'K'),
+                ('c', 'f', 'N'),
+                ('c', 'g', 'N'),
+            ])))
+
+    def test_transitive_closure_multi_edges(self):
+        "transitive closure of a tree with multiple edges between nodes keeps those edges"
+        self.assertEqual(self.multi_edges.transitive_closure(set('3')),
+            Graph(set('123'), set([
+                ('2', '1', 'red'),
+                ('2', '1', 'blue'),
+                ('3', '1', 'red'),
+                ('3', '2', 'blue'),
+                ('3', '2', 'green'),
+            ])))
+
+    def test_transitive_closure_linear(self):
+        "transitive closure of a linear graph includes all nodes in the line"
+        self.assertEqual(self.linear.transitive_closure(set('1')), self.linear)
+
+    def test_visit_postorder_empty(self):
+        "postorder visit of an empty graph is empty"
+        self.assertEqual(list(Graph(set(), set()).visit_postorder()), [])
+
+    def assert_postorder(self, seq, all_nodes):
+        seen = set()
+        for e in seq:
+            for l, r, n in self.tree.edges:
+                if l == e:
+                    self.failUnless(r in seen)
+            seen.add(e)
+        self.assertEqual(seen, all_nodes)
+
+    def test_visit_postorder_tree(self):
+        "postorder visit of a tree satisfies invariant"
+        self.assert_postorder(self.tree.visit_postorder(), self.tree.nodes)
+
+    def test_visit_postorder_diamonds(self):
+        "postorder visit of a graph full of diamonds satisfies invariant"
+        self.assert_postorder(self.diamonds.visit_postorder(), self.diamonds.nodes)
+
+    def test_visit_postorder_multi_edges(self):
+        "postorder visit of a graph with duplicate edges satisfies invariant"
+        self.assert_postorder(self.multi_edges.visit_postorder(), self.multi_edges.nodes)
+
+    def test_links_dict(self):
+        "link dict for a graph with multiple edges is correct"
+        self.assertEqual(self.multi_edges.links_dict(), {
+            '2': set('1'),
+            '3': set('12'),
+            '4': set('3'),
+        })
+
+    def test_reverse_links_dict(self):
+        "reverse link dict for a graph with multiple edges is correct"
+        self.assertEqual(self.multi_edges.reverse_links_dict(), {
+            '1': set('23'),
+            '2': set('3'),
+            '3': set('4'),
+        })
+
+if __name__ == '__main__':
+    # allow running this test module directly, outside the mach test runner
+    main()
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/types.py
@@ -0,0 +1,64 @@
+import pprint
+import collections
+
+class Task(object):
+    """
+    Representation of a task in a TaskGraph.
+
+    Each has, at creation:
+
+    - kind: Kind instance that created this task
+    - label; the label for this task
+    - attributes: a set of attributes for this task (used for filtering)
+    - task: the task definition (JSON-able dictionary)
+    - extra: extra kind-specific metadata
+
+    And later, as the task-graph processing proceeds:
+
+    - optimization_key -- key for finding equivalent tasks in the TC index
+    - task_id -- TC taskId under which this task will be created
+    """
+
+    def __init__(self, kind, label, attributes=set(), task={}, **extra):
+        self.kind = kind
+        self.label = label
+        self.attributes = attributes
+        self.task = task
+        self.extra = extra
+
+        self.optimization_key = None
+        self.task_id = None
+
+    def __str__(self):
+        return "{} ({})".format(self.task_id or self.label,
+                                self.task['metadata']['description'].strip())
+
+
+class TaskGraph(object):
+    """
+    Representation of a task graph.
+
+    A task graph is a combination of a Graph and a dictionary of tasks indexed
+    by label.  TaskGraph instances should be treated as immutable.
+    """
+
+    __slots__ = ['tasks', 'graph']
+
+    def __init__(self, tasks, graph):
+        assert set(tasks) == graph.nodes
+        self.tasks = tasks
+        self.graph = graph
+
+    def __getitem__(self, label):
+        "Get a task by label"
+        return self.tasks[label]
+
+    def __iter__(self):
+        "Iterate over tasks in undefined order"
+        return self.tasks.itervalues()
+
+    def __repr__(self):
+        return "<TaskGraph graph={!r} tasks={!r}>".format(self.graph, self.tasks)
+
+    def __eq__(self, other):
+        return self.tasks == other.tasks and self.graph == other.graph
--- a/testing/taskcluster/mach_commands.py
+++ b/testing/taskcluster/mach_commands.py
@@ -30,160 +30,16 @@ ARTIFACT_URL = 'https://queue.taskcluste
 
 DEFINE_TASK = 'queue:define-task:aws-provisioner-v1/{}'
 
 DEFAULT_TRY = 'try: -b do -p all -u all'
 DEFAULT_JOB_PATH = os.path.join(
     ROOT, 'tasks', 'branches', 'base_jobs.yml'
 )
 
-def merge_dicts(*dicts):
-    merged_dict = {}
-    for dictionary in dicts:
-        merged_dict.update(dictionary)
-    return merged_dict
-
-def gaia_info():
-    '''
-    Fetch details from in tree gaia.json (which links this version of
-    gecko->gaia) and construct the usual base/head/ref/rev pairing...
-    '''
-    gaia = json.load(open(os.path.join(GECKO, 'b2g', 'config', 'gaia.json')))
-
-    if gaia['git'] is None or \
-       gaia['git']['remote'] == '' or \
-       gaia['git']['git_revision'] == '' or \
-       gaia['git']['branch'] == '':
-
-       # Just use the hg params...
-       return {
-         'gaia_base_repository': 'https://hg.mozilla.org/{}'.format(gaia['repo_path']),
-         'gaia_head_repository': 'https://hg.mozilla.org/{}'.format(gaia['repo_path']),
-         'gaia_ref': gaia['revision'],
-         'gaia_rev': gaia['revision']
-       }
-
-    else:
-        # Use git
-        return {
-            'gaia_base_repository': gaia['git']['remote'],
-            'gaia_head_repository': gaia['git']['remote'],
-            'gaia_rev': gaia['git']['git_revision'],
-            'gaia_ref': gaia['git']['branch'],
-        }
-
-def configure_dependent_task(task_path, parameters, taskid, templates, build_treeherder_config):
-    """
-    Configure a build dependent task. This is shared between post-build and test tasks.
-
-    :param task_path: location to the task yaml
-    :param parameters: parameters to load the template
-    :param taskid: taskid of the dependent task
-    :param templates: reference to the template builder
-    :param build_treeherder_config: parent treeherder config
-    :return: the configured task
-    """
-    task = templates.load(task_path, parameters)
-    task['taskId'] = taskid
-
-    if 'requires' not in task:
-        task['requires'] = []
-
-    task['requires'].append(parameters['build_slugid'])
-
-    if 'treeherder' not in task['task']['extra']:
-        task['task']['extra']['treeherder'] = {}
-
-    # Copy over any treeherder configuration from the build so
-    # tests show up under the same platform...
-    treeherder_config = task['task']['extra']['treeherder']
-
-    treeherder_config['collection'] = \
-        build_treeherder_config.get('collection', {})
-
-    treeherder_config['build'] = \
-        build_treeherder_config.get('build', {})
-
-    treeherder_config['machine'] = \
-        build_treeherder_config.get('machine', {})
-
-    if 'routes' not in task['task']:
-        task['task']['routes'] = []
-
-    if 'scopes' not in task['task']:
-        task['task']['scopes'] = []
-
-    return task
-
-def set_interactive_task(task, interactive):
-    r"""Make the task interactive.
-
-    :param task: task definition.
-    :param interactive: True if the task should be interactive.
-    """
-    if not interactive:
-        return
-
-    payload = task["task"]["payload"]
-    if "features" not in payload:
-        payload["features"] = {}
-    payload["features"]["interactive"] = True
-
-def remove_caches_from_task(task):
-    r"""Remove all caches but tc-vcs from the task.
-
-    :param task: task definition.
-    """
-    whitelist = [
-        re.compile("^level-[123]-.*-tc-vcs(-public-sources)?$"),
-        re.compile("^tooltool-cache$"),
-    ]
-    try:
-        caches = task["task"]["payload"]["cache"]
-        for cache in caches.keys():
-            if not any(pat.match(cache) for pat in whitelist):
-                caches.pop(cache)
-    except KeyError:
-        pass
-
-def query_vcs_info(repository, revision):
-    """Query the pushdate and pushid of a repository/revision.
-    This is intended to be used on hg.mozilla.org/mozilla-central and
-    similar. It may or may not work for other hg repositories.
-    """
-    if not repository or not revision:
-        sys.stderr.write('cannot query vcs info because vcs info not provided\n')
-        return None
-
-    VCSInfo = namedtuple('VCSInfo', ['pushid', 'pushdate', 'changesets'])
-
-    try:
-        import requests
-        url = '%s/json-automationrelevance/%s' % (repository.rstrip('/'),
-                                                  revision)
-        sys.stderr.write("Querying version control for metadata: %s\n" % url)
-        contents = requests.get(url).json()
-
-        changesets = []
-        for c in contents['changesets']:
-            changesets.append({k: c[k] for k in ('desc', 'files', 'node')})
-
-        pushid = contents['changesets'][-1]['pushid']
-        pushdate = contents['changesets'][-1]['pushdate'][0]
-
-        return VCSInfo(pushid, pushdate, changesets)
-
-    except Exception:
-        sys.stderr.write(
-            "Error querying VCS info for '%s' revision '%s'\n" % (
-                repository, revision,
-            )
-        )
-        return None
-
 @CommandProvider
 class DecisionTask(object):
     @Command('taskcluster-decision', category="ci",
         description="Build a decision task")
     @CommandArgument('--project',
         required=True,
         help='Treeherder project name')
     @CommandArgument('--url',
@@ -197,16 +53,17 @@ class DecisionTask(object):
     @CommandArgument('--comment',
         required=True,
         help='Commit message for this revision')
     @CommandArgument('--owner',
         required=True,
         help='email address of who owns this graph')
     @CommandArgument('task', help="Path to decision task to run.")
     def run_task(self, **params):
+        from taskcluster_graph.mach_util import gaia_info
         from taskcluster_graph.slugidjar import SlugidJar
         from taskcluster_graph.from_now import (
             json_time_from_now,
             current_json_time,
         )
         from taskcluster_graph.templates import Templates
 
         templates = Templates(ROOT)
@@ -313,16 +170,24 @@ class Graph(object):
         help='Run tasks even if their conditions are not met')
     def create_graph(self, **params):
         from functools import partial
 
         from mozpack.path import match as mozpackmatch
 
         from slugid import nice as slugid
 
+        from taskcluster_graph.mach_util import (
+            merge_dicts,
+            gaia_info,
+            configure_dependent_task,
+            set_interactive_task,
+            remove_caches_from_task,
+            query_vcs_info
+        )
         import taskcluster_graph.transform.routes as routes_transform
         import taskcluster_graph.transform.treeherder as treeherder_transform
         from taskcluster_graph.commit_parser import parse_commit
         from taskcluster_graph.image_builder import (
             docker_image,
             normalize_image_details,
             task_id_for_image
         )
@@ -689,16 +554,21 @@ class CIBuild(object):
         help='path to build task definition')
     @CommandArgument('--interactive',
         required=False,
         default=False,
         action="store_true",
         dest="interactive",
         help="Run the task with the interactive feature enabled")
     def create_ci_build(self, **params):
+        from taskcluster_graph.mach_util import (
+            gaia_info,
+            set_interactive_task,
+            query_vcs_info
+        )
         from taskcluster_graph.templates import Templates
         from taskcluster_graph.image_builder import docker_image
         import taskcluster_graph.build_task
 
         templates = Templates(ROOT)
         # TODO handle git repos
         head_repository = params['head_repository']
         if not head_repository:
new file mode 100644
--- /dev/null
+++ b/testing/taskcluster/taskcluster_graph/mach_util.py
@@ -0,0 +1,159 @@
+from __future__ import absolute_import
+
+from collections import defaultdict
+import os
+import json
+import copy
+import re
+import sys
+import time
+from collections import namedtuple
+
+ROOT = os.path.dirname(os.path.realpath(__file__))
+GECKO = os.path.realpath(os.path.join(ROOT, '..', '..', '..'))
+
def merge_dicts(*dicts):
    """Shallow-merge any number of dicts into one new dict.

    Later dicts win when keys collide; the inputs are not modified.
    """
    result = {}
    for mapping in dicts:
        for key, value in mapping.items():
            result[key] = value
    return result
+
def gaia_info():
    '''
    Fetch details from in tree gaia.json (which links this version of
    gecko->gaia) and construct the usual base/head/ref/rev pairing...

    :return: dict with gaia_base_repository, gaia_head_repository,
        gaia_ref and gaia_rev keys; git metadata is preferred when fully
        specified, otherwise the hg fields are used.
    '''
    # Use a context manager so the file handle is closed promptly;
    # json.load(open(...)) left it open until garbage collection.
    with open(os.path.join(GECKO, 'b2g', 'config', 'gaia.json')) as f:
        gaia = json.load(f)

    git = gaia['git']
    if git is None or \
       git['remote'] == '' or \
       git['git_revision'] == '' or \
       git['branch'] == '':

        # Incomplete git metadata; just use the hg params...
        return {
            'gaia_base_repository': 'https://hg.mozilla.org/{}'.format(gaia['repo_path']),
            'gaia_head_repository': 'https://hg.mozilla.org/{}'.format(gaia['repo_path']),
            'gaia_ref': gaia['revision'],
            'gaia_rev': gaia['revision'],
        }

    else:
        # Use git
        return {
            'gaia_base_repository': git['remote'],
            'gaia_head_repository': git['remote'],
            'gaia_rev': git['git_revision'],
            'gaia_ref': git['branch'],
        }
+
def configure_dependent_task(task_path, parameters, taskid, templates, build_treeherder_config):
    """
    Configure a build dependent task. This is shared between post-build and test tasks.

    :param task_path: location to the task yaml
    :param parameters: parameters to load the template
    :param taskid: taskid of the dependent task
    :param templates: reference to the template builder
    :param build_treeherder_config: parent treeherder config
    :return: the configured task
    """
    task = templates.load(task_path, parameters)
    task['taskId'] = taskid

    # The dependent task must wait on its parent build.
    task.setdefault('requires', []).append(parameters['build_slugid'])

    extra = task['task']['extra']
    if 'treeherder' not in extra:
        extra['treeherder'] = {}

    # Copy over any treeherder configuration from the build so
    # tests show up under the same platform...
    treeherder_config = extra['treeherder']
    for key in ('collection', 'build', 'machine'):
        treeherder_config[key] = build_treeherder_config.get(key, {})

    task['task'].setdefault('routes', [])
    task['task'].setdefault('scopes', [])

    return task
+
def set_interactive_task(task, interactive):
    r"""Enable the interactive feature on a task definition.

    :param task: task definition (mutated in place).
    :param interactive: when falsy, the task is left untouched.
    """
    if interactive:
        payload = task["task"]["payload"]
        features = payload.setdefault("features", {})
        features["interactive"] = True
+
def remove_caches_from_task(task):
    r"""Remove all caches but tc-vcs and tooltool from the task.

    :param task: task definition (mutated in place).
    """
    # Cache names matching any of these patterns are preserved.
    whitelist = [
        re.compile("^level-[123]-.*-tc-vcs(-public-sources)?$"),
        re.compile("^tooltool-cache$"),
    ]
    try:
        caches = task["task"]["payload"]["cache"]
        # Iterate a snapshot of the keys: popping while iterating the
        # live keys view raises RuntimeError on Python 3.
        for cache in list(caches):
            if not any(pat.match(cache) for pat in whitelist):
                caches.pop(cache)
    except KeyError:
        # Tasks without a cache section are left untouched.
        pass
+
def query_vcs_info(repository, revision):
    """Query the pushdate and pushid of a repository/revision.
    This is intended to be used on hg.mozilla.org/mozilla-central and
    similar. It may or may not work for other hg repositories.

    Returns a VCSInfo namedtuple on success, None on any failure.
    """
    if not repository or not revision:
        sys.stderr.write('cannot query vcs info because vcs info not provided\n')
        return None

    VCSInfo = namedtuple('VCSInfo', ['pushid', 'pushdate', 'changesets'])

    try:
        # Imported lazily so the module loads without requests installed.
        import requests
        url = '%s/json-automationrelevance/%s' % (repository.rstrip('/'),
                                                  revision)
        sys.stderr.write("Querying version control for metadata: %s\n" % url)
        contents = requests.get(url).json()

        # Keep only the fields callers need from each changeset.
        wanted = ('desc', 'files', 'node')
        changesets = [{key: cset[key] for key in wanted}
                      for cset in contents['changesets']]

        # Push info comes from the last changeset in the list.
        tip = contents['changesets'][-1]
        return VCSInfo(tip['pushid'], tip['pushdate'][0], changesets)

    except Exception:
        # Best-effort: log and degrade to None on any failure.
        sys.stderr.write(
            "Error querying VCS info for '%s' revision '%s'\n" % (
                repository, revision,
            )
        )
        return None
+
+