Best Practices for Concurrent API Calls
Master high-concurrency API calling techniques to improve processing efficiency, optimize cost, and keep your system stable.
- Concurrency control: cap the number of simultaneous requests
- Rate limiting: respect the provider's rate limits
- Error retry: recover from failures intelligently
- Performance monitoring: track metrics in real time
1. Python Async Concurrency
asyncio implementation
```python
import asyncio
import aiohttp
from typing import List
import time


class ConcurrentAPIClient:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://api.n1n.ai/v1"
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def make_request(self, prompt: str):
        async with self.semaphore:  # cap the number of in-flight requests
            async with aiohttp.ClientSession() as session:
                headers = {
                    "Authorization": f"Bearer {self.api_key}",
                    "Content-Type": "application/json"
                }
                payload = {
                    "model": "gpt-4o-mini",
                    "messages": [{"role": "user", "content": prompt}],
                    "max_tokens": 100
                }
                async with session.post(
                    f"{self.base_url}/chat/completions",
                    headers=headers,
                    json=payload
                ) as response:
                    if response.status == 200:
                        result = await response.json()
                        return result['choices'][0]['message']['content']
                    else:
                        raise Exception(f"API error: {response.status}")

    async def batch_process(self, prompts: List[str]):
        tasks = [self.make_request(p) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)


# Usage example
async def main():
    client = ConcurrentAPIClient("your-api-key", max_concurrent=10)
    prompts = [f"Translate: Hello {i}" for i in range(50)]
    start = time.time()
    results = await client.batch_process(prompts)
    print(f"Processed {len(prompts)} requests in {time.time() - start:.2f}s")

asyncio.run(main())
```
Core Features
- Semaphore-based concurrency control
- Asynchronous, non-blocking I/O
- Batch processing
- Error isolation via return_exceptions (see the sketch after these lists)
Performance Gains
- Roughly 10x faster than sequential requests
- Efficient CPU utilization while waiting on I/O
- Low memory footprint
- Shorter end-to-end completion time
Typical Use Cases
- Batch translation
- Data processing
- Content generation
- API testing
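Because `batch_process` calls `asyncio.gather` with `return_exceptions=True`, a failed request comes back as an exception object in the same position as its prompt rather than aborting the whole batch. A minimal sketch of splitting the results, building on the `main()` example above (variable names are illustrative):

```python
# Split gather() results into successes and failures.
# `results` comes from client.batch_process(prompts) above.
successes, failures = [], []
for prompt, result in zip(prompts, results):
    if isinstance(result, Exception):
        failures.append((prompt, result))  # keep the prompt so it can be retried later
    else:
        successes.append(result)

print(f"{len(successes)} succeeded, {len(failures)} failed")
```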
2. Node.js Concurrency Control
Promise-based concurrency
```javascript
const axios = require('axios');
const pLimit = require('p-limit'); // note: p-limit v4+ is ESM-only; use v3 with require()

class ConcurrentAPIClient {
  constructor(apiKey, maxConcurrent = 10) {
    this.apiKey = apiKey;
    this.baseURL = 'https://api.n1n.ai/v1';
    this.limit = pLimit(maxConcurrent);
  }

  async makeRequest(prompt) {
    return this.limit(async () => {
      const response = await axios.post(
        `${this.baseURL}/chat/completions`,
        {
          model: 'gpt-4o-mini',
          messages: [{ role: 'user', content: prompt }],
          max_tokens: 100
        },
        {
          headers: {
            'Authorization': `Bearer ${this.apiKey}`,
            'Content-Type': 'application/json'
          }
        }
      );
      return response.data.choices[0].message.content;
    });
  }

  async batchProcess(prompts) {
    const start = Date.now();
    const promises = prompts.map(p => this.makeRequest(p));
    const results = await Promise.all(promises);
    console.log(`Processed ${prompts.length} requests in ${(Date.now() - start) / 1000}s`);
    return results;
  }
}

// Usage example
async function main() {
  const client = new ConcurrentAPIClient('your-api-key', 10);
  const prompts = Array.from({ length: 50 }, (_, i) => `Translate: Hello ${i}`);
  await client.batchProcess(prompts);
}

main().catch(console.error);
```
💡 Best Practices
- Use p-limit to cap concurrency
- Batch requests with Promise.all
- Set sensible request timeouts (see the sketch after this list)
- Implement a retry mechanism for failures (covered in section 4)
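Neither example above sets a request timeout, so a single hung connection can stall an entire batch. In the axios call this is the `timeout` option (in milliseconds); on the Python side, a minimal sketch uses aiohttp's `ClientTimeout` (the limit values shown are illustrative):

```python
import aiohttp

# Total wall-clock limit per request, plus a separate connect limit
timeout = aiohttp.ClientTimeout(total=30, connect=5)

async def post_with_timeout(url: str, headers: dict, payload: dict):
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.post(url, headers=headers, json=payload) as response:
            return await response.json()
```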
3. Smart Rate Limiting
Rate limiter implementation
```python
import asyncio
import time


class RateLimiter:
    """Sliding-window rate limiter for RPM and TPM quotas"""

    def __init__(self):
        self.limits = {
            "gpt-4o": {"rpm": 500, "tpm": 30000},
            "gpt-4o-mini": {"rpm": 5000, "tpm": 200000}
        }
        self.request_times = []
        self.token_counts = []

    async def wait_if_needed(self, model: str, tokens: int):
        """Sleep just long enough to stay within the rate limits"""
        current_time = time.time()

        # Drop records older than the 60-second window
        self.request_times = [t for t in self.request_times if t > current_time - 60]
        self.token_counts = [(t, c) for t, c in self.token_counts if t > current_time - 60]

        # Check requests per minute (RPM)
        if len(self.request_times) >= self.limits[model]["rpm"]:
            wait_time = 60 - (current_time - self.request_times[0])
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        # Check tokens per minute (TPM)
        total_tokens = sum(c for _, c in self.token_counts) + tokens
        if total_tokens > self.limits[model]["tpm"]:
            await asyncio.sleep(5)  # naive wait for the token window to slide

        # Record this request (re-read the clock in case we slept above)
        current_time = time.time()
        self.request_times.append(current_time)
        self.token_counts.append((current_time, tokens))
```
API Limits
| Model | RPM | TPM |
|---|---|---|
| GPT-4o | 500 | 30K |
| GPT-4o mini | 5000 | 200K |
Rate-Limiting Strategies
- Sliding-window algorithm
- Token estimation (see the sketch after this list)
- Adaptive backoff
- Priority queues
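`wait_if_needed` needs a token count before the request is sent. Exact counts require a tokenizer such as tiktoken, but for throttling purposes a rough character-based estimate is usually enough. The 4-characters-per-token ratio and the `estimate_tokens` helper below are illustrative assumptions, not part of any SDK:

```python
def estimate_tokens(prompt: str, max_tokens: int = 100) -> int:
    """Rough estimate: ~4 characters per token, plus the completion budget."""
    return len(prompt) // 4 + max_tokens

limiter = RateLimiter()

async def throttled_request(client, prompt: str):
    # Block until the sliding window has room, then fire the request
    await limiter.wait_if_needed("gpt-4o-mini", estimate_tokens(prompt))
    return await client.make_request(prompt)
```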
4. Error Handling and Retries
Exponential backoff retry
```python
import asyncio
import aiohttp
import backoff


class RobustAPIClient:
    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def make_request_with_retry(self, prompt: str):
        """Request with exponential-backoff retry"""
        try:
            # _make_request is the plain request helper (as in section 1)
            response = await self._make_request(prompt)
            return {"success": True, "data": response}
        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # rate limited
                retry_after = int(e.headers.get('Retry-After', 5))
                await asyncio.sleep(retry_after)
                raise  # re-raise so the decorator retries
            elif e.status >= 500:  # server error
                raise  # re-raise so the decorator retries
            else:
                return {"success": False, "error": str(e)}
```
5. Performance Monitoring
Monitoring metrics
```python
class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful": 0,
            "failed": 0,
            "total_tokens": 0,
            "response_times": []
        }

    def record_success(self, response_time: float, tokens: int):
        self.metrics["total_requests"] += 1
        self.metrics["successful"] += 1
        self.metrics["total_tokens"] += tokens
        self.metrics["response_times"].append(response_time)

    def get_stats(self):
        if not self.metrics["response_times"]:
            return {}  # nothing recorded yet
        avg_time = sum(self.metrics["response_times"]) / len(self.metrics["response_times"])
        return {
            "success_rate": f"{self.metrics['successful'] / self.metrics['total_requests'] * 100:.1f}%",
            "avg_response_time": f"{avg_time:.2f}s",
            # Note: summing response times approximates sequential throughput;
            # for concurrent runs, dividing by wall-clock elapsed time is more accurate
            "throughput": f"{self.metrics['total_requests'] / sum(self.metrics['response_times']):.1f} req/s"
        }
```
📊 Key Metrics
- Success rate: share of API calls that succeed
- Response time: P50, P95, and P99 percentiles (see the sketch after this list)
- Throughput: requests processed per second
- Error breakdown: counts per error type
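The monitor above only reports an average, while percentiles are what expose tail latency. A minimal sketch of computing P50/P95/P99 from the recorded response times, assuming a `PerformanceMonitor` instance named `monitor` and using only the standard library:

```python
def percentile(values, pct: float) -> float:
    """Nearest-rank percentile over a non-empty list of response times (seconds)."""
    ordered = sorted(values)
    index = min(len(ordered) - 1, int(len(ordered) * pct / 100))
    return ordered[index]

times = monitor.metrics["response_times"]
print(f"P50={percentile(times, 50):.2f}s  "
      f"P95={percentile(times, 95):.2f}s  "
      f"P99={percentile(times, 99):.2f}s")
```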
6. Best Practices Summary
🚀 Performance Optimization
- ✅ Set a sensible concurrency level (10-50)
- ✅ Reuse connections through a shared pool (see the sketch after this list)
- ✅ Batch similar requests together
- ✅ Deduplicate repeated requests
- ✅ Manage work with a priority queue
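The section 1 example opens a fresh `aiohttp.ClientSession` for every request, which forfeits connection reuse. A sketch of sharing one pooled session for the client's lifetime (the connector limit shown is illustrative):

```python
import asyncio
import aiohttp

class PooledAPIClient:
    """Variant of ConcurrentAPIClient that shares one session (and its connection pool)."""

    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.session = None

    async def __aenter__(self):
        # One session for the client's lifetime; limit caps open connections
        self.session = aiohttp.ClientSession(connector=aiohttp.TCPConnector(limit=50))
        return self

    async def __aexit__(self, *exc):
        await self.session.close()

# Usage: async with PooledAPIClient("your-api-key") as client: ...
```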
🛡️ Stability Safeguards
- ✅ Exponential backoff retries
- ✅ Sensible timeouts
- ✅ Error isolation
- ✅ A fallback/degradation strategy (see the sketch after this list)
- ✅ Monitoring and alerting
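A common degradation strategy is falling back to a cheaper model when the primary one keeps failing. The ordering below is illustrative, and it assumes the client accepts a model argument (the section 1 example hard-codes the model):

```python
FALLBACK_MODELS = ["gpt-4o", "gpt-4o-mini"]  # primary first, cheaper fallback last

async def request_with_fallback(client, prompt: str):
    """Try each model in order and return the first successful response."""
    last_error = None
    for model in FALLBACK_MODELS:
        try:
            # assumes make_request accepts a model override (hypothetical signature)
            return await client.make_request(prompt, model=model)
        except Exception as e:
            last_error = e  # remember the failure, then try the next model
    raise last_error
```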
Recommended Concurrency Levels
- Development and testing: 5-10 concurrent requests
- Production: 20-50 concurrent requests
- High throughput: 100+ concurrent requests (requires multiple API keys; see the sketch below)
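When a single key's rate limit becomes the bottleneck, spreading traffic across several keys raises the ceiling. A minimal round-robin sketch over the ConcurrentAPIClient from section 1 (the key list is a placeholder):

```python
import itertools

API_KEYS = ["key-1", "key-2", "key-3"]  # placeholder values

# One client per key; requests are spread across them round-robin
clients = [ConcurrentAPIClient(key, max_concurrent=10) for key in API_KEYS]
client_cycle = itertools.cycle(clients)

async def spread_request(prompt: str):
    return await next(client_cycle).make_request(prompt)
```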