# taskcluster/taskgraph/task/legacy.py

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import json
import logging
import os
import re
import time
from collections import namedtuple

from . import base
from mozpack.path import match as mozpackmatch
from slugid import nice as slugid
from taskgraph.util.legacy_commit_parser import parse_commit
from taskgraph.util.time import (
    json_time_from_now,
    current_json_time,
)
from taskgraph.util.templates import Templates
from taskgraph.util.docker import docker_image


ROOT = os.path.dirname(os.path.realpath(__file__))
GECKO = os.path.realpath(os.path.join(ROOT, '..', '..', '..'))
# TASKID_PLACEHOLDER is the "internal" form of a taskid; it is substituted with
# actual taskIds at the very last minute, in get_task_definition
TASKID_PLACEHOLDER = 'TaskLabel=={}'

DEFINE_TASK = 'queue:define-task:aws-provisioner-v1/{}'
DEFAULT_TRY = 'try: -b do -p all -u all -t all'
DEFAULT_JOB_PATH = os.path.join(
    'tasks', 'branches', 'base_jobs.yml'
)

TREEHERDER_ROUTES = {
    'staging': 'tc-treeherder-stage',
    'production': 'tc-treeherder'
}

# time after which a try build's results will expire
TRY_EXPIRATION = "14 days"

logger = logging.getLogger(__name__)


def mklabel():
    return TASKID_PLACEHOLDER.format(slugid())
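
# A minimal sketch of what mklabel() returns (the slugid value is
# hypothetical):
#
#   >>> mklabel()
#   'TaskLabel==a8_sbzWuTmilZkKJaN3iDg'
#
# These placeholder labels stand in for taskIds until get_task_definition
# substitutes real ones at the very last minute.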


def merge_dicts(*dicts):
    merged_dict = {}
    for dictionary in dicts:
        merged_dict.update(dictionary)
    return merged_dict
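
# For example, merge_dicts({'a': 1}, {'a': 2}, {'b': 3}) returns
# {'a': 2, 'b': 3}: later dictionaries win on key collisions, which is how
# a build's 'additional-parameters' override the base parameters below.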


def gaia_info():
    '''Fetch details from the in-tree gaia.json (which pins the gaia
    version matching this gecko) and construct the usual
    base/head/ref/rev parameters.'''
    with open(os.path.join(GECKO, 'b2g', 'config', 'gaia.json')) as f:
        gaia = json.load(f)

    if gaia['git'] is None or \
       gaia['git']['remote'] == '' or \
       gaia['git']['git_revision'] == '' or \
       gaia['git']['branch'] == '':

        # Just use the hg params...
        return {
            'gaia_base_repository': 'https://hg.mozilla.org/{}'.format(gaia['repo_path']),
            'gaia_head_repository': 'https://hg.mozilla.org/{}'.format(gaia['repo_path']),
            'gaia_ref': gaia['revision'],
            'gaia_rev': gaia['revision']
        }

    else:
        # Use git
        return {
            'gaia_base_repository': gaia['git']['remote'],
            'gaia_head_repository': gaia['git']['remote'],
            'gaia_rev': gaia['git']['git_revision'],
            'gaia_ref': gaia['git']['branch'],
        }
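
# For reference, a gaia.json of roughly this shape is assumed (values are
# illustrative):
#
#   {"git": {"remote": "", "branch": "", "git_revision": ""},
#    "repo_path": "integration/gaia-central",
#    "revision": "0123456789ab"}
#
# With empty git fields, as here, the hg parameters above are used.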


def configure_dependent_task(task_path, parameters, taskid, templates, build_treeherder_config):
    """Configure a build dependent task. This is shared between post-build and test tasks.

    :param task_path: location to the task yaml
    :param parameters: parameters to load the template
    :param taskid: taskid of the dependent task
    :param templates: reference to the template builder
    :param build_treeherder_config: parent treeherder config
    :return: the configured task
    """
    task = templates.load(task_path, parameters)
    task['taskId'] = taskid

    if 'requires' not in task:
        task['requires'] = []

    task['requires'].append(parameters['build_slugid'])

    if 'treeherder' not in task['task']['extra']:
        task['task']['extra']['treeherder'] = {}

    # Copy over any treeherder configuration from the build so
    # tests show up under the same platform...
    treeherder_config = task['task']['extra']['treeherder']

    treeherder_config['collection'] = \
        build_treeherder_config.get('collection', {})

    treeherder_config['build'] = \
        build_treeherder_config.get('build', {})

    if 'machine' not in treeherder_config:
        treeherder_config['machine'] = \
            build_treeherder_config.get('machine', {})

    if 'routes' not in task['task']:
        task['task']['routes'] = []

    if 'scopes' not in task['task']:
        task['task']['scopes'] = []

    return task


def set_interactive_task(task, interactive):
    r"""Make the task interactive.

    :param task: task definition.
    :param interactive: True if the task should be interactive.
    """
    if not interactive:
        return

    payload = task["task"]["payload"]
    if "features" not in payload:
        payload["features"] = {}
    payload["features"]["interactive"] = True


def remove_caches_from_task(task):
    r"""Remove all caches but vcs from the task.

    :param task: task definition.
    """
    whitelist = [
        re.compile("^level-[123]-.*-tc-vcs(-public-sources)?$"),
        re.compile("^level-[123]-hg-shared$"),
        # The assumption here is that `hg robustcheckout --purge` is used and
        # the checkout will start from a clean slate on job execution. This
        # means there should be no contamination from previous tasks.
        re.compile("^level-[123]-checkouts$"),
        re.compile("^tooltool-cache$"),
    ]
    try:
        caches = task["task"]["payload"]["cache"]
        scopes = task["task"]["scopes"]
        # iterate over a copy of the keys, since we mutate the dict below
        for cache in list(caches):
            if not any(pat.match(cache) for pat in whitelist):
                caches.pop(cache)
                scope = 'docker-worker:cache:' + cache
                try:
                    scopes.remove(scope)
                except ValueError:
                    raise ValueError("scope '{}' not in {}".format(scope, scopes))
    except KeyError:
        pass
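
# As a sketch of the whitelist above: cache names like 'level-1-hg-shared'
# and 'tooltool-cache' survive, while a hypothetical name such as
# 'level-1-try-workspace' is dropped together with its
# 'docker-worker:cache:level-1-try-workspace' scope.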


def remove_coalescing_from_task(task):
    r"""Remove coalescing route and supersederUrl from job task

    :param task: task definition.
    """
    patterns = [
        re.compile("^coalesce.v1.builds.*pgo$"),
    ]

    try:
        payload = task["task"]["payload"]
        routes = task["task"]["routes"]
        removable_routes = [route for route in list(routes)
                            if any([p.match(route) for p in patterns])]
        if removable_routes:
            # we only remove supersederUrl when there are also routes to remove
            payload.pop("supersederUrl")

        for route in removable_routes:
            routes.remove(route)
    except KeyError:
        pass


def query_vcs_info(repository, revision):
    """Query the pushdate and pushid of a repository/revision.

    This is intended to be used on hg.mozilla.org/mozilla-central and
    similar. It may or may not work for other hg repositories.
    """
    if not repository or not revision:
        logger.warning('cannot query VCS info: repository or revision not provided')
        return None

    VCSInfo = namedtuple('VCSInfo', ['pushid', 'pushdate', 'changesets'])

    try:
        import requests
        url = '%s/json-automationrelevance/%s' % (repository.rstrip('/'),
                                                  revision)
        logger.debug("Querying version control for metadata: %s", url)
        contents = requests.get(url).json()

        changesets = []
        for c in contents['changesets']:
            changesets.append({k: c[k] for k in ('desc', 'files', 'node')})

        pushid = contents['changesets'][-1]['pushid']
        pushdate = contents['changesets'][-1]['pushdate'][0]

        return VCSInfo(pushid, pushdate, changesets)

    except Exception:
        logger.exception("Error querying VCS info for '%s' revision '%s'",
                         repository, revision)
        return None
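
# The json-automationrelevance payload consumed above looks roughly like
# this (hypothetical, heavily trimmed):
#
#   {"changesets": [
#       {"desc": "Bug 1 - fix foo; r=bar", "files": ["foo/bar.py"],
#        "node": "0123456789abcdef...", "pushid": 42,
#        "pushdate": [1468430000, 0]}]}
#
# pushdate is a (unix-timestamp, timezone-offset) pair, hence the [0]
# index above.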


def set_expiration(task, relative_datestamp):
    task_def = task['task']
    task_def['expires'] = {'relative-datestamp': relative_datestamp}
    if 'deadline' in task_def:
        now = current_json_time(datetime_format=True)
        timestamp = json_time_from_now(input_str=TRY_EXPIRATION,
                                       now=now,
                                       datetime_format=True)
        deadline = json_time_from_now(input_str=task_def['deadline']['relative-datestamp'],
                                      now=now,
                                      datetime_format=True)
        if deadline > timestamp:
            task_def['deadline']['relative-datestamp'] = relative_datestamp

    try:
        artifacts = task_def['payload']['artifacts']
    except KeyError:
        return

    # for docker-worker, artifacts is a dictionary
    # for generic-worker, artifacts is a list
    # for taskcluster-worker, it will depend on what we do in artifacts plugin
    for artifact in artifacts.values() if hasattr(artifacts, "values") else artifacts:
        artifact['expires']['relative-datestamp'] = relative_datestamp
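
# The two artifact shapes handled above, sketched with illustrative values:
#
#   docker-worker (dict):  {'public/build': {'expires': {...}, ...}}
#   generic-worker (list): [{'path': 'public/build', 'expires': {...}}]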


def format_treeherder_route(destination, project, revision, pushlog_id):
    return "{}.v2.{}.{}.{}".format(destination,
                                   project,
                                   revision,
                                   pushlog_id)
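
# For example (illustrative arguments):
#
#   >>> format_treeherder_route('tc-treeherder', 'try', 'abcdef123456', '1')
#   'tc-treeherder.v2.try.abcdef123456.1'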


def decorate_task_treeherder_routes(task, project, revision, pushlog_id):
    """Decorate the given task with treeherder routes.

    Uses task.extra.treeherderEnv if available; otherwise defaults to
    staging only.

    :param dict task: task definition.
    :param str project: The project the tasks are running for.
    :param str revision: The revision for the push
    :param str pushlog_id: The ID of the push
    """

    if 'extra' not in task:
        return

    if 'routes' not in task:
        task['routes'] = []

    treeherder_env = task['extra'].get('treeherderEnv', ['staging'])

    for env in treeherder_env:
        route = format_treeherder_route(TREEHERDER_ROUTES[env],
                                        project,
                                        revision,
                                        pushlog_id)
        task['routes'].append(route)


def decorate_task_json_routes(task, json_routes, parameters):
    """Decorate the given task with routes.json routes.

    :param dict task: task definition.
    :param json_routes: the list of routes to use from routes.json
    :param parameters: dictionary of parameters to use in route templates
    """
    routes = task.get('routes', [])
    for route in json_routes:
        routes.append(route.format(**parameters))

    task['routes'] = routes
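
# A routes.json template might look like this (hypothetical entry):
#
#   "index.gecko.v2.{project}.revision.{head_rev}.{build_product}.{build_name}-{build_type}"
#
# route.format(**parameters) fills the placeholders from the template
# parameters assembled in load_tasks below.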


class BuildTaskValidationException(Exception):
    pass


def validate_build_task(task):
    '''Build tasks have some required fields in extra; this function
    ensures they are present.'''
    if 'task' not in task:
        raise BuildTaskValidationException('must have task field')

    task_def = task['task']

    if 'extra' not in task_def:
        raise BuildTaskValidationException('build task must have task.extra props')

    if 'locations' in task_def['extra']:

        locations = task_def['extra']['locations']

        if 'build' not in locations:
            raise BuildTaskValidationException('task.extra.locations.build missing')

        if 'tests' not in locations and 'test_packages' not in locations:
            raise BuildTaskValidationException('task.extra.locations.tests or '
                                               'task.extra.locations.test_packages missing')
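
# A minimal task stub that passes validation (sketch):
#
#   {'task': {'extra': {'locations': {'build': 'public/build/target.apk',
#                                     'test_packages': 'public/build/...'}}}}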


class LegacyTask(base.Task):
    """
    This kind generates a full task graph from the old YAML files in
    `testing/taskcluster/tasks`.  The tasks already have dependency links.

    The existing task-graph generation generates slugids for tasks during task
    generation, so this kind labels tasks using those slugids, with a prefix of
    "TaskLabel==".  These labels are unfortunately not stable from run to run.
    """

    def __init__(self, *args, **kwargs):
        self.task_dict = kwargs.pop('task_dict')
        super(LegacyTask, self).__init__(*args, **kwargs)

    def __eq__(self, other):
        return super(LegacyTask, self).__eq__(other) and \
               self.task_dict == other.task_dict

    @classmethod
    def load_tasks(cls, kind, path, config, params, loaded_tasks):
        root = os.path.abspath(os.path.join(path, config['legacy_path']))

        project = params['project']
        # NOTE: message is ignored here; we always use DEFAULT_TRY, then filter the
        # resulting task graph later
        message = DEFAULT_TRY

        templates = Templates(root)

        job_path = os.path.join(root, 'tasks', 'branches', project, 'job_flags.yml')
        job_path = job_path if os.path.exists(job_path) else \
            os.path.join(root, DEFAULT_JOB_PATH)

        jobs = templates.load(job_path, {})

        job_graph, trigger_tests = parse_commit(message, jobs)

        cmdline_interactive = params.get('interactive', False)

        # Query VCS info; if this fails, file-based scheduling is skipped
        # below and all tasks run
        vcs_info = query_vcs_info(params['head_repository'], params['head_rev'])
        changed_files = set()
        if vcs_info:

            logger.debug(
                '{} commits influencing task scheduling:'.format(len(vcs_info.changesets)))
            for c in vcs_info.changesets:
                logger.debug("{cset} {desc}".format(
                    cset=c['node'][0:12],
                    desc=c['desc'].splitlines()[0].encode('ascii', 'ignore')))
                changed_files |= set(c['files'])

        pushdate = time.strftime('%Y%m%d%H%M%S', time.gmtime(params['pushdate']))

        # Template parameters used when expanding the graph
        parameters = merge_dicts(gaia_info(), {
            'index': 'index',
            'project': project,
            'pushlog_id': params.get('pushlog_id', 0),
            'docker_image': docker_image,
            'base_repository': params['base_repository'] or
                               params['head_repository'],
            'head_repository': params['head_repository'],
            'head_ref': params['head_ref'] or params['head_rev'],
            'head_rev': params['head_rev'],
            'pushdate': pushdate,
            'pushtime': pushdate[8:],
            'year': pushdate[0:4],
            'month': pushdate[4:6],
            'day': pushdate[6:8],
            'rank': params['pushdate'],
            'owner': params['owner'],
            'level': params['level'],
        })

        routes_file = os.path.join(root, 'routes.json')
        with open(routes_file) as f:
            contents = json.load(f)
            json_routes = contents['routes']
            # TODO: Nightly and/or l10n routes

        # Task graph we are generating for taskcluster...
        graph = {
            'tasks': [],
            'scopes': set(),
        }

        for env in TREEHERDER_ROUTES:
            route = format_treeherder_route(TREEHERDER_ROUTES[env],
                                            parameters['project'],
                                            parameters['head_rev'],
                                            parameters['pushlog_id'])
            graph['scopes'].add("queue:route:{}".format(route))

        graph['metadata'] = {
            'source': '{repo}file/{rev}/testing/taskcluster/mach_commands.py'.format(
                repo=params['head_repository'], rev=params['head_rev']),
            'owner': params['owner'],
            # TODO: Add full mach commands to this example?
            'description': 'Task graph generated via ./mach taskcluster-graph',
            'name': 'task graph local'
        }

        # Filter the job graph according to conditions met by this invocation run.
        def should_run(task):
            # Old-style build or test task that doesn't define conditions. Always runs.
            if 'when' not in task:
                return True

            when = task['when']

            # If the task defines file patterns and we have a set of changed
            # files to compare against, only run if a file pattern matches one
            # of the changed files.
            file_patterns = when.get('file_patterns', None)
            if file_patterns and changed_files:
                # Always consider changes to the task definition itself
                file_patterns.append('testing/taskcluster/{task}'.format(task=task['task']))
                for pattern in file_patterns:
                    for path in changed_files:
                        if mozpackmatch(path, pattern):
                            logger.debug('scheduling {task} because pattern {pattern} '
                                         'matches {path}'.format(
                                             task=task['task'],
                                             pattern=pattern,
                                             path=path,
                                         ))
                            return True

                # No file patterns matched. Discard task.
                logger.debug('discarding {task} because no relevant files changed'.format(
                    task=task['task']))
                return False

            return True

        job_graph = filter(should_run, job_graph)

        all_routes = {}

        for build in job_graph:
            logging.debug("loading build task {}".format(build['task']))
            interactive = cmdline_interactive or build["interactive"]
            build_parameters = merge_dicts(parameters, build['additional-parameters'])
            build_parameters['build_slugid'] = mklabel()
            build_parameters['source'] = '{repo}file/{rev}/testing/taskcluster/{file}'.format(
                repo=params['head_repository'], rev=params['head_rev'], file=build['task'])
            build_task = templates.load(build['task'], build_parameters)

            # Copy build_* attributes to expose them to post-build tasks
            # as well as json routes and tests
            task_extra = build_task['task']['extra']
            build_parameters['build_name'] = task_extra['build_name']
            build_parameters['build_type'] = task_extra['build_type']
            build_parameters['build_product'] = task_extra['build_product']

            if 'treeherder' in task_extra:
                tier = task_extra['treeherder'].get('tier', 1)
                if tier != 1:
                    # Only tier 1 jobs use the build time as rank. Everything
                    # else gets rank 0 until it is promoted to tier 1.
                    task_extra['index']['rank'] = 0

            set_interactive_task(build_task, interactive)

            # try builds use neither caches nor coalescing
            if project == "try":
                remove_caches_from_task(build_task)
                remove_coalescing_from_task(build_task)
                set_expiration(build_task, TRY_EXPIRATION)

            decorate_task_treeherder_routes(build_task['task'],
                                            build_parameters['project'],
                                            build_parameters['head_rev'],
                                            build_parameters['pushlog_id'])
            decorate_task_json_routes(build_task['task'],
                                      json_routes,
                                      build_parameters)

            # Ensure each build task is valid after construction.
            validate_build_task(build_task)
            attributes = build_task['attributes'] = {'kind': 'legacy', 'legacy_kind': 'build'}
            if 'build_name' in build:
                attributes['build_platform'] = build['build_name']
            if 'build_type' in task_extra:
                attributes['build_type'] = {'dbg': 'debug'}.get(task_extra['build_type'],
                                                                task_extra['build_type'])
            if build.get('is_job'):
                attributes['job'] = build['build_name']
                attributes['legacy_kind'] = 'job'
            graph['tasks'].append(build_task)

            for location in build_task['task']['extra'].get('locations', {}):
                build_parameters['{}_location'.format(location)] = \
                    build_task['task']['extra']['locations'][location]

            for url in build_task['task']['extra'].get('url', {}):
                build_parameters['{}_url'.format(url)] = \
                    build_task['task']['extra']['url'][url]

            define_task = DEFINE_TASK.format(build_task['task']['workerType'])

            for route in build_task['task'].get('routes', []):
                if route.startswith('index.gecko.v2') and route in all_routes:
                    raise Exception(
                        "Error: route '%s' is in use by multiple tasks: '%s' and '%s'" % (
                            route,
                            build_task['task']['metadata']['name'],
                            all_routes[route],
                        ))
                all_routes[route] = build_task['task']['metadata']['name']

            graph['scopes'].add(define_task)
            graph['scopes'] |= set(build_task['task'].get('scopes', []))
            route_scopes = ['queue:route:' + route
                            for route in build_task['task'].get('routes', [])]
            graph['scopes'] |= set(route_scopes)

            # Treeherder symbol configuration is required for each build so
            # that tests show up under the same platform as their build.
            build_treeherder_config = build_task['task']['extra']['treeherder']

            if 'machine' not in build_treeherder_config:
                message = '({}), extra.treeherder.machine required for all builds'
                raise ValueError(message.format(build['task']))

            if 'build' not in build_treeherder_config:
                build_treeherder_config['build'] = \
                    build_treeherder_config['machine']

            if 'collection' not in build_treeherder_config:
                build_treeherder_config['collection'] = {'opt': True}

            if len(build_treeherder_config['collection'].keys()) != 1:
                message = '({}), extra.treeherder.collection must contain one type'
                raise ValueError(message.format(build['task']))

            for post_build in build['post-build']:
                # copy over the old parameters to update the template
                # TODO: additional-parameters is currently not a general
                # option; it is only enabled for build tasks
                post_parameters = merge_dicts(build_parameters,
                                              post_build.get('additional-parameters', {}))
                post_task = configure_dependent_task(post_build['task'],
                                                     post_parameters,
                                                     mklabel(),
                                                     templates,
                                                     build_treeherder_config)
                set_interactive_task(post_task, interactive)

                if project == "try":
                    set_expiration(post_task, TRY_EXPIRATION)

                post_task['attributes'] = attributes.copy()
                post_task['attributes']['legacy_kind'] = 'post_build'
                post_task['attributes']['post_build'] = post_build['job_flag']
                graph['tasks'].append(post_task)

        graph['scopes'] = sorted(graph['scopes'])

        # Convert to a dictionary of tasks.  The process above has invented a
        # taskId for each task, and we use those as the *labels* for the tasks;
        # taskgraph will later assign them new taskIds.
        return [
            cls(kind, t['taskId'], task=t['task'], attributes=t['attributes'], task_dict=t)
            for t in graph['tasks']
        ]

    def get_dependencies(self, taskgraph):
        # fetch dependency information from the cached graph
        deps = [(label, label) for label in self.task_dict.get('requires', [])]

        # add a dependency on an image task, if needed
        if 'docker-image' in self.task_dict:
            deps.append(('build-docker-image-{docker-image}'.format(**self.task_dict),
                         'docker-image'))

        return deps

    def optimize(self):
        # no optimization for the moment
        return False, None

    @classmethod
    def from_json(cls, task_dict):
        legacy_task = cls(kind='legacy',
                          label=task_dict['label'],
                          attributes=task_dict['attributes'],
                          task=task_dict['task'],
                          task_dict=task_dict)
        return legacy_task