8.4. 多智能体协作中的上下文共享与隔离实战
多智能体协作中的上下文共享与隔离实战
你需要什么
- Python 3.10+
- Redis 服务(本地或 Docker)
- 安装依赖:
pip install langgraph langchain-openai redis python-dotenv - 预计时间:60 分钟
最终成果
你将得到一个可运行的“研究团队”多智能体系统,其中 研究员、分析员 和 工程师 三个角色通过 共享记忆板 交换关键发现,同时各自保持 私有工作记忆 的隔离,并在出现矛盾结论时自动触发冲突解决,达成共识。
这个系统的核心意义在于:它把上一章为单个编码智能体设计的记忆机制,提升为一个 可治理的多智能体记忆中间层——让团队既能高效协作,又不会泄露或混淆各自的机密上下文。
1. 环境准备与基础状态图
1.1 启动 Redis
共享记忆板依赖 Redis Pub/Sub。如果你用 Docker:
docker run -d --name redis-memory -p 6379:6379 redis:7-alpine
验证连接:用 redis-cli ping 返回 PONG。
1.2 定义多智能体状态图骨架
我们以 LangGraph 的 StateGraph 为基础。共享上下文统一存放在 TeamState 里,但私有记忆由即将设计的 MemoryMiddleware 单独管理,避免污染全局状态。
# graph_skeleton.py
from typing import TypedDict, List
from langgraph.graph import StateGraph, END
class TeamState(TypedDict):
task: str # 团队当前任务
shared_broadcasts: List[str] # 从共享记忆板收到的广播消息
def build_graph():
builder = StateGraph(TeamState)
# 节点将在后续步骤中逐步注册
builder.add_node("researcher", researcher_node)
builder.add_node("analyst", analyst_node)
builder.add_node("engineer", engineer_node)
builder.set_entry_point("researcher")
builder.add_edge("researcher", "analyst")
builder.add_edge("analyst", "engineer")
builder.add_edge("engineer", END)
return builder.compile()
注意:此处
shared_broadcasts用列表收集来自 Redis 的消息,演示简洁。生产环境中建议使用add_messagesreducer 或专门的消息队列状态键,避免状态膨胀。
2. 团队共享记忆板设计
2.1 MemoryMiddleware 的整体结构
我们将“记忆中间层”封装成一个单例类,统一管理 Redis 共享通道和私有工作记忆的隔离。
# memory_middleware.py
import json
import redis
from typing import Dict, Any, Optional
class MemoryMiddleware:
_instance = None
def __new__(cls, redis_host="localhost", redis_port=6379):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance.redis_client = redis.Redis(
host=redis_host, port=redis_port, decode_responses=True
)
# 私有工作记忆:每个 agent_id 对应一个 dict
cls._instance._private_store: Dict[str, Dict[str, Any]] = {}
# 共享记忆数据库(简化,直接存 Redis List,实际可换持久化存储)
cls._instance.shared_key = "team:shared:memory"
return cls._instance
def publish_finding(self, agent_id: str, finding: dict):
"""广播关键发现到共享记忆板,同时写入共享记忆数据库"""
payload = json.dumps({"agent": agent_id, "finding": finding})
self.redis_client.publish("team:broadcast", payload)
# 记录到共享记忆数据库(追加)
self.redis_client.rpush(self.shared_key, payload)
def read_broadcasts(self, timeout=0.5) -> list:
"""从 Pub/Sub 读取最新的广播消息(非阻塞)"""
pubsub = self.redis_client.pubsub()
pubsub.subscribe("team:broadcast")
messages = []
while True:
msg = pubsub.get_message(timeout=timeout)
if msg is None:
break
if msg["type"] == "message":
messages.append(json.loads(msg["data"]))
pubsub.unsubscribe("team:broadcast")
return messages
def get_shared_history(self, limit=20):
"""获取共享记忆数据库中的历史记录"""
return [json.loads(entry) for entry in
self.redis_client.lrange(self.shared_key, -limit, -1)]
# 私有记忆相关方法将在步骤 3 展开
预期结果:运行 publish_finding 后,同一时刻订阅 team:broadcast 的智能体能够收到广播。
2.2 将共享记忆接入 LangGraph 节点
以研究员节点为例:
# nodes.py
from memory_middleware import MemoryMiddleware
memory = MemoryMiddleware() # 单例
def researcher_node(state: TeamState):
agent_id = "researcher"
# 业务逻辑:研究员分析任务并得到某个发现
finding = {"fact": "需求A的优先级应为P0", "confidence": 0.9}
# 将关键发现广播给整个团队
memory.publish_finding(agent_id, finding)
# 将收到的广播更新到状态(以便后续节点知晓)
broadcasts = memory.read_broadcasts()
state["shared_broadcasts"].extend(
[b["finding"]["fact"] for b in broadcasts if b["agent"] != agent_id]
)
return state
踩坑经验:不要循环等待广播
read_broadcasts使用非阻塞get_message并设定超时,避免节点卡死。
此外,每个节点只应消费当前时刻的新消息,不要重复处理历史记录,否则会导致状态无限膨胀。如果希望跨轮次记住共享记忆,请使用get_shared_history有节制地加载历史。
3. 私有工作记忆的隔离
3.1 私有存储的实现
每个智能体执行任务时,可能需要保存临时假设、中间计算结果或未确认的数据,这些绝不应出现在共享记忆板中。
在 MemoryMiddleware 中补充私有记忆方法:
def set_private(self, agent_id: str, key: str, value: Any):
if agent_id not in self._private_store:
self._private_store[agent_id] = {}
self._private_store[agent_id][key] = value
def get_private(self, agent_id: str, key: str) -> Optional[Any]:
return self._private_store.get(agent_id, {}).get(key)
内部使用 _private_store 字典隔离,键为 agent_id。因为 LangGraph 节点可能并发执行(图中分支),这里用简单的字典(单线程)已够用;若真有多线程请改为 threading.local() 或独立 Redis 命名空间。
3.2 在节点中使用私有记忆
分析员节点需要保存“初步风险评估”,但不允许其他智能体读取:
def analyst_node(state: TeamState):
agent_id = "analyst"
risk_assessment = "需求A变更可能影响模块B,风险等级3"
# 存入私有工作记忆
memory.set_private(agent_id, "risk", risk_assessment)
# 稍候工程节点自己也无法读取这里的 risk,隔离性得到保证
# ... 正常业务并广播结论
return state
验证隔离性:在工程师节点中尝试 memory.get_private("analyst", "risk"),会返回 None,证明私有数据不泄漏。
踩坑经验:避免将私有记忆混入全局状态
LangGraph 的TeamState会自动在所有节点间传递。如果误将risk_assessment塞进state的某个字段,就等于全团队可见。因此,任何不希望共享的数据必须严格使用set_private,并在代码审查中检查state键是否有意外字段。
4. 冲突解决与合并共识
4.1 检测矛盾
当两个智能体对同一事实给出矛盾结论时,我们需要一套仲裁机制。这里设计一个简易的“事实冲突检测器”,利用共享记忆数据库中的历史记录。
假设研究员广播:"需求A优先级为P0",分析员广播:"需求A优先级为P1"。两者的核心事实(事实ID)相同,但值不同。
我们为广播消息引入 fact_id:
finding = {"fact_id": "reqA_priority", "value": "P0", "agent": "researcher"}
在 MemoryMiddleware 中增加冲突检测方法:
def detect_conflicts(self) -> list:
"""扫描共享记忆数据库,找出同一 fact_id 的不同值"""
entries = self.get_shared_history(limit=50)
fact_map: Dict[str, Dict[str, Any]] = {}
conflicts = []
for entry in entries:
finding = entry["finding"]
fid = finding.get("fact_id")
if not fid:
continue
val = finding.get("value")
if fid not in fact_map:
fact_map[fid] = {"value": val, "agent": entry["agent"]}
elif fact_map[fid]["value"] != val:
conflicts.append({
"fact_id": fid,
"values": [fact_map[fid]["value"], val],
"agents": [fact_map[fid]["agent"], entry["agent"]]
})
return conflicts
4.2 仲裁与共识达成
冲突发生后,我们可以在一个专门的“共识节点”中解决,或由监督者智能体处理。这里采用基于置信度和投票的策略:
def resolve_conflicts(conflicts):
for conflict in conflicts:
# 从共享记忆数据库中检索相关的所有说法及其置信度
# 实际系统可能会回查原始分析记录;这里简化为信任更高的置信度
max_confidence = 0
chosen_value = None
# 模拟:遍历所有相关条目
entries = memory.get_shared_history()
for e in entries:
f = e["finding"]
if f.get("fact_id") == conflict["fact_id"]:
conf = f.get("confidence", 0.5) # 默认置信度
if conf > max_confidence:
max_confidence = conf
chosen_value = f["value"]
# 广播最终共识
consensus = {"fact_id": conflict["fact_id"], "value": chosen_value, "status": "resolved"}
memory.publish_finding("arbitrator", consensus)
print(f"冲突 {conflict['fact_id']} 已解决,采用 {chosen_value} (置信度 {max_confidence})")
然后将仲裁调用放在图的一个节点中,或作为某个节点的步骤。
4.3 完整工作流示例
将上述组件串联起来,形成一次完整协作:
# workflow.py
from graph_skeleton import build_graph
from nodes import researcher_node, analyst_node, engineer_node
from memory_middleware import MemoryMiddleware
memory = MemoryMiddleware()
graph = build_graph()
initial_state = {"task": "评估需求A对系统的整体影响", "shared_broadcasts": []}
final_state = graph.invoke(initial_state)
# 检查冲突并解决
conflicts = memory.detect_conflicts()
resolve_conflicts(conflicts)
预期结果:控制台将输出各智能体的发现广播,并在最终打印出冲突解决信息。shared_broadcasts 列表会累积研究员、分析员、工程师的共享发现(除自身广播外)。
5. 回顾与行动清单
我们做了什么
- 基于 LangGraph 构建了多智能体工作流,并通过
MemoryMiddleware实现了记忆中间层。 - 设计了 Redis Pub/Sub 的共享记忆板,让团队成员能实时交换关键发现,同时将记录持久化到共享记忆数据库。
- 通过私有字典实现了智能体的私有工作记忆隔离,避免了数据泄漏。
- 引入事实冲突检测和基于置信度的仲裁机制,在矛盾时自动达成共识。
花了多久
- 环境准备 10 分钟,核心逻辑编写 40 分钟,调试与验证 10 分钟。
下一步你可以做的 5 件事
- 将共享记忆数据库从 Redis List 迁移到可查询的向量存储,支持语义检索历史发现。
- 为私有记忆添加 TTL(过期时间),防止长期运行的智能体发生内存泄漏。
- 引入“人类在循环中”审批节点,让冲突仲裁不确定时由人工决定。
- 结合上一章的编码智能体,把研究员/分析员/工程师记忆块作为中间层给编码智能体提供上下文。
- 进入下一章,对这套记忆系统进行极限测试。
下一章预告:《压力测试:让智能体在 1000 轮对话后仍保持精准》
你刚刚实现的多智能体记忆中间层,在真实世界中可能被高频率、长周期的交互考验。下一章将设计并运行一套极端测试:模拟上千轮对话,故意制造记忆膨胀、频繁冲突和错误广播,验证共享板、私有记忆与冲突解决机制是否依然稳健。你还会学到如何用监控指标量化记忆系统的退化,从而有信心将这套方案推向生产。
上下文治理:AI Agent 系统设计
关于 LearnKu