  1. """
  2. Tests parsers ability to read and parse non-local files
  3. and hence require a network connection to be read.
  4. """
  5. from io import (
  6. BytesIO,
  7. StringIO,
  8. )
  9. import logging
  10. import numpy as np
  11. import pytest
  12. from pandas.compat import is_ci_environment
  13. import pandas.util._test_decorators as td
  14. from pandas import DataFrame
  15. import pandas._testing as tm
  16. from pandas.tests.io.test_compression import _compression_to_extension
  17. from pandas.io.feather_format import read_feather
  18. from pandas.io.parsers import read_csv


@pytest.mark.network
@tm.network(
    url=(
        "https://github.com/pandas-dev/pandas/raw/main/"
        "pandas/tests/io/parser/data/salaries.csv"
    ),
    check_before_test=True,
)
@pytest.mark.parametrize("mode", ["explicit", "infer"])
@pytest.mark.parametrize("engine", ["python", "c"])
def test_compressed_urls(salaries_table, mode, engine, compression_only):
    # test reading compressed urls with various engines and
    # extension inference
    extension = _compression_to_extension[compression_only]
    base_url = (
        "https://github.com/pandas-dev/pandas/raw/main/"
        "pandas/tests/io/parser/data/salaries.csv"
    )
    url = base_url + extension

    if mode != "explicit":
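        # "infer" replaces the explicit codec, so read_csv must deduce the
        # compression from the URL's file extension instead.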
        compression_only = mode

    url_table = read_csv(url, sep="\t", compression=compression_only, engine=engine)
    tm.assert_frame_equal(url_table, salaries_table)


@pytest.mark.network
@tm.network(
    url=(
        "https://raw.githubusercontent.com/pandas-dev/pandas/main/"
        "pandas/tests/io/parser/data/unicode_series.csv"
    ),
    check_before_test=True,
)
def test_url_encoding_csv():
    """
    read_csv should honor the requested encoding for URLs.

    GH 10424
    """
    path = (
        "https://raw.githubusercontent.com/pandas-dev/pandas/main/"
        + "pandas/tests/io/parser/data/unicode_series.csv"
    )
    df = read_csv(path, encoding="latin-1", header=None)
    assert df.loc[15, 1] == "Á köldum klaka (Cold Fever) (1994)"


@pytest.fixture
def tips_df(datapath):
    """DataFrame with the tips dataset."""
    return read_csv(datapath("io", "data", "csv", "tips.csv"))


@pytest.mark.single_cpu
@pytest.mark.usefixtures("s3_resource")
@pytest.mark.xfail(
    reason="CI race condition GH 45433, GH 44584",
    raises=FileNotFoundError,
    strict=False,
)
@td.skip_if_not_us_locale()
class TestS3:
    @td.skip_if_no("s3fs")
    def test_parse_public_s3_bucket(self, tips_df, s3so):
        # More of an integration test because of the not-public-contents
        # portion; this could probably be mocked instead.
        for ext, comp in [("", None), (".gz", "gzip"), (".bz2", "bz2")]:
            df = read_csv(
                "s3://pandas-test/tips.csv" + ext,
                compression=comp,
                storage_options=s3so,
            )
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

        # Read public file from bucket with not-public contents
        df = read_csv("s3://cant_get_it/tips.csv", storage_options=s3so)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(df, tips_df)

    def test_parse_public_s3n_bucket(self, tips_df, s3so):
        # Read from AWS s3 as "s3n" URL
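        # ("s3n", like "s3a" below, is a legacy Hadoop-style scheme alias
        # that should behave the same as plain "s3".)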
        df = read_csv("s3n://pandas-test/tips.csv", nrows=10, storage_options=s3so)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3a_bucket(self, tips_df, s3so):
        # Read from AWS s3 as "s3a" URL
        df = read_csv("s3a://pandas-test/tips.csv", nrows=10, storage_options=s3so)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3_bucket_nrows(self, tips_df, s3so):
        for ext, comp in [("", None), (".gz", "gzip"), (".bz2", "bz2")]:
            df = read_csv(
                "s3://pandas-test/tips.csv" + ext,
                nrows=10,
                compression=comp,
                storage_options=s3so,
            )
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3_bucket_chunked(self, tips_df, s3so):
        # Read with a chunksize
        chunksize = 5
        for ext, comp in [("", None), (".gz", "gzip"), (".bz2", "bz2")]:
            with read_csv(
                "s3://pandas-test/tips.csv" + ext,
                chunksize=chunksize,
                compression=comp,
                storage_options=s3so,
            ) as df_reader:
                assert df_reader.chunksize == chunksize
                for i_chunk in [0, 1, 2]:
                    # Read a couple of chunks and make sure we see them
                    # properly.
                    df = df_reader.get_chunk()
                    assert isinstance(df, DataFrame)
                    assert not df.empty
                    true_df = tips_df.iloc[
                        chunksize * i_chunk : chunksize * (i_chunk + 1)
                    ]
                    tm.assert_frame_equal(true_df, df)

    def test_parse_public_s3_bucket_chunked_python(self, tips_df, s3so):
        # Read with a chunksize using the Python parser
        chunksize = 5
        for ext, comp in [("", None), (".gz", "gzip"), (".bz2", "bz2")]:
            with read_csv(
                "s3://pandas-test/tips.csv" + ext,
                chunksize=chunksize,
                compression=comp,
                engine="python",
                storage_options=s3so,
            ) as df_reader:
                assert df_reader.chunksize == chunksize
                for i_chunk in [0, 1, 2]:
                    # Read a couple of chunks and make sure we see them properly.
                    df = df_reader.get_chunk()
                    assert isinstance(df, DataFrame)
                    assert not df.empty
                    true_df = tips_df.iloc[
                        chunksize * i_chunk : chunksize * (i_chunk + 1)
                    ]
                    tm.assert_frame_equal(true_df, df)

    def test_parse_public_s3_bucket_python(self, tips_df, s3so):
        for ext, comp in [("", None), (".gz", "gzip"), (".bz2", "bz2")]:
            df = read_csv(
                "s3://pandas-test/tips.csv" + ext,
                engine="python",
                compression=comp,
                storage_options=s3so,
            )
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

    def test_infer_s3_compression(self, tips_df, s3so):
        for ext in ["", ".gz", ".bz2"]:
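            # With compression="infer", the codec should be deduced from
            # each extension (an empty suffix meaning no compression).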
            df = read_csv(
                "s3://pandas-test/tips.csv" + ext,
                engine="python",
                compression="infer",
                storage_options=s3so,
            )
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

    def test_parse_public_s3_bucket_nrows_python(self, tips_df, s3so):
        for ext, comp in [("", None), (".gz", "gzip"), (".bz2", "bz2")]:
            df = read_csv(
                "s3://pandas-test/tips.csv" + ext,
                engine="python",
                nrows=10,
                compression=comp,
                storage_options=s3so,
            )
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_read_s3_fails(self, s3so):
        msg = "The specified bucket does not exist"
        with pytest.raises(OSError, match=msg):
            read_csv("s3://nyqpug/asdf.csv", storage_options=s3so)

        # Receive a permission error when trying to read a private bucket.
        # It's irrelevant here that this isn't actually a table.
        with pytest.raises(OSError, match=msg):
            read_csv("s3://cant_get_it/file.csv")

    @pytest.mark.xfail(reason="GH#39155 s3fs upgrade", strict=False)
    def test_write_s3_csv_fails(self, tips_df, s3so):
        # GH 32486
        # Attempting to write to an invalid S3 path should raise
        import botocore

        # GH 34087
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
        # Catch a ClientError since AWS Service Errors are defined dynamically
        error = (FileNotFoundError, botocore.exceptions.ClientError)

        with pytest.raises(error, match="The specified bucket does not exist"):
            tips_df.to_csv(
                "s3://an_s3_bucket_data_doesnt_exit/not_real.csv", storage_options=s3so
            )

    @pytest.mark.xfail(reason="GH#39155 s3fs upgrade", strict=False)
    @td.skip_if_no("pyarrow")
    def test_write_s3_parquet_fails(self, tips_df, s3so):
        # GH 27679
        # Attempting to write to an invalid S3 path should raise
        import botocore

        # GH 34087
        # https://boto3.amazonaws.com/v1/documentation/api/latest/guide/error-handling.html
        # Catch a ClientError since AWS Service Errors are defined dynamically
        error = (FileNotFoundError, botocore.exceptions.ClientError)

        with pytest.raises(error, match="The specified bucket does not exist"):
            tips_df.to_parquet(
                "s3://an_s3_bucket_data_doesnt_exit/not_real.parquet",
                storage_options=s3so,
            )

    @pytest.mark.single_cpu
    def test_read_csv_handles_boto_s3_object(self, s3_resource, tips_file):
        # see gh-16135

        s3_object = s3_resource.meta.client.get_object(
            Bucket="pandas-test", Key="tips.csv"
        )

        with BytesIO(s3_object["Body"].read()) as buffer:
            result = read_csv(buffer, encoding="utf8")
            assert isinstance(result, DataFrame)
            assert not result.empty

            expected = read_csv(tips_file)
            tm.assert_frame_equal(result, expected)

    @pytest.mark.single_cpu
    @pytest.mark.skipif(
        is_ci_environment(),
        reason="This test can hang in our CI min_versions build "
        "and leads to '##[error]The runner has "
        "received a shutdown signal...' in GHA. GH: 45651",
    )
    def test_read_csv_chunked_download(self, s3_resource, caplog, s3so):
        # 8 MB, S3FS uses 5MB chunks
        import s3fs

        df = DataFrame(np.random.randn(100000, 4), columns=list("abcd"))
        str_buf = StringIO()

        df.to_csv(str_buf)

        buf = BytesIO(str_buf.getvalue().encode("utf-8"))

        s3_resource.Bucket("pandas-test").put_object(Key="large-file.csv", Body=buf)

        # Possibly some state leaking in between tests.
        # If we don't clear this cache, we see `GetObject operation: Forbidden`.
        # Presumably the s3fs instance is being cached, with the directory listing
        # from *before* we added large-file.csv to the pandas-test bucket.
        s3fs.S3FileSystem.clear_instance_cache()

        with caplog.at_level(logging.DEBUG, logger="s3fs"):
            read_csv("s3://pandas-test/large-file.csv", nrows=5, storage_options=s3so)
            # log of fetch_range (start, stop)
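            # (0, 5505024) is assumed to be s3fs's first block request
            # (the 5 MiB default block plus read-ahead); the point is that
            # nrows=5 must not trigger a download of the full 8 MB file.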
            assert (0, 5505024) in (x.args[-2:] for x in caplog.records)

    def test_read_s3_with_hash_in_key(self, tips_df, s3so):
        # GH 25945
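        # A "#" in the key should be kept as part of the object path,
        # not treated as a URL fragment separator.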
        result = read_csv("s3://pandas-test/tips#1.csv", storage_options=s3so)
        tm.assert_frame_equal(tips_df, result)

    @td.skip_if_no("pyarrow")
    def test_read_feather_s3_file_path(self, feather_file, s3so):
        # GH 29055
        expected = read_feather(feather_file)
        res = read_feather(
            "s3://pandas-test/simple_dataset.feather", storage_options=s3so
        )
        tm.assert_frame_equal(expected, res)