selecting.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. from typing import Callable, Iterator, Tuple, TypeVar
  2. from torch.utils.data.datapipes._decorator import functional_datapipe
  3. from torch.utils.data.datapipes.datapipe import IterDataPipe
  4. from torch.utils.data.datapipes.dataframe import dataframe_wrapper as df_wrapper
  5. from torch.utils.data.datapipes.utils.common import (
  6. _check_unpickable_fn,
  7. StreamWrapper,
  8. validate_input_col
  9. )
  10. __all__ = ["FilterIterDataPipe", ]
  11. T = TypeVar('T')
  12. T_co = TypeVar('T_co', covariant=True)
  13. @functional_datapipe('filter')
  14. class FilterIterDataPipe(IterDataPipe[T_co]):
  15. r"""
  16. Filters out elements from the source datapipe according to input ``filter_fn`` (functional name: ``filter``).
  17. Args:
  18. datapipe: Iterable DataPipe being filtered
  19. filter_fn: Customized function mapping an element to a boolean.
  20. input_col: Index or indices of data which ``filter_fn`` is applied, such as:
  21. - ``None`` as default to apply ``filter_fn`` to the data directly.
  22. - Integer(s) is used for list/tuple.
  23. - Key(s) is used for dict.
  24. Example:
  25. >>> # xdoctest: +SKIP
  26. >>> from torchdata.datapipes.iter import IterableWrapper
  27. >>> def is_even(n):
  28. ... return n % 2 == 0
  29. >>> dp = IterableWrapper(range(5))
  30. >>> filter_dp = dp.filter(filter_fn=is_even)
  31. >>> list(filter_dp)
  32. [0, 2, 4]
  33. """
  34. datapipe: IterDataPipe[T_co]
  35. filter_fn: Callable
  36. def __init__(
  37. self,
  38. datapipe: IterDataPipe[T_co],
  39. filter_fn: Callable,
  40. input_col=None,
  41. ) -> None:
  42. super().__init__()
  43. self.datapipe = datapipe
  44. _check_unpickable_fn(filter_fn)
  45. self.filter_fn = filter_fn # type: ignore[assignment]
  46. self.input_col = input_col
  47. validate_input_col(filter_fn, input_col)
  48. def _apply_filter_fn(self, data) -> bool:
  49. if self.input_col is None:
  50. return self.filter_fn(data)
  51. elif isinstance(self.input_col, (list, tuple)):
  52. args = tuple(data[col] for col in self.input_col)
  53. return self.filter_fn(*args)
  54. else:
  55. return self.filter_fn(data[self.input_col])
  56. def __iter__(self) -> Iterator[T_co]:
  57. for data in self.datapipe:
  58. condition, filtered = self._returnIfTrue(data)
  59. if condition:
  60. yield filtered
  61. else:
  62. StreamWrapper.close_streams(data)
  63. def _returnIfTrue(self, data: T) -> Tuple[bool, T]:
  64. condition = self._apply_filter_fn(data)
  65. if df_wrapper.is_column(condition):
  66. # We are operating on DataFrames filter here
  67. result = []
  68. for idx, mask in enumerate(df_wrapper.iterate(condition)):
  69. if mask:
  70. result.append(df_wrapper.get_item(data, idx))
  71. if len(result):
  72. return True, df_wrapper.concat(result)
  73. else:
  74. return False, None # type: ignore[return-value]
  75. if not isinstance(condition, bool):
  76. raise ValueError("Boolean output is required for `filter_fn` of FilterIterDataPipe, got", type(condition))
  77. return condition, data