taskcluster/copy_secrets_to_staging.py
author Aki Sasaki <asasaki@mozilla.com>
Tue, 10 May 2022 15:25:24 -0600
changeset 888 a16d4c026782aafd47539d01ac900b38456a33f1
parent 886 38469d4c0d250e1af4926b4781395da522481dae
permissions -rwxr-xr-x
level 1 secrets, not level 3

#!/usr/bin/env python3
"""
Copy secrets whose names match ``COPY_PREFIXES`` from fxci to staging, replacing
their real values with fake placeholder content.

Requires `taskcluster` python module and the taskcluster cli to be in PATH.

"""
from __future__ import print_function

import asyncio
import io
import logging
import os
import pprint
import random
import re
import sys
from copy import deepcopy

from taskcluster.aio import Secrets

# Root URL of the cluster whose secret names (and values) are read.
PROD_ROOT_URL = "https://firefox-ci-tc.services.mozilla.com/"
# Root URL of the cluster that receives the wiped copies.
STAGING_ROOT_URL = "https://stage.taskcluster.nonprod.cloudops.mozgcp.net/"
# Passed to ``taskcluster signin --expires`` when requesting credentials.
CREDS_EXPIRY_TIME = "15m"
# ``expires`` timestamp written on every wiped secret set in staging.
FAKE_EXPIRY_TIME = "3000-01-01T12:00:00.000Z"
# When True, ``async_main`` only logs what it would write to staging.
NOOP = True
# Only secrets whose name starts with one of these prefixes are copied
# (the tuple is handed directly to ``str.startswith``).
COPY_PREFIXES = (
    "project/releng/gecko/build/level-1/",
    # "project/taskcluster/gecko/hg",
)
# Module-level logger used by every function in this script.
log = logging.getLogger(__name__)


class RetryException(Exception):
    """Signal a failure that ``retry_async`` retries by default."""


# utils {{{1
async def get_output_from_command(cmd, env=None):
    """Run a command using ``asyncio.create_subprocess_exec`` and return its stdout.

    The child is started in its own session (``os.setsid``) with stdin
    connected to ``DEVNULL``; stdout and stderr are both captured.

    Args:
        cmd (sequence of str): the program to run followed by its arguments.
        env (dict, optional): the environment for the child process.  If None,
            no ``env`` is passed to ``create_subprocess_exec``.

    Returns:
        str: the decoded stdout of the command.

    Raises:
        RetryException: if the command exits with a non-zero exit code.

    """
    log.info(f"Running {cmd}...")

    kwargs = {
        "stderr": asyncio.subprocess.PIPE,
        "stdout": asyncio.subprocess.PIPE,
        "stdin": asyncio.subprocess.DEVNULL,
        "close_fds": True,
        "preexec_fn": os.setsid,
    }
    if env is not None:
        kwargs["env"] = env
    proc = await asyncio.create_subprocess_exec(*cmd, **kwargs)
    stdout, stderr = await proc.communicate()
    # Decode the captured bytes via TextIOWrapper's default encoding.
    if stderr is not None:
        stderr = io.TextIOWrapper(io.BytesIO(stderr)).read()
    if stdout is not None:
        stdout = io.TextIOWrapper(io.BytesIO(stdout)).read()
    if stderr:
        log.warning(f"Errors: {stderr}")
    if proc.returncode:
        # Report the real exit code; the previous code referenced an undefined
        # name here, which raised NameError and bypassed the retry logic.
        raise RetryException(f"Exited {proc.returncode}")
    return stdout


def calculate_sleep_time(
    attempt, delay_factor=5.0, randomization_factor=0.5, max_delay=120
):
    """Calculate the sleep time between retries, in seconds.

    Based off of `taskcluster.utils.calculateSleepTime`, but with kwargs instead
    of constant `delay_factor`/`randomization_factor`/`max_delay`.  The taskcluster
    function generally slept for less than a second, which didn't always get
    past server issues.

    Args:
        attempt (int): the retry attempt number
        delay_factor (float, optional): a multiplier for the delay time.  Defaults to 5.
        randomization_factor (float, optional): a randomization multiplier for the
            delay time.  Defaults to .5.
        max_delay (float, optional): the max delay to sleep.  Defaults to 120 (seconds).

    Returns:
        float: the time to sleep, in seconds (0 when ``attempt`` is not positive).
    """
    if attempt <= 0:
        return 0

    # Exponential backoff: attempt 1 -> 2**0, attempt 2 -> 2**1, ...
    delay = float(2 ** (attempt - 1)) * float(delay_factor)
    # Apply randomization factor.  Only increase the delay here: the
    # multiplier lies in [1, 1 + randomization_factor).
    delay = delay * (randomization_factor * random.random() + 1)
    # Always limit with a maximum delay
    return min(delay, max_delay)


async def retry_async(
    func,
    attempts=5,
    sleeptime_callback=calculate_sleep_time,
    retry_exceptions=(RetryException,),
    args=(),
    kwargs=None,
    sleeptime_kwargs=None,
):
    """Await ``func`` repeatedly until it succeeds or the attempts run out.

    Args:
        func (function): an awaitable function.
        attempts (int, optional): the maximum number of calls to make.
            Default is 5.
        sleeptime_callback (function, optional): called with the attempt number
            (plus ``sleeptime_kwargs``) to decide how long to sleep before the
            next call.  Defaults to ``calculate_sleep_time``.
        retry_exceptions (tuple or exception, optional): the exception(s) that
            trigger a retry.  Defaults to ``(RetryException,)``.
        args (tuple, optional): positional args for ``func``.  Defaults to ().
        kwargs (dict, optional): keyword args for ``func``.  If None, use {}.
        sleeptime_kwargs (dict, optional): keyword args for
            ``sleeptime_callback``.  If None, use {}.

    Returns:
        object: whatever the first successful ``func`` call returns.

    Raises:
        Exception: any exception from ``func`` outside ``retry_exceptions``, or
            the last retryable exception once ``attempts`` calls have failed.
    """
    call_kwargs = kwargs if kwargs else {}
    attempt = 1
    while True:
        try:
            result = await func(*args, **call_kwargs)
        except retry_exceptions:
            attempt += 1
            # Must stay inside the except block: the helper re-raises the
            # exception being handled when the attempts are exhausted.
            _check_number_of_attempts(attempt, attempts, func, "retry_async")
            pause = _define_sleep_time(
                sleeptime_kwargs, sleeptime_callback, attempt, func, "retry_async"
            )
            await asyncio.sleep(pause)
        else:
            return result


def _check_number_of_attempts(
    attempt,
    attempts,
    func,
    retry_function_name,
):
    if attempt > attempts:
        log.warning(
            "{}: {}: too many retries!".format(retry_function_name, func.__name__)
        )
        raise


def _define_sleep_time(
    sleeptime_kwargs,
    sleeptime_callback,
    attempt,
    func,
    retry_function_name,
):
    """Compute, log and return the time to sleep before the next retry.

    Args:
        sleeptime_kwargs (dict or None): keyword args for ``sleeptime_callback``;
            a falsy value means no keyword args.
        sleeptime_callback (function): called as
            ``sleeptime_callback(attempt, **sleeptime_kwargs)``.
        attempt (int): the attempt number handed to the callback.
        func (function): the function being retried; only its name is logged.
        retry_function_name (str): the name of the retrying caller, for the log.

    Returns:
        the value returned by ``sleeptime_callback``.
    """
    callback_kwargs = sleeptime_kwargs if sleeptime_kwargs else {}
    sleep_time = sleeptime_callback(attempt, **callback_kwargs)
    log.debug(
        f"{retry_function_name}: {func.__name__}: "
        f"sleeping {sleep_time} seconds before retry"
    )
    return sleep_time


# }}}


def wipe_secret_values(d):
    """Replace the sensitive values of a secret with fake ones, in place.

    The keys and nesting of ``d`` are kept.  Strings become ``"fake"``, lists
    become empty lists, and nested dicts are wiped recursively.  Booleans,
    integers and floats are left untouched, on the assumption that they are
    not themselves sensitive.

    Args:
        d (dict): the secret to wipe; it is modified in place.

    Returns:
        dict: the same ``d`` object, after wiping.

    Raises:
        Exception: if a value is of any other type (``None`` included).
    """
    for k, current in d.items():
        if isinstance(current, str):
            d[k] = "fake"
            continue
        if isinstance(current, list):
            d[k] = []
            continue
        if isinstance(current, dict):
            # The nested dict is wiped in place and stored back under its key.
            d[k] = wipe_secret_values(current)
            continue
        if not isinstance(current, (bool, int, float)):
            raise Exception(f"Unknown type for {k}")
    return d


async def get_auth(root_url, scopes):
    """Sign into a cluster with the taskcluster cli and return its credentials.

    Runs ``taskcluster signin`` (retried through ``retry_async``) and parses
    output of the form

    export TASKCLUSTER_CLIENT_ID='USER'
    export TASKCLUSTER_ACCESS_TOKEN='SECRET'
    export TASKCLUSTER_ROOT_URL='https://firefox-ci-tc.services.mozilla.com/'

    Args:
        root_url (str): exported to the cli as ``TASKCLUSTER_ROOT_URL``.
        scopes (str): the value passed to the cli's ``-s`` option.

    Returns:
        dict: ``{"clientId": ..., "accessToken": ...}``

    Raises:
        Exception: if the client id or the access token is missing from the
            cli output.
    """
    log.info(f"Logging into {root_url}...")
    signin_cmd = ("taskcluster", "signin", "--expires", CREDS_EXPIRY_TIME, "-s", scopes)
    # The cli only gets PATH and the target root URL in its environment.
    signin_env = {
        "PATH": os.environ["PATH"],
        "TASKCLUSTER_ROOT_URL": root_url,
    }
    output = await retry_async(
        get_output_from_command,
        args=(signin_cmd,),
        kwargs={"env": signin_env},
    )
    patterns = {
        "clientId": re.compile(r"export TASKCLUSTER_CLIENT_ID='(?P<value>.*)'"),
        "accessToken": re.compile(r"export TASKCLUSTER_ACCESS_TOKEN='(?P<value>.*)'"),
    }
    auth = {}
    for line in output.splitlines():
        for key, pattern in patterns.items():
            found = pattern.match(line)
            if found is None:
                continue
            auth[key] = found.group("value")
            # A line matches at most one credential; move on to the next line.
            break
    if len(auth) != len(patterns):
        raise Exception("Auth doesn't contain enough info!")
    return auth


async def get_secrets(root_url):
    """Return the names of all secrets in the cluster at ``root_url``.

    The secrets ``list`` endpoint is paginated: a response may carry a
    ``continuationToken`` when more names are available.  Keep requesting
    pages until no token is returned, so the result is not silently truncated
    to the first page.

    Args:
        root_url (str): the root URL of the cluster to query (no credentials
            are used).

    Returns:
        list: the secret names, in the order the service returned them.
    """
    secrets_obj = Secrets(options={"rootUrl": root_url})
    names = []
    call_kwargs = {}
    while True:
        output = await retry_async(secrets_obj.list, kwargs=call_kwargs)
        names.extend(output["secrets"])
        token = output.get("continuationToken")
        if not token:
            return names
        # Ask for the page following the one we just received.
        call_kwargs = {"query": {"continuationToken": token}}


async def async_main():
    """Create a wiped staging copy of every prod secret matching ``COPY_PREFIXES``.

    Lists the secret names in prod, signs into prod (``secrets:get:*``) and
    staging (``secrets:get:*`` and ``secrets:set:*``), then, for each matching
    name, fetches the prod secret and sets a fake-valued secret of the same
    name in staging.  When ``NOOP`` is true the staging write is only logged.
    """
    secrets_list = await get_secrets(PROD_ROOT_URL)

    prod_auth = await get_auth(PROD_ROOT_URL, "secrets:get:*")
    prod_options = {"rootUrl": PROD_ROOT_URL, "credentials": prod_auth}
    prod_secrets = Secrets(options=prod_options)

    staging_auth = await get_auth(STAGING_ROOT_URL, "secrets:get:*\nsecrets:set:*")
    staging_options = {"rootUrl": STAGING_ROOT_URL, "credentials": staging_auth}
    staging_secrets = Secrets(options=staging_options)

    for secret_name in secrets_list:
        # XXX do we want to copy all secrets or just some? e.g. worker-id secrets?
        if not secret_name.startswith(COPY_PREFIXES):
            log.debug(f"Skipping {secret_name}...")
            continue

        log.info(secret_name)
        secret_value = await retry_async(prod_secrets.get, args=(secret_name,))
        # XXX Alternative, if we want all the keys but fake values:
        # wiped_secret_value = wipe_secret_values(deepcopy(secret_value))
        # XXX Current choice: the secret exists, but with fake keys and values
        # and a fixed far-future expiry.
        wiped_secret_value = {
            "secret": {"content": "fake"},
            "expires": FAKE_EXPIRY_TIME,
        }
        if not NOOP:
            await retry_async(
                staging_secrets.set,
                args=(secret_name, wiped_secret_value),
            )
            continue
        log.info(
            f"NOOP: I would have set the value for staging {secret_name} to {wiped_secret_value}"
        )


def main():
    """Configure INFO-level logging and run ``async_main`` to completion.

    An event loop is created explicitly instead of relying on
    ``asyncio.get_event_loop()`` to create one implicitly, which is
    deprecated and fails on recent Python versions.  The loop is closed once
    ``async_main`` finishes or raises.
    """
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    loop = asyncio.new_event_loop()
    # Make it the current loop so code calling get_event_loop() still finds it.
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(async_main())
    finally:
        loop.close()


if __name__ == "__main__":
    main()