perf: index (#6131)

* perf: index

* stop design doc

* perf: stop workflow;perf: mongo connection

* fix: ts

* mq export
This commit is contained in:
Archer
2025-12-21 19:15:10 +08:00
committed by GitHub
parent 4f95f6867e
commit 2fea73bb68
32 changed files with 1419 additions and 238 deletions
@@ -64,6 +64,7 @@ export type ChatResponse = DispatchNodeResultType<
export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResponse> => {
let {
res,
checkIsStopping,
requestOrigin,
stream = false,
retainDatasetCite = true,
@@ -201,7 +202,7 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise<ChatResp
requestOrigin
},
userKey: externalProvider.openaiAccount,
isAborted: () => res?.closed,
isAborted: checkIsStopping,
onReasoning({ text }) {
if (!aiChatReasoning) return;
workflowStreamResponse?.({
@@ -18,6 +18,7 @@ export const runToolCall = async (props: DispatchToolModuleProps): Promise<RunTo
const { messages, toolNodes, toolModel, childrenInteractiveParams, ...workflowProps } = props;
const {
res,
checkIsStopping,
requestOrigin,
runtimeNodes,
runtimeEdges,
@@ -129,7 +130,7 @@ export const runToolCall = async (props: DispatchToolModuleProps): Promise<RunTo
retainDatasetCite,
useVision: aiChatVision
},
isAborted: () => res?.closed,
isAborted: checkIsStopping,
userKey: externalProvider.openaiAccount,
onReasoning({ text }) {
if (!aiChatReasoning) return;
@@ -59,10 +59,11 @@ import { TeamErrEnum } from '@fastgpt/global/common/error/code/team';
import { i18nT } from '../../../../web/i18n/utils';
import { clone } from 'lodash';
import { validateFileUrlDomain } from '../../../common/security/fileUrlValidator';
import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus';
type Props = Omit<
ChatDispatchProps,
'workflowDispatchDeep' | 'timezone' | 'externalProvider' | 'cloneVariables'
'checkIsStopping' | 'workflowDispatchDeep' | 'timezone' | 'externalProvider' | 'cloneVariables'
> & {
runtimeNodes: RuntimeNodeItemType[];
runtimeEdges: RuntimeEdgeItemType[];
@@ -87,7 +88,17 @@ export async function dispatchWorkFlow({
concatUsage,
...data
}: Props & WorkflowUsageProps): Promise<DispatchFlowResponse> {
const { res, stream, runningUserInfo, runningAppInfo, lastInteractive, histories, query } = data;
const {
res,
stream,
runningUserInfo,
runningAppInfo,
lastInteractive,
histories,
query,
chatId,
apiVersion
} = data;
// Check url valid
const invalidInput = query.some((item) => {
@@ -101,6 +112,8 @@ export async function dispatchWorkFlow({
addLog.info('[Workflow run] Invalid file url');
return Promise.reject(new UserError('Invalid file url'));
}
/* Init function */
// Check point
await checkTeamAIPoints(runningUserInfo.teamId);
@@ -120,7 +133,22 @@ export async function dispatchWorkFlow({
});
}
return usageId;
})()
})(),
// Add preview url to chat items
await addPreviewUrlToChatItems(histories, 'chatFlow'),
// Add preview url to query
...query.map(async (item) => {
if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) return;
item.file.url = await getS3ChatSource().createGetChatFileURL({
key: item.file.key,
external: true
});
}),
// Remove stopping sign
delAgentRuntimeStopSign({
appId: runningAppInfo.id,
chatId
})
]);
let streamCheckTimer: NodeJS.Timeout | null = null;
@@ -152,16 +180,6 @@ export async function dispatchWorkFlow({
}
}
// Add preview url to chat items
await addPreviewUrlToChatItems(histories, 'chatFlow');
for (const item of query) {
if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) continue;
item.file.url = await getS3ChatSource().createGetChatFileURL({
key: item.file.key,
external: true
});
}
// Get default variables
const cloneVariables = clone(data.variables);
const defaultVariables = {
@@ -173,12 +191,34 @@ export async function dispatchWorkFlow({
timezone
}))
};
// MCP
let mcpClientMemory = {} as Record<string, MCPClient>;
// Stop sign(没有 apiVersion,说明不会有暂停)
let stopping = false;
const checkIsStopping = (): boolean => {
if (apiVersion === 'v2') {
return stopping;
}
if (apiVersion === 'v1') {
if (!res) return false;
return res.closed || !!res.errored;
}
return false;
};
const checkStoppingTimer =
apiVersion === 'v2'
? setInterval(async () => {
stopping = await shouldWorkflowStop({
appId: runningAppInfo.id,
chatId
});
}, 100)
: undefined;
// Init some props
return runWorkflow({
...data,
checkIsStopping,
query,
histories,
timezone,
@@ -189,15 +229,24 @@ export async function dispatchWorkFlow({
concatUsage,
mcpClientMemory,
cloneVariables
}).finally(() => {
}).finally(async () => {
if (streamCheckTimer) {
clearInterval(streamCheckTimer);
}
if (checkStoppingTimer) {
clearInterval(checkStoppingTimer);
}
// Close mcpClient connections
Object.values(mcpClientMemory).forEach((client) => {
client.closeConnection();
});
// 工作流完成后删除 Redis 记录
await delAgentRuntimeStopSign({
appId: runningAppInfo.id,
chatId
});
});
}
@@ -210,14 +259,14 @@ type RunWorkflowProps = ChatDispatchProps & {
};
export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowResponse> => {
let {
res,
apiVersion,
checkIsStopping,
runtimeNodes = [],
runtimeEdges = [],
histories = [],
variables = {},
externalProvider,
retainDatasetCite = true,
version = 'v1',
responseDetail = true,
responseAllData = true,
usageId,
@@ -328,10 +377,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
});
}
get connectionIsActive(): boolean {
return !res?.closed && !res?.errored;
}
// Add active node to queue (if already in the queue, it will not be added again)
addActiveNode(nodeId: string) {
if (this.activeRunQueue.has(nodeId)) {
@@ -585,7 +630,7 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
})();
// Response node response
if (version === 'v2' && !data.isToolCall && isRootRuntime && formatResponseData) {
if (apiVersion === 'v2' && !data.isToolCall && isRootRuntime && formatResponseData) {
data.workflowStreamResponse?.({
event: SseResponseEventEnum.flowNodeResponse,
data: responseAllData
@@ -813,8 +858,8 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowR
});
return;
}
if (!this.connectionIsActive) {
addLog.warn('Request is closed/errored', {
if (checkIsStopping()) {
addLog.warn('Workflow stopped', {
appId: data.runningAppInfo.id,
nodeId: node.nodeId,
nodeName: node.name
@@ -0,0 +1,79 @@
import { addLog } from '../../../common/system/log';
import { getGlobalRedisConnection } from '../../../common/redis/index';
import { delay } from '@fastgpt/global/common/system/utils';
const WORKFLOW_STATUS_PREFIX = 'agent_runtime_stopping';
const TTL = 60; // 1分钟
export const StopStatus = 'STOPPING';
export type WorkflowStatusParams = {
appId: string;
chatId: string;
};
// 获取工作流状态键
export const getRuntimeStatusKey = (params: WorkflowStatusParams): string => {
return `${WORKFLOW_STATUS_PREFIX}:${params.appId}:${params.chatId}`;
};
// 暂停任务
export const setAgentRuntimeStop = async (params: WorkflowStatusParams): Promise<void> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
await redis.set(key, 1, 'EX', TTL);
};
// 删除任务状态
export const delAgentRuntimeStopSign = async (params: WorkflowStatusParams): Promise<void> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
await redis.del(key).catch((err) => {
addLog.error(`[Agent Runtime Stop] Delete stop sign error`, err);
});
};
// 检查工作流是否应该停止
export const shouldWorkflowStop = (params: WorkflowStatusParams): Promise<boolean> => {
const redis = getGlobalRedisConnection();
const key = getRuntimeStatusKey(params);
return redis
.get(key)
.then((res) => !!res)
.catch(() => false);
};
/**
* 等待工作流完成(记录被删除)
* @param params 工作流参数
* @param timeout 超时时间(毫秒),默认5秒
* @param pollInterval 轮询间隔(毫秒),默认50毫秒
* @returns true=正常完成, false=超时
*/
export const waitForWorkflowComplete = async ({
appId,
chatId,
timeout = 5000,
pollInterval = 50
}: {
appId: string;
chatId: string;
timeout?: number;
pollInterval?: number;
}) => {
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
const sign = await shouldWorkflowStop({ appId, chatId });
// 如果没有暂停中的标志,则认为已经完成任务了。
if (!sign) {
return;
}
// 等待下一次轮询
await delay(pollInterval);
}
return;
};