ARS 第一版草稿设计(III)Agent Orchestrator(Agent 编排器)
在前两篇中,我们已经完成了 ChatSession / Message / SSE 等基础设施的设计与实现, 至此,一个“可持久化、可回放、可续传”的对话系统已经成型。
这一篇开始进入 ARS 的核心能力:
Agent Orchestrator(Agent 编排器)
它负责把「一次用户输入」真正变成「一次可控、可中断、可扩展的 Agent 执行过程」。
一、设计目标
在进入具体模块前,先明确这一层的设计目标:
- 一次 user.prompt ≠ 一次简单模型调用
- Agent 执行应当具备:
- ✅ 可异步执行(断开连接也能继续)
- ✅ 可中断(Stop / Cancel)
- ✅ 可审计(所有行为都是事件)
- ✅ 可扩展(Tool / SubAgent / Policy)
- 不将“模型调用”写死在 Controller 或 Job 中
因此我们引入 Agent Orchestrator,作为一次 Agent Run 的调度与编排中枢。
二、核心模块说明
| 模块 | 职责 |
|---|---|
| AgentProvider | 模型适配层(OpenAI / GLM / Kimi …) |
| AgentRunJob | 一次 Agent Run 的队列执行入口,负责异常兜底与状态落库 |
| RunLoop | Run 内部 step 的逻辑编排 |
| ContextBuilder | 从 ContextStore 构建模型上下文 |
| RunController | HTTP/API 入口 |
| RunService | 业务触发门面 |
| RunDispatcher | 调度器(并发控制、幂等、投递 Job) |
| OutputSink | 将 Agent 输出追加为 message 事件 |
| CancelChecker | 协作式取消检测器 |
三、整体结构(静态视角)
flowchart TD
%% Entry Layer
User -->|user.prompt| RunController
RunController --> RunService
RunService --> RunDispatcher
%% Scheduling Layer
RunDispatcher -->|dispatch| AgentRunJob
%% Execution Layer
subgraph AgentRunJob["AgentRunJob(一次 Run 执行)"]
CancelChecker
ContextBuilder
RunLoop
OutputSink
CancelChecker --> RunLoop
RunLoop --> ContextBuilder
RunLoop --> AgentProvider
AgentProvider --> OutputSink
end
%% Providers
subgraph AgentProvider["AgentProvider(模型适配层)"]
OpenAIProvider
GLMProvider
KimiProvider
ChatGPTProvider
end
结构说明
- Controller / Service / Dispatcher 不属于 RunLoop 它们负责的是「是否启动一次 Run」
- AgentRunJob 是执行容器
- RunLoop 只关心「当前 Run 的下一步该做什么」
- ContextBuilder / Provider / Sink 是 Run 内部组件
- 当前 MVP 中:
- messages 表即为 ContextStore(事件日志实现)
四、Agent Run 的完整执行过程(动态视角)
下面用一张 sequenceDiagram 描述“一次 user.prompt 如何变成 Agent 输出”。
sequenceDiagram
participant User
participant API as RunController
participant Dispatcher as RunDispatcher
participant Job as AgentRunJob
participant Loops as RunLoop
participant Context as ContextBuilder
participant Provider as AgentProvider
participant Sink as OutputSink
participant Store as ContextStore(messages)
User->>API: POST user.prompt
API->>Store: append user.prompt
API->>Dispatcher: dispatch(session_id, trigger_message)
Dispatcher->>Store: append run.status=RUNNING
Dispatcher->>Job: enqueue AgentRunJob
Job->>Loops: execute context
Loops->>Context: build context
Context->>Store: load messages
Loops->>Provider: generate(context)
Provider-->>Loops: output (stream / once)
Loops->>Sink: append agent.message
Sink->>Store: append message event
Loops->>Store: append run.status=DONE
关键特性说明
- 所有状态变化都是 message 事件
- Run 可随时被 CancelChecker 中断
- SSE 只负责“流出事件”,不参与状态判断
- 断线后可通过 seq 补洞恢复状态
五、RunLoop 的职责边界(非常重要)
RunLoop 只做一件事:
在一次 Run 中,根据当前上下文,决定「下一步动作」
- 它不应该:
- 创建 run_id
- 决定是否允许启动
- 投递 Job
- 管理并发
- 它应该:
- 检查是否 Cancel
- 构建上下文
- 调用 AgentProvider / Tool / SubAgent
- 判断是否结束 Run
这一点对于未来扩展 Tool / SubAgent 至关重要。
六、关于取消(Cancel)的思考
取消是协作式的:
Cancel 请求本身是一个 run.cancel.request 事件
CancelChecker 在以下节点检查:
- 每个 step 开始
- 流式输出过程中
- Tool 调用前后
一旦检测到取消:
- 停止后续编排
- 追加 run.status=CANCELED
七、最小实现的功能
在第一版最小实现中,仅实现:
- 单次 Agent Run
- 单一 AgentProvider
- 无 Tool / SubAgent
- 非复杂上下文压缩
但 架构本身已为后续扩展预留空间:
- ToolRouter / ToolRunner
- PolicyEngine
- ContextCompressor
- SubAgent(嵌套 Run)
到此为止,ARS 的基础形态已经完整:
- ChatSession:交互与持久化
- ContextStore:事件真相源
- Agent Orchestrator:可控的智能执行
它在实质意义上不是一个“调用模型的 Demo”, 而是一个 真正可以长期演进的 Agent 系统骨架。
Tags: