taskcluster/terminate_broken_workers.py
author Johan Lorenzo <jlorenzo@mozilla.com>
Fri, 10 Feb 2023 20:15:10 +0100
changeset 905 a87545a560d669427b0e267086e220d35dfb87d7
parent 904 c8bb2fc2fbdc7e59c7df62a7815a4ec4a9372ea1
permissions -rwxr-xr-x
taskcluster: Add script to display what tasks got rerun

#!/usr/bin/env python
"""
Terminates all workers for the given pool which appear to be stuck (in the
running state but have never claimed a task).

Usage:
    ./terminate_broken_workers.py [--dry-run] [<workerPoolId> ...]
"""
import argparse
import sys
from datetime import datetime
from pprint import pprint

import taskcluster

# Reference time captured once at import; is_candidate() measures worker age
# against it. It is a naive datetime, like the value is_candidate() parses
# from each worker's "created" field.
now = datetime.utcnow()
# Taskcluster API clients, both configured via optionsFromEnvironment().
queue = taskcluster.Queue(taskcluster.optionsFromEnvironment())
wm = taskcluster.WorkerManager(taskcluster.optionsFromEnvironment())


# Provider ids whose worker pools are checked when no pool ids are passed
# explicitly (see terminate_workers).
SUPPORTED_PROVIDERS = (
    "aws",
    "fxci-level1-gcp",
    "fxci-level2-gcp",
    "fxci-level3-gcp",
)


def _normalize(workers):
    return {(w["workerGroup"], w["workerId"]) for w in workers}


def is_candidate(worker):
    """Return True when *worker* is running and at least one hour old.

    Age is measured from the worker's "created" timestamp to the
    module-level ``now``.
    """
    if worker["state"] == "running":
        started = datetime.strptime(worker["created"], "%Y-%m-%dT%H:%M:%S.%fZ")
        age_in_seconds = (now - started).total_seconds()
        # Younger workers may simply not have claimed their first task yet.
        return age_in_seconds >= 3600

    return False


def _list_all_workers(list_method, *args):
    """Return the "workers" entries from every page of a paginated listing.

    ``list_method`` is a Taskcluster client method whose response carries a
    "workers" list and, while more results remain, a "continuationToken".
    ``args`` are the positional route parameters passed on each call.
    """
    workers = []
    query = {}
    while True:
        # Only send a query once there is a token to continue from, so the
        # first request is identical to a plain, unpaginated call.
        response = list_method(*args, query=query) if query else list_method(*args)
        workers.extend(response["workers"])
        token = response.get("continuationToken")
        if not token:
            return workers
        query = {"continuationToken": token}


def terminate_workers_in_pool(worker_pool_id, dry_run=False):
    """Terminate the workers in one pool that never picked up a task.

    Args:
        worker_pool_id: Pool id of the form "<provisioner>/<workerType>".
        dry_run: When true, only print what would be terminated.

    Raises:
        ValueError: If ``worker_pool_id`` does not contain exactly one "/".
    """
    print(f"checking {worker_pool_id}...")
    provisioner, worker_type = worker_pool_id.split("/")

    # Both listings are paginated. Reading only the first page of the queue
    # listing would make workers on later pages look like they never claimed
    # a task, and they would be terminated by mistake.
    wm_workers = filter(
        is_candidate,
        _list_all_workers(wm.listWorkersForWorkerPool, worker_pool_id),
    )
    queue_workers = _list_all_workers(queue.listWorkers, provisioner, worker_type)

    # Any workers that are known to worker-manager, but not the queue service
    # have not picked up any tasks and can therefore be considered "busted".
    busted_workers = _normalize(wm_workers) - _normalize(queue_workers)
    action = "would terminate" if dry_run else "terminating"
    for group, worker_id in busted_workers:
        print(f"  {action} worker '{worker_id}' in {group}")
        if not dry_run:
            wm.removeWorker(worker_pool_id, group, worker_id)


def terminate_workers(pools=None, **kwargs):
    """Run terminate_workers_in_pool() over each requested worker pool.

    When ``pools`` is empty or None, the pools are taken from
    ``wm.listWorkerPools()``, keeping those whose provider is listed in
    SUPPORTED_PROVIDERS. Extra keyword arguments are forwarded unchanged.
    """
    if not pools:
        pools = []
        for entry in wm.listWorkerPools()["workerPools"]:
            if entry["providerId"] in SUPPORTED_PROVIDERS:
                pools.append(entry["workerPoolId"])

    for worker_pool_id in pools:
        terminate_workers_in_pool(worker_pool_id, **kwargs)


def cli(args=None):
    """Parse command-line arguments and terminate the selected workers.

    Args:
        args: Argument strings to parse. ``None`` (the default) makes
            argparse read ``sys.argv[1:]`` at call time rather than freezing
            the value when this function is defined.
    """
    # The module docstring is the description; passing it positionally would
    # set ``prog`` and print the whole docstring as the program name.
    parser = argparse.ArgumentParser(
        description=__doc__,
        # Keep the docstring's line breaks and indented usage example.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "pools",
        metavar="POOL",
        nargs="*",
        help="Worker pool id(s) to terminate workers on.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="Print which workers would be terminated.",
    )
    options = parser.parse_args(args)
    # The namespace holds exactly ``pools`` and ``dry_run``.
    terminate_workers(**vars(options))


# Run the CLI only when executed as a script. cli() has no return statement,
# so sys.exit() receives None.
if __name__ == "__main__":
    sys.exit(cli(sys.argv[1:]))