Compare commits

...

2 Commits

Author SHA1 Message Date
= 5ca0f8804b
Unified FormatterUi 2 years ago
= 1c6ec6370b
Reworked the column detection to make it more flexible. 3 years ago
  1. 31
      IL Formatter.py
  2. 357
      ILParser.py
  3. 2
      config.json
  4. 0
      config.toml
  5. 13
      todo.txt

@ -10,7 +10,7 @@ from os import startfile
from json import load, dump
from time import sleep
import ILParser
from ILParser import InfoTreieveReport, FlippedReport
# Open the config file, create a dict, and set up logging
@ -256,7 +256,8 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
debug(f"Parse Columns:\n{config['COLS']}")
try:
data: DataFrame = ILParser.extract_data(report, config["COLS"])
it_report: InfoTreieveReport = InfoTreieveReport(report, config["COLS"])
data: DataFrame = it_report.process()
except Exception as e:
self.processButton.setEnabled(False)
logException(f"Failed to parse file-> {filePath} :\n{e}")
@ -286,19 +287,18 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
self.assetFile = None
return None
custDf: DataFrame = self._parse_file(self.custFile)
custDf: DataFrame = self._parse_file(self.custFile)
debug(custDf)
if type(custDf) != DataFrame:
self.custLe.setText("")
self.custFile = None
return None
#FIXME return None
dobDf: DataFrame = self._parse_file(self.dobFile)
debug(dobDf)
if type(dobDf) != DataFrame:
debug(f"Parse Columns: {ILParser.DOB_COL}")
self.dobLE.setText("")
self.dobFile = None
return None
@ -309,7 +309,20 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
if type(finDf) != DataFrame:
self.finLE.setText("")
self.finFile = None
return None
#FIXME return None
bad_cols: list[str] = config['COLS']
successful_new: dict[str:DataFrame] = {}
for id, report_file in enumerate([self.custFile, self.finFile]):
try:
with open(report_file) as file:
report = file.read()
flipped_report: FlippedReport = FlippedReport(report, bad_cols)
flipped_df: DataFrame = flipped_report.process()
if not flipped_df.empty:
successful_new["NEW_CUST" if id == 0 else "NEW_FIN"] = flipped_df
except:
pass
try:
with ExcelWriter(self.outputLocation) as writer:
@ -317,6 +330,12 @@ class MainWindow(QtWidgets.QMainWindow, Ui_MainWindow):
custDf.to_excel(writer, sheet_name="CUST", index=False)
assetDf.to_excel(writer, sheet_name="ASSET", index=False)
dobDf.to_excel(writer, sheet_name="DOB", index=False)
key: str
df: DataFrame
for key, df in successful_new.items():
df.to_excel(writer, sheet_name=key, index=False)
except Exception as e:
logException(f"{now()} | Failed to write to excel -> {self.outputLocation} :\n{e}")
open_error_dialog("Failed to Create Excel", f"Failed to write to excel -> {self.outputLocation}", repr(e))

@ -3,13 +3,36 @@ import re
from re import Match, Pattern
from logging import getLogger, basicConfig
from json import load, dump
from typing import TypeAlias, TypeVar
import pathlib as pl
# The raw text of an infotreieve report
RawReport: TypeAlias = str
# The raw report broken into lines
ReportLines: TypeAlias = list[str]
# Row with the column name data
HeaderRowStr: TypeAlias = str
# Row with actual data
DataRow : TypeAlias = str
# A list of the rows of data
DataRows: TypeAlias = list[DataRow]
#
HeaderDict: TypeAlias = dict[str:list[str]]
#[ ] Add logging to the report processor
logger = getLogger(__name__)
logger.setLevel("DEBUG")
COLUMN_NAME_REGEX = re.compile(r"(?P<column_name>(\w|\.|#|\/)+)", re.IGNORECASE)
def remove_lower_adjacent(nums: list[int]) -> list[int]:
    """Collapse runs of consecutive integers, keeping the first of each run.

    *nums* is assumed to be sorted ascending (it comes from regex match
    positions scanned left to right).

    Args:
        nums: sorted list of integer positions.

    Returns:
        A new list where any element exactly 1 greater than its predecessor
        is dropped. An empty input returns an empty list (the original
        raised IndexError on ``[]`` via ``nums[0]``).
    """
    if not nums:
        return []
    filtered = [nums[0]]
    # Pair each element with its predecessor; keep only gap > 1.
    for current, previous in zip(nums[1:], nums):
        if current - previous > 1:
            filtered.append(current)
    return filtered
def replace_bad_cols(line: str, cols: list[str]) -> str:
"""
@ -44,62 +67,292 @@ def replace_bad_cols(line: str, cols: list[str]) -> str:
return line
def extract_data(input_doc: str, column_list: list[str]) -> DataFrame|None:
"""
Extracts data from a string in a table-like format, where columns are identified by a list of column names, and
returns the data as a Pandas DataFrame.
class InvalidReport(Exception):
    """Raised when a report cannot be parsed (e.g. no header row, no
    '... records listed' end row, or no columns could be detected)."""
Args:
input_doc (str): The string containing the table-like data to extract.
column_list (list[str]): A list of column names to identify the columns in the table-like data.
Returns:
pandas.DataFrame: A DataFrame containing the extracted data from the input string.
"""
line: str
columns = {}
data = {}
for line in input_doc.splitlines():
if len(columns) == 0 :
logger.debug(f"Columns = 0: {line}")
# Find the line that contains the column names and replace bad column names
if re.search("^\w", line):
logger.debug("Found word on first line.")
line = replace_bad_cols(line, column_list)
logger.debug(f"Column replacements made: {line}")
# Find the start and end positions of each column name and store them in a dictionary
columns_names = re.finditer(COLUMN_NAME_REGEX, line)
logger.debug(f"Found column names: {columns_names}")
for c in columns_names:
columns[c.group("column_name")] = {"start": c.start(), "end": c.end()}
logger.debug(f"Column section: {columns[c.group('column_name')]}")
data[c.group("column_name")] = []
continue
elif len(line) < 2:
logger.debug(f"Line len less than 2.")
continue
# Check if we've reached the end of the table and return the data
if re.search("\d+ records listed", line):
logger.debug(f"End of document: {line}")
logger.debug(f"Extracted data: {data}")
return DataFrame(data)
# Extract the data from each column based on the start and end positions
for key, span in columns.items():
data[key].append(line[span["start"]:span["end"]].strip())
class Header:
def __init__(self, header_row: HeaderRowStr, row_start_pos: int, row_end_pos: int) -> None:
row_start_pos += 1
self.name: str = header_row[row_start_pos:row_end_pos].strip()
self.start: int = row_start_pos
self.end: int = row_end_pos
def __str__(self) -> str:
return f"( Header Name: '{self.name}' -> {self.start}:{self.end} )"
if __name__ == "__main__":
def __repr__(self) -> str:
return self.__str__()
basicConfig(filename='ILParser.log', encoding='utf-8',
level="DEBUG", filemode='w', force=True)
def extract_from_row(self, data_row: DataRow) -> tuple[str, str]:
try:
value: str = data_row[self.start : self.end]
except IndexError:
value = None
value: str = value.strip()
if value == '':
value = None
return self.name, value
class HeaderRow:
    """Parses a report's header line into Header objects by finding the
    character positions (column breaks) that are blank in every data row."""

    def __init__(self, header_row: HeaderRowStr, data_rows: DataRows, bad_col_list: list[str]) -> None:
        """
        Args:
            header_row: the raw header line of the report.
            data_rows: the data lines, used to validate candidate breaks.
            bad_col_list: column-name patterns handed to ``replace_bad_cols``
                to normalise awkward column names before splitting.

        Raises:
            InvalidReport: if no headers could be created.
        """
        logger.debug(f"Initializing HeaderRow with header_row: {header_row}, data_rows: {data_rows}")
        self.header_row = replace_bad_cols(header_row, bad_col_list)
        columns_breaks: list[int] = self._validate_columns(data_rows)
        logger.debug(f"Columns breaks: {columns_breaks}")
        self.headers: list[Header] = []
        self._create_columns(columns_breaks)

    def _get_spaces(self) -> list[int]:
        """Return candidate column-break positions: indexes of single spaces
        flanked by non-space characters in the header row."""
        # Regex to find a single space between two non-space characters;
        # the space itself is the middle character of each match.
        SPACE_REGEX: Pattern = re.compile(r"[^\s]\s[^\s]")
        space_matches = re.finditer(SPACE_REGEX, self.header_row)
        # Get the int position of the space (start of match + 1).
        space_locations: list[int] = [s.start() + 1 for s in space_matches]
        logger.debug(f"Space Locations: {space_locations}")
        # Collapse adjacent positions, keeping the first of each run.
        space_locations = remove_lower_adjacent(space_locations)
        return space_locations

    def _validate_columns(self, data_lines: DataRows) -> list[int]:
        """Keep only the candidate breaks that are blank in EVERY data row.

        A break is "blank" in a row when the row is too short to reach it,
        or the character at that position is a space.

        FIX: the original called ``column_breaks.remove(cb)`` while iterating
        ``column_breaks`` itself, which silently skips the element after each
        removal; we rebuild the surviving list per row instead.
        """
        logger.debug(f"Validating columns for data_lines: {data_lines}")
        # Get a list of potential column breaks
        column_breaks: list[int] = self._get_spaces()
        for row in data_lines:
            kept: list[int] = []
            for cb in column_breaks:
                # If the row is not long enough, the value is blank there.
                if len(row) <= cb or row[cb] == ' ':
                    kept.append(cb)
                else:
                    # A non-space at this position means it is not a
                    # column delimiter.
                    logger.debug(f"Remove CB {cb} | '{row[cb]}' -> {row}")
            column_breaks = kept
        return column_breaks

    def _create_columns(self, column_breaks: list[int]) -> None:
        """Build a Header for each span between consecutive breaks and store
        them on ``self.headers``.

        Raises:
            InvalidReport: if no headers were produced.
        """
        logger.debug(f"Creating columns with column_breaks: {column_breaks}")
        # Start one before the line so Header's internal +1 lands on index 0.
        col_start: int = -1
        # Add the end of the line so that we can capture the last column
        column_breaks.append(len(self.header_row))
        header_names: list[str] = []
        for cb in column_breaks:
            # Stop once the previous break is already past the header line.
            if col_start >= len(self.header_row):
                break
            header: Header = Header(
                header_row=self.header_row,
                row_start_pos=col_start,
                row_end_pos=cb
            )
            # Handle duplicate column names by suffixing _2, _3, ...
            # NOTE(review): the count is of the ORIGINAL name, so a third
            # duplicate would also get "_2" — acceptable for current reports.
            if header.name in header_names:
                logger.debug(f"Found Matching header name: {header.name}")
                header.name = header.name + f"_{header_names.count(header.name)+1}"
            header_names.append(header.name)
            self.headers.append(header)
            col_start = cb
        logger.debug(f"Created headers: {self.headers}")
        if len(self.headers) < 1:
            raise InvalidReport(f"No headers found in report! Header Row: {self.header_row} | CBs: {column_breaks}")
class InfoTreieveReport:
    """Parses a raw InfoTreieve report (fixed-width text with one header row
    and a trailing 'N records listed' line) into a pandas DataFrame."""

    def __init__(self, raw_report: RawReport, bad_col_list: list[str]) -> None:
        """
        Args:
            raw_report (str): an unprocessed InfoTreieve report.
            bad_col_list (list[str]): column-name patterns passed through to
                HeaderRow for header cleanup.

        Raises:
            InvalidReport: the header or end row could not be found.
                (FIX: docstring previously named a nonexistent
                ``InvalidReportError``.)
        """
        self.raw_report: RawReport = raw_report
        # Find the row after the last data row;
        # it also carries the expected number of data rows.
        end_row_index, self.num_records = self._find_end_row(raw_report)
        # Find the header row
        header_row_index: int = self._find_header_row(raw_report)
        # Split the report by lines
        self.full_report_lines: list[str] = raw_report.splitlines()
        # Get a list of the rows with actual data.
        # NOTE(review): the +2 skips the line directly under the header —
        # presumably a separator line; confirm against a real report.
        self.raw_data: DataRows = self.full_report_lines[
            header_row_index + 2 : end_row_index]
        # Find the columns for each row
        self.header_row: HeaderRow = HeaderRow(
            header_row=self.full_report_lines[header_row_index],
            data_rows=self.raw_data,
            bad_col_list=bad_col_list
        )

    @staticmethod
    def _find_end_row(text: RawReport) -> tuple[int, int]:
        """Find the row below the last line of data using regex.

        Returns:
            Tuple of (row index of end row, number of records in report).

        Raises:
            InvalidReport: if no end row is found.
        """
        logger.debug(f"Finding end row in text: {text}")
        # FIX: raw string so \d is a regex class, not a deprecated escape.
        END_ROW_REGEX = re.compile(r"^(?P<n_records>\d+) records listed$")
        # Scan bottom-up: the end row sits near the end of the report.
        lines_from_bottom: list[str] = text.splitlines()
        lines_from_bottom.reverse()
        for index, line in enumerate(lines_from_bottom):
            row_regex: Match | None = re.search(END_ROW_REGEX, line)
            if row_regex:
                number_records: int = int(row_regex.group("n_records"))
                logger.debug(f"End row found at index {len(lines_from_bottom)-index-1} with {number_records} records")
                # Convert the bottom-up index back to a top-down one.
                return len(lines_from_bottom) - index - 1, number_records
        raise InvalidReport(f"No end row found! Search regex: {END_ROW_REGEX}")

    @staticmethod
    def _find_header_row(text: RawReport) -> int:
        """Return the index of the line with the most non-space characters,
        taken to be the header row.

        Raises:
            InvalidReport: if the report has no non-blank lines.
                (FIX: the original returned None here, which later crashed
                with an opaque TypeError when used as a list index.)
        """
        header_row = None
        greatest_filed_space: int = 0
        # Find the row with the least blank space.
        for index, row in enumerate(text.splitlines()):
            # Spaces do not count toward a row's "fullness".
            row_size: int = len(row.replace(' ', ''))
            if row_size > greatest_filed_space:
                greatest_filed_space = row_size
                header_row = index
        if header_row is None:
            raise InvalidReport("No header row found: report has no non-blank lines")
        logger.debug(f"Header row found at index {header_row}")
        return header_row

    def process(self) -> DataFrame:
        """Extract every header's value from every data row and build a
        DataFrame (empty cells become None).

        Raises:
            ValueError: some headers did not yield one value per data row;
                the per-column lengths are logged before re-raising.
        """
        self.report_data: HeaderDict = {}
        # Collect the data cell under each header for every data row.
        for data_row in self.raw_data:
            for header in self.header_row.headers:
                column, value = header.extract_from_row(data_row)
                # setdefault replaces the original try/except KeyError dance.
                self.report_data.setdefault(column, []).append(value)
        logger.debug(f"Processed data: {self.report_data}")
        try:
            processed_data: DataFrame = DataFrame(self.report_data)
        except ValueError as ve:
            # Log the column lengths so ragged extractions are diagnosable.
            len_dict: dict = {
                col: len(cl) for col, cl in self.report_data.items()
            }
            logger.exception(f"Lengths:\n{len_dict}")
            raise ve
        return processed_data
class FlippedReport:
def __init__(self, raw_report: RawReport, bad_cols: list[str]) -> None:
self.rr: RawReport = raw_report
self.report_lines: list[str] = raw_report.splitlines()
self.divider_column = self.find_common_first_space(self.report_lines)
@staticmethod
def find_common_first_space(lines: list[str]) -> int:
min_space_index = None
for line in lines:
space_indices = [index for index, char in enumerate(line)
if char == ' ' and index != 0]
# If there's no space in the line, we cannot find a common space index
if not space_indices:
return -1
current_line_min_space_index = min(space_indices)
if min_space_index is None or current_line_min_space_index > min_space_index:
min_space_index = current_line_min_space_index
return min_space_index
test_replace_bad_cols()
def process(self) -> DataFrame:
report_data = {}
headers_seen = []
for line in self.report_lines:
# Restart the headers
if line == '':
headers_seen = []
if len(line) < self.divider_column:
continue
header = line[0:self.divider_column].strip()
if header in headers_seen:
header = header + f"_{headers_seen.count(header)}+1"
try:
value = line[self.divider_column:].strip()
except IndexError:
value = None
try:
report_data[header].append(value)
except KeyError:
report_data[header] = [value]
return DataFrame(report_data)

@ -1 +1 @@
{"loggingLevel": "ERROR", "directories": {"ASSET": "", "CUST": "", "DOB": "", "FIN": "", "output": ""}, "COLS": ["CUST ID", "CONTRACT NO", "BUSINESS TYPE", "FED ID", "CUST CREDIT ACCT", "CUSTOMER", "LEASE TYPE", "EQUIPMENT COST", "CBR", "NET INVESTMENT", "ANNUAL COMBINED IRR", "CONTRACT TERM", "INCOME START DATE", "FIRST PYMT DATE", "FIRST PYMT AMT", "CONTRACT PYMT", "INVOICE CODE", "INV DAYS", "INV DUE DAY", "SEC DEPOSIT", "IDC AMOUNTS", "IDC DATES", "RESIDUAL", "MANAGERS RESIDUAL", "PROMOTION", "PRODUCT LINE", "REGION", "REGION DESC", "BRANCH", "BUSINESS SEGMENT", "LEAD BANK", "MRKTNG REP", "MRKTNG REGION", "REMIT TO", "PYMT OPTION", "BANK CODE", "TAPE BANK NUM", "TAPE ACCOUNT NUM", "TAPE ACCT TYPE", "DEALER", "PRIVATE LABEL", "RESID METHOD", "LATE CHRG EXMPT", "INSURANCE CODE", "VARIABLE DATE", "VARIABLE RATE", "BILLING CYCLE", "UM USER DATE\\d?", "CR ATTG PHONE", "GROSS CONTRACT", "ADV ", "PD AMT FINANCED", "PD INCOME START DATE", "INVOICE DESC", "VARIABLE PYMT CODE", "PD PAYMENT AMT", "QUOTE BUYOUT", "LATE CHARGE CODE", "LATE CHRG RATE", "M DEF COLLECTOR", "AM ACH LEAD DAYS", "UNL POOL", "PD RISK DATE", "PD RISK", "LGD RISK", "LGD DATE", "Service By Others", "CONTRACT NO", "CUST CREDIT ACCT", "CUST ID", "CUST NAME", "UATB CUST DBA", "UATB CUST ADDRESS\\d \\d{2}", "CUST CITY", "CUST STATE", "CUST ZIP", "GUAR CODE \\d", "PRIN\\d?/GUAR NAME \\d", "PRIN\\d? ADDR?\\d", "PRIN\\d? CITY\\d", "PRIN\\d? 
ST \\d", "ZIP \\d", "FED ID/SS#\\d", "BILLING NAME", "UATB AR ADDRESS\\d \\d{2}", "AR CITY", "AR STATE", "AR ZIP", "AR ATTN", "UATB CR ATTG NAME\\d{2}", "CR SCORING", "FACILITY SCORE", "SIC CODE", "ASSET #", "EQUIP DESC", "QUANTITY", "NEW USED", "MODEL", "A MANUFACTURER YEAR", "SERIAL NUMBER", "EQUIP CODE", "EQUIP CODE DESC", "ASSET VENDOR", "ASSET VENDOR NAME", "MANUFACTURER", "MANUFACT NAME", "UATB EQUIP ADDR\\d \\d{2}", "EQUIP CITY", "EQUIP STATE", "EQUIP ZIP", "STATE TAX CODE", "CNTY TAX CODE", "CITY TAX CODE", "PROP STATUS", "EQUIP COST", "EQUIP COST PCT", "PUR OPTION", "PUR OPTION", "AS RECOURSE CODE", "RESID AMT", "BEG DEPR DATE", "OPER LS BEGIN DATE", "OPER LS LIM", "OPER LS SALVAGE", "PRIN/GUAR NAME \\d", "DOB\\d", "GUAR CODE \\d", "PRIN/GUAR NAME \\d"]}
{"loggingLevel": "ERROR", "directories": {"ASSET": "", "CUST": "", "DOB": "", "FIN": "", "output": ""}, "COLS": ["CUST ID", "CONTRACT NO", "BUSINESS TYPE", "FED ID", "CUST CREDIT ACCT", "CUSTOMER", "LEASE TYPE", "EQUIPMENT COST", "CBR", "NET INVESTMENT", "ANNUAL COMBINED IRR", "CONTRACT TERM", "INCOME START DATE", "FIRST PYMT DATE", "FIRST PYMT AMT", "CONTRACT PYMT", "INVOICE CODE", "INV DAYS", "INV DUE DAY", "SEC DEPOSIT", "IDC AMOUNTS", "IDC DATES", "RESIDUAL", "MANAGERS RESIDUAL", "PROMOTION", "PRODUCT LINE", "REGION", "REGION DESC", "BRANCH", "BUSINESS SEGMENT", "LEAD BANK", "MRKTNG REP", "MRKTNG REGION", "REMIT TO", "PYMT OPTION", "BANK CODE", "TAPE BANK NUM", "TAPE ACCOUNT NUM", "TAPE ACCT TYPE", "DEALER", "PRIVATE LABEL", "RESID METHOD", "LATE CHRG EXMPT", "INSURANCE CODE", "VARIABLE DATE", "VARIABLE RATE", "BILLING CYCLE", "UM USER DATE\\d?", "CR ATTG PHONE", "GROSS CONTRACT", "ADV ", "PD AMT FINANCED", "PD INCOME START DATE", "INVOICE DESC", "VARIABLE PYMT CODE", "PD PAYMENT AMT", "QUOTE BUYOUT", "LATE CHARGE CODE", "LATE CHRG RATE", "M DEF COLLECTOR", "AM ACH LEAD DAYS", "UNL POOL", "PD RISK DATE", "PD RISK", "LGD RISK", "LGD DATE", "Service By Others", "CONTRACT NO", "CUST CREDIT ACCT", "CUST ID", "CUST NAME", "UATB CUST DBA", "UATB CUST ADDRESS\\d \\d{2}", "CUST CITY", "CUST STATE", "CUST ZIP", "GUAR CODE \\d", "PRIN\\d?/GUAR NAME \\d", "PRIN\\d? ADDR?\\d", "PRIN\\d? CITY\\d", "PRIN\\d? 
ST \\d", "ZIP \\d", "FED ID/SS#\\d", "BILLING NAME", "UATB AR ADDRESS\\d \\d{2}", "AR CITY", "AR STATE", "AR ZIP", "AR ATTN", "UATB CR ATTG NAME\\d{2}", "CR SCORING", "FACILITY SCORE", "SIC CODE", "ASSET #", "EQUIP DESC", "QUANTITY", "NEW USED", "MODEL", "A MANUFACTURER YEAR", "SERIAL NUMBER", "EQUIP CODE", "EQUIP CODE DESC", "ASSET VENDOR", "ASSET VENDOR NAME", "MANUFACTURER", "MANUFACT NAME", "UATB EQUIP ADDR\\d \\d{2}", "EQUIP CITY", "EQUIP STATE", "EQUIP ZIP", "STATE TAX CODE", "CNTY TAX CODE", "CITY TAX CODE", "PROP STATUS", "EQUIP COST", "EQUIP COST PCT", "PUR OPTION", "PUR OPTION", "AS RECOURSE CODE", "RESID AMT", "BEG DEPR DATE", "OPER LS BEGIN DATE", "OPER LS LIM", "OPER LS SALVAGE", "PRIN/GUAR NAME \\d", "DOB\\d", "GUAR CODE \\d", "PRIN/GUAR NAME \\d", "PD PURCH ID", "UTAB PURCH DESC", "UATB CD CNTC PHONE"]}

@ -3,4 +3,15 @@
[ ] Remove Individual selection
[ ] Allow drag & drop (of multiselection)
[ ] Notification on completion
[ ] Icons
[ ] Icons
[ ] Recognize portfolios sections:
For example the following repeats in Customer
GUAR.CODE.1
PRIN/GUAR NAME 1
PRIN ADD1
PRIN ADD2
PRIN CITY1
PRIN.ST.1
ZIP.1
FED ID/SS#1

Loading…
Cancel
Save