#!/usr/bin/env python3
"""
Copy all secrets from fxci to staging.
Requires `taskcluster` python module and the taskcluster cli to be in PATH.
"""
from __future__ import print_function

import asyncio
import io
import logging
import os
import pprint
import random
import re
import sys
from copy import deepcopy

from taskcluster.aio import Secrets
# Cluster endpoints: secrets are read from prod and (wiped copies) written to staging.
PROD_ROOT_URL = "https://firefox-ci-tc.services.mozilla.com/"
STAGING_ROOT_URL = "https://stage.taskcluster.nonprod.cloudops.mozgcp.net/"
# Lifetime of the temporary credentials requested via `taskcluster signin`.
CREDS_EXPIRY_TIME = "15m"
# Far-future expiry stamped onto the fake secret values written to staging.
FAKE_EXPIRY_TIME = "3000-01-01T12:00:00.000Z"
# When True, only log what would be written to staging; perform no writes.
NOOP = True
# Only secrets whose names start with one of these prefixes are copied.
COPY_PREFIXES = (
"project/releng/gecko/build/level-1/",
# "project/taskcluster/gecko/hg",
)
log = logging.getLogger(__name__)
class RetryException(Exception):
    """Raised when an operation failed but is safe to retry."""
# utils {{{1
async def get_output_from_command(cmd, env=None):
    """Run a command using ``asyncio.create_subprocess_exec`` and capture output.

    Args:
        cmd (tuple): the command and its arguments, passed directly to exec
            (no shell is involved).
        env (dict, optional): environment for the child process. If None,
            the parent environment is inherited. Defaults to None.

    Returns:
        str: the decoded stdout of the command.

    Raises:
        RetryException: if the command exits non-zero, so callers can wrap
            this in ``retry_async``.
    """
    log.info(f"Running {cmd}...")
    kwargs = {
        "stderr": asyncio.subprocess.PIPE,
        "stdout": asyncio.subprocess.PIPE,
        "stdin": asyncio.subprocess.DEVNULL,
        "close_fds": True,
        # Run the child in its own session so it doesn't receive our signals.
        "preexec_fn": os.setsid,
    }
    if env is not None:
        kwargs["env"] = env
    proc = await asyncio.create_subprocess_exec(*cmd, **kwargs)
    stdout, stderr = await proc.communicate()
    # Decode captured bytes to text using the locale's preferred encoding.
    if stderr is not None:
        stderr = io.TextIOWrapper(io.BytesIO(stderr)).read()
    if stdout is not None:
        stdout = io.TextIOWrapper(io.BytesIO(stdout)).read()
    if stderr:
        log.warning(f"Errors: {stderr}")
    if proc.returncode:
        # Bug fix: the original referenced an undefined name ``exitcode``,
        # which would have raised NameError instead of RetryException.
        raise RetryException(f"Exited {proc.returncode}")
    return stdout
def calculate_sleep_time(
    attempt, delay_factor=5.0, randomization_factor=0.5, max_delay=120
):
    """Calculate the sleep time between retries, in seconds.

    Based off of `taskcluster.utils.calculateSleepTime`, but with kwargs instead
    of constant `delay_factor`/`randomization_factor`/`max_delay`. The taskcluster
    function generally slept for less than a second, which didn't always get
    past server issues.

    Note: requires the module-level ``import random`` (missing in the original
    file, which made this function raise NameError at runtime).

    Args:
        attempt (int): the retry attempt number
        delay_factor (float, optional): a multiplier for the delay time. Defaults to 5.
        randomization_factor (float, optional): a randomization multiplier for the
            delay time. Defaults to .5.
        max_delay (float, optional): the max delay to sleep. Defaults to 120 (seconds).

    Returns:
        float: the time to sleep, in seconds.
    """
    if attempt <= 0:
        return 0
    # We subtract one to get exponents: 1, 2, 3, 4, 5, ..
    delay = float(2 ** (attempt - 1)) * float(delay_factor)
    # Apply randomization factor. Only increase the delay here.
    delay = delay * (randomization_factor * random.random() + 1)
    # Always limit with a maximum delay
    return min(delay, max_delay)
async def retry_async(
    func,
    attempts=5,
    sleeptime_callback=calculate_sleep_time,
    retry_exceptions=(RetryException,),
    args=(),
    kwargs=None,
    sleeptime_kwargs=None,
):
    """Await ``func`` repeatedly until it succeeds or we run out of attempts.

    Args:
        func (function): an awaitable function.
        attempts (int, optional): maximum number of attempts. Default is 5.
        sleeptime_callback (function, optional): computes the delay after each
            failed attempt. Defaults to ``calculate_sleep_time``.
        retry_exceptions (tuple or exception, optional): exception type(s)
            that trigger a retry. Defaults to ``(RetryException,)``.
        args (list, optional): positional args for ``func``. Defaults to ().
        kwargs (dict, optional): keyword args for ``func``. Defaults to {}.
        sleeptime_kwargs (dict, optional): keyword args for
            ``sleeptime_callback``. If None, use {}. Defaults to None.

    Returns:
        object: the value from a successful ``func`` call.

    Raises:
        Exception: from a failed ``func`` call — immediately if the exception
            is not in ``retry_exceptions``, otherwise once ``attempts`` is
            exhausted.
    """
    call_kwargs = kwargs or {}
    attempt_number = 1
    while True:
        try:
            return await func(*args, **call_kwargs)
        except retry_exceptions:
            attempt_number += 1
            _check_number_of_attempts(attempt_number, attempts, func, "retry_async")
            delay = _define_sleep_time(
                sleeptime_kwargs, sleeptime_callback, attempt_number, func, "retry_async"
            )
            await asyncio.sleep(delay)
def _check_number_of_attempts(
    attempt,
    attempts,
    func,
    retry_function_name,
):
    """Re-raise the in-flight exception once the retry budget is exhausted.

    Must be called from inside an ``except`` block: the bare ``raise``
    re-raises whatever exception is currently being handled.
    """
    if attempt <= attempts:
        return
    log.warning(
        "{}: {}: too many retries!".format(retry_function_name, func.__name__)
    )
    raise
def _define_sleep_time(
    sleeptime_kwargs,
    sleeptime_callback,
    attempt,
    func,
    retry_function_name,
):
    """Compute, log, and return the pre-retry delay for ``retry_async``."""
    callback_kwargs = sleeptime_kwargs or {}
    delay = sleeptime_callback(attempt, **callback_kwargs)
    log.debug(
        "{}: {}: sleeping {} seconds before retry".format(
            retry_function_name, func.__name__, delay
        )
    )
    return delay
# }}}
def wipe_secret_values(d):
    """Replace the sensitive values of a secret dict with fakes, in place.

    Keeps every key and the nested structure intact, but blanks strings to
    ``"fake"`` and lists to ``[]``, recursing into nested dicts.
    (Integers, floats, and bools — and the key names/structure — are assumed
    not to be sensitive and are left untouched.)

    Returns the same (mutated) dict for convenience.
    """
    for k, v in d.items():
        # Numbers and bools are left as-is.
        if isinstance(v, (bool, int, float)):
            continue
        if isinstance(v, str):
            d[k] = "fake"
        elif isinstance(v, list):
            d[k] = []
        elif isinstance(v, dict):
            d[k] = wipe_secret_values(v)
        else:
            raise Exception(f"Unknown type for {k}")
    return d
async def get_auth(root_url, scopes):
    """Log into a cluster via the taskcluster CLI; return its credentials.

    Parses `taskcluster signin` output that looks like

        export TASKCLUSTER_CLIENT_ID='USER'
        export TASKCLUSTER_ACCESS_TOKEN='SECRET'
        export TASKCLUSTER_ROOT_URL='https://firefox-ci-tc.services.mozilla.com/'

    Returns:
        dict: with ``clientId`` and ``accessToken`` keys.

    Raises:
        Exception: if both expected values can't be found in the output.
    """
    log.info(f"Logging into {root_url}...")
    signin_cmd = ("taskcluster", "signin", "--expires", CREDS_EXPIRY_TIME, "-s", scopes)
    signin_env = {
        "PATH": os.environ["PATH"],
        "TASKCLUSTER_ROOT_URL": root_url,
    }
    output = await retry_async(
        get_output_from_command,
        args=(signin_cmd,),
        kwargs={"env": signin_env},
    )
    regexes = {
        "clientId": re.compile(r"export TASKCLUSTER_CLIENT_ID='(?P<value>.*)'"),
        "accessToken": re.compile(r"export TASKCLUSTER_ACCESS_TOKEN='(?P<value>.*)'"),
    }
    auth = {}
    for line in output.splitlines():
        for key, pattern in regexes.items():
            match = pattern.match(line)
            if match is None:
                continue
            auth[key] = match.groupdict()["value"]
            break
    if len(auth) != len(regexes):
        raise Exception("Auth doesn't contain enough info!")
    return auth
async def get_secrets(root_url):
    """Return the list of secret names visible on the cluster at ``root_url``."""
    client = Secrets(options={"rootUrl": root_url})
    response = await retry_async(client.list)
    return response["secrets"]
async def async_main():
    """Copy wiped placeholders of matching fxci secrets into staging.

    Lists prod secrets, signs into both clusters, and for every secret name
    matching COPY_PREFIXES writes a fake-valued secret (real name, fake
    content, far-future expiry) to staging — unless NOOP is set.
    """
    secret_names = await get_secrets(PROD_ROOT_URL)
    prod_auth = await get_auth(PROD_ROOT_URL, "secrets:get:*")
    prod_secrets = Secrets(options={"rootUrl": PROD_ROOT_URL, "credentials": prod_auth})
    staging_auth = await get_auth(STAGING_ROOT_URL, "secrets:get:*\nsecrets:set:*")
    staging_secrets = Secrets(
        options={"rootUrl": STAGING_ROOT_URL, "credentials": staging_auth}
    )
    for secret_name in secret_names:
        # XXX do we want to copy all secrets or just some? e.g. worker-id secrets?
        if not secret_name.startswith(COPY_PREFIXES):
            log.debug(f"Skipping {secret_name}...")
            continue
        log.info(secret_name)
        # Read the prod value (also proves our creds can see it).
        secret_value = await retry_async(prod_secrets.get, args=(secret_name,))
        # XXX If we want all the keys, but fake values:
        # wiped_secret_value = wipe_secret_values(deepcopy(secret_value))
        # XXX if we want the secret to exist, but fake keys and values except
        # for expires:
        wiped_secret_value = {
            "secret": {"content": "fake"},
            "expires": FAKE_EXPIRY_TIME,
        }
        if NOOP:
            log.info(
                f"NOOP: I would have set the value for staging {secret_name} to {wiped_secret_value}"
            )
            continue
        await retry_async(
            staging_secrets.set,
            args=(secret_name, wiped_secret_value),
        )
def main():
    """Configure logging and run the async entry point."""
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=logging.INFO,
    )
    # asyncio.run() replaces the deprecated get_event_loop()/run_until_complete
    # pattern; get_event_loop() no longer auto-creates a loop on Python 3.12+.
    asyncio.run(async_main())
if __name__ == "__main__":
    main()