Bug 883205 - Add mozsystemmonitor to mozbase; r=ahal
authorGregory Szorc <gps@mozilla.com>
Fri, 14 Jun 2013 15:15:58 -0700
changeset 135144 46dc449aa545282e11104a0ef8c014f196908122
parent 135143 7352b0e058d6ce53baf819b207ab5217b6aa260f
child 135145 a74b3d43c3af2b8b09437a735e65c0e5bdbf7e4a
child 135165 05d9196b27a1dcf13049ba8be533a044cc642444
push id29539
push usergszorc@mozilla.com
push dateFri, 14 Jun 2013 22:16:16 +0000
treeherdermozilla-inbound@46dc449aa545 [default view] [failures only]
perfherder[talos] [build metrics] [platform microbench] (compared to previous push)
reviewersahal
bugs883205
milestone24.0a1
first release with
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
last release without
nightly linux32
nightly linux64
nightly mac
nightly win32
nightly win64
Bug 883205 - Add mozsystemmonitor to mozbase; r=ahal
testing/mozbase/Makefile.in
testing/mozbase/mozsystemmonitor/README.rst
testing/mozbase/mozsystemmonitor/mozsystemmonitor/__init__.py
testing/mozbase/mozsystemmonitor/mozsystemmonitor/resourcemonitor.py
testing/mozbase/mozsystemmonitor/mozsystemmonitor/test/__init__.py
testing/mozbase/mozsystemmonitor/mozsystemmonitor/test/test_resource_monitor.py
testing/mozbase/mozsystemmonitor/setup.py
testing/mozbase/packages.txt
--- a/testing/mozbase/Makefile.in
+++ b/testing/mozbase/Makefile.in
@@ -20,16 +20,17 @@ MOZBASE_PACKAGES = \
   mozinfo \
   mozinstall \
   mozlog \
   mozprocess \
   mozprofile \
   mozrunner \
   mozdevice \
   moznetwork \
+  mozsystemmonitor \
   $(NULL)
 
 MOZBASE_EXTRAS = \
   setup_development.py \
   $(NULL)
 
 _DEST_DIR = $(DEPTH)/_tests/mozbase
 libs:: $(MOZBASE_PACKAGES)
new file mode 100644
--- /dev/null
+++ b/testing/mozbase/mozsystemmonitor/README.rst
@@ -0,0 +1,13 @@
+================
+mozsystemmonitor
+================
+
+mozsystemmonitor contains modules for monitoring a running system.
+
+SystemResourceMonitor
+=====================
+
+mozsystemmonitor.resourcemonitor.SystemResourceMonitor is a class used to
+measure system resource usage. It is useful to get a handle on what an
+overall system is doing.
+
new file mode 100644
new file mode 100644
--- /dev/null
+++ b/testing/mozbase/mozsystemmonitor/mozsystemmonitor/resourcemonitor.py
@@ -0,0 +1,481 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this file,
+# You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import multiprocessing
+import sys
+import time
+
+try:
+    import psutil
+except ImportError:
+    psutil = None
+
+from collections import (
+    OrderedDict,
+    namedtuple,
+)
+
+from contextlib import contextmanager
+
+
+def _collect(pipe, poll_interval):
+    """Collect system metrics in the background process.
+
+    This is the main function for the background process. It buffers a
+    sample about every poll_interval seconds, sending them when told to stop.
+    """
+
+    data = []
+
+    # Establish initial values.
+
+    # We should ideally use a monotonic clock. However, Python 2.7 doesn't
+    # make a monotonic clock available on all platforms. Python 3.3 does!
+    last_time = time.time()
+    io_last = psutil.disk_io_counters()
+    cpu_last = psutil.cpu_times(True)
+    swap_last = psutil.swap_memory()
+    psutil.cpu_percent(None, True)  # Return value is discarded.
+
+    sin_index = swap_last._fields.index('sin')
+    sout_index = swap_last._fields.index('sout')
+
+    sleep_interval = poll_interval
+
+    while not pipe.poll(sleep_interval):  # Any message ends the loop.
+        io = psutil.disk_io_counters()
+        cpu_times = psutil.cpu_times(True)
+        cpu_percent = psutil.cpu_percent(None, True)
+        virt_mem = psutil.virtual_memory()
+        swap_mem = psutil.swap_memory()
+        measured_end_time = time.time()
+
+        # TODO Does this wrap? At 32 bits? At 64 bits?
+        # TODO Consider patching "delta" API to upstream.
+        io_diff = [v - io_last[i] for i, v in enumerate(io)]
+        io_last = io
+
+        cpu_diff = []
+        for core, values in enumerate(cpu_times):
+            cpu_diff.append([v - cpu_last[core][i] for i, v in
+                enumerate(values)])
+
+        cpu_last = cpu_times
+
+        swap_entry = list(swap_mem)  # Only sin/sout become deltas below.
+        swap_entry[sin_index] = swap_mem.sin - swap_last.sin
+        swap_entry[sout_index] = swap_mem.sout - swap_last.sout
+        swap_last = swap_mem
+
+        data.append((last_time, measured_end_time, io_diff, cpu_diff,
+            cpu_percent, list(virt_mem), swap_entry))
+        # Shorten the next wait by however much this iteration overran.
+        collection_overhead = time.time() - last_time - poll_interval
+        last_time = measured_end_time
+        sleep_interval = max(0, poll_interval - collection_overhead)
+
+    for entry in data:  # Flush everything that was buffered.
+        pipe.send(entry)
+
+    pipe.send(('done', None, None, None, None, None, None))  # Sentinel.
+    pipe.close()
+    sys.exit(0)
+
+
+SystemResourceUsage = namedtuple('SystemResourceUsage',
+    'start end cpu_times cpu_percent io virt swap')
+
+
+class SystemResourceMonitor(object):
+    """Measures system resources.
+
+    Each instance measures system resources from the time it is started
+    until it is finished. It does this on a separate process so it doesn't
+    impact execution of the main Python process.
+
+    Each instance is a one-shot instance. It cannot be used to record multiple
+    durations.
+
+    Aside from basic data gathering, the class supports basic analysis
+    capabilities. You can query for data between ranges. You can also tell it
+    when certain events occur and later grab data relevant to those events or
+    plot those events on a timeline.
+
+    The resource monitor works by periodically polling the state of the
+    system. By default, it polls every second. This can be adjusted depending
+    on the required granularity of the data and considerations for probe
+    overhead. It tries to probe at the interval specified. However, variations
+    should be expected. Fast and well-behaving systems should experience
+    variations in the 1ms range. Larger variations may exist if the system is
+    under heavy load or depending on how accurate socket polling is on your
+    system.
+
+    In its current implementation, data is not available until collection has
+    stopped. This may change in future iterations.
+
+    Usage
+    =====
+
+    monitor = SystemResourceMonitor()
+    monitor.start()
+
+    # Record that a single event in time just occurred.
+    foo.do_stuff()
+    monitor.record_event('foo_did_stuff')
+
+    # Record that we're about to perform a possibly long-running event.
+    with monitor.phase('long_job'):
+        foo.do_long_running_job()
+
+    # Stop recording. Currently we need to stop before data is available.
+    monitor.stop()
+
+    # Obtain the raw data for the entire probed range.
+    print('CPU Usage:')
+    for core in monitor.aggregate_cpu_percent():
+        print(core)
+
+    # We can also request data corresponding to a specific phase.
+    for data in monitor.phase_usage('long_job'):
+        print(data.cpu_percent)
+    """
+
+    # The interprocess communication is complicated enough to warrant
+    # explanation. To work around the Python GIL, we launch a separate
+    # background process whose only job is to collect metrics. If we performed
+    # collection in the main process, the polling interval would be
+    # inconsistent if a long-running function were on the stack. Since the
+    # child process is independent of the instantiating process, data
+    # collection should be evenly spaced.
+    #
+    # As the child process collects data, it buffers it locally. When
+    # collection stops, it flushes all that data to a pipe to be read by
+    # the parent process.
+
+    def __init__(self, poll_interval=1.0):
+        """Set up a monitor that has not yet started collecting.
+
+        poll_interval is the time between samples, in float seconds.
+
+        When psutil is unavailable only the bookkeeping attributes are created.
+        """
+        self._running = False
+        self._stopped = False
+        self._active_phases = {}
+
+        self.events = []
+        self.phases = OrderedDict()
+        self.start_time = None
+        self.end_time = None
+
+        if psutil is None:
+            return
+
+        # Take one sample of each metric so that the named tuple types and
+        # field counts can be remembered for rebuilding results later.
+        percent_sample = psutil.cpu_percent(0.0, True)
+        times_sample = psutil.cpu_times(False)
+        io_sample = psutil.disk_io_counters()
+        virt_sample = psutil.virtual_memory()
+        swap_sample = psutil.swap_memory()
+
+        self._cpu_cores = len(percent_sample)
+        self._cpu_times_type = type(times_sample)
+        self._cpu_times_len = len(times_sample)
+        self._io_type, self._io_len = type(io_sample), len(io_sample)
+        self._virt_type, self._virt_len = type(virt_sample), len(virt_sample)
+        self._swap_type, self._swap_len = type(swap_sample), len(swap_sample)
+
+        # The parent keeps one end of the duplex pipe; the collector process
+        # is created with the other end but is not started here.
+        parent_end, child_end = multiprocessing.Pipe(True)
+        self._pipe = parent_end
+        self._process = multiprocessing.Process(None, _collect,
+            args=(child_end, poll_interval))
+
+    def __del__(self):  # Reap a collector that was never stopped.
+        if self._running:
+            self._pipe.send(('terminate',))
+            self._process.join()  # NOTE(review): pipe is not drained first.
+
+    # Methods to control monitoring.
+
+    def start(self):
+        """Launch the background collection process.
+
+        This is a no-op when psutil is unavailable.
+
+        Call it at most once per instance.
+        """
+        if psutil is not None:
+            self._running = True
+            self._process.start()
+
+    def stop(self):
+        """Stop collection and load the samples gathered since start().
+
+        You should call this if and only if you have called start(). Data is
+        not available until this returns. Without psutil the instance is only
+        flagged as stopped.
+        """
+        if psutil is None:
+            self._stopped = True
+            return
+
+        assert self._running
+        assert not self._stopped
+
+        # Same message tuple that __del__ sends to end collection.
+        self._pipe.send(('terminate',))
+        self._running = False
+        self._stopped = True
+
+        self.measurements = []
+        done = False
+
+        # Give each message from the child up to a second to arrive.
+        while self._pipe.poll(1):
+            start_time, end_time, io_diff, cpu_diff, cpu_percent, virt_mem, \
+                swap_mem = self._pipe.recv()
+
+            if start_time == 'done':
+                # Sentinel received; don't sit through another poll timeout.
+                done = True
+                break
+
+            io = self._io_type(*io_diff)
+            virt = self._virt_type(*virt_mem)
+            swap = self._swap_type(*swap_mem)
+            cpu_times = [self._cpu_times_type(*v) for v in cpu_diff]
+            self.measurements.append(SystemResourceUsage(start_time, end_time,
+                cpu_times, cpu_percent, io, virt, swap))
+
+        self._process.join()
+        assert done
+
+        if self.measurements:
+            self.start_time = self.measurements[0].start
+            self.end_time = self.measurements[-1].end
+
+    # Methods to record events alongside the monitored data.
+
+    def record_event(self, name):
+        """Note that the named event happened at the current time.
+
+        Events are instantaneous; see the phase API for timed actions.
+        """
+        stamp = time.time()
+        self.events.append((stamp, name))
+
+    @contextmanager
+    def phase(self, name):
+        """Context manager for recording an active phase."""
+        self.begin_phase(name)
+        try: yield
+        finally: self.finish_phase(name)  # Runs even if the body raised.
+
+    def begin_phase(self, name):
+        """Mark the named phase as started now.
+
+        A phase is an action with a duration. Several phases may be open at
+        once and they may be finished in any order, although overlapping
+        phases make it hard to attribute resource usage to any one of them.
+
+        The name must not belong to a phase that is already open.
+        """
+        active = self._active_phases
+        assert name not in active
+        active[name] = time.time()
+
+    def finish_phase(self, name):
+        """Close the named phase and return how long it lasted.
+
+        The (start, end) timestamps are stored in self.phases under name.
+        """
+        assert name in self._active_phases
+        began = self._active_phases.pop(name)
+        ended = time.time()
+        self.phases[name] = (began, ended)
+        return ended - began
+
+    # Methods to query data.
+
+    def range_usage(self, start=None, end=None):
+        """Yield the samples lying entirely inside a time range.
+
+        Each item is a SystemResourceUsage. A bound left as None defaults to
+        the start or end of the recorded data.
+
+        Nothing is yielded unless the monitor has been stopped and a start
+        time has been recorded.
+        """
+        have_data = self._stopped and self.start_time is not None
+        if not have_data:
+            return
+
+        # Missing bounds fall back to the extent of the recorded data.
+        lower = self.start_time if start is None else start
+        upper = self.end_time if end is None else end
+
+        for sample in self.measurements:
+            if sample.start >= lower:
+                if sample.end > upper:
+                    # Stop at the first in-range sample that ends too late.
+                    break
+
+                yield sample
+
+    def phase_usage(self, phase):
+        """Return the samples recorded while the named phase was open.
+
+        The result is the generator range_usage() builds for the phase's
+        start and end times; an unknown phase name raises KeyError.
+        """
+        began, ended = self.phases[phase]
+        return self.range_usage(start=began, end=ended)
+
+    def between_events_usage(self, start_event, end_event):
+        """Return the samples recorded between two named events.
+
+        If an event name was recorded more than once, the last recorded
+        occurrence is used. Raises Exception if either event is missing.
+        """
+        began = ended = None
+
+        for stamp, label in self.events:
+            if label == start_event:
+                began = stamp
+                continue
+
+            if label == end_event:
+                ended = stamp
+
+        if began is None:
+            raise Exception('Could not find start event: %s' % start_event)
+        if ended is None:
+            raise Exception('Could not find end event: %s' % end_event)
+        return self.range_usage(began, ended)
+
+    def aggregate_cpu_percent(self, start=None, end=None, phase=None,
+        per_cpu=True):
+        """Average the sampled CPU usage percentages over a range.
+
+        With per_cpu True (the default) the result is a list holding one
+        average per core; otherwise it is a single value averaged over all
+        cores. None is returned when the range contains no samples.
+
+        The range is the named phase if phase is given, otherwise whatever
+        range_usage() selects for start and end (everything by default).
+        """
+        per_core = [[] for _ in range(self._cpu_cores)]
+
+        if phase:
+            usages = self.phase_usage(phase)
+        else:
+            usages = self.range_usage(start, end)
+
+        # Regroup the per-sample lists into one list of readings per core.
+        for usage in usages:
+            for core, percent in enumerate(usage.cpu_percent):
+                per_core[core].append(percent)
+
+        count = len(per_core[0])
+
+        if count == 0:
+            return None
+
+        totals = [sum(readings) for readings in per_core]
+
+        if per_cpu:
+            return [total / count for total in totals]
+
+        return sum(totals) / len(per_core) / count
+
+    def aggregate_cpu_times(self, start=None, end=None, phase=None,
+        per_cpu=True):
+        """Sum the sampled cpu_times values over a range.
+
+        With per_cpu True (the default) a list with one named tuple per core
+        is returned, each of the type psutil.cpu_times() produces; otherwise
+        a single such tuple holding the totals across all cores.
+        """
+        per_core = [[0] * self._cpu_times_len for _ in range(self._cpu_cores)]
+
+        if phase:
+            usages = self.phase_usage(phase)
+        else:
+            usages = self.range_usage(start, end)
+
+        for usage in usages:
+            for core, values in enumerate(usage.cpu_times):
+                for field, value in enumerate(values):
+                    per_core[core][field] += value
+
+        if per_cpu:
+            return [self._cpu_times_type(*totals) for totals in per_core]
+
+        # Add the cores together field by field.
+        combined = [0] * self._cpu_times_len
+        for totals in per_core:
+            for field, value in enumerate(totals):
+                combined[field] += value
+
+        return self._cpu_times_type(*combined)
+
+    def aggregate_io(self, start=None, end=None, phase=None):
+        """Sum the sampled I/O counters over a range.
+
+        Returns an instance of self._io_type (psutil's disk I/O counters
+        named tuple) holding the per-field totals.
+        """
+        totals = [0] * self._io_len
+
+        if phase:
+            usages = self.phase_usage(phase)
+        else:
+            usages = self.range_usage(start, end)
+
+        for usage in usages:
+            for field, amount in enumerate(usage.io):
+                totals[field] += amount
+
+        return self._io_type(*totals)
+
+    def min_memory_available(self, start=None, end=None, phase=None):
+        """Return the minimum observed available memory number from a range.
+
+        Returns bytes of memory available, or None if the range holds no
+        samples.
+
+        See psutil for notes on how this is calculated.
+        """
+        if phase:
+            data = self.phase_usage(phase)
+        else:
+            data = self.range_usage(start, end)
+
+        values = [usage.virt.available for usage in data]
+
+        # min() raises ValueError on an empty sequence; report None instead,
+        # as aggregate_cpu_percent() does when there are no samples.
+        return min(values) if values else None
+
+    def max_memory_percent(self, start=None, end=None, phase=None):
+        """Return the maximum percentage of system memory used in a range.
+
+        Returns the largest sampled virt.percent as a float. psutil reports
+        it on a 0-100 scale, so 100.0 would mean all system memory was in use
+        at one point. Returns None if the range holds no samples.
+        """
+        if phase:
+            data = self.phase_usage(phase)
+        else:
+            data = self.range_usage(start, end)
+
+        values = [usage.virt.percent for usage in data]
+
+        # max() raises ValueError on an empty sequence; report None instead,
+        # as aggregate_cpu_percent() does when there are no samples.
+        return max(values) if values else None
+
new file mode 100644
new file mode 100644
--- /dev/null
+++ b/testing/mozbase/mozsystemmonitor/mozsystemmonitor/test/test_resource_monitor.py
@@ -0,0 +1,154 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import multiprocessing
+import tempfile
+import time
+import unittest
+
+try:
+    import psutil
+except ImportError:
+    psutil = None
+
+from mozsystemmonitor.resourcemonitor import (
+    SystemResourceMonitor,
+    SystemResourceUsage,
+)
+
+
+@unittest.skipIf(psutil is None, 'Resource monitor requires psutil.')
+class TestResourceMonitor(unittest.TestCase):
+    def test_basic(self):
+        """Samples are collected while the monitor runs."""
+        monitor = SystemResourceMonitor(poll_interval=0.5)
+
+        monitor.start()
+        time.sleep(3)
+        monitor.stop()
+
+        data = list(monitor.range_usage())
+        self.assertGreater(len(data), 3)
+
+        self.assertIsInstance(data[0], SystemResourceUsage)
+
+    def test_empty(self):
+        """Stopping before the first poll interval elapses yields no data."""
+        monitor = SystemResourceMonitor(poll_interval=2.0)
+        monitor.start()
+        monitor.stop()
+        data = list(monitor.range_usage())
+        self.assertEqual(len(data), 0)
+
+    def test_phases(self):
+        """Nested phases are recorded in the order they finish."""
+        monitor = SystemResourceMonitor(poll_interval=0.25)
+        monitor.start()
+        time.sleep(1)
+
+        with monitor.phase('phase1'):
+            time.sleep(1)
+
+            with monitor.phase('phase2'):
+                time.sleep(1)
+
+        monitor.stop()
+
+        self.assertEqual(len(monitor.phases), 2)
+        self.assertEqual(['phase2', 'phase1'], list(monitor.phases))
+
+        everything = list(monitor.range_usage())
+        data1 = list(monitor.phase_usage('phase1'))
+        data2 = list(monitor.phase_usage('phase2'))
+
+        self.assertGreater(len(everything), len(data1))
+        self.assertGreater(len(data1), len(data2))
+
+        # Both phases end almost simultaneously, so their final samples
+        # should finish within one poll interval (0.25s) of each other.
+        self.assertAlmostEqual(data1[-1].end, data2[-1].end, delta=0.25)
+
+    def test_no_data(self):
+        """A monitor that was never started reports no usage data."""
+        monitor = SystemResourceMonitor()
+        data = list(monitor.range_usage())
+        self.assertEqual(len(data), 0)
+
+    def test_events(self):
+        """Events are timestamped and can bound a usage query."""
+        monitor = SystemResourceMonitor(poll_interval=0.25)
+
+        monitor.start()
+        time.sleep(0.5)
+
+        t0 = time.time()
+        monitor.record_event('t0')
+        time.sleep(0.5)
+
+        monitor.record_event('t1')
+        time.sleep(0.5)
+        monitor.stop()
+
+        events = monitor.events
+        self.assertEqual(len(events), 2)
+
+        event = events[0]
+
+        self.assertEqual(event[1], 't0')
+        self.assertAlmostEqual(event[0], t0, delta=0.25)
+
+        data = list(monitor.between_events_usage('t0', 't1'))
+        self.assertGreater(len(data), 0)
+
+    def test_aggregate_cpu(self):
+        """CPU aggregates have the expected types and per-core length."""
+        monitor = SystemResourceMonitor(poll_interval=0.25)
+        monitor.start()
+        time.sleep(1)
+        monitor.stop()
+
+        values = monitor.aggregate_cpu_percent()
+        self.assertIsInstance(values, list)
+        self.assertEqual(len(values), multiprocessing.cpu_count())
+        for v in values:
+            self.assertIsInstance(v, float)
+
+        value = monitor.aggregate_cpu_percent(per_cpu=False)
+        self.assertIsInstance(value, float)
+
+        values = monitor.aggregate_cpu_times()
+        self.assertIsInstance(values, list)
+        self.assertGreater(len(values), 0)
+        self.assertTrue(hasattr(values[0], 'user'))
+
+        t = type(values[0])
+
+        value = monitor.aggregate_cpu_times(per_cpu=False)
+        self.assertIsInstance(value, t)
+
+    def test_aggregate_io(self):
+        """aggregate_io() returns a tuple exposing read_count."""
+        monitor = SystemResourceMonitor(poll_interval=0.25)
+        # There's really no easy way to ensure I/O occurs. For all we know
+        # reads and writes will all be serviced by the page cache.
+        monitor.start()
+        time.sleep(1.0)
+        monitor.stop()
+
+        values = monitor.aggregate_io()
+        self.assertTrue(hasattr(values, 'read_count'))
+
+    def test_memory(self):
+        """Memory helpers return the expected numeric types."""
+        monitor = SystemResourceMonitor(poll_interval=0.25)
+        monitor.start()
+        time.sleep(1.0)
+        monitor.stop()
+
+        v = monitor.min_memory_available()
+        self.assertIsInstance(v, long)
+
+        v = monitor.max_memory_percent()
+        self.assertIsInstance(v, float)
+
new file mode 100644
--- /dev/null
+++ b/testing/mozbase/mozsystemmonitor/setup.py
@@ -0,0 +1,29 @@
+# This Source Code Form is subject to the terms of the Mozilla Public
+# License, v. 2.0. If a copy of the MPL was not distributed with this
+# file, You can obtain one at http://mozilla.org/MPL/2.0/.
+
+import os
+
+from setuptools import setup
+
+PACKAGE_VERSION = '0.0'
+
+try:
+    here = os.path.dirname(os.path.abspath(__file__))
+    description = open(os.path.join(here, 'README.rst')).read()
+except (IOError, OSError):  # README missing or unreadable.
+    description = ''
+
+setup(
+    name='mozsystemmonitor',
+    description='Monitor system resource usage.',
+    long_description=description,
+    license='MPL 2.0',
+    keywords='mozilla',
+    author='Mozilla Automation and Tools Team',
+    author_email='tools@lists.mozilla.org',
+    url='https://wiki.mozilla.org/Auto-tools/Projects/Mozbase',
+    packages=['mozsystemmonitor'],
+    version=PACKAGE_VERSION,
+    install_requires=['psutil >= 0.7.1'],
+)
--- a/testing/mozbase/packages.txt
+++ b/testing/mozbase/packages.txt
@@ -6,9 +6,10 @@ mozfile.pth:testing/mozbase/mozfile
 mozhttpd.pth:testing/mozbase/mozhttpd
 mozinfo.pth:testing/mozbase/mozinfo
 mozinstall.pth:testing/mozbase/mozinstall
 mozlog.pth:testing/mozbase/mozlog
 moznetwork.pth:testing/mozbase/moznetwork
 mozprocess.pth:testing/mozbase/mozprocess
 mozprofile.pth:testing/mozbase/mozprofile
 mozrunner.pth:testing/mozbase/mozrunner
+mozsystemmonitor.pth:testing/mozbase/mozsystemmonitor
 moztest.pth:testing/mozbase/moztest