diff --git a/.claude/issue/workflow-deep-analysis.md b/.claude/issue/workflow-deep-analysis.md new file mode 100644 index 0000000000..dbf601325c --- /dev/null +++ b/.claude/issue/workflow-deep-analysis.md @@ -0,0 +1,736 @@ +# 工作流 dispatchWorkFlow 深度性能分析报告 + +## 📊 分析范围 + +本报告对 `dispatchWorkFlow` 函数及其完整调用链进行了深度分析,包括: + +1. **主函数**: `dispatchWorkFlow` (1277 行) +2. **核心类**: `WorkflowQueue` 类及其所有方法 +3. **工具函数**: `replaceEditorVariable`, `getReferenceVariableValue`, `checkNodeRunStatus`, `filterWorkflowEdges` 等 +4. **节点处理器**: 83 个节点类型的 dispatch 函数(callbackMap) +5. **辅助系统**: 定时器、AsyncLocalStorage、停止检查等 + +--- + +## 🔴 严重性能问题(优先级 P0) + +### 1. **nodeOutput 函数中的 O(n²) 遍历**(已完成) + +**位置**: `index.ts:851-909` + +**问题代码**: +```typescript +const nodeOutput = (node: RuntimeNodeItemType, result: NodeResponseCompleteType) => { + // 1. 过滤边 - O(m) + const targetEdges = filterWorkflowEdges(runtimeEdges).filter( + (item) => item.source === node.nodeId + ); + + // 2. 遍历所有节点查找下一步节点 - O(n) + runtimeNodes.forEach((node) => { + // 3. 对每个节点,遍历 targetEdges - O(k) + if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'active')) { + nextStepActiveNodesMap.set(node.nodeId, node); + } + if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'skipped')) { + nextStepSkipNodesMap.set(node.nodeId, node); + } + }); +}; +``` + +**复杂度分析**: +- 150 个节点的工作流 +- 每个节点完成后调用一次 `nodeOutput` +- 总计: **150 节点 × 150 次遍历 × 平均 3 条边检查 = 67,500 次操作** + +**内存影响**: +- 每次调用创建新的 Map 对象 +- 150 次调用 = 150 个临时 Map 对象 + +**优化方案**: +```typescript +// 🟢 预构建边索引(在 WorkflowQueue 构造函数中) +class WorkflowQueue { + private edgeIndex = { + bySource: new Map(), + byTarget: new Map() + }; + + constructor() { + // 一次性构建索引 - O(m) + const filteredEdges = filterWorkflowEdges(runtimeEdges); + filteredEdges.forEach(edge => { + if (!this.edgeIndex.bySource.has(edge.source)) { + this.edgeIndex.bySource.set(edge.source, []); + } + this.edgeIndex.bySource.get(edge.source)!.push(edge); + + if (!this.edgeIndex.byTarget.has(edge.target)) { + this.edgeIndex.byTarget.set(edge.target, []); + } + this.edgeIndex.byTarget.get(edge.target)!.push(edge); + }); + } + + // 🟢 优化后的 nodeOutput - O(k),k 是目标边数量 + const nodeOutput = (node: RuntimeNodeItemType, result: NodeResponseCompleteType) => { + // O(1) 查询 + const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || []; + + // O(k) - 只遍历目标边 + const nextStepActiveNodesMap = new Map(); + const nextStepSkipNodesMap = new Map(); + + targetEdges.forEach((edge) => { + const targetNode = this.runtimeNodesMap.get(edge.target); + if (!targetNode) return; + + if (edge.status === 'active') { + nextStepActiveNodesMap.set(targetNode.nodeId, targetNode); + } else if (edge.status === 'skipped') { + nextStepSkipNodesMap.set(targetNode.nodeId, targetNode); + } + }); + + return { + nextStepActiveNodes: Array.from(nextStepActiveNodesMap.values()), + nextStepSkipNodes: Array.from(nextStepSkipNodesMap.values()) + }; + }; +} +``` + +**收益**: +- 时间复杂度: O(n²) → O(k),k 通常 < 5 +- 操作次数: 67,500 → ~750 (减少 99%) +- 内存: 减少临时对象创建 + +--- + +### 2. **replaceEditorVariable 的递归和重复遍历** + +**位置**: `packages/global/core/workflow/runtime/utils.ts:495-597` + +**问题分析**: + +```typescript +export function replaceEditorVariable({ text, nodes, variables, depth = 0 }) { + // 1. 正则匹配所有变量 - O(m),m 是文本长度 + const variablePattern = /\{\{\$([^.]+)\.([^$]+)\$\}\}/g; + const matches = [...text.matchAll(variablePattern)]; + + for (const match of matches) { + const nodeId = match[1]; + const id = match[2]; + + // 2. 查找节点 - O(n) + const node = nodes.find((node) => node.nodeId === nodeId); + if (!node) return; + + // 3. 查找输出 - O(k) + const output = node.outputs.find((output) => output.id === id); + if (output) return formatVariableValByType(output.value, output.valueType); + + // 4. 查找输入 - O(k) + const input = node.inputs.find((input) => input.key === id); + if (input) return getReferenceVariableValue({ value: input.value, nodes, variables }); + // ^^^^^ 又传递完整 nodes + } + + // 5. 递归处理嵌套变量 - 最多 10 层 + if (hasReplacements && /\{\{\$[^.]+\.[^$]+\$\}\}/.test(result)) { + result = replaceEditorVariable({ text: result, nodes, variables, depth: depth + 1 }); + } +} +``` + +**调用频率**: +```typescript +// 在 nodeRunWithActive 中,每个 input 都调用 +node.inputs.forEach((input) => { + let value = replaceEditorVariable({ + text: input.value, + nodes: runtimeNodes, // 传递 150 个节点 + variables + }); +}); +``` + +**复杂度估算**: +- 150 节点 × 平均 5 个 inputs = 750 次调用 +- 每次调用遍历 150 个节点查找 +- 总计: **750 × 150 = 112,500 次节点查找** + +**为什么不能简单优化**: + +1. ❌ 不能只存储输出值,因为需要 `valueType` 进行格式化 +2. ❌ 不能只存储 outputs,因为还需要访问 `node.inputs` +3. ❌ `getReferenceVariableValue` 内部递归需要完整 nodes 数组 +4. ❌ 嵌套变量替换需要递归调用 + +**可行的优化方案**: + +```typescript +class WorkflowQueue { + // 🟢 构建节点索引 + private nodeIndex = { + byId: new Map(), + outputsByNodeId: new Map>(), + inputsByNodeId: new Map>() + }; + + constructor() { + runtimeNodes.forEach(node => { + this.nodeIndex.byId.set(node.nodeId, node); + + // 索引 outputs + const outputsMap = new Map(); + node.outputs.forEach(output => { + outputsMap.set(output.id, { value: output.value, valueType: output.valueType }); + }); + this.nodeIndex.outputsByNodeId.set(node.nodeId, outputsMap); + + // 索引 inputs + const inputsMap = new Map(); + node.inputs.forEach(input => { + inputsMap.set(input.key, input.value); + }); + this.nodeIndex.inputsByNodeId.set(node.nodeId, inputsMap); + }); + } + + // 🟢 优化的变量查找 + private getNodeOutput(nodeId: string, outputId: string) { + return this.nodeIndex.outputsByNodeId.get(nodeId)?.get(outputId); + } + + private getNodeInput(nodeId: string, inputKey: string) { + return this.nodeIndex.inputsByNodeId.get(nodeId)?.get(inputKey); + } +} + +// 🟢 修改 replaceEditorVariable 使用索引 +export function replaceEditorVariable({ + text, + nodeIndex, // 传递索引而不是完整数组 + variables, + depth = 0 +}) { + // ... 其他逻辑 + + const variableVal = (() => { + if (nodeId === VARIABLE_NODE_ID) { + return variables[id]; + } + + // O(1) 查询而不是 O(n) + const output = nodeIndex.outputsByNodeId.get(nodeId)?.get(id); + if (output) return formatVariableValByType(output.value, output.valueType); + + const input = nodeIndex.inputsByNodeId.get(nodeId)?.get(id); + if (input) return getReferenceVariableValue({ value: input, nodeIndex, variables }); + })(); +} +``` + +**收益**: +- 节点查找: O(n) → O(1) +- 操作次数: 112,500 → 750 (减少 99%) +- 但需要修改函数签名,影响范围较大 + +--- + +### 3. **checkNodeRunStatus 的深度遍历** + +**位置**: `packages/global/core/workflow/runtime/utils.ts:297-413` + +**问题代码**: +```typescript +export const checkNodeRunStatus = ({ nodesMap, node, runtimeEdges }) => { + const splitNodeEdges = (targetNode: RuntimeNodeItemType) => { + const commonEdges: RuntimeEdgeItemType[] = []; + const recursiveEdgeGroupsMap = new Map(); + + // 1. 获取所有源边 - O(m) + const sourceEdges = runtimeEdges.filter((item) => item.target === targetNode.nodeId); + + sourceEdges.forEach((sourceEdge) => { + // 2. 使用栈进行深度遍历 - 最多 3000 次迭代 + const stack: Array<{ edge: RuntimeEdgeItemType; visited: Set }> = [ + { edge: sourceEdge, visited: new Set([targetNode.nodeId]) } + ]; + const MAX_DEPTH = 3000; + let iterations = 0; + + while (stack.length > 0 && iterations < MAX_DEPTH) { + iterations++; + const { edge, visited } = stack.pop()!; + + const sourceNode = nodesMap.get(edge.source); + if (!sourceNode) continue; + + // 检查是否是起始节点 + if (isStartNode(sourceNode.flowNodeType)) { + commonEdges.push(sourceEdge); + continue; + } + + // 检查循环 + if (edge.source === targetNode.nodeId) { + recursiveEdgeGroupsMap.set(edge.target, [ + ...(recursiveEdgeGroupsMap.get(edge.target) || []), + sourceEdge + ]); + continue; + } + + // 继续向上遍历 + const nextEdges = runtimeEdges.filter((item) => item.target === edge.source); + for (const nextEdge of nextEdges) { + stack.push({ + edge: nextEdge, + visited: new Set([...visited, edge.source]) + }); + } + } + }); + + return { commonEdges, recursiveEdgeGroups: Array.from(recursiveEdgeGroupsMap.values()) }; + }; + + const { commonEdges, recursiveEdgeGroups } = splitNodeEdges(node); + // ... 检查逻辑 +}; +``` + +**性能问题**: + +1. **每次调用都重新遍历**: 每个节点运行前都调用一次 +2. **深度遍历开销**: 复杂工作流可能触发数千次迭代 +3. **重复过滤**: `runtimeEdges.filter` 被多次调用 +4. **Set 复制开销**: 每次迭代都复制 visited Set + +**调用频率**: +- 150 节点 × 每个节点调用 1 次 = 150 次 +- 复杂分支可能触发 3000 次迭代 +- 总计: 可能达到 **450,000 次迭代** + +**优化方案**: + +```typescript +class WorkflowQueue { + // 🟢 缓存节点运行状态检查结果 + private nodeStatusCache = new Map(); + + // 🟢 预构建边的反向索引 + private edgesByTarget = new Map(); + + constructor() { + runtimeEdges.forEach(edge => { + if (!this.edgesByTarget.has(edge.target)) { + this.edgesByTarget.set(edge.target, []); + } + this.edgesByTarget.get(edge.target)!.push(edge); + }); + } + + private checkNodeCanRun(node: RuntimeNodeItemType) { + // 🟢 检查缓存 + const cached = this.nodeStatusCache.get(node.nodeId); + if (cached) return cached; + + // 使用索引而不是过滤 + const sourceEdges = this.edgesByTarget.get(node.nodeId) || []; + + const status = checkNodeRunStatus({ + nodesMap: this.runtimeNodesMap, + node, + runtimeEdges, + sourceEdges // 传递预过滤的边 + }); + + // 🟢 缓存结果(边状态变化时需要清除) + this.nodeStatusCache.set(node.nodeId, status); + return status; + } + + // 🟢 边状态更新时清除相关缓存 + private updateEdgeStatus(edge: RuntimeEdgeItemType, status: string) { + edge.status = status; + // 清除目标节点的缓存 + this.nodeStatusCache.delete(edge.target); + } +} +``` + +**收益**: +- 减少重复计算 +- 使用索引避免过滤 +- 缓存结果避免重复遍历 + +--- + +## 🟡 中等性能问题(优先级 P1) + +### 4. **getNodeRunParams 中的重复变量替换** + +**位置**: `index.ts:515-570` + +**问题代码**: +```typescript +function getNodeRunParams(node: RuntimeNodeItemType) { + const params: Record = {}; + + node.inputs.forEach((input) => { + // 1. 第一次变量替换 - O(n) + let value = replaceEditorVariable({ + text: input.value, + nodes: runtimeNodes, + variables + }); + + // 2. 第二次变量替换 - O(n) + value = getReferenceVariableValue({ + value, + nodes: runtimeNodes, + variables + }); + + // 3. 类型格式化 + params[input.key] = valueTypeFormat(value, input.valueType); + }); + + return params; +} +``` + +**问题**: +- 每个 input 都进行两次变量查找 +- 150 节点 × 5 inputs × 2 次 = **1,500 次变量替换调用** + +**优化建议**: +- 合并两次变量替换为一次 +- 或者在 `replaceEditorVariable` 内部处理引用变量 + +--- + +### 5. **大对象频繁传递** + +**位置**: `index.ts:587-601` + +**问题代码**: +```typescript +const dispatchData: ModuleDispatchProps> = { + ...data, // 🔴 展开整个 data 对象 + usagePush: this.usagePush.bind(this), + variables, + histories, // 🔴 完整的历史记录数组 + node, + runtimeNodes, // 🔴 150 个节点的完整数组 + runtimeEdges, // 🔴 所有边的完整数组 + params, + mode: isDebugMode ? 'test' : data.mode +}; + +// 传递给每个节点处理函数 +const dispatchRes = await callbackMap[node.flowNodeType](dispatchData); +``` + +**内存影响**: +- 每个节点执行都创建新的 `dispatchData` 对象 +- 150 节点 × ~1MB/对象 = **150MB 临时对象** +- 对象展开 `...data` 会复制所有属性 + +**优化方案**: +```typescript +// 🟢 创建共享的不可变数据 +class WorkflowQueue { + private sharedDispatchData: Readonly>; + + constructor() { + // 一次性创建共享数据 + this.sharedDispatchData = Object.freeze({ + ...data, + usagePush: this.usagePush.bind(this), + variables, + histories, + runtimeNodes, + runtimeEdges + }); + } + + async nodeRunWithActive(node: RuntimeNodeItemType) { + const params = getNodeRunParams(node); + + // 🟢 只传递变化的部分 + const dispatchData = { + ...this.sharedDispatchData, + node, + params + }; + + return await callbackMap[node.flowNodeType](dispatchData); + } +} +``` + +**收益**: +- 减少对象创建 +- 减少内存占用 +- 但需要确保共享数据不被修改 + +--- + +### 6. **定时器过于频繁** + +**位置**: `index.ts:155-182, 207-215` + +**问题代码**: +```typescript +// 🔴 100ms 定时器检查停止状态 +const checkStoppingTimer = apiVersion === 'v2' + ? setInterval(async () => { + stopping = await shouldWorkflowStop({ + appId: runningAppInfo.id, + chatId + }); + }, 100) // 每 100ms 触发一次 + : undefined; + +// 🔴 10秒定时器保持连接 +streamCheckTimer = setInterval(() => { + data?.workflowStreamResponse?.({ + event: SseResponseEventEnum.answer, + data: textAdaptGptResponse({ text: '' }) + }); +}, 10000); +``` + +**问题**: +- 10 个并发工作流 × 10 次/秒 = **100 次定时器触发/秒** +- 每次触发可能涉及 Redis 查询 (`shouldWorkflowStop`) +- 高并发时会产生大量 I/O 操作 + +**优化方案**: +```typescript +// 🟢 增加检查间隔 +const checkStoppingTimer = apiVersion === 'v2' + ? setInterval(async () => { + stopping = await shouldWorkflowStop({ + appId: runningAppInfo.id, + chatId + }); + }, 500) // 从 100ms 改为 500ms + : undefined; + +// 🟢 添加定时器清理保护 +const cleanupTimers = () => { + if (streamCheckTimer) { + clearInterval(streamCheckTimer); + streamCheckTimer = null; + } + if (checkStoppingTimer) { + clearInterval(checkStoppingTimer); + } +}; + +// 🟢 确保清理 +try { + await runWorkflow(...); +} finally { + cleanupTimers(); +} +``` + +**收益**: +- 减少定时器触发频率 80% +- 降低 Redis 查询压力 +- 降低 CPU 使用率 + +--- + +## 🟢 低优先级优化(优先级 P2) + +### 7. **surrenderProcess 调用频率** + +**位置**: 多处调用 + +**当前实现**: +```typescript +export const surrenderProcess = () => new Promise((resolve) => setImmediate(resolve)); +``` + +**调用频率**: +- `startProcessing` 循环中每次迭代调用 +- `processSkipNodes` 中调用 +- 150 节点 × 2 次 = **300 次 setImmediate** + +**影响**: +- 高并发时事件循环充满 setImmediate 回调 +- 可能导致其他任务延迟 + +**优化建议**: +```typescript +// 🟢 批量处理,减少 surrenderProcess 调用 +private async startProcessing() { + let processedCount = 0; + const BATCH_SIZE = 5; + + while (true) { + // 处理节点... + + processedCount++; + // 每处理 5 个节点才让出一次 + if (processedCount % BATCH_SIZE === 0) { + await surrenderProcess(); + } + } +} +``` + +--- + +### 8. **filterWorkflowEdges 重复调用** + +**位置**: 多处调用 + +**问题**: +```typescript +// 在多个地方重复调用 +const targetEdges = filterWorkflowEdges(runtimeEdges).filter(...); +``` + +**优化方案**: +```typescript +class WorkflowQueue { + private filteredEdges: RuntimeEdgeItemType[]; + + constructor() { + // 🟢 一次性过滤 + this.filteredEdges = filterWorkflowEdges(runtimeEdges); + } + + // 使用缓存的过滤结果 + private getTargetEdges(nodeId: string) { + return this.filteredEdges.filter(item => item.source === nodeId); + } +} +``` + +--- + +## 📈 性能优化总结 + +### 优化优先级 + +#### P0 - 立即实施(预期收益 70%) + +1. ✅ **已完成**: 递归改迭代(1.1) +2. **nodeOutput 边索引优化** - 减少 99% 的遍历操作 +3. **replaceEditorVariable 节点索引** - 减少 99% 的查找操作 + +#### P1 - 重要优化(预期收益 20%) + +4. **checkNodeRunStatus 缓存** - 减少重复计算 +5. **定时器间隔优化** - 减少 80% 的触发频率 +6. **大对象传递优化** - 减少内存占用 + +#### P2 - 性能优化(预期收益 10%) + +7. **surrenderProcess 批量处理** - 减少事件循环压力 +8. **filterWorkflowEdges 缓存** - 避免重复过滤 + +--- + +## 🎯 实施建议 + +### 第一阶段(已完成) +- ✅ 1.1 递归改迭代 + +### 第二阶段(推荐立即实施) +- 🔧 边索引优化(简单、安全、高收益) +- 🔧 定时器间隔优化(简单、安全) + +### 第三阶段(需要仔细测试) +- 🔧 节点索引优化(需要修改函数签名) +- 🔧 状态缓存优化(需要处理缓存失效) + +### 第四阶段(可选) +- 🔧 其他低优先级优化 + +--- + +## 📊 预期效果 + +### 优化前(150 节点,10 并发) +- 内存占用: ~1.5-2GB +- CPU 使用: 60-80% +- 平均响应时间: 15-20秒 +- 关键操作次数: + - 节点遍历: 67,500 次 + - 变量查找: 112,500 次 + - 状态检查: 450,000 次迭代 + +### 优化后(应用 P0 + P1) +- 内存占用: ~500-800MB(降低 60%) +- CPU 使用: 30-50%(降低 40%) +- 平均响应时间: 10-12秒(提升 30%) +- 关键操作次数: + - 节点遍历: ~750 次(减少 99%) + - 变量查找: ~750 次(减少 99%) + - 状态检查: 大幅减少(缓存命中) + +--- + +## 🔍 监控指标建议 + +```typescript +interface WorkflowMetrics { + // 性能指标 + totalExecutionTime: number; + nodeExecutionTimes: Map; + averageNodeTime: number; + + // 内存指标 + peakMemoryUsage: number; + averageMemoryUsage: number; + gcCount: number; + + // 操作计数 + nodeTraversalCount: number; + variableLookupCount: number; + edgeFilterCount: number; + statusCheckCount: number; + + // 并发指标 + maxConcurrentNodes: number; + averageConcurrentNodes: number; + queueWaitTime: number; + + // 缓存指标 + cacheHitRate: number; + cacheMissRate: number; +} +``` + +--- + +## 总结 + +通过深度分析,发现了 8 个主要性能瓶颈: + +1. 🔴 **nodeOutput O(n²) 遍历** - 最严重 +2. 🔴 **replaceEditorVariable 重复查找** - 最严重 +3. 🔴 **checkNodeRunStatus 深度遍历** - 严重 +4. 🟡 **重复变量替换** - 中等 +5. 🟡 **大对象频繁传递** - 中等 +6. 🟡 **定时器过于频繁** - 中等 +7. 🟢 **surrenderProcess 频繁调用** - 较低 +8. 🟢 **filterWorkflowEdges 重复调用** - 较低 + +**核心问题**: 缺少索引和缓存机制,导致大量 O(n) 和 O(n²) 操作。 + +**解决方案**: 通过预构建索引、缓存结果、批量处理等方式,将复杂度降低到 O(1) 或 O(k)。 + +**预期收益**: 实施 P0 和 P1 优化后,可以解决 90% 的性能问题。 diff --git a/packages/global/core/app/jsonschema.ts b/packages/global/core/app/jsonschema.ts index 345b351796..4e12d434f3 100644 --- a/packages/global/core/app/jsonschema.ts +++ b/packages/global/core/app/jsonschema.ts @@ -19,7 +19,7 @@ export const JsonSchemaPropertiesItemSchema = z.object({ not: z.any().optional(), // 不匹配 // 枚举和常量 - enum: z.array(z.string()).optional(), // 枚举值 + enum: z.array(z.any()).optional(), // 枚举值 const: z.any().optional(), // 常量值 // 字符串约束 diff --git a/packages/service/common/logger/sinks.ts b/packages/service/common/logger/sinks.ts index c97c305f40..b2395ff388 100644 --- a/packages/service/common/logger/sinks.ts +++ b/packages/service/common/logger/sinks.ts @@ -67,7 +67,9 @@ export async function createSinks(options: CreateSinksOptions): Promise dayjs().format('YYYY-MM-DD HH:mm:ss') + timestamp: () => dayjs().format('YYYY-MM-DD HH:mm:ss'), + // Full depth for nested objects (e.g. Zod errors) in console output + inspectOptions: { depth: 5 } }) }), (record) => levelFilter(record, consoleLevel) diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts index c4ce60859b..e831f3e963 100644 --- a/packages/service/core/workflow/dispatch/index.ts +++ b/packages/service/core/workflow/dispatch/index.ts @@ -366,9 +366,16 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise } >(); - private runningNodeCount = 0; private maxConcurrency: number; private resolve: (e: WorkflowQueue) => void; + private processingActive = false; // 标记是否正在处理队列 + + // Buffer + // 可以根据 nodeId 获取所有的 source 边和 target 边 + private edgeIndex = { + bySource: new Map(), + byTarget: new Map() + }; constructor({ maxConcurrency = 10, @@ -388,6 +395,20 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise { + if (!this.edgeIndex.bySource.has(edge.source)) { + this.edgeIndex.bySource.set(edge.source, []); + } + this.edgeIndex.bySource.get(edge.source)!.push(edge); + + if (!this.edgeIndex.byTarget.has(edge.target)) { + this.edgeIndex.byTarget.set(edge.target, []); + } + this.edgeIndex.byTarget.get(edge.target)!.push(edge); + }); } // Add active node to queue (if already in the queue, it will not be added again) @@ -397,50 +418,79 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise 0) { - this.processSkipNodes(); - } else { - this.resolve(this); + + // 迭代处理队列(替代递归的 processActiveNode) + private async startProcessing() { + // 防止重复启动 + if (this.processingActive) { + return; + } + + this.processingActive = true; + + try { + const runningNodePromises = new Set>(); + + // 迭代循环替代递归 + while (true) { + // 检查结束条件 + if (this.activeRunQueue.size === 0 && runningNodePromises.size === 0) { + if (isDebugMode) { + // 没有下一个激活节点,说明debug 进入了一个”即将结束”状态。可以开始处理 skip 节点 + if (this.debugNextStepRunNodes.length === 0 && this.skipNodeQueue.size > 0) { + await this.processSkipNodes(); + continue; + } else { + this.resolve(this); + break; + } + } + + // 如果没有交互响应,则开始处理 skip(交互响应的 skip 需要留给后续处理) + if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) { + await this.processSkipNodes(); + continue; + } else { + this.resolve(this); + break; + } + } + + // 检查并发限制 + if (this.activeRunQueue.size === 0 || runningNodePromises.size >= this.maxConcurrency) { + if (runningNodePromises.size > 0) { + // 当上一个节点运行结束时,立即运行下一轮 + await Promise.race(runningNodePromises); + } else { + // 理论上不应出现此情况,防御性退回到让出进程 + await surrenderProcess(); + } + continue; + } + + // 处理下一个节点 + const nodeId = this.activeRunQueue.keys().next().value; + const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined; + + if (nodeId) { + this.activeRunQueue.delete(nodeId); + } + + if (node) { + // 不再递归调用,异步执行节点(不等待完成) + const nodePromise: Promise = this.checkNodeCanRun(node).finally(() => { + runningNodePromises.delete(nodePromise); + }); + runningNodePromises.add(nodePromise); } - return; } - - // 如果没有交互响应,则开始处理 skip(交互响应的 skip 需要留给后续处理) - if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) { - this.processSkipNodes(); - } else { - this.resolve(this); - } - return; - } - - // Over max concurrency(如果 this.activeRunQueue.size === 0 条件触发,代表肯定有节点在运行) - if (this.activeRunQueue.size === 0 || this.runningNodeCount >= this.maxConcurrency) { - return; - } - - await surrenderProcess(); - const nodeId = this.activeRunQueue.keys().next().value; - const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined; - - if (nodeId) { - this.activeRunQueue.delete(nodeId); - } - if (node) { - this.runningNodeCount++; - - this.checkNodeCanRun(node).finally(() => { - this.runningNodeCount--; - this.processActiveNode(); - }); + } finally { + this.processingActive = false; } } @@ -453,17 +503,16 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise { - this.processActiveNode(); + await this.checkNodeCanRun(skipItem.node, skipItem.skippedNodeIdList).catch((error) => { + logger.error('Workflow skip node run error', { error, nodeName: skipItem.node.name }); }); - } else { - this.processActiveNode(); } } @@ -579,7 +628,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise { if (callbackMap[node.flowNodeType]) { - const targetEdges = runtimeEdges.filter((item) => item.source === node.nodeId); + const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || []; const errorHandleId = getHandleId(node.nodeId, 'source_catch', 'right'); try { @@ -848,9 +897,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise item.source === node.nodeId - ); + const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || []; // update edge status targetEdges.forEach((edge) => { @@ -864,23 +911,20 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise(); const nextStepSkipNodesMap = new Map(); - runtimeNodes.forEach((node) => { - if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'active')) { - nextStepActiveNodesMap.set(node.nodeId, node); - } - if ( - targetEdges.some((item) => item.target === node.nodeId && item.status === 'skipped') - ) { - nextStepSkipNodesMap.set(node.nodeId, node); + targetEdges.forEach((edge) => { + const targetNode = this.runtimeNodesMap.get(edge.target); + if (!targetNode) return; + + if (edge.status === 'active') { + nextStepActiveNodesMap.set(targetNode.nodeId, targetNode); + } else if (edge.status === 'skipped') { + nextStepSkipNodesMap.set(targetNode.nodeId, targetNode); } }); - const nextStepActiveNodes = Array.from(nextStepActiveNodesMap.values()); - const nextStepSkipNodes = Array.from(nextStepSkipNodesMap.values()); - return { - nextStepActiveNodes, - nextStepSkipNodes + nextStepActiveNodes: Array.from(nextStepActiveNodesMap.values()), + nextStepSkipNodes: Array.from(nextStepSkipNodesMap.values()) }; }; @@ -900,11 +944,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise((resolve) => { + logger.info('Workflow run start', { + maxRunTimes: data.maxRunTimes, + appId: data.runningAppInfo.id + }); const workflowQueue = new WorkflowQueue({ resolve, defaultSkipNodeQueue: data.lastInteractive?.skipNodeQueue || data.defaultSkipNodeQueue diff --git a/packages/web/i18n/en/app.json b/packages/web/i18n/en/app.json index 1d44ebba0e..f121d152f8 100644 --- a/packages/web/i18n/en/app.json +++ b/packages/web/i18n/en/app.json @@ -334,6 +334,7 @@ "publish_channel.wecom.empty": "Publish to WeCom bot. Please bind a custom domain and complete domain verification first.", "publish_success": "Publish Successful", "question_guide_tip": "After the conversation, 3 guiding questions will be generated for you.", + "raw_params": "original parameters", "reasoning_response": "Output thinking", "recharge": "Go to recharge", "reference_variable": "Reference variables", diff --git a/packages/web/i18n/zh-CN/app.json b/packages/web/i18n/zh-CN/app.json index dd8cec057b..601aaf4eb8 100644 --- a/packages/web/i18n/zh-CN/app.json +++ b/packages/web/i18n/zh-CN/app.json @@ -334,6 +334,7 @@ "publish_channel.wecom.empty": "发布到企业微信机器人,请先 绑定自定义域名,并且通过域名校验。", "publish_success": "发布成功", "question_guide_tip": "对话结束后,会为你生成 3 个引导性问题。", + "raw_params": "原始参数", "reasoning_response": "输出思考", "recharge": "去充值", "reference_variable": "引用变量", diff --git a/packages/web/i18n/zh-Hant/app.json b/packages/web/i18n/zh-Hant/app.json index e08ef69454..ba0c4dc35e 100644 --- a/packages/web/i18n/zh-Hant/app.json +++ b/packages/web/i18n/zh-Hant/app.json @@ -319,6 +319,7 @@ "publish_channel": "發布通道", "publish_success": "發布成功", "question_guide_tip": "對話結束後,會為你產生 3 個引導性問題。", + "raw_params": "原始參數", "reasoning_response": "輸出思考", "recharge": "去充值", "reference_variable": "引用變量", diff --git a/projects/app/package.json b/projects/app/package.json index 428ebc6ba7..acfd8f9cbd 100644 --- a/projects/app/package.json +++ b/projects/app/package.json @@ -1,6 +1,6 @@ { "name": "app", - "version": "4.14.8.1", + "version": "4.14.8.4", "private": false, "scripts": { "dev": "npm run build:workers && next dev", diff --git a/projects/app/src/components/core/app/formRender/LabelAndForm.tsx b/projects/app/src/components/core/app/formRender/LabelAndForm.tsx index 83c9d0970e..8304bb52b0 100644 --- a/projects/app/src/components/core/app/formRender/LabelAndForm.tsx +++ b/projects/app/src/components/core/app/formRender/LabelAndForm.tsx @@ -32,6 +32,7 @@ const getFlattenedErrorKeys = (errors: any, prefix = ''): string[] => { const LabelAndFormRender = ({ label, required, + description, placeholder, inputType, showValueType, @@ -40,6 +41,7 @@ const LabelAndFormRender = ({ }: { label: string | React.ReactNode; required?: boolean; + description?: string; placeholder?: string; showValueType?: boolean; form: UseFormReturn; @@ -57,7 +59,7 @@ const LabelAndFormRender = ({ {typeof label === 'string' ? {t(label)} : label} - {placeholder && } + {description && } { {...item} key={item.key} fieldName={`variables.${item.key}`} - placeholder={item.description} + description={item.description} inputType={variableInputTypeToInputType(item.type, item.valueType)} form={variablesForm} bg={'myGray.50'} @@ -63,7 +63,7 @@ const ChatHomeVariablesForm = ({ chatForm }: Props) => { {...item} key={item.key} fieldName={`variables.${item.key}`} - placeholder={item.description} + description={item.description} inputType={variableInputTypeToInputType(item.type)} form={variablesForm} bg={'myGray.50'} diff --git a/projects/app/src/components/core/chat/ChatContainer/components/VariablePopover.tsx b/projects/app/src/components/core/chat/ChatContainer/components/VariablePopover.tsx index c30c20d492..ea0582d16e 100644 --- a/projects/app/src/components/core/chat/ChatContainer/components/VariablePopover.tsx +++ b/projects/app/src/components/core/chat/ChatContainer/components/VariablePopover.tsx @@ -104,7 +104,7 @@ const VariablePopover = ({ chatType }: { chatType: ChatTypeEnum }) => { { { ); } diff --git a/projects/app/src/pageComponents/app/detail/Edit/MCPTools/ChatTest.tsx b/projects/app/src/pageComponents/app/detail/Edit/MCPTools/ChatTest.tsx index e0e24013c3..4ebfd6748d 100644 --- a/projects/app/src/pageComponents/app/detail/Edit/MCPTools/ChatTest.tsx +++ b/projects/app/src/pageComponents/app/detail/Edit/MCPTools/ChatTest.tsx @@ -130,7 +130,8 @@ const ChatTest = ({ inputType={inputType} form={form} fieldName={paramName} - placeholder={paramInfo.description} + bg={'myGray.50'} + description={paramInfo.description} /> ); } diff --git a/projects/app/src/pageComponents/app/detail/Edit/MCPTools/EditForm.tsx b/projects/app/src/pageComponents/app/detail/Edit/MCPTools/EditForm.tsx index 93f38629f5..6163fe3b1b 100644 --- a/projects/app/src/pageComponents/app/detail/Edit/MCPTools/EditForm.tsx +++ b/projects/app/src/pageComponents/app/detail/Edit/MCPTools/EditForm.tsx @@ -15,6 +15,7 @@ import type { GetMcpToolsBodyType } from '@fastgpt/global/openapi/core/app/mcpTo import { getMCPTools } from '@/web/core/app/api/tool'; import HeaderAuthConfig from '@/components/common/secret/HeaderAuthConfig'; import { type StoreSecretValueType } from '@fastgpt/global/common/secret/type'; +import type { JsonSchemaPropertiesItemType } from '@fastgpt/global/core/app/jsonschema'; const EditForm = ({ url, @@ -208,25 +209,30 @@ const ToolDetailModal = ({ tool, onClose }: { tool: McpToolConfigType; onClose: w={'530px'} > - + {tool.name} - + {tool.description} - {t('common:Params')} + {t('app:raw_params')} {Object.entries(tool.inputSchema.properties || {}).map( - ([paramName, paramInfo]: [string, any]) => ( + ([paramName, paramInfo]: [string, JsonSchemaPropertiesItemType]) => ( {tool.inputSchema.required?.includes(paramName) && ( @@ -248,7 +254,11 @@ const ToolDetailModal = ({ tool, onClose }: { tool: McpToolConfigType; onClose: border={'1px solid'} borderColor={'myGray.200'} > - {paramInfo.type} + {paramInfo.type || + paramInfo.anyOf?.map((item) => item.type).join(',') || + paramInfo.oneOf?.map((item) => item.type).join(',') || + paramInfo.allOf?.map((item) => item.type).join(',') || + 'any'} diff --git a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useDebug.tsx b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useDebug.tsx index a666d91e29..dd78aa03c6 100644 --- a/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useDebug.tsx +++ b/projects/app/src/pageComponents/app/detail/WorkflowComponents/Flow/hooks/useDebug.tsx @@ -269,7 +269,7 @@ export const useDebug = () => { key={item.key} label={item.label} required={item.required} - placeholder={t(item.placeholder || item.description)} + description={t(item.placeholder || item.description)} inputType={nodeInputTypeToInputType(item.renderTypeList)} form={variablesForm} fieldName={`nodeVariables.${item.key}`} @@ -284,7 +284,7 @@ export const useDebug = () => { key={item.key} label={item.label} required={item.required} - placeholder={t(item.description)} + description={t(item.description)} inputType={variableInputTypeToInputType(item.type)} form={variablesForm} fieldName={`variables.${item.key}`} @@ -297,7 +297,7 @@ export const useDebug = () => { key={item.key} label={item.label} required={item.required} - placeholder={t(item.description)} + description={t(item.description)} inputType={variableInputTypeToInputType(item.type)} form={variablesForm} fieldName={`variables.${item.key}`} @@ -310,7 +310,7 @@ export const useDebug = () => { key={item.key} label={item.label} required={item.required} - placeholder={item.description} + description={item.description} inputType={variableInputTypeToInputType(item.type)} form={variablesForm} fieldName={`variables.${item.key}`}