Compare commits
No commits in common. 'dev' and 'master' have entirely different histories.
@@ -0,0 +1,52 @@
write_dir = "Work"

DocNumFilter = [
    "p(oin)?ts",
    "pool",
    "promo",
    "o(ver)?f(und)?",
    "m(ar)?ke?t",
    "title",
    "adj",
    "reg free",
    "cma"
]

[ExcelColumns]

[ExcelColumns.OB]
contract_number = "Contract"         # 3070508-007
onhold_amount = "CurrentOnHold"
install_date = "InstallDate"

[ExcelColumns.GP]
contract_number = "Transaction Description"  # 1234-56789
onhold_amount = "Current Trx Amount"
doc_num = "Document Number"          # 1-316141 HOLD
pur_order = "Purchase Order Number"  # ABC123
doc_type = "Document Type"           # Invoice or Credit Memo

[logger]
version = 1
disable_existing_loggers = false

[logger.formatters.custom]
format = "'%(asctime)s - %(module)s - %(levelname)s - %(message)s'"

[logger.handlers.console]
class = "logging.StreamHandler"
level = "DEBUG"
formatter = "custom"
stream = "ext://sys.stdout"

[logger.handlers.file]
class = "logging.FileHandler"
level = "DEBUG"
formatter = "custom"
filename = "on_hold.log"

[logger.root]
level = "DEBUG"
handlers = ["console", "file"]
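The [logger] table above follows the stdlib logging.config.dictConfig schema, so the whole file can be loaded with tomllib and that section passed straight through — a minimal sketch, mirroring what setup_logging/main do in the script below:

import tomllib
import logging.config

with open("config.toml", "rb") as f:
    config = tomllib.load(f)
logging.config.dictConfig(config["logger"])  # handlers/formatters defined above
gp_cols = config["ExcelColumns"]["GP"]       # e.g. gp_cols["doc_num"] == "Document Number"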
@@ -0,0 +1,251 @@
import pandas as pd
from pandas import DataFrame
import datetime
import re
from typing import Literal
import logging


logger = logging.getLogger(__name__)
def get_overdue(onbase_df: DataFrame, onbase_excel_config: dict) -> DataFrame:
    """
    Given a DataFrame containing OnBase installation data and a dictionary containing the OnBase Excel
    configuration, return the rows from `onbase_df` whose installation date is before the current date.

    Args:
        onbase_df (pd.DataFrame): A pandas DataFrame containing OnBase installation data.
        onbase_excel_config (dict): A dictionary containing the OnBase Excel configuration.

    Returns:
        pd.DataFrame: The rows from `onbase_df` with an installation date before the current date.
    """
    id_col = onbase_excel_config["install_date"]
    onbase_df[id_col] = pd.to_datetime(onbase_df[id_col])
    onbase_df[id_col].fillna(pd.NaT, inplace=True)
    return onbase_df[onbase_df[id_col].dt.date < datetime.date.today()]

def filter_gp(gp_dataframe: pd.DataFrame, full_config: dict) -> pd.DataFrame:
    """
    Given a pandas DataFrame containing GP data and the full program configuration, filter out rows
    that are not needed for further analysis.

    Args:
        gp_dataframe (pd.DataFrame): A pandas DataFrame containing GP data.
        full_config (dict): The full configuration dictionary (must contain "ExcelColumns" and "DocNumFilter").

    Returns:
        pd.DataFrame: A pandas DataFrame containing the filtered GP data.
    """

    # Excludes anything that contains 'cma' with a space or digit following it:
    # 'CMA23532' would be excluded, but 'John Locman' would be allowed
    GOOD_PO_NUM = re.compile(r"^(?!.*cma(\s|\d)).*$", re.IGNORECASE)

    gp_config: dict = full_config["ExcelColumns"]["GP"]
    doc_num_regexes: list[str] = full_config["DocNumFilter"]

    # Combine the document-number patterns into a single alternation
    bad_doc_num = re.compile("|".join(f"({rx})" for rx in doc_num_regexes), re.IGNORECASE)
    logger.debug(f"Doc # filter: {bad_doc_num}")

    # Create a filter/mask to use on the data; na=False treats missing PO numbers as non-matches
    mask = (
        (gp_dataframe[gp_config['doc_type']] == "Invoice") &
        (gp_dataframe[gp_config['pur_order']].str.contains(GOOD_PO_NUM, na=False))
    )

    # Get the rows to drop based on the filter/mask
    rows_to_drop = gp_dataframe[~mask].index

    # Drop the rows and keep the filtered DataFrame
    filtered_df = gp_dataframe.drop(rows_to_drop, inplace=False)

    # Drop anything whose document number matches one of the exclusion patterns
    mask = filtered_df[gp_config['doc_num']].str.contains(bad_doc_num, na=False)
    rows_to_drop = filtered_df[mask].index

    return filtered_df.drop(rows_to_drop, inplace=False)

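A quick illustration of the two patterns at work — the example strings come straight from the comments above:

# Illustration only, reusing the example strings from the comments in filter_gp
assert re.compile(r"^(?!.*cma(\s|\d)).*$", re.I).match("John Locman")        # allowed: 'cma' not followed by space/digit
assert not re.compile(r"^(?!.*cma(\s|\d)).*$", re.I).match("CMA23532")       # excluded: 'cma' followed by a digit
assert re.compile("(pool)|(promo)", re.I).search("Promo credit 1-316141")   # would be filtered out of doc numbers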
def create_transaction_df(dataframe: pd.DataFrame, source: Literal["GP", "OB"], excelConfig: dict):
    """
    Given a pandas DataFrame containing transaction data, the source of the data ("GP" or "OB"), and a
    dictionary containing the Excel configuration, create a new DataFrame with columns for the contract
    number, the amount on hold, a unique transaction ID, and the source of the data.

    Args:
        dataframe (pd.DataFrame): A pandas DataFrame containing transaction data.
        source (Literal["GP", "OB"]): The source of the data ("GP" or "OB").
        excelConfig (dict): A dictionary containing the Excel configuration.

    Returns:
        pd.DataFrame: A pandas DataFrame containing the contract number, amount on hold, transaction ID,
        and data source for each transaction in the original DataFrame.
    """
    column_config: dict = excelConfig[source]
    logger.debug(f"column_config: {column_config}")
    # Create a new DataFrame with the contract number and on-hold amount columns
    transactions = dataframe[[column_config["contract_number"], column_config["onhold_amount"]]].copy()

    # Rename the columns to standardize the column names
    transactions.rename(columns={
        column_config["contract_number"]: "contract_number",
        column_config["onhold_amount"]: "onhold_amount",
    }, inplace=True)

    # Convert the on-hold amount column to float format and round to two decimal places
    transactions["onhold_amount"] = transactions["onhold_amount"].astype(float).round(2)

    # Use regex to extract the contract number from the column values, standardizing the format
    CN_REGEX = re.compile(r"\d{7}(-\d{3})?")
    transactions["contract_number"] = transactions["contract_number"].apply(
        lambda cn: str(cn) if not re.search(CN_REGEX, str(cn))
        else re.search(CN_REGEX, str(cn)).group(0)
    )

    # Create a new column with a unique transaction ID
    transactions["ID"] = transactions["contract_number"] + '_' + \
        transactions["onhold_amount"].astype(str)

    # Create a new column with the data source
    transactions["Source"] = source

    # Return the new DataFrame with the contract number, on-hold amount, transaction ID, and data source columns
    return transactions

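A toy run of the function above, using the [ExcelColumns.OB] mapping from config.toml (row values are hypothetical):

# Illustration only: hypothetical OB row, real column mapping from config.toml
ob = pd.DataFrame({"Contract": ["Deal 3070508-007"], "CurrentOnHold": [1200.5]})
t = create_transaction_df(ob, "OB", {"OB": {"contract_number": "Contract", "onhold_amount": "CurrentOnHold"}})
# t["contract_number"] -> "3070508-007"; t["ID"] -> "3070508-007_1200.5"; t["Source"] -> "OB"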
def get_no_match(obt_df: pd.DataFrame, gpt_df: pd.DataFrame):
    """
    Given two pandas DataFrames containing transaction data from OB and GP, respectively, return a new
    DataFrame containing only the transactions that do not appear in both DataFrames.

    Args:
        obt_df (pd.DataFrame): A pandas DataFrame containing OB transaction data.
        gpt_df (pd.DataFrame): A pandas DataFrame containing GP transaction data.

    Returns:
        pd.DataFrame: The transactions that do not have a match in both the OB and GP DataFrames.
    """
    # Merge the two DataFrames using the contract number as the join key
    merged_df = pd.merge(
        obt_df, gpt_df,
        how="outer",
        on=["contract_number"],
        suffixes=("_ob", "_gp")
    )

    # Keep only the transactions that are missing from one side or the other;
    # copy() so the assignments below do not write to a view of merged_df
    no_match = merged_df.loc[
        (merged_df["Source_ob"].isna()) |
        (merged_df["Source_gp"].isna())
    ].copy()

    # Fill in missing values and drop unnecessary columns
    no_match["Source"] = no_match["Source_ob"].fillna("GP")
    no_match["onhold_amount"] = no_match["onhold_amount_ob"].fillna(no_match["onhold_amount_gp"])
    no_match.drop(columns=[
        "ID_ob", "ID_gp",
        "onhold_amount_ob", "onhold_amount_gp",
        "Source_ob", "Source_gp"
    ],
        inplace=True)

    # Reorder and return the new DataFrame with the source, contract number, and on-hold amount columns
    no_match = no_match[
        ["Source", "contract_number", "onhold_amount"]
    ]

    return no_match

def get_not_full_match(obt_df: pd.DataFrame, gpt_df: pd.DataFrame):
    """
    Given two pandas DataFrames containing transaction data from OB and GP, respectively, return two
    new DataFrames. The first contains the transactions that fully match across the OB and GP
    DataFrames, and the second contains the transactions that do not.

    Args:
        obt_df (pd.DataFrame): A pandas DataFrame containing OB transaction data.
        gpt_df (pd.DataFrame): A pandas DataFrame containing GP transaction data.

    Returns:
        tuple(pd.DataFrame, pd.DataFrame): The fully matched transactions, followed by the
        transactions that do not have a full match.
    """
    # Combine the two DataFrames using an outer join on the ID, contract number, and on-hold amount
    merged_df = pd.merge(
        obt_df, gpt_df,
        how="outer",
        on=["ID", "contract_number", "onhold_amount"],
        suffixes=("_ob", "_gp")
    )

    # Keep only the transactions present in both OB and GP (a full match on every join key)
    full_matched = merged_df.dropna(subset=["Source_ob", "Source_gp"])
    full_matched.drop(columns=["Source_ob", "Source_gp"], inplace=True)

    # Create a boolean mask for the rows that fully matched
    mask = merged_df["ID"].isin(full_matched["ID"])
    # Use the mask to remove those rows; copy() so the assignments below do not write to a view
    not_full_match = merged_df[~mask].copy()
    # This includes items that DO match contracts, but not amounts
    # It can have multiple items from one source with the same contract number

    # Create a new column with the data source, using OB as the default and GP as backup if missing
    not_full_match["Source"] = not_full_match["Source_ob"].fillna(not_full_match["Source_gp"])

    # Drop the redundant Source columns
    not_full_match.drop(columns=["Source_ob", "Source_gp"], inplace=True)

    # Reorder and return the new DataFrame with the source, contract number, and on-hold amount columns
    not_full_match = not_full_match[
        ["Source", "contract_number", "onhold_amount"]
    ]

    return full_matched, not_full_match

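A small worked example of the matching semantics above (hypothetical IDs): 7000001_10.0 exists on both sides, so it is a full match; contract 7000002 appears with different amounts, so both of its rows land in not_full_match:

# Illustration only, with hypothetical contract numbers
ob = pd.DataFrame({"ID": ["7000001_10.0", "7000002_5.0"],
                   "contract_number": ["7000001", "7000002"],
                   "onhold_amount": [10.0, 5.0], "Source": ["OB", "OB"]})
gp = pd.DataFrame({"ID": ["7000001_10.0", "7000002_7.5"],
                   "contract_number": ["7000001", "7000002"],
                   "onhold_amount": [10.0, 7.5], "Source": ["GP", "GP"]})
full, partial = get_not_full_match(ob, gp)  # full: one 7000001 row; partial: the two 7000002 rows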
def get_contract_match(not_full_match: pd.DataFrame) -> pd.DataFrame:
    """
    Given a pandas DataFrame of transactions that do not fully match between OB and GP, return a new
    DataFrame containing only the transactions whose contract number appears in both OB and GP.

    Args:
        not_full_match (pd.DataFrame): A pandas DataFrame containing transactions that do not have a
        full match between OB and GP.

    Returns:
        pd.DataFrame: Only the transactions that have a matching contract number in both OB and GP.
    """
    # Filter the not_full_match DataFrame by source
    ob_df = not_full_match[not_full_match["Source"] == "OB"]
    gp_df = not_full_match[not_full_match["Source"] == "GP"]

    # Merge the two filtered DataFrames on the contract number
    contract_match = pd.merge(
        ob_df, gp_df,
        how="inner",
        on=["contract_number"],
        suffixes=("_ob", "_gp")
    )

    # Drop the redundant Source columns
    contract_match.drop(columns=["Source_ob", "Source_gp"], inplace=True)

    # Reorder and return the new DataFrame with the contract number and both on-hold amount columns
    contract_match = contract_match[
        ["contract_number", "onhold_amount_ob", "onhold_amount_gp"]
    ]

    return contract_match
@@ -0,0 +1,190 @@
import pandas as pd
from pandas import DataFrame, Series
import re
from re import Pattern
import os
from os.path import basename
import glob
import logging
from pathlib import Path
from tomllib import load
import logging.config
from datetime import datetime as dt

"""
[ ] Pull in past reconciliations to check against
[ ] Record reconciled transactions (connect with VBA)
[ ] Check GP against the database
[ ] Check OB against the database
"""

# Custom module for reconciliation
from rec_lib import get_contract_match, get_no_match, \
    get_not_full_match, get_overdue, filter_gp, create_transaction_df

def setup_logging():
    """
    Sets up the logging configuration from the TOML file. If the logging configuration fails to load
    from the file, a default configuration is used instead.

    Returns:
        logging.Logger: The logger instance.
    """
    with open("config.toml", "rb") as f:
        config_dict: dict = load(f)
    logger = logging.getLogger()
    try:
        # Try to load the logging configuration from the TOML file
        logging.config.dictConfig(config_dict["logger"])
    except Exception as e:
        # If the logging configuration fails, use a default configuration and log the error
        logger.setLevel(logging.DEBUG)
        logger.warning("Failed setting up logger!")
        logger.exception(e)
        logger.warning(f"Config:\n{config_dict}")
    return logger


setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Logger started with level: {logger.level}")

def find_most_recent_file(folder_path: Path, file_pattern: Pattern) -> str:
    """
    Given a folder path and a regular expression pattern, return the path of the most recently
    modified file in the folder whose name matches the pattern.

    Args:
        folder_path (Path): A pathlib.Path object representing the folder to search.
        file_pattern (Pattern): A regular expression pattern used to filter the files in the folder.

    Returns:
        str: The path of the most recently modified file in the folder that matches the pattern.
    """
    # List everything in the folder
    files = glob.glob(f"{folder_path}/*")
    logger.debug(f"files: {files}")

    # Get the modification time of each file, keeping only those whose basename matches the pattern
    file_times = [(os.path.getmtime(path), path) for path in files if re.match(file_pattern, basename(path))]

    # Sort the files by modification time (most recent first)
    file_times.sort(reverse=True)
    logger.debug(f"file times: {file_times}")

    # Return the path of the most recent file
    return file_times[0][1]

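For example, get_dataframes below calls this with the "Work" directory from config.toml and a case-insensitive filename pattern:

# Usage sketch, mirroring get_dataframes below: newest workbook whose name contains "gp"
newest_gp = find_most_recent_file(Path("Work"), re.compile(r".*gp.*\.xlsx$", re.IGNORECASE))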
def check_sheet(df_cols: list[str], excel_col_config: dict) -> bool:
    """
    Given a list of column names and a dictionary of column name configurations, check whether all of
    the required columns are present in the list of column names.

    Args:
        df_cols (list[str]): A list of column names.
        excel_col_config (dict): A dictionary of column name configurations.

    Returns:
        bool: True if all of the required columns are present in the list of column names, False otherwise.
    """
    # Get the list of required columns from the column configuration dictionary
    required_cols: list[str] = list(excel_col_config.values())
    # Check that every required column is present in the list of column names
    return all(col in df_cols for col in required_cols)

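A quick check against the [ExcelColumns.OB] mapping from config.toml:

ob_cfg = {"contract_number": "Contract", "onhold_amount": "CurrentOnHold", "install_date": "InstallDate"}
check_sheet(["Contract", "CurrentOnHold", "InstallDate", "Extra"], ob_cfg)  # True
check_sheet(["Contract"], ob_cfg)                                           # False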
def get_dataframes(work_dir: str, excelConfig: dict) -> tuple[pd.DataFrame|None, pd.DataFrame|None]:
    """
    Given a working directory and a dictionary of Excel configuration options, search for the most
    recently modified GP and OB Excel files in that directory and return their dataframes.

    Args:
        work_dir (str): The directory to search for report files.
        excelConfig (dict): A dictionary containing configuration options for the GP and OB Excel files.

    Returns:
        tuple[pd.DataFrame|None, pd.DataFrame|None]: A tuple containing the OB and GP dataframes,
        respectively. A value is None when no sheet with the required columns is found.
    """

    # Define regular expression patterns to match the GP and OB Excel files
    gp_regex: Pattern = re.compile(r".*gp.*\.xlsx$", re.IGNORECASE)
    ob_regex: Pattern = re.compile(r".*ob.*\.xlsx$", re.IGNORECASE)

    # Find the paths of the most recently modified GP and OB Excel files
    gp_file_path = find_most_recent_file(work_dir, gp_regex)
    logger.debug(f"gp_file_path: {gp_file_path}")
    ob_file_path = find_most_recent_file(work_dir, ob_regex)
    logger.debug(f"ob_file_path: {ob_file_path}")

    # Read each Excel file and keep the first sheet that has the required columns
    gp_df = None
    gp_xl = pd.ExcelFile(gp_file_path)
    gp_config = excelConfig["GP"]
    gp_sheets = gp_xl.sheet_names
    gp_dfs = pd.read_excel(gp_xl, sheet_name=gp_sheets)
    for sheet in gp_dfs:
        if check_sheet(gp_dfs[sheet].columns, gp_config):
            gp_df = gp_dfs[sheet]
            break

    ob_df = None
    ob_xl = pd.ExcelFile(ob_file_path)
    ob_config = excelConfig["OB"]
    ob_sheets = ob_xl.sheet_names
    ob_dfs = pd.read_excel(ob_xl, sheet_name=ob_sheets)
    for sheet in ob_dfs:
        if check_sheet(ob_dfs[sheet].columns, ob_config):
            ob_df = ob_dfs[sheet]
            break

    return ob_df, gp_df

def main() -> int:
    """
    This is the main function for the script. It reads configuration options from a TOML file, reads
    in the GP and OB Excel files, performs data reconciliation and analysis, and writes the results to
    a new Excel file.

    Returns:
        int: 0 if the script executes successfully.
    """
    # Read the configuration options from a TOML file
    with open("config.toml", "rb") as f:
        config_dict: dict = load(f)
    logger.debug(f"Config: {config_dict}")

    excelConfig: dict = config_dict["ExcelColumns"]

    # Get the GP and OB dataframes from the Excel files
    ob_df, gp_df = get_dataframes(config_dict["write_dir"], excelConfig)
    assert ob_df is not None and not ob_df.empty, "OB Data empty!"
    assert gp_df is not None and not gp_df.empty, "GP Data empty!"

    # Filter the GP dataframe to include only relevant transactions
    fgp_df: DataFrame = filter_gp(gp_df, config_dict)
    # Get the overdue transactions from the OB dataframe
    overdue: DataFrame = get_overdue(ob_df, excelConfig["OB"])

    # Create transaction dataframes for the GP and OB dataframes
    ob_transactions: DataFrame = create_transaction_df(ob_df, 'OB', excelConfig)
    gp_transactions: DataFrame = create_transaction_df(fgp_df, 'GP', excelConfig)

    # Get the transactions that do not have matches in both the GP and OB dataframes
    no_match: DataFrame = get_no_match(ob_transactions, gp_transactions)

    # Get the transactions that match on contract number in both dataframes but mismatch on amount
    full_match, not_full_match = get_not_full_match(ob_transactions, gp_transactions)
    only_contracts_match: DataFrame = get_contract_match(not_full_match)

    # Write the results to a new Excel file
    with pd.ExcelWriter(f"{config_dict['write_dir']}/Reconciled Holds [{dt.now().strftime('%m-%d-%Y')}].xlsx", mode='w') as writer:
        full_match.to_excel(writer, sheet_name="FULL", index=False)
        no_match.to_excel(writer, sheet_name="No Match", index=False)
        only_contracts_match.to_excel(writer, sheet_name="Amount Mismatch", index=False)
        overdue.to_excel(writer, sheet_name="Overdue", index=False)

    return 0


if __name__ == "__main__":
    print("Starting")
    main()
    print("Completed")
@@ -1,6 +0,0 @@
from typing import TypeVar, Literal
from enum import Enum


class ReportSource(Enum):
    OB = "OB"
    GP = "GP"
@@ -1,198 +0,0 @@
from tomllib import load as t_load
from json import load as j_load
from pathlib import Path
from dataclasses import dataclass
from typing import TypedDict
from re import Pattern, compile

from src import ReportSource


Regex = str | Pattern


class ReportConfigError(Exception):
    """
    Exception stemming from a report configuration
    """
    pass


class SharedColumn(TypedDict, total=True):
    """
    Excel/DataFrame column that is shared between both GP & OB
    """
    standard: str
    gp: str
    ob: str


class PathsConfig:
    """
    Configuration holding the paths to:
    - input_directory: where to search for new report files
    - gp_glob/ob_glob: glob patterns used to find new GP & OB files in the report location
    - db_path: path to an SQLite database, if any
    """

    def __init__(self, in_dir: str, out_dir: str,
                 input_regex_dict: dict[str, str], db_path: str | None = None) -> None:

        self.input_directory: Path = Path(in_dir)
        self.output_directory: Path = Path(out_dir)

        self.gp_glob: str = r"*.xlsx"
        self.ob_glob: str = r"*.xlsx"

        # Always set db_path so has_database() can safely inspect it
        self.db_path: Path | None = Path(db_path) if db_path is not None else None

        try:
            self.gp_glob: str = input_regex_dict["GP"]
            self.ob_glob: str = input_regex_dict["OB"]
        except KeyError:
            # Defaulting to the newest of any xlsx file!
            # TODO investigate warning
            pass  # will remain as *.xlsx

    def get_most_recent(self, report_type: ReportSource = None) -> Path | None | tuple[Path | None, Path | None]:
        """
        Gets the most recent hold reports for OnBase and Great Plains.
        If no report type is specified, both OnBase & Great Plains are returned.

        If no matching reports are found, None will be returned
        """

        report_files = []
        report_types = [ReportSource.OB, ReportSource.GP] if report_type is None else [report_type]
        rt: ReportSource
        for rt in report_types:
            match rt:
                case rt.OB:
                    file_glob: str = self.ob_glob
                case rt.GP:
                    file_glob: str = self.gp_glob
                case _:
                    raise NotImplementedError(
                        f"No glob pattern for report type: {rt}"
                    )

            files = self.input_directory.glob(file_glob)

            # Find the most recently created file
            most_recent_file = None
            most_recent_creation_time = None

            file: Path
            for file in files:
                creation_time = file.stat().st_ctime
                if most_recent_creation_time is None or creation_time > most_recent_creation_time:
                    most_recent_file = file
                    most_recent_creation_time = creation_time
            report_files.append(most_recent_file)

        if len(report_files) > 1:
            return report_files

        return report_files[0]

    def has_database(self) -> tuple[bool, bool]:
        """
        Returns whether the config has an SQLite database path and
        whether that path exists
        """
        has_db: bool = isinstance(self.db_path, Path)
        exists: bool = self.db_path.exists() if has_db else False
        return has_db, exists


@dataclass
class ReportConfig:
    """
    Allows easy interaction with program configuration.
    - Paths to files, db
    - Report/Excel column naming
    - Regexes
    """

    # Paths to work with
    # - input/output
    # - input discovery globs
    # - SQLite database path
    paths: PathsConfig

    use_mssql: bool

    # Work columns are included in finished columns
    work_columns: list[str]
    finished_columns: list[str]

    filters: dict[str, list[Pattern] | Pattern]

    # Columns featured in both reports
    # unified col name -> origin report -> origin col name
    # e.g. contract_number -> GP -> Transaction Description
    shared_columns: list[SharedColumn]

    @staticmethod
    def from_file(config_path: str | Path) -> 'ReportConfig':

        config_path = Path(config_path) if isinstance(config_path, str) else config_path

        with open(config_path, "rb") as config_file:
            match config_path.suffix:
                case ".toml":
                    c_dict: dict = t_load(config_file)
                case ".json":
                    c_dict: dict = j_load(config_file)
                case _:
                    raise NotImplementedError(f"Only json and toml configs are supported, not: {config_path.suffix}")

        try:

            path_config: PathsConfig = PathsConfig(
                in_dir=c_dict["input_directory"],
                out_dir=c_dict["output_directory"],
                input_regex_dict=c_dict["input_glob_pattern"],
                db_path=c_dict["database_path"]
            )

            use_mssql = False  # TODO not yet implemented

            work_columns = c_dict["work_columns"]
            finished_column = c_dict["finished_column"]

            # Create the filter dict with compiled regexes
            filters_dict: dict = c_dict["filters"]
            filters: dict[str, list[Pattern] | Pattern] = {}
            k: str
            v: Regex | list[Regex]
            for k, v in filters_dict.items():

                if not isinstance(v, Regex) and not isinstance(v, list):
                    raise ReportConfigError(f"Filter items must be a valid regex pattern or a list of valid patterns! "
                                            f"{v} ({type(v)}) is not valid!")

                # Convert the strings to regex patterns
                if isinstance(v, list):
                    filters[k] = [
                        r if isinstance(r, Pattern)
                        else compile(r)
                        for r in v
                    ]
                else:
                    # Compile plain strings; anything already compiled passes through unchanged
                    filters[k] = v if isinstance(v, Pattern) else compile(v)

            shared_columns: list[SharedColumn] = c_dict["shared_columns"]

        except KeyError as ke:
            raise ReportConfigError(f"Invalid report config!\n{ke}")

        return ReportConfig(
            paths=path_config,
            use_mssql=use_mssql,
            work_columns=work_columns,
            finished_columns=finished_column,
            filters=filters,
            shared_columns=shared_columns,
        )
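A minimal sketch of loading this configuration, mirroring the call in main.py further down:

from src.config import ReportConfig

config = ReportConfig.from_file("src/configs/reports_config.toml")
ob_path, gp_path = config.paths.get_most_recent()  # newest OB and GP workbooks, in that order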
@@ -1,22 +0,0 @@
version = 1

disable_existing_loggers = false

[formatters.custom]
format = "'%(asctime)s - %(module)s - %(levelname)s - %(message)s'"

[handlers.console]
class = "logging.StreamHandler"
level = "DEBUG"
formatter = "custom"
stream = "ext://sys.stdout"

[handlers.file]
class = "logging.FileHandler"
level = "DEBUG"
formatter = "custom"
filename = "on_hold.log"

[root]
level = "ERROR"
handlers = ["console", "file"]
@@ -1,33 +0,0 @@
{
    "input_directory": "/path/to/input/folder",
    "input_glob_pattern": {
        "GP": "*GP*.xlsx",
        "OB": "*OB*.xlsx"
    },
    "output_directory": "/path/to/output",
    "interactive_inputs": false,
    "use_mssql": false,
    "database_path": "./onhold.db",
    "work_columns": [
        "Col_A",
        "Col_B"
    ],
    "finished_column": [
        "Notes",
        "Contract Number"
    ],
    "filters": {
        "filter_name": [
            "\\d{7}",
            "\\w+"
        ],
        "other_filter": "(OB|GP)$"
    },
    "shared_columns": [
        {
            "standardized_name": "contract_number",
            "GP": "Transaction Description",
            "OB": "ContractNumber"
        }
    ]
}
@@ -1,72 +0,0 @@
#### Paths: using '' makes the string 'raw' to avoid escape characters

# Path to the directory to search for input report files
input_directory = 'Work/Reports'
# Glob patterns used to discover the newest files
input_glob_pattern = { GP = "*GP*.xlsx", OB = '*OB*.xlsx' }
# Path to the directory to save the reconciliation work report
output_directory = 'Work/Output'
# Fall back to interactive input?
interactive_inputs = false # NOT YET IMPLEMENTED


#### DB

# Whether to try using an MSSQL database
# NOT YET IMPLEMENTED!
use_mssql = false
# Path to the SQLite database used to view/save reconciliations
database_path = 'src/onhold_reconciliation.db'


### Finished rec details

# Columns to add to all 'work' sheets,
# also saved to the reconciliations database
work_columns = [
    "HideNextMonth", # Boolean column for the user to indicate if this contract should be ignored next month
    "Resolution"     # Text field describing the discrepancy and how it may be resolved
]
# Columns to keep on reconciliation 'work' sheets
finished_column = [
    "contract_number",
    "vendor_name",
    "AppNum",          # OB only
    "Document Number", # GP only
    "DateBooked",      # OB only
    "Document Date",   # GP only
    # 'Source' added for 'no match'
]

# Any regex filters that might be needed
[filters]
# Use a label to distinguish a regex set
doc_num_filters = [
    "p(oin)?ts",
    "pool",
    "promo",
    "o(ver)?f(und)?",
    "m(ar)?ke?t",
    "title",
    "adj",
    "reg fee",
    "rent",
    "cma"
]
po_filter = ['(?i)^(?!.*cma(\s|\d)).*$']

# Columns that are featured & expected on both OB & GP
[[shared_columns]]
standardized_name = "contract_number" # The name you'd like to use to standardize them
GP = "Transaction Description"        # Column name used in GP
OB = "Contract"                       # Column name used in OB

[[shared_columns]]
standardized_name = "onhold_amount"
GP = "Current Trx Amount"
OB = "CurrentOnHold"

[[shared_columns]]
standardized_name = "vendor_name"
GP = "Vendor Name"
OB = "DealerName"
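Each [[shared_columns]] entry drives the rename step in HoldReport._normalize (src/reports.py below). For an OB report, the table above reduces to a mapping like this (sketch; shared_columns stands for the parsed list of tables):

# Sketch: shared_columns = the parsed [[shared_columns]] list from this file
rename_map = {sc["OB"]: sc["standardized_name"] for sc in shared_columns}
# -> {"Contract": "contract_number", "CurrentOnHold": "onhold_amount", "DealerName": "vendor_name"}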
@@ -1,40 +0,0 @@
#### Paths: using '' makes the string 'raw' to avoid escape characters

# Path to the directory to search for input report files
input_directory = '/path/to/input/folder'
# Glob patterns used to discover the newest files
input_glob_pattern = { GP = "*GP*.xlsx", OB = '*OB*.xlsx' }
# Path to the directory to save the reconciliation work report
output_directory = '/path/to/output'
# Fall back to interactive input?
interactive_inputs = false # NOT YET IMPLEMENTED


#### DB

# Whether to try using an MSSQL database
# NOT YET IMPLEMENTED!
use_mssql = false
# Path to the SQLite database used to view/save reconciliations
database_path = './onhold.db'


### Finished rec details

# Columns to add to all 'work' sheets,
# also saved to the reconciliations database
work_columns = ["Col_A", "Col_B"]
# Columns to keep on reconciliation 'work' sheets
finished_column = ["Notes", "Contract Number"]

# Any regex filters that might be needed
[filters]
# Use a label to distinguish a regex set
filter_name = ['\d{7}', '\w+']
other_filter = '(OB|GP)$'

# Columns that are featured & expected on both OB & GP
[[shared_columns]]
standardized_name = "contract_number" # The name you'd like to use to standardize them
GP = "Transaction Description"        # Column name used in GP
OB = "ContractNumber"                 # Column name used in OB
@@ -1,63 +0,0 @@
"""
Hold Reconciler is an application meant to help reconcile the differences in payments
that are marked as on hold in Great Plains and OnBase.

It takes a report CSV from OnBase and a report from Great Plains and checks them
against each other. It attempts to match them based on contract number and payment
amount, or just the contract number.

It also does a lot of filtering on the Great Plains report to remove irrelevant data.

*Last Updated: version 1.3*
*Originally developed in Spring of 2023 by Griffiths Lott (g@glott.me)*
"""
import re
from re import Pattern
import os
from os.path import basename
import glob
import logging
from pathlib import Path
from tomllib import load
from pandas import DataFrame, Series
from typing import TypeVar, Literal


import logging.config
from logging import getLogger

logger = getLogger(__name__)

CN_REGEX = re.compile(r"\d{7}(-\d{3})?")


def setup_logging():
    """
    Sets up the logging configuration from the TOML file. If the logging configuration fails to load
    from the file, a default configuration is used instead.

    Returns:
        logging.Logger: The logger instance.
    """
    with open("src/configs/config_logger.toml", "rb") as f:
        config_dict: dict = load(f)
    logger = logging.getLogger()
    try:
        # Try to load the logging configuration from the TOML file
        logging.config.dictConfig(config_dict)
    except Exception as e:
        # If the logging configuration fails, use a default configuration and log the error
        logger.setLevel(logging.DEBUG)
        logger.warning("Failed setting up logger!")
        logger.exception(e)
        logger.warning(f"Config:\n{config_dict}")
    return logger


def drop_unnamed(df: DataFrame, inplace: bool = True) -> DataFrame | None:
    """
    Drops all Unnamed columns from a dataframe.
    ### CAUTION: This function acts *inplace* by default
    (on the original dataframe, not a copy!)
    """
    cols = [c for c in df.columns if "Unnamed" in c]
    return df.drop(cols, axis=1, inplace=inplace)
@@ -1,86 +0,0 @@
"""
This is the main entry point for this application. It finds the newest reports (GP & OB),
then utilizes the reconcile module to find the differences between them. The output is
saved as an Excel file with today's date.
"""
# Custom modules for reconciliation
from src.helpers import setup_logging
from src.reports import OnBaseReport, GreatPlainsReport, ReconciledReports
from src.config import ReportConfig
from src import ReportSource

import pandas as pd
from pandas import DataFrame, read_excel, ExcelFile
import re
from re import Pattern
import logging
from tomllib import load
import logging.config
from datetime import datetime as dt
from pathlib import Path

setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Logger started with level: {logger.level}")


def pull_report_sheet(report_path: Path, report_source: ReportSource, report_config: ReportConfig) -> DataFrame | None:

    xl_file = ExcelFile(report_path)
    # Get the columns required for a valid report of the given report type
    req_cols = [col[report_source.value] for col in report_config.shared_columns]

    logger.debug(f"{report_source.value} required cols: {req_cols}")
    # Sheets available in the Excel file
    sheets = xl_file.sheet_names
    # Dictionary of dataframes keyed by their sheet name
    sheet_dataframes: dict[str, DataFrame] = read_excel(xl_file, sheet_name=sheets)
    # Check each dataframe for the required columns
    for sheet in sheet_dataframes:
        sheet_columns: list[str] = list(sheet_dataframes[sheet].columns)
        logger.debug(f"{report_source.value} ({sheet}) : {sheet_columns}")
        logger.debug(f"Matches {[r in sheet_columns for r in req_cols]}")
        if all([r in sheet_columns for r in req_cols]):
            logger.debug(f"FOUND: {sheet}")
            return sheet_dataframes[sheet]
    return None


def main() -> int:
    """
    This is the main function for the script. It reads configuration options from a TOML file, reads
    in the GP and OB Excel files, performs data reconciliation and analysis, and writes the results to
    a new Excel file.

    Returns:
        int: 0 if the script executes successfully.
    """
    # Read the configuration options
    report_config: ReportConfig = ReportConfig.from_file(Path("src/configs/reports_config.toml"))

    # Get the GP and OB dataframes from the Excel files
    ob_report, gp_report = report_config.paths.get_most_recent()
    print(ob_report)
    print(gp_report)
    ob_df: DataFrame = pull_report_sheet(ob_report, ReportSource.OB, report_config)
    gp_df: DataFrame = pull_report_sheet(gp_report, ReportSource.GP, report_config)
    assert not ob_df.empty, "OB Data empty!"
    assert not gp_df.empty, "GP Data empty!"

    obr: OnBaseReport = OnBaseReport(ob_df, report_config)
    gpr: GreatPlainsReport = GreatPlainsReport(gp_df, report_config)

    rec_output: ReconciledReports = obr.reconcile(gpr)

    output_name: Path = Path(f"Reconciled Holds [{dt.now().strftime('%m-%d-%Y')}].xlsx")
    output_base: Path = report_config.paths.output_directory
    output_path: Path = Path(output_base, output_name)

    rec_output.save_reports(output_path)

    return 0


if __name__ == "__main__":
    print("Starting")
    main()
    print("Completed")
@@ -1,155 +0,0 @@
"""
Classes and functions to parse completed reconciliation reports and remember
the resolutions of contracts.

Also provides a way for the reconciler to check holds against previously
resolved holds.

*Last Updated: version 1.3*
"""
from src.helpers import drop_unnamed, setup_logging
from src.config import ReportConfig, ReportSource
from src.ghlib.database.database_manager import SQLiteManager, select_fields_statement

from pathlib import Path
from pandas import DataFrame, Series, read_sql_query, read_excel, concat
from numpy import NaN
from logging import getLogger
from dataclasses import dataclass
from hashlib import md5
from typing import TypeAlias

setup_logging()
logger = getLogger(__name__)

col_hash: TypeAlias = str


def hash_cols(row: Series, cols_to_hash: list[str]) -> col_hash:
    md5_hash = md5()
    md5_hash.update((''.join(str(row[col]) for col in cols_to_hash)).encode('utf-8'))
    return md5_hash.hexdigest()


def create_identifier(df: DataFrame) -> DataFrame:
    """
    We want a unique and replicable ID to identify each payment pair.
    Some transactions may have one blank ID, which would make the hash
    non-deterministic, so empty IDs are replaced with 'x' to give them a
    replicable value.

    The two IDs are then hashed together using md5, resulting in a unique
    32-character identifier that can be reproduced.
    """
    for id in ["ID_OB", "ID_GP"]:
        df[id].fillna("x", inplace=True)
    # Note: "Indentifier" (sic) matches the column name used in the Resolutions table below
    df["Indentifier"] = df.apply(lambda row:
        hash_cols(row, ["ID_OB", "ID_GP"]), axis=1
    )
    for id in ["ID_OB", "ID_GP"]:
        df[id].replace('x', NaN, inplace=True)
    return df

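A toy row showing what create_identifier produces (values hypothetical):

# Illustration only: one pair with a missing GP ID
df = DataFrame({"ID_OB": ["7000001_10.0"], "ID_GP": [NaN]})
df = create_identifier(df)
# df["Indentifier"][0] == md5(b"7000001_10.0x").hexdigest(), and ID_GP is NaN again afterwards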
def save_rec(resolved_dataframes: list[DataFrame], report_config: ReportConfig):
    """
    Save the resolutions recorded on the reconciled dataframes to the Resolutions table.
    """
    sqlManager: SQLiteManager = SQLiteManager(report_config.paths.db_path)
    with sqlManager.get_session() as session:

        rdf: DataFrame
        for rdf in resolved_dataframes:
            cols: list[str] = rdf.columns.to_list()
            logger.debug(f"{cols=}")
            if "onhold_amount" in cols:
                logger.debug("Found 'onhold_amount' in rdf: no_match dataframe")
                # Split the on_hold col to normalize with amount mismatch
                rdf["onhold_amount_GP"] = rdf.apply(lambda row:
                    row["onhold_amount"] if row["Source"] == "GP" else None
                    , axis=1)
                rdf["onhold_amount_OB"] = rdf.apply(lambda row:
                    row["onhold_amount"] if row["Source"] == "OB" else None
                    , axis=1)
            else:
                logger.debug("No 'onhold_amount' col found in rdf: amount_mismatch dataframe")

            # Create a unified column to use as the index
            rdf = create_identifier(rdf)

            rec_cols: list[str] = [
                "Indentifier",
                "ID_GP",
                "ID_OB",
            ]
            rec_cols.extend(report_config.work_columns)

            rdf = rdf[rec_cols]
            rdf.set_index("Indentifier", inplace=True, drop=True)
            rdf.drop_duplicates(inplace=True)
            rdf = rdf.dropna(axis=0, how="all", subset=report_config.work_columns)
            logger.debug(f"Saving resolutions to db:\n{rdf}")

            rdf.to_sql('Resolutions',
                       con=session.connection(),
                       if_exists="append"
                       )


def get_prev_reconciled(identifiers: list[col_hash], db_location: Path) -> DataFrame | None:
    """
    Get a DataFrame of previously reconciled contracts from an SQLite database.

    Args:
        identifiers (list[col_hash]): A list of identifier hashes to check for previously reconciled contracts.
        db_location (Path): The path to the SQLite database.

    Returns:
        DataFrame: A DataFrame of previously reconciled contracts, or an empty DataFrame if none are found.
    """
    # Create a DB manager
    sqlManager: SQLiteManager = SQLiteManager(db_location)

    # Create a temp table to hold this batch's identifiers;
    # the table is cleared when sqlManager goes out of scope
    temp_table_statement = """
        CREATE TEMPORARY TABLE CUR_IDENT (Indentifier VARCHAR(32));
    """
    sqlManager.execute(temp_table_statement)

    # Insert the current identifiers into the temp table
    insert_idents = f"""
        INSERT INTO CUR_IDENT (Indentifier) VALUES
        {', '.join([f"('{cn}')" for cn in identifiers])};
    """

    logger.debug(f"{insert_idents=}")

    sqlManager.execute(insert_idents)

    # Select previously resolved contracts
    res_query = """
        SELECT r.*
        FROM Resolutions r
        JOIN CUR_IDENT i
            ON r.Indentifier = i.Indentifier;
    """
    resolved: DataFrame = sqlManager.execute(res_query, as_dataframe=True)
    return resolved


if __name__ == "__main__":
    import argparse
    from logging import DEBUG
    logger.setLevel(DEBUG)

    parser = argparse.ArgumentParser(
        prog="HoldReconcilerRecord",
    )
    parser.add_argument("-i", "--input")
    args = parser.parse_args()

    # No Match
    no_match: DataFrame = read_excel(args.input, sheet_name="No Match")
    # Amount Mismatch
    amt_mm: DataFrame = read_excel(args.input, sheet_name="Amount Mismatch")

    report_config = ReportConfig.from_file(Path(r"configs\reports_config.toml"))

    save_rec(resolved_dataframes=[no_match, amt_mm], report_config=report_config)
@@ -1,346 +0,0 @@
from pandas import DataFrame, merge, to_datetime, NaT, concat, ExcelWriter
from openpyxl import Workbook, load_workbook
from abc import ABC
from logging import getLogger
import re
from re import Pattern
import datetime
from copy import deepcopy
from dataclasses import dataclass
from pathlib import Path

from src.helpers import CN_REGEX, drop_unnamed
from src.memory import get_prev_reconciled, hash_cols, col_hash, create_identifier
from src.config import ReportConfig, ReportSource

logger = getLogger(__name__)


@dataclass
class ReconciledReports:
    no_match: DataFrame
    amt_mismatch: DataFrame
    prev_rec: DataFrame
    gp_filtered: DataFrame
    ob_overdue: DataFrame

    def save_reports(self, output_path: Path):
        with ExcelWriter(output_path, mode='w') as writer:
            self.no_match.drop_duplicates(inplace=True)
            self.no_match.to_excel(writer, sheet_name="No Match",
                                   index=False, freeze_panes=(1, 3)
                                   )
            self.amt_mismatch.drop_duplicates(inplace=True)
            self.amt_mismatch.to_excel(writer, sheet_name="Amount Mismatch",
                                       index=False, freeze_panes=(1, 3)
                                       )
            self.ob_overdue.to_excel(writer, sheet_name="Overdue",
                                     index=False
                                     )
            self.prev_rec.to_excel(writer, sheet_name="Previously Reconciled",
                                   index=False, freeze_panes=(1, 3)
                                   )
            self.gp_filtered.to_excel(writer, sheet_name="Filtered from GP",
                                      index=False, freeze_panes=(1, 0)
                                      )

        # Hide the ID columns and the reference sheets in the saved workbook
        wb: Workbook = load_workbook(output_path)
        for sheet in ["No Match", "Amount Mismatch"]:
            ws = wb[sheet]
            ws.column_dimensions['A'].hidden = True
            ws.column_dimensions['B'].hidden = True
        for sheet in ["Filtered from GP", "Previously Reconciled"]:
            wb[sheet].sheet_state = "hidden"
        wb.save(output_path)
        wb.close()


class HoldReport(ABC):

    source = ""

    def __init__(self, dataframe: DataFrame, reports_config: ReportConfig) -> None:
        self.config = reports_config
        drop_unnamed(dataframe)
        self.df = dataframe
        self.df = self._add_work_columns(self.df, reports_config.work_columns)
        self._normalize()

    def _normalize(self):

        # Rename the columns to standardize the column names
        self.df.rename(columns={sc_dict[self.source]: sc_dict["standardized_name"]
                                for sc_dict in self.config.shared_columns
                                }, inplace=True)

        # Convert the on-hold amount column to float format and round to two decimal places
        self.df["onhold_amount"] = self.df["onhold_amount"].astype(float).round(2)

        # Use regex to extract the contract number from the column values, standardizing the format
        self.df["contract_number"] = self.df["contract_number"].apply(
            lambda cn: str(cn) if not re.search(CN_REGEX, str(cn))
            else re.search(CN_REGEX, str(cn)).group(0)
        )

        # Create a new column with a unique transaction ID
        self.df["ID"] = self.df["contract_number"] + '_' + \
            self.df["onhold_amount"].astype(str)

        # Create a new column with the data source
        self.df["Source"] = self.source

    @staticmethod
    def _remove_prev_recs(contract_match, no_match, db_location: Path) -> \
            tuple[DataFrame, DataFrame, DataFrame]:
        """
        Drop rows that were resolved in a previous reconciliation, returning the
        previous resolutions along with the reduced dataframes.
        """

        idents: list[col_hash] = create_identifier(contract_match)["Indentifier"].to_list()
        idents.extend(create_identifier(no_match)["Indentifier"].to_list())
        logger.debug(f"{idents=}")
        # Get the previously reconciled rows
        prev_recs: DataFrame | None = get_prev_reconciled(idents, db_location)

        if prev_recs is None:
            logger.info("No previously reconciled!")
            return DataFrame(), contract_match, no_match

        dfs = []
        for df in [contract_match, no_match]:
            start_size = df.shape[0]
            logger.debug(f"Report DF: \n{df}")
            logger.debug(f"prev_rec: \n{prev_recs}")

            df = merge(
                df,
                prev_recs,
                how="left",
                on="Indentifier",
                suffixes=("_cur", "_prev")
            )

            df = HoldReport._created_combined_col("HideNextMonth", df, ("prev", "cur"))
            df = HoldReport._created_combined_col("Resolution", df, ("prev", "cur"))
            df["ID_OB"] = df["ID_OB_cur"]
            df["ID_GP"] = df["ID_GP_cur"]

            # Drop anything that should be ignored
            df = df[df["HideNextMonth"] != True]
            logger.info(f"Prev res added:\n{df}")

            col_to_drop = []
            for c in df.keys().to_list():
                if "_prev" in c or "_cur" in c:
                    col_to_drop.append(c)

            logger.debug(f"{col_to_drop=}")
            df.drop(
                columns=col_to_drop,
                inplace=True
            )
            # Restandardize
            end_size = df.shape[0]
            logger.info(f"Reduced df by {start_size - end_size}")
            dfs.append(df)
        return prev_recs, dfs[0], dfs[1]

    def _remove_full_matches(self, other: 'HoldReport'):
        """
        Removes any contracts that match on both contract number and hold amount.
        These do not need to be reconciled.

        This is done 'in place' on both dataframes
        """
        filter_id_match: DataFrame = self.df[~(self.df["ID"].isin(other.df["ID"]))]
        other.df: DataFrame = other.df[~(other.df["ID"].isin(self.df["ID"]))]
        self.df = filter_id_match
        self.combined_missing: DataFrame = concat([self.df, other.df], ignore_index=True)
        # self.combined_missing.to_excel("ALL MISSING.xlsx")
        logger.debug(f"Combined Missing:\n{self.combined_missing}")
        logger.info(f"Payments with errors: {self.combined_missing.shape[0]}")

    @staticmethod
    def _created_combined_col(column: str, target_df: DataFrame, sources: tuple[str, str]) -> DataFrame:
        """
        Creates a combined column by filling empty values in the first source's
        column with the matching column from the other source
        """
        this, that = sources
        target_df[column] = target_df[f"{column}_{this}"].fillna(
            target_df[f"{column}_{that}"]
        )
        return target_df

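A tiny illustration of the fill-combine above (values hypothetical):

# Illustration only: the first source takes precedence, the second fills the gaps
df = DataFrame({"Resolution_prev": [None, "Fixed"], "Resolution_cur": ["WIP", "Ignore"]})
HoldReport._created_combined_col("Resolution", df, ("prev", "cur"))
# df["Resolution"] -> ["WIP", "Fixed"]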
def _requires_rec(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]: |
|
||||||
""" |
|
||||||
To be run after full matches have been re |
|
||||||
""" |
|
||||||
|
|
||||||
# Merge the two filtered DataFrames on the contract number |
|
||||||
contract_match = merge( |
|
||||||
self.df, other.df, |
|
||||||
how="inner", |
|
||||||
on=["contract_number"], |
|
||||||
suffixes=('_'+self.source, '_'+other.source) |
|
||||||
) |
|
        contract_match = create_identifier(contract_match)

        #contract_match.to_excel("CONTRACT_MATCH.xlsx")

        for col in ["vendor_name", "HideNextMonth", "Resolution"]:
            self._created_combined_col(col, contract_match, (self.source, other.source))

        logger.debug(f"_requires_rec | contract_match:\n{contract_match.columns} ({contract_match.shape})")

        no_match: DataFrame = self.combined_missing[~(
            self.combined_missing["contract_number"].isin(
                contract_match["contract_number"]
            ))
        ]
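        # Anything in combined_missing whose contract number never matched the
        # other report belongs on the 'No Match' sheet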
|
        no_match[f"ID_{self.source}"] = no_match.apply(
            lambda row: row["ID"] if row["Source"] == self.source else None,
            axis=1
        )
        no_match[f"ID_{other.source}"] = no_match.apply(
            lambda row: row["ID"] if row["Source"] == other.source else None,
            axis=1
        )
|
        no_match = create_identifier(no_match)

        logger.debug(f"_requires_rec | no_match:\n{no_match.columns} ({no_match.shape})")
        self.prev_recs, contract_match, no_match = self._remove_prev_recs(
            contract_match, no_match, self.config.paths.db_path
        )

        return contract_match, no_match

    @staticmethod
    def _add_work_columns(df: DataFrame, work_cols: list) -> DataFrame:
        """
        Add empty columns to the dataframe to facilitate working through the report.
        """
|
        logger.debug("Adding work columns!")
        df_cols: list[str] = df.columns.to_list()
        for col in work_cols:
            if col not in df_cols:
                df[col] = ''
        return df
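    # e.g. _add_work_columns(df, ["HideNextMonth", "Resolution"]) appends the
    # two blank work columns from the config only when they are not already present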
|
    def reconcile(self, other: 'HoldReport') -> ReconciledReports:
        """
        Reconcile this report against the report from the other source and
        bundle the results into a ReconciledReports container.
        """
        assert self.source != other.source, (
            "Reports to reconcile must be from different sources. "
            f"({self.source}, {other.source})."
        )
|
        self._remove_full_matches(other)

        if self.source == "OB":
            over_due: DataFrame = self.overdue
            filtered_gp: DataFrame = other.filtered
        elif self.source == "GP":
            over_due: DataFrame = other.overdue
            filtered_gp: DataFrame = self.filtered

        logger.debug(f"Removed matches:\n{self.df}")
|
        amount_mismatch, no_match = self._requires_rec(other)

        logger.debug(f"reconcile | no_match unaltered\n{no_match.columns} ({no_match.shape})")
        logger.debug(f"reconcile | am_mm unaltered:\n{amount_mismatch.columns} ({amount_mismatch.shape})")

        # Formatting
        columns: list[str] = ["ID_GP", "ID_OB"]
        columns.extend(self.config.finished_columns)

        nm_cols: list[str] = deepcopy(columns)
        nm_cols.insert(3, "onhold_amount")
        nm_cols.insert(4, "Source")
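        # The no-match sheet carries a single onhold_amount plus a Source
        # column; matched rows instead get one amount column per source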
|
        columns.insert(3, "onhold_amount_GP")
        columns.insert(4, "onhold_amount_OB")

        # Select and reorder columns
        no_match = no_match[nm_cols]

        amount_mismatch = amount_mismatch[columns]
|
        logger.info(f"no_match: {no_match.shape[0]}")
        logger.info(f"am_mm: {amount_mismatch.shape[0]}")

        reconciled: ReconciledReports = ReconciledReports(
            no_match=no_match,
            amt_mismatch=amount_mismatch,
            prev_rec=self.prev_recs,
            gp_filtered=filtered_gp,
            ob_overdue=over_due
        )
        return reconciled


class OnBaseReport(HoldReport):

    source = "OB"

    def __init__(self, dataframe: DataFrame, reports_config: ReportConfig) -> None:
        self.overdue = self._get_overdue(dataframe)
        super().__init__(dataframe, reports_config)

    @staticmethod
    def _get_overdue(dataframe: DataFrame) -> DataFrame:
        """
        Return only the rows whose InstallDate falls before today.
        """
        dataframe["InstallDate"] = to_datetime(dataframe["InstallDate"])
        dataframe["InstallDate"].fillna(NaT, inplace=True)
        overdue: DataFrame = dataframe[
            dataframe["InstallDate"].dt.date < datetime.date.today()
        ]
        return overdue


class GreatPlainsReport(HoldReport):

    source = "GP"

    def __init__(self, dataframe: DataFrame, report_config: ReportConfig) -> None:
        self.filtered: DataFrame = self._filter(
            gp_report_df=dataframe,
            doc_num_filters=report_config.filters["doc_num_filters"],
            good_po_num_regex=report_config.filters["po_filter"][0]
        )
        super().__init__(dataframe, report_config)

    @staticmethod
    def _filter(gp_report_df: DataFrame,
                doc_num_filters: list[Pattern], good_po_num_regex: Pattern
                ) -> DataFrame:
        """
        Drop rows that are not reconcilable invoices from `gp_report_df`
        (in place) and return the rows that were removed.
        """
        GOOD_PO_NUM = good_po_num_regex
|
        # The doc-number filters arrive pre-compiled, so join their raw
        # text (.pattern) into a single case-insensitive alternation
        bad_doc_num = ''
        rx: Pattern
        for rx in doc_num_filters:
            bad_doc_num += f"({rx.pattern})|"
        bad_doc_num = re.compile(bad_doc_num[:-1], re.IGNORECASE)
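        # With the filters from the test config this produces something like
        # (p(oin)?ts)|(pool)|(promo)|...|(cma), compiled case-insensitively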
|
        # Create a mask that keeps only rows meeting both requirements:
        # the row is an Invoice and its PO number matches the 'good' pattern
        keep_mask = (
            (gp_report_df["Document Type"] == "Invoice") &
            (gp_report_df["Purchase Order Number"].str.contains(GOOD_PO_NUM))
        )
|
        # Get the rows that DO NOT fit the keep_mask
        dropped_positives: DataFrame = gp_report_df[~keep_mask]
        # Drop the rows to filter
        gp_report_df.drop(dropped_positives.index, inplace=True)

        # Create a filter to remove rows that meet this requirement;
        # making this a negative in the keep_mask is more trouble than
        # it's worth
        remove_mask = gp_report_df["Document Number"].str.contains(bad_doc_num)
        dropped_negatives: DataFrame = gp_report_df[remove_mask]
        gp_report_df.drop(dropped_negatives.index, inplace=True)

        return concat([dropped_positives, dropped_negatives], ignore_index=False)

@ -1,72 +0,0 @@
import unittest
from pathlib import Path
from re import Pattern, compile
from src import config
from src import ReportSource


class TestReportConfig(unittest.TestCase):

    def test_from_file(self):
        # Provide the path to your config file
        config_file = Path(r"tests\test_inputs\TEST_reports_config.toml")

        # Call the static method from_file to create an instance of ReportConfig
        report_config = config.ReportConfig.from_file(config_file)

        # Assert the values of the attributes in the created instance
        self.assertEqual(report_config.paths.input_directory, Path(r"tests\test_inputs\TestSearch"))
        self.assertEqual(report_config.paths.gp_glob, r"*GP*.xlsx")
        self.assertEqual(report_config.paths.ob_glob, r"*OB*.xlsx")
        self.assertEqual(report_config.paths.output_directory, Path(r"tests\test_outputs"))
        self.assertEqual(report_config.use_mssql, False)
        self.assertEqual(report_config.paths.db_path, Path(r"tests\test_inputs\Static\test_static_OnHold.db"))
        self.assertEqual(report_config.work_columns, ["HideNextMonth", "Resolution"])
        self.assertEqual(report_config.finished_columns, [
            "contract_number",
            "vendor_name",
            "AppNum",
            "Document Number",
            "DateBooked",
            "Document Date",
        ])
|
        self.assertEqual(report_config.filters["doc_num_filters"], [
            compile(r"p(oin)?ts"),
            compile(r"pool"),
            compile(r"promo"),
            compile(r"o(ver)?f(und)?"),
            compile(r"m(ar)?ke?t"),
            compile(r"title"),
            compile(r"adj"),
            compile(r"reg fee"),
            compile(r"rent"),
            compile(r"cma"),
        ])
        self.assertEqual(report_config.filters["po_filter"], [compile(r"(?i)^(?!.*cma(\s|\d)).*$")])
        self.assertEqual(report_config.shared_columns[0]["standardized_name"], "contract_number")
        self.assertEqual(report_config.shared_columns[0]["GP"], "Transaction Description")
        self.assertEqual(report_config.shared_columns[0]["OB"], "Contract")
        self.assertEqual(report_config.shared_columns[1]["standardized_name"], "onhold_amount")
        self.assertEqual(report_config.shared_columns[1]["GP"], "Current Trx Amount")
        self.assertEqual(report_config.shared_columns[1]["OB"], "CurrentOnHold")
        self.assertEqual(report_config.shared_columns[2]["standardized_name"], "vendor_name")
        self.assertEqual(report_config.shared_columns[2]["GP"], "Vendor Name")
        self.assertEqual(report_config.shared_columns[2]["OB"], "DealerName")

    def test_get_newest(self):
        # Provide the path to your config file
        config_file = Path(r"tests\test_inputs\TEST_reports_config.toml")

        # Call the static method from_file to create an instance of ReportConfig
        report_config = config.ReportConfig.from_file(config_file)

        newest_ob: Path = report_config.paths.get_most_recent(report_type=ReportSource.OB)
        self.assertEqual(newest_ob.name, "April 2023 OB.xlsx")
        newest_gp: Path = report_config.paths.get_most_recent(report_type=ReportSource.GP)
        self.assertEqual(newest_gp.name, "April GP.xlsx")

        nob, ngp = report_config.paths.get_most_recent()
        self.assertEqual(nob.name, "April 2023 OB.xlsx")
        self.assertEqual(ngp.name, "April GP.xlsx")


if __name__ == '__main__':
    unittest.main()
@ -1,72 +0,0 @@
#### Paths: using single quotes makes the string a TOML literal, avoiding escape characters

# Path to the directory to search for input report files
input_directory = 'tests\test_inputs\TestSearch'
# Glob patterns used to discover the newest files
input_glob_pattern = { GP = '*GP*.xlsx', OB = '*OB*.xlsx' }
# Path to the directory to save the reconciliation work report
output_directory = 'tests\test_outputs'
# Fall back to interactive inputs?
interactive_inputs = false # NOT YET IMPLEMENTED


#### DB

# Whether to try using a MSSQL database
# NOT YET IMPLEMENTED!
use_mssql = false
# Path to the SQLite database used to view/save reconciliations
database_path = 'tests\test_inputs\Static\test_static_OnHold.db'


### Finished rec details

# Columns to add to all 'work' sheets;
# also saved to the 'Reconciliations' database
work_columns = [
    "HideNextMonth", # Boolean column for the user to indicate this contract should be ignored next month
    "Resolution"     # Text field describing the discrepancy and how it may be resolved
]
# Columns to keep on reconciliation 'work' sheets
finished_columns = [
    "contract_number",
    "vendor_name",
    "AppNum",          # OB only
    "Document Number", # GP only
    "DateBooked",      # OB only
    "Document Date",   # GP only
    # 'Source' added for 'no match'
]

# Any regex filters that might be needed
[filters]
# Use a label to distinguish a regex set
doc_num_filters = [
    "p(oin)?ts",
    "pool",
    "promo",
    "o(ver)?f(und)?",
    "m(ar)?ke?t",
    "title",
    "adj",
    "reg fee",
    "rent",
    "cma"
]
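# Negative lookahead: keep only PO numbers that do NOT contain 'cma'
# followed by a space or a digit (case-insensitive)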
po_filter = ['(?i)^(?!.*cma(\s|\d)).*$']

# Columns that are featured & expected in both OB & GP reports
[[shared_columns]]
standardized_name = "contract_number" # The name used to standardize the column
GP = "Transaction Description"        # Column name used in GP
OB = "Contract"                       # Column name used in OB

[[shared_columns]]
standardized_name = "onhold_amount"
GP = "Current Trx Amount"
OB = "CurrentOnHold"

[[shared_columns]]
standardized_name = "vendor_name"
GP = "Vendor Name"
OB = "DealerName"
Binary file not shown.
Binary file not shown.
@ -1,78 +0,0 @@
from pandas import DataFrame, merge, to_datetime, NaT, concat, read_excel
from pathlib import Path
from re import Pattern
import pytest as pt

from src.config import ReportConfig, ReportSource
from src.reports import GreatPlainsReport, OnBaseReport, ReconciledReports
from src.hold_reconciler import pull_report_sheet


class TestReport:

    @pt.fixture(autouse=True)
    def setup(self):
        self.report_config = ReportConfig.from_file(
            Path(r"./tests/test_inputs/TEST_reports_config.toml")
        )

    def test_full(self):
        """
        Full process test.

        This test's inputs will need to be adjusted any time a change is made
        to the input/output report layouts, filtering, trimming, or normalization.

        Basically, this is just to make sure everything still works after making
        TINY changes that are not meant to affect the structure/logic of the program.
        """
        ob_df = pull_report_sheet(
            Path(r"./tests/test_inputs/Static/April 2023 OB.xlsx"),
            ReportSource.OB,
            self.report_config
        )
        gp_df = pull_report_sheet(
            Path(r"./tests/test_inputs/Static/April GP.xlsx"),
            ReportSource.GP,
            self.report_config
        )

        assert not ob_df.empty, "OB Data empty!"
        assert not gp_df.empty, "GP Data empty!"

        obr: OnBaseReport = OnBaseReport(ob_df, self.report_config)
        gpr: GreatPlainsReport = GreatPlainsReport(gp_df, self.report_config)

        rec_output: ReconciledReports = obr.reconcile(gpr)

        output_path: Path = Path(
            self.report_config.paths.output_directory,
            "TEST_REPORT.xlsx"
        )
        rec_output.save_reports(output_path)

        SHEET_NAMES = [
            "No Match",
            "Amount Mismatch",
            "Overdue",
            "Previously Reconciled",
            "Filtered from GP",
        ]

        CONTROL: dict[str, DataFrame] = read_excel(
            Path(r"./tests/test_inputs/Static/Reconciled Holds [TEST_FIN].xlsx"),
            sheet_name=SHEET_NAMES
        )

        new: dict[str, DataFrame] = read_excel(
            output_path,
            sheet_name=SHEET_NAMES
        )

        for sheet in SHEET_NAMES:
            print(sheet)
            print(new[sheet])
            print("Control: ")
            print(CONTROL[sheet])
            assert new[sheet].equals(CONTROL[sheet])
@ -1 +0,0 @@
2.1