性能优化最佳实践
系统化的性能优化策略,让您的AI应用快速响应、高效运行
高级优化更新时间:2024年12月重要
关键性能指标
< 1s
响应时间
首字节时间目标
99.9%
可用性
服务正常运行时间
1000
并发数
同时处理请求数
80%
缓存命中
缓存命中率目标
💡 提示:定期监控这些指标,及时发现和解决性能瓶颈。
核心优化策略
1. 请求优化
批处理请求
# 低效:多次单独请求
results = []
for item in items:
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": item}]
)
results.append(response)
# 高效:批处理
import asyncio
from openai import AsyncOpenAI
async_client = AsyncOpenAI()
async def batch_process(items):
tasks = [
async_client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": item}]
)
for item in items
]
return await asyncio.gather(*tasks)
results = asyncio.run(batch_process(items))请求去重
import hashlib
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_api_call(prompt_hash):
"""缓存相同请求的结果"""
return actual_api_call(prompt_hash)
def get_response(prompt):
# 生成prompt的哈希值
prompt_hash = hashlib.md5(prompt.encode()).hexdigest()
# 检查缓存
if prompt_hash in cache:
return cache[prompt_hash]
# 新请求
response = client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
cache[prompt_hash] = response
return response2. 流式响应优化
❌ 传统方式
- • 等待完整响应
- • 用户体验延迟
- • 感知速度慢
✅ 流式优化
- • 实时展示内容
- • 立即响应用户
- • 提升感知速度
// 前端流式处理
async function streamChat(message) {
const response = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ message, stream: true })
});
const reader = response.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { done, value } = await reader.read();
if (done) break;
const chunk = decoder.decode(value);
// 实时更新UI
appendToChat(chunk);
}
}3. 智能缓存策略
多级缓存架构
L1内存缓存(Redis)- 毫秒级响应
L2分布式缓存(Memcached)- 秒级响应
L3持久化存储(数据库)- 长期存储
import redis
import json
from datetime import timedelta
class SmartCache:
def __init__(self):
self.redis_client = redis.Redis(
host='localhost',
port=6379,
decode_responses=True
)
def get_or_fetch(self, key, fetch_func, ttl=3600):
"""智能缓存:优先从缓存获取,否则执行函数并缓存结果"""
# 尝试从缓存获取
cached = self.redis_client.get(key)
if cached:
return json.loads(cached)
# 执行实际请求
result = fetch_func()
# 缓存结果
self.redis_client.setex(
key,
timedelta(seconds=ttl),
json.dumps(result)
)
return result
# 使用示例
cache = SmartCache()
def get_ai_response(prompt):
cache_key = f"ai:response:{hashlib.md5(prompt.encode()).hexdigest()}"
def fetch():
return client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
).choices[0].message.content
return cache.get_or_fetch(cache_key, fetch, ttl=7200)4. 连接池优化
import httpx
from typing import Optional
class APIConnectionPool:
_instance: Optional['APIConnectionPool'] = None
_client: Optional[httpx.AsyncClient] = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
async def get_client(self) -> httpx.AsyncClient:
if self._client is None:
self._client = httpx.AsyncClient(
limits=httpx.Limits(
max_keepalive_connections=20,
max_connections=100,
keepalive_expiry=30
),
timeout=httpx.Timeout(30.0, connect=5.0),
http2=True # 启用HTTP/2
)
return self._client
async def close(self):
if self._client:
await self._client.aclose()
self._client = None
# 使用连接池
pool = APIConnectionPool()
client = await pool.get_client()连接复用
减少连接建立开销
HTTP/2支持
多路复用提升效率
自动重试
提高请求成功率
性能监控与调试
性能监控代码
import time
import logging
from contextlib import contextmanager
from typing import Dict, Any
import statistics
class PerformanceMonitor:
def __init__(self):
self.metrics = {
'response_times': [],
'token_usage': [],
'error_count': 0,
'success_count': 0
}
@contextmanager
def measure_time(self, operation: str):
"""测量操作执行时间"""
start = time.time()
try:
yield
self.metrics['success_count'] += 1
except Exception as e:
self.metrics['error_count'] += 1
logging.error(f"Error in {operation}: {e}")
raise
finally:
duration = time.time() - start
self.metrics['response_times'].append(duration)
logging.info(f"{operation} took {duration:.2f}s")
def add_token_usage(self, tokens: int):
"""记录Token使用量"""
self.metrics['token_usage'].append(tokens)
def get_stats(self) -> Dict[str, Any]:
"""获取性能统计"""
if self.metrics['response_times']:
return {
'avg_response_time': statistics.mean(self.metrics['response_times']),
'p95_response_time': statistics.quantiles(
self.metrics['response_times'], n=20
)[18] if len(self.metrics['response_times']) > 20 else None,
'total_tokens': sum(self.metrics['token_usage']),
'error_rate': self.metrics['error_count'] /
(self.metrics['success_count'] + self.metrics['error_count'])
if (self.metrics['success_count'] + self.metrics['error_count']) > 0 else 0,
'total_requests': self.metrics['success_count'] + self.metrics['error_count']
}
return {}
# 使用示例
monitor = PerformanceMonitor()
async def monitored_api_call(prompt):
with monitor.measure_time("API Call"):
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=[{"role": "user", "content": prompt}]
)
monitor.add_token_usage(response.usage.total_tokens)
return response
# 定期输出统计
print(monitor.get_stats())性能优化清单
✅ 请求优化
- 使用批处理减少请求次数
- 实现请求去重机制
- 优化prompt长度
- 设置合理的max_tokens
✅ 系统优化
- 使用连接池管理
- 实施多级缓存策略
- 启用HTTP/2协议
- 配置负载均衡
✅ 监控指标
- 追踪响应时间分布
- 监控Token使用量
- 记录错误率和重试
- 分析缓存命中率
✅ 用户体验
- 使用流式响应
- 实现进度指示器
- 优雅的错误处理
- 提供取消操作
优化效果对比
| 优化项 | 优化前 | 优化后 | 提升 |
|---|---|---|---|
| 平均响应时间 | 3.2s | 0.8s | 75%↑ |
| 并发处理能力 | 100/s | 1000/s | 10x |
| 缓存命中率 | 20% | 80% | 60%↑ |
| API成本 | $1000/月 | $400/月 | 60%↓ |
| 错误率 | 5% | 0.1% | 98%↓ |
🚀 立即开始优化
性能优化是一个持续的过程。从最影响用户体验的部分开始,逐步实施各项优化策略。
第一步
实施流式响应和基础缓存
第二步
优化请求批处理和连接池
第三步
建立监控体系,持续优化