diff --git a/.claude/skills/core/app/workflow/stop/SKILL.md b/.claude/skills/core/app/workflow/stop/SKILL.md new file mode 100644 index 0000000000..a109f45e01 --- /dev/null +++ b/.claude/skills/core/app/workflow/stop/SKILL.md @@ -0,0 +1,672 @@ +--- +name: workflow-stop-design +description: 工作流暂停逻辑设计方案 +--- + +## 1. Redis 状态管理方案 + +### 1.1 状态键设计 + +**Redis Key 结构:** +```typescript +// Key 格式: agent_runtime_stopping:{appId}:{chatId} +const WORKFLOW_STATUS_PREFIX = 'agent_runtime_stopping'; + +type WorkflowStatusKey = `${typeof WORKFLOW_STATUS_PREFIX}:${string}:${string}`; + +// 示例: agent_runtime_stopping:app_123456:chat_789012 +``` + +**状态值设计:** +- **存在键 (值为 1)**: 工作流应该停止 +- **不存在键**: 工作流正常运行 +- **设计简化**: 不使用状态枚举,仅通过键的存在与否判断 + +**参数类型定义:** +```typescript +type WorkflowStatusParams = { + appId: string; + chatId: string; +}; +``` + +### 1.2 状态生命周期管理 + +**状态转换流程:** +``` +正常运行(无键) → 停止中(键存在) → 完成(删除键) +``` + +**TTL 设置:** +- **停止标志 TTL**: 60 秒 + - 原因: 避免因意外情况导致的键泄漏 + - 正常情况下会在工作流完成时主动删除 +- **工作流完成后**: 直接删除 Redis 键 + - 原因: 不需要保留终态,减少 Redis 内存占用 + +### 1.3 核心函数说明 + +**1. setAgentRuntimeStop** +- **功能**: 设置停止标志 +- **参数**: `{ appId, chatId }` +- **实现**: 使用 `SETEX` 命令,设置键值为 1,TTL 60 秒 + +**2. shouldWorkflowStop** +- **功能**: 检查工作流是否应该停止 +- **参数**: `{ appId, chatId }` +- **返回**: `Promise` - true=应该停止, false=继续运行 +- **实现**: GET 命令获取键值,存在则返回 true + +**3. delAgentRuntimeStopSign** +- **功能**: 删除停止标志 +- **参数**: `{ appId, chatId }` +- **实现**: DEL 命令删除键 + +**4. waitForWorkflowComplete** +- **功能**: 等待工作流完成(停止标志被删除) +- **参数**: `{ appId, chatId, timeout?, pollInterval? }` +- **实现**: 轮询检查停止标志是否被删除,超时返回 + +### 1.4 边界情况处理 + +**1. Redis 操作失败** +- **错误处理**: 所有 Redis 操作都包含 `.catch()` 错误处理 +- **降级策略**: + - `shouldWorkflowStop`: 出错时返回 `false` (认为不需要停止,继续运行) + - `delAgentRuntimeStopSign`: 出错时记录错误日志,但不影响主流程 +- **设计原因**: Redis 异常不应阻塞工作流运行,降级到继续执行策略 + +**2. TTL 自动清理** +- **TTL 设置**: 60 秒 +- **清理时机**: Redis 自动清理过期键 +- **设计原因**: + - 避免因异常情况导致的 Redis 键泄漏 + - 自动清理减少手动维护成本 + - 60 秒足够大多数工作流完成停止操作 + +**3. stop 接口等待超时** +- **超时时间**: 5 秒 +- **超时策略**: `waitForWorkflowComplete` 在 5 秒内轮询检查停止标志是否被删除 +- **超时处理**: 5 秒后直接返回,不影响工作流继续执行 +- **设计原因**: + - 避免前端长时间等待 + - 5 秒足够大多数节点完成当前操作 + - 用户体验优先,超时后前端可选择重试或放弃 + +**4. 并发停止请求** +- **处理方式**: 多次调用 `setAgentRuntimeStop` 是安全的,Redis SETEX 是幂等操作 +- **设计原因**: 避免用户多次点击停止按钮导致的问题 + +--- + +## 2. Redis 工具函数实现 + +**位置**: `packages/service/core/workflow/dispatch/workflowStatus.ts` + +```typescript +import { addLog } from '../../../common/system/log'; +import { getGlobalRedisConnection } from '../../../common/redis/index'; +import { delay } from '@fastgpt/global/common/system/utils'; + +const WORKFLOW_STATUS_PREFIX = 'agent_runtime_stopping'; +const TTL = 60; // 60秒 + +export const StopStatus = 'STOPPING'; + +export type WorkflowStatusParams = { + appId: string; + chatId: string; +}; + +// 获取工作流状态键 +export const getRuntimeStatusKey = (params: WorkflowStatusParams): string => { + return `${WORKFLOW_STATUS_PREFIX}:${params.appId}:${params.chatId}`; +}; + +// 设置停止标志 +export const setAgentRuntimeStop = async (params: WorkflowStatusParams): Promise => { + const redis = getGlobalRedisConnection(); + const key = getRuntimeStatusKey(params); + await redis.setex(key, TTL, 1); +}; + +// 删除停止标志 +export const delAgentRuntimeStopSign = async (params: WorkflowStatusParams): Promise => { + const redis = getGlobalRedisConnection(); + const key = getRuntimeStatusKey(params); + await redis.del(key).catch((err) => { + addLog.error(`[Agent Runtime Stop] Delete stop sign error`, err); + }); +}; + +// 检查工作流是否应该停止 +export const shouldWorkflowStop = (params: WorkflowStatusParams): Promise => { + const redis = getGlobalRedisConnection(); + const key = getRuntimeStatusKey(params); + return redis + .get(key) + .then((res) => !!res) + .catch(() => false); +}; + +/** + * 等待工作流完成(停止标志被删除) + * @param params 工作流参数 + * @param timeout 超时时间(毫秒),默认5秒 + * @param pollInterval 轮询间隔(毫秒),默认50毫秒 + */ +export const waitForWorkflowComplete = async ({ + appId, + chatId, + timeout = 5000, + pollInterval = 50 +}: { + appId: string; + chatId: string; + timeout?: number; + pollInterval?: number; +}) => { + const startTime = Date.now(); + + while (Date.now() - startTime < timeout) { + const sign = await shouldWorkflowStop({ appId, chatId }); + + // 如果停止标志已被删除,说明工作流已完成 + if (!sign) { + return; + } + + // 等待下一次轮询 + await delay(pollInterval); + } + + // 超时后直接返回 + return; +}; +``` + +**测试用例位置**: `test/cases/service/core/app/workflow/workflowStatus.test.ts` + +```typescript +import { describe, test, expect, beforeEach } from 'vitest'; +import { + setAgentRuntimeStop, + delAgentRuntimeStopSign, + shouldWorkflowStop, + waitForWorkflowComplete +} from '@fastgpt/service/core/workflow/dispatch/workflowStatus'; + +describe('Workflow Status Redis Functions', () => { + const testAppId = 'test_app_123'; + const testChatId = 'test_chat_456'; + + beforeEach(async () => { + // 清理测试数据 + await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId }); + }); + + test('should set stopping sign', async () => { + await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }); + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); + + test('should return false for non-existent status', async () => { + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should return false after deleting stop sign', async () => { + await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }); + await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId }); + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should wait for workflow completion', async () => { + // 设置初始停止标志 + await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }); + + // 模拟异步完成(删除停止标志) + setTimeout(async () => { + await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId }); + }, 500); + + // 等待完成 + await waitForWorkflowComplete({ + appId: testAppId, + chatId: testChatId, + timeout: 2000 + }); + + // 验证停止标志已被删除 + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should timeout when waiting too long', async () => { + await setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }); + + // 等待超时(不删除标志) + await waitForWorkflowComplete({ + appId: testAppId, + chatId: testChatId, + timeout: 100 + }); + + // 验证停止标志仍然存在 + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); + + test('should handle concurrent stop sign operations', async () => { + // 并发设置停止标志 + await Promise.all([ + setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }), + setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }) + ]); + + // 停止标志应该存在 + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); +}); +``` + +## 3. 工作流停止检测机制改造 + +### 3.1 修改位置 + +**文件**: `packages/service/core/workflow/dispatch/index.ts` + +### 3.2 工作流启动时的停止检测机制 + +**改造点 1: 停止检测逻辑 (行 196-216)** + +使用内存变量 + 定时轮询 Redis 的方式: + +```typescript +import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus'; + +// 初始化停止检测 +let stopping = false; +const checkIsStopping = (): boolean => { + if (apiVersion === 'v2') { + return stopping; + } + if (apiVersion === 'v1') { + if (!res) return false; + return res.closed || !!res.errored; + } + return false; +}; + +// v2 版本: 启动定时器定期检查 Redis +const checkStoppingTimer = + apiVersion === 'v2' + ? setInterval(async () => { + stopping = await shouldWorkflowStop({ + appId: runningAppInfo.id, + chatId + }); + }, 100) + : undefined; +``` + +**设计要点**: +- v2 版本使用内存变量 `stopping` + 100ms 定时器轮询 Redis +- v1 版本仍使用原有的 `res.closed/res.errored` 检测 +- 轮询频率 100ms,平衡性能和响应速度 + +**改造点 2: 工作流完成后清理 (行 232-249)** + +```typescript +return runWorkflow({ + ...data, + checkIsStopping, // 传递检测函数 + query, + histories, + // ... 其他参数 +}).finally(async () => { + // 清理定时器 + if (streamCheckTimer) { + clearInterval(streamCheckTimer); + } + if (checkStoppingTimer) { + clearInterval(checkStoppingTimer); + } + + // Close mcpClient connections + Object.values(mcpClientMemory).forEach((client) => { + client.closeConnection(); + }); + + // 工作流完成后删除 Redis 记录 + await delAgentRuntimeStopSign({ + appId: runningAppInfo.id, + chatId + }); +}); +``` + +### 3.3 节点执行前的停止检测 + +**位置**: `packages/service/core/workflow/dispatch/index.ts:861-868` + +在 `checkNodeCanRun` 方法中,每个节点执行前检查: + +```typescript +private async checkNodeCanRun( + node: RuntimeNodeItemType, + skippedNodeIdList = new Set() +) { + // ... 其他检查逻辑 ... + + // Check queue status + if (data.maxRunTimes <= 0) { + addLog.error('Max run times is 0', { + appId: data.runningAppInfo.id + }); + return; + } + + // 停止检测 + if (checkIsStopping()) { + addLog.warn('Workflow stopped', { + appId: data.runningAppInfo.id, + nodeId: node.nodeId, + nodeName: node.name + }); + return; + } + + // ... 执行节点逻辑 ... +} +``` + +**说明**: +- 直接调用 `checkIsStopping()` 同步方法 +- 内部会检查内存变量 `stopping` +- 定时器每 100ms 更新一次该变量 +- 检测到停止时记录日志并直接返回,不执行节点 + +## 4. v2/chat/stop 接口设计 + +### 4.1 接口规范 + +**接口路径**: `/api/v2/chat/stop` + +**Schema 位置**: `packages/global/openapi/core/chat/api.ts` + +**接口文档位置**: `packages/global/openapi/core/chat/index.ts` + +**请求方法**: POST + +**请求参数**: +```typescript +// packages/global/openapi/core/chat/api.ts +export const StopV2ChatSchema = z + .object({ + appId: ObjectIdSchema.describe('应用ID'), + chatId: z.string().min(1).describe('对话ID'), + outLinkAuthData: OutLinkChatAuthSchema.optional().describe('外链鉴权数据') + }); + +export type StopV2ChatParams = z.infer; +``` + +**响应格式**: +```typescript +export const StopV2ChatResponseSchema = z + .object({ + success: z.boolean().describe('是否成功停止') + }); + +export type StopV2ChatResponse = z.infer; +``` + +### 4.2 接口实现 + +**文件位置**: `projects/app/src/pages/api/v2/chat/stop.ts` + +```typescript +import type { NextApiRequest, NextApiResponse } from 'next'; +import { NextAPI } from '@/service/middleware/entry'; +import { authChatCrud } from '@/service/support/permission/auth/chat'; +import { + setAgentRuntimeStop, + waitForWorkflowComplete +} from '@fastgpt/service/core/workflow/dispatch/workflowStatus'; +import { StopV2ChatSchema, type StopV2ChatResponse } from '@fastgpt/global/openapi/core/chat/api'; + +async function handler(req: NextApiRequest, res: NextApiResponse): Promise { + const { appId, chatId, outLinkAuthData } = StopV2ChatSchema.parse(req.body); + + // 鉴权 (复用聊天 CRUD 鉴权) + await authChatCrud({ + req, + authToken: true, + authApiKey: true, + appId, + chatId, + ...outLinkAuthData + }); + + // 设置停止标志 + await setAgentRuntimeStop({ + appId, + chatId + }); + + // 等待工作流完成 (最多等待 5 秒) + await waitForWorkflowComplete({ appId, chatId, timeout: 5000 }); + + return { + success: true + }; +} + +export default NextAPI(handler); +``` + +**接口文档** (`packages/global/openapi/core/chat/index.ts`): + +```typescript +export const ChatPath: OpenAPIPath = { + // ... 其他路径 + + '/v2/chat/stop': { + post: { + summary: '停止 Agent 运行', + description: `优雅停止正在运行的 Agent, 会尝试等待当前节点结束后返回,最长 5s,超过 5s 仍未结束,则会返回成功。 +LLM 节点,流输出时会同时被终止,但 HTTP 请求节点这种可能长时间运行的,不会被终止。`, + tags: [TagsMap.chatPage], + requestBody: { + content: { + 'application/json': { + schema: StopV2ChatSchema + } + } + }, + responses: { + 200: { + description: '成功停止工作流', + content: { + 'application/json': { + schema: StopV2ChatResponseSchema + } + } + } + } + } + } +}; +``` + +**说明**: +- 接口使用 `authChatCrud` 进行鉴权,支持 Token 和 API Key +- 支持分享链接和团队空间的鉴权数据 +- 设置停止标志后等待最多 5 秒 +- 无论是否超时,都返回 `success: true` + +## 5. 前端改造 + +由于当前代码已经能够正常工作,且 v2 版本的后端已经实现了基于 Redis 的停止机制,前端可以保持现有的简单实现: + +**保持现有实现的原因**: +1. 后端已经通过定时器轮询 Redis 实现了停止检测 +2. 前端调用 `abort()` 后,后端会在下个检测周期(100ms内)发现停止标志 +3. 简化前端逻辑,避免增加复杂性 +4. 用户体验上,立即中断连接响应更快 + +**可选的增强方案**: + +如果需要在前端显示更详细的停止状态,可以添加 API 客户端函数: + +**文件位置**: `projects/app/src/web/core/chat/api.ts` + +```typescript +import { POST } from '@/web/common/api/request'; +import type { StopV2ChatParams, StopV2ChatResponse } from '@fastgpt/global/openapi/core/chat/api'; + +/** + * 停止 v2 版本工作流运行 + */ +export const stopV2Chat = (data: StopV2ChatParams) => + POST('/api/v2/chat/stop', data); +``` + +**增强的 abortRequest 函数**: + +```typescript +/* Abort chat completions, questionGuide */ +const abortRequest = useMemoizedFn(async (reason: string = 'stop') => { + // 先调用 abort 中断连接 + chatController.current?.abort(new Error(reason)); + questionGuideController.current?.abort(new Error(reason)); + pluginController.current?.abort(new Error(reason)); + + // v2 版本: 可选地通知后端优雅停止 + if (chatBoxData?.app?.version === 'v2' && appId && chatId) { + try { + await stopV2Chat({ + appId, + chatId, + outLinkAuthData + }); + } catch (error) { + // 静默失败,不影响用户体验 + console.warn('Failed to notify backend to stop workflow', error); + } + } +}); +``` + +**建议**: +- **推荐**: 保持当前简单实现,后端已经足够健壮 +- **可选**: 如果需要更精确的停止状态追踪,可以实现上述增强方案 + +## 6. 完整调用流程 + +### 6.1 正常停止流程 + +``` +用户点击停止按钮 + ↓ +前端: abortRequest() + ↓ +前端: chatController.abort() [立即中断 HTTP 连接] + ↓ +[可选] 前端: POST /api/v2/chat/stop + ↓ +后端: setAgentRuntimeStop(appId, chatId) [设置停止标志] + ↓ +后端: 定时器检测到 Redis 停止标志,更新内存变量 stopping = true + ↓ +后端: 下个节点执行前 checkIsStopping() 返回 true + ↓ +后端: 停止处理新节点,记录日志 + ↓ +后端: 工作流 finally 块删除 Redis 停止标志 + ↓ +[可选] 后端: waitForWorkflowComplete() 检测到停止标志被删除 + ↓ +[可选] 前端: 显示停止成功提示 +``` + +### 6.2 超时流程 + +``` +[可选] 前端: POST /api/v2/chat/stop + ↓ +后端: setAgentRuntimeStop(appId, chatId) + ↓ +后端: waitForWorkflowComplete(timeout=5s) + ↓ +后端: 5秒后停止标志仍存在 + ↓ +后端: 返回成功响应 (不区分超时) + ↓ +[可选] 前端: 显示成功提示 + ↓ +后端: 工作流继续运行,最终完成后删除停止标志 +``` + +### 6.3 工作流自然完成流程 + +``` +工作流运行中 + ↓ +所有节点执行完成 + ↓ +dispatchWorkFlow.finally() + ↓ +删除 Redis 停止标志 + ↓ +清理定时器 + ↓ +60秒 TTL 确保即使删除失败也会自动清理 +``` + +### 6.4 时序说明 + +**关键时间点**: +- **100ms**: 后端定时器检查 Redis 停止标志的频率 +- **5s**: stop 接口等待工作流完成的超时时间 +- **60s**: Redis 键的 TTL,自动清理时间 + +**响应时间**: +- 用户点击停止 → HTTP 连接中断: **立即** (前端 abort) +- 停止标志写入 Redis: **< 50ms** (Redis SETEX 操作) +- 后端检测到停止: **< 100ms** (定时器轮询周期) +- 当前节点停止执行: **取决于节点类型** + - LLM 流式输出: **立即**中断流 + - HTTP 请求节点: **等待请求完成** + - 其他节点: **等待当前操作完成** + +## 7. 测试策略 + +### 7.1 单元测试 + +**Redis 工具函数测试**: +- `setAgentRuntimeStop` / `shouldWorkflowStop` 基本功能 +- `delAgentRuntimeStopSign` 删除功能 +- `waitForWorkflowComplete` 等待机制和超时 +- 并发操作安全性 + +**文件位置**: `test/cases/service/core/app/workflow/workflowStatus.test.ts` + +**测试用例**: +```typescript +describe('Workflow Status Redis Functions', () => { + test('should set stopping sign') + test('should return false for non-existent status') + test('should detect stopping status') + test('should return false after deleting stop sign') + test('should wait for workflow completion') + test('should timeout when waiting too long') + test('should delete workflow stop sign') + test('should handle concurrent stop sign operations') +}); +``` + diff --git a/document/content/docs/upgrading/4-14/4145.mdx b/document/content/docs/upgrading/4-14/4145.mdx index 5166bc0623..89b74d787d 100644 --- a/document/content/docs/upgrading/4-14/4145.mdx +++ b/document/content/docs/upgrading/4-14/4145.mdx @@ -11,7 +11,7 @@ description: 'FastGPT V4.14.5 更新说明' ## ⚙️ 优化 1. 优化获取 redis 所有 key 的逻辑,避免大量获取时导致阻塞。 -2. Redis 和 MQ 的重连逻辑优化。 +2. MongoDB, Redis 和 MQ 的重连逻辑优化。 ## 🐛 修复 diff --git a/document/data/doc-last-modified.json b/document/data/doc-last-modified.json index e7ecdc9004..c58df14f30 100644 --- a/document/data/doc-last-modified.json +++ b/document/data/doc-last-modified.json @@ -120,7 +120,7 @@ "document/content/docs/upgrading/4-14/4142.mdx": "2025-11-18T19:27:14+08:00", "document/content/docs/upgrading/4-14/4143.mdx": "2025-11-26T20:52:05+08:00", "document/content/docs/upgrading/4-14/4144.mdx": "2025-12-16T14:56:04+08:00", - "document/content/docs/upgrading/4-14/4145.mdx": "2025-12-19T00:08:30+08:00", + "document/content/docs/upgrading/4-14/4145.mdx": "2025-12-20T13:11:02+08:00", "document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00", diff --git a/packages/global/core/workflow/runtime/type.d.ts b/packages/global/core/workflow/runtime/type.d.ts index 483a944896..0900795dd6 100644 --- a/packages/global/core/workflow/runtime/type.d.ts +++ b/packages/global/core/workflow/runtime/type.d.ts @@ -40,6 +40,7 @@ export type ExternalProviderType = { /* workflow props */ export type ChatDispatchProps = { res?: NextApiResponse; + checkIsStopping: () => boolean; lang?: localeType; requestOrigin?: string; mode: 'test' | 'chat' | 'debug'; @@ -63,7 +64,7 @@ export type ChatDispatchProps = { }; uid: string; // Who run this workflow - chatId?: string; + chatId: string; responseChatItemId?: string; histories: ChatItemType[]; variables: Record; // global variable @@ -76,7 +77,7 @@ export type ChatDispatchProps = { maxRunTimes: number; isToolCall?: boolean; workflowStreamResponse?: WorkflowResponseType; - version?: 'v1' | 'v2'; + apiVersion?: 'v1' | 'v2'; workflowDispatchDeep: number; diff --git a/packages/global/openapi/core/chat/api.ts b/packages/global/openapi/core/chat/api.ts index cb0a97a61d..c81ab1fc1a 100644 --- a/packages/global/openapi/core/chat/api.ts +++ b/packages/global/openapi/core/chat/api.ts @@ -1,8 +1,38 @@ -import type { OutLinkChatAuthType } from '../../../support/permission/chat/type'; import { OutLinkChatAuthSchema } from '../../../support/permission/chat/type'; import { ObjectIdSchema } from '../../../common/type/mongo'; import z from 'zod'; +/* ============ v2/chat/stop ============ */ +export const StopV2ChatSchema = z + .object({ + appId: ObjectIdSchema.describe('应用ID'), + chatId: z.string().min(1).describe('对话ID'), + outLinkAuthData: OutLinkChatAuthSchema.optional().describe('外链鉴权数据') + }) + .meta({ + example: { + appId: '1234567890', + chatId: '1234567890', + outLinkAuthData: { + shareId: '1234567890', + outLinkUid: '1234567890' + } + } + }); +export type StopV2ChatParams = z.infer; + +export const StopV2ChatResponseSchema = z + .object({ + success: z.boolean().describe('是否成功停止') + }) + .meta({ + example: { + success: true + } + }); +export type StopV2ChatResponse = z.infer; + +/* ============ chat file ============ */ export const PresignChatFileGetUrlSchema = z .object({ key: z.string().min(1).describe('文件key'), diff --git a/packages/global/openapi/core/chat/index.ts b/packages/global/openapi/core/chat/index.ts index 48f1b53006..6b2ad43c9e 100644 --- a/packages/global/openapi/core/chat/index.ts +++ b/packages/global/openapi/core/chat/index.ts @@ -5,7 +5,12 @@ import { ChatFeedbackPath } from './feedback/index'; import { ChatHistoryPath } from './history/index'; import { z } from 'zod'; import { CreatePostPresignedUrlResultSchema } from '../../../../service/common/s3/type'; -import { PresignChatFileGetUrlSchema, PresignChatFilePostUrlSchema } from './api'; +import { + PresignChatFileGetUrlSchema, + PresignChatFilePostUrlSchema, + StopV2ChatSchema, + StopV2ChatResponseSchema +} from './api'; import { TagsMap } from '../../tag'; export const ChatPath: OpenAPIPath = { @@ -14,6 +19,31 @@ export const ChatPath: OpenAPIPath = { ...ChatFeedbackPath, ...ChatHistoryPath, + '/v2/chat/stop': { + post: { + summary: '停止 Agent 运行', + description: `优雅停止正在运行的 Agent, 会尝试等待当前节点结束后返回,最长 5s,超过 5s 仍未结束,则会返回成功。 +LLM 节点,流输出时会同时被终止,但 HTTP 请求节点这种可能长时间运行的,不会被终止。`, + tags: [TagsMap.chatPage], + requestBody: { + content: { + 'application/json': { + schema: StopV2ChatSchema + } + } + }, + responses: { + 200: { + description: '成功停止工作流', + content: { + 'application/json': { + schema: StopV2ChatResponseSchema + } + } + } + } + } + }, '/core/chat/presignChatFilePostUrl': { post: { summary: '获取文件上传 URL', diff --git a/packages/service/common/bullmq/index.ts b/packages/service/common/bullmq/index.ts index 2ee7df802a..72aff04562 100644 --- a/packages/service/common/bullmq/index.ts +++ b/packages/service/common/bullmq/index.ts @@ -60,7 +60,7 @@ export function getQueue( // default error handler, to avoid unhandled exceptions newQueue.on('error', (error) => { - addLog.error(`MQ Queue [${name}]: ${error.message}`, error); + addLog.error(`MQ Queue] error`, error); }); queues.set(name, newQueue); return newQueue; @@ -76,44 +76,59 @@ export function getWorker( return worker as Worker; } - const newWorker = new Worker(name.toString(), processor, { - connection: newWorkerRedisConnection(), - ...defaultWorkerOpts, - // BullMQ Worker important settings - lockDuration: 600000, // 10 minutes for large file operations - stalledInterval: 30000, // Check for stalled jobs every 30s - maxStalledCount: 3, // Move job to failed after 1 stall (default behavior) - ...opts - }); - // default error handler, to avoid unhandled exceptions - newWorker.on('error', async (error) => { - addLog.error(`MQ Worker error`, { - message: error.message, - data: { name } + const createWorker = () => { + const newWorker = new Worker(name.toString(), processor, { + connection: newWorkerRedisConnection(), + ...defaultWorkerOpts, + // BullMQ Worker important settings + lockDuration: 600000, // 10 minutes for large file operations + stalledInterval: 30000, // Check for stalled jobs every 30s + maxStalledCount: 3, // Move job to failed after 1 stall (default behavior) + ...opts }); - await newWorker.close(); - }); - // Critical: Worker has been closed - remove from pool - newWorker.on('closed', async () => { - addLog.error(`MQ Worker [${name}] closed unexpectedly`, { - data: { - name, - message: 'Worker will need to be manually restarted' + + // Worker is ready to process jobs (fired on initial connection and after reconnection) + newWorker.on('ready', () => { + addLog.info(`[MQ Worker] ready`, { name }); + }); + // default error handler, to avoid unhandled exceptions + newWorker.on('error', async (error) => { + addLog.error(`[MQ Worker] error`, { + message: error.message, + data: { name } + }); + }); + // Critical: Worker has been closed - remove from pool and restart + newWorker.on('closed', async () => { + addLog.warn(`[MQ Worker] closed, attempting restart...`); + + // Clean up: remove all listeners to prevent memory leaks + newWorker.removeAllListeners(); + + // Retry create new worker with infinite retries + while (true) { + try { + // Call getWorker to create a new worker (now workers.get(name) returns undefined) + const worker = createWorker(); + workers.set(name, worker); + addLog.info(`[MQ Worker] restarted successfully`); + break; + } catch (error) { + addLog.error(`[MQ Worker] failed to restart, retrying...`, error); + await delay(1000); + } } }); - try { + newWorker.on('paused', async () => { + addLog.warn(`[MQ Worker] paused`); await delay(1000); - workers.delete(name); - getWorker(name, processor, opts); - } catch (error) {} - }); + newWorker.resume(); + }); - newWorker.on('paused', async () => { - addLog.warn(`MQ Worker [${name}] paused`); - await delay(1000); - newWorker.resume(); - }); + return newWorker; + }; + const newWorker = createWorker(); workers.set(name, newWorker); return newWorker; } diff --git a/packages/service/common/mongo/init.ts b/packages/service/common/mongo/init.ts index cd5184b34b..f2dbd9cf78 100644 --- a/packages/service/common/mongo/init.ts +++ b/packages/service/common/mongo/init.ts @@ -31,26 +31,13 @@ export async function connectMongo(props: { db.set('strictQuery', 'throw'); db.connection.on('error', async (error) => { - console.log('mongo error', error); - try { - if (db.connection.readyState !== 0) { - RemoveListeners(); - await db.disconnect(); - await delay(1000); - await connectMongo(props); - } - } catch (error) {} + console.error('mongo error', error); + }); + db.connection.on('connected', async () => { + console.log('mongo connected'); }); db.connection.on('disconnected', async () => { - console.log('mongo disconnected'); - try { - if (db.connection.readyState !== 0) { - RemoveListeners(); - await db.disconnect(); - await delay(1000); - await connectMongo(props); - } - } catch (error) {} + console.error('mongo disconnected'); }); await db.connect(url, { @@ -64,9 +51,9 @@ export async function connectMongo(props: { maxIdleTimeMS: 300000, // 空闲连接超时: 5分钟,防止空闲连接长时间占用资源 retryWrites: true, // 重试写入: 重试写入失败的操作 retryReads: true, // 重试读取: 重试读取失败的操作 - serverSelectionTimeoutMS: 10000 // 服务器选择超时: 10秒,防止副本集故障时长时间阻塞 + serverSelectionTimeoutMS: 10000, // 服务器选择超时: 10秒,防止副本集故障时长时间阻塞 + heartbeatFrequencyMS: 5000 // 5s 进行一次健康检查 }); - console.log('mongo connected'); connectedCb?.(); diff --git a/packages/service/common/redis/index.ts b/packages/service/common/redis/index.ts index 701613d505..f9e0a2a7d3 100644 --- a/packages/service/common/redis/index.ts +++ b/packages/service/common/redis/index.ts @@ -19,9 +19,11 @@ const REDIS_BASE_OPTION = { // Reconnect on specific errors (Redis master-slave switch, network issues) reconnectOnError: (err: any) => { const reconnectErrors = ['READONLY', 'ECONNREFUSED', 'ETIMEDOUT', 'ECONNRESET']; - const shouldReconnect = reconnectErrors.some((errType) => err.message.includes(errType)); + const message = typeof err?.message === 'string' ? err.message : String(err ?? ''); + + const shouldReconnect = reconnectErrors.some((errType) => message.includes(errType)); if (shouldReconnect) { - addLog.warn(`Redis reconnecting due to error: ${err.message}`); + addLog.warn(`Redis reconnecting due to error: ${message}`); } return shouldReconnect; }, @@ -37,9 +39,6 @@ export const newQueueRedisConnection = () => { // Limit retries for queue operations maxRetriesPerRequest: 3 }); - redis.on('error', (error) => { - addLog.error('[Redis Queue connection error]', error); - }); return redis; }; @@ -49,9 +48,6 @@ export const newWorkerRedisConnection = () => { // BullMQ requires maxRetriesPerRequest: null for blocking operations maxRetriesPerRequest: null }); - redis.on('error', (error) => { - addLog.error('[Redis Worker connection error]', error); - }); return redis; }; @@ -65,11 +61,14 @@ export const getGlobalRedisConnection = () => { maxRetriesPerRequest: 3 }); + global.redisClient.on('connect', () => { + addLog.info('[Global Redis] connected'); + }); global.redisClient.on('error', (error) => { - addLog.error('[Redis Global connection error]', error); + addLog.error('[Global Redis] connection error', error); }); global.redisClient.on('close', () => { - addLog.warn('[Redis Global connection closed]'); + addLog.warn('[Global Redis] connection closed'); }); return global.redisClient; diff --git a/packages/service/common/s3/mq.ts b/packages/service/common/s3/mq.ts index c4712c7dce..f04d5f157b 100644 --- a/packages/service/common/s3/mq.ts +++ b/packages/service/common/s3/mq.ts @@ -40,7 +40,7 @@ export const addS3DelJob = async (data: S3MQJobData): Promise => { await queue.add('delete-s3-files', data, { jobId, ...jobOption }); }; -const prefixDel = async (bucket: S3BaseBucket, prefix: string) => { +export const prefixDel = async (bucket: S3BaseBucket, prefix: string) => { addLog.debug(`[S3 delete] delete prefix: ${prefix}`); let tasks: Promise[] = []; return new Promise(async (resolve, reject) => { @@ -103,7 +103,7 @@ export const startS3DelWorker = async () => { } }, { - concurrency: 3 + concurrency: 6 } ); }; diff --git a/packages/service/core/chat/chatSchema.ts b/packages/service/core/chat/chatSchema.ts index 6bb62cc744..6dd01c86b6 100644 --- a/packages/service/core/chat/chatSchema.ts +++ b/packages/service/core/chat/chatSchema.ts @@ -196,6 +196,7 @@ try { // timer, clear history ChatSchema.index({ updateTime: -1, teamId: 1 }); + ChatSchema.index({ teamId: 1, updateTime: -1 }); } catch (error) { console.log(error); } diff --git a/packages/service/core/workflow/dispatch/ai/chat.ts b/packages/service/core/workflow/dispatch/ai/chat.ts index 574e774616..9234578600 100644 --- a/packages/service/core/workflow/dispatch/ai/chat.ts +++ b/packages/service/core/workflow/dispatch/ai/chat.ts @@ -64,6 +64,7 @@ export type ChatResponse = DispatchNodeResultType< export const dispatchChatCompletion = async (props: ChatProps): Promise => { let { res, + checkIsStopping, requestOrigin, stream = false, retainDatasetCite = true, @@ -201,7 +202,7 @@ export const dispatchChatCompletion = async (props: ChatProps): Promise res?.closed, + isAborted: checkIsStopping, onReasoning({ text }) { if (!aiChatReasoning) return; workflowStreamResponse?.({ diff --git a/packages/service/core/workflow/dispatch/ai/tool/toolCall.ts b/packages/service/core/workflow/dispatch/ai/tool/toolCall.ts index 68129fa65a..807d09211c 100644 --- a/packages/service/core/workflow/dispatch/ai/tool/toolCall.ts +++ b/packages/service/core/workflow/dispatch/ai/tool/toolCall.ts @@ -18,6 +18,7 @@ export const runToolCall = async (props: DispatchToolModuleProps): Promise res?.closed, + isAborted: checkIsStopping, userKey: externalProvider.openaiAccount, onReasoning({ text }) { if (!aiChatReasoning) return; diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts index 88e865c9f2..b6be94f1c5 100644 --- a/packages/service/core/workflow/dispatch/index.ts +++ b/packages/service/core/workflow/dispatch/index.ts @@ -59,10 +59,11 @@ import { TeamErrEnum } from '@fastgpt/global/common/error/code/team'; import { i18nT } from '../../../../web/i18n/utils'; import { clone } from 'lodash'; import { validateFileUrlDomain } from '../../../common/security/fileUrlValidator'; +import { delAgentRuntimeStopSign, shouldWorkflowStop } from './workflowStatus'; type Props = Omit< ChatDispatchProps, - 'workflowDispatchDeep' | 'timezone' | 'externalProvider' | 'cloneVariables' + 'checkIsStopping' | 'workflowDispatchDeep' | 'timezone' | 'externalProvider' | 'cloneVariables' > & { runtimeNodes: RuntimeNodeItemType[]; runtimeEdges: RuntimeEdgeItemType[]; @@ -87,7 +88,17 @@ export async function dispatchWorkFlow({ concatUsage, ...data }: Props & WorkflowUsageProps): Promise { - const { res, stream, runningUserInfo, runningAppInfo, lastInteractive, histories, query } = data; + const { + res, + stream, + runningUserInfo, + runningAppInfo, + lastInteractive, + histories, + query, + chatId, + apiVersion + } = data; // Check url valid const invalidInput = query.some((item) => { @@ -101,6 +112,8 @@ export async function dispatchWorkFlow({ addLog.info('[Workflow run] Invalid file url'); return Promise.reject(new UserError('Invalid file url')); } + + /* Init function */ // Check point await checkTeamAIPoints(runningUserInfo.teamId); @@ -120,7 +133,22 @@ export async function dispatchWorkFlow({ }); } return usageId; - })() + })(), + // Add preview url to chat items + await addPreviewUrlToChatItems(histories, 'chatFlow'), + // Add preview url to query + ...query.map(async (item) => { + if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) return; + item.file.url = await getS3ChatSource().createGetChatFileURL({ + key: item.file.key, + external: true + }); + }), + // Remove stopping sign + delAgentRuntimeStopSign({ + appId: runningAppInfo.id, + chatId + }) ]); let streamCheckTimer: NodeJS.Timeout | null = null; @@ -152,16 +180,6 @@ export async function dispatchWorkFlow({ } } - // Add preview url to chat items - await addPreviewUrlToChatItems(histories, 'chatFlow'); - for (const item of query) { - if (item.type !== ChatItemValueTypeEnum.file || !item.file?.key) continue; - item.file.url = await getS3ChatSource().createGetChatFileURL({ - key: item.file.key, - external: true - }); - } - // Get default variables const cloneVariables = clone(data.variables); const defaultVariables = { @@ -173,12 +191,34 @@ export async function dispatchWorkFlow({ timezone })) }; - + // MCP let mcpClientMemory = {} as Record; + // Stop sign(没有 apiVersion,说明不会有暂停) + let stopping = false; + const checkIsStopping = (): boolean => { + if (apiVersion === 'v2') { + return stopping; + } + if (apiVersion === 'v1') { + if (!res) return false; + return res.closed || !!res.errored; + } + return false; + }; + const checkStoppingTimer = + apiVersion === 'v2' + ? setInterval(async () => { + stopping = await shouldWorkflowStop({ + appId: runningAppInfo.id, + chatId + }); + }, 100) + : undefined; // Init some props return runWorkflow({ ...data, + checkIsStopping, query, histories, timezone, @@ -189,15 +229,24 @@ export async function dispatchWorkFlow({ concatUsage, mcpClientMemory, cloneVariables - }).finally(() => { + }).finally(async () => { if (streamCheckTimer) { clearInterval(streamCheckTimer); } + if (checkStoppingTimer) { + clearInterval(checkStoppingTimer); + } // Close mcpClient connections Object.values(mcpClientMemory).forEach((client) => { client.closeConnection(); }); + + // 工作流完成后删除 Redis 记录 + await delAgentRuntimeStopSign({ + appId: runningAppInfo.id, + chatId + }); }); } @@ -210,14 +259,14 @@ type RunWorkflowProps = ChatDispatchProps & { }; export const runWorkflow = async (data: RunWorkflowProps): Promise => { let { - res, + apiVersion, + checkIsStopping, runtimeNodes = [], runtimeEdges = [], histories = [], variables = {}, externalProvider, retainDatasetCite = true, - version = 'v1', responseDetail = true, responseAllData = true, usageId, @@ -328,10 +377,6 @@ export const runWorkflow = async (data: RunWorkflowProps): Promise { + return `${WORKFLOW_STATUS_PREFIX}:${params.appId}:${params.chatId}`; +}; + +// 暂停任务 +export const setAgentRuntimeStop = async (params: WorkflowStatusParams): Promise => { + const redis = getGlobalRedisConnection(); + const key = getRuntimeStatusKey(params); + await redis.set(key, 1, 'EX', TTL); +}; + +// 删除任务状态 +export const delAgentRuntimeStopSign = async (params: WorkflowStatusParams): Promise => { + const redis = getGlobalRedisConnection(); + const key = getRuntimeStatusKey(params); + await redis.del(key).catch((err) => { + addLog.error(`[Agent Runtime Stop] Delete stop sign error`, err); + }); +}; + +// 检查工作流是否应该停止 +export const shouldWorkflowStop = (params: WorkflowStatusParams): Promise => { + const redis = getGlobalRedisConnection(); + const key = getRuntimeStatusKey(params); + return redis + .get(key) + .then((res) => !!res) + .catch(() => false); +}; + +/** + * 等待工作流完成(记录被删除) + * @param params 工作流参数 + * @param timeout 超时时间(毫秒),默认5秒 + * @param pollInterval 轮询间隔(毫秒),默认50毫秒 + * @returns true=正常完成, false=超时 + */ +export const waitForWorkflowComplete = async ({ + appId, + chatId, + timeout = 5000, + pollInterval = 50 +}: { + appId: string; + chatId: string; + timeout?: number; + pollInterval?: number; +}) => { + const startTime = Date.now(); + + while (Date.now() - startTime < timeout) { + const sign = await shouldWorkflowStop({ appId, chatId }); + + // 如果没有暂停中的标志,则认为已经完成任务了。 + if (!sign) { + return; + } + + // 等待下一次轮询 + await delay(pollInterval); + } + + return; +}; diff --git a/packages/web/i18n/zh-CN/chat.json b/packages/web/i18n/zh-CN/chat.json index d2ea791445..91366610d4 100644 --- a/packages/web/i18n/zh-CN/chat.json +++ b/packages/web/i18n/zh-CN/chat.json @@ -17,7 +17,7 @@ "clear_input_value": "清空输入", "click_contextual_preview": "点击查看上下文预览", "click_to_add_url": "输入文件链接", - "completion_finish_close": "连接断开", + "completion_finish_close": "请求关闭", "completion_finish_content_filter": "触发安全风控", "completion_finish_function_call": "函数调用", "completion_finish_length": "超出回复限制", diff --git a/projects/app/src/components/core/chat/ChatContainer/ChatBox/Input/ChatInput.tsx b/projects/app/src/components/core/chat/ChatContainer/ChatBox/Input/ChatInput.tsx index cea60a59e9..3d56aceda6 100644 --- a/projects/app/src/components/core/chat/ChatContainer/ChatBox/Input/ChatInput.tsx +++ b/projects/app/src/components/core/chat/ChatContainer/ChatBox/Input/ChatInput.tsx @@ -19,6 +19,8 @@ import { useFileUpload } from '../hooks/useFileUpload'; import ComplianceTip from '@/components/common/ComplianceTip/index'; import { useToast } from '@fastgpt/web/hooks/useToast'; import VoiceInput, { type VoiceInputComponentRef } from './VoiceInput'; +import MyBox from '@fastgpt/web/components/common/MyBox'; +import { postStopV2Chat } from '@/web/core/chat/api'; const InputGuideBox = dynamic(() => import('./InputGuideBox')); @@ -124,6 +126,19 @@ const ChatInput = ({ }, [TextareaDom, canSendMessage, fileList, onSendMessage, replaceFiles] ); + const { runAsync: handleStop, loading: isStopping } = useRequest2(async () => { + try { + if (isChatting) { + await postStopV2Chat({ + appId, + chatId, + outLinkAuthData + }).catch(); + } + } finally { + onStop(); + } + }); const RenderTextarea = useMemo( () => ( @@ -329,7 +344,9 @@ const ChatInput = ({ {/* Send Button Container */} - { e.stopPropagation(); if (isChatting) { - return onStop(); + return handleStop(); } return handleSend(); }} @@ -355,7 +372,7 @@ const ChatInput = ({ )} - + @@ -370,12 +387,13 @@ const ChatInput = ({ whisperConfig?.open, inputValue, t, + isStopping, isChatting, canSendMessage, onOpenSelectFile, onSelectFile, handleSend, - onStop + handleStop ]); const activeStyles: FlexProps = { diff --git a/projects/app/src/components/core/chat/ChatContainer/ChatBox/index.tsx b/projects/app/src/components/core/chat/ChatContainer/ChatBox/index.tsx index 9db60858e5..7f12ed15f6 100644 --- a/projects/app/src/components/core/chat/ChatContainer/ChatBox/index.tsx +++ b/projects/app/src/components/core/chat/ChatContainer/ChatBox/index.tsx @@ -432,10 +432,10 @@ const ChatBox = ({ }, [questionGuide, appId, chatId, outLinkAuthData, scrollToBottom]); /* Abort chat completions, questionGuide */ - const abortRequest = useMemoizedFn((signal: string = 'stop') => { - chatController.current?.abort(signal); - questionGuideController.current?.abort(signal); - pluginController.current?.abort(signal); + const abortRequest = useMemoizedFn((reason: string = 'stop') => { + chatController.current?.abort(new Error(reason)); + questionGuideController.current?.abort(new Error(reason)); + pluginController.current?.abort(new Error(reason)); }); /** @@ -463,8 +463,7 @@ const ChatBox = ({ } // Abort the previous request - abortRequest(); - questionGuideController.current?.abort('stop'); + questionGuideController.current?.abort(new Error('stop')); text = text.trim(); @@ -605,16 +604,18 @@ const ChatBox = ({ newChatHistories = state.map((item, index) => { if (index !== state.length - 1) return item; - // Check node response error const responseData = mergeChatResponseData(item.responseData || []); - const err = - responseData[responseData.length - 1]?.error || - responseData[responseData.length - 1]?.errorText; - if (err) { - toast({ - title: t(getErrText(err)), - status: 'warning' - }); + // Check node response error + if (!abortSignal?.signal?.aborted) { + const err = + responseData[responseData.length - 1]?.error || + responseData[responseData.length - 1]?.errorText; + if (err) { + toast({ + title: t(getErrText(err)), + status: 'warning' + }); + } } return { @@ -1184,7 +1185,7 @@ const ChatBox = ({ ) : ( chatController.current?.abort('stop')} + onStop={() => abortRequest('stop')} TextareaDom={TextareaDom} resetInputVal={resetInputVal} chatForm={chatForm} @@ -1206,7 +1207,7 @@ const ChatBox = ({ chatController.current?.abort('stop')} + onStop={() => abortRequest('stop')} TextareaDom={TextareaDom} resetInputVal={resetInputVal} chatForm={chatForm} diff --git a/projects/app/src/pages/api/core/chat/chatTest.ts b/projects/app/src/pages/api/core/chat/chatTest.ts index 4d992f2ad8..8c934d3970 100644 --- a/projects/app/src/pages/api/core/chat/chatTest.ts +++ b/projects/app/src/pages/api/core/chat/chatTest.ts @@ -181,6 +181,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { /* start process */ const { flowResponses, assistantResponses, system_memories, newVariables, durationSeconds } = await dispatchWorkFlow({ + apiVersion: 'v2', res, lang: getLocale(req), requestOrigin: req.headers.origin, @@ -209,7 +210,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { stream: true, maxRunTimes: WORKFLOW_MAX_RUN_TIMES, workflowStreamResponse: workflowResponseWrite, - version: 'v2', responseDetail: true }); diff --git a/projects/app/src/pages/api/core/workflow/debug.ts b/projects/app/src/pages/api/core/workflow/debug.ts index 7485966f84..28e07da203 100644 --- a/projects/app/src/pages/api/core/workflow/debug.ts +++ b/projects/app/src/pages/api/core/workflow/debug.ts @@ -11,7 +11,7 @@ import { WORKFLOW_MAX_RUN_TIMES } from '@fastgpt/service/core/workflow/constants import { getLastInteractiveValue } from '@fastgpt/global/core/workflow/runtime/utils'; import { getLocale } from '@fastgpt/service/common/middle/i18n'; import { createChatUsageRecord } from '@fastgpt/service/support/wallet/usage/controller'; -import { clone } from 'lodash'; +import { getNanoid } from '@fastgpt/global/common/string/tools'; async function handler( req: NextApiRequest, @@ -73,6 +73,7 @@ async function handler( tmbId: app.tmbId }, runningUserInfo: await getRunningUserInfoByTmbId(tmbId), + chatId: getNanoid(), runtimeNodes: nodes, runtimeEdges: edges, defaultSkipNodeQueue: skipNodeQueue, diff --git a/projects/app/src/pages/api/v1/chat/completions.ts b/projects/app/src/pages/api/v1/chat/completions.ts index 004e5f60a2..3e3c2d3613 100644 --- a/projects/app/src/pages/api/v1/chat/completions.ts +++ b/projects/app/src/pages/api/v1/chat/completions.ts @@ -278,6 +278,8 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { showNodeStatus }); + const saveChatId = chatId || getNanoid(24); + /* start flow controller */ const { flowResponses, @@ -289,6 +291,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { } = await (async () => { if (app.version === 'v2') { return dispatchWorkFlow({ + apiVersion: 'v1', res, lang: getLocale(req), requestOrigin: req.headers.origin, @@ -304,7 +307,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { runningUserInfo: await getRunningUserInfoByTmbId(tmbId), uid: String(outLinkUserId || tmbId), - chatId, + chatId: saveChatId, responseChatItemId, runtimeNodes, runtimeEdges: storeEdges2RuntimeEdges(edges, interactive), @@ -351,7 +354,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { memories: system_memories }; - const saveChatId = chatId || getNanoid(24); const params: SaveChatProps = { chatId: saveChatId, appId: app._id, diff --git a/projects/app/src/pages/api/v2/chat/completions.ts b/projects/app/src/pages/api/v2/chat/completions.ts index 26fef8dca9..14c442dd7d 100644 --- a/projects/app/src/pages/api/v2/chat/completions.ts +++ b/projects/app/src/pages/api/v2/chat/completions.ts @@ -278,6 +278,8 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { showNodeStatus }); + const saveChatId = chatId || getNanoid(24); + /* start flow controller */ const { flowResponses, @@ -289,6 +291,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { } = await (async () => { if (app.version === 'v2') { return dispatchWorkFlow({ + apiVersion: 'v2', res, lang: getLocale(req), requestOrigin: req.headers.origin, @@ -304,7 +307,7 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { runningUserInfo: await getRunningUserInfoByTmbId(tmbId), uid: String(outLinkUserId || tmbId), - chatId, + chatId: saveChatId, responseChatItemId, runtimeNodes, runtimeEdges: storeEdges2RuntimeEdges(edges, interactive), @@ -317,7 +320,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { retainDatasetCite, maxRunTimes: WORKFLOW_MAX_RUN_TIMES, workflowStreamResponse: workflowResponseWrite, - version: 'v2', responseAllData, responseDetail }); @@ -354,7 +356,6 @@ async function handler(req: NextApiRequest, res: NextApiResponse) { memories: system_memories }; - const saveChatId = chatId || getNanoid(24); const params: SaveChatProps = { chatId: saveChatId, appId: app._id, diff --git a/projects/app/src/pages/api/v2/chat/stop.ts b/projects/app/src/pages/api/v2/chat/stop.ts new file mode 100644 index 0000000000..e027272f09 --- /dev/null +++ b/projects/app/src/pages/api/v2/chat/stop.ts @@ -0,0 +1,36 @@ +import type { NextApiRequest, NextApiResponse } from 'next'; +import { NextAPI } from '@/service/middleware/entry'; +import { authChatCrud } from '@/service/support/permission/auth/chat'; +import { + setAgentRuntimeStop, + waitForWorkflowComplete +} from '@fastgpt/service/core/workflow/dispatch/workflowStatus'; +import { StopV2ChatSchema, type StopV2ChatResponse } from '@fastgpt/global/openapi/core/chat/api'; + +async function handler(req: NextApiRequest, res: NextApiResponse): Promise { + const { appId, chatId, outLinkAuthData } = StopV2ChatSchema.parse(req.body); + + await authChatCrud({ + req, + authToken: true, + authApiKey: true, + appId, + chatId, + ...outLinkAuthData + }); + + // 设置停止状态 + await setAgentRuntimeStop({ + appId, + chatId + }); + + // 等待工作流完成 (最多等待 5 秒) + await waitForWorkflowComplete({ appId, chatId, timeout: 5000 }); + + return { + success: true + }; +} + +export default NextAPI(handler); diff --git a/projects/app/src/web/core/chat/api.ts b/projects/app/src/web/core/chat/api.ts index 4a7a482481..551e2106c9 100644 --- a/projects/app/src/web/core/chat/api.ts +++ b/projects/app/src/web/core/chat/api.ts @@ -24,6 +24,7 @@ import type { UpdateFavouriteAppParamsType } from '@fastgpt/global/openapi/core/chat/favourite/api'; import type { ChatFavouriteAppType } from '@fastgpt/global/core/chat/favouriteApp/type'; +import type { StopV2ChatParams } from '@fastgpt/global/openapi/core/chat/api'; /** * 获取初始化聊天内容 @@ -76,3 +77,6 @@ export const updateFavouriteAppTags = (data: { id: string; tags: string[] }[]) = export const deleteFavouriteApp = (data: { id: string }) => DELETE('/proApi/core/chat/setting/favourite/delete', data); + +/* Chat controller */ +export const postStopV2Chat = (data: StopV2ChatParams) => POST('/v2/chat/stop', data); diff --git a/projects/app/test/api/core/chat/feedback/closeCustom.test.ts b/projects/app/test/api/core/chat/feedback/closeCustom.test.ts index a8a8d32045..b730c57302 100644 --- a/projects/app/test/api/core/chat/feedback/closeCustom.test.ts +++ b/projects/app/test/api/core/chat/feedback/closeCustom.test.ts @@ -13,7 +13,7 @@ import { getUser } from '@test/datas/users'; import { Call } from '@test/utils/request'; import { describe, expect, it, beforeEach } from 'vitest'; -describe.sequential('closeCustom api test', () => { +describe('closeCustom api test', () => { let testUser: Awaited>; let appId: string; let chatId: string; diff --git a/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts b/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts index ddebacf9b9..ceac2d2064 100644 --- a/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts +++ b/projects/app/test/api/core/chat/feedback/updateFeedbackReadStatus.test.ts @@ -13,7 +13,7 @@ import { getUser } from '@test/datas/users'; import { Call } from '@test/utils/request'; import { describe, expect, it, beforeEach } from 'vitest'; -describe.sequential('updateFeedbackReadStatus api test', () => { +describe('updateFeedbackReadStatus api test', () => { let testUser: Awaited>; let appId: string; let chatId: string; diff --git a/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts b/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts index 8163db5300..0b8cf98ad8 100644 --- a/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts +++ b/projects/app/test/api/core/chat/feedback/updateUserFeedback.test.ts @@ -14,7 +14,7 @@ import { getUser } from '@test/datas/users'; import { Call } from '@test/utils/request'; import { describe, expect, it, beforeEach } from 'vitest'; -describe.sequential('updateUserFeedback api test', () => { +describe('updateUserFeedback api test', () => { let testUser: Awaited>; let appId: string; let chatId: string; diff --git a/projects/marketplace/src/service/mongo/index.ts b/projects/marketplace/src/service/mongo/index.ts index 1cb3f6e365..48fe0e566f 100644 --- a/projects/marketplace/src/service/mongo/index.ts +++ b/projects/marketplace/src/service/mongo/index.ts @@ -3,6 +3,7 @@ import type { Model, Schema } from 'mongoose'; import { Mongoose } from 'mongoose'; export const MONGO_URL = process.env.MONGODB_URI ?? ''; +const maxConnecting = Math.max(30, Number(process.env.DB_MAX_LINK || 20)); declare global { var mongodb: Mongoose | undefined; @@ -52,49 +53,30 @@ export async function connectMongo(db: Mongoose, url: string): Promise db.connection.removeAllListeners('disconnected'); db.set('strictQuery', 'throw'); - db.connection.on('error', async (error: any) => { - addLog.error('mongo error', error); - try { - if (db.connection.readyState !== 0) { - await db.disconnect(); - await delay(1000); - await connectMongo(db, url); - } - } catch (_error) { - addLog.error('Error during reconnection:', _error); - } + db.connection.on('error', async (error) => { + console.error('mongo error', error); + }); + db.connection.on('connected', async () => { + console.log('mongo connected'); }); - db.connection.on('disconnected', async () => { - addLog.warn('mongo disconnected'); - try { - if (db.connection.readyState !== 0) { - await db.disconnect(); - await delay(1000); - await connectMongo(db, url); - } - } catch (_error) { - addLog.error('Error during reconnection:', _error); - } + console.error('mongo disconnected'); }); - const options = { + await db.connect(url, { bufferCommands: true, - maxPoolSize: Math.max(30, Number(process.env.MONGO_MAX_LINK || 20)), - minPoolSize: 20, - connectTimeoutMS: 60000, - waitQueueTimeoutMS: 60000, - socketTimeoutMS: 60000, - maxIdleTimeMS: 300000, - retryWrites: true, - retryReads: true, - serverSelectionTimeoutMS: 60000, - heartbeatFrequencyMS: 20000, - maxStalenessSeconds: 120 - }; - - await db.connect(url, options); - addLog.info('mongo connected'); + maxConnecting: maxConnecting, // 最大连接数: 防止连接数过多时无法满足需求 + maxPoolSize: maxConnecting, // 最大连接池大小: 防止连接池过大时无法满足需求 + minPoolSize: 20, // 最小连接数: 20,防止连接数过少时无法满足需求 + connectTimeoutMS: 60000, // 连接超时: 60秒,防止连接失败时长时间阻塞 + waitQueueTimeoutMS: 60000, // 等待队列超时: 60秒,防止等待队列长时间阻塞 + socketTimeoutMS: 60000, // Socket 超时: 60秒,防止Socket连接失败时长时间阻塞 + maxIdleTimeMS: 300000, // 空闲连接超时: 5分钟,防止空闲连接长时间占用资源 + retryWrites: true, // 重试写入: 重试写入失败的操作 + retryReads: true, // 重试读取: 重试读取失败的操作 + serverSelectionTimeoutMS: 10000, // 服务器选择超时: 10秒,防止副本集故障时长时间阻塞 + heartbeatFrequencyMS: 5000 // 5s 进行一次健康检查 + }); return db; } catch (error) { addLog.error('Mongo connect error', error); diff --git a/test/cases/service/core/app/workflow/workflowStatus.test.ts b/test/cases/service/core/app/workflow/workflowStatus.test.ts new file mode 100644 index 0000000000..4565379804 --- /dev/null +++ b/test/cases/service/core/app/workflow/workflowStatus.test.ts @@ -0,0 +1,117 @@ +import { describe, test, expect, beforeEach } from 'vitest'; +import { + setAgentRuntimeStop, + delAgentRuntimeStopSign, + shouldWorkflowStop, + waitForWorkflowComplete +} from '@fastgpt/service/core/workflow/dispatch/workflowStatus'; + +describe('Workflow Status Redis Functions', () => { + const testAppId = 'test_app_123'; + const testChatId = 'test_chat_456'; + + beforeEach(async () => { + // 清理测试数据 + await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId }); + }); + + test('should set stopping sign', async () => { + await setAgentRuntimeStop({ + appId: testAppId, + chatId: testChatId + }); + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); + + test('should return false for non-existent status', async () => { + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should detect stopping status', async () => { + await setAgentRuntimeStop({ + appId: testAppId, + chatId: testChatId + }); + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); + + test('should return false after deleting stop sign', async () => { + await setAgentRuntimeStop({ + appId: testAppId, + chatId: testChatId + }); + await delAgentRuntimeStopSign({ + appId: testAppId, + chatId: testChatId + }); + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should wait for workflow completion', async () => { + // 设置初始停止标志 + await setAgentRuntimeStop({ + appId: testAppId, + chatId: testChatId + }); + + // 模拟异步完成(删除停止标志) + setTimeout(async () => { + await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId }); + }, 500); + + // 等待完成,waitForWorkflowComplete 现在是 void 返回 + await waitForWorkflowComplete({ + appId: testAppId, + chatId: testChatId, + timeout: 2000 + }); + + // 验证停止标志已被删除 + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should timeout when waiting too long', async () => { + await setAgentRuntimeStop({ + appId: testAppId, + chatId: testChatId + }); + + // 等待超时(不删除标志) + await waitForWorkflowComplete({ + appId: testAppId, + chatId: testChatId, + timeout: 100 + }); + + // 验证停止标志仍然存在 + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); + + test('should delete workflow stop sign', async () => { + await setAgentRuntimeStop({ + appId: testAppId, + chatId: testChatId + }); + await delAgentRuntimeStopSign({ appId: testAppId, chatId: testChatId }); + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(false); + }); + + test('should handle concurrent stop sign operations', async () => { + // 并发设置停止标志 + await Promise.all([ + setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }), + setAgentRuntimeStop({ appId: testAppId, chatId: testChatId }) + ]); + + // 停止标志应该存在 + const shouldStop = await shouldWorkflowStop({ appId: testAppId, chatId: testChatId }); + expect(shouldStop).toBe(true); + }); +}); diff --git a/test/mocks/common/redis.ts b/test/mocks/common/redis.ts index c4d94ec9b0..b05465f735 100644 --- a/test/mocks/common/redis.ts +++ b/test/mocks/common/redis.ts @@ -1,73 +1,233 @@ import { vi } from 'vitest'; +// In-memory storage for mock Redis +const createRedisStorage = () => { + const storage = new Map(); + const expiryMap = new Map(); + + // Check and remove expired keys + const isExpired = (key: string): boolean => { + const expiry = expiryMap.get(key); + if (expiry && expiry < Date.now()) { + storage.delete(key); + expiryMap.delete(key); + return true; + } + return false; + }; + + return { + get: (key: string) => { + if (isExpired(key)) return null; + return storage.get(key) ?? null; + }, + set: (key: string, value: any, exMode?: string, exValue?: number) => { + storage.set(key, value); + // Handle EX (seconds) and PX (milliseconds) options + if (exMode === 'EX' && typeof exValue === 'number') { + expiryMap.set(key, Date.now() + exValue * 1000); + } else if (exMode === 'PX' && typeof exValue === 'number') { + expiryMap.set(key, Date.now() + exValue); + } + return 'OK'; + }, + del: (...keys: string[]) => { + let deletedCount = 0; + keys.forEach((key) => { + if (storage.has(key)) { + storage.delete(key); + expiryMap.delete(key); + deletedCount++; + } + }); + return deletedCount; + }, + exists: (...keys: string[]) => { + let count = 0; + keys.forEach((key) => { + if (!isExpired(key) && storage.has(key)) count++; + }); + return count; + }, + clear: () => { + storage.clear(); + expiryMap.clear(); + } + }; +}; + // Create a comprehensive mock Redis client factory -const createMockRedisClient = () => ({ - // Connection methods - on: vi.fn().mockReturnThis(), - connect: vi.fn().mockResolvedValue(undefined), - disconnect: vi.fn().mockResolvedValue(undefined), - quit: vi.fn().mockResolvedValue('OK'), - duplicate: vi.fn(function (this: any) { - return createMockRedisClient(); - }), +const createMockRedisClient = () => { + const redisStorage = createRedisStorage(); - // Key-value operations - get: vi.fn().mockResolvedValue(null), - set: vi.fn().mockResolvedValue('OK'), - del: vi.fn().mockResolvedValue(1), - exists: vi.fn().mockResolvedValue(0), - keys: vi.fn().mockResolvedValue([]), - scan: vi.fn().mockImplementation((cursor) => { - // 模拟多次迭代的场景 - if (cursor === '0') return ['100', ['key1', 'key2']]; - if (cursor === '100') return ['0', ['key3']]; - return ['0', []]; - }), + return { + // Connection methods + on: vi.fn().mockReturnThis(), + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + quit: vi.fn().mockResolvedValue('OK'), + duplicate: vi.fn(function (this: any) { + return createMockRedisClient(); + }), - // Hash operations - hget: vi.fn().mockResolvedValue(null), - hset: vi.fn().mockResolvedValue(1), - hdel: vi.fn().mockResolvedValue(1), - hgetall: vi.fn().mockResolvedValue({}), - hmset: vi.fn().mockResolvedValue('OK'), + // Key-value operations with actual storage + get: vi.fn().mockImplementation((key: string) => Promise.resolve(redisStorage.get(key))), + set: vi + .fn() + .mockImplementation((key: string, value: any, exMode?: string, exValue?: number) => + Promise.resolve(redisStorage.set(key, value, exMode, exValue)) + ), + del: vi + .fn() + .mockImplementation((...keys: string[]) => Promise.resolve(redisStorage.del(...keys))), + exists: vi + .fn() + .mockImplementation((...keys: string[]) => Promise.resolve(redisStorage.exists(...keys))), + keys: vi.fn().mockResolvedValue([]), + scan: vi.fn().mockImplementation((cursor) => { + // 模拟多次迭代的场景 + if (cursor === '0') return ['100', ['key1', 'key2']]; + if (cursor === '100') return ['0', ['key3']]; + return ['0', []]; + }), - // Expiry operations - expire: vi.fn().mockResolvedValue(1), - ttl: vi.fn().mockResolvedValue(-1), - expireat: vi.fn().mockResolvedValue(1), + // Hash operations + hget: vi.fn().mockResolvedValue(null), + hset: vi.fn().mockResolvedValue(1), + hdel: vi.fn().mockResolvedValue(1), + hgetall: vi.fn().mockResolvedValue({}), + hmset: vi.fn().mockResolvedValue('OK'), - // Increment operations - incr: vi.fn().mockResolvedValue(1), - decr: vi.fn().mockResolvedValue(1), - incrby: vi.fn().mockResolvedValue(1), - decrby: vi.fn().mockResolvedValue(1), - incrbyfloat: vi.fn().mockResolvedValue(1), + // Expiry operations + expire: vi.fn().mockResolvedValue(1), + ttl: vi.fn().mockResolvedValue(-1), + expireat: vi.fn().mockResolvedValue(1), - // Server commands - info: vi.fn().mockResolvedValue(''), - ping: vi.fn().mockResolvedValue('PONG'), - flushdb: vi.fn().mockResolvedValue('OK'), + // Increment operations + incr: vi.fn().mockResolvedValue(1), + decr: vi.fn().mockResolvedValue(1), + incrby: vi.fn().mockResolvedValue(1), + decrby: vi.fn().mockResolvedValue(1), + incrbyfloat: vi.fn().mockResolvedValue(1), - // List operations - lpush: vi.fn().mockResolvedValue(1), - rpush: vi.fn().mockResolvedValue(1), - lpop: vi.fn().mockResolvedValue(null), - rpop: vi.fn().mockResolvedValue(null), - llen: vi.fn().mockResolvedValue(0), + // Server commands + info: vi.fn().mockResolvedValue(''), + ping: vi.fn().mockResolvedValue('PONG'), + flushdb: vi.fn().mockResolvedValue('OK'), - // Set operations - sadd: vi.fn().mockResolvedValue(1), - srem: vi.fn().mockResolvedValue(1), - smembers: vi.fn().mockResolvedValue([]), - sismember: vi.fn().mockResolvedValue(0), + // List operations + lpush: vi.fn().mockResolvedValue(1), + rpush: vi.fn().mockResolvedValue(1), + lpop: vi.fn().mockResolvedValue(null), + rpop: vi.fn().mockResolvedValue(null), + llen: vi.fn().mockResolvedValue(0), - // pipeline - pipeline: vi.fn(() => ({ - del: vi.fn().mockReturnThis(), - unlink: vi.fn().mockReturnThis(), - exec: vi.fn().mockResolvedValue([]) - })) -}); + // Set operations + sadd: vi.fn().mockResolvedValue(1), + srem: vi.fn().mockResolvedValue(1), + smembers: vi.fn().mockResolvedValue([]), + sismember: vi.fn().mockResolvedValue(0), + + // pipeline + pipeline: vi.fn(() => ({ + del: vi.fn().mockReturnThis(), + unlink: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]) + })), + + // Internal storage for testing purposes + _storage: redisStorage + }; +}; + +// Shared global Redis storage for all mock clients +const globalRedisStorage = createRedisStorage(); + +// Create mock client with shared storage +const createSharedMockRedisClient = () => { + return { + // Connection methods + on: vi.fn().mockReturnThis(), + connect: vi.fn().mockResolvedValue(undefined), + disconnect: vi.fn().mockResolvedValue(undefined), + quit: vi.fn().mockResolvedValue('OK'), + duplicate: vi.fn(function (this: any) { + return createSharedMockRedisClient(); + }), + + // Key-value operations with shared storage + get: vi.fn().mockImplementation((key: string) => Promise.resolve(globalRedisStorage.get(key))), + set: vi + .fn() + .mockImplementation((key: string, value: any, exMode?: string, exValue?: number) => + Promise.resolve(globalRedisStorage.set(key, value, exMode, exValue)) + ), + del: vi + .fn() + .mockImplementation((...keys: string[]) => Promise.resolve(globalRedisStorage.del(...keys))), + exists: vi + .fn() + .mockImplementation((...keys: string[]) => + Promise.resolve(globalRedisStorage.exists(...keys)) + ), + keys: vi.fn().mockResolvedValue([]), + scan: vi.fn().mockImplementation((cursor) => { + if (cursor === '0') return ['100', ['key1', 'key2']]; + if (cursor === '100') return ['0', ['key3']]; + return ['0', []]; + }), + + // Hash operations + hget: vi.fn().mockResolvedValue(null), + hset: vi.fn().mockResolvedValue(1), + hdel: vi.fn().mockResolvedValue(1), + hgetall: vi.fn().mockResolvedValue({}), + hmset: vi.fn().mockResolvedValue('OK'), + + // Expiry operations + expire: vi.fn().mockResolvedValue(1), + ttl: vi.fn().mockResolvedValue(-1), + expireat: vi.fn().mockResolvedValue(1), + + // Increment operations + incr: vi.fn().mockResolvedValue(1), + decr: vi.fn().mockResolvedValue(1), + incrby: vi.fn().mockResolvedValue(1), + decrby: vi.fn().mockResolvedValue(1), + incrbyfloat: vi.fn().mockResolvedValue(1), + + // Server commands + info: vi.fn().mockResolvedValue(''), + ping: vi.fn().mockResolvedValue('PONG'), + flushdb: vi.fn().mockImplementation(() => { + globalRedisStorage.clear(); + return Promise.resolve('OK'); + }), + + // List operations + lpush: vi.fn().mockResolvedValue(1), + rpush: vi.fn().mockResolvedValue(1), + lpop: vi.fn().mockResolvedValue(null), + rpop: vi.fn().mockResolvedValue(null), + llen: vi.fn().mockResolvedValue(0), + + // Set operations + sadd: vi.fn().mockResolvedValue(1), + srem: vi.fn().mockResolvedValue(1), + smembers: vi.fn().mockResolvedValue([]), + sismember: vi.fn().mockResolvedValue(0), + + // pipeline + pipeline: vi.fn(() => ({ + del: vi.fn().mockReturnThis(), + unlink: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]) + })), + + // Internal storage for testing purposes + _storage: globalRedisStorage + }; +}; // Mock Redis connections to prevent connection errors in tests vi.mock('@fastgpt/service/common/redis', async (importOriginal) => { @@ -75,20 +235,20 @@ vi.mock('@fastgpt/service/common/redis', async (importOriginal) => { return { ...actual, - newQueueRedisConnection: vi.fn(createMockRedisClient), - newWorkerRedisConnection: vi.fn(createMockRedisClient), + newQueueRedisConnection: vi.fn(createSharedMockRedisClient), + newWorkerRedisConnection: vi.fn(createSharedMockRedisClient), getGlobalRedisConnection: vi.fn(() => { if (!global.mockRedisClient) { - global.mockRedisClient = createMockRedisClient(); + global.mockRedisClient = createSharedMockRedisClient(); } return global.mockRedisClient; }), - initRedisClient: vi.fn().mockResolvedValue(createMockRedisClient()) + initRedisClient: vi.fn().mockResolvedValue(createSharedMockRedisClient()) }; }); // Initialize global.redisClient with mock before any module imports it // This prevents getGlobalRedisConnection() from creating a real Redis client if (!global.redisClient) { - global.redisClient = createMockRedisClient() as any; + global.redisClient = createSharedMockRedisClient() as any; } diff --git a/test/setup.ts b/test/setup.ts index 3b22055d11..e13bb879b8 100644 --- a/test/setup.ts +++ b/test/setup.ts @@ -49,8 +49,6 @@ beforeEach(async () => { onTestFinished(async () => { clean(); - // Wait for any ongoing transactions and operations to complete - await delay(500); // Ensure all sessions are closed before dropping database try { @@ -62,9 +60,6 @@ beforeEach(async () => { // Ignore errors during cleanup console.warn('Error during test cleanup:', error); } - - // Additional delay to prevent lock contention between tests - await delay(100); }); }); diff --git a/vitest.config.mts b/vitest.config.mts index 394609f668..d17f9476b4 100644 --- a/vitest.config.mts +++ b/vitest.config.mts @@ -20,8 +20,10 @@ export default defineConfig({ outputFile: 'test-results.json', setupFiles: 'test/setup.ts', globalSetup: 'test/globalSetup.ts', - // fileParallelism: false, - maxConcurrency: 5, + // File-level execution: serial (one file at a time to avoid MongoDB conflicts) + fileParallelism: false, + // Test-level execution within a file: parallel (up to 5 concurrent tests) + maxConcurrency: 10, pool: 'threads', include: [ 'test/test.ts', @@ -31,6 +33,7 @@ export default defineConfig({ 'projects/marketplace/test/**/*.test.ts' ], testTimeout: 20000, + hookTimeout: 30000, reporters: ['github-actions', 'default'] } });