123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273 |
- import dataclasses
- import itertools
- import sympy # type: ignore[import]
- import operator
- import math
- import logging
- import torch
- from typing import Union
- log = logging.getLogger(__name__)
- @dataclasses.dataclass(frozen=True)
- class ValueRanges:
- lower: Union[sympy.Expr, sympy.Number, int, float, bool]
- upper: Union[sympy.Expr, sympy.Number, int, float, bool]
- def __contains__(self, x):
- # TODO This needs to be generalised if lower/upper are sympy.Expr
- assert not isinstance(x, sympy.Expr)
- return self.lower <= x <= self.upper
- @classmethod
- def wrap(cls, arg):
- if isinstance(arg, ValueRanges):
- return arg
- assert isinstance(arg, (int, float, bool))
- return ValueRanges(arg, arg)
- @classmethod
- def increasing_map(cls, x, fn):
- """map lower and upper bound with fn"""
- x = cls.wrap(x)
- return ValueRanges(fn(x.lower), fn(x.upper))
- @classmethod
- def decreasing_map(cls, x, fn):
- """map lower bound to upper bound and upper bound to lower bound"""
- x = cls.wrap(x)
- return ValueRanges(fn(x.upper), fn(x.lower))
- @classmethod
- def monotone_map(cls, x, fn):
- """check the max and min of computed upper and lower bound for the output"""
- x = cls.wrap(x)
- l = fn(x.lower)
- u = fn(x.upper)
- return ValueRanges(min(l, u), max(l, u))
- @classmethod
- def convex_min_zero_map(cls, x, fn):
- """the max is at one of the ends"""
- x = ValueRanges.wrap(x)
- if 0 in x:
- return ValueRanges(0, max(fn(x.lower), fn(x.upper)))
- else:
- return cls.monotone_map(x, fn)
- @classmethod
- def coordinatewise_increasing_map(cls, x, y, fn):
- """map upper and lower bounds accessing corresponding values of inputs"""
- x, y = cls.wrap(x), cls.wrap(y)
- return ValueRanges(
- fn(x.lower, y.lower),
- fn(x.upper, y.upper),
- )
- @classmethod
- def coordinatewise_monotone_map(cls, x, y, fn):
- """compute the product of all lower and upper bounds and take min and max"""
- x, y = cls.wrap(x), cls.wrap(y)
- products = [
- fn(a, b)
- for a, b in itertools.product([x.lower, x.upper], [y.lower, y.upper])
- ]
- return ValueRanges(min(products), max(products))
- class ValueRangeAnalysis:
- def __init__(self):
- self.name = "ValueRangeAnalysis"
- boolean_operators = (
- "eq",
- "ne",
- "lt",
- "gt",
- "le",
- "ge",
- "and_",
- "or_",
- "xor",
- "logical_and",
- "logical_or",
- "logical_not",
- )
- for op in boolean_operators:
- setattr(self, op, self.bool_handler)
- @staticmethod
- def bool_handler(*args, **kwargs):
- # just assuming bools can have both values
- return ValueRanges(sympy.false, sympy.true) # type: ignore[arg-type]
- @staticmethod
- def default_handler(*args, **kwargs):
- # many ops are unlikely to show up in optimizable indexing compute,
- # so we dont have full coverage
- return ValueRanges(-math.inf, math.inf)
- def load(self, name: str, index: sympy.Expr):
- return ValueRanges(-math.inf, math.inf)
- def store(self, name, index, value, mode=None):
- return
- def reduction(self, name, dtype, src_dtype, reduction_type, index, value):
- return ValueRanges(-math.inf, math.inf)
- def index_expr(self, index, dtype):
- assert isinstance(index, ValueRanges)
- return index
- @staticmethod
- def to_dtype(x, dtype: torch.dtype):
- def is_bool(val):
- return isinstance(val, bool) or (
- hasattr(val, "is_Boolean") and val.is_Boolean
- )
- x = ValueRanges.wrap(x)
- low, up = x.lower, x.upper
- if is_bool(low):
- assert is_bool(up)
- if dtype.is_floating_point:
- return ValueRanges(sympy.Float(0.0), sympy.Float(1.0))
- else:
- return ValueRanges(sympy.Integer(0), sympy.Integer(1))
- return ValueRanges.wrap(x)
- @staticmethod
- def constant(value, dtype):
- # using nan makes subsequent computation throw, and for the purposes of optimization
- # returning -math.inf - math.inf is equivalent to giving up
- if math.isnan(value):
- return ValueRanges(-math.inf, math.inf)
- if isinstance(value, int):
- return ValueRanges(sympy.Integer(value), sympy.Integer(value))
- else:
- return ValueRanges(sympy.Float(value), sympy.Float(value))
- @staticmethod
- def reciprocal(x):
- x = ValueRanges.wrap(x)
- if 0 in x:
- return ValueRanges(-math.inf, math.inf)
- else:
- return ValueRanges.decreasing_map(x, lambda y: 1 / y)
- @staticmethod
- def square(x):
- return ValueRanges.convex_min_zero_map(x, lambda y: y * y)
- @staticmethod
- def abs(x):
- return ValueRanges.convex_min_zero_map(x, abs)
- @staticmethod
- def neg(x):
- return ValueRanges.decreasing_map(x, operator.neg)
- @staticmethod
- def truediv(a, b):
- b = ValueRanges.wrap(b)
- if 0 in b:
- return ValueRanges(-math.inf, math.inf)
- else:
- return ValueRangeAnalysis.mul(a, ValueRanges(1 / b.upper, 1 / b.lower))
- @staticmethod
- def div(a, b):
- # We think of this as floor(a / b)
- out = ValueRangeAnalysis.truediv(a, b)
- return ValueRangeAnalysis.floor(out)
- @staticmethod
- def add(a, b):
- return ValueRanges.coordinatewise_increasing_map(a, b, operator.add)
- @staticmethod
- def mul(a, b):
- return ValueRanges.coordinatewise_monotone_map(a, b, operator.mul)
- @staticmethod
- def sub(a, b):
- b = ValueRanges.wrap(b)
- return ValueRangeAnalysis.add(a, ValueRanges(-b.upper, -b.lower))
- @staticmethod
- def exp(x):
- return ValueRanges.increasing_map(x, sympy.functions.elementary.exponential.exp)
- @staticmethod
- def log(x):
- return ValueRanges.increasing_map(
- x, lambda y: -math.inf if y <= 0 else sympy.log(y)
- )
- @staticmethod
- def sqrt(x):
- return ValueRanges.increasing_map(x, sympy.sqrt)
- @staticmethod
- def pow(a, b):
- def is_integer(val):
- return (
- isinstance(val, int)
- or (isinstance(val, float) and val == int(val))
- or (hasattr(val, "is_integer") and val.is_integer)
- )
- a = ValueRanges.wrap(a)
- b = ValueRanges.wrap(b)
- if a.lower < 0 and not is_integer(b.lower):
- # The function is not defined
- return ValueRanges(-math.inf, math.inf)
- elif 0 in a and b.lower <= 0:
- return ValueRanges(-math.inf, math.inf)
- return ValueRanges.coordinatewise_monotone_map(a, b, operator.pow)
- @staticmethod
- def minimum(a, b):
- return ValueRanges.coordinatewise_increasing_map(a, b, min)
- @staticmethod
- def maximum(a, b):
- return ValueRanges.coordinatewise_increasing_map(a, b, max)
- @staticmethod
- def where(a, b, c):
- b = ValueRanges.wrap(b)
- c = ValueRanges.wrap(c)
- return ValueRanges(min(b.lower, c.lower), max(b.upper, c.upper))
- @staticmethod
- def floor(x):
- return ValueRangeAnalysis.floor_ceil(
- x, sympy.functions.elementary.integers.floor
- )
- @staticmethod
- def ceil(x):
- return ValueRangeAnalysis.floor_ceil(
- x, sympy.functions.elementary.integers.ceiling
- )
- @staticmethod
- def floor_ceil(x, fn_int):
- def is_integer(val):
- return isinstance(val, int) or (
- hasattr(val, "is_integer") and val.is_integer
- )
- if is_integer(x):
- fn = fn_int
- else:
- def fn(x):
- return sympy.Float(fn_int(x))
- return ValueRanges.increasing_map(x, fn)
- def __getattr__(self, name):
- log.warning(f"unhandled ValueRange op {name}")
- return self.default_handler
|