From 796c9a64fe05593d748fd28c4854a0dd4c9666b3 Mon Sep 17 00:00:00 2001 From: liubiren Date: Wed, 25 Feb 2026 21:38:46 +0800 Subject: [PATCH] 1 --- 短视频AI生成/main.py | 5 + 短视频合成自动化/export.py | 277 +++++-------------------------------- 2 files changed, 41 insertions(+), 241 deletions(-) create mode 100644 短视频AI生成/main.py diff --git a/短视频AI生成/main.py b/短视频AI生成/main.py new file mode 100644 index 0000000..bdb3ab9 --- /dev/null +++ b/短视频AI生成/main.py @@ -0,0 +1,5 @@ +# -*- coding: utf-8 -*- +""" +主模块 +""" + diff --git a/短视频合成自动化/export.py b/短视频合成自动化/export.py index b35926f..cafeab9 100644 --- a/短视频合成自动化/export.py +++ b/短视频合成自动化/export.py @@ -3,27 +3,24 @@ 导出草稿模块 """ -from copy import deepcopy +import hashlib import json from pathlib import Path +from pathlib import WindowsPath import random import re import shutil import subprocess +import sys import time from typing import Any, Dict, List, Optional -from uuid import uuid4 -from pathlib import WindowsPath -import pyJianYingDraft -import win32con -import win32gui -import hashlib -from draft import JianYingDraft -import sys +import pyJianYingDraft + +from draft import JianYingDraft +from utils.sqlite import SQLite sys.path.append(Path(__file__).parent.parent.as_posix()) -from utils.sqlite import SQLite # 自定义JSON编码器 @@ -73,7 +70,7 @@ class Caches(SQLite): """ ) except Exception as exception: - raise RuntimeError(f"初始化缓存表发生异常:{str(exception)}") from exception + raise RuntimeError(f"初始化缓存发生异常:{str(exception)}") from exception def query(self, draft_name: str) -> Optional[Dict[str, Any]]: """ @@ -186,28 +183,25 @@ class JianYingExport: # 初始化所有工作流 self.workflows = { - "0000": [ + "默认": [ "add_subtitles", "add_background_video", "add_statement", "add_sticker1", "add_sticker2", - "save", - ], - "0001": [ + ], # 默认工作流,先根据脚本合成音频,再叠加背景视频、声明视频、贴纸1视频和贴纸2视频 + "淘宝闪购": [ "add_subtitles_video", # 以此作为草稿持续时长 "add_background_video", "add_background_audio", "add_statement_video", - "save", ], # 适用于淘宝闪购、存量抽手机 - "0002": [ - "add_subtitles_video", + "视频号": [ + "add_subtitles_video", # 以此作为草稿持续时长 "add_background_video", "add_background_audio", "add_logo_video", "add_statement_video", - "save", ], # 适用于视频号 } @@ -395,7 +389,6 @@ class JianYingExport: }, ], # 图像调节设置 }, # 添加贴纸2工作配置 - "save": {}, # 保存 } self.video_width, self.video_height = video_width, video_height @@ -515,40 +508,19 @@ class JianYingExport: :param draft_counts: 每批次导出草稿数 """ if workflow_name not in self.workflows: - raise RuntimeError(f"未配置该工作流") + raise RuntimeError(f"该工作流 {workflow_name} 未配置") workflow = self.workflows[workflow_name] + # 若工作流包含添加背景音频,则在添加背景视频节点配置的播放音量设置为0 if "add_background_audio" in workflow: self.configurations["add_background_video"]["volume"] = [0.0] - # 按照工作流和工作配置拼接素材,批量生成草稿 - batch_draft_names = self._batch_generate_drafts( + # 批量生成草稿 + self._batch_generate_drafts( workflow_name=workflow_name, draft_counts=draft_counts, ) - for draft_name in batch_draft_names: - print(f"正在导出 {draft_name}...") - if (self.exports_folder_path / f"{draft_name}.mp4").is_file(): - print("存在相同名称的草稿,跳过") - continue - - jianying_controller.export_draft( - draft_name=draft_name, - output_path=self.exports_folder_path.as_posix(), - ) - print("已完成") - print() - - # 关闭剪映专业版进程 - self._close_process() - time.sleep(2) - - for draft_name in batch_draft_names: - # 就已导出草稿删除 - self.drafts_folder.remove(draft_name=draft_name) - time.sleep(2) - def _batch_generate_drafts( self, workflow_name: str, @@ -576,7 +548,6 @@ class JianYingExport: continue print(f"正在生成草稿 {draft_name}...") - draft_index += 1 # 实例化 JianYingDraft draft = JianYingDraft( @@ -641,23 +612,18 @@ class JianYingExport: draft.save() print("已完成") - if "add_subtitles" in self.workflow: - # 高亮关键词 - self._highlight_keywords(draft_name=draft_name) - - self.draft_names.append(draft_name) - with open( - file=self.exports_folder_path / f"{uuid4().hex.upper()}.txt", - mode="w", - encoding="utf-8", - ) as file: - file.write(f"{configuration}") + # 缓存 + self.caches.update( + draft_name=draft_name, + workflow_configurations=workflow_configurations, + ) print("已完成") print() - # 就所有草稿名称倒叙排序排序 - self.draft_names.sort(reverse=True) + draft_index += 1 + if draft_index > draft_counts: + break def _get_workflow_configurations( self, @@ -676,8 +642,8 @@ class JianYingExport: key: random.choice(value) for key, value in self.configurations[node_name].items() } - # 若非添加字幕或保存则在工作流配置添加轨道名称 - if node_name not in ["add_subtitles", "save"]: + # 若非添加字幕则在工作流配置添加轨道名称 + if node_name not in ["add_subtitles"]: configurations["track_name"] = ( matched.group("track_name") if ( @@ -696,6 +662,13 @@ class JianYingExport: } ) + # 添加保存节点 + workflow_configurations.append( + { + "node_name": "save", + "configurations": {}, + } + ) return workflow_configurations def _generate_draft_name( @@ -715,185 +688,7 @@ class JianYingExport: sort_keys=True, ensure_ascii=False, ).encode("utf-8") - ) # 先将工作流配置序列化,再以MD5哈希值作为草稿名称 + ) # 将工作流配置序列化 .hexdigest() - .upper() + .upper() # MD5哈希值的大写十六进制作为草稿名称 ) - - def _highlight_keywords( - self, - draft_name: str, - ) -> None: - """ - 高亮关键词 - :param draft_name: 草稿名称 - :return: 无 - """ - time.sleep(2) - - # 草稿内容路径 - draft_content_path = self.drafts_folder_path / draft_name / "draft_content.json" - - with open( - file=draft_content_path, - mode="r", - encoding="utf-8", - ) as file: - draft_content = json.load(file) - - # 字幕文本轨道所有文本片段的素材标识 - material_ids = [ - segment["material_id"] - for track in draft_content["tracks"] - if track["name"] == "subtitles(text)" - for segment in track["segments"] - ] - - # 遍历所有文本素材 - for idx, material in enumerate(draft_content["materials"]["texts"]): - if material["id"] in material_ids: - # 素材内容 - content = json.loads(s=material["content"]) - # 素材文本 - text = content["text"] - # 遍历关键词 - for keyword in self.configuration["add_subtitles"]["keywords"]: - if match := next(re.finditer(pattern=keyword, string=text), None): - assert len(styles := content["styles"]) == 1, "样式设置数不为1" - # 根据关键词将文本拆分为三段,分别为前段、中段和后段样式。其中前段和后段样式无花字设置,中段样式有花字设置 - middle_style = styles[0] - style = { - key: value - for key, value in middle_style.items() - if key != "effectStyle" - } - front_style = deepcopy(style) - rear_style = deepcopy(style) - - # 将前段样式中终止位置设置为关键词起始位置 - front_style["range"] = [0, match.start()] - - # 将中段样式中起始和终止位置设置为关键词起始和终止位置 - middle_style["range"] = [match.start(), match.end()] - # 调整字号 - middle_style["size"] += 3 - - if match.end() != len(text): - # 将后段样式中起始位置设置为关键词终止位置 - rear_style["range"] = [match.end(), len(text)] - styles = [front_style, middle_style, rear_style] - else: - styles = [front_style, middle_style] - - draft_content["materials"]["texts"][idx]["content"] = ( - json.dumps( - obj={ - "styles": [front_style, middle_style, rear_style], - "text": text, - }, - ensure_ascii=False, - ) - ) - - with open( - file=draft_content_path, - mode="w", - encoding="utf-8", - ) as file: - file.write( - json.dumps( - obj=draft_content, - default=lambda x: x.name if isinstance(x, Path) else x, - ensure_ascii=False, - indent=4, - ) - ) - - def _start_process(self, timeout: int = 60) -> None: - """ - 启动剪映专业版进程 - :param timeout: 最大等待时间(单位为秒),默认为 60 - :return: 无 - """ - try: - # 关闭剪映专业版进程 - self._close_process() - - # 非堵塞方法 - self.jianying_process = subprocess.Popen( - args=self.program_path.as_posix(), - shell=True, # 适配 Windows路径中的空格 - stdout=subprocess.DEVNULL, # 重定向 - stderr=subprocess.DEVNULL, - creationflags=subprocess.CREATE_NEW_CONSOLE, - ) - - start_time = time.time() - while time.time() - start_time < timeout: - # 定位剪映程序窗口 - if self._locate_window() is not None: - print(f"已启动剪映专业版进程,PID {self.jianying_process.pid}") - return - time.sleep(2) - raise RuntimeError("启动超时") - - except Exception as exception: - raise RuntimeError( - f"启动剪映专业版进程发生异常:{str(exception)}" - ) from exception - - def _close_process(self, timeout: int = 60) -> None: - """ - 关闭剪映专业版进程 - :param timeout: 最大等待时间(单位为秒),默认为 60 - :return: 无 - """ - try: - # 定位剪映程序窗口 - window_handle = self._locate_window() - if window_handle is not None: - # 请求关闭剪映程序窗口 - win32gui.SendMessage(window_handle, win32con.WM_CLOSE, 0, 0) - - start_time = time.time() - while time.time() - start_time < timeout: - if not win32gui.IsWindow(window_handle): - print("已关闭剪映专业版进程") - return - time.sleep(2) - raise RuntimeError("关闭超时") - - except Exception as exception: - raise RuntimeError( - f"关闭剪映专业版进程发生异常:{str(exception)}" - ) from exception - - @staticmethod - def _locate_window() -> Optional[int]: - """ - 定位剪映程序窗口 - :return: 剪映程序窗口句柄 - """ - window_handle = None - - def callback(handle, _): - """ - 遍历所有窗口的回调函数 - """ - # 初始化窗口句柄 - nonlocal window_handle - # 获取窗口标题 - window_text = win32gui.GetWindowText(handle) - # 检查窗口是否可见且窗口标题为剪映专业版 - if ( - win32gui.IsWindow(handle) - and win32gui.IsWindowVisible(handle) - and window_text == "剪映专业版" - ): - window_handle = handle - return False - return True - - # 遍历所有顶层窗口 - win32gui.EnumWindows(callback, None) - return window_handle