Bug 1432390 - Use zstandard and requests modules instead of spawning curl | zstd in docker.load_image. r=dustin
authorMike Hommey <mh+mozilla@glandium.org>
Wed, 24 Jan 2018 11:18:13 +0900
changeset 455913 d7d6780f2b42e0a434803bb2a734c7b8564713a1
parent 455912 c80238a31d8d721df40d036baea84d4fe3990d59
child 455914 5ed2c5587d03a97a4d06df20ec9a3a1805b9c9d0
push id1683
push usersfraser@mozilla.com
push dateThu, 26 Apr 2018 16:43:40 +0000
treeherdermozilla-release@5af6cb21869d [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersdustin
bugs1432390
milestone60.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 1432390 - Use zstandard and requests modules instead of spawning curl | zstd in docker.load_image. r=dustin The zstd command we spawn, if available at all, might be the wrong version: zstd changed its stream format in an incompatible way at some point, and the version shipped in e.g. Ubuntu 16.04 uses the old format, while the version taskcluster relies on uses the new format. Relying on gps's zstandard library allows us to ensure we use the right version. Another advantage is that we can trivially pip install it in a virtualenv if it isn't available on the system running the command. If we're ridding ourselves of the subprocess spawning for zstd, we might as well cover curl as well. Especially considering the error handling when subprocesses are involved is not trivial, such that the current error handling code is actually broken and leads to deadlock conditions, when, for example, curl is still waiting for the python side to read data, but the python side is not reading data anymore because an exception was thrown in the tar reading loop.
taskcluster/mach_commands.py
taskcluster/taskgraph/docker.py
--- a/taskcluster/mach_commands.py
+++ b/taskcluster/mach_commands.py
@@ -370,31 +370,43 @@ class MachCommands(MachCommandBase):
                 for depname, dep in named_links_dict[key].iteritems():
                     if regexprogram.match(dep):
                         filterededges.add((key, dep, depname))
         filtered_taskgraph = TaskGraph(filteredtasks, Graph(set(filteredtasks), filterededges))
         return filtered_taskgraph
 
 
 @CommandProvider
-class TaskClusterImagesProvider(object):
+class TaskClusterImagesProvider(MachCommandBase):
+    def _ensure_zstd(self):
+        try:
+            import zstd
+            # There are two zstd libraries that exist in the wild, ensure we
+            # have the right one.
+            zstd.ZstdCompressor
+            zstd.ZstdDecompressor
+        except (ImportError, AttributeError):
+            self._activate_virtualenv()
+            self.virtualenv_manager.install_pip_package('zstandard==0.8.1')
+
     @Command('taskcluster-load-image', category="ci",
              description="Load a pre-built Docker image")
     @CommandArgument('--task-id',
                      help="Load the image at public/image.tar.zst in this task,"
                           "rather than searching the index")
     @CommandArgument('-t', '--tag',
                      help="tag that the image should be loaded as. If not "
                           "image will be loaded with tag from the tarball",
                      metavar="name:tag")
     @CommandArgument('image_name', nargs='?',
                      help="Load the image of this name based on the current"
                           "contents of the tree (as built for mozilla-central"
                           "or mozilla-inbound)")
     def load_image(self, image_name, task_id, tag):
+        self._ensure_zstd()
         from taskgraph.docker import load_image_by_name, load_image_by_task_id
         if not image_name and not task_id:
             print("Specify either IMAGE-NAME or TASK-ID")
             sys.exit(1)
         try:
             if task_id:
                 ok = load_image_by_task_id(task_id, tag)
             else:
--- a/taskcluster/taskgraph/docker.py
+++ b/taskcluster/taskgraph/docker.py
@@ -14,16 +14,17 @@ import tempfile
 import which
 from subprocess import Popen, PIPE
 from io import BytesIO
 
 from taskgraph.util import docker
 from taskgraph.util.taskcluster import (
     find_task_id,
     get_artifact_url,
+    get_session,
 )
 from taskgraph.util.cached_tasks import cached_index_path
 from . import GECKO
 
 
 def load_image_by_name(image_name, tag=None):
     context_path = os.path.join(GECKO, 'taskcluster', 'docker', image_name)
     context_hash = docker.generate_context_hash(GECKO, context_path, image_name)
@@ -108,39 +109,70 @@ def build_image(name, args=None):
         print('*' * 50)
         print('WARNING: no VERSION file found in image directory.')
         print('Image is not suitable for deploying/pushing.')
         print('Create an image suitable for deploying/pushing by creating')
         print('a VERSION file in the image directory.')
         print('*' * 50)
 
 
+# The zstandard library doesn't expose a file-like interface for its
+# decompressor, but an iterator. Support for a file-like interface is due in
+# the next release. In the meantime, we use this proxy class to turn the
+# iterator into a file-like object.
+class IteratorReader(object):
+    def __init__(self, iterator):
+        self._iterator = iterator
+        self._buf = b''
+
+    def read(self, size):
+        result = b''
+        while len(result) < size:
+            wanted = min(size - len(result), len(self._buf))
+            if not self._buf:
+                try:
+                    self._buf = memoryview(next(self._iterator))
+                except StopIteration:
+                    break
+            result += self._buf[:wanted].tobytes()
+            self._buf = self._buf[wanted:]
+        return result
+
+
 def load_image(url, imageName=None, imageTag=None):
     """
     Load docker image from URL as imageName:tag, if no imageName or tag is given
     it will use whatever is inside the zstd compressed tarball.
 
     Returns an object with properties 'image', 'tag' and 'layer'.
     """
+    import zstd
+
     # If imageName is given and we don't have an imageTag
     # we parse out the imageTag from imageName, or default it to 'latest'
     # if no imageName and no imageTag is given, 'repositories' won't be rewritten
     if imageName and not imageTag:
         if ':' in imageName:
             imageName, imageTag = imageName.split(':', 1)
         else:
             imageTag = 'latest'
 
-    curl, zstd, docker = None, None, None
+    docker = None
     image, tag, layer = None, None, None
     try:
-        # Setup piping: curl | zstd | tarin
-        curl = Popen(['curl', '-#', '--fail', '-L', '--retry', '8', url], stdout=PIPE)
-        zstd = Popen(['zstd', '-d'], stdin=curl.stdout, stdout=PIPE)
-        tarin = tarfile.open(mode='r|', fileobj=zstd.stdout)
+        print("Downloading from {}".format(url))
+        # get_session() gets us a requests.Session set to retry several times.
+        req = get_session().get(url, stream=True)
+        req.raise_for_status()
+        decompressed_reader = IteratorReader(zstd.ZstdDecompressor().read_from(req.raw))
+        tarin = tarfile.open(
+            mode='r|',
+            fileobj=decompressed_reader,
+            bufsize=zstd.DECOMPRESSION_RECOMMENDED_OUTPUT_SIZE)
+
         # Seutp piping: tarout | docker
         docker = Popen(['docker', 'load'], stdin=PIPE)
         tarout = tarfile.open(mode='w|', fileobj=docker.stdin, format=tarfile.GNU_FORMAT)
 
         # Read from tarin and write to tarout
         for member in tarin:
             # Write non-file members directly (don't use extractfile on links)
             if not member.isfile():
@@ -170,30 +202,16 @@ def load_image(url, imageName=None, imag
                 reader = BytesIO(data)
                 member.size = len(data)
 
             # Add member and reader
             tarout.addfile(member, reader)
             reader.close()
         tarout.close()
     finally:
-        def trykill(proc):
-            try:
-                proc.kill()
-            except:
-                pass
-
-        # Check that all subprocesses finished correctly
-        if curl and curl.wait() != 0:
-            trykill(zstd)
-            trykill(docker)
-            raise Exception('failed to download from url: {}'.format(url))
-        if zstd and zstd.wait() != 0:
-            trykill(docker)
-            raise Exception('zstd decompression failed')
         if docker:
             docker.stdin.close()
         if docker and docker.wait() != 0:
             raise Exception('loading into docker failed')
 
     # Check that we found a repositories file
     if not image or not tag or not layer:
         raise Exception('No repositories file found!')