多智能体系统:让AI协同工作

单个AI模型的能力是有限的,但当多个智能体协同工作时,可以解决更复杂的问题。本指南将教您如何构建高效的多智能体系统。

多智能体架构设计

系统架构概览

/**
 * Multi-agent system shell.
 *
 * Holds the registered agents together with one coordinator, one message
 * queue and one shared memory, and drives a task through the coordinator's
 * decompose -> assign -> execute -> aggregate steps.
 */
class MultiAgentSystem {
  private agents = new Map<string, Agent>();
  private coordinator = new Coordinator();
  private messageQueue = new MessageQueue();
  private sharedMemory = new SharedMemory();

  /**
   * Registers an agent under its id, hands it the system's message queue and
   * shared memory, and then announces it to the coordinator.
   */
  registerAgent(agent: Agent) {
    const { agents, messageQueue, sharedMemory, coordinator } = this;

    agents.set(agent.id, agent);

    // Communication channels are wired before the coordinator learns about the agent.
    agent.setMessageQueue(messageQueue);
    agent.setSharedMemory(sharedMemory);

    coordinator.register(agent);
  }

  /**
   * Runs a task end to end.
   *
   * @param task the task handed to the coordinator for decomposition
   * @returns whatever the coordinator's `aggregateResults` produces
   */
  async executeTask(task: Task): Promise<TaskResult> {
    // Steps 1 and 2: split the task and pair every subtask with an agent.
    const pieces = await this.coordinator.decomposeTask(task);
    const plan = await this.coordinator.assignTasks(pieces, this.agents);

    // Step 3: start every assignment; the async wrapper turns a synchronous
    // throw from `execute` into a rejected promise.
    const inFlight = plan.map(async (assignment) =>
      assignment.agent.execute(assignment.subtask)
    );
    const outcomes = await Promise.all(inFlight);

    // Step 4: let the coordinator merge the per-subtask outcomes.
    return this.coordinator.aggregateResults(outcomes);
  }
}

/**
 * Base class for agents.
 *
 * An agent has a fixed id and capability list. Its message queue and shared
 * memory are injected after construction through `setMessageQueue` and
 * `setSharedMemory` (both are called by `MultiAgentSystem.registerAgent`).
 */
abstract class Agent {
  readonly id: string;
  readonly capabilities: string[];
  // Both channels are injected after construction, so they start out unset.
  private messageQueue?: MessageQueue;
  private memory?: SharedMemory;

  constructor(id: string, capabilities: string[]) {
    this.id = id;
    this.capabilities = capabilities;
  }

  /** Attaches the queue used by `sendMessage` and `receiveMessages`. */
  setMessageQueue(messageQueue: MessageQueue): void {
    this.messageQueue = messageQueue;
  }

  /** Attaches the store used by `readMemory` and `writeMemory`. */
  setSharedMemory(sharedMemory: SharedMemory): void {
    this.memory = sharedMemory;
  }

  /**
   * Performs one subtask. Implementations are normally `async`; the modifier
   * cannot appear on the abstract declaration itself.
   */
  abstract execute(task: Subtask): Promise<any>;

  /**
   * Sends a message to another agent. The payload is wrapped in an envelope
   * carrying this agent's id, the recipient id and the current time.
   *
   * @throws Error when no message queue has been attached yet
   */
  async sendMessage(recipientId: string, message: Message) {
    await this.requireMessageQueue().send({
      from: this.id,
      to: recipientId,
      content: message,
      timestamp: Date.now()
    });
  }

  /**
   * Fetches the messages addressed to this agent.
   *
   * NOTE(review): relies on `MessageQueue.receive`, which is not among the
   * methods shown for `MessageQueue` in this file; confirm it exists.
   *
   * @throws Error when no message queue has been attached yet
   */
  async receiveMessages(): Promise<Message[]> {
    return this.requireMessageQueue().receive(this.id);
  }

  /**
   * Reads a value from shared memory.
   *
   * @throws Error when no shared memory has been attached yet
   */
  async readMemory(key: string): Promise<any> {
    return this.requireMemory().get(key);
  }

  /**
   * Writes a value to shared memory.
   *
   * @throws Error when no shared memory has been attached yet
   */
  async writeMemory(key: string, value: any) {
    await this.requireMemory().set(key, value);
  }

  // Returns the attached queue, or fails with a message naming the agent.
  private requireMessageQueue(): MessageQueue {
    if (!this.messageQueue) {
      throw new Error(`Agent ${this.id} has no message queue attached`);
    }
    return this.messageQueue;
  }

  // Returns the attached shared memory, or fails with a message naming the agent.
  private requireMemory(): SharedMemory {
    if (!this.memory) {
      throw new Error(`Agent ${this.id} has no shared memory attached`);
    }
    return this.memory;
  }
}

/**
 * Specialised agent registered as 'research-agent' with the 'research' and
 * 'analysis' capabilities. It turns a subtask into a report and stores that
 * report in shared memory.
 *
 * NOTE(review): `understandTask`, `analyzeInformation`, `generateReport` and
 * `searchWeb` are called below but are not defined in the visible source.
 */
class ResearchAgent extends Agent {
  private llm: LLMClient;

  constructor() {
    super('research-agent', ['research', 'analysis']);
    this.llm = new LLMClient({ model: 'gpt-4' });
  }

  /**
   * Understand the task, gather information for its keywords, analyse it,
   * generate a report, store the report under `research_<task.id>` and
   * return it.
   */
  async execute(task: Subtask): Promise<ResearchResult> {
    const brief = await this.understandTask(task);
    const findings = await this.gatherInformation(brief.keywords);
    const insights = await this.analyzeInformation(findings);
    const report = await this.generateReport(insights);

    // The report is also left in shared memory under a per-task key.
    const memoryKey = `research_${task.id}`;
    await this.writeMemory(memoryKey, report);

    return report;
  }

  /** Searches the web for the keywords and has the LLM extract from the hits. */
  private async gatherInformation(keywords: string[]) {
    const hits = await this.searchWeb(keywords);

    return this.llm.extract({
      prompt: "Extract key information from search results",
      data: hits
    });
  }
}

任务分解与协调

智能任务调度器

/**
 * Coordinator: decomposes tasks into subtasks, assigns subtasks to agents and
 * aggregates the results.
 *
 * NOTE(review): `createDependencyGraph`, `topologicalSort`,
 * `calculateFitness`, `validateResult`, `mergeResults` and `assessQuality`
 * are called below but not defined in the visible source. The same holds for
 * `register`, which `MultiAgentSystem.registerAgent` calls on this class.
 */
class Coordinator {
  private planner: TaskPlanner;
  private scheduler: TaskScheduler;
  private monitor: TaskMonitor;

  constructor() {
    this.planner = new TaskPlanner();
    this.scheduler = new TaskScheduler();
    this.monitor = new TaskMonitor();
  }

  /**
   * Splits a task into subtasks.
   *
   * The planner receives the task's description, constraints and resources;
   * its subtasks are turned into a dependency graph, ordered with
   * `topologicalSort`, and mapped to `Subtask` records with fresh ids.
   */
  async decomposeTask(task: Task): Promise<Subtask[]> {
    const decomposition = await this.planner.decompose({
      task: task.description,
      constraints: task.constraints,
      resources: task.resources
    });

    const dependencyGraph = this.createDependencyGraph(
      decomposition.subtasks
    );

    const optimizedOrder = this.topologicalSort(dependencyGraph);

    return optimizedOrder.map(node => ({
      id: generateId(),
      name: node.name,
      description: node.description,
      dependencies: node.dependencies,
      requiredCapabilities: node.capabilities,
      estimatedDuration: node.duration,
      priority: node.priority
    }));
  }

  /**
   * Pairs every subtask with an agent, visiting subtasks from highest to
   * lowest `priority` and tracking each agent's accumulated
   * `estimatedDuration` as its workload.
   *
   * The input array is not modified; sorting happens on a copy.
   *
   * @param subtasks subtasks to assign
   * @param agents candidate agents keyed by id
   * @returns one assignment per subtask, in descending priority order
   * @throws Error when no agent offers all capabilities a subtask requires
   */
  async assignTasks(
    subtasks: Subtask[],
    agents: Map<string, Agent>
  ): Promise<Assignment[]> {
    const assignments: Assignment[] = [];
    const agentWorkload = new Map<string, number>();

    // Every agent starts with no assigned work.
    agents.forEach((_, id) => agentWorkload.set(id, 0));

    // Array.prototype.sort works in place, so sort a copy and leave the
    // caller's array in its original order.
    const sortedTasks = [...subtasks].sort((a, b) =>
      b.priority - a.priority
    );

    for (const subtask of sortedTasks) {
      const bestAgent = this.findBestAgent(
        subtask,
        agents,
        agentWorkload
      );

      if (!bestAgent) {
        throw new Error(`No agent available for task ${subtask.id}`);
      }

      assignments.push({
        agent: bestAgent,
        subtask: subtask
      });

      // Charge the subtask's estimated duration to the chosen agent.
      const currentLoad = agentWorkload.get(bestAgent.id) ?? 0;
      agentWorkload.set(
        bestAgent.id,
        currentLoad + subtask.estimatedDuration
      );
    }

    return assignments;
  }

  /**
   * Picks the agent with the highest fitness score among those whose
   * capabilities include every capability the subtask requires.
   *
   * @returns the best-scoring agent (the first one seen wins ties), or `null`
   *          when no agent qualifies
   */
  private findBestAgent(
    subtask: Subtask,
    agents: Map<string, Agent>,
    workload: Map<string, number>
  ): Agent | null {
    let bestAgent: Agent | null = null;
    let bestScore = -Infinity;

    agents.forEach(agent => {
      const hasCapabilities = subtask.requiredCapabilities.every(
        cap => agent.capabilities.includes(cap)
      );

      // Agents missing any required capability are never candidates.
      if (!hasCapabilities) return;

      const score = this.calculateFitness(
        agent,
        subtask,
        workload.get(agent.id) ?? 0
      );

      // Strict comparison keeps the earliest agent on equal scores.
      if (score > bestScore) {
        bestScore = score;
        bestAgent = agent;
      }
    });

    return bestAgent;
  }

  /**
   * Combines subtask results into a task result.
   *
   * Results failing `validateResult` are dropped before merging. `success`
   * is true when the assessed quality score is above 0.8. The metadata
   * reports how many results came in, how many were valid, and the monitor's
   * timing and agent statistics.
   */
  async aggregateResults(results: any[]): Promise<TaskResult> {
    const validResults = results.filter(r => this.validateResult(r));

    const merged = this.mergeResults(validResults);

    const quality = await this.assessQuality(merged);

    return {
      success: quality.score > 0.8,
      data: merged,
      quality: quality,
      metadata: {
        totalSubtasks: results.length,
        successfulSubtasks: validResults.length,
        executionTime: this.monitor.getTotalTime(),
        agentPerformance: this.monitor.getAgentStats()
      }
    };
  }
}

智能体通信机制

消息传递系统

消息队列实现

/**
 * Message queue shared by the agents: point-to-point send, broadcast and
 * per-agent subscriptions.
 *
 * NOTE(review): `getOrCreateQueue`, `notifySubscribers`, `logMessage`,
 * `getAllAgents` and `getAgent` are used below but not defined in the
 * visible source.
 */
class MessageQueue {
  private queues: Map<string, Message[]>;
  private subscribers: Map<string, Subscriber[]>;

  constructor() {
    this.queues = new Map();
    this.subscribers = new Map();
  }

  /**
   * Appends the message to the queue obtained for `message.to`, notifies
   * that recipient's subscribers, then logs the message.
   */
  async send(message: Message) {
    const recipient = message.to;

    this.getOrCreateQueue(recipient).push(message);
    await this.notifySubscribers(recipient, message);
    this.logMessage(message);
  }

  /**
   * Sends `content` to every agent except the sender, one at a time.
   *
   * @param from id of the sending agent; it never receives its own broadcast
   * @param content payload placed in each message
   * @param filter optional predicate; agents it rejects are skipped
   */
  async broadcast(
    from: string, 
    content: any,
    filter?: (agent: Agent) => boolean
  ) {
    for (const recipientId of this.getAllAgents()) {
      // The agent is only looked up when a filter was supplied and the
      // recipient is not the sender.
      const accepted =
        recipientId !== from &&
        (!filter || filter(this.getAgent(recipientId)));

      if (accepted) {
        await this.send({
          from,
          to: recipientId,
          content,
          type: 'broadcast',
          timestamp: Date.now()
        });
      }
    }
  }

  /** Adds a callback to the subscriber list kept for `agentId`. */
  subscribe(
    agentId: string,
    callback: (message: Message) => void
  ) {
    const listeners = this.subscribers.get(agentId) ?? [];
    listeners.push({ callback });
    this.subscribers.set(agentId, listeners);
  }
}

协议定义

// Communication protocol: the collaboration operations an agent can expose.
// No implementation of this interface appears in the visible source.
interface Protocol {
  // Ask for assistance with a task; resolves with the helper's response.
  requestHelp(task: Task): Promise<HelpResponse>;
  
  // Share a piece of information; nothing is returned.
  shareInformation(info: Information): void;
  
  // Negotiate over a proposal; resolves with the resulting decision.
  negotiate(proposal: Proposal): Promise<Decision>;
  
  // Synchronise agent state; nothing is returned.
  syncState(state: AgentState): void;
}

// Message kinds carried in `Message.type`. Each member's value is its
// lower-case name as a string.
enum MessageType {
  REQUEST = 'request',
  RESPONSE = 'response',
  BROADCAST = 'broadcast',
  NOTIFICATION = 'notification',
  NEGOTIATION = 'negotiation'
}

// Message format exchanged through the message queue.
// NOTE(review): the senders visible in this file (Agent.sendMessage and
// MessageQueue.broadcast) build messages without `id`, and Agent.sendMessage
// also omits `type`, although both are declared as required here.
interface Message {
  id: string;
  from: string; // id of the sending agent
  to: string; // id of the receiving agent
  type: MessageType;
  content: any; // payload; not constrained by the type system
  timestamp: number; // the senders in this file fill it with Date.now()
  priority?: number;
  requiresResponse?: boolean;
  timeout?: number; // unit is not stated in the visible source
}

实战案例:客服系统

多智能体客服系统

/**
 * Customer-service system: a multi-agent system pre-loaded with six
 * specialised agents.
 *
 * NOTE(review): `classifyIntent`, `analyzeSentiment`, `routeQuery`,
 * `checkResponseQuality` and `escalateToHuman` are called below but not
 * defined in the visible source.
 */
class CustomerServiceSystem extends MultiAgentSystem {
  constructor() {
    super();

    // Factories keep the original interleaving: each agent is constructed
    // and registered before the next one is created.
    const agentFactories = [
      () => new IntentClassifierAgent(),
      () => new FAQAgent(),
      () => new TechnicalSupportAgent(),
      () => new OrderProcessingAgent(),
      () => new EscalationAgent(),
      () => new SentimentAnalysisAgent()
    ];

    for (const createAgent of agentFactories) {
      this.registerAgent(createAgent());
    }
  }

  /**
   * Handles one customer query: classify the intent, analyse the sentiment,
   * route the query, then check the quality of the response.
   *
   * @returns the routed response, or the result of `escalateToHuman` when the
   *          quality check asks for escalation
   */
  async handleCustomerQuery(query: CustomerQuery): Promise<Response> {
    const intent = await this.classifyIntent(query);
    const sentiment = await this.analyzeSentiment(query);
    const reply = await this.routeQuery(query, intent, sentiment);

    const verdict = await this.checkResponseQuality(reply);

    return verdict.needsEscalation
      ? this.escalateToHuman(query, reply)
      : reply;
  }
}

/**
 * Intent-classification agent, registered as 'intent-classifier' with the
 * 'classification' and 'nlp' capabilities.
 *
 * NOTE(review): `this.llm` is used in `execute` but neither this class nor
 * the `Agent` base class in this file declares or initialises it, so the
 * call fails unless it is provided elsewhere. `extractEntities` and
 * `extractKeywords` are likewise not defined in the visible source.
 */
class IntentClassifierAgent extends Agent {
  constructor() {
    super('intent-classifier', ['classification', 'nlp']);
  }

  /**
   * Classifies the query found in `task.data.query` into one of five fixed
   * categories, extracts entities, announces the outcome by sending a
   * message addressed to 'broadcast', and returns the classification
   * together with the query's keywords.
   */
  async execute(task: Subtask): Promise<Intent> {
    const { query } = task.data;

    const supportedCategories = [
      'technical_support',
      'order_inquiry',
      'product_question',
      'complaint',
      'general_inquiry'
    ];
    const verdict = await this.llm.classify({
      text: query,
      categories: supportedCategories
    });

    const entities = await this.extractEntities(query);

    // NOTE(review): 'broadcast' is passed as a recipient id here; whether the
    // queue treats it specially is not visible in this file.
    await this.sendMessage('broadcast', {
      type: 'intent_classified',
      intent: verdict.category,
      confidence: verdict.confidence,
      entities
    });

    // Keywords are extracted last, after the announcement has been sent.
    return {
      category: verdict.category,
      confidence: verdict.confidence,
      entities,
      keywords: this.extractKeywords(query)
    };
  }
}

/**
 * FAQ agent, registered as 'faq-agent' with the 'faq' and
 * 'knowledge_retrieval' capabilities. It answers queries from a knowledge
 * base.
 *
 * NOTE(review): `canHandle` and `generateAnswer` are called below but not
 * defined in the visible source.
 */
class FAQAgent extends Agent {
  private knowledgeBase: KnowledgeBase;

  constructor() {
    super('faq-agent', ['faq', 'knowledge_retrieval']);
    this.knowledgeBase = new KnowledgeBase();
  }

  /**
   * Answers the query in `task.data` from the knowledge base.
   *
   * @returns the generated answer, or `null` when this agent has no answer:
   *          either `canHandle` rejects the intent, or the knowledge base
   *          has no match (a help request is then sent to the
   *          technical-support agent)
   */
  async execute(task: Subtask): Promise<FAQResponse | null> {
    const { query, intent } = task.data;

    // Not an FAQ-style question: leave it to other agents.
    if (!this.canHandle(intent)) {
      return null;
    }

    // Up to five documents scoring at least 0.8 are requested.
    const relevantDocs = await this.knowledgeBase.search(query, {
      topK: 5,
      threshold: 0.8
    });

    if (relevantDocs.length === 0) {
      return this.requestHelp({
        query,
        reason: 'No FAQ matches found'
      });
    }

    const answer = await this.generateAnswer(query, relevantDocs);

    // Keep the latest answer in shared memory so other agents can read it.
    await this.writeMemory('last_faq_answer', {
      query,
      answer,
      confidence: answer.confidence
    });

    return answer;
  }

  /**
   * Sends a help request to the technical-support agent.
   *
   * `sendMessage` only hands the request to the message queue and resolves
   * without a value, so no reply is available here. `null` is returned to
   * say so explicitly instead of passing that empty result on as a response.
   */
  private async requestHelp(context: any): Promise<null> {
    await this.sendMessage('technical-support-agent', {
      type: 'help_request',
      context,
      sender: this.id
    });

    return null;
  }
}

协作模式

常见协作模式

🔄 流水线模式

/**
 * Pipeline pattern: three agents process the input one after another, each
 * receiving the previous agent's output.
 *
 * NOTE(review): `preprocessAgent`, `mainAgent` and `postprocessAgent` are
 * not declared in the visible source.
 */
class Pipeline {
  /** Runs the input through pre-processing, main processing and post-processing. */
  async execute(input: any) {
    const prepared = await this.preprocessAgent.process(input);
    const processed = await this.mainAgent.process(prepared);
    const finished = await this.postprocessAgent.process(processed);

    return finished;
  }
}

🌟 星型模式

/**
 * Star pattern: a central agent splits the task, workers selected by subtask
 * type run in parallel, and the central agent aggregates their results.
 *
 * NOTE(review): `centralAgent` and `workerAgents` are not declared in the
 * visible source.
 */
class StarTopology {
  /** Distributes the task, runs all subtasks concurrently and aggregates. */
  async execute(task: Task) {
    const pieces = this.centralAgent.distribute(task);

    // Each subtask goes to the worker registered under its `type`.
    const running = pieces.map(piece =>
      this.workerAgents.get(piece.type).execute(piece)
    );
    const outcomes = await Promise.all(running);

    return this.centralAgent.aggregate(outcomes);
  }
}

🕸️ 网状模式

/**
 * Mesh pattern: every agent is asked to start collaborating on the task, and
 * the outcome is whatever `waitForConsensus` yields.
 *
 * NOTE(review): `getAllAgents` and `waitForConsensus` are not defined in the
 * visible source.
 */
class MeshTopology {
  /** Starts collaboration on every agent, then waits for consensus. */
  async execute(task: Task) {
    // `startCollaboration` is invoked without awaiting its result.
    for (const peer of this.getAllAgents()) {
      peer.startCollaboration(task);
    }

    const consensus = this.waitForConsensus();
    return consensus;
  }
}

🏛️ 层级模式

/**
 * Hierarchical pattern: the manager plans, the supervisors turn the plan
 * into assignments, and the workers execute them.
 *
 * NOTE(review): `managerAgent`, `supervisorAgents` and `workerAgents` are
 * not declared in the visible source.
 */
class HierarchicalTopology {
  /** Runs the task through the manager, supervisor and worker layers in turn. */
  async execute(task: Task) {
    const plan = await this.managerAgent.plan(task);
    const workOrders = await this.supervisorAgents.assign(plan);

    // The workers' result is returned as-is.
    const outcome = this.workerAgents.execute(workOrders);
    return outcome;
  }
}

性能优化

系统优化策略

⚡ 并发控制

/**
 * Runs tasks while a semaphore created with `maxConcurrency` gates each run.
 *
 * NOTE(review): `executeTask` is called below but not defined in the visible
 * source, and `taskQueue` is created but not used by the code shown here.
 */
class ConcurrencyController {
  private semaphore: Semaphore;
  private taskQueue: PriorityQueue<Task>;

  constructor(maxConcurrency: number) {
    this.semaphore = new Semaphore(maxConcurrency);
    this.taskQueue = new PriorityQueue();
  }

  /**
   * Executes every task, acquiring the semaphore before each run and
   * releasing it afterwards, even when the run fails.
   *
   * @param tasks tasks to execute
   * @returns the results in the same order as `tasks` (`results[i]` belongs
   *          to `tasks[i]`), regardless of which task finishes first; an
   *          empty input yields an empty array
   */
  async executeWithLimit(tasks: Task[]): Promise<Result[]> {
    // Promise.all keeps input order, so each mapped promise resolves to its
    // own task's result instead of pushing into a shared array on completion.
    return Promise.all(
      tasks.map(async (task) => {
        await this.semaphore.acquire();

        try {
          // `await` keeps the permit held until the task has settled.
          return await this.executeTask(task);
        } finally {
          this.semaphore.release();
        }
      })
    );
  }
}

💾 资源管理

内存优化

  • 共享内存池
  • 定期清理
  • 智能缓存

计算优化

  • 负载均衡
  • 任务批处理
  • 结果复用

调试与监控

多智能体调试工具

/**
 * Debugging and monitoring facade over a tracer, a visualiser and a profiler.
 *
 * NOTE(review): the three fields are declared but never assigned in the
 * visible source, and `collectAgentStates`, `generateOptimizationTips`,
 * `getActiveAgentCount`, `getMessageRate`, `getQueueDepth`, `getErrorRate`
 * and `getAverageResponseTime` are not defined here.
 */
class MultiAgentDebugger {
  private tracer: DistributedTracer;
  private visualizer: AgentVisualizer;
  private profiler: PerformanceProfiler;

  /** Returns the tracer's message flow for the given session. */
  traceMessageFlow(sessionId: string) {
    const flow = this.tracer.getMessageFlow(sessionId);
    return flow;
  }

  /** Collects the agent states and returns the visualiser's rendering of them. */
  visualizeAgentStates() {
    const snapshot = this.collectAgentStates();
    const { visualizer } = this;
    return visualizer.render(snapshot);
  }

  /**
   * Builds a performance report for a task: timeline, bottlenecks and
   * resource usage from the profiler, plus optimisation recommendations.
   */
  profileExecution(taskId: string) {
    const timeline = this.profiler.getTimeline(taskId);
    const bottlenecks = this.profiler.findBottlenecks(taskId);
    const resourceUsage = this.profiler.getResourceUsage(taskId);
    const recommendations = this.generateOptimizationTips();

    return { timeline, bottlenecks, resourceUsage, recommendations };
  }

  /** Returns a snapshot of the metrics shown on the monitoring dashboard. */
  getDashboard() {
    const activeAgents = this.getActiveAgentCount();
    const messageRate = this.getMessageRate();
    const taskQueue = this.getQueueDepth();
    const errorRate = this.getErrorRate();
    const avgResponseTime = this.getAverageResponseTime();

    return { activeAgents, messageRate, taskQueue, errorRate, avgResponseTime };
  }
}

构建智能协作系统

掌握多智能体技术,让AI发挥集体智慧,解决复杂业务挑战。

开始构建