Python 完整示例
使用 OpenAI SDK
Copy
pip install openai
文生视频完整示例
Copy
from openai import OpenAI
import time
class Veo31Client:
def __init__(self, api_key):
self.client = OpenAI(
api_key=api_key,
base_url="https://api.laozhang.ai/v1"
)
def generate_video_from_text(self, prompt, model="veo-3.1", n=1):
"""文生视频"""
try:
response = self.client.chat.completions.create(
model=model,
messages=[{
"role": "user",
"content": [
{
"type": "text",
"text": prompt
}
]
}],
stream=True,
n=n
)
print(f"开始生成视频... (模型: {model})")
for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(f"收到数据: {content}")
print("视频生成完成!")
except Exception as e:
print(f"错误: {e}")
def generate_video_from_images(self, prompt, image_urls, model="veo-3.1-fl"):
"""图生视频"""
try:
content = [
{
"type": "text",
"text": prompt
}
]
{-# JSX注释: 添加图片 #-}
for url in image_urls:
content.append({
"type": "image_url",
"image_url": {
"url": url
}
})
response = self.client.chat.completions.create(
model=model,
messages=[{
"role": "user",
"content": content
}],
stream=True
)
print(f"开始生成视频... (模型: {model}, 图片数: {len(image_urls)})")
for chunk in response:
if chunk.choices[0].delta.content:
content = chunk.choices[0].delta.content
print(f"收到数据: {content}")
print("视频生成完成!")
except Exception as e:
print(f"错误: {e}")
{-# JSX注释: 使用示例 #-}
if __name__ == "__main__":
client = Veo31Client("sk-YOUR_API_KEY")
{-# JSX注释: 示例1: 文生视频 #-}
print("=== 示例1: 文生视频 ===")
client.generate_video_from_text(
prompt="生成一只可爱的小猫在草地上玩耍的视频",
model="veo-3.1",
n=2
)
{-# JSX注释: 示例2: 图生视频 #-}
print("\n=== 示例2: 图生视频 ===")
client.generate_video_from_images(
prompt="根据两张图片生成平滑的过渡视频",
image_urls=[
"https://example.com/start.jpg",
"https://example.com/end.jpg"
],
model="veo-3.1-fl"
)
使用 Base64 图片
Copy
import base64
from openai import OpenAI
def encode_image_to_base64(image_path):
"""将本地图片编码为 base64"""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
client = OpenAI(
api_key="sk-YOUR_API_KEY",
base_url="https://api.laozhang.ai/v1"
)
{-# JSX注释: 读取本地图片 #-}
image1_base64 = encode_image_to_base64("./images/start.jpg")
image2_base64 = encode_image_to_base64("./images/end.jpg")
response = client.chat.completions.create(
model="veo-3.1-fl",
messages=[{
"role": "user",
"content": [
{
"type": "text",
"text": "根据这两张图片生成过渡动画"
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image1_base64}"
}
},
{
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{image2_base64}"
}
}
]
}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end='', flush=True)
Node.js 完整示例
安装依赖
Copy
npm install openai
TypeScript 完整实现
Copy
import OpenAI from 'openai';
import * as fs from 'fs';
class Veo31Client {
private client: OpenAI;
constructor(apiKey: string) {
this.client = new OpenAI({
apiKey: apiKey,
baseURL: 'https://api.laozhang.ai/v1'
});
}
async generateVideoFromText(
prompt: string,
model: string = 'veo-3.1',
n: number = 1
): Promise<void> {
try {
const stream = await this.client.chat.completions.create({
model: model,
messages: [{
role: 'user',
content: [
{
type: 'text',
text: prompt
}
]
}],
stream: true,
n: n
});
console.log(`开始生成视频... (模型: ${model})`);
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
console.log(`收到数据: ${content}`);
}
}
console.log('视频生成完成!');
} catch (error) {
console.error('错误:', error);
}
}
async generateVideoFromImages(
prompt: string,
imageUrls: string[],
model: string = 'veo-3.1-fl'
): Promise<void> {
try {
const content: any[] = [
{
type: 'text',
text: prompt
}
];
// 添加图片
for (const url of imageUrls) {
content.push({
type: 'image_url',
image_url: {
url: url
}
});
}
const stream = await this.client.chat.completions.create({
model: model,
messages: [{
role: 'user',
content: content
}],
stream: true
});
console.log(`开始生成视频... (模型: ${model}, 图片数: ${imageUrls.length})`);
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
console.log(`收到数据: ${content}`);
}
}
console.log('视频生成完成!');
} catch (error) {
console.error('错误:', error);
}
}
encodeImageToBase64(imagePath: string): string {
const imageBuffer = fs.readFileSync(imagePath);
return imageBuffer.toString('base64');
}
}
// 使用示例
async function main() {
const client = new Veo31Client('sk-YOUR_API_KEY');
// 示例1: 文生视频
console.log('=== 示例1: 文生视频 ===');
await client.generateVideoFromText(
'生成一个日落时分海边的浪漫场景',
'veo-3.1',
2
);
// 示例2: 图生视频
console.log('\n=== 示例2: 图生视频 ===');
await client.generateVideoFromImages(
'根据这两张图片生成平滑的过渡动画',
[
'https://example.com/image1.jpg',
'https://example.com/image2.jpg'
],
'veo-3.1-fl'
);
}
main().catch(console.error);
JavaScript 简化版本
Copy
const OpenAI = require('openai');
const client = new OpenAI({
apiKey: 'sk-YOUR_API_KEY',
baseURL: 'https://api.laozhang.ai/v1'
});
async function generateVideo() {
const stream = await client.chat.completions.create({
model: 'veo-3.1-fast',
messages: [{
role: 'user',
content: [
{
type: 'text',
text: '生成一只猫咪在雨中散步的视频'
}
]
}],
stream: true,
n: 1
});
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content;
if (content) {
process.stdout.write(content);
}
}
}
generateVideo().catch(console.error);
cURL 示例
文生视频
Copy
curl --location --request POST 'https://api.laozhang.ai/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-YOUR_API_KEY' \
--data-raw '{
"messages": [{
"role": "user",
"content": [
{
"type": "text",
"text": "生成两只猫和一只狗打架的视频"
}
]
}],
"model": "veo-3.1",
"stream": true,
"n": 2
}'
图生视频(URL)
Copy
curl --location --request POST 'https://api.laozhang.ai/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-YOUR_API_KEY' \
--data-raw '{
"messages": [{
"role": "user",
"content": [
{
"type": "text",
"text": "根据两张图片生成一个完整的过渡视频"
},
{
"type": "image_url",
"image_url": {
"url": "https://example.com/start-frame.jpg"
}
},
{
"type": "image_url",
"image_url": {
"url": "https://example.com/end-frame.jpg"
}
}
]
}],
"model": "veo-3.1-fl",
"stream": true,
"n": 1
}'
图生视频(Base64)
Copy
curl --location --request POST 'https://api.laozhang.ai/v1/chat/completions' \
--header 'Content-Type: application/json' \
--header 'Authorization: Bearer sk-YOUR_API_KEY' \
--data-raw '{
"messages": [{
"role": "user",
"content": [
{
"type": "text",
"text": "根据图片生成动画"
},
{
"type": "image_url",
"image_url": {
"url": "..."
}
}
]
}],
"model": "veo-3.1-landscape",
"stream": true
}'
Go 示例
Copy
package main
import (
"context"
"fmt"
"io"
"os"
"github.com/sashabaranov/go-openai"
)
func main() {
config := openai.DefaultConfig("sk-YOUR_API_KEY")
config.BaseURL = "https://api.laozhang.ai/v1"
client := openai.NewClientWithConfig(config)
req := openai.ChatCompletionRequest{
Model: "veo-3.1",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
MultiContent: []openai.ChatMessagePart{
{
Type: openai.ChatMessagePartTypeText,
Text: "生成一只可爱的小猫在玩毛线球的视频",
},
},
},
},
Stream: true,
N: 1,
}
stream, err := client.CreateChatCompletionStream(context.Background(), req)
if err != nil {
fmt.Printf("Stream error: %v\n", err)
return
}
defer stream.Close()
fmt.Println("开始生成视频...")
for {
response, err := stream.Recv()
if err == io.EOF {
fmt.Println("\n视频生成完成!")
break
}
if err != nil {
fmt.Printf("Stream error: %v\n", err)
return
}
if len(response.Choices) > 0 {
content := response.Choices[0].Delta.Content
if content != "" {
fmt.Print(content)
}
}
}
}
图生视频 Go 示例
Copy
package main
import (
"context"
"encoding/base64"
"fmt"
"io"
"os"
"github.com/sashabaranov/go-openai"
)
func encodeImageToBase64(imagePath string) (string, error) {
imageData, err := os.ReadFile(imagePath)
if err != nil {
return "", err
}
return base64.StdEncoding.EncodeToString(imageData), nil
}
func main() {
config := openai.DefaultConfig("sk-YOUR_API_KEY")
config.BaseURL = "https://api.laozhang.ai/v1"
client := openai.NewClientWithConfig(config)
// 编码图片
image1Base64, _ := encodeImageToBase64("./start.jpg")
image2Base64, _ := encodeImageToBase64("./end.jpg")
req := openai.ChatCompletionRequest{
Model: "veo-3.1-fl",
Messages: []openai.ChatCompletionMessage{
{
Role: openai.ChatMessageRoleUser,
MultiContent: []openai.ChatMessagePart{
{
Type: openai.ChatMessagePartTypeText,
Text: "根据这两张图片生成过渡视频",
},
{
Type: openai.ChatMessagePartTypeImageURL,
ImageURL: &openai.ChatMessageImageURL{
URL: fmt.Sprintf("data:image/jpeg;base64,%s", image1Base64),
},
},
{
Type: openai.ChatMessagePartTypeImageURL,
ImageURL: &openai.ChatMessageImageURL{
URL: fmt.Sprintf("data:image/jpeg;base64,%s", image2Base64),
},
},
},
},
},
Stream: true,
}
stream, err := client.CreateChatCompletionStream(context.Background(), req)
if err != nil {
fmt.Printf("错误: %v\n", err)
return
}
defer stream.Close()
fmt.Println("开始生成视频...")
for {
response, err := stream.Recv()
if err == io.EOF {
fmt.Println("\n完成!")
break
}
if err != nil {
fmt.Printf("错误: %v\n", err)
return
}
if len(response.Choices) > 0 {
fmt.Print(response.Choices[0].Delta.Content)
}
}
}
Java 示例
使用 OkHttp
Copy
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import okhttp3.*;
import okio.BufferedSource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class Veo31Client {
private final OkHttpClient client;
private final String apiKey;
private final Gson gson;
private static final String BASE_URL = "https://api.laozhang.ai/v1";
public Veo31Client(String apiKey) {
this.apiKey = apiKey;
this.client = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(60, TimeUnit.SECONDS)
.build();
this.gson = new Gson();
}
public void generateVideoFromText(String prompt, String model, int n) throws IOException {
Map<String, Object> content = new HashMap<>();
content.put("type", "text");
content.put("text", prompt);
List<Map<String, Object>> contents = new ArrayList<>();
contents.add(content);
Map<String, Object> message = new HashMap<>();
message.put("role", "user");
message.put("content", contents);
List<Map<String, Object>> messages = new ArrayList<>();
messages.add(message);
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("model", model);
requestBody.put("messages", messages);
requestBody.put("stream", true);
requestBody.put("n", n);
String json = gson.toJson(requestBody);
Request request = new Request.Builder()
.url(BASE_URL + "/chat/completions")
.post(RequestBody.create(json, MediaType.parse("application/json")))
.addHeader("Authorization", "Bearer " + apiKey)
.addHeader("Content-Type", "application/json")
.build();
try (Response response = client.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("请求失败: " + response);
}
System.out.println("开始生成视频...");
BufferedSource source = response.body().source();
while (!source.exhausted()) {
String line = source.readUtf8Line();
if (line != null && line.startsWith("data: ")) {
String data = line.substring(6);
if (!data.equals("[DONE]")) {
System.out.print(data);
}
}
}
System.out.println("\n视频生成完成!");
}
}
public static void main(String[] args) throws IOException {
Veo31Client client = new Veo31Client("sk-YOUR_API_KEY");
// 生成视频
client.generateVideoFromText(
"生成一只可爱的熊猫在竹林里玩耍的视频",
"veo-3.1",
1
);
}
}
响应格式
流式响应示例
Copy
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"veo-3.1","choices":[{"index":0,"delta":{"content":"视频生成中..."},"finish_reason":null}]}
data: {"id":"chatcmpl-xxx","object":"chat.completion.chunk","created":1234567890,"model":"veo-3.1","choices":[{"index":0,"delta":{"content":"视频URL: https://..."},"finish_reason":"stop"}]}
data: [DONE]
高级用法
批量生成多个结果
Copy
from openai import OpenAI
client = OpenAI(
api_key="sk-YOUR_API_KEY",
base_url="https://api.laozhang.ai/v1"
)
response = client.chat.completions.create(
model="veo-3.1-fast",
messages=[{
"role": "user",
"content": [{"type": "text", "text": "生成一个日落场景"}]
}],
stream=True,
n=4 # 同时生成4个不同的视频
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content)
使用不同模型对比
Copy
models = [
"veo-3.1",
"veo-3.1-fast",
"veo-3.1-fl",
"veo-3.1-landscape"
]
prompt = "生成一个美丽的山景视频"
for model in models:
print(f"\n=== 测试模型: {model} ===")
response = client.chat.completions.create(
model=model,
messages=[{
"role": "user",
"content": [{"type": "text", "text": prompt}]
}],
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
print(chunk.choices[0].delta.content, end='')