"""Tests for pandas I/O through fsspec-backed URLs (memory://, testmem://, s3://)."""
  1. import io
  2. import numpy as np
  3. import pytest
  4. from pandas import (
  5. DataFrame,
  6. date_range,
  7. read_csv,
  8. read_excel,
  9. read_feather,
  10. read_json,
  11. read_parquet,
  12. read_pickle,
  13. read_stata,
  14. read_table,
  15. )
  16. import pandas._testing as tm
  17. from pandas.util import _test_decorators as td
  18. @pytest.fixture
  19. def df1():
  20. return DataFrame(
  21. {
  22. "int": [1, 3],
  23. "float": [2.0, np.nan],
  24. "str": ["t", "s"],
  25. "dt": date_range("2018-06-18", periods=2),
  26. }
  27. )
  28. @pytest.fixture
  29. def cleared_fs():
  30. fsspec = pytest.importorskip("fsspec")
  31. memfs = fsspec.filesystem("memory")
  32. yield memfs
  33. memfs.store.clear()
  34. def test_read_csv(cleared_fs, df1):
  35. text = str(df1.to_csv(index=False)).encode()
  36. with cleared_fs.open("test/test.csv", "wb") as w:
  37. w.write(text)
  38. df2 = read_csv("memory://test/test.csv", parse_dates=["dt"])
  39. tm.assert_frame_equal(df1, df2)
  40. def test_reasonable_error(monkeypatch, cleared_fs):
  41. from fsspec.registry import known_implementations
  42. with pytest.raises(ValueError, match="nosuchprotocol"):
  43. read_csv("nosuchprotocol://test/test.csv")
  44. err_msg = "test error message"
  45. monkeypatch.setitem(
  46. known_implementations,
  47. "couldexist",
  48. {"class": "unimportable.CouldExist", "err": err_msg},
  49. )
  50. with pytest.raises(ImportError, match=err_msg):
  51. read_csv("couldexist://test/test.csv")
  52. def test_to_csv(cleared_fs, df1):
  53. df1.to_csv("memory://test/test.csv", index=True)
  54. df2 = read_csv("memory://test/test.csv", parse_dates=["dt"], index_col=0)
  55. tm.assert_frame_equal(df1, df2)
  56. def test_to_excel(cleared_fs, df1):
  57. pytest.importorskip("openpyxl")
  58. ext = "xlsx"
  59. path = f"memory://test/test.{ext}"
  60. df1.to_excel(path, index=True)
  61. df2 = read_excel(path, parse_dates=["dt"], index_col=0)
  62. tm.assert_frame_equal(df1, df2)
  63. @pytest.mark.parametrize("binary_mode", [False, True])
  64. def test_to_csv_fsspec_object(cleared_fs, binary_mode, df1):
  65. fsspec = pytest.importorskip("fsspec")
  66. path = "memory://test/test.csv"
  67. mode = "wb" if binary_mode else "w"
  68. with fsspec.open(path, mode=mode).open() as fsspec_object:
  69. df1.to_csv(fsspec_object, index=True)
  70. assert not fsspec_object.closed
  71. mode = mode.replace("w", "r")
  72. with fsspec.open(path, mode=mode) as fsspec_object:
  73. df2 = read_csv(
  74. fsspec_object,
  75. parse_dates=["dt"],
  76. index_col=0,
  77. )
  78. assert not fsspec_object.closed
  79. tm.assert_frame_equal(df1, df2)
  80. def test_csv_options(fsspectest):
  81. df = DataFrame({"a": [0]})
  82. df.to_csv(
  83. "testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False
  84. )
  85. assert fsspectest.test[0] == "csv_write"
  86. read_csv("testmem://test/test.csv", storage_options={"test": "csv_read"})
  87. assert fsspectest.test[0] == "csv_read"
  88. def test_read_table_options(fsspectest):
  89. # GH #39167
  90. df = DataFrame({"a": [0]})
  91. df.to_csv(
  92. "testmem://test/test.csv", storage_options={"test": "csv_write"}, index=False
  93. )
  94. assert fsspectest.test[0] == "csv_write"
  95. read_table("testmem://test/test.csv", storage_options={"test": "csv_read"})
  96. assert fsspectest.test[0] == "csv_read"
  97. def test_excel_options(fsspectest):
  98. pytest.importorskip("openpyxl")
  99. extension = "xlsx"
  100. df = DataFrame({"a": [0]})
  101. path = f"testmem://test/test.{extension}"
  102. df.to_excel(path, storage_options={"test": "write"}, index=False)
  103. assert fsspectest.test[0] == "write"
  104. read_excel(path, storage_options={"test": "read"})
  105. assert fsspectest.test[0] == "read"
  106. @td.skip_if_no("fastparquet")
  107. def test_to_parquet_new_file(cleared_fs, df1):
  108. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  109. df1.to_parquet(
  110. "memory://test/test.csv", index=True, engine="fastparquet", compression=None
  111. )
  112. @td.skip_if_no("pyarrow")
  113. def test_arrowparquet_options(fsspectest):
  114. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  115. df = DataFrame({"a": [0]})
  116. df.to_parquet(
  117. "testmem://test/test.csv",
  118. engine="pyarrow",
  119. compression=None,
  120. storage_options={"test": "parquet_write"},
  121. )
  122. assert fsspectest.test[0] == "parquet_write"
  123. read_parquet(
  124. "testmem://test/test.csv",
  125. engine="pyarrow",
  126. storage_options={"test": "parquet_read"},
  127. )
  128. assert fsspectest.test[0] == "parquet_read"
  129. @td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fastparquet
  130. @td.skip_if_no("fastparquet")
  131. def test_fastparquet_options(fsspectest):
  132. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  133. df = DataFrame({"a": [0]})
  134. df.to_parquet(
  135. "testmem://test/test.csv",
  136. engine="fastparquet",
  137. compression=None,
  138. storage_options={"test": "parquet_write"},
  139. )
  140. assert fsspectest.test[0] == "parquet_write"
  141. read_parquet(
  142. "testmem://test/test.csv",
  143. engine="fastparquet",
  144. storage_options={"test": "parquet_read"},
  145. )
  146. assert fsspectest.test[0] == "parquet_read"
  147. @pytest.mark.single_cpu
  148. @td.skip_if_no("s3fs")
  149. def test_from_s3_csv(s3_resource, tips_file, s3so):
  150. tm.assert_equal(
  151. read_csv("s3://pandas-test/tips.csv", storage_options=s3so), read_csv(tips_file)
  152. )
  153. # the following are decompressed by pandas, not fsspec
  154. tm.assert_equal(
  155. read_csv("s3://pandas-test/tips.csv.gz", storage_options=s3so),
  156. read_csv(tips_file),
  157. )
  158. tm.assert_equal(
  159. read_csv("s3://pandas-test/tips.csv.bz2", storage_options=s3so),
  160. read_csv(tips_file),
  161. )
  162. @pytest.mark.single_cpu
  163. @pytest.mark.parametrize("protocol", ["s3", "s3a", "s3n"])
  164. @td.skip_if_no("s3fs")
  165. def test_s3_protocols(s3_resource, tips_file, protocol, s3so):
  166. tm.assert_equal(
  167. read_csv(f"{protocol}://pandas-test/tips.csv", storage_options=s3so),
  168. read_csv(tips_file),
  169. )
  170. @pytest.mark.single_cpu
  171. @td.skip_array_manager_not_yet_implemented # TODO(ArrayManager) fastparquet
  172. @td.skip_if_no("s3fs")
  173. @td.skip_if_no("fastparquet")
  174. def test_s3_parquet(s3_resource, s3so, df1):
  175. fn = "s3://pandas-test/test.parquet"
  176. df1.to_parquet(
  177. fn, index=False, engine="fastparquet", compression=None, storage_options=s3so
  178. )
  179. df2 = read_parquet(fn, engine="fastparquet", storage_options=s3so)
  180. tm.assert_equal(df1, df2)
  181. @td.skip_if_installed("fsspec")
  182. def test_not_present_exception():
  183. msg = "Missing optional dependency 'fsspec'|fsspec library is required"
  184. with pytest.raises(ImportError, match=msg):
  185. read_csv("memory://test/test.csv")
  186. @td.skip_if_no("pyarrow")
  187. def test_feather_options(fsspectest):
  188. df = DataFrame({"a": [0]})
  189. df.to_feather("testmem://afile", storage_options={"test": "feather_write"})
  190. assert fsspectest.test[0] == "feather_write"
  191. out = read_feather("testmem://afile", storage_options={"test": "feather_read"})
  192. assert fsspectest.test[0] == "feather_read"
  193. tm.assert_frame_equal(df, out)
  194. def test_pickle_options(fsspectest):
  195. df = DataFrame({"a": [0]})
  196. df.to_pickle("testmem://afile", storage_options={"test": "pickle_write"})
  197. assert fsspectest.test[0] == "pickle_write"
  198. out = read_pickle("testmem://afile", storage_options={"test": "pickle_read"})
  199. assert fsspectest.test[0] == "pickle_read"
  200. tm.assert_frame_equal(df, out)
  201. def test_json_options(fsspectest, compression):
  202. df = DataFrame({"a": [0]})
  203. df.to_json(
  204. "testmem://afile",
  205. compression=compression,
  206. storage_options={"test": "json_write"},
  207. )
  208. assert fsspectest.test[0] == "json_write"
  209. out = read_json(
  210. "testmem://afile",
  211. compression=compression,
  212. storage_options={"test": "json_read"},
  213. )
  214. assert fsspectest.test[0] == "json_read"
  215. tm.assert_frame_equal(df, out)
  216. def test_stata_options(fsspectest):
  217. df = DataFrame({"a": [0]})
  218. df.to_stata(
  219. "testmem://afile", storage_options={"test": "stata_write"}, write_index=False
  220. )
  221. assert fsspectest.test[0] == "stata_write"
  222. out = read_stata("testmem://afile", storage_options={"test": "stata_read"})
  223. assert fsspectest.test[0] == "stata_read"
  224. tm.assert_frame_equal(df, out.astype("int64"))
  225. @td.skip_if_no("tabulate")
  226. def test_markdown_options(fsspectest):
  227. df = DataFrame({"a": [0]})
  228. df.to_markdown("testmem://afile", storage_options={"test": "md_write"})
  229. assert fsspectest.test[0] == "md_write"
  230. assert fsspectest.cat("testmem://afile")
  231. @td.skip_if_no("pyarrow")
  232. def test_non_fsspec_options():
  233. with pytest.raises(ValueError, match="storage_options"):
  234. read_csv("localfile", storage_options={"a": True})
  235. with pytest.raises(ValueError, match="storage_options"):
  236. # separate test for parquet, which has a different code path
  237. read_parquet("localfile", storage_options={"a": True})
  238. by = io.BytesIO()
  239. with pytest.raises(ValueError, match="storage_options"):
  240. read_csv(by, storage_options={"a": True})
  241. df = DataFrame({"a": [0]})
  242. with pytest.raises(ValueError, match="storage_options"):
  243. df.to_parquet("nonfsspecpath", storage_options={"a": True})