4.8.10 workflow perf (#2596)

* perf: run plugin variables init

* perf: init free plan

* perf: dataset data ui

* perf: workflow theme

* perf: plugin input modal ui

* perf: workflow dispatch

* fix: account ui

* feat: 4810 doc
This commit is contained in:
Archer
2024-09-03 09:56:33 +08:00
committed by GitHub
parent 5ebe0017a0
commit 761e35c226
19 changed files with 216 additions and 180 deletions

View File

@@ -63,7 +63,7 @@ import {
InteractiveNodeResponseItemType,
UserSelectInteractive
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { dispatchRunAppNode } from './agent/runApp';
import { dispatchRunAppNode } from './plugin/runApp';
const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.workflowStart]: dispatchWorkflowStart,
@@ -186,7 +186,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
function nodeOutput(
node: RuntimeNodeItemType,
result: Record<string, any> = {}
): RuntimeNodeItemType[] {
): {
nextStepActiveNodes: RuntimeNodeItemType[];
nextStepSkipNodes: RuntimeNodeItemType[];
} {
pushStore(node, result);
// Assign the output value to the next node
@@ -211,16 +214,32 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
});
const nextStepNodes = runtimeNodes.filter((node) => {
return targetEdges.some((item) => item.target === node.nodeId);
const nextStepActiveNodes: RuntimeNodeItemType[] = [];
const nextStepSkipNodes: RuntimeNodeItemType[] = [];
runtimeNodes.forEach((node) => {
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'active')) {
nextStepActiveNodes.push(node);
}
if (targetEdges.some((item) => item.target === node.nodeId && item.status === 'skipped')) {
nextStepSkipNodes.push(node);
}
});
if (props.mode === 'debug') {
debugNextStepRunNodes = debugNextStepRunNodes.concat(nextStepNodes);
return [];
debugNextStepRunNodes = debugNextStepRunNodes.concat([
...nextStepActiveNodes,
...nextStepSkipNodes
]);
return {
nextStepActiveNodes: [],
nextStepSkipNodes: []
};
}
return nextStepNodes;
return {
nextStepActiveNodes,
nextStepSkipNodes
};
}
/* Have interactive result, computed edges and node outputs */
@@ -281,69 +300,82 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
});
}
/* Check node run/skip or wait */
function checkNodeCanRun(nodes: RuntimeNodeItemType[] = []): Promise<any> {
return Promise.all(
nodes.map(async (node) => {
const status = checkNodeRunStatus({
node,
runtimeEdges
});
async function checkNodeCanRun(
node: RuntimeNodeItemType,
skippedNodeIdList = new Set<string>()
): Promise<RuntimeNodeItemType[]> {
if (res?.closed || props.maxRunTimes <= 0) return [];
// Thread avoidance
await surrenderProcess();
if (res?.closed || props.maxRunTimes <= 0) return;
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
// Thread avoidance
await surrenderProcess();
if (status === 'run') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
return;
})
).then((result) => {
props.maxRunTimes--;
const flat = result.flat().filter(Boolean) as unknown as {
node: RuntimeNodeItemType;
runStatus: 'run' | 'skip';
result: Record<string, any>;
}[];
// If there are no running nodes, the workflow is complete
if (flat.length === 0) return;
// Update the node output at the end of the run and get the next nodes
const nextNodes = flat.map((item) => nodeOutput(item.node, item.result)).flat();
// Remove repeat nodes(Make sure that the node is only executed once)
const filterNextNodes = nextNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
// In the current version, only one interactive node is allowed at the same time
const haveInteractiveResponse = flat
.map((response) => {
const interactiveResponse = response.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [response.node.nodeId],
interactiveResponse
})
);
return 1;
}
})
.filter(Boolean);
if (haveInteractiveResponse.length > 0) return;
return checkNodeCanRun(filterNextNodes);
// Get node run status by edges
const status = checkNodeRunStatus({
node,
runtimeEdges
});
const nodeRunResult = await (() => {
if (status === 'run') {
props.maxRunTimes--;
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip' && !skippedNodeIdList.has(node.nodeId)) {
props.maxRunTimes -= 0.1;
skippedNodeIdList.add(node.nodeId);
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
})();
if (!nodeRunResult) return [];
// Update the node output at the end of the run and get the next nodes
let { nextStepActiveNodes, nextStepSkipNodes } = nodeOutput(
nodeRunResult.node,
nodeRunResult.result
);
// Remove repeat nodes(Make sure that the node is only executed once)
nextStepActiveNodes = nextStepActiveNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
nextStepSkipNodes = nextStepSkipNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
// In the current version, only one interactive node is allowed at the same time
const interactiveResponse = nodeRunResult.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [nodeRunResult.node.nodeId],
interactiveResponse
})
);
return [];
}
// Run next nodes先运行 run 的,再运行 skip 的)
const nextStepActiveNodesResults = (
await Promise.all(nextStepActiveNodes.map((node) => checkNodeCanRun(node)))
).flat();
// 如果已经 active 运行过,不再执行 skipactive 中有闭环)
nextStepSkipNodes = nextStepSkipNodes.filter(
(node) => !nextStepActiveNodesResults.some((item) => item.nodeId === node.nodeId)
);
const nextStepSkipNodesResults = (
await Promise.all(nextStepSkipNodes.map((node) => checkNodeCanRun(node, skippedNodeIdList)))
).flat();
return [
...nextStepActiveNodes,
...nextStepSkipNodes,
...nextStepActiveNodesResults,
...nextStepSkipNodesResults
];
}
/* Inject data into module input */
function getNodeRunParams(node: RuntimeNodeItemType) {
@@ -396,7 +428,11 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
return params;
}
async function nodeRunWithActive(node: RuntimeNodeItemType) {
async function nodeRunWithActive(node: RuntimeNodeItemType): Promise<{
node: RuntimeNodeItemType;
runStatus: 'run';
result: Record<string, any>;
}> {
// push run status messages
if (node.showStatus) {
props.workflowStreamResponse?.({
@@ -465,8 +501,12 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
};
}
async function nodeRunWithSkip(node: RuntimeNodeItemType) {
// 其后所有target的节点都设置为skip
async function nodeRunWithSkip(node: RuntimeNodeItemType): Promise<{
node: RuntimeNodeItemType;
runStatus: 'skip';
result: Record<string, any>;
}> {
// Set target edges status to skipped
const targetEdges = runtimeEdges.filter((item) => item.source === node.nodeId);
nodeRunAfterHook(node);
@@ -486,7 +526,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
// runtimeNodes.forEach((item) => {
// item.isEntry = false;
// });
await checkNodeCanRun(entryNodes);
await Promise.all(entryNodes.map((node) => checkNodeCanRun(node)));
// focus try to run pluginOutput
const pluginOutputModule = runtimeNodes.find(

View File

@@ -64,7 +64,11 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
...props,
variables: filterSystemVariables(props.variables),
variables: {
...filterSystemVariables(props.variables),
appId: String(plugin.id)
},
runtimeNodes,
runtimeEdges: initWorkflowEdgeStatus(plugin.edges)
});