12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942 |
- # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
- # Use of this source code is governed by a BSD-style license that can be
- # found in the LICENSE file.
- """Common objects shared by __init__.py and _ps*.py modules."""
- # Note: this module is imported by setup.py so it should not import
- # psutil or third-party modules.
- from __future__ import division
- from __future__ import print_function
- import collections
- import contextlib
- import errno
- import functools
- import os
- import socket
- import stat
- import sys
- import threading
- import warnings
- from collections import namedtuple
- from socket import AF_INET
- from socket import SOCK_DGRAM
- from socket import SOCK_STREAM
- try:
- from socket import AF_INET6
- except ImportError:
- AF_INET6 = None
- try:
- from socket import AF_UNIX
- except ImportError:
- AF_UNIX = None
# PY3 is computed here rather than imported from another psutil module
# because setup.py imports this file on its own.
PY3 = sys.version_info >= (3,)
if not PY3:
    # The enum module is only used on Python 3.
    enum = None
else:
    import enum

# Debug messages are enabled by any non-empty PSUTIL_DEBUG env var.
PSUTIL_DEBUG = bool(os.environ.get('PSUTIL_DEBUG'))
# Sentinel used to tell "argument not passed" apart from None.
_DEFAULT = object()
# Names exported by "from <this module> import *", grouped by category.
__all__ = [
    # OS constants
    'FREEBSD', 'BSD', 'LINUX', 'NETBSD', 'OPENBSD', 'MACOS', 'OSX', 'POSIX',
    'SUNOS', 'WINDOWS',
    # connection constants
    'CONN_CLOSE', 'CONN_CLOSE_WAIT', 'CONN_CLOSING', 'CONN_ESTABLISHED',
    'CONN_FIN_WAIT1', 'CONN_FIN_WAIT2', 'CONN_LAST_ACK', 'CONN_LISTEN',
    'CONN_NONE', 'CONN_SYN_RECV', 'CONN_SYN_SENT', 'CONN_TIME_WAIT',
    # net constants
    'NIC_DUPLEX_FULL', 'NIC_DUPLEX_HALF', 'NIC_DUPLEX_UNKNOWN',
    # process status constants
    'STATUS_DEAD', 'STATUS_DISK_SLEEP', 'STATUS_IDLE', 'STATUS_LOCKED',
    'STATUS_RUNNING', 'STATUS_SLEEPING', 'STATUS_STOPPED', 'STATUS_SUSPENDED',
    'STATUS_TRACING_STOP', 'STATUS_WAITING', 'STATUS_WAKE_KILL',
    'STATUS_WAKING', 'STATUS_ZOMBIE', 'STATUS_PARKED',
    # other constants
    'ENCODING', 'ENCODING_ERRS', 'AF_INET6',
    # named tuples
    'pconn', 'pcputimes', 'pctxsw', 'pgids', 'pio', 'pionice', 'popenfile',
    'pthread', 'puids', 'sconn', 'scpustats', 'sdiskio', 'sdiskpart',
    'sdiskusage', 'snetio', 'snicaddr', 'snicstats', 'sswap', 'suser',
    # utility functions
    'conn_tmap', 'deprecated_method', 'isfile_strict', 'memoize',
    'parse_environ_block', 'path_exists_strict', 'usage_percent',
    'supports_ipv6', 'sockfam_to_enum', 'socktype_to_enum', "wrap_numbers",
    'open_text', 'open_binary', 'cat', 'bcat',
    'bytes2human', 'conn_to_ntuple', 'debug',
    # shell utils
    'hilite', 'term_supports_colors', 'print_color',
]
# ===================================================================
# --- OS constants
# ===================================================================

# Boolean platform flags, evaluated once at import time from os.name
# and sys.platform.
POSIX = os.name == "posix"
WINDOWS = os.name == "nt"
LINUX = sys.platform.startswith("linux")
MACOS = sys.platform.startswith("darwin")
OSX = MACOS  # deprecated alias
# MidnightBSD is treated as FreeBSD.
FREEBSD = sys.platform.startswith(("freebsd", "midnightbsd"))
OPENBSD = sys.platform.startswith("openbsd")
NETBSD = sys.platform.startswith("netbsd")
# True on any of the three BSD flavours above.
BSD = FREEBSD or OPENBSD or NETBSD
SUNOS = sys.platform.startswith(("sunos", "solaris"))
AIX = sys.platform.startswith("aix")
# ===================================================================
# --- API constants
# ===================================================================

# Process.status()
# Plain strings; the trailing comments list the platforms named by the
# original authors for the less common states.
STATUS_RUNNING = "running"
STATUS_SLEEPING = "sleeping"
STATUS_DISK_SLEEP = "disk-sleep"
STATUS_STOPPED = "stopped"
STATUS_TRACING_STOP = "tracing-stop"
STATUS_ZOMBIE = "zombie"
STATUS_DEAD = "dead"
STATUS_WAKE_KILL = "wake-kill"
STATUS_WAKING = "waking"
STATUS_IDLE = "idle"  # Linux, macOS, FreeBSD
STATUS_LOCKED = "locked"  # FreeBSD
STATUS_WAITING = "waiting"  # FreeBSD
STATUS_SUSPENDED = "suspended"  # NetBSD
STATUS_PARKED = "parked"  # Linux

# Process.connections() and psutil.net_connections()
# CONN_NONE is the value conn_to_ntuple() uses when no status applies.
CONN_ESTABLISHED = "ESTABLISHED"
CONN_SYN_SENT = "SYN_SENT"
CONN_SYN_RECV = "SYN_RECV"
CONN_FIN_WAIT1 = "FIN_WAIT1"
CONN_FIN_WAIT2 = "FIN_WAIT2"
CONN_TIME_WAIT = "TIME_WAIT"
CONN_CLOSE = "CLOSE"
CONN_CLOSE_WAIT = "CLOSE_WAIT"
CONN_LAST_ACK = "LAST_ACK"
CONN_LISTEN = "LISTEN"
CONN_CLOSING = "CLOSING"
CONN_NONE = "NONE"
# net_if_stats()
# When the enum module is available the constants are IntEnum members
# re-exported at module level; otherwise they are plain integers.
if enum is not None:
    class NicDuplex(enum.IntEnum):
        NIC_DUPLEX_FULL = 2
        NIC_DUPLEX_HALF = 1
        NIC_DUPLEX_UNKNOWN = 0

    globals().update(NicDuplex.__members__)
else:
    NIC_DUPLEX_FULL = 2
    NIC_DUPLEX_HALF = 1
    NIC_DUPLEX_UNKNOWN = 0

# sensors_battery()
if enum is not None:
    class BatteryTime(enum.IntEnum):
        POWER_TIME_UNKNOWN = -1
        POWER_TIME_UNLIMITED = -2

    globals().update(BatteryTime.__members__)
else:
    POWER_TIME_UNKNOWN = -1
    POWER_TIME_UNLIMITED = -2
# --- others

# Encoding and error handler used when converting between bytes and
# text (see open_text() and decode()).
ENCODING = sys.getfilesystemencoding()
if PY3:
    try:
        ENCODING_ERRS = sys.getfilesystemencodeerrors()  # py 3.6
    except AttributeError:
        # Older Python 3: pick the handler by platform.
        if POSIX:
            ENCODING_ERRS = "surrogateescape"
        else:
            ENCODING_ERRS = "replace"
else:
    ENCODING_ERRS = "replace"
- # ===================================================================
- # --- namedtuples
- # ===================================================================
# --- for system functions

# psutil.swap_memory()
sswap = namedtuple('sswap', 'total used free percent sin sout')
# psutil.disk_usage()
sdiskusage = namedtuple('sdiskusage', 'total used free percent')
# psutil.disk_io_counters()
sdiskio = namedtuple(
    'sdiskio',
    'read_count write_count read_bytes write_bytes read_time write_time')
# psutil.disk_partitions()
sdiskpart = namedtuple(
    'sdiskpart', 'device mountpoint fstype opts maxfile maxpath')
# psutil.net_io_counters()
snetio = namedtuple(
    'snetio',
    'bytes_sent bytes_recv packets_sent packets_recv '
    'errin errout dropin dropout')
# psutil.users()
suser = namedtuple('suser', 'name terminal host started pid')
# psutil.net_connections()
sconn = namedtuple('sconn', 'fd family type laddr raddr status pid')
# psutil.net_if_addrs()
snicaddr = namedtuple('snicaddr', 'family address netmask broadcast ptp')
# psutil.net_if_stats()
snicstats = namedtuple('snicstats', 'isup duplex speed mtu flags')
# psutil.cpu_stats()
scpustats = namedtuple(
    'scpustats', 'ctx_switches interrupts soft_interrupts syscalls')
# psutil.cpu_freq()
scpufreq = namedtuple('scpufreq', 'current min max')
# psutil.sensors_temperatures()
shwtemp = namedtuple('shwtemp', 'label current high critical')
# psutil.sensors_battery()
sbattery = namedtuple('sbattery', 'percent secsleft power_plugged')
# psutil.sensors_fans()
sfan = namedtuple('sfan', 'label current')

# --- for Process methods

# psutil.Process.cpu_times()
pcputimes = namedtuple(
    'pcputimes', 'user system children_user children_system')
# psutil.Process.open_files()
popenfile = namedtuple('popenfile', 'path fd')
# psutil.Process.threads()
pthread = namedtuple('pthread', 'id user_time system_time')
# psutil.Process.uids()
puids = namedtuple('puids', 'real effective saved')
# psutil.Process.gids()
pgids = namedtuple('pgids', 'real effective saved')
# psutil.Process.io_counters()
pio = namedtuple('pio', 'read_count write_count read_bytes write_bytes')
# psutil.Process.ionice()
pionice = namedtuple('pionice', 'ioclass value')
# psutil.Process.ctx_switches()
pctxsw = namedtuple('pctxsw', 'voluntary involuntary')
# psutil.Process.connections()
pconn = namedtuple('pconn', 'fd family type laddr raddr status')
# psutil.connections() and psutil.Process.connections()
addr = namedtuple('addr', 'ip port')
- # ===================================================================
- # --- Process.connections() 'kind' parameter mapping
- # ===================================================================
# Maps each 'kind' string to a (socket families, socket types) pair.
conn_tmap = {
    "all": ([AF_INET, AF_INET6, AF_UNIX], [SOCK_STREAM, SOCK_DGRAM]),
    "tcp": ([AF_INET, AF_INET6], [SOCK_STREAM]),
    "tcp4": ([AF_INET], [SOCK_STREAM]),
    "udp": ([AF_INET, AF_INET6], [SOCK_DGRAM]),
    "udp4": ([AF_INET], [SOCK_DGRAM]),
    "inet": ([AF_INET, AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
    "inet4": ([AF_INET], [SOCK_STREAM, SOCK_DGRAM]),
    "inet6": ([AF_INET6], [SOCK_STREAM, SOCK_DGRAM]),
}

# The IPv6-only and UNIX-only kinds are registered only when the
# corresponding address family constant is available.
if AF_INET6 is not None:
    conn_tmap["tcp6"] = ([AF_INET6], [SOCK_STREAM])
    conn_tmap["udp6"] = ([AF_INET6], [SOCK_DGRAM])

if AF_UNIX is not None:
    conn_tmap["unix"] = ([AF_UNIX], [SOCK_STREAM, SOCK_DGRAM])
- # =====================================================================
- # --- Exceptions
- # =====================================================================
class Error(Exception):
    """Root of the psutil exception hierarchy; every other psutil
    exception derives from it.

    str() and repr() are built from whichever of the "pid", "ppid",
    "name", "seconds" and "msg" attributes are set on the instance.
    """
    __module__ = 'psutil'

    def _infodict(self, attrs):
        """Return an OrderedDict mapping each name in `attrs` to its
        value on this instance, skipping missing or falsy values.
        A "pid" of 0 is kept.
        """
        found = collections.OrderedDict()
        for attr_name in attrs:
            attr_value = getattr(self, attr_name, None)
            if attr_value or (attr_name == "pid" and attr_value == 0):
                found[attr_name] = attr_value
        return found

    def __str__(self):
        # invoked on `raise Error`
        pieces = []
        message = getattr(self, "msg", "")
        if message:
            pieces.append(message)
        info = self._infodict(("pid", "ppid", "name"))
        if info:
            pieces.append(
                "(%s)" % ", ".join("%s=%r" % item for item in info.items()))
        return " ".join(pieces)

    def __repr__(self):
        # invoked on `repr(Error)`
        info = self._infodict(("pid", "ppid", "name", "seconds", "msg"))
        rendered = ", ".join("%s=%r" % item for item in info.items())
        return "psutil.%s(%s)" % (self.__class__.__name__, rendered)
class NoSuchProcess(Error):
    """Raised when no process with the given PID exists, or when it
    has terminated in the meantime.
    """
    __module__ = 'psutil'

    def __init__(self, pid, name=None, msg=None):
        Error.__init__(self)
        # A falsy `msg` is replaced by the default text.
        self.msg = msg if msg else "process no longer exists"
        self.name = name
        self.pid = pid
class ZombieProcess(NoSuchProcess):
    """Raised when querying a zombie process.

    This happens on macOS, BSD and Solaris only, and not for every
    query: depending on what is asked the OS may still be able to
    answer. On Linux all zombie processes can be queried (hence this
    is never raised). Windows doesn't have zombie processes.
    """
    __module__ = 'psutil'

    def __init__(self, pid, name=None, ppid=None, msg=None):
        NoSuchProcess.__init__(self, pid, name, msg)
        # Replace the parent class' default message with a zombie
        # specific one when no explicit message was given.
        self.msg = msg if msg else "PID still exists but it's a zombie"
        self.ppid = ppid
class AccessDenied(Error):
    """Raised when the permission to perform an action is denied."""
    __module__ = 'psutil'

    def __init__(self, pid=None, name=None, msg=None):
        Error.__init__(self)
        # A falsy `msg` is normalized to an empty string.
        self.msg = msg if msg else ""
        self.name = name
        self.pid = pid
class TimeoutExpired(Error):
    """Raised by Process.wait(timeout) when the timeout expires and
    the process is still alive.
    """
    __module__ = 'psutil'

    def __init__(self, seconds, pid=None, name=None):
        Error.__init__(self)
        # The message is always derived from `seconds`.
        self.msg = "timeout after %s seconds" % seconds
        self.seconds = seconds
        self.name = name
        self.pid = pid
- # ===================================================================
- # --- utils
- # ===================================================================
- # This should be in _compat.py rather than here, but does not work well
- # with setup.py importing this module via a sys.path trick.
if not PY3:
    def raise_from(value, from_value):
        """Python 2 fallback: raise `value`; `from_value` is ignored."""
        raise value
else:
    # "raise X from Y" is a syntax error on Python 2, so the Python 3
    # implementation is compiled from a string at import time.
    if isinstance(__builtins__, dict):  # cpython
        exec_ = __builtins__["exec"]
    else:  # pypy
        exec_ = getattr(__builtins__, "exec")  # noqa
    exec_("""def raise_from(value, from_value):
    try:
        raise value from from_value
    finally:
        value = None
    """)
def usage_percent(used, total, round_=None):
    """Return 'used' expressed as a percentage of 'total'.

    If 'total' is zero, 0.0 is returned. If `round_` is not None the
    result is rounded to that many decimal digits.
    """
    try:
        percent = (float(used) / total) * 100
    except ZeroDivisionError:
        return 0.0
    if round_ is None:
        return percent
    return round(percent, round_)
def memoize(fun):
    """A simple memoize decorator for callables taking hashable
    positional and keyword arguments.

    The decorated callable gains a cache_clear() function which
    empties the cache:

    >>> @memoize
    ... def foo():
    ...     return 1
    ...
    >>> foo()
    1
    >>> foo.cache_clear()
    >>>

    It supports:
     - functions
     - classes (acts as a @singleton)
     - staticmethods
     - classmethods

    It does NOT support:
     - methods
    """
    results = {}

    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
        key = (args, frozenset(sorted(kwargs.items())))
        try:
            return results[key]
        except KeyError:
            try:
                value = fun(*args, **kwargs)
                results[key] = value
            except Exception as err:
                # Re-raise through raise_from(err, None) rather than
                # letting the error propagate from inside the
                # KeyError handler.
                raise raise_from(err, None)
            return value

    def cache_clear():
        """Clear cache."""
        results.clear()

    wrapper.cache_clear = cache_clear
    return wrapper
def memoize_when_activated(fun):
    """A memoize decorator which is disabled by default. It can be
    activated and deactivated on request.
    For efficiency reasons it can be used only against methods which
    take no arguments other than `self`.

    >>> class Foo:
    ...     @memoize_when_activated
    ...     def foo(self):
    ...         print(1)
    ...
    >>> f = Foo()
    >>> # deactivated (default)
    >>> f.foo()
    1
    >>> f.foo()
    1
    >>>
    >>> # activated
    >>> Foo.foo.cache_activate(f)
    >>> f.foo()
    1
    >>> f.foo()
    >>> f.foo()
    >>>
    """
    @functools.wraps(fun)
    def wrapper(self):
        try:
            # case 1: the cache exists and already holds a result
            result = self._cache[fun]
        except AttributeError:
            # case 2: there is no "_cache" attribute, so caching is
            # not active; just call the method
            try:
                return fun(self)
            except Exception as err:
                raise raise_from(err, None)
        except KeyError:
            # case 3: the cache is active but has no entry for this
            # method yet
            try:
                result = fun(self)
            except Exception as err:
                raise raise_from(err, None)
            try:
                self._cache[fun] = result
            except AttributeError:
                # multi-threading race condition, see:
                # https://github.com/giampaolo/psutil/issues/1948
                pass
        return result

    def cache_activate(proc):
        """Activate cache. Expects a Process instance. The cache is
        stored as a "_cache" instance attribute.
        """
        proc._cache = {}

    def cache_deactivate(proc):
        """Deactivate and clear cache."""
        try:
            del proc._cache
        except AttributeError:
            # the cache was not active
            pass

    wrapper.cache_activate = cache_activate
    wrapper.cache_deactivate = cache_deactivate
    return wrapper
def isfile_strict(path):
    """Like os.path.isfile() but EACCES / EPERM errors are propagated
    instead of being turned into False, see:
    http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
    """
    try:
        mode = os.stat(path).st_mode
    except OSError as err:
        if err.errno not in (errno.EPERM, errno.EACCES):
            return False
        raise
    return stat.S_ISREG(mode)
def path_exists_strict(path):
    """Like os.path.exists() but EACCES / EPERM errors are propagated
    instead of being turned into False. See:
    http://mail.python.org/pipermail/python-dev/2012-June/120787.html.
    """
    try:
        os.stat(path)
    except OSError as err:
        if err.errno not in (errno.EPERM, errno.EACCES):
            return False
        raise
    return True
@memoize
def supports_ipv6():
    """Return True if IPv6 is supported on this platform.

    Support is verified by binding a TCP socket to "::1".
    """
    if AF_INET6 is None or not socket.has_ipv6:
        return False
    try:
        probe = socket.socket(AF_INET6, socket.SOCK_STREAM)
        with contextlib.closing(probe):
            probe.bind(("::1", 0))
    except socket.error:
        return False
    return True
def parse_environ_block(data):
    """Parse a C environ block of environment variables into a dictionary."""
    # The block is usually raw data from the target process. It might contain
    # trailing garbage and lines that do not look like assignments.
    env = {}
    start = 0
    # localize global variable to speed up access.
    uppercase_keys = WINDOWS
    while True:
        end = data.find("\0", start)
        # nul byte at the beginning or double nul byte means finish
        if end <= start:
            break
        # entries without an equals sign (or starting with one) are
        # skipped
        sep = data.find("=", start, end)
        if sep > start:
            name = data[start:sep]
            # Windows expects environment variables to be uppercase only
            if uppercase_keys:
                name = name.upper()
            env[name] = data[sep + 1:end]
        start = end + 1
    return env
def sockfam_to_enum(num):
    """Convert a numeric socket family value to a
    socket.AddressFamily member. If the enum module is not available
    or the value is not a known member, return `num` unchanged.
    """
    if enum is not None:  # pragma: no cover
        try:
            return socket.AddressFamily(num)
        except ValueError:
            pass
    return num
def socktype_to_enum(num):
    """Convert a numeric socket type value to a socket.SocketKind
    member. If the enum module is not available or the value is not a
    known member, return `num` unchanged.
    """
    if enum is not None:  # pragma: no cover
        try:
            return socket.SocketKind(num)
        except ValueError:
            pass
    return num
def conn_to_ntuple(fd, fam, type_, laddr, raddr, status, status_map, pid=None):
    """Convert a raw connection tuple to a proper ntuple.

    Returns a `pconn` if `pid` is None, else a `sconn` which also
    carries the pid.
    """
    is_inet = fam in (socket.AF_INET, AF_INET6)
    if is_inet:
        # Non-empty addresses become addr(ip, port) namedtuples.
        if laddr:
            laddr = addr(*laddr)
        if raddr:
            raddr = addr(*raddr)
    if is_inet and type_ == socket.SOCK_STREAM:
        # Only TCP sockets have a status; unknown codes map to
        # CONN_NONE.
        status = status_map.get(status, CONN_NONE)
    else:
        status = CONN_NONE  # ignore whatever C returned to us
    fam = sockfam_to_enum(fam)
    type_ = socktype_to_enum(type_)
    if pid is not None:
        return sconn(fd, fam, type_, laddr, raddr, status, pid)
    return pconn(fd, fam, type_, laddr, raddr, status)
def deprecated_method(replacement):
    """A decorator which can be used to mark a method as deprecated.
    'replacement' is the name of the method which will be called
    instead: the decorated method's own body is never executed.
    """
    def decorator(fun):
        msg = "%s() is deprecated and will be removed; use %s() instead" % (
            fun.__name__, replacement)
        # Methods without a docstring get the deprecation notice as
        # their documentation.
        if fun.__doc__ is None:
            fun.__doc__ = msg

        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            warnings.warn(msg, category=DeprecationWarning, stacklevel=2)
            target = getattr(self, replacement)
            return target(*args, **kwargs)

        return wrapper

    return decorator
class _WrapNumbers:
    """Watches numbers so that they don't overflow and wrap
    (reset to zero).

    State is kept per function `name`:
     - cache: the dict passed on the last call
     - reminders: for each (key, index) pair, the amount to add to the
       raw value
     - reminder_keys: for each key, the (key, index) pairs which have
       an entry in reminders
    """

    def __init__(self):
        self.lock = threading.Lock()
        self.cache = {}
        self.reminders = {}
        self.reminder_keys = {}

    def _add_dict(self, input_dict, name):
        """Register `input_dict` as the first sample for `name`."""
        for registry in (self.cache, self.reminders, self.reminder_keys):
            assert name not in registry
        self.cache[name] = input_dict
        self.reminders[name] = collections.defaultdict(int)
        self.reminder_keys[name] = collections.defaultdict(set)

    def _remove_dead_reminders(self, input_dict, name):
        """In case the number of keys changed between calls (e.g. a
        disk disappears) this removes the entry from self.reminders.
        """
        previous = self.cache[name]
        for gone_key in set(previous) - set(input_dict):
            for remkey in self.reminder_keys[name][gone_key]:
                del self.reminders[name][remkey]
            del self.reminder_keys[name][gone_key]

    def run(self, input_dict, name):
        """Cache dict and sum numbers which overflow and wrap.
        Return an updated copy of `input_dict`.
        """
        if name not in self.cache:
            # This was the first call: there is nothing to compare
            # against, so the input is returned as is.
            self._add_dict(input_dict, name)
            return input_dict

        self._remove_dead_reminders(input_dict, name)

        previous = self.cache[name]
        adjusted_dict = {}
        for key, input_tuple in input_dict.items():
            if key not in previous:
                # The input dict has a new key (e.g. a new disk or NIC)
                # which didn't exist in the previous call.
                adjusted_dict[key] = input_tuple
                continue
            old_tuple = previous[key]
            adjusted = []
            for i, input_value in enumerate(input_tuple):
                remkey = (key, i)
                old_value = old_tuple[i]
                if input_value < old_value:
                    # it wrapped!
                    self.reminders[name][remkey] += old_value
                    self.reminder_keys[name][key].add(remkey)
                adjusted.append(input_value + self.reminders[name][remkey])
            adjusted_dict[key] = tuple(adjusted)

        self.cache[name] = input_dict
        return adjusted_dict

    def cache_clear(self, name=None):
        """Clear the internal cache, optionally only for function 'name'."""
        registries = (self.cache, self.reminders, self.reminder_keys)
        with self.lock:
            for registry in registries:
                if name is None:
                    registry.clear()
                else:
                    registry.pop(name, None)

    def cache_info(self):
        """Return internal cache dicts as a tuple of 3 elements."""
        with self.lock:
            return (self.cache, self.reminders, self.reminder_keys)
def wrap_numbers(input_dict, name):
    """Given an `input_dict` and a function `name`, adjust the numbers
    which "wrap" (restart from zero) across different calls by adding
    "old value" to "new value" and return an updated dict.
    """
    # The shared _WrapNumbers instance is only used while holding its
    # lock.
    lock = _wn.lock
    with lock:
        adjusted = _wn.run(input_dict, name)
    return adjusted


# Module-wide state shared by all wrap_numbers() calls; its cache
# management methods are exposed as function attributes.
_wn = _WrapNumbers()
wrap_numbers.cache_clear = _wn.cache_clear
wrap_numbers.cache_info = _wn.cache_info
# The read buffer size for the open() builtin. This (also) dictates
# how much data we read(2) when iterating over file lines, as in:
#   >>> with open(file) as f:
#   ...    for line in f:
#   ...        ...
# The default per-line buffer size is 1K for binary files and 8K for
# text files. We use a bigger buffer (32K) in order to have more
# consistent results when reading /proc pseudo files on Linux, see:
# https://github.com/giampaolo/psutil/issues/2050
# On Python 2 this also speeds up the reading of big files
# (namely /proc/{pid}/smaps and /proc/net/*):
# https://github.com/giampaolo/psutil/issues/708
FILE_READ_BUFFER_SIZE = 32 * 1024
def open_binary(fname):
    """Open `fname` for reading in binary mode, using
    FILE_READ_BUFFER_SIZE as buffer size, and return the file object.
    """
    return open(fname, mode="rb", buffering=FILE_READ_BUFFER_SIZE)
def open_text(fname):
    """On Python 3 opens a file in text mode by using fs encoding and
    a proper en/decoding errors handler.
    On Python 2 this is just an alias for open(name, 'rt').
    """
    if PY3:
        # See:
        # https://github.com/giampaolo/psutil/issues/675
        # https://github.com/giampaolo/psutil/pull/733
        text_file = open(fname, buffering=FILE_READ_BUFFER_SIZE,
                         encoding=ENCODING, errors=ENCODING_ERRS)
        try:
            # Dictates per-line read(2) buffer size. Defaults is 8k. See:
            # https://github.com/giampaolo/psutil/issues/2050#issuecomment-1013387546
            text_file._CHUNK_SIZE = FILE_READ_BUFFER_SIZE
        except AttributeError:
            # The attribute can't be set on this file object; go on
            # with the default chunk size.
            pass
        except Exception:
            # Don't leak the open file on unexpected errors.
            text_file.close()
            raise
        return text_file
    return open(fname, buffering=FILE_READ_BUFFER_SIZE)
def cat(fname, fallback=_DEFAULT, _open=open_text):
    """Read the whole content of a file and return it as a string.
    The file is opened via `_open` (text mode by default). If
    `fallback` is specified it is returned in case of IOError /
    OSError, either if the file does not exist or it can't be read();
    otherwise the error is propagated.
    """
    try:
        with _open(fname) as f:
            return f.read()
    except (IOError, OSError):
        if fallback is _DEFAULT:
            raise
        return fallback
def bcat(fname, fallback=_DEFAULT):
    """Same as cat() but the file is opened in binary mode."""
    content = cat(fname, fallback=fallback, _open=open_binary)
    return content
def bytes2human(n, format="%(value).1f%(symbol)s"):
    """Used by various scripts. See: http://goo.gl/zeJZl.

    Render the number `n` using the biggest binary unit (K, M, G, ...)
    which does not exceed its absolute value; numbers smaller than
    1024 are rendered with the 'B' symbol. `format` is a %-style
    template receiving the `value` and `symbol` keys.

    >>> bytes2human(10000)
    '9.8K'
    >>> bytes2human(100001221)
    '95.4M'
    """
    symbols = ('B', 'K', 'M', 'G', 'T', 'P', 'E', 'Z', 'Y')
    # K -> 2 ** 10, M -> 2 ** 20 and so on.
    prefix = {}
    for exp, s in enumerate(symbols[1:], 1):
        prefix[s] = 1 << (exp * 10)
    magnitude = abs(n)
    # Walk the units from the biggest to the smallest one.
    for symbol in symbols[:0:-1]:
        if magnitude >= prefix[symbol]:
            value = float(n) / prefix[symbol]
            return format % locals()
    return format % dict(symbol=symbols[0], value=n)
def get_procfs_path():
    """Return updated psutil.PROCFS_PATH constant."""
    # Looked up on every call through sys.modules, so the current
    # value of the attribute is returned.
    psutil_module = sys.modules['psutil']
    return psutil_module.PROCFS_PATH
if not PY3:
    def decode(s):
        """Python 2: return `s` unchanged."""
        return s
else:
    def decode(s):
        """Decode `s` to str using ENCODING and ENCODING_ERRS."""
        return s.decode(encoding=ENCODING, errors=ENCODING_ERRS)
- # =====================================================================
- # --- shell utils
- # =====================================================================
@memoize
def term_supports_colors(file=sys.stdout):  # pragma: no cover
    """Return True if the terminal attached to `file` supports colors.

    On Windows (os.name == 'nt') this always returns True. Elsewhere
    `file` must be a tty and curses must report a positive number of
    colors; any error along the way (e.g. curses missing) results in
    False.
    """
    if os.name == 'nt':
        return True
    try:
        import curses
        # Explicit checks instead of `assert` statements: assertions
        # are stripped when Python runs with -O, which would make
        # this function return True without checking anything.
        if not file.isatty():
            return False
        curses.setupterm()
        return curses.tigetnum("colors") > 0
    except Exception:
        return False
def hilite(s, color=None, bold=False):  # pragma: no cover
    """Return an highlighted version of 's'.

    The string is wrapped in ANSI escape sequences selecting `color`
    (one of the keys of the table below, or None) and, if `bold` is
    true, the bold attribute. If the terminal does not support colors
    `s` is returned unchanged.

    Raises ValueError if `color` is not a known color name.
    """
    if not term_supports_colors():
        return s
    attr = []
    colors = dict(green='32', red='91', brown='33', yellow='93', blue='34',
                  violet='35', lightblue='36', grey='37', darkgrey='30')
    colors[None] = '29'
    try:
        color = colors[color]
    except KeyError:
        # Both placeholders need a value: with a single argument the
        # formatting itself fails with TypeError and the ValueError
        # is never raised.
        raise ValueError("invalid color %r; choose between %s" % (
            color, list(colors.keys())))
    attr.append(color)
    if bold:
        attr.append('1')
    return '\x1b[%sm%s\x1b[0m' % (';'.join(attr), s)
def print_color(
        s, color=None, bold=False, file=sys.stdout):  # pragma: no cover
    """Print a colorized version of string.

    Without color support the string is printed as is. On POSIX the
    string is colorized via hilite(). Otherwise the console text
    attribute is changed through the Windows Kernel32 API for the
    duration of the print and then reset to the default color.

    Raises ValueError if `color` is not supported.
    """
    if not term_supports_colors():
        print(s, file=file)  # NOQA
        return
    if POSIX:
        print(hilite(s, color, bold), file=file)  # NOQA
        return

    import ctypes

    DEFAULT_COLOR = 7
    kernel32 = ctypes.windll.Kernel32
    GetStdHandle = kernel32.GetStdHandle
    SetConsoleTextAttribute = kernel32.SetConsoleTextAttribute

    colors = dict(green=2, red=4, brown=6, yellow=6)
    colors[None] = DEFAULT_COLOR
    try:
        color = colors[color]
    except KeyError:
        raise ValueError("invalid color %r; choose between %r" % (
            color, list(colors.keys())))
    if bold and color <= 7:
        # +8 is used for the bold variant of a color
        color += 8

    # -12 is used for sys.stderr, -11 for any other file
    if file is sys.stderr:
        handle_id = -12
    else:
        handle_id = -11
    GetStdHandle.restype = ctypes.c_ulong
    handle = GetStdHandle(handle_id)
    SetConsoleTextAttribute(handle, color)
    try:
        print(s, file=file)  # NOQA
    finally:
        # always restore the default color
        SetConsoleTextAttribute(handle, DEFAULT_COLOR)
def debug(msg):
    """If PSUTIL_DEBUG env var is set, print a debug message to stderr.

    The message is prefixed with the file name and line number of the
    caller. If `msg` is an exception it is rendered as "ignoring ...".
    """
    if not PSUTIL_DEBUG:
        return
    import inspect
    caller = inspect.getframeinfo(inspect.currentframe().f_back)
    fname, lineno = caller[0], caller[1]
    if isinstance(msg, Exception):
        if isinstance(msg, (OSError, IOError, EnvironmentError)):
            # ...because str(exc) may contain info about the file name
            template = "ignoring %s"
        else:
            template = "ignoring %r"
        msg = template % msg
    print("psutil-debug [%s:%s]> %s" % (fname, lineno, msg),  # NOQA
          file=sys.stderr)
|