A PyQt GUI application for converting InfoLease report outputs into Excel files. Handles parsing and summarizing. Learns where files are meant to be stored and compiles monthly and yearly summaries.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
InfoLeaseExtract/ILExtract.py

972 lines
43 KiB

import pandas as pd
from pandas import DataFrame
from datetime import datetime as dt
import json
import re
from pathlib import Path
import numpy as np
from glob import glob
from logging import debug, DEBUG, basicConfig, warn, error
# V3.1 | 01/19/23
# Load the application settings once, at import time. The rest of the module
# reads them through the module-level `settings` dict.
with open("settings.json") as s:
    settings = json.load(s)
# Debug logging is only switched on when the settings file asks for it.
if settings["debug"]:
    basicConfig(filename='debug.log', encoding='utf-8', level=DEBUG)
# Contract numbers are a common feature in many reports, so it is
# useful to have the regex for them globally available.
# Raw string: "\d" inside a normal string literal is an invalid escape sequence.
contract_number_regex = r"\d{3}-\d{7}-\d{3}"
class ILReport:
    """
    InfoLease report wrapper: pairs a report file with the function that parses it.

    An instance stores where the raw report lives, which extraction function
    turns its text into a DataFrame, and where the output should be saved.
    `process` runs the extraction and then adds the result to the monthly
    consolidated workbook when the report is one of the consolidated types.
    """

    # (pattern, sheet name) pairs checked in order, case-insensitively, against
    # the report location. Only these report types are consolidated.
    _CONSOLIDATED_SHEETS = (
        ("ach", "ACH"),
        ("progpay_ber", "CHECKS LIVE"),
        ("vmcc", "CREDIT CARDS"),
        ("lockbox", "LOCKBOX"),
        ("epay", "PAY BY PHONE"),
        ("wires", "WIRES"),
    )

    def __init__(self, location: str, extraction_function, output_location: str = None):
        """
        Args:
            location: Path of the InfoLease report file.
            extraction_function: Callable invoked as
                `extraction_function(report_text, output_location)`; `process`
                expects it to return a DataFrame.
            output_location: Where the extraction function should save its
                output. Defaults to the absolute parent directory of `location`.
        """
        debug(f"ILReport:\n\tLocation: {location}\n\tExtract Function: {extraction_function}\n\tOutput Location: {output_location}")
        # The location where the InfoLease report is stored
        self.location = location
        # If output location not specified, save to the input location
        if output_location is None:
            self.output_location = Path(location).parent.absolute()
            debug(f"ILReport using Parent path for output: {self.output_location}")
        else:
            self.output_location = output_location
        # The function used to extract the data from the report
        self.x_method = extraction_function

    def process(self):
        """
        Read the report file, run the extraction function and consolidate.

        Returns:
            The DataFrame produced by the extraction function, or the integer
            1 when the report file could not be opened.
        """
        debug("ILReport: Reading file")
        try:
            # errors="replace" deals with non UTF-8 characters (no effect on output)
            with open(self.location, errors="replace") as ifile:
                report = ifile.read()
            report = report.replace("^", " ")
        except IOError as ioe:
            warn(f"ILReport: Failed to open file: {self.location}\n{ioe}")
            return 1
        debug("ILReport: Starting parsing process")
        dataframe: DataFrame = self.x_method(report, self.output_location)
        if dataframe.empty:
            # Nothing to consolidate; the (empty) dataframe is still returned.
            warn("ILReport: resulting dataframe was empty! Skipping consolidated report.")
            return dataframe
        self._append_to_consolidated_report(dataframe, settings["consolidatedBasePath"])
        return dataframe

    def _append_to_consolidated_report(self, dataframe_to_append: DataFrame, base_path: str):
        """
        Add the extracted data to this month's consolidated workbook.

        The workbook path is
        `{base_path}/{year}/{year}.{month}/{MonthName}_ConsolidatedReport.xlsx`
        and the sheet is chosen from `self.location`. Nothing is written when
        the location matches no consolidated report type, or when the tail of
        the existing sheet already has the same first-column values as the new
        data. Write failures are logged, not raised.

        Returns:
            None in every case.
        """
        # Decide the sheet name based on the report location
        sheet_name = None
        for pattern, candidate in self._CONSOLIDATED_SHEETS:
            if re.search(pattern, self.location, re.IGNORECASE) is not None:
                sheet_name = candidate
                break
        if sheet_name is None:
            return None

        now = dt.now()
        current_date: list[str] = now.strftime("%Y.%m.%d").split('.')
        report_name = f"{now.strftime('%B')}_ConsolidatedReport.xlsx"
        debug(f"Consolidated Reports {report_name} | {self.output_location} | {self.x_method} | {current_date}")
        year = current_date[0]
        month = current_date[1]
        year_month = f"{year}.{month}"
        save_path = f"{base_path}/{year}/{year_month}/{report_name}"

        # Check if the current month has a consolidated report
        if not Path(save_path).exists():
            debug(f"Consolidated Report | No monthly summary file!\n\tCreating: {save_path}")
            # No file exists yet: create it with this data as the first sheet
            try:
                with pd.ExcelWriter(save_path) as writer:
                    debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}")
                    dataframe_to_append.to_excel(writer, index=False, sheet_name=sheet_name)
            except Exception as e:
                error(f"[E] Failed to create consolidated report! {sheet_name}:\n{e}")
            return None

        # The workbook exists, but another report type may have created it, in
        # which case our sheet is not there yet and read_excel raises.
        try:
            current_data = pd.read_excel(save_path, sheet_name=sheet_name)
        except (ValueError, KeyError):
            debug(f"Consolidated Report | {sheet_name}: sheet not found in {report_name}, it will be created")
            current_data = None

        if current_data is None or current_data.shape[1] == 0:
            # New (or empty) sheet: write from the top, including the header
            start_row, write_header = 0, True
        else:
            # Check that we are not adding matching data: compare the first
            # column of the new data with the last rows already in the sheet
            new_first_col = dataframe_to_append.iloc[:, 0].to_list()
            cur_first_col = current_data.iloc[:, 0].tail(len(new_first_col)).to_list()
            if cur_first_col == new_first_col:
                debug("Consolidated Report | Data is same as previous! Skipping!")
                return None
            # Row 0 of the sheet holds the header, so the existing data fills
            # rows 1..len(current_data); the new data starts on the row after.
            start_row, write_header = len(current_data) + 1, False

        try:
            with pd.ExcelWriter(save_path, engine='openpyxl', mode='a', if_sheet_exists="overlay") as writer:
                debug(f"Consolidated Report | {sheet_name}: Saving data as: {report_name}")
                dataframe_to_append.to_excel(writer, index=False, sheet_name=sheet_name,
                                             startrow=start_row, header=write_header)
        except Exception as e:
            error(f"[E] Failed to append to consolidated report! {sheet_name}:\n{e}")
        return None
def create_line_divider(breakage_list: list):
    """
    Build a fixed-width slot extractor for report lines.

    `breakage_list` defines the split points that will be used for a line.

    Example
        Given breakage_list [10, 20, 30]
        slot 0 of the returned extractor yields characters 0 - 10 of a string,
        slot 1 yields characters 10 - 20, and slot 3 (the last one) yields
        everything from character 30 to the end of the string.

    Returns:
        A function `extract_line_slot(slot_num, line_string)`.
    """
    def extract_line_slot(slot_num: int, line_string: str):
        """
        Pull the data for one slot out of a line, using the break points
        defined by the parent function.
        ONLY USE THIS FUNCTION THROUGH CREATION USING 'create_line_divider'

        The slot text is stripped and has its ',' removed; if the result
        parses as a float the float is returned, otherwise the string is.
        """
        # We can't have a slot number higher than the number of slots
        assert(slot_num < len(breakage_list)+1)
        low_range = 0 if slot_num == 0 else breakage_list[slot_num-1]
        high_range = len(line_string) if slot_num == len(breakage_list) else breakage_list[slot_num]
        # In order to create a float we need to remove the , from the string
        data = line_string[low_range:high_range].strip().replace(",", "")
        try:
            data = float(data)
        except ValueError:
            # Not a number: keep the cleaned-up string
            pass
        debug(f"Slot num: {slot_num} | Low: {low_range} | High: {high_range} | Data: {data}")
        return data
    return extract_line_slot
######################################################################################################################
# #
# EXTRACTION FUNCTIONS: used to pull data out of specific InfoLease report types #
# #
######################################################################################################################
"""
COMMON EXTRACTION COMPONENTS/FEATURES:
- lines = report.splitlines() : splits the reports into a list of lines (based on \n line breaks in document)
- extracted_data_dict : this is a dictionary that will hold the extracted data and will be used to create the dataframe
- columns = list(extracted_data_dict.keys()) : breaks the extracted_data_dict into a list of its keys (excel column heads)
- data_extractor = create_line_divider([#,#,#,#,#]): This creates a function we can use to pull data from a line based on
its 'slot position'. A slot position is the characters between the numbers specified in the list passed into the function
- for index, line in enumerate(lines): iterates through each line in the document, giving the line number and the line string.
Having the line number can be very useful when we need to access data in adjacent lines
- line# = list(zip(columns[#:#],[i for i in range(#,#)])): This creates a list with the tuple (column name, slot number).
It allows us to iterate through this list and make sure the correct data slots are being used for each column/key in the
data dictionary
COMMON REGEX COMPONENTS
\d : any digit [0-9]
\D : any character that is not a digit
\s : whitespace
. : any character besides newline (\n)
{#}: # number of the preceding character
* : 0 or more repetitions of the preceding character
"""
def ach(report: str, save_name: str):
    """
    Extract contract payment rows and batch totals from an ACH report.

    Writes two sheets to `save_name`: "data" (one row per contract line) and
    "Summary" (one row per batch total line).

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The DataFrame written to the "data" sheet.
    """
    debug(f"ACH Report {save_name} :\n{report}")
    lines = report.splitlines()
    extracted_data_dict = {
        "ContractNumber" : [],
        "CustomerName" : [],
        "BankCode" : [],
        "BankNumber": [],
        "AccountNumber" : [],
        "Payment" : [],
        "Batch": [],
        "Lessor": [],
        "PaymentDate": [],
    }
    columns = list(extracted_data_dict.keys())
    batches = {
        "batch_num": [],
        "payment_date": [],
        "lessor": [],
        "total": []
    }
    # The last three columns come from the batch total line, not the contract line
    batch_columns = ("Batch", "Lessor", "PaymentDate")

    data_extractor = create_line_divider([19,57,67,82,104])
    bank_number_regex = r"\d{9}"
    batch_num_regex = r"BATCH \d{4} TOTAL"
    for index, line in enumerate(lines):
        # Check for a contract number and a bank number in the line
        if re.search(contract_number_regex, line) and re.search(bank_number_regex, line):
            debug(f"ACH {index}: Found a contract or bank number:\n{line}")
            # Here the order of the columns (keys in dictionary) matters since they
            # need to be in the same order as the slot numbers
            for c in range(0, len(columns)-3):
                extracted_data_dict[columns[c]].append(data_extractor(c, line))
        # This searches for a statement that looks like a batch number
        # This sums the contracts by their lessor code. A feature requested by cash apps
        if re.search(batch_num_regex, line):
            debug(f"ACH {index}: Found a batch number:\n{line}")
            # Batch number is always in characters 96 to 101
            batches["batch_num"].append(line[96:101])
            # Payment date will be 2 lines below that between characters 114 and 125
            # (blank when the report ends before that line)
            date_line = lines[index+2] if index+2 < len(lines) else ""
            batches["payment_date"].append(date_line[114:125])
            # Lessor is just the first three numbers of the most recent contract number
            # (blank when the batch total appears before any contract line)
            contract_numbers = extracted_data_dict["ContractNumber"]
            batches["lessor"].append(str(contract_numbers[-1])[0:3] if contract_numbers else "")
            # Total is a number given by the report for that batch. ',' is removed so that it can be transformed into a float
            batches["total"].append(float(line[107:125].strip().replace(",", "")))
            # Any time there's a new batch, every contract row that does not have
            # batch values yet gets the values of this batch
            row_count = len(extracted_data_dict["BankCode"])
            for column, batch_key in zip(batch_columns, ("batch_num", "lessor", "payment_date")):
                missing = row_count - len(extracted_data_dict[column])
                extracted_data_dict[column].extend([batches[batch_key][-1]] * missing)

    # Contract rows after the last batch total have no batch values; pad them so
    # that all lists are equal lengths and the dataframe can be built
    row_count = len(extracted_data_dict["BankCode"])
    for column in batch_columns:
        missing = row_count - len(extracted_data_dict[column])
        if missing > 0:
            warn(f"ACH: {missing} contract rows have no batch total line. Leaving '{column}' blank.")
            extracted_data_dict[column].extend([None] * missing)

    dataframe: DataFrame = DataFrame(extracted_data_dict)
    # We're creating two sheets: data & summary so we need to open an excel writer
    # This also helps with a bug caused by larger dataframes
    with pd.ExcelWriter(save_name) as writer:
        debug(f"ACH: Saving data as: {save_name}")
        dataframe.to_excel(writer, index=False, sheet_name="data")
        # The batches dictionary is converted to a dataframe and added as its own sheet
        DataFrame(batches).to_excel(writer, index=False, sheet_name="Summary")
    return dataframe
def disposition(report: str, save_name: str):
    """
    Extract the rows of a disposition report and save them to Excel.

    A data row is a line whose first slot (characters 0 - 15) contains a
    contract number. The customer name is read from the line after it.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The DataFrame written to `save_name`.
    """
    debug(f"Disp Report {save_name} :\n{report}")
    lines = report.splitlines()
    extracted_data_dict = {
        "ContractNumber" : [],
        "Amount Rec" : [],
        "Trans Num" : [],
        "Date RCVD": [],
        "Date Posted" : [],
        "Last Pymt Due" : [],
        "Date Due" : [],
        "Residual Amt" : [],
        "Term Date" : [],
        "Total Pastdue" : [],
        "Customer Name" : [],
    }
    columns = list(extracted_data_dict.keys())
    data_extractor = create_line_divider([15,32,41, 51, 61, 79,88, 103, 114])
    for index, line in enumerate(lines):
        first_slot = data_extractor(0, line)
        # The extractor converts numeric slots to float; those can not hold a
        # contract number and would make re.search raise a TypeError
        if not isinstance(first_slot, str):
            continue
        if re.search(contract_number_regex, first_slot):
            debug(f"Disp {index}: Found contract number:\n{line}")
            # Every column except the customer name comes from this line
            for c in range(0, len(columns)-1):
                extracted_data_dict[columns[c]].append(data_extractor(c, line))
            # Customer name is on a separate line so we need to grab that separately
            # (blank when the report ends on the data line)
            name_line = lines[index+1] if index+1 < len(lines) else ""
            extracted_data_dict["Customer Name"].append(name_line.strip())
    dataframe = DataFrame(extracted_data_dict)
    dataframe.to_excel(save_name, index=False)
    return dataframe
def gainloss(report: str, save_name: str):
    """
    Extract the contract sections of a gain/loss report and save them to Excel.

    Each contract is read from six lines: the line before the contract number
    line, the contract number line itself, and the four lines after it.
    The disposition code is split into code and description, a "Fund" column
    (first three characters of the contract number) is added, and the columns
    are reordered before saving.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The reordered DataFrame written to `save_name`.
    """
    debug(f"GL Report {save_name} :\n{report}")
    lines = report.splitlines()
    extracted_data_dict = {
        'REM RENT RCVB' : [],
        'GUAR RESIDUAL' : [],
        'ASSET VAL' : [],
        'EQUITY ADDON' : [],
        'CURR INT RCVB' : [],
        'MISC G/L' : [],
        'BLENDED INC' : [],
        'CONTRACT NUMBER' : [],
        'CURR RENT RCVB' : [],
        'RESIDUAL' : [],
        'END/SEC DEP' : [],
        'SALES TAX' : [],
        'INVENT CHANGE' : [],
        'NET RESERVE' : [],
        'LATE CHGS' : [],
        'CUSTOMER NAME' : [],
        'UNEARNED FIN' : [],
        'UNAMORT RES' : [],
        'MISC' : [],
        'MISC TAX' : [],
        'CASH RECEIVED' : [],
        'RCV OFFSET' : [],
        'GAIN/LOSS' : [],
        'DISPOSITION CODE' : [],
        'UNEARNED IDC' : [],
        'UNPAID INT' : [],
        'PENALTY FEE' : [],
        'UNPAID ACCRD' : [],
        'RENEWAL RCVBL' : [],
        'DEF REN INC' : [],
        'DEF REN INT' : [],
        'EARNED IDC' : [],
        'GST BOOK G/L' : [],
        'UNRECOG GST' : [],
        'INT EARNED' : [],
        'OVER/SHORT' : [],
        'OPER RCVB' : [],
        'OPER BASIS' : [],
        'CTD OPER DEPR' : [],
    }
    # Level 0: BlendedInc 6
    # L1: Late CHGS 14
    # L2: Gain/Loss 22
    # L3: Def Ren Int 30
    # L4: Over/Short 35
    # L5: CTD OPER
    columns = list(extracted_data_dict.keys())
    # These line data are used to tell the data extractor which values to pull for each line of
    # relevant data. It pairs dictionary keys with their corresponding data slot in the line
    # so that they can be iterated through during data extraction
    #
    # It looks confusing but makes more sense if you look at the actual Info Lease reports
    # This is one of the messiest reports
    line0 = list(zip(columns[0:7], range(1, 8)))
    line1 = list(zip(columns[7:15], range(0, 8)))
    line2 = list(zip(columns[15:23], range(0, 8)))
    line3 = list(zip(columns[23:31], range(0, 8)))
    # In line 4 we need to skip blank slots 3,6
    line4 = list(zip(columns[31:36], [i for i in range(1, 8) if i not in [3, 6]]))
    line5 = list(zip(columns[36:], range(1, 4)))
    slot_maps = (line0, line1, line2, line3, line4, line5)
    data_extractor = create_line_divider([27,43,58,74,88,105,120])

    for index, line in enumerate(lines):
        # The line must contain a contract number and the first data slot should be a float.
        # A slot that was converted to float can not hold a contract number
        # (and would make re.search raise a TypeError), so those lines are skipped.
        first_slot = data_extractor(0, line)
        if not isinstance(first_slot, str) or re.search(contract_number_regex, first_slot) is None:
            continue
        if not isinstance(data_extractor(1, line), float):
            continue
        debug(f"GL {index}: Found contract number and float in slot 1:\n{line}")
        data_section = lines[index-1:index+5]
        # A section cut off by the start or the end of the report can not be read
        if index < 1 or len(data_section) < len(slot_maps):
            warn(f"GL {index}: Incomplete data section at the edge of the report. Skipping contract.")
            continue
        for section_line, slot_map in zip(data_section, slot_maps):
            for column_name, slot_num in slot_map:
                extracted_data_dict[column_name].append(data_extractor(slot_num, section_line))

    df = DataFrame(extracted_data_dict)
    debug(f"GL | dataframe created:\n{df}")
    # The Accounting team wanted the disposition code split into number and description so...
    debug(f"GL | Splitting disp code")
    df["DISPOSITION DESC"] = df['DISPOSITION CODE'].apply(lambda dc: " ".join(dc.split(" ")[1:]))
    df["DISPOSITION CODE"] = df['DISPOSITION CODE'].apply(lambda dc: dc.split(" ")[0])
    debug("GL | adding Fund column (first 3 of contract number)")
    df["Fund"] = df["CONTRACT NUMBER"].apply(
        lambda con_num: con_num[0:3])
    debug("GL | Reordering dataframe")
    df = df[['Fund',
             'CONTRACT NUMBER',
             'CUSTOMER NAME',
             'DISPOSITION CODE',
             'DISPOSITION DESC',
             'CASH RECEIVED',
             'GAIN/LOSS',
             'NET RESERVE',
             'REM RENT RCVB',
             'RESIDUAL',
             'UNAMORT RES',
             'UNEARNED FIN',
             'BLENDED INC',
             'END/SEC DEP',
             'UNEARNED IDC',
             'UNPAID INT',
             'MISC',
             'ASSET VAL',
             'CTD OPER DEPR',
             'CURR INT RCVB',
             'CURR RENT RCVB',
             'DEF REN INC',
             'DEF REN INT',
             'EARNED IDC',
             'EQUITY ADDON',
             'GST BOOK G/L',
             'GUAR RESIDUAL',
             'INT EARNED',
             'INVENT CHANGE',
             'LATE CHGS',
             'MISC G/L',
             'MISC TAX',
             'OPER BASIS',
             'OPER RCVB',
             'OVER/SHORT',
             'PENALTY FEE',
             'RCV OFFSET',
             'RENEWAL RCVBL',
             'SALES TAX',
             'UNPAID ACCRD',
             'UNRECOG GST',
             ]]
    debug(f"GL | saving dataframe {save_name}:\n{df}")
    df.to_excel(save_name, index=False)
    return df
# Works for Net-inv-loans & NIV-after
def net_invest_trial_balance(report: str, save_name: str):
    """
    Extract a net investment trial balance report and save data plus summary.

    Each contract is read from four lines: the line before the contract number
    line, the contract number line, and the two lines after it (lines without
    a '.' are treated as blank and skipped over). The "data" sheet holds one
    row per contract; the "Summary" sheet is a pivot by LESSOR (first three
    characters of the lease number) with a contract count and column sums.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The reordered DataFrame written to the "data" sheet.
    """
    debug(f"net_inv_tb Report {save_name} :\n{report}")
    lines = report.splitlines()
    extracted_data_dict = {
        'CUSTOMER NAME': [],
        'CURR INT RCVB': [],
        'UNEARNED BLENDED': [],
        'BLEND NET INV': [],
        'LEASE NUMBER': [],
        'GROSS CONTRACT': [],
        'CURR RENT RCVB': [],
        'UNEARN FIN': [],
        'END DEPOSIT': [],
        'SEC DEPOSIT': [],
        'LEASE PYMTS': [],
        'TOTAL': [],
        'CONTRACT STAT': [],
        'PAYMENTS RCVD': [],
        'REM RENT RCVB': [],
        'UNEARN RESID': [],
        'PROV LOSS': [],
        'NET RESERVE': [],
        'UNEARN INC': [],
        'BAL REMAINING': [],
        'RESIDUAL': [],
        'UNPAID INT': [],
        'NET INV': [],
        'UNEARNED IDC': [],
    }
    columns = list(extracted_data_dict.keys())
    # (column name, slot number) pairs for each of the four data lines
    line0 = list(zip(columns[0:4], [0, 3, 4, 5]))
    line1 = list(zip(columns[4:12], range(0, 8)))
    line2 = list(zip(columns[12:19], range(0, 7)))
    line3 = list(zip(columns[19:], range(1, 6)))
    slot_maps = (line0, line1, line2, line3)
    data_extractor = create_line_divider([18, 32, 50, 66, 84, 100, 117,132])
    for index, line in enumerate(lines):
        slot1 = data_extractor(0, line)
        # Slots converted to float can not contain a contract number
        if not isinstance(slot1, str):
            continue
        if re.search(contract_number_regex, slot1) is None:
            continue
        debug(f"net_inv_tb {index} | Found contract number in slot 1:\n{line}")
        data_section = lines[index-1:index+3]
        debug(f"net_inv_tb {index} | Data section:\n{data_section}")
        # A section cut off by the start or the end of the report can not be read
        if index < 1 or len(data_section) < len(slot_maps):
            warn(f"net_inv_tb {index} | Incomplete data section at the edge of the report. Skipping contract.")
            continue
        try:
            # There were issues where the IL Report would have random blank lines so that
            # needs to be checked and adjusted for
            # A dead give away of an empty line in a data section is a line without a '.'
            # Check the first data line
            if data_section[0].find(".") == -1:
                debug(f"net_int_tb {index} | datasection[0] is empty. Moving back")
                # Move it back if empty
                data_section[0] = lines[index-2]
            # Now we go through each relevant data line and make sure they're not blank
            for position, section_line in enumerate(data_section):
                if section_line.find(".") == -1:
                    # Move up all the data lines after a blank data line...
                    for i in range(position, len(data_section)-1):
                        data_section[i] = data_section[i+1]
                    # ...and pull in the next report line, which lies just outside
                    # of the original data selection, as the new last line
                    data_section[3] = lines[index+3]
        except IndexError:
            warn(f"net_inv_tb {index} | Data section runs past the end of the report. Skipping contract.")
            continue
        # Now that the data section is sorted we can extract the data
        for section_line, slot_map in zip(data_section, slot_maps):
            for column_name, slot_num in slot_map:
                extracted_data_dict[column_name].append(data_extractor(slot_num, section_line))

    dataframe: DataFrame = DataFrame(extracted_data_dict)
    dataframe["LESSOR"] = dataframe["LEASE NUMBER"].apply(lambda con: con[0:3])
    # np.nan instead of np.NaN: the latter alias was removed in NumPy 2.0
    dataframe = dataframe.replace("REVOLV", np.nan)
    dataframe = dataframe.replace("ING ACCOUNT", np.nan)
    dataframe = dataframe.replace("", np.nan)
    debug(f"net_inv_tb | Dataframe complete:\n{dataframe}")
    debug("net_inv_tb | Createing pivot...")
    nums = ['RESIDUAL',
            'UNEARN FIN',
            'UNEARNED BLENDED',
            'UNEARN RESID',
            'SEC DEPOSIT',
            'UNEARNED IDC',
            'NET RESERVE',
            'UNPAID INT',
            'NET INV',
            'BLEND NET INV',
            'GROSS CONTRACT',
            'PAYMENTS RCVD',
            'REM RENT RCVB',
            'LEASE PYMTS',
            'UNEARN INC',
            'TOTAL',
            'CURR INT RCVB',
            'PROV LOSS',
            'CURR RENT RCVB',
            'END DEPOSIT']
    for n in nums:
        # astype returns a new Series, so the result has to be assigned back
        dataframe[n] = dataframe[n].astype("float")
    # Everything that is summed in the pivot ('BAL REMAINING' is summed but is
    # not part of the float conversion above)
    sum_columns = nums + ['BAL REMAINING']
    aggfunc = {'CUSTOMER NAME': np.size}
    aggfunc.update({column: np.sum for column in sum_columns})
    summary = pd.pivot_table(dataframe,
                             values=['CUSTOMER NAME'] + sum_columns,
                             aggfunc=aggfunc,
                             index="LESSOR")
    summary.rename(columns={"CUSTOMER NAME": "Contract Count"}, inplace=True)
    summary = summary[['Contract Count', 'BAL REMAINING'] + nums]
    debug(f"net_inv_tb | Summary complete:\n{summary}")
    dataframe = dataframe[['LESSOR',
                           'LEASE NUMBER',
                           'BAL REMAINING',
                           'RESIDUAL',
                           'UNEARN FIN',
                           'UNEARNED BLENDED',
                           'UNEARN RESID',
                           'SEC DEPOSIT',
                           'NET RESERVE',
                           'UNEARNED IDC',
                           'UNPAID INT',
                           'BLEND NET INV',
                           'NET INV',
                           'CUSTOMER NAME',
                           'GROSS CONTRACT',
                           'CURR RENT RCVB',
                           'END DEPOSIT',
                           'LEASE PYMTS',
                           'TOTAL',
                           'CONTRACT STAT',
                           'PAYMENTS RCVD',
                           'PROV LOSS',
                           'UNEARN INC',
                           'REM RENT RCVB',
                           'CURR INT RCVB',
                           ]]
    debug(f"net_inv_tb | Saving data {save_name}")
    with pd.ExcelWriter(save_name) as writer:
        dataframe.to_excel(writer, index=False, sheet_name="data")
        summary.to_excel(
            writer, index=True, sheet_name="Summary")
    return dataframe
def lockbox(report: str, save_name: str):
    """
    Extract the payment rows of a lockbox report and save them to Excel.

    A row is either a full bank record line (sequence number followed by a
    payment date) or a line holding only an additional InfoLease payment, in
    which case the bank record values of the previous row are repeated. The
    customer name is read from a following line.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The DataFrame written to `save_name`.
    """
    debug(f"LockBox Report {save_name}:\n{report}")
    lines = report.splitlines()
    extracted_data_dict = {
        "SEQ" : [],
        "PYMT DATE" : [],
        "INV NUM" : [],
        "CHECK NUMBER" : [],
        "PAYMENT AMOUNT" : [],
        "NOTE" : [],
        "IL SEQ" : [],
        "CONTRACT NUM" : [],
        "IL PAYMENT AMOUNT" : [],
        "CUST NAME" : [],
    }
    columns = list(extracted_data_dict.keys())
    data_extractor = create_line_divider([9,19,39,56,69,89,98,118])

    def line_at(position: int) -> str:
        # Lines past the end of the report are treated as blank
        return lines[position] if 0 <= position < len(lines) else ""

    for index, line in enumerate(lines):
        match = False
        # Try to find the first SEQ # & a contract payment date e.i. ' 197 05/10/2022'
        if re.match(r"(\s|\d){3}\d{1}\s{5}\d{2}/\d{2}/\d{4}", line):
            debug(f"LockBox {index} | Found SEQ # and payment date:\n{line}")
            match = True
            # Add all of the data points except customer name
            for c in range(0, len(columns)-1):
                extracted_data_dict[columns[c]].append(data_extractor(c, line))
        # Check to see if this line contains only an infolease payment
        # Sometimes there are multiple infolease payments for a single bank record
        elif re.search(contract_number_regex, line) is not None:
            debug(f"LockBox {index} | Found contract number:\n{line}")
            match = True
            # If there is then we can add the same data as the previous complete line
            # (blank when there is no previous line to copy from)
            for c in range(0, 6):
                previous_values = extracted_data_dict[columns[c]]
                previous_values.append(previous_values[-1] if previous_values else None)
            # Then add the new data for the infolease contract
            for c in range(6, len(columns)-1):
                extracted_data_dict[columns[c]].append(data_extractor(c, line))
        # If we had a match we need a customer name to associate with it
        # Sometimes these can appear on the next page hence the while loop searching for a match
        if match:
            # We can tell the cust name will be on the next page if the word "PAGE" appears
            # three lines under the current line and the next line is blank
            if line_at(index+1).strip() == "" and "PAGE" in line_at(index+3):
                debug(f"LockBox found PAGE on line {index+3}. Looping to find cust name.")
                i = 0
                # Look for a bunch of whitespace then some writing, but never
                # search past the end of the report
                while index+i < len(lines) and not re.match(r"\s{98}.{34}", lines[index+i]):
                    debug(f"LockBox searching for whitespace above custname. Line {index+i}.")
                    i += 1
                # Once we find it add the cust name to the dict (it's the only thing on the line)
                extracted_data_dict["CUST NAME"].append(line_at(index+i).strip())
            # if the condition above isn't met then the cust name is on the next line (even if that line is blank)
            else:
                extracted_data_dict["CUST NAME"].append(line_at(index+1).strip())
    dataframe = DataFrame(extracted_data_dict)
    debug(f"LockBox | Saving dataframe: {save_name}")
    dataframe.to_excel(save_name, index=False)
    return dataframe
def minv(report: str, save_name: str):
    """
    Extract contract rows from a Minv report and save a contract number list.

    Rows are kept when the booking date is not today and either the rental
    due is above 0, or the rental due is 0 and the outstanding balance is
    above 100. The contract numbers of the kept rows are written to
    `save_name` as plain text, one per line, without duplicates and in the
    order they first appear in the report.

    Args:
        report: Full text of the report.
        save_name: Path of the text file to write.

    Returns:
        The filtered DataFrame.
    """
    debug(f"Minv {save_name}:\n{report}")
    lines = report.splitlines()
    data_extractor = create_line_divider([15,32,52,71,83,107,116,128])
    extracted_data_dict = {
        "ContractNumber" : [],
        "UTAB_OIC_DUE" : [],
        "RentalDue" : [],
        "UTAB_OIC_PYMT" : [],
        "ChargeType" : [],
        "OutstandBalance" : [],
        "BizSegment" : [],
        "BookingDate" : [],
        "Branch" : [],
    }
    columns = list(extracted_data_dict.keys())
    for _index, line in enumerate(lines):
        if re.search(contract_number_regex, line) is not None:
            debug(f"Minv {_index} | Found contract number:\n{line}")
            # One slot per column, in column order
            for slot_num, column_name in enumerate(columns):
                extracted_data_dict[column_name].append(data_extractor(slot_num, line))
    # All the list lengths need to be the same so if anything was missed it will fail to build
    dataframe = DataFrame(extracted_data_dict)
    debug(f"Minv | Original DF:\n{dataframe}")
    today = dt.today().strftime("%m/%d/%Y")
    not_booked_today = dataframe["BookingDate"] != today
    rental_due = dataframe["RentalDue"] > 0
    balance_only = (dataframe["RentalDue"] == 0) & (dataframe["OutstandBalance"] > 100)
    filtered = dataframe[not_booked_today & (rental_due | balance_only)]
    debug(f"Minv | Filtered DF:\n{filtered}")
    with open(save_name, 'w') as output:
        debug(f"Minv | Saving number list to {save_name}.")
        # dict.fromkeys drops duplicate values but, unlike a set, keeps the
        # report order so the output file is the same on every run
        for contract in dict.fromkeys(filtered['ContractNumber'].to_list()):
            output.write(f"{contract}\n")
    return filtered
# Good for PUB_WIRES, VMCC, PBP_EPAY, returned check
def payment_transactions(report: str, save_name: str):
    """
    Extract the rows of a payment transaction report and save them to Excel.

    A data row is a line whose second slot (characters 6 - 33) contains a
    contract number or a number shaped like ###.####.####. The transaction
    number, customer name and invoice number are read from the line after it.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The DataFrame written to `save_name`.
    """
    debug(f"PayTrans | {save_name}:\n{report}")
    lines = report.splitlines()
    data_extractor = create_line_divider([6,33,52,62,80,89,110,121])
    extracted_data_dict = {
        'SEQ' : [],
        'ACCOUNT NUMBER' : [],
        'PYMT METHOD' : [],
        'DATE RCVD' : [],
        'AMOUNT' : [],
        'REF NO': [],
        'PAYMENT MEMO' : [],
        'PYMT TYPE' : [],
        'CHECK NO' : [],
        'CUSTOMER NAME' : [],
        'TRANSACTIONS NUM': [],
        'INV NO' : [],
    }
    columns = list(extracted_data_dict.keys())
    transaction_num_regex = r"\d{8}"
    for index, line in enumerate(lines):
        slot1 = data_extractor(1, line)
        # Slots converted to float can not contain an account number
        if not isinstance(slot1, str):
            continue
        if re.search(contract_number_regex, slot1) or re.search(r"\d{3}\.\d{4}\.\d{4}", slot1):
            # The first nine columns come straight from the slots of this line
            for c in range(0, len(columns)-3):
                extracted_data_dict[columns[c]].append(data_extractor(c, line))
            # The remaining three columns come from the following line
            # (treated as blank when the report ends on the data line)
            detail_line = lines[index+1] if index+1 < len(lines) else ""
            tnum_match = re.search(transaction_num_regex, detail_line)
            tnum = tnum_match.group() if tnum_match else ""
            extracted_data_dict["TRANSACTIONS NUM"].append(tnum)
            cname = detail_line[6:37].strip()
            extracted_data_dict['CUSTOMER NAME'].append(cname)
            inv_no = detail_line[79:90].strip()
            extracted_data_dict['INV NO'].append(inv_no)
    dataframe = DataFrame(extracted_data_dict)
    debug(f"PayTrans | Complted Dataframe:\n{dataframe}")
    dataframe.to_excel(save_name, index=False)
    debug(f"PayTrans | Saved to {save_name}")
    return dataframe
def renewal_net_invest_trial_balance(report: str, save_name: str):
    """
    Extract a renewal net investment trial balance report and save data plus summary.

    Each contract is read from three lines: the line before the contract
    number line, the contract number line, and the line after it (lines
    without a '.' are treated as blank and skipped over). The "data" sheet
    holds one row per contract; the "Summary" sheet is a pivot by Fund (first
    three characters of the contract number) with a renewal count and sums.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The reordered DataFrame written to the "data" sheet.
    """
    lines = report.splitlines()
    data_extractor = create_line_divider([21, 29, 43, 58, 71, 88, 99, 113])
    extracted_data_dict = {
        'CUSTOMER NAME': [],
        'TYPE': [],
        'GROSS RENEWAL': [],
        'REMAINING BAL': [],
        'FINANCED RES': [],
        'REMAINING RES': [],
        'LEASE PYMTS': [],
        'CONTRACT NUMBER': [],
        'RENEWAL': [],
        'PAYMENTS RCVD': [],
        'CUR RENT RCVB': [],
        'UNEARNED RIN': [],
        'SECURITY DEP': [],
        'NET INVEST': [],
        'UNEARN INCOME': [],
        'TOTAL': [],
        'REM RENT RCVB': [],
        'UNPAID RES': [],
    }
    columns = list(extracted_data_dict.keys())
    # (column name, slot number) pairs for each of the three data lines
    line0 = list(zip(columns[0:7], [0, 1, 2, 3, 4, 5, 7]))
    line1 = list(zip(columns[7:16], range(0, 9)))
    line2 = list(zip(columns[16:], [3, 4]))
    slot_maps = (line0, line1, line2)
    for index, line in enumerate(lines):
        slot1 = data_extractor(0, line)
        # Slots converted to float can not contain a contract number
        if not isinstance(slot1, str):
            continue
        if re.search(contract_number_regex, slot1) is None:
            continue
        data_section = lines[index-1:index+2]
        # A section cut off by the start or the end of the report can not be read
        if index < 1 or len(data_section) < len(slot_maps):
            warn(f"renewal_net_inv_tb {index} | Incomplete data section at the edge of the report. Skipping contract.")
            continue
        try:
            # The report can contain random blank lines inside a data section;
            # a line without a '.' is treated as blank
            if data_section[0].find(".") == -1:
                # Move the first line back if it is blank
                data_section[0] = lines[index-2]
            for position, section_line in enumerate(data_section):
                if section_line.find(".") == -1:
                    # Move up all the data lines after a blank data line...
                    for i in range(position, len(data_section)-1):
                        data_section[i] = data_section[i+1]
                    # ...and pull in the next report line as the new last line
                    data_section[2] = lines[index+2]
        except IndexError:
            warn(f"renewal_net_inv_tb {index} | Data section runs past the end of the report. Skipping contract.")
            continue
        for section_line, slot_map in zip(data_section, slot_maps):
            for column_name, slot_num in slot_map:
                extracted_data_dict[column_name].append(data_extractor(slot_num, section_line))

    dataframe = DataFrame(extracted_data_dict)
    dataframe["Fund"] = dataframe["CONTRACT NUMBER"].apply(
        lambda con_num: con_num[0:3])
    sum_columns = ["UNPAID RES", "REMAINING RES", "SECURITY DEP", 'GROSS RENEWAL',
                   "FINANCED RES", "LEASE PYMTS", "PAYMENTS RCVD", "NET INVEST", "TOTAL",
                   "CUR RENT RCVB", "UNEARNED RIN", "UNEARN INCOME", "REM RENT RCVB"]
    aggfunc = {'CUSTOMER NAME': np.size}
    aggfunc.update({column: np.sum for column in sum_columns})
    summary = pd.pivot_table(dataframe,
                             values=['CUSTOMER NAME'] + sum_columns,
                             aggfunc=aggfunc,
                             index="Fund")
    summary.rename(columns={"CUSTOMER NAME": "Renewal Count"}, inplace=True)
    # NOTE(review): 'REMAINING RES' is listed twice, so the Summary sheet gets
    # that column twice. Kept as-is because the sheet layout may be relied on;
    # confirm whether the second entry was meant to be another column.
    summary = summary[['Renewal Count',
                       'UNPAID RES',
                       'REMAINING RES',
                       'SECURITY DEP',
                       'GROSS RENEWAL',
                       'REMAINING RES',
                       'FINANCED RES',
                       'LEASE PYMTS',
                       'PAYMENTS RCVD',
                       'NET INVEST',
                       'TOTAL',
                       'CUR RENT RCVB',
                       'UNEARNED RIN',
                       'UNEARN INCOME',
                       'REM RENT RCVB',
                       ]]
    dataframe = dataframe[['Fund',
                           'CONTRACT NUMBER',
                           'TYPE',
                           'RENEWAL',
                           'UNPAID RES',
                           'REMAINING RES',
                           'SECURITY DEP',
                           'GROSS RENEWAL',
                           'FINANCED RES',
                           'PAYMENTS RCVD',
                           'NET INVEST',
                           'REMAINING BAL',
                           'CUSTOMER NAME',
                           'LEASE PYMTS',
                           'CUR RENT RCVB',
                           'UNEARNED RIN',
                           'UNEARN INCOME',
                           'TOTAL',
                           'REM RENT RCVB',
                           ]]
    with pd.ExcelWriter(save_name) as writer:
        dataframe.to_excel(writer, index=False, sheet_name="data")
        summary.to_excel(
            writer, index=True, sheet_name="Summary")
    return dataframe
def unapplied(report: str, save_name: str):
    """
    Extract the rows of an unapplied report and save them to Excel.

    A data row is a line with a 7 digit number in its first slot and a date
    (##/##/####) in its fourth slot. The first nine columns come from that
    line, the remaining eight from slots 1 - 8 of the line after it.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The reordered DataFrame written to `save_name`.
    """
    lines = report.splitlines()
    extracted_data_dict = {
        "Trans Num": [],
        "ContractNumber": [],
        "CheckNum": [],
        "Date RCVD": [],
        "Asset ID": [],
        "Reversed Amt": [],
        "Branch": [],
        "Unapplied Susp Acct": [],
        "PaymentMemo": [],
        "Payers Name": [],
        "Batch Num": [],
        "Posting Date": [],
        "Unapplied Amt": [],
        "Rev Post Date": [],
        "Ref Num": [],
        "Check Amt": [],
        "Reason Code": [],
    }
    columns = list(extracted_data_dict.keys())
    # Iterate through the lines one at a time to look for relevant data
    # Use enumerate so that we know which line we're currently working on;
    # this allows us to also grab the second half of the row from the line
    # following the data
    data_extractor = create_line_divider([9, 29, 38, 50, 65, 80, 89, 108])
    for index, line in enumerate(lines):
        # str(): the extractor turns numeric slots into floats
        has_trans_num = re.search(r"\d{7}", str(data_extractor(0, line))) is not None
        has_date = re.search(r"\d{2}/\d{2}/\d{4}", str(data_extractor(3, line))) is not None
        if not (has_trans_num and has_date):
            continue
        for c in range(0, 9):
            extracted_data_dict[columns[c]].append(data_extractor(c, line))
        # The second line of the row is treated as blank when the report ends
        # on the first one
        second_line = lines[index+1] if index+1 < len(lines) else ""
        for c in range(1, len(columns)-8):
            extracted_data_dict[columns[8+c]].append(data_extractor(c, second_line))
    dataframe = DataFrame(extracted_data_dict)
    # Empty column in the output; np.nan instead of np.NaN because the latter
    # alias was removed in NumPy 2.0
    dataframe["ReverseAmt"] = [np.nan for _ in range(0, len(dataframe))]
    dataframe = dataframe[[
        'Trans Num',
        'ContractNumber',
        'CheckNum',
        'Date RCVD',
        'Payers Name',
        'Unapplied Amt',
        'Reason Code',
        'Batch Num',
        'Posting Date',
        'Asset ID',
        'Rev Post Date',
        'ReverseAmt',
        'Branch',
        'Ref Num',
        'Unapplied Susp Acct',
        'PaymentMemo',
        'Check Amt',
    ]]
    dataframe.to_excel(save_name, index=False)
    return dataframe
def pastdue(report: str, save_name: str):
    """
    Extract the contract rows of a past due report and save them to Excel.

    Every line containing a contract number is cut into columns at fixed
    character positions. A following line that holds only more text for the
    name (a run of whitespace followed by word characters) is appended to the
    customer name of the previous row.

    Args:
        report: Full text of the report.
        save_name: Path of the Excel file to write.

    Returns:
        The DataFrame written to `save_name`.
    """
    lines = report.splitlines()
    extracted_data_dict = {
        "Contract Number": [],
        "Right 11": [],
        "Past Due Rental": [],
        "Current Rent": [],
        "Cust Cred Act": [],
        "Cust Name": [],
        "Branch": [],
        "Blend NIV": [],
        "Delinq Code": [],
        "Pymt Option": [],
        "Bank Num": [],
        "Account Num": [],
        "Due Day": [],
        "LEAD Days": [],
        "Invoice Lead Days": [],
        "ACH LEAD Days": [],
        "Renewal": [],
        "Follow Up Code": [],
    }
    # Will need 'right 11' later
    columns = list(extracted_data_dict.keys())
    # These are the character positions (start, end) where each column is held,
    # in the same order as the columns
    slots = [(0,16), (5,16),(389,405),(126,141),(16,36),(37,67),(68,74),(75,93),(94,111),(168,180),
             (190,204),(204,225), (242,253), (225,241), (436,444), (445,461), (462,469), (470,478)]
    for _index, line in enumerate(lines):
        if re.search(contract_number_regex, line) is not None:
            # Pair each column name with its character slot (start and end)
            for column_name, (start, end) in zip(columns, slots):
                extracted_data_dict[column_name].append(line[start:end].strip())
        # This regex finds lines with only a name in them | (blank in the beginning then at least one character)
        elif re.search(r"\s{38}\w+", line) is not None:
            names = extracted_data_dict["Cust Name"]
            # Such a line can also appear before the first contract row, in
            # which case there is no name to extend
            if names:
                names[-1] = (names[-1] + line[37:67]).strip()
    dataframe = DataFrame(extracted_data_dict)
    # The keys must match the column names exactly: astype raises a KeyError for
    # an unknown column even with errors="ignore"
    dataframe = dataframe.astype(
        {"Past Due Rental": "float", "Current Rent": "float", "Branch": "int32",
         "Blend NIV": "float", "Delinq Code": "int32", "Due Day": "int32",
         "Invoice Lead Days": "int32", "ACH LEAD Days": "int32"
         }, errors="ignore")
    dataframe.to_excel(save_name, index=False)
    return dataframe