perf: stop loop (#6816)

* perf: stop loop

* fix: test
This commit is contained in:
Archer
2026-04-25 17:27:33 +08:00
committed by GitHub
parent 306d797adc
commit 74264c5c37
4 changed files with 33 additions and 13 deletions
@@ -32,6 +32,7 @@ export const dispatchLoop = async (props: Props): Promise<Response> => {
runtimeEdges,
lastInteractive,
runtimeNodes,
checkIsStopping,
node: { name }
} = props;
const { loopInputArray = [], childrenNodeIdList = [] } = params;
@@ -60,6 +61,9 @@ export const dispatchLoop = async (props: Props): Promise<Response> => {
let index = 0;
for await (const item of loopInputArray) {
if (checkIsStopping()) {
break;
}
// Skip already looped
if (lastIndex && index < lastIndex) {
index++;
@@ -41,7 +41,7 @@ type Props = ModuleDispatchProps<{
type Response = DispatchNodeResultType<Record<string, any>>;
export const dispatchLoopRun = async (props: Props): Promise<Response> => {
const { params, runtimeNodes, runtimeEdges, node, lastInteractive } = props;
const { params, runtimeNodes, runtimeEdges, node, lastInteractive, checkIsStopping } = props;
const { name } = node;
const mode = params[NodeInputKeyEnum.loopRunMode] ?? LoopRunModeEnum.array;
const childrenNodeIdList = params[NodeInputKeyEnum.childrenNodeIdList] ?? [];
@@ -120,6 +120,9 @@ export const dispatchLoopRun = async (props: Props): Promise<Response> => {
let maxIterationsExceeded = false;
while (true) {
if (checkIsStopping()) {
break;
}
// Check exhaustion before maxLength so `inputArray.length === maxLength` runs cleanly.
const arrayItem = (() => {
if (mode !== LoopRunModeEnum.array) {
@@ -35,7 +35,7 @@ type Response = DispatchNodeResultType<{
}>;
export const dispatchParallelRun = async (props: Props): Promise<Response> => {
const { params, runtimeNodes, runtimeEdges, node } = props;
const { params, runtimeNodes, runtimeEdges, node, checkIsStopping } = props;
const { name } = node;
const {
loopInputArray = [],
@@ -70,6 +70,9 @@ export const dispatchParallelRun = async (props: Props): Promise<Response> => {
let accumulatedPoints = 0;
for (let attempt = 0; attempt < maxRetryAttempts + 1; attempt++) {
if (checkIsStopping()) {
return;
}
const { taskRuntimeNodes, taskRuntimeEdges } = buildTaskRuntimeContext({
runtimeNodes,
runtimeEdges,
@@ -108,7 +111,7 @@ export const dispatchParallelRun = async (props: Props): Promise<Response> => {
// taskRuntimeNodes / taskRuntimeEdges go out of scope → GC
}
return lastResult!;
return lastResult;
},
concurrency
);
@@ -122,10 +125,13 @@ export const dispatchParallelRun = async (props: Props): Promise<Response> => {
responseDetails,
assistantResponses,
customFeedbacks
} = aggregateParallelResults(taskResults, {
taskInputs: loopInputArray,
parentNodeId: node.nodeId
});
} = aggregateParallelResults(
taskResults.filter((item) => item !== undefined),
{
taskInputs: loopInputArray,
parentNodeId: node.nodeId
}
);
return {
data: {
@@ -171,7 +171,8 @@ const makeProps = (
runtimeEdges: [],
variables: {},
usagePush: vi.fn(),
lastInteractive: undefined
lastInteractive: undefined,
checkIsStopping: () => false
} as any;
};
@@ -216,7 +217,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => {
runtimeEdges: [],
variables: {},
usagePush: vi.fn(),
lastInteractive: undefined
lastInteractive: undefined,
checkIsStopping: () => false
} as any;
const result: any = await dispatchLoopRun(props);
@@ -268,7 +270,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => {
runtimeEdges: [],
variables: {},
usagePush: vi.fn(),
lastInteractive: undefined
lastInteractive: undefined,
checkIsStopping: () => false
} as any;
const result: any = await dispatchLoopRun(props);
@@ -482,7 +485,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => {
iteration: 2,
childrenResponse: { entryNodeIds: ['userSelectNode'] }
}
}
},
checkIsStopping: () => false
} as any;
const result: any = await dispatchLoopRun(props);
@@ -533,7 +537,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => {
iteration: 2,
childrenResponse: interactivePayload
}
}
},
checkIsStopping: () => false
} as any;
await dispatchLoopRun(props);
@@ -611,7 +616,8 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => {
runtimeEdges: [],
variables: {},
usagePush: vi.fn(),
lastInteractive: undefined
lastInteractive: undefined,
checkIsStopping: () => false
} as any;
const result: any = await dispatchLoopRun(props);
@@ -781,6 +787,7 @@ describe('runLoopRun (integration with mocked runWorkflow)', () => {
runtimeEdges: [],
variables: {},
usagePush: vi.fn(),
checkIsStopping: () => false,
lastInteractive: {
type: 'loopRunInteractive',
params: {