diff --git a/packages/service/core/workflow/dispatch/abandoned/runLoop.ts b/packages/service/core/workflow/dispatch/abandoned/runLoop.ts index b509c6a9d0..d5b24c1a08 100644 --- a/packages/service/core/workflow/dispatch/abandoned/runLoop.ts +++ b/packages/service/core/workflow/dispatch/abandoned/runLoop.ts @@ -32,6 +32,7 @@ export const dispatchLoop = async (props: Props): Promise => { runtimeEdges, lastInteractive, runtimeNodes, + checkIsStopping, node: { name } } = props; const { loopInputArray = [], childrenNodeIdList = [] } = params; @@ -60,6 +61,9 @@ export const dispatchLoop = async (props: Props): Promise => { let index = 0; for await (const item of loopInputArray) { + if (checkIsStopping()) { + break; + } // Skip already looped if (lastIndex && index < lastIndex) { index++; diff --git a/packages/service/core/workflow/dispatch/loopRun/runLoopRun.ts b/packages/service/core/workflow/dispatch/loopRun/runLoopRun.ts index 0700db797e..a2db111597 100644 --- a/packages/service/core/workflow/dispatch/loopRun/runLoopRun.ts +++ b/packages/service/core/workflow/dispatch/loopRun/runLoopRun.ts @@ -41,7 +41,7 @@ type Props = ModuleDispatchProps<{ type Response = DispatchNodeResultType>; export const dispatchLoopRun = async (props: Props): Promise => { - const { params, runtimeNodes, runtimeEdges, node, lastInteractive } = props; + const { params, runtimeNodes, runtimeEdges, node, lastInteractive, checkIsStopping } = props; const { name } = node; const mode = params[NodeInputKeyEnum.loopRunMode] ?? LoopRunModeEnum.array; const childrenNodeIdList = params[NodeInputKeyEnum.childrenNodeIdList] ?? []; @@ -120,6 +120,9 @@ export const dispatchLoopRun = async (props: Props): Promise => { let maxIterationsExceeded = false; while (true) { + if (checkIsStopping()) { + break; + } // Check exhaustion before maxLength so `inputArray.length === maxLength` runs cleanly. const arrayItem = (() => { if (mode !== LoopRunModeEnum.array) { diff --git a/packages/service/core/workflow/dispatch/parallelRun/runParallelRun.ts b/packages/service/core/workflow/dispatch/parallelRun/runParallelRun.ts index 6585a9b137..0e6f72b8a5 100644 --- a/packages/service/core/workflow/dispatch/parallelRun/runParallelRun.ts +++ b/packages/service/core/workflow/dispatch/parallelRun/runParallelRun.ts @@ -35,7 +35,7 @@ type Response = DispatchNodeResultType<{ }>; export const dispatchParallelRun = async (props: Props): Promise => { - const { params, runtimeNodes, runtimeEdges, node } = props; + const { params, runtimeNodes, runtimeEdges, node, checkIsStopping } = props; const { name } = node; const { loopInputArray = [], @@ -70,6 +70,9 @@ export const dispatchParallelRun = async (props: Props): Promise => { let accumulatedPoints = 0; for (let attempt = 0; attempt < maxRetryAttempts + 1; attempt++) { + if (checkIsStopping()) { + return; + } const { taskRuntimeNodes, taskRuntimeEdges } = buildTaskRuntimeContext({ runtimeNodes, runtimeEdges, @@ -108,7 +111,7 @@ export const dispatchParallelRun = async (props: Props): Promise => { // taskRuntimeNodes / taskRuntimeEdges go out of scope → GC } - return lastResult!; + return lastResult; }, concurrency ); @@ -122,10 +125,13 @@ export const dispatchParallelRun = async (props: Props): Promise => { responseDetails, assistantResponses, customFeedbacks - } = aggregateParallelResults(taskResults, { - taskInputs: loopInputArray, - parentNodeId: node.nodeId - }); + } = aggregateParallelResults( + taskResults.filter((item) => item !== undefined), + { + taskInputs: loopInputArray, + parentNodeId: node.nodeId + } + ); return { data: { diff --git a/test/cases/service/core/workflow/dispatch/loopRun/runLoopRun.test.ts b/test/cases/service/core/workflow/dispatch/loopRun/runLoopRun.test.ts index 3a7d29f37a..6b67a39120 100644 --- a/test/cases/service/core/workflow/dispatch/loopRun/runLoopRun.test.ts +++ b/test/cases/service/core/workflow/dispatch/loopRun/runLoopRun.test.ts @@ -171,7 +171,8 @@ const makeProps = ( runtimeEdges: [], variables: {}, usagePush: vi.fn(), - lastInteractive: undefined + lastInteractive: undefined, + checkIsStopping: () => false } as any; }; @@ -216,7 +217,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => { runtimeEdges: [], variables: {}, usagePush: vi.fn(), - lastInteractive: undefined + lastInteractive: undefined, + checkIsStopping: () => false } as any; const result: any = await dispatchLoopRun(props); @@ -268,7 +270,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => { runtimeEdges: [], variables: {}, usagePush: vi.fn(), - lastInteractive: undefined + lastInteractive: undefined, + checkIsStopping: () => false } as any; const result: any = await dispatchLoopRun(props); @@ -482,7 +485,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => { iteration: 2, childrenResponse: { entryNodeIds: ['userSelectNode'] } } - } + }, + checkIsStopping: () => false } as any; const result: any = await dispatchLoopRun(props); @@ -533,7 +537,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => { iteration: 2, childrenResponse: interactivePayload } - } + }, + checkIsStopping: () => false } as any; await dispatchLoopRun(props); @@ -611,7 +616,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => { runtimeEdges: [], variables: {}, usagePush: vi.fn(), - lastInteractive: undefined + lastInteractive: undefined, + checkIsStopping: () => false } as any; const result: any = await dispatchLoopRun(props); @@ -781,6 +787,7 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => { runtimeEdges: [], variables: {}, usagePush: vi.fn(), + checkIsStopping: () => false, lastInteractive: { type: 'loopRunInteractive', params: {