# -*- coding: utf-8 -*-

print('''脚本说明: 基于GiveMeSomeCredit数据集,构建贷款申请评分卡并生成建模报告 ''')

# Imports
import pandas
import numpy
from sklearn.tree import DecisionTreeClassifier
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve
from jinja2 import Environment, FileSystemLoader
import time
import warnings

# Silence library warnings for the whole run
warnings.simplefilter('ignore')

import sys

# Project helpers are imported relative to the parent directory
sys.path.append('..')

from utils.mysql import MySQL
from utils.pandas2chart import Pandas2chart


# Functions used by this script

def OptimalEncodeByWOE(dataset):
    """Bin every feature with a decision tree and encode it by weight of evidence.

    For each feature the number of bins is tried from min(#unique values, 5)
    down to 2; the first binning whose WOE values pass the neighbour-wise
    monotonicity check is kept. Missing values are not fed to the tree; they
    form their own extra bin.

    Args:
        dataset: pandas.DataFrame whose first column is the 0/1 target and
            whose remaining columns are the features.

    Returns:
        A tuple ``(dataset_woe, dictionary)``:
        ``dataset_woe`` is a copy of ``dataset`` with every encoded feature
        replaced by the WOE of its bin; ``dictionary`` is a DataFrame with one
        row per (feature, bin) holding the columns ``bin``, ``positives``,
        ``negatives``, ``samples``, ``proportion_positive``,
        ``proportion_negative``, ``woe``, ``iv`` and ``independent``.
    """
    dataset_woe = dataset.copy()

    # Name of the target variable (first column)
    dependent = dataset_woe.columns[0]
    # Names of the feature variables (all remaining columns)
    independents = dataset_woe.columns[1:]

    # Per-feature WOE tables, concatenated once at the end
    frames = []

    is_positive = (dataset_woe[dependent] == 1).to_numpy()
    is_negative = (dataset_woe[dependent] == 0).to_numpy()

    for independent in independents:
        print('正在就特征变量 %s 基于决策树最优分箱并证据权重编码...' % independent, end='')

        # Split rows by whether the feature is missing
        present = dataset_woe[independent].notna().to_numpy()
        missing = ~present
        y = dataset_woe.loc[present, dependent]
        x = dataset_woe.loc[present, independent]

        # Try bin counts from min(#unique values, 5) down to 2
        for bins in range(min(x.nunique(), 5), 1, -1):
            # Each leaf must hold at least 5% of the samples
            decision_tree = DecisionTreeClassifier(
                max_leaf_nodes=bins,
                min_samples_leaf=0.05,
                class_weight='balanced',
            ).fit(x.to_numpy().reshape(-1, 1), y.to_numpy())
            tree = decision_tree.tree_

            # Internal nodes (left child != right child) carry the cut points
            tangencies = sorted(
                tree.threshold[node]
                for node in range(tree.node_count)
                if tree.children_left[node] != tree.children_right[node]
            )
            # Add outer boundaries so that every observed value falls in a bin
            tangencies = [x.min() - 0.01] + tangencies + [x.max() + 0.01]

            # Bin into a separate Series instead of overwriting the numeric
            # column with Interval objects on every attempt
            binned = pandas.Series(
                pandas.cut(x=x.to_numpy(), bins=tangencies, right=False)
            ).astype(object)

            # Count positives / negatives per bin
            woe = pandas.DataFrame(
                {dependent: y.to_numpy(), independent: binned}
            ).groupby(by=independent)[dependent].agg(
                func=[
                    ('positives', lambda s: (s == 1).sum()),
                    ('negatives', lambda s: (s == 0).sum()),
                ]
            )
            woe.reset_index(inplace=True)
            woe.rename(columns={independent: 'bin'}, inplace=True)

            # Missing values, if any, become one extra bin
            if missing.any():
                woe.loc[len(woe)] = {
                    'bin': numpy.nan,
                    'positives': int((missing & is_positive).sum()),
                    'negatives': int((missing & is_negative).sum()),
                }

            # Samples per bin
            woe['samples'] = woe['positives'] + woe['negatives']
            # Share of all positives / negatives that fall into each bin
            woe['proportion_positive'] = round(woe['positives'] / woe['positives'].sum(), 2)
            woe['proportion_negative'] = round(woe['negatives'] / woe['negatives'].sum(), 2)
            # Weight of evidence (0.01 is added to avoid log(0) and division by zero)
            woe['woe'] = round(
                numpy.log((woe['proportion_positive'] + 0.01) / (woe['proportion_negative'] + 0.01)), 2
            )
            # Information value contribution of each bin
            woe['iv'] = round((woe['proportion_positive'] - woe['proportion_negative']) * woe['woe'], 2)

            # Monotonicity is checked on the non-missing bins only
            woe_notna = woe.loc[woe['bin'].notna()].reset_index(drop=True)
            values = woe_notna['woe'].to_numpy()
            # Every interior bin must lie between its two neighbours
            monotonic = all(
                (before <= current <= after) or (before >= current >= after)
                for before, current, after in zip(values, values[1:], values[2:])
            )

            if monotonic:
                # Write the WOE encoding back by assignment; a chained
                # ``df[col].replace(..., inplace=True)`` acts on a temporary
                # under pandas Copy-on-Write and would be lost.
                mapping = dict(zip(woe_notna['bin'], woe_notna['woe']))
                encoded = numpy.full(len(dataset_woe), numpy.nan)
                encoded[present] = binned.map(mapping).to_numpy(dtype=float)
                if missing.any():
                    encoded[missing] = float(woe.loc[woe['bin'].isna(), 'woe'].iat[0])
                dataset_woe[independent] = encoded

                woe['independent'] = independent
                frames.append(woe)

                print('已完成')
                print()
                break

    dictionary = pandas.concat(frames) if frames else pandas.DataFrame()

    return dataset_woe, dictionary


# This script must be run directly; refuse to be imported
if __name__ != '__main__':
    print('本脚本不允许被调用')
    print()
    sys.exit()

print('1、连接数据库查表并保存至数据集...', end='')

dataset = MySQL(database='data_analysis').query('select * from credit_dataset')

# The script treats a str result as a failed connection/query
if isinstance(dataset, str):
    print('连接失败,请检查数据库连接是否正常')
    print()
    sys.exit()

print('已完成')
print()

# Name of the target variable: the first column
dependent = dataset.columns[0]

# The target variable must only contain 0 or 1
if not ((dataset[dependent] == 0) | (dataset[dependent] == 1)).all():
    print('第一列应为目标变量且值应为0或1,脚本终止!')
    print()
    sys.exit()

# Number of samples
samples = dataset.shape[0]

# Names of the feature variables
independents = dataset.columns[1:]

# Number of feature variables
variables_independent = len(independents)

print('数据集样本数为 %d 份,特征变量数为 %d 个。' % (samples, variables_independent))
print()

# There are many variables, so the preview is transposed and relabelled
Pandas2chart(
    dataset=dataset.loc[1:4, :].T.reset_index().rename(
        columns={'index': '变量名', 1: '样本1', 2: '样本2', 3: '样本3', 4: '样本4'}
    ),
    type='table',
    path='./reports/scorecard_report/dataset_preview.html',
)

print('2、预处理')
print()

print('2.1 清洗数据...', end='')

# Drop samples whose target variable is missing
dataset.dropna(subset=dependent, inplace=True)

# Drop duplicated samples (keep the first occurrence)
dataset.drop_duplicates(inplace=True)

print('已完成')
print()

# Number of samples after cleaning
samples = dataset.shape[0]

print('处理后,数据集样本数为 %d 份。' % samples)
print()

print('2.2 处理缺失值...', end='')
print('在特征变量证据权重编码时,将对缺失值单独作为一箱,本节点略过')
print()

print('2.3 处理异常值...', end='')
print('在特征变量证据权重编码时,可以消除异常值的影响,本节点略过')
print()

print('2.4 特征变量最优分箱并证据权重编码')
print()

dataset_woe, dictionary = OptimalEncodeByWOE(dataset)

Pandas2chart(
    dataset=dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'samples', 'woe']].rename(
        columns={'bin': '分箱', 'samples': '样本数', 'woe': '证据权重'}
    ),
    type='bar+line',
    path='./reports/scorecard_report/dictionary.html',
)

print('3、选择特征变量')
print('')

# Per-feature statistics report
statistics = pandas.DataFrame(data=independents, columns=['independent'])

print('3.1 基于信息价值选择特征变量...', end='')

for independent in independents:
    # Information value of a feature = sum of the IV of its bins
    statistics.loc[statistics['independent'] == independent, 'iv'] = dictionary.loc[
        dictionary['independent'] == independent, 'iv'
    ].sum()

# Keep features whose IV reaches the threshold
# (0.1 = low predictive power, 0.3 = medium; 0.1 is used here)
statistics = statistics.loc[statistics['iv'] >= 0.1].copy()

independents = statistics['independent'].tolist()

print('已完成')
print()

# Number of feature variables after IV selection
variables_independent = len(independents)

print('处理后,特征变量数为 %d 个。' % variables_independent)
print()

print('3.2 基于后向淘汰条件选择特征变量(基于归回系数和方差扩大因子)')
print()

parameters = {
    # Test value; full grid: [0, 0.25, 0.5, 0.75, 1]
    'l1_ratio': [0.5],
    # Test value; full grid: [0.001, 0.01, 0.1, 1.1, 10.1, 100.1, 1000.1]
    'C': [1.1],
}

# Cross-validated grid search over an elastic-net logistic regression
model = GridSearchCV(
    estimator=LogisticRegression(solver='saga', penalty='elasticnet', class_weight='balanced'),
    param_grid=parameters,
    scoring='roc_auc',
    refit=True,
)

# Backward elimination. Invariant: the rows of ``statistics`` and the entries
# of ``independents`` describe the same features in the same order, which is
# also the column order the model is fitted on.
while True:
    if not independents:
        print('无可用特征变量,脚本终止!')
        print()
        sys.exit()

    features = dataset_woe[independents]

    model.fit(features.to_numpy(), dataset_woe[dependent].to_numpy())

    # Regression coefficients, in fit order
    statistics['coefficient'] = model.best_estimator_.coef_[0, :]

    # Variance inflation factors; the design matrix (features + constant) is
    # built once and the constant column itself is not evaluated
    design = features.assign(constant=1).to_numpy()
    statistics['vif'] = [variance_inflation_factor(design, i) for i in range(len(independents))]

    # A feature is kept when coefficient >= 0.1 and VIF <= 10
    passing = (statistics['coefficient'] >= 0.1) & (statistics['vif'] <= 10)

    # Candidates for elimination, largest VIF first
    obsolescence = statistics.loc[~passing].sort_values(by='vif', ascending=False)['independent'].tolist()

    if obsolescence:
        # Remove only the failing feature with the largest VIF, from both the
        # feature list and the report, so that they stay aligned
        independents.remove(obsolescence[0])
        statistics = statistics.loc[statistics['independent'] != obsolescence[0]].copy()

        print('特征变量 %s 满足淘汰条件,继续后进' % obsolescence[0])
        print('')
    else:
        print('所有特征变量不满足淘汰条件,停止后进')
        print('')
        break

# Column order the final model was fitted on; predictions must use it
fitted_independents = list(independents)

# Present the report (and iterate downstream) in descending VIF order
statistics = statistics.sort_values(by='vif', ascending=False)
independents = statistics['independent'].tolist()

# Number of feature variables after backward elimination
variables_independent = len(independents)

print('处理后,特征变量数为 %d 个。' % variables_independent)
print()

# False positive rate and true positive rate
fpr, tpr, thresholds = roc_curve(
    y_true=dataset_woe[dependent].to_numpy(),
    y_score=model.predict_proba(dataset_woe[fitted_independents].to_numpy())[:, 1],
)

# KS statistic of the logistic regression model
ks = max(tpr - fpr)

print('基于选择后的特征变量构建逻辑回归模型,洛伦兹统计量(KS)为 %.2f 。(~0.2不建议使用,0.2~0.4模型区分能力较好,0.4~0.5良好,0.5~0.6很好,0.6~0.75非常好,0.75~ 区别能力存疑)' % ks)
print()

Pandas2chart(
    dataset=statistics.loc[:, ['independent', 'iv', 'vif', 'coefficient']].rename(
        columns={'independent': '特征变量名', 'iv': '信息价值', 'vif': '方差扩大因子', 'coefficient': '回归系数'}
    ),
    type='table',
    path='./reports/scorecard_report/statistics.html',
)

print('4、编制评分卡')
print('')

print('4.1 基于构建后的逻辑回归模型编制评分卡...', end='')

dictionary = dictionary.loc[
    dictionary['independent'].isin(independents), ['independent', 'bin', 'woe']
].reset_index(drop=True)

# Scoring formula: S = A + B * ln(odds).
# Odds of 1 score 500; doubling the odds lowers the score by 50.
# Coefficient alpha of the scoring formula
alpha = 500
# Coefficient beta of the scoring formula
beta = -50 / numpy.log(2)

# Base score: the intercept is converted to points, then shared out across the
# features in proportion to their regression coefficients
gamma = (alpha + beta * model.best_estimator_.intercept_[0]) / statistics['coefficient'].sum()

for independent in independents:
    coefficient = statistics.loc[statistics['independent'] == independent, 'coefficient'].iat[0]
    rows = dictionary['independent'] == independent

    # Weighted base score of the feature
    dictionary.loc[rows, 'gamma'] = gamma * coefficient
    # Weighted regression coefficient of the feature
    dictionary.loc[rows, 'beta'] = beta * coefficient
    # Points of every bin
    dictionary.loc[rows, 'score'] = dictionary.loc[rows, ['woe', 'gamma', 'beta']].apply(
        lambda x: round(x['gamma'] + x['beta'] * x['woe']), axis='columns'
    )

    # Replace WOE values by points; assigned back rather than replaced in
    # place on a chained selection
    dataset_woe[independent] = dataset_woe[independent].replace(
        dictionary.loc[rows, 'woe'].to_numpy(),
        dictionary.loc[rows, 'score'].to_numpy(),
    )

# Total score of every sample
dataset_woe['score'] = dataset_woe[independents].sum(axis='columns')

print('已完成')
print()

Pandas2chart(
    dataset=dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'woe', 'gamma', 'beta', 'score']].rename(
        columns={'bin': '分箱', 'woe': '证据权重', 'gamma': '加权基础分数', 'beta': '加权回归系数', 'score': '分数'}
    ),
    type='table',
    path='./reports/scorecard_report/dictionary_score.html',
)

# Bin the total score with fixed edges
dataset_woe['bin'] = pandas.cut(
    x=dataset_woe['score'].to_numpy(),
    bins=[0, 350, 400, 450, 500, 550, 600, 650, 1000],
    right=False,
)

# Aggregate per score bin
score = dataset_woe.groupby(by='bin').agg(
    # Positive samples
    positives=(dependent, lambda x: (x == 1).sum()),
    # Negative samples
    negatives=(dependent, lambda x: (x == 0).sum()),
    # All samples
    samples=(dependent, lambda x: x.count()),
)

score.reset_index(inplace=True)

# Rejection rule: reject applications scoring below the right edge of the bin
score['threshold'] = score['bin'].apply(lambda x: '<{}'.format(x.right))

# Positives in the bin as a percentage of the bin's samples
score['proportion'] = round(score['positives'] / score['samples'] * 100, 2)

# Samples in the bin as a percentage of all samples
score['proportion_sample'] = score['samples'] / score['samples'].sum() * 100

# Cumulative percentage of samples
score['accumulation_sample'] = round(score['proportion_sample'].cumsum(), 2)

# Cumulative number of positives
score['accumulation_positives'] = score['positives'].cumsum()

# Cumulative number of samples
score['accumulation_samples'] = score['samples'].cumsum()

# Cumulative positives as a percentage of cumulative samples
score['proportion_positives'] = round(score['accumulation_positives'] / score['accumulation_samples'] * 100, 2)

# Positives in the bin as a percentage of all positives
score['proportion_positive'] = score['positives'] / score['positives'].sum() * 100

# Cumulative percentage of positives
score['accumulation_positive'] = round(score['proportion_positive'].cumsum(), 2)

# Negatives in the bin as a percentage of all negatives
score['proportion_negative'] = score['negatives'] / score['negatives'].sum() * 100

# Cumulative percentage of negatives
score['accumulation_negative'] = round(score['proportion_negative'].cumsum(), 2)

# KS statistic per bin
score['ks'] = round(abs(score['accumulation_positive'] - score['accumulation_negative']), 2)

# KS statistic of the scorecard
ks = score['ks'].max()

# Lift per bin
score['lift'] = round((score['accumulation_positive'] + 0.01) / (score['accumulation_sample'] + 0.01), 2)

# Lift of the scorecard
lift = score['lift'].max()

print('基于构建后的逻辑回归模型编制评分卡,柯斯统计量(KS)为 %.2f ,提升统计量(LIFT)为 %.2f 。' % (ks, lift))
print()

Pandas2chart(
    dataset=score[['bin', 'ks', 'lift']].rename(columns={'bin': '分箱', 'ks': '柯斯统计量', 'lift': '提升统计量'}),
    type='line+line',
    path='./reports/scorecard_report/model_evaluation.html',
)

Pandas2chart(
    dataset=score[
        ['bin', 'threshold', 'proportion', 'accumulation_sample', 'proportion_positives', 'accumulation_positive']
    ].rename(
        columns={
            'bin': '分箱',
            'threshold': '拒绝规则',
            'proportion': '分箱逾期率',
            'accumulation_sample': '拒绝率',
            'proportion_positives': '拒绝逾期率',
            'accumulation_positive': '累计逾期率',
        }
    ),
    type='table',
    path='./reports/scorecard_report/business_evaluation.html',
)

print('4.2 生成评分卡规则文件并保存...', end='')

# Pieces of the generated source, joined once at the end
rules = ['''def Calculate(sample):\n\n\tscore = 0\n\n''']

for independent in independents:
    rules.append('\tmatch sample["{}"]:\n\n'.format(independent))

    subset = dictionary.loc[dictionary['independent'] == independent].reset_index(drop=True)

    # When present, the missing-value bin is the last row of the feature
    has_missing_bin = pandas.isna(subset['bin'].iat[-1])
    intervals = subset.iloc[:-1] if has_missing_bin else subset
    last = len(intervals) - 1

    for position, (interval, points) in enumerate(zip(intervals['bin'], intervals['score'])):
        if position == 0:
            # First bin: open on the left
            rules.append('\t\tcase x if x < {}: score += {}\n\n'.format(interval.right, points))
        elif position == last:
            # Last non-missing bin: open on the right
            rules.append('\t\tcase x if x >= {}: score += {}\n\n'.format(interval.left, points))
        else:
            rules.append(
                '\t\tcase x if x < {} and x >= {}: score += {}\n\n'.format(interval.right, interval.left, points)
            )

    if has_missing_bin:
        # ``case numpy.nan:`` is a value pattern compared with ==, which NaN
        # never satisfies; ``x != x`` is true exactly for NaN
        rules.append('\t\tcase x if x != x: score += {}\n\n'.format(subset['score'].iat[-1]))

rules.append('\treturn score')

calculate = ''.join(rules)

# Write the scorecard rules to a local file
with open('../utils/scorecrad_calculate.txt', 'w', encoding='utf8') as file:
    file.write(calculate)

print('已完成')
print()

print('5、生成贷款申请评分卡报告...', end='')

# Load the report template
template = Environment(loader=FileSystemLoader('./reports/scorecard_report/')).get_template('template.html')

# Render the report
scorecard_report = template.render(
    {
        # Report date
        'report_date': time.strftime('%y-%m-%d', time.localtime()),
        'samples': samples,
        'variables_independent': variables_independent,
        'ks': ks,
        'lift': lift,
    }
)

with open('./reports/scorecard_report/scorecard_report.html', 'w', encoding='utf8') as file:
    file.write(scorecard_report)

print('已完成')
print()