Bug 795360 - Make dump_syms from symbolstore.py run in parallel on multi-core machines. r=ted
author Benedict Singer <singerb.dev@gmail.com>
date Thu, 27 Dec 2012 15:32:45 -0800
changeset 126989 26954234c778fd7f6e4d138dbda318afa90aa7e8
parent 126988 aef27223779a07170808b7d0c06b659ca541e391
child 126990 49ddd8c0cb44720bf77ef93c45f14ca6cdf763dd
push id 2151
push user lsblakk@mozilla.com
push date Tue, 19 Feb 2013 18:06:57 +0000
treeherder mozilla-beta@4952e88741ec
reviewers ted
bugs 795360
milestone 20.0a1
toolkit/crashreporter/tools/symbolstore.py
toolkit/crashreporter/tools/unit-symbolstore.py
--- a/toolkit/crashreporter/tools/symbolstore.py
+++ b/toolkit/crashreporter/tools/symbolstore.py
@@ -24,16 +24,18 @@ import sys
 import platform
 import os
 import re
 import shutil
 import textwrap
 import fnmatch
 import subprocess
 import urlparse
+import multiprocessing
+import collections
 from optparse import OptionParser
 from xml.dom.minidom import parse
 
 # Utility classes
 
 class VCSFileInfo:
     """ A base class for version-controlled file information. Ensures that the
         following attributes are generated only once (successfully):
@@ -298,31 +300,48 @@ def SourceIndex(fileStream, outputPath, 
     pdbStreamFile.write('''SRCSRV: ini ------------------------------------------------\r\nVERSION=2\r\nINDEXVERSION=2\r\nVERCTRL=http\r\nSRCSRV: variables ------------------------------------------\r\nHGSERVER=''')
     pdbStreamFile.write(vcs_root)
     pdbStreamFile.write('''\r\nSRCSRVVERCTRL=http\r\nHTTP_EXTRACT_TARGET=%hgserver%/raw-file/%var3%/%var2%\r\nSRCSRVTRG=%http_extract_target%\r\nSRCSRV: source files ---------------------------------------\r\n''')
     pdbStreamFile.write(fileStream) # can't do string interpolation because the source server also uses this and so there are % in the above
     pdbStreamFile.write("SRCSRV: end ------------------------------------------------\r\n\n")
     pdbStreamFile.close()
     return result
 
+def WorkerInitializer(cls, lock):
+    """Windows worker processes won't have run GlobalInit, and due to a lack of fork(),
+    won't inherit the class variables from the parent. The only one they need is the lock,
+    so we run an initializer to set it. Redundant but harmless on other platforms."""
+    cls.lock = lock
+
+def StartProcessFilesWork(dumper, files, arch_num, arch, vcs_root, after, after_arg):
+    """multiprocessing can't handle methods as Process targets, so we define
+    a simple wrapper function around the work method."""
+    return dumper.ProcessFilesWork(files, arch_num, arch, vcs_root, after, after_arg)
+
 class Dumper:
     """This class can dump symbols from a file with debug info, and
     store the output in a directory structure that is valid for use as
     a Breakpad symbol server.  Requires a path to a dump_syms binary--
     |dump_syms| and a directory to store symbols in--|symbol_path|.
     Optionally takes a list of processor architectures to process from
     each debug file--|archs|, the full path to the top source
     directory--|srcdir|, for generating relative source file names,
     and an option to copy debug info files alongside the dumped
     symbol files--|copy_debug|, mostly useful for creating a
     Microsoft Symbol Server from the resulting output.
 
     You don't want to use this directly if you intend to call
     ProcessDir.  Instead, call GetPlatformSpecificDumper to
-    get an instance of a subclass."""
+    get an instance of a subclass.
+
+    Processing is performed asynchronously via worker processes; in
+    order to wait for processing to finish and clean up correctly, you
+    must call Finish after all Process/ProcessDir calls have been made.
+    You must also call Dumper.GlobalInit before creating or using any
+    instances."""
     def __init__(self, dump_syms, symbol_path,
                  archs=None,
                  srcdirs=[],
                  copy_debug=False,
                  vcsinfo=False,
                  srcsrv=False,
                  exclude=[],
                  repo_manifest=None):
@@ -337,16 +356,69 @@ class Dumper:
         self.srcdirs = [os.path.normpath(a) for a in srcdirs]
         self.copy_debug = copy_debug
         self.vcsinfo = vcsinfo
         self.srcsrv = srcsrv
         self.exclude = exclude[:]
         if repo_manifest:
             self.parse_repo_manifest(repo_manifest)
 
+        # bookkeeping to track our in-flight jobs and the per-file-tuple cleanup work
+        self.files_record = {}
+        self.jobs_record = collections.defaultdict(int)
+
+    @classmethod
+    def GlobalInit(cls, module=multiprocessing):
+        """Initialize the class globals for the multiprocessing setup; must
+        be called before any Dumper instances are created and used. Test cases
+        may pass in a different module to supply Manager and Pool objects,
+        usually multiprocessing.dummy."""
+        try:
+            num_cpus = module.cpu_count()
+        except NotImplementedError:
+            # assume a dual core machine if we can't find out for some reason;
+            # probably better on single core anyway due to I/O constraints
+            num_cpus = 2
+
+        # have to create any locks etc before the pool
+        cls.manager = module.Manager()
+        cls.jobs_condition = cls.manager.Condition()
+        cls.lock = cls.manager.RLock()
+        cls.pool = module.Pool(num_cpus, WorkerInitializer, (cls, cls.lock))
+
+    def JobStarted(self, file_key):
+        """Increments the number of submitted jobs for the specified key file,
+        defined as the original file we processed; note that a single key file
+        can generate up to 1 + len(self.archs) jobs in the Mac case."""
+        with Dumper.jobs_condition:
+            self.jobs_record[file_key] += 1
+            Dumper.jobs_condition.notify_all()
+
+    def JobFinished(self, file_key):
+        """Decrements the number of submitted jobs for the specified key file,
+        defined as the original file we processed; once the count is back to 0,
+        remove the entry from our record."""
+        with Dumper.jobs_condition:
+            self.jobs_record[file_key] -= 1
+
+            if self.jobs_record[file_key] == 0:
+                del self.jobs_record[file_key]
+
+            Dumper.jobs_condition.notify_all()
+
+    def output(self, dest, output_str):
+        """Writes |output_str| to |dest|, holding |lock|;
+        terminates with a newline."""
+        with Dumper.lock:
+            dest.write(output_str + "\n")
+            dest.flush()
+
+    def output_pid(self, dest, output_str):
+        """Debugging output; prepends the pid to the string."""
+        self.output(dest, "%d: %s" % (os.getpid(), output_str))
+
     def parse_repo_manifest(self, repo_manifest):
         """
         Parse an XML manifest of repository info as produced
         by the `repo manifest -r` command.
         """
         doc = parse(repo_manifest)
         if doc.firstChild.tagName != "manifest":
             return
@@ -412,51 +484,89 @@ class Dumper:
     # This is a no-op except on Win32
     def SourceServerIndexing(self, debug_file, guid, sourceFileStream, vcs_root):
         return ""
 
     # subclasses override this if they want to support this
     def CopyDebug(self, file, debug_file, guid):
         pass
 
+    def Finish(self, stop_pool=True):
+        """Wait for the expected number of jobs to be submitted, and then
+        wait for the pool to finish processing them. By default, will close
+        and clear the pool, but for testcases that need multiple runs, pass
+        stop_pool = False."""
+        with Dumper.jobs_condition:
+            while len(self.jobs_record) != 0:
+                Dumper.jobs_condition.wait()
+        if stop_pool:
+            Dumper.pool.close()
+            Dumper.pool.join()
+
     def Process(self, file_or_dir):
-        "Process a file or all the (valid) files in a directory."
+        """Process a file or all the (valid) files in a directory; processing is performed
+        asynchronously, and Finish must be called to wait for it to complete and clean up."""
         if os.path.isdir(file_or_dir) and not self.ShouldSkipDir(file_or_dir):
-            return self.ProcessDir(file_or_dir)
+            self.ProcessDir(file_or_dir)
         elif os.path.isfile(file_or_dir):
-            return self.ProcessFile(file_or_dir)
-        # maybe it doesn't exist?
-        return False
+            self.ProcessFiles((file_or_dir,))
 
     def ProcessDir(self, dir):
         """Process all the valid files in this directory.  Valid files
-        are determined by calling ShouldProcess."""
-        result = True
+        are determined by calling ShouldProcess; processing is performed
+        asynchronously, and Finish must be called to wait for it to complete and clean up."""
         for root, dirs, files in os.walk(dir):
             for d in dirs[:]:
                 if self.ShouldSkipDir(d):
                     dirs.remove(d)
             for f in files:
                 fullpath = os.path.join(root, f)
                 if self.ShouldProcess(fullpath):
-                    if not self.ProcessFile(fullpath):
-                        result = False
-        return result
+                    self.ProcessFiles((fullpath,))
+
+    def SubmitJob(self, file_key, func, args, callback):
+        """Submits a job to the pool of workers; increments the number of submitted jobs."""
+        self.JobStarted(file_key)
+        res = Dumper.pool.apply_async(func, args=args, callback=callback)
 
-    def ProcessFile(self, file):
-        """Dump symbols from this file into a symbol file, stored
-        in the proper directory structure in  |symbol_path|."""
-        print >> sys.stderr, "Processing file: %s" % file
-        sys.stderr.flush()
-        result = False
-        sourceFileStream = ''
+    def ProcessFilesFinished(self, res):
+        """Callback from multiprocesing when ProcessFilesWork finishes;
+        run the cleanup work, if any"""
+        self.JobFinished(res['files'][-1])
+        # only run the cleanup function once per tuple of files
+        self.files_record[res['files']] += 1
+        if self.files_record[res['files']] == len(self.archs):
+            del self.files_record[res['files']]
+            if res['after']:
+                res['after'](res['status'], res['after_arg'])
+
+    def ProcessFiles(self, files, after=None, after_arg=None):
+        """Dump symbols from these files into a symbol file, stored
+        in the proper directory structure in |symbol_path|; processing is performed
+        asynchronously, and Finish must be called to wait for it to complete and clean up.
+        All files after the first are fallbacks in case the first file does not process
+        successfully; if it does, no other files will be touched."""
+        self.output_pid(sys.stderr, "Submitting jobs for files: %s" % str(files))
+
         # tries to get the vcs root from the .mozconfig first - if it's not set
         # the tinderbox vcs path will be assigned further down
         vcs_root = os.environ.get("SRCSRV_ROOT")
+        # record that we submitted jobs for this tuple of files; do this once,
+        # before the first callback can fire and increment the count
+        self.files_record[files] = 0
         for arch_num, arch in enumerate(self.archs):
+            self.SubmitJob(files[-1], StartProcessFilesWork,
+                           args=(self, files, arch_num, arch, vcs_root, after, after_arg),
+                           callback=self.ProcessFilesFinished)
+
+    def ProcessFilesWork(self, files, arch_num, arch, vcs_root, after, after_arg):
+        self.output_pid(sys.stderr, "Worker processing files: %s" % (files,))
+
+        # our result is a status, a cleanup function, an argument to that function, and the tuple of files we were called on
+        result = { 'status' : False, 'after' : after, 'after_arg' : after_arg, 'files' : files }
+
+        sourceFileStream = ''
+        for file in files:
+            # files is a tuple of files, containing fallbacks in case the first file doesn't process successfully
             try:
                 proc = subprocess.Popen([self.dump_syms] + arch.split() + [file],
                                         stdout=subprocess.PIPE)
                 module_line = proc.stdout.next()
                 if module_line.startswith("MODULE"):
                     # MODULE os cpu guid debug_file
                     (guid, debug_file) = (module_line.split())[3:5]
                     # strip off .pdb extensions, and append .sym
@@ -496,33 +606,36 @@ class Dumper:
                             if filename.startswith("hg"):
                                 (ver, checkout, source_file, revision) = filename.split(":", 3)
                                 sourceFileStream += sourcepath + "*" + source_file + '*' + revision + "\r\n"
                             f.write("FILE %s %s\n" % (index, filename))
                         else:
                             # pass through all other lines unchanged
                             f.write(line)
                             # we want to return true only if at least one line is not a MODULE or FILE line
-                            result = True
+                            result['status'] = True
                     f.close()
                     proc.wait()
                     # we output relative paths so callers can get a list of what
                     # was generated
-                    print rel_path
+                    self.output(sys.stdout, rel_path)
                     if self.srcsrv and vcs_root:
                         # add source server indexing to the pdb file
                         self.SourceServerIndexing(file, guid, sourceFileStream, vcs_root)
                     # only copy debug the first time if we have multiple architectures
                     if self.copy_debug and arch_num == 0:
                         self.CopyDebug(file, debug_file, guid)
             except StopIteration:
                 pass
-            except:
-                print >> sys.stderr, "Unexpected error: ", sys.exc_info()[0]
+            except Exception as e:
+                self.output(sys.stderr, "Unexpected error: %s" % (str(e),))
                 raise
+            if result['status']:
+                # we only need 1 file to work
+                break
         return result
 
 # Platform-specific subclasses.  For the most part, these just have
 # logic to determine what files to extract symbols from.
 
 class Dumper_Win32(Dumper):
     fixedFilenameCaseCache = {}
 
@@ -571,19 +684,19 @@ class Dumper_Win32(Dumper):
         compressed_file = os.path.splitext(full_path)[0] + ".pd_"
         # ignore makecab's output
         success = subprocess.call(["makecab.exe", "/D", "CompressionType=LZX", "/D",
                                    "CompressionMemory=21",
                                    full_path, compressed_file],
                                   stdout=open("NUL:","w"), stderr=subprocess.STDOUT)
         if success == 0 and os.path.exists(compressed_file):
             os.unlink(full_path)
-            print os.path.splitext(rel_path)[0] + ".pd_"
+            self.output(sys.stdout, os.path.splitext(rel_path)[0] + ".pd_")
         else:
-            print rel_path
+            self.output(sys.stdout, rel_path)
         
     def SourceServerIndexing(self, debug_file, guid, sourceFileStream, vcs_root):
         # Creates a .pdb.stream file in the mozilla\objdir to be used for source indexing
         debug_file = os.path.abspath(debug_file)
         streamFilename = debug_file + ".stream"
         stream_output_path = os.path.abspath(streamFilename)
         # Call SourceIndex to create the .stream file
         result = SourceIndex(sourceFileStream, stream_output_path, vcs_root)
@@ -620,17 +733,17 @@ class Dumper_Linux(Dumper):
             rel_path = os.path.join(debug_file,
                                     guid,
                                     debug_file + ".dbg")
             full_path = os.path.normpath(os.path.join(self.symbol_path,
                                                       rel_path))
             shutil.move(file_dbg, full_path)
             # gzip the shipped debug files
             os.system("gzip %s" % full_path)
-            print rel_path + ".gz"
+            self.output(sys.stdout, rel_path + ".gz")
         else:
             if os.path.isfile(file_dbg):
                 os.unlink(file_dbg)
 
 class Dumper_Solaris(Dumper):
     def RunFileCommand(self, file):
         """Utility function, returns the output of file(1)"""
         try:
@@ -645,16 +758,26 @@ class Dumper_Solaris(Dumper):
         file(1) reports as being ELF files.  It expects to find the file
         command in PATH."""
         if not Dumper.ShouldProcess(self, file):
             return False
         if file.endswith(".so") or os.access(file, os.X_OK):
             return self.RunFileCommand(file).startswith("ELF")
         return False
 
+def StartProcessFilesWorkMac(dumper, file):
+    """multiprocessing can't handle methods as Process targets, so we define
+    a simple wrapper function around the work method."""
+    return dumper.ProcessFilesWorkMac(file)
+
+def AfterMac(status, dsymbundle):
+    """Cleanup function to run on Macs after we process the file(s)."""
+    # CopyDebug will already have been run from Dumper.ProcessFiles
+    shutil.rmtree(dsymbundle)
+
 class Dumper_Mac(Dumper):
     def ShouldProcess(self, file):
         """This function will allow processing of files that are
         executable, or end with the .dylib extension, and additionally
         file(1) reports as being Mach-O files.  It expects to find the file
         command in PATH."""
         if not Dumper.ShouldProcess(self, file):
             return False
@@ -666,57 +789,70 @@ class Dumper_Mac(Dumper):
         """We create .dSYM bundles on the fly, but if someone runs
         buildsymbols twice, we should skip any bundles we created
         previously, otherwise we'll recurse into them and try to 
         dump the inner bits again."""
         if dir.endswith(".dSYM"):
             return True
         return False
 
-    def ProcessFile(self, file):
+    def ProcessFiles(self, files, after=None, after_arg=None):
+        # note that files always has length 1 here, since we're the only ones
+        # that ever add more than one file to the list
+        self.output_pid(sys.stderr, "Submitting job for Mac pre-processing on file: %s" % (files[0]))
+        self.SubmitJob(files[0], StartProcessFilesWorkMac, args=(self, files[0]), callback=self.ProcessFilesMacFinished)
+
+    def ProcessFilesMacFinished(self, result):
+        if result['status']:
+            # kick off new jobs per-arch with our new list of files
+            Dumper.ProcessFiles(self, result['files'], after=AfterMac, after_arg=result['files'][0])
+        # only decrement jobs *after* that, since otherwise we'll remove the record for this file
+        self.JobFinished(result['file_key'])
+
+    def ProcessFilesWorkMac(self, file):
         """dump_syms on Mac needs to be run on a dSYM bundle produced
         by dsymutil(1), so run dsymutil here and pass the bundle name
         down to the superclass method instead."""
+        self.output_pid(sys.stderr, "Worker running Mac pre-processing on file: %s" % (file,))
+
+        # our return is a status and a tuple of files to dump symbols for
+        # the extra files are fallbacks; as soon as one is dumped successfully, we stop
+        result = { 'status' : False, 'files' : None, 'file_key' : file }
         dsymbundle = file + ".dSYM"
         if os.path.exists(dsymbundle):
             shutil.rmtree(dsymbundle)
         # dsymutil takes --arch=foo instead of -a foo like everything else
         subprocess.call(["dsymutil"] + [a.replace('-a ', '--arch=') for a in self.archs if a]
                         + [file],
                         stdout=open("/dev/null","w"))
         if not os.path.exists(dsymbundle):
             # dsymutil won't produce a .dSYM for files without symbols
-            return False
-        res = Dumper.ProcessFile(self, dsymbundle)
-        # CopyDebug will already have been run from Dumper.ProcessFile
-        shutil.rmtree(dsymbundle)
+            result['status'] = False
+            return result
 
-        # fallback for DWARF-less binaries
-        if not res:
-            print >> sys.stderr, "Couldn't read DWARF symbols in: %s" % dsymbundle
-            res = Dumper.ProcessFile(self, file)
-
-        return res
+        result['status'] = True
+        result['files'] = (dsymbundle, file)
+        return result
 
     def CopyDebug(self, file, debug_file, guid):
-        """ProcessFile has already produced a dSYM bundle, so we should just
+        """ProcessFiles has already produced a dSYM bundle, so we should just
         copy that to the destination directory. However, we'll package it
         into a .tar.bz2 because the debug symbols are pretty huge, and
         also because it's a bundle, so it's a directory. |file| here is the
         dSYM bundle, and |debug_file| is the original filename."""
         rel_path = os.path.join(debug_file,
                                 guid,
                                 os.path.basename(file) + ".tar.bz2")
         full_path = os.path.abspath(os.path.join(self.symbol_path,
                                                   rel_path))
         success = subprocess.call(["tar", "cjf", full_path, os.path.basename(file)],
                                   cwd=os.path.dirname(file),
                                   stdout=open("/dev/null","w"), stderr=subprocess.STDOUT)
         if success == 0 and os.path.exists(full_path):
-            print rel_path
+            self.output(sys.stdout, rel_path)
 
 # Entry point if called as a standalone program
 def main():
     parser = OptionParser(usage="usage: %prog [options] <dump_syms binary> <symbol store path> <debug info files>")
     parser.add_option("-c", "--copy",
                       action="store_true", dest="copy_debug", default=False,
                       help="Copy debug info files into the same directory structure as symbol files")
     parser.add_option("-a", "--archs",
@@ -758,12 +894,17 @@ produced by the `repo manifest -r` comma
                                        archs=options.archs,
                                        srcdirs=options.srcdir,
                                        vcsinfo=options.vcsinfo,
                                        srcsrv=options.srcsrv,
                                        exclude=options.exclude,
                                        repo_manifest=options.repo_manifest)
     for arg in args[2:]:
         dumper.Process(arg)
+    dumper.Finish()
 
 # run main if run directly
 if __name__ == "__main__":
+    # set up the multiprocessing infrastructure before we start;
+    # note that this needs to be in the __main__ guard, or else Windows will choke
+    Dumper.GlobalInit()
+
     main()
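
For reference, the asynchronous API this patch introduces is meant to be driven the way main() above does it: GlobalInit must run exactly once inside the __main__ guard, Process/ProcessDir only queue jobs on the pool, and Finish blocks until every queued job's callback has run. A minimal sketch of that call pattern follows; the dump_syms path, symbol directory, and input path are hypothetical placeholders, not values from the patch.

    import symbolstore
    from symbolstore import Dumper

    if __name__ == "__main__":
        # must run inside the __main__ guard; on Windows the worker
        # processes re-import this module, and re-running GlobalInit
        # there would try to create pools recursively
        Dumper.GlobalInit()

        dumper = symbolstore.GetPlatformSpecificDumper(
            dump_syms="dump_syms",        # hypothetical path to the dump_syms binary
            symbol_path="/tmp/symbols")   # hypothetical output directory
        dumper.Process("dist/bin")        # only submits jobs to the worker pool
        dumper.Finish()                   # waits for all jobs, then closes and joins the pool
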
--- a/toolkit/crashreporter/tools/unit-symbolstore.py
+++ b/toolkit/crashreporter/tools/unit-symbolstore.py
@@ -1,14 +1,14 @@
 #!/usr/bin/env python
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
-import os, tempfile, unittest, shutil, struct, platform, subprocess
+import os, tempfile, unittest, shutil, struct, platform, subprocess, multiprocessing.dummy
 import mock
 from mock import patch
 import symbolstore
 
 # Some simple functions to mock out files that the platform-specific dumpers will accept.
 # dump_syms itself will not be run (we mock that call out), but we can't override
 # the ShouldProcessFile method since we actually want to test that.
 def write_elf(filename):
@@ -61,44 +61,48 @@ class HelperMixin(object):
             writer(f)
 
 class TestExclude(HelperMixin, unittest.TestCase):
     def test_exclude_wildcard(self):
         """
         Test that using an exclude list with a wildcard pattern works.
         """
         processed = []
-        def mock_process_file(filename):
-            processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
+        def mock_process_file(filenames):
+            for filename in filenames:
+                processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
             return True
         self.add_test_files(add_extension(["foo", "bar", "abc/xyz", "abc/fooxyz", "def/asdf", "def/xyzfoo"]))
         d = symbolstore.GetPlatformSpecificDumper(dump_syms="dump_syms",
                                                   symbol_path="symbol_path",
                                                   exclude=["*foo*"])
-        d.ProcessFile = mock_process_file
-        self.assertTrue(d.Process(self.test_dir))
+        d.ProcessFiles = mock_process_file
+        d.Process(self.test_dir)
+        d.Finish(stop_pool=False)
         processed.sort()
         expected = add_extension(["bar", "abc/xyz", "def/asdf"])
         expected.sort()
         self.assertEqual(processed, expected)
 
     def test_exclude_filenames(self):
         """
         Test that excluding a filename without a wildcard works.
         """
         processed = []
-        def mock_process_file(filename):
-            processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
+        def mock_process_file(filenames):
+            for filename in filenames:
+                processed.append((filename[len(self.test_dir):] if filename.startswith(self.test_dir) else filename).replace('\\', '/'))
             return True
         self.add_test_files(add_extension(["foo", "bar", "abc/foo", "abc/bar", "def/foo", "def/bar"]))
         d = symbolstore.GetPlatformSpecificDumper(dump_syms="dump_syms",
                                                   symbol_path="symbol_path",
                                                   exclude=add_extension(["foo"]))
-        d.ProcessFile = mock_process_file
-        self.assertTrue(d.Process(self.test_dir))
+        d.ProcessFiles = mock_process_file
+        d.Process(self.test_dir)
+        d.Finish(stop_pool=False)
         processed.sort()
         expected = add_extension(["bar", "abc/bar", "def/bar"])
         expected.sort()
         self.assertEqual(processed, expected)
 
 def popen_factory(stdouts):
     """
     Generate a class that can mock subprocess.Popen. |stdouts| is an iterable that
@@ -124,23 +128,29 @@ class TestCopyDebugUniversal(HelperMixin
     def setUp(self):
         HelperMixin.setUp(self)
         self.symbol_dir = tempfile.mkdtemp()
         self._subprocess_call = subprocess.call
         subprocess.call = self.mock_call
         self._subprocess_popen = subprocess.Popen
         subprocess.Popen = popen_factory(self.next_mock_stdout())
         self.stdouts = []
+        self._shutil_rmtree = shutil.rmtree
+        shutil.rmtree = self.mock_rmtree
 
     def tearDown(self):
         HelperMixin.tearDown(self)
+        shutil.rmtree = self._shutil_rmtree
         shutil.rmtree(self.symbol_dir)
         subprocess.call = self._subprocess_call
         subprocess.Popen = self._subprocess_popen
 
+    def mock_rmtree(self, path):
+        pass
+
     def mock_call(self, args, **kwargs):
         if args[0].endswith("dsymutil"):
             filename = args[-1]
             os.makedirs(filename + ".dSYM")
         return 0
 
     def next_mock_stdout(self):
         if not self.stdouts:
@@ -159,17 +169,18 @@ class TestCopyDebugUniversal(HelperMixin
         self.add_test_files(add_extension(["foo"]))
         self.stdouts.append(mock_dump_syms("X" * 33, add_extension(["foo"])[0]))
         self.stdouts.append(mock_dump_syms("Y" * 33, add_extension(["foo"])[0]))
         d = symbolstore.GetPlatformSpecificDumper(dump_syms="dump_syms",
                                                   symbol_path=self.symbol_dir,
                                                   copy_debug=True,
                                                   archs="abc xyz")
         d.CopyDebug = mock_copy_debug
-        self.assertTrue(d.Process(self.test_dir))
+        d.Process(self.test_dir)
+        d.Finish(stop_pool=False)
         self.assertEqual(1, len(copied))
 
 class TestGetVCSFilename(HelperMixin, unittest.TestCase):
     def setUp(self):
         HelperMixin.setUp(self)
 
     def tearDown(self):
         HelperMixin.tearDown(self)
@@ -226,9 +237,16 @@ class TestRepoManifest(HelperMixin, unit
         self.assertEqual("git:example.com/bar/projects/one:src1.c:abcd1234",
                          symbolstore.GetVCSFilename(file1, d.srcdirs)[0])
         self.assertEqual("git:example.com/foo/projects/two:src2.c:ffffffff",
                          symbolstore.GetVCSFilename(file2, d.srcdirs)[0])
         self.assertEqual("git:example.com/bar/something_else:src3.c:00000000",
                          symbolstore.GetVCSFilename(file3, d.srcdirs)[0])
 
 if __name__ == '__main__':
-  unittest.main()
+    # use the thread-based multiprocessing.dummy module so that our
+    # mocking/module-patching still works in the workers
+    symbolstore.Dumper.GlobalInit(module=multiprocessing.dummy)
+
+    # unittest.main() exits via sys.exit(), so shut the pool down in a
+    # finally block to make sure it actually runs
+    try:
+        unittest.main()
+    finally:
+        symbolstore.Dumper.pool.close()
+        symbolstore.Dumper.pool.join()
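
The synchronization that Finish relies on in the main patch boils down to a counter guarded by a Manager-backed Condition: every SubmitJob increments it, every callback decrements it and calls notify_all, and Finish simply waits until the count drains to zero before closing the pool. The following standalone sketch illustrates that pattern in isolation; it is not code from the patch, and names like Tracker and work are made up for the example.

    import multiprocessing

    def work(item):
        # stand-in for ProcessFilesWork; must be a top-level function so
        # the pool can pickle it
        return item * item

    class Tracker(object):
        def __init__(self, pool, manager):
            self.pool = pool
            self.cond = manager.Condition()
            self.pending = 0

        def submit(self, item):
            # mirrors SubmitJob/JobStarted: count the job before queuing it
            with self.cond:
                self.pending += 1
            self.pool.apply_async(work, (item,), callback=self._done)

        def _done(self, result):
            # mirrors JobFinished: runs in the parent's result-handler thread
            with self.cond:
                self.pending -= 1
                self.cond.notify_all()

        def finish(self):
            # mirrors Dumper.Finish: wait until every submitted job has run
            # its callback, then shut the pool down
            with self.cond:
                while self.pending:
                    self.cond.wait()
            self.pool.close()
            self.pool.join()

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        pool = multiprocessing.Pool(2)
        tracker = Tracker(pool, manager)
        for i in range(10):
            tracker.submit(i)
        tracker.finish()
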