diff --git a/SECURITY.md b/SECURITY.md
index 1d8223fd6..562cf092e 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -5,7 +5,7 @@ 如果您发现了 FastGPT 的安全漏洞,请按照以下步骤进行报告:
 1. **报告方式**
-   发送邮件至:dennis@sealos.io
+   发送邮件至:yujinlong@sealos.io
    请备注版本以及您的 GitHub 账号
 3. **响应时间**
diff --git a/document/content/docs/protocol/privacy.en.mdx b/document/content/docs/protocol/privacy.en.mdx
index 1dcf9b37d..3c1e5a5fd 100644
--- a/document/content/docs/protocol/privacy.en.mdx
+++ b/document/content/docs/protocol/privacy.en.mdx
@@ -58,5 +58,5 @@ Due to servers potentially being located in different countries/regions, you agr
 **Contact Us**
-1. For any questions, suggestions, or complaints about this policy, contact us at: dennis@sealos.io.
+1. For any questions, suggestions, or complaints about this policy, contact us at: yujinlong@sealos.io.
 2. We will respond promptly and address your concerns.
diff --git a/document/content/docs/protocol/privacy.mdx b/document/content/docs/protocol/privacy.mdx
index bea09b8eb..4ad911d79 100644
--- a/document/content/docs/protocol/privacy.mdx
+++ b/document/content/docs/protocol/privacy.mdx
@@ -58,5 +58,5 @@ description: ' FastGPT 隐私政策'
 **联系我们**
-1. 如您对本隐私政策有任何疑问、建议或投诉,请通过以下方式与我们联系:dennis@sealos.io。
+1. 如您对本隐私政策有任何疑问、建议或投诉,请通过以下方式与我们联系:yujinlong@sealos.io。
 2. 我们将尽快回复并解决您提出的问题。
diff --git a/document/content/docs/protocol/terms.en.mdx b/document/content/docs/protocol/terms.en.mdx
index 08fea90a8..463b63a92 100644
--- a/document/content/docs/protocol/terms.en.mdx
+++ b/document/content/docs/protocol/terms.en.mdx
@@ -72,4 +72,4 @@ This FastGPT Service Agreement constitutes the terms and conditions agreed betwe
 **Article 7 Additional Provisions**
 1. If any clause is deemed unlawful or invalid, the remaining provisions shall remain enforceable.
-2. Sealos retains final authority in interpreting this Agreement and privacy policies. For any inquiries, please contact us at dennis@sealos.io.
+2. Sealos retains final authority in interpreting this Agreement and privacy policies. For any inquiries, please contact us at yujinlong@sealos.io.
diff --git a/document/content/docs/protocol/terms.mdx b/document/content/docs/protocol/terms.mdx
index d9c777d77..43366c47d 100644
--- a/document/content/docs/protocol/terms.mdx
+++ b/document/content/docs/protocol/terms.mdx
@@ -67,4 +67,4 @@ FastGPT 服务协议是您与珠海环界云计算有限公司(以下简称“
 **第7条 其他条款**
 1. 如本协议中部分条款因违反法律法规而被视为无效,不影响其他条款的效力。
-2. 本公司保留对本协议及隐私政策的最终解释权。如您对本协议或隐私政策有任何疑问,请联系我们:dennis@sealos.io。
+2. 本公司保留对本协议及隐私政策的最终解释权。如您对本协议或隐私政策有任何疑问,请联系我们:yujinlong@sealos.io。
diff --git a/document/content/docs/upgrading/4-11/4112.mdx b/document/content/docs/upgrading/4-11/4112.mdx
new file mode 100644
index 000000000..b2a79e364
--- /dev/null
+++ b/document/content/docs/upgrading/4-11/4112.mdx
@@ -0,0 +1,18 @@
+---
+title: 'V4.11.2'
+description: 'FastGPT V4.11.2 更新说明'
+---
+
+## 🚀 新增内容
+
+## ⚙️ 优化
+
+1. 优化 3 处存在潜在内存泄漏的代码。
+2. 优化工作流部分递归检查,避免无限递归。
+3. 优化文档阅读 Worker,采用 SharedArrayBuffer 避免数据拷贝。
+
+## 🐛 修复
+
+1. 
Doc2x API 更新,导致解析失败。 + +## 🔨 工具更新 diff --git a/document/content/docs/upgrading/4-11/meta.json b/document/content/docs/upgrading/4-11/meta.json index dcc52023f..b973d73bf 100644 --- a/document/content/docs/upgrading/4-11/meta.json +++ b/document/content/docs/upgrading/4-11/meta.json @@ -1,5 +1,5 @@ { "title": "4.11.x", "description": "", - "pages": ["4111", "4110"] + "pages": ["4112", "4111", "4110"] } diff --git a/document/data/doc-last-modified.json b/document/data/doc-last-modified.json index ed44550ee..3b4eb2c60 100644 --- a/document/data/doc-last-modified.json +++ b/document/data/doc-last-modified.json @@ -1,8 +1,4 @@ { - "document/content/docs/api/api1.mdx": "2025-07-23T21:35:03+08:00", - "document/content/docs/api/api2.mdx": "2025-08-02T19:38:37+08:00", - "document/content/docs/api/index.mdx": "2025-07-30T15:38:30+08:00", - "document/content/docs/api/test/api3.mdx": "2025-07-23T21:35:03+08:00", "document/content/docs/faq/app.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/faq/chat.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/faq/dataset.mdx": "2025-08-02T19:38:37+08:00", @@ -96,14 +92,15 @@ "document/content/docs/protocol/index.mdx": "2025-07-30T15:38:30+08:00", "document/content/docs/protocol/open-source.en.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/protocol/open-source.mdx": "2025-08-02T19:38:37+08:00", - "document/content/docs/protocol/privacy.en.mdx": "2025-08-02T19:38:37+08:00", - "document/content/docs/protocol/privacy.mdx": "2025-08-02T19:38:37+08:00", - "document/content/docs/protocol/terms.en.mdx": "2025-08-02T19:38:37+08:00", - "document/content/docs/protocol/terms.mdx": "2025-08-02T19:38:37+08:00", + "document/content/docs/protocol/privacy.en.mdx": "2025-08-03T14:03:08+08:00", + "document/content/docs/protocol/privacy.mdx": "2025-08-03T14:03:08+08:00", + "document/content/docs/protocol/terms.en.mdx": "2025-08-03T14:03:08+08:00", + "document/content/docs/protocol/terms.mdx": "2025-08-03T14:03:08+08:00", "document/content/docs/upgrading/4-10/4100.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-10/4101.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-11/4110.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-11/4111.mdx": "2025-08-02T19:38:37+08:00", + "document/content/docs/upgrading/4-11/4112.mdx": "2025-08-03T14:03:08+08:00", "document/content/docs/upgrading/4-8/40.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/41.mdx": "2025-08-02T19:38:37+08:00", "document/content/docs/upgrading/4-8/42.mdx": "2025-08-02T19:38:37+08:00", diff --git a/packages/global/core/workflow/runtime/utils.ts b/packages/global/core/workflow/runtime/utils.ts index 7fcae94c1..7100b46fa 100644 --- a/packages/global/core/workflow/runtime/utils.ts +++ b/packages/global/core/workflow/runtime/utils.ts @@ -23,13 +23,20 @@ import type { RuntimeEdgeItemType, RuntimeNodeItemType } from './type'; export const extractDeepestInteractive = ( interactive: WorkflowInteractiveResponseType ): WorkflowInteractiveResponseType => { - if ( - (interactive?.type === 'childrenInteractive' || interactive?.type === 'loopInteractive') && - interactive.params?.childrenResponse + const MAX_DEPTH = 100; + let current = interactive; + let depth = 0; + + while ( + depth < MAX_DEPTH && + (current?.type === 'childrenInteractive' || current?.type === 'loopInteractive') && + current.params?.childrenResponse ) { - return extractDeepestInteractive(interactive.params.childrenResponse); + current = 
current.params.childrenResponse;
+    depth++;
   }
-  return interactive;
+
+  return current;
 };
 export const getMaxHistoryLimitFromNodes = (nodes: StoreNodeItemType[]): number => {
   let limit = 10;
@@ -294,22 +301,42 @@ export const checkNodeRunStatus = ({
   const commonEdges: RuntimeEdgeItemType[] = [];
   const recursiveEdges: RuntimeEdgeItemType[] = [];
-  const checkIsCircular = (edge: RuntimeEdgeItemType, visited: Set<string>): boolean => {
-    if (edge.source === currentNode.nodeId) {
-      return true; // Cycle detected, and it includes the current node
-    }
-    if (visited.has(edge.source)) {
-      return false; // Cycle detected, but it excludes the current node (a descendant forms the cycle)
-    }
-    visited.add(edge.source);
+  const checkIsCircular = (startEdge: RuntimeEdgeItemType, initialVisited: string[]): boolean => {
+    const stack: Array<{ edge: RuntimeEdgeItemType; visited: Set<string> }> = [
+      { edge: startEdge, visited: new Set(initialVisited) }
+    ];
 
-    // Recursively check the upstream edges; if any of them forms a cycle, return true
-    const nextEdges = allEdges.filter((item) => item.target === edge.source);
-    return nextEdges.some((nextEdge) => checkIsCircular(nextEdge, new Set(visited)));
+    const MAX_DEPTH = 3000;
+    let iterations = 0;
+
+    while (stack.length > 0 && iterations < MAX_DEPTH) {
+      iterations++;
+
+      const { edge, visited } = stack.pop()!;
+
+      if (edge.source === currentNode.nodeId) {
+        return true; // Cycle detected, and it includes the current node
+      }
+
+      if (visited.has(edge.source)) {
+        continue; // Node already visited; skip it so sub-cycles cannot interfere
+      }
+
+      const newVisited = new Set(visited);
+      newVisited.add(edge.source);
+
+      // Collect the source edges feeding this node and push them onto the stack
+      const nextEdges = allEdges.filter((item) => item.target === edge.source);
+      for (const nextEdge of nextEdges) {
+        stack.push({ edge: nextEdge, visited: newVisited });
+      }
+    }
+
+    return false;
   };
   sourceEdges.forEach((edge) => {
-    if (checkIsCircular(edge, new Set([currentNode.nodeId]))) {
+    if (checkIsCircular(edge, [currentNode.nodeId])) {
       recursiveEdges.push(edge);
     } else {
       commonEdges.push(edge);
diff --git a/packages/service/common/file/gridfs/controller.ts b/packages/service/common/file/gridfs/controller.ts
index 5d9788feb..b69d9fcde 100644
--- a/packages/service/common/file/gridfs/controller.ts
+++ b/packages/service/common/file/gridfs/controller.ts
@@ -79,6 +79,8 @@ export async function uploadFile({
       .pipe(stream as any)
       .on('finish', resolve)
       .on('error', reject);
+  }).finally(() => {
+    readStream.destroy();
   });
 
   return String(stream.id);
diff --git a/packages/service/common/file/utils.ts b/packages/service/common/file/utils.ts
index 3a623e597..b1cda4ad2 100644
--- a/packages/service/common/file/utils.ts
+++ b/packages/service/common/file/utils.ts
@@ -2,6 +2,11 @@ import { isProduction } from '@fastgpt/global/common/system/constants';
 import fs from 'fs';
 import path from 'path';
 
+export const getFileMaxSize = () => {
+  const mb = global.feConfigs?.uploadFileMaxSize || 1000;
+  return mb * 1024 * 1024;
+};
+
 export const removeFilesByPaths = (paths: string[]) => {
   paths.forEach((path) => {
     fs.unlink(path, (err) => {
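The `checkNodeRunStatus` hunk above swaps self-recursive cycle detection for an explicit stack with an iteration cap, so a pathological workflow graph can neither overflow the call stack nor spin forever. A standalone sketch of the same pattern, using a hypothetical `Edge` type rather than FastGPT's runtime types:

```ts
type Edge = { source: string; target: string };

// Does any chain of edges lead back to startId? Explicit stack instead of
// recursion: bounded stack memory, and the iteration cap bounds total work.
const leadsBackTo = (allEdges: Edge[], firstEdge: Edge, startId: string): boolean => {
  const MAX_ITERATIONS = 3000;
  const stack: Array<{ edge: Edge; visited: Set<string> }> = [
    { edge: firstEdge, visited: new Set([startId]) }
  ];
  let iterations = 0;

  while (stack.length > 0 && iterations++ < MAX_ITERATIONS) {
    const { edge, visited } = stack.pop()!;
    if (edge.source === startId) return true; // cycle includes the start node
    if (visited.has(edge.source)) continue; // sub-cycle that excludes it
    const nextVisited = new Set(visited).add(edge.source);
    for (const next of allEdges.filter((e) => e.target === edge.source)) {
      stack.push({ edge: next, visited: nextVisited });
    }
  }
  return false;
};
```

Cloning the visited set per branch keeps paths independent at the cost of extra allocations; the iteration cap makes that trade safe.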
diff --git a/packages/service/common/mongo/init.ts b/packages/service/common/mongo/init.ts
index 1202d42a9..b7233f06c 100644
--- a/packages/service/common/mongo/init.ts
+++ b/packages/service/common/mongo/init.ts
@@ -13,17 +13,22 @@ export async function connectMongo(db: Mongoose, url: string): Promise<Mongoose> {
     return db;
   }
 
+  const RemoveListeners = () => {
+    db.connection.removeAllListeners('error');
+    db.connection.removeAllListeners('disconnected');
+  };
+
   console.log('MongoDB start connect');
   try {
     // Remove existing listeners to prevent duplicates
-    db.connection.removeAllListeners('error');
-    db.connection.removeAllListeners('disconnected');
+    RemoveListeners();
     db.set('strictQuery', 'throw');
     db.connection.on('error', async (error) => {
       console.log('mongo error', error);
       try {
         if (db.connection.readyState !== 0) {
+          RemoveListeners();
           await db.disconnect();
           await delay(1000);
           await connectMongo(db, url);
@@ -34,6 +39,7 @@ export async function connectMongo(db: Mongoose, url: string): Promise<Mongoose> {
       console.log('mongo disconnected');
       try {
         if (db.connection.readyState !== 0) {
+          RemoveListeners();
           await db.disconnect();
           await delay(1000);
           await connectMongo(db, url);
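The init.ts hunk above detaches the 'error' and 'disconnected' handlers before every disconnect/reconnect cycle; without that, each reconnect stacks another pair of listeners on the same connection object. A minimal sketch of the pattern against a plain EventEmitter — hypothetical names, not the mongoose API:

```ts
import { EventEmitter } from 'node:events';

// Reconnect loop: drop our handlers before tearing down, otherwise every
// retry registers a fresh pair of listeners that never goes away.
const watchConnection = (conn: EventEmitter, reconnect: () => Promise<void>) => {
  const removeListeners = () => {
    conn.removeAllListeners('error');
    conn.removeAllListeners('disconnected');
  };

  const onFailure = async (reason: unknown) => {
    console.error('connection lost:', reason);
    removeListeners(); // prevent duplicate handlers across attempts
    await reconnect(); // expected to re-register via watchConnection on success
  };

  conn.on('error', onFailure);
  conn.on('disconnected', onFailure);
};
```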
diff --git a/packages/service/core/dataset/read.ts b/packages/service/core/dataset/read.ts
index 95129c03b..e6f417cc8 100644
--- a/packages/service/core/dataset/read.ts
+++ b/packages/service/core/dataset/read.ts
@@ -13,6 +13,9 @@ import { getApiDatasetRequest } from './apiDataset';
 import Papa from 'papaparse';
 import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
 import { text2Chunks } from '../../worker/function';
+import { addLog } from '../../common/system/log';
+import { retryFn } from '@fastgpt/global/common/system/utils';
+import { getFileMaxSize } from '../../common/file/utils';
 
 export const readFileRawTextByUrl = async ({
   teamId,
@@ -20,7 +23,8 @@ export const readFileRawTextByUrl = async ({
   url,
   customPdfParse,
   getFormatText,
-  relatedId
+  relatedId,
+  maxFileSize = getFileMaxSize()
 }: {
   teamId: string;
   tmbId: string;
@@ -28,30 +32,113 @@ export const readFileRawTextByUrl = async ({
   customPdfParse?: boolean;
   getFormatText?: boolean;
   relatedId: string; // externalFileId / apiFileId
+  maxFileSize?: number;
 }) => {
+  const extension = parseFileExtensionFromUrl(url);
+
+  // Check file size
+  try {
+    const headResponse = await axios.head(url, { timeout: 10000 });
+    const contentLength = parseInt(headResponse.headers['content-length'] || '0');
+
+    if (contentLength > 0 && contentLength > maxFileSize) {
+      return Promise.reject(
+        `File too large. Size: ${Math.round(contentLength / 1024 / 1024)}MB, Maximum allowed: ${Math.round(maxFileSize / 1024 / 1024)}MB`
+      );
+    }
+  } catch (error) {
+    addLog.warn('Check file HEAD request failed');
+  }
+
+  // Use stream response type, avoid double memory usage
   const response = await axios({
     method: 'get',
     url: url,
-    responseType: 'arraybuffer'
-  });
-  const extension = parseFileExtensionFromUrl(url);
-
-  const buffer = Buffer.from(response.data, 'binary');
-
-  const { rawText } = await readRawContentByFileBuffer({
-    customPdfParse,
-    getFormatText,
-    extension,
-    teamId,
-    tmbId,
-    buffer,
-    encoding: 'utf-8',
-    metadata: {
-      relatedId
-    }
+    responseType: 'stream',
+    maxContentLength: maxFileSize,
+    timeout: 30000
   });
 
-  return rawText;
+  // Build the buffer directly from the stream, skipping the arraybuffer intermediate step
+  const chunks: Buffer[] = [];
+  let totalLength = 0;
+
+  return new Promise((resolve, reject) => {
+    let isAborted = false;
+
+    const cleanup = () => {
+      if (!isAborted) {
+        isAborted = true;
+        chunks.length = 0; // release the buffered chunks
+        response.data.destroy();
+      }
+    };
+
+    // Overall stream timeout (10 minutes)
+    const timeoutId = setTimeout(() => {
+      cleanup();
+      reject('File download timeout after 10 minutes');
+    }, 600000);
+
+    response.data.on('data', (chunk: Buffer) => {
+      if (isAborted) return;
+      totalLength += chunk.length;
+      if (totalLength > maxFileSize) {
+        clearTimeout(timeoutId);
+        cleanup();
+        return reject(
+          `File too large. Maximum size allowed is ${Math.round(maxFileSize / 1024 / 1024)}MB.`
+        );
+      }
+
+      chunks.push(chunk);
+    });
+
+    response.data.on('end', async () => {
+      if (isAborted) return;
+
+      clearTimeout(timeoutId);
+
+      try {
+        // Merge all chunks into a single buffer
+        const buffer = Buffer.concat(chunks);
+
+        // Clear the chunks array immediately to free memory
+        chunks.length = 0;
+
+        const { rawText } = await retryFn(() =>
+          readRawContentByFileBuffer({
+            customPdfParse,
+            getFormatText,
+            extension,
+            teamId,
+            tmbId,
+            buffer,
+            encoding: 'utf-8',
+            metadata: {
+              relatedId
+            }
+          })
+        );
+
+        resolve(rawText);
+      } catch (error) {
+        cleanup();
+        reject(error);
+      }
+    });
+
+    response.data.on('error', (error: Error) => {
+      clearTimeout(timeoutId);
+      cleanup();
+      reject(error);
+    });
+
+    response.data.on('close', () => {
+      clearTimeout(timeoutId);
+      cleanup();
+    });
+  });
 };
 
 /*
diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts
index 13b60acee..fb9b002c3 100644
--- a/packages/service/core/workflow/dispatch/index.ts
+++ b/packages/service/core/workflow/dispatch/index.ts
@@ -180,6 +180,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
-      setTimeout(() => {
-        props?.workflowStreamResponse?.({
-          event: SseResponseEventEnum.answer,
-          data: textAdaptGptResponse({
-            text: ''
-          })
-        });
-        sendStreamTimerSign();
-      }, 10000);
-    };
-    sendStreamTimerSign();
+    streamCheckTimer = setInterval(() => {
+      props?.workflowStreamResponse?.({
+        event: SseResponseEventEnum.answer,
+        data: textAdaptGptResponse({
+          text: ''
+        })
+      });
+    }, 10000);
   }
 
   // Get default variables
@@ -841,6 +838,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
   if (typeof err === 'string') {
     return Promise.reject({ message: `[Doc2x] ${err}` });
   }
-  if (typeof err.message === 'string') {
-    return Promise.reject({ message: `[Doc2x] ${err.message}` });
-  }
   if (typeof err.data === 'string') {
     return Promise.reject({ message: `[Doc2x] ${err.data}` });
   }
   if (err?.response?.data) {
     return Promise.reject({ message: `[Doc2x] ${getErrText(err?.response?.data)}` });
   }
+  if (typeof err.message === 'string') {
+    return Promise.reject({ message: `[Doc2x] ${err.message}` });
+  }
 
   addLog.error('[Doc2x] Unknown error', err);
   return Promise.reject({ message: `[Doc2x] ${getErrText(err)}` });
@@ -78,7 +78,7 @@ export const useDoc2xServer = ({ apiKey }: { apiKey: string }) => {
     code,
     msg,
     data: preupload_data
-  } = await request<{ uid: string; url: string }>('/v2/parse/preupload', null, 'POST');
+  } = await request<{ uid: string; url: string }>('/v2/parse/preupload', {}, 'POST');
   if (!['ok', 'success'].includes(code)) {
     return Promise.reject(`[Doc2x] Failed to get pre-upload URL: ${msg}`);
   }
@@ -96,6 +96,7 @@ export const useDoc2xServer = ({ apiKey }: { apiKey: string }) => {
     .catch((error) => {
       return Promise.reject(`[Doc2x] Failed to upload file: ${getErrText(error)}`);
     });
+
   if (response.status !== 200) {
     return Promise.reject(
       `[Doc2x] Upload failed with status ${response.status}: ${response.statusText}`
+    );
+  }
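The read.ts hunk above is the heart of the memory fix: the old code buffered the entire response as an arraybuffer before parsing, while the new code streams it, counts bytes as they arrive, and aborts the transfer the moment the budget is exceeded. The guard in isolation, as a hedged sketch with plain axios (names and limits are illustrative):

```ts
import axios from 'axios';

// Download url into a Buffer, rejecting as soon as the stream exceeds maxBytes.
const downloadWithCap = (url: string, maxBytes: number): Promise<Buffer> =>
  axios.get(url, { responseType: 'stream', timeout: 30000 }).then(
    (res) =>
      new Promise<Buffer>((resolve, reject) => {
        const chunks: Buffer[] = [];
        let received = 0;

        res.data.on('data', (chunk: Buffer) => {
          received += chunk.length;
          if (received > maxBytes) {
            res.data.destroy(); // abort mid-transfer; frees the socket and buffered chunks
            return reject(new Error(`File exceeds ${Math.round(maxBytes / 1024 / 1024)}MB limit`));
          }
          chunks.push(chunk);
        });
        res.data.on('end', () => resolve(Buffer.concat(chunks)));
        res.data.on('error', reject);
      })
  );
```

The HEAD pre-check in the hunk is an optimization only: Content-Length can be absent or wrong, so the byte counter on the stream remains the authoritative limit.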
diff --git a/packages/service/worker/function.ts b/packages/service/worker/function.ts
index 6e1e76168..134abbc43 100644
--- a/packages/service/worker/function.ts
+++ b/packages/service/worker/function.ts
@@ -20,5 +20,17 @@ export const readRawContentFromBuffer = (props: {
   encoding: string;
   buffer: Buffer;
 }) => {
-  return runWorker(WorkerNameEnum.readFile, props);
+  const bufferSize = props.buffer.length;
+
+  // Use a SharedArrayBuffer so the worker reads the same memory instead of a copy
+  const sharedBuffer = new SharedArrayBuffer(bufferSize);
+  const sharedArray = new Uint8Array(sharedBuffer);
+  sharedArray.set(props.buffer);
+
+  return runWorker(WorkerNameEnum.readFile, {
+    extension: props.extension,
+    encoding: props.encoding,
+    sharedBuffer: sharedBuffer,
+    bufferSize: bufferSize
+  });
 };
diff --git a/packages/service/worker/readFile/extension/pdf.ts b/packages/service/worker/readFile/extension/pdf.ts
index 10d399b00..a32d65460 100644
--- a/packages/service/worker/readFile/extension/pdf.ts
+++ b/packages/service/worker/readFile/extension/pdf.ts
@@ -56,16 +56,16 @@ export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
   const pageArr = Array.from({ length: doc.numPages }, (_, i) => i + 1);
-  for (let i = 0; i < pageArr.length; i++) {
-    result += await readPDFPage(doc, i + 1);
-  }
+  const result = (
+    await Promise.all(pageArr.map(async (page) => await readPDFPage(doc, page)))
+  ).join('');
 
   loadingTask.destroy();
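The worker/function.ts and pdf.ts changes above are two halves of one handoff: the main thread copies the file bytes into a SharedArrayBuffer once, and the worker wraps the same memory in a Buffer view instead of receiving a structured-clone copy. A minimal two-sided sketch with worker_threads — file names are hypothetical:

```ts
// main.ts — one copy into shared memory, then hand over the reference.
import { Worker } from 'node:worker_threads';
import { readFileSync } from 'node:fs';

const fileBuffer = readFileSync('input.pdf');
const shared = new SharedArrayBuffer(fileBuffer.length);
new Uint8Array(shared).set(fileBuffer); // the single copy

const worker = new Worker('./reader.js');
// A SharedArrayBuffer is shared with the worker, not cloned like a plain Buffer.
worker.postMessage({ shared, size: fileBuffer.length });

// reader.js (worker side) — view the same memory, no second copy:
//   const { parentPort } = require('node:worker_threads');
//   parentPort.on('message', ({ shared, size }) => {
//     const buffer = Buffer.from(shared, 0, size); // zero-copy view
//     /* parse buffer ... */
//   });
```

One caveat: because the memory is shared, the worker must treat it as read-only (or the two sides must coordinate), which this read-only parsing use case satisfies naturally.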
"${params.extension}" is not supported.` + ); + } + }; - // params.buffer: Uint8Array -> buffer - const buffer = Buffer.from(props.buffer); - const newProps: ReadRawTextByBuffer = { - ...props, - buffer - }; + // 使用 SharedArrayBuffer,零拷贝共享内存 + const sharedArray = new Uint8Array(props.sharedBuffer); + const buffer = Buffer.from(sharedArray.buffer, 0, props.bufferSize); - try { - workerResponse({ - parentPort, - status: 'success', - data: await read(newProps) - }); - } catch (error) { - workerResponse({ - parentPort, - status: 'error', - data: error - }); + const newProps: ReadRawTextByBuffer = { + extension: props.extension, + encoding: props.encoding, + buffer + }; + + try { + workerResponse({ + parentPort, + status: 'success', + data: await read(newProps) + }); + } catch (error) { + workerResponse({ + parentPort, + status: 'error', + data: error + }); + } } -}); +); diff --git a/packages/service/worker/utils.ts b/packages/service/worker/utils.ts index 1a2a35f53..4891afa6d 100644 --- a/packages/service/worker/utils.ts +++ b/packages/service/worker/utils.ts @@ -198,6 +198,7 @@ export class WorkerPool, Response = any> { if (item) { item.reject?.('error'); clearTimeout(item.timeoutId); + item.worker.removeAllListeners(); item.worker.terminate(); } diff --git a/projects/app/src/service/core/dataset/queues/generateQA.ts b/projects/app/src/service/core/dataset/queues/generateQA.ts index a07bc97bd..8238c31e0 100644 --- a/projects/app/src/service/core/dataset/queues/generateQA.ts +++ b/projects/app/src/service/core/dataset/queues/generateQA.ts @@ -44,157 +44,161 @@ export async function generateQA(): Promise { if (global.qaQueueLen >= max) return; global.qaQueueLen++; - while (true) { - const startTime = Date.now(); - // get training data - const { - data, - text, - done = false, - error = false - } = await (async () => { - try { - const data = await MongoDatasetTraining.findOneAndUpdate( - { - mode: TrainingModeEnum.qa, - retryCount: { $gt: 0 }, - lockTime: { $lte: addMinutes(new Date(), -10) } - }, - { - lockTime: new Date(), - $inc: { retryCount: -1 } - } - ) - .populate([ + try { + while (true) { + const startTime = Date.now(); + // get training data + const { + data, + text, + done = false, + error = false + } = await (async () => { + try { + const data = await MongoDatasetTraining.findOneAndUpdate( { - path: 'dataset', - select: 'agentModel vectorModel vlmModel' + mode: TrainingModeEnum.qa, + retryCount: { $gt: 0 }, + lockTime: { $lte: addMinutes(new Date(), -10) } }, { - path: 'collection', - select: 'qaPrompt' + lockTime: new Date(), + $inc: { retryCount: -1 } } - ]) - .lean(); + ) + .populate([ + { + path: 'dataset', + select: 'agentModel vectorModel vlmModel' + }, + { + path: 'collection', + select: 'qaPrompt' + } + ]) + .lean(); - // task preemption - if (!data) { + // task preemption + if (!data) { + return { + done: true + }; + } return { - done: true + data, + text: data.q + }; + } catch (error) { + return { + error: true }; } - return { - data, - text: data.q - }; - } catch (error) { - return { - error: true - }; + })(); + + if (done || !data) { + break; + } + if (error) { + addLog.error(`[QA Queue] Error`, error); + await delay(500); + continue; } - })(); - if (done || !data) { - break; - } - if (error) { - addLog.error(`[QA Queue] Error`, error); - await delay(500); - continue; - } + if (!data.dataset || !data.collection) { + addLog.info(`[QA Queue] Dataset or collection not found`, data); + // Delete data + await MongoDatasetTraining.deleteOne({ _id: data._id }); + continue; + } 
diff --git a/projects/app/src/service/core/dataset/queues/generateQA.ts b/projects/app/src/service/core/dataset/queues/generateQA.ts
index a07bc97bd..8238c31e0 100644
--- a/projects/app/src/service/core/dataset/queues/generateQA.ts
+++ b/projects/app/src/service/core/dataset/queues/generateQA.ts
@@ -44,157 +44,161 @@ export async function generateQA(): Promise<any> {
   if (global.qaQueueLen >= max) return;
   global.qaQueueLen++;
 
-  while (true) {
-    const startTime = Date.now();
-    // get training data
-    const {
-      data,
-      text,
-      done = false,
-      error = false
-    } = await (async () => {
-      try {
-        const data = await MongoDatasetTraining.findOneAndUpdate(
-          {
-            mode: TrainingModeEnum.qa,
-            retryCount: { $gt: 0 },
-            lockTime: { $lte: addMinutes(new Date(), -10) }
-          },
-          {
-            lockTime: new Date(),
-            $inc: { retryCount: -1 }
-          }
-        )
-          .populate([
+  try {
+    while (true) {
+      const startTime = Date.now();
+      // get training data
+      const {
+        data,
+        text,
+        done = false,
+        error = false
+      } = await (async () => {
+        try {
+          const data = await MongoDatasetTraining.findOneAndUpdate(
             {
-              path: 'dataset',
-              select: 'agentModel vectorModel vlmModel'
+              mode: TrainingModeEnum.qa,
+              retryCount: { $gt: 0 },
+              lockTime: { $lte: addMinutes(new Date(), -10) }
             },
             {
-              path: 'collection',
-              select: 'qaPrompt'
+              lockTime: new Date(),
+              $inc: { retryCount: -1 }
             }
-          ])
-          .lean();
+          )
+            .populate([
+              {
+                path: 'dataset',
+                select: 'agentModel vectorModel vlmModel'
+              },
+              {
+                path: 'collection',
+                select: 'qaPrompt'
+              }
+            ])
+            .lean();
 
-        // task preemption
-        if (!data) {
+          // task preemption
+          if (!data) {
+            return {
+              done: true
+            };
+          }
           return {
-            done: true
+            data,
+            text: data.q
+          };
+        } catch (error) {
+          return {
+            error: true
           };
         }
-        return {
-          data,
-          text: data.q
-        };
-      } catch (error) {
-        return {
-          error: true
-        };
+      })();
+
+      if (done || !data) {
+        break;
+      }
+      if (error) {
+        addLog.error(`[QA Queue] Error`, error);
+        await delay(500);
+        continue;
       }
-    })();
 
-    if (done || !data) {
-      break;
-    }
-    if (error) {
-      addLog.error(`[QA Queue] Error`, error);
-      await delay(500);
-      continue;
-    }
+      if (!data.dataset || !data.collection) {
+        addLog.info(`[QA Queue] Dataset or collection not found`, data);
+        // Delete data
+        await MongoDatasetTraining.deleteOne({ _id: data._id });
+        continue;
+      }
+      // auth balance
+      if (!(await checkTeamAiPointsAndLock(data.teamId))) {
+        continue;
+      }
 
-    if (!data.dataset || !data.collection) {
-      addLog.info(`[QA Queue] Dataset or collection not found`, data);
-      // Delete data
-      await MongoDatasetTraining.deleteOne({ _id: data._id });
-      continue;
-    }
-    // auth balance
-    if (!(await checkTeamAiPointsAndLock(data.teamId))) {
-      continue;
-    }
 
-    addLog.info(`[QA Queue] Start`);
+      addLog.info(`[QA Queue] Start`);
 
-    try {
-      const modelData = getLLMModel(data.dataset.agentModel);
-      const prompt = `${data.collection.qaPrompt || Prompt_AgentQA.description}
-${replaceVariable(Prompt_AgentQA.fixedText, { text })}`;
-
-      // request LLM to get QA
-      const messages: ChatCompletionMessageParam[] = [
-        {
-          role: 'user',
-          content: prompt
-        }
-      ];
-
-      const { response: chatResponse } = await createChatCompletion({
-        body: llmCompletionsBodyFormat(
+      try {
+        const modelData = getLLMModel(data.dataset.agentModel);
+        const prompt = `${data.collection.qaPrompt || Prompt_AgentQA.description}
+        ${replaceVariable(Prompt_AgentQA.fixedText, { text })}`;
+
+        // request LLM to get QA
+        const messages: ChatCompletionMessageParam[] = [
           {
-            model: modelData.model,
-            temperature: 0.3,
-            messages: await loadRequestMessages({ messages, useVision: false }),
-            stream: true
+            role: 'user',
+            content: prompt
+          }
+        ];
+
+        const { response: chatResponse } = await createChatCompletion({
+          body: llmCompletionsBodyFormat(
+            {
+              model: modelData.model,
+              temperature: 0.3,
+              messages: await loadRequestMessages({ messages, useVision: false }),
+              stream: true
+            },
+            modelData
+          )
+        });
+        const { text: answer, usage } = await formatLLMResponse(chatResponse);
+        const inputTokens = usage?.prompt_tokens || (await countGptMessagesTokens(messages));
+        const outputTokens = usage?.completion_tokens || (await countPromptTokens(answer));
+
+        const qaArr = await formatSplitText({ answer, rawText: text, llmModel: modelData }); // formatted QA pairs
+
+        // get vector and insert
+        await pushDataListToTrainingQueue({
+          teamId: data.teamId,
+          tmbId: data.tmbId,
+          datasetId: data.datasetId,
+          collectionId: data.collectionId,
+          mode: TrainingModeEnum.chunk,
+          data: qaArr.map((item) => ({
+            ...item,
+            chunkIndex: data.chunkIndex
+          })),
+          billId: data.billId,
+          vectorModel: data.dataset.vectorModel,
+          agentModel: data.dataset.agentModel,
+          vlmModel: data.dataset.vlmModel
+        });
+
+        // delete data from training
+        await MongoDatasetTraining.findByIdAndDelete(data._id);
+
+        // Push usage
+        pushLLMTrainingUsage({
+          teamId: data.teamId,
+          tmbId: data.tmbId,
+          inputTokens,
+          outputTokens,
+          billId: data.billId,
+          model: modelData.model,
+          mode: 'qa'
+        });
+
+        addLog.info(`[QA Queue] Finish`, {
+          time: Date.now() - startTime,
+          splitLength: qaArr.length,
+          usage
+        });
+      } catch (err: any) {
+        addLog.error(`[QA Queue] Error`, err);
+        await MongoDatasetTraining.updateOne(
+          {
+            _id: data._id
           },
-          modelData
-        )
-      });
-      const { text: answer, usage } = await formatLLMResponse(chatResponse);
-      const inputTokens = usage?.prompt_tokens || (await countGptMessagesTokens(messages));
-      const outputTokens = usage?.completion_tokens || (await countPromptTokens(answer));
+          {
+            errorMsg: getErrText(err, 'unknown error')
+          }
+        );
 
-      const qaArr = await formatSplitText({ answer, rawText: text, llmModel: modelData }); // formatted QA pairs
-
-      // get vector and insert
-      await pushDataListToTrainingQueue({
-        teamId: data.teamId,
-        tmbId: data.tmbId,
-        datasetId: data.datasetId,
-        collectionId: data.collectionId,
-        mode: TrainingModeEnum.chunk,
-        data: qaArr.map((item) 
=> ({ - ...item, - chunkIndex: data.chunkIndex - })), - billId: data.billId, - vectorModel: data.dataset.vectorModel, - agentModel: data.dataset.agentModel, - vlmModel: data.dataset.vlmModel - }); - - // delete data from training - await MongoDatasetTraining.findByIdAndDelete(data._id); - - // Push usage - pushLLMTrainingUsage({ - teamId: data.teamId, - tmbId: data.tmbId, - inputTokens, - outputTokens, - billId: data.billId, - model: modelData.model, - mode: 'qa' - }); - - addLog.info(`[QA Queue] Finish`, { - time: Date.now() - startTime, - splitLength: qaArr.length, - usage - }); - } catch (err: any) { - addLog.error(`[QA Queue] Error`, err); - await MongoDatasetTraining.updateOne( - { - _id: data._id - }, - { - errorMsg: getErrText(err, 'unknown error') - } - ); - - await delay(100); + await delay(100); + } } + } catch (error) { + addLog.error(`[QA Queue] Error`, error); } if (reduceQueue()) { diff --git a/projects/app/src/service/core/dataset/queues/generateVector.ts b/projects/app/src/service/core/dataset/queues/generateVector.ts index 412fd1609..77050c978 100644 --- a/projects/app/src/service/core/dataset/queues/generateVector.ts +++ b/projects/app/src/service/core/dataset/queues/generateVector.ts @@ -42,116 +42,120 @@ export async function generateVector(): Promise { if (global.vectorQueueLen >= max) return; global.vectorQueueLen++; - while (true) { - const start = Date.now(); + try { + while (true) { + const start = Date.now(); - // get training data - const { - data, - done = false, - error = false - } = await (async () => { - try { - const data = await MongoDatasetTraining.findOneAndUpdate( - { - mode: TrainingModeEnum.chunk, - retryCount: { $gt: 0 }, - lockTime: { $lte: addMinutes(new Date(), -3) } - }, - { - lockTime: new Date(), - $inc: { retryCount: -1 } - } - ) - .populate([ + // get training data + const { + data, + done = false, + error = false + } = await (async () => { + try { + const data = await MongoDatasetTraining.findOneAndUpdate( { - path: 'dataset', - select: 'vectorModel' + mode: TrainingModeEnum.chunk, + retryCount: { $gt: 0 }, + lockTime: { $lte: addMinutes(new Date(), -3) } }, { - path: 'collection', - select: 'name indexPrefixTitle' - }, - { - path: 'data', - select: '_id indexes' + lockTime: new Date(), + $inc: { retryCount: -1 } } - ]) - .lean(); + ) + .populate([ + { + path: 'dataset', + select: 'vectorModel' + }, + { + path: 'collection', + select: 'name indexPrefixTitle' + }, + { + path: 'data', + select: '_id indexes' + } + ]) + .lean(); - // task preemption - if (!data) { + // task preemption + if (!data) { + return { + done: true + }; + } return { - done: true + data + }; + } catch (error) { + return { + error: true }; - } - return { - data - }; - } catch (error) { - return { - error: true - }; - } - })(); - - // Break loop - if (done || !data) { - break; - } - if (error) { - addLog.error(`[Vector Queue] Error`, error); - await delay(500); - continue; - } - - if (!data.dataset || !data.collection) { - addLog.info(`[Vector Queue] Dataset or collection not found`, data); - // Delete data - await MongoDatasetTraining.deleteOne({ _id: data._id }); - continue; - } - - // auth balance - if (!(await checkTeamAiPointsAndLock(data.teamId))) { - continue; - } - - addLog.info(`[Vector Queue] Start`); - - try { - const { tokens } = await (async () => { - if (data.dataId) { - return rebuildData({ trainingData: data }); - } else { - return insertData({ trainingData: data }); } })(); - // push usage - pushGenerateVectorUsage({ - teamId: data.teamId, - tmbId: 
data.tmbId, - inputTokens: tokens, - model: data.dataset.vectorModel, - billId: data.billId - }); + // Break loop + if (done || !data) { + break; + } + if (error) { + addLog.error(`[Vector Queue] Error`, error); + await delay(500); + continue; + } - addLog.info(`[Vector Queue] Finish`, { - time: Date.now() - start - }); - } catch (err: any) { - addLog.error(`[Vector Queue] Error`, err); - await MongoDatasetTraining.updateOne( - { - _id: data._id - }, - { - errorMsg: getErrText(err, 'unknown error') - } - ); - await delay(100); + if (!data.dataset || !data.collection) { + addLog.info(`[Vector Queue] Dataset or collection not found`, data); + // Delete data + await MongoDatasetTraining.deleteOne({ _id: data._id }); + continue; + } + + // auth balance + if (!(await checkTeamAiPointsAndLock(data.teamId))) { + continue; + } + + addLog.info(`[Vector Queue] Start`); + + try { + const { tokens } = await (async () => { + if (data.dataId) { + return rebuildData({ trainingData: data }); + } else { + return insertData({ trainingData: data }); + } + })(); + + // push usage + pushGenerateVectorUsage({ + teamId: data.teamId, + tmbId: data.tmbId, + inputTokens: tokens, + model: data.dataset.vectorModel, + billId: data.billId + }); + + addLog.info(`[Vector Queue] Finish`, { + time: Date.now() - start + }); + } catch (err: any) { + addLog.error(`[Vector Queue] Error`, err); + await MongoDatasetTraining.updateOne( + { + _id: data._id + }, + { + errorMsg: getErrText(err, 'unknown error') + } + ); + await delay(100); + } } + } catch (error) { + addLog.error(`[Vector Queue] Error`, error); } if (reduceQueue()) {
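The generateQA and generateVector rewrites share one shape: the whole `while (true)` consumer now sits inside a try/catch so that an unexpected throw can no longer skip the `reduceQueue()` bookkeeping that follows the loop. The guarantee, sketched generically with hypothetical helper names:

```ts
// Queue-consumer skeleton: however the loop exits, the queue counter is
// decremented exactly once.
const runQueueConsumer = async (
  pullAndProcess: () => Promise<boolean>, // resolves true while tasks remain
  reduceQueue: () => void
) => {
  try {
    while (true) {
      const hasMore = await pullAndProcess();
      if (!hasMore) break;
    }
  } catch (error) {
    console.error('[queue] unexpected error', error);
  } finally {
    reduceQueue(); // runs on both normal exit and error
  }
};
```

The diffs reach the same end by calling reduceQueue() after the try/catch rather than in a finally block; either way the invariant holds.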