streamclone: add support for cloning non append-only file
authorBoris Feld <boris.feld@octobus.net>
Thu, 18 Jan 2018 00:50:02 +0100
changeset 41651 56c30b31afbe65eded2b31c09be79751727f466a
parent 41650 b6ffd419463943dbc614c89fd9735dba37310254
child 41652 72fdd99eb5265e4bf5c3af961eb8288d676e7946
push id653
push usergszorc@mozilla.com
push dateSun, 21 Jan 2018 00:53:23 +0000
streamclone: add support for cloning non append-only file The phaseroots are stored in a non append-only file in the repository. We include them in the stream too. Since they are not append-only, we have to keep a copy around while we hold the lock to be able to stream them later. Since phases get exchanged within the stream, we can skip requesting them independently. As a side effect, this will fix issue5648 once the feature is enabled by default.
mercurial/exchange.py
mercurial/streamclone.py
tests/test-clone-uncompressed.t
--- a/mercurial/exchange.py
+++ b/mercurial/exchange.py
@@ -1460,32 +1460,33 @@ def _pullbundle2(pullop):
     # declare pull perimeters
     kwargs['common'] = pullop.common
     kwargs['heads'] = pullop.heads or pullop.rheads
 
     if streaming:
         kwargs['cg'] = False
         kwargs['stream'] = True
         pullop.stepsdone.add('changegroup')
+        pullop.stepsdone.add('phases')
 
     else:
         # pulling changegroup
         pullop.stepsdone.add('changegroup')
 
         kwargs['cg'] = pullop.fetch
 
-    legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
-    hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
-    if (not legacyphase and hasbinaryphase):
-        kwargs['phases'] = True
-        pullop.stepsdone.add('phases')
+        legacyphase = 'phases' in ui.configlist('devel', 'legacy.exchange')
+        hasbinaryphase = 'heads' in pullop.remotebundle2caps.get('phases', ())
+        if (not legacyphase and hasbinaryphase):
+            kwargs['phases'] = True
+            pullop.stepsdone.add('phases')
 
-    if 'listkeys' in pullop.remotebundle2caps:
-        if 'phases' not in pullop.stepsdone:
-            kwargs['listkeys'] = ['phases']
+        if 'listkeys' in pullop.remotebundle2caps:
+            if 'phases' not in pullop.stepsdone:
+                kwargs['listkeys'] = ['phases']
 
     bookmarksrequested = False
     legacybookmark = 'bookmarks' in ui.configlist('devel', 'legacy.exchange')
     hasbinarybook = 'bookmarks' in pullop.remotebundle2caps
 
     if pullop.remotebookmarks is not None:
         pullop.stepsdone.add('request-bookmarks')
 
--- a/mercurial/streamclone.py
+++ b/mercurial/streamclone.py
@@ -2,17 +2,20 @@
 #
 # Copyright 2015 Gregory Szorc <gregory.szorc@gmail.com>
 #
 # This software may be used and distributed according to the terms of the
 # GNU General Public License version 2 or any later version.
 
 from __future__ import absolute_import
 
+import contextlib
+import os
 import struct
+import tempfile
 
 from .i18n import _
 from . import (
     branchmap,
     error,
     phases,
     store,
     util,
@@ -423,42 +426,87 @@ class streamcloneapplier(object):
     readers to perform bundle type-specific functionality.
     """
     def __init__(self, fh):
         self._fh = fh
 
     def apply(self, repo):
         return applybundlev1(repo, self._fh)
 
+# type of file to stream
+_fileappend = 0 # append only file
+_filefull = 1   # full snapshot file
+
+# This is its own function so extensions can override it.
+def _walkstreamfullstorefiles(repo):
+    """list snapshot file from the store"""
+    fnames = []
+    if not repo.publishing():
+        fnames.append('phaseroots')
+    return fnames
+
+def _filterfull(entry, copy, vfs):
+    """actually copy the snapshot files"""
+    name, ftype, data = entry
+    if ftype != _filefull:
+        return entry
+    return (name, ftype, copy(vfs.join(name)))
+
+@contextlib.contextmanager
+def maketempcopies():
+    """return a function to temporary copy file"""
+    files = []
+    try:
+        def copy(src):
+            fd, dst = tempfile.mkstemp()
+            os.close(fd)
+            files.append(dst)
+            util.copyfiles(src, dst, hardlink=True)
+            return dst
+        yield copy
+    finally:
+        for tmp in files:
+            util.tryunlink(tmp)
+
 def _emit(repo, entries, totalfilesize):
     """actually emit the stream bundle"""
+    vfs = repo.svfs
     progress = repo.ui.progress
     progress(_('bundle'), 0, total=totalfilesize, unit=_('bytes'))
-    vfs = repo.svfs
-    try:
-        seen = 0
-        for name, size in entries:
-            yield util.uvarintencode(len(name))
-            fp = vfs(name)
-            try:
-                yield util.uvarintencode(size)
-                yield name
-                if size <= 65536:
-                    chunks = (fp.read(size),)
-                else:
-                    chunks = util.filechunkiter(fp, limit=size)
-                for chunk in chunks:
-                    seen += len(chunk)
-                    progress(_('bundle'), seen, total=totalfilesize,
-                             unit=_('bytes'))
-                    yield chunk
-            finally:
-                fp.close()
-    finally:
-        progress(_('bundle'), None)
+    with maketempcopies() as copy:
+        try:
+            # copy is delayed until we are in the try
+            entries = [_filterfull(e, copy, vfs) for e in entries]
+            yield None # this releases the lock on the repository
+            seen = 0
+
+            for name, ftype, data in entries:
+                yield util.uvarintencode(len(name))
+                if ftype == _fileappend:
+                    fp = vfs(name)
+                    size = data
+                elif ftype == _filefull:
+                    fp = open(data, 'rb')
+                    size = util.fstat(fp).st_size
+                try:
+                    yield util.uvarintencode(size)
+                    yield name
+                    if size <= 65536:
+                        chunks = (fp.read(size),)
+                    else:
+                        chunks = util.filechunkiter(fp, limit=size)
+                    for chunk in chunks:
+                        seen += len(chunk)
+                        progress(_('bundle'), seen, total=totalfilesize,
+                                 unit=_('bytes'))
+                        yield chunk
+                finally:
+                    fp.close()
+        finally:
+            progress(_('bundle'), None)
 
 def generatev2(repo):
     """Emit content for version 2 of a streaming clone.
 
     the data stream consists the following entries:
     1) A varint containing the length of the filename
     2) A varint containing the length of file data
     3) N bytes containing the filename (the internal, store-agnostic form)
@@ -470,20 +518,26 @@ def generatev2(repo):
     with repo.lock():
 
         entries = []
         totalfilesize = 0
 
         repo.ui.debug('scanning\n')
         for name, ename, size in _walkstreamfiles(repo):
             if size:
-                entries.append((name, size))
+                entries.append((name, _fileappend, size))
                 totalfilesize += size
+        for name in _walkstreamfullstorefiles(repo):
+            if repo.svfs.exists(name):
+                totalfilesize += repo.svfs.lstat(name).st_size
+                entries.append((name, _filefull, None))
 
         chunks = _emit(repo, entries, totalfilesize)
+        first = next(chunks)
+        assert first is None
 
     return len(entries), totalfilesize, chunks
 
 def consumev2(repo, fp, filecount, filesize):
     """Apply the contents from a version 2 streaming clone.
 
     Data is read from an object that only needs to provide a ``read(size)``
     method.
--- a/tests/test-clone-uncompressed.t
+++ b/tests/test-clone-uncompressed.t
@@ -95,19 +95,17 @@ Clone with background file closing enabl
   bundle2-input-bundle: with-transaction
   bundle2-input-part: "stream" (params: 4 mandatory) supported
   applying stream bundle
   1027 files to transfer, 96.3 KB of data
   starting 4 threads for background file closing
   transferred 96.3 KB in * seconds (* */sec) (glob)
   bundle2-input-part: total payload size 110887
   bundle2-input-part: "listkeys" (params: 1 mandatory) supported
-  bundle2-input-part: "phase-heads" supported
-  bundle2-input-part: total payload size 24
-  bundle2-input-bundle: 2 parts total
+  bundle2-input-bundle: 1 parts total
   checking for updated bookmarks
 #endif
 
 Cannot stream clone when there are secret changesets
 
   $ hg -R server phase --force --secret -r tip
   $ hg clone --stream -U http://localhost:$HGPORT secret-denied
   warning: stream clone requested but server has them disabled
@@ -315,18 +313,18 @@ Clone as non publishing
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
   0: public
   1: public
 #endif
 #if stream-bundle2
   $ hg clone --stream http://localhost:$HGPORT phase-no-publish
   streaming all changes
-  1027 files to transfer, 96.3 KB of data
-  transferred 96.3 KB in * seconds (* */sec) (glob)
+  1028 files to transfer, 96.4 KB of data
+  transferred 96.4 KB in * seconds (* */sec) (glob)
   updating to branch default
   1025 files updated, 0 files merged, 0 files removed, 0 files unresolved
   $ hg -R phase-no-publish phase -r 'all()'
-  0: public
-  1: public
+  0: draft
+  1: draft
 #endif
 
   $ killdaemons.py