Fix workflow (#5592)

* fix: fileselector default

* fix: workflow run process
This commit is contained in:
Archer
2025-09-04 20:22:35 +08:00
committed by GitHub
parent 9be1e591d3
commit 85ea117481
13 changed files with 230 additions and 192 deletions

View File

@@ -42,7 +42,7 @@ import type { RuntimeEdgeItemType } from '@fastgpt/global/core/workflow/type/edg
import type { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import { addLog } from '../../../common/system/log';
import { surrenderProcess } from '../../../common/system/tools';
import type { DispatchFlowResponse } from './type';
import type { DispatchFlowResponse, WorkflowDebugResponse } from './type';
import { removeSystemVariable, rewriteRuntimeWorkFlow } from './utils';
import { getHandleId } from '@fastgpt/global/core/workflow/utils';
import { callbackMap } from './constants';
@@ -50,6 +50,7 @@ import { callbackMap } from './constants';
type Props = Omit<ChatDispatchProps, 'workflowDispatchDeep'> & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
defaultSkipNodeQueue?: WorkflowDebugResponse['skipNodeQueue'];
};
type NodeResponseType = DispatchNodeResultType<{
[key: string]: any;
@@ -100,6 +101,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
// Init some props
return runWorkflow({
...data,
defaultSkipNodeQueue: data.lastInteractive?.skipNodeQueue || data.defaultSkipNodeQueue,
variables: defaultVariables,
workflowDispatchDeep: 0
}).finally(() => {
@@ -112,12 +114,14 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
type RunWorkflowProps = ChatDispatchProps & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
defaultSkipNodeQueue?: WorkflowDebugResponse['skipNodeQueue'];
};
export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowResponse> => {
let {
res,
runtimeNodes = [],
runtimeEdges = [],
defaultSkipNodeQueue,
histories = [],
variables = {},
externalProvider,
@@ -135,9 +139,10 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
flowResponses: [],
flowUsages: [],
debugResponse: {
finishedNodes: [],
finishedEdges: [],
nextStepRunNodes: []
memoryEdges: [],
entryNodeIds: [],
nodeResponses: {},
skipNodeQueue: []
},
[DispatchNodeResponseKeyEnum.runTimes]: 1,
[DispatchNodeResponseKeyEnum.assistantResponses]: [],
@@ -151,6 +156,8 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
const isDebugMode = data.mode === 'debug';
/*
工作流队列控制
特点:
@@ -176,7 +183,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
chatAssistantResponse: AIChatItemValueItemType[] = []; // The value will be returned to the user
chatNodeUsages: ChatNodeUsageType[] = [];
toolRunResponse: ToolRunResponseItemType; // Run with tool mode. Result will response to tool node.
debugNextStepRunNodes: RuntimeNodeItemType[] = []; // 记录 Debug 模式下,下一个阶段需要执行的节点。
// 记录交互节点,交互节点需要在工作流完全结束后再进行计算
nodeInteractiveResponse:
| {
@@ -186,22 +192,38 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
| undefined;
system_memories: Record<string, any> = {}; // Workflow node memories
// Debug
debugNextStepRunNodes: RuntimeNodeItemType[] = []; // 记录 Debug 模式下,下一个阶段需要执行的节点。
debugNodeResponses: WorkflowDebugResponse['nodeResponses'] = {};
// Queue variables
private activeRunQueue = new Set<string>();
private skipNodeQueue: { node: RuntimeNodeItemType; skippedNodeIdList: Set<string> }[] = [];
private skipNodeQueue = new Map<
string,
{ node: RuntimeNodeItemType; skippedNodeIdList: Set<string> }
>();
private runningNodeCount = 0;
private maxConcurrency: number;
private resolve: (e: WorkflowQueue) => void;
constructor({
maxConcurrency = 10,
defaultSkipNodeQueue,
resolve
}: {
maxConcurrency?: number;
defaultSkipNodeQueue?: WorkflowDebugResponse['skipNodeQueue'];
resolve: (e: WorkflowQueue) => void;
}) {
this.maxConcurrency = maxConcurrency;
this.resolve = resolve;
// Init skip node queue
defaultSkipNodeQueue?.forEach(({ id, skippedNodeIdList }) => {
const node = this.runtimeNodesMap.get(id);
if (!node) return;
this.addSkipNode(node, new Set(skippedNodeIdList));
});
}
// Add active node to queue (if already in the queue, it will not be added again)
@@ -217,7 +239,18 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
private processActiveNode() {
// Finish
if (this.activeRunQueue.size === 0 && this.runningNodeCount === 0) {
if (this.skipNodeQueue.length > 0 && !this.nodeInteractiveResponse) {
if (isDebugMode) {
// 没有下一个激活节点说明debug 进入了一个“即将结束”状态。可以开始处理 skip 节点
if (this.debugNextStepRunNodes.length === 0 && this.skipNodeQueue.size > 0) {
this.processSkipNodes();
} else {
this.resolve(this);
}
return;
}
// 如果没有交互响应,则开始处理 skip交互响应的 skip 需要留给后续处理)
if (this.skipNodeQueue.size > 0 && !this.nodeInteractiveResponse) {
this.processSkipNodes();
} else {
this.resolve(this);
@@ -251,11 +284,19 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
}
private addSkipNode(node: RuntimeNodeItemType, skippedNodeIdList: Set<string>) {
this.skipNodeQueue.push({ node, skippedNodeIdList });
// 保证一个node 只在queue里记录一次
const skipNodeSkippedNodeIdList =
this.skipNodeQueue.get(node.nodeId)?.skippedNodeIdList || new Set<string>();
const concatSkippedNodeIdList = new Set([...skippedNodeIdList, ...skipNodeSkippedNodeIdList]);
this.skipNodeQueue.set(node.nodeId, { node, skippedNodeIdList: concatSkippedNodeIdList });
}
private processSkipNodes() {
const skipItem = this.skipNodeQueue.shift();
// 取一个 node并且从队列里删除
const skipItem = this.skipNodeQueue.values().next().value;
if (skipItem) {
this.skipNodeQueue.delete(skipItem.node.nodeId);
this.checkNodeCanRun(skipItem.node, skipItem.skippedNodeIdList).finally(() => {
this.processActiveNode();
});
@@ -351,7 +392,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
runtimeNodes,
runtimeEdges,
params,
mode: data.mode === 'debug' ? 'test' : data.mode
mode: isDebugMode ? 'test' : data.mode
};
// run module
@@ -620,18 +661,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
const nextStepActiveNodes = Array.from(nextStepActiveNodesMap.values());
const nextStepSkipNodes = Array.from(nextStepSkipNodesMap.values());
if (data.mode === 'debug') {
this.debugNextStepRunNodes = this.debugNextStepRunNodes.concat(
data.lastInteractive
? nextStepActiveNodes
: [...nextStepActiveNodes, ...nextStepSkipNodes]
);
return {
nextStepActiveNodes: [],
nextStepSkipNodes: []
};
}
return {
nextStepActiveNodes,
nextStepSkipNodes
@@ -690,8 +719,31 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
return this.nodeRunWithSkip(node);
}
})();
if (!nodeRunResult) return;
// Store debug data
if (isDebugMode) {
if (status === 'run') {
this.debugNodeResponses[node.nodeId] = {
nodeId: node.nodeId,
type: 'run',
interactiveResponse: nodeRunResult.result[DispatchNodeResponseKeyEnum.interactive],
response: nodeRunResult.result[DispatchNodeResponseKeyEnum.nodeResponse]
};
} else if (status === 'skip') {
this.debugNodeResponses[node.nodeId] = {
nodeId: node.nodeId,
type: 'skip',
response: nodeRunResult.result[DispatchNodeResponseKeyEnum.nodeResponse]
};
}
}
// 如果一个节点 active 运行了,则需要把它从 skip queue 里删除
if (status === 'run') {
this.skipNodeQueue.delete(node.nodeId);
}
/*
特殊情况:
通过 skipEdges 可以判断是运行了分支节点。
@@ -704,22 +756,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
skippedNodeIdList.add(node.nodeId);
}
// In the current version, only one interactive node is allowed at the same time
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
pushStore(nodeRunResult.result);
if (data.mode === 'debug') {
this.debugNextStepRunNodes = this.debugNextStepRunNodes.concat([nodeRunResult.node]);
}
this.nodeInteractiveResponse = {
entryNodeIds: [nodeRunResult.node.nodeId],
interactiveResponse
};
return;
}
// Update the node output at the end of the run and get the next nodes
const { nextStepActiveNodes, nextStepSkipNodes } = nodeOutput(
nodeRunResult.node,
@@ -730,10 +766,26 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
this.addSkipNode(node, skippedNodeIdList);
});
// Run next nodes
nextStepActiveNodes.forEach((node) => {
this.addActiveNode(node.nodeId);
});
// In the current version, only one interactive node is allowed at the same time
const interactiveResponse = nodeRunResult.result[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
if (isDebugMode) {
this.debugNextStepRunNodes = this.debugNextStepRunNodes.concat([nodeRunResult.node]);
}
this.nodeInteractiveResponse = {
entryNodeIds: [nodeRunResult.node.nodeId],
interactiveResponse
};
return;
} else if (isDebugMode) {
// Debug 模式下一步时候,会自己增加 activeNode
this.debugNextStepRunNodes = this.debugNextStepRunNodes.concat(nextStepActiveNodes);
} else {
nextStepActiveNodes.forEach((node) => {
this.addActiveNode(node.nodeId);
});
}
}
/* Have interactive result, computed edges and node outputs */
@@ -760,6 +812,10 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
const interactiveResult: WorkflowInteractiveResponseType = {
...interactiveResponse,
skipNodeQueue: Array.from(this.skipNodeQueue.values()).map((item) => ({
id: item.node.nodeId,
skippedNodeIdList: Array.from(item.skippedNodeIdList)
})),
entryNodeIds,
memoryEdges: runtimeEdges.map((edge) => ({
...edge,
@@ -781,6 +837,22 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
interactive: interactiveResult
};
}
getDebugResponse(): WorkflowDebugResponse {
const entryNodeIds = this.debugNextStepRunNodes.map((item) => item.nodeId);
return {
memoryEdges: runtimeEdges.map((edge) => ({
...edge,
status: entryNodeIds.includes(edge.target) ? 'active' : edge.status
})),
entryNodeIds,
nodeResponses: this.debugNodeResponses,
skipNodeQueue: Array.from(this.skipNodeQueue.values()).map((item) => ({
id: item.node.nodeId,
skippedNodeIdList: Array.from(item.skippedNodeIdList)
}))
};
}
}
// Start process width initInput
@@ -799,7 +871,8 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
const workflowQueue = await new Promise<WorkflowQueue>((resolve) => {
const workflowQueue = new WorkflowQueue({
resolve
resolve,
defaultSkipNodeQueue
});
entryNodes.forEach((node) => {
@@ -833,11 +906,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
return {
flowResponses: workflowQueue.chatResponses,
flowUsages: workflowQueue.chatNodeUsages,
debugResponse: {
finishedNodes: runtimeNodes,
finishedEdges: runtimeEdges,
nextStepRunNodes: workflowQueue.debugNextStepRunNodes
},
debugResponse: workflowQueue.getDebugResponse(),
workflowInteractiveResponse: interactiveResult,
[DispatchNodeResponseKeyEnum.runTimes]: workflowQueue.workflowRunTimes,
[DispatchNodeResponseKeyEnum.assistantResponses]: mergeAssistantResponseAnswerText(

View File

@@ -13,14 +13,24 @@ import type { WorkflowInteractiveResponseType } from '@fastgpt/global/core/workf
import type { RuntimeEdgeItemType } from '@fastgpt/global/core/workflow/type/edge';
import type { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
export type WorkflowDebugResponse = {
memoryEdges: RuntimeEdgeItemType[];
entryNodeIds: string[]; // Next step entry nodes
nodeResponses: Record<
string,
{
nodeId: string;
type: 'skip' | 'run';
response?: ChatHistoryItemResType;
interactiveResponse?: InteractiveNodeResponseType;
}
>;
skipNodeQueue?: { id: string; skippedNodeIdList: string[] }[]; // Cache
};
export type DispatchFlowResponse = {
flowResponses: ChatHistoryItemResType[];
flowUsages: ChatNodeUsageType[];
debugResponse: {
finishedNodes: RuntimeNodeItemType[];
finishedEdges: RuntimeEdgeItemType[];
nextStepRunNodes: RuntimeNodeItemType[];
};
debugResponse: WorkflowDebugResponse;
workflowInteractiveResponse?: WorkflowInteractiveResponseType;
[DispatchNodeResponseKeyEnum.toolResponses]: ToolRunResponseItemType;
[DispatchNodeResponseKeyEnum.assistantResponses]: AIChatItemValueItemType[];