123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306 |
- import io
- import numpy as np
- import pytest
- from pandas import (
- DataFrame,
- date_range,
- read_csv,
- read_excel,
- read_feather,
- read_json,
- read_parquet,
- read_pickle,
- read_stata,
- read_table,
- )
- import pandas._testing as tm
- from pandas.util import _test_decorators as td
@pytest.fixture
def df1():
    """Provide a two-row frame with int, float, str and datetime columns."""
    columns = {
        "int": [1, 3],
        "float": [2.0, np.nan],
        "str": ["t", "s"],
        "dt": date_range("2018-06-18", periods=2),
    }
    return DataFrame(columns)
@pytest.fixture
def cleared_fs():
    """Yield fsspec's "memory" filesystem and empty its store afterwards.

    The requesting test is skipped when fsspec cannot be imported.
    """
    memory_fs = pytest.importorskip("fsspec").filesystem("memory")
    yield memory_fs
    # Teardown: drop whatever the test left in the filesystem's store.
    memory_fs.store.clear()
def test_read_csv(cleared_fs, df1):
    """Read back a CSV that was written directly through the filesystem."""
    payload = str(df1.to_csv(index=False)).encode()
    with cleared_fs.open("test/test.csv", "wb") as handle:
        handle.write(payload)

    result = read_csv("memory://test/test.csv", parse_dates=["dt"])

    tm.assert_frame_equal(df1, result)
def test_reasonable_error(monkeypatch, cleared_fs):
    """Check the errors raised for unknown and unimportable protocols."""
    from fsspec.registry import known_implementations

    # An unregistered protocol is expected to be named in a ValueError.
    with pytest.raises(ValueError, match="nosuchprotocol"):
        read_csv("nosuchprotocol://test/test.csv")

    # A registered protocol whose class cannot be imported is expected to
    # raise ImportError carrying the registry entry's "err" text.
    err_msg = "test error message"
    fake_entry = {"class": "unimportable.CouldExist", "err": err_msg}
    monkeypatch.setitem(known_implementations, "couldexist", fake_entry)
    with pytest.raises(ImportError, match=err_msg):
        read_csv("couldexist://test/test.csv")
def test_to_csv(cleared_fs, df1):
    """Round-trip a frame through ``to_csv``/``read_csv`` on a memory URL."""
    url = "memory://test/test.csv"
    df1.to_csv(url, index=True)

    roundtripped = read_csv(url, parse_dates=["dt"], index_col=0)

    tm.assert_frame_equal(df1, roundtripped)
def test_to_excel(cleared_fs, df1):
    """Round-trip a frame through Excel on a memory URL (needs openpyxl)."""
    pytest.importorskip("openpyxl")
    suffix = "xlsx"
    url = f"memory://test/test.{suffix}"

    df1.to_excel(url, index=True)
    roundtripped = read_excel(url, parse_dates=["dt"], index_col=0)

    tm.assert_frame_equal(df1, roundtripped)
@pytest.mark.parametrize("binary_mode", [False, True])
def test_to_csv_fsspec_object(cleared_fs, binary_mode, df1):
    """Write and read CSV through already-open fsspec file objects.

    In both text and binary mode the handle passed to pandas must still be
    open after the call, and the data must round-trip unchanged.
    """
    fsspec = pytest.importorskip("fsspec")
    url = "memory://test/test.csv"

    if binary_mode:
        write_mode = "wb"
    else:
        write_mode = "w"

    with fsspec.open(url, mode=write_mode).open() as out_file:
        df1.to_csv(out_file, index=True)
        assert not out_file.closed

    # Same text/binary flavour, but for reading.
    read_mode = write_mode.replace("w", "r")
    with fsspec.open(url, mode=read_mode) as in_file:
        result = read_csv(in_file, parse_dates=["dt"], index_col=0)
        assert not in_file.closed

    tm.assert_frame_equal(df1, result)
def test_csv_options(fsspectest):
    """Check ``fsspectest.test[0]`` tracks the storage_options given for CSV."""
    url = "testmem://test/test.csv"
    frame = DataFrame({"a": [0]})

    frame.to_csv(url, storage_options={"test": "csv_write"}, index=False)
    assert fsspectest.test[0] == "csv_write"

    read_csv(url, storage_options={"test": "csv_read"})
    assert fsspectest.test[0] == "csv_read"
def test_read_table_options(fsspectest):
    """Check ``read_table`` passes storage_options on, as ``read_csv`` does."""
    # GH #39167
    url = "testmem://test/test.csv"
    frame = DataFrame({"a": [0]})

    frame.to_csv(url, storage_options={"test": "csv_write"}, index=False)
    assert fsspectest.test[0] == "csv_write"

    read_table(url, storage_options={"test": "csv_read"})
    assert fsspectest.test[0] == "csv_read"
def test_excel_options(fsspectest):
    """Check ``fsspectest.test[0]`` tracks storage_options for Excel I/O."""
    pytest.importorskip("openpyxl")
    suffix = "xlsx"
    url = f"testmem://test/test.{suffix}"
    frame = DataFrame({"a": [0]})

    frame.to_excel(url, storage_options={"test": "write"}, index=False)
    assert fsspectest.test[0] == "write"

    read_excel(url, storage_options={"test": "read"})
    assert fsspectest.test[0] == "read"
@td.skip_if_no("fastparquet")
def test_to_parquet_new_file(cleared_fs, df1):
    """Regression test: write Parquet to a file that does not exist yet.

    The original report concerned a not-yet-existent GCS Parquet file; here
    the write targets the memory filesystem and only has to succeed.
    """
    target = "memory://test/test.csv"
    df1.to_parquet(target, index=True, engine="fastparquet", compression=None)
@td.skip_if_no("pyarrow")
def test_arrowparquet_options(fsspectest):
    """Check storage_options are passed on for pyarrow Parquet write/read."""
    url = "testmem://test/test.csv"
    frame = DataFrame({"a": [0]})

    frame.to_parquet(
        url,
        engine="pyarrow",
        compression=None,
        storage_options={"test": "parquet_write"},
    )
    assert fsspectest.test[0] == "parquet_write"

    read_parquet(url, engine="pyarrow", storage_options={"test": "parquet_read"})
    assert fsspectest.test[0] == "parquet_read"
@td.skip_array_manager_not_yet_implemented  # TODO(ArrayManager) fastparquet
@td.skip_if_no("fastparquet")
def test_fastparquet_options(fsspectest):
    """Check storage_options are passed on for fastparquet write/read."""
    url = "testmem://test/test.csv"
    frame = DataFrame({"a": [0]})

    frame.to_parquet(
        url,
        engine="fastparquet",
        compression=None,
        storage_options={"test": "parquet_write"},
    )
    assert fsspectest.test[0] == "parquet_write"

    read_parquet(
        url,
        engine="fastparquet",
        storage_options={"test": "parquet_read"},
    )
    assert fsspectest.test[0] == "parquet_read"
@pytest.mark.single_cpu
@td.skip_if_no("s3fs")
def test_from_s3_csv(s3_resource, tips_file, s3so):
    """Compare plain and compressed tips CSVs on S3 with the local file."""
    remote_urls = (
        "s3://pandas-test/tips.csv",
        # the following are decompressed by pandas, not fsspec
        "s3://pandas-test/tips.csv.gz",
        "s3://pandas-test/tips.csv.bz2",
    )
    for url in remote_urls:
        tm.assert_equal(read_csv(url, storage_options=s3so), read_csv(tips_file))
@pytest.mark.single_cpu
@pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"])
@td.skip_if_no("s3fs")
def test_s3_protocols(s3_resource, tips_file, protocol, s3so):
    """Read the tips CSV through each S3 protocol alias and compare it."""
    remote = read_csv(f"{protocol}://pandas-test/tips.csv", storage_options=s3so)
    local = read_csv(tips_file)
    tm.assert_equal(remote, local)
@pytest.mark.single_cpu
@td.skip_array_manager_not_yet_implemented  # TODO(ArrayManager) fastparquet
@td.skip_if_no("s3fs")
@td.skip_if_no("fastparquet")
def test_s3_parquet(s3_resource, s3so, df1):
    """Round-trip a frame through a fastparquet file stored on S3."""
    url = "s3://pandas-test/test.parquet"

    df1.to_parquet(
        url,
        index=False,
        engine="fastparquet",
        compression=None,
        storage_options=s3so,
    )
    roundtripped = read_parquet(url, engine="fastparquet", storage_options=s3so)

    tm.assert_equal(df1, roundtripped)
@td.skip_if_installed("fsspec")
def test_not_present_exception():
    """Without fsspec installed, a memory URL must raise an ImportError."""
    expected = "Missing optional dependency 'fsspec'|fsspec library is required"
    with pytest.raises(ImportError, match=expected):
        read_csv("memory://test/test.csv")
@td.skip_if_no("pyarrow")
def test_feather_options(fsspectest):
    """Check storage_options are passed on for feather and data round-trips."""
    url = "testmem://afile"
    frame = DataFrame({"a": [0]})

    frame.to_feather(url, storage_options={"test": "feather_write"})
    assert fsspectest.test[0] == "feather_write"

    result = read_feather(url, storage_options={"test": "feather_read"})
    assert fsspectest.test[0] == "feather_read"

    tm.assert_frame_equal(frame, result)
def test_pickle_options(fsspectest):
    """Check storage_options are passed on for pickle and data round-trips."""
    url = "testmem://afile"
    frame = DataFrame({"a": [0]})

    frame.to_pickle(url, storage_options={"test": "pickle_write"})
    assert fsspectest.test[0] == "pickle_write"

    result = read_pickle(url, storage_options={"test": "pickle_read"})
    assert fsspectest.test[0] == "pickle_read"

    tm.assert_frame_equal(frame, result)
def test_json_options(fsspectest, compression):
    """Check storage_options are passed on for JSON under each compression."""
    url = "testmem://afile"
    frame = DataFrame({"a": [0]})

    frame.to_json(
        url, compression=compression, storage_options={"test": "json_write"}
    )
    assert fsspectest.test[0] == "json_write"

    result = read_json(
        url, compression=compression, storage_options={"test": "json_read"}
    )
    assert fsspectest.test[0] == "json_read"

    tm.assert_frame_equal(frame, result)
def test_stata_options(fsspectest):
    """Check storage_options are passed on for Stata and data round-trips."""
    url = "testmem://afile"
    frame = DataFrame({"a": [0]})

    frame.to_stata(url, storage_options={"test": "stata_write"}, write_index=False)
    assert fsspectest.test[0] == "stata_write"

    result = read_stata(url, storage_options={"test": "stata_read"})
    assert fsspectest.test[0] == "stata_read"

    # NOTE(review): the result is cast to int64 before comparing, presumably
    # because it comes back with a different integer dtype; confirm.
    tm.assert_frame_equal(frame, result.astype("int64"))
@td.skip_if_no("tabulate")
def test_markdown_options(fsspectest):
    """Check ``to_markdown`` passes storage_options on and writes content."""
    url = "testmem://afile"
    frame = DataFrame({"a": [0]})

    frame.to_markdown(url, storage_options={"test": "md_write"})

    assert fsspectest.test[0] == "md_write"
    # The written file must have non-empty content.
    assert fsspectest.cat("testmem://afile")
@td.skip_if_no("pyarrow")
def test_non_fsspec_options():
    """storage_options must be rejected for targets that are not fsspec URLs."""
    pattern = "storage_options"
    buffer = io.BytesIO()
    frame = DataFrame({"a": [0]})

    with pytest.raises(ValueError, match=pattern):
        read_csv("localfile", storage_options={"a": True})

    # separate test for parquet, which has a different code path
    with pytest.raises(ValueError, match=pattern):
        read_parquet("localfile", storage_options={"a": True})

    with pytest.raises(ValueError, match=pattern):
        read_csv(buffer, storage_options={"a": True})

    with pytest.raises(ValueError, match=pattern):
        frame.to_parquet("nonfsspecpath", storage_options={"a": True})
|