import pandas as pd import sqlalchemy as sqa import re import businesstimedelta import pytz from datetime import time, datetime as dt import sys def get_data(login: str, startDate: str, endDate: str, sqlQuery: str) -> pd.DataFrame: """ Connects to the OnBase database on LPP-SQL01 and runs the query specified in OnBaseSearchQuery.txt login : The login information used to authenticate with the MSSQL server. The user needs read permission on the Onbase database startDate : The earliest dated record to pull. Format = mm/dd/yyyy endDate : The most recent record to pull. Format = mm/dd/yyyy """ connStr = f"mssql+pymssql://leafnow.com\{login}@LPP-SQL01" try: print(f"Connectiong to SQL database...") with sqa.create_engine(connStr).connect() as con: print("Pulling data...") with open('OnBaseSearchQuery.txt') as obQueryFile: filledQuery = sqlQuery.replace("REPLACE_START_DATE", startDate).replace("REPLACE_END_DATE", endDate) try: result = con.execute(filledQuery).all() try: dataframe = pd.DataFrame(result) assert len(dataframe) > 0, f"No data in dataframe: {dataframe}\nQuery result: {result}" return dataframe except: print(f"Failed to create a dataframe from SQL result:\n{result}") sys.exit(2) except Exception as e: print(f"Failed to pull data from SQL:\n{filledQuery}\n{e}") sys.exit(2) except Exception as e: print(f"Failed to connect to SQL:\n{e}\nPlease make sure your username and password are correct!\tlogin: {login}") sys.exit(2) def inital_data_processing(raw_report: pd.DataFrame) -> pd.DataFrame: """ Takes in a dataframe of ACH verification entries from the Onbase database. This dataframe is based on the returns in the SQL query in OnBaseSearchQuery.txt. The return adds a number of columns to the data: - APTurnAround: Total time between a report entering and exiting the ACH Verification queue - AttemptTimeDif: Time between first and second attempt - TimeToFirstAttempt: Time intil making the first attempt All time measurments are in business hours """ # Define business hours # Currently 7am to 6pm eastern time workday = businesstimedelta.WorkDayRule( start_time= time(7), end_time= time(18), working_days=[0, 1, 2, 3, 4], tz=pytz.timezone("US/Eastern")) businesshrs = businesstimedelta.Rules([workday]) # Convert columns to datetime] date_time_format = "%Y-%m-%d %H:%M:%S.%f" raw_report["StatusDateTime"] = pd.to_datetime(raw_report["StatusDateTime"], format="%b %d %Y %I:%M%p") raw_report["firstattemptdate"] = pd.to_datetime(raw_report["firstattemptdate"], format=date_time_format) raw_report["secondattemptdate"] = pd.to_datetime(raw_report["secondattemptdate"], format=date_time_format) raw_report["QueueEntryTime"] = pd.to_datetime(raw_report["QueueEntryTime"], format=date_time_format) raw_report["QueueExitTime"] = pd.to_datetime(raw_report["QueueExitTime"], format=date_time_format) # Add calculated time columns # Check to make sure the columns being used are valid otherwise fill with None raw_report["APTurnAround"] = raw_report.apply(lambda row: businesshrs.difference(row.QueueEntryTime, row.QueueExitTime).timedelta.total_seconds() / 60**2 if row.QueueExitTime > dt(1965,1,1) and row.QueueExitTime > row.QueueEntryTime else None, axis = 1) raw_report["AttemptTimeDif"] = raw_report.apply(lambda row: businesshrs.difference(row.firstattemptdate, row.secondattemptdate).timedelta.total_seconds() / 60**2 if (row.secondattempt != None and row.firstattempt != None) and (row.secondattemptdate > row.firstattemptdate) else None, axis = 1) raw_report["TimeToFirstAttempt"] = raw_report.apply(lambda row: businesshrs.difference(row.QueueEntryTime, row.firstattemptdate).timedelta.total_seconds() / 60**2 if row.firstattempt != None and row.QueueEntryTime > dt(1965,1,1) else None, axis = 1) return raw_report