Bug 1399393 Refactor create_tasks to avoid bottlenecks r=dustin
author Simon Fraser <sfraser@mozilla.com>
Wed, 13 Sep 2017 10:43:42 +0100
changeset 430358 45fe1c0506456d86919732e4223c1d0b2772e7f3
parent 430357 14207baa622e2e51dbb524db59c10840438db82d
child 430359 b549b2ed2efcc38ec69b87ab626ca9c232738259
push id 7761
push user jlund@mozilla.com
push date Fri, 15 Sep 2017 00:19:52 +0000
reviewers dustin
bugs 1399393
milestone 57.0a1
Bug 1399393 Refactor create_tasks to avoid bottlenecks r=dustin MozReview-Commit-ID: cJW5X3HSCx
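The bottleneck being removed: the old loop walked the graph with visit_postorder() and blocked on each task's dependencies before submitting the next task, so a slow dependency for task N stalled submission of task N+1 even when N+1 was ready. The patch below instead assigns taskGroupId, schedulerId and the decision-task dependency up front, then uses a schedule_tasks() helper that repeatedly submits every task whose in-graph dependencies have already finished. What follows is a minimal, standalone sketch of that scheduling pattern; the example graph, the CONCURRENCY value and the fake_create_task stand-in are illustrative assumptions, not taken from the patch.

    from concurrent import futures

    CONCURRENCY = 4

    # Hypothetical dependency graph: task id -> ids it depends on.
    graph = {
        'decision': [],
        'build': ['decision'],
        'test-1': ['build'],
        'test-2': ['build'],
    }

    def fake_create_task(task_id):
        # Stand-in for the real create_task() call to the queue service.
        print('created', task_id)

    fs = {}                 # task id -> future for its submission
    remaining = set(graph)  # tasks not yet submitted

    with futures.ThreadPoolExecutor(CONCURRENCY) as e:
        def schedule_tasks():
            to_remove = set()
            for task_id in remaining:
                deps = set(graph[task_id]) & set(graph)
                # Skip tasks whose dependencies haven't all finished submitting.
                if any(d not in fs or not fs[d].done() for d in deps):
                    continue
                fs[task_id] = e.submit(fake_create_task, task_id)
                to_remove.add(task_id)
            remaining.difference_update(to_remove)

        schedule_tasks()
        while remaining:
            # Wait for the batch just submitted, surfacing any exceptions,
            # then submit whatever has become eligible.
            for f in futures.as_completed(fs.values()):
                f.result()
            schedule_tasks()

Each call to schedule_tasks() submits whatever is currently eligible; the outer loop then waits on the submitted futures (raising on any failure via f.result()) before looking for newly eligible tasks, which is the same control flow the patch applies to the real taskgraph.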
taskcluster/taskgraph/create.py
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -42,62 +42,71 @@ def create_tasks(taskgraph, label_to_tas
 
     # when running as an actual decision task, we use the decision task's
     # taskId as the taskGroupId.  The process that created the decision task
     # helpfully placed it in this same taskGroup.  If there is no $TASK_ID,
     # fall back to a slugid
     task_group_id = decision_task_id or slugid()
     scheduler_id = 'gecko-level-{}'.format(params['level'])
 
+    # Add the taskGroupId, schedulerId and optionally the decision task
+    # dependency
+    for task_id in taskgraph.graph.nodes:
+        task_def = taskgraph.tasks[task_id].task
+
+        # if this task has no dependencies *within* this taskgraph, make it
+        # depend on this decision task. If it has another dependency within
+        # the taskgraph, then it already implicitly depends on the decision
+        # task.  The result is that tasks do not start immediately. If this
+        # loop fails halfway through, none of the already-created tasks run.
+        if decision_task_id:
+            if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
+                task_def.setdefault('dependencies', []).append(decision_task_id)
+
+        task_def['taskGroupId'] = task_group_id
+        task_def['schedulerId'] = scheduler_id
+
     with futures.ThreadPoolExecutor(CONCURRENCY) as e:
         fs = {}
 
         # We can't submit a task until its dependencies have been submitted.
         # So our strategy is to walk the graph and submit tasks once all
         # their dependencies have been submitted.
-        #
-        # Using visit_postorder() here isn't the most efficient: we'll
-        # block waiting for dependencies of task N to submit even though
-        # dependencies for task N+1 may be finished. If we need to optimize
-        # this further, we can build a graph of task dependencies and walk
-        # that.
-        for task_id in taskgraph.graph.visit_postorder():
-            task_def = taskgraph.tasks[task_id].task
-            attributes = taskgraph.tasks[task_id].attributes
+        tasklist = set(taskgraph.graph.visit_postorder())
+        alltasks = tasklist.copy()
 
-            # if this task has no dependencies *within* this taskgraph, make it
-            # depend on this decision task. If it has another dependency within
-            # the taskgraph, then it already implicitly depends on the decision
-            # task.  The result is that tasks do not start immediately. if this
-            # loop fails halfway through, none of the already-created tasks run.
-            if decision_task_id:
-                if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
-                    task_def.setdefault('dependencies', []).append(decision_task_id)
+        def schedule_tasks(f=None):
+            to_remove = set()
+            for task_id in tasklist:
+                task_def = taskgraph.tasks[task_id].task
+                # If we haven't finished submitting all our dependencies yet,
+                # come back to this later.
+                # Some dependencies aren't in our graph, so make sure to filter
+                # those out
+                deps = set(task_def.get('dependencies', [])) & alltasks
+                if any((d not in fs or not fs[d].done()) for d in deps):
+                    continue
 
-            task_def['taskGroupId'] = task_group_id
-            task_def['schedulerId'] = scheduler_id
-
-            # Wait for dependencies before submitting this.
-            deps_fs = [fs[dep] for dep in task_def.get('dependencies', [])
-                       if dep in fs]
-            for f in futures.as_completed(deps_fs):
-                f.result()
+                fs[task_id] = e.submit(create_task, session, task_id,
+                                       taskid_to_label[task_id], task_def)
+                to_remove.add(task_id)
 
-            fs[task_id] = e.submit(create_task, session, task_id,
-                                   taskid_to_label[task_id], task_def)
+                # Schedule tasks as many times as task_duplicates indicates
+                attributes = taskgraph.tasks[task_id].attributes
+                for i in range(1, attributes.get('task_duplicates', 1)):
+                    # We use slugid() since we want a distinct task id
+                    fs[task_id] = e.submit(create_task, session, slugid(),
+                                           taskid_to_label[task_id], task_def)
+            tasklist.difference_update(to_remove)
 
-            # Schedule tasks as many times as task_duplicates indicates
-            for i in range(1, attributes.get('task_duplicates', 1)):
-                # We use slugid() since we want a distinct task id
-                fs[task_id] = e.submit(create_task, session, slugid(),
-                                       taskid_to_label[task_id], task_def)
-
-        # Wait for all futures to complete.
-        for f in futures.as_completed(fs.values()):
-            f.result()
+        schedule_tasks()
+        while tasklist:
+            for f in futures.as_completed(fs.values()):
+                f.result()
+            schedule_tasks()
 
 
 def create_task(session, task_id, label, task_def):
     # create the task using 'http://taskcluster/queue', which is proxied to the queue service
     # with credentials appropriate to this job.
 
     # Resolve timestamps
     now = current_json_time(datetime_format=True)