conftest.py 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. import os
  2. import shlex
  3. import subprocess
  4. import time
  5. import pytest
  6. from pandas.compat import (
  7. is_ci_environment,
  8. is_platform_arm,
  9. is_platform_mac,
  10. is_platform_windows,
  11. )
  12. import pandas.util._test_decorators as td
  13. import pandas._testing as tm
  14. from pandas.io.parsers import read_csv
  @pytest.fixture
  def tips_file(datapath):
      """Return the path of ``tips.csv`` as resolved by the ``datapath`` fixture."""
      tips_csv = datapath("io", "data", "csv", "tips.csv")
      return tips_csv
  @pytest.fixture
  def jsonl_file(datapath):
      """Return the path of ``items.jsonl`` as resolved by the ``datapath`` fixture."""
      path_parts = ("io", "parser", "data", "items.jsonl")
      return datapath(*path_parts)
  @pytest.fixture
  def salaries_table(datapath):
      """Read the tab-separated ``salaries.csv`` file and return the result."""
      salaries_csv = datapath("io", "parser", "data", "salaries.csv")
      return read_csv(salaries_csv, sep="\t")
  @pytest.fixture
  def feather_file(datapath):
      """Return the path of ``feather-0_3_1.feather`` as resolved by ``datapath``."""
      feather_path = datapath("io", "data", "feather", "feather-0_3_1.feather")
      return feather_path
  @pytest.fixture
  def s3so(worker_id):
      """
      Build a dict whose ``client_kwargs`` point at the S3 test endpoint.

      In a CI environment the endpoint is ``http://localhost:5000/``.
      Otherwise the port is ``555`` followed by a suffix taken from
      ``worker_id``: ``"5"`` for ``"master"``, else ``worker_id`` with its
      leading ``g``/``w`` characters stripped.
      """
      if not is_ci_environment():
          port_suffix = "5" if worker_id == "master" else worker_id.lstrip("gw")
          endpoint = f"http://127.0.0.1:555{port_suffix}/"
      else:
          endpoint = "http://localhost:5000/"
      client_kwargs = {"endpoint_url": endpoint}
      return {"client_kwargs": client_kwargs}
  @pytest.fixture(scope="session")
  def s3_base(worker_id):
      """
      Fixture for mocking S3 interaction.

      In a CI environment, yields the URL of the moto CI service
      (``http://localhost:5000``), or skips on Windows, macOS and ARM.
      Otherwise launches ``moto_server`` in a separate local process, waits
      up to roughly five seconds for it to answer, and yields its URL.

      The local server process is always terminated on teardown, including
      when start-up polling is interrupted or the generator is closed early.
      """
      pytest.importorskip("s3fs")
      pytest.importorskip("boto3")

      with tm.ensure_safe_environment_variables():
          # temporary workaround as moto fails for botocore >= 1.11 otherwise,
          # see https://github.com/spulec/moto/issues/1924 & 1952
          os.environ.setdefault("AWS_ACCESS_KEY_ID", "foobar_key")
          os.environ.setdefault("AWS_SECRET_ACCESS_KEY", "foobar_secret")
          if is_ci_environment():
              if is_platform_arm() or is_platform_mac() or is_platform_windows():
                  # NOT RUN on Windows/macOS/ARM, only Ubuntu
                  # - subprocess in CI can cause timeouts
                  # - GitHub Actions do not support
                  #   container services for the above OSs
                  # - CircleCI will probably hit the Docker rate pull limit
                  pytest.skip(
                      "S3 tests do not have a corresponding service in "
                      "Windows, macOS or ARM platforms"
                  )
              else:
                  yield "http://localhost:5000"
          else:
              requests = pytest.importorskip("requests")
              pytest.importorskip("moto", minversion="1.3.14")
              pytest.importorskip("flask")  # server mode needs flask too

              # Launching moto in server mode, i.e., as a separate process
              # with an S3 endpoint on localhost
              worker_id = "5" if worker_id == "master" else worker_id.lstrip("gw")
              endpoint_port = f"555{worker_id}"
              endpoint_uri = f"http://127.0.0.1:{endpoint_port}/"

              # pipe to null to avoid logging in terminal
              with subprocess.Popen(
                  shlex.split(f"moto_server s3 -p {endpoint_port}"),
                  stdout=subprocess.DEVNULL,
                  stderr=subprocess.DEVNULL,
              ) as proc:
                  # try/finally guarantees terminate() runs before
                  # Popen.__exit__ waits on the child; without it an error or
                  # early close here would leave the wait blocked on a server
                  # that never exits by itself.
                  try:
                      timeout = 5
                      while timeout > 0:
                          try:
                              # OK to go once server is accepting connections
                              r = requests.get(endpoint_uri)
                              if r.ok:
                                  break
                          except Exception:
                              # Deliberate best-effort: the server is most
                              # likely not listening yet, so retry.
                              pass
                          timeout -= 0.1
                          time.sleep(0.1)
                      yield endpoint_uri
                  finally:
                      proc.terminate()
  @pytest.fixture
  def s3_resource(s3_base, tips_file, jsonl_file, feather_file):
      """
      Populate the S3 endpoint with test buckets and yield a boto3 resource.

      Two buckets are created: "pandas-test" and "cant_get_it" (the latter
      with ``ACL="private"``). Both receive the same objects:

      - tips#1.csv
      - tips.csv
      - tips.csv.gz
      - tips.csv.bz2
      - items.jsonl
      - simple_dataset.feather

      After the test, both buckets are removed on a best-effort basis.
      """
      import boto3
      import s3fs

      # (object key, local file to upload) pairs shared by both buckets
      uploads = [
          ("tips#1.csv", tips_file),
          ("tips.csv", tips_file),
          ("tips.csv.gz", tips_file + ".gz"),
          ("tips.csv.bz2", tips_file + ".bz2"),
          ("items.jsonl", jsonl_file),
          ("simple_dataset.feather", feather_file),
      ]

      bucket = "pandas-test"
      private_bucket = "cant_get_it"
      conn = boto3.resource("s3", endpoint_url=s3_base)
      cli = boto3.client("s3", endpoint_url=s3_base)

      creation_requests = (
          {"Bucket": bucket},
          {"Bucket": private_bucket, "ACL": "private"},
      )
      for request_kwargs in creation_requests:
          try:
              cli.create_bucket(**request_kwargs)
          except Exception:
              # OK if bucket already exists
              pass

      # Give the server up to about two seconds to report any bucket.
      remaining = 2
      while not cli.list_buckets()["Buckets"] and remaining > 0:
          time.sleep(0.1)
          remaining -= 0.1

      for target_bucket in (bucket, private_bucket):
          for object_key, local_path in uploads:
              with open(local_path, "rb") as handle:
                  cli.put_object(Bucket=target_bucket, Key=object_key, Body=handle)

      s3fs.S3FileSystem.clear_instance_cache()
      yield conn

      # Teardown: drop both buckets, ignoring failures.
      s3 = s3fs.S3FileSystem(client_kwargs={"endpoint_url": s3_base})
      for doomed_bucket in (bucket, private_bucket):
          try:
              s3.rm(doomed_bucket, recursive=True)
          except Exception:
              pass

      # Wait up to about two seconds for the bucket listing to become empty.
      remaining = 2
      while cli.list_buckets()["Buckets"] and remaining > 0:
          time.sleep(0.1)
          remaining -= 0.1
  # (file extension, compression name) pairs used to parametrize the
  # compression fixtures. Each compressed format appears with a lower-case
  # and an upper-case extension; the zstd entries carry a
  # ``td.skip_if_no("zstandard")`` mark.
  _compression_formats_params = [
      (".no_compress", None),
      ("", None),
      *(
          (extension, method)
          for lowercase_ext, method in (
              (".gz", "gzip"),
              (".bz2", "bz2"),
              (".zip", "zip"),
              (".xz", "xz"),
          )
          for extension in (lowercase_ext, lowercase_ext.upper())
      ),
      *(
          pytest.param((extension, "zstd"), marks=td.skip_if_no("zstandard"))
          for extension in (".zst", ".ZST")
      ),
  ]
  @pytest.fixture(params=_compression_formats_params[1:])
  def compression_format(request):
      """
      Return the current (extension, compression) parameter.

      Parametrized over every entry of ``_compression_formats_params``
      except the first one.
      """
      ext_and_method = request.param
      return ext_and_method
  @pytest.fixture(params=_compression_formats_params)
  def compression_ext(request):
      """
      Return the first element (the file extension) of the current parameter.

      Parametrized over every entry of ``_compression_formats_params``.
      """
      extension, _method = request.param
      return extension