perf: memory leak (#5370)

* perf: memory leak

* perf: workflow shared buffer; circle checker; get file from stream

* doc

* remove report.md
Archer authored on 2025-08-03 22:37:45 +08:00, committed by GitHub
parent baf18b14d4, commit 7bcee82f5f
21 changed files with 525 additions and 349 deletions

View File

@@ -23,13 +23,20 @@ import type { RuntimeEdgeItemType, RuntimeNodeItemType } from './type';
 export const extractDeepestInteractive = (
   interactive: WorkflowInteractiveResponseType
 ): WorkflowInteractiveResponseType => {
-  if (
-    (interactive?.type === 'childrenInteractive' || interactive?.type === 'loopInteractive') &&
-    interactive.params?.childrenResponse
+  const MAX_DEPTH = 100;
+  let current = interactive;
+  let depth = 0;
+
+  while (
+    depth < MAX_DEPTH &&
+    (current?.type === 'childrenInteractive' || current?.type === 'loopInteractive') &&
+    current.params?.childrenResponse
   ) {
-    return extractDeepestInteractive(interactive.params.childrenResponse);
+    current = current.params.childrenResponse;
+    depth++;
   }
-  return interactive;
+
+  return current;
 };

 export const getMaxHistoryLimitFromNodes = (nodes: StoreNodeItemType[]): number => {
   let limit = 10;
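Note: the rewrite above trades recursion for a depth-capped loop, so a malformed interactive chain can no longer blow the call stack or pin every intermediate frame in memory. A minimal standalone sketch of the same pattern, using a simplified stand-in type in place of WorkflowInteractiveResponseType:

// Minimal sketch of the recursion-to-loop rewrite, with a simplified
// stand-in type (the real code uses WorkflowInteractiveResponseType).
type Node = { type: string; params?: { childrenResponse?: Node } };

const deepest = (node: Node, maxDepth = 100): Node => {
  let current = node;
  let depth = 0;
  // Each step descends one level; the cap bounds both stack use and
  // pathological self-referencing structures.
  while (
    depth < maxDepth &&
    (current.type === 'childrenInteractive' || current.type === 'loopInteractive') &&
    current.params?.childrenResponse
  ) {
    current = current.params.childrenResponse;
    depth++;
  }
  return current;
};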
@@ -294,22 +301,42 @@ export const checkNodeRunStatus = ({
   const commonEdges: RuntimeEdgeItemType[] = [];
   const recursiveEdges: RuntimeEdgeItemType[] = [];

-  const checkIsCircular = (edge: RuntimeEdgeItemType, visited: Set<string>): boolean => {
-    if (edge.source === currentNode.nodeId) {
-      return true; // cycle detected, and it contains the current node
-    }
-    if (visited.has(edge.source)) {
-      return false; // cycle detected, but it does not contain the current node (a downstream sub-cycle)
-    }
-    visited.add(edge.source);
-    // Recursively check the subsequent edges; return true if any of them forms a cycle
-    const nextEdges = allEdges.filter((item) => item.target === edge.source);
-    return nextEdges.some((nextEdge) => checkIsCircular(nextEdge, new Set(visited)));
+  const checkIsCircular = (startEdge: RuntimeEdgeItemType, initialVisited: string[]): boolean => {
+    const stack: Array<{ edge: RuntimeEdgeItemType; visited: Set<string> }> = [
+      { edge: startEdge, visited: new Set(initialVisited) }
+    ];
+    const MAX_DEPTH = 3000;
+    let iterations = 0;
+
+    while (stack.length > 0 && iterations < MAX_DEPTH) {
+      iterations++;
+      const { edge, visited } = stack.pop()!;
+
+      if (edge.source === currentNode.nodeId) {
+        return true; // cycle detected, and it contains the current node
+      }
+      if (visited.has(edge.source)) {
+        continue; // node already visited, skip (avoids interference from sub-cycles)
+      }
+      const newVisited = new Set(visited);
+      newVisited.add(edge.source);
+
+      // Find the edges that feed this source node and push them onto the stack
+      const nextEdges = allEdges.filter((item) => item.target === edge.source);
+      for (const nextEdge of nextEdges) {
+        stack.push({ edge: nextEdge, visited: newVisited });
+      }
+    }
+    return false;
   };

   sourceEdges.forEach((edge) => {
-    if (checkIsCircular(edge, new Set([currentNode.nodeId]))) {
+    if (checkIsCircular(edge, [currentNode.nodeId])) {
       recursiveEdges.push(edge);
     } else {
       commonEdges.push(edge);
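Note: the explicit stack makes the cycle check iterative and caps the total work, where the old version allocated a fresh Set on every recursive call. A self-contained sketch of the technique, with a simplified Edge shape standing in for RuntimeEdgeItemType:

// Self-contained sketch of the stack-based cycle check.
type Edge = { source: string; target: string };

const hasCycleThrough = (allEdges: Edge[], startEdge: Edge, startNodeId: string): boolean => {
  const stack = [{ edge: startEdge, visited: new Set([startNodeId]) }];
  let iterations = 0;

  while (stack.length > 0 && iterations++ < 3000) {
    const { edge, visited } = stack.pop()!;
    if (edge.source === startNodeId) return true; // walked back around to the start node
    if (visited.has(edge.source)) continue; // a cycle that excludes the start node
    const nextVisited = new Set(visited).add(edge.source);
    for (const next of allEdges.filter((e) => e.target === edge.source)) {
      stack.push({ edge: next, visited: nextVisited });
    }
  }
  return false;
};

Sharing one nextVisited set among all children of a popped edge is safe here because each popped entry copies the set again before mutating it.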

View File

@@ -79,6 +79,8 @@ export async function uploadFile({
       .pipe(stream as any)
       .on('finish', resolve)
       .on('error', reject);
-  });
+  }).finally(() => {
+    readStream.destroy();
+  });

   return String(stream.id);
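Note: attaching cleanup via .finally guarantees the read stream is destroyed on both the finish and the error path, instead of leaking a file handle when the pipe rejects. A generic sketch of the pattern (helper name hypothetical):

import { Readable, Writable } from 'stream';

// Pipe a readable into a writable and always release the source,
// whether the pipe finishes or errors (hypothetical helper).
const pipeAndRelease = (source: Readable, sink: Writable) =>
  new Promise<void>((resolve, reject) => {
    source.pipe(sink).on('finish', resolve).on('error', reject);
  }).finally(() => {
    source.destroy();
  });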

View File

@@ -2,6 +2,11 @@ import { isProduction } from '@fastgpt/global/common/system/constants';
 import fs from 'fs';
 import path from 'path';

+export const getFileMaxSize = () => {
+  const mb = global.feConfigs?.uploadFileMaxSize || 1000;
+  return mb * 1024 * 1024;
+};
+
 export const removeFilesByPaths = (paths: string[]) => {
   paths.forEach((path) => {
     fs.unlink(path, (err) => {
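Note: getFileMaxSize resolves the configured limit in megabytes (defaulting to 1000) to bytes, so callers can compare raw byte counts directly. A hypothetical guard built on it:

// Hypothetical guard using the helper above: reject payloads over the limit.
const assertWithinLimit = (sizeInBytes: number) => {
  const max = getFileMaxSize();
  if (sizeInBytes > max) {
    throw new Error(`File too large: max ${Math.round(max / 1024 / 1024)}MB`);
  }
};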

View File

@@ -13,17 +13,22 @@ export async function connectMongo(db: Mongoose, url: string): Promise<Mongoose>
     return db;
   }

+  const RemoveListeners = () => {
+    db.connection.removeAllListeners('error');
+    db.connection.removeAllListeners('disconnected');
+  };
+
   console.log('MongoDB start connect');
   try {
-    // Remove existing listeners to prevent duplicates
-    db.connection.removeAllListeners('error');
-    db.connection.removeAllListeners('disconnected');
+    RemoveListeners();
     db.set('strictQuery', 'throw');
     db.connection.on('error', async (error) => {
       console.log('mongo error', error);
       try {
         if (db.connection.readyState !== 0) {
+          RemoveListeners();
           await db.disconnect();
           await delay(1000);
           await connectMongo(db, url);
@@ -34,6 +39,7 @@ export async function connectMongo(db: Mongoose, url: string): Promise<Mongoose>
       console.log('mongo disconnected');
       try {
         if (db.connection.readyState !== 0) {
+          RemoveListeners();
           await db.disconnect();
           await delay(1000);
           await connectMongo(db, url);
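Note: every reconnect attempt re-registers 'error' and 'disconnected' handlers, so clearing the old ones first is what keeps handlers from stacking up across reconnect loops (the classic EventEmitter leak behind MaxListenersExceededWarning). A reduced sketch of the idea, with hypothetical handler arguments:

import { EventEmitter } from 'events';

// Reduced sketch: re-register connection handlers without stacking duplicates.
const attachMongoHandlers = (
  conn: EventEmitter,
  onError: (err: Error) => void,
  onDisconnect: () => void
) => {
  // Drop handlers left over from previous connect attempts before adding new ones
  conn.removeAllListeners('error');
  conn.removeAllListeners('disconnected');
  conn.on('error', onError);
  conn.on('disconnected', onDisconnect);
};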

View File

@@ -13,6 +13,9 @@ import { getApiDatasetRequest } from './apiDataset';
 import Papa from 'papaparse';
 import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
 import { text2Chunks } from '../../worker/function';
+import { addLog } from '../../common/system/log';
+import { retryFn } from '@fastgpt/global/common/system/utils';
+import { getFileMaxSize } from '../../common/file/utils';

 export const readFileRawTextByUrl = async ({
@@ -20,7 +23,8 @@ export const readFileRawTextByUrl = async ({
   url,
   customPdfParse,
   getFormatText,
-  relatedId
+  relatedId,
+  maxFileSize = getFileMaxSize()
 }: {
   teamId: string;
   tmbId: string;
@@ -28,30 +32,113 @@ export const readFileRawTextByUrl = async ({
   customPdfParse?: boolean;
   getFormatText?: boolean;
   relatedId: string; // externalFileId / apiFileId
+  maxFileSize?: number;
 }) => {
-  const extension = parseFileExtensionFromUrl(url);
+  // Check file size
+  try {
+    const headResponse = await axios.head(url, { timeout: 10000 });
+    const contentLength = parseInt(headResponse.headers['content-length'] || '0');
+    if (contentLength > 0 && contentLength > maxFileSize) {
+      return Promise.reject(
+        `File too large. Size: ${Math.round(contentLength / 1024 / 1024)}MB, Maximum allowed: ${Math.round(maxFileSize / 1024 / 1024)}MB`
+      );
+    }
+  } catch (error) {
+    addLog.warn('Check file HEAD request failed');
+  }

+  // Use stream response type, avoid double memory usage
   const response = await axios({
     method: 'get',
     url: url,
-    responseType: 'arraybuffer'
+    responseType: 'stream',
+    maxContentLength: maxFileSize,
+    timeout: 30000
   });
+  const extension = parseFileExtensionFromUrl(url);

-  const buffer = Buffer.from(response.data, 'binary');
-
-  const { rawText } = await readRawContentByFileBuffer({
-    customPdfParse,
-    getFormatText,
-    extension,
-    teamId,
-    tmbId,
-    buffer,
-    encoding: 'utf-8',
-    metadata: {
-      relatedId
-    }
-  });
-
-  return rawText;
+  // Optimization: convert the stream directly to a buffer, skipping the arraybuffer intermediate step
+  const chunks: Buffer[] = [];
+  let totalLength = 0;
+
+  return new Promise<string>((resolve, reject) => {
+    let isAborted = false;
+
+    const cleanup = () => {
+      if (!isAborted) {
+        isAborted = true;
+        chunks.length = 0; // free memory
+        response.data.destroy();
+      }
+    };
+
+    // Stream timeout
+    const timeoutId = setTimeout(() => {
+      cleanup();
+      reject('File download timeout');
+    }, 600000);
+
+    response.data.on('data', (chunk: Buffer) => {
+      if (isAborted) return;
+
+      totalLength += chunk.length;
+      if (totalLength > maxFileSize) {
+        clearTimeout(timeoutId);
+        cleanup();
+        return reject(
+          `File too large. Maximum size allowed is ${Math.round(maxFileSize / 1024 / 1024)}MB.`
+        );
+      }
+      chunks.push(chunk);
+    });
+
+    response.data.on('end', async () => {
+      if (isAborted) return;
+      clearTimeout(timeoutId);
+
+      try {
+        // Merge all chunks into a single buffer
+        const buffer = Buffer.concat(chunks);
+        // Clear the chunks array immediately to release memory
+        chunks.length = 0;
+
+        const { rawText } = await retryFn(() =>
+          readRawContentByFileBuffer({
+            customPdfParse,
+            getFormatText,
+            extension,
+            teamId,
+            tmbId,
+            buffer,
+            encoding: 'utf-8',
+            metadata: {
+              relatedId
+            }
+          })
+        );
+        resolve(rawText);
+      } catch (error) {
+        cleanup();
+        reject(error);
+      }
+    });
+
+    response.data.on('error', (error: Error) => {
+      clearTimeout(timeoutId);
+      cleanup();
+      reject(error);
+    });
+
+    response.data.on('close', () => {
+      clearTimeout(timeoutId);
+      cleanup();
+    });
+  });
 };

 /*
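Note: the heart of this change is aborting the download the moment the received byte count crosses the limit, rather than buffering a full arraybuffer and checking afterwards. A condensed, self-contained sketch of that guard (axios assumed, helper name hypothetical):

import axios from 'axios';
import type { Readable } from 'stream';

// Condensed sketch: download a URL into a Buffer, aborting once maxBytes is exceeded.
const downloadWithLimit = async (url: string, maxBytes: number): Promise<Buffer> => {
  const res = await axios({ method: 'get', url, responseType: 'stream', timeout: 30000 });
  const body = res.data as Readable;
  const chunks: Buffer[] = [];
  let total = 0;

  return new Promise<Buffer>((resolve, reject) => {
    body.on('data', (chunk: Buffer) => {
      total += chunk.length;
      if (total > maxBytes) {
        body.destroy(); // stop reading; frees the socket and the buffered chunks
        chunks.length = 0;
        return reject(new Error(`File exceeds ${maxBytes} bytes`));
      }
      chunks.push(chunk);
    });
    body.on('end', () => resolve(Buffer.concat(chunks)));
    body.on('error', reject);
  });
};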

View File

@@ -180,6 +180,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
   }

   let workflowRunTimes = 0;
+  let streamCheckTimer: NodeJS.Timeout | null = null;

   // Init
   if (isRootRuntime) {
@@ -198,18 +199,14 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
     res.setHeader('Cache-Control', 'no-cache, no-transform');

     // Send an empty message every 10s so the browser does not assume the connection has dropped
-    const sendStreamTimerSign = () => {
-      setTimeout(() => {
-        props?.workflowStreamResponse?.({
-          event: SseResponseEventEnum.answer,
-          data: textAdaptGptResponse({
-            text: ''
-          })
-        });
-        sendStreamTimerSign();
-      }, 10000);
-    };
-    sendStreamTimerSign();
+    streamCheckTimer = setInterval(() => {
+      props?.workflowStreamResponse?.({
+        event: SseResponseEventEnum.answer,
+        data: textAdaptGptResponse({
+          text: ''
+        })
+      });
+    }, 10000);
   }

   // Get default variables
@@ -841,6 +838,10 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
     };
   } catch (error) {
     return Promise.reject(error);
+  } finally {
+    if (streamCheckTimer) {
+      clearInterval(streamCheckTimer);
+    }
   }
 }
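Note: the old self-rescheduling setTimeout exposed no handle that outer code could cancel, so the heartbeat kept firing (and kept the response closure alive) after the workflow returned; a single setInterval handle cleared in finally closes that leak. A minimal sketch, with send standing in for workflowStreamResponse:

// Minimal sketch of the heartbeat lifecycle.
const runWithHeartbeat = async (send: (text: string) => void, work: () => Promise<void>) => {
  const heartbeat = setInterval(() => send(''), 10000); // keep the SSE connection alive
  try {
    await work();
  } finally {
    clearInterval(heartbeat); // always stop the timer, even if work() throws
  }
};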

View File

@@ -37,15 +37,15 @@ export const useDoc2xServer = ({ apiKey }: { apiKey: string }) => {
     if (typeof err === 'string') {
       return Promise.reject({ message: `[Doc2x] ${err}` });
     }
-    if (typeof err.message === 'string') {
-      return Promise.reject({ message: `[Doc2x] ${err.message}` });
-    }
     if (typeof err.data === 'string') {
       return Promise.reject({ message: `[Doc2x] ${err.data}` });
     }
     if (err?.response?.data) {
       return Promise.reject({ message: `[Doc2x] ${getErrText(err?.response?.data)}` });
     }
+    if (typeof err.message === 'string') {
+      return Promise.reject({ message: `[Doc2x] ${err.message}` });
+    }
     addLog.error('[Doc2x] Unknown error', err);
     return Promise.reject({ message: `[Doc2x] ${getErrText(err)}` });
@@ -78,7 +78,7 @@ export const useDoc2xServer = ({ apiKey }: { apiKey: string }) => {
       code,
       msg,
       data: preupload_data
-    } = await request<{ uid: string; url: string }>('/v2/parse/preupload', null, 'POST');
+    } = await request<{ uid: string; url: string }>('/v2/parse/preupload', {}, 'POST');
     if (!['ok', 'success'].includes(code)) {
       return Promise.reject(`[Doc2x] Failed to get pre-upload URL: ${msg}`);
     }
@@ -96,6 +96,7 @@ export const useDoc2xServer = ({ apiKey }: { apiKey: string }) => {
       .catch((error) => {
         return Promise.reject(`[Doc2x] Failed to upload file: ${getErrText(error)}`);
       });
+
     if (response.status !== 200) {
       return Promise.reject(
         `[Doc2x] Upload failed with status ${response.status}: ${response.statusText}`
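Note: the reordering above puts the generic err.message fallback after the payload-specific checks, so an API error body is preferred over a bare Error message. The ordering in isolation (getErrText approximated with JSON.stringify for the sketch):

// Compact sketch: prefer specific payload fields over the generic message.
const doc2xErrText = (err: any): string => {
  if (typeof err === 'string') return err;
  if (typeof err?.data === 'string') return err.data;
  if (err?.response?.data) return JSON.stringify(err.response.data);
  if (typeof err?.message === 'string') return err.message; // generic Error message last
  return 'Unknown error';
};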

View File

@@ -20,5 +20,17 @@ export const readRawContentFromBuffer = (props: {
   encoding: string;
   buffer: Buffer;
 }) => {
-  return runWorker<ReadFileResponse>(WorkerNameEnum.readFile, props);
+  const bufferSize = props.buffer.length;
+
+  // Use a SharedArrayBuffer to avoid copying the data to the worker
+  const sharedBuffer = new SharedArrayBuffer(bufferSize);
+  const sharedArray = new Uint8Array(sharedBuffer);
+  sharedArray.set(props.buffer);
+
+  return runWorker<ReadFileResponse>(WorkerNameEnum.readFile, {
+    extension: props.extension,
+    encoding: props.encoding,
+    sharedBuffer: sharedBuffer,
+    bufferSize: bufferSize
+  });
 };
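Note: copying the file into a SharedArrayBuffer once means postMessage hands the worker a reference to the same memory instead of structured-cloning the whole payload. A sketch of the sending side with a bare worker_threads Worker (worker file and anything beyond the message shape shown in the diff is hypothetical):

import { Worker } from 'worker_threads';

// Main-thread side: copy the Buffer into shared memory once, then hand the
// worker a reference instead of a cloned payload.
const sendBufferToWorker = (worker: Worker, buf: Buffer) => {
  const shared = new SharedArrayBuffer(buf.length);
  new Uint8Array(shared).set(buf); // one copy in, zero copies on postMessage
  worker.postMessage({ sharedBuffer: shared, bufferSize: buf.length });
};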

View File

@@ -56,16 +56,16 @@ export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise<Read
     }
   };

   // @ts-ignore
-  const loadingTask = pdfjs.getDocument(buffer.buffer);
+  // Create a completely new ArrayBuffer to avoid SharedArrayBuffer transferList issues
+  const uint8Array = new Uint8Array(buffer.byteLength);
+  uint8Array.set(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength));
+  const loadingTask = pdfjs.getDocument({ data: uint8Array });
   const doc = await loadingTask.promise;

-  // Avoid OOM.
-  let result = '';
   const pageArr = Array.from({ length: doc.numPages }, (_, i) => i + 1);
-  for (let i = 0; i < pageArr.length; i++) {
-    result += await readPDFPage(doc, i + 1);
-  }
+  const result = (
+    await Promise.all(pageArr.map(async (page) => await readPDFPage(doc, page)))
+  ).join('');

   loadingTask.destroy();
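Note: pdf.js transfers the underlying ArrayBuffer to its own worker, and a SharedArrayBuffer is not allowed in a transfer list, hence the defensive copy into a fresh Uint8Array before getDocument. The copy in isolation:

// Copy a (possibly SharedArrayBuffer-backed) Buffer into a plain,
// transferable Uint8Array before handing it to pdf.js.
const toTransferable = (buffer: Buffer): Uint8Array => {
  const copy = new Uint8Array(buffer.byteLength);
  copy.set(new Uint8Array(buffer.buffer, buffer.byteOffset, buffer.byteLength));
  return copy;
};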

View File

@@ -9,49 +9,60 @@ import { readXlsxRawText } from './extension/xlsx';
 import { readCsvRawText } from './extension/csv';
 import { workerResponse } from '../controller';

-parentPort?.on('message', async (props: ReadRawTextProps<Uint8Array>) => {
-  const read = async (params: ReadRawTextByBuffer) => {
-    switch (params.extension) {
-      case 'txt':
-      case 'md':
-        return readFileRawText(params);
-      case 'html':
-        return readHtmlRawText(params);
-      case 'pdf':
-        return readPdfFile(params);
-      case 'docx':
-        return readDocsFile(params);
-      case 'pptx':
-        return readPptxRawText(params);
-      case 'xlsx':
-        return readXlsxRawText(params);
-      case 'csv':
-        return readCsvRawText(params);
-      default:
-        return Promise.reject(
-          `Only support .txt, .md, .html, .pdf, .docx, pptx, .csv, .xlsx. "${params.extension}" is not supported.`
-        );
-    }
-  };
-
-  // params.buffer: Uint8Array -> buffer
-  const buffer = Buffer.from(props.buffer);
-  const newProps: ReadRawTextByBuffer = {
-    ...props,
-    buffer
-  };
-
-  try {
-    workerResponse({
-      parentPort,
-      status: 'success',
-      data: await read(newProps)
-    });
-  } catch (error) {
-    workerResponse({
-      parentPort,
-      status: 'error',
-      data: error
-    });
-  }
-});
+parentPort?.on(
+  'message',
+  async (
+    props: Omit<ReadRawTextProps<any>, 'buffer'> & {
+      sharedBuffer: SharedArrayBuffer;
+      bufferSize: number;
+    }
+  ) => {
+    const read = async (params: ReadRawTextByBuffer) => {
+      switch (params.extension) {
+        case 'txt':
+        case 'md':
+          return readFileRawText(params);
+        case 'html':
+          return readHtmlRawText(params);
+        case 'pdf':
+          return readPdfFile(params);
+        case 'docx':
+          return readDocsFile(params);
+        case 'pptx':
+          return readPptxRawText(params);
+        case 'xlsx':
+          return readXlsxRawText(params);
+        case 'csv':
+          return readCsvRawText(params);
+        default:
+          return Promise.reject(
+            `Only support .txt, .md, .html, .pdf, .docx, pptx, .csv, .xlsx. "${params.extension}" is not supported.`
+          );
+      }
+    };
+
+    // Read straight from the SharedArrayBuffer: zero-copy shared memory
+    const sharedArray = new Uint8Array(props.sharedBuffer);
+    const buffer = Buffer.from(sharedArray.buffer, 0, props.bufferSize);
+
+    const newProps: ReadRawTextByBuffer = {
+      extension: props.extension,
+      encoding: props.encoding,
+      buffer
+    };
+
+    try {
+      workerResponse({
+        parentPort,
+        status: 'success',
+        data: await read(newProps)
+      });
+    } catch (error) {
+      workerResponse({
+        parentPort,
+        status: 'error',
+        data: error
+      });
+    }
+  }
+);
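Note: on the worker side, Buffer.from(arrayBuffer, offset, length) wraps the shared memory in a view rather than copying it, so the only full copy in the round trip is the initial set() on the main thread. A minimal sketch of the receiving end (message shape as above):

import { parentPort } from 'worker_threads';

// Worker side: wrap the shared memory in a Buffer view (no copy) and read from it.
parentPort?.on('message', (msg: { sharedBuffer: SharedArrayBuffer; bufferSize: number }) => {
  const view = new Uint8Array(msg.sharedBuffer);
  const buffer = Buffer.from(view.buffer, 0, msg.bufferSize); // a view, not a clone
  parentPort?.postMessage({ bytes: buffer.length });
});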

View File

@@ -198,6 +198,7 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
     if (item) {
       item.reject?.('error');
       clearTimeout(item.timeoutId);
+      item.worker.removeAllListeners();
       item.worker.terminate();
     }
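Note: terminate() stops the thread, but listeners previously attached to the Worker object can keep it, and everything its handlers capture, reachable; dropping them first lets the worker be collected. In isolation:

import { Worker } from 'worker_threads';

// Tear a worker down without leaving listener references behind.
const disposeWorker = async (worker: Worker) => {
  worker.removeAllListeners(); // release closures held by message/error handlers
  await worker.terminate();
};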