添加目录和文件

This commit is contained in:
marslbr 2025-10-29 12:44:12 +08:00
parent d18d165bf0
commit 7ffae258a9
9 changed files with 3502 additions and 0 deletions

View File

@ -0,0 +1,787 @@
# -*- coding: utf-8 -*-
"""
普康健康_发票查验
"""
# 加载模块
import hashlib
import json
import shutil
import uuid
from base64 import b64decode, b64encode
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
import cv2
import numpy
import pandas
from utils.client import Authenticator, HTTPClient, RequestException, restrict
from utils.pandas_extension import open_csv, save_as_workbook, traverse_directory
# 影像件压缩
def image_compression(
image_path: str | None = None,
image_format: str | None = None,
image_data: bytes | None = None, # 数据类型为包含图像文件的二进制数据的字节串
image_size_specified: int = 2, # 指定影像件大小
raw: bool = False, # 250804新增返回是否为完整URI数据格式
) -> str | None:
try:
# 若影像件路径数据类型为STR则创建路径对象
if isinstance(image_path, str):
image_path = Path(image_path)
# 影像件文件名称后缀
image_format = image_path.suffix.strip().lstrip(".").lower()
# 读取影像件数据
with open(image_path, "rb") as image:
image_data = image.read()
# 影像件数据BASE64编码
image_data_base64 = b64encode(image_data).decode("utf-8")
# 指定影像件大小的单位由MB转为KB
image_size_specified = image_size_specified * 1024 * 1024
# 若影像件大小小于指定影像件大小则返回BASE64编码后影像件数据
if len(image_data_base64) < image_size_specified:
if raw:
# 返回非完整URI数据格式
return image_data_base64
else:
# 返回完整URI数据格式
return f"data:image/{image_format};base64,{image_data_base64}"
# OPENCV解码数据类型为NUMPY-UINT8
image_data_cv2 = cv2.imdecode(
numpy.frombuffer(image_data, numpy.uint8), cv2.IMREAD_COLOR
)
# 若OPENCV解码失败则抛出异常
if image_data_cv2 is None:
raise RuntimeError(f"OPENCV解码发生异常")
# 初始化近似BASE64编码后影像件数据
proximate_image_data_base64 = None
# 初始化最小压缩前后影像件大小差值
min_image_size_difference = float("inf")
# 基于双层压缩方法:先外层降低图像质量,再内层缩小图像尺寸
for quality in range(90, 0, -10):
image_data_cv2_ = image_data_cv2.copy()
# 根据影像件格式匹配图片质量配置
# noinspection PyUnreachableCode
match image_format:
case "png":
encoding_params = [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10]
case _:
encoding_params = [cv2.IMWRITE_JPEG_QUALITY, quality]
for i in range(25):
# 降低图像质量
# noinspection PyTypeChecker
success, image_data_encoded = cv2.imencode(
image_format, image_data_cv2_, encoding_params
) # 图像编码
# 若图像编码失败则退出
if not success:
break
image_data_base64 = b64encode(image_data_encoded.tobytes()).decode(
"utf-8"
)
# 压缩前后影像件大小差值
image_size_difference = len(image_data_base64) - image_size_specified
if image_size_difference <= 0:
if raw:
return image_data_base64
else:
return f"data:image/{image_format};base64,{image_data_base64}"
if image_size_difference < min_image_size_difference:
min_image_size_difference = image_size_difference
proximate_image_data_base64 = image_data_base64
# 影像件高度和宽度
image_height, image_weight = image_data_cv2_.shape[:2]
# 若仍超过影像件指定大小则调整图像尺寸
image_data_cv2_ = cv2.resize(
image_data_cv2_,
dsize=(int(image_weight * 0.9), int(image_height * 0.9)),
interpolation=cv2.INTER_AREA,
)
if proximate_image_data_base64:
if raw:
return proximate_image_data_base64
else:
return f"data:image/{image_format};base64,{image_data_base64}"
else:
raise RuntimeError("影像件压缩失败")
except:
return None
# Invoice verification endpoint.
@restrict(refill_rate=5, max_tokens=5)  # rate-limit to 5 QPS
def invoices_verification(
    image_index,
    image_path=None,
    invoice_number=None,
    invoice_code=None,
    invoice_check_code=None,
    invoice_date=None,
    invoice_amount=None,
    id_number=None,
    process_mode=None,
    supplier=None,
):
    """Verify a single invoice against the selected supplier's API.

    Depending on ``process_mode`` the image is read from a local path or
    downloaded from an object-storage URL; when no ``image_path`` is given
    the five invoice key fields are submitted instead.  Relies on the
    module-level ``http_client`` and ``authenticator`` created in
    ``__main__``.  Returns ``(image_index, response)`` where ``response``
    is the parsed HTTP response or an error dict.
    """
    try:
        # With an image path, submit the image as base64; otherwise query by
        # the five invoice key fields.
        if image_path:
            match process_mode:
                case "通过影像件本地地址":
                    # Build a path object for the local image file.
                    image_path = Path(image_path)
                    # File name extension (lower-cased, no dot).
                    image_format = image_path.suffix.strip().lstrip(".").lower()
                    # Read the raw image bytes.
                    with open(image_path, "rb") as image:
                        image_data = image.read()
                case "通过影像件对象服务器地址":
                    image_format, image_data = http_client.download(url=image_path)
            # Only JPG, JPEG and PNG are supported.
            # noinspection PyUnboundLocalVariable
            assert image_format in [
                "jpg",
                "jpeg",
                "png",
            ], f"影像件格式({image_format})不支持"
            match supplier:
                case "szkt":
                    image_data_base64 = image_compression(
                        image_format=image_format, image_data=image_data
                    )
                    # noinspection PyUnusedLocal
                    response = http_client.post(
                        # Shenzhen Kuaitong: combined VAT / medical invoice check.
                        url="https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll",
                        # Request-id header, used to correlate requests with the vendor.
                        headers={"X-RequestId-Header": image_index},
                        data={
                            "token": authenticator.get_token(servicer="szkt"),
                            "imgBase64": image_data_base64,
                        },
                    )
                case "bjfd":
                    image_data_base64 = image_compression(
                        image_format=image_format, image_data=image_data, raw=True
                    )  # Beijing Fendan does not accept the full data-URI form
                    # Serialize the business payload and base64-encode it.
                    data = b64encode(
                        json.dumps(
                            {
                                "fileByte": image_data_base64,
                                "fileType": (
                                    "png" if image_format == "png" else "jpg"
                                ),  # Fendan accepts JPG/PNG/PDF; PDF is not supported by this script
                            }
                        ).encode("utf-8")
                    ).decode("utf-8")
                    # Application id
                    appid = "mbYr11Rc_42"
                    # Random nonce (reuse the row index)
                    noise = image_index
                    # API version
                    version = "1.0"
                    # Assemble the upper-cased MD5 signature over the request fields.
                    sign = (
                        hashlib.md5(
                            f"appid={appid}&data={data}&noise={noise}&key=80357535c95333c3b133dfe5533f6334fe5e9321&version={version}".encode(
                                "utf-8"
                            )
                        )
                        .hexdigest()
                        .upper()
                    )
                    # noinspection PyUnusedLocal
                    response = http_client.post(
                        # Beijing Fendan: VAT / medical invoice QR-code check.
                        url="https://api.fendanyun.com/rsx/api/checkByQRCode",
                        headers={"Content-Type": "application/json; charset=utf-8"},
                        json={
                            "appid": appid,
                            "data": data,
                            "noise": noise,
                            "version": version,
                            "sign": sign,
                        },
                    )
        else:
            response = http_client.post(
                # Shenzhen Kuaitong: combined VAT / medical invoice check.
                url="https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll",
                data={
                    "token": authenticator.get_token(servicer="szkt"),
                    "invoiceNumber": invoice_number,
                    "invoiceCode": invoice_code,
                    "checkCode": invoice_check_code,
                    "invoicingDate": invoice_date,
                    "pretaxAmount": invoice_amount,
                    "idCardNo": id_number,
                },
            )
    except RequestException as request_exception:
        # HTTP-level failure: report the vendor status/code/message.
        response = {
            "status": request_exception.status,
            "code": request_exception.code,
            "message": request_exception.message,
        }
    except Exception as exception:
        # Any other failure: synthesize an error payload.
        response = {
            "code": "40000",
            "message": f"发生其它异常{exception}",
        }
    return image_index, response
if __name__ == "__main__":
print("已启动批量票据查验")
match input("请选择票据查验供应商(1:深圳快瞳2:北京分单,其它任意字符:退出脚本):"):
case "1":
supplier = "szkt"
case "2":
supplier = "bjfd"
case _:
print("选择退出脚本!")
exit(0)
match input(
"请选择处理流程(1:批量解析已归档响应报文2:根据影像件地址或票据五要素批量查验,其它任意字符:退出脚本):"
):
case "1":
# 打开前置影像件索引CSV文件
dataframe = open_csv(file_name="dataframe_indexed.csv")
case "2":
print("正在归档响应报文...", end="")
# 创建响应报文目录路径对象
responses_path = Path("temporary/responses")
# 若响应报文目录路径不存在则创建
if not responses_path.exists():
responses_path.mkdir(parents=True, exist_ok=True)
# 创建归档响应报文目录路径对象
archives_path = Path("temporary/archives")
# 若归档响应报文目录路径不存在则创建
if not archives_path.exists():
archives_path.mkdir(parents=True, exist_ok=True)
# 遍历响应报文目录下所有文件名后缀为JSON的文件路径
for file_path in Path(responses_path).glob("*.json"):
# 若文件路径为文件
if file_path.is_file():
# 移动响应报文由响应报文目录至归档响应报文目录
shutil.move(str(file_path), str(archives_path / file_path.name))
print("已完成")
match input(
"请选择批量查验方法(1:通过影像件本地地址2:通过影像件对象服务器地址3:通过增值税发票和医疗票据的五要素,其它任意字符:退出脚本):"
):
case "1":
print("正在读取影像件本地地址...", end="")
dataframe = traverse_directory(
directory_path="待查验发票", suffixes=[".jpg", ".jpeg", ".png"]
)
# 修改列名相对路径为影像件地址
dataframe.rename(columns={"相对路径": "影像件地址"}, inplace=True)
process_mode = "通过影像件本地地址"
case "2":
print("正在读取影像件对象服务器地址...", end="")
dataframe = open_csv(file_name="dataframe.csv")
# 断定列名包括赔案编号、发票编号和影像件地址
assert all(
[
column_name in dataframe.columns
for column_name in ["赔案编号", "发票编号", "影像件地址"]
]
), "CSV文件中列名必须包括赔案编号、发票编号和影像件地址"
# 根据赔案编号和发票编号去重
dataframe.drop_duplicates(
subset=["赔案编号", "发票编号"], keep="first", inplace=True
)
# 处理方式
process_mode = "通过影像件对象服务器地址"
case "3":
print("正在读取增值税发票和医疗票据的五要素...", end="")
dataframe = open_csv(file_name="dataframe.csv")
# 断定列名包括身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额
assert all(
[
column_name in dataframe.columns
for column_name in [
"身份证号码后六位",
"发票编号",
"发票代码",
"校验号码后六位",
"开票日期",
"发票金额",
]
]
), "CSV文件中列名必须包括身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额"
# 根据身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额去重
dataframe.drop_duplicates(
subset=[
"身份证号码后六位",
"发票编号",
"发票代码",
"校验号码后六位",
"开票日期",
"发票金额",
],
keep="first",
inplace=True,
)
# 格式化开票日期
dataframe["开票日期"] = dataframe["开票日期"].str.replace(
"-", "", regex=False
)
# 处理方式
process_mode = "通过增值税发票和医疗票据的五要素"
case _:
print("选择退出脚本!")
exit(0)
# 统计待查验发票张数
rows = dataframe.shape[0]
# 若待查验发票张数为0则退出脚本
if rows == 0:
print("待查验发票张数为0退出脚本")
exit(0)
print(f"已完成,待查验发票张数为 {rows}")
# 添加索引
dataframe["索引"] = dataframe.apply(
lambda x: uuid.uuid4().hex, axis="columns"
)
dataframe.to_csv("dataframe_indexed.csv", index=False)
# 创建深圳快瞳获取访问令牌方法
authenticator = Authenticator()
# 初始化请求客户端
http_client = HTTPClient()
# 用于记录已完成任务数
completed_futures = 0
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
# noinspection PyUnreachableCode
# noinspection PyUnboundLocalVariable
match process_mode:
case "通过影像件本地地址" | "通过影像件对象服务器地址":
futures = [
executor.submit(
invoices_verification,
image_index=row.索引,
image_path=row.影像件地址,
process_mode=process_mode,
supplier=supplier,
)
for row in dataframe[["索引", "影像件地址"]].itertuples(
index=False, name="row"
)
]
case "通过增值税发票和医疗票据的五要素":
# 提交任务
futures = [
executor.submit(
invoices_verification,
image_index=row.索引,
invoice_number=row.发票编号,
invoice_code=row.发票代码,
invoice_check_code=row.校验号码后六位,
invoice_date=row.开票日期,
invoice_amount=row.发票金额,
id_number=row.身份证号码后六位,
process_mode=process_mode,
supplier=supplier,
)
for row in dataframe[
[
"索引",
"发票编号",
"发票代码",
"校验号码后六位",
"开票日期",
"发票金额",
"身份证号码后六位",
]
].itertuples(index=False, name="row")
]
for future in as_completed(futures):
index, response = future.result()
# 保存报文
with open(
"temporary/responses/{}.json".format(index),
"w",
encoding="utf-8",
) as file:
json.dump(response, file, ensure_ascii=False)
completed_futures += 1
print(f"已完成 {completed_futures / rows * 100:.2f} %")
case _:
print("选择退出脚本!")
exit(0)
print("正在解析报文...", end="")
# 解析后数据体
dataframe_parsed = []
# 遍历报文所在目录
for path_object in list(Path("temporary/responses").glob("*.json")):
# 解析报文结构
parse = {
"索引": "",
"机打发票号码": "",
"发票金额": "",
"购买方": "",
"销售方": "",
"发票状态": "",
"最大销售项目名称": "",
"最大销售项目数量": "",
"XML版式文件": "",
}
# 若路径对象包含下划线则在解析报文结构添加赔案编号和发票编号
if "_" in path_object.stem:
parse["赔案编号"] = path_object.stem.split("_")[0]
parse["发票编号"] = path_object.stem.split("_")[1]
# 打开报文并JSON逆序列化
with open(path_object, "r", encoding="utf-8") as file:
response = json.load(file)
# 索引
parse["索引"] = path_object.stem
match supplier:
case "szkt":
try:
# 响应状态码
status_code = response.get("status", "")
# 错误码
code = response.get("code", "")
# 流水号
serial = response.get("serialNo", "")
# 若响应状态码为200且错误码为10000则定义为响应成功
if status_code == 200 and code == 10000:
# 查验类型若查验类型为003081则为医疗票据查验003082则为增值税发票查验两者报文结构不一致
match response.get("data").get(
"productCode"
): # 若响应成功则必定存在键DATA和PRODUCTCODE
# 解析医疗票据查验结果
case "003081":
parse["机打发票号码"] = response.get("data").get(
"billNumber"
)
parse["校验码"] = response.get("data").get("checkCode")
parse["发票金额"] = response.get("data").get("amount")
parse["购买方"] = response.get("data").get("payer")
parse["销售方"] = response.get("data").get("payeeName")
# 发票状态
match response.get("data").get("flushedRed"):
case "true":
parse["发票状态"] = "正常"
case "false":
parse["发票状态"] = "已红冲"
# 最大销售项目数量
max_item_quantity = 0
# 遍历销售项目列表
for item in response.get("data").get("feeitems", []):
# 销售项目数量
item_quantity = item.get("number")
# 若销售项目数量非空,进一步判断是否包含斜杠,若包含斜杠则分割并取第一部分,最后转为浮点
if item_quantity:
if "/" in item_quantity:
item_quantity = item_quantity.split("/")[0]
item_quantity = float(item_quantity)
else:
item_quantity = 1
if item_quantity > max_item_quantity:
parse["最大销售项目名称"] = item.get(
"itemName", ""
)
parse["最大销售项目数量"] = str(item_quantity)
parse["XML版式文件"] = response.get("PDFInfo", {}).get(
"fileUrl"
)
# 解析增值税发票查验结果
case "003082":
parse["机打发票号码"] = (
response.get("data").get("details").get("number")
)
parse["校验码"] = (
response.get("data")
.get("details")
.get("check_code")
)
parse["发票金额"] = (
response.get("data").get("details").get("total")
)
parse["购买方"] = (
response.get("data").get("details").get("buyer")
)
parse["销售方"] = (
response.get("data").get("details").get("seller")
)
# 发票状态
match response.get("data").get("details").get(
"invoiceTypeNo"
):
case "0":
parse["发票状态"] = "正常"
case "1":
parse["发票状态"] = "无法查验"
case "2" | "3" | "7" | "8":
parse["发票状态"] = "已红冲"
max_item_quantity = 0
items = (
response.get("data").get("details").get("items", [])
)
for item in items:
item_quantity = (
float(item.get("quantity", 1))
if item.get("quantity")
else 1
)
if item_quantity > max_item_quantity:
parse["最大销售项目名称"] = item.get("name")
parse["最大销售项目数量"] = str(item_quantity)
# XML版式文件25-06-11本接口不在提供版式文件通过另一接口可获取数电增值税发票版式文件
parse["XML版式文件"] = "本接口不再提供版式文件"
# 若响应状态码为400且错误码为10001或10100则定义为假票
elif status_code == 400 and (code == 10001 or code == 10100):
parse["发票状态"] = "假票"
else:
raise Exception("解析报文发生其它异常")
except Exception as exception:
parse["发票状态"] = "{}".format(response.get("message"))
case "bjfd":
try:
# 不验签业务出参BASE64解码并反序列化
response = json.loads(
b64decode(response.get("data")).decode("utf-8")
)
# 增值税发票、医疗票据查验结果BASE64解码并反序列化
response["message"] = json.loads(
b64decode(response.get("message")).decode("utf-8")
)
# 错误码
code = response.get("result")
# 流水号
serial = response.get("message").get("checkId")
# 核验结果代码
result_code = response.get("message").get("resultCode")
# 若错误码为S0000则定义为响应成功
if code == "S0000":
# noinspection PyUnreachableCode
match result_code:
# 若查验成功则根据增值税发票、医疗票据状态匹配发票状态
case "200":
parse["机打发票号码"] = (
response.get("message")
.get("tickMainInfo")
.get("invoiceNo")
)
parse["发票金额"] = (
response.get("message")
.get("tickMainInfo")
.get("invoiceTotalPrice")
)
parse["购买方"] = (
response.get("message")
.get("tickMainInfo")
.get("payerPartyName")
)
parse["销售方"] = (
response.get("message")
.get("tickMainInfo")
.get("invoicingPartyName")
)
max_item_quantity = 0
for item in (
response.get("message")
.get("tickMainInfo")
.get("chargeItems", [])
):
item_quantity = (
float(item.get("num", 1))
if item.get("num")
else 1
)
if item_quantity > max_item_quantity:
parse["最大销售项目名称"] = item.get(
"chargeName"
)
parse["最大销售项目数量"] = str(item_quantity)
match response.get("message").get("invoiceStatus"):
case "0":
parse["发票状态"] = "正常"
case (
"1" | "2"
): # 沿用深圳快瞳解析规则,北京分单已开红票和已作废映射为已红冲
parse["发票状态"] = "已红冲"
case "E20003" | "E20007 ":
parse["发票状态"] = "假票"
# 其它情况发票状态根据核验结果描述
case _:
parse["发票状态"] = response.get("message").get(
"resultMsg"
)
except Exception as exception:
parse["发票状态"] = str(exception)
dataframe_parsed.append(parse)
dataframe_parsed = pandas.DataFrame(data=dataframe_parsed, dtype=str)
# 将解析数据集拼接至数据集
dataframe = dataframe.merge(right=dataframe_parsed, how="left", on=["索引"])
# 填补缺失值
dataframe = dataframe.fillna(value="")
print("已完成")
print("正在保存为工作簿...", end="")
save_as_workbook(worksheets=[("Sheet1", dataframe)], workbook_name="results.xlsx")
print("已完成")

View File

@ -0,0 +1,305 @@
# -*- coding: utf-8 -*-
"""
普康健康_发票识别
"""
# 加载模块
import json
from pathlib import Path
import shutil
import pandas
import uuid
from concurrent.futures import ThreadPoolExecutor, as_completed
from 普康健康发票查验.main import image_compression
from utils.pandas_extension import traverse_directory, save_as_workbook
from utils.client import restrict, HTTPClient, RequestException, Authenticator
if __name__ == "__main__":
print("正在基于深圳快瞳的增值税发票识别接口批量识别")
match input(
"请选择获取待识别发票影像件方式(1:遍历目录通过影像件路径,其它任意字符:退出脚本):"
):
case "1":
print("正在遍历目录...", end="")
dataframe = traverse_directory(
directory_path="待识别发票", suffixes=[".jpg", ".jpeg", ".png"]
)
# 修改列名相对路径为影像件地址
dataframe.rename(columns={"相对路径": "影像件地址"}, inplace=True)
# 添加索引
dataframe["索引"] = dataframe.apply(
lambda x: uuid.uuid4().hex, axis="columns"
)
case _:
print("选择退出脚本!")
exit(0)
# 统计待识别发票张数
rows = dataframe.shape[0]
# 若待识别发票张数为0则退出脚本
if rows == 0:
print("待识别发票张数为0退出脚本")
exit(0)
print(f"已完成,待识别发票张数为 {rows}")
match input(
"请选择是否就上一次的响应报文进行归档(0:不归档1:归档,其它任意字符:退出脚本):"
):
# 若不归档则不请求深圳快瞳的增值税发票识别接口
case "0":
pass
case "1":
print("正在归档响应报文...", end="")
# 创建响应报文目录路径对象
responses_path = Path("temporary/responses")
# 若响应报文目录路径不存在则创建
if not responses_path.exists():
responses_path.mkdir(parents=True, exist_ok=True)
# 创建归档响应报文目录路径对象
archives_path = Path("temporary/archives")
# 若归档响应报文目录路径不存在则创建
if not archives_path.exists():
archives_path.mkdir(parents=True, exist_ok=True)
# 遍历响应报文目录下所有文件名后缀为JSON的文件路径
for file_path in Path(responses_path).glob("*.json"):
# 若文件路径为文件
if file_path.is_file():
# 移动响应报文由响应报文目录至归档响应报文目录
shutil.move(str(file_path), str(archives_path / file_path.name))
print("已完成")
# 创建深圳快瞳获取访问令牌方法
authenticator = Authenticator(servicer="szkt")
# 初始化请求客户端
http_client = HTTPClient()
@restrict(refill_rate=5, max_tokens=5)
def szkt_request(
image_index,
image_path=None,
):
try:
# 创建影像件路径对象
image_path = Path(image_path)
# 影像件文件名称后缀
image_format = image_path.suffix.strip().lower()
# 根据影像件路径读取图像数据(二进制)
with open(image_path, "rb") as image:
image_data = image.read()
# 标准化影像件格式
# noinspection PyUnboundLocalVariable
image_format = image_format if image_format == ".png" else ".jpeg"
# noinspection PyUnboundLocalVariable
image_data_base64 = image_compression(image_format, image_data)
response = http_client.post(
# 增值税发票识别
url="https://ai.inspirvision.cn/s/api/ocr/vatInvoice",
# 用于和深圳快瞳联查时定位请求
headers={"X-RequestId-Header": image_index},
data={
"token": authenticator.get_token(),
"imgBase64": image_data_base64,
},
)
except RequestException as request_exception:
response = {
"status": request_exception.status_code,
"message": request_exception.message,
}
except Exception as exception:
response = {
"status": "90000",
"message": f"发生其它异常{exception}",
}
return image_index, response
# 用于记录已完成任务数
completed_futures = 0
# 创建线程池
with ThreadPoolExecutor(max_workers=5) as executor:
futures = [
executor.submit(
szkt_request,
image_index=row.索引,
image_path=row.影像件地址,
)
for row in dataframe[["索引", "影像件地址"]].itertuples(
index=False, name="row"
)
]
for future in as_completed(futures):
index, response = future.result()
# 保存报文
with open(
"temporary/responses/{}.json".format(index),
"w",
encoding="utf-8",
) as file:
json.dump(response, file, ensure_ascii=False)
completed_futures += 1
print(f"已完成 {completed_futures / rows * 100:.2f} %")
case _:
print("选择退出脚本!")
exit(0)
print("正在解析报文...", end="")
# 解析后数据体
dataframe_parsed = []
# 遍历报文所在目录
for path_object in list(Path("temporary/responses").glob("*.json")):
# 解析报文结构
parse = {
"索引": "",
"发票号码": "",
"小写金额": "",
"合计金额": "",
"合计税额": "",
"购买方名称": "",
"销售方名称": "",
}
try:
# 打开报文并JSON逆序列化
with open(path_object, "r", encoding="utf-8") as file:
response = json.load(file)
# 索引
parse["索引"] = path_object.stem
# 响应状态码
status_code = response.get("status", "")
# 错误码
code = response.get("code", "")
# 流水号
serial = response.get("serialNo", "")
# 若响应状态码为200且错误码为0则定义为响应成功
if status_code == 200 and code == 0:
for item in response.get("data", []):
# 根据DESC匹配字段
match item.get("desc"):
case "发票号码":
parse["发票号码"] = item.get("value", "")
case "小写金额":
parse["小写金额"] = item.get("value").replace("¥", "")
case "合计金额":
parse["合计金额"] = item.get("value").replace("¥", "")
case "合计税额":
parse["合计税额"] = item.get("value").replace("¥", "")
case "购买方名称":
parse["购买方名称"] = item.get("value", "")
case "销售方名称":
parse["销售方名称"] = item.get("value", "")
else:
raise Exception("解析报文发生其它异常")
except Exception as exception:
parse["发票号码"] = "{}".format(response.get("message"))
finally:
dataframe_parsed.append(parse)
dataframe_parsed = pandas.DataFrame(data=dataframe_parsed, dtype=str)
# 将解析数据集拼接至数据集
dataframe = dataframe.merge(right=dataframe_parsed, how="left", on=["索引"])
# 填补缺失值
dataframe = dataframe.fillna(value="")
print("已完成")
print("正在保存为工作簿...", end="")
save_as_workbook(worksheets=[("Sheet1", dataframe)], workbook_name="results.xlsx")
print("已完成")

View File

@ -0,0 +1,172 @@
# -*- coding: utf-8 -*-
"""
普康健康_客服会话记录整合
"""
# 加载模块
import json
import re
from pathlib import Path
import pandas
from jinja2 import Environment, FileSystemLoader
from utils.client import Authenticator, HTTPClient
# Directory that holds the raw chat-log CSV exports.
directory_path = Path("客服会话记录")
# Accumulated raw data.
dataframe = pandas.DataFrame()
for file_path in directory_path.glob("*.csv"):
    # Read each local CSV (GBK-encoded exports; bad bytes are dropped).
    dataframe = pandas.concat(
        [
            dataframe,
            pandas.read_csv(
                filepath_or_buffer=file_path,
                usecols=["用户名称", "会话开始时间", "详情"],
                dtype=str,
                encoding="gbk",
                encoding_errors="ignore",
            ),
        ],
        ignore_index=True,
    )
dataframe = (
    dataframe.assign(
        会话开始时间=pandas.to_datetime(dataframe["会话开始时间"], errors="coerce"),
        # Strip the bot greeting, the closing line and double newlines.
        详情=dataframe["详情"].apply(
            lambda row: (
                row.split("\n\n", maxsplit=1)[-1]
                if "您好,欢迎咨询小暖,猜您可能咨询以下问题" in row
                else row
            )
            .replace("对话结束 >>\n", "")
            .replace("\n\n", "")
        ),
    )
    # Newest sessions first.
    .sort_values(by="会话开始时间", ascending=False)
    # One row per user; their sessions joined with blank lines.
    .groupby(by="用户名称", as_index=False).agg(详情=("详情", "\n\n".join))
)
# Long texts are better stored as JSON; HTML is rendered from this template.
template = Environment(loader=FileSystemLoader(".")).get_template("template.html")
# Token provider.
# NOTE(review): this rebinds the class name `Authenticator` to an instance,
# shadowing the imported class — consider a lower-case name.
Authenticator = Authenticator()
# Shared HTTP client.
http_client = HTTPClient()
rows = []
for _, row in dataframe.iterrows():
    # Conversations parsed for this user.
    conversations = []
    # Each blank-line-separated chunk is one conversation.
    for lines in row["详情"].split("\n\n"):
        # One conversation: its messages plus its start timestamp.
        conversation = {"messages": [], "started_at": ""}
        # Walk the conversation line by line.
        for i, line in enumerate(lines := lines.split("\n")):
            # A line whose tail carries a timestamp starts a message.
            match = re.search(
                r"(?P<started_at>\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})",
                sent_at := line.split(" ", maxsplit=1)[-1],
            )
            # The first timestamp seen becomes the conversation start time
            # (the body clears the condition, so this runs at most once).
            while match and not conversation["started_at"]:
                conversation["started_at"] = match.group("started_at")
            if match and row["用户名称"] in (name := line.split(" ", maxsplit=1)[0]):
                # Message sent by the customer.
                message = {"sender": "客户", "sender_type": "customer", "content": ""}
                # Everything up to the next timestamped line is the body.
                for i_, line_ in enumerate(lines[i + 1 :]):
                    if (
                        not (
                            match_ := re.search(
                                r"(^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})$",
                                line_.split(" ", maxsplit=1)[-1],
                            )
                        )
                        and line_
                    ):
                        message["content"] += line_
                    if match_:
                        break
                conversation["messages"].append(message)
            elif match and any(
                x in (name := line.split(" ", maxsplit=1)[0])
                for x in ["机器人", "kefu"]
            ):
                # Message sent by staff (bot or human agent).
                # noinspection PyUnboundLocalVariable
                message = {
                    "sender": name,
                    "sender_type": "staff",
                    "content": "",
                }
                # Everything up to the next timestamped line is the body.
                for j, line_ in enumerate(lines[i + 1 :]):
                    if (
                        not (
                            match_ := re.search(
                                r"(^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})$",
                                line_.split(" ", maxsplit=1)[-1],
                            )
                        )
                        and line_
                    ):
                        message["content"] += line_
                    if match_:
                        break
                conversation["messages"].append(message)
        conversations.append(conversation)
    response = http_client.post(
        url="https://kms.7x24cc.com/api/v1/knowledges/",
        headers={
            "Authorization": "Bearer {}".format(
                Authenticator.get_token(servicer="hlyj")
            ),  # fetch an access token and attach it to the request header
            "Content-Type": "application/json; charset=utf-8",
        },
        json={
            "knowledge_base_id": "143bfe7f-fd79-49f2-8359-c123aba944c2",
            "title": (
                customer if "_" in (customer := row["用户名称"]) else f"客户{customer}"
            ),
            "content": template.render(
                {
                    "conversations": conversations,
                }
            ),  # render the conversation HTML from the template
        },
    )
    print(response)
    # Record the created knowledge-entry id on the row.
    row["新增结果"] = response.get("knowledge_id", "")
    rows.append(row)
# NOTE(review): `rows` holds pandas Series objects, which json.dump cannot
# serialize (TypeError) — likely needs row.to_dict() above.  TODO confirm.
with open("results.json", "w", encoding="utf-8") as file:
    json.dump(rows, file, ensure_ascii=False)

View File

@ -0,0 +1,22 @@
<!DOCTYPE html>
<html lang="zh-CN">
<body style="font-family: -apple-system, 'Segoe UI', Roboto, Arial, sans-serif; color: #4e5969; background-color: #f2f3f5; margin: 0; padding: 20px;">
<div style="max-width: 800px; margin: 0 auto;">
{% for conversation in conversations %}
<div style="background-color: white; box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1); border-radius: 12px; margin-bottom: 20px; padding: 16px;">
<div style="color: #86909c; border-bottom: 1px solid #e5e6eb; margin-bottom: 12px; padding-bottom: 8px;">会话开始时间:{{ conversation.started_at }}</div>
<div style="display: flex; flex-direction: column; gap: 12px;">
{% for message in conversation.messages %}
<div style="max-width: 75%; padding: 10px 14px; border-radius: 18px; line-height: 1.4; position: relative; word-wrap: break-word; {% if message.sender_type == 'customer' %}background-color: #94BFFF; border-top-right-radius: 4px; margin-left: 25%; align-self: flex-end;{% else %}background-color: white; border: 1px solid #e5e6eb; border-top-left-radius: 4px; margin-right: 25%; align-self: flex-start;{% endif %}">
<div style="font-size: 12px; color: #86909c; margin-bottom: 4px; {% if message.sender_type == 'customer' %}text-align: right;{% else %}text-align: left;{% endif %}">{{ message.sender }}</div>
<div>
{{ message.content }}
</div>
</div>
{% endfor %}
</div>
</div>
{% endfor %}
</div>
</body>
</html>

View File

@ -0,0 +1,185 @@
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>客服对话记录</title>
<style>
body {
font-family: -apple-system, "Segoe UI", Roboto, Arial, sans-serif;
color: #4e5969;
background-color: #f2f3f5;
margin: 0;
padding: 20px;
}
.conversations-container {
max-width: 800px;
margin: 0 auto;
}
.conversation-container {
background-color: white;
box-shadow: 0 1px 3px rgba(0, 0, 0, 0.1);
border-radius: 12px;
margin-bottom: 20px;
padding: 16px;
}
.conversation-header {
color: #86909c;
border-bottom: 1px solid #e5e6eb;
margin-bottom: 12px;
padding-bottom: 8px;
}
.message-container {
display: flex;
flex-direction: column;
gap: 12px;
}
.message-bubble {
max-width: 75%;
padding: 10px 14px;
border-radius: 18px;
line-height: 1.4;
position: relative;
word-wrap: break-word;
}
.customer-bubble {
align-self: flex-start;
background-color: white;
border: 1px solid #e5e6eb;
border-top-left-radius: 4px;
margin-right: 25%;
}
.staff-bubble {
align-self: flex-end;
background-color: #94BFFF;
border-top-right-radius: 4px;
margin-left: 25%;
}
.message-sender {
font-size: 12px;
color: #86909c;
margin-bottom: 4px;
}
.customer-bubble .message-sender {
text-align: left;
}
.staff-bubble .message-sender {
text-align: right;
}
</style>
</head>
<body>
<div class="conversations-container">
<div class="conversation-container">
<div class="conversation-header">会话开始时间2024-11-13 09:14:28</div>
<div class="message-container">
<div class="message-bubble customer-bubble">
<div class="message-sender">客户</div>
<div class="bubble-content">
订单取消
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">机器人(人保小暖)</div>
<div class="bubble-content">
非常抱歉呢,这个问题小暖还没有学过呢~您可以转人工咨询~
</div>
</div>
<div class="message-bubble customer-bubble">
<div class="message-sender">客户</div>
<div class="bubble-content">
转人工
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">机器人(人保小暖)</div>
<div class="bubble-content">
正在为您转接人工客服
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">机器人(人保小暖)</div>
<div class="bubble-content">
人工服务是一对多服务,回复可能稍慢,转接请稍等...!
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">kefu04(小暖暖)</div>
<div class="bubble-content">
您好
</div>
</div>
<div class="message-bubble customer-bubble">
<div class="message-sender">客户</div>
<div class="bubble-content">
您好华怡药房有个订单要取消17314220802108
</div>
</div>
<div class="message-bubble customer-bubble">
<div class="message-sender">客户</div>
<div class="bubble-content">
商家的客服页面是空白,页面没有取消订单操作
</div>
</div>
<div class="message-bubble customer-bubble">
<div class="message-sender">客户</div>
<div class="bubble-content">
请后台联系取消订单,或者提供可以联系上店铺的客服电话
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">kefu04(小暖暖)</div>
<div class="bubble-content">
您截图
</div>
</div>
<div class="message-bubble customer-bubble">
<div class="message-sender">客户</div>
<div class="bubble-content">
发送图片https://minioss.cn-wlcb.ufileos.com/filetransfer/20241113/kf_10566/688595f6639b3b895fcdd8364817cdd51731460659.jpg 大小69.76 KB
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">kefu04(小暖暖)</div>
<div class="bubble-content">
这边咨询一下后台是否可以进行操作
</div>
</div>
<div class="message-bubble staff-bubble">
<div class="message-sender">kefu04(小暖暖)</div>
<div class="bubble-content">
已经咨询,后台还未有回复
</div>
</div>
</div>
</div>
</div>
</body>
</html>

View File

@ -0,0 +1,474 @@
# -*- coding: utf-8 -*-
"""
普康健康_生成直付理赔周报
"""
# 加载模块
import pandas, numpy
from utils.pandas_extension import open_csv, save_as_workbook
# Map a province name to the owning branch-institution name.
def match_institution_name(x):
    """Return the institution name responsible for province *x*.

    Unknown or missing provinces fall back to "总部" (headquarters).
    """
    # Province → institution lookup table (several provinces share a branch).
    province_to_institution = {
        "北京市": "京津",
        "天津市": "京津",
        "河北省": "河北",
        "山西省": "山西",
        "内蒙古自治区": "内蒙",
        "辽宁省": "辽宁",
        "吉林省": "黑吉",
        "黑龙江省": "黑吉",
        "上海市": "上海",
        "江苏省": "江苏",
        "浙江省": "浙江",
        "安徽省": "安徽",
        "福建省": "福建",
        "江西省": "江西",
        "山东省": "山东",
        "河南省": "河南",
        "湖北省": "湖北",
        "湖南省": "湖南",
        "广东省": "广东",
        "海南省": "广东",
        "广西壮族自治区": "广西",
        "重庆市": "四川",
        "四川省": "四川",
        "西藏自治区": "四川",
        "贵州省": "贵州",
        "云南省": "云南",
        "新疆维吾尔自治区": "新疆",
        "陕西省": "陕西",
        "青海省": "陕西",
        "甘肃省": "甘肃",
        "宁夏回族自治区": "宁夏",
    }
    return province_to_institution.get(x, "总部")
# Map an institution name to its sales-region name.
def match_region_name(x):
    """Return the region name for institution *x* ("总部" if unmapped)."""
    # Institution → region lookup table.
    institution_to_region = {
        "内蒙": "东北大区",
        "辽宁": "东北大区",
        "黑吉": "东北大区",
        "京津": "华北大区",
        "河北": "华北大区",
        "山西": "华北大区",
        "安徽": "华东大区",
        "山东": "华东大区",
        "河南": "华东大区",
        "江苏": "东南大区",
        "福建": "东南大区",
        "广东": "东南大区",
        "江西": "华中大区",
        "湖北": "华中大区",
        "湖南": "华中大区",
        "新疆": "西北大区",
        "陕西": "西北大区",
        "甘肃": "西北大区",
        "宁夏": "西北大区",
        "广西": "西南大区",
        "四川": "西南大区",
        "云南": "西南大区",
        "贵州": "西南大区",
        "上海": "上海",
        "浙江": "浙江",
    }
    return institution_to_region.get(x, "总部")
"""
统计方案
1读取当年往月对账单数据包括对账期商家编号保单编号和对账金额文件名为reconciliations.csv
1.1 根据对账期和保单编号分组就对账金额求和其中对账期对账金额之和重命名为考核周期消费规模
2读取当年当月保单扣减数据包括扣减期商家编号保单编号和扣减金额文件名为reconciliations_month.csv
2.1 根据扣减期和保单编号分组就扣减金额求和其中扣减期扣减金额之和重命名为考核周期消费规模
3合并1.1和2.1即当年往月和当年当月考核周期保单编号和消费规模
4读取徐丹老师提供的保单机构分配数据包括保单编号落地机构落地机构分配比例出单机构出单机构分配比例总部分配比例文件名为slips.csv
4.1 先查询3中消费规模大于0的保单编号再和4中保单编号比较就不在4中的保单编号提供徐丹老师由其补录保单机构分配方案补录后重复4.1至无需再提供徐丹老师 -->过程表
5就机构拆解保单消费规模根据考核周期机构分组就消费规模求和
5.1 根据机构名称匹配大区名称
5.2 读取当年机构消费目标数据包括考核周期机构名称和消费目标根据考核周期和机构名称匹配消费目标
5.3 就算达成率消费规模/消费目标-->基表
6透视基表生成各机构在当年各月消费目标消费规模和转化率当年消费目标消费规模和转化率并汇总
"""
print("正在生成直付理赔周报...", end="")
# Current-year prior-month reconciliation data (grain: period - merchant - policy number)
reconciliations = open_csv(file_name="reconciliations.csv")
# Drop rows with a missing policy number (policy number and reconciled amount
# are only ever missing together)
reconciliations.dropna(subset=["保单编号"], inplace=True)
# Column type conversion
for variable_label in reconciliations.columns:
    match variable_label:
        case "对账金额":
            # No missing values possible here, so no imputation is needed
            reconciliations[variable_label] = reconciliations[variable_label].astype(
                "float"
            )
# Sum the reconciled amount per (period, policy number), then rename to the
# assessment-period / consumption-scale vocabulary used downstream
reconciliations = (
    reconciliations.groupby(by=["对账期", "保单编号"])
    .agg(对账金额=("对账金额", "sum"))
    .reset_index()
    .rename(columns={"对账期": "考核周期", "对账金额": "消费规模"})
)
# Current-year current-month policy deduction data (grain: period - merchant - policy number)
reconciliations_month = open_csv(file_name="reconciliations_month.csv")
# Column type conversion
for variable_label in reconciliations_month.columns:
    match variable_label:
        case "扣减金额":
            # No missing values possible here, so no imputation is needed
            reconciliations_month[variable_label] = reconciliations_month[
                variable_label
            ].astype("float")
# Sum the deducted amount per (period, policy number) and align the column names
reconciliations_month = (
    reconciliations_month.groupby(by=["扣减期", "保单编号"])
    .agg(扣减金额=("扣减金额", "sum"))
    .reset_index()
    .rename(columns={"扣减期": "考核周期", "扣减金额": "消费规模"})
)
# Stack prior-month reconciliations and current-month deductions
reconciliations = pandas.concat(
    objs=[reconciliations, reconciliations_month], ignore_index=True
)
# Policy-to-institution allocation plan (maintained by 徐丹)
slips = open_csv(file_name="slips.csv")
# Column type conversion
for variable_label in slips.columns:
    match variable_label:
        # No missing values possible here, so no imputation is needed
        case "落地机构分配比例" | "出单机构分配比例" | "总部分配比例":
            slips[variable_label] = slips[variable_label].astype("int")
# Working table: consumption joined with its allocation plan
process_table = reconciliations.merge(right=slips, on="保单编号", how="left")
# Flag policies that have consumption but no allocation plan
process_table.loc[
    (process_table["消费规模"] > 0) & (process_table["出单机构分配比例"].isna()),
    "异常标签",
] = "无分配方案"
if process_table.loc[process_table["异常标签"] == "无分配方案"].shape[0] > 0:
    print("存在未分配机构的保单,请提请徐丹老师补录")
    print()
    save_as_workbook(
        worksheets=[("异常保单", process_table)],
        workbook_name="普康健康_需补录保单机构分配方案.xlsx",
    )
    exit()
# Add the headquarters pseudo-institution column
slips.insert(loc=slips.shape[1] - 1, column="总部", value="总部")
# Gather the three institutions and their ratios into list columns, then
# explode into one allocation row per institution
slips = (
    slips.assign(
        # Combined institutions
        机构名称=slips.apply(
            lambda x: [x["落地机构"], x["出单机构"], x["总部"]], axis="columns"
        ),
        # Combined allocation ratios
        分配比例=slips.apply(
            lambda x: [x["落地机构分配比例"], x["出单机构分配比例"], x["总部分配比例"]],
            axis="columns",
        ),
    )
    # Explode the institution/ratio pairs into rows and reset the index
    .explode(["机构名称", "分配比例"]).reset_index(drop=True)
)
# Keep only allocations with a positive ratio
slips = slips.loc[slips["分配比例"] > 0, ["保单编号", "机构名称", "分配比例"]]
# Map each institution's province to its institution name
slips["机构名称"] = slips["机构名称"].apply(lambda x: match_institution_name(x))
# Derive the region name and insert it as the second column
slips.insert(
    loc=slips.shape[1] - 2,
    column="大区名称",
    value=slips["机构名称"].apply(lambda x: match_region_name(x)),
)
# Left-join the exploded allocation plan (ratios are never missing)
process_table = process_table.merge(right=slips, on="保单编号", how="left")
# Consumption attributed to each institution by its ratio
process_table["分配后消费规模"] = process_table.apply(
    lambda x: x["消费规模"] * x["分配比例"] / 100, axis="columns"
)
# Sum attributed consumption per (period, institution)
process_table = (
    process_table.groupby(by=["考核周期", "机构名称"])
    .agg(大区名称=("大区名称", "first"), 分配后消费规模=("分配后消费规模", "sum"))
    .reset_index()
)
# Per-institution consumption targets (grain: institution - period)
targets = open_csv(file_name="targets.csv")
# Column type conversion
for variable_label in targets.columns:
    match variable_label:
        case "消费目标":
            # Targets are never missing, so no imputation is needed
            targets[variable_label] = targets[variable_label].astype("float")
process_table = process_table.merge(
    right=targets, on=["机构名称", "考核周期"], how="left"
)
# Pivot: rows (region, institution), columns per assessment period, with
# row/column grand totals
pivot_table = process_table.pivot_table(
    index=["大区名称", "机构名称"],
    columns="考核周期",
    values=[
        "分配后消费规模",
        "消费目标",
    ],  # NOTE: with one column key and multiple values pandas builds a
    # two-level column index — level 0 is the value name, level 1 the period
    aggfunc="sum",
    margins=True,
    margins_name="汇总",
)
# Append one summary row per multi-institution region
for region_name in pivot_table.index.get_level_values("大区名称").unique():
    if region_name not in ["上海", "浙江", "总部", "汇总"]:
        # Sum the region's institutions over every (value, period) column
        region_summary = pivot_table.loc[region_name].sum()  # a Series
        region_summary = pandas.DataFrame(
            data=[region_summary],  # one-element list of Series
            # Two-level row index: (region, "汇总")
            index=pandas.MultiIndex.from_tuples(
                tuples=[(region_name, "汇总")], names=["大区名称", "机构名称"]
            ),
            columns=region_summary.index,
        )
        pivot_table = pandas.concat(objs=[pivot_table, region_summary])
# Achievement rate per period (including the grand-total column)
for period in pivot_table.columns.get_level_values("考核周期").unique():
    pivot_table[("达成率", period)] = pivot_table.apply(
        lambda x: (
            x[("分配后消费规模", period)] / x[("消费目标", period)]
            if x[("消费目标", period)] != 0
            else 0
        ),
        axis="columns",
    )
# Swap the column levels so periods come first, then sort the columns
pivot_table = pivot_table.swaplevel(axis="columns").sort_index(axis="columns")
# Display order of the regions
regions_orders = [
    "东北大区",
    "华北大区",
    "华东大区",
    "华中大区",
    "东南大区",
    "西北大区",
    "西南大区",
    "上海",
    "浙江",
    "总部",
    "汇总",
]
# Region name -> display rank
region_mapper = {
    region_name: region_index for region_index, region_name in enumerate(regions_orders)
}
# Display rank of every pivot row's region
regions_mapped = [
    region_mapper.get(region_name)
    for region_name in pivot_table.index.get_level_values("大区名称")
]
# Display order of the institutions within each region
institutions_orders = {
    "东北大区": ["汇总", "内蒙", "辽宁", "黑吉"],
    "华北大区": ["汇总", "京津", "河北", "山西"],
    "华东大区": ["汇总", "安徽", "山东", "河南"],
    "华中大区": ["汇总", "江西", "湖北", "湖南"],
    "东南大区": ["汇总", "江苏", "福建", "广东"],
    "西北大区": ["汇总", "新疆", "陕西", "甘肃", "宁夏"],
    "西南大区": ["汇总", "广西", "四川", "云南", "贵州"],
    "上海": ["上海"],
    "浙江": ["浙江"],
    "总部": ["总部"],
    "汇总": [""],
}
# (region, institution) -> display rank
institution_mapper = {}
institution_mapper.update(
    {
        (region_name, institution_name): institution_index
        for region_name, institution_names in institutions_orders.items()
        for institution_index, institution_name in enumerate(institution_names)
    }
)
# Display rank of every pivot row's institution
institutions_mapped = [
    institution_mapper.get((region, institution))
    for region, institution in zip(
        pivot_table.index.get_level_values("大区名称"),
        pivot_table.index.get_level_values("机构名称"),
    )
]
# Multi-key sort: primary key is the region rank, secondary the institution rank
pivot_table = pivot_table.iloc[
    numpy.lexsort((institutions_mapped, regions_mapped))
].reset_index()
save_as_workbook(
    worksheets=[("sheet1", pivot_table)], workbook_name="普康健康_机构周报.xlsx"
)
print("生成成功")

View File

@ -0,0 +1,576 @@
from decimal import Decimal, ROUND_HALF_UP
import pandas
from utils.pandas_extension import save_as_workbook
dataset = {}
# Institutional performance base data: every sheet of the workbook is loaded
# as a string-typed DataFrame keyed by its sheet name
sheets = pandas.ExcelFile("基础数据.xlsx")
for sheet_name in sheets.sheet_names:
    dataset[sheet_name] = pandas.read_excel(sheets, sheet_name=sheet_name, dtype=str)
# Map the annual achievement rate (percent) to the regional year-end bonus tier
def mapped_as_regional_year_end_bonus(x: Decimal) -> str:
    """Return the regional bonus tier ("0薪" … "12薪") for achievement rate *x*."""
    # Exclusive upper bound of each band, scanned from the bottom up
    bands = (
        (70, "0薪"),
        (80, "4薪"),
        (90, "5薪"),
        (100, "6薪"),
        (110, "7薪"),
        (120, "8薪"),
        (130, "9薪"),
        (140, "10薪"),
        (150, "11薪"),
    )
    for upper_bound, bonus in bands:
        if x < upper_bound:
            return bonus
    if x >= 150:
        return "12薪"
    # Unreachable for ordinary numbers; mirrors the original's fallback branch
    return "年度达成率未匹配年终奖"
# Regional year-end compensation
# From the weekly achievement sheet keep rows whose name contains "大区",
# convert the achievement rate to percent, and map it to a bonus tier
regional_year_end_bonus = (
    dataset["大区机构达成率1"]
    .loc[
        dataset["大区机构达成率1"]["大区/机构名称"].str.contains("大区"),
        ["大区/机构名称", "年度消费目标(万)", "累计消费规模(万)", "年度达成率"],
    ]
    .assign(
        # Rate as a percentage rounded half-up to two decimals
        年度达成率=lambda dataframe: dataframe["年度达成率"].apply(
            lambda cell: (Decimal(cell) * 100).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            ),
        )
    )
    .assign(
        # Bonus tier derived from the achievement rate
        年终奖=lambda dataframe: dataframe["年度达成率"].apply(
            lambda cell: mapped_as_regional_year_end_bonus(cell)
        )
    )
    .sort_values(by="年度达成率", ascending=False)
    .reset_index(drop=True)
)
def mapped_as_institutional_year_end_bonus(x: pandas.Series) -> str | None:
if x["年度达成率"] < 70:
if (
x["大区/机构名称"]
in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃、黑吉、辽宁、山西、新疆、四川"
):
z = "0薪"
else:
z = None
elif 70 <= x["年度达成率"] < 80:
if (
x["大区/机构名称"]
in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃"
):
z = "0薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "2薪"
else:
z = None
elif 80 <= x["年度达成率"] < 90:
if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西":
z = "0薪"
elif x["大区/机构名称"] in "河北、江苏、江西、甘肃":
z = "2薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "3薪"
else:
z = None
elif 90 <= x["年度达成率"] < 100:
if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西":
z = "2薪"
elif x["大区/机构名称"] in "河北、江苏、江西、甘肃":
z = "3薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "4薪"
else:
z = None
elif 100 <= x["年度达成率"] < 120:
if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西":
z = "3薪"
elif x["大区/机构名称"] in "河北、江苏、江西、甘肃":
z = "4薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "5薪"
else:
z = None
elif 120 <= x["年度达成率"] < 140:
if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西":
z = "4薪"
elif x["大区/机构名称"] in "河北、江苏、江西、甘肃":
z = "5薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "6薪"
else:
z = None
elif 140 <= x["年度达成率"] < 160:
if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西":
z = "5薪"
elif x["大区/机构名称"] in "河北、江苏、江西、甘肃":
z = "6薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "7薪"
else:
z = None
elif 160 <= x["年度达成率"] < 180:
if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西":
z = "6薪"
elif x["大区/机构名称"] in "河北、江苏、江西、甘肃":
z = "7薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "8薪"
else:
z = None
elif 180 <= x["年度达成率"] < 200:
if (
x["大区/机构名称"]
in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃"
):
z = "8薪"
elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川":
z = "9薪"
else:
z = None
else:
if (
x["大区/机构名称"]
in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃、黑吉、辽宁、山西、新疆、四川"
):
z = "10薪"
else:
z = None
return z
# Institutional year-end compensation
# From the weekly achievement sheet keep non-region rows (excluding the
# special "宁夏(只宁煤)" row), convert the rate to percent, map each row
# to a bonus tier, and drop institutions outside the scheme
institutional_year_end_bonus = (
    dataset["大区机构达成率1"]
    .loc[
        ~dataset["大区机构达成率1"]["大区/机构名称"].str.contains("大区")
        & ~dataset["大区机构达成率1"]["大区/机构名称"].isin(["宁夏(只宁煤)"]),
        ["大区/机构名称", "年度消费目标(万)", "累计消费规模(万)", "年度达成率"],
    ]
    .assign(
        # Rate as a percentage rounded half-up to two decimals
        年度达成率=lambda dataframe: dataframe["年度达成率"].apply(
            lambda cell: (Decimal(cell) * 100).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            ),
        )
    )
    .assign(
        # Tier depends on both the name (group) and the rate
        年终奖=lambda dataframe: dataframe[["大区/机构名称", "年度达成率"]].apply(
            lambda row: mapped_as_institutional_year_end_bonus(row), axis="columns"
        )
    )
    .dropna(subset=["年终奖"])
    .sort_values(by="年度达成率", ascending=False)
    .reset_index(drop=True)
)
# Map a province/municipality name to the institution that covers it
def mapped_as_institution_name(x):
    """Return the institution name for province *x* ("" when unknown)."""
    province_to_institution = {
        "北京市": "京津", "天津市": "京津",
        "河北省": "河北",
        "山西省": "山西",
        "内蒙古自治区": "内蒙",
        "辽宁省": "辽宁",
        "吉林省": "黑吉", "黑龙江省": "黑吉",
        "上海市": "上海",
        "江苏省": "江苏",
        "浙江省": "浙江",
        "安徽省": "安徽",
        "福建省": "福建",
        "江西省": "江西",
        "山东省": "山东",
        "河南省": "河南",
        "湖北省": "湖北",
        "湖南省": "湖南",
        "广东省": "广东", "海南省": "广东",
        "广西壮族自治区": "广西",
        "重庆市": "四川", "四川省": "四川", "西藏自治区": "四川",
        "贵州省": "贵州",
        "云南省": "云南",
        "新疆维吾尔自治区": "新疆",
        "陕西省": "陕西", "青海省": "陕西",
        "甘肃省": "甘肃",
        "宁夏回族自治区": "宁夏",
        "总部": "总部",
    }
    # Unknown inputs map to the empty string (unlike the weekly report's helper)
    return province_to_institution.get(x, "")
# Map the card issuance amount (10k yuan) to the issuance reward (yuan)
def mapped_as_card_issuance_reward(x: Decimal) -> int:
    """Return the reward in yuan for issuance amount *x*; 0 below 50."""
    # Inclusive lower bound of each bracket, scanned from the top down
    brackets = (
        (10000, 30000),
        (5000, 20000),
        (3000, 10000),
        (1000, 4000),
        (500, 2000),
        (300, 1000),
        (100, 800),
        (50, 500),
    )
    for lower_bound, reward in brackets:
        if x >= lower_bound:
            return reward
    return 0
# Self-operated card-issuance reward
card_issuance_reward = (
    dataset["25年发卡"]
    # Only companies that did not issue cards in 2024 qualify
    .loc[~dataset["25年发卡"]["投保公司"].isin(dataset["24年发卡"]["投保公司"])]
    .merge(
        dataset["保单机构分配"],
        how="left",
        on="保单编号",
        suffixes=("", "_duplication"),
    )  # A policy with issuance but no consumption has no allocation plan
    .fillna("0")
    .assign(
        发卡金额=lambda dataframe: dataframe["发卡金额"].apply(
            lambda cell: (Decimal(cell) / 10000).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            )
        )  # Issuance amount converted to 10k-yuan units
    )
    .assign(
        奖励金额=lambda dataframe: dataframe["发卡金额"].apply(
            lambda cell: mapped_as_card_issuance_reward(cell)
        )
    )  # Reward computed first, then split by allocation ratio below
    .assign(总部="总部")
    .assign(
        # Combined institutions
        机构名称=lambda dataframe: dataframe.apply(
            lambda x: [x["落地机构"], x["出单机构"], x["总部"]], axis="columns"
        ),
        # Combined allocation ratios
        分配比例=lambda dataframe: dataframe.apply(
            lambda x: [
                x["落地机构分配比例"],
                x["出单机构分配比例"],
                x["总部分配比例"],
            ],
            axis="columns",
        ),
    )
    .explode(["机构名称", "分配比例"])
    .assign(
        # Reward attributed to each institution by its ratio
        分配后奖励金额=lambda dataframe: dataframe.apply(
            lambda row: (
                Decimal(row["奖励金额"]) * Decimal(row["分配比例"]) / 100
            ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
            axis="columns",
        ),
    )
    .assign(
        机构=lambda dataframe: dataframe["机构名称"].apply(
            lambda cell: mapped_as_institution_name(cell)
        )
    )
    .loc[lambda dataframe: dataframe["机构"] != "总部"]  # Drop headquarters rows
    .sort_values(by="分配后奖励金额", ascending=False)
    .reset_index(drop=True)
)[["保单编号", "投保公司", "保险总公司", "保险分公司", "发卡金额", "奖励金额", "机构", "分配比例", "分配后奖励金额"]]
# Map the conversion-rate increase (percentage points) to a reward ratio
def mapped_as_increase_reward_ratio(x: Decimal) -> Decimal:
    """Return the reward ratio for conversion increase *x*; 0 below 5."""
    # Inclusive lower bound of each band, scanned from the top down
    for lower_bound, ratio in (
        (30, "0.5"),
        (20, "0.4"),
        (15, "0.3"),
        (10, "0.2"),
        (5, "0.1"),
    ):
        if x >= lower_bound:
            return Decimal(ratio)
    return Decimal("0")
# Key existing-business consumption-increase reward
increase_reward = (
    dataset["投保公司"]
    # Only companies that belong to a key project qualify
    .loc[dataset["投保公司"]["项目名称"].isin(dataset["重点项目"]["项目名称"])]
    .merge(
        dataset["24年数据"], how="left", on="投保公司", suffixes=("", "_duplication")
    )
    .merge(
        dataset["25年数据"], how="left", on="投保公司", suffixes=("", "_duplication")
    )
    .fillna("0")
    .assign(
        # 2025 conversion rate as a percentage, two decimals, half-up
        转化率25年=lambda dataframe: dataframe["25年转化率"].apply(
            lambda row: (Decimal(row) * 100).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            )
        ),
        # 2024 conversion rate as a percentage, two decimals, half-up
        转化率24年=lambda dataframe: dataframe["24年转化率"].apply(
            lambda row: (Decimal(row) * 100).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            )
        ),
    )
    .assign(
        # Year-over-year increase in conversion rate (percentage points)
        转化率提升=lambda dataframe: dataframe.apply(
            lambda row: (row["转化率25年"] - row["转化率24年"]).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            ),
            axis="columns",
        )
    )
    .assign(
        奖励比例=lambda dataframe: dataframe["转化率提升"].apply(
            lambda cell: mapped_as_increase_reward_ratio(cell)
        )
    )
    .assign(
        # Year-over-year increase in consumption amount
        消费金额提升=lambda dataframe: dataframe.apply(
            lambda row: (
                Decimal(row["25年消费金额"]) - Decimal(row["24年消费金额"])
            ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP),
            axis="columns",
        )
    )
    .assign(
        # Reward = consumption increase * ratio / 100
        奖励金额=lambda dataframe: dataframe.apply(
            lambda row: (row["消费金额提升"] * row["奖励比例"] / 100).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            ),
            axis="columns",
        )
    )
    .sort_values(by="奖励金额", ascending=False)
    .reset_index(drop=True)
)[
    [
        "项目名称",
        "投保公司",
        "25年发卡金额",
        "25年消费金额",
        "转化率25年",
        "24年发卡金额",
        "24年消费金额",
        "转化率24年",
        "转化率提升",
        "奖励比例",
        "消费金额提升",
        "奖励金额",
    ]
]
# Map the conversion rate (percent) to the new-business reward ratio
def mapped_as_new_reward_ratio(x: Decimal) -> Decimal:
    """Return the reward ratio for conversion rate *x*; 0 below 30."""
    # Inclusive lower bound of each band, scanned from the top down
    for lower_bound, ratio in (
        (90, "1.0"),
        (80, "0.8"),
        (70, "0.7"),
        (60, "0.6"),
        (50, "0.5"),
        (40, "0.4"),
        (30, "0.3"),
    ):
        if x >= lower_bound:
            return Decimal(ratio)
    return Decimal("0")
# New-business consumption reward
new_reward = (
    dataset["保单机构分配"]
    .loc[
        # Only non-self-operated policies of companies with no 2024 issuance
        (dataset["保单机构分配"]["自力更生"] == "")
        & ~dataset["保单机构分配"]["投保公司"].isin(dataset["24年发卡"]["投保公司"])
    ]
    .merge(
        dataset["25年发卡"],
        how="left",
        on="保单编号",
        suffixes=("", "_duplication"),
    )
    .merge(
        dataset["25年消费"], how="left", on="保单编号", suffixes=("", "_duplication")
    )
    .fillna("0")
    .assign(
        发卡金额=lambda dataframe: dataframe["发卡金额"].apply(
            lambda cell: (Decimal(cell) / 10000).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            )
        ),  # Issuance amount converted to 10k-yuan units
        消费金额=lambda dataframe: dataframe["消费金额"].apply(
            lambda cell: (Decimal(cell) / 10000).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            )
        ),  # Consumption amount converted to 10k-yuan units
    )
    .assign(
        # Conversion rate = consumption / issuance (percent), 0 when no issuance
        转化率=lambda dataframe: dataframe.apply(
            lambda row: (
                (row["消费金额"] / row["发卡金额"] * 100).quantize(
                    Decimal("0.00"), rounding=ROUND_HALF_UP
                )
                if row["发卡金额"] != 0
                else Decimal("0.00")
            ),
            axis="columns",
        )
    )
    .assign(
        奖励比例=lambda dataframe: dataframe["转化率"].apply(
            lambda cell: mapped_as_new_reward_ratio(cell)
        )
    )
    .assign(
        # Reward back in yuan: consumption (10k yuan) * ratio / 100 * 10000
        奖励金额=lambda dataframe: dataframe.apply(
            lambda row: (row["消费金额"] * row["奖励比例"] / 100 * 10000).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            ),
            axis="columns",
        )
    )  # Reward computed first, then split by allocation ratio below
    .assign(总部="总部")
    .assign(
        # Combined institutions
        机构名称=lambda dataframe: dataframe.apply(
            lambda x: [x["落地机构"], x["出单机构"], x["总部"]], axis="columns"
        ),
        # Combined allocation ratios
        分配比例=lambda dataframe: dataframe.apply(
            lambda x: [
                x["落地机构分配比例"],
                x["出单机构分配比例"],
                x["总部分配比例"],
            ],
            axis="columns",
        ),
    )
    .explode(["机构名称", "分配比例"])
    .assign(
        # Reward attributed to each institution by its ratio
        分配后奖励金额=lambda dataframe: dataframe.apply(
            lambda row: (row["奖励金额"] * Decimal(row["分配比例"]) / 100).quantize(
                Decimal("0.00"), rounding=ROUND_HALF_UP
            ),
            axis="columns",
        ),
    )
    .assign(
        机构=lambda dataframe: dataframe["机构名称"].apply(
            lambda cell: mapped_as_institution_name(cell)
        )
    )
    .loc[
        lambda dataframe: (dataframe["机构"] != "总部") & (dataframe["分配比例"] != "0")
    ]  # Drop headquarters rows and zero allocation ratios
    .sort_values(by="分配后奖励金额", ascending=False)
    .reset_index(drop=True)
)[
    [
        "保单编号",
        "投保公司",
        "保险总公司",
        "保险分公司",
        "发卡金额",
        "消费金额",
        "转化率",
        "奖励比例",
        "奖励金额",
        "机构",
        "分配比例",
        "分配后奖励金额",
    ]
]
# Write all five result sheets into a single workbook
save_as_workbook(
    workbook_name="机构绩效试算.xlsx",
    worksheets=[
        ("大区年终薪酬", regional_year_end_bonus),
        ("机构年终薪酬", institutional_year_end_bonus),
        ("自主经营发卡奖励", card_issuance_reward),
        ("重点存量业务消费提升奖励", increase_reward),
        ("新增业务消费奖励", new_reward),
    ],
)

425
爬虫工厂/main.py Normal file
View File

@ -0,0 +1,425 @@
# -*- coding: utf-8 -*-
'''
脚本名称
监控数据工厂
脚本说明
根据监控任务初始化搜索任务根据搜索任务初始化爬虫并启动
'''
#加载模块
from urllib.parse import quote
import time
import json
from selenium.webdriver import Chrome
#from undetected_chromedriver import Chrome
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions
from selenium.webdriver.common.by import By
from concurrent.futures import ThreadPoolExecutor, as_completed
import sys
sys.path.append('..')
from utils.request import Feishu
'''
'''
# Initialize search tasks from the monitor tasks
def Initialize_search() -> None:
    """Sync the Feishu search-task table with the monitor-task table.

    Creates one search task per (channel, keyword) combination that does not
    exist yet, and deletes search tasks whose monitor task has been removed.
    Prints progress and returns None in every case.
    """
    print('根据监控任务初始化搜索任务...', end = '')
    # List records (monitor tasks)
    url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblcVxN52hZKT3z1/records'
    monitor_tasks = Feishu().request(url)
    # Abort when the monitor tasks could not be fetched
    if monitor_tasks.code != 200:
        print('获取监控任务失败,本次启动终止!')
        print()
        return None
    monitor_tasks = monitor_tasks.data.get('items')
    # Abort when there is no monitor task at all
    if monitor_tasks is None:
        print('无监控任务,本次启动终止!')
        print()
        return None
    # List records (search tasks)
    url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records'
    search_tasks = Feishu().request(url)
    # Abort when the search tasks could not be fetched
    if search_tasks.code != 200:
        print('获取搜索任务失败,本次启动终止!')
        print()
        return None
    search_tasks = search_tasks.data.get('items')
    # Existing search tasks reduced to the identity triple used for comparison
    # with tasks to be added/deleted
    if search_tasks is None:
        comparators = []
    else:
        comparators = [
            {
                '监控任务标识': search_task.get('fields').get('监控任务标识'),
                '搜索渠道': search_task.get('fields').get('搜索渠道'),
                '关键词': search_task.get('fields').get('关键词')
            }
            for search_task in search_tasks
        ]
    # Search tasks to be created
    records = []
    # Default last-search time: 23-12-25 00:00:00 as a millisecond timestamp
    last_search_time = int(time.mktime(time.strptime('23-12-25', '%y-%m-%d')) * 1000)
    for monitor_task in monitor_tasks:
        # Monitor-task record id
        record_id = monitor_task.get('record_id')
        # Search channels of this monitor task
        search_channels = monitor_task.get('fields').get('搜索渠道')
        # BUG FIX: keywords are ","-separated (per the field's convention),
        # but the original called `.split('')`, which always raises
        # "ValueError: empty separator".
        keywords = monitor_task.get('fields').get('关键词').split(',')
        # One new search task per (channel, keyword) combination
        for search_channel in search_channels:
            for keyword in keywords:
                fields = {
                    '监控任务标识': record_id,
                    '搜索渠道': search_channel,
                    '关键词': keyword
                }
                if fields not in comparators:
                    # Defaults: 1 page, 168 h period, epoch last-search time,
                    # disabled until enabled by hand
                    fields.update({'最多搜索页数': 1, '搜索周期': 168, '最近搜索时间': last_search_time, '搜索状态': '停用'})
                    records.append(
                        {
                            'fields': fields
                        }
                    )
    # Create the missing search tasks in one batch call
    if records != []:
        url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_create'
        response = Feishu().request(url, 'post', {'records': records})
        if response.code != 200:
            print('新增搜索任务失败,本次启动终止!')
            print()
            return None
    # Delete search tasks whose monitor task no longer exists
    if search_tasks is not None:
        records = [search_task.get('record_id') for search_task in search_tasks if search_task.get('fields').get('监控任务标识') is None]
        if records != []:
            url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_delete'
            response = Feishu().request(url, 'post', {'records': records})
            if response.code != 200:
                print('删除搜索任务失败,本次启动终止!')
                print()
                return None
    print('初始化成功。')
    print()
#根据搜索任务启动爬虫
def Spider(search_task) -> list:
records = []
#搜索链接
search_url = search_task.get('search_url')
print('正在爬取 %s ...' % search_url, end = '')
try:
#初始化chrome
#chrome = Chrome(headless = False, use_subprocess = True)
chrome = Chrome()
chrome.get(search_url)
#搜索渠道
search_channel = search_task.get('search_channel')
match search_channel:
case '百度':
#定位元素
elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"c-title t t tts-title")]/a')))
case '搜狗':
elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"vr-title")]/a')))
case '360':
elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"res-title")]/a')))
#解析监控对象
monitor_object = search_task.get('monitor_object')
#解析关键词
keyword = search_task.get('keyword')
#定义搜索时间
search_time = int(time.time() * 1000)
for element in elements:
records.append(
{
'monitor_object': monitor_object,
'search_channel': search_channel,
'keyword': keyword,
'search_url': search_url,
'fields':
{
#搜索结果标题
'title': element.get_attribute('outerText'),
#搜索结果连接
'hyperlink': element.get_attribute('href')
},
'search_time': search_time
}
)
print('已完成。')
print()
except:
print('爬取失败!')
print()
finally:
try:
chrome.quit()
except:
pass
return records
#根据搜索任务初始化爬取任务
def Initialize_crawl() -> None:
print('根据搜索任务初始化爬取任务...', end = '')
#列出记录(搜索状态为「启用」的搜索任务)
url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records?filter={}'.format(quote('CurrentValue.[搜索状态]="启用"'))
search_tasks = Feishu().request(url)
#检查获取启用的搜索任务是否成功
if search_tasks.code != 200:
print('获取启用的搜索任务失败,本次启动终止!')
print()
return None
search_tasks = search_tasks.data.get('items')
#检查启用的搜索任务是否为空
if search_tasks is None:
print('无启用的搜索任务,本次启动终止!')
print()
return None
#用于记录爬取任务
crawl_tasks = []
for search_task in search_tasks:
#解析最近搜索时间
last_search_time = search_task.get('fields').get('最近搜索时间')
#解析搜索周期
search_period = int(search_task.get('fields').get('搜索周期')
#若最近搜索时间距现在的小时数超过搜索周期则生成爬取任务
if (int(time.time() * 1000) - last_search_time) / 3600000 >= search_period):
#解析搜索任务标识
record_id = search_task.get('record_id')
#解析搜索渠道
search_channel = search_task.get('fields').get('搜索渠道')
#解析关键词
keyword = search_task.get('fields').get('关键词')
#解析最多搜索页数
maximun_pages = int(search_task.get('fields').get('最多搜索页数'))
#根据最多搜索页数生成爬取任务
for page in range(maximun_pages):
#仅考虑通过链接爬取数据
match search_channel:
case '百度':
url = 'https://www.baidu.com/s?wd={}&ie=utf-8&pn={}'.format(quote(keyword), page * 10)
xpath = '//h3[contains(@class,"c-title t t tts-title")]/a'
case '搜狗':
#ie为关键词编码
url = 'https://www.sogou.com/web?query={}&page={}&ie=utf8'.format(quote(keyword), page + 1)
xpath = '//h3[contains(@class,"vr-title")]/a'
case '360':
url = 'https://www.so.com/s?q={}&pn={}'.format(quote(keyword), page + 1)
xpath = '//h3[contains(@class,"res-title")]/a'
crawl_tasks.append(
{
'monitor_task':
{
'record_id':
},
'search_task':
{
'search_channel': search_channel,
'keyword': keyword,
'search_url': search_url,
}
}
)
print('初始化成功。')
print()
print('启动爬虫... \\')
print()
#创建线程池
with ThreadPoolExecutor(max_workers = 2) as executor:
threads = []
for search_task in search_tasks:
thread = executor.submit(Spider, search_task)
#将搜索任务提交至进程池
threads.append(thread)
results = []
for thread in as_completed(threads):
result = thread.result()
#若爬虫返回非空则添加至结果
if result != []:
results.extend(result)
print(results)
print(Initialize_crawl())

556
评分卡/main.py Normal file
View File

@ -0,0 +1,556 @@
# -*- coding: utf-8 -*-
print('''脚本说明:
基于GiveMeSomeCredit数据集构建贷款申请评分卡并生成建模报告
''')
#导入包
import pandas
import numpy
from sklearn.tree import DecisionTreeClassifier
from statsmodels.stats.outliers_influence import variance_inflation_factor
from sklearn.model_selection import GridSearchCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_curve
from jinja2 import Environment, FileSystemLoader
import time
import warnings
#忽略警告
warnings.simplefilter('ignore')
import sys
sys.path.append('..')
from utils.mysql import MySQL
from utils.pandas2chart import Pandas2chart
#本脚本中调用函数
'''
函数说明基于决策树最优分箱并证据权重编码
参数说明
数据集格式为Pandas.DataFrame第一列为目标变量其它为特征变量
返回说明
数据格式Pandas.DataFrame
'''
def OptimalEncodeByWOE(dataset):
    """Optimal-bin each feature with a decision tree and encode it by WOE.

    *dataset* is a DataFrame whose first column is the binary target and
    whose remaining columns are features.  Returns ``(dataset_woe,
    dictionary)``: the WOE-encoded copy of the data and a lookup table of
    per-bin statistics for every successfully (monotonically) binned feature.
    """
    dataset_woe = dataset.copy()
    # Target variable name (first column by convention)
    dependent = dataset_woe.columns[0]
    # Feature variable names
    independents = dataset_woe.columns[1: ]
    # Lookup table recording each feature's per-bin weight of evidence
    dictionary = pandas.DataFrame()
    # Process each feature independently
    for independent in independents:
        print('正在就特征变量 %s 基于决策树最优分箱并证据权重编码...' % independent, end = '')
        # Target restricted to rows where this feature is present
        y = dataset_woe.loc[dataset_woe[independent].notna(), dependent]
        # Feature values without missing entries
        x = dataset_woe.loc[dataset_woe[independent].notna(), independent]
        # Try bin counts from min(distinct values, 5) down to 2 and keep the
        # first binning that passes the monotonicity test
        for bins in range(min(len(x.value_counts()), 5), 1, -1):
            # Single-feature decision tree; each leaf holds at least 5% of the samples
            decision_tree = DecisionTreeClassifier(max_leaf_nodes = bins, min_samples_leaf = 0.05, class_weight = 'balanced').fit(x.to_numpy().reshape(-1, 1), y.to_numpy())
            # Split thresholds harvested from the fitted tree
            tangencies = []
            for i in range(decision_tree.tree_.node_count) :
                # Internal nodes (left child != right child) carry a threshold
                if decision_tree.tree_.children_left[i] != decision_tree.tree_.children_right[i] :
                    tangencies.append(decision_tree.tree_.threshold[i])
            tangencies.sort()
            # Pad with outer edges so every observed value falls in some bin
            tangencies = [x.min() - 0.01] + tangencies + [x.max() + 0.01]
            # Bin the feature (left-closed intervals)
            dataset_woe.loc[dataset_woe[independent].notna(), independent] = pandas.cut(x = x.to_numpy(), bins = tangencies, right = False)
            # Positive/negative counts per bin
            woe = dataset_woe.loc[dataset_woe[independent].notna(), [dependent, independent]].groupby(by = independent)[dependent].agg(func = [('positives', lambda x : (x == 1).sum()), ('negatives', lambda x : (x == 0).sum())])
            woe.reset_index(inplace = True)
            woe.rename(columns = {independent: 'bin'}, inplace = True)
            # Missing feature values form their own bin
            if len(dataset_woe.loc[dataset_woe[independent].isna()]) > 0:
                # Positive/negative counts among the rows with a missing value
                woe.loc[len(woe)] = {'bin': numpy.nan, 'positives': len(dataset_woe.loc[dataset_woe[independent].isna() & (dataset_woe[dependent] == 1), independent]), 'negatives': len(dataset_woe.loc[dataset_woe[independent].isna() & (dataset_woe[dependent] == 0), independent])}
            # Sample count per bin
            woe['samples'] = woe.apply(lambda x: x['positives'] + x['negatives'], axis = 'columns')
            # Share of all positives captured by the bin
            woe['proportion_positive'] = round(woe['positives'] / woe['positives'].sum(), 2)
            # Share of all negatives captured by the bin
            woe['proportion_negative'] = round(woe['negatives'] / woe['negatives'].sum(), 2)
            # Weight of evidence (0.01 smoothing avoids log(0) / zero division)
            woe['woe'] = round(numpy.log((woe['proportion_positive'] + 0.01) / (woe['proportion_negative'] + 0.01)), 2)
            # Information-value contribution per bin
            woe['iv'] = round((woe['proportion_positive'] - woe['proportion_negative']) * woe['woe'], 2)
            # Non-missing bins, used for the monotonicity test
            woe_notna = woe.loc[woe['bin'].notna()].reset_index(drop = True)
            # Each interior bin must continue a non-increasing or non-decreasing
            # WOE trend relative to both neighbours
            monotonicity = [((woe_notna.loc[i, 'woe'] <= woe_notna.loc[i + 1, 'woe']) & (woe_notna.loc[i - 1, 'woe'] <= woe_notna.loc[i, 'woe'])) | ((woe_notna.loc[i, 'woe'] >= woe_notna.loc[i + 1, 'woe']) & (woe_notna.loc[i - 1, 'woe'] >= woe_notna.loc[i, 'woe'])) for i in range(1, woe_notna.shape[0] - 1)]
            # On a monotonic binning, encode the feature as WOE and record the bins
            if False not in monotonicity:
                dataset_woe[independent].replace(woe['bin'].to_numpy(), woe['woe'].to_numpy(), inplace = True)
                woe['independent'] = independent
                dictionary = pandas.concat([dictionary, woe])
                print('已完成')
                print()
                break
    return dataset_woe, dictionary
#若本脚本被调用报错
if __name__ != '__main__':
print('本脚本不允许被调用')
print()
exit()
print('1、连接数据库查表并保存至数据集...', end = '')
dataset = MySQL(database = 'data_analysis').query('select * from credit_dataset')
if isinstance(dataset, str):
print('连接失败,请检查数据库连接是否正常')
print()
exit()
print('已完成')
print()
#目标变量名,第一列即为目标变量
dependent = dataset.columns[0]
#检查目标变量值是否为0或1
if not ((dataset[dependent] == 0) | (dataset[dependent] == 1)).all():
print('第一列应为目标变量且值应为0或1脚本终止')
print()
exit()
#统计样本数
samples = dataset.shape[0]
#特征变量名
independents = dataset.columns[1: ]
#统计特征变量数
variables_independent = len(independents)
print('数据集样本数为 %d 份,特征变量数为 %d 个。' % (samples, variables_independent))
print()
#考虑变量数较多转置并重命名
Pandas2chart(dataset = dataset.loc[1:4, :].T.reset_index().rename(columns = {'index': '变量名', 1: '样本1', 2: '样本2', 3: '样本3', 4: '样本4'}), type = 'table', path = './reports/scorecard_report/dataset_preview.html')
print('2、预处理')
print()
print('2.1 清洗数据...', end = '')
#删除目标变量包含缺失值的样本
dataset.dropna(subset = dependent, inplace = True)
#删除重复样本(仅保留第一份)
dataset.drop_duplicates(inplace = True)
print('已完成')
print()
#统计样本数
samples = dataset.shape[0]
print('处理后,数据集样本数为 %d 份。' % samples)
print()
print('2.2 处理缺失值...', end = '')
print('在特征变量证据权重编码时,将对缺失值单独作为一箱,本节点略过')
print()
print('2.3 处理异常值...', end = '')
print('在特征变量证据权重编码时,可以消除异常值的影响,本节点略过')
print()
print('2.4 特征变量最优分箱并证据权重编码')
print()
dataset_woe, dictionary = OptimalEncodeByWOE(dataset)
Pandas2chart(dataset = dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'samples', 'woe']].rename(columns = {'bin': '分箱', 'samples': '样本数', 'woe': '证据权重'}), type = 'bar+line', path = './reports/scorecard_report/dictionary.html')
print('3、选择特征变量')
print('')
#统计报告
statistics = pandas.DataFrame(data = independents, columns = ['independent'])
print('3.1 基于信息价值选择特征变量...', end = '')
#变量特征变量
for independent in independents:
#统计特征变量信息价值
statistics.loc[statistics['independent'] == independent, 'iv'] = dictionary.loc[dictionary['independent'] == independent, 'iv'].sum()
#选择信息价值大于等于阈值的特征变量0.1为低水平预测能力0.3为中水平预测能力本次建模选择0.1作为阈值)
statistics = statistics.loc[statistics['iv'] >= 0.1]
independents = statistics['independent'].tolist()
print('已完成')
print()
#统计特征变量数
variables_independent = len(independents)
print('处理后,特征变量数为 %d 个。' % variables_independent)
print()
print('3.2 基于后向淘汰条件选择特征变量(基于归回系数和方差扩大因子)')
print()
parameters = {
#测试
'l1_ratio': [0.5],
#'l1_ratio': [0, 0.25, 0.5, 0.75, 1],
#测试
'C': [1.1]
#'C': [0.001, 0.01, 0.1, 1.1, 10.1, 100.1, 1000.1]
}
#创建带交叉验证的参数网格搜索模型
model = GridSearchCV(estimator = LogisticRegression(solver = 'saga', penalty = 'elasticnet', class_weight = 'balanced'), param_grid = parameters, scoring = 'roc_auc', refit = True)
while True:
model.fit(dataset_woe[independents].to_numpy(), dataset_woe[dependent].to_numpy())
#统计回归系数
statistics['coefficient'] = model.best_estimator_.coef_[0, :]
#统计方差扩大因子
statistics['vif'] = [variance_inflation_factor(dataset_woe[independents].assign(constant = 1).to_numpy(), i) for i in range(len(independents) + 1)][: -1]
#按照方差扩大因子降序排序
statistics.sort_values(by = 'vif', ascending = False, inplace = True)
independents = statistics['independent'].tolist()
#统计回归系数大于等于0.1且方差扩大因子小于等于10的特征变量
statistics = statistics.loc[(statistics['coefficient'] >= 0.1) & (statistics['vif'] <= 10)]
#淘汰特征变量
obsolescence = [independent for independent in independents if independent not in statistics['independent'].tolist()]
if obsolescence != []:
#淘汰最大方差扩大因子的特征变量
independents.remove(obsolescence[0])
print('特征变量 %s 满足淘汰条件,继续后进' % obsolescence[0])
print('')
else:
print('所有特征变量不满足淘汰条件,停止后进')
print('')
break
#统计特征变量数
variables_independent = len(independents)
print('处理后,特征变量数为 %d 个。' % variables_independent)
print()
#Evaluate the fitted model on the training scores: FPR/TPR across thresholds
probabilities = model.predict_proba(dataset_woe[independents].to_numpy())[:, 1]
fpr, tpr, thresholds = roc_curve(y_true = dataset_woe[dependent].to_numpy(), y_score = probabilities)
#Kolmogorov-Smirnov statistic: largest gap between the TPR and FPR curves
ks = (tpr - fpr).max()
print('基于选择后的特征变量构建逻辑回归模型洛伦兹统计量KS%.2f 。(~0.2不建议使用0.2~0.4模型区分能力较好0.4~0.5良好0.5~0.6很好0.6~0.75非常好0.75~ 区别能力存疑)' % ks)
print()
#Render the per-feature statistics table into the report directory
column_labels = {'independent': '特征变量名', 'iv': '信息价值', 'vif': '方差扩大因子', 'coefficient': '回归系数'}
Pandas2chart(dataset = statistics.loc[:, ['independent', 'iv', 'vif', 'coefficient']].rename(columns = column_labels), type = 'table', path = './reports/scorecard_report/statistics.html')
print('4、编制评分卡')
print()
print('4.1 基于构建后的逻辑回归模型编制评分卡...', end = '')
#Keep only the selected features' binning rows
dictionary = dictionary.loc[dictionary['independent'].isin(independents), ['independent', 'bin', 'woe']].reset_index(drop = True)
#Score formula: S = A + B*ln(Odds); odds of 1 scores 500, doubling the odds subtracts 50 points
#Offset A
alpha = 500
#Factor B
beta = -50 / numpy.log(2)
#Base score: convert the model intercept to points, then apportion it across
#features in proportion to their regression coefficients
gamma = (alpha + beta * model.best_estimator_.intercept_[0]) / statistics['coefficient'].sum()
#Assign points per feature
for independent in independents:
    coefficient = statistics.loc[statistics['independent'] == independent, 'coefficient'].iat[0]
    #Hoist the repeated row mask for this feature
    mask = dictionary['independent'] == independent
    #Coefficient-weighted base score of this feature
    dictionary.loc[mask, 'gamma'] = gamma * coefficient
    #Coefficient-weighted factor of this feature
    dictionary.loc[mask, 'beta'] = beta * coefficient
    #Bin score = weighted base score + weighted factor * WOE, rounded to an integer
    dictionary.loc[mask, 'score'] = dictionary.loc[mask, ['woe', 'gamma', 'beta']].apply(lambda x: round(x['gamma'] + x['beta'] * x['woe']), axis = 'columns')
    #Map the feature's WOE values to bin scores. Assign back explicitly:
    #`Series.replace(..., inplace=True)` on a column selection is chained
    #assignment, deprecated under pandas copy-on-write and may not write through.
    dataset_woe[independent] = dataset_woe[independent].replace(dictionary.loc[mask, 'woe'].to_numpy(), dictionary.loc[mask, 'score'].to_numpy())
#Total score = row-wise sum of per-feature scores (vectorized instead of apply)
dataset_woe['score'] = dataset_woe[independents].sum(axis = 'columns')
print('已完成')
print()
Pandas2chart(dataset = dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'woe', 'gamma', 'beta', 'score']].rename(columns = {'bin': '分箱', 'woe': '证据权重', 'gamma': '加权基础分数', 'beta': '加权回归系数', 'score': '分数'}), type = 'table', path = './reports/scorecard_report/dictionary_score.html')
#Bin the total score into fixed, left-closed intervals
dataset_woe['bin'] = pandas.cut(x = dataset_woe['score'].to_numpy(), bins = [0, 350, 400, 450, 500, 550, 600, 650, 1000], right = False)
#Group samples by score bin
score = dataset_woe.groupby(by = 'bin').agg(
    #Positive samples per bin
    positives = (dependent, lambda x: (x == 1).sum()),
    #Negative samples per bin
    negatives = (dependent, lambda x: (x == 0).sum()),
    #Samples per bin
    samples = (dependent, lambda x: x.count())
)
#Promote the bin index back to a column
score.reset_index(inplace = True)
#Rejection rule: reject applications scoring below the bin's right edge
score['threshold'] = score['bin'].apply(lambda x: '<{}'.format(x.right))
#Positive rate within each bin (%)
score['proportion'] = round(score['positives'] / score['samples'] * 100, 2)
#Each bin's share of all samples (%)
score['proportion_sample'] = score['samples'] / score['samples'].sum() * 100
#Cumulative share of all samples (%)
score['accumulation_sample'] = round(score['proportion_sample'].cumsum(), 2)
#Cumulative positive count
score['accumulation_positives'] = score['positives'].cumsum()
#Cumulative sample count
score['accumulation_samples'] = score['samples'].cumsum()
#Cumulative positive rate: cumulative positives over cumulative samples (%)
score['proportion_positives'] = round(score['accumulation_positives'] / score['accumulation_samples'] * 100, 2)
#Each bin's share of all positives (%)
score['proportion_positive'] = score['positives'] / score['positives'].sum() * 100
#Cumulative share of all positives (%)
score['accumulation_positive'] = round(score['proportion_positive'].cumsum(), 2)
#Each bin's share of all negatives (%)
score['proportion_negative'] = score['negatives'] / score['negatives'].sum() * 100
#Cumulative share of all negatives (%)
score['accumulation_negative'] = round(score['proportion_negative'].cumsum(), 2)
#Per-bin KS: |cumulative positive share - cumulative negative share|
score['ks'] = round(abs(score['accumulation_positive'] - score['accumulation_negative']), 2)
#Scorecard KS statistic
ks = score['ks'].max()
#Per-bin lift (0.01 added to numerator and denominator to avoid division by zero)
score['lift'] = round((score['accumulation_positive'] + 0.01) / (score['accumulation_sample'] + 0.01), 2)
#Scorecard lift statistic
lift = score['lift'].max()
print('基于构建后的逻辑回归模型编制评分卡柯斯统计量KS%.2f 提升统计量LIFT%.2f' % (ks, lift))
print()
#Render the model-evaluation chart (KS and lift per bin) and the business-evaluation table
Pandas2chart(dataset = score[['bin', 'ks', 'lift']].rename(columns = {'bin': '分箱', 'ks': '柯斯统计量', 'lift': '提升统计量'}), type = 'line+line', path = './reports/scorecard_report/model_evaluation.html')
Pandas2chart(dataset = score[['bin', 'threshold', 'proportion', 'accumulation_sample', 'proportion_positives', 'accumulation_positive']].rename(columns = {'bin': '分箱', 'threshold': '拒绝规则', 'proportion': '分箱逾期率', 'accumulation_sample': '拒绝率', 'proportion_positives': '拒绝逾期率', 'accumulation_positive': '累计逾期率'}), type = 'table', path = './reports/scorecard_report/business_evaluation.html')
print('4.2 生成评分卡规则文件并保存...', end = '')
calculate = '''def Calculate(sample):\n\n\tscore = 0\n\n'''
#Emit one `match` statement per selected feature
for independent in independents:
    calculate = calculate + '\tmatch sample["{}"]:\n\n'.format(independent)
    subset = dictionary.loc[dictionary['independent'] == independent].reset_index(drop = True)
    #When the last bin is the missing-value bin, the second-to-last bin gets an
    #open right boundary and the last bin scores missing values
    if subset.loc[subset.index[-1], 'bin'] is numpy.nan:
        for index in subset.index:
            #First bin: open left boundary
            if index == subset.index[0]:
                calculate += '\t\tcase x if x < {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'score'])
            #Second-to-last bin: open right boundary
            elif index == subset.index[-2]:
                calculate += '\t\tcase x if x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].left, subset.loc[index, 'score'])
            #Last bin: missing value.
            #BUG FIX: the previous `case numpy.nan:` is a value pattern compared
            #with ==, and NaN != NaN, so it could never match; NaN inputs fall
            #through every comparison guard above, so test with numpy.isnan here.
            elif index == subset.index[-1]:
                calculate += '\t\tcase x if numpy.isnan(x): score += {}\n\n'.format(subset.loc[index, 'score'])
            #Interior bin: left-closed, right-open interval
            else:
                calculate += '\t\tcase x if x < {} and x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'bin'].left, subset.loc[index, 'score'])
    else:
        for index in subset.index:
            #First bin: open left boundary
            if index == subset.index[0]:
                calculate += '\t\tcase x if x < {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'score'])
            #Last bin: open right boundary
            elif index == subset.index[-1]:
                calculate += '\t\tcase x if x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].left, subset.loc[index, 'score'])
            #Interior bin: left-closed, right-open interval
            else:
                calculate += '\t\tcase x if x < {} and x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'bin'].left, subset.loc[index, 'score'])
calculate += '\treturn score'
#Persist the generated scoring rules (path spelling 'scorecrad' kept as-is —
#downstream readers may depend on it; NOTE(review): confirm and fix both sides together)
with open('../utils/scorecrad_calculate.txt', 'w', encoding = 'utf-8') as file:
    file.write(calculate)
print('已完成')
print()
print('5、生成贷款申请评分卡报告...', end = '')
#Load the HTML report template from the report directory
environment = Environment(loader = FileSystemLoader('./reports/scorecard_report/'))
template = environment.get_template('template.html')
#Values substituted into the template
context = {
    #Report date, formatted YY-MM-DD
    'report_date': time.strftime('%y-%m-%d', time.localtime()),
    'samples': samples,
    'variables_independent': variables_independent,
    'ks': ks,
    'lift': lift
}
scorecard_report = template.render(context)
#Write the rendered report next to the template
with open('./reports/scorecard_report/scorecard_report.html', 'w', encoding = 'utf8') as file:
    file.write(scorecard_report)
print('已完成')
print()