123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340 |
- import gzip
- import io
- import os
- from pathlib import Path
- import subprocess
- import sys
- import tarfile
- import textwrap
- import time
- import zipfile
- import pytest
- from pandas.compat import is_platform_windows
- import pandas as pd
- import pandas._testing as tm
- import pandas.io.common as icom
# Reverse lookup: compression method name -> file extension.
# If several extensions map to the same method, the last one iterated wins.
_compression_to_extension = {
    method: extension
    for extension, method in icom.extension_to_compression.items()
}
@pytest.mark.parametrize(
    "obj",
    [
        pd.DataFrame(
            100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
            columns=["X", "Y", "Z"],
        ),
        pd.Series(100 * [0.123456, 0.234567, 0.567567], name="X"),
    ],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_compression_size(obj, method, compression_only):
    """
    Writing to a path with compression yields a smaller file than without.

    ``compression_only`` is not defined in this file; it is expected to be
    supplied by a fixture declared elsewhere.
    """
    # "tar" on its own is swapped for a dict that requests gzip-compressed tar.
    compression = (
        {"method": "tar", "mode": "w:gz"}
        if compression_only == "tar"
        else compression_only
    )
    write = getattr(obj, method)

    with tm.ensure_clean() as path:
        write(path, compression=compression)
        size_compressed = os.path.getsize(path)

        write(path, compression=None)
        size_plain = os.path.getsize(path)

        assert size_plain > size_compressed
@pytest.mark.parametrize(
    "obj",
    [
        pd.DataFrame(
            100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
            columns=["X", "Y", "Z"],
        ),
        pd.Series(100 * [0.123456, 0.234567, 0.567567], name="X"),
    ],
)
@pytest.mark.parametrize("method", ["to_csv", "to_json"])
def test_compression_size_fh(obj, method, compression_only):
    """
    Writing through a handle from ``icom.get_handle`` compresses the output.

    The object is written once through a compressing handle and once through
    a plain one; the compressed file must be the smaller of the two.  Neither
    write may close the handle it was given.
    """
    open_mode = "w:gz" if compression_only == "tar" else "w"

    with tm.ensure_clean() as path:
        with icom.get_handle(
            path, open_mode, compression=compression_only
        ) as handles:
            target = handles.handle
            getattr(obj, method)(target)
            assert not target.closed
        # Measured only after the handle context has exited.
        compressed_size = os.path.getsize(path)

    with tm.ensure_clean() as path:
        with icom.get_handle(path, "w", compression=None) as handles:
            target = handles.handle
            getattr(obj, method)(target)
            assert not target.closed
        uncompressed_size = os.path.getsize(path)
        assert uncompressed_size > compressed_size
@pytest.mark.parametrize(
    "write_method, write_kwargs, read_method",
    [
        ("to_csv", {"index": False}, pd.read_csv),
        ("to_json", {}, pd.read_json),
        ("to_pickle", {}, pd.read_pickle),
    ],
)
def test_dataframe_compression_defaults_to_infer(
    write_method, write_kwargs, read_method, compression_only
):
    """
    DataFrame writers called without ``compression`` round-trip correctly.

    The frame is written to a path whose extension matches
    ``compression_only`` and read back with that compression named
    explicitly; the result must equal the original frame.
    """
    # GH22004
    expected = pd.DataFrame([[1.0, 0, -4], [3.4, 5, 2]], columns=["X", "Y", "Z"])
    filename = "compressed" + _compression_to_extension[compression_only]

    with tm.ensure_clean(filename) as path:
        writer = getattr(expected, write_method)
        writer(path, **write_kwargs)
        result = read_method(path, compression=compression_only)

    tm.assert_frame_equal(result, expected)
@pytest.mark.parametrize(
    "write_method,write_kwargs,read_method,read_kwargs",
    [
        ("to_csv", {"index": False, "header": True}, pd.read_csv, {"squeeze": True}),
        ("to_json", {}, pd.read_json, {"typ": "series"}),
        ("to_pickle", {}, pd.read_pickle, {}),
    ],
)
def test_series_compression_defaults_to_infer(
    write_method, write_kwargs, read_method, read_kwargs, compression_only
):
    """
    Series writers called without ``compression`` round-trip correctly.

    A ``"squeeze"`` key in ``read_kwargs`` is never forwarded to the reader;
    its presence means the read result is squeezed along ``"columns"``.
    """
    # GH22004
    expected = pd.Series([0, 5, -2, 10], name="X")
    filename = "compressed" + _compression_to_extension[compression_only]

    # The key's presence (not its value) decides whether to squeeze.
    wants_squeeze = "squeeze" in read_kwargs
    reader_kwargs = {
        key: value for key, value in read_kwargs.items() if key != "squeeze"
    }

    with tm.ensure_clean(filename) as path:
        getattr(expected, write_method)(path, **write_kwargs)
        result = read_method(path, compression=compression_only, **reader_kwargs)
        if wants_squeeze:
            result = result.squeeze("columns")

    tm.assert_series_equal(result, expected, check_names=False)
def test_compression_warning(compression_only):
    """
    ``to_csv`` warns when given both a file object and a compression method.

    Passing an already-open handle together with an explicit ``compression``
    argument must produce a ``RuntimeWarning`` (GH21227).
    """
    rows = 100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]]
    frame = pd.DataFrame(rows, columns=["X", "Y", "Z"])

    with tm.ensure_clean() as path:
        with icom.get_handle(path, "w", compression=compression_only) as handles:
            open_handle = handles.handle
            with tm.assert_produces_warning(RuntimeWarning):
                frame.to_csv(open_handle, compression=compression_only)
def test_compression_binary(compression_only):
    """
    ``to_csv`` can write compressed output to binary handles.

    Covers a real file opened in ``"wb"`` mode and an in-memory ``BytesIO``.
    In both cases the handle must stay open after writing and the data must
    read back equal to the original frame.

    GH22555
    """
    expected = tm.makeDataFrame()

    # Case 1: binary file on disk.
    with tm.ensure_clean() as path:
        with open(path, mode="wb") as fh:
            expected.to_csv(fh, mode="wb", compression=compression_only)
            fh.seek(0)  # file shouldn't be closed
        from_disk = pd.read_csv(path, index_col=0, compression=compression_only)
        tm.assert_frame_equal(expected, from_disk)

    # Case 2: in-memory buffer.
    buffer = io.BytesIO()
    expected.to_csv(buffer, mode="wb", compression=compression_only)
    buffer.seek(0)  # file shouldn't be closed
    from_memory = pd.read_csv(buffer, index_col=0, compression=compression_only)
    tm.assert_frame_equal(expected, from_memory)
def test_gzip_reproducibility_file_name():
    """
    Gzip output written to a path is byte-identical when ``mtime`` is fixed.

    The same frame is written twice to the same path, two seconds apart, and
    the resulting bytes must match.

    Note: Archives created with different filenames will still be different!

    GH 28103
    """
    frame = tm.makeDataFrame()
    gzip_options = {"method": "gzip", "mtime": 1}

    # test for filename
    with tm.ensure_clean() as raw_path:
        target = Path(raw_path)

        frame.to_csv(target, compression=gzip_options)
        time.sleep(2)
        first_write = target.read_bytes()

        frame.to_csv(target, compression=gzip_options)
        second_write = target.read_bytes()

        assert first_write == second_write
def test_gzip_reproducibility_file_object():
    """
    Gzip output written to a buffer is byte-identical when ``mtime`` is fixed.

    The same frame is written to two fresh ``BytesIO`` buffers, two seconds
    apart, and the resulting bytes must match.

    GH 28103
    """
    frame = tm.makeDataFrame()
    gzip_options = {"method": "gzip", "mtime": 1}

    def _write_to_fresh_buffer():
        # test for file object
        sink = io.BytesIO()
        frame.to_csv(sink, compression=gzip_options, mode="wb")
        return sink.getvalue()

    first_write = _write_to_fresh_buffer()
    time.sleep(2)
    second_write = _write_to_fresh_buffer()

    assert first_write == second_write
def test_with_missing_lzma():
    """Check that ``import pandas`` succeeds when ``lzma`` cannot be imported."""
    # https://github.com/pandas-dev/pandas/issues/27575
    # The script runs in a child interpreter; ``sys.modules['lzma']`` is set
    # to None there before pandas is imported.
    script = textwrap.dedent(
        """\
        import sys
        sys.modules['lzma'] = None
        import pandas
        """
    )
    command = [sys.executable, "-c", script]
    # A non-zero exit status raises CalledProcessError and fails the test.
    subprocess.check_output(command, stderr=subprocess.PIPE)
def test_with_missing_lzma_runtime():
    """
    Check that requesting ``xz`` compression without ``lzma`` raises.

    A child interpreter sets ``sys.modules['lzma']`` to None, imports pandas
    and asserts that ``to_csv(..., compression='xz')`` raises a
    ``RuntimeError`` matching ``'lzma module'``.
    """
    script = textwrap.dedent(
        """
        import sys
        import pytest
        sys.modules['lzma'] = None
        import pandas as pd
        df = pd.DataFrame()
        with pytest.raises(RuntimeError, match='lzma module'):
            df.to_csv('foo.csv', compression='xz')
        """
    )
    command = [sys.executable, "-c", script]
    # A non-zero exit status raises CalledProcessError and fails the test.
    subprocess.check_output(command, stderr=subprocess.PIPE)
@pytest.mark.parametrize(
    "obj",
    [
        pd.DataFrame(
            100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
            columns=["X", "Y", "Z"],
        ),
        pd.Series(100 * [0.123456, 0.234567, 0.567567], name="X"),
    ],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_gzip_compression_level(obj, method):
    """
    A ``compresslevel`` passed in the compression dict affects gzip output.

    Output written with ``compression="gzip"`` must be smaller than output
    written with ``compresslevel`` 1.
    """
    # GH33196
    write = getattr(obj, method)
    fast_gzip = {"method": "gzip", "compresslevel": 1}

    with tm.ensure_clean() as path:
        write(path, compression="gzip")
        size_default = os.path.getsize(path)

        write(path, compression=fast_gzip)
        size_fast = os.path.getsize(path)

        assert size_default < size_fast
@pytest.mark.parametrize(
    "obj",
    [
        pd.DataFrame(
            100 * [[0.123456, 0.234567, 0.567567], [12.32112, 123123.2, 321321.2]],
            columns=["X", "Y", "Z"],
        ),
        pd.Series(100 * [0.123456, 0.234567, 0.567567], name="X"),
    ],
)
@pytest.mark.parametrize("method", ["to_pickle", "to_json", "to_csv"])
def test_bzip_compression_level(obj, method):
    """
    Smoke test: bz2 accepts ``compresslevel`` through the compression dict.

    GH33196 bzip needs file size > 100k to show a size difference between
    compression levels, so only a successful call is checked here and no
    sizes are compared.
    """
    bz2_options = {"method": "bz2", "compresslevel": 1}
    write = getattr(obj, method)
    with tm.ensure_clean() as path:
        write(path, compression=bz2_options)
@pytest.mark.parametrize(
    "suffix,archive",
    [
        (".zip", zipfile.ZipFile),
        (".tar", tarfile.TarFile),
    ],
)
def test_empty_archive_zip(suffix, archive):
    """Reading an archive that contains no members raises ``ValueError``."""
    with tm.ensure_clean(filename=suffix) as path:
        # Open for writing and close straight away, adding no members.
        archive(path, "w").close()

        with pytest.raises(ValueError, match="Zero files found"):
            pd.read_csv(path)
def test_ambiguous_archive_zip():
    """Reading a ZIP file that holds more than one member raises ``ValueError``."""
    with tm.ensure_clean(filename=".zip") as path:
        with zipfile.ZipFile(path, "w") as bundle:
            for member_name in ("a.csv", "b.csv"):
                bundle.writestr(member_name, "foo,bar")

        with pytest.raises(ValueError, match="Multiple files found in ZIP file"):
            pd.read_csv(path)
def test_ambiguous_archive_tar(tmp_path):
    """Reading a TAR archive that holds more than one member raises ``ValueError``."""
    member_names = ("a.csv", "b.csv")

    # Create the two CSV files that will go into the archive.
    for member_name in member_names:
        with open(tmp_path / member_name, "w") as csv_file:
            csv_file.write("foo,bar\n")

    tarpath = tmp_path / "archive.tar"
    with tarfile.TarFile(tarpath, "w") as tar:
        for member_name in member_names:
            tar.add(tmp_path / member_name, member_name)

    with pytest.raises(ValueError, match="Multiple files found in TAR archive"):
        pd.read_csv(tarpath)
def test_tar_gz_to_different_filename():
    """
    An explicit tar/``w:gz`` request is honoured for a ``.foo`` filename.

    The written file must open as gzip, contain a TAR archive with exactly
    one member, and that member must hold the expected CSV text.
    """
    # The expected line terminator depends on the platform.
    if is_platform_windows():
        expected = "foo,bar\r\n1,2\r\n"
    else:
        expected = "foo,bar\n1,2\n"

    frame = pd.DataFrame([["1", "2"]], columns=["foo", "bar"])
    tar_gz = {"method": "tar", "mode": "w:gz"}

    with tm.ensure_clean(filename=".foo") as file:
        frame.to_csv(file, compression=tar_gz, index=False)

        with gzip.open(file) as gunzipped, tarfile.TarFile(
            fileobj=gunzipped
        ) as archive:
            members = archive.getmembers()
            assert len(members) == 1
            content = archive.extractfile(members[0]).read().decode("utf8")
            assert content == expected
def test_tar_no_error_on_close():
    """Opening ``icom._BytesTarFile`` for writing and closing it empty must not raise."""
    # Contexts exit right to left: the tar wrapper closes before the buffer.
    with io.BytesIO() as buffer, icom._BytesTarFile(fileobj=buffer, mode="w"):
        pass
|