Bug 1401995 Update funsize to use async, to reduce task time r=jlorenzo
authorSimon Fraser <sfraser@mozilla.com>
Wed, 03 Jan 2018 14:42:47 +0000
changeset 449942 a98402f998c4828e8e7dffe11796e5f974a45668
parent 449941 58cc2c96502d73b6bf9dfafc709ce20125d3434c
child 449943 a88a69a60eb8ee13f8a7fe5626a0f6b64b26f7d3
push id8527
push userCallek@gmail.com
push dateThu, 11 Jan 2018 21:05:50 +0000
treeherdermozilla-beta@95342d212a7a [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersjlorenzo
bugs1401995
milestone59.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1401995 Update funsize to use async, to reduce task time r=jlorenzo MozReview-Commit-ID: 24IU3pcJseY
taskcluster/docker/funsize-update-generator/requirements.txt
taskcluster/docker/funsize-update-generator/runme.sh
taskcluster/docker/funsize-update-generator/scripts/funsize.py
tools/lint/py2.yml
--- a/taskcluster/docker/funsize-update-generator/requirements.txt
+++ b/taskcluster/docker/funsize-update-generator/requirements.txt
@@ -1,5 +1,7 @@
 mar==2.1.2
 backports.lzma==0.0.8
 datadog==0.17.0
 redo==1.6
+aiohttp==2.3.6
 awscli==1.14.10
+scriptworker==6.0.0
--- a/taskcluster/docker/funsize-update-generator/runme.sh
+++ b/taskcluster/docker/funsize-update-generator/runme.sh
@@ -13,17 +13,18 @@ curl --location --retry 10 --retry-delay
     "https://queue.taskcluster.net/v1/task/$TASK_ID"
 
 # auth:aws-s3:read-write:tc-gp-private-1d-us-east-1/releng/mbsdiff-cache/
 # -> bucket of tc-gp-private-1d-us-east-1, path of releng/mbsdiff-cache/
 # Trailing slash is important, due to prefix permissions in S3.
 S3_BUCKET_AND_PATH=$(jq -r '.scopes[] | select(contains ("auth:aws-s3"))' /home/worker/task.json | awk -F: '{print $4}')
 
 # Will be empty if there's no scope for AWS S3.
-if [ -n "${S3_BUCKET_AND_PATH}" ]; then
+if [ -n "${S3_BUCKET_AND_PATH}" ] && getent hosts taskcluster
+then
   # Does this parse as we expect?
   S3_PATH=${S3_BUCKET_AND_PATH#*/}
   AWS_BUCKET_NAME=${S3_BUCKET_AND_PATH%/${S3_PATH}*}
   test "${S3_PATH}"
   test "${AWS_BUCKET_NAME}"
 
   set +x  # Don't echo these.
   secret_url="taskcluster/auth/v1/aws/s3/read-write/${AWS_BUCKET_NAME}/${S3_PATH}"
--- a/taskcluster/docker/funsize-update-generator/scripts/funsize.py
+++ b/taskcluster/docker/funsize-update-generator/scripts/funsize.py
@@ -1,35 +1,44 @@
 #!/usr/bin/env python3
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, division, print_function
 
+import asyncio
+import aiohttp
 import configparser
 import argparse
 import hashlib
 import json
 import logging
 import os
 import shutil
 import tempfile
 import time
 import requests
 import sh
 
 import redo
+from scriptworker.utils import retry_async
 from mardor.reader import MarReader
 from mardor.signing import get_keysize
 
 from datadog import initialize, ThreadStats
 
 
 log = logging.getLogger(__name__)
+
+# Create this even when not sending metrics, so the context manager
+# statements work.
+ddstats = ThreadStats(namespace='releng.releases.partials')
+
+
 ALLOWED_URL_PREFIXES = [
     "http://download.cdn.mozilla.net/pub/mozilla.org/firefox/nightly/",
     "http://download.cdn.mozilla.net/pub/firefox/nightly/",
     "https://mozilla-nightly-updates.s3.amazonaws.com",
     "https://queue.taskcluster.net/",
     "http://ftp.mozilla.org/",
     "http://download.mozilla.org/",
     "https://archive.mozilla.org/",
@@ -66,131 +75,178 @@ def get_secret(secret_name):
     # 403: If unauthorized, just give up.
     if r.status_code == 403:
         log.info("Unable to get secret key")
         return {}
     r.raise_for_status()
     return r.json().get('secret', {})
 
 
-@redo.retriable()
-def download(url, dest, mode=None):
-    log.debug("Downloading %s to %s", url, dest)
-    r = requests.get(url)
-    r.raise_for_status()
+async def retry_download(*args, **kwargs):  # noqa: E999
+    """Retry download() calls."""
+    await retry_async(
+        download,
+        retry_exceptions=(
+            aiohttp.ClientError
+        ),
+        args=args,
+        kwargs=kwargs
+    )
+
+
+async def download(url, dest, mode=None):  # noqa: E999
+    log.info("Downloading %s to %s", url, dest)
 
     bytes_downloaded = 0
-    with open(dest, 'wb') as fd:
-        for chunk in r.iter_content(4096):
-            fd.write(chunk)
-            bytes_downloaded += len(chunk)
 
-    log.debug('Downloaded %s bytes', bytes_downloaded)
-    if 'content-length' in r.headers:
-        log.debug('Content-Length: %s bytes', r.headers['content-length'])
-        if bytes_downloaded != int(r.headers['content-length']):
-            raise IOError('Unexpected number of bytes downloaded')
+    async with aiohttp.ClientSession(raise_for_status=True) as session:
+        async with session.get(url) as resp:
+            with open(dest, 'wb') as fd:
+                while True:
+                    chunk = await resp.content.read(4096)
+                    if not chunk:
+                        break
+                    fd.write(chunk)
+                    bytes_downloaded += len(chunk)
 
-    if mode:
-        log.debug("chmod %o %s", mode, dest)
-        os.chmod(dest, mode)
+            log.debug('Downloaded %s bytes', bytes_downloaded)
+            if 'content-length' in resp.headers:
+                log.debug('Content-Length: %s bytes', resp.headers['content-length'])
+                if bytes_downloaded != int(resp.headers['content-length']):
+                    raise IOError('Unexpected number of bytes downloaded')
+
+            if mode:
+                log.debug("chmod %o %s", mode, dest)
+                os.chmod(dest, mode)
 
 
-def unpack(work_env, mar, dest_dir):
+async def run_command(cmd, cwd='/', env=None, label=None, silent=False):
+    if not env:
+        env = dict()
+    process = await asyncio.create_subprocess_shell(cmd,
+                                                    stdout=asyncio.subprocess.PIPE,
+                                                    stderr=asyncio.subprocess.STDOUT,
+                                                    cwd=cwd, env=env)
+    stdout, stderr = await process.communicate()
+
+    await process.wait()
+
+    if silent:
+        return
+
+    if not stderr:
+        stderr = ""
+    if not stdout:
+        stdout = ""
+
+    label = "{}: ".format(label)
+
+    for line in stdout.splitlines():
+        log.debug("%s%s", label, line.decode('utf-8'))
+    for line in stderr.splitlines():
+        log.warn("%s%s", label, line.decode('utf-8'))
+
+
+async def unpack(work_env, mar, dest_dir):
     os.mkdir(dest_dir)
-    unwrap_cmd = sh.Command(os.path.join(work_env.workdir,
-                                         "unwrap_full_update.pl"))
     log.debug("Unwrapping %s", mar)
     env = work_env.env
     if not is_lzma_compressed_mar(mar):
         env['MAR_OLD_FORMAT'] = '1'
     elif 'MAR_OLD_FORMAT' in env:
         del env['MAR_OLD_FORMAT']
-    out = unwrap_cmd(mar, _cwd=dest_dir, _env=env, _timeout=240,
-                     _err_to_out=True)
-    if out:
-        log.debug(out)
+
+    cmd = "{} {}".format(work_env.paths['unwrap_full_update.pl'], mar)
+    await run_command(cmd, cwd=dest_dir, env=env, label=dest_dir)
 
 
 def find_file(directory, filename):
     log.debug("Searching for %s in %s", filename, directory)
-    for root, dirs, files in os.walk(directory):
+    for root, _, files in os.walk(directory):
         if filename in files:
             f = os.path.join(root, filename)
             log.debug("Found %s", f)
             return f
 
 
 def get_option(directory, filename, section, option):
-    log.debug("Exctracting [%s]: %s from %s/**/%s", section, option, directory,
+    log.debug("Extracting [%s]: %s from %s/**/%s", section, option, directory,
               filename)
     f = find_file(directory, filename)
     config = configparser.ConfigParser()
     config.read(f)
     rv = config.get(section, option)
     log.debug("Found %s", rv)
     return rv
 
 
-def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids,
-                     version, use_old_format):
-    log.debug("Generating partial %s", dest_mar)
+async def generate_partial(work_env, from_dir, to_dir, dest_mar, channel_ids,
+                           version, use_old_format):
+    log.info("Generating partial %s", dest_mar)
     env = work_env.env
     env["MOZ_PRODUCT_VERSION"] = version
     env["MOZ_CHANNEL_ID"] = channel_ids
     if use_old_format:
         env['MAR_OLD_FORMAT'] = '1'
     elif 'MAR_OLD_FORMAT' in env:
         del env['MAR_OLD_FORMAT']
     make_incremental_update = os.path.join(work_env.workdir,
                                            "make_incremental_update.sh")
-    out = sh.bash(make_incremental_update, dest_mar, from_dir, to_dir,
-                  _cwd=work_env.workdir, _env=env, _timeout=900,
-                  _err_to_out=True)
-    if out:
-        log.debug(out)
+    cmd = " ".join([make_incremental_update, dest_mar, from_dir, to_dir])
+
+    await run_command(cmd, cwd=work_env.workdir, env=env, label=dest_mar.split('/')[-1])
 
 
 def get_hash(path, hash_type="sha512"):
     h = hashlib.new(hash_type)
     with open(path, "rb") as f:
         h.update(f.read())
     return h.hexdigest()
 
 
 class WorkEnv(object):
 
     def __init__(self):
         self.workdir = tempfile.mkdtemp()
+        self.paths = {
+            'unwrap_full_update.pl': os.path.join(self.workdir, 'unwrap_full_update.pl'),
+            'mar': os.path.join(self.workdir, 'mar'),
+            'mbsdiff': os.path.join(self.workdir, 'mbsdiff')
+        }
 
-    def setup(self):
-        self.download_unwrap()
-        self.download_martools()
+    async def setup(self):
+        await self.download_unwrap()
+        await self.download_martools()
 
-    def download_unwrap(self):
+    async def clone(self, workenv):
+        for path in workenv.paths:
+            if os.path.exists(self.paths[path]):
+                os.unlink(self.paths[path])
+            os.link(workenv.paths[path], self.paths[path])
+
+    async def download_unwrap(self):
         # unwrap_full_update.pl is not too sensitive to the revision
         url = "https://hg.mozilla.org/mozilla-central/raw-file/default/" \
             "tools/update-packaging/unwrap_full_update.pl"
-        download(url, dest=os.path.join(self.workdir, "unwrap_full_update.pl"),
-                 mode=0o755)
+        await retry_download(url, dest=self.paths['unwrap_full_update.pl'], mode=0o755)
 
-    def download_buildsystem_bits(self, repo, revision):
+    async def download_buildsystem_bits(self, repo, revision):
         prefix = "{repo}/raw-file/{revision}/tools/update-packaging"
         prefix = prefix.format(repo=repo, revision=revision)
-        for f in ("make_incremental_update.sh", "common.sh"):
+        for f in ('make_incremental_update.sh', 'common.sh'):
             url = "{prefix}/{f}".format(prefix=prefix, f=f)
-            download(url, dest=os.path.join(self.workdir, f), mode=0o755)
+            await retry_download(url, dest=os.path.join(self.workdir, f), mode=0o755)
 
-    def download_martools(self):
+    async def download_martools(self):
         # TODO: check if the tools have to be branch specific
         prefix = "https://ftp.mozilla.org/pub/mozilla.org/firefox/nightly/" \
             "latest-mozilla-central/mar-tools/linux64"
-        for f in ("mar", "mbsdiff"):
+        for f in ('mar', 'mbsdiff'):
             url = "{prefix}/{f}".format(prefix=prefix, f=f)
-            download(url, dest=os.path.join(self.workdir, f), mode=0o755)
+            await retry_download(url, dest=self.paths[f], mode=0o755)
 
     def cleanup(self):
         shutil.rmtree(self.workdir)
 
     @property
     def env(self):
         my_env = os.environ.copy()
         my_env['LC_ALL'] = 'C'
@@ -201,16 +257,161 @@ class WorkEnv(object):
 
 def verify_allowed_url(mar):
     if not any(mar.startswith(prefix) for prefix in ALLOWED_URL_PREFIXES):
         raise ValueError("{mar} is not in allowed URL prefixes: {p}".format(
             mar=mar, p=ALLOWED_URL_PREFIXES
         ))
 
 
+async def manage_partial(partial_def, work_env, filename_template, artifacts_dir, signing_certs):
+    """Manage the creation of partial mars based on payload."""
+    for mar in (partial_def["from_mar"], partial_def["to_mar"]):
+        verify_allowed_url(mar)
+
+    complete_mars = {}
+    use_old_format = False
+
+    for mar_type, f in (("from", partial_def["from_mar"]), ("to", partial_def["to_mar"])):
+        dest = os.path.join(work_env.workdir, "{}.mar".format(mar_type))
+        unpack_dir = os.path.join(work_env.workdir, mar_type)
+
+        with ddstats.timer('mar.download.time'):
+            await retry_download(f, dest)
+
+        if not os.getenv("MOZ_DISABLE_MAR_CERT_VERIFICATION"):
+            verify_signature(dest, signing_certs)
+
+        complete_mars["%s_size" % mar_type] = os.path.getsize(dest)
+        complete_mars["%s_hash" % mar_type] = get_hash(dest)
+
+        with ddstats.timer('mar.unpack.time'):
+            await unpack(work_env, dest, unpack_dir)
+
+        if mar_type == 'from':
+            version = get_option(unpack_dir, filename="application.ini",
+                                 section="App", option="Version")
+            major = int(version.split(".")[0])
+            # The updater for versions less than 56.0 requires BZ2
+            # compressed MAR files
+            if major < 56:
+                use_old_format = True
+                log.info("Forcing BZ2 compression for %s", f)
+
+        log.info("AV-scanning %s ...", unpack_dir)
+        metric_tags = [
+            "platform:{}".format(partial_def['platform']),
+        ]
+        with ddstats.timer('mar.clamscan.time', tags=metric_tags):
+            await run_command("clamscan -r {}".format(unpack_dir), label='clamscan')
+        log.info("Done.")
+
+    to_path = os.path.join(work_env.workdir, "to")
+    from_path = os.path.join(work_env.workdir, "from")
+
+    mar_data = {
+        "ACCEPTED_MAR_CHANNEL_IDS": get_option(
+            to_path, filename="update-settings.ini", section="Settings",
+            option="ACCEPTED_MAR_CHANNEL_IDS"),
+        "version": get_option(to_path, filename="application.ini",
+                              section="App", option="Version"),
+        "to_buildid": get_option(to_path, filename="application.ini",
+                                 section="App", option="BuildID"),
+        "from_buildid": get_option(from_path, filename="application.ini",
+                                   section="App", option="BuildID"),
+        "appName": get_option(from_path, filename="application.ini",
+                              section="App", option="Name"),
+        # Use Gecko repo and rev from platform.ini, not application.ini
+        "repo": get_option(to_path, filename="platform.ini", section="Build",
+                           option="SourceRepository"),
+        "revision": get_option(to_path, filename="platform.ini",
+                               section="Build", option="SourceStamp"),
+        "from_mar": partial_def["from_mar"],
+        "to_mar": partial_def["to_mar"],
+        "platform": partial_def["platform"],
+        "locale": partial_def["locale"],
+    }
+    # Override ACCEPTED_MAR_CHANNEL_IDS if needed
+    if "ACCEPTED_MAR_CHANNEL_IDS" in os.environ:
+        mar_data["ACCEPTED_MAR_CHANNEL_IDS"] = os.environ["ACCEPTED_MAR_CHANNEL_IDS"]
+    for field in ("update_number", "previousVersion", "previousBuildNumber",
+                  "toVersion", "toBuildNumber"):
+        if field in partial_def:
+            mar_data[field] = partial_def[field]
+    mar_data.update(complete_mars)
+
+    # if branch not set explicitly use repo-name
+    mar_data['branch'] = partial_def.get('branch', mar_data['repo'].rstrip('/').split('/')[-1])
+
+    if 'dest_mar' in partial_def:
+        mar_name = partial_def['dest_mar']
+    else:
+        # default to formatted name if not specified
+        mar_name = filename_template.format(**mar_data)
+
+    mar_data['mar'] = mar_name
+    dest_mar = os.path.join(work_env.workdir, mar_name)
+
+    # TODO: download these once
+    await work_env.download_buildsystem_bits(repo=mar_data["repo"],
+                                             revision=mar_data["revision"])
+
+    metric_tags = [
+        "branch:{}".format(mar_data['branch']),
+        "platform:{}".format(mar_data['platform']),
+        # If required. Shouldn't add much useful info, but increases
+        # cardinality of metrics substantially, so avoided.
+        # "locale:{}".format(mar_data['locale']),
+    ]
+    with ddstats.timer('generate_partial.time', tags=metric_tags):
+        await generate_partial(work_env, from_path, to_path, dest_mar,
+                               mar_data["ACCEPTED_MAR_CHANNEL_IDS"],
+                               mar_data["version"],
+                               use_old_format)
+
+    mar_data["size"] = os.path.getsize(dest_mar)
+
+    metric_tags.append("unit:bytes")
+    # Allows us to find out how many releases there were between the two,
+    # making buckets of the file sizes easier.
+    metric_tags.append("update_number:{}".format(mar_data.get('update_number', 0)))
+    ddstats.gauge('partial_mar_size', mar_data['size'], tags=metric_tags)
+
+    mar_data["hash"] = get_hash(dest_mar)
+
+    shutil.copy(dest_mar, artifacts_dir)
+    work_env.cleanup()
+
+    return mar_data
+
+
+async def async_main(args, signing_certs):
+    tasks = []
+
+    master_env = WorkEnv()
+    await master_env.setup()
+
+    task = json.load(args.task_definition)
+    # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema
+    for definition in task["extra"]["funsize"]["partials"]:
+        workenv = WorkEnv()
+        await workenv.clone(master_env)
+        tasks.append(asyncio.ensure_future(manage_partial(
+            partial_def=definition,
+            filename_template=args.filename_template,
+            artifacts_dir=args.artifacts_dir,
+            work_env=workenv,
+            signing_certs=signing_certs)
+        ))
+
+    manifest = await asyncio.gather(*tasks)
+    master_env.cleanup()
+    return manifest
+
+
 def main():
 
     start = time.time()
 
     parser = argparse.ArgumentParser()
     parser.add_argument("--artifacts-dir", required=True)
     parser.add_argument("--sha1-signing-cert", required=True)
     parser.add_argument("--sha384-signing-cert", required=True)
@@ -222,37 +423,31 @@ def main():
                         help="Do not refresh ClamAV DB")
     parser.add_argument("-q", "--quiet", dest="log_level",
                         action="store_const", const=logging.WARNING,
                         default=logging.DEBUG)
     args = parser.parse_args()
 
     logging.basicConfig(format="%(asctime)s - %(levelname)s - %(message)s")
     log.setLevel(args.log_level)
-    task = json.load(args.task_definition)
-    # TODO: verify task["extra"]["funsize"]["partials"] with jsonschema
 
     signing_certs = {
         'sha1': open(args.sha1_signing_cert, 'rb').read(),
         'sha384': open(args.sha384_signing_cert, 'rb').read(),
     }
 
     assert(get_keysize(signing_certs['sha1']) == 2048)
     assert(get_keysize(signing_certs['sha384']) == 4096)
 
     # Intended for local testing.
     dd_api_key = os.environ.get('DATADOG_API_KEY')
     # Intended for Taskcluster.
     if not dd_api_key and os.environ.get('DATADOG_API_SECRET'):
         dd_api_key = get_secret(os.environ.get('DATADOG_API_SECRET')).get('key')
 
-    # Create this even when not sending metrics, so the context manager
-    # statements work.
-    ddstats = ThreadStats(namespace='releng.releases.partials')
-
     if dd_api_key:
         dd_options = {
             'api_key': dd_api_key,
         }
         log.info("Starting metric collection")
         initialize(**dd_options)
         ddstats.start(flush_interval=1)
     else:
@@ -264,139 +459,35 @@ def main():
         log.info("Refreshing clamav db...")
         try:
             redo.retry(lambda: sh.freshclam("--stdout", "--verbose",
                                             _timeout=300, _err_to_out=True))
             log.info("Done.")
         except sh.ErrorReturnCode:
             log.warning("Freshclam failed, skipping DB update")
 
-    manifest = []
-    for e in task["extra"]["funsize"]["partials"]:
-        for mar in (e["from_mar"], e["to_mar"]):
-            verify_allowed_url(mar)
-
-        work_env = WorkEnv()
-        # TODO: run setup once
-        work_env.setup()
-        complete_mars = {}
-        use_old_format = False
-        for mar_type, f in (("from", e["from_mar"]), ("to", e["to_mar"])):
-            dest = os.path.join(work_env.workdir, "{}.mar".format(mar_type))
-            unpack_dir = os.path.join(work_env.workdir, mar_type)
-            with ddstats.timer('mar.download.time'):
-                download(f, dest)
-            if not os.getenv("MOZ_DISABLE_MAR_CERT_VERIFICATION"):
-                verify_signature(dest, signing_certs)
-            complete_mars["%s_size" % mar_type] = os.path.getsize(dest)
-            complete_mars["%s_hash" % mar_type] = get_hash(dest)
-            with ddstats.timer('mar.unpack.time'):
-                unpack(work_env, dest, unpack_dir)
-            if mar_type == 'from':
-                version = get_option(unpack_dir, filename="application.ini",
-                                     section="App", option="Version")
-                major = int(version.split(".")[0])
-                # The updater for versions less than 56.0 requires BZ2
-                # compressed MAR files
-                if major < 56:
-                    use_old_format = True
-                    log.info("Forcing BZ2 compression for %s", f)
-            log.info("AV-scanning %s ...", unpack_dir)
-            metric_tags = [
-                "platform:{}".format(e['platform']),
-            ]
-            with ddstats.timer('mar.clamscan.time', tags=metric_tags):
-                sh.clamscan("-r", unpack_dir, _timeout=600, _err_to_out=True)
-            log.info("Done.")
-
-        path = os.path.join(work_env.workdir, "to")
-        from_path = os.path.join(work_env.workdir, "from")
-        mar_data = {
-            "ACCEPTED_MAR_CHANNEL_IDS": get_option(
-                path, filename="update-settings.ini", section="Settings",
-                option="ACCEPTED_MAR_CHANNEL_IDS"),
-            "version": get_option(path, filename="application.ini",
-                                  section="App", option="Version"),
-            "to_buildid": get_option(path, filename="application.ini",
-                                     section="App", option="BuildID"),
-            "from_buildid": get_option(from_path, filename="application.ini",
-                                       section="App", option="BuildID"),
-            "appName": get_option(from_path, filename="application.ini",
-                                  section="App", option="Name"),
-            # Use Gecko repo and rev from platform.ini, not application.ini
-            "repo": get_option(path, filename="platform.ini", section="Build",
-                               option="SourceRepository"),
-            "revision": get_option(path, filename="platform.ini",
-                                   section="Build", option="SourceStamp"),
-            "from_mar": e["from_mar"],
-            "to_mar": e["to_mar"],
-            "platform": e["platform"],
-            "locale": e["locale"],
-        }
-        # Override ACCEPTED_MAR_CHANNEL_IDS if needed
-        if "ACCEPTED_MAR_CHANNEL_IDS" in os.environ:
-            mar_data["ACCEPTED_MAR_CHANNEL_IDS"] = os.environ["ACCEPTED_MAR_CHANNEL_IDS"]
-        for field in ("update_number", "previousVersion",
-                      "previousBuildNumber", "toVersion",
-                      "toBuildNumber"):
-            if field in e:
-                mar_data[field] = e[field]
-        mar_data.update(complete_mars)
-        # if branch not set explicitly use repo-name
-        mar_data["branch"] = e.get("branch",
-                                   mar_data["repo"].rstrip("/").split("/")[-1])
-        if 'dest_mar' in e:
-            mar_name = e['dest_mar']
-        else:
-            # default to formatted name if not specified
-            mar_name = args.filename_template.format(**mar_data)
-        mar_data["mar"] = mar_name
-        dest_mar = os.path.join(work_env.workdir, mar_name)
-        # TODO: download these once
-        work_env.download_buildsystem_bits(repo=mar_data["repo"],
-                                           revision=mar_data["revision"])
-
-        metric_tags = [
-            "branch:{}".format(mar_data['branch']),
-            "platform:{}".format(mar_data['platform']),
-            # If required. Shouldn't add much useful info, but increases
-            # cardinality of metrics substantially, so avoided.
-            # "locale:{}".format(mar_data['locale']),
-        ]
-
-        with ddstats.timer('generate_partial.time', tags=metric_tags):
-            generate_partial(work_env, from_path, path, dest_mar,
-                             mar_data["ACCEPTED_MAR_CHANNEL_IDS"],
-                             mar_data["version"],
-                             use_old_format)
-
-        mar_data["size"] = os.path.getsize(dest_mar)
-        metric_tags.append("unit:bytes")
-        # Allows us to find out how many releases there were between the two,
-        # making buckets of the file sizes easier.
-        metric_tags.append("update_number:{}".format(mar_data.get('update_number', 0)))
-        ddstats.gauge('partial_mar_size', mar_data['size'], tags=metric_tags)
-
-        mar_data["hash"] = get_hash(dest_mar)
-
-        shutil.copy(dest_mar, args.artifacts_dir)
-        work_env.cleanup()
-        manifest.append(mar_data)
+    loop = asyncio.get_event_loop()
+    manifest = loop.run_until_complete(async_main(args, signing_certs))
+    loop.close()
 
     manifest_file = os.path.join(args.artifacts_dir, "manifest.json")
     with open(manifest_file, "w") as fp:
         json.dump(manifest, fp, indent=2, sort_keys=True)
 
+    log.debug("{}".format(json.dumps(manifest, indent=2, sort_keys=True)))
+
     # Warning: Assumption that one partials task will always be for one branch.
     metric_tags = [
-        "branch:{}".format(mar_data['branch']),
+        "branch:{}".format(manifest[0]['branch']),
     ]
 
     ddstats.timing('task_duration', time.time() - start,
                    start, tags=metric_tags)
+
     # Wait for all the metrics to flush. If the program ends before
     # they've been sent, they'll be dropped.
     # Should be more than the flush_interval for the ThreadStats object
-    time.sleep(10)
+    if dd_api_key:
+        time.sleep(10)
 
 
 if __name__ == '__main__':
     main()
--- a/tools/lint/py2.yml
+++ b/tools/lint/py2.yml
@@ -29,16 +29,17 @@ py2:
         - probes/trace-gen.py
         - python/devtools
         - python/mach
         - python/mozbuild
         - python/mozversioncontrol
         - security
         - services/common/tests/mach_commands.py
         - servo
+        - taskcluster/docker/funsize-update-generator
         - testing/awsy
         - testing/firefox-ui
         - testing/geckodriver
         - testing/gtest
         - testing/marionette
         - testing/mochitest
         - testing/mozharness
         - testing/remotecppunittests.py