parent
5caaf3d7ac
commit
40c2a8a0df
@ -0,0 +1,182 @@ |
|||||||
|
from pathlib import Path |
||||||
|
import re |
||||||
|
from re import Pattern |
||||||
|
import pandas as pd |
||||||
|
from pandas import DataFrame |
||||||
|
from datetime import datetime as dt, timedelta |
||||||
|
import logging |
||||||
|
import il_reports as ilr |
||||||
|
from dataclasses import dataclass |
||||||
|
from typing import Callable |
||||||
|
from tqdm import tqdm |
||||||
|
from multiprocessing import Pool, cpu_count |
||||||
|
|
||||||
|
# UNC root of the 2023 cash-apps share; dated subfolders (YYYY.MM/YYYY.MM.DD)
# are resolved against this in process_folder().
TOP_PATH: Path = Path(r"\\leafnow.com\shared\Accounting\CASH APPS\2023")
||||||
|
|
||||||
|
def create_logger(log_file: Path, logger_name: str = __name__) -> logging.Logger:
    """Create a logger that writes DEBUG+ records to *log_file* and INFO+ to the console.

    Args:
        log_file: Path of the log file; opened in 'w' mode, so it is truncated each run.
        logger_name: Name registered with the logging module (defaults to this module).

    Returns:
        The configured ``logging.Logger`` instance.
    """
    logger = logging.getLogger(logger_name)
    # BUG FIX: without this the logger inherits the root default (WARNING), so
    # DEBUG records were filtered out before ever reaching the file handler.
    logger.setLevel(logging.DEBUG)

    file_handler = logging.FileHandler(log_file, 'w')
    file_handler.setLevel(logging.DEBUG)

    stream_handler = logging.StreamHandler()
    stream_handler.setLevel(logging.INFO)

    logger.addHandler(file_handler)
    logger.addHandler(stream_handler)
    return logger
||||||
|
|
||||||
|
|
||||||
|
@dataclass
class ExtractInstruction:
    """One extraction rule: which input file to find and how to turn it into a DataFrame."""
    # Pattern used to locate the input file inside a daily report folder.
    # NOTE(review): annotated as Pattern, but __main__ constructs these with plain
    # strings (e.g. "_ACH_") and they are interpolated into a glob — confirm intent.
    input_regex: Pattern
    # Excel sheet name the extracted data is written to in the consolidated report.
    sheet_name: str
    # Extractor callable from il_reports; called as extract_method(report_str, backup_path)
    # and expected to return a pandas DataFrame (see process_report).
    extract_method: Callable
||||||
|
|
||||||
|
@dataclass
class ReportFolder:
    """A report category folder under each dated directory, plus its extraction rules."""
    # Folder name under TOP_PATH/YYYY.MM/YYYY.MM.DD/.
    # NOTE(review): annotated as Path, but __main__ passes plain strings ("ACH", …);
    # Path() accepts both, so this works — consider tightening to str | Path.
    folder_name: Path
    # Extraction rules applied to files found in this folder.
    extraction_methods: list[ExtractInstruction]
||||||
|
|
||||||
|
def extract_date_path(path: Path) -> Optional[Path]:
    """Return the nearest ancestor of *path* whose name is a YYYY.MM month directory.

    Args:
        path: Any path under a dated report tree (e.g. .../2023.01/2023.01.05/ACH/x.txt).

    Returns:
        The month directory (e.g. .../2023.01), or None when no ancestor matches.
        (Original annotation claimed ``-> Path`` but the miss case returns None.)
    """
    date_pattern = re.compile(r'^\d{4}\.\d{2}$')

    # parents iterates nearest-first, so the day folder (YYYY.MM.DD) is examined
    # and rejected before the month folder (YYYY.MM) matches.
    for ancestor in path.parents:
        if date_pattern.match(ancestor.name):
            return ancestor
    return None
||||||
|
|
||||||
|
def append_to_consolidated_report(report_path: Path, report_df: DataFrame, sheet_name: str):
    """Write *report_df* into the month's consolidated Excel workbook.

    The workbook lives in the YYYY.MM ancestor directory of *report_path* and is
    named like "2023-01_ConsolidatedReport.xlsx". On first write the sheet is
    created with a header row; subsequent writes append rows below the existing
    data (header suppressed). Failures are logged, never raised.

    Args:
        report_path: Path of the source report file (used to locate the month dir).
        report_df: Data to write/append.
        sheet_name: Target sheet inside the consolidated workbook.
    """
    report_month: Path = extract_date_path(report_path)
    report_name: str = f"{str(report_month.name).replace('.','-')}_ConsolidatedReport.xlsx"

    save_path = report_month / Path(report_name)
    logger.debug(f"{save_path=}")

    # Check if the current month has a consolidated report
    if not save_path.exists():
        logger.debug(f"Consolidated Report | No monthly summary file!\n\tCreating: {save_path}")
        # No file exists yet: create it and write the sheet with a header row.
        try:
            with pd.ExcelWriter(save_path) as writer:
                logger.debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}")
                report_df.to_excel(writer, index=False, sheet_name=sheet_name)
        except Exception as e:
            logger.error(f"Failed to create to consolidated report! {report_name} | {sheet_name} | {report_path} :\n{e}")
    else:
        # File exists: count the rows already on the sheet, then overlay new rows
        # underneath them with the header suppressed.
        try:
            current_data_len = len(pd.read_excel(save_path, sheet_name=sheet_name))
            with pd.ExcelWriter(save_path, engine='openpyxl', mode='a', if_sheet_exists="overlay") as writer:
                logger.debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}")
                # BUG FIX: original passed startrow=len(current_data_len) — len() of
                # an int raises TypeError, so every append silently failed into the
                # except below. current_data_len excludes the header row read_excel
                # consumed, so +1 lands the first new row just past the existing data.
                report_df.to_excel(writer, index=False, sheet_name=sheet_name,
                                   startrow=current_data_len + 1, header=False)
        except Exception as e:
            logger.error(f"Failed to append to consolidated report! {report_name} | {sheet_name} | {report_path} :\n{e}")
||||||
|
|
||||||
|
def process_report(file: Path, extract_inst: ExtractInstruction) -> bool:
    """Run one extraction rule against a report file and append the result
    to the month's consolidated workbook.

    Args:
        file: The report file to read (decoded with errors="replace").
        extract_inst: Rule supplying the extractor callable and target sheet.

    Returns:
        True when extraction and saving succeed, False otherwise. Never raises;
        failures are logged instead.
    """
    try:
        with open(str(file), errors="replace") as fh:
            raw_text: str = fh.read()

        backup_path = Path(file.parent, f"BACK_REPORT_{extract_inst.sheet_name}.xlsx")
        try:
            frame: DataFrame = extract_inst.extract_method(raw_text, backup_path)
            if frame.empty:
                raise ValueError("Dataframe is empty!")
        except Exception as e:
            # Extraction problems are expected (malformed reports) — warn and bail.
            logger.warning(f"Failed to create report df: {extract_inst.sheet_name}:\n{e}")
            return False

        append_to_consolidated_report(file, frame, extract_inst.sheet_name)
        return True
    except Exception as e:
        logger.exception(f"could not process {file}:\n{e}")
        return False
||||||
|
|
||||||
|
def process_folder(folder: ReportFolder):
    """Scan every dated directory from 2023-01-01 up to (but not including) today
    for this folder's reports, running each extraction rule on the first matching file.

    Args:
        folder: The report category (folder name + extraction rules) to process.
    """
    # Search recursively through date directories, one calendar day at a time.
    report_date: dt = dt(2023, 1, 1)
    while report_date.date() < dt.now().date():
        logger.info(f"{folder.folder_name} | Processing date: {report_date}")
        report_folder: Path = Path(TOP_PATH,
                                   report_date.strftime("%Y.%m"),
                                   report_date.strftime("%Y.%m.%d"),
                                   folder.folder_name)
        logger.debug(f"report_folder: {report_folder}")
        if report_folder.exists():
            for xi in folder.extraction_methods:
                try:
                    # BUG FIX: the original used report_folder.glob(f"*{xi.input_regex}*"),
                    # which treats regex patterns such as r"_LOCKBOX_\d+_" as literal glob
                    # text and can never match them. re.search over the entries handles
                    # both the plain-substring patterns and the real regexes.
                    report_file: Path = next(
                        entry for entry in report_folder.iterdir()
                        if re.search(xi.input_regex, entry.name)
                    )
                    logger.debug(f"Report file: {report_file}")
                except Exception as e:
                    # next() raises StopIteration when nothing matches — skip this rule.
                    logger.debug(f"Could not get report_file: {report_folder} | {xi.input_regex} \n{e}")
                    continue
                try:
                    success = process_report(report_file, xi)
                    if success:
                        logger.info(f"Report Processed: {report_file} | {xi.sheet_name}")
                    else:
                        logger.warning(f"Failed to process report: {report_file} | {xi.sheet_name}")
                except Exception as e:
                    logger.exception(f"Could not process report ({report_file}) :\n{e}")
                    continue
        else:
            logger.debug(f"Folder '{report_folder}' does not exist!")
        report_date = report_date + timedelta(days=1)
    logger.debug(f"Finished scanning {folder.folder_name}!")
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":

    logger = create_logger("BackReporting.log")

    try:
        # Wiring table: one ReportFolder per category, each holding the file-name
        # pattern, target sheet, and il_reports extractor for that category.
        # BUG FIX: patterns containing regex escapes are raw strings now — the
        # original "_LOCKBOX_\d+_" relied on \d being an unrecognized escape
        # (DeprecationWarning, a SyntaxError in future Python versions).
        FOLDERS = [
            ReportFolder("ACH", [
                ExtractInstruction("_ACH_", "ACH", ilr.ach),
            ]),
            ReportFolder("CHECKS LIVE", [
                ExtractInstruction("_PROGPAY_BER", "CHECKS LIVE", ilr.payment_transactions),
            ]),
            ReportFolder("CREDIT CARDS", [
                ExtractInstruction("_VMCC_BER", "CREDIT CARDS", ilr.payment_transactions),
            ]),
            ReportFolder("LOCKBOX", [
                ExtractInstruction(r"_LOCKBOX_\d+_", "LOCKBOX", ilr.lockbox),
            ]),
            ReportFolder("PAY BY PHONE", [
                # NOTE(review): uses ilr.lockbox rather than payment_transactions —
                # confirm this is intentional for pay-by-phone reports.
                ExtractInstruction("_PBP_EPAY_DPS_BER", "PAY BY PHONE", ilr.lockbox),
            ]),
            ReportFolder("RETURN REPORTING", [
                ExtractInstruction("_PBP_EPAY_RETURNS_BER", "RETURNS ACH", ilr.payment_transactions),
                ExtractInstruction("_RETURNS_BER", "RETURNS PORTAL", ilr.payment_transactions),
            ]),
            ReportFolder("WIRES", [
                ExtractInstruction("MTBWIRE_BER", "WIRE", ilr.payment_transactions),
            ]),
        ]

        # Process each category in turn; a failure in one must not stop the rest.
        for folder in tqdm(FOLDERS):
            try:
                process_folder(folder)
                print(f"Completed: {folder.folder_name}")
            except Exception as e:
                print(f"Failed to process {folder.folder_name} \n {e}")
                continue

        # input() keeps the console window open for the operator.
        input("Complete!")

    except Exception as e:
        logger.error(f"Program failed:\n{e}")
        input("")
||||||
Loading…
Reference in new issue