# Round-trip tests for HDFStore serialization (to_hdf / read_hdf).
- import datetime
- import re
- from warnings import (
- catch_warnings,
- simplefilter,
- )
- import numpy as np
- import pytest
- from pandas._libs.tslibs import Timestamp
- from pandas.compat import is_platform_windows
- import pandas as pd
- from pandas import (
- DataFrame,
- Index,
- Series,
- _testing as tm,
- bdate_range,
- read_hdf,
- )
- from pandas.tests.io.pytables.common import (
- _maybe_remove,
- ensure_clean_store,
- )
- from pandas.util import _test_decorators as td
- _default_compressor = "blosc"
- pytestmark = pytest.mark.single_cpu
def test_conv_read_write():
    """Round-trip several pandas objects through ``to_hdf``/``read_hdf``."""
    with tm.ensure_clean() as path:

        def _write_then_read(key, obj, **kwargs):
            # Persist under `key` and immediately re-load from the same file.
            obj.to_hdf(path, key, **kwargs)
            return read_hdf(path, key)

        series = tm.makeTimeSeries()
        tm.assert_series_equal(series, _write_then_read("series", series))

        str_series = tm.makeStringSeries()
        tm.assert_series_equal(str_series, _write_then_read("string_series", str_series))

        frame = tm.makeDataFrame()
        tm.assert_frame_equal(frame, _write_then_read("frame", frame))

        # table format additionally supports row selection via `where`
        df = DataFrame({"A": range(5), "B": range(5)})
        df.to_hdf(path, "table", append=True)
        result = read_hdf(path, "table", where=["index>2"])
        tm.assert_frame_equal(df[df.index > 2], result)
def test_long_strings(setup_path):
    # GH6166: 100-char strings survive a round trip both as a data column
    # and as the index.
    long_strings = tm.rands_array(100, size=10)
    df = DataFrame({"a": long_strings}, index=tm.rands_array(100, size=10))

    with ensure_clean_store(setup_path) as store:
        store.append("df", df, data_columns=["a"])
        tm.assert_frame_equal(df, store.select("df"))
def test_api(tmp_path, setup_path):
    # GH4584
    # API issue when to_hdf doesn't accept append AND format args
    path = tmp_path / setup_path
    df = tm.makeDataFrame()
    head, tail = df.iloc[:10], df.iloc[10:]

    # append=True on a fresh file creates the table, then extends it
    head.to_hdf(path, "df", append=True, format="table")
    tail.to_hdf(path, "df", append=True, format="table")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    # append=False overwrites whatever was stored before
    head.to_hdf(path, "df", append=False, format="table")
    tail.to_hdf(path, "df", append=True, format="table")
    tm.assert_frame_equal(read_hdf(path, "df"), df)
def test_api_append(tmp_path, setup_path):
    """append=True works with or without an explicit format="table"."""
    path = tmp_path / setup_path
    df = tm.makeDataFrame()
    head, tail = df.iloc[:10], df.iloc[10:]

    head.to_hdf(path, "df", append=True)
    tail.to_hdf(path, "df", append=True, format="table")
    tm.assert_frame_equal(read_hdf(path, "df"), df)

    # append=False overwrites; the follow-up append extends
    head.to_hdf(path, "df", append=False, format="table")
    tail.to_hdf(path, "df", append=True)
    tm.assert_frame_equal(read_hdf(path, "df"), df)
def test_api_2(tmp_path, setup_path):
    """The fixed format may be spelled out, abbreviated, or left implicit."""
    path = tmp_path / setup_path
    df = tm.makeDataFrame()

    for kwargs in (
        {"append": False, "format": "fixed"},
        {"append": False, "format": "f"},
        {"append": False},
        {},
    ):
        df.to_hdf(path, "df", **kwargs)
        tm.assert_frame_equal(read_hdf(path, "df"), df)

    with ensure_clean_store(setup_path) as store:
        df = tm.makeDataFrame()

        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=True, format="table")
        store.append("df", df.iloc[10:], append=True, format="table")
        tm.assert_frame_equal(store.select("df"), df)

        # append to False replaces, then append=True extends
        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=False, format="table")
        store.append("df", df.iloc[10:], append=True, format="table")
        tm.assert_frame_equal(store.select("df"), df)

        # formats: explicit "table" on both halves
        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=False, format="table")
        store.append("df", df.iloc[10:], append=True, format="table")
        tm.assert_frame_equal(store.select("df"), df)

        # format=None on append defaults to the table format
        _maybe_remove(store, "df")
        store.append("df", df.iloc[:10], append=False, format="table")
        store.append("df", df.iloc[10:], append=True, format=None)
        tm.assert_frame_equal(store.select("df"), df)
def test_api_invalid(tmp_path, setup_path):
    path = tmp_path / setup_path

    # Invalid.
    df = tm.makeDataFrame()

    # appending is only supported for the table format
    msg = "Can only append to Tables"
    for fmt in ("f", "fixed"):
        with pytest.raises(ValueError, match=msg):
            df.to_hdf(path, "df", append=True, format=fmt)

    # unknown format names are rejected regardless of append
    msg = r"invalid HDFStore format specified \[foo\]"
    for append in (True, False):
        with pytest.raises(TypeError, match=msg):
            df.to_hdf(path, "df", append=append, format="foo")

    # File path doesn't exist
    path = ""
    msg = f"File {path} does not exist"
    with pytest.raises(FileNotFoundError, match=msg):
        read_hdf(path, "df")
def test_get(setup_path):
    """``store.get`` matches ``__getitem__`` and raises KeyError for misses."""
    with ensure_clean_store(setup_path) as store:
        store["a"] = tm.makeTimeSeries()

        # with or without a leading slash, both access styles agree
        for key in ("a", "/a"):
            tm.assert_series_equal(store.get(key), store[key])

        with pytest.raises(KeyError, match="'No object named b in the file'"):
            store.get("b")
def test_put_integer(setup_path):
    # non-date, non-string index
    frame = DataFrame(np.random.randn(50, 100))
    _check_roundtrip(frame, tm.assert_frame_equal, setup_path)
def test_table_values_dtypes_roundtrip(setup_path):
    """Table-format storage preserves column dtypes exactly."""
    with ensure_clean_store(setup_path) as store:
        df_float = DataFrame({"a": [1, 2, 3]}, dtype="f8")
        store.append("df_f8", df_float)
        tm.assert_series_equal(df_float.dtypes, store["df_f8"].dtypes)

        df_int = DataFrame({"a": [1, 2, 3]}, dtype="i8")
        store.append("df_i8", df_int)
        tm.assert_series_equal(df_int.dtypes, store["df_i8"].dtypes)

        # incompatible dtype: appending float64 data onto an int64 table raises
        msg = re.escape(
            "invalid combination of [values_axes] on appending data "
            "[name->values_block_0,cname->values_block_0,"
            "dtype->float64,kind->float,shape->(1, 3)] vs "
            "current table [name->values_block_0,"
            "cname->values_block_0,dtype->int64,kind->integer,"
            "shape->None]"
        )
        with pytest.raises(ValueError, match=msg):
            store.append("df_i8", df_float)

        # check creation/storage/retrieval of float32 (a bit hacky to
        # actually create them thought)
        df_f32 = DataFrame(np.array([[1], [2], [3]], dtype="f4"), columns=["A"])
        store.append("df_f4", df_f32)
        tm.assert_series_equal(df_f32.dtypes, store["df_f4"].dtypes)
        assert df_f32.dtypes[0] == "float32"

        # check with mixed dtypes
        mixed = DataFrame(
            {
                c: Series(np.random.randint(5), dtype=c)
                for c in ["float32", "float64", "int32", "int64", "int16", "int8"]
            }
        )
        mixed["string"] = "foo"
        mixed["float322"] = 1.0
        mixed["float322"] = mixed["float322"].astype("float32")
        mixed["bool"] = mixed["float32"] > 0
        mixed["time1"] = Timestamp("20130101")
        mixed["time2"] = Timestamp("20130102")
        store.append("df_mixed_dtypes1", mixed)

        result = store.select("df_mixed_dtypes1").dtypes.value_counts()
        result.index = [str(i) for i in result.index]
        expected = Series(
            {
                "float32": 2,
                "float64": 1,
                "int32": 1,
                "bool": 1,
                "int16": 1,
                "int8": 1,
                "int64": 1,
                "object": 1,
                "datetime64[ns]": 2,
            },
            name="count",
        )
        tm.assert_series_equal(result.sort_index(), expected.sort_index())
@pytest.mark.filterwarnings("ignore::pandas.errors.PerformanceWarning")
def test_series(setup_path):
    """Round-trip Series with string, datetime and object indexes."""
    _check_roundtrip(tm.makeStringSeries(), tm.assert_series_equal, path=setup_path)

    ts = tm.makeTimeSeries()
    _check_roundtrip(ts, tm.assert_series_equal, path=setup_path)

    # datetime values carried on an object-dtype index
    ts2 = Series(ts.index, Index(ts.index, dtype=object))
    _check_roundtrip(ts2, tm.assert_series_equal, path=setup_path)

    ts3 = Series(ts.values, Index(np.asarray(ts.index, dtype=object), dtype=object))
    _check_roundtrip(
        ts3, tm.assert_series_equal, path=setup_path, check_index_type=False
    )
def test_float_index(setup_path):
    # GH #454
    float_index = np.random.randn(10)
    ser = Series(np.random.randn(10), index=float_index)
    _check_roundtrip(ser, tm.assert_series_equal, path=setup_path)
def test_tuple_index(setup_path):
    # GH #492
    cols = np.arange(10)
    tuples = [(0.0, 1.0), (2.0, 3.0), (4.0, 5.0)]
    values = np.random.randn(30).reshape((3, 10))
    frame = DataFrame(values, index=tuples, columns=cols)

    # a tuple index emits a PerformanceWarning we deliberately ignore
    with catch_warnings(record=True):
        simplefilter("ignore", pd.errors.PerformanceWarning)
        _check_roundtrip(frame, tm.assert_frame_equal, path=setup_path)
@pytest.mark.filterwarnings("ignore::pandas.errors.PerformanceWarning")
def test_index_types(setup_path):
    """Round-trip Series over a variety of heterogeneous index types."""
    with catch_warnings(record=True):
        values = np.random.randn(2)

        def func(lhs, rhs):
            tm.assert_series_equal(lhs, rhs, check_index_type=True)

    # each of these mixed indexes is exercised under its own warning scope
    for idx in (
        [0, "y"],
        [datetime.datetime.today(), 0],
        ["y", 0],
        [datetime.date.today(), "a"],
    ):
        with catch_warnings(record=True):
            _check_roundtrip(Series(values, idx), func, path=setup_path)

    with catch_warnings(record=True):
        for idx in (
            [0, "y"],
            [datetime.datetime.today(), 0],
            ["y", 0],
            [datetime.date.today(), "a"],
            [1.23, "b"],
            [1, 1.53],
            [1, 5],
            [datetime.datetime(2012, 1, 1), datetime.datetime(2012, 1, 2)],
        ):
            _check_roundtrip(Series(values, idx), func, path=setup_path)
def test_timeseries_preepoch(setup_path, request):
    """Timestamps entirely before the Unix epoch round-trip correctly."""
    index = bdate_range("1/1/1940", "1/1/1960")
    ts = Series(np.random.randn(len(index)), index=index)
    try:
        _check_roundtrip(ts, tm.assert_series_equal, path=setup_path)
    except OverflowError:
        if is_platform_windows():
            # pre-epoch dates are known to overflow on some Windows builds
            request.node.add_marker(
                pytest.mark.xfail("known failure on some windows platforms")
            )
        raise
@pytest.mark.parametrize(
    "compression", [False, pytest.param(True, marks=td.skip_if_windows)]
)
def test_frame(compression, setup_path):
    frame = tm.makeDataFrame()

    # put in some random NAs
    frame.iloc[0, 0] = np.nan
    frame.iloc[5, 3] = np.nan

    _check_roundtrip_table(
        frame, tm.assert_frame_equal, path=setup_path, compression=compression
    )
    _check_roundtrip(
        frame, tm.assert_frame_equal, path=setup_path, compression=compression
    )

    time_frame = tm.makeTimeDataFrame()
    _check_roundtrip(
        time_frame, tm.assert_frame_equal, path=setup_path, compression=compression
    )

    with ensure_clean_store(setup_path) as store:
        # not consolidated
        frame["foo"] = np.random.randn(len(frame))
        store["df"] = frame
        recons = store["df"]
        assert recons._mgr.is_consolidated()

    # empty
    _check_roundtrip(frame[:0], tm.assert_frame_equal, path=setup_path)
def test_empty_series_frame(setup_path):
    """Empty (and name-only / single-axis) objects survive a round trip."""
    for ser in (Series(dtype=object), Series(name="myseries", dtype=object)):
        _check_roundtrip(ser, tm.assert_series_equal, path=setup_path)

    for df in (
        DataFrame(),
        DataFrame(index=["a", "b", "c"]),
        DataFrame(columns=["d", "e", "f"]),
    ):
        _check_roundtrip(df, tm.assert_frame_equal, path=setup_path)
@pytest.mark.parametrize("dtype", [np.int64, np.float64, object, "m8[ns]", "M8[ns]"])
def test_empty_series(dtype, setup_path):
    # an empty Series keeps its dtype through the store
    ser = Series(dtype=dtype)
    _check_roundtrip(ser, tm.assert_series_equal, path=setup_path)
def test_can_serialize_dates(setup_path):
    # index of plain datetime.date objects (not Timestamps)
    dates = [x.date() for x in bdate_range("1/1/2000", "1/30/2000")]
    frame = DataFrame(np.random.randn(len(dates), 4), index=dates)
    _check_roundtrip(frame, tm.assert_frame_equal, path=setup_path)
def test_store_hierarchical(setup_path, multiindex_dataframe_random_data):
    """MultiIndex frames (and their transposes/columns) round-trip intact."""
    frame = multiindex_dataframe_random_data

    _check_roundtrip(frame, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(frame.T, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(frame["A"], tm.assert_series_equal, path=setup_path)

    # check that the names are stored
    with ensure_clean_store(setup_path) as store:
        store["frame"] = frame
        tm.assert_frame_equal(store["frame"], frame)
@pytest.mark.parametrize(
    "compression", [False, pytest.param(True, marks=td.skip_if_windows)]
)
def test_store_mixed(compression, setup_path):
    def _make_one():
        # float base frame augmented with object, bool and int columns
        df = tm.makeDataFrame()
        df["obj1"] = "foo"
        df["obj2"] = "bar"
        df["bool1"] = df["A"] > 0
        df["bool2"] = df["B"] > 0
        df["int1"] = 1
        df["int2"] = 2
        return df._consolidate()

    df1 = _make_one()
    df2 = _make_one()

    _check_roundtrip(df1, tm.assert_frame_equal, path=setup_path)
    _check_roundtrip(df2, tm.assert_frame_equal, path=setup_path)

    with ensure_clean_store(setup_path) as store:
        # the same key can be overwritten in place
        store["obj"] = df1
        tm.assert_frame_equal(store["obj"], df1)
        store["obj"] = df2
        tm.assert_frame_equal(store["obj"], df2)

        # check that can store Series of all of these types
        for column in ("obj1", "bool1", "int1"):
            _check_roundtrip(
                df1[column],
                tm.assert_series_equal,
                path=setup_path,
                compression=compression,
            )
def _check_roundtrip(obj, comparator, path, compression=False, **kwargs):
    """Write ``obj`` to a fresh store, read it back and compare.

    ``comparator`` is called as ``comparator(retrieved, obj, **kwargs)``;
    when ``compression`` is truthy the store is opened with the module's
    default compressor.
    """
    options = {"complib": _default_compressor} if compression else {}
    with ensure_clean_store(path, "w", **options) as store:
        store["obj"] = obj
        comparator(store["obj"], obj, **kwargs)
def _check_roundtrip_table(obj, comparator, path, compression=False):
    """Write ``obj`` in the queryable "table" format, read back and compare."""
    options = {"complib": _default_compressor} if compression else {}
    with ensure_clean_store(path, "w", **options) as store:
        store.put("obj", obj, format="table")
        comparator(store["obj"], obj)
def test_unicode_index(setup_path):
    unicode_values = ["\u03c3", "\u03c3\u03c3"]

    # PerformanceWarning
    with catch_warnings(record=True):
        simplefilter("ignore", pd.errors.PerformanceWarning)
        ser = Series(np.random.randn(len(unicode_values)), unicode_values)
        _check_roundtrip(ser, tm.assert_series_equal, path=setup_path)
def test_unicode_longer_encoded(setup_path):
    # GH 11234
    char = "\u0394"
    for df in (
        DataFrame({"A": [char]}),
        DataFrame({"A": ["a", char], "B": ["b", "b"]}),
    ):
        with ensure_clean_store(setup_path) as store:
            store.put("df", df, format="table", encoding="utf-8")
            tm.assert_frame_equal(store.get("df"), df)
def test_store_datetime_mixed(setup_path):
    # frame mixing int, float, string and datetime columns
    frame = DataFrame({"a": [1, 2, 3], "b": [1.0, 2.0, 3.0], "c": ["a", "b", "c"]})
    ts = tm.makeTimeSeries()
    frame["d"] = ts.index[:3]
    _check_roundtrip(frame, tm.assert_frame_equal, path=setup_path)
def test_round_trip_equals(tmp_path, setup_path):
    # GH 9330
    frame = DataFrame({"B": [1, 2], "A": ["x", "y"]})

    path = tmp_path / setup_path
    frame.to_hdf(path, "df", format="table")
    other = read_hdf(path, "df")

    # equality holds both via the test helper and DataFrame.equals, both ways
    tm.assert_frame_equal(frame, other)
    assert frame.equals(other)
    assert other.equals(frame)
|