_exceptions.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. from __future__ import annotations
  2. import contextlib
  3. import inspect
  4. import os
  5. import re
  6. from typing import Generator
  7. import warnings
  8. @contextlib.contextmanager
  9. def rewrite_exception(old_name: str, new_name: str) -> Generator[None, None, None]:
  10. """
  11. Rewrite the message of an exception.
  12. """
  13. try:
  14. yield
  15. except Exception as err:
  16. if not err.args:
  17. raise
  18. msg = str(err.args[0])
  19. msg = msg.replace(old_name, new_name)
  20. args: tuple[str, ...] = (msg,)
  21. if len(err.args) > 1:
  22. args = args + err.args[1:]
  23. err.args = args
  24. raise
  25. def find_stack_level() -> int:
  26. """
  27. Find the first place in the stack that is not inside pandas
  28. (tests notwithstanding).
  29. """
  30. import pandas as pd
  31. pkg_dir = os.path.dirname(pd.__file__)
  32. test_dir = os.path.join(pkg_dir, "tests")
  33. # https://stackoverflow.com/questions/17407119/python-inspect-stack-is-slow
  34. frame = inspect.currentframe()
  35. n = 0
  36. while frame:
  37. fname = inspect.getfile(frame)
  38. if fname.startswith(pkg_dir) and not fname.startswith(test_dir):
  39. frame = frame.f_back
  40. n += 1
  41. else:
  42. break
  43. return n
  44. @contextlib.contextmanager
  45. def rewrite_warning(
  46. target_message: str,
  47. target_category: type[Warning],
  48. new_message: str,
  49. new_category: type[Warning] | None = None,
  50. ) -> Generator[None, None, None]:
  51. """
  52. Rewrite the message of a warning.
  53. Parameters
  54. ----------
  55. target_message : str
  56. Warning message to match.
  57. target_category : Warning
  58. Warning type to match.
  59. new_message : str
  60. New warning message to emit.
  61. new_category : Warning or None, default None
  62. New warning type to emit. When None, will be the same as target_category.
  63. """
  64. if new_category is None:
  65. new_category = target_category
  66. with warnings.catch_warnings(record=True) as record:
  67. yield
  68. if len(record) > 0:
  69. match = re.compile(target_message)
  70. for warning in record:
  71. if warning.category is target_category and re.search(
  72. match, str(warning.message)
  73. ):
  74. category = new_category
  75. message: Warning | str = new_message
  76. else:
  77. category, message = warning.category, warning.message
  78. warnings.warn_explicit(
  79. message=message,
  80. category=category,
  81. filename=warning.filename,
  82. lineno=warning.lineno,
  83. )