import re
from re import Match, Pattern
from logging import getLogger, basicConfig
from json import load, dump
from typing import TypeAlias, TypeVar
import pathlib as pl

# Needed for the DataFrame return types below
from pandas import DataFrame


# The raw text of an InfoTreieve report
RawReport: TypeAlias = str

# The raw report broken into lines
ReportLines: TypeAlias = list[str]

# Row with the column name data
HeaderRowStr: TypeAlias = str

# Row with actual data
DataRow: TypeAlias = str

# A list of the rows of data
DataRows: TypeAlias = list[DataRow]

# Maps a column name to the list of values extracted for that column
HeaderDict: TypeAlias = dict[str, list[str]]
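
# A hypothetical sketch of the report shape this parser assumes: a
# header row, one line directly beneath it that is skipped, fixed-width
# data rows, and a trailing "<n> records listed" end row:
#
#   PART# DESC QTY
#   ----- ---- ---
#   A1    bolt 5
#   B2    nut  12
#   2 records listed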

# [ ] Add logging to the report processor

logger = getLogger(__name__)
logger.setLevel("DEBUG")

# Matches a column name made of word characters, '.', '#', or '/'
COLUMN_NAME_REGEX = re.compile(r"(?P<column_name>(\w|\.|#|\/)+)", re.IGNORECASE)


def remove_lower_adjacent(nums: list[int]) -> list[int]:
    """Collapses runs of adjacent integers, keeping the first of each run."""
    if not nums:
        return []
    filtered = [nums[0]]
    for i in range(1, len(nums)):
        # Keep a value only if it is not adjacent to its predecessor
        if nums[i] - nums[i - 1] > 1:
            filtered.append(nums[i])
    return filtered
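
# For example: remove_lower_adjacent([3, 4, 5, 9, 12, 13]) -> [3, 9, 12]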


def replace_bad_cols(line: str, cols: list[str]) -> str:
    """
    Cleans up problematic column names in a header line using the
    configured column list.
    """
    # ... (body elided between the diff hunks of this excerpt) ...
    return line


class InvalidReport(Exception):
    """Raised when a report's header row or end row cannot be found."""
    pass


class Header:
    """A single column: its name plus its character span in the header row."""

    def __init__(self, header_row: HeaderRowStr, row_start_pos: int, row_end_pos: int) -> None:
        # Step past the delimiting space itself
        row_start_pos += 1
        self.name: str = header_row[row_start_pos:row_end_pos].strip()
        self.start: int = row_start_pos
        self.end: int = row_end_pos

    def __str__(self) -> str:
        return f"( Header Name: '{self.name}' -> {self.start}:{self.end} )"

    def __repr__(self) -> str:
        return self.__str__()

    def extract_from_row(self, data_row: DataRow) -> tuple[str, str | None]:
        """Returns this header's name and its stripped value from a data row."""
        # Slicing never raises IndexError; a row that is too short
        # simply yields an empty string
        value: str | None = data_row[self.start : self.end].strip()

        if value == '':
            value = None

        return self.name, value
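
    # For example, a Header spanning 6:10 pulls "bolt" out of the
    # hypothetical row "A1    bolt 5", and yields None when its span
    # is blank or lies past the end of the row.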


class HeaderRow:
    """Derives the column spans by combining the header row with the data rows."""

    def __init__(self, header_row: HeaderRowStr, data_rows: DataRows, bad_col_list: list[str]) -> None:
        logger.debug(f"Initializing HeaderRow with header_row: {header_row}, data_rows: {data_rows}")
        self.header_row = replace_bad_cols(header_row, bad_col_list)
        column_breaks: list[int] = self._validate_columns(data_rows)
        logger.debug(f"Column breaks: {column_breaks}")
        self.headers: list[Header] = []
        self._create_columns(column_breaks)

    def _get_spaces(self) -> list[int]:
        # Regex to find a single space flanked by non-space characters
        SPACE_REGEX: Pattern = re.compile(r"[^\s]\s[^\s]")
        space_matches = re.finditer(SPACE_REGEX, self.header_row)
        # Get the int position of the space (the middle of each match)
        space_locations: list[int] = [s.start() + 1 for s in space_matches]
        logger.debug(f"Space Locations: {space_locations}")
        # Collapse runs of adjacent spaces, keeping the first of each run
        space_locations = remove_lower_adjacent(space_locations)
        return space_locations
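
    # For example, in the hypothetical header row "PART# DESC QTY" the
    # single spaces at indices 5 and 10 become the candidate column breaks.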

    def _validate_columns(self, data_lines: DataRows) -> list[int]:
        logger.debug(f"Validating columns for data_lines: {data_lines}")
        # Get a list of potential column breaks
        column_breaks: list[int] = self._get_spaces()

        row: str
        for row in data_lines:
            # Check each of the column positions for values, iterating
            # over a copy so that removals don't skip entries
            cb: int  # Column Break
            for cb in list(column_breaks):
                # If the row is not long enough, the value is blank
                if len(row) <= cb:
                    continue
                # If the position is not a space, then this is not a
                # column delimiter
                elif row[cb] != ' ':
                    logger.debug(f"Remove CB {cb} | '{row[cb]}' -> {row}")
                    # Remove column breaks that are not actually empty
                    column_breaks.remove(cb)
        return column_breaks

    def _create_columns(self, column_breaks: list[int]) -> None:
        logger.debug(f"Creating columns with column_breaks: {column_breaks}")
        # Track the start of the current column's span
        col_start: int = -1
        # Add the end of the line so that we can capture the last column
        column_breaks.append(len(self.header_row))

        header_names: list[str] = []
        # Create a header for each column break
        cb: int
        for cb in column_breaks:
            # Don't try to make a header if there are not enough
            # characters in the line
            if col_start >= len(self.header_row):
                break

            header: Header = Header(
                header_row=self.header_row,
                row_start_pos=col_start,
                row_end_pos=cb
            )

            # Handle duplicate column names by appending a counter
            if header.name in header_names:
                logger.debug(f"Found matching header name: {header.name}")
                header.name = header.name + f"_{header_names.count(header.name) + 1}"

            header_names.append(header.name)
            self.headers.append(header)
            col_start = cb

        logger.debug(f"Created headers: {self.headers}")
        if len(self.headers) < 1:
            raise InvalidReport(f"No headers found in report! Header Row: {self.header_row} | CBs: {column_breaks}")
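
# A worked sketch on hypothetical data: for the header row "PART# DESC QTY"
# and data rows
#
#   "A1    bolt 5"
#   "B2    nut  12"
#
# the validated breaks are [5, 10], so HeaderRow yields headers spanning
# PART# -> 0:5, DESC -> 6:10, QTY -> 11:14.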


class InfoTreieveReport:

    def __init__(self, raw_report: RawReport, bad_col_list: list[str]) -> None:
        """
        Extracts data from a string in a table-like format, deriving the
        columns from the report's header row, and exposes the data as a
        Pandas DataFrame via `process()`.

        Args:
            raw_report (str): An unprocessed InfoTreieve report.
            bad_col_list (list[str]): Column names that need to be cleaned
                up before the header row can be parsed.

        Raises:
            InvalidReport: The header row or the end row could not be found.
        """
        self.raw_report: RawReport = raw_report

        # Find the row after the last data row; it also carries the
        # expected number of data rows
        end_row_index, self.num_records = self._find_end_row(raw_report)
        # Find the header row
        header_row_index: int = self._find_header_row(raw_report)

        # Split the report by lines
        self.full_report_lines: list[str] = raw_report.splitlines()

        # Get a list of the rows with actual data, skipping the header
        # row and the line directly beneath it
        self.raw_data: DataRows = self.full_report_lines[
            header_row_index + 2 : end_row_index]

        # Find the columns for each row
        self.header_row: HeaderRow = HeaderRow(
            header_row=self.full_report_lines[header_row_index],
            data_rows=self.raw_data,
            bad_col_list=bad_col_list
        )

    @staticmethod
    def _find_end_row(text: RawReport) -> tuple[int, int]:
        """
        Finds the row below the last line of data using regex.

        Returns:
            tuple[int, int]:
            - row index of the end row
            - number of records in the report

        Raises:
            InvalidReport: No end row was found.
        """
        logger.debug(f"Finding end row in text: {text}")
        END_ROW_REGEX = re.compile(r"^(?P<n_records>\d+) records listed$")

        # Search from the bottom up, since the end row is near the bottom
        lines_from_bottom: list[str] = text.splitlines()
        lines_from_bottom.reverse()

        index: int
        line: str
        for index, line in enumerate(lines_from_bottom):
            row_regex: Match | None = re.search(END_ROW_REGEX, line)
            if row_regex:
                number_records: int = int(row_regex.group("n_records"))
                logger.debug(f"End row found at index {len(lines_from_bottom) - index - 1} with {number_records} records")
                return len(lines_from_bottom) - index - 1, number_records

        raise InvalidReport(f"No end row found! Search regex: {END_ROW_REGEX}")
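
    # The end row is a line such as "15 records listed" (count hypothetical).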

    @staticmethod
    def _find_header_row(text: RawReport) -> int:
        """Returns the index of the row assumed to be the header row."""
        header_row = None
        greatest_filled_space: int = 0
        # Find the row with the least blank space, i.e. the most
        # non-space characters
        index: int
        row: str
        for index, row in enumerate(text.splitlines()):
            # Spaces do not count
            row_size: int = len(row.replace(' ', ''))
            if row_size > greatest_filled_space:
                greatest_filled_space = row_size
                header_row = index
        logger.debug(f"Header row found at index {header_row}")
        return header_row

    def process(self) -> DataFrame:
        """
        Extracts every column's values from the data rows and returns
        them as a DataFrame.

        Raises:
            ValueError: Some headers did not return as many values as others.
        """
        self.report_data: HeaderDict = {}

        # Get the data from each data row
        data_row: DataRow
        for data_row in self.raw_data:
            header: Header
            for header in self.header_row.headers:
                column, value = header.extract_from_row(data_row)
                try:
                    self.report_data[column].append(value)
                except KeyError:
                    self.report_data[column] = [value]

        try:
            logger.debug(f"Processed data: {self.report_data}")
            processed_data: DataFrame = DataFrame(self.report_data)
        except ValueError as ve:
            # Log the per-column lengths to show which columns came out uneven
            len_dict: dict = {
                col: len(cl) for col, cl in self.report_data.items()
            }
            logger.exception(f"Lengths:\n{len_dict}")
            raise ve

        return processed_data
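
# A minimal usage sketch (the file path is hypothetical):
#
#   raw = pl.Path("Inputs/SOME_REPORT").read_text()
#   report = InfoTreieveReport(raw, bad_col_list=["BAD COL"])
#   df = report.process()
#   assert len(df) == report.num_records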


class FlippedReport:
    """A report laid out as 'header value' lines instead of a table."""

    def __init__(self, raw_report: RawReport, bad_cols: list[str]) -> None:
        self.rr: RawReport = raw_report

        self.report_lines: list[str] = raw_report.splitlines()
        # Column where the header portion of each line ends
        self.divider_column = self.find_common_first_space(self.report_lines)

    @staticmethod
    def find_common_first_space(lines: list[str]) -> int:
        """
        Returns the largest first-space index across all lines, or -1 if
        some line contains no space at all.
        """
        min_space_index = None

        for line in lines:
            # Blank lines separate blocks and carry no divider information
            if not line:
                continue

            space_indices = [index for index, char in enumerate(line)
                             if char == ' ' and index != 0]

            # If there's no space in the line, we cannot find a common space index
            if not space_indices:
                return -1

            current_line_min_space_index = min(space_indices)
            if min_space_index is None or current_line_min_space_index > min_space_index:
                min_space_index = current_line_min_space_index

        return min_space_index
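
    # For example, over the hypothetical lines "PART#   A1" (first space
    # at index 5) and "QTY     12" (first space at index 3), the divider
    # column is 5.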

    def process(self) -> DataFrame:
        """Extracts the 'header value' pairs from each line into a DataFrame."""
        report_data = {}

        # Headers seen in the current block, used to rename duplicates
        headers_seen = []

        for line in self.report_lines:
            # A blank line starts a new block; restart the headers
            if line == '':
                headers_seen = []

            if len(line) < self.divider_column:
                continue

            # Check if we've reached the end of the table and return the data
            if re.search(r"\d+ records listed", line):
                logger.debug(f"End of document: {line}")
                logger.debug(f"Extracted data: {report_data}")
                return DataFrame(report_data)

            header = line[0:self.divider_column].strip()
            # Record the header first so repeats become _2, _3, ...
            headers_seen.append(header)
            if headers_seen.count(header) > 1:
                header = header + f"_{headers_seen.count(header)}"

            # A line that ends at the divider column has no value
            value = line[self.divider_column:].strip()
            if value == '':
                value = None

            try:
                report_data[header].append(value)
            except KeyError:
                report_data[header] = [value]

        return DataFrame(report_data)
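
# A minimal sketch of a flipped report (hypothetical input):
#
#   PART#   A1
#   DESC    bolt
#
#   PART#   B2
#   DESC    nut
#   2 records listed
#
# processes into a two-row DataFrame with PART# and DESC columns.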


if __name__ == "__main__":

    basicConfig(filename='ILParser.log', encoding='utf-8',
                level="DEBUG", filemode='w', force=True)

    def test_replace_bad_cols():
        # Raw string so the backslash in the Windows path isn't an escape
        with open(r"Inputs\CUST_ISSUE") as c:
            input_text: str = c.read()
        with open("config.json") as configFile:
            config: dict = load(configFile)
        columns: list[str] = config["COLS"]

        replace_bad_cols(input_text.splitlines()[1], columns)

    test_replace_bad_cols()