# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.


import json
import logging
import os
import re

from taskgraph.util.parameterization import resolve_task_references
from taskgraph.util.taskcluster import get_artifact, get_task_definition, list_artifacts

from gecko_taskgraph.util.copy_task import copy_task

from .registry import register_callback_action
from .util import add_args_to_command, create_task_from_def, fetch_graph_and_labels

logger = logging.getLogger(__name__)


def get_failures(task_id):
    """Returns a dict containing properties containing a list of
    directories containing test failures and a separate list of
    individual test failures from the errorsummary.log artifact for
    the task.

    Calls the helper function munge_test_path to attempt to find an
    appropriate test path to pass to the task in
    MOZHARNESS_TEST_PATHS.  If no appropriate test path can be
    determined, nothing is returned.
    """

    def re_compile_list(*lst):
        # Ideally we'd just use rb"" literals and avoid the encode, but
        # this file needs to be importable in python2 for now.
        return [re.compile(s.encode("utf-8")) for s in lst]

    re_bad_tests = re_compile_list(
        r"Last test finished",
        r"LeakSanitizer",
        r"Main app process exited normally",
        r"ShutdownLeaks",
        r"[(]SimpleTest/TestRunner.js[)]",
        r"automation.py",
        r"https?://localhost:\d+/\d+/\d+/.*[.]html",
        r"jsreftest",
        r"leakcheck",
        r"mozrunner-startup",
        r"pid: ",
        r"RemoteProcessMonitor",
        r"unknown test url",
    )
    re_extract_tests = re_compile_list(
        r'"test": "(?:[^:]+:)?(?:https?|file):[^ ]+/reftest/tests/([^ "]+)',
        r'"test": "(?:[^:]+:)?(?:https?|file):[^:]+:[0-9]+/tests/([^ "]+)',
        r'xpcshell-?[^ "]*\.ini:([^ "]+)',
        r'/tests/([^ "]+) - finished .*',
        r'"test": "([^ "]+)"',
        r'"message": "Error running command run_test with arguments '
        r"[(]<wptrunner[.]wpttest[.]TestharnessTest ([^>]+)>",
        r'"message": "TEST-[^ ]+ [|] ([^ "]+)[^|]*[|]',
    )

    def munge_test_path(line):
        test_path = None
        for r in re_bad_tests:
            if r.search(line):
                return None
        for r in re_extract_tests:
            m = r.search(line)
            if m:
                test_path = m.group(1)
                break
        return test_path

    dirs = set()
    tests = set()
    artifacts = list_artifacts(task_id)
    for artifact in artifacts:
        if "name" not in artifact or not artifact["name"].endswith("errorsummary.log"):
            continue

        stream = get_artifact(task_id, artifact["name"])
        if not stream:
            continue

        # The number of tasks created is determined by the
        # `times` value and the number of distinct tests and
        # directories as: times * (1 + len(tests) + len(dirs)).
        # Since the maximum value of `times` specifiable in the
        # Treeherder UI is 100, the number of tasks created can
        # reach a very large value depending on the number of
        # unique tests.  During testing, it was found that 10
        # distinct tests were sufficient to cause the action task
        # to exceed the maxRunTime of 1800 seconds resulting in it
        # being aborted.  We limit the number of distinct tests
        # and thereby the number of distinct test directories to a
        # maximum of 5 to keep the action task from timing out.
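        # For example, at the maximum `times` value of 100 with the 5-test
        # limit below (5 tests in up to 5 distinct directories), as many as
        # 100 * (1 + 5 + 5) = 1100 tasks could still be created.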

        # We handle the stream as raw bytes because it may contain invalid
        # UTF-8 characters in portions other than those containing the error
        # messages we're looking for.
        for line in stream.read().split(b"\n"):
            test_path = munge_test_path(line.strip())

            if test_path:
                tests.add(test_path.decode("utf-8"))
                test_dir = os.path.dirname(test_path)
                if test_dir:
                    dirs.add(test_dir.decode("utf-8"))

            if len(tests) > 4:
                break

    return {"dirs": sorted(dirs), "tests": sorted(tests)}


def create_isolate_failure_tasks(task_definition, failures, level, times):
    """
    Create tasks to re-run the original task plus tasks to test
    each failing test directory and individual path.

    """
    logger.info(f"Isolate task:\n{json.dumps(task_definition, indent=2)}")

    # Operate on a copy of the original task_definition
    task_definition = copy_task(task_definition)

    task_name = task_definition["metadata"]["name"]
    repeatable_task = False
    if (
        "crashtest" in task_name
        or "mochitest" in task_name
        or ("reftest" in task_name and "jsreftest" not in task_name)
    ):
        repeatable_task = True
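    # Note that a jsreftest task name also contains the substring "reftest",
    # so the parenthesized clause above is what keeps jsreftest tasks out of
    # the repeatable set.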

    th_dict = task_definition["extra"]["treeherder"]
    symbol = th_dict["symbol"]
    is_windows = "windows" in th_dict["machine"]["platform"]

    suite = task_definition["extra"]["suite"]
    if "-coverage" in suite:
        suite = suite[: suite.index("-coverage")]
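        # e.g. suite "mochitest-plain-coverage" is trimmed to "mochitest-plain"
        # before being used as the MOZHARNESS_TEST_PATHS key below.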
    is_wpt = "web-platform-tests" in suite

    # command is a copy of the original task's payload command. Each pass
    # through the failure_group loop below derives the new task's payload
    # command from it by adding that group's repeat_args, so the unmodified
    # copy must be preserved.

    command = copy_task(task_definition["payload"]["command"])
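    # For example (illustrative), with repeat_args == ["--repeat=4"],
    # add_args_to_command(command, extra_args=["--repeat=4"]) returns a copy
    # of command with the extra flag added to the test harness invocation.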

    th_dict["groupSymbol"] = th_dict["groupSymbol"] + "-I"
    th_dict["tier"] = 3

    for i in range(times):
        create_task_from_def(task_definition, level)

    if repeatable_task:
        task_definition["payload"]["maxRunTime"] = 3600 * 3

    for failure_group in failures:
        if len(failures[failure_group]) == 0:
            continue
        if failure_group == "dirs":
            failure_group_suffix = "-id"
            # execute 5 total loops
            repeat_args = ["--repeat=4"] if repeatable_task else []
        elif failure_group == "tests":
            failure_group_suffix = "-it"
            # execute 20 total loops
            repeat_args = ["--repeat=19"] if repeatable_task else []
        else:
            logger.error(
                "create_isolate_failure_tasks: Unknown failure_group {}".format(
                    failure_group
                )
            )
            continue

        if repeat_args:
            task_definition["payload"]["command"] = add_args_to_command(
                command, extra_args=repeat_args
            )
        else:
            task_definition["payload"]["command"] = command

        for failure_path in failures[failure_group]:
            th_dict["symbol"] = symbol + failure_group_suffix
            if is_wpt:
                failure_path = "testing/web-platform/tests" + failure_path
            if is_windows:
                failure_path = "\\".join(failure_path.split("/"))
            task_definition["payload"]["env"]["MOZHARNESS_TEST_PATHS"] = json.dumps(
                {suite: [failure_path]}, sort_keys=True
            )
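            # e.g. (illustrative) for suite "mochitest-plain" and failure_path
            # "dom/base/test", MOZHARNESS_TEST_PATHS is set to
            # '{"mochitest-plain": ["dom/base/test"]}'.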

            logger.info(
                "Creating task for path {} with command {}".format(
                    failure_path, task_definition["payload"]["command"]
                )
            )
            for i in range(times):
                create_task_from_def(task_definition, level)


@register_callback_action(
    name="isolate-test-failures",
    title="Isolate test failures in job",
    symbol="it",
    description="Re-run Tests for original manifest, directories and tests for failing tests.",
    order=150,
    context=[{"kind": "test"}],
    schema={
        "type": "object",
        "properties": {
            "times": {
                "type": "integer",
                "default": 1,
                "minimum": 1,
                "maximum": 100,
                "title": "Times",
                "description": "How many times to run each task.",
            }
        },
        "additionalProperties": False,
    },
)
def isolate_test_failures(parameters, graph_config, input, task_group_id, task_id):
    task = get_task_definition(task_id)
    decision_task_id, full_task_graph, label_to_taskid = fetch_graph_and_labels(
        parameters, graph_config
    )

    pre_task = full_task_graph.tasks[task["metadata"]["name"]]

    # fix up the task's dependencies, similar to how optimization would
    # have done it in the decision task
    dependencies = {
        name: label_to_taskid[label] for name, label in pre_task.dependencies.items()
    }
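    # e.g. (illustrative) {"build": "<taskId>"}: the dependency names from the
    # task graph mapped to the concrete taskIds the decision task created.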

    task_definition = resolve_task_references(
        pre_task.label, pre_task.task, task_id, decision_task_id, dependencies
    )
    task_definition.setdefault("dependencies", []).extend(dependencies.values())

    failures = get_failures(task_id)
    logger.info("isolate_test_failures: %s" % failures)
    create_isolate_failure_tasks(
        task_definition, failures, parameters["level"], input["times"]
    )