This commit is contained in:
liubiren 2026-03-11 17:55:48 +08:00
parent 020f6fe12e
commit 43c8dd3a7c
8 changed files with 841 additions and 850 deletions

View File

@ -38,10 +38,10 @@ ark_client = Ark(
request_client = Request()
def get_brand_words() -> List[str]:
def get_product_image() -> List[str]:
"""
获取品牌词
:return: 品牌词
获取产品图片
:return: 产品图片
"""
try:
with open(

Binary file not shown.

View File

@ -0,0 +1,115 @@
# -*- coding: utf-8 -*-
"""
缓存模块
"""
# 列举导入模块
import json
from pathlib import Path
from pathlib import WindowsPath
import sys
import time
from typing import Any, Dict, Optional
sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.sqlite import SQLite
class Caches(SQLite):
"""
缓存客户端支持
query查询并返回单条缓存
update新增或更新单条缓存
"""
# 自定义JSON编码器
class JSONEncoder(json.JSONEncoder):
def default(self, o):
# 若为WindowsPath对象则转为字符串路径
if isinstance(o, WindowsPath):
return o.as_posix()
return super().default(o)
def __init__(self, cache_ttl: int = 30 * 86400):
"""
初始化
:param cache_ttl: 缓存生存时间单位为秒默认为30天
"""
# 初始化
super().__init__(database=Path(__file__).parent.resolve() / "caches.db")
self.cache_ttl = cache_ttl
# 初始化缓存表(不清理过期缓存)
try:
with self:
self.execute(
sql="""
CREATE TABLE IF NOT EXISTS caches
(
--草稿名称
draft_name TEXT PRIMARY KEY,
--工作流配置
configurations TEXT NOT NULL,
--创建时间戳
timestamp REAL NOT NULL
)
"""
)
self.execute(
sql="""
CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)
"""
)
except Exception as exception:
raise RuntimeError(f"初始化缓存发生异常:{str(exception)}") from exception
def query(self, draft_name: str) -> Optional[Dict[str, Any]]:
"""
查询并返回单条缓存
:param draft_name: 草稿名称
:return: 缓存
"""
try:
with self:
result = self.query_one(
sql="""
SELECT configurations
FROM caches
WHERE draft_name = ? AND timestamp >= ?
""",
parameters=(draft_name, time.time() - self.cache_ttl),
)
return None if result is None else json.loads(result["configurations"])
except Exception as exception:
raise RuntimeError(
f"查询并获取单条缓存发生异常:{str(exception)}"
) from exception
def update(
self, draft_name: str, configurations: Dict[str, Dict[str, Any]]
) -> Optional[bool]:
"""
新增或更新单条缓存
:param draft_name: 草稿名称
:param configurations: 节点配置
:return: 成功返回True失败返回False
"""
try:
with self:
return self.execute(
sql="""
INSERT OR REPLACE INTO caches (draft_name, configurations, timestamp) VALUES (?, ?, ?)
""",
parameters=(
draft_name,
json.dumps(
obj=configurations,
cls=self.JSONEncoder,
sort_keys=True,
ensure_ascii=False,
),
time.time(),
),
)
except Exception as exception:
raise RuntimeError("新增或更新缓存发生异常") from exception

View File

@ -1,62 +1,85 @@
# -*- coding: utf-8 -*-
"""
生成草稿模块
剪映草稿模块
"""
# 列举导入模块
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple, Union
import pyJianYingDraft
from pyJianYingDraft import (
AudioMaterial,
AudioSegment,
ClipSettings,
ClipSettings,
DraftFolder,
FontType,
KeyframeProperty,
StickerSegment,
TextBackground,
TextBorder,
TextSegment,
TextStyle,
TrackType,
VideoMaterial,
VideoSegment,
trange,
)
from pyJianYingDraft.time_util import tim
from edgetts import EdgeTTS
class JianYingDraft:
class Drafts:
"""
封装 pyJianYing中生成草稿的相关功能支持
1向指定文本轨道添加文本片段
2向指定音频轨道添加音频片段
3向指定视频轨道添加视频或图片片段
4向指定贴纸轨道添加贴纸片段
5根据文本逐段合成语音生成文本和语音字幕
6将草稿保存至剪映草稿文件夹内
剪映草稿支持
1 添加文本片段
2 添加音频片段
3 添加视频或图片片段
4 添加贴纸片段
5 生成字幕
6 保存剪映草稿
"""
def __init__(
self,
drafts_folder: pyJianYingDraft.DraftFolder,
draft_name: str,
materials_folder_path: Path,
allow_replace: bool = True,
drafts_folder: DraftFolder,
draft_name: str,
video_width: int = 1080,
video_height: int = 1920,
video_fps: int = 30,
video_duration: int = 0,
):
"""
初始化
:param materials_folder_path: 素材文件夹路径
:param drafts_folder: 剪映草稿文件夹管理器
:param draft_name: 草稿名称
:param allow_replace: 是否允许覆盖同名草稿若不允许覆盖同名草稿需初始化剪映草稿文件夹
:param video_width: 视频宽度默认为 1080像素
:param video_height: 视频高度默认为 1920像素
:param video_width: 视频宽度单位为像素默认为 1080
:param video_height: 视频高度单位为像素默认为 1920
:param video_fps: 视频帧率单位为帧/默认为 30
:param materials_folder_path: 素材文件夹路径
:param video_duration: 视频持续时长单位为微秒默认为 0
"""
try:
# 新建草稿
# 初始化素材文件夹路径
self.materials_folder_path = materials_folder_path
if not self.materials_folder_path.exists():
raise FileNotFoundError(f"素材文件夹路径不存在")
# 创建剪映草稿
self.draft = drafts_folder.create_draft(
draft_name=draft_name,
allow_replace=allow_replace,
allow_replace=True, # 允许覆盖与 draft_name 重名的剪映草稿
width=video_width,
height=video_height,
fps=video_fps,
)
# 草稿持续时长(单位为毫秒)
self.draft_duration = 0
self.materials_folder_path = materials_folder_path
# 初始化视频持续时长
self.video_duration = video_duration
except Exception as exception:
raise RuntimeError(f"发生异常:{str(exception)}") from exception
raise RuntimeError(f"创建剪映草稿发生异常:{str(exception)}") from exception
def add_text_segment(
self,
@ -74,7 +97,7 @@ class JianYingDraft:
animation: Optional[Dict[str, Any]] = None,
) -> None:
"""
向指定文本轨道添加文本片段
添加文本片段
:param track_name: 轨道名称
:param add_track: 添加文本轨道默认为是
:param text: 文本
@ -93,44 +116,36 @@ class JianYingDraft:
if add_track:
# 添加文本轨道
self.draft.add_track(
track_type=pyJianYingDraft.TrackType.text,
track_type=TrackType.text,
track_name=track_name,
)
# 构建文本片段
text_segment = pyJianYingDraft.TextSegment(
text_segment = TextSegment(
text=text.replace("\\n", "\n"),
timerange=pyJianYingDraft.trange(
*(timerange if timerange else (0, self.draft_duration))
),
font=pyJianYingDraft.FontType(font) if font else None,
style=pyJianYingDraft.TextStyle(**style) if style else None,
border=pyJianYingDraft.TextBorder(**border) if border else None,
background=(
pyJianYingDraft.TextBackground(**background) if background else None
timerange=trange(
*(timerange if timerange else (0, self.video_duration))
),
font=FontType(font) if font else None,
style=TextStyle(**style) if style else None,
border=TextBorder(**border) if border else None,
background=(TextBackground(**background) if background else None),
clip_settings=(
pyJianYingDraft.ClipSettings(**clip_settings)
if clip_settings
else None
ClipSettings(**clip_settings) if clip_settings else None
),
)
# 添加气泡
if bubble:
text_segment.add_bubble(**bubble)
# 添加花字
# 将花字保存预设后在C:/Users/admin/AppData/Local/JianyingPro/User Data/Presets/Text_V2/预设文本?.textpreset获取花字resource_id
if effect:
text_segment.add_effect(**effect)
# 添加动画
if animation:
text_segment.add_animation(**animation)
# 向指定文本轨道添加文本片段
# 向指定轨道添加文本片段
self.draft.add_segment(segment=text_segment, track_name=track_name)
except Exception as exception:
raise RuntimeError(str(exception)) from exception
@ -146,7 +161,7 @@ class JianYingDraft:
fade: Optional[Tuple[str, str]] = None,
) -> None:
"""
向指定音频轨道添加音频片段
添加音频片段
:param track_name: 轨道名称
:param add_track: 添加音频轨道默认为是
:param material_path: 音频素材路径
@ -159,40 +174,35 @@ class JianYingDraft:
"""
try:
# 音频素材
audio_material = pyJianYingDraft.AudioMaterial(
path=material_path.as_posix()
)
audio_material = AudioMaterial(path=material_path.as_posix())
# 音频素材的持续时长
audio_material_duration = audio_material.duration
# 若草稿持续时长为0则将第一个音频素材持续时长作为草稿持续时长
# 获取持续时间
target_duration = pyJianYingDraft.time_util.tim(
(target_timerange if target_timerange else (0, self.draft_duration))[1]
# 目标持续时间
target_duration = tim(
(target_timerange if target_timerange else (0, self.video_duration))[1]
)
if add_track:
# 添加音频轨道
self.draft.add_track(
track_type=pyJianYingDraft.TrackType.audio,
track_type=TrackType.audio,
track_name=track_name,
)
duration = 0 # 已添加音频素材的持续时长
while duration < target_duration:
cumulative_duration = 0 # 累计持续时长
while cumulative_duration < target_duration:
# 构建音频片段
audio_segment = pyJianYingDraft.AudioSegment(
audio_segment = AudioSegment(
material=audio_material,
target_timerange=pyJianYingDraft.trange(
start=duration,
target_timerange=trange(
start=cumulative_duration,
duration=min(
(target_duration - duration), audio_material_duration
(target_duration - cumulative_duration),
audio_material_duration,
),
),
source_timerange=(
pyJianYingDraft.trange(*source_timerange)
if source_timerange
else None
trange(*source_timerange) if source_timerange else None
),
speed=speed,
volume=volume,
@ -200,12 +210,10 @@ class JianYingDraft:
# 添加淡入淡出
if fade:
audio_segment.add_fade(*fade)
# 向指定音频轨道添加音频片段
# 向指定轨道添加音频片段
self.draft.add_segment(segment=audio_segment, track_name=track_name)
duration += audio_material_duration
cumulative_duration += audio_material_duration
except Exception as exception:
raise RuntimeError(str(exception)) from exception
@ -213,20 +221,20 @@ class JianYingDraft:
self,
track_name: str,
material_path: Path,
target_timerange: Optional[Tuple[Union[int, str], Union[int, str]]] = None,
source_timerange: Optional[Tuple[Union[int, str], Union[int, str]]] = None,
target_timerange: Optional[Tuple[int, Optional[int]]] = None,
source_timerange: Optional[Tuple[int, int]] = None,
speed: float = 1.0,
volume: float = 1.0,
clip_settings: Optional[Dict[str, Any]] = None,
keyframes: Optional[
List[Tuple[pyJianYingDraft.KeyframeProperty, Union[str, int], float]]
List[Tuple[KeyframeProperty, Union[str, int], float]]
] = None,
animation: Optional[Dict[str, Any]] = None,
transition: Optional[Dict[str, Any]] = None,
background_filling: Optional[Dict[str, Any]] = None,
) -> None:
"""
向指定视频轨道添加视频或图片片段
添加视频或图片片段
:param track_name: 轨道名称
:param material_path: 视频或图片素材路径
:param target_timerange: 视频或图片素材在轨道上的范围包括开始时间和持续时长默认为草稿持续时长
@ -242,74 +250,65 @@ class JianYingDraft:
"""
try:
# 视频素材
video_material = pyJianYingDraft.VideoMaterial(
path=material_path.as_posix()
)
video_material = VideoMaterial(path=material_path.as_posix())
# 视频素材的持续时长
video_material_duration = video_material.duration
# 若草稿持续时长为0则将第一个视频素材持续时长作为草稿持续时长
relative_index = 0
if not self.draft_duration:
relative_index = 1 # 视频轨道相对索引
self.draft_duration = video_material_duration
# 获取持续时间
target_duration = pyJianYingDraft.time_util.tim(
(target_timerange if target_timerange else (0, self.draft_duration))[1]
)
# 目标持续时间
target_duration = (
tim(target_duration)
if (target_timerange and (target_duration := target_timerange[1]))
else (
video_material_duration if target_timerange else self.video_duration
)
) # 若视频或图片素材在轨道上的范围为空则将视频素材持续时长作为目标持续时长,若视频或图片素材在轨道上的范围中持续时长为空则将视频素材持续时长作为目标持续时长
# 添加视频轨道
self.draft.add_track(
track_type=pyJianYingDraft.TrackType.video,
track_type=TrackType.video,
track_name=track_name,
relative_index=relative_index,
)
duration = 0 # 已添加视频素材的持续时长
while duration < target_duration:
cumulative_duration = 0 # 累计持续时长
while cumulative_duration < target_duration:
# 构建视频或图片片段
video_segment = pyJianYingDraft.VideoSegment(
video_segment = VideoSegment(
material=video_material,
target_timerange=pyJianYingDraft.trange(
start=duration,
target_timerange=trange(
start=cumulative_duration
+ (target_timerange[0] if target_timerange else 0),
duration=min(
(target_duration - duration), video_material_duration
(target_duration - cumulative_duration),
video_material_duration,
),
),
source_timerange=(
pyJianYingDraft.trange(*source_timerange)
if source_timerange
else None
trange(*source_timerange) if source_timerange else None
),
speed=speed,
volume=volume,
clip_settings=(
pyJianYingDraft.ClipSettings(**clip_settings)
if clip_settings
else None
ClipSettings(**clip_settings) if clip_settings else None
),
)
# 添加关键帧
if keyframes:
for _property, offset, value in keyframes:
video_segment.add_keyframe(_property, offset, value)
for keyframe_property, keyframe_offset, keyframe_value in keyframes:
video_segment.add_keyframe(
keyframe_property, keyframe_offset, keyframe_value
)
# 添加动画
if animation:
video_segment.add_animation(**animation)
# 添加转场
if transition:
video_segment.add_transition(**transition)
# 添加背景填充
if background_filling:
video_segment.add_background_filling(**background_filling)
# 向指定视频轨道添加视频或图片片段
# 向指定轨道添加视频或图片片段
self.draft.add_segment(segment=video_segment, track_name=track_name)
duration += video_material_duration
cumulative_duration += video_material_duration
except Exception as exception:
raise RuntimeError(str(exception)) from exception
@ -331,25 +330,23 @@ class JianYingDraft:
try:
# 添加贴纸轨道
self.draft.add_track(
track_type=pyJianYingDraft.TrackType.sticker,
track_type=TrackType.sticker,
track_name=track_name,
)
# 构建贴纸
# 将贴纸保存为我的预设后在C:\Users\admin\AppData\Local\JianyingPro\User Data\Presets\Combination\Presets\我的预设?\preset_draft获取
sticker_segment = pyJianYingDraft.StickerSegment(
sticker_segment = StickerSegment(
resource_id=resource_id,
target_timerange=pyJianYingDraft.trange(
target_timerange=trange(
*(
target_timerange
if target_timerange
else (0, self.draft_duration)
else (0, self.video_duration)
)
),
clip_settings=(
pyJianYingDraft.ClipSettings(**clip_settings)
if clip_settings
else None
ClipSettings(**clip_settings) if clip_settings else None
),
)
@ -359,10 +356,10 @@ class JianYingDraft:
except Exception as exception:
raise RuntimeError(str(exception)) from exception
def add_subtitles(
def generate_subtitle(
self,
text: str,
timbre: str = "女声-晓晓",
texts: str,
timbre: str = "zh-CN-XiaoxiaoNeural",
rate: str = "+25%",
volume: str = "+0%",
font: Optional[str] = None,
@ -371,12 +368,12 @@ class JianYingDraft:
effect: Optional[Dict[str, Any]] = None,
):
"""
添加字幕
:param text: 文本
根据字幕文本合成字幕音频并生成字幕
:param texts: 字幕文本
:param timbre: 声音音色默认为女声-晓晓
:param rate: 语速默认为 +25%
:param volume: 音量默认为 +0%
:param font: 字体默认为系统
:param font: 字体默认为系统默认字体
:param style: 文本样式默认为字号 12对齐方式 水平居中
:param clip_settings: 图像调节设置默认为移动至 (0, -0.5)
:param effect: 花字设置默认为无
@ -384,33 +381,34 @@ class JianYingDraft:
"""
# 添加文本轨道
self.draft.add_track(
track_type=pyJianYingDraft.TrackType.text,
track_name=(text_track_name := "subtitles(text)"),
track_type=TrackType.text,
track_name=(text_track_name := "subtitle(text)"),
)
# 添加音频轨道
self.draft.add_track(
track_type=pyJianYingDraft.TrackType.audio,
track_name=(audio_track_name := "subtitles(audio)"),
track_type=TrackType.audio,
track_name=(audio_track_name := "subtitle(audio)"),
)
# 构造语音文件保存文件夹路径path对象
subtitles_folder_path = self.materials_folder_path / "subtitles"
subtitles_folder_path.mkdir(exist_ok=True)
# 实例化 EdgeTTS
edge_tts = EdgeTTS(folder_path=subtitles_folder_path)
# 构建字幕音频文件夹路径
subtitle_folder_path = self.materials_folder_path / "字幕音频"
subtitle_folder_path.mkdir(exist_ok=True)
start = 0
for paragraph in text.split(""):
# 根据文本合成语音并将语音文件保存至指定文件夹内
file_path, duration = edge_tts.synthetize(
text=paragraph, timbre=timbre, rate=rate, volume=volume
# 实例化 EdgeTTS
edge_tts = EdgeTTS(folder_path=subtitle_folder_path)
cumulative_duration = 0 # 累计持续时长
for text in texts.split(""):
# 根据字幕文本片段合成语音并将语音文件保存至指定文件夹内
subtitle_audio_path, duration = edge_tts.synthetize(
text=text, timbre=timbre, rate=rate, volume=volume
)
# 向指定文本轨道添加文本片段
self.add_text_segment(
track_name=text_track_name,
add_track=False,
text=paragraph,
timerange=(start, duration),
add_track=False, # 不添加文本轨道
text=text,
timerange=(cumulative_duration, duration),
font=font,
style={
"size": 12.0,
@ -426,20 +424,18 @@ class JianYingDraft:
# 向指定音频轨道添加音频片段
self.add_audio_segment(
track_name=audio_track_name,
add_track=False,
material_path=file_path,
target_timerange=(start, duration),
volume=1.5,
add_track=False, # 不添加音频轨道
material_path=subtitle_audio_path,
target_timerange=(cumulative_duration, duration),
)
start += duration
cumulative_duration += duration
# 更新草稿持续时长
self.draft_duration = start
# 以累计持续时长作为视频持续时长
self.video_duration = cumulative_duration
def save(self) -> None:
"""将草稿保存至剪映草稿文件夹内"""
try:
self.draft.save()
except Exception as exception:
raise RuntimeError(str(exception)) from exception

View File

@ -3,6 +3,7 @@
合成语音模块
"""
# 列举导入模块
import asyncio
from hashlib import md5
from pathlib import Path
@ -54,17 +55,19 @@ class EdgeTTS:
.hexdigest()
.upper()}.mp3"
# 构造语音文件路径
file_path = self.folder_path / file_name
audio_path = self.folder_path / file_name
communicator = edge_tts.Communicate(
text=text.replace("\n", ""),
voice=timbre,
rate=rate,
volume=volume,
)
await communicator.save(file_path.as_posix())
await communicator.save(audio_path.as_posix())
# 持续时长(单位为微秒)
duration = int(round(MP3(file_path.as_posix()).info.length * 1_000_000))
return file_path, duration
duration = int(
round(MP3(audio_path.as_posix()).info.length * 1_000_000)
)
return audio_path, duration
return asyncio.run(_async_synthetize())
except Exception as exception:

View File

@ -1,693 +0,0 @@
# -*- coding: utf-8 -*-
"""
导出草稿模块
"""
import hashlib
import json
from pathlib import Path
from pathlib import WindowsPath
import random
import re
import shutil
import sys
import time
from typing import Any, Dict, List, Optional
import pyJianYingDraft
from draft import JianYingDraft
from utils.sqlite import SQLite
sys.path.append(Path(__file__).parent.parent.as_posix())
# 自定义JSON编码器
class JSONEncoder(json.JSONEncoder):
def default(self, o):
# 若为WindowsPath对象则转为字符串路径
if isinstance(o, WindowsPath):
return o.as_posix()
return super().default(o)
class Caches(SQLite):
"""
缓存客户端支持
query查询并返回单条缓存
update新增或更新单条缓存
"""
def __init__(self, cache_ttl: int = 30 * 86400):
"""
初始化
:param cache_ttl: 缓存生存时间单位为秒默认为30天
"""
# 初始化
super().__init__(database=Path(__file__).parent.resolve() / "caches.db")
self.cache_ttl = cache_ttl
# 初始化缓存表(不清理过期缓存)
try:
with self:
self.execute(
sql="""
CREATE TABLE IF NOT EXISTS caches
(
--草稿名称
draft_name TEXT PRIMARY KEY,
--工作流配置
workflow_configurations TEXT NOT NULL,
--创建时间戳
timestamp REAL NOT NULL
)
"""
)
self.execute(
sql="""
CREATE INDEX IF NOT EXISTS idx_timestamp ON caches(timestamp)
"""
)
except Exception as exception:
raise RuntimeError(f"初始化缓存发生异常:{str(exception)}") from exception
def query(self, draft_name: str) -> Optional[Dict[str, Any]]:
"""
查询并返回单条缓存
:param draft_name: 草稿名称
:return: 缓存
"""
try:
with self:
result = self.query_one(
sql="""
SELECT workflow_configurations
FROM caches
WHERE draft_name = ? AND timestamp >= ?
""",
parameters=(draft_name, time.time() - self.cache_ttl),
)
return (
None
if result is None
else json.loads(result["workflow_configurations"])
)
except Exception as exception:
raise RuntimeError(
f"查询并获取单条缓存发生异常:{str(exception)}"
) from exception
def update(
self, draft_name: str, workflow_configurations: List[Dict[str, Any]]
) -> Optional[bool]:
"""
新增或更新单条缓存
:param draft_name: 草稿名称
:param workflow_configurations: 工作流配置
:return: 成功返回True失败返回False
"""
try:
with self:
return self.execute(
sql="""
INSERT OR REPLACE INTO caches (draft_name, workflow_configurations, timestamp) VALUES (?, ?, ?)
""",
parameters=(
draft_name,
json.dumps(
obj=workflow_configurations,
cls=JSONEncoder,
sort_keys=True,
ensure_ascii=False,
),
time.time(),
),
)
except Exception as exception:
raise RuntimeError("新增或更新缓存发生异常") from exception
class JianYingExport:
"""
封装 pyJianYingDraft.JianyingController库支持
1初始化素材文件夹内所有素材
2初始化工作流和工作配置
3导出草稿
"""
def __init__(
self,
materials_folder_path: str,
drafts_folder_path: str = r"E:\JianYingPro Drafts",
video_width: int = 1080,
video_height: int = 1920,
video_fps: int = 30,
):
"""
初始化
:param drafts_folder_path: 剪映草稿文件夹路径
:param materials_folder_path: 素材文件夹路径
:param video_width: 视频宽度默认为 1080像素
:param video_height: 视频高度默认为 1920像素
:param video_fps: 视频帧率单位为帧/默认为 30
"""
try:
# 初始化剪映草稿文件夹路径
self.drafts_folder_path = Path(drafts_folder_path)
if not self.drafts_folder_path.exists():
raise RuntimeError("剪映草稿文件夹路径不存在")
# 初始化剪映草稿文件夹管理器
self.drafts_folder = pyJianYingDraft.DraftFolder(
folder_path=self.drafts_folder_path.as_posix()
)
# 初始化素材文件夹路径
self.materials_folder_path = Path(materials_folder_path)
if not self.materials_folder_path.exists():
raise RuntimeError("素材文件夹路径不存在")
# 初始化导出文件夹路径
self.exports_folder_path = Path(
self.materials_folder_path.as_posix().replace("materials", "exports")
)
# 若导出文件夹存在则删除,再创建导出文件夹
if self.exports_folder_path.exists():
shutil.rmtree(self.exports_folder_path)
self.exports_folder_path.mkdir()
self.materials = {}
# 初始化素材文件夹内所有素材
self._init_materials()
# 初始化所有工作流
self.workflows = {
"默认": [
"add_subtitles",
"add_background_video",
"add_statement",
"add_sticker1",
"add_sticker2",
], # 默认工作流先根据脚本合成音频再叠加背景视频、声明视频、贴纸1视频和贴纸2视频
"淘宝闪购": [
"add_subtitles_video", # 以此作为草稿持续时长
"add_background_video",
"add_background_audio",
"add_statement_video",
], # 适用于淘宝闪购、存量抽手机
"视频号": [
"add_subtitles_video", # 以此作为草稿持续时长
"add_background_video",
"add_background_audio",
"add_logo_video",
"add_statement_video",
], # 适用于视频号
}
# 初始化所有节点配置
self.configurations = {
"add_subtitles": {
"text": self.materials["subtitles_text"],
"timbre": [
"zh-CN-XiaoxiaoNeural",
"zh-CN-XiaoyiNeural",
"zh-CN-YunjianNeural",
"zh-CN-YunxiNeural",
"zh-CN-YunxiaNeural",
"zh-CN-YunyangNeural",
], # 音色
"style": [
{"size": 9.0},
{"size": 10.0},
{"size": 11.0},
], # 字体样式
"keywords": [
"瑞幸",
], # 关键词
"effect": [
{"effect_id": "7127561998556089631"},
{"effect_id": "7166467215410187552"},
{"effect_id": "6896138122774498567"},
{"effect_id": "7166469374507765031"},
{"effect_id": "6896137924853763336"},
{"effect_id": "6896137990788091143"},
{"effect_id": "7127614731187211551"},
{"effect_id": "7127823362356743461"},
{"effect_id": "7127653467555990821"},
{"effect_id": "7127828216647011592"},
], # 花字设置
}, # 添加字幕工作配置
"add_subtitles_video": {
"material_path": self.materials["subtitles_video_material_path"],
"volume": [1.0], # 播放音量
"clip_settings": [
None,
], # 图像调节设置
}, # 添加字幕视频工作配置
"add_background_video": {
"material_path": self.materials["background_video_material_path"],
"volume": [1.0], # 播放音量
"clip_settings": [
{
"scale_x": 1.5,
"scale_y": 1.5,
},
], # 图像调节设置
}, # 添加背景视频工作配置
"add_background_audio": {
"material_path": self.materials["background_audio_material_path"],
"volume": [0.6], # 播放音量
}, # 添加背景音频工作配置
"add_logo": {
"material_path": self.materials["logo_material_path"],
"clip_settings": [
{
"scale_x": 0.2,
"scale_y": 0.2,
"transform_x": -0.78,
"transform_y": 0.82,
},
{
"scale_x": 0.2,
"scale_y": 0.2,
"transform_x": -0.68,
"transform_y": 0.82,
},
{
"scale_x": 0.2,
"scale_y": 0.2,
"transform_x": 0,
"transform_y": 0.82,
},
{
"scale_x": 0.2,
"scale_y": 0.2,
"transform_x": 0.68,
"transform_y": 0.82,
},
{
"scale_x": 0.2,
"scale_y": 0.2,
"transform_x": 0.78,
"transform_y": 0.82,
},
],
}, # 添加标识工作配置
"add_logo_video": {
"material_path": self.materials["logo_video_material_path"],
"volume": [1.0], # 播放音量
"clip_settings": [
None,
], # 图像调节设置
}, # 添加标识视频工作配置
"add_statement": {
"text": self.materials["statement_text"],
"style": [
{"size": 5.0, "align": 1, "vertical": True},
{"size": 6.0, "align": 1, "vertical": True},
{"size": 7.0, "align": 1, "vertical": True},
], # 文本样式
"border": [
{"width": 35.0},
{"width": 40.0},
{"width": 45.0},
{"width": 50.0},
{"width": 55.0},
], # 描边宽度
"clip_settings": [
{
"transform_x": -0.80,
},
{
"transform_x": -0.82,
},
{
"transform_x": -0.84,
},
], # 图像调节设置
}, # 添加声明工作配置
"add_statement_video": {
"material_path": self.materials["statement_video_material_path"],
"volume": [1.0], # 播放音量
"clip_settings": [
None,
], # 图像调节设置
}, # 添加声明视频工作配置
"add_sticker1": {
"resource_id": [
"7110124379568098568",
"7019687632804334861",
"6895933678262750478",
"7010558788675652900",
"7026858083393588487",
"7222940306558209336",
"7120543009489341727",
"6939830545673227557",
"6939826722451754271",
"7210221631132749093",
"7138432572488453408",
"7137700067338620192",
"6895924436822674696",
"7134644683506044163",
"7062539853430279437",
],
"clip_settings": [
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_x": -0.75,
"transform_y": 0.75,
},
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_y": 0.75,
},
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_x": 0.75,
"transform_y": 0.75,
},
], # 图像调节设置
}, # 添加贴纸1工作配置不包含箭头类
"add_sticker2": {
"resource_id": [
"7143078914989018379",
"7142870400358255905",
"7185568038027103544",
"7024342011440319781",
"7205042602184363322",
],
"clip_settings": [
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_x": -0.8,
"transform_y": -0.62,
},
], # 图像调节设置
}, # 添加贴纸2工作配置
}
self.video_width, self.video_height = video_width, video_height
self.video_fps = video_fps
# 实例化缓存
self.caches = Caches()
except Exception as exception:
raise RuntimeError(f"发生异常:{str(exception)}") from exception
def _init_materials(self) -> None:
"""
初始化素材文件夹内所有素材
:return:
"""
# 字幕(文本)
subtitles_path = self.materials_folder_path / "字幕.txt"
if subtitles_path.exists() and subtitles_path.is_file():
with open(subtitles_path, "r", encoding="utf-8") as file:
subtitles_text = file.readlines()
if not subtitles_text:
raise RuntimeError("字幕文本为空")
self.materials["subtitles_text"] = subtitles_text
else:
self.materials["subtitles_text"] = []
# 字幕(视频)
subtitles_video_folder_path = self.materials_folder_path / "字幕视频"
if (
subtitles_video_folder_path.exists()
and subtitles_video_folder_path.is_dir()
):
self.materials["subtitles_video_material_path"] = [
file_path
for file_path in subtitles_video_folder_path.rglob("*.mov")
if file_path.is_file()
]
else:
self.materials["subtitles_video_material_path"] = []
# 背景视频
background_video_folder_path = self.materials_folder_path / "背景视频"
if (
background_video_folder_path.exists()
and background_video_folder_path.is_dir()
):
self.materials["background_video_material_path"] = [
file_path
for file_path in background_video_folder_path.rglob("*.mp4")
if file_path.is_file()
]
else:
self.materials["background_video_material_path"] = []
# 背景音频
background_audio_folder_path = self.materials_folder_path / "背景音频"
if (
background_audio_folder_path.exists()
and background_audio_folder_path.is_dir()
):
self.materials["background_audio_material_path"] = [
file_path
for file_path in background_audio_folder_path.rglob("*.mp3")
if file_path.is_file()
]
else:
self.materials["background_audio_material_path"] = []
# 标识
logo_path = self.materials_folder_path / "标识.png"
if logo_path.exists() and logo_path.is_file():
self.materials["logo_material_path"] = [logo_path] # 有且只有一张标识
else:
self.materials["logo_material_path"] = []
# 标识视频
logo_video_folder_path = self.materials_folder_path / "标识视频"
if logo_video_folder_path.exists() and logo_video_folder_path.is_dir():
self.materials["logo_video_material_path"] = [
file_path
for file_path in logo_video_folder_path.rglob("*.mov")
if file_path.is_file()
]
else:
self.materials["logo_video_material_path"] = []
# 声明文本
statement_path = self.materials_folder_path / "声明.txt"
if statement_path.exists() and statement_path.is_file():
with open(statement_path, "r", encoding="utf-8") as file:
statement_text = file.readlines()
if not statement_text:
raise RuntimeError("声明文本为空")
self.materials["statement_text"] = statement_text
else:
self.materials["statement_text"] = []
# 声明视频
statement_video_folder_path = self.materials_folder_path / "声明视频"
if (
statement_video_folder_path.exists()
and statement_video_folder_path.is_dir()
):
self.materials["statement_video_material_path"] = [
file_path
for file_path in statement_video_folder_path.rglob("*.mov")
if file_path.is_file()
]
else:
self.materials["statement_video_material_path"] = []
def export_videos(self, workflow_name: str, draft_counts: int):
"""
导出视频
:param workflow_name: 工作流名称
:param draft_counts: 每批次导出草稿数
"""
if workflow_name not in self.workflows:
raise RuntimeError(f"该工作流 {workflow_name} 未配置")
workflow = self.workflows[workflow_name]
# 若工作流包含添加背景音频则在添加背景视频节点配置的播放音量设置为0
if "add_background_audio" in workflow:
self.configurations["add_background_video"]["volume"] = [0.0]
# 批量生成草稿
self._batch_generate_drafts(
workflow_name=workflow_name,
draft_counts=draft_counts,
)
def _batch_generate_drafts(
self,
workflow_name: str,
draft_counts: int,
) -> None:
"""
批量生成草稿
:param workflow_name: 工作流名称
:param draft_counts: 草稿数
:return:
"""
draft_index = 1 # 草稿索引
while True:
# 获取工作流配置
workflow_configurations = self._get_workflow_configurations(
workflow_name=workflow_name
)
# 根据工作流配置生成草稿名称
draft_name = self._generate_draft_name(
workflow_configurations=workflow_configurations,
)
# 若已缓存则跳过
if self.caches.query(draft_name=draft_name):
continue
print(f"正在生成草稿 {draft_name}...")
# 实例化 JianYingDraft
draft = JianYingDraft(
drafts_folder=self.drafts_folder,
draft_name=draft_name,
video_width=self.video_width,
video_height=self.video_height,
video_fps=self.video_fps,
materials_folder_path=self.materials_folder_path,
)
for node in workflow_configurations:
match node["node_name"]:
# 添加字幕
case "add_subtitles":
print("-> 正在添加字幕...", end="")
draft.add_subtitles(**node["configurations"])
print("已完成")
# 添加字幕视频
case "add_subtitles_video":
print("-> 正在添加字幕视频...", end="")
draft.add_video_segment(**node["configurations"])
print("已完成")
# 添加背景视频
case "add_background_video":
print("-> 正在添加背景视频...", end="")
draft.add_video_segment(**node["configurations"])
print("已完成")
# 添加背景音频
case "add_background_audio":
print("-> 正在添加背景音频...", end="")
draft.add_audio_segment(**node["configurations"])
print("已完成")
# 添加标识
case "add_logo":
print("-> 正在添加标识...", end="")
draft.add_video_segment(**node["configurations"])
print("已完成")
# 添加标识视频
case "add_logo_video":
print("-> 正在添加标识视频...", end="")
draft.add_video_segment(**node["configurations"])
print("已完成")
# 添加声明文本
case "add_statement":
print("-> 正在添加声明...", end="")
draft.add_text_segment(**node["configurations"])
print("已完成")
# 添加声明视频
case "add_statement_video":
print("-> 正在添加声明视频...", end="")
draft.add_video_segment(**node["configurations"])
print("已完成")
# 添加贴纸
case _ if node["node_name"].startswith("add_sticker"):
print("-> 正在添加贴纸...", end="")
draft.add_sticker(**node["configurations"])
print("已完成")
# 将草稿保存至剪映草稿文件夹内
case "save":
print("-> 正在将草稿保存至剪映草稿文件夹内...", end="")
draft.save()
print("已完成")
# 缓存
self.caches.update(
draft_name=draft_name,
workflow_configurations=workflow_configurations,
)
print("已完成")
print()
draft_index += 1
if draft_index > draft_counts:
break
def _get_workflow_configurations(
self,
workflow_name: str,
) -> List[Dict[str, Any]]:
"""
获取工作流配置
:param workflow_name: 工作流名称
:return: 工作流配置
"""
# 初始化工作流配置
workflow_configurations = []
for node_name in self.workflows[workflow_name]:
# 根据节点名称获取节点配置
configurations = {
key: random.choice(value)
for key, value in self.configurations[node_name].items()
}
# 若非添加字幕则在工作流配置添加轨道名称
if node_name not in ["add_subtitles"]:
configurations["track_name"] = (
matched.group("track_name")
if (
matched := re.match(
pattern=r"^.*?_(?P<track_name>.*)$",
string=node_name,
)
)
else node_name
)
workflow_configurations.append(
{
"node_name": node_name,
"configurations": configurations,
}
)
# 添加保存节点
workflow_configurations.append(
{
"node_name": "save",
"configurations": {},
}
)
return workflow_configurations
def _generate_draft_name(
self,
workflow_configurations: List[Dict[str, Any]],
) -> str:
"""
根据工作流配置生成草稿名称
:param workflow_configurations: 工作流配置
:return: 草稿名称
"""
return (
hashlib.md5(
json.dumps(
obj=workflow_configurations,
cls=JSONEncoder,
sort_keys=True,
ensure_ascii=False,
).encode("utf-8")
) # 将工作流配置序列化
.hexdigest()
.upper() # MD5哈希值的大写十六进制作为草稿名称
)

View File

@ -0,0 +1,570 @@
# -*- coding: utf-8 -*-
"""
剪映草稿管理器模块
"""
# 列举导入模块
import hashlib
import json
from pathlib import Path
import random
import re
import shutil
import sys
from typing import Any, Dict, List
from pyJianYingDraft import DraftFolder, VideoMaterial
from caches import Caches
from drafts import Drafts
sys.path.append(Path(__file__).parent.parent.as_posix())
class JianYingManager:
"""
剪映草稿管理器
"""
def __init__(
self,
materials_folder_path: str,
drafts_folder_path: str = r"E:\JianYingPro Drafts",
):
"""
初始化
:param materials_folder_path: 素材文件夹路径其中文件夹名称默认为工作流名称
:param drafts_folder_path: 剪映草稿文件夹路径默认为 E:\\JianYingPro Drafts
"""
try:
# 初始化素材文件夹路径
self.materials_folder_path = Path(materials_folder_path)
if not self.materials_folder_path.exists():
raise RuntimeError("素材文件夹路径不存在")
# 初始化所有素材
self.materials = self._init_materials()
# 初始化成品文件夹路径
self.products_folder_path = Path(
materials_folder_path.replace("materials", "products")
)
# 若成品文件夹存路径已存在则先删除
if self.products_folder_path.exists():
shutil.rmtree(self.products_folder_path)
self.products_folder_path.mkdir(parents=True)
# 初始化剪映草稿文件夹路径
self.drafts_folder_path = Path(drafts_folder_path)
if not self.drafts_folder_path.exists():
raise RuntimeError("剪映草稿文件夹路径不存在")
# 初始化节点配置
self.configurations = self._init_configurations()
# 初始化剪映草稿文件夹管理器
self.drafts_folder = DraftFolder(folder_path=drafts_folder_path)
# 实例化缓存
self.caches = Caches()
except Exception as exception:
raise RuntimeError(f"发生异常:{str(exception)}") from exception
def _init_materials(self) -> Dict[str, List[Any]]:
"""
初始化所有素材
:return: 所有素材
"""
materials = {}
# 构建字幕文本路径
subtitle_text_path = self.materials_folder_path / "字幕文本.txt"
if subtitle_text_path.exists() and subtitle_text_path.is_file():
with open(subtitle_text_path, "r", encoding="utf-8") as file:
# 字幕文本列表
subtitle_texts = file.readlines()
if not subtitle_texts:
raise RuntimeError("字幕文本为空")
materials["subtitle_texts"] = subtitle_texts
else:
materials["subtitle_texts"] = []
# 构建字幕视频文件夹路径
subtitle_video_folder_path = self.materials_folder_path / "字幕视频"
if subtitle_video_folder_path.exists() and subtitle_video_folder_path.is_dir():
materials["subtitle_video_paths"] = [
subtitle_video_path
for subtitle_video_path in subtitle_video_folder_path.rglob("*.mov")
]
else:
materials["subtitle_video_paths"] = []
# 构建背景视频文件夹路径
background_video_folder_path = self.materials_folder_path / "背景视频"
if (
background_video_folder_path.exists()
and background_video_folder_path.is_dir()
):
materials["background_video_paths"] = [
background_video_path
for background_video_path in background_video_folder_path.rglob("*.mp4")
]
else:
materials["background_video_paths"] = []
# 构建达人视频文件夹路径
kol_video_folder_path = self.materials_folder_path / "达人视频"
if kol_video_folder_path.exists() and kol_video_folder_path.is_dir():
materials["kol_video_paths"] = [
kol_video_path
for kol_video_path in kol_video_folder_path.rglob("*.mp4")
]
else:
materials["kol_video_paths"] = []
# 构建背景音频文件夹路径
background_audio_folder_path = self.materials_folder_path / "背景音频"
if (
background_audio_folder_path.exists()
and background_audio_folder_path.is_dir()
):
materials["background_audio_paths"] = [
background_audio_path
for background_audio_path in background_audio_folder_path.rglob("*.mp3")
]
else:
materials["background_audio_paths"] = []
# 构建标识图片路径
logo_image_path = self.materials_folder_path / "标识图片.png"
if logo_image_path.exists() and logo_image_path.is_file():
materials["logo_image_path"] = [logo_image_path] # 有且只有一张标识
else:
materials["logo_image_path"] = []
# 构建标识视频文件夹路径
logo_video_folder_path = self.materials_folder_path / "标识视频"
if logo_video_folder_path.exists() and logo_video_folder_path.is_dir():
materials["logo_video_path"] = [
file_path for file_path in logo_video_folder_path.rglob("*.mov")
]
else:
materials["logo_video_path"] = []
# 构建声明文本路径
statement_text_path = self.materials_folder_path / "声明文本.txt"
if statement_text_path.exists() and statement_text_path.is_file():
with open(statement_text_path, "r", encoding="utf-8") as file:
# 声明文本列表
statement_texts = file.readlines()
if not statement_texts:
raise RuntimeError("声明文本为空")
materials["statement_texts"] = statement_texts
else:
materials["statement_texts"] = []
# 构建声明视频文件夹路径
statement_video_folder_path = self.materials_folder_path / "声明视频"
if (
statement_video_folder_path.exists()
and statement_video_folder_path.is_dir()
):
materials["statement_video_path"] = [
statement_video_path
for statement_video_path in statement_video_folder_path.rglob("*.mov")
]
else:
materials["statement_video_path"] = []
return materials
def _init_configurations(
self,
) -> Dict[str, Any]:
"""
初始化节点配置
:return: 节点配置
"""
# 已配置工作流
workflows = {
"默认工作流": [
"generate_subtitle",
"add_background_video",
"add_statement",
"add_sticker",
"add_sticker_arrow",
], # 默认工作流,先根据字幕文本合成字幕音频并生成字幕,再叠加背景视频、声明视频、非箭头贴纸视频和箭头贴纸视频
"淘宝闪购": [
"add_subtitle_video",
"add_background_video",
"add_background_audio",
"add_statement_video",
], # 淘宝闪购,先根据字幕视频获取其持续时长并作为成品持续时长,再叠加背景视频、背景音频和生成视频
"淘宝闪购_达人": [
"add_background_video",
"add_background_audio",
"add_subtitle_video",
"add_kol_video",
"add_statement_video",
], # 淘宝闪购_达人第一段先根据字幕视频获取前5秒作为持续时长再叠加背景视频、背景音频第二段背景视频达人拼接第一段和第二段再叠加声明视频
}
# 默认以素材文件夹名称为工作流名称
workflow_name = self.materials_folder_path.stem
# 工作流
workflow = workflows.get(workflow_name)
if not workflow:
raise RuntimeError(f"未配置该工作流 {workflow_name}")
# 节点配置模板
configurations = {
"generate_subtitle": {
"texts": self.materials["subtitle_texts"],
"style": [
{"size": 10.0},
], # 字体样式
"effect": [
{"effect_id": "7127561998556089631"},
{"effect_id": "7166467215410187552"},
{"effect_id": "6896138122774498567"},
{"effect_id": "7166469374507765031"},
{"effect_id": "6896137924853763336"},
], # 花字设置
}, # 生成字幕工作配置
"add_subtitle_video": {
"material_path": self.materials["subtitle_video_paths"],
"volume": [1.0], # 播放音量
"clip_settings": [
None,
], # 图像调节设置
}, # 添加字幕视频工作配置
"add_background_video": {
"material_path": self.materials["background_video_paths"],
"volume": [1.0], # 播放音量
"clip_settings": [
{
"scale_x": 1.0,
"scale_y": 1.0,
},
], # 图像调节设置
}, # 添加背景视频工作配置
"add_kol_video": {
"material_path": self.materials["kol_video_paths"],
"volume": [1.0], # 播放音量
"clip_settings": [
{
"scale_x": 1.0,
"scale_y": 1.0,
},
], # 图像调节设置
}, # 添加达人视频工作配置
"add_background_audio": {
"material_path": self.materials["background_audio_paths"],
"volume": [0.6], # 播放音量
}, # 添加背景音频工作配置
"add_logo_image": {
"material_path": self.materials["logo_image_path"],
"clip_settings": [
{
"scale_x": 0.2,
"scale_y": 0.2,
"transform_x": -0.78,
"transform_y": 0.82,
},
],
}, # 添加标识工作配置
"add_logo_video": {
"material_path": self.materials["logo_video_path"],
"volume": [1.0], # 播放音量
"clip_settings": [
None,
], # 图像调节设置
}, # 添加标识视频工作配置
"add_statement": {
"text": self.materials["statement_texts"],
"style": [
{"size": 6.0, "align": 1, "vertical": True},
], # 文本样式
"border": [
{"width": 35.0},
{"width": 40.0},
{"width": 45.0},
], # 描边宽度
"clip_settings": [
{
"transform_x": -0.82,
},
], # 图像调节设置
}, # 添加声明工作配置
"add_statement_video": {
"material_path": self.materials["statement_video_path"],
"volume": [1.0], # 播放音量
"clip_settings": [
None,
], # 图像调节设置
}, # 添加声明视频工作配置
"add_sticker": {
"resource_id": [
"7110124379568098568",
"7019687632804334861",
"6895933678262750478",
"7010558788675652900",
"7026858083393588487",
],
"clip_settings": [
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_x": -0.75,
"transform_y": 0.75,
},
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_y": 0.75,
},
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_x": 0.75,
"transform_y": 0.75,
},
], # 图像调节设置
}, # 添加非箭头贴纸工作配置
"add_sticker_arrow": {
"resource_id": [
"7143078914989018379",
"7142870400358255905",
"7185568038027103544",
"7024342011440319781",
"7205042602184363322",
],
"clip_settings": [
{
"scale_x": 0.75,
"scale_y": 0.75,
"transform_x": -0.8,
"transform_y": -0.62,
},
], # 图像调节设置
}, # 添加箭头贴纸工作配置
}
# 达人视频特殊处理:字幕视频、背景视频和背景音频素材持续时长为 5秒叠加达人视频
if workflow_name == "淘宝闪购_达人":
configurations.update(
{
node: {
**configurations[node],
"target_timerange": [
(0, 5_000_000),
],
}
for node in [
"add_subtitle_video",
"add_background_video",
"add_background_audio",
]
}
)
configurations.update(
{
node: {
**configurations[node],
"target_timerange": [
(5_000_000, None),
],
}
for node in [
"add_kol_video",
]
}
)
# 若包含添加背景音频节点则在添加背景视频时其播放音量设置为 0
if "add_background_audio" in workflow:
configurations["add_background_video"]["volume"] = [0.0]
return {node_name: configurations[node_name] for node_name in workflow}
def batch_create(
self,
draft_counts: int,
video_width: int = 1080,
video_height: int = 1920,
video_fps: int = 30,
) -> None:
"""
批量创建草稿
:param draft_counts: 草稿数
:param video_width: 视频宽度单位为像素默认为 1080
:param video_height: 视频高度单位为像素默认为 1920
:param video_fps: 视频帧率单位为帧/默认为 30
:return:
"""
draft_index = 1 # 草稿索引
while True:
# 获取节点配置
configurations = self._get_configurations()
video_duration = VideoMaterial(
path=configurations["add_background_video"]["material_path"].as_posix()
).duration # 默认将背景视频素材持续时长作为视频持续时长(单位为微秒)
# 达人视频特殊处理:固定 5秒加上达人视频素材持续时长
if "add_kol_video" in configurations:
video_duration = (
5_000_000
+ VideoMaterial(
path=configurations["add_kol_video"]["material_path"].as_posix()
).duration
)
# 生成剪映草稿名称
draft_name = self._generate_draft_name(
configurations=configurations,
)
# 若草稿名称已缓存则跳过
if self.caches.query(draft_name=draft_name):
continue
print(f"正在创建草稿 {draft_name}({draft_index}/{draft_counts})...")
# 创建剪映草稿
draft = Drafts(
materials_folder_path=self.materials_folder_path,
drafts_folder=self.drafts_folder,
draft_name=draft_name,
video_width=video_width,
video_height=video_height,
video_fps=video_fps,
video_duration=video_duration,
)
for node_name in configurations:
match node_name:
# 添加字幕
case "generate_subtitle":
print("-> 正在根据字幕文本合成字幕音频并生成字幕...", end="")
draft.generate_subtitle(**configurations[node_name])
print("已完成")
# 添加字幕视频
case "add_subtitle_video":
print("-> 正在添加字幕视频...", end="")
draft.add_video_segment(**configurations[node_name])
print("已完成")
# 添加背景视频
case "add_background_video":
print("-> 正在添加背景视频...", end="")
draft.add_video_segment(**configurations[node_name])
print("已完成")
# 添加达人视频
case "add_kol_video":
print("-> 正在添加达人视频...", end="")
draft.add_video_segment(**configurations[node_name])
print("已完成")
# 添加背景音频
case "add_background_audio":
print("-> 正在添加背景音频...", end="")
draft.add_audio_segment(**configurations[node_name])
print("已完成")
# 添加标识
case "add_logo_image":
print("-> 正在添加标识图片...", end="")
draft.add_video_segment(**configurations[node_name])
print("已完成")
# 添加标识视频
case "add_logo_video":
print("-> 正在添加标识视频...", end="")
draft.add_video_segment(**configurations[node_name])
print("已完成")
# 添加声明文本
case "add_statement":
print("-> 正在添加声明文本...", end="")
draft.add_text_segment(**configurations[node_name])
print("已完成")
# 添加声明视频
case "add_statement_video":
print("-> 正在添加声明视频...", end="")
draft.add_video_segment(**configurations[node_name])
print("已完成")
# 添加贴纸
case _ if node_name.startswith("add_sticker"):
print("-> 正在添加贴纸...", end="")
draft.add_sticker(**configurations[node_name])
print("已完成")
# 保存草稿
case "save":
print("-> 正在保存草稿...", end="")
draft.save()
print("已完成")
# 缓存草稿名称和所有节点配置
self.caches.update(
draft_name=draft_name,
configurations=configurations,
)
print("已完成")
print()
draft_index += 1
if draft_index > draft_counts:
break
def _get_configurations(
self,
) -> Dict[str, Any]:
"""
获取节点配置
:return: 节点配置
"""
configurations = {}
for node_name in self.configurations:
# 根据节点名称获取节点配置
configurations.update(
{
node_name: {
key: random.choice(value)
for key, value in self.configurations[node_name].items()
}
}
)
# 若非生成字幕则在工作流配置添加轨道名称
if node_name != "generate_subtitle":
configurations[node_name]["track_name"] = (
matched.group("track_name")
if (
matched := re.match(
pattern=r"^.+_(?P<track_name>.+_.+)$",
string=node_name,
)
)
else node_name
)
# 添加保存节点
configurations.update(
{
"save": {},
}
)
return configurations
def _generate_draft_name(
self,
configurations: Dict[str, Any],
) -> str:
"""
生成剪映草稿名称
:param configurations: 指定工作流所有节点配置
:return: 草稿名称
"""
return (
hashlib.md5(
json.dumps(
obj=configurations,
cls=self.caches.JSONEncoder,
sort_keys=True,
ensure_ascii=False,
).encode("utf-8")
) # 将工作流配置序列化
.hexdigest()
.upper() # MD5哈希值的大写十六进制作为草稿名称
)

View File

@ -3,16 +3,16 @@
主模块
"""
from export import JianYingExport
# 列举导入模块
from jiangying_manager import JianYingManager
if __name__ == "__main__":
# 实例化 JianYingExport
jianying_export = JianYingExport(
materials_folder_path=r"E:\jianying\materials\淘宝闪购模版001",
# 实例化 JianYingManager
jianying_manager = JianYingManager(
materials_folder_path=r"E:\jianying\materials\淘宝闪购_达人",
)
# 导出视频
jianying_export.export_videos(
workflow_name="0001",
draft_counts=10,
jianying_manager.batch_create(
draft_counts=1,
)