import datetime
import re
from abc import ABC, abstractmethod
from copy import deepcopy
from logging import getLogger
from typing import Literal

from numpy import concatenate
from pandas import DataFrame, merge, to_datetime, NaT, concat, Series

from helpers import CN_REGEX, drop_unnamed
from memory import get_prev_reconciled

logger = getLogger(__name__)


class HoldReport(ABC):
    """Base class for an on-hold report that can be reconciled against another.

    Subclasses set ``source`` to a short tag (e.g. ``"OB"``, ``"GP"``). The tag
    selects the source-specific column names from ``config["shared_columns"]``
    and is used as a merge suffix and in the ``ID_<source>`` column name.
    """

    source = ""

    def __init__(self, dataframe: DataFrame, reports_config: dict) -> None:
        """Store the report, standardize it and apply previous reconciliations.

        Args:
            dataframe: Raw report data. It is modified in place.
            reports_config: Configuration dict; ``"shared_columns"`` is read
                here and ``"output_columns"`` is read by ``reconcile``.
        """
        self.config = reports_config
        # NOTE(review): the return value is ignored, so drop_unnamed is
        # presumably an in-place operation -- confirm against helpers.
        drop_unnamed(dataframe)
        self.df = dataframe
        self.prev_rec = None
        self._normalize()
        self._previsouly_resolved()

    @staticmethod
    def _extract_contract_number(raw) -> str:
        """Return the first CN_REGEX match in ``raw``, else ``str(raw)``."""
        text = str(raw)
        found = re.search(CN_REGEX, text)
        return found.group(0) if found else text

    def _normalize(self):
        """Standardize column names, amounts and contract numbers in place.

        Adds an ``ID`` column (``<contract_number>_<onhold_amount>``) and a
        ``Source`` column holding ``self.source``.
        """
        # Map this source's column names onto the shared (common) names.
        column_map = {
            unique_cols[self.source]: common_col
            for common_col, unique_cols in self.config["shared_columns"].items()
        }
        self.df.rename(columns=column_map, inplace=True)

        # On-hold amount as a float rounded to two decimal places.
        self.df["onhold_amount"] = self.df["onhold_amount"].astype(float).round(2)

        # Reduce each contract number to the part matching CN_REGEX, when any.
        self.df["contract_number"] = self.df["contract_number"].apply(
            self._extract_contract_number
        )

        # Transaction ID combining the contract number and the amount.
        self.df["ID"] = (
            self.df["contract_number"] + '_' + self.df["onhold_amount"].astype(str)
        )

        # Tag every row with the report it came from.
        self.df["Source"] = self.source

    def _previsouly_resolved(self):
        """Merge in previously reconciled rows and drop the hidden ones.

        When nothing was previously reconciled, the empty work columns are
        added instead. Otherwise the previous records are left-merged on
        ``ID_<source>``, rows whose ``Hide Next Month`` is True are removed,
        and the merge helper columns (``*_prev*`` / ``*ID_*``) are dropped.
        """
        current_contracts: Series = self.df["contract_number"]
        prev_recd = get_prev_reconciled(contracts=current_contracts)

        # A DataFrame has no unambiguous truth value, so ``not prev_recd``
        # would raise; test for "nothing returned" explicitly.
        if prev_recd is None or len(prev_recd) == 0:
            logger.info("No previously reconciled!")
            self.df = self._add_work_columns(self.df)
            return

        self.prev_rec = prev_recd
        start_size = self.df.shape[0]
        logger.debug(f"Report DF: \n{self.df}")
        logger.debug(f"prev_rec: \n{prev_recd}")

        source_id = f"ID_{self.source}"
        self.df[source_id] = self.df["ID"]
        self.df = merge(
            self.df,
            prev_recd,
            how="left",
            on=source_id,
            suffixes=("_cur", "_prev"),
        )

        # Drop anything that should be ignored. The element-wise ``!= True``
        # keeps rows whose flag is missing (NaN) as well as False.
        self.df = self.df[self.df["Hide Next Month"] != True]  # noqa: E712
        logger.info(f"Prev res added:\n{self.df}")

        col_to_drop = [
            c for c in self.df.keys().to_list() if "_prev" in c or "ID_" in c
        ]
        logger.debug(f"{col_to_drop=}")

        # Reassign instead of mutating in place: self.df is a filtered slice
        # at this point and in-place edits on it are unreliable.
        self.df = self.df.drop(columns=col_to_drop)
        # Restandardize
        self.df = self.df.rename(columns={"contract_number_cur": "contract_number"})

        end_size = self.df.shape[0]
        logger.info(f"Reduced df by {start_size-end_size}")

    def _remove_full_matches(self, other: 'HoldReport'):
        """Remove rows whose ``ID`` appears in both reports.

        Rows matching on both contract number and hold amount do not need to
        be reconciled. Both ``self.df`` and ``other.df`` are replaced, and the
        remaining rows of both are stored in ``self.combined_missing`` and
        written to ``ALL MISSING.xlsx``.
        """
        # Both filters must see the unfiltered frames, so hold the filtered
        # copy of self.df aside until other.df has been filtered too.
        filter_id_match: DataFrame = self.df[~(self.df["ID"].isin(other.df["ID"]))]
        other.df = other.df[~(other.df["ID"].isin(self.df["ID"]))]
        self.df = filter_id_match

        self.combined_missing: DataFrame = concat(
            [self.df, other.df], ignore_index=True
        )
        self.combined_missing.to_excel("ALL MISSING.xlsx")
        logger.debug(f"Combined Missing:\n{self.combined_missing}")
        logger.info(f"Payments with errors: {self.combined_missing.shape[0]}")

    @staticmethod
    def _created_combined_col(column: str, target_df: DataFrame, sources: tuple[str, str]) -> DataFrame:
        """Create ``column`` from its two source-suffixed variants.

        Values come from ``<column>_<first source>``; gaps there are filled
        from ``<column>_<second source>``. ``target_df`` is modified in place
        and also returned.
        """
        this, that = sources
        target_df[column] = target_df[f"{column}_{this}"].fillna(
            target_df[f"{column}_{that}"]
        )
        return target_df

    def _requires_rec(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]:
        """Split the unmatched rows into contract matches and non-matches.

        To be run after full matches have been removed, because it reads
        ``self.combined_missing`` which ``_remove_full_matches`` sets.

        Returns:
            ``(contract_match, no_match)``: rows sharing a contract number
            across both reports, and rows whose contract number is absent from
            that set.
        """
        # Merge the two filtered DataFrames on the contract number
        contract_match = merge(
            self.df,
            other.df,
            how="inner",
            on=["contract_number"],
            suffixes=('_' + self.source, '_' + other.source),
        )
        for col in ["vendor_name", "Resolution", "Notes"]:
            self._created_combined_col(col, contract_match, (self.source, other.source))
        logger.debug(f"_requires_rec | contract_match:\n{contract_match.columns} ({contract_match.shape})")

        unmatched = ~self.combined_missing["contract_number"].isin(
            contract_match["contract_number"]
        )
        # Copy so the new columns land on an independent frame rather than on
        # a slice of combined_missing.
        no_match: DataFrame = self.combined_missing[unmatched].copy()

        # Plain lists keep None for "other source" rows and also work when
        # no_match is empty (row-wise apply returns a DataFrame in that case).
        ids = no_match["ID"].to_list()
        origins = no_match["Source"].to_list()
        for tag in (self.source, other.source):
            no_match[f"ID_{tag}"] = [
                row_id if origin == tag else None
                for row_id, origin in zip(ids, origins)
            ]
        logger.debug(f"_requires_rec | no_match:\n{no_match.columns} ({no_match.shape})")
        return contract_match, no_match

    @staticmethod
    def _add_work_columns(df: DataFrame) -> DataFrame:
        """Add empty columns to the dataframe to facilitate working through the report.

        Columns that already exist are left untouched. ``df`` is modified in
        place and also returned.
        """
        logger.debug("Adding work columns!")
        df_cols: list[str] = df.columns.to_list()
        WORK_COLS = ["Hide Next Month", "Resolution"]
        for col in WORK_COLS:
            if col not in df_cols:
                df[col] = ''
        return df

    def reconcile(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]:
        """Reconcile this report against ``other``.

        Fully matching rows are removed from both reports first.

        Returns:
            ``(no_match, amount_mismatch)``: rows found in only one report, and
            rows sharing a contract number but not an amount, each reduced to
            the ID columns plus ``config["output_columns"]``.
        """
        self._remove_full_matches(other)
        logger.debug(f"Removed matches:\n{self.df}")

        amount_mismatch, no_match = self._requires_rec(other)
        logger.debug(f"reconcile | no_match unaltered\n{no_match.columns} ({no_match.shape})")
        logger.debug(f"reconcile | am_mm unaltered:\n{amount_mismatch.columns} ({amount_mismatch.shape})")

        columns: list[str] = ["ID_GP", "ID_OB"]
        columns.extend(self.config["output_columns"])

        # The no-match output carries one amount plus its source; the
        # mismatch output carries both amounts side by side.
        nm_cols: list[str] = list(columns)
        nm_cols.insert(3, "onhold_amount")
        nm_cols.insert(4, "Source")
        columns.insert(3, "onhold_amount_GP")
        columns.insert(4, "onhold_amount_OB")

        # Select and reorder columns
        no_match = no_match[nm_cols]
        amount_mismatch = amount_mismatch[columns]
        logger.info(f"no_match: {no_match.shape[0]}")
        logger.info(f"am_mm: {amount_mismatch.shape[0]}")
        return no_match, amount_mismatch


class OnBaseReport(HoldReport):
    """Hold report tagged with the ``"OB"`` source."""

    source = "OB"

    def get_overdue(self) -> DataFrame:
        """Return the rows whose ``InstallDate`` is before today.

        ``InstallDate`` is converted to datetimes in place on ``self.df``.
        """
        # to_datetime already represents missing values as NaT, so no
        # separate fill step is needed.
        self.df["InstallDate"] = to_datetime(self.df["InstallDate"])
        return self.df[self.df["InstallDate"].dt.date < datetime.date.today()]


class GreatPlainsReport(HoldReport):
    """Hold report tagged with the ``"GP"`` source; rows are filtered first."""

    source = "GP"

    def __init__(self, dataframe: DataFrame, report_config: dict) -> None:
        """Filter ``dataframe`` in place, then run the base initialization.

        Args:
            dataframe: Raw report data.
            report_config: Configuration dict; ``"gp_filters"`` must provide
                ``"doc_num_filters"`` and ``"po_filter"``.
        """
        self._filter(
            gp_report_df=dataframe,
            doc_num_filters=report_config["gp_filters"]["doc_num_filters"],
            good_po_num_regex=report_config["gp_filters"]["po_filter"],
        )
        super().__init__(dataframe, report_config)

    @staticmethod
    def _filter(gp_report_df: DataFrame, doc_num_filters: list[str], good_po_num_regex: str) -> DataFrame:
        """Drop rows that should not be reconciled, in place.

        Keeps only rows whose ``Document Type`` is ``"Invoice"`` and whose
        ``Purchase Order Number`` matches ``good_po_num_regex``, then removes
        rows whose ``Document Number`` matches any of ``doc_num_filters``.
        All matching is case-insensitive.

        Returns:
            The same ``gp_report_df`` object, after filtering.
        """
        good_po_num = re.compile(good_po_num_regex, re.IGNORECASE)

        # Keep rows that meet both requirements. na=False makes a missing
        # purchase order number count as "no match" instead of yielding an
        # NA mask that cannot be inverted.
        keep_mask = (
            (gp_report_df["Document Type"] == "Invoice")
            & gp_report_df["Purchase Order Number"].str.contains(good_po_num, na=False)
        )
        gp_report_df.drop(gp_report_df[~keep_mask].index, inplace=True)

        # With no filters the combined pattern would be empty, which matches
        # every value and would drop the whole report -- skip the step.
        if doc_num_filters:
            bad_doc_num = re.compile(
                "|".join(f"(?:{rx})" for rx in doc_num_filters),
                re.IGNORECASE,
            )
            # Removing these in a second pass is simpler than negating them
            # inside the keep mask.
            remove_mask = gp_report_df["Document Number"].str.contains(bad_doc_num, na=False)
            gp_report_df.drop(gp_report_df[remove_mask].index, inplace=True)

        return gp_report_df