Helps unify everything. Not yet prepared for memory or db search...
branch: dev
parent 6eb57d7978
commit 7ad4f76943

@ -1,53 +0,0 @@
write_dir = "Work"

DocNumFilter = [
    "p(oin)?ts",
    "pool",
    "promo",
    "o(ver)?f(und)?",
    "m(ar)?ke?t",
    "title",
    "adj",
    "reg free",
    "cma"
]

[ExcelColumns]

[ExcelColumns.OB]
contract_number = "Contract"        # 3070508-007
onhold_amount = "CurrentOnHold"
install_date = "InstallDate"

[ExcelColumns.GP]
contract_number = "Transaction Description"  # 1234-56789
onhold_amount = "Current Trx Amount"
doc_num = "Document Number"         # 1-316141 HOLD
pur_order = "Purchase Order Number" # ABC123
doc_type = "Document Type"          # Invoice or Credit Memo

[logger]
version = 1
disable_existing_loggers = false

[logger.formatters.custom]
format = "'%(asctime)s - %(module)s - %(levelname)s - %(message)s'"

[logger.handlers.console]
class = "logging.StreamHandler"
level = "DEBUG"
formatter = "custom"
stream = "ext://sys.stdout"

[logger.handlers.file]
class = "logging.FileHandler"
level = "DEBUG"
formatter = "custom"
filename = "on_hold.log"

[logger.root]
level = "DEBUG"
handlers = ["console", "file"]

@ -0,0 +1,22 @@
version = 1
disable_existing_loggers = false

[formatters.custom]
format = "%(asctime)s - %(module)s - %(levelname)s - %(message)s"

[handlers.console]
class = "logging.StreamHandler"
level = "DEBUG"
formatter = "custom"
stream = "ext://sys.stdout"

[handlers.file]
class = "logging.FileHandler"
level = "DEBUG"
formatter = "custom"
filename = "on_hold.log"

[root]
level = "DEBUG"
handlers = ["console", "file"]
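
This file maps directly onto Python's dictConfig schema; a minimal sketch of how it is consumed (setup_logging in helpers below does the same with error handling):

import logging.config
from tomllib import load

with open("config_logger.toml", "rb") as f:
    logging.config.dictConfig(load(f))
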
@ -0,0 +1,34 @@
output_columns = [
    "contract_number",
    "vendor_name",
    "AppNum",           # OB only
    "DateBooked",       # OB only
    "Document Number",  # GP only
    # 'Source' added for 'no match'
]

[gp_filters]
# These regexes will be combined with ORs and used to filter
# the document number column of the GP report
doc_num_filters = [
    "p(oin)?ts",
    "pool",
    "promo",
    "o(ver)?f(und)?",
    "m(ar)?ke?t",
    "title",
    "adj",
    "reg free",
    "cma"
]
po_filter = "^(?!.*cma(\\s|\\d)).*$"
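
For reference, the doc_num_filters above are OR-combined into one case-insensitive pattern (see GreatPlainsReport._filter in models below), while po_filter uses a negative lookahead so a PO number containing "cma" followed by a space or digit is rejected; a minimal sketch with an abbreviated filter list:

import re

doc_num_filters = ["p(oin)?ts", "promo", "cma"]  # abbreviated list from above

# OR the individual patterns together into one case-insensitive regex
bad_doc_num = re.compile("|".join(f"({rx})" for rx in doc_num_filters), re.IGNORECASE)

good_po_num = re.compile(r"^(?!.*cma(\s|\d)).*$", re.IGNORECASE)

assert bad_doc_num.search("1-316141 PROMO HOLD")  # document number gets filtered out
assert good_po_num.match("ABC123")                # PO number is kept
assert not good_po_num.match("CMA23532")          # PO number is dropped
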
[shared_columns]
contract_number = { GP = "Transaction Description", OB = "Contract" }
onhold_amount = { GP = "Current Trx Amount", OB = "CurrentOnHold" }
vendor_name = { GP = "Vendor Name", OB = "DealerName" }
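
Each shared_columns entry maps a normalized name to its source-specific header, so a report class can derive a pandas rename mapping straight from this table (this is what HoldReport._normalize in models does); a minimal sketch assuming the file has been loaded with tomllib:

shared_columns = {
    "contract_number": {"GP": "Transaction Description", "OB": "Contract"},
    "onhold_amount": {"GP": "Current Trx Amount", "OB": "CurrentOnHold"},
    "vendor_name": {"GP": "Vendor Name", "OB": "DealerName"},
}

source = "OB"
# Invert to {source-specific header: normalized name} for DataFrame.rename
rename_map = {cols[source]: common for common, cols in shared_columns.items()}
# {'Contract': 'contract_number', 'CurrentOnHold': 'onhold_amount', 'DealerName': 'vendor_name'}
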
@ -0,0 +1,90 @@
"""
Hold Reconciler is an application meant to help reconcile the differences in payments
that are marked as on hold in Great Plains and OnBase.

It takes a report from OnBase and a report from Great Plains and checks them
against each other. It attempts to match them on contract number and payment
amount, or on the contract number alone.

It also does a lot of filtering on the Great Plains report to remove irrelevant data.

*Last Updated: version 1.3*
*Originally developed in Spring of 2023 by Griffiths Lott (g@glott.me)*
"""
import re
from re import Pattern
import os
from os.path import basename
import glob
import logging
from pathlib import Path
from tomllib import load
from pandas import DataFrame, Series
from typing import TypeVar, Literal

import logging.config
from logging import getLogger

logger = getLogger(__name__)

CN_REGEX = re.compile(r"\d{7}(-\d{3})?")
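
CN_REGEX standardizes contract numbers to seven digits with an optional three-digit suffix (the OB "Contract" format, e.g. 3070508-007). A small illustration of how the report classes apply it; extract_contract is a hypothetical helper written only for this sketch, using re and CN_REGEX as defined above:

def extract_contract(cn: object) -> str:
    """Return the standardized contract number, or the raw value if none is found."""
    match = re.search(CN_REGEX, str(cn))
    return match.group(0) if match else str(cn)

assert extract_contract("3070508-007") == "3070508-007"        # already standard
assert extract_contract("AP hold 3070508") == "3070508"        # embedded in free text
assert extract_contract("no number here") == "no number here"  # falls back to the raw value
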
def setup_logging():
    """
    Sets up logging configuration from the TOML file. If the logging configuration fails to be loaded from the file,
    a default logging configuration is used instead.

    Returns:
        logging.Logger: The logger instance.
    """
    with open("config_logger.toml", "rb") as f:
        config_dict: dict = load(f)
    try:
        # Try to load logging configuration from the TOML file
        logging.config.dictConfig(config_dict)
    except Exception as e:
        # If the logging configuration fails, use a default configuration and log the error
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        logger.warning("Failed setting up logger!")
        logger.exception(e)
        logger.warning(f"Config:\n{config_dict}")
    return logger


def drop_unnamed(df: DataFrame, inplace: bool = True) -> DataFrame | None:
    """
    Drops all Unnamed columns from a dataframe.
    ### CAUTION : This function acts *inplace* by default
    (on the original dataframe, not a copy!)
    """
    cols = [c for c in df.columns if "Unnamed" in c]
    return df.drop(cols, axis=1, inplace=inplace)
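
A usage note: with the default inplace=True, pandas' drop returns None, so callers that want a copy must pass inplace=False; a quick sketch:

import pandas as pd

df = pd.DataFrame({"Contract": ["3070508"], "Unnamed: 1": [None]})
drop_unnamed(df)                         # mutates df in place; returns None
clean = drop_unnamed(df, inplace=False)  # returns a new DataFrame instead
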
def find_most_recent_file(folder_path: Path, file_pattern: Pattern) -> str:
    """
    Given a folder path and a regular expression pattern, this function returns the path of the most recently modified
    file in the folder that matches the pattern.

    Args:
        folder_path (Path): A pathlib.Path object representing the folder to search.
        file_pattern (Pattern): A regular expression pattern used to filter the files in the folder.

    Returns:
        str: The path of the most recently modified file in the folder that matches the pattern.
    """
    # List every file in the folder
    files = glob.glob(f"{folder_path}/*")
    logger.debug(f"files: {files}")

    # Get the modification time of each file, keeping only those whose name matches the pattern
    file_times = [(os.path.getmtime(path), path) for path in files if re.match(file_pattern, basename(path))]
    if not file_times:
        raise FileNotFoundError(f"No file matching {file_pattern.pattern} found in {folder_path}")

    # Sort the files by modification time (most recent first)
    file_times.sort(reverse=True)
    logger.debug(f"file times: {file_times}")

    # Return the path of the most recent file
    return file_times[0][1]
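
Although the signature annotates folder_path as a Path, the function only interpolates it into a glob string, so the plain "Work" string that main passes works just as well; a usage sketch:

import re
from pathlib import Path

newest_ob = find_most_recent_file(Path("Work"), re.compile(r".*ob.*\.xlsx$", re.IGNORECASE))
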
@ -0,0 +1,120 @@
"""
This is the main entry point for this application. It finds the newest reports (GP & OB),
then utilizes the reconcile module to find the differences between them. The output is
saved as an Excel file with today's date.
"""
# Custom module for reconciliation
from helpers import setup_logging, find_most_recent_file
from models import OnBaseReport, GreatPlainsReport

import pandas as pd
from pandas import DataFrame
import re
from re import Pattern
import logging
from tomllib import load
import logging.config
from datetime import datetime as dt

"""
[ ] Pull in past reconciliations to check against
[ ] Record reconciled transaction (connect with VBA)
[ ] Check GP against the database
[ ] Check OB against the database
[X] Add resolution column to error sheets
[ ] Add sheet for problem contracts already seen and 'resolved'
"""

setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Logger started with level: {logger.level}")

def get_reports(work_dir: str, report_config: dict) -> tuple[pd.DataFrame | None, pd.DataFrame | None]:
    """
    Given a working directory and the report configuration, this function searches for the most recently modified GP
    and OB Excel files in that directory and returns their corresponding dataframes.

    Args:
        work_dir (str): The folder to search for the report files.
        report_config (dict): A dictionary containing configuration options for the GP and OB reports.

    Returns:
        tuple[pd.DataFrame|None, pd.DataFrame|None]: A tuple containing the OB and GP dataframes, respectively.
    """

    # Define regular expression patterns to match the GP and OB Excel files
    gp_regex: Pattern = re.compile(r".*gp.*\.xlsx$", re.IGNORECASE)
    ob_regex: Pattern = re.compile(r".*ob.*\.xlsx$", re.IGNORECASE)

    # Find the paths of the most recently modified GP and OB Excel files
    gp_file_path = find_most_recent_file(work_dir, gp_regex)
    logger.debug(f"gp_file_path: {gp_file_path}")
    ob_file_path = find_most_recent_file(work_dir, ob_regex)
    logger.debug(f"ob_file_path: {ob_file_path}")

    # Read the GP and OB Excel files into dataframes and keep the first sheet that has the required columns
    gp_df, ob_df = None, None
    gp_xl = pd.ExcelFile(gp_file_path)
    gp_req_cols = [col["GP"] for _, col in report_config["shared_columns"].items()]
    logger.debug(f"GP_Req_cols: {gp_req_cols}")
    gp_sheets = gp_xl.sheet_names
    gp_dfs = pd.read_excel(gp_xl, sheet_name=gp_sheets)
    for sheet in gp_dfs:
        sheet_columns: list[str] = list(gp_dfs[sheet].columns)
        logger.debug(f"gp ({sheet}) : {sheet_columns}")
        logger.debug(f"Matches {[r in sheet_columns for r in gp_req_cols]}")
        if all(r in sheet_columns for r in gp_req_cols):
            logger.debug("FOUND")
            gp_df = gp_dfs[sheet]
            break

    ob_xl = pd.ExcelFile(ob_file_path)
    ob_req_cols = [col["OB"] for _, col in report_config["shared_columns"].items()]
    ob_sheets = ob_xl.sheet_names
    ob_dfs = pd.read_excel(ob_xl, sheet_name=ob_sheets)
    for sheet in ob_dfs:
        sheet_columns: list[str] = list(ob_dfs[sheet].columns)
        if all(r in sheet_columns for r in ob_req_cols):
            ob_df = ob_dfs[sheet]
            break

    return ob_df, gp_df

def main() -> int:
    """
    This is the main function for the script. It reads configuration options from a TOML file, reads in the GP and OB
    Excel files, performs data reconciliation and analysis, and writes the results to a new Excel file.

    Returns:
        int: 0 if the script executes successfully.
    """
    # Read the configuration options from a TOML file
    with open("config_reports.toml", "rb") as f:
        reports_config: dict = load(f)
    logger.debug(f"Reports Config: {reports_config}")

    # Get the GP and OB dataframes from the Excel files
    ob_df, gp_df = get_reports("Work", reports_config)
    assert ob_df is not None and not ob_df.empty, "OB Data empty!"
    assert gp_df is not None and not gp_df.empty, "GP Data empty!"

    obr: OnBaseReport = OnBaseReport(ob_df, reports_config)
    gpr: GreatPlainsReport = GreatPlainsReport(gp_df, reports_config)

    overdue: DataFrame = obr.get_overdue()

    no_match, amt_mismatch = obr.reconcile(gpr)

    # Write the results to a new Excel file
    with pd.ExcelWriter(f"Work/Reconciled Holds [{dt.now().strftime('%m-%d-%Y')}].xlsx", mode='w') as writer:
        no_match.to_excel(writer, sheet_name="No Match", index=False)
        amt_mismatch.to_excel(writer, sheet_name="Amount Mismatch", index=False)
        overdue.to_excel(writer, sheet_name="Overdue", index=False)

    return 0


if __name__ == "__main__":
    print("Starting")
    main()
    print("Completed")

@ -0,0 +1,156 @@
"""
Classes and functions to parse completed reconciliation reports and remember
the resolutions of contracts.

Also provides a way for the reconciler to check holds against previously
resolved holds.

*Last Updated: version 1.3*
"""
from . import drop_unnamed
from ghlib.database.database_manager import SQLiteManager

from pandas import DataFrame, Series, read_sql_query, read_excel, concat
from logging import getLogger

logger = getLogger(__name__)


def normalize_cols(df: DataFrame) -> DataFrame:
    """
    Stub: normalize column names across report sources (not yet implemented).
    """


def process_resolutions(df: DataFrame) -> DataFrame:
    """
    Filter a reconciliation sheet down to its resolved rows and normalize it
    for storage in the Resolutions table.
    """
    # Drop unnamed columns:
    drop_unnamed(df)  # Works 'inplace'

    # Drop anything where the resolution is blank
    df: DataFrame = df[~df["Resolution"].isnull()]

    # Standardize the resolution
    df["Resolution"] = df["Resolution"].astype(str)
    df["Resolution"] = df["Resolution"].apply(lambda res: res.strip().lower())

    # Check for multiple 'onhold_amount' columns
    cols: list[str] = list(df.keys())
    mult_amounts: bool = "onhold_amount_ob" in cols

    if mult_amounts:
        # Create duplicates with the other amounts
        gp_amts: DataFrame = df[
            ["contract_number",
             "onhold_amount_gp",
             "Resolution",
             "Notes"
             ]]
        df = df[
            ["contract_number",
             "onhold_amount_ob",
             "Resolution",
             "Notes"
             ]]

        # Rename the amount columns and add the source
        gp_amts.rename(columns={"onhold_amount_gp": "onhold_amount"}, inplace=True)
        gp_amts["Source"] = "GP"
        df.rename(columns={"onhold_amount_ob": "onhold_amount"}, inplace=True)
        df["Source"] = "OB"

        # Combine them back together
        df: DataFrame = concat([df, gp_amts])
        df["Type"] = "AmountMismatch"

    else:
        # Filter columns
        df = df[
            ["Source",
             "contract_number",
             "onhold_amount",
             "Resolution",
             "Notes"
             ]]
        df["Type"] = "NoMatch"

    return df
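
In the mismatch branch, one input row that carries both amounts is split into two stacked rows, one per source; a condensed illustration with made-up values:

from pandas import DataFrame

sheet = DataFrame({
    "contract_number": ["3070508-007"],
    "onhold_amount_ob": [150.00],
    "onhold_amount_gp": [125.00],
    "Resolution": ["Short pay"],
    "Notes": [""],
})
out = process_resolutions(sheet)
# out holds two rows for 3070508-007: Source="OB" with onhold_amount=150.0 and
# Source="GP" with onhold_amount=125.0, both tagged Type="AmountMismatch"
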

def save_recs(resolved_dataframes: list[DataFrame]):
    """
    Process each resolved reconciliation sheet and append it to the
    Resolutions table in the SQLite database.
    """
    sqlManager: SQLiteManager = SQLiteManager("OnHold.db")
    with sqlManager.get_session() as session:
        conn = session.connection()

        df: DataFrame
        for df in resolved_dataframes:
            try:
                # Drop unneeded columns and filter only to resolved data
                df = process_resolutions(df)
                # Save to the database
                df.to_sql("Resolutions", conn, if_exists="append")
            except Exception as e:
                logger.exception(f"Could not save resolution dataframe: {e}")
                continue


def get_prev_reconciled(contracts: list[str]) -> DataFrame:
    """
    Get a DataFrame of previously reconciled contracts from an SQLite database.

    Args:
        contracts (list[str]): A list of contract numbers to check for previously reconciled contracts.

    Returns:
        DataFrame: A DataFrame of previously reconciled contracts, or an empty DataFrame if none are found.
    """
    # Create a DB manager
    sqlManager: SQLiteManager = SQLiteManager("OnHold.db")

    # Create a temp table to hold this batch's contract numbers;
    # this table will be cleared when sqlManager goes out of scope
    temp_table_statement = """
    CREATE TEMPORARY TABLE CUR_CONTRACTS (contract_number VARCHAR(11));
    """
    sqlManager.execute(temp_table_statement)

    # Insert the current contracts into the temp table
    insert_contracts = f"""
    INSERT INTO CUR_CONTRACTS (contract_number) VALUES
    {', '.join([f"('{cn}')" for cn in contracts])};
    """
    sqlManager.execute(insert_contracts)

    # Select previously resolved contracts
    res_query = """
    SELECT r.*
    FROM Resolutions r
    JOIN CUR_CONTRACTS t
    ON r.contract_number = t.contract_number;
    """
    resolved: DataFrame = sqlManager.execute(res_query, as_dataframe=True)
    return resolved
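
One caveat: building the INSERT by string interpolation breaks on contract values containing quotes. SQLiteManager's API beyond execute() isn't shown here, so as an assumption, a parameterized sketch using the stdlib sqlite3 directly would look like:

import sqlite3

def insert_contracts_safely(db_path: str, contracts: list[str]) -> None:
    # executemany binds each contract number, avoiding manual quoting entirely
    with sqlite3.connect(db_path) as conn:
        conn.execute("CREATE TEMP TABLE IF NOT EXISTS CUR_CONTRACTS (contract_number VARCHAR(11))")
        conn.executemany(
            "INSERT INTO CUR_CONTRACTS (contract_number) VALUES (?)",
            [(cn,) for cn in contracts],
        )
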

if __name__ == "__main__":
    import argparse

    parser = argparse.ArgumentParser(
        prog="HoldReconcilerRecord",
    )
    parser.add_argument("-i", "--input")
    args = parser.parse_args()

    # No Match
    no_match: DataFrame = read_excel(args.input, sheet_name="No Match")
    # Amount Mismatch
    amt_mm: DataFrame = read_excel(args.input, sheet_name="Amount Mismatch")

    save_recs(resolved_dataframes=[no_match, amt_mm])

@ -1,251 +0,0 @@
import pandas as pd
from pandas import DataFrame
from datetime import datetime as dt
import datetime
import re
from typing import Literal
import logging


logger = logging.getLogger(__name__)


def get_overdue(onbase_df: DataFrame, onbase_excel_config) -> DataFrame:
    """
    Given a DataFrame containing OnBase installation data and a dictionary containing the OnBase Excel configuration,
    this function returns a DataFrame containing the rows from `onbase_df` that have an installation date that is
    before the current date.

    Args:
        onbase_df (pd.DataFrame): A pandas DataFrame containing OnBase installation data.
        onbase_excel_config (dict): A dictionary containing the OnBase Excel configuration.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the rows from `onbase_df` that have an installation date that is
        before the current date.
    """
    id_col = onbase_excel_config["install_date"]
    onbase_df[id_col] = pd.to_datetime(onbase_df[id_col])
    onbase_df[id_col].fillna(pd.NaT, inplace=True)
    return onbase_df[onbase_df[id_col].dt.date < datetime.date.today()]


def filter_gp(gp_dataframe: pd.DataFrame, full_config: dict) -> pd.DataFrame:
    """
    Given a pandas DataFrame containing GP data and a dictionary containing the full configuration, this function
    filters out rows from the DataFrame that are not needed for further analysis based on certain criteria.

    Args:
        gp_dataframe (pd.DataFrame): A pandas DataFrame containing GP data.
        full_config (dict): A dictionary containing the full configuration.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the filtered GP data.
    """

    # Excludes anything that contains cma with a space or digit following it.
    # CMA23532 would be excluded but 'John Locman' would be allowed
    GOOD_PO_NUM = re.compile(r"^(?!.*cma(\s|\d)).*$", re.IGNORECASE)

    gp_config: dict = full_config["ExcelColumns"]["GP"]
    doc_num_regexes: list[str] = full_config["DocNumFilter"]

    bad_doc_num = ''
    rx: str
    for rx in doc_num_regexes:
        bad_doc_num += f"({rx})|"
    bad_doc_num = re.compile(bad_doc_num[:-1], re.IGNORECASE)
    logger.debug(f"Doc # filter: {bad_doc_num}")
    # Create a filter/mask to use on the data
    mask = (
        (gp_dataframe[gp_config['doc_type']] == "Invoice") &
        (gp_dataframe[gp_config['pur_order']].str.contains(GOOD_PO_NUM))
    )

    # Get the rows to drop based on the filter/mask
    rows_to_drop = gp_dataframe[~mask].index

    # Drop the rows and return the filtered DataFrame
    filtered_df = gp_dataframe.drop(rows_to_drop, inplace=False)

    mask = filtered_df[gp_config['doc_num']].str.contains(bad_doc_num)
    rows_to_drop = filtered_df[mask].index

    return filtered_df.drop(rows_to_drop, inplace=False)


def create_transaction_df(dataframe: pd.DataFrame, source: Literal["GP", "OB"], excelConfig: dict):
    """
    Given a pandas DataFrame containing transaction data, the source of the data ("GP" or "OB"), and a dictionary
    containing the Excel configuration, this function creates a new DataFrame with columns for the contract number,
    the amount on hold, a unique transaction ID, and the source of the data.

    Args:
        dataframe (pd.DataFrame): A pandas DataFrame containing transaction data.
        source (Literal["GP", "OB"]): The source of the data ("GP" or "OB").
        excelConfig (dict): A dictionary containing the Excel configuration.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the contract number, amount on hold, transaction ID, and data
        source for each transaction in the original DataFrame.
    """
    column_config: dict = excelConfig[source]
    logger.debug(f"column_config: {column_config}")
    # Create a new DataFrame with the contract number and on-hold amount columns
    transactions = dataframe[[column_config["contract_number"], column_config["onhold_amount"]]].copy()

    # Rename the columns to standardize the column names
    transactions.rename(columns={
        column_config["contract_number"]: "contract_number",
        column_config["onhold_amount"]: "onhold_amount",
    }, inplace=True)

    # Convert the on-hold amount column to float format and round to two decimal places
    transactions["onhold_amount"] = transactions["onhold_amount"].astype(float).round(2)

    # Use regex to extract the contract number from the column values and standardize the format
    CN_REGEX = re.compile(r"\d{7}(-\d{3})?")
    transactions["contract_number"] = transactions["contract_number"].apply(
        lambda cn: str(cn) if not re.search(CN_REGEX, str(cn))
        else re.search(CN_REGEX, str(cn)).group(0)
    )

    # Create a new column with a unique transaction ID
    transactions["ID"] = transactions["contract_number"] + '_' + \
        transactions["onhold_amount"].astype(str)

    # Create a new column with the data source
    transactions["Source"] = source

    # Return the new DataFrame with the contract number, on-hold amount, transaction ID, and data source columns
    return transactions


def get_no_match(obt_df: pd.DataFrame, gpt_df: pd.DataFrame):
    """
    Given two pandas DataFrames containing transaction data from OBT and GPT, respectively, this function returns a
    new DataFrame containing only the transactions that do not have a match in both the OBT and GPT DataFrames.

    Args:
        obt_df (pd.DataFrame): A pandas DataFrame containing transaction data from OBT.
        gpt_df (pd.DataFrame): A pandas DataFrame containing transaction data from GPT.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the transactions that do not have a match in both the OBT and GPT
        DataFrames.
    """
    # Merge the two DataFrames using the contract number as the join key
    merged_df = pd.merge(
        obt_df, gpt_df,
        how="outer",
        on=["contract_number"],
        suffixes=("_ob", "_gp")
    )

    # Filter the merged DataFrame to include only the transactions that do not have a match in both OBT and GPT
    no_match = merged_df.loc[
        (merged_df["Source_ob"].isna()) |
        (merged_df["Source_gp"].isna())
    ]

    # Fill in missing values and drop unnecessary columns
    no_match["Source"] = no_match["Source_ob"].fillna("GP")
    no_match["onhold_amount"] = no_match["onhold_amount_ob"].fillna(no_match["onhold_amount_gp"])
    no_match.drop(columns=[
        "ID_ob", "ID_gp",
        "onhold_amount_ob", "onhold_amount_gp",
        "Source_ob", "Source_gp"
    ],
        inplace=True)

    # Reorder and return the new DataFrame with the source, contract number, and on-hold amount columns
    no_match = no_match[
        ["Source", "contract_number", "onhold_amount"]
    ]

    return no_match


def get_not_full_match(obt_df: pd.DataFrame, gpt_df: pd.DataFrame):
    """
    Given two pandas DataFrames containing transaction data from OBT and GPT, respectively, this function returns two
    new DataFrames. The first DataFrame contains the transactions that have a full match on both the OBT and GPT
    DataFrames, and the second DataFrame contains the transactions that do not have a full match.

    Args:
        obt_df (pd.DataFrame): A pandas DataFrame containing transaction data from OBT.
        gpt_df (pd.DataFrame): A pandas DataFrame containing transaction data from GPT.

    Returns:
        tuple(pd.DataFrame, pd.DataFrame): A tuple of two DataFrames. The first DataFrame contains the transactions
        that have a full match on both the OBT and GPT DataFrames, and the second DataFrame contains the transactions
        that do not have a full match.
    """
    # Combine the two DataFrames using an outer join on the contract number and on-hold amount
    merged_df = pd.merge(
        obt_df, gpt_df,
        how="outer",
        on=["ID", "contract_number", "onhold_amount"],
        suffixes=("_ob", "_gp")
    )

    # Filter the merged DataFrame to include only the transactions that have a full match in both OBT and GPT
    full_matched = merged_df.dropna(subset=["Source_ob", "Source_gp"])
    full_matched.drop(columns=["Source_ob", "Source_gp"], inplace=True)

    # Create a boolean mask for the rows to drop in full_matched
    mask = merged_df["ID"].isin(full_matched["ID"])
    # Use the mask to remove the selected rows and create a new DataFrame for not full match
    not_full_match = merged_df[~mask]
    # This includes items that DO match contracts, but not amounts.
    # It can have multiple items from one source with the same contract number

    # Create a new column with the data source, using OBT as the default and GPT as backup if missing
    not_full_match["Source"] = not_full_match["Source_ob"].fillna(not_full_match["Source_gp"])

    # Drop the redundant Source columns
    not_full_match.drop(columns=["Source_ob", "Source_gp"], inplace=True)

    # Reorder and return the new DataFrame with the source, contract number, and on-hold amount columns
    not_full_match = not_full_match[
        ["Source", "contract_number", "onhold_amount"]
    ]

    # Return the two DataFrames
    return full_matched, not_full_match


def get_contract_match(not_full_match: pd.DataFrame) -> pd.DataFrame:
    """
    Given a pandas DataFrame containing transactions that do not have a full match between OBT and GPT, this function
    returns a new DataFrame containing only the transactions that have a matching contract number in both OBT and GPT.

    Args:
        not_full_match (pd.DataFrame): A pandas DataFrame containing transactions that do not have a full match
        between OBT and GPT.

    Returns:
        pd.DataFrame: A pandas DataFrame containing only the transactions that have a matching contract number in
        both OBT and GPT.
    """
    # Filter the not_full_match DataFrame by source
    ob_df = not_full_match[not_full_match["Source"] == "OB"]
    gp_df = not_full_match[not_full_match["Source"] == "GP"]

    # Merge the two filtered DataFrames on the contract number
    contract_match = pd.merge(
        ob_df, gp_df,
        how="inner",
        on=["contract_number"],
        suffixes=("_ob", "_gp")
    )

    # Drop the redundant Source columns
    contract_match.drop(columns=["Source_ob", "Source_gp"], inplace=True)

    # Reorder and return the new DataFrame with the contract number and both on-hold amount columns
    contract_match = contract_match[
        ["contract_number", "onhold_amount_ob", "onhold_amount_gp"]
    ]

    return contract_match

@ -1,21 +0,0 @@
from pandas import DataFrame, Series, read_sql_query, read_excel
import sqlite3 as sqll
import sqlalchemy as sqa
import argparse

def drop_unnamed(df: DataFrame):
    cols = [c for c in df.columns if "Unnamed" in c]
    df.drop(cols, axis=1, inplace=True)

parser = argparse.ArgumentParser(
    prog="HoldReconcilerRecord",
)
parser.add_argument("-i", "--input")
args = parser.parse_args()
# Resolution col

no_match: DataFrame = read_excel(args.input, sheet_name="No Match")
amt_mm: DataFrame = read_excel(args.input, sheet_name="Amount Mismatch")
drop_unnamed(no_match)
drop_unnamed(amt_mm)
print(no_match)

@ -1,191 +0,0 @@
import pandas as pd
from pandas import DataFrame, Series
import re
from re import Pattern
import os
from os.path import basename
import glob
import logging
from pathlib import Path
from tomllib import load
import logging.config
from datetime import datetime as dt

"""
[ ] Pull in past reconciliations to check against
[ ] Record reconciled transaction (connect with VBA)
[ ] Check GP against the database
[ ] Check OB against the database
[ ] Add resolution column to error sheets
"""

# Custom module for reconciliation
from rec_lib import get_contract_match, get_no_match, \
    get_not_full_match, get_overdue, filter_gp, create_transaction_df


def setup_logging():
    """
    Sets up logging configuration from the TOML file. If the logging configuration fails to be loaded from the file,
    a default logging configuration is used instead.

    Returns:
        logging.Logger: The logger instance.
    """
    with open("config.toml", "rb") as f:
        config_dict: dict = load(f)
    try:
        # Try to load logging configuration from the TOML file
        logging.config.dictConfig(config_dict["logger"])
    except Exception as e:
        # If the logging configuration fails, use a default configuration and log the error
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        logger.warning("Failed setting up logger!")
        logger.exception(e)
        logger.warning(f"Config:\n{config_dict}")
    return logger


setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Logger started with level: {logger.level}")


def find_most_recent_file(folder_path: Path, file_pattern: Pattern) -> str:
    """
    Given a folder path and a regular expression pattern, this function returns the path of the most recently modified
    file in the folder that matches the pattern.

    Args:
        folder_path (Path): A pathlib.Path object representing the folder to search.
        file_pattern (Pattern): A regular expression pattern used to filter the files in the folder.

    Returns:
        str: The path of the most recently modified file in the folder that matches the pattern.
    """
    # Find all files in the folder that match the pattern
    files = glob.glob(f"{folder_path}/*")
    logger.debug(f"files: {files}")

    # Get the modification time of each file and filter to only those that match the pattern
    file_times = [(os.path.getmtime(path), path) for path in files if re.match(file_pattern, basename(path))]

    # Sort the files by modification time (most recent first)
    file_times.sort(reverse=True)
    logger.debug(f"file times: {file_times}")

    # Return the path of the most recent file
    return file_times[0][1]


def check_sheet(df_cols: list[str], excel_col_config: dict) -> bool:
    """
    Given a list of column names and a dictionary of column name configurations, this function checks if the required
    columns are present in the list of column names.

    Args:
        df_cols (list[str]): A list of column names.
        excel_col_config (dict): A dictionary of column name configurations.

    Returns:
        bool: True if all of the required columns are present in the list of column names, False otherwise.
    """
    # Get the list of required columns from the column configuration dictionary
    required_cols: list[str] = list(excel_col_config.values())
    # Check if all of the required columns are present in the list of column names
    return all([col in df_cols for col in required_cols])


def get_dataframes(work_dir: str, excelConfig: dict) -> tuple[pd.DataFrame | None, pd.DataFrame | None]:
    """
    Given a dictionary of Excel configuration options, this function searches for the most recently modified GP and OB
    Excel files in a "Work" folder and returns their corresponding dataframes.

    Args:
        excelConfig (dict): A dictionary containing configuration options for the GP and OB Excel files.

    Returns:
        tuple[pd.DataFrame|None, pd.DataFrame|None]: A tuple containing the OB and GP dataframes, respectively.
    """

    # Define regular expression patterns to match the GP and OB Excel files
    gp_regex: Pattern = re.compile(r".*gp.*\.xlsx$", re.IGNORECASE)
    ob_regex: Pattern = re.compile(r".*ob.*\.xlsx$", re.IGNORECASE)

    # Find the paths of the most recently modified GP and OB Excel files
    gp_file_path = find_most_recent_file(work_dir, gp_regex)
    logger.debug(f"gp_file_path: {gp_file_path}")
    ob_file_path = find_most_recent_file(work_dir, ob_regex)
    logger.debug(f"ob_file_path: {ob_file_path}")

    # Read the GP and OB Excel files into dataframes and check that each dataframe has the required columns
    gp_xl = pd.ExcelFile(gp_file_path)
    gp_config = excelConfig["GP"]
    gp_sheets = gp_xl.sheet_names
    gp_dfs = pd.read_excel(gp_xl, sheet_name=gp_sheets)
    for sheet in gp_dfs:
        if check_sheet(gp_dfs[sheet].columns, gp_config):
            gp_df = gp_dfs[sheet]
            break

    ob_xl = pd.ExcelFile(ob_file_path)
    ob_config = excelConfig["OB"]
    ob_sheets = ob_xl.sheet_names
    ob_dfs = pd.read_excel(ob_xl, sheet_name=ob_sheets)
    for sheet in ob_dfs:
        if check_sheet(ob_dfs[sheet].columns, ob_config):
            ob_df = ob_dfs[sheet]
            break

    return ob_df, gp_df


def main() -> int:
    """
    This is the main function for the script. It reads configuration options from a TOML file, reads in the GP and OB
    Excel files, performs data reconciliation and analysis, and writes the results to a new Excel file.

    Returns:
        int: 0 if the script executes successfully.
    """
    # Read the configuration options from a TOML file
    with open("config.toml", "rb") as f:
        config_dict: dict = load(f)
    logger.debug(f"Config: {config_dict}")

    excelConfig: dict = config_dict["ExcelColumns"]

    # Get the GP and OB dataframes from the Excel files
    ob_df, gp_df = get_dataframes(config_dict["write_dir"], excelConfig)
    assert not ob_df.empty, "OB Data empty!"
    assert not gp_df.empty, "GP Data empty!"

    # Filter the GP dataframe to include only relevant transactions
    fgp_df: DataFrame = filter_gp(gp_df, config_dict)
    # Get the overdue transactions from the OB dataframe
    overdue: DataFrame = get_overdue(ob_df, excelConfig["OB"])

    # Create transaction dataframes for the GP and OB dataframes
    ob_transactions: DataFrame = create_transaction_df(ob_df, 'OB', excelConfig)
    gp_transactions: DataFrame = create_transaction_df(fgp_df, 'GP', excelConfig)

    # Get the transactions that do not have matches in both the GP and OB dataframes
    no_match: DataFrame = get_no_match(ob_transactions, gp_transactions)

    # Get the transactions that have matches in both the GP and OB dataframes but have amount mismatches
    full_match, not_full_match = get_not_full_match(ob_transactions, gp_transactions)
    only_contracts_match: DataFrame = get_contract_match(not_full_match)

    # Write the results to a new Excel file
    with pd.ExcelWriter(f"{config_dict['write_dir']}/Reconciled Holds [{dt.now().strftime('%m-%d-%Y')}].xlsx", mode='w') as writer:
        full_match.to_excel(writer, sheet_name="FULL", index=False)
        no_match.to_excel(writer, sheet_name="No Match", index=False)
        only_contracts_match.to_excel(writer, sheet_name="Amount Mismatch", index=False)
        overdue.to_excel(writer, sheet_name="Overdue", index=False)

    return 0


if __name__ == "__main__":
    print("Starting")
    main()
    print("Completed")

@ -0,0 +1,188 @@
from pandas import DataFrame, merge, to_datetime, NaT
from numpy import concatenate
from abc import ABC, abstractmethod
from logging import getLogger
import re
from typing import Literal
import datetime

from helpers import CN_REGEX

logger = getLogger(__name__)


class HoldReport(ABC):

    source = ""

    def __init__(self, dataframe: DataFrame, reports_config: dict) -> None:
        self.config = reports_config
        self.df = dataframe
        self._normalize()

    def _normalize(self):
        # Rename the columns to standardize the column names
        self.df.rename(columns={unique_cols[self.source]: common_col
                                for common_col, unique_cols in self.config["shared_columns"].items()
                                }, inplace=True)

        # Convert the on-hold amount column to float format and round to two decimal places
        self.df["onhold_amount"] = self.df["onhold_amount"].astype(float).round(2)

        # Use regex to extract the contract number from the column values and standardize the format
        self.df["contract_number"] = self.df["contract_number"].apply(
            lambda cn: str(cn) if not re.search(CN_REGEX, str(cn))
            else re.search(CN_REGEX, str(cn)).group(0)
        )

        # Create a new column with a unique transaction ID
        self.df["ID"] = self.df["contract_number"] + '_' + \
            self.df["onhold_amount"].astype(str)

        # Create a new column with the data source
        self.df["Source"] = self.source

    def _get_no_match(self, other: 'HoldReport'):
        # Merge the two DataFrames using the contract number as the join key
        outer_merge = merge(
            self.df, other.df,
            how="outer",
            on=["contract_number"],
            suffixes=('_' + self.source, '_' + other.source)
        )

        # Filter the merged DataFrame to include only the transactions that do not have a match in both reports
        no_match = outer_merge.loc[
            (outer_merge[f"Source_{self.source}"].isna()) |
            (outer_merge[f"Source_{other.source}"].isna())
        ]

        # Fill in missing values and drop unnecessary columns
        no_match["Source"] = no_match[f"Source_{self.source}"].fillna(other.source)
        no_match["onhold_amount"] = no_match[f"onhold_amount_{self.source}"].fillna(
            no_match[f"onhold_amount_{other.source}"]
        )
        no_match["vendor_name"] = no_match[f"vendor_name_{self.source}"].fillna(
            no_match[f"vendor_name_{other.source}"]
        )

        return no_match
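
The no-match test relies on the outer merge leaving the suffixed Source column NaN on whichever side lacked the contract; a condensed sketch with hypothetical two-column reports:

import pandas as pd

ob = pd.DataFrame({"contract_number": ["3070508-007", "3070509-001"], "Source_OB": "OB"})
gp = pd.DataFrame({"contract_number": ["3070508-007"], "Source_GP": "GP"})
merged = pd.merge(ob, gp, how="outer", on="contract_number")
no_match = merged[merged["Source_OB"].isna() | merged["Source_GP"].isna()]
# Only 3070509-001 remains: it is on hold in OnBase but absent from Great Plains
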

    def _get_contract_matches(self, other: 'HoldReport') -> DataFrame:
        """
        Inner-join this report with the other on contract number, keeping the
        rows where the contract exists in both but the amounts may differ.
        """
        # Merge the two filtered DataFrames on the contract number
        contract_match = merge(
            self.df, other.df,
            how="inner",
            on=["contract_number"],
            suffixes=('_' + self.source, '_' + other.source)
        )

        contract_match["vendor_name"] = contract_match[f"vendor_name_{self.source}"].fillna(
            contract_match[f"vendor_name_{other.source}"]
        )

        return contract_match

    @staticmethod
    def _add_work_columns(df: DataFrame) -> DataFrame:
        """
        Add empty columns to the dataframe to facilitate working through the report.
        """
        WORK_COLS = ["Resolution", "Notes"]
        for col in WORK_COLS:
            df[col] = ''
        return df

    def reconcile(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]:
        """
        Reconcile this report against the other and return the
        (no_match, amount_mismatch) dataframes.
        """
        no_match: DataFrame = self._get_no_match(other)
        no_match.to_excel("NOMATCH.xlsx")  # debug dump
        logger.debug(f"no_match: {no_match}")

        amount_mismatch: DataFrame = self._get_contract_matches(other)
        amount_mismatch.to_excel("AMTMM.xlsx")  # debug dump
        logger.debug(f"amount_mismatch: {amount_mismatch}")

        # Select and reorder columns
        no_match = no_match[
            ["Source"] + self.config["output_columns"]
        ]
        no_match = self._add_work_columns(no_match)

        amount_mismatch = amount_mismatch[
            self.config["output_columns"]
        ]
        amount_mismatch = self._add_work_columns(amount_mismatch)

        return no_match, amount_mismatch


class OnBaseReport(HoldReport):

    source = "OB"

    def get_overdue(self) -> DataFrame:
        """
        Return the rows whose install_date falls before today.
        """
        self.df["install_date"] = to_datetime(self.df["install_date"])
        self.df["install_date"].fillna(NaT, inplace=True)
        return self.df[self.df["install_date"].dt.date < datetime.date.today()]


class GreatPlainsReport(HoldReport):

    source = "GP"
    filtered_df: bool = False

    def __init__(self, dataframe: DataFrame, report_config: dict) -> None:

        self._filter(
            gp_report_df=dataframe,
            doc_num_filters=report_config["gp_filters"]["doc_num_filters"],
            good_po_num_regex=report_config["gp_filters"]["po_filter"]
        )
        super().__init__(dataframe, report_config)

    @staticmethod
    def _filter(gp_report_df: DataFrame,
                doc_num_filters: list[str], good_po_num_regex: str) -> DataFrame:

        GOOD_PO_NUM = re.compile(good_po_num_regex, re.IGNORECASE)

        bad_doc_num = ''
        rx: str
        for rx in doc_num_filters:
            bad_doc_num += f"({rx})|"
        bad_doc_num = re.compile(bad_doc_num[:-1], re.IGNORECASE)

        # Create a mask/filter that will keep rows that match these
        # requirements
        keep_mask = (
            (gp_report_df["Document Type"] == "Invoice") &
            (gp_report_df["Purchase Order Number"].str.contains(GOOD_PO_NUM))
        )

        # Get the rows that DO NOT fit the keep_mask
        rows_to_drop = gp_report_df[~keep_mask].index
        # Drop the rows to filter
        gp_report_df.drop(rows_to_drop, inplace=True)

        # Create a filter to remove rows that meet this requirement.
        # Making this a negative in the keep mask is more trouble than
        # it's worth
        remove_mask = gp_report_df["Document Number"].str.contains(bad_doc_num)
        rows_to_drop = gp_report_df[remove_mask].index
        gp_report_df.drop(rows_to_drop, inplace=True)

        return gp_report_df
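
A toy run of the two-stage filter (column names as in config_reports.toml; the rows are made up):

import pandas as pd

df = pd.DataFrame({
    "Document Type": ["Invoice", "Credit Memo", "Invoice"],
    "Purchase Order Number": ["ABC123", "ABC124", "CMA23532"],
    "Document Number": ["1-316141 HOLD", "1-316142", "1-316143"],
})
GreatPlainsReport._filter(
    df,
    doc_num_filters=["promo", "cma"],
    good_po_num_regex=r"^(?!.*cma(\s|\d)).*$",
)
# Row 0 survives; row 1 drops (not an Invoice); row 2 drops (PO has 'cma' + digit)
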
@ -0,0 +1 @@
2.0