123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448 |
- """Tests dealing with the NDFrame.flags.allows_duplicate_labels flag."""
- import operator
- import numpy as np
- import pytest
- import pandas as pd
- import pandas._testing as tm
# Shared marker for tests in this module that describe flag propagation
# which is not available yet: the decorated test is expected to fail.
not_implemented = pytest.mark.xfail(
    reason="Not implemented.",
)
- # ----------------------------------------------------------------------------
- # Preservation
class TestPreserves:
    """Operations whose results should keep ``allows_duplicate_labels=False``.

    Every test builds an object with ``set_flags(allows_duplicate_labels=False)``
    and asserts on ``.flags.allows_duplicate_labels`` of the derived result.
    Tests decorated with ``not_implemented`` are expected failures.
    """

    @pytest.mark.parametrize(
        "cls, data",
        [
            (pd.Series, np.array([])),
            (pd.Series, [1, 2]),
            (pd.DataFrame, {}),
            (pd.DataFrame, {"A": [1, 2]}),
        ],
    )
    def test_construction_ok(self, cls, data):
        """A new object reports the flag as True; ``set_flags`` turns it off."""
        result = cls(data)
        assert result.flags.allows_duplicate_labels is True

        result = cls(data).set_flags(allows_duplicate_labels=False)
        assert result.flags.allows_duplicate_labels is False

    @pytest.mark.parametrize(
        "func",
        [
            operator.itemgetter(["a"]),
            operator.methodcaller("add", 1),
            operator.methodcaller("rename", str.upper),
            operator.methodcaller("rename", "name"),
            operator.methodcaller("abs"),
            np.abs,
        ],
    )
    def test_preserved_series(self, func):
        """Applying ``func`` to a flagged Series yields a flagged result."""
        s = pd.Series([0, 1], index=["a", "b"]).set_flags(allows_duplicate_labels=False)
        assert func(s).flags.allows_duplicate_labels is False

    @pytest.mark.parametrize(
        "other", [pd.Series(0, index=["a", "b", "c"]), pd.Series(0, index=["a", "b"])]
    )
    # TODO: frame
    @not_implemented
    def test_align(self, other):
        """Both outputs of ``Series.align`` should carry the flag (xfail)."""
        s = pd.Series([0, 1], index=["a", "b"]).set_flags(allows_duplicate_labels=False)
        a, b = s.align(other)
        assert a.flags.allows_duplicate_labels is False
        assert b.flags.allows_duplicate_labels is False

    def test_preserved_frame(self):
        """Row and column selection through ``.loc`` keeps the flag."""
        df = pd.DataFrame({"A": [1, 2], "B": [3, 4]}, index=["a", "b"]).set_flags(
            allows_duplicate_labels=False
        )
        assert df.loc[["a"]].flags.allows_duplicate_labels is False
        assert df.loc[:, ["A", "B"]].flags.allows_duplicate_labels is False

    def test_to_frame(self):
        """``Series.to_frame`` keeps the flag."""
        ser = pd.Series(dtype=float).set_flags(allows_duplicate_labels=False)
        assert ser.to_frame().flags.allows_duplicate_labels is False

    @pytest.mark.parametrize("func", ["add", "sub"])
    @pytest.mark.parametrize("frame", [False, True])
    @pytest.mark.parametrize("other", [1, pd.Series([1, 2], name="A")])
    def test_binops(self, func, other, frame):
        """``add``/``sub`` with a scalar or a pandas operand keep the flag.

        When ``frame`` is true the flagged Series is converted to a DataFrame,
        and a Series ``other`` is converted along with it.
        """
        df = pd.Series([1, 2], name="A", index=["a", "b"]).set_flags(
            allows_duplicate_labels=False
        )
        if frame:
            df = df.to_frame()
        if isinstance(other, pd.Series) and frame:
            other = other.to_frame()
        func = operator.methodcaller(func, other)
        assert df.flags.allows_duplicate_labels is False
        assert func(df).flags.allows_duplicate_labels is False

    def test_preserve_getitem(self):
        """``[]`` and ``.loc`` selections from a flagged frame keep the flag."""
        df = pd.DataFrame({"A": [1, 2]}).set_flags(allows_duplicate_labels=False)
        assert df[["A"]].flags.allows_duplicate_labels is False
        assert df["A"].flags.allows_duplicate_labels is False
        assert df.loc[0].flags.allows_duplicate_labels is False
        assert df.loc[[0]].flags.allows_duplicate_labels is False
        assert df.loc[0, ["A"]].flags.allows_duplicate_labels is False

    def test_ndframe_getitem_caching_issue(self, request, using_copy_on_write):
        """A column fetched after the parent's flag changes sees the new value.

        The test is marked xfail when ``using_copy_on_write`` is false.
        """
        if not using_copy_on_write:
            request.node.add_marker(pytest.mark.xfail(reason="Unclear behavior."))
        # NDFrame.__getitem__ will cache the first df['A']. May need to
        # invalidate that cache? Update the cached entries?
        df = pd.DataFrame({"A": [0]}).set_flags(allows_duplicate_labels=False)
        assert df["A"].flags.allows_duplicate_labels is False
        df.flags.allows_duplicate_labels = True
        assert df["A"].flags.allows_duplicate_labels is True

    @pytest.mark.parametrize(
        "objs, kwargs",
        [
            # Series
            (
                [
                    pd.Series(1, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.Series(2, index=["c", "d"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {},
            ),
            (
                [
                    pd.Series(1, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.Series(2, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {"ignore_index": True},
            ),
            (
                [
                    pd.Series(1, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.Series(2, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {"axis": 1},
            ),
            # Frame
            (
                [
                    pd.DataFrame({"A": [1, 2]}, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.DataFrame({"A": [1, 2]}, index=["c", "d"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {},
            ),
            (
                [
                    pd.DataFrame({"A": [1, 2]}, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.DataFrame({"A": [1, 2]}, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {"ignore_index": True},
            ),
            (
                [
                    pd.DataFrame({"A": [1, 2]}, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.DataFrame({"B": [1, 2]}, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {"axis": 1},
            ),
            # Series / Frame
            (
                [
                    pd.DataFrame({"A": [1, 2]}, index=["a", "b"]).set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.Series(
                        [1, 2],
                        index=["a", "b"],
                        name="B",
                    ).set_flags(
                        allows_duplicate_labels=False,
                    ),
                ],
                {"axis": 1},
            ),
        ],
    )
    def test_concat(self, objs, kwargs):
        """``pd.concat`` of flagged inputs returns a flagged result."""
        result = pd.concat(objs, **kwargs)
        assert result.flags.allows_duplicate_labels is False

    @pytest.mark.parametrize(
        "left, right, kwargs, expected",
        [
            # false false false
            pytest.param(
                pd.DataFrame({"A": [0, 1]}, index=["a", "b"]).set_flags(
                    allows_duplicate_labels=False
                ),
                pd.DataFrame({"B": [0, 1]}, index=["a", "d"]).set_flags(
                    allows_duplicate_labels=False
                ),
                {"left_index": True, "right_index": True},
                False,
                marks=not_implemented,
            ),
            # false true false
            pytest.param(
                pd.DataFrame({"A": [0, 1]}, index=["a", "b"]).set_flags(
                    allows_duplicate_labels=False
                ),
                pd.DataFrame({"B": [0, 1]}, index=["a", "d"]),
                {"left_index": True, "right_index": True},
                False,
                marks=not_implemented,
            ),
            # true true true
            (
                pd.DataFrame({"A": [0, 1]}, index=["a", "b"]),
                pd.DataFrame({"B": [0, 1]}, index=["a", "d"]),
                {"left_index": True, "right_index": True},
                True,
            ),
        ],
    )
    def test_merge(self, left, right, kwargs, expected):
        """``pd.merge`` reports ``expected`` for the flag of its result.

        The case comments read "left flag, right flag, expected flag"; the
        cases with a flagged input are marked ``not_implemented``.
        """
        result = pd.merge(left, right, **kwargs)
        assert result.flags.allows_duplicate_labels is expected

    @not_implemented
    def test_groupby(self):
        """A groupby aggregation of a flagged frame should keep the flag (xfail)."""
        # XXX: This is under tested
        # TODO:
        #  - apply
        #  - transform
        #  - Should passing a grouper that disallows duplicates propagate?
        df = pd.DataFrame({"A": [1, 2, 3]}).set_flags(allows_duplicate_labels=False)
        result = df.groupby([0, 0, 1]).agg("count")
        assert result.flags.allows_duplicate_labels is False

    @pytest.mark.parametrize("frame", [True, False])
    @not_implemented
    def test_window(self, frame):
        """Rolling, ewm and expanding means should keep the flag (xfail)."""
        # The flag is set with ``set_flags`` like everywhere else in this
        # module. Passing ``allows_duplicate_labels`` to the Series constructor
        # is not a supported keyword, so the expected failure would come from
        # the constructor call instead of from the window operations below.
        df = pd.Series(
            1,
            index=pd.date_range("2000", periods=12),
            name="A",
        ).set_flags(allows_duplicate_labels=False)
        if frame:
            df = df.to_frame()
        assert df.rolling(3).mean().flags.allows_duplicate_labels is False
        assert df.ewm(3).mean().flags.allows_duplicate_labels is False
        assert df.expanding(3).mean().flags.allows_duplicate_labels is False
- # ----------------------------------------------------------------------------
- # Raises
class TestRaises:
    """Operations that raise when duplicate labels meet a disallowing flag."""

    @pytest.mark.parametrize(
        "cls, axes",
        [
            (pd.Series, {"index": ["a", "a"], "dtype": float}),
            (pd.DataFrame, {"index": ["a", "a"]}),
            (pd.DataFrame, {"index": ["a", "a"], "columns": ["b", "b"]}),
            (pd.DataFrame, {"columns": ["b", "b"]}),
        ],
    )
    def test_set_flags_with_duplicates(self, cls, axes):
        """``set_flags`` raises on an object whose axes already have duplicates."""
        permissive = cls(**axes)
        assert permissive.flags.allows_duplicate_labels is True

        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            cls(**axes).set_flags(allows_duplicate_labels=False)

    @pytest.mark.parametrize(
        "data",
        [
            pd.Series(index=[0, 0], dtype=float),
            pd.DataFrame(index=[0, 0]),
            pd.DataFrame(columns=[0, 0]),
        ],
    )
    def test_setting_allows_duplicate_labels_raises(self, data):
        """Assigning False to the flag raises and leaves the flag True."""
        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            data.flags.allows_duplicate_labels = False

        assert data.flags.allows_duplicate_labels is True

    def test_series_raises(self):
        """Concatenating two Series with the same labels raises if one is flagged."""
        flagged = pd.Series([0, 1], index=["a", "b"]).set_flags(
            allows_duplicate_labels=False
        )
        plain = pd.Series(0, index=["a", "b"])
        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            pd.concat([plain, flagged])

    @pytest.mark.parametrize(
        "getter, target",
        [
            (operator.itemgetter(["A", "A"]), None),
            # loc
            (operator.itemgetter(["a", "a"]), "loc"),
            (operator.itemgetter(("a", ["A", "A"])), "loc"),
            (operator.itemgetter((["a", "a"], "A")), "loc"),
            # iloc
            (operator.itemgetter([0, 0]), "iloc"),
            (operator.itemgetter((0, [0, 0])), "iloc"),
            (operator.itemgetter(([0, 0], 0)), "iloc"),
        ],
    )
    def test_getitem_raises(self, getter, target):
        """Selecting a label or position twice from a flagged frame raises.

        ``target`` names the indexer attribute to go through (``"loc"`` or
        ``"iloc"``); a falsy value means the frame itself is indexed.
        """
        frame = pd.DataFrame({"A": [1, 2], "B": [3, 4]}, index=["a", "b"])
        frame = frame.set_flags(allows_duplicate_labels=False)

        # df, df.loc, or df.iloc
        indexable = getattr(frame, target) if target else frame

        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            getter(indexable)

    @pytest.mark.parametrize(
        "objs, kwargs",
        [
            (
                [
                    pd.Series(1, index=[0, 1], name="a").set_flags(
                        allows_duplicate_labels=False
                    ),
                    pd.Series(2, index=[0, 1], name="a").set_flags(
                        allows_duplicate_labels=False
                    ),
                ],
                {"axis": 1},
            )
        ],
    )
    def test_concat_raises(self, objs, kwargs):
        """``pd.concat`` raises for flagged inputs that share the name ``"a"``."""
        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            pd.concat(objs, **kwargs)

    @not_implemented
    def test_merge_raises(self):
        """An index merge against a frame with repeated labels should raise (xfail)."""
        flagged_left = pd.DataFrame({"A": [0, 1, 2]}, index=["a", "b", "c"])
        flagged_left = flagged_left.set_flags(allows_duplicate_labels=False)
        dup_right = pd.DataFrame({"B": [0, 1, 2]}, index=["a", "b", "b"])

        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            pd.merge(flagged_left, dup_right, left_index=True, right_index=True)
@pytest.mark.parametrize(
    "idx",
    [
        pd.Index([1, 1]),
        pd.Index(["a", "a"]),
        pd.Index([1.1, 1.1]),
        pd.PeriodIndex([pd.Period("2000", "D")] * 2),
        pd.DatetimeIndex([pd.Timestamp("2000")] * 2),
        pd.TimedeltaIndex([pd.Timedelta("1D")] * 2),
        pd.CategoricalIndex(["a", "a"]),
        pd.IntervalIndex([pd.Interval(0, 1)] * 2),
        pd.MultiIndex.from_tuples([("a", 1), ("a", 1)]),
    ],
    ids=lambda x: type(x).__name__,
)
def test_raises_basic(idx):
    """``set_flags`` raises for a duplicated ``idx`` used as index or columns.

    Covers a Series indexed by ``idx``, a DataFrame indexed by ``idx`` and a
    DataFrame whose columns are ``idx``.
    """
    with_duplicates = (
        pd.Series(1, index=idx),
        pd.DataFrame({"A": [1, 1]}, index=idx),
        pd.DataFrame([[1, 2]], columns=idx),
    )
    for obj in with_duplicates:
        with pytest.raises(
            pd.errors.DuplicateLabelError, match="Index has duplicates."
        ):
            obj.set_flags(allows_duplicate_labels=False)
def test_format_duplicate_labels_message():
    """``Index._format_duplicate_message`` lists positions per repeated label."""
    labels = pd.Index(["a", "b", "a", "b", "c"])

    # "a" sits at positions 0 and 2, "b" at 1 and 3; "c" is unique.
    wanted_index = pd.Index(["a", "b"], name="label")
    wanted = pd.DataFrame({"positions": [[0, 2], [1, 3]]}, index=wanted_index)

    tm.assert_frame_equal(labels._format_duplicate_message(), wanted)
def test_format_duplicate_labels_message_multi():
    """Same check as the flat-index variant, for a two-level MultiIndex."""
    labels = pd.MultiIndex.from_product([["A"], ["a", "b", "a", "b", "c"]])

    wanted_index = pd.MultiIndex.from_product([["A"], ["a", "b"]])
    wanted = pd.DataFrame({"positions": [[0, 2], [1, 3]]}, index=wanted_index)

    tm.assert_frame_equal(labels._format_duplicate_message(), wanted)
def test_dataframe_insert_raises():
    """``insert(..., allow_duplicates=True)`` on a flagged frame is a ValueError."""
    flagged = pd.DataFrame({"A": [1, 2]})
    flagged = flagged.set_flags(allows_duplicate_labels=False)

    with pytest.raises(ValueError, match="Cannot specify"):
        flagged.insert(0, "A", [3, 4], allow_duplicates=True)
@pytest.mark.parametrize(
    "method, frame_only",
    [
        (operator.methodcaller("set_index", "A", inplace=True), True),
        (operator.methodcaller("reset_index", inplace=True), True),
        (operator.methodcaller("rename", lambda x: x, inplace=True), False),
    ],
)
def test_inplace_raises(method, frame_only):
    """In-place ``method`` calls on flagged objects raise ValueError.

    The DataFrame is always checked; the Series taken from it is checked only
    when ``frame_only`` is false.
    """
    frame = pd.DataFrame({"A": [0, 0], "B": [1, 2]})
    frame = frame.set_flags(allows_duplicate_labels=False)
    column = frame["A"]
    column.flags.allows_duplicate_labels = False

    with pytest.raises(ValueError, match="Cannot specify"):
        method(frame)

    if frame_only:
        return

    with pytest.raises(ValueError, match="Cannot specify"):
        method(column)
def test_pickle():
    """Flagged Series and DataFrame compare equal to their pickle round-trip."""
    flagged_series = pd.Series([1, 2]).set_flags(allows_duplicate_labels=False)
    tm.assert_series_equal(flagged_series, tm.round_trip_pickle(flagged_series))

    flagged_frame = pd.DataFrame({"A": []}).set_flags(allows_duplicate_labels=False)
    tm.assert_frame_equal(flagged_frame, tm.round_trip_pickle(flagged_frame))
|