bug 1164816 - Refactor symbolstore.py to remove WorkerInitializer. r=gps, a=NPOTB
author Ted Mielczarek <ted@mielczarek.org>
Wed, 13 May 2015 14:14:08 -0400
changeset 288720 5ee833743c380bde4cb38eff2d57c6d9d6ab3034
parent 288719 e948a102a4a80874239dbe8f7f9cf4e2e405017e
child 288721 3db62f9ee5e5db7f0fe52380d475789b3b67d9c8
push id 5067
push user raliiev@mozilla.com
push date Mon, 21 Sep 2015 14:04:52 +0000
treeherder mozilla-beta@14221ffe5b2f
reviewers gps, NPOTB
bugs 1164816
milestone 42.0a2
bug 1164816 - Refactor symbolstore.py to remove WorkerInitializer. r=gps, a=NPOTB

concurrent.futures doesn't have a WorkerInitializer equivalent to what multiprocessing.Pool has, so refactor things slightly to remove that dependency.
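For context (not part of the changeset): multiprocessing.Pool accepts an initializer that runs once per worker process, which is what WorkerInitializer relied on, while a pool created without one has to receive any shared state through each job's arguments instead, which is the shape this patch moves to. A minimal standalone sketch of the two patterns, with illustrative names (init_worker, scale_by_shared, scale_by_arg) that do not appear in symbolstore.py:

    import multiprocessing

    def init_worker(value):
        # runs once in each worker process; stashes state in a module global
        global shared_value
        shared_value = value

    def scale_by_shared(x):
        return x * shared_value

    def scale_by_arg(x, value):
        # the same state, passed explicitly with every job instead
        return x * value

    if __name__ == '__main__':
        with_init = multiprocessing.Pool(2, init_worker, (10,))
        print(with_init.map(scale_by_shared, [1, 2, 3]))        # [10, 20, 30]
        with_init.close()
        with_init.join()

        plain = multiprocessing.Pool(2)
        jobs = [plain.apply_async(scale_by_arg, (x, 10)) for x in [1, 2, 3]]
        print([j.get() for j in jobs])                          # [10, 20, 30]
        plain.close()
        plain.join()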
toolkit/crashreporter/tools/symbolstore.py
--- a/toolkit/crashreporter/tools/symbolstore.py
+++ b/toolkit/crashreporter/tools/symbolstore.py
@@ -341,27 +341,23 @@ def SourceIndex(fileStream, outputPath, 
     pdbStreamFile.write('''SRCSRV: ini ------------------------------------------------\r\nVERSION=2\r\nINDEXVERSION=2\r\nVERCTRL=http\r\nSRCSRV: variables ------------------------------------------\r\nHGSERVER=''')
     pdbStreamFile.write(vcs_root)
     pdbStreamFile.write('''\r\nSRCSRVVERCTRL=http\r\nHTTP_EXTRACT_TARGET=%hgserver%/raw-file/%var3%/%var2%\r\nSRCSRVTRG=%http_extract_target%\r\nSRCSRV: source files ---------------------------------------\r\n''')
     pdbStreamFile.write(fileStream) # can't do string interpolation because the source server also uses this and so there are % in the above
     pdbStreamFile.write("SRCSRV: end ------------------------------------------------\r\n\n")
     pdbStreamFile.close()
     return result
 
-def WorkerInitializer(cls, lock, srcdirRepoInfo):
-    """Windows worker processes won't have run GlobalInit, and due to a lack of fork(),
-    won't inherit the class variables from the parent. They only need a few variables,
-    so we run an initializer to set them. Redundant but harmless on other platforms."""
-    cls.lock = lock
-    cls.srcdirRepoInfo = srcdirRepoInfo
-
-def StartProcessFilesWork(dumper, files, arch_num, arch, vcs_root, after, after_arg):
-    """multiprocessing can't handle methods as Process targets, so we define
-    a simple wrapper function around the work method."""
-    return dumper.ProcessFilesWork(files, arch_num, arch, vcs_root, after, after_arg)
+def StartJob(dumper, lock, srcdirRepoInfo, func_name, args):
+    # Windows worker processes won't have run GlobalInit,
+    # and due to a lack of fork(), won't inherit the class
+    # variables from the parent, so set them here.
+    Dumper.lock = lock
+    Dumper.srcdirRepoInfo = srcdirRepoInfo
+    return getattr(dumper, func_name)(*args)
 
 class Dumper:
     """This class can dump symbols from a file with debug info, and
     store the output in a directory structure that is valid for use as
     a Breakpad symbol server.  Requires a path to a dump_syms binary--
     |dump_syms| and a directory to store symbols in--|symbol_path|.
     Optionally takes a list of processor architectures to process from
     each debug file--|archs|, the full path to the top source
@@ -421,18 +417,17 @@ class Dumper:
             # probably better on single core anyway due to I/O constraints
             num_cpus = 2
 
         # have to create any locks etc before the pool
         cls.manager = module.Manager()
         cls.jobs_condition = Dumper.manager.Condition()
         cls.lock = Dumper.manager.RLock()
         cls.srcdirRepoInfo = Dumper.manager.dict()
-        cls.pool = module.Pool(num_cpus, WorkerInitializer,
-                               (cls, cls.lock, cls.srcdirRepoInfo))
+        cls.pool = module.Pool(num_cpus)
 
     def JobStarted(self, file_key):
         """Increments the number of submitted jobs for the specified key file,
         defined as the original file we processed; note that a single key file
         can generate up to 1 + len(self.archs) jobs in the Mac case."""
         with Dumper.jobs_condition:
             self.jobs_record[file_key] += 1
             Dumper.jobs_condition.notify_all()
@@ -567,20 +562,20 @@ class Dumper:
             for d in dirs[:]:
                 if self.ShouldSkipDir(d):
                     dirs.remove(d)
             for f in files:
                 fullpath = os.path.join(root, f)
                 if self.ShouldProcess(fullpath):
                     self.ProcessFiles((fullpath,))
 
-    def SubmitJob(self, file_key, func, args, callback):
+    def SubmitJob(self, file_key, func_name, args, callback):
         """Submits a job to the pool of workers; increments the number of submitted jobs."""
         self.JobStarted(file_key)
-        res = Dumper.pool.apply_async(func, args=args, callback=callback)
+        res = Dumper.pool.apply_async(StartJob, args=(self, Dumper.lock, Dumper.srcdirRepoInfo, func_name, args), callback=callback)
 
     def ProcessFilesFinished(self, res):
         """Callback from multiprocesing when ProcessFilesWork finishes;
         run the cleanup work, if any"""
         self.JobFinished(res['files'][-1])
         # only run the cleanup function once per tuple of files
         self.files_record[res['files']] += 1
         if self.files_record[res['files']] == len(self.archs):
@@ -596,17 +591,17 @@ class Dumper:
         successfully; if it does, no other files will be touched."""
         self.output_pid(sys.stderr, "Submitting jobs for files: %s" % str(files))
 
         # tries to get the vcs root from the .mozconfig first - if it's not set
         # the tinderbox vcs path will be assigned further down
         vcs_root = os.environ.get("SRCSRV_ROOT")
         for arch_num, arch in enumerate(self.archs):
             self.files_record[files] = 0 # record that we submitted jobs for this tuple of files
-            self.SubmitJob(files[-1], StartProcessFilesWork, args=(self, files, arch_num, arch, vcs_root, after, after_arg), callback=self.ProcessFilesFinished)
+            self.SubmitJob(files[-1], 'ProcessFilesWork', args=(files, arch_num, arch, vcs_root, after, after_arg), callback=self.ProcessFilesFinished)
 
     def ProcessFilesWork(self, files, arch_num, arch, vcs_root, after, after_arg):
         self.output_pid(sys.stderr, "Worker processing files: %s" % (files,))
 
         # our result is a status, a cleanup function, an argument to that function, and the tuple of files we were called on
         result = { 'status' : False, 'after' : after, 'after_arg' : after_arg, 'files' : files }
 
         sourceFileStream = ''
@@ -843,21 +838,16 @@ class Dumper_Solaris(Dumper):
         file(1) reports as being ELF files.  It expects to find the file
         command in PATH."""
         if not Dumper.ShouldProcess(self, file):
             return False
         if file.endswith(".so") or os.access(file, os.X_OK):
             return self.RunFileCommand(file).startswith("ELF")
         return False
 
-def StartProcessFilesWorkMac(dumper, file):
-    """multiprocessing can't handle methods as Process targets, so we define
-    a simple wrapper function around the work method."""
-    return dumper.ProcessFilesWorkMac(file)
-
 def AfterMac(status, dsymbundle):
     """Cleanup function to run on Macs after we process the file(s)."""
     # CopyDebug will already have been run from Dumper.ProcessFiles
     shutil.rmtree(dsymbundle)
 
 class Dumper_Mac(Dumper):
     def ShouldProcess(self, file):
         """This function will allow processing of files that are
@@ -878,17 +868,17 @@ class Dumper_Mac(Dumper):
         if dir.endswith(".dSYM"):
             return True
         return False
 
     def ProcessFiles(self, files, after=None, after_arg=None):
         # also note, files must be len 1 here, since we're the only ones
         # that ever add more than one file to the list
         self.output_pid(sys.stderr, "Submitting job for Mac pre-processing on file: %s" % (files[0]))
-        self.SubmitJob(files[0], StartProcessFilesWorkMac, args=(self, files[0]), callback=self.ProcessFilesMacFinished)
+        self.SubmitJob(files[0], 'ProcessFilesWorkMac', args=(files[0],), callback=self.ProcessFilesMacFinished)
 
     def ProcessFilesMacFinished(self, result):
         if result['status']:
             # kick off new jobs per-arch with our new list of files
             Dumper.ProcessFiles(self, result['files'], after=AfterMac, after_arg=result['files'][0])
         # only decrement jobs *after* that, since otherwise we'll remove the record for this file
         self.JobFinished(result['files'][-1])
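A note on the dispatch in the new StartJob helper, shown as a standalone sketch (Worker, run_job and scale are illustrative names, not from symbolstore.py): the removed per-method wrappers existed because, as their docstrings said, multiprocessing can't take methods as job targets; passing a plain module-level function plus the method name, then looking it up with getattr in the worker, keeps everything picklable while letting one helper dispatch to any Dumper method.

    import multiprocessing

    class Worker(object):
        def __init__(self, factor):
            self.factor = factor

        def scale(self, x):
            return x * self.factor

    def run_job(worker, func_name, args):
        # same spirit as StartJob: recover the method by name inside the worker
        return getattr(worker, func_name)(*args)

    if __name__ == '__main__':
        pool = multiprocessing.Pool(2)
        w = Worker(3)
        results = [pool.apply_async(run_job, (w, 'scale', (n,))) for n in (1, 2, 3)]
        print([r.get() for r in results])  # [3, 6, 9]
        pool.close()
        pool.join()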