Bug 1502253: [taskgraph] Give reasonable error message when an error is made in task.run blocks; r=dustin,aki
authorTom Prince <mozilla@hocat.ca>
Fri, 26 Oct 2018 19:02:36 +0000
changeset 443221 bb0a820903b98831234e8bfae11f6d209c7729dc
parent 443220 80472227ba8906853affcd7b79df78a7a33cff38
child 443222 c6ccc706547610c6897ead77343a2ec62835ec1f
push id34944
push userncsoregi@mozilla.com
push dateSat, 27 Oct 2018 09:49:55 +0000
treeherdermozilla-central@49d47a692ca4 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdustin, aki
bugs1502253
milestone65.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1502253: [taskgraph] Give reasonable error message when an error is made in task.run blocks; r=dustin,aki Differential Revision: https://phabricator.services.mozilla.com/D9863
taskcluster/taskgraph/transforms/task.py
--- a/taskcluster/taskgraph/transforms/task.py
+++ b/taskcluster/taskgraph/transforms/task.py
@@ -213,430 +213,20 @@ task_description_schema = Schema({
 
     # Whether the job should use sccache compiler caching.
     Required('needs-sccache'): bool,
 
     # Set of artifacts relevant to release tasks
     Optional('release-artifacts'): [basestring],
 
     # information specific to the worker implementation that will run this task
-    'worker': Any({
-        Required('implementation'): 'docker-worker',
-        Required('os'): 'linux',
-
-        # For tasks that will run in docker-worker or docker-engine, this is the
-        # name of the docker image or in-tree docker image to run the task in.  If
-        # in-tree, then a dependency will be created automatically.  This is
-        # generally `desktop-test`, or an image that acts an awful lot like it.
-        Required('docker-image'): Any(
-            # a raw Docker image path (repo/image:tag)
-            basestring,
-            # an in-tree generated docker image (from `taskcluster/docker/<name>`)
-            {'in-tree': basestring},
-            # an indexed docker image
-            {'indexed': basestring},
-        ),
-
-        # worker features that should be enabled
-        Required('relengapi-proxy'): bool,
-        Required('chain-of-trust'): bool,
-        Required('taskcluster-proxy'): bool,
-        Required('allow-ptrace'): bool,
-        Required('loopback-video'): bool,
-        Required('loopback-audio'): bool,
-        Required('docker-in-docker'): bool,  # (aka 'dind')
-        Required('privileged'): bool,
-
-        # Paths to Docker volumes.
-        #
-        # For in-tree Docker images, volumes can be parsed from Dockerfile.
-        # This only works for the Dockerfile itself: if a volume is defined in
-        # a base image, it will need to be declared here. Out-of-tree Docker
-        # images will also require explicit volume annotation.
-        #
-        # Caches are often mounted to the same path as Docker volumes. In this
-        # case, they take precedence over a Docker volume. But a volume still
-        # needs to be declared for the path.
-        Optional('volumes'): [basestring],
-
-        # caches to set up for the task
-        Optional('caches'): [{
-            # only one type is supported by any of the workers right now
-            'type': 'persistent',
-
-            # name of the cache, allowing re-use by subsequent tasks naming the
-            # same cache
-            'name': basestring,
-
-            # location in the task image where the cache will be mounted
-            'mount-point': basestring,
-
-            # Whether the cache is not used in untrusted environments
-            # (like the Try repo).
-            Optional('skip-untrusted'): bool,
-        }],
-
-        # artifacts to extract from the task image after completion
-        Optional('artifacts'): [{
-            # type of artifact -- simple file, or recursive directory
-            'type': Any('file', 'directory'),
-
-            # task image path from which to read artifact
-            'path': basestring,
-
-            # name of the produced artifact (root of the names for
-            # type=directory)
-            'name': basestring,
-        }],
-
-        # environment variables
-        Required('env'): {basestring: taskref_or_string},
-
-        # the command to run; if not given, docker-worker will default to the
-        # command in the docker image
-        Optional('command'): [taskref_or_string],
-
-        # the maximum time to run, in seconds
-        Required('max-run-time'): int,
-
-        # the exit status code(s) that indicates the task should be retried
-        Optional('retry-exit-status'): [int],
-
-        # the exit status code(s) that indicates the caches used by the task
-        # should be purged
-        Optional('purge-caches-exit-status'): [int],
-
-        # Wether any artifacts are assigned to this worker
-        Optional('skip-artifacts'): bool,
-    }, {
-        Required('implementation'): 'generic-worker',
-        Required('os'): Any('windows', 'macosx', 'linux'),
-        # see http://schemas.taskcluster.net/generic-worker/v1/payload.json
-        # and https://docs.taskcluster.net/reference/workers/generic-worker/payload
-
-        # command is a list of commands to run, sequentially
-        # on Windows, each command is a string, on OS X and Linux, each command is
-        # a string array
-        Required('command'): Any(
-            [taskref_or_string],   # Windows
-            [[taskref_or_string]]  # Linux / OS X
-        ),
-
-        # artifacts to extract from the task image after completion; note that artifacts
-        # for the generic worker cannot have names
-        Optional('artifacts'): [{
-            # type of artifact -- simple file, or recursive directory
-            'type': Any('file', 'directory'),
-
-            # filesystem path from which to read artifact
-            'path': basestring,
-
-            # if not specified, path is used for artifact name
-            Optional('name'): basestring
-        }],
-
-        # Directories and/or files to be mounted.
-        # The actual allowed combinations are stricter than the model below,
-        # but this provides a simple starting point.
-        # See https://docs.taskcluster.net/reference/workers/generic-worker/payload
-        Optional('mounts'): [{
-            # A unique name for the cache volume, implies writable cache directory
-            # (otherwise mount is a read-only file or directory).
-            Optional('cache-name'): basestring,
-            # Optional content for pre-loading cache, or mandatory content for
-            # read-only file or directory. Pre-loaded content can come from either
-            # a task artifact or from a URL.
-            Optional('content'): {
-
-                # *** Either (artifact and task-id) or url must be specified. ***
-
-                # Artifact name that contains the content.
-                Optional('artifact'): basestring,
-                # Task ID that has the artifact that contains the content.
-                Optional('task-id'): taskref_or_string,
-                # URL that supplies the content in response to an unauthenticated
-                # GET request.
-                Optional('url'): basestring
-            },
-
-            # *** Either file or directory must be specified. ***
-
-            # If mounting a cache or read-only directory, the filesystem location of
-            # the directory should be specified as a relative path to the task
-            # directory here.
-            Optional('directory'): basestring,
-            # If mounting a file, specify the relative path within the task
-            # directory to mount the file (the file will be read only).
-            Optional('file'): basestring,
-            # Required if and only if `content` is specified and mounting a
-            # directory (not a file). This should be the archive format of the
-            # content (either pre-loaded cache or read-only directory).
-            Optional('format'): Any('rar', 'tar.bz2', 'tar.gz', 'zip')
-        }],
-
-        # environment variables
-        Required('env'): {basestring: taskref_or_string},
-
-        # the maximum time to run, in seconds
-        Required('max-run-time'): int,
-
-        # os user groups for test task workers
-        Optional('os-groups'): [basestring],
-
-        # feature for test task to run as administarotr
-        Optional('run-as-administrator'): bool,
-
-        # optional features
-        Required('chain-of-trust'): bool,
-        Optional('taskcluster-proxy'): bool,
-
-        # Wether any artifacts are assigned to this worker
-        Optional('skip-artifacts'): bool,
-    }, {
-        Required('implementation'): 'native-engine',
-        Required('os'): Any('macosx', 'linux'),
-
-        # the maximum time to run, in seconds
-        Required('max-run-time'): int,
-
-        # A link for an executable to download
-        Optional('context'): basestring,
-
-        # Tells the worker whether machine should reboot
-        # after the task is finished.
-        Optional('reboot'):
-            Any('always', 'on-exception', 'on-failure'),
-
-        # the command to run
-        Optional('command'): [taskref_or_string],
-
-        # environment variables
-        Optional('env'): {basestring: taskref_or_string},
-
-        # artifacts to extract from the task image after completion
-        Optional('artifacts'): [{
-            # type of artifact -- simple file, or recursive directory
-            Required('type'): Any('file', 'directory'),
-
-            # task image path from which to read artifact
-            Required('path'): basestring,
-
-            # name of the produced artifact (root of the names for
-            # type=directory)
-            Required('name'): basestring,
-        }],
-        # Wether any artifacts are assigned to this worker
-        Optional('skip-artifacts'): bool,
-    }, {
-        Required('implementation'): 'script-engine-autophone',
-        Required('os'): Any('macosx', 'linux'),
-
-        # A link for an executable to download
-        Optional('context'): basestring,
-
-        # Tells the worker whether machine should reboot
-        # after the task is finished.
-        Optional('reboot'):
-            Any(False, 'always', 'never', 'on-exception', 'on-failure'),
-
-        # the command to run
-        Optional('command'): [taskref_or_string],
-
-        # environment variables
-        Optional('env'): {basestring: taskref_or_string},
-
-        # artifacts to extract from the task image after completion
-        Optional('artifacts'): [{
-            # type of artifact -- simple file, or recursive directory
-            Required('type'): Any('file', 'directory'),
-
-            # task image path from which to read artifact
-            Required('path'): basestring,
-
-            # name of the produced artifact (root of the names for
-            # type=directory)
-            Required('name'): basestring,
-        }],
-    }, {
-        Required('implementation'): 'scriptworker-signing',
-
-        # the maximum time to run, in seconds
-        Required('max-run-time'): int,
-
-        # list of artifact URLs for the artifacts that should be signed
-        Required('upstream-artifacts'): [{
-            # taskId of the task with the artifact
-            Required('taskId'): taskref_or_string,
-
-            # type of signing task (for CoT)
-            Required('taskType'): basestring,
-
-            # Paths to the artifacts to sign
-            Required('paths'): [basestring],
-
-            # Signing formats to use on each of the paths
-            Required('formats'): [basestring],
-        }],
-    }, {
-        Required('implementation'): 'binary-transparency',
-    }, {
-        Required('implementation'): 'beetmover',
-
-        # the maximum time to run, in seconds
-        Required('max-run-time', default=600): int,
-
-        # locale key, if this is a locale beetmover job
-        Optional('locale'): basestring,
-
-        Optional('partner-public'): bool,
-
-        Required('release-properties'): {
-            'app-name': basestring,
-            'app-version': basestring,
-            'branch': basestring,
-            'build-id': basestring,
-            'hash-type': basestring,
-            'platform': basestring,
-        },
-
-        # list of artifact URLs for the artifacts that should be beetmoved
-        Required('upstream-artifacts'): [{
-            # taskId of the task with the artifact
-            Required('taskId'): taskref_or_string,
-
-            # type of signing task (for CoT)
-            Required('taskType'): basestring,
-
-            # Paths to the artifacts to sign
-            Required('paths'): [basestring],
-
-            # locale is used to map upload path and allow for duplicate simple names
-            Required('locale'): basestring,
-        }],
-    }, {
-        Required('implementation'): 'beetmover-push-to-release',
-
-        # the maximum time to run, in seconds
-        Required('max-run-time'): int,
-        Required('product'): basestring,
-    }, {
-        Required('implementation'): 'beetmover-maven',
-
-        Required('max-run-time', default=600): int,
-        Required('release-properties'): {
-            'app-name': basestring,
-            'app-version': basestring,
-            'branch': basestring,
-            'build-id': basestring,
-            'artifact-id': basestring,
-            'hash-type': basestring,
-            'platform': basestring,
-        },
-
-        Required('upstream-artifacts'): [{
-            Required('taskId'): taskref_or_string,
-            Required('taskType'): basestring,
-            Required('paths'): [basestring],
-            Required('zipExtract', default=False): bool,
-        }],
-    }, {
-        Required('implementation'): 'balrog',
-        Required('balrog-action'): Any(*BALROG_ACTIONS),
-        Optional('product'): basestring,
-        Optional('platforms'): [basestring],
-        Optional('release-eta'): basestring,
-        Optional('channel-names'): optionally_keyed_by('release-type', [basestring]),
-        Optional('require-mirrors'): bool,
-        Optional('publish-rules'): optionally_keyed_by('release-type', 'release-level', [int]),
-        Optional('rules-to-update'): optionally_keyed_by(
-            'release-type', 'release-level', [basestring]),
-        Optional('archive-domain'): optionally_keyed_by('release-level', basestring),
-        Optional('download-domain'): optionally_keyed_by('release-level', basestring),
-        Optional('blob-suffix'): basestring,
-        Optional('complete-mar-filename-pattern'): basestring,
-        Optional('complete-mar-bouncer-product-pattern'): basestring,
-
-        # list of artifact URLs for the artifacts that should be beetmoved
-        Optional('upstream-artifacts'): [{
-            # taskId of the task with the artifact
-            Required('taskId'): taskref_or_string,
-
-            # type of signing task (for CoT)
-            Required('taskType'): basestring,
-
-            # Paths to the artifacts to sign
-            Required('paths'): [basestring],
-        }],
-    }, {
-        Required('implementation'): 'bouncer-aliases',
-        Required('entries'): object,
-    }, {
-        Required('implementation'): 'bouncer-locations',
-        Required('bouncer-products'): [basestring],
-    }, {
-        Required('implementation'): 'bouncer-submission',
-        Required('locales'): [basestring],
-        Required('entries'): object,
-    }, {
-        Required('implementation'): 'invalid',
-        # an invalid task is one which should never actually be created; this is used in
-        # release automation on branches where the task just doesn't make sense
+    'worker': {
+        Required('implementation'): basestring,
         Extra: object,
-
-    }, {
-        Required('implementation'): 'always-optimized',
-        Extra: object,
-
-    }, {
-        Required('implementation'): 'push-apk',
-        Required('upstream-artifacts'): [{
-            Required('taskId'): taskref_or_string,
-            Required('taskType'): basestring,
-            Required('paths'): [basestring],
-            Optional('optional', default=False): bool,
-        }],
-
-        # "Invalid" is a noop for try and other non-supported branches
-        Required('google-play-track'): Any('production', 'beta', 'alpha', 'rollout', 'internal'),
-        Required('commit'): bool,
-        Optional('rollout-percentage'): Any(int, None),
-    }, {
-        Required('implementation'): 'push-snap',
-        Required('upstream-artifacts'): [{
-            Required('taskId'): taskref_or_string,
-            Required('taskType'): basestring,
-            Required('paths'): [basestring],
-        }],
-    }, {
-        Required('implementation'): 'sign-and-push-addons',
-        Required('channel'): Any('listed', 'unlisted'),
-        Required('upstream-artifacts'): [{
-            Required('taskId'): taskref_or_string,
-            Required('taskType'): basestring,
-            Required('paths'): [basestring],
-        }],
-    }, {
-        Required('implementation'): 'shipit-shipped',
-        Required('release-name'): basestring,
-    }, {
-        Required('implementation'): 'shipit-started',
-        Required('release-name'): basestring,
-        Required('product'): basestring,
-        Required('branch'): basestring,
-        Required('locales'): basestring,
-    }, {
-        Required('implementation'): 'treescript',
-        Required('tags'): [Any('buildN', 'release', None)],
-        Required('bump'): bool,
-        Optional('bump-files'): [basestring],
-        Optional('repo-param-prefix'): basestring,
-        Optional('dontbuild'): bool,
-        Required('force-dry-run', default=True): bool,
-        Required('push', default=False): bool
-    }),
+    }
 })
 
 TC_TREEHERDER_SCHEMA_URL = 'https://github.com/taskcluster/taskcluster-treeherder/' \
                            'blob/master/schemas/task-treeherder-config.yml'
 
 
 UNKNOWN_GROUP_NAME = "Treeherder group {} (from {}) has no name; " \
                      "add it to taskcluster/ci/config.yml"
@@ -725,19 +315,22 @@ BRANCH_PRIORITIES = {
     'graphics': 'very-low',
     'ux': 'very-low',
 }
 
 # define a collection of payload builders, depending on the worker implementation
 payload_builders = {}
 
 
-def payload_builder(name):
+def payload_builder(name, schema):
+    schema = Schema({Required('implementation'): name}).extend(schema)
+
     def wrap(func):
         payload_builders[name] = func
+        func.schema = Schema(schema)
         return func
     return wrap
 
 
 # define a collection of index builders, depending on the type implementation
 index_builders = {}
 
 
@@ -773,17 +366,104 @@ The gecko-v2 product {product} is not in
 
 
 def verify_index(config, index):
     product = index['product']
     if product not in config.graph_config['index']['products']:
         raise Exception(UNSUPPORTED_INDEX_PRODUCT_ERROR.format(product=product))
 
 
-@payload_builder('docker-worker')
+@payload_builder('docker-worker', schema={
+    Required('os'): 'linux',
+
+    # For tasks that will run in docker-worker or docker-engine, this is the
+    # name of the docker image or in-tree docker image to run the task in.  If
+    # in-tree, then a dependency will be created automatically.  This is
+    # generally `desktop-test`, or an image that acts an awful lot like it.
+    Required('docker-image'): Any(
+        # a raw Docker image path (repo/image:tag)
+        basestring,
+        # an in-tree generated docker image (from `taskcluster/docker/<name>`)
+        {'in-tree': basestring},
+        # an indexed docker image
+        {'indexed': basestring},
+    ),
+
+    # worker features that should be enabled
+    Required('relengapi-proxy'): bool,
+    Required('chain-of-trust'): bool,
+    Required('taskcluster-proxy'): bool,
+    Required('allow-ptrace'): bool,
+    Required('loopback-video'): bool,
+    Required('loopback-audio'): bool,
+    Required('docker-in-docker'): bool,  # (aka 'dind')
+    Required('privileged'): bool,
+
+    # Paths to Docker volumes.
+    #
+    # For in-tree Docker images, volumes can be parsed from Dockerfile.
+    # This only works for the Dockerfile itself: if a volume is defined in
+    # a base image, it will need to be declared here. Out-of-tree Docker
+    # images will also require explicit volume annotation.
+    #
+    # Caches are often mounted to the same path as Docker volumes. In this
+    # case, they take precedence over a Docker volume. But a volume still
+    # needs to be declared for the path.
+    Optional('volumes'): [basestring],
+
+    # caches to set up for the task
+    Optional('caches'): [{
+        # only one type is supported by any of the workers right now
+        'type': 'persistent',
+
+        # name of the cache, allowing re-use by subsequent tasks naming the
+        # same cache
+        'name': basestring,
+
+        # location in the task image where the cache will be mounted
+        'mount-point': basestring,
+
+        # Whether the cache is not used in untrusted environments
+        # (like the Try repo).
+        Optional('skip-untrusted'): bool,
+    }],
+
+    # artifacts to extract from the task image after completion
+    Optional('artifacts'): [{
+        # type of artifact -- simple file, or recursive directory
+        'type': Any('file', 'directory'),
+
+        # task image path from which to read artifact
+        'path': basestring,
+
+        # name of the produced artifact (root of the names for
+        # type=directory)
+        'name': basestring,
+    }],
+
+    # environment variables
+    Required('env'): {basestring: taskref_or_string},
+
+    # the command to run; if not given, docker-worker will default to the
+    # command in the docker image
+    Optional('command'): [taskref_or_string],
+
+    # the maximum time to run, in seconds
+    Required('max-run-time'): int,
+
+    # the exit status code(s) that indicates the task should be retried
+    Optional('retry-exit-status'): [int],
+
+    # the exit status code(s) that indicates the caches used by the task
+    # should be purged
+    Optional('purge-caches-exit-status'): [int],
+
+    # Whether any artifacts are assigned to this worker
+    Optional('skip-artifacts'): bool,
+})
 def build_docker_worker_payload(config, task, task_def):
     worker = task['worker']
     level = int(config.params['level'])
 
     image = worker['docker-image']
     if isinstance(image, dict):
         if 'in-tree' in image:
             name = image['in-tree']
@@ -980,17 +660,100 @@ def build_docker_worker_payload(config, 
 
     # coalesce / superseding
     if 'coalesce' in task:
         payload['supersederUrl'] = superseder_url(config, task)
 
     check_caches_are_volumes(task)
 
 
-@payload_builder('generic-worker')
+@payload_builder('generic-worker', schema={
+    Required('os'): Any('windows', 'macosx', 'linux'),
+    # see http://schemas.taskcluster.net/generic-worker/v1/payload.json
+    # and https://docs.taskcluster.net/reference/workers/generic-worker/payload
+
+    # command is a list of commands to run, sequentially
+    # on Windows, each command is a string, on OS X and Linux, each command is
+    # a string array
+    Required('command'): Any(
+        [taskref_or_string],   # Windows
+        [[taskref_or_string]]  # Linux / OS X
+    ),
+
+    # artifacts to extract from the task image after completion; note that artifacts
+    # for the generic worker cannot have names
+    Optional('artifacts'): [{
+        # type of artifact -- simple file, or recursive directory
+        'type': Any('file', 'directory'),
+
+        # filesystem path from which to read artifact
+        'path': basestring,
+
+        # if not specified, path is used for artifact name
+        Optional('name'): basestring
+    }],
+
+    # Directories and/or files to be mounted.
+    # The actual allowed combinations are stricter than the model below,
+    # but this provides a simple starting point.
+    # See https://docs.taskcluster.net/reference/workers/generic-worker/payload
+    Optional('mounts'): [{
+        # A unique name for the cache volume, implies writable cache directory
+        # (otherwise mount is a read-only file or directory).
+        Optional('cache-name'): basestring,
+        # Optional content for pre-loading cache, or mandatory content for
+        # read-only file or directory. Pre-loaded content can come from either
+        # a task artifact or from a URL.
+        Optional('content'): {
+
+            # *** Either (artifact and task-id) or url must be specified. ***
+
+            # Artifact name that contains the content.
+            Optional('artifact'): basestring,
+            # Task ID that has the artifact that contains the content.
+            Optional('task-id'): taskref_or_string,
+            # URL that supplies the content in response to an unauthenticated
+            # GET request.
+            Optional('url'): basestring
+        },
+
+        # *** Either file or directory must be specified. ***
+
+        # If mounting a cache or read-only directory, the filesystem location of
+        # the directory should be specified as a relative path to the task
+        # directory here.
+        Optional('directory'): basestring,
+        # If mounting a file, specify the relative path within the task
+        # directory to mount the file (the file will be read only).
+        Optional('file'): basestring,
+        # Required if and only if `content` is specified and mounting a
+        # directory (not a file). This should be the archive format of the
+        # content (either pre-loaded cache or read-only directory).
+        Optional('format'): Any('rar', 'tar.bz2', 'tar.gz', 'zip')
+    }],
+
+    # environment variables
+    Required('env'): {basestring: taskref_or_string},
+
+    # the maximum time to run, in seconds
+    Required('max-run-time'): int,
+
+    # os user groups for test task workers
+    Optional('os-groups'): [basestring],
+
+    # feature for test task to run as administrator
+    Optional('run-as-administrator'): bool,
+
+    # optional features
+    Required('chain-of-trust'): bool,
+    Optional('taskcluster-proxy'): bool,
+
+    # Whether any artifacts are assigned to this worker
+    Optional('skip-artifacts'): bool,
+})
 def build_generic_worker_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'command': worker['command'],
         'maxRunTime': worker['max-run-time'],
     }
 
@@ -1057,17 +820,35 @@ def build_generic_worker_payload(config,
     if features:
         task_def['payload']['features'] = features
 
     # coalesce / superseding
     if 'coalesce' in task:
         task_def['payload']['supersederUrl'] = superseder_url(config, task)
 
 
-@payload_builder('scriptworker-signing')
+@payload_builder('scriptworker-signing', schema={
+    # the maximum time to run, in seconds
+    Required('max-run-time'): int,
+
+    # list of artifact URLs for the artifacts that should be signed
+    Required('upstream-artifacts'): [{
+        # taskId of the task with the artifact
+        Required('taskId'): taskref_or_string,
+
+        # type of signing task (for CoT)
+        Required('taskType'): basestring,
+
+        # Paths to the artifacts to sign
+        Required('paths'): [basestring],
+
+        # Signing formats to use on each of the paths
+        Required('formats'): [basestring],
+    }],
+})
 def build_scriptworker_signing_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'maxRunTime': worker['max-run-time'],
         'upstreamArtifacts':  worker['upstream-artifacts']
     }
 
@@ -1077,17 +858,17 @@ def build_scriptworker_signing_payload(c
             artifacts.update(get_signed_artifacts(
                 input=path,
                 formats=upstream_artifact['formats'],
             ))
 
     task['release-artifacts'] = list(artifacts)
 
 
-@payload_builder('binary-transparency')
+@payload_builder('binary-transparency', schema={})
 def build_binary_transparency_payload(config, task, task_def):
     release_config = get_release_config(config)
 
     task_def['payload'] = {
         'version': release_config['version'],
         'chain': 'TRANSPARENCY.pem',
         'contact': task_def['metadata']['owner'],
         'maxRunTime': 600,
@@ -1098,17 +879,49 @@ def build_binary_transparency_payload(co
         ).format(
             task['shipping-product'],
             release_config['version'],
             release_config['build_number'],
         ),
     }
 
 
-@payload_builder('beetmover')
+@payload_builder('beetmover', schema={
+    # the maximum time to run, in seconds
+    Required('max-run-time', default=600): int,
+
+    # locale key, if this is a locale beetmover job
+    Optional('locale'): basestring,
+
+    Optional('partner-public'): bool,
+
+    Required('release-properties'): {
+        'app-name': basestring,
+        'app-version': basestring,
+        'branch': basestring,
+        'build-id': basestring,
+        'hash-type': basestring,
+        'platform': basestring,
+    },
+
+    # list of artifact URLs for the artifacts that should be beetmoved
+    Required('upstream-artifacts'): [{
+        # taskId of the task with the artifact
+        Required('taskId'): taskref_or_string,
+
+        # type of signing task (for CoT)
+        Required('taskType'): basestring,
+
+        # Paths to the artifacts to sign
+        Required('paths'): [basestring],
+
+        # locale is used to map upload path and allow for duplicate simple names
+        Required('locale'): basestring,
+    }],
+})
 def build_beetmover_payload(config, task, task_def):
     worker = task['worker']
     release_config = get_release_config(config)
     release_properties = worker['release-properties']
 
     task_def['payload'] = {
         'maxRunTime': worker['max-run-time'],
         'releaseProperties': {
@@ -1125,40 +938,89 @@ def build_beetmover_payload(config, task
     if worker.get('locale'):
         task_def['payload']['locale'] = worker['locale']
     if worker.get('partner-public'):
         task_def['payload']['is_partner_repack_public'] = worker['partner-public']
     if release_config:
         task_def['payload'].update(release_config)
 
 
-@payload_builder('beetmover-push-to-release')
+@payload_builder('beetmover-push-to-release', schema={
+    # the maximum time to run, in seconds
+    Required('max-run-time'): int,
+    Required('product'): basestring,
+})
 def build_beetmover_push_to_release_payload(config, task, task_def):
     worker = task['worker']
     release_config = get_release_config(config)
 
     task_def['payload'] = {
         'maxRunTime': worker['max-run-time'],
         'product': worker['product'],
         'version': release_config['version'],
         'build_number': release_config['build_number'],
     }
 
 
-@payload_builder('beetmover-maven')
+@payload_builder('beetmover-maven', schema={
+    Required('max-run-time', default=600): int,
+    Required('release-properties'): {
+        'app-name': basestring,
+        'app-version': basestring,
+        'branch': basestring,
+        'build-id': basestring,
+        'artifact-id': basestring,
+        'hash-type': basestring,
+        'platform': basestring,
+    },
+
+    Required('upstream-artifacts'): [{
+        Required('taskId'): taskref_or_string,
+        Required('taskType'): basestring,
+        Required('paths'): [basestring],
+        Required('zipExtract', default=False): bool,
+    }],
+})
 def build_beetmover_maven_payload(config, task, task_def):
     build_beetmover_payload(config, task, task_def)
 
     task_def['payload']['artifact_id'] = task['worker']['release-properties']['artifact-id']
 
     del task_def['payload']['releaseProperties']['hashType']
     del task_def['payload']['releaseProperties']['platform']
 
 
-@payload_builder('balrog')
+@payload_builder('balrog', schema={
+    Required('balrog-action'): Any(*BALROG_ACTIONS),
+    Optional('product'): basestring,
+    Optional('platforms'): [basestring],
+    Optional('release-eta'): basestring,
+    Optional('channel-names'): optionally_keyed_by('release-type', [basestring]),
+    Optional('require-mirrors'): bool,
+    Optional('publish-rules'): optionally_keyed_by('release-type', 'release-level', [int]),
+    Optional('rules-to-update'): optionally_keyed_by(
+        'release-type', 'release-level', [basestring]),
+    Optional('archive-domain'): optionally_keyed_by('release-level', basestring),
+    Optional('download-domain'): optionally_keyed_by('release-level', basestring),
+    Optional('blob-suffix'): basestring,
+    Optional('complete-mar-filename-pattern'): basestring,
+    Optional('complete-mar-bouncer-product-pattern'): basestring,
+
+    # list of artifact URLs for the artifacts that should be submitted to balrog
+    Optional('upstream-artifacts'): [{
+        # taskId of the task with the artifact
+        Required('taskId'): taskref_or_string,
+
+        # type of signing task (for CoT)
+        Required('taskType'): basestring,
+
+        # Paths to the artifacts to sign
+        Required('paths'): [basestring],
+    }],
+})
 def build_balrog_payload(config, task, task_def):
     worker = task['worker']
     release_config = get_release_config(config)
 
     if worker['balrog-action'] == 'submit-locale':
         task_def['payload'] = {
             'upstreamArtifacts':  worker['upstream-artifacts']
         }
@@ -1195,106 +1057,154 @@ def build_balrog_payload(config, task, t
             })
         else:  # schedule / ship
             task_def['payload'].update({
                 'publish_rules': worker['publish-rules'],
                 'release_eta': worker.get('release-eta', config.params.get('release_eta')) or '',
             })
 
 
-@payload_builder('bouncer-aliases')
+@payload_builder('bouncer-aliases', schema={
+    Required('entries'): object,
+})
 def build_bouncer_aliases_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'aliases_entries': worker['entries']
     }
 
 
-@payload_builder('bouncer-locations')
+@payload_builder('bouncer-locations', schema={
+    Required('implementation'): 'bouncer-locations',
+    Required('bouncer-products'): [basestring],
+})
 def build_bouncer_locations_payload(config, task, task_def):
     worker = task['worker']
     release_config = get_release_config(config)
 
     task_def['payload'] = {
         'bouncer_products': worker['bouncer-products'],
         'version': release_config['version'],
     }
 
 
-@payload_builder('bouncer-submission')
+@payload_builder('bouncer-submission', schema={
+    Required('locales'): [basestring],
+    Required('entries'): object,
+})
 def build_bouncer_submission_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'locales':  worker['locales'],
         'submission_entries': worker['entries']
     }
 
 
-@payload_builder('push-apk')
+@payload_builder('push-apk', schema={
+    Required('upstream-artifacts'): [{
+        Required('taskId'): taskref_or_string,
+        Required('taskType'): basestring,
+        Required('paths'): [basestring],
+        Optional('optional', default=False): bool,
+    }],
+
+    # "Invalid" is a noop for try and other non-supported branches
+    Required('google-play-track'): Any('production', 'beta', 'alpha', 'rollout', 'internal'),
+    Required('commit'): bool,
+    Optional('rollout-percentage'): Any(int, None),
+})
 def build_push_apk_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'commit': worker['commit'],
         'upstreamArtifacts': worker['upstream-artifacts'],
         'google_play_track': worker['google-play-track'],
     }
 
     if worker.get('rollout-percentage', None):
         task_def['payload']['rollout_percentage'] = worker['rollout-percentage']
 
 
-@payload_builder('push-snap')
+@payload_builder('push-snap', schema={
+    Required('upstream-artifacts'): [{
+        Required('taskId'): taskref_or_string,
+        Required('taskType'): basestring,
+        Required('paths'): [basestring],
+    }],
+})
 def build_push_snap_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'upstreamArtifacts':  worker['upstream-artifacts'],
     }
 
 
-@payload_builder('shipit-shipped')
+@payload_builder('shipit-shipped', schema={
+    Required('release-name'): basestring,
+})
 def build_ship_it_shipped_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'release_name': worker['release-name']
     }
 
 
-@payload_builder('shipit-started')
+@payload_builder('shipit-started', schema={
+    Required('release-name'): basestring,
+    Required('product'): basestring,
+    Required('branch'): basestring,
+    Required('locales'): basestring,
+})
 def build_ship_it_started_payload(config, task, task_def):
     worker = task['worker']
     release_config = get_release_config(config)
 
     task_def['payload'] = {
         'release_name': worker['release-name'],
         'product': worker['product'],
         'version': release_config['version'],
         'build_number': release_config['build_number'],
         'branch': worker['branch'],
         'revision': get_branch_rev(config),
         'partials': release_config.get('partial_versions', ""),
         'l10n_changesets': worker['locales'],
     }
 
 
-@payload_builder('sign-and-push-addons')
+@payload_builder('sign-and-push-addons', schema={
+    Required('channel'): Any('listed', 'unlisted'),
+    Required('upstream-artifacts'): [{
+        Required('taskId'): taskref_or_string,
+        Required('taskType'): basestring,
+        Required('paths'): [basestring],
+    }],
+})
 def build_sign_and_push_addons_payload(config, task, task_def):
     worker = task['worker']
 
     task_def['payload'] = {
         'channel': worker['channel'],
         'upstreamArtifacts': worker['upstream-artifacts'],
     }
 
 
-@payload_builder('treescript')
+@payload_builder('treescript', schema={
+    Required('tags'): [Any('buildN', 'release', None)],
+    Required('bump'): bool,
+    Optional('bump-files'): [basestring],
+    Optional('repo-param-prefix'): basestring,
+    Optional('dontbuild'): bool,
+    Required('force-dry-run', default=True): bool,
+    Required('push', default=False): bool
+})
 def build_treescript_payload(config, task, task_def):
     worker = task['worker']
     release_config = get_release_config(config)
 
     task_def['payload'] = {}
     task_def.setdefault('scopes', [])
     if worker['tags']:
         tag_names = []
@@ -1331,27 +1241,67 @@ def build_treescript_payload(config, tas
 
     if worker.get('force-dry-run'):
         task_def['payload']['dry_run'] = True
 
     if worker.get('dontbuild'):
         task_def['payload']['dontbuild'] = True
 
 
-@payload_builder('invalid')
+@payload_builder('invalid', schema={
+    # an invalid task is one which should never actually be created; this is used in
+    # release automation on branches where the task just doesn't make sense
+    Extra: object,
+})
 def build_invalid_payload(config, task, task_def):
     task_def['payload'] = 'invalid task - should never be created'
 
 
-@payload_builder('always-optimized')
+@payload_builder('always-optimized', schema={
+    Extra: object,
+})
 def build_always_optimized_payload(config, task, task_def):
     task_def['payload'] = {}
 
 
-@payload_builder('native-engine')
+@payload_builder('native-engine', schema={
+    Required('os'): Any('macosx', 'linux'),
+
+    # the maximum time to run, in seconds
+    Required('max-run-time'): int,
+
+    # A link for an executable to download
+    Optional('context'): basestring,
+
+    # Tells the worker whether the machine should reboot
+    # after the task is finished.
+    Optional('reboot'):
+    Any('always', 'on-exception', 'on-failure'),
+
+    # the command to run
+    Optional('command'): [taskref_or_string],
+
+    # environment variables
+    Optional('env'): {basestring: taskref_or_string},
+
+    # artifacts to extract from the task image after completion
+    Optional('artifacts'): [{
+        # type of artifact -- simple file, or recursive directory
+        Required('type'): Any('file', 'directory'),
+
+        # task image path from which to read artifact
+        Required('path'): basestring,
+
+        # name of the produced artifact (root of the names for
+        # type=directory)
+        Required('name'): basestring,
+    }],
+    # Whether any artifacts are assigned to this worker
+    Optional('skip-artifacts'): bool,
+})
 def build_macosx_engine_payload(config, task, task_def):
     worker = task['worker']
     artifacts = map(lambda artifact: {
         'name': artifact['name'],
         'path': artifact['path'],
         'type': artifact['type'],
         'expires': task_def['expires'],
     }, worker.get('artifacts', []))
@@ -1365,17 +1315,46 @@ def build_macosx_engine_payload(config, 
     }
     if worker.get('reboot'):
         task_def['payload'] = worker['reboot']
 
     if task.get('needs-sccache'):
         raise Exception('needs-sccache not supported in native-engine')
 
 
-@payload_builder('script-engine-autophone')
+@payload_builder('script-engine-autophone', schema={
+    Required('os'): Any('macosx', 'linux'),
+
+    # A link for an executable to download
+    Optional('context'): basestring,
+
+    # Tells the worker whether the machine should reboot
+    # after the task is finished.
+    Optional('reboot'):
+    Any(False, 'always', 'never', 'on-exception', 'on-failure'),
+
+    # the command to run
+    Optional('command'): [taskref_or_string],
+
+    # environment variables
+    Optional('env'): {basestring: taskref_or_string},
+
+    # artifacts to extract from the task image after completion
+    Optional('artifacts'): [{
+        # type of artifact -- simple file, or recursive directory
+        Required('type'): Any('file', 'directory'),
+
+        # task image path from which to read artifact
+        Required('path'): basestring,
+
+        # name of the produced artifact (root of the names for
+        # type=directory)
+        Required('name'): basestring,
+    }],
+})
 def build_script_engine_autophone_payload(config, task, task_def):
     worker = task['worker']
     artifacts = map(lambda artifact: {
         'name': artifact['name'],
         'path': artifact['path'],
         'type': artifact['type'],
         'expires': task_def['expires'],
     }, worker.get('artifacts', []))
@@ -1461,16 +1440,20 @@ def validate_shipping_product(config, pr
 
 
 @transforms.add
 def validate(config, tasks):
     for task in tasks:
         validate_schema(
             task_description_schema, task,
             "In task {!r}:".format(task.get('label', '?no-label?')))
+        validate_schema(
+           payload_builders[task['worker']['implementation']].schema,
+           task['worker'],
+           "In task.run {!r}:".format(task.get('label', '?no-label?')))
         if task['shipping-product'] is not None:
             validate_shipping_product(config, task['shipping-product'])
         yield task
 
 
 @index_builder('generic')
 def add_generic_index_routes(config, task):
     index = task.get('index')