Bug 1164816 - Rewrite symbolstore.py to use concurrent.futures. r=gps, a=NPOTB
author Ted Mielczarek <ted@mielczarek.org>
Wed, 13 May 2015 14:50:11 -0400
changeset 288721 3db62f9ee5e5db7f0fe52380d475789b3b67d9c8
parent 288720 5ee833743c380bde4cb38eff2d57c6d9d6ab3034
child 288722 29e101c9b5f35dca9196f38b4a6f6c5235c637fc
push id 5067
push user raliiev@mozilla.com
push date Mon, 21 Sep 2015 14:04:52 +0000
treeherder mozilla-beta@14221ffe5b2f
reviewers gps, NPOTB
bugs 1164816
milestone 42.0a2
toolkit/crashreporter/tools/symbolstore.py
toolkit/crashreporter/tools/unit-symbolstore.py
--- a/toolkit/crashreporter/tools/symbolstore.py
+++ b/toolkit/crashreporter/tools/symbolstore.py
@@ -27,18 +27,20 @@ import platform
 import os
 import re
 import shutil
 import textwrap
 import fnmatch
 import subprocess
 import ctypes
 import urlparse
+import concurrent.futures
 import multiprocessing
 import collections
+
 from optparse import OptionParser
 from xml.dom.minidom import parse
 
 from mozpack.copier import FileRegistry
 from mozpack.manifests import (
     InstallManifest,
     UnreadableInstallManifest,
 )
@@ -349,16 +351,46 @@ def SourceIndex(fileStream, outputPath, 
 def StartJob(dumper, lock, srcdirRepoInfo, func_name, args):
     # Windows worker processes won't have run GlobalInit,
     # and due to a lack of fork(), won't inherit the class
     # variables from the parent, so set them here.
     Dumper.lock = lock
     Dumper.srcdirRepoInfo = srcdirRepoInfo
     return getattr(dumper, func_name)(*args)
 
+class JobPool(object):
+    jobs = {}
+    executor = None
+
+    @classmethod
+    def init(cls, executor):
+        cls.executor = executor
+
+    @classmethod
+    def shutdown(cls):
+        cls.executor.shutdown()
+
+    @classmethod
+    def submit(cls, args, callback):
+        cls.jobs[cls.executor.submit(StartJob, *args)] = callback
+
+    @classmethod
+    def as_completed(cls):
+        '''Like concurrent.futures.as_completed, but allows adding new futures
+        between generator steps. Iteration will end when the generator has
+        yielded all completed futures and JobPool.jobs is empty.
+        Yields (future, callback) pairs.
+        '''
+        while cls.jobs:
+            completed, _ = concurrent.futures.wait(cls.jobs.keys(), return_when=concurrent.futures.FIRST_COMPLETED)
+            for f in completed:
+                callback = cls.jobs[f]
+                del cls.jobs[f]
+                yield f, callback
+
 class Dumper:
     """This class can dump symbols from a file with debug info, and
     store the output in a directory structure that is valid for use as
     a Breakpad symbol server.  Requires a path to a dump_syms binary--
     |dump_syms| and a directory to store symbols in--|symbol_path|.
     Optionally takes a list of processor architectures to process from
     each debug file--|archs|, the full path to the top source
     directory--|srcdir|, for generating relative source file names,
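
The JobPool class added above replaces the jobs_record bookkeeping removed below. Its as_completed re-checks cls.jobs on every pass, so a callback that runs between yields can submit follow-up futures and they will still be drained. A minimal standalone sketch of that pattern, for illustration only; none of these names are in the patch, and on Python 2 concurrent.futures is the futures backport:

    import concurrent.futures

    executor = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    jobs = {}  # future -> callback, mirroring JobPool.jobs

    def submit(n, callback):
        jobs[executor.submit(lambda x: x * x, n)] = callback

    def drain():
        # Keep waiting while any job is pending, so callbacks may
        # chain new work mid-iteration (as the Mac dumper does).
        while jobs:
            done, _ = concurrent.futures.wait(
                list(jobs), return_when=concurrent.futures.FIRST_COMPLETED)
            for future in done:
                callback = jobs.pop(future)
                callback(future.result())

    def on_done(result):
        print(result)                # prints 4, then 16
        if result < 10:
            submit(result, on_done)  # a callback submitting a new job

    submit(2, on_done)
    drain()
    executor.shutdown()
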
@@ -396,58 +428,36 @@ class Dumper:
         self.copy_debug = copy_debug
         self.vcsinfo = vcsinfo
         self.srcsrv = srcsrv
         self.exclude = exclude[:]
         if repo_manifest:
             self.parse_repo_manifest(repo_manifest)
         self.file_mapping = file_mapping or {}
 
-        # book-keeping to keep track of our jobs and the cleanup work per file tuple
+        # book-keeping to keep track of the cleanup work per file tuple
         self.files_record = {}
-        self.jobs_record = collections.defaultdict(int)
 
     @classmethod
-    def GlobalInit(cls, module=multiprocessing):
+    def GlobalInit(cls, executor=concurrent.futures.ProcessPoolExecutor):
         """Initialize the class globals for the multiprocessing setup; must
         be called before any Dumper instances are created and used. Test cases
-        may pass in a different module to supply Manager and Pool objects,
-        usually multiprocessing.dummy."""
-        num_cpus = module.cpu_count()
+        may pass in a different executor to use, usually
+        concurrent.futures.ThreadPoolExecutor."""
+        num_cpus = multiprocessing.cpu_count()
         if num_cpus is None:
             # assume a dual core machine if we can't find out for some reason
             # probably better on single core anyway due to I/O constraints
             num_cpus = 2
 
         # have to create any locks etc before the pool
-        cls.manager = module.Manager()
-        cls.jobs_condition = Dumper.manager.Condition()
-        cls.lock = Dumper.manager.RLock()
-        cls.srcdirRepoInfo = Dumper.manager.dict()
-        cls.pool = module.Pool(num_cpus)
-
-    def JobStarted(self, file_key):
-        """Increments the number of submitted jobs for the specified key file,
-        defined as the original file we processed; note that a single key file
-        can generate up to 1 + len(self.archs) jobs in the Mac case."""
-        with Dumper.jobs_condition:
-            self.jobs_record[file_key] += 1
-            Dumper.jobs_condition.notify_all()
-
-    def JobFinished(self, file_key):
-        """Decrements the number of submitted jobs for the specified key file,
-        defined as the original file we processed; once the count is back to 0,
-        remove the entry from our record."""
-        with Dumper.jobs_condition:
-            self.jobs_record[file_key] -= 1
-
-            if self.jobs_record[file_key] == 0:
-                del self.jobs_record[file_key]
-
-            Dumper.jobs_condition.notify_all()
+        manager = multiprocessing.Manager()
+        cls.lock = manager.RLock()
+        cls.srcdirRepoInfo = manager.dict()
+        JobPool.init(executor(max_workers=num_cpus))
 
     def output(self, dest, output_str):
         """Writes |output_str| to |dest|, holding |lock|;
         terminates with a newline."""
         with Dumper.lock:
             dest.write(output_str + "\n")
             dest.flush()
 
@@ -530,26 +540,28 @@ class Dumper:
     def SourceServerIndexing(self, debug_file, guid, sourceFileStream, vcs_root):
         return ""
 
     # subclasses override this if they want to support this
     def CopyDebug(self, file, debug_file, guid, code_file, code_id):
         pass
 
     def Finish(self, stop_pool=True):
-        """Wait for the expected number of jobs to be submitted, and then
-        wait for the pool to finish processing them. By default, will close
-        and clear the pool, but for testcases that need multiple runs, pass
-        stop_pool = False."""
-        with Dumper.jobs_condition:
-            while len(self.jobs_record) != 0:
-                Dumper.jobs_condition.wait()
+        '''Process all pending jobs and any jobs their callbacks submit.
+        By default, will shutdown the executor, but for testcases that
+        need multiple runs, pass stop_pool = False.'''
+        for job, callback in JobPool.as_completed():
+            try:
+                res = job.result()
+            except Exception as e:
+                self.output(sys.stderr, 'Job raised exception: %s' % e)
+                continue
+            callback(res)
         if stop_pool:
-            Dumper.pool.close()
-            Dumper.pool.join()
+            JobPool.shutdown()
 
     def Process(self, file_or_dir):
         """Process a file or all the (valid) files in a directory; processing is performed
         asynchronously, and Finish must be called to wait for it to complete and clean up."""
         if os.path.isdir(file_or_dir) and not self.ShouldSkipDir(file_or_dir):
             self.ProcessDir(file_or_dir)
         elif os.path.isfile(file_or_dir):
             self.ProcessFiles((file_or_dir,))
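
A detail worth noting in the new Finish above: with multiprocessing.Pool.apply_async, an exception raised in a worker is easy to lose unless the result is polled, whereas a future stores it and re-raises it from result(), so the try/except surfaces every failure. For illustration only, not from the patch:

    import concurrent.futures

    def might_fail(n):
        if n % 2:
            raise ValueError('odd input: %d' % n)
        return n

    pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
    futures = [pool.submit(might_fail, n) for n in range(4)]
    for future in concurrent.futures.as_completed(futures):
        try:
            print(future.result())  # re-raises the worker's exception here
        except ValueError as e:
            print('job failed: %s' % e)
    pool.shutdown()
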
@@ -563,24 +575,22 @@ class Dumper:
                 if self.ShouldSkipDir(d):
                     dirs.remove(d)
             for f in files:
                 fullpath = os.path.join(root, f)
                 if self.ShouldProcess(fullpath):
                     self.ProcessFiles((fullpath,))
 
     def SubmitJob(self, file_key, func_name, args, callback):
-        """Submits a job to the pool of workers; increments the number of submitted jobs."""
-        self.JobStarted(file_key)
-        res = Dumper.pool.apply_async(StartJob, args=(self, Dumper.lock, Dumper.srcdirRepoInfo, func_name, args), callback=callback)
+        """Submits a job to the pool of workers"""
+        JobPool.submit((self, Dumper.lock, Dumper.srcdirRepoInfo, func_name, args), callback)
 
     def ProcessFilesFinished(self, res):
         """Callback from multiprocesing when ProcessFilesWork finishes;
         run the cleanup work, if any"""
-        self.JobFinished(res['files'][-1])
         # only run the cleanup function once per tuple of files
         self.files_record[res['files']] += 1
         if self.files_record[res['files']] == len(self.archs):
             del self.files_record[res['files']]
             if res['after']:
                 res['after'](res['status'], res['after_arg'])
 
     def ProcessFiles(self, files, after=None, after_arg=None):
@@ -729,17 +739,17 @@ class Dumper_Win32(Dumper):
     def CopyDebug(self, file, debug_file, guid, code_file, code_id):
         def compress(path):
             compressed_file = path[:-1] + '_'
             # ignore makecab's output
             success = subprocess.call(["makecab.exe", "/D",
                                        "CompressionType=LZX", "/D",
                                        "CompressionMemory=21",
                                        path, compressed_file],
-                                      stdout=open("NUL:","w"),
+                                      stdout=open(os.devnull, 'w'),
                                       stderr=subprocess.STDOUT)
             if success == 0 and os.path.exists(compressed_file):
                 os.unlink(path)
                 return True
             return False
 
         rel_path = os.path.join(debug_file,
                                 guid,
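
The switch from the Windows-only "NUL:" to os.devnull (mirrored in the Mac dumper below) makes the redirection portable: os.devnull is 'nul' on Windows and '/dev/null' elsewhere. A self-contained example of the idiom, illustration only:

    import os
    import subprocess

    # One spelling silences child output on every platform.
    with open(os.devnull, 'w') as devnull:
        subprocess.call(['hostname'], stdout=devnull,
                        stderr=subprocess.STDOUT)
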
@@ -868,41 +878,39 @@ class Dumper_Mac(Dumper):
         if dir.endswith(".dSYM"):
             return True
         return False
 
     def ProcessFiles(self, files, after=None, after_arg=None):
         # also note, files must be len 1 here, since we're the only ones
         # that ever add more than one file to the list
         self.output_pid(sys.stderr, "Submitting job for Mac pre-processing on file: %s" % (files[0]))
-        self.SubmitJob(files[0], 'ProcessFilesWorkMac', args=(files[0]), callback=self.ProcessFilesMacFinished)
+        self.SubmitJob(files[0], 'ProcessFilesWorkMac', args=(files[0],), callback=self.ProcessFilesMacFinished)
 
     def ProcessFilesMacFinished(self, result):
         if result['status']:
             # kick off new jobs per-arch with our new list of files
             Dumper.ProcessFiles(self, result['files'], after=AfterMac, after_arg=result['files'][0])
-        # only decrement jobs *after* that, since otherwise we'll remove the record for this file
-        self.JobFinished(result['files'][-1])
 
     def ProcessFilesWorkMac(self, file):
         """dump_syms on Mac needs to be run on a dSYM bundle produced
         by dsymutil(1), so run dsymutil here and pass the bundle name
         down to the superclass method instead."""
         self.output_pid(sys.stderr, "Worker running Mac pre-processing on file: %s" % (file,))
 
         # our return is a status and a tuple of files to dump symbols for
         # the extra files are fallbacks; as soon as one is dumped successfully, we stop
         result = { 'status' : False, 'files' : None, 'file_key' : file }
         dsymbundle = file + ".dSYM"
         if os.path.exists(dsymbundle):
             shutil.rmtree(dsymbundle)
         # dsymutil takes --arch=foo instead of -a foo like everything else
         subprocess.call(["dsymutil"] + [a.replace('-a ', '--arch=') for a in self.archs if a]
                         + [file],
-                        stdout=open("/dev/null","w"))
+                        stdout=open(os.devnull, 'w'))
         if not os.path.exists(dsymbundle):
             # dsymutil won't produce a .dSYM for files without symbols
             self.output_pid(sys.stderr, "No symbols found in file: %s" % (file,))
             result['status'] = False
             result['files'] = (file, )
             return result
 
         result['status'] = True
@@ -917,17 +925,17 @@ class Dumper_Mac(Dumper):
         dSYM bundle, and |debug_file| is the original filename."""
         rel_path = os.path.join(debug_file,
                                 guid,
                                 os.path.basename(file) + ".tar.bz2")
         full_path = os.path.abspath(os.path.join(self.symbol_path,
                                                   rel_path))
         success = subprocess.call(["tar", "cjf", full_path, os.path.basename(file)],
                                   cwd=os.path.dirname(file),
-                                  stdout=open("/dev/null","w"), stderr=subprocess.STDOUT)
+                                  stdout=open(os.devnull, 'w'), stderr=subprocess.STDOUT)
         if success == 0 and os.path.exists(full_path):
             self.output(sys.stdout, rel_path)
 
 # Entry point if called as a standalone program
 def main():
     parser = OptionParser(usage="usage: %prog [options] <dump_syms binary> <symbol store path> <debug info files>")
     parser.add_option("-c", "--copy",
                       action="store_true", dest="copy_debug", default=False,
--- a/toolkit/crashreporter/tools/unit-symbolstore.py
+++ b/toolkit/crashreporter/tools/unit-symbolstore.py
@@ -1,19 +1,28 @@
 #!/usr/bin/env python
 # This Source Code Form is subject to the terms of the Mozilla Public
 # License, v. 2.0. If a copy of the MPL was not distributed with this
 # file, You can obtain one at http://mozilla.org/MPL/2.0/.
 
-import os, tempfile, unittest, shutil, struct, platform, subprocess, multiprocessing.dummy
+import concurrent.futures
 import mock
+import os
+import platform
+import shutil
+import struct
+import subprocess
+import sys
+import tempfile
+import unittest
+
 from mock import patch
-import symbolstore
+from mozpack.manifests import InstallManifest
 
-from mozpack.manifests import InstallManifest
+import symbolstore
 
 # Some simple functions to mock out files that the platform-specific dumpers will accept.
 # dump_syms itself will not be run (we mock that call out), but we can't override
 # the ShouldProcessFile method since we actually want to test that.
 def write_elf(filename):
     open(filename, "wb").write(struct.pack("<7B45x", 0x7f, ord("E"), ord("L"), ord("F"), 1, 1, 1))
 
 def write_macho(filename):
@@ -418,17 +427,79 @@ class TestFileMapping(HelperMixin, unitt
         open(f, 'wb').write('blah')
         d.Process(f)
         d.Finish(stop_pool=False)
         expected_output = ''.join(mk_output(expected_files))
         symbol_file = os.path.join(self.symboldir,
                                    file_id[1], file_id[0], file_id[1] + '.sym')
         self.assertEqual(open(symbol_file, 'r').read(), expected_output)
 
+class TestFunctional(HelperMixin, unittest.TestCase):
+    '''Functional tests of symbolstore.py, calling it with a real
+    dump_syms binary and passing in a real binary to dump symbols from.
+
+    Since the rest of the tests in this file mock almost everything and
+    don't use the actual process pool like buildsymbols does, this tests
+    that the way symbolstore.py gets called in buildsymbols works.
+    '''
+    def setUp(self):
+        HelperMixin.setUp(self)
+        import buildconfig
+        self.skip_test = False
+        if buildconfig.substs['MOZ_BUILD_APP'] != 'browser':
+            self.skip_test = True
+        self.topsrcdir = buildconfig.topsrcdir
+        self.script_path = os.path.join(self.topsrcdir, 'toolkit',
+                                        'crashreporter', 'tools',
+                                        'symbolstore.py')
+        if platform.system() in ("Windows", "Microsoft"):
+            self.dump_syms = os.path.join(self.topsrcdir,
+                                          'toolkit',
+                                          'crashreporter',
+                                          'tools',
+                                          'win32',
+                                          'dump_syms_vc{_MSC_VER}.exe'.format(**buildconfig.substs))
+            self.target_bin = os.path.join(buildconfig.topobjdir,
+                                           'browser',
+                                           'app',
+                                           'firefox.pdb')
+        else:
+            self.dump_syms = os.path.join(buildconfig.topobjdir,
+                                          'dist', 'host', 'bin',
+                                          'dump_syms')
+            self.target_bin = os.path.join(buildconfig.topobjdir,
+                                           'dist', 'bin', 'firefox')
+
+
+    def tearDown(self):
+        HelperMixin.tearDown(self)
+
+    def testSymbolstore(self):
+        if self.skip_test:
+            raise unittest.SkipTest('Skipping test in non-Firefox product')
+        output = subprocess.check_output([sys.executable,
+                                          self.script_path,
+                                          '--vcs-info',
+                                          '-s', self.topsrcdir,
+                                          self.dump_syms,
+                                          self.test_dir,
+                                          self.target_bin],
+                                         stderr=open(os.devnull, 'w'))
+        lines = filter(lambda x: x.strip(), output.splitlines())
+        self.assertEqual(1, len(lines),
+                         'should have one filename in the output')
+        symbol_file = os.path.join(self.test_dir, lines[0])
+        self.assertTrue(os.path.isfile(symbol_file))
+        symlines = open(symbol_file, 'r').readlines()
+        file_lines = filter(lambda x: x.startswith('FILE') and 'nsBrowserApp.cpp' in x, symlines)
+        self.assertEqual(len(file_lines), 1,
+                         'should have nsBrowserApp.cpp FILE line')
+        filename = file_lines[0].split(None, 2)[2]
+        self.assertEqual('hg:', filename[:3])
+
+
 if __name__ == '__main__':
-    # use the multiprocessing.dummy module to use threading wrappers so
-    # that our mocking/module-patching works
-    symbolstore.Dumper.GlobalInit(module=multiprocessing.dummy)
+    # use ThreadPoolExecutor to use threading instead of processes so
+    # that our mocking/module-patching works.
+    symbolstore.Dumper.GlobalInit(concurrent.futures.ThreadPoolExecutor)
 
     unittest.main()
 
-    symbolstore.Dumper.pool.close()
-    symbolstore.Dumper.pool.join()
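
The reason the tests need threads rather than processes: mock.patch rewrites module attributes in the parent's memory, which worker threads share, while ProcessPoolExecutor workers import fresh, unpatched modules. A minimal sketch of the distinction, illustration only, not part of the patch:

    import concurrent.futures
    import time

    import mock  # the standalone mock library, as imported above

    def read_clock():
        return time.time()

    with mock.patch('time.time', return_value=42):
        pool = concurrent.futures.ThreadPoolExecutor(max_workers=1)
        # A worker thread sees the patched module and returns the mock...
        print(pool.submit(read_clock).result())  # 42
        pool.shutdown()
        # ...a ProcessPoolExecutor worker would re-import time unpatched
        # and return the real clock value instead.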