from pandas import DataFrame, merge, to_datetime, NaT, concat, ExcelWriter
from openpyxl import Workbook, load_workbook
from abc import ABC
from logging import getLogger
import re
from re import Pattern
import datetime
from copy import deepcopy
from dataclasses import dataclass
from pathlib import Path

from src.helpers import CN_REGEX, drop_unnamed
from src.memory import get_prev_reconciled, hash_cols, col_hash, create_identifier
from src.config import ReportConfig, ReportSource

logger = getLogger(__name__)


@dataclass
class ReconciledReports:
    """Container for the DataFrames produced by one reconciliation run."""

    no_match: DataFrame      # rows with no contract-number match in the other source
    amt_mismatch: DataFrame  # contract number matched, but on-hold amounts differ
    prev_rec: DataFrame      # rows already resolved in a previous run
    gp_filtered: DataFrame   # rows filtered out of the GP report before reconciling
    ob_overdue: DataFrame    # OB rows whose install date is in the past

    def save_reports(self, output_path: Path) -> None:
        """Write every report as a sheet of a single Excel workbook.

        The two internal-ID columns (A and B) of the working sheets are
        hidden, and the bookkeeping sheets are hidden entirely.

        Args:
            output_path: destination ``.xlsx`` file; overwritten if present.
        """
        with ExcelWriter(output_path, mode='w') as writer:
            self.no_match.drop_duplicates(inplace=True)
            self.no_match.to_excel(writer, sheet_name="No Match",
                                   index=False, freeze_panes=(1, 3))
            self.amt_mismatch.drop_duplicates(inplace=True)
            self.amt_mismatch.to_excel(writer, sheet_name="Amount Mismatch",
                                       index=False, freeze_panes=(1, 3))
            self.ob_overdue.to_excel(writer, sheet_name="Overdue", index=False)
            self.prev_rec.to_excel(writer, sheet_name="Previously Reconciled",
                                   index=False, freeze_panes=(1, 3))
            self.gp_filtered.to_excel(writer, sheet_name="Filtered from GP",
                                      index=False, freeze_panes=(1, 0))
        # Post-process with openpyxl: pandas' ExcelWriter cannot hide
        # columns or sheets, so reopen the saved workbook to do it.
        wb: Workbook = load_workbook(output_path)
        for sheet in ["No Match", "Amount Mismatch"]:
            ws = wb[sheet]
            ws.column_dimensions['A'].hidden = True
            ws.column_dimensions['B'].hidden = True
        for sheet in ["Filtered from GP", "Previously Reconciled"]:
            wb[sheet].sheet_state = "hidden"
        wb.save(output_path)
        wb.close()


class HoldReport(ABC):
    """Base class for one data source's on-hold payments report."""

    # Short code identifying the data source; overridden by subclasses
    # ("OB" for OnBase, "GP" for Great Plains).
    source = ""

    def __init__(self, dataframe: DataFrame, reports_config: ReportConfig) -> None:
        self.config = reports_config
        drop_unnamed(dataframe)
        self.df = dataframe
        self.df = self._add_work_columns(self.df, reports_config.work_columns)
        self._normalize()

    def _normalize(self) -> None:
        """Standardize columns, amounts, contract numbers, and IDs in place."""
        # Rename this source's columns to the shared standardized names.
        self.df.rename(
            columns={
                sc_dict[self.source]: sc_dict["standardized_name"]
                for sc_dict in self.config.shared_columns
            },
            inplace=True)
        # Amounts are compared as floats rounded to two decimal places.
        self.df["onhold_amount"] = self.df["onhold_amount"].astype(float).round(2)

        def extract_cn(raw) -> str:
            # Pull the standardized contract number out of the raw value;
            # fall back to the raw string when the pattern does not match.
            # (Original code ran re.search twice per value; hoisted here.)
            text = str(raw)
            match = re.search(CN_REGEX, text)
            return match.group(0) if match else text

        self.df["contract_number"] = self.df["contract_number"].apply(extract_cn)
        # Unique transaction ID: "<contract_number>_<amount>".
        self.df["ID"] = self.df["contract_number"] + '_' + \
            self.df["onhold_amount"].astype(str)
        # Tag every row with its originating source.
        self.df["Source"] = self.source

    @staticmethod
    def _remove_prev_recs(contract_match: DataFrame, no_match: DataFrame,
                          db_location: Path
                          ) -> tuple[DataFrame, DataFrame, DataFrame]:
        """Drop rows that were already reconciled in a previous run.

        Args:
            contract_match: rows matched across sources on contract number.
            no_match: rows with no cross-source contract match.
            db_location: path to the reconciliation-history database.

        Returns:
            ``(prev_recs, contract_match, no_match)`` — the history frame
            (empty if none), and the two inputs with previously-hidden rows
            removed and prior Resolution/HideNextMonth values merged in.
        """
        # NOTE: "Indentifier" [sic] is the column name produced by
        # create_identifier — do not "fix" the spelling here.
        idents: list[col_hash] = create_identifier(contract_match)["Indentifier"].to_list()
        idents.extend(create_identifier(no_match)["Indentifier"].to_list())
        logger.debug(f"{idents=}")
        # Get previously reconciled rows from the local database.
        prev_recs: DataFrame | None = get_prev_reconciled(idents, db_location)
        if prev_recs is None:
            logger.info("No previously reconciled!")
            return DataFrame(), contract_match, no_match
        dfs = []
        for df in [contract_match, no_match]:
            start_size = df.shape[0]
            logger.debug(f"Report DF: \n{df}")
            logger.debug(f"prev_rec: \n{prev_recs}")
            df = merge(df, prev_recs, how="left", on="Indentifier",
                       suffixes=("_cur", "_prev"))
            # Prefer the previous run's values, fall back to current run's.
            df = HoldReport._created_combined_col("HideNextMonth", df, ("prev", "cur"))
            df = HoldReport._created_combined_col("Resolution", df, ("prev", "cur"))
            df["ID_OB"] = df["ID_OB_cur"]
            df["ID_GP"] = df["ID_GP_cur"]
            # Drop anything flagged to be hidden on the next report.
            df = df[df["HideNextMonth"] != True]
            logger.info(f"Prev res added:\n{df}")
            # BUG FIX: was `"_prev" in c in c or "_cur" in c` — an accidental
            # chained comparison that only worked because `c in c` is always
            # true for strings.
            col_to_drop = [c for c in df.keys().to_list()
                           if "_prev" in c or "_cur" in c]
            logger.debug(f"{col_to_drop=}")
            df.drop(columns=col_to_drop, inplace=True)
            end_size = df.shape[0]
            logger.info(f"Reduced df by {start_size - end_size}")
            dfs.append(df)
        return prev_recs, dfs[0], dfs[1]

    def _remove_full_matches(self, other: 'HoldReport') -> None:
        """Remove contracts matching on both contract number and hold amount.

        Fully matched payments need no reconciliation.  This mutates both
        this report's and *other*'s dataframes ("in place" from the caller's
        perspective) and stores the union of the remaining unmatched rows on
        ``self.combined_missing``.
        """
        # Filter self against other's ORIGINAL IDs before reassigning either
        # frame — the order of these three statements matters.
        filter_id_match: DataFrame = self.df[~(self.df["ID"].isin(other.df["ID"]))]
        other.df = other.df[~(other.df["ID"].isin(self.df["ID"]))]
        self.df = filter_id_match
        self.combined_missing: DataFrame = concat([self.df, other.df],
                                                  ignore_index=True)
        logger.debug(f"Combined Missing:\n{self.combined_missing}")
        logger.info(f"Payments with errors: {self.combined_missing.shape[0]}")

    @staticmethod
    def _created_combined_col(column: str, target_df: DataFrame,
                              sources: tuple[str, str]) -> DataFrame:
        """Combine ``{column}_{a}`` and ``{column}_{b}`` into *column*.

        Values from the first source win; gaps are filled from the second.

        Args:
            column: base column name (without suffix).
            target_df: frame holding both suffixed columns; mutated in place.
            sources: pair of suffixes, priority source first.

        Returns:
            *target_df* (also mutated in place).
        """
        primary, fallback = sources
        target_df[column] = target_df[f"{column}_{primary}"].fillna(
            target_df[f"{column}_{fallback}"])
        return target_df

    def _requires_rec(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]:
        """Split unmatched rows into amount mismatches and true no-matches.

        Must be run after :meth:`_remove_full_matches` (it relies on
        ``self.combined_missing``).  Also populates ``self.prev_recs``.

        Returns:
            ``(contract_match, no_match)`` with previously reconciled rows
            already removed.
        """
        # Contracts present in both sources — amounts must differ, since
        # exact ID matches were removed earlier.
        contract_match = merge(self.df, other.df, how="inner",
                               on=["contract_number"],
                               suffixes=('_' + self.source, '_' + other.source))
        contract_match = create_identifier(contract_match)
        for col in ["vendor_name", "HideNextMonth", "Resolution"]:
            self._created_combined_col(col, contract_match,
                                       (self.source, other.source))
        logger.debug(f"_requires_rec | contract_match:\n{contract_match.columns} ({contract_match.shape})")
        # Rows whose contract number appears in neither source's match set.
        # .copy() so the column assignments below do not hit a view of
        # combined_missing (SettingWithCopyWarning).
        no_match: DataFrame = self.combined_missing[~(
            self.combined_missing["contract_number"].isin(
                contract_match["contract_number"]))].copy()
        no_match[f"ID_{self.source}"] = no_match.apply(
            lambda row: row["ID"] if row["Source"] == self.source else None,
            axis=1)
        no_match[f"ID_{other.source}"] = no_match.apply(
            lambda row: row["ID"] if row["Source"] == other.source else None,
            axis=1)
        no_match = create_identifier(no_match)
        logger.debug(f"_requires_rec | no_match:\n{no_match.columns} ({no_match.shape})")
        self.prev_recs, contract_match, no_match = self._remove_prev_recs(
            contract_match, no_match, self.config.paths.db_path)
        return contract_match, no_match

    @staticmethod
    def _add_work_columns(df: DataFrame, work_cols: list) -> DataFrame:
        """Add empty columns (if absent) to facilitate working the report.

        Args:
            df: report frame; mutated in place.
            work_cols: column names the manual-review workflow expects.

        Returns:
            *df* (also mutated in place).
        """
        logger.debug("Adding work columns!")
        existing = set(df.columns.to_list())
        for col in work_cols:
            if col not in existing:
                df[col] = ''
        return df

    def reconcile(self, other: 'HoldReport') -> ReconciledReports:
        """Reconcile this report against *other* from the opposite source.

        Args:
            other: the report from the other source (OB vs GP).

        Returns:
            A :class:`ReconciledReports` bundle ready for
            :meth:`ReconciledReports.save_reports`.

        Raises:
            ValueError: if both reports share a source, or the pair is not
                an OB/GP combination.
        """
        # BUG FIX: was an `assert`, which is stripped under `python -O`.
        if self.source == other.source:
            raise ValueError(
                f"Reports to reconcile must be from different sources.\
 ({self.source} , {other.source}).")
        self._remove_full_matches(other)
        # Pick up the side reports from whichever object is which source.
        if self.source == "OB":
            over_due: DataFrame = self.overdue
            filtered_gp: DataFrame = other.filtered
        elif self.source == "GP":
            over_due = other.overdue
            filtered_gp = self.filtered
        else:
            # BUG FIX: previously fell through with no else-branch and
            # raised NameError on `over_due` further down.
            raise ValueError(
                f"Unknown source pairing: ({self.source} , {other.source}).")
        logger.debug(f"Removed matches:\n{self.df}")
        amount_mismatch, no_match = self._requires_rec(other)
        logger.debug(f"reconcile | no_match unaltered\n{no_match.columns} ({no_match.shape})")
        logger.debug(f"reconcile | am_mm unaltered:\n{amount_mismatch.columns} ({amount_mismatch.shape})")
        # Build the output column order for each sheet: shared prefix of the
        # two ID columns plus the configured finished columns, then the
        # sheet-specific amount/source columns spliced in at positions 3-4.
        columns: list[str] = ["ID_GP", "ID_OB"]
        columns.extend(self.config.finished_columns)
        nm_cols: list[str] = deepcopy(columns)
        nm_cols.insert(3, "onhold_amount")
        nm_cols.insert(4, "Source")
        columns.insert(3, "onhold_amount_GP")
        columns.insert(4, "onhold_amount_OB")
        # Select and reorder columns.
        no_match = no_match[nm_cols]
        amount_mismatch = amount_mismatch[columns]
        logger.info(f"no_match: {no_match.shape[0]}")
        logger.info(f"am_mm: {amount_mismatch.shape[0]}")
        return ReconciledReports(
            no_match=no_match,
            amt_mismatch=amount_mismatch,
            prev_rec=self.prev_recs,
            gp_filtered=filtered_gp,
            ob_overdue=over_due)


class OnBaseReport(HoldReport):
    """On-hold report sourced from OnBase."""

    source = "OB"

    def __init__(self, dataframe: DataFrame, reports_config: ReportConfig) -> None:
        # Capture overdue installs before normalization renames columns.
        self.overdue = self._get_overdue(dataframe)
        super().__init__(dataframe, reports_config)

    @staticmethod
    def _get_overdue(dataframe: DataFrame) -> DataFrame:
        """Return rows whose install date is strictly before today.

        Converts ``InstallDate`` to datetime in place on *dataframe*.
        Rows whose date is missing (NaT) are excluded, since NaT
        comparisons evaluate false.
        """
        dataframe["InstallDate"] = to_datetime(dataframe["InstallDate"])
        # (A former fillna(NaT) call here was a no-op — to_datetime already
        # yields NaT for missing values — and has been removed.)
        overdue: DataFrame = dataframe[
            dataframe["InstallDate"].dt.date < datetime.date.today()]
        return overdue


class GreatPlainsReport(HoldReport):
    """On-hold report sourced from Great Plains."""

    source = "GP"

    def __init__(self, dataframe: DataFrame, report_config: ReportConfig) -> None:
        # Filter non-reconcilable rows out of the GP frame up front; the
        # removed rows feed the hidden "Filtered from GP" sheet.
        self.filtered: DataFrame = self._filter(
            gp_report_df=dataframe,
            doc_num_filters=report_config.filters["doc_num_filters"],
            good_po_num_regex=report_config.filters["po_filter"][0])
        super().__init__(dataframe, report_config)

    @staticmethod
    def _filter(gp_report_df: DataFrame,
                doc_num_filters: list[Pattern],
                good_po_num_regex: Pattern) -> DataFrame:
        """Drop rows that should not be reconciled, mutating *gp_report_df*.

        Keeps only rows that are Invoices with a PO number matching
        *good_po_num_regex*, then removes rows whose document number matches
        any pattern in *doc_num_filters* (case-insensitive).

        Args:
            gp_report_df: raw GP report; filtered rows are dropped in place.
            doc_num_filters: patterns identifying document numbers to remove.
            good_po_num_regex: pattern a valid PO number must contain.

        Returns:
            The rows removed from *gp_report_df* (positive-filter drops
            followed by document-number drops, original indices kept).
        """
        # Keep-mask: rows that satisfy BOTH requirements stay.
        keep_mask = (
            (gp_report_df["Document Type"] == "Invoice")
            & (gp_report_df["Purchase Order Number"].str.contains(good_po_num_regex))
        )
        # Rows that DO NOT fit the keep-mask get dropped (but returned).
        dropped_positives: DataFrame = gp_report_df[~keep_mask]
        gp_report_df.drop(dropped_positives.index, inplace=True)
        # BUG FIX: the old string-concat + trailing-'|' trim produced an
        # invalid pattern ("(?i") when doc_num_filters was empty, and the
        # inline (?i) flag was redundant with re.IGNORECASE.
        if doc_num_filters:
            bad_doc_num = re.compile(
                "|".join(f"({rx})" for rx in doc_num_filters), re.IGNORECASE)
            remove_mask = gp_report_df["Document Number"].str.contains(bad_doc_num)
            dropped_negatives: DataFrame = gp_report_df[remove_mask]
            gp_report_df.drop(dropped_negatives.index, inplace=True)
        else:
            dropped_negatives = DataFrame()
        return concat([dropped_positives, dropped_negatives], ignore_index=False)