from pandas import DataFrame, merge, to_datetime, NaT, concat, ExcelWriter from openpyxl import Workbook, load_workbook from abc import ABC from logging import getLogger import re import datetime from copy import deepcopy from dataclasses import dataclass from helpers import CN_REGEX, drop_unnamed from memory import get_prev_reconciled, hash_cols, col_hash, create_identifier from pathlib import Path logger = getLogger(__name__) @dataclass class ReconciledReports: no_match: DataFrame amt_mismatch: DataFrame prev_rec: DataFrame gp_filtered: DataFrame ob_overdue: DataFrame def save_reports(self, output_path: Path): with ExcelWriter(output_path, mode='w') as writer: self.no_match.drop_duplicates(inplace=True) self.no_match.to_excel(writer, sheet_name="No Match", index=False, freeze_panes=(1,3) ) self.amt_mismatch.drop_duplicates(inplace=True) self.amt_mismatch.to_excel(writer, sheet_name="Amount Mismatch", index=False, freeze_panes=(1,3) ) self.ob_overdue.to_excel(writer, sheet_name="Overdue", index=False ) self.prev_rec.to_excel(writer, sheet_name="Previously Reconciled", index=False, freeze_panes=(1,3) ) self.gp_filtered.to_excel(writer, sheet_name="Filtered from GP", index=False, freeze_panes=(1,0) ) wb: Workbook = load_workbook(output_path) for sheet in ["No Match", "Amount Mismatch"]: ws = wb[sheet] ws.column_dimensions['A'].hidden = True ws.column_dimensions['B'].hidden = True for sheet in ["Filtered from GP", "Previously Reconciled"]: wb[sheet].sheet_state = "hidden" wb.save(output_path) wb.close() class HoldReport(ABC): source = "" def __init__(self, dataframe: DataFrame, reports_config: dict) -> None: self.config = reports_config drop_unnamed(dataframe) self.df = dataframe self.df = self._add_work_columns(self.df) self._normalize() def _normalize(self): # Rename the columns to standardize the column names self.df.rename( columns= { unique_cols[self.source] : common_col for common_col, unique_cols in self.config["shared_columns"].items() }, inplace=True) # Convert the on-hold amount column to float format and round to two decimal places self.df["onhold_amount"] = self.df["onhold_amount"].astype(float).round(2) # Use regex to extract the contract number from the column values and create a new column with the standardized format self.df["contract_number"] = self.df["contract_number"].apply( lambda cn: str(cn) if not re.search(CN_REGEX, str(cn)) else re.search(CN_REGEX, str(cn)).group(0) ) # Create a new column with a unique transaction ID self.df["ID"] = self.df["contract_number"] +'_'+\ self.df["onhold_amount"].astype(str) # Create a new column with the data source self.df["Source"] = self.source @staticmethod def _remove_prev_recs(contract_match, no_match) -> \ tuple[DataFrame, DataFrame, DataFrame]: """ """ idents: list[col_hash] = create_identifier(contract_match)["Indentifier"].to_list() idents.extend(create_identifier(no_match)["Indentifier"].to_list()) logger.debug(f"{idents=}") # Get previsouly reced prev_recs: DataFrame|None = get_prev_reconciled(idents) if prev_recs is None: logger.info("No previously reconciled!") return DataFrame(), contract_match, no_match dfs = [] for df in [contract_match, no_match]: start_size = df.shape[0] logger.debug(f"Report DF: \n{df}") logger.debug(f"prev_rec: \n{prev_recs}") df = merge( df, prev_recs, how="left", on= "Indentifier", suffixes=("_cur", "_prev") ) df = HoldReport._created_combined_col("HideNextMonth", df, ["prev", "cur"]) df = HoldReport._created_combined_col("Resolution", df, ["prev", "cur"]) df["ID_OB"] = df["ID_OB_cur"] df["ID_GP"] = df["ID_GP_cur"] # Drop anything that should be ignored df = df[df["HideNextMonth"] != True] logger.info(f"Prev res added:\n{df}") col_to_drop = [] for c in df.keys().to_list(): if "_prev" in c in c or "_cur" in c: col_to_drop.append(c) logger.debug(f"{col_to_drop=}") df.drop( columns= col_to_drop, inplace=True ) # Restandardize end_size = df.shape[0] logger.info(f"Reduced df by {start_size-end_size}") dfs.append(df) return prev_recs, dfs[0], dfs[1] def _remove_full_matches(self, other: 'HoldReport'): """ Removes any contracts that match both contract number and hold amount. These do not need to be reconciled. This id done 'in place' to both dataframes """ filter_id_match: DataFrame = self.df[~(self.df["ID"].isin(other.df["ID"]))] other.df: DataFrame = other.df[~(other.df["ID"].isin(self.df["ID"]))] self.df = filter_id_match self.combined_missing: DataFrame = concat([self.df, other.df], ignore_index=True) #self.combined_missing.to_excel("ALL MISSING.xlsx") logger.debug(f"Combined Missing:\n{self.combined_missing}") logger.info(f"Payments with errors: {self.combined_missing.shape[0]}") @staticmethod def _created_combined_col(column: str, target_df: DataFrame, sources: tuple[str, str]) -> DataFrame : """ Creates a new column by filling empty columns of this source, with the matching column from another source """ this, that = sources target_df[column] = target_df[f"{column}_{this}"].fillna( target_df[f"{column}_{that}"] ) return target_df def _requires_rec(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]: """ To be run after full matches have been re """ # Merge the two filtered DataFrames on the contract number contract_match = merge( self.df, other.df, how="inner", on=["contract_number"], suffixes=('_'+self.source, '_'+other.source) ) contract_match = create_identifier(contract_match) #contract_match.to_excel("CONTRACT_MATCH.xlsx") for col in ["vendor_name", "HideNextMonth", "Resolution"]: self._created_combined_col(col, contract_match, (self.source, other.source)) logger.debug(f"_requires_rec | contract_match:\n{contract_match.columns} ({contract_match.shape})") no_match: DataFrame = self.combined_missing[~( self.combined_missing["contract_number"].isin( contract_match["contract_number"] )) ] no_match[f"ID_{self.source}"] = no_match.apply(lambda row: row["ID"] if row["Source"] == self.source else None , axis=1) no_match[f"ID_{other.source}"] = no_match.apply(lambda row: row["ID"] if row["Source"] == other.source else None , axis=1) no_match = create_identifier(no_match) logger.debug(f"_requires_rec | no_match:\n{no_match.columns} ({no_match.shape})") self.prev_recs, contract_match, no_match = self._remove_prev_recs(contract_match, no_match) return contract_match, no_match @staticmethod def _add_work_columns(df: DataFrame) -> DataFrame: """ Add empty columns to the dataframe to faciliate working through the report. """ logger.debug("Adding work columns!") df_cols: list[str] = df.columns.to_list() WORK_COLS = ["HideNextMonth","Resolution"] for col in WORK_COLS: if col not in df_cols: df[col] = '' return df def reconcile(self, other: 'HoldReport') -> ReconciledReports: """ """ assert self.source != other.source, f"Reports to reconcile must be from different sources.\ ({self.source} , {other.source})." self._remove_full_matches(other) if self.source == "OB": over_due: DataFrame = self.overdue filtered_gp: DataFrame = other.filtered elif self.source == "GP": over_due: DataFrame = other.overdue filtered_gp: DataFrame = self.filtered logger.debug(f"Removed matches:\n{self.df}") amount_mismatch, no_match = self._requires_rec(other) logger.debug(f"reconcile | no_match unaltered\n{no_match.columns} ({no_match.shape})") logger.debug(f"reconcile | am_mm unaltered:\n{amount_mismatch.columns} ({amount_mismatch.shape})") # Formatting columns: list[str] = ["ID_GP", "ID_OB"] columns.extend(self.config["output_columns"]) nm_cols:list[str] = deepcopy(columns) nm_cols.insert(3,"onhold_amount") nm_cols.insert(4,"Source") columns.insert(3,"onhold_amount_GP") columns.insert(4, "onhold_amount_OB") # Select and reorder columns no_match = no_match[ nm_cols ] amount_mismatch = amount_mismatch[ columns ] logger.info(f"no_match: {no_match.shape[0]}") logger.info(f"am_mm: {amount_mismatch.shape[0]}") reconciled: ReconciledReports = ReconciledReports( no_match=no_match, amt_mismatch=amount_mismatch, prev_rec=self.prev_recs, gp_filtered=filtered_gp, ob_overdue = over_due ) return reconciled class OnBaseReport(HoldReport): source = "OB" def __init__(self, dataframe: DataFrame, reports_config: dict) -> None: self.overdue = self._get_overdue(dataframe) super().__init__(dataframe, reports_config) @staticmethod def _get_overdue(dataframe: DataFrame) -> DataFrame: """ """ dataframe["InstallDate"] = to_datetime(dataframe["InstallDate"]) dataframe["InstallDate"].fillna(NaT, inplace=True) overdue: DataFrame = dataframe[dataframe["InstallDate"].dt.date\ < datetime.date.today()] return overdue class GreatPlainsReport(HoldReport): source = "GP" def __init__(self, dataframe: DataFrame, report_config: dict) -> None: self.filtered: DataFrame = self._filter( gp_report_df= dataframe, doc_num_filters= report_config["gp_filters"]["doc_num_filters"], good_po_num_regex= report_config["gp_filters"]["po_filter"] ) super().__init__(dataframe, report_config) @staticmethod def _filter(gp_report_df: DataFrame, doc_num_filters: list[str], good_po_num_regex: str ) -> DataFrame: GOOD_PO_NUM = re.compile(good_po_num_regex, re.IGNORECASE) bad_doc_num = '' rx : str for rx in doc_num_filters: bad_doc_num += f"({rx})|" bad_doc_num = re.compile(bad_doc_num[:-1], re.IGNORECASE) # Create a mask/filter that will keep rows that match these # requirments keep_mask = ( (gp_report_df["Document Type"] == "Invoice") & (gp_report_df["Purchase Order Number"].str.contains(GOOD_PO_NUM)) ) # Get the rows that DO NOT fit the keep_mask dropped_posotives: DataFrame = gp_report_df[~keep_mask] # Drop the rows to filter gp_report_df.drop(dropped_posotives.index, inplace=True) # Create a filter to remove rows that meet this requirment # Making this a negative in the keep mask is more trouble than # it's worth remove_mask = gp_report_df["Document Number"].str.contains(bad_doc_num) dropped_negatives: DataFrame = gp_report_df[remove_mask] gp_report_df.drop(dropped_negatives.index, inplace=True) return concat([dropped_posotives,dropped_negatives], ignore_index=False)