# -*- coding: utf-8 -*- ''' 自定义计算相关模块 ''' #加载模块 from distance import levenshtein import numpy import pandas from sklearn.cluster import KMeans import warnings warnings.simplefilter('ignore') ''' 函数说明:计算两个字符串之间的莱文斯坦距离比 参数说明: string1:字符串 string2:字符串 返回说明: 数据格式:整数 ''' def Levenshtein(string1, string2): return round((1 - levenshtein(string1, string2) / (len(string1) + len(string2))) * 100) ''' 函数说明:计算聚类紧支测度 参数说明: data:数据 clusters:簇数 返回说明: 数据格式:浮点数 ''' def Compactness(data, clusters): #训练K均值聚类模型 model = KMeans(n_clusters = clusters, n_init = 'auto').fit(X = data) return sum([numpy.linalg.norm(i - model.cluster_centers_[j]) for i, j in zip(data, model.labels_)]) ''' 函数说明:基于间隔统计量(GapStatistic)评估K均值聚类最优聚类簇数,返回最优聚类簇数 返回说明:最优聚类簇数:整数 ''' def OptimalClusters(data, sampling_size = 200, maximum_clusters = 9): #校验抽样次数数据类型 assert isinstance(sampling_size, int) and sampling_size >= 1, 'sampling_size data type must be int and greater than or equal to 1' #校验最大聚类簇数 assert isinstance(maximum_clusters, int) and maximum_clusters >= 2, 'maximum_clusters data type must be int and greater than or equal to 2' #校验数据集数据格式 assert isinstance(data, numpy.ndarray) and len(data.shape) == 1, 'data data format must be NumPy and dimensions equal to 1' #样本数 samples = data.shape[0] #抽样数据集 sampling_dataset = numpy.zeros((sampling_size, samples)) for sampling_number in range(sampling_size): #基于均匀分布随机抽样 sampling_dataset[sampling_number, :] = numpy.random.uniform(low = data.min(), high = data.max(), size = samples) #间隔统计量统计报告 statistics = pandas.DataFrame(data = [], columns = ['clusters', 'gap_statistic', 'adjusted_gap_statistic']) for clusters in range(2, maximum_clusters + 1): compactness = Compactness(data = data.reshape(-1, 1), clusters = clusters) #抽样数据集的紧支测度 sampling_compactness = [] for sampling_number in range(sampling_size): sampling_compactness.append(Compactness(data = sampling_dataset[sampling_number, :].reshape(-1, 1), clusters = clusters)) #计算间隔统计量 gap_statistic = numpy.mean(numpy.log(sampling_compactness)) - numpy.log(compactness) #校正间隔统计量 adjusted_gap_statistic = gap_statistic - numpy.sqrt((1 + sampling_size) / sampling_size) * numpy.std(numpy.log(sampling_compactness)) statistics.loc[len(statistics)] = [clusters, gap_statistic, adjusted_gap_statistic] #以校正间隔统计量最大值对应的聚类簇数作为最优聚类簇数 optimal_clusters = statistics.at[statistics['adjusted_gap_statistic'].idxmax(), 'clusters'] return int(optimal_clusters) ''' 类说明:整数自增 ''' class AutoIncrement: _counter = 0 def __init__(self): pass @classmethod def generate(cls): cls._counter += 1 return cls._counter incrementor = AutoIncrement() ''' 类说明:聚类树叶子节点,包括中间节点和叶子节点 ''' class Node: def __init__(self, independent_index = None, threshold = None, child_left = None, child_right = None, cluster_label = None): #自变量索引 self.independent_index = independent_index #划分阈值 self.threshold = threshold #左子节点(小于等于划分阈值的子数据集) self.child_left = child_left #右子节点(大于划分阈值的子数据集) self.child_right = child_right #叶子节点的聚类标签 self.cluster_label = cluster_label ''' 类说明:聚类树 主要算法说明: 步骤1:基于离散系数(Coefficient of Variation,CV)最大原则选择自变量 步骤2:根据所选的特征,基于组内方差和(Sum of Squares Within)最小原则将数据集划分两个子集 步骤3:就每个子集重复步骤1和步骤2直至满足停止条件 class ClusterTree: def __init__(self, minimum_samples_proportion = 0.05): #最小叶子节点的样本占比 self.minimum_samples_proportion = minimum_samples_proportion def fit(self, X): #构建根节点 self.root = self._build_tree(X = X, depth = 0) def _compute_sse(self, data_left, data_right): return numpy.std(a = data_left, ddof = 0) + numpy.std(a = data_right, ddof = 0) def _build_tree(self, X, depth): #统计样本数和自变量数 samples, independents = X.shape #计算最小叶子节点的样本数 minimum_samples = numpy.ceil(samples * self.minimum_samples_proportion) #若样本数小于最小叶子节点的样本数则返回叶子节点 if X.shape[0] < minimum_samples: return Node(cluster_label = incrementor.generate()) best_cv = -numpy.inf best_independent_index = None for independent_index in range(independents): data = X[:, independent_index] #计算离散系数(标准差使用无偏估计) cv = numpy.std(a = data, ddof = 0) / abs(numpy.mean(a = data)) #基于离散系数最大原则选择自变量 if cv > best_cv: best_independent_index = independent_index best_cv = cv data = X[:, best_independent_index] #统计最大离散系数自变量的唯一值 thresholds = numpy.unique(ar = data) best_sse = numpy.inf best_threshold = None for threshold in thresholds: #左子节点的样本索引 indices_left = numpy.where(data <= threshold)[0] #右子节点的样本索引 indices_right = numpy.where(data > threshold)[0] #若划分后样本数小于最小叶子节点的样本数则跳过 if len(indices_left) < minimum_samples or len(indices_right) < minimum_samples: continue sse = self._compute_sse(data_left = data[indices_left], data_right = data[indices_right]) if sse < best_sse: best_threshold = threshold best_sse = sse print(data) print(best_threshold) indices_left = numpy.where(data <= best_threshold)[0] indices_right = numpy.where(data > best_threshold)[0] #构建左子节点 child_left = self._build_tree(X = X[indices_left], depth = depth + 1) #构建右子节点 child_right = self._build_tree(X = X[indices_right], depth = depth + 1) return Node(independent_index = best_independent_index, threshold = best_threshold, child_left = child_left, child_right = child_right) def _traverse_tree(self, x, node): if node.cluster_label is not None: return node.cluster_label if x[node.independent_index] <= node.threshold: return self._traverse_tree(x, node.child_left) else: return self._traverse_tree(x, node.child_right) def predict(self, X): return numpy.array([self._traverse_tree(x, self.root) for x in X]) a = numpy.array([[0, 3], [10, 2], [2, 2.5], [9, 2.6], [1, 2.9], [0, 2.9], [1, 2.95], [0, 3.01], [9.5, 2.7], [10, 2.3]]) model = ClusterTree() model.fit(X = a) print(model.predict(X = a)) print('5.2 衍生变量...') print('') print('5.2.1 基于TPE算法对梯度提升决策树调参...') print('') 定义函数:目标函数 入参: parameters:梯度提升决策树参数,格式为字典 出参: statistic:统计量 def function_target(trial): #定义参数范围 #回归树数量 n_estimators = trial.suggest_int('n_estimators', 50, 200) #学习速率 learning_rate = trial.suggest_loguniform('learning_rate', 1e-3, 1e-1) #采样率 subsample = trial.suggest_float('subsample', 0.5, 0.8) #创建梯度提升决策树分类器,叶节点所需最小样本占比为0.2 gradient_boosting_trees = GradientBoostingClassifier(n_estimators = n_estimators, learning_rate = learning_rate, subsample = subsample, min_samples_leaf = 0.2) #基于交叉验证统计ROC曲线下方的面积并返回 return numpy.mean(cross_validate(estimator = gradient_boosting_trees, y = y, X = x, cv = KFold(n_splits = 5, shuffle = True), scoring = 'roc_auc')['test_score']) ''' ''' #过采样 x, y = RandomOverSampler().fit_resample(dataset[independent_variable_labels].to_numpy(), dataset[dependent_variable_label].to_numpy()) #创建OPTUNA学习器 optuna_study = optuna.create_study(study_name = 'GradientBoostingClassifier', sampler = optuna.samplers.TPESampler(n_startup_trials = 30, n_ei_candidates = 100), direction = 'maximize') #关闭学习过程 optuna.logging.set_verbosity(optuna.logging.ERROR) optuna_study.optimize(function_target, n_trials = 100) print('') print('训练后,梯度提升决策树最优ROC-AUC为 %.2f,最优参数为:' % optuna_study.best_trial.values[0], end = '') print(optuna_study.best_trial.params, end = '') print('。') print('') print('5.2.2 优化梯度提升决策树并基于叶子节点分裂路径二阶衍生变量...') print('') 定义函数:打印混淆矩阵 入参: y:因变量,格式为数组 predict_classification:预测分类,格式为数组 predict_probability:预测概率,格式为数组 出参: 打印混淆矩阵、正确率、阳性样本查准率、查全率和KS def print_confusion_matrix(y, predict_classification, predict_probability): #统计混淆矩阵 statistical_report = pandas.DataFrame(data = confusion_matrix(y, predict_classification, labels = [1, 0]), index = ['reality:positive', 'reality:negative'], columns = ['forecast:positive', 'forecast:negative']) #统计正确率 accuracy = (statistical_report.loc['reality:positive', 'forecast:positive'] + statistical_report.loc['reality:negative', 'forecast:negative']) / y.shape[0] * 100 #统计查准率 precision = statistical_report.loc['reality:positive', 'forecast:positive'] / (statistical_report.loc['reality:positive', 'forecast:positive'] + statistical_report.loc['reality:negative', 'forecast:positive']) * 100 #统计查全率 recall = statistical_report.loc['reality:positive', 'forecast:positive'] / (statistical_report.loc['reality:positive', 'forecast:positive'] + statistical_report.loc['reality:positive', 'forecast:negative']) * 100 statistical_report.reset_index(drop = False, inplace = True) #计算假阳性率和真阳性率 fpr, tpr, thresholds = roc_curve(y_true = y, y_score = predict_probability) #计算AUC roc_auc = auc(fpr, tpr) #计算KolmogorovSmirnov ks = max(tpr - fpr) Print_table(statistical_report) print('备注1:reality:positive为实际阳性,reality:negative为实际阴性,forecast:positive为预测阳性,forecast:negative为预测阴性') print('备注2:AUC为 %.2f。AUC说明 ~0.5:不建议使用;0.5~0.7:模型预测能力较低;0.7~0.85:一般;0.85~0.95:较高;0.95~ 预测能力完美' % roc_auc) print('备注3:正确率为 %.2f %%,阳性样本查准率为 %.2f %%、查全率为 %.2f %%' % (accuracy, precision, recall)) print('备注4:KolmogorovSmirnov为 %.2f。KolmogorovSmirnov说明 ~0.2:不建议使用;0.2~0.4:模型区别阳性和阴性样本能力较好;0.4~0.5:良好;0.5~0.6:很好;0.6~0.75:非常好;0.75~ 区别能力存疑' % ks) print('') #优化梯度提升决策树 gradient_boosting_trees = GradientBoostingClassifier(n_estimators = optuna_study.best_trial.params.get('n_estimators'), learning_rate = optuna_study.best_trial.params.get('learning_rate'), subsample = optuna_study.best_trial.params.get('subsample'), min_samples_leaf = 0.05) gradient_boosting_trees.fit(x, y) print('附表:梯度提升决策树混淆矩阵') print_confusion_matrix(y, gradient_boosting_trees.predict(x), gradient_boosting_trees.predict_proba(x)[:, 1]) #用于保存叶子节点分裂路径 leaf_node_path = [] #遍历回归树获取叶子节点分裂路径 for estimator in range(gradient_boosting_trees.n_estimators_): #获取回归树,格式为字符串 tree = export_graphviz(decision_tree = gradient_boosting_trees.estimators_[estimator, 0], feature_names = independent_variable_labels) #用于保存节点中自变量标签 node = {} #用于保存节点分裂路径 node_path = {} #用于保存分裂节点 split_node = [] #用于保存叶子节点下标 leaf_node_index = [] #遍历回归树 for section in tree.split('\n'): if '[label=' in section: #获取节点索引 node_index = int(section[: section.find('[') - 1]) if '[label="friedman_mse' not in section: #获取节点中自变量标签 node[node_index] = section[section.find('[label="') + 8: section.find('\\nfriedman_mse')] else: #获取叶子节点下标 leaf_node_index.append(node_index) #上一个节点到下一个节点 if '->' in section: #获取上一个节点下标 node_index_previous = int(section[: section.find('->') - 1]) #获取下一个节点下标 if '[' in section: node_index_next = int(section[section.find('->') + 3: section.find('[') - 1]) else: node_index_next = int(section[section.find('->') + 3: section.find(';') - 1]) #获取分裂左路径 if node_index_previous not in split_node: #获取分裂节点 split_node.append(node_index_previous) if node_index_previous == 0: #从根节点到下一个节点分裂路径 node_path[node_index_next] = node[node_index_previous] else: #从上一个节点到下一个节点分裂路径 node_path[node_index_next] = (node_path[node_index_previous] + ' & ' + node[node_index_previous]) #获取分裂右路径 else: if node_index_previous == 0: #从根节点到下一个节点分裂路径 node_path[node_index_next] = node[node_index_previous].replace('<=', '>') else: #从上一个节点到下一个节点分裂路径 node_path[node_index_next] = (node_path[node_index_previous] + ' & ' + node[node_index_previous].replace('<=', '>')) leaf_node_path += [value for key, value in node_path.items() if key in leaf_node_index] #仅保留二阶交叉衍生变量 leaf_node_path = [x for x in leaf_node_path if x.count(' & ') == 1] for key, value in {index + 1: value for index, value in enumerate(leaf_node_path)}.items(): section = 'dataset_derivation["{}:int"] = '.format(value.replace(':str', '').replace(':int', '').replace(':float', '')) for index, condition_section in enumerate(value.split(' & ')): if index == 0: section += ('((dataset["{}"]{})'.format(condition_section[: condition_section.find(' ')], condition_section[condition_section.find(' '): ])) else: section += (' & (dataset["{}"]{}))'.format(condition_section[: condition_section.find(' ')], condition_section[condition_section.find(' '): ])) section += '.astype(int)' #衍生变量 exec(section) derivation_report = pandas.DataFrame(data = [], columns = ['cross_indices', 'cross_amount', 'cross_strategies', 'coverage', 'iv']) for derivation_label in [x for x in dataset_derivation.columns.tolist() if ' & ' in x]: #获取交叉自变量下标并去重 cross_indices = list(set([[x.split(':')[0] for x in independent_variable_labels].index(derivation_label.split('&')[0].split(' ')[0]), [x.split(':')[0] for x in independent_variable_labels].index(derivation_label.split('&')[1].split(' ')[1])])) cross_indices.sort() #统计交叉自变量数量 cross_amount = len(cross_indices) encode_woe_report = Encode_to_woe(dataset_derivation, derivation_label, dependent_variable_label) encode_woe_report_notnull = encode_woe_report.loc[encode_woe_report['bin_label'].notnull()].sort_values(by = 'woe_positive', ascending = False) #统计阳性样本占比 coverage = sum(encode_woe_report_notnull.loc[0, ['sample_size_positive', 'sample_size_negative']].to_numpy()) / sample_size * 100 iv = encode_woe_report['iv_positive'].sum() derivation_report.loc[derivation_report.shape[0]] = {'cross_indices': str(cross_indices), 'cross_amount': cross_amount, 'cross_strategies': derivation_label, 'coverage': coverage, 'iv': iv} #仅保留交叉自变量数量等于2、信息价值大于0.02且信息价值最大的衍生变量 derivation_report = derivation_report.loc[(derivation_report['cross_amount'] == 2) & (derivation_report['iv'] >= 0.02)].sort_values(by = ['cross_indices', 'iv'], ascending = False).groupby(by = 'cross_indices', as_index = False).first() #获取衍生标签标签 derivation_labels = derivation_report['cross_strategies'] print('处理后,衍生变量数量为: %d 个,分别为:' % len(derivation_labels)) print('') for derivation_label in derivation_labels: coverage = derivation_report.loc[derivation_report['cross_strategies'] == derivation_label, 'coverage'] iv = derivation_report.loc[derivation_report['cross_strategies'] == derivation_label, 'iv'] print('衍生变量:%s,阳性样本占比为 %.2f %%,信息价值为 %.2f;' % (derivation_label.split(':')[0], coverage, iv)) print('') print('5.2.3 衍生变量证据权重编码并合并...') print('') for derivation_label in derivation_labels: encode_woe_report = Encode_to_woe(dataset_derivation, derivation_label, dependent_variable_label) encode_woe_report_notnull = encode_woe_report.loc[encode_woe_report['bin_label'].notnull()] dataset_derivation[derivation_label].replace(encode_woe_report_notnull['bin_label'].to_numpy(), encode_woe_report_notnull['woe_positive'].to_numpy(), inplace = True) encode_woe_report['independent_variable_label'] = derivation_label.split(':')[0] encode_woe_reports = pandas.concat([encode_woe_reports, encode_woe_report]) print(dataset[dependent_variable_label]) print(dataset_derivation[input_labels]) print(dataset_derivation[derivation_labels]) #仅保留因变量,证据权重且选择后的自变量和衍生变量 dataset = pandas.concat([dataset[dependent_variable_label], dataset_derivation[input_labels], dataset_derivation[derivation_labels]], axis = 'columns') #重新获取自变量标签 independent_variable_labels = dataset.columns.tolist()[1: ] print('已完成(如无特殊说明,自变量和衍生变量统称自变量),自变量数量为 %d 个。' % len(independent_variable_labels)) print('') print('') 定义函数:目标函数 入参: parameters:逻辑回归参数,格式为字典 出参: statistic:统计量 def function_target(trial): #定义参数范围 #正则惩罚系数的倒数 C = trial.suggest_loguniform('C', 1e-3, 1e3) #L1占比 l1_ratio = trial.suggest_float('l1_ratio', 0, 1) #创建逻辑回归 logistic_regression = LogisticRegression(solver = 'saga', penalty = 'elasticnet', l1_ratio = l1_ratio, C = C, max_iter = 10**3) #基于交叉验证统计ROC曲线下方的面积并返回 return numpy.mean(cross_validate(estimator = logistic_regression, y = y, X = x, cv = KFold(n_splits = 5, shuffle = True), scoring = 'roc_auc')['test_score']) ''' ''' output_labels = input_labels iteration = 1 while len(output_labels) != 0: statistical_report = pandas.DataFrame(data = [], columns = ['independent_variable_label']) statistical_report['independent_variable_label'] = input_labels x, y = RandomOverSampler().fit_resample(dataset[input_labels].to_numpy(), dataset[dependent_variable_label].to_numpy()) #创建OPTUNA学习器 optuna_study = optuna.create_study(study_name = 'GradientBoostingClassifier', sampler = optuna.samplers.TPESampler(n_startup_trials = 30, n_ei_candidates = 100), direction = 'maximize') #关闭学习过程 optuna.logging.set_verbosity(optuna.logging.ERROR) optuna_study.optimize(function_target, n_trials = 100) #优化逻辑回归 logistic_regression = LogisticRegression(solver = 'saga', penalty = 'elasticnet', l1_ratio = optuna_study.best_trial.params.get('l1_ratio'), C = optuna_study.best_trial.params.get('C'), max_iter = 10**3) logistic_regression.fit(x, y) #获取逻辑回归常数项 constant = logistic_regression.intercept_ #统计回归系数 statistical_report['regression_coefficient'] = logistic_regression.coef_[0, :] #统计方差膨胀因子 statistical_report['variance_inflation_factor'] = [variance_inflation_factor(dataset[input_labels].assign(constant = 1).to_numpy(), x) for x in range(len(input_labels) + 1)][: -1] #统计回归系数小于0.01或方差膨胀因子大于等于2.78(R=0.8)的入模自变量标签的入模自变量标签 output_labels = statistical_report.loc[(statistical_report['regression_coefficient'] < 0.01) | (statistical_report['variance_inflation_factor'] >= 2.78), 'independent_variable_label'].tolist() if len(output_labels) != 0: #逐步删除方差膨胀因子最大的入模自变量至所有入模自变量的回归系数大于0且方差膨胀因子小于等于2.78 output_label = statistical_report.loc[statistical_report['independent_variable_label'].isin(output_labels)].sort_values(by = 'variance_inflation_factor', ascending = False)['independent_variable_label'].tolist()[0] input_labels.remove(output_label) print('第 %d 次迭代:自变量 %s 的回归系数为 %.2f、方差膨胀因子为 %.2f且最大,不符合入模规则,删除并继续迭代;' % (iteration, output_label.split(':')[0], statistical_report.loc[statistical_report['independent_variable_label'] == output_label, 'regression_coefficient'].to_numpy()[0], statistical_report.loc[statistical_report['independent_variable_label'] == output_label, 'variance_inflation_factor'].to_numpy()[0])) print('') iteration += 1 else: #仅保留因变量,入模自变量 dataset = pandas.concat([dataset[dependent_variable_label],dataset[input_labels]], axis = 'columns') print('第 %d 次迭代:所有自变量的回归系数大于0且方差膨胀因子小于等于2.78,停止迭代。' % iteration) print('') '''