This commit is contained in:
liubiren 2026-03-08 12:00:03 +08:00
parent d9fd963230
commit 020f6fe12e
38 changed files with 740 additions and 0 deletions

Binary file not shown.

View File

@ -0,0 +1 @@
淘宝闪购

View File

@ -0,0 +1,375 @@
# -*- coding: utf-8 -*-
"""
创建剪映草稿
"""
import asyncio
from pathlib import Path
from pathlib import Path
from random import choice
from typing import Any, Dict, List, Optional, Tuple, Union
from typing import Tuple, Union
import edge_tts
from pyJianYingDraft import (
AudioMaterial,
AudioSegment,
ClipSettings,
DraftFolder,
FontType,
KeyframeProperty,
TextBackground,
TextBorder,
TextSegment,
TextStyle,
TrackType,
VideoMaterial,
VideoSegment,
trange,
)
class JianYingDraftGenerator:
"""
剪映草稿生成器
"""
def __init__(
self,
drafts_folder_path: str = r"E:\JianYingPro Drafts",
):
"""
初始化
:param drafts_folder_path: 剪映草稿文件夹路径
"""
try:
# 初始化草稿文件夹管理器
self.drafts_manager = DraftFolder(
folder_path=Path(drafts_folder_path).as_posix()
)
# 初始化素材文件夹路径
self.materials_folder_path = Path(__file__).parent
except Exception as exception:
raise RuntimeError(f"发生异常:{str(exception)}") from exception
def text_to_speech(
self,
audio_path: Path,
text: str,
) -> None:
"""
合成音频并本地保存音频文件
:param audio_path: 音频文件路径
:param text: 待合成音频的文本
"""
try:
# 异步调用 EdgeTTS 合成音频
async def _async_synthetize():
communicator = edge_tts.Communicate(
text=text.strip(),
voice="zh-CN-XiaoxiaoNeural", # 语音角色
rate="+20%", # 语速
volume="+10%", # 音量
pitch="+15Hz", # 音调
)
await communicator.save(audio_path.as_posix())
return
return asyncio.run(_async_synthetize())
except Exception as exception:
raise RuntimeError(
f"合成音频并本地保存音频文件发生异常:{str(exception)}"
) from exception
def add_text_segment(
self,
track_name: str,
text: str,
timerange: Tuple[Union[int, str], Union[int, str]],
font: Optional[str] = None,
style: Optional[Dict[str, Any]] = None,
border: Optional[Dict[str, Any]] = None,
background: Optional[Dict[str, Any]] = None,
clip_settings: Optional[Dict[str, Any]] = None,
bubble: Optional[Dict[str, Any]] = None,
effect: Optional[Dict[str, Any]] = None,
animation: Optional[Dict[str, Any]] = None,
) -> None:
"""
向指定文本轨道添加文本片段
:param track_name: 轨道名称
:param add_track: 添加文本轨道默认为是
:param text: 文本
:param timerange: 文本素材在轨道上的范围包括开始时间和持续时长默认为草稿持续时长
:param font: 字体默认为系统
:param style: 字体样式默认为字号 15对齐方式 左对齐
:param border: 文本描边设置默认为无
:param background: 文本背景设置默认为无
:param clip_settings: 图像调节设置默认为移动至 (0, 0)
:param bubble: 气泡设置默认为无
:param effect: 花字设置默认为无
:param animation: 动画设置默认为无
:return:
"""
try:
# 构建文本片段
text_segment = TextSegment(
text=text.replace("\\n", "\n"),
timerange=trange(*timerange),
font=FontType(font) if font else None,
style=TextStyle(**style) if style else None,
border=TextBorder(**border) if border else None,
background=(TextBackground(**background) if background else None),
clip_settings=(
ClipSettings(**clip_settings) if clip_settings else None
),
)
# 添加气泡
if bubble:
text_segment.add_bubble(**bubble)
# 添加花字
# 将花字保存预设后在C:/Users/admin/AppData/Local/JianyingPro/User Data/Presets/Text_V2/预设文本?.textpreset获取花字resource_id
if effect:
text_segment.add_effect(**effect)
# 添加动画
if animation:
text_segment.add_animation(**animation)
# 向指定文本轨道添加文本片段
self.draft.add_segment(segment=text_segment, track_name=track_name)
except Exception as exception:
raise RuntimeError(str(exception)) from exception
def add_audio_segment(
self,
track_name: str,
material: str,
target_timerange: Tuple[Union[int, str], Union[int, str]],
source_timerange: Optional[Tuple[str, str]] = None,
speed: float = 1.0,
volume: float = 1.0,
fade: Optional[Tuple[str, str]] = None,
) -> None:
"""
向指定音频轨道添加音频片段
:param track_name: 轨道名称
:param add_track: 添加音频轨道默认为是
:param material: 音频素材
:param target_timerange: 音频素材在轨道上的范围包括开始时间和持续时长
:param source_timerange: 截取音频素材范围包括开始时间和持续时长默认根据音频素材开始时间根据播放速度截取与音频素材持续时长等长的部分
:param speed: 播放速度默认为 1.0
:param volume: 播放音量默认为 1.0
:param fade: 淡入淡出设置默认为无
:return:
"""
try:
# 构建音频片段
audio_segment = AudioSegment(
material=material,
target_timerange=trange(*target_timerange),
source_timerange=(
trange(*source_timerange) if source_timerange else None
),
speed=speed,
volume=volume,
)
# 添加淡入淡出
if fade:
audio_segment.add_fade(*fade)
# 向指定音频轨道添加音频片段
self.draft.add_segment(segment=audio_segment, track_name=track_name)
except Exception as exception:
raise RuntimeError(str(exception)) from exception
def add_video_segment(
self,
track_name: str,
material: str,
target_timerange: Tuple[Union[int, str], Union[int, str]],
speed: float = 1.0,
volume: float = 1.0,
clip_settings: Optional[Dict[str, Any]] = None,
keyframes: Optional[
List[Tuple[KeyframeProperty, Union[str, int], float]]
] = None,
animation: Optional[Dict[str, Any]] = None,
transition: Optional[Dict[str, Any]] = None,
background_filling: Optional[Dict[str, Any]] = None,
) -> None:
"""
向指定视频轨道添加视频或图片片段
:param track_name: 轨道名称
:param materia: 视频或图片素材
:param target_timerange: 视频或图片素材在轨道上的范围包括开始时间和持续时长默认为草稿持续时长
:param source_timerange: 截取视频或图片素材范围包括开始时间和持续时长默认为空
:param speed: 播放速度默认为 1.0
:param volume: 播放音量默认为 1.0
:param clip_settings: 图像调节设置默认为无
:param keyframes: 关键帧设置默认为无
:param animation: 动画设置默认为无
:param transition: 转场设置默认为无
:param background_filling: 背景填充设置默认为无
:return:
"""
try:
# 构建视频或图片片段
video_segment = VideoSegment(
material=material,
target_timerange=trange(*target_timerange),
speed=speed,
volume=volume,
clip_settings=(
ClipSettings(**clip_settings) if clip_settings else None
),
)
# 添加关键帧
if keyframes:
for _property, offset, value in keyframes:
video_segment.add_keyframe(_property, offset, value)
# 添加动画
if animation:
video_segment.add_animation(**animation)
# 添加转场
if transition:
video_segment.add_transition(**transition)
# 添加背景填充
if background_filling:
video_segment.add_background_filling(**background_filling)
# 向指定视频轨道添加视频或图片片段
self.draft.add_segment(segment=video_segment, track_name=track_name)
except Exception as exception:
raise RuntimeError(str(exception)) from exception
def create_draft(
self,
task_id: str,
contents: List[str],
):
"""
创建草稿
:param task_id: 任务标识
:param contents: 分镜内容列表
:return:
"""
# 新建草稿
self.draft = self.drafts_manager.create_draft(
draft_name=task_id,
allow_replace=True, # 允许覆盖
width=1080,
height=1920,
fps=30,
)
# 添加视频轨道
self.draft.add_track(
track_type=TrackType.video,
track_name=(video_track_name := "video"),
)
# 添加文本轨道
self.draft.add_track(
track_type=TrackType.text,
track_name=(text_track_name := "text"),
)
# 添加音频轨道(字幕音频)
self.draft.add_track(
track_type=TrackType.audio,
track_name=(subtitle_audio_track_name := "subtitle_audio"),
)
# 添加音频轨道(背景音频)
self.draft.add_track(
track_type=TrackType.audio,
track_name=(background_audio_track_name := "background_audio"),
)
# 构造字幕音频文件夹路径
subtitle_audios_folder_path = self.materials_folder_path / "subtitle_audios"
subtitle_audios_folder_path.mkdir(exist_ok=True)
duration = 0 # 视频持续时长
for i, content in enumerate(contents, start=1):
content_duration = 0 # 素材持续时长
for j, text in enumerate(
[text for text in content.split("") if text.strip()], start=1
):
# 构建片段字幕音频文件路径
audio_path = (
subtitle_audios_folder_path / f"{task_id}_{i:02d}_{j:02d}.mp3"
)
if not audio_path.exists():
# 合成片段字幕音频并本地保存片段字幕音频文件
self.text_to_speech(
audio_path=audio_path,
text=text,
)
# 片段字幕音频素材化
audio = AudioMaterial(path=audio_path.as_posix())
# 向文本轨道添加文本片段(片段字幕文本)
self.add_text_segment(
track_name=text_track_name,
text=text,
timerange=(duration, audio.duration),
style={
"size": 12.0,
"align": 1, # 水平居中0为左对齐、1为水平居中、2为右对齐
"auto_wrapping": True, # 自动换行
}, # 字体样式字号为12水平居中
clip_settings={
"transform_y": -0.85,
}, # 图像设置垂直下移60%
effect={
"effect_id": "6896137924853763336", # 使用花字
},
)
# 向字幕音频的音频轨道添加音频片段(片段字幕音频)
self.add_audio_segment(
track_name=subtitle_audio_track_name,
material=audio.path,
target_timerange=(duration, audio.duration),
volume=1.0,
)
duration += audio.duration
content_duration += audio.duration
# 构建分镜视频路径
video_path = Path(
self.materials_folder_path / "videos" / f"{task_id}_{i:02d}.mp4"
)
# 分镜视频素材化
video = VideoMaterial(path=video_path.as_posix())
# 向视频轨道添加视频片段(分镜视频)
self.add_video_segment(
track_name=video_track_name,
material=video.path,
target_timerange=(duration - content_duration, content_duration),
volume=0.0,
speed=round(video.duration / content_duration, 2) - 0.01, # 冗余安全量
)
# 构建背景音频文件路径
background_audio_path = choice(
list((self.materials_folder_path / "background_audios").glob("*.mp3"))
)
# 背景音频素材化
background_audio = AudioMaterial(path=background_audio_path.as_posix())
# 向背景音频的音频轨道添加音频片段(背景音频)
self.add_audio_segment(
track_name=background_audio_track_name,
material=background_audio.path,
target_timerange=(0, duration),
volume=0.4,
)
# 将草稿保存至剪映草稿文件夹内
self.draft.save()

Binary file not shown.

After

Width:  |  Height:  |  Size: 650 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 605 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 512 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 548 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 645 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 798 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 834 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 640 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 418 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 722 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 905 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 750 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 608 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 774 KiB

View File

@ -0,0 +1,294 @@
# -*- coding: utf-8 -*-
"""
主程序
"""
import warnings
warnings.filterwarnings(
action="ignore", category=UserWarning, module="volcenginesdkarkruntime.*"
)
from base64 import b64decode, b64encode
import json
from pathlib import Path
import sys
from time import sleep
from typing import Any, Dict, List
from uuid import uuid4
from volcenginesdkarkruntime import Ark
from volcenginesdkarkruntime.types.responses import (
ResponseOutputMessage,
ResponseOutputText,
)
sys.path.append(Path(__file__).parent.parent.as_posix())
from utils.request import Request
from create_draft import JianYingDraftGenerator
# 初始化火山引擎 Ark 客户端
ark_client = Ark(
base_url="https://ark.cn-beijing.volces.com/api/v3",
api_key="2c28ab07-888c-45be-84a2-fc4b2cb5f3f2",
) # 本人火山引擎账密
# 初始化请求客户端
request_client = Request()
def get_brand_words() -> List[str]:
"""
获取品牌词
:return: 品牌词
"""
try:
with open(
file=Path(__file__).parent / "brand_words.txt", mode="r", encoding="utf-8"
) as file: # Trae IDE 需要指定文件路径(和 PyCharm 不同)
brand_words = [line.strip() for line in file.readlines() if line.strip()]
assert brand_words, "品牌词为空"
return brand_words
except FileNotFoundError:
raise FileNotFoundError("未找到品牌词文件")
except Exception as exception:
raise exception
def generate_task_id() -> str:
"""
生成任务标识
:return: 任务ID
"""
return uuid4().hex.upper().replace("-", "")
def get_storyboard(brand_word: str) -> Dict[str, Any]:
"""
获取分镜脚本
:param brand_word: 品牌词
:return: 分镜脚本
"""
try:
with open(
file=Path(__file__).parent / "storyboard_prompt_template.txt",
mode="r",
encoding="utf-8",
) as file:
storyboard_prompt = file.read()
# 重构分镜脚本的提示词
storyboard_prompt = storyboard_prompt.replace("{{品牌词}}", brand_word)
except FileNotFoundError:
raise FileNotFoundError("未找到分镜脚本的提示词模板文件")
except Exception as exception:
raise exception
retries = 0 # 重试次数
while True:
try:
# 调用 doubao-seed 语言大模型,基于分镜脚本的提示词生成分镜脚本
response = ark_client.responses.create(
model="doubao-seed-2-0-pro-260215",
input=[
{
"role": "user",
"content": [{"type": "input_text", "text": storyboard_prompt}],
}
],
)
# 解析响应并反序列化以此作为分镜脚本
storyboard = json.loads(
[item for item in [item for item in response.output if isinstance(item, ResponseOutputMessage)][0].content if isinstance(item, ResponseOutputText)][0].text # type: ignore
)
return storyboard
except Exception as exception:
retries += 1
if retries > 2:
raise Exception(f"获取分镜脚本发生异常,{str(exception)}")
continue
def generate_frame(frame_prompt: str, frame_name: str) -> None:
"""
生成视频帧
:param frame_prompt: 视频帧提示词
:param frame_name: 视频帧名称
:return: None
"""
# 构建视频帧路径
frame_path = Path(__file__).parent / "frames" / frame_name
# 若视频帧已存在则直接返回
if frame_path.exists():
return
retries = 0 # 重试次数
while True:
try:
# 调用 doubao-seedream 图像大模型,基于视频帧提示词生成视频帧
#
response = ark_client.images.generate(
model="doubao-seedream-4-5-251128",
prompt=frame_prompt,
sequential_image_generation="disabled", # 关闭组图输出
size="1600x2848",
watermark=False, # 关闭水印
response_format="b64_json", # 图像数据为base64编码的JSON字符串
)
# 解析响应
frame_base64 = b64decode(response.data[0].b64_json)
# 本地保存视频帧
with open(file=frame_path, mode="wb") as file:
file.write(frame_base64)
return
except Exception as exception:
retries += 1
if retries > 2:
raise Exception(f"生成视频帧发生异常,{str(exception)}")
continue
def generate_video(
video_name: str,
first_frame_name: str,
last_frame_name: str,
shot_prompt: str,
video_duration: int,
) -> None:
"""
生成视频
:param video_name: 视频名称
:param first_frame_name: 首帧名称
:param last_frame_name: 尾帧名称
:param shot_prompt: 运镜提示词
:param video_duration: 视频时长
:return: None
"""
# 若视频已存在则直接返回
if (Path(__file__).parent / "videos" / video_name).exists():
return
# 构建首帧路径
first_frame_path = Path(__file__).parent / "frames" / first_frame_name
if not first_frame_path.exists():
raise RuntimeError(f"首帧 {first_frame_name} 不存在")
with open(file=first_frame_path, mode="rb") as file:
first_frame_base64 = b64encode(file.read()).decode("utf-8")
# 构建尾帧路径
last_frame_path = Path(__file__).parent / "frames" / last_frame_name
if not last_frame_path.exists():
raise RuntimeError(f"尾帧 {last_frame_name} 不存在")
with open(file=last_frame_path, mode="rb") as file:
last_frame_base64 = b64encode(file.read()).decode("utf-8")
retries = 0 # 重试次数
while True:
try:
# 调用 doubao-seedrance 视频大模型,基于首尾帧和运镜提示词生成视频
# 创建视频生成任务
create_response = ark_client.content_generation.tasks.create(
model="doubao-seedance-1-5-pro-251215",
content=[
{"type": "text", "text": shot_prompt},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{first_frame_base64}"
},
"role": "first_frame",
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{last_frame_base64}"
},
"role": "last_frame",
},
],
ratio="9:16", # 视频的宽高比例
duration=video_duration, # 视频时长doubao-seedance-1-5-pro-251215 有效范围为[4, 12]
watermark=False, # 关闭水印
)
# 轮询查询视频生成任务
while True:
# 查询视频生成任务
query_response = ark_client.content_generation.tasks.get(
task_id=create_response.id,
)
# 根据视频生成任务的状态匹配处理方法
match query_response.status:
case "succeeded":
video_url = query_response.content.video_url
# 下载视频
chunk_generator = request_client.download(
url=video_url,
stream_enabled=True, # 开启流式传输
)
# 本地保存视频
with open(
file=Path(__file__).parent / "videos" / video_name,
mode="wb",
) as file:
for chunk in chunk_generator:
file.write(chunk)
return
case "failed":
raise Exception(f"{query_response.error}")
case _:
sleep(5) # 避免频繁请求查询视频生成任务故显性等待
except Exception as exception:
retries += 1
if retries > 2:
raise Exception(f"生成视频发生异常,{str(exception)}")
continue
if __name__ == "__main__":
# 分镜视频时长列表
video_durations = [15, 4, 8, 8, 4]
# 遍历品牌词
for brand_word in get_brand_words():
# 生成任务标识
task_id = generate_task_id()
# 获取分镜脚本
storyboard = get_storyboard(
brand_word=brand_word,
)
# 遍历分镜
for i, shot in enumerate(storyboard["分镜脚本"], start=1):
# 构建分镜首帧名称
first_frame_name = f"{task_id}_{i:02d}_first.jpeg"
# 生成分镜首帧
generate_frame(
frame_prompt=shot["首帧提示词"], # 分镜首帧提示词
frame_name=first_frame_name,
)
# 构建分镜尾帧名称
last_frame_name = f"{task_id}_{i:02d}_last.jpeg"
# 生成分镜尾帧
generate_frame(
frame_prompt=shot["尾帧提示词"], # 分镜尾帧提示词
frame_name=last_frame_name,
)
# 生成视频
generate_video(
video_name=f"{task_id}_{i:02d}.mp4", # 分镜视频名称
video_duration=video_durations[i], # 分镜视频时长
first_frame_name=first_frame_name,
last_frame_name=last_frame_name,
shot_prompt=shot["运镜提示词"], # 分镜运镜提示词
)
# 生成剪映草稿
draft_generator = JianYingDraftGenerator()
draft_generator.create_draft(
task_id=task_id,
contents=[shot["口播"] for shot in storyboard["分镜脚本"]],
)

View File

@ -0,0 +1,70 @@
你作为一名专业、优秀的广告编导,请为品牌“{{品牌词}}”生成一条24秒营销广告分镜脚本。该脚本用于AI生成短视频投放于快手、今日头条、视频号等短视频平台。
必须严格遵守以下所有规则,不可遗漏任何字段:
1. 内容要求
- 符合品牌真实经营范围与营销风格
- 不得虚构、不得违规
2. 时长结构
- 第一段4秒就该品牌所解决的核心痛点构思场景吸引用户注意
- 第二段8秒体现该品牌并构思该品牌2~3个价值点
- 第三段8秒围绕价值点构思实际场景
- 第四段4秒引导用户点击视频下方链接
3. 口播要求(用于 EdgeTTS
- 第一段口播一句话515个汉字
- 第二段口播1530个汉字
- 第三段口播1530个汉字
- 第四段口播一句话515个汉字
- 口语化、情绪饱满、适合短视频传播
4. 首帧 / 尾帧提示词(用于 Doubao-Seedream-5.0 文生图)
- 用简洁连贯的自然语言完整且详细描述:主体+位置+状态+环境+色彩+光影
- 9:16 竖屏1080P 高清,干净 无水印
- 首帧:元素刚出现,半透明,小尺寸,特效未展开
- 尾帧:元素完全展示,高亮饱满,特效完整
- 画面和本段口播匹配,自然、有质感,适合广告营销,
5. 运镜提示词(用于 Doubao-Seedance-2.0 首尾帧生视频,必须英文)
- 运镜平稳、无抖动
6. 输出强制要求
- 只输出标准 JSON无任何多余文字
- 不要解释、不要前言、不要后缀
- 不要 ```json 标记
- 严格按下面结构输出
{
"核心营销要点": "一句话总结品牌核心价值",
"分镜脚本": [
{
"阶段": "第一段",
"口播": "一句话口播",
"首帧提示词": "...",
"尾帧提示词": "...",
"运镜提示词": "..."
},
{
"阶段": "第二段",
"口播": "...",
"首帧提示词": "...",
"尾帧提示词": "...",
"运镜提示词": "..."
},
{
"阶段": "第三段",
"口播": "...",
"首帧提示词": "...",
"尾帧提示词": "...",
"运镜提示词": "..."
},
{
"阶段": "第四段",
"口播": "...",
"首帧提示词": "...",
"尾帧提示词": "...",
"运镜提示词": "..."
}
]
}