information, and it's pretty quick. OnHoldRec.bas is an Excel macro that almost works to filter GP...master
commit
075a84133b
@ -0,0 +1,4 @@ |
|||||||
|
__pycache__/ |
||||||
|
venv/ |
||||||
|
work/ |
||||||
|
*.log |
||||||
@ -0,0 +1,199 @@ |
|||||||
|
|
||||||
|
Sub OnHoldReconcile()
    ' Entry point of the macro.
    ' Copies the rows of the "GP" (Great Plains) sheet that pass
    ' FilterGreatPlains onto the "GPFiltered" sheet, then normalizes the
    ' values in column H of the filtered copy via NormalizeContractNumbers.

    Dim gpFull As Range
    Dim gpFilteredPaste As Range
    Dim gpFiltered As Range
    Dim gpContracts As Range

    ' Get the values in the GP (Great Plains) sheet:
    ' columns A:L from row 2 down to the last filled row of column A
    Set gpFull = SelectRange("GP", "A", 2, "L")

    ' Top-left cell the filtered rows are pasted at
    Set gpFilteredPaste = Worksheets("GPFiltered").Range("A2")
    Set gpFiltered = FilterGreatPlains(gpFull, gpFilteredPaste)

    ' Nothing survived the filter, so there is nothing to normalize
    If gpFiltered Is Nothing Then Exit Sub

    ' 8th column (H) of the filtered block. Columns(8) is used instead of
    ' Range("H:H") because Range on a Range object is resolved relative to
    ' that range's top-left cell rather than to the worksheet.
    ' NOTE(review): column H is presumed to hold the contract numbers - confirm.
    Set gpContracts = gpFiltered.Columns(8)
    NormalizeContractNumbers gpContracts

End Sub
||||||
|
Sub PasteRows(sourceRange As Range, sourceRows() As Long, destRange As Range)
    ' Copies specific rows from a source range to a destination range.

    ' Parameters:
    ' sourceRange: The range of data to copy rows from.
    ' sourceRows: An array of worksheet row numbers to copy from the source range.
    '             The array must be allocated (contain at least one element).
    ' destRange: The destination range to paste the copied rows.
    '            The n-th copied row is pasted n rows below it.

    Dim srcRow As Range
    Dim destRow As Range
    Dim idx As Long       ' position inside sourceRows
    Dim pasted As Long    ' number of rows pasted so far (row offset into destRange)

    ' Loop through each row in the sourceRows array, whatever its lower bound is
    For idx = LBound(sourceRows) To UBound(sourceRows)
        ' Translate the worksheet row number into a 1-based row index
        ' relative to the first row of the source range
        Set srcRow = sourceRange.Rows(sourceRows(idx) - sourceRange.Row + 1)

        ' Define the row to paste into the destination range
        Set destRow = destRange.Offset(pasted)

        ' Copy the row from the source range to the destination range
        srcRow.Copy destRow
        pasted = pasted + 1
    Next idx
End Sub
||||||
|
|
||||||
|
'-------------------------------------------------------------------
'NormalizeContractNumbers Sub | Normalizes contract numbers in a given range by removing whitespace or words
'
'@param columnRange | The range of cells containing the contract numbers to be normalized.
'                     Only its first column is processed. Row 1 of the worksheet is
'                     treated as a header and is never modified.
'
'@return | None
'-------------------------------------------------------------------
Sub NormalizeContractNumbers(columnRange As Range)

    Dim cnRegex As String
    ' Regex to remove whitespace or words
    ' Matches contracts with & without the schedule id
    cnRegex = "\d{7}(-\d{3})?"

    Dim ws As Worksheet
    Set ws = columnRange.Worksheet

    Dim targetCol As Long
    targetCol = columnRange.Column

    ' First row to process: start of the range, but never the header row
    Dim firstRow As Long
    firstRow = columnRange.Row
    If firstRow < 2 Then firstRow = 2

    ' Find the last filled row of the column, clamped to the end of the
    ' range that was passed in (so a whole-column range stops at the data)
    Dim lastRow As Long
    lastRow = ws.Cells(ws.Rows.Count, targetCol).End(xlUp).Row

    Dim rangeEnd As Long
    rangeEnd = columnRange.Row + columnRange.Rows.Count - 1
    If lastRow > rangeEnd Then lastRow = rangeEnd

    Dim rowIdx As Long
    Dim cell As Range
    For rowIdx = firstRow To lastRow
        Set cell = ws.Cells(rowIdx, targetCol)
        ' Error cells (#N/A, #REF!, ...) cannot be converted to text; leave them alone
        If Not IsError(cell.Value) Then
            ' Blank cells stay blank
            If Len(CStr(cell.Value)) > 0 Then
                ' Returns a normalized contract number, or the original value passed in
                cell.Value = ExtractMatch(cnRegex, CStr(cell.Value), room:=True) ' room: ReturnOriginalOnMiss
            End If
        End If
    Next rowIdx

End Sub
||||||
|
|
||||||
|
|
||||||
|
'------------------------------------------------------
' SelectRange | Returns the range of filled cells
' from tl to tr, down to the last filled row of tl
'
' Params |
' sheet: Name of the worksheet to read from
' tl (Top Left): A column letter as a string 'C'
' startRow: the top row to start at.
' tr (Top Right): A column letter as a string 'F'
'
'Returns -> Range from tl to tr down to
' last filled row of tl
'-----------------------------------------------------
Function SelectRange(sheet As String, tl As String, startRow As Integer, tr As String) As Range

    Dim ws As Worksheet
    Set ws = Worksheets(sheet)

    ' Rows.Count is qualified with the target sheet so the result does not
    ' depend on which sheet happens to be active when the macro runs
    Dim lastRow As Long
    lastRow = ws.Cells(ws.Rows.Count, tl).End(xlUp).Row

    Debug.Print "Select range " & tl & ":" & tr & " | Last Row " & lastRow
    Set SelectRange = ws.Range(tl & startRow & ":" & tr & lastRow)
End Function
||||||
|
|
||||||
|
Function FilterGreatPlains(gpRange As Range, destRange As Range) As Range
    ' Filters a source range of data based on a condition, and copies the
    ' filtered data to a destination range.

    ' Parameters:
    ' gpRange: The range of data to filter.
    ' destRange: The destination range to paste the filtered data.

    ' Returns:
    ' The pasted block (one row per kept source row, same column count as
    ' gpRange), or Nothing when no row passes the filter.

    Dim I As Long                  ' number of good rows found so far
    Dim goodRowList() As Long      ' worksheet row numbers of the good rows
    Dim filteredGp As Range
    Dim gpRow As Range
    Dim docType As String
    Dim docNum As String
    Dim purchNum As String

    ' A good document number is either digits/hyphens only, or contains hold / hld
    Dim gdn As String
    gdn = "(^(\d+-?)+$)|(ho?ld)"

    ' NOTE(review): the Python filter_gp in this commit only rejects "cma"
    ' when it is followed by whitespace or a digit; this pattern rejects any
    ' occurrence of "cma". Confirm which rule is intended.
    Dim cmaRegex As String
    cmaRegex = "cma"

    ' Loop through each row in the source range
    For Each gpRow In gpRange.Rows
        ' Trim once so the value that is logged is the value that is tested
        docType = Trim(CStr(gpRow.Cells(1, 5).Value))
        docNum = Trim(CStr(gpRow.Cells(1, 7).Value))
        purchNum = Trim(CStr(gpRow.Cells(1, 11).Value))

        Debug.Print ""
        Debug.Print "Doc Type: " & docType
        ' Only take cells in E (Document Type) that are 'Invoice'
        If Not docType = "Invoice" Then
            Debug.Print "Kicked! Not an invoice!"
            GoTo NextRow ' This is not a row we need
        End If

        Debug.Print "Doc Num: " & docNum
        ' Kick the row if the document number (G) does NOT match the regex
        If Not MatchFound(gdn, docNum) Then
            Debug.Print "Kicked! Did not match doc # regex!"
            GoTo NextRow
        End If

        Debug.Print "Purch Num: " & purchNum
        ' Check Purchase Order Number (K) for CMA
        If MatchFound(cmaRegex, purchNum) Then
            Debug.Print "Kicked! CMA in purch order #!"
            GoTo NextRow
        End If

        ' If none of the kickout conditions were met then
        ' record this as a good row
        Debug.Print "Good!"
        ReDim Preserve goodRowList(I)
        goodRowList(I) = gpRow.Row
        I = I + 1
NextRow:
    Next gpRow

    ' goodRowList is still unallocated when nothing passed: UBound on it and
    ' Resize(0, ...) would both raise a run-time error, so stop here instead
    If I = 0 Then
        Debug.Print "No rows passed the Great Plains filter."
        Set FilterGreatPlains = Nothing
        Exit Function
    End If

    ' Copy the filtered rows to the destination range
    PasteRows gpRange, goodRowList, destRange

    ' Define a new range object representing the filtered data
    Set filteredGp = destRange.Resize(I, gpRange.Columns.Count)

    ' Return the filtered range object
    Set FilterGreatPlains = filteredGp
End Function
||||||
|
Function MatchFound(regexPattern As String, cellValue As String, Optional ignoreCase As Boolean = True) As Boolean
    ' Reports whether cellValue contains a match for regexPattern.
    '
    ' Parameters:
    ' regexPattern: The regular expression to look for.
    ' cellValue: The text to test.
    ' ignoreCase: Optional, case-insensitive matching when True (default True).
    '
    ' Returns True when the pattern matches anywhere in cellValue, False otherwise.
    Dim matcher As New RegExp
    With matcher
        .Pattern = regexPattern
        .ignoreCase = ignoreCase
        ' Test already yields the Boolean we want to hand back
        MatchFound = .Test(cellValue)
    End With
End Function
||||||
|
|
||||||
|
'-------------------------------------------------------------------
'ExtractMatch Function | Extracts the first match of a regex pattern
'
'@param regexPattern | The regex pattern to match
'@param cellValue | The input string to search for matches
'@param ignoreCase | Optional boolean indicating if case should be ignored (default True)
'@param room | ReturnOriginalOnMissing -> Optional boolean indicating if the original cell value should be returned if no matches are found (default False)
'
'@return | String value of the first regex match. When nothing matches: the
'          original cellValue if room is True, otherwise the text "#N/A"
'          (the function returns a String, so it cannot carry a real error value)
'-------------------------------------------------------------------
Function ExtractMatch(regexPattern As String, cellValue As String, Optional ignoreCase As Boolean = True, Optional room As Boolean = False) As String
    ' Create a new RegExp object and set its properties
    Dim regex As New RegExp
    regex.Pattern = regexPattern
    regex.ignoreCase = ignoreCase

    ' Run the pattern against the input string
    Dim regexMatches As Object
    Set regexMatches = regex.Execute(cellValue)

    If regexMatches.Count = 0 Then
        ' No match: return either the input string or "#N/A" depending on "room"
        Debug.Print "No match found! " & regexPattern & " not in " & cellValue
        If room Then
            Debug.Print "Returning original value."
            ExtractMatch = cellValue
        Else
            ExtractMatch = "#N/A"
        End If
    Else
        ' At least one match: return the first one
        Debug.Print "Match : " & regexMatches(0).Value
        ExtractMatch = regexMatches(0).Value
    End If
End Function
||||||
@ -0,0 +1,39 @@ |
|||||||
|
write_dir = "../Work" |
||||||
|
|
||||||
|
[ExcelColumns] |
||||||
|
|
||||||
|
[ExcelColumns.OB] |
||||||
|
contract_number = "Contract" # 3070508-007 |
||||||
|
onhold_amount = "CurrentOnHold" |
||||||
|
install_date = "InstallDate" |
||||||
|
|
||||||
|
[ExcelColumns.GP] |
||||||
|
contract_number = "Transaction Description" # 1234-56789 |
||||||
|
onhold_amount = "Current Trx Amount" |
||||||
|
doc_num = "Document Number" # 1-316141 HOLD |
||||||
|
pur_order = "Purchase Order Number" # ABC123 |
||||||
|
doc_type = "Document Type" # Invoice or Credit Memo |
||||||
|
|
||||||
|
[logger] |
||||||
|
version = 1 |
||||||
|
|
||||||
|
disable_existing_loggers = false |
||||||
|
|
||||||
|
[logger.formatters.custom] |
||||||
|
format = "'%(asctime)s - %(module)s - %(levelname)s - %(message)s'" |
||||||
|
|
||||||
|
[logger.handlers.console] |
||||||
|
class = "logging.StreamHandler" |
||||||
|
level = "INFO" |
||||||
|
formatter = "custom" |
||||||
|
stream = "ext://sys.stdout" |
||||||
|
|
||||||
|
[logger.handlers.file] |
||||||
|
class = "logging.FileHandler" |
||||||
|
level = "DEBUG" |
||||||
|
formatter = "custom" |
||||||
|
filename = "on_hold.log" |
||||||
|
|
||||||
|
[logger.root] |
||||||
|
level = "DEBUG" |
||||||
|
handlers = ["console", "file"] |
||||||
@ -0,0 +1,237 @@ |
|||||||
|
import pandas as pd |
||||||
|
from pandas import DataFrame |
||||||
|
from datetime import datetime as dt |
||||||
|
import datetime |
||||||
|
import re |
||||||
|
from typing import Literal |
||||||
|
|
||||||
|
def get_overdue(onbase_df: DataFrame, onbase_excel_config) -> DataFrame:
    """
    Return the rows of `onbase_df` whose installation date is before today.

    The caller's DataFrame is left untouched; the returned frame is a copy in which
    the installation-date column has been converted to datetimes.

    Args:
        onbase_df (pd.DataFrame): A pandas DataFrame containing OnBase installation data.
        onbase_excel_config (dict): The OnBase Excel configuration; its "install_date"
            entry names the installation-date column.

    Returns:
        pd.DataFrame: The rows from `onbase_df` that have an installation date before
        the current date. Rows with a missing date are not included.
    """
    id_col = onbase_excel_config["install_date"]

    # Missing values become NaT here, so no separate fillna step is needed
    install_dates = pd.to_datetime(onbase_df[id_col])

    # Midnight today: anything strictly earlier has a calendar date before today.
    # Comparisons against NaT are False, which drops rows without a date.
    today = pd.Timestamp.today().normalize()

    converted = onbase_df.copy()
    converted[id_col] = install_dates
    return converted[install_dates < today]
||||||
|
|
||||||
|
|
||||||
|
def filter_gp(gp_dataframe: pd.DataFrame, gp_config: dict) -> pd.DataFrame:
    """
    Keep only the GP rows that are relevant for the reconciliation.

    A row is kept when all of the following hold:
      * its document type is exactly "Invoice",
      * its document number consists only of digits/hyphens, or contains "hold"/"hld",
      * its purchase order number does not contain "cma" followed by whitespace or a digit.

    Args:
        gp_dataframe (pd.DataFrame): A pandas DataFrame containing GP data.
        gp_config (dict): The GP configuration; its "doc_type", "doc_num" and "pur_order"
            entries name the columns that are inspected.

    Returns:
        pd.DataFrame: A new DataFrame containing the filtered GP data.
    """
    # Regex used to filter unneeded transactions:
    # keeps values that are ONLY a contract-style number, OR
    # that contain the word hold or just hld
    GOOD_DOC_NUM = re.compile(r"(^(\d+-?)+$)|(ho?ld)", re.IGNORECASE)
    # Excludes anything that contains cma with a space or digit following it
    # CMA23532 would be excluded but 'John Locman' would be allowed
    GOOD_PO_NUM = re.compile(r"^(?!.*cma(\s|\d)).*$", re.IGNORECASE)

    doc_types = gp_dataframe[gp_config['doc_type']]
    doc_nums = gp_dataframe[gp_config['doc_num']]
    po_nums = gp_dataframe[gp_config['pur_order']]

    # Create a filter/mask to use on the data.
    # `na=` pins the result for empty cells so the mask stays strictly boolean:
    # a missing document number cannot be a good one, while a missing purchase
    # order number cannot contain "cma".
    mask = (
        (doc_types == "Invoice") &
        doc_nums.str.contains(GOOD_DOC_NUM, na=False) &
        po_nums.str.contains(GOOD_PO_NUM, na=True)
    )

    # Select by position rather than dropping by label, so duplicate index
    # labels cannot take wanted rows out with the unwanted ones
    return gp_dataframe.loc[mask].copy()
||||||
|
|
||||||
|
|
||||||
|
def create_transaction_df(dataframe: pd.DataFrame, source: Literal["GP", "OB"], excelConfig: dict):
    """
    Build a standardized transaction table from a GP or OB DataFrame.

    Args:
        dataframe (pd.DataFrame): A pandas DataFrame containing transaction data.
        source (Literal["GP", "OB"]): The source of the data ("GP" or "OB"); also the key
            of the column configuration to use inside `excelConfig`.
        excelConfig (dict): The Excel configuration. `excelConfig[source]` must provide the
            "contract_number" and "onhold_amount" column names.

    Returns:
        pd.DataFrame: A new DataFrame with the columns "contract_number" (normalized),
        "onhold_amount" (float, rounded to two decimals), "ID"
        ("<contract_number>_<onhold_amount>") and "Source", one row per input row.
    """
    column_config: dict = excelConfig[source]
    contract_col = column_config["contract_number"]
    amount_col = column_config["onhold_amount"]

    # New frame with just the two needed columns, under standardized names
    transactions = dataframe[[contract_col, amount_col]].rename(columns={
        contract_col: "contract_number",
        amount_col: "onhold_amount",
    })

    # Convert the on-hold amount column to float format and round to two decimal places
    transactions["onhold_amount"] = transactions["onhold_amount"].astype(float).round(2)

    # 7 digits, optionally followed by a 3-digit schedule id (e.g. 3070508-007)
    CN_REGEX = re.compile(r"\d{7}(-\d{3})?")

    def _normalize(raw) -> str:
        # Search once per value; fall back to the stringified original on a miss
        text = str(raw)
        found = CN_REGEX.search(text)
        return found.group(0) if found else text

    # astype(str) keeps the column string-typed even when the frame is empty
    transactions["contract_number"] = transactions["contract_number"].apply(_normalize).astype(str)

    # Create a new column with a transaction ID built from contract number and amount
    transactions["ID"] = transactions["contract_number"] + '_' + transactions["onhold_amount"].astype(str)

    # Create a new column with the data source
    transactions["Source"] = source

    return transactions
||||||
|
|
||||||
|
|
||||||
|
def get_no_match(obt_df: pd.DataFrame, gpt_df: pd.DataFrame):
    """
    Return the transactions whose contract number appears in only one of the two sources.

    Args:
        obt_df (pd.DataFrame): Transaction data from OB, with at least the columns
            "contract_number", "onhold_amount" and "Source".
        gpt_df (pd.DataFrame): Transaction data from GP, with the same columns.

    Returns:
        pd.DataFrame: The unmatched transactions with the columns "Source",
        "contract_number" and "onhold_amount".
    """
    # Merge the two DataFrames using the contract number as the join key
    merged_df = pd.merge(
        obt_df, gpt_df,
        how="outer",
        on=["contract_number"],
        suffixes=("_ob", "_gp")
    )

    # A side that did not take part in the join has no Source value
    one_sided = merged_df["Source_ob"].isna() | merged_df["Source_gp"].isna()

    # Explicit copy: the columns added below must not be written onto a slice of merged_df
    no_match = merged_df.loc[one_sided].copy()

    # Take the OB values where present, otherwise the GP values
    no_match["Source"] = no_match["Source_ob"].fillna(no_match["Source_gp"])
    no_match["onhold_amount"] = no_match["onhold_amount_ob"].fillna(no_match["onhold_amount_gp"])

    # Keep only the source, contract number, and on-hold amount columns, in that order
    return no_match[["Source", "contract_number", "onhold_amount"]]
||||||
|
|
||||||
|
|
||||||
|
def get_not_full_match(obt_df: pd.DataFrame, gpt_df: pd.DataFrame):
    """
    Split the OB and GP transactions into fully matched and not fully matched ones.

    Two transactions match fully when their "ID", "contract_number" and "onhold_amount"
    are all equal.

    Args:
        obt_df (pd.DataFrame): Transaction data from OB, with the columns
            "contract_number", "onhold_amount", "ID" and "Source".
        gpt_df (pd.DataFrame): Transaction data from GP, with the same columns.

    Returns:
        tuple(pd.DataFrame, pd.DataFrame): The first DataFrame contains the fully matched
        transactions (without the Source columns). The second contains every other
        transaction with the columns "Source", "contract_number" and "onhold_amount";
        it includes contracts present in both sources with different amounts, and can
        hold several rows from one source for the same contract number.
    """
    # Combine the two DataFrames using an outer join on ID, contract number and amount
    merged_df = pd.merge(
        obt_df, gpt_df,
        how="outer",
        on=["ID", "contract_number", "onhold_amount"],
        suffixes=("_ob", "_gp")
    )

    # A full match has a Source value from both sides
    in_both = merged_df["Source_ob"].notna() & merged_df["Source_gp"].notna()
    full_matched = merged_df.loc[in_both].drop(columns=["Source_ob", "Source_gp"])

    # Everything whose ID is not fully matched; copied so that the Source
    # column below is not written onto a slice of merged_df
    not_full_match = merged_df.loc[~merged_df["ID"].isin(full_matched["ID"])].copy()

    # Data source: OB where present, GP otherwise
    not_full_match["Source"] = not_full_match["Source_ob"].fillna(not_full_match["Source_gp"])

    # Keep only the source, contract number, and on-hold amount columns, in that order
    not_full_match = not_full_match[["Source", "contract_number", "onhold_amount"]]

    return full_matched, not_full_match
||||||
|
|
||||||
|
|
||||||
|
def get_contract_match(not_full_match: pd.DataFrame) -> pd.DataFrame:
    """
    Pair up the not-fully-matched OB and GP transactions that share a contract number.

    Args:
        not_full_match (pd.DataFrame): Transactions without a full match, with the columns
            "Source", "contract_number" and "onhold_amount".

    Returns:
        pd.DataFrame: One row per OB/GP pair with the same contract number, with the
        columns "Source", "contract_number", "onhold_amount_ob" and "onhold_amount_gp".
    """
    origin = not_full_match["Source"]

    # Inner join of the OB rows with the GP rows on the contract number
    paired = not_full_match[origin == "OB"].merge(
        not_full_match[origin == "GP"],
        how="inner",
        on=["contract_number"],
        suffixes=("_ob", "_gp"),
    )

    # Single Source column: the OB value, with "GP" as the filler for missing ones
    paired["Source"] = paired["Source_ob"].fillna("GP")

    # Selecting the output columns also leaves the per-side Source columns behind
    output_columns = ["Source", "contract_number", "onhold_amount_ob", "onhold_amount_gp"]
    return paired[output_columns]
||||||
@ -0,0 +1,196 @@ |
|||||||
|
import pandas as pd |
||||||
|
from pandas import DataFrame, Series |
||||||
|
import re |
||||||
|
from re import Pattern |
||||||
|
import os |
||||||
|
from os.path import basename |
||||||
|
import glob |
||||||
|
import logging |
||||||
|
from pathlib import Path |
||||||
|
from tomllib import load |
||||||
|
import logging.config |
||||||
|
from datetime import datetime as dt |
||||||
|
|
||||||
|
""" |
||||||
|
[ ] Pull in past reconciliations to check against |
||||||
|
[ ] Record reconciled transaction (connect with VBA) |
||||||
|
[ ] Check GP against the database |
||||||
|
[ ] Check OB against the database |
||||||
|
""" |
||||||
|
|
||||||
|
# Custom module for reconciliation |
||||||
|
from rec_lib import get_contract_match, get_no_match, \ |
||||||
|
get_not_full_match, get_overdue, filter_gp, create_transaction_df |
||||||
|
|
||||||
|
def setup_logging(): |
||||||
|
""" |
||||||
|
Sets up logging configuration from the TOML file. If the logging configuration fails to be loaded from the file, |
||||||
|
a default logging configuration is used instead. |
||||||
|
|
||||||
|
Returns: |
||||||
|
logging.Logger: The logger instance. |
||||||
|
""" |
||||||
|
with open("config.toml", "rb") as f: |
||||||
|
config_dict: dict = load(f) |
||||||
|
try: |
||||||
|
# Try to load logging configuration from the TOML file |
||||||
|
logging.config.dictConfig(config_dict["logger"]) |
||||||
|
except Exception as e: |
||||||
|
# If the logging configuration fails, use a default configuration and log the error |
||||||
|
logger = logging.getLogger() |
||||||
|
logger.setLevel(logging.DEBUG) |
||||||
|
logger.warning("Failed setting up logger!") |
||||||
|
logger.exception(e) |
||||||
|
logger.warning(f"Config:\n{config_dict}") |
||||||
|
return logger |
||||||
|
|
||||||
|
|
||||||
|
# Configure logging once, at import time, then create this module's logger
setup_logging()
logger = logging.getLogger(__name__)
# A named logger's own level is NOTSET unless set explicitly; report the level that
# actually applies to it
logger.info(f"Logger started with level: {logging.getLevelName(logger.getEffectiveLevel())}")
||||||
|
|
||||||
|
def find_most_recent_file(folder_path: Path, file_pattern: Pattern) -> str:
    """
    Return the path of the most recently modified file in a folder whose file name
    matches a regular expression.

    Args:
        folder_path (Path): A pathlib.Path object representing the folder to search.
        file_pattern (Pattern): A regular expression matched (from the start) against each
            file name in the folder.

    Returns:
        str: The path of the most recently modified matching file.

    Raises:
        FileNotFoundError: If no file in the folder matches the pattern.
    """
    # List everything in the folder; escape the folder part so characters such as
    # "[" in its name are not interpreted as glob wildcards
    files = glob.glob(f"{glob.escape(str(folder_path))}/*")
    logger.debug(f"files: {files}")

    # Get the modification time of each file whose name matches the pattern
    file_times = [(os.path.getmtime(path), path) for path in files if re.match(file_pattern, basename(path))]

    # Sort the files by modification time (most recent first)
    file_times.sort(reverse=True)
    logger.debug(f"file times: {file_times}")

    if not file_times:
        raise FileNotFoundError(f"No file matching {file_pattern!r} found in {folder_path}")

    # Return the path of the most recent file
    return file_times[0][1]
||||||
|
|
||||||
|
|
||||||
|
def check_sheet(df_cols: list[str], excel_col_config: dict) -> bool:
    """
    Tell whether every column named in a column configuration exists in a sheet.

    Args:
        df_cols (list[str]): The column names found in the sheet.
        excel_col_config (dict): A column configuration; its values are the required
            column names.

    Returns:
        bool: True if every required column is in `df_cols`, False otherwise.
    """
    for needed in excel_col_config.values():
        # One missing column is enough to reject the sheet
        if needed not in df_cols:
            return False
    return True
||||||
|
|
||||||
|
|
||||||
|
def _load_matching_sheet(file_path: str, col_config: dict) -> pd.DataFrame | None:
    """Return the first sheet of the workbook that has every configured column, or None."""
    # The context manager closes the workbook once all sheets have been read
    with pd.ExcelFile(file_path) as workbook:
        sheets = pd.read_excel(workbook, sheet_name=workbook.sheet_names)
    for frame in sheets.values():
        if check_sheet(frame.columns, col_config):
            return frame
    return None


def get_dataframes(excelConfig: dict) -> tuple[pd.DataFrame|None, pd.DataFrame|None]:
    """
    Given a dictionary of Excel configuration options, this function searches for the most
    recently modified GP and OB Excel files in the "Work" folder of the current working
    directory and returns their corresponding dataframes.

    Args:
        excelConfig (dict): A dictionary containing the column configuration for the GP and
            OB Excel files under the keys "GP" and "OB".

    Returns:
        tuple[pd.DataFrame|None, pd.DataFrame|None]: A tuple containing the OB and GP
        dataframes, respectively. An entry is None when no sheet of the corresponding
        workbook contains all of the configured columns.
    """
    # Get the current working directory and the path to the "Work" folder
    current_dir: Path = Path(os.getcwd())
    work_folder: Path = current_dir / 'Work'
    logger.debug(f"Workpath: {work_folder}")

    # Check that the "Work" folder exists (exists must be called; the bare
    # method object is always truthy)
    assert work_folder.exists(), "No work folder found!"

    # Define regular expression patterns to match the GP and OB Excel files
    gp_regex: Pattern = re.compile(r".*gp.*\.xlsx$", re.IGNORECASE)
    ob_regex: Pattern = re.compile(r".*ob.*\.xlsx$", re.IGNORECASE)

    # Find the paths of the most recently modified GP and OB Excel files
    gp_file_path = find_most_recent_file(work_folder, gp_regex)
    logger.debug(f"gp_file_path: {gp_file_path}")
    ob_file_path = find_most_recent_file(work_folder, ob_regex)
    logger.debug(f"ob_file_path: {ob_file_path}")

    # Read each workbook and pick the first sheet that has the required columns
    gp_df = _load_matching_sheet(gp_file_path, excelConfig["GP"])
    ob_df = _load_matching_sheet(ob_file_path, excelConfig["OB"])

    return ob_df, gp_df
||||||
|
|
||||||
|
|
||||||
|
def main() -> int:
    """
    This is the main function for the script. It reads configuration options from
    config.toml, reads in the GP and OB Excel files, performs data reconciliation and
    analysis, and writes the results to a new Excel file.

    The output workbook is written to the folder named by the "write_dir" configuration
    key ("work_dir" is accepted as a fallback) and contains the sheets "FULL", "No Match",
    "Amount Mismatch" and "Overdue".

    Returns:
        int: 0 if the script executes successfully.
    """
    # Read the configuration options from a TOML file
    with open("config.toml", "rb") as f:
        config_dict: dict = load(f)

    excelConfig: dict = config_dict["ExcelColumns"]

    # Get the GP and OB dataframes from the Excel files; either can be None when no
    # sheet with the configured columns was found
    ob_df, gp_df = get_dataframes(excelConfig)
    assert ob_df is not None and not ob_df.empty, "OB Data empty!"
    assert gp_df is not None and not gp_df.empty, "GP Data empty!"

    # Filter the GP dataframe to include only relevant transactions
    fgp_df: DataFrame = filter_gp(gp_df, excelConfig["GP"])
    # Get the overdue transactions from the OB dataframe
    overdue: DataFrame = get_overdue(ob_df, excelConfig["OB"])

    # Create transaction dataframes for the GP and OB dataframes
    ob_transactions: DataFrame = create_transaction_df(ob_df, 'OB', excelConfig)
    gp_transactions: DataFrame = create_transaction_df(fgp_df, 'GP', excelConfig)

    # Get the transactions that do not have matches in both the GP and OB dataframes
    no_match: DataFrame = get_no_match(ob_transactions, gp_transactions)

    # Get the transactions that have matches in both the GP and OB dataframes but have amount mismatches
    full_match, not_full_match = get_not_full_match(ob_transactions, gp_transactions)
    only_contracts_match: DataFrame = get_contract_match(not_full_match)

    # The configuration file names the output folder "write_dir"; "work_dir" is kept
    # as a fallback for configurations that use the older key
    output_dir = config_dict.get("write_dir") or config_dict["work_dir"]
    output_path = Path(output_dir) / f"Reconciled Holds [{dt.now().strftime('%m-%d-%Y')}].xlsx"

    # Write the results to a new Excel file
    with pd.ExcelWriter(output_path, mode='w') as writer:
        full_match.to_excel(writer, sheet_name="FULL", index=False)
        no_match.to_excel(writer, sheet_name="No Match", index=False)
        only_contracts_match.to_excel(writer, sheet_name="Amount Mismatch", index=False)
        overdue.to_excel(writer, sheet_name="Overdue", index=False)

    return 0
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
    import sys

    print("Starting")
    exit_code = main()
    print("Completed")
    # Hand main()'s status to the shell instead of discarding it
    sys.exit(exit_code)
||||||
Binary file not shown.
Loading…
Reference in new issue