# test_sas7bdat.py - tests for pandas' SAS7BDAT reader (pandas/io/sas)
import contextlib
from datetime import (
    datetime,
    timedelta,
)
import io
import os
from pathlib import Path

import dateutil.parser
import numpy as np
import pytest

from pandas.errors import EmptyDataError
import pandas.util._test_decorators as td

import pandas as pd
import pandas._testing as tm
  13. @pytest.fixture
  14. def dirpath(datapath):
  15. return datapath("io", "sas", "data")
  16. @pytest.fixture(params=[(1, range(1, 16)), (2, [16])])
  17. def data_test_ix(request, dirpath):
  18. i, test_ix = request.param
  19. fname = os.path.join(dirpath, f"test_sas7bdat_{i}.csv")
  20. df = pd.read_csv(fname)
  21. epoch = datetime(1960, 1, 1)
  22. t1 = pd.to_timedelta(df["Column4"], unit="d")
  23. df["Column4"] = epoch + t1
  24. t2 = pd.to_timedelta(df["Column12"], unit="d")
  25. df["Column12"] = epoch + t2
  26. for k in range(df.shape[1]):
  27. col = df.iloc[:, k]
  28. if col.dtype == np.int64:
  29. df.isetitem(k, df.iloc[:, k].astype(np.float64))
  30. return df, test_ix
# https://github.com/cython/cython/issues/1720
class TestSAS7BDAT:
    """Round-trip tests comparing sas7bdat files against CSV-derived frames."""

    @pytest.mark.slow
    def test_from_file(self, dirpath, data_test_ix):
        # Read each sas7bdat file by path and compare with the reference frame.
        df0, test_ix = data_test_ix
        for k in test_ix:
            fname = os.path.join(dirpath, f"test{k}.sas7bdat")
            df = pd.read_sas(fname, encoding="utf-8")
            tm.assert_frame_equal(df, df0)

    @pytest.mark.slow
    def test_from_buffer(self, dirpath, data_test_ix):
        # Read from an in-memory buffer; format must then be given explicitly
        # since there is no filename extension to infer it from.
        df0, test_ix = data_test_ix
        for k in test_ix:
            fname = os.path.join(dirpath, f"test{k}.sas7bdat")
            with open(fname, "rb") as f:
                byts = f.read()
            buf = io.BytesIO(byts)
            with pd.read_sas(
                buf, format="sas7bdat", iterator=True, encoding="utf-8"
            ) as rdr:
                df = rdr.read()
            tm.assert_frame_equal(df, df0, check_exact=False)

    @pytest.mark.slow
    def test_from_iterator(self, dirpath, data_test_ix):
        # Incremental reads must yield consecutive row slices of the full frame.
        df0, test_ix = data_test_ix
        for k in test_ix:
            fname = os.path.join(dirpath, f"test{k}.sas7bdat")
            with pd.read_sas(fname, iterator=True, encoding="utf-8") as rdr:
                df = rdr.read(2)
                tm.assert_frame_equal(df, df0.iloc[0:2, :])
                df = rdr.read(3)
                tm.assert_frame_equal(df, df0.iloc[2:5, :])

    @pytest.mark.slow
    def test_path_pathlib(self, dirpath, data_test_ix):
        # pathlib.Path inputs are accepted as well as plain string paths.
        df0, test_ix = data_test_ix
        for k in test_ix:
            fname = Path(os.path.join(dirpath, f"test{k}.sas7bdat"))
            df = pd.read_sas(fname, encoding="utf-8")
            tm.assert_frame_equal(df, df0)

    @td.skip_if_no("py.path")
    @pytest.mark.slow
    def test_path_localpath(self, dirpath, data_test_ix):
        # py.path.local inputs are accepted (optional dependency, hence skip_if_no).
        from py.path import local as LocalPath

        df0, test_ix = data_test_ix
        for k in test_ix:
            fname = LocalPath(os.path.join(dirpath, f"test{k}.sas7bdat"))
            df = pd.read_sas(fname, encoding="utf-8")
            tm.assert_frame_equal(df, df0)

    @pytest.mark.slow
    @pytest.mark.parametrize("chunksize", (3, 5, 10, 11))
    @pytest.mark.parametrize("k", range(1, 17))
    def test_iterator_loop(self, dirpath, k, chunksize):
        # github #13654
        # Summed chunk sizes over a full iteration must equal the row count.
        fname = os.path.join(dirpath, f"test{k}.sas7bdat")
        with pd.read_sas(fname, chunksize=chunksize, encoding="utf-8") as rdr:
            y = 0
            for x in rdr:
                y += x.shape[0]
        assert y == rdr.row_count

    def test_iterator_read_too_much(self, dirpath):
        # github #14734
        # Requesting more rows than exist must not fail; both call styles agree.
        fname = os.path.join(dirpath, "test1.sas7bdat")
        with pd.read_sas(
            fname, format="sas7bdat", iterator=True, encoding="utf-8"
        ) as rdr:
            d1 = rdr.read(rdr.row_count + 20)

        with pd.read_sas(fname, iterator=True, encoding="utf-8") as rdr:
            d2 = rdr.read(rdr.row_count + 20)
        tm.assert_frame_equal(d1, d2)
  100. def test_encoding_options(datapath):
  101. fname = datapath("io", "sas", "data", "test1.sas7bdat")
  102. df1 = pd.read_sas(fname)
  103. df2 = pd.read_sas(fname, encoding="utf-8")
  104. for col in df1.columns:
  105. try:
  106. df1[col] = df1[col].str.decode("utf-8")
  107. except AttributeError:
  108. pass
  109. tm.assert_frame_equal(df1, df2)
  110. from pandas.io.sas.sas7bdat import SAS7BDATReader
  111. with contextlib.closing(SAS7BDATReader(fname, convert_header_text=False)) as rdr:
  112. df3 = rdr.read()
  113. for x, y in zip(df1.columns, df3.columns):
  114. assert x == y.decode()
  115. def test_encoding_infer(datapath):
  116. fname = datapath("io", "sas", "data", "test1.sas7bdat")
  117. with pd.read_sas(fname, encoding="infer", iterator=True) as df1_reader:
  118. # check: is encoding inferred correctly from file
  119. assert df1_reader.inferred_encoding == "cp1252"
  120. df1 = df1_reader.read()
  121. with pd.read_sas(fname, encoding="cp1252", iterator=True) as df2_reader:
  122. df2 = df2_reader.read()
  123. # check: reader reads correct information
  124. tm.assert_frame_equal(df1, df2)
  125. def test_productsales(datapath):
  126. fname = datapath("io", "sas", "data", "productsales.sas7bdat")
  127. df = pd.read_sas(fname, encoding="utf-8")
  128. fname = datapath("io", "sas", "data", "productsales.csv")
  129. df0 = pd.read_csv(fname, parse_dates=["MONTH"])
  130. vn = ["ACTUAL", "PREDICT", "QUARTER", "YEAR"]
  131. df0[vn] = df0[vn].astype(np.float64)
  132. tm.assert_frame_equal(df, df0)
  133. def test_12659(datapath):
  134. fname = datapath("io", "sas", "data", "test_12659.sas7bdat")
  135. df = pd.read_sas(fname)
  136. fname = datapath("io", "sas", "data", "test_12659.csv")
  137. df0 = pd.read_csv(fname)
  138. df0 = df0.astype(np.float64)
  139. tm.assert_frame_equal(df, df0)
  140. def test_airline(datapath):
  141. fname = datapath("io", "sas", "data", "airline.sas7bdat")
  142. df = pd.read_sas(fname)
  143. fname = datapath("io", "sas", "data", "airline.csv")
  144. df0 = pd.read_csv(fname)
  145. df0 = df0.astype(np.float64)
  146. tm.assert_frame_equal(df, df0, check_exact=False)
  147. def test_date_time(datapath):
  148. # Support of different SAS date/datetime formats (PR #15871)
  149. fname = datapath("io", "sas", "data", "datetime.sas7bdat")
  150. df = pd.read_sas(fname)
  151. fname = datapath("io", "sas", "data", "datetime.csv")
  152. df0 = pd.read_csv(
  153. fname, parse_dates=["Date1", "Date2", "DateTime", "DateTimeHi", "Taiw"]
  154. )
  155. # GH 19732: Timestamps imported from sas will incur floating point errors
  156. df[df.columns[3]] = df.iloc[:, 3].dt.round("us")
  157. tm.assert_frame_equal(df, df0)
  158. @pytest.mark.parametrize("column", ["WGT", "CYL"])
  159. def test_compact_numerical_values(datapath, column):
  160. # Regression test for #21616
  161. fname = datapath("io", "sas", "data", "cars.sas7bdat")
  162. df = pd.read_sas(fname, encoding="latin-1")
  163. # The two columns CYL and WGT in cars.sas7bdat have column
  164. # width < 8 and only contain integral values.
  165. # Test that pandas doesn't corrupt the numbers by adding
  166. # decimals.
  167. result = df[column]
  168. expected = df[column].round()
  169. tm.assert_series_equal(result, expected, check_exact=True)
  170. def test_many_columns(datapath):
  171. # Test for looking for column information in more places (PR #22628)
  172. fname = datapath("io", "sas", "data", "many_columns.sas7bdat")
  173. df = pd.read_sas(fname, encoding="latin-1")
  174. fname = datapath("io", "sas", "data", "many_columns.csv")
  175. df0 = pd.read_csv(fname, encoding="latin-1")
  176. tm.assert_frame_equal(df, df0)
  177. def test_inconsistent_number_of_rows(datapath):
  178. # Regression test for issue #16615. (PR #22628)
  179. fname = datapath("io", "sas", "data", "load_log.sas7bdat")
  180. df = pd.read_sas(fname, encoding="latin-1")
  181. assert len(df) == 2097
  182. def test_zero_variables(datapath):
  183. # Check if the SAS file has zero variables (PR #18184)
  184. fname = datapath("io", "sas", "data", "zero_variables.sas7bdat")
  185. with pytest.raises(EmptyDataError, match="No columns to parse from file"):
  186. pd.read_sas(fname)
  187. def test_zero_rows(datapath):
  188. # GH 18198
  189. fname = datapath("io", "sas", "data", "zero_rows.sas7bdat")
  190. result = pd.read_sas(fname)
  191. expected = pd.DataFrame([{"char_field": "a", "num_field": 1.0}]).iloc[:0]
  192. tm.assert_frame_equal(result, expected)
  193. def test_corrupt_read(datapath):
  194. # We don't really care about the exact failure, the important thing is
  195. # that the resource should be cleaned up afterwards (BUG #35566)
  196. fname = datapath("io", "sas", "data", "corrupt.sas7bdat")
  197. msg = "'SAS7BDATReader' object has no attribute 'row_count'"
  198. with pytest.raises(AttributeError, match=msg):
  199. pd.read_sas(fname)
  200. def round_datetime_to_ms(ts):
  201. if isinstance(ts, datetime):
  202. return ts.replace(microsecond=int(round(ts.microsecond, -3) / 1000) * 1000)
  203. elif isinstance(ts, str):
  204. _ts = dateutil.parser.parse(timestr=ts)
  205. return _ts.replace(microsecond=int(round(_ts.microsecond, -3) / 1000) * 1000)
  206. else:
  207. return ts
def test_max_sas_date(datapath):
    # GH 20927
    # NB. max datetime in SAS dataset is 31DEC9999:23:59:59.999
    # but this is read as 29DEC9999:23:59:59.998993 by a buggy
    # sas7bdat module
    fname = datapath("io", "sas", "data", "max_sas_date.sas7bdat")
    df = pd.read_sas(fname, encoding="iso-8859-1")

    # SAS likes to left pad strings with spaces - lstrip before comparing
    df = df.applymap(lambda x: x.lstrip() if isinstance(x, str) else x)
    # GH 19732: Timestamps imported from sas will incur floating point errors
    # The column may come back as datetime64 (.dt works), or — when a value
    # exceeds pandas.Timestamp.max — as object-dtype datetime.datetime.
    try:
        df["dt_as_dt"] = df["dt_as_dt"].dt.round("us")
    except pd._libs.tslibs.np_datetime.OutOfBoundsDatetime:
        df = df.applymap(round_datetime_to_ms)
    except AttributeError:
        # object dtype: no .dt accessor, round each element individually
        df["dt_as_dt"] = df["dt_as_dt"].apply(round_datetime_to_ms)
    # if there are any date/times > pandas.Timestamp.max then ALL in that chunk
    # are returned as datetime.datetime
    expected = pd.DataFrame(
        {
            "text": ["max", "normal"],
            "dt_as_float": [253717747199.999, 1880323199.999],
            "dt_as_dt": [
                datetime(9999, 12, 29, 23, 59, 59, 999000),
                datetime(2019, 8, 1, 23, 59, 59, 999000),
            ],
            "date_as_float": [2936547.0, 21762.0],
            "date_as_date": [datetime(9999, 12, 29), datetime(2019, 8, 1)],
        },
        columns=["text", "dt_as_float", "dt_as_dt", "date_as_float", "date_as_date"],
    )
    tm.assert_frame_equal(df, expected)
def test_max_sas_date_iterator(datapath):
    # GH 20927
    # when called as an iterator, only those chunks with a date > pd.Timestamp.max
    # are returned as datetime.datetime, if this happens that whole chunk is returned
    # as datetime.datetime
    col_order = ["text", "dt_as_float", "dt_as_dt", "date_as_float", "date_as_date"]
    fname = datapath("io", "sas", "data", "max_sas_date.sas7bdat")
    results = []
    for df in pd.read_sas(fname, encoding="iso-8859-1", chunksize=1):
        # SAS likes to left pad strings with spaces - lstrip before comparing
        df = df.applymap(lambda x: x.lstrip() if isinstance(x, str) else x)
        # GH 19732: Timestamps imported from sas will incur floating point errors
        # Same three-way handling as test_max_sas_date, per chunk.
        try:
            df["dt_as_dt"] = df["dt_as_dt"].dt.round("us")
        except pd._libs.tslibs.np_datetime.OutOfBoundsDatetime:
            df = df.applymap(round_datetime_to_ms)
        except AttributeError:
            # object dtype: no .dt accessor, round each element individually
            df["dt_as_dt"] = df["dt_as_dt"].apply(round_datetime_to_ms)
        df.reset_index(inplace=True, drop=True)
        results.append(df)
    # First chunk exceeds Timestamp.max -> python datetimes; second does not
    # -> numpy datetime64 values.
    expected = [
        pd.DataFrame(
            {
                "text": ["max"],
                "dt_as_float": [253717747199.999],
                "dt_as_dt": [datetime(9999, 12, 29, 23, 59, 59, 999000)],
                "date_as_float": [2936547.0],
                "date_as_date": [datetime(9999, 12, 29)],
            },
            columns=col_order,
        ),
        pd.DataFrame(
            {
                "text": ["normal"],
                "dt_as_float": [1880323199.999],
                "dt_as_dt": [np.datetime64("2019-08-01 23:59:59.999")],
                "date_as_float": [21762.0],
                "date_as_date": [np.datetime64("2019-08-01")],
            },
            columns=col_order,
        ),
    ]
    for result, expected in zip(results, expected):
        tm.assert_frame_equal(result, expected)
  284. def test_null_date(datapath):
  285. fname = datapath("io", "sas", "data", "dates_null.sas7bdat")
  286. df = pd.read_sas(fname, encoding="utf-8")
  287. expected = pd.DataFrame(
  288. {
  289. "datecol": [
  290. datetime(9999, 12, 29),
  291. pd.NaT,
  292. ],
  293. "datetimecol": [
  294. datetime(9999, 12, 29, 23, 59, 59, 998993),
  295. pd.NaT,
  296. ],
  297. },
  298. )
  299. tm.assert_frame_equal(df, expected)
  300. def test_meta2_page(datapath):
  301. # GH 35545
  302. fname = datapath("io", "sas", "data", "test_meta2_page.sas7bdat")
  303. df = pd.read_sas(fname)
  304. assert len(df) == 1000
  305. @pytest.mark.parametrize(
  306. "test_file, override_offset, override_value, expected_msg",
  307. [
  308. ("test2.sas7bdat", 0x10000 + 55229, 0x80 | 0x0F, "Out of bounds"),
  309. ("test2.sas7bdat", 0x10000 + 55229, 0x10, "unknown control byte"),
  310. ("test3.sas7bdat", 118170, 184, "Out of bounds"),
  311. ],
  312. )
  313. def test_rle_rdc_exceptions(
  314. datapath, test_file, override_offset, override_value, expected_msg
  315. ):
  316. """Errors in RLE/RDC decompression should propagate."""
  317. with open(datapath("io", "sas", "data", test_file), "rb") as fd:
  318. data = bytearray(fd.read())
  319. data[override_offset] = override_value
  320. with pytest.raises(Exception, match=expected_msg):
  321. pd.read_sas(io.BytesIO(data), format="sas7bdat")
  322. def test_0x40_control_byte(datapath):
  323. # GH 31243
  324. fname = datapath("io", "sas", "data", "0x40controlbyte.sas7bdat")
  325. df = pd.read_sas(fname, encoding="ascii")
  326. fname = datapath("io", "sas", "data", "0x40controlbyte.csv")
  327. df0 = pd.read_csv(fname, dtype="object")
  328. tm.assert_frame_equal(df, df0)
  329. def test_0x00_control_byte(datapath):
  330. # GH 47099
  331. fname = datapath("io", "sas", "data", "0x00controlbyte.sas7bdat.bz2")
  332. df = next(pd.read_sas(fname, chunksize=11_000))
  333. assert df.shape == (11_000, 20)