author Carsten "Tomcat" Book <cbook@mozilla.com>
Wed, 20 Jul 2016 16:56:03 +0200
changeset 330980 f90a9f8af37c202842f5d9c5b2928004124ab5e1
parent 330581 9063b402b8596fce243f243693a64fbdf480dc2c
child 331770 9baa784f40f48fca3ffa262c7dfe3b1710f70442
permissions -rw-r--r--
Merge mozilla-central to mozilla-inbound

# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

from __future__ import absolute_import, print_function, unicode_literals

import concurrent.futures as futures
import requests
import requests.adapters
import json
import os
import logging

from slugid import nice as slugid
from taskgraph.util.time import (

logger = logging.getLogger(__name__)

def create_tasks(taskgraph, label_to_taskid):
    # TODO: use the taskGroupId of the decision task
    task_group_id = slugid()
    taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}

    session = requests.Session()

    decision_task_id = os.environ.get('TASK_ID')

    with futures.ThreadPoolExecutor(requests.adapters.DEFAULT_POOLSIZE) as e:
        fs = {}

        # We can't submit a task until its dependencies have been submitted.
        # So our strategy is to walk the graph and submit tasks once all
        # their dependencies have been submitted.
        # Using visit_postorder() here isn't the most efficient: we'll
        # block waiting for dependencies of task N to submit even though
        # dependencies for task N+1 may be finished. If we need to optimize
        # this further, we can build a graph of task dependencies and walk
        # that.
        for task_id in taskgraph.graph.visit_postorder():
            task_def = taskgraph.tasks[task_id].task
            # if this task has no dependencies, make it depend on this decision
            # task so that it does not start immediately; and so that if this loop
            # fails halfway through, none of the already-created tasks run.
            if decision_task_id and not task_def.get('dependencies'):
                task_def['dependencies'] = [decision_task_id]

            task_def['taskGroupId'] = task_group_id
            task_def['schedulerId'] = '-'

            # Wait for dependencies before submitting this.
            deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
                       if dep in fs]
            for f in futures.as_completed(deps_fs):

            fs[task_id] = e.submit(_create_task, session, task_id,
                                   taskid_to_label[task_id], task_def)

        # Wait for all futures to complete.
        for f in futures.as_completed(fs.values()):

def _create_task(session, task_id, label, task_def):
    # create the task using 'http://taskcluster/queue', which is proxied to the queue service
    # with credentials appropriate to this job.

    # Resolve timestamps
    now = current_json_time(datetime_format=True)
    task_def = resolve_timestamps(now, task_def)

    logger.debug("Creating task with taskId {} for {}".format(task_id, label))
    res = session.put('http://taskcluster/queue/v1/task/{}'.format(task_id),
    if res.status_code != 200:

def resolve_timestamps(now, task_def):
    def recurse(val):
        if isinstance(val, list):
            return [recurse(v) for v in val]
        elif isinstance(val, dict):
            if val.keys() == ['relative-datestamp']:
                return json_time_from_now(val['relative-datestamp'], now)
                return {k: recurse(v) for k, v in val.iteritems()}
            return val
    return recurse(task_def)