# -*- coding: utf-8 -*- """ 导出草稿模块 """ from copy import deepcopy import json from pathlib import Path import random import re import subprocess import time from typing import Any, Dict, Optional from uuid import uuid4 import pyJianYingDraft import win32con import win32gui from draft import JianYingDraft class JianYingExport: """ 封装 pyJianYingDraft.JianyingController库,支持: 1、初始化素材文件夹内所有素材 2、初始化工作流和工作配置 3、导出草稿 """ def __init__( self, materials_folder_path: str, program_path: str = "E:\\JianYingPro\\5.9.0.11632\\JianYingPro.exe", # 仅可在windows运行该脚本 drafts_folder_path: str = "E:\\JianYingPro Drafts", draft_counts: int = 10, video_width: int = 1080, video_height: int = 1920, video_fps: int = 30, ): """ 初始化 :param program_path: 剪映程序路径 :param drafts_folder_path: 剪映草稿文件夹路径 :param materials_folder_path: 素材文件夹路径 :param draft_counts: 草稿数,默认为 10 :param video_width: 视频宽度,默认为 1080像素 :param video_height: 视频高度,默认为 1920像素 :param video_fps: 视频帧率(单位为帧/秒),默认为 30 """ try: self.program_path = Path(program_path) if not self.program_path.exists(): raise RuntimeError("剪映程序路径不存在") # 初始化剪映专业版进程 self.jianying_process = None self.drafts_folder_path = Path(drafts_folder_path) if not self.drafts_folder_path.exists(): raise RuntimeError("剪映草稿文件夹路径不存在") # 初始化草稿文件夹管理器 self.drafts_folder = pyJianYingDraft.DraftFolder( folder_path=self.drafts_folder_path.as_posix() ) self.materials_folder_path = Path(materials_folder_path) if not self.materials_folder_path.exists(): raise RuntimeError("素材文件夹路径不存在") # 初始化导出文件夹路径 self.exports_folder_path = Path( self.materials_folder_path.as_posix().replace("materials", "exports") ) self.exports_folder_path.mkdir() # 若导出文件夹存在则抛出异常,需手动处理 self.materials = {} # 初始化素材文件夹内所有素材 self._init_materials() # 构建项目名称 self.project_name = self.materials_folder_path.stem # 初始化所有工作流 self.workflows = { "0000": [ "add_subtitles", "add_background_video", "add_statement", "add_sticker1", "add_sticker2", "save", ], "0001": [ "add_subtitles_video", # 以此作为草稿持续时长 "add_background_video", "add_background_audio", "add_statement_video", "save", ], } # 初始化工作配置 self.configuration = { "add_subtitles": { "text": self.materials["subtitles_text"], "timbre": [ "zh-CN-XiaoxiaoNeural", "zh-CN-XiaoyiNeural", "zh-CN-YunjianNeural", "zh-CN-YunxiNeural", "zh-CN-YunxiaNeural", "zh-CN-YunyangNeural", ], # 音色 "style": [ {"size": 9.0}, {"size": 10.0}, {"size": 11.0}, ], # 字体样式 "keywords": [ "瑞幸", ], # 关键词 "effect": [ {"effect_id": "7127561998556089631"}, {"effect_id": "7166467215410187552"}, {"effect_id": "6896138122774498567"}, {"effect_id": "7166469374507765031"}, {"effect_id": "6896137924853763336"}, {"effect_id": "6896137990788091143"}, {"effect_id": "7127614731187211551"}, {"effect_id": "7127823362356743461"}, {"effect_id": "7127653467555990821"}, {"effect_id": "7127828216647011592"}, ], # 花字设置 }, # 添加字幕工作配置 "add_subtitles_video": { "material_path": self.materials["subtitles_video_material_path"], "volume": [1.0], # 播放音量 "clip_settings": [ None, ], # 图像调节设置 }, # 添加字幕视频工作配置 "add_background_video": { "material_path": self.materials["background_video_material_path"], "volume": [1.0], # 播放音量 "clip_settings": [ { "scale_x": 1.5, "scale_y": 1.5, }, ], # 图像调节设置 }, # 添加背景视频工作配置 "add_background_audio": { "material_path": self.materials["background_audio_material_path"], "volume": [0.6], # 播放音量 }, # 添加背景音频工作配置 "add_logo": { "material_path": self.materials["logo_material_path"], "clip_settings": [ { "scale_x": 0.2, "scale_y": 0.2, "transform_x": -0.78, "transform_y": 0.82, }, { "scale_x": 0.2, "scale_y": 0.2, "transform_x": -0.68, "transform_y": 0.82, }, { "scale_x": 0.2, "scale_y": 0.2, "transform_x": 0, "transform_y": 0.82, }, { "scale_x": 0.2, "scale_y": 0.2, "transform_x": 0.68, "transform_y": 0.82, }, { "scale_x": 0.2, "scale_y": 0.2, "transform_x": 0.78, "transform_y": 0.82, }, ], }, # 添加标识工作配置 "add_statement": { "text": self.materials["statement_text"], "style": [ {"size": 5.0, "align": 1, "vertical": True}, {"size": 6.0, "align": 1, "vertical": True}, {"size": 7.0, "align": 1, "vertical": True}, ], # 文本样式 "border": [ {"width": 35.0}, {"width": 40.0}, {"width": 45.0}, {"width": 50.0}, {"width": 55.0}, ], # 描边宽度 "clip_settings": [ { "transform_x": -0.80, }, { "transform_x": -0.82, }, { "transform_x": -0.84, }, ], # 图像调节设置 }, # 添加声明工作配置 "add_statement_video": { "material_path": self.materials["statement_video_material_path"], "volume": [1.0], # 播放音量 "clip_settings": [ None, ], # 图像调节设置 }, # 添加声明视频工作配置 "add_sticker1": { "resource_id": [ "7110124379568098568", "7019687632804334861", "6895933678262750478", "7010558788675652900", "7026858083393588487", "7222940306558209336", "7120543009489341727", "6939830545673227557", "6939826722451754271", "7210221631132749093", "7138432572488453408", "7137700067338620192", "6895924436822674696", "7134644683506044163", "7062539853430279437", ], "clip_settings": [ { "scale_x": 0.75, "scale_y": 0.75, "transform_x": -0.75, "transform_y": 0.75, }, { "scale_x": 0.75, "scale_y": 0.75, "transform_y": 0.75, }, { "scale_x": 0.75, "scale_y": 0.75, "transform_x": 0.75, "transform_y": 0.75, }, ], # 图像调节设置 }, # 添加贴纸1工作配置(不包含箭头类) "add_sticker2": { "resource_id": [ "7143078914989018379", "7142870400358255905", "7185568038027103544", "7024342011440319781", "7205042602184363322", ], "clip_settings": [ { "scale_x": 0.75, "scale_y": 0.75, "transform_x": -0.8, "transform_y": -0.62, }, ], # 图像调节设置 }, # 添加贴纸2工作配置 } # 初始化工作流 self.workflow = [] self.video_width, self.video_height = video_width, video_height self.video_fps = video_fps self.draft_counts = draft_counts # 初始化所有草稿名称 self.draft_names = [] except Exception as exception: raise RuntimeError(f"发生异常:{str(exception)}") from exception def _init_materials(self) -> None: """ 初始化素材文件夹内所有素材 :return: 无 """ # 字幕(文本) subtitles_path = self.materials_folder_path / "字幕.txt" if subtitles_path.exists() and subtitles_path.is_file(): with open(subtitles_path, "r", encoding="utf-8") as file: subtitles_text = file.readlines() if not subtitles_text: raise RuntimeError("字幕文本为空") self.materials["subtitles_text"] = subtitles_text else: self.materials["subtitles_text"] = [] # 字幕(视频) subtitles_video_folder_path = self.materials_folder_path / "字幕视频" if ( subtitles_video_folder_path.exists() and subtitles_video_folder_path.is_dir() ): self.materials["subtitles_video_material_path"] = [ file_path for file_path in subtitles_video_folder_path.rglob("*.mov") if file_path.is_file() ] else: self.materials["subtitles_video_material_path"] = [] # 背景视频 background_video_folder_path = self.materials_folder_path / "背景视频" if ( background_video_folder_path.exists() and background_video_folder_path.is_dir() ): self.materials["background_video_material_path"] = [ file_path for file_path in background_video_folder_path.rglob("*.mp4") if file_path.is_file() ] else: self.materials["background_video_material_path"] = [] # 背景音频 background_audio_folder_path = self.materials_folder_path / "背景音频" if ( background_audio_folder_path.exists() and background_audio_folder_path.is_dir() ): self.materials["background_audio_material_path"] = [ file_path for file_path in background_audio_folder_path.rglob("*.mp3") if file_path.is_file() ] else: self.materials["background_audio_material_path"] = [] # 标识 logo_path = self.materials_folder_path / "标识.png" if logo_path.exists() and logo_path.is_file(): self.materials["logo_material_path"] = [logo_path] # 有且只有一张标识 else: self.materials["logo_material_path"] = [] # 声明(声明) statement_path = self.materials_folder_path / "声明.txt" if statement_path.exists() and statement_path.is_file(): with open(statement_path, "r", encoding="utf-8") as file: statement_text = file.readlines() if not statement_text: raise RuntimeError("声明文本为空") self.materials["statement_text"] = statement_text else: self.materials["statement_text"] = [] # 声明(视频) statement_video_folder_path = self.materials_folder_path / "声明视频" if ( statement_video_folder_path.exists() and statement_video_folder_path.is_dir() ): self.materials["statement_video_material_path"] = [ file_path for file_path in statement_video_folder_path.rglob("*.mov") if file_path.is_file() ] else: self.materials["statement_video_material_path"] = [] def export(self, workflow_name: str = "0001", batch_draft_counts: int = 1): """ 导出草稿 :param workflow_name: 工作流名称 :param batch_draft_counts: 每批次导出草稿数 """ if workflow_name not in self.workflows: raise RuntimeError(f"未配置该工作流") self.workflow = self.workflows[workflow_name] # 若工作流包含添加背景音频,则将添加背景视频工作配置中播放音量设置为0 if "add_background_audio" in self.workflow: self.configuration["add_background_video"]["volume"] = [0.0] # 按照工作流和工作配置拼接素材,批量生成草稿 self._generate_drafts() # 批次导出 for batch_start in range(0, self.draft_counts, batch_draft_counts): # 当前批次所有草稿名称 batch_draft_names = self.draft_names[ batch_start : batch_start + batch_draft_counts ] # 启动剪映专业版进程 self._start_process() time.sleep(2) # 初始化剪映控制器 jianying_controller = pyJianYingDraft.JianyingController() for draft_name in batch_draft_names: print(f"正在导出 {draft_name}...") if (self.exports_folder_path / f"{draft_name}.mp4").is_file(): print("存在相同名称的草稿,跳过") continue jianying_controller.export_draft( draft_name=draft_name, output_path=self.exports_folder_path.as_posix(), ) print("已完成") print() # 关闭剪映专业版进程 self._close_process() time.sleep(2) for draft_name in batch_draft_names: # 就已导出草稿删除 self.drafts_folder.remove(draft_name=draft_name) time.sleep(2) def _generate_drafts( self, ) -> None: """ 按照工作流和工作配置拼接素材,批量生成草稿 :return: 无 """ for idx in range(self.draft_counts): # 构建草稿名称 draft_name = self.project_name + f"{idx + 1:03d}" print(f"正在合成短视频 {draft_name}, {idx + 1}/{self.draft_counts}...") # 实例化 JianYingDraft draft = JianYingDraft( drafts_folder=self.drafts_folder, draft_name=draft_name, video_width=self.video_width, video_height=self.video_height, video_fps=self.video_fps, materials_folder_path=self.materials_folder_path, ) # 初始化当前草稿工作配置 configuration = { "materials_folder_path": self.materials_folder_path.stem, "draft_name": draft_name, "workflows": [], } for work in self.workflow: # 获取工作配置 parameters = self._get_parameters(work=work) configuration["workflows"].append( { "work": work, "parameters": parameters, } ) match work: # 添加字幕 case "add_subtitles": print("-> 正在添加字幕...", end="") draft.add_subtitles(**parameters) print("已完成") # 添加字幕 case "add_subtitles_video": print("-> 正在添加字幕视频...", end="") draft.add_video_segment(**parameters) print("已完成") # 添加背景视频 case "add_background_video": print("-> 正在添加背景视频...", end="") draft.add_video_segment(**parameters) print("已完成") # 添加背景音频 case "add_background_audio": print("-> 正在添加背景音频...", end="") draft.add_audio_segment(**parameters) print("已完成") # 添加标识 case "add_logo": print("-> 正在添加标识...", end="") draft.add_video_segment(**parameters) print("已完成") # 添加声明 case "add_statement": print("-> 正在添加声明...", end="") draft.add_text_segment(**parameters) print("已完成") # 添加视频 case "add_statement_video": print("-> 正在添加声明视频...", end="") draft.add_video_segment(**parameters) print("已完成") # 添加贴纸 case _ if work.startswith("add_sticker"): print("-> 正在添加贴纸...", end="") draft.add_sticker(**parameters) print("已完成") # 将草稿保存至剪映草稿文件夹内 case "save": print("-> 正在将草稿保存至剪映草稿文件夹内...", end="") draft.save() print("已完成") if "add_subtitles" in self.workflow: # 高亮关键词 self._highlight_keywords(draft_name=draft_name) self.draft_names.append(draft_name) with open( file=self.exports_folder_path / f"{uuid4().hex.upper()}.txt", mode="w", encoding="utf-8", ) as file: file.write(f"{configuration}") print("已完成") print() # 就所有草稿名称倒叙排序排序 self.draft_names.sort(reverse=True) def _get_parameters( self, work: str, ) -> Dict[str, Any]: """ 获取工作配置 :param work: 工作,包括添加字幕、添加背景视频、添加标识、添加声明和添加贴纸 :return: 工作配置 """ if work == "save": return {} parameters = { key: random.choice(value) for key, value in self.configuration[work].items() } # TODO: 考虑融合贝叶斯优化 if work == "add_subtitles": parameters.pop("keywords") # 就除添加字幕其它工作添加轨道名称 if work != "add_subtitles" and ( match := re.search(r"_(?P.+)", work) ): parameters["track_name"] = match.group("track_name") return parameters def _highlight_keywords( self, draft_name: str, ) -> None: """ 高亮关键词 :param draft_name: 草稿名称 :return: 无 """ time.sleep(2) # 草稿内容路径 draft_content_path = self.drafts_folder_path / draft_name / "draft_content.json" with open( file=draft_content_path, mode="r", encoding="utf-8", ) as file: draft_content = json.load(file) # 字幕文本轨道所有文本片段的素材标识 material_ids = [ segment["material_id"] for track in draft_content["tracks"] if track["name"] == "subtitles(text)" for segment in track["segments"] ] # 遍历所有文本素材 for idx, material in enumerate(draft_content["materials"]["texts"]): if material["id"] in material_ids: # 素材内容 content = json.loads(s=material["content"]) # 素材文本 text = content["text"] # 遍历关键词 for keyword in self.configuration["add_subtitles"]["keywords"]: if match := next(re.finditer(pattern=keyword, string=text), None): assert len(styles := content["styles"]) == 1, "样式设置数不为1" # 根据关键词将文本拆分为三段,分别为前段、中段和后段样式。其中前段和后段样式无花字设置,中段样式有花字设置 middle_style = styles[0] style = { key: value for key, value in middle_style.items() if key != "effectStyle" } front_style = deepcopy(style) rear_style = deepcopy(style) # 将前段样式中终止位置设置为关键词起始位置 front_style["range"] = [0, match.start()] # 将中段样式中起始和终止位置设置为关键词起始和终止位置 middle_style["range"] = [match.start(), match.end()] # 调整字号 middle_style["size"] += 3 if match.end() != len(text): # 将后段样式中起始位置设置为关键词终止位置 rear_style["range"] = [match.end(), len(text)] styles = [front_style, middle_style, rear_style] else: styles = [front_style, middle_style] draft_content["materials"]["texts"][idx]["content"] = ( json.dumps( obj={ "styles": [front_style, middle_style, rear_style], "text": text, }, ensure_ascii=False, ) ) with open( file=draft_content_path, mode="w", encoding="utf-8", ) as file: file.write( json.dumps( obj=draft_content, default=lambda x: x.name if isinstance(x, Path) else x, ensure_ascii=False, indent=4, ) ) def _start_process(self, timeout: int = 60) -> None: """ 启动剪映专业版进程 :param timeout: 最大等待时间(单位为秒),默认为 60 :return: 无 """ try: # 关闭剪映专业版进程 self._close_process() # 非堵塞方法 self.jianying_process = subprocess.Popen( args=self.program_path.as_posix(), shell=True, # 适配 Windows路径中的空格 stdout=subprocess.DEVNULL, # 重定向 stderr=subprocess.DEVNULL, creationflags=subprocess.CREATE_NEW_CONSOLE, ) start_time = time.time() while time.time() - start_time < timeout: # 定位剪映程序窗口 if self._locate_window() is not None: print(f"已启动剪映专业版进程,PID {self.jianying_process.pid}") return time.sleep(2) raise RuntimeError("启动超时") except Exception as exception: raise RuntimeError( f"启动剪映专业版进程发生异常:{str(exception)}" ) from exception def _close_process(self, timeout: int = 60) -> None: """ 关闭剪映专业版进程 :param timeout: 最大等待时间(单位为秒),默认为 60 :return: 无 """ try: # 定位剪映程序窗口 window_handle = self._locate_window() if window_handle is not None: # 请求关闭剪映程序窗口 win32gui.SendMessage(window_handle, win32con.WM_CLOSE, 0, 0) start_time = time.time() while time.time() - start_time < timeout: if not win32gui.IsWindow(window_handle): print("已关闭剪映专业版进程") return time.sleep(2) raise RuntimeError("关闭超时") except Exception as exception: raise RuntimeError( f"关闭剪映专业版进程发生异常:{str(exception)}" ) from exception @staticmethod def _locate_window() -> Optional[int]: """ 定位剪映程序窗口 :return: 剪映程序窗口句柄 """ window_handle = None def callback(handle, _): """ 遍历所有窗口的回调函数 """ # 初始化窗口句柄 nonlocal window_handle # 获取窗口标题 window_text = win32gui.GetWindowText(handle) # 检查窗口是否可见且窗口标题为剪映专业版 if ( win32gui.IsWindow(handle) and win32gui.IsWindowVisible(handle) and window_text == "剪映专业版" ): window_handle = handle return False return True # 遍历所有顶层窗口 win32gui.EnumWindows(callback, None) return window_handle