perf: workflow code (#5548)

* perf: workflow code

* add tool call limit
This commit is contained in:
Archer
2025-08-27 11:45:46 +08:00
committed by GitHub
parent 610634e1a1
commit c4799df3fd
11 changed files with 133 additions and 118 deletions

View File

@@ -72,9 +72,10 @@ export type ChatDispatchProps = {
maxRunTimes: number;
isToolCall?: boolean;
workflowStreamResponse?: WorkflowResponseType;
workflowDispatchDeep?: number;
version?: 'v1' | 'v2';
workflowDispatchDeep: number;
responseAllData?: boolean;
responseDetail?: boolean;
};

View File

@@ -66,11 +66,12 @@ export const promptToolCallMessageRewrite = (
return cloneMessages;
};
const ERROR_TEXT = 'Tool run error';
const ERROR_TEXT = 'Tool call error';
export const parsePromptToolCall = (
str: string
): {
answer: string;
streamAnswer?: string;
toolCalls?: ChatCompletionMessageToolCall[];
} => {
str = str.trim();
@@ -99,11 +100,13 @@ export const parsePromptToolCall = (
} catch (error) {
if (prefixReg.test(str)) {
return {
answer: ERROR_TEXT
answer: `${ERROR_TEXT}: ${str}`,
streamAnswer: `${ERROR_TEXT}: ${str}`
};
} else {
return {
answer: str
answer: str,
streamAnswer: str
};
}
}

View File

@@ -324,7 +324,11 @@ export const createStreamResponse = async ({
}
const { reasoningContent, content, finish_reason, usage } = getResponseData();
const { answer: llmAnswer, toolCalls } = parsePromptToolCall(content);
const { answer: llmAnswer, streamAnswer, toolCalls } = parsePromptToolCall(content);
if (streamAnswer) {
onStreaming?.({ text: streamAnswer });
}
toolCalls?.forEach((call) => {
onToolCall?.({ call });

View File

@@ -2,7 +2,7 @@
import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { type SelectAppItemType } from '@fastgpt/global/core/workflow/template/system/abandoned/runApp/type';
import { dispatchWorkFlow } from '../index';
import { runWorkflow } from '../index';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
@@ -59,27 +59,25 @@ export const dispatchAppRequest = async (props: Props): Promise<Response> => {
const chatHistories = getHistories(history, histories);
const { files } = chatValue2RuntimePrompt(query);
const { flowResponses, flowUsages, assistantResponses, system_memories } = await dispatchWorkFlow(
{
...props,
runningAppInfo: {
id: String(appData._id),
teamId: String(appData.teamId),
tmbId: String(appData.tmbId)
},
runtimeNodes: storeNodes2RuntimeNodes(
appData.modules,
getWorkflowEntryNodeIds(appData.modules)
),
runtimeEdges: storeEdges2RuntimeEdges(appData.edges),
histories: chatHistories,
query: runtimePrompt2ChatsValue({
files,
text: userChatInput
}),
variables: props.variables
}
);
const { flowResponses, flowUsages, assistantResponses, system_memories } = await runWorkflow({
...props,
runningAppInfo: {
id: String(appData._id),
teamId: String(appData.teamId),
tmbId: String(appData.tmbId)
},
runtimeNodes: storeNodes2RuntimeNodes(
appData.modules,
getWorkflowEntryNodeIds(appData.modules)
),
runtimeEdges: storeEdges2RuntimeEdges(appData.edges),
histories: chatHistories,
query: runtimePrompt2ChatsValue({
files,
text: userChatInput
}),
variables: props.variables
});
const completeMessages = chatHistories.concat([
{

View File

@@ -201,7 +201,7 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
return runToolCall({
...props,
...requestParams,
maxRunToolTimes: 30
maxRunToolTimes: 100
});
})();

View File

@@ -8,7 +8,7 @@ import { responseWriteController } from '../../../../../common/response';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
import { dispatchWorkFlow } from '../../index';
import { runWorkflow } from '../../index';
import type { DispatchToolModuleProps, RunToolResponse, ToolNodeItemType } from './type';
import json5 from 'json5';
import type { DispatchFlowResponse } from '../../type';
@@ -110,7 +110,7 @@ export const runToolCall = async (
initToolCallEdges(runtimeEdges, interactiveEntryToolParams.entryNodeIds);
// Run entry tool
const toolRunResponse = await dispatchWorkFlow({
const toolRunResponse = await runWorkflow({
...workflowProps,
isToolCall: true
});
@@ -383,7 +383,7 @@ export const runToolCall = async (
})();
initToolNodes(runtimeNodes, [toolNode.nodeId], startParams);
const toolRunResponse = await dispatchWorkFlow({
const toolRunResponse = await runWorkflow({
...workflowProps,
isToolCall: true
});

View File

@@ -1,6 +1,6 @@
import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
import { dispatchWorkFlow } from '../index';
import { runWorkflow } from '../index';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
@@ -132,7 +132,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
runTimes,
workflowInteractiveResponse,
system_memories
} = await dispatchWorkFlow({
} = await runWorkflow({
...props,
lastInteractive: childrenInteractive,
// Rewrite stream mode

View File

@@ -47,7 +47,7 @@ import { removeSystemVariable, rewriteRuntimeWorkFlow } from './utils';
import { getHandleId } from '@fastgpt/global/core/workflow/utils';
import { callbackMap } from './constants';
type Props = ChatDispatchProps & {
type Props = Omit<ChatDispatchProps, 'workflowDispatchDeep'> & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
};
@@ -58,62 +58,16 @@ type NodeResponseCompleteType = Omit<NodeResponseType, 'responseData'> & {
[DispatchNodeResponseKeyEnum.nodeResponse]?: ChatHistoryItemResType;
};
/* running */
// Run workflow
export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
let {
res,
runtimeNodes = [],
runtimeEdges = [],
histories = [],
variables = {},
externalProvider,
stream = false,
retainDatasetCite = true,
version = 'v1',
responseDetail = true,
responseAllData = true
} = data;
const startTime = Date.now();
const { res, stream, externalProvider } = data;
await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
// 初始化深度和自动增加深度,避免无限嵌套
if (!data.workflowDispatchDeep) {
data.workflowDispatchDeep = 1;
} else {
data.workflowDispatchDeep += 1;
}
const isRootRuntime = data.workflowDispatchDeep === 1;
// 初始化 runtimeNodesMap
const runtimeNodesMap = new Map(runtimeNodes.map((item) => [item.nodeId, item]));
// Over max depth
if (data.workflowDispatchDeep > 20) {
return {
flowResponses: [],
flowUsages: [],
debugResponse: {
finishedNodes: [],
finishedEdges: [],
nextStepRunNodes: []
},
[DispatchNodeResponseKeyEnum.runTimes]: 1,
[DispatchNodeResponseKeyEnum.assistantResponses]: [],
[DispatchNodeResponseKeyEnum.toolResponses]: null,
newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables),
durationSeconds: 0
};
}
let workflowRunTimes = 0;
let streamCheckTimer: NodeJS.Timeout | null = null;
// Init
if (isRootRuntime) {
// set sse response headers
res?.setHeader('Connection', 'keep-alive'); // Set keepalive for long connection
if (stream && res) {
// set sse response headers
if (res) {
res.setHeader('Connection', 'keep-alive'); // Set keepalive for long connection
if (stream) {
res.on('close', () => res.end());
res.on('error', () => {
addLog.error('Request error');
@@ -135,14 +89,68 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
});
}, 10000);
}
}
// Get default variables
variables = {
...externalProvider.externalWorkflowVariables,
...getSystemVariables(data)
// Get default variables
const defaultVariables = {
...externalProvider.externalWorkflowVariables,
...getSystemVariables(data)
};
// Init some props
return runWorkflow({
...data,
variables: defaultVariables,
workflowDispatchDeep: 0
}).finally(() => {
if (streamCheckTimer) {
clearInterval(streamCheckTimer);
}
});
}
type RunWorkflowProps = ChatDispatchProps & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
};
export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowResponse> => {
let {
res,
runtimeNodes = [],
runtimeEdges = [],
histories = [],
variables = {},
externalProvider,
retainDatasetCite = true,
version = 'v1',
responseDetail = true,
responseAllData = true
} = data;
// Over max depth
data.workflowDispatchDeep++;
const isRootRuntime = data.workflowDispatchDeep === 1;
if (data.workflowDispatchDeep > 20) {
return {
flowResponses: [],
flowUsages: [],
debugResponse: {
finishedNodes: [],
finishedEdges: [],
nextStepRunNodes: []
},
[DispatchNodeResponseKeyEnum.runTimes]: 1,
[DispatchNodeResponseKeyEnum.assistantResponses]: [],
[DispatchNodeResponseKeyEnum.toolResponses]: null,
newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables),
durationSeconds: 0
};
}
const startTime = Date.now();
await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
/*
工作流队列控制
特点:
@@ -161,7 +169,9 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
- 触发交互节点后,需要跳过所有 skip 节点,避免后续执行了 skipNode。
*/
class WorkflowQueue {
runtimeNodesMap = new Map(runtimeNodes.map((item) => [item.nodeId, item]));
// Workflow variables
workflowRunTimes = 0;
chatResponses: ChatHistoryItemResType[] = []; // response request and save to database
chatAssistantResponse: AIChatItemValueItemType[] = []; // The value will be returned to the user
chatNodeUsages: ChatNodeUsageType[] = [];
@@ -221,7 +231,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
const nodeId = this.activeRunQueue.keys().next().value;
const node = nodeId ? runtimeNodesMap.get(nodeId) : undefined;
const node = nodeId ? this.runtimeNodesMap.get(nodeId) : undefined;
if (nodeId) {
this.activeRunQueue.delete(nodeId);
@@ -501,7 +511,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
system_memories: newMemories
}: NodeResponseCompleteType) => {
// Add run times
workflowRunTimes += runTimes;
this.workflowRunTimes += runTimes;
data.maxRunTimes -= runTimes;
if (newMemories) {
@@ -650,7 +660,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
// Get node run status by edges
const status = checkNodeRunStatus({
nodesMap: runtimeNodesMap,
nodesMap: this.runtimeNodesMap,
node,
runtimeEdges
});
@@ -820,10 +830,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
});
}
if (streamCheckTimer) {
clearInterval(streamCheckTimer);
}
return {
flowResponses: workflowQueue.chatResponses,
flowUsages: workflowQueue.chatNodeUsages,
@@ -833,7 +839,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
nextStepRunNodes: workflowQueue.debugNextStepRunNodes
},
workflowInteractiveResponse: interactiveResult,
[DispatchNodeResponseKeyEnum.runTimes]: workflowRunTimes,
[DispatchNodeResponseKeyEnum.runTimes]: workflowQueue.workflowRunTimes,
[DispatchNodeResponseKeyEnum.assistantResponses]: mergeAssistantResponseAnswerText(
workflowQueue.chatAssistantResponse
),
@@ -848,7 +854,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
: undefined,
durationSeconds
};
}
};
/* get system variable */
const getSystemVariables = ({

View File

@@ -4,7 +4,7 @@ import {
type DispatchNodeResultType,
type ModuleDispatchProps
} from '@fastgpt/global/core/workflow/runtime/type';
import { dispatchWorkFlow } from '..';
import { runWorkflow } from '..';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
type AIChatItemValueItemType,
@@ -93,7 +93,7 @@ export const dispatchLoop = async (props: Props): Promise<Response> => {
index++;
const response = await dispatchWorkFlow({
const response = await runWorkflow({
...props,
lastInteractive: interactiveData?.childrenResponse,
variables: newVariables,

View File

@@ -20,7 +20,7 @@ import { filterSystemVariables, getNodeErrResponse } from '../utils';
import { getPluginRunUserQuery } from '@fastgpt/global/core/workflow/utils';
import type { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { getChildAppRuntimeById } from '../../../app/plugin/controller';
import { dispatchWorkFlow } from '../index';
import { runWorkflow } from '../index';
import { getUserChatInfoAndAuthTeamPoints } from '../../../../support/permission/auth/team';
import { dispatchRunTool } from '../child/runTool';
import type { PluginRuntimeType } from '@fastgpt/global/core/app/plugin/type';
@@ -118,7 +118,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
...(externalProvider ? externalProvider.externalWorkflowVariables : {})
};
const { flowResponses, flowUsages, assistantResponses, runTimes, system_memories } =
await dispatchWorkFlow({
await runWorkflow({
...props,
// Rewrite stream mode
...(system_forbid_stream