diff --git a/docSite/content/zh-cn/docs/development/upgrading/4911.md b/docSite/content/zh-cn/docs/development/upgrading/4911.md
index ae2098bfc..179e40761 100644
--- a/docSite/content/zh-cn/docs/development/upgrading/4911.md
+++ b/docSite/content/zh-cn/docs/development/upgrading/4911.md
@@ -14,8 +14,10 @@ weight: 789
 
 ## ⚙️ Optimizations
 
-
+1. The raw-text cache now uses GridFS storage, raising the size limit.
 
 ## 🐛 Fixes
 
-1. In workflows, global system tools declared by an administrator could not be version-managed.
\ No newline at end of file
+1. In workflows, global system tools declared by an administrator could not be version-managed.
+2. Abnormal context when an interactive node precedes a tool-call node.
+3. Fixed backup imports failing to be split into chunks when the text is under 1,000 characters.
\ No newline at end of file
diff --git a/packages/service/common/buffer/rawText/controller.ts b/packages/service/common/buffer/rawText/controller.ts
new file mode 100644
index 000000000..69289c06b
--- /dev/null
+++ b/packages/service/common/buffer/rawText/controller.ts
@@ -0,0 +1,139 @@
+import { retryFn } from '@fastgpt/global/common/system/utils';
+import { connectionMongo } from '../../mongo';
+import { MongoRawTextBufferSchema, bucketName } from './schema';
+import { addLog } from '../../system/log';
+
+const getGridBucket = () => {
+  return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
+    bucketName: bucketName
+  });
+};
+
+export const addRawTextBuffer = async ({
+  sourceId,
+  sourceName,
+  text,
+  expiredTime
+}: {
+  sourceId: string;
+  sourceName: string;
+  text: string;
+  expiredTime: Date;
+}) => {
+  const gridBucket = getGridBucket();
+  const metadata = {
+    sourceId,
+    sourceName,
+    expiredTime
+  };
+
+  const buffer = Buffer.from(text);
+
+  const fileSize = buffer.length;
+  // Chunk size: as large as possible, but no more than 14MB and no less than 128KB
+  const chunkSizeBytes = (() => {
+    // Ideal chunk size: file size ÷ target chunk count (10); each chunk must stay under 14MB
+    const idealChunkSize = Math.min(Math.ceil(fileSize / 10), 14 * 1024 * 1024);
+
+    // Ensure the chunk size is at least 128KB
+    const minChunkSize = 128 * 1024; // 128KB
+
+    // Take the larger of the ideal and minimum chunk sizes
+    let chunkSize = Math.max(idealChunkSize, minChunkSize);
+
+    // Round up to the nearest multiple of 64KB to keep the size tidy
+    chunkSize = Math.ceil(chunkSize / (64 * 1024)) * (64 * 1024);
+
+    return chunkSize;
+  })();
+
+  const uploadStream = gridBucket.openUploadStream(sourceId, {
+    metadata,
+    chunkSizeBytes
+  });
+
+  return retryFn(async () => {
+    return new Promise((resolve, reject) => {
+      uploadStream.end(buffer);
+      uploadStream.on('finish', () => {
+        resolve(uploadStream.id);
+      });
+      uploadStream.on('error', (error) => {
+        addLog.error('addRawTextBuffer error', error);
+        resolve('');
+      });
+    });
+  });
+};
+
+export const getRawTextBuffer = async (sourceId: string) => {
+  const gridBucket = getGridBucket();
+
+  return retryFn(async () => {
+    const bufferData = await MongoRawTextBufferSchema.findOne(
+      {
+        'metadata.sourceId': sourceId
+      },
+      '_id metadata'
+    ).lean();
+    if (!bufferData) {
+      return null;
+    }
+
+    // Read file content
+    const downloadStream = gridBucket.openDownloadStream(bufferData._id);
+    const chunks: Buffer[] = [];
+
+    return new Promise<{
+      text: string;
+      sourceName: string;
+    } | null>((resolve, reject) => {
+      downloadStream.on('data', (chunk) => {
+        chunks.push(chunk);
+      });
+
+      downloadStream.on('end', () => {
+        const buffer = Buffer.concat(chunks);
+        const text = buffer.toString('utf8');
+        resolve({
+          text,
+          sourceName: bufferData.metadata?.sourceName || ''
+        });
+      });
+
+      downloadStream.on('error', (error) => {
+        addLog.error('getRawTextBuffer error', error);
+        resolve(null);
+      });
+    });
+  });
+};
+
+export const deleteRawTextBuffer = async (sourceId: string): Promise<boolean> => {
+  const gridBucket = getGridBucket();
+
+  return retryFn(async () => {
+    const buffer = await MongoRawTextBufferSchema.findOne({ 'metadata.sourceId': sourceId });
+    if (!buffer) {
+      return false;
+    }
+
+    await gridBucket.delete(buffer._id);
+    return true;
+  });
+};
+
+export const updateRawTextBufferExpiredTime = async ({
+  sourceId,
+  expiredTime
+}: {
+  sourceId: string;
+  expiredTime: Date;
+}) => {
+  return retryFn(async () => {
+    return MongoRawTextBufferSchema.updateOne(
+      { 'metadata.sourceId': sourceId },
+      { $set: { 'metadata.expiredTime': expiredTime } }
+    );
+  });
+};
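Note (reviewer sketch, not part of the patch): the controller above is the whole GridFS-backed buffer API. A minimal usage sketch follows, assuming the `@fastgpt/service` import path used elsewhere in this diff; `cacheParsedFile` and its arguments are illustrative only.

```ts
import { addMinutes } from 'date-fns';
import {
  addRawTextBuffer,
  getRawTextBuffer
} from '@fastgpt/service/common/buffer/rawText/controller';

// Cache the parsed raw text of a source for 20 minutes, then prefer the cached
// copy on subsequent reads (illustrative helper, not part of the patch).
const cacheParsedFile = async (sourceId: string, filename: string, rawText: string) => {
  await addRawTextBuffer({
    sourceId,
    sourceName: filename,
    text: rawText,
    expiredTime: addMinutes(new Date(), 20)
  });

  const hit = await getRawTextBuffer(sourceId);
  return hit ? hit.text : rawText;
};
```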
diff --git a/packages/service/common/buffer/rawText/schema.ts b/packages/service/common/buffer/rawText/schema.ts
index 74c4b100c..f6e9ea580 100644
--- a/packages/service/common/buffer/rawText/schema.ts
+++ b/packages/service/common/buffer/rawText/schema.ts
@@ -1,33 +1,22 @@
-import { getMongoModel, Schema } from '../../mongo';
-import { type RawTextBufferSchemaType } from './type';
+import { getMongoModel, type Types, Schema } from '../../mongo';
 
-export const collectionName = 'buffer_rawtexts';
+export const bucketName = 'buffer_rawtext';
 
 const RawTextBufferSchema = new Schema({
-  sourceId: {
-    type: String,
-    required: true
-  },
-  rawText: {
-    type: String,
-    default: ''
-  },
-  createTime: {
-    type: Date,
-    default: () => new Date()
-  },
-  metadata: Object
+  metadata: {
+    sourceId: { type: String, required: true },
+    sourceName: { type: String, required: true },
+    expiredTime: { type: Date, required: true }
+  }
 });
+RawTextBufferSchema.index({ 'metadata.sourceId': 'hashed' });
+RawTextBufferSchema.index({ 'metadata.expiredTime': -1 });
 
-try {
-  RawTextBufferSchema.index({ sourceId: 1 });
-  // 20 minutes
-  RawTextBufferSchema.index({ createTime: 1 }, { expireAfterSeconds: 20 * 60 });
-} catch (error) {
-  console.log(error);
-}
-
-export const MongoRawTextBuffer = getMongoModel<RawTextBufferSchemaType>(
-  collectionName,
-  RawTextBufferSchema
-);
+export const MongoRawTextBufferSchema = getMongoModel<{
+  _id: Types.ObjectId;
+  metadata: {
+    sourceId: string;
+    sourceName: string;
+    expiredTime: Date;
+  };
+}>(`${bucketName}.files`, RawTextBufferSchema);
diff --git a/packages/service/common/buffer/rawText/type.d.ts b/packages/service/common/buffer/rawText/type.d.ts
deleted file mode 100644
index 43a793adc..000000000
--- a/packages/service/common/buffer/rawText/type.d.ts
+++ /dev/null
@@ -1,8 +0,0 @@
-export type RawTextBufferSchemaType = {
-  sourceId: string;
-  rawText: string;
-  createTime: Date;
-  metadata?: {
-    filename: string;
-  };
-};
diff --git a/packages/service/common/file/gridfs/controller.ts b/packages/service/common/file/gridfs/controller.ts
index c1d183e68..c85e8474f 100644
--- a/packages/service/common/file/gridfs/controller.ts
+++ b/packages/service/common/file/gridfs/controller.ts
@@ -6,13 +6,13 @@ import { type DatasetFileSchema } from '@fastgpt/global/core/dataset/type';
 import { MongoChatFileSchema, MongoDatasetFileSchema } from './schema';
 import { detectFileEncoding, detectFileEncodingByPath } from '@fastgpt/global/common/file/tools';
 import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
-import { MongoRawTextBuffer } from '../../buffer/rawText/schema';
 import { readRawContentByFileBuffer } from '../read/utils';
 import { gridFsStream2Buffer, stream2Encoding } from './utils';
 import { addLog } from '../../system/log';
-import { readFromSecondary } from '../../mongo/utils';
 import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { Readable } from 'stream';
+import { addRawTextBuffer, getRawTextBuffer } from '../../buffer/rawText/controller';
+import { addMinutes } from 'date-fns';
 
 export function getGFSCollection(bucket: `${BucketNameEnum}`) {
   MongoDatasetFileSchema;
@@ -225,13 +225,11 @@ export const readFileContentFromMongo = async ({
 }> => {
   const bufferId = `${fileId}-${customPdfParse}`;
   // read buffer
-  const fileBuffer = await MongoRawTextBuffer.findOne({ sourceId: bufferId }, undefined, {
-    ...readFromSecondary
-  }).lean();
+  const fileBuffer = await getRawTextBuffer(bufferId);
 
   if (fileBuffer) {
     return {
-      rawText: fileBuffer.rawText,
-      filename: fileBuffer.metadata?.filename || ''
+      rawText: fileBuffer.text,
+      filename: fileBuffer?.sourceName
     };
   }
 
@@ -265,16 +263,13 @@ export const readFileContentFromMongo = async ({
     }
   });
 
-  // < 14M
-  if (fileBuffers.length < 14 * 1024 * 1024 && rawText.trim()) {
-    MongoRawTextBuffer.create({
-      sourceId: bufferId,
-      rawText,
-      metadata: {
-        filename: file.filename
-      }
-    });
-  }
+  // Add buffer
+  addRawTextBuffer({
+    sourceId: bufferId,
+    sourceName: file.filename,
+    text: rawText,
+    expiredTime: addMinutes(new Date(), 20)
+  });
 
   return {
     rawText,
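Note (reviewer sketch, not part of the patch): the schema above indexes `metadata.expiredTime`, but the cleanup job itself is not shown in this diff. A hypothetical cleaner built on the same primitives might look like the following; `clearExpiredRawTextBuffers` is an assumed name, and the relative import paths assume the file sits next to `controller.ts`.

```ts
import { connectionMongo } from '../../mongo';
import { MongoRawTextBufferSchema, bucketName } from './schema';

// Delete raw-text buffers whose expiredTime has passed. Relies on the
// 'metadata.expiredTime' index declared in schema.ts above.
export const clearExpiredRawTextBuffers = async () => {
  const gridBucket = new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
    bucketName
  });

  const expiredFiles = await MongoRawTextBufferSchema.find(
    { 'metadata.expiredTime': { $lt: new Date() } },
    '_id'
  ).lean();

  // GridFSBucket.delete removes both the .files document and its chunks.
  for (const file of expiredFiles) {
    await gridBucket.delete(file._id);
  }
};
```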
diff --git a/packages/service/common/file/gridfs/schema.ts b/packages/service/common/file/gridfs/schema.ts
index d6670a8a9..61b98ea96 100644
--- a/packages/service/common/file/gridfs/schema.ts
+++ b/packages/service/common/file/gridfs/schema.ts
@@ -1,16 +1,16 @@
 import { Schema, getMongoModel } from '../../mongo';
 
-const DatasetFileSchema = new Schema({});
-const ChatFileSchema = new Schema({});
+const DatasetFileSchema = new Schema({
+  metadata: Object
+});
+const ChatFileSchema = new Schema({
+  metadata: Object
+});
 
-try {
-  DatasetFileSchema.index({ uploadDate: -1 });
+DatasetFileSchema.index({ uploadDate: -1 });
 
-  ChatFileSchema.index({ uploadDate: -1 });
-  ChatFileSchema.index({ 'metadata.chatId': 1 });
-} catch (error) {
-  console.log(error);
-}
+ChatFileSchema.index({ uploadDate: -1 });
+ChatFileSchema.index({ 'metadata.chatId': 1 });
 
 export const MongoDatasetFileSchema = getMongoModel('dataset.files', DatasetFileSchema);
 export const MongoChatFileSchema = getMongoModel('chat.files', ChatFileSchema);
diff --git a/packages/service/core/dataset/collection/controller.ts b/packages/service/core/dataset/collection/controller.ts
index 6ab2de71f..c734e8896 100644
--- a/packages/service/core/dataset/collection/controller.ts
+++ b/packages/service/core/dataset/collection/controller.ts
@@ -77,7 +77,10 @@ export const createCollectionAndInsertData = async ({
   const chunkSplitter = computeChunkSplitter(createCollectionParams);
   const paragraphChunkDeep = computeParagraphChunkDeep(createCollectionParams);
 
-  if (trainingType === DatasetCollectionDataProcessModeEnum.qa) {
+  if (
+    trainingType === DatasetCollectionDataProcessModeEnum.qa ||
+    trainingType === DatasetCollectionDataProcessModeEnum.backup
+  ) {
     delete createCollectionParams.chunkTriggerType;
     delete createCollectionParams.chunkTriggerMinSize;
     delete createCollectionParams.dataEnhanceCollectionName;
diff --git a/packages/service/core/dataset/read.ts b/packages/service/core/dataset/read.ts
index 4e530d735..647c05758 100644
--- a/packages/service/core/dataset/read.ts
+++ b/packages/service/core/dataset/read.ts
@@ -218,6 +218,10 @@ export const rawText2Chunks = ({
     };
   };
 
+  if (backupParse) {
+    return parseDatasetBackup2Chunks(rawText).chunks;
+  }
+
   // Chunk condition
   // 1. Max-size trigger: chunking only happens once the text exceeds the maximum (default: the model's maximum * 0.7)
   if (chunkTriggerType === ChunkTriggerConfigTypeEnum.maxSize) {
@@ -240,10 +244,6 @@ export const rawText2Chunks = ({
     }
   }
 
-  if (backupParse) {
-    return parseDatasetBackup2Chunks(rawText).chunks;
-  }
-
   const { chunks } = splitText2Chunks({
     text: rawText,
     chunkSize,
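Note (reviewer sketch, not part of the patch): moving the `backupParse` branch above the chunk-trigger checks is what fixes backup imports under 1,000 characters; previously a small backup file returned early as a single unsplit chunk and never reached `parseDatasetBackup2Chunks`. A self-contained sketch of the fixed control flow, with `parseBackup`/`splitText` standing in for the real helpers:

```ts
type Chunk = { q: string; a: string };

// Simplified shape of rawText2Chunks after the patch: backup imports bypass the
// size trigger entirely, so even very short backup files are split into rows.
const rawText2ChunksSketch = (
  rawText: string,
  opts: {
    backupParse?: boolean;
    chunkTriggerMinSize: number;
    parseBackup: (text: string) => Chunk[];
    splitText: (text: string) => Chunk[];
  }
): Chunk[] => {
  // 1. Backup imports are parsed row-by-row regardless of text length (the fix).
  if (opts.backupParse) {
    return opts.parseBackup(rawText);
  }

  // 2. Plain text below the trigger size is still returned as a single chunk.
  if (rawText.length < opts.chunkTriggerMinSize) {
    return [{ q: rawText, a: '' }];
  }

  // 3. Everything else goes through the normal splitter.
  return opts.splitText(rawText);
};
```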
diff --git a/packages/service/core/workflow/dispatch/tools/readFiles.ts b/packages/service/core/workflow/dispatch/tools/readFiles.ts
index f8a80efa7..a79405040 100644
--- a/packages/service/core/workflow/dispatch/tools/readFiles.ts
+++ b/packages/service/core/workflow/dispatch/tools/readFiles.ts
@@ -5,8 +5,6 @@ import { NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
 import { type DispatchNodeResultType } from '@fastgpt/global/core/workflow/runtime/type';
 import axios from 'axios';
 import { serverRequestBaseUrl } from '../../../../common/api/serverRequest';
-import { MongoRawTextBuffer } from '../../../../common/buffer/rawText/schema';
-import { readFromSecondary } from '../../../../common/mongo/utils';
 import { getErrText } from '@fastgpt/global/common/error/utils';
 import { detectFileEncoding, parseUrlToFileType } from '@fastgpt/global/common/file/tools';
 import { readRawContentByFileBuffer } from '../../../../common/file/read/utils';
@@ -14,6 +12,8 @@ import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
 import { type ChatItemType, type UserChatItemValueItemType } from '@fastgpt/global/core/chat/type';
 import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
 import { addLog } from '../../../../common/system/log';
+import { addRawTextBuffer, getRawTextBuffer } from '../../../../common/buffer/rawText/controller';
+import { addMinutes } from 'date-fns';
 
 type Props = ModuleDispatchProps<{
   [NodeInputKeyEnum.fileUrlList]: string[];
@@ -158,14 +158,12 @@ export const getFileContentFromLinks = async ({
     parseUrlList
       .map(async (url) => {
         // Get from buffer
-        const fileBuffer = await MongoRawTextBuffer.findOne({ sourceId: url }, undefined, {
-          ...readFromSecondary
-        }).lean();
+        const fileBuffer = await getRawTextBuffer(url);
 
         if (fileBuffer) {
           return formatResponseObject({
-            filename: fileBuffer.metadata?.filename || url,
+            filename: fileBuffer.sourceName || url,
             url,
-            content: fileBuffer.rawText
+            content: fileBuffer.text
           });
         }
 
@@ -220,17 +218,12 @@ export const getFileContentFromLinks = async ({
         });
 
         // Add to buffer
-        try {
-          if (buffer.length < 14 * 1024 * 1024 && rawText.trim()) {
-            MongoRawTextBuffer.create({
-              sourceId: url,
-              rawText,
-              metadata: {
-                filename: filename
-              }
-            });
-          }
-        } catch (error) {}
+        addRawTextBuffer({
+          sourceId: url,
+          sourceName: filename,
+          text: rawText,
+          expiredTime: addMinutes(new Date(), 20)
+        });
 
         return formatResponseObject({ filename, url, content: rawText });
       } catch (error) {
diff --git a/projects/app/src/pages/api/core/app/list.ts b/projects/app/src/pages/api/core/app/list.ts
index 99e31d9d1..8b1ca35b2 100644
--- a/projects/app/src/pages/api/core/app/list.ts
+++ b/projects/app/src/pages/api/core/app/list.ts
@@ -138,18 +138,20 @@ async function handler(req: ApiRequestProps): Promise {
     if (getRecentlyChat) return 15;
-    if (searchKey) return 20;
-    return 1000;
+    if (searchKey) return 50;
+    return;
   })();
 
   const myApps = await MongoApp.find(
     findAppsQuery,
-    '_id parentId avatar type name intro tmbId updateTime pluginData inheritPermission'
+    '_id parentId avatar type name intro tmbId updateTime pluginData inheritPermission',
+    {
+      limit: limit
+    }
   )
     .sort({ updateTime: -1 })
-    .limit(limit)
     .lean();
 
   // Add app permission and filter apps by read permission
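Note (reviewer sketch, not part of the patch): in list.ts the limit now flows through the query options instead of a chained `.limit()`, and an undefined limit means no cap (the old default of 1000 is gone, and the search cap rises from 20 to 50). A condensed, generic sketch of that pattern; `listApps` and its parameters are illustrative.

```ts
import type { FilterQuery, Model } from 'mongoose';

// Compute an optional limit per scenario and pass it via the query options.
// When limit is undefined, MongoDB applies no limit at all.
const listApps = async <T>(
  model: Model<T>,
  query: FilterQuery<T>,
  opts: { searchKey?: string; getRecentlyChat?: boolean }
) => {
  const limit = (() => {
    if (opts.getRecentlyChat) return 15;
    if (opts.searchKey) return 50;
    return undefined; // no cap for the plain listing
  })();

  return model
    .find(query, '_id parentId avatar type name intro tmbId updateTime pluginData inheritPermission', {
      limit
    })
    .sort({ updateTime: -1 })
    .lean();
};
```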
diff --git a/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts b/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts
index a6bfae9f4..20ecb1515 100644
--- a/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts
+++ b/projects/app/src/pages/api/core/dataset/collection/create/fileId.ts
@@ -4,11 +4,11 @@ import { type FileIdCreateDatasetCollectionParams } from '@fastgpt/global/core/d
 import { createCollectionAndInsertData } from '@fastgpt/service/core/dataset/collection/controller';
 import { DatasetCollectionTypeEnum } from '@fastgpt/global/core/dataset/constants';
 import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
-import { MongoRawTextBuffer } from '@fastgpt/service/common/buffer/rawText/schema';
 import { NextAPI } from '@/service/middleware/entry';
 import { type ApiRequestProps } from '@fastgpt/service/type/next';
 import { WritePermissionVal } from '@fastgpt/global/support/permission/constant';
 import { type CreateCollectionResponse } from '@/global/core/dataset/api';
+import { deleteRawTextBuffer } from '@fastgpt/service/common/buffer/rawText/controller';
 
 async function handler(
   req: ApiRequestProps
@@ -52,7 +52,7 @@ async function handler(
   });
 
   // remove buffer
-  await MongoRawTextBuffer.deleteOne({ sourceId: fileId });
+  await deleteRawTextBuffer(fileId);
 
   return {
     collectionId,
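Note (reviewer sketch, not part of the patch): after the collection is created, fileId.ts drops the cached raw text with `deleteRawTextBuffer`. The controller also exports `updateRawTextBufferExpiredTime`, which this diff does not exercise; a hypothetical caller could use it to extend the cache instead of deleting it. The `keepCache` flag below is illustrative only.

```ts
import { addMinutes } from 'date-fns';
import {
  deleteRawTextBuffer,
  updateRawTextBufferExpiredTime
} from '@fastgpt/service/common/buffer/rawText/controller';

// Illustrative post-import cleanup: either drop the cached raw text (what the
// fileId.ts handler does) or push its expiry out by another 20 minutes.
const finishFileImport = async (fileId: string, keepCache: boolean) => {
  if (keepCache) {
    await updateRawTextBufferExpiredTime({
      sourceId: fileId,
      expiredTime: addMinutes(new Date(), 20)
    });
  } else {
    await deleteRawTextBuffer(fileId);
  }
};
```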