# Tests for pandas I/O against a mocked GCS ("gs://") filesystem.
  1. from io import BytesIO
  2. import os
  3. import tarfile
  4. import zipfile
  5. import numpy as np
  6. import pytest
  7. from pandas import (
  8. DataFrame,
  9. date_range,
  10. read_csv,
  11. read_excel,
  12. read_json,
  13. read_parquet,
  14. )
  15. import pandas._testing as tm
  16. from pandas.tests.io.test_compression import _compression_to_extension
  17. from pandas.util import _test_decorators as td
  18. @pytest.fixture
  19. def gcs_buffer(monkeypatch):
  20. """Emulate GCS using a binary buffer."""
  21. import fsspec
  22. gcs_buffer = BytesIO()
  23. gcs_buffer.close = lambda: True
  24. class MockGCSFileSystem(fsspec.AbstractFileSystem):
  25. @staticmethod
  26. def open(*args, **kwargs):
  27. gcs_buffer.seek(0)
  28. return gcs_buffer
  29. def ls(self, path, **kwargs):
  30. # needed for pyarrow
  31. return [{"name": path, "type": "file"}]
  32. # Overwrites the default implementation from gcsfs to our mock class
  33. fsspec.register_implementation("gs", MockGCSFileSystem, clobber=True)
  34. return gcs_buffer
  35. @td.skip_if_no("gcsfs")
  36. @pytest.mark.parametrize("format", ["csv", "json", "parquet", "excel", "markdown"])
  37. def test_to_read_gcs(gcs_buffer, format):
  38. """
  39. Test that many to/read functions support GCS.
  40. GH 33987
  41. """
  42. df1 = DataFrame(
  43. {
  44. "int": [1, 3],
  45. "float": [2.0, np.nan],
  46. "str": ["t", "s"],
  47. "dt": date_range("2018-06-18", periods=2),
  48. }
  49. )
  50. path = f"gs://test/test.{format}"
  51. if format == "csv":
  52. df1.to_csv(path, index=True)
  53. df2 = read_csv(path, parse_dates=["dt"], index_col=0)
  54. elif format == "excel":
  55. path = "gs://test/test.xlsx"
  56. df1.to_excel(path)
  57. df2 = read_excel(path, parse_dates=["dt"], index_col=0)
  58. elif format == "json":
  59. df1.to_json(path)
  60. df2 = read_json(path, convert_dates=["dt"])
  61. elif format == "parquet":
  62. pytest.importorskip("pyarrow")
  63. df1.to_parquet(path)
  64. df2 = read_parquet(path)
  65. elif format == "markdown":
  66. pytest.importorskip("tabulate")
  67. df1.to_markdown(path)
  68. df2 = df1
  69. tm.assert_frame_equal(df1, df2)
  70. def assert_equal_zip_safe(result: bytes, expected: bytes, compression: str):
  71. """
  72. For zip compression, only compare the CRC-32 checksum of the file contents
  73. to avoid checking the time-dependent last-modified timestamp which
  74. in some CI builds is off-by-one
  75. See https://en.wikipedia.org/wiki/ZIP_(file_format)#File_headers
  76. """
  77. if compression == "zip":
  78. # Only compare the CRC checksum of the file contents
  79. with zipfile.ZipFile(BytesIO(result)) as exp, zipfile.ZipFile(
  80. BytesIO(expected)
  81. ) as res:
  82. for res_info, exp_info in zip(res.infolist(), exp.infolist()):
  83. assert res_info.CRC == exp_info.CRC
  84. elif compression == "tar":
  85. with tarfile.open(fileobj=BytesIO(result)) as tar_exp, tarfile.open(
  86. fileobj=BytesIO(expected)
  87. ) as tar_res:
  88. for tar_res_info, tar_exp_info in zip(
  89. tar_res.getmembers(), tar_exp.getmembers()
  90. ):
  91. actual_file = tar_res.extractfile(tar_res_info)
  92. expected_file = tar_exp.extractfile(tar_exp_info)
  93. assert (actual_file is None) == (expected_file is None)
  94. if actual_file is not None and expected_file is not None:
  95. assert actual_file.read() == expected_file.read()
  96. else:
  97. assert result == expected
  98. @td.skip_if_no("gcsfs")
  99. @pytest.mark.parametrize("encoding", ["utf-8", "cp1251"])
  100. def test_to_csv_compression_encoding_gcs(gcs_buffer, compression_only, encoding):
  101. """
  102. Compression and encoding should with GCS.
  103. GH 35677 (to_csv, compression), GH 26124 (to_csv, encoding), and
  104. GH 32392 (read_csv, encoding)
  105. """
  106. df = tm.makeDataFrame()
  107. # reference of compressed and encoded file
  108. compression = {"method": compression_only}
  109. if compression_only == "gzip":
  110. compression["mtime"] = 1 # be reproducible
  111. buffer = BytesIO()
  112. df.to_csv(buffer, compression=compression, encoding=encoding, mode="wb")
  113. # write compressed file with explicit compression
  114. path_gcs = "gs://test/test.csv"
  115. df.to_csv(path_gcs, compression=compression, encoding=encoding)
  116. res = gcs_buffer.getvalue()
  117. expected = buffer.getvalue()
  118. assert_equal_zip_safe(res, expected, compression_only)
  119. read_df = read_csv(
  120. path_gcs, index_col=0, compression=compression_only, encoding=encoding
  121. )
  122. tm.assert_frame_equal(df, read_df)
  123. # write compressed file with implicit compression
  124. file_ext = _compression_to_extension[compression_only]
  125. compression["method"] = "infer"
  126. path_gcs += f".{file_ext}"
  127. df.to_csv(path_gcs, compression=compression, encoding=encoding)
  128. res = gcs_buffer.getvalue()
  129. expected = buffer.getvalue()
  130. assert_equal_zip_safe(res, expected, compression_only)
  131. read_df = read_csv(path_gcs, index_col=0, compression="infer", encoding=encoding)
  132. tm.assert_frame_equal(df, read_df)
  133. @td.skip_if_no("fastparquet")
  134. @td.skip_if_no("gcsfs")
  135. def test_to_parquet_gcs_new_file(monkeypatch, tmpdir):
  136. """Regression test for writing to a not-yet-existent GCS Parquet file."""
  137. from fsspec import AbstractFileSystem
  138. df1 = DataFrame(
  139. {
  140. "int": [1, 3],
  141. "float": [2.0, np.nan],
  142. "str": ["t", "s"],
  143. "dt": date_range("2018-06-18", periods=2),
  144. }
  145. )
  146. class MockGCSFileSystem(AbstractFileSystem):
  147. def open(self, path, mode="r", *args):
  148. if "w" not in mode:
  149. raise FileNotFoundError
  150. return open(os.path.join(tmpdir, "test.parquet"), mode)
  151. monkeypatch.setattr("gcsfs.GCSFileSystem", MockGCSFileSystem)
  152. df1.to_parquet(
  153. "gs://test/test.csv", index=True, engine="fastparquet", compression=None
  154. )
  155. @td.skip_if_installed("gcsfs")
  156. def test_gcs_not_present_exception():
  157. with tm.external_error_raised(ImportError):
  158. read_csv("gs://test/test.csv")