import os
import re
import sys
import getopt
import time
from datetime import datetime as dt, timedelta
from pathlib import Path
from pprint import pprint as prt

import numpy as np
import pandas as pd


contract_number_regex = r"\d{3}-\d{7}-\d{3}"
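# Illustrative only: the pattern above matches contract numbers written as three
# digits, seven digits, then three digits, separated by dashes. The value below is
# a made-up example, not a real contract number.
#   re.search(contract_number_regex, "123-4567890-123")  ->  match on '123-4567890-123'
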
def dict_lens(dictionary):
    """Debug helper: print each key of a dict of lists along with the length of its list."""
    for column in dictionary.keys():
        print(f"{column} : {len(dictionary[column])}")


def create_line_divider(breakage_list: list):
    """
    Creates a custom fixed-width data extractor.

    breakage_list defines the split points that will be used for the line.

    Example:
        Given breakage_list [10, 20, 30], using slot_num 0 in the resulting
        extract_line_slot will yield characters 0 - 10 of the string.
        Slot 1 would give characters 10 - 20, and so on.
    """
    def extract_line_slot(slot_num: int, line_string: str, debug: bool = False):
        """
        Pulls data from a line/string using the break points defined by the
        parent function.

        ONLY USE THIS FUNCTION THROUGH CREATION USING 'create_line_divider'.
        Numeric values are automatically converted to floats.
        """
        # The slot number cannot exceed the number of slots
        # (len(breakage_list) break points define len(breakage_list) + 1 slots).
        assert slot_num <= len(breakage_list)
        low_range = 0 if slot_num == 0 else breakage_list[slot_num - 1]
        high_range = len(line_string) if slot_num == len(breakage_list) else breakage_list[slot_num]
        # Commas must be removed before the string can be converted to a float.
        data = line_string[low_range:high_range].strip().replace(",", "")
        try:
            data = float(data)
        except ValueError:
            pass
        if debug:
            print(f"Slot num: {slot_num} | Low: {low_range} | High: {high_range} | Data: {data}")
        return data

    return extract_line_slot
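

# A minimal usage sketch of the slot extractor (the breakpoints and the sample line
# below are made up for illustration, not taken from a real lockbox report).
# Slot 0 covers characters 0-5, slot 1 covers 5-12, and the final slot runs from the
# last breakpoint to the end of the line.
def _demo_line_divider():
    example_extract = create_line_divider([5, 12])
    sample = "  42   ABC    1,500.00"
    assert example_extract(0, sample) == 42.0    # numeric text is coerced to float
    assert example_extract(1, sample) == "ABC"   # non-numeric text is returned as-is
    assert example_extract(2, sample) == 1500.0  # the comma is stripped before conversion
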
def lockbox(report: str, save_name: str):
    lines = report.splitlines()
    extracted_data_dict = {
        "CustomerName": [],
        "PaymentDate": [],
        "InvoiceNumber": [],
        "CheckNumber": [],
        "InvoicePayment": [],
        "ContractNumber": [],
        "ContractPayment": [],
    }
    # These are lists of the dictionary columns/keys paired with the data slots in which
    # that data can be found in the report, so we can iterate through them
    # while extracting data.
    bank_payment_records = [list(extracted_data_dict.keys())[1:5], [1, 2, 3, 4]]
    infolease_payment_records = [list(extracted_data_dict.keys())[5:], [7, 8]]

    # Below are the regular expressions used to find relevant data lines.
    full_line = r"\d*\s{5}\d{2}/\d{2}/\d{4}\s{4}1"
    contract_only_line = r"\s{90}\d.{7}1\d{2}-"
    cust_name_line = r"\s{98}.{28}\D*"
    # The data extractor allows us to extract data from the report using slots.
    # Slots are ranges of characters denoted by the list fed into the creation function.
    data_extractor = create_line_divider([9, 19, 39, 56, 69, 90, 98, 118])
    for line_num, text in enumerate(lines):
        # We can skip empty lines.
        if len(text) == 0:
            continue
        # First, check whether this is a full line of data (defined by the regex above).
        if re.search(full_line, text):
            # If so, iterate through the lists we created earlier and append the data to our dict.
            for k in range(len(bank_payment_records[0])):
                extracted_data_dict[bank_payment_records[0][k]].append(data_extractor(bank_payment_records[1][k], text))
            for k in range(len(infolease_payment_records[0])):
                extracted_data_dict[infolease_payment_records[0][k]].append(data_extractor(infolease_payment_records[1][k], text))
        # Otherwise, check whether this is a line with only contract data.
        elif re.search(contract_only_line, text):
            # In that case reuse the 'bank payment' data from the previous entry, since it applies to this contract.
            for k in range(len(bank_payment_records[0])):
                extracted_data_dict[bank_payment_records[0][k]].append(extracted_data_dict[bank_payment_records[0][k]][-1])
            for k in range(len(infolease_payment_records[0])):
                extracted_data_dict[infolease_payment_records[0][k]].append(data_extractor(infolease_payment_records[1][k], text))
        # If the line matches neither pattern it is irrelevant data, so move on.
        else:
            continue
        # i tracks how many lines below the current line we are looking for the customer name.
        # Keep moving down a line and checking for a customer name.
        # The customer name typically appears 1 line under the data, but can be 13 lines
        # below if it is cut off by the page end.
        i = 1
        while re.search(cust_name_line, lines[line_num + i]) is None:
            i += 1
        # Once it hits, add the name to the dict.
        extracted_data_dict["CustomerName"].append(data_extractor(7, lines[line_num + i]))
    dataframe = pd.DataFrame(extracted_data_dict)
    dataframe.to_excel(save_name, index=False)
    return dataframe
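

# Usage sketch (hypothetical file name and output path): lockbox() takes the full text
# of a report and an output file name, writes the parsed rows to Excel, and returns the
# same data as a DataFrame.
#
#   with open("example_lockbox_report.txt", errors="replace") as f:
#       df = lockbox(f.read(), "lockbox_output.xlsx")
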
def lb2(report: str, save_name: str):
    lines = report.splitlines()
    extracted_data_dict = {
        "SEQ": [],
        "PYMT DATE": [],
        "INV NUM": [],
        "CHECK NUMBER": [],
        "PAYMENT AMOUNT": [],
        "NOTE": [],
        "IL SEQ": [],
        "CONTRACT NUM": [],
        "IL PAYMENT AMOUNT": [],
        "CUST NAME": [],
    }
    columns = list(extracted_data_dict.keys())
    data_extractor = create_line_divider([9, 19, 39, 56, 69, 89, 98, 118])
    for line_num, text in enumerate(lines):
        match = False
        # Try to find the first SEQ # and a contract payment date, e.g. ' 197 05/10/2022'.
        if re.match(r"(\s|\d){3}\d{1}\s{5}\d{2}/\d{2}/\d{4}", text):
            match = True
            # Add all of the data points except the customer name.
            for c in range(0, len(columns) - 1):
                extracted_data_dict[columns[c]].append(data_extractor(c, text, debug=False))
        # Check whether this line contains only an infolease payment.
        # Sometimes there are multiple infolease payments for a single bank record.
        elif re.search(contract_number_regex, text) is not None:
            match = True
            # If so, repeat the same bank data as the previous complete line...
            for c in range(0, 6):
                extracted_data_dict[columns[c]].append(extracted_data_dict[columns[c]][-1])
            # ...then add the new data for the infolease contract.
            for c in range(6, len(columns) - 1):
                extracted_data_dict[columns[c]].append(data_extractor(c, text, debug=False))
        # If we had a match we need a customer name to associate with it.
        # Sometimes it appears on the next page, hence the while loop searching for a match.
        if match:
            # The customer name will be on the next page if the next line is blank
            # and the word "PAGE" appears three lines under the current line.
            if (lines[line_num + 1].strip() == "") and (lines[line_num + 3].find("PAGE") != -1):
                i = 0
                # Look for a long run of whitespace followed by some text.
                while not re.match(r"\s{98}.{34}", lines[line_num + i]):
                    i += 1
                # Once we find it, add the customer name to the dict (it is the only thing on the line).
                extracted_data_dict["CUST NAME"].append(lines[line_num + i].strip())
            # If the condition above isn't met, the customer name is on the next line
            # (even if that line is blank).
            else:
                extracted_data_dict["CUST NAME"].append(lines[line_num + 1].strip())
    dataframe = pd.DataFrame(extracted_data_dict)
    dataframe.to_excel(save_name, index=False)
    return dataframe


r1 = "/config/workspace/LEAF/IL Extract SRC/lb_errors/2022.05.10_LOCKBOX_094_C"
r2 = "/config/workspace/LEAF/IL Extract SRC/lb_errors/2022.05.11_LOCKBOX_094_C"

with open(r1, errors="replace") as ifile:
    report = ifile.read()

lb2(report, "test_lb_0510.xlsx")

with open(r2, errors="replace") as ifile:
    report = ifile.read()

lb2(report, "test_lb_0511.xlsx")