python/psutil/test/_windows.py
author Christian Holler <choller@mozilla.com>
Fri, 20 Sep 2013 13:37:53 +0200
changeset 148229 67fa64cde217038d28342a5af024129f9cc2bc28
parent 144129 037e6a08032fd65347687619ec6de9a02fe00f21
child 221195 697eb6db7d96dc21e817cd27a7e46ed4ab00f9bb
permissions -rw-r--r--
Bug 917242 - Symbolize ASan traces in automation. r=ted

#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""Windows specific tests.  These are implicitly run by test_psutil.py."""

import os
import unittest
import platform
import signal
import time
import sys
import subprocess
import errno
import traceback

import psutil
import _psutil_mswindows
from psutil._compat import PY3, callable, long
from test_psutil import *

# Optional third-party dependencies.  Each import is attempted at module
# load time; on failure a warning is registered (register_warning is
# presumably provided by the "from test_psutil import *" above) and the
# module name is bound to None so that the test class below can decide
# whether to define the tests which need it.
try:
    import wmi
except ImportError:
    # sys.exc_info() is used to fetch the exception instance instead of
    # binding it in the except clause.
    err = sys.exc_info()[1]
    register_warning("Couldn't run wmi tests: %s" % str(err))
    wmi = None
try:
    import win32api
    import win32con
except ImportError:
    err = sys.exc_info()[1]
    register_warning("Couldn't run pywin32 tests: %s" % str(err))
    # Only win32api is reset here; it is the name checked later on to
    # decide whether the pywin32 based tests get defined.
    win32api = None


class WindowsSpecificTestCase(unittest.TestCase):
    """Windows-only tests comparing psutil against the OS, WMI and pywin32.

    A helper subprocess is started before every test and its pid is
    stored in ``self.pid``.  The WMI based tests are only defined when
    the ``wmi`` module could be imported, the handle-count tests only
    when ``win32api`` could be imported.
    """

    def setUp(self):
        # Start a helper subprocess and remember its pid for the test.
        sproc = get_test_subprocess()
        wait_for_pid(sproc.pid)
        self.pid = sproc.pid

    def tearDown(self):
        reap_children()

    def test_issue_24(self):
        # Killing pid 0 must be refused with AccessDenied.
        p = psutil.Process(0)
        self.assertRaises(psutil.AccessDenied, p.kill)

    def test_special_pid(self):
        p = psutil.Process(4)
        self.assertEqual(p.name, 'System')
        # use __str__ to access all common Process properties to check
        # that nothing strange happens
        str(p)
        p.username
        self.assertTrue(p.create_time >= 0.0)
        try:
            rss, vms = p.get_memory_info()
        except psutil.AccessDenied:
            # expected on Windows Vista and Windows 7
            # NOTE(review): platform.uname()[1] is the network node name,
            # not the OS release; this presumably matches the hostnames
            # of specific test machines -- confirm before changing.
            if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
                raise
        else:
            self.assertTrue(rss > 0)

    def test_signal(self):
        # SIGINT is expected to be rejected with ValueError.
        p = psutil.Process(self.pid)
        self.assertRaises(ValueError, p.send_signal, signal.SIGINT)

    def test_nic_names(self):
        # Every NIC name reported by psutil must show up in the output
        # of "ipconfig /all" (pseudo interfaces excluded).
        p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
        out = p.communicate()[0]
        if PY3:
            out = str(out, sys.stdout.encoding)
        nics = psutil.net_io_counters(pernic=True).keys()
        for nic in nics:
            if "pseudo-interface" in nic.replace(' ', '-').lower():
                continue
            if nic not in out:
                self.fail("%r nic wasn't found in 'ipconfig /all' output" % nic)

    def test_exe(self):
        # The basename of the executable must match the process name;
        # processes raising a psutil error are deliberately skipped.
        for p in psutil.process_iter():
            try:
                self.assertEqual(os.path.basename(p.exe), p.name)
            except psutil.Error:
                pass

    if wmi is not None:

        # --- Process class tests

        def test_process_name(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.name, w.Caption)

        def test_process_exe(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            self.assertEqual(p.exe, w.ExecutablePath)

        def test_process_cmdline(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            # WMI keeps the double quotes of the original command line,
            # so they are stripped before comparing.
            self.assertEqual(' '.join(p.cmdline), w.CommandLine.replace('"', ''))

        def test_process_username(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            domain, _, username = w.GetOwner()
            username = "%s\\%s" % (domain, username)
            self.assertEqual(p.username, username)

        def test_process_rss_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            rss = p.get_memory_info().rss
            self.assertEqual(rss, int(w.WorkingSetSize))

        def test_process_vms_memory(self):
            time.sleep(0.1)
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            vms = p.get_memory_info().vms
            # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
            # ...claims that PageFileUsage is represented in Kilo
            # bytes but funnily enough on certain platforms bytes are
            # returned instead.
            wmi_usage = int(w.PageFileUsage)
            if (vms != wmi_usage) and (vms != wmi_usage * 1024):
                self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))

        def test_process_create_time(self):
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(self.pid)
            # Both timestamps are compared as YYYYmmddHHMMSS strings; the
            # part of the WMI value after the first '.' is dropped.
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            self.assertEqual(wmic_create, psutil_create)


        # --- psutil namespace functions and constants tests

        # The variable lives in the process environment, not on the os
        # module, so it has to be looked up in os.environ (a hasattr()
        # check on os would always skip this test).
        @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
                             'NUMBER_OF_PROCESSORS env var is not available')
        def test_NUM_CPUS(self):
            num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
            self.assertEqual(num_cpus, psutil.NUM_CPUS)

        def test_TOTAL_PHYMEM(self):
            w = wmi.WMI().Win32_ComputerSystem()[0]
            self.assertEqual(int(w.TotalPhysicalMemory), psutil.TOTAL_PHYMEM)

        def test__UPTIME(self):
            # _UPTIME constant is not public but it is used internally
            # as value to return for pid 0 creation time.
            # WMI behaves the same.
            w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
            p = psutil.Process(0)
            wmic_create = str(w.CreationDate.split('.')[0])
            psutil_create = time.strftime("%Y%m%d%H%M%S",
                                          time.localtime(p.create_time))
            # XXX - ? no actual test here: the two values are computed
            # (so the calls above must not raise) but never compared.

        def test_get_pids(self):
            # Note: this test might fail if the OS is starting/killing
            # other processes in the meantime
            w = wmi.WMI().Win32_Process()
            wmi_pids = [x.ProcessId for x in w]
            wmi_pids.sort()
            psutil_pids = psutil.get_pid_list()
            psutil_pids.sort()
            if wmi_pids != psutil_pids:
                # List comprehensions instead of filter() + filter():
                # on Python 3 filter() returns an iterator, which does
                # not support "+".
                wmi_set = set(wmi_pids)
                psutil_set = set(psutil_pids)
                difference = \
                    [x for x in psutil_pids if x not in wmi_set] + \
                    [x for x in wmi_pids if x not in psutil_set]
                self.fail("difference: " + str(difference))

        def test_disks(self):
            ps_parts = psutil.disk_partitions(all=True)
            wmi_parts = wmi.WMI().Win32_LogicalDisk()
            for ps_part in ps_parts:
                for wmi_part in wmi_parts:
                    if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
                        if not ps_part.mountpoint:
                            # this is usually a CD-ROM with no disk inserted
                            break
                        try:
                            usage = psutil.disk_usage(ps_part.mountpoint)
                        except OSError:
                            err = sys.exc_info()[1]
                            if err.errno == errno.ENOENT:
                                # usually this is the floppy
                                break
                            else:
                                raise
                        self.assertEqual(usage.total, int(wmi_part.Size))
                        wmi_free = int(wmi_part.FreeSpace)
                        # NOTE(review): this exact comparison makes the
                        # tolerance check below unreachable; kept as is
                        # to avoid loosening the test -- confirm intent.
                        self.assertEqual(usage.free, wmi_free)
                        # 10 MB tolerance
                        if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
                            self.fail("psutil=%s, wmi=%s"
                                      % (usage.free, wmi_free))
                        break
                else:
                    # inner loop finished without a matching WMI disk
                    self.fail("can't find partition %s" % repr(ps_part))

    if win32api is not None:

        def test_get_num_handles(self):
            p = psutil.Process(os.getpid())
            before = p.get_num_handles()
            handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
                                          win32con.FALSE, os.getpid())
            try:
                after = p.get_num_handles()
                self.assertEqual(after, before+1)
            finally:
                # always release the handle, even if the assertion
                # above fails, so it does not leak into other tests
                win32api.CloseHandle(handle)
            self.assertEqual(p.get_num_handles(), before)

        def test_get_num_handles_2(self):
            # Note: this fails from time to time; I'm keen on thinking
            # it doesn't mean something is broken
            def call(p, attr_name):
                # Fetch the attribute (this already evaluates
                # properties) and invoke it when it is callable.
                attr = getattr(p, attr_name, None)
                if attr is not None and callable(attr):
                    attr()

            # Names which are not exercised by this test.
            excluded = ('terminate', 'kill', 'suspend', 'resume', 'nice',
                        'send_signal', 'wait', 'get_children', 'as_dict')
            p = psutil.Process(self.pid)
            failures = []
            for name in dir(psutil.Process):
                if name.startswith(('_', 'set_')) or name in excluded:
                    continue
                try:
                    # The handle count is sampled after a first and
                    # after a second call; it must not grow in between.
                    call(p, name)
                    num1 = p.get_num_handles()
                    call(p, name)
                    num2 = p.get_num_handles()
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    pass
                else:
                    if num2 > num1:
                        fail = "failure while processing Process.%s method " \
                               "(before=%s, after=%s)" % (name, num1, num2)
                        failures.append(fail)
            if failures:
                self.fail('\n' + '\n'.join(failures))


import _psutil_mswindows
from psutil._psmswindows import ACCESS_DENIED_SET

def wrap_exceptions(callable):
    """Return a wrapper around *callable* translating OSError.

    The wrapper forwards all of its arguments to *callable* and returns
    its result.  An OSError whose errno is in ACCESS_DENIED_SET is
    turned into psutil.AccessDenied, one whose errno is ESRCH into
    psutil.NoSuchProcess; any other OSError is re-raised unchanged.
    """
    def wrapper(self, *args, **kwargs):
        try:
            result = callable(self, *args, **kwargs)
        except OSError:
            code = sys.exc_info()[1].errno
            if code in ACCESS_DENIED_SET:
                raise psutil.AccessDenied(None, None)
            elif code == errno.ESRCH:
                raise psutil.NoSuchProcess(None, None)
            else:
                # not an error we know how to translate
                raise
        else:
            return result
    return wrapper

class TestDualProcessImplementation(unittest.TestCase):
    """Cross-check paired implementations found in _psutil_mswindows.

    For every entry of ``fun_names`` the tests look up ``<name>`` and
    ``<name>_2`` on the ``_psutil_mswindows`` module.
    """

    # (function name, tolerance) pairs; the tolerance is the largest
    # absolute difference accepted between the two implementations.
    fun_names = [
        ('get_process_cpu_times', 0.2),
        ('get_process_create_time', 0.5),
        # 1 because impl #1 opens a handle
        ('get_process_num_handles', 1),
        ('get_process_io_counters', 0),
        # KB
        ('get_process_memory_info', 1024),
    ]

    def test_compare_values(self):
        # Certain APIs on Windows have 2 internal implementations: one
        # based on documented Windows APIs and another one based on
        # NtQuerySystemInformation(), used as a fallback when the first
        # one fails because of limited permissions.
        # Here the two are run against every process and their results
        # are compared (within the per-function tolerance), see:
        # http://code.google.com/p/psutil/issues/detail?id=304
        def check_non_negative(result):
            # Accepts a plain number or a tuple of numbers.
            if isinstance(result, (int, long, float)):
                self.assertGreaterEqual(result, 0)
            elif isinstance(result, tuple):
                for item in result:
                    self.assertGreaterEqual(item, 0)
            else:
                assert 0  # case not handled which needs to be fixed

        def check_close(first, second, tolerance):
            # Identical results need no further inspection.
            if first == second:
                return
            if isinstance(second, tuple):
                for a, b in zip(first, second):
                    self.assertLessEqual(abs(a - b), tolerance)
            elif isinstance(second, (int, long, float)):
                self.assertLessEqual(abs(first - second), tolerance)

        own_pid = os.getpid()
        problems = []
        for name, tolerance in self.fun_names:
            first_impl = wrap_exceptions(getattr(_psutil_mswindows, name))
            second_impl = wrap_exceptions(
                getattr(_psutil_mswindows, name + '_2'))
            # The memory info comparison is not run against this very
            # process (presumably because its own memory usage changes
            # between the two calls -- confirm).
            skip_self = (name == 'get_process_memory_info')
            for proc in psutil.process_iter():
                if skip_self and proc.pid == own_pid:
                    continue

                # first implementation: may legitimately be denied
                try:
                    ret1 = first_impl(proc.pid)
                except psutil.NoSuchProcess:
                    continue
                except psutil.AccessDenied:
                    ret1 = None

                # second implementation: this is supposed to fail only
                # in case of zombie process, never for permission error
                try:
                    ret2 = second_impl(proc.pid)
                except psutil.NoSuchProcess:
                    continue

                # compare values
                try:
                    if ret1 is not None:
                        check_close(ret1, ret2, tolerance)
                        check_non_negative(ret1)
                    check_non_negative(ret2)
                except AssertionError:
                    details = (traceback.format_exc(), proc.pid, name,
                               ret1, ret2)
                    problems.append(
                        '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r'
                        % details)
                    # one failure per function is enough
                    break
        if problems:
            self.fail('\n\n'.join(problems))

    def test_zombies(self):
        # test that NoSuchProcess is raised in case a process no longer
        # exists; a pid well above every pid currently in use stands in
        # for such a process.
        # NOTE(review): the original comment says "2nd implementation"
        # but the function looked up here is the one without the "_2"
        # suffix -- confirm which one is meant.
        bogus_pid = max(psutil.get_pid_list()) + 5000
        for entry in self.fun_names:
            name = entry[0]
            wrapped = wrap_exceptions(getattr(_psutil_mswindows, name))
            self.assertRaises(psutil.NoSuchProcess, wrapped, bogus_pid)


def test_main():
    """Run both test cases of this module with a verbose text runner.

    Returns True if every test succeeded, False otherwise.
    """
    # loadTestsFromTestCase() is used instead of unittest.makeSuite(),
    # which is deprecated and no longer available in Python 3.13; both
    # collect the "test*" methods of the given TestCase class.
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    test_suite.addTest(loader.loadTestsFromTestCase(WindowsSpecificTestCase))
    test_suite.addTest(
        loader.loadTestsFromTestCase(TestDualProcessImplementation))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()

if __name__ == '__main__':
    # When run as a script, report a failed test run through a non-zero
    # exit status.
    succeeded = test_main()
    if not succeeded:
        sys.exit(1)