This commit is contained in:
liubiren 2026-02-25 21:38:46 +08:00
parent 17280f0d2a
commit 796c9a64fe
2 changed files with 41 additions and 241 deletions

View File

@ -0,0 +1,5 @@
# -*- coding: utf-8 -*-
"""
主模块
"""

View File

@ -3,27 +3,24 @@
导出草稿模块
"""
from copy import deepcopy
import hashlib
import json
from pathlib import Path
from pathlib import WindowsPath
import random
import re
import shutil
import subprocess
import sys
import time
from typing import Any, Dict, List, Optional
from uuid import uuid4
from pathlib import WindowsPath
import pyJianYingDraft
import win32con
import win32gui
import hashlib
from draft import JianYingDraft
import sys
import pyJianYingDraft
from draft import JianYingDraft
from utils.sqlite import SQLite
sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.sqlite import SQLite
# 自定义JSON编码器
@ -73,7 +70,7 @@ class Caches(SQLite):
"""
)
except Exception as exception:
raise RuntimeError(f"初始化缓存发生异常:{str(exception)}") from exception
raise RuntimeError(f"初始化缓存发生异常:{str(exception)}") from exception
def query(self, draft_name: str) -> Optional[Dict[str, Any]]:
"""
@ -186,28 +183,25 @@ class JianYingExport:
# 初始化所有工作流
self.workflows = {
"0000": [
"默认": [
"add_subtitles",
"add_background_video",
"add_statement",
"add_sticker1",
"add_sticker2",
"save",
],
"0001": [
], # 默认工作流先根据脚本合成音频再叠加背景视频、声明视频、贴纸1视频和贴纸2视频
"淘宝闪购": [
"add_subtitles_video", # 以此作为草稿持续时长
"add_background_video",
"add_background_audio",
"add_statement_video",
"save",
], # 适用于淘宝闪购、存量抽手机
"0002": [
"add_subtitles_video",
"视频号": [
"add_subtitles_video", # 以此作为草稿持续时长
"add_background_video",
"add_background_audio",
"add_logo_video",
"add_statement_video",
"save",
], # 适用于视频号
}
@ -395,7 +389,6 @@ class JianYingExport:
},
], # 图像调节设置
}, # 添加贴纸2工作配置
"save": {}, # 保存
}
self.video_width, self.video_height = video_width, video_height
@ -515,40 +508,19 @@ class JianYingExport:
:param draft_counts: 每批次导出草稿数
"""
if workflow_name not in self.workflows:
raise RuntimeError(f"未配置该工作流")
raise RuntimeError(f"该工作流 {workflow_name} 未配置")
workflow = self.workflows[workflow_name]
# 若工作流包含添加背景音频则在添加背景视频节点配置的播放音量设置为0
if "add_background_audio" in workflow:
self.configurations["add_background_video"]["volume"] = [0.0]
# 按照工作流和工作配置拼接素材,批量生成草稿
batch_draft_names = self._batch_generate_drafts(
# 批量生成草稿
self._batch_generate_drafts(
workflow_name=workflow_name,
draft_counts=draft_counts,
)
for draft_name in batch_draft_names:
print(f"正在导出 {draft_name}...")
if (self.exports_folder_path / f"{draft_name}.mp4").is_file():
print("存在相同名称的草稿,跳过")
continue
jianying_controller.export_draft(
draft_name=draft_name,
output_path=self.exports_folder_path.as_posix(),
)
print("已完成")
print()
# 关闭剪映专业版进程
self._close_process()
time.sleep(2)
for draft_name in batch_draft_names:
# 就已导出草稿删除
self.drafts_folder.remove(draft_name=draft_name)
time.sleep(2)
def _batch_generate_drafts(
self,
workflow_name: str,
@ -576,7 +548,6 @@ class JianYingExport:
continue
print(f"正在生成草稿 {draft_name}...")
draft_index += 1
# 实例化 JianYingDraft
draft = JianYingDraft(
@ -641,23 +612,18 @@ class JianYingExport:
draft.save()
print("已完成")
if "add_subtitles" in self.workflow:
# 高亮关键词
self._highlight_keywords(draft_name=draft_name)
self.draft_names.append(draft_name)
with open(
file=self.exports_folder_path / f"{uuid4().hex.upper()}.txt",
mode="w",
encoding="utf-8",
) as file:
file.write(f"{configuration}")
# 缓存
self.caches.update(
draft_name=draft_name,
workflow_configurations=workflow_configurations,
)
print("已完成")
print()
# 就所有草稿名称倒叙排序排序
self.draft_names.sort(reverse=True)
draft_index += 1
if draft_index > draft_counts:
break
def _get_workflow_configurations(
self,
@ -676,8 +642,8 @@ class JianYingExport:
key: random.choice(value)
for key, value in self.configurations[node_name].items()
}
# 若非添加字幕或保存则在工作流配置添加轨道名称
if node_name not in ["add_subtitles", "save"]:
# 若非添加字幕则在工作流配置添加轨道名称
if node_name not in ["add_subtitles"]:
configurations["track_name"] = (
matched.group("track_name")
if (
@ -696,6 +662,13 @@ class JianYingExport:
}
)
# 添加保存节点
workflow_configurations.append(
{
"node_name": "save",
"configurations": {},
}
)
return workflow_configurations
def _generate_draft_name(
@ -715,185 +688,7 @@ class JianYingExport:
sort_keys=True,
ensure_ascii=False,
).encode("utf-8")
) # 将工作流配置序列化再以MD5哈希值作为草稿名称
) # 将工作流配置序列化
.hexdigest()
.upper()
.upper() # MD5哈希值的大写十六进制作为草稿名称
)
def _highlight_keywords(
self,
draft_name: str,
) -> None:
"""
高亮关键词
:param draft_name: 草稿名称
:return:
"""
time.sleep(2)
# 草稿内容路径
draft_content_path = self.drafts_folder_path / draft_name / "draft_content.json"
with open(
file=draft_content_path,
mode="r",
encoding="utf-8",
) as file:
draft_content = json.load(file)
# 字幕文本轨道所有文本片段的素材标识
material_ids = [
segment["material_id"]
for track in draft_content["tracks"]
if track["name"] == "subtitles(text)"
for segment in track["segments"]
]
# 遍历所有文本素材
for idx, material in enumerate(draft_content["materials"]["texts"]):
if material["id"] in material_ids:
# 素材内容
content = json.loads(s=material["content"])
# 素材文本
text = content["text"]
# 遍历关键词
for keyword in self.configuration["add_subtitles"]["keywords"]:
if match := next(re.finditer(pattern=keyword, string=text), None):
assert len(styles := content["styles"]) == 1, "样式设置数不为1"
# 根据关键词将文本拆分为三段,分别为前段、中段和后段样式。其中前段和后段样式无花字设置,中段样式有花字设置
middle_style = styles[0]
style = {
key: value
for key, value in middle_style.items()
if key != "effectStyle"
}
front_style = deepcopy(style)
rear_style = deepcopy(style)
# 将前段样式中终止位置设置为关键词起始位置
front_style["range"] = [0, match.start()]
# 将中段样式中起始和终止位置设置为关键词起始和终止位置
middle_style["range"] = [match.start(), match.end()]
# 调整字号
middle_style["size"] += 3
if match.end() != len(text):
# 将后段样式中起始位置设置为关键词终止位置
rear_style["range"] = [match.end(), len(text)]
styles = [front_style, middle_style, rear_style]
else:
styles = [front_style, middle_style]
draft_content["materials"]["texts"][idx]["content"] = (
json.dumps(
obj={
"styles": [front_style, middle_style, rear_style],
"text": text,
},
ensure_ascii=False,
)
)
with open(
file=draft_content_path,
mode="w",
encoding="utf-8",
) as file:
file.write(
json.dumps(
obj=draft_content,
default=lambda x: x.name if isinstance(x, Path) else x,
ensure_ascii=False,
indent=4,
)
)
def _start_process(self, timeout: int = 60) -> None:
"""
启动剪映专业版进程
:param timeout: 最大等待时间单位为秒默认为 60
:return:
"""
try:
# 关闭剪映专业版进程
self._close_process()
# 非堵塞方法
self.jianying_process = subprocess.Popen(
args=self.program_path.as_posix(),
shell=True, # 适配 Windows路径中的空格
stdout=subprocess.DEVNULL, # 重定向
stderr=subprocess.DEVNULL,
creationflags=subprocess.CREATE_NEW_CONSOLE,
)
start_time = time.time()
while time.time() - start_time < timeout:
# 定位剪映程序窗口
if self._locate_window() is not None:
print(f"已启动剪映专业版进程PID {self.jianying_process.pid}")
return
time.sleep(2)
raise RuntimeError("启动超时")
except Exception as exception:
raise RuntimeError(
f"启动剪映专业版进程发生异常:{str(exception)}"
) from exception
def _close_process(self, timeout: int = 60) -> None:
"""
关闭剪映专业版进程
:param timeout: 最大等待时间单位为秒默认为 60
:return:
"""
try:
# 定位剪映程序窗口
window_handle = self._locate_window()
if window_handle is not None:
# 请求关闭剪映程序窗口
win32gui.SendMessage(window_handle, win32con.WM_CLOSE, 0, 0)
start_time = time.time()
while time.time() - start_time < timeout:
if not win32gui.IsWindow(window_handle):
print("已关闭剪映专业版进程")
return
time.sleep(2)
raise RuntimeError("关闭超时")
except Exception as exception:
raise RuntimeError(
f"关闭剪映专业版进程发生异常:{str(exception)}"
) from exception
@staticmethod
def _locate_window() -> Optional[int]:
"""
定位剪映程序窗口
:return: 剪映程序窗口句柄
"""
window_handle = None
def callback(handle, _):
"""
遍历所有窗口的回调函数
"""
# 初始化窗口句柄
nonlocal window_handle
# 获取窗口标题
window_text = win32gui.GetWindowText(handle)
# 检查窗口是否可见且窗口标题为剪映专业版
if (
win32gui.IsWindow(handle)
and win32gui.IsWindowVisible(handle)
and window_text == "剪映专业版"
):
window_handle = handle
return False
return True
# 遍历所有顶层窗口
win32gui.EnumWindows(callback, None)
return window_handle