mirror of
https://github.com/labring/FastGPT.git
synced 2026-05-07 01:02:55 +08:00
@@ -0,0 +1,503 @@
|
||||
# 微信机器人轮询链路改造方案
|
||||
|
||||
## 背景
|
||||
|
||||
线上现象:微信机器人有时消息要 10+ 分钟才回复。
|
||||
|
||||
## 根因
|
||||
|
||||
`packages/service/support/outLink/wechat/mq.ts` 当前实现把"拉取消息"和"调用 agent 回复"放在**同一个 BullMQ job 里串行执行**,而且续链 `scheduleNextPoll` 放在回复发送之后:
|
||||
|
||||
```ts
|
||||
// 当前流程(串行)
|
||||
Poll Job:
|
||||
getUpdates // ~0-35s 长轮询
|
||||
→ outlinkInvokeChat (slow LLM) // ~可能几分钟
|
||||
→ client.sendMessage // ~几秒
|
||||
→ scheduleNextPoll // ← 回复完才续链
|
||||
```
|
||||
|
||||
后果:
|
||||
|
||||
1. **同一渠道同时只有 1 条流水线**。A 用户消息的 agent 回复要 5 分钟,B 用户的新消息就在 ilink 服务器缓冲区里等 5 分钟才被拉下来。
|
||||
2. **`lockDuration=120s` 可能触发 stalled 误判**。回复超过 2 分钟,BullMQ 认为 job stalled,重新入队给另一个 worker —— 同一批消息被处理两次,重复回复 + `syncBuf` 被旧响应覆盖导致消息回退。
|
||||
3. **续链无 singleton 保证**。`jobId` 用 `Date.now()` 每次都不同,BullMQ 无法去重。重启时 `resumeAllWechatPolling` 直接加一条,不检查 Redis 里残留的旧链 → 多条链并发轮询同一渠道,争抢 `syncBuf`。
|
||||
4. **无外层超时**。agent 卡死会无限期占住该 shareId 的轮询位。
|
||||
|
||||
## 关于 stalled 误判说明
|
||||
|
||||
BullMQ 的 worker 靠 **周期性续租 lock** 保活:
|
||||
|
||||
```
|
||||
Worker 拿到 job → Redis 给 job 打一把锁 (lockDuration 有效期)
|
||||
Worker 每 lockDuration/2 续一次锁
|
||||
另一个 Worker 每 stalledInterval 扫一次:锁过期的 job 视为 stalled → 重新入队
|
||||
```
|
||||
|
||||
- **续锁依赖 Node 事件循环**。只要 job 里是正常 `await`(fetch / LLM / sendMessage),无论跑多久都不会 stalled。
|
||||
- **什么时候真 stalled**:worker 进程 kill -9、OOM、CPU 密集同步代码阻塞事件循环 >lockDuration、Redis 断连续锁失败。
|
||||
|
||||
我们的应对:
|
||||
|
||||
| 机制 | 作用 |
|
||||
|---|---|
|
||||
| `REPLY_LOCK_MS = 30min` + `stalledInterval = 60s` | 抗住 GC/网络抖动、长回复,理论上给足余量 |
|
||||
| 幂等 `replyJobId = wechat-reply:{shareId}:{lastMsgId}` | 拦住队列层的重复入队 |
|
||||
| `outlinkInvokeChat` 内部按 `messageId` 幂等(由被调用方保证) | 真发生 stalled retry / attempt 重试时,保证不重复回复 |
|
||||
|
||||
## 目标
|
||||
|
||||
1. 消除 10+ 分钟消息延迟:拉取与回复解耦,回复慢不阻塞摄入
|
||||
2. 消除重复回复:续链幂等、stalled retry 不产生副作用
|
||||
3. 消除僵尸链:重启、重复扫码不产生并发轮询
|
||||
|
||||
## 改造后架构
|
||||
|
||||
```
|
||||
Poll Queue (wechatPoll) concurrency=20, lockDuration=60s
|
||||
getUpdates → 写 syncBuf → dispatch reply jobs → scheduleNextPoll
|
||||
※ 每 shareId 仅 1 条链(幂等 jobId)
|
||||
|
||||
Reply Queue (wechatReply) concurrency=30, lockDuration=30min
|
||||
invokeChat → sendMessage
|
||||
※ 每 (shareId, lastMsgId) 仅 1 个 job(幂等 jobId)
|
||||
```
|
||||
|
||||
## 改动文件清单
|
||||
|
||||
1. `packages/service/common/bullmq/index.ts` — 新增 `QueueNames.wechatReply`
|
||||
2. `packages/service/support/outLink/wechat/type.ts` — 新增 `WechatReplyJobData`
|
||||
3. `packages/service/support/outLink/wechat/messageParser.ts` — `msgIds[]` → `lastMsgId`
|
||||
4. `packages/service/support/outLink/wechat/mq.ts` — 拆分 poll / reply worker
|
||||
|
||||
---
|
||||
|
||||
## 一、`packages/service/common/bullmq/index.ts`
|
||||
|
||||
```ts
|
||||
export enum QueueNames {
|
||||
// ...existing
|
||||
wechatPoll = 'wechatPoll',
|
||||
wechatReply = 'wechatReply' // 新增
|
||||
}
|
||||
```
|
||||
|
||||
## 二、`packages/service/support/outLink/wechat/type.ts`
|
||||
|
||||
```ts
|
||||
export type WechatPollJobData = {
|
||||
shareId: string;
|
||||
};
|
||||
|
||||
// 新增
|
||||
export type WechatReplyJobData = {
|
||||
shareId: string;
|
||||
userId: string;
|
||||
text: string;
|
||||
contextToken: string;
|
||||
lastMsgId: string;
|
||||
};
|
||||
```
|
||||
|
||||
## 三、`packages/service/support/outLink/wechat/messageParser.ts`
|
||||
|
||||
```ts
|
||||
import type { WeixinMessage } from './ilinkClient';
|
||||
|
||||
const MSG_TYPE_USER = 1;
|
||||
const MSG_ITEM_TEXT = 1;
|
||||
const MSG_ITEM_VOICE = 3;
|
||||
|
||||
export type ParsedMessageGroup = {
|
||||
userId: string;
|
||||
text: string;
|
||||
contextToken: string;
|
||||
lastMsgId: string;
|
||||
};
|
||||
|
||||
export function extractTextFromItem(item: NonNullable<WeixinMessage['item_list']>[number]): string {
|
||||
if (item.type === MSG_ITEM_TEXT && item.text_item?.text) {
|
||||
const text = item.text_item.text;
|
||||
if (item.ref_msg?.title) {
|
||||
return `[引用: ${item.ref_msg.title}]\n${text}`;
|
||||
}
|
||||
return text;
|
||||
}
|
||||
if (item.type === MSG_ITEM_VOICE && item.voice_item?.text) {
|
||||
return item.voice_item.text;
|
||||
}
|
||||
return '';
|
||||
}
|
||||
|
||||
export function groupMessagesByUser(msgs: WeixinMessage[]): ParsedMessageGroup[] {
|
||||
const groups = new Map<string, ParsedMessageGroup>();
|
||||
|
||||
for (const msg of msgs) {
|
||||
if (msg.message_type !== MSG_TYPE_USER) continue;
|
||||
|
||||
let text = '';
|
||||
for (const item of msg.item_list ?? []) {
|
||||
const t = extractTextFromItem(item);
|
||||
if (t) {
|
||||
text = t;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if (!text) continue;
|
||||
|
||||
const userId = msg.from_user_id ?? 'unknown';
|
||||
const existing = groups.get(userId);
|
||||
|
||||
if (existing) {
|
||||
existing.text += '\n' + text;
|
||||
existing.lastMsgId = msg.msgid;
|
||||
if (msg.context_token) {
|
||||
existing.contextToken = msg.context_token;
|
||||
}
|
||||
} else {
|
||||
groups.set(userId, {
|
||||
userId,
|
||||
text,
|
||||
contextToken: msg.context_token ?? '',
|
||||
lastMsgId: msg.msgid
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
return Array.from(groups.values());
|
||||
}
|
||||
```
|
||||
|
||||
## 四、`packages/service/support/outLink/wechat/mq.ts`
|
||||
|
||||
```ts
|
||||
import { getWorker, getQueue, QueueNames, type Job } from '../../../common/bullmq';
|
||||
import { getLogger, LogCategories } from '../../../common/logger';
|
||||
import { ILinkClient } from './ilinkClient';
|
||||
import type { WechatPollJobData, WechatReplyJobData } from './type';
|
||||
import type { OutLinkSchemaType, WechatAppType } from '@fastgpt/global/support/outLink/type';
|
||||
import { MongoOutLink } from '../../../support/outLink/schema';
|
||||
import { outlinkInvokeChat } from '../../../support/outLink/runtime/utils';
|
||||
import { setRedisCache, getRedisCache } from '../../../common/redis/cache';
|
||||
import { groupMessagesByUser } from './messageParser';
|
||||
import { getErrText } from '@fastgpt/global/common/error/utils';
|
||||
|
||||
const logger = getLogger(LogCategories.MODULE.OUTLINK.WECHAT);
|
||||
|
||||
const POLL_JOB_NAME = 'wechatPublishPoll';
|
||||
const REPLY_JOB_NAME = 'wechatPublishReply';
|
||||
|
||||
const MAX_CONSECUTIVE_FAILURES = 5;
|
||||
const FAILURE_BACKOFF_MS = 10_000;
|
||||
const POLL_LOCK_MS = 60_000;
|
||||
const REPLY_LOCK_MS = 30 * 60_000;
|
||||
const REPLY_DEDUP_TTL = 24 * 60 * 60;
|
||||
|
||||
/* ============ 幂等键 ============ */
|
||||
|
||||
const pollJobId = (shareId: string) => `wechat-poll:${shareId}`;
|
||||
const replyJobId = (shareId: string, lastMsgId: string) =>
|
||||
`wechat-reply:${shareId}:${lastMsgId}`;
|
||||
const replyDedupKey = (shareId: string, lastMsgId: string) =>
|
||||
`wechat:reply:done:${shareId}:${lastMsgId}`;
|
||||
const failKey = (shareId: string) => `wechat:publish:failures:${shareId}`;
|
||||
|
||||
/* ============ Poll Worker ============ */
|
||||
|
||||
async function processWechatPollJob(job: Job<WechatPollJobData>): Promise<void> {
|
||||
const { shareId } = job.data;
|
||||
|
||||
const outLink = (await MongoOutLink.findOne({ shareId }).lean()) as unknown as
|
||||
| OutLinkSchemaType<WechatAppType>
|
||||
| null;
|
||||
|
||||
if (!outLink || !outLink.app) {
|
||||
logger.warn('OutLink not found, stop polling', { shareId });
|
||||
return;
|
||||
}
|
||||
|
||||
const app = outLink.app;
|
||||
if (app.status !== 'online') {
|
||||
logger.info('Channel not online, stop polling', { shareId, status: app.status });
|
||||
return;
|
||||
}
|
||||
if (!app.token) {
|
||||
logger.warn('No token, stop polling', { shareId });
|
||||
return;
|
||||
}
|
||||
|
||||
const client = new ILinkClient(app.baseUrl, app.token);
|
||||
|
||||
try {
|
||||
const resp = await client.getUpdates(app.syncBuf || '');
|
||||
|
||||
const isError =
|
||||
(resp.ret !== undefined && resp.ret !== 0) ||
|
||||
(resp.errcode !== undefined && resp.errcode !== 0);
|
||||
|
||||
if (isError) {
|
||||
logger.error('getUpdates API error', {
|
||||
shareId,
|
||||
ret: resp.ret,
|
||||
errcode: resp.errcode,
|
||||
errmsg: resp.errmsg
|
||||
});
|
||||
|
||||
const failures = Number((await getRedisCache(failKey(shareId))) ?? '0') + 1;
|
||||
await setRedisCache(failKey(shareId), String(failures), 300);
|
||||
|
||||
if (failures >= MAX_CONSECUTIVE_FAILURES) {
|
||||
await MongoOutLink.updateOne(
|
||||
{ shareId },
|
||||
{ $set: { 'app.status': 'error', 'app.lastError': resp.errmsg || 'Too many failures' } }
|
||||
);
|
||||
logger.error('Too many failures, stop polling', { shareId, failures });
|
||||
return;
|
||||
}
|
||||
|
||||
await scheduleNextPoll(shareId, FAILURE_BACKOFF_MS);
|
||||
return;
|
||||
}
|
||||
|
||||
await setRedisCache(failKey(shareId), '0', 300);
|
||||
|
||||
// 1) 先分发回复任务(失败则 syncBuf 不推进,下次 poll 重拉;靠幂等键去重)
|
||||
if (resp.msgs && resp.msgs.length > 0) {
|
||||
const groups = groupMessagesByUser(resp.msgs);
|
||||
logger.debug('Dispatch reply jobs', {
|
||||
shareId,
|
||||
totalMsgs: resp.msgs.length,
|
||||
userGroups: groups.length
|
||||
});
|
||||
|
||||
const replyQueue = getQueue<WechatReplyJobData>(QueueNames.wechatReply);
|
||||
await Promise.all(
|
||||
groups.map((g) =>
|
||||
replyQueue.add(
|
||||
REPLY_JOB_NAME,
|
||||
{
|
||||
shareId,
|
||||
userId: g.userId,
|
||||
text: g.text,
|
||||
contextToken: g.contextToken,
|
||||
lastMsgId: g.lastMsgId
|
||||
},
|
||||
{
|
||||
jobId: replyJobId(shareId, g.lastMsgId),
|
||||
attempts: 2,
|
||||
backoff: { type: 'fixed', delay: 2000 }
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// 2) 全部入队成功后再推进 syncBuf
|
||||
if (resp.get_updates_buf) {
|
||||
await MongoOutLink.updateOne(
|
||||
{ shareId },
|
||||
{ $set: { 'app.syncBuf': resp.get_updates_buf } }
|
||||
);
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Poll job error', { shareId, error: String(error) });
|
||||
}
|
||||
|
||||
// 3) 立即续链
|
||||
await scheduleNextPoll(shareId);
|
||||
}
|
||||
|
||||
/* ============ Reply Worker ============ */
|
||||
|
||||
async function processWechatReplyJob(job: Job<WechatReplyJobData>): Promise<void> {
|
||||
const { shareId, userId, text, contextToken, lastMsgId } = job.data;
|
||||
|
||||
const dedupKey = replyDedupKey(shareId, lastMsgId);
|
||||
if (await getRedisCache(dedupKey)) {
|
||||
logger.info('Reply already processed, skip', { shareId, lastMsgId });
|
||||
return;
|
||||
}
|
||||
|
||||
const outLink = (await MongoOutLink.findOne({ shareId }).lean()) as unknown as
|
||||
| OutLinkSchemaType<WechatAppType>
|
||||
| null;
|
||||
if (!outLink || !outLink.app || outLink.app.status !== 'online' || !outLink.app.token) {
|
||||
logger.warn('Channel not available, drop reply', { shareId, lastMsgId });
|
||||
return;
|
||||
}
|
||||
|
||||
const app = outLink.app;
|
||||
const client = new ILinkClient(app.baseUrl, app.token);
|
||||
const chatId = `wechat_${shareId}_${userId}`;
|
||||
|
||||
try {
|
||||
await outlinkInvokeChat({
|
||||
outLinkConfig: outLink,
|
||||
chatId,
|
||||
query: [{ text: { content: text } }],
|
||||
messageId: lastMsgId,
|
||||
chatUserId: userId,
|
||||
onReply: async (replyContent: string) => {
|
||||
await client.sendMessage({
|
||||
to_user_id: userId,
|
||||
text: replyContent,
|
||||
context_token: contextToken
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
await setRedisCache(dedupKey, '1', REPLY_DEDUP_TTL);
|
||||
} catch (error) {
|
||||
logger.error('Reply job failed', {
|
||||
shareId,
|
||||
userId,
|
||||
lastMsgId,
|
||||
attempt: job.attemptsMade + 1,
|
||||
error: String(error)
|
||||
});
|
||||
|
||||
// 仅最后一次 attempt 失败才发 fallback,避免重试期间重复发
|
||||
if (job.attemptsMade + 1 >= (job.opts.attempts ?? 1)) {
|
||||
try {
|
||||
const errorText = outLink.defaultResponse || `Run agent error: ${getErrText(error)}`;
|
||||
await client.sendMessage({
|
||||
to_user_id: userId,
|
||||
text: errorText,
|
||||
context_token: contextToken
|
||||
});
|
||||
await setRedisCache(dedupKey, '1', REPLY_DEDUP_TTL);
|
||||
} catch {
|
||||
// 忽略
|
||||
}
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/* ============ 续链 ============ */
|
||||
|
||||
async function scheduleNextPoll(shareId: string, delayMs?: number): Promise<void> {
|
||||
const queue = getQueue<WechatPollJobData>(QueueNames.wechatPoll);
|
||||
await queue.add(
|
||||
POLL_JOB_NAME,
|
||||
{ shareId },
|
||||
{
|
||||
jobId: pollJobId(shareId),
|
||||
...(delayMs ? { delay: delayMs } : {}),
|
||||
removeOnComplete: true,
|
||||
removeOnFail: { count: 50 }
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/* ============ 对外接口 ============ */
|
||||
|
||||
export const initWechatPollWorker = async () => {
|
||||
getWorker<WechatPollJobData>(QueueNames.wechatPoll, processWechatPollJob, {
|
||||
concurrency: 20,
|
||||
lockDuration: POLL_LOCK_MS,
|
||||
stalledInterval: 30_000,
|
||||
removeOnComplete: { count: 0 },
|
||||
removeOnFail: { count: 100, age: 7 * 24 * 60 * 60 }
|
||||
});
|
||||
|
||||
getWorker<WechatReplyJobData>(QueueNames.wechatReply, processWechatReplyJob, {
|
||||
concurrency: 30,
|
||||
lockDuration: REPLY_LOCK_MS,
|
||||
stalledInterval: 60_000,
|
||||
removeOnComplete: { count: 0 },
|
||||
removeOnFail: { count: 500, age: 7 * 24 * 60 * 60 }
|
||||
});
|
||||
|
||||
await resumeAllWechatPolling();
|
||||
logger.info('Wechat poll/reply workers initialized');
|
||||
};
|
||||
|
||||
async function resumeAllWechatPolling(): Promise<void> {
|
||||
const onlineChannels = await MongoOutLink.find(
|
||||
{ type: 'wechat', 'app.status': 'online', 'app.token': { $exists: true, $ne: '' } },
|
||||
{ shareId: 1 }
|
||||
).lean();
|
||||
|
||||
logger.info('Resuming wechat polling', { count: onlineChannels.length });
|
||||
for (const ch of onlineChannels) {
|
||||
await scheduleNextPoll(ch.shareId);
|
||||
}
|
||||
}
|
||||
|
||||
export const startWechatPolling = async (shareId: string): Promise<void> => {
|
||||
await scheduleNextPoll(shareId);
|
||||
logger.info('Wechat polling started', { shareId });
|
||||
};
|
||||
|
||||
export const stopWechatPolling = async (shareId: string): Promise<void> => {
|
||||
await MongoOutLink.updateOne(
|
||||
{ shareId },
|
||||
{ $set: { 'app.status': 'offline', 'app.token': '' } }
|
||||
);
|
||||
|
||||
const queue = getQueue<WechatPollJobData>(QueueNames.wechatPoll);
|
||||
const existing = await queue.getJob(pollJobId(shareId));
|
||||
if (existing) {
|
||||
try {
|
||||
await existing.remove();
|
||||
} catch {
|
||||
// 忽略
|
||||
}
|
||||
}
|
||||
|
||||
logger.info('Wechat polling stopped', { shareId });
|
||||
};
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## 关键设计要点
|
||||
|
||||
| 问题 | 解决手段 | 代码位置 |
|
||||
|---|---|---|
|
||||
| 回复阻塞拉取 | 拆 `wechatReply` 队列,poll dispatch 后立即续链 | `mq.ts` processWechatPollJob |
|
||||
| 续链重复 | `pollJobId = wechat-poll:{shareId}` 幂等 | `scheduleNextPoll` |
|
||||
| 回复重复(入队重复) | `replyJobId = wechat-reply:{shareId}:{lastMsgId}` 幂等 | `processWechatReplyJob` |
|
||||
| 回复重复(stalled retry / attempt 重试) | 依赖 `outlinkInvokeChat` 按 `messageId` 自身幂等 | — |
|
||||
| enqueue 失败丢消息 | 先 dispatch reply 成功后才推进 `syncBuf`;at-least-once + 幂等键去重 | poll worker 1) → 2) 顺序 |
|
||||
| 重试时错误提示被重复发 | 仅最后一次 attempt 失败才发 defaultResponse | `processWechatReplyJob` catch |
|
||||
| 长回复被 stalled 误判 | `REPLY_LOCK_MS = 30min` 足够长 + `outlinkInvokeChat` 幂等兜底 | worker 配置 |
|
||||
| `stopWechatPolling` 残留链 | 主动 `queue.getJob().remove()` | `stopWechatPolling` |
|
||||
| 服务重启多实例 | `resumeAllWechatPolling` 用幂等 jobId,BullMQ 自然去重 | `resumeAllWechatPolling` |
|
||||
|
||||
---
|
||||
|
||||
## 消息合并语义说明
|
||||
|
||||
- **同一 poll 周期内同一用户多条消息**:`groupMessagesByUser` 用 `Map<userId, Group>` 聚合,`text` 用 `\n` 拼接,`contextToken` 取最后一条,`lastMsgId` 取最后一条。**1 次 `invokeChat`,1 次合并回复**。
|
||||
- **跨 poll 周期**:2 个独立 reply job,2 次回复,但共享 `chatId = wechat_{shareId}_{userId}` → 上下文连续。
|
||||
- **多个用户**:每个用户 1 个 reply job,并行处理。
|
||||
|
||||
---
|
||||
|
||||
## 落地 TODO
|
||||
|
||||
- [ ] 1. `bullmq/index.ts` 加 `QueueNames.wechatReply`
|
||||
- [ ] 2. `wechat/type.ts` 加 `WechatReplyJobData`
|
||||
- [ ] 3. `wechat/messageParser.ts` 把 `msgIds[]` 改 `lastMsgId`
|
||||
- [ ] 4. `wechat/mq.ts` 按上文全量替换
|
||||
- [ ] 5. `pnpm lint` 过
|
||||
- [ ] 6. 本地联调
|
||||
- 扫码登录,观察 poll job p99 <40s
|
||||
- 模拟 agent 慢回复(sleep 5 分钟),验证期间新消息在 35s 内被拉取
|
||||
- kill worker 进程,验证重启后无重复回复
|
||||
- 同一用户 10s 内连发 3 条,验证合并成 1 次回复(同 poll 周期内)
|
||||
- [ ] 7. 灰度发布,监控
|
||||
- `wechatPoll` waiting/active/failed
|
||||
- `wechatReply` waiting/active/failed
|
||||
- `wechat:reply:done:*` key 命中率
|
||||
- Mongo `app.syncBuf` 写入频率
|
||||
|
||||
---
|
||||
|
||||
## 风险 & 回滚
|
||||
|
||||
- **风险 1**:reply worker 堆积 → 加监控告警,必要时提高 `concurrency`
|
||||
- **风险 2**:`REPLY_DEDUP_TTL=24h` 内如果 `lastMsgId` 被 ilink 服务端复用,会漏回复。需要确认 ilink 的 msgid 是否全局唯一 —— 从现网抓取样本验证
|
||||
- **回滚**:保留旧 `mq.ts` 为 `mq.legacy.ts`,通过环境变量 `USE_LEGACY_WECHAT_MQ=1` 切换
|
||||
@@ -0,0 +1,18 @@
|
||||
---
|
||||
title: 'V4.14.14'
|
||||
description: 'FastGPT V4.14.14 更新说明'
|
||||
---
|
||||
|
||||
## 升级指南
|
||||
|
||||
### 1. 更新镜像 tag
|
||||
|
||||
- 更新 fastgpt-app(fastgpt 主服务) 镜像 tag: v4.14.14
|
||||
|
||||
## 🐛 修复
|
||||
|
||||
|
||||
## ⚙️ 优化
|
||||
|
||||
1. 个人微信发布渠道,优化轮询策略(拉取与回复解耦),避免数据量超大时出现阻塞。
|
||||
2. 新增环境变量 `WECHAT_CHANNEL_CONCURRENCY`(默认 1000)用于控制微信渠道 poll worker 并发数,建议 ≥ online channel 峰值。
|
||||
@@ -117,6 +117,7 @@ description: FastGPT 文档目录
|
||||
- [/docs/self-host/upgrading/4-14/41411](/docs/self-host/upgrading/4-14/41411)
|
||||
- [/docs/self-host/upgrading/4-14/41412](/docs/self-host/upgrading/4-14/41412)
|
||||
- [/docs/self-host/upgrading/4-14/41413](/docs/self-host/upgrading/4-14/41413)
|
||||
- [/docs/self-host/upgrading/4-14/41414](/docs/self-host/upgrading/4-14/41414)
|
||||
- [/docs/self-host/upgrading/4-14/4142](/docs/self-host/upgrading/4-14/4142)
|
||||
- [/docs/self-host/upgrading/4-14/4143](/docs/self-host/upgrading/4-14/4143)
|
||||
- [/docs/self-host/upgrading/4-14/4144](/docs/self-host/upgrading/4-14/4144)
|
||||
|
||||
@@ -230,6 +230,7 @@
|
||||
"document/content/docs/self-host/upgrading/4-14/41412.mdx": "2026-04-21T23:04:26+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/41413.en.mdx": "2026-04-21T23:04:26+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/41413.mdx": "2026-04-21T23:04:26+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/41414.mdx": "2026-04-22T16:09:29+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/4142.en.mdx": "2026-03-03T17:39:47+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/4142.mdx": "2026-03-03T17:39:47+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/4143.en.mdx": "2026-03-03T17:39:47+08:00",
|
||||
@@ -250,6 +251,7 @@
|
||||
"document/content/docs/self-host/upgrading/4-14/41481.mdx": "2026-03-09T17:39:53+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/4149.en.mdx": "2026-03-23T12:17:04+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-14/4149.mdx": "2026-04-07T21:01:52+08:00",
|
||||
"document/content/docs/self-host/upgrading/4-15/4150.mdx": "2026-04-22T14:36:14+08:00",
|
||||
"document/content/docs/self-host/upgrading/outdated/40.en.mdx": "2026-03-03T17:39:47+08:00",
|
||||
"document/content/docs/self-host/upgrading/outdated/40.mdx": "2026-03-03T17:39:47+08:00",
|
||||
"document/content/docs/self-host/upgrading/outdated/41.en.mdx": "2026-03-03T17:39:47+08:00",
|
||||
@@ -391,7 +393,7 @@
|
||||
"document/content/docs/self-host/upgrading/upgrade-intruction.en.mdx": "2026-03-03T17:39:47+08:00",
|
||||
"document/content/docs/self-host/upgrading/upgrade-intruction.mdx": "2026-04-20T13:51:34+08:00",
|
||||
"document/content/docs/toc.en.mdx": "2026-04-21T23:04:26+08:00",
|
||||
"document/content/docs/toc.mdx": "2026-04-21T23:04:26+08:00",
|
||||
"document/content/docs/toc.mdx": "2026-04-22T16:09:29+08:00",
|
||||
"document/content/docs/use-cases/app-cases/dalle3.en.mdx": "2026-02-26T22:14:30+08:00",
|
||||
"document/content/docs/use-cases/app-cases/dalle3.mdx": "2025-07-23T21:35:03+08:00",
|
||||
"document/content/docs/use-cases/app-cases/english_essay_correction_bot.en.mdx": "2026-02-26T22:14:30+08:00",
|
||||
|
||||
@@ -36,6 +36,7 @@ export enum QueueNames {
|
||||
|
||||
// Publish
|
||||
wechatPoll = 'wechatPoll',
|
||||
wechatReply = 'wechatReply',
|
||||
|
||||
/** @deprecated */
|
||||
websiteSync = 'websiteSync'
|
||||
|
||||
@@ -113,6 +113,12 @@ export const env = createEnv({
|
||||
/** Redis 内存水位检测缓存时长(毫秒),避免每个流请求都调用 INFO MEMORY */
|
||||
STREAM_RESUME_REDIS_MEMORY_CHECK_INTERVAL_MS: IntSchema.positive().default(5000),
|
||||
|
||||
// ===== Wechat outLink =====
|
||||
/** 微信渠道 poll worker 并发数,需 ≥ online shareId 峰值,否则消息延迟会线性恶化 */
|
||||
WECHAT_CHANNEL_CONCURRENCY: NumSchema.int().positive().default(1000).meta({
|
||||
description: '微信渠道 poll worker 并发数'
|
||||
}),
|
||||
|
||||
// Beta features
|
||||
// Whether the Skill feature is enabled (frontend entries + backend runtime)
|
||||
SHOW_SKILL: BoolSchema.default(false),
|
||||
|
||||
@@ -8,7 +8,7 @@ export type ParsedMessageGroup = {
|
||||
userId: string;
|
||||
text: string;
|
||||
contextToken: string;
|
||||
msgIds: string[];
|
||||
lastMsgId: string;
|
||||
};
|
||||
|
||||
export function extractTextFromItem(item: NonNullable<WeixinMessage['item_list']>[number]): string {
|
||||
@@ -42,11 +42,11 @@ export function groupMessagesByUser(msgs: WeixinMessage[]): ParsedMessageGroup[]
|
||||
if (!text) continue;
|
||||
|
||||
const userId = msg.from_user_id ?? 'unknown';
|
||||
|
||||
const existing = groups.get(userId);
|
||||
|
||||
if (existing) {
|
||||
existing.text += '\n' + text;
|
||||
existing.msgIds.push(msg.msgid);
|
||||
existing.lastMsgId = msg.msgid;
|
||||
if (msg.context_token) {
|
||||
existing.contextToken = msg.context_token;
|
||||
}
|
||||
@@ -55,7 +55,7 @@ export function groupMessagesByUser(msgs: WeixinMessage[]): ParsedMessageGroup[]
|
||||
userId,
|
||||
text,
|
||||
contextToken: msg.context_token ?? '',
|
||||
msgIds: [msg.msgid]
|
||||
lastMsgId: msg.msgid
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,228 +1,309 @@
|
||||
import { getWorker, getQueue, QueueNames, type Job } from '../../../common/bullmq';
|
||||
import { getLogger, LogCategories } from '../../../common/logger';
|
||||
import { ILinkClient } from './ilinkClient';
|
||||
import type { WechatPollJobData } from './type';
|
||||
import type { WechatPollJobData, WechatReplyJobData } from './type';
|
||||
import type { OutLinkSchemaType, WechatAppType } from '@fastgpt/global/support/outLink/type';
|
||||
import { MongoOutLink } from '../../../support/outLink/schema';
|
||||
import { outlinkInvokeChat } from '../../../support/outLink/runtime/utils';
|
||||
import { setRedisCache, getRedisCache } from '../../../common/redis/cache';
|
||||
import { groupMessagesByUser, type ParsedMessageGroup } from './messageParser';
|
||||
import { getErrText } from '@fastgpt/global/common/error/utils';
|
||||
import { delRedisCache, getRedisCache, setRedisCache } from '../../../common/redis/cache';
|
||||
import { groupMessagesByUser } from './messageParser';
|
||||
import { env } from '../../../env';
|
||||
import { batchRun, retryFn } from '@fastgpt/global/common/system/utils';
|
||||
|
||||
const logger = getLogger(LogCategories.MODULE.OUTLINK.WECHAT);
|
||||
|
||||
const queueName = 'wechatPublishPoll';
|
||||
const POLL_JOB_NAME = 'wechatPublishPoll';
|
||||
const REPLY_JOB_NAME = 'wechatPublishReply';
|
||||
|
||||
const MAX_CONSECUTIVE_FAILURES = 5;
|
||||
const FAILURE_BACKOFF_MS = 10_000;
|
||||
const POLL_LOCK_MS = 120_000;
|
||||
const REPLY_LOCK_MS = 30 * 60_000;
|
||||
// Poll processor 硬超时:防止 worker 活着但 processor hang 导致确定 jobId 永远阻塞
|
||||
// 应 > LONG_POLL_TIMEOUT_MS(35s) + Mongo/Redis 操作余量
|
||||
const POLL_HARD_TIMEOUT_MS = 120_000;
|
||||
|
||||
/* ============ Worker 处理逻辑 ============ */
|
||||
/* ============ 幂等键 ============ */
|
||||
|
||||
// 确定 jobId → BullMQ 自动保证同 shareId 同一时刻只存在一个 poll job(singleton)
|
||||
// 续链在 worker 'completed' / 'failed' 事件里发起,此时 job 已从 Redis 删除,add 不会冲突
|
||||
const pollJobId = (shareId: string) => `wechat-poll:${shareId}`;
|
||||
const replyJobId = (shareId: string, lastMsgId: string) => `wechat-reply:${shareId}:${lastMsgId}`;
|
||||
const failKey = (shareId: string) => `wechat:publish:failures:${shareId}`;
|
||||
|
||||
/* ============ Poll Worker 处理器 ============ */
|
||||
// 设计约定:
|
||||
// - 正常完成 → return → 'completed' 事件 → 续链(立即)
|
||||
// - 任何异常/停止条件 → throw → 'failed' 事件 → shouldContinuePolling 决定是否续链
|
||||
// - 外层 Promise.race 兜底:processor 最多 POLL_HARD_TIMEOUT_MS 就必须终止,
|
||||
// 防止 worker 活着但 processor hang 导致确定 jobId 永远阻塞
|
||||
async function processWechatPollJob(job: Job<WechatPollJobData>): Promise<void> {
|
||||
const { shareId } = job.data;
|
||||
|
||||
// 1. 获取渠道配置
|
||||
const outLink = (await MongoOutLink.findOne({
|
||||
shareId
|
||||
}).lean()) as unknown as OutLinkSchemaType<WechatAppType>;
|
||||
if (!outLink || !outLink.app) {
|
||||
logger.warn('OutLink not found, stop polling', { shareId });
|
||||
return;
|
||||
}
|
||||
|
||||
const app = outLink.app;
|
||||
|
||||
// 2. 检查状态
|
||||
if (app.status !== 'online') {
|
||||
logger.info('Channel not online, stop polling', { shareId, status: app.status });
|
||||
return;
|
||||
}
|
||||
|
||||
if (!app.token) {
|
||||
logger.warn('No token, stop polling', { shareId });
|
||||
return;
|
||||
}
|
||||
|
||||
const client = new ILinkClient(app.baseUrl, app.token);
|
||||
const failKey = `publish:wechat:failures:${shareId}`;
|
||||
|
||||
let timer: NodeJS.Timeout | undefined;
|
||||
const timeout = new Promise<never>((_, reject) => {
|
||||
timer = setTimeout(
|
||||
() => reject(new Error(`Poll job hard timeout after ${POLL_HARD_TIMEOUT_MS}ms`)),
|
||||
POLL_HARD_TIMEOUT_MS
|
||||
);
|
||||
});
|
||||
try {
|
||||
// 3. 长轮询拉取消息
|
||||
const resp = await client.getUpdates(app.syncBuf || '');
|
||||
|
||||
// 检查 API 错误
|
||||
const isError =
|
||||
(resp.ret !== undefined && resp.ret !== 0) ||
|
||||
(resp.errcode !== undefined && resp.errcode !== 0);
|
||||
|
||||
if (isError) {
|
||||
logger.error('getUpdates API error', {
|
||||
shareId,
|
||||
ret: resp.ret,
|
||||
errcode: resp.errcode,
|
||||
errmsg: resp.errmsg
|
||||
});
|
||||
|
||||
const failures = Number((await getRedisCache(failKey)) ?? '0') + 1;
|
||||
await setRedisCache(failKey, String(failures), 300);
|
||||
|
||||
if (failures >= MAX_CONSECUTIVE_FAILURES) {
|
||||
await MongoOutLink.updateOne(
|
||||
{ shareId },
|
||||
{ $set: { 'app.status': 'error', 'app.lastError': resp.errmsg || 'Too many failures' } }
|
||||
);
|
||||
logger.error('Too many failures, stop polling', { shareId, failures });
|
||||
return;
|
||||
}
|
||||
|
||||
// 延迟续链
|
||||
await scheduleNextPoll(shareId, FAILURE_BACKOFF_MS);
|
||||
return;
|
||||
}
|
||||
|
||||
// 清除失败计数
|
||||
await setRedisCache(failKey, '0', 300);
|
||||
|
||||
// 4. 处理消息
|
||||
if (resp.msgs && resp.msgs.length > 0) {
|
||||
const groups = groupMessagesByUser(resp.msgs);
|
||||
|
||||
logger.debug('Processing messages', {
|
||||
shareId,
|
||||
totalMsgs: resp.msgs.length,
|
||||
userGroups: groups.length
|
||||
});
|
||||
|
||||
// 并发处理各用户分组
|
||||
await Promise.allSettled(groups.map((group) => processUserGroup(outLink, group)));
|
||||
}
|
||||
|
||||
// 5. 更新 buf
|
||||
if (resp.get_updates_buf) {
|
||||
await MongoOutLink.updateOne({ shareId }, { $set: { 'app.syncBuf': resp.get_updates_buf } });
|
||||
}
|
||||
} catch (error) {
|
||||
logger.error('Poll job error', { shareId, error: String(error) });
|
||||
await Promise.race([pollImpl(job), timeout]);
|
||||
} finally {
|
||||
if (timer) clearTimeout(timer);
|
||||
}
|
||||
|
||||
// 6. 续链
|
||||
await scheduleNextPoll(shareId);
|
||||
}
|
||||
|
||||
/* ============ 处理单个用户分组 ============ */
|
||||
async function pollImpl(job: Job<WechatPollJobData>): Promise<void> {
|
||||
const { shareId } = job.data;
|
||||
|
||||
const outLink = (await MongoOutLink.findOne({
|
||||
shareId
|
||||
}).lean()) as unknown as OutLinkSchemaType<WechatAppType> | null;
|
||||
|
||||
if (!outLink || !outLink.app) {
|
||||
logger.warn('OutLink not found, stop polling', { shareId });
|
||||
throw new Error('OutLink not found');
|
||||
}
|
||||
|
||||
async function processUserGroup(
|
||||
outLink: OutLinkSchemaType<WechatAppType>,
|
||||
group: ParsedMessageGroup
|
||||
): Promise<void> {
|
||||
const app = outLink.app;
|
||||
const chatId = `wechat_${outLink.shareId}_${group.userId}`;
|
||||
if (app.status !== 'online') {
|
||||
logger.info('Channel not online, stop polling', { shareId, status: app.status });
|
||||
throw new Error('Channel not online');
|
||||
}
|
||||
if (!app.token) {
|
||||
logger.warn('No token, stop polling', { shareId });
|
||||
throw new Error('No token');
|
||||
}
|
||||
|
||||
const client = new ILinkClient(app.baseUrl, app.token);
|
||||
const resp = await client.getUpdates(app.syncBuf || '');
|
||||
|
||||
const isError =
|
||||
(resp.ret !== undefined && resp.ret !== 0) ||
|
||||
(resp.errcode !== undefined && resp.errcode !== 0);
|
||||
|
||||
if (isError) {
|
||||
logger.error('getUpdates API error', {
|
||||
shareId,
|
||||
ret: resp.ret,
|
||||
errcode: resp.errcode,
|
||||
errmsg: resp.errmsg
|
||||
});
|
||||
|
||||
const failures = Number((await getRedisCache(failKey(shareId))) ?? '0') + 1;
|
||||
await setRedisCache(failKey(shareId), String(failures), 300);
|
||||
|
||||
if (failures >= MAX_CONSECUTIVE_FAILURES) {
|
||||
await MongoOutLink.updateOne(
|
||||
{ shareId },
|
||||
{ $set: { 'app.status': 'error', 'app.lastError': resp.errmsg || 'Too many failures' } }
|
||||
);
|
||||
logger.error('Too many failures, stop polling', { shareId, failures });
|
||||
await delRedisCache(failKey(shareId));
|
||||
}
|
||||
|
||||
// 抛错走 'failed' 事件 → 续链带退避
|
||||
throw new Error(`getUpdates API error: ret=${resp.ret} errcode=${resp.errcode}`);
|
||||
}
|
||||
|
||||
await setRedisCache(failKey(shareId), '0', 300);
|
||||
|
||||
// 1) 先分发回复任务(失败则 syncBuf 不推进,下次 poll 重拉;靠 replyJobId 幂等去重)
|
||||
if (resp.msgs && resp.msgs.length > 0) {
|
||||
const groups = groupMessagesByUser(resp.msgs);
|
||||
logger.debug('Dispatch reply jobs', {
|
||||
shareId,
|
||||
totalMsgs: resp.msgs.length,
|
||||
userGroups: groups.length
|
||||
});
|
||||
|
||||
const replyQueue = getQueue<WechatReplyJobData>(QueueNames.wechatReply);
|
||||
await Promise.all(
|
||||
groups.map((g) =>
|
||||
replyQueue.add(
|
||||
REPLY_JOB_NAME,
|
||||
{
|
||||
shareId,
|
||||
userId: g.userId,
|
||||
text: g.text,
|
||||
contextToken: g.contextToken,
|
||||
lastMsgId: g.lastMsgId
|
||||
},
|
||||
{
|
||||
jobId: replyJobId(shareId, g.lastMsgId),
|
||||
backoff: { type: 'fixed', delay: 2000 }
|
||||
}
|
||||
)
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// 2) 全部入队成功后再推进 syncBuf
|
||||
if (resp.get_updates_buf) {
|
||||
await MongoOutLink.updateOne({ shareId }, { $set: { 'app.syncBuf': resp.get_updates_buf } });
|
||||
}
|
||||
|
||||
// 3) 不在这里续链,交给 worker 'completed' 事件处理器
|
||||
}
|
||||
|
||||
/* ============ Reply Worker ============ */
|
||||
async function processWechatReplyJob(job: Job<WechatReplyJobData>): Promise<void> {
|
||||
const { shareId, userId, text, contextToken, lastMsgId } = job.data;
|
||||
|
||||
const outLink = (await MongoOutLink.findOne({
|
||||
shareId
|
||||
}).lean()) as unknown as OutLinkSchemaType<WechatAppType> | null;
|
||||
if (!outLink || !outLink.app || outLink.app.status !== 'online' || !outLink.app.token) {
|
||||
logger.warn('Channel not available, drop reply', { shareId, lastMsgId });
|
||||
return;
|
||||
}
|
||||
|
||||
const app = outLink.app;
|
||||
const client = new ILinkClient(app.baseUrl, app.token);
|
||||
const chatId = `wechat_${shareId}_${userId}`;
|
||||
|
||||
try {
|
||||
await outlinkInvokeChat({
|
||||
outLinkConfig: outLink,
|
||||
chatId,
|
||||
query: [{ text: { content: group.text } }],
|
||||
messageId: group.msgIds[group.msgIds.length - 1],
|
||||
chatUserId: group.userId,
|
||||
query: [{ text: { content: text } }],
|
||||
messageId: lastMsgId,
|
||||
chatUserId: userId,
|
||||
onReply: async (replyContent: string) => {
|
||||
await client.sendMessage({
|
||||
to_user_id: group.userId,
|
||||
to_user_id: userId,
|
||||
text: replyContent,
|
||||
context_token: group.contextToken
|
||||
context_token: contextToken
|
||||
});
|
||||
}
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Process user group failed', {
|
||||
shareId: outLink.shareId,
|
||||
userId: group.userId,
|
||||
logger.error('Reply job failed', {
|
||||
shareId,
|
||||
userId,
|
||||
lastMsgId,
|
||||
error: String(error)
|
||||
});
|
||||
|
||||
// 尝试发送错误提示
|
||||
try {
|
||||
const errorText = outLink.defaultResponse || `Run agent error: ${getErrText(error)}`;
|
||||
await client.sendMessage({
|
||||
to_user_id: group.userId,
|
||||
text: errorText,
|
||||
context_token: group.contextToken
|
||||
});
|
||||
} catch {
|
||||
// 忽略发送失败
|
||||
}
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/* ============ 续链调度 ============ */
|
||||
|
||||
// 在 worker 'completed' / 'failed' 事件里调用,此时 job hash 已从 Redis 删除
|
||||
// startWechatPolling / resumeAllWechatPolling 也调用本函数:
|
||||
// - 如果链正在运行(job 处于 active),add 会因 jobId 冲突被 BullMQ 静默忽略(幂等)
|
||||
// - 如果链已死(无 job),add 正常入队
|
||||
async function scheduleNextPoll(shareId: string, delayMs?: number): Promise<void> {
|
||||
const queue = getQueue<WechatPollJobData>(QueueNames.wechatPoll);
|
||||
|
||||
await queue.add(
|
||||
queueName,
|
||||
POLL_JOB_NAME,
|
||||
{ shareId },
|
||||
{
|
||||
jobId: `wechat-poll-${shareId}-${Date.now()}`,
|
||||
...(delayMs ? { delay: delayMs } : {})
|
||||
jobId: pollJobId(shareId),
|
||||
...(delayMs ? { delay: delayMs } : {}),
|
||||
removeOnComplete: true,
|
||||
removeOnFail: true
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 判断渠道是否仍应继续轮询。
|
||||
* 用于 'completed' 事件处理器 —— 渠道已被停用时不再续链。
|
||||
*/
|
||||
async function shouldContinuePolling(shareId: string): Promise<boolean> {
|
||||
const outLink = await MongoOutLink.findOne(
|
||||
{
|
||||
shareId,
|
||||
type: 'wechat',
|
||||
'app.status': 'online',
|
||||
'app.token': { $exists: true, $ne: '' }
|
||||
},
|
||||
{ _id: 1 }
|
||||
).lean();
|
||||
return Boolean(outLink);
|
||||
}
|
||||
|
||||
/* ============ 对外接口 ============ */
|
||||
|
||||
/**
|
||||
* 初始化微信轮询 Worker
|
||||
* 初始化微信轮询 / 回复 Worker
|
||||
*/
|
||||
export const initWechatPollWorker = async () => {
|
||||
/**
|
||||
* 服务启动时恢复所有 online 渠道的轮询
|
||||
*/
|
||||
const resumeAllWechatPolling = async (): Promise<void> => {
|
||||
const onlineChannels = await MongoOutLink.find(
|
||||
{
|
||||
type: 'wechat',
|
||||
'app.status': 'online',
|
||||
'app.token': { $exists: true, $ne: '' }
|
||||
},
|
||||
{ shareId: 1 }
|
||||
).lean();
|
||||
const pollWorker = getWorker<WechatPollJobData>(QueueNames.wechatPoll, processWechatPollJob, {
|
||||
// poll job 主要阻塞在 getUpdates 长轮询 I/O(~30s),不吃 CPU
|
||||
concurrency: env.WECHAT_CHANNEL_CONCURRENCY,
|
||||
lockDuration: POLL_LOCK_MS, // 120s 防止 job 被误判为 stalled
|
||||
stalledInterval: 30_000, // 30s 检查下是否活跃
|
||||
removeOnComplete: { count: 0 },
|
||||
removeOnFail: { count: 0 }
|
||||
});
|
||||
|
||||
logger.info('Resuming wechat polling', { count: onlineChannels.length });
|
||||
|
||||
for (const ch of onlineChannels) {
|
||||
await startWechatPolling(ch.shareId);
|
||||
// 成功完成:续链(立即)。事件内 add 因 job 已被移除,不会冲突
|
||||
pollWorker.on('completed', async (job) => {
|
||||
if (job.name !== POLL_JOB_NAME) return;
|
||||
const { shareId } = job.data as WechatPollJobData;
|
||||
try {
|
||||
await scheduleNextPoll(shareId);
|
||||
} catch (error) {
|
||||
logger.error('Schedule next poll (completed) failed', { shareId, error: String(error) });
|
||||
}
|
||||
};
|
||||
});
|
||||
|
||||
getWorker<WechatPollJobData>(QueueNames.wechatPoll, processWechatPollJob, {
|
||||
concurrency: 10,
|
||||
lockDuration: 120_000,
|
||||
// 失败:续链(带退避)。渠道仍 online 时尝试恢复
|
||||
pollWorker.on('failed', async (job) => {
|
||||
if (!job || job.name !== POLL_JOB_NAME) return;
|
||||
const { shareId } = job.data as WechatPollJobData;
|
||||
try {
|
||||
await retryFn(async () => {
|
||||
if (!(await shouldContinuePolling(shareId))) return;
|
||||
await scheduleNextPoll(shareId, FAILURE_BACKOFF_MS);
|
||||
});
|
||||
} catch (error) {
|
||||
logger.error('Schedule next poll (failed) failed', { shareId, error: String(error) });
|
||||
}
|
||||
});
|
||||
|
||||
getWorker<WechatReplyJobData>(QueueNames.wechatReply, processWechatReplyJob, {
|
||||
concurrency: env.WECHAT_CHANNEL_CONCURRENCY,
|
||||
lockDuration: REPLY_LOCK_MS,
|
||||
stalledInterval: 60_000,
|
||||
removeOnComplete: { count: 0 },
|
||||
removeOnFail: { count: 100, age: 7 * 24 * 60 * 60 }
|
||||
removeOnFail: { count: 500, age: 7 * 24 * 60 * 60 }
|
||||
});
|
||||
|
||||
await resumeAllWechatPolling();
|
||||
|
||||
logger.info('Wechat poll worker initialized');
|
||||
logger.info('Wechat poll/reply workers initialized');
|
||||
};
|
||||
|
||||
/**
|
||||
* 启动某个渠道的轮询(扫码登录成功后调用)
|
||||
* 服务启动时恢复所有 online 渠道的轮询
|
||||
*/
|
||||
async function resumeAllWechatPolling(): Promise<void> {
|
||||
const onlineChannels = await MongoOutLink.find(
|
||||
{
|
||||
type: 'wechat',
|
||||
'app.status': 'online',
|
||||
'app.token': { $exists: true, $ne: '' }
|
||||
},
|
||||
{ shareId: 1 }
|
||||
).lean();
|
||||
|
||||
logger.info('Resuming wechat polling', { count: onlineChannels.length });
|
||||
|
||||
await batchRun(
|
||||
onlineChannels,
|
||||
async (ch) => {
|
||||
await scheduleNextPoll(ch.shareId);
|
||||
},
|
||||
100
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* 启动某个渠道的轮询
|
||||
*/
|
||||
export const startWechatPolling = async (shareId: string): Promise<void> => {
|
||||
const queue = getQueue<WechatPollJobData>(QueueNames.wechatPoll);
|
||||
|
||||
await queue.add(
|
||||
queueName,
|
||||
{ shareId },
|
||||
{
|
||||
jobId: `wechat-poll-${shareId}-${Date.now()}`
|
||||
}
|
||||
);
|
||||
|
||||
await scheduleNextPoll(shareId);
|
||||
logger.info('Wechat polling started', { shareId });
|
||||
};
|
||||
|
||||
@@ -232,5 +313,11 @@ export const startWechatPolling = async (shareId: string): Promise<void> => {
|
||||
export const stopWechatPolling = async (shareId: string): Promise<void> => {
|
||||
await MongoOutLink.updateOne({ shareId }, { $set: { 'app.status': 'offline', 'app.token': '' } });
|
||||
|
||||
// Delete job from queue
|
||||
const queue = getQueue<WechatPollJobData>(QueueNames.wechatPoll);
|
||||
await queue.remove(pollJobId(shareId)).catch((error) => {
|
||||
logger.warn('Remove poll job failed (job may be active)', { shareId, error: String(error) });
|
||||
});
|
||||
|
||||
logger.info('Wechat polling stopped', { shareId });
|
||||
};
|
||||
|
||||
@@ -1,3 +1,11 @@
|
||||
export type WechatPollJobData = {
|
||||
shareId: string;
|
||||
};
|
||||
|
||||
export type WechatReplyJobData = {
|
||||
shareId: string;
|
||||
userId: string;
|
||||
text: string;
|
||||
contextToken: string;
|
||||
lastMsgId: string;
|
||||
};
|
||||
|
||||
@@ -160,6 +160,8 @@ CHECK_INTERNAL_IP=false
|
||||
APP_FOLDER_MAX_AMOUNT=1000
|
||||
# 数据集文件夹最大数量
|
||||
DATASET_FOLDER_MAX_AMOUNT=1000
|
||||
# 微信渠道 poll worker 并发数(默认 1000),需 ≥ online channel 数;channel 数超过该值时消息延迟会线性恶化
|
||||
WECHAT_CHANNEL_CONCURRENCY=1000
|
||||
|
||||
# ==================== 上传与账号策略 ====================
|
||||
# 最大上传文件大小(MB)
|
||||
|
||||
@@ -10,10 +10,9 @@ import { useDebounceEffect, useMount } from 'ahooks';
|
||||
import { v1Workflow2V2 } from '@/web/core/workflow/adapt';
|
||||
import { defaultAppSelectFileConfig } from '@fastgpt/global/core/app/constants';
|
||||
import { form2AppWorkflow, appWorkflow2Form } from './utils';
|
||||
|
||||
const Edit = dynamic(() => import('./Edit'));
|
||||
const Logs = dynamic(() => import('../../Logs/index'));
|
||||
const PublishChannel = dynamic(() => import('../../Publish'));
|
||||
import PublishChannel from '../../Publish';
|
||||
import Logs from '../../Logs';
|
||||
import Edit from './Edit';
|
||||
|
||||
const SimpleEdit = () => {
|
||||
const { t } = useTranslation();
|
||||
|
||||
@@ -6,6 +6,7 @@ import { NextAPI } from '@/service/middleware/entry';
|
||||
import { addAuditLog } from '@fastgpt/service/support/user/audit/util';
|
||||
import { AuditEventEnum } from '@fastgpt/global/support/user/audit/constants';
|
||||
import { getI18nAppType } from '@fastgpt/service/support/user/audit/util';
|
||||
import { stopWechatPolling } from '@fastgpt/service/support/outLink/wechat/mq';
|
||||
|
||||
export type OutLinkDeleteQuery = {
|
||||
id: string;
|
||||
@@ -25,7 +26,15 @@ async function handler(
|
||||
per: OwnerPermissionVal
|
||||
});
|
||||
|
||||
await MongoOutLink.findByIdAndDelete(id);
|
||||
const outlink = await MongoOutLink.findById(id);
|
||||
|
||||
if (outlink && outlink.type === 'wechat') {
|
||||
await stopWechatPolling(outlink.shareId).catch((error) => {
|
||||
console.warn('Stop wechat polling failed', error);
|
||||
});
|
||||
}
|
||||
|
||||
await outlink?.deleteOne();
|
||||
|
||||
(async () => {
|
||||
addAuditLog({
|
||||
|
||||
@@ -61,7 +61,7 @@ describe('groupMessagesByUser', () => {
|
||||
userId: 'u1',
|
||||
text: 'default text',
|
||||
contextToken: 'ctx1',
|
||||
msgIds: ['m1']
|
||||
lastMsgId: 'm1'
|
||||
});
|
||||
});
|
||||
|
||||
@@ -84,7 +84,7 @@ describe('groupMessagesByUser', () => {
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0].text).toBe('first\nsecond');
|
||||
expect(result[0].msgIds).toEqual(['m1', 'm2']);
|
||||
expect(result[0].lastMsgId).toBe('m2');
|
||||
expect(result[0].contextToken).toBe('ctx2'); // 取最后一条的
|
||||
});
|
||||
|
||||
@@ -107,7 +107,7 @@ describe('groupMessagesByUser', () => {
|
||||
const result = groupMessagesByUser(msgs);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0].msgIds).toEqual(['m2']);
|
||||
expect(result[0].lastMsgId).toBe('m2');
|
||||
});
|
||||
|
||||
it('should skip messages with no extractable text', () => {
|
||||
@@ -122,7 +122,7 @@ describe('groupMessagesByUser', () => {
|
||||
const result = groupMessagesByUser(msgs);
|
||||
|
||||
expect(result).toHaveLength(1);
|
||||
expect(result[0].msgIds).toEqual(['m2']);
|
||||
expect(result[0].lastMsgId).toBe('m2');
|
||||
});
|
||||
|
||||
it('should handle empty message list', () => {
|
||||
|
||||
Reference in New Issue
Block a user