Bug 500388 - Don't buffer xpcshell process output, use a callback to consume output as it is available. r=ted
authorChris Manchester <cmanchester@mozilla.com>
Thu, 12 Sep 2013 19:48:43 -0400
changeset 159931 3c7913e853b86dbe2503819145830689747ab59b
parent 159930 cd56b1bd18e3bf8cea288252125c6c2d5458ab35
child 159932 017efee4dc037efbd0fae00e85fdad790fd4aeb8
push id2961
push userlsblakk@mozilla.com
push dateMon, 28 Oct 2013 21:59:28 +0000
treeherdermozilla-beta@73ef4f13486f [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersted
bugs500388
milestone26.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 500388 - Don't buffer xpcshell process output, use a callback to consume output as it is available. r=ted
testing/xpcshell/mach_commands.py
testing/xpcshell/runxpcshelltests.py
--- a/testing/xpcshell/mach_commands.py
+++ b/testing/xpcshell/mach_commands.py
@@ -137,16 +137,18 @@ class XPCShellRunner(MozbuildObject):
             'profileName': 'firefox',
             'verbose': test_path is not None,
             'xunitFilename': os.path.join(self.statedir, 'xpchsell.xunit.xml'),
             'xunitName': 'xpcshell',
             'pluginsPath': os.path.join(self.distdir, 'plugins'),
             'debugger': debugger,
             'debuggerArgs': debuggerArgs,
             'debuggerInteractive': debuggerInteractive,
+            'on_message': (lambda obj, msg: xpcshell.log.info(msg)) \
+                            if test_path is not None else None,
         }
 
         if manifest is not None:
             args['manifest'] = manifest
         elif test_dirs is not None:
             if isinstance(test_dirs, list):
                 args['testdirs'] = test_dirs
             else:
--- a/testing/xpcshell/runxpcshelltests.py
+++ b/testing/xpcshell/runxpcshelltests.py
@@ -109,16 +109,17 @@ class XPCShellTestThread(Thread):
         self.profileName = kwargs.get('profileName')
         self.singleFile = kwargs.get('singleFile')
         self.env = copy.deepcopy(kwargs.get('env'))
         self.symbolsPath = kwargs.get('symbolsPath')
         self.logfiles = kwargs.get('logfiles')
         self.xpcshell = kwargs.get('xpcshell')
         self.xpcsRunArgs = kwargs.get('xpcsRunArgs')
         self.failureManifest = kwargs.get('failureManifest')
+        self.on_message = kwargs.get('on_message')
 
         self.tests_root_dir = tests_root_dir
         self.app_dir_key = app_dir_key
         self.interactive = interactive
         self.verbose = verbose
         self.pStdout = pStdout
         self.pStderr = pStderr
         self.keep_going = keep_going
@@ -127,16 +128,18 @@ class XPCShellTestThread(Thread):
         # only one of these will be set to 1. adding them to the totals in
         # the harness
         self.passCount = 0
         self.todoCount = 0
         self.failCount = 0
 
         self.output_lines = []
         self.has_failure_output = False
+        self.saw_proc_start = False
+        self.saw_proc_end = False
 
         # event from main thread to signal work done
         self.event = event
         self.done = False # explicitly set flag so we don't rely on thread.isAlive
 
     def run(self):
         try:
             self.run_test()
@@ -188,16 +191,28 @@ class XPCShellTestThread(Thread):
         """
         return proc.returncode
 
     def communicate(self, proc):
         """
           Simple wrapper to communicate with a process.
           On a remote system, this is overloaded to handle remote process communication.
         """
+        # Processing of incremental output put here to
+        # sidestep issues on remote platforms, where what we know
+        # as proc is a file pulled off of a device.
+        while True:
+            line = proc.stdout.readline()
+            if not line:
+                break
+            self.process_line(line)
+
+        if self.saw_proc_start and not self.saw_proc_end:
+            self.has_failure_output = True
+
         return proc.communicate()
 
     def launchProcess(self, cmd, stdout, stderr, env, cwd):
         """
           Simple wrapper to launch a process.
           On a remote system, this is more complex and we need to overload this function.
         """
         if HAVE_PSUTIL:
@@ -388,78 +403,88 @@ class XPCShellTestThread(Thread):
         if self.profileDir and not self.interactive and not self.singleFile:
             self.cleanupDir(self.profileDir, name, self.xunit_result)
 
         self.cleanupDir(self.tempDir, name, self.xunit_result)
 
         if self.pluginsDir:
             self.cleanupDir(self.pluginsDir, name, self.xunit_result)
 
-    def append_message_from_line(self, line):
-        """Given a line of raw output, convert to message and append to
-        output buffer."""
+    def message_from_line(self, line):
+        """ Given a line of raw output, convert to a string message. """
         if isinstance(line, basestring):
             # This function has received unstructured output.
             if line:
-                self.output_lines.append(line)
                 if 'TEST-UNEXPECTED-' in line:
                     self.has_failure_output = True
-            return
+            return line
 
         msg = ['%s: ' % line['process'] if 'process' in line else '']
 
         # Each call to the logger in head.js either specified '_message'
         # or both 'source_file' and 'diagnostic'. If either of these are
         # missing, they ended up being undefined as a result of the way
         # the test was run.
         if '_message' in line:
             msg.append(line['_message'])
         else:
             msg.append('%s | %s | %s' % (ACTION_STRINGS[line['action']],
                                          line.get('source_file', 'undefined'),
                                          line.get('diagnostic', 'undefined')))
 
         msg.append('\n%s' % line['stack'] if 'stack' in line else '')
-        self.output_lines.append(''.join(msg))
+        return ''.join(msg)
 
     def parse_output(self, output):
         """Parses process output for structured messages and saves output as it is
         read. Sets self.has_failure_output in case of evidence of a failure"""
-        seen_proc_start = False
-        seen_proc_end = False
-        self.output_lines = []
         for line_string in output.splitlines():
-            try:
-                line_object = json.loads(line_string)
-                if not isinstance(line_object, dict):
-                    self.append_message_from_line(line_string)
-                    continue
-            except ValueError:
-                self.append_message_from_line(line_string)
-                continue
+            self.process_line(line_string)
+
+        if self.saw_proc_start and not self.saw_proc_end:
+            self.has_failure_output = True
+
+    def report_message(self, line):
+        """ Reports a message to a consumer, both as a strucutured and
+        human-readable log message. """
+        message = self.message_from_line(line).strip()
+
+        if self.on_message:
+            self.on_message(line, message)
+        else:
+            self.output_lines.append(message)
 
-            if 'action' not in line_object:
-                # In case a test outputs something that happens to be valid
-                # JSON object.
-                self.append_message_from_line(line_string)
-                continue
-
-            action = line_object['action']
-            self.append_message_from_line(line_object)
+    def process_line(self, line_string):
+        """ Parses a single line of output, determining its significance and
+        reporting a message.
+        """
+        try:
+            line_object = json.loads(line_string)
+            if not isinstance(line_object, dict):
+                self.report_message(line_string)
+                return
+        except ValueError:
+            self.report_message(line_string)
+            return
 
-            if action in FAILURE_ACTIONS:
-                self.has_failure_output = True
+        if 'action' not in line_object:
+            # In case a test outputs something that happens to be valid
+            # JSON.
+            self.report_message(line_string)
+            return
 
-            elif action == 'child_test_start':
-                seen_proc_start = True
-            elif action == 'child_test_end':
-                seen_proc_end = True
+        action = line_object['action']
+        self.report_message(line_object)
 
-        if seen_proc_start and not seen_proc_end:
+        if action in FAILURE_ACTIONS:
             self.has_failure_output = True
+        elif action == 'child_test_start':
+            self.saw_proc_start = True
+        elif action == 'child_test_end':
+            self.saw_proc_end = True
 
     def log_output(self, output):
         """Prints given output line-by-line to avoid overflowing buffers."""
         self.log.info(">>>>>>>")
         if output:
             if isinstance(output, basestring):
                 output = output.splitlines()
             for part in output:
@@ -574,18 +599,20 @@ class XPCShellTestThread(Thread):
                           (self.getReturnCode(proc) != 0))
 
             if result != expected:
                 if self.retry:
                     self.clean_temp_dirs(name, stdout)
                     return
 
                 failureType = "TEST-UNEXPECTED-%s" % ("FAIL" if expected else "PASS")
-                message = "%s | %s | test failed (with xpcshell return code: %d), see following log:" % (
+                message = "%s | %s | test failed (with xpcshell return code: %d)" % (
                               failureType, name, self.getReturnCode(proc))
+                if self.output_lines:
+                    message += ", see following log:"
 
                 with LOG_MUTEX:
                     self.log.error(message)
                     self.log_output(self.output_lines)
 
                 self.failCount += 1
                 self.xunit_result["passed"] = False
 
@@ -1087,17 +1114,17 @@ class XPCShellTests(object):
                  manifest=None, testdirs=None, testPath=None, mobileArgs=None,
                  interactive=False, verbose=False, keepGoing=False, logfiles=True,
                  thisChunk=1, totalChunks=1, debugger=None,
                  debuggerArgs=None, debuggerInteractive=False,
                  profileName=None, mozInfo=None, sequential=False, shuffle=False,
                  testsRootDir=None, xunitFilename=None, xunitName=None,
                  testingModulesDir=None, autolog=False, pluginsPath=None,
                  testClass=XPCShellTestThread, failureManifest=None,
-                 **otherOptions):
+                 on_message=None, **otherOptions):
         """Run xpcshell tests.
 
         |xpcshell|, is the xpcshell executable to use to run the tests.
         |xrePath|, if provided, is the path to the XRE to use.
         |appPath|, if provided, is the path to an application directory.
         |symbolsPath|, if provided is the path to a directory containing
           breakpad symbols for processing crashes in tests.
         |manifest|, if provided, is a file containing a list of
@@ -1176,16 +1203,17 @@ class XPCShellTests(object):
         self.symbolsPath = symbolsPath
         self.manifest = manifest
         self.testdirs = testdirs
         self.testPath = testPath
         self.interactive = interactive
         self.verbose = verbose
         self.keepGoing = keepGoing
         self.logfiles = logfiles
+        self.on_message = on_message
         self.totalChunks = totalChunks
         self.thisChunk = thisChunk
         self.debuggerInfo = getDebuggerInfo(self.oldcwd, debugger, debuggerArgs, debuggerInteractive)
         self.profileName = profileName or "xpcshell"
         self.mozInfo = mozInfo
         self.testingModulesDir = testingModulesDir
         self.pluginsPath = pluginsPath
         self.sequential = sequential
@@ -1252,17 +1280,18 @@ class XPCShellTests(object):
             'testharnessdir': self.testharnessdir,
             'profileName': self.profileName,
             'singleFile': self.singleFile,
             'env': self.env, # making a copy of this in the testthreads
             'symbolsPath': self.symbolsPath,
             'logfiles': self.logfiles,
             'xpcshell': self.xpcshell,
             'xpcsRunArgs': self.xpcsRunArgs,
-            'failureManifest': failureManifest
+            'failureManifest': failureManifest,
+            'on_message': self.on_message,
         }
 
         if self.sequential:
             # Allow user to kill hung xpcshell subprocess with SIGINT
             # when we are only running tests sequentially.
             signal.signal(signal.SIGINT, markGotSIGINT)
 
         if self.debuggerInfo: