fix: dispatch workflow skip status (#2496)

This commit is contained in:
Archer
2024-08-24 23:36:59 +08:00
committed by GitHub
parent 3248e95d53
commit fa106eb24c

View File

@@ -275,60 +275,8 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
};
}
async function checkNodeCanRun(node: RuntimeNodeItemType): Promise<any> {
const status = checkNodeRunStatus({
node,
runtimeEdges
});
if (res?.closed || props.maxRunTimes <= 0) return;
props.maxRunTimes--;
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
await surrenderProcess();
const response:
| {
node: RuntimeNodeItemType;
result: Record<string, any>;
}
| undefined = await (() => {
if (status === 'run') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
})();
if (!response) return;
// Update the node output at the end of the run and get the next nodes
const nextNodes = nodeOutput(response.node, response.result);
// Remove repeat nodes(Make sure that the node is only executed once)
const filterNextNodes = nextNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
// In the current version, only one interactive node is allowed at the same time
const interactiveResponse: UserInteractiveType | undefined =
response.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [response.node.nodeId],
interactiveResponse
})
);
return;
}
return Promise.all(filterNextNodes.map(checkNodeCanRun));
}
// 运行完一轮后,清除连线的状态,避免污染进程
function nodeRunFinish(node: RuntimeNodeItemType) {
// 每个节点 运行/跳过 后,初始化边的状态
function nodeRunAfterHook(node: RuntimeNodeItemType) {
node.isEntry = false;
runtimeEdges.forEach((item) => {
@@ -337,6 +285,69 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
});
}
/* Check node run/skip or wait */
function checkNodeCanRun(nodes: RuntimeNodeItemType[] = []): Promise<any> {
return Promise.all(
nodes.map(async (node) => {
const status = checkNodeRunStatus({
node,
runtimeEdges
});
if (res?.closed || props.maxRunTimes <= 0) return;
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
// Thread avoidance
await surrenderProcess();
if (status === 'run') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
return;
})
).then((result) => {
props.maxRunTimes--;
const flat = result.flat().filter(Boolean) as unknown as {
node: RuntimeNodeItemType;
result: Record<string, any>;
}[];
if (flat.length === 0) return;
// Update the node output at the end of the run and get the next nodes
const nextNodes = flat.map((item) => nodeOutput(item.node, item.result)).flat();
// Remove repeat nodes(Make sure that the node is only executed once)
const filterNextNodes = nextNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
// In the current version, only one interactive node is allowed at the same time
const haveInteractiveResponse = flat
.map((response) => {
const interactiveResponse = response.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [response.node.nodeId],
interactiveResponse
})
);
return 1;
}
})
.filter(Boolean);
if (haveInteractiveResponse.length > 0) return;
return checkNodeCanRun(filterNextNodes);
});
}
/* Inject data into module input */
function getNodeRunParams(node: RuntimeNodeItemType) {
if (node.flowNodeType === FlowNodeTypeEnum.pluginInput) {
@@ -444,7 +455,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
dispatchRes[item.key] = valueTypeFormat(item.defaultValue, item.valueType);
});
nodeRunFinish(node);
nodeRunAfterHook(node);
return {
node,
@@ -457,7 +468,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
async function nodeRunWithSkip(node: RuntimeNodeItemType) {
// 其后所有target的节点都设置为skip
const targetEdges = runtimeEdges.filter((item) => item.source === node.nodeId);
nodeRunFinish(node);
nodeRunAfterHook(node);
return {
node,
@@ -474,7 +485,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
// runtimeNodes.forEach((item) => {
// item.isEntry = false;
// });
await Promise.all(entryNodes.map(checkNodeCanRun));
await checkNodeCanRun(entryNodes);
// focus try to run pluginOutput
const pluginOutputModule = runtimeNodes.find(