多智能体系统:让AI协同工作
单个AI模型的能力是有限的，但当多个智能体协同工作时，可以解决更复杂的问题。本指南将教您如何构建高效的多智能体系统。
多智能体架构设计
系统架构概览
/**
 * Multi-agent system framework.
 *
 * Owns the registry of agents plus the coordinator, message queue and shared
 * memory that every registered agent is wired to.
 */
class MultiAgentSystem {
  private agents: Map<string, Agent> = new Map();
  private coordinator: Coordinator = new Coordinator();
  private messageQueue: MessageQueue = new MessageQueue();
  private sharedMemory: SharedMemory = new SharedMemory();

  constructor() {}

  /**
   * Registers an agent under its `id`, hands it this system's message queue
   * and shared memory, and registers it with the coordinator.
   *
   * An agent registered under an id that is already present replaces the
   * earlier map entry (plain `Map.set`).
   */
  registerAgent(agent: Agent) {
    const { messageQueue, sharedMemory, coordinator } = this;

    this.agents.set(agent.id, agent);

    // Communication channels.
    agent.setMessageQueue(messageQueue);
    agent.setSharedMemory(sharedMemory);

    coordinator.register(agent);
  }

  /**
   * Runs a task end to end: decompose, assign, execute, aggregate.
   *
   * All assigned subtasks are started together and awaited with
   * `Promise.all`, so the returned promise rejects as soon as any single
   * subtask execution rejects.
   */
  async executeTask(task: Task): Promise<TaskResult> {
    const coordinator = this.coordinator;

    // 1. Break the task into subtasks.
    const pieces = await coordinator.decomposeTask(task);

    // 2. Pair each subtask with an agent.
    const plan = await coordinator.assignTasks(pieces, this.agents);

    // 3. Execute every assignment concurrently.
    const pending = plan.map(async (entry) => entry.agent.execute(entry.subtask));
    const outcomes = await Promise.all(pending);

    // 4. Combine the individual results.
    return coordinator.aggregateResults(outcomes);
  }
}
/**
 * Base class for all agents.
 *
 * An agent has a fixed `id` and a list of capability names. Its message queue
 * and shared memory are attached after construction through
 * `setMessageQueue` / `setSharedMemory`; messaging and memory methods reject
 * with an `Error` until the corresponding channel has been attached.
 */
abstract class Agent {
  readonly id: string;
  readonly capabilities: string[];
  // Both channels are injected after construction, so they start out unset.
  private messageQueue?: MessageQueue;
  private memory?: SharedMemory;

  /**
   * @param id Identifier used as the sender id and as the receive key.
   * @param capabilities Capability names this agent advertises.
   */
  constructor(id: string, capabilities: string[]) {
    this.id = id;
    this.capabilities = capabilities;
  }

  /** Attaches the message queue used by `sendMessage` / `receiveMessages`. */
  setMessageQueue(queue: MessageQueue): void {
    this.messageQueue = queue;
  }

  /** Attaches the shared memory used by `readMemory` / `writeMemory`. */
  setSharedMemory(memory: SharedMemory): void {
    this.memory = memory;
  }

  /**
   * Executes a subtask. Implementations are free to declare the method
   * `async`; the abstract signature itself must not carry that modifier.
   */
  abstract execute(task: Subtask): Promise<any>;

  /**
   * Sends `message` to another agent, wrapped in an envelope carrying this
   * agent's id, the recipient id and the current `Date.now()` timestamp.
   *
   * @throws Error (as a rejection) when no message queue is attached.
   */
  async sendMessage(recipientId: string, message: Message) {
    await this.requireQueue().send({
      from: this.id,
      to: recipientId,
      content: message,
      timestamp: Date.now()
    });
  }

  /**
   * Returns whatever the queue's `receive` yields for this agent's id.
   *
   * @throws Error (as a rejection) when no message queue is attached.
   */
  async receiveMessages(): Promise<Message[]> {
    return this.requireQueue().receive(this.id);
  }

  /**
   * Reads a value from shared memory.
   *
   * @throws Error (as a rejection) when no shared memory is attached.
   */
  async readMemory(key: string): Promise<any> {
    return this.requireMemory().get(key);
  }

  /**
   * Writes a value to shared memory.
   *
   * @throws Error (as a rejection) when no shared memory is attached.
   */
  async writeMemory(key: string, value: any) {
    await this.requireMemory().set(key, value);
  }

  /** Returns the attached queue or fails with a descriptive error. */
  private requireQueue(): MessageQueue {
    if (!this.messageQueue) {
      throw new Error(`Agent ${this.id} has no message queue attached`);
    }
    return this.messageQueue;
  }

  /** Returns the attached shared memory or fails with a descriptive error. */
  private requireMemory(): SharedMemory {
    if (!this.memory) {
      throw new Error(`Agent ${this.id} has no shared memory attached`);
    }
    return this.memory;
  }
}
/**
 * A specialised agent: registers as 'research-agent' with the 'research' and
 * 'analysis' capabilities and produces a report for each subtask.
 */
class ResearchAgent extends Agent {
  private llm: LLMClient = new LLMClient({ model: 'gpt-4' });

  constructor() {
    super('research-agent', ['research', 'analysis']);
  }

  /**
   * Understands the task, gathers and analyses information, generates a
   * report, stores it in shared memory under `research_<task id>` and
   * returns it.
   */
  async execute(task: Subtask): Promise<ResearchResult> {
    // 1. Understand the task; only its keywords are used below.
    const { keywords } = await this.understandTask(task);

    // 2. Gather information for those keywords.
    const findings = await this.gatherInformation(keywords);

    // 3. Analyse what was gathered.
    const insights = await this.analyzeInformation(findings);

    // 4. Turn the analysis into a report.
    const report = await this.generateReport(insights);

    // 5. Publish the report to shared memory before returning it.
    const memoryKey = `research_${task.id}`;
    await this.writeMemory(memoryKey, report);

    return report;
  }

  /** Searches the web for the keywords and has the LLM extract key information. */
  private async gatherInformation(keywords: string[]) {
    const hits = await this.searchWeb(keywords);

    return this.llm.extract({
      prompt: "Extract key information from search results",
      data: hits
    });
  }
}
任务分解与协调
智能任务调度器
/**
 * Coordinator: decomposes tasks into subtasks, assigns subtasks to agents and
 * aggregates the agents' results.
 */
class Coordinator {
  private planner: TaskPlanner;
  private scheduler: TaskScheduler;
  private monitor: TaskMonitor;

  constructor() {
    this.planner = new TaskPlanner();
    this.scheduler = new TaskScheduler();
    this.monitor = new TaskMonitor();
  }

  /**
   * Decomposes a task into subtasks.
   *
   * The planner receives the task's description, constraints and resources;
   * its subtasks are turned into a dependency graph, topologically sorted and
   * mapped to `Subtask` records in that order.
   *
   * NOTE(review): every subtask gets a fresh id from `generateId()` while
   * `dependencies` is copied from the planner node unchanged — confirm the
   * dependencies do not need to be translated to the new ids.
   */
  async decomposeTask(task: Task): Promise<Subtask[]> {
    // Ask the planner for a decomposition.
    const decomposition = await this.planner.decompose({
      task: task.description,
      constraints: task.constraints,
      resources: task.resources
    });

    // Build the dependency graph and derive an execution order from it.
    const dependencyGraph = this.createDependencyGraph(decomposition.subtasks);
    const optimizedOrder = this.topologicalSort(dependencyGraph);

    return optimizedOrder.map(node => ({
      id: generateId(),
      name: node.name,
      description: node.description,
      dependencies: node.dependencies,
      requiredCapabilities: node.capabilities,
      estimatedDuration: node.duration,
      priority: node.priority
    }));
  }

  /**
   * Assigns each subtask to an agent, highest `priority` first.
   *
   * The caller's `subtasks` array is left untouched: the priority ordering is
   * applied to a copy. After each assignment the chosen agent's workload is
   * increased by the subtask's `estimatedDuration`, and that workload is
   * passed to the fitness calculation for later subtasks.
   *
   * @returns One assignment per subtask, in descending priority order.
   * @throws Error (as a rejection) when no agent has all capabilities a
   *   subtask requires.
   */
  async assignTasks(
    subtasks: Subtask[],
    agents: Map<string, Agent>
  ): Promise<Assignment[]> {
    const assignments: Assignment[] = [];
    const agentWorkload = new Map<string, number>();

    // Every agent starts with an empty workload.
    for (const id of agents.keys()) {
      agentWorkload.set(id, 0);
    }

    // Array.prototype.sort works in place, so sort a copy rather than
    // reordering the array owned by the caller.
    const sortedTasks = [...subtasks].sort((a, b) => b.priority - a.priority);

    for (const subtask of sortedTasks) {
      const bestAgent = this.findBestAgent(subtask, agents, agentWorkload);

      if (!bestAgent) {
        throw new Error(`No agent available for task ${subtask.id}`);
      }

      assignments.push({
        agent: bestAgent,
        subtask: subtask
      });

      // Account for the new work so later subtasks see the updated load.
      const currentLoad = agentWorkload.get(bestAgent.id) || 0;
      agentWorkload.set(
        bestAgent.id,
        currentLoad + subtask.estimatedDuration
      );
    }

    return assignments;
  }

  /**
   * Picks the capable agent with the highest fitness score.
   *
   * Agents lacking any required capability are skipped. On equal scores the
   * agent that comes first in the map's iteration order wins, because only a
   * strictly greater score replaces the current best.
   *
   * @returns The best agent, or `null` when no agent is capable.
   */
  private findBestAgent(
    subtask: Subtask,
    agents: Map<string, Agent>,
    workload: Map<string, number>
  ): Agent | null {
    let bestAgent: Agent | null = null;
    let bestScore = -Infinity;

    // A plain loop keeps the assignment to `bestAgent` visible to the
    // compiler's control-flow analysis (a forEach callback would hide it).
    for (const agent of agents.values()) {
      const hasCapabilities = subtask.requiredCapabilities.every(
        cap => agent.capabilities.includes(cap)
      );
      if (!hasCapabilities) continue;

      const score = this.calculateFitness(
        agent,
        subtask,
        workload.get(agent.id) || 0
      );

      if (score > bestScore) {
        bestScore = score;
        bestAgent = agent;
      }
    }

    return bestAgent;
  }

  /**
   * Aggregates subtask results into a single task result.
   *
   * Results failing `validateResult` are dropped before merging. `success`
   * is true only when the assessed quality score is strictly above 0.8.
   */
  async aggregateResults(results: any[]): Promise<TaskResult> {
    // Keep only the results that pass validation.
    const validResults = results.filter(r => this.validateResult(r));

    const merged = this.mergeResults(validResults);

    const quality = await this.assessQuality(merged);

    return {
      success: quality.score > 0.8,
      data: merged,
      quality: quality,
      metadata: {
        totalSubtasks: results.length,
        successfulSubtasks: validResults.length,
        executionTime: this.monitor.getTotalTime(),
        agentPerformance: this.monitor.getAgentStats()
      }
    };
  }
}
智能体通信机制
消息传递系统
消息队列实现
/**
 * Message queue with per-recipient queues and per-recipient subscriber
 * lists.
 */
class MessageQueue {
  private queues: Map<string, Message[]> = new Map();
  private subscribers: Map<string, Subscriber[]> = new Map();

  constructor() {}

  /**
   * Appends the message to the recipient's queue, then notifies that
   * recipient's subscribers, then logs the message.
   */
  async send(message: Message) {
    const recipient = message.to;

    this.getOrCreateQueue(recipient).push(message);

    await this.notifySubscribers(recipient, message);

    this.logMessage(message);
  }

  /**
   * Sends `content` as a 'broadcast' message to every known agent except the
   * sender, one recipient after another.
   *
   * @param filter Optional predicate; agents for which it returns false are
   *   skipped.
   */
  async broadcast(
    from: string,
    content: any,
    filter?: (agent: Agent) => boolean
  ) {
    for (const agentId of this.getAllAgents()) {
      // Never echo a broadcast back to its sender.
      if (agentId === from) continue;

      const accepted = !filter || filter(this.getAgent(agentId));
      if (!accepted) continue;

      await this.send({
        from,
        to: agentId,
        content,
        type: 'broadcast',
        timestamp: Date.now()
      });
    }
  }

  /** Adds a callback to the subscriber list kept for `agentId`. */
  subscribe(
    agentId: string,
    callback: (message: Message) => void
  ) {
    const existing = this.subscribers.get(agentId);

    if (existing) {
      existing.push({ callback });
      this.subscribers.set(agentId, existing);
    } else {
      this.subscribers.set(agentId, [{ callback }]);
    }
  }
}
协议定义
/**
 * Communication protocol: the operations agents use to cooperate.
 * Only the signatures are declared here; no implementation is visible in
 * this file.
 */
interface Protocol {
  /** Request assistance with a task; resolves with the help response. */
  requestHelp(task: Task): Promise<HelpResponse>;
  /** Share a piece of information. */
  shareInformation(info: Information): void;
  /** Negotiate a decision for a proposal; resolves with the decision. */
  negotiate(proposal: Proposal): Promise<Decision>;
  /** Synchronise agent state. */
  syncState(state: AgentState): void;
}
/** Message types. String-valued, so each member equals its lowercase name at runtime. */
enum MessageType {
  REQUEST = 'request',
  RESPONSE = 'response',
  BROADCAST = 'broadcast',
  NOTIFICATION = 'notification',
  NEGOTIATION = 'negotiation'
}
/** Message format exchanged between agents. */
interface Message {
  /** Message identifier. */
  id: string;
  /** Id of the sending agent. */
  from: string;
  /** Id of the receiving agent. */
  to: string;
  type: MessageType;
  /** Payload; its shape is not constrained here. */
  content: any;
  /** Senders visible in this file fill this with `Date.now()`. */
  timestamp: number;
  /** NOTE(review): ordering direction (higher vs. lower first) is not shown here — confirm. */
  priority?: number;
  requiresResponse?: boolean;
  /** NOTE(review): unit is not shown here — presumably milliseconds; confirm. */
  timeout?: number;
}
实战案例:客服系统
多智能体客服系统
/**
 * Customer-service system: a multi-agent system pre-populated with the
 * specialised customer-service agents.
 */
class CustomerServiceSystem extends MultiAgentSystem {
  constructor() {
    super();

    // Specialised agents, created and registered one by one in this order.
    const agentTypes = [
      IntentClassifierAgent,
      FAQAgent,
      TechnicalSupportAgent,
      OrderProcessingAgent,
      EscalationAgent,
      SentimentAnalysisAgent
    ];
    for (const AgentType of agentTypes) {
      this.registerAgent(new AgentType());
    }
  }

  /**
   * Handles one customer query: classify intent, analyse sentiment, route
   * the query, check the response quality and escalate to a human when the
   * quality check asks for it.
   */
  async handleCustomerQuery(query: CustomerQuery): Promise<Response> {
    // 1. Intent recognition.
    const intent = await this.classifyIntent(query);

    // 2. Sentiment analysis.
    const sentiment = await this.analyzeSentiment(query);

    // 3. Route to the appropriate agent.
    const draft = await this.routeQuery(query, intent, sentiment);

    // 4. Quality check.
    const { needsEscalation } = await this.checkResponseQuality(draft);

    // 5. Escalate to a human only when the quality check requires it.
    return needsEscalation ? this.escalateToHuman(query, draft) : draft;
  }
}
/**
 * Intent-classification agent: registers as 'intent-classifier' with the
 * 'classification' and 'nlp' capabilities.
 */
class IntentClassifierAgent extends Agent {
  // The classifier needs its own LLM client; it is created the same way the
  // research agent in this file creates its client.
  private llm: LLMClient;

  constructor() {
    super('intent-classifier', ['classification', 'nlp']);
    this.llm = new LLMClient({ model: 'gpt-4' });
  }

  /**
   * Classifies the intent of `task.data.query` into one of five fixed
   * categories, extracts entities, announces the classification via
   * `sendMessage` and returns the intent together with extracted keywords.
   */
  async execute(task: Subtask): Promise<Intent> {
    const query = task.data.query;

    // Classify the query into one of the supported categories.
    const classification = await this.llm.classify({
      text: query,
      categories: [
        'technical_support',
        'order_inquiry',
        'product_question',
        'complaint',
        'general_inquiry'
      ]
    });

    const entities = await this.extractEntities(query);

    // Announce the classification.
    // NOTE(review): this addresses a recipient whose id is the literal string
    // 'broadcast'; whether the queue fans that out to all agents is not
    // visible here — confirm.
    await this.sendMessage('broadcast', {
      type: 'intent_classified',
      intent: classification.category,
      confidence: classification.confidence,
      entities: entities
    });

    return {
      category: classification.category,
      confidence: classification.confidence,
      entities: entities,
      keywords: this.extractKeywords(query)
    };
  }
}
/**
 * FAQ agent: registers as 'faq-agent' with the 'faq' and
 * 'knowledge_retrieval' capabilities and answers queries from a knowledge
 * base.
 */
class FAQAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor() {
    super('faq-agent', ['faq', 'knowledge_retrieval']);
    this.knowledgeBase = new KnowledgeBase();
  }

  /**
   * Answers `task.data.query` from the knowledge base.
   *
   * @returns The generated answer, or `null` when this agent cannot handle
   *   the intent or when no knowledge-base document matched. In the latter
   *   case a help request is sent to the technical-support agent first.
   */
  async execute(task: Subtask): Promise<FAQResponse | null> {
    const { query, intent } = task.data;

    // Not an FAQ-type question: nothing to answer.
    if (!this.canHandle(intent)) {
      return null;
    }

    // Search the knowledge base.
    const relevantDocs = await this.knowledgeBase.search(query, {
      topK: 5,
      threshold: 0.8
    });

    if (relevantDocs.length === 0) {
      // Hand the query over. sendMessage only enqueues the request and
      // resolves without a value, so there is no answer to return here.
      await this.requestHelp({
        query,
        reason: 'No FAQ matches found'
      });
      return null;
    }

    const answer = await this.generateAnswer(query, relevantDocs);

    // Record the answer in shared memory for other agents to consult.
    await this.writeMemory('last_faq_answer', {
      query,
      answer,
      confidence: answer.confidence
    });

    return answer;
  }

  /**
   * Sends a 'help_request' message carrying `context` to the agent with id
   * 'technical-support-agent'. Resolves once the message has been sent; it
   * does not wait for a reply.
   */
  private async requestHelp(context: Record<string, unknown>): Promise<void> {
    await this.sendMessage('technical-support-agent', {
      type: 'help_request',
      context,
      sender: this.id
    });
  }
}
协作模式
常见协作模式
🔄 流水线模式
/** Pipeline pattern: three agents process the input strictly one after another. */
class Pipeline {
  /** Feeds `input` through the preprocess, main and postprocess agents in turn. */
  async execute(input: any) {
    // Agent 1: data preprocessing.
    const prepared = await this.preprocessAgent.process(input);

    // Agent 2: main processing.
    const processed = await this.mainAgent.process(prepared);

    // Agent 3: post-processing; its output is the pipeline's result.
    return this.postprocessAgent.process(processed);
  }
}
🌟 星型模式
/**
 * Star pattern: a central agent distributes a task, workers execute the
 * pieces concurrently, and the central agent aggregates the results.
 */
class StarTopology {
  /**
   * Distributes `task`, runs each subtask on the worker looked up by the
   * subtask's `type`, and returns the central agent's aggregation of the
   * results (in subtask order).
   *
   * @throws Error (as a rejection) when a subtask's type has no worker. All
   *   lookups happen before any worker is started, so a missing worker
   *   aborts the run without partial execution.
   */
  async execute(task: Task) {
    // The central agent splits the task.
    const subtasks = this.centralAgent.distribute(task);

    // Resolve every worker up front so a missing one fails fast with a
    // clear message instead of a TypeError on `undefined`.
    const dispatches = subtasks.map(st => {
      const worker = this.workerAgents.get(st.type);
      if (!worker) {
        throw new Error(`No worker agent registered for subtask type ${st.type}`);
      }
      return { worker, st };
    });

    // Execute concurrently.
    const results = await Promise.all(
      dispatches.map(({ worker, st }) => worker.execute(st))
    );

    // The central agent aggregates.
    return this.centralAgent.aggregate(results);
  }
}
🕸️ 网状模式
/** Mesh pattern: peer-to-peer collaboration between all agents. */
class MeshTopology {
  /**
   * Starts collaboration on `task` for every agent (without awaiting the
   * individual calls) and returns the result of waiting for consensus.
   */
  async execute(task: Task) {
    // Every agent takes part; each one is kicked off in turn.
    for (const peer of this.getAllAgents()) {
      peer.startCollaboration(task);
    }

    // The outcome is whatever consensus the agents reach.
    const consensus = this.waitForConsensus();
    return consensus;
  }
}
🏛️ 层级模式
/** Hierarchical pattern: manager plans, supervisors assign, workers execute. */
class HierarchicalTopology {
  /** Runs `task` through the three layers and returns the workers' result. */
  async execute(task: Task) {
    // Management layer: decide on a strategy.
    const plan = await this.managerAgent.plan(task);

    // Supervisor layer: turn the strategy into assignments.
    const workOrders = await this.supervisorAgents.assign(plan);

    // Worker layer: carry out the assignments.
    return this.workerAgents.execute(workOrders);
  }
}
性能优化
系统优化策略
⚡ 并发控制
/**
 * Limits how many tasks run at the same time by guarding each execution with
 * a semaphore created for `maxConcurrency` permits.
 */
class ConcurrencyController {
  private semaphore: Semaphore;
  private taskQueue: PriorityQueue<Task>;

  /** @param maxConcurrency Value the semaphore is constructed with. */
  constructor(maxConcurrency: number) {
    this.semaphore = new Semaphore(maxConcurrency);
    this.taskQueue = new PriorityQueue();
  }

  /**
   * Executes all tasks, each one only after acquiring the semaphore.
   *
   * @returns The results in the same order as `tasks`, regardless of the
   *   order in which the executions finish.
   * @throws Rejects with the first execution error; the semaphore is still
   *   released for every task that acquired it.
   */
  async executeWithLimit(tasks: Task[]): Promise<Result[]> {
    // Promise.all keeps each result at its task's index, so completion
    // order cannot shuffle the output.
    return Promise.all(
      tasks.map(async (task) => {
        await this.semaphore.acquire();
        try {
          // `await` here so the permit is released only after the task ends.
          return await this.executeTask(task);
        } finally {
          this.semaphore.release();
        }
      })
    );
  }
}
💾 资源管理
内存优化
- 共享内存池
- 定期清理
- 智能缓存
计算优化
- 负载均衡
- 任务批处理
- 结果复用
调试与监控
多智能体调试工具
/**
 * Debugging and monitoring facade built on a distributed tracer, an agent
 * visualizer and a performance profiler.
 */
class MultiAgentDebugger {
  private tracer: DistributedTracer;
  private visualizer: AgentVisualizer;
  private profiler: PerformanceProfiler;

  /** Returns the tracer's message flow for the given session. */
  traceMessageFlow(sessionId: string) {
    const flow = this.tracer.getMessageFlow(sessionId);
    return flow;
  }

  /** Collects the current agent states and returns the visualizer's rendering of them. */
  visualizeAgentStates() {
    return this.visualizer.render(this.collectAgentStates());
  }

  /**
   * Builds a performance profile for one task: timeline, bottlenecks and
   * resource usage from the profiler, plus generated optimization tips.
   */
  profileExecution(taskId: string) {
    const profiler = this.profiler;

    const timeline = profiler.getTimeline(taskId);
    const bottlenecks = profiler.findBottlenecks(taskId);
    const resourceUsage = profiler.getResourceUsage(taskId);
    const recommendations = this.generateOptimizationTips();

    return { timeline, bottlenecks, resourceUsage, recommendations };
  }

  /** Returns a snapshot of the live monitoring metrics. */
  getDashboard() {
    const activeAgents = this.getActiveAgentCount();
    const messageRate = this.getMessageRate();
    const taskQueue = this.getQueueDepth();
    const errorRate = this.getErrorRate();
    const avgResponseTime = this.getAverageResponseTime();

    return { activeAgents, messageRate, taskQueue, errorRate, avgResponseTime };
  }
}