Bug 1517645 - always use a highly concurrent requests session
author Dustin J. Mitchell <dustin@mozilla.com>
date Fri, 04 Jan 2019 21:42:57 +0000
changeset 511124 53380666b5c70c68da513dc8ba0c8ff7dfca48c5
parent 511123 96a7386c6e5bda914f59fd651b01ec7c03b644d9
child 511125 e256dba8d711838fdb983f287d8f0311241cb8ff
Bug 1517645 - always use a highly concurrent requests session

Reviewers: bstack
Subscribers: tomprince
Tags: #secure-revision
Bug #: 1517645
Differential Revision: https://phabricator.services.mozilla.com/D15853
taskcluster/taskgraph/actions/cancel_all.py
taskcluster/taskgraph/create.py
taskcluster/taskgraph/util/taskcluster.py
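In outline, the change moves the CONCURRENCY constant and the session construction into taskcluster/taskgraph/util/taskcluster.py, so create.py and cancel_all.py share one retrying, high-pool-size session. A minimal self-contained sketch of the resulting pattern (the memoization is written out by hand here; the real module uses its @memoize decorator):

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# the maximum number of parallel Taskcluster API calls to make
CONCURRENCY = 50

_session = None


def get_session():
    # Hand-rolled cache standing in for the module's @memoize.
    global _session
    if _session is not None:
        return _session
    _session = requests.Session()
    # Retry transient server errors with a short exponential backoff.
    retry = Retry(total=5, backoff_factor=0.1,
                  status_forcelist=[500, 502, 503, 504])
    # The default HTTPAdapter pools only 10 connections; size the pool
    # to CONCURRENCY so parallel workers never contend for sockets.
    http_adapter = HTTPAdapter(pool_connections=CONCURRENCY,
                               pool_maxsize=CONCURRENCY,
                               max_retries=retry)
    _session.mount('https://', http_adapter)
    _session.mount('http://', http_adapter)
    return _session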
--- a/taskcluster/taskgraph/actions/cancel_all.py
+++ b/taskcluster/taskgraph/actions/cancel_all.py
@@ -5,22 +5,23 @@
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import concurrent.futures as futures
 import logging
 import os
 
-from taskgraph.util.taskcluster import list_task_group, cancel_task
+from taskgraph.util.taskcluster import (
+    list_task_group,
+    cancel_task,
+    CONCURRENCY,
+)
 from .registry import register_callback_action
 
-# the maximum number of parallel cancelTask calls to make
-CONCURRENCY = 50
-
 logger = logging.getLogger(__name__)
 
 
 @register_callback_action(
     title='Cancel All',
     name='cancel-all',
     kind='hook',
     generic=True,
@@ -34,14 +35,14 @@ logger = logging.getLogger(__name__)
 )
 def cancel_all_action(parameters, graph_config, input, task_group_id, task_id, task):
     def do_cancel_task(task_id):
         logger.info('Cancelling task {}'.format(task_id))
         cancel_task(task_id, use_proxy=True)
 
     own_task_id = os.environ.get('TASK_ID', '')
     with futures.ThreadPoolExecutor(CONCURRENCY) as e:
-        cancels_jobs = [
+        cancel_futs = [
             e.submit(do_cancel_task, t)
             for t in list_task_group(task_group_id) if t != own_task_id
         ]
-        for job in cancels_jobs:
-            job.result()
+        for f in futures.as_completed(cancel_futs):
+            f.result()
--- a/taskcluster/taskgraph/create.py
+++ b/taskcluster/taskgraph/create.py
@@ -10,38 +10,27 @@ import requests.adapters
 import json
 import os
 import sys
 import logging
 
 from slugid import nice as slugid
 from taskgraph.util.parameterization import resolve_timestamps
 from taskgraph.util.time import current_json_time
+from taskgraph.util.taskcluster import get_session, CONCURRENCY
 
 logger = logging.getLogger(__name__)
 
-# the maximum number of parallel createTask calls to make
-CONCURRENCY = 50
-
 # this is set to true for `mach taskgraph action-callback --test`
 testing = False
 
 
 def create_tasks(taskgraph, label_to_taskid, params, decision_task_id=None):
     taskid_to_label = {t: l for l, t in label_to_taskid.iteritems()}
 
-    session = requests.Session()
-
-    # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
-    # that limit. Connections are established as needed, so using a large value
-    # should not negatively impact performance.
-    http_adapter = requests.adapters.HTTPAdapter(pool_connections=CONCURRENCY,
-                                                 pool_maxsize=CONCURRENCY)
-    session.mount('https://', http_adapter)
-    session.mount('http://', http_adapter)
 
     decision_task_id = decision_task_id or os.environ.get('TASK_ID')
 
     # when running as an actual decision task, we use the decision task's
     # taskId as the taskGroupId.  The process that created the decision task
     # helpfully placed it in this same taskGroup.  If there is no $TASK_ID,
     # fall back to a slugid
     task_group_id = decision_task_id or slugid()
@@ -61,16 +50,17 @@ def create_tasks(taskgraph, label_to_tas
             if not any(t in taskgraph.tasks for t in task_def.get('dependencies', [])):
                 task_def.setdefault('dependencies', []).append(decision_task_id)
 
         task_def['taskGroupId'] = task_group_id
         task_def['schedulerId'] = scheduler_id
 
     # If `testing` is True, then run without parallelization
     concurrency = CONCURRENCY if not testing else 1
+    session = get_session()
     with futures.ThreadPoolExecutor(concurrency) as e:
         fs = {}
 
         # We can't submit a task until its dependencies have been submitted.
         # So our strategy is to walk the graph and submit tasks once all
         # their dependencies have been submitted.
         tasklist = set(taskgraph.graph.visit_postorder())
         alltasks = tasklist.copy()
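The hunk ends just before the submission loop, whose strategy the comment describes: submit a task only once all of its in-graph dependencies have been submitted, and block on their futures before making the createTask call. A toy sketch of that pattern (the graph, the submit() body, and the pool size are illustrative stand-ins, not the real helpers):

import concurrent.futures as futures

# Illustrative dependency graph: task -> set of in-graph dependencies.
graph = {'a': set(), 'b': {'a'}, 'c': {'a'}, 'd': {'b', 'c'}}

def submit(task_id, dep_futures):
    # Block until every dependency has been created, then create this one.
    for f in dep_futures:
        f.result()
    print('creating', task_id)

fs = {}                           # task_id -> Future, as in create.py
tasklist = set(graph)
with futures.ThreadPoolExecutor(4) as e:
    while tasklist:
        # a task is ready once all of its dependencies have been submitted
        ready = [t for t in tasklist if graph[t] <= set(fs)]
        for t in ready:
            fs[t] = e.submit(submit, t, [fs[d] for d in graph[t]])
            tasklist.remove(t)
    for f in futures.as_completed(fs.values()):
        f.result()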
--- a/taskcluster/taskgraph/util/taskcluster.py
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -22,16 +22,19 @@ logger = logging.getLogger(__name__)
 
 # this is set to true for `mach taskgraph action-callback --test`
 testing = False
 
 # Default rootUrl to use if none is given in the environment; this should point
 # to the production Taskcluster deployment used for CI.
 PRODUCTION_TASKCLUSTER_ROOT_URL = 'https://taskcluster.net'
 
+# the maximum number of parallel Taskcluster API calls to make
+CONCURRENCY = 50
+
 
 @memoize
 def get_root_url():
     """Get the current TASKCLUSTER_ROOT_URL.  When running in a task, this must
     come from $TASKCLUSTER_ROOT_URL; when run on the command line, we apply a
     default that points to the production deployment of Taskcluster."""
     if 'TASKCLUSTER_ROOT_URL' not in os.environ:
         if 'TASK_ID' in os.environ:
@@ -43,20 +46,30 @@ def get_root_url():
         os.environ['TASKCLUSTER_ROOT_URL'],
         ' with taskcluster-proxy' if 'TASKCLUSTER_PROXY_URL' in os.environ else ''))
     return os.environ['TASKCLUSTER_ROOT_URL']
 
 
 @memoize
 def get_session():
     session = requests.Session()
+
     retry = Retry(total=5, backoff_factor=0.1,
                   status_forcelist=[500, 502, 503, 504])
-    session.mount('http://', HTTPAdapter(max_retries=retry))
-    session.mount('https://', HTTPAdapter(max_retries=retry))
+
+    # Default HTTPAdapter uses 10 connections. Mount custom adapter to increase
+    # that limit. Connections are established as needed, so using a large value
+    # should not negatively impact performance.
+    http_adapter = requests.adapters.HTTPAdapter(
+        pool_connections=CONCURRENCY,
+        pool_maxsize=CONCURRENCY,
+        max_retries=retry)
+    session.mount('https://', http_adapter)
+    session.mount('http://', http_adapter)
+
     return session
 
 
 def _do_request(url, force_get=False, **kwargs):
     session = get_session()
     if kwargs and not force_get:
         response = session.post(url, **kwargs)
     else:
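Because get_session() is memoized, _do_request() and the newly converted callers in create.py and cancel_all.py all reuse a single session, so the retry policy and the CONCURRENCY-sized connection pool apply uniformly across every worker thread.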