diff --git a/票据理赔自动化/abandoned.py b/票据理赔自动化/abandoned.py index c0d8688..7c25a5a 100644 --- a/票据理赔自动化/abandoned.py +++ b/票据理赔自动化/abandoned.py @@ -172,3 +172,23 @@ def general_text_recognize(image) -> str: {"desc": "当前页数", "value": ""}, ], } + +""" + + + + + + + +with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file: + file.write( + template.render( + { + "dossier": dossier, + } + ) + ) + + +""" diff --git a/票据理赔自动化/database.db b/票据理赔自动化/database.db index df642b9..e2c0ee7 100644 Binary files a/票据理赔自动化/database.db and b/票据理赔自动化/database.db differ diff --git a/票据理赔自动化/main.py b/票据理赔自动化/main.py index a4edb8e..8421ec6 100644 --- a/票据理赔自动化/main.py +++ b/票据理赔自动化/main.py @@ -92,9 +92,9 @@ if __name__ == "__main__": --与主被保险人关系,包括本人和附属(附属包括配偶、父母和子女等) relationship TEXT NOT NULL, --保险起期(取个单和团单起期最大值) - commencement_date TEXT NOT NULL, + commence_date TEXT NOT NULL, --保险止期(取个单和团单止期最小值) - termination_date TEXT NOT NULL, + terminate_date TEXT NOT NULL, --联合主键(被保险人+证件类型+证件号码+保险分公司) PRIMARY KEY (insured_person, identity_type, identity_number, insurer_company) @@ -164,13 +164,15 @@ if __name__ == "__main__": insured_person: str, identity_type: str, identity_number: str, + report_date: str, ) -> Optional[List[Dict[str, Any]]]: """ - 根据保险分公司、被保险人、证件类型和证件号码查询被保险人(备注,若夫妻同在投保公司则互为附加被保险人,一方被保险人记录包括本人和配偶两条) + 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询被保险人(备注,若夫妻同在投保公司则互为附加被保险人,一方被保险人记录包括本人和配偶两条) :param insurer_company: 保险分公司 :param insured_person: 被保险人 :param identity_type: 证件类型 :param identity_number: 证件号码 + :param report_date: 报案时间 :return: 被保险人列表,包括被被保险人、个单号、主被保险人、与主被保险人关系、保险起期和保险止期 """ # noinspection PyBroadException @@ -184,19 +186,21 @@ if __name__ == "__main__": master_insured_person AS "主被保险人", insured_person AS "被保险人", relationship AS "与主被保险人关系", - commencement_date AS "保险起期", - termination_date AS "保险止期" + commence_date AS "保险起期", + terminate_date AS "保险止期" FROM insured_persons WHERE insurer_company = ? AND insured_person = ? AND identity_type = ? AND identity_number = ? + AND ? BETWEEN commence_date AND terminate_date """, parameters=( insurer_company, insured_person, identity_type, identity_number, + report_date, ), ) if result: @@ -224,7 +228,7 @@ if __name__ == "__main__": content: str, ) -> Optional[str]: """ - 根据明细项名称中具体内容查询药品/医疗服务 + 根据明细项中具体内容查询药品/医疗服务 :param content: 明细项具体内容 :return: 药品/医疗服务 """ @@ -246,9 +250,9 @@ if __name__ == "__main__": "medicine" ] # 返回药品最大长度的药品 raise - # TODO: 若根据明细项名称中具体内容查询药品/医疗服务发生异常则流转至主数据人工处理 + # TODO: 若根据明细项中具体内容查询药品/医疗服务发生异常则流转至主数据人工处理 except Exception: - raise RuntimeError("根据明细项名称中具体内容查询药品/医疗服务发生异常") + raise RuntimeError("根据明细项中具体内容查询药品/医疗服务发生异常") # 实例化主数据 master_data = MasterData() @@ -544,7 +548,7 @@ if __name__ == "__main__": # noinspection PyTypeChecker dossier["出险人层"].update( { - "姓名": (insured_person := response["data"]["name"]), + "出险人": (insured_person := response["data"]["name"]), "证件类型": (identity_type := "居民身份证"), "证件号码": (indentity_number := response["data"]["idNo"]), "性别": response["data"]["sex"], @@ -567,12 +571,13 @@ if __name__ == "__main__": } ) - # 查询并获取多条被保险人记录 + # 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询被保险人 dossier["被保险人层"] = master_data.query_insured_persons( insurer_company, insured_person, # 出险人和被保险人为同一人,视角不同:出险人为理赔,被保险人为承保/保全 identity_type, indentity_number, + dossier["报案层"]["报案时间"].strftime("%Y-%m-%d"), ) # noinspection PyShadowingNames @@ -764,15 +769,18 @@ if __name__ == "__main__": raise RuntimeError( "就中银保险有限公司的理赔申请书识别并整合至赔案档案发生异常" ) - - dossier["受益人层"].update( + dossier["出险人层"].update( { - "开户行": recognition["开户银行"], - "户名": recognition["户名"], - "户号": recognition["账号"], "手机号": recognition["手机"], } ) + dossier["领款人层"].update( + { + "领款人": recognition["户名"], + "银行": recognition["开户银行"], + "账号": recognition["账号"], + } + ) # 根据保险分公司匹配结构化识别文本方法 match insurer_company: @@ -790,15 +798,14 @@ if __name__ == "__main__": """ # noinspection PyShadowingNames - def query_value(contents: list, key: str) -> Optional[str]: + def fuzzy_match(contents: list, key: str) -> Optional[str]: """ - 就识别结果,根据指定键名查询值 - :param contents: 识别结果 - :param key: 指定键名 + 根据指定内容列表(基于深圳快瞳增值税发票和医疗收费票据识别结果)模糊匹配键名并获取值 + :param contents: 内容列表 + :param key: 键名 :return 值 - 需要匹配的键名的键值 """ - # 若识别结果为空列表则返回None + # 若内容列表为空值则返回None if not contents: return None @@ -806,32 +813,29 @@ if __name__ == "__main__": match contents[0].keys(): # 对应深圳快瞳增值税发票识别结果 case _ if "desc" in contents[0].keys(): - # 遍历识别结果,若内容的键名为指定键名则返回值 for content in contents: if content["desc"] == key: return content["value"] if content["value"] else None candidates = [] - # 基于加权补偿的莱文斯坦距离算法计算所有内容的键名和指定键名的相似度 for content in contents: candidates.append( ( content["value"], fuzz.WRatio( content["desc"], key, force_ascii=False - ), + ), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度 ) ) - # 返回最大相似度的值 return ( (result[0] if result[0] else None) if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 else None - ) + ) # 返回>=80且最大的相似度的值 + # 对应深圳快瞳医疗收费票据识别结果 case _ if "name" in contents[0].keys(): - # 遍历识别结果,若内容的键名为指定键名则返回值 for content in contents: if content["name"] == key: return ( @@ -847,21 +851,20 @@ if __name__ == "__main__": content["word"]["value"], fuzz.WRatio( content["name"], key, force_ascii=False - ), # 基于加权补偿的莱文斯坦距离算法计算所有内容的键名和指定键名的相似度 + ), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度 ) ) - # 返回最大相似度的值 return ( (result[0] if result[0] else None) if (result := max(candidates, key=lambda x: x[1]))[1] >= 80 else None - ) + ) # 返回>=80且最大的相似度的值 - def parse_name(name: str) -> Tuple[str, Optional[str]]: + def parse_item(name: str) -> Tuple[str, Optional[str]]: """ - 根据明细项名称解析明细项类别和具体内容,并根据明细项名称中具体内容查询药品/医疗服务 - :param name: 明细项名称 + 根据明细项解析明细项类别和具体内容,并具体内容查询药品/医疗服务 + :param name: 明细项 return 明细项类别和药品/医疗服务 """ if match := re.match( @@ -928,7 +931,7 @@ if __name__ == "__main__": "购药及就医机构": response["data"]["details"]["seller"], "明细项": [ { - "名称": item["name"], + "明细项": item["name"], "数量": ( Decimal(item["quantity"]).quantize( Decimal("0.00"), @@ -1002,7 +1005,7 @@ if __name__ == "__main__": ], "明细项": [ { - "名称": item["itemName"], + "明细项": item["itemName"], "数量": Decimal(item["number"]).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, @@ -1080,41 +1083,41 @@ if __name__ == "__main__": ): raise RuntimeError("请求深圳快瞳增值税发票识别接口发生异常") - match query_value(response["data"], "发票类型"): + match fuzzy_match(response["data"], "发票类型"): case "电子发票(普通发票)": # noinspection PyTypeChecker receipt.update( { - "票据号": query_value( + "票据号": fuzzy_match( response["data"], "发票号码" ), - "票据代码": query_value( + "票据代码": fuzzy_match( response["data"], "发票代码" ), "开票日期": datetime.strptime( - query_value(response["data"], "开票日期"), + fuzzy_match(response["data"], "开票日期"), "%Y年%m月%d日", ), - "校验码": query_value( + "校验码": fuzzy_match( response["data"], "校验码" ), "开票金额": Decimal( - query_value( + fuzzy_match( response["data"], "小写金额" ).replace("¥", "") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), - "姓名": query_value( + "姓名": fuzzy_match( response["data"], "购买方名称" ), - "购药及就医机构": query_value( + "购药及就医机构": fuzzy_match( response["data"], "销售方名称" ), "明细项": [ { - "名称": name, + "明细项": name, "数量": Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, @@ -1161,43 +1164,43 @@ if __name__ == "__main__": ], ) ], - "备注": query_value(response["data"], "备注"), + "备注": fuzzy_match(response["data"], "备注"), } ) case "增值税普通发票(卷票)": # noinspection PyTypeChecker receipt.update( { - "票据号": query_value( + "票据号": fuzzy_match( response["data"], "发票号码" ), - "票据代码": query_value( + "票据代码": fuzzy_match( response["data"], "发票代码" ), "开票日期": datetime.strptime( - query_value(response["data"], "开票日期"), + fuzzy_match(response["data"], "开票日期"), "%Y-%m-%d", ), - "校验码": query_value( + "校验码": fuzzy_match( response["data"], "校验码" ), "开票金额": Decimal( - query_value( + fuzzy_match( response["data"], "合计金额(小写)" ).replace("¥", "") ).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, ), - "姓名": query_value( + "姓名": fuzzy_match( response["data"], "购买方名称" ), - "购药及就医机构": query_value( + "购药及就医机构": fuzzy_match( response["data"], "销售方名称" ), "明细项": [ { - "名称": name, + "明细项": name, "数量": Decimal(quantity).quantize( Decimal("0.00"), rounding=ROUND_HALF_UP, @@ -1234,7 +1237,7 @@ if __name__ == "__main__": ], ) ], - "备注": query_value(response["data"], "备注"), + "备注": fuzzy_match(response["data"], "备注"), } ) case "医疗门诊收费票据" | "医疗住院收费票据": @@ -1300,7 +1303,7 @@ if __name__ == "__main__": if isinstance(receipt["endtime"], dict) else None ), - "校验码": query_value( + "校验码": fuzzy_match( receipt["global_detail"]["region_specific"], "校验码", ), @@ -1314,7 +1317,7 @@ if __name__ == "__main__": "购药及就医机构": receipt["hospital_name"]["value"], "明细项": [ { - "名称": ( + "明细项": ( item["item_name"]["value"] if isinstance(item["item_name"], dict) else None @@ -1383,7 +1386,7 @@ if __name__ == "__main__": case ("增值税发票", "药店"): items = ( pandas.DataFrame(receipt["明细项"]) - .groupby("名称") # 就相同明细项名称合并数量和金额 + .groupby("明细项") # 就相同明细项合并数量和金额 .agg(数量=("数量", "sum"), 金额=("金额", "sum")) .loc[ lambda dataframe: dataframe["金额"] != 0 @@ -1391,10 +1394,10 @@ if __name__ == "__main__": .reset_index() .pipe( lambda dataframe: dataframe.join( - dataframe["名称"] + dataframe["明细项"] .apply( - parse_name - ) # 根据明细项名称解析明细项类别和具体内容,并根据明细项名称中具体内容查询药品/医疗服务 + parse_item + ) # 根据明细项解析明细项类别和具体内容,并根据具体内容查询药品/医疗服务 .apply( pandas.Series ) # 就明细项类别和药品/医疗服务元组展开为两列 @@ -1423,14 +1426,15 @@ if __name__ == "__main__": receipt.update( { - "起期": receipt["开票日期"], - "止期": receipt["开票日期"], + "事故起期": receipt["开票日期"], + "事故止期": receipt["开票日期"], "姓名": ( - dossier["出险人层"]["姓名"] - if dossier["出险人层"]["姓名"] in receipt["姓名"] + dossier["出险人层"]["出险人"] + if dossier["出险人层"]["出险人"] in receipt["姓名"] else receipt["姓名"] ), "购药及就医类型": "药店购药", + "事故诊断": "购药拟诊", "个人自费": Decimal("0.00"), "个人自付": Decimal("0.00"), "医保支付": Decimal("0.00"), @@ -1495,12 +1499,17 @@ if __name__ == "__main__": ): raise RuntimeError("请求深圳快瞳银行卡识别接口发生异常或非借记卡") # noinspection PyTypeChecker - dossier["受益人层"].update( + dossier["出险人层"].update( { - "开户行": response["data"]["bankInfo"], - "户名": "", - "户号": response["data"]["cardNo"].replace(" ", ""), - "手机号": "", + "手机号": None, + } + ) + # noinspection PyTypeChecker + dossier["领款人层"].update( + { + "领款人": None, + "银行": response["data"]["bankInfo"], + "账号": response["data"]["cardNo"].replace(" ", ""), } ) @@ -1541,9 +1550,6 @@ if __name__ == "__main__": # 初始化赔案档案(保险公司将提供投保公司、保险分公司和报案时间等,TPA作业系统签收后生成赔案号) dossier = { "报案层": { - "投保公司": ( - insurance_company := "中国银行股份有限公司昆山分行" - ), # 指定投保公司 "保险分公司": ( insurer_company := "中银保险有限公司苏州分公司" ), # 指定保险分公司 @@ -1553,7 +1559,7 @@ if __name__ == "__main__": "影像件层": [], "出险人层": {}, "被保险人层": [], - "受益人层": {}, + "领款人层": {}, "票据层": [], } @@ -1595,311 +1601,40 @@ if __name__ == "__main__": dossier["影像件层"].append(image) - # 就影像件按照影像件类型排序 - dossier["影像件层"].sort( - key=lambda x: [ - "居民户口簿", - "居民身份证(国徽面)", - "居民身份证(头像面)", - "居民身份证(国徽、头像面)", - "中国港澳台地区及境外护照", - "理赔申请书", - "增值税发票", - "医疗门诊收费票据", - "医疗住院收费票据", - "医疗费用清单", - "银行卡", - "其它", - ].index(x["影像件类型"]) - ) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保险人信息 + # 就影像件按照影像件类型排序 + dossier["影像件层"].sort( + key=lambda x: [ + "居民户口簿", + "居民身份证(国徽面)", + "居民身份证(头像面)", + "居民身份证(国徽、头像面)", + "中国港澳台地区及境外护照", + "理赔申请书", + "增值税发票", + "医疗门诊收费票据", + "医疗住院收费票据", + "医疗费用清单", + "银行卡", + "其它", + ].index(x["影像件类型"]) + ) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保险人信息 - # 遍历影像件层中影像件 - for image in dossier["影像件层"]: - # 影像件识别并整合至赔案档案 - image_recognize( - image, - insurer_company, - ) - - for receipt in dossier["票据层"]: - print(receipt) - - print(dossier["被保险人层"]) - -""" - - - - - - - - case "增值税发票" | "门诊收费票据" | "住院收费票据": - extraction = invoice_extraction() - # 若发生异常则跳过该影像件 - if extraction is None: - dossier["影像件层"][-1]["已识别"] = "否,无法识别" - continue - - dossier["发票层"].append( - { - "关联影像件序号": image_index, - "票据类型": extraction["票据类型"], - "票据号码": extraction["票据号码"], - "票据代码": ( - extraction["票据代码"] - if extraction["票据代码"] - else "--" - ), # 数电票无票据代码,校验码同票据号码 - "开票日期": datetime.strptime( - extraction["开票日期"], "%Y-%m-%d" - ), - "校验码后六位": ( - check_code[-6:] - if (check_code := extraction["校验码"]) - else "--" - ), - "医药机构": extraction["收款方"], - "就诊人": ( - match.group("name") - if ( - match := re.search( - r"^(?P[^((]+)", extraction["付款方"] - ) - ) - else extraction["付款方"] - ), - "票据金额": Decimal(extraction["票据金额"]).quantize( - Decimal("0.00"), rounding=ROUND_HALF_UP - ), # 默认金额转为小数,保留两位小数 - "查验状态": extraction["查验状态"], - "项目": ( - pandas.DataFrame(extraction["项目"]) - .assign( - 数量=lambda dataframe: dataframe["数量"].apply( - lambda row: ( - Decimal(row).quantize( - Decimal("0.00"), - rounding=ROUND_HALF_UP, - ) - if row - else Decimal("0.00") - ) - ), - 金额=lambda dataframe: dataframe["金额"].apply( - lambda row: ( - Decimal(row).quantize( - Decimal("0.00"), - rounding=ROUND_HALF_UP, - ) - if row - else Decimal("0.00") - ) - ), - ) - .groupby(by="名称", as_index=False) - .agg(数量=("数量", "sum"), 金额=("金额", "sum")) - .assign( - 大项=lambda dataframe: dataframe["名称"].apply( - lambda row: ( - match.group("category") - if ( - match := re.match( - r"^\*(?P.+?)\*.*$", - row, - ) - ) - else row - ) - ), - 小项=lambda dataframe: dataframe["名称"].apply( - lambda row: ( - re.sub( - r"[^\u4e00-\u9fa5a-zA-Z0-9./%*]", - "", - match.group("name"), - ) - if ( - match := re.match( - r"^\*.+?\*(?:\[[^]]+])?(?P[^\s(]+)(?:\([^\s(]+\))?(?:.*?)?$", - row, - ) - ) - else "" - ) - ), - ) - .loc[ - lambda dataframe: dataframe["金额"] != 0, - ["名称", "大项", "小项", "数量", "金额"], - ] - .to_dict(orient="records") - ), - "就诊类型": ( - "药店购药" - if "增值税发票" in image_type - else ( - "门诊就诊" - if "门诊收费票据" in image_type - else "住院治疗" - ) - ), - } - ) - - case "理赔申请书": - # 根据保险总公司匹配理赔申请书 - # noinspection PyUnreachableCode - match insurer: - case "中银保险有限公司": - extraction = common_extraction( - application_form="中行员工福利保障计划索赔申请书" - ) - # 若识别异常则跳过该影像件 - if extraction is None: - dossier["影像件层"][-1]["已识别"] = "否,无法识别" - continue - - dossier["赔案层"]["申请人信息"].update( - { - "与被保险人关系": "本人", # 中银保险有限公司:默认申请人与被保险人关系为本人 - "年龄": ( - Decimal(age).quantize( - Decimal("0"), - rounding=ROUND_HALF_UP, - ) - if ( - age := extraction.get("基础信息", {}).get( - "年龄", "--" - ) - ).isdigit() - else age - ), # 若年龄仅数字则转为小数、取整,否则默认为“--” - "手机号": ( - phone_number - if re.match( - r"^1[3-9]\d{9}$", - phone_number := extraction.get( - "基础信息", {} - ).get("手机", "--"), - ) - else phone_number - ), # 若手机未正则匹配手机号格式则为“--” - } - ) - - dossier["赔案层"]["受益人信息"].update( - { - "与被保险人关系": "本人", # 中银保险有限公司:默认受益人与被保人关系为本人 - "户名": ( - account_name - if ( - account_name := extraction.get( - "基础信息", {} - ).get("户名") - ) - else "--" - ), # 若户名为NONE则为“--” - "开户银行": ( - account_name - if ( - account_name := extraction.get( - "基础信息", {} - ).get("开户银行") - ) - else "--" - ), # 若开户银行为NONE则为“--” - "银行账号": ( - account_name - if ( - account_name := extraction.get( - "基础信息", {} - ).get("账号") - ) - is not None - else "--" - ), # 若银行账号为NONE则为“--” - } - ) - - dossier["赔案层"]["其它信息"]["自述症状"] = ( - ("、".join(diagnoses)) - if ( - diagnoses := sorted( - set( - "、".join( - [ - diagnosis - for invoice in extraction.get( - "票据表格", [] - ) - if ( - diagnosis := invoice.get("诊断") - ) - ] - ).split("、") - ) - ) - ) - else "--" - ) - - case _: - dossier["影像件层"][-1]["已识别"] = "否,无法识别" - continue - - case _: - dossier["影像件层"][-1]["已识别"] = "否,无法识别" - continue - - dossier["影像件层"][-1].update( - { - "已识别": "是", - "识别结果": extraction, - } + # 遍历影像件层中影像件 + for image in dossier["影像件层"]: + # 影像件识别并整合至赔案档案 + image_recognize( + image, + insurer_company, ) + # 就票据层按照事故止期和票据号顺序排序 + dossier["票据层"].sort(key=lambda x: (x["事故止期"], x["票据号"])) - # 发票层根据开票日期顺序排序 - dossier["发票层"] = sorted( - dossier["发票层"], key=lambda x: (x["开票日期"], x["票据号码"]) - ) + # 就 - # 构建小项层 - # noinspection PyTypeChecker - dossier["小项层"] = ( - pandas.DataFrame( - [ - { - "小项": item["小项"], - "数量": item["数量"], - "金额": item["金额"], - } - for invoice in dossier["发票层"] - for item in invoice["项目"] - ] - ) - .groupby(by="小项", as_index=False) - .agg(数量=("数量", "sum"), 金额=("金额", "sum")) - .to_dict(orient="records") - ) + for receipt in dossier["票据层"]: + print(receipt) - for invoice in dossier["发票层"]: - # noinspection PyTypeChecker - invoice["推定疾病"] = disease_diagnosis( - items="、".join(sorted(set([item["小项"] for item in invoice["项目"]]))) - ) - - print(dossier) - exit() - - with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file: - file.write( - template.render( - { - "dossier": dossier, - } - ) - ) - - -""" + print(dossier["被保险人层"]) + print(dossier["出险人层"]) + print(dossier["领款人层"]) + print(dossier["报案层"]) diff --git a/营销短视频生成自动化/main.py b/营销短视频生成自动化/main.py index 51328c1..feb8749 100644 --- a/营销短视频生成自动化/main.py +++ b/营销短视频生成自动化/main.py @@ -2,241 +2,289 @@ """ 营销短视频生成自动化 -功能清单 -1、打开并读取任务 -2、按照初始化剪映草稿、分别添加视频轨、音频轨、字幕轨和图片轨等、生成视频执行任务 """ -import asyncio + from pathlib import Path - -import edge_tts - -task = { - "video_path": "", # 视频路径 - "video_cover_path": "", # 视频首帧图片路径,可空 - "pictures": [ - { - "position": (0.0, 0.0), # 图片左上角X和Y位置,于视频宽度百分比 - "size_ratio": "", # 图片尺寸比例,于视频百分比 - "picture_path": "", # 图片尺寸比例,于视频百分比 - }, # 图片 - ], - "texts": [ - { - "content": "", # 文字内容 - "start": "", # 文字显示开始时间 - "duration": "", # 文字展示时长,若为空值则默认为视频播放时长 - "background_position": (0.0, 0.0), # 背景左上角X和Y位置 - "background_color": "", # 背景颜色 - "background_opacity": "", # 背景透明度 - "stroke_color": "", # 边框颜色 - "stroke_width": "", # 边框线宽 - "font_size": "", # 字体大小 - "font_color": "", # 字体颜色 - }, - ], -} - -from pycapcut import DraftFolder - -import sys - - -# 指定剪映草稿文件夹地址 -folder_path = { - "win32": r"C:\Users\admin\AppData\Local\JianyingPro\User Data\Projects\com.lveditor.draft".replace( - "\\", "/" - ), # 若当前系统为windows则将剪映草稿文件夹原始字符串(避免转义问题)中反斜杠"\"转为正斜杠“/” - "darwin": None, -}.get(sys.platform) - -if not folder_path: - raise RuntimeError("未指定剪映草稿文件夹地址") - -# 初始化剪映草稿文件夹 -folder = DraftFolder(folder_path) - -print(folder) - -exit() - - -async def audio_gen( - text: str, output_path: Path, voice_name: str = "zh-CN-XiaoxiaoNeural" -): - # 可用的中文语音列表 - # chinese_voices = { - # "女声-晓晓": "zh-CN-XiaoxiaoNeural", # 年轻女声,自然 - # "女声-晓辰": "zh-CN-XiaochenNeural", # 年轻女声,温暖 - # "女声-晓墨": "zh-CN-XiaomoNeural", # 女声,富有表现力 - # "男声-晓悠": "zh-CN-XiaoyouNeural", # 儿童音,可爱 - # "男声-晓涵": "zh-CN-XiaohanNeural", # 年轻男声,温暖 - # "男声-晓睿": "zh-CN-XiaoruiNeural", # 成熟男声,沉稳 - # "女声-晓倩": "zh-CN-XiaoqianNeural", # 女声,成熟 - # "男声-晓东": "zh-CN-XiaodongNeural", # 男声,权威 - # } - - # 生成语音 - communicate = edge_tts.Communicate( - text, voice_name, rate="+0%", pitch="+0Hz", volume="+0%" - ) - - await communicate.save(output_path) - print(f"语音已保存到: {output_path}") - - -# ---- +from typing import Optional, Tuple, Union import pycapcut as capcut -from pycapcut import KeyframeProperty, SEC -from pycapcut import FontType, TextStyle, ClipSettings -from typing import List -import srt +from pycapcut import tim, trange -async def video_merge_audio_and_caption( - video_path: Path, - audioPathList: List[Path], - image_path: Path, - subtitleList: List[srt.Subtitle], - output_path: Path, -): - PROJECT_DIR = "C:\\Users\\z4938\\AppData\\Local\\JianyingPro\\User Data\\Projects\\com.lveditor.draft" +class Draft: + """剪映脚本生成器类""" - draft_folder = capcut.DraftFolder(PROJECT_DIR) + # noinspection PyShadowingNames + def __init__( + self, + draft_name: str, + drafts_path: str = r"C:\Users\admin\AppData\Local\JianyingPro\User Data\Projects\com.lveditor.draft", + stocks_path: Path = Path(__file__).parent / "materials", + video_width: int = 1920, + video_height: int = 1080, + ): + """ + 初始化剪映草稿生成器 + :param drafts_path: 草稿文件夹路径 + :param stocks_path: 素材文件夹路径 + :param draft_name: 草稿名称 + :param video_width: 视频宽度 + :param video_height: 视频高度 + """ + # 基础配置 + self.drafts_path = drafts_path + self.stocks_path = stocks_path + self.draft_name = draft_name + self.video_width, self.video_height = video_width, video_height - script = draft_folder.create_draft("新草稿", 1920, 1080) - print("成功创建 draft(剪映项目草稿)") + # 尝试创建草稿文件夹和草稿 + # noinspection PyBroadException + try: + self.draft_folder = capcut.DraftFolder(self.drafts_path) + self.draft = self.draft_folder.create_draft( + self.draft_name, self.video_width, self.video_height, allow_replace=True + ) - # 添加视频轨 - video_track_id = 1 - script.add_track( - capcut.TrackType.video, track_name="视频", relative_index=video_track_id - ) - video_mat = capcut.VideoMaterial(video_path) - video_seg = capcut.VideoSegment(video_mat, capcut.Timerange(0, video_mat.duration)) - video_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration - SEC, 1.0) - video_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration, 0.0) - video_seg.volume = 0 - script.add_segment(video_seg, "视频") - print( - f"视频素材导入成功,素材ID:{video_seg.material_id}, 片段id:{video_seg.segment_id}" + # 添加基础轨道:音频、视频、文本(图片/贴纸复用视频轨) + self.draft.add_track(capcut.TrackType.audio) + self.draft.add_track(capcut.TrackType.video) + self.draft.add_track(capcut.TrackType.text) + + except: + raise RuntimeError("创建草稿文件夹和草稿发生异常") + + # 检查素材文件夹是否存在,若不存在则抛出异常 + if not self.stocks_path.exists(): + raise FileNotFoundError(f"素材文件夹不存在") + + def _check_path(self, file_name: str) -> str: + """ + 检查文件是否存在,若存在则返回文件路径 + :param file_name: 文件名称 + :return 文件路径 + """ + file_path = self.stocks_path / file_name + if not file_path.exists(): + raise FileNotFoundError(f"素材文件不存在") + return file_path.as_posix() + + def add_audio( + self, + file_name: str, + start_time: str, + duration: str, + volume: float = 1.0, + fade_in: str = "0s", + fade_out: str = "0s", + ) -> capcut.AudioSegment: + """ + 添加音频片段 + :param file_name: 音频文件名称 + :param start_time: 音频在轨道上的开始时间(如 "0s") + :param duration: 音频持续时长(如 "1s") + :param volume: 音量(如 1.0) + :param fade_in: 淡入时长(如 “0s”) + :param fade_out: 淡出时长(如 “0s”) + Returns: capcut.AudioSegment + """ + try: + # 创建音频片段 + audio_segment = capcut.AudioSegment( + self._check_path(file_name), trange(start_time, duration), volume=volume + ) + # 添加淡入淡出效果 + audio_segment.add_fade(fade_in, fade_out) + + self.draft.add_segment(audio_segment) + return audio_segment + except Exception: + raise RuntimeError("添加音频片段发生异常") + + def add_video( + self, + file_name: str, + start_time: str, + duration: str, + animation: Optional[capcut.IntroType] = None, + transition: Optional[capcut.TransitionType] = None, + keyframes: Optional[list] = None, + ) -> capcut.VideoSegment: + """ + 添加视频片段 + :param file_name: 视频文件名称 + :param start_time: 视频在轨道上的开始时间(如 "0s") + :param duration: 视频持续时长(如 "1s") + :param animation: 动画配置 + :param transition: 转场配置 + :param keyframes: 关键帧配置 + :return capcut.VideoSegment + """ + try: + # 创建视频片段 + video_segment = capcut.VideoSegment( + self._check_path(file_name), trange(start_time, duration) + ) + # 添加动画 + if animation: + video_segment.add_animation(animation) + + # 添加转场 + if transition: + video_segment.add_transition(transition) + + # 添加关键帧 + if keyframes: + # noinspection PyShadowingBuiltins + for property, time, value in keyframes: + video_segment.add_keyframe(property, time, value) + + self.draft.add_segment(video_segment) + return video_segment + except Exception: + raise RuntimeError("添加视频片段发生异常") + + def add_image( + self, + file_name: str, + start_time: Union[str, tim], + duration: Optional[str] = None, + background_filling: Tuple[str, float] = ("blur", 0.0625), + ) -> capcut.VideoSegment: + """ + 添加图片/贴纸 + :param file_name: 图片文件名称 + :param start_time: 图片在轨道上的开始时间(如 "0s") + :param duration: 图片持续时长(如 "1s") + :param background_filling: 背景填充配置 + :return: capcut.VideoSegment + """ + try: + # 创建图片素材 + image_material = capcut.VideoMaterial(self._check_path(file_name)) + # 创建图片片段 + image_segment = capcut.VideoSegment( + image_material, + trange(start_time, duration if duration else image_material.duration), + ) # 若已设置图片持续时长则使用,否则使用视频持续时长 + + # 添加背景填充 + if background_filling: + image_segment.add_background_filling(*background_filling) + + self.draft.add_segment(image_segment) + return image_segment + except Exception: + raise RuntimeError("添加图片/贴纸发生异常") + + def add_text( + self, + content: str, + target_timerange: trange, + font: capcut.FontType = capcut.FontType.悠然体, + color: Tuple[float, float, float] = (1.0, 1.0, 0.0), + position_y: float = -0.8, + outro_animation: Optional[capcut.TextOutro] = capcut.TextOutro.故障闪动, + anim_duration: tim = tim("1s"), + bubble_id: Optional[str] = "7446997603268496646", + effect_id: Optional[str] = "7336825073334078725", + ) -> capcut.TextSegment: + """ + 添加文字片段 + + Args: + content: 文字内容 + target_timerange: 文字显示的时间范围(通常对齐视频片段) + font: 字体类型 + color: 文字颜色(RGB,0-1) + position_y: 文字Y轴位置(-0.8为屏幕下方) + outro_animation: 出场动画(None则不添加) + anim_duration: 动画时长 + bubble_id: 气泡效果ID(None则不添加) + effect_id: 花字效果ID(None则不添加) + + Returns: + capcut.TextSegment: 创建的文字片段对象 + """ + try: + # 创建文字片段 + text_segment = capcut.TextSegment( + content, + target_timerange, + font=font, + style=capcut.TextStyle(color=color), + clip_settings=capcut.ClipSettings(transform_y=position_y), + ) + + # 添加出场动画 + if outro_animation: + text_segment.add_animation(outro_animation, duration=anim_duration) + + # 添加气泡 + if bubble_id: + text_segment.add_bubble(bubble_id, bubble_id) + + # 添加花字 + if effect_id: + text_segment.add_effect(effect_id) + + self.draft.add_segment(text_segment) + return text_segment + except Exception: + raise RuntimeError("添加文字片段发生异常") + + def save(self) -> None: + """保存草稿""" + try: + self.draft.save() + print("草稿保存成功") + except Exception: + raise RuntimeError(f"保存草稿发生异常") + + +# ======================== 调用示例(使用抽象后的方法) ======================== +def main(): + """生成脚本""" + # 实例化 + draft = Draft( + draft_name="demo2", ) - # 添加logo图片轨 - logo_track_id = 2 - script.add_track( - capcut.TrackType.video, track_name="logo图片", relative_index=logo_track_id - ) - logo_mat = capcut.VideoMaterial(image_path) - logo_seg = capcut.VideoSegment(logo_mat, capcut.Timerange(0, video_mat.duration)) - logo_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration - SEC, 1.0) - logo_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration, 0.0) - script.add_segment(logo_seg, "logo图片") - print(f"logo导入成功,视频轨道id:{logo_track_id}") - - # 添加音频轨 - script.add_track(capcut.TrackType.audio, track_name="音频", relative_index=3) - for audio_path in audioPathList: - audio_mat = capcut.AudioMaterial(audio_path) - audio_seg = capcut.AudioSegment( - audio_mat, - capcut.Timerange(0, audio_mat.duration), - source_timerange=capcut.Timerange(0, audio_mat.duration), - ) - script.add_segment(audio_seg, "音频") - print( - f"音频素材导入成功,素材ID:{audio_seg.material_id}, 片段id:{audio_seg.segment_id}" + # 添加音频 + draft.add_audio( + file_name="audio.mp3", + start_time="0s", + duration="5s", + volume=0.6, ) - # 添加字幕轨 - script.add_track(capcut.TrackType.text, track_name="字幕", relative_index=4) - - for subtitle in subtitleList: - text_seg = capcut.TextSegment( - subtitle.content, - capcut.Timerange( - subtitle.start.total_seconds(), subtitle.end.total_seconds() - ), - font=FontType.下午茶, - style=TextStyle( - size=5.0, - color=(0.7, 0.7, 1.0), - auto_wrapping=True, - underline=True, - align=1, - ), - clip_settings=ClipSettings(transform_y=-0.8), - ) - script.add_segment(text_seg, "字幕") - print("字幕添加成功") - - export_result = script.export( - output_path=output_path, resolution="1080p", fps=30, quality="high" + # 添加视频 + video_segment = draft.add_video( + file_name="video.mp4", + start_time="0s", + duration="4.2s", + keyframes=[ + (capcut.KeyframeProperty.position_x, tim(0), -2), + (capcut.KeyframeProperty.position_x, tim("0.5s"), 0), + ], ) - print(f"视频导出完成!result: {export_result}, 成品路径:{output_path}") - -# ---- - -from datetime import timedelta -from moviepy import AudioFileClip - - -async def main(): - TEXT_PATH_STR = "D:\\develop\\trkj\\resources\\tts_text.txt" - AUDIO_PATH_STR_FMT = "D:\\develop\\trkj\\resources\\audio_tts_{}.aac" - SRT_PATH_STR = "D:\\develop\\trkj\\resources\\tts_text_srt.srt" - - VIDEO_PATH_STR = "D:\\develop\\trkj\\resources\\test_video.mp4" - IMAGE_PATH_STR = "D:\\develop\\trkj\\resources\\logo.png" - OUTPUT_PATH_STR = "D:\\develop\\trkj\\resources\\video_output.mp4" - - videoPath = Path(VIDEO_PATH_STR) - audioPathList = list() - subtitleList = list() - imagePath = Path(IMAGE_PATH_STR) - outputPath = Path(OUTPUT_PATH_STR) - - with open(TEXT_PATH_STR, "r", encoding="UTF-8") as text_f: - line = text_f.readline() - i = 0 - next_start_seconds = timedelta(seconds=0.0) - while line: - audioPath = AUDIO_PATH_STR_FMT.format(i) - # 每行生成音频 - await audio_gen(line, audioPath) - audioPathList.append(audioPath) - # 创建音频对应的字幕 - audio = AudioFileClip(audioPath) - try: - current_start_seconds = next_start_seconds - current_end_seconds = next_start_seconds + timedelta( - seconds=audio.duration - ) - subtitle = srt.Subtitle( - index=i + 1, - start=current_start_seconds, - end=current_end_seconds, - content=line.strip(), - ) - subtitleList.append(subtitle) - finally: - audio.close() - - line = text_f.readline() - i = i + 1 - next_start_seconds = current_end_seconds - - srt_content = srt.compose(subtitleList) - with open(SRT_PATH_STR, "w", encoding="UTF-8") as srt_file: - srt_file.write(srt_content) - print(f"字幕文件已生成: {SRT_PATH_STR}") - - await video_merge_audio_and_caption( - videoPath, audioPathList, imagePath, subtitleList, outputPath + # 添加图片/贴纸 + draft.add_image( + file_name="sticker.gif", + start_time=video_segment.end, # 视频结束位置开始 + background_filling=("blur", 0.0625), ) + # 添加文字 + draft.add_text( + content="抽象化后更易扩展!", + target_timerange=video_segment.target_timerange, + position_y=-0.5, # 微调位置 + ) + + # 保存草稿 + draft.save() + if __name__ == "__main__": - asyncio.run(main()) + main() diff --git a/营销短视频生成自动化/materials/audio.mp3 b/营销短视频生成自动化/materials/audio.mp3 new file mode 100644 index 0000000..f79c238 Binary files /dev/null and b/营销短视频生成自动化/materials/audio.mp3 differ diff --git a/营销短视频生成自动化/materials/sticker.gif b/营销短视频生成自动化/materials/sticker.gif new file mode 100644 index 0000000..81a28fa Binary files /dev/null and b/营销短视频生成自动化/materials/sticker.gif differ diff --git a/营销短视频生成自动化/materials/video.mp4 b/营销短视频生成自动化/materials/video.mp4 new file mode 100644 index 0000000..8b21113 Binary files /dev/null and b/营销短视频生成自动化/materials/video.mp4 differ diff --git a/营销短视频生成自动化/示例.mp4 b/营销短视频生成自动化/materials/示例.mp4 similarity index 100% rename from 营销短视频生成自动化/示例.mp4 rename to 营销短视频生成自动化/materials/示例.mp4