registration.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. """Module for handling symbolic function registration."""
  2. import warnings
  3. from typing import (
  4. Callable,
  5. Collection,
  6. Dict,
  7. Generic,
  8. Optional,
  9. Sequence,
  10. Set,
  11. TypeVar,
  12. Union,
  13. )
  14. from torch.onnx import _constants, errors
  15. from torch.onnx._internal import _beartype
# Opset versions are plain integers; the alias keeps signatures self-describing.
OpsetVersion = int
  17. def _dispatch_opset_version(
  18. target: OpsetVersion, registered_opsets: Collection[OpsetVersion]
  19. ) -> Optional[OpsetVersion]:
  20. """Finds the registered opset given a target opset version and the available opsets.
  21. Args:
  22. target: The target opset version.
  23. registered_opsets: The available opsets.
  24. Returns:
  25. The registered opset version.
  26. """
  27. if not registered_opsets:
  28. return None
  29. descending_registered_versions = sorted(registered_opsets, reverse=True)
  30. # Linear search for the opset version, which is fine since the number of opset
  31. # versions is small.
  32. if target >= _constants.ONNX_BASE_OPSET:
  33. # Always look down toward opset 1 when the target is >= ONNX_BASE_OPSET (opset 9).
  34. # When a custom op is register at opset 1, we want to be able to discover it as a
  35. # fallback for all opsets >= ONNX_BASE_OPSET.
  36. for version in descending_registered_versions:
  37. if version <= target:
  38. return version
  39. return None
  40. # target < opset 9. This is the legacy behavior to support opset 7 and opset 8.
  41. # for caffe2 support. We search up toward opset 9.
  42. for version in reversed(descending_registered_versions):
  43. # Count back up until _constants.ONNX_BASE_OPSET
  44. if target <= version <= _constants.ONNX_BASE_OPSET:
  45. return version
  46. return None
  47. _K = TypeVar("_K")
  48. _V = TypeVar("_V")
  49. class OverrideDict(Generic[_K, _V], Collection[_K]):
  50. """A dictionary that merges built-in and custom symbolic functions.
  51. It supports overriding and un-overriding built-in symbolic functions with custom
  52. ones.
  53. """
  54. def __init__(self):
  55. self._base: Dict[_K, _V] = {}
  56. self._overrides: Dict[_K, _V] = {}
  57. self._merged: Dict[_K, _V] = {}
  58. def set_base(self, key: _K, value: _V) -> None:
  59. self._base[key] = value
  60. if key not in self._overrides:
  61. self._merged[key] = value
  62. def in_base(self, key: _K) -> bool:
  63. """Checks if a key is in the base dictionary."""
  64. return key in self._base
  65. def override(self, key: _K, value: _V) -> None:
  66. """Overrides a base key-value with a new pair."""
  67. self._overrides[key] = value
  68. self._merged[key] = value
  69. def remove_override(self, key: _K) -> None:
  70. """Un-overrides a key-value pair."""
  71. self._overrides.pop(key, None) # type: ignore[arg-type]
  72. self._merged.pop(key, None) # type: ignore[arg-type]
  73. if key in self._base:
  74. self._merged[key] = self._base[key]
  75. def overridden(self, key: _K) -> bool:
  76. """Checks if a key-value pair is overridden."""
  77. return key in self._overrides
  78. def __getitem__(self, key: _K) -> _V:
  79. return self._merged[key]
  80. def get(self, key: _K, default: Optional[_V] = None):
  81. return self._merged.get(key, default)
  82. def __contains__(self, key: object) -> bool:
  83. return key in self._merged
  84. def __iter__(self):
  85. return iter(self._merged)
  86. def __len__(self) -> int:
  87. return len(self._merged)
  88. def __repr__(self) -> str:
  89. return f"OverrideDict(base={self._base}, overrides={self._overrides})"
  90. def __bool__(self) -> bool:
  91. return bool(self._merged)
  92. class _SymbolicFunctionGroup:
  93. """Different versions of symbolic functions registered to the same name.
  94. O(number of registered versions of an op) search is performed to find the most
  95. recent version of the op.
  96. The registration is delayed until op is used to improve startup time.
  97. Function overloads with different arguments are not allowed.
  98. Custom op overrides are supported.
  99. """
  100. def __init__(self, name: str) -> None:
  101. self._name = name
  102. # A dictionary of functions, keyed by the opset version.
  103. self._functions: OverrideDict[OpsetVersion, Callable] = OverrideDict()
  104. def __repr__(self) -> str:
  105. return f"_SymbolicFunctionGroup({self._name}, registered={self._functions})"
  106. def __getitem__(self, key: OpsetVersion) -> Callable:
  107. result = self.get(key)
  108. if result is None:
  109. raise KeyError(key)
  110. return result
  111. # TODO(justinchuby): Add @functools.lru_cache(maxsize=None) if lookup time becomes
  112. # a problem.
  113. def get(self, opset: OpsetVersion) -> Optional[Callable]:
  114. """Find the most recent version of the function."""
  115. version = _dispatch_opset_version(opset, self._functions)
  116. if version is None:
  117. return None
  118. return self._functions[version]
  119. def add(self, func: Callable, opset: OpsetVersion) -> None:
  120. """Adds a symbolic function.
  121. Args:
  122. func: The function to add.
  123. opset: The opset version of the function to add.
  124. """
  125. if self._functions.in_base(opset):
  126. warnings.warn(
  127. f"Symbolic function '{self._name}' already registered for opset {opset}. "
  128. f"Replacing the existing function with new function. This is unexpected. "
  129. f"Please report it on {_constants.PYTORCH_GITHUB_ISSUES_URL}.",
  130. errors.OnnxExporterWarning,
  131. )
  132. self._functions.set_base(opset, func)
  133. def add_custom(self, func: Callable, opset: OpsetVersion) -> None:
  134. """Adds a custom symbolic function.
  135. Args:
  136. func: The symbolic function to register.
  137. opset: The corresponding opset version.
  138. """
  139. self._functions.override(opset, func)
  140. def remove_custom(self, opset: OpsetVersion) -> None:
  141. """Removes a custom symbolic function.
  142. Args:
  143. opset: The opset version of the custom function to remove.
  144. """
  145. if not self._functions.overridden(opset):
  146. warnings.warn(
  147. f"No custom function registered for '{self._name}' opset {opset}"
  148. )
  149. return
  150. self._functions.remove_override(opset)
  151. def get_min_supported(self) -> OpsetVersion:
  152. """Returns the lowest built-in opset version supported by the function."""
  153. return min(self._functions)
  154. class SymbolicRegistry:
  155. """Registry for symbolic functions.
  156. The registry maintains a mapping from qualified names to symbolic functions.
  157. It is used to register new symbolic functions and to dispatch calls to
  158. the appropriate function.
  159. """
  160. def __init__(self) -> None:
  161. self._registry: Dict[str, _SymbolicFunctionGroup] = {}
  162. def register(
  163. self, name: str, opset: OpsetVersion, func: Callable, custom: bool = False
  164. ) -> None:
  165. """Registers a symbolic function.
  166. Args:
  167. name: The qualified name of the function to register. In the form of 'domain::op'.
  168. E.g. 'aten::add'.
  169. opset: The opset version of the function to register.
  170. func: The symbolic function to register.
  171. custom: Whether the function is a custom function that overrides existing ones.
  172. Raises:
  173. ValueError: If the separator '::' is not in the name.
  174. """
  175. if "::" not in name:
  176. raise ValueError(
  177. f"The name must be in the form of 'domain::op', not '{name}'"
  178. )
  179. symbolic_functions = self._registry.setdefault(
  180. name, _SymbolicFunctionGroup(name)
  181. )
  182. if custom:
  183. symbolic_functions.add_custom(func, opset)
  184. else:
  185. symbolic_functions.add(func, opset)
  186. def unregister(self, name: str, opset: OpsetVersion) -> None:
  187. """Unregisters a symbolic function.
  188. Args:
  189. name: The qualified name of the function to unregister.
  190. opset: The opset version of the function to unregister.
  191. """
  192. if name not in self._registry:
  193. return
  194. self._registry[name].remove_custom(opset)
  195. def get_function_group(self, name: str) -> Optional[_SymbolicFunctionGroup]:
  196. """Returns the function group for the given name."""
  197. return self._registry.get(name)
  198. def is_registered_op(self, name: str, version: int) -> bool:
  199. """Returns whether the given op is registered for the given opset version."""
  200. functions = self.get_function_group(name)
  201. if functions is None:
  202. return False
  203. return functions.get(version) is not None
  204. def all_functions(self) -> Set[str]:
  205. """Returns the set of all registered function names."""
  206. return set(self._registry)
  207. @_beartype.beartype
  208. def onnx_symbolic(
  209. name: str,
  210. opset: Union[OpsetVersion, Sequence[OpsetVersion]],
  211. decorate: Optional[Sequence[Callable]] = None,
  212. custom: bool = False,
  213. ) -> Callable:
  214. """Registers a symbolic function.
  215. Usage::
  216. ```
  217. @onnx_symbolic("aten::symbolic_b", opset=10, decorate=[quantized_aten_handler(scale=1/128, zero_point=0)])
  218. @symbolic_helper.parse_args("v", "v", "b")
  219. def symbolic_b(g: _C.Graph, x: _C.Value, y: _C.Value, arg1: bool) -> _C.Value:
  220. ...
  221. ```
  222. Args:
  223. name: The qualified name of the function in the form of 'domain::op'.
  224. E.g. 'aten::add'.
  225. opset: The opset versions of the function to register at.
  226. decorate: A sequence of decorators to apply to the function.
  227. custom: Whether the function is a custom symbolic function.
  228. Raises:
  229. ValueError: If the separator '::' is not in the name.
  230. """
  231. def wrapper(func: Callable) -> Callable:
  232. decorated = func
  233. if decorate is not None:
  234. for decorate_func in decorate:
  235. decorated = decorate_func(decorated)
  236. global registry
  237. nonlocal opset
  238. if isinstance(opset, OpsetVersion):
  239. opset = (opset,)
  240. for opset_version in opset:
  241. registry.register(name, opset_version, decorated, custom=custom)
  242. # Return the original function because the decorators in "decorate" are only
  243. # specific to the instance being registered.
  244. return func
  245. return wrapper
  246. @_beartype.beartype
  247. def custom_onnx_symbolic(
  248. name: str,
  249. opset: Union[OpsetVersion, Sequence[OpsetVersion]],
  250. decorate: Optional[Sequence[Callable]] = None,
  251. ) -> Callable:
  252. """Registers a custom symbolic function.
  253. Args:
  254. name: the qualified name of the function.
  255. opset: the opset version of the function.
  256. decorate: a sequence of decorators to apply to the function.
  257. Returns:
  258. The decorator.
  259. Raises:
  260. ValueError: If the separator '::' is not in the name.
  261. """
  262. return onnx_symbolic(name, opset, decorate, custom=True)
# The module-level registry for all symbolic functions; the registration
# decorators in this module (onnx_symbolic / custom_onnx_symbolic) write to it.
registry = SymbolicRegistry()