test_windows.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897
  1. #!/usr/bin/env python3
  2. # -*- coding: UTF-8 -*
  3. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  4. # Use of this source code is governed by a BSD-style license that can be
  5. # found in the LICENSE file.
  6. """Windows specific tests."""
  7. import datetime
  8. import errno
  9. import glob
  10. import os
  11. import platform
  12. import re
  13. import signal
  14. import subprocess
  15. import sys
  16. import time
  17. import unittest
  18. import warnings
  19. import psutil
  20. from psutil import WINDOWS
  21. from psutil._compat import FileNotFoundError
  22. from psutil._compat import super
  23. from psutil._compat import which
  24. from psutil.tests import APPVEYOR
  25. from psutil.tests import GITHUB_ACTIONS
  26. from psutil.tests import HAS_BATTERY
  27. from psutil.tests import IS_64BIT
  28. from psutil.tests import PY3
  29. from psutil.tests import PYPY
  30. from psutil.tests import TOLERANCE_DISK_USAGE
  31. from psutil.tests import TOLERANCE_SYS_MEM
  32. from psutil.tests import PsutilTestCase
  33. from psutil.tests import mock
  34. from psutil.tests import retry_on_failure
  35. from psutil.tests import sh
  36. from psutil.tests import spawn_testproc
  37. from psutil.tests import terminate
# pywin32 and wmi are third-party packages which are imported only when
# running on Windows under a non-PyPy interpreter. Any warning emitted
# while importing them is silenced.
if WINDOWS and not PYPY:
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        import win32api  # requires "pip install pywin32"
        import win32con
        import win32process
        import wmi  # requires "pip install wmi" / "make setup-dev-env"

if WINDOWS:
    from psutil._pswindows import convert_oserror


# Shortcut to the platform C extension module; some tests call it directly.
cext = psutil._psplatform.cext
@unittest.skipIf(not WINDOWS, "WINDOWS only")
@unittest.skipIf(PYPY, "pywin32 not available on PYPY")
# https://github.com/giampaolo/psutil/pull/1762#issuecomment-632892692
@unittest.skipIf(GITHUB_ACTIONS and not PY3, "pywin32 broken on GITHUB + PY2")
class WindowsTestCase(PsutilTestCase):
    """Base class for the test cases which depend on pywin32 / wmi.

    It adds no behavior of its own: it only carries the skip
    conditions declared by the decorators above.
    """
    pass
  54. def powershell(cmd):
  55. """Currently not used, but available just in case. Usage:
  56. >>> powershell(
  57. "Get-CIMInstance Win32_PageFileUsage | Select AllocatedBaseSize")
  58. """
  59. if not which("powershell.exe"):
  60. raise unittest.SkipTest("powershell.exe not available")
  61. cmdline = \
  62. 'powershell.exe -ExecutionPolicy Bypass -NoLogo -NonInteractive ' + \
  63. '-NoProfile -WindowStyle Hidden -Command "%s"' % cmd
  64. return sh(cmdline)
  65. def wmic(path, what, converter=int):
  66. """Currently not used, but available just in case. Usage:
  67. >>> wmic("Win32_OperatingSystem", "FreePhysicalMemory")
  68. 2134124534
  69. """
  70. out = sh("wmic path %s get %s" % (path, what)).strip()
  71. data = "".join(out.splitlines()[1:]).strip() # get rid of the header
  72. if converter is not None:
  73. if "," in what:
  74. return tuple([converter(x) for x in data.split()])
  75. else:
  76. return converter(data)
  77. else:
  78. return data
  79. # ===================================================================
  80. # System APIs
  81. # ===================================================================
  82. class TestCpuAPIs(WindowsTestCase):
  83. @unittest.skipIf('NUMBER_OF_PROCESSORS' not in os.environ,
  84. 'NUMBER_OF_PROCESSORS env var is not available')
  85. def test_cpu_count_vs_NUMBER_OF_PROCESSORS(self):
  86. # Will likely fail on many-cores systems:
  87. # https://stackoverflow.com/questions/31209256
  88. num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
  89. self.assertEqual(num_cpus, psutil.cpu_count())
  90. def test_cpu_count_vs_GetSystemInfo(self):
  91. # Will likely fail on many-cores systems:
  92. # https://stackoverflow.com/questions/31209256
  93. sys_value = win32api.GetSystemInfo()[5]
  94. psutil_value = psutil.cpu_count()
  95. self.assertEqual(sys_value, psutil_value)
  96. def test_cpu_count_logical_vs_wmi(self):
  97. w = wmi.WMI()
  98. procs = sum(proc.NumberOfLogicalProcessors
  99. for proc in w.Win32_Processor())
  100. self.assertEqual(psutil.cpu_count(), procs)
  101. def test_cpu_count_cores_vs_wmi(self):
  102. w = wmi.WMI()
  103. cores = sum(proc.NumberOfCores for proc in w.Win32_Processor())
  104. self.assertEqual(psutil.cpu_count(logical=False), cores)
  105. def test_cpu_count_vs_cpu_times(self):
  106. self.assertEqual(psutil.cpu_count(),
  107. len(psutil.cpu_times(percpu=True)))
  108. def test_cpu_freq(self):
  109. w = wmi.WMI()
  110. proc = w.Win32_Processor()[0]
  111. self.assertEqual(proc.CurrentClockSpeed, psutil.cpu_freq().current)
  112. self.assertEqual(proc.MaxClockSpeed, psutil.cpu_freq().max)
  113. class TestSystemAPIs(WindowsTestCase):
  114. def test_nic_names(self):
  115. out = sh('ipconfig /all')
  116. nics = psutil.net_io_counters(pernic=True).keys()
  117. for nic in nics:
  118. if "pseudo-interface" in nic.replace(' ', '-').lower():
  119. continue
  120. if nic not in out:
  121. raise self.fail(
  122. "%r nic wasn't found in 'ipconfig /all' output" % nic)
  123. def test_total_phymem(self):
  124. w = wmi.WMI().Win32_ComputerSystem()[0]
  125. self.assertEqual(int(w.TotalPhysicalMemory),
  126. psutil.virtual_memory().total)
  127. def test_free_phymem(self):
  128. w = wmi.WMI().Win32_PerfRawData_PerfOS_Memory()[0]
  129. self.assertAlmostEqual(
  130. int(w.AvailableBytes), psutil.virtual_memory().free,
  131. delta=TOLERANCE_SYS_MEM)
  132. def test_total_swapmem(self):
  133. w = wmi.WMI().Win32_PerfRawData_PerfOS_Memory()[0]
  134. self.assertEqual(int(w.CommitLimit) - psutil.virtual_memory().total,
  135. psutil.swap_memory().total)
  136. if (psutil.swap_memory().total == 0):
  137. self.assertEqual(0, psutil.swap_memory().free)
  138. self.assertEqual(0, psutil.swap_memory().used)
  139. def test_percent_swapmem(self):
  140. if (psutil.swap_memory().total > 0):
  141. w = wmi.WMI().Win32_PerfRawData_PerfOS_PagingFile(
  142. Name="_Total")[0]
  143. # calculate swap usage to percent
  144. percentSwap = int(w.PercentUsage) * 100 / int(w.PercentUsage_Base)
  145. # exact percent may change but should be reasonable
  146. # assert within +/- 5% and between 0 and 100%
  147. self.assertGreaterEqual(psutil.swap_memory().percent, 0)
  148. self.assertAlmostEqual(psutil.swap_memory().percent, percentSwap,
  149. delta=5)
  150. self.assertLessEqual(psutil.swap_memory().percent, 100)
  151. # @unittest.skipIf(wmi is None, "wmi module is not installed")
  152. # def test__UPTIME(self):
  153. # # _UPTIME constant is not public but it is used internally
  154. # # as value to return for pid 0 creation time.
  155. # # WMI behaves the same.
  156. # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  157. # p = psutil.Process(0)
  158. # wmic_create = str(w.CreationDate.split('.')[0])
  159. # psutil_create = time.strftime("%Y%m%d%H%M%S",
  160. # time.localtime(p.create_time()))
  161. # Note: this test is not very reliable
  162. @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
  163. @retry_on_failure()
  164. def test_pids(self):
  165. # Note: this test might fail if the OS is starting/killing
  166. # other processes in the meantime
  167. w = wmi.WMI().Win32_Process()
  168. wmi_pids = set([x.ProcessId for x in w])
  169. psutil_pids = set(psutil.pids())
  170. self.assertEqual(wmi_pids, psutil_pids)
  171. @retry_on_failure()
  172. def test_disks(self):
  173. ps_parts = psutil.disk_partitions(all=True)
  174. wmi_parts = wmi.WMI().Win32_LogicalDisk()
  175. for ps_part in ps_parts:
  176. for wmi_part in wmi_parts:
  177. if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
  178. if not ps_part.mountpoint:
  179. # this is usually a CD-ROM with no disk inserted
  180. break
  181. if 'cdrom' in ps_part.opts:
  182. break
  183. if ps_part.mountpoint.startswith('A:'):
  184. break # floppy
  185. try:
  186. usage = psutil.disk_usage(ps_part.mountpoint)
  187. except FileNotFoundError:
  188. # usually this is the floppy
  189. break
  190. self.assertEqual(usage.total, int(wmi_part.Size))
  191. wmi_free = int(wmi_part.FreeSpace)
  192. self.assertEqual(usage.free, wmi_free)
  193. # 10 MB tolerance
  194. if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
  195. raise self.fail("psutil=%s, wmi=%s" % (
  196. usage.free, wmi_free))
  197. break
  198. else:
  199. raise self.fail("can't find partition %s" % repr(ps_part))
  200. @retry_on_failure()
  201. def test_disk_usage(self):
  202. for disk in psutil.disk_partitions():
  203. if 'cdrom' in disk.opts:
  204. continue
  205. sys_value = win32api.GetDiskFreeSpaceEx(disk.mountpoint)
  206. psutil_value = psutil.disk_usage(disk.mountpoint)
  207. self.assertAlmostEqual(sys_value[0], psutil_value.free,
  208. delta=TOLERANCE_DISK_USAGE)
  209. self.assertAlmostEqual(sys_value[1], psutil_value.total,
  210. delta=TOLERANCE_DISK_USAGE)
  211. self.assertEqual(psutil_value.used,
  212. psutil_value.total - psutil_value.free)
  213. def test_disk_partitions(self):
  214. sys_value = [
  215. x + '\\' for x in win32api.GetLogicalDriveStrings().split("\\\x00")
  216. if x and not x.startswith('A:')]
  217. psutil_value = [x.mountpoint for x in psutil.disk_partitions(all=True)
  218. if not x.mountpoint.startswith('A:')]
  219. self.assertEqual(sys_value, psutil_value)
  220. def test_net_if_stats(self):
  221. ps_names = set(cext.net_if_stats())
  222. wmi_adapters = wmi.WMI().Win32_NetworkAdapter()
  223. wmi_names = set()
  224. for wmi_adapter in wmi_adapters:
  225. wmi_names.add(wmi_adapter.Name)
  226. wmi_names.add(wmi_adapter.NetConnectionID)
  227. self.assertTrue(ps_names & wmi_names,
  228. "no common entries in %s, %s" % (ps_names, wmi_names))
  229. def test_boot_time(self):
  230. wmi_os = wmi.WMI().Win32_OperatingSystem()
  231. wmi_btime_str = wmi_os[0].LastBootUpTime.split('.')[0]
  232. wmi_btime_dt = datetime.datetime.strptime(
  233. wmi_btime_str, "%Y%m%d%H%M%S")
  234. psutil_dt = datetime.datetime.fromtimestamp(psutil.boot_time())
  235. diff = abs((wmi_btime_dt - psutil_dt).total_seconds())
  236. self.assertLessEqual(diff, 5)
  237. def test_boot_time_fluctuation(self):
  238. # https://github.com/giampaolo/psutil/issues/1007
  239. with mock.patch('psutil._pswindows.cext.boot_time', return_value=5):
  240. self.assertEqual(psutil.boot_time(), 5)
  241. with mock.patch('psutil._pswindows.cext.boot_time', return_value=4):
  242. self.assertEqual(psutil.boot_time(), 5)
  243. with mock.patch('psutil._pswindows.cext.boot_time', return_value=6):
  244. self.assertEqual(psutil.boot_time(), 5)
  245. with mock.patch('psutil._pswindows.cext.boot_time', return_value=333):
  246. self.assertEqual(psutil.boot_time(), 333)
  247. # ===================================================================
  248. # sensors_battery()
  249. # ===================================================================
  250. class TestSensorsBattery(WindowsTestCase):
  251. def test_has_battery(self):
  252. if win32api.GetPwrCapabilities()['SystemBatteriesPresent']:
  253. self.assertIsNotNone(psutil.sensors_battery())
  254. else:
  255. self.assertIsNone(psutil.sensors_battery())
  256. @unittest.skipIf(not HAS_BATTERY, "no battery")
  257. def test_percent(self):
  258. w = wmi.WMI()
  259. battery_wmi = w.query('select * from Win32_Battery')[0]
  260. battery_psutil = psutil.sensors_battery()
  261. self.assertAlmostEqual(
  262. battery_psutil.percent, battery_wmi.EstimatedChargeRemaining,
  263. delta=1)
  264. @unittest.skipIf(not HAS_BATTERY, "no battery")
  265. def test_power_plugged(self):
  266. w = wmi.WMI()
  267. battery_wmi = w.query('select * from Win32_Battery')[0]
  268. battery_psutil = psutil.sensors_battery()
  269. # Status codes:
  270. # https://msdn.microsoft.com/en-us/library/aa394074(v=vs.85).aspx
  271. self.assertEqual(battery_psutil.power_plugged,
  272. battery_wmi.BatteryStatus == 2)
  273. def test_emulate_no_battery(self):
  274. with mock.patch("psutil._pswindows.cext.sensors_battery",
  275. return_value=(0, 128, 0, 0)) as m:
  276. self.assertIsNone(psutil.sensors_battery())
  277. assert m.called
  278. def test_emulate_power_connected(self):
  279. with mock.patch("psutil._pswindows.cext.sensors_battery",
  280. return_value=(1, 0, 0, 0)) as m:
  281. self.assertEqual(psutil.sensors_battery().secsleft,
  282. psutil.POWER_TIME_UNLIMITED)
  283. assert m.called
  284. def test_emulate_power_charging(self):
  285. with mock.patch("psutil._pswindows.cext.sensors_battery",
  286. return_value=(0, 8, 0, 0)) as m:
  287. self.assertEqual(psutil.sensors_battery().secsleft,
  288. psutil.POWER_TIME_UNLIMITED)
  289. assert m.called
  290. def test_emulate_secs_left_unknown(self):
  291. with mock.patch("psutil._pswindows.cext.sensors_battery",
  292. return_value=(0, 0, 0, -1)) as m:
  293. self.assertEqual(psutil.sensors_battery().secsleft,
  294. psutil.POWER_TIME_UNKNOWN)
  295. assert m.called
  296. # ===================================================================
  297. # Process APIs
  298. # ===================================================================
  299. class TestProcess(WindowsTestCase):
  300. @classmethod
  301. def setUpClass(cls):
  302. cls.pid = spawn_testproc().pid
  303. @classmethod
  304. def tearDownClass(cls):
  305. terminate(cls.pid)
  306. def test_issue_24(self):
  307. p = psutil.Process(0)
  308. self.assertRaises(psutil.AccessDenied, p.kill)
  309. def test_special_pid(self):
  310. p = psutil.Process(4)
  311. self.assertEqual(p.name(), 'System')
  312. # use __str__ to access all common Process properties to check
  313. # that nothing strange happens
  314. str(p)
  315. p.username()
  316. self.assertGreaterEqual(p.create_time(), 0.0)
  317. try:
  318. rss, vms = p.memory_info()[:2]
  319. except psutil.AccessDenied:
  320. # expected on Windows Vista and Windows 7
  321. if platform.uname()[1] not in ('vista', 'win-7', 'win7'):
  322. raise
  323. else:
  324. self.assertGreater(rss, 0)
  325. def test_send_signal(self):
  326. p = psutil.Process(self.pid)
  327. self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
  328. def test_num_handles_increment(self):
  329. p = psutil.Process(os.getpid())
  330. before = p.num_handles()
  331. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  332. win32con.FALSE, os.getpid())
  333. after = p.num_handles()
  334. self.assertEqual(after, before + 1)
  335. win32api.CloseHandle(handle)
  336. self.assertEqual(p.num_handles(), before)
  337. def test_ctrl_signals(self):
  338. p = psutil.Process(self.spawn_testproc().pid)
  339. p.send_signal(signal.CTRL_C_EVENT)
  340. p.send_signal(signal.CTRL_BREAK_EVENT)
  341. p.kill()
  342. p.wait()
  343. self.assertRaises(psutil.NoSuchProcess,
  344. p.send_signal, signal.CTRL_C_EVENT)
  345. self.assertRaises(psutil.NoSuchProcess,
  346. p.send_signal, signal.CTRL_BREAK_EVENT)
  347. def test_username(self):
  348. name = win32api.GetUserNameEx(win32con.NameSamCompatible)
  349. if name.endswith('$'):
  350. # When running as a service account (most likely to be
  351. # NetworkService), these user name calculations don't produce the
  352. # same result, causing the test to fail.
  353. raise unittest.SkipTest('running as service account')
  354. self.assertEqual(psutil.Process().username(), name)
  355. def test_cmdline(self):
  356. sys_value = re.sub('[ ]+', ' ', win32api.GetCommandLine()).strip()
  357. psutil_value = ' '.join(psutil.Process().cmdline())
  358. if sys_value[0] == '"' != psutil_value[0]:
  359. # The PyWin32 command line may retain quotes around argv[0] if they
  360. # were used unnecessarily, while psutil will omit them. So remove
  361. # the first 2 quotes from sys_value if not in psutil_value.
  362. # A path to an executable will not contain quotes, so this is safe.
  363. sys_value = sys_value.replace('"', '', 2)
  364. self.assertEqual(sys_value, psutil_value)
  365. # XXX - occasional failures
  366. # def test_cpu_times(self):
  367. # handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  368. # win32con.FALSE, os.getpid())
  369. # self.addCleanup(win32api.CloseHandle, handle)
  370. # sys_value = win32process.GetProcessTimes(handle)
  371. # psutil_value = psutil.Process().cpu_times()
  372. # self.assertAlmostEqual(
  373. # psutil_value.user, sys_value['UserTime'] / 10000000.0,
  374. # delta=0.2)
  375. # self.assertAlmostEqual(
  376. # psutil_value.user, sys_value['KernelTime'] / 10000000.0,
  377. # delta=0.2)
  378. def test_nice(self):
  379. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  380. win32con.FALSE, os.getpid())
  381. self.addCleanup(win32api.CloseHandle, handle)
  382. sys_value = win32process.GetPriorityClass(handle)
  383. psutil_value = psutil.Process().nice()
  384. self.assertEqual(psutil_value, sys_value)
  385. def test_memory_info(self):
  386. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  387. win32con.FALSE, self.pid)
  388. self.addCleanup(win32api.CloseHandle, handle)
  389. sys_value = win32process.GetProcessMemoryInfo(handle)
  390. psutil_value = psutil.Process(self.pid).memory_info()
  391. self.assertEqual(
  392. sys_value['PeakWorkingSetSize'], psutil_value.peak_wset)
  393. self.assertEqual(
  394. sys_value['WorkingSetSize'], psutil_value.wset)
  395. self.assertEqual(
  396. sys_value['QuotaPeakPagedPoolUsage'], psutil_value.peak_paged_pool)
  397. self.assertEqual(
  398. sys_value['QuotaPagedPoolUsage'], psutil_value.paged_pool)
  399. self.assertEqual(
  400. sys_value['QuotaPeakNonPagedPoolUsage'],
  401. psutil_value.peak_nonpaged_pool)
  402. self.assertEqual(
  403. sys_value['QuotaNonPagedPoolUsage'], psutil_value.nonpaged_pool)
  404. self.assertEqual(
  405. sys_value['PagefileUsage'], psutil_value.pagefile)
  406. self.assertEqual(
  407. sys_value['PeakPagefileUsage'], psutil_value.peak_pagefile)
  408. self.assertEqual(psutil_value.rss, psutil_value.wset)
  409. self.assertEqual(psutil_value.vms, psutil_value.pagefile)
  410. def test_wait(self):
  411. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  412. win32con.FALSE, self.pid)
  413. self.addCleanup(win32api.CloseHandle, handle)
  414. p = psutil.Process(self.pid)
  415. p.terminate()
  416. psutil_value = p.wait()
  417. sys_value = win32process.GetExitCodeProcess(handle)
  418. self.assertEqual(psutil_value, sys_value)
  419. def test_cpu_affinity(self):
  420. def from_bitmask(x):
  421. return [i for i in range(64) if (1 << i) & x]
  422. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  423. win32con.FALSE, self.pid)
  424. self.addCleanup(win32api.CloseHandle, handle)
  425. sys_value = from_bitmask(
  426. win32process.GetProcessAffinityMask(handle)[0])
  427. psutil_value = psutil.Process(self.pid).cpu_affinity()
  428. self.assertEqual(psutil_value, sys_value)
  429. def test_io_counters(self):
  430. handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
  431. win32con.FALSE, os.getpid())
  432. self.addCleanup(win32api.CloseHandle, handle)
  433. sys_value = win32process.GetProcessIoCounters(handle)
  434. psutil_value = psutil.Process().io_counters()
  435. self.assertEqual(
  436. psutil_value.read_count, sys_value['ReadOperationCount'])
  437. self.assertEqual(
  438. psutil_value.write_count, sys_value['WriteOperationCount'])
  439. self.assertEqual(
  440. psutil_value.read_bytes, sys_value['ReadTransferCount'])
  441. self.assertEqual(
  442. psutil_value.write_bytes, sys_value['WriteTransferCount'])
  443. self.assertEqual(
  444. psutil_value.other_count, sys_value['OtherOperationCount'])
  445. self.assertEqual(
  446. psutil_value.other_bytes, sys_value['OtherTransferCount'])
  447. def test_num_handles(self):
  448. import ctypes
  449. import ctypes.wintypes
  450. PROCESS_QUERY_INFORMATION = 0x400
  451. handle = ctypes.windll.kernel32.OpenProcess(
  452. PROCESS_QUERY_INFORMATION, 0, self.pid)
  453. self.addCleanup(ctypes.windll.kernel32.CloseHandle, handle)
  454. hndcnt = ctypes.wintypes.DWORD()
  455. ctypes.windll.kernel32.GetProcessHandleCount(
  456. handle, ctypes.byref(hndcnt))
  457. sys_value = hndcnt.value
  458. psutil_value = psutil.Process(self.pid).num_handles()
  459. self.assertEqual(psutil_value, sys_value)
  460. def test_error_partial_copy(self):
  461. # https://github.com/giampaolo/psutil/issues/875
  462. exc = WindowsError()
  463. exc.winerror = 299
  464. with mock.patch("psutil._psplatform.cext.proc_cwd", side_effect=exc):
  465. with mock.patch("time.sleep") as m:
  466. p = psutil.Process()
  467. self.assertRaises(psutil.AccessDenied, p.cwd)
  468. self.assertGreaterEqual(m.call_count, 5)
  469. def test_exe(self):
  470. # NtQuerySystemInformation succeeds if process is gone. Make sure
  471. # it raises NSP for a non existent pid.
  472. pid = psutil.pids()[-1] + 99999
  473. proc = psutil._psplatform.Process(pid)
  474. self.assertRaises(psutil.NoSuchProcess, proc.exe)
  475. class TestProcessWMI(WindowsTestCase):
  476. """Compare Process API results with WMI."""
  477. @classmethod
  478. def setUpClass(cls):
  479. cls.pid = spawn_testproc().pid
  480. @classmethod
  481. def tearDownClass(cls):
  482. terminate(cls.pid)
  483. def test_name(self):
  484. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  485. p = psutil.Process(self.pid)
  486. self.assertEqual(p.name(), w.Caption)
  487. # This fail on github because using virtualenv for test environment
  488. @unittest.skipIf(GITHUB_ACTIONS, "unreliable path on GITHUB_ACTIONS")
  489. def test_exe(self):
  490. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  491. p = psutil.Process(self.pid)
  492. # Note: wmi reports the exe as a lower case string.
  493. # Being Windows paths case-insensitive we ignore that.
  494. self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
  495. def test_cmdline(self):
  496. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  497. p = psutil.Process(self.pid)
  498. self.assertEqual(' '.join(p.cmdline()),
  499. w.CommandLine.replace('"', ''))
  500. def test_username(self):
  501. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  502. p = psutil.Process(self.pid)
  503. domain, _, username = w.GetOwner()
  504. username = "%s\\%s" % (domain, username)
  505. self.assertEqual(p.username(), username)
  506. @retry_on_failure()
  507. def test_memory_rss(self):
  508. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  509. p = psutil.Process(self.pid)
  510. rss = p.memory_info().rss
  511. self.assertEqual(rss, int(w.WorkingSetSize))
  512. @retry_on_failure()
  513. def test_memory_vms(self):
  514. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  515. p = psutil.Process(self.pid)
  516. vms = p.memory_info().vms
  517. # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
  518. # ...claims that PageFileUsage is represented in Kilo
  519. # bytes but funnily enough on certain platforms bytes are
  520. # returned instead.
  521. wmi_usage = int(w.PageFileUsage)
  522. if (vms != wmi_usage) and (vms != wmi_usage * 1024):
  523. raise self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
  524. def test_create_time(self):
  525. w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
  526. p = psutil.Process(self.pid)
  527. wmic_create = str(w.CreationDate.split('.')[0])
  528. psutil_create = time.strftime("%Y%m%d%H%M%S",
  529. time.localtime(p.create_time()))
  530. self.assertEqual(wmic_create, psutil_create)
  531. # ---
  532. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  533. class TestDualProcessImplementation(PsutilTestCase):
  534. """Certain APIs on Windows have 2 internal implementations, one
  535. based on documented Windows APIs, another one based
  536. NtQuerySystemInformation() which gets called as fallback in
  537. case the first fails because of limited permission error.
  538. Here we test that the two methods return the exact same value,
  539. see:
  540. https://github.com/giampaolo/psutil/issues/304.
  541. """
  542. @classmethod
  543. def setUpClass(cls):
  544. cls.pid = spawn_testproc().pid
  545. @classmethod
  546. def tearDownClass(cls):
  547. terminate(cls.pid)
  548. def test_memory_info(self):
  549. mem_1 = psutil.Process(self.pid).memory_info()
  550. with mock.patch("psutil._psplatform.cext.proc_memory_info",
  551. side_effect=OSError(errno.EPERM, "msg")) as fun:
  552. mem_2 = psutil.Process(self.pid).memory_info()
  553. self.assertEqual(len(mem_1), len(mem_2))
  554. for i in range(len(mem_1)):
  555. self.assertGreaterEqual(mem_1[i], 0)
  556. self.assertGreaterEqual(mem_2[i], 0)
  557. self.assertAlmostEqual(mem_1[i], mem_2[i], delta=512)
  558. assert fun.called
  559. def test_create_time(self):
  560. ctime = psutil.Process(self.pid).create_time()
  561. with mock.patch("psutil._psplatform.cext.proc_times",
  562. side_effect=OSError(errno.EPERM, "msg")) as fun:
  563. self.assertEqual(psutil.Process(self.pid).create_time(), ctime)
  564. assert fun.called
  565. def test_cpu_times(self):
  566. cpu_times_1 = psutil.Process(self.pid).cpu_times()
  567. with mock.patch("psutil._psplatform.cext.proc_times",
  568. side_effect=OSError(errno.EPERM, "msg")) as fun:
  569. cpu_times_2 = psutil.Process(self.pid).cpu_times()
  570. assert fun.called
  571. self.assertAlmostEqual(
  572. cpu_times_1.user, cpu_times_2.user, delta=0.01)
  573. self.assertAlmostEqual(
  574. cpu_times_1.system, cpu_times_2.system, delta=0.01)
  575. def test_io_counters(self):
  576. io_counters_1 = psutil.Process(self.pid).io_counters()
  577. with mock.patch("psutil._psplatform.cext.proc_io_counters",
  578. side_effect=OSError(errno.EPERM, "msg")) as fun:
  579. io_counters_2 = psutil.Process(self.pid).io_counters()
  580. for i in range(len(io_counters_1)):
  581. self.assertAlmostEqual(
  582. io_counters_1[i], io_counters_2[i], delta=5)
  583. assert fun.called
  584. def test_num_handles(self):
  585. num_handles = psutil.Process(self.pid).num_handles()
  586. with mock.patch("psutil._psplatform.cext.proc_num_handles",
  587. side_effect=OSError(errno.EPERM, "msg")) as fun:
  588. self.assertEqual(psutil.Process(self.pid).num_handles(),
  589. num_handles)
  590. assert fun.called
  591. def test_cmdline(self):
  592. for pid in psutil.pids():
  593. try:
  594. a = cext.proc_cmdline(pid, use_peb=True)
  595. b = cext.proc_cmdline(pid, use_peb=False)
  596. except OSError as err:
  597. err = convert_oserror(err)
  598. if not isinstance(err, (psutil.AccessDenied,
  599. psutil.NoSuchProcess)):
  600. raise
  601. else:
  602. self.assertEqual(a, b)
  603. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  604. class RemoteProcessTestCase(PsutilTestCase):
  605. """Certain functions require calling ReadProcessMemory.
  606. This trivially works when called on the current process.
  607. Check that this works on other processes, especially when they
  608. have a different bitness.
  609. """
  610. @staticmethod
  611. def find_other_interpreter():
  612. # find a python interpreter that is of the opposite bitness from us
  613. code = "import sys; sys.stdout.write(str(sys.maxsize > 2**32))"
  614. # XXX: a different and probably more stable approach might be to access
  615. # the registry but accessing 64 bit paths from a 32 bit process
  616. for filename in glob.glob(r"C:\Python*\python.exe"):
  617. proc = subprocess.Popen(args=[filename, "-c", code],
  618. stdout=subprocess.PIPE,
  619. stderr=subprocess.STDOUT)
  620. output, _ = proc.communicate()
  621. proc.wait()
  622. if output == str(not IS_64BIT):
  623. return filename
  624. test_args = ["-c", "import sys; sys.stdin.read()"]
  625. def setUp(self):
  626. super().setUp()
  627. other_python = self.find_other_interpreter()
  628. if other_python is None:
  629. raise unittest.SkipTest(
  630. "could not find interpreter with opposite bitness")
  631. if IS_64BIT:
  632. self.python64 = sys.executable
  633. self.python32 = other_python
  634. else:
  635. self.python64 = other_python
  636. self.python32 = sys.executable
  637. env = os.environ.copy()
  638. env["THINK_OF_A_NUMBER"] = str(os.getpid())
  639. self.proc32 = self.spawn_testproc(
  640. [self.python32] + self.test_args,
  641. env=env,
  642. stdin=subprocess.PIPE)
  643. self.proc64 = self.spawn_testproc(
  644. [self.python64] + self.test_args,
  645. env=env,
  646. stdin=subprocess.PIPE)
  647. def tearDown(self):
  648. super().tearDown()
  649. self.proc32.communicate()
  650. self.proc64.communicate()
  651. def test_cmdline_32(self):
  652. p = psutil.Process(self.proc32.pid)
  653. self.assertEqual(len(p.cmdline()), 3)
  654. self.assertEqual(p.cmdline()[1:], self.test_args)
  655. def test_cmdline_64(self):
  656. p = psutil.Process(self.proc64.pid)
  657. self.assertEqual(len(p.cmdline()), 3)
  658. self.assertEqual(p.cmdline()[1:], self.test_args)
  659. def test_cwd_32(self):
  660. p = psutil.Process(self.proc32.pid)
  661. self.assertEqual(p.cwd(), os.getcwd())
  662. def test_cwd_64(self):
  663. p = psutil.Process(self.proc64.pid)
  664. self.assertEqual(p.cwd(), os.getcwd())
  665. def test_environ_32(self):
  666. p = psutil.Process(self.proc32.pid)
  667. e = p.environ()
  668. self.assertIn("THINK_OF_A_NUMBER", e)
  669. self.assertEqual(e["THINK_OF_A_NUMBER"], str(os.getpid()))
  670. def test_environ_64(self):
  671. p = psutil.Process(self.proc64.pid)
  672. try:
  673. p.environ()
  674. except psutil.AccessDenied:
  675. pass
  676. # ===================================================================
  677. # Windows services
  678. # ===================================================================
  679. @unittest.skipIf(not WINDOWS, "WINDOWS only")
  680. class TestServices(PsutilTestCase):
  681. def test_win_service_iter(self):
  682. valid_statuses = set([
  683. "running",
  684. "paused",
  685. "start",
  686. "pause",
  687. "continue",
  688. "stop",
  689. "stopped",
  690. ])
  691. valid_start_types = set([
  692. "automatic",
  693. "manual",
  694. "disabled",
  695. ])
  696. valid_statuses = set([
  697. "running",
  698. "paused",
  699. "start_pending",
  700. "pause_pending",
  701. "continue_pending",
  702. "stop_pending",
  703. "stopped"
  704. ])
  705. for serv in psutil.win_service_iter():
  706. data = serv.as_dict()
  707. self.assertIsInstance(data['name'], str)
  708. self.assertNotEqual(data['name'].strip(), "")
  709. self.assertIsInstance(data['display_name'], str)
  710. self.assertIsInstance(data['username'], str)
  711. self.assertIn(data['status'], valid_statuses)
  712. if data['pid'] is not None:
  713. psutil.Process(data['pid'])
  714. self.assertIsInstance(data['binpath'], str)
  715. self.assertIsInstance(data['username'], str)
  716. self.assertIsInstance(data['start_type'], str)
  717. self.assertIn(data['start_type'], valid_start_types)
  718. self.assertIn(data['status'], valid_statuses)
  719. self.assertIsInstance(data['description'], str)
  720. pid = serv.pid()
  721. if pid is not None:
  722. p = psutil.Process(pid)
  723. self.assertTrue(p.is_running())
  724. # win_service_get
  725. s = psutil.win_service_get(serv.name())
  726. # test __eq__
  727. self.assertEqual(serv, s)
  728. def test_win_service_get(self):
  729. ERROR_SERVICE_DOES_NOT_EXIST = \
  730. psutil._psplatform.cext.ERROR_SERVICE_DOES_NOT_EXIST
  731. ERROR_ACCESS_DENIED = psutil._psplatform.cext.ERROR_ACCESS_DENIED
  732. name = next(psutil.win_service_iter()).name()
  733. with self.assertRaises(psutil.NoSuchProcess) as cm:
  734. psutil.win_service_get(name + '???')
  735. self.assertEqual(cm.exception.name, name + '???')
  736. # test NoSuchProcess
  737. service = psutil.win_service_get(name)
  738. if PY3:
  739. args = (0, "msg", 0, ERROR_SERVICE_DOES_NOT_EXIST)
  740. else:
  741. args = (ERROR_SERVICE_DOES_NOT_EXIST, "msg")
  742. exc = WindowsError(*args)
  743. with mock.patch("psutil._psplatform.cext.winservice_query_status",
  744. side_effect=exc):
  745. self.assertRaises(psutil.NoSuchProcess, service.status)
  746. with mock.patch("psutil._psplatform.cext.winservice_query_config",
  747. side_effect=exc):
  748. self.assertRaises(psutil.NoSuchProcess, service.username)
  749. # test AccessDenied
  750. if PY3:
  751. args = (0, "msg", 0, ERROR_ACCESS_DENIED)
  752. else:
  753. args = (ERROR_ACCESS_DENIED, "msg")
  754. exc = WindowsError(*args)
  755. with mock.patch("psutil._psplatform.cext.winservice_query_status",
  756. side_effect=exc):
  757. self.assertRaises(psutil.AccessDenied, service.status)
  758. with mock.patch("psutil._psplatform.cext.winservice_query_config",
  759. side_effect=exc):
  760. self.assertRaises(psutil.AccessDenied, service.username)
  761. # test __str__ and __repr__
  762. self.assertIn(service.name(), str(service))
  763. self.assertIn(service.display_name(), str(service))
  764. self.assertIn(service.name(), repr(service))
  765. self.assertIn(service.display_name(), repr(service))
# When executed directly as a script, hand this file over to the psutil
# test runner.
if __name__ == '__main__':
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)