- """
- Read a SAS XPort format file into a Pandas DataFrame.
- Based on code from Jack Cushman (github.com/jcushman/xport).
- The file format is defined here:
- https://support.sas.com/content/dam/SAS/support/en/technical-papers/record-layout-of-a-sas-version-5-or-6-data-set-in-sas-transport-xport-format.pdf
- """
from __future__ import annotations

from collections import abc
from datetime import datetime
import struct
import warnings

import numpy as np

from pandas._typing import (
    CompressionOptions,
    DatetimeNaTType,
    FilePath,
    ReadBuffer,
)
from pandas.util._decorators import Appender
from pandas.util._exceptions import find_stack_level

import pandas as pd

from pandas.io.common import get_handle
from pandas.io.sas.sasreader import ReaderBase

# Every physical record in an XPORT file, including these header records, is a
# fixed 80-byte card; the internal and trailing spaces are significant.
_correct_line1 = (
    "HEADER RECORD*******LIBRARY HEADER RECORD!!!!!!!"
    "000000000000000000000000000000  "
)
_correct_header1 = (
    "HEADER RECORD*******MEMBER  HEADER RECORD!!!!!!!000000000000000001600000000"
)
_correct_header2 = (
    "HEADER RECORD*******DSCRPTR HEADER RECORD!!!!!!!"
    "000000000000000000000000000000  "
)
_correct_obs_header = (
    "HEADER RECORD*******OBS     HEADER RECORD!!!!!!!"
    "000000000000000000000000000000  "
)

_fieldkeys = [
    "ntype",
    "nhfun",
    "field_length",
    "nvar0",
    "name",
    "label",
    "nform",
    "nfl",
    "num_decimals",
    "nfj",
    "nfill",
    "niform",
    "nifl",
    "nifd",
    "npos",
    "_",
]

_base_params_doc = """\
Parameters
----------
filepath_or_buffer : str or file-like object
    Path to SAS file or object implementing binary read method."""

_params2_doc = """\
index : identifier of index column
    Identifier of column that should be used as index of the DataFrame.
encoding : str
    Encoding for text data.
chunksize : int
    Read file `chunksize` lines at a time, returns iterator."""

_format_params_doc = """\
format : str
    File format, only `xport` is currently supported."""

_iterator_doc = """\
iterator : bool, default False
    Return XportReader object for reading file incrementally."""

_read_sas_doc = f"""Read a SAS file into a DataFrame.

{_base_params_doc}
{_format_params_doc}
{_params2_doc}
{_iterator_doc}

Returns
-------
DataFrame or XportReader

Examples
--------
Read a SAS Xport file:

>>> df = pd.read_sas('filename.XPT')

Read a Xport file in 10,000 line chunks:

>>> itr = pd.read_sas('filename.XPT', chunksize=10000)
>>> for chunk in itr:
...     do_something(chunk)
"""

_xport_reader_doc = f"""\
Class for reading SAS Xport files.

{_base_params_doc}
{_params2_doc}

Attributes
----------
member_info : list
    Contains information about the file
fields : list
    Contains information about the variables in the file
"""

_read_method_doc = """\
Read observations from SAS Xport file, returning as data frame.

Parameters
----------
nrows : int
    Number of rows to read from data file; if None, read whole
    file.

Returns
-------
A DataFrame.
"""

def _parse_date(datestr: str) -> DatetimeNaTType:
    """Given a date in xport format, return Python date."""
    try:
        # e.g. "16FEB11:10:07:55"
        return datetime.strptime(datestr, "%d%b%y:%H:%M:%S")
    except ValueError:
        return pd.NaT
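
# A quick illustration of the conversion (the timestamp is made up for
# demonstration, not taken from a real transport file):
#
#   _parse_date("16FEB11:10:07:55")  # -> datetime.datetime(2011, 2, 16, 10, 7, 55)
#   _parse_date("")                  # -> NaT; unparseable dates become missing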

def _split_line(s: str, parts):
    """
    Parameters
    ----------
    s: str
        Fixed-length string to split
    parts: list of (name, length) pairs
        Used to break up string, name '_' will be filtered from output.

    Returns
    -------
    Dict of name:contents of string at given location.
    """
    out = {}
    start = 0
    for name, length in parts:
        out[name] = s[start : start + length].strip()
        start += length
    del out["_"]
    return out
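
# For example, splitting an 8-character row into named fixed-width parts
# (toy widths, not the real header layout used below):
#
#   _split_line("AAA BB  ", [("name", 4), ("_", 4)])  # -> {"name": "AAA"}
#
# Parts named "_" are dropped from the result, which is how the header
# parsers below skip over padding they do not need.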

def _handle_truncated_float_vec(vec, nbytes):
    # This feature is not well documented, but some SAS XPORT files
    # have 2-7 byte "truncated" floats.  To read these truncated
    # floats, pad them with zeros on the right to make 8 byte floats.
    #
    # References:
    # https://github.com/jcushman/xport/pull/3
    # The R "foreign" library

    if nbytes != 8:
        vec1 = np.zeros(len(vec), np.dtype("S8"))
        dtype = np.dtype(f"S{nbytes},S{8 - nbytes}")
        vec2 = vec1.view(dtype=dtype)
        vec2["f0"] = vec
        return vec2

    return vec
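
# A sketch of the padding for an assumed 2-byte truncated column (the input
# below is the IBM encoding of 1.0, truncated to its first two bytes):
#
#   short = np.array([b"\x41\x10"], dtype="S2")
#   padded = _handle_truncated_float_vec(short, 2)
#   padded.tobytes()          # the two original bytes followed by six zero bytes
#   _parse_float_vec(padded)  # -> array([1.])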

def _parse_float_vec(vec):
    """
    Parse a vector of float values representing IBM 8 byte floats into
    native 8 byte floats.
    """
    dtype = np.dtype(">u4,>u4")
    vec1 = vec.view(dtype=dtype)
    xport1 = vec1["f0"]
    xport2 = vec1["f1"]

    # Start by setting first half of ieee number to first half of IBM
    # number sans exponent
    ieee1 = xport1 & 0x00FFFFFF

    # The fraction bit to the left of the binary point in the ieee
    # format was set and the number was shifted 0, 1, 2, or 3
    # places. This will tell us how to adjust the ibm exponent to be a
    # power of 2 ieee exponent and how to shift the fraction bits to
    # restore the correct magnitude.
    shift = np.zeros(len(vec), dtype=np.uint8)
    shift[np.where(xport1 & 0x00200000)] = 1
    shift[np.where(xport1 & 0x00400000)] = 2
    shift[np.where(xport1 & 0x00800000)] = 3

    # shift the ieee number down the correct number of places then
    # set the second half of the ieee number to be the second half
    # of the ibm number shifted appropriately, ored with the bits
    # from the first half that would have been shifted in if we
    # could shift a double. All we are worried about are the low
    # order 3 bits of the first half since we're only shifting by
    # 1, 2, or 3.
    ieee1 >>= shift
    ieee2 = (xport2 >> shift) | ((xport1 & 0x00000007) << (29 + (3 - shift)))

    # clear the 1 bit to the left of the binary point
    ieee1 &= 0xFFEFFFFF

    # set the exponent of the ieee number to be the actual exponent
    # plus the shift count + 1023. Or this into the first half of the
    # ieee number. The ibm exponent is excess 64 but is adjusted by 65
    # since during conversion to ibm format the exponent is
    # incremented by 1 and the fraction bits left 4 positions to the
    # right of the radix point.  (had to add >> 24 because C treats &
    # 0x7f as 0x7f000000 and Python doesn't)
    ieee1 |= ((((((xport1 >> 24) & 0x7F) - 65) << 2) + shift + 1023) << 20) | (
        xport1 & 0x80000000
    )

    ieee = np.empty((len(ieee1),), dtype=">u4,>u4")
    ieee["f0"] = ieee1
    ieee["f1"] = ieee2
    ieee = ieee.view(dtype=">f8")
    ieee = ieee.astype("f8")

    return ieee
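
# As a concrete check (the buffer is the big-endian IBM encoding of 1.0:
# sign 0, excess-64 exponent byte 0x41, fraction bytes 0x10 00 00 00 00 00 00):
#
#   buf = np.frombuffer(b"\x41\x10\x00\x00\x00\x00\x00\x00", dtype="S8")
#   _parse_float_vec(buf)  # -> array([1.])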

class XportReader(ReaderBase, abc.Iterator):
    __doc__ = _xport_reader_doc

    def __init__(
        self,
        filepath_or_buffer: FilePath | ReadBuffer[bytes],
        index=None,
        encoding: str | None = "ISO-8859-1",
        chunksize=None,
        compression: CompressionOptions = "infer",
    ) -> None:
        self._encoding = encoding
        self._lines_read = 0
        self._index = index
        self._chunksize = chunksize

        self.handles = get_handle(
            filepath_or_buffer,
            "rb",
            encoding=encoding,
            is_text=False,
            compression=compression,
        )
        self.filepath_or_buffer = self.handles.handle

        try:
            self._read_header()
        except Exception:
            self.close()
            raise

    def close(self) -> None:
        self.handles.close()

    def _get_row(self):
        return self.filepath_or_buffer.read(80).decode()

    def _read_header(self):
        self.filepath_or_buffer.seek(0)

        # read file header
        line1 = self._get_row()
        if line1 != _correct_line1:
            if "**COMPRESSED**" in line1:
                # this was created with the PROC CPORT method and can't be read
                # https://documentation.sas.com/doc/en/pgmsascdc/9.4_3.5/movefile/p1bm6aqp3fw4uin1hucwh718f6kp.htm
                raise ValueError(
                    "Header record indicates a CPORT file, which is not readable."
                )
            raise ValueError("Header record is not an XPORT file.")

        line2 = self._get_row()
        fif = [["prefix", 24], ["version", 8], ["OS", 8], ["_", 24], ["created", 16]]
        file_info = _split_line(line2, fif)
        if file_info["prefix"] != "SAS     SAS     SASLIB":
            raise ValueError("Header record has invalid prefix.")
        file_info["created"] = _parse_date(file_info["created"])
        self.file_info = file_info

        line3 = self._get_row()
        file_info["modified"] = _parse_date(line3[:16])

        # read member header
        header1 = self._get_row()
        header2 = self._get_row()
        headflag1 = header1.startswith(_correct_header1)
        headflag2 = header2 == _correct_header2
        if not (headflag1 and headflag2):
            raise ValueError("Member header not found")

        # usually 140, could be 135
        fieldnamelength = int(header1[-5:-2])

        # member info
        mem = [
            ["prefix", 8],
            ["set_name", 8],
            ["sasdata", 8],
            ["version", 8],
            ["OS", 8],
            ["_", 24],
            ["created", 16],
        ]
        member_info = _split_line(self._get_row(), mem)
        mem = [["modified", 16], ["_", 16], ["label", 40], ["type", 8]]
        member_info.update(_split_line(self._get_row(), mem))
        member_info["modified"] = _parse_date(member_info["modified"])
        member_info["created"] = _parse_date(member_info["created"])
        self.member_info = member_info

        # read field names
        types = {1: "numeric", 2: "char"}
        fieldcount = int(self._get_row()[54:58])
        datalength = fieldnamelength * fieldcount
        # round up to nearest 80
        if datalength % 80:
            datalength += 80 - datalength % 80
        fielddata = self.filepath_or_buffer.read(datalength)
        fields = []
        obs_length = 0
        while len(fielddata) >= fieldnamelength:
            # pull data for one field
            fieldbytes, fielddata = (
                fielddata[:fieldnamelength],
                fielddata[fieldnamelength:],
            )

            # rest at end gets ignored, so if field is short, pad out
            # to match struct pattern below
            fieldbytes = fieldbytes.ljust(140)

            fieldstruct = struct.unpack(">hhhh8s40s8shhh2s8shhl52s", fieldbytes)
            field = dict(zip(_fieldkeys, fieldstruct))
            del field["_"]
            field["ntype"] = types[field["ntype"]]
            fl = field["field_length"]
            if field["ntype"] == "numeric" and ((fl < 2) or (fl > 8)):
                msg = f"Floating field width {fl} is not between 2 and 8."
                raise TypeError(msg)

            for k, v in field.items():
                try:
                    field[k] = v.strip()
                except AttributeError:
                    pass

            obs_length += field["field_length"]
            fields += [field]

        header = self._get_row()
        if header != _correct_obs_header:
            raise ValueError("Observation header not found.")

        self.fields = fields
        self.record_length = obs_length
        self.record_start = self.filepath_or_buffer.tell()

        self.nobs = self._record_count()
        self.columns = [x["name"].decode() for x in self.fields]

        # Setup the dtype.
        dtypel = [
            ("s" + str(i), "S" + str(field["field_length"]))
            for i, field in enumerate(self.fields)
        ]
        dtype = np.dtype(dtypel)
        self._dtype = dtype
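
    # For example, a member with an 8-byte numeric field followed by a 4-byte
    # character field (hypothetical widths) would produce the 12-byte record
    # dtype np.dtype([("s0", "S8"), ("s1", "S4")]); read() below slices each
    # record into these raw byte columns before decoding them.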

    def __next__(self) -> pd.DataFrame:
        return self.read(nrows=self._chunksize or 1)

    def _record_count(self) -> int:
        """
        Get number of records in file.

        This is maybe suboptimal because we have to seek to the end of
        the file.

        Side effect: returns file position to record_start.
        """
        self.filepath_or_buffer.seek(0, 2)
        total_records_length = self.filepath_or_buffer.tell() - self.record_start

        if total_records_length % 80 != 0:
            warnings.warn(
                "xport file may be corrupted.",
                stacklevel=find_stack_level(),
            )

        if self.record_length > 80:
            self.filepath_or_buffer.seek(self.record_start)
            return total_records_length // self.record_length

        self.filepath_or_buffer.seek(-80, 2)
        last_card_bytes = self.filepath_or_buffer.read(80)
        last_card = np.frombuffer(last_card_bytes, dtype=np.uint64)

        # 8 byte blank
        ix = np.flatnonzero(last_card == 2314885530818453536)

        if len(ix) == 0:
            tail_pad = 0
        else:
            tail_pad = 8 * len(ix)

        self.filepath_or_buffer.seek(self.record_start)

        return (total_records_length - tail_pad) // self.record_length
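
    # Worked example of the tail-pad arithmetic (hypothetical numbers): with a
    # 20-byte record and 7 observations the data occupy 140 bytes, which the
    # writer pads to two 80-byte cards with 20 trailing blanks.  The last card
    # then holds two fully blank 8-byte words (0x2020202020202020, the
    # constant above), so tail_pad = 16 and (160 - 16) // 20 recovers 7.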

    def get_chunk(self, size=None) -> pd.DataFrame:
        """
        Reads lines from Xport file and returns as dataframe

        Parameters
        ----------
        size : int, defaults to None
            Number of lines to read.  If None, reads whole file.

        Returns
        -------
        DataFrame
        """
        if size is None:
            size = self._chunksize
        return self.read(nrows=size)

    def _missing_double(self, vec):
        v = vec.view(dtype="u1,u1,u2,u4")
        miss = (v["f1"] == 0) & (v["f2"] == 0) & (v["f3"] == 0)
        miss1 = (
            ((v["f0"] >= 0x41) & (v["f0"] <= 0x5A))
            | (v["f0"] == 0x5F)
            | (v["f0"] == 0x2E)
        )
        miss &= miss1
        return miss
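
    # The check above keys off the first byte of an otherwise all-zero 8-byte
    # value: 0x2E (".") is the ordinary SAS missing value, while 0x5F ("_")
    # and 0x41-0x5A ("A"-"Z") encode the special missing values ._ and .A
    # through .Z.  For example, record bytes b".\x00\x00\x00\x00\x00\x00\x00"
    # are flagged missing and become NaN in read() below.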

    @Appender(_read_method_doc)
    def read(self, nrows: int | None = None) -> pd.DataFrame:
        if nrows is None:
            nrows = self.nobs

        read_lines = min(nrows, self.nobs - self._lines_read)
        read_len = read_lines * self.record_length
        if read_len <= 0:
            self.close()
            raise StopIteration
        raw = self.filepath_or_buffer.read(read_len)
        data = np.frombuffer(raw, dtype=self._dtype, count=read_lines)

        df_data = {}
        for j, x in enumerate(self.columns):
            vec = data["s" + str(j)]
            ntype = self.fields[j]["ntype"]
            if ntype == "numeric":
                vec = _handle_truncated_float_vec(vec, self.fields[j]["field_length"])
                miss = self._missing_double(vec)
                v = _parse_float_vec(vec)
                v[miss] = np.nan
            elif self.fields[j]["ntype"] == "char":
                v = [y.rstrip() for y in vec]

                if self._encoding is not None:
                    v = [y.decode(self._encoding) for y in v]

            df_data.update({x: v})

        df = pd.DataFrame(df_data)

        if self._index is None:
            df.index = pd.Index(range(self._lines_read, self._lines_read + read_lines))
        else:
            df = df.set_index(self._index)

        self._lines_read += read_lines

        return df
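
# A minimal usage sketch (the file name is hypothetical; any SAS V5/V6
# transport file would do).  Iterating the reader mirrors the chunked
# pd.read_sas interface documented above:
#
#   reader = XportReader("filename.XPT", chunksize=10000)
#   try:
#       for chunk in reader:
#           print(chunk.shape)
#   finally:
#       reader.close()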