import pickle

import numpy as np
import pandas as pd
import streamlit as st
import xgboost as xgb
from numpy import argmax
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report, roc_curve
from sklearn.model_selection import StratifiedKFold, cross_val_score


def drop_columns(df, columns):
    return df.drop(columns, axis=1)


def remove_less_than_0_columns(df, column):
    # Drop rows with a missing value in the column, then keep only the rows
    # where the column value is non-zero.
    df = df.dropna(subset=[column])
    return df.loc[df[column] != 0]


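# Turn a boolean condition on the DataFrame into an integer 0/1 label and
# return the label series together with the features (label column dropped).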
def boolean_int_condition_label(df, label_column_name, condition):
    df[label_column_name] = condition
    y = df[label_column_name].astype(int)
    df = drop_columns(df, label_column_name)
    return y, df


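# Undersample the majority class so the training set contains an equal number
# of default and non-default rows. Note: value_counts() sorts by frequency, so
# the unpacking below assumes non-defaults (label 0) outnumber defaults.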
@st.cache(suppress_st_warning=True)
def undersample_training_data(
    df: pd.DataFrame, column_name: str, split_dataset
):
    count_nondefault, count_default = split_dataset.X_y_train[
        column_name
    ].value_counts()

    nondefaults = df[df[column_name] == 0]
    defaults = df[df[column_name] == 1]

    under_sample = min(count_nondefault, count_default)
    nondefaults_under = nondefaults.sample(under_sample)
    defaults_under = defaults.sample(under_sample)

    X_y_train_under = pd.concat(
        [
            nondefaults_under.reset_index(drop=True),
            defaults_under.reset_index(drop=True),
        ],
        axis=0,
    )

    X_train_under = X_y_train_under.drop([column_name], axis=1)
    y_train_under = X_y_train_under[column_name]

    class_balance_default = X_y_train_under[column_name].value_counts()

    return [
        X_train_under,
        y_train_under,
        X_y_train_under,
        class_balance_default,
    ]


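# Map each training feature to its fitted logistic-regression coefficient.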
def create_coeffient_feature_dictionary_logistic_model(
    logistic_model, training_data
):
    return {
        feat: coef
        for coef, feat in zip(
            logistic_model.coef_[0, :], training_data.columns
        )
    }


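# Fit a baseline logistic regression on the training data; cached so Streamlit
# reruns reuse the fitted model.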
@st.cache(suppress_st_warning=True)
def test_variables_logistic(X_train, y_train):
    return LogisticRegression(solver="lbfgs").fit(X_train, np.ravel(y_train))


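# Expose the fitted logistic-regression coefficients as a feature -> weight
# dictionary for display in the app.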
@st.cache(suppress_st_warning=True)
def print_coeff_logistic(clf_logistic_model, split_dataset):
    return create_coeffient_feature_dictionary_logistic_model(
        clf_logistic_model, split_dataset.X_train
    )


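# Fit a gradient boosted tree classifier. XGBClassifier is not hashable by
# Streamlit's default hasher, so the cache serialises it with pickle.dumps.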
@st.cache(suppress_st_warning=True, hash_funcs={
    xgb.XGBClassifier: pickle.dumps
})
def test_variables_gbt(X_train, y_train):
    # eval_metric is set on the estimator, so it is not repeated in fit().
    return xgb.XGBClassifier(
        learning_rate=0.1,
        max_depth=7,
        use_label_encoder=False,
        eval_metric="logloss",
    ).fit(X_train, np.ravel(y_train))


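# Combine the true default status, the model's predicted default probability,
# the thresholded predicted status and the loan amount into one DataFrame.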
def get_df_trueStatus_probabilityDefault_threshStatus_loanAmount(
    model, X, y, threshold, loan_amount_col_name
):
    true_status = y.to_frame()
    loan_amount = X[loan_amount_col_name]

    clf_prediction_prob = model.predict_proba(np.ascontiguousarray(X))
    clf_prediction_prob_df = pd.DataFrame(
        clf_prediction_prob[:, 1], columns=["PROB_DEFAULT"]
    )

    clf_thresh_predicted_default_status = (
        clf_prediction_prob_df["PROB_DEFAULT"]
        .apply(lambda x: 1 if x > threshold else 0)
        .rename("PREDICT_DEFAULT_STATUS")
    )

    return pd.concat(
        [
            true_status.reset_index(drop=True),
            clf_prediction_prob_df.reset_index(drop=True),
            clf_thresh_predicted_default_status.reset_index(drop=True),
            loan_amount.reset_index(drop=True),
        ],
        axis=1,
    )


def model_probability_values_df(model, X):
    return pd.DataFrame(model.predict_proba(X)[:, 1], columns=["PROB_DEFAULT"])


def apply_threshold_to_probability_values(probability_values, threshold):
    return (
        probability_values["PROB_DEFAULT"]
        .apply(lambda x: 1 if x > threshold else 0)
        .rename("PREDICT_DEFAULT_STATUS")
    )


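# Pick the classification threshold that maximises Youden's J statistic
# (J = TPR - FPR) over the ROC curve.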
@st.cache(suppress_st_warning=True)
def find_best_threshold_J_statistic(y, clf_prediction_prob_df):
    fpr, tpr, thresholds = roc_curve(y, clf_prediction_prob_df)
    J = tpr - fpr
    ix = argmax(J)
    return thresholds[ix]


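# Run xgboost's built-in cross-validation on a DMatrix built from X and y and
# return the DMatrix together with the per-round evaluation results.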
def create_cross_validation_df(
    X, y, eval_metric, seed, trees, n_folds, early_stopping_rounds
):
    DTrain = xgb.DMatrix(X, label=y)

    params = {
        "eval_metric": eval_metric,
        "objective": "binary:logistic",
        "seed": seed,
    }

    cv_df = xgb.cv(
        params,
        DTrain,
        num_boost_round=trees,
        nfold=n_folds,
        early_stopping_rounds=early_stopping_rounds,
        shuffle=True,
    )

    return [DTrain, cv_df]


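# Score the model with stratified k-fold cross-validation.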
def cross_validation_scores(model, X, y, nfold, score, seed):
    return cross_val_score(
        model,
        np.ascontiguousarray(X),
        np.ravel(np.ascontiguousarray(y)),
        cv=StratifiedKFold(n_splits=nfold, shuffle=True, random_state=seed),
        scoring=score,
    )


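# Apply each candidate threshold to the predicted default probabilities and
# collect the resulting 0/1 status series.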
def default_status_per_threshold(threshold_list, prob_default):
    threshold_default_status_list = []
    for threshold in threshold_list:
        threshold_default_status = prob_default.apply(
            lambda x: 1 if x > threshold else 0
        )
        threshold_default_status_list.append(threshold_default_status)
    return threshold_default_status_list


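# Build a classification report for every threshold and key the results by
# threshold value.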
def classification_report_per_threshold(
    threshold_list, threshold_default_status_list, y_test
):
    target_names = ["Non-Default", "Default"]
    classification_report_list = []
    for threshold_default_status in threshold_default_status_list:
        thresh_classification_report = classification_report(
            y_test,
            threshold_default_status,
            target_names=target_names,
            output_dict=True,
            zero_division=0,
        )
        classification_report_list.append(thresh_classification_report)

    return dict(zip(threshold_list, classification_report_list))


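# Pull default recall, non-default recall and overall accuracy out of each
# per-threshold classification report.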
def thresh_classification_report_recall_accuracy(
    thresh_classification_report_dict,
):
    thresh_def_recalls_list = []
    thresh_nondef_recalls_list = []
    thresh_accs_list = []
    for x in [*thresh_classification_report_dict]:
        thresh_def_recall = thresh_classification_report_dict[x]["Default"][
            "recall"
        ]
        thresh_def_recalls_list.append(thresh_def_recall)
        thresh_nondef_recall = thresh_classification_report_dict[x][
            "Non-Default"
        ]["recall"]
        thresh_nondef_recalls_list.append(thresh_nondef_recall)
        thresh_accs = thresh_classification_report_dict[x]["accuracy"]
        thresh_accs_list.append(thresh_accs)
    return [
        thresh_def_recalls_list,
        thresh_nondef_recalls_list,
        thresh_accs_list,
    ]


def create_accept_rate_list(start, end, samples):
    return np.linspace(start, end, samples, endpoint=True)


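# Build the loan acceptance-rate strategy table: for each acceptance rate,
# compute the probability threshold, bad rate, number of accepted loans and
# estimated value of the accepted portfolio.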
def create_strategyTable_df(
    start, end, samples, actual_probability_predicted_acc_rate, true, currency
):
    accept_rates = create_accept_rate_list(start, end, samples)
    thresholds_strat = []
    bad_rates_start = []
    Avg_Loan_Amnt = actual_probability_predicted_acc_rate[true].mean()
    num_accepted_loans_start = []

    for rate in accept_rates:
        # Probability threshold that accepts the requested share of loans.
        thresh = np.quantile(
            actual_probability_predicted_acc_rate["PROB_DEFAULT"], rate
        ).round(3)
        thresholds_strat.append(thresh)

        actual_probability_predicted_acc_rate[
            "PREDICT_DEFAULT_STATUS"
        ] = actual_probability_predicted_acc_rate["PROB_DEFAULT"].apply(
            lambda x: 1 if x > thresh else 0
        )

        accepted_loans = actual_probability_predicted_acc_rate[
            actual_probability_predicted_acc_rate["PREDICT_DEFAULT_STATUS"]
            == 0
        ]

        # Bad rate: mean of the true-status column over the accepted loans.
        bad_rates_start.append(
            np.sum((accepted_loans[true]) / len(accepted_loans[true])).round(3)
        )
        num_accepted_loans_start.append(len(accepted_loans))

    money_accepted_loans = [
        accepted_loans * Avg_Loan_Amnt
        for accepted_loans in num_accepted_loans_start
    ]

    money_bad_accepted_loans = [
        2 * money_accepted_loan * bad_rate
        for money_accepted_loan, bad_rate in zip(
            money_accepted_loans, bad_rates_start
        )
    ]

    zip_object = zip(money_accepted_loans, money_bad_accepted_loans)
    estimated_value = [
        money_accepted_loan - money_bad_accepted_loan
        for money_accepted_loan, money_bad_accepted_loan in zip_object
    ]

    accept_rates = ["{:.2f}".format(elem) for elem in accept_rates]
    thresholds_strat = ["{:.2f}".format(elem) for elem in thresholds_strat]
    bad_rates_start = ["{:.2f}".format(elem) for elem in bad_rates_start]
    estimated_value = ["{:.2f}".format(elem) for elem in estimated_value]

    return (
        pd.DataFrame(
            zip(
                accept_rates,
                thresholds_strat,
                bad_rates_start,
                num_accepted_loans_start,
                estimated_value,
            ),
            columns=[
                "Acceptance Rate",
                "Threshold",
                "Bad Rate",
                "Num Accepted Loans",
                f"Estimated Value ({currency})",
            ],
        )
        .sort_values(by="Acceptance Rate", axis=0, ascending=False)
        .reset_index(drop=True)
    )