diff --git a/kano/main.py b/kano/main.py index f9af000..e59a207 100644 --- a/kano/main.py +++ b/kano/main.py @@ -13,7 +13,6 @@ import pandas from utils.pandas_extension import save_as_workbook print("1 打开并读取Excel文件...", end="") - try: dataset = pandas.read_excel(io="KANO模型客户调研问卷.xlsx", sheet_name="问卷结果") @@ -60,9 +59,13 @@ for column in range(columns): continue # 功能名称 - feature = re.search( - pattern="【(?P<feature>.*?)】", string=dataset.columns[column] - ).group("feature") + if not ( + match := re.search( + pattern="【(?P<feature>.*?)】", string=dataset.columns[column] + ) + ): + raise RuntimeError(f"功能名称未匹配到") + feature = match.group("feature") # 生成某功能的选项分布 distribution = pandas.crosstab( @@ -178,5 +181,4 @@ kano = ( ) save_as_workbook(worksheets=[("Sheet1", kano)], workbook_name="result.xlsx") - print("已完成") diff --git a/regions/main.py b/regions/main.py index ad31f63..8188fbc 100644 --- a/regions/main.py +++ b/regions/main.py @@ -1,5 +1,4 @@ # -*- coding: utf-8 -*- - """ 脚本说明: 根据行政区划数据就待转化数据集进行转化 @@ -7,186 +6,131 @@ 行政区划数据集来源于 https://lbsyun.baidu.com/faq/api?title=webapi/download """ -import re - -import json - -import numpy - -import pandas - -import time - import os - import sys -sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) +import numpy +import pandas +sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) from utils.pandas_extension import save_as_workbook -print('1、读取Excel并创建数据集...', end = '') - +print("1、读取Excel并创建数据集...", end="") try: - - #待转化数据集 - dataset = pandas.read_excel(io = 'dataset.xlsx', sheet_name = 'Sheet1') - - #行政区划数据集 - dataset_regions = pandas.read_excel(io = 'dataset.xlsx', sheet_name = 'Sheet2') - + # 待转化数据集 + dataset = pandas.read_excel(io="dataset.xlsx", sheet_name="Sheet1") + # 行政区划数据集 + dataset_regions = pandas.read_excel(io="dataset.xlsx", sheet_name="Sheet2") except: - - print('读取Excel或创建数据集发生异常,脚本终止') - print() - - exit() - -print('已完成') + print("读取Excel或创建数据集发生异常,脚本终止") + print() + exit() 
+print("已完成") print() -print('2、转化数据') +print("2、转化数据") +print() +print("2.1 基于行政区划数据集生成省级、地级和县级行政区字典", end="") +# 县级名称和县级编码 +regions = dataset_regions[ + ["省级名称", "省级编码", "地级名称", "地级编码", "县级名称", "县级编码"] +].drop_duplicates() +print("已完成") print() -print('2.1 基于行政区划数据集生成省级、地级和县级行政区字典', end = '') - -#县级名称和县级编码 -regions = dataset_regions[['省级名称', '省级编码', '地级名称', '地级编码', '县级名称', '县级编码']].drop_duplicates() - -print('已完成') -print() - -print('2.2 遍历并转化数据...', end = '') - -dataset.replace(to_replace = {numpy.nan: pandas.NA, None: pandas.NA, '': pandas.NA}, inplace = True) - +print("2.2 遍历并转化数据...", end="") +dataset.replace( + to_replace={numpy.nan: pandas.NA, None: pandas.NA, "": pandas.NA}, inplace=True +) for index, row in dataset.iterrows(): + province_name = row["省名称"] + if province_name is not pandas.NA: + try: + # 根据省名称匹配省级行政区字典并填充省区划编码 + row["省区划编码"] = str( + dataset_regions.loc[ + dataset_regions["省级名称"] == province_name, "省级编码" + ].iat[0] + ) + except: + row["省区划编码"] = "未查询到省区划编码" + row["与区划编码对比结果"] = "待确定" + # 省编码数据类型转为字符 + row["省编码"] = str(int(row["省编码"])) + else: + row["省区划编码"] = "省名称为空" + row["省编码"] = "省名称为空" + row["省名称"] = "省名称为空" + row["与区划编码对比结果"] = "待确定" + city_name = row["市名称"] + if row["区划类型"] != "省": + if city_name is not pandas.NA: + try: + # 根据市名称匹配地级行政区字典并填充市区划编码 + row["市区划编码"] = str( + dataset_regions.loc[ + dataset_regions["地级名称"] == city_name, "地级编码" + ].iat[0] + ) + except: + row["市区划编码"] = "未查询到市区划编码" + row["与区划编码对比结果"] = "待确定" + # 市编码数据类型转为字符 + row["市编码"] = str(int(row["市编码"])) + else: + row["市区划编码"] = "市名称为空" + row["市编码"] = "市名称为空" + row["市名称"] = "市名称为空" + row["与区划编码对比结果"] = "待确定" + else: + row["市区划编码"] = "" + row["市编码"] = "" + row["市名称"] = "" + region_name = row["区县名称"] + if row["区划类型"] == "区县": + if region_name is not pandas.NA: + try: + # 根据区县名称匹配县级行政区字典并填充区县区划编码 + row["区县区划编码"] = str( + regions.loc[regions["县级名称"] == region_name, "县级编码"].iat[0] + ) + if row["省名称"] == "省名称为空" or row["市名称"] == "市名称为空": + # 若省名称或市名称为空则补充说明 + row["与区划编码对比说明"] 
= "该区县所属{}/{}".format( + str( + regions.loc[ + regions["县级名称"] == region_name, "省级名称" + ].iat[0] + ), + str( + regions.loc[ + regions["县级名称"] == region_name, "地级名称" + ].iat[0] + ), + ) + except: + row["区县区划编码"] = "未查询到区县区划编码" + row["与区划编码对比结果"] = "待确定" + # 县编码数据类型转为字符 + row["区县编码"] = str(int(row["区县编码"])) + else: + row["区县区划编码"] = "区县名称为空" + row["区县编码"] = "区县名称为空" + row["区县名称"] = "区县名称为空" + row["与区划编码对比结果"] = "待确定" + else: + row["区县区划编码"] = "" + row["区县编码"] = "" + row["区县名称"] = "" + dataset.iloc[index] = row +dataset.fillna(value="", inplace=True) - province_name = row['省名称'] - - if province_name is not pandas.NA: - - try: - - #根据省名称匹配省级行政区字典并填充省区划编码 - row['省区划编码'] = str(dataset_regions.loc[dataset_regions['省级名称'] == province_name, '省级编码'].iat[0]) - - except: - - row['省区划编码'] = '未查询到省区划编码' - - row['与区划编码对比结果'] = '待确定' - - #省编码数据类型转为字符 - row['省编码'] = str(int(row['省编码'])) - - else: - - row['省区划编码'] = '省名称为空' - - row['省编码'] = '省名称为空' - - row['省名称'] = '省名称为空' - - row['与区划编码对比结果'] = '待确定' - - city_name = row['市名称'] - - if row['区划类型'] != '省': - - if city_name is not pandas.NA: - - try: - - #根据市名称匹配地级行政区字典并填充市区划编码 - row['市区划编码'] = str(dataset_regions.loc[dataset_regions['地级名称'] == city_name, '地级编码'].iat[0]) - - except: - - row['市区划编码'] = '未查询到市区划编码' - - row['与区划编码对比结果'] = '待确定' - - #市编码数据类型转为字符 - row['市编码'] = str(int(row['市编码'])) - - else: - - row['市区划编码'] = '市名称为空' - - row['市编码'] = '市名称为空' - - row['市名称'] = '市名称为空' - - row['与区划编码对比结果'] = '待确定' - - else: - - row['市区划编码'] = '' - - row['市编码'] = '' - - row['市名称'] = '' - - region_name = row['区县名称'] - - if row['区划类型'] == '区县': - - if region_name is not pandas.NA: - - try: - - #根据区县名称匹配县级行政区字典并填充区县区划编码 - row['区县区划编码'] = str(regions.loc[regions['县级名称'] == region_name, '县级编码'].iat[0]) - - if row['省名称'] == '省名称为空' or row['市名称'] == '市名称为空': - - #若省名称或市名称为空则补充说明 - row['与区划编码对比说明'] = '该区县所属{}/{}'.format(str(regions.loc[regions['县级名称'] == region_name, '省级名称'].iat[0]), str(regions.loc[regions['县级名称'] == region_name, '地级名称'].iat[0])) - - 
except: - - row['区县区划编码'] = '未查询到区县区划编码' - - row['与区划编码对比结果'] = '待确定' - - #县编码数据类型转为字符 - row['区县编码'] = str(int(row['区县编码'])) - - else: - - row['区县区划编码'] = '区县名称为空' - - row['区县编码'] = '区县名称为空' - - row['区县名称'] = '区县名称为空' - - row['与区划编码对比结果'] = '待确定' - - else: - - row['区县区划编码'] = '' - - row['区县编码'] = '' - - row['区县名称'] = '' - - dataset.iloc[index] = row - -dataset.fillna(value = '', inplace = True) - -print('已完成') +print("已完成") print() -print('正在保存为EXCEL...', end = '') +print("正在保存为EXCEL...", end="") -SaveAsExcel(worksheets = [('Sheet1', dataset)], save_path = 'results.xlsx') +save_as_workbook(workbook_name="results.xlsx", worksheets=[("Sheet1", dataset)]) -print('已完成') +print("已完成") print() - -''' - -修改记录 - -''' \ No newline at end of file diff --git a/rfm/main.py b/rfm/main.py index 91dac56..14c3625 100644 --- a/rfm/main.py +++ b/rfm/main.py @@ -1,22 +1,25 @@ # -*- coding: utf-8 -*- - - """ 基于RFM模型生成数据分析报告 """ # 导入模块 -import statistics from datetime import datetime from decimal import Decimal, ROUND_HALF_UP +from pathlib import Path +import statistics +import sys -import pandas from jinja2 import Environment, FileSystemLoader +import pandas from utils.mysql import MySQLClient from utils.pandas_extension import DrawAsHTML +sys.path.append(Path(__file__).parent.parent.as_posix()) + + # 函数说明:根据RFM编码映射为客户分类 def map_classification(r_encoded, f_encoded, m_encoded): diff --git a/utils/pandas_extension.py b/utils/pandas_extension.py index a82dec3..db24fc6 100644 --- a/utils/pandas_extension.py +++ b/utils/pandas_extension.py @@ -10,25 +10,16 @@ from pathlib import Path -import pandas - -from prettytable import PrettyTable - -from openpyxl import Workbook - -from openpyxl.styles import Font, Alignment, Border, Side, PatternFill - -from openpyxl.utils import get_column_letter - -from pyecharts import __file__ as default_path, options - -from pyecharts.components import Table - -from pyecharts.charts import Scatter, Pie, Bar, Line, HeatMap - -from pyecharts.globals import 
ThemeType, CurrentConfig - from jinja2 import Environment, FileSystemLoader +from openpyxl import Workbook +from openpyxl.styles import Alignment, Border, Font, PatternFill, Side +from openpyxl.utils import get_column_letter +import pandas +from prettytable import PrettyTable +from pyecharts import __file__ as default_path, options +from pyecharts.charts import Bar, Line, Pie, Scatter +from pyecharts.components import Table +from pyecharts.globals import CurrentConfig, ThemeType def open_csv(file_name): @@ -101,61 +92,46 @@ def save_as_workbook(worksheets, workbook_name): # 创建工作簿 workbook = Workbook() - # 删除默认创建的工作表 for worksheet_name in workbook.sheetnames: - workbook.remove(workbook[worksheet_name]) for worksheet_name, worksheet_dataframe in worksheets: - # 若工作表数据体为空则跳过 if not worksheet_dataframe.empty: - # 创建工作表 worksheet = workbook.create_sheet(title=worksheet_name) - # 若为多层行索引则重置行索引(单层行索引则跳过) if isinstance(worksheet_dataframe.index, pandas.MultiIndex): - worksheet_dataframe.reset_index(inplace=True) # 遍历列索引层 for level in range(worksheet_dataframe.columns.nlevels): - # 添加列名行 worksheet.append( worksheet_dataframe.columns.get_level_values(level).tolist() ) # 工作表一般包括标题行、列名行和数据行,在这里仅考虑列名行和数据行 - # 初始化合并单元格开始列号 merge_start_column = 0 - # 初始化上一个单元格值 previous_cell_value = worksheet.cell( row=level + 1, column=merge_start_column + 1 ).value - for column in range( len(worksheet_dataframe.columns.get_level_values(level)) ): - # 单元格值 value = worksheet.cell( row=level + 1, column=column + 1 ).value # 工作表数据体为PANDAS.DATAFRAME,行号和列号从0开始,工作表为OPENPYXL,行号和列号从1开始 - if value != previous_cell_value: - # 合并单元格结束列号 merge_end_column = column - 1 - # 判断合并单元格结束列号和合并单元格开始列号的差值是否大于0 if ( previous_cell_value and merge_end_column - merge_start_column > 0 ): - # 合并单元格(同行合并) worksheet.merge_cells( start_row=level + 1, @@ -163,26 +139,20 @@ def save_as_workbook(worksheets, workbook_name): start_column=merge_start_column + 1, end_column=merge_end_column + 1, ) - # 重新赋值合并单元格开始列号 
merge_start_column = column - # 重新赋值上一单元格值 previous_cell_value = value - # 若单元格值和上一个单元格值相同,若列号为最大值则合并单元格 else: - # 判断列号是否为最大值 if ( column == len(worksheet_dataframe.columns.get_level_values(level)) - 1 ): - # 重新赋值合并单元格结束列号 merge_end_column = column - # 合并单元格(同行合并) worksheet.merge_cells( start_row=level + 1, @@ -193,7 +163,6 @@ def save_as_workbook(worksheets, workbook_name): # 若单元格值为空则同列合并 if not value: - # 合并单元格(同列合并) worksheet.merge_cells( start_row=level, @@ -204,27 +173,21 @@ def save_as_workbook(worksheets, workbook_name): # 工作表列宽:24磅 for column in range(worksheet_dataframe.shape[1]): - worksheet.column_dimensions[get_column_letter(column + 1)].width = 24 # 列名行行高:24磅 for level in range(worksheet_dataframe.columns.nlevels): - worksheet.row_dimensions[level + 1].height = 24 # 列名行单元格样式 for column in range( len(worksheet_dataframe.columns.get_level_values(level)) ): - cell = worksheet.cell(level + 1, column + 1) - # 字体 cell.font = Font(bold=True, size=12, name="Arial", color="00FFFFFF") - # 对齐方式 cell.alignment = Alignment(horizontal="center", vertical="center") - # 边框 cell.border = Border( left=Side(style="thin", color="00333333"), @@ -232,32 +195,24 @@ def save_as_workbook(worksheets, workbook_name): top=Side(style="thin", color="00333333"), bottom=Side(style="thin", color="00333333"), ) - # 填充 cell.fill = PatternFill(fill_type="solid", start_color="003366FF") for row, row_data in worksheet_dataframe.iterrows(): - # 数据行行高:20磅 worksheet.row_dimensions[ row + worksheet_dataframe.columns.nlevels + 1 ].height = 20 - for column in range(worksheet_dataframe.shape[1]): - cell = worksheet.cell( row + worksheet_dataframe.columns.nlevels + 1, column + 1 ) - # 单元格值 cell.value = row_data.iloc[column] - # 字体 cell.font = Font(size=12, name="Arial", color="00333333") - # 对齐方式 cell.alignment = Alignment(horizontal="center", vertical="center") - cell.border = Border( left=Side(style="thin", color="00333333"), right=Side(style="thin", color="00333333"), diff --git a/神经网络/main.py 
b/神经网络/main.py new file mode 100644 index 0000000..e0b299d --- /dev/null +++ b/神经网络/main.py @@ -0,0 +1,60 @@ +# -*- coding: utf-8 -*- +""" +神经网络 +""" + +# 导入模块 +from typing import List +import numpy + + +class NeuralNetwork: + """ + 神经网络 + """ + + # 激活函数和其导数函数 + FUNCTIONS = { + "relu": { + "activate": lambda x: numpy.maximum(0, x), + "derivative": lambda x: numpy.where(x > 0, 1, 0), + }, + "linear": { + "activate": lambda x: x, + "derivative": lambda x: numpy.ones_like(x), + }, # 适合回归任务的输出层 + "softmax": { + "activate": lambda x: numpy.exp(x) / numpy.sum(numpy.exp(x), axis=1), + "derivative": lambda x: x * (1 - x), + }, # 适合分类任务的输出层 + } + + def __init__( + self, + hidden_layer_neurons: List[int] = [10], + hidden_layer_function: str = "relu", + output_layer_function: str = "softmax", + ): + """ + 初始化 + :param hidden_layer_neurons: 隐含层神经元数量 + :param hidden_layer_function: 隐含层函数 + :param output_layer_function: 输出层函数 + """ + # 检查函数是否存在 + if not ( + hidden_layer_function in self.FUNCTIONS + and output_layer_function in self.FUNCTIONS + ): + raise RuntimeError("所输入的隐含层或输出层函数未定义") + + # 初始化隐含层的激活函数和导数函数 + self.hidden_layer_activate, self.hidden_layer_derivative = ( + self.FUNCTIONS[hidden_layer_function]["activate"], + self.FUNCTIONS[hidden_layer_function]["derivative"], + ) + # 初始化输出层的激活函数和导数函数 + self.output_layer_activate, self.output_layer_derivative = ( + self.FUNCTIONS[output_layer_function]["activate"], + self.FUNCTIONS[output_layer_function]["derivative"], + )