build/upload_generated_sources.py
author Alessio Placitelli <alessio.placitelli@gmail.com>
Mon, 27 Nov 2017 11:54:27 +0100
changeset 397944 443747e39abe9c7f7dae06603daa57f5c0706d1b
parent 385817 6074db12d685655fe5692d59471b3c32cc967dc9
child 472612 4dabc47dec1223930955d6e7eeb09d7fb5a61661
permissions -rw-r--r--
Bug 1417473 - Implement the hybrid content telemetry API. r=chutten,Gijs This enables whitelisted pages to send messages to the chrome using asynchronous messaging from the content. This patch only adds the API and test coverage. The first consumer of the API will be added as part of bug 1417479. MozReview-Commit-ID: ESxFFjvhpWA

#!/usr/bin/env python
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import argparse
from contextlib import contextmanager
import gzip
import io
import logging
from mozbuild.base import MozbuildObject
from mozbuild.generated_sources import (
    get_filename_with_digest,
    get_s3_region_and_bucket,
)
import os
from Queue import Queue
import requests
import sys
import tarfile
from threading import Event, Thread
import time

# Number of concurrent upload threads.
# Arbitrary, should probably measure this.
NUM_WORKER_THREADS = 10
# Module logger shared by the main thread and every upload worker thread.
log = logging.getLogger(name='upload-generated-sources')
log.setLevel(level=logging.INFO)


@contextmanager
def timed():
    '''
    Context manager that hands the caller a zero-argument callable; each call
    returns the wall-clock seconds elapsed since the `with` block was entered.
    '''
    t0 = time.time()
    yield lambda: time.time() - t0


def gzip_compress(data):
    '''
    Gzip-compress the bytes in `data`.

    Returns an in-memory `BytesIO` holding the compressed stream, rewound to
    offset 0 so it can be read (or uploaded) from the start.
    '''
    buf = io.BytesIO()
    gz = gzip.GzipFile(fileobj=buf, mode='w')
    try:
        gz.write(data)
    finally:
        # Closing the GzipFile writes the gzip trailer; it does not close
        # `buf`, because the file object was supplied by us.
        gz.close()
    buf.flush()
    buf.seek(0)
    return buf


def upload_worker(queue, event, bucket, session_args):
    '''
    Body of a single upload thread.

    Repeatedly takes `(name, contents)` pairs off `queue`, gzips `contents`
    and uploads the result to the S3 `bucket`. The object key is the value of
    `get_filename_with_digest(name, contents)` (presumably `name` prefixed with
    the hex SHA-512 digest of `contents` -- see mozbuild.generated_sources).
    Objects are stored with `ContentEncoding: gzip` and
    `ContentType: text/plain`. Each completed upload is acknowledged with
    `queue.task_done()`.

    `session_args` is passed verbatim as keyword arguments to
    `boto3.session.Session`.

    The loop stops once `event` is found set at the top of an iteration.
    Any exception, including a failure to import boto3, is logged and
    signalled by setting `event`. The item being processed at that point is
    never acknowledged.
    '''
    try:
        import boto3
        s3 = boto3.session.Session(**session_args).client('s3')
        while not event.is_set():
            name, contents = queue.get()
            key = get_filename_with_digest(name, contents)
            payload = gzip_compress(contents)
            log.info('Uploading "{}" ({} bytes)'.format(
                key, len(payload.getvalue())))
            with timed() as elapsed:
                # A fresh ExtraArgs dict is built for every upload.
                s3.upload_fileobj(payload, bucket, key, ExtraArgs={
                    'ContentEncoding': 'gzip',
                    'ContentType': 'text/plain',
                })
                log.info('Finished uploading "{}" in {:0.3f}s'.format(
                    key, elapsed()))
            queue.task_done()
    except Exception:
        log.exception('Thread encountered exception:')
        event.set()


def do_work(artifact, region, bucket):
    '''
    Download the generated-sources tarball at URL `artifact` and upload every
    regular file in it to the S3 `bucket` in `region`.

    The uploads are spread across NUM_WORKER_THREADS threads running
    `upload_worker`.

    If the TASK_ID environment variable is set, AWS credentials are fetched
    from the taskcluster secrets service, using the secret for the current
    MOZ_SCM_LEVEL (default '1'). Otherwise no explicit credentials are passed,
    and boto3 uses whatever credentials it can find on its own.

    Raises `requests.HTTPError` if fetching the secret or the artifact fails.
    Raises `RuntimeError` if any upload worker hit an exception, so the
    failure is reflected in the process exit status instead of being lost.
    '''
    session_args = {'region_name': region}
    session = requests.Session()
    if 'TASK_ID' in os.environ:
        level = os.environ.get('MOZ_SCM_LEVEL', '1')
        secrets_url = 'http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload'.format( # noqa
            level)
        log.info(
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url))
        res = session.get(secrets_url)
        res.raise_for_status()
        secret = res.json()
        session_args.update(
            aws_access_key_id=secret['secret']['AWS_ACCESS_KEY_ID'],
            aws_secret_access_key=secret['secret']['AWS_SECRET_ACCESS_KEY'],
        )
    else:
        log.info('Trying to use your AWS credentials..')

    # First, fetch the artifact containing the sources.
    log.info('Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)
        log.info('Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s'.format(
            res.status_code, len(res.content), elapsed()))
    res.raise_for_status()

    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()
    log.info('Creating {} worker threads'.format(NUM_WORKER_THREADS))
    for _ in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        # Workers block in queue.get() forever once the queue drains, so they
        # must not keep the process alive.
        t.daemon = True
        t.start()

    with tarfile.open(fileobj=io.BytesIO(res.content), mode='r|gz') as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
                break
            # extractfile() returns None for directories (and links cannot be
            # resolved in stream mode), so only queue regular files.
            if not entry.isfile():
                log.info('Skipping non-regular member "{}"'.format(entry.name))
                continue
            log.info('Queueing "{}"'.format(entry.name))
            q.put((entry.name, tar.extractfile(entry).read()))

    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    # A failing worker never calls task_done(), so unfinished_tasks stays
    # non-zero and the loop exits through the event instead.
    while q.unfinished_tasks:
        if event.wait(0.1):
            break

    if event.is_set():
        log.error('Worker thread encountered exception, exiting...')
        raise RuntimeError(
            'Failed to upload generated sources from "{}"'.format(artifact))


def main(argv):
    '''
    Command-line entry point.

    Parses `argv` (the arguments after the script name) and installs a pinned
    boto3 into the mozbuild virtualenv. It then uploads the given
    generated-sources artifact to the region and bucket returned by
    `get_s3_region_and_bucket()`.

    Returns 0 once `do_work` completes; exceptions from `do_work` propagate.
    '''
    logging.basicConfig(format='%(levelname)s - %(threadName)s - %(message)s')

    cli = argparse.ArgumentParser(
        description='Upload generated source files in ARTIFACT to BUCKET in S3.')
    cli.add_argument('artifact',
                     help='generated-sources artifact from build task')
    options = cli.parse_args(argv)

    region, bucket = get_s3_region_and_bucket()

    # Make boto3 importable for the upload worker threads
    # (they `import boto3` lazily).
    build_env = MozbuildObject.from_environment()
    build_env._activate_virtualenv()
    build_env.virtualenv_manager.install_pip_package('boto3==1.4.4')

    with timed() as elapsed:
        do_work(artifact=options.artifact, region=region, bucket=bucket)
        log.info('Finished in {:.03f}s'.format(elapsed()))
    return 0


if __name__ == '__main__':
    # Forward everything after the script name; main()'s return value
    # becomes the process exit status.
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)