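Sora 2 API Complete Documentation: Integration Tutorial and Code Examples
Sora 2 API Tutorial: Introduction
The Sora 2 API lets developers generate AI video programmatically, which opens the door to automated content production, workflow integration, and scalable video creation. This guide covers the full integration path, from environment setup to production deployment.
What You Will Learn
- Setting up API authentication
- Making your first API request
- Advanced generation parameters
- Managing video output
- Error handling and rate limits
- Production best practices
- Real-world integration examples
Prerequisites
- An OpenAI API account with Sora access
- Basic programming knowledge (Python or JavaScript)
- An understanding of REST APIs
- Familiarity with async/await patterns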
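Sora 2 API Tutorial: Authentication and Setup
API Access
The Sora 2 API is currently available to:
- OpenAI API users with approved access
- ChatGPT Pro subscribers (20 requests per day)
- Enterprise customers (custom quotas)
Pricing (as of October 2025):
- Per second of generated video: $0.10
- 5-second video: $0.50
- 20-second video: $2.00
- 60-second video: $6.00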
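The price list above is a flat per-second rate, so a budget check before submitting a job is simple arithmetic. The sketch below is illustrative only: `estimate_cost` is a hypothetical helper, and the assumption that generating several variants multiplies the cost linearly is not stated in the price list.

```python
from decimal import Decimal

# Per-second rate quoted in the price list above (as of October 2025).
PRICE_PER_SECOND = Decimal("0.10")


def estimate_cost(duration_seconds: int, variants: int = 1) -> Decimal:
    """Estimate the USD cost of `variants` videos of the given length.

    Assumption: each variant is billed like a separate video.
    """
    if duration_seconds <= 0 or variants <= 0:
        raise ValueError("duration_seconds and variants must be positive")
    return PRICE_PER_SECOND * duration_seconds * variants


# Reproduce the three example rows from the price list.
for seconds in (5, 20, 60):
    print(f"{seconds}s -> ${estimate_cost(seconds):.2f}")
```

`Decimal` is used instead of `float` so that sums of many small charges do not pick up binary rounding error.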
Authentication
Step 1: Get an API Key
- Go to platform.openai.com/api-keys
- Click "Create new secret key"
- Enter a name (for example, "Sora Integration")
- Copy the key and store it securely
Step 2: Set Up Your Environment
# Install the OpenAI SDK
npm install openai
# or
pip install openai
// JavaScript/Node.js
import OpenAI from 'openai';
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY
});
# Python
import os

from openai import OpenAI

client = OpenAI(
    api_key=os.environ.get("OPENAI_API_KEY")
)
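Sora 2 API Tutorial: First Video Generation Request
Your First Video Generation Request
JavaScript Example
async function generateVideo() {
  try {
    const video = await openai.videos.generate({
      model: "sora-2",
      prompt: "A golden retriever playing in a sunny garden",
      duration: 10,
      resolution: "1080p",
      aspect_ratio: "16:9"
    });
    // The job is asynchronous, so `url` is still null at this point
    console.log("Video job created:", video.id, video.status);
    return video;
  } catch (error) {
    console.error("Error:", error);
    throw error; // rethrow so callers never receive undefined
  }
}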
Python Example
def generate_video():
    try:
        video = client.videos.generate(
            model="sora-2",
            prompt="A golden retriever playing in a sunny garden",
            duration=10,
            resolution="1080p",
            aspect_ratio="16:9"
        )
        # The job is asynchronous, so `url` is still None at this point
        print(f"Video job created: {video.id} ({video.status})")
        return video
    except Exception as error:
        print(f"Error: {error}")
        raise  # re-raise so callers never receive None
Response Object
{
"id": "video_abc123",
"object": "video",
"created": 1704672000,
"model": "sora-2",
"status": "processing",
"prompt": "A golden retriever playing in a sunny garden",
"duration": 10,
"resolution": "1080p",
"aspect_ratio": "16:9",
"url": null,
"expires_at": 1704758400
}
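The `created` and `expires_at` fields in the response above are Unix timestamps in seconds, and `url` stays null while `status` is "processing". The sketch below shows one way to interpret such a response. `is_downloadable` is a hypothetical helper, and reading `expires_at` as the moment the asset stops being available is an assumption, since the response itself does not say what expires.

```python
import json
from datetime import datetime, timezone

# A trimmed copy of the sample response shown above.
raw = """{
  "id": "video_abc123", "status": "processing", "url": null,
  "created": 1704672000, "expires_at": 1704758400
}"""
video = json.loads(raw)


def to_utc(timestamp: int) -> datetime:
    """Convert a Unix timestamp in seconds to an aware UTC datetime."""
    return datetime.fromtimestamp(timestamp, tz=timezone.utc)


def is_downloadable(video: dict, now: datetime) -> bool:
    """True once the job is completed, has a URL, and has not expired.

    Assumption: `expires_at` marks when the asset stops being available.
    """
    return (
        video["status"] == "completed"
        and video["url"] is not None
        and now < to_utc(video["expires_at"])
    )


created = to_utc(video["created"])
expires = to_utc(video["expires_at"])
print(created.isoformat(), "->", expires.isoformat())
print(is_downloadable(video, now=created))
```

In the sample the two timestamps are exactly 24 hours apart, which suggests planning to download and store finished videos promptly rather than relying on the returned URL.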
Sora 2 API Advanced Parameters: Customizing Video Generation
Generation Options
const advancedVideo = await openai.videos.generate({
  // Required
  model: "sora-2",
  prompt: "Your detailed prompt here",
  // Duration (5-60 seconds)
  duration: 20,
  // Resolution
  resolution: "1080p", // options: "480p", "720p", "1080p"
  // Aspect ratio
  aspect_ratio: "16:9", // options: "16:9", "9:16", "1:1", "21:9"
  // Frame rate
  fps: 30, // 24 or 30
  // Optional: seed for reproducibility
  seed: 12345,
  // Optional: number of variations to generate
  n: 1, // 1-4
  // Optional: quality setting
  quality: "standard", // "standard" or "hd"
  // Optional: webhook for asynchronous notifications
  webhook_url: "https://yourapi.com/webhook",
  // Optional: custom metadata
  metadata: {
    project: "Marketing Campaign Q1",
    user_id: "user_123"
  }
});
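The comments in the options above list the accepted values for each parameter. Checking them locally before sending a request can save a round trip and a 400 error. The following is a minimal sketch: `validate_options` is a hypothetical helper, and the limits are copied from the comments in this guide rather than from an authoritative schema.

```python
# Allowed values as listed in the option comments above.
ALLOWED = {
    "resolution": {"480p", "720p", "1080p"},
    "aspect_ratio": {"16:9", "9:16", "1:1", "21:9"},
    "fps": {24, 30},
    "quality": {"standard", "hd"},
}


def validate_options(options: dict) -> list[str]:
    """Return a list of human-readable problems; an empty list means OK."""
    problems = []
    duration = options.get("duration")
    if duration is not None and not 5 <= duration <= 60:
        problems.append(f"duration must be 5-60 seconds, got {duration}")
    n = options.get("n")
    if n is not None and not 1 <= n <= 4:
        problems.append(f"n must be 1-4, got {n}")
    for key, allowed in ALLOWED.items():
        value = options.get(key)
        if value is not None and value not in allowed:
            choices = sorted(allowed, key=str)
            problems.append(f"{key} must be one of {choices}, got {value!r}")
    return problems


print(validate_options({"duration": 20, "resolution": "1080p", "fps": 30}))
print(validate_options({"duration": 90, "fps": 60}))
```

Returning every problem at once, rather than raising on the first, lets a caller show the user a complete list of fixes.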
Aspect Ratio Guide
const aspectRatios = {
"16:9": {
use_cases: ["YouTube", "Website", "Presentations"],
dimensions: "1920x1080"
},
"9:16": {
use_cases: ["TikTok", "Instagram Reels", "Stories"],
dimensions: "1080x1920"
},
"1:1": {
use_cases: ["Instagram Feed", "Social Media"],
dimensions: "1080x1080"
},
"21:9": {
use_cases: ["Cinematic", "Ultra-wide"],
dimensions: "2560x1080"
}
};
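Waiting for Generation to Complete (Polling)
Videos are generated asynchronously, so you need to poll until the job completes:
JavaScript Implementation
async function waitForVideo(videoId) {
const maxAttempts = 60; // up to 10 minutes
const pollInterval = 10000; // 10 seconds between polls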
for (let attempt = 0; attempt < maxAttempts; attempt++) {
const video = await openai.videos.retrieve(videoId);
if (video.status === "completed") {
return video;
} else if (video.status === "failed") {
throw new Error(`Video generation failed: ${video.error}`);
}
// Wait before the next poll
await new Promise(resolve => setTimeout(resolve, pollInterval));
}
throw new Error("Video generation timed out");
}
// Usage
const video = await generateVideo();
const completedVideo = await waitForVideo(video.id);
console.log("Video ready:", completedVideo.url);
Python Implementation
import time

def wait_for_video(video_id):
    max_attempts = 60   # up to 10 minutes
    poll_interval = 10  # seconds between polls
    for attempt in range(max_attempts):
        video = client.videos.retrieve(video_id)
        if video.status == "completed":
            return video
        elif video.status == "failed":
            raise RuntimeError(f"Video generation failed: {video.error}")
        time.sleep(poll_interval)
    raise TimeoutError("Video generation timed out")

# Usage
video = generate_video()
completed_video = wait_for_video(video.id)
print(f"Video ready: {completed_video.url}")
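Webhook Integration
For more efficient notifications, consider using webhooks instead of polling:
Setting Up a Webhook Endpoint
// Express.js example
import express from 'express';
import crypto from 'crypto';

const app = express();
// Keep the raw request bytes: the signature must be checked against the
// exact payload that was sent, not a re-serialized copy of the parsed body
app.use(express.json({
  verify: (req, res, buf) => { req.rawBody = buf; }
}));

app.post('/webhooks/sora', (req, res) => {
  // Verify the webhook signature
  const signature = req.headers['x-openai-signature'];
  if (!verifySignature(req.rawBody, signature)) {
    return res.status(401).send('Invalid signature');
  }
  // Handle the webhook
  const { id, status, url, error } = req.body;
  if (status === 'completed') {
    console.log(`Video ${id} completed: ${url}`);
    // Download and process the video
    downloadVideo(url, id).catch((err) => console.error(`Download of ${id} failed:`, err));
  } else if (status === 'failed') {
    console.error(`Video ${id} failed: ${error}`);
    // Handle the error
  }
  res.status(200).send('OK');
});

function verifySignature(payload, signature) {
  if (!payload || typeof signature !== 'string') return false;
  const secret = process.env.WEBHOOK_SECRET;
  const digest = crypto.createHmac('sha256', secret).update(payload).digest('hex');
  const expected = Buffer.from(digest);
  const received = Buffer.from(signature);
  // timingSafeEqual throws on a length mismatch, so compare lengths first
  return expected.length === received.length &&
    crypto.timingSafeEqual(expected, received);
}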
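For a Python backend, the same check can be sketched with the standard library alone. This assumes the scheme used in the Express example above, namely a hex-encoded HMAC-SHA256 of the raw request body keyed with a shared secret. That scheme and the header name are this guide's assumptions, so confirm them against the provider's webhook documentation before relying on them.

```python
import hashlib
import hmac


def verify_signature(raw_body: bytes, signature: str, secret: str) -> bool:
    """Check a hex-encoded HMAC-SHA256 signature in constant time."""
    expected = hmac.new(secret.encode(), raw_body, hashlib.sha256).hexdigest()
    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(expected.encode(), (signature or "").encode())


# Demo with a made-up secret and payload.
secret = "test-secret"
body = b'{"id":"video_abc123","status":"completed"}'
good_signature = hmac.new(secret.encode(), body, hashlib.sha256).hexdigest()

print(verify_signature(body, good_signature, secret))
print(verify_signature(body + b" ", good_signature, secret))
```

The function takes the raw bytes of the request rather than a parsed object, because re-serializing JSON can change whitespace or key order and invalidate an otherwise correct signature.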
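Downloading Videos
JavaScript Download Example
import fs from 'fs';
import https from 'https';

async function downloadVideo(url, filename) {
  const filepath = `./videos/${filename}.mp4`;
  fs.mkdirSync('./videos', { recursive: true });
  return new Promise((resolve, reject) => {
    // Remove any partial file, then report the failure
    const fail = (err) => fs.unlink(filepath, () => reject(err));
    https.get(url, (response) => {
      if (response.statusCode !== 200) {
        response.resume(); // discard the body
        return reject(new Error(`Download failed: HTTP ${response.statusCode}`));
      }
      const file = fs.createWriteStream(filepath);
      file.on('error', fail);
      file.on('finish', () => {
        file.close();
        console.log(`Downloaded: ${filename}.mp4`);
        resolve(filepath);
      });
      response.pipe(file);
    }).on('error', fail);
  });
}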
Python Download Example
import os

import requests

def download_video(url, filename):
    os.makedirs("./videos", exist_ok=True)
    filepath = f"./videos/{filename}.mp4"
    # Stream the body in chunks so large videos are not held in memory
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        with open(filepath, 'wb') as file:
            for chunk in response.iter_content(chunk_size=8192):
                file.write(chunk)
    print(f"Downloaded: {filename}.mp4")
    return filepath
Sora 2 API Error Handling: Retry Logic and Best Practices
Common Error Handling Patterns
async function robustGenerate(prompt, options = {}) {
  const request = () => openai.videos.generate({
    model: "sora-2",
    prompt,
    ...options
  });
  try {
    return await request();
  } catch (error) {
    switch (error.status) {
      case 400:
        console.error("Invalid request:", error.message);
        // The prompt may violate the content policy or be malformed
        break;
      case 401:
        console.error("Authentication failed");
        // Check the API key
        break;
      case 429:
        console.error("Rate limit exceeded");
        // Retry with exponential backoff
        return retryWithBackoff(request);
      case 500:
        console.error("Server error");
        // Retry after a delay
        return retryWithBackoff(request);
      default:
        console.error("Unknown error:", error);
    }
    throw error;
  }
}
Retry Logic with Exponential Backoff
async function retryWithBackoff(fn, maxRetries = 3) {
for (let i = 0; i < maxRetries; i++) {
try {
return await fn();
} catch (error) {
if (i === maxRetries - 1) throw error;
const delay = Math.pow(2, i) * 1000; // 1s, 2s, 4s
console.log(`Retry ${i + 1}/${maxRetries} after ${delay}ms`);
await new Promise(resolve => setTimeout(resolve, delay));
}
}
}
// Usage
const video = await retryWithBackoff(() =>
openai.videos.generate({
model: "sora-2",
prompt: "Your prompt here"
})
);
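A Python counterpart of the retry helper might look like the sketch below. It differs from the JavaScript version in two ways that are design choices rather than requirements: it retries only on status codes that are usually transient, and it adds random jitter so that many clients do not retry in lockstep. Reading the status from a `status_code` attribute on the exception is an assumption about the error type, and `RETRYABLE_STATUSES` is an illustrative set.

```python
import random
import time

# Status codes treated as transient in this sketch (an assumption).
RETRYABLE_STATUSES = {429, 500, 502, 503}


def retry_with_backoff(fn, max_retries=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(), retrying transient failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as error:
            status = getattr(error, "status_code", None)
            is_last_attempt = attempt == max_retries - 1
            if is_last_attempt or status not in RETRYABLE_STATUSES:
                raise
            delay = base_delay * 2 ** attempt  # 1s, 2s, 4s, ...
            # Add up to 50% jitter on top of the base delay.
            sleep(delay + random.uniform(0, delay / 2))


class FakeApiError(Exception):
    """Stand-in for an SDK error that carries an HTTP status code."""

    def __init__(self, status_code):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code
```

The `sleep` parameter exists so that tests can pass a recording function instead of actually waiting.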
Rate Limiting
Implementing a Simple Rate Limiter
class RateLimiter {
constructor(requestsPerMinute) {
this.requestsPerMinute = requestsPerMinute;
this.queue = [];
this.processing = false;
}
async add(fn) {
return new Promise((resolve, reject) => {
this.queue.push({ fn, resolve, reject });
this.process();
});
}
async process() {
if (this.processing || this.queue.length === 0) return;
this.processing = true;
const { fn, resolve, reject } = this.queue.shift();
try {
const result = await fn();
resolve(result);
} catch (error) {
reject(error);
}
// Wait according to the configured rate
const delay = 60000 / this.requestsPerMinute;
await new Promise(resolve => setTimeout(resolve, delay));
this.processing = false;
this.process();
}
}
// Usage
const limiter = new RateLimiter(20); // 20 requests per minute
for (const prompt of prompts) {
await limiter.add(() => openai.videos.generate({
model: "sora-2",
prompt
}));
}
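The same idea in Python can be sketched as a limiter that enforces a minimum interval between the start of consecutive calls. `IntervalRateLimiter` is a hypothetical name, the class is synchronous and not thread-safe, and the clock and sleep functions are injectable purely so the behavior can be tested without real waiting.

```python
import time


class IntervalRateLimiter:
    """Space calls at least 60 / requests_per_minute seconds apart."""

    def __init__(self, requests_per_minute, clock=time.monotonic, sleep=time.sleep):
        self.min_interval = 60.0 / requests_per_minute
        self._clock = clock
        self._sleep = sleep
        self._next_allowed = None  # earliest start time of the next call

    def call(self, fn):
        """Wait if needed, then run fn() and return its result."""
        now = self._clock()
        if self._next_allowed is not None and now < self._next_allowed:
            self._sleep(self._next_allowed - now)
            now = self._clock()
        self._next_allowed = now + self.min_interval
        return fn()


# Demo with a fake clock that only advances when we "sleep".
fake_time = [0.0]


def fake_sleep(seconds):
    fake_time[0] += seconds


limiter = IntervalRateLimiter(20, clock=lambda: fake_time[0], sleep=fake_sleep)
results = [limiter.call(lambda i=i: f"job-{i}") for i in range(3)]
print(results, "elapsed:", fake_time[0])
```

Unlike the queue-based JavaScript class, which pauses after each call finishes, this version measures from the start of each call, so slow calls do not add extra idle time on top of the interval.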
Production Best Practices
1. Configuration Management
// config.js
export const config = {
openai: {
apiKey: process.env.OPENAI_API_KEY,
model: "sora-2",
defaultDuration: 10,
defaultResolution: "1080p",
timeout: 600000 // 10 minutes
},
storage: {
provider: "s3", // or "gcs", "azure"
bucket: process.env.STORAGE_BUCKET,
region: process.env.STORAGE_REGION
},
webhooks: {
endpoint: process.env.WEBHOOK_URL,
secret: process.env.WEBHOOK_SECRET
},
rateLimit: {
requestsPerMinute: 20,
maxConcurrent: 5
}
};
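In a Python service, the equivalent configuration could be loaded once at startup and fail fast when a required value is missing. This is a sketch only: `SoraConfig` and `load_config` are hypothetical names, the defaults mirror the JavaScript config above, and the `SORA_REQUESTS_PER_MINUTE` variable is invented here for illustration.

```python
import os
from dataclasses import dataclass


@dataclass(frozen=True)
class SoraConfig:
    api_key: str
    model: str = "sora-2"
    default_duration: int = 10
    default_resolution: str = "1080p"
    timeout_seconds: int = 600  # 10 minutes
    requests_per_minute: int = 20
    max_concurrent: int = 5


def load_config(env=os.environ) -> SoraConfig:
    """Build the config from environment variables, failing fast if the key is missing."""
    api_key = env.get("OPENAI_API_KEY")
    if not api_key:
        raise RuntimeError("OPENAI_API_KEY is not set")
    return SoraConfig(
        api_key=api_key,
        # Hypothetical override; falls back to the default of 20.
        requests_per_minute=int(env.get("SORA_REQUESTS_PER_MINUTE", "20")),
    )
```

Making the dataclass frozen prevents code elsewhere in the application from mutating shared settings at runtime.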
2. Logging and Monitoring
import winston from 'winston';
const logger = winston.createLogger({
level: 'info',
format: winston.format.json(),
transports: [
new winston.transports.File({ filename: 'error.log', level: 'error' }),
new winston.transports.File({ filename: 'combined.log' })
]
});
async function generateWithLogging(prompt, metadata = {}) {
const startTime = Date.now();
logger.info('Video generation started', {
prompt: prompt.substring(0, 100),
...metadata
});
try {
const video = await openai.videos.generate({
model: "sora-2",
prompt,
metadata
});
const duration = Date.now() - startTime;
logger.info('Video generation request accepted', {
videoId: video.id,
duration: `${duration}ms`,
...metadata
});
return video;
} catch (error) {
logger.error('Video generation failed', {
error: error.message,
prompt: prompt.substring(0, 100),
...metadata
});
throw error;
}
}
3. Database Integration
// Using Prisma ORM
import { PrismaClient } from '@prisma/client';
const prisma = new PrismaClient();
async function createVideoRecord(userId, prompt, options) {
// Create the database record
const record = await prisma.video.create({
data: {
userId,
prompt,
status: 'pending',
duration: options.duration,
resolution: options.resolution,
aspectRatio: options.aspect_ratio
}
});
try {
// Generate the video
const video = await openai.videos.generate({
model: "sora-2",
prompt,
...options,
metadata: { recordId: record.id }
});
// Update the record with the video ID
await prisma.video.update({
where: { id: record.id },
data: {
videoId: video.id,
status: 'processing'
}
});
return record;
} catch (error) {
// Record the failed status and the error message
await prisma.video.update({
where: { id: record.id },
data: {
status: 'failed',
error: error.message
}
});
throw error;
}
}
Sora 2 API Code Examples: Real-World Integration Scenarios
Example 1: Batch Video Generator
class BatchVideoGenerator {
constructor(openai, options = {}) {
this.openai = openai;
this.maxConcurrent = options.maxConcurrent || 5;
this.onProgress = options.onProgress || (() => {});
this.onComplete = options.onComplete || (() => {});
}
async generateBatch(prompts) {
const results = [];
const chunks = this.chunkArray(prompts, this.maxConcurrent);
for (let i = 0; i < chunks.length; i++) {
const chunk = chunks[i];
const chunkResults = await Promise.all(
chunk.map(prompt => this.generateSingle(prompt))
);
results.push(...chunkResults);
this.onProgress({
completed: results.length,
total: prompts.length,
percentage: (results.length / prompts.length) * 100
});
}
this.onComplete(results);
return results;
}
async generateSingle(prompt) {
try {
const video = await this.openai.videos.generate({
model: "sora-2",
prompt
});
const completed = await this.waitForVideo(video.id);
return { success: true, prompt, video: completed };
} catch (error) {
return { success: false, prompt, error: error.message };
}
}
chunkArray(array, size) {
const chunks = [];
for (let i = 0; i < array.length; i += size) {
chunks.push(array.slice(i, i + size));
}
return chunks;
}
async waitForVideo(videoId) {
// Delegate to the standalone waitForVideo() helper defined earlier
return waitForVideo(videoId);
}
}
// Usage
const generator = new BatchVideoGenerator(openai, {
maxConcurrent: 3,
onProgress: (progress) => {
console.log(`Progress: ${progress.percentage.toFixed(1)}%`);
}
});
const prompts = [
"A cat playing with yarn",
"Sunset over mountains",
"Busy city street at night"
];
const results = await generator.generateBatch(prompts);
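For comparison, a Python version of the batch pattern can be sketched with a thread pool, which caps concurrency without manual chunking. `generate_batch` is a hypothetical helper, and `generate_one` stands for whatever function submits a prompt and waits for the finished video. It is passed in as a parameter so the sketch does not depend on any particular SDK call.

```python
from concurrent.futures import ThreadPoolExecutor


def generate_batch(prompts, generate_one, max_concurrent=5):
    """Run generate_one(prompt) for each prompt, at most max_concurrent at a time.

    Results come back in the same order as the prompts, and a failure for
    one prompt is captured instead of aborting the whole batch.
    """

    def safe(prompt):
        try:
            return {"success": True, "prompt": prompt, "video": generate_one(prompt)}
        except Exception as error:
            return {"success": False, "prompt": prompt, "error": str(error)}

    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        return list(pool.map(safe, prompts))


# Demo with a stand-in generator that fails for one prompt.
def fake_generate(prompt):
    if "fail" in prompt:
        raise ValueError("boom")
    return {"id": f"video_{len(prompt)}"}


for result in generate_batch(["a cat", "fail please", "sunset"], fake_generate, 2):
    print(result)
```

One difference from the chunked approach is that a pool starts the next prompt as soon as any worker is free, so a single slow video does not hold up the rest of its chunk.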
Example 2: Social Media Content Pipeline
class SocialMediaPipeline {
async createCampaign(content, platforms) {
const videos = {};
for (const platform of platforms) {
const config = this.getPlatformConfig(platform);
const prompt = this.optimizePromptForPlatform(content, platform);
const video = await openai.videos.generate({
model: "sora-2",
prompt,
aspect_ratio: config.aspectRatio,
duration: config.maxDuration
});
const completed = await waitForVideo(video.id);
const downloaded = await downloadVideo(completed.url, `${platform}_${Date.now()}`);
videos[platform] = {
file: downloaded,
url: completed.url,
platform,
config
};
}
return videos;
}
getPlatformConfig(platform) {
const configs = {
youtube: { aspectRatio: "16:9", maxDuration: 60 },
instagram: { aspectRatio: "1:1", maxDuration: 30 },
tiktok: { aspectRatio: "9:16", maxDuration: 60 },
twitter: { aspectRatio: "16:9", maxDuration: 30 }
};
return configs[platform];
}
optimizePromptForPlatform(content, platform) {
const styles = {
youtube: "cinematic, professional",
instagram: "aesthetic, trendy, high saturation",
tiktok: "energetic, fast-paced, engaging",
twitter: "attention-grabbing, clear message"
};
return `${content}, ${styles[platform]}, optimized for ${platform}`;
}
}
// Usage
const pipeline = new SocialMediaPipeline();
const videos = await pipeline.createCampaign(
"Product launch teaser for new smartphone",
["youtube", "instagram", "tiktok"]
);
Security Considerations
1. API Key Protection
// ❌ Never do this
const openai = new OpenAI({
apiKey: "sk-..." // hard-coded key
});
// ✅ Use environment variables
const openai = new OpenAI({
apiKey: process.env.OPENAI_API_KEY
});
// ✅ Use a secrets management service
import { SecretsManager } from '@aws-sdk/client-secrets-manager';
async function getApiKey() {
const client = new SecretsManager({ region: 'us-east-1' });
const response = await client.getSecretValue({
SecretId: 'openai-api-key'
});
return JSON.parse(response.SecretString).apiKey;
}
2. Content Filtering
async function safeGenerate(prompt) {
// Pre-validate the prompt
if (!isValidPrompt(prompt)) {
throw new Error("Invalid prompt content");
}
// Run content moderation
const moderation = await openai.moderations.create({
input: prompt
});
if (moderation.results[0].flagged) {
throw new Error("Prompt violates content policy");
}
// Generate the video
return await openai.videos.generate({
model: "sora-2",
prompt
});
}
function isValidPrompt(prompt) {
return prompt.length > 10 && prompt.length < 1000;
}
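Conclusion
This guide has walked through the main aspects of working with the Sora 2 API:
- ✅ Authentication and environment setup
- ✅ Basic and advanced generation parameters
- ✅ Asynchronous handling with polling and webhooks
- ✅ Error handling and retry logic
- ✅ Rate limiting and performance optimization
- ✅ Production best practices
- ✅ Real-world integration examples
- ✅ Security considerations
Next Steps
- Experiment with the API in a development environment
- Build a small proof-of-concept application
- Optimize for your specific use case
- Scale to production with thorough monitoring
- Stay up to date with API changes and new features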
Last updated: October 2025 | Author: Sora2Everything Team | Reading time: 18 minutes