author Iain Ireland <>
Fri, 11 Jan 2019 18:05:36 +0000
changeset 453523 025feea5945bffc625e9cca2e23be51ee8670315
parent 385817 6074db12d685655fe5692d59471b3c32cc967dc9
child 472612 4dabc47dec1223930955d6e7eeb09d7fb5a61661
permissions -rw-r--r--
Bug 1480390: Move ForOfIterClose logic inside TryNoteIter r=tcampbell This patch was intended to be a pure refactoring of existing code with no side-effects, moving the logic for handling for-of/for-of-iterclose trynotes inside TryNoteIter to avoid duplicating logic in all users of TryNoteIter. However, it turns out that there was a subtle preexisting bug in TryNoteIter that is fixed by the refactoring. Specifically, the logic to skip from a for-of-iterclose to its enclosing for-of must run before the logic to skip trynotes based on stack depth. Otherwise, the stack depth code may filter out the enclosing for-of (see the attached test case for an example) and we will skip too many try-notes. Differential Revision:

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at

from __future__ import absolute_import, print_function, unicode_literals

import argparse
from contextlib import contextmanager
import gzip
import io
import logging
from mozbuild.base import MozbuildObject
from mozbuild.generated_sources import (
import os
from Queue import Queue
import requests
import sys
import tarfile
from threading import Event, Thread
import time

# Arbitrary, should probably measure this.
log = logging.getLogger('upload-generated-sources')

def timed():
    Yield a function that provides the elapsed time in seconds since this
    function was called.
    start = time.time()

    def elapsed():
        return time.time() - start
    yield elapsed

def gzip_compress(data):
    Apply gzip compression to `data` and return the result as a `BytesIO`.
    b = io.BytesIO()
    with gzip.GzipFile(fileobj=b, mode='w') as f:
    return b

def upload_worker(queue, event, bucket, session_args):
    Get `(name, contents)` entries from `queue` and upload `contents`
    to S3 with gzip compression using `name` as the key, prefixed with
    the SHA-512 digest of `contents` as a hex string. If an exception occurs,
    set `event`.
        import boto3
        session = boto3.session.Session(**session_args)
        s3 = session.client('s3')
        while True:
            if event.is_set():
                # Some other thread hit an exception.
            (name, contents) = queue.get()
            pathname = get_filename_with_digest(name, contents)
            compressed = gzip_compress(contents)
            extra_args = {
                'ContentEncoding': 'gzip',
                'ContentType': 'text/plain',
  'Uploading "{}" ({} bytes)'.format(
                pathname, len(compressed.getvalue())))
            with timed() as elapsed:
                s3.upload_fileobj(compressed, bucket,
                                  pathname, ExtraArgs=extra_args)
      'Finished uploading "{}" in {:0.3f}s'.format(
                    pathname, elapsed()))
    except Exception:
        log.exception('Thread encountered exception:')

def do_work(artifact, region, bucket):
    session_args = {'region_name': region}
    session = requests.Session()
    if 'TASK_ID' in os.environ:
        level = os.environ.get('MOZ_SCM_LEVEL', '1')
        secrets_url = 'http://taskcluster/secrets/v1/secret/project/releng/gecko/build/level-{}/gecko-generated-sources-upload'.format( # noqa
            'Using AWS credentials from the secrets service: "{}"'.format(secrets_url))
        res = session.get(secrets_url)
        secret = res.json()
    else:'Trying to use your AWS credentials..')

    # First, fetch the artifact containing the sources.'Fetching generated sources artifact: "{}"'.format(artifact))
    with timed() as elapsed:
        res = session.get(artifact)'Fetch HTTP status: {}, {} bytes downloaded in {:0.3f}s'.format(
            res.status_code, len(res.content), elapsed()))
    # Create a queue and worker threads for uploading.
    q = Queue()
    event = Event()'Creating {} worker threads'.format(NUM_WORKER_THREADS))
    for i in range(NUM_WORKER_THREADS):
        t = Thread(target=upload_worker, args=(q, event, bucket, session_args))
        t.daemon = True
    with, mode='r|gz') as tar:
        # Next, process each file.
        for entry in tar:
            if event.is_set():
  'Queueing "{}"'.format(
            q.put((, tar.extractfile(entry).read()))
    # Wait until all uploads are finished.
    # We don't use q.join() here because we want to also monitor event.
    while q.unfinished_tasks:
        if event.wait(0.1):
            log.error('Worker thread encountered exception, exiting...')

def main(argv):
    logging.basicConfig(format='%(levelname)s - %(threadName)s - %(message)s')
    parser = argparse.ArgumentParser(
        description='Upload generated source files in ARTIFACT to BUCKET in S3.')
                        help='generated-sources artifact from build task')
    args = parser.parse_args(argv)
    region, bucket = get_s3_region_and_bucket()

    config = MozbuildObject.from_environment()

    with timed() as elapsed:
        do_work(region=region, bucket=bucket, artifact=args.artifact)'Finished in {:.03f}s'.format(elapsed()))
    return 0

if __name__ == '__main__':