test_linux.py 91 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277
  1. #!/usr/bin/env python3
  2. # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
  3. # Use of this source code is governed by a BSD-style license that can be
  4. # found in the LICENSE file.
  5. """Linux specific tests."""
  6. from __future__ import division
  7. import collections
  8. import contextlib
  9. import errno
  10. import glob
  11. import io
  12. import os
  13. import re
  14. import shutil
  15. import socket
  16. import struct
  17. import textwrap
  18. import time
  19. import unittest
  20. import warnings
  21. import psutil
  22. from psutil import LINUX
  23. from psutil._compat import PY3
  24. from psutil._compat import FileNotFoundError
  25. from psutil._compat import basestring
  26. from psutil._compat import u
  27. from psutil.tests import GITHUB_ACTIONS
  28. from psutil.tests import GLOBAL_TIMEOUT
  29. from psutil.tests import HAS_BATTERY
  30. from psutil.tests import HAS_CPU_FREQ
  31. from psutil.tests import HAS_GETLOADAVG
  32. from psutil.tests import HAS_RLIMIT
  33. from psutil.tests import PYPY
  34. from psutil.tests import TOLERANCE_DISK_USAGE
  35. from psutil.tests import TOLERANCE_SYS_MEM
  36. from psutil.tests import PsutilTestCase
  37. from psutil.tests import ThreadTask
  38. from psutil.tests import call_until
  39. from psutil.tests import mock
  40. from psutil.tests import reload_module
  41. from psutil.tests import retry_on_failure
  42. from psutil.tests import safe_rmpath
  43. from psutil.tests import sh
  44. from psutil.tests import skip_on_not_implemented
  45. from psutil.tests import which
  46. if LINUX:
  47. from psutil._pslinux import CLOCK_TICKS
  48. from psutil._pslinux import RootFsDeviceFinder
  49. from psutil._pslinux import calculate_avail_vmem
  50. from psutil._pslinux import open_binary
  51. HERE = os.path.abspath(os.path.dirname(__file__))
  52. SIOCGIFADDR = 0x8915
  53. SIOCGIFCONF = 0x8912
  54. SIOCGIFHWADDR = 0x8927
  55. SIOCGIFNETMASK = 0x891b
  56. SIOCGIFBRDADDR = 0x8919
  57. if LINUX:
  58. SECTOR_SIZE = 512
  59. EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
  60. # =====================================================================
  61. # --- utils
  62. # =====================================================================
  63. def get_ipv4_address(ifname):
  64. import fcntl
  65. ifname = ifname[:15]
  66. if PY3:
  67. ifname = bytes(ifname, 'ascii')
  68. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  69. with contextlib.closing(s):
  70. return socket.inet_ntoa(
  71. fcntl.ioctl(s.fileno(),
  72. SIOCGIFADDR,
  73. struct.pack('256s', ifname))[20:24])
  74. def get_ipv4_netmask(ifname):
  75. import fcntl
  76. ifname = ifname[:15]
  77. if PY3:
  78. ifname = bytes(ifname, 'ascii')
  79. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  80. with contextlib.closing(s):
  81. return socket.inet_ntoa(
  82. fcntl.ioctl(s.fileno(),
  83. SIOCGIFNETMASK,
  84. struct.pack('256s', ifname))[20:24])
  85. def get_ipv4_broadcast(ifname):
  86. import fcntl
  87. ifname = ifname[:15]
  88. if PY3:
  89. ifname = bytes(ifname, 'ascii')
  90. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  91. with contextlib.closing(s):
  92. return socket.inet_ntoa(
  93. fcntl.ioctl(s.fileno(),
  94. SIOCGIFBRDADDR,
  95. struct.pack('256s', ifname))[20:24])
  96. def get_ipv6_addresses(ifname):
  97. with open("/proc/net/if_inet6") as f:
  98. all_fields = []
  99. for line in f.readlines():
  100. fields = line.split()
  101. if fields[-1] == ifname:
  102. all_fields.append(fields)
  103. if len(all_fields) == 0:
  104. raise ValueError("could not find interface %r" % ifname)
  105. for i in range(len(all_fields)):
  106. unformatted = all_fields[i][0]
  107. groups = []
  108. for j in range(0, len(unformatted), 4):
  109. groups.append(unformatted[j:j + 4])
  110. formatted = ":".join(groups)
  111. packed = socket.inet_pton(socket.AF_INET6, formatted)
  112. all_fields[i] = socket.inet_ntop(socket.AF_INET6, packed)
  113. return all_fields
  114. def get_mac_address(ifname):
  115. import fcntl
  116. ifname = ifname[:15]
  117. if PY3:
  118. ifname = bytes(ifname, 'ascii')
  119. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  120. with contextlib.closing(s):
  121. info = fcntl.ioctl(
  122. s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
  123. if PY3:
  124. def ord(x):
  125. return x
  126. else:
  127. import __builtin__
  128. ord = __builtin__.ord
  129. return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
  130. def free_swap():
  131. """Parse 'free' cmd and return swap memory's s total, used and free
  132. values.
  133. """
  134. out = sh(["free", "-b"], env={"LANG": "C.UTF-8"})
  135. lines = out.split('\n')
  136. for line in lines:
  137. if line.startswith('Swap'):
  138. _, total, used, free = line.split()
  139. nt = collections.namedtuple('free', 'total used free')
  140. return nt(int(total), int(used), int(free))
  141. raise ValueError(
  142. "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
  143. def free_physmem():
  144. """Parse 'free' cmd and return physical memory's total, used
  145. and free values.
  146. """
  147. # Note: free can have 2 different formats, invalidating 'shared'
  148. # and 'cached' memory which may have different positions so we
  149. # do not return them.
  150. # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
  151. out = sh(["free", "-b"], env={"LANG": "C.UTF-8"})
  152. lines = out.split('\n')
  153. for line in lines:
  154. if line.startswith('Mem'):
  155. total, used, free, shared = \
  156. (int(x) for x in line.split()[1:5])
  157. nt = collections.namedtuple(
  158. 'free', 'total used free shared output')
  159. return nt(total, used, free, shared, out)
  160. raise ValueError(
  161. "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
  162. def vmstat(stat):
  163. out = sh(["vmstat", "-s"], env={"LANG": "C.UTF-8"})
  164. for line in out.split("\n"):
  165. line = line.strip()
  166. if stat in line:
  167. return int(line.split(' ')[0])
  168. raise ValueError("can't find %r in 'vmstat' output" % stat)
  169. def get_free_version_info():
  170. out = sh(["free", "-V"]).strip()
  171. if 'UNKNOWN' in out:
  172. raise unittest.SkipTest("can't determine free version")
  173. return tuple(map(int, re.findall(r'\d+', out.split()[-1])))
  174. @contextlib.contextmanager
  175. def mock_open_content(for_path, content):
  176. """Mock open() builtin and forces it to return a certain `content`
  177. on read() if the path being opened matches `for_path`.
  178. """
  179. def open_mock(name, *args, **kwargs):
  180. if name == for_path:
  181. if PY3:
  182. if isinstance(content, basestring):
  183. return io.StringIO(content)
  184. else:
  185. return io.BytesIO(content)
  186. else:
  187. return io.BytesIO(content)
  188. else:
  189. return orig_open(name, *args, **kwargs)
  190. orig_open = open
  191. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  192. with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
  193. yield m
  194. @contextlib.contextmanager
  195. def mock_open_exception(for_path, exc):
  196. """Mock open() builtin and raises `exc` if the path being opened
  197. matches `for_path`.
  198. """
  199. def open_mock(name, *args, **kwargs):
  200. if name == for_path:
  201. raise exc
  202. else:
  203. return orig_open(name, *args, **kwargs)
  204. orig_open = open
  205. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  206. with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
  207. yield m
  208. # =====================================================================
  209. # --- system virtual memory
  210. # =====================================================================
  211. @unittest.skipIf(not LINUX, "LINUX only")
  212. class TestSystemVirtualMemoryAgainstFree(PsutilTestCase):
  213. def test_total(self):
  214. cli_value = free_physmem().total
  215. psutil_value = psutil.virtual_memory().total
  216. self.assertEqual(cli_value, psutil_value)
  217. @retry_on_failure()
  218. def test_used(self):
  219. # Older versions of procps used slab memory to calculate used memory.
  220. # This got changed in:
  221. # https://gitlab.com/procps-ng/procps/commit/
  222. # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e
  223. if get_free_version_info() < (3, 3, 12):
  224. raise self.skipTest("old free version")
  225. cli_value = free_physmem().used
  226. psutil_value = psutil.virtual_memory().used
  227. self.assertAlmostEqual(cli_value, psutil_value,
  228. delta=TOLERANCE_SYS_MEM)
  229. @retry_on_failure()
  230. def test_free(self):
  231. cli_value = free_physmem().free
  232. psutil_value = psutil.virtual_memory().free
  233. self.assertAlmostEqual(cli_value, psutil_value,
  234. delta=TOLERANCE_SYS_MEM)
  235. @retry_on_failure()
  236. def test_shared(self):
  237. free = free_physmem()
  238. free_value = free.shared
  239. if free_value == 0:
  240. raise unittest.SkipTest("free does not support 'shared' column")
  241. psutil_value = psutil.virtual_memory().shared
  242. self.assertAlmostEqual(
  243. free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
  244. msg='%s %s \n%s' % (free_value, psutil_value, free.output))
  245. @retry_on_failure()
  246. def test_available(self):
  247. # "free" output format has changed at some point:
  248. # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
  249. out = sh(["free", "-b"])
  250. lines = out.split('\n')
  251. if 'available' not in lines[0]:
  252. raise unittest.SkipTest("free does not support 'available' column")
  253. else:
  254. free_value = int(lines[1].split()[-1])
  255. psutil_value = psutil.virtual_memory().available
  256. self.assertAlmostEqual(
  257. free_value, psutil_value, delta=TOLERANCE_SYS_MEM,
  258. msg='%s %s \n%s' % (free_value, psutil_value, out))
  259. @unittest.skipIf(not LINUX, "LINUX only")
  260. class TestSystemVirtualMemoryAgainstVmstat(PsutilTestCase):
  261. def test_total(self):
  262. vmstat_value = vmstat('total memory') * 1024
  263. psutil_value = psutil.virtual_memory().total
  264. self.assertAlmostEqual(
  265. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  266. @retry_on_failure()
  267. def test_used(self):
  268. # Older versions of procps used slab memory to calculate used memory.
  269. # This got changed in:
  270. # https://gitlab.com/procps-ng/procps/commit/
  271. # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e
  272. if get_free_version_info() < (3, 3, 12):
  273. raise self.skipTest("old free version")
  274. vmstat_value = vmstat('used memory') * 1024
  275. psutil_value = psutil.virtual_memory().used
  276. self.assertAlmostEqual(
  277. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  278. @retry_on_failure()
  279. def test_free(self):
  280. vmstat_value = vmstat('free memory') * 1024
  281. psutil_value = psutil.virtual_memory().free
  282. self.assertAlmostEqual(
  283. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  284. @retry_on_failure()
  285. def test_buffers(self):
  286. vmstat_value = vmstat('buffer memory') * 1024
  287. psutil_value = psutil.virtual_memory().buffers
  288. self.assertAlmostEqual(
  289. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  290. @retry_on_failure()
  291. def test_active(self):
  292. vmstat_value = vmstat('active memory') * 1024
  293. psutil_value = psutil.virtual_memory().active
  294. self.assertAlmostEqual(
  295. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  296. @retry_on_failure()
  297. def test_inactive(self):
  298. vmstat_value = vmstat('inactive memory') * 1024
  299. psutil_value = psutil.virtual_memory().inactive
  300. self.assertAlmostEqual(
  301. vmstat_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  302. @unittest.skipIf(not LINUX, "LINUX only")
  303. class TestSystemVirtualMemoryMocks(PsutilTestCase):
  304. def test_warnings_on_misses(self):
  305. # Emulate a case where /proc/meminfo provides few info.
  306. # psutil is supposed to set the missing fields to 0 and
  307. # raise a warning.
  308. with mock_open_content(
  309. '/proc/meminfo',
  310. textwrap.dedent("""\
  311. Active(anon): 6145416 kB
  312. Active(file): 2950064 kB
  313. Inactive(anon): 574764 kB
  314. Inactive(file): 1567648 kB
  315. MemAvailable: -1 kB
  316. MemFree: 2057400 kB
  317. MemTotal: 16325648 kB
  318. SReclaimable: 346648 kB
  319. """).encode()) as m:
  320. with warnings.catch_warnings(record=True) as ws:
  321. warnings.simplefilter("always")
  322. ret = psutil.virtual_memory()
  323. assert m.called
  324. self.assertEqual(len(ws), 1)
  325. w = ws[0]
  326. self.assertIn(
  327. "memory stats couldn't be determined", str(w.message))
  328. self.assertIn("cached", str(w.message))
  329. self.assertIn("shared", str(w.message))
  330. self.assertIn("active", str(w.message))
  331. self.assertIn("inactive", str(w.message))
  332. self.assertIn("buffers", str(w.message))
  333. self.assertIn("available", str(w.message))
  334. self.assertEqual(ret.cached, 0)
  335. self.assertEqual(ret.active, 0)
  336. self.assertEqual(ret.inactive, 0)
  337. self.assertEqual(ret.shared, 0)
  338. self.assertEqual(ret.buffers, 0)
  339. self.assertEqual(ret.available, 0)
  340. self.assertEqual(ret.slab, 0)
  341. @retry_on_failure()
  342. def test_avail_old_percent(self):
  343. # Make sure that our calculation of avail mem for old kernels
  344. # is off by max 15%.
  345. mems = {}
  346. with open_binary('/proc/meminfo') as f:
  347. for line in f:
  348. fields = line.split()
  349. mems[fields[0]] = int(fields[1]) * 1024
  350. a = calculate_avail_vmem(mems)
  351. if b'MemAvailable:' in mems:
  352. b = mems[b'MemAvailable:']
  353. diff_percent = abs(a - b) / a * 100
  354. self.assertLess(diff_percent, 15)
  355. def test_avail_old_comes_from_kernel(self):
  356. # Make sure "MemAvailable:" coluimn is used instead of relying
  357. # on our internal algorithm to calculate avail mem.
  358. with mock_open_content(
  359. '/proc/meminfo',
  360. textwrap.dedent("""\
  361. Active: 9444728 kB
  362. Active(anon): 6145416 kB
  363. Active(file): 2950064 kB
  364. Buffers: 287952 kB
  365. Cached: 4818144 kB
  366. Inactive(file): 1578132 kB
  367. Inactive(anon): 574764 kB
  368. Inactive(file): 1567648 kB
  369. MemAvailable: 6574984 kB
  370. MemFree: 2057400 kB
  371. MemTotal: 16325648 kB
  372. Shmem: 577588 kB
  373. SReclaimable: 346648 kB
  374. """).encode()) as m:
  375. with warnings.catch_warnings(record=True) as ws:
  376. ret = psutil.virtual_memory()
  377. assert m.called
  378. self.assertEqual(ret.available, 6574984 * 1024)
  379. w = ws[0]
  380. self.assertIn(
  381. "inactive memory stats couldn't be determined", str(w.message))
  382. def test_avail_old_missing_fields(self):
  383. # Remove Active(file), Inactive(file) and SReclaimable
  384. # from /proc/meminfo and make sure the fallback is used
  385. # (free + cached),
  386. with mock_open_content(
  387. "/proc/meminfo",
  388. textwrap.dedent("""\
  389. Active: 9444728 kB
  390. Active(anon): 6145416 kB
  391. Buffers: 287952 kB
  392. Cached: 4818144 kB
  393. Inactive(file): 1578132 kB
  394. Inactive(anon): 574764 kB
  395. MemFree: 2057400 kB
  396. MemTotal: 16325648 kB
  397. Shmem: 577588 kB
  398. """).encode()) as m:
  399. with warnings.catch_warnings(record=True) as ws:
  400. ret = psutil.virtual_memory()
  401. assert m.called
  402. self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
  403. w = ws[0]
  404. self.assertIn(
  405. "inactive memory stats couldn't be determined", str(w.message))
  406. def test_avail_old_missing_zoneinfo(self):
  407. # Remove /proc/zoneinfo file. Make sure fallback is used
  408. # (free + cached).
  409. with mock_open_content(
  410. "/proc/meminfo",
  411. textwrap.dedent("""\
  412. Active: 9444728 kB
  413. Active(anon): 6145416 kB
  414. Active(file): 2950064 kB
  415. Buffers: 287952 kB
  416. Cached: 4818144 kB
  417. Inactive(file): 1578132 kB
  418. Inactive(anon): 574764 kB
  419. Inactive(file): 1567648 kB
  420. MemFree: 2057400 kB
  421. MemTotal: 16325648 kB
  422. Shmem: 577588 kB
  423. SReclaimable: 346648 kB
  424. """).encode()):
  425. with mock_open_exception(
  426. "/proc/zoneinfo",
  427. IOError(errno.ENOENT, 'no such file or directory')):
  428. with warnings.catch_warnings(record=True) as ws:
  429. ret = psutil.virtual_memory()
  430. self.assertEqual(
  431. ret.available, 2057400 * 1024 + 4818144 * 1024)
  432. w = ws[0]
  433. self.assertIn(
  434. "inactive memory stats couldn't be determined",
  435. str(w.message))
  436. def test_virtual_memory_mocked(self):
  437. # Emulate /proc/meminfo because neither vmstat nor free return slab.
  438. def open_mock(name, *args, **kwargs):
  439. if name == '/proc/meminfo':
  440. return io.BytesIO(textwrap.dedent("""\
  441. MemTotal: 100 kB
  442. MemFree: 2 kB
  443. MemAvailable: 3 kB
  444. Buffers: 4 kB
  445. Cached: 5 kB
  446. SwapCached: 6 kB
  447. Active: 7 kB
  448. Inactive: 8 kB
  449. Active(anon): 9 kB
  450. Inactive(anon): 10 kB
  451. Active(file): 11 kB
  452. Inactive(file): 12 kB
  453. Unevictable: 13 kB
  454. Mlocked: 14 kB
  455. SwapTotal: 15 kB
  456. SwapFree: 16 kB
  457. Dirty: 17 kB
  458. Writeback: 18 kB
  459. AnonPages: 19 kB
  460. Mapped: 20 kB
  461. Shmem: 21 kB
  462. Slab: 22 kB
  463. SReclaimable: 23 kB
  464. SUnreclaim: 24 kB
  465. KernelStack: 25 kB
  466. PageTables: 26 kB
  467. NFS_Unstable: 27 kB
  468. Bounce: 28 kB
  469. WritebackTmp: 29 kB
  470. CommitLimit: 30 kB
  471. Committed_AS: 31 kB
  472. VmallocTotal: 32 kB
  473. VmallocUsed: 33 kB
  474. VmallocChunk: 34 kB
  475. HardwareCorrupted: 35 kB
  476. AnonHugePages: 36 kB
  477. ShmemHugePages: 37 kB
  478. ShmemPmdMapped: 38 kB
  479. CmaTotal: 39 kB
  480. CmaFree: 40 kB
  481. HugePages_Total: 41 kB
  482. HugePages_Free: 42 kB
  483. HugePages_Rsvd: 43 kB
  484. HugePages_Surp: 44 kB
  485. Hugepagesize: 45 kB
  486. DirectMap46k: 46 kB
  487. DirectMap47M: 47 kB
  488. DirectMap48G: 48 kB
  489. """).encode())
  490. else:
  491. return orig_open(name, *args, **kwargs)
  492. orig_open = open
  493. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  494. with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
  495. mem = psutil.virtual_memory()
  496. assert m.called
  497. self.assertEqual(mem.total, 100 * 1024)
  498. self.assertEqual(mem.free, 2 * 1024)
  499. self.assertEqual(mem.buffers, 4 * 1024)
  500. # cached mem also includes reclaimable memory
  501. self.assertEqual(mem.cached, (5 + 23) * 1024)
  502. self.assertEqual(mem.shared, 21 * 1024)
  503. self.assertEqual(mem.active, 7 * 1024)
  504. self.assertEqual(mem.inactive, 8 * 1024)
  505. self.assertEqual(mem.slab, 22 * 1024)
  506. self.assertEqual(mem.available, 3 * 1024)
  507. # =====================================================================
  508. # --- system swap memory
  509. # =====================================================================
  510. @unittest.skipIf(not LINUX, "LINUX only")
  511. class TestSystemSwapMemory(PsutilTestCase):
  512. @staticmethod
  513. def meminfo_has_swap_info():
  514. """Return True if /proc/meminfo provides swap metrics."""
  515. with open("/proc/meminfo") as f:
  516. data = f.read()
  517. return 'SwapTotal:' in data and 'SwapFree:' in data
  518. def test_total(self):
  519. free_value = free_swap().total
  520. psutil_value = psutil.swap_memory().total
  521. return self.assertAlmostEqual(
  522. free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  523. @retry_on_failure()
  524. def test_used(self):
  525. free_value = free_swap().used
  526. psutil_value = psutil.swap_memory().used
  527. return self.assertAlmostEqual(
  528. free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  529. @retry_on_failure()
  530. def test_free(self):
  531. free_value = free_swap().free
  532. psutil_value = psutil.swap_memory().free
  533. return self.assertAlmostEqual(
  534. free_value, psutil_value, delta=TOLERANCE_SYS_MEM)
  535. def test_missing_sin_sout(self):
  536. with mock.patch('psutil._common.open', create=True) as m:
  537. with warnings.catch_warnings(record=True) as ws:
  538. warnings.simplefilter("always")
  539. ret = psutil.swap_memory()
  540. assert m.called
  541. self.assertEqual(len(ws), 1)
  542. w = ws[0]
  543. self.assertIn(
  544. "'sin' and 'sout' swap memory stats couldn't "
  545. "be determined", str(w.message))
  546. self.assertEqual(ret.sin, 0)
  547. self.assertEqual(ret.sout, 0)
  548. def test_no_vmstat_mocked(self):
  549. # see https://github.com/giampaolo/psutil/issues/722
  550. with mock_open_exception(
  551. "/proc/vmstat",
  552. IOError(errno.ENOENT, 'no such file or directory')) as m:
  553. with warnings.catch_warnings(record=True) as ws:
  554. warnings.simplefilter("always")
  555. ret = psutil.swap_memory()
  556. assert m.called
  557. self.assertEqual(len(ws), 1)
  558. w = ws[0]
  559. self.assertIn(
  560. "'sin' and 'sout' swap memory stats couldn't "
  561. "be determined and were set to 0",
  562. str(w.message))
  563. self.assertEqual(ret.sin, 0)
  564. self.assertEqual(ret.sout, 0)
  565. def test_meminfo_against_sysinfo(self):
  566. # Make sure the content of /proc/meminfo about swap memory
  567. # matches sysinfo() syscall, see:
  568. # https://github.com/giampaolo/psutil/issues/1015
  569. if not self.meminfo_has_swap_info():
  570. return unittest.skip("/proc/meminfo has no swap metrics")
  571. with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
  572. swap = psutil.swap_memory()
  573. assert not m.called
  574. import psutil._psutil_linux as cext
  575. _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
  576. total *= unit_multiplier
  577. free *= unit_multiplier
  578. self.assertEqual(swap.total, total)
  579. self.assertAlmostEqual(swap.free, free, delta=TOLERANCE_SYS_MEM)
  580. def test_emulate_meminfo_has_no_metrics(self):
  581. # Emulate a case where /proc/meminfo provides no swap metrics
  582. # in which case sysinfo() syscall is supposed to be used
  583. # as a fallback.
  584. with mock_open_content("/proc/meminfo", b"") as m:
  585. psutil.swap_memory()
  586. assert m.called
  587. # =====================================================================
  588. # --- system CPU
  589. # =====================================================================
  590. @unittest.skipIf(not LINUX, "LINUX only")
  591. class TestSystemCPUTimes(PsutilTestCase):
  592. def test_fields(self):
  593. fields = psutil.cpu_times()._fields
  594. kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
  595. kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
  596. if kernel_ver_info >= (2, 6, 11):
  597. self.assertIn('steal', fields)
  598. else:
  599. self.assertNotIn('steal', fields)
  600. if kernel_ver_info >= (2, 6, 24):
  601. self.assertIn('guest', fields)
  602. else:
  603. self.assertNotIn('guest', fields)
  604. if kernel_ver_info >= (3, 2, 0):
  605. self.assertIn('guest_nice', fields)
  606. else:
  607. self.assertNotIn('guest_nice', fields)
  608. @unittest.skipIf(not LINUX, "LINUX only")
  609. class TestSystemCPUCountLogical(PsutilTestCase):
  610. @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
  611. "/sys/devices/system/cpu/online does not exist")
  612. def test_against_sysdev_cpu_online(self):
  613. with open("/sys/devices/system/cpu/online") as f:
  614. value = f.read().strip()
  615. if "-" in str(value):
  616. value = int(value.split('-')[1]) + 1
  617. self.assertEqual(psutil.cpu_count(), value)
  618. @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
  619. "/sys/devices/system/cpu does not exist")
  620. def test_against_sysdev_cpu_num(self):
  621. ls = os.listdir("/sys/devices/system/cpu")
  622. count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
  623. self.assertEqual(psutil.cpu_count(), count)
  624. @unittest.skipIf(not which("nproc"), "nproc utility not available")
  625. def test_against_nproc(self):
  626. num = int(sh("nproc --all"))
  627. self.assertEqual(psutil.cpu_count(logical=True), num)
  628. @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
  629. def test_against_lscpu(self):
  630. out = sh("lscpu -p")
  631. num = len([x for x in out.split('\n') if not x.startswith('#')])
  632. self.assertEqual(psutil.cpu_count(logical=True), num)
  633. def test_emulate_fallbacks(self):
  634. import psutil._pslinux
  635. original = psutil._pslinux.cpu_count_logical()
  636. # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
  637. # order to cause the parsing of /proc/cpuinfo and /proc/stat.
  638. with mock.patch(
  639. 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
  640. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  641. assert m.called
  642. # Let's have open() return empty data and make sure None is
  643. # returned ('cause we mimic os.cpu_count()).
  644. with mock.patch('psutil._common.open', create=True) as m:
  645. self.assertIsNone(psutil._pslinux.cpu_count_logical())
  646. self.assertEqual(m.call_count, 2)
  647. # /proc/stat should be the last one
  648. self.assertEqual(m.call_args[0][0], '/proc/stat')
  649. # Let's push this a bit further and make sure /proc/cpuinfo
  650. # parsing works as expected.
  651. with open('/proc/cpuinfo', 'rb') as f:
  652. cpuinfo_data = f.read()
  653. fake_file = io.BytesIO(cpuinfo_data)
  654. with mock.patch('psutil._common.open',
  655. return_value=fake_file, create=True) as m:
  656. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  657. # Finally, let's make /proc/cpuinfo return meaningless data;
  658. # this way we'll fall back on relying on /proc/stat
  659. with mock_open_content('/proc/cpuinfo', b"") as m:
  660. self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
  661. assert m.called
  662. @unittest.skipIf(not LINUX, "LINUX only")
  663. class TestSystemCPUCountCores(PsutilTestCase):
  664. @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
  665. def test_against_lscpu(self):
  666. out = sh("lscpu -p")
  667. core_ids = set()
  668. for line in out.split('\n'):
  669. if not line.startswith('#'):
  670. fields = line.split(',')
  671. core_ids.add(fields[1])
  672. self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
  673. def test_method_2(self):
  674. meth_1 = psutil._pslinux.cpu_count_cores()
  675. with mock.patch('glob.glob', return_value=[]) as m:
  676. meth_2 = psutil._pslinux.cpu_count_cores()
  677. assert m.called
  678. if meth_1 is not None:
  679. self.assertEqual(meth_1, meth_2)
  680. def test_emulate_none(self):
  681. with mock.patch('glob.glob', return_value=[]) as m1:
  682. with mock.patch('psutil._common.open', create=True) as m2:
  683. self.assertIsNone(psutil._pslinux.cpu_count_cores())
  684. assert m1.called
  685. assert m2.called
  686. @unittest.skipIf(not LINUX, "LINUX only")
  687. class TestSystemCPUFrequency(PsutilTestCase):
  688. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  689. def test_emulate_use_second_file(self):
  690. # https://github.com/giampaolo/psutil/issues/981
  691. def path_exists_mock(path):
  692. if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
  693. return False
  694. else:
  695. return orig_exists(path)
  696. orig_exists = os.path.exists
  697. with mock.patch("os.path.exists", side_effect=path_exists_mock,
  698. create=True):
  699. assert psutil.cpu_freq()
  700. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  701. def test_emulate_use_cpuinfo(self):
  702. # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
  703. # exist and /proc/cpuinfo is used instead.
  704. def path_exists_mock(path):
  705. if path.startswith('/sys/devices/system/cpu/'):
  706. return False
  707. else:
  708. return os_path_exists(path)
  709. os_path_exists = os.path.exists
  710. try:
  711. with mock.patch("os.path.exists", side_effect=path_exists_mock):
  712. reload_module(psutil._pslinux)
  713. ret = psutil.cpu_freq()
  714. assert ret
  715. self.assertEqual(ret.max, 0.0)
  716. self.assertEqual(ret.min, 0.0)
  717. for freq in psutil.cpu_freq(percpu=True):
  718. self.assertEqual(freq.max, 0.0)
  719. self.assertEqual(freq.min, 0.0)
  720. finally:
  721. reload_module(psutil._pslinux)
  722. reload_module(psutil)
  723. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  724. def test_emulate_data(self):
  725. def open_mock(name, *args, **kwargs):
  726. if (name.endswith('/scaling_cur_freq') and
  727. name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
  728. return io.BytesIO(b"500000")
  729. elif (name.endswith('/scaling_min_freq') and
  730. name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
  731. return io.BytesIO(b"600000")
  732. elif (name.endswith('/scaling_max_freq') and
  733. name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
  734. return io.BytesIO(b"700000")
  735. elif name == '/proc/cpuinfo':
  736. return io.BytesIO(b"cpu MHz : 500")
  737. else:
  738. return orig_open(name, *args, **kwargs)
  739. orig_open = open
  740. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  741. with mock.patch(patch_point, side_effect=open_mock):
  742. with mock.patch(
  743. 'os.path.exists', return_value=True):
  744. freq = psutil.cpu_freq()
  745. self.assertEqual(freq.current, 500.0)
  746. # when /proc/cpuinfo is used min and max frequencies are not
  747. # available and are set to 0.
  748. if freq.min != 0.0:
  749. self.assertEqual(freq.min, 600.0)
  750. if freq.max != 0.0:
  751. self.assertEqual(freq.max, 700.0)
  752. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  753. def test_emulate_multi_cpu(self):
  754. def open_mock(name, *args, **kwargs):
  755. n = name
  756. if (n.endswith('/scaling_cur_freq') and
  757. n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
  758. return io.BytesIO(b"100000")
  759. elif (n.endswith('/scaling_min_freq') and
  760. n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
  761. return io.BytesIO(b"200000")
  762. elif (n.endswith('/scaling_max_freq') and
  763. n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
  764. return io.BytesIO(b"300000")
  765. elif (n.endswith('/scaling_cur_freq') and
  766. n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
  767. return io.BytesIO(b"400000")
  768. elif (n.endswith('/scaling_min_freq') and
  769. n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
  770. return io.BytesIO(b"500000")
  771. elif (n.endswith('/scaling_max_freq') and
  772. n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
  773. return io.BytesIO(b"600000")
  774. elif name == '/proc/cpuinfo':
  775. return io.BytesIO(b"cpu MHz : 100\n"
  776. b"cpu MHz : 400")
  777. else:
  778. return orig_open(name, *args, **kwargs)
  779. orig_open = open
  780. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  781. with mock.patch(patch_point, side_effect=open_mock):
  782. with mock.patch('os.path.exists', return_value=True):
  783. with mock.patch('psutil._pslinux.cpu_count_logical',
  784. return_value=2):
  785. freq = psutil.cpu_freq(percpu=True)
  786. self.assertEqual(freq[0].current, 100.0)
  787. if freq[0].min != 0.0:
  788. self.assertEqual(freq[0].min, 200.0)
  789. if freq[0].max != 0.0:
  790. self.assertEqual(freq[0].max, 300.0)
  791. self.assertEqual(freq[1].current, 400.0)
  792. if freq[1].min != 0.0:
  793. self.assertEqual(freq[1].min, 500.0)
  794. if freq[1].max != 0.0:
  795. self.assertEqual(freq[1].max, 600.0)
  796. @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
  797. def test_emulate_no_scaling_cur_freq_file(self):
  798. # See: https://github.com/giampaolo/psutil/issues/1071
  799. def open_mock(name, *args, **kwargs):
  800. if name.endswith('/scaling_cur_freq'):
  801. raise IOError(errno.ENOENT, "")
  802. elif name.endswith('/cpuinfo_cur_freq'):
  803. return io.BytesIO(b"200000")
  804. elif name == '/proc/cpuinfo':
  805. return io.BytesIO(b"cpu MHz : 200")
  806. else:
  807. return orig_open(name, *args, **kwargs)
  808. orig_open = open
  809. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  810. with mock.patch(patch_point, side_effect=open_mock):
  811. with mock.patch('os.path.exists', return_value=True):
  812. with mock.patch('psutil._pslinux.cpu_count_logical',
  813. return_value=1):
  814. freq = psutil.cpu_freq()
  815. self.assertEqual(freq.current, 200)
  816. @unittest.skipIf(not LINUX, "LINUX only")
  817. class TestSystemCPUStats(PsutilTestCase):
  818. def test_ctx_switches(self):
  819. vmstat_value = vmstat("context switches")
  820. psutil_value = psutil.cpu_stats().ctx_switches
  821. self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
  822. def test_interrupts(self):
  823. vmstat_value = vmstat("interrupts")
  824. psutil_value = psutil.cpu_stats().interrupts
  825. self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
  826. @unittest.skipIf(not LINUX, "LINUX only")
  827. class TestLoadAvg(PsutilTestCase):
  828. @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
  829. def test_getloadavg(self):
  830. psutil_value = psutil.getloadavg()
  831. with open("/proc/loadavg") as f:
  832. proc_value = f.read().split()
  833. self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
  834. self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
  835. self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
  836. # =====================================================================
  837. # --- system network
  838. # =====================================================================
  839. @unittest.skipIf(not LINUX, "LINUX only")
  840. class TestSystemNetIfAddrs(PsutilTestCase):
  841. def test_ips(self):
  842. for name, addrs in psutil.net_if_addrs().items():
  843. for addr in addrs:
  844. if addr.family == psutil.AF_LINK:
  845. self.assertEqual(addr.address, get_mac_address(name))
  846. elif addr.family == socket.AF_INET:
  847. self.assertEqual(addr.address, get_ipv4_address(name))
  848. self.assertEqual(addr.netmask, get_ipv4_netmask(name))
  849. if addr.broadcast is not None:
  850. self.assertEqual(addr.broadcast,
  851. get_ipv4_broadcast(name))
  852. else:
  853. self.assertEqual(get_ipv4_broadcast(name), '0.0.0.0')
  854. elif addr.family == socket.AF_INET6:
  855. # IPv6 addresses can have a percent symbol at the end.
  856. # E.g. these 2 are equivalent:
  857. # "fe80::1ff:fe23:4567:890a"
  858. # "fe80::1ff:fe23:4567:890a%eth0"
  859. # That is the "zone id" portion, which usually is the name
  860. # of the network interface.
  861. address = addr.address.split('%')[0]
  862. self.assertIn(address, get_ipv6_addresses(name))
  863. # XXX - not reliable when having virtual NICs installed by Docker.
  864. # @unittest.skipIf(not which('ip'), "'ip' utility not available")
  865. # def test_net_if_names(self):
  866. # out = sh("ip addr").strip()
  867. # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
  868. # found = 0
  869. # for line in out.split('\n'):
  870. # line = line.strip()
  871. # if re.search(r"^\d+:", line):
  872. # found += 1
  873. # name = line.split(':')[1].strip()
  874. # self.assertIn(name, nics)
  875. # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
  876. # pprint.pformat(nics), out))
  877. @unittest.skipIf(not LINUX, "LINUX only")
  878. class TestSystemNetIfStats(PsutilTestCase):
  879. @unittest.skipIf(not which("ifconfig"), "ifconfig utility not available")
  880. def test_against_ifconfig(self):
  881. for name, stats in psutil.net_if_stats().items():
  882. try:
  883. out = sh("ifconfig %s" % name)
  884. except RuntimeError:
  885. pass
  886. else:
  887. self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
  888. self.assertEqual(stats.mtu,
  889. int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
  890. def test_mtu(self):
  891. for name, stats in psutil.net_if_stats().items():
  892. with open("/sys/class/net/%s/mtu" % name) as f:
  893. self.assertEqual(stats.mtu, int(f.read().strip()))
  894. @unittest.skipIf(not which("ifconfig"), "ifconfig utility not available")
  895. def test_flags(self):
  896. # first line looks like this:
  897. # "eth0: flags=4163<UP,BROADCAST,RUNNING,MULTICAST> mtu 1500"
  898. matches_found = 0
  899. for name, stats in psutil.net_if_stats().items():
  900. try:
  901. out = sh("ifconfig %s" % name)
  902. except RuntimeError:
  903. pass
  904. else:
  905. match = re.search(r"flags=(\d+)?<(.*?)>", out)
  906. if match and len(match.groups()) >= 2:
  907. matches_found += 1
  908. ifconfig_flags = set(match.group(2).lower().split(","))
  909. psutil_flags = set(stats.flags.split(","))
  910. self.assertEqual(ifconfig_flags, psutil_flags)
  911. else:
  912. # ifconfig has a different output on CentOS 6
  913. # let's try that
  914. match = re.search(r"(.*) MTU:(\d+) Metric:(\d+)", out)
  915. if match and len(match.groups()) >= 3:
  916. matches_found += 1
  917. ifconfig_flags = set(match.group(1).lower().split())
  918. psutil_flags = set(stats.flags.split(","))
  919. self.assertEqual(ifconfig_flags, psutil_flags)
  920. if not matches_found:
  921. raise self.fail("no matches were found")
  922. @unittest.skipIf(not LINUX, "LINUX only")
  923. class TestSystemNetIOCounters(PsutilTestCase):
  924. @unittest.skipIf(not which("ifconfig"), "ifconfig utility not available")
  925. @retry_on_failure()
  926. def test_against_ifconfig(self):
  927. def ifconfig(nic):
  928. ret = {}
  929. out = sh("ifconfig %s" % nic)
  930. ret['packets_recv'] = int(
  931. re.findall(r'RX packets[: ](\d+)', out)[0])
  932. ret['packets_sent'] = int(
  933. re.findall(r'TX packets[: ](\d+)', out)[0])
  934. ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
  935. ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
  936. ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
  937. ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
  938. ret['bytes_recv'] = int(
  939. re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
  940. ret['bytes_sent'] = int(
  941. re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
  942. return ret
  943. nio = psutil.net_io_counters(pernic=True, nowrap=False)
  944. for name, stats in nio.items():
  945. try:
  946. ifconfig_ret = ifconfig(name)
  947. except RuntimeError:
  948. continue
  949. self.assertAlmostEqual(
  950. stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
  951. self.assertAlmostEqual(
  952. stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
  953. self.assertAlmostEqual(
  954. stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
  955. self.assertAlmostEqual(
  956. stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
  957. self.assertAlmostEqual(
  958. stats.errin, ifconfig_ret['errin'], delta=10)
  959. self.assertAlmostEqual(
  960. stats.errout, ifconfig_ret['errout'], delta=10)
  961. self.assertAlmostEqual(
  962. stats.dropin, ifconfig_ret['dropin'], delta=10)
  963. self.assertAlmostEqual(
  964. stats.dropout, ifconfig_ret['dropout'], delta=10)
  965. @unittest.skipIf(not LINUX, "LINUX only")
  966. class TestSystemNetConnections(PsutilTestCase):
  967. @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
  968. @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
  969. def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
  970. # see: https://github.com/giampaolo/psutil/issues/623
  971. try:
  972. s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
  973. self.addCleanup(s.close)
  974. s.bind(("::1", 0))
  975. except socket.error:
  976. pass
  977. psutil.net_connections(kind='inet6')
  978. def test_emulate_unix(self):
  979. with mock_open_content(
  980. '/proc/net/unix',
  981. textwrap.dedent("""\
  982. 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
  983. 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
  984. 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
  985. 000000000000000000000000000000000000000000000000000000
  986. """)) as m:
  987. psutil.net_connections(kind='unix')
  988. assert m.called
  989. # =====================================================================
  990. # --- system disks
  991. # =====================================================================
  992. @unittest.skipIf(not LINUX, "LINUX only")
  993. class TestSystemDiskPartitions(PsutilTestCase):
  994. @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
  995. @skip_on_not_implemented()
  996. def test_against_df(self):
  997. # test psutil.disk_usage() and psutil.disk_partitions()
  998. # against "df -a"
  999. def df(path):
  1000. out = sh('df -P -B 1 "%s"' % path).strip()
  1001. lines = out.split('\n')
  1002. lines.pop(0)
  1003. line = lines.pop(0)
  1004. dev, total, used, free = line.split()[:4]
  1005. if dev == 'none':
  1006. dev = ''
  1007. total, used, free = int(total), int(used), int(free)
  1008. return dev, total, used, free
  1009. for part in psutil.disk_partitions(all=False):
  1010. usage = psutil.disk_usage(part.mountpoint)
  1011. _, total, used, free = df(part.mountpoint)
  1012. self.assertEqual(usage.total, total)
  1013. self.assertAlmostEqual(usage.free, free,
  1014. delta=TOLERANCE_DISK_USAGE)
  1015. self.assertAlmostEqual(usage.used, used,
  1016. delta=TOLERANCE_DISK_USAGE)
  1017. def test_zfs_fs(self):
  1018. # Test that ZFS partitions are returned.
  1019. with open("/proc/filesystems") as f:
  1020. data = f.read()
  1021. if 'zfs' in data:
  1022. for part in psutil.disk_partitions():
  1023. if part.fstype == 'zfs':
  1024. break
  1025. else:
  1026. raise self.fail("couldn't find any ZFS partition")
  1027. else:
  1028. # No ZFS partitions on this system. Let's fake one.
  1029. fake_file = io.StringIO(u("nodev\tzfs\n"))
  1030. with mock.patch('psutil._common.open',
  1031. return_value=fake_file, create=True) as m1:
  1032. with mock.patch(
  1033. 'psutil._pslinux.cext.disk_partitions',
  1034. return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
  1035. ret = psutil.disk_partitions()
  1036. assert m1.called
  1037. assert m2.called
  1038. assert ret
  1039. self.assertEqual(ret[0].fstype, 'zfs')
  1040. def test_emulate_realpath_fail(self):
  1041. # See: https://github.com/giampaolo/psutil/issues/1307
  1042. try:
  1043. with mock.patch('os.path.realpath',
  1044. return_value='/non/existent') as m:
  1045. with self.assertRaises(FileNotFoundError):
  1046. psutil.disk_partitions()
  1047. assert m.called
  1048. finally:
  1049. psutil.PROCFS_PATH = "/proc"
  1050. @unittest.skipIf(not LINUX, "LINUX only")
  1051. class TestSystemDiskIoCounters(PsutilTestCase):
  1052. def test_emulate_kernel_2_4(self):
  1053. # Tests /proc/diskstats parsing format for 2.4 kernels, see:
  1054. # https://github.com/giampaolo/psutil/issues/767
  1055. with mock_open_content(
  1056. '/proc/diskstats',
  1057. " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"):
  1058. with mock.patch('psutil._pslinux.is_storage_device',
  1059. return_value=True):
  1060. ret = psutil.disk_io_counters(nowrap=False)
  1061. self.assertEqual(ret.read_count, 1)
  1062. self.assertEqual(ret.read_merged_count, 2)
  1063. self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
  1064. self.assertEqual(ret.read_time, 4)
  1065. self.assertEqual(ret.write_count, 5)
  1066. self.assertEqual(ret.write_merged_count, 6)
  1067. self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
  1068. self.assertEqual(ret.write_time, 8)
  1069. self.assertEqual(ret.busy_time, 10)
  1070. def test_emulate_kernel_2_6_full(self):
  1071. # Tests /proc/diskstats parsing format for 2.6 kernels,
  1072. # lines reporting all metrics:
  1073. # https://github.com/giampaolo/psutil/issues/767
  1074. with mock_open_content(
  1075. '/proc/diskstats',
  1076. " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"):
  1077. with mock.patch('psutil._pslinux.is_storage_device',
  1078. return_value=True):
  1079. ret = psutil.disk_io_counters(nowrap=False)
  1080. self.assertEqual(ret.read_count, 1)
  1081. self.assertEqual(ret.read_merged_count, 2)
  1082. self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
  1083. self.assertEqual(ret.read_time, 4)
  1084. self.assertEqual(ret.write_count, 5)
  1085. self.assertEqual(ret.write_merged_count, 6)
  1086. self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
  1087. self.assertEqual(ret.write_time, 8)
  1088. self.assertEqual(ret.busy_time, 10)
  1089. def test_emulate_kernel_2_6_limited(self):
  1090. # Tests /proc/diskstats parsing format for 2.6 kernels,
  1091. # where one line of /proc/partitions return a limited
  1092. # amount of metrics when it bumps into a partition
  1093. # (instead of a disk). See:
  1094. # https://github.com/giampaolo/psutil/issues/767
  1095. with mock_open_content(
  1096. '/proc/diskstats',
  1097. " 3 1 hda 1 2 3 4"):
  1098. with mock.patch('psutil._pslinux.is_storage_device',
  1099. return_value=True):
  1100. ret = psutil.disk_io_counters(nowrap=False)
  1101. self.assertEqual(ret.read_count, 1)
  1102. self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
  1103. self.assertEqual(ret.write_count, 3)
  1104. self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
  1105. self.assertEqual(ret.read_merged_count, 0)
  1106. self.assertEqual(ret.read_time, 0)
  1107. self.assertEqual(ret.write_merged_count, 0)
  1108. self.assertEqual(ret.write_time, 0)
  1109. self.assertEqual(ret.busy_time, 0)
  1110. def test_emulate_include_partitions(self):
  1111. # Make sure that when perdisk=True disk partitions are returned,
  1112. # see:
  1113. # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
  1114. with mock_open_content(
  1115. '/proc/diskstats',
  1116. textwrap.dedent("""\
  1117. 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
  1118. 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
  1119. """)):
  1120. with mock.patch('psutil._pslinux.is_storage_device',
  1121. return_value=False):
  1122. ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
  1123. self.assertEqual(len(ret), 2)
  1124. self.assertEqual(ret['nvme0n1'].read_count, 1)
  1125. self.assertEqual(ret['nvme0n1p1'].read_count, 1)
  1126. self.assertEqual(ret['nvme0n1'].write_count, 5)
  1127. self.assertEqual(ret['nvme0n1p1'].write_count, 5)
  1128. def test_emulate_exclude_partitions(self):
  1129. # Make sure that when perdisk=False partitions (e.g. 'sda1',
  1130. # 'nvme0n1p1') are skipped and not included in the total count.
  1131. # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
  1132. with mock_open_content(
  1133. '/proc/diskstats',
  1134. textwrap.dedent("""\
  1135. 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
  1136. 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
  1137. """)):
  1138. with mock.patch('psutil._pslinux.is_storage_device',
  1139. return_value=False):
  1140. ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
  1141. self.assertIsNone(ret)
  1142. #
  1143. def is_storage_device(name):
  1144. return name == 'nvme0n1'
  1145. with mock_open_content(
  1146. '/proc/diskstats',
  1147. textwrap.dedent("""\
  1148. 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11
  1149. 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
  1150. """)):
  1151. with mock.patch('psutil._pslinux.is_storage_device',
  1152. create=True, side_effect=is_storage_device):
  1153. ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
  1154. self.assertEqual(ret.read_count, 1)
  1155. self.assertEqual(ret.write_count, 5)
  1156. def test_emulate_use_sysfs(self):
  1157. def exists(path):
  1158. if path == '/proc/diskstats':
  1159. return False
  1160. return True
  1161. wprocfs = psutil.disk_io_counters(perdisk=True)
  1162. with mock.patch('psutil._pslinux.os.path.exists',
  1163. create=True, side_effect=exists):
  1164. wsysfs = psutil.disk_io_counters(perdisk=True)
  1165. self.assertEqual(len(wprocfs), len(wsysfs))
  1166. def test_emulate_not_impl(self):
  1167. def exists(path):
  1168. return False
  1169. with mock.patch('psutil._pslinux.os.path.exists',
  1170. create=True, side_effect=exists):
  1171. self.assertRaises(NotImplementedError, psutil.disk_io_counters)
  1172. @unittest.skipIf(not LINUX, "LINUX only")
  1173. class TestRootFsDeviceFinder(PsutilTestCase):
  1174. def setUp(self):
  1175. dev = os.stat("/").st_dev
  1176. self.major = os.major(dev)
  1177. self.minor = os.minor(dev)
  1178. def test_call_methods(self):
  1179. finder = RootFsDeviceFinder()
  1180. if os.path.exists("/proc/partitions"):
  1181. finder.ask_proc_partitions()
  1182. else:
  1183. self.assertRaises(FileNotFoundError, finder.ask_proc_partitions)
  1184. if os.path.exists("/sys/dev/block/%s:%s/uevent" % (
  1185. self.major, self.minor)):
  1186. finder.ask_sys_dev_block()
  1187. else:
  1188. self.assertRaises(FileNotFoundError, finder.ask_sys_dev_block)
  1189. finder.ask_sys_class_block()
  1190. @unittest.skipIf(GITHUB_ACTIONS, "unsupported on GITHUB_ACTIONS")
  1191. def test_comparisons(self):
  1192. finder = RootFsDeviceFinder()
  1193. self.assertIsNotNone(finder.find())
  1194. a = b = c = None
  1195. if os.path.exists("/proc/partitions"):
  1196. a = finder.ask_proc_partitions()
  1197. if os.path.exists("/sys/dev/block/%s:%s/uevent" % (
  1198. self.major, self.minor)):
  1199. b = finder.ask_sys_class_block()
  1200. c = finder.ask_sys_dev_block()
  1201. base = a or b or c
  1202. if base and a:
  1203. self.assertEqual(base, a)
  1204. if base and b:
  1205. self.assertEqual(base, b)
  1206. if base and c:
  1207. self.assertEqual(base, c)
  1208. @unittest.skipIf(not which("findmnt"), "findmnt utility not available")
  1209. @unittest.skipIf(GITHUB_ACTIONS, "unsupported on GITHUB_ACTIONS")
  1210. def test_against_findmnt(self):
  1211. psutil_value = RootFsDeviceFinder().find()
  1212. findmnt_value = sh("findmnt -o SOURCE -rn /")
  1213. self.assertEqual(psutil_value, findmnt_value)
  1214. def test_disk_partitions_mocked(self):
  1215. with mock.patch(
  1216. 'psutil._pslinux.cext.disk_partitions',
  1217. return_value=[('/dev/root', '/', 'ext4', 'rw')]) as m:
  1218. part = psutil.disk_partitions()[0]
  1219. assert m.called
  1220. if not GITHUB_ACTIONS:
  1221. self.assertNotEqual(part.device, "/dev/root")
  1222. self.assertEqual(part.device, RootFsDeviceFinder().find())
  1223. else:
  1224. self.assertEqual(part.device, "/dev/root")
  1225. # =====================================================================
  1226. # --- misc
  1227. # =====================================================================
  1228. @unittest.skipIf(not LINUX, "LINUX only")
  1229. class TestMisc(PsutilTestCase):
  1230. def test_boot_time(self):
  1231. vmstat_value = vmstat('boot time')
  1232. psutil_value = psutil.boot_time()
  1233. self.assertEqual(int(vmstat_value), int(psutil_value))
  1234. def test_no_procfs_on_import(self):
  1235. my_procfs = self.get_testfn()
  1236. os.mkdir(my_procfs)
  1237. with open(os.path.join(my_procfs, 'stat'), 'w') as f:
  1238. f.write('cpu 0 0 0 0 0 0 0 0 0 0\n')
  1239. f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n')
  1240. f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n')
  1241. try:
  1242. orig_open = open
  1243. def open_mock(name, *args, **kwargs):
  1244. if name.startswith('/proc'):
  1245. raise IOError(errno.ENOENT, 'rejecting access for test')
  1246. return orig_open(name, *args, **kwargs)
  1247. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1248. with mock.patch(patch_point, side_effect=open_mock):
  1249. reload_module(psutil)
  1250. self.assertRaises(IOError, psutil.cpu_times)
  1251. self.assertRaises(IOError, psutil.cpu_times, percpu=True)
  1252. self.assertRaises(IOError, psutil.cpu_percent)
  1253. self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
  1254. self.assertRaises(IOError, psutil.cpu_times_percent)
  1255. self.assertRaises(
  1256. IOError, psutil.cpu_times_percent, percpu=True)
  1257. psutil.PROCFS_PATH = my_procfs
  1258. self.assertEqual(psutil.cpu_percent(), 0)
  1259. self.assertEqual(sum(psutil.cpu_times_percent()), 0)
  1260. # since we don't know the number of CPUs at import time,
  1261. # we awkwardly say there are none until the second call
  1262. per_cpu_percent = psutil.cpu_percent(percpu=True)
  1263. self.assertEqual(sum(per_cpu_percent), 0)
  1264. # ditto awkward length
  1265. per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
  1266. self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
  1267. # much user, very busy
  1268. with open(os.path.join(my_procfs, 'stat'), 'w') as f:
  1269. f.write('cpu 1 0 0 0 0 0 0 0 0 0\n')
  1270. f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n')
  1271. f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n')
  1272. self.assertNotEqual(psutil.cpu_percent(), 0)
  1273. self.assertNotEqual(
  1274. sum(psutil.cpu_percent(percpu=True)), 0)
  1275. self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
  1276. self.assertNotEqual(
  1277. sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
  1278. finally:
  1279. shutil.rmtree(my_procfs)
  1280. reload_module(psutil)
  1281. self.assertEqual(psutil.PROCFS_PATH, '/proc')
  1282. def test_cpu_steal_decrease(self):
  1283. # Test cumulative cpu stats decrease. We should ignore this.
  1284. # See issue #1210.
  1285. with mock_open_content(
  1286. "/proc/stat",
  1287. textwrap.dedent("""\
  1288. cpu 0 0 0 0 0 0 0 1 0 0
  1289. cpu0 0 0 0 0 0 0 0 1 0 0
  1290. cpu1 0 0 0 0 0 0 0 1 0 0
  1291. """).encode()) as m:
  1292. # first call to "percent" functions should read the new stat file
  1293. # and compare to the "real" file read at import time - so the
  1294. # values are meaningless
  1295. psutil.cpu_percent()
  1296. assert m.called
  1297. psutil.cpu_percent(percpu=True)
  1298. psutil.cpu_times_percent()
  1299. psutil.cpu_times_percent(percpu=True)
  1300. with mock_open_content(
  1301. "/proc/stat",
  1302. textwrap.dedent("""\
  1303. cpu 1 0 0 0 0 0 0 0 0 0
  1304. cpu0 1 0 0 0 0 0 0 0 0 0
  1305. cpu1 1 0 0 0 0 0 0 0 0 0
  1306. """).encode()) as m:
  1307. # Increase "user" while steal goes "backwards" to zero.
  1308. cpu_percent = psutil.cpu_percent()
  1309. assert m.called
  1310. cpu_percent_percpu = psutil.cpu_percent(percpu=True)
  1311. cpu_times_percent = psutil.cpu_times_percent()
  1312. cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
  1313. self.assertNotEqual(cpu_percent, 0)
  1314. self.assertNotEqual(sum(cpu_percent_percpu), 0)
  1315. self.assertNotEqual(sum(cpu_times_percent), 0)
  1316. self.assertNotEqual(sum(cpu_times_percent), 100.0)
  1317. self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
  1318. self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
  1319. self.assertEqual(cpu_times_percent.steal, 0)
  1320. self.assertNotEqual(cpu_times_percent.user, 0)
  1321. def test_boot_time_mocked(self):
  1322. with mock.patch('psutil._common.open', create=True) as m:
  1323. self.assertRaises(
  1324. RuntimeError,
  1325. psutil._pslinux.boot_time)
  1326. assert m.called
  1327. def test_users(self):
  1328. # Make sure the C extension converts ':0' and ':0.0' to
  1329. # 'localhost'.
  1330. for user in psutil.users():
  1331. self.assertNotIn(user.host, (":0", ":0.0"))
  1332. def test_procfs_path(self):
  1333. tdir = self.get_testfn()
  1334. os.mkdir(tdir)
  1335. try:
  1336. psutil.PROCFS_PATH = tdir
  1337. self.assertRaises(IOError, psutil.virtual_memory)
  1338. self.assertRaises(IOError, psutil.cpu_times)
  1339. self.assertRaises(IOError, psutil.cpu_times, percpu=True)
  1340. self.assertRaises(IOError, psutil.boot_time)
  1341. # self.assertRaises(IOError, psutil.pids)
  1342. self.assertRaises(IOError, psutil.net_connections)
  1343. self.assertRaises(IOError, psutil.net_io_counters)
  1344. self.assertRaises(IOError, psutil.net_if_stats)
  1345. # self.assertRaises(IOError, psutil.disk_io_counters)
  1346. self.assertRaises(IOError, psutil.disk_partitions)
  1347. self.assertRaises(psutil.NoSuchProcess, psutil.Process)
  1348. finally:
  1349. psutil.PROCFS_PATH = "/proc"
  1350. @retry_on_failure()
  1351. def test_issue_687(self):
  1352. # In case of thread ID:
  1353. # - pid_exists() is supposed to return False
  1354. # - Process(tid) is supposed to work
  1355. # - pids() should not return the TID
  1356. # See: https://github.com/giampaolo/psutil/issues/687
  1357. with ThreadTask():
  1358. p = psutil.Process()
  1359. threads = p.threads()
  1360. self.assertEqual(len(threads), 2)
  1361. tid = sorted(threads, key=lambda x: x.id)[1].id
  1362. self.assertNotEqual(p.pid, tid)
  1363. pt = psutil.Process(tid)
  1364. pt.as_dict()
  1365. self.assertNotIn(tid, psutil.pids())
  1366. def test_pid_exists_no_proc_status(self):
  1367. # Internally pid_exists relies on /proc/{pid}/status.
  1368. # Emulate a case where this file is empty in which case
  1369. # psutil is supposed to fall back on using pids().
  1370. with mock_open_content("/proc/%s/status", "") as m:
  1371. assert psutil.pid_exists(os.getpid())
  1372. assert m.called
  1373. # =====================================================================
  1374. # --- sensors
  1375. # =====================================================================
  1376. @unittest.skipIf(not LINUX, "LINUX only")
  1377. @unittest.skipIf(not HAS_BATTERY, "no battery")
  1378. class TestSensorsBattery(PsutilTestCase):
  1379. @unittest.skipIf(not which("acpi"), "acpi utility not available")
  1380. def test_percent(self):
  1381. out = sh("acpi -b")
  1382. acpi_value = int(out.split(",")[1].strip().replace('%', ''))
  1383. psutil_value = psutil.sensors_battery().percent
  1384. self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
  1385. def test_emulate_power_plugged(self):
  1386. # Pretend the AC power cable is connected.
  1387. def open_mock(name, *args, **kwargs):
  1388. if name.endswith(('AC0/online', 'AC/online')):
  1389. return io.BytesIO(b"1")
  1390. else:
  1391. return orig_open(name, *args, **kwargs)
  1392. orig_open = open
  1393. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1394. with mock.patch(patch_point, side_effect=open_mock) as m:
  1395. self.assertEqual(psutil.sensors_battery().power_plugged, True)
  1396. self.assertEqual(
  1397. psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
  1398. assert m.called
  1399. def test_emulate_power_plugged_2(self):
  1400. # Same as above but pretend /AC0/online does not exist in which
  1401. # case code relies on /status file.
  1402. def open_mock(name, *args, **kwargs):
  1403. if name.endswith(('AC0/online', 'AC/online')):
  1404. raise IOError(errno.ENOENT, "")
  1405. elif name.endswith("/status"):
  1406. return io.StringIO(u("charging"))
  1407. else:
  1408. return orig_open(name, *args, **kwargs)
  1409. orig_open = open
  1410. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1411. with mock.patch(patch_point, side_effect=open_mock) as m:
  1412. self.assertEqual(psutil.sensors_battery().power_plugged, True)
  1413. assert m.called
  1414. def test_emulate_power_not_plugged(self):
  1415. # Pretend the AC power cable is not connected.
  1416. def open_mock(name, *args, **kwargs):
  1417. if name.endswith(('AC0/online', 'AC/online')):
  1418. return io.BytesIO(b"0")
  1419. else:
  1420. return orig_open(name, *args, **kwargs)
  1421. orig_open = open
  1422. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1423. with mock.patch(patch_point, side_effect=open_mock) as m:
  1424. self.assertEqual(psutil.sensors_battery().power_plugged, False)
  1425. assert m.called
  1426. def test_emulate_power_not_plugged_2(self):
  1427. # Same as above but pretend /AC0/online does not exist in which
  1428. # case code relies on /status file.
  1429. def open_mock(name, *args, **kwargs):
  1430. if name.endswith(('AC0/online', 'AC/online')):
  1431. raise IOError(errno.ENOENT, "")
  1432. elif name.endswith("/status"):
  1433. return io.StringIO(u("discharging"))
  1434. else:
  1435. return orig_open(name, *args, **kwargs)
  1436. orig_open = open
  1437. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1438. with mock.patch(patch_point, side_effect=open_mock) as m:
  1439. self.assertEqual(psutil.sensors_battery().power_plugged, False)
  1440. assert m.called
  1441. def test_emulate_power_undetermined(self):
  1442. # Pretend we can't know whether the AC power cable not
  1443. # connected (assert fallback to False).
  1444. def open_mock(name, *args, **kwargs):
  1445. if name.startswith(
  1446. ('/sys/class/power_supply/AC0/online',
  1447. '/sys/class/power_supply/AC/online')
  1448. ):
  1449. raise IOError(errno.ENOENT, "")
  1450. elif name.startswith("/sys/class/power_supply/BAT0/status"):
  1451. return io.BytesIO(b"???")
  1452. else:
  1453. return orig_open(name, *args, **kwargs)
  1454. orig_open = open
  1455. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1456. with mock.patch(patch_point, side_effect=open_mock) as m:
  1457. self.assertIsNone(psutil.sensors_battery().power_plugged)
  1458. assert m.called
  1459. def test_emulate_energy_full_0(self):
  1460. # Emulate a case where energy_full files returns 0.
  1461. with mock_open_content(
  1462. "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
  1463. self.assertEqual(psutil.sensors_battery().percent, 0)
  1464. assert m.called
  1465. def test_emulate_energy_full_not_avail(self):
  1466. # Emulate a case where energy_full file does not exist.
  1467. # Expected fallback on /capacity.
  1468. with mock_open_exception(
  1469. "/sys/class/power_supply/BAT0/energy_full",
  1470. IOError(errno.ENOENT, "")):
  1471. with mock_open_exception(
  1472. "/sys/class/power_supply/BAT0/charge_full",
  1473. IOError(errno.ENOENT, "")):
  1474. with mock_open_content(
  1475. "/sys/class/power_supply/BAT0/capacity", b"88"):
  1476. self.assertEqual(psutil.sensors_battery().percent, 88)
  1477. def test_emulate_no_power(self):
  1478. # Emulate a case where /AC0/online file nor /BAT0/status exist.
  1479. with mock_open_exception(
  1480. "/sys/class/power_supply/AC/online",
  1481. IOError(errno.ENOENT, "")):
  1482. with mock_open_exception(
  1483. "/sys/class/power_supply/AC0/online",
  1484. IOError(errno.ENOENT, "")):
  1485. with mock_open_exception(
  1486. "/sys/class/power_supply/BAT0/status",
  1487. IOError(errno.ENOENT, "")):
  1488. self.assertIsNone(psutil.sensors_battery().power_plugged)
  1489. @unittest.skipIf(not LINUX, "LINUX only")
  1490. class TestSensorsBatteryEmulated(PsutilTestCase):
  1491. def test_it(self):
  1492. def open_mock(name, *args, **kwargs):
  1493. if name.endswith("/energy_now"):
  1494. return io.StringIO(u("60000000"))
  1495. elif name.endswith("/power_now"):
  1496. return io.StringIO(u("0"))
  1497. elif name.endswith("/energy_full"):
  1498. return io.StringIO(u("60000001"))
  1499. else:
  1500. return orig_open(name, *args, **kwargs)
  1501. orig_open = open
  1502. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1503. with mock.patch('os.listdir', return_value=["BAT0"]) as mlistdir:
  1504. with mock.patch(patch_point, side_effect=open_mock) as mopen:
  1505. self.assertIsNotNone(psutil.sensors_battery())
  1506. assert mlistdir.called
  1507. assert mopen.called
  1508. @unittest.skipIf(not LINUX, "LINUX only")
  1509. class TestSensorsTemperatures(PsutilTestCase):
  1510. def test_emulate_class_hwmon(self):
  1511. def open_mock(name, *args, **kwargs):
  1512. if name.endswith('/name'):
  1513. return io.StringIO(u("name"))
  1514. elif name.endswith('/temp1_label'):
  1515. return io.StringIO(u("label"))
  1516. elif name.endswith('/temp1_input'):
  1517. return io.BytesIO(b"30000")
  1518. elif name.endswith('/temp1_max'):
  1519. return io.BytesIO(b"40000")
  1520. elif name.endswith('/temp1_crit'):
  1521. return io.BytesIO(b"50000")
  1522. else:
  1523. return orig_open(name, *args, **kwargs)
  1524. orig_open = open
  1525. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1526. with mock.patch(patch_point, side_effect=open_mock):
  1527. # Test case with /sys/class/hwmon
  1528. with mock.patch('glob.glob',
  1529. return_value=['/sys/class/hwmon/hwmon0/temp1']):
  1530. temp = psutil.sensors_temperatures()['name'][0]
  1531. self.assertEqual(temp.label, 'label')
  1532. self.assertEqual(temp.current, 30.0)
  1533. self.assertEqual(temp.high, 40.0)
  1534. self.assertEqual(temp.critical, 50.0)
  1535. def test_emulate_class_thermal(self):
  1536. def open_mock(name, *args, **kwargs):
  1537. if name.endswith('0_temp'):
  1538. return io.BytesIO(b"50000")
  1539. elif name.endswith('temp'):
  1540. return io.BytesIO(b"30000")
  1541. elif name.endswith('0_type'):
  1542. return io.StringIO(u("critical"))
  1543. elif name.endswith('type'):
  1544. return io.StringIO(u("name"))
  1545. else:
  1546. return orig_open(name, *args, **kwargs)
  1547. def glob_mock(path):
  1548. if path == '/sys/class/hwmon/hwmon*/temp*_*': # noqa
  1549. return []
  1550. elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
  1551. return []
  1552. elif path == '/sys/class/thermal/thermal_zone*':
  1553. return ['/sys/class/thermal/thermal_zone0']
  1554. elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
  1555. return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
  1556. '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
  1557. return []
  1558. orig_open = open
  1559. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1560. with mock.patch(patch_point, side_effect=open_mock):
  1561. with mock.patch('glob.glob', create=True, side_effect=glob_mock):
  1562. temp = psutil.sensors_temperatures()['name'][0]
  1563. self.assertEqual(temp.label, '')
  1564. self.assertEqual(temp.current, 30.0)
  1565. self.assertEqual(temp.high, 50.0)
  1566. self.assertEqual(temp.critical, 50.0)
  1567. @unittest.skipIf(not LINUX, "LINUX only")
  1568. class TestSensorsFans(PsutilTestCase):
  1569. def test_emulate_data(self):
  1570. def open_mock(name, *args, **kwargs):
  1571. if name.endswith('/name'):
  1572. return io.StringIO(u("name"))
  1573. elif name.endswith('/fan1_label'):
  1574. return io.StringIO(u("label"))
  1575. elif name.endswith('/fan1_input'):
  1576. return io.StringIO(u("2000"))
  1577. else:
  1578. return orig_open(name, *args, **kwargs)
  1579. orig_open = open
  1580. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1581. with mock.patch(patch_point, side_effect=open_mock):
  1582. with mock.patch('glob.glob',
  1583. return_value=['/sys/class/hwmon/hwmon2/fan1']):
  1584. fan = psutil.sensors_fans()['name'][0]
  1585. self.assertEqual(fan.label, 'label')
  1586. self.assertEqual(fan.current, 2000)
  1587. # =====================================================================
  1588. # --- test process
  1589. # =====================================================================
  1590. @unittest.skipIf(not LINUX, "LINUX only")
  1591. class TestProcess(PsutilTestCase):
  1592. @retry_on_failure()
  1593. def test_parse_smaps_vs_memory_maps(self):
  1594. sproc = self.spawn_testproc()
  1595. uss, pss, swap = psutil._pslinux.Process(sproc.pid)._parse_smaps()
  1596. maps = psutil.Process(sproc.pid).memory_maps(grouped=False)
  1597. self.assertAlmostEqual(
  1598. uss, sum([x.private_dirty + x.private_clean for x in maps]),
  1599. delta=4096)
  1600. self.assertAlmostEqual(
  1601. pss, sum([x.pss for x in maps]), delta=4096)
  1602. self.assertAlmostEqual(
  1603. swap, sum([x.swap for x in maps]), delta=4096)
  1604. def test_parse_smaps_mocked(self):
  1605. # See: https://github.com/giampaolo/psutil/issues/1222
  1606. with mock_open_content(
  1607. "/proc/%s/smaps" % os.getpid(),
  1608. textwrap.dedent("""\
  1609. fffff0 r-xp 00000000 00:00 0 [vsyscall]
  1610. Size: 1 kB
  1611. Rss: 2 kB
  1612. Pss: 3 kB
  1613. Shared_Clean: 4 kB
  1614. Shared_Dirty: 5 kB
  1615. Private_Clean: 6 kB
  1616. Private_Dirty: 7 kB
  1617. Referenced: 8 kB
  1618. Anonymous: 9 kB
  1619. LazyFree: 10 kB
  1620. AnonHugePages: 11 kB
  1621. ShmemPmdMapped: 12 kB
  1622. Shared_Hugetlb: 13 kB
  1623. Private_Hugetlb: 14 kB
  1624. Swap: 15 kB
  1625. SwapPss: 16 kB
  1626. KernelPageSize: 17 kB
  1627. MMUPageSize: 18 kB
  1628. Locked: 19 kB
  1629. VmFlags: rd ex
  1630. """).encode()) as m:
  1631. p = psutil._pslinux.Process(os.getpid())
  1632. uss, pss, swap = p._parse_smaps()
  1633. assert m.called
  1634. self.assertEqual(uss, (6 + 7 + 14) * 1024)
  1635. self.assertEqual(pss, 3 * 1024)
  1636. self.assertEqual(swap, 15 * 1024)
  1637. # On PYPY file descriptors are not closed fast enough.
  1638. @unittest.skipIf(PYPY, "unreliable on PYPY")
  1639. def test_open_files_mode(self):
  1640. def get_test_file(fname):
  1641. p = psutil.Process()
  1642. giveup_at = time.time() + GLOBAL_TIMEOUT
  1643. while True:
  1644. for file in p.open_files():
  1645. if file.path == os.path.abspath(fname):
  1646. return file
  1647. elif time.time() > giveup_at:
  1648. break
  1649. raise RuntimeError("timeout looking for test file")
  1650. #
  1651. testfn = self.get_testfn()
  1652. with open(testfn, "w"):
  1653. self.assertEqual(get_test_file(testfn).mode, "w")
  1654. with open(testfn):
  1655. self.assertEqual(get_test_file(testfn).mode, "r")
  1656. with open(testfn, "a"):
  1657. self.assertEqual(get_test_file(testfn).mode, "a")
  1658. #
  1659. with open(testfn, "r+"):
  1660. self.assertEqual(get_test_file(testfn).mode, "r+")
  1661. with open(testfn, "w+"):
  1662. self.assertEqual(get_test_file(testfn).mode, "r+")
  1663. with open(testfn, "a+"):
  1664. self.assertEqual(get_test_file(testfn).mode, "a+")
  1665. # note: "x" bit is not supported
  1666. if PY3:
  1667. safe_rmpath(testfn)
  1668. with open(testfn, "x"):
  1669. self.assertEqual(get_test_file(testfn).mode, "w")
  1670. safe_rmpath(testfn)
  1671. with open(testfn, "x+"):
  1672. self.assertEqual(get_test_file(testfn).mode, "r+")
  1673. def test_open_files_file_gone(self):
  1674. # simulates a file which gets deleted during open_files()
  1675. # execution
  1676. p = psutil.Process()
  1677. files = p.open_files()
  1678. with open(self.get_testfn(), 'w'):
  1679. # give the kernel some time to see the new file
  1680. call_until(p.open_files, "len(ret) != %i" % len(files))
  1681. with mock.patch('psutil._pslinux.os.readlink',
  1682. side_effect=OSError(errno.ENOENT, "")) as m:
  1683. files = p.open_files()
  1684. assert not files
  1685. assert m.called
  1686. # also simulate the case where os.readlink() returns EINVAL
  1687. # in which case psutil is supposed to 'continue'
  1688. with mock.patch('psutil._pslinux.os.readlink',
  1689. side_effect=OSError(errno.EINVAL, "")) as m:
  1690. self.assertEqual(p.open_files(), [])
  1691. assert m.called
  1692. def test_open_files_fd_gone(self):
  1693. # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
  1694. # while iterating through fds.
  1695. # https://travis-ci.org/giampaolo/psutil/jobs/225694530
  1696. p = psutil.Process()
  1697. files = p.open_files()
  1698. with open(self.get_testfn(), 'w'):
  1699. # give the kernel some time to see the new file
  1700. call_until(p.open_files, "len(ret) != %i" % len(files))
  1701. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1702. with mock.patch(patch_point,
  1703. side_effect=IOError(errno.ENOENT, "")) as m:
  1704. files = p.open_files()
  1705. assert not files
  1706. assert m.called
  1707. def test_open_files_enametoolong(self):
  1708. # Simulate a case where /proc/{pid}/fd/{fd} symlink
  1709. # points to a file with full path longer than PATH_MAX, see:
  1710. # https://github.com/giampaolo/psutil/issues/1940
  1711. p = psutil.Process()
  1712. files = p.open_files()
  1713. with open(self.get_testfn(), 'w'):
  1714. # give the kernel some time to see the new file
  1715. call_until(p.open_files, "len(ret) != %i" % len(files))
  1716. patch_point = 'psutil._pslinux.os.readlink'
  1717. with mock.patch(patch_point,
  1718. side_effect=OSError(errno.ENAMETOOLONG, "")) as m:
  1719. with mock.patch("psutil._pslinux.debug"):
  1720. files = p.open_files()
  1721. assert not files
  1722. assert m.called
  1723. # --- mocked tests
  1724. def test_terminal_mocked(self):
  1725. with mock.patch('psutil._pslinux._psposix.get_terminal_map',
  1726. return_value={}) as m:
  1727. self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
  1728. assert m.called
  1729. # TODO: re-enable this test.
  1730. # def test_num_ctx_switches_mocked(self):
  1731. # with mock.patch('psutil._common.open', create=True) as m:
  1732. # self.assertRaises(
  1733. # NotImplementedError,
  1734. # psutil._pslinux.Process(os.getpid()).num_ctx_switches)
  1735. # assert m.called
  1736. def test_cmdline_mocked(self):
  1737. # see: https://github.com/giampaolo/psutil/issues/639
  1738. p = psutil.Process()
  1739. fake_file = io.StringIO(u('foo\x00bar\x00'))
  1740. with mock.patch('psutil._common.open',
  1741. return_value=fake_file, create=True) as m:
  1742. self.assertEqual(p.cmdline(), ['foo', 'bar'])
  1743. assert m.called
  1744. fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
  1745. with mock.patch('psutil._common.open',
  1746. return_value=fake_file, create=True) as m:
  1747. self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
  1748. assert m.called
  1749. def test_cmdline_spaces_mocked(self):
  1750. # see: https://github.com/giampaolo/psutil/issues/1179
  1751. p = psutil.Process()
  1752. fake_file = io.StringIO(u('foo bar '))
  1753. with mock.patch('psutil._common.open',
  1754. return_value=fake_file, create=True) as m:
  1755. self.assertEqual(p.cmdline(), ['foo', 'bar'])
  1756. assert m.called
  1757. fake_file = io.StringIO(u('foo bar '))
  1758. with mock.patch('psutil._common.open',
  1759. return_value=fake_file, create=True) as m:
  1760. self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
  1761. assert m.called
  1762. def test_cmdline_mixed_separators(self):
  1763. # https://github.com/giampaolo/psutil/issues/
  1764. # 1179#issuecomment-552984549
  1765. p = psutil.Process()
  1766. fake_file = io.StringIO(u('foo\x20bar\x00'))
  1767. with mock.patch('psutil._common.open',
  1768. return_value=fake_file, create=True) as m:
  1769. self.assertEqual(p.cmdline(), ['foo', 'bar'])
  1770. assert m.called
  1771. def test_readlink_path_deleted_mocked(self):
  1772. with mock.patch('psutil._pslinux.os.readlink',
  1773. return_value='/home/foo (deleted)'):
  1774. self.assertEqual(psutil.Process().exe(), "/home/foo")
  1775. self.assertEqual(psutil.Process().cwd(), "/home/foo")
  1776. def test_threads_mocked(self):
  1777. # Test the case where os.listdir() returns a file (thread)
  1778. # which no longer exists by the time we open() it (race
  1779. # condition). threads() is supposed to ignore that instead
  1780. # of raising NSP.
  1781. def open_mock_1(name, *args, **kwargs):
  1782. if name.startswith('/proc/%s/task' % os.getpid()):
  1783. raise IOError(errno.ENOENT, "")
  1784. else:
  1785. return orig_open(name, *args, **kwargs)
  1786. orig_open = open
  1787. patch_point = 'builtins.open' if PY3 else '__builtin__.open'
  1788. with mock.patch(patch_point, side_effect=open_mock_1) as m:
  1789. ret = psutil.Process().threads()
  1790. assert m.called
  1791. self.assertEqual(ret, [])
  1792. # ...but if it bumps into something != ENOENT we want an
  1793. # exception.
  1794. def open_mock_2(name, *args, **kwargs):
  1795. if name.startswith('/proc/%s/task' % os.getpid()):
  1796. raise IOError(errno.EPERM, "")
  1797. else:
  1798. return orig_open(name, *args, **kwargs)
  1799. with mock.patch(patch_point, side_effect=open_mock_2):
  1800. self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
  1801. def test_exe_mocked(self):
  1802. with mock.patch('psutil._pslinux.readlink',
  1803. side_effect=OSError(errno.ENOENT, "")) as m:
  1804. ret = psutil.Process().exe()
  1805. assert m.called
  1806. self.assertEqual(ret, "")
  1807. def test_issue_1014(self):
  1808. # Emulates a case where smaps file does not exist. In this case
  1809. # wrap_exception decorator should not raise NoSuchProcess.
  1810. with mock_open_exception(
  1811. '/proc/%s/smaps' % os.getpid(),
  1812. IOError(errno.ENOENT, "")) as m:
  1813. p = psutil.Process()
  1814. with self.assertRaises(FileNotFoundError):
  1815. p.memory_maps()
  1816. assert m.called
  1817. @unittest.skipIf(not HAS_RLIMIT, "not supported")
  1818. def test_rlimit_zombie(self):
  1819. # Emulate a case where rlimit() raises ENOSYS, which may
  1820. # happen in case of zombie process:
  1821. # https://travis-ci.org/giampaolo/psutil/jobs/51368273
  1822. with mock.patch("psutil._pslinux.prlimit",
  1823. side_effect=OSError(errno.ENOSYS, "")) as m1:
  1824. with mock.patch("psutil._pslinux.Process._is_zombie",
  1825. return_value=True) as m2:
  1826. p = psutil.Process()
  1827. p.name()
  1828. with self.assertRaises(psutil.ZombieProcess) as exc:
  1829. p.rlimit(psutil.RLIMIT_NOFILE)
  1830. assert m1.called
  1831. assert m2.called
  1832. self.assertEqual(exc.exception.pid, p.pid)
  1833. self.assertEqual(exc.exception.name, p.name())
  1834. def test_stat_file_parsing(self):
  1835. args = [
  1836. "0", # pid
  1837. "(cat)", # name
  1838. "Z", # status
  1839. "1", # ppid
  1840. "0", # pgrp
  1841. "0", # session
  1842. "0", # tty
  1843. "0", # tpgid
  1844. "0", # flags
  1845. "0", # minflt
  1846. "0", # cminflt
  1847. "0", # majflt
  1848. "0", # cmajflt
  1849. "2", # utime
  1850. "3", # stime
  1851. "4", # cutime
  1852. "5", # cstime
  1853. "0", # priority
  1854. "0", # nice
  1855. "0", # num_threads
  1856. "0", # itrealvalue
  1857. "6", # starttime
  1858. "0", # vsize
  1859. "0", # rss
  1860. "0", # rsslim
  1861. "0", # startcode
  1862. "0", # endcode
  1863. "0", # startstack
  1864. "0", # kstkesp
  1865. "0", # kstkeip
  1866. "0", # signal
  1867. "0", # blocked
  1868. "0", # sigignore
  1869. "0", # sigcatch
  1870. "0", # wchan
  1871. "0", # nswap
  1872. "0", # cnswap
  1873. "0", # exit_signal
  1874. "6", # processor
  1875. "0", # rt priority
  1876. "0", # policy
  1877. "7", # delayacct_blkio_ticks
  1878. ]
  1879. content = " ".join(args).encode()
  1880. with mock_open_content('/proc/%s/stat' % os.getpid(), content):
  1881. p = psutil.Process()
  1882. self.assertEqual(p.name(), 'cat')
  1883. self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
  1884. self.assertEqual(p.ppid(), 1)
  1885. self.assertEqual(
  1886. p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
  1887. cpu = p.cpu_times()
  1888. self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
  1889. self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
  1890. self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
  1891. self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
  1892. self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
  1893. self.assertEqual(p.cpu_num(), 6)
  1894. def test_status_file_parsing(self):
  1895. with mock_open_content(
  1896. '/proc/%s/status' % os.getpid(),
  1897. textwrap.dedent("""\
  1898. Uid:\t1000\t1001\t1002\t1003
  1899. Gid:\t1004\t1005\t1006\t1007
  1900. Threads:\t66
  1901. Cpus_allowed:\tf
  1902. Cpus_allowed_list:\t0-7
  1903. voluntary_ctxt_switches:\t12
  1904. nonvoluntary_ctxt_switches:\t13""").encode()):
  1905. p = psutil.Process()
  1906. self.assertEqual(p.num_ctx_switches().voluntary, 12)
  1907. self.assertEqual(p.num_ctx_switches().involuntary, 13)
  1908. self.assertEqual(p.num_threads(), 66)
  1909. uids = p.uids()
  1910. self.assertEqual(uids.real, 1000)
  1911. self.assertEqual(uids.effective, 1001)
  1912. self.assertEqual(uids.saved, 1002)
  1913. gids = p.gids()
  1914. self.assertEqual(gids.real, 1004)
  1915. self.assertEqual(gids.effective, 1005)
  1916. self.assertEqual(gids.saved, 1006)
  1917. self.assertEqual(p._proc._get_eligible_cpus(), list(range(8)))
  1918. def test_connections_enametoolong(self):
  1919. # Simulate a case where /proc/{pid}/fd/{fd} symlink points to
  1920. # a file with full path longer than PATH_MAX, see:
  1921. # https://github.com/giampaolo/psutil/issues/1940
  1922. with mock.patch('psutil._pslinux.os.readlink',
  1923. side_effect=OSError(errno.ENAMETOOLONG, "")) as m:
  1924. p = psutil.Process()
  1925. with mock.patch("psutil._pslinux.debug"):
  1926. assert not p.connections()
  1927. assert m.called
  1928. @unittest.skipIf(not LINUX, "LINUX only")
  1929. class TestProcessAgainstStatus(PsutilTestCase):
  1930. """/proc/pid/stat and /proc/pid/status have many values in common.
  1931. Whenever possible, psutil uses /proc/pid/stat (it's faster).
  1932. For all those cases we check that the value found in
  1933. /proc/pid/stat (by psutil) matches the one found in
  1934. /proc/pid/status.
  1935. """
  1936. @classmethod
  1937. def setUpClass(cls):
  1938. cls.proc = psutil.Process()
  1939. def read_status_file(self, linestart):
  1940. with psutil._psplatform.open_text(
  1941. '/proc/%s/status' % self.proc.pid) as f:
  1942. for line in f:
  1943. line = line.strip()
  1944. if line.startswith(linestart):
  1945. value = line.partition('\t')[2]
  1946. try:
  1947. return int(value)
  1948. except ValueError:
  1949. return value
  1950. raise ValueError("can't find %r" % linestart)
  1951. def test_name(self):
  1952. value = self.read_status_file("Name:")
  1953. self.assertEqual(self.proc.name(), value)
  1954. def test_status(self):
  1955. value = self.read_status_file("State:")
  1956. value = value[value.find('(') + 1:value.rfind(')')]
  1957. value = value.replace(' ', '-')
  1958. self.assertEqual(self.proc.status(), value)
  1959. def test_ppid(self):
  1960. value = self.read_status_file("PPid:")
  1961. self.assertEqual(self.proc.ppid(), value)
  1962. def test_num_threads(self):
  1963. value = self.read_status_file("Threads:")
  1964. self.assertEqual(self.proc.num_threads(), value)
  1965. def test_uids(self):
  1966. value = self.read_status_file("Uid:")
  1967. value = tuple(map(int, value.split()[1:4]))
  1968. self.assertEqual(self.proc.uids(), value)
  1969. def test_gids(self):
  1970. value = self.read_status_file("Gid:")
  1971. value = tuple(map(int, value.split()[1:4]))
  1972. self.assertEqual(self.proc.gids(), value)
  1973. @retry_on_failure()
  1974. def test_num_ctx_switches(self):
  1975. value = self.read_status_file("voluntary_ctxt_switches:")
  1976. self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
  1977. value = self.read_status_file("nonvoluntary_ctxt_switches:")
  1978. self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
  1979. def test_cpu_affinity(self):
  1980. value = self.read_status_file("Cpus_allowed_list:")
  1981. if '-' in str(value):
  1982. min_, max_ = map(int, value.split('-'))
  1983. self.assertEqual(
  1984. self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
  1985. def test_cpu_affinity_eligible_cpus(self):
  1986. value = self.read_status_file("Cpus_allowed_list:")
  1987. with mock.patch("psutil._pslinux.per_cpu_times") as m:
  1988. self.proc._proc._get_eligible_cpus()
  1989. if '-' in str(value):
  1990. assert not m.called
  1991. else:
  1992. assert m.called
  1993. # =====================================================================
  1994. # --- test utils
  1995. # =====================================================================
  1996. @unittest.skipIf(not LINUX, "LINUX only")
  1997. class TestUtils(PsutilTestCase):
  1998. def test_readlink(self):
  1999. with mock.patch("os.readlink", return_value="foo (deleted)") as m:
  2000. self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
  2001. assert m.called
  2002. if __name__ == '__main__':
  2003. from psutil.tests.runner import run_from_name
  2004. run_from_name(__file__)