From 1c6ec6370b9919e9d9effed7add83c92eccc4a54 Mon Sep 17 00:00:00 2001
From: = <=>
Date: Sat, 22 Apr 2023 00:35:43 -0400
Subject: [PATCH] Reworked the column detection to make it more flexible.
 Added flipped report for CUST and FIN, but it's untested...

---
 IL Formatter.py |  31 ++++-
 ILParser.py     | 357 +++++++++++++++++++++++++++++++++++++++++-------
 config.json     |   2 +-
 config.toml     |   0
 todo.txt        |  13 +-
 5 files changed, 343 insertions(+), 60 deletions(-)
 create mode 100644 config.toml

diff --git a/IL Formatter.py b/IL Formatter.py
index bf3094e..2b65faf 100644
--- a/IL Formatter.py
+++ b/IL Formatter.py
@@ -10,7 +10,7 @@
 from os import startfile
 from json import load, dump
 from time import sleep
-import ILParser
+from ILParser import InfoTreieveReport, FlippedReport
 
 
 # Open the config file, create a dict, and set up logging
@@ -256,7 +256,8 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
         debug(f"Parse Columns:\n{config['COLS']}")
 
         try:
-            data: DataFrame = ILParser.extract_data(report, config["COLS"])
+            it_report: InfoTreieveReport = InfoTreieveReport(report, config["COLS"])
+            data: DataFrame = it_report.process()
         except Exception as e:
             self.processButton.setEnabled(False)
             logException(f"Failed to parse file-> {filePath} :\n{e}")
@@ -286,19 +287,18 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
             self.assetFile = None
             return None
 
-        custDf: DataFrame = self._parse_file(self.custFile)
+        custDf: DataFrame = self._parse_file(self.custFile)
         debug(custDf)
         if type(custDf) != DataFrame:
             self.custLe.setText("")
             self.custFile = None
-            return None
+            #FIXME return None
 
         dobDf: DataFrame = self._parse_file(self.dobFile)
         debug(dobDf)
         if type(dobDf) != DataFrame:
-            debug(f"Parse Columns: {ILParser.DOB_COL}")
             self.dobLE.setText("")
             self.dobFile = None
             return None
@@ -309,7 +309,20 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
         if type(finDf) != DataFrame:
             self.finLE.setText("")
             self.finFile = None
-            return None
+            #FIXME return None
+
+        bad_cols: list[str] = config['COLS']
+        # Flipped DataFrames that parsed successfully, keyed by sheet name
+        successful_new: dict[str, DataFrame] = {}
+        # Index each report file so it can be mapped to its sheet name
+        for idx, report_file in enumerate([self.custFile, self.finFile]):
+            try:
+                with open(report_file) as file:
+                    report = file.read()
+                flipped_report: FlippedReport = FlippedReport(report, bad_cols)
+                flipped_df: DataFrame = flipped_report.process()
+                if not flipped_df.empty:
+                    successful_new["NEW_CUST" if idx == 0 else "NEW_FIN"] = flipped_df
+            except Exception as e:
+                debug(f"Flipped report failed for {report_file}: {e}")
 
         try:
             with ExcelWriter(self.outputLocation) as writer:
@@ -317,6 +330,12 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
                 custDf.to_excel(writer, sheet_name="CUST", index=False)
                 assetDf.to_excel(writer, sheet_name="ASSET", index=False)
                 dobDf.to_excel(writer, sheet_name="DOB", index=False)
+
+                key: str
+                df: DataFrame
+                for key, df in successful_new.items():
+                    df.to_excel(writer, sheet_name=key, index=False)
+
         except Exception as e:
             logException(f"{now()} | Failed to write to excel -> {self.outputLocation} :\n{e}")
             open_error_dialog("Failed to Create Excel", f"Failed to write to excel -> {self.outputLocation}", repr(e))
diff --git a/ILParser.py b/ILParser.py
index eded524..a7eada2 100644
--- a/ILParser.py
+++ b/ILParser.py
@@ -3,13 +3,36 @@ import re
 from re import Match, Pattern
 from logging import getLogger, basicConfig
 from json import load, dump
+from typing import TypeAlias
 
+# The raw text of an InfoTreieve report
+RawReport: TypeAlias = str
+# The raw report broken into lines
+ReportLines: TypeAlias = list[str]
+# Row with the column name data
+HeaderRowStr: TypeAlias = str
+# Row with actual data
+DataRow: TypeAlias = str
+# A list of the rows of data
+DataRows: TypeAlias = list[DataRow]
+# Mapping of column name -> list of values in that column
+HeaderDict: TypeAlias = dict[str, list[str]]
+
+#[ ] Add logging to the report processor
 
 logger = getLogger(__name__)
 logger.setLevel("DEBUG")
 
-COLUMN_NAME_REGEX = re.compile(r"(?P<column_name>(\w|\.|#|\/)+)", re.IGNORECASE)
 
+def remove_lower_adjacent(nums: list[int]) -> list[int]:
+
+    # Keep only the first index of each run of consecutive integers
+    if not nums:
+        return []
+    filtered = [nums[0]]
+    for i in range(1, len(nums)):
+        if nums[i] - nums[i - 1] > 1:
+            filtered.append(nums[i])
+
+    return filtered
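+# Illustrative check (positions made up for this sketch): only the first
+# index of each run of consecutive integers survives, e.g.
+#   remove_lower_adjacent([12, 13, 14, 30, 41, 42]) -> [12, 30, 41]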
 
 def replace_bad_cols(line: str, cols: list[str]) -> str:
     """
@@ -44,62 +67,292 @@ def replace_bad_cols(line: str, cols: list[str]) -> str:
     return line
 
-def extract_data(input_doc: str, column_list: list[str]) -> DataFrame|None:
-    """
-    Extracts data from a string in a table-like format, where columns are identified by a list of column names, and
-    returns the data as a Pandas DataFrame.
+class InvalidReport(Exception):
+    pass
 
-    Args:
-        input_doc (str): The string containing the table-like data to extract.
-        column_list (list[str]): A list of column names to identify the columns in the table-like data.
+class Header:
+
+    def __init__(self, header_row: HeaderRowStr, row_start_pos: int, row_end_pos: int) -> None:
+
+        # Shift one past the break character at row_start_pos
+        row_start_pos += 1
+        self.name: str = header_row[row_start_pos:row_end_pos].strip()
+        self.start: int = row_start_pos
+        self.end: int = row_end_pos
 
-    Returns:
-        pandas.DataFrame: A DataFrame containing the extracted data from the input string.
-    """
-    line: str
-    columns = {}
-    data = {}
-    for line in input_doc.splitlines():
-        if len(columns) == 0 :
-            logger.debug(f"Columns = 0: {line}")
-            # Find the line that contains the column names and replace bad column names
-            if re.search("^\w", line):
-                logger.debug("Found word on first line.")
-                line = replace_bad_cols(line, column_list)
-                logger.debug(f"Column replacements made: {line}")
-                # Find the start and end positions of each column name and store them in a dictionary
-                columns_names = re.finditer(COLUMN_NAME_REGEX, line)
-                logger.debug(f"Found column names: {columns_names}")
-                for c in columns_names:
-                    columns[c.group("column_name")] = {"start": c.start(), "end": c.end()}
-                    logger.debug(f"Column section: {columns[c.group('column_name')]}")
-                    data[c.group("column_name")] = []
-            continue
-        elif len(line) < 2:
-            logger.debug(f"Line len less than 2.")
-            continue
-        # Check if we've reached the end of the table and return the data
-        if re.search("\d+ records listed", line):
-            logger.debug(f"End of document: {line}")
-            logger.debug(f"Extracted data: {data}")
-            return DataFrame(data)
-        # Extract the data from each column based on the start and end positions
-        for key, span in columns.items():
-            data[key].append(line[span["start"]:span["end"]].strip())
+    def __str__(self) -> str:
+        return f"( Header Name: '{self.name}' -> {self.start}:{self.end} )"
 
-if __name__ == "__main__":
+    def __repr__(self) -> str:
+        return self.__str__()
 
-    basicConfig(filename='ILParser.log', encoding='utf-8',
-                level="DEBUG", filemode='w', force=True)
+    def extract_from_row(self, data_row: DataRow) -> tuple[str, str | None]:
+
+        # Slicing past the end of a short row just yields a shorter (or
+        # empty) string, so no IndexError handling is needed
+        value: str | None = data_row[self.start : self.end].strip()
+
+        if value == '':
+            value = None
+
+        return self.name, value
+
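+# Illustration with invented values: a Header built from the break at
+# column 7 of a fixed-width header row slices and strips that same span
+# out of every data row:
+#   h = Header("CUST ID CUST NAME", 7, 17)        # h.name == "CUST NAME"
+#   h.extract_from_row("00017   ACME CO")  -> ("CUST NAME", "ACME CO")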
+class HeaderRow:
 
-    def test_replace_bad_cols():
+    def __init__(self, header_row: HeaderRowStr, data_rows: DataRows, bad_col_list: list[str]) -> None:
+        logger.debug(f"Initializing HeaderRow with header_row: {header_row}, data_rows: {data_rows}")
+        self.header_row = replace_bad_cols(header_row, bad_col_list)
+        columns_breaks: list[int] = self._validate_columns(data_rows)
+        logger.debug(f"Columns breaks: {columns_breaks}")
+        self.headers: list[Header] = []
+        self._create_columns(columns_breaks)
+
+    def _get_spaces(self) -> list[int]:
+
+        # Regex that finds a single space flanked by non-space characters
+        SPACE_REGEX: Pattern = re.compile(r"[^\s]\s[^\s]")
+        space_matches: list[Match] = list(re.finditer(SPACE_REGEX, self.header_row))
-        with open("Inputs\CUST_ISSUE") as c:
-            input: str = c.read()
-        with open("config.json") as configFile:
-            config: dict = load(configFile)
-        columns: list[str] = config["COLS"]
+        # Get the int position of the space (the middle of each match)
+        space_locations: list[int] = [s.start() + 1 for s in space_matches]
+        logger.debug(f"Space Locations: {space_locations}")
+        # Collapse adjacent space positions, keeping the first of each run
+        space_locations = remove_lower_adjacent(space_locations)
+        return space_locations
+
+    def _validate_columns(self, data_lines: DataRows) -> list[int]:
+        logger.debug(f"Validating columns for data_lines: {data_lines}")
+        # Get a list of potential column breaks
+        column_breaks: list[int] = self._get_spaces()
+
+        row: str
+        for row in data_lines:
+            # Check each of the column positions for values
+            cb: int  # Column Break
+            # Iterate over a copy so removal is safe mid-loop
+            for cb in list(column_breaks):
+                # If the row is not long enough, the value is blank
+                if len(row) <= cb:
+                    continue
+                # If the value is not blank, then this is not a column
+                # delimiter
+                elif row[cb] != ' ':
+                    logger.debug(f"Remove CB {cb} | '{row[cb]}' -> {row}")
+                    # Remove column breaks that are not actually empty
+                    column_breaks.remove(cb)
+        return column_breaks
+
+    def _create_columns(self, column_breaks: list[int]) -> None:
+        logger.debug(f"Creating columns with column_breaks: {column_breaks}")
+        # Get the column/data names and their position spans
+        col_start: int = -1
+        # Add the end of the line so that we can capture the last column
+        column_breaks.append(len(self.header_row))
+
+        header_names: list[str] = []
+        # Create a header for each column break
+        cb: int
+        for cb in column_breaks:
+
+            # Don't try to make a header if there are not enough
+            # characters in the line
+            if col_start >= len(self.header_row):
+                break
+
+            header: Header = Header(
+                header_row= self.header_row,
+                row_start_pos= col_start,
+                row_end_pos= cb
+            )
+
+            # Handle duplicate columns
+            if header.name in header_names:
+                logger.debug(f"Found Matching header name: {header.name}")
+                header.name = header.name + f"_{header_names.count(header.name)+1}"
+
+            header_names.append(header.name)
+            self.headers.append(header)
+            col_start = cb
+
+        logger.debug(f"Created headers: {self.headers}")
+        if len(self.headers) < 1:
+            raise InvalidReport(f"No headers found in report! Header Row: {self.header_row} | CBs: {column_breaks}")
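+# End-to-end sketch of the detection, on a fabricated two-column row:
+# every single space in the header row is a candidate break, and a break
+# survives only if that same character column is blank in every data row:
+#
+#   header row: "CUST ID CUST NAME"    single spaces at 4, 7 and 12
+#   data row:   "00017   ACMECO INC"   '7' at 4 and 'C' at 12 kill those
+#   surviving break: 7  ->  headers "CUST ID" [0:7], "CUST NAME" [8:17]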
+
+class InfoTreieveReport:
+
+    def __init__(self, raw_report: RawReport, bad_col_list: list[str]) -> None:
+        """
+
+        Args:
+            raw_report (str): an unprocessed InfoTreieve report
+            bad_col_list (list[str]): column-name patterns passed through
+                to replace_bad_cols()
-        replace_bad_cols(input.splitlines()[1], columns)
+
+        Raises:
+            InvalidReport: Program failed to find the header or end row
+        """
+        self.raw_report: RawReport = raw_report
+
+        # Find the row after the last data row;
+        # it also has info about expected data rows
+        end_row_index, self.num_records = self._find_end_row(raw_report)
+        # Find the header row
+        header_row_index: int = self._find_header_row(raw_report)
+
+        # Split the report by lines
+        self.full_report_lines: list[str] = raw_report.splitlines()
+
+        # Get a list of the rows with actual data
+        self.raw_data: DataRows = self.full_report_lines[
+            header_row_index + 2 : end_row_index ]
+
+        # Find the columns for each row
+        self.header_row: HeaderRow = HeaderRow(
+            header_row= self.full_report_lines[header_row_index],
+            data_rows= self.raw_data,
+            bad_col_list= bad_col_list
+        )
+
+    @staticmethod
+    def _find_end_row(text: RawReport) -> tuple[int, int]:
+        """
+        Finds the row below the last line of data using regex.
+
+        Returns:
+            - row index of end row (int)
+            - number of records in report (int)
+
+        Raises:
+            InvalidReport: if no end row is found.
+        """
+        logger.debug(f"Finding end row in text: {text}")
+        END_ROW_REGEX = re.compile(r"^(?P<n_records>\d+) records listed$")
+
+        lines_from_bottom: list[str] = text.splitlines()
+        lines_from_bottom.reverse()
+
+        index: int
+        line: str
+        for index, line in enumerate(lines_from_bottom):
+            row_regex: Match|None = re.search(END_ROW_REGEX, line)
+            if row_regex:
+                number_records: int = int(row_regex.group("n_records"))
+                logger.debug(f"End row found at index {len(lines_from_bottom)-index-1} with {number_records} records")
+                return len(lines_from_bottom)-index-1, number_records
+
+        raise InvalidReport(f"No end row found! Search regex: {END_ROW_REGEX}")
+
+    @staticmethod
+    def _find_header_row(text: RawReport) -> int:
+
+        header_row = None
+        greatest_filled_space: int = 0
+        # The header row is assumed to be the densest row, i.e. the one
+        # with the most non-space characters
+        index: int
+        row: str
+        for index, row in enumerate(text.splitlines()):
+            # Spaces do not count
+            row_size: int = len(row.replace(' ', ''))
+            if row_size > greatest_filled_space:
+                greatest_filled_space = row_size
+                header_row = index
+        logger.debug(f"Header row found at index {header_row}")
+        return header_row
+
+
+    def process(self) -> DataFrame:
+        """
+
+        Raises:
+            ValueError: Some headers did not return as many values as others
+        """
+
+        self.report_data: HeaderDict = {}
+
+        # Get the data from each data row
+        data_row: DataRow
+        for data_row in self.raw_data:
+            header: Header
+            for header in self.header_row.headers:
+                column, value = header.extract_from_row(data_row)
+                try:
+                    self.report_data[column].append(value)
+                except KeyError:
+                    self.report_data[column] = [value]
+
+        try:
+            logger.debug(f"Processed data: {self.report_data}")
+            processed_data: DataFrame = DataFrame(self.report_data)
+        except ValueError as ve:
+            len_dict: dict = {
+                col: len(cl) for
+                col, cl in self.report_data.items()
+            }
+            logger.exception(f"Lengths:\n{len_dict}")
+            raise ve
+
+        return processed_data
+
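+# Shape this parser assumes (a toy report, values invented): the densest
+# line is taken as the header, data starts two lines below it, and the
+# "N records listed" line terminates the table:
+#
+#   CUST ID CUST NAME BRANCH
+#
+#   00017   ACMECO    12
+#   00023   BIGCO     7
+#   2 records listed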
+
+class FlippedReport:
+
+    def __init__(self, raw_report: RawReport, bad_cols: list[str]) -> None:
+        self.rr: RawReport = raw_report
+
+        self.report_lines: list[str] = raw_report.splitlines()
+        self.divider_column = self.find_common_first_space(self.report_lines)
+
+
+    @staticmethod
+    def find_common_first_space(lines: list[str]) -> int:
+        min_space_index = None
+
+        for line in lines:
+            # Blank record separators carry no column information
+            if not line.strip():
+                continue
+            space_indices = [index for index, char in enumerate(line)
+                             if char == ' ' and index != 0]
+
+            # If there's no space in the line, we cannot find a common space index
+            if not space_indices:
+                return -1
+
+            # Track the right-most first-space column seen so far
+            current_line_min_space_index = min(space_indices)
+            if min_space_index is None or current_line_min_space_index > min_space_index:
+                min_space_index = current_line_min_space_index
 
-    test_replace_bad_cols()
\ No newline at end of file
+        return min_space_index
+
+    def process(self) -> DataFrame:
+
+        report_data = {}
+
+        headers_seen = []
+
+        for line in self.report_lines:
+
+            # Restart the headers at each blank line (new record block)
+            if line == '':
+                headers_seen = []
+
+            if len(line) < self.divider_column:
+                continue
+            header = line[0:self.divider_column].strip()
+            # Disambiguate duplicate labels within a record block
+            if header in headers_seen:
+                header = header + f"_{headers_seen.count(header) + 1}"
+            headers_seen.append(header)
+            value = line[self.divider_column:].strip()
+            if value == '':
+                value = None
+
+            try:
+                report_data[header].append(value)
+            except KeyError:
+                report_data[header] = [value]
+
+        return DataFrame(report_data)
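+# Sketch of the "flipped" layout this class expects (labels and values
+# are invented): one label per line left of a shared divider column, the
+# value to its right, and a blank line between record blocks. process()
+# turns each distinct label into a DataFrame column:
+#
+#   GUAR.CODE.1      P
+#   PRIN/GUAR.NAME.1 JANE DOE
+#
+#   GUAR.CODE.1      G
+#   PRIN/GUAR.NAME.1 JOHN SMITH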
OPTION", "BANK CODE", "TAPE BANK NUM", "TAPE ACCOUNT NUM", "TAPE ACCT TYPE", "DEALER", "PRIVATE LABEL", "RESID METHOD", "LATE CHRG EXMPT", "INSURANCE CODE", "VARIABLE DATE", "VARIABLE RATE", "BILLING CYCLE", "UM USER DATE\\d?", "CR ATTG PHONE", "GROSS CONTRACT", "ADV ", "PD AMT FINANCED", "PD INCOME START DATE", "INVOICE DESC", "VARIABLE PYMT CODE", "PD PAYMENT AMT", "QUOTE BUYOUT", "LATE CHARGE CODE", "LATE CHRG RATE", "M DEF COLLECTOR", "AM ACH LEAD DAYS", "UNL POOL", "PD RISK DATE", "PD RISK", "LGD RISK", "LGD DATE", "Service By Others", "CONTRACT NO", "CUST CREDIT ACCT", "CUST ID", "CUST NAME", "UATB CUST DBA", "UATB CUST ADDRESS\\d \\d{2}", "CUST CITY", "CUST STATE", "CUST ZIP", "GUAR CODE \\d", "PRIN\\d?/GUAR NAME \\d", "PRIN\\d? ADDR?\\d", "PRIN\\d? CITY\\d", "PRIN\\d? ST \\d", "ZIP \\d", "FED ID/SS#\\d", "BILLING NAME", "UATB AR ADDRESS\\d \\d{2}", "AR CITY", "AR STATE", "AR ZIP", "AR ATTN", "UATB CR ATTG NAME\\d{2}", "CR SCORING", "FACILITY SCORE", "SIC CODE", "ASSET #", "EQUIP DESC", "QUANTITY", "NEW USED", "MODEL", "A MANUFACTURER YEAR", "SERIAL NUMBER", "EQUIP CODE", "EQUIP CODE DESC", "ASSET VENDOR", "ASSET VENDOR NAME", "MANUFACTURER", "MANUFACT NAME", "UATB EQUIP ADDR\\d \\d{2}", "EQUIP CITY", "EQUIP STATE", "EQUIP ZIP", "STATE TAX CODE", "CNTY TAX CODE", "CITY TAX CODE", "PROP STATUS", "EQUIP COST", "EQUIP COST PCT", "PUR OPTION", "PUR OPTION", "AS RECOURSE CODE", "RESID AMT", "BEG DEPR DATE", "OPER LS BEGIN DATE", "OPER LS LIM", "OPER LS SALVAGE", "PRIN/GUAR NAME \\d", "DOB\\d", "GUAR CODE \\d", "PRIN/GUAR NAME \\d"]} \ No newline at end of file +{"loggingLevel": "ERROR", "directories": {"ASSET": "", "CUST": "", "DOB": "", "FIN": "", "output": ""}, "COLS": ["CUST ID", "CONTRACT NO", "BUSINESS TYPE", "FED ID", "CUST CREDIT ACCT", "CUSTOMER", "LEASE TYPE", "EQUIPMENT COST", "CBR", "NET INVESTMENT", "ANNUAL COMBINED IRR", "CONTRACT TERM", "INCOME START DATE", "FIRST PYMT DATE", "FIRST PYMT AMT", "CONTRACT PYMT", "INVOICE CODE", "INV DAYS", "INV DUE DAY", "SEC DEPOSIT", "IDC AMOUNTS", "IDC DATES", "RESIDUAL", "MANAGERS RESIDUAL", "PROMOTION", "PRODUCT LINE", "REGION", "REGION DESC", "BRANCH", "BUSINESS SEGMENT", "LEAD BANK", "MRKTNG REP", "MRKTNG REGION", "REMIT TO", "PYMT OPTION", "BANK CODE", "TAPE BANK NUM", "TAPE ACCOUNT NUM", "TAPE ACCT TYPE", "DEALER", "PRIVATE LABEL", "RESID METHOD", "LATE CHRG EXMPT", "INSURANCE CODE", "VARIABLE DATE", "VARIABLE RATE", "BILLING CYCLE", "UM USER DATE\\d?", "CR ATTG PHONE", "GROSS CONTRACT", "ADV ", "PD AMT FINANCED", "PD INCOME START DATE", "INVOICE DESC", "VARIABLE PYMT CODE", "PD PAYMENT AMT", "QUOTE BUYOUT", "LATE CHARGE CODE", "LATE CHRG RATE", "M DEF COLLECTOR", "AM ACH LEAD DAYS", "UNL POOL", "PD RISK DATE", "PD RISK", "LGD RISK", "LGD DATE", "Service By Others", "CONTRACT NO", "CUST CREDIT ACCT", "CUST ID", "CUST NAME", "UATB CUST DBA", "UATB CUST ADDRESS\\d \\d{2}", "CUST CITY", "CUST STATE", "CUST ZIP", "GUAR CODE \\d", "PRIN\\d?/GUAR NAME \\d", "PRIN\\d? ADDR?\\d", "PRIN\\d? CITY\\d", "PRIN\\d? 
ST \\d", "ZIP \\d", "FED ID/SS#\\d", "BILLING NAME", "UATB AR ADDRESS\\d \\d{2}", "AR CITY", "AR STATE", "AR ZIP", "AR ATTN", "UATB CR ATTG NAME\\d{2}", "CR SCORING", "FACILITY SCORE", "SIC CODE", "ASSET #", "EQUIP DESC", "QUANTITY", "NEW USED", "MODEL", "A MANUFACTURER YEAR", "SERIAL NUMBER", "EQUIP CODE", "EQUIP CODE DESC", "ASSET VENDOR", "ASSET VENDOR NAME", "MANUFACTURER", "MANUFACT NAME", "UATB EQUIP ADDR\\d \\d{2}", "EQUIP CITY", "EQUIP STATE", "EQUIP ZIP", "STATE TAX CODE", "CNTY TAX CODE", "CITY TAX CODE", "PROP STATUS", "EQUIP COST", "EQUIP COST PCT", "PUR OPTION", "PUR OPTION", "AS RECOURSE CODE", "RESID AMT", "BEG DEPR DATE", "OPER LS BEGIN DATE", "OPER LS LIM", "OPER LS SALVAGE", "PRIN/GUAR NAME \\d", "DOB\\d", "GUAR CODE \\d", "PRIN/GUAR NAME \\d", "PD PURCH ID", "UTAB PURCH DESC", "UATB CD CNTC PHONE"]} \ No newline at end of file diff --git a/config.toml b/config.toml new file mode 100644 index 0000000..e69de29 diff --git a/todo.txt b/todo.txt index 3a6c2e9..3489054 100644 --- a/todo.txt +++ b/todo.txt @@ -3,4 +3,15 @@ [ ] Remove Indiviudal selection [ ] Allow drag & drop (of multiselection) [ ] Notification on completion -[ ] Icons \ No newline at end of file +[ ] Icons + +[ ] Recognize portfolios sections: +For example the following repeats in Customer + GUAR.CODE.1 + PRIN/GUAR NAME 1 + PRIN ADD1 + PRIN ADD2 + PRIN CITY1 + PRIN.ST.1 + ZIP.1 + FED ID/SS#1