mirror of
https://github.com/labring/FastGPT.git
synced 2026-05-16 01:09:01 +08:00
V4.14.9 features (#6599)
* fix: image read and json error (Agent) (#6502) * fix: 1.image read 2.JSON parsing error * dataset cite and pause * perf: plancall second parse * add test --------- Co-authored-by: archer <545436317@qq.com> * master message * remove invalid code * feat(sre): integrate traces, logs, metrics into one sdk (#6580) * fix: image read and json error (Agent) (#6502) * fix: 1.image read 2.JSON parsing error * dataset cite and pause * perf: plancall second parse * add test --------- Co-authored-by: archer <545436317@qq.com> * master message * wip: otel sdk * feat(sre): integrate traces, logs, metrics into one sdk * fix(sre): use SpanStatusCode constants * fix(sre): clarify step memory measurement * update package * fix: ts --------- Co-authored-by: YeYuheng <57035043+YYH211@users.noreply.github.com> Co-authored-by: archer <545436317@qq.com> * doc * sandbox in agent (#6579) * doc * update template * fix: pr * fix: sdk package * update lock * update next * update dockerfile * dockerfile * dockerfile * update sdk version * update dockerefile * version --------- Co-authored-by: YeYuheng <57035043+YYH211@users.noreply.github.com> Co-authored-by: Ryo <whoeverimf5@gmail.com>
This commit is contained in:
@@ -89,6 +89,7 @@ export const dispatchRunAgent = async (props: DispatchAgentModuleProps): Promise
|
||||
userChatInput, // 本次任务的输入
|
||||
history = 6,
|
||||
fileUrlList: fileLinks,
|
||||
aiChatVision = true,
|
||||
agent_selectedTools: selectedTools = [],
|
||||
// Dataset search configuration
|
||||
agent_datasetParams: datasetParams,
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
import { getNanoid } from '@fastgpt/global/common/string/tools';
|
||||
import { getSystemTime } from '@fastgpt/global/common/time/timezone';
|
||||
import { SpanStatusCode } from '@opentelemetry/api';
|
||||
import type {
|
||||
AIChatItemValueItemType,
|
||||
ChatHistoryItemResType,
|
||||
@@ -60,6 +61,8 @@ import { TeamErrEnum } from '@fastgpt/global/common/error/code/team';
|
||||
import { i18nT } from '../../../../web/i18n/utils';
|
||||
import { validateFileUrlDomain } from '../../../common/security/fileUrlValidator';
|
||||
import { classifyEdgesByDFS, findSCCs, isNodeInCycle, getEdgeType } from '../utils/tarjan';
|
||||
import { observeWorkflowStep } from '../metrics';
|
||||
import { withActiveSpan } from '../../../common/tracing';
|
||||
|
||||
const logger = getLogger(LogCategories.MODULE.WORKFLOW.DISPATCH);
|
||||
import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus';
|
||||
@@ -736,233 +739,278 @@ export class WorkflowQueue {
|
||||
runStatus: 'run';
|
||||
result: NodeResponseCompleteType;
|
||||
}> {
|
||||
/* Inject data into module input */
|
||||
const getNodeRunParams = (node: RuntimeNodeItemType) => {
|
||||
if (node.flowNodeType === FlowNodeTypeEnum.pluginInput) {
|
||||
// Format plugin input to object
|
||||
return node.inputs.reduce<Record<string, any>>((acc, item) => {
|
||||
acc[item.key] = valueTypeFormat(item.value, item.valueType);
|
||||
return acc;
|
||||
}, {});
|
||||
}
|
||||
|
||||
// Dynamic input need to store a key.
|
||||
const dynamicInput = node.inputs.find(
|
||||
(item) => item.renderTypeList[0] === FlowNodeInputTypeEnum.addInputParam
|
||||
);
|
||||
const params: Record<string, any> = dynamicInput
|
||||
? {
|
||||
[dynamicInput.key]: {}
|
||||
}
|
||||
: {};
|
||||
|
||||
node.inputs.forEach((input) => {
|
||||
// Special input, not format
|
||||
if (input.key === dynamicInput?.key) return;
|
||||
|
||||
// Skip some special key
|
||||
if (
|
||||
[NodeInputKeyEnum.childrenNodeIdList, NodeInputKeyEnum.httpJsonBody].includes(
|
||||
input.key as NodeInputKeyEnum
|
||||
)
|
||||
) {
|
||||
params[input.key] = input.value;
|
||||
return;
|
||||
}
|
||||
|
||||
// replace {{$xx.xx$}} and {{xx}} variables
|
||||
let value = replaceEditorVariable({
|
||||
text: input.value,
|
||||
nodes: this.data.runtimeNodes,
|
||||
variables: this.data.variables
|
||||
});
|
||||
|
||||
// replace reference variables
|
||||
value = getReferenceVariableValue({
|
||||
value,
|
||||
nodes: this.data.runtimeNodes,
|
||||
variables: this.data.variables
|
||||
});
|
||||
|
||||
// Dynamic input is stored in the dynamic key
|
||||
if (input.canEdit && dynamicInput && params[dynamicInput.key]) {
|
||||
params[dynamicInput.key][input.key] = valueTypeFormat(value, input.valueType);
|
||||
}
|
||||
params[input.key] = valueTypeFormat(value, input.valueType);
|
||||
});
|
||||
|
||||
return params;
|
||||
};
|
||||
|
||||
// push run status messages
|
||||
if (node.showStatus && !this.data.isToolCall) {
|
||||
this.data.workflowStreamResponse?.({
|
||||
event: SseResponseEventEnum.flowNodeStatus,
|
||||
data: {
|
||||
status: 'running',
|
||||
name: node.name
|
||||
}
|
||||
});
|
||||
}
|
||||
const startTime = Date.now();
|
||||
|
||||
// get node running params
|
||||
const params = getNodeRunParams(node);
|
||||
|
||||
const dispatchData: ModuleDispatchProps<Record<string, any>> = {
|
||||
...this.data,
|
||||
usagePush: this.usagePush.bind(this),
|
||||
lastInteractive: this.data.lastInteractive?.entryNodeIds?.includes(node.nodeId)
|
||||
? this.data.lastInteractive
|
||||
: undefined,
|
||||
variables: this.data.variables,
|
||||
histories: this.data.histories,
|
||||
retainDatasetCite: this.data.retainDatasetCite,
|
||||
node,
|
||||
runtimeNodes: this.data.runtimeNodes,
|
||||
runtimeEdges: this.data.runtimeEdges,
|
||||
params,
|
||||
const stepMetricAttributes = {
|
||||
workflowId: this.data.runningAppInfo.id,
|
||||
workflowName: this.data.runningAppInfo.name,
|
||||
nodeId: node.nodeId,
|
||||
nodeName: node.name,
|
||||
nodeType: node.flowNodeType,
|
||||
mode: this.isDebugMode ? 'test' : this.data.mode
|
||||
};
|
||||
|
||||
// run module
|
||||
const dispatchRes: NodeResponseType = await (async () => {
|
||||
if (callbackMap[node.flowNodeType]) {
|
||||
const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || [];
|
||||
const errorHandleId = getHandleId(node.nodeId, 'source_catch', 'right');
|
||||
|
||||
try {
|
||||
const result = (await callbackMap[node.flowNodeType](dispatchData)) as NodeResponseType;
|
||||
|
||||
if (result.error) {
|
||||
// Run error and not catch error, skip all edges
|
||||
if (!node.catchError) {
|
||||
return {
|
||||
...result,
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: targetEdges.map(
|
||||
(item) => item.sourceHandle
|
||||
)
|
||||
};
|
||||
return observeWorkflowStep(stepMetricAttributes, () =>
|
||||
withActiveSpan(
|
||||
{
|
||||
name: `workflow.step ${node.name || node.nodeId}`,
|
||||
tracerName: 'fastgpt.workflow',
|
||||
attributes: {
|
||||
'fastgpt.workflow.id': this.data.runningAppInfo.id,
|
||||
'fastgpt.workflow.name': this.data.runningAppInfo.name,
|
||||
'fastgpt.workflow.node.id': node.nodeId,
|
||||
'fastgpt.workflow.node.name': node.name,
|
||||
'fastgpt.workflow.node.type': node.flowNodeType,
|
||||
'fastgpt.workflow.mode': stepMetricAttributes.mode
|
||||
}
|
||||
},
|
||||
async (stepSpan) => {
|
||||
/* Inject data into module input */
|
||||
const getNodeRunParams = (node: RuntimeNodeItemType) => {
|
||||
if (node.flowNodeType === FlowNodeTypeEnum.pluginInput) {
|
||||
// Format plugin input to object
|
||||
return node.inputs.reduce<Record<string, any>>((acc, item) => {
|
||||
acc[item.key] = valueTypeFormat(item.value, item.valueType);
|
||||
return acc;
|
||||
}, {});
|
||||
}
|
||||
|
||||
// Catch error, skip unError handle
|
||||
const skipHandleIds = targetEdges
|
||||
.filter((item) => item.sourceHandle !== errorHandleId)
|
||||
.map((item) => item.sourceHandle);
|
||||
// Dynamic input need to store a key.
|
||||
const dynamicInput = node.inputs.find(
|
||||
(item) => item.renderTypeList[0] === FlowNodeInputTypeEnum.addInputParam
|
||||
);
|
||||
const params: Record<string, any> = dynamicInput
|
||||
? {
|
||||
[dynamicInput.key]: {}
|
||||
}
|
||||
: {};
|
||||
|
||||
return {
|
||||
...result,
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: result[
|
||||
DispatchNodeResponseKeyEnum.skipHandleId
|
||||
]
|
||||
? [...result[DispatchNodeResponseKeyEnum.skipHandleId], ...skipHandleIds].filter(
|
||||
Boolean
|
||||
)
|
||||
: skipHandleIds
|
||||
node.inputs.forEach((input) => {
|
||||
// Special input, not format
|
||||
if (input.key === dynamicInput?.key) return;
|
||||
|
||||
// Skip some special key
|
||||
if (
|
||||
[NodeInputKeyEnum.childrenNodeIdList, NodeInputKeyEnum.httpJsonBody].includes(
|
||||
input.key as NodeInputKeyEnum
|
||||
)
|
||||
) {
|
||||
params[input.key] = input.value;
|
||||
return;
|
||||
}
|
||||
|
||||
// replace {{$xx.xx$}} and {{xx}} variables
|
||||
let value = replaceEditorVariable({
|
||||
text: input.value,
|
||||
nodes: this.data.runtimeNodes,
|
||||
variables: this.data.variables
|
||||
});
|
||||
|
||||
// replace reference variables
|
||||
value = getReferenceVariableValue({
|
||||
value,
|
||||
nodes: this.data.runtimeNodes,
|
||||
variables: this.data.variables
|
||||
});
|
||||
|
||||
// Dynamic input is stored in the dynamic key
|
||||
if (input.canEdit && dynamicInput && params[dynamicInput.key]) {
|
||||
params[dynamicInput.key][input.key] = valueTypeFormat(value, input.valueType);
|
||||
}
|
||||
params[input.key] = valueTypeFormat(value, input.valueType);
|
||||
});
|
||||
|
||||
return params;
|
||||
};
|
||||
|
||||
// push run status messages
|
||||
if (node.showStatus && !this.data.isToolCall) {
|
||||
this.data.workflowStreamResponse?.({
|
||||
event: SseResponseEventEnum.flowNodeStatus,
|
||||
data: {
|
||||
status: 'running',
|
||||
name: node.name
|
||||
}
|
||||
});
|
||||
}
|
||||
const startTime = Date.now();
|
||||
|
||||
// get node running params
|
||||
const params = getNodeRunParams(node);
|
||||
|
||||
const dispatchData: ModuleDispatchProps<Record<string, any>> = {
|
||||
...this.data,
|
||||
usagePush: this.usagePush.bind(this),
|
||||
lastInteractive: this.data.lastInteractive?.entryNodeIds?.includes(node.nodeId)
|
||||
? this.data.lastInteractive
|
||||
: undefined,
|
||||
variables: this.data.variables,
|
||||
histories: this.data.histories,
|
||||
retainDatasetCite: this.data.retainDatasetCite,
|
||||
node,
|
||||
runtimeNodes: this.data.runtimeNodes,
|
||||
runtimeEdges: this.data.runtimeEdges,
|
||||
params,
|
||||
mode: this.isDebugMode ? 'test' : this.data.mode
|
||||
};
|
||||
|
||||
// run module
|
||||
const dispatchRes: NodeResponseType = await (async () => {
|
||||
if (callbackMap[node.flowNodeType]) {
|
||||
const targetEdges = this.edgeIndex.bySource.get(node.nodeId) || [];
|
||||
const errorHandleId = getHandleId(node.nodeId, 'source_catch', 'right');
|
||||
|
||||
try {
|
||||
const result = (await callbackMap[node.flowNodeType](
|
||||
dispatchData
|
||||
)) as NodeResponseType;
|
||||
|
||||
if (result.error) {
|
||||
// Run error and not catch error, skip all edges
|
||||
if (!node.catchError) {
|
||||
return {
|
||||
...result,
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: targetEdges.map(
|
||||
(item) => item.sourceHandle
|
||||
)
|
||||
};
|
||||
}
|
||||
|
||||
// Catch error, skip unError handle
|
||||
const skipHandleIds = targetEdges
|
||||
.filter((item) => item.sourceHandle !== errorHandleId)
|
||||
.map((item) => item.sourceHandle);
|
||||
|
||||
return {
|
||||
...result,
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: result[
|
||||
DispatchNodeResponseKeyEnum.skipHandleId
|
||||
]
|
||||
? [
|
||||
...result[DispatchNodeResponseKeyEnum.skipHandleId],
|
||||
...skipHandleIds
|
||||
].filter(Boolean)
|
||||
: skipHandleIds
|
||||
};
|
||||
}
|
||||
|
||||
// Not error
|
||||
const errorHandle =
|
||||
targetEdges.find((item) => item.sourceHandle === errorHandleId)?.sourceHandle ||
|
||||
'';
|
||||
|
||||
return {
|
||||
...result,
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: (result[
|
||||
DispatchNodeResponseKeyEnum.skipHandleId
|
||||
]
|
||||
? [...result[DispatchNodeResponseKeyEnum.skipHandleId], errorHandle]
|
||||
: [errorHandle]
|
||||
).filter(Boolean)
|
||||
};
|
||||
} catch (error) {
|
||||
// Skip all edges and return error
|
||||
let skipHandleId = targetEdges.map((item) => item.sourceHandle);
|
||||
if (node.catchError) {
|
||||
skipHandleId = skipHandleId.filter((item) => item !== errorHandleId);
|
||||
}
|
||||
|
||||
return {
|
||||
[DispatchNodeResponseKeyEnum.nodeResponse]: {
|
||||
error: getErrText(error)
|
||||
},
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: skipHandleId
|
||||
};
|
||||
}
|
||||
}
|
||||
return {};
|
||||
})();
|
||||
|
||||
const nodeResponses = dispatchRes[DispatchNodeResponseKeyEnum.nodeResponses] || [];
|
||||
// format response data. Add modulename and module type
|
||||
const formatResponseData: NodeResponseCompleteType['responseData'] = (() => {
|
||||
if (!dispatchRes[DispatchNodeResponseKeyEnum.nodeResponse]) return undefined;
|
||||
|
||||
const val = {
|
||||
moduleName: node.name,
|
||||
moduleType: node.flowNodeType,
|
||||
moduleLogo: node.avatar,
|
||||
...dispatchRes[DispatchNodeResponseKeyEnum.nodeResponse],
|
||||
id: getNanoid(),
|
||||
nodeId: node.nodeId,
|
||||
runningTime: +((Date.now() - startTime) / 1000).toFixed(2)
|
||||
};
|
||||
nodeResponses.push(val);
|
||||
return val;
|
||||
})();
|
||||
|
||||
// Response node response
|
||||
if (
|
||||
this.data.apiVersion === 'v2' &&
|
||||
!this.data.isToolCall &&
|
||||
this.isRootRuntime &&
|
||||
nodeResponses.length > 0
|
||||
) {
|
||||
const filteredResponses = this.data.responseAllData
|
||||
? nodeResponses
|
||||
: filterPublicNodeResponseData({
|
||||
nodeRespones: nodeResponses,
|
||||
responseDetail: this.data.responseDetail
|
||||
});
|
||||
|
||||
filteredResponses.forEach((item) => {
|
||||
this.data.workflowStreamResponse?.({
|
||||
event: SseResponseEventEnum.flowNodeResponse,
|
||||
data: item
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Add output default value
|
||||
if (dispatchRes.data) {
|
||||
node.outputs.forEach((item) => {
|
||||
if (!item.required) return;
|
||||
if (dispatchRes.data?.[item.key] !== undefined) return;
|
||||
dispatchRes.data![item.key] = valueTypeFormat(item.defaultValue, item.valueType);
|
||||
});
|
||||
}
|
||||
|
||||
// Update new variables
|
||||
if (dispatchRes[DispatchNodeResponseKeyEnum.newVariables]) {
|
||||
this.data.variables = {
|
||||
...this.data.variables,
|
||||
...dispatchRes[DispatchNodeResponseKeyEnum.newVariables]
|
||||
};
|
||||
}
|
||||
|
||||
// Not error
|
||||
const errorHandle =
|
||||
targetEdges.find((item) => item.sourceHandle === errorHandleId)?.sourceHandle || '';
|
||||
// Error
|
||||
if (dispatchRes?.responseData?.error) {
|
||||
stepSpan.setAttribute('fastgpt.workflow.step.error', true);
|
||||
stepSpan.setStatus({
|
||||
code: SpanStatusCode.ERROR,
|
||||
message: String(dispatchRes.responseData.error)
|
||||
});
|
||||
logger.warn('Workflow node returned error', { error: dispatchRes.responseData.error });
|
||||
} else {
|
||||
stepSpan.setStatus({ code: SpanStatusCode.OK });
|
||||
}
|
||||
|
||||
return {
|
||||
...result,
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: (result[
|
||||
DispatchNodeResponseKeyEnum.skipHandleId
|
||||
]
|
||||
? [...result[DispatchNodeResponseKeyEnum.skipHandleId], errorHandle]
|
||||
: [errorHandle]
|
||||
).filter(Boolean)
|
||||
};
|
||||
} catch (error) {
|
||||
// Skip all edges and return error
|
||||
let skipHandleId = targetEdges.map((item) => item.sourceHandle);
|
||||
if (node.catchError) {
|
||||
skipHandleId = skipHandleId.filter((item) => item !== errorHandleId);
|
||||
if (formatResponseData?.runningTime !== undefined) {
|
||||
stepSpan.setAttribute(
|
||||
'fastgpt.workflow.step.running_time_seconds',
|
||||
formatResponseData.runningTime
|
||||
);
|
||||
}
|
||||
|
||||
return {
|
||||
[DispatchNodeResponseKeyEnum.nodeResponse]: {
|
||||
error: getErrText(error)
|
||||
},
|
||||
[DispatchNodeResponseKeyEnum.skipHandleId]: skipHandleId
|
||||
node,
|
||||
runStatus: 'run',
|
||||
result: {
|
||||
...dispatchRes,
|
||||
[DispatchNodeResponseKeyEnum.nodeResponse]: formatResponseData
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
return {};
|
||||
})();
|
||||
|
||||
const nodeResponses = dispatchRes[DispatchNodeResponseKeyEnum.nodeResponses] || [];
|
||||
// format response data. Add modulename and module type
|
||||
const formatResponseData: NodeResponseCompleteType['responseData'] = (() => {
|
||||
if (!dispatchRes[DispatchNodeResponseKeyEnum.nodeResponse]) return undefined;
|
||||
|
||||
const val = {
|
||||
moduleName: node.name,
|
||||
moduleType: node.flowNodeType,
|
||||
moduleLogo: node.avatar,
|
||||
...dispatchRes[DispatchNodeResponseKeyEnum.nodeResponse],
|
||||
id: getNanoid(),
|
||||
nodeId: node.nodeId,
|
||||
runningTime: +((Date.now() - startTime) / 1000).toFixed(2)
|
||||
};
|
||||
nodeResponses.push(val);
|
||||
return val;
|
||||
})();
|
||||
|
||||
// Response node response
|
||||
if (
|
||||
this.data.apiVersion === 'v2' &&
|
||||
!this.data.isToolCall &&
|
||||
this.isRootRuntime &&
|
||||
nodeResponses.length > 0
|
||||
) {
|
||||
const filteredResponses = this.data.responseAllData
|
||||
? nodeResponses
|
||||
: filterPublicNodeResponseData({
|
||||
nodeRespones: nodeResponses,
|
||||
responseDetail: this.data.responseDetail
|
||||
});
|
||||
|
||||
filteredResponses.forEach((item) => {
|
||||
this.data.workflowStreamResponse?.({
|
||||
event: SseResponseEventEnum.flowNodeResponse,
|
||||
data: item
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
// Add output default value
|
||||
if (dispatchRes.data) {
|
||||
node.outputs.forEach((item) => {
|
||||
if (!item.required) return;
|
||||
if (dispatchRes.data?.[item.key] !== undefined) return;
|
||||
dispatchRes.data![item.key] = valueTypeFormat(item.defaultValue, item.valueType);
|
||||
});
|
||||
}
|
||||
|
||||
// Update new variables
|
||||
if (dispatchRes[DispatchNodeResponseKeyEnum.newVariables]) {
|
||||
this.data.variables = {
|
||||
...this.data.variables,
|
||||
...dispatchRes[DispatchNodeResponseKeyEnum.newVariables]
|
||||
};
|
||||
}
|
||||
|
||||
// Error
|
||||
if (dispatchRes?.responseData?.error) {
|
||||
logger.warn('Workflow node returned error', { error: dispatchRes.responseData.error });
|
||||
}
|
||||
|
||||
return {
|
||||
node,
|
||||
runStatus: 'run',
|
||||
result: {
|
||||
...dispatchRes,
|
||||
[DispatchNodeResponseKeyEnum.nodeResponse]: formatResponseData
|
||||
}
|
||||
};
|
||||
)
|
||||
);
|
||||
}
|
||||
private nodeRunWithSkip(node: RuntimeNodeItemType): {
|
||||
node: RuntimeNodeItemType;
|
||||
@@ -1342,6 +1390,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
|
||||
|
||||
// Over max depth
|
||||
data.workflowDispatchDeep++;
|
||||
const isRootRuntime = data.workflowDispatchDeep === 1;
|
||||
if (data.workflowDispatchDeep > 20) {
|
||||
return {
|
||||
flowResponses: [],
|
||||
@@ -1365,95 +1414,127 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
|
||||
};
|
||||
}
|
||||
|
||||
const startTime = Date.now();
|
||||
|
||||
data.runtimeEdges = filterOrphanEdges({
|
||||
runtimeEdges = filterOrphanEdges({
|
||||
edges: runtimeEdges,
|
||||
nodes: runtimeNodes,
|
||||
workflowId: data.runningAppInfo.id
|
||||
});
|
||||
await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
|
||||
// Init default value
|
||||
data.retainDatasetCite = data.retainDatasetCite ?? true;
|
||||
data.responseDetail = data.responseDetail ?? true;
|
||||
data.responseAllData = data.responseAllData ?? true;
|
||||
data.runtimeEdges = runtimeEdges;
|
||||
|
||||
// Start process width initInput
|
||||
const entryNodes = runtimeNodes.filter((item) => item.isEntry);
|
||||
// Reset entry
|
||||
runtimeNodes.forEach((item) => {
|
||||
// Interactively nodes will use the "isEntry", which does not need to be updated
|
||||
if (
|
||||
item.flowNodeType !== FlowNodeTypeEnum.userSelect &&
|
||||
item.flowNodeType !== FlowNodeTypeEnum.formInput &&
|
||||
item.flowNodeType !== FlowNodeTypeEnum.toolCall
|
||||
) {
|
||||
item.isEntry = false;
|
||||
}
|
||||
});
|
||||
|
||||
const workflowQueue = await new Promise<WorkflowQueue>((resolve) => {
|
||||
logger.info('Workflow run start', {
|
||||
maxRunTimes: data.maxRunTimes,
|
||||
appId: data.runningAppInfo.id
|
||||
});
|
||||
const workflowQueue = new WorkflowQueue({
|
||||
data,
|
||||
resolve,
|
||||
defaultSkipNodeQueue: data.lastInteractive?.skipNodeQueue || data.defaultSkipNodeQueue
|
||||
});
|
||||
|
||||
entryNodes.forEach((node) => {
|
||||
workflowQueue.addActiveNode(node.nodeId);
|
||||
});
|
||||
});
|
||||
|
||||
// Get interactive node response.
|
||||
const interactiveResult = (() => {
|
||||
if (workflowQueue.nodeInteractiveResponse) {
|
||||
const interactiveAssistant = workflowQueue.handleInteractiveResult({
|
||||
entryNodeIds: workflowQueue.nodeInteractiveResponse.entryNodeIds,
|
||||
interactiveResponse: workflowQueue.nodeInteractiveResponse.interactiveResponse
|
||||
});
|
||||
if (workflowQueue.isRootRuntime) {
|
||||
workflowQueue.chatAssistantResponse.push(interactiveAssistant);
|
||||
return withActiveSpan(
|
||||
{
|
||||
name: isRootRuntime ? 'workflow.run' : 'workflow.child.run',
|
||||
tracerName: 'fastgpt.workflow',
|
||||
attributes: {
|
||||
'fastgpt.workflow.id': data.runningAppInfo.id,
|
||||
'fastgpt.workflow.name': data.runningAppInfo.name,
|
||||
'fastgpt.workflow.mode': data.mode,
|
||||
'fastgpt.workflow.depth': data.workflowDispatchDeep,
|
||||
'fastgpt.workflow.is_root': isRootRuntime,
|
||||
'fastgpt.workflow.chat_id': data.chatId,
|
||||
'fastgpt.workflow.app_version': data.apiVersion,
|
||||
'fastgpt.workflow.is_tool_call': !!data.isToolCall,
|
||||
'fastgpt.workflow.node_count': runtimeNodes.length,
|
||||
'fastgpt.workflow.edge_count': runtimeEdges.length
|
||||
}
|
||||
return interactiveAssistant.interactive;
|
||||
},
|
||||
async (workflowSpan) => {
|
||||
const startTime = Date.now();
|
||||
|
||||
await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
|
||||
// Init default value
|
||||
data.retainDatasetCite = data.retainDatasetCite ?? true;
|
||||
data.responseDetail = data.responseDetail ?? true;
|
||||
data.responseAllData = data.responseAllData ?? true;
|
||||
|
||||
// Start process width initInput
|
||||
const entryNodes = runtimeNodes.filter((item) => item.isEntry);
|
||||
// Reset entry
|
||||
runtimeNodes.forEach((item) => {
|
||||
// Interactively nodes will use the "isEntry", which does not need to be updated
|
||||
if (
|
||||
item.flowNodeType !== FlowNodeTypeEnum.userSelect &&
|
||||
item.flowNodeType !== FlowNodeTypeEnum.formInput &&
|
||||
item.flowNodeType !== FlowNodeTypeEnum.toolCall
|
||||
) {
|
||||
item.isEntry = false;
|
||||
}
|
||||
});
|
||||
|
||||
const workflowQueue = await new Promise<WorkflowQueue>((resolve) => {
|
||||
logger.info('Workflow run start', {
|
||||
maxRunTimes: data.maxRunTimes,
|
||||
appId: data.runningAppInfo.id
|
||||
});
|
||||
const workflowQueue = new WorkflowQueue({
|
||||
data,
|
||||
resolve,
|
||||
defaultSkipNodeQueue: data.lastInteractive?.skipNodeQueue || data.defaultSkipNodeQueue
|
||||
});
|
||||
|
||||
entryNodes.forEach((node) => {
|
||||
workflowQueue.addActiveNode(node.nodeId);
|
||||
});
|
||||
});
|
||||
|
||||
// Get interactive node response.
|
||||
const interactiveResult = (() => {
|
||||
if (workflowQueue.nodeInteractiveResponse) {
|
||||
const interactiveAssistant = workflowQueue.handleInteractiveResult({
|
||||
entryNodeIds: workflowQueue.nodeInteractiveResponse.entryNodeIds,
|
||||
interactiveResponse: workflowQueue.nodeInteractiveResponse.interactiveResponse
|
||||
});
|
||||
if (workflowQueue.isRootRuntime) {
|
||||
workflowQueue.chatAssistantResponse.push(interactiveAssistant);
|
||||
}
|
||||
return interactiveAssistant.interactive;
|
||||
}
|
||||
})();
|
||||
|
||||
const durationSeconds = +((Date.now() - startTime) / 1000).toFixed(2);
|
||||
|
||||
workflowSpan.setAttribute('fastgpt.workflow.duration_seconds', durationSeconds);
|
||||
workflowSpan.setAttribute('fastgpt.workflow.run_times', workflowQueue.workflowRunTimes);
|
||||
workflowSpan.setAttribute(
|
||||
'fastgpt.workflow.has_interactive_response',
|
||||
!!workflowQueue.nodeInteractiveResponse
|
||||
);
|
||||
workflowSpan.setStatus({ code: SpanStatusCode.OK });
|
||||
|
||||
if (isRootRuntime) {
|
||||
data.workflowStreamResponse?.({
|
||||
event: SseResponseEventEnum.workflowDuration,
|
||||
data: { durationSeconds }
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
flowResponses: workflowQueue.chatResponses,
|
||||
flowUsages: workflowQueue.chatNodeUsages,
|
||||
debugResponse: workflowQueue.getDebugResponse(),
|
||||
workflowInteractiveResponse: interactiveResult,
|
||||
[DispatchNodeResponseKeyEnum.runTimes]: workflowQueue.workflowRunTimes,
|
||||
[DispatchNodeResponseKeyEnum.assistantResponses]: mergeAssistantResponseAnswerText(
|
||||
workflowQueue.chatAssistantResponse
|
||||
),
|
||||
[DispatchNodeResponseKeyEnum.toolResponses]: workflowQueue.toolRunResponse,
|
||||
[DispatchNodeResponseKeyEnum.newVariables]: runtimeSystemVar2StoreType({
|
||||
variables,
|
||||
removeObj: externalProvider.externalWorkflowVariables,
|
||||
userVariablesConfigs: data.chatConfig?.variables
|
||||
}),
|
||||
[DispatchNodeResponseKeyEnum.memories]:
|
||||
Object.keys(workflowQueue.system_memories).length > 0
|
||||
? workflowQueue.system_memories
|
||||
: undefined,
|
||||
[DispatchNodeResponseKeyEnum.customFeedbacks]:
|
||||
workflowQueue.customFeedbackList.length > 0
|
||||
? workflowQueue.customFeedbackList
|
||||
: undefined,
|
||||
durationSeconds
|
||||
};
|
||||
}
|
||||
})();
|
||||
|
||||
const durationSeconds = +((Date.now() - startTime) / 1000).toFixed(2);
|
||||
|
||||
if (workflowQueue.isRootRuntime) {
|
||||
data.workflowStreamResponse?.({
|
||||
event: SseResponseEventEnum.workflowDuration,
|
||||
data: { durationSeconds }
|
||||
});
|
||||
}
|
||||
|
||||
return {
|
||||
flowResponses: workflowQueue.chatResponses,
|
||||
flowUsages: workflowQueue.chatNodeUsages,
|
||||
debugResponse: workflowQueue.getDebugResponse(),
|
||||
workflowInteractiveResponse: interactiveResult,
|
||||
[DispatchNodeResponseKeyEnum.runTimes]: workflowQueue.workflowRunTimes,
|
||||
[DispatchNodeResponseKeyEnum.assistantResponses]: mergeAssistantResponseAnswerText(
|
||||
workflowQueue.chatAssistantResponse
|
||||
),
|
||||
[DispatchNodeResponseKeyEnum.toolResponses]: workflowQueue.toolRunResponse,
|
||||
[DispatchNodeResponseKeyEnum.newVariables]: runtimeSystemVar2StoreType({
|
||||
variables,
|
||||
removeObj: externalProvider.externalWorkflowVariables,
|
||||
userVariablesConfigs: data.chatConfig?.variables
|
||||
}),
|
||||
[DispatchNodeResponseKeyEnum.memories]:
|
||||
Object.keys(workflowQueue.system_memories).length > 0
|
||||
? workflowQueue.system_memories
|
||||
: undefined,
|
||||
[DispatchNodeResponseKeyEnum.customFeedbacks]:
|
||||
workflowQueue.customFeedbackList.length > 0 ? workflowQueue.customFeedbackList : undefined,
|
||||
durationSeconds
|
||||
};
|
||||
);
|
||||
};
|
||||
|
||||
/* get system variable */
|
||||
|
||||
Reference in New Issue
Block a user