From 03dd9c00a83ee6557524f160462abcffceab1d71 Mon Sep 17 00:00:00 2001 From: Archer <545436317@qq.com> Date: Fri, 27 Mar 2026 17:06:36 +0800 Subject: [PATCH] perf: runtime performance (#6665) * perf: runtime performance * add stringify trace * remove trace val * remove trace val * remove logger * remove logger * add test * add log --- .../workflow/cpu-blocking-optimization.md | 336 ++++++++++++++++ .../workflow-thread-blocking-analysis.md | 160 ++++++++ .../docs/self-host/upgrading/4-14/41410.mdx | 3 + document/data/doc-last-modified.json | 2 +- packages/global/common/string/tools.ts | 31 +- packages/global/core/workflow/runtime/type.ts | 1 + .../global/core/workflow/runtime/utils.ts | 88 ++-- packages/service/core/app/http.ts | 2 +- .../service/core/workflow/dispatch/index.ts | 7 +- .../core/workflow/dispatch/tools/http468.ts | 21 +- .../core/workflow/dispatch/tools/runIfElse.ts | 10 +- .../workflow/dispatch/tools/runUpdateVar.ts | 23 +- .../src/pages/api/support/openapi/health.ts | 8 +- .../core/workflow/runtime/utils.test.ts | 380 +++++++++++++++--- .../core/workflow/dispatch/tools/http.test.ts | 2 +- 15 files changed, 965 insertions(+), 109 deletions(-) create mode 100644 .claude/design/core/workflow/cpu-blocking-optimization.md create mode 100644 .claude/issue/workflow-thread-blocking-analysis.md diff --git a/.claude/design/core/workflow/cpu-blocking-optimization.md b/.claude/design/core/workflow/cpu-blocking-optimization.md new file mode 100644 index 0000000000..8f1fe6dabd --- /dev/null +++ b/.claude/design/core/workflow/cpu-blocking-optimization.md @@ -0,0 +1,336 @@ +# 工作流 CPU 阻塞优化方案 + +> 基于 `.claude/issue/workflow-thread-blocking-analysis.md` 中的分析结论,给出逐项优化方案。 + +--- + +## 方案一:节点/输出 O(1) 索引(最高优先级)✅ + +### 问题 + +`replaceEditorVariable` 和 `getReferenceVariableValue` 每次调用都用 `nodes.find()` O(N) 线性扫描,在大型工作流(50节点 × 10输入 × 5引用)中产生 2500 次 O(N) 扫描,全部同步。 + +### 方案 + +**在 `WorkflowQueue` 构造时,一次性建立两级 Map 索引**,然后向下传递,替换所有 `nodes.find()`。 + +#### 1.1 新增 OutputIndex 类型 + +```ts +// packages/global/core/workflow/runtime/type.ts 新增 +export type NodeOutputIndex = Map>; +// key: nodeId → Map +``` + +#### 1.2 构造函数建立索引 + +```ts +// packages/service/core/workflow/dispatch/index.ts +constructor(...) { + this.runtimeNodesMap = new Map(data.runtimeNodes.map((item) => [item.nodeId, item])); + + // 新增:输出值索引,O(1) 查找 + this.nodeOutputIndex = new Map( + data.runtimeNodes.map((node) => [ + node.nodeId, + new Map(node.outputs.map((output) => [output.id, output])) + ]) + ); + + // 已有的边/图算法... +} +``` + +#### 1.3 修改两个工具函数签名,增加可选 Map 参数 + +```ts +// getReferenceVariableValue 增加 nodesMap 参数 +export const getReferenceVariableValue = ({ + value, + nodes, + nodesMap, // 新增:优先使用 Map,无则降级到 nodes.find() + variables +}: { + value?: ReferenceValueType; + nodes: RuntimeNodeItemType[]; + nodesMap?: Map; + variables: Record; +}) => { + // ... + const node = nodesMap + ? nodesMap.get(sourceNodeId) + : nodes.find((n) => n.nodeId === sourceNodeId); + // ... +}; + +// replaceEditorVariable 同理增加 nodesMap 参数 +export function replaceEditorVariable({ + text, nodes, nodesMap, variables, depth = 0 +}: { + // ... + nodesMap?: Map; +}) { + // nodes.find() 全部替换为 nodesMap?.get() ?? nodes.find() +} +``` + +#### 1.4 调用侧传入 Map + +```ts +// packages/service/core/workflow/dispatch/index.ts - getNodeRunParams +node.inputs.forEach((input) => { + let value = replaceEditorVariable({ + text: input.value, + nodes: this.data.runtimeNodes, + nodesMap: this.runtimeNodesMap, // 传入预建 Map + variables: this.data.variables + }); + value = getReferenceVariableValue({ + value, + nodes: this.data.runtimeNodes, + nodesMap: this.runtimeNodesMap, // 传入预建 Map + variables: this.data.variables + }); +}); +``` + +**预期效果**:每次 `nodes.find()` O(N) → O(1) Map 查找,高频节点运行场景效果最明显。 + +--- + +## 方案二:RegExp 编译缓存 ✅ + +### 问题 + +`replaceEditorVariable` 每次调用对每个变量引用执行 `new RegExp(escapedPattern)`,正则编译有 CPU 开销,且模式是 `nodeId.outputId` 的确定性字符串,完全可以缓存。 + +### 方案 + +**模块级 Map 缓存已编译的 RegExp**: + +```ts +// packages/global/core/workflow/runtime/utils.ts + +// 模块级缓存,跨调用复用 +const regexCache = new Map(); + +function getCachedRegex(pattern: string): RegExp { + let re = regexCache.get(pattern); + if (!re) { + re = new RegExp(pattern, 'g'); + // 防止缓存无限增长(工作流变量数量有限,但多租户场景下累积) + if (regexCache.size > 10000) regexCache.clear(); + regexCache.set(pattern, re); + } + return re; +} + +// 替换原有的 +// result = result.replace(new RegExp(pattern, 'g'), replacement); +// 改为: +result = result.replace(getCachedRegex(pattern), replacement); +``` + +注意:`RegExp` 带 `g` flag 使用时有 `lastIndex` 状态,每次调用前需要 reset: + +```ts +const re = getCachedRegex(pattern); +re.lastIndex = 0; // 重置,防止 g flag 状态残留 +result = result.replace(re, replacement); +``` + +**预期效果**:相同变量名(常见场景:同一个工作流内节点反复引用相同变量)完全跳过正则编译。 + +--- + +## 方案三:Tarjan / DFS 递归改迭代 + +### 问题 + +`findSCCs` 和 `classifyEdgesByDFS` 均使用递归 DFS,递归深度 = 工作流拓扑深度。节点数 100+ 时,同步递归阻塞事件循环;节点数 10000+ 时(极端场景)有栈溢出风险。 + +### 方案 + +**用显式栈替换递归**,保持算法语义不变: + +#### 3.1 迭代版 Tarjan + +```ts +export function findSCCs(runtimeNodes: RuntimeNodeItemType[], edgeIndex: EdgeIndex): SCCResult { + const nodeToSCC = new Map(); + const sccSizes = new Map(); + let sccId = 0; + + const stack: string[] = []; + const inStack = new Set(); + const lowLink = new Map(); + const discoveryTime = new Map(); + let time = 0; + + // 迭代版:使用显式调用栈 + // 每个栈帧记录 { nodeId, edgeIndex(当前处理到第几条出边) } + for (const startNode of runtimeNodes) { + if (discoveryTime.has(startNode.nodeId)) continue; + + const callStack: Array<{ nodeId: string; edgeIdx: number }> = [ + { nodeId: startNode.nodeId, edgeIdx: 0 } + ]; + + discoveryTime.set(startNode.nodeId, time); + lowLink.set(startNode.nodeId, time++); + stack.push(startNode.nodeId); + inStack.add(startNode.nodeId); + + while (callStack.length > 0) { + const frame = callStack[callStack.length - 1]; + const { nodeId } = frame; + const outEdges = edgeIndex.bySource.get(nodeId) || []; + + if (frame.edgeIdx < outEdges.length) { + const targetId = outEdges[frame.edgeIdx++].target; + + if (!discoveryTime.has(targetId)) { + // 未访问:入栈,相当于递归调用 + discoveryTime.set(targetId, time); + lowLink.set(targetId, time++); + stack.push(targetId); + inStack.add(targetId); + callStack.push({ nodeId: targetId, edgeIdx: 0 }); + } else if (inStack.has(targetId)) { + lowLink.set(nodeId, Math.min(lowLink.get(nodeId)!, discoveryTime.get(targetId)!)); + } + } else { + // 当前节点所有出边处理完毕,相当于递归返回 + callStack.pop(); + if (callStack.length > 0) { + const parentId = callStack[callStack.length - 1].nodeId; + lowLink.set(parentId, Math.min(lowLink.get(parentId)!, lowLink.get(nodeId)!)); + } + + // 判断是否为 SCC 根节点 + if (lowLink.get(nodeId) === discoveryTime.get(nodeId)) { + const sccNodes: string[] = []; + let w: string; + do { + w = stack.pop()!; + inStack.delete(w); + nodeToSCC.set(w, sccId); + sccNodes.push(w); + } while (w !== nodeId); + sccSizes.set(sccId++, sccNodes.length); + } + } + } + } + + return { nodeToSCC, sccSizes }; +} +``` + +`classifyEdgesByDFS` 同理改为迭代版(结构更简单,一个 `while` 循环替换递归 `dfs()`)。 + +**预期效果**:消除调用栈深度限制,计算时间不变但不会有栈溢出风险;代码结构更清晰,更易分段插入 yield 点(见方案五)。 + +--- + +## 方案四:`findBranchHandle` BFS 结果缓存 + +### 问题 + +`buildNodeEdgeGroupsMap` 对每个节点的每条入边调用 `findBranchHandle`,做一次向上回溯 BFS。同一个 source 节点被多条边共享时,BFS 结果是相同的,重复计算。整体 O(N²)。 + +### 方案 + +**以 `(sourceNodeId + sourceHandle)` 为 key 缓存 BFS 结果**: + +```ts +private static groupEdgesByBranch( + edges: RuntimeEdgeItemType[], + edgeIndex: ..., + nodesMap: Map, + isBranchNode: ... +): RuntimeEdgeItemType[][] { + // 新增:缓存本次 buildNodeEdgeGroupsMap 调用内的 BFS 结果 + const branchHandleCache = new Map(); + + const edgeBranchMap = new Map(); + edges.forEach((edge) => { + const cacheKey = `${edge.source}::${edge.sourceHandle ?? 'default'}`; + let branchHandle = branchHandleCache.get(cacheKey); + if (branchHandle === undefined) { + branchHandle = this.findBranchHandle(edge, edgeIndex, nodesMap, isBranchNode); + branchHandleCache.set(cacheKey, branchHandle); + } + edgeBranchMap.set(edge, branchHandle); + }); + // ... +} +``` + +注意:`branchHandleCache` 在单次 `buildNodeEdgeGroupsMap` 调用内有效,不跨工作流实例共享(不同工作流拓扑不同)。 + +**预期效果**:重复边的 BFS 结果直接复用,从 O(N²) 降至接近 O(N×平均扇入),节点多、分支多的工作流效果最明显。 + +--- + +## 方案五:构造函数完成后让出事件循环 + +### 问题 + +`WorkflowQueue` 构造函数里的图算法(方案一~四优化后仍有固定开销)全部同步完成,多个并发工作流请求时,这些同步计算依次占用主线程,导致后续请求等待。 + +### 方案 + +`WorkflowQueue` 本身是同步构造的,无法在构造函数里 `await`。可以把构造拆成两步,或者在 `runWorkflow` 入口构造完成后立即让出: + +```ts +// packages/service/core/workflow/dispatch/index.ts - runWorkflow 函数 +export async function runWorkflow(props: RunWorkflowProps): Promise { + return new Promise((resolve) => { + const queue = new WorkflowQueue({ + data: props, + maxConcurrency: 10, + defaultSkipNodeQueue: props.defaultSkipNodeQueue, + resolve + }); + + // 构造完成(图算法已执行)后,先让出一次事件循环 + // 让其他并发请求有机会执行,避免连续多个工作流启动时的 CPU 连续占用 + setImmediate(() => { + queue.addActiveNode(entryNodeId); + }); + }); +} +``` + +实际上 `runWorkflow` 现有代码里有 `addActiveNode` 的调用,在那里加一个 `await surrenderProcess()` 即可。 + +**预期效果**:每次工作流启动后主动让出一次,高并发时多个工作流的图初始化计算被事件循环交错调度,而不是连续堵塞。 + +--- + +## 实施优先级 + +| 方案 | 改动量 | 风险 | 效果 | 优先级 | +|------|-------|------|------|-------| +| 方案一:节点 O(1) 索引 | 中(函数签名变化,调用侧修改) | 低(向后兼容,可选参数) | ⭐⭐⭐⭐ | P0 | +| 方案二:RegExp 缓存 | 小(本地改动) | 极低 | ⭐⭐⭐ | P1 | +| 方案三:递归改迭代 | 中(逻辑重写,需测试) | 中(算法正确性需验证) | ⭐⭐(防栈溢出) | P1 | +| 方案四:BFS 缓存 | 小(加 Map 缓存) | 极低 | ⭐⭐⭐ | P1 | +| 方案五:构造后让出 | 极小(加一行) | 极低 | ⭐⭐(并发公平性) | P2 | + +**建议执行顺序**:方案一 → 方案二 + 方案四(可并行)→ 方案三(配套单测)→ 方案五 + +--- + +## 改动文件清单 + +``` +packages/global/core/workflow/runtime/ + ├── utils.ts 方案一(函数签名)、方案二(RegExp 缓存) + └── type.ts 方案一(新增 NodeOutputIndex 类型) + +packages/service/core/workflow/ + ├── dispatch/index.ts 方案一(传 Map)、方案四(BFS缓存)、方案五(让出) + └── utils/tarjan.ts 方案三(递归改迭代) +``` diff --git a/.claude/issue/workflow-thread-blocking-analysis.md b/.claude/issue/workflow-thread-blocking-analysis.md new file mode 100644 index 0000000000..7ae08dc80c --- /dev/null +++ b/.claude/issue/workflow-thread-blocking-analysis.md @@ -0,0 +1,160 @@ +# 工作流 CPU 阻塞模块分析 + +> 从 `packages/service/core/workflow/dispatch/index.ts` 入口出发,排查所有**同步占用 CPU、阻塞整个进程**的模块。 + +Node.js 单线程模型下,CPU 阻塞指:在当前调用栈未让出事件循环(无 `await`)的情况下执行大量计算,导致其他请求无法被处理。 + +--- + +## 一、`WorkflowQueue` 构造函数——图算法批量同步执行 + +**文件**: `packages/service/core/workflow/dispatch/index.ts:348` + +每次创建工作流实例时,构造函数**同步**依次执行: + +```ts +constructor(...) { + // 1. O(E) 构建边索引 + this.edgeIndex = WorkflowQueue.buildEdgeIndex({ runtimeEdges }); + + // 2. O(N+E) DFS 边分类 ← 递归,全同步 + // 3. O(N+E) Tarjan SCC ← 递归,全同步 + // 4. O(N²) BFS per node ← 每个节点一次 BFS 回溯 + this.nodeEdgeGroupsMap = WorkflowQueue.buildNodeEdgeGroupsMap({ ... }); +} +``` + +三个算法全部是纯同步的 CPU 密集计算,无任何 `await` 让出点。 + +--- + +## 二、Tarjan SCC 算法——递归 DFS,无让出 + +**文件**: `packages/service/core/workflow/utils/tarjan.ts:31` + +```ts +function tarjan(nodeId: string) { + // ... + for (const edge of outEdges) { + if (!discoveryTime.has(targetId)) { + tarjan(targetId); // ⚠️ 同步递归,无 await + } + } +} +for (const node of runtimeNodes) { + tarjan(node.nodeId); // 对每个未访问节点启动递归 +} +``` + +**问题**: +- 纯同步递归,执行期间完全占用 Event Loop。 +- 节点数 N 较大(如 100+ 节点)时,递归深度 = 工作流拓扑深度,调用栈可能很深。 +- 同文件 `classifyEdgesByDFS` 也是完全相同的递归 DFS 结构,与 Tarjan 串行执行,等于一次工作流启动 **做两遍图遍历**。 + +--- + +## 三、`findBranchHandle`——每节点一次 BFS,合计 O(N²) + +**文件**: `packages/service/core/workflow/dispatch/index.ts:543` + +```ts +private static buildNodeEdgeGroupsMap(...) { + runtimeNodes.forEach((targetNode) => { + // 对每个节点的每条边,调用 findBranchHandle + const branchGroups = this.groupEdgesByBranch(nonBackEdges, ...); + }); +} + +private static findBranchHandle(edge, ...) { + const queue = [{ nodeId: edge.source, ... }]; + while (queue.length > 0) { + // BFS 向上回溯,最坏遍历所有节点 ← 纯同步 + const inEdges = edgeIndex.byTarget.get(nodeId) || []; + for (const inEdge of inEdges) { + queue.push({ nodeId: inEdge.source, ... }); + } + } +} +``` + +**问题**: +- `buildNodeEdgeGroupsMap` 在构造函数中对每个节点调用,每次调用又做一次 BFS。 +- 最坏复杂度 O(N × (N + E)),对于 100 节点、200 边的工作流约为 30000 次循环迭代,全同步。 + +--- + +## 四、`replaceEditorVariable`——每节点每输入做正则+递归,全同步 + +**文件**: `packages/global/core/workflow/runtime/utils.ts:372` + +每次节点运行前,`getNodeRunParams` 对每个 input 都调用: + +```ts +node.inputs.forEach((input) => { + // 每个 input 都调用一次 replaceEditorVariable + let value = replaceEditorVariable({ + text: input.value, + nodes: this.data.runtimeNodes, // 传入所有节点 + variables: this.data.variables + }); + value = getReferenceVariableValue({ value, nodes, variables }); +}); +``` + +`replaceEditorVariable` 内部: + +```ts +// 1. 全局正则匹配,提取所有变量引用 +const matches = [...text.matchAll(variablePattern)]; + +for (const match of matches) { + // 2. nodes.find() O(N) 线性扫描 + const node = nodes.find((node) => node.nodeId === nodeId); + // 3. 每个变量编译一次新 RegExp ← 正则编译有 CPU 开销 + replacements.push({ pattern: `\\{\\{\\$${escapedNodeId}...`, replacement: formatVal }); +} + +// 4. 如果有嵌套变量,递归调用自身(最多 depth=10) +if (hasReplacements && /\{\{\$[^.]+\.[^$]+\$\}\}/.test(result)) { + result = replaceEditorVariable({ text: result, nodes, variables, depth: depth + 1 }); +} +``` + +**问题**: +- **每次 `nodes.find()`** 是 O(N) 线性扫描,完全没有缓存。一个节点有 10 个 input、每个 input 引用 5 个变量、工作流有 50 个节点 → **2500 次 O(N) 扫描**。 +- **每个变量引用** `new RegExp(pattern)` 一次,正则编译有 CPU 成本。 +- **最多 10 层递归**,每层都重复上述过程。 +- 整个函数链(`replaceEditorVariable` + `getReferenceVariableValue`)全同步,**每个节点运行前都会触发,且节点越多调用越频繁**。 + +--- + +## 五、`getReferenceVariableValue`——O(N) 数组扫描,无缓存 + +**文件**: `packages/global/core/workflow/runtime/utils.ts:297` + +```ts +const node = nodes.find((node) => node.nodeId === sourceNodeId); // O(N) +return node.outputs.find((output) => output.id === outputId)?.value; // O(outputs) +``` + +**问题**: +- 每次调用都线性扫描整个 `nodes` 数组。 +- 被 `replaceEditorVariable` 频繁调用(每个变量引用一次)。 +- `nodes` 数组在运行时不会变化(节点结构固定),却没有预建索引,每次都从头扫。 + +--- + +## 汇总 + +| 位置 | 函数 | 复杂度 | 触发时机 | 是否有让出点 | +|------|------|--------|---------|------------| +| `dispatch/index.ts` 构造函数 | `buildEdgeIndex` | O(E) | 每次工作流启动 | ❌ 无 | +| `utils/tarjan.ts` | `classifyEdgesByDFS` | O(N+E) 递归 | 每次工作流启动 | ❌ 无 | +| `utils/tarjan.ts` | `findSCCs (tarjan)` | O(N+E) 递归 | 每次工作流启动 | ❌ 无 | +| `dispatch/index.ts` | `buildNodeEdgeGroupsMap` + `findBranchHandle` | O(N²) | 每次工作流启动 | ❌ 无 | +| `runtime/utils.ts` | `replaceEditorVariable` | O(N × inputs × depth) | 每节点运行前 | ❌ 无 | +| `runtime/utils.ts` | `getReferenceVariableValue` | O(N) per call | 每 input 一次 | ❌ 无 | + +**最严重的场景**:大型工作流(100+ 节点)并发启动时,构造函数中的图算法(第一~四项)全部同步执行,每个请求都会独占 Event Loop 若干毫秒,并发时互相堆叠,导致明显卡顿。 + +**最高频的场景**:Agent 节点有大量工具调用时,每轮工具调用后节点重新 resolve 触发下游节点的参数注入,`replaceEditorVariable` 被高频调用,且每次都对所有节点做线性扫描。 diff --git a/document/content/docs/self-host/upgrading/4-14/41410.mdx b/document/content/docs/self-host/upgrading/4-14/41410.mdx index 6bc9d6bb95..1d4a52d89a 100644 --- a/document/content/docs/self-host/upgrading/4-14/41410.mdx +++ b/document/content/docs/self-host/upgrading/4-14/41410.mdx @@ -6,6 +6,7 @@ description: 'FastGPT V4.14.10 更新说明' ## 注意 1. 代码沙盒镜像名变更: `{{hub}}/fastgpt-sandbox` -> `{{hub}}/fastgpt-code-sandbox` +2. 系统工具部分头像,移除了 icon,都转用图片链接,如果丢失了头像,可以重新更新一次系统工具(卸载再安装,或者直接导入 pkg 覆盖) ## 🚀 新增内容 @@ -15,6 +16,8 @@ description: 'FastGPT V4.14.10 更新说明' ## ⚙️ 优化 +1. 工作流 runtime,减少计算复杂。 +2. 增加一些对于大变量的计算限制,避免计算复杂度过高导致线程阻塞。 ## 🐛 修复 diff --git a/document/data/doc-last-modified.json b/document/data/doc-last-modified.json index 170cdc97cb..0bd87f6ba7 100644 --- a/document/data/doc-last-modified.json +++ b/document/data/doc-last-modified.json @@ -220,7 +220,7 @@ "document/content/docs/self-host/upgrading/4-14/4140.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/4-14/4141.en.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/4-14/4141.mdx": "2026-03-03T17:39:47+08:00", - "document/content/docs/self-host/upgrading/4-14/41410.mdx": "2026-03-26T16:35:07+08:00", + "document/content/docs/self-host/upgrading/4-14/41410.mdx": "2026-03-27T12:01:02+08:00", "document/content/docs/self-host/upgrading/4-14/4142.en.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/4-14/4142.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/4-14/4143.en.mdx": "2026-03-03T17:39:47+08:00", diff --git a/packages/global/common/string/tools.ts b/packages/global/common/string/tools.ts index 87571355b1..dd5f4ba3ec 100644 --- a/packages/global/common/string/tools.ts +++ b/packages/global/common/string/tools.ts @@ -1,6 +1,14 @@ import crypto from 'crypto'; import { customAlphabet } from 'nanoid'; import path from 'path'; +import { getErrText } from '../error/utils'; + +export const checkStrOversize = (str: string, size = 1e8) => { + if (str.length > size) { + return true; + } + return false; +}; /* check string is a web link */ export function strIsLink(str?: string) { @@ -30,7 +38,25 @@ export const valToStr = (val: any) => { if (val === undefined) return ''; if (val === null) return 'null'; - if (typeof val === 'object') return JSON.stringify(val); + if (typeof val === 'object') { + try { + const start = Date.now(); + const res = JSON.stringify(val); + + if (Date.now() - start > 1000) { + console.warn('Slow JSON.stringify', { + duration: Date.now() - start, + valLength: res.length + }); + } + + return res; + } catch (error) { + console.error('Failed to stringify value', { error }); + return `Failed to stringify value: ${getErrText(error)}`; + } + } + return String(val); }; @@ -41,6 +67,9 @@ export function replaceVariable( depth = 0 ) { if (typeof text !== 'string') return text; + if (checkStrOversize(text)) { + throw new Error('Text length exceeds 100,000,000 characters.'); + } const MAX_REPLACEMENT_DEPTH = 10; const processedVariables = new Set(); diff --git a/packages/global/core/workflow/runtime/type.ts b/packages/global/core/workflow/runtime/type.ts index 0fc55a3375..e4ed8a2f2f 100644 --- a/packages/global/core/workflow/runtime/type.ts +++ b/packages/global/core/workflow/runtime/type.ts @@ -99,6 +99,7 @@ export type ChatDispatchProps = { export type ModuleDispatchProps = ChatDispatchProps & { node: RuntimeNodeItemType; runtimeNodes: RuntimeNodeItemType[]; + runtimeNodesMap: Map; runtimeEdges: RuntimeEdgeItemType[]; params: T; diff --git a/packages/global/core/workflow/runtime/utils.ts b/packages/global/core/workflow/runtime/utils.ts index 03501d44e3..5da00e430d 100644 --- a/packages/global/core/workflow/runtime/utils.ts +++ b/packages/global/core/workflow/runtime/utils.ts @@ -1,5 +1,5 @@ import json5 from 'json5'; -import { replaceVariable, valToStr } from '../../../common/string/tools'; +import { checkStrOversize, replaceVariable, valToStr } from '../../../common/string/tools'; import { ChatRoleEnum } from '../../../core/chat/constants'; import type { ChatItemType } from '../../../core/chat/type'; import type { NodeOutputItemType } from './type'; @@ -296,17 +296,16 @@ export const filterWorkflowEdges = (edges: RuntimeEdgeItemType[]) => { */ export const getReferenceVariableValue = ({ value, - nodes, + nodesMap, variables }: { value?: ReferenceValueType; - nodes: RuntimeNodeItemType[]; + nodesMap: Record | Map; variables: Record; }) => { if (!value) return value; - // handle single reference value - if (isValidReferenceValueFormat(value)) { + const resoleValue = (value: [string, string | undefined]) => { const sourceNodeId = value[0]; const outputId = value[1]; @@ -316,12 +315,17 @@ export const getReferenceVariableValue = ({ } // 避免 value 刚好就是二个元素的字符串数组 - const node = nodes.find((node) => node.nodeId === sourceNodeId); + const node = nodesMap instanceof Map ? nodesMap.get(sourceNodeId) : nodesMap[sourceNodeId]; if (!node) { return value; } return node.outputs.find((output) => output.id === outputId)?.value; + }; + + // handle single reference value + if (isValidReferenceValueFormat(value)) { + return resoleValue(value as [string, string | undefined]); } // handle reference array @@ -330,15 +334,12 @@ export const getReferenceVariableValue = ({ value.length > 0 && value.every((item) => isValidReferenceValueFormat(item)) ) { - const result = value.map((val) => { - return getReferenceVariableValue({ - value: val, - nodes, - variables - }); - }); - - return result.flat().filter((item) => item !== undefined); + return value + .map((val) => { + return resoleValue(val as [string, string | undefined]); + }) + .flat() + .filter((item) => item !== undefined); } return value; @@ -368,20 +369,42 @@ export const formatVariableValByType = (val: any, valueType?: WorkflowIOValueTyp return val; }; +// 模块级 RegExp 缓存,避免每次变量替换都重新编译正则 +const _replaceRegexCache = new Map(); +const _MAX_REGEX_CACHE_SIZE = 5000; + +const _getCachedRegex = (pattern: string): RegExp => { + let re = _replaceRegexCache.get(pattern); + if (!re) { + if (_replaceRegexCache.size >= _MAX_REGEX_CACHE_SIZE) { + _replaceRegexCache.clear(); + } + re = new RegExp(pattern, 'g'); + _replaceRegexCache.set(pattern, re); + } + return re; +}; + // replace {{$xx.xx$}} variables for text export function replaceEditorVariable({ text, - nodes, + nodesMap, variables, depth = 0 }: { text: any; - nodes: RuntimeNodeItemType[]; + nodesMap: Record | Map; variables: Record; // global variables depth?: number; }) { + const getNode = (nodeId: string) => { + return nodesMap instanceof Map ? nodesMap.get(nodeId) : nodesMap[nodeId]; + }; if (typeof text !== 'string') return text; if (text === '') return text; + if (checkStrOversize(text)) { + throw new Error('Text length exceeds 100,000,000 characters.'); + } const MAX_REPLACEMENT_DEPTH = 10; const processedVariables = new Set(); @@ -398,10 +421,10 @@ export function replaceEditorVariable({ if (typeof value !== 'string') return false; // Check if the value contains the target variable pattern (direct self-reference) - const selfRefPattern = new RegExp( - `\\{\\{\\$${targetKey.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\$\\}\\}`, - 'g' + const selfRefPattern = _getCachedRegex( + `\\{\\{\\$${targetKey.replace(/[.*+?^${}()|[\]\\]/g, '\\$&')}\\$\\}\\}` ); + selfRefPattern.lastIndex = 0; return selfRefPattern.test(value); }; @@ -431,7 +454,7 @@ export function replaceEditorVariable({ return variables[id]; } // Find upstream node input/output - const node = nodes.find((node) => node.nodeId === nodeId); + const node = getNode(nodeId); if (!node) return; const output = node.outputs.find((output) => output.id === id); @@ -439,7 +462,13 @@ export function replaceEditorVariable({ // Use the node's input as the variable value(Example: HTTP data will reference its own dynamic input) const input = node.inputs.find((input) => input.key === id); - if (input) return getReferenceVariableValue({ value: input.value, nodes, variables }); + if (input) { + return getReferenceVariableValue({ + value: input.value, + nodesMap, + variables + }); + } })(); // Check for direct circular reference @@ -461,13 +490,20 @@ export function replaceEditorVariable({ } // Apply all replacements - replacements.forEach(({ pattern, replacement }) => { - result = result.replace(new RegExp(pattern, 'g'), replacement); - }); + for (const { pattern, replacement } of replacements) { + if (checkStrOversize(result)) { + console.warn('Text length exceeds 100,000,000 characters.'); + break; + } + + const re = _getCachedRegex(pattern); + re.lastIndex = 0; + result = result.replace(re, () => replacement); + } // If we made replacements and there might be nested variables, recursively process if (hasReplacements && /\{\{\$[^.]+\.[^$]+\$\}\}/.test(result)) { - result = replaceEditorVariable({ text: result, nodes, variables, depth: depth + 1 }); + result = replaceEditorVariable({ text: result, nodesMap, variables, depth: depth + 1 }); } return result || ''; diff --git a/packages/service/core/app/http.ts b/packages/service/core/app/http.ts index fba01a13c1..9edfda02c4 100644 --- a/packages/service/core/app/http.ts +++ b/packages/service/core/app/http.ts @@ -37,7 +37,7 @@ const buildHttpRequest = ({ const replaceVariables = (text: string) => { return replaceEditorVariable({ text, - nodes: [], + nodesMap: new Map(), variables: params }); }; diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts index c27573a273..770e2a6b09 100644 --- a/packages/service/core/workflow/dispatch/index.ts +++ b/packages/service/core/workflow/dispatch/index.ts @@ -749,7 +749,7 @@ export class WorkflowQueue { if (runningNodePromises.size > 0) { // 当上一个节点运行结束时,立即运行下一轮 await Promise.race(runningNodePromises).catch((error) => { - logger.error('Workflow race error', { error }); + logger.error('Workflow race error', { chatId: this.data.chatId, error }); }); } else { // 理论上不应出现此情况,防御性退回到让出进程 @@ -852,14 +852,14 @@ export class WorkflowQueue { // replace {{$xx.xx$}} and {{xx}} variables let value = replaceEditorVariable({ text: input.value, - nodes: this.data.runtimeNodes, + nodesMap: this.runtimeNodesMap, variables: this.data.variables }); // replace reference variables value = getReferenceVariableValue({ value, - nodes: this.data.runtimeNodes, + nodesMap: this.runtimeNodesMap, variables: this.data.variables }); @@ -899,6 +899,7 @@ export class WorkflowQueue { retainDatasetCite: this.data.retainDatasetCite, node, runtimeNodes: this.data.runtimeNodes, + runtimeNodesMap: this.runtimeNodesMap, runtimeEdges: this.data.runtimeEdges, params, mode diff --git a/packages/service/core/workflow/dispatch/tools/http468.ts b/packages/service/core/workflow/dispatch/tools/http468.ts index 7884842959..c3f01de965 100644 --- a/packages/service/core/workflow/dispatch/tools/http468.ts +++ b/packages/service/core/workflow/dispatch/tools/http468.ts @@ -69,7 +69,7 @@ export const dispatchHttp468Request = async (props: HttpRequestProps): Promise { return replaceEditorVariable({ text, - nodes: runtimeNodes, + nodesMap: runtimeNodesMap, variables: allVariables }); }; @@ -178,7 +178,7 @@ export const dispatchHttp468Request = async (props: HttpRequestProps): Promise; allVariables: Record; - runtimeNodes: RuntimeNodeItemType[]; + runtimeNodesMap: Map; } ) => { - const { variables, allVariables, runtimeNodes } = props; + const { variables, allVariables, runtimeNodesMap } = props; const MAX_REPLACEMENT_DEPTH = 10; const processedVariables = new Set(); @@ -405,15 +405,20 @@ export const replaceJsonBodyString = ( return variables[id]; } // Find upstream node input/output - const node = runtimeNodes.find((node) => node.nodeId === nodeId); + const node = runtimeNodesMap.get(nodeId); if (!node) return; const output = node.outputs.find((output) => output.id === id); if (output) return formatVariableValByType(output.value, output.valueType); const input = node.inputs.find((input) => input.key === id); - if (input) - return getReferenceVariableValue({ value: input.value, nodes: runtimeNodes, variables }); + if (input) { + return getReferenceVariableValue({ + value: input.value, + nodesMap: runtimeNodesMap, + variables + }); + } })(); const formatVal = valToStr(variableVal, isInQuotes); diff --git a/packages/service/core/workflow/dispatch/tools/runIfElse.ts b/packages/service/core/workflow/dispatch/tools/runIfElse.ts index b87f705699..af86382b0f 100644 --- a/packages/service/core/workflow/dispatch/tools/runIfElse.ts +++ b/packages/service/core/workflow/dispatch/tools/runIfElse.ts @@ -105,7 +105,7 @@ function getResult( condition: IfElseConditionType, list: ConditionListItemType[], variables: Record, - runtimeNodes: RuntimeNodeItemType[] + runtimeNodesMap: Map ) { const listResult = list.map((item) => { const { variable, condition: variableCondition, value, valueType } = item; @@ -114,7 +114,7 @@ function getResult( const conditionLeftValue = getReferenceVariableValue({ value: variable, variables, - nodes: runtimeNodes + nodesMap: runtimeNodesMap }); const conditionRightValue = @@ -122,7 +122,7 @@ function getResult( ? getReferenceVariableValue({ value: value as ReferenceItemValueType, variables, - nodes: runtimeNodes + nodesMap: runtimeNodesMap }) : value; @@ -135,7 +135,7 @@ function getResult( export const dispatchIfElse = async (props: Props): Promise => { const { params, - runtimeNodes, + runtimeNodesMap, variables, node: { nodeId } } = props; @@ -144,7 +144,7 @@ export const dispatchIfElse = async (props: Props): Promise => { let res = IfElseResultEnum.ELSE as string; for (let i = 0; i < ifElseList.length; i++) { const item = ifElseList[i]; - const result = getResult(item.condition, item.list, variables, runtimeNodes); + const result = getResult(item.condition, item.list, variables, runtimeNodesMap); if (result) { res = getElseIFLabel(i); break; diff --git a/packages/service/core/workflow/dispatch/tools/runUpdateVar.ts b/packages/service/core/workflow/dispatch/tools/runUpdateVar.ts index c0fc037395..8e77f27d22 100644 --- a/packages/service/core/workflow/dispatch/tools/runUpdateVar.ts +++ b/packages/service/core/workflow/dispatch/tools/runUpdateVar.ts @@ -25,14 +25,14 @@ export const dispatchUpdateVariable = async (props: Props): Promise => chatConfig, params, variables, - runtimeNodes, + runtimeNodesMap, workflowStreamResponse, externalProvider, runningAppInfo } = props; const { updateList } = params; - const nodeIds = runtimeNodes.map((node) => node.nodeId); + const nodeIds = Array.from(runtimeNodesMap.keys()); const result = updateList.map((item) => { const variable = item.variable; @@ -55,7 +55,7 @@ export const dispatchUpdateVariable = async (props: Props): Promise => typeof item.value?.[1] === 'string' ? replaceEditorVariable({ text: item.value?.[1], - nodes: runtimeNodes, + nodesMap: runtimeNodesMap, variables }) : item.value?.[1]; @@ -65,7 +65,7 @@ export const dispatchUpdateVariable = async (props: Props): Promise => return getReferenceVariableValue({ value: item.value, variables, - nodes: runtimeNodes + nodesMap: runtimeNodesMap }); } })(); @@ -75,15 +75,14 @@ export const dispatchUpdateVariable = async (props: Props): Promise => if (varNodeId === VARIABLE_NODE_ID) { variables[varKey] = value; } else { + const node = runtimeNodesMap.get(varNodeId); // Other nodes - runtimeNodes - .find((node) => node.nodeId === varNodeId) - ?.outputs?.find((output) => { - if (output.id === varKey) { - output.value = value; - return true; - } - }); + node?.outputs?.find((output) => { + if (output.id === varKey) { + output.value = value; + return true; + } + }); } return value; diff --git a/projects/app/src/pages/api/support/openapi/health.ts b/projects/app/src/pages/api/support/openapi/health.ts index 9b14631d77..d03c895fd0 100644 --- a/projects/app/src/pages/api/support/openapi/health.ts +++ b/projects/app/src/pages/api/support/openapi/health.ts @@ -1,11 +1,7 @@ import type { ApiRequestProps, ApiResponseType } from '@fastgpt/service/type/next'; import { NextAPI } from '@/service/middleware/entry'; -import type { - ApiKeyHealthResponseType -} from '@fastgpt/global/openapi/support/openapi/api'; -import { - ApiKeyHealthParamsSchema -} from '@fastgpt/global/openapi/support/openapi/api'; +import type { ApiKeyHealthResponseType } from '@fastgpt/global/openapi/support/openapi/api'; +import { ApiKeyHealthParamsSchema } from '@fastgpt/global/openapi/support/openapi/api'; import { MongoOpenApi } from '@fastgpt/service/support/openapi/schema'; import { useIPFrequencyLimit } from '../../../../../../../packages/service/common/middle/reqFrequencyLimit'; diff --git a/test/cases/global/core/workflow/runtime/utils.test.ts b/test/cases/global/core/workflow/runtime/utils.test.ts index 07e7509d14..6ff797ffbb 100644 --- a/test/cases/global/core/workflow/runtime/utils.test.ts +++ b/test/cases/global/core/workflow/runtime/utils.test.ts @@ -598,6 +598,31 @@ describe('valueTypeFormat', () => { expect(valueTypeFormat(item.value, item.type)).toEqual(item.result); }); }); + + it('should return default value when json5.parse throws for object type', () => { + // '{a: }' passes isObjectString but fails json5.parse + expect(valueTypeFormat('{a: }', WorkflowIOValueTypeEnum.object)).toEqual({}); + }); + + it('should return [value] when json5.parse throws for array type', () => { + // '[a]' passes isObjectString but fails json5.parse (bare identifier is not a valid json5 value) + expect(valueTypeFormat('[a]', WorkflowIOValueTypeEnum.arrayString)).toEqual(['[a]']); + }); + + it('should return [] when json5.parse throws for special types', () => { + expect(valueTypeFormat('[a]', WorkflowIOValueTypeEnum.datasetQuote)).toEqual([]); + expect(valueTypeFormat('[a]', WorkflowIOValueTypeEnum.selectDataset)).toEqual([]); + expect(valueTypeFormat('[a]', WorkflowIOValueTypeEnum.selectApp)).toEqual([]); + }); + + it('should return [] when json5.parse throws for chatHistory type', () => { + expect(valueTypeFormat('[a]', WorkflowIOValueTypeEnum.chatHistory)).toEqual([]); + }); + + it('should return value as-is for unhandled valueType (dynamic)', () => { + expect(valueTypeFormat('hello', WorkflowIOValueTypeEnum.dynamic)).toBe('hello'); + expect(valueTypeFormat(42, WorkflowIOValueTypeEnum.dynamic)).toBe(42); + }); }); describe('getLastInteractiveValue', () => { @@ -933,6 +958,24 @@ describe('storeEdges2RuntimeEdges', () => { const result = storeEdges2RuntimeEdges(undefined as any); expect(result).toEqual([]); }); + + it('should use fallback [] when lastInteractive.memoryEdges is undefined', () => { + const edges: StoreEdgeItemType[] = [ + { source: 'n1', sourceHandle: 'out1', target: 'n2', targetHandle: 'in1' } + ]; + const lastInteractive = { + type: 'userSelect', + entryNodeIds: [], + memoryEdges: undefined, + nodeOutputs: [], + params: { description: '', userSelectOptions: [] } + } as any as WorkflowInteractiveResponseType; + + const result = storeEdges2RuntimeEdges(edges, lastInteractive); + expect(result).toEqual([ + { source: 'n1', sourceHandle: 'out1', target: 'n2', targetHandle: 'in1', status: 'waiting' } + ]); + }); }); describe('getWorkflowEntryNodeIds', () => { @@ -1062,6 +1105,28 @@ describe('getWorkflowEntryNodeIds', () => { const result = getWorkflowEntryNodeIds(nodes); expect(result).toEqual(['start1']); }); + + it('should fall through to node scan when lastInteractive.entryNodeIds is undefined', () => { + const nodes: RuntimeNodeItemType[] = [ + { + nodeId: 'start1', + name: 'start', + flowNodeType: FlowNodeTypeEnum.workflowStart, + inputs: [], + outputs: [] + } + ]; + const lastInteractive = { + type: 'userSelect', + entryNodeIds: undefined, + memoryEdges: [], + nodeOutputs: [], + params: { description: '', userSelectOptions: [] } + } as any as WorkflowInteractiveResponseType; + + const result = getWorkflowEntryNodeIds(nodes, lastInteractive); + expect(result).toEqual(['start1']); + }); }); describe('storeNodes2RuntimeNodes', () => { @@ -1165,14 +1230,14 @@ describe('filterWorkflowEdges', () => { describe('getReferenceVariableValue', () => { it('should return undefined for undefined value', () => { expect( - getReferenceVariableValue({ value: undefined, nodes: [], variables: {} }) + getReferenceVariableValue({ value: undefined, nodesMap: {}, variables: {} }) ).toBeUndefined(); }); it('should return variable value for VARIABLE_NODE_ID reference', () => { const result = getReferenceVariableValue({ value: [VARIABLE_NODE_ID, 'myVar'], - nodes: [], + nodesMap: {}, variables: { myVar: 'hello' } }); expect(result).toBe('hello'); @@ -1181,15 +1246,15 @@ describe('getReferenceVariableValue', () => { it('should return undefined for VARIABLE_NODE_ID with empty outputId', () => { const result = getReferenceVariableValue({ value: [VARIABLE_NODE_ID, ''], - nodes: [], + nodesMap: {}, variables: { myVar: 'hello' } }); expect(result).toBeUndefined(); }); it('should return node output value', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1198,27 +1263,74 @@ describe('getReferenceVariableValue', () => { { id: 'out1', key: 'output1', type: FlowNodeOutputTypeEnum.static, value: 'outputValue' } ] } - ]; + }; const result = getReferenceVariableValue({ value: ['node1', 'out1'], - nodes, + nodesMap, variables: {} }); expect(result).toBe('outputValue'); }); + it('should return undefined when output id not found in node', () => { + const nodesMap: Record = { + node1: { + nodeId: 'node1', + name: 'test', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [ + { id: 'out1', key: 'output1', type: FlowNodeOutputTypeEnum.static, value: 'outputValue' } + ] + } + }; + const result = getReferenceVariableValue({ + value: ['node1', 'nonexistent'], + nodesMap, + variables: {} + }); + expect(result).toBeUndefined(); + }); + it('should return original value when node not found', () => { const result = getReferenceVariableValue({ value: ['nonexistent', 'out1'], - nodes: [], + nodesMap: {}, variables: {} }); expect(result).toEqual(['nonexistent', 'out1']); }); + it('should return non-reference value as-is', () => { + const result = getReferenceVariableValue({ + value: 'plain string' as any, + nodesMap: {}, + variables: {} + }); + expect(result).toBe('plain string'); + }); + + it('should handle array with single reference', () => { + const nodesMap: Record = { + node1: { + nodeId: 'node1', + name: 'test', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [{ id: 'out1', key: 'output1', type: FlowNodeOutputTypeEnum.static, value: 'v1' }] + } + }; + const result = getReferenceVariableValue({ + value: [['node1', 'out1']], + nodesMap, + variables: {} + }); + expect(result).toEqual(['v1']); + }); + it('should handle array of references', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1227,7 +1339,7 @@ describe('getReferenceVariableValue', () => { { id: 'out1', key: 'output1', type: FlowNodeOutputTypeEnum.static, value: 'value1' } ] }, - { + node2: { nodeId: 'node2', name: 'test2', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1236,21 +1348,33 @@ describe('getReferenceVariableValue', () => { { id: 'out2', key: 'output2', type: FlowNodeOutputTypeEnum.static, value: 'value2' } ] } - ]; + }; const result = getReferenceVariableValue({ value: [ ['node1', 'out1'], ['node2', 'out2'] ], - nodes, + nodesMap, variables: {} }); expect(result).toEqual(['value1', 'value2']); }); + it('should handle array with VARIABLE_NODE_ID references', () => { + const result = getReferenceVariableValue({ + value: [ + [VARIABLE_NODE_ID, 'var1'], + [VARIABLE_NODE_ID, 'var2'] + ], + nodesMap: {}, + variables: { var1: 'hello', var2: 'world' } + }); + expect(result).toEqual(['hello', 'world']); + }); + it('should filter undefined values from array result', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1259,17 +1383,102 @@ describe('getReferenceVariableValue', () => { { id: 'out1', key: 'output1', type: FlowNodeOutputTypeEnum.static, value: 'value1' } ] } - ]; + }; const result = getReferenceVariableValue({ value: [ ['node1', 'out1'], ['node1', 'nonexistent'] ], - nodes, + nodesMap, variables: {} }); expect(result).toEqual(['value1']); }); + + it('should flatten array output values in reference array', () => { + const nodesMap: Record = { + node1: { + nodeId: 'node1', + name: 'test', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [ + { id: 'out1', key: 'output1', type: FlowNodeOutputTypeEnum.static, value: ['a', 'b'] } + ] + }, + node2: { + nodeId: 'node2', + name: 'test2', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [ + { id: 'out2', key: 'output2', type: FlowNodeOutputTypeEnum.static, value: ['c', 'd'] } + ] + } + }; + const result = getReferenceVariableValue({ + value: [ + ['node1', 'out1'], + ['node2', 'out2'] + ], + nodesMap, + variables: {} + }); + expect(result).toEqual(['a', 'b', 'c', 'd']); + }); + + it('should handle array with 3+ references', () => { + const nodesMap: Record = { + n1: { + nodeId: 'n1', + name: 'n1', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [{ id: 'o1', key: 'o1', type: FlowNodeOutputTypeEnum.static, value: 'r1' }] + }, + n2: { + nodeId: 'n2', + name: 'n2', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [{ id: 'o2', key: 'o2', type: FlowNodeOutputTypeEnum.static, value: 'r2' }] + }, + n3: { + nodeId: 'n3', + name: 'n3', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [{ id: 'o3', key: 'o3', type: FlowNodeOutputTypeEnum.static, value: 'r3' }] + } + }; + const result = getReferenceVariableValue({ + value: [ + ['n1', 'o1'], + ['n2', 'o2'], + ['n3', 'o3'] + ], + nodesMap, + variables: {} + }); + expect(result).toEqual(['r1', 'r2', 'r3']); + }); + + it('should support Map as nodesMap', () => { + const nodesMap = new Map([ + [ + 'node1', + { + nodeId: 'node1', + name: 'test', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [{ id: 'out1', key: 'o1', type: FlowNodeOutputTypeEnum.static, value: 'mapVal' }] + } + ] + ]); + const result = getReferenceVariableValue({ value: ['node1', 'out1'], nodesMap, variables: {} }); + expect(result).toBe('mapVal'); + }); }); describe('formatVariableValByType', () => { @@ -1322,26 +1531,26 @@ describe('formatVariableValByType', () => { describe('replaceEditorVariable', () => { it('should return non-string values as is', () => { - expect(replaceEditorVariable({ text: 123, nodes: [], variables: {} })).toBe(123); - expect(replaceEditorVariable({ text: null, nodes: [], variables: {} })).toBe(null); + expect(replaceEditorVariable({ text: 123, nodesMap: {}, variables: {} })).toBe(123); + expect(replaceEditorVariable({ text: null, nodesMap: {}, variables: {} })).toBe(null); }); it('should return empty string as is', () => { - expect(replaceEditorVariable({ text: '', nodes: [], variables: {} })).toBe(''); + expect(replaceEditorVariable({ text: '', nodesMap: {}, variables: {} })).toBe(''); }); it('should replace global variables', () => { const result = replaceEditorVariable({ text: 'Hello {{name}}', - nodes: [], + nodesMap: {}, variables: { name: 'World' } }); expect(result).toBe('Hello World'); }); it('should replace node output variables', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1356,10 +1565,10 @@ describe('replaceEditorVariable', () => { } ] } - ]; + }; const result = replaceEditorVariable({ text: 'Result: {{$node1.out1$}}', - nodes, + nodesMap, variables: {} }); expect(result).toBe('Result: outputValue'); @@ -1368,15 +1577,15 @@ describe('replaceEditorVariable', () => { it('should replace VARIABLE_NODE_ID variables', () => { const result = replaceEditorVariable({ text: `Value: {{$${VARIABLE_NODE_ID}.myVar$}}`, - nodes: [], + nodesMap: {}, variables: { myVar: 'varValue' } }); expect(result).toBe('Value: varValue'); }); it('should handle nested variable replacement', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1391,7 +1600,7 @@ describe('replaceEditorVariable', () => { } ] }, - { + node2: { nodeId: 'node2', name: 'test2', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1406,18 +1615,18 @@ describe('replaceEditorVariable', () => { } ] } - ]; + }; const result = replaceEditorVariable({ text: 'Result: {{$node1.out1$}}', - nodes, + nodesMap, variables: {} }); expect(result).toBe('Result: finalValue'); }); it('should handle circular reference protection', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1432,10 +1641,10 @@ describe('replaceEditorVariable', () => { } ] } - ]; + }; const result = replaceEditorVariable({ text: 'Result: {{$node1.out1$}}', - nodes, + nodesMap, variables: {} }); expect(result).toBe('Result: {{$node1.out1$}}'); @@ -1444,7 +1653,7 @@ describe('replaceEditorVariable', () => { it('should handle max depth protection', () => { const result = replaceEditorVariable({ text: 'test', - nodes: [], + nodesMap: {}, variables: {}, depth: 15 }); @@ -1452,26 +1661,26 @@ describe('replaceEditorVariable', () => { }); it('should handle node input as variable source', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, inputs: [{ key: 'myInput', label: '', renderTypeList: [], value: 'inputValue' }], outputs: [] } - ]; + }; const result = replaceEditorVariable({ text: 'Input: {{$node1.myInput$}}', - nodes, + nodesMap, variables: {} }); expect(result).toBe('Input: inputValue'); }); it('should convert object values to string', () => { - const nodes: RuntimeNodeItemType[] = [ - { + const nodesMap: Record = { + node1: { nodeId: 'node1', name: 'test', flowNodeType: FlowNodeTypeEnum.chatNode, @@ -1486,10 +1695,10 @@ describe('replaceEditorVariable', () => { } ] } - ]; + }; const result = replaceEditorVariable({ text: 'Object: {{$node1.out1$}}', - nodes, + nodesMap, variables: {} }); expect(result).toBe('Object: {"a":1}'); @@ -1498,12 +1707,93 @@ describe('replaceEditorVariable', () => { it('should keep original pattern when node not found', () => { const result = replaceEditorVariable({ text: '{{$nonexistent.out$}}', - nodes: [], + nodesMap: {}, variables: {} }); // When node is not found, the pattern is not replaced expect(result).toBe(''); }); + + it('should skip duplicate variable pattern in the same text', () => { + const nodesMap: Record = { + node1: { + nodeId: 'node1', + name: 'test', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [ + { + id: 'out1', + key: 'output1', + type: FlowNodeOutputTypeEnum.static, + value: 'val', + valueType: WorkflowIOValueTypeEnum.string + } + ] + } + }; + // Same pattern appears twice — second occurrence reuses first replacement + const result = replaceEditorVariable({ + text: '{{$node1.out1$}} and {{$node1.out1$}}', + nodesMap, + variables: {} + }); + expect(result).toBe('val and val'); + }); + + it('should support Map as nodesMap', () => { + const nodesMap = new Map([ + [ + 'node1', + { + nodeId: 'node1', + name: 'test', + flowNodeType: FlowNodeTypeEnum.chatNode, + inputs: [], + outputs: [ + { + id: 'out1', + key: 'output1', + type: FlowNodeOutputTypeEnum.static, + value: 'mapValue', + valueType: WorkflowIOValueTypeEnum.string + } + ] + } + ] + ]); + const result = replaceEditorVariable({ + text: 'Result: {{$node1.out1$}}', + nodesMap, + variables: {} + }); + expect(result).toBe('Result: mapValue'); + }); + + it('should handle $ special characters in variable values literally', () => { + // $& in replacement string would be interpreted as "matched substring" by JS replace() + // Using () => replacement prevents this behavior + const result1 = replaceEditorVariable({ + text: `Value: {{$${VARIABLE_NODE_ID}.myVar$}}`, + nodesMap: {}, + variables: { myVar: '$& some text' } + }); + expect(result1).toBe('Value: $& some text'); + + const result2 = replaceEditorVariable({ + text: `Price: {{$${VARIABLE_NODE_ID}.price$}}`, + nodesMap: {}, + variables: { price: '$100' } + }); + expect(result2).toBe('Price: $100'); + + const result3 = replaceEditorVariable({ + text: `Code: {{$${VARIABLE_NODE_ID}.code$}}`, + nodesMap: {}, + variables: { code: '$$' } + }); + expect(result3).toBe('Code: $$'); + }); }); describe('textAdaptGptResponse', () => { diff --git a/test/cases/service/core/workflow/dispatch/tools/http.test.ts b/test/cases/service/core/workflow/dispatch/tools/http.test.ts index 76b2ddf1b7..22c723be88 100644 --- a/test/cases/service/core/workflow/dispatch/tools/http.test.ts +++ b/test/cases/service/core/workflow/dispatch/tools/http.test.ts @@ -41,7 +41,7 @@ describe('replaceJsonBodyString', () => { const mockProps = { variables: mockVariables, allVariables: mockAllVariables, - runtimeNodes: mockRuntimeNodes + runtimeNodesMap: new Map(mockRuntimeNodes.map((node) => [node.nodeId, node])) }; describe('Basic variable replacement functionality', () => {