author Alexandre Poirot <>
Wed, 04 Nov 2015 08:56:00 +0100
changeset 307645 f4c516bc60ee0a50885a73c8e9a97b8bb8e58162
parent 286873 2eea7ab5228dde61fdc8706ea8452df10da6dd1d
permissions -rw-r--r--
Bug 1221238 - Fix devtools filter popup inputs width on linux. r=pbrosset

#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""POSIX specific tests.  These are implicitly run by"""

import datetime
import os
import subprocess
import sys
import time

import psutil

from psutil._compat import PY3, callable
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
from test_psutil import (get_test_subprocess, skip_on_access_denied,
                         retry_before_failing, reap_children, sh, unittest,
                         get_kernel_version, wait_for_pid)

def ps(cmd):
    """Run a ``ps`` command and return the single value it reports.

    Expects a ps command with a -o argument and parses the result,
    returning only the value of interest.

    The command is adapted to the platform before being executed:
    ``--no-headers`` is dropped when not on Linux, and on SunOS the
    ``command`` / ``start`` format names are replaced by ``comm`` /
    ``stime``.

    Returns the value as an ``int`` when the output is numeric, otherwise
    the stripped output string.
    """
    if not LINUX:
        cmd = cmd.replace(" --no-headers ", " ")
    if SUNOS:
        cmd = cmd.replace("-o command", "-o comm")
        cmd = cmd.replace("-o start", "-o stime")
    p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
    output = p.communicate()[0].strip()
    if PY3:
        # communicate() returns bytes on Python 3
        output = str(output, sys.stdout.encoding)
    if not LINUX:
        # without --no-headers the first line is the column header, so the
        # value of interest is on the second line
        output = output.split('\n')[1].strip()
    try:
        return int(output)
    except ValueError:
        # non numeric value (user name, command line, ...)
        return output

@unittest.skipUnless(POSIX, "not a POSIX system")
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility.

    A Python child process is spawned once for the whole class and its pid
    is stored in ``cls.pid``; most tests query that pid through both ``ps``
    and psutil and expect the two answers to agree.
    """

    # NOTE(review): several statements in this class (the ``self.pid``
    # operands, the psutil accessor calls, the decorators and a few
    # ``try`` / ``else`` lines) were truncated in the reviewed copy and have
    # been restored from context (surrounding comments and otherwise unused
    # imports) -- confirm against upstream psutil.

    @classmethod
    def setUpClass(cls):
        """Spawn the child process inspected by the tests."""
        cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                      stdin=subprocess.PIPE).pid
        wait_for_pid(cls.pid)

    @classmethod
    def tearDownClass(cls):
        """Terminate the child process spawned by setUpClass."""
        reap_children()

    # for ps -o arguments see the ps(1) manual page

    def test_process_parent_pid(self):
        """Parent pid reported by ps and by psutil must match."""
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        """User id reported by ps and by psutil must match."""
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        """Real group id reported by ps and by psutil must match."""
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        """User name reported by ps and by psutil must match."""
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    @skip_on_access_denied()
    @retry_before_failing()
    def test_process_rss_memory(self):
        """RSS reported by ps must match psutil's value divided by 1024."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_before_failing()
    def test_process_vsz_memory(self):
        """VSZ reported by ps must match psutil's value divided by 1024."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        """Process name from ps and from psutil must match (lowercased)."""
        # use command + arg since "comm" keyword not supported on all platforms
        name_ps = ps("ps --no-headers -o command -p %s" % (
                     self.pid)).split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        self.assertEqual(name_ps, name_psutil)

    @unittest.skipIf(OSX or BSD,
                     'ps -o start not available')
    def test_process_create_time(self):
        """Start time shown by ps must match psutil's creation time."""
        time_ps = ps(
            "ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time()
        # NOTE(review): assumes ps prints the start time as HH:MM:SS
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we check
        # for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_process_exe(self):
        """Executable path from ps and from psutil must match."""
        ps_pathname = ps("ps --no-headers -o command -p %s" %
                         self.pid).split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            # Truncate the psutil pathname (not the ps one, which made the
            # check always pass) to the length of what ps reported.
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    def test_process_cmdline(self):
        """Command line from ps and from psutil must match."""
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        if SUNOS:
            # ps on Solaris only shows the first part of the cmdline
            psutil_cmdline = psutil_cmdline.split(" ")[0]
        self.assertEqual(ps_cmdline, psutil_cmdline)

    @retry_before_failing()
    def test_pids(self):
        """The pids listed by 'ps ax' must be the ones psutil returns."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "ax"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        pids_ps = []
        # the first line is the header
        for line in output.split('\n')[1:]:
            if line:
                pid = int(line.split()[0].strip())
                pids_ps.append(pid)
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.pids()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
    @unittest.skipIf(TRAVIS, "test not reliable on Travis")
    def test_nic_names(self):
        """Every NIC known to psutil must show up in 'ifconfig -a'."""
        p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        for nic in psutil.net_io_counters(pernic=True).keys():
            for line in output.split():
                if line.startswith(nic):
                    break
            else:
                # no token of the ifconfig output starts with this name
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    @retry_before_failing()
    def test_users(self):
        """Users and terminals listed by 'who' must match psutil.users()."""
        out = sh("who")
        # skip blank lines so that an empty "who" output does not raise
        # IndexError below
        lines = [x for x in out.split('\n') if x.strip()]
        users = [x.split()[0] for x in lines]
        self.assertEqual(len(users), len(psutil.users()))
        terminals = [x.split()[1] for x in lines]
        for u in psutil.users():
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)

    def test_fds_open(self):
        """Calling Process methods must not leak file descriptors."""
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(proc, name):
            # invoke Process.<name> when it is callable; plain attributes
            # are only looked up
            attr = getattr(proc, name, None)
            if attr is not None and callable(attr):
                args = ()
                if name == 'rlimit':
                    args = (psutil.RLIMIT_NOFILE,)
                attr(*args)

        p = psutil.Process(os.getpid())
        failures = []
        ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
                         'send_signal', 'wait', 'children', 'as_dict']
        # NOTE(review): presumably these methods are not usable on older
        # kernels -- confirm the names against upstream
        if LINUX and get_kernel_version() < (2, 6, 36):
            ignored_names.append('rlimit')
        if LINUX and get_kernel_version() < (2, 6, 23):
            ignored_names.append('num_ctx_switches')
        for name in dir(psutil.Process):
            if name.startswith('_') or name in ignored_names:
                continue
            try:
                num1 = p.num_fds()
                for x in range(2):
                    call(p, name)
                num2 = p.num_fds()
            except psutil.AccessDenied:
                continue
            # tolerate a difference of one descriptor
            if abs(num2 - num1) > 1:
                fail = "failure while processing Process.%s method " \
                       "(before=%s, after=%s)" % (name, num1, num2)
                failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))

def main():
    """Run all PosixSpecificTestCase tests with a verbose text runner.

    Returns True if every test passed, False otherwise.
    """
    test_suite = unittest.TestSuite()
    # without this the suite is empty and the run trivially succeeds
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            PosixSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()

if __name__ == '__main__':
    # exit with a non-zero status when at least one test failed
    if not main():
        sys.exit(1)