V4.9.11 feature (#4969)

* Feat: Images dataset collection (#4941)

* New pic (#4858)

* Update dataset-related types to add image file ID and preview URL support; improve dataset import with a new image dataset processing component; fix some i18n strings; update the file upload logic to support the new features.

* Differences from the original code

* Add V4.9.10 release notes: support the PG `systemEnv.hnswMaxScanTuples` setting, improve LLM stream call timeout handling, and fix full-text search result ordering across multiple datasets. Also update dataset indexes, removing the datasetId field to simplify queries.

* Switch to the fileId_image logic and add matching logic for the training queue

* Add image-collection detection, streamline preview URL generation so preview URLs are only generated when the collection is an image collection, and add debug logging

* Refactor Docker Compose configuration to comment out exposed ports for production environments, update image versions for pgvector, fastgpt, and mcp_server, and enhance Redis service with a health check. Additionally, standardize dataset collection labels in constants and improve internationalization strings across multiple languages.

* Enhance TrainingStates component by adding internationalization support for the imageParse training mode and update defaultCounts to include imageParse mode in trainingDetail API.

* Enhance dataset import context by adding additional steps for image dataset import process and improve internationalization strings for modal buttons in the useEditTitle hook.

* Update DatasetImportContext to conditionally render MyStep component based on data source type, improving the import process for non-image datasets.

* Refactor image dataset handling by improving internationalization strings, enhancing error messages, and streamlining the preview URL generation process.

* Upload images to the new dataset_collection_images table and adjust the related logic

* Fix issues in all parts except the controller

* Integrate the image dataset logic into the controller

* Add missing i18n strings

* Add missing i18n strings

* Resolve review comments: mainly upload logic changes and component reuse

* Show an icon next to image file names

* Fix naming issues that caused build errors

* Remove the unneeded collectionId parts

* Clean up redundant files and adjust a delete button

* Resolve all remaining comments except the loading state and the unified imageId

* Fix icon errors

* Reuse MyPhotoView and rename imageFileId to imageId via a global replace

* Remove unnecessary file changes

* Fix errors and field names

* Delete temporary files after a successful upload and revert some changes

* Remove the path field, store images in GridFS, and update the create/delete code accordingly

* Fix build errors

---------

Co-authored-by: archer <545436317@qq.com>

* perf: image dataset

* feat: insert image

* perf: image icon

* fix: training state

---------

Co-authored-by: Zhuangzai fa <143257420+ctrlz526@users.noreply.github.com>

* fix: ts (#4948)

* Thirddatasetmd (#4942)

* add thirddataset.md

* fix thirddataset.md

* fix

* delete wrong png

---------

Co-authored-by: dreamer6680 <146868355@qq.com>

* perf: api dataset code

* perf: log

* add secondary.tsx (#4946)

* add secondary.tsx

* fix

---------

Co-authored-by: dreamer6680 <146868355@qq.com>

* perf: multiple menu

* perf: i18n

* feat: parse queue (#4960)

* feat: parse queue

* feat: sync parse queue

* fix thirddataset.md (#4962)

* fix thirddataset-4.png (#4963)

* feat: Dataset template import (#4934)

* Template import implemented; documentation not yet written

* Fix build errors in template import

* Write documentation

* compress pictures

* Change some constants to variables

---------

Co-authored-by: Archer <545436317@qq.com>

* perf: template import

* doc

* llm paragraph

* bocha tool

* fix: del collection

---------

Co-authored-by: Zhuangzai fa <143257420+ctrlz526@users.noreply.github.com>
Co-authored-by: dreamer6680 <1468683855@qq.com>
Co-authored-by: dreamer6680 <146868355@qq.com>
Author: Archer
Date: 2025-06-06 14:48:44 +08:00
Committed by: GitHub
Parent: bb810a43a1
Commit: c30f069f2f
198 changed files with 4934 additions and 2290 deletions

View File

@@ -3,14 +3,15 @@ import type {
ApiFileReadContentResponse,
APIFileReadResponse,
ApiDatasetDetailResponse,
APIFileServer,
APIFileItem
} from '@fastgpt/global/core/dataset/apiDataset';
APIFileServer
} from '@fastgpt/global/core/dataset/apiDataset/type';
import axios, { type Method } from 'axios';
import { addLog } from '../../../common/system/log';
import { readFileRawTextByUrl } from '../read';
import { addLog } from '../../../../common/system/log';
import { readFileRawTextByUrl } from '../../read';
import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type';
import { type RequireOnlyOne } from '@fastgpt/global/common/type/utils';
import { addRawTextBuffer, getRawTextBuffer } from '../../../../common/buffer/rawText/controller';
import { addMinutes } from 'date-fns';
type ResponseDataType = {
success: boolean;
@@ -141,6 +142,15 @@ export const useApiDatasetRequest = ({ apiServer }: { apiServer: APIFileServer }
};
}
if (previewUrl) {
// Get from buffer
const buffer = await getRawTextBuffer(previewUrl);
if (buffer) {
return {
title,
rawText: buffer.text
};
}
const rawText = await readFileRawTextByUrl({
teamId,
tmbId,
@@ -149,6 +159,14 @@ export const useApiDatasetRequest = ({ apiServer }: { apiServer: APIFileServer }
customPdfParse,
getFormatText: true
});
await addRawTextBuffer({
sourceId: previewUrl,
sourceName: title || '',
text: rawText,
expiredTime: addMinutes(new Date(), 30)
});
return {
title,
rawText
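
The hunk above adds a 30-minute read-through cache keyed by previewUrl: the raw text buffer is checked first, and a miss falls through to readFileRawTextByUrl followed by addRawTextBuffer. A minimal sketch of the same flow, using the helper names from this diff (the url parameter name is assumed, since the call is truncated here):

import { addMinutes } from 'date-fns';
import { addRawTextBuffer, getRawTextBuffer } from '../../../../common/buffer/rawText/controller';
import { readFileRawTextByUrl } from '../../read';

async function readPreviewWithBuffer(params: {
  teamId: string;
  tmbId: string;
  previewUrl: string;
  title?: string;
  customPdfParse?: boolean;
}) {
  // 1. Repeated previews hit the buffer and skip the download/parse step
  const cached = await getRawTextBuffer(params.previewUrl);
  if (cached) return { title: params.title, rawText: cached.text };

  // 2. Cache miss: download + parse, then store with a 30 minute TTL
  const rawText = await readFileRawTextByUrl({
    teamId: params.teamId,
    tmbId: params.tmbId,
    url: params.previewUrl, // parameter name assumed; the hunk truncates this call
    customPdfParse: params.customPdfParse,
    getFormatText: true
  });
  await addRawTextBuffer({
    sourceId: params.previewUrl,
    sourceName: params.title || '',
    text: rawText,
    expiredTime: addMinutes(new Date(), 30)
  });
  return { title: params.title, rawText };
}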

View File

@@ -3,10 +3,10 @@ import type {
ApiFileReadContentResponse,
ApiDatasetDetailResponse,
FeishuServer
} from '@fastgpt/global/core/dataset/apiDataset';
} from '@fastgpt/global/core/dataset/apiDataset/type';
import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type';
import axios, { type Method } from 'axios';
import { addLog } from '../../../common/system/log';
import { addLog } from '../../../../common/system/log';
type ResponseDataType = {
success: boolean;

View File

@@ -1,18 +1,10 @@
import type {
APIFileServer,
YuqueServer,
FeishuServer
} from '@fastgpt/global/core/dataset/apiDataset';
import { useApiDatasetRequest } from './api';
import { useYuqueDatasetRequest } from '../yuqueDataset/api';
import { useFeishuDatasetRequest } from '../feishuDataset/api';
import { useApiDatasetRequest } from './custom/api';
import { useYuqueDatasetRequest } from './yuqueDataset/api';
import { useFeishuDatasetRequest } from './feishuDataset/api';
import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
export const getApiDatasetRequest = async (data: {
apiServer?: APIFileServer;
yuqueServer?: YuqueServer;
feishuServer?: FeishuServer;
}) => {
const { apiServer, yuqueServer, feishuServer } = data;
export const getApiDatasetRequest = async (apiDatasetServer?: ApiDatasetServerType) => {
const { apiServer, yuqueServer, feishuServer } = apiDatasetServer || {};
if (apiServer) {
return useApiDatasetRequest({ apiServer });
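
The refactor above folds apiServer, yuqueServer and feishuServer into a single optional apiDatasetServer object. A hedged call-site sketch (the import path is assumed; getFileContent is the only method exercised in this diff):

import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
import { getApiDatasetRequest } from './apiDataset'; // path assumed for the sketch

async function readApiFile(
  dataset: { apiDatasetServer?: ApiDatasetServerType },
  apiFileId: string,
  teamId: string,
  tmbId: string
) {
  // Before: getApiDatasetRequest({ apiServer, yuqueServer, feishuServer })
  // After: pass the whole container object, typically stored on the dataset document
  const api = await getApiDatasetRequest(dataset.apiDatasetServer);
  return api.getFileContent({ teamId, tmbId, apiFileId });
}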

View File

@@ -3,9 +3,9 @@ import type {
ApiFileReadContentResponse,
YuqueServer,
ApiDatasetDetailResponse
} from '@fastgpt/global/core/dataset/apiDataset';
} from '@fastgpt/global/core/dataset/apiDataset/type';
import axios, { type Method } from 'axios';
import { addLog } from '../../../common/system/log';
import { addLog } from '../../../../common/system/log';
import { type ParentIdType } from '@fastgpt/global/common/parentFolder/type';
type ResponseDataType = {

View File

@@ -5,9 +5,9 @@ import {
} from '@fastgpt/global/core/dataset/constants';
import type { CreateDatasetCollectionParams } from '@fastgpt/global/core/dataset/api.d';
import { MongoDatasetCollection } from './schema';
import {
type DatasetCollectionSchemaType,
type DatasetSchemaType
import type {
DatasetCollectionSchemaType,
DatasetSchemaType
} from '@fastgpt/global/core/dataset/type';
import { MongoDatasetTraining } from '../training/schema';
import { MongoDatasetData } from '../data/schema';
@@ -15,7 +15,7 @@ import { delImgByRelatedId } from '../../../common/file/image/controller';
import { deleteDatasetDataVector } from '../../../common/vectorDB/controller';
import { delFileByFileIdList } from '../../../common/file/gridfs/controller';
import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
import { type ClientSession } from '../../../common/mongo';
import type { ClientSession } from '../../../common/mongo';
import { createOrGetCollectionTags } from './utils';
import { rawText2Chunks } from '../read';
import { checkDatasetLimit } from '../../../support/permission/teamLimit';
@@ -24,7 +24,7 @@ import { mongoSessionRun } from '../../../common/mongo/sessionRun';
import { createTrainingUsage } from '../../../support/wallet/usage/controller';
import { UsageSourceEnum } from '@fastgpt/global/support/wallet/usage/constants';
import { getLLMModel, getEmbeddingModel, getVlmModel } from '../../ai/model';
import { pushDataListToTrainingQueue } from '../training/controller';
import { pushDataListToTrainingQueue, pushDatasetToParseQueue } from '../training/controller';
import { MongoImage } from '../../../common/file/image/schema';
import { hashStr } from '@fastgpt/global/common/string/tools';
import { addDays } from 'date-fns';
@@ -35,23 +35,28 @@ import {
computeChunkSize,
computeChunkSplitter,
computeParagraphChunkDeep,
getAutoIndexSize,
getLLMMaxChunkSize
} from '@fastgpt/global/core/dataset/training/utils';
import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/data/constants';
import { clearCollectionImages, removeDatasetImageExpiredTime } from '../image/utils';
export const createCollectionAndInsertData = async ({
dataset,
rawText,
relatedId,
imageIds,
createCollectionParams,
backupParse = false,
billId,
session
}: {
dataset: DatasetSchemaType;
rawText: string;
rawText?: string;
relatedId?: string;
imageIds?: string[];
createCollectionParams: CreateOneCollectionParams;
backupParse?: boolean;
billId?: string;
@@ -69,13 +74,13 @@ export const createCollectionAndInsertData = async ({
// Set default params
const trainingType =
createCollectionParams.trainingType || DatasetCollectionDataProcessModeEnum.chunk;
const chunkSize = computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
});
const chunkSplitter = computeChunkSplitter(createCollectionParams);
const paragraphChunkDeep = computeParagraphChunkDeep(createCollectionParams);
const trainingMode = getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
});
if (
trainingType === DatasetCollectionDataProcessModeEnum.qa ||
@@ -90,44 +95,85 @@ export const createCollectionAndInsertData = async ({
delete createCollectionParams.qaPrompt;
}
// 1. split chunks
const chunks = rawText2Chunks({
rawText,
chunkTriggerType: createCollectionParams.chunkTriggerType,
chunkTriggerMinSize: createCollectionParams.chunkTriggerMinSize,
// 1. split chunks or create image chunks
const {
chunks,
chunkSize,
paragraphChunkDeep,
paragraphChunkMinSize: createCollectionParams.paragraphChunkMinSize,
maxSize: getLLMMaxChunkSize(getLLMModel(dataset.agentModel)),
overlapRatio: trainingType === DatasetCollectionDataProcessModeEnum.chunk ? 0.2 : 0,
customReg: chunkSplitter ? [chunkSplitter] : [],
backupParse
});
indexSize
}: {
chunks: Array<{
q?: string;
a?: string; // answer or custom content
imageId?: string;
indexes?: string[];
}>;
chunkSize?: number;
indexSize?: number;
} = (() => {
if (rawText) {
const chunkSize = computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
});
// Process text chunks
const chunks = rawText2Chunks({
rawText,
chunkTriggerType: createCollectionParams.chunkTriggerType,
chunkTriggerMinSize: createCollectionParams.chunkTriggerMinSize,
chunkSize,
paragraphChunkDeep,
paragraphChunkMinSize: createCollectionParams.paragraphChunkMinSize,
maxSize: getLLMMaxChunkSize(getLLMModel(dataset.agentModel)),
overlapRatio: trainingType === DatasetCollectionDataProcessModeEnum.chunk ? 0.2 : 0,
customReg: chunkSplitter ? [chunkSplitter] : [],
backupParse
});
return {
chunks,
chunkSize,
indexSize: createCollectionParams.indexSize ?? getAutoIndexSize(dataset.vectorModel)
};
}
if (imageIds) {
// Process image chunks
const chunks = imageIds.map((imageId: string) => ({
imageId,
indexes: []
}));
return { chunks };
}
return {
chunks: [],
chunkSize: computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
}),
indexSize: createCollectionParams.indexSize ?? getAutoIndexSize(dataset.vectorModel)
};
})();
// 2. auth limit
await checkDatasetLimit({
teamId,
insertLen: predictDataLimitLength(
getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
}),
chunks
)
insertLen: predictDataLimitLength(trainingMode, chunks)
});
const fn = async (session: ClientSession) => {
// 3. create collection
// 3. Create collection
const { _id: collectionId } = await createOneCollection({
...createCollectionParams,
trainingType,
paragraphChunkDeep,
chunkSize,
chunkSplitter,
indexSize,
hashRawText: hashStr(rawText),
rawTextLength: rawText.length,
hashRawText: rawText ? hashStr(rawText) : undefined,
rawTextLength: rawText?.length,
nextSyncTime: (() => {
// ignore auto collections sync for website datasets
if (!dataset.autoSync && dataset.type === DatasetTypeEnum.websiteDataset) return undefined;
@@ -160,34 +206,51 @@ export const createCollectionAndInsertData = async ({
})();
// 5. insert to training queue
const insertResults = await pushDataListToTrainingQueue({
teamId,
tmbId,
datasetId: dataset._id,
const insertResults = await (async () => {
if (rawText || imageIds) {
return pushDataListToTrainingQueue({
teamId,
tmbId,
datasetId: dataset._id,
collectionId,
agentModel: dataset.agentModel,
vectorModel: dataset.vectorModel,
vlmModel: dataset.vlmModel,
indexSize,
mode: trainingMode,
prompt: createCollectionParams.qaPrompt,
billId: traingBillId,
data: chunks.map((item, index) => ({
...item,
indexes: item.indexes?.map((text) => ({
type: DatasetDataIndexTypeEnum.custom,
text
})),
chunkIndex: index
})),
session
});
} else {
await pushDatasetToParseQueue({
teamId,
tmbId,
datasetId: dataset._id,
collectionId,
billId: traingBillId,
session
});
return {
insertLen: 0
};
}
})();
// 6. Remove images ttl index
await removeDatasetImageExpiredTime({
ids: imageIds,
collectionId,
agentModel: dataset.agentModel,
vectorModel: dataset.vectorModel,
vlmModel: dataset.vlmModel,
indexSize: createCollectionParams.indexSize,
mode: getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
}),
prompt: createCollectionParams.qaPrompt,
billId: traingBillId,
data: chunks.map((item, index) => ({
...item,
indexes: item.indexes?.map((text) => ({
type: DatasetDataIndexTypeEnum.custom,
text
})),
chunkIndex: index
})),
session
});
// 6. remove related image ttl
if (relatedId) {
await MongoImage.updateMany(
{
@@ -207,7 +270,7 @@ export const createCollectionAndInsertData = async ({
}
return {
collectionId,
collectionId: String(collectionId),
insertResults
};
};
@@ -244,9 +307,9 @@ export async function createOneCollection({ session, ...props }: CreateOneCollec
[
{
...props,
teamId,
_id: undefined,
parentId: parentId || null,
datasetId,
tags: collectionTags,
@@ -288,17 +351,20 @@ export const delCollectionRelatedSource = async ({
.map((item) => item?.metadata?.relatedImgId || '')
.filter(Boolean);
// Delete files
await delFileByFileIdList({
bucketName: BucketNameEnum.dataset,
fileIdList
});
// Delete images
await delImgByRelatedId({
teamId,
relateIds: relatedImageIds,
session
});
// Delete files and images in parallel
await Promise.all([
// Delete files
delFileByFileIdList({
bucketName: BucketNameEnum.dataset,
fileIdList
}),
// Delete images
delImgByRelatedId({
teamId,
relateIds: relatedImageIds,
session
})
]);
};
/**
* delete collection and it related data
@@ -343,6 +409,9 @@ export async function delCollection({
datasetId: { $in: datasetIds },
collectionId: { $in: collectionIds }
}),
// Delete dataset_images
clearCollectionImages(collectionIds),
// Delete images if needed
...(delImg
? [
delImgByRelatedId({
@@ -353,6 +422,7 @@ export async function delCollection({
})
]
: []),
// Delete files if needed
...(delFile
? [
delFileByFileIdList({
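
With this change createCollectionAndInsertData accepts either rawText or imageIds; for image collections it builds one chunk per imageId, pushes them to the training queue, and removes the images' TTL. A hedged call sketch (the collection type value and the remaining required CreateOneCollectionParams fields are omitted here):

import { createCollectionAndInsertData } from './controller'; // path assumed
import { DatasetCollectionDataProcessModeEnum } from '@fastgpt/global/core/dataset/constants';
import type { DatasetSchemaType } from '@fastgpt/global/core/dataset/type';

async function createImageCollection(props: {
  dataset: DatasetSchemaType;
  imageIds: string[]; // dataset_image GridFS ids returned by createDatasetImage
  teamId: string;
  tmbId: string;
  billId?: string;
}) {
  const { dataset, imageIds, teamId, tmbId, billId } = props;
  // No rawText: the controller creates one chunk per imageId and enqueues them
  return createCollectionAndInsertData({
    dataset,
    imageIds,
    billId,
    createCollectionParams: {
      teamId,
      tmbId,
      datasetId: String(dataset._id),
      name: 'Image collection',
      trainingType: DatasetCollectionDataProcessModeEnum.imageParse
      // type and the other required CreateOneCollectionParams fields are omitted in this sketch
    } as any
  });
}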

View File

@@ -1,11 +1,9 @@
import { MongoDatasetCollection } from './schema';
import { type ClientSession } from '../../../common/mongo';
import type { ClientSession } from '../../../common/mongo';
import { MongoDatasetCollectionTags } from '../tag/schema';
import { readFromSecondary } from '../../../common/mongo/utils';
import {
type CollectionWithDatasetType,
type DatasetCollectionSchemaType
} from '@fastgpt/global/core/dataset/type';
import type { CollectionWithDatasetType } from '@fastgpt/global/core/dataset/type';
import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type';
import {
DatasetCollectionDataProcessModeEnum,
DatasetCollectionSyncResultEnum,
@@ -159,9 +157,7 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
return {
type: DatasetSourceReadTypeEnum.apiFile,
sourceId,
apiServer: dataset.apiServer,
feishuServer: dataset.feishuServer,
yuqueServer: dataset.yuqueServer
apiDatasetServer: dataset.apiDatasetServer
};
})();
@@ -196,31 +192,8 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
dataset,
rawText: rawText,
createCollectionParams: {
teamId: collection.teamId,
tmbId: collection.tmbId,
...collection,
name: title || collection.name,
datasetId: collection.datasetId,
parentId: collection.parentId,
type: collection.type,
trainingType: collection.trainingType,
chunkSize: collection.chunkSize,
chunkSplitter: collection.chunkSplitter,
qaPrompt: collection.qaPrompt,
fileId: collection.fileId,
rawLink: collection.rawLink,
externalFileId: collection.externalFileId,
externalFileUrl: collection.externalFileUrl,
apiFileId: collection.apiFileId,
hashRawText,
rawTextLength: rawText.length,
metadata: collection.metadata,
tags: collection.tags,
createTime: collection.createTime,
updateTime: new Date()
}
});
@@ -233,18 +206,37 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
QA: 独立进程
Chunk: Image Index -> Auto index -> chunk index
*/
export const getTrainingModeByCollection = (collection: {
trainingType: DatasetCollectionSchemaType['trainingType'];
autoIndexes?: DatasetCollectionSchemaType['autoIndexes'];
imageIndex?: DatasetCollectionSchemaType['imageIndex'];
export const getTrainingModeByCollection = ({
trainingType,
autoIndexes,
imageIndex
}: {
trainingType: DatasetCollectionDataProcessModeEnum;
autoIndexes?: boolean;
imageIndex?: boolean;
}) => {
if (collection.trainingType === DatasetCollectionDataProcessModeEnum.qa) {
if (
trainingType === DatasetCollectionDataProcessModeEnum.imageParse &&
global.feConfigs?.isPlus
) {
return TrainingModeEnum.imageParse;
}
if (trainingType === DatasetCollectionDataProcessModeEnum.qa) {
return TrainingModeEnum.qa;
}
if (collection.imageIndex && global.feConfigs?.isPlus) {
if (
trainingType === DatasetCollectionDataProcessModeEnum.chunk &&
imageIndex &&
global.feConfigs?.isPlus
) {
return TrainingModeEnum.image;
}
if (collection.autoIndexes && global.feConfigs?.isPlus) {
if (
trainingType === DatasetCollectionDataProcessModeEnum.chunk &&
autoIndexes &&
global.feConfigs?.isPlus
) {
return TrainingModeEnum.auto;
}
return TrainingModeEnum.chunk;
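
The rewritten getTrainingModeByCollection takes plain fields instead of a collection object and adds the imageParse branch. The mapping implied by the branches above, as a small usage sketch:

import { getTrainingModeByCollection } from './utils'; // path assumed
import { DatasetCollectionDataProcessModeEnum } from '@fastgpt/global/core/dataset/constants';

// imageParse (isPlus)           -> TrainingModeEnum.imageParse
// qa                            -> TrainingModeEnum.qa
// chunk + imageIndex (isPlus)   -> TrainingModeEnum.image
// chunk + autoIndexes (isPlus)  -> TrainingModeEnum.auto
// anything else                 -> TrainingModeEnum.chunk
const mode = getTrainingModeByCollection({
  trainingType: DatasetCollectionDataProcessModeEnum.chunk,
  autoIndexes: true,
  imageIndex: false
});
// -> TrainingModeEnum.auto when global.feConfigs?.isPlus is truthy, otherwise chunk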

View File

@@ -9,6 +9,7 @@ import { deleteDatasetDataVector } from '../../common/vectorDB/controller';
import { MongoDatasetDataText } from './data/dataTextSchema';
import { DatasetErrEnum } from '@fastgpt/global/common/error/code/dataset';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { clearDatasetImages } from './image/utils';
/* ============= dataset ========== */
/* find all datasetId by top datasetId */
@@ -102,8 +103,10 @@ export async function delDatasetRelevantData({
}),
//delete dataset_datas
MongoDatasetData.deleteMany({ teamId, datasetId: { $in: datasetIds } }),
// Delete Image and file
// Delete collection image and file
delCollectionRelatedSource({ collections }),
// Delete dataset Image
clearDatasetImages(datasetIds),
// Delete vector data
deleteDatasetDataVector({ teamId, datasetIds })
]);

View File

@@ -0,0 +1,56 @@
import { getDatasetImagePreviewUrl } from '../image/utils';
import type { DatasetCiteItemType, DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type';
export const formatDatasetDataValue = ({
q,
a,
imageId,
teamId,
datasetId
}: {
q: string;
a?: string;
imageId?: string;
teamId: string;
datasetId: string;
}): {
q: string;
a?: string;
imagePreivewUrl?: string;
} => {
if (!imageId) {
return {
q,
a
};
}
const previewUrl = getDatasetImagePreviewUrl({
imageId,
teamId,
datasetId,
expiredMinutes: 60 * 24 * 7 // 7 days
});
return {
q: `![${q.replaceAll('\n', '\\n')}](${previewUrl})`,
a,
imagePreivewUrl: previewUrl
};
};
export const getFormatDatasetCiteList = (list: DatasetDataSchemaType[]) => {
return list.map<DatasetCiteItemType>((item) => ({
_id: item._id,
...formatDatasetDataValue({
teamId: item.teamId,
datasetId: item.datasetId,
q: item.q,
a: item.a,
imageId: item.imageId
}),
history: item.history,
updateTime: item.updateTime,
index: item.chunkIndex
}));
};
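
formatDatasetDataValue rewrites image-backed records so q becomes a markdown image pointing at a signed preview URL, and returns q/a unchanged when there is no imageId. A small usage sketch with example ids:

import { formatDatasetDataValue } from './controller'; // the new file above

const formatted = formatDatasetDataValue({
  teamId: '665f00000000000000000001',   // example ids only
  datasetId: '665f00000000000000000002',
  q: 'A photo of the loading dock',
  imageId: '665f00000000000000000003'
});
// formatted.q === '![A photo of the loading dock](/api/core/dataset/image/665f00000000000000000003?token=...)'
// formatted.imagePreivewUrl holds the same signed URL (field name as spelled in this diff)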

View File

@@ -37,8 +37,7 @@ const DatasetDataSchema = new Schema({
required: true
},
a: {
type: String,
default: ''
type: String
},
history: {
type: [
@@ -74,6 +73,9 @@ const DatasetDataSchema = new Schema({
default: []
},
imageId: {
type: String
},
updateTime: {
type: Date,
default: () => new Date()

View File

@@ -0,0 +1,166 @@
import { addMinutes } from 'date-fns';
import { bucketName, MongoDatasetImageSchema } from './schema';
import { connectionMongo, Types } from '../../../common/mongo';
import fs from 'fs';
import type { FileType } from '../../../common/file/multer';
import fsp from 'fs/promises';
import { computeGridFsChunSize } from '../../../common/file/gridfs/utils';
import { setCron } from '../../../common/system/cron';
import { checkTimerLock } from '../../../common/system/timerLock/utils';
import { TimerIdEnum } from '../../../common/system/timerLock/constants';
import { addLog } from '../../../common/system/log';
const getGridBucket = () => {
return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
bucketName: bucketName
});
};
export const createDatasetImage = async ({
teamId,
datasetId,
file,
expiredTime = addMinutes(new Date(), 30)
}: {
teamId: string;
datasetId: string;
file: FileType;
expiredTime?: Date;
}): Promise<{ imageId: string; previewUrl: string }> => {
const path = file.path;
const gridBucket = getGridBucket();
const metadata = {
teamId: String(teamId),
datasetId: String(datasetId),
expiredTime
};
const stats = await fsp.stat(path);
if (!stats.isFile()) return Promise.reject(`${path} is not a file`);
const readStream = fs.createReadStream(path, {
highWaterMark: 256 * 1024
});
const chunkSizeBytes = computeGridFsChunSize(stats.size);
const stream = gridBucket.openUploadStream(file.originalname, {
metadata,
contentType: file.mimetype,
chunkSizeBytes
});
// save to gridfs
await new Promise((resolve, reject) => {
readStream
.pipe(stream as any)
.on('finish', resolve)
.on('error', reject);
});
return {
imageId: String(stream.id),
previewUrl: ''
};
};
export const getDatasetImageReadData = async (imageId: string) => {
// Get file metadata to get contentType
const fileInfo = await MongoDatasetImageSchema.findOne({
_id: new Types.ObjectId(imageId)
}).lean();
if (!fileInfo) {
return Promise.reject('Image not found');
}
const gridBucket = getGridBucket();
return {
stream: gridBucket.openDownloadStream(new Types.ObjectId(imageId)),
fileInfo
};
};
export const getDatasetImageBase64 = async (imageId: string) => {
// Get file metadata to get contentType
const fileInfo = await MongoDatasetImageSchema.findOne({
_id: new Types.ObjectId(imageId)
}).lean();
if (!fileInfo) {
return Promise.reject('Image not found');
}
// Get image stream from GridFS
const { stream } = await getDatasetImageReadData(imageId);
// Convert stream to buffer
const chunks: Buffer[] = [];
return new Promise<string>((resolve, reject) => {
stream.on('data', (chunk: Buffer) => {
chunks.push(chunk);
});
stream.on('end', () => {
// Combine all chunks into a single buffer
const buffer = Buffer.concat(chunks);
// Convert buffer to base64 string
const base64 = buffer.toString('base64');
const dataUrl = `data:${fileInfo.contentType || 'image/jpeg'};base64,${base64}`;
resolve(dataUrl);
});
stream.on('error', reject);
});
};
export const deleteDatasetImage = async (imageId: string) => {
const gridBucket = getGridBucket();
try {
await gridBucket.delete(new Types.ObjectId(imageId));
} catch (error: any) {
const msg = error?.message;
if (msg.includes('File not found')) {
addLog.warn('Delete dataset image error', error);
return;
} else {
return Promise.reject(error);
}
}
};
export const clearExpiredDatasetImageCron = async () => {
const gridBucket = getGridBucket();
const clearExpiredDatasetImages = async () => {
addLog.debug('Clear expired dataset image start');
const data = await MongoDatasetImageSchema.find(
{
'metadata.expiredTime': { $lt: new Date() }
},
'_id'
).lean();
for (const item of data) {
try {
await gridBucket.delete(item._id);
} catch (error) {
addLog.error('Delete expired dataset image error', error);
}
}
addLog.debug('Clear expired dataset image end');
};
setCron('*/10 * * * *', async () => {
if (
await checkTimerLock({
timerId: TimerIdEnum.clearExpiredDatasetImage,
lockMinuted: 9
})
) {
try {
await clearExpiredDatasetImages();
} catch (error) {
addLog.error('clearExpiredDatasetImageCron error', error);
}
}
});
};
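
These helpers store uploaded images in the dataset_image GridFS bucket with a default 30-minute expiredTime (expired files are swept by the cron above; the TTL is later unset by removeDatasetImageExpiredTime once the image is attached to a collection) and read them back as a base64 data URL. A hedged usage sketch:

import type { FileType } from '../../../common/file/multer';
import { createDatasetImage, getDatasetImageBase64 } from './controller';

async function handleImageUpload(teamId: string, datasetId: string, file: FileType) {
  // Upload the multer temp file into GridFS; default TTL is now + 30 minutes
  const { imageId } = await createDatasetImage({ teamId, datasetId, file });

  // Read it back as "data:image/...;base64,..." for VLM input or previewing
  const dataUrl = await getDatasetImageBase64(imageId);

  return { imageId, dataUrl };
}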

View File

@@ -0,0 +1,36 @@
import type { Types } from '../../../common/mongo';
import { getMongoModel, Schema } from '../../../common/mongo';
export const bucketName = 'dataset_image';
const MongoDatasetImage = new Schema({
length: { type: Number, required: true },
chunkSize: { type: Number, required: true },
uploadDate: { type: Date, required: true },
filename: { type: String, required: true },
contentType: { type: String, required: true },
metadata: {
teamId: { type: String, required: true },
datasetId: { type: String, required: true },
collectionId: { type: String },
expiredTime: { type: Date, required: true }
}
});
MongoDatasetImage.index({ 'metadata.datasetId': 'hashed' });
MongoDatasetImage.index({ 'metadata.collectionId': 'hashed' });
MongoDatasetImage.index({ 'metadata.expiredTime': -1 });
export const MongoDatasetImageSchema = getMongoModel<{
_id: Types.ObjectId;
length: number;
chunkSize: number;
uploadDate: Date;
filename: string;
contentType: string;
metadata: {
teamId: string;
datasetId: string;
collectionId: string;
expiredTime: Date;
};
}>(`${bucketName}.files`, MongoDatasetImage);

View File

@@ -0,0 +1,103 @@
import { ERROR_ENUM } from '@fastgpt/global/common/error/errorCode';
import { Types, type ClientSession } from '../../../common/mongo';
import { deleteDatasetImage } from './controller';
import { MongoDatasetImageSchema } from './schema';
import { addMinutes } from 'date-fns';
import jwt from 'jsonwebtoken';
export const removeDatasetImageExpiredTime = async ({
ids = [],
collectionId,
session
}: {
ids?: string[];
collectionId: string;
session?: ClientSession;
}) => {
if (ids.length === 0) return;
return MongoDatasetImageSchema.updateMany(
{
_id: {
$in: ids
.filter((id) => Types.ObjectId.isValid(id))
.map((id) => (typeof id === 'string' ? new Types.ObjectId(id) : id))
}
},
{
$unset: { 'metadata.expiredTime': '' },
$set: {
'metadata.collectionId': String(collectionId)
}
},
{ session }
);
};
export const getDatasetImagePreviewUrl = ({
imageId,
teamId,
datasetId,
expiredMinutes
}: {
imageId: string;
teamId: string;
datasetId: string;
expiredMinutes: number;
}) => {
const expiredTime = Math.floor(addMinutes(new Date(), expiredMinutes).getTime() / 1000);
const key = (process.env.FILE_TOKEN_KEY as string) ?? 'filetoken';
const token = jwt.sign(
{
teamId: String(teamId),
datasetId: String(datasetId),
exp: expiredTime
},
key
);
return `/api/core/dataset/image/${imageId}?token=${token}`;
};
export const authDatasetImagePreviewUrl = (token?: string) =>
new Promise<{
teamId: string;
datasetId: string;
}>((resolve, reject) => {
if (!token) {
return reject(ERROR_ENUM.unAuthFile);
}
const key = (process.env.FILE_TOKEN_KEY as string) ?? 'filetoken';
jwt.verify(token, key, (err, decoded: any) => {
if (err || !decoded?.teamId || !decoded?.datasetId) {
reject(ERROR_ENUM.unAuthFile);
return;
}
resolve({
teamId: decoded.teamId,
datasetId: decoded.datasetId
});
});
});
export const clearDatasetImages = async (datasetIds: string[]) => {
if (datasetIds.length === 0) return;
const images = await MongoDatasetImageSchema.find(
{
'metadata.datasetId': { $in: datasetIds.map((item) => String(item)) }
},
'_id'
).lean();
await Promise.all(images.map((image) => deleteDatasetImage(String(image._id))));
};
export const clearCollectionImages = async (collectionIds: string[]) => {
if (collectionIds.length === 0) return;
const images = await MongoDatasetImageSchema.find(
{
'metadata.collectionId': { $in: collectionIds.map((item) => String(item)) }
},
'_id'
).lean();
await Promise.all(images.map((image) => deleteDatasetImage(String(image._id))));
};
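
The preview URL carries a short JWT signed with FILE_TOKEN_KEY, which the image API route verifies with authDatasetImagePreviewUrl before streaming the file. A sketch of the round trip:

import { authDatasetImagePreviewUrl, getDatasetImagePreviewUrl } from './utils';

async function previewUrlRoundTrip() {
  const url = getDatasetImagePreviewUrl({
    imageId: '665f00000000000000000003', // example ids only
    teamId: '665f00000000000000000001',
    datasetId: '665f00000000000000000002',
    expiredMinutes: 60 * 24 * 7 // 7 days, matching formatDatasetDataValue
  });
  // url: /api/core/dataset/image/<imageId>?token=<jwt>

  // In the GET handler, the token query param is verified before streaming the image
  const token = new URL(url, 'http://localhost').searchParams.get('token') ?? undefined;
  const { teamId, datasetId } = await authDatasetImagePreviewUrl(token);
  return { url, teamId, datasetId };
}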

View File

@@ -9,13 +9,9 @@ import { type TextSplitProps, splitText2Chunks } from '@fastgpt/global/common/st
import axios from 'axios';
import { readRawContentByFileBuffer } from '../../common/file/read/utils';
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
import {
type APIFileServer,
type FeishuServer,
type YuqueServer
} from '@fastgpt/global/core/dataset/apiDataset';
import { getApiDatasetRequest } from './apiDataset';
import Papa from 'papaparse';
import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
export const readFileRawTextByUrl = async ({
teamId,
@@ -69,9 +65,7 @@ export const readDatasetSourceRawText = async ({
sourceId,
selector,
externalFileId,
apiServer,
feishuServer,
yuqueServer,
apiDatasetServer,
customPdfParse,
getFormatText
}: {
@@ -84,9 +78,7 @@ export const readDatasetSourceRawText = async ({
selector?: string; // link selector
externalFileId?: string; // external file dataset
apiServer?: APIFileServer; // api dataset
feishuServer?: FeishuServer; // feishu dataset
yuqueServer?: YuqueServer; // yuque dataset
apiDatasetServer?: ApiDatasetServerType; // api dataset
}): Promise<{
title?: string;
rawText: string;
@@ -110,9 +102,14 @@ export const readDatasetSourceRawText = async ({
selector
});
const { title = sourceId, content = '' } = result[0];
if (!content || content === 'Cannot fetch internal url') {
return Promise.reject(content || 'Can not fetch content from link');
}
return {
title: result[0]?.title,
rawText: result[0]?.content || ''
title,
rawText: content
};
} else if (type === DatasetSourceReadTypeEnum.externalFile) {
if (!externalFileId) return Promise.reject('FileId not found');
@@ -128,9 +125,7 @@ export const readDatasetSourceRawText = async ({
};
} else if (type === DatasetSourceReadTypeEnum.apiFile) {
const { title, rawText } = await readApiServerFileContent({
apiServer,
feishuServer,
yuqueServer,
apiDatasetServer,
apiFileId: sourceId,
teamId,
tmbId
@@ -147,17 +142,13 @@ export const readDatasetSourceRawText = async ({
};
export const readApiServerFileContent = async ({
apiServer,
feishuServer,
yuqueServer,
apiDatasetServer,
apiFileId,
teamId,
tmbId,
customPdfParse
}: {
apiServer?: APIFileServer;
feishuServer?: FeishuServer;
yuqueServer?: YuqueServer;
apiDatasetServer?: ApiDatasetServerType;
apiFileId: string;
teamId: string;
tmbId: string;
@@ -166,13 +157,7 @@ export const readApiServerFileContent = async ({
title?: string;
rawText: string;
}> => {
return (
await getApiDatasetRequest({
apiServer,
yuqueServer,
feishuServer
})
).getFileContent({
return (await getApiDatasetRequest(apiDatasetServer)).getFileContent({
teamId,
tmbId,
apiFileId,
@@ -186,9 +171,11 @@ export const rawText2Chunks = ({
chunkTriggerMinSize = 1000,
backupParse,
chunkSize = 512,
imageIdList,
...splitProps
}: {
rawText: string;
imageIdList?: string[];
chunkTriggerType?: ChunkTriggerConfigTypeEnum;
chunkTriggerMinSize?: number; // maxSize from agent model, not store
@@ -199,17 +186,18 @@ export const rawText2Chunks = ({
q: string;
a: string;
indexes?: string[];
imageIdList?: string[];
}[] => {
const parseDatasetBackup2Chunks = (rawText: string) => {
const csvArr = Papa.parse(rawText).data as string[][];
console.log(rawText, csvArr);
const chunks = csvArr
.slice(1)
.map((item) => ({
q: item[0] || '',
a: item[1] || '',
indexes: item.slice(2)
indexes: item.slice(2),
imageIdList
}))
.filter((item) => item.q || item.a);
@@ -231,7 +219,8 @@ export const rawText2Chunks = ({
return [
{
q: rawText,
a: ''
a: '',
imageIdList
}
];
}
@@ -240,7 +229,7 @@ export const rawText2Chunks = ({
if (chunkTriggerType !== ChunkTriggerConfigTypeEnum.forceChunk) {
const textLength = rawText.trim().length;
if (textLength < chunkTriggerMinSize) {
return [{ q: rawText, a: '' }];
return [{ q: rawText, a: '', imageIdList }];
}
}
@@ -253,6 +242,7 @@ export const rawText2Chunks = ({
return chunks.map((item) => ({
q: item,
a: '',
indexes: []
indexes: [],
imageIdList
}));
};
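
rawText2Chunks now threads an optional imageIdList through every chunk, including the backup-CSV and short-text fast paths. A hedged call sketch (splitter options are abbreviated; the exact required TextSplitProps fields may differ):

import { rawText2Chunks } from './read'; // path assumed relative to a caller

const chunks = rawText2Chunks({
  rawText: 'First paragraph...\n\nSecond paragraph...',
  chunkSize: 512,
  maxSize: 8000, // usually getLLMMaxChunkSize(llmModel)
  overlapRatio: 0.2,
  customReg: [],
  imageIdList: ['665f00000000000000000003'] // example id only
});
// Every returned chunk is { q, a, indexes, imageIdList }, so downstream inserts
// keep the image association alongside the text.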

View File

@@ -127,14 +127,16 @@ const DatasetSchema = new Schema({
type: Boolean,
default: true
},
apiServer: Object,
feishuServer: Object,
yuqueServer: Object,
apiDatasetServer: Object,
// abandoned
autoSync: Boolean,
externalReadUrl: String,
defaultPermission: Number
defaultPermission: Number,
apiServer: Object,
feishuServer: Object,
yuqueServer: Object
});
try {

View File

@@ -28,6 +28,7 @@ import type { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants';
import { datasetSearchQueryExtension } from './utils';
import type { RerankModelItemType } from '@fastgpt/global/core/ai/model.d';
import { addLog } from '../../../common/system/log';
import { formatDatasetDataValue } from '../data/controller';
export type SearchDatasetDataProps = {
histories: ChatItemType[];
@@ -175,6 +176,12 @@ export async function searchDatasetData(
collectionFilterMatch
} = props;
// Constants data
const datasetDataSelectField =
'_id datasetId collectionId updateTime q a imageId chunkIndex indexes';
const datsaetCollectionSelectField =
'_id name fileId rawLink apiFileId externalFileId externalFileUrl';
/* init params */
searchMode = DatasetSearchModeMap[searchMode] ? searchMode : DatasetSearchModeEnum.embedding;
usingReRank = usingReRank && !!getDefaultRerankModel();
@@ -463,14 +470,14 @@ export async function searchDatasetData(
collectionId: { $in: collectionIdList },
'indexes.dataId': { $in: results.map((item) => item.id?.trim()) }
},
'_id datasetId collectionId updateTime q a chunkIndex indexes',
datasetDataSelectField,
{ ...readFromSecondary }
).lean(),
MongoDatasetCollection.find(
{
_id: { $in: collectionIdList }
},
'_id name fileId rawLink apiFileId externalFileId externalFileUrl',
datsaetCollectionSelectField,
{ ...readFromSecondary }
).lean()
]);
@@ -494,8 +501,13 @@ export async function searchDatasetData(
const result: SearchDataResponseItemType = {
id: String(data._id),
updateTime: data.updateTime,
q: data.q,
a: data.a,
...formatDatasetDataValue({
teamId,
datasetId: data.datasetId,
q: data.q,
a: data.a,
imageId: data.imageId
}),
chunkIndex: data.chunkIndex,
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
@@ -597,14 +609,14 @@ export async function searchDatasetData(
{
_id: { $in: searchResults.map((item) => item.dataId) }
},
'_id datasetId collectionId updateTime q a chunkIndex indexes',
datasetDataSelectField,
{ ...readFromSecondary }
).lean(),
MongoDatasetCollection.find(
{
_id: { $in: searchResults.map((item) => item.collectionId) }
},
'_id name fileId rawLink apiFileId externalFileId externalFileUrl',
datsaetCollectionSelectField,
{ ...readFromSecondary }
).lean()
]);
@@ -630,8 +642,13 @@ export async function searchDatasetData(
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
updateTime: data.updateTime,
q: data.q,
a: data.a,
...formatDatasetDataValue({
teamId,
datasetId: data.datasetId,
q: data.q,
a: data.a,
imageId: data.imageId
}),
chunkIndex: data.chunkIndex,
indexes: data.indexes,
...getCollectionSourceData(collection),

View File

@@ -12,10 +12,7 @@ import { getCollectionWithDataset } from '../controller';
import { mongoSessionRun } from '../../../common/mongo/sessionRun';
import { type PushDataToTrainingQueueProps } from '@fastgpt/global/core/dataset/training/type';
import { i18nT } from '../../../../web/i18n/utils';
import {
getLLMDefaultChunkSize,
getLLMMaxChunkSize
} from '../../../../global/core/dataset/training/utils';
import { getLLMMaxChunkSize } from '../../../../global/core/dataset/training/utils';
export const lockTrainingDataByTeamId = async (teamId: string): Promise<any> => {
try {
@@ -62,10 +59,10 @@ export async function pushDataListToTrainingQueue({
indexSize,
session
}: PushDataToTrainingQueueProps): Promise<PushDatasetDataResponse> {
const getImageChunkMode = (data: PushDatasetDataChunkProps, mode: TrainingModeEnum) => {
const formatTrainingMode = (data: PushDatasetDataChunkProps, mode: TrainingModeEnum) => {
if (mode !== TrainingModeEnum.image) return mode;
// Check whether the content contains images in the ![](xxx) markdown format
const text = data.q + data.a || '';
const text = (data.q || '') + (data.a || '');
const regex = /!\[\]\((.*?)\)/g;
const match = text.match(regex);
if (match) {
@@ -82,9 +79,6 @@ export async function pushDataListToTrainingQueue({
if (!agentModelData) {
return Promise.reject(i18nT('common:error_llm_not_config'));
}
if (mode === TrainingModeEnum.chunk || mode === TrainingModeEnum.auto) {
prompt = undefined;
}
const { model, maxToken, weight } = await (async () => {
if (mode === TrainingModeEnum.chunk) {
@@ -101,7 +95,7 @@ export async function pushDataListToTrainingQueue({
weight: 0
};
}
if (mode === TrainingModeEnum.image) {
if (mode === TrainingModeEnum.image || mode === TrainingModeEnum.imageParse) {
const vllmModelData = getVlmModel(vlmModel);
if (!vllmModelData) {
return Promise.reject(i18nT('common:error_vlm_not_config'));
@@ -116,17 +110,8 @@ export async function pushDataListToTrainingQueue({
return Promise.reject(`Training mode "${mode}" is inValid`);
})();
// filter repeat or equal content
const set = new Set();
const filterResult: Record<string, PushDatasetDataChunkProps[]> = {
success: [],
overToken: [],
repeat: [],
error: []
};
// format q and a, remove empty char
data.forEach((item) => {
data = data.filter((item) => {
item.q = simpleText(item.q);
item.a = simpleText(item.a);
@@ -140,8 +125,7 @@ export async function pushDataListToTrainingQueue({
.filter(Boolean);
// filter repeat content
if (!item.q) {
filterResult.error.push(item);
if (!item.imageId && !item.q) {
return;
}
@@ -149,42 +133,36 @@ export async function pushDataListToTrainingQueue({
// Oversize llm tokens
if (text.length > maxToken) {
filterResult.overToken.push(item);
return;
}
if (set.has(text)) {
filterResult.repeat.push(item);
} else {
filterResult.success.push(item);
set.add(text);
}
return true;
});
// insert data to db
const insertLen = filterResult.success.length;
const failedDocuments: PushDatasetDataChunkProps[] = [];
const insertLen = data.length;
// Batch insert with insertMany
const batchSize = 200;
const batchSize = 500;
const insertData = async (startIndex: number, session: ClientSession) => {
const list = filterResult.success.slice(startIndex, startIndex + batchSize);
const list = data.slice(startIndex, startIndex + batchSize);
if (list.length === 0) return;
try {
await MongoDatasetTraining.insertMany(
const result = await MongoDatasetTraining.insertMany(
list.map((item) => ({
teamId,
tmbId,
datasetId,
collectionId,
datasetId: datasetId,
collectionId: collectionId,
billId,
mode: getImageChunkMode(item, mode),
mode: formatTrainingMode(item, mode),
prompt,
model,
q: item.q,
a: item.a,
...(item.q && { q: item.q }),
...(item.a && { a: item.a }),
...(item.imageId && { imageId: item.imageId }),
chunkIndex: item.chunkIndex ?? 0,
indexSize,
weight: weight ?? 0,
@@ -193,21 +171,20 @@ export async function pushDataListToTrainingQueue({
})),
{
session,
ordered: true
ordered: false,
rawResult: true,
includeResultMetadata: false // further reduce the returned payload
}
);
if (result.insertedCount !== list.length) {
return Promise.reject(`Insert data error, ${JSON.stringify(result)}`);
}
} catch (error: any) {
addLog.error(`Insert error`, error);
// On error, collect the failed documents into the failure list
error.writeErrors?.forEach((writeError: any) => {
failedDocuments.push(data[writeError.index]);
});
console.log('failed', failedDocuments);
return Promise.reject(error);
}
// Retry inserting the failed documents one by one
await MongoDatasetTraining.create(failedDocuments, { session });
return insertData(startIndex + batchSize, session);
};
@@ -219,10 +196,37 @@ export async function pushDataListToTrainingQueue({
});
}
delete filterResult.success;
return {
insertLen,
...filterResult
insertLen
};
}
export const pushDatasetToParseQueue = async ({
teamId,
tmbId,
datasetId,
collectionId,
billId,
session
}: {
teamId: string;
tmbId: string;
datasetId: string;
collectionId: string;
billId: string;
session: ClientSession;
}) => {
await MongoDatasetTraining.create(
[
{
teamId,
tmbId,
datasetId,
collectionId,
billId,
mode: TrainingModeEnum.parse
}
],
{ session, ordered: true }
);
};
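
When a collection is created without rawText or imageIds (for example an API file that still needs parsing), the controller now enqueues a single parse task instead of data chunks. A sketch of that branch (the import path is assumed):

import type { ClientSession } from '../../../common/mongo';
import { mongoSessionRun } from '../../../common/mongo/sessionRun';
import { pushDatasetToParseQueue } from './controller'; // path assumed

async function enqueueParseTask(params: {
  teamId: string;
  tmbId: string;
  datasetId: string;
  collectionId: string;
  billId: string;
}) {
  await mongoSessionRun(async (session: ClientSession) => {
    // One training record with mode=parse; the parse worker expands it into chunks later
    await pushDatasetToParseQueue({ ...params, session });
  });
}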

View File

@@ -54,16 +54,8 @@ const TrainingDataSchema = new Schema({
default: 5
},
model: {
// ai model
type: String,
required: true
},
prompt: {
// qa split prompt
type: String,
default: ''
},
model: String,
prompt: String,
q: {
type: String,
default: ''
@@ -72,6 +64,7 @@ const TrainingDataSchema = new Schema({
type: String,
default: ''
},
imageId: String,
chunkIndex: {
type: Number,
default: 0
@@ -81,9 +74,7 @@ const TrainingDataSchema = new Schema({
type: Number,
default: 0
},
dataId: {
type: Schema.Types.ObjectId
},
dataId: Schema.Types.ObjectId,
indexes: {
type: [
{