4.8.12 test (#2994)

* perf: run loop code

* doc

* fix: multiple loop nodes error; loop node variables cannot be inherited

* back save tip position

* fix: child workflow runtime

* stream connection keep-alive
Archer
2024-10-25 23:13:53 +08:00
committed by GitHub
parent f89452acdd
commit c722ced68d
12 changed files with 134 additions and 92 deletions

View File

@@ -211,18 +211,7 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
});
// flat child tool response
let newVariables: Record<string, any> = props.variables;
const childToolResponse = dispatchFlowResponse
.map((item) => {
// Computed new variables
newVariables = {
...newVariables,
...item.newVariables
};
return item.flowResponses;
})
.flat();
const childToolResponse = dispatchFlowResponse.map((item) => item.flowResponses).flat();
// concat tool usage
const totalPointsUsage =
@@ -261,7 +250,6 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
},
...flatUsages
],
[DispatchNodeResponseKeyEnum.newVariables]: newVariables,
[DispatchNodeResponseKeyEnum.interactive]: toolWorkflowInteractiveResponse
};
};
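
Aside: with the per-child newVariables merge removed above, the remaining mapping is a plain flatten and could equally be written with flatMap, which is behaviourally identical to the `.map(...).flat()` line kept in the diff:

// Behaviourally identical to dispatchFlowResponse.map((item) => item.flowResponses).flat()
const childToolResponse = dispatchFlowResponse.flatMap((item) => item.flowResponses);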

View File

@@ -27,7 +27,7 @@ import { getNanoid, sliceStrStartEnd } from '@fastgpt/global/common/string/tools
import { addLog } from '../../../../../common/system/log';
import { toolValueTypeList } from '@fastgpt/global/core/workflow/constants';
import { WorkflowInteractiveResponseType } from '@fastgpt/global/core/workflow/template/system/interactive/type';
import { ChatItemValueTypeEnum, ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { ChatItemValueTypeEnum } from '@fastgpt/global/core/chat/constants';
type ToolRunResponseType = {
toolRunResponse: DispatchFlowResponse;

View File

@@ -41,7 +41,8 @@ import { dispatchPluginOutput } from './plugin/runOutput';
import { removeSystemVariable, valueTypeFormat } from './utils';
import {
filterWorkflowEdges,
checkNodeRunStatus
checkNodeRunStatus,
textAdaptGptResponse
} from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import { dispatchRunTools } from './agent/runTool/index';
@@ -161,6 +162,20 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('X-Accel-Buffering', 'no');
res.setHeader('Cache-Control', 'no-cache, no-transform');
// Send an empty message every 10s so the browser does not treat the connection as dropped
const sendStreamTimerSign = () => {
setTimeout(() => {
props?.workflowStreamResponse?.({
event: SseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: ''
})
});
sendStreamTimerSign();
}, 10000);
};
sendStreamTimerSign();
}
variables = {
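
The keep-alive added above can be sketched in isolation: a timer periodically pushes an empty SSE answer chunk so proxies and browsers do not drop an idle connection while long-running nodes execute. The names below (StreamSend, startSseHeartbeat) are illustrative and not part of the FastGPT API; only the idea of sending an empty answer event every 10 seconds comes from the diff, which uses a recursive setTimeout rather than setInterval.

// Minimal sketch of the 10s SSE keep-alive (illustrative types and names).
type StreamSend = (payload: {
  event: 'answer';
  data: { choices: { delta: { content: string } }[] };
}) => void;

export const startSseHeartbeat = (send: StreamSend | undefined, intervalMs = 10_000) => {
  const timer = setInterval(() => {
    // An empty delta keeps the connection alive without changing the visible output.
    send?.({ event: 'answer', data: { choices: [{ delta: { content: '' } }] } });
  }, intervalMs);
  // The caller should stop the heartbeat once the workflow settles.
  return () => clearInterval(timer);
};

Unlike the recursive setTimeout in the diff, this variant returns a stop handle; the diff itself does not show where the timer is cleared.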
@@ -592,56 +607,60 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
};
}
// start process with initInput
const entryNodes = runtimeNodes.filter((item) => item.isEntry);
// reset entry
runtimeNodes.forEach((item) => {
// Keep the entry flag only on interactive/tool nodes; reset it for all other node types
if (
item.flowNodeType !== FlowNodeTypeEnum.userSelect &&
item.flowNodeType !== FlowNodeTypeEnum.formInput &&
item.flowNodeType !== FlowNodeTypeEnum.tools
) {
item.isEntry = false;
}
});
await Promise.all(entryNodes.map((node) => checkNodeCanRun(node)));
try {
// start process with initInput
const entryNodes = runtimeNodes.filter((item) => item.isEntry);
// reset entry
runtimeNodes.forEach((item) => {
// Keep the entry flag only on interactive/tool nodes; reset it for all other node types
if (
item.flowNodeType !== FlowNodeTypeEnum.userSelect &&
item.flowNodeType !== FlowNodeTypeEnum.formInput &&
item.flowNodeType !== FlowNodeTypeEnum.tools
) {
item.isEntry = false;
}
});
await Promise.all(entryNodes.map((node) => checkNodeCanRun(node)));
// Force an attempt to run the pluginOutput node
const pluginOutputModule = runtimeNodes.find(
(item) => item.flowNodeType === FlowNodeTypeEnum.pluginOutput
);
if (pluginOutputModule && props.mode !== 'debug') {
await nodeRunWithActive(pluginOutputModule);
// Force an attempt to run the pluginOutput node
const pluginOutputModule = runtimeNodes.find(
(item) => item.flowNodeType === FlowNodeTypeEnum.pluginOutput
);
if (pluginOutputModule && props.mode !== 'debug') {
await nodeRunWithActive(pluginOutputModule);
}
// Interactive node
const interactiveResult = (() => {
if (nodeInteractiveResponse) {
const interactiveAssistant = handleInteractiveResult({
entryNodeIds: nodeInteractiveResponse.entryNodeIds,
interactiveResponse: nodeInteractiveResponse.interactiveResponse
});
chatAssistantResponse.push(interactiveAssistant);
return interactiveAssistant.interactive;
}
})();
return {
flowResponses: chatResponses,
flowUsages: chatNodeUsages,
debugResponse: {
finishedNodes: runtimeNodes,
finishedEdges: runtimeEdges,
nextStepRunNodes: debugNextStepRunNodes
},
workflowInteractiveResponse: interactiveResult,
[DispatchNodeResponseKeyEnum.runTimes]: workflowRunTimes,
[DispatchNodeResponseKeyEnum.assistantResponses]:
mergeAssistantResponseAnswerText(chatAssistantResponse),
[DispatchNodeResponseKeyEnum.toolResponses]: toolRunResponse,
newVariables: removeSystemVariable(variables)
};
} catch (error) {
return Promise.reject(error);
}
// Interactive node
const interactiveResult = (() => {
if (nodeInteractiveResponse) {
const interactiveAssistant = handleInteractiveResult({
entryNodeIds: nodeInteractiveResponse.entryNodeIds,
interactiveResponse: nodeInteractiveResponse.interactiveResponse
});
chatAssistantResponse.push(interactiveAssistant);
return interactiveAssistant.interactive;
}
})();
return {
flowResponses: chatResponses,
flowUsages: chatNodeUsages,
debugResponse: {
finishedNodes: runtimeNodes,
finishedEdges: runtimeEdges,
nextStepRunNodes: debugNextStepRunNodes
},
workflowInteractiveResponse: interactiveResult,
[DispatchNodeResponseKeyEnum.runTimes]: workflowRunTimes,
[DispatchNodeResponseKeyEnum.assistantResponses]:
mergeAssistantResponseAnswerText(chatAssistantResponse),
[DispatchNodeResponseKeyEnum.toolResponses]: toolRunResponse,
newVariables: removeSystemVariable(variables)
};
}
/* get system variable */
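
For reference, the entry reset inside the try block keeps isEntry only on interactive node types (userSelect, formInput, tools), so a resumed interactive run re-enters at the correct node. Reduced to a standalone helper, with the node shape simplified and the enum values written as plain strings (an assumption for readability):

type RuntimeNodeLike = { nodeId: string; flowNodeType: string; isEntry: boolean };

// Plain-string stand-ins for FlowNodeTypeEnum.userSelect / formInput / tools.
const INTERACTIVE_ENTRY_TYPES = ['userSelect', 'formInput', 'tools'];

// Clear the entry flag on every node that is not an interactive/tool entry point.
export const resetEntryFlags = (nodes: RuntimeNodeLike[]) => {
  nodes.forEach((node) => {
    if (!INTERACTIVE_ENTRY_TYPES.includes(node.flowNodeType)) {
      node.isEntry = false;
    }
  });
};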

View File

@@ -7,6 +7,7 @@ import {
import { dispatchWorkFlow } from '..';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { AIChatItemValueItemType, ChatHistoryItemResType } from '@fastgpt/global/core/chat/type';
import { cloneDeep } from 'lodash';
type Props = ModuleDispatchProps<{
[NodeInputKeyEnum.loopInputArray]: Array<any>;
@@ -19,6 +20,7 @@ type Response = DispatchNodeResultType<{
export const dispatchLoop = async (props: Props): Promise<Response> => {
const {
params,
runtimeEdges,
runtimeNodes,
user,
node: { name }
@@ -28,7 +30,10 @@ export const dispatchLoop = async (props: Props): Promise<Response> => {
if (!Array.isArray(loopInputArray)) {
return Promise.reject('Input value is not an array');
}
if (loopInputArray.length > 50) {
const maxLength = process.env.WORKFLOW_MAX_LOOP_TIMES
? Number(process.env.WORKFLOW_MAX_LOOP_TIMES)
: 50;
if (loopInputArray.length > maxLength) {
return Promise.reject(`Input array length cannot be greater than ${maxLength}`);
}
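
The loop cap is now configurable via the WORKFLOW_MAX_LOOP_TIMES environment variable, falling back to 50. A slightly more defensive version of the same parsing (the NaN guard is an addition, not in the diff):

// Resolve the loop cap from the environment; default to 50, matching the diff.
const resolveMaxLoopTimes = (raw = process.env.WORKFLOW_MAX_LOOP_TIMES): number => {
  const parsed = Number(raw);
  // Guard against unset, empty, or non-numeric values (the diff only checks for presence).
  return Number.isFinite(parsed) && parsed > 0 ? parsed : 50;
};

For example, starting the service with WORKFLOW_MAX_LOOP_TIMES=100 raises the limit to 100 iterations.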
@@ -39,27 +44,25 @@ export const dispatchLoop = async (props: Props): Promise<Response> => {
let newVariables: Record<string, any> = props.variables;
for await (const item of loopInputArray) {
runtimeNodes.forEach((node) => {
if (
childrenNodeIdList.includes(node.nodeId) &&
node.flowNodeType === FlowNodeTypeEnum.loopStart
) {
node.isEntry = true;
node.inputs = node.inputs.map((input) =>
input.key === NodeInputKeyEnum.loopStartInput
? {
...input,
value: item
}
: input
);
}
});
const response = await dispatchWorkFlow({
...props,
runtimeNodes: runtimeNodes.map((node) =>
node.flowNodeType === FlowNodeTypeEnum.loopStart
? {
...node,
isEntry: true,
inputs: node.inputs.map((input) =>
input.key === NodeInputKeyEnum.loopStartInput
? {
...input,
value: item
}
: input
)
}
: {
...node,
isEntry: false
}
)
runtimeEdges: cloneDeep(runtimeEdges)
});
const loopOutputValue = response.flowResponses.find(

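Two things change in the loop body above: the shared runtimeNodes array is now mutated in place, so variables written by one iteration remain visible to the next (the "loop node variables cannot inherit" fix), and each iteration receives a deep copy of runtimeEdges, because edge run status is mutated while a pass executes. A minimal sketch of that pattern, with a simplified edge type and the per-iteration runner passed in as a parameter (both are illustrative, not the real FastGPT types):

import { cloneDeep } from 'lodash';

// Simplified stand-in for the runtime edge type; the real one carries more fields.
type EdgeLike = { source: string; target: string; status: 'waiting' | 'active' | 'skipped' };

export const runLoop = async (
  items: unknown[],
  runtimeEdges: EdgeLike[],
  // Stand-in for the dispatchWorkFlow call made once per iteration in the diff.
  runIteration: (edges: EdgeLike[], item: unknown) => Promise<void>
) => {
  for (const item of items) {
    // Fresh edge state per pass; node state is intentionally shared so loop variables persist.
    await runIteration(cloneDeep(runtimeEdges), item);
  }
};
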
View File

@@ -113,11 +113,10 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
}
const usagePoints = await computedPluginUsage(plugin, flowUsages);
const childStreamResponse = system_forbid_stream ? false : props.stream;
return {
// When running nested, if the child app has stream=false, nothing is actually output to the user, so there is no need to store it
assistantResponses: childStreamResponse ? assistantResponses : [],
assistantResponses: system_forbid_stream ? [] : assistantResponses,
// responseData, // debug
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[DispatchNodeResponseKeyEnum.nodeResponse]: {

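This hunk and the matching one in the nested app runner below make the same change: the child's assistant responses are kept unless streaming is explicitly forbidden for that child (system_forbid_stream), instead of being dropped whenever the parent happened not to stream. Isolated as a tiny helper (the wrapper itself is illustrative; the condition comes from the diff):

// Decide which assistant messages from a nested run are persisted to the chat record.
const pickChildAssistantResponses = <T>(assistantResponses: T[], systemForbidStream: boolean): T[] =>
  systemForbidStream ? [] : assistantResponses;
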
View File

@@ -124,7 +124,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
const usagePoints = flowUsages.reduce((sum, item) => sum + (item.totalPoints || 0), 0);
return {
assistantResponses: childStreamResponse ? assistantResponses : [],
assistantResponses: system_forbid_stream ? [] : assistantResponses,
[DispatchNodeResponseKeyEnum.runTimes]: runTimes,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
moduleLogo: appData.avatar,