streamclone: also stream caches to the client
authorBoris Feld <boris.feld@octobus.net>
Thu, 18 Jan 2018 00:50:12 +0100
changeset 41653 5f5fb279fd394c117fb64a95ad7fd1e4db72951d
parent 41652 72fdd99eb5265e4bf5c3af961eb8288d676e7946
child 41654 60a6ab7bcda7ed0e97c72912be4e029c38f0ef15
push id653
push usergszorc@mozilla.com
push dateSun, 21 Jan 2018 00:53:23 +0000
streamclone: also stream caches to the client When stream clone is used over bundle2, relevant cache files are also streamed. This is expected to be a massive performance win for clone since no important cache will have to be recomputed. Some performance numbers: (All times are wall-clock times in seconds, 2 attempts per case.) # Mozilla-Central ## Clone over ssh over lan V1 streaming: 234.3 239.6 V2 streaming: 248.4 243.7 ## Clone over ssh over Internet V1 streaming: 175.5 110.9 V2 streaming: 109.1 111.0 ## Clone over HTTP over lan V1 streaming: 105.3 105.6 V2 streaming: 112.7 111.4 ## Clone over HTTP over internet V1 streaming: 105.6 114.6 V2 streaming: 226.7 225.9 ## Hg tags V1 streaming (no cache): 1.084 1.071 V2 streaming (cache): 0.312 0.325 ## Hg branches V1 streaming (no cache): 14.047 14.148 V2 streaming (with cache): 0.312 0.333 # Pypy ## Clone over ssh over internet V1 streaming: 29.4 30.1 V2 streaming: 31.2 30.1 ## Clone over http over internet V1 streaming: 29.7 29.7 V2 streaming: 75.2 72.9 (since ssh and lan are not affected, there seems to be an issue with how we read/write the http stream on connection with latency, unrelated to the format) ## Hg tags V1 streaming (no cache): 1.752 1.664 V2 streaming (with cache): 0.274 0.260 ## Hg branches V1 streaming (no cache): 4.469 4.728 V2 streaming (with cache): 0.318 0.321 # Private repository: * 500K revision revisions * 11K topological heads * 28K branch heads ## hg tags no cache: 1543.332 with cache: 4.900 ## hg branches no cache: 91.828 with cache: 2.955
mercurial/streamclone.py
tests/test-clone-uncompressed.t
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -6,20 +6,22 @@
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
 import contextlib
 import os
 import struct
 import tempfile
+import warnings
 
 from .i18n import _
 from . import (
     branchmap,
+    cacheutil,
     error,
     phases,
     store,
     util,
 )
 
 def canperformstreamclone(pullop, bundle2=False):
     """Whether it is possible to perform a streaming clone as part of pull.
@@ -430,30 +432,34 @@ class streamcloneapplier(object):
 
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
 
 # type of file to stream
 _fileappend = 0 # append only file
 _filefull = 1   # full snapshot file
 
+# Source of the file
+_srcstore = 's' # store (svfs)
+_srccache = 'c' # cache (cache)
+
 # This is it's own function so extensions can override it.
 def _walkstreamfullstorefiles(repo):
     """list snapshot file from the store"""
     fnames = []
     if not repo.publishing():
         fnames.append('phaseroots')
     return fnames
 
-def _filterfull(entry, copy, vfs):
+def _filterfull(entry, copy, vfsmap):
     """actually copy the snapshot files"""
-    name, ftype, data = entry
+    src, name, ftype, data = entry
     if ftype != _filefull:
         return entry
-    return (name, ftype, copy(vfs.join(name)))
+    return (src, name, ftype, copy(vfsmap[src].join(name)))
 
 @contextlib.contextmanager
 def maketempcopies():
     """return a function to temporary copy file"""
     files = []
     try:
         def copy(src):
             fd, dst = tempfile.mkstemp()
@@ -461,29 +467,43 @@ def maketempcopies():
             files.append(dst)
             util.copyfiles(src, dst, hardlink=True)
             return dst
         yield copy
     finally:
         for tmp in files:
             util.tryunlink(tmp)
 
+def _makemap(repo):
+    """make a (src -> vfs) map for the repo"""
+    vfsmap = {
+        _srcstore: repo.svfs,
+        _srccache: repo.cachevfs,
+    }
+    # we keep repo.vfs out of the on purpose, ther are too many danger there
+    # (eg: .hg/hgrc)
+    assert repo.vfs not in vfsmap.values()
+
+    return vfsmap
+
 def _emit(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
-    vfs = repo.svfs
+    vfsmap = _makemap(repo)
     progress = repo.ui.progress
     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
     with maketempcopies() as copy:
         try:
             # copy is delayed until we are in the try
-            entries = [_filterfull(e, copy, vfs) for e in entries]
+            entries = [_filterfull(e, copy, vfsmap) for e in entries]
             yield None # this release the lock on the repository
             seen = 0
 
-            for name, ftype, data in entries:
+            for src, name, ftype, data in entries:
+                vfs = vfsmap[src]
+                yield src
                 yield util.uvarintencode(len(name))
                 if ftype == _fileappend:
                     fp = vfs(name)
                     size = data
                 elif ftype == _filefull:
                     fp = open(data, 'rb')
                     size = util.fstat(fp).st_size
                 try:
@@ -502,74 +522,93 @@ def _emit(repo, entries, totalfilesize):
                     fp.close()
         finally:
             progress(_('bundle'), None)
 
 def generatev2(repo):
     """Emit content for version 2 of a streaming clone.
 
     the data stream consists the following entries:
-    1) A varint containing the length of the filename
-    2) A varint containing the length of file data
-    3) N bytes containing the filename (the internal, store-agnostic form)
-    4) N bytes containing the file data
+    1) A char representing the file destination (eg: store or cache)
+    2) A varint containing the length of the filename
+    3) A varint containing the length of file data
+    4) N bytes containing the filename (the internal, store-agnostic form)
+    5) N bytes containing the file data
 
     Returns a 3-tuple of (file count, file size, data iterator).
     """
 
     with repo.lock():
 
         entries = []
         totalfilesize = 0
 
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
-                entries.append((name, _fileappend, size))
+                entries.append((_srcstore, name, _fileappend, size))
                 totalfilesize += size
         for name in _walkstreamfullstorefiles(repo):
             if repo.svfs.exists(name):
                 totalfilesize += repo.svfs.lstat(name).st_size
-                entries.append((name, _filefull, None))
+                entries.append((_srcstore, name, _filefull, None))
+        for name in cacheutil.cachetocopy(repo):
+            if repo.cachevfs.exists(name):
+                totalfilesize += repo.cachevfs.lstat(name).st_size
+                entries.append((_srccache, name, _filefull, None))
 
         chunks = _emit(repo, entries, totalfilesize)
         first = next(chunks)
         assert first is None
 
     return len(entries), totalfilesize, chunks
 
+@contextlib.contextmanager
+def nested(*ctxs):
+    with warnings.catch_warnings():
+        # For some reason, Python decided 'nested' was deprecated without
+        # replacement. They officially advertised for filtering the deprecation
+        # warning for people who actually need the feature.
+        warnings.filterwarnings("ignore",category=DeprecationWarning)
+        with contextlib.nested(*ctxs):
+            yield
+
 def consumev2(repo, fp, filecount, filesize):
     """Apply the contents from a version 2 streaming clone.
 
     Data is read from an object that only needs to provide a ``read(size)``
     method.
     """
     with repo.lock():
         repo.ui.status(_('%d files to transfer, %s of data\n') %
                        (filecount, util.bytecount(filesize)))
 
         start = util.timer()
         handledbytes = 0
         progress = repo.ui.progress
 
         progress(_('clone'), handledbytes, total=filesize, unit=_('bytes'))
 
-        vfs = repo.svfs
+        vfsmap = _makemap(repo)
 
         with repo.transaction('clone'):
-            with vfs.backgroundclosing(repo.ui):
+            ctxs = (vfs.backgroundclosing(repo.ui)
+                    for vfs in vfsmap.values())
+            with nested(*ctxs):
                 for i in range(filecount):
+                    src = fp.read(1)
+                    vfs = vfsmap[src]
                     namelen = util.uvarintdecodestream(fp)
                     datalen = util.uvarintdecodestream(fp)
 
                     name = fp.read(namelen)
 
                     if repo.ui.debugflag:
-                        repo.ui.debug('adding %s (%s)\n' %
-                                      (name, util.bytecount(datalen)))
+                        repo.ui.debug('adding [%s] %s (%s)\n' %
+                                      (src, name, util.bytecount(datalen)))
 
                     with vfs(name, 'w') as ofp:
                         for chunk in util.filechunkiter(fp, limit=datalen):
                             handledbytes += len(chunk)
                             progress(_('clone'), handledbytes, total=filesize,
                                      unit=_('bytes'))
                             ofp.write(chunk)
 
--- a/tests/test-clone-uncompressed.t
+++ b/tests/test-clone-uncompressed.t
@@ -33,35 +33,40 @@ Basic clone
   1027 files to transfer, 96.3 KB of data
   transferred 96.3 KB in * seconds (*/sec) (glob)
   searching for changes
   no changes found
 #endif
 #if stream-bundle2
   $ hg clone --stream -U http://localhost:$HGPORT clone1
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1030 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
+
+  $ ls -1 clone1/.hg/cache
+  branch2-served
+  rbc-names-v1
+  rbc-revs-v1
 #endif
 
 --uncompressed is an alias to --stream
 
 #if stream-legacy
   $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
   streaming all changes
   1027 files to transfer, 96.3 KB of data
   transferred 96.3 KB in * seconds (*/sec) (glob)
   searching for changes
   no changes found
 #endif
 #if stream-bundle2
   $ hg clone --uncompressed -U http://localhost:$HGPORT clone1-uncompressed
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1030 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
 #endif
 
 Clone with background file closing enabled
 
 #if stream-legacy
   $ hg --debug --config worker.backgroundclose=true --config worker.backgroundcloseminfilecount=1 clone --stream -U http://localhost:$HGPORT clone-background | grep -v adding
   using http://localhost:$HGPORT/
   sending capabilities command
@@ -90,20 +95,21 @@ Clone with background file closing enabl
   sending capabilities command
   query 1; heads
   sending batch command
   streaming all changes
   sending getbundle command
   bundle2-input-bundle: with-transaction
   bundle2-input-part: "stream" (params: 4 mandatory) supported
   applying stream bundle
-  1027 files to transfer, 96.3 KB of data
+  1030 files to transfer, 96.4 KB of data
+  starting 4 threads for background file closing
   starting 4 threads for background file closing
-  transferred 96.3 KB in * seconds (* */sec) (glob)
-  bundle2-input-part: total payload size 110887
+  transferred 96.4 KB in * seconds (* */sec) (glob)
+  bundle2-input-part: total payload size 112077
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
   bundle2-input-bundle: 1 parts total
   checking for updated bookmarks
 #endif
 
 Cannot stream clone when there are secret changesets
 
   $ hg -R server phase --force --secret -r tip
@@ -131,18 +137,18 @@ Streaming of secrets can be overridden b
   1027 files to transfer, 96.3 KB of data
   transferred 96.3 KB in * seconds (*/sec) (glob)
   searching for changes
   no changes found
 #endif
 #if stream-bundle2
   $ hg clone --stream -U http://localhost:$HGPORT secret-allowed
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1030 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
 #endif
 
   $ killdaemons.py
 
 Verify interaction between preferuncompressed and secret presence
 
   $ cd server
   $ hg serve --config server.preferuncompressed=true -p $HGPORT -d --pid-file=hg.pid
@@ -248,18 +254,18 @@ clone it
   searching for changes
   no changes found
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
 #endif
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT with-bookmarks
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1033 files to transfer, 96.6 KB of data
+  transferred 96.6 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
 #endif
   $ hg -R with-bookmarks bookmarks
      some-bookmark             1:c17445101a72
 
 Stream repository with phases
 -----------------------------
@@ -278,18 +284,18 @@ Clone as publishing
   searching for changes
   no changes found
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
 #endif
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-publish
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1033 files to transfer, 96.6 KB of data
+  transferred 96.6 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
 #endif
   $ hg -R phase-publish phase -r 'all()'
   0: public
   1: public
 
 Clone as non publishing
@@ -313,18 +319,18 @@ Clone as non publishing
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
   0: public
   1: public
 #endif
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1028 files to transfer, 96.4 KB of data
-  transferred 96.4 KB in * seconds (* */sec) (glob)
+  1034 files to transfer, 96.7 KB of data
+  transferred 96.7 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
   0: draft
   1: draft
 #endif
 
   $ killdaemons.py