User select node (#2397)

* feat: add user select node (#2300)

* feat: add user select node

* fix

* type

* fix

* fix

* fix

* perf: user select code

* perf: user select histories

* perf: i18n

---------

Co-authored-by: heheer <heheer@sealos.io>
This commit is contained in:
Archer
2024-08-15 12:27:04 +08:00
committed by GitHub
parent f8b8fcc172
commit fdeb1590d7
51 changed files with 1060 additions and 184 deletions

View File

@@ -1,7 +1,7 @@
import type { ChatItemType, ChatItemValueItemType } from '@fastgpt/global/core/chat/type';
import { MongoChatItem } from './chatItemSchema';
import { addLog } from '../../common/system/log';
import { ChatItemValueTypeEnum } from '@fastgpt/global/core/chat/constants';
import { ChatItemValueTypeEnum, ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { delFileByFileIdList, getGFSCollection } from '../../common/file/gridfs/controller';
import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
import { MongoChat } from './chatSchema';
@@ -79,6 +79,52 @@ export const addCustomFeedbacks = async ({
}
};
/*
Update the user selected index of the interactive module
*/
export const updateUserSelectedResult = async ({
appId,
chatId,
userSelectedVal
}: {
appId: string;
chatId?: string;
userSelectedVal: string;
}) => {
if (!chatId) return;
try {
const chatItem = await MongoChatItem.findOne(
{ appId, chatId, obj: ChatRoleEnum.AI },
'value'
).sort({ _id: -1 });
if (!chatItem) return;
const interactiveValue = chatItem.value.find(
(v) => v.type === ChatItemValueTypeEnum.interactive
);
if (
!interactiveValue ||
interactiveValue.type !== ChatItemValueTypeEnum.interactive ||
!interactiveValue.interactive?.params
)
return;
interactiveValue.interactive = {
...interactiveValue.interactive,
params: {
...interactiveValue.interactive.params,
userSelectedVal
}
};
await chatItem.save();
} catch (error) {
addLog.error('updateUserSelectedResult error', error);
}
};
/*
Delete chat files
1. ChatId: Delete one chat files

View File

@@ -1,6 +1,9 @@
import { NextApiResponse } from 'next';
import { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
DispatchNodeResponseKeyEnum,
SseResponseEventEnum
} from '@fastgpt/global/core/workflow/runtime/constants';
import { NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import type {
ChatDispatchProps,
@@ -10,6 +13,7 @@ import type { RuntimeNodeItemType } from '@fastgpt/global/core/workflow/runtime/
import type {
AIChatItemValueItemType,
ChatHistoryItemResType,
NodeOutputItemType,
ToolRunResponseItemType
} from '@fastgpt/global/core/chat/type.d';
import {
@@ -17,7 +21,7 @@ import {
FlowNodeTypeEnum
} from '@fastgpt/global/core/workflow/node/constant';
import { replaceVariable } from '@fastgpt/global/common/string/tools';
import { responseWriteNodeStatus } from '../../../common/response';
import { responseWrite, responseWriteNodeStatus } from '../../../common/response';
import { getSystemTime } from '@fastgpt/global/common/time/timezone';
import { replaceVariableLabel } from '@fastgpt/global/core/workflow/utils';
@@ -37,7 +41,8 @@ import { dispatchPluginOutput } from './plugin/runOutput';
import { removeSystemVariable, valueTypeFormat } from './utils';
import {
filterWorkflowEdges,
checkNodeRunStatus
checkNodeRunStatus,
getLastInteractiveValue
} from '@fastgpt/global/core/workflow/runtime/utils';
import { ChatNodeUsageType } from '@fastgpt/global/support/wallet/bill/type';
import { dispatchRunTools } from './agent/runTool/index';
@@ -56,6 +61,13 @@ import { dispatchRunCode } from './code/run';
import { dispatchTextEditor } from './tools/textEditor';
import { dispatchCustomFeedback } from './tools/customFeedback';
import { dispatchReadFiles } from './tools/readFiles';
import { dispatchUserSelect } from './interactive/userSelect';
import { FlowNodeOutputItemType } from '@fastgpt/global/core/workflow/type/io';
import {
InteractiveNodeResponseItemType,
UserInteractiveType,
UserSelectInteractive
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.workflowStart]: dispatchWorkflowStart,
@@ -80,6 +92,7 @@ const callbackMap: Record<FlowNodeTypeEnum, Function> = {
[FlowNodeTypeEnum.textEditor]: dispatchTextEditor,
[FlowNodeTypeEnum.customFeedback]: dispatchCustomFeedback,
[FlowNodeTypeEnum.readFiles]: dispatchReadFiles,
[FlowNodeTypeEnum.userSelect]: dispatchUserSelect,
// none
[FlowNodeTypeEnum.systemConfig]: dispatchSystemConfig,
@@ -171,7 +184,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
}
}
}
/* Pass the output of the module to the next stage */
/* Pass the output of the node, to get next nodes and update edge status */
function nodeOutput(
node: RuntimeNodeItemType,
result: Record<string, any> = {}
@@ -211,54 +224,117 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
return nextStepNodes;
}
function checkNodeCanRun(nodes: RuntimeNodeItemType[] = []): Promise<any> {
return Promise.all(
nodes.map(async (node) => {
const status = checkNodeRunStatus({
node,
runtimeEdges
});
if (res?.closed || props.maxRunTimes <= 0) return;
props.maxRunTimes--;
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
await surrenderProcess();
if (status === 'run') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
/* Have interactive result, computed edges and node outputs */
function handleInteractiveResult({
entryNodeIds,
interactiveResponse
}: {
entryNodeIds: string[];
interactiveResponse: UserSelectInteractive;
}): AIChatItemValueItemType {
// Get node outputs
const nodeOutputs: NodeOutputItemType[] = [];
runtimeNodes.forEach((node) => {
node.outputs.forEach((output) => {
if (output.value) {
nodeOutputs.push({
nodeId: node.nodeId,
key: output.key as NodeOutputKeyEnum,
value: output.value
});
}
if (status === 'skip') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
return;
})
).then((result) => {
const flat = result.flat().filter(Boolean) as unknown as {
node: RuntimeNodeItemType;
result: Record<string, any>;
}[];
if (flat.length === 0) return;
// Update the node output at the end of the run and get the next nodes
const nextNodes = flat.map((item) => nodeOutput(item.node, item.result)).flat();
// Remove repeat nodes(Make sure that the node is only executed once)
const filterNextNodes = nextNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
return checkNodeCanRun(filterNextNodes);
});
});
const interactiveResult: InteractiveNodeResponseItemType = {
...interactiveResponse,
entryNodeIds,
memoryEdges: runtimeEdges.map((edge) => ({
...edge,
status: entryNodeIds.includes(edge.target)
? 'active'
: entryNodeIds.includes(edge.source)
? 'waiting'
: edge.status
})),
nodeOutputs
};
if (stream && res) {
responseWrite({
res,
event: SseResponseEventEnum.interactive,
data: JSON.stringify({ interactive: interactiveResult })
});
}
return {
type: ChatItemValueTypeEnum.interactive,
interactive: interactiveResult
};
}
async function checkNodeCanRun(node: RuntimeNodeItemType): Promise<any> {
const status = checkNodeRunStatus({
node,
runtimeEdges
});
if (res?.closed || props.maxRunTimes <= 0) return;
props.maxRunTimes--;
addLog.debug(`Run node`, { maxRunTimes: props.maxRunTimes, uid: user._id });
await surrenderProcess();
const response:
| {
node: RuntimeNodeItemType;
result: Record<string, any>;
}
| undefined = await (() => {
if (status === 'run') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithActive: ${node.name}`);
return nodeRunWithActive(node);
}
if (status === 'skip') {
addLog.debug(`[dispatchWorkFlow] nodeRunWithSkip: ${node.name}`);
return nodeRunWithSkip(node);
}
})();
if (!response) return;
// Update the node output at the end of the run and get the next nodes
const nextNodes = nodeOutput(response.node, response.result);
// Remove repeat nodes(Make sure that the node is only executed once)
const filterNextNodes = nextNodes.filter(
(node, index, self) => self.findIndex((t) => t.nodeId === node.nodeId) === index
);
// In the current version, only one interactive node is allowed at the same time
const interactiveResponse: UserInteractiveType | undefined =
response.result?.[DispatchNodeResponseKeyEnum.interactive];
if (interactiveResponse) {
chatAssistantResponse.push(
handleInteractiveResult({
entryNodeIds: [response.node.nodeId],
interactiveResponse
})
);
return;
}
return Promise.all(filterNextNodes.map(checkNodeCanRun));
}
// 运行完一轮后,清除连线的状态,避免污染进程
function nodeRunFinish(node: RuntimeNodeItemType) {
const edges = runtimeEdges.filter((item) => item.target === node.nodeId);
edges.forEach((item) => {
item.status = 'waiting';
node.isEntry = false;
runtimeEdges.forEach((item) => {
if (item.target === node.nodeId) {
item.status = 'waiting';
}
});
}
/* Inject data into module input */
@@ -393,12 +469,12 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
// start process width initInput
const entryNodes = runtimeNodes.filter((item) => item.isEntry);
console.log(runtimeEdges);
// reset entry
runtimeNodes.forEach((item) => {
item.isEntry = false;
});
await checkNodeCanRun(entryNodes);
// runtimeNodes.forEach((item) => {
// item.isEntry = false;
// });
await Promise.all(entryNodes.map(checkNodeCanRun));
// focus try to run pluginOutput
const pluginOutputModule = runtimeNodes.find(

View File

@@ -0,0 +1,94 @@
import {
DispatchNodeResponseKeyEnum,
SseResponseEventEnum
} from '@fastgpt/global/core/workflow/runtime/constants';
import {
DispatchNodeResultType,
ModuleDispatchProps
} from '@fastgpt/global/core/workflow/runtime/type';
import { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { getHandleId } from '@fastgpt/global/core/workflow/utils';
import type {
UserSelectInteractive,
UserSelectOptionItemType
} from '@fastgpt/global/core/workflow/template/system/userSelect/type';
import { updateUserSelectedResult } from '../../../chat/controller';
import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
import { responseWrite } from '../../../../common/response';
import { chatValue2RuntimePrompt } from '@fastgpt/global/core/chat/adapt';
type Props = ModuleDispatchProps<{
[NodeInputKeyEnum.description]: string;
[NodeInputKeyEnum.userSelectOptions]: UserSelectOptionItemType[];
}>;
type UserSelectResponse = DispatchNodeResultType<{
[NodeOutputKeyEnum.answerText]?: string;
[DispatchNodeResponseKeyEnum.interactive]?: UserSelectInteractive;
[NodeOutputKeyEnum.selectResult]?: string;
}>;
export const dispatchUserSelect = async (props: Props): Promise<UserSelectResponse> => {
const {
res,
detail,
histories,
stream,
app: { _id: appId },
chatId,
node: { nodeId, isEntry },
params: { description, userSelectOptions },
query
} = props;
// Interactive node is not the entry node, return interactive result
if (!isEntry) {
const answerText = description ? `\n${description}` : undefined;
if (res && stream && answerText) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
data: textAdaptGptResponse({
text: answerText
})
});
}
return {
[NodeOutputKeyEnum.answerText]: answerText,
[DispatchNodeResponseKeyEnum.interactive]: {
type: 'userSelect',
params: {
userSelectOptions
}
}
};
}
const { text: userSelectedVal } = chatValue2RuntimePrompt(query);
// Error status
if (userSelectedVal === undefined) {
return {
[DispatchNodeResponseKeyEnum.skipHandleId]: userSelectOptions.map((item) =>
getHandleId(nodeId, 'source', item.value)
)
};
}
// Update db
updateUserSelectedResult({
appId,
chatId,
userSelectedVal
});
return {
[DispatchNodeResponseKeyEnum.skipHandleId]: userSelectOptions
.filter((item) => item.value !== userSelectedVal)
.map((item: any) => getHandleId(nodeId, 'source', item.key)),
[DispatchNodeResponseKeyEnum.nodeResponse]: {
userSelectResult: userSelectedVal
},
[NodeOutputKeyEnum.selectResult]: userSelectedVal
};
};

View File

@@ -4,7 +4,7 @@ import { FlowNodeTypeEnum } from '@fastgpt/global/core/workflow/node/constant';
import { DispatchNodeResponseKeyEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import { getPluginRuntimeById } from '../../../app/plugin/controller';
import {
getDefaultEntryNodeIds,
getWorkflowEntryNodeIds,
initWorkflowEdgeStatus,
storeNodes2RuntimeNodes
} from '@fastgpt/global/core/workflow/runtime/utils';
@@ -49,7 +49,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise<RunPlugi
const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
...props,
runtimeNodes: storeNodes2RuntimeNodes(plugin.nodes, getDefaultEntryNodeIds(plugin.nodes)).map(
runtimeNodes: storeNodes2RuntimeNodes(plugin.nodes, getWorkflowEntryNodeIds(plugin.nodes)).map(
(node) => {
if (node.flowNodeType === FlowNodeTypeEnum.pluginInput) {
return {

View File

@@ -6,7 +6,7 @@ import { responseWrite } from '../../../../common/response';
import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
import {
getDefaultEntryNodeIds,
getWorkflowEntryNodeIds,
initWorkflowEdgeStatus,
storeNodes2RuntimeNodes,
textAdaptGptResponse
@@ -67,7 +67,10 @@ export const dispatchAppRequest = async (props: Props): Promise<Response> => {
const { flowResponses, flowUsages, assistantResponses } = await dispatchWorkFlow({
...props,
app: appData,
runtimeNodes: storeNodes2RuntimeNodes(appData.modules, getDefaultEntryNodeIds(appData.modules)),
runtimeNodes: storeNodes2RuntimeNodes(
appData.modules,
getWorkflowEntryNodeIds(appData.modules)
),
runtimeEdges: initWorkflowEdgeStatus(appData.edges),
histories: chatHistories,
query: runtimePrompt2ChatsValue({