123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449 |
- import os
- import numpy as np
- import pytest
- from pandas.compat import (
- PY311,
- is_ci_environment,
- is_platform_linux,
- is_platform_little_endian,
- )
- from pandas.errors import (
- ClosedFileError,
- PossibleDataLossError,
- )
- from pandas import (
- DataFrame,
- HDFStore,
- Series,
- _testing as tm,
- read_hdf,
- )
- from pandas.tests.io.pytables.common import (
- _maybe_remove,
- ensure_clean_store,
- tables,
- )
- from pandas.io import pytables
- from pandas.io.pytables import Term
- pytestmark = pytest.mark.single_cpu
- @pytest.mark.parametrize("mode", ["r", "r+", "a", "w"])
- def test_mode(setup_path, tmp_path, mode):
- df = tm.makeTimeDataFrame()
- msg = r"[\S]* does not exist"
- path = tmp_path / setup_path
- # constructor
- if mode in ["r", "r+"]:
- with pytest.raises(OSError, match=msg):
- HDFStore(path, mode=mode)
- else:
- with HDFStore(path, mode=mode) as store:
- assert store._handle.mode == mode
- path = tmp_path / setup_path
- # context
- if mode in ["r", "r+"]:
- with pytest.raises(OSError, match=msg):
- with HDFStore(path, mode=mode) as store:
- pass
- else:
- with HDFStore(path, mode=mode) as store:
- assert store._handle.mode == mode
- path = tmp_path / setup_path
- # conv write
- if mode in ["r", "r+"]:
- with pytest.raises(OSError, match=msg):
- df.to_hdf(path, "df", mode=mode)
- df.to_hdf(path, "df", mode="w")
- else:
- df.to_hdf(path, "df", mode=mode)
- # conv read
- if mode in ["w"]:
- msg = (
- "mode w is not allowed while performing a read. "
- r"Allowed modes are r, r\+ and a."
- )
- with pytest.raises(ValueError, match=msg):
- read_hdf(path, "df", mode=mode)
- else:
- result = read_hdf(path, "df", mode=mode)
- tm.assert_frame_equal(result, df)
- def test_default_mode(tmp_path, setup_path):
- # read_hdf uses default mode
- df = tm.makeTimeDataFrame()
- path = tmp_path / setup_path
- df.to_hdf(path, "df", mode="w")
- result = read_hdf(path, "df")
- tm.assert_frame_equal(result, df)
- def test_reopen_handle(tmp_path, setup_path):
- path = tmp_path / setup_path
- store = HDFStore(path, mode="a")
- store["a"] = tm.makeTimeSeries()
- msg = (
- r"Re-opening the file \[[\S]*\] with mode \[a\] will delete the "
- "current file!"
- )
- # invalid mode change
- with pytest.raises(PossibleDataLossError, match=msg):
- store.open("w")
- store.close()
- assert not store.is_open
- # truncation ok here
- store.open("w")
- assert store.is_open
- assert len(store) == 0
- store.close()
- assert not store.is_open
- store = HDFStore(path, mode="a")
- store["a"] = tm.makeTimeSeries()
- # reopen as read
- store.open("r")
- assert store.is_open
- assert len(store) == 1
- assert store._mode == "r"
- store.close()
- assert not store.is_open
- # reopen as append
- store.open("a")
- assert store.is_open
- assert len(store) == 1
- assert store._mode == "a"
- store.close()
- assert not store.is_open
- # reopen as append (again)
- store.open("a")
- assert store.is_open
- assert len(store) == 1
- assert store._mode == "a"
- store.close()
- assert not store.is_open
- def test_open_args(setup_path):
- with tm.ensure_clean(setup_path) as path:
- df = tm.makeDataFrame()
- # create an in memory store
- store = HDFStore(
- path, mode="a", driver="H5FD_CORE", driver_core_backing_store=0
- )
- store["df"] = df
- store.append("df2", df)
- tm.assert_frame_equal(store["df"], df)
- tm.assert_frame_equal(store["df2"], df)
- store.close()
- # the file should not have actually been written
- assert not os.path.exists(path)
- def test_flush(setup_path):
- with ensure_clean_store(setup_path) as store:
- store["a"] = tm.makeTimeSeries()
- store.flush()
- store.flush(fsync=True)
- def test_complibs_default_settings(tmp_path, setup_path):
- # GH15943
- df = tm.makeDataFrame()
- # Set complevel and check if complib is automatically set to
- # default value
- tmpfile = tmp_path / setup_path
- df.to_hdf(tmpfile, "df", complevel=9)
- result = read_hdf(tmpfile, "df")
- tm.assert_frame_equal(result, df)
- with tables.open_file(tmpfile, mode="r") as h5file:
- for node in h5file.walk_nodes(where="/df", classname="Leaf"):
- assert node.filters.complevel == 9
- assert node.filters.complib == "zlib"
- # Set complib and check to see if compression is disabled
- tmpfile = tmp_path / setup_path
- df.to_hdf(tmpfile, "df", complib="zlib")
- result = read_hdf(tmpfile, "df")
- tm.assert_frame_equal(result, df)
- with tables.open_file(tmpfile, mode="r") as h5file:
- for node in h5file.walk_nodes(where="/df", classname="Leaf"):
- assert node.filters.complevel == 0
- assert node.filters.complib is None
- # Check if not setting complib or complevel results in no compression
- tmpfile = tmp_path / setup_path
- df.to_hdf(tmpfile, "df")
- result = read_hdf(tmpfile, "df")
- tm.assert_frame_equal(result, df)
- with tables.open_file(tmpfile, mode="r") as h5file:
- for node in h5file.walk_nodes(where="/df", classname="Leaf"):
- assert node.filters.complevel == 0
- assert node.filters.complib is None
- def test_complibs_default_settings_override(tmp_path, setup_path):
- # Check if file-defaults can be overridden on a per table basis
- df = tm.makeDataFrame()
- tmpfile = tmp_path / setup_path
- store = HDFStore(tmpfile)
- store.append("dfc", df, complevel=9, complib="blosc")
- store.append("df", df)
- store.close()
- with tables.open_file(tmpfile, mode="r") as h5file:
- for node in h5file.walk_nodes(where="/df", classname="Leaf"):
- assert node.filters.complevel == 0
- assert node.filters.complib is None
- for node in h5file.walk_nodes(where="/dfc", classname="Leaf"):
- assert node.filters.complevel == 9
- assert node.filters.complib == "blosc"
- @pytest.mark.parametrize("lvl", range(10))
- @pytest.mark.parametrize("lib", tables.filters.all_complibs)
- @pytest.mark.filterwarnings("ignore:object name is not a valid")
- @pytest.mark.skipif(
- not PY311 and is_ci_environment() and is_platform_linux(),
- reason="Segfaulting in a CI environment"
- # with xfail, would sometimes raise UnicodeDecodeError
- # invalid state byte
- )
- def test_complibs(tmp_path, lvl, lib):
- # GH14478
- df = DataFrame(
- np.ones((30, 4)), columns=list("ABCD"), index=np.arange(30).astype(np.str_)
- )
- # Remove lzo if its not available on this platform
- if not tables.which_lib_version("lzo"):
- pytest.skip("lzo not available")
- # Remove bzip2 if its not available on this platform
- if not tables.which_lib_version("bzip2"):
- pytest.skip("bzip2 not available")
- tmpfile = tmp_path / f"{lvl}_{lib}.h5"
- gname = f"{lvl}_{lib}"
- # Write and read file to see if data is consistent
- df.to_hdf(tmpfile, gname, complib=lib, complevel=lvl)
- result = read_hdf(tmpfile, gname)
- tm.assert_frame_equal(result, df)
- # Open file and check metadata for correct amount of compression
- with tables.open_file(tmpfile, mode="r") as h5table:
- for node in h5table.walk_nodes(where="/" + gname, classname="Leaf"):
- assert node.filters.complevel == lvl
- if lvl == 0:
- assert node.filters.complib is None
- else:
- assert node.filters.complib == lib
- @pytest.mark.skipif(
- not is_platform_little_endian(), reason="reason platform is not little endian"
- )
- def test_encoding(setup_path):
- with ensure_clean_store(setup_path) as store:
- df = DataFrame({"A": "foo", "B": "bar"}, index=range(5))
- df.loc[2, "A"] = np.nan
- df.loc[3, "B"] = np.nan
- _maybe_remove(store, "df")
- store.append("df", df, encoding="ascii")
- tm.assert_frame_equal(store["df"], df)
- expected = df.reindex(columns=["A"])
- result = store.select("df", Term("columns=A", encoding="ascii"))
- tm.assert_frame_equal(result, expected)
- @pytest.mark.parametrize(
- "val",
- [
- [b"E\xc9, 17", b"", b"a", b"b", b"c"],
- [b"E\xc9, 17", b"a", b"b", b"c"],
- [b"EE, 17", b"", b"a", b"b", b"c"],
- [b"E\xc9, 17", b"\xf8\xfc", b"a", b"b", b"c"],
- [b"", b"a", b"b", b"c"],
- [b"\xf8\xfc", b"a", b"b", b"c"],
- [b"A\xf8\xfc", b"", b"a", b"b", b"c"],
- [np.nan, b"", b"b", b"c"],
- [b"A\xf8\xfc", np.nan, b"", b"b", b"c"],
- ],
- )
- @pytest.mark.parametrize("dtype", ["category", object])
- def test_latin_encoding(tmp_path, setup_path, dtype, val):
- enc = "latin-1"
- nan_rep = ""
- key = "data"
- val = [x.decode(enc) if isinstance(x, bytes) else x for x in val]
- ser = Series(val, dtype=dtype)
- store = tmp_path / setup_path
- ser.to_hdf(store, key, format="table", encoding=enc, nan_rep=nan_rep)
- retr = read_hdf(store, key)
- s_nan = ser.replace(nan_rep, np.nan)
- tm.assert_series_equal(s_nan, retr)
- def test_multiple_open_close(tmp_path, setup_path):
- # gh-4409: open & close multiple times
- path = tmp_path / setup_path
- df = tm.makeDataFrame()
- df.to_hdf(path, "df", mode="w", format="table")
- # single
- store = HDFStore(path)
- assert "CLOSED" not in store.info()
- assert store.is_open
- store.close()
- assert "CLOSED" in store.info()
- assert not store.is_open
- path = tmp_path / setup_path
- if pytables._table_file_open_policy_is_strict:
- # multiples
- store1 = HDFStore(path)
- msg = (
- r"The file [\S]* is already opened\. Please close it before "
- r"reopening in write mode\."
- )
- with pytest.raises(ValueError, match=msg):
- HDFStore(path)
- store1.close()
- else:
- # multiples
- store1 = HDFStore(path)
- store2 = HDFStore(path)
- assert "CLOSED" not in store1.info()
- assert "CLOSED" not in store2.info()
- assert store1.is_open
- assert store2.is_open
- store1.close()
- assert "CLOSED" in store1.info()
- assert not store1.is_open
- assert "CLOSED" not in store2.info()
- assert store2.is_open
- store2.close()
- assert "CLOSED" in store1.info()
- assert "CLOSED" in store2.info()
- assert not store1.is_open
- assert not store2.is_open
- # nested close
- store = HDFStore(path, mode="w")
- store.append("df", df)
- store2 = HDFStore(path)
- store2.append("df2", df)
- store2.close()
- assert "CLOSED" in store2.info()
- assert not store2.is_open
- store.close()
- assert "CLOSED" in store.info()
- assert not store.is_open
- # double closing
- store = HDFStore(path, mode="w")
- store.append("df", df)
- store2 = HDFStore(path)
- store.close()
- assert "CLOSED" in store.info()
- assert not store.is_open
- store2.close()
- assert "CLOSED" in store2.info()
- assert not store2.is_open
- # ops on a closed store
- path = tmp_path / setup_path
- df = tm.makeDataFrame()
- df.to_hdf(path, "df", mode="w", format="table")
- store = HDFStore(path)
- store.close()
- msg = r"[\S]* file is not open!"
- with pytest.raises(ClosedFileError, match=msg):
- store.keys()
- with pytest.raises(ClosedFileError, match=msg):
- "df" in store
- with pytest.raises(ClosedFileError, match=msg):
- len(store)
- with pytest.raises(ClosedFileError, match=msg):
- store["df"]
- with pytest.raises(ClosedFileError, match=msg):
- store.select("df")
- with pytest.raises(ClosedFileError, match=msg):
- store.get("df")
- with pytest.raises(ClosedFileError, match=msg):
- store.append("df2", df)
- with pytest.raises(ClosedFileError, match=msg):
- store.put("df3", df)
- with pytest.raises(ClosedFileError, match=msg):
- store.get_storer("df2")
- with pytest.raises(ClosedFileError, match=msg):
- store.remove("df2")
- with pytest.raises(ClosedFileError, match=msg):
- store.select("df")
- msg = "'HDFStore' object has no attribute 'df'"
- with pytest.raises(AttributeError, match=msg):
- store.df
- def test_fspath():
- with tm.ensure_clean("foo.h5") as path:
- with HDFStore(path) as store:
- assert os.fspath(store) == str(path)
|