Python/评分卡/main.py

557 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
print('''脚本说明:
基于GiveMeSomeCredit数据集构建贷款申请评分卡并生成建模报告
''')
#导入包
import pandas
import numpy
from sklearn.tree import DecisionTreeClassifier
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve
from jinja2 import Environment, FileSystemLoader
import time
import warnings
#忽略警告
warnings.simplefilter('ignore')
import sys
sys.path.append('..')
from utils.mysql import MySQL
from utils.pandas2chart import Pandas2chart
#本脚本中调用函数
def OptimalEncodeByWOE(dataset):
    """Bin every feature with a decision tree and encode it by weight of evidence.

    For each feature a DecisionTreeClassifier is fitted on the non-missing
    values. The number of leaves starts at min(distinct values, 5) and is
    reduced down to 2 until the WOE of the resulting bins is monotonic.
    Missing values, when present, form one extra bin of their own.

    Args:
        dataset: pandas.DataFrame whose first column is the target
            (1 = positive, 0 = negative) and whose remaining columns are
            numeric features.

    Returns:
        A tuple ``(dataset_woe, dictionary)``:
        dataset_woe is a copy of ``dataset`` with each encoded feature
        replaced by the WOE of its bin; dictionary is a DataFrame with one
        row per feature bin (columns: bin, positives, negatives, samples,
        proportion_positive, proportion_negative, woe, iv, independent).
        A feature with fewer than two distinct non-missing values is left
        unchanged and contributes no rows to the dictionary.
    """
    dataset_woe = dataset.copy()
    # Name of the target column
    dependent = dataset_woe.columns[0]
    # Names of the feature columns
    independents = dataset_woe.columns[1:]
    # Accumulates the per-bin WOE table of every feature
    dictionary = pandas.DataFrame()
    target = dataset_woe[dependent]
    for independent in independents:
        print('正在就特征变量 %s 基于决策树最优分箱并证据权重编码...' % independent, end = '')
        # Split rows by whether the feature is present
        present = dataset_woe[independent].notna()
        y = dataset_woe.loc[present, dependent]
        x = dataset_woe.loc[present, independent]
        has_missing = not present.all()
        missing_positives = int((~present & (target == 1)).sum())
        missing_negatives = int((~present & (target == 0)).sum())
        # Try the largest bin count first and fall back towards 2 bins
        for bins in range(min(x.nunique(), 5), 1, -1):
            # Every leaf must hold at least 5% of the samples
            decision_tree = DecisionTreeClassifier(max_leaf_nodes = bins, min_samples_leaf = 0.05, class_weight = 'balanced').fit(x.to_numpy().reshape(-1, 1), y.to_numpy())
            tree = decision_tree.tree_
            # A node whose two children differ is a split; its threshold is a cut point
            is_split = tree.children_left != tree.children_right
            tangencies = sorted(tree.threshold[is_split].tolist())
            # Add outer edges slightly beyond the observed range
            tangencies = [x.min() - 0.01] + tangencies + [x.max() + 0.01]
            # The binning is kept outside the frame: writing Interval objects into
            # a numeric column is an incompatible-dtype assignment on newer pandas
            intervals = pandas.cut(x = x.to_numpy(), bins = tangencies, right = False).astype(object)
            # Positive and negative counts per bin
            woe = pandas.DataFrame({
                'bin': intervals,
                'positives': (y == 1).to_numpy().astype(int),
                'negatives': (y == 0).to_numpy().astype(int),
            }).groupby(by = 'bin', as_index = False).sum()
            # Missing values, if any, get a bin of their own
            if has_missing:
                woe.loc[len(woe)] = {'bin': numpy.nan, 'positives': missing_positives, 'negatives': missing_negatives}
            woe = woe.astype({'positives': int, 'negatives': int})
            woe['samples'] = woe['positives'] + woe['negatives']
            # Share of all positives / negatives falling into each bin
            woe['proportion_positive'] = (woe['positives'] / woe['positives'].sum()).round(2)
            woe['proportion_negative'] = (woe['negatives'] / woe['negatives'].sum()).round(2)
            # The 0.01 offset keeps the logarithm finite for one-sided bins
            woe['woe'] = numpy.log((woe['proportion_positive'] + 0.01) / (woe['proportion_negative'] + 0.01)).round(2)
            woe['iv'] = ((woe['proportion_positive'] - woe['proportion_negative']) * woe['woe']).round(2)
            # Monotonicity is judged on the real bins only. Checking the whole
            # sequence (rather than neighbouring triples) also rejects a peak
            # hidden behind a plateau such as [1, 2, 2, 1].
            woe_notna = woe.loc[woe['bin'].notna()]
            steps = numpy.diff(woe_notna['woe'].to_numpy())
            if (steps >= 0).all() or (steps <= 0).all():
                lookup = dict(zip(woe_notna['bin'], woe_notna['woe']))
                missing_woe = woe['woe'].iat[-1] if has_missing else numpy.nan
                encoded = numpy.full(len(dataset_woe), missing_woe, dtype = float)
                encoded[present.to_numpy()] = [lookup[interval] for interval in intervals]
                # Assign the column explicitly: an in-place replace on a column
                # selection does not reach the frame under pandas Copy-on-Write
                dataset_woe[independent] = encoded
                woe['independent'] = independent
                dictionary = pandas.concat([dictionary, woe])
                print('已完成')
                print()
                break
    return dataset_woe, dictionary
# Refuse to run when this file is imported instead of executed as a script
if __name__ != '__main__':
    print('本脚本不允许被调用')
    print()
    # sys.exit() rather than exit(): the latter is a helper injected by the
    # site module for interactive use and is not guaranteed to exist
    sys.exit()
print('1、连接数据库查表并保存至数据集...', end = '')
dataset = MySQL(database = 'data_analysis').query('select * from credit_dataset')
# A str result is treated as a failed query
# NOTE(review): presumably the project MySQL helper returns an error message on failure; confirm
if isinstance(dataset, str):
    print('连接失败,请检查数据库连接是否正常')
    print()
    sys.exit()
print('已完成')
print()
# Name of the target column; the first column is the target
dependent = dataset.columns[0]
# Every present target value must be 0 or 1. Missing targets are skipped here
# because the cleaning step drops those rows; rejecting them at this point
# would make that step unreachable.
target = dataset[dependent].dropna()
if not ((target == 0) | (target == 1)).all():
    print('第一列应为目标变量且值应为0或1脚本终止')
    print()
    sys.exit()
# Number of samples
samples = dataset.shape[0]
# Names of the feature columns
independents = dataset.columns[1: ]
# Number of feature columns
variables_independent = len(independents)
print('数据集样本数为 %d 份,特征变量数为 %d 个。' % (samples, variables_independent))
print()
# There are many variables, so the preview is transposed and relabelled
# NOTE(review): .loc[1:4] selects index labels 1 to 4, so the row labelled 0 is not previewed; confirm this is intended
Pandas2chart(dataset = dataset.loc[1:4, :].T.reset_index().rename(columns = {'index': '变量名', 1: '样本1', 2: '样本2', 3: '样本3', 4: '样本4'}), type = 'table', path = './reports/scorecard_report/dataset_preview.html')
print('2、预处理')
print()
print('2.1 清洗数据...', end = '')
# Drop samples whose target is missing. The label is wrapped in a list
# because older pandas releases only accept a list-like for `subset`.
dataset.dropna(subset = [dependent], inplace = True)
# Drop duplicated samples, keeping the first occurrence
dataset.drop_duplicates(inplace = True)
print('已完成')
print()
# Number of samples after cleaning
samples = dataset.shape[0]
print('处理后,数据集样本数为 %d 份。' % samples)
print()
# Missing values are not imputed: the WOE encoding gives them their own bin
print('2.2 处理缺失值...', end = '')
print('在特征变量证据权重编码时,将对缺失值单独作为一箱,本节点略过')
print()
# Outliers are not treated separately either: binning absorbs them into the outer bins
print('2.3 处理异常值...', end = '')
print('在特征变量证据权重编码时,可以消除异常值的影响,本节点略过')
print()
print('2.4 特征变量最优分箱并证据权重编码')
print()
dataset_woe, dictionary = OptimalEncodeByWOE(dataset)
# Chart the bins of the 'Age' feature (sample count and WOE per bin) as the report example
Pandas2chart(dataset = dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'samples', 'woe']].rename(columns = {'bin': '分箱', 'samples': '样本数', 'woe': '证据权重'}), type = 'bar+line', path = './reports/scorecard_report/dictionary.html')
print('3、选择特征变量')
print('')
# Per-feature statistics table used for selection and for the report
statistics = pandas.DataFrame(data = independents, columns = ['independent'])
print('3.1 基于信息价值选择特征变量...', end = '')
# Information value of a feature is the sum of the IV of its bins
statistics['iv'] = [dictionary.loc[dictionary['independent'] == independent, 'iv'].sum() for independent in independents]
# Keep features whose IV reaches the threshold (0.1 = weak predictive power,
# 0.3 = medium; this model uses 0.1)
statistics = statistics.loc[statistics['iv'] >= 0.1].copy()
independents = statistics['independent'].tolist()
print('已完成')
print()
# Number of features after IV selection
variables_independent = len(independents)
print('处理后,特征变量数为 %d 个。' % variables_independent)
print()
print('3.2 基于后向淘汰条件选择特征变量(基于归回系数和方差扩大因子)')
print()
parameters = {
    # Reduced grid for testing
    'l1_ratio': [0.5],
    #'l1_ratio': [0, 0.25, 0.5, 0.75, 1],
    # Reduced grid for testing
    'C': [1.1]
    #'C': [0.001, 0.01, 0.1, 1.1, 10.1, 100.1, 1000.1]
}
# Grid search with cross validation over an elastic-net logistic regression
model = GridSearchCV(estimator = LogisticRegression(solver = 'saga', penalty = 'elasticnet', class_weight = 'balanced'), param_grid = parameters, scoring = 'roc_auc', refit = True)
# Backward elimination: refit, then drop one offending feature per round.
# `statistics` and `independents` are kept row-for-row aligned so that the
# positional coefficient / VIF assignments below stay valid in every round.
while True:
    features = dataset_woe[independents].to_numpy(dtype = float)
    model.fit(features, dataset_woe[dependent].to_numpy())
    # Regression coefficient of each feature
    statistics['coefficient'] = model.best_estimator_.coef_[0, :]
    # Variance inflation factor of each feature; a constant column is appended
    # to the design matrix, which is built once per round
    design = numpy.column_stack([features, numpy.ones(len(features))])
    statistics['vif'] = [variance_inflation_factor(design, i) for i in range(len(independents))]
    # Sort by VIF, largest first
    statistics.sort_values(by = 'vif', ascending = False, inplace = True)
    independents = statistics['independent'].tolist()
    # A feature survives when its coefficient is at least 0.1 and its VIF at most 10
    keep = (statistics['coefficient'] >= 0.1) & (statistics['vif'] <= 10)
    obsolescence = statistics.loc[~keep, 'independent'].tolist()
    if obsolescence:
        # Eliminate only the failing feature with the largest VIF. Dropping all
        # failing rows from `statistics` at once would leave it shorter than
        # `independents` and break the coefficient assignment next round.
        eliminated = obsolescence[0]
        independents.remove(eliminated)
        statistics = statistics.loc[statistics['independent'] != eliminated].copy()
        print('特征变量 %s 满足淘汰条件,继续后进' % eliminated)
        print('')
    else:
        print('所有特征变量不满足淘汰条件,停止后进')
        print('')
        break
# Number of features after backward elimination
variables_independent = len(independents)
print('处理后,特征变量数为 %d 个。' % variables_independent)
print()
# False positive rate and true positive rate of the final model on the modelling data
fpr, tpr, thresholds = roc_curve(y_true = dataset_woe[dependent].to_numpy(), y_score = model.predict_proba(dataset_woe[independents].to_numpy(dtype = float))[:, 1])
# KS statistic: largest gap between TPR and FPR
ks = max(tpr - fpr)
print('基于选择后的特征变量构建逻辑回归模型洛伦兹统计量KS%.2f 。(~0.2不建议使用0.2~0.4模型区分能力较好0.4~0.5良好0.5~0.6很好0.6~0.75非常好0.75~ 区别能力存疑)' % ks)
print()
Pandas2chart(dataset = statistics.loc[:, ['independent', 'iv', 'vif', 'coefficient']].rename(columns = {'independent': '特征变量名', 'iv': '信息价值', 'vif': '方差扩大因子', 'coefficient': '回归系数'}), type = 'table', path = './reports/scorecard_report/statistics.html')
print('4、编制评分卡')
print('')
print('4.1 基于构建后的逻辑回归模型编制评分卡...', end = '')
# Keep only the bins of the selected features
dictionary = dictionary.loc[dictionary['independent'].isin(independents), ['independent', 'bin', 'woe']].reset_index(drop = True)
# Score formula: S = A + B * ln(odds). Odds of 1 score 500 and doubling the
# odds lowers the score by 50.
# Formula coefficient alpha
alpha = 500
# Formula coefficient beta
beta = -50 / numpy.log(2)
# Base score: the model intercept converted to points, expressed per unit of
# regression coefficient so it can be shared out across the features
gamma = (alpha + beta * model.best_estimator_.intercept_[0]) / statistics['coefficient'].sum()
for independent in independents:
    coefficient = statistics.loc[statistics['independent'] == independent, 'coefficient'].iat[0]
    rows = dictionary['independent'] == independent
    # Share of the base score carried by this feature
    dictionary.loc[rows, 'gamma'] = gamma * coefficient
    # Points per unit of WOE for this feature
    dictionary.loc[rows, 'beta'] = beta * coefficient
    # Points of each bin, rounded to a whole number
    dictionary.loc[rows, 'score'] = dictionary.loc[rows, ['woe', 'gamma', 'beta']].apply(lambda x: round(x['gamma'] + x['beta'] * x['woe']), axis = 'columns')
    # Turn every sample's WOE into the points of its bin. The result is assigned
    # back because an in-place replace on a column selection does not reach
    # the frame under pandas Copy-on-Write.
    dataset_woe[independent] = dataset_woe[independent].replace(dictionary.loc[rows, 'woe'].tolist(), dictionary.loc[rows, 'score'].tolist())
# Total score of each sample
dataset_woe['score'] = dataset_woe[independents].sum(axis = 'columns')
print('已完成')
print()
# Table of the 'Age' bins with their score components, used as the report example
Pandas2chart(dataset = dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'woe', 'gamma', 'beta', 'score']].rename(columns = {'bin': '分箱', 'woe': '证据权重', 'gamma': '加权基础分数', 'beta': '加权回归系数', 'score': '分数'}), type = 'table', path = './reports/scorecard_report/dictionary_score.html')
# Cut the total score into fixed, left-closed bands
dataset_woe['bin'] = pandas.cut(x = dataset_woe['score'].to_numpy(), bins = [0, 350, 400, 450, 500, 550, 600, 650, 1000], right = False)
# Aggregate per score band. observed=False is stated explicitly so that empty
# bands stay in the table on every pandas version (the default changes in 3.0).
score = dataset_woe.groupby(by = 'bin', observed = False).agg(
    # Positive samples
    positives = (dependent, lambda x: (x == 1).sum()),
    # Negative samples
    negatives = (dependent, lambda x: (x == 0).sum()),
    # All samples
    samples = (dependent, lambda x: x.count())
)
# Turn the band index back into a column
score.reset_index(inplace = True)
# Rejection rule: reject applications scoring below the band's upper edge
score['threshold'] = score['bin'].apply(lambda x: '<{}'.format(x.right))
# Positives in the band as a percentage of the band's samples
score['proportion'] = round(score['positives'] / score['samples'] * 100, 2)
# Band samples as a percentage of all samples
score['proportion_sample'] = score['samples'] / score['samples'].sum() * 100
# Cumulative percentage of samples
score['accumulation_sample'] = round(score['proportion_sample'].cumsum(), 2)
# Cumulative number of positives
score['accumulation_positives'] = score['positives'].cumsum()
# Cumulative number of samples
score['accumulation_samples'] = score['samples'].cumsum()
# Cumulative positives as a percentage of cumulative samples
score['proportion_positives'] = round(score['accumulation_positives'] / score['accumulation_samples'] * 100, 2)
# Band positives as a percentage of all positives
score['proportion_positive'] = score['positives'] / score['positives'].sum() * 100
# Cumulative percentage of positives
score['accumulation_positive'] = round(score['proportion_positive'].cumsum(), 2)
# Band negatives as a percentage of all negatives
score['proportion_negative'] = score['negatives'] / score['negatives'].sum() * 100
# Cumulative percentage of negatives
score['accumulation_negative'] = round(score['proportion_negative'].cumsum(), 2)
# KS per band: gap between the cumulative positive and negative percentages
score['ks'] = round(abs(score['accumulation_positive'] - score['accumulation_negative']), 2)
# KS of the scorecard (this replaces the ROC-based value computed earlier)
ks = score['ks'].max()
# Lift per band; the 0.01 offsets avoid a zero denominator
score['lift'] = round((score['accumulation_positive'] + 0.01) / (score['accumulation_sample'] + 0.01), 2)
# Lift of the scorecard
lift = score['lift'].max()
print('基于构建后的逻辑回归模型编制评分卡柯斯统计量KS%.2f 提升统计量LIFT%.2f' % (ks, lift))
print()
Pandas2chart(dataset = score[['bin', 'ks', 'lift']].rename(columns = {'bin': '分箱', 'ks': '柯斯统计量', 'lift': '提升统计量'}), type = 'line+line', path = './reports/scorecard_report/model_evaluation.html')
Pandas2chart(dataset = score[['bin', 'threshold', 'proportion', 'accumulation_sample', 'proportion_positives', 'accumulation_positive']].rename(columns = {'bin': '分箱', 'threshold': '拒绝规则', 'proportion': '分箱逾期率', 'accumulation_sample': '拒绝率', 'proportion_positives': '拒绝逾期率', 'accumulation_positive': '累计逾期率'}), type = 'table', path = './reports/scorecard_report/business_evaluation.html')
print('4.2 生成评分卡规则文件并保存...', end = '')
# Source text of a Calculate(sample) function, collected piece by piece
pieces = ['''def Calculate(sample):\n\n\tscore = 0\n\n''']
for independent in independents:
    pieces.append('\tmatch sample["{}"]:\n\n'.format(independent))
    subset = dictionary.loc[dictionary['independent'] == independent].reset_index(drop = True)
    # Split the bins with pandas.isna instead of `is numpy.nan`: a NaN read back
    # from a DataFrame is not guaranteed to be the numpy.nan object itself
    missing = subset.loc[subset['bin'].isna()]
    real = subset.loc[subset['bin'].notna()].reset_index(drop = True)
    if not missing.empty:
        # `case numpy.nan:` is a value pattern compared with ==, which NaN never
        # satisfies, so the missing bin is matched with a guard instead. It is
        # emitted first so that None never reaches the numeric comparisons.
        pieces.append('\t\tcase x if x is None or x != x: score += {}\n\n'.format(missing['score'].iat[0]))
    last = len(real) - 1
    for position, (interval, points) in enumerate(zip(real['bin'], real['score'])):
        if position == 0:
            # First bin: open to the left
            pieces.append('\t\tcase x if x < {}: score += {}\n\n'.format(interval.right, points))
        elif position == last:
            # Last real bin: open to the right
            pieces.append('\t\tcase x if x >= {}: score += {}\n\n'.format(interval.left, points))
        else:
            pieces.append('\t\tcase x if x < {} and x >= {}: score += {}\n\n'.format(interval.right, interval.left, points))
pieces.append('\treturn score')
calculate = ''.join(pieces)
# Write the rules to a local file; the encoding is explicit (as for the report
# file) because feature names may contain non-ASCII characters
with open('../utils/scorecrad_calculate.txt', 'w', encoding = 'utf8') as file:
    file.write(calculate)
print('已完成')
print()
print('5、生成贷款申请评分卡报告...', end = '')
# Load the report template from the report directory
environment = Environment(loader = FileSystemLoader('./reports/scorecard_report/'))
template = environment.get_template('template.html')
# Values substituted into the template
context = {
    # Report date (two-digit year, month, day)
    'report_date': time.strftime('%y-%m-%d', time.localtime()),
    'samples': samples,
    'variables_independent': variables_independent,
    'ks': ks,
    'lift': lift
}
scorecard_report = template.render(context)
# Save the rendered report next to its template
with open('./reports/scorecard_report/scorecard_report.html', 'w', encoding = 'utf8') as file:
    file.write(scorecard_report)
print('已完成')
print()