Added debugging info and code clarification, as well as removing commented-out code

v3.1
Griffiths Lott 3 years ago
parent b0e2bf6965
commit f32ed57130
      ILExtract.py

@@ -1,13 +1,20 @@
 import os
 import pandas as pd
+from pandas import DataFrame
 from datetime import datetime as dt, timedelta
-import sys, getopt
+import json
 import re
 from pathlib import Path
 import time
 import numpy as np
+from logging import debug, DEBUG, basicConfig, warn
-# V3.0 | 08/22/22
+# V3.1 | 01/19/23
+with open("settings.json") as s:
+    settings = json.loads(s.read())
+if settings["debug"]:
+    basicConfig(filename='debug.log', encoding='utf-8', level=DEBUG)
 # contract numbers are a common feature in many reports to it's
 # useful to have the regex for them globally avaiable
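Note on the new startup block: it reads a settings.json that sits next to the script, and only the "debug" key shown in the diff is consulted before routing debug()/warn() output to debug.log. A minimal sketch of creating and checking such a file (the file name and key come from the diff; everything else is illustrative):

    import json

    # Write a minimal settings.json -- "debug" is the only key the block above reads
    with open("settings.json", "w") as s:
        json.dump({"debug": True}, s)

    with open("settings.json") as s:
        assert json.loads(s.read())["debug"] is True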
@@ -19,50 +26,38 @@ class ILReport:
     It makes it easier to add new reports to the workflow and to make it more clear where
     the reports are coming from. It also helps with tracking reports that may not be ready yet.
     """
-    def __init__(self, location, extraction_function, output_location = None):
+    def __init__(self, location: str, extraction_function: str, output_location: str = None):
+        debug(f"ILReport:\n\tLocation: {location}\n\tExtract Function: {extraction_function}\n\tOutput Location: {output_location}")
         # The location where the InfoLease report is stored
         self.location = location
         # If output location not specified, save to the input location
         if output_location == None:
             self.output_location = Path(location).parent.absolute()
+            debug(f"ILReport using Parent path for output: {self.output_location}")
         else:
             self.output_location = output_location
         # The function used to extract the data from the report
         self.x_method = extraction_function
-        # Tracks whether the data was successfully exctracted
-        self.successful = False
     def process(self):
-        print("Processing file")
+        debug("ILReport: Reading file")
         try:
             # Open the file and read it to a string | errors = 'replace' deals with non UTF-8 characters (no affect on output)
             with open(self.location, errors="replace") as ifile:
                 report = ifile.read()
                 report = report.replace("^"," ")
         except IOError as ioe:
-            print(f"Failed to open file: {self.location}\n{ioe}")
-            self.successful = False
+            warn(f"ILReport: Failed to open file: {self.location}\n{ioe}")
             return 1
-        #try:
-        # Run the associated method to extract the data and get the dataframe
-        print("Running parsing process")
-        print("Print something")
-        dataframe = self.x_method(report, self.output_location)
-        #except Exception as e:
-        #    print(f"Failed to create dataframe: {self.output_name}\n{e}")
-        #    self.successful = False
-        #    return 1
-        try:
-            assert(len(dataframe) > 1)
-        except Exception as e:
-            print(f"Data Length Error: {self.output_name} is empty:\n{dataframe}")
-            self.successful = False
-            return 1
-        self.successful = True
+        debug("ILReport: Starting parsing process")
+        dataframe: DataFrame = self.x_method(report, self.output_location)
+        if dataframe.empty:
+            warn(f"ILReport: resulting dataframe was empty! Exiting with None.")
+            return None
         return dataframe
-    def append_to_consolidated_report(self, output_dataframe: pd.DataFrame):
+    def append_to_consolidated_report(self, output_dataframe: DataFrame):
         """
         Add's the reports dataframe to the current months consolidated report or creates one if
         it already exists
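For context on the new return values: process() now hands back the extractor's DataFrame on success, 1 when the report file cannot be read, and None when the extract comes back empty. A minimal usage sketch, with hypothetical file names (ach is one of the extractor functions defined later in this file):

    report = ILReport(r"reports\ACH_MONTHLY.txt", ach, output_location=r"reports\ACH_MONTHLY.xlsx")
    result = report.process()
    if isinstance(result, DataFrame):
        print(f"{len(result)} rows extracted")
    else:
        # process() returns 1 on a read error and None on an empty extract
        print(f"extraction failed: {result}")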
@@ -83,7 +78,7 @@ def create_line_divider(breakage_list: list):
     characters 0 - 10 from the string.
     Slot 1 would give characters 10 - 20
     """
-    def extract_line_slot(slot_num : int, line_string: str, debug : bool = False):
+    def extract_line_slot(slot_num : int, line_string: str):
         """
         Pulls data from a line/string using break points defined by the
         parent function.
@@ -98,8 +93,7 @@ def create_line_divider(breakage_list: list):
         data = line_string[low_range:high_range].strip().replace(",", "")
         try: data = float(data)
         except: pass
-        if debug:
-            print(f"Slot num: {slot_num} | Low: {low_range} | High: {high_range} | Data: {data}")
+        debug(f"Slot num: {slot_num} | Low: {low_range} | High: {high_range} | Data: {data}")
         return data
     return extract_line_slot
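To make the slot numbering concrete: the closure returned by create_line_divider slices a fixed-width report line between consecutive break points, strips commas, and tries to coerce the piece to float. A rough sketch of that behaviour, assuming the break-point handling implied by the docstring (the real low_range/high_range computation sits outside this hunk, so treat this as illustrative only):

    def line_divider_sketch(breakage_list):
        # Sketch only: slot 0 covers characters [0, breakage_list[0]),
        # slot n covers [breakage_list[n-1], breakage_list[n]).
        def extract_line_slot(slot_num, line_string):
            low = 0 if slot_num == 0 else breakage_list[slot_num - 1]
            high = breakage_list[slot_num]
            data = line_string[low:high].strip().replace(",", "")
            try:
                return float(data)
            except ValueError:
                return data
        return extract_line_slot

    slot = line_divider_sketch([10, 20])
    print(slot(0, "CONTRACT01  1,234.56"))   # -> 'CONTRACT01'
    print(slot(1, "CONTRACT01  1,234.56"))   # -> 1234.56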
@@ -138,6 +132,7 @@ COMMON REGEX COMPONENTS
 def ach(report: str, save_name: str):
+    debug(f"ACH Report {save_name} :\n{report}")
     lines = report.splitlines()
     extracted_data_dict = {
         "ContractNumber" : [],
@@ -155,49 +150,50 @@ def ach(report: str, save_name: str):
         "batch_num": [],
         "payment_date": [],
         "lessor": [],
-        #"count": [],
         "total": []
     }
     data_extractor = create_line_divider([19,57,67,82,104])
     bank_number_regex = "\d{9}"
     batch_num_regex = "BATCH \d{4} TOTAL"
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
         # Check for a contract number and a bank number in the line
-        if (re.search(contract_number_regex, line[1]) != None) & (re.search(bank_number_regex, line[1]) != None):
+        if (re.search(contract_number_regex, line) != None) & (re.search(bank_number_regex, line) != None):
+            debug(f"ACH {index}: Found a contract or bank number:\n{line}")
             # Iterates through the columns list and adds the corresponding slot number to the dictonary for the column
             # Here the order of the columns (keys in dictonary) matter since they need to be in the same order as
             # the slot numbers
-            [extracted_data_dict[columns[c]].append(data_extractor(c, line[1])) for c in range(0, len(columns)-3)]
+            [extracted_data_dict[columns[c]].append(data_extractor(c, line)) for c in range(0, len(columns)-3)]
         # This searches for a statement that looks like a batch number
         # This sums the contracts by thier lessor code. A feature requested by cash apps
-        if re.search(batch_num_regex, line[1]) != None:
+        if re.search(batch_num_regex, line) != None:
+            debug(f"ACH {index}: Found a batch number:\n{line}")
             # Batch number is always in characters 96 to 101
-            batches["batch_num"].append(line[1][96:101])
+            batches["batch_num"].append(line[96:101])
             # Payment date will be 2 lines below that between charactes 114 and 125
-            batches["payment_date"].append(lines[line[0]+2][114:125])
+            batches["payment_date"].append(lines[index+2][114:125])
             # Lessor is just the first three number sof the contract number
             batches["lessor"].append(extracted_data_dict["ContractNumber"][-1][0:3])
             # Total is a number given by the report for that batch. ',' is removed so that it can be transformed into a float
-            batches["total"].append(float(line[1][107:125].strip().replace(",", "")))
+            batches["total"].append(float(line[107:125].strip().replace(",", "")))
-            #print(f"{line[0]+6} | {lines[line[0]+6][107:125]}\n{lines[line[0]+6]}")
-            #batches["count"].append(float(lines[line[0]+6][107:125].strip().replace(",", "")))
             # Any time there's a new batch we need to add this data to the dictionary up up to the currrent place
             # So we iterate over the number of contracts and add in the newest value for each that don't have one of these values already
             [extracted_data_dict["Batch"].append(batches["batch_num"][-1]) for _ in range(0, (len(extracted_data_dict["BankCode"]) - len(extracted_data_dict["Batch"])))]
             [extracted_data_dict["Lessor"].append(batches["lessor"][-1]) for _ in range(0, (len(extracted_data_dict["BankCode"]) - len(extracted_data_dict["Lessor"])))]
             [extracted_data_dict["PaymentDate"].append(batches["payment_date"][-1]) for _ in range(0, (len(extracted_data_dict["BankCode"]) - len(extracted_data_dict["PaymentDate"])))]
     # Now the dictioanry lists should all be equal lengths and we can create a dataframe
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe: DataFrame = DataFrame(extracted_data_dict)
     # We're creating two sheets: data & summary so we need to open and excel writer
     # This also helps with a bug caused by larger dataframes
     with pd.ExcelWriter(save_name) as writer:
+        debug(f"ACH: Saving data as: {save_name}")
         dataframe.to_excel(writer, index=False, sheet_name="data")
         # The batches dictioanry is converted to a dataframe and added as it's own sheet
-        pd.DataFrame(batches).to_excel(writer, index=False, sheet_name="Summary")
+        DataFrame(batches).to_excel(writer, index=False, sheet_name="Summary")
     return dataframe
 def disposition(report: str, save_name: str):
+    debug(f"Disp Report {save_name} :\n{report}")
     lines = report.splitlines()
     extracted_data_dict = {
         "ContractNumber" : [],
@@ -214,17 +210,19 @@ def disposition(report: str, save_name: str):
     }
     columns = list(extracted_data_dict.keys())
     data_extractor = create_line_divider([15,32,41, 51, 61, 79,88, 103, 114])
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
-        if re.search(contract_number_regex, data_extractor(0,line[1])):
+        if re.search(contract_number_regex, data_extractor(0,line)):
+            debug(f"Disp {index}: Found contract number:\n{line}")
-            [extracted_data_dict[columns[c]].append(data_extractor(c,line[1])) for c in range(0, len(columns)-1)]
+            [extracted_data_dict[columns[c]].append(data_extractor(c,line)) for c in range(0, len(columns)-1)]
             # Customer name is on a seperate line so we need to grab that seperately
-            extracted_data_dict["Customer Name"].append(lines[line[0]+1].strip())
+            extracted_data_dict["Customer Name"].append(lines[index+1].strip())
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
     dataframe.to_excel(save_name, index=False)
     return dataframe
 def gainloss(report: str, save_name: str):
+    debug(f"GL Report {save_name} :\n{report}")
     lines = report.splitlines()
     extracted_data_dict = {
         'REM RENT RCVB' : [],
@@ -267,7 +265,7 @@ def gainloss(report: str, save_name: str):
         'OPER BASIS' : [],
         'CTD OPER DEPR' : [],
     }
-    # L0: BlendedInc 6
+    # Level 0: BlendedInc 6
     # L1: Late CHGS 14
     # L2: Gain/Loss 22
     # L3: Def Ren Int 30
@@ -280,18 +278,19 @@ def gainloss(report: str, save_name: str):
     #
     # It looks confusing but makes more sense if you look at the actual Info Lease reports
     # This is one of the messiest reports
-    line0 = list(zip(columns[0:7],[i for i in range(1,8)]))
+    line0 = list(zip(columns[0:7], [i for i in range(1,8)]))
-    line1 = list(zip(columns[7:15],[i for i in range(0,8)]))
+    line1 = list(zip(columns[7:15], [i for i in range(0,8)]))
     line2 = list(zip(columns[15:23], [i for i in range(0,8)]))
     line3 = list(zip(columns[23:31], [i for i in range(0,8)]))
+    # In line 4 we need to skip blank slots 3,6
     line4 = list(zip(columns[31:36], [i for i in range(1,8) if i not in [3,6]]))
     line5 = list(zip(columns[36:], [i for i in range(1,4)]))
     data_extractor = create_line_divider([27,43,58,74,88,105,120])
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
         # The line must contain a contract number and the first data slot should be a float
-        if (re.search(contract_number_regex, data_extractor(0,line[1])) != None)&\
-        (type(data_extractor(1,line[1])) == float) :
+        if (re.search(contract_number_regex, data_extractor(0,line)) != None) & (type(data_extractor(1,line)) == float) :
+            debug(f"GL {index}: Found contract number and float in slot 1:\n{line}")
-            data_section = lines[line[0]-1:line[0]+5]
+            data_section = lines[index-1:index+5]
             [extracted_data_dict[c[0]].append(data_extractor(c[1], data_section[0])) for c in line0]
             [extracted_data_dict[c[0]].append(data_extractor(c[1], data_section[1])) for c in line1]
             [extracted_data_dict[c[0]].append(data_extractor(c[1], data_section[2])) for c in line2]
@@ -299,16 +298,16 @@ def gainloss(report: str, save_name: str):
             [extracted_data_dict[c[0]].append(data_extractor(c[1], data_section[4])) for c in line4]
             [extracted_data_dict[c[0]].append(data_extractor(c[1], data_section[5])) for c in line5]
-    df = pd.DataFrame(extracted_data_dict)
+    df = DataFrame(extracted_data_dict)
-    print("df created")
+    debug(f"GL | dataframe created:\n{df}")
-    # The Accounting team wanted the disposotion code split into number and descriptionso...
+    # The Accounting team wanted the disposotion code split into number and description so...
-    print("Splitting disp")
+    debug(f"GL | Splitting disp code")
     df["DISPOSITION DESC"] = df['DISPOSITION CODE'].apply(lambda dc: " ".join(dc.split(" ")[1:]))
     df["DISPOSITION CODE"] = df['DISPOSITION CODE'].apply(lambda dc: dc.split(" ")[0])
-    print("adding Fund")
+    debug("GL | adding Fund column (first 3 of contract number)")
     df["Fund"] = df["CONTRACT NUMBER"].apply(
         lambda con_num: con_num[0:3])
-    print("Reordering df")
+    debug("GL | Reordering dataframe")
     df = df[['Fund',
              'CONTRACT NUMBER',
              'CUSTOMER NAME',
@@ -351,12 +350,13 @@ def gainloss(report: str, save_name: str):
              'UNPAID ACCRD',
              'UNRECOG GST',
              ]]
-    print("saving df")
+    debug(f"GL | saving dataframe {save_name}:\n{df}")
     df.to_excel(save_name, index=False)
     return df
 # Works for Net-inv-loans & NIV-after
 def net_invest_trial_balance(report: str, save_name: str):
+    debug(f"net_inv_tb Report {save_name} :\n{report}")
     lines = report.splitlines()
     extracted_data_dict = {
         'CUSTOMER NAME': [],
@@ -384,28 +384,30 @@ def net_invest_trial_balance(report: str, save_name: str):
         'NET INV': [],
         'UNEARNED IDC': [],
     }
-    lessors = []
     columns = list(extracted_data_dict.keys())
     line0 = list(zip(columns[0:4], [0, 3, 4, 5]))
     line1 = list(zip(columns[4:12], [i for i in range(0, 8)]))
-    line2 = list(zip(columns[12:19], [i for i in range(0, 7)]))
+    line2 = list(zip(columns[12:19],[i for i in range(0, 7)]))
     line3 = list(zip(columns[19:], [i for i in range(1, 6)]))
     data_extractor = create_line_divider([18, 32, 50, 66, 84, 100, 117,132])
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
-        slot1 = data_extractor(0, line[1], False)
+        slot1 = data_extractor(0, line)
         if type(slot1) != str:
             continue
         if re.search(contract_number_regex, slot1) != None:
-            data_section = lines[line[0]-1:line[0]+3]
+            debug(f"net_inv_tb {index} | Found contract number in slot 1:\n{line}")
+            data_section = lines[index-1:index+3]
+            debug(f"net_inv_tb {index} | Data section:\n{data_section}")
             # There were issues were the IL Report would have random blank lines so that needs to be checked
             # and adjusted for
             # A dead give away of an empty line in a data section is a line without a '.'
             # Check the first data line
             if data_section[0].find(".") == -1:
+                debug(f"net_int_tb {index} | datasection[0] is empty. Moving back")
                 # Move it back if empty
-                data_section[0] = lines[line[0]-2]
+                data_section[0] = lines[index-2]
             # Now we go through each relevant data line and make sure they're not blank
             for ds in enumerate(data_section):
                 if ds[1].find(".") == -1:
@@ -414,27 +416,24 @@ def net_invest_trial_balance(report: str, save_name: str):
                             # This allows us to move down all the data lines after a blank data line
                             data_section[i] = data_section[i+1]
                         # This handles the last data line which goes 'out-of-bounds' of the existing data selection
-                        data_section[3] = lines[line[0]+3]
+                        data_section[3] = lines[index+3]
                     else:
-                        data_section[3] = lines[line[0]+3]
+                        data_section[3] = lines[index+3]
             # Now that the datasection is sorted we can extract the data
+            # c[0] : Column name
+            # c[1] : Column slot number
-            [extracted_data_dict[c[0]].append(data_extractor(
-                c[1], data_section[0], False)) for c in line0]
-            [extracted_data_dict[c[0]].append(data_extractor(
-                c[1], data_section[1], False)) for c in line1]
-            [extracted_data_dict[c[0]].append(data_extractor(
-                c[1], data_section[2], False)) for c in line2]
-            [extracted_data_dict[c[0]].append(data_extractor(
-                c[1], data_section[3], False)) for c in line3]
+            [ extracted_data_dict[c[0]].append( data_extractor(c[1], data_section[0]) ) for c in line0 ]
+            [ extracted_data_dict[c[0]].append( data_extractor(c[1], data_section[1]) ) for c in line1 ]
+            [ extracted_data_dict[c[0]].append( data_extractor(c[1], data_section[2]) ) for c in line2 ]
+            [ extracted_data_dict[c[0]].append( data_extractor(c[1], data_section[3]) ) for c in line3 ]
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe: DataFrame = DataFrame(extracted_data_dict)
     dataframe["LESSOR"] = dataframe["LEASE NUMBER"].apply(lambda con: con[0:3])
     dataframe = dataframe.replace("REVOLV", np.NaN)
     dataframe = dataframe.replace("ING ACCOUNT", np.NaN)
     dataframe = dataframe.replace("", np.NaN)
-    print(dataframe)
-    #dataframe.to_excel("test_niv.xlsx")
-    print("Dataframe complete")
-    print("Createing pivot...")
+    debug(f"net_inv_tb | Dataframe complete:\n{dataframe}")
+    debug("net_inv_tb | Createing pivot...")
     nums = ['RESIDUAL',
             'UNEARN FIN',
             'UNEARNED BLENDED',
@@ -507,8 +506,6 @@ def net_invest_trial_balance(report: str, save_name: str):
         },
         index="LESSOR")
-    print(summary)
-    print("Summary complete")
     summary.rename(columns={"CUSTOMER NAME": "Contract Count"}, inplace=True)
     summary = summary[['Contract Count',
                        'BAL REMAINING',
@@ -533,6 +530,7 @@ def net_invest_trial_balance(report: str, save_name: str):
                        'CURR RENT RCVB',
                        'END DEPOSIT',
                        ]]
+    debug(f"net_inv_tb | Summary complete:\n{summary}")
     dataframe = dataframe[['LESSOR',
                            'LEASE NUMBER',
                            'BAL REMAINING',
@@ -559,7 +557,7 @@ def net_invest_trial_balance(report: str, save_name: str):
                            'REM RENT RCVB',
                            'CURR INT RCVB',
                            ]]
-    print("Attempting to save")
+    debug(f"net_inv_tb | Saving data {save_name}")
     with pd.ExcelWriter(save_name) as writer:
         dataframe.to_excel(writer, index=False, sheet_name="data")
         summary.to_excel(
@@ -567,6 +565,7 @@ def net_invest_trial_balance(report: str, save_name: str):
     return dataframe
 def lockbox(report: str, save_name: str):
+    debug(f"LockBox Report {save_name}:\n{report}")
     lines = report.splitlines()
     extracted_data_dict = {
         "SEQ" : [],
@@ -582,43 +581,48 @@ def lockbox(report: str, save_name: str):
     }
     columns = list(extracted_data_dict.keys())
     data_extractor = create_line_divider([9,19,39,56,69,89,98,118])
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
         match = False
         # Try to find the first SEQ # & a contract payment date e.i. ' 197 05/10/2022'
-        if re.match("(\s|\d){3}\d{1}\s{5}\d{2}/\d{2}/\d{4}", line[1]):
+        if re.match("(\s|\d){3}\d{1}\s{5}\d{2}/\d{2}/\d{4}", line):
+            debug(f"LockBox {index} | Found SEQ # and payment date:\n{line}")
             match = True
             # Add all of the data points except customer name
-            [extracted_data_dict[columns[c]].append(data_extractor(c,line[1],debug=False)) for c in range(0,len(columns)-1)]
+            [extracted_data_dict[columns[c]].append(data_extractor(c,line)) for c in range(0,len(columns)-1)]
         # Check to see if this line contains only an infolease payment
        # Some times there are multiple infolease payments for a single bank record
-        elif re.search(contract_number_regex, line[1]) != None:
+        elif re.search(contract_number_regex, line) != None:
+            debug(f"LockBox {index} | Found contract number:\n{line}")
             match = True
             # If there is then we can add the same data as the previous complete line
             [extracted_data_dict[columns[c]].append(extracted_data_dict[columns[c]][-1]) for c in range(0,6)]
             # Then add the new data for the infolease contract
-            [extracted_data_dict[columns[c]].append(data_extractor(c,line[1],debug=False)) for c in range(6,len(columns)-1)]
+            [extracted_data_dict[columns[c]].append(data_extractor(c,line)) for c in range(6,len(columns)-1)]
         # If we had a match we need a customer name to associate with it
         # Sometimes these can appear on the next page hense the while loop searching for a match
         if match:
             # We can tell the cust name will be on the next page if the word "PAGE" appears three lines under the current line
             # And the next line is blank
-            if (lines[line[0]+1].strip() == "") & (lines[line[0]+3].find("PAGE") != -1):
+            if (lines[index+1].strip() == "") & (lines[index+3].find("PAGE") != -1):
+                debug(f"LockBox found PAGE on line {index+3}. Looping to find cust name.")
                 i = 0
                 # Look for a bunch of whitespace then some writing
-                while not re.match("\s{98}.{34}", lines[line[0]+i]):
+                while not re.match("\s{98}.{34}", lines[index+i]):
+                    debug(f"LockBox searching for whitespace above custname. Line {index+1}.")
                     i +=1
                 # Once we find it add the cust name to the dict (it's the only thing on the line)
-                extracted_data_dict["CUST NAME"].append(lines[line[0]+i].strip())
+                extracted_data_dict["CUST NAME"].append(lines[index+i].strip())
             # if the condition above isnt met then the cust name is on the next line (even if that line is blank)
             else:
-                extracted_data_dict["CUST NAME"].append(lines[line[0]+1].strip())
+                extracted_data_dict["CUST NAME"].append(lines[index+1].strip())
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
+    debug(f"LockBox | Saving dataframe: {save_name}")
     dataframe.to_excel(save_name, index=False)
     return dataframe
 def minv(report: str, save_name: str):
-    print("Started minv process")
+    debug(f"Minv {save_name}:\n{report}")
     lines = report.splitlines()
     data_extractor = create_line_divider([15,32,52,71,83,107,116,128])
     extracted_data_dict = {
@@ -633,24 +637,27 @@ def minv(report: str, save_name: str):
         "Branch" : [],
     }
     columns = list(extracted_data_dict.keys())
-    for line in enumerate(lines):
+    for _index, line in enumerate(lines):
-        if re.search(contract_number_regex, line[1]) != None:
+        if re.search(contract_number_regex, line) != None:
+            debug(f"Minv {_index} | Found contract number:\n{line}")
-            [extracted_data_dict[columns[c]].append(data_extractor(c,line[1],debug=False)) for c in range(0,len(columns))]
+            [extracted_data_dict[columns[c]].append(data_extractor(c,line)) for c in range(0,len(columns))]
     #All the list lengths need to be the same so if anything was missed it will fail to build
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
+    debug(f"Minv | Original DF:\n{dataframe}")
     filtered = dataframe[(dataframe["BookingDate"] != dt.today().strftime("%m/%d/%Y")) &
                          ((dataframe["RentalDue"] > 0) | ((dataframe["RentalDue"] == 0) & (dataframe["OutstandBalance"] > 100)))]
+    debug(f"Minv | Filtered DF:\n{filtered}")
     with open(save_name, 'w') as output:
-        add_contracts = []
-        for contract in filtered['ContractNumber'].to_list():
-            output.write(f"{contract}\n") if contract not in add_contracts else None
-            add_contracts.append(contract)
+        debug(f"Minv | Saving number list to {save_name}.")
+        # Use set to get delete duplicate values
+        for contract in list(set(filtered['ContractNumber'].to_list())):
+            output.write(f"{contract}\n")
     return filtered
 # Good for PUB_WIRES, VMCC, PBP_EPAY, returned check
 def payment_transactions(report: str, save_name: str):
+    debug(f"PayTrans | {save_name}:\n{report}")
     lines = report.splitlines()
     data_extractor = create_line_divider([6,33,52,62,80,89,110,121])
     extracted_data_dict = {
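One behavioural note on the new minv() save loop above: deduplicating with set() is simpler than the old add_contracts bookkeeping, but a set does not preserve the order the contracts appeared in the report. A small sketch of the difference, with illustrative values only:

    contracts = ["001-000111", "001-000222", "001-000111"]

    unique_unordered = list(set(contracts))          # what the new loop iterates over; order is arbitrary
    unique_ordered = list(dict.fromkeys(contracts))  # order-preserving alternative (sketch only)
    print(unique_unordered, unique_ordered)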
@@ -669,26 +676,25 @@ def payment_transactions(report: str, save_name: str):
     }
     columns = list(extracted_data_dict.keys())
     transaction_num_regex = "\d{8}"
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
-        slot1 = data_extractor(1,line[1],False)
+        slot1 = data_extractor(1,line)
         if type(slot1) != str : continue
         if (re.search(contract_number_regex, slot1) or re.search("\d{3}\.\d{4}\.\d{4}", slot1))!= None:
-            [extracted_data_dict[columns[c]].append(data_extractor(c, line[1])) for c in range(0,len(columns)-3)]
+            [extracted_data_dict[columns[c]].append(data_extractor(c, line)) for c in range(0,len(columns)-3)]
-            tnum_match = re.search(transaction_num_regex, lines[line[0]+1])
+            tnum_match = re.search(transaction_num_regex, lines[index+1])
             if tnum_match:
-                tnum = lines[line[0]+1][tnum_match.start():tnum_match.end()]
+                tnum = lines[index+1][tnum_match.start():tnum_match.end()]
             else:
                 tnum = ""
             extracted_data_dict["TRANSACTIONS NUM"].append(tnum)
-            cname = lines[line[0]+1][6:37].strip()
+            cname = lines[index+1][6:37].strip()
             extracted_data_dict['CUSTOMER NAME'].append(cname)
-            inv_no = lines[line[0]+1][79:90].strip()
+            inv_no = lines[index+1][79:90].strip()
             extracted_data_dict['INV NO'].append(inv_no)
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
-    print(dataframe)
-    print("Saving")
+    debug(f"PayTrans | Complted Dataframe:\n{dataframe}")
     dataframe.to_excel(save_name, index=False)
-    print("Saved successfully")
+    debug(f"PayTrans | Saved to {save_name}")
     return dataframe
@@ -720,23 +726,23 @@ def renewal_net_invest_trial_balance(report: str, save_name: str):
     line1 = list(zip(columns[7:16], [i for i in range(0, 9)]))
     line2 = list(zip(columns[16:], [3, 4]))
-    for line in enumerate(lines):
+    for index, line in enumerate(lines):
-        slot1 = data_extractor(0, line[1], False)
+        slot1 = data_extractor(0, line)
         if type(slot1) != str:
             continue
         if re.search(contract_number_regex, slot1) != None:
-            data_section = lines[line[0]-1:line[0]+2]
+            data_section = lines[index-1:index+2]
             # SEE net_invest_trial_balance FOR EXPLAINATION
             if data_section[0].find(".") == -1:
-                data_section[0] = lines[line[0]-2]
+                data_section[0] = lines[index-2]
             for ds in enumerate(data_section):
                 if ds[1].find(".") == -1:
                     if ds[0] < len(data_section) - 1:
                         for i in range(ds[0], len(data_section)-1):
                             data_section[i] = data_section[i+1]
-                        data_section[2] = lines[line[0]+2]
+                        data_section[2] = lines[index+2]
                     else:
-                        data_section[2] = lines[line[0]+2]
+                        data_section[2] = lines[index+2]
             [extracted_data_dict[c[0]].append(
                 data_extractor(c[1], data_section[0])) for c in line0]
@@ -744,11 +750,9 @@ def renewal_net_invest_trial_balance(report: str, save_name: str):
                 data_extractor(c[1], data_section[1])) for c in line1]
             [extracted_data_dict[c[0]].append(
                 data_extractor(c[1], data_section[2])) for c in line2]
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
-    print("df created")
     dataframe["Fund"] = dataframe["CONTRACT NUMBER"].apply(
         lambda con_num: con_num[0:3])
-    print("Fund added")
     summary = pd.pivot_table(dataframe,
                              values=['CUSTOMER NAME',
                                      "UNPAID RES", "REMAINING RES", "SECURITY DEP", 'GROSS RENEWAL',
@@ -772,9 +776,7 @@ def renewal_net_invest_trial_balance(report: str, save_name: str):
                              },
                              index="Fund")
-    print("Summary complete")
     summary.rename(columns={"CUSTOMER NAME": "Renewal Count"}, inplace=True)
-    print("Remaned Renewal count")
     summary = summary[['Renewal Count',
                        'UNPAID RES',
                        'REMAINING RES',
@@ -791,7 +793,6 @@ def renewal_net_invest_trial_balance(report: str, save_name: str):
                        'UNEARN INCOME',
                        'REM RENT RCVB',
                        ]]
-    print("Reordered sum")
     dataframe = dataframe[['Fund',
                            'CONTRACT NUMBER',
                            'TYPE',
@@ -812,7 +813,6 @@ def renewal_net_invest_trial_balance(report: str, save_name: str):
                            'TOTAL',
                            'REM RENT RCVB',
                            ]]
-    print("dfs rearragned | Savings")
     with pd.ExcelWriter(save_name) as writer:
         dataframe.to_excel(writer, index=False, sheet_name="data")
         summary.to_excel(
@@ -847,24 +847,16 @@ def unapplied(report: str, save_name: str):
     # this allows us to also work in the 'report' structure so that we can
     # grab the customer name from the line proceding the data
     data_extractor = create_line_divider([9, 29, 38, 50, 65, 80, 89, 108])
-    bank_num_reg = ".*\s*\d\d\d\.\d\d\s.*PAGE"
-    #current_bank_num = np.NaN
-    #bank_nums = []
-    trans_num = "\d{7}"
-    for line in enumerate(lines):
-        if (re.search("\d{7}", str(data_extractor(0, line[1], debug=False))) != None) &\
-            (re.search("\d{2}/\d{2}/\d{4}", str(data_extractor(3, line[1], debug=False))) != None):
+    for index, line in enumerate(lines):
+        if (re.search("\d{7}", str(data_extractor(0, line))) != None) &\
+            (re.search("\d{2}/\d{2}/\d{4}", str(data_extractor(3, line))) != None):
             [extracted_data_dict[columns[c]].append(
-                data_extractor(c, line[1])) for c in range(0, 9)]
+                data_extractor(c, line)) for c in range(0, 9)]
             [extracted_data_dict[columns[8+c]].append(data_extractor(
-                c, lines[line[0]+1])) for c in range(1, len(columns)-8)]
+                c, lines[index+1])) for c in range(1, len(columns)-8)]
-            #bank_nums.append(current_bank_num)
-        #elif re.search(bank_num_reg, line[1]) != None:
-            #current_bank_num = re.search("\d\d\d\.\d\d", line[1]).group(0)
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
     dataframe["ReverseAmt"] = [np.NaN for _ in range(0, len(dataframe))]
-    #dataframe["Bank_"]
     dataframe = dataframe[[
         'Trans Num',
         'ContractNumber',
@@ -884,13 +876,10 @@ def unapplied(report: str, save_name: str):
         'PaymentMemo',
         'Check Amt',
         ]]
     dataframe.to_excel(save_name, index=False)
-    print(dataframe)
     return dataframe
 def pastdue(report: str, save_name: str):
-    print("Running past due")
     lines = report.splitlines()
     extracted_data_dict = {
         "Contract Number": [],
@@ -917,14 +906,14 @@ def pastdue(report: str, save_name: str):
     # These are the line spaces where each column is held
     slots = [(0,16), (5,16),(389,405),(126,141),(16,36),(37,67),(68,74),(75,93),(94,111),(168,180),\
             (190,204),(204,225), (242,253), (225,241), (436,444), (445,461), (462,469), (470,478)]
-    for line in enumerate(lines):
+    for _index, line in enumerate(lines):
-        if re.search(contract_number_regex, line[1]) != None:
+        if re.search(contract_number_regex, line) != None:
             # goes through the column names (by number) then gets the charcter slot (start and end)
-            [extracted_data_dict[columns[c]].append((line[1][slots[c][0]:slots[c][1]]).strip()) for c in range(0, len(columns))]
+            [extracted_data_dict[columns[c]].append((line[slots[c][0]:slots[c][1]]).strip()) for c in range(0, len(columns))]
         # This regex finds lines with only a name in them | (blank in the beginig then atleast one character)
-        elif re.search("\s{38}\w+", line[1]) != None:
+        elif re.search("\s{38}\w+", line) != None:
-            extracted_data_dict["Cust Name"][-1] = (extracted_data_dict["Cust Name"][-1] + line[1][37:67]).strip()
+            extracted_data_dict["Cust Name"][-1] = (extracted_data_dict["Cust Name"][-1] + line[37:67]).strip()
-    dataframe = pd.DataFrame(extracted_data_dict)
+    dataframe = DataFrame(extracted_data_dict)
     dataframe = dataframe.astype(
         {"Past Due Rental": "float", "Current Rent": "float", "Branch": "int32",
          "Blend NIV": "float", "Delinq Code": "int32", "Due Day":"int32", "Invoice LEAD Days": "int32", "ACH LEAD Days": "int32"
