test_bsd.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. # TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd.
  6. """Tests specific to all BSD platforms."""
  7. import datetime
  8. import os
  9. import re
  10. import time
  11. import unittest
  12. import psutil
  13. from psutil import BSD
  14. from psutil import FREEBSD
  15. from psutil import NETBSD
  16. from psutil import OPENBSD
  17. from psutil.tests import HAS_BATTERY
  18. from psutil.tests import TOLERANCE_SYS_MEM
  19. from psutil.tests import PsutilTestCase
  20. from psutil.tests import retry_on_failure
  21. from psutil.tests import sh
  22. from psutil.tests import spawn_testproc
  23. from psutil.tests import terminate
  24. from psutil.tests import which
  25. if BSD:
  26. from psutil._psutil_posix import getpagesize
  27. PAGESIZE = getpagesize()
  28. # muse requires root privileges
  29. MUSE_AVAILABLE = os.getuid() == 0 and which('muse')
  30. else:
  31. PAGESIZE = None
  32. MUSE_AVAILABLE = False
  33. def sysctl(cmdline):
  34. """Expects a sysctl command with an argument and parse the result
  35. returning only the value of interest.
  36. """
  37. result = sh("sysctl " + cmdline)
  38. if FREEBSD:
  39. result = result[result.find(": ") + 2:]
  40. elif OPENBSD or NETBSD:
  41. result = result[result.find("=") + 1:]
  42. try:
  43. return int(result)
  44. except ValueError:
  45. return result
  46. def muse(field):
  47. """Thin wrapper around 'muse' cmdline utility."""
  48. out = sh('muse')
  49. for line in out.split('\n'):
  50. if line.startswith(field):
  51. break
  52. else:
  53. raise ValueError("line not found")
  54. return int(line.split()[1])
  55. # =====================================================================
  56. # --- All BSD*
  57. # =====================================================================
  58. @unittest.skipIf(not BSD, "BSD only")
  59. class BSDTestCase(PsutilTestCase):
  60. """Generic tests common to all BSD variants."""
  61. @classmethod
  62. def setUpClass(cls):
  63. cls.pid = spawn_testproc().pid
  64. @classmethod
  65. def tearDownClass(cls):
  66. terminate(cls.pid)
  67. @unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD")
  68. def test_process_create_time(self):
  69. output = sh("ps -o lstart -p %s" % self.pid)
  70. start_ps = output.replace('STARTED', '').strip()
  71. start_psutil = psutil.Process(self.pid).create_time()
  72. start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
  73. time.localtime(start_psutil))
  74. self.assertEqual(start_ps, start_psutil)
  75. def test_disks(self):
  76. # test psutil.disk_usage() and psutil.disk_partitions()
  77. # against "df -a"
  78. def df(path):
  79. out = sh('df -k "%s"' % path).strip()
  80. lines = out.split('\n')
  81. lines.pop(0)
  82. line = lines.pop(0)
  83. dev, total, used, free = line.split()[:4]
  84. if dev == 'none':
  85. dev = ''
  86. total = int(total) * 1024
  87. used = int(used) * 1024
  88. free = int(free) * 1024
  89. return dev, total, used, free
  90. for part in psutil.disk_partitions(all=False):
  91. usage = psutil.disk_usage(part.mountpoint)
  92. dev, total, used, free = df(part.mountpoint)
  93. self.assertEqual(part.device, dev)
  94. self.assertEqual(usage.total, total)
  95. # 10 MB tolerance
  96. if abs(usage.free - free) > 10 * 1024 * 1024:
  97. raise self.fail("psutil=%s, df=%s" % (usage.free, free))
  98. if abs(usage.used - used) > 10 * 1024 * 1024:
  99. raise self.fail("psutil=%s, df=%s" % (usage.used, used))
  100. @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
  101. def test_cpu_count_logical(self):
  102. syst = sysctl("hw.ncpu")
  103. self.assertEqual(psutil.cpu_count(logical=True), syst)
  104. @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
  105. @unittest.skipIf(NETBSD, "skipped on NETBSD") # we check /proc/meminfo
  106. def test_virtual_memory_total(self):
  107. num = sysctl('hw.physmem')
  108. self.assertEqual(num, psutil.virtual_memory().total)
  109. @unittest.skipIf(not which('ifconfig'), "ifconfig cmd not available")
  110. def test_net_if_stats(self):
  111. for name, stats in psutil.net_if_stats().items():
  112. try:
  113. out = sh("ifconfig %s" % name)
  114. except RuntimeError:
  115. pass
  116. else:
  117. self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
  118. if "mtu" in out:
  119. self.assertEqual(stats.mtu,
  120. int(re.findall(r'mtu (\d+)', out)[0]))
  121. # =====================================================================
  122. # --- FreeBSD
  123. # =====================================================================
  124. @unittest.skipIf(not FREEBSD, "FREEBSD only")
  125. class FreeBSDPsutilTestCase(PsutilTestCase):
  126. @classmethod
  127. def setUpClass(cls):
  128. cls.pid = spawn_testproc().pid
  129. @classmethod
  130. def tearDownClass(cls):
  131. terminate(cls.pid)
  132. @retry_on_failure()
  133. def test_memory_maps(self):
  134. out = sh('procstat -v %s' % self.pid)
  135. maps = psutil.Process(self.pid).memory_maps(grouped=False)
  136. lines = out.split('\n')[1:]
  137. while lines:
  138. line = lines.pop()
  139. fields = line.split()
  140. _, start, stop, perms, res = fields[:5]
  141. map = maps.pop()
  142. self.assertEqual("%s-%s" % (start, stop), map.addr)
  143. self.assertEqual(int(res), map.rss)
  144. if not map.path.startswith('['):
  145. self.assertEqual(fields[10], map.path)
  146. def test_exe(self):
  147. out = sh('procstat -b %s' % self.pid)
  148. self.assertEqual(psutil.Process(self.pid).exe(),
  149. out.split('\n')[1].split()[-1])
  150. def test_cmdline(self):
  151. out = sh('procstat -c %s' % self.pid)
  152. self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
  153. ' '.join(out.split('\n')[1].split()[2:]))
  154. def test_uids_gids(self):
  155. out = sh('procstat -s %s' % self.pid)
  156. euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
  157. p = psutil.Process(self.pid)
  158. uids = p.uids()
  159. gids = p.gids()
  160. self.assertEqual(uids.real, int(ruid))
  161. self.assertEqual(uids.effective, int(euid))
  162. self.assertEqual(uids.saved, int(suid))
  163. self.assertEqual(gids.real, int(rgid))
  164. self.assertEqual(gids.effective, int(egid))
  165. self.assertEqual(gids.saved, int(sgid))
  166. @retry_on_failure()
  167. def test_ctx_switches(self):
  168. tested = []
  169. out = sh('procstat -r %s' % self.pid)
  170. p = psutil.Process(self.pid)
  171. for line in out.split('\n'):
  172. line = line.lower().strip()
  173. if ' voluntary context' in line:
  174. pstat_value = int(line.split()[-1])
  175. psutil_value = p.num_ctx_switches().voluntary
  176. self.assertEqual(pstat_value, psutil_value)
  177. tested.append(None)
  178. elif ' involuntary context' in line:
  179. pstat_value = int(line.split()[-1])
  180. psutil_value = p.num_ctx_switches().involuntary
  181. self.assertEqual(pstat_value, psutil_value)
  182. tested.append(None)
  183. if len(tested) != 2:
  184. raise RuntimeError("couldn't find lines match in procstat out")
  185. @retry_on_failure()
  186. def test_cpu_times(self):
  187. tested = []
  188. out = sh('procstat -r %s' % self.pid)
  189. p = psutil.Process(self.pid)
  190. for line in out.split('\n'):
  191. line = line.lower().strip()
  192. if 'user time' in line:
  193. pstat_value = float('0.' + line.split()[-1].split('.')[-1])
  194. psutil_value = p.cpu_times().user
  195. self.assertEqual(pstat_value, psutil_value)
  196. tested.append(None)
  197. elif 'system time' in line:
  198. pstat_value = float('0.' + line.split()[-1].split('.')[-1])
  199. psutil_value = p.cpu_times().system
  200. self.assertEqual(pstat_value, psutil_value)
  201. tested.append(None)
  202. if len(tested) != 2:
  203. raise RuntimeError("couldn't find lines match in procstat out")
  204. @unittest.skipIf(not FREEBSD, "FREEBSD only")
  205. class FreeBSDSystemTestCase(PsutilTestCase):
  206. @staticmethod
  207. def parse_swapinfo():
  208. # the last line is always the total
  209. output = sh("swapinfo -k").splitlines()[-1]
  210. parts = re.split(r'\s+', output)
  211. if not parts:
  212. raise ValueError("Can't parse swapinfo: %s" % output)
  213. # the size is in 1k units, so multiply by 1024
  214. total, used, free = (int(p) * 1024 for p in parts[1:4])
  215. return total, used, free
  216. def test_cpu_frequency_against_sysctl(self):
  217. # Currently only cpu 0 is frequency is supported in FreeBSD
  218. # All other cores use the same frequency.
  219. sensor = "dev.cpu.0.freq"
  220. try:
  221. sysctl_result = int(sysctl(sensor))
  222. except RuntimeError:
  223. self.skipTest("frequencies not supported by kernel")
  224. self.assertEqual(psutil.cpu_freq().current, sysctl_result)
  225. sensor = "dev.cpu.0.freq_levels"
  226. sysctl_result = sysctl(sensor)
  227. # sysctl returns a string of the format:
  228. # <freq_level_1>/<voltage_level_1> <freq_level_2>/<voltage_level_2>...
  229. # Ordered highest available to lowest available.
  230. max_freq = int(sysctl_result.split()[0].split("/")[0])
  231. min_freq = int(sysctl_result.split()[-1].split("/")[0])
  232. self.assertEqual(psutil.cpu_freq().max, max_freq)
  233. self.assertEqual(psutil.cpu_freq().min, min_freq)
  234. # --- virtual_memory(); tests against sysctl
  235. @retry_on_failure()
  236. def test_vmem_active(self):
  237. syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
  238. self.assertAlmostEqual(psutil.virtual_memory().active, syst,
  239. delta=TOLERANCE_SYS_MEM)
  240. @retry_on_failure()
  241. def test_vmem_inactive(self):
  242. syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
  243. self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
  244. delta=TOLERANCE_SYS_MEM)
  245. @retry_on_failure()
  246. def test_vmem_wired(self):
  247. syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
  248. self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
  249. delta=TOLERANCE_SYS_MEM)
  250. @retry_on_failure()
  251. def test_vmem_cached(self):
  252. syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
  253. self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
  254. delta=TOLERANCE_SYS_MEM)
  255. @retry_on_failure()
  256. def test_vmem_free(self):
  257. syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
  258. self.assertAlmostEqual(psutil.virtual_memory().free, syst,
  259. delta=TOLERANCE_SYS_MEM)
  260. @retry_on_failure()
  261. def test_vmem_buffers(self):
  262. syst = sysctl("vfs.bufspace")
  263. self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
  264. delta=TOLERANCE_SYS_MEM)
  265. # --- virtual_memory(); tests against muse
  266. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  267. def test_muse_vmem_total(self):
  268. num = muse('Total')
  269. self.assertEqual(psutil.virtual_memory().total, num)
  270. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  271. @retry_on_failure()
  272. def test_muse_vmem_active(self):
  273. num = muse('Active')
  274. self.assertAlmostEqual(psutil.virtual_memory().active, num,
  275. delta=TOLERANCE_SYS_MEM)
  276. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  277. @retry_on_failure()
  278. def test_muse_vmem_inactive(self):
  279. num = muse('Inactive')
  280. self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
  281. delta=TOLERANCE_SYS_MEM)
  282. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  283. @retry_on_failure()
  284. def test_muse_vmem_wired(self):
  285. num = muse('Wired')
  286. self.assertAlmostEqual(psutil.virtual_memory().wired, num,
  287. delta=TOLERANCE_SYS_MEM)
  288. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  289. @retry_on_failure()
  290. def test_muse_vmem_cached(self):
  291. num = muse('Cache')
  292. self.assertAlmostEqual(psutil.virtual_memory().cached, num,
  293. delta=TOLERANCE_SYS_MEM)
  294. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  295. @retry_on_failure()
  296. def test_muse_vmem_free(self):
  297. num = muse('Free')
  298. self.assertAlmostEqual(psutil.virtual_memory().free, num,
  299. delta=TOLERANCE_SYS_MEM)
  300. @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
  301. @retry_on_failure()
  302. def test_muse_vmem_buffers(self):
  303. num = muse('Buffer')
  304. self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
  305. delta=TOLERANCE_SYS_MEM)
  306. def test_cpu_stats_ctx_switches(self):
  307. self.assertAlmostEqual(psutil.cpu_stats().ctx_switches,
  308. sysctl('vm.stats.sys.v_swtch'), delta=1000)
  309. def test_cpu_stats_interrupts(self):
  310. self.assertAlmostEqual(psutil.cpu_stats().interrupts,
  311. sysctl('vm.stats.sys.v_intr'), delta=1000)
  312. def test_cpu_stats_soft_interrupts(self):
  313. self.assertAlmostEqual(psutil.cpu_stats().soft_interrupts,
  314. sysctl('vm.stats.sys.v_soft'), delta=1000)
  315. @retry_on_failure()
  316. def test_cpu_stats_syscalls(self):
  317. # pretty high tolerance but it looks like it's OK.
  318. self.assertAlmostEqual(psutil.cpu_stats().syscalls,
  319. sysctl('vm.stats.sys.v_syscall'), delta=200000)
  320. # def test_cpu_stats_traps(self):
  321. # self.assertAlmostEqual(psutil.cpu_stats().traps,
  322. # sysctl('vm.stats.sys.v_trap'), delta=1000)
  323. # --- swap memory
  324. def test_swapmem_free(self):
  325. total, used, free = self.parse_swapinfo()
  326. self.assertAlmostEqual(
  327. psutil.swap_memory().free, free, delta=TOLERANCE_SYS_MEM)
  328. def test_swapmem_used(self):
  329. total, used, free = self.parse_swapinfo()
  330. self.assertAlmostEqual(
  331. psutil.swap_memory().used, used, delta=TOLERANCE_SYS_MEM)
  332. def test_swapmem_total(self):
  333. total, used, free = self.parse_swapinfo()
  334. self.assertAlmostEqual(
  335. psutil.swap_memory().total, total, delta=TOLERANCE_SYS_MEM)
  336. # --- others
  337. def test_boot_time(self):
  338. s = sysctl('sysctl kern.boottime')
  339. s = s[s.find(" sec = ") + 7:]
  340. s = s[:s.find(',')]
  341. btime = int(s)
  342. self.assertEqual(btime, psutil.boot_time())
  343. # --- sensors_battery
  344. @unittest.skipIf(not HAS_BATTERY, "no battery")
  345. def test_sensors_battery(self):
  346. def secs2hours(secs):
  347. m, s = divmod(secs, 60)
  348. h, m = divmod(m, 60)
  349. return "%d:%02d" % (h, m)
  350. out = sh("acpiconf -i 0")
  351. fields = dict([(x.split('\t')[0], x.split('\t')[-1])
  352. for x in out.split("\n")])
  353. metrics = psutil.sensors_battery()
  354. percent = int(fields['Remaining capacity:'].replace('%', ''))
  355. remaining_time = fields['Remaining time:']
  356. self.assertEqual(metrics.percent, percent)
  357. if remaining_time == 'unknown':
  358. self.assertEqual(metrics.secsleft, psutil.POWER_TIME_UNLIMITED)
  359. else:
  360. self.assertEqual(secs2hours(metrics.secsleft), remaining_time)
  361. @unittest.skipIf(not HAS_BATTERY, "no battery")
  362. def test_sensors_battery_against_sysctl(self):
  363. self.assertEqual(psutil.sensors_battery().percent,
  364. sysctl("hw.acpi.battery.life"))
  365. self.assertEqual(psutil.sensors_battery().power_plugged,
  366. sysctl("hw.acpi.acline") == 1)
  367. secsleft = psutil.sensors_battery().secsleft
  368. if secsleft < 0:
  369. self.assertEqual(sysctl("hw.acpi.battery.time"), -1)
  370. else:
  371. self.assertEqual(secsleft, sysctl("hw.acpi.battery.time") * 60)
  372. @unittest.skipIf(HAS_BATTERY, "has battery")
  373. def test_sensors_battery_no_battery(self):
  374. # If no battery is present one of these calls is supposed
  375. # to fail, see:
  376. # https://github.com/giampaolo/psutil/issues/1074
  377. with self.assertRaises(RuntimeError):
  378. sysctl("hw.acpi.battery.life")
  379. sysctl("hw.acpi.battery.time")
  380. sysctl("hw.acpi.acline")
  381. self.assertIsNone(psutil.sensors_battery())
  382. # --- sensors_temperatures
  383. def test_sensors_temperatures_against_sysctl(self):
  384. num_cpus = psutil.cpu_count(True)
  385. for cpu in range(num_cpus):
  386. sensor = "dev.cpu.%s.temperature" % cpu
  387. # sysctl returns a string in the format 46.0C
  388. try:
  389. sysctl_result = int(float(sysctl(sensor)[:-1]))
  390. except RuntimeError:
  391. self.skipTest("temperatures not supported by kernel")
  392. self.assertAlmostEqual(
  393. psutil.sensors_temperatures()["coretemp"][cpu].current,
  394. sysctl_result, delta=10)
  395. sensor = "dev.cpu.%s.coretemp.tjmax" % cpu
  396. sysctl_result = int(float(sysctl(sensor)[:-1]))
  397. self.assertEqual(
  398. psutil.sensors_temperatures()["coretemp"][cpu].high,
  399. sysctl_result)
  400. # =====================================================================
  401. # --- OpenBSD
  402. # =====================================================================
  403. @unittest.skipIf(not OPENBSD, "OPENBSD only")
  404. class OpenBSDTestCase(PsutilTestCase):
  405. def test_boot_time(self):
  406. s = sysctl('kern.boottime')
  407. sys_bt = datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
  408. psutil_bt = datetime.datetime.fromtimestamp(psutil.boot_time())
  409. self.assertEqual(sys_bt, psutil_bt)
  410. # =====================================================================
  411. # --- NetBSD
  412. # =====================================================================
  413. @unittest.skipIf(not NETBSD, "NETBSD only")
  414. class NetBSDTestCase(PsutilTestCase):
  415. @staticmethod
  416. def parse_meminfo(look_for):
  417. with open('/proc/meminfo') as f:
  418. for line in f:
  419. if line.startswith(look_for):
  420. return int(line.split()[1]) * 1024
  421. raise ValueError("can't find %s" % look_for)
  422. # --- virtual mem
  423. def test_vmem_total(self):
  424. self.assertEqual(
  425. psutil.virtual_memory().total, self.parse_meminfo("MemTotal:"))
  426. def test_vmem_free(self):
  427. self.assertAlmostEqual(
  428. psutil.virtual_memory().free, self.parse_meminfo("MemFree:"),
  429. delta=TOLERANCE_SYS_MEM)
  430. def test_vmem_buffers(self):
  431. self.assertAlmostEqual(
  432. psutil.virtual_memory().buffers, self.parse_meminfo("Buffers:"),
  433. delta=TOLERANCE_SYS_MEM)
  434. def test_vmem_shared(self):
  435. self.assertAlmostEqual(
  436. psutil.virtual_memory().shared, self.parse_meminfo("MemShared:"),
  437. delta=TOLERANCE_SYS_MEM)
  438. def test_vmem_cached(self):
  439. self.assertAlmostEqual(
  440. psutil.virtual_memory().cached, self.parse_meminfo("Cached:"),
  441. delta=TOLERANCE_SYS_MEM)
  442. # --- swap mem
  443. def test_swapmem_total(self):
  444. self.assertAlmostEqual(
  445. psutil.swap_memory().total, self.parse_meminfo("SwapTotal:"),
  446. delta=TOLERANCE_SYS_MEM)
  447. def test_swapmem_free(self):
  448. self.assertAlmostEqual(
  449. psutil.swap_memory().free, self.parse_meminfo("SwapFree:"),
  450. delta=TOLERANCE_SYS_MEM)
  451. def test_swapmem_used(self):
  452. smem = psutil.swap_memory()
  453. self.assertEqual(smem.used, smem.total - smem.free)
  454. # --- others
  455. def test_cpu_stats_interrupts(self):
  456. with open('/proc/stat', 'rb') as f:
  457. for line in f:
  458. if line.startswith(b'intr'):
  459. interrupts = int(line.split()[1])
  460. break
  461. else:
  462. raise ValueError("couldn't find line")
  463. self.assertAlmostEqual(
  464. psutil.cpu_stats().interrupts, interrupts, delta=1000)
  465. def test_cpu_stats_ctx_switches(self):
  466. with open('/proc/stat', 'rb') as f:
  467. for line in f:
  468. if line.startswith(b'ctxt'):
  469. ctx_switches = int(line.split()[1])
  470. break
  471. else:
  472. raise ValueError("couldn't find line")
  473. self.assertAlmostEqual(
  474. psutil.cpu_stats().ctx_switches, ctx_switches, delta=1000)
  475. if __name__ == '__main__':
  476. from psutil.tests.runner import run_from_name
  477. run_from_name(__file__)