  1. """
  2. Read a SAS XPort format file into a Pandas DataFrame.
  3. Based on code from Jack Cushman (github.com/jcushman/xport).
  4. The file format is defined here:
  5. https://support.sas.com/content/dam/SAS/support/en/technical-papers/record-layout-of-a-sas-version-5-or-6-data-set-in-sas-transport-xport-format.pdf
  6. """
from __future__ import annotations

from collections import abc
from datetime import datetime
import struct
import warnings

import numpy as np

from pandas._typing import (
    CompressionOptions,
    DatetimeNaTType,
    FilePath,
    ReadBuffer,
)
from pandas.util._decorators import Appender
from pandas.util._exceptions import find_stack_level

import pandas as pd

from pandas.io.common import get_handle
from pandas.io.sas.sasreader import ReaderBase

_correct_line1 = (
    "HEADER RECORD*******LIBRARY HEADER RECORD!!!!!!!"
    "000000000000000000000000000000  "
)
_correct_header1 = (
    "HEADER RECORD*******MEMBER  HEADER RECORD!!!!!!!000000000000000001600000000"
)
_correct_header2 = (
    "HEADER RECORD*******DSCRPTR HEADER RECORD!!!!!!!"
    "000000000000000000000000000000  "
)
_correct_obs_header = (
    "HEADER RECORD*******OBS     HEADER RECORD!!!!!!!"
    "000000000000000000000000000000  "
)
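
# An XPORT file is a sequence of 80-byte "card image" records, and the
# sentinel records above are matched verbatim against whole 80-character
# rows, so the internal blank padding (e.g. "MEMBER  " and "OBS     ",
# each padded to eight characters) and the trailing spaces are significant.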

_fieldkeys = [
    "ntype",
    "nhfun",
    "field_length",
    "nvar0",
    "name",
    "label",
    "nform",
    "nfl",
    "num_decimals",
    "nfj",
    "nfill",
    "niform",
    "nifl",
    "nifd",
    "npos",
    "_",
]
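
# The keys above follow the NAMESTR record layout from the SAS transport
# format specification (linked in the module docstring): variable type,
# name hash, length in the observation, variable number, name, label,
# format name/length/decimals/justification, a filler field, informat
# name/length/decimals, and finally the value's byte offset within the
# observation; the trailing "_" covers the unused remainder of the record.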

_base_params_doc = """\
Parameters
----------
filepath_or_buffer : str or file-like object
    Path to SAS file or object implementing binary read method."""

_params2_doc = """\
index : identifier of index column
    Identifier of column that should be used as index of the DataFrame.
encoding : str
    Encoding for text data.
chunksize : int
    Read file `chunksize` lines at a time, returns iterator."""

_format_params_doc = """\
format : str
    File format, only `xport` is currently supported."""

_iterator_doc = """\
iterator : bool, default False
    Return XportReader object for reading file incrementally."""


_read_sas_doc = f"""Read a SAS file into a DataFrame.

{_base_params_doc}
{_format_params_doc}
{_params2_doc}
{_iterator_doc}

Returns
-------
DataFrame or XportReader

Examples
--------
Read a SAS Xport file:

>>> df = pd.read_sas('filename.XPT')

Read a Xport file in 10,000 line chunks:

>>> itr = pd.read_sas('filename.XPT', chunksize=10000)
>>> for chunk in itr:
...     do_something(chunk)
"""

_xport_reader_doc = f"""\
Class for reading SAS Xport files.

{_base_params_doc}
{_params2_doc}

Attributes
----------
member_info : list
    Contains information about the file
fields : list
    Contains information about the variables in the file
"""

_read_method_doc = """\
Read observations from SAS Xport file, returning as data frame.

Parameters
----------
nrows : int
    Number of rows to read from data file; if None, read whole
    file.

Returns
-------
A DataFrame.
"""


def _parse_date(datestr: str) -> DatetimeNaTType:
    """Given a date in xport format, return Python date."""
    try:
        # e.g. "16FEB11:10:07:55"
        return datetime.strptime(datestr, "%d%b%y:%H:%M:%S")
    except ValueError:
        return pd.NaT
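
# Illustrative examples (not from the original source):
#   _parse_date("16FEB11:10:07:55")  -> datetime(2011, 2, 16, 10, 7, 55)
#   _parse_date("                ")  -> pd.NaT  (unparseable input)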


def _split_line(s: str, parts):
    """
    Parameters
    ----------
    s: str
        Fixed-length string to split
    parts: list of (name, length) pairs
        Used to break up string, name '_' will be filtered from output.

    Returns
    -------
    Dict of name:contents of string at given location.
    """
    out = {}
    start = 0
    for name, length in parts:
        out[name] = s[start : start + length].strip()
        start += length
    del out["_"]
    return out
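
# Illustrative: splitting an 80-character header row with the spec
# [["prefix", 24], ["version", 8], ["_", 48]] returns a dict
# {"prefix": ..., "version": ...}; the "_" filler slice is discarded.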


def _handle_truncated_float_vec(vec, nbytes):
    # This feature is not well documented, but some SAS XPORT files
    # have 2-7 byte "truncated" floats. To read these truncated
    # floats, pad them with zeros on the right to make 8 byte floats.
    #
    # References:
    # https://github.com/jcushman/xport/pull/3
    # The R "foreign" library
    if nbytes != 8:
        vec1 = np.zeros(len(vec), np.dtype("S8"))
        dtype = np.dtype(f"S{nbytes},S{8 - nbytes}")
        vec2 = vec1.view(dtype=dtype)
        vec2["f0"] = vec
        return vec2
    return vec
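
# The view above reinterprets each zeroed 8-byte slot as an (nbytes,
# 8 - nbytes) pair of byte strings without copying, so assigning to "f0"
# places the truncated bytes on the left and leaves the zero padding on
# the right; e.g. a 4-byte b"\x41\x10\x00\x00" effectively becomes
# b"\x41\x10\x00\x00\x00\x00\x00\x00".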


def _parse_float_vec(vec):
    """
    Parse a vector of float values representing IBM 8 byte floats into
    native 8 byte floats.
    """
    dtype = np.dtype(">u4,>u4")
    vec1 = vec.view(dtype=dtype)
    xport1 = vec1["f0"]
    xport2 = vec1["f1"]

    # Start by setting first half of ieee number to first half of IBM
    # number sans exponent
    ieee1 = xport1 & 0x00FFFFFF

    # The fraction bit to the left of the binary point in the ieee
    # format was set and the number was shifted 0, 1, 2, or 3
    # places. This will tell us how to adjust the ibm exponent to be a
    # power of 2 ieee exponent and how to shift the fraction bits to
    # restore the correct magnitude.
    shift = np.zeros(len(vec), dtype=np.uint8)
    shift[np.where(xport1 & 0x00200000)] = 1
    shift[np.where(xport1 & 0x00400000)] = 2
    shift[np.where(xport1 & 0x00800000)] = 3

    # shift the ieee number down the correct number of places then
    # set the second half of the ieee number to be the second half
    # of the ibm number shifted appropriately, ored with the bits
    # from the first half that would have been shifted in if we
    # could shift a double. All we are worried about are the low
    # order 3 bits of the first half since we're only shifting by
    # 1, 2, or 3.
    ieee1 >>= shift
    ieee2 = (xport2 >> shift) | ((xport1 & 0x00000007) << (29 + (3 - shift)))

    # clear the 1 bit to the left of the binary point
    ieee1 &= 0xFFEFFFFF

    # set the exponent of the ieee number to be the actual exponent
    # plus the shift count + 1023. Or this into the first half of the
    # ieee number. The ibm exponent is excess 64 but is adjusted by 65
    # since during conversion to ibm format the exponent is
    # incremented by 1 and the fraction bits left 4 positions to the
    # right of the radix point. (had to add >> 24 because C treats &
    # 0x7f as 0x7f000000 and Python doesn't)
    ieee1 |= ((((((xport1 >> 24) & 0x7F) - 65) << 2) + shift + 1023) << 20) | (
        xport1 & 0x80000000
    )

    ieee = np.empty((len(ieee1),), dtype=">u4,>u4")
    ieee["f0"] = ieee1
    ieee["f1"] = ieee2
    ieee = ieee.view(dtype=">f8")
    ieee = ieee.astype("f8")
    return ieee
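
# Worked example (IBM hex floats store a value as 0.fraction * 16**(exp - 64)):
#   IBM  1.0 = 0x4110000000000000  (exponent byte 0x41 = 65, fraction 0x100000...)
#   IEEE 1.0 = 0x3FF0000000000000
# so, illustratively,
#   _parse_float_vec(np.frombuffer(b"\x41\x10\x00\x00\x00\x00\x00\x00", dtype="S8"))
# yields array([1.0]).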


class XportReader(ReaderBase, abc.Iterator):
    __doc__ = _xport_reader_doc

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        index=None,
        encoding: str | None = "ISO-8859-1",
        chunksize=None,
        compression: CompressionOptions = "infer",
    ) -> None:
        self._encoding = encoding
        self._lines_read = 0
        self._index = index
        self._chunksize = chunksize

        self.handles = get_handle(
            filepath_or_buffer,
            "rb",
            encoding=encoding,
            is_text=False,
            compression=compression,
        )
        self.filepath_or_buffer = self.handles.handle

        try:
            self._read_header()
        except Exception:
            self.close()
            raise

    def close(self) -> None:
        self.handles.close()

    def _get_row(self):
        # Each XPORT record is an 80-byte card image.
        return self.filepath_or_buffer.read(80).decode()

    def _read_header(self):
        self.filepath_or_buffer.seek(0)

        # read file header
        line1 = self._get_row()
        if line1 != _correct_line1:
            if "**COMPRESSED**" in line1:
                # this was created with the PROC CPORT method and can't be read
                # https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/movefile/p1bm6aqp3fw4uin1hucwh718f6kp.htm
                raise ValueError(
                    "Header record indicates a CPORT file, which is not readable."
                )
            raise ValueError("Header record is not an XPORT file.")

        line2 = self._get_row()
        fif = [["prefix", 24], ["version", 8], ["OS", 8], ["_", 24], ["created", 16]]
        file_info = _split_line(line2, fif)
        if file_info["prefix"] != "SAS     SAS     SASLIB":
            raise ValueError("Header record has invalid prefix.")
        file_info["created"] = _parse_date(file_info["created"])
        self.file_info = file_info

        line3 = self._get_row()
        file_info["modified"] = _parse_date(line3[:16])

        # read member header
        header1 = self._get_row()
        header2 = self._get_row()
        headflag1 = header1.startswith(_correct_header1)
        headflag2 = header2 == _correct_header2
        if not (headflag1 and headflag2):
            raise ValueError("Member header not found")

        # usually 140, could be 135
        fieldnamelength = int(header1[-5:-2])

        # member info
        mem = [
            ["prefix", 8],
            ["set_name", 8],
            ["sasdata", 8],
            ["version", 8],
            ["OS", 8],
            ["_", 24],
            ["created", 16],
        ]
        member_info = _split_line(self._get_row(), mem)
        mem = [["modified", 16], ["_", 16], ["label", 40], ["type", 8]]
        member_info.update(_split_line(self._get_row(), mem))
        member_info["modified"] = _parse_date(member_info["modified"])
        member_info["created"] = _parse_date(member_info["created"])
        self.member_info = member_info

        # read field names
        types = {1: "numeric", 2: "char"}
        fieldcount = int(self._get_row()[54:58])
        datalength = fieldnamelength * fieldcount
        # round up to nearest 80
        if datalength % 80:
            datalength += 80 - datalength % 80
        fielddata = self.filepath_or_buffer.read(datalength)
        fields = []
        obs_length = 0
        while len(fielddata) >= fieldnamelength:
            # pull data for one field
            fieldbytes, fielddata = (
                fielddata[:fieldnamelength],
                fielddata[fieldnamelength:],
            )

            # rest at end gets ignored, so if field is short, pad out
            # to match struct pattern below
            fieldbytes = fieldbytes.ljust(140)

            fieldstruct = struct.unpack(">hhhh8s40s8shhh2s8shhl52s", fieldbytes)
            field = dict(zip(_fieldkeys, fieldstruct))
            del field["_"]
            field["ntype"] = types[field["ntype"]]
            fl = field["field_length"]
            if field["ntype"] == "numeric" and ((fl < 2) or (fl > 8)):
                msg = f"Floating field width {fl} is not between 2 and 8."
                raise TypeError(msg)

            for k, v in field.items():
                try:
                    field[k] = v.strip()
                except AttributeError:
                    pass

            obs_length += field["field_length"]
            fields += [field]

        header = self._get_row()
        if not header == _correct_obs_header:
            raise ValueError("Observation header not found.")

        self.fields = fields
        self.record_length = obs_length
        self.record_start = self.filepath_or_buffer.tell()

        self.nobs = self._record_count()
        self.columns = [x["name"].decode() for x in self.fields]

        # Setup the dtype.
        dtypel = [
            ("s" + str(i), "S" + str(field["field_length"]))
            for i, field in enumerate(self.fields)
        ]
        dtype = np.dtype(dtypel)
        self._dtype = dtype
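
        # Illustrative note: for a numeric(8) column and a char(10) column the
        # dtype above would be np.dtype([("s0", "S8"), ("s1", "S10")]), letting
        # read() pull whole observations out of the byte stream with
        # np.frombuffer and slice out each column by name.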

    def __next__(self) -> pd.DataFrame:
        return self.read(nrows=self._chunksize or 1)

    def _record_count(self) -> int:
        """
        Get number of records in file.

        This is maybe suboptimal because we have to seek to the end of
        the file.

        Side effect: returns file position to record_start.
        """
        self.filepath_or_buffer.seek(0, 2)
        total_records_length = self.filepath_or_buffer.tell() - self.record_start

        if total_records_length % 80 != 0:
            warnings.warn(
                "xport file may be corrupted.",
                stacklevel=find_stack_level(),
            )

        if self.record_length > 80:
            self.filepath_or_buffer.seek(self.record_start)
            return total_records_length // self.record_length

        self.filepath_or_buffer.seek(-80, 2)
        last_card_bytes = self.filepath_or_buffer.read(80)
        last_card = np.frombuffer(last_card_bytes, dtype=np.uint64)

        # 8 byte blank: 2314885530818453536 == 0x2020202020202020, i.e. eight
        # ASCII spaces viewed as a single uint64
        ix = np.flatnonzero(last_card == 2314885530818453536)

        if len(ix) == 0:
            tail_pad = 0
        else:
            tail_pad = 8 * len(ix)

        self.filepath_or_buffer.seek(self.record_start)

        return (total_records_length - tail_pad) // self.record_length

    def get_chunk(self, size=None) -> pd.DataFrame:
        """
        Reads lines from Xport file and returns as dataframe

        Parameters
        ----------
        size : int, defaults to None
            Number of lines to read. If None, reads whole file.

        Returns
        -------
        DataFrame
        """
        if size is None:
            size = self._chunksize
        return self.read(nrows=size)

    def _missing_double(self, vec):
        v = vec.view(dtype="u1,u1,u2,u4")
        miss = (v["f1"] == 0) & (v["f2"] == 0) & (v["f3"] == 0)
        miss1 = (
            ((v["f0"] >= 0x41) & (v["f0"] <= 0x5A))
            | (v["f0"] == 0x5F)
            | (v["f0"] == 0x2E)
        )
        miss &= miss1
        return miss
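
        # Background (from the XPORT spec, stated here for clarity): SAS
        # missing values ".", "._" and ".A" through ".Z" are stored as that
        # single ASCII byte (0x2E, 0x5F, or 0x41-0x5A) followed by seven zero
        # bytes, which is exactly the byte pattern masked above.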

    @Appender(_read_method_doc)
    def read(self, nrows: int | None = None) -> pd.DataFrame:
        if nrows is None:
            nrows = self.nobs

        read_lines = min(nrows, self.nobs - self._lines_read)
        read_len = read_lines * self.record_length
        if read_len <= 0:
            self.close()
            raise StopIteration
        raw = self.filepath_or_buffer.read(read_len)
        data = np.frombuffer(raw, dtype=self._dtype, count=read_lines)

        df_data = {}
        for j, x in enumerate(self.columns):
            vec = data["s" + str(j)]
            ntype = self.fields[j]["ntype"]
            if ntype == "numeric":
                vec = _handle_truncated_float_vec(vec, self.fields[j]["field_length"])
                miss = self._missing_double(vec)
                v = _parse_float_vec(vec)
                v[miss] = np.nan
            elif self.fields[j]["ntype"] == "char":
                v = [y.rstrip() for y in vec]

                if self._encoding is not None:
                    v = [y.decode(self._encoding) for y in v]

            df_data.update({x: v})
        df = pd.DataFrame(df_data)

        if self._index is None:
            df.index = pd.Index(range(self._lines_read, self._lines_read + read_lines))
        else:
            df = df.set_index(self._index)

        self._lines_read += read_lines

        return df
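

# Illustrative usage (a sketch, not part of this module; "demo.xpt" and
# process() are placeholders). pd.read_sas is the supported entry point,
# and ReaderBase is assumed to supply the context-manager protocol used
# in the second form:
#
#   df = pd.read_sas("demo.xpt", format="xport")          # whole file
#
#   with XportReader("demo.xpt", chunksize=1000) as rdr:  # incremental
#       for chunk in rdr:
#           process(chunk)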