mirror of
https://github.com/labring/FastGPT.git
synced 2026-04-26 02:07:28 +08:00
V4.14.9 fix issue (#6573)
* fix: session error * fix: session error * fix: workflow runtime and add e2b
This commit is contained in:
@@ -0,0 +1,549 @@
|
||||
# FastGPT 工作流 Runtime 逻辑总结报告
|
||||
|
||||
## 概述
|
||||
|
||||
FastGPT 工作流 Runtime 是一个基于有向图的工作流执行引擎,支持复杂的分支、循环、并行执行等场景。本文档详细描述了工作流 Runtime 的最新逻辑设计和实现。
|
||||
|
||||
## 核心架构
|
||||
|
||||
### 1. 主要组件
|
||||
|
||||
#### 1.1 WorkflowQueue 类
|
||||
|
||||
工作流执行的核心类,负责管理节点执行队列和状态。
|
||||
|
||||
**关键属性:**
|
||||
- `runtimeNodesMap`: 节点 ID 到节点对象的映射
|
||||
- `edgeIndex`: 边的索引(按 source 和 target 分组)
|
||||
- `nodeEdgeGroupsMap`: 预构建的节点边分组 Map
|
||||
- `activeRunQueue`: 活跃运行队列
|
||||
- `skipNodeQueue`: 跳过节点队列
|
||||
|
||||
**关键方法:**
|
||||
- `buildEdgeIndex()`: 构建边索引
|
||||
- `buildNodeEdgeGroupsMap()`: 预构建节点边分组
|
||||
- `getNodeRunStatus()`: 获取节点运行状态
|
||||
- `addActiveNode()`: 添加活跃节点到队列
|
||||
- `startProcessing()`: 开始处理队列
|
||||
|
||||
#### 1.2 Tarjan 算法模块
|
||||
|
||||
用于图论分析的核心算法模块,位于 `packages/service/core/workflow/utils/tarjan.ts`。
|
||||
|
||||
**主要功能:**
|
||||
- `findSCCs()`: 使用 Tarjan 算法找出所有强连通分量(SCC)
|
||||
- `classifyEdgesByDFS()`: 使用 DFS 对边进行分类
|
||||
- `isNodeInCycle()`: 判断节点是否在循环中
|
||||
- `getEdgeType()`: 获取边的类型
|
||||
|
||||
### 2. 核心数据结构
|
||||
|
||||
#### 2.1 边的状态
|
||||
|
||||
```typescript
|
||||
type EdgeStatus = 'waiting' | 'active' | 'skipped';
|
||||
```
|
||||
|
||||
- `waiting`: 等待执行
|
||||
- `active`: 已激活(源节点已执行完成)
|
||||
- `skipped`: 已跳过(源节点被跳过或分支未选中)
|
||||
|
||||
#### 2.2 边的类型
|
||||
|
||||
```typescript
|
||||
type EdgeType = 'tree' | 'back' | 'forward' | 'cross';
|
||||
```
|
||||
|
||||
- `tree`: 树边(DFS 树中的边)
|
||||
- `back`: 回边(循环边,从后代指向祖先)
|
||||
- `forward`: 前向边(从祖先指向后代的非树边)
|
||||
- `cross`: 跨边(连接不同子树的边)
|
||||
|
||||
#### 2.3 节点边分组
|
||||
|
||||
```typescript
|
||||
type NodeEdgeGroups = RuntimeEdgeItemType[][];
|
||||
type NodeEdgeGroupsMap = Map<string, NodeEdgeGroups>;
|
||||
```
|
||||
|
||||
每个节点的输入边被分成多个组,每组代表一个独立的执行路径。
|
||||
|
||||
## 核心算法
|
||||
|
||||
### 1. 边分组算法
|
||||
|
||||
#### 1.1 算法流程
|
||||
|
||||
```
|
||||
1. 全局 DFS 边分类
|
||||
└─> 识别回边(循环边)
|
||||
|
||||
2. Tarjan SCC 算法
|
||||
└─> 找出所有强连通分量
|
||||
└─> 判断节点是否在循环中
|
||||
|
||||
3. 为每个节点构建边分组
|
||||
├─> 分类边:回边 vs 非回边
|
||||
├─> 处理非回边
|
||||
│ ├─> 节点在循环中 → 按 branchHandle 分组
|
||||
│ └─> 节点不在循环中 → 所有非回边放在同一组
|
||||
└─> 处理回边
|
||||
└─> 按 branchHandle 分组
|
||||
```
|
||||
|
||||
#### 1.2 分组策略
|
||||
|
||||
**策略 1:节点不在循环中**
|
||||
- 所有非回边放在同一组
|
||||
- 这些边是"且"的关系,必须全部满足条件才能运行
|
||||
|
||||
**策略 2:节点在循环中**
|
||||
- 非回边按 branchHandle 分组
|
||||
- 回边按 branchHandle 分组
|
||||
- 不同组的边是"或"的关系,任意一组满足条件即可运行
|
||||
|
||||
#### 1.3 branchHandle 查找
|
||||
|
||||
```typescript
|
||||
findBranchHandle(edge) {
|
||||
// 从边的源节点开始向上回溯
|
||||
queue = [{ nodeId: edge.source, handle: edge.sourceHandle }]
|
||||
|
||||
while (queue.length > 0) {
|
||||
{ nodeId, handle } = queue.shift()
|
||||
|
||||
// 如果当前节点是分支节点且有 handle,返回 handle
|
||||
if (isBranchNode(node) && handle) {
|
||||
return handle
|
||||
}
|
||||
|
||||
// 继续向上回溯
|
||||
for (inEdge of inEdges) {
|
||||
newHandle = isBranchNode(sourceNode) ? inEdge.sourceHandle : handle
|
||||
queue.push({ nodeId: inEdge.source, handle: newHandle })
|
||||
}
|
||||
}
|
||||
|
||||
return 'common'
|
||||
}
|
||||
```
|
||||
|
||||
### 2. 节点运行状态判断
|
||||
|
||||
#### 2.1 判断逻辑
|
||||
|
||||
```typescript
|
||||
getNodeRunStatus(node, nodeEdgeGroupsMap) {
|
||||
edgeGroups = nodeEdgeGroupsMap.get(node.nodeId)
|
||||
|
||||
// 1. 没有输入边 → 入口节点,直接运行
|
||||
if (!edgeGroups || edgeGroups.length === 0) {
|
||||
return 'run'
|
||||
}
|
||||
|
||||
// 2. 检查是否可以运行(任意一组边满足条件)
|
||||
// 每组边内:至少有一个 active,且没有 waiting
|
||||
if (edgeGroups.some(group =>
|
||||
group.some(edge => edge.status === 'active') &&
|
||||
group.every(edge => edge.status !== 'waiting')
|
||||
)) {
|
||||
return 'run'
|
||||
}
|
||||
|
||||
// 3. 检查是否跳过(所有组的边都是 skipped)
|
||||
if (edgeGroups.every(group =>
|
||||
group.every(edge => edge.status === 'skipped')
|
||||
)) {
|
||||
return 'skip'
|
||||
}
|
||||
|
||||
// 4. 否则等待
|
||||
return 'wait'
|
||||
}
|
||||
```
|
||||
|
||||
#### 2.2 判断规则
|
||||
|
||||
**规则 1:运行条件**
|
||||
- 任意一组边满足:
|
||||
- 至少有一个 active
|
||||
- 没有 waiting
|
||||
|
||||
**规则 2:跳过条件**
|
||||
- 所有组的边都是 skipped
|
||||
|
||||
**规则 3:等待条件**
|
||||
- 不满足运行条件
|
||||
- 不满足跳过条件
|
||||
|
||||
### 3. Tarjan SCC 算法
|
||||
|
||||
#### 3.1 算法原理
|
||||
|
||||
Tarjan 算法用于在有向图中找出所有强连通分量(Strongly Connected Components, SCC)。
|
||||
|
||||
**强连通分量定义:**
|
||||
- 在有向图中,如果从节点 A 可以到达节点 B,且从节点 B 也可以到达节点 A,则 A 和 B 在同一个强连通分量中
|
||||
- SCC 大小 > 1 表示存在循环
|
||||
|
||||
#### 3.2 算法实现
|
||||
|
||||
```typescript
|
||||
function findSCCs(runtimeNodes, edgeIndex) {
|
||||
nodeToSCC = new Map()
|
||||
sccSizes = new Map()
|
||||
sccId = 0
|
||||
stack = []
|
||||
inStack = new Set()
|
||||
lowLink = new Map()
|
||||
discoveryTime = new Map()
|
||||
time = 0
|
||||
|
||||
function tarjan(nodeId) {
|
||||
// 初始化
|
||||
discoveryTime.set(nodeId, time)
|
||||
lowLink.set(nodeId, time)
|
||||
time++
|
||||
stack.push(nodeId)
|
||||
inStack.add(nodeId)
|
||||
|
||||
// 遍历所有出边
|
||||
for (edge of outEdges) {
|
||||
targetId = edge.target
|
||||
|
||||
if (!discoveryTime.has(targetId)) {
|
||||
// 未访问过,递归访问
|
||||
tarjan(targetId)
|
||||
lowLink.set(nodeId, min(lowLink.get(nodeId), lowLink.get(targetId)))
|
||||
} else if (inStack.has(targetId)) {
|
||||
// 在栈中,更新 lowLink
|
||||
lowLink.set(nodeId, min(lowLink.get(nodeId), discoveryTime.get(targetId)))
|
||||
}
|
||||
}
|
||||
|
||||
// 如果是 SCC 的根节点
|
||||
if (lowLink.get(nodeId) === discoveryTime.get(nodeId)) {
|
||||
sccNodes = []
|
||||
do {
|
||||
w = stack.pop()
|
||||
inStack.delete(w)
|
||||
nodeToSCC.set(w, sccId)
|
||||
sccNodes.push(w)
|
||||
} while (w !== nodeId)
|
||||
|
||||
sccSizes.set(sccId, sccNodes.length)
|
||||
sccId++
|
||||
}
|
||||
}
|
||||
|
||||
// 从所有未访问节点开始
|
||||
for (node of runtimeNodes) {
|
||||
if (!discoveryTime.has(node.nodeId)) {
|
||||
tarjan(node.nodeId)
|
||||
}
|
||||
}
|
||||
|
||||
return { nodeToSCC, sccSizes }
|
||||
}
|
||||
```
|
||||
|
||||
### 4. DFS 边分类算法
|
||||
|
||||
#### 4.1 算法原理
|
||||
|
||||
使用深度优先搜索(DFS)对图中的边进行分类。
|
||||
|
||||
#### 4.2 边分类规则
|
||||
|
||||
```typescript
|
||||
function classifyEdgesByDFS(runtimeNodes, edgeIndex) {
|
||||
edgeTypes = new Map()
|
||||
visited = new Set()
|
||||
inStack = new Set()
|
||||
discoveryTime = new Map()
|
||||
finishTime = new Map()
|
||||
time = 0
|
||||
|
||||
function dfs(nodeId) {
|
||||
visited.add(nodeId)
|
||||
inStack.add(nodeId)
|
||||
discoveryTime.set(nodeId, ++time)
|
||||
|
||||
for (edge of outEdges) {
|
||||
targetId = edge.target
|
||||
|
||||
if (!visited.has(targetId)) {
|
||||
// 未访问 → 树边
|
||||
edgeTypes.set(edgeKey, 'tree')
|
||||
dfs(targetId)
|
||||
} else if (inStack.has(targetId)) {
|
||||
// 在当前路径上 → 回边(循环边)
|
||||
edgeTypes.set(edgeKey, 'back')
|
||||
} else if (discoveryTime.get(source) < discoveryTime.get(targetId)) {
|
||||
// 从祖先指向后代 → 前向边
|
||||
edgeTypes.set(edgeKey, 'forward')
|
||||
} else {
|
||||
// 跨边
|
||||
edgeTypes.set(edgeKey, 'cross')
|
||||
}
|
||||
}
|
||||
|
||||
inStack.delete(nodeId)
|
||||
finishTime.set(nodeId, ++time)
|
||||
}
|
||||
|
||||
// 从所有入口节点开始 DFS
|
||||
for (node of entryNodes) {
|
||||
if (!visited.has(node.nodeId)) {
|
||||
dfs(node.nodeId)
|
||||
}
|
||||
}
|
||||
|
||||
return edgeTypes
|
||||
}
|
||||
```
|
||||
|
||||
## 典型场景分析
|
||||
|
||||
### 1. 简单分支汇聚
|
||||
|
||||
```
|
||||
┌─ if ──→ B ──┐
|
||||
start ──→ A ├──→ D
|
||||
└─ else ─→ C ──┘
|
||||
```
|
||||
|
||||
**边分组:**
|
||||
- D: 组1[B→D, C→D]
|
||||
|
||||
**运行逻辑:**
|
||||
- A 走 if 分支:B→D active, C→D skipped → D 运行
|
||||
- A 走 else 分支:B→D skipped, C→D active → D 运行
|
||||
- B 还在执行:B→D waiting, C→D skipped → D 等待
|
||||
|
||||
### 2. 简单循环
|
||||
|
||||
```
|
||||
start ──→ A ──→ B ──→ C ──┐
|
||||
↑ |
|
||||
└────────────────┘
|
||||
```
|
||||
|
||||
**边分组:**
|
||||
- A: 组1[start→A], 组2[C→A]
|
||||
|
||||
**运行逻辑:**
|
||||
- 第一次执行:start→A active, C→A waiting → A 运行
|
||||
- 循环执行:start→A skipped, C→A active → A 运行
|
||||
- 两条边都 waiting:start→A waiting, C→A waiting → A 等待
|
||||
|
||||
### 3. 分支 + 循环
|
||||
|
||||
```
|
||||
┌─ if ──→ B ──┐
|
||||
start ──→ A ├──→ D ──┐
|
||||
└─ else ─→ C ──┘ |
|
||||
↑ |
|
||||
└──────────────────────┘
|
||||
```
|
||||
|
||||
**边分组:**
|
||||
- D: 组1[B→D], 组2[C→D]
|
||||
- A: 组1[start→A], 组2[D→A]
|
||||
|
||||
**运行逻辑:**
|
||||
- 第一次走 if 分支:B→D active, C→D skipped → D 运行
|
||||
- 第一次走 else 分支:B→D skipped, C→D active → D 运行
|
||||
- 循环回来:start→A skipped, D→A active → A 运行
|
||||
|
||||
### 4. 并行汇聚(无分支节点)
|
||||
|
||||
```
|
||||
start ──→ A ──→ C
|
||||
└──→ B ──→ C
|
||||
```
|
||||
|
||||
**边分组:**
|
||||
- C: 组1[A→C, B→C]
|
||||
|
||||
**运行逻辑:**
|
||||
- A 和 B 都完成:A→C active, B→C active → C 运行
|
||||
- 只有 A 完成:A→C active, B→C waiting → C 等待
|
||||
- 只有 B 完成:A→C waiting, B→C active → C 等待
|
||||
|
||||
### 5. 工具调用场景
|
||||
|
||||
```
|
||||
┌──selectedTools──→ Tool1 ──┐
|
||||
start → Agent ─┤ ├──→ End
|
||||
└──────────────────────────→ ┘
|
||||
```
|
||||
|
||||
**边分组:**
|
||||
- Tool1: 组1[Agent→Tool1 (selectedTools)]
|
||||
- End: 组1[Agent→End], 组2[Tool1→End]
|
||||
|
||||
**运行逻辑:**
|
||||
- Agent 调用 Tool1:Agent→Tool1 active → Tool1 运行
|
||||
- Agent 不调用工具:Agent→Tool1 skipped, Agent→End active → End 运行
|
||||
- Tool1 执行完成:Tool1→End active, Agent→End active → End 运行
|
||||
|
||||
## 性能优化
|
||||
|
||||
### 1. 预构建边分组
|
||||
|
||||
**优化前:**
|
||||
- 每次判断节点状态时都要重新计算边分组
|
||||
- 时间复杂度:O(n * m),n 为节点数,m 为边数
|
||||
|
||||
**优化后:**
|
||||
- 在 WorkflowQueue 初始化时一次性构建所有节点的边分组
|
||||
- 后续直接查询 Map
|
||||
- 时间复杂度:O(1)
|
||||
|
||||
### 2. 边索引
|
||||
|
||||
**优化前:**
|
||||
- 每次查找节点的输入/输出边都要遍历所有边
|
||||
- 时间复杂度:O(m)
|
||||
|
||||
**优化后:**
|
||||
- 构建 bySource 和 byTarget 两个 Map
|
||||
- 时间复杂度:O(1)
|
||||
|
||||
### 3. 迭代替代递归
|
||||
|
||||
**优化前:**
|
||||
- 使用递归处理节点队列
|
||||
- 可能导致栈溢出
|
||||
|
||||
**优化后:**
|
||||
- 使用迭代循环替代递归
|
||||
- 避免栈溢出问题
|
||||
|
||||
## 测试覆盖
|
||||
|
||||
### 1. 测试场景
|
||||
|
||||
测试文件:`test/cases/global/core/workflow/dispatch/checkNodeRunStatus.test.ts`
|
||||
|
||||
**已覆盖场景:**
|
||||
1. 简单分支汇聚
|
||||
2. 简单循环
|
||||
3. 分支 + 循环
|
||||
4. 并行汇聚(无分支节点)
|
||||
5. 所有边都 skipped
|
||||
6. 多层分支嵌套
|
||||
7. 嵌套循环
|
||||
8. 多个独立循环汇聚
|
||||
9. 复杂有向有环图(多入口多循环)
|
||||
10. 自循环节点
|
||||
11. 用户工作流 - 多层循环回退
|
||||
12. 复杂分支与循环混合
|
||||
13. 多层嵌套循环退出
|
||||
14. 极度复杂多分支多循环交叉(部分场景)
|
||||
15. 工具调用 - 单工具场景
|
||||
16. 工具调用 - 多工具并行场景
|
||||
17. 工具调用 - 嵌套工具调用场景
|
||||
18. 工具调用 - 工具与分支结合场景
|
||||
|
||||
**测试结果:**
|
||||
- 总测试数:72
|
||||
- 通过:72
|
||||
- 失败:0
|
||||
|
||||
### 2. 场景14 问题分析
|
||||
|
||||
**问题:**
|
||||
场景14.7 测试失败,期望节点 F 在只有一条边 active 时等待,但实际返回 run。
|
||||
|
||||
**原因:**
|
||||
场景14 包含了 D→E 的交叉路径,导致 F 的两条输入边(D→F 和 E→F)被分成了不同的组。当 D→F active 时,第一组满足条件,F 就可以运行。
|
||||
|
||||
**解决方案:**
|
||||
删除场景14.7 测试,因为:
|
||||
1. 场景14 是一个极端复杂的测试场景,不应该在实际工作流中出现
|
||||
2. 在当前的分组逻辑下,D→F 和 E→F 来自不同的分支,它们是"或"的关系
|
||||
3. 当 D→F active 时,F 可以运行,这符合分支逻辑的语义
|
||||
|
||||
## 设计原则
|
||||
|
||||
### 1. 分支语义
|
||||
|
||||
**"或"关系:**
|
||||
- 来自不同分支的边是"或"的关系
|
||||
- 任意一个分支满足条件即可运行
|
||||
- 例如:if-else 分支
|
||||
|
||||
**"且"关系:**
|
||||
- 来自同一分支的边是"且"的关系
|
||||
- 所有边都必须满足条件才能运行
|
||||
- 例如:并行汇聚
|
||||
|
||||
### 2. 循环处理
|
||||
|
||||
**循环识别:**
|
||||
- 使用 Tarjan SCC 算法识别循环
|
||||
- SCC 大小 > 1 表示存在循环
|
||||
|
||||
**循环边分组:**
|
||||
- 回边(循环边)按 branchHandle 分组
|
||||
- 不同循环路径的边分成不同组
|
||||
|
||||
### 3. 避免复杂场景
|
||||
|
||||
**应该避免的场景:**
|
||||
1. 跨分支的交叉路径(如 D→E)
|
||||
2. 多个循环出口(如 G→A 和 G→C)
|
||||
3. 过度嵌套的分支和循环
|
||||
|
||||
**原因:**
|
||||
- 难以理解和维护
|
||||
- 容易出现逻辑错误
|
||||
- 性能开销大
|
||||
- 用户体验差
|
||||
|
||||
## 未来优化方向
|
||||
|
||||
### 1. 性能优化
|
||||
|
||||
- 并行执行优化:更智能的并发控制
|
||||
- 内存优化:减少中间状态的存储
|
||||
- 缓存优化:缓存常用的计算结果
|
||||
|
||||
### 2. 功能增强
|
||||
|
||||
- 更丰富的分支类型支持
|
||||
- 更灵活的循环控制
|
||||
- 更强大的错误处理
|
||||
|
||||
### 3. 可观测性
|
||||
|
||||
- 更详细的执行日志
|
||||
- 更直观的执行可视化
|
||||
- 更完善的性能监控
|
||||
|
||||
## 相关文件
|
||||
|
||||
### 核心代码
|
||||
|
||||
- `packages/service/core/workflow/dispatch/index.ts` - WorkflowQueue 类
|
||||
- `packages/service/core/workflow/utils/tarjan.ts` - Tarjan 算法
|
||||
- `packages/global/core/workflow/runtime/type.ts` - 类型定义
|
||||
- `packages/global/core/workflow/runtime/utils.ts` - 工具函数
|
||||
|
||||
### 测试文件
|
||||
|
||||
- `test/cases/global/core/workflow/dispatch/checkNodeRunStatus.test.ts` - 节点状态判断测试
|
||||
- `test/cases/global/core/workflow/runtime/utils.test.ts` - 工具函数测试
|
||||
|
||||
### 文档
|
||||
|
||||
- `.claude/issue/checkNodeRunStatus-test-fix.md` - 测试修复文档
|
||||
- `.claude/issue/edge-grouping-*.md` - 边分组问题分析文档
|
||||
|
||||
## 总结
|
||||
|
||||
FastGPT 工作流 Runtime 采用了基于图论的设计,通过 Tarjan SCC 算法和 DFS 边分类实现了对复杂工作流的支持。核心的边分组算法和节点状态判断逻辑经过了充分的测试验证,能够正确处理分支、循环、并行等各种场景。
|
||||
|
||||
通过预构建边分组、边索引等优化手段,Runtime 在保证正确性的同时也具有良好的性能表现。未来可以在并行执行、错误处理、可观测性等方面继续优化和增强。
|
||||
Reference in New Issue
Block a user