| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490 | """Tests that apply specifically to the Python parser. Unless specificallystated as a Python-specific issue, the goal is to eventually move as many ofthese tests out of this module as soon as the C parser can accept furtherarguments when parsing."""from __future__ import annotationsimport csvfrom io import (    BytesIO,    StringIO,    TextIOWrapper,)from typing import Iteratorimport pytestfrom pandas.errors import (    ParserError,    ParserWarning,)from pandas import (    DataFrame,    Index,    MultiIndex,)import pandas._testing as tmdef test_default_separator(python_parser_only):    # see gh-17333    #    # csv.Sniffer in Python treats "o" as separator.    data = "aob\n1o2\n3o4"    parser = python_parser_only    expected = DataFrame({"a": [1, 3], "b": [2, 4]})    result = parser.read_csv(StringIO(data), sep=None)    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize("skipfooter", ["foo", 1.5, True])def test_invalid_skipfooter_non_int(python_parser_only, skipfooter):    # see gh-15925 (comment)    data = "a\n1\n2"    parser = python_parser_only    msg = "skipfooter must be an integer"    with pytest.raises(ValueError, match=msg):        parser.read_csv(StringIO(data), skipfooter=skipfooter)def test_invalid_skipfooter_negative(python_parser_only):    # see gh-15925 (comment)    data = "a\n1\n2"    parser = python_parser_only    msg = "skipfooter cannot be negative"    with pytest.raises(ValueError, match=msg):        parser.read_csv(StringIO(data), skipfooter=-1)@pytest.mark.parametrize("kwargs", [{"sep": None}, {"delimiter": "|"}])def test_sniff_delimiter(python_parser_only, kwargs):    data = """index|A|B|Cfoo|1|2|3bar|4|5|6baz|7|8|9"""    parser = python_parser_only    result = parser.read_csv(StringIO(data), index_col=0, **kwargs)    expected = DataFrame(        [[1, 2, 3], [4, 5, 6], [7, 8, 9]],        columns=["A", "B", "C"],        index=Index(["foo", "bar", "baz"], name="index"),    )    tm.assert_frame_equal(result, expected)def test_sniff_delimiter_comment(python_parser_only):    data = """# comment lineindex|A|B|C# comment linefoo|1|2|3 # ignore | thisbar|4|5|6baz|7|8|9"""    parser = python_parser_only    result = parser.read_csv(StringIO(data), index_col=0, sep=None, comment="#")    expected = DataFrame(        [[1, 2, 3], [4, 5, 6], [7, 8, 9]],        columns=["A", "B", "C"],        index=Index(["foo", "bar", "baz"], name="index"),    )    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize("encoding", [None, "utf-8"])def test_sniff_delimiter_encoding(python_parser_only, encoding):    parser = python_parser_only    data = """ignore thisignore this tooindex|A|B|Cfoo|1|2|3bar|4|5|6baz|7|8|9"""    if encoding is not None:        data = data.encode(encoding)        data = BytesIO(data)        data = TextIOWrapper(data, encoding=encoding)    else:        data = StringIO(data)    result = parser.read_csv(data, index_col=0, sep=None, skiprows=2, encoding=encoding)    expected = DataFrame(        [[1, 2, 3], [4, 5, 6], [7, 8, 9]],        columns=["A", "B", "C"],        index=Index(["foo", "bar", "baz"], name="index"),    )    tm.assert_frame_equal(result, expected)def test_single_line(python_parser_only):    # see gh-6607: sniff separator    parser = python_parser_only    result = parser.read_csv(StringIO("1,2"), names=["a", "b"], header=None, sep=None)    expected = DataFrame({"a": [1], "b": [2]})    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize("kwargs", [{"skipfooter": 2}, {"nrows": 3}])def test_skipfooter(python_parser_only, kwargs):    # see gh-6607    data = """A,B,C1,2,34,5,67,8,9want to skip thisalso also skip this"""    parser = python_parser_only    result = parser.read_csv(StringIO(data), **kwargs)    expected = DataFrame([[1, 2, 3], [4, 5, 6], [7, 8, 9]], columns=["A", "B", "C"])    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize(    "compression,klass", [("gzip", "GzipFile"), ("bz2", "BZ2File")])def test_decompression_regex_sep(python_parser_only, csv1, compression, klass):    # see gh-6607    parser = python_parser_only    with open(csv1, "rb") as f:        data = f.read()    data = data.replace(b",", b"::")    expected = parser.read_csv(csv1)    module = pytest.importorskip(compression)    klass = getattr(module, klass)    with tm.ensure_clean() as path:        with klass(path, mode="wb") as tmp:            tmp.write(data)        result = parser.read_csv(path, sep="::", compression=compression)        tm.assert_frame_equal(result, expected)def test_read_csv_buglet_4x_multi_index(python_parser_only):    # see gh-6607    data = """                      A       B       C       D        Eone two three   foura   b   10.0032 5    -0.5109 -2.3358 -0.4645  0.05076  0.3640a   q   20      4     0.4473  1.4152  0.2834  1.00661  0.1744x   q   30      3    -0.6662 -0.5243 -0.3580  0.89145  2.5838"""    parser = python_parser_only    expected = DataFrame(        [            [-0.5109, -2.3358, -0.4645, 0.05076, 0.3640],            [0.4473, 1.4152, 0.2834, 1.00661, 0.1744],            [-0.6662, -0.5243, -0.3580, 0.89145, 2.5838],        ],        columns=["A", "B", "C", "D", "E"],        index=MultiIndex.from_tuples(            [("a", "b", 10.0032, 5), ("a", "q", 20, 4), ("x", "q", 30, 3)],            names=["one", "two", "three", "four"],        ),    )    result = parser.read_csv(StringIO(data), sep=r"\s+")    tm.assert_frame_equal(result, expected)def test_read_csv_buglet_4x_multi_index2(python_parser_only):    # see gh-6893    data = "      A B C\na b c\n1 3 7 0 3 6\n3 1 4 1 5 9"    parser = python_parser_only    expected = DataFrame.from_records(        [(1, 3, 7, 0, 3, 6), (3, 1, 4, 1, 5, 9)],        columns=list("abcABC"),        index=list("abc"),    )    result = parser.read_csv(StringIO(data), sep=r"\s+")    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize("add_footer", [True, False])def test_skipfooter_with_decimal(python_parser_only, add_footer):    # see gh-6971    data = "1#2\n3#4"    parser = python_parser_only    expected = DataFrame({"a": [1.2, 3.4]})    if add_footer:        # The stray footer line should not mess with the        # casting of the first two lines if we skip it.        kwargs = {"skipfooter": 1}        data += "\nFooter"    else:        kwargs = {}    result = parser.read_csv(StringIO(data), names=["a"], decimal="#", **kwargs)    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize(    "sep", ["::", "#####", "!!!", "123", "#1!c5", "%!c!d", "@@#4:2", "_!pd#_"])@pytest.mark.parametrize(    "encoding", ["utf-16", "utf-16-be", "utf-16-le", "utf-32", "cp037"])def test_encoding_non_utf8_multichar_sep(python_parser_only, sep, encoding):    # see gh-3404    expected = DataFrame({"a": [1], "b": [2]})    parser = python_parser_only    data = "1" + sep + "2"    encoded_data = data.encode(encoding)    result = parser.read_csv(        BytesIO(encoded_data), sep=sep, names=["a", "b"], encoding=encoding    )    tm.assert_frame_equal(result, expected)@pytest.mark.parametrize("quoting", [csv.QUOTE_MINIMAL, csv.QUOTE_NONE])def test_multi_char_sep_quotes(python_parser_only, quoting):    # see gh-13374    kwargs = {"sep": ",,"}    parser = python_parser_only    data = 'a,,b\n1,,a\n2,,"2,,b"'    if quoting == csv.QUOTE_NONE:        msg = "Expected 2 fields in line 3, saw 3"        with pytest.raises(ParserError, match=msg):            parser.read_csv(StringIO(data), quoting=quoting, **kwargs)    else:        msg = "ignored when a multi-char delimiter is used"        with pytest.raises(ParserError, match=msg):            parser.read_csv(StringIO(data), quoting=quoting, **kwargs)def test_none_delimiter(python_parser_only, capsys):    # see gh-13374 and gh-17465    parser = python_parser_only    data = "a,b,c\n0,1,2\n3,4,5,6\n7,8,9"    expected = DataFrame({"a": [0, 7], "b": [1, 8], "c": [2, 9]})    # We expect the third line in the data to be    # skipped because it is malformed, but we do    # not expect any errors to occur.    result = parser.read_csv(StringIO(data), header=0, sep=None, on_bad_lines="warn")    tm.assert_frame_equal(result, expected)    captured = capsys.readouterr()    assert "Skipping line 3" in captured.err@pytest.mark.parametrize("data", ['a\n1\n"b"a', 'a,b,c\ncat,foo,bar\ndog,foo,"baz'])@pytest.mark.parametrize("skipfooter", [0, 1])def test_skipfooter_bad_row(python_parser_only, data, skipfooter):    # see gh-13879 and gh-15910    parser = python_parser_only    if skipfooter:        msg = "parsing errors in the skipped footer rows"        with pytest.raises(ParserError, match=msg):            parser.read_csv(StringIO(data), skipfooter=skipfooter)    else:        msg = "unexpected end of data|expected after"        with pytest.raises(ParserError, match=msg):            parser.read_csv(StringIO(data), skipfooter=skipfooter)def test_malformed_skipfooter(python_parser_only):    parser = python_parser_only    data = """ignoreA,B,C1,2,3 # comment1,2,3,4,52,3,4footer"""    msg = "Expected 3 fields in line 4, saw 5"    with pytest.raises(ParserError, match=msg):        parser.read_csv(StringIO(data), header=1, comment="#", skipfooter=1)def test_python_engine_file_no_next(python_parser_only):    parser = python_parser_only    class NoNextBuffer:        def __init__(self, csv_data) -> None:            self.data = csv_data        def __iter__(self) -> Iterator:            return self.data.__iter__()        def read(self):            return self.data        def readline(self):            return self.data    parser.read_csv(NoNextBuffer("a\n1"))@pytest.mark.parametrize("bad_line_func", [lambda x: ["2", "3"], lambda x: x[:2]])def test_on_bad_lines_callable(python_parser_only, bad_line_func):    # GH 5686    parser = python_parser_only    data = """a,b1,22,3,4,5,63,4"""    bad_sio = StringIO(data)    result = parser.read_csv(bad_sio, on_bad_lines=bad_line_func)    expected = DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]})    tm.assert_frame_equal(result, expected)def test_on_bad_lines_callable_write_to_external_list(python_parser_only):    # GH 5686    parser = python_parser_only    data = """a,b1,22,3,4,5,63,4"""    bad_sio = StringIO(data)    lst = []    def bad_line_func(bad_line: list[str]) -> list[str]:        lst.append(bad_line)        return ["2", "3"]    result = parser.read_csv(bad_sio, on_bad_lines=bad_line_func)    expected = DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]})    tm.assert_frame_equal(result, expected)    assert lst == [["2", "3", "4", "5", "6"]]@pytest.mark.parametrize("bad_line_func", [lambda x: ["foo", "bar"], lambda x: x[:2]])@pytest.mark.parametrize("sep", [",", "111"])def test_on_bad_lines_callable_iterator_true(python_parser_only, bad_line_func, sep):    # GH 5686    # iterator=True has a separate code path than iterator=False    parser = python_parser_only    data = f"""0{sep}1hi{sep}therefoo{sep}bar{sep}bazgood{sep}bye"""    bad_sio = StringIO(data)    result_iter = parser.read_csv(        bad_sio, on_bad_lines=bad_line_func, chunksize=1, iterator=True, sep=sep    )    expecteds = [        {"0": "hi", "1": "there"},        {"0": "foo", "1": "bar"},        {"0": "good", "1": "bye"},    ]    for i, (result, expected) in enumerate(zip(result_iter, expecteds)):        expected = DataFrame(expected, index=range(i, i + 1))        tm.assert_frame_equal(result, expected)def test_on_bad_lines_callable_dont_swallow_errors(python_parser_only):    # GH 5686    parser = python_parser_only    data = """a,b1,22,3,4,5,63,4"""    bad_sio = StringIO(data)    msg = "This function is buggy."    def bad_line_func(bad_line):        raise ValueError(msg)    with pytest.raises(ValueError, match=msg):        parser.read_csv(bad_sio, on_bad_lines=bad_line_func)def test_on_bad_lines_callable_not_expected_length(python_parser_only):    # GH 5686    parser = python_parser_only    data = """a,b1,22,3,4,5,63,4"""    bad_sio = StringIO(data)    result = parser.read_csv_check_warnings(        ParserWarning, "Length of header or names", bad_sio, on_bad_lines=lambda x: x    )    expected = DataFrame({"a": [1, 2, 3], "b": [2, 3, 4]})    tm.assert_frame_equal(result, expected)def test_on_bad_lines_callable_returns_none(python_parser_only):    # GH 5686    parser = python_parser_only    data = """a,b1,22,3,4,5,63,4"""    bad_sio = StringIO(data)    result = parser.read_csv(bad_sio, on_bad_lines=lambda x: None)    expected = DataFrame({"a": [1, 3], "b": [2, 4]})    tm.assert_frame_equal(result, expected)def test_on_bad_lines_index_col_inferred(python_parser_only):    # GH 5686    parser = python_parser_only    data = """a,b1,2,34,5,6"""    bad_sio = StringIO(data)    result = parser.read_csv(bad_sio, on_bad_lines=lambda x: ["99", "99"])    expected = DataFrame({"a": [2, 5], "b": [3, 6]}, index=[1, 4])    tm.assert_frame_equal(result, expected)def test_index_col_false_and_header_none(python_parser_only):    # GH#46955    parser = python_parser_only    data = """0.5,0.030.1,0.2,0.3,2"""    result = parser.read_csv_check_warnings(        ParserWarning,        "Length of header",        StringIO(data),        sep=",",        header=None,        index_col=False,    )    expected = DataFrame({0: [0.5, 0.1], 1: [0.03, 0.2]})    tm.assert_frame_equal(result, expected)def test_header_int_do_not_infer_multiindex_names_on_different_line(python_parser_only):    # GH#46569    parser = python_parser_only    data = StringIO("a\na,b\nc,d,e\nf,g,h")    result = parser.read_csv_check_warnings(        ParserWarning, "Length of header", data, engine="python", index_col=False    )    expected = DataFrame({"a": ["a", "c", "f"]})    tm.assert_frame_equal(result, expected)
 |