diff --git a/.claude/design/outlink/wechat-polling-refactor.md b/.claude/design/outlink/wechat-polling-refactor.md new file mode 100644 index 0000000000..63421b4b91 --- /dev/null +++ b/.claude/design/outlink/wechat-polling-refactor.md @@ -0,0 +1,503 @@ +# 微信机器人轮询链路改造方案 + +## 背景 + +线上现象:微信机器人有时消息要 10+ 分钟才回复。 + +## 根因 + +`packages/service/support/outLink/wechat/mq.ts` 当前实现把"拉取消息"和"调用 agent 回复"放在**同一个 BullMQ job 里串行执行**,而且续链 `scheduleNextPoll` 放在回复发送之后: + +```ts +// 当前流程(串行) +Poll Job: + getUpdates // ~0-35s 长轮询 + → outlinkInvokeChat (slow LLM) // ~可能几分钟 + → client.sendMessage // ~几秒 + → scheduleNextPoll // ← 回复完才续链 +``` + +后果: + +1. **同一渠道同时只有 1 条流水线**。A 用户消息的 agent 回复要 5 分钟,B 用户的新消息就在 ilink 服务器缓冲区里等 5 分钟才被拉下来。 +2. **`lockDuration=120s` 可能触发 stalled 误判**。回复超过 2 分钟,BullMQ 认为 job stalled,重新入队给另一个 worker —— 同一批消息被处理两次,重复回复 + `syncBuf` 被旧响应覆盖导致消息回退。 +3. **续链无 singleton 保证**。`jobId` 用 `Date.now()` 每次都不同,BullMQ 无法去重。重启时 `resumeAllWechatPolling` 直接加一条,不检查 Redis 里残留的旧链 → 多条链并发轮询同一渠道,争抢 `syncBuf`。 +4. **无外层超时**。agent 卡死会无限期占住该 shareId 的轮询位。 + +## 关于 stalled 误判说明 + +BullMQ 的 worker 靠 **周期性续租 lock** 保活: + +``` +Worker 拿到 job → Redis 给 job 打一把锁 (lockDuration 有效期) +Worker 每 lockDuration/2 续一次锁 +另一个 Worker 每 stalledInterval 扫一次:锁过期的 job 视为 stalled → 重新入队 +``` + +- **续锁依赖 Node 事件循环**。只要 job 里是正常 `await`(fetch / LLM / sendMessage),无论跑多久都不会 stalled。 +- **什么时候真 stalled**:worker 进程 kill -9、OOM、CPU 密集同步代码阻塞事件循环 >lockDuration、Redis 断连续锁失败。 + +我们的应对: + +| 机制 | 作用 | +|---|---| +| `REPLY_LOCK_MS = 30min` + `stalledInterval = 60s` | 抗住 GC/网络抖动、长回复,理论上给足余量 | +| 幂等 `replyJobId = wechat-reply:{shareId}:{lastMsgId}` | 拦住队列层的重复入队 | +| `outlinkInvokeChat` 内部按 `messageId` 幂等(由被调用方保证) | 真发生 stalled retry / attempt 重试时,保证不重复回复 | + +## 目标 + +1. 消除 10+ 分钟消息延迟:拉取与回复解耦,回复慢不阻塞摄入 +2. 消除重复回复:续链幂等、stalled retry 不产生副作用 +3. 
消除僵尸链:重启、重复扫码不产生并发轮询 + +## 改造后架构 + +``` +Poll Queue (wechatPoll) concurrency=20, lockDuration=60s + getUpdates → 写 syncBuf → dispatch reply jobs → scheduleNextPoll + ※ 每 shareId 仅 1 条链(幂等 jobId) + +Reply Queue (wechatReply) concurrency=30, lockDuration=30min + invokeChat → sendMessage + ※ 每 (shareId, lastMsgId) 仅 1 个 job(幂等 jobId) +``` + +## 改动文件清单 + +1. `packages/service/common/bullmq/index.ts` — 新增 `QueueNames.wechatReply` +2. `packages/service/support/outLink/wechat/type.ts` — 新增 `WechatReplyJobData` +3. `packages/service/support/outLink/wechat/messageParser.ts` — `msgIds[]` → `lastMsgId` +4. `packages/service/support/outLink/wechat/mq.ts` — 拆分 poll / reply worker + +--- + +## 一、`packages/service/common/bullmq/index.ts` + +```ts +export enum QueueNames { + // ...existing + wechatPoll = 'wechatPoll', + wechatReply = 'wechatReply' // 新增 +} +``` + +## 二、`packages/service/support/outLink/wechat/type.ts` + +```ts +export type WechatPollJobData = { + shareId: string; +}; + +// 新增 +export type WechatReplyJobData = { + shareId: string; + userId: string; + text: string; + contextToken: string; + lastMsgId: string; +}; +``` + +## 三、`packages/service/support/outLink/wechat/messageParser.ts` + +```ts +import type { WeixinMessage } from './ilinkClient'; + +const MSG_TYPE_USER = 1; +const MSG_ITEM_TEXT = 1; +const MSG_ITEM_VOICE = 3; + +export type ParsedMessageGroup = { + userId: string; + text: string; + contextToken: string; + lastMsgId: string; +}; + +export function extractTextFromItem(item: NonNullable[number]): string { + if (item.type === MSG_ITEM_TEXT && item.text_item?.text) { + const text = item.text_item.text; + if (item.ref_msg?.title) { + return `[引用: ${item.ref_msg.title}]\n${text}`; + } + return text; + } + if (item.type === MSG_ITEM_VOICE && item.voice_item?.text) { + return item.voice_item.text; + } + return ''; +} + +export function groupMessagesByUser(msgs: WeixinMessage[]): ParsedMessageGroup[] { + const groups = new Map(); + + for (const msg of msgs) { + 
if (msg.message_type !== MSG_TYPE_USER) continue; + + let text = ''; + for (const item of msg.item_list ?? []) { + const t = extractTextFromItem(item); + if (t) { + text = t; + break; + } + } + if (!text) continue; + + const userId = msg.from_user_id ?? 'unknown'; + const existing = groups.get(userId); + + if (existing) { + existing.text += '\n' + text; + existing.lastMsgId = msg.msgid; + if (msg.context_token) { + existing.contextToken = msg.context_token; + } + } else { + groups.set(userId, { + userId, + text, + contextToken: msg.context_token ?? '', + lastMsgId: msg.msgid + }); + } + } + + return Array.from(groups.values()); +} +``` + +## 四、`packages/service/support/outLink/wechat/mq.ts` + +```ts +import { getWorker, getQueue, QueueNames, type Job } from '../../../common/bullmq'; +import { getLogger, LogCategories } from '../../../common/logger'; +import { ILinkClient } from './ilinkClient'; +import type { WechatPollJobData, WechatReplyJobData } from './type'; +import type { OutLinkSchemaType, WechatAppType } from '@fastgpt/global/support/outLink/type'; +import { MongoOutLink } from '../../../support/outLink/schema'; +import { outlinkInvokeChat } from '../../../support/outLink/runtime/utils'; +import { setRedisCache, getRedisCache } from '../../../common/redis/cache'; +import { groupMessagesByUser } from './messageParser'; +import { getErrText } from '@fastgpt/global/common/error/utils'; + +const logger = getLogger(LogCategories.MODULE.OUTLINK.WECHAT); + +const POLL_JOB_NAME = 'wechatPublishPoll'; +const REPLY_JOB_NAME = 'wechatPublishReply'; + +const MAX_CONSECUTIVE_FAILURES = 5; +const FAILURE_BACKOFF_MS = 10_000; +const POLL_LOCK_MS = 60_000; +const REPLY_LOCK_MS = 30 * 60_000; +const REPLY_DEDUP_TTL = 24 * 60 * 60; + +/* ============ 幂等键 ============ */ + +const pollJobId = (shareId: string) => `wechat-poll:${shareId}`; +const replyJobId = (shareId: string, lastMsgId: string) => + `wechat-reply:${shareId}:${lastMsgId}`; +const replyDedupKey = (shareId: 
string, lastMsgId: string) => + `wechat:reply:done:${shareId}:${lastMsgId}`; +const failKey = (shareId: string) => `wechat:publish:failures:${shareId}`; + +/* ============ Poll Worker ============ */ + +async function processWechatPollJob(job: Job): Promise { + const { shareId } = job.data; + + const outLink = (await MongoOutLink.findOne({ shareId }).lean()) as unknown as + | OutLinkSchemaType + | null; + + if (!outLink || !outLink.app) { + logger.warn('OutLink not found, stop polling', { shareId }); + return; + } + + const app = outLink.app; + if (app.status !== 'online') { + logger.info('Channel not online, stop polling', { shareId, status: app.status }); + return; + } + if (!app.token) { + logger.warn('No token, stop polling', { shareId }); + return; + } + + const client = new ILinkClient(app.baseUrl, app.token); + + try { + const resp = await client.getUpdates(app.syncBuf || ''); + + const isError = + (resp.ret !== undefined && resp.ret !== 0) || + (resp.errcode !== undefined && resp.errcode !== 0); + + if (isError) { + logger.error('getUpdates API error', { + shareId, + ret: resp.ret, + errcode: resp.errcode, + errmsg: resp.errmsg + }); + + const failures = Number((await getRedisCache(failKey(shareId))) ?? 
'0') + 1; + await setRedisCache(failKey(shareId), String(failures), 300); + + if (failures >= MAX_CONSECUTIVE_FAILURES) { + await MongoOutLink.updateOne( + { shareId }, + { $set: { 'app.status': 'error', 'app.lastError': resp.errmsg || 'Too many failures' } } + ); + logger.error('Too many failures, stop polling', { shareId, failures }); + return; + } + + await scheduleNextPoll(shareId, FAILURE_BACKOFF_MS); + return; + } + + await setRedisCache(failKey(shareId), '0', 300); + + // 1) 先分发回复任务(失败则 syncBuf 不推进,下次 poll 重拉;靠幂等键去重) + if (resp.msgs && resp.msgs.length > 0) { + const groups = groupMessagesByUser(resp.msgs); + logger.debug('Dispatch reply jobs', { + shareId, + totalMsgs: resp.msgs.length, + userGroups: groups.length + }); + + const replyQueue = getQueue(QueueNames.wechatReply); + await Promise.all( + groups.map((g) => + replyQueue.add( + REPLY_JOB_NAME, + { + shareId, + userId: g.userId, + text: g.text, + contextToken: g.contextToken, + lastMsgId: g.lastMsgId + }, + { + jobId: replyJobId(shareId, g.lastMsgId), + attempts: 2, + backoff: { type: 'fixed', delay: 2000 } + } + ) + ) + ); + } + + // 2) 全部入队成功后再推进 syncBuf + if (resp.get_updates_buf) { + await MongoOutLink.updateOne( + { shareId }, + { $set: { 'app.syncBuf': resp.get_updates_buf } } + ); + } + } catch (error) { + logger.error('Poll job error', { shareId, error: String(error) }); + } + + // 3) 立即续链 + await scheduleNextPoll(shareId); +} + +/* ============ Reply Worker ============ */ + +async function processWechatReplyJob(job: Job): Promise { + const { shareId, userId, text, contextToken, lastMsgId } = job.data; + + const dedupKey = replyDedupKey(shareId, lastMsgId); + if (await getRedisCache(dedupKey)) { + logger.info('Reply already processed, skip', { shareId, lastMsgId }); + return; + } + + const outLink = (await MongoOutLink.findOne({ shareId }).lean()) as unknown as + | OutLinkSchemaType + | null; + if (!outLink || !outLink.app || outLink.app.status !== 'online' || !outLink.app.token) { + 
logger.warn('Channel not available, drop reply', { shareId, lastMsgId }); + return; + } + + const app = outLink.app; + const client = new ILinkClient(app.baseUrl, app.token); + const chatId = `wechat_${shareId}_${userId}`; + + try { + await outlinkInvokeChat({ + outLinkConfig: outLink, + chatId, + query: [{ text: { content: text } }], + messageId: lastMsgId, + chatUserId: userId, + onReply: async (replyContent: string) => { + await client.sendMessage({ + to_user_id: userId, + text: replyContent, + context_token: contextToken + }); + } + }); + + await setRedisCache(dedupKey, '1', REPLY_DEDUP_TTL); + } catch (error) { + logger.error('Reply job failed', { + shareId, + userId, + lastMsgId, + attempt: job.attemptsMade + 1, + error: String(error) + }); + + // 仅最后一次 attempt 失败才发 fallback,避免重试期间重复发 + if (job.attemptsMade + 1 >= (job.opts.attempts ?? 1)) { + try { + const errorText = outLink.defaultResponse || `Run agent error: ${getErrText(error)}`; + await client.sendMessage({ + to_user_id: userId, + text: errorText, + context_token: contextToken + }); + await setRedisCache(dedupKey, '1', REPLY_DEDUP_TTL); + } catch { + // 忽略 + } + } + throw error; + } +} + +/* ============ 续链 ============ */ + +async function scheduleNextPoll(shareId: string, delayMs?: number): Promise { + const queue = getQueue(QueueNames.wechatPoll); + await queue.add( + POLL_JOB_NAME, + { shareId }, + { + jobId: pollJobId(shareId), + ...(delayMs ? 
{ delay: delayMs } : {}), + removeOnComplete: true, + removeOnFail: { count: 50 } + } + ); +} + +/* ============ 对外接口 ============ */ + +export const initWechatPollWorker = async () => { + getWorker(QueueNames.wechatPoll, processWechatPollJob, { + concurrency: 20, + lockDuration: POLL_LOCK_MS, + stalledInterval: 30_000, + removeOnComplete: { count: 0 }, + removeOnFail: { count: 100, age: 7 * 24 * 60 * 60 } + }); + + getWorker(QueueNames.wechatReply, processWechatReplyJob, { + concurrency: 30, + lockDuration: REPLY_LOCK_MS, + stalledInterval: 60_000, + removeOnComplete: { count: 0 }, + removeOnFail: { count: 500, age: 7 * 24 * 60 * 60 } + }); + + await resumeAllWechatPolling(); + logger.info('Wechat poll/reply workers initialized'); +}; + +async function resumeAllWechatPolling(): Promise { + const onlineChannels = await MongoOutLink.find( + { type: 'wechat', 'app.status': 'online', 'app.token': { $exists: true, $ne: '' } }, + { shareId: 1 } + ).lean(); + + logger.info('Resuming wechat polling', { count: onlineChannels.length }); + for (const ch of onlineChannels) { + await scheduleNextPoll(ch.shareId); + } +} + +export const startWechatPolling = async (shareId: string): Promise => { + await scheduleNextPoll(shareId); + logger.info('Wechat polling started', { shareId }); +}; + +export const stopWechatPolling = async (shareId: string): Promise => { + await MongoOutLink.updateOne( + { shareId }, + { $set: { 'app.status': 'offline', 'app.token': '' } } + ); + + const queue = getQueue(QueueNames.wechatPoll); + const existing = await queue.getJob(pollJobId(shareId)); + if (existing) { + try { + await existing.remove(); + } catch { + // 忽略 + } + } + + logger.info('Wechat polling stopped', { shareId }); +}; +``` + +--- + +## 关键设计要点 + +| 问题 | 解决手段 | 代码位置 | +|---|---|---| +| 回复阻塞拉取 | 拆 `wechatReply` 队列,poll dispatch 后立即续链 | `mq.ts` processWechatPollJob | +| 续链重复 | `pollJobId = wechat-poll:{shareId}` 幂等 | `scheduleNextPoll` | +| 回复重复(入队重复) | `replyJobId = 
wechat-reply:{shareId}:{lastMsgId}` 幂等 | `processWechatReplyJob` | +| 回复重复(stalled retry / attempt 重试) | 依赖 `outlinkInvokeChat` 按 `messageId` 自身幂等 | — | +| enqueue 失败丢消息 | 先 dispatch reply 成功后才推进 `syncBuf`;at-least-once + 幂等键去重 | poll worker 1) → 2) 顺序 | +| 重试时错误提示被重复发 | 仅最后一次 attempt 失败才发 defaultResponse | `processWechatReplyJob` catch | +| 长回复被 stalled 误判 | `REPLY_LOCK_MS = 30min` 足够长 + `outlinkInvokeChat` 幂等兜底 | worker 配置 | +| `stopWechatPolling` 残留链 | 主动 `queue.getJob().remove()` | `stopWechatPolling` | +| 服务重启多实例 | `resumeAllWechatPolling` 用幂等 jobId,BullMQ 自然去重 | `resumeAllWechatPolling` | + +--- + +## 消息合并语义说明 + +- **同一 poll 周期内同一用户多条消息**:`groupMessagesByUser` 用 `Map` 聚合,`text` 用 `\n` 拼接,`contextToken` 取最后一条,`lastMsgId` 取最后一条。**1 次 `invokeChat`,1 次合并回复**。 +- **跨 poll 周期**:2 个独立 reply job,2 次回复,但共享 `chatId = wechat_{shareId}_{userId}` → 上下文连续。 +- **多个用户**:每个用户 1 个 reply job,并行处理。 + +--- + +## 落地 TODO + +- [ ] 1. `bullmq/index.ts` 加 `QueueNames.wechatReply` +- [ ] 2. `wechat/type.ts` 加 `WechatReplyJobData` +- [ ] 3. `wechat/messageParser.ts` 把 `msgIds[]` 改 `lastMsgId` +- [ ] 4. `wechat/mq.ts` 按上文全量替换 +- [ ] 5. `pnpm lint` 过 +- [ ] 6. 本地联调 + - 扫码登录,观察 poll job p99 <40s + - 模拟 agent 慢回复(sleep 5 分钟),验证期间新消息在 35s 内被拉取 + - kill worker 进程,验证重启后无重复回复 + - 同一用户 10s 内连发 3 条,验证合并成 1 次回复(同 poll 周期内) +- [ ] 7. 
灰度发布,监控 + - `wechatPoll` waiting/active/failed + - `wechatReply` waiting/active/failed + - `wechat:reply:done:*` key 命中率 + - Mongo `app.syncBuf` 写入频率 + +--- + +## 风险 & 回滚 + +- **风险 1**:reply worker 堆积 → 加监控告警,必要时提高 `concurrency` +- **风险 2**:`REPLY_DEDUP_TTL=24h` 内如果 `lastMsgId` 被 ilink 服务端复用,会漏回复。需要确认 ilink 的 msgid 是否全局唯一 —— 从现网抓取样本验证 +- **回滚**:保留旧 `mq.ts` 为 `mq.legacy.ts`,通过环境变量 `USE_LEGACY_WECHAT_MQ=1` 切换 diff --git a/document/content/docs/self-host/upgrading/4-14/41414.mdx b/document/content/docs/self-host/upgrading/4-14/41414.mdx new file mode 100644 index 0000000000..90169518b3 --- /dev/null +++ b/document/content/docs/self-host/upgrading/4-14/41414.mdx @@ -0,0 +1,18 @@ +--- +title: 'V4.14.14' +description: 'FastGPT V4.14.14 更新说明' +--- + +## 升级指南 + +### 1. 更新镜像 tag + +- 更新 fastgpt-app(fastgpt 主服务) 镜像 tag: v4.14.14 + +## 🐛 修复 + + +## ⚙️ 优化 + +1. 个人微信发布渠道,优化轮询策略(拉取与回复解耦),避免数据量超大时出现阻塞。 +2. 新增环境变量 `WECHAT_CHANNEL_CONCURRENCY`(默认 1000)用于控制微信渠道 poll worker 与 reply worker 的并发数,建议 ≥ online channel 峰值。 \ No newline at end of file diff --git a/document/content/docs/toc.mdx b/document/content/docs/toc.mdx index 8f07c74636..1e5da97c82 100644 --- a/document/content/docs/toc.mdx +++ b/document/content/docs/toc.mdx @@ -117,6 +117,7 @@ description: FastGPT 文档目录 - [/docs/self-host/upgrading/4-14/41411](/docs/self-host/upgrading/4-14/41411) - [/docs/self-host/upgrading/4-14/41412](/docs/self-host/upgrading/4-14/41412) - [/docs/self-host/upgrading/4-14/41413](/docs/self-host/upgrading/4-14/41413) +- [/docs/self-host/upgrading/4-14/41414](/docs/self-host/upgrading/4-14/41414) - [/docs/self-host/upgrading/4-14/4142](/docs/self-host/upgrading/4-14/4142) - [/docs/self-host/upgrading/4-14/4143](/docs/self-host/upgrading/4-14/4143) - [/docs/self-host/upgrading/4-14/4144](/docs/self-host/upgrading/4-14/4144) diff --git a/document/data/doc-last-modified.json b/document/data/doc-last-modified.json index d042276cf6..bacf478659 100644 --- a/document/data/doc-last-modified.json +++ 
b/document/data/doc-last-modified.json @@ -230,6 +230,7 @@ "document/content/docs/self-host/upgrading/4-14/41412.mdx": "2026-04-21T23:04:26+08:00", "document/content/docs/self-host/upgrading/4-14/41413.en.mdx": "2026-04-21T23:04:26+08:00", "document/content/docs/self-host/upgrading/4-14/41413.mdx": "2026-04-21T23:04:26+08:00", + "document/content/docs/self-host/upgrading/4-14/41414.mdx": "2026-04-22T16:09:29+08:00", "document/content/docs/self-host/upgrading/4-14/4142.en.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/4-14/4142.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/4-14/4143.en.mdx": "2026-03-03T17:39:47+08:00", @@ -250,6 +251,7 @@ "document/content/docs/self-host/upgrading/4-14/41481.mdx": "2026-03-09T17:39:53+08:00", "document/content/docs/self-host/upgrading/4-14/4149.en.mdx": "2026-03-23T12:17:04+08:00", "document/content/docs/self-host/upgrading/4-14/4149.mdx": "2026-04-07T21:01:52+08:00", + "document/content/docs/self-host/upgrading/4-15/4150.mdx": "2026-04-22T14:36:14+08:00", "document/content/docs/self-host/upgrading/outdated/40.en.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/outdated/40.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/outdated/41.en.mdx": "2026-03-03T17:39:47+08:00", @@ -391,7 +393,7 @@ "document/content/docs/self-host/upgrading/upgrade-intruction.en.mdx": "2026-03-03T17:39:47+08:00", "document/content/docs/self-host/upgrading/upgrade-intruction.mdx": "2026-04-20T13:51:34+08:00", "document/content/docs/toc.en.mdx": "2026-04-21T23:04:26+08:00", - "document/content/docs/toc.mdx": "2026-04-21T23:04:26+08:00", + "document/content/docs/toc.mdx": "2026-04-22T16:09:29+08:00", "document/content/docs/use-cases/app-cases/dalle3.en.mdx": "2026-02-26T22:14:30+08:00", "document/content/docs/use-cases/app-cases/dalle3.mdx": "2025-07-23T21:35:03+08:00", 
"document/content/docs/use-cases/app-cases/english_essay_correction_bot.en.mdx": "2026-02-26T22:14:30+08:00", diff --git a/packages/service/common/bullmq/index.ts b/packages/service/common/bullmq/index.ts index d7f3b669ee..93b5d929ff 100644 --- a/packages/service/common/bullmq/index.ts +++ b/packages/service/common/bullmq/index.ts @@ -36,6 +36,7 @@ export enum QueueNames { // Publish wechatPoll = 'wechatPoll', + wechatReply = 'wechatReply', /** @deprecated */ websiteSync = 'websiteSync' diff --git a/packages/service/env.ts b/packages/service/env.ts index 82ef1370da..e584c7ea55 100644 --- a/packages/service/env.ts +++ b/packages/service/env.ts @@ -113,6 +113,12 @@ export const env = createEnv({ /** Redis 内存水位检测缓存时长(毫秒),避免每个流请求都调用 INFO MEMORY */ STREAM_RESUME_REDIS_MEMORY_CHECK_INTERVAL_MS: IntSchema.positive().default(5000), + // ===== Wechat outLink ===== + /** 微信渠道 poll worker 并发数,需 ≥ online shareId 峰值,否则消息延迟会线性恶化 */ + WECHAT_CHANNEL_CONCURRENCY: NumSchema.int().positive().default(1000).meta({ + description: '微信渠道 poll worker 并发数' + }), + // Beta features // Whether the Skill feature is enabled (frontend entries + backend runtime) SHOW_SKILL: BoolSchema.default(false), diff --git a/packages/service/support/outLink/wechat/messageParser.ts b/packages/service/support/outLink/wechat/messageParser.ts index fffc17f058..a7c960ab40 100644 --- a/packages/service/support/outLink/wechat/messageParser.ts +++ b/packages/service/support/outLink/wechat/messageParser.ts @@ -8,7 +8,7 @@ export type ParsedMessageGroup = { userId: string; text: string; contextToken: string; - msgIds: string[]; + lastMsgId: string; }; export function extractTextFromItem(item: NonNullable[number]): string { @@ -42,11 +42,11 @@ export function groupMessagesByUser(msgs: WeixinMessage[]): ParsedMessageGroup[] if (!text) continue; const userId = msg.from_user_id ?? 
'unknown'; - const existing = groups.get(userId); + if (existing) { existing.text += '\n' + text; - existing.msgIds.push(msg.msgid); + existing.lastMsgId = msg.msgid; if (msg.context_token) { existing.contextToken = msg.context_token; } @@ -55,7 +55,7 @@ export function groupMessagesByUser(msgs: WeixinMessage[]): ParsedMessageGroup[] userId, text, contextToken: msg.context_token ?? '', - msgIds: [msg.msgid] + lastMsgId: msg.msgid }); } } diff --git a/packages/service/support/outLink/wechat/mq.ts b/packages/service/support/outLink/wechat/mq.ts index 6ee6f5bc2b..82c2826c97 100644 --- a/packages/service/support/outLink/wechat/mq.ts +++ b/packages/service/support/outLink/wechat/mq.ts @@ -1,228 +1,309 @@ import { getWorker, getQueue, QueueNames, type Job } from '../../../common/bullmq'; import { getLogger, LogCategories } from '../../../common/logger'; import { ILinkClient } from './ilinkClient'; -import type { WechatPollJobData } from './type'; +import type { WechatPollJobData, WechatReplyJobData } from './type'; import type { OutLinkSchemaType, WechatAppType } from '@fastgpt/global/support/outLink/type'; import { MongoOutLink } from '../../../support/outLink/schema'; import { outlinkInvokeChat } from '../../../support/outLink/runtime/utils'; -import { setRedisCache, getRedisCache } from '../../../common/redis/cache'; -import { groupMessagesByUser, type ParsedMessageGroup } from './messageParser'; -import { getErrText } from '@fastgpt/global/common/error/utils'; +import { delRedisCache, getRedisCache, setRedisCache } from '../../../common/redis/cache'; +import { groupMessagesByUser } from './messageParser'; +import { env } from '../../../env'; +import { batchRun, retryFn } from '@fastgpt/global/common/system/utils'; const logger = getLogger(LogCategories.MODULE.OUTLINK.WECHAT); -const queueName = 'wechatPublishPoll'; +const POLL_JOB_NAME = 'wechatPublishPoll'; +const REPLY_JOB_NAME = 'wechatPublishReply'; + const MAX_CONSECUTIVE_FAILURES = 5; const FAILURE_BACKOFF_MS = 
10_000; +const POLL_LOCK_MS = 120_000; +const REPLY_LOCK_MS = 30 * 60_000; +// Poll processor 硬超时:防止 worker 活着但 processor hang 导致确定 jobId 永远阻塞 +// 应 > LONG_POLL_TIMEOUT_MS(35s) + Mongo/Redis 操作余量 +const POLL_HARD_TIMEOUT_MS = 120_000; -/* ============ Worker 处理逻辑 ============ */ +/* ============ 幂等键 ============ */ +// 确定 jobId → BullMQ 自动保证同 shareId 同一时刻只存在一个 poll job(singleton) +// 续链在 worker 'completed' / 'failed' 事件里发起,此时 job 已从 Redis 删除,add 不会冲突 +const pollJobId = (shareId: string) => `wechat-poll:${shareId}`; +const replyJobId = (shareId: string, lastMsgId: string) => `wechat-reply:${shareId}:${lastMsgId}`; +const failKey = (shareId: string) => `wechat:publish:failures:${shareId}`; + +/* ============ Poll Worker 处理器 ============ */ +// 设计约定: +// - 正常完成 → return → 'completed' 事件 → 续链(立即) +// - 任何异常/停止条件 → throw → 'failed' 事件 → shouldContinuePolling 决定是否续链 +// - 外层 Promise.race 兜底:processor 最多 POLL_HARD_TIMEOUT_MS 就必须终止, +// 防止 worker 活着但 processor hang 导致确定 jobId 永远阻塞 async function processWechatPollJob(job: Job): Promise { - const { shareId } = job.data; - - // 1. 获取渠道配置 - const outLink = (await MongoOutLink.findOne({ - shareId - }).lean()) as unknown as OutLinkSchemaType; - if (!outLink || !outLink.app) { - logger.warn('OutLink not found, stop polling', { shareId }); - return; - } - - const app = outLink.app; - - // 2. 检查状态 - if (app.status !== 'online') { - logger.info('Channel not online, stop polling', { shareId, status: app.status }); - return; - } - - if (!app.token) { - logger.warn('No token, stop polling', { shareId }); - return; - } - - const client = new ILinkClient(app.baseUrl, app.token); - const failKey = `publish:wechat:failures:${shareId}`; - + let timer: NodeJS.Timeout | undefined; + const timeout = new Promise((_, reject) => { + timer = setTimeout( + () => reject(new Error(`Poll job hard timeout after ${POLL_HARD_TIMEOUT_MS}ms`)), + POLL_HARD_TIMEOUT_MS + ); + }); try { - // 3. 
长轮询拉取消息 - const resp = await client.getUpdates(app.syncBuf || ''); - - // 检查 API 错误 - const isError = - (resp.ret !== undefined && resp.ret !== 0) || - (resp.errcode !== undefined && resp.errcode !== 0); - - if (isError) { - logger.error('getUpdates API error', { - shareId, - ret: resp.ret, - errcode: resp.errcode, - errmsg: resp.errmsg - }); - - const failures = Number((await getRedisCache(failKey)) ?? '0') + 1; - await setRedisCache(failKey, String(failures), 300); - - if (failures >= MAX_CONSECUTIVE_FAILURES) { - await MongoOutLink.updateOne( - { shareId }, - { $set: { 'app.status': 'error', 'app.lastError': resp.errmsg || 'Too many failures' } } - ); - logger.error('Too many failures, stop polling', { shareId, failures }); - return; - } - - // 延迟续链 - await scheduleNextPoll(shareId, FAILURE_BACKOFF_MS); - return; - } - - // 清除失败计数 - await setRedisCache(failKey, '0', 300); - - // 4. 处理消息 - if (resp.msgs && resp.msgs.length > 0) { - const groups = groupMessagesByUser(resp.msgs); - - logger.debug('Processing messages', { - shareId, - totalMsgs: resp.msgs.length, - userGroups: groups.length - }); - - // 并发处理各用户分组 - await Promise.allSettled(groups.map((group) => processUserGroup(outLink, group))); - } - - // 5. 更新 buf - if (resp.get_updates_buf) { - await MongoOutLink.updateOne({ shareId }, { $set: { 'app.syncBuf': resp.get_updates_buf } }); - } - } catch (error) { - logger.error('Poll job error', { shareId, error: String(error) }); + await Promise.race([pollImpl(job), timeout]); + } finally { + if (timer) clearTimeout(timer); } - - // 6. 
续链 - await scheduleNextPoll(shareId); } -/* ============ 处理单个用户分组 ============ */ +async function pollImpl(job: Job): Promise { + const { shareId } = job.data; + + const outLink = (await MongoOutLink.findOne({ + shareId + }).lean()) as unknown as OutLinkSchemaType | null; + + if (!outLink || !outLink.app) { + logger.warn('OutLink not found, stop polling', { shareId }); + throw new Error('OutLink not found'); + } -async function processUserGroup( - outLink: OutLinkSchemaType, - group: ParsedMessageGroup -): Promise { const app = outLink.app; - const chatId = `wechat_${outLink.shareId}_${group.userId}`; + if (app.status !== 'online') { + logger.info('Channel not online, stop polling', { shareId, status: app.status }); + throw new Error('Channel not online'); + } + if (!app.token) { + logger.warn('No token, stop polling', { shareId }); + throw new Error('No token'); + } const client = new ILinkClient(app.baseUrl, app.token); + const resp = await client.getUpdates(app.syncBuf || ''); + + const isError = + (resp.ret !== undefined && resp.ret !== 0) || + (resp.errcode !== undefined && resp.errcode !== 0); + + if (isError) { + logger.error('getUpdates API error', { + shareId, + ret: resp.ret, + errcode: resp.errcode, + errmsg: resp.errmsg + }); + + const failures = Number((await getRedisCache(failKey(shareId))) ?? 
'0') + 1; + await setRedisCache(failKey(shareId), String(failures), 300); + + if (failures >= MAX_CONSECUTIVE_FAILURES) { + await MongoOutLink.updateOne( + { shareId }, + { $set: { 'app.status': 'error', 'app.lastError': resp.errmsg || 'Too many failures' } } + ); + logger.error('Too many failures, stop polling', { shareId, failures }); + await delRedisCache(failKey(shareId)); + } + + // 抛错走 'failed' 事件 → 续链带退避 + throw new Error(`getUpdates API error: ret=${resp.ret} errcode=${resp.errcode}`); + } + + await setRedisCache(failKey(shareId), '0', 300); + + // 1) 先分发回复任务(失败则 syncBuf 不推进,下次 poll 重拉;靠 replyJobId 幂等去重) + if (resp.msgs && resp.msgs.length > 0) { + const groups = groupMessagesByUser(resp.msgs); + logger.debug('Dispatch reply jobs', { + shareId, + totalMsgs: resp.msgs.length, + userGroups: groups.length + }); + + const replyQueue = getQueue(QueueNames.wechatReply); + await Promise.all( + groups.map((g) => + replyQueue.add( + REPLY_JOB_NAME, + { + shareId, + userId: g.userId, + text: g.text, + contextToken: g.contextToken, + lastMsgId: g.lastMsgId + }, + { + jobId: replyJobId(shareId, g.lastMsgId), + backoff: { type: 'fixed', delay: 2000 } + } + ) + ) + ); + } + + // 2) 全部入队成功后再推进 syncBuf + if (resp.get_updates_buf) { + await MongoOutLink.updateOne({ shareId }, { $set: { 'app.syncBuf': resp.get_updates_buf } }); + } + + // 3) 不在这里续链,交给 worker 'completed' 事件处理器 +} + +/* ============ Reply Worker ============ */ +async function processWechatReplyJob(job: Job): Promise { + const { shareId, userId, text, contextToken, lastMsgId } = job.data; + + const outLink = (await MongoOutLink.findOne({ + shareId + }).lean()) as unknown as OutLinkSchemaType | null; + if (!outLink || !outLink.app || outLink.app.status !== 'online' || !outLink.app.token) { + logger.warn('Channel not available, drop reply', { shareId, lastMsgId }); + return; + } + + const app = outLink.app; + const client = new ILinkClient(app.baseUrl, app.token); + const chatId = `wechat_${shareId}_${userId}`; 
try { await outlinkInvokeChat({ outLinkConfig: outLink, chatId, - query: [{ text: { content: group.text } }], - messageId: group.msgIds[group.msgIds.length - 1], - chatUserId: group.userId, + query: [{ text: { content: text } }], + messageId: lastMsgId, + chatUserId: userId, onReply: async (replyContent: string) => { await client.sendMessage({ - to_user_id: group.userId, + to_user_id: userId, text: replyContent, - context_token: group.contextToken + context_token: contextToken }); } }); } catch (error) { - logger.error('Process user group failed', { - shareId: outLink.shareId, - userId: group.userId, + logger.error('Reply job failed', { + shareId, + userId, + lastMsgId, error: String(error) }); - - // 尝试发送错误提示 - try { - const errorText = outLink.defaultResponse || `Run agent error: ${getErrText(error)}`; - await client.sendMessage({ - to_user_id: group.userId, - text: errorText, - context_token: group.contextToken - }); - } catch { - // 忽略发送失败 - } + throw error; } } /* ============ 续链调度 ============ */ - +// 在 worker 'completed' / 'failed' 事件里调用,此时 job hash 已从 Redis 删除 +// startWechatPolling / resumeAllWechatPolling 也调用本函数: +// - 如果链正在运行(job 处于 active),add 会因 jobId 冲突被 BullMQ 静默忽略(幂等) +// - 如果链已死(无 job),add 正常入队 async function scheduleNextPoll(shareId: string, delayMs?: number): Promise { const queue = getQueue(QueueNames.wechatPoll); - await queue.add( - queueName, + POLL_JOB_NAME, { shareId }, { - jobId: `wechat-poll-${shareId}-${Date.now()}`, - ...(delayMs ? { delay: delayMs } : {}) + jobId: pollJobId(shareId), + ...(delayMs ? 
{ delay: delayMs } : {}), + removeOnComplete: true, + removeOnFail: true } ); } +/** + * 判断渠道是否仍应继续轮询。 + * 用于 'completed' 事件处理器 —— 渠道已被停用时不再续链。 + */ +async function shouldContinuePolling(shareId: string): Promise { + const outLink = await MongoOutLink.findOne( + { + shareId, + type: 'wechat', + 'app.status': 'online', + 'app.token': { $exists: true, $ne: '' } + }, + { _id: 1 } + ).lean(); + return Boolean(outLink); +} + /* ============ 对外接口 ============ */ /** - * 初始化微信轮询 Worker + * 初始化微信轮询 / 回复 Worker */ export const initWechatPollWorker = async () => { - /** - * 服务启动时恢复所有 online 渠道的轮询 - */ - const resumeAllWechatPolling = async (): Promise => { - const onlineChannels = await MongoOutLink.find( - { - type: 'wechat', - 'app.status': 'online', - 'app.token': { $exists: true, $ne: '' } - }, - { shareId: 1 } - ).lean(); + const pollWorker = getWorker(QueueNames.wechatPoll, processWechatPollJob, { + // poll job 主要阻塞在 getUpdates 长轮询 I/O(~30s),不吃 CPU + concurrency: env.WECHAT_CHANNEL_CONCURRENCY, + lockDuration: POLL_LOCK_MS, // 120s 防止 job 被误判为 stalled + stalledInterval: 30_000, // 30s 检查下是否活跃 + removeOnComplete: { count: 0 }, + removeOnFail: { count: 0 } + }); - logger.info('Resuming wechat polling', { count: onlineChannels.length }); - - for (const ch of onlineChannels) { - await startWechatPolling(ch.shareId); + // 成功完成:续链(立即)。事件内 add 因 job 已被移除,不会冲突 + pollWorker.on('completed', async (job) => { + if (job.name !== POLL_JOB_NAME) return; + const { shareId } = job.data as WechatPollJobData; + try { + await scheduleNextPoll(shareId); + } catch (error) { + logger.error('Schedule next poll (completed) failed', { shareId, error: String(error) }); } - }; + }); - getWorker(QueueNames.wechatPoll, processWechatPollJob, { - concurrency: 10, - lockDuration: 120_000, + // 失败:续链(带退避)。渠道仍 online 时尝试恢复 + pollWorker.on('failed', async (job) => { + if (!job || job.name !== POLL_JOB_NAME) return; + const { shareId } = job.data as WechatPollJobData; + try { + await retryFn(async () => { 
+ if (!(await shouldContinuePolling(shareId))) return; + await scheduleNextPoll(shareId, FAILURE_BACKOFF_MS); + }); + } catch (error) { + logger.error('Schedule next poll (failed) failed', { shareId, error: String(error) }); + } + }); + + getWorker(QueueNames.wechatReply, processWechatReplyJob, { + concurrency: env.WECHAT_CHANNEL_CONCURRENCY, + lockDuration: REPLY_LOCK_MS, stalledInterval: 60_000, removeOnComplete: { count: 0 }, - removeOnFail: { count: 100, age: 7 * 24 * 60 * 60 } + removeOnFail: { count: 500, age: 7 * 24 * 60 * 60 } }); await resumeAllWechatPolling(); - - logger.info('Wechat poll worker initialized'); + logger.info('Wechat poll/reply workers initialized'); }; /** - * 启动某个渠道的轮询(扫码登录成功后调用) + * 服务启动时恢复所有 online 渠道的轮询 + */ +async function resumeAllWechatPolling(): Promise { + const onlineChannels = await MongoOutLink.find( + { + type: 'wechat', + 'app.status': 'online', + 'app.token': { $exists: true, $ne: '' } + }, + { shareId: 1 } + ).lean(); + + logger.info('Resuming wechat polling', { count: onlineChannels.length }); + + await batchRun( + onlineChannels, + async (ch) => { + await scheduleNextPoll(ch.shareId); + }, + 100 + ); +} + +/** + * 启动某个渠道的轮询 */ export const startWechatPolling = async (shareId: string): Promise => { - const queue = getQueue(QueueNames.wechatPoll); - - await queue.add( - queueName, - { shareId }, - { - jobId: `wechat-poll-${shareId}-${Date.now()}` - } - ); - + await scheduleNextPoll(shareId); logger.info('Wechat polling started', { shareId }); }; @@ -232,5 +313,11 @@ export const startWechatPolling = async (shareId: string): Promise => { export const stopWechatPolling = async (shareId: string): Promise => { await MongoOutLink.updateOne({ shareId }, { $set: { 'app.status': 'offline', 'app.token': '' } }); + // Delete job from queue + const queue = getQueue(QueueNames.wechatPoll); + await queue.remove(pollJobId(shareId)).catch((error) => { + logger.warn('Remove poll job failed (job may be active)', { shareId, error: 
String(error) }); + }); + logger.info('Wechat polling stopped', { shareId }); }; diff --git a/packages/service/support/outLink/wechat/type.ts b/packages/service/support/outLink/wechat/type.ts index 9e9475d391..6213cba92e 100644 --- a/packages/service/support/outLink/wechat/type.ts +++ b/packages/service/support/outLink/wechat/type.ts @@ -1,3 +1,11 @@ export type WechatPollJobData = { shareId: string; }; + +export type WechatReplyJobData = { + shareId: string; + userId: string; + text: string; + contextToken: string; + lastMsgId: string; +}; diff --git a/projects/app/.env.template b/projects/app/.env.template index cf6cc347b8..21efc20e01 100644 --- a/projects/app/.env.template +++ b/projects/app/.env.template @@ -160,6 +160,8 @@ CHECK_INTERNAL_IP=false APP_FOLDER_MAX_AMOUNT=1000 # 数据集文件夹最大数量 DATASET_FOLDER_MAX_AMOUNT=1000 +# 微信渠道 poll worker 并发数(默认 1000),需 ≥ online channel 数;channel 数超过该值时消息延迟会线性恶化 +WECHAT_CHANNEL_CONCURRENCY=1000 # ==================== 上传与账号策略 ==================== # 最大上传文件大小(MB) diff --git a/projects/app/src/pageComponents/app/detail/Edit/SimpleApp/index.tsx b/projects/app/src/pageComponents/app/detail/Edit/SimpleApp/index.tsx index b6b6d8033d..4d1ff0b6ba 100644 --- a/projects/app/src/pageComponents/app/detail/Edit/SimpleApp/index.tsx +++ b/projects/app/src/pageComponents/app/detail/Edit/SimpleApp/index.tsx @@ -10,10 +10,9 @@ import { useDebounceEffect, useMount } from 'ahooks'; import { v1Workflow2V2 } from '@/web/core/workflow/adapt'; import { defaultAppSelectFileConfig } from '@fastgpt/global/core/app/constants'; import { form2AppWorkflow, appWorkflow2Form } from './utils'; - -const Edit = dynamic(() => import('./Edit')); -const Logs = dynamic(() => import('../../Logs/index')); -const PublishChannel = dynamic(() => import('../../Publish')); +import PublishChannel from '../../Publish'; +import Logs from '../../Logs'; +import Edit from './Edit'; const SimpleEdit = () => { const { t } = useTranslation(); diff --git 
a/projects/app/src/pages/api/support/outLink/delete.ts b/projects/app/src/pages/api/support/outLink/delete.ts index e1c617d585..d5c36bbf2b 100644 --- a/projects/app/src/pages/api/support/outLink/delete.ts +++ b/projects/app/src/pages/api/support/outLink/delete.ts @@ -6,6 +6,7 @@ import { NextAPI } from '@/service/middleware/entry'; import { addAuditLog } from '@fastgpt/service/support/user/audit/util'; import { AuditEventEnum } from '@fastgpt/global/support/user/audit/constants'; import { getI18nAppType } from '@fastgpt/service/support/user/audit/util'; +import { stopWechatPolling } from '@fastgpt/service/support/outLink/wechat/mq'; export type OutLinkDeleteQuery = { id: string; @@ -25,7 +26,15 @@ async function handler( per: OwnerPermissionVal }); - await MongoOutLink.findByIdAndDelete(id); + const outlink = await MongoOutLink.findById(id); + + if (outlink && outlink.type === 'wechat') { + await stopWechatPolling(outlink.shareId).catch((error) => { + console.warn('Stop wechat polling failed', error); + }); + } + + await outlink?.deleteOne(); (async () => { addAuditLog({ diff --git a/test/cases/service/support/outLink/wechat/messageParser.test.ts b/test/cases/service/support/outLink/wechat/messageParser.test.ts index 2336d68e48..b1e663c6e3 100644 --- a/test/cases/service/support/outLink/wechat/messageParser.test.ts +++ b/test/cases/service/support/outLink/wechat/messageParser.test.ts @@ -61,7 +61,7 @@ describe('groupMessagesByUser', () => { userId: 'u1', text: 'default text', contextToken: 'ctx1', - msgIds: ['m1'] + lastMsgId: 'm1' }); }); @@ -84,7 +84,7 @@ describe('groupMessagesByUser', () => { expect(result).toHaveLength(1); expect(result[0].text).toBe('first\nsecond'); - expect(result[0].msgIds).toEqual(['m1', 'm2']); + expect(result[0].lastMsgId).toBe('m2'); expect(result[0].contextToken).toBe('ctx2'); // 取最后一条的 }); @@ -107,7 +107,7 @@ describe('groupMessagesByUser', () => { const result = groupMessagesByUser(msgs); expect(result).toHaveLength(1); - 
expect(result[0].msgIds).toEqual(['m2']); + expect(result[0].lastMsgId).toBe('m2'); }); it('should skip messages with no extractable text', () => { @@ -122,7 +122,7 @@ describe('groupMessagesByUser', () => { const result = groupMessagesByUser(msgs); expect(result).toHaveLength(1); - expect(result[0].msgIds).toEqual(['m2']); + expect(result[0].lastMsgId).toBe('m2'); }); it('should handle empty message list', () => {