123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210 |
- """ Caching facility for SymPy """
- from importlib import import_module
- from typing import Callable
- class _cache(list):
- """ List of cached functions """
- def print_cache(self):
- """print cache info"""
- for item in self:
- name = item.__name__
- myfunc = item
- while hasattr(myfunc, '__wrapped__'):
- if hasattr(myfunc, 'cache_info'):
- info = myfunc.cache_info()
- break
- else:
- myfunc = myfunc.__wrapped__
- else:
- info = None
- print(name, info)
- def clear_cache(self):
- """clear cache content"""
- for item in self:
- myfunc = item
- while hasattr(myfunc, '__wrapped__'):
- if hasattr(myfunc, 'cache_clear'):
- myfunc.cache_clear()
- break
- else:
- myfunc = myfunc.__wrapped__
- # global cache registry:
- CACHE = _cache()
- # make clear and print methods available
- print_cache = CACHE.print_cache
- clear_cache = CACHE.clear_cache
- from functools import lru_cache, wraps
- def __cacheit(maxsize):
- """caching decorator.
- important: the result of cached function must be *immutable*
- Examples
- ========
- >>> from sympy import cacheit
- >>> @cacheit
- ... def f(a, b):
- ... return a+b
- >>> @cacheit
- ... def f(a, b): # noqa: F811
- ... return [a, b] # <-- WRONG, returns mutable object
- to force cacheit to check returned results mutability and consistency,
- set environment variable SYMPY_USE_CACHE to 'debug'
- """
- def func_wrapper(func):
- cfunc = lru_cache(maxsize, typed=True)(func)
- @wraps(func)
- def wrapper(*args, **kwargs):
- try:
- retval = cfunc(*args, **kwargs)
- except TypeError as e:
- if not e.args or not e.args[0].startswith('unhashable type:'):
- raise
- retval = func(*args, **kwargs)
- return retval
- wrapper.cache_info = cfunc.cache_info
- wrapper.cache_clear = cfunc.cache_clear
- CACHE.append(wrapper)
- return wrapper
- return func_wrapper
- ########################################
- def __cacheit_nocache(func):
- return func
- def __cacheit_debug(maxsize):
- """cacheit + code to check cache consistency"""
- def func_wrapper(func):
- cfunc = __cacheit(maxsize)(func)
- @wraps(func)
- def wrapper(*args, **kw_args):
- # always call function itself and compare it with cached version
- r1 = func(*args, **kw_args)
- r2 = cfunc(*args, **kw_args)
- # try to see if the result is immutable
- #
- # this works because:
- #
- # hash([1,2,3]) -> raise TypeError
- # hash({'a':1, 'b':2}) -> raise TypeError
- # hash((1,[2,3])) -> raise TypeError
- #
- # hash((1,2,3)) -> just computes the hash
- hash(r1), hash(r2)
- # also see if returned values are the same
- if r1 != r2:
- raise RuntimeError("Returned values are not the same")
- return r1
- return wrapper
- return func_wrapper
- def _getenv(key, default=None):
- from os import getenv
- return getenv(key, default)
- # SYMPY_USE_CACHE=yes/no/debug
- USE_CACHE = _getenv('SYMPY_USE_CACHE', 'yes').lower()
- # SYMPY_CACHE_SIZE=some_integer/None
- # special cases :
- # SYMPY_CACHE_SIZE=0 -> No caching
- # SYMPY_CACHE_SIZE=None -> Unbounded caching
- scs = _getenv('SYMPY_CACHE_SIZE', '1000')
- if scs.lower() == 'none':
- SYMPY_CACHE_SIZE = None
- else:
- try:
- SYMPY_CACHE_SIZE = int(scs)
- except ValueError:
- raise RuntimeError(
- 'SYMPY_CACHE_SIZE must be a valid integer or None. ' + \
- 'Got: %s' % SYMPY_CACHE_SIZE)
- if USE_CACHE == 'no':
- cacheit = __cacheit_nocache
- elif USE_CACHE == 'yes':
- cacheit = __cacheit(SYMPY_CACHE_SIZE)
- elif USE_CACHE == 'debug':
- cacheit = __cacheit_debug(SYMPY_CACHE_SIZE) # a lot slower
- else:
- raise RuntimeError(
- 'unrecognized value for SYMPY_USE_CACHE: %s' % USE_CACHE)
- def cached_property(func):
- '''Decorator to cache property method'''
- attrname = '__' + func.__name__
- _cached_property_sentinel = object()
- def propfunc(self):
- val = getattr(self, attrname, _cached_property_sentinel)
- if val is _cached_property_sentinel:
- val = func(self)
- setattr(self, attrname, val)
- return val
- return property(propfunc)
- def lazy_function(module : str, name : str) -> Callable:
- """Create a lazy proxy for a function in a module.
- The module containing the function is not imported until the function is used.
- """
- func = None
- def _get_function():
- nonlocal func
- if func is None:
- func = getattr(import_module(module), name)
- return func
- # The metaclass is needed so that help() shows the docstring
- class LazyFunctionMeta(type):
- @property
- def __doc__(self):
- docstring = _get_function().__doc__
- docstring += f"\n\nNote: this is a {self.__class__.__name__} wrapper of '{module}.{name}'"
- return docstring
- class LazyFunction(metaclass=LazyFunctionMeta):
- def __call__(self, *args, **kwargs):
- # inline get of function for performance gh-23832
- nonlocal func
- if func is None:
- func = getattr(import_module(module), name)
- return func(*args, **kwargs)
- @property
- def __doc__(self):
- docstring = _get_function().__doc__
- docstring += f"\n\nNote: this is a {self.__class__.__name__} wrapper of '{module}.{name}'"
- return docstring
- def __str__(self):
- return _get_function().__str__()
- def __repr__(self):
- return f"<{__class__.__name__} object at 0x{id(self):x}>: wrapping '{module}.{name}'"
- return LazyFunction()
|