diff --git a/普康健康发票查验/main.py b/普康健康发票查验/main.py new file mode 100644 index 0000000..c8fdc6f --- /dev/null +++ b/普康健康发票查验/main.py @@ -0,0 +1,787 @@ +# -*- coding: utf-8 -*- + +""" +普康健康_发票查验 +""" + +# 加载模块 + +import hashlib +import json +import shutil +import uuid +from base64 import b64decode, b64encode +from concurrent.futures import ThreadPoolExecutor, as_completed +from pathlib import Path + +import cv2 +import numpy +import pandas + +from utils.client import Authenticator, HTTPClient, RequestException, restrict +from utils.pandas_extension import open_csv, save_as_workbook, traverse_directory + + +# 影像件压缩 +def image_compression( + image_path: str | None = None, + image_format: str | None = None, + image_data: bytes | None = None, # 数据类型为包含图像文件的二进制数据的字节串 + image_size_specified: int = 2, # 指定影像件大小 + raw: bool = False, # 250804新增返回是否为完整URI数据格式 +) -> str | None: + + try: + + # 若影像件路径数据类型为STR则创建路径对象 + if isinstance(image_path, str): + image_path = Path(image_path) + # 影像件文件名称后缀 + image_format = image_path.suffix.strip().lstrip(".").lower() + + # 读取影像件数据 + with open(image_path, "rb") as image: + image_data = image.read() + + # 影像件数据BASE64编码 + image_data_base64 = b64encode(image_data).decode("utf-8") + + # 指定影像件大小的单位由MB转为KB + image_size_specified = image_size_specified * 1024 * 1024 + + # 若影像件大小小于指定影像件大小则返回BASE64编码后影像件数据 + if len(image_data_base64) < image_size_specified: + if raw: + # 返回非完整URI数据格式 + return image_data_base64 + else: + # 返回完整URI数据格式 + return f"data:image/{image_format};base64,{image_data_base64}" + + # OPENCV解码(数据类型为NUMPY-UINT8) + image_data_cv2 = cv2.imdecode( + numpy.frombuffer(image_data, numpy.uint8), cv2.IMREAD_COLOR + ) + + # 若OPENCV解码失败则抛出异常 + if image_data_cv2 is None: + raise RuntimeError(f"OPENCV解码发生异常") + + # 初始化近似BASE64编码后影像件数据 + proximate_image_data_base64 = None + + # 初始化最小压缩前后影像件大小差值 + min_image_size_difference = float("inf") + + # 基于双层压缩方法:先外层降低图像质量,再内层缩小图像尺寸 + for quality in range(90, 0, -10): + + image_data_cv2_ = 
image_data_cv2.copy() + + # 根据影像件格式匹配图片质量配置 + # noinspection PyUnreachableCode + match image_format: + case "png": + encoding_params = [cv2.IMWRITE_PNG_COMPRESSION, 10 - quality // 10] + case _: + encoding_params = [cv2.IMWRITE_JPEG_QUALITY, quality] + + for i in range(25): + # 降低图像质量 + # noinspection PyTypeChecker + success, image_data_encoded = cv2.imencode( + image_format, image_data_cv2_, encoding_params + ) # 图像编码 + # 若图像编码失败则退出 + if not success: + break + image_data_base64 = b64encode(image_data_encoded.tobytes()).decode( + "utf-8" + ) + + # 压缩前后影像件大小差值 + image_size_difference = len(image_data_base64) - image_size_specified + + if image_size_difference <= 0: + if raw: + return image_data_base64 + else: + return f"data:image/{image_format};base64,{image_data_base64}" + + if image_size_difference < min_image_size_difference: + min_image_size_difference = image_size_difference + proximate_image_data_base64 = image_data_base64 + + # 影像件高度和宽度 + image_height, image_weight = image_data_cv2_.shape[:2] + + # 若仍超过影像件指定大小则调整图像尺寸 + image_data_cv2_ = cv2.resize( + image_data_cv2_, + dsize=(int(image_weight * 0.9), int(image_height * 0.9)), + interpolation=cv2.INTER_AREA, + ) + + if proximate_image_data_base64: + if raw: + return proximate_image_data_base64 + else: + return f"data:image/{image_format};base64,{image_data_base64}" + else: + raise RuntimeError("影像件压缩失败") + + except: + return None + + +# 票据查验接口(需要) +@restrict(refill_rate=5, max_tokens=5) # 限速至5QPS +def invoices_verification( + image_index, + image_path=None, + invoice_number=None, + invoice_code=None, + invoice_check_code=None, + invoice_date=None, + invoice_amount=None, + id_number=None, + process_mode=None, + supplier=None, +): + + try: + + # 若影像件地址非空则imgBASE64请求,否则根据发票五要素请求 + if image_path: + + match process_mode: + case "通过影像件本地地址": + + # 创建路径对象 + image_path = Path(image_path) + + # 影像件文件名称后缀 + image_format = image_path.suffix.strip().lstrip(".").lower() + + # 读取影像件数据 + with open(image_path, "rb") as 
image: + image_data = image.read() + + case "通过影像件对象服务器地址": + image_format, image_data = http_client.download(url=image_path) + + # 断定影像件格式为JGP、JPEG或者PNG + # noinspection PyUnboundLocalVariable + assert image_format in [ + "jpg", + "jpeg", + "png", + ], f"影像件格式({image_format})不支持" + + match supplier: + case "szkt": + + image_data_base64 = image_compression( + image_format=image_format, image_data=image_data + ) + + # noinspection PyUnusedLocal + response = http_client.post( + # 深圳快瞳增值税发票、医疗发票查验兼容版 + url="https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll", + # 用于和深圳快瞳联查时定位请求 + headers={"X-RequestId-Header": image_index}, + data={ + "token": authenticator.get_token(servicer="szkt"), + "imgBase64": image_data_base64, + }, + ) + case "bjfd": + + image_data_base64 = image_compression( + image_format=image_format, image_data=image_data, raw=True + ) # 北京分单不支持完整URI数据格式 + + # 业务入参,序列化并BASE64编码 + data = b64encode( + json.dumps( + { + "fileByte": image_data_base64, + "fileType": ( + "png" if image_format == "png" else "jpg" + ), # 北京分单影像件格式支持JPG、PNG或PDF(本脚本暂不支持PDF) + } + ).encode("utf-8") + ).decode("utf-8") + + # 应用账号 + appid = "mbYr11Rc_42" + + # 随机标识 + noise = image_index + + # 版本号 + version = "1.0" + + # 装配签名 + sign = ( + hashlib.md5( + f"appid={appid}&data={data}&noise={noise}&key=80357535c95333c3b133dfe5533f6334fe5e9321&version={version}".encode( + "utf-8" + ) + ) + .hexdigest() + .upper() + ) + + # noinspection PyUnusedLocal + response = http_client.post( + # 北京分单增值税发票、医疗票据二维码查验接口 + url="https://api.fendanyun.com/rsx/api/checkByQRCode", + headers={"Content-Type": "application/json; charset=utf-8"}, + json={ + "appid": appid, + "data": data, + "noise": noise, + "version": version, + "sign": sign, + }, + ) + + else: + + response = http_client.post( + # 深圳快瞳增值税发票、医疗发票查验兼容版 + url="https://ai.inspirvision.cn/s/api/ocr/invoiceCheckAll", + data={ + "token": authenticator.get_token(servicer="szkt"), + "invoiceNumber": invoice_number, + "invoiceCode": invoice_code, + 
"checkCode": invoice_check_code, + "invoicingDate": invoice_date, + "pretaxAmount": invoice_amount, + "idCardNo": id_number, + }, + ) + + except RequestException as request_exception: + response = { + "status": request_exception.status, + "code": request_exception.code, + "message": request_exception.message, + } + + except Exception as exception: + response = { + "code": "40000", + "message": f"发生其它异常{exception}", + } + + return image_index, response + + +if __name__ == "__main__": + + print("已启动批量票据查验") + + match input("请选择票据查验供应商(1:深圳快瞳,2:北京分单,其它任意字符:退出脚本):"): + case "1": + supplier = "szkt" + case "2": + supplier = "bjfd" + case _: + print("选择退出脚本!") + exit(0) + + match input( + "请选择处理流程(1:批量解析已归档响应报文,2:根据影像件地址或票据五要素批量查验,其它任意字符:退出脚本):" + ): + case "1": + # 打开前置影像件索引CSV文件 + dataframe = open_csv(file_name="dataframe_indexed.csv") + case "2": + print("正在归档响应报文...", end="") + + # 创建响应报文目录路径对象 + responses_path = Path("temporary/responses") + + # 若响应报文目录路径不存在则创建 + if not responses_path.exists(): + responses_path.mkdir(parents=True, exist_ok=True) + + # 创建归档响应报文目录路径对象 + archives_path = Path("temporary/archives") + + # 若归档响应报文目录路径不存在则创建 + if not archives_path.exists(): + archives_path.mkdir(parents=True, exist_ok=True) + + # 遍历响应报文目录下所有文件名后缀为JSON的文件路径 + for file_path in Path(responses_path).glob("*.json"): + # 若文件路径为文件 + if file_path.is_file(): + # 移动响应报文由响应报文目录至归档响应报文目录 + shutil.move(str(file_path), str(archives_path / file_path.name)) + + print("已完成") + + match input( + "请选择批量查验方法(1:通过影像件本地地址,2:通过影像件对象服务器地址,3:通过增值税发票和医疗票据的五要素,其它任意字符:退出脚本):" + ): + case "1": + print("正在读取影像件本地地址...", end="") + + dataframe = traverse_directory( + directory_path="待查验发票", suffixes=[".jpg", ".jpeg", ".png"] + ) + + # 修改列名相对路径为影像件地址 + dataframe.rename(columns={"相对路径": "影像件地址"}, inplace=True) + + process_mode = "通过影像件本地地址" + + case "2": + print("正在读取影像件对象服务器地址...", end="") + + dataframe = open_csv(file_name="dataframe.csv") + + # 断定列名包括赔案编号、发票编号和影像件地址 + assert all( + [ + column_name in 
dataframe.columns + for column_name in ["赔案编号", "发票编号", "影像件地址"] + ] + ), "CSV文件中列名必须包括赔案编号、发票编号和影像件地址" + + # 根据赔案编号和发票编号去重 + dataframe.drop_duplicates( + subset=["赔案编号", "发票编号"], keep="first", inplace=True + ) + + # 处理方式 + process_mode = "通过影像件对象服务器地址" + + case "3": + print("正在读取增值税发票和医疗票据的五要素...", end="") + + dataframe = open_csv(file_name="dataframe.csv") + + # 断定列名包括身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额 + assert all( + [ + column_name in dataframe.columns + for column_name in [ + "身份证号码后六位", + "发票编号", + "发票代码", + "校验号码后六位", + "开票日期", + "发票金额", + ] + ] + ), "CSV文件中列名必须包括身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额" + + # 根据身份证号码后六位、发票编号、发票代码、校验号码后六位、开票日期和发票金额去重 + dataframe.drop_duplicates( + subset=[ + "身份证号码后六位", + "发票编号", + "发票代码", + "校验号码后六位", + "开票日期", + "发票金额", + ], + keep="first", + inplace=True, + ) + + # 格式化开票日期 + dataframe["开票日期"] = dataframe["开票日期"].str.replace( + "-", "", regex=False + ) + + # 处理方式 + process_mode = "通过增值税发票和医疗票据的五要素" + + case _: + print("选择退出脚本!") + exit(0) + + # 统计待查验发票张数 + rows = dataframe.shape[0] + + # 若待查验发票张数为0则退出脚本 + if rows == 0: + print("待查验发票张数为0,退出脚本") + exit(0) + + print(f"已完成,待查验发票张数为 {rows}") + + # 添加索引 + dataframe["索引"] = dataframe.apply( + lambda x: uuid.uuid4().hex, axis="columns" + ) + + dataframe.to_csv("dataframe_indexed.csv", index=False) + + # 创建深圳快瞳获取访问令牌方法 + authenticator = Authenticator() + + # 初始化请求客户端 + http_client = HTTPClient() + + # 用于记录已完成任务数 + completed_futures = 0 + + # 创建线程池 + with ThreadPoolExecutor(max_workers=5) as executor: + + # noinspection PyUnreachableCode + # noinspection PyUnboundLocalVariable + match process_mode: + + case "通过影像件本地地址" | "通过影像件对象服务器地址": + + futures = [ + executor.submit( + invoices_verification, + image_index=row.索引, + image_path=row.影像件地址, + process_mode=process_mode, + supplier=supplier, + ) + for row in dataframe[["索引", "影像件地址"]].itertuples( + index=False, name="row" + ) + ] + + case "通过增值税发票和医疗票据的五要素": + + # 提交任务 + futures = [ + executor.submit( + invoices_verification, + 
image_index=row.索引, + invoice_number=row.发票编号, + invoice_code=row.发票代码, + invoice_check_code=row.校验号码后六位, + invoice_date=row.开票日期, + invoice_amount=row.发票金额, + id_number=row.身份证号码后六位, + process_mode=process_mode, + supplier=supplier, + ) + for row in dataframe[ + [ + "索引", + "发票编号", + "发票代码", + "校验号码后六位", + "开票日期", + "发票金额", + "身份证号码后六位", + ] + ].itertuples(index=False, name="row") + ] + + for future in as_completed(futures): + index, response = future.result() + + # 保存报文 + with open( + "temporary/responses/{}.json".format(index), + "w", + encoding="utf-8", + ) as file: + json.dump(response, file, ensure_ascii=False) + + completed_futures += 1 + + print(f"已完成 {completed_futures / rows * 100:.2f} %") + + case _: + print("选择退出脚本!") + exit(0) + + print("正在解析报文...", end="") + + # 解析后数据体 + dataframe_parsed = [] + + # 遍历报文所在目录 + for path_object in list(Path("temporary/responses").glob("*.json")): + + # 解析报文结构 + parse = { + "索引": "", + "机打发票号码": "", + "发票金额": "", + "购买方": "", + "销售方": "", + "发票状态": "", + "最大销售项目名称": "", + "最大销售项目数量": "", + "XML版式文件": "", + } + + # 若路径对象包含下划线则在解析报文结构添加赔案编号和发票编号 + if "_" in path_object.stem: + + parse["赔案编号"] = path_object.stem.split("_")[0] + + parse["发票编号"] = path_object.stem.split("_")[1] + + # 打开报文并JSON逆序列化 + with open(path_object, "r", encoding="utf-8") as file: + response = json.load(file) + + # 索引 + parse["索引"] = path_object.stem + + match supplier: + case "szkt": + try: + + # 响应状态码 + status_code = response.get("status", "") + + # 错误码 + code = response.get("code", "") + + # 流水号 + serial = response.get("serialNo", "") + + # 若响应状态码为200且错误码为10000,则定义为响应成功 + if status_code == 200 and code == 10000: + + # 查验类型,若查验类型为003081则为医疗票据查验,003082则为增值税发票查验,两者报文结构不一致 + match response.get("data").get( + "productCode" + ): # 若响应成功则必定存在键DATA和PRODUCTCODE + # 解析医疗票据查验结果 + case "003081": + + parse["机打发票号码"] = response.get("data").get( + "billNumber" + ) + + parse["校验码"] = response.get("data").get("checkCode") + + parse["发票金额"] = 
response.get("data").get("amount") + + parse["购买方"] = response.get("data").get("payer") + + parse["销售方"] = response.get("data").get("payeeName") + + # 发票状态 + match response.get("data").get("flushedRed"): + case "true": + parse["发票状态"] = "正常" + case "false": + parse["发票状态"] = "已红冲" + + # 最大销售项目数量 + max_item_quantity = 0 + + # 遍历销售项目列表 + for item in response.get("data").get("feeitems", []): + # 销售项目数量 + item_quantity = item.get("number") + + # 若销售项目数量非空,进一步判断是否包含斜杠,若包含斜杠则分割并取第一部分,最后转为浮点 + if item_quantity: + if "/" in item_quantity: + item_quantity = item_quantity.split("/")[0] + + item_quantity = float(item_quantity) + else: + item_quantity = 1 + + if item_quantity > max_item_quantity: + parse["最大销售项目名称"] = item.get( + "itemName", "" + ) + + parse["最大销售项目数量"] = str(item_quantity) + + parse["XML版式文件"] = response.get("PDFInfo", {}).get( + "fileUrl" + ) + + # 解析增值税发票查验结果 + case "003082": + + parse["机打发票号码"] = ( + response.get("data").get("details").get("number") + ) + + parse["校验码"] = ( + response.get("data") + .get("details") + .get("check_code") + ) + + parse["发票金额"] = ( + response.get("data").get("details").get("total") + ) + + parse["购买方"] = ( + response.get("data").get("details").get("buyer") + ) + + parse["销售方"] = ( + response.get("data").get("details").get("seller") + ) + + # 发票状态 + match response.get("data").get("details").get( + "invoiceTypeNo" + ): + case "0": + parse["发票状态"] = "正常" + case "1": + parse["发票状态"] = "无法查验" + case "2" | "3" | "7" | "8": + parse["发票状态"] = "已红冲" + + max_item_quantity = 0 + + items = ( + response.get("data").get("details").get("items", []) + ) + + for item in items: + item_quantity = ( + float(item.get("quantity", 1)) + if item.get("quantity") + else 1 + ) + + if item_quantity > max_item_quantity: + parse["最大销售项目名称"] = item.get("name") + + parse["最大销售项目数量"] = str(item_quantity) + + # XML版式文件(25-06-11本接口不在提供版式文件,通过另一接口可获取数电增值税发票版式文件) + parse["XML版式文件"] = "本接口不再提供版式文件" + + # 若响应状态码为400且错误码为10001或10100,则定义为假票 + elif status_code == 400 
and (code == 10001 or code == 10100): + parse["发票状态"] = "假票" + + else: + raise Exception("解析报文发生其它异常") + + except Exception as exception: + + parse["发票状态"] = "{}".format(response.get("message")) + + case "bjfd": + try: + + # 不验签,业务出参BASE64解码并反序列化 + response = json.loads( + b64decode(response.get("data")).decode("utf-8") + ) + + # 增值税发票、医疗票据查验结果BASE64解码并反序列化 + response["message"] = json.loads( + b64decode(response.get("message")).decode("utf-8") + ) + + # 错误码 + code = response.get("result") + + # 流水号 + serial = response.get("message").get("checkId") + + # 核验结果代码 + result_code = response.get("message").get("resultCode") + + # 若错误码为S0000则定义为响应成功 + if code == "S0000": + # noinspection PyUnreachableCode + match result_code: + # 若查验成功则根据增值税发票、医疗票据状态匹配发票状态 + case "200": + + parse["机打发票号码"] = ( + response.get("message") + .get("tickMainInfo") + .get("invoiceNo") + ) + + parse["发票金额"] = ( + response.get("message") + .get("tickMainInfo") + .get("invoiceTotalPrice") + ) + + parse["购买方"] = ( + response.get("message") + .get("tickMainInfo") + .get("payerPartyName") + ) + + parse["销售方"] = ( + response.get("message") + .get("tickMainInfo") + .get("invoicingPartyName") + ) + + max_item_quantity = 0 + + for item in ( + response.get("message") + .get("tickMainInfo") + .get("chargeItems", []) + ): + item_quantity = ( + float(item.get("num", 1)) + if item.get("num") + else 1 + ) + + if item_quantity > max_item_quantity: + parse["最大销售项目名称"] = item.get( + "chargeName" + ) + parse["最大销售项目数量"] = str(item_quantity) + + match response.get("message").get("invoiceStatus"): + case "0": + parse["发票状态"] = "正常" + case ( + "1" | "2" + ): # 沿用深圳快瞳解析规则,北京分单已开红票和已作废映射为已红冲 + parse["发票状态"] = "已红冲" + + case "E20003" | "E20007 ": + parse["发票状态"] = "假票" + + # 其它情况发票状态根据核验结果描述 + case _: + parse["发票状态"] = response.get("message").get( + "resultMsg" + ) + + except Exception as exception: + parse["发票状态"] = str(exception) + + dataframe_parsed.append(parse) + + dataframe_parsed = 
pandas.DataFrame(data=dataframe_parsed, dtype=str) + + # 将解析数据集拼接至数据集 + dataframe = dataframe.merge(right=dataframe_parsed, how="left", on=["索引"]) + + # 填补缺失值 + dataframe = dataframe.fillna(value="") + + print("已完成") + + print("正在保存为工作簿...", end="") + + save_as_workbook(worksheets=[("Sheet1", dataframe)], workbook_name="results.xlsx") + + print("已完成") diff --git a/普康健康发票识别/main.py b/普康健康发票识别/main.py new file mode 100644 index 0000000..25bd9e3 --- /dev/null +++ b/普康健康发票识别/main.py @@ -0,0 +1,305 @@ +# -*- coding: utf-8 -*- + +""" +普康健康_发票识别 +""" + +# 加载模块 + +import json + +from pathlib import Path + +import shutil + +import pandas + +import uuid + +from concurrent.futures import ThreadPoolExecutor, as_completed + +from 普康健康发票查验.main import image_compression + +from utils.pandas_extension import traverse_directory, save_as_workbook + +from utils.client import restrict, HTTPClient, RequestException, Authenticator + + +if __name__ == "__main__": + + print("正在基于深圳快瞳的增值税发票识别接口批量识别") + + match input( + "请选择获取待识别发票影像件方式(1:遍历目录通过影像件路径,其它任意字符:退出脚本):" + ): + + case "1": + + print("正在遍历目录...", end="") + + dataframe = traverse_directory( + directory_path="待识别发票", suffixes=[".jpg", ".jpeg", ".png"] + ) + + # 修改列名相对路径为影像件地址 + dataframe.rename(columns={"相对路径": "影像件地址"}, inplace=True) + + # 添加索引 + dataframe["索引"] = dataframe.apply( + lambda x: uuid.uuid4().hex, axis="columns" + ) + + case _: + + print("选择退出脚本!") + + exit(0) + + # 统计待识别发票张数 + rows = dataframe.shape[0] + + # 若待识别发票张数为0则退出脚本 + if rows == 0: + + print("待识别发票张数为0,退出脚本") + + exit(0) + + print(f"已完成,待识别发票张数为 {rows}") + + match input( + "请选择是否就上一次的响应报文进行归档(0:不归档,1:归档,其它任意字符:退出脚本):" + ): + + # 若不归档则不请求深圳快瞳的增值税发票识别接口 + case "0": + + pass + + case "1": + + print("正在归档响应报文...", end="") + + # 创建响应报文目录路径对象 + responses_path = Path("temporary/responses") + + # 若响应报文目录路径不存在则创建 + if not responses_path.exists(): + + responses_path.mkdir(parents=True, exist_ok=True) + + # 创建归档响应报文目录路径对象 + archives_path = Path("temporary/archives") + + 
# 若归档响应报文目录路径不存在则创建 + if not archives_path.exists(): + + archives_path.mkdir(parents=True, exist_ok=True) + + # 遍历响应报文目录下所有文件名后缀为JSON的文件路径 + for file_path in Path(responses_path).glob("*.json"): + + # 若文件路径为文件 + if file_path.is_file(): + + # 移动响应报文由响应报文目录至归档响应报文目录 + shutil.move(str(file_path), str(archives_path / file_path.name)) + + print("已完成") + + # 创建深圳快瞳获取访问令牌方法 + authenticator = Authenticator(servicer="szkt") + + # 初始化请求客户端 + http_client = HTTPClient() + + @restrict(refill_rate=5, max_tokens=5) + def szkt_request( + image_index, + image_path=None, + ): + + try: + + # 创建影像件路径对象 + image_path = Path(image_path) + + # 影像件文件名称后缀 + image_format = image_path.suffix.strip().lower() + + # 根据影像件路径读取图像数据(二进制) + with open(image_path, "rb") as image: + + image_data = image.read() + + # 标准化影像件格式 + # noinspection PyUnboundLocalVariable + image_format = image_format if image_format == ".png" else ".jpeg" + + # noinspection PyUnboundLocalVariable + image_data_base64 = image_compression(image_format, image_data) + + response = http_client.post( + # 增值税发票识别 + url="https://ai.inspirvision.cn/s/api/ocr/vatInvoice", + # 用于和深圳快瞳联查时定位请求 + headers={"X-RequestId-Header": image_index}, + data={ + "token": authenticator.get_token(), + "imgBase64": image_data_base64, + }, + ) + + except RequestException as request_exception: + + response = { + "status": request_exception.status_code, + "message": request_exception.message, + } + + except Exception as exception: + + response = { + "status": "90000", + "message": f"发生其它异常{exception}", + } + + return image_index, response + + # 用于记录已完成任务数 + completed_futures = 0 + + # 创建线程池 + with ThreadPoolExecutor(max_workers=5) as executor: + + futures = [ + executor.submit( + szkt_request, + image_index=row.索引, + image_path=row.影像件地址, + ) + for row in dataframe[["索引", "影像件地址"]].itertuples( + index=False, name="row" + ) + ] + + for future in as_completed(futures): + + index, response = future.result() + + # 保存报文 + with open( + 
"temporary/responses/{}.json".format(index), + "w", + encoding="utf-8", + ) as file: + + json.dump(response, file, ensure_ascii=False) + + completed_futures += 1 + + print(f"已完成 {completed_futures / rows * 100:.2f} %") + + case _: + + print("选择退出脚本!") + + exit(0) + + print("正在解析报文...", end="") + + # 解析后数据体 + dataframe_parsed = [] + + # 遍历报文所在目录 + for path_object in list(Path("temporary/responses").glob("*.json")): + + # 解析报文结构 + parse = { + "索引": "", + "发票号码": "", + "小写金额": "", + "合计金额": "", + "合计税额": "", + "购买方名称": "", + "销售方名称": "", + } + + try: + + # 打开报文并JSON逆序列化 + with open(path_object, "r", encoding="utf-8") as file: + + response = json.load(file) + + # 索引 + parse["索引"] = path_object.stem + + # 响应状态码 + status_code = response.get("status", "") + + # 错误码 + code = response.get("code", "") + + # 流水号 + serial = response.get("serialNo", "") + + # 若响应状态码为200且错误码为0,则定义为响应成功 + if status_code == 200 and code == 0: + + for item in response.get("data", []): + + # 根据DESC匹配字段 + match item.get("desc"): + + case "发票号码": + + parse["发票号码"] = item.get("value", "") + + case "小写金额": + + parse["小写金额"] = item.get("value").replace("¥", "") + + case "合计金额": + + parse["合计金额"] = item.get("value").replace("¥", "") + + case "合计税额": + + parse["合计税额"] = item.get("value").replace("¥", "") + + case "购买方名称": + + parse["购买方名称"] = item.get("value", "") + + case "销售方名称": + + parse["销售方名称"] = item.get("value", "") + + else: + + raise Exception("解析报文发生其它异常") + + except Exception as exception: + + parse["发票号码"] = "{}".format(response.get("message")) + + finally: + + dataframe_parsed.append(parse) + + dataframe_parsed = pandas.DataFrame(data=dataframe_parsed, dtype=str) + + # 将解析数据集拼接至数据集 + dataframe = dataframe.merge(right=dataframe_parsed, how="left", on=["索引"]) + + # 填补缺失值 + dataframe = dataframe.fillna(value="") + + print("已完成") + + print("正在保存为工作簿...", end="") + + save_as_workbook(worksheets=[("Sheet1", dataframe)], workbook_name="results.xlsx") + + print("已完成") diff --git 
a/普康健康客服会话记录整合/main.py b/普康健康客服会话记录整合/main.py new file mode 100644 index 0000000..e235588 --- /dev/null +++ b/普康健康客服会话记录整合/main.py @@ -0,0 +1,172 @@ +# -*- coding: utf-8 -*- + +""" +普康健康_客服会话记录整合 +""" + +# 加载模块 + +import json +import re +from pathlib import Path + +import pandas +from jinja2 import Environment, FileSystemLoader + +from utils.client import Authenticator, HTTPClient + +# 创建目录地址对象 +directory_path = Path("客服会话记录") + +# 初始化数据体 +dataframe = pandas.DataFrame() + +for file_path in directory_path.glob("*.csv"): + # 读取本地CSV + dataframe = pandas.concat( + [ + dataframe, + pandas.read_csv( + filepath_or_buffer=file_path, + usecols=["用户名称", "会话开始时间", "详情"], + dtype=str, + encoding="gbk", + encoding_errors="ignore", + ), + ], + ignore_index=True, + ) + +dataframe = ( + dataframe.assign( + 会话开始时间=pandas.to_datetime(dataframe["会话开始时间"], errors="coerce"), + # 删除客服导航语、结束语和双换行符 + 详情=dataframe["详情"].apply( + lambda row: ( + row.split("\n\n", maxsplit=1)[-1] + if "您好,欢迎咨询小暖,猜您可能咨询以下问题" in row + else row + ) + .replace("对话结束 >>\n", "") + .replace("\n\n", "") + ), + ) + # 按照会话开始时间倒序排序 + .sort_values(by="会话开始时间", ascending=False) + # 按照用户名称分组并就详情以双换行符拼接 + .groupby(by="用户名称", as_index=False).agg(详情=("详情", "\n\n".join)) +) + +# 长文本建议以JSON方式保存 + +template = Environment(loader=FileSystemLoader(".")).get_template("template.html") + +# 初始化认证器 +Authenticator = Authenticator() + +# 初始化HTTP客户端 +http_client = HTTPClient() + +rows = [] + +for _, row in dataframe.iterrows(): + + # 初始化会话列表 + conversations = [] + + # 以双换行符切割文本,每部分为一个会话 + for lines in row["详情"].split("\n\n"): + + # 初始化会话 + conversation = {"messages": [], "started_at": ""} + + # 以换行符切割文本,遍历每一行 + for i, line in enumerate(lines := lines.split("\n")): + + # 正则匹配行包含发送时间 + match = re.search( + r"(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})", + sent_at := line.split(" ", maxsplit=1)[-1], + ) + + while match and not conversation["started_at"]: + # 更新会话开始时间 + conversation["started_at"] = match.group("started_at") + + if match and 
row["用户名称"] in (name := line.split(" ", maxsplit=1)[0]): + # 初始化客户信息体 + message = {"sender": "客户", "sender_type": "customer", "content": ""} + # 若某行匹配发送时间格式,则至下个匹配发送时间格式的中间行为发送内容 + for i_, line_ in enumerate(lines[i + 1 :]): + if ( + not ( + match_ := re.search( + r"(^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})$", + line_.split(" ", maxsplit=1)[-1], + ) + ) + and line_ + ): + message["content"] += line_ + if match_: + break + conversation["messages"].append(message) + + elif match and any( + x in (name := line.split(" ", maxsplit=1)[0]) + for x in ["机器人", "kefu"] + ): + # 初始化客服信息体 + # noinspection PyUnboundLocalVariable + message = { + "sender": name, + "sender_type": "staff", + "content": "", + } + for j, line_ in enumerate(lines[i + 1 :]): + if ( + not ( + match_ := re.search( + r"(^\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})$", + line_.split(" ", maxsplit=1)[-1], + ) + ) + and line_ + ): + message["content"] += line_ + if match_: + break + conversation["messages"].append(message) + + conversations.append(conversation) + + response = http_client.post( + url="https://kms.7x24cc.com/api/v1/knowledges/", + headers={ + "Authorization": "Bearer {}".format( + Authenticator.get_token(servicer="hlyj") + ), # 获取访问令牌并装配置请求头 + "Content-Type": "application/json; charset=utf-8", + }, + json={ + "knowledge_base_id": "143bfe7f-fd79-49f2-8359-c123aba944c2", + "title": ( + customer if "_" in (customer := row["用户名称"]) else f"客户{customer}" + ), + "content": template.render( + { + "conversations": conversations, + } + ), # 根据模版生成HTML + }, + ) + + print(response) + + # 就响应中知识条目标识更新至行 + row["新增结果"] = response.get("knowledge_id", "") + + rows.append(row) + +with open("results.json", "w", encoding="utf-8") as file: + json.dump(rows, file, ensure_ascii=False) diff --git a/普康健康客服会话记录整合/template.html b/普康健康客服会话记录整合/template.html new file mode 100644 index 0000000..841db1d --- /dev/null +++ b/普康健康客服会话记录整合/template.html @@ -0,0 +1,22 @@ + + + +
+ {% for conversation in conversations %} +
+
会话开始时间:{{ conversation.started_at }}
+
+ {% for message in conversation.messages %} +
+
{{ message.sender }}
+
+ {{ message.content }} +
+
+ {% endfor %} +
+
+ {% endfor %} +
+ + \ No newline at end of file diff --git a/普康健康客服会话记录整合/演示会话.html b/普康健康客服会话记录整合/演示会话.html new file mode 100644 index 0000000..b8a22d5 --- /dev/null +++ b/普康健康客服会话记录整合/演示会话.html @@ -0,0 +1,185 @@ + + + + + + 客服对话记录 + + + +
+ +
+
会话开始时间:2024-11-13 09:14:28
+
+ +
+
客户
+
+ 订单取消 +
+
+ +
+
机器人(人保小暖)
+
+ 非常抱歉呢,这个问题小暖还没有学过呢~您可以转人工咨询~ +
+
+ +
+
客户
+
+ 转人工 +
+
+ +
+
机器人(人保小暖)
+
+ 正在为您转接人工客服 +
+
+ +
+
机器人(人保小暖)
+
+ 人工服务是一对多服务,回复可能稍慢,转接请稍等...! +
+
+ +
+
kefu04(小暖暖)
+
+ 您好 +
+
+ +
+
客户
+
+ 您好,华怡药房有个订单要取消17314220802108 +
+
+ +
+
客户
+
+ 商家的客服页面是空白,页面没有取消订单操作 +
+
+ +
+
客户
+
+ 请后台联系取消订单,或者提供可以联系上店铺的客服电话 +
+
+ +
+
kefu04(小暖暖)
+
+ 您截图 +
+
+ +
+
客户
+
+ 发送图片:https://minioss.cn-wlcb.ufileos.com/filetransfer/20241113/kf_10566/688595f6639b3b895fcdd8364817cdd51731460659.jpg ;大小:69.76 KB +
+
+ +
+
kefu04(小暖暖)
+
+ 这边咨询一下后台是否可以进行操作 +
+
+ +
+
kefu04(小暖暖)
+
+ 已经咨询,后台还未有回复 +
+
+ +
+
+ +
+ + \ No newline at end of file diff --git a/普康健康机构周报/main.py b/普康健康机构周报/main.py new file mode 100644 index 0000000..1661bc2 --- /dev/null +++ b/普康健康机构周报/main.py @@ -0,0 +1,474 @@ +# -*- coding: utf-8 -*- + +""" +普康健康_生成直付理赔周报 +""" + +# 加载模块 + +import pandas, numpy + +from utils.pandas_extension import open_csv, save_as_workbook + + +# 根据机构所在省份匹配为机构名称 +def match_institution_name(x): + + x_matched = "总部" + + match x: + + case "北京市" | "天津市": + + x_matched = "京津" + + case "河北省": + + x_matched = "河北" + + case "山西省": + + x_matched = "山西" + + case "内蒙古自治区": + + x_matched = "内蒙" + + case "辽宁省": + + x_matched = "辽宁" + + case "吉林省" | "黑龙江省": + + x_matched = "黑吉" + + case "上海市": + + x_matched = "上海" + + case "江苏省": + + x_matched = "江苏" + + case "浙江省": + + x_matched = "浙江" + + case "安徽省": + + x_matched = "安徽" + + case "福建省": + + x_matched = "福建" + + case "江西省": + + x_matched = "江西" + + case "山东省": + + x_matched = "山东" + + case "河南省": + + x_matched = "河南" + + case "湖北省": + + x_matched = "湖北" + + case "湖南省": + + x_matched = "湖南" + + case "广东省" | "海南省": + + x_matched = "广东" + + case "广西壮族自治区": + + x_matched = "广西" + + case "重庆市" | "四川省" | "西藏自治区": + + x_matched = "四川" + + case "贵州省": + + x_matched = "贵州" + + case "云南省": + + x_matched = "云南" + + case "新疆维吾尔自治区": + + x_matched = "新疆" + + case "陕西省" | "青海省": + + x_matched = "陕西" + + case "甘肃省": + + x_matched = "甘肃" + + case "宁夏回族自治区": + + x_matched = "宁夏" + + return x_matched + + +# 根据机构名称匹配为大区名称 +def match_region_name(x): + + x_matched = "总部" + + match x: + + case "内蒙" | "辽宁" | "黑吉": + + x_matched = "东北大区" + + case "京津" | "河北" | "山西": + + x_matched = "华北大区" + + case "安徽" | "山东" | "河南": + + x_matched = "华东大区" + + case "江苏" | "福建" | "广东": + + x_matched = "东南大区" + + case "江西" | "湖北" | "湖南": + + x_matched = "华中大区" + + case "新疆" | "陕西" | "甘肃" | "宁夏": + + x_matched = "西北大区" + + case "广西" | "四川" | "云南" | "贵州": + + x_matched = "西南大区" + + case "上海": + + x_matched = "上海" + + case "浙江": + + x_matched = "浙江" + + return x_matched + + +""" 
+统计方案: + +1、读取当年往月对账单数据,包括对账期、商家编号、保单编号和对账金额,文件名为reconciliations.csv + + 1.1 根据对账期和保单编号分组,就对账金额求和,其中对账期、对账金额之和重命名为考核周期、消费规模 + +2、读取当年当月保单扣减数据,包括扣减期、商家编号、保单编号和扣减金额,文件名为reconciliations_month.csv + + 2.1 根据扣减期和保单编号分组,就扣减金额求和,其中扣减期、扣减金额之和重命名为考核周期、消费规模 + +3、合并1.1和2.1,即当年往月和当年当月考核周期、保单编号和消费规模 + +4、读取徐丹老师提供的保单机构分配数据,包括保单编号、落地机构、落地机构分配比例、出单机构、出单机构分配比例、总部分配比例,文件名为slips.csv + + 4.1 先查询3中消费规模大于0的保单编号,再和4中保单编号比较、就不在4中的保单编号提供徐丹老师,由其补录保单机构分配方案。补录后重复4.1至无需再提供徐丹老师 -->过程表 + +5、就机构拆解保单消费规模,根据考核周期、机构分组,就消费规模求和 + + 5.1 根据机构名称匹配大区名称 + + 5.2 读取当年机构消费目标数据,包括考核周期、机构名称和消费目标,根据考核周期和机构名称匹配消费目标 + + 5.3 就算达成率(消费规模/消费目标)-->基表 + +6、透视基表,生成各机构在当年各月消费目标、消费规模和转化率,当年消费目标、消费规模和转化率,并汇总 +""" + +print("正在生成直付理赔周报...", end="") + +# 当年往月对账单数据(维度为对账期-商家编号-保单编号) +reconciliations = open_csv(file_name="reconciliations.csv") + +# 删除保单编号为空的行(若保单编号为空则对账金额必定为空,若对账金额为空则保单编号必定为空) +reconciliations.dropna(subset=["保单编号"], inplace=True) + +# 数据类型转换 +for variable_label in reconciliations.columns: + + match variable_label: + + case "对账金额": + + # 不可能出现缺失值,无需填补缺失值 + + reconciliations[variable_label] = reconciliations[variable_label].astype( + "float" + ) + +# 按照对账期和保单编号分组,就对账金额求和,重置索引,修改列名 +reconciliations = ( + reconciliations.groupby(by=["对账期", "保单编号"]) + .agg(对账金额=("对账金额", "sum")) + .reset_index() + .rename(columns={"对账期": "考核周期", "对账金额": "消费规模"}) +) + +# 当年当月保单扣减数据(维度为扣减期-商家编号-保单编号) +reconciliations_month = open_csv(file_name="reconciliations_month.csv") + +# 数据类型转换 +for variable_label in reconciliations_month.columns: + + match variable_label: + + case "扣减金额": + + # 不可能出现缺失值,无需填补缺失值 + + reconciliations_month[variable_label] = reconciliations_month[ + variable_label + ].astype("float") + +# 按照扣减期和保单编号分组,就扣减金额求和,重置索引,修改列名 +reconciliations_month = ( + reconciliations_month.groupby(by=["扣减期", "保单编号"]) + .agg(扣减金额=("扣减金额", "sum")) + .reset_index() + .rename(columns={"扣减期": "考核周期", "扣减金额": "消费规模"}) +) + +# 合并上述当年往月对账单数据和当年当月保单扣减数据 +reconciliations = pandas.concat( + objs=[reconciliations, reconciliations_month], 
ignore_index=True +) + +# 徐丹老师提供的保单机构分配数据 +slips = open_csv(file_name="slips.csv") + +# 数据类型转换 +for variable_label in slips.columns: + + match variable_label: + + # 不可能出现缺失值,无需填补缺失值 + + case "落地机构分配比例" | "出单机构分配比例" | "总部分配比例": + + slips[variable_label] = slips[variable_label].astype("int") + +# 过程表 +process_table = reconciliations.merge(right=slips, on="保单编号", how="left") + +# 统计消费规模大于0且出单机构分配比例为空的保单机构分配数据 +process_table.loc[ + (process_table["消费规模"] > 0) & (process_table["出单机构分配比例"].isna()), + "异常标签", +] = "无分配方案" + +if process_table.loc[process_table["异常标签"] == "无分配方案"].shape[0] > 0: + + print("存在未分配机构的保单,请提请徐丹老师补录") + print() + + save_as_workbook( + worksheets=[("异常保单", process_table)], + workbook_name="普康健康_需补录保单机构分配方案.xlsx", + ) + + exit() + +# 新增总部 +slips.insert(loc=slips.shape[1] - 1, column="总部", value="总部") + +# 先就落地机构、出单机构和总部新增机构名称列,落地机构分配比例、出单机构分配比例和总部分配比例新增分配比例列,再拆分为行 +slips = ( + slips.assign( + # 整合机构 + 机构名称=slips.apply( + lambda x: [x["落地机构"], x["出单机构"], x["总部"]], axis="columns" + ), + # 整合分配比例 + 分配比例=slips.apply( + lambda x: [x["落地机构分配比例"], x["出单机构分配比例"], x["总部分配比例"]], + axis="columns", + ), + ) + # 拆分机构名称和分配比例并重置索引 + .explode(["机构名称", "分配比例"]).reset_index(drop=True) +) + +# 保留分配比例大于0的保单机构分配数据 +slips = slips.loc[slips["分配比例"] > 0, ["保单编号", "机构名称", "分配比例"]] + +# 根据机构所在省份匹配为机构名称 +slips["机构名称"] = slips["机构名称"].apply(lambda x: match_institution_name(x)) + +# 根据机构名称匹配为大区名称并插入至第二列 +slips.insert( + loc=slips.shape[1] - 2, + column="大区名称", + value=slips["机构名称"].apply(lambda x: match_region_name(x)), +) + +# 左拼接保单机构分配数据(分配比例不可能出现缺失值,无需填补缺失值) +process_table = process_table.merge(right=slips, on="保单编号", how="left") + +# 分配后消费规模 +process_table["分配后消费规模"] = process_table.apply( + lambda x: x["消费规模"] * x["分配比例"] / 100, axis="columns" +) + +# 按照考核周期和机构名称分组,就分配后消费规模求和 +process_table = ( + process_table.groupby(by=["考核周期", "机构名称"]) + .agg(大区名称=("大区名称", "first"), 分配后消费规模=("分配后消费规模", "sum")) + .reset_index() +) + +# 机构考核周期消费目标数据(维度为对机构名称-考核周期) +targets = 
open_csv(file_name="targets.csv") + +# 数据类型转换 +for variable_label in targets.columns: + + match variable_label: + + case "消费目标": + + # 消费目标不可能出现缺失值,无需填补缺失值 + + targets[variable_label] = targets[variable_label].astype("float") + +process_table = process_table.merge( + right=targets, on=["机构名称", "考核周期"], how="left" +) + +# 根据过程表透视(第一级行索引为大区名称,第二级行索引为机构名称,第一级列索引为考核周期,列索引值为分配后消费规模和消费目标,行和列汇总) +pivot_table = process_table.pivot_table( + index=["大区名称", "机构名称"], + columns="考核周期", + values=[ + "分配后消费规模", + "消费目标", + ], # 注意:若设置一个列索引和多个列索引值PANDAS将自动创建多级列索引,第一级列索引为VALUES,第二季列索引为COLUMNS + aggfunc="sum", + margins=True, + margins_name="汇总", +) + +# 添加大区汇总 +for region_name in pivot_table.index.get_level_values("大区名称").unique(): + + if region_name not in ["上海", "浙江", "总部", "汇总"]: + + # 汇总大区数据(就各机构的考核周期分配后消费规模和消费目标分别求和) + region_summary = pivot_table.loc[region_name].sum() # SERIES对象 + + region_summary = pandas.DataFrame( + data=[region_summary], # SERIES列表 + # 创建多级行索引 + index=pandas.MultiIndex.from_tuples( + tuples=[(region_name, "汇总")], names=["大区名称", "机构名称"] + ), + columns=region_summary.index, + ) + + pivot_table = pandas.concat(objs=[pivot_table, region_summary]) + +# 计算各考核周期和汇总达成率 +for period in pivot_table.columns.get_level_values("考核周期").unique(): + + pivot_table[("达成率", period)] = pivot_table.apply( + lambda x: ( + x[("分配后消费规模", period)] / x[("消费目标", period)] + if x[("消费目标", period)] != 0 + else 0 + ), + axis="columns", + ) + +# 交换列索引层级,再就列索引排序 +pivot_table = pivot_table.swaplevel(axis="columns").sort_index(axis="columns") + +# 大区名称排序 +regions_orders = [ + "东北大区", + "华北大区", + "华东大区", + "华中大区", + "东南大区", + "西北大区", + "西南大区", + "上海", + "浙江", + "总部", + "汇总", +] + +# 大区名称和排序映射器 +region_mapper = { + region_name: region_index for region_index, region_name in enumerate(regions_orders) +} + +# 根据大区名称映射排序 +regions_mapped = [ + region_mapper.get(region_name) + for region_name in pivot_table.index.get_level_values("大区名称") +] + +# 机构排序 +institutions_orders = { + "东北大区": ["汇总", "内蒙", 
"辽宁", "黑吉"], + "华北大区": ["汇总", "京津", "河北", "山西"], + "华东大区": ["汇总", "安徽", "山东", "河南"], + "华中大区": ["汇总", "江西", "湖北", "湖南"], + "东南大区": ["汇总", "江苏", "福建", "广东"], + "西北大区": ["汇总", "新疆", "陕西", "甘肃", "宁夏"], + "西南大区": ["汇总", "广西", "四川", "云南", "贵州"], + "上海": ["上海"], + "浙江": ["浙江"], + "总部": ["总部"], + "汇总": [""], +} + +# 机构名称和排序映射器 +institution_mapper = {} + +institution_mapper.update( + { + (region_name, institution_name): institution_index + for region_name, institution_names in institutions_orders.items() + for institution_index, institution_name in enumerate(institution_names) + } +) + +# 根据机构名称映射排序 +institutions_mapped = [ + institution_mapper.get((region, institution)) + for region, institution in zip( + pivot_table.index.get_level_values("大区名称"), + pivot_table.index.get_level_values("机构名称"), + ) +] + +# 根据大区名称映射排序和机构名称映射排序多重排序 +pivot_table = pivot_table.iloc[ + numpy.lexsort((institutions_mapped, regions_mapped)) +].reset_index() + +save_as_workbook( + worksheets=[("sheet1", pivot_table)], workbook_name="普康健康_机构周报.xlsx" +) + +print("生成成功") diff --git a/普康健康机构绩效/main.py b/普康健康机构绩效/main.py new file mode 100644 index 0000000..a968e1b --- /dev/null +++ b/普康健康机构绩效/main.py @@ -0,0 +1,576 @@ +from decimal import Decimal, ROUND_HALF_UP + +import pandas + +from utils.pandas_extension import save_as_workbook + +dataset = {} + +# 机构绩效基础数据 +sheets = pandas.ExcelFile("基础数据.xlsx") + +for sheet_name in sheets.sheet_names: + dataset[sheet_name] = pandas.read_excel(sheets, sheet_name=sheet_name, dtype=str) + + +# 根据年度达成率映射为年终奖 +def mapped_as_regional_year_end_bonus(x: Decimal) -> str: + + # noinspection PyUnreachableCode + match x: + case x if x < 70: + z = "0薪" + case x if 70 <= x < 80: + z = "4薪" + case x if 80 <= x < 90: + z = "5薪" + case x if 90 <= x < 100: + z = "6薪" + case x if 100 <= x < 110: + z = "7薪" + case x if 110 <= x < 120: + z = "8薪" + case x if 120 <= x < 130: + z = "9薪" + case x if 130 <= x < 140: + z = "10薪" + case x if 140 <= x < 150: + z = "11薪" + case x if x >= 150: 
+ z = "12薪" + case _: + z = "年度达成率未匹配年终奖" + + return z + + +# 大区年终薪酬 +# 就大区机构周报,筛选大区/机构名称包含大区的行、年度消费目标(万)/累计消费规模(万)/年度达成率,根据年度达成率匹配年终奖基准额度 +regional_year_end_bonus = ( + dataset["大区机构达成率1"] + .loc[ + dataset["大区机构达成率1"]["大区/机构名称"].str.contains("大区"), + ["大区/机构名称", "年度消费目标(万)", "累计消费规模(万)", "年度达成率"], + ] + .assign( + 年度达成率=lambda dataframe: dataframe["年度达成率"].apply( + lambda cell: (Decimal(cell) * 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + ) + ) + .assign( + 年终奖=lambda dataframe: dataframe["年度达成率"].apply( + lambda cell: mapped_as_regional_year_end_bonus(cell) + ) + ) + .sort_values(by="年度达成率", ascending=False) + .reset_index(drop=True) +) + + +def mapped_as_institutional_year_end_bonus(x: pandas.Series) -> str | None: + + if x["年度达成率"] < 70: + if ( + x["大区/机构名称"] + in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃、黑吉、辽宁、山西、新疆、四川" + ): + z = "0薪" + else: + z = None + elif 70 <= x["年度达成率"] < 80: + if ( + x["大区/机构名称"] + in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃" + ): + z = "0薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "2薪" + else: + z = None + elif 80 <= x["年度达成率"] < 90: + if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西": + z = "0薪" + elif x["大区/机构名称"] in "河北、江苏、江西、甘肃": + z = "2薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "3薪" + else: + z = None + elif 90 <= x["年度达成率"] < 100: + if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西": + z = "2薪" + elif x["大区/机构名称"] in "河北、江苏、江西、甘肃": + z = "3薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "4薪" + else: + z = None + elif 100 <= x["年度达成率"] < 120: + if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西": + z = "3薪" + elif x["大区/机构名称"] in "河北、江苏、江西、甘肃": + z = "4薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "5薪" + else: + z = None + elif 120 <= x["年度达成率"] < 140: + if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西": + z = "4薪" + elif x["大区/机构名称"] in "河北、江苏、江西、甘肃": + z = "5薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "6薪" + else: + z = None + elif 140 <= x["年度达成率"] < 160: + if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西": + z = 
"5薪" + elif x["大区/机构名称"] in "河北、江苏、江西、甘肃": + z = "6薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "7薪" + else: + z = None + elif 160 <= x["年度达成率"] < 180: + if x["大区/机构名称"] in "安徽、河南、浙江、上海、云南、湖北、广东、广西": + z = "6薪" + elif x["大区/机构名称"] in "河北、江苏、江西、甘肃": + z = "7薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "8薪" + else: + z = None + elif 180 <= x["年度达成率"] < 200: + if ( + x["大区/机构名称"] + in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃" + ): + z = "8薪" + elif x["大区/机构名称"] in "黑吉、辽宁、山西、新疆、四川": + z = "9薪" + else: + z = None + else: + if ( + x["大区/机构名称"] + in "安徽、河南、浙江、上海、云南、湖北、广东、广西、河北、江苏、江西、甘肃、黑吉、辽宁、山西、新疆、四川" + ): + z = "10薪" + else: + z = None + + return z + + +# 机构年终薪酬 +institutional_year_end_bonus = ( + dataset["大区机构达成率1"] + .loc[ + ~dataset["大区机构达成率1"]["大区/机构名称"].str.contains("大区") + & ~dataset["大区机构达成率1"]["大区/机构名称"].isin(["宁夏(只宁煤)"]), + ["大区/机构名称", "年度消费目标(万)", "累计消费规模(万)", "年度达成率"], + ] + .assign( + 年度达成率=lambda dataframe: dataframe["年度达成率"].apply( + lambda cell: (Decimal(cell) * 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + ) + ) + .assign( + 年终奖=lambda dataframe: dataframe[["大区/机构名称", "年度达成率"]].apply( + lambda row: mapped_as_institutional_year_end_bonus(row), axis="columns" + ) + ) + .dropna(subset=["年终奖"]) + .sort_values(by="年度达成率", ascending=False) + .reset_index(drop=True) +) + + +# 根据机构所在省份匹配为机构名称 +def mapped_as_institution_name(x): + + # noinspection PyUnreachableCode + match x: + case "北京市" | "天津市": + z = "京津" + case "河北省": + z = "河北" + case "山西省": + z = "山西" + case "内蒙古自治区": + z = "内蒙" + case "辽宁省": + z = "辽宁" + case "吉林省" | "黑龙江省": + z = "黑吉" + case "上海市": + z = "上海" + case "江苏省": + z = "江苏" + case "浙江省": + z = "浙江" + case "安徽省": + z = "安徽" + case "福建省": + z = "福建" + case "江西省": + z = "江西" + case "山东省": + z = "山东" + case "河南省": + z = "河南" + case "湖北省": + z = "湖北" + case "湖南省": + z = "湖南" + case "广东省" | "海南省": + z = "广东" + case "广西壮族自治区": + z = "广西" + case "重庆市" | "四川省" | "西藏自治区": + z = "四川" + case "贵州省": + z = "贵州" + case "云南省": + z = "云南" 
+ case "新疆维吾尔自治区": + z = "新疆" + case "陕西省" | "青海省": + z = "陕西" + case "甘肃省": + z = "甘肃" + case "宁夏回族自治区": + z = "宁夏" + case "总部": + z = "总部" + case _: + z = "" + + return z + + +# 根据发卡金额映射为奖励 +def mapped_as_card_issuance_reward(x: Decimal) -> int: + + # noinspection PyUnreachableCode + match x: + case x if 50 <= x < 100: + z = 500 + case x if 100 <= x < 300: + z = 800 + case x if 300 <= x < 500: + z = 1000 + case x if 500 <= x < 1000: + z = 2000 + case x if 1000 <= x < 3000: + z = 4000 + case x if 3000 <= x < 5000: + z = 10000 + case x if 5000 <= x < 10000: + z = 20000 + case x if x >= 10000: + z = 30000 + case _: + z = 0 + + return z + + +# 自主经营发卡奖励 +card_issuance_reward = ( + dataset["25年发卡"] + .loc[~dataset["25年发卡"]["投保公司"].isin(dataset["24年发卡"]["投保公司"])] + .merge( + dataset["保单机构分配"], + how="left", + on="保单编号", + suffixes=("", "_duplication"), + ) # 若有发卡无消费则无保单机构分配方案 + .fillna("0") + .assign( + 发卡金额=lambda dataframe: dataframe["发卡金额"].apply( + lambda cell: (Decimal(cell) / 10000).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ) + ) # 发卡金额单位为万元 + ) + .assign( + 奖励金额=lambda dataframe: dataframe["发卡金额"].apply( + lambda cell: mapped_as_card_issuance_reward(cell) + ) + ) # 先计算奖励金额在根据机构分配比例拆分 + .assign(总部="总部") + .assign( + # 整合机构 + 机构名称=lambda dataframe: dataframe.apply( + lambda x: [x["落地机构"], x["出单机构"], x["总部"]], axis="columns" + ), + # 整合分配比例 + 分配比例=lambda dataframe: dataframe.apply( + lambda x: [ + x["落地机构分配比例"], + x["出单机构分配比例"], + x["总部分配比例"], + ], + axis="columns", + ), + ) + .explode(["机构名称", "分配比例"]) + .assign( + 分配后奖励金额=lambda dataframe: dataframe.apply( + lambda row: ( + Decimal(row["奖励金额"]) * Decimal(row["分配比例"]) / 100 + ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), + axis="columns", + ), + ) + .assign( + 机构=lambda dataframe: dataframe["机构名称"].apply( + lambda cell: mapped_as_institution_name(cell) + ) + ) + .loc[lambda dataframe: dataframe["机构"] != "总部"] # 过滤总部 + .sort_values(by="分配后奖励金额", ascending=False) + .reset_index(drop=True) 
+)[["保单编号", "投保公司", "保险总公司", "保险分公司", "发卡金额", "奖励金额", "机构", "分配比例", "分配后奖励金额"]] + + +# 根据转化率提升映射为奖励比例 +def mapped_as_increase_reward_ratio(x: Decimal) -> Decimal: + + # noinspection PyUnreachableCode + match x: + case x if 5 <= x < 10: + z = Decimal("0.1") + case x if 10 <= x < 15: + z = Decimal("0.2") + case x if 15 <= x < 20: + z = Decimal("0.3") + case x if 20 <= x < 30: + z = Decimal("0.4") + case x if x >= 30: + z = Decimal("0.5") + case _: + z = Decimal("0") + + return z + + +# 重点存量业务消费提升奖励 +increase_reward = ( + dataset["投保公司"] + .loc[dataset["投保公司"]["项目名称"].isin(dataset["重点项目"]["项目名称"])] + .merge( + dataset["24年数据"], how="left", on="投保公司", suffixes=("", "_duplication") + ) + .merge( + dataset["25年数据"], how="left", on="投保公司", suffixes=("", "_duplication") + ) + .fillna("0") + .assign( + 转化率25年=lambda dataframe: dataframe["25年转化率"].apply( + lambda row: (Decimal(row) * 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ) + ), + 转化率24年=lambda dataframe: dataframe["24年转化率"].apply( + lambda row: (Decimal(row) * 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ) + ), + ) + .assign( + 转化率提升=lambda dataframe: dataframe.apply( + lambda row: (row["转化率25年"] - row["转化率24年"]).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + axis="columns", + ) + ) + .assign( + 奖励比例=lambda dataframe: dataframe["转化率提升"].apply( + lambda cell: mapped_as_increase_reward_ratio(cell) + ) + ) + .assign( + 消费金额提升=lambda dataframe: dataframe.apply( + lambda row: ( + Decimal(row["25年消费金额"]) - Decimal(row["24年消费金额"]) + ).quantize(Decimal("0.00"), rounding=ROUND_HALF_UP), + axis="columns", + ) + ) + .assign( + 奖励金额=lambda dataframe: dataframe.apply( + lambda row: (row["消费金额提升"] * row["奖励比例"] / 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + axis="columns", + ) + ) + .sort_values(by="奖励金额", ascending=False) + .reset_index(drop=True) +)[ + [ + "项目名称", + "投保公司", + "25年发卡金额", + "25年消费金额", + "转化率25年", + "24年发卡金额", + "24年消费金额", + "转化率24年", + "转化率提升", + "奖励比例", + 
"消费金额提升", + "奖励金额", + ] +] + + +# 根据转化率映射为奖励比例 +def mapped_as_new_reward_ratio(x: Decimal) -> Decimal: + + # noinspection PyUnreachableCode + match x: + case x if 30 <= x < 40: + z = Decimal("0.3") + case x if 40 <= x < 50: + z = Decimal("0.4") + case x if 50 <= x < 60: + z = Decimal("0.5") + case x if 60 <= x < 70: + z = Decimal("0.6") + case x if 70 <= x < 80: + z = Decimal("0.7") + case x if 80 <= x < 90: + z = Decimal("0.8") + case x if x >= 90: + z = Decimal("1.0") + case _: + z = Decimal("0") + + return z + + +# 新增业务消费奖励 +new_reward = ( + dataset["保单机构分配"] + .loc[ + (dataset["保单机构分配"]["自力更生"] == "是") + & ~dataset["保单机构分配"]["投保公司"].isin(dataset["24年发卡"]["投保公司"]) + ] + .merge( + dataset["25年发卡"], + how="left", + on="保单编号", + suffixes=("", "_duplication"), + ) + .merge( + dataset["25年消费"], how="left", on="保单编号", suffixes=("", "_duplication") + ) + .fillna("0") + .assign( + 发卡金额=lambda dataframe: dataframe["发卡金额"].apply( + lambda cell: (Decimal(cell) / 10000).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ) + ), # 发卡金额单位为万元 + 消费金额=lambda dataframe: dataframe["消费金额"].apply( + lambda cell: (Decimal(cell) / 10000).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ) + ), # 消费金额单位为万元 + ) + .assign( + 转化率=lambda dataframe: dataframe.apply( + lambda row: ( + (row["消费金额"] / row["发卡金额"] * 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ) + if row["发卡金额"] != 0 + else Decimal("0.00") + ), + axis="columns", + ) + ) + .assign( + 奖励比例=lambda dataframe: dataframe["转化率"].apply( + lambda cell: mapped_as_new_reward_ratio(cell) + ) + ) + .assign( + 奖励金额=lambda dataframe: dataframe.apply( + lambda row: (row["消费金额"] * row["奖励比例"] / 100 * 10000).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + axis="columns", + ) + ) # 先计算奖励金额在根据机构分配比例拆分 + .assign(总部="总部") + .assign( + # 整合机构 + 机构名称=lambda dataframe: dataframe.apply( + lambda x: [x["落地机构"], x["出单机构"], x["总部"]], axis="columns" + ), + # 整合分配比例 + 分配比例=lambda dataframe: dataframe.apply( + lambda x: [ + 
x["落地机构分配比例"], + x["出单机构分配比例"], + x["总部分配比例"], + ], + axis="columns", + ), + ) + .explode(["机构名称", "分配比例"]) + .assign( + 分配后奖励金额=lambda dataframe: dataframe.apply( + lambda row: (row["奖励金额"] * Decimal(row["分配比例"]) / 100).quantize( + Decimal("0.00"), rounding=ROUND_HALF_UP + ), + axis="columns", + ), + ) + .assign( + 机构=lambda dataframe: dataframe["机构名称"].apply( + lambda cell: mapped_as_institution_name(cell) + ) + ) + .loc[ + lambda dataframe: (dataframe["机构"] != "总部") & (dataframe["分配比例"] != "0") + ] # 过滤总部 + .sort_values(by="分配后奖励金额", ascending=False) + .reset_index(drop=True) +)[ + [ + "保单编号", + "投保公司", + "保险总公司", + "保险分公司", + "发卡金额", + "消费金额", + "转化率", + "奖励比例", + "奖励金额", + "机构", + "分配比例", + "分配后奖励金额", + ] +] + +save_as_workbook( + workbook_name="机构绩效试算.xlsx", + worksheets=[ + ("大区年终薪酬", regional_year_end_bonus), + ("机构年终薪酬", institutional_year_end_bonus), + ("自主经营发卡奖励", card_issuance_reward), + ("重点存量业务消费提升奖励", increase_reward), + ("新增业务消费奖励", new_reward), + ], +) diff --git a/爬虫工厂/main.py b/爬虫工厂/main.py new file mode 100644 index 0000000..87f0829 --- /dev/null +++ b/爬虫工厂/main.py @@ -0,0 +1,425 @@ +# -*- coding: utf-8 -*- + +''' + +脚本名称: + +监控数据工厂 + +脚本说明: + +根据监控任务初始化搜索任务,根据搜索任务初始化爬虫并启动 + +''' + +#加载模块 + +from urllib.parse import quote + +import time + +import json + +from selenium.webdriver import Chrome + +#from undetected_chromedriver import Chrome + +from selenium.webdriver.support.wait import WebDriverWait + +from selenium.webdriver.support import expected_conditions + +from selenium.webdriver.common.by import By + +from concurrent.futures import ThreadPoolExecutor, as_completed + +import sys + +sys.path.append('..') + +from utils.request import Feishu + +''' + + + +''' + +#根据监控任务初始化搜索任务 +def Initialize_search() -> None: + + print('根据监控任务初始化搜索任务...', end = '') + + #列出记录(监控任务) + url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblcVxN52hZKT3z1/records' + + monitor_tasks = Feishu().request(url) + + #检查获取监控任务是否成功 + 
if monitor_tasks.code != 200: + + print('获取监控任务失败,本次启动终止!') + print() + + return None + + monitor_tasks = monitor_tasks.data.get('items') + + #检查监控任务是否为空 + if monitor_tasks is None: + + print('无监控任务,本次启动终止!') + print() + + return None + + #列出记录(搜索任务) + url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records' + + search_tasks = Feishu().request(url) + + #检查获取搜索任务是否成功 + if search_tasks.code != 200: + + print('获取搜索任务失败,本次启动终止!') + print() + + return None + + search_tasks = search_tasks.data.get('items') + + #检查搜索任务是否为空 + if search_tasks is None: + + comparators = [] + + else: + + #仅保留监控任务标识、搜索渠道和关键词,用于与添加、删除的搜索任务比较 + comparators = [{'监控任务标识': search_task.get('fields').get('监控任务标识'), '搜索渠道': search_task.get('fields').get('搜索渠道'), '关键词': search_task.get('fields').get('关键词')} for search_task in search_tasks] + + #用于保存新增的搜索任务 + records = [] + + #定义最近搜索时间为23-12-25 00:00:00(毫秒级时间戳) + last_search_time = int(time.mktime(time.strptime('23-12-25', '%y-%m-%d')) * 1000) + + for monitor_task in monitor_tasks: + + #解析监控任务标识 + record_id = monitor_task.get('record_id') + + #解析搜索渠道 + search_channels = monitor_task.get('fields').get('搜索渠道') + + #解析关键词(可能存在多个关键词,使用“,”区隔) + keywords = monitor_task.get('fields').get('关键词').split(',') + + #遍历搜索渠道和关键词生成新增的搜索任务 + for search_channel in search_channels: + + for keyword in keywords: + + fields = { + + '监控任务标识': record_id, + + '搜索渠道': search_channel, + + '关键词': keyword + + } + + if fields not in comparators: + + #添加最多搜索页数、搜索周期、最近搜索时间和搜索状态(默认为停用) + fields.update({'最多搜索页数': 1, '搜索周期': 168, '最近搜索时间': last_search_time, '搜索状态': '停用'}) + + records.append( + + { + + 'fields': fields + + } + + ) + + #检查新增的搜索任务是否为空 + if records != []: + + #新增多条记录 + url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_create' + + response = Feishu().request(url, 'post', {'records': records}) + + #检查新增搜索任务是否成功 + if response.code != 200: + + 
print('新增搜索任务失败,本次启动终止!') + print() + + return None + + #检查搜索任务是否非空 + if search_tasks is not None: + + #生成删除的搜索任务 + records = [search_task.get('record_id') for search_task in search_tasks if search_task.get('fields').get('监控任务标识') is None] + + if records != []: + + #删除多条记录 + url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records/batch_delete' + + response = Feishu().request(url, 'post', {'records': records}) + + #检查删除搜索任务是否成功 + if response.code != 200: + + print('删除搜索任务失败,本次启动终止!') + print() + + return None + + print('初始化成功。') + print() + +#根据搜索任务启动爬虫 +def Spider(search_task) -> list: + + records = [] + + #搜索链接 + search_url = search_task.get('search_url') + + print('正在爬取 %s ...' % search_url, end = '') + + try: + + #初始化chrome + #chrome = Chrome(headless = False, use_subprocess = True) + + chrome = Chrome() + + chrome.get(search_url) + + #搜索渠道 + search_channel = search_task.get('search_channel') + + match search_channel: + + case '百度': + + #定位元素 + elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"c-title t t tts-title")]/a'))) + + case '搜狗': + + elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"vr-title")]/a'))) + + case '360': + + elements = WebDriverWait(chrome, 30).until(expected_conditions.presence_of_all_elements_located((By.XPATH, '//h3[contains(@class,"res-title")]/a'))) + + #解析监控对象 + monitor_object = search_task.get('monitor_object') + + #解析关键词 + keyword = search_task.get('keyword') + + #定义搜索时间 + search_time = int(time.time() * 1000) + + for element in elements: + + records.append( + + { + + 'monitor_object': monitor_object, + + 'search_channel': search_channel, + + 'keyword': keyword, + + 'search_url': search_url, + + 'fields': + + { + + #搜索结果标题 + 'title': element.get_attribute('outerText'), + + #搜索结果连接 + 'hyperlink': 
element.get_attribute('href') + + }, + + 'search_time': search_time + + } + + ) + + print('已完成。') + print() + + except: + + print('爬取失败!') + print() + + finally: + + try: + + chrome.quit() + + except: + + pass + + return records + +#根据搜索任务初始化爬取任务 +def Initialize_crawl() -> None: + + print('根据搜索任务初始化爬取任务...', end = '') + + #列出记录(搜索状态为「启用」的搜索任务) + url = 'https://open.feishu.cn/open-apis/bitable/v1/apps/KOztbJC63aLF4qsaZK2cuGdXnob/tables/tblSsnhXhA24PIsy/records?filter={}'.format(quote('CurrentValue.[搜索状态]="启用"')) + + search_tasks = Feishu().request(url) + + #检查获取启用的搜索任务是否成功 + if search_tasks.code != 200: + + print('获取启用的搜索任务失败,本次启动终止!') + print() + + return None + + search_tasks = search_tasks.data.get('items') + + #检查启用的搜索任务是否为空 + if search_tasks is None: + + print('无启用的搜索任务,本次启动终止!') + print() + + return None + + #用于记录爬取任务 + crawl_tasks = [] + + for search_task in search_tasks: + + #解析最近搜索时间 + last_search_time = search_task.get('fields').get('最近搜索时间') + + #解析搜索周期 + search_period = int(search_task.get('fields').get('搜索周期') + + #若最近搜索时间距现在的小时数超过搜索周期则生成爬取任务 + if (int(time.time() * 1000) - last_search_time) / 3600000 >= search_period): + + #解析搜索任务标识 + record_id = search_task.get('record_id') + + #解析搜索渠道 + search_channel = search_task.get('fields').get('搜索渠道') + + #解析关键词 + keyword = search_task.get('fields').get('关键词') + + #解析最多搜索页数 + maximun_pages = int(search_task.get('fields').get('最多搜索页数')) + + #根据最多搜索页数生成爬取任务 + for page in range(maximun_pages): + + #仅考虑通过链接爬取数据 + match search_channel: + + case '百度': + + url = 'https://www.baidu.com/s?wd={}&ie=utf-8&pn={}'.format(quote(keyword), page * 10) + + xpath = '//h3[contains(@class,"c-title t t tts-title")]/a' + + case '搜狗': + + #ie为关键词编码 + url = 'https://www.sogou.com/web?query={}&page={}&ie=utf8'.format(quote(keyword), page + 1) + + xpath = '//h3[contains(@class,"vr-title")]/a' + + case '360': + + url = 'https://www.so.com/s?q={}&pn={}'.format(quote(keyword), page + 1) + + xpath = '//h3[contains(@class,"res-title")]/a' + + 
crawl_tasks.append( + + { + + 'monitor_task': + + { + + 'record_id': + + }, + + 'search_task': + + { + + 'search_channel': search_channel, + + 'keyword': keyword, + + 'search_url': search_url, + + } + + } + + ) + + print('初始化成功。') + print() + + print('启动爬虫... \\') + print() + + #创建线程池 + with ThreadPoolExecutor(max_workers = 2) as executor: + + threads = [] + + for search_task in search_tasks: + + thread = executor.submit(Spider, search_task) + + #将搜索任务提交至进程池 + threads.append(thread) + + results = [] + + for thread in as_completed(threads): + + result = thread.result() + + #若爬虫返回非空则添加至结果 + if result != []: + + results.extend(result) + + print(results) + + +print(Initialize_crawl()) + diff --git a/评分卡/main.py b/评分卡/main.py new file mode 100644 index 0000000..b4de003 --- /dev/null +++ b/评分卡/main.py @@ -0,0 +1,556 @@ +# -*- coding: utf-8 -*- + +print('''脚本说明: + +基于GiveMeSomeCredit数据集,构建贷款申请评分卡并生成建模报告 +''') + +#导入包 + +import pandas + +import numpy + +from sklearn.tree import DecisionTreeClassifier + +from statsmodels.stats.outliers_influence import variance_inflation_factor + +from sklearn.model_selection import GridSearchCV + +from sklearn.linear_model import LogisticRegression + +from sklearn.metrics import roc_curve + +from jinja2 import Environment, FileSystemLoader + +import time + +import warnings + +#忽略警告 +warnings.simplefilter('ignore') + +import sys + +sys.path.append('..') + +from utils.mysql import MySQL + +from utils.pandas2chart import Pandas2chart + +#本脚本中调用函数 + +''' + +函数说明:基于决策树最优分箱并证据权重编码 + +参数说明: + + 数据集,格式为Pandas.DataFrame,第一列为目标变量,其它为特征变量 + +返回说明: + + 数据格式:Pandas.DataFrame + +''' + +def OptimalEncodeByWOE(dataset): + + dataset_woe = dataset.copy() + + #目标变量名 + dependent = dataset_woe.columns[0] + + #特征变量名 + independents = dataset_woe.columns[1: ] + + #字典,用于记录特征变量各箱证据权重 + dictionary = pandas.DataFrame() + + #遍历特征变量 + for independent in independents: + + print('正在就特征变量 %s 基于决策树最优分箱并证据权重编码...' 
% independent, end = '') + + #按照特征变量是否包含缺失值划分目标变量 + y = dataset_woe.loc[dataset_woe[independent].notna(), dependent] + + #按照是否包含缺失值划分特征变量 + x = dataset_woe.loc[dataset_woe[independent].notna(), independent] + + #遍历分箱数,取特征变量值数和5的最小值作为最大分箱数,2作为最小分箱数 + for bins in range(min(len(x.value_counts()), 5), 1, -1): + + #创建决策树分类器,每箱最小样本数占比为5% + decision_tree = DecisionTreeClassifier(max_leaf_nodes = bins, min_samples_leaf = 0.05, class_weight = 'balanced').fit(x.to_numpy().reshape(-1, 1), y.to_numpy()) + + #切点 + tangencies = [] + + #遍历节点 + for i in range(decision_tree.tree_.node_count) : + + #若决策树某节点的左子节点和右子节点不同,则将该节点作为切点 + if decision_tree.tree_.children_left[i] != decision_tree.tree_.children_right[i] : + + tangencies.append(decision_tree.tree_.threshold[i]) + + tangencies.sort() + + #添加边界点 + tangencies = [x.min() - 0.01] + tangencies + [x.max() + 0.01] + + #特征变量分箱 + dataset_woe.loc[dataset_woe[independent].notna(), independent] = pandas.cut(x = x.to_numpy(), bins = tangencies, right = False) + + #按照特征变量分组 + woe = dataset_woe.loc[dataset_woe[independent].notna(), [dependent, independent]].groupby(by = independent)[dependent].agg(func = [('positives', lambda x : (x == 1).sum()), ('negatives', lambda x : (x == 0).sum())]) + + #重置索引 + woe.reset_index(inplace = True) + + woe.rename(columns = {independent: 'bin'}, inplace = True) + + #若特征变量包含缺失值,则将缺失值单独作为一箱 + if len(dataset_woe.loc[dataset_woe[independent].isna()]) > 0: + + #统计特征变量包含缺失值样本中阳性样本数和阴性样本数 + woe.loc[len(woe)] = {'bin': numpy.nan, 'positives': len(dataset_woe.loc[dataset_woe[independent].isna() & (dataset_woe[dependent] == 1), independent]), 'negatives': len(dataset_woe.loc[dataset_woe[independent].isna() & (dataset_woe[dependent] == 0), independent])} + + #统计样本数 + woe['samples'] = woe.apply(lambda x: x['positives'] + x['negatives'], axis = 'columns') + + #统计阳性样本数占比 + woe['proportion_positive'] = round(woe['positives'] / woe['positives'].sum(), 2) + + #统计阴性样本数占比 + woe['proportion_negative'] = round(woe['negatives'] / 
woe['negatives'].sum(), 2) + + #统计证据权重 + woe['woe'] = round(numpy.log((woe['proportion_positive'] + 0.01) / (woe['proportion_negative'] + 0.01)), 2) + + #统计信息价值 + woe['iv'] = round((woe['proportion_positive'] - woe['proportion_negative']) * woe['woe'], 2) + + #按照分箱是否包含缺失值划分 + woe_notna = woe.loc[woe['bin'].notna()].reset_index(drop = True) + + #单调性检验 + monotonicity = [((woe_notna.loc[i, 'woe'] <= woe_notna.loc[i + 1, 'woe']) & (woe_notna.loc[i - 1, 'woe'] <= woe_notna.loc[i, 'woe'])) | ((woe_notna.loc[i, 'woe'] >= woe_notna.loc[i + 1, 'woe']) & (woe_notna.loc[i - 1, 'woe'] >= woe_notna.loc[i, 'woe'])) for i in range(1, woe_notna.shape[0] - 1)] + + #若通过单调性检验,则将特征变量证据权重编码 + if False not in monotonicity: + + dataset_woe[independent].replace(woe['bin'].to_numpy(), woe['woe'].to_numpy(), inplace = True) + + woe['independent'] = independent + + dictionary = pandas.concat([dictionary, woe]) + + print('已完成') + print() + + break + + return dataset_woe, dictionary + +#若本脚本被调用报错 +if __name__ != '__main__': + + print('本脚本不允许被调用') + print() + + exit() + +print('1、连接数据库查表并保存至数据集...', end = '') + +dataset = MySQL(database = 'data_analysis').query('select * from credit_dataset') + +if isinstance(dataset, str): + + print('连接失败,请检查数据库连接是否正常') + print() + + exit() + +print('已完成') +print() + +#目标变量名,第一列即为目标变量 +dependent = dataset.columns[0] + +#检查目标变量值是否为0或1 +if not ((dataset[dependent] == 0) | (dataset[dependent] == 1)).all(): + + print('第一列应为目标变量且值应为0或1,脚本终止!') + print() + + exit() + +#统计样本数 +samples = dataset.shape[0] + +#特征变量名 +independents = dataset.columns[1: ] + +#统计特征变量数 +variables_independent = len(independents) + +print('数据集样本数为 %d 份,特征变量数为 %d 个。' % (samples, variables_independent)) +print() + +#考虑变量数较多转置并重命名 +Pandas2chart(dataset = dataset.loc[1:4, :].T.reset_index().rename(columns = {'index': '变量名', 1: '样本1', 2: '样本2', 3: '样本3', 4: '样本4'}), type = 'table', path = './reports/scorecard_report/dataset_preview.html') + +print('2、预处理') +print() + +print('2.1 清洗数据...', end = '') 
+ +#删除目标变量包含缺失值的样本 +dataset.dropna(subset = dependent, inplace = True) + +#删除重复样本(仅保留第一份) +dataset.drop_duplicates(inplace = True) + +print('已完成') +print() + +#统计样本数 +samples = dataset.shape[0] + +print('处理后,数据集样本数为 %d 份。' % samples) +print() + +print('2.2 处理缺失值...', end = '') + +print('在特征变量证据权重编码时,将对缺失值单独作为一箱,本节点略过') +print() + +print('2.3 处理异常值...', end = '') + +print('在特征变量证据权重编码时,可以消除异常值的影响,本节点略过') +print() + +print('2.4 特征变量最优分箱并证据权重编码') +print() + +dataset_woe, dictionary = OptimalEncodeByWOE(dataset) + +Pandas2chart(dataset = dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'samples', 'woe']].rename(columns = {'bin': '分箱', 'samples': '样本数', 'woe': '证据权重'}), type = 'bar+line', path = './reports/scorecard_report/dictionary.html') + +print('3、选择特征变量') +print('') + +#统计报告 +statistics = pandas.DataFrame(data = independents, columns = ['independent']) + +print('3.1 基于信息价值选择特征变量...', end = '') + +#变量特征变量 +for independent in independents: + + #统计特征变量信息价值 + statistics.loc[statistics['independent'] == independent, 'iv'] = dictionary.loc[dictionary['independent'] == independent, 'iv'].sum() + +#选择信息价值大于等于阈值的特征变量(0.1为低水平预测能力,0.3为中水平预测能力,本次建模选择0.1作为阈值) +statistics = statistics.loc[statistics['iv'] >= 0.1] + +independents = statistics['independent'].tolist() + +print('已完成') +print() + +#统计特征变量数 +variables_independent = len(independents) + +print('处理后,特征变量数为 %d 个。' % variables_independent) +print() + +print('3.2 基于后向淘汰条件选择特征变量(基于归回系数和方差扩大因子)') +print() + +parameters = { + + #测试 + 'l1_ratio': [0.5], + + #'l1_ratio': [0, 0.25, 0.5, 0.75, 1], + + #测试 + 'C': [1.1] + + #'C': [0.001, 0.01, 0.1, 1.1, 10.1, 100.1, 1000.1] +} + +#创建带交叉验证的参数网格搜索模型 +model = GridSearchCV(estimator = LogisticRegression(solver = 'saga', penalty = 'elasticnet', class_weight = 'balanced'), param_grid = parameters, scoring = 'roc_auc', refit = True) + +while True: + + model.fit(dataset_woe[independents].to_numpy(), dataset_woe[dependent].to_numpy()) + + #统计回归系数 + statistics['coefficient'] = 
model.best_estimator_.coef_[0, :]

    # Variance inflation factor per feature (a constant column is appended for
    # the intercept; its own VIF entry is then dropped by the [: -1] slice).
    statistics['vif'] = [variance_inflation_factor(dataset_woe[independents].assign(constant = 1).to_numpy(), i) for i in range(len(independents) + 1)][: -1]

    # Sort features by VIF, largest first
    statistics.sort_values(by = 'vif', ascending = False, inplace = True)

    independents = statistics['independent'].tolist()

    # Keep only features with regression coefficient >= 0.1 and VIF <= 10
    statistics = statistics.loc[(statistics['coefficient'] >= 0.1) & (statistics['vif'] <= 10)]

    # Features failing the filter above are elimination candidates
    obsolescence = [independent for independent in independents if independent not in statistics['independent'].tolist()]

    if obsolescence != []:

        # Eliminate the candidate with the largest VIF (list is VIF-descending,
        # so index 0 is the worst offender); then iterate again
        independents.remove(obsolescence[0])

        print('特征变量 %s 满足淘汰条件,继续后进' % obsolescence[0])
        print('')

    else:

        print('所有特征变量不满足淘汰条件,停止后进')
        print('')

        break

# Number of features surviving backward elimination
variables_independent = len(independents)

print('处理后,特征变量数为 %d 个。' % variables_independent)
print()

# False/true positive rates of the final model on the WOE-encoded dataset
fpr, tpr, thresholds = roc_curve(y_true = dataset_woe[dependent].to_numpy(), y_score = model.predict_proba(dataset_woe[independents].to_numpy())[:, 1])

# Kolmogorov-Smirnov statistic: maximum separation between TPR and FPR
ks = max(tpr - fpr)

print('基于选择后的特征变量构建逻辑回归模型,洛伦兹统计量(KS)为 %.2f 。(~0.2不建议使用,0.2~0.4模型区分能力较好,0.4~0.5良好,0.5~0.6很好,0.6~0.75非常好,0.75~ 区别能力存疑)' % ks)
print()

Pandas2chart(dataset = statistics.loc[:, ['independent', 'iv', 'vif', 'coefficient']].rename(columns = {'independent': '特征变量名', 'iv': '信息价值', 'vif': '方差扩大因子', 'coefficient': '回归系数'}), type = 'table', path = './reports/scorecard_report/statistics.html')

print('4、编制评分卡')
print('')

print('4.1 基于构建后的逻辑回归模型编制评分卡...', end = '')

dictionary = dictionary.loc[dictionary['independent'].isin(independents), ['independent', 'bin', 'woe']].reset_index(drop = True)

# Score formula: S = A + B * ln(Odds). Calibration: odds of 1 maps to 500
# points; doubling the odds subtracts 50 points.

# Offset A of the score formula
alpha = 500

# Factor B of the score formula (negative: higher odds of default -> lower score)
beta = -50 / numpy.log(2)

# Base score per unit of coefficient: the model intercept is first converted to
# points, then spread across features in proportion to their coefficients.
gamma = (alpha + beta * model.best_estimator_.intercept_[0]) / statistics['coefficient'].sum()

# For each surviving feature, compute per-bin scores and substitute the WOE
# values in the dataset with those scores
for independent in independents:

    coefficient = statistics.loc[statistics['independent'] == independent, 'coefficient'].iat[0]

    # Weighted base score of this feature
    dictionary.loc[dictionary['independent'] == independent, 'gamma'] = gamma * coefficient

    # Weighted regression coefficient of this feature
    dictionary.loc[dictionary['independent'] == independent, 'beta'] = beta * coefficient

    # Per-bin score: weighted base score plus weighted coefficient times bin WOE
    dictionary.loc[dictionary['independent'] == independent, 'score'] = dictionary.loc[dictionary['independent'] == independent, ['woe', 'gamma', 'beta']].apply(lambda x: round(x['gamma'] + x['beta'] * x['woe']), axis = 'columns')

    # NOTE(review): Series.replace(..., inplace = True) on a column of
    # dataset_woe may raise chained-assignment warnings on recent pandas —
    # verify against the pandas version pinned for this project.
    dataset_woe[independent].replace(dictionary.loc[dictionary['independent'] == independent, 'woe'].to_numpy(), dictionary.loc[dictionary['independent'] == independent, 'score'].to_numpy(), inplace = True)

# Total score = row-wise sum of the per-feature scores
dataset_woe['score'] = dataset_woe[independents].apply(lambda x: x.sum(), axis = 'columns')

print('已完成')
print()

Pandas2chart(dataset = dictionary.loc[dictionary['independent'] == 'Age', ['bin', 'woe', 'gamma', 'beta', 'score']].rename(columns = {'bin': '分箱', 'woe': '证据权重', 'gamma': '加权基础分数', 'beta': '加权回归系数', 'score': '分数'}), type = 'table', path = './reports/scorecard_report/dictionary_score.html')

# Bin total scores into fixed, left-closed intervals
dataset_woe['bin'] = pandas.cut(x = dataset_woe['score'].to_numpy(), bins = [0, 350, 400, 450, 500, 550, 600, 650, 1000], right = False)

# Group by score bin
score = dataset_woe.groupby(by = 'bin').agg(

    # Positive samples per bin
    positives = (dependent, lambda x: (x == 1).sum()),

    # Negative samples per bin
    negatives = (dependent, lambda x: (x == 0).sum()),

    # Samples per bin
    samples = (dependent, lambda x: x.count())

)

# Reset index so 'bin' becomes a regular column
score.reset_index(inplace = True)

# Rejection-rule label: reject applicants scoring below the bin's right edge
score['threshold'] = score['bin'].apply(lambda x: '<{}'.format(x.right))

# Positive rate within each bin (%)
score['proportion'] = round(score['positives'] / score['samples'] * 100, 2)

# Each bin's share of all samples (%)
score['proportion_sample'] = score['samples'] / score['samples'].sum() * 100

# Cumulative sample share (%)
score['accumulation_sample'] = round(score['proportion_sample'].cumsum(), 2)

# Cumulative positives
score['accumulation_positives'] = score['positives'].cumsum()

# Cumulative samples
score['accumulation_samples'] = score['samples'].cumsum()

# Cumulative positive rate: cumulative positives over cumulative samples (%)
score['proportion_positives'] = round(score['accumulation_positives'] / score['accumulation_samples'] * 100, 2)

# Each bin's share of all positives (%)
score['proportion_positive'] = score['positives'] / score['positives'].sum() * 100

# Cumulative positive share (%)
score['accumulation_positive'] = round(score['proportion_positive'].cumsum(), 2)

# Each bin's share of all negatives (%)
score['proportion_negative'] = score['negatives'] / score['negatives'].sum() * 100

# Cumulative negative share (%)
score['accumulation_negative'] = round(score['proportion_negative'].cumsum(), 2)

# Per-bin KS statistic
score['ks'] = round(abs(score['accumulation_positive'] - score['accumulation_negative']), 2)

# Scorecard KS statistic
ks = score['ks'].max()

# Per-bin lift statistic (+0.01 in both terms guards against division by zero)
score['lift'] = round((score['accumulation_positive'] + 0.01) / (score['accumulation_sample'] + 0.01), 2)

# Scorecard lift statistic
lift = score['lift'].max()

print('基于构建后的逻辑回归模型编制评分卡,柯斯统计量(KS)为 %.2f ,提升统计量(LIFT)为 %.2f 。' % (ks, lift))
print()

Pandas2chart(dataset = score[['bin', 'ks', 'lift']].rename(columns = {'bin': '分箱', 'ks': '柯斯统计量', 'lift': '提升统计量'}), type = 'line+line', path = './reports/scorecard_report/model_evaluation.html')

Pandas2chart(dataset = score[['bin', 'threshold', 'proportion', 'accumulation_sample', 'proportion_positives', 'accumulation_positive']].rename(columns = {'bin': '分箱', 'threshold': '拒绝规则', 'proportion': '分箱逾期率', 'accumulation_sample': '拒绝率', 'proportion_positives': '拒绝逾期率', 'accumulation_positive': '累计逾期率'}), type = 'table', path = './reports/scorecard_report/business_evaluation.html')

print('4.2 生成评分卡规则文件并保存...', end = '')

# Generated scoring function, assembled as Python source text
calculate = '''def Calculate(sample):\n\n\tscore = 0\n\n'''

# Emit one match-statement per feature into the generated scoring function
for independent in independents:

    calculate = calculate + '\tmatch sample["{}"]:\n\n'.format(independent)

    subset = dictionary.loc[dictionary['independent'] == independent].reset_index(drop = True)

    # If the last bin is the missing-value bin: open the second-to-last bin's
    # right edge and score missing values through the last bin.
    # NOTE(review): `is numpy.nan` tests object identity — confirm the 'bin'
    # column really holds the numpy.nan singleton (a float('nan') would fail).
    if subset.loc[subset.index[-1], 'bin'] is numpy.nan:

        for index in subset.index:

            # First bin: left edge left open
            if index == subset.index[0]:

                calculate += '\t\tcase x if x < {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'score'])

            # Second-to-last bin: right edge left open
            elif index == subset.index[-2]:

                calculate += '\t\tcase x if x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].left, subset.loc[index, 'score'])

            # Last bin: missing-value score
            elif index == subset.index[-1]:

                calculate += '\t\tcase numpy.nan: score += {}\n\n'.format(subset.loc[index, 'score'])

            else:

                calculate += '\t\tcase x if x < {} and x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'bin'].left, subset.loc[index, 'score'])

    else:

        for index in subset.index:

            # First bin: left edge left open
            if index == subset.index[0]:

                calculate += '\t\tcase x if x < {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'score'])

            # Last bin: right edge left open
            elif index == subset.index[-1]:

                calculate += '\t\tcase x if x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].left, subset.loc[index, 'score'])

            else:

                calculate += '\t\tcase x if x < {} and x >= {}: score += {}\n\n'.format(subset.loc[index, 'bin'].right, subset.loc[index, 'bin'].left, subset.loc[index, 'score'])

calculate += '\treturn score'

# Write the generated scoring rules to a local file.
# NOTE(review): 'scorecrad_calculate.txt' looks like a typo for 'scorecard' —
# confirm against the consumer of this file before renaming either side.
with open('../utils/scorecrad_calculate.txt', 'w') as file:

    file.write(calculate)

print('已完成')
print()

print('5、生成贷款申请评分卡报告...', end = '')

# Load the HTML report template
template = Environment(loader = FileSystemLoader('./reports/scorecard_report/')).get_template('template.html')

# Render the report with the headline statistics
scorecard_report = template.render(

    {

        # Report date.
        # NOTE(review): '%y' yields a two-digit year — confirm '%Y' was not intended.
        'report_date': time.strftime('%y-%m-%d', time.localtime()),

        'samples': samples,

        'variables_independent': variables_independent,

        'ks': ks,

        'lift': lift

    }

)

with open('./reports/scorecard_report/scorecard_report.html', 'w', encoding = 'utf8') as file:

    file.write(scorecard_report)

print('已完成')
print()