Python SDK
使用Python SDK快速集成大模型API,支持同步异步调用、流式响应、错误重试等高级功能
Python 3.7+ · 最新版本: 1.0.0 · MIT License
安装
使用pip安装
终端
pip install openai

使用conda安装
终端
conda install -c conda-forge openai

提示:建议使用虚拟环境来管理项目依赖,避免版本冲突。
快速开始
初始化客户端
from openai import OpenAI

# Create a client instance
client = OpenAI(
    api_key="your-api-key",            # your API key
    base_url="https://api.n1n.ai/v1"   # API endpoint
)

# Alternatively, read the configuration from environment variables:
# export OPENAI_API_KEY="your-api-key"
# export OPENAI_BASE_URL="https://api.n1n.ai/v1"
# client = OpenAI()

基础对话示例
# Simple one-shot chat completion
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[
        {"role": "user", "content": "你好,请介绍一下Python"}
    ]
)
# Print the assistant's reply text
print(response.choices[0].message.content)

高级功能
流式响应
实时获取生成内容,提升用户体验
# Streaming response: receive the reply incrementally
stream = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "写一个长故事"}],
    stream=True  # enable streaming
)

# Process the response chunk by chunk; delta.content is None on
# control chunks (e.g. the final one), so guard before printing
for chunk in stream:
    if chunk.choices[0].delta.content is not None:
        print(chunk.choices[0].delta.content, end="")

异步调用
使用异步编程提高并发性能
import asyncio
from openai import AsyncOpenAI

# Asynchronous client (same configuration as the sync one)
async_client = AsyncOpenAI(
    api_key="your-api-key",
    base_url="https://api.n1n.ai/v1"
)

async def async_chat():
    """Send one chat request asynchronously and return the reply text."""
    response = await async_client.chat.completions.create(
        model="gpt-3.5-turbo",
        messages=[{"role": "user", "content": "异步请求示例"}]
    )
    return response.choices[0].message.content

# Drive the coroutine from synchronous code
result = asyncio.run(async_chat())
print(result)

函数调用
让模型调用自定义函数
import json

def get_weather(location, unit="celsius"):
    """Return dummy weather data for *location* as a JSON string.

    A real application would call an actual weather API here; this stub
    always reports 22 degrees and a sunny description.
    """
    return json.dumps({
        "location": location,
        "temperature": 22,
        "unit": unit,
        "description": "晴天"
    })
# JSON-schema description of the function, handed to the model
functions = [
    {
        "name": "get_weather",
        "description": "获取指定地点的天气",
        "parameters": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "城市名称,如:北京"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"]
                }
            },
            "required": ["location"]
        }
    }
]

# Ask the model; it may decide to call the declared function.
# NOTE(review): `functions`/`function_call` are deprecated in openai>=1.0
# in favour of `tools`/`tool_choice`; kept here to match the original API.
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "北京今天天气怎么样?"}],
    functions=functions,
    function_call="auto"
)

# If the model requested a function call, execute it locally
message = response.choices[0].message
if message.function_call:
    function_name = message.function_call.name
    function_args = json.loads(message.function_call.arguments)
    # Dispatch to the matching local implementation
    if function_name == "get_weather":
        result = get_weather(**function_args)
        print(f"函数执行结果:{result}")

对话上下文管理
维护多轮对话历史
class ChatSession:
    """Maintains multi-turn conversation history for chat completions."""

    def __init__(self, client, model="gpt-3.5-turbo"):
        self.client = client    # API client used for every request
        self.model = model      # model name sent with every request
        self.messages = []      # accumulated conversation history

    def add_message(self, role, content):
        """Append one message with the given role to the history."""
        self.messages.append({"role": role, "content": content})

    def chat(self, user_input):
        """Send *user_input* plus full history; return the assistant reply."""
        self.add_message("user", user_input)
        response = self.client.chat.completions.create(
            model=self.model,
            messages=self.messages
        )
        # Record the reply so the next turn keeps the full context
        assistant_message = response.choices[0].message.content
        self.add_message("assistant", assistant_message)
        return assistant_message

    def clear_history(self):
        """Drop all accumulated messages, starting a fresh conversation."""
        self.messages = []
# Use the session manager
session = ChatSession(client)
session.add_message("system", "你是一个有用的助手")

# Multi-turn conversation: each call sees the full history
print(session.chat("什么是机器学习?"))
print(session.chat("能给个例子吗?"))
print(session.chat("如何开始学习?"))

错误处理
异常处理示例
from openai import OpenAI, RateLimitError, APIError
import time

def chat_with_retry(client, messages, max_retries=3):
    """Chat completion with retries.

    Rate-limit errors back off exponentially (1s, 2s, 4s, ...); 5xx API
    errors retry after one second; everything else is re-raised at once.
    """
    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model="gpt-3.5-turbo",
                messages=messages
            )
            return response
        except RateLimitError:
            # Rate limited: wait with exponential backoff, then retry
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt
                print(f"速率限制,等待{wait_time}秒后重试...")
                time.sleep(wait_time)
            else:
                print("达到最大重试次数")
                raise
        except APIError as e:
            print(f"API错误: {e}")
            # NOTE(review): in openai>=1.0 `status_code` lives on
            # APIStatusError subclasses — confirm it exists on every
            # APIError reaching this branch.
            if e.status_code >= 500:
                # Server-side error: brief pause, then retry
                if attempt < max_retries - 1:
                    time.sleep(1)
                    continue
            raise
        except Exception as e:
            # Anything unexpected: surface it to the caller
            print(f"未预期的错误: {e}")
            raise
# Use the retrying helper
try:
    response = chat_with_retry(
        client,
        [{"role": "user", "content": "Hello!"}]
    )
    print(response.choices[0].message.content)
except Exception as e:
    print(f"请求失败: {e}")

常见错误类型
- RateLimitError — 请求频率超过限制
- APIError — API服务端错误
- AuthenticationError — 认证失败
- InvalidRequestError — 请求参数错误
最佳实践
API密钥管理
- 使用环境变量存储API密钥
- 不要将密钥硬编码在代码中
- 使用密钥管理服务(如AWS Secrets Manager)
性能优化
- 复用客户端实例
- 使用连接池
- 实施请求缓存策略
成本控制
- 设置max_tokens限制
- 监控token使用量
- 使用合适的模型
错误处理
- 实现重试机制
- 记录错误日志
- 优雅降级策略
实用工具函数
Token计算
import tiktoken

def count_tokens(messages, model="gpt-3.5-turbo"):
    """Estimate how many prompt tokens *messages* will consume.

    NOTE(review): the per-message overhead constants (3 and 1) match the
    gpt-3.5-turbo/gpt-4 chat format; other models may use different values.
    """
    encoding = tiktoken.encoding_for_model(model)
    tokens_per_message = 3  # framing tokens added around each message
    tokens_per_name = 1     # extra token when a "name" field is present
    num_tokens = 0
    for message in messages:
        num_tokens += tokens_per_message
        for key, value in message.items():
            num_tokens += len(encoding.encode(value))
            if key == "name":
                num_tokens += tokens_per_name
    num_tokens += 3  # every reply is primed with 3 tokens
    return num_tokens
# Example usage
messages = [
    {"role": "user", "content": "Hello, how are you?"}
]
token_count = count_tokens(messages)
print(f"Token数量: {token_count}")

响应日志记录
import logging
import json
from datetime import datetime

# Configure logging: write INFO-level records to api_calls.log
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    filename='api_calls.log'
)
def log_api_call(messages, response, model="gpt-3.5-turbo"):
    """Log one chat-completion call (request, reply, token usage).

    Returns the structured log entry that was written, so callers can
    inspect or persist it themselves.
    """
    log_entry = {
        "timestamp": datetime.now().isoformat(),
        "model": model,
        "messages": messages,
        "response": response.choices[0].message.content,
        "usage": {
            "prompt_tokens": response.usage.prompt_tokens,
            "completion_tokens": response.usage.completion_tokens,
            "total_tokens": response.usage.total_tokens
        }
    }
    # ensure_ascii=False keeps non-ASCII (e.g. Chinese) text readable in the log
    logging.info(json.dumps(log_entry, ensure_ascii=False))
    return log_entry
# Example usage: make a call, then record it
response = client.chat.completions.create(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "测试日志"}]
)
log_api_call([{"role": "user", "content": "测试日志"}], response)