Bug 1341214 - Add a small API to handle taskcluster queue and index requests. r=dustin
author Mike Hommey <mh+mozilla@glandium.org>
Fri, 17 Feb 2017 12:04:48 +0900
changeset 394078 c9dabe672fd8d591d0216311d6da4e664b89eb47
parent 394077 16391584c12e420d514ff7415bc9616a2faefe48
child 394079 7d817a123f53088c543ae09fafa386c4f4105fc4
push id 1468
push user asasaki@mozilla.com
push date Mon, 05 Jun 2017 19:31:07 +0000
treeherder mozilla-release@0641fc6ee9d1
reviewers dustin
bugs 1341214
milestone 54.0a1
first release with: nightly linux32, nightly linux64, nightly mac, nightly win32, nightly win64
last release without: nightly linux32, nightly linux64, nightly mac, nightly win32, nightly win64
Bug 1341214 - Add a small API to handle taskcluster queue and index requests. r=dustin

Various modules under taskcluster do ad-hoc url formatting or requests to taskcluster services. While we could use the taskcluster client python module, it's kind of overkill for the simple requests done here. So instead of vendoring that module, create a smaller one with the limited set of functions we need.

This changes the behavior of the get_artifact function to return a file-like object when the file is neither json nor yaml, but that branch was never used (and was actually returning an unassigned variable, so it was broken anyway).

At the same time, make the function that does HTTP requests more error-resistant, using urllib3's Retry with a backoff factor.

Also add a function that retrieves the list of artifacts; while currently unused, it will be used by `mach artifact` shortly.
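
For reference, a minimal usage sketch of the new helpers. The function names come from the patch below; the index path, task id, and artifact paths are made-up placeholders, and the artifact entries are assumed to carry a `name` field as the queue service documents:

    from taskgraph.util.taskcluster import (
        find_task_id,
        get_artifact,
        get_artifact_url,
        list_artifacts,
    )

    # Look up the task indexed under a route (placeholder index path).
    task_id = find_task_id('docker.images.v2.level-3.desktop-build.hash.0123abcd')

    # .json/.yml artifact paths come back deserialized; anything else is file-like.
    params = get_artifact(task_id, 'public/parameters.yml')      # dict, from yaml
    graph = get_artifact(task_id, 'public/task-graph.json')      # dict, from json
    log = get_artifact(task_id, 'public/logs/live_backing.log')  # file-like object

    # get_artifact_url only formats the url, it makes no request;
    # use_proxy=True switches to the in-task http://taskcluster/ proxy.
    url = get_artifact_url(task_id, 'public/build/target.tar.bz2')

    names = [artifact['name'] for artifact in list_artifacts(task_id)]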
taskcluster/mach_commands.py
taskcluster/taskgraph/action.py
taskcluster/taskgraph/cron/__init__.py
taskcluster/taskgraph/docker.py
taskcluster/taskgraph/parameters.py
taskcluster/taskgraph/task/base.py
taskcluster/taskgraph/task/docker_image.py
taskcluster/taskgraph/taskgraph.py
taskcluster/taskgraph/transforms/job/mozharness_test.py
taskcluster/taskgraph/transforms/signing.py
taskcluster/taskgraph/util/docker.py
taskcluster/taskgraph/util/taskcluster.py
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -17,18 +17,16 @@ from mach.decorators import (
     CommandArgument,
     CommandProvider,
     Command,
     SubCommand,
 )
 
 from mozbuild.base import MachCommandBase
 
-ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
-
 
 class ShowTaskGraphSubCommand(SubCommand):
     """A SubCommand with TaskGraph-specific arguments"""
 
     def __call__(self, func):
         after = SubCommand.__call__(self, func)
         args = [
             CommandArgument('--root', '-r', default='taskcluster/ci',
--- a/taskcluster/taskgraph/action.py
+++ b/taskcluster/taskgraph/action.py
@@ -1,28 +1,27 @@
 # -*- coding: utf-8 -*-
 
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
-import json
 import logging
 import requests
-import yaml
 
 from .create import create_tasks
 from .decision import write_artifact
 from .optimize import optimize_task_graph
 from .taskgraph import TaskGraph
+from .util.taskcluster import get_artifact
+
 
 logger = logging.getLogger(__name__)
-TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task"
 TREEHERDER_URL = "https://treeherder.mozilla.org/api"
 
 # We set this to 5 for now because this is what SETA sets the
 # count to for every repository/job. If this is ever changed,
 # we'll need to have an API added to Treeherder to let us query
 # how far back we should look.
 MAX_BACKFILL_RESULTSETS = 5
 
@@ -58,25 +57,16 @@ def add_tasks(decision_task_id, task_lab
     # write out the optimized task graph to describe what will actually happen,
     # and the map of labels to taskids
     write_artifact('{}task-graph.json'.format(prefix), optimized_graph.to_json())
     write_artifact('{}label-to-taskid.json'.format(prefix), label_to_taskid)
     # actually create the graph
     create_tasks(optimized_graph, label_to_taskid, decision_params)
 
 
-def get_artifact(task_id, path):
-    resp = requests.get(url="{}/{}/artifacts/{}".format(TASKCLUSTER_QUEUE_URL, task_id, path))
-    if path.endswith('.json'):
-        artifact = json.loads(resp.text)
-    elif path.endswith('.yml'):
-        artifact = yaml.load(resp.text)
-    return artifact
-
-
 def backfill(project, job_id):
     """
     Run the backfill task.  This function implements `mach taskgraph backfill-task`,
     and is responsible for
 
      * Scheduling backfill jobs from a given treeherder resultset backwards until either
      a successful job is found or `N` jobs have been scheduled.
     """
--- a/taskcluster/taskgraph/cron/__init__.py
+++ b/taskcluster/taskgraph/cron/__init__.py
@@ -7,46 +7,38 @@
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import datetime
 import json
 import logging
 import os
 import traceback
-import requests
 import yaml
 
 from . import decision, schema
 from .util import (
     match_utc,
     calculate_head_rev
 )
 from ..create import create_task
 from .. import GECKO
 from taskgraph.util.attributes import match_run_on_projects
 from taskgraph.util.schema import resolve_keyed_by
+from taskgraph.util.taskcluster import get_session
 
 # Functions to handle each `job.type` in `.cron.yml`.  These are called with
 # the contents of the `job` property from `.cron.yml` and should return a
 # sequence of (taskId, task) tuples which will subsequently be fed to
 # createTask.
 JOB_TYPES = {
     'decision-task': decision.run_decision_task,
 }
 
 logger = logging.getLogger(__name__)
-_session = None
-
-
-def get_session():
-    global _session
-    if not _session:
-        _session = requests.Session()
-    return _session
 
 
 def load_jobs(params):
     with open(os.path.join(GECKO, '.cron.yml'), 'rb') as f:
         cron_yml = yaml.load(f)
     schema.validate(cron_yml)
 
     # resolve keyed_by fields in each job
--- a/taskcluster/taskgraph/docker.py
+++ b/taskcluster/taskgraph/docker.py
@@ -7,41 +7,42 @@
 from __future__ import absolute_import, print_function, unicode_literals
 
 import json
 import os
 import sys
 import subprocess
 import tarfile
 import tempfile
-import urllib2
 import which
 from subprocess import Popen, PIPE
 from io import BytesIO
 
 from taskgraph.util import docker
+from taskgraph.util.taskcluster import (
+    find_task_id,
+    get_artifact_url,
+)
 from . import GECKO
 
-INDEX_URL = 'https://index.taskcluster.net/v1/task/' + docker.INDEX_PREFIX + '.{}.{}.hash.{}'
-ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
+DOCKER_INDEX = docker.INDEX_PREFIX + '.{}.{}.hash.{}'
 
 
 def load_image_by_name(image_name, tag=None):
     context_path = os.path.join(GECKO, 'taskcluster', 'docker', image_name)
     context_hash = docker.generate_context_hash(GECKO, context_path, image_name)
 
-    image_index_url = INDEX_URL.format('level-3', image_name, context_hash)
-    print("Fetching", image_index_url)
-    task = json.load(urllib2.urlopen(image_index_url))
+    index_path = DOCKER_INDEX.format('level-3', image_name, context_hash)
+    task_id = find_task_id(index_path)
 
-    return load_image_by_task_id(task['taskId'], tag)
+    return load_image_by_task_id(task_id, tag)
 
 
 def load_image_by_task_id(task_id, tag=None):
-    artifact_url = ARTIFACT_URL.format(task_id, 'public/image.tar.zst')
+    artifact_url = get_artifact_url(task_id, 'public/image.tar.zst')
     result = load_image(artifact_url, tag)
     print("Found docker image: {}:{}".format(result['image'], result['tag']))
     if tag:
         print("Re-tagged as: {}".format(tag))
     else:
         tag = '{}:{}'.format(result['image'], result['tag'])
     print("Try: docker run -ti --rm {} bash".format(tag))
     return True
--- a/taskcluster/taskgraph/parameters.py
+++ b/taskcluster/taskgraph/parameters.py
@@ -57,33 +57,31 @@ class Parameters(ReadOnlyDict):
             raise KeyError("taskgraph parameter {!r} not found".format(k))
 
 
 def load_parameters_file(options):
     """
     Load parameters from the --parameters option
     """
     import urllib
-
-    url_prefix = "https://queue.taskcluster.net/v1/task/"
-    url_postfix = "/artifacts/public/parameters.yml"
+    from taskgraph.util.taskcluster import get_artifact_url
 
     filename = options['parameters']
 
     if not filename:
         return Parameters()
 
     try:
         # reading parameters from a local parameters.yml file
         f = open(filename)
     except IOError:
         # fetching parameters.yml using task-id or supplied url
         if filename.startswith("task-id="):
             task_id = filename.split("=")[1]
-            filename = url_prefix + task_id + url_postfix
+            filename = get_artifact_url(task_id, 'public/parameters.yml')
         f = urllib.urlopen(filename)
 
     if filename.endswith('.yml'):
         return Parameters(**yaml.safe_load(f))
     elif filename.endswith('.json'):
         return Parameters(**json.load(f))
     else:
         raise TypeError("Parameters file `{}` is not JSON or YAML".format(filename))
--- a/taskcluster/taskgraph/task/base.py
+++ b/taskcluster/taskgraph/task/base.py
@@ -1,26 +1,18 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 import abc
-import json
 import os
-import urllib2
-
-
-# if running in a task, prefer to use the taskcluster proxy (http://taskcluster/),
-# otherwise hit the services directly
-if os.environ.get('TASK_ID'):
-    INDEX_URL = 'http://taskcluster/index/v1/task/{}'
-else:
-    INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
+import requests
+from taskgraph.util.taskcluster import find_task_id
 
 
 class Task(object):
     """
     Representation of a task in a TaskGraph.  Each Task has, at creation:
 
     - kind: the name of the task kind
     - label: the label for this task
@@ -104,21 +96,22 @@ class Task(object):
         dependencies on this task will instead depend on that taskId.  It is an
         error to return no taskId for a task on which other tasks depend.
 
         The default optimizes when a taskId can be found for one of the index
         paths attached to the task.
         """
         for index_path in self.index_paths:
             try:
-                url = INDEX_URL.format(index_path)
-                existing_task = json.load(urllib2.urlopen(url))
+                task_id = find_task_id(
+                    index_path,
+                    use_proxy=bool(os.environ.get('TASK_ID')))
 
-                return True, existing_task['taskId']
-            except urllib2.HTTPError:
+                return True, task_id
+            except requests.exceptions.HTTPError:
                 pass
 
         return False, None
 
     @classmethod
     def from_json(cls, task_dict):
         """
         Given a data structure as produced by taskgraph.to_json, re-construct
--- a/taskcluster/taskgraph/task/docker_image.py
+++ b/taskcluster/taskgraph/task/docker_image.py
@@ -10,27 +10,21 @@ import urllib2
 
 from . import base
 from .. import GECKO
 from taskgraph.util.docker import (
     docker_image,
     generate_context_hash,
     INDEX_PREFIX,
 )
+from taskgraph.util.taskcluster import get_artifact_url
 from taskgraph.util.templates import Templates
 
 logger = logging.getLogger(__name__)
 
-# if running in a task, prefer to use the taskcluster proxy (http://taskcluster/),
-# otherwise hit the services directly
-if os.environ.get('TASK_ID'):
-    ARTIFACT_URL = 'http://taskcluster/queue/v1/task/{}/artifacts/{}'
-else:
-    ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
-
 
 class DockerImageTask(base.Task):
 
     @classmethod
     def load_tasks(cls, kind, path, config, params, loaded_tasks):
         parameters = {
             'pushlog_id': params.get('pushlog_id', 0),
             'pushdate': params['moz_build_date'],
@@ -88,18 +82,19 @@ class DockerImageTask(base.Task):
         return []
 
     def optimize(self, params):
         optimized, taskId = super(DockerImageTask, self).optimize(params)
         if optimized and taskId:
             try:
                 # Only return the task ID if the artifact exists for the indexed
                 # task.
-                request = urllib2.Request(
-                    ARTIFACT_URL.format(taskId, 'public/image.tar.zst'))
+                request = urllib2.Request(get_artifact_url(
+                    taskId, 'public/image.tar.zst',
+                    use_proxy=bool(os.environ.get('TASK_ID'))))
                 request.get_method = lambda: 'HEAD'
                 urllib2.urlopen(request)
 
                 # HEAD success on the artifact is enough
                 return True, taskId
             except urllib2.HTTPError:
                 pass
 
--- a/taskcluster/taskgraph/taskgraph.py
+++ b/taskcluster/taskgraph/taskgraph.py
@@ -2,18 +2,16 @@
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from __future__ import absolute_import, print_function, unicode_literals
 
 from .graph import Graph
 from .util.python_path import find_object
 
-TASKCLUSTER_QUEUE_URL = "https://queue.taskcluster.net/v1/task/"
-
 
 class TaskGraph(object):
     """
     Representation of a task graph.
 
     A task graph is a combination of a Graph and a dictionary of tasks indexed
     by label.  TaskGraph instances should be treated as immutable.
     """
--- a/taskcluster/taskgraph/transforms/job/mozharness_test.py
+++ b/taskcluster/taskgraph/transforms/job/mozharness_test.py
@@ -1,27 +1,26 @@
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
 from voluptuous import Schema, Required
+from taskgraph.util.taskcluster import get_artifact_url
 from taskgraph.transforms.job import run_job_using
 from taskgraph.transforms.tests import (
     test_description_schema,
     get_firefox_version,
     normpath
 )
 from taskgraph.transforms.job.common import (
     docker_worker_support_vcs_checkout,
 )
 import os
 import re
 
-ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
-
 ARTIFACTS = [
     # (artifact name prefix, in-image path)
     ("public/logs/", "build/upload/logs/"),
     ("public/test", "artifacts/"),
     ("public/test_info/", "build/blobber_upload_dir/"),
 ]
 
 BUILDER_NAME_PREFIX = {
@@ -54,21 +53,21 @@ def mozharness_test_on_docker(config, jo
 
     artifacts = [
         # (artifact name prefix, in-image path)
         ("public/logs/", "/home/worker/workspace/build/upload/logs/"),
         ("public/test", "/home/worker/artifacts/"),
         ("public/test_info/", "/home/worker/workspace/build/blobber_upload_dir/"),
     ]
 
-    installer_url = ARTIFACT_URL.format('<build>', mozharness['build-artifact-name'])
-    test_packages_url = ARTIFACT_URL.format('<build>',
-                                            'public/build/target.test_packages.json')
-    mozharness_url = ARTIFACT_URL.format('<build>',
-                                         'public/build/mozharness.zip')
+    installer_url = get_artifact_url('<build>', mozharness['build-artifact-name'])
+    test_packages_url = get_artifact_url('<build>',
+                                         'public/build/target.test_packages.json')
+    mozharness_url = get_artifact_url('<build>',
+                                      'public/build/mozharness.zip')
 
     worker['artifacts'] = [{
         'name': prefix,
         'path': os.path.join('/home/worker/workspace', path),
         'type': 'directory',
     } for (prefix, path) in artifacts]
 
     worker['caches'] = [{
@@ -201,21 +200,21 @@ def mozharness_test_on_windows(config, j
             'type': 'directory'
         }
     ]
 
     build_platform = taskdesc['attributes']['build_platform']
 
     target = 'firefox-{}.en-US.{}'.format(get_firefox_version(), build_platform)
 
-    installer_url = ARTIFACT_URL.format(
+    installer_url = get_artifact_url(
         '<build>', 'public/build/{}.zip'.format(target))
-    test_packages_url = ARTIFACT_URL.format(
+    test_packages_url = get_artifact_url(
         '<build>', 'public/build/{}.test_packages.json'.format(target))
-    mozharness_url = ARTIFACT_URL.format(
+    mozharness_url = get_artifact_url(
         '<build>', 'public/build/mozharness.zip')
 
     taskdesc['scopes'].extend(
         ['generic-worker:os-group:{}'.format(group) for group in test['os-groups']])
 
     worker['os-groups'] = test['os-groups']
 
     worker['max-run-time'] = test['max-run-time']
@@ -265,21 +264,21 @@ def mozharness_test_on_windows(config, j
 
 
 @run_job_using('native-engine', 'mozharness-test', schema=mozharness_test_run_schema)
 def mozharness_test_on_native_engine(config, job, taskdesc):
     test = taskdesc['run']['test']
     mozharness = test['mozharness']
     worker = taskdesc['worker']
 
-    installer_url = ARTIFACT_URL.format('<build>', mozharness['build-artifact-name'])
-    test_packages_url = ARTIFACT_URL.format('<build>',
-                                            'public/build/target.test_packages.json')
-    mozharness_url = ARTIFACT_URL.format('<build>',
-                                         'public/build/mozharness.zip')
+    installer_url = get_artifact_url('<build>', mozharness['build-artifact-name'])
+    test_packages_url = get_artifact_url('<build>',
+                                         'public/build/target.test_packages.json')
+    mozharness_url = get_artifact_url('<build>',
+                                      'public/build/mozharness.zip')
 
     worker['artifacts'] = [{
         'name': prefix.rstrip('/'),
         'path': path.rstrip('/'),
         'type': 'directory',
     } for (prefix, path) in ARTIFACTS]
 
     worker['reboot'] = test['reboot']
--- a/taskcluster/taskgraph/transforms/signing.py
+++ b/taskcluster/taskgraph/transforms/signing.py
@@ -9,19 +9,16 @@ from __future__ import absolute_import, 
 
 from taskgraph.transforms.base import TransformSequence
 from taskgraph.util.schema import validate_schema
 from taskgraph.util.scriptworker import get_signing_cert_scope
 from taskgraph.transforms.task import task_description_schema
 from voluptuous import Schema, Any, Required, Optional
 
 
-ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/<{}>/artifacts/{}'
-
-
 # Voluptuous uses marker objects as dictionary *keys*, but they are not
 # comparable, so we cast all of the keys back to regular strings
 task_description_schema = {str(k): v for k, v in task_description_schema.schema.iteritems()}
 
 transforms = TransformSequence()
 
 # shortcut for a string where task references are allowed
 taskref_or_string = Any(
--- a/taskcluster/taskgraph/util/docker.py
+++ b/taskcluster/taskgraph/util/docker.py
@@ -14,17 +14,16 @@ import tempfile
 from mozpack.archive import (
     create_tar_gz_from_files,
 )
 from .. import GECKO
 
 
 IMAGE_DIR = os.path.join(GECKO, 'taskcluster', 'docker')
 INDEX_PREFIX = 'docker.images.v2'
-ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
 
 
 def docker_image(name, by_tag=False):
     '''
         Resolve in-tree prebuilt docker image to ``<registry>/<repository>@sha256:<digest>``,
         or ``<registry>/<repository>:<tag>`` if `by_tag` is `True`.
     '''
     try:
new file mode 100644
--- /dev/null
+++ b/taskcluster/taskgraph/util/taskcluster.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+from __future__ import absolute_import, print_function, unicode_literals
+
+import functools
+import yaml
+import requests
+from mozbuild.util import memoize
+from requests.packages.urllib3.util.retry import Retry
+from requests.adapters import HTTPAdapter
+
+
+@memoize
+def get_session():
+    session = requests.Session()
+    retry = Retry(total=5, backoff_factor=0.1,
+                  status_forcelist=[500, 502, 503, 504])
+    session.mount('http://', HTTPAdapter(max_retries=retry))
+    session.mount('https://', HTTPAdapter(max_retries=retry))
+    return session
+
+
+def _do_request(url):
+    session = get_session()
+    return session.get(url, stream=True)
+
+
+def get_artifact_url(task_id, path, use_proxy=False):
+    if use_proxy:
+        ARTIFACT_URL = 'http://taskcluster/queue/v1/task/{}/artifacts/{}'
+    else:
+        ARTIFACT_URL = 'https://queue.taskcluster.net/v1/task/{}/artifacts/{}'
+    return ARTIFACT_URL.format(task_id, path)
+
+
+def get_artifact(task_id, path, use_proxy=False):
+    """
+    Returns the artifact with the given path for the given task id.
+
+    If the path ends with ".json" or ".yml", the content is deserialized as,
+    respectively, json or yaml, and the corresponding python data (usually
+    dict) is returned.
+    For other types of content, a file-like object is returned.
+    """
+    response = _do_request(get_artifact_url(task_id, path, use_proxy))
+    response.raise_for_status()
+    if path.endswith('.json'):
+        return response.json()
+    if path.endswith('.yml'):
+        return yaml.load(response.text)
+    response.raw.read = functools.partial(response.raw.read,
+                                          decode_content=True)
+    return response.raw
+
+
+def list_artifacts(task_id, use_proxy=False):
+    response = _do_request(get_artifact_url(task_id, '', use_proxy).rstrip('/'))
+    response.raise_for_status()
+    return response.json()['artifacts']
+
+
+def get_index_url(index_path, use_proxy=False):
+    if use_proxy:
+        INDEX_URL = 'http://taskcluster/index/v1/task/{}'
+    else:
+        INDEX_URL = 'https://index.taskcluster.net/v1/task/{}'
+    return INDEX_URL.format(index_path)
+
+
+def find_task_id(index_path, use_proxy=False):
+    response = _do_request(get_index_url(index_path, use_proxy))
+    response.raise_for_status()
+    return response.json()['taskId']
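
For a sense of what the Retry configuration in get_session buys: urllib3 sleeps backoff_factor * (2 ** (retry_number - 1)) between attempts, so backoff_factor=0.1 spaces retries on a flapping 5xx response roughly as sketched below (whether the very first retry sleeps at all varies across urllib3 versions):

    # Approximate inter-attempt delays for Retry(total=5, backoff_factor=0.1),
    # per urllib3's documented backoff formula; exact behavior is version-dependent.
    for n in range(1, 6):
        print('retry {}: sleep ~{:.1f}s'.format(n, 0.1 * (2 ** (n - 1))))
    # retry 1: ~0.1s, retry 2: ~0.2s, ..., retry 5: ~1.6s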