Best Practices for Concurrent API Calls

Master high-concurrency API calling techniques to raise throughput, control cost, and keep your system stable.

  • Concurrency control: cap the number of simultaneous requests
  • Rate limiting: stay within provider rate limits
  • Error retry: recover intelligently from failures
  • Performance monitoring: track key metrics in real time

1. Python Async Concurrency

asyncio implementation

import asyncio
import aiohttp
from typing import List
import time

class ConcurrentAPIClient:
    def __init__(self, api_key: str, max_concurrent: int = 10):
        self.api_key = api_key
        self.base_url = "https://api.n1n.ai/v1"
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def make_request(self, session: aiohttp.ClientSession, prompt: str):
        async with self.semaphore:  # cap the number of in-flight requests
            headers = {
                "Authorization": f"Bearer {self.api_key}",
                "Content-Type": "application/json"
            }

            payload = {
                "model": "gpt-4o-mini",
                "messages": [{"role": "user", "content": prompt}],
                "max_tokens": 100
            }

            async with session.post(
                f"{self.base_url}/chat/completions",
                headers=headers,
                json=payload
            ) as response:
                if response.status == 200:
                    result = await response.json()
                    return result['choices'][0]['message']['content']
                else:
                    raise Exception(f"API error: {response.status}")

    async def batch_process(self, prompts: List[str]):
        # One shared session reuses its connection pool across all requests
        async with aiohttp.ClientSession() as session:
            tasks = [self.make_request(session, p) for p in prompts]
            # return_exceptions=True keeps one failure from cancelling the rest
            return await asyncio.gather(*tasks, return_exceptions=True)

# Usage example
async def main():
    client = ConcurrentAPIClient("your-api-key", max_concurrent=10)
    prompts = [f"Translate: Hello {i}" for i in range(50)]

    start = time.time()
    results = await client.batch_process(prompts)
    print(f"Processed {len(prompts)} requests in {time.time() - start:.2f}s")

asyncio.run(main())

Core features

  • Semaphore-bounded concurrency
  • Asynchronous, non-blocking I/O
  • Batch processing
  • Error isolation (see the sketch after these lists)

Performance gains

  • Roughly 10x speedup over sequential calls
  • Efficient CPU utilization
  • Low memory footprint
  • Short response times

Typical use cases

  • Batch translation
  • Data processing
  • Content generation
  • API testing
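
Because batch_process passes return_exceptions=True to asyncio.gather, a failed request does not cancel the rest; the exceptions come back inline with the successes. A minimal sketch of separating the two afterwards, meant to run inside main() above:

results = await client.batch_process(prompts)
# Exceptions appear in the result list in place of the failed responses
succeeded = [r for r in results if not isinstance(r, Exception)]
failed = [(i, r) for i, r in enumerate(results) if isinstance(r, Exception)]
print(f"{len(succeeded)} succeeded, {len(failed)} failed")
for index, err in failed:
    print(f"prompt #{index} failed: {err}")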

2. Node.js Concurrency Control

Promise-based concurrency

const axios = require('axios');
const pLimit = require('p-limit');  // p-limit v3 or earlier; v4+ is ESM-only

class ConcurrentAPIClient {
    constructor(apiKey, maxConcurrent = 10) {
        this.apiKey = apiKey;
        this.baseURL = 'https://api.n1n.ai/v1';
        this.limit = pLimit(maxConcurrent);
    }

    async makeRequest(prompt) {
        return this.limit(async () => {
            const response = await axios.post(
                `${this.baseURL}/chat/completions`,
                {
                    model: 'gpt-4o-mini',
                    messages: [{ role: 'user', content: prompt }],
                    max_tokens: 100
                },
                {
                    headers: {
                        'Authorization': `Bearer ${this.apiKey}`,
                        'Content-Type': 'application/json'
                    }
                }
            );
            return response.data.choices[0].message.content;
        });
    }

    async batchProcess(prompts) {
        const start = Date.now();
        const promises = prompts.map(p => this.makeRequest(p));
        // allSettled isolates failures: one rejection doesn't discard the batch
        const results = await Promise.allSettled(promises);

        console.log(`Processed ${prompts.length} requests in ${(Date.now() - start) / 1000}s`);
        return results;
    }
}

// Usage example
async function main() {
    const client = new ConcurrentAPIClient('your-api-key', 10);
    const prompts = Array.from({ length: 50 }, (_, i) => `Translate: Hello ${i}`);
    await client.batchProcess(prompts);
}

main().catch(console.error);

💡 Best practices

  • Use p-limit to cap concurrency
  • Use Promise.allSettled so one failure doesn't discard the batch
  • Set sensible timeouts (see the sketch below)
  • Implement an error-retry mechanism (section 4)
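
Timeouts deserve an explicit setting rather than the client default. In the Python client from section 1, a per-session budget can be set with aiohttp's ClientTimeout (axios takes an analogous timeout option, in milliseconds). A minimal sketch, reusing the client and imports from section 1:

# 30s total budget per request, 5s for connection setup. Requests that
# exceed it raise asyncio.TimeoutError, which the backoff decorator in
# section 4 already treats as retryable.
timeout = aiohttp.ClientTimeout(total=30, connect=5)

async def batch_with_timeout(client: ConcurrentAPIClient, prompts: List[str]):
    async with aiohttp.ClientSession(timeout=timeout) as session:
        tasks = [client.make_request(session, p) for p in prompts]
        return await asyncio.gather(*tasks, return_exceptions=True)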

3. Smart Rate Limiting

Rate limiter implementation

class RateLimiter:
    """Sliding-window limiter for per-model RPM and TPM budgets"""
    def __init__(self):
        self.limits = {
            "gpt-4o": {"rpm": 500, "tpm": 30000},
            "gpt-4o-mini": {"rpm": 5000, "tpm": 200000}
        }
        self.request_times = []
        self.token_counts = []

    async def wait_if_needed(self, model: str, tokens: int):
        """Sleep just long enough to stay inside the rate limits"""
        current_time = time.time()

        # Drop records older than the 60-second window
        self.request_times = [t for t in self.request_times if t > current_time - 60]
        self.token_counts = [(t, c) for t, c in self.token_counts if t > current_time - 60]

        # RPM check: wait until the oldest request leaves the window
        if len(self.request_times) >= self.limits[model]["rpm"]:
            wait_time = 60 - (current_time - self.request_times[0])
            if wait_time > 0:
                await asyncio.sleep(wait_time)

        # TPM check: pause briefly to let the token window slide
        total_tokens = sum(c for _, c in self.token_counts) + tokens
        if total_tokens > self.limits[model]["tpm"]:
            await asyncio.sleep(5)

        # Record this request with a fresh timestamp (we may have slept)
        now = time.time()
        self.request_times.append(now)
        self.token_counts.append((now, tokens))

API limits

  Model         RPM     TPM
  GPT-4o        500     30K
  GPT-4o mini   5,000   200K

Rate-limiting strategies

  • Sliding-window algorithm (as implemented above)
  • Token estimation (see the sketch below)
  • Adaptive backoff
  • Priority queues
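
Token estimation is the input wait_if_needed needs before a request goes out. A minimal sketch, assuming the tiktoken package is installed; the cl100k_base encoding is an approximation for the models above, and the chars/4 fallback is a crude heuristic, not an exact count:

def estimate_tokens(prompt: str, max_tokens: int = 100) -> int:
    """Rough token estimate to feed RateLimiter.wait_if_needed."""
    try:
        import tiktoken  # third-party; assumed installed separately
        enc = tiktoken.get_encoding("cl100k_base")
        prompt_tokens = len(enc.encode(prompt))
    except ImportError:
        # Crude fallback: roughly 4 characters per token for English text
        prompt_tokens = max(1, len(prompt) // 4)
    # The TPM window counts the response too, so budget for max_tokens
    return prompt_tokens + max_tokens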

4. Error Handling and Retries

Exponential backoff retry

import backoff  # third-party: pip install backoff

class RobustAPIClient:
    @backoff.on_exception(
        backoff.expo,
        (aiohttp.ClientError, asyncio.TimeoutError),
        max_tries=3,
        max_time=30
    )
    async def make_request_with_retry(self, prompt: str):
        """Request with exponential-backoff retries"""
        try:
            # _make_request is the low-level call from section 1
            response = await self._make_request(prompt)
            return {"success": True, "data": response}
        except aiohttp.ClientResponseError as e:
            if e.status == 429:  # rate limited
                # Honor Retry-After if the server sent it; default to 5s
                retry_after = int((e.headers or {}).get('Retry-After', 5))
                await asyncio.sleep(retry_after)
                raise  # re-raise so backoff retries
            elif e.status >= 500:  # server error
                raise  # re-raise so backoff retries
            else:
                return {"success": False, "error": str(e)}

5. Performance Monitoring

Monitoring metrics

class PerformanceMonitor:
    def __init__(self):
        self.metrics = {
            "total_requests": 0,
            "successful": 0,
            "failed": 0,
            "total_tokens": 0,
            "response_times": []
        }

    def record_success(self, response_time: float, tokens: int):
        self.metrics["total_requests"] += 1
        self.metrics["successful"] += 1
        self.metrics["total_tokens"] += tokens
        self.metrics["response_times"].append(response_time)

    def record_failure(self):
        self.metrics["total_requests"] += 1
        self.metrics["failed"] += 1

    def get_stats(self):
        times = self.metrics["response_times"]
        if not times:
            return {}
        avg_time = sum(times) / len(times)
        return {
            "success_rate": f"{self.metrics['successful'] / self.metrics['total_requests'] * 100:.1f}%",
            "avg_response_time": f"{avg_time:.2f}s",
            # Rough figure: summed per-request latency overstates wall time
            # under concurrency, so this underestimates real throughput
            "throughput": f"{self.metrics['total_requests'] / sum(times):.1f} req/s"
        }

📊 Key metrics

  • Success rate: share of API calls that succeed
  • Response time: P50, P95, P99 percentiles (see the sketch below)
  • Throughput: requests processed per second
  • Error breakdown: counts by error type
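
The monitor above reports only the mean; percentiles expose tail latency. A minimal sketch over the same response_times list, using the nearest-rank method:

def latency_percentiles(response_times: List[float]) -> dict:
    if not response_times:
        return {}
    ordered = sorted(response_times)
    def pct(p: float) -> float:
        # Nearest-rank: pick the value at the p-quantile position
        return ordered[min(len(ordered) - 1, int(len(ordered) * p))]
    return {"P50": pct(0.50), "P95": pct(0.95), "P99": pct(0.99)}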

6. Summary of Best Practices

🚀 Performance

  • ✅ Set a sensible concurrency level (10-50)
  • ✅ Reuse connections via a pooled session
  • ✅ Batch similar requests together
  • ✅ De-duplicate identical requests (see the sketch below)
  • ✅ Manage work with priority queues
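
Request de-duplication is the one item above with no code elsewhere in this guide. A minimal sketch, reusing the asyncio import from section 1; make_request here is any coroutine that takes a prompt. Concurrent duplicates of an in-flight prompt await the same task instead of spending another API call:

import hashlib

class DedupCache:
    def __init__(self):
        self._pending = {}  # prompt hash -> in-flight task

    async def fetch(self, prompt: str, make_request):
        key = hashlib.sha256(prompt.encode()).hexdigest()
        if key not in self._pending:
            # First caller creates the task; later duplicates share it
            self._pending[key] = asyncio.ensure_future(make_request(prompt))
        return await self._pending[key]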

🛡️ Stability

  • ✅ Exponential-backoff retries
  • ✅ Sensible timeouts
  • ✅ Error isolation
  • ✅ Degradation strategy (see the sketch below)
  • ✅ Monitoring and alerting
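
A degradation strategy in its simplest form is a model fallback chain. A minimal sketch, assuming the section-4 retry wrapper is extended to accept a model parameter (hypothetical here); the gpt-4o to gpt-4o-mini pairing is an example, not a fixed policy:

FALLBACK_CHAIN = ["gpt-4o", "gpt-4o-mini"]  # primary first, cheaper fallback last

async def request_with_fallback(client, prompt: str):
    last_error = None
    for model in FALLBACK_CHAIN:
        try:
            # Hypothetical: make_request_with_retry extended with model=
            return await client.make_request_with_retry(prompt, model=model)
        except Exception as e:
            last_error = e  # retries exhausted; degrade to the next model
    raise last_error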

Suggested concurrency levels

  • Development and testing: 5-10 concurrent requests
  • Production: 20-50 concurrent requests
  • High throughput: 100+ concurrent requests (requires multiple keys; see the sketch below)
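
For the 100+ tier, spreading load across several API keys keeps each key under its own RPM budget. A minimal round-robin sketch, reusing the client and typing import from section 1 (the key names are placeholders):

import itertools

class KeyPool:
    def __init__(self, api_keys: List[str]):
        self._cycle = itertools.cycle(api_keys)

    def next_key(self) -> str:
        return next(self._cycle)

# One client per key, each with its own concurrency cap
pool = KeyPool(["key-1", "key-2", "key-3"])  # placeholder keys
clients = [ConcurrentAPIClient(pool.next_key(), max_concurrent=30) for _ in range(3)]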