|
|
|
|
@ -7,18 +7,40 @@ from pathlib import Path |
|
|
|
|
import numpy as np |
|
|
|
|
from glob import glob |
|
|
|
|
from logging import debug, DEBUG, basicConfig, warn, error |
|
|
|
|
from hashlib import md5 |
|
|
|
|
import openpyxl as pxl |
|
|
|
|
from tomllib import load |
|
|
|
|
|
|
|
|
|
# V3.2 | 04/21/23 |
|
|
|
|
# V332 | 05/24/23 |
|
|
|
|
|
|
|
|
|
# Load the application settings from the TOML config.
# tomllib.load requires a binary file handle, hence mode='rb'.
with open("settings.toml", mode='rb') as s:
    settings = load(s)

# When the config enables debug mode, write timestamped records to debug.log.
# filemode='w' truncates the previous run's log on startup.
if settings["debug"]:
    basicConfig(filename='debug.log', filemode='w', encoding='utf-8',
                format="%(asctime)s %(message)s",
                level=DEBUG)
|
|
|
|
|
|
|
|
|
# Contract numbers are a common feature in many reports, so it's
# useful to have the regex for them globally available.
# Raw string: "\d" in a plain literal is an invalid escape sequence
# (DeprecationWarning on modern Python); the value is unchanged.
contract_number_regex = r"\d{3}-\d{7}-\d{3}"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def extract_date_path(path: Path) -> Path|None: |
|
|
|
|
""" |
|
|
|
|
Used to get the month folder for a report |
|
|
|
|
""" |
|
|
|
|
date_pattern = re.compile(r'^\d{4}\.\d{2}$') |
|
|
|
|
|
|
|
|
|
for parent in path.parents: |
|
|
|
|
if date_pattern.match(parent.name): |
|
|
|
|
return parent |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
def hash_cols(row: pd.Series, cols_to_hash: list[str]) -> str:
    """Return the MD5 hex digest of the named columns of *row*.

    The selected column values are stringified, concatenated in the order
    given by *cols_to_hash*, UTF-8 encoded, and hashed.
    """
    joined = ''.join(str(row[col]) for col in cols_to_hash)
    return md5(joined.encode('utf-8')).hexdigest()
|
|
|
|
|
|
|
|
|
class ILReport: |
|
|
|
|
""" |
|
|
|
|
InfoLease Report class will be used to work with the files. |
|
|
|
|
@ -57,7 +79,8 @@ class ILReport: |
|
|
|
|
self._append_to_consolidated_report(dataframe, settings["consolidatedBasePath"]) |
|
|
|
|
return dataframe |
|
|
|
|
|
|
|
|
|
def _append_to_consolidated_report(self, dataframe_to_append: DataFrame, base_path: str): |
|
|
|
|
def _append_to_consolidated_report(self, dataframe_to_append: DataFrame, |
|
|
|
|
reports_base_path: Path = None): |
|
|
|
|
""" |
|
|
|
|
""" |
|
|
|
|
# Decide the sheet name based on the save_location_name |
|
|
|
|
@ -79,54 +102,88 @@ class ILReport: |
|
|
|
|
elif re.search("(?i)RETURNS_BER", self.location) != None: |
|
|
|
|
sheet_name = "RETURNS Portal" |
|
|
|
|
else: |
|
|
|
|
debug(f"No consolidated report for {self.location}!") |
|
|
|
|
return None |
|
|
|
|
il_report_path: Path = Path(self.location) |
|
|
|
|
debug(f"{il_report_path=}") |
|
|
|
|
|
|
|
|
|
month_dir: Path|None = extract_date_path(il_report_path) |
|
|
|
|
if month_dir is None and reports_base_path is None: |
|
|
|
|
warn(f"Consolidated report not created! No valid base path: {il_report_path} | {reports_base_path}") |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
report_date: str = dt.now().date() \ |
|
|
|
|
if re.search(r"^\d{4}\.\d{2}\.\d{2}$",il_report_path.parent.parent.name) is None\ |
|
|
|
|
else il_report_path.parent.parent.name.replace('.','/') |
|
|
|
|
debug(f"{report_date=}") |
|
|
|
|
report_month:str = il_report_path.parents[2].name.replace('.','-') |
|
|
|
|
|
|
|
|
|
if month_dir is None: |
|
|
|
|
|
|
|
|
|
if reports_base_path is None: |
|
|
|
|
warn(f"Consolidated report not created! Could not find month folder: {il_report_path} | {reports_base_path}") |
|
|
|
|
return None |
|
|
|
|
else: |
|
|
|
|
month_dir = reports_base_path |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
current_date: list(str) = dt.now().strftime("%Y.%m.%d").split('.') |
|
|
|
|
report_name = f"{dt.now().strftime('%B')}_ConsolidatedReport.xlsx" |
|
|
|
|
debug(f"Consolidated Reports {report_name} | {self.output_location} | {self.x_method} | {current_date}") |
|
|
|
|
year = current_date[0] |
|
|
|
|
month = current_date[1] |
|
|
|
|
year_month = f"{year}.{month}" |
|
|
|
|
|
|
|
|
|
save_path = f"{base_path}/{year}/{year_month}/{report_name}" |
|
|
|
|
# Check if the current month has a consolidated report |
|
|
|
|
month_summary_file: list(str) = glob(save_path) |
|
|
|
|
if len(month_summary_file) == 0: |
|
|
|
|
|
|
|
|
|
report_name: Path = Path(f"{report_month} ConsolidatedReport.xlsx") |
|
|
|
|
debug(f"{month_dir=}") |
|
|
|
|
debug(f"{report_name=}") |
|
|
|
|
save_path = Path(month_dir, report_name) |
|
|
|
|
debug(f"Consolidated Report {save_path=}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
consolidated_df = dataframe_to_append.copy(deep=True) |
|
|
|
|
consolidated_df["ExtractDate"] = report_date |
|
|
|
|
consolidated_df.fillna('--', inplace=True) |
|
|
|
|
consolidated_df["Hash"] = consolidated_df.apply( |
|
|
|
|
lambda r: hash_cols(r,r.keys()) |
|
|
|
|
, axis=1 |
|
|
|
|
) |
|
|
|
|
consolidated_df.replace("--", None, inplace=True) |
|
|
|
|
debug(consolidated_df) |
|
|
|
|
|
|
|
|
|
if not save_path.exists(): |
|
|
|
|
debug(f"Consolidated Report | No monthly summary file!\n\tCreating: {save_path}") |
|
|
|
|
# No file exists yet |
|
|
|
|
# Create it and add the current month |
|
|
|
|
try: |
|
|
|
|
with pd.ExcelWriter(save_path) as writer: |
|
|
|
|
debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}") |
|
|
|
|
dataframe_to_append.to_excel(writer, index=False, sheet_name=sheet_name) |
|
|
|
|
consolidated_df.to_excel(writer, index=False, sheet_name=sheet_name) |
|
|
|
|
except Exception as e: |
|
|
|
|
error(f"[E] Failed to create consolidated report! {sheet_name}:\n{e}") |
|
|
|
|
error(f"Failed to create to consolidated report! {report_name} | {sheet_name} | {il_report_path} :\n{e}") |
|
|
|
|
|
|
|
|
|
else: |
|
|
|
|
# We need to read the dataframe in the current monthly report |
|
|
|
|
# Check that we are not adding matching data |
|
|
|
|
# Save the new report |
|
|
|
|
# FIXME: This is so hacky it's embarrassing
|
|
|
|
add_headers = False |
|
|
|
|
try: |
|
|
|
|
current_data: DataFrame = pd.read_excel(month_summary_file[0], sheet_name=sheet_name) |
|
|
|
|
new_data_len = len(dataframe_to_append) |
|
|
|
|
cur_first_col = current_data.iloc[len(current_data)-new_data_len:,0].to_list() |
|
|
|
|
new_first_col = dataframe_to_append.iloc[:,0].to_list() |
|
|
|
|
if cur_first_col == new_first_col: |
|
|
|
|
debug(f"Consolidated Report | Data is same as previous! Skipping!") |
|
|
|
|
return None |
|
|
|
|
except ValueError as ve: |
|
|
|
|
ve == ValueError(f"Worksheet named '{sheet_name} not found") |
|
|
|
|
current_data = [] |
|
|
|
|
add_headers = True |
|
|
|
|
# We need to find the start cols (where the new data should go) |
|
|
|
|
try: |
|
|
|
|
with pd.ExcelWriter(save_path, engine='openpyxl', mode='a',if_sheet_exists="overlay") as writer: |
|
|
|
|
debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}") |
|
|
|
|
dataframe_to_append.to_excel(writer, index=False, sheet_name=sheet_name,startrow=len(current_data),header=add_headers) |
|
|
|
|
except Exception as e: |
|
|
|
|
error(f"[E] Failed to append to consolidated report! {sheet_name}:\n{e}") |
|
|
|
|
debug(f"{save_path} already exisits.") |
|
|
|
|
# Get the current worksheets |
|
|
|
|
wb: pxl.Workbook = pxl.open(save_path, read_only=True) |
|
|
|
|
current_sheets = wb.worksheets |
|
|
|
|
wb.close() |
|
|
|
|
with pd.ExcelWriter(save_path, engine='openpyxl', mode='a',if_sheet_exists="overlay") as writer: |
|
|
|
|
|
|
|
|
|
if sheet_name in [w.title for w in current_sheets]: |
|
|
|
|
debug(f"{sheet_name} sheet already exisits.") |
|
|
|
|
# We need to read the dataframe in the current monthly report |
|
|
|
|
# Check that we are not adding duplicate data |
|
|
|
|
try: |
|
|
|
|
# Merge the current data and drop duplicates |
|
|
|
|
prev_data: DataFrame = pd.read_excel(save_path, sheet_name=sheet_name) |
|
|
|
|
debug(f"Prev:\n{prev_data}") |
|
|
|
|
consolidated_df = consolidated_df[~(consolidated_df["Hash"].isin(prev_data["Hash"]) )] |
|
|
|
|
debug(f"New data:\n{consolidated_df}") |
|
|
|
|
debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}") |
|
|
|
|
consolidated_df.to_excel(writer, index=False, sheet_name=sheet_name, header=False, |
|
|
|
|
startrow=prev_data.shape[0]+1) |
|
|
|
|
except Exception as e: |
|
|
|
|
error(f"Failed to append to consolidated report! {report_name} | {sheet_name} | {il_report_path} :\n{e}") |
|
|
|
|
else: |
|
|
|
|
consolidated_df.to_excel(writer, index=False, sheet_name=sheet_name) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_line_divider(breakage_list: list): |
|
|
|
|
|