123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """POSIX specific tests."""
- import datetime
- import errno
- import os
- import re
- import subprocess
- import time
- import unittest
- import psutil
- from psutil import AIX
- from psutil import BSD
- from psutil import LINUX
- from psutil import MACOS
- from psutil import OPENBSD
- from psutil import POSIX
- from psutil import SUNOS
- from psutil.tests import HAS_NET_IO_COUNTERS
- from psutil.tests import PYTHON_EXE
- from psutil.tests import PsutilTestCase
- from psutil.tests import mock
- from psutil.tests import retry_on_failure
- from psutil.tests import sh
- from psutil.tests import skip_on_access_denied
- from psutil.tests import spawn_testproc
- from psutil.tests import terminate
- from psutil.tests import which
- if POSIX:
- import mmap
- import resource
- from psutil._psutil_posix import getpagesize
def ps(fmt, pid=None):
    """Run the ``ps`` command asking for the single output column *fmt*.

    Offers a little bit of cross-platform support for a narrow range
    of features. With *pid* set to None every process is listed and a
    list of values is returned; otherwise only the value for that pid
    is returned. Values which look like integers are converted to int,
    anything else is returned as a stripped string.
    """
    def _coerce(value):
        # Turn numeric columns into int, leave the rest as text.
        value = value.strip()
        try:
            return int(value)
        except ValueError:
            return value

    argv = ['ps']
    if LINUX:
        argv.append('--no-headers')
    if pid is None:
        # List all processes; the flag differs between platforms.
        argv.append('-A' if (SUNOS or AIX) else 'ax')
    else:
        argv += ['-p', str(pid)]
    if SUNOS:
        fmt = fmt.replace("start", "stime")
    argv += ['-o', fmt]

    rows = sh(argv).splitlines()
    if not LINUX:
        # The header was not suppressed on the command line: drop it.
        rows = rows[1:]
    parsed = [_coerce(row) for row in rows]
    return parsed if pid is None else parsed[0]
# ps "-o" field names differ wildly between platforms.
# "comm" means "only executable name" but is not available on BSD platforms.
# "args" means "command with all its arguments", and is also not available
# on BSD platforms.
# "command" is like "args" on most platforms, but like "comm" on AIX,
# and not available on SUNOS.
# So for the executable name we can use "comm" on Solaris and split "command"
# on other platforms.
# To get the cmdline (with args) we have to use "args" on AIX and
# Solaris, and can use "command" on all others.
def ps_name(pid):
    """Return the first whitespace-separated token of the ps column
    describing the executable of *pid* ("comm" on SUNOS, "command"
    elsewhere).
    """
    column = "comm" if SUNOS else "command"
    tokens = ps(column, pid).split()
    return tokens[0]
def ps_args(pid):
    """Return the command line of *pid* as a single stripped string,
    read from the "args" ps column on AIX and SUNOS and from "command"
    elsewhere.
    """
    column = "args" if (AIX or SUNOS) else "command"
    raw = ps(column, pid)
    # observed on BSD + Github CI: '/usr/local/bin/python3 -E -O (python3.9)'
    # Drop that trailing parenthesized interpreter name.
    cleaned = re.sub(r"\(python.*?\)$", "", raw)
    return cleaned.strip()
def ps_rss(pid):
    """Return the value of the ps "rss" column ("rssize" on AIX) for
    *pid*.
    """
    column = "rssize" if AIX else "rss"
    return ps(column, pid)
def ps_vsz(pid):
    """Return the value of the ps "vsz" column ("vsize" on AIX) for
    *pid*.
    """
    column = "vsize" if AIX else "vsz"
    return ps(column, pid)
@unittest.skipIf(not POSIX, "POSIX only")
class TestProcess(PsutilTestCase):
    """Compare psutil results against 'ps' command line utility (mainly)."""

    @classmethod
    def setUpClass(cls):
        # A single python subprocess is shared by all the tests of this
        # class; its pid is what both ps and psutil are queried about.
        cls.pid = spawn_testproc([PYTHON_EXE, "-E", "-O"],
                                 stdin=subprocess.PIPE).pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    def test_ppid(self):
        ppid_ps = ps('ppid', self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_uid(self):
        uid_ps = ps('uid', self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_gid(self):
        gid_ps = ps('rgid', self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_username(self):
        username_ps = ps('user', self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    def test_username_no_resolution(self):
        # Emulate a case where the system can't resolve the uid to
        # a username in which case psutil is supposed to return
        # the stringified uid.
        p = psutil.Process()
        with mock.patch("psutil.pwd.getpwuid", side_effect=KeyError) as fun:
            self.assertEqual(p.username(), str(p.uids().real))
            # Not a bare "assert": those are stripped when running
            # python with -O.
            self.assertTrue(fun.called)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_rss_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps_rss(self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_on_failure()
    def test_vsz_memory(self):
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps_vsz(self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_name(self):
        name_ps = ps_name(self.pid)
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        # ...because of how we calculate PYTHON_EXE; on MACOS this may
        # be "pythonX.Y".
        name_ps = re.sub(r"\d.\d", "", name_ps)
        name_psutil = re.sub(r"\d.\d", "", name_psutil)
        # ...may also be "python.X"
        name_ps = re.sub(r"\d", "", name_ps)
        name_psutil = re.sub(r"\d", "", name_psutil)
        self.assertEqual(name_ps, name_psutil)

    def test_name_long(self):
        # On UNIX the kernel truncates the name to the first 15
        # characters. In such a case psutil tries to determine the
        # full name from the cmdline.
        name = "long-program-name"
        cmdline = ["long-program-name-extended", "foo", "bar"]
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            return_value=cmdline):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name-extended")

    def test_name_long_cmdline_ad_exc(self):
        # Same as above but emulates a case where cmdline() raises
        # AccessDenied in which case psutil is supposed to return
        # the truncated name instead of crashing.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.AccessDenied(0, "")):
                p = psutil.Process()
                self.assertEqual(p.name(), "long-program-name")

    def test_name_long_cmdline_nsp_exc(self):
        # Same as above but emulates a case where cmdline() raises NSP
        # which is supposed to propagate.
        name = "long-program-name"
        with mock.patch("psutil._psplatform.Process.name",
                        return_value=name):
            with mock.patch("psutil._psplatform.Process.cmdline",
                            side_effect=psutil.NoSuchProcess(0, "")):
                p = psutil.Process()
                self.assertRaises(psutil.NoSuchProcess, p.name)

    @unittest.skipIf(MACOS or BSD, 'ps -o start not available')
    def test_create_time(self):
        time_ps = ps('start', self.pid)
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we check
        # for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_exe(self):
        ps_pathname = ps_name(self.pid)
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error, so only the leading part of the psutil path
            # (as long as the ps one) is compared.
            adjusted_psutil_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_psutil_pathname)

    # On macOS the official python installer exposes a python wrapper that
    # executes a python executable hidden inside an application bundle inside
    # the Python framework.
    # There's a race condition between the ps call & the psutil call below
    # depending on the completion of the execve call so let's retry on failure
    @retry_on_failure()
    def test_cmdline(self):
        ps_cmdline = ps_args(self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        self.assertEqual(ps_cmdline, psutil_cmdline)

    # On SUNOS "ps" reads niceness /proc/pid/psinfo which returns an
    # incorrect value (20); the real deal is getpriority(2) which
    # returns 0; psutil relies on it, see:
    # https://github.com/giampaolo/psutil/issues/1082
    # AIX has the same issue
    @unittest.skipIf(SUNOS, "not reliable on SUNOS")
    @unittest.skipIf(AIX, "not reliable on AIX")
    def test_nice(self):
        ps_nice = ps('nice', self.pid)
        # Query the same process ps was asked about, not the test runner.
        psutil_nice = psutil.Process(self.pid).nice()
        self.assertEqual(ps_nice, psutil_nice)
@unittest.skipIf(not POSIX, "POSIX only")
class TestSystemAPIs(PsutilTestCase):
    """Test some system APIs."""

    @retry_on_failure()
    def test_pids(self):
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        pids_ps = sorted(ps("pid"))
        pids_psutil = psutil.pids()

        # on MACOS and OPENBSD ps doesn't show pid 0
        # (parenthesized so that pid 0 is only added when missing)
        if (MACOS or OPENBSD) and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        # There will often be one more process in pids_ps for ps itself
        if len(pids_ps) - len(pids_psutil) > 1:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            raise self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "unreliable on SUNOS")
    @unittest.skipIf(not which('ifconfig'), "no ifconfig cmd")
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, "not supported")
    def test_nic_names(self):
        output = sh("ifconfig -a")
        for nic in psutil.net_io_counters(pernic=True):
            # Look for a whitespace-separated token starting with the
            # NIC name.
            for line in output.split():
                if line.startswith(nic):
                    break
            else:
                raise self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    # @unittest.skipIf(CI_TESTING and not psutil.users(), "unreliable on CI")
    @retry_on_failure()
    def test_users(self):
        out = sh("who -u")
        if not out.strip():
            raise self.skipTest("no users on this system")
        lines = out.split('\n')
        users = [x.split()[0] for x in lines]
        terminals = [x.split()[1] for x in lines]
        # Take a single snapshot so that the length check and the
        # per-user checks all look at the same data.
        psutil_users = psutil.users()
        self.assertEqual(len(users), len(psutil_users))
        with self.subTest(psutil=psutil_users, who=out):
            for idx, u in enumerate(psutil_users):
                self.assertEqual(u.name, users[idx])
                self.assertEqual(u.terminal, terminals[idx])
                if u.pid is not None:  # None on OpenBSD
                    psutil.Process(u.pid)

    @retry_on_failure()
    def test_users_started(self):
        out = sh("who -u")
        if not out.strip():
            raise self.skipTest("no users on this system")
        tstamp = None
        # '2023-04-11 09:31' (Linux)
        started = re.findall(r"\d\d\d\d-\d\d-\d\d \d\d:\d\d", out)
        if started:
            tstamp = "%Y-%m-%d %H:%M"
        else:
            # 'Apr 10 22:27' (macOS)
            started = re.findall(r"[A-Z][a-z][a-z] \d\d \d\d:\d\d", out)
            if started:
                tstamp = "%b %d %H:%M"
            else:
                # 'Apr 10'
                started = re.findall(r"[A-Z][a-z][a-z] \d\d", out)
                if started:
                    tstamp = "%b %d"
                else:
                    # 'apr 10' (sunOS)
                    started = re.findall(r"[a-z][a-z][a-z] \d\d", out)
                    if started:
                        tstamp = "%b %d"
                        started = [x.capitalize() for x in started]

        if not tstamp:
            raise self.skipTest(
                "cannot interpret tstamp in who output\n%s" % (out))

        # Single snapshot shared by the subTest params and the loop.
        psutil_users = psutil.users()
        with self.subTest(psutil=psutil_users, who=out):
            for idx, u in enumerate(psutil_users):
                psutil_value = datetime.datetime.fromtimestamp(
                    u.started).strftime(tstamp)
                self.assertEqual(psutil_value, started[idx])

    def test_pid_exists_let_raise(self):
        # According to "man 2 kill" possible error values for kill
        # are (EINVAL, EPERM, ESRCH). Test that any other errno
        # results in an exception.
        with mock.patch("psutil._psposix.os.kill",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.pid_exists, os.getpid())
            self.assertTrue(m.called)

    def test_os_waitpid_let_raise(self):
        # os.waitpid() is supposed to catch EINTR and ECHILD only.
        # Test that any other errno results in an exception.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EBADF, "")) as m:
            self.assertRaises(OSError, psutil._psposix.wait_pid, os.getpid())
            self.assertTrue(m.called)

    def test_os_waitpid_eintr(self):
        # os.waitpid() is supposed to "retry" on EINTR.
        with mock.patch("psutil._psposix.os.waitpid",
                        side_effect=OSError(errno.EINTR, "")) as m:
            self.assertRaises(
                psutil._psposix.TimeoutExpired,
                psutil._psposix.wait_pid, os.getpid(), timeout=0.01)
            self.assertTrue(m.called)

    def test_os_waitpid_bad_ret_status(self):
        # Simulate os.waitpid() returning a bad status.
        with mock.patch("psutil._psposix.os.waitpid",
                        return_value=(1, -1)) as m:
            self.assertRaises(ValueError,
                              psutil._psposix.wait_pid, os.getpid())
            self.assertTrue(m.called)

    # AIX can return '-' in df output instead of numbers, e.g. for /proc
    @unittest.skipIf(AIX, "unreliable on AIX")
    @retry_on_failure()
    def test_disk_usage(self):
        def df(device):
            # Parse the second line of "df -k <device>" into
            # (total, used, free, percent), sizes expressed in bytes.
            try:
                out = sh("df -k %s" % device).strip()
            except RuntimeError as err:
                if "device busy" in str(err).lower():
                    raise self.skipTest("df returned EBUSY")
                raise
            line = out.split('\n')[1]
            fields = line.split()
            total = int(fields[1]) * 1024
            used = int(fields[2]) * 1024
            free = int(fields[3]) * 1024
            percent = float(fields[4].replace('%', ''))
            return (total, used, free, percent)

        tolerance = 4 * 1024 * 1024  # 4MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            try:
                total, used, free, percent = df(part.device)
            except RuntimeError as err:
                # see:
                # https://travis-ci.org/giampaolo/psutil/jobs/138338464
                # https://travis-ci.org/giampaolo/psutil/jobs/138343361
                err = str(err).lower()
                if "no such file or directory" in err or \
                        "raw devices not supported" in err or \
                        "permission denied" in err:
                    continue
                raise
            else:
                self.assertAlmostEqual(usage.total, total, delta=tolerance)
                self.assertAlmostEqual(usage.used, used, delta=tolerance)
                self.assertAlmostEqual(usage.free, free, delta=tolerance)
                self.assertAlmostEqual(usage.percent, percent, delta=1)
@unittest.skipIf(not POSIX, "POSIX only")
class TestMisc(PsutilTestCase):
    """Miscellaneous POSIX-only checks."""

    def test_getpagesize(self):
        # The page size must be positive and match the values exposed
        # by the "resource" and "mmap" stdlib modules.
        page_size = getpagesize()
        self.assertGreater(page_size, 0)
        stdlib_resource_value = resource.getpagesize()
        self.assertEqual(page_size, stdlib_resource_value)
        stdlib_mmap_value = mmap.PAGESIZE
        self.assertEqual(page_size, stdlib_mmap_value)
if __name__ == '__main__':
    # Run the tests of this module when the file is executed directly.
    from psutil.tests import runner

    runner.run_from_name(__file__)
|