from pandas import DataFrame import re from re import Match, Pattern from logging import getLogger, basicConfig from json import load, dump logger = getLogger(__name__) logger.setLevel("DEBUG") COLUMN_NAME_REGEX = re.compile(r"(?P(\w|\.|#|\/)+)", re.IGNORECASE) def replace_bad_cols(line: str, cols: list[str]) -> str: """ Replaces bad column names in a string with modified names that have spaces replaced with dots. Args: line (str): The string containing the column names to modify. cols (list[str]): A list of column names to modify. Returns: str: The modified string with bad column names replaced. """ logger.debug(f"Line: {line} | Cols: {cols}") for c in cols: # Create a regex for the col col_regex: Pattern = re.compile(c.replace(' ', r'(?:\s|\.)')) logger.debug(f"Col_regex: {col_regex}") # Get all columns that match that pattern col_matches: list[str|tuple[str]] = re.findall(col_regex, line) logger.debug(f"Col_matches: {col_matches}") # Match the substition for all matches if any col_name: str for col_name in col_matches: logger.debug(f"col_name: {col_name}") # Replace the bad column name with the modified column name in the string # Adding the '.' instead of a space helps the parser tell what the continous # column are line = line.replace(col_name, col_name.replace(' ', '.')) return line def extract_data(input_doc: str, column_list: list[str]) -> DataFrame|None: """ Extracts data from a string in a table-like format, where columns are identified by a list of column names, and returns the data as a Pandas DataFrame. Args: input_doc (str): The string containing the table-like data to extract. column_list (list[str]): A list of column names to identify the columns in the table-like data. Returns: pandas.DataFrame: A DataFrame containing the extracted data from the input string. """ line: str columns = {} data = {} for line in input_doc.splitlines(): if len(columns) == 0 : logger.debug(f"Columns = 0: {line}") # Find the line that contains the column names and replace bad column names if re.search("^\w", line): logger.debug("Found word on first line.") line = replace_bad_cols(line, column_list) logger.debug(f"Column replacements made: {line}") # Find the start and end positions of each column name and store them in a dictionary columns_names = re.finditer(COLUMN_NAME_REGEX, line) logger.debug(f"Found column names: {columns_names}") for c in columns_names: columns[c.group("column_name")] = {"start": c.start(), "end": c.end()} logger.debug(f"Column section: {columns[c.group('column_name')]}") data[c.group("column_name")] = [] continue elif len(line) < 2: logger.debug(f"Line len less than 2.") continue # Check if we've reached the end of the table and return the data if re.search("\d+ records listed", line): logger.debug(f"End of document: {line}") logger.debug(f"Extracted data: {data}") return DataFrame(data) # Extract the data from each column based on the start and end positions for key, span in columns.items(): data[key].append(line[span["start"]:span["end"]].strip()) if __name__ == "__main__": basicConfig(filename='ILParser.log', encoding='utf-8', level="DEBUG", filemode='w', force=True) def test_replace_bad_cols(): with open("Inputs\CUST_ISSUE") as c: input: str = c.read() with open("config.json") as configFile: config: dict = load(configFile) columns: list[str] = config["COLS"] replace_bad_cols(input.splitlines()[1], columns) test_replace_bad_cols()