# Rate Limit: A Complete Guide to Handling API Rate Limits

Understand how API rate limiting works and learn the strategies that keep your service running reliably.
- **Rate-limit protection**: prevent service overload
- **Smart retry**: wait automatically for recovery
- **Rate control**: manage request frequency
- **Load balancing**: spread traffic across multiple API keys
## 1. API Rate-Limit Rules

### Limit types

- **RPM (Requests Per Minute)**: caps how many requests you may send per minute, preventing overly frequent calls.
- **TPM (Tokens Per Minute)**: caps how many tokens you may process per minute, controlling the volume of data handled.
### Model limit comparison

| Model | RPM | TPM | Tier |
|---|---|---|---|
| GPT-4o | 500 | 30,000 | Tier 1 |
| GPT-4o | 5,000 | 450,000 | Tier 4 |
| GPT-4o mini | 5,000 | 200,000 | Tier 1 |
| GPT-4o mini | 30,000 | 150,000,000 | Tier 5 |
| Claude 3.5 Sonnet | 1,000 | 100,000 | Standard |
| Claude 3.5 Haiku | 5,000 | 500,000 | Standard |
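Whichever limit you exhaust first becomes your effective ceiling. As a quick back-of-the-envelope check (a sketch using the Tier 1 GPT-4o row above; the 600 tokens-per-request figure is an assumed average, not a published number):

```python
def max_sustainable_rpm(rpm_limit: int, tpm_limit: int, avg_tokens_per_request: int) -> float:
    """The tighter of the RPM and TPM limits is the real throughput ceiling."""
    token_bound_rpm = tpm_limit / avg_tokens_per_request
    return min(rpm_limit, token_bound_rpm)

# Tier 1 GPT-4o: 500 RPM, 30,000 TPM. At ~600 tokens per request,
# TPM binds first: 30,000 / 600 = 50 requests per minute, not 500.
print(max_sustainable_rpm(500, 30_000, 600))  # 50.0
```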
## 2. Basic Rate-Limit Handling

### Error handling and retry
```python
import time
from datetime import datetime, timezone
from email.utils import parsedate_to_datetime
from typing import Optional, Dict

import requests


class RateLimitHandler:
    """A rate-limit-aware request handler."""

    def __init__(self):
        self.retry_after_default = 5  # default wait (seconds) when the server gives no hint

    def make_request_with_retry(
        self,
        url: str,
        headers: Dict,
        data: Dict,
        max_retries: int = 3
    ) -> Optional[Dict]:
        """Send a request, waiting and retrying on rate-limit errors."""
        for attempt in range(max_retries):
            try:
                response = requests.post(url, headers=headers, json=data)

                # Success
                if response.status_code == 200:
                    return response.json()

                # Rate limited
                elif response.status_code == 429:
                    retry_after = self._get_retry_after(response)
                    print(f"Rate limited, waiting {retry_after} seconds...")
                    # Show the remaining-quota headers
                    self._show_rate_limit_info(response.headers)
                    time.sleep(retry_after)
                    continue

                # Any other error
                else:
                    print(f"Request failed: {response.status_code}")
                    return None
            except Exception as e:
                print(f"Request error: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff
        return None

    def _get_retry_after(self, response) -> int:
        """Read the retry delay from the Retry-After response header."""
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            # The value may be a number of seconds or an HTTP date
            try:
                return int(retry_after)
            except ValueError:
                # HTTP-date format: compute the seconds left until that time
                retry_time = parsedate_to_datetime(retry_after)
                wait_seconds = (retry_time - datetime.now(timezone.utc)).total_seconds()
                return max(0, int(wait_seconds))
        return self.retry_after_default

    def _show_rate_limit_info(self, headers):
        """Print the rate-limit headers, if present."""
        info = {
            "Request limit": headers.get('X-RateLimit-Limit'),
            "Remaining requests": headers.get('X-RateLimit-Remaining'),
            "Reset time": headers.get('X-RateLimit-Reset'),
            "Retry after": headers.get('Retry-After')
        }
        for key, value in info.items():
            if value:
                print(f"  {key}: {value}")


# Usage example
handler = RateLimitHandler()
response = handler.make_request_with_retry(
    url="https://api.n1n.ai/v1/chat/completions",
    headers={"Authorization": "Bearer your-key"},
    data={
        "model": "gpt-4o",
        "messages": [{"role": "user", "content": "Hello"}]
    }
)
```

**💡 Response headers**
- `X-RateLimit-Limit`: total quota for the window
- `X-RateLimit-Remaining`: remaining quota
- `X-RateLimit-Reset`: timestamp when the window resets
- `Retry-After`: suggested wait in seconds
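These headers also let you throttle before a 429 ever occurs. Below is a minimal sketch of that idea; it assumes `X-RateLimit-Reset` carries a Unix timestamp (some providers return a duration instead), and the even-pacing heuristic is an assumption of this sketch, not part of the handler above:

```python
import time
import requests

def paced_post(url: str, headers: dict, data: dict) -> requests.Response:
    """Send a request, then pace ourselves based on the remaining quota."""
    response = requests.post(url, headers=headers, json=data)
    remaining = response.headers.get("X-RateLimit-Remaining")
    reset = response.headers.get("X-RateLimit-Reset")
    if remaining is not None and reset is not None:
        seconds_to_reset = max(0.0, float(reset) - time.time())
        if int(remaining) > 0:
            # Spread the remaining quota evenly over the rest of the window
            time.sleep(seconds_to_reset / int(remaining))
        else:
            # Quota exhausted: wait out the window entirely
            time.sleep(seconds_to_reset)
    return response
```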
## 3. Advanced Rate-Limiting Strategies

### Token bucket algorithm
```python
import asyncio
import time
from collections import deque
from typing import Dict, Optional


class TokenBucketRateLimiter:
    """RPM/TPM limiter, implemented as a 60-second sliding window."""

    def __init__(self, rpm: int, tpm: int):
        self.rpm = rpm  # requests per minute
        self.tpm = tpm  # tokens per minute
        self.request_times = deque()
        self.token_usage = deque()
        self.lock = asyncio.Lock()

    async def wait_if_needed(self, estimated_tokens: int = 0):
        """Block until the request fits within the current limits."""
        while True:
            async with self.lock:
                current_time = time.time()
                # Evict records older than the 60-second window
                self._cleanup_old_records(current_time)

                wait_time = 0.0
                # Check the request rate
                if len(self.request_times) >= self.rpm:
                    wait_time = 60 - (current_time - self.request_times[0])
                # Check the token rate
                current_tokens = sum(tokens for _, tokens in self.token_usage)
                if self.token_usage and current_tokens + estimated_tokens > self.tpm:
                    wait_time = max(wait_time, 60 - (current_time - self.token_usage[0][0]))

                if wait_time <= 0:
                    # There is headroom: record this request and proceed
                    self.request_times.append(current_time)
                    if estimated_tokens > 0:
                        self.token_usage.append((current_time, estimated_tokens))
                    return

            # Sleep outside the lock, then re-check from the top
            # (sleeping or recursing while holding an asyncio.Lock would deadlock)
            print(f"Rate limit reached (rpm={self.rpm}, tpm={self.tpm}), waiting {wait_time:.1f}s")
            await asyncio.sleep(wait_time)

    def _cleanup_old_records(self, current_time: float):
        """Drop records that have left the 60-second window."""
        cutoff_time = current_time - 60
        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()
        while self.token_usage and self.token_usage[0][0] < cutoff_time:
            self.token_usage.popleft()


class MultiKeyRateLimiter:
    """Load-balancing limiter across multiple API keys."""

    def __init__(self, api_keys: list, limits: Dict):
        self.api_keys = api_keys
        self.limiters = {
            key: TokenBucketRateLimiter(
                rpm=limits.get("rpm", 500),
                tpm=limits.get("tpm", 30000)
            )
            for key in api_keys
        }
        self.key_usage = {key: 0 for key in api_keys}

    async def get_available_key(self, estimated_tokens: int = 0) -> Optional[str]:
        """Pick an API key that can serve the request."""
        # Try the least-used keys first
        sorted_keys = sorted(self.key_usage.items(), key=lambda x: x[1])
        for key, _ in sorted_keys:
            limiter = self.limiters[key]
            # Can this key take the request right now?
            if self._can_use_immediately(limiter, estimated_tokens):
                # Record the request with the limiter (returns at once,
                # since we just confirmed there is headroom)
                await limiter.wait_if_needed(estimated_tokens)
                self.key_usage[key] += 1
                return key
        # No key has headroom: wait on the least-used one
        least_used_key = sorted_keys[0][0]
        await self.limiters[least_used_key].wait_if_needed(estimated_tokens)
        self.key_usage[least_used_key] += 1
        return least_used_key

    def _can_use_immediately(self, limiter: TokenBucketRateLimiter, tokens: int) -> bool:
        """Check whether a limiter has headroom right now."""
        current_time = time.time()
        limiter._cleanup_old_records(current_time)
        if len(limiter.request_times) >= limiter.rpm:
            return False
        current_tokens = sum(t for _, t in limiter.token_usage)
        if current_tokens + tokens > limiter.tpm:
            return False
        return True


# Usage example
api_keys = ["key1", "key2", "key3"]
limiter = MultiKeyRateLimiter(
    api_keys=api_keys,
    limits={"rpm": 500, "tpm": 30000}
)

async def make_request(prompt: str):
    estimated_tokens = len(prompt.split()) * 2  # rough estimate
    api_key = await limiter.get_available_key(estimated_tokens)
    print(f"Using API key: {api_key}")
    # send the request...
```
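The `len(prompt.split()) * 2` estimate above is deliberately crude. For a closer count you can tokenize locally with the tiktoken library (a sketch; counts are still approximate for chat payloads because of message framing overhead):

```python
import tiktoken

def estimate_tokens(text: str, model: str = "gpt-4o") -> int:
    """Count tokens locally using the model's encoding."""
    try:
        encoding = tiktoken.encoding_for_model(model)
    except KeyError:
        # Unknown model name: fall back to a common encoding
        encoding = tiktoken.get_encoding("o200k_base")
    return len(encoding.encode(text))

print(estimate_tokens("Hello, how are you?"))
```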
## 4. Exponential Backoff

### Smart retry mechanism
```python
import asyncio
import random
from typing import Any, Callable


class ExponentialBackoff:
    """Exponential-backoff retry policy."""

    def __init__(
        self,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.exponential_base = exponential_base
        self.jitter = jitter

    def get_delay(self, attempt: int) -> float:
        """Compute the delay for a given attempt number."""
        # Exponential growth, capped at max_delay
        delay = min(
            self.base_delay * (self.exponential_base ** attempt),
            self.max_delay
        )
        # Add jitter to avoid the thundering-herd effect
        if self.jitter:
            delay = delay * (0.5 + random.random() * 0.5)
        return delay


async def retry_with_backoff(
    func: Callable,
    max_attempts: int = 5,
    backoff: ExponentialBackoff = None
) -> Any:
    """Call an async function, retrying with exponential backoff."""
    if backoff is None:
        backoff = ExponentialBackoff()
    last_exception = None
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            # Is this a rate-limit error?
            if hasattr(e, 'response') and e.response.status_code == 429:
                # Prefer the server's suggested retry delay
                retry_after = e.response.headers.get('Retry-After')
                if retry_after:
                    delay = int(retry_after)
                else:
                    delay = backoff.get_delay(attempt)
                print(f"Attempt {attempt + 1}/{max_attempts} failed, retrying in {delay:.1f}s")
                await asyncio.sleep(delay)
            else:
                # Not a rate-limit error: retrying probably won't help
                raise e
    # All retries exhausted
    raise Exception(f"Still failing after {max_attempts} attempts") from last_exception


# Usage example (make_api_request and RateLimitError are placeholders;
# note that retry_with_backoff is called explicitly, not used as a decorator)
async def call_api():
    response = await make_api_request()
    if response.status_code == 429:
        raise RateLimitError(response)
    return response

# Custom backoff policy
custom_backoff = ExponentialBackoff(
    base_delay=2.0,         # initial delay: 2 seconds
    max_delay=120.0,        # cap: 2 minutes
    exponential_base=3.0,   # triple the delay each attempt
    jitter=True             # add random jitter
)

# From inside an async context:
result = await retry_with_backoff(
    call_api,
    max_attempts=5,
    backoff=custom_backoff
)
```

**🔄 Backoff strategy comparison**
- **Fixed delay**: simple, but can cause congestion
- **Linear backoff**: wait time grows steadily
- **Exponential backoff**: grows quickly and works best in practice
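A tiny illustrative script makes the difference concrete (base delay 1 s, no jitter):

```python
# First five delays under each policy
fixed = [1.0] * 5
linear = [1.0 * (n + 1) for n in range(5)]
exponential = [min(1.0 * 2 ** n, 60.0) for n in range(5)]

print("fixed:      ", fixed)        # [1.0, 1.0, 1.0, 1.0, 1.0]
print("linear:     ", linear)       # [1.0, 2.0, 3.0, 4.0, 5.0]
print("exponential:", exponential)  # [1.0, 2.0, 4.0, 8.0, 16.0]
```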
## 5. The Circuit-Breaker Pattern

### Protecting the service
```python
import asyncio
from datetime import datetime, timedelta
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # normal operation
    OPEN = "open"            # tripped: calls are rejected
    HALF_OPEN = "half_open"  # probing: allow a trial call


class CircuitBreaker:
    """An implementation of the circuit-breaker pattern."""

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception
        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        """Invoke a function through the circuit breaker."""
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise Exception("Circuit breaker is open; service temporarily unavailable")
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except self.expected_exception as e:
            self._on_failure()
            raise e

    def _should_attempt_reset(self) -> bool:
        """Has the recovery timeout elapsed since the last failure?"""
        return (
            self.last_failure_time and
            datetime.now() > self.last_failure_time + timedelta(seconds=self.recovery_timeout)
        )

    def _on_success(self):
        """Reset the breaker after a successful call."""
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        """Count the failure and trip the breaker past the threshold."""
        self.failure_count += 1
        self.last_failure_time = datetime.now()
        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"Circuit breaker opened after {self.failure_count} consecutive failures")


# Usage example (make_api_request and get_fallback_response are placeholders)
circuit_breaker = CircuitBreaker(
    failure_threshold=3,   # trip after 3 consecutive failures
    recovery_timeout=30    # try to recover after 30 seconds
)

async def protected_api_call():
    try:
        return await circuit_breaker.call(make_api_request)
    except Exception as e:
        print(f"API call failed: {e}")
        # Fall back to a degraded response
        return get_fallback_response()
```
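The circuit breaker composes naturally with the backoff retry from the previous section: retries absorb transient 429s, while the breaker stops traffic to an endpoint that keeps failing. A sketch combining the two pieces defined above:

```python
async def resilient_call():
    # Each breaker-protected call retries internally with backoff,
    # so only a fully exhausted retry cycle counts as one breaker failure.
    return await circuit_breaker.call(
        lambda: retry_with_backoff(call_api, max_attempts=3)
    )
```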
## 6. Monitoring and Optimization

### A rate-limit monitoring system
```python
import asyncio
import time
from collections import defaultdict
from datetime import datetime


class RateLimitMonitor:
    """Tracks rate-limit events per endpoint."""

    def __init__(self):
        self.stats = defaultdict(lambda: {
            "requests": 0,
            "rate_limited": 0,
            "total_wait_time": 0,
            "last_limit_time": None
        })

    def record_request(self, endpoint: str):
        """Record an outgoing request."""
        self.stats[endpoint]["requests"] += 1

    def record_rate_limit(self, endpoint: str, wait_time: float):
        """Record a rate-limit event and the time spent waiting."""
        stats = self.stats[endpoint]
        stats["rate_limited"] += 1
        stats["total_wait_time"] += wait_time
        stats["last_limit_time"] = datetime.now()

    def get_report(self) -> dict:
        """Build a per-endpoint monitoring report."""
        report = {}
        for endpoint, stats in self.stats.items():
            requests = stats["requests"]
            success_rate = 1 - (stats["rate_limited"] / requests) if requests > 0 else 1.0
            avg_wait = stats["total_wait_time"] / stats["rate_limited"] if stats["rate_limited"] > 0 else 0
            report[endpoint] = {
                "total requests": requests,
                "rate-limit hits": stats["rate_limited"],
                "success rate": f"{success_rate:.2%}",
                "average wait": f"{avg_wait:.2f}s",
                "last rate-limited at": stats["last_limit_time"].strftime("%Y-%m-%d %H:%M:%S") if stats["last_limit_time"] else "never"
            }
        return report

    def get_recommendations(self) -> list:
        """Derive tuning suggestions from the collected stats."""
        recommendations = []
        for endpoint, stats in self.stats.items():
            limit_rate = stats["rate_limited"] / stats["requests"] if stats["requests"] > 0 else 0
            if limit_rate > 0.1:  # rate-limited on more than 10% of requests
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"rate-limit rate too high ({limit_rate:.1%})",
                    "suggestion": "lower the request rate or spread load across multiple API keys"
                })
            if stats["total_wait_time"] > 300:  # more than 5 minutes spent waiting
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"too much time spent waiting ({stats['total_wait_time']:.0f}s)",
                    "suggestion": "introduce a request queue and batch processing"
                })
        return recommendations


# Hooking the monitor into request handling
# (make_request and RateLimitError are placeholders)
monitor = RateLimitMonitor()

async def monitored_request(endpoint: str, **kwargs):
    monitor.record_request(endpoint)
    try:
        response = await make_request(endpoint, **kwargs)
        return response
    except RateLimitError as e:
        wait_time = e.retry_after
        monitor.record_rate_limit(endpoint, wait_time)
        await asyncio.sleep(wait_time)
        # Note: unbounded recursion; cap the attempts in production
        return await monitored_request(endpoint, **kwargs)

# Print a report periodically
async def print_monitor_report():
    while True:
        await asyncio.sleep(300)  # every 5 minutes
        print("\n=== Rate-Limit Monitoring Report ===")
        report = monitor.get_report()
        for endpoint, stats in report.items():
            print(f"\n{endpoint}:")
            for key, value in stats.items():
                print(f"  {key}: {value}")
        recommendations = monitor.get_recommendations()
        if recommendations:
            print("\nRecommendations:")
            for rec in recommendations:
                print(f"  - {rec['endpoint']}: {rec['suggestion']}")
```
## 7. Best-Practice Summary

### 🛡️ Prevention
- ✅ Enforce request-rate control
- ✅ Use a token-bucket limiter
- ✅ Batch requests where possible
- ✅ Estimate token usage up front
- ✅ Balance load across multiple API keys
### 🚨 Mitigation

- ✅ Handle 429 errors gracefully
- ✅ Retry with exponential backoff
- ✅ Protect calls with a circuit breaker
- ✅ Keep a degraded fallback path
- ✅ Monitor and alert in real time
### ✨ Advanced tips

- Use a queue to smooth request peaks (see the sketch below)
- Ramp up traffic gradually during a warm-up period
- Allocate quota by business priority
- Review your rate-limit patterns regularly and tune the strategy
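A minimal sketch of the queue-based smoothing mentioned above: producers enqueue work as fast as they like, and a single worker drains the queue at a fixed pace (the job lambdas are placeholders for real API calls):

```python
import asyncio

async def smooth_worker(queue: asyncio.Queue, min_interval: float):
    """Send queued requests at a fixed pace, flattening bursts."""
    while True:
        job = await queue.get()
        await job()                        # run the request coroutine
        queue.task_done()
        await asyncio.sleep(min_interval)  # enforce the pacing interval

async def main():
    queue: asyncio.Queue = asyncio.Queue()
    # 500 RPM means at most one request every 60/500 = 0.12 seconds
    worker = asyncio.create_task(smooth_worker(queue, min_interval=60 / 500))
    for i in range(5):
        # Placeholder jobs; in practice these would call the API
        await queue.put(lambda i=i: asyncio.sleep(0))
    await queue.join()  # wait until every queued job has been sent
    worker.cancel()

asyncio.run(main())
```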