perf: buffer; fix: back up split (#4913)

* perf: buffer

* fix: back up split

* fix: app limit

* doc
Archer
2025-05-28 18:18:25 +08:00
committed by GitHub
parent 802de11363
commit a171c7b11c
11 changed files with 208 additions and 93 deletions

View File

@@ -14,8 +14,10 @@ weight: 789
 ## ⚙️ Optimizations

+1. The raw-text cache is now stored in GridFS, raising the cache size limit.

 ## 🐛 Fixes

 1. In workflows, global system tools declared by an administrator could not be version-managed.
+2. Context error when an interactive node preceded a tool-call node.
+3. Fixed backup imports failing to split into chunks when the text was shorter than 1,000 characters.

View File

@@ -0,0 +1,139 @@
import { retryFn } from '@fastgpt/global/common/system/utils';
import { connectionMongo } from '../../mongo';
import { MongoRawTextBufferSchema, bucketName } from './schema';
import { addLog } from '../../system/log';

const getGridBucket = () => {
  return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
    bucketName: bucketName
  });
};

export const addRawTextBuffer = async ({
  sourceId,
  sourceName,
  text,
  expiredTime
}: {
  sourceId: string;
  sourceName: string;
  text: string;
  expiredTime: Date;
}) => {
  const gridBucket = getGridBucket();
  const metadata = {
    sourceId,
    sourceName,
    expiredTime
  };

  const buffer = Buffer.from(text);
  const fileSize = buffer.length;
  // Chunk size: as large as possible, but no more than 14MB and no less than 128KB
  const chunkSizeBytes = (() => {
    // Ideal chunk size: file size ÷ target chunk count (10); each chunk must also stay under 14MB
    const idealChunkSize = Math.min(Math.ceil(fileSize / 10), 14 * 1024 * 1024);

    // Ensure the chunk size is at least 128KB
    const minChunkSize = 128 * 1024; // 128KB

    // Take the larger of the ideal and minimum chunk sizes
    let chunkSize = Math.max(idealChunkSize, minChunkSize);

    // Round the chunk size up to the nearest multiple of 64KB to keep it tidy
    chunkSize = Math.ceil(chunkSize / (64 * 1024)) * (64 * 1024);

    return chunkSize;
  })();

  const uploadStream = gridBucket.openUploadStream(sourceId, {
    metadata,
    chunkSizeBytes
  });

  return retryFn(async () => {
    return new Promise((resolve, reject) => {
      uploadStream.end(buffer);
      uploadStream.on('finish', () => {
        resolve(uploadStream.id);
      });
      uploadStream.on('error', (error) => {
        addLog.error('addRawTextBuffer error', error);
        resolve('');
      });
    });
  });
};

export const getRawTextBuffer = async (sourceId: string) => {
  const gridBucket = getGridBucket();

  return retryFn(async () => {
    const bufferData = await MongoRawTextBufferSchema.findOne(
      {
        'metadata.sourceId': sourceId
      },
      '_id metadata'
    ).lean();
    if (!bufferData) {
      return null;
    }

    // Read file content
    const downloadStream = gridBucket.openDownloadStream(bufferData._id);

    const chunks: Buffer[] = [];
    return new Promise<{
      text: string;
      sourceName: string;
    } | null>((resolve, reject) => {
      downloadStream.on('data', (chunk) => {
        chunks.push(chunk);
      });

      downloadStream.on('end', () => {
        const buffer = Buffer.concat(chunks);
        const text = buffer.toString('utf8');
        resolve({
          text,
          sourceName: bufferData.metadata?.sourceName || ''
        });
      });

      downloadStream.on('error', (error) => {
        addLog.error('getRawTextBuffer error', error);
        resolve(null);
      });
    });
  });
};

export const deleteRawTextBuffer = async (sourceId: string): Promise<boolean> => {
  const gridBucket = getGridBucket();

  return retryFn(async () => {
    const buffer = await MongoRawTextBufferSchema.findOne({ 'metadata.sourceId': sourceId });
    if (!buffer) {
      return false;
    }

    await gridBucket.delete(buffer._id);
    return true;
  });
};

export const updateRawTextBufferExpiredTime = async ({
  sourceId,
  expiredTime
}: {
  sourceId: string;
  expiredTime: Date;
}) => {
  return retryFn(async () => {
    return MongoRawTextBufferSchema.updateOne(
      { 'metadata.sourceId': sourceId },
      { $set: { 'metadata.expiredTime': expiredTime } }
    );
  });
};
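
Taken together, these functions expose a small cache-style API over GridFS. Below is a minimal usage sketch, assuming the controller is imported from a sibling module and reusing the 20-minute expiry the other files in this commit pass in; the fileId and filename values and the relative import path are purely illustrative.

import { addMinutes } from 'date-fns';
import { addRawTextBuffer, getRawTextBuffer, deleteRawTextBuffer } from './controller';

// Illustrative flow: cache a parsed file's raw text, read it back, then drop it.
const demoRawTextBuffer = async (fileId: string, filename: string, rawText: string) => {
  // Store the text in GridFS. The chunk size follows the formula above:
  // a 500KB text gets the 128KB floor, a 40MB text gets ~4MB chunks,
  // and anything over ~140MB is capped at 14MB per chunk.
  await addRawTextBuffer({
    sourceId: fileId,
    sourceName: filename,
    text: rawText,
    expiredTime: addMinutes(new Date(), 20)
  });

  // Subsequent reads hit the buffer instead of re-parsing the source file
  const buffer = await getRawTextBuffer(fileId);
  if (buffer) {
    console.log(buffer.sourceName, buffer.text.length);
  }

  // Remove the buffer explicitly once the data has been consumed
  await deleteRawTextBuffer(fileId);
};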

View File

@@ -1,33 +1,22 @@
-import { getMongoModel, Schema } from '../../mongo';
-import { type RawTextBufferSchemaType } from './type';
+import { getMongoModel, type Types, Schema } from '../../mongo';

-export const collectionName = 'buffer_rawtexts';
+export const bucketName = 'buffer_rawtext';

 const RawTextBufferSchema = new Schema({
-  sourceId: {
-    type: String,
-    required: true
-  },
-  rawText: {
-    type: String,
-    default: ''
-  },
-  createTime: {
-    type: Date,
-    default: () => new Date()
-  },
-  metadata: Object
+  metadata: {
+    sourceId: { type: String, required: true },
+    sourceName: { type: String, required: true },
+    expiredTime: { type: Date, required: true }
+  }
 });

-try {
-  RawTextBufferSchema.index({ sourceId: 1 });
-  // 20 minutes
-  RawTextBufferSchema.index({ createTime: 1 }, { expireAfterSeconds: 20 * 60 });
-} catch (error) {
-  console.log(error);
-}
+RawTextBufferSchema.index({ 'metadata.sourceId': 'hashed' });
+RawTextBufferSchema.index({ 'metadata.expiredTime': -1 });

-export const MongoRawTextBuffer = getMongoModel<RawTextBufferSchemaType>(
-  collectionName,
-  RawTextBufferSchema
-);
+export const MongoRawTextBufferSchema = getMongoModel<{
+  _id: Types.ObjectId;
+  metadata: {
+    sourceId: string;
+    sourceName: string;
+    expiredTime: Date;
+  };
+}>(`${bucketName}.files`, RawTextBufferSchema);

View File

@@ -1,8 +0,0 @@
export type RawTextBufferSchemaType = {
sourceId: string;
rawText: string;
createTime: Date;
metadata?: {
filename: string;
};
};

View File

@@ -6,13 +6,13 @@ import { type DatasetFileSchema } from '@fastgpt/global/core/dataset/type';
 import { MongoChatFileSchema, MongoDatasetFileSchema } from './schema';
 import { detectFileEncoding, detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
 import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
-import { MongoRawTextBuffer } from '../../buffer/rawText/schema';
 import { readRawContentByFileBuffer } from '../read/utils';
 import { gridFsStream2Buffer, stream2Encoding } from './utils';
 import { addLog } from '../../system/log';
-import { readFromSecondary } from '../../mongo/utils';
 import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { Readable } from 'stream';
+import { addRawTextBuffer, getRawTextBuffer } from '../../buffer/rawText/controller';
+import { addMinutes } from 'date-fns';

 export function getGFSCollection(bucket: `${BucketNameEnum}`) {
   MongoDatasetFileSchema;
@@ -225,13 +225,11 @@ export const readFileContentFromMongo = async ({
 }> => {
   const bufferId = `${fileId}-${customPdfParse}`;

   // read buffer
-  const fileBuffer = await MongoRawTextBuffer.findOne({ sourceId: bufferId }, undefined, {
-    ...readFromSecondary
-  }).lean();
+  const fileBuffer = await getRawTextBuffer(bufferId);
   if (fileBuffer) {
     return {
-      rawText: fileBuffer.rawText,
-      filename: fileBuffer.metadata?.filename || ''
+      rawText: fileBuffer.text,
+      filename: fileBuffer?.sourceName
     };
   }
@@ -265,16 +263,13 @@ export const readFileContentFromMongo = async ({
     }
   });

-  // < 14M
-  if (fileBuffers.length < 14 * 1024 * 1024 && rawText.trim()) {
-    MongoRawTextBuffer.create({
-      sourceId: bufferId,
-      rawText,
-      metadata: {
-        filename: file.filename
-      }
-    });
-  }
+  // Add buffer
+  addRawTextBuffer({
+    sourceId: bufferId,
+    sourceName: file.filename,
+    text: rawText,
+    expiredTime: addMinutes(new Date(), 20)
+  });

   return {
     rawText,

View File

@@ -1,16 +1,16 @@
 import { Schema, getMongoModel } from '../../mongo';

-const DatasetFileSchema = new Schema({});
-const ChatFileSchema = new Schema({});
+const DatasetFileSchema = new Schema({
+  metadata: Object
+});
+const ChatFileSchema = new Schema({
+  metadata: Object
+});

-try {
-  DatasetFileSchema.index({ uploadDate: -1 });
-  ChatFileSchema.index({ uploadDate: -1 });
-  ChatFileSchema.index({ 'metadata.chatId': 1 });
-} catch (error) {
-  console.log(error);
-}
+DatasetFileSchema.index({ uploadDate: -1 });
+ChatFileSchema.index({ uploadDate: -1 });
+ChatFileSchema.index({ 'metadata.chatId': 1 });

 export const MongoDatasetFileSchema = getMongoModel('dataset.files', DatasetFileSchema);
 export const MongoChatFileSchema = getMongoModel('chat.files', ChatFileSchema);

View File

@@ -77,7 +77,10 @@ export const createCollectionAndInsertData = async ({
   const chunkSplitter = computeChunkSplitter(createCollectionParams);
   const paragraphChunkDeep = computeParagraphChunkDeep(createCollectionParams);

-  if (trainingType === DatasetCollectionDataProcessModeEnum.qa) {
+  if (
+    trainingType === DatasetCollectionDataProcessModeEnum.qa ||
+    trainingType === DatasetCollectionDataProcessModeEnum.backup
+  ) {
     delete createCollectionParams.chunkTriggerType;
     delete createCollectionParams.chunkTriggerMinSize;
     delete createCollectionParams.dataEnhanceCollectionName;

View File

@@ -218,6 +218,10 @@ export const rawText2Chunks = ({
   };
 };

+  if (backupParse) {
+    return parseDatasetBackup2Chunks(rawText).chunks;
+  }
+
   // Chunk condition
   // 1. Max-size trigger: splitting only happens once the text exceeds the maximum size (default: 0.7 × the model's maximum)
   if (chunkTriggerType === ChunkTriggerConfigTypeEnum.maxSize) {
@@ -240,10 +244,6 @@ export const rawText2Chunks = ({
     }
   }

-  if (backupParse) {
-    return parseDatasetBackup2Chunks(rawText).chunks;
-  }
-
   const { chunks } = splitText2Chunks({
     text: rawText,
     chunkSize,
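
The two hunks above only move the backupParse branch ahead of the chunk-trigger shortcuts, which is what fixes the "shorter than 1,000 characters" backup import from the changelog: previously a short backup file hit the minimum-size shortcut and came back as a single unsplit block before the backup parser ever ran. A simplified sketch of the ordering follows; the names and the single-chunk fallback are stand-ins, not the actual FastGPT implementation.

type Chunk = { q: string; a: string };

// Stand-in for parseDatasetBackup2Chunks: one chunk per backed-up row
const backupRows2Chunks = (rawText: string): Chunk[] =>
  rawText
    .split('\n')
    .filter(Boolean)
    .map((row) => ({ q: row, a: '' }));

const toChunks = (rawText: string, backupParse: boolean, minSize = 1000): Chunk[] => {
  // Backup imports now bypass the size trigger entirely, so even a
  // 300-character backup file is split per row.
  if (backupParse) return backupRows2Chunks(rawText);

  // Normal imports keep the shortcut: small texts stay as one chunk.
  if (rawText.length < minSize) return [{ q: rawText, a: '' }];

  // ...otherwise fall through to the real splitter (omitted here).
  return [{ q: rawText, a: '' }];
};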

View File

@@ -5,8 +5,6 @@ import { NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
 import { type DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
 import axios from 'axios';
 import { serverRequestBaseUrl } from '../../../../common/api/serverRequest';
-import { MongoRawTextBuffer } from '../../../../common/buffer/rawText/schema';
-import { readFromSecondary } from '../../../../common/mongo/utils';
 import { getErrText } from '@fastgpt/global/common/error/utils';
 import { detectFileEncoding, parseUrlToFileType } from '@fastgpt/global/common/file/tools';
 import { readRawContentByFileBuffer } from '../../../../common/file/read/utils';
@@ -14,6 +12,8 @@ import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
 import { type ChatItemType, type UserChatItemValueItemType } from '@fastgpt/global/core/chat/type';
 import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { addLog } from '../../../../common/system/log';
+import { addRawTextBuffer, getRawTextBuffer } from '../../../../common/buffer/rawText/controller';
+import { addMinutes } from 'date-fns';

 type Props = ModuleDispatchProps<{
   [NodeInputKeyEnum.fileUrlList]: string[];
@@ -158,14 +158,12 @@ export const getFileContentFromLinks = async ({
     parseUrlList
       .map(async (url) => {
         // Get from buffer
-        const fileBuffer = await MongoRawTextBuffer.findOne({ sourceId: url }, undefined, {
-          ...readFromSecondary
-        }).lean();
+        const fileBuffer = await getRawTextBuffer(url);
         if (fileBuffer) {
           return formatResponseObject({
-            filename: fileBuffer.metadata?.filename || url,
+            filename: fileBuffer.sourceName || url,
             url,
-            content: fileBuffer.rawText
+            content: fileBuffer.text
           });
         }
@@ -220,17 +218,12 @@ export const getFileContentFromLinks = async ({
         });

         // Add to buffer
-        try {
-          if (buffer.length < 14 * 1024 * 1024 && rawText.trim()) {
-            MongoRawTextBuffer.create({
-              sourceId: url,
-              rawText,
-              metadata: {
-                filename: filename
-              }
-            });
-          }
-        } catch (error) {}
+        addRawTextBuffer({
+          sourceId: url,
+          sourceName: filename,
+          text: rawText,
+          expiredTime: addMinutes(new Date(), 20)
+        });

         return formatResponseObject({ filename, url, content: rawText });
       } catch (error) {

View File

@@ -138,18 +138,20 @@ async function handler(req: ApiRequestProps<ListAppBody>): Promise<AppListItemTy
   })();
   const limit = (() => {
     if (getRecentlyChat) return 15;
-    if (searchKey) return 20;
-    return 1000;
+    if (searchKey) return 50;
+    return;
   })();

   const myApps = await MongoApp.find(
     findAppsQuery,
-    '_id parentId avatar type name intro tmbId updateTime pluginData inheritPermission'
+    '_id parentId avatar type name intro tmbId updateTime pluginData inheritPermission',
+    {
+      limit: limit
+    }
   )
     .sort({
       updateTime: -1
     })
-    .limit(limit)
     .lean();

   // Add app permission and filter apps by read permission

View File

@@ -4,11 +4,11 @@ import { type FileIdCreateDatasetCollectionParams } from '@fastgpt/global/core/d
 import { createCollectionAndInsertData } from '@fastgpt/service/core/dataset/collection/controller';
 import { DatasetCollectionTypeEnum } from '@fastgpt/global/core/dataset/constants';
 import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
-import { MongoRawTextBuffer } from '@fastgpt/service/common/buffer/rawText/schema';
 import { NextAPI } from '@/service/middleware/entry';
 import { type ApiRequestProps } from '@fastgpt/service/type/next';
 import { WritePermissionVal } from '@fastgpt/global/support/permission/constant';
 import { type CreateCollectionResponse } from '@/global/core/dataset/api';
+import { deleteRawTextBuffer } from '@fastgpt/service/common/buffer/rawText/controller';

 async function handler(
   req: ApiRequestProps<FileIdCreateDatasetCollectionParams>
@@ -52,7 +52,7 @@ async function handler(
   });

   // remove buffer
-  await MongoRawTextBuffer.deleteOne({ sourceId: fileId });
+  await deleteRawTextBuffer(fileId);

   return {
     collectionId,