python/psutil/test/test_memory_leaks.py
author Alexandre Poirot <poirot.alex@gmail.com>
Wed, 04 Nov 2015 08:56:00 +0100
changeset 307645 f4c516bc60ee0a50885a73c8e9a97b8bb8e58162
parent 286873 2eea7ab5228dde61fdc8706ea8452df10da6dd1d
permissions -rw-r--r--
Bug 1221238 - Fix devtools filter popup inputs width on linux. r=pbrosset

#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
A test script which attempts to detect memory leaks by calling C
functions many times and comparing process memory usage before and
after the calls.  It might produce false positives.
"""

import functools
import gc
import os
import socket
import sys
import threading
import time

import psutil
import psutil._common

from psutil._compat import xrange, callable
from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN,
                         RLIMIT_SUPPORT, TRAVIS)
from test_psutil import (reap_children, supports_ipv6, safe_remove,
                         get_test_subprocess)

if sys.version_info < (2, 7):
    import unittest2 as unittest  # https://pypi.python.org/pypi/unittest2
else:
    import unittest


# Size of one measurement round; Base.execute() performs LOOPS - 1
# calls in each batch before sampling memory.
LOOPS = 1000
# Largest increase between the two batch samples that Base.execute()
# accepts without running its extended re-check.
# NOTE(review): presumably expressed in bytes of RSS -- confirm against
# the psutil memory_info() documentation.
TOLERANCE = 4096
# When true, tests whose skip reason says the implementation is pure
# python on the current platform are skipped.
SKIP_PYTHON_IMPL = True


def skip_if_linux():
    """Return a unittest decorator that skips the decorated test when
    both LINUX and SKIP_PYTHON_IMPL are true.
    """
    should_skip = LINUX and SKIP_PYTHON_IMPL
    reason = "not worth being tested on LINUX (pure python)"
    return unittest.skipIf(should_skip, reason)


class Base(unittest.TestCase):
    """Common machinery for the leak tests.

    Subclasses implement call(); execute() invokes it repeatedly and
    compares the memory samples returned by get_mem().
    """

    # Process object the subclasses resolve method names against.
    proc = psutil.Process()

    def execute(self, function, *args, **kwargs):
        """Call *function* (through self.call) many times and fail the
        test if the sampled memory keeps growing.

        Two batches of LOOPS - 1 calls are run; if the second sample
        exceeds the first by more than TOLERANCE, calls continue for 3
        more seconds and the test fails when the final sample is higher
        than the second one.
        """
        def sample_after_batch():
            # Run one batch, drop the loop variable, collect garbage and
            # only then take the memory sample.
            for i in xrange(LOOPS - 1):
                self.call(function, *args, **kwargs)
            del i
            gc.collect()
            return self.get_mem()

        # A single call up front, followed by sanity checks: no
        # uncollectable garbage and no extra threads left around.
        self.call(function, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)

        first_sample = sample_after_batch()
        second_sample = sample_after_batch()
        if second_sample - first_sample <= TOLERANCE:
            return

        # Growth above the tolerance is not treated as a leak yet: the
        # assumption is that memory usage has stabilized by now, so keep
        # calling for 3 more seconds and fail only on a further increase.
        deadline = time.time() + 3
        self.call(function, *args, **kwargs)
        while time.time() < deadline:
            self.call(function, *args, **kwargs)
        del deadline
        gc.collect()
        third_sample = self.get_mem()
        if third_sample > second_sample:
            self.fail("rss2=%s, rss3=%s, difference=%s"
                      % (second_sample, third_sample,
                         third_sample - second_sample))

    def execute_w_exc(self, exc, function, *args, **kwargs):
        """Like execute(), but passes *exc* along as the '_exc' keyword
        so that call() can expect that exception.
        """
        self.execute(function, *args, **dict(kwargs, _exc=exc))

    def get_mem(self):
        """Return the first field of memory_info() for the current
        process (execute() treats it as the RSS sample).
        """
        meminfo = psutil.Process().memory_info()
        return meminfo[0]

    def call(self, function, *args, **kwargs):
        """Perform one invocation of *function*; subclasses override."""
        raise NotImplementedError("must be implemented in subclass")


class TestProcessObjectLeaks(Base):
    """Test leaks of Process class methods and properties"""

    # NOTE: test methods deliberately carry no docstrings; unittest would
    # print them instead of the test name in verbose output.

    def setUp(self):
        gc.collect()

    def tearDown(self):
        reap_children()

    def call(self, function, *args, **kwargs):
        """Invoke *function* once.

        *function* is either a callable or the name of an attribute of
        self.proc. If an '_exc' keyword is present it is removed and
        that exception is required to be raised (assertRaises);
        otherwise psutil.Error is silently ignored.
        """
        if callable(function):
            target = function
        else:
            target = getattr(self.proc, function)
        if '_exc' in kwargs:
            exc = kwargs.pop('_exc')
            self.assertRaises(exc, target, *args, **kwargs)
        else:
            try:
                target(*args, **kwargs)
            except psutil.Error:
                pass

    @skip_if_linux()
    def test_name(self):
        self.execute('name')

    @skip_if_linux()
    def test_cmdline(self):
        self.execute('cmdline')

    @skip_if_linux()
    def test_exe(self):
        self.execute('exe')

    @skip_if_linux()
    def test_ppid(self):
        self.execute('ppid')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_uids(self):
        self.execute('uids')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_gids(self):
        self.execute('gids')

    @skip_if_linux()
    def test_status(self):
        self.execute('status')

    def test_nice_get(self):
        self.execute('nice')

    def test_nice_set(self):
        # Re-apply the current niceness so the process is left unchanged.
        niceness = psutil.Process().nice()
        self.execute('nice', niceness)

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_get(self):
        self.execute('ionice')

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_set(self):
        if WINDOWS:
            value = psutil.Process().ionice()
            self.execute('ionice', value)
        else:
            from psutil._pslinux import cext
            self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
            # Also cover the failure path: this call is expected to
            # raise OSError.
            fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, fun)

    @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
    @skip_if_linux()
    def test_io_counters(self):
        self.execute('io_counters')

    @unittest.skipUnless(WINDOWS, "not worth being tested on posix")
    def test_username(self):
        self.execute('username')

    @skip_if_linux()
    def test_create_time(self):
        self.execute('create_time')

    @skip_if_linux()
    def test_num_threads(self):
        self.execute('num_threads')

    @unittest.skipUnless(WINDOWS, "Windows only")
    def test_num_handles(self):
        self.execute('num_handles')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_num_fds(self):
        self.execute('num_fds')

    @skip_if_linux()
    def test_threads(self):
        self.execute('threads')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_memory_info(self):
        self.execute('memory_info')

    @skip_if_linux()
    def test_memory_info_ex(self):
        self.execute('memory_info_ex')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_terminal(self):
        self.execute('terminal')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_resume(self):
        self.execute('resume')

    @skip_if_linux()
    def test_cwd(self):
        self.execute('cwd')

    @unittest.skipUnless(WINDOWS or LINUX or BSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_get(self):
        self.execute('cpu_affinity')

    @unittest.skipUnless(WINDOWS or LINUX or BSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_set(self):
        # Re-apply the current affinity so the process is left unchanged.
        affinity = psutil.Process().cpu_affinity()
        self.execute('cpu_affinity', affinity)
        if not TRAVIS:
            self.execute_w_exc(ValueError, 'cpu_affinity', [-1])

    @skip_if_linux()
    def test_open_files(self):
        safe_remove(TESTFN)  # needed after UNIX socket test has run
        with open(TESTFN, 'w'):
            self.execute('open_files')

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "OSX implementation is too slow")
    @skip_if_linux()
    def test_memory_maps(self):
        self.execute('memory_maps')

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_get(self):
        self.execute('rlimit', psutil.RLIMIT_NOFILE)

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_set(self):
        # Re-apply the current limit so the process is left unchanged.
        limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
        self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
        self.execute_w_exc(OSError, 'rlimit', -1)

    @skip_if_linux()
    # Windows implementation is based on a single system-wide function
    @unittest.skipIf(WINDOWS, "tested later")
    def test_connections(self):
        socks = []

        def open_socket(family, socktype, address):
            sock = socket.socket(family, socktype)
            # Register the socket before bind()/listen() so that the
            # ``finally`` clause below closes it even if those calls fail.
            socks.append(sock)
            sock.bind(address)
            if socktype == socket.SOCK_STREAM:
                sock.listen(1)

        try:
            open_socket(socket.AF_INET, socket.SOCK_STREAM, ('', 0))
            open_socket(socket.AF_INET, socket.SOCK_DGRAM, ('', 0))
            if supports_ipv6():
                open_socket(socket.AF_INET6, socket.SOCK_STREAM, ('', 0))
                open_socket(socket.AF_INET6, socket.SOCK_DGRAM, ('', 0))
            if hasattr(socket, 'AF_UNIX'):
                safe_remove(TESTFN)
                open_socket(socket.AF_UNIX, socket.SOCK_STREAM, TESTFN)
            kind = 'all'
            # TODO: UNIX sockets are temporarily implemented by parsing
            # 'pfiles' cmd  output; we don't want that part of the code to
            # be executed.
            if SUNOS:
                kind = 'inet'
            self.execute('connections', kind=kind)
        finally:
            for sock in socks:
                sock.close()


# Spawn a helper subprocess, wrap it in a psutil.Process and kill it
# right away; DEAD_PROC is the resulting Process object for a process
# that has already been killed and waited for.
_subproc = get_test_subprocess()
DEAD_PROC = psutil.Process(_subproc.pid)
DEAD_PROC.kill()
DEAD_PROC.wait()
del _subproc


class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
    """Run the Process leak tests against DEAD_PROC, a process that
    has already been killed, tolerating the NoSuchProcess exception.
    """
    proc = DEAD_PROC

    def call(self, *args, **kwargs):
        # Delegate to the parent implementation; NoSuchProcess is an
        # accepted outcome here and is swallowed.
        try:
            super(TestProcessObjectLeaksZombie, self).call(*args, **kwargs)
        except psutil.NoSuchProcess:
            pass

    # Extra tests which are only defined when not running on POSIX.
    if not POSIX:
        def test_kill(self):
            self.execute('kill')

        def test_resume(self):
            self.execute('resume')

        def test_suspend(self):
            self.execute('suspend')

        def test_terminate(self):
            self.execute('terminate')

        def test_wait(self):
            self.execute('wait')


class TestModuleFunctionsLeaks(Base):
    """Test leaks of psutil module functions."""

    # NOTE: test methods deliberately carry no docstrings; unittest would
    # print them instead of the test name in verbose output.

    def setUp(self):
        gc.collect()

    def call(self, function, *args, **kwargs):
        """Look up *function* by name on the psutil module and call it
        with the given arguments.
        """
        fun = getattr(psutil, function)
        fun(*args, **kwargs)

    def _execute_cpu_count(self, implementation):
        """Run the leak check on *implementation* exposed as
        psutil.cpu_count.

        call() resolves functions by name on the psutil module, so the
        platform function is temporarily installed under that name. The
        original attribute is always restored afterwards so the public
        psutil.cpu_count is not left replaced for the rest of the
        process.
        """
        original = psutil.cpu_count
        psutil.cpu_count = implementation
        try:
            self.execute('cpu_count')
        finally:
            psutil.cpu_count = original

    @skip_if_linux()
    def test_cpu_count_logical(self):
        self._execute_cpu_count(psutil._psplatform.cpu_count_logical)

    @skip_if_linux()
    def test_cpu_count_physical(self):
        self._execute_cpu_count(psutil._psplatform.cpu_count_physical)

    @skip_if_linux()
    def test_boot_time(self):
        self.execute('boot_time')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute('pid_exists', os.getpid())

    def test_virtual_memory(self):
        self.execute('virtual_memory')

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "not worth being tested on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute('swap_memory')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_per_cpu_times(self):
        self.execute('cpu_times', percpu=True)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute('disk_usage', '.')

    def test_disk_partitions(self):
        self.execute('disk_partitions')

    @skip_if_linux()
    def test_net_io_counters(self):
        self.execute('net_io_counters')

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @skip_if_linux()
    def test_disk_io_counters(self):
        self.execute('disk_io_counters')

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
    def test_users(self):
        self.execute('users')

    @unittest.skipIf(LINUX,
                     "not worth being tested on Linux (pure python)")
    def test_net_connections(self):
        self.execute('net_connections')

    def test_net_if_addrs(self):
        self.execute('net_if_addrs')

    @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute('net_if_stats')


def main():
    """Run all leak test cases with a verbose text runner.

    Returns True if every test passed, False otherwise.
    """
    # loadTestsFromTestCase() is used instead of unittest.makeSuite(),
    # which is deprecated and no longer available in recent Python
    # versions.
    loader = unittest.defaultTestLoader
    test_suite = unittest.TestSuite()
    tests = [TestProcessObjectLeaksZombie,
             TestProcessObjectLeaks,
             TestModuleFunctionsLeaks]
    for test in tests:
        test_suite.addTest(loader.loadTestsFromTestCase(test))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()

if __name__ == '__main__':
    # Exit with status 1 when the test run was not successful.
    if not main():
        raise SystemExit(1)