#!/usr/bin/env python
"""
Terminates all workers for the given pool which appear to be stuck (in the
running state but have never claimed a task).
Usage:
./terminate_workers.py [<workerPoolId> ...]
"""
import argparse
import sys
from datetime import datetime
from pprint import pprint
import taskcluster
# Reference instant for worker-age checks. Captured once at import time as a
# naive UTC datetime, so every worker is compared against the same moment.
now = datetime.utcnow()
# Taskcluster API clients, both built from taskcluster.optionsFromEnvironment().
queue = taskcluster.Queue(taskcluster.optionsFromEnvironment())
wm = taskcluster.WorkerManager(taskcluster.optionsFromEnvironment())
# Provider ids whose worker pools are checked when no pools are named
# explicitly on the command line.
SUPPORTED_PROVIDERS = (
    "aws",
    "fxci-level1-gcp",
    "fxci-level2-gcp",
    "fxci-level3-gcp",
)
def _normalize(workers):
    """Reduce worker records to a set of ``(workerGroup, workerId)`` pairs.

    Both services describe workers with differently shaped records; keeping
    only the identifying pair lets the two listings be compared with set
    operations.
    """
    identities = set()
    for record in workers:
        identities.add((record["workerGroup"], record["workerId"]))
    return identities
def is_candidate(worker):
    """Return True if *worker* is eligible to be checked for being stuck.

    A worker qualifies when its ``state`` is ``"running"`` and its ``created``
    timestamp lies at least one hour (3600 seconds) before the module-level
    ``now``.
    """
    if worker["state"] != "running":
        return False

    # Timestamps are parsed as naive datetimes so they can be subtracted from
    # the (also naive) module-level ``now``.
    created_at = datetime.strptime(worker["created"], "%Y-%m-%dT%H:%M:%S.%fZ")
    age_in_seconds = (now - created_at).total_seconds()
    return age_in_seconds >= 3600
def _list_all_workers(list_call, *args):
    """Return the ``"workers"`` entries from every page of a paginated list call.

    ``list_call`` is a Taskcluster client list method; it is re-invoked with
    the returned ``continuationToken`` until the response carries none.
    """
    response = list_call(*args)
    workers = list(response["workers"])
    while response.get("continuationToken"):
        response = list_call(
            *args, query={"continuationToken": response["continuationToken"]}
        )
        workers.extend(response["workers"])
    return workers


def terminate_workers_in_pool(worker_pool_id, dry_run=False):
    """Terminate the workers of one pool that never registered with the queue.

    Args:
        worker_pool_id: Pool id of the form ``<provisionerId>/<workerType>``.
        dry_run: When true, only print which workers would be terminated.

    Raises:
        ValueError: If ``worker_pool_id`` does not contain exactly one ``/``.
    """
    print(f"checking {worker_pool_id}...")
    provisioner, worker_type = worker_pool_id.split("/")

    # Both listings are paginated. Every page must be read: a truncated queue
    # listing would make healthy workers look like they never claimed a task,
    # and they would be terminated by mistake.
    wm_workers = filter(
        is_candidate, _list_all_workers(wm.listWorkersForWorkerPool, worker_pool_id)
    )
    queue_workers = _list_all_workers(queue.listWorkers, provisioner, worker_type)

    # Any workers that are known to worker-manager, but not the queue service,
    # have not picked up any tasks and can therefore be considered "busted".
    busted_workers = _normalize(wm_workers) - _normalize(queue_workers)

    action = "would terminate" if dry_run else "terminating"
    # Sorted so the output order is stable between runs.
    for group, worker_id in sorted(busted_workers):
        print(f" {action} worker '{worker_id}' in {group}")
        if not dry_run:
            wm.removeWorker(worker_pool_id, group, worker_id)
def terminate_workers(pools=None, **kwargs):
    """Terminate stuck workers in each of the given worker pools.

    Args:
        pools: Iterable of worker pool ids. When empty or ``None``, every
            pool whose ``providerId`` is in ``SUPPORTED_PROVIDERS`` is used.
        **kwargs: Forwarded unchanged to ``terminate_workers_in_pool``
            (e.g. ``dry_run``).
    """
    if not pools:
        # Build a fresh list rather than mutating a caller-supplied one.
        pools = []
        response = wm.listWorkerPools()
        while True:
            pools.extend(
                p["workerPoolId"]
                for p in response["workerPools"]
                if p["providerId"] in SUPPORTED_PROVIDERS
            )
            # The pool listing is paginated; keep going until no continuation
            # token is returned so pools beyond the first page are not skipped.
            token = response.get("continuationToken")
            if not token:
                break
            response = wm.listWorkerPools(query={"continuationToken": token})

    for pool in pools:
        terminate_workers_in_pool(pool, **kwargs)
def cli(args=None):
    """Parse command-line arguments and terminate stuck workers.

    Args:
        args: Argument strings to parse. ``None`` (the default) makes argparse
            read ``sys.argv[1:]`` at call time, instead of a copy frozen when
            the function was defined.

    Returns:
        None. Passing the result to ``sys.exit`` therefore yields status 0.
    """
    # The module docstring is the description, not the program name; the raw
    # formatter keeps its "Usage:" layout intact in --help output.
    parser = argparse.ArgumentParser(
        description=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "pools",
        metavar="POOL",
        nargs="*",
        help="Worker pool id(s) to terminate workers on.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        default=False,
        help="Print which workers would be terminated.",
    )
    parsed = parser.parse_args(args)
    # Namespace attributes (pools, dry_run) map directly onto keyword arguments.
    terminate_workers(**vars(parsed))
if __name__ == "__main__":
sys.exit(cli(sys.argv[1:]))