#! /usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

#
# Script name:   archiver_client.py
# Author(s):     Jordan Lund <jlund@mozilla.com>
# Target:        Python 2.7.x
#
"""
    calls relengapi archiver, downloads returned s3 url, and unpacks it locally
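
    Example invocation (illustrative values):
        python archiver_client.py mozharness --repo mozilla-central --rev abcdef123456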
"""
import httplib
import json
import logging
import os
import random
import ssl
import subprocess
import sys
import tarfile
import time
import urllib2

from optparse import OptionParser

SUCCESS_CODE = 0
# This is not an infra error and we can't recover from it
FAILURE_CODE = 1
# When an infra error happens we want to turn purple and
# let sheriffs determine if re-triggering is needed
INFRA_CODE = 3

ARCHIVER_CONFIGS = {
    'mozharness': {
        'url_format': "archiver/hgmo/%(repo)s/%(rev)s?preferred_region=%(region)s&suffix=%(suffix)s",
        # the root path from within the archive
        'extract_root_dir': "%(repo)s-%(rev)s",
        # the subdir path from within the root
        'default_extract_subdir': "testing/mozharness",
    }
}
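# Illustrative expanded URL for the mozharness config (placeholder values):
#   archiver/hgmo/mozilla-central/abcdef123456?preferred_region=us-west-2&suffix=tar.gz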

RELENGAPI_HOST = {
    'staging': 'https://api-pub-build.allizom.org/',
    'production': 'https://api.pub.build.mozilla.org/'
}

logging.basicConfig(format='%(asctime)s %(message)s')
log = logging.getLogger(__name__)


# This has been copied from lib.python.util.retry
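# Typical use (as in the callers below): the generator yields once per attempt
# and the caller breaks out of the loop on success, e.g.
#   for _ in retrier(attempts=3, sleeptime=2):
#       if do_work():  # do_work is a hypothetical helper
#           break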
def retrier(attempts=5, sleeptime=7, max_sleeptime=300, sleepscale=1.5, jitter=1):
    """ It helps us retry """
    for _ in range(attempts):
        log.debug("attempt %i/%i", _ + 1, attempts)
        yield
        if jitter:
            sleeptime += random.randint(-jitter, jitter)
        if _ == attempts - 1:
            # Don't need to sleep the last time
            break
        log.debug("sleeping for %.2fs (attempt %i/%i)",
                  sleeptime, _ + 1, attempts)
        time.sleep(sleeptime)
        sleeptime *= sleepscale
        if sleeptime > max_sleeptime:
            sleeptime = max_sleeptime


def get_task_result(url):
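    """Fetch the archiver task's status JSON and return its 'result' payload.

    Per the caller below, the result is expected to carry 'state', 'status',
    and (on success) 's3_urls' keys.
    """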
    task_response = urllib2.urlopen(url, timeout=60)
    task_content = task_response.read()
    task_response.close()
    return json.loads(task_content)['result']


def get_response_from_task(url, options):
    """
    gets and returns response from archiver task when the task is complete or retries are
    exhausted. Complete being the result's 'state' equals 'SUCCESS'

    :param url: archiver sub task url
    :param options: script options
    :return: response obj
    """
    task_result = {}
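    # poll the sub task until it reports SUCCESS; the per-sleep cap scales
    # with the retry budget (--max-retries * --sleeptime)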
    for _ in retrier(attempts=options.max_retries, sleeptime=options.sleeptime,
                     max_sleeptime=options.max_retries * options.sleeptime):
        task_result = get_task_result(url)
        log.debug("current task status: %s state: %s", task_result['status'], task_result['state'])
        if task_result['state'] == "SUCCESS":
            break

    if task_result.get('state') == "SUCCESS":
        if task_result.get('s3_urls'):
            # grab a s3 url using the preferred region if available
            s3_url = task_result['s3_urls'].get(options.region, task_result['s3_urls'].values()[0])
            return urllib2.urlopen(s3_url, timeout=60)
        else:
            log.error("Automation Error: s3 URL could not be determined even though archiver task completed")
            log.error("Check archiver logs for errors. Task status: %s" % task_result['status'])
            exit(INFRA_CODE)
    else:
        log.error("Automation Error: task state did not equal SUCCESS")
        log.error("Check archiver logs for errors. Task status: %s Task state: %s",
                  task_result['status'], task_result['state'])
        exit(INFRA_CODE)


def get_url_response(api_url, options):
    """
    queries archiver endpoint and parses response for s3_url. if archiver returns a 202,
    a sub task url is polled until that response is complete.

    :param url: archiver get url with params
    :param options: script options
    :return: response obj
    """
    fatal_msg = "Automation Error: could not determine a valid url response."
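    # num counts loop iterations so the except branch can tell when the
    # retry budget is exhausted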
    num = 0
    response = None
    for _ in retrier(attempts=options.max_retries, sleeptime=options.sleeptime,
                     max_sleeptime=options.max_retries * options.sleeptime):
        try:
            log.info("Getting archive location from %s" % api_url)
            response = urllib2.urlopen(api_url, timeout=60)

            if response.code == 202:
                # archiver is taking a long time so it started a sub task
                response = get_response_from_task(response.info()['location'], options)

            if response.code == 200:
                break
            else:
                log.debug("got a bad response. response code: %s", response.code)

        except (urllib2.HTTPError, urllib2.URLError, ssl.SSLError, httplib.BadStatusLine):
            if num == options.max_retries - 1:
                log.exception(fatal_msg)
                exit(INFRA_CODE)
            log.warning("network error while querying archiver; retrying", exc_info=True)
        num += 1

    if response is None or response.code != 200:
        log.error(fatal_msg)
        if response is not None:
            log.error("return code: '%s' return content: %s" % (response.code, response.read()))
        exit(INFRA_CODE)

    return response


def download_and_extract_archive(response, extract_root, destination):
    """downloads and extracts an archive from the archiver response.

    If the archive was based on an archive, only files and directories from within the subdir are
    extracted. The extraction takes place in the destination, overriding paths with the same name.

    :param response: the archiver response pointing to an archive
    :param extract_root: the root path to be extracted
    :param destination: the destination path to extract to
    """
    # make sure our extracted root path has a trailing slash
    extract_root = os.path.join(extract_root, '')
    # now convert any back slashes to forward slashes as tarfile's member path strings use posix
    # forward slashes even if run on a windows machine
    extract_root = extract_root.replace("\\", "/")

    tar = None
    try:
        # mode='r|gz' streams the gzipped tar straight from the HTTP response,
        # so the archive is never written to disk first
        tar = tarfile.open(fileobj=response, mode='r|gz')
        log.debug("unpacking tar archive at: %s", extract_root)
        for member in tar:
            if not member.name.startswith(extract_root):
                continue
            member.name = member.name.replace(extract_root, '')
            if member.issym() and sys.platform in ('win32', 'cygwin'):
                log.debug('skipping symlink on windows: %s', member.name)
                continue
            tar.extract(member, destination)
    except tarfile.TarError:
        log.exception("Automation Error: Could not download and extract archive. See Traceback:")
        exit(INFRA_CODE)
    finally:
        if tar is not None:
            tar.close()


def archiver(api_url, config_key, options):
    """
    1) obtains valid s3 url for archive via relengapi's archiver
    2) downloads and extracts archive
    """
    archive_cfg = ARCHIVER_CONFIGS[config_key]  # specifics to the archiver endpoint

    response = get_url_response(api_url, options)

    # get the root path within the archive that will be the starting path of extraction
    subdir = options.subdir or archive_cfg.get('default_extract_subdir')
    extract_root = archive_cfg['extract_root_dir'] % {'repo': os.path.basename(options.repo),
                                                      'rev': options.rev}
    if subdir:
        extract_root = os.path.join(extract_root, subdir)

    destination = os.path.abspath(options.destination or config_key)

    download_and_extract_archive(response, extract_root, destination)


def options_args():
    """
    Validate options and args and return them.
    """
    parser = OptionParser(__doc__)
    parser.add_option("--repo", dest="repo", default='mozilla-central',
                      help="The repository the archive is based on.")
    parser.add_option("--rev", dest="rev", help="The revision the archive is based on.")
    parser.add_option("--tag", dest="tag",
                      help="The tag the archive is based on. This is only supported with mozharness.")
    parser.add_option("--region", dest="region", default='us-west-2',
                      help="The preferred region of the s3 archive.")
    parser.add_option("--subdir", dest="subdir",
                      help="The path within the root of the archive of where to start extracting "
                           "from. If not supplied, the entire archive will be extracted.")
    parser.add_option("--destination", dest="destination",
                      help="The relative directory of where to extract the archive to.")
    parser.add_option("--staging", dest='staging', action='store_true', default=False,
                      help="Use staging relengapi")
    parser.add_option("--timeout", dest="timeout", type="float", default=30,
                      help="Used to specify how long to wait until timing out "
                           "for network requests.")
    parser.add_option("--max-retries", dest="max_retries", type="int",
                      default=10,
                      help="A maximum number of retries for network requests.")
    parser.add_option("--sleeptime", dest="sleeptime", type="int", default=10,
                      help="How long to sleep in between network requests.")
    parser.add_option("--debug", dest="debug", action="store_true",
                      default=False, help="Enable debug logging.")

    options, args = parser.parse_args()

    if len(args) != 1:
        parser.error("archiver_client.py requires exactly 1 argument: the archiver config. "
                     "Valid configs: %s" % str(ARCHIVER_CONFIGS.keys()))

    if options.rev and len(options.rev) > 12:
        log.warning("truncating revision to first 12 chars")
        options.rev = options.rev[0:12]

    if options.debug:
        log.setLevel(logging.DEBUG)
        log.info("Setting DEBUG logging.")

    config = args[0]
    if config not in ARCHIVER_CONFIGS:
        log.error("Config argument is unknown. "
                  "Given: '%s', Valid: %s" % (config, str(ARCHIVER_CONFIGS.keys())))
        exit(FAILURE_CODE)

    if options.tag and options.rev:
        parser.error("--rev or --tag can be passed but not both.")

    if config == 'mozharness':
        options.rev = custom_mozharness_options(options)
    elif options.tag:
        log.error('--tag is only supported with mozharness for now.')
        exit(FAILURE_CODE)

    return options, args


def custom_mozharness_options(options):
    rev_to_be_replaced = None
    new_rev = options.rev
    msg = None
    if options.rev == 'default':
        rev_to_be_replaced = options.rev
        msg = '"default" was passed as the revision. Querying remote repository for ' \
              'corresponding rev hash of current default tip'
    if options.tag:
        rev_to_be_replaced = options.tag
        msg = '"%s" was passed as the tag. Querying remote repository for ' \
              'corresponding rev hash.' % options.tag

    if rev_to_be_replaced:
        cmd = ['hg', 'id', '-r', rev_to_be_replaced, 'https://hg.mozilla.org/%s' % (options.repo,)]
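        # e.g. `hg id -r default https://hg.mozilla.org/mozilla-central` prints
        # the short hash of the remote default tip (repo name illustrative)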
        log.info(msg)
        new_rev = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0].strip()
        if not new_rev:
            log.error('The revision could not be determined. Does it exist?')
            exit(FAILURE_CODE)
        log.info('revision being used: %s', new_rev)

    return new_rev


def main():
    options, args = options_args()
    config = args[0]

    api_url = RELENGAPI_HOST['staging' if options.staging else 'production']
    api_url += ARCHIVER_CONFIGS[config]['url_format'] % {
        'rev': options.rev, 'repo': options.repo, 'region': options.region, 'suffix': 'tar.gz'
    }
    subdir = options.subdir or ARCHIVER_CONFIGS[config].get('default_extract_subdir')
    if subdir:
        api_url += "&subdir=%s" % (subdir,)

    archiver(api_url=api_url, config_key=config, options=options)

    exit(SUCCESS_CODE)


if __name__ == '__main__':
    main()