diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts index ad6f2988b..b168a8268 100644 --- a/packages/service/core/workflow/dispatch/index.ts +++ b/packages/service/core/workflow/dispatch/index.ts @@ -275,60 +275,8 @@ export async function dispatchWorkFlow(data: Props): Promise { - const status = checkNodeRunStatus({ - node, - runtimeEdges - }); - - if (res?.closed || props.maxRunTimes <= 0) return; - props.maxRunTimes--; - addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id }); - - await surrenderProcess(); - - const response: - | { - node: RuntimeNodeItemType; - result: Record; - } - | undefined = await (() => { - if (status === 'run') { - addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`); - return nodeRunWithActive(node); - } - if (status === 'skip') { - addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`); - return nodeRunWithSkip(node); - } - })(); - - if (!response) return; - - // Update the node output at the end of the run and get the next nodes - const nextNodes = nodeOutput(response.node, response.result); - // Remove repeat nodes(Make sure that the node is only executed once) - const filterNextNodes = nextNodes.filter( - (node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index - ); - - // In the current version, only one interactive node is allowed at the same time - const interactiveResponse: UserInteractiveType | undefined = - response.result?.[DispatchNodeResponseKeyEnum.interactive]; - if (interactiveResponse) { - chatAssistantResponse.push( - handleInteractiveResult({ - entryNodeIds: [response.node.nodeId], - interactiveResponse - }) - ); - return; - } - - return Promise.all(filterNextNodes.map(checkNodeCanRun)); - } - // 运行完一轮后,清除连线的状态,避免污染进程 - function nodeRunFinish(node: RuntimeNodeItemType) { + // 每个节点 运行/跳过 后,初始化边的状态 + function nodeRunAfterHook(node: RuntimeNodeItemType) { node.isEntry = false; runtimeEdges.forEach((item) => { @@ -337,6 +285,69 @@ export async function dispatchWorkFlow(data: Props): Promise { + return Promise.all( + nodes.map(async (node) => { + const status = checkNodeRunStatus({ + node, + runtimeEdges + }); + + if (res?.closed || props.maxRunTimes <= 0) return; + + addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id }); + + // Thread avoidance + await surrenderProcess(); + + if (status === 'run') { + addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`); + return nodeRunWithActive(node); + } + if (status === 'skip') { + addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`); + return nodeRunWithSkip(node); + } + + return; + }) + ).then((result) => { + props.maxRunTimes--; + + const flat = result.flat().filter(Boolean) as unknown as { + node: RuntimeNodeItemType; + result: Record; + }[]; + if (flat.length === 0) return; + + // Update the node output at the end of the run and get the next nodes + const nextNodes = flat.map((item) => nodeOutput(item.node, item.result)).flat(); + // Remove repeat nodes(Make sure that the node is only executed once) + const filterNextNodes = nextNodes.filter( + (node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index + ); + + // In the current version, only one interactive node is allowed at the same time + const haveInteractiveResponse = flat + .map((response) => { + const interactiveResponse = response.result?.[DispatchNodeResponseKeyEnum.interactive]; + if (interactiveResponse) { + chatAssistantResponse.push( + handleInteractiveResult({ + entryNodeIds: [response.node.nodeId], + interactiveResponse + }) + ); + return 1; + } + }) + .filter(Boolean); + if (haveInteractiveResponse.length > 0) return; + + return checkNodeCanRun(filterNextNodes); + }); + } /* Inject data into module input */ function getNodeRunParams(node: RuntimeNodeItemType) { if (node.flowNodeType === FlowNodeTypeEnum.pluginInput) { @@ -444,7 +455,7 @@ export async function dispatchWorkFlow(data: Props): Promise item.source === node.nodeId); - nodeRunFinish(node); + nodeRunAfterHook(node); return { node, @@ -474,7 +485,7 @@ export async function dispatchWorkFlow(data: Props): Promise { // item.isEntry = false; // }); - await Promise.all(entryNodes.map(checkNodeCanRun)); + await checkNodeCanRun(entryNodes); // focus try to run pluginOutput const pluginOutputModule = runtimeNodes.find(