Parses portfolio-related IL outputs to Excel
PortfolioParser/ILParser.py


from pandas import DataFrame
import re
from re import Match, Pattern
from logging import getLogger, basicConfig
from json import load, dump
from typing import TypeAlias, TypeVar
import pathlib as pl
# The raw text of an InfoTreieve report
RawReport: TypeAlias = str
# The raw report broken into lines
ReportLines: TypeAlias = list[str]
# Row with the column name data
HeaderRowStr: TypeAlias = str
# Row with actual data
DataRow : TypeAlias = str
# A list of the rows of data
DataRows: TypeAlias = list[DataRow]
# Mapping of column name to the list of values parsed for that column
HeaderDict: TypeAlias = dict[str, list[str]]
#[ ] Add logging to the report processor
logger = getLogger(__name__)
logger.setLevel("DEBUG")
def remove_lower_adjacent(nums: list[int]) -> list[int]:
    """Collapse runs of consecutive integers, keeping only the first of each run."""
    # Guard against an empty list so nums[0] does not raise an IndexError
    if not nums:
        return []
    filtered = [nums[0]]
    for i in range(1, len(nums)):
        # Only keep positions that are not directly adjacent to the previous one
        if nums[i] - nums[i - 1] > 1:
            filtered.append(nums[i])
    return filtered
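# A hedged illustration of the intended behaviour (the input list is invented,
# not taken from a real report): positions that differ by exactly 1 are treated
# as the same break, and only the first of each run survives.
#   remove_lower_adjacent([3, 4, 7, 8, 12])  ->  [3, 7, 12]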
def replace_bad_cols(line: str, cols: list[str]) -> str:
"""
Replaces bad column names in a string with modified names that have spaces replaced with dots.
Args:
line (str): The string containing the column names to modify.
cols (list[str]): A list of column names to modify.
Returns:
str: The modified string with bad column names replaced.
"""
logger.debug(f"Line: {line} | Cols: {cols}")
for c in cols:
# Create a regex for the col
col_regex: Pattern = re.compile(c.replace(' ', r'(?:\s|\.)'))
logger.debug(f"Col_regex: {col_regex}")
# Get all columns that match that pattern
col_matches: list[str|tuple[str]] = re.findall(col_regex, line)
logger.debug(f"Col_matches: {col_matches}")
        # Apply the substitution for every match, if any
col_name: str
for col_name in col_matches:
logger.debug(f"col_name: {col_name}")
# Replace the bad column name with the modified column name in the string
            # Using '.' instead of a space helps the parser tell where the
            # continuous column names are
line = line.replace(col_name, col_name.replace(' ', '.'))
return line
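# Hedged usage sketch ("Maturity Date" is a made-up multi-word column name, not
# one taken from a real IL report): the space inside the matched name becomes a
# dot so later column splitting sees it as a single token.
#   replace_bad_cols("CUSIP  Maturity Date  Par", ["Maturity Date"])
#   ->  "CUSIP  Maturity.Date  Par"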
class InvalidReport(Exception):
pass
class Header:
def __init__(self, header_row: HeaderRowStr, row_start_pos: int, row_end_pos: int) -> None:
row_start_pos += 1
self.name: str = header_row[row_start_pos:row_end_pos].strip()
self.start: int = row_start_pos
self.end: int = row_end_pos
def __str__(self) -> str:
return f"( Header Name: '{self.name}' -> {self.start}:{self.end} )"
def __repr__(self) -> str:
return self.__str__()
    def extract_from_row(self, data_row: DataRow) -> tuple[str, str | None]:
        # Slicing never raises IndexError; a short row simply yields an empty string
        value: str | None = data_row[self.start : self.end].strip()
        if value == '':
            value = None
        return self.name, value
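# Hedged illustration of a single Header (the positions and row text are made
# up): Header("CUSIP Par.Amount", row_start_pos=5, row_end_pos=16) names itself
# "Par.Amount", and extract_from_row("12345 6789.00") then returns
# ("Par.Amount", "6789.00").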
class HeaderRow:
def __init__(self, header_row: HeaderRowStr, data_rows: DataRows, bad_col_list: list[str]) -> None:
logger.debug(f"Initializing HeaderRow with header_row: {header_row}, data_rows: {data_rows}")
self.header_row = replace_bad_cols(header_row, bad_col_list)
columns_breaks: list[int] = self._validate_columns(data_rows)
logger.debug(f"Columns breaks: {columns_breaks}")
self.headers: list[Header] = []
self._create_columns(columns_breaks)
def _get_spaces(self) -> list[int]:
        # Regex to find single spaces flanked by non-space characters
SPACE_REGEX: Pattern = re.compile(r"[^\s]\s[^\s]")
space_matches: list[Match] = re.finditer(SPACE_REGEX, self.header_row)
# Get the int position of the space
space_locations: list[int] = [s.start()+1 for s in space_matches]
logger.debug(f"Space Locations: {space_locations}")
        # Remove adjacent space positions, keeping only the first of each run
space_locations: list[int] = remove_lower_adjacent(space_locations)
return space_locations
    def _validate_columns(self, data_lines: DataRows) -> list[int]:
        logger.debug(f"Validating columns for data_lines: {data_lines}")
        # Get a list of potential column breaks
        column_breaks: list[int] = self._get_spaces()
        row: str
        for row in data_lines:
            # Check each of the column positions for values
            cb: int  # Column Break
            # Iterate over a snapshot so removals do not skip entries
            for cb in list(column_breaks):
                # If the row is not long enough, the value is blank
                if len(row) <= cb:
                    continue
                # If the position holds a non-space character, this is not a
                # column delimiter
                elif row[cb] != ' ':
                    logger.debug(f"Remove CB {cb} | '{row[cb]}' -> {row}")
                    # Remove column breaks that are not actually empty
                    column_breaks.remove(cb)
        return column_breaks
    def _create_columns(self, column_breaks: list[int]) -> None:
logger.debug(f"Creating columns with column_breaks: {column_breaks}")
# Get the column/data names and their position spans
col_start: int = -1
# Add the end of the line so that we can capture the last column
column_breaks.append(len(self.header_row))
header_names: list[str] = []
# Create a header for each column break
cb: int
for cb in column_breaks:
# Don't try to make a header if there are not enough
# characters in the line
if col_start >= len(self.header_row):
break
header: Header = Header(
header_row= self.header_row,
row_start_pos= col_start,
row_end_pos= cb
)
# Handle duplicate columns
if header.name in header_names:
logger.debug(f"Found Matching header name: {header.name}")
header.name = header.name + f"_{header_names.count(header.name)+1}"
header_names.append(header.name)
self.headers.append(header)
col_start = cb
logger.debug(f"Created headers: {self.headers}")
if len(self.headers) < 1:
raise InvalidReport(f"No headers found in report! Header Row: {self.header_row} | CBs: {column_breaks}")
class InfoTreieveReport:
def __init__(self, raw_report: RawReport, bad_col_list: list[str]) -> None:
"""
Args:
raw_report (str): an unprocessed infotreive report
Raises:
InvalidReportError: Program failed to find the header or end row
"""
self.raw_report: RawReport = raw_report
# Find the row after the last data row
# also has info about expected data rows
end_row_index, self.num_records = self._find_end_row(raw_report)
# Find the header row
header_row_index: int = self._find_header_row(raw_report)
# Split the report by lines
self.full_report_lines: list[str] = raw_report.splitlines()
# Get a list of the rows with actual data
self.raw_data: DataRows = self.full_report_lines[
header_row_index + 2 : end_row_index ]
# Find the columns for each row
self.header_row: HeaderRow = HeaderRow(
header_row= self.full_report_lines[header_row_index],
data_rows= self.raw_data,
bad_col_list= bad_col_list
)
@staticmethod
def _find_end_row(text: RawReport) -> tuple[int, int]:
"""
Finds the row below the last line of data using regex.
Returns:
- row index of end row (int)
- number_of_records in report (int)
## Exception: InvalidReport
Raises an 'InvalidReport' exception if no end row is found.
"""
logger.debug(f"Finding end row in text: {text}")
        END_ROW_REGEX = re.compile(r"^(?P<n_records>\d+) records listed$")
lines_from_bottom: list[str] = text.splitlines()
lines_from_bottom.reverse()
index: int
line: str
for index, line in enumerate(lines_from_bottom):
row_regex: Match|None = re.search(END_ROW_REGEX, line)
if row_regex:
number_records: int = int(row_regex.group("n_records"))
logger.debug(f"End row found at index {len(lines_from_bottom)-index-1} with {number_records} records")
return len(lines_from_bottom)-index-1, number_records
raise InvalidReport(f"No end row found! Search regex: {END_ROW_REGEX}")
@staticmethod
    def _find_header_row(text: RawReport) -> int:
        header_row = None
        greatest_filled_space: int = 0
        # Assume the header row is the row with the most non-space characters
        index: int
        row: str
        for index, row in enumerate(text.splitlines()):
            # Spaces do not count
            row_size: int = len(row.replace(' ', ''))
            if row_size > greatest_filled_space:
                greatest_filled_space = row_size
                header_row = index
        logger.debug(f"Header row found at index {header_row}")
        return header_row
def process(self) -> DataFrame:
"""
Raises:
KeyError: Header key not found in header dict
ValueError: Some headers did not return as many values
"""
self.report_data: HeaderDict = {}
        # Get the data from each data row
data_row: DataRow
for data_row in self.raw_data:
header: Header
for header in self.header_row.headers:
column, value = header.extract_from_row(data_row)
try:
self.report_data[column].append(value)
except KeyError:
self.report_data[column] = [value]
try:
logger.debug(f"Processed data: {self.report_data}")
processed_data: DataFrame = DataFrame(self.report_data)
except ValueError as ve:
            # Log the per-column lengths to help diagnose mismatched row counts
len_dict: dict = {
col: len(cl) for
col, cl in self.report_data.items()
}
logger.exception(f"Lengths:\n{len_dict}")
raise ve
return processed_data
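# Hedged end-to-end sketch (the report text and the "Par Amount" bad column are
# invented; real InfoTreieve output will differ). Note the blank line between the
# header row and the data, which the slice at header_row_index + 2 skips over.
#   raw = (
#       "Portfolio IL Report\n"
#       "CUSIP Par Amount Coupon\n"
#       "\n"
#       "12345 6789.00    5.250\n"
#       "67890 1000.00    4.000\n"
#       "2 records listed"
#   )
#   df = InfoTreieveReport(raw, bad_col_list=["Par Amount"]).process()
#   # df has columns "CUSIP", "Par.Amount" and "Coupon", with two rows each.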
class FlippedReport:
def __init__(self, raw_report: RawReport, bad_cols: list[str]) -> None:
self.rr: RawReport = raw_report
self.report_lines: list[str] = raw_report.splitlines()
self.divider_column = self.find_common_first_space(self.report_lines)
    @staticmethod
    def find_common_first_space(lines: list[str]) -> int:
        """Return the largest first-space index across the non-blank lines, or -1 if a line has no space."""
        divider_index = None
        for line in lines:
            # Blank separator lines carry no divider information, so skip them
            if not line.strip():
                continue
            space_indices = [index for index, char in enumerate(line)
                             if char == ' ' and index != 0]
            # If there's no space in a non-blank line, we cannot find a common space index
            if not space_indices:
                return -1
            current_line_first_space = min(space_indices)
            # Keep the right-most first-space position seen so far
            if divider_index is None or current_line_first_space > divider_index:
                divider_index = current_line_first_space
        return -1 if divider_index is None else divider_index
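    # Hedged example (invented lines): the first space in "Fund  Alpha" is at
    # index 4 and in "Manager Smith" at index 7, so 7, the largest first-space
    # position, becomes the divider column.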
    def process(self) -> DataFrame:
        report_data = {}
        headers_seen = []
        for line in self.report_lines:
            # A blank line marks a new record block, so restart the headers
            if line == '':
                headers_seen = []
            if len(line) < self.divider_column:
                continue
            label = line[0:self.divider_column].strip()
            header = label
            # Disambiguate labels repeated within the same record block
            if label in headers_seen:
                header = f"{label}_{headers_seen.count(label) + 1}"
            headers_seen.append(label)
            # Slicing never raises IndexError; a short line simply yields ''
            value = line[self.divider_column:].strip()
            try:
                report_data[header].append(value)
            except KeyError:
                report_data[header] = [value]
        return DataFrame(report_data)
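# Minimal, hypothetical driver showing how the two report classes might be wired
# together. The input path, empty bad-column list and output name are
# placeholders, not part of the original module, and the FlippedReport fallback
# is only an assumption about how the two layouts relate.
if __name__ == "__main__":
    basicConfig(level="DEBUG")
    report_path = pl.Path("sample_report.txt")  # placeholder input file
    raw = report_path.read_text()
    try:
        df = InfoTreieveReport(raw, bad_col_list=[]).process()
    except InvalidReport:
        # Fall back to the flipped (label: value) layout when the columnar
        # parse cannot locate an end row or any headers.
        df = FlippedReport(raw, bad_cols=[]).process()
    # Requires an Excel writer such as openpyxl to be installed
    df.to_excel(report_path.with_suffix(".xlsx"), index=False)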