日常更新

from NUC
This commit is contained in:
liubiren 2025-12-24 22:00:58 +08:00
parent abfe04f2b9
commit 00caffb1c6
8 changed files with 398 additions and 595 deletions

View File

@ -172,3 +172,23 @@ def general_text_recognize(image) -> str:
{"desc": "当前页数", "value": ""},
],
}
"""
with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file:
file.write(
template.render(
{
"dossier": dossier,
}
)
)
"""

Binary file not shown.

View File

@ -92,9 +92,9 @@ if __name__ == "__main__":
--与主被保险人关系包括本人和附属附属包括配偶父母和子女等
relationship TEXT NOT NULL,
--保险起期取个单和团单起期最大值
commencement_date TEXT NOT NULL,
commence_date TEXT NOT NULL,
--保险止期取个单和团单止期最小值
termination_date TEXT NOT NULL,
terminate_date TEXT NOT NULL,
--联合主键被保险人+证件类型+证件号码+保险分公司
PRIMARY KEY (insured_person, identity_type,
identity_number, insurer_company)
@ -164,13 +164,15 @@ if __name__ == "__main__":
insured_person: str,
identity_type: str,
identity_number: str,
report_date: str,
) -> Optional[List[Dict[str, Any]]]:
"""
根据保险分公司被保险人证件类型和证件号码查询被保险人备注若夫妻同在投保公司则互为附加被保险人一方被保险人记录包括本人和配偶两条
根据保险分公司被保险人证件类型证件号码和出险时间查询被保险人备注若夫妻同在投保公司则互为附加被保险人一方被保险人记录包括本人和配偶两条
:param insurer_company: 保险分公司
:param insured_person: 被保险人
:param identity_type: 证件类型
:param identity_number: 证件号码
:param report_date: 报案时间
:return: 被保险人列表包括被被保险人个单号主被保险人与主被保险人关系保险起期和保险止期
"""
# noinspection PyBroadException
@ -184,19 +186,21 @@ if __name__ == "__main__":
master_insured_person AS "主被保险人",
insured_person AS "被保险人",
relationship AS "与主被保险人关系",
commencement_date AS "保险起期",
termination_date AS "保险止期"
commence_date AS "保险起期",
terminate_date AS "保险止期"
FROM insured_persons
WHERE insurer_company = ?
AND insured_person = ?
AND identity_type = ?
AND identity_number = ?
AND ? BETWEEN commence_date AND terminate_date
""",
parameters=(
insurer_company,
insured_person,
identity_type,
identity_number,
report_date,
),
)
if result:
@ -224,7 +228,7 @@ if __name__ == "__main__":
content: str,
) -> Optional[str]:
"""
根据明细项名称中具体内容查询药品/医疗服务
根据明细项中具体内容查询药品/医疗服务
:param content: 明细项具体内容
:return: 药品/医疗服务
"""
@ -246,9 +250,9 @@ if __name__ == "__main__":
"medicine"
] # 返回药品最大长度的药品
raise
# TODO: 若根据明细项名称中具体内容查询药品/医疗服务发生异常则流转至主数据人工处理
# TODO: 若根据明细项中具体内容查询药品/医疗服务发生异常则流转至主数据人工处理
except Exception:
raise RuntimeError("根据明细项名称中具体内容查询药品/医疗服务发生异常")
raise RuntimeError("根据明细项中具体内容查询药品/医疗服务发生异常")
# 实例化主数据
master_data = MasterData()
@ -544,7 +548,7 @@ if __name__ == "__main__":
# noinspection PyTypeChecker
dossier["出险人层"].update(
{
"姓名": (insured_person := response["data"]["name"]),
"出险人": (insured_person := response["data"]["name"]),
"证件类型": (identity_type := "居民身份证"),
"证件号码": (indentity_number := response["data"]["idNo"]),
"性别": response["data"]["sex"],
@ -567,12 +571,13 @@ if __name__ == "__main__":
}
)
# 查询并获取多条被保险人记录
# 根据保险分公司、被保险人、证件类型、证件号码和出险时间查询被保险人
dossier["被保险人层"] = master_data.query_insured_persons(
insurer_company,
insured_person, # 出险人和被保险人为同一人,视角不同:出险人为理赔,被保险人为承保/保全
identity_type,
indentity_number,
dossier["报案层"]["报案时间"].strftime("%Y-%m-%d"),
)
# noinspection PyShadowingNames
@ -764,15 +769,18 @@ if __name__ == "__main__":
raise RuntimeError(
"就中银保险有限公司的理赔申请书识别并整合至赔案档案发生异常"
)
dossier["受益人层"].update(
dossier["出险人层"].update(
{
"开户行": recognition["开户银行"],
"户名": recognition["户名"],
"户号": recognition["账号"],
"手机号": recognition["手机"],
}
)
dossier["领款人层"].update(
{
"领款人": recognition["户名"],
"银行": recognition["开户银行"],
"账号": recognition["账号"],
}
)
# 根据保险分公司匹配结构化识别文本方法
match insurer_company:
@ -790,15 +798,14 @@ if __name__ == "__main__":
"""
# noinspection PyShadowingNames
def query_value(contents: list, key: str) -> Optional[str]:
def fuzzy_match(contents: list, key: str) -> Optional[str]:
"""
就识别结果根据指定键名查询
:param contents: 识别结果
:param key: 指定键名
根据指定内容列表基于深圳快瞳增值税发票和医疗收费票据识别结果模糊匹配键名并获取
:param contents: 内容列表
:param key: 键名
:return
需要匹配的键名的键值
"""
# 若识别结果为空列表则返回None
# 若内容列表为空值则返回None
if not contents:
return None
@ -806,32 +813,29 @@ if __name__ == "__main__":
match contents[0].keys():
# 对应深圳快瞳增值税发票识别结果
case _ if "desc" in contents[0].keys():
# 遍历识别结果,若内容的键名为指定键名则返回值
for content in contents:
if content["desc"] == key:
return content["value"] if content["value"] else None
candidates = []
# 基于加权补偿的莱文斯坦距离算法计算所有内容的键名和指定键名的相似度
for content in contents:
candidates.append(
(
content["value"],
fuzz.WRatio(
content["desc"], key, force_ascii=False
),
), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度
)
)
# 返回最大相似度的值
return (
(result[0] if result[0] else None)
if (result := max(candidates, key=lambda x: x[1]))[1] >= 80
else None
)
) # 返回>=80且最大的相似度的值
# 对应深圳快瞳医疗收费票据识别结果
case _ if "name" in contents[0].keys():
# 遍历识别结果,若内容的键名为指定键名则返回值
for content in contents:
if content["name"] == key:
return (
@ -847,21 +851,20 @@ if __name__ == "__main__":
content["word"]["value"],
fuzz.WRatio(
content["name"], key, force_ascii=False
), # 基于加权补偿的莱文斯坦距离算法计算所有内容的键名和指定键名的相似度
), # 基于加权莱文斯坦距离算法计算所有键名和指定键名的相似度
)
)
# 返回最大相似度的值
return (
(result[0] if result[0] else None)
if (result := max(candidates, key=lambda x: x[1]))[1] >= 80
else None
)
) # 返回>=80且最大的相似度的值
def parse_name(name: str) -> Tuple[str, Optional[str]]:
def parse_item(name: str) -> Tuple[str, Optional[str]]:
"""
根据明细项名称解析明细项类别和具体内容根据明细项名称中具体内容查询药品/医疗服务
:param name: 明细项名称
根据明细项解析明细项类别和具体内容具体内容查询药品/医疗服务
:param name: 明细项
return 明细项类别和药品/医疗服务
"""
if match := re.match(
@ -928,7 +931,7 @@ if __name__ == "__main__":
"购药及就医机构": response["data"]["details"]["seller"],
"明细项": [
{
"名称": item["name"],
"明细项": item["name"],
"数量": (
Decimal(item["quantity"]).quantize(
Decimal("0.00"),
@ -1002,7 +1005,7 @@ if __name__ == "__main__":
],
"明细项": [
{
"名称": item["itemName"],
"明细项": item["itemName"],
"数量": Decimal(item["number"]).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
@ -1080,41 +1083,41 @@ if __name__ == "__main__":
):
raise RuntimeError("请求深圳快瞳增值税发票识别接口发生异常")
match query_value(response["data"], "发票类型"):
match fuzzy_match(response["data"], "发票类型"):
case "电子发票(普通发票)":
# noinspection PyTypeChecker
receipt.update(
{
"票据号": query_value(
"票据号": fuzzy_match(
response["data"], "发票号码"
),
"票据代码": query_value(
"票据代码": fuzzy_match(
response["data"], "发票代码"
),
"开票日期": datetime.strptime(
query_value(response["data"], "开票日期"),
fuzzy_match(response["data"], "开票日期"),
"%Y年%m月%d",
),
"校验码": query_value(
"校验码": fuzzy_match(
response["data"], "校验码"
),
"开票金额": Decimal(
query_value(
fuzzy_match(
response["data"], "小写金额"
).replace("¥", "")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
"姓名": query_value(
"姓名": fuzzy_match(
response["data"], "购买方名称"
),
"购药及就医机构": query_value(
"购药及就医机构": fuzzy_match(
response["data"], "销售方名称"
),
"明细项": [
{
"名称": name,
"明细项": name,
"数量": Decimal(quantity).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
@ -1161,43 +1164,43 @@ if __name__ == "__main__":
],
)
],
"备注": query_value(response["data"], "备注"),
"备注": fuzzy_match(response["data"], "备注"),
}
)
case "增值税普通发票(卷票)":
# noinspection PyTypeChecker
receipt.update(
{
"票据号": query_value(
"票据号": fuzzy_match(
response["data"], "发票号码"
),
"票据代码": query_value(
"票据代码": fuzzy_match(
response["data"], "发票代码"
),
"开票日期": datetime.strptime(
query_value(response["data"], "开票日期"),
fuzzy_match(response["data"], "开票日期"),
"%Y-%m-%d",
),
"校验码": query_value(
"校验码": fuzzy_match(
response["data"], "校验码"
),
"开票金额": Decimal(
query_value(
fuzzy_match(
response["data"], "合计金额(小写)"
).replace("¥", "")
).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
),
"姓名": query_value(
"姓名": fuzzy_match(
response["data"], "购买方名称"
),
"购药及就医机构": query_value(
"购药及就医机构": fuzzy_match(
response["data"], "销售方名称"
),
"明细项": [
{
"名称": name,
"明细项": name,
"数量": Decimal(quantity).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
@ -1234,7 +1237,7 @@ if __name__ == "__main__":
],
)
],
"备注": query_value(response["data"], "备注"),
"备注": fuzzy_match(response["data"], "备注"),
}
)
case "医疗门诊收费票据" | "医疗住院收费票据":
@ -1300,7 +1303,7 @@ if __name__ == "__main__":
if isinstance(receipt["endtime"], dict)
else None
),
"校验码": query_value(
"校验码": fuzzy_match(
receipt["global_detail"]["region_specific"],
"校验码",
),
@ -1314,7 +1317,7 @@ if __name__ == "__main__":
"购药及就医机构": receipt["hospital_name"]["value"],
"明细项": [
{
"名称": (
"明细项": (
item["item_name"]["value"]
if isinstance(item["item_name"], dict)
else None
@ -1383,7 +1386,7 @@ if __name__ == "__main__":
case ("增值税发票", "药店"):
items = (
pandas.DataFrame(receipt["明细项"])
.groupby("名称") # 就相同明细项名称合并数量和金额
.groupby("明细项") # 就相同明细项合并数量和金额
.agg(数量=("数量", "sum"), 金额=("金额", "sum"))
.loc[
lambda dataframe: dataframe["金额"] != 0
@ -1391,10 +1394,10 @@ if __name__ == "__main__":
.reset_index()
.pipe(
lambda dataframe: dataframe.join(
dataframe["名称"]
dataframe["明细项"]
.apply(
parse_name
) # 根据明细项名称解析明细项类别和具体内容,并根据明细项名称中具体内容查询药品/医疗服务
parse_item
) # 根据明细项解析明细项类别和具体内容,并根据具体内容查询药品/医疗服务
.apply(
pandas.Series
) # 就明细项类别和药品/医疗服务元组展开为两列
@ -1423,14 +1426,15 @@ if __name__ == "__main__":
receipt.update(
{
"起期": receipt["开票日期"],
"止期": receipt["开票日期"],
"事故起期": receipt["开票日期"],
"事故止期": receipt["开票日期"],
"姓名": (
dossier["出险人层"]["姓名"]
if dossier["出险人层"]["姓名"] in receipt["姓名"]
dossier["出险人层"]["出险人"]
if dossier["出险人层"]["出险人"] in receipt["姓名"]
else receipt["姓名"]
),
"购药及就医类型": "药店购药",
"事故诊断": "购药拟诊",
"个人自费": Decimal("0.00"),
"个人自付": Decimal("0.00"),
"医保支付": Decimal("0.00"),
@ -1495,12 +1499,17 @@ if __name__ == "__main__":
):
raise RuntimeError("请求深圳快瞳银行卡识别接口发生异常或非借记卡")
# noinspection PyTypeChecker
dossier["受益人层"].update(
dossier["出险人层"].update(
{
"开户行": response["data"]["bankInfo"],
"户名": "",
"户号": response["data"]["cardNo"].replace(" ", ""),
"手机号": "",
"手机号": None,
}
)
# noinspection PyTypeChecker
dossier["领款人层"].update(
{
"领款人": None,
"银行": response["data"]["bankInfo"],
"账号": response["data"]["cardNo"].replace(" ", ""),
}
)
@ -1541,9 +1550,6 @@ if __name__ == "__main__":
# 初始化赔案档案保险公司将提供投保公司、保险分公司和报案时间等TPA作业系统签收后生成赔案号
dossier = {
"报案层": {
"投保公司": (
insurance_company := "中国银行股份有限公司昆山分行"
), # 指定投保公司
"保险分公司": (
insurer_company := "中银保险有限公司苏州分公司"
), # 指定保险分公司
@ -1553,7 +1559,7 @@ if __name__ == "__main__":
"影像件层": [],
"出险人层": {},
"被保险人层": [],
"受益人层": {},
"领款人层": {},
"票据层": [],
}
@ -1595,311 +1601,40 @@ if __name__ == "__main__":
dossier["影像件层"].append(image)
# 就影像件按照影像件类型排序
dossier["影像件层"].sort(
key=lambda x: [
"居民户口簿",
"居民身份证(国徽面)",
"居民身份证(头像面)",
"居民身份证(国徽、头像面)",
"中国港澳台地区及境外护照",
"理赔申请书",
"增值税发票",
"医疗门诊收费票据",
"医疗住院收费票据",
"医疗费用清单",
"银行卡",
"其它",
].index(x["影像件类型"])
) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保险人信息
# 就影像件按照影像件类型排序
dossier["影像件层"].sort(
key=lambda x: [
"居民户口簿",
"居民身份证(国徽面)",
"居民身份证(头像面)",
"居民身份证(国徽、头像面)",
"中国港澳台地区及境外护照",
"理赔申请书",
"增值税发票",
"医疗门诊收费票据",
"医疗住院收费票据",
"医疗费用清单",
"银行卡",
"其它",
].index(x["影像件类型"])
) # 优先居民户口簿、居民身份证、中国港澳台地区及境外护照和理赔申请书以查询被保险人信息
# 遍历影像件层中影像件
for image in dossier["影像件层"]:
# 影像件识别并整合至赔案档案
image_recognize(
image,
insurer_company,
)
for receipt in dossier["票据层"]:
print(receipt)
print(dossier["被保险人层"])
"""
case "增值税发票" | "门诊收费票据" | "住院收费票据":
extraction = invoice_extraction()
# 若发生异常则跳过该影像件
if extraction is None:
dossier["影像件层"][-1]["已识别"] = "否,无法识别"
continue
dossier["发票层"].append(
{
"关联影像件序号": image_index,
"票据类型": extraction["票据类型"],
"票据号码": extraction["票据号码"],
"票据代码": (
extraction["票据代码"]
if extraction["票据代码"]
else "--"
), # 数电票无票据代码,校验码同票据号码
"开票日期": datetime.strptime(
extraction["开票日期"], "%Y-%m-%d"
),
"校验码后六位": (
check_code[-6:]
if (check_code := extraction["校验码"])
else "--"
),
"医药机构": extraction["收款方"],
"就诊人": (
match.group("name")
if (
match := re.search(
r"^(?P<name>[^(]+)", extraction["付款方"]
)
)
else extraction["付款方"]
),
"票据金额": Decimal(extraction["票据金额"]).quantize(
Decimal("0.00"), rounding=ROUND_HALF_UP
), # 默认金额转为小数,保留两位小数
"查验状态": extraction["查验状态"],
"项目": (
pandas.DataFrame(extraction["项目"])
.assign(
数量=lambda dataframe: dataframe["数量"].apply(
lambda row: (
Decimal(row).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
if row
else Decimal("0.00")
)
),
金额=lambda dataframe: dataframe["金额"].apply(
lambda row: (
Decimal(row).quantize(
Decimal("0.00"),
rounding=ROUND_HALF_UP,
)
if row
else Decimal("0.00")
)
),
)
.groupby(by="名称", as_index=False)
.agg(数量=("数量", "sum"), 金额=("金额", "sum"))
.assign(
大项=lambda dataframe: dataframe["名称"].apply(
lambda row: (
match.group("category")
if (
match := re.match(
r"^\*(?P<category>.+?)\*.*$",
row,
)
)
else row
)
),
小项=lambda dataframe: dataframe["名称"].apply(
lambda row: (
re.sub(
r"[^\u4e00-\u9fa5a-zA-Z0-9./%*]",
"",
match.group("name"),
)
if (
match := re.match(
r"^\*.+?\*(?:\[[^]]+])?(?P<name>[^\s(]+)(?:\([^\s(]+\))?(?:.*?)?$",
row,
)
)
else ""
)
),
)
.loc[
lambda dataframe: dataframe["金额"] != 0,
["名称", "大项", "小项", "数量", "金额"],
]
.to_dict(orient="records")
),
"就诊类型": (
"药店购药"
if "增值税发票" in image_type
else (
"门诊就诊"
if "门诊收费票据" in image_type
else "住院治疗"
)
),
}
)
case "理赔申请书":
# 根据保险总公司匹配理赔申请书
# noinspection PyUnreachableCode
match insurer:
case "中银保险有限公司":
extraction = common_extraction(
application_form="中行员工福利保障计划索赔申请书"
)
# 若识别异常则跳过该影像件
if extraction is None:
dossier["影像件层"][-1]["已识别"] = "否,无法识别"
continue
dossier["赔案层"]["申请人信息"].update(
{
"与被保险人关系": "本人", # 中银保险有限公司:默认申请人与被保险人关系为本人
"年龄": (
Decimal(age).quantize(
Decimal("0"),
rounding=ROUND_HALF_UP,
)
if (
age := extraction.get("基础信息", {}).get(
"年龄", "--"
)
).isdigit()
else age
), # 若年龄仅数字则转为小数、取整,否则默认为“--”
"手机号": (
phone_number
if re.match(
r"^1[3-9]\d{9}$",
phone_number := extraction.get(
"基础信息", {}
).get("手机", "--"),
)
else phone_number
), # 若手机未正则匹配手机号格式则为“--”
}
)
dossier["赔案层"]["受益人信息"].update(
{
"与被保险人关系": "本人", # 中银保险有限公司:默认受益人与被保人关系为本人
"户名": (
account_name
if (
account_name := extraction.get(
"基础信息", {}
).get("户名")
)
else "--"
), # 若户名为NONE则为“--”
"开户银行": (
account_name
if (
account_name := extraction.get(
"基础信息", {}
).get("开户银行")
)
else "--"
), # 若开户银行为NONE则为“--”
"银行账号": (
account_name
if (
account_name := extraction.get(
"基础信息", {}
).get("账号")
)
is not None
else "--"
), # 若银行账号为NONE则为“--”
}
)
dossier["赔案层"]["其它信息"]["自述症状"] = (
("".join(diagnoses))
if (
diagnoses := sorted(
set(
"".join(
[
diagnosis
for invoice in extraction.get(
"票据表格", []
)
if (
diagnosis := invoice.get("诊断")
)
]
).split("")
)
)
)
else "--"
)
case _:
dossier["影像件层"][-1]["已识别"] = "否,无法识别"
continue
case _:
dossier["影像件层"][-1]["已识别"] = "否,无法识别"
continue
dossier["影像件层"][-1].update(
{
"已识别": "",
"识别结果": extraction,
}
# 遍历影像件层中影像件
for image in dossier["影像件层"]:
# 影像件识别并整合至赔案档案
image_recognize(
image,
insurer_company,
)
# 就票据层按照事故止期和票据号顺序排序
dossier["票据层"].sort(key=lambda x: (x["事故止期"], x["票据号"]))
# 发票层根据开票日期顺序排序
dossier["发票层"] = sorted(
dossier["发票层"], key=lambda x: (x["开票日期"], x["票据号码"])
)
# 就
# 构建小项层
# noinspection PyTypeChecker
dossier["小项层"] = (
pandas.DataFrame(
[
{
"小项": item["小项"],
"数量": item["数量"],
"金额": item["金额"],
}
for invoice in dossier["发票层"]
for item in invoice["项目"]
]
)
.groupby(by="小项", as_index=False)
.agg(数量=("数量", "sum"), 金额=("金额", "sum"))
.to_dict(orient="records")
)
for receipt in dossier["票据层"]:
print(receipt)
for invoice in dossier["发票层"]:
# noinspection PyTypeChecker
invoice["推定疾病"] = disease_diagnosis(
items="".join(sorted(set([item["小项"] for item in invoice["项目"]])))
)
print(dossier)
exit()
with open(f"dossiers/{case_number}.html", "w", encoding="utf-8") as file:
file.write(
template.render(
{
"dossier": dossier,
}
)
)
"""
print(dossier["被保险人层"])
print(dossier["出险人层"])
print(dossier["领款人层"])
print(dossier["报案层"])

View File

@ -2,241 +2,289 @@
"""
营销短视频生成自动化
功能清单
1打开并读取任务
2按照初始化剪映草稿分别添加视频轨音频轨字幕轨和图片轨等生成视频执行任务
"""
import asyncio
from pathlib import Path
import edge_tts
# Template describing one video-generation task; every value here is an empty placeholder.
task = {
"video_path": "", # path to the source video
"video_cover_path": "", # path to the video's first-frame (cover) image; may be left empty
"pictures": [
{
"position": (0.0, 0.0), # X and Y of the picture's top-left corner, as a percentage of the video width
"size_ratio": "", # picture size ratio, as a percentage of the video
"picture_path": "", # path to the picture file
}, # one picture entry
],
"texts": [
{
"content": "", # text content
"start": "", # time at which the text starts being shown
"duration": "", # how long the text is shown; if empty, defaults to the video's playback duration
"background_position": (0.0, 0.0), # X and Y of the background's top-left corner
"background_color": "", # background colour
"background_opacity": "", # background opacity
"stroke_color": "", # stroke (border) colour
"stroke_width": "", # stroke (border) line width
"font_size": "", # font size
"font_color": "", # font colour
},
],
}
from pycapcut import DraftFolder
import sys
# Location of the Jianying draft folder, selected by the current value of sys.platform
folder_path = {
"win32": r"C:\Users\admin\AppData\Local\JianyingPro\User Data\Projects\com.lveditor.draft".replace(
"\\", "/"
), # on Windows the path is written as a raw string (avoids escape issues) and its backslashes "\" are converted to forward slashes "/"
"darwin": None, # no folder configured for macOS, so the check below raises there
}.get(sys.platform) # platforms missing from the mapping also yield None

# Fail fast when no draft folder is configured for this platform
if not folder_path:
raise RuntimeError("未指定剪映草稿文件夹地址")

# Initialise the Jianying draft folder
folder = DraftFolder(folder_path)
print(folder)
# NOTE(review): exit() stops the script at this point, so nothing below it runs
# when these statements execute at module level — looks like a leftover debugging stop; confirm.
exit()
async def audio_gen(
    text: str, output_path: Path, voice_name: str = "zh-CN-XiaoxiaoNeural"
):
    """
    Synthesize speech for ``text`` with edge-tts and save it to ``output_path``.

    :param text: text to be spoken
    :param output_path: destination of the generated audio file
    :param voice_name: edge-tts voice identifier
    """
    # Chinese voices noted by the original author (labels kept as written, not verified here):
    #   "zh-CN-XiaoxiaoNeural"  female - Xiaoxiao: young, natural
    #   "zh-CN-XiaochenNeural"  female - Xiaochen: young, warm
    #   "zh-CN-XiaomoNeural"    female - Xiaomo: expressive
    #   "zh-CN-XiaoyouNeural"   male - Xiaoyou: child voice, cute
    #   "zh-CN-XiaohanNeural"   male - Xiaohan: young, warm
    #   "zh-CN-XiaoruiNeural"   male - Xiaorui: mature, steady
    #   "zh-CN-XiaoqianNeural"  female - Xiaoqian: mature
    #   "zh-CN-XiaodongNeural"  male - Xiaodong: authoritative

    # Neutral prosody: no change to rate, pitch or volume
    prosody = {"rate": "+0%", "pitch": "+0Hz", "volume": "+0%"}
    synthesizer = edge_tts.Communicate(text, voice_name, **prosody)

    # Generate the speech and write it to disk
    await synthesizer.save(output_path)
    print(f"语音已保存到: {output_path}")
# ----
from typing import Optional, Tuple, Union
import pycapcut as capcut
from pycapcut import KeyframeProperty, SEC
from pycapcut import FontType, TextStyle, ClipSettings
from typing import List
import srt
from pycapcut import tim, trange
async def video_merge_audio_and_caption(
video_path: Path,
audioPathList: List[Path],
image_path: Path,
subtitleList: List[srt.Subtitle],
output_path: Path,
):
PROJECT_DIR = "C:\\Users\\z4938\\AppData\\Local\\JianyingPro\\User Data\\Projects\\com.lveditor.draft"
class Draft:
"""剪映脚本生成器类"""
draft_folder = capcut.DraftFolder(PROJECT_DIR)
# noinspection PyShadowingNames
def __init__(
self,
draft_name: str,
drafts_path: str = r"C:\Users\admin\AppData\Local\JianyingPro\User Data\Projects\com.lveditor.draft",
stocks_path: Path = Path(__file__).parent / "materials",
video_width: int = 1920,
video_height: int = 1080,
):
"""
初始化剪映草稿生成器
:param drafts_path: 草稿文件夹路径
:param stocks_path: 素材文件夹路径
:param draft_name: 草稿名称
:param video_width: 视频宽度
:param video_height: 视频高度
"""
# 基础配置
self.drafts_path = drafts_path
self.stocks_path = stocks_path
self.draft_name = draft_name
self.video_width, self.video_height = video_width, video_height
script = draft_folder.create_draft("新草稿", 1920, 1080)
print("成功创建 draft剪映项目草稿")
# 尝试创建草稿文件夹和草稿
# noinspection PyBroadException
try:
self.draft_folder = capcut.DraftFolder(self.drafts_path)
self.draft = self.draft_folder.create_draft(
self.draft_name, self.video_width, self.video_height, allow_replace=True
)
# 添加视频轨
video_track_id = 1
script.add_track(
capcut.TrackType.video, track_name="视频", relative_index=video_track_id
)
video_mat = capcut.VideoMaterial(video_path)
video_seg = capcut.VideoSegment(video_mat, capcut.Timerange(0, video_mat.duration))
video_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration - SEC, 1.0)
video_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration, 0.0)
video_seg.volume = 0
script.add_segment(video_seg, "视频")
print(
f"视频素材导入成功素材ID{video_seg.material_id}, 片段id{video_seg.segment_id}"
# 添加基础轨道:音频、视频、文本(图片/贴纸复用视频轨)
self.draft.add_track(capcut.TrackType.audio)
self.draft.add_track(capcut.TrackType.video)
self.draft.add_track(capcut.TrackType.text)
except:
raise RuntimeError("创建草稿文件夹和草稿发生异常")
# 检查素材文件夹是否存在,若不存在则抛出异常
if not self.stocks_path.exists():
raise FileNotFoundError(f"素材文件夹不存在")
def _check_path(self, file_name: str) -> str:
    """
    Resolve a material file name against the stock folder and check that it exists.

    :param file_name: file name, relative to ``self.stocks_path``
    :return: the resolved file path as a POSIX-style string
    :raises FileNotFoundError: if the resolved path does not exist
    """
    file_path = self.stocks_path / file_name
    if not file_path.exists():
        # Name the missing path: the add_* methods wrap this error in a generic
        # RuntimeError, so the message is the only hint about which file is absent.
        raise FileNotFoundError(f"素材文件不存在:{file_path.as_posix()}")
    return file_path.as_posix()
def add_audio(
    self,
    file_name: str,
    start_time: str,
    duration: str,
    volume: float = 1.0,
    fade_in: str = "0s",
    fade_out: str = "0s",
) -> capcut.AudioSegment:
    """
    Add an audio segment to the draft.

    :param file_name: audio file name, resolved through ``self._check_path``
    :param start_time: start time of the audio on the track, e.g. "0s"
    :param duration: duration of the audio, e.g. "1s"
    :param volume: volume passed to ``capcut.AudioSegment`` (default 1.0)
    :param fade_in: fade-in duration (default "0s")
    :param fade_out: fade-out duration (default "0s")
    :return: the created ``capcut.AudioSegment``
    :raises RuntimeError: if any step fails; the original exception is chained as the cause
    """
    try:
        # Create the audio segment
        audio_segment = capcut.AudioSegment(
            self._check_path(file_name), trange(start_time, duration), volume=volume
        )

        # Apply the fade-in / fade-out effect
        audio_segment.add_fade(fade_in, fade_out)

        self.draft.add_segment(audio_segment)
        return audio_segment
    except Exception as exc:
        # Chain explicitly so the real reason (e.g. a missing file) stays attached
        raise RuntimeError("添加音频片段发生异常") from exc
def add_video(
    self,
    file_name: str,
    start_time: str,
    duration: str,
    animation: Optional[capcut.IntroType] = None,
    transition: Optional[capcut.TransitionType] = None,
    keyframes: Optional[list] = None,
) -> capcut.VideoSegment:
    """
    Add a video segment to the draft.

    :param file_name: video file name, resolved through ``self._check_path``
    :param start_time: start time of the video on the track, e.g. "0s"
    :param duration: duration of the video, e.g. "1s"
    :param animation: animation to add; skipped when falsy
    :param transition: transition to add; skipped when falsy
    :param keyframes: list of ``(property, time, value)`` triples, each forwarded
        to ``add_keyframe``; skipped when falsy
    :return: the created ``capcut.VideoSegment``
    :raises RuntimeError: if any step fails; the original exception is chained as the cause
    """
    try:
        # Create the video segment
        video_segment = capcut.VideoSegment(
            self._check_path(file_name), trange(start_time, duration)
        )

        # Add the animation
        if animation:
            video_segment.add_animation(animation)

        # Add the transition
        if transition:
            video_segment.add_transition(transition)

        # Add the keyframes; names chosen so the builtin `property` is not shadowed
        if keyframes:
            for keyframe_property, time_offset, value in keyframes:
                video_segment.add_keyframe(keyframe_property, time_offset, value)

        self.draft.add_segment(video_segment)
        return video_segment
    except Exception as exc:
        # Chain explicitly so the real reason (e.g. a missing file) stays attached
        raise RuntimeError("添加视频片段发生异常") from exc
def add_image(
    self,
    file_name: str,
    start_time: Union[str, int],
    duration: Optional[str] = None,
    background_filling: Tuple[str, float] = ("blur", 0.0625),
) -> capcut.VideoSegment:
    """
    Add a picture / sticker to the draft.

    :param file_name: picture file name, resolved through ``self._check_path``
    :param start_time: start time of the picture on the track, e.g. "0s"; annotated
        with value types because ``tim`` is called as a function elsewhere in this file
        (NOTE(review): assumes numeric times are ints — confirm against pycapcut)
    :param duration: duration of the picture, e.g. "1s"; when falsy the duration of
        the picture material itself is used
    :param background_filling: tuple unpacked positionally into
        ``add_background_filling``; skipped when falsy
    :return: the created ``capcut.VideoSegment``
    :raises RuntimeError: if any step fails; the original exception is chained as the cause
    """
    try:
        # Create the picture material
        image_material = capcut.VideoMaterial(self._check_path(file_name))

        # Create the picture segment: use the explicit duration when given,
        # otherwise fall back to the picture material's own duration
        image_segment = capcut.VideoSegment(
            image_material,
            trange(start_time, duration if duration else image_material.duration),
        )

        # Add the background filling
        if background_filling:
            image_segment.add_background_filling(*background_filling)

        self.draft.add_segment(image_segment)
        return image_segment
    except Exception as exc:
        # Chain explicitly so the real reason (e.g. a missing file) stays attached
        raise RuntimeError("添加图片/贴纸发生异常") from exc
def add_text(
    self,
    content: str,
    target_timerange: capcut.Timerange,
    font: capcut.FontType = capcut.FontType.悠然体,
    color: Tuple[float, float, float] = (1.0, 1.0, 0.0),
    position_y: float = -0.8,
    outro_animation: Optional[capcut.TextOutro] = capcut.TextOutro.故障闪动,
    anim_duration: int = tim("1s"),
    bubble_id: Optional[str] = "7446997603268496646",
    effect_id: Optional[str] = "7336825073334078725",
) -> capcut.TextSegment:
    """
    Add a text segment to the draft.

    :param content: text content
    :param target_timerange: time range in which the text is shown, usually aligned
        with a video segment
    :param font: font type
    :param color: text colour as RGB, each component in 0-1
    :param position_y: Y position, passed as ``transform_y``; per the original note
        -0.8 is the lower part of the screen
    :param outro_animation: outro animation; skipped when falsy
    :param anim_duration: animation duration, forwarded to ``add_animation``;
        defaults to ``tim("1s")``
        (NOTE(review): annotated as int on the assumption that ``tim`` returns one — confirm)
    :param bubble_id: bubble effect id; skipped when falsy
    :param effect_id: fancy-text effect id; skipped when falsy
    :return: the created ``capcut.TextSegment``
    :raises RuntimeError: if any step fails; the original exception is chained as the cause
    """
    try:
        # Create the text segment
        text_segment = capcut.TextSegment(
            content,
            target_timerange,
            font=font,
            style=capcut.TextStyle(color=color),
            clip_settings=capcut.ClipSettings(transform_y=position_y),
        )

        # Add the outro animation
        if outro_animation:
            text_segment.add_animation(outro_animation, duration=anim_duration)

        # Add the bubble
        # NOTE(review): the same id is passed for both arguments of add_bubble;
        # confirm that its two parameters are really meant to receive one value.
        if bubble_id:
            text_segment.add_bubble(bubble_id, bubble_id)

        # Add the fancy-text effect
        if effect_id:
            text_segment.add_effect(effect_id)

        self.draft.add_segment(text_segment)
        return text_segment
    except Exception as exc:
        # Chain explicitly so the real reason stays attached to the RuntimeError
        raise RuntimeError("添加文字片段发生异常") from exc
def save(self) -> None:
    """
    Save the draft and print a confirmation message.

    :raises RuntimeError: if saving fails; the original exception is chained as the cause
    """
    try:
        self.draft.save()
        print("草稿保存成功")
    except Exception as exc:
        # Chain explicitly so the underlying failure stays attached
        raise RuntimeError("保存草稿发生异常") from exc
# ======================== 调用示例(使用抽象后的方法) ========================
def main():
"""生成脚本"""
# 实例化
draft = Draft(
draft_name="demo2",
)
# 添加logo图片轨
logo_track_id = 2
script.add_track(
capcut.TrackType.video, track_name="logo图片", relative_index=logo_track_id
)
logo_mat = capcut.VideoMaterial(image_path)
logo_seg = capcut.VideoSegment(logo_mat, capcut.Timerange(0, video_mat.duration))
logo_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration - SEC, 1.0)
logo_seg.add_keyframe(KeyframeProperty.alpha, video_seg.duration, 0.0)
script.add_segment(logo_seg, "logo图片")
print(f"logo导入成功视频轨道id{logo_track_id}")
# 添加音频轨
script.add_track(capcut.TrackType.audio, track_name="音频", relative_index=3)
for audio_path in audioPathList:
audio_mat = capcut.AudioMaterial(audio_path)
audio_seg = capcut.AudioSegment(
audio_mat,
capcut.Timerange(0, audio_mat.duration),
source_timerange=capcut.Timerange(0, audio_mat.duration),
)
script.add_segment(audio_seg, "音频")
print(
f"音频素材导入成功素材ID{audio_seg.material_id}, 片段id{audio_seg.segment_id}"
# 添加音频
draft.add_audio(
file_name="audio.mp3",
start_time="0s",
duration="5s",
volume=0.6,
)
# 添加字幕轨
script.add_track(capcut.TrackType.text, track_name="字幕", relative_index=4)
for subtitle in subtitleList:
text_seg = capcut.TextSegment(
subtitle.content,
capcut.Timerange(
subtitle.start.total_seconds(), subtitle.end.total_seconds()
),
font=FontType.下午茶,
style=TextStyle(
size=5.0,
color=(0.7, 0.7, 1.0),
auto_wrapping=True,
underline=True,
align=1,
),
clip_settings=ClipSettings(transform_y=-0.8),
)
script.add_segment(text_seg, "字幕")
print("字幕添加成功")
export_result = script.export(
output_path=output_path, resolution="1080p", fps=30, quality="high"
# 添加视频
video_segment = draft.add_video(
file_name="video.mp4",
start_time="0s",
duration="4.2s",
keyframes=[
(capcut.KeyframeProperty.position_x, tim(0), -2),
(capcut.KeyframeProperty.position_x, tim("0.5s"), 0),
],
)
print(f"视频导出完成result: {export_result}, 成品路径:{output_path}")
# ----
from datetime import timedelta
from moviepy import AudioFileClip
async def main():
TEXT_PATH_STR = "D:\\develop\\trkj\\resources\\tts_text.txt"
AUDIO_PATH_STR_FMT = "D:\\develop\\trkj\\resources\\audio_tts_{}.aac"
SRT_PATH_STR = "D:\\develop\\trkj\\resources\\tts_text_srt.srt"
VIDEO_PATH_STR = "D:\\develop\\trkj\\resources\\test_video.mp4"
IMAGE_PATH_STR = "D:\\develop\\trkj\\resources\\logo.png"
OUTPUT_PATH_STR = "D:\\develop\\trkj\\resources\\video_output.mp4"
videoPath = Path(VIDEO_PATH_STR)
audioPathList = list()
subtitleList = list()
imagePath = Path(IMAGE_PATH_STR)
outputPath = Path(OUTPUT_PATH_STR)
with open(TEXT_PATH_STR, "r", encoding="UTF-8") as text_f:
line = text_f.readline()
i = 0
next_start_seconds = timedelta(seconds=0.0)
while line:
audioPath = AUDIO_PATH_STR_FMT.format(i)
# 每行生成音频
await audio_gen(line, audioPath)
audioPathList.append(audioPath)
# 创建音频对应的字幕
audio = AudioFileClip(audioPath)
try:
current_start_seconds = next_start_seconds
current_end_seconds = next_start_seconds + timedelta(
seconds=audio.duration
)
subtitle = srt.Subtitle(
index=i + 1,
start=current_start_seconds,
end=current_end_seconds,
content=line.strip(),
)
subtitleList.append(subtitle)
finally:
audio.close()
line = text_f.readline()
i = i + 1
next_start_seconds = current_end_seconds
srt_content = srt.compose(subtitleList)
with open(SRT_PATH_STR, "w", encoding="UTF-8") as srt_file:
srt_file.write(srt_content)
print(f"字幕文件已生成: {SRT_PATH_STR}")
await video_merge_audio_and_caption(
videoPath, audioPathList, imagePath, subtitleList, outputPath
# 添加图片/贴纸
draft.add_image(
file_name="sticker.gif",
start_time=video_segment.end, # 视频结束位置开始
background_filling=("blur", 0.0625),
)
# 添加文字
draft.add_text(
content="抽象化后更易扩展!",
target_timerange=video_segment.target_timerange,
position_y=-0.5, # 微调位置
)
# 保存草稿
draft.save()
if __name__ == "__main__":
asyncio.run(main())
main()

Binary file not shown.

Binary file not shown.

After

Width:  |  Height:  |  Size: 81 KiB

Binary file not shown.