Workflow deep and workflow connection performance (#2664)

* feat: Workflow dispatch deep

* perf: workflow connection
This commit is contained in:
Archer
2024-09-10 16:57:59 +08:00
committed by GitHub
parent 7473be5922
commit aeba79267a
21 changed files with 274 additions and 1300 deletions

View File

@@ -22,3 +22,6 @@ weight: 813
5. 新增 - 插件支持配置使用引导、全局变量和文件输入。
6. 新增 - 简易模式支持新的版本管理方式。
7. 新增 - 聊天记录滚动加载,不再只加载 30 条。
8. 优化 - 工作流嵌套层级限制 20 层,避免因编排不合理导致的无限死循环。
9. 优化 - 工作流 handler 性能优化。
10. 修复 - 知识库选择权限问题。

View File

@@ -24,7 +24,8 @@ export enum DispatchNodeResponseKeyEnum {
assistantResponses = 'assistantResponses', // assistant response
rewriteHistories = 'rewriteHistories', // If have the response, workflow histories will be rewrite
interactive = 'INTERACTIVE' // is interactive
interactive = 'INTERACTIVE', // is interactive
runTimes = 'runTimes' // run times
}
export const needReplaceReferenceInputTypeList = [

View File

@@ -45,6 +45,7 @@ export type ChatDispatchProps = {
maxRunTimes: number;
isToolCall?: boolean;
workflowStreamResponse?: WorkflowResponseType;
workflowDispatchDeep?: number;
};
export type ModuleDispatchProps<T> = ChatDispatchProps & {
@@ -181,6 +182,7 @@ export type DispatchNodeResultType<T = {}> = {
[DispatchNodeResponseKeyEnum.toolResponses]?: ToolRunResponseItemType; // Tool response
[DispatchNodeResponseKeyEnum.assistantResponses]?: AIChatItemValueItemType[]; // Assistant response(Store to db)
[DispatchNodeResponseKeyEnum.rewriteHistories]?: ChatItemType[];
[DispatchNodeResponseKeyEnum.runTimes]?: number;
} & T;
/* Single node props */

View File

@@ -0,0 +1,3 @@
// Raw env value; may be undefined or non-numeric, so parse defensively.
const envMaxRunTimes = Number.parseInt(process.env.WORKFLOW_MAX_RUN_TIMES ?? '', 10);

/**
 * Upper bound on total workflow node executions per dispatch, used to stop
 * runaway/looping workflows. Configurable via the WORKFLOW_MAX_RUN_TIMES
 * environment variable; falls back to 500 when the variable is unset or not
 * a valid integer (explicit radix 10 avoids legacy octal parsing, and the
 * NaN guard prevents an invalid value from silently disabling the cap).
 */
export const WORKFLOW_MAX_RUN_TIMES = Number.isNaN(envMaxRunTimes) ? 500 : envMaxRunTimes;

View File

@@ -298,7 +298,10 @@ export const runToolWithFunctionCall = async (
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages: filterMessages,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
};
}
@@ -310,7 +313,10 @@ export const runToolWithFunctionCall = async (
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
}
);
} else {
@@ -330,7 +336,8 @@ export const runToolWithFunctionCall = async (
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
runTimes: (response?.runTimes || 0) + 1
};
}
};

View File

@@ -125,7 +125,8 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
dispatchFlowResponse, // tool flow response
totalTokens,
completeMessages = [], // The actual message sent to AI(just save text)
assistantResponses = [] // FastGPT system store assistant.value response
assistantResponses = [], // FastGPT system store assistant.value response
runTimes
} = await (async () => {
const adaptMessages = chats2GPTMessages({ messages, reserveId: false });
@@ -195,6 +196,7 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
const previewAssistantResponses = filterToolResponseToPreview(assistantResponses);
return {
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[NodeOutputKeyEnum.answerText]: previewAssistantResponses
.filter((item) => item.text?.content)
.map((item) => item.text?.content || '')

View File

@@ -180,7 +180,8 @@ export const runToolWithPromptCall = async (
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
runTimes: (response?.runTimes || 0) + 1
};
}
@@ -318,7 +319,8 @@ ANSWER: `;
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages: filterMessages,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes: (response?.runTimes || 0) + toolsRunResponse.moduleRunResponse.runTimes
};
}
@@ -330,7 +332,8 @@ ANSWER: `;
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes: (response?.runTimes || 0) + toolsRunResponse.moduleRunResponse.runTimes
}
);
};

View File

@@ -325,7 +325,10 @@ export const runToolWithToolChoice = async (
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
};
}
@@ -338,7 +341,10 @@ export const runToolWithToolChoice = async (
{
dispatchFlowResponse,
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
assistantResponses: toolNodeAssistants
assistantResponses: toolNodeAssistants,
runTimes:
(response?.runTimes || 0) +
flatToolsResponseData.reduce((sum, item) => sum + item.runTimes, 0)
}
);
} else {
@@ -358,7 +364,8 @@ export const runToolWithToolChoice = async (
dispatchFlowResponse: response?.dispatchFlowResponse || [],
totalTokens: response?.totalTokens ? response.totalTokens + tokens : tokens,
completeMessages,
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value]
assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
runTimes: (response?.runTimes || 0) + 1
};
}
} catch (error) {

View File

@@ -8,6 +8,7 @@ import type { RuntimeNodeItemType } from '@fastgpt/global/core/workflow/runtime/
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import type { DispatchFlowResponse } from '../../type.d';
import { AIChatItemValueItemType, ChatItemValueItemType } from '@fastgpt/global/core/chat/type';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
export type DispatchToolModuleProps = ModuleDispatchProps<{
[NodeInputKeyEnum.history]?: ChatItemType[];
@@ -25,6 +26,7 @@ export type RunToolResponse = {
totalTokens: number;
completeMessages?: ChatCompletionMessageParam[];
assistantResponses?: AIChatItemValueItemType[];
[DispatchNodeResponseKeyEnum.runTimes]: number;
};
export type ToolNodeItemType = RuntimeNodeItemType & {
toolParams: RuntimeNodeItemType['inputs'];

View File

@@ -119,6 +119,31 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
...props
} = data;
// 初始化深度和自动增加深度,避免无限嵌套
if (!props.workflowDispatchDeep) {
props.workflowDispatchDeep = 1;
} else {
props.workflowDispatchDeep += 1;
}
if (props.workflowDispatchDeep > 20) {
return {
flowResponses: [],
flowUsages: [],
debugResponse: {
finishedNodes: [],
finishedEdges: [],
nextStepRunNodes: []
},
[DispatchNodeResponseKeyEnum.runTimes]: 1,
[DispatchNodeResponseKeyEnum.assistantResponses]: [],
[DispatchNodeResponseKeyEnum.toolResponses]: null,
newVariables: removeSystemVariable(variables)
};
}
let workflowRunTimes = 0;
// set sse response headers
if (stream && res) {
res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
@@ -154,7 +179,8 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
nodeDispatchUsages,
toolResponses,
assistantResponses,
rewriteHistories
rewriteHistories,
runTimes = 1
}: Omit<
DispatchNodeResultType<{
[NodeOutputKeyEnum.answerText]?: string;
@@ -163,6 +189,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
'nodeResponse'
>
) {
// Add run times
workflowRunTimes += runTimes;
props.maxRunTimes -= runTimes;
if (responseData) {
chatResponses.push(responseData);
}
@@ -330,7 +360,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
const nodeRunResult = await (() => {
if (status === 'run') {
nodeRunBeforeHook(node);
props.maxRunTimes--;
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
@@ -565,6 +594,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
finishedEdges: runtimeEdges,
nextStepRunNodes: debugNextStepRunNodes
},
[DispatchNodeResponseKeyEnum.runTimes]: workflowRunTimes,
[DispatchNodeResponseKeyEnum.assistantResponses]:
mergeAssistantResponseAnswerText(chatAssistantResponse),
[DispatchNodeResponseKeyEnum.toolResponses]: toolRunResponse,

View File

@@ -66,7 +66,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
appId: String(plugin.id)
};
const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
const { flowResponses, flowUsages, assistantResponses, runTimes } = await dispatchWorkFlow({
...props,
runningAppInfo: {
id: String(plugin.id),
@@ -92,6 +92,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
return {
assistantResponses,
// responseData, // debug
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
moduleLogo: plugin.avatar,
totalPoints: usagePoints,

View File

@@ -75,7 +75,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
appId: String(appData._id)
};
const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
const { flowResponses, flowUsages, assistantResponses, runTimes } = await dispatchWorkFlow({
...props,
runningAppInfo: {
id: String(appData._id),
@@ -107,6 +107,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
const { text } = chatValue2RuntimePrompt(assistantResponses);
return {
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
moduleLogo: appData.avatar,
query: userChatInput,

View File

@@ -22,6 +22,7 @@ export type DispatchFlowResponse = {
};
[DispatchNodeResponseKeyEnum.toolResponses]: ToolRunResponseItemType;
[DispatchNodeResponseKeyEnum.assistantResponses]: AIChatItemValueItemType[];
[DispatchNodeResponseKeyEnum.runTimes]: number;
newVariables: Record<string, string>;
};

1167
pnpm-lock.yaml generated

File diff suppressed because it is too large Load Diff

View File

@@ -36,3 +36,6 @@ HOME_URL=/
# 日志等级: debug, info, warn, error
LOG_LEVEL=debug
STORE_LOG_LEVEL=warn
# 工作流最大运行次数,避免极端的死循环情况
WORKFLOW_MAX_RUN_TIMES=500

View File

@@ -27,6 +27,7 @@ import {
import { StoreNodeItemType } from '@fastgpt/global/core/workflow/type/node';
import { getWorkflowResponseWrite } from '@fastgpt/service/core/workflow/dispatch/utils';
import { getNanoid } from '@fastgpt/global/common/string/tools';
import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants';
export type Props = {
messages: ChatCompletionMessageParam[];
@@ -120,7 +121,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
chatConfig,
histories: chatMessages,
stream: true,
maxRunTimes: 200,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES,
workflowStreamResponse: workflowResponseWrite
});

View File

@@ -9,6 +9,7 @@ import { PostWorkflowDebugProps, PostWorkflowDebugResponse } from '@/global/core
import { NextAPI } from '@/service/middleware/entry';
import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant';
import { defaultApp } from '@/web/core/app/constants';
import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants';
async function handler(
req: NextApiRequest,
@@ -57,7 +58,7 @@ async function handler(
chatConfig: defaultApp.chatConfig,
histories: [],
stream: false,
maxRunTimes: 200
maxRunTimes: WORKFLOW_MAX_RUN_TIMES
});
pushChatUsage({

View File

@@ -59,6 +59,7 @@ import { getSystemTime } from '@fastgpt/global/common/time/timezone';
import { rewriteNodeOutputByHistories } from '@fastgpt/global/core/workflow/runtime/utils';
import { getWorkflowResponseWrite } from '@fastgpt/service/core/workflow/dispatch/utils';
import { getPluginRunUserQuery } from '@fastgpt/service/core/workflow/utils';
import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants';
type FastGptWebChatProps = {
chatId?: string; // undefined: get histories from messages, '': new chat, 'xxxxx': get histories from db
@@ -264,7 +265,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) {
chatConfig,
histories: newHistories,
stream,
maxRunTimes: 200,
maxRunTimes: WORKFLOW_MAX_RUN_TIMES,
workflowStreamResponse: workflowResponseWrite
});
}

View File

@@ -11,16 +11,17 @@ export const ConnectionSourceHandle = ({ nodeId }: { nodeId: string }) => {
const nodeList = useContextSelector(WorkflowContext, (v) => v.nodeList);
const edges = useContextSelector(WorkflowContext, (v) => v.edges);
const node = useMemo(() => nodeList.find((node) => node.nodeId === nodeId), [nodeList, nodeId]);
const { showSourceHandle, RightHandle, LeftHandlee, TopHandlee, BottomHandlee } = useMemo(() => {
const node = nodeList.find((node) => node.nodeId === nodeId);
/* not node/not connecting node, hidden */
const showSourceHandle = useMemo(() => {
const showSourceHandle = (() => {
if (!node) return false;
if (connectingEdge && connectingEdge.nodeId !== nodeId) return false;
return true;
}, [connectingEdge, node, nodeId]);
})();
const RightHandle = useMemo(() => {
const RightHandle = (() => {
const handleId = getHandleId(nodeId, 'source', Position.Right);
const rightTargetConnected = edges.some(
(edge) => edge.targetHandle === getHandleId(nodeId, 'target', Position.Right)
@@ -36,8 +37,8 @@ export const ConnectionSourceHandle = ({ nodeId }: { nodeId: string }) => {
translate={[2, 0]}
/>
);
}, [edges, node, nodeId]);
const LeftHandlee = useMemo(() => {
})();
const LeftHandlee = (() => {
const leftTargetConnected = edges.some(
(edge) => edge.targetHandle === getHandleId(nodeId, 'target', Position.Left)
);
@@ -53,8 +54,8 @@ export const ConnectionSourceHandle = ({ nodeId }: { nodeId: string }) => {
translate={[-6, 0]}
/>
);
}, [edges, node, nodeId]);
const TopHandlee = useMemo(() => {
})();
const TopHandlee = (() => {
if (
edges.some(
(edge) => edge.target === nodeId && edge.targetHandle === NodeOutputKeyEnum.selectedTools
@@ -76,8 +77,8 @@ export const ConnectionSourceHandle = ({ nodeId }: { nodeId: string }) => {
translate={[0, -2]}
/>
);
}, [edges, node, nodeId]);
const BottomHandlee = useMemo(() => {
})();
const BottomHandlee = (() => {
const handleId = getHandleId(nodeId, 'source', Position.Bottom);
const targetConnected = edges.some(
(edge) => edge.targetHandle === getHandleId(nodeId, 'target', Position.Bottom)
@@ -92,7 +93,16 @@ export const ConnectionSourceHandle = ({ nodeId }: { nodeId: string }) => {
translate={[0, 2]}
/>
);
}, [edges, node, nodeId]);
})();
return {
showSourceHandle,
RightHandle,
LeftHandlee,
TopHandlee,
BottomHandlee
};
}, [connectingEdge, edges, nodeId, nodeList]);
return showSourceHandle ? (
<>
@@ -104,19 +114,32 @@ export const ConnectionSourceHandle = ({ nodeId }: { nodeId: string }) => {
) : null;
};
export const ConnectionTargetHandle = ({ nodeId }: { nodeId: string }) => {
export const ConnectionTargetHandle = React.memo(function ConnectionTargetHandle({
nodeId
}: {
nodeId: string;
}) {
const connectingEdge = useContextSelector(WorkflowContext, (ctx) => ctx.connectingEdge);
const nodeList = useContextSelector(WorkflowContext, (v) => v.nodeList);
const edges = useContextSelector(WorkflowContext, (v) => v.edges);
const node = useMemo(() => nodeList.find((node) => node.nodeId === nodeId), [nodeList, nodeId]);
const { showHandle, LeftHandle, rightHandle, topHandle, bottomHandle } = useMemo(() => {
const node = nodeList.find((node) => node.nodeId === nodeId);
const connectingNode = nodeList.find((node) => node.nodeId === connectingEdge?.nodeId);
const showHandle = useMemo(() => {
const sourceEdges = edges.filter((edge) => edge.target === connectingNode?.nodeId);
const connectingNodeSourceNodeIds = sourceEdges.map((edge) => edge.source);
const showHandle = (() => {
if (!node) return false;
// Unable to connect oneself
if (connectingEdge && connectingEdge.nodeId === nodeId) return false;
// Unable to connect to the source node
if (connectingNodeSourceNodeIds.includes(nodeId)) return false;
return true;
}, [connectingEdge, node, nodeId]);
})();
const LeftHandle = useMemo(() => {
const LeftHandle = (() => {
if (!node || !node?.targetHandle?.left) return null;
const handleId = getHandleId(nodeId, 'target', Position.Left);
@@ -129,8 +152,8 @@ export const ConnectionTargetHandle = ({ nodeId }: { nodeId: string }) => {
translate={[-2, 0]}
/>
);
}, [node, nodeId]);
const rightHandle = useMemo(() => {
})();
const rightHandle = (() => {
if (!node || !node?.targetHandle?.right) return null;
const handleId = getHandleId(nodeId, 'target', Position.Right);
@@ -143,8 +166,8 @@ export const ConnectionTargetHandle = ({ nodeId }: { nodeId: string }) => {
translate={[2, 0]}
/>
);
}, [node, nodeId]);
const topHandle = useMemo(() => {
})();
const topHandle = (() => {
if (!node || !node?.targetHandle?.top) return null;
const handleId = getHandleId(nodeId, 'target', Position.Top);
@@ -157,8 +180,8 @@ export const ConnectionTargetHandle = ({ nodeId }: { nodeId: string }) => {
translate={[0, -2]}
/>
);
}, [node, nodeId]);
const bottomHandle = useMemo(() => {
})();
const bottomHandle = (() => {
if (!node || !node?.targetHandle?.bottom) return null;
const handleId = getHandleId(nodeId, 'target', Position.Bottom);
@@ -171,7 +194,16 @@ export const ConnectionTargetHandle = ({ nodeId }: { nodeId: string }) => {
translate={[0, 2]}
/>
);
}, [node, nodeId]);
})();
return {
showHandle,
LeftHandle,
rightHandle,
topHandle,
bottomHandle
};
}, [connectingEdge, edges, nodeId, nodeList]);
return showHandle ? (
<>
@@ -181,7 +213,7 @@ export const ConnectionTargetHandle = ({ nodeId }: { nodeId: string }) => {
{bottomHandle}
</>
) : null;
};
});
export default function Dom() {
return <></>;

View File

@@ -248,6 +248,14 @@ const NodeCard = (props: Props) => {
onChangeNode,
toast
]);
const RenderHandle = useMemo(() => {
return (
<>
<ConnectionSourceHandle nodeId={nodeId} />
<ConnectionTargetHandle nodeId={nodeId} />
</>
);
}, [nodeId]);
return (
<Box
@@ -283,8 +291,7 @@ const NodeCard = (props: Props) => {
<NodeDebugResponse nodeId={nodeId} debugResult={debugResult} />
{Header}
{children}
<ConnectionSourceHandle nodeId={nodeId} />
<ConnectionTargetHandle nodeId={nodeId} />
{RenderHandle}
<EditTitleModal maxLength={20} />
</Box>

View File

@@ -13,6 +13,7 @@ import {
import { UsageSourceEnum } from '@fastgpt/global/support/wallet/usage/constants';
import { addLog } from '@fastgpt/service/common/system/log';
import { MongoApp } from '@fastgpt/service/core/app/schema';
import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants';
import { dispatchWorkFlow } from '@fastgpt/service/core/workflow/dispatch';
export const getScheduleTriggerApp = async () => {
@@ -55,7 +56,7 @@ export const getScheduleTriggerApp = async () => {
chatConfig: defaultApp.chatConfig,
histories: [],
stream: false,
maxRunTimes: 200
maxRunTimes: WORKFLOW_MAX_RUN_TIMES
});
pushChatUsage({
appName: app.name,