Vision & Veo (API documentation)

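Video generation

Text-to-video generation
The example below shows how to generate a video that contains dialogue. Prompts aiming at cinematic realism or creative animation are submitted the same way: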
import time
import requests
import json

BASE_URL = "https://"
API_KEY = "sk"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

prompt = """A close up of two people staring at a cryptic drawing on a wall, torchlight flickering.
A man murmurs, 'This must be it. That's the secret code.' The woman looks at him and whispers excitedly, 'What did you find?'"""

# ============================================================
# Step 1: submit the video generation request
# ============================================================
print("Submitting video generation request...")

response = requests.post(
    f"{BASE_URL}/v1/video/generations",
    headers=headers,
    json={
        "model": "veo-3.1-generate-preview",
        "prompt": prompt
    }
)

if response.status_code != 200:
    print(f"Request failed: {response.status_code}")
    print(response.text)
    exit(1)

result = response.json()
print("Submitted:", json.dumps(result, indent=2))

task_id = result.get("id")
operation_name = result.get("name")
print(f"Task ID: {task_id}")

# ============================================================
# Step 2: poll the task status until the video is ready
# ============================================================

# Try several candidate polling endpoints
poll_urls = [
    f"{BASE_URL}/v1/video/generations/{task_id}",
    f"{BASE_URL}/v1/video/{task_id}",
    f"{BASE_URL}/v1/videos/{task_id}",
    f"{BASE_URL}/v1/generation/{task_id}",
]

poll_url = None

while True:
    print("Waiting for video generation to finish...")
    time.sleep(10)

    if poll_url:
        # The working polling URL is already known
        resp = requests.get(poll_url, headers=headers)
        if resp.status_code == 200:
            status_data = resp.json()
        else:
            print(f"Polling failed: {resp.status_code} {resp.text[:300]}")
            continue
    else:
        # Not found yet: try each candidate in turn
        status_data = None
        for url in poll_urls:
            resp = requests.get(url, headers=headers)
            print(f"  Trying: {url} -> {resp.status_code}")
            if resp.status_code == 200:
                # Skip HTML pages (case-insensitive check that ignores leading whitespace)
                text = resp.text[:100].lstrip().lower()
                if not text.startswith("<!doctype") and not text.startswith("<html"):
                    try:
                        status_data = resp.json()
                        poll_url = url
                        print(f"  Found polling endpoint: {url}")
                        break
                    except json.JSONDecodeError:
                        continue

        if status_data is None:
            print("  All polling endpoints failed, retrying in 10 seconds...")
            continue

    status = status_data.get("status", "")
    print(f"  Current status: {status}")
    print(f"  Response: {json.dumps(status_data, indent=2)[:500]}")

    if status in ("completed", "succeeded", "success", "done"):
        print("Video generation completed!")
        break
    elif status in ("failed", "error", "cancelled"):
        print("Video generation failed:")
        print(json.dumps(status_data, indent=2))
        exit(1)

# ============================================================
# Step 3: download the video
# ============================================================
output_file = "dialogue_example.mp4"

# Try to extract the video URL from several possible fields
video_url = None

# Common field names
for key in ["video_url", "url", "output", "result", "download_url", "video"]:
    val = status_data.get(key)
    if isinstance(val, str) and (val.startswith("http") or val.startswith("/")):
        video_url = val
        break
    if isinstance(val, dict):
        for subkey in ["url", "uri", "download_url"]:
            subval = val.get(subkey)
            if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                video_url = subval
                break
        if video_url:
            break

# Check the data/results arrays, unless a URL was already found above
for key in ["data", "results", "videos", "generated_videos", "generatedVideos"]:
    if video_url:
        break
    items = status_data.get(key, [])
    if isinstance(items, list) and len(items) > 0:
        item = items[0]
        if isinstance(item, str) and item.startswith("http"):
            video_url = item
            break
        if isinstance(item, dict):
            for subkey in ["url", "uri", "video_url", "download_url", "video"]:
                subval = item.get(subkey)
                if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                    video_url = subval
                    break
                if isinstance(subval, dict):
                    for sskey in ["url", "uri"]:
                        ssval = subval.get(sskey)
                        if isinstance(ssval, str) and ssval.startswith("http"):
                            video_url = ssval
                            break
                    if video_url:
                        break
            if video_url:
                break

if video_url:
    if video_url.startswith("/"):
        video_url = f"{BASE_URL}{video_url}"

    print(f"Downloading video: {video_url}")
    dl = requests.get(video_url, headers=headers, stream=True)

    if dl.status_code == 200:
        with open(output_file, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"Generated video saved to {output_file}")
    else:
        print(f"Download failed: {dl.status_code}")
        print(dl.text[:500])
else:
    print("Could not extract a video download URL from the response. Full response:")
    print(json.dumps(status_data, indent=2))
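The nested loops in the download step can be folded into one recursive search. The following is a minimal sketch, not the gateway's documented response schema: the helper name find_video_url and the key list URL_KEYS are hypothetical, and unlike the loops above it descends into every field rather than only the listed container keys.

```python
from typing import Any, Optional

# Key names taken from the loops above; which of them the gateway actually
# returns is an assumption.
URL_KEYS = ("video_url", "url", "uri", "download_url")


def find_video_url(node: Any, depth: int = 0, max_depth: int = 5) -> Optional[str]:
    """Depth-first search for the first URL-like string in a JSON response."""
    if depth > max_depth:
        return None
    if isinstance(node, dict):
        # Prefer URL-like keys at this level before descending further.
        for key in URL_KEYS:
            val = node.get(key)
            if isinstance(val, str) and val.startswith(("http", "/")):
                return val
        for val in node.values():
            found = find_video_url(val, depth + 1, max_depth)
            if found:
                return found
    elif isinstance(node, list):
        for item in node:
            # A list may hold bare URL strings or nested objects.
            if isinstance(item, str) and item.startswith("http"):
                return item
            found = find_video_url(item, depth + 1, max_depth)
            if found:
                return found
    return None
```

With this helper, the extraction step reduces to video_url = find_video_url(status_data), followed by the same relative-path handling and download as above.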
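Controlling the aspect ratio
With Veo 3.1 you can create landscape videos (16:9, the default) or portrait videos (9:16). Use the aspect_ratio parameter to tell the model which one you want:
Key snippet, written against the google-genai Python SDK rather than the REST calls used above; it assumes client, types, and prompt are already defined: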
operation = client.models.generate_videos(
    model="veo-3.1-generate-preview",
    prompt=prompt,
    config=types.GenerateVideosConfig(
      aspect_ratio="9:16",
    ),
)
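Controlling the resolution
Veo 3.1 can also generate 720p, 1080p, or 4k videos directly.
Note that the higher the resolution, the longer the latency. 4k videos also cost more (see Pricing).
Video extension is also limited to 720p videos.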
operation = client.models.generate_videos(
    model="veo-3.1-generate-preview",
    prompt=prompt,
    config=types.GenerateVideosConfig(
      resolution="4k",
    ),
)
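For the REST flow used in the first example, the aspect ratio and resolution options described above could be validated before the request is sent. This is a sketch under assumptions: the helper build_video_payload is hypothetical, and whether the /v1/video/generations endpoint accepts top-level aspect_ratio and resolution fields is not confirmed by this document.

```python
from typing import Optional

# Values named in the text above.
ASPECT_RATIOS = {"16:9", "9:16"}
RESOLUTIONS = {"720p", "1080p", "4k"}


def build_video_payload(
    prompt: str,
    aspect_ratio: Optional[str] = None,
    resolution: Optional[str] = None,
    model: str = "veo-3.1-generate-preview",
) -> dict:
    """Build a request body, rejecting values the text does not list."""
    payload = {"model": model, "prompt": prompt}
    if aspect_ratio is not None:
        if aspect_ratio not in ASPECT_RATIOS:
            raise ValueError(f"unsupported aspect ratio: {aspect_ratio}")
        payload["aspect_ratio"] = aspect_ratio  # assumed field name
    if resolution is not None:
        if resolution not in RESOLUTIONS:
            raise ValueError(f"unsupported resolution: {resolution}")
        payload["resolution"] = resolution  # assumed field name
    return payload
```

The returned dictionary would be passed as the json argument of requests.post, in place of the inline body used in the first example.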
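Image-to-video generation
The following code generates an image with Gemini 2.5 Flash Image (also known as Nano Banana) and then uses that image as the starting frame for a video generated with Veo 3.1.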
import time
import requests
import json
import base64

BASE_URL = "https://"
API_KEY = "sk-"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

prompt = "Panning wide shot of a calico kitten sleeping in the sunshine"

# ============================================================
# Step 1: generate an image with gemini-2.5-flash-image
# ============================================================
print("=" * 60)
print("Step 1: generate the image")
print("=" * 60)

# Try several candidate image generation endpoints
image_endpoints = [
    f"{BASE_URL}/v1/images/generations",
    f"{BASE_URL}/v1/image/generations",
    f"{BASE_URL}/v1/chat/completions",  # OpenAI-compatible format
]

# Request body for each endpoint
image_payloads = {
    f"{BASE_URL}/v1/images/generations": {
        "model": "gemini-2.5-flash-image",
        "prompt": prompt,
    },
    f"{BASE_URL}/v1/image/generations": {
        "model": "gemini-2.5-flash-image",
        "prompt": prompt,
    },
    f"{BASE_URL}/v1/chat/completions": {
        "model": "gemini-2.5-flash-image",
        "messages": [{"role": "user", "content": prompt}],
        "response_modalities": ["IMAGE"],
    },
}

image_data = None
image_url = None
image_b64 = None
image_response = None

for endpoint in image_endpoints:
    payload = image_payloads[endpoint]
    print(f"\nTrying image generation endpoint: {endpoint}")
    print(f"Request body: {json.dumps(payload, indent=2)[:300]}")

    try:
        resp = requests.post(endpoint, headers=headers, json=payload, timeout=120)
        print(f"  Status code: {resp.status_code}")

        if resp.status_code == 200:
            # Case-insensitive HTML check that ignores leading whitespace
            text = resp.text[:100].lstrip().lower()
            if text.startswith("<!doctype") or text.startswith("<html"):
                print("  Got an HTML page, skipping")
                continue

            try:
                image_response = resp.json()
                print(f"  ✓ Response: {json.dumps(image_response, indent=2)[:500]}")

                # Extract the image data; several response formats are handled
                # Format 1: {"data": [{"url": "..."}, ...]}
                data_list = image_response.get("data", [])
                if data_list and isinstance(data_list, list):
                    item = data_list[0]
                    if isinstance(item, dict):
                        image_url = item.get("url")
                        image_b64 = item.get("b64_json")

                # Format 2: OpenAI chat format, with the image inside the message content
                choices = image_response.get("choices", [])
                if choices:
                    message = choices[0].get("message", {})
                    content = message.get("content", "")
                    # The content may be a list of parts
                    if isinstance(content, list):
                        for part in content:
                            if isinstance(part, dict):
                                if part.get("type") == "image_url":
                                    image_url = part.get("image_url", {}).get("url")
                                elif part.get("type") == "image":
                                    image_b64 = part.get("data") or part.get("b64_json")
                                    image_url = part.get("url")

                # Format 3: top-level fields
                if not image_url and not image_b64:
                    image_url = image_response.get("url")
                    image_b64 = image_response.get("b64_json")

                if image_url or image_b64:
                    print("  ✓ Image received!")
                    if image_url:
                        print(f"  Image URL: {image_url[:200]}")
                    if image_b64:
                        print(f"  Image base64 length: {len(image_b64)}")
                    break
                else:
                    print("  No image data found in the response, trying the next endpoint...")

            except json.JSONDecodeError:
                print(f"  Response is not JSON: {resp.text[:200]}")
                continue
        else:
            print(f"  Failed: {resp.text[:300]}")
    except Exception as e:
        print(f"  Exception: {e}")

# Fetch the image as binary data
if image_url:
    print(f"\nDownloading image: {image_url[:200]}")
    if image_url.startswith("data:"):
        # data URI format
        _, encoded = image_url.split(",", 1)
        image_data = base64.b64decode(encoded)
    else:
        dl = requests.get(image_url, timeout=60)
        if dl.status_code == 200:
            image_data = dl.content
        else:
            print(f"Image download failed: {dl.status_code}")
elif image_b64:
    image_data = base64.b64decode(image_b64)

if image_data:
    with open("step1_image.png", "wb") as f:
        f.write(image_data)
    print(f"✅ Image saved to step1_image.png ({len(image_data)} bytes)")
    image_b64_str = base64.b64encode(image_data).decode("utf-8")
else:
    print("❌ Image generation failed, cannot continue")
    print("Full response:", json.dumps(image_response, indent=2) if image_response else "no response")
    exit(1)

# ============================================================
# Step 2: generate the video with veo-3.1 and the image
# ============================================================
print("\n" + "=" * 60)
print("Step 2: image + prompt → video generation")
print("=" * 60)

# Try several request body formats
video_payloads = [
    # Format 1: base64 data in an image object
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "image": {
            "data": image_b64_str,
            "mime_type": "image/png"
        }
    },
    # Format 2: image passed directly as a base64 data URI string
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "image": f"data:image/png;base64,{image_b64_str}"
    },
    # Format 3: pass the URL if the image has one
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "image_url": image_url if image_url and not image_url.startswith("data:") else None
    },
    # Format 4: first_frame
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "first_frame": {
            "data": image_b64_str,
            "mime_type": "image/png"
        }
    },
]

video_endpoint = f"{BASE_URL}/v1/video/generations"
task_id = None
submit_response = None

for i, payload in enumerate(video_payloads):
    # Skip the image_url format when no URL is available
    if payload.get("image_url") is None and "image_url" in payload:
        continue

    # Avoid printing the whole body when the payload is large
    payload_preview = {k: (v[:80] + "..." if isinstance(v, str) and len(v) > 80 else v) for k, v in payload.items()}
    if isinstance(payload.get("image"), dict):
        payload_preview["image"] = {"data": f"[base64, {len(image_b64_str)} chars]", "mime_type": "image/png"}
    if isinstance(payload.get("first_frame"), dict):
        payload_preview["first_frame"] = {"data": f"[base64, {len(image_b64_str)} chars]", "mime_type": "image/png"}

    print(f"\nTrying video generation format {i+1}: {video_endpoint}")
    print(f"Request body: {json.dumps(payload_preview, indent=2)}")

    try:
        resp = requests.post(video_endpoint, headers=headers, json=payload, timeout=30)
        print(f"  Status code: {resp.status_code}")
        print(f"  Response: {resp.text[:500]}")

        if resp.status_code == 200:
            try:
                submit_response = resp.json()
                task_id = submit_response.get("id")
                if task_id:
                    print(f"  ✓ Video task submitted! ID: {task_id}")
                    break
            except json.JSONDecodeError:
                continue
    except Exception as e:
        print(f"  Exception: {e}")

if not task_id:
    print("\n❌ Failed to submit the video generation request")
    exit(1)

# ============================================================
# Step 3: poll the video generation status
# ============================================================
print("\n" + "=" * 60)
print("Step 3: poll the video generation status")
print("=" * 60)

poll_urls = [
    f"{BASE_URL}/v1/video/generations/{task_id}",
    f"{BASE_URL}/v1/video/{task_id}",
    f"{BASE_URL}/v1/videos/{task_id}",
]

poll_url = None
round_count = 0
status_data = None

while True:
    round_count += 1
    print(f"\n--- Poll #{round_count} ---")
    time.sleep(10)

    if poll_url:
        print(f"Polling URL: {poll_url}")
        resp = requests.get(poll_url, headers=headers)
        print(f"Status code: {resp.status_code}")
        if resp.status_code == 200:
            try:
                status_data = resp.json()
            except json.JSONDecodeError:
                print(f"Response is not JSON: {resp.text[:300]}")
                continue
        else:
            print(f"Polling failed: {resp.text[:300]}")
            continue
    else:
        status_data = None
        for url in poll_urls:
            print(f"Trying polling URL: {url}")
            resp = requests.get(url, headers=headers)
            print(f"  Status code: {resp.status_code}")

            if resp.status_code == 200:
                # Case-insensitive HTML check that ignores leading whitespace
                text = resp.text[:100].lstrip().lower()
                if text.startswith("<!doctype") or text.startswith("<html"):
                    print("  Got an HTML page, skipping")
                    continue
                try:
                    status_data = resp.json()
                    poll_url = url
                    print(f"  ✓ Found a working polling endpoint: {url}")
                    break
                except json.JSONDecodeError:
                    print("  Response is not JSON, skipping")
                    continue
            else:
                print(f"  Failed: {resp.text[:200]}")

        if status_data is None:
            print("All polling endpoints failed, retrying in 10 seconds...")
            continue

    status = status_data.get("status", "")
    print(f"Current status: {status}")
    print(f"Response body: {json.dumps(status_data, indent=2)[:600]}")

    if status in ("completed", "succeeded", "success", "done"):
        print("\n✅ Video generation completed!")
        break
    elif status in ("failed", "error", "cancelled"):
        print("\n❌ Video generation failed:")
        print(json.dumps(status_data, indent=2))
        exit(1)

# ============================================================
# Step 4: download the video
# ============================================================
print("\n" + "=" * 60)
print("Step 4: download the video")
print("=" * 60)

output_file = "veo3_with_image_input.mp4"
video_url = None

# Try several possible fields
for key in ["video_url", "url", "output", "result", "download_url", "video"]:
    val = status_data.get(key)
    if isinstance(val, str) and (val.startswith("http") or val.startswith("/")):
        video_url = val
        break
    if isinstance(val, dict):
        for subkey in ["url", "uri", "download_url"]:
            subval = val.get(subkey)
            if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                video_url = subval
                break
        if video_url:
            break

for key in ["data", "results", "videos", "generated_videos", "generatedVideos"]:
    if video_url:
        break
    items = status_data.get(key, [])
    if isinstance(items, list) and len(items) > 0:
        item = items[0]
        if isinstance(item, str) and item.startswith("http"):
            video_url = item
            break
        if isinstance(item, dict):
            for subkey in ["url", "uri", "video_url", "download_url", "video"]:
                subval = item.get(subkey)
                if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                    video_url = subval
                    break
                if isinstance(subval, dict):
                    for sskey in ["url", "uri"]:
                        ssval = subval.get(sskey)
                        if isinstance(ssval, str) and ssval.startswith("http"):
                            video_url = ssval
                            break
                    if video_url:
                        break
            if video_url:
                break

if video_url:
    if video_url.startswith("/"):
        video_url = f"{BASE_URL}{video_url}"

    print(f"Download URL: {video_url}")
    dl = requests.get(video_url, headers=headers, stream=True)

    if dl.status_code == 200:
        with open(output_file, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"✅ Generated video saved to {output_file}")
    else:
        print(f"Download failed: {dl.status_code}")
        print(dl.text[:500])
else:
    print("Could not extract a video download URL from the response. Full response:")
    print(json.dumps(status_data, indent=2))
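The polling loops in these scripts run until the task reports a terminal status, so a stuck task would keep them running indefinitely. The following is a minimal sketch of a bounded variant. The helper poll_until_done and its parameters are hypothetical; the status strings are the ones the scripts above already check for, and the 600-second default is an arbitrary choice rather than a documented limit.

```python
import time
from typing import Any, Callable, Dict

DONE = {"completed", "succeeded", "success", "done"}
FAILED = {"failed", "error", "cancelled"}


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 10.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Call fetch_status until a terminal status appears or the deadline passes."""
    deadline = clock() + timeout
    while True:
        data = fetch_status()
        status = str(data.get("status", "")).lower()
        if status in DONE:
            return data
        if status in FAILED:
            raise RuntimeError(f"video generation failed: {data}")
        if clock() >= deadline:
            raise TimeoutError(f"no terminal status after {timeout} seconds")
        sleep(interval)
```

In the scripts above, fetch_status could be a small function that issues requests.get(poll_url, headers=headers) and returns the parsed JSON. Passing it in as a callable keeps the loop free of network code, so it can be exercised with canned responses.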
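Using reference images
Note: this feature is only available for Veo 3.1 models.
Veo 3.1 now accepts up to 3 reference images to guide the content of the generated video. Provide images of a person, character, or product to preserve the subject's appearance in the output video.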
import time
import requests
import json
import base64

BASE_URL = "https://"
API_KEY = "sk"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

prompt = "The video opens with a medium, eye-level shot of a beautiful woman with dark hair and warm brown eyes. She wears a magnificent, high-fashion flamingo dress with layers of pink and fuchsia feathers, complemented by whimsical pink, heart-shaped sunglasses. She walks with serene confidence through the crystal-clear, shallow turquoise water of a sun-drenched lagoon. The camera slowly pulls back to a medium-wide shot, revealing the breathtaking scene as the dress's long train glides and floats gracefully on the water's surface behind her. The cinematic, dreamlike atmosphere is enhanced by the vibrant colors of the dress against the serene, minimalist landscape, capturing a moment of pure elegance and high-fashion fantasy."

# ============================================================
# Step 1: generate 3 reference images with Gemini
# ============================================================
print("=" * 60)
print("Step 1: generate reference images (dress / sunglasses / woman)")
print("=" * 60)

image_prompts = {
    "dress": "A magnificent high-fashion flamingo dress with layers of pink and fuchsia feathers, long flowing train, displayed on a mannequin, studio photography, white background",
    "sunglasses": "A pair of whimsical pink heart-shaped sunglasses, product photography, white background, high detail",
    "woman": "A beautiful woman with dark hair and warm brown eyes, portrait photography, soft lighting, neutral background, elegant and serene expression",
}

reference_images = {}  # key -> base64 string

for name, img_prompt in image_prompts.items():
    print(f"\n--- Generating the {name} reference image ---")
    print(f"Prompt: {img_prompt[:80]}...")

    # Try /v1/images/generations first
    resp = requests.post(
        f"{BASE_URL}/v1/images/generations",
        headers=headers,
        json={
            "model": "gemini-2.5-flash-image",
            "prompt": img_prompt,
        },
        timeout=120
    )
    print(f"  Endpoint: {BASE_URL}/v1/images/generations")
    print(f"  Status code: {resp.status_code}")

    image_data = None
    image_url = None
    image_b64 = None

    if resp.status_code == 200:
        try:
            img_resp = resp.json()
            print(f"  Response: {json.dumps(img_resp, indent=2)[:400]}")

            # Extract the image from data[0].url or data[0].b64_json
            data_list = img_resp.get("data", [])
            if data_list and isinstance(data_list, list):
                item = data_list[0]
                if isinstance(item, dict):
                    image_url = item.get("url")
                    image_b64 = item.get("b64_json")

            # Top-level fields
            if not image_url and not image_b64:
                image_url = img_resp.get("url")
                image_b64 = img_resp.get("b64_json")

        except json.JSONDecodeError:
            print(f"  Response is not JSON: {resp.text[:200]}")

    # If /v1/images/generations returned no image, fall back to /v1/chat/completions
    if not image_url and not image_b64:
        print(f"  Trying fallback endpoint: {BASE_URL}/v1/chat/completions")
        resp2 = requests.post(
            f"{BASE_URL}/v1/chat/completions",
            headers=headers,
            json={
                "model": "gemini-2.5-flash-image",
                "messages": [{"role": "user", "content": img_prompt}],
                "response_modalities": ["IMAGE"],
            },
            timeout=120
        )
        print(f"  Status code: {resp2.status_code}")

        if resp2.status_code == 200:
            try:
                chat_resp = resp2.json()
                print(f"  Response: {json.dumps(chat_resp, indent=2)[:400]}")

                choices = chat_resp.get("choices", [])
                if choices:
                    message = choices[0].get("message", {})
                    content = message.get("content", "")
                    if isinstance(content, list):
                        for part in content:
                            if isinstance(part, dict):
                                if part.get("type") == "image_url":
                                    image_url = part.get("image_url", {}).get("url")
                                elif part.get("type") == "image":
                                    image_b64 = part.get("data") or part.get("b64_json")
                                    image_url = part.get("url")
            except json.JSONDecodeError:
                print(f"  Response is not JSON: {resp2.text[:200]}")

    # Download or decode the image
    if image_url:
        if image_url.startswith("data:"):
            _, encoded = image_url.split(",", 1)
            image_data = base64.b64decode(encoded)
        else:
            dl = requests.get(image_url, timeout=60)
            if dl.status_code == 200:
                image_data = dl.content
    elif image_b64:
        image_data = base64.b64decode(image_b64)

    if image_data:
        filename = f"step1_{name}.png"
        with open(filename, "wb") as f:
            f.write(image_data)
        reference_images[name] = base64.b64encode(image_data).decode("utf-8")
        print(f"  ✅ {name} image saved to {filename} ({len(image_data)} bytes)")
    else:
        print(f"  ❌ Failed to generate the {name} image!")

print(f"\nGenerated {len(reference_images)}/3 reference images")

if len(reference_images) == 0:
    print("❌ No reference images available, cannot continue")
    exit(1)

# ============================================================
# Step 2: generate the video with veo-3.1 and the reference images
# ============================================================
print("\n" + "=" * 60)
print("Step 2: reference images + prompt → video generation")
print("=" * 60)

video_endpoint = f"{BASE_URL}/v1/video/generations"

# Build the reference_images array, mirroring the SDK's VideoGenerationReferenceImage
reference_images_payload = []
ref_type_map = {
    "dress": "asset",
    "sunglasses": "asset",
    "woman": "asset",
}

for name, b64_str in reference_images.items():
    reference_images_payload.append({
        "image": {
            "data": b64_str,
            "mime_type": "image/png"
        },
        "reference_type": ref_type_map.get(name, "asset")
    })

# Try several request body formats
video_payloads = [
    # Format 1: reference_images array (mirrors the SDK format)
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "reference_images": reference_images_payload,
    },
    # Format 2: nested under config
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "config": {
            "reference_images": reference_images_payload,
        },
    },
    # Format 3: images array
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "images": [
            {
                "data": f"data:image/png;base64,{b64_str}",
                "reference_type": ref_type_map.get(name, "asset")
            }
            for name, b64_str in reference_images.items()
        ],
    },
]

task_id = None
submit_response = None

for i, payload in enumerate(video_payloads):
    # Build a short preview so the full base64 data is not printed
    payload_preview = json.dumps(payload, indent=2)
    # Truncate the base64 data for display
    if len(payload_preview) > 500:
        payload_preview = payload_preview[:200] + "\n  ... [base64 data truncated] ...\n" + payload_preview[-200:]

    print(f"\nTrying video generation format {i+1}: {video_endpoint}")
    print(f"Request body preview: {payload_preview}")

    try:
        resp = requests.post(video_endpoint, headers=headers, json=payload, timeout=30)
        print(f"  Status code: {resp.status_code}")
        print(f"  Response: {resp.text[:500]}")

        if resp.status_code == 200:
            try:
                submit_response = resp.json()
                task_id = submit_response.get("id")
                if task_id:
                    print(f"  ✓ Video task submitted! ID: {task_id}")
                    break
            except json.JSONDecodeError:
                continue
    except Exception as e:
        print(f"  Exception: {e}")

if not task_id:
    print("\n❌ Failed to submit the video generation request")
    exit(1)

# ============================================================
# Step 3: poll the video generation status
# ============================================================
print("\n" + "=" * 60)
print("Step 3: poll the video generation status")
print("=" * 60)

poll_urls = [
    f"{BASE_URL}/v1/video/generations/{task_id}",
    f"{BASE_URL}/v1/video/{task_id}",
    f"{BASE_URL}/v1/videos/{task_id}",
]

poll_url = None
round_count = 0
status_data = None

while True:
    round_count += 1
    print(f"\n--- Poll #{round_count} ---")
    time.sleep(10)

    if poll_url:
        print(f"Polling URL: {poll_url}")
        resp = requests.get(poll_url, headers=headers)
        print(f"Status code: {resp.status_code}")
        if resp.status_code == 200:
            try:
                status_data = resp.json()
            except json.JSONDecodeError:
                print(f"Response is not JSON: {resp.text[:300]}")
                continue
        else:
            print(f"Polling failed: {resp.text[:300]}")
            continue
    else:
        status_data = None
        for url in poll_urls:
            print(f"Trying polling URL: {url}")
            resp = requests.get(url, headers=headers)
            print(f"  Status code: {resp.status_code}")

            if resp.status_code == 200:
                # Case-insensitive HTML check that ignores leading whitespace
                text = resp.text[:100].lstrip().lower()
                if text.startswith("<!doctype") or text.startswith("<html"):
                    print("  Got an HTML page, skipping")
                    continue
                try:
                    status_data = resp.json()
                    poll_url = url
                    print(f"  ✓ Found a working polling endpoint: {url}")
                    break
                except json.JSONDecodeError:
                    print("  Response is not JSON, skipping")
                    continue
            else:
                print(f"  Failed: {resp.text[:200]}")

        if status_data is None:
            print("All polling endpoints failed, retrying in 10 seconds...")
            continue

    status = status_data.get("status", "")
    print(f"Current status: {status}")
    print(f"Response body: {json.dumps(status_data, indent=2)[:600]}")

    if status in ("completed", "succeeded", "success", "done"):
        print("\n✅ Video generation completed!")
        break
    elif status in ("failed", "error", "cancelled"):
        print("\n❌ Video generation failed:")
        print(json.dumps(status_data, indent=2))
        exit(1)

# ============================================================
# Step 4: download the video
# ============================================================
print("\n" + "=" * 60)
print("Step 4: download the video")
print("=" * 60)

output_file = "veo3.1_with_reference_images.mp4"
video_url = None

# Try several possible fields
for key in ["video_url", "url", "output", "result", "download_url", "video"]:
    val = status_data.get(key)
    if isinstance(val, str) and (val.startswith("http") or val.startswith("/")):
        video_url = val
        break
    if isinstance(val, dict):
        for subkey in ["url", "uri", "download_url"]:
            subval = val.get(subkey)
            if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                video_url = subval
                break
        if video_url:
            break

for key in ["data", "results", "videos", "generated_videos", "generatedVideos"]:
    if video_url:
        break
    items = status_data.get(key, [])
    if isinstance(items, list) and len(items) > 0:
        item = items[0]
        if isinstance(item, str) and item.startswith("http"):
            video_url = item
            break
        if isinstance(item, dict):
            for subkey in ["url", "uri", "video_url", "download_url", "video"]:
                subval = item.get(subkey)
                if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                    video_url = subval
                    break
                if isinstance(subval, dict):
                    for sskey in ["url", "uri"]:
                        ssval = subval.get(sskey)
                        if isinstance(ssval, str) and ssval.startswith("http"):
                            video_url = ssval
                            break
                    if video_url:
                        break
            if video_url:
                break

if video_url:
    if video_url.startswith("/"):
        video_url = f"{BASE_URL}{video_url}"

    print(f"下载地址: {video_url}")
    # Set a timeout so a stalled connection cannot hang the script indefinitely
    dl = requests.get(video_url, headers=headers, stream=True, timeout=120)

    if dl.status_code == 200:
        with open(output_file, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"✅ Generated video saved to {output_file}")
    else:
        print(f"下载失败: {dl.status_code}")
        print(dl.text[:500])
else:
    print("无法从响应中提取视频下载地址,完整响应:")
    print(json.dumps(status_data, indent=2))
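The nested field lookups used to locate the download URL can also be written as one recursive search. The following is a minimal sketch, not the gateway's documented response schema: the function name `find_video_url`, the key order, and the depth limit are assumptions made for illustration, and it is slightly more permissive than the loops above (it accepts a relative path anywhere, not only at the top level).

```python
from typing import Any, Optional

# Keys that may hold a URL directly or a nested object that contains one.
URL_KEYS = ("video_url", "url", "uri", "download_url", "output", "result", "video")
# Keys that may hold a list of generated items.
LIST_KEYS = ("data", "results", "videos", "generated_videos", "generatedVideos")


def find_video_url(node: Any, depth: int = 0, max_depth: int = 6) -> Optional[str]:
    """Return the first URL-like string found under the known keys, or None."""
    if depth > max_depth:
        return None
    if isinstance(node, str):
        # Accept absolute URLs and server-relative paths only.
        return node if node.startswith(("http://", "https://", "/")) else None
    if isinstance(node, list):
        for item in node:
            found = find_video_url(item, depth + 1, max_depth)
            if found:
                return found
        return None
    if isinstance(node, dict):
        for key in URL_KEYS + LIST_KEYS:
            if key in node:
                found = find_video_url(node[key], depth + 1, max_depth)
                if found:
                    return found
    return None
```

A relative result (one that starts with `/`) still needs the base URL prepended before downloading, as the script above does.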
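Using first and last frames
Note: This feature is available only on Veo 3.1 models.
With Veo 3.1, you can create a video by interpolation, that is, by specifying the first and last frames of the video.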
import time
import requests
import json
import base64

BASE_URL = "https://"
API_KEY = "sk-"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

prompt = "A cinematic, haunting video. A ghostly woman with long white hair and a flowing dress swings gently on a rope swing beneath a massive, gnarled tree in a foggy, moonlit clearing. The fog thickens and swirls around her, and she slowly fades away, vanishing completely. The empty swing is left swaying rhythmically on its own in the eerie silence."

# ============================================================
# Step 1: generate the first-frame image (first_image)
# ============================================================
print("=" * 60)
print("Step 1: generate the first-frame image (first_image)")
print("=" * 60)

first_frame_prompt = "A ghostly woman with long white hair and a flowing white dress sitting on a rope swing beneath a massive gnarled tree, foggy moonlit clearing, cinematic, haunting atmosphere, ethereal glow"

first_image_data = None
first_image_b64 = None


def generate_image(img_prompt, label):
    """Try several endpoints to generate an image; return (image_bytes, base64_str) or (None, None)."""
    image_data = None
    image_url = None
    image_b64 = None

    # Endpoint 1: /v1/images/generations
    endpoint = f"{BASE_URL}/v1/images/generations"
    print(f"\n  [{label}] Trying endpoint: {endpoint}")
    try:
        resp = requests.post(
            endpoint, headers=headers,
            json={"model": "gemini-2.5-flash-image", "prompt": img_prompt},
            timeout=120
        )
        print(f"  状态码: {resp.status_code}")
        if resp.status_code == 200:
            try:
                img_resp = resp.json()
                print(f"  响应: {json.dumps(img_resp, indent=2)[:400]}")
                data_list = img_resp.get("data", [])
                if data_list and isinstance(data_list, list):
                    item = data_list[0]
                    if isinstance(item, dict):
                        image_url = item.get("url")
                        image_b64 = item.get("b64_json")
                if not image_url and not image_b64:
                    image_url = img_resp.get("url")
                    image_b64 = img_resp.get("b64_json")
            except json.JSONDecodeError:
                print(f"  响应非JSON: {resp.text[:200]}")
    except Exception as e:
        print(f"  异常: {e}")

    # 端点2: /v1/chat/completions
    if not image_url and not image_b64:
        endpoint2 = f"{BASE_URL}/v1/chat/completions"
        print(f"  尝试备选端点: {endpoint2}")
        try:
            resp2 = requests.post(
                endpoint2, headers=headers,
                json={
                    "model": "gemini-2.5-flash-image",
                    "messages": [{"role": "user", "content": img_prompt}],
                    "response_modalities": ["IMAGE"],
                },
                timeout=120
            )
            print(f"  状态码: {resp2.status_code}")
            if resp2.status_code == 200:
                try:
                    chat_resp = resp2.json()
                    print(f"  响应: {json.dumps(chat_resp, indent=2)[:400]}")
                    choices = chat_resp.get("choices", [])
                    if choices:
                        message = choices[0].get("message", {})
                        content = message.get("content", "")
                        if isinstance(content, list):
                            for part in content:
                                if isinstance(part, dict):
                                    if part.get("type") == "image_url":
                                        image_url = part.get("image_url", {}).get("url")
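                                    elif part.get("type") == "image":
                                        # Keep values found in earlier parts instead of overwriting them with None
                                        image_b64 = part.get("data") or part.get("b64_json") or image_b64
                                        image_url = part.get("url") or image_url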
                except json.JSONDecodeError:
                    print(f"  响应非JSON: {resp2.text[:200]}")
        except Exception as e:
            print(f"  异常: {e}")

    # 下载/解码
    if image_url:
        if image_url.startswith("data:"):
            _, encoded = image_url.split(",", 1)
            image_data = base64.b64decode(encoded)
        else:
            print(f"  下载图片: {image_url[:200]}")
            dl = requests.get(image_url, timeout=60)
            if dl.status_code == 200:
                image_data = dl.content
    elif image_b64:
        image_data = base64.b64decode(image_b64)

    if image_data:
        b64_str = base64.b64encode(image_data).decode("utf-8")
        return image_data, b64_str
    return None, None


first_image_data, first_image_b64 = generate_image(first_frame_prompt, "first_frame")

if first_image_data:
    with open("step1_first_frame.png", "wb") as f:
        f.write(first_image_data)
    print(f"\n✅ 首帧图片已保存到 step1_first_frame.png ({len(first_image_data)} bytes)")
else:
    print("\n❌ 首帧图片生成失败!")
    exit(1)

# ============================================================
# Step 2: generate the last-frame image (last_image)
# ============================================================
print("\n" + "=" * 60)
print("Step 2: generate the last-frame image (last_image)")
print("=" * 60)

last_frame_prompt = "An empty rope swing swaying gently on its own beneath a massive gnarled tree, foggy moonlit clearing, no person visible, eerie silence, cinematic haunting atmosphere, the woman has vanished completely"

last_image_data, last_image_b64 = generate_image(last_frame_prompt, "last_frame")

if last_image_data:
    with open("step2_last_frame.png", "wb") as f:
        f.write(last_image_data)
    print(f"\n✅ 末帧图片已保存到 step2_last_frame.png ({len(last_image_data)} bytes)")
else:
    print("\n❌ 末帧图片生成失败!")
    exit(1)

# ============================================================
# Step 3: generate the video with veo-3.1 + first frame + last frame (interpolation)
# ============================================================
print("\n" + "=" * 60)
print("Step 3: first frame + last frame + prompt -> video generation (interpolation)")
print("=" * 60)

video_endpoint = f"{BASE_URL}/v1/video/generations"

# Try several request-body formats in turn
video_payloads = [
    # Format 1: image = first frame, config.last_frame = last frame (closest to the original SDK format)
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "image": {
            "data": first_image_b64,
            "mime_type": "image/png"
        },
        "config": {
            "last_frame": {
                "data": last_image_b64,
                "mime_type": "image/png"
            }
        }
    },
    # Format 2: image and last_frame both at the top level
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "image": {
            "data": first_image_b64,
            "mime_type": "image/png"
        },
        "last_frame": {
            "data": last_image_b64,
            "mime_type": "image/png"
        }
    },
    # Format 3: first_frame + last_frame
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "first_frame": {
            "data": first_image_b64,
            "mime_type": "image/png"
        },
        "last_frame": {
            "data": last_image_b64,
            "mime_type": "image/png"
        }
    },
    # Format 4: both frames as data URI strings
    {
        "model": "veo-3.1-generate-preview",
        "prompt": prompt,
        "image": f"data:image/png;base64,{first_image_b64}",
        "last_frame": f"data:image/png;base64,{last_image_b64}"
    },
]

task_id = None
submit_response = None

for i, payload in enumerate(video_payloads):
    print(f"\n尝试视频生成格式 {i+1}: {video_endpoint}")

    # 构造简短预览
    preview = {}
    for k, v in payload.items():
        if isinstance(v, dict) and "data" in v:
            preview[k] = {"data": f"[base64, {len(v['data'])} chars]", "mime_type": v.get("mime_type", "")}
        elif isinstance(v, str) and len(v) > 100:
            preview[k] = v[:60] + "...[truncated]"
        elif isinstance(v, dict):
            inner = {}
            for ik, iv in v.items():
                if isinstance(iv, dict) and "data" in iv:
                    inner[ik] = {"data": f"[base64, {len(iv['data'])} chars]", "mime_type": iv.get("mime_type", "")}
                else:
                    inner[ik] = iv
            preview[k] = inner
        else:
            preview[k] = v
    print(f"请求体预览: {json.dumps(preview, indent=2)}")

    try:
        resp = requests.post(video_endpoint, headers=headers, json=payload, timeout=30)
        print(f"  状态码: {resp.status_code}")
        print(f"  响应: {resp.text[:500]}")

        if resp.status_code == 200:
            try:
                submit_response = resp.json()
                task_id = submit_response.get("id")
                if task_id:
                    print(f"  ✓ 视频任务提交成功! ID: {task_id}")
                    break
            except json.JSONDecodeError:
                continue
    except Exception as e:
        print(f"  异常: {e}")

if not task_id:
    print("\n❌ 视频生成请求提交失败")
    exit(1)

# ============================================================
# Step 4: poll the video generation status
# ============================================================
print("\n" + "=" * 60)
print("Step 4: poll the video generation status")
print("=" * 60)

poll_urls = [
    f"{BASE_URL}/v1/video/generations/{task_id}",
    f"{BASE_URL}/v1/video/{task_id}",
    f"{BASE_URL}/v1/videos/{task_id}",
]

poll_url = None
round_count = 0
status_data = None

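MAX_POLLS = 90  # assumed cap (about 15 minutes at 10-second intervals); tune as needed

while True:
    round_count += 1
    if round_count > MAX_POLLS:
        print(f"\n❌ Gave up after {MAX_POLLS} polls without a terminal status")
        exit(1)
    print(f"\n--- Poll #{round_count} ---")
    time.sleep(10)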

    if poll_url:
        print(f"轮询地址: {poll_url}")
        resp = requests.get(poll_url, headers=headers)
        print(f"状态码: {resp.status_code}")
        if resp.status_code == 200:
            try:
                status_data = resp.json()
            except json.JSONDecodeError:
                print(f"响应非JSON: {resp.text[:300]}")
                continue
        else:
            print(f"轮询失败: {resp.text[:300]}")
            continue
    else:
        status_data = None
        for url in poll_urls:
            print(f"尝试轮询地址: {url}")
            resp = requests.get(url, headers=headers)
            print(f"  状态码: {resp.status_code}")

            if resp.status_code == 200:
                # Compare case-insensitively: HTML pages often start with "<!DOCTYPE html>"
                text = resp.text[:100].lstrip().lower()
                if text.startswith("<!doctype") or text.startswith("<html"):
                    print(f"  返回HTML页面,跳过")
                    continue
                try:
                    status_data = resp.json()
                    poll_url = url
                    print(f"  ✓ 找到有效轮询端点: {url}")
                    break
                except json.JSONDecodeError:
                    print(f"  响应非JSON,跳过")
                    continue
            else:
                print(f"  失败: {resp.text[:200]}")

        if status_data is None:
            print("所有轮询端点均失败,10秒后重试...")
            continue

    status = status_data.get("status", "")
    print(f"当前状态: {status}")
    print(f"响应内容: {json.dumps(status_data, indent=2)[:600]}")

    if status in ("completed", "succeeded", "success", "done"):
        print("\n✅ 视频生成完成!")
        break
    elif status in ("failed", "error", "cancelled"):
        print("\n❌ 视频生成失败:")
        print(json.dumps(status_data, indent=2))
        exit(1)

# ============================================================
# Step 5: download the video
# ============================================================
print("\n" + "=" * 60)
print("Step 5: download the video")
print("=" * 60)

output_file = "veo3.1_with_interpolation.mp4"
video_url = None

# Try several possible response fields that may hold the download URL
for key in ["video_url", "url", "output", "result", "download_url", "video"]:
    val = status_data.get(key)
    if isinstance(val, str) and (val.startswith("http") or val.startswith("/")):
        video_url = val
        break
    if isinstance(val, dict):
        for subkey in ["url", "uri", "download_url"]:
            subval = val.get(subkey)
            if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                video_url = subval
                break
        if video_url:
            break

for key in ["data", "results", "videos", "generated_videos", "generatedVideos"]:
    if video_url:
        break
    items = status_data.get(key, [])
    if isinstance(items, list) and len(items) > 0:
        item = items[0]
        if isinstance(item, str) and item.startswith("http"):
            video_url = item
            break
        if isinstance(item, dict):
            for subkey in ["url", "uri", "video_url", "download_url", "video"]:
                subval = item.get(subkey)
                if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                    video_url = subval
                    break
                if isinstance(subval, dict):
                    for sskey in ["url", "uri"]:
                        ssval = subval.get(sskey)
                        if isinstance(ssval, str) and ssval.startswith("http"):
                            video_url = ssval
                            break
                    if video_url:
                        break
            if video_url:
                break

if video_url:
    if video_url.startswith("/"):
        video_url = f"{BASE_URL}{video_url}"

    print(f"下载地址: {video_url}")
    # Set a timeout so a stalled connection cannot hang the script indefinitely
    dl = requests.get(video_url, headers=headers, stream=True, timeout=120)

    if dl.status_code == 200:
        with open(output_file, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"✅ Generated video saved to {output_file}")
    else:
        print(f"下载失败: {dl.status_code}")
        print(dl.text[:500])
else:
    print("无法从响应中提取视频下载地址,完整响应:")
    print(json.dumps(status_data, indent=2))
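The polling step in the script above can be expressed as a reusable function with an overall deadline. This is a minimal sketch under stated assumptions: the name `poll_until_done`, the injected `fetch_status` callable, and the 900-second default timeout are hypothetical choices made for illustration. The status strings are the ones the script already checks; the sketch additionally lower-cases the status before comparing.

```python
import time
from typing import Any, Callable, Dict

DONE = {"completed", "succeeded", "success", "done"}
FAILED = {"failed", "error", "cancelled"}


def poll_until_done(
    fetch_status: Callable[[], Dict[str, Any]],
    interval: float = 10.0,
    timeout: float = 900.0,  # assumed default, not a documented limit
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Dict[str, Any]:
    """Call fetch_status until it reports a terminal status or the deadline passes."""
    deadline = clock() + timeout
    while True:
        data = fetch_status()
        status = str(data.get("status", "")).lower()
        if status in DONE:
            return data
        if status in FAILED:
            raise RuntimeError(f"generation failed: {data}")
        if clock() >= deadline:
            raise TimeoutError(f"no terminal status after {timeout} seconds")
        sleep(interval)
```

In the script, `fetch_status` could be a small function that issues the GET request against the discovered polling URL and returns the parsed JSON. Passing `sleep` and `clock` as parameters keeps the function testable without real waiting.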
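Extending Veo videos
Note: This feature is available only on Veo 3.1 models.
With Veo 3.1, you can extend a video previously generated with Veo by 7 seconds at a time, up to 20 times.
Input video limitations:
Only Veo-generated videos of up to 141 seconds can be extended.
The Gemini API supports video extension only for Veo-generated videos.
The video should come from a previous generation result, for example operation.response.generated_videos[0].video
Videos are stored for 2 days, but when a video is referenced for extension its 2-day storage timer resets. You can only extend videos that were generated or referenced within the last 2 days.
Input videos must meet the following length, aspect ratio, and resolution requirements:
Aspect ratio: 9:16 or 16:9
Resolution: 720p
Video length: 141 seconds or less
The output of an extension is a single video that combines the input video with the newly generated segment, with a total length of up to 148 seconds.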
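The input limits above can be checked locally before a request is submitted. The following is a minimal sketch that encodes only the numbers stated in this section (7 seconds per extension, 141-second input limit, 20 extensions, 720p, 9:16 or 16:9). The `VideoInfo` class and both function names are hypothetical, and the sketch does not check whether the video was generated by Veo or whether it is still within the 2-day window.

```python
from dataclasses import dataclass
from typing import List

EXTENSION_SECONDS = 7
MAX_INPUT_SECONDS = 141
MAX_EXTENSIONS = 20
ALLOWED_ASPECT_RATIOS = {"9:16", "16:9"}
REQUIRED_RESOLUTION = "720p"


@dataclass
class VideoInfo:
    """Hypothetical local description of a previously generated video."""
    duration_seconds: float
    aspect_ratio: str
    resolution: str
    extensions_so_far: int = 0


def extension_problems(video: VideoInfo) -> List[str]:
    """Return the reasons the video cannot be extended; empty if it qualifies."""
    problems = []
    if video.duration_seconds > MAX_INPUT_SECONDS:
        problems.append(f"duration exceeds {MAX_INPUT_SECONDS} seconds")
    if video.aspect_ratio not in ALLOWED_ASPECT_RATIOS:
        problems.append("aspect ratio must be 9:16 or 16:9")
    if video.resolution != REQUIRED_RESOLUTION:
        problems.append(f"resolution must be {REQUIRED_RESOLUTION}")
    if video.extensions_so_far >= MAX_EXTENSIONS:
        problems.append(f"already extended {MAX_EXTENSIONS} times")
    return problems


def duration_after_extension(video: VideoInfo) -> float:
    """Each extension adds 7 seconds, so a 141-second input yields 148 seconds."""
    return video.duration_seconds + EXTENSION_SECONDS
```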
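Core logic: pass the video object from the previous generation through the video= parameter to perform video extension, that is, continue the existing video with a follow-on segment generated from a new prompt.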
import time
import requests
import json
import base64

BASE_URL = "https://"
API_KEY = "sk-"

headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_KEY}"
}

# ============================================================
# Step 1: generate an initial video (the source material for the extension)
# ============================================================
print("=" * 60)
print("Step 1: generate the initial video (to be extended later)")
print("=" * 60)

initial_prompt = "A colorful butterfly flutters through a sunny garden with origami flowers, cinematic tracking shot, vibrant colors, shallow depth of field"

video_endpoint = f"{BASE_URL}/v1/video/generations"

initial_payloads = [
    {
        "model": "veo-3.1-generate-preview",
        "prompt": initial_prompt,
        "config": {
            "number_of_videos": 1,
            "resolution": "720p"
        }
    },
    {
        "model": "veo-3.1-generate-preview",
        "prompt": initial_prompt,
        "number_of_videos": 1,
        "resolution": "720p"
    },
]

initial_task_id = None

for i, payload in enumerate(initial_payloads):
    print(f"\n尝试初始视频生成格式 {i+1}: {video_endpoint}")
    print(f"请求体: {json.dumps(payload, indent=2)}")

    try:
        resp = requests.post(video_endpoint, headers=headers, json=payload, timeout=30)
        print(f"  状态码: {resp.status_code}")
        print(f"  响应: {resp.text[:500]}")

        if resp.status_code == 200:
            try:
                submit_resp = resp.json()
                initial_task_id = submit_resp.get("id")
                if initial_task_id:
                    print(f"  ✓ 初始视频任务提交成功! ID: {initial_task_id}")
                    break
            except json.JSONDecodeError:
                continue
    except Exception as e:
        print(f"  异常: {e}")

if not initial_task_id:
    print("\n❌ 初始视频生成请求提交失败")
    exit(1)

# 轮询初始视频状态
print("\n--- 轮询初始视频生成状态 ---")

poll_urls_initial = [
    f"{BASE_URL}/v1/video/generations/{initial_task_id}",
    f"{BASE_URL}/v1/video/{initial_task_id}",
    f"{BASE_URL}/v1/videos/{initial_task_id}",
]

poll_url = None
round_count = 0
initial_status_data = None

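MAX_POLLS = 90  # assumed cap (about 15 minutes at 10-second intervals); tune as needed

while True:
    round_count += 1
    if round_count > MAX_POLLS:
        print(f"\n❌ Gave up after {MAX_POLLS} polls without a terminal status")
        exit(1)
    print(f"\n--- Poll #{round_count} ---")
    time.sleep(10)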

    if poll_url:
        resp = requests.get(poll_url, headers=headers)
        if resp.status_code == 200:
            try:
                initial_status_data = resp.json()
            except json.JSONDecodeError:
                print(f"响应非JSON: {resp.text[:300]}")
                continue
        else:
            print(f"轮询失败: {resp.text[:300]}")
            continue
    else:
        initial_status_data = None
        for url in poll_urls_initial:
            print(f"尝试轮询地址: {url}")
            resp = requests.get(url, headers=headers)
            print(f"  状态码: {resp.status_code}")

            if resp.status_code == 200:
                # Compare case-insensitively: HTML pages often start with "<!DOCTYPE html>"
                text = resp.text[:100].lstrip().lower()
                if text.startswith("<!doctype") or text.startswith("<html"):
                    continue
                try:
                    initial_status_data = resp.json()
                    poll_url = url
                    print(f"  ✓ 找到有效轮询端点: {url}")
                    break
                except json.JSONDecodeError:
                    continue

        if initial_status_data is None:
            print("所有轮询端点均失败,10秒后重试...")
            continue

    status = initial_status_data.get("status", "")
    print(f"当前状态: {status}")
    print(f"响应: {json.dumps(initial_status_data, indent=2)[:600]}")

    if status in ("completed", "succeeded", "success", "done"):
        print("\n✅ 初始视频生成完成!")
        break
    elif status in ("failed", "error", "cancelled"):
        print("\n❌ 初始视频生成失败:")
        print(json.dumps(initial_status_data, indent=2))
        exit(1)

# 下载初始视频
print("\n--- 下载初始视频 ---")


def extract_video_url(data):
    """Extract the video URL from a status response; return None if none is found."""
    for key in ["video_url", "url", "output", "result", "download_url", "video"]:
        val = data.get(key)
        if isinstance(val, str) and (val.startswith("http") or val.startswith("/")):
            return val
        if isinstance(val, dict):
            for subkey in ["url", "uri", "download_url"]:
                subval = val.get(subkey)
                if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                    return subval

    for key in ["data", "results", "videos", "generated_videos", "generatedVideos"]:
        items = data.get(key, [])
        if isinstance(items, list) and len(items) > 0:
            item = items[0]
            if isinstance(item, str) and item.startswith("http"):
                return item
            if isinstance(item, dict):
                for subkey in ["url", "uri", "video_url", "download_url", "video"]:
                    subval = item.get(subkey)
                    if isinstance(subval, str) and (subval.startswith("http") or subval.startswith("/")):
                        return subval
                    if isinstance(subval, dict):
                        for sskey in ["url", "uri"]:
                            ssval = subval.get(sskey)
                            if isinstance(ssval, str) and ssval.startswith("http"):
                                return ssval
    return None


initial_video_url = extract_video_url(initial_status_data)

if initial_video_url:
    if initial_video_url.startswith("/"):
        initial_video_url = f"{BASE_URL}{initial_video_url}"
    print(f"初始视频URL: {initial_video_url}")
    dl = requests.get(initial_video_url, headers=headers, stream=True, timeout=120)
    if dl.status_code == 200:
        chunks = []
        with open("step1_initial_video.mp4", "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
                chunks.append(chunk)
        # Join once at the end; repeated bytes += re-copies the buffer on every chunk
        initial_video_bytes = b"".join(chunks)
        print(f"✅ Initial video saved to step1_initial_video.mp4 ({len(initial_video_bytes)} bytes)")
    else:
        print(f"下载失败: {dl.status_code}")
        exit(1)
else:
    print("无法提取初始视频URL,完整响应:")
    print(json.dumps(initial_status_data, indent=2))
    exit(1)

# 同时提取视频的file信息(用于延展请求)
initial_video_file_id = None
initial_video_file_uri = None

# 从响应中寻找file相关信息
for key in ["data", "results", "videos", "generated_videos", "generatedVideos"]:
    items = initial_status_data.get(key, [])
    if isinstance(items, list) and len(items) > 0:
        item = items[0]
        if isinstance(item, dict):
            # 可能有 video.name, video.uri 等
            video_obj = item.get("video", {})
            if isinstance(video_obj, dict):
                initial_video_file_id = video_obj.get("name") or video_obj.get("id") or video_obj.get("file_id")
                initial_video_file_uri = video_obj.get("uri") or video_obj.get("url")
            initial_video_file_id = initial_video_file_id or item.get("file_id") or item.get("id") or item.get("name")

initial_video_b64 = base64.b64encode(initial_video_bytes).decode("utf-8")

# ============================================================
# Step 2: extend the initial video (Video Extension)
# ============================================================
print("\n" + "=" * 60)
print("Step 2: video extension (Video Extension)")
print("=" * 60)

extension_prompt = "Track the butterfly into the garden as it lands on an orange origami flower. A fluffy white puppy runs up and gently pats the flower."

# Try several request-body formats in turn
extension_payloads = [
    # Format 1: base64 data in the video field
    {
        "model": "veo-3.1-generate-preview",
        "prompt": extension_prompt,
        "video": {
            "data": initial_video_b64,
            "mime_type": "video/mp4"
        },
        "config": {
            "number_of_videos": 1,
            "resolution": "720p"
        }
    },
    # Format 2: URL in the video field
    {
        "model": "veo-3.1-generate-preview",
        "prompt": extension_prompt,
        "video": initial_video_url,
        "config": {
            "number_of_videos": 1,
            "resolution": "720p"
        }
    },
    # Format 3: video_url field
    {
        "model": "veo-3.1-generate-preview",
        "prompt": extension_prompt,
        "video_url": initial_video_url,
        "number_of_videos": 1,
        "resolution": "720p"
    },
    # Format 4: file reference (name + uri) if a file ID was found; otherwise falls back to the URL
    {
        "model": "veo-3.1-generate-preview",
        "prompt": extension_prompt,
        "video": {
            "name": initial_video_file_id,
            "uri": initial_video_file_uri
        } if initial_video_file_id else initial_video_url,
        "config": {
            "number_of_videos": 1,
            "resolution": "720p"
        }
    },
    # Format 5: source_video field
    {
        "model": "veo-3.1-generate-preview",
        "prompt": extension_prompt,
        "source_video": {
            "data": initial_video_b64,
            "mime_type": "video/mp4"
        },
        "number_of_videos": 1,
        "resolution": "720p"
    },
    # Format 6: input_video as a data URI
    {
        "model": "veo-3.1-generate-preview",
        "prompt": extension_prompt,
        "input_video": f"data:video/mp4;base64,{initial_video_b64}",
        "number_of_videos": 1,
        "resolution": "720p"
    },
]

ext_task_id = None

for i, payload in enumerate(extension_payloads):
    # 构造简短预览
    preview = {}
    for k, v in payload.items():
        if isinstance(v, dict) and "data" in v:
            preview[k] = {"data": f"[base64, {len(v['data'])} chars]", "mime_type": v.get("mime_type", "")}
        elif isinstance(v, str) and len(v) > 200:
            preview[k] = v[:80] + "...[truncated]"
        else:
            preview[k] = v
    print(f"\n尝试视频延展格式 {i+1}: {video_endpoint}")
    print(f"请求体预览: {json.dumps(preview, indent=2, default=str)}")

    try:
        resp = requests.post(video_endpoint, headers=headers, json=payload, timeout=60)
        print(f"  状态码: {resp.status_code}")
        print(f"  响应: {resp.text[:500]}")

        if resp.status_code == 200:
            try:
                ext_resp = resp.json()
                ext_task_id = ext_resp.get("id")
                if ext_task_id:
                    print(f"  ✓ 视频延展任务提交成功! ID: {ext_task_id}")
                    break
            except json.JSONDecodeError:
                continue
    except Exception as e:
        print(f"  异常: {e}")

if not ext_task_id:
    print("\n❌ 视频延展请求提交失败")
    exit(1)

# ============================================================
# Step 3: poll the video extension status
# ============================================================
print("\n" + "=" * 60)
print("Step 3: poll the video extension status")
print("=" * 60)

poll_urls_ext = [
    f"{BASE_URL}/v1/video/generations/{ext_task_id}",
    f"{BASE_URL}/v1/video/{ext_task_id}",
    f"{BASE_URL}/v1/videos/{ext_task_id}",
]

poll_url = None
round_count = 0
ext_status_data = None

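MAX_POLLS = 90  # assumed cap (about 15 minutes at 10-second intervals); tune as needed

while True:
    round_count += 1
    if round_count > MAX_POLLS:
        print(f"\n❌ Gave up after {MAX_POLLS} polls without a terminal status")
        exit(1)
    print(f"\n--- Poll #{round_count} ---")
    time.sleep(10)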

    if poll_url:
        print(f"轮询地址: {poll_url}")
        resp = requests.get(poll_url, headers=headers)
        print(f"状态码: {resp.status_code}")
        if resp.status_code == 200:
            try:
                ext_status_data = resp.json()
            except json.JSONDecodeError:
                print(f"响应非JSON: {resp.text[:300]}")
                continue
        else:
            print(f"轮询失败: {resp.text[:300]}")
            continue
    else:
        ext_status_data = None
        for url in poll_urls_ext:
            print(f"尝试轮询地址: {url}")
            resp = requests.get(url, headers=headers)
            print(f"  状态码: {resp.status_code}")

            if resp.status_code == 200:
                # Compare case-insensitively: HTML pages often start with "<!DOCTYPE html>"
                text = resp.text[:100].lstrip().lower()
                if text.startswith("<!doctype") or text.startswith("<html"):
                    print(f"  返回HTML页面,跳过")
                    continue
                try:
                    ext_status_data = resp.json()
                    poll_url = url
                    print(f"  ✓ 找到有效轮询端点: {url}")
                    break
                except json.JSONDecodeError:
                    print(f"  响应非JSON,跳过")
                    continue
            else:
                print(f"  失败: {resp.text[:200]}")

        if ext_status_data is None:
            print("所有轮询端点均失败,10秒后重试...")
            continue

    status = ext_status_data.get("status", "")
    print(f"当前状态: {status}")
    print(f"响应内容: {json.dumps(ext_status_data, indent=2)[:600]}")

    if status in ("completed", "succeeded", "success", "done"):
        print("\n✅ 视频延展完成!")
        break
    elif status in ("failed", "error", "cancelled"):
        print("\n❌ 视频延展失败:")
        print(json.dumps(ext_status_data, indent=2))
        exit(1)

# ============================================================
# Step 4: download the extended video
# ============================================================
print("\n" + "=" * 60)
print("Step 4: download the extended video")
print("=" * 60)

output_file = "veo3.1_extension.mp4"
video_url = extract_video_url(ext_status_data)

if video_url:
    if video_url.startswith("/"):
        video_url = f"{BASE_URL}{video_url}"

    print(f"下载地址: {video_url}")
    # Set a timeout so a stalled connection cannot hang the script indefinitely
    dl = requests.get(video_url, headers=headers, stream=True, timeout=120)

    if dl.status_code == 200:
        with open(output_file, "wb") as f:
            for chunk in dl.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"✅ Generated video saved to {output_file}")
    else:
        print(f"下载失败: {dl.status_code}")
        print(dl.text[:500])
else:
    print("无法从响应中提取视频下载地址,完整响应:")
    print(json.dumps(ext_status_data, indent=2))
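The request-body previews printed by the script shorten large base64 fields before logging. A recursive variant handles any nesting depth with one rule. This is a minimal sketch: the function name `redact_payload` and the 100-character threshold are assumptions for illustration, and unlike the script it replaces every long string rather than only `data` fields.

```python
import json
from typing import Any


def redact_payload(value: Any, max_chars: int = 100) -> Any:
    """Return a copy of value with every long string replaced by a short marker."""
    if isinstance(value, dict):
        return {k: redact_payload(v, max_chars) for k, v in value.items()}
    if isinstance(value, list):
        return [redact_payload(v, max_chars) for v in value]
    if isinstance(value, str) and len(value) > max_chars:
        return f"[{len(value)} chars, truncated]"
    return value


# Example with placeholder data standing in for real base64 content.
example = {
    "model": "veo-3.1-generate-preview",
    "video": {"data": "A" * 5000, "mime_type": "video/mp4"},
    "config": {"number_of_videos": 1, "resolution": "720p"},
}
print(json.dumps(redact_payload(example), indent=2))
```

The original payload is left unmodified, so the same dictionary can still be sent in the request after the preview is printed.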
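Limitations
Request latency: minimum 11 seconds; maximum 6 minutes (during peak hours).
Regional limitations: in the EU, UK, Switzerland, and the Middle East and North Africa, the allowed values for personGeneration are:
Veo 3: allow_adult only.
Veo 2: dont_allow and allow_adult. The default is dont_allow.
Video retention: generated videos are stored on the server for 2 days and are then removed. To keep a local copy, you must download the video within 2 days of generation. Extended videos are treated as newly generated videos.
Watermarking: videos created by Veo are watermarked with SynthID, Google's tool for watermarking and identifying AI-generated content. You can verify a video with the SynthID verification platform.
Safety: generated videos pass through safety filters and memorization checks that help reduce privacy, copyright, and bias risks.
Audio errors: Veo 3.1 sometimes blocks video generation because of safety filters or other processing problems with the audio.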
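The 2-day retention rule translates into a simple deadline calculation. The following is a minimal sketch that assumes you record, on the client side, the time a video was generated or last referenced for extension; the function names are hypothetical and the service does not necessarily expose this timestamp.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

RETENTION = timedelta(days=2)


def download_deadline(generated_or_referenced_at: datetime) -> datetime:
    """Return the moment after which the stored video may have been removed."""
    return generated_or_referenced_at + RETENTION


def is_still_available(generated_or_referenced_at: datetime,
                       now: Optional[datetime] = None) -> bool:
    """Return True while the video is still inside its 2-day retention window."""
    now = now or datetime.now(timezone.utc)
    return now < download_deadline(generated_or_referenced_at)
```

Because referencing a video for extension resets its timer, the recorded timestamp should be updated whenever an extension request is submitted.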
Last modified: 2026-03-12 09:36:47