Compare commits
5 Commits
5067678a8c...86afb277e5
| Author | SHA1 | Date |
|---|---|---|
| | 86afb277e5 | 3 years ago |
| | 9e2d960f7e | 3 years ago |
| | 40c2a8a0df | 3 years ago |
| | 5caaf3d7ac | 3 years ago |
| | a3905d118e | 3 years ago |
10 image files changed (binary image diffs not shown; sizes 477 B – 18 KiB)
@@ -0,0 +1 @@
name = "Test Name"
@@ -0,0 +1 @@
{"debug": true, "consolidatedBasePath": ".", "defaultLocations": {"ach": "Z:/Business Solutions/Griff/Code/InfoLeaseExtract/InputFiles", "disp": "", "gl": "", "lb": "Z:/Business Solutions/Griff/Code/InfoLeaseExtract/InputFiles", "minv": "", "niv": "", "ren": "", "pymt": "Z:/Business Solutions/Griff/Code/InfoLeaseExtract/InputFiles", "uap": "", "pastdue": ""}}
@@ -0,0 +1,242 @@
from pathlib import Path
import re
import pandas as pd
from pandas import DataFrame, ExcelWriter, read_excel
from datetime import datetime as dt, timedelta
import logging
import il_reports as ilr
from dataclasses import dataclass
from typing import Callable
from tqdm import tqdm
from multiprocessing import Pool, cpu_count

TOP_PATH: Path = Path(r"\\leafnow.com\shared\Accounting\CASH APPS\2023")

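# Flow: walk the dated folder tree under TOP_PATH, read each raw InfoLease
# report, parse it into a DataFrame via il_reports, and append the rows to a
# per-month consolidated workbook (see process_folder / combine below).
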
class LevelFilter(object):
    """Logging filter that passes only records at exactly the given level."""

    def __init__(self, level):
        self.__level = level

    def filter(self, logRecord):
        return logRecord.levelno == self.__level

def create_logger(logger_name: str = __name__):
    logger = logging.getLogger(logger_name)
    # The logger itself must allow DEBUG through, otherwise the root default
    # (WARNING) suppresses the INFO/DEBUG records the handlers below capture.
    logger.setLevel(logging.DEBUG)

    log_folder = Path(r"\\leafnow.com\shared\Business Solutions\Griff\Code\InfoLeaseExtract\logs")
    fail_handler = logging.FileHandler(Path(log_folder, "Fail_br.log"), 'w')
    fail_handler.setLevel(logging.WARNING)

    info_handler = logging.FileHandler(Path(log_folder, "Info_br.log"), 'w')
    info_handler.setLevel(logging.INFO)
    info_handler.addFilter(LevelFilter(logging.INFO))

    debug_handler = logging.FileHandler(Path(log_folder, "Debug_br.log"), 'w')
    debug_handler.setLevel(logging.DEBUG)
    debug_handler.addFilter(LevelFilter(logging.DEBUG))

    s_handler = logging.StreamHandler()
    s_handler.setLevel(logging.INFO)

    logger.addHandler(fail_handler)
    logger.addHandler(info_handler)
    logger.addHandler(debug_handler)
    logger.addHandler(s_handler)

    return logger


logger = create_logger()

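# The handlers split output by level: Fail_br.log takes WARNING and above,
# Info_br.log only INFO, Debug_br.log only DEBUG, and the console mirrors
# INFO and above.
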
@dataclass
class ExtractInstruction:
    input_regex: str  # filename fragment globbed for in the report folder
    sheet_name: str
    extract_method: Callable


@dataclass
class ReportFolder:
    folder_name: str  # report folder name under each dated directory
    extraction_methods: list[ExtractInstruction]

def extract_date_path(path: Path) -> Path | None:
    date_pattern = re.compile(r'^\d{4}\.\d{2}$')

    for parent in path.parents:
        if date_pattern.match(parent.name):
            return parent
    return None

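# Example: for ...\2023\2023.05\2023.05.01\ACH\report.txt this returns the
# ...\2023\2023.05 directory, which names the month's consolidated workbook.
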
def append_to_consolidated_report(report_path: Path, report_df: DataFrame, sheet_name: str):
    """
    Append the extracted rows to the month's consolidated workbook, creating
    the workbook first if it does not exist yet.
    """

    report_month: Path = extract_date_path(report_path)
    report_name: str = f"{str(report_month.name).replace('.','-')}_{sheet_name}_ConsolidatedReport.xlsx"

    save_path = Path(r"\\leafnow.com\shared\Business Solutions\Griff\Code\InfoLeaseExtract\2023", report_name)
    logger.debug(f"{save_path=}")
    # Check if the current month has a consolidated report

    if not save_path.exists():
        logger.debug(f"Consolidated Report | No monthly summary file!\n\tCreating: {save_path}")
        # No file exists yet
        # Create it and add the current month
        try:
            with pd.ExcelWriter(save_path) as writer:
                logger.debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}")
                report_df.to_excel(writer, index=False, sheet_name=sheet_name)
        except Exception as e:
            logger.error(f"Failed to create consolidated report! {report_name} | {sheet_name} | {report_path} :\n{e}")

    else:
        # We need to read the dataframe in the current monthly report
        # Check that we are not adding matching data
        # Save the new report
        # FIXME: This is so hacky it's embarrassing
        try:
            current_data_len = len(pd.read_excel(save_path, sheet_name=sheet_name))
            with pd.ExcelWriter(save_path, engine='openpyxl', mode='a', if_sheet_exists="overlay") as writer:
                logger.debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}")
                # startrow is offset by one for the header row already in the
                # sheet, so new rows land directly under the existing data.
                report_df.to_excel(writer, index=False, sheet_name=sheet_name,
                                   startrow=current_data_len + 1, header=False)
        except Exception as e:
            logger.error(f"Failed to append to consolidated report! {report_name} | {sheet_name} | {report_path} :\n{e}")

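# process_report ties the steps together: read the raw report text, run the
# configured extraction method, then hand the DataFrame to
# append_to_consolidated_report.
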
def process_report(file: Path, extract_inst: ExtractInstruction) -> bool:

    try:
        with open(str(file), errors="replace") as f:
            report_str: str = f.read()
        # logger.debug(f"{report_str}")
        try:
            df: DataFrame = extract_inst.extract_method(report_str, None)
            if df.empty:
                raise ValueError("Dataframe is empty!")
        except Exception as e:
            logger.warning(f"Failed to create report df: {extract_inst.sheet_name}:\n{e}")
            return False
        append_to_consolidated_report(file, df, extract_inst.sheet_name)
        return True
    except Exception as e:
        logger.exception(f"could not process {file}:\n{e}")
        return False

def process_folder(folder: ReportFolder):

    # Search recursively through date directories
    report_date: dt = dt(2023, 5, 1)
    while report_date.date() < dt.now().date():
        logger.info(f"{folder.folder_name} | Processing date: {report_date}")
        report_folder: Path = Path(TOP_PATH,
                                   report_date.strftime("%Y.%m"),
                                   report_date.strftime("%Y.%m.%d"),
                                   folder.folder_name
                                   )
        logger.debug(f"report_folder: {report_folder}")
        if report_folder.exists():
            for xi in folder.extraction_methods:
                try:
                    files = report_folder.glob(f"*{xi.input_regex}*")
                    # next() raises StopIteration when the glob finds nothing
                    report_file: Path = next(files)
                    logger.debug(f"Report file: {report_file}")
                except StopIteration as ie:
                    logger.warning(f"No matching reports!: {ie}")
                    continue
                except Exception as e:
                    logger.debug(f"Could not get report_file: {report_folder.glob(f'*{xi.input_regex}*')} \n{e}")
                    continue
                try:
                    success = process_report(report_file, xi)
                    if success:
                        logger.info(f"Report Processed: {report_file} | {xi.sheet_name}")
                    else:
                        logger.warning(f"Failed to process report: {report_file} | {xi.sheet_name}")

                except Exception as e:
                    logger.exception(f"Could not process report ({report_file}) :\n{e}")
                    continue
        else:
            logger.debug(f"Folder '{report_folder}' does not exist!")
        report_date = report_date + timedelta(days=1)
    logger.debug(f"Finished scanning {folder.folder_name}!")

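# Directory layout assumed above: TOP_PATH\YYYY.MM\YYYY.MM.DD\<folder_name>\,
# walked one calendar day at a time from 2023-05-01 up to today.
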
def combine():

    WORK_DIR = Path(r"\\leafnow.com\shared\Business Solutions\Griff\Code\InfoLeaseExtract\2023")

    REPORTS = [
        "ACH",
        "CHECKS LIVE",
        "CREDIT CARDS",
        "PAY BY PHONE",
        "WIRE",
        "RETURNS ACH",
        "RETURNS PORTAL"
    ]

    for i in range(1, 6):

        month = f"2023-0{i}"
        mcr: Path = Path(f"{month} Consolidated Report.xlsx")
        print(f"Creating monthly consolidated report: {mcr}")
        with ExcelWriter(Path(WORK_DIR, "Monthly", mcr), engine="xlsxwriter") as wrtr:
            for r in REPORTS:
                report_path: Path = Path(WORK_DIR, f"{month}_{r}_ConsolidatedReport.xlsx")
                print(f"Report Path ({r}): {report_path}")

                rdf: DataFrame = read_excel(report_path, sheet_name=r)

                rdf.to_excel(wrtr, sheet_name=r, freeze_panes=(1, 0), index=False)

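# combine() rolls the per-report workbooks for each of the first five months
# of 2023 into one "YYYY-MM Consolidated Report.xlsx" with a sheet per
# report type.
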
if __name__ == "__main__":

    FOLDERS = [
        ReportFolder("ACH", [
            ExtractInstruction("_ACH_", "ACH", ilr.ach),
        ]),
        ReportFolder("CHECKS LIVE", [
            ExtractInstruction("_PROGPAY_BER", "CHECKS LIVE", ilr.payment_transactions)
        ]),
        ReportFolder("CREDIT CARDS", [
            ExtractInstruction("_VMCC_BER", "CREDIT CARDS", ilr.payment_transactions)
        ]),
        ReportFolder("LOCKBOX", [
            ExtractInstruction(r"_LOCKBOX_\d+_", "LOCKBOX", ilr.lockbox)
        ]),
        ReportFolder("PAY BY PHONE", [
            ExtractInstruction("_PBP_EPAY_DPS_BER", "PAY BY PHONE", ilr.payment_transactions)
        ]),
        ReportFolder("RETURN REPORTING", [
            ExtractInstruction("_PBP_EPAY_RETURNS_BER", "RETURNS ACH", ilr.payment_transactions),
            ExtractInstruction("_RETURNS_BER", "RETURNS PORTAL", ilr.payment_transactions)
        ]),
        ReportFolder("WIRES", [
            ExtractInstruction("MTBWIRE_BER", "WIRE", ilr.payment_transactions)
        ]),
    ]

    process_folder(FOLDERS[0])
    # with Pool(cpu_count()) as pool:
    #     for folder in tqdm(pool.imap_unordered(process_folder, FOLDERS)):
    #         try:
    #             print(f"Completed!")
    #         except Exception as e:
    #             print(f"Failed to process\n {e}")
    #             continue
    # for folder in tqdm(FOLDERS):
    #     try:
    #         process_folder(folder)
    #         print(f"Completed: {folder.folder_name}")
    #     except Exception as e:
    #         print(f"Failed to process {folder.folder_name} \n {e}")
    #         continue
    # input("Complete!")

    combine()

@@ -0,0 +1,12 @@
{
    "name": {
        "report": "",
        "excel": ""
    },
    "relative_position": {
        "rows": 0,
        "col": 0
    },
    "length": 0,
    "data_type": "int"
}
@@ -0,0 +1,184 @@
from typing import TypeAlias, TypeVar
from dataclasses import dataclass
from pathlib import Path
from abc import ABC, abstractmethod
from re import search, compile, Match, Pattern
from enum import Enum

ColumnIndex: TypeAlias = int
Money: TypeAlias = float

Numeric = TypeVar("Numeric", float, int)


class Line(Enum):
    # Members need explicit values; bare annotations define no Enum members.
    Header = "header"
    Data = "data"
    Erroneous = "erroneous"
    Top = "top"
    Bottom = "bottom"

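# Line categories drive the parser: Header lines anchor column positions and
# Data lines carry values (Top/Bottom presumably mark page boundaries, and
# Erroneous lines get skipped).
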
@dataclass
class RelativePosition:
    """
    Coordinates for navigating from one point in a row to another
    """
    rows: int
    col: ColumnIndex


@dataclass
class DataValue:

    position: RelativePosition
    length: int
    dtype: type
    # The JSON config template defines no regex field, so this defaults to
    # matching any non-whitespace run (assumption).
    regex: Pattern = compile(r"\S+")

    def correct_line(self, adj_lines_since_header: int) -> bool:
        """
        Report whether this value appears on the current line, given how
        many lines have passed since the last header.
        """
        return adj_lines_since_header % self.position.rows == 0
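    # e.g. with position.rows == 2, every second line after the header is a
    # data line for this value.
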
    def _line_slice(self, line: str) -> str | None:
        """
        Attempts to get the data from the line.
        Returns the string at the expected position, or None if the line is
        too short to contain it.
        """
        start: int = self.position.col
        end: int = start + self.length
        if len(line) < end:
            # A short line would silently yield a truncated slice rather
            # than raise, so check the length explicitly.
            # TODO: Add logging
            return None
        return line[start:end]

    @staticmethod
    def _to_float(number_str: str) -> float | None:
        try:
            f_value: float = float(number_str.replace(',', ''))
            return f_value
        except (ValueError, AttributeError):
            return None

    def extract(self, line: str) -> float | str | None:
        """
        Pull this value out of a data line, returning None if the slice or
        the regex search comes up empty.
        """
        line_slice: str | None = self._line_slice(line)
        if line_slice is None:
            return None

        value_match: Match | None = search(self.regex, line_slice)
        if value_match is None:
            return None

        value_str: str = value_match.group()

        value_str = value_str.strip()
        if self.dtype == int or self.dtype == float:
            return self._to_float(value_str)
        # TODO datetime
        return value_str

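# extract() is the full pipeline for one value: slice the fixed-width window,
# regex-search it, strip whitespace, and coerce numerics via _to_float.
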
class DataSet:

    def __init__(self, config: dict) -> None:
        self.r_name = config["name"]["report"]
        try:
            self.e_name = config["name"]["excel"]
        except KeyError:
            self.e_name = self.r_name

        # The config stores the data type as a string (e.g. "int"); map it
        # to the Python type DataValue.extract compares against.
        dtype_map = {"int": int, "float": float, "str": str}
        self.data_value: DataValue = DataValue(
            position=RelativePosition(
                rows=config["relative_position"]["rows"],
                col=config["relative_position"]["col"]
            ),
            length=config["length"],
            dtype=dtype_map.get(config["data_type"], str),
        )
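    # A DataSet is built straight from one JSON config object shaped like
    # the 12-line template above, with config["name"]["report"] naming the
    # column header as it appears in the raw report.
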
    def line_position(self, line: str) -> ColumnIndex | None:
        """
        Searches a line for the report header for this dataset.

        Returns:
            - ColumnIndex(int) | None: the column index of the match's end
              position, or None if no match was found
        """
        header_match: Match | None = search(self.r_name, line)
        return header_match.end() if isinstance(header_match, Match) else None

@dataclass
class ReportConfig:

    file_extension: str
    name: str
    datasets: list[DataSet]
    data_line_regexes: list[Pattern]


class ILReport(ABC):

    def __init__(self, file_path: Path, report_config: ReportConfig) -> None:
        self.in_file_path: Path = file_path
        self.line_gen = self._line_generator(file_path)

        self.config: ReportConfig = report_config
        self.name = report_config.name

        self.line_type_history: list[Line] = []
        self.last_header_line: int | None = None

        self.data_dict: dict = {
            header.e_name: []
            for header in self.config.datasets
        }
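    # data_dict maps each dataset's excel column name to the values
    # collected so far; concrete subclasses are expected to fill it as they
    # consume line_gen.
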
    @staticmethod
    def _line_generator(file_path: Path):
        with open(file_path, 'r') as in_file:
            line: str
            for line in in_file:
                yield line

    def _add_line_history(self, line: Line, max_history: int = 10):
        self.line_type_history.append(line)
        while len(self.line_type_history) > max_history:
            self.line_type_history.pop(0)

    def _is_header_line(self, line: str) -> bool:
        """
        Checks whether a report line has data headers.
        """
        regex: Pattern
        for regex in self.config.data_line_regexes:
            if isinstance(search(regex, line), Match):
                return True
        return False

    @abstractmethod
    def _skip_line(self, line) -> bool:
        """
        Tells whether we should skip this line
        """

    @abstractmethod
    def _process_line(self):
        """
        Process a single line of the report.
        """

    @abstractmethod
    def _process_dataline(self, dataline: str):
        """
        Search the row for a data set name, or list of data set names, and
        extract all the data until the next row.
        """


if __name__ == "__main__":
    datasets = []
@@ -1 +0,0 @@
{"debug": false, "consolidatedBasePath": "leafnow.com/shared/cashapps", "defaultLocations": {"ach": "", "disp": "", "gl": "", "lb": "", "minv": "", "niv": "", "ren": "", "pymt": "", "uap": "", "pastdue": ""}}