Reworked the config file to be more flexible and added a config.py file
with a ReportConfig class to facilitate easier interaction with the report.
The actual program still expects the old config. Must implement
dev
= 3 years ago
parent 9ad5e9180c
commit f6245a3413
Signed by untrusted user who does not match committer: gprog
GPG Key ID: 5BE9BB58D37713F8
  1. Hold Reconciler.spec (8 lines changed)
  2. __init__.py (0 lines changed)
  3. config_reports.toml (31 lines changed)
  4. src/__init__.py (6 lines changed)
  5. src/config.py (181 lines changed)
  6. src/config_logger.toml (0 lines changed)
  7. src/config_reports.toml (44 lines changed)
  8. src/configs/report_config_template.json (19 lines changed)
  9. src/configs/reports_config_template.toml (40 lines changed)
  10. src/helpers.py (0 lines changed)
  11. src/hold_reconciler.py (38 lines changed)
  12. src/memory.py (78 lines changed)
  13. src/reports.py (215 lines changed)
  14. version.txt (2 lines changed)

@@ -5,11 +5,11 @@ block_cipher = None
a = Analysis(
['reconcile_holds.py'],
pathex=[],
['hold_reconciler.py'],
pathex=['\\leafnow.com\shared\Business Solutions\Griff\Code\HoldReconciler'],
binaries=[],
datas=[('config.toml', '.'), ('requirements.txt', '.')],
hiddenimports=['openpyxl'],
datas=[('.\\config_logger.toml', '.'), ('.\\config_reports.toml', '.')],
hiddenimports=['reports.*','memory.*','helpers.*'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],

@@ -1,31 +0,0 @@
output_columns = [
"contract_number",
"vendor_name",
"AppNum", # OB only
"DateBooked", # OB only
"Document Number",# GP Only
"Resolution",
"Notes"
# 'Source' added for 'no match'
]
[gp_filters]
# These regexes will be combined with ORs and used to filter
# the document number column of the GP report
doc_num_filters = [
"p(oin)?ts",
"pool",
"promo",
"o(ver)?f(und)?",
"m(ar)?ke?t",
"title",
"adj",
"reg free",
"cma"
]
po_filter = "^(?!.*cma(\\s|\\d)).*$"
[shared_columns]
contract_number = { GP = "Transaction Description", OB = "Contract"}
onhold_amount = { GP = "Current Trx Amount", OB = "CurrentOnHold" }
vendor_name = { GP = "Vendor Name", OB = "DealerName"}

@@ -0,0 +1,6 @@
from typing import TypeVar, Literal
from enum import Enum
class ReportSource(Enum):
OB = "OB"
GP = "GP"

@@ -0,0 +1,181 @@
from tomllib import load as t_load
from json import load as j_load
from pathlib import Path
from dataclasses import dataclass
from typing import TypedDict
from re import Pattern, compile
from src import ReportSource
Regex = str | Pattern
class ReportConfigError(Exception):
"""
Exception stemming from a report configuration
"""
pass
class SharedColumn(TypedDict, total=True):
"""
Excel/Dataframe column that is shared between both GP & OB
"""
standard: str
gp: str
ob: str
class PathsConfig:
"""
Configuration holding the paths to:
- input_directory: Where to search for new report files
- gp/ob_regex: glob patterns used to find new OB & GP files in the report location
- db_path: path to an SQLite database if any
"""
def __init__(self, in_dir: str, out_dir: str,
input_regex_dict: dict[str, str], db_path: str | None = None) -> None:
self.input_directory: Path = Path(in_dir)
self.output_directory: Path = Path(out_dir)
# Despite the config key's name, these are glob patterns for Path.glob,
# not regexes (re.compile("*.xlsx") would raise re.error)
self.gp_regex: str = "*.xlsx"
self.ob_regex: str = "*.xlsx"
# Always set db_path so has_database() never hits an AttributeError
self.db_path: Path | None = Path(db_path) if db_path is not None else None
try:
self.gp_regex = input_regex_dict["GP"]
self.ob_regex = input_regex_dict["OB"]
except KeyError:
# Defaulting to newest of any xlsx file!
# TODO investigate warning
pass # will remain as *.xlsx
def get_most_recent(self, report_type: ReportSource) -> Path|None:
match report_type:
case ReportSource.OB:
file_glob: str = self.ob_regex
case ReportSource.GP:
file_glob: str = self.gp_regex
case _:
raise NotImplementedError(
f"No glob pattern for report type: {report_type}"
)
# Path.glob expects a glob string, not a compiled regex
files = self.input_directory.glob(file_glob)
# Find the most recently created file
most_recent_file = None
most_recent_creation_time = None
file: Path
for file in files:
creation_time = file.stat().st_ctime
if most_recent_creation_time is None or creation_time > most_recent_creation_time:
most_recent_file = file
most_recent_creation_time = creation_time
return most_recent_file
def has_database(self) -> tuple[bool, bool]:
"""
Returns whether the config has an SQLite database path and
whether that path exists
"""
has_db: bool = isinstance(self.db_path, Path)
exists: bool = self.db_path.exists() if has_db else False
return has_db, exists
@dataclass
class ReportConfig:
# Paths to work with
# - input/output
# - input discovery regexes
# - SQLite database path
paths: PathsConfig
use_mssql: bool
db_path: Path | None
# Work columns are included in finished columns
work_columns: list[str]
finished_columns: list[str]
filters: dict[str, list[Pattern] | Pattern]
# Columns featured in both reports
# unified col name -> origin report -> origin col name
# e.g. contract_number -> GP -> Transaction Description
shared_columns: list[SharedColumn]
@staticmethod
def from_file(config_path: str|Path) -> 'ReportConfig':
config_path = Path(config_path) if isinstance(config_path, str) else config_path
match config_path.suffix:
case ".toml":
# tomllib.load requires a binary file object, not a path
with config_path.open("rb") as f:
c_dict: dict = t_load(f)
case ".json":
with config_path.open("r") as f:
c_dict: dict = j_load(f)
case _:
raise NotImplementedError(f"Only json and toml configs are supported, not: {config_path.suffix}")
try:
path_config: PathsConfig = PathsConfig(
in_dir = c_dict["input_path"],
out_dir= c_dict["output_path"],
input_regex_dict= c_dict["input_regex"],
db_path= c_dict["db_path"]
)
use_mssql = False # TODO: not yet implemented
work_columns = c_dict["work_columns"]
output_columns = c_dict["output_columns"]
# Create the output filter dict
filters_dict : dict = c_dict["filters"]
filters: dict[str, list[Pattern] | Pattern] = {}
k: str
v: Regex | list[Regex]
for k, v in filters_dict.items():
# isinstance cannot take a subscripted generic like list[Regex]
if not isinstance(v, (str, Pattern, list)):
raise ReportConfigError(f"Filter items must be a valid regex pattern or a list of valid patterns!\
{v} ({type(v)}) is not valid!")
# Convert the strings to regex patterns
if isinstance(v, list):
filters[k] = [
r if isinstance(r, Pattern)
else compile(r)
for r in v
]
else:
filters[k] = v if isinstance(v, Pattern) else compile(v)
shared_columns: list[SharedColumn] = c_dict["shared_columns"]
except KeyError as ke:
raise ReportConfigError(f"Invalid report config!\n{ke}") from ke
return ReportConfig(
paths= path_config,
use_mssql= use_mssql,
db_path= path_config.db_path,
work_columns= work_columns,
finished_columns= output_columns,
filters= filters,
shared_columns= shared_columns,
)
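A minimal usage sketch of the new class (hypothetical config path; assumes a config file whose keys match what from_file expects, since the shipped templates still use different key names):

from src import ReportSource
from src.config import ReportConfig

# Hypothetical path; from_file dispatches on the .toml/.json suffix.
cfg = ReportConfig.from_file("src/config_reports.toml")
newest_gp = cfg.paths.get_most_recent(ReportSource.GP)  # newest matching file, or None
has_db, db_exists = cfg.paths.has_database()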

@@ -0,0 +1,44 @@
output_path = '../Work'
db_path = "OnHold.db"
# Columns added to each 'working' sheet in the new report dataframe
work_columns = [
"HideNextMonth", # Boolean column for user to indicate if this contract should be ignored next month
"Resolution" # Text field describing the disprecany and how it may be resolved
]
# List of Columns to show on the 'working' sheets of the rec report
output_columns = [
"contract_number",
"vendor_name",
"AppNum", # OB only
"Document Number",# GP Only
"DateBooked",# OB only
"Document Date", #GP Only
"HideNextMonth",
"Resolution",
# 'Source' added for 'no match'
]
[filters]
# These regexes will be combined with ORs and used to filter
# the document number column of the GP report
doc_num_filters = [
"p(oin)?ts",
"pool",
"promo",
"o(ver)?f(und)?",
"m(ar)?ke?t",
"title",
"adj",
"reg fee",
"rent",
"cma"
]
po_filter = ["^(?!.*cma(\\s|\\d)).*$"]
# Columns that are common to both GP and OB
[shared_columns]
contract_number = { GP = "Transaction Description", OB = "Contract"}
onhold_amount = { GP = "Current Trx Amount", OB = "CurrentOnHold" }
vendor_name = { GP = "Vendor Name", OB = "DealerName"}
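A small sketch of how a filter list like doc_num_filters above might be OR-joined into one case-insensitive pattern (the exact join in reports.py is assumed; names are illustrative):

import re

# Combine the configured fragments with ORs, per the comment on doc_num_filters.
doc_num_filters = ["p(oin)?ts", "pool", "promo", "title"]
bad_doc_num = re.compile("|".join(doc_num_filters), re.IGNORECASE)

assert bad_doc_num.search("POINTS adjustment") is not None
assert bad_doc_num.search("regular invoice") is None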

@@ -0,0 +1,19 @@
{
"input_directory": "",
"output_directory": "",
"use_mssql": false,
"database_path": "",
"work_columns": [],
"finished_column": [],
"filters": {
"filter_name": [],
"other_filter": ""
},
"shared_columns": [
{
"standardized_name": "",
"GP": "",
"OB": ""
}
]
}

@@ -0,0 +1,40 @@
#### Paths: using '' makes the string 'raw' to avoid escape characters
# Path to the directory to search for input report files
input_directory = '/path/to/input/folder'
# Glob patterns used to discover the newest report files
input_regex = { GP = '*likeThis*.xlsx', OB = '*.csv'}
# Path to the directory to save the reconciliation work report
output_directory = '/path/to/output'
# Fallback to interactive?
interactive_inputs = false # NOT YET IMPLEMENTED
#### DB
# Whether to try using a mssql database
# NOT YET IMPLEMENTED!
use_mssql = false
# Path to the SQLite database used to view/save reconciliations
database_path = './onhold.db'
### Finished rec details
# Columns to add to all 'work' sheets
# also saved to the 'Reconciliations' database
work_columns = ["Col_A", "Col_B" ]
# Columns to keep on reconciliation 'work' sheets
finished_column = [ "Notes", "Contract Number" ]
# Any regex filters that might be needed
[filters]
# Use label to distinguish a regex set
filter_name = [ '\d{7}', '\w+']
other_filter = '(OB|GP)$'
# Columns that are featured & expected on both OB & GP
[[shared_columns]]
standardized_name = "contract_number" # The name you'd like to use to standardize them
GP = "Transactoin Description" # Column name used in GP
OB = "ContractNumber" # Column name used in GP

@@ -5,7 +5,7 @@ saved as an excel file with today's date.
"""
# Custom module for reconciliation
from helpers import setup_logging, find_most_recent_file
from reports import OnBaseReport, GreatPlainsReport
from reports import OnBaseReport, GreatPlainsReport, ReconciledReports
import pandas as pd
from pandas import DataFrame
@@ -15,19 +15,8 @@ import logging
from tomllib import load
import logging.config
from datetime import datetime as dt
from openpyxl import load_workbook, Workbook
import pathlib
from pathlib import Path
"""
[ ] Pull in past reconciliations to check against
[ ] Record reconciled transaction (connect with VBA)
[ ] Check GP against the database
[ ] Check OB against the database
[X] Add resolution column to error sheets
[ ] Add sheet for problem contracts already seen and 'resolved'
"""
setup_logging()
logger = logging.getLogger(__name__)
logger.info(f"Logger started with level: {logger.level}")
@@ -103,29 +92,14 @@ def main() -> int:
obr: OnBaseReport = OnBaseReport(ob_df, reports_config)
gpr: GreatPlainsReport = GreatPlainsReport(gp_df, reports_config)
overdue: DataFrame = obr.get_overdue()
no_match, amt_mismatch = obr.reconcile(gpr)
rec_output: ReconciledReports = obr.reconcile(gpr)
# Write the results to a new Excel file
output_name: Path = Path(f"Reconciled Holds [{dt.now().strftime('%m-%d-%Y')}].xlsx")
output_path: Path = Path("./Work", output_name)
with pd.ExcelWriter(output_path, mode='w') as writer:
no_match.to_excel(writer, sheet_name="No Match",
index=False, freeze_panes=(1,3)
)
amt_mismatch.to_excel(writer, sheet_name="Amount Mismatch",
index=False, freeze_panes=(1,3)
)
overdue.to_excel(writer, sheet_name="Overdue", index=False)
wb: Workbook = load_workbook(output_path)
for sheet in ["No Match", "Amount Mismatch"]:
ws = wb[sheet]
ws.column_dimensions['A'].hidden = True
ws.column_dimensions['B'].hidden = True
wb.save(output_path)
output_base: Path = Path(reports_config["output_path"])
output_path: Path = Path(output_base, output_name)
rec_output.save_reports(output_path)
return 0

@@ -8,65 +8,82 @@ resolved holds.
*Last Updated: version 1.3
"""
from helpers import drop_unnamed, setup_logging
from ghlib.database.database_manager import SQLiteManager
from ghlib.database.database_manager import SQLiteManager, select_fields_statement
from pandas import DataFrame, Series, read_sql_query, read_excel, concat
from numpy import NaN
from logging import getLogger
from dataclasses import dataclass
from hashlib import md5
from typing import TypeAlias
setup_logging()
logger = getLogger(__name__)
col_hash: TypeAlias = str
def hash_cols(row: Series, cols_to_hash: list[str]) -> str:
def hash_cols(row: Series, cols_to_hash: list[str]) -> col_hash:
md5_hash = md5()
md5_hash.update((''.join(row[col] for col in cols_to_hash)).encode('utf-8'))
md5_hash.update((''.join(str(row[col]) for col in cols_to_hash)).encode('utf-8'))
return md5_hash.hexdigest()
def create_identifier(df: DataFrame) -> DataFrame:
for id in ["ID_OB","ID_GP"]:
df[id].fillna("x", inplace=True)
df["Indentifier"] = df.apply(lambda row:
hash_cols(row, ["ID_OB","ID_GP"]), axis=1
)
for id in ["ID_OB","ID_GP"]:
df[id].replace('x',NaN, inplace=True)
return df
def save_rec(resolved_dataframes: list[DataFrame]):
"""
#TODO Actually handle this...
"""
#raise NotImplementedError("You were too lazy to fix this after the rewrite. FIX PLZ!")
sqlManager: SQLiteManager = SQLiteManager("OnHold.db")
with sqlManager.get_session() as session:
conn = session.connection()
rdf: DataFrame
for rdf in resolved_dataframes:
cols: list[str] = rdf.columns.to_list()
logger.debug(f"{cols=}")
if "onhold_amount" in cols:
logger.debug(f"Found 'onhold_amount' in rdf: no_match dataframe")
logger.debug("Found 'onhold_amount' in rdf: no_match dataframe")
# Split the on_hold col to normalize with amount mismatch
rdf["onhold_amount_GP"] = rdf.apply(lambda row:
row.onhold_amount if row.Source == "GP" else None
)
row["onhold_amount"] if row["Source"] == "GP" else None
, axis=1)
rdf["onhold_amount_OB"] = rdf.apply(lambda row:
row.onhold_amount if row.Source == "OB" else None
)
row["onhold_amount"] if row["Source"] == "OB" else None
, axis=1 )
else:
logger.debug(f"No 'onhold_amount' col found in rdf: amount_mismatch dataframe")
logger.debug("No 'onhold_amount' col found in rdf: amount_mismatch dataframe")
# Create a unified column for index
rdf["Indentifier"] = rdf.apply(lambda row:
hash_cols(row, ["ID_OB","ID_GP"]), axis=1
)
rdf = create_identifier(rdf)
rec_cols: list[str] = [
"Indentifier",
"ID_GP",
"ID_OB",
"Hide Next Month",
"HideNextMonth",
"Resolution"
]
rdf = rdf[rec_cols]
rdf.set_index("Indentifier", inplace=True, drop=True)
rdf.drop_duplicates(inplace=True)
rdf = rdf.dropna(axis=0, how="all", subset=["HideNextMonth", "Resolution"])
logger.debug(f"Saving resolutions to db:\n{rdf}")
rdf.to_sql('Resolutions',
con=session.connection(),
if_exists="append"
)
def get_prev_reconciled(contracts: list[str]) -> DataFrame:
def get_prev_reconciled(identifiers: list[col_hash]) -> DataFrame|None:
"""
Get a DataFrame of previously reconciled contracts from an SQLite database.
@@ -82,23 +99,26 @@ def get_prev_reconciled(contracts: list[str]) -> DataFrame:
# Create a temp table to hold this batch's identifiers
# this table will be cleared when sqlManager goes out of scope
temp_table_statement = """
CREATE TEMPORARY TABLE CUR_CONTRACTS (contract_number VARCHAR(11));
CREATE TEMPORARY TABLE CUR_IDENT (Indentifier VARCHAR(32));
"""
sqlManager.execute(temp_table_statement)
# Insert the current contracts into the temp table
insert_contracts = f"""
INSERT INTO CUR_CONTRACTS (contract_number) VALUES
{', '.join([f"('{cn}')" for cn in contracts])};
insert_idents = f"""
INSERT INTO CUR_IDENT (Indentifier) VALUES
{', '.join([f"('{cn}')" for cn in identfiers])};
"""
sqlManager.execute(insert_contracts)
logger.debug(f"{insert_idents=}")
sqlManager.execute(insert_idents)
# Select previously resolved contracts
res_query = """
SELECT r.*
FROM Resolutions r
JOIN CUR_CONTRACTS t
ON r.contract_number = t.contract_number;
JOIN CUR_IDENT i
ON r.Indentifier = i.Indentifier;
"""
resolved: DataFrame = sqlManager.execute(res_query, as_dataframe=True)
return resolved
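For intuition, a small sketch of the identifier scheme implemented by hash_cols and create_identifier above (IDs are hypothetical):

from hashlib import md5
from pandas import Series
from memory import hash_cols

# A row's identifier is the MD5 hex digest of its concatenated ID columns;
# create_identifier fills NaNs with "x" first so either side may be missing.
row = Series({"ID_OB": "OB123", "ID_GP": "GP456"})  # hypothetical IDs
expected = md5("OB123GP456".encode("utf-8")).hexdigest()
assert hash_cols(row, ["ID_OB", "ID_GP"]) == expected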

@@ -1,17 +1,54 @@
from pandas import DataFrame, merge, to_datetime, NaT, concat, Series
from numpy import concatenate
from abc import ABC, abstractmethod
from pandas import DataFrame, merge, to_datetime, NaT, concat, ExcelWriter
from openpyxl import Workbook, load_workbook
from abc import ABC
from logging import getLogger
import re
from typing import Literal
import datetime
from copy import deepcopy
from dataclasses import dataclass
from helpers import CN_REGEX, drop_unnamed
from memory import get_prev_reconciled
from memory import get_prev_reconciled, hash_cols, col_hash, create_identifier
from pathlib import Path
logger = getLogger(__name__)
@dataclass
class ReconciledReports:
no_match: DataFrame
amt_mismatch: DataFrame
prev_rec: DataFrame
gp_filtered: DataFrame
ob_overdue: DataFrame
def save_reports(self, output_path: Path):
with ExcelWriter(output_path, mode='w') as writer:
self.no_match.drop_duplicates(inplace=True)
self.no_match.to_excel(writer, sheet_name="No Match",
index=False, freeze_panes=(1,3)
)
self.amt_mismatch.drop_duplicates(inplace=True)
self.amt_mismatch.to_excel(writer, sheet_name="Amount Mismatch",
index=False, freeze_panes=(1,3)
)
self.ob_overdue.to_excel(writer, sheet_name="Overdue",
index=False
)
self.prev_rec.to_excel(writer, sheet_name="Previously Reconciled",
index=False, freeze_panes=(1,3)
)
self.gp_filtered.to_excel(writer, sheet_name="Filtered from GP",
index=False, freeze_panes=(1,0)
)
wb: Workbook = load_workbook(output_path)
for sheet in ["No Match", "Amount Mismatch"]:
ws = wb[sheet]
ws.column_dimensions['A'].hidden = True
ws.column_dimensions['B'].hidden = True
for sheet in ["Filtered from GP", "Previously Reconciled"]:
wb[sheet].sheet_state = "hidden"
wb.save(output_path)
wb.close()
class HoldReport(ABC):
@@ -21,9 +58,8 @@ class HoldReport(ABC):
self.config = reports_config
drop_unnamed(dataframe)
self.df = dataframe
self.prev_rec = None
self.df = self._add_work_columns(self.df)
self._normalize()
self._previsouly_resolved()
def _normalize(self):
@@ -50,55 +86,60 @@ class HoldReport(ABC):
self.df["Source"] = self.source
def _previsouly_resolved(self):
@staticmethod
def _remove_prev_recs(contract_match, no_match) -> \
tuple[DataFrame, DataFrame, DataFrame]:
"""
"""
current_contracts: list[str] = self.df["contract_number"]
prev_recd: DataFrame = get_prev_reconciled(contracts=current_contracts)
if not prev_recd:
idents: list[col_hash] = create_identifier(contract_match)["Indentifier"].to_list()
idents.extend(create_identifier(no_match)["Indentifier"].to_list())
logger.debug(f"{idents=}")
# Get previously reconciled
prev_recs: DataFrame|None = get_prev_reconciled(idents)
if prev_recs is None:
logger.info("No previously reconciled!")
self.df = self._add_work_columns(self.df)
return
self.prev_rec = prev_recd
start_size = self.df.shape[0]
logger.debug(f"Report DF: \n{self.df}")
logger.debug(f"prev_rec: \n{prev_recd}")
source_id = f"ID_{self.source}"
self.df[source_id] = self.df["ID"]
self.df = merge(
self.df,
prev_recd,
how="left",
on= source_id,
suffixes=("_cur", "_prev")
)
#self.df.to_excel(f"merged_df_{self.source}.xlsx")
return DataFrame(), contract_match, no_match
dfs = []
for df in [contract_match, no_match]:
start_size = df.shape[0]
logger.debug(f"Report DF: \n{df}")
logger.debug(f"prev_rec: \n{prev_recs}")
# Drop anything that should be ignored
self.df = self.df[self.df["Hide Next Month"] != True]
logger.info(f"Prev res added:\n{self.df}")
col_to_drop = []
for c in self.df.keys().to_list():
logger.debug(f"{c=}")
if "_prev" in c or "ID_" in c:
logger.debug(f"Found '_prev' in {c}")
col_to_drop.append(c)
else:
logger.debug(f"{c} is a good col!")
#col_to_drop.extend([c for c in self.df.keys().to_list() if '_prev' in c])
logger.debug(f"{col_to_drop=}")
self.df.drop(
columns= col_to_drop,
inplace=True
)
# Restandardize
self.df.rename(columns={"contract_number_cur": "contract_number"}, inplace=True)
end_size = self.df.shape[0]
logger.info(f"Reduced df by {start_size-end_size}")
df = merge(
df,
prev_recs,
how="left",
on= "Indentifier",
suffixes=("_cur", "_prev")
)
df = HoldReport._created_combined_col("HideNextMonth", df, ["prev", "cur"])
df = HoldReport._created_combined_col("Resolution", df, ["prev", "cur"])
df["ID_OB"] = df["ID_OB_cur"]
df["ID_GP"] = df["ID_GP_cur"]
# Drop anything that should be ignored
df = df[df["HideNextMonth"] != True]
logger.info(f"Prev res added:\n{df}")
col_to_drop = []
for c in df.keys().to_list():
if "_prev" in c in c or "_cur" in c:
col_to_drop.append(c)
logger.debug(f"{col_to_drop=}")
df.drop(
columns= col_to_drop,
inplace=True
)
# Restandardize
end_size = df.shape[0]
logger.info(f"Reduced df by {start_size-end_size}")
dfs.append(df)
return prev_recs, dfs[0], dfs[1]
def _remove_full_matches(self, other: 'HoldReport'):
"""
@@ -111,7 +152,7 @@ class HoldReport(ABC):
other.df: DataFrame = other.df[~(other.df["ID"].isin(self.df["ID"]))]
self.df = filter_id_match
self.combined_missing: DataFrame = concat([self.df, other.df], ignore_index=True)
self.combined_missing.to_excel("ALL MISSING.xlsx")
#self.combined_missing.to_excel("ALL MISSING.xlsx")
logger.debug(f"Combined Missing:\n{self.combined_missing}")
logger.info(f"Payments with errors: {self.combined_missing.shape[0]}")
@@ -127,7 +168,7 @@ class HoldReport(ABC):
return target_df
def _requires_rec(self, other: 'HoldReport') -> DataFrame:
def _requires_rec(self, other: 'HoldReport') -> tuple[DataFrame, DataFrame]:
"""
To be run after full matches have been removed
"""
@@ -140,9 +181,11 @@ class HoldReport(ABC):
suffixes=('_'+self.source, '_'+other.source)
)
contract_match = create_identifier(contract_match)
#contract_match.to_excel("CONTRACT_MATCH.xlsx")
for col in ["vendor_name", "Resolution", "Notes"]:
for col in ["vendor_name", "HideNextMonth", "Resolution"]:
self._created_combined_col(col, contract_match, (self.source, other.source))
logger.debug(f"_requires_rec | contract_match:\n{contract_match.columns} ({contract_match.shape})")
@@ -159,7 +202,10 @@ class HoldReport(ABC):
row["ID"] if row["Source"] == other.source else None
, axis=1)
no_match = create_identifier(no_match)
logger.debug(f"_requires_rec | no_match:\n{no_match.columns} ({no_match.shape})")
self.prev_recs, contract_match, no_match = self._remove_prev_recs(contract_match, no_match)
return contract_match, no_match
@@ -170,25 +216,34 @@ class HoldReport(ABC):
"""
logger.debug("Adding work columns!")
df_cols: list[str] = df.columns.to_list()
WORK_COLS = ["Hide Next Month","Resolution"]
WORK_COLS = ["HideNextMonth","Resolution"]
for col in WORK_COLS:
if col not in df_cols:
df[col] = ''
return df
def reconcile(self, other: 'HoldReport') -> tuple[DataFrame]:
def reconcile(self, other: 'HoldReport') -> ReconciledReports:
"""
"""
assert self.source != other.source, f"Reports to reconcile must be from different sources.\
({self.source} , {other.source})."
self._remove_full_matches(other)
all_prev_reced = concat([self.prev_rec, other.prev_rec],ignore_index=True)
if self.source == "OB":
over_due: DataFrame = self.overdue
filtered_gp: DataFrame = other.filtered
elif self.source == "GP":
over_due: DataFrame = other.overdue
filtered_gp: DataFrame = self.filtered
logger.debug(f"Removed matches:\n{self.df}")
amount_mismatch, no_match = self._requires_rec(other)
logger.debug(f"reconcile | no_match unaltered\n{no_match.columns} ({no_match.shape})")
logger.debug(f"reconcile | am_mm unaltered:\n{amount_mismatch.columns} ({amount_mismatch.shape})")
# Formatting
columns: list[str] = ["ID_GP", "ID_OB"]
columns.extend(self.config["output_columns"])
@@ -209,19 +264,36 @@ class HoldReport(ABC):
]
logger.info(f"no_match: {no_match.shape[0]}")
logger.info(f"am_mm: {amount_mismatch.shape[0]}")
return no_match, amount_mismatch
reconciled: ReconciledReports = ReconciledReports(
no_match=no_match,
amt_mismatch=amount_mismatch,
prev_rec=self.prev_recs,
gp_filtered=filtered_gp,
ob_overdue = over_due
)
return reconciled
class OnBaseReport(HoldReport):
source = "OB"
def get_overdue(self) -> DataFrame:
def __init__(self, dataframe: DataFrame, reports_config: dict) -> None:
self.overdue = self._get_overdue(dataframe)
super().__init__(dataframe, reports_config)
@staticmethod
def _get_overdue(dataframe: DataFrame) -> DataFrame:
"""
"""
self.df["InstallDate"] = to_datetime(self.df["InstallDate"])
self.df["InstallDate"].fillna(NaT, inplace=True)
return self.df[self.df["InstallDate"].dt.date < datetime.date.today()]
dataframe["InstallDate"] = to_datetime(dataframe["InstallDate"])
dataframe["InstallDate"].fillna(NaT, inplace=True)
overdue: DataFrame = dataframe[dataframe["InstallDate"].dt.date\
< datetime.date.today()]
return overdue
class GreatPlainsReport(HoldReport):
@@ -230,7 +302,7 @@ class GreatPlainsReport(HoldReport):
def __init__(self, dataframe: DataFrame, report_config: dict) -> None:
self._filter(
self.filtered: DataFrame = self._filter(
gp_report_df= dataframe,
doc_num_filters= report_config["gp_filters"]["doc_num_filters"],
good_po_num_regex= report_config["gp_filters"]["po_filter"]
@@ -239,7 +311,8 @@ class GreatPlainsReport(HoldReport):
@staticmethod
def _filter(gp_report_df: DataFrame,
doc_num_filters: list[str], good_po_num_regex: str) -> DataFrame:
doc_num_filters: list[str], good_po_num_regex: str
) -> DataFrame:
GOOD_PO_NUM = re.compile(good_po_num_regex, re.IGNORECASE)
@@ -257,15 +330,15 @@ class GreatPlainsReport(HoldReport):
)
# Get the rows that DO NOT fit the keep_mask
rows_to_drop = gp_report_df[~keep_mask].index
dropped_positives: DataFrame = gp_report_df[~keep_mask]
# Drop the rows to filter
gp_report_df.drop(rows_to_drop, inplace=True)
gp_report_df.drop(dropped_positives.index, inplace=True)
# Create a filter to remove rows that meet this requirement
# Making this a negative in the keep mask is more trouble than
# it's worth
remove_mask = gp_report_df["Document Number"].str.contains(bad_doc_num)
rows_to_drop = gp_report_df[remove_mask].index
dropped_negatives: DataFrame = gp_report_df[remove_mask]
gp_report_df.drop(dropped_negatives.index, inplace=True)
return gp_report_df
return concat([dropped_positives, dropped_negatives], ignore_index=False)

@@ -1 +1 @@
2.0
2.1