Rate Limit 限流处理完整方案

深入理解 API 限流机制,掌握应对策略,确保服务稳定可靠运行

限流保护

防止服务过载

智能重试

自动等待恢复

速率控制

请求频率管理

负载均衡

多Key分流

一、API 限流规则

限流类型

RPM (Requests Per Minute)

每分钟请求数限制,防止请求过于频繁

TPM (Tokens Per Minute)

每分钟Token数限制,控制处理数据量

模型限制对比

模型 | RPM | TPM | 等级
GPT-4o | 500 | 30,000 | Tier 1
GPT-4o | 5,000 | 450,000 | Tier 4
GPT-4o mini | 5,000 | 200,000 | Tier 1
GPT-4o mini | 30,000 | 150,000,000 | Tier 5
Claude 3.5 Sonnet | 1,000 | 100,000 | Standard
Claude 3.5 Haiku | 5,000 | 500,000 | Standard

二、基础限流处理

错误处理与重试

import time
import requests
from typing import Optional, Dict

class RateLimitHandler:
    """Rate-limit-aware HTTP request helper.

    Retries requests that hit HTTP 429, honoring the server-provided
    ``Retry-After`` header when present, and applies exponential backoff
    to transport-level exceptions.
    """

    def __init__(self):
        # Fallback wait (seconds) when the server gives no usable Retry-After.
        self.retry_after_default = 5

    def make_request_with_retry(
        self,
        url: str,
        headers: Dict,
        data: Dict,
        max_retries: int = 3
    ) -> Optional[Dict]:
        """POST ``data`` as JSON, retrying when rate limited.

        Args:
            url: Endpoint URL.
            headers: HTTP headers (e.g. the Authorization header).
            data: JSON-serializable request body.
            max_retries: Maximum number of attempts.

        Returns:
            Decoded JSON response on success, ``None`` on failure.
        """
        for attempt in range(max_retries):
            try:
                response = requests.post(url, headers=headers, json=data)

                # Success: hand back the parsed body.
                if response.status_code == 200:
                    return response.json()

                # Rate limited: wait as instructed, then retry.
                elif response.status_code == 429:
                    retry_after = self._get_retry_after(response)
                    print(f"触发限流,等待 {retry_after} 秒...")

                    # Surface remaining-quota headers for diagnostics.
                    self._show_rate_limit_info(response.headers)

                    time.sleep(retry_after)
                    continue

                # Any other status is treated as a hard failure (no retry).
                else:
                    print(f"请求失败: {response.status_code}")
                    return None

            except Exception as e:
                print(f"请求异常: {e}")
                if attempt < max_retries - 1:
                    time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s…

        return None

    def _get_retry_after(self, response) -> int:
        """Parse the Retry-After header (delta-seconds or HTTP-date).

        Falls back to ``self.retry_after_default`` when the header is
        absent or unparseable (the original raised an uncaught
        ``ValueError`` on malformed dates).
        """
        retry_after = response.headers.get('Retry-After')
        if retry_after:
            try:
                # Common case: plain number of seconds.
                return int(retry_after)
            except ValueError:
                # HTTP-date form: compute the remaining wait in seconds.
                # parsedate_to_datetime handles RFC 7231 date formats
                # robustly, unlike a single hand-rolled strptime pattern,
                # and lets us compare timezone-aware datetimes.
                from datetime import datetime, timezone
                from email.utils import parsedate_to_datetime
                try:
                    retry_time = parsedate_to_datetime(retry_after)
                    if retry_time.tzinfo is None:
                        retry_time = retry_time.replace(tzinfo=timezone.utc)
                    wait_seconds = (retry_time - datetime.now(timezone.utc)).total_seconds()
                    return max(0, int(wait_seconds))
                except (TypeError, ValueError):
                    return self.retry_after_default

        return self.retry_after_default

    def _show_rate_limit_info(self, headers):
        """Print whatever rate-limit headers the server returned."""
        info = {
            "请求限制": headers.get('X-RateLimit-Limit'),
            "剩余请求": headers.get('X-RateLimit-Remaining'),
            "重置时间": headers.get('X-RateLimit-Reset'),
            "重试等待": headers.get('Retry-After')
        }

        for key, value in info.items():
            if value:
                print(f"  {key}: {value}")

# Example: route a chat completion request through the handler.
handler = RateLimitHandler()
request_body = {
    "model": "gpt-4o",
    "messages": [{"role": "user", "content": "Hello"}]
}
response = handler.make_request_with_retry(
    url="https://api.n1n.ai/v1/chat/completions",
    headers={"Authorization": "Bearer your-key"},
    data=request_body
)

💡 响应头信息

  • X-RateLimit-Limit: 限制总量
  • X-RateLimit-Remaining: 剩余配额
  • X-RateLimit-Reset: 重置时间戳
  • Retry-After: 建议等待秒数

三、高级限流策略

令牌桶算法

import asyncio
from collections import deque
from typing import Dict, Optional
import time

class TokenBucketRateLimiter:
    """Sliding-window limiter enforcing both RPM and TPM budgets.

    Keeps the timestamps of the last 60 seconds of requests and token
    usage; ``wait_if_needed`` sleeps until a new request fits under both
    limits, then records it.
    """

    def __init__(self, rpm: int, tpm: int):
        self.rpm = rpm                # requests allowed per minute
        self.tpm = tpm                # tokens allowed per minute
        self.request_times = deque()  # timestamps of recent requests
        self.token_usage = deque()    # (timestamp, token_count) pairs
        self.lock = asyncio.Lock()

    async def wait_if_needed(self, estimated_tokens: int = 0):
        """Sleep until the request fits the limits, then record it.

        Bug fix: the original recursed into itself while still holding
        ``self.lock``; asyncio.Lock is not reentrant, so any wait
        deadlocked.  This version loops inside a single lock acquisition.
        It also guards against an IndexError when ``estimated_tokens``
        alone exceeds the TPM budget while the usage window is empty.
        """
        async with self.lock:
            while True:
                current_time = time.time()

                # Drop records older than the 60-second window.
                self._cleanup_old_records(current_time)

                wait_time = 0.0

                # RPM: if the window is full, wait until the oldest
                # request ages out of it.
                if len(self.request_times) >= self.rpm:
                    rpm_wait = 60 - (current_time - self.request_times[0])
                    if rpm_wait > 0:
                        print(f"达到RPM限制({self.rpm}),等待 {rpm_wait:.1f} 秒")
                        wait_time = max(wait_time, rpm_wait)

                # TPM: same idea for token usage.  If the usage window is
                # empty, waiting cannot free capacity, so let it through.
                current_tokens = sum(tokens for _, tokens in self.token_usage)
                if current_tokens + estimated_tokens > self.tpm and self.token_usage:
                    tpm_wait = 60 - (current_time - self.token_usage[0][0])
                    if tpm_wait > 0:
                        print(f"达到TPM限制({self.tpm}),等待 {tpm_wait:.1f} 秒")
                        wait_time = max(wait_time, tpm_wait)

                if wait_time <= 0:
                    break
                await asyncio.sleep(wait_time)

            # Admit the request while still holding the lock so the
            # records stay consistent under concurrency.
            self.request_times.append(current_time)
            if estimated_tokens > 0:
                self.token_usage.append((current_time, estimated_tokens))

    def _cleanup_old_records(self, current_time: float):
        """Drop request/token records older than the 60-second window."""
        cutoff_time = current_time - 60

        while self.request_times and self.request_times[0] < cutoff_time:
            self.request_times.popleft()

        while self.token_usage and self.token_usage[0][0] < cutoff_time:
            self.token_usage.popleft()

class MultiKeyRateLimiter:
    """Spreads traffic over several API keys, each with its own limiter."""

    def __init__(self, api_keys: list, limits: Dict):
        self.api_keys = api_keys
        # One independent token-bucket limiter per key, all sharing the
        # same RPM/TPM budget from ``limits``.
        self.limiters = {}
        for key in api_keys:
            self.limiters[key] = TokenBucketRateLimiter(
                rpm=limits.get("rpm", 500),
                tpm=limits.get("tpm", 30000)
            )
        self.key_usage = dict.fromkeys(api_keys, 0)

    async def get_available_key(self, estimated_tokens: int = 0) -> Optional[str]:
        """Pick a key with spare capacity, preferring the least-used one.

        When every key is saturated, waits on the least-used key's
        limiter instead of failing.
        """
        by_usage = sorted(self.key_usage, key=self.key_usage.get)

        for candidate in by_usage:
            if self._can_use_immediately(self.limiters[candidate], estimated_tokens):
                self.key_usage[candidate] += 1
                return candidate

        # No key can take the request right now: block on the least-used one.
        fallback = by_usage[0]
        await self.limiters[fallback].wait_if_needed(estimated_tokens)
        self.key_usage[fallback] += 1
        return fallback

    def _can_use_immediately(self, limiter: TokenBucketRateLimiter, tokens: int) -> bool:
        """True when the limiter can accept the request without sleeping."""
        now = time.time()
        limiter._cleanup_old_records(now)

        if len(limiter.request_times) >= limiter.rpm:
            return False

        used = sum(t for _, t in limiter.token_usage)
        return used + tokens <= limiter.tpm

# Example: three keys sharing the same per-key limits.
api_keys = ["key1", "key2", "key3"]
limiter = MultiKeyRateLimiter(api_keys=api_keys, limits={"rpm": 500, "tpm": 30000})

async def make_request(prompt: str):
    # Crude token estimate: ~2 tokens per whitespace-separated word.
    token_estimate = len(prompt.split()) * 2
    api_key = await limiter.get_available_key(token_estimate)
    print(f"使用 API Key: {api_key}")
    # send the request here...

四、指数退避策略

智能重试机制

import random
import asyncio
from typing import Callable, Any

class ExponentialBackoff:
    """Computes exponentially growing retry delays with optional jitter."""

    def __init__(
        self,
        base_delay: float = 1.0,
        max_delay: float = 60.0,
        exponential_base: float = 2.0,
        jitter: bool = True
    ):
        self.base_delay = base_delay              # delay for attempt 0
        self.max_delay = max_delay                # ceiling on any delay
        self.exponential_base = exponential_base  # growth factor per attempt
        self.jitter = jitter                      # randomize to spread retries

    def get_delay(self, attempt: int) -> float:
        """Return the delay in seconds for the given 0-based attempt."""
        raw = self.base_delay * self.exponential_base ** attempt
        delay = self.max_delay if raw > self.max_delay else raw

        # Jitter keeps many clients from retrying in lockstep (the
        # "thundering herd" problem): scale into [0.5x, 1.0x).
        if self.jitter:
            delay *= 0.5 + random.random() * 0.5

        return delay

async def retry_with_backoff(
    func: Callable,
    max_attempts: int = 5,
    backoff: ExponentialBackoff = None
) -> Any:
    """带指数退避的重试装饰器"""
    if backoff is None:
        backoff = ExponentialBackoff()
    
    last_exception = None
    
    for attempt in range(max_attempts):
        try:
            return await func()
        except Exception as e:
            last_exception = e
            
            # 检查是否是限流错误
            if hasattr(e, 'response') and e.response.status_code == 429:
                # 优先使用服务器返回的重试时间
                retry_after = e.response.headers.get('Retry-After')
                if retry_after:
                    delay = int(retry_after)
                else:
                    delay = backoff.get_delay(attempt)
                
                print(f"尝试 {attempt + 1}/{max_attempts} 失败,{delay:.1f}秒后重试")
                await asyncio.sleep(delay)
            else:
                # 非限流错误,可能不需要重试
                raise e
    
    # 所有重试都失败
    raise Exception(f"重试{max_attempts}次后仍然失败") from last_exception

# Usage example.  Fixed: retry_with_backoff is a plain async helper, not
# a decorator — applying it with ``@retry_with_backoff`` produced a
# coroutine object instead of a function.  Also, ``await`` at module
# level is a SyntaxError, so the call now lives in an async entry point.
async def call_api():
    # API call logic
    response = await make_api_request()
    if response.status_code == 429:
        raise RateLimitError(response)
    return response

# Custom backoff policy
custom_backoff = ExponentialBackoff(
    base_delay=2.0,        # initial delay of 2 seconds
    max_delay=120.0,       # cap delays at 2 minutes
    exponential_base=3.0,  # triple the delay each attempt
    jitter=True            # add random jitter
)

async def main():
    result = await retry_with_backoff(
        call_api,
        max_attempts=5,
        backoff=custom_backoff
    )
    return result

🔄 退避策略对比

固定延迟:

简单但可能造成拥堵

线性退避:

逐步增加等待时间

指数退避:

快速增长,效果最佳

五、熔断器模式

服务保护机制

from enum import Enum
from datetime import datetime, timedelta
import asyncio

class CircuitState(Enum):
    CLOSED = "closed"        # normal operation, calls pass through
    OPEN = "open"            # tripped: calls are rejected outright
    HALF_OPEN = "half_open"  # probing: one call allowed to test recovery

class CircuitBreaker:
    """Circuit-breaker wrapper for flaky async calls.

    After ``failure_threshold`` consecutive failures the breaker opens
    and rejects calls; once ``recovery_timeout`` seconds have passed it
    lets a probe call through (half-open) and closes again on success.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: int = 60,
        expected_exception=Exception
    ):
        self.failure_threshold = failure_threshold  # failures before tripping
        self.recovery_timeout = recovery_timeout    # seconds before a probe
        self.expected_exception = expected_exception  # what counts as failure

        self.failure_count = 0
        self.last_failure_time = None
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        """Invoke ``func`` through the breaker, tracking failures."""
        if self.state == CircuitState.OPEN:
            if not self._should_attempt_reset():
                raise Exception("熔断器开启,服务暂时不可用")
            self.state = CircuitState.HALF_OPEN

        try:
            result = await func(*args, **kwargs)
        except self.expected_exception as e:
            self._on_failure()
            raise e
        else:
            self._on_success()
            return result

    def _should_attempt_reset(self) -> bool:
        """Has the recovery timeout elapsed since the last failure?"""
        if not self.last_failure_time:
            return False
        deadline = self.last_failure_time + timedelta(seconds=self.recovery_timeout)
        return datetime.now() > deadline

    def _on_success(self):
        # Any success fully closes the breaker and clears the streak.
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self):
        self.failure_count += 1
        self.last_failure_time = datetime.now()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            print(f"熔断器开启,连续失败 {self.failure_count} 次")

# Usage example
circuit_breaker = CircuitBreaker(
    failure_threshold=3,  # trip after 3 consecutive failures
    recovery_timeout=30   # probe for recovery after 30 seconds
)

async def protected_api_call():
    """Call the API through the breaker, degrading gracefully on failure."""
    try:
        return await circuit_breaker.call(make_api_request)
    except Exception as e:
        print(f"API调用失败: {e}")
        # Serve a degraded fallback instead of propagating the error.
        return get_fallback_response()

六、监控与优化

限流监控系统

import time
from collections import defaultdict
from datetime import datetime, timedelta

class RateLimitMonitor:
    """Collects per-endpoint rate-limit statistics and suggests fixes."""

    def __init__(self):
        # Per-endpoint counters, created lazily on first access.
        self.stats = defaultdict(lambda: {
            "requests": 0,
            "rate_limited": 0,
            "total_wait_time": 0,
            "last_limit_time": None
        })

    def record_request(self, endpoint: str):
        """Count one outgoing request to ``endpoint``."""
        self.stats[endpoint]["requests"] += 1

    def record_rate_limit(self, endpoint: str, wait_time: float):
        """Count one 429 event and the wait it caused."""
        stats = self.stats[endpoint]
        stats["rate_limited"] += 1
        stats["total_wait_time"] += wait_time
        stats["last_limit_time"] = datetime.now()

    def get_report(self) -> dict:
        """Build a human-readable summary per endpoint.

        Fixed: the original divided by ``stats["requests"]``
        unconditionally, raising ZeroDivisionError when a rate limit was
        recorded before any request — now guarded like
        ``get_recommendations`` already was.
        """
        report = {}

        for endpoint, stats in self.stats.items():
            requests = stats["requests"]
            limited = stats["rate_limited"]
            success_rate = 1 - (limited / requests) if requests > 0 else 0.0
            avg_wait = stats["total_wait_time"] / limited if limited > 0 else 0

            report[endpoint] = {
                "总请求数": requests,
                "限流次数": limited,
                "成功率": f"{success_rate:.2%}",
                "平均等待时间": f"{avg_wait:.2f}秒",
                "最后限流时间": stats["last_limit_time"].strftime("%Y-%m-%d %H:%M:%S") if stats["last_limit_time"] else "无"
            }

        return report

    def get_recommendations(self) -> list:
        """Return actionable suggestions for endpoints that limit often."""
        recommendations = []

        for endpoint, stats in self.stats.items():
            limit_rate = stats["rate_limited"] / stats["requests"] if stats["requests"] > 0 else 0

            if limit_rate > 0.1:  # more than 10% of requests rate-limited
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"限流率过高 ({limit_rate:.1%})",
                    "suggestion": "考虑降低请求频率或使用多个API key"
                })

            if stats["total_wait_time"] > 300:  # over 5 minutes of waiting
                recommendations.append({
                    "endpoint": endpoint,
                    "issue": f"等待时间过长 ({stats['total_wait_time']:.0f}秒)",
                    "suggestion": "实施请求队列和批处理"
                })

        return recommendations

# Integration with request handling
monitor = RateLimitMonitor()

async def monitored_request(endpoint: str, **kwargs):
    """Send a request while feeding the monitor; retry after rate limits."""
    monitor.record_request(endpoint)

    try:
        return await make_request(endpoint, **kwargs)
    except RateLimitError as e:
        # Record the event, honor the advised wait, then try once more.
        wait_time = e.retry_after
        monitor.record_rate_limit(endpoint, wait_time)
        await asyncio.sleep(wait_time)
        return await monitored_request(endpoint, **kwargs)

# Periodically print the report
async def print_monitor_report():
    """Every five minutes, dump the monitor's report and suggestions."""
    while True:
        await asyncio.sleep(300)  # 5-minute interval
        print("\n=== 限流监控报告 ===")
        for endpoint, stats in monitor.get_report().items():
            print(f"\n{endpoint}:")
            for key, value in stats.items():
                print(f"  {key}: {value}")

        recs = monitor.get_recommendations()
        if recs:
            print("\n优化建议:")
            for rec in recs:
                print(f"  - {rec['endpoint']}: {rec['suggestion']}")

七、最佳实践总结

🛡️ 预防策略

  • ✅ 实施请求速率控制
  • ✅ 使用令牌桶算法
  • ✅ 批量处理请求
  • ✅ 预估Token使用量
  • ✅ 多API Key负载均衡

🚨 应对措施

  • ✅ 优雅处理429错误
  • ✅ 指数退避重试
  • ✅ 熔断器保护
  • ✅ 降级备用方案
  • ✅ 实时监控告警

✨ 高级技巧

  • 使用队列平滑请求峰值
  • 预热期逐步增加请求量
  • 根据业务优先级分配配额
  • 定期分析限流模式优化策略