python/psutil/test/_bsd.py
author Alexandre Poirot <poirot.alex@gmail.com>
Wed, 04 Nov 2015 08:56:00 +0100
changeset 307645 f4c516bc60ee0a50885a73c8e9a97b8bb8e58162
parent 286873 2eea7ab5228dde61fdc8706ea8452df10da6dd1d
permissions -rw-r--r--
Bug 1221238 - Fix devtools filter popup inputs width on linux. r=pbrosset

#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# TODO: add test for comparing connections with 'sockstat' cmd

"""BSD specific tests.  These are implicitly run by test_psutil.py."""

import os
import subprocess
import sys
import time

import psutil

from psutil._compat import PY3
from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which,
                         retry_before_failing, reap_children, unittest)


# Memory page size reported by sysconf(3); the tests multiply the page
# counts printed by sysctl by this value before comparing with psutil.
PAGESIZE = os.sysconf("SC_PAGE_SIZE")
# muse requires root privileges, so only look it up when running as root;
# otherwise treat the tool as unavailable.
MUSE_AVAILABLE = which('muse') if os.getuid() == 0 else False


def sysctl(cmdline):
    """Run ``sysctl <cmdline>`` and return only the value of interest.

    The text following the first ": " separator is taken as the value.
    Output that carries no such separator (for instance value-only
    output) is used as-is.  The value is returned as an int when it
    parses as one, otherwise as the raw string.
    """
    output = sh("sysctl " + cmdline)
    _, separator, value = output.partition(": ")
    if not separator:
        # Without a "name: " prefix a find()-based slice would start at
        # index 1 and silently drop the first character, so keep the
        # output whole instead.
        value = output
    try:
        return int(value)
    except ValueError:
        return value


def muse(field):
    """Return the integer value 'muse' prints for the given field.

    Runs the 'muse' command line utility, picks the first output line
    that starts with *field* and converts its second whitespace-separated
    token to int.  Raises ValueError if no line matches.
    """
    for row in sh('muse').split('\n'):
        if row.startswith(field):
            return int(row.split()[1])
    raise ValueError("line not found")


@unittest.skipUnless(BSD, "not a BSD system")
class BSDSpecificTestCase(unittest.TestCase):
    """Compare psutil results against BSD command line tools.

    Reference values are obtained from ``sysctl``, ``ps``, ``df``,
    ``procstat`` and, when available, ``muse``.
    """

    @classmethod
    def setUpClass(cls):
        # One helper subprocess is spawned for the whole class; its PID is
        # the target of all the per-process tests below.
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        # NOTE(review): presumably cleans up the subprocess started in
        # setUpClass -- confirm against test_psutil.reap_children.
        reap_children()

    def test_boot_time(self):
        # sysctl() already prepends the "sysctl" command, so only the OID
        # is passed.  Keep just the integer found between " sec = " and
        # the following comma.
        s = sysctl('kern.boottime')
        s = s[s.find(" sec = ") + 7:]
        s = s[:s.find(',')]
        btime = int(s)
        self.assertEqual(btime, psutil.boot_time())

    def test_process_create_time(self):
        # Compare against the start time printed by "ps -o lstart".
        cmdline = "ps -o lstart -p %s" % self.pid
        p = subprocess.Popen(cmdline, shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0]
        if PY3:
            # communicate() returns bytes on Python 3
            output = str(output, sys.stdout.encoding)
        # drop the "STARTED" column header so only the timestamp is left
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time()
        # render psutil's timestamp with the same layout as the ps output
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            # Return (device, total, used, free) for the given path, with
            # sizes converted from the 1K blocks printed by "df -k" to
            # bytes.
            out = sh('df -k "%s"' % path).strip()
            # the first line is the header; the second one holds the data
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # 10 MB tolerance
            if abs(usage.free - free) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > 10 * 1024 * 1024:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    @retry_before_failing()
    def test_memory_maps(self):
        # Compare each "procstat -v" line with the matching psutil map
        # entry; both lists are consumed from the end.
        out = sh('procstat -v %s' % self.pid)
        maps = psutil.Process(self.pid).memory_maps(grouped=False)
        lines = out.split('\n')[1:]  # skip the header line
        while lines:
            line = lines.pop()
            fields = line.split()
            # only the start/stop addresses and the resident count are
            # checked; the first and fourth columns are ignored
            _, start, stop, _, res = fields[:5]
            pmap = maps.pop()
            self.assertEqual("%s-%s" % (start, stop), pmap.addr)
            self.assertEqual(int(res), pmap.rss)
            if not pmap.path.startswith('['):
                # the path is read from the 11th procstat column
                self.assertEqual(fields[10], pmap.path)

    def test_exe(self):
        # the executable path is the last token of the line after the
        # header in "procstat -b" output
        out = sh('procstat -b %s' % self.pid)
        self.assertEqual(psutil.Process(self.pid).exe(),
                         out.split('\n')[1].split()[-1])

    def test_cmdline(self):
        # the command line is everything after the first two tokens of
        # the line after the header in "procstat -c" output
        out = sh('procstat -c %s' % self.pid)
        self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
                         ' '.join(out.split('\n')[1].split()[2:]))

    def test_uids_gids(self):
        # tokens 2..7 of the "procstat -s" data line are read as
        # effective/real/saved uid followed by effective/real/saved gid
        out = sh('procstat -s %s' % self.pid)
        euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
        p = psutil.Process(self.pid)
        uids = p.uids()
        gids = p.gids()
        self.assertEqual(uids.real, int(ruid))
        self.assertEqual(uids.effective, int(euid))
        self.assertEqual(uids.saved, int(suid))
        self.assertEqual(gids.real, int(rgid))
        self.assertEqual(gids.effective, int(egid))
        self.assertEqual(gids.saved, int(sgid))

    # --- virtual_memory(); tests against sysctl

    def test_vmem_total(self):
        # sysctl() already prepends the "sysctl" command, so only the OID
        # is passed; the page count is converted to bytes via PAGESIZE.
        syst = sysctl("vm.stats.vm.v_page_count") * PAGESIZE
        self.assertEqual(psutil.virtual_memory().total, syst)

    @retry_before_failing()
    def test_vmem_active(self):
        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().active, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_inactive(self):
        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_wired(self):
        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_cached(self):
        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_free(self):
        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().free, syst,
                               delta=TOLERANCE)

    @retry_before_failing()
    def test_vmem_buffers(self):
        # vfs.bufspace is compared directly, without the PAGESIZE factor
        syst = sysctl("vfs.bufspace")
        self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
                               delta=TOLERANCE)

    def test_cpu_count_logical(self):
        syst = sysctl("hw.ncpu")
        self.assertEqual(psutil.cpu_count(logical=True), syst)

    # --- virtual_memory(); tests against muse

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    def test_total(self):
        num = muse('Total')
        self.assertEqual(psutil.virtual_memory().total, num)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_active(self):
        num = muse('Active')
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_inactive(self):
        num = muse('Inactive')
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_wired(self):
        num = muse('Wired')
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_cached(self):
        num = muse('Cache')
        self.assertAlmostEqual(psutil.virtual_memory().cached, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_free(self):
        num = muse('Free')
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE)

    @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
    @retry_before_failing()
    def test_buffers(self):
        num = muse('Buffer')
        self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
                               delta=TOLERANCE)


def main():
    """Run the BSD specific tests and return True if they all passed."""
    test_suite = unittest.TestSuite()
    # unittest.makeSuite() is deprecated and no longer available on recent
    # Python versions; the default loader does the same collection.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(BSDSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()

if __name__ == '__main__':
    # Signal a failed run to the caller through a non-zero exit status.
    succeeded = main()
    if not succeeded:
        sys.exit(1)