8.4. 多智能体协作中的上下文共享与隔离实战

多智能体协作中的上下文共享与隔离实战

你需要什么

  • Python 3.10+
  • Redis 服务(本地或 Docker)
  • 安装依赖:pip install langgraph langchain-openai redis python-dotenv
  • 预计时间:60 分钟

最终成果
你将得到一个可运行的“研究团队”多智能体系统,其中 研究员分析员工程师 三个角色通过 共享记忆板 交换关键发现,同时各自保持 私有工作记忆 的隔离,并在出现矛盾结论时自动触发冲突解决,达成共识。
这个系统的核心意义在于:它把上一章为单个编码智能体设计的记忆机制,提升为一个 可治理的多智能体记忆中间层——让团队既能高效协作,又不会泄露或混淆各自的机密上下文。


1. 环境准备与基础状态图

1.1 启动 Redis

共享记忆板依赖 Redis Pub/Sub。如果你用 Docker:

docker run -d --name redis-memory -p 6379:6379 redis:7-alpine

验证连接:用 redis-cli ping 返回 PONG

1.2 定义多智能体状态图骨架

我们以 LangGraph 的 StateGraph 为基础。共享上下文统一存放在 TeamState 里,但私有记忆由即将设计的 MemoryMiddleware 单独管理,避免污染全局状态。

# graph_skeleton.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, END

class TeamState(TypedDict):
    task: str               # 团队当前任务
    shared_broadcasts: List[str]  # 从共享记忆板收到的广播消息

def build_graph():
    builder = StateGraph(TeamState)
    # 节点将在后续步骤中逐步注册
    builder.add_node("researcher", researcher_node)
    builder.add_node("analyst", analyst_node)
    builder.add_node("engineer", engineer_node)
    builder.set_entry_point("researcher")
    builder.add_edge("researcher", "analyst")
    builder.add_edge("analyst", "engineer")
    builder.add_edge("engineer", END)
    return builder.compile()

注意:此处 shared_broadcasts 用列表收集来自 Redis 的消息,演示简洁。生产环境中建议使用 add_messages reducer 或专门的消息队列状态键,避免状态膨胀。


2. 团队共享记忆板设计

2.1 MemoryMiddleware 的整体结构

我们将“记忆中间层”封装成一个单例类,统一管理 Redis 共享通道和私有工作记忆的隔离。

# memory_middleware.py
import json
import redis
from typing import Dict, Any, Optional

class MemoryMiddleware:
    _instance = None

    def __new__(cls, redis_host="localhost", redis_port=6379):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.redis_client = redis.Redis(
                host=redis_host, port=redis_port, decode_responses=True
            )
            # 私有工作记忆:每个 agent_id 对应一个 dict
            cls._instance._private_store: Dict[str, Dict[str, Any]] = {}
            # 共享记忆数据库(简化,直接存 Redis List,实际可换持久化存储)
            cls._instance.shared_key = "team:shared:memory"
        return cls._instance

    def publish_finding(self, agent_id: str, finding: dict):
        """广播关键发现到共享记忆板,同时写入共享记忆数据库"""
        payload = json.dumps({"agent": agent_id, "finding": finding})
        self.redis_client.publish("team:broadcast", payload)
        # 记录到共享记忆数据库(追加)
        self.redis_client.rpush(self.shared_key, payload)

    def read_broadcasts(self, timeout=0.5) -> list:
        """从 Pub/Sub 读取最新的广播消息(非阻塞)"""
        pubsub = self.redis_client.pubsub()
        pubsub.subscribe("team:broadcast")
        messages = []
        while True:
            msg = pubsub.get_message(timeout=timeout)
            if msg is None:
                break
            if msg["type"] == "message":
                messages.append(json.loads(msg["data"]))
        pubsub.unsubscribe("team:broadcast")
        return messages

    def get_shared_history(self, limit=20):
        """获取共享记忆数据库中的历史记录"""
        return [json.loads(entry) for entry in
                self.redis_client.lrange(self.shared_key, -limit, -1)]

    # 私有记忆相关方法将在步骤 3 展开

预期结果:运行 publish_finding 后,同一时刻订阅 team:broadcast 的智能体能够收到广播。

2.2 将共享记忆接入 LangGraph 节点

以研究员节点为例:

# nodes.py
from memory_middleware import MemoryMiddleware

memory = MemoryMiddleware()  # 单例

def researcher_node(state: TeamState):
    agent_id = "researcher"
    # 业务逻辑:研究员分析任务并得到某个发现
    finding = {"fact": "需求A的优先级应为P0", "confidence": 0.9}

    # 将关键发现广播给整个团队
    memory.publish_finding(agent_id, finding)

    # 将收到的广播更新到状态(以便后续节点知晓)
    broadcasts = memory.read_broadcasts()
    state["shared_broadcasts"].extend(
        [b["finding"]["fact"] for b in broadcasts if b["agent"] != agent_id]
    )
    return state

踩坑经验:不要循环等待广播
read_broadcasts 使用非阻塞 get_message 并设定超时,避免节点卡死。
此外,每个节点只应消费当前时刻的新消息,不要重复处理历史记录,否则会导致状态无限膨胀。如果希望跨轮次记住共享记忆,请使用 get_shared_history 有节制地加载历史。


3. 私有工作记忆的隔离

3.1 私有存储的实现

每个智能体执行任务时,可能需要保存临时假设、中间计算结果或未确认的数据,这些绝不应出现在共享记忆板中。
MemoryMiddleware 中补充私有记忆方法:

def set_private(self, agent_id: str, key: str, value: Any):
    if agent_id not in self._private_store:
        self._private_store[agent_id] = {}
    self._private_store[agent_id][key] = value

def get_private(self, agent_id: str, key: str) -> Optional[Any]:
    return self._private_store.get(agent_id, {}).get(key)

内部使用 _private_store 字典隔离,键为 agent_id。因为 LangGraph 节点可能并发执行(图中分支),这里用简单的字典(单线程)已够用;若真有多线程请改为 threading.local() 或独立 Redis 命名空间。

3.2 在节点中使用私有记忆

分析员节点需要保存“初步风险评估”,但不允许其他智能体读取:

def analyst_node(state: TeamState):
    agent_id = "analyst"
    risk_assessment = "需求A变更可能影响模块B,风险等级3"
    # 存入私有工作记忆
    memory.set_private(agent_id, "risk", risk_assessment)

    # 稍候工程节点自己也无法读取这里的 risk,隔离性得到保证
    # ... 正常业务并广播结论
    return state

验证隔离性:在工程师节点中尝试 memory.get_private("analyst", "risk"),会返回 None,证明私有数据不泄漏。

踩坑经验:避免将私有记忆混入全局状态
LangGraph 的 TeamState 会自动在所有节点间传递。如果误将 risk_assessment 塞进 state 的某个字段,就等于全团队可见。因此,任何不希望共享的数据必须严格使用 set_private,并在代码审查中检查 state 键是否有意外字段。


4. 冲突解决与合并共识

4.1 检测矛盾

当两个智能体对同一事实给出矛盾结论时,我们需要一套仲裁机制。这里设计一个简易的“事实冲突检测器”,利用共享记忆数据库中的历史记录。

假设研究员广播:"需求A优先级为P0",分析员广播:"需求A优先级为P1"。两者的核心事实(事实ID)相同,但值不同。

我们为广播消息引入 fact_id

finding = {"fact_id": "reqA_priority", "value": "P0", "agent": "researcher"}

MemoryMiddleware 中增加冲突检测方法:

def detect_conflicts(self) -> list:
    """扫描共享记忆数据库,找出同一 fact_id 的不同值"""
    entries = self.get_shared_history(limit=50)
    fact_map: Dict[str, Dict[str, Any]] = {}
    conflicts = []
    for entry in entries:
        finding = entry["finding"]
        fid = finding.get("fact_id")
        if not fid:
            continue
        val = finding.get("value")
        if fid not in fact_map:
            fact_map[fid] = {"value": val, "agent": entry["agent"]}
        elif fact_map[fid]["value"] != val:
            conflicts.append({
                "fact_id": fid,
                "values": [fact_map[fid]["value"], val],
                "agents": [fact_map[fid]["agent"], entry["agent"]]
            })
    return conflicts

4.2 仲裁与共识达成

冲突发生后,我们可以在一个专门的“共识节点”中解决,或由监督者智能体处理。这里采用基于置信度和投票的策略:

def resolve_conflicts(conflicts):
    for conflict in conflicts:
        # 从共享记忆数据库中检索相关的所有说法及其置信度
        # 实际系统可能会回查原始分析记录;这里简化为信任更高的置信度
        max_confidence = 0
        chosen_value = None
        # 模拟:遍历所有相关条目
        entries = memory.get_shared_history()
        for e in entries:
            f = e["finding"]
            if f.get("fact_id") == conflict["fact_id"]:
                conf = f.get("confidence", 0.5)  # 默认置信度
                if conf > max_confidence:
                    max_confidence = conf
                    chosen_value = f["value"]
        # 广播最终共识
        consensus = {"fact_id": conflict["fact_id"], "value": chosen_value, "status": "resolved"}
        memory.publish_finding("arbitrator", consensus)
        print(f"冲突 {conflict['fact_id']} 已解决,采用 {chosen_value} (置信度 {max_confidence})")

然后将仲裁调用放在图的一个节点中,或作为某个节点的步骤。

4.3 完整工作流示例

将上述组件串联起来,形成一次完整协作:

# workflow.py
from graph_skeleton import build_graph
from nodes import researcher_node, analyst_node, engineer_node
from memory_middleware import MemoryMiddleware

memory = MemoryMiddleware()

graph = build_graph()
initial_state = {"task": "评估需求A对系统的整体影响", "shared_broadcasts": []}
final_state = graph.invoke(initial_state)

# 检查冲突并解决
conflicts = memory.detect_conflicts()
resolve_conflicts(conflicts)

预期结果:控制台将输出各智能体的发现广播,并在最终打印出冲突解决信息。shared_broadcasts 列表会累积研究员、分析员、工程师的共享发现(除自身广播外)。


5. 回顾与行动清单

我们做了什么

  • 基于 LangGraph 构建了多智能体工作流,并通过 MemoryMiddleware 实现了记忆中间层。
  • 设计了 Redis Pub/Sub 的共享记忆板,让团队成员能实时交换关键发现,同时将记录持久化到共享记忆数据库。
  • 通过私有字典实现了智能体的私有工作记忆隔离,避免了数据泄漏。
  • 引入事实冲突检测和基于置信度的仲裁机制,在矛盾时自动达成共识。

花了多久

  • 环境准备 10 分钟,核心逻辑编写 40 分钟,调试与验证 10 分钟。

下一步你可以做的 5 件事

  1. 将共享记忆数据库从 Redis List 迁移到可查询的向量存储,支持语义检索历史发现。
  2. 为私有记忆添加 TTL(过期时间),防止长期运行的智能体发生内存泄漏。
  3. 引入“人类在循环中”审批节点,让冲突仲裁不确定时由人工决定。
  4. 结合上一章的编码智能体,把研究员/分析员/工程师记忆块作为中间层给编码智能体提供上下文。
  5. 进入下一章,对这套记忆系统进行极限测试。

下一章预告:《压力测试:让智能体在 1000 轮对话后仍保持精准》
你刚刚实现的多智能体记忆中间层,在真实世界中可能被高频率、长周期的交互考验。下一章将设计并运行一套极端测试:模拟上千轮对话,故意制造记忆膨胀、频繁冲突和错误广播,验证共享板、私有记忆与冲突解决机制是否依然稳健。你还会学到如何用监控指标量化记忆系统的退化,从而有信心将这套方案推向生产。

本文章首发在 LearnKu.com 网站上。

上一篇 下一篇
讨论数量: 0
发起讨论 只看当前版本


暂无话题~