fix: invalid dataset data clear (#3927)

* fix: collection list count

* fix: collection list count

* fix: invalid dataset data clear

* update ts

* perf: cron clear invalid data

* perf: init

* perf: clear invalid code

* update init

* perf: clear invalid code

* perf: clear invalid code

* perf: init count

* batch init

* batch init

* batch init

* batch init

* add comment

* perf: init

* fix: api proxy type
Archer
2025-02-28 17:49:20 +08:00
committed by GitHub
parent ac4255ea0c
commit cf0aaa1091
13 changed files with 286 additions and 96 deletions

View File

@@ -27,4 +27,5 @@ weight: 802
 1. 标签过滤时,子文件夹未成功过滤。
 2. 暂时移除 md 阅读优化,避免链接分割错误。
 3. 离开团队时,未刷新成员列表。
 4. PPTX 编码错误,导致解析失败。
+5. 删除知识库单条数据时,全文索引未跟随删除。

View File

@@ -227,7 +227,13 @@ export const delCollectionRelatedSource = async ({
   collections,
   session
 }: {
-  collections: DatasetCollectionSchemaType[];
+  collections: {
+    teamId: string;
+    fileId?: string;
+    metadata?: {
+      relatedImgId?: string;
+    };
+  }[];
   session: ClientSession;
 }) => {
   if (collections.length === 0) return;
@@ -259,11 +265,13 @@ export const delCollectionRelatedSource = async ({
 export async function delCollection({
   collections,
   session,
-  delRelatedSource
+  delImg = true,
+  delFile = true
 }: {
   collections: DatasetCollectionSchemaType[];
   session: ClientSession;
-  delRelatedSource: boolean;
+  delImg: boolean;
+  delFile: boolean;
 }) {
   if (collections.length === 0) return;
@@ -281,9 +289,18 @@ export async function delCollection({
     collectionId: { $in: collectionIds }
   });

-  /* file and imgs */
-  if (delRelatedSource) {
-    await delCollectionRelatedSource({ collections, session });
+  if (delImg) {
+    await delImgByRelatedId({
+      teamId,
+      relateIds: collections.map((item) => item?.metadata?.relatedImgId || '').filter(Boolean),
+      session
+    });
+  }
+  if (delFile) {
+    await delFileByFileIdList({
+      bucketName: BucketNameEnum.dataset,
+      fileIdList: collections.map((item) => item?.fileId || '').filter(Boolean)
+    });
   }

   // Delete dataset_datas
@@ -309,48 +326,3 @@ export async function delCollection({
   // no session delete: delete files, vector data
   await deleteDatasetDataVector({ teamId, datasetIds, collectionIds });
 }
-
-/**
- * delete delOnlyCollection
- */
-export async function delOnlyCollection({
-  collections,
-  session
-}: {
-  collections: DatasetCollectionSchemaType[];
-  session: ClientSession;
-}) {
-  if (collections.length === 0) return;
-
-  const teamId = collections[0].teamId;
-
-  if (!teamId) return Promise.reject('teamId is not exist');
-
-  const datasetIds = Array.from(new Set(collections.map((item) => String(item.datasetId))));
-  const collectionIds = collections.map((item) => String(item._id));
-
-  // delete training data
-  await MongoDatasetTraining.deleteMany({
-    teamId,
-    datasetId: { $in: datasetIds },
-    collectionId: { $in: collectionIds }
-  });
-
-  // delete dataset.datas
-  await MongoDatasetData.deleteMany(
-    { teamId, datasetId: { $in: datasetIds }, collectionId: { $in: collectionIds } },
-    { session }
-  );
-
-  // delete collections
-  await MongoDatasetCollection.deleteMany(
-    {
-      teamId,
-      _id: { $in: collectionIds }
-    },
-    { session }
-  );
-
-  // no session delete: delete files, vector data
-  await deleteDatasetDataVector({ teamId, datasetIds, collectionIds });
-}
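
Net effect of this file: the old delRelatedSource flag and the separate delOnlyCollection helper are folded into two explicit flags on delCollection. A minimal sketch of the two call patterns used elsewhere in this commit (the wrapper function names below are illustrative only; the imports and flags come from the diff):

import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type';
import { delCollection } from '@fastgpt/service/core/dataset/collection/controller';
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';

// Hard delete (delete-collection API): also remove GridFS files and related images.
const removeCollections = (collections: DatasetCollectionSchemaType[]) =>
  mongoSessionRun((session) => delCollection({ collections, delImg: true, delFile: true, session }));

// Rebuild in place (sync / retrain): drop the records but keep the source file and images,
// so the follow-up createCollectionAndInsertData call can reuse them.
const rebuildCollection = (collection: DatasetCollectionSchemaType) =>
  mongoSessionRun(async (session) => {
    await delCollection({ collections: [collection], delImg: false, delFile: false, session });
    // ...then createCollectionAndInsertData({ session, ... })
  });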

View File

@@ -174,6 +174,14 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
   }

   await mongoSessionRun(async (session) => {
+    // Delete old collection
+    await delCollection({
+      collections: [collection],
+      delImg: false,
+      delFile: false,
+      session
+    });
+
     // Create new collection
     await createCollectionAndInsertData({
       session,
@@ -208,13 +216,6 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => {
         updateTime: new Date()
       }
     });
-
-    // Delete old collection
-    await delCollection({
-      collections: [collection],
-      delRelatedSource: false,
-      session
-    });
   });

   return DatasetCollectionSyncResultEnum.success;

View File

@@ -1,6 +1,6 @@
 import { connectionMongo, getMongoModel } from '../../../common/mongo';
 const { Schema } = connectionMongo;
-import { DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type.d';
+import { DatasetDataTextSchemaType } from '@fastgpt/global/core/dataset/type.d';
 import { TeamCollectionName } from '@fastgpt/global/support/user/team/constant';
 import { DatasetCollectionName } from '../schema';
 import { DatasetColCollectionName } from '../collection/schema';
@@ -45,7 +45,7 @@ try {
   console.log(error);
 }

-export const MongoDatasetDataText = getMongoModel<DatasetDataSchemaType>(
+export const MongoDatasetDataText = getMongoModel<DatasetDataTextSchemaType>(
   DatasetDataTextCollectionName,
   DatasetDataTextSchema
 );

View File

@@ -135,7 +135,6 @@ const ModelTest = ({ models, onClose }: { models: string[]; onClose: () => void
     }
   );

-  console.log(testModelList);
   return (
     <MyModal
       iconSrc={'core/chat/sendLight'}

View File

@@ -119,7 +119,7 @@ const ChannelLog = ({ Tab }: { Tab: React.ReactNode }) => {
       },
       ...res
     ];
-  }, [systemModelList]);
+  }, [systemModelList, t]);

   const { data, isLoading, ScrollData } = useScrollPagination(getChannelLog, {
     pageSize: 20,

View File

@@ -35,7 +35,7 @@ async function checkInvalidImg(start: Date, end: Date, limit = 50) {
         'metadata.relatedImgId': image.metadata?.relatedId
       },
       '_id'
-    );
+    ).lean();

     if (!collection) {
       await image.deleteOne();
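
The lookup above only needs to know whether a matching collection still exists, so it projects _id and adds .lean() to skip Mongoose document hydration. A minimal sketch of the same pattern (identifiers taken from the hunk; the helper name and parameter are illustrative):

import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';

// Cheap existence check: project only _id and return a plain object instead of a full document.
const imgIsOrphaned = async (relatedId?: string) => {
  const exists = await MongoDatasetCollection.findOne(
    { 'metadata.relatedImgId': relatedId },
    '_id'
  ).lean();
  return !exists;
};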

View File

@@ -0,0 +1,206 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@fastgpt/service/common/response';
import { connectToDatabase } from '@/service/mongo';
import { authCert } from '@fastgpt/service/support/permission/auth/common';
import { addHours } from 'date-fns';
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
import { MongoDataset } from '@fastgpt/service/core/dataset/schema';
import { delay, retryFn } from '@fastgpt/global/common/system/utils';
import { delCollection } from '@fastgpt/service/core/dataset/collection/controller';
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
import { MongoDatasetDataText } from '@fastgpt/service/core/dataset/data/dataTextSchema';
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type';
import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';
import { deleteDatasetDataVector } from '@fastgpt/service/common/vectorStore/controller';

// 删了库,没删集合
const checkInvalidCollection = async () => {
  const batchSize = 1000;

  let skip = 0;
  let success = 0;

  while (true) {
    try {
      const collections = await MongoDatasetCollection.find(
        {},
        '_id teamId datasetId fileId metadata'
      )
        .limit(batchSize)
        .skip(skip)
        .lean();
      if (collections.length === 0) break;

      const datasetMap: Record<string, DatasetCollectionSchemaType[]> = {};

      // 相同 datasetId 的集合放到一起
      for await (const collection of collections) {
        const datasetId = String(collection.datasetId);
        const val = datasetMap[datasetId];
        if (val) {
          val.push(collection);
        } else {
          datasetMap[datasetId] = [collection];
        }
      }

      const datasetIds = Object.keys(datasetMap);

      for await (const datasetId of datasetIds) {
        try {
          const val = datasetMap[datasetId];
          if (!val) {
            continue;
          }

          await retryFn(async () => {
            const datasetExists = await MongoDataset.findById(datasetId, '_id').lean();
            if (!datasetExists) {
              console.log('清理无效的知识库集合, datasetId', datasetId);
              await mongoSessionRun(async (session) => {
                return await delCollection({
                  collections: val,
                  delImg: true,
                  delFile: true,
                  session
                });
              });
            }
          });
        } catch (error) {
          console.log(error);
        }
      }

      success += batchSize;
      skip += batchSize;
      console.log(`检测集合完成:${success}`);
    } catch (error) {
      console.log(error);
      await delay(1000);
    }
  }
};

// 删了集合,没删 data
const checkInvalidData = async () => {
  try {
    const datas = (await MongoDatasetData.aggregate([
      {
        $group: {
          _id: '$collectionId',
          teamId: { $first: '$teamId' },
          datasetId: { $first: '$datasetId' },
          collectionId: { $first: '$collectionId' }
        }
      }
    ])) as {
      _id: string;
      teamId: string;
      datasetId: string;
      collectionId: string;
    }[];
    console.log('Total data collections length', datas.length);

    // 批量获取集合
    const collections = await MongoDatasetCollection.find({}, '_id').lean();
    console.log('Total collection length', collections.length);
    const collectionMap: Record<string, DatasetCollectionSchemaType> = {};
    for await (const collection of collections) {
      collectionMap[collection._id] = collection;
    }

    // 逐一删除无效的集合内容
    for await (const data of datas) {
      try {
        const col = collectionMap[data.collectionId];
        if (!col) {
          console.log('清理无效的知识库集合内容, collectionId', data.collectionId);
          await retryFn(async () => {
            await MongoDatasetTraining.deleteMany({
              teamId: data.teamId,
              datasetId: data.datasetId,
              collectionId: data.collectionId
            });
            await MongoDatasetDataText.deleteMany({
              teamId: data.teamId,
              datasetId: data.datasetId,
              collectionId: data.collectionId
            });
            await deleteDatasetDataVector({
              teamId: data.teamId,
              datasetIds: [data.datasetId],
              collectionIds: [data.collectionId]
            });
            await MongoDatasetData.deleteMany({
              teamId: data.teamId,
              datasetId: data.datasetId,
              collectionId: data.collectionId
            });
          });
        }
      } catch (error) {
        console.log(error);
      }
    }

    console.log(`检测集合完成`);
  } catch (error) {
    console.log(error);
  }
};

// 删了data没删 data_text
const checkInvalidDataText = async () => {
  try {
    // 获取所有索引层的 dataId
    const dataTexts = await MongoDatasetDataText.find({}, 'dataId').lean();
    const dataIds = dataTexts.map((item) => String(item.dataId));
    console.log('Total data_text dataIds:', dataIds.length);

    // 获取数据层的 dataId
    const datas = await MongoDatasetData.find({}, '_id').lean();
    const datasSet = new Set(datas.map((item) => String(item._id)));
    console.log('Total data length:', datas.length);

    // 存在索引层,不存在数据层的 dataId说明数据已经被删了
    const unExistsSet = dataIds.filter((id) => !datasSet.has(id));
    console.log('Total unExists dataIds:', unExistsSet.length);

    await MongoDatasetDataText.deleteMany({
      dataId: { $in: unExistsSet }
    });
  } catch (error) {}
};

/* pg 中的数据搬到 mongo dataset.datas 中,并做映射 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  try {
    await connectToDatabase();
    await authCert({ req, authRoot: true });

    const { start = -2, end = -360 * 24 } = req.body as { start: number; end: number };

    (async () => {
      try {
        // 360天 ~ 2小时前
        const endTime = addHours(new Date(), start);
        const startTime = addHours(new Date(), end);

        console.log('清理无效的集合');
        await checkInvalidCollection();
        console.log('清理无效的数据');
        await checkInvalidData();
        console.log('清理无效的data_text');
        await checkInvalidDataText();
      } catch (error) {
        console.log('执行脏数据清理任务出错了');
      }
    })();

    jsonRes(res, {
      message: 'success'
    });
  } catch (error) {
    console.log(error);

    jsonRes(res, {
      code: 500,
      error
    });
  }
}
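
The three cleanup passes above lean on retryFn and delay from @fastgpt/global/common/system/utils; their implementations are not part of this diff, so the following is only an illustrative sketch of the retry-with-delay pattern they provide (attempt count and pause length are assumptions, not the actual library code):

// Illustrative only - not the actual @fastgpt/global implementation.
export const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

export const retryFn = async <T>(fn: () => Promise<T>, retryTimes = 3): Promise<T> => {
  try {
    return await fn();
  } catch (error) {
    if (retryTimes <= 0) return Promise.reject(error);
    await delay(500);
    return retryFn(fn, retryTimes - 1);
  }
};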

View File

@@ -10,7 +10,7 @@ import { hashStr } from '@fastgpt/global/common/string/tools';
 import { readDatasetSourceRawText } from '@fastgpt/service/core/dataset/read';
 import { NextAPI } from '@/service/middleware/entry';
 import { ApiRequestProps } from '@fastgpt/service/type/next';
-import { delOnlyCollection } from '@fastgpt/service/core/dataset/collection/controller';
+import { delCollection } from '@fastgpt/service/core/dataset/collection/controller';
 import { authDatasetCollection } from '@fastgpt/service/support/permission/dataset/auth';
 import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
 import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant';
@@ -89,6 +89,13 @@ async function handler(
   });

   return mongoSessionRun(async (session) => {
+    await delCollection({
+      collections: [collection],
+      session,
+      delImg: false,
+      delFile: false
+    });
+
     const { collectionId } = await createCollectionAndInsertData({
       dataset: collection.dataset,
       rawText,
@@ -121,10 +128,6 @@ async function handler(
         metadata: collection.metadata
       }
     });

-    await delOnlyCollection({
-      collections: [collection],
-      session
-    });
     return { collectionId };
   });

View File

@@ -34,7 +34,8 @@ async function handler(req: NextApiRequest) {
   await mongoSessionRun((session) =>
     delCollection({
       collections,
-      delRelatedSource: true,
+      delImg: true,
+      delFile: true,
       session
     })
   );

View File

@@ -1,15 +1,16 @@
 import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
+import { retryFn } from '@fastgpt/global/common/system/utils';
 import {
   delFileByFileIdList,
   getGFSCollection
 } from '@fastgpt/service/common/file/gridfs/controller';
-import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
 import { addLog } from '@fastgpt/service/common/system/log';
 import {
   deleteDatasetDataVector,
   getVectorDataByTime
 } from '@fastgpt/service/common/vectorStore/controller';
 import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
+import { MongoDatasetDataText } from '@fastgpt/service/core/dataset/data/dataTextSchema';
 import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
 import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';
 import { addDays } from 'date-fns';
@@ -129,32 +130,35 @@ export async function checkInvalidDatasetData(start: Date, end: Date) {
   for await (const item of list) {
     try {
       // 3. 查看该collection是否存在,不存在则删除对应的数据
-      const collection = await MongoDatasetCollection.findOne({ _id: item.collectionId });
+      const collection = await MongoDatasetCollection.findOne(
+        { _id: item.collectionId },
+        '_id'
+      ).lean();
       if (!collection) {
-        await mongoSessionRun(async (session) => {
-          await MongoDatasetTraining.deleteMany(
-            {
-              teamId: item.teamId,
-              collectionId: item.collectionId
-            },
-            { session }
-          );
-          await MongoDatasetData.deleteMany(
-            {
-              teamId: item.teamId,
-              collectionId: item.collectionId
-            },
-            { session }
-          );
-          await deleteDatasetDataVector({
-            teamId: item.teamId,
-            datasetIds: [item.datasetId],
-            collectionIds: [item.collectionId]
-          });
-        });
-        console.log('collection is not found', item);
-        continue;
+        console.log('collection is not found', item);
+
+        await retryFn(async () => {
+          await MongoDatasetTraining.deleteMany({
+            teamId: item.teamId,
+            datasetId: item.datasetId,
+            collectionId: item.collectionId
+          });
+          await MongoDatasetDataText.deleteMany({
+            teamId: item.teamId,
+            datasetId: item.datasetId,
+            collectionId: item.collectionId
+          });
+          await deleteDatasetDataVector({
+            teamId: item.teamId,
+            datasetIds: [item.datasetId],
+            collectionIds: [item.collectionId]
+          });
+          await MongoDatasetData.deleteMany({
+            teamId: item.teamId,
+            datasetId: item.datasetId,
+            collectionId: item.collectionId
+          });
+        });
       }
     } catch (error) {}

     if (++index % 100 === 0) {

View File

@@ -275,7 +275,8 @@ export async function updateData2Dataset({
 export const deleteDatasetData = async (data: DatasetDataItemType) => {
   await mongoSessionRun(async (session) => {
-    await MongoDatasetData.findByIdAndDelete(data.id, { session });
+    await MongoDatasetData.deleteOne({ _id: data.id }, { session });
+    await MongoDatasetDataText.deleteMany({ dataId: data.id }, { session });
     await deleteDatasetDataVector({
       teamId: data.teamId,
       idList: data.indexes.map((item) => item.dataId)

View File

@@ -154,7 +154,7 @@ export const deleteChannel = (id: number) => DELETE(`/channel/${id}`);
 export const getChannelLog = (params: {
   channel?: string;
   model_name?: string;
-  status?: 'all' | 'success' | 'error';
+  code_type?: 'all' | 'success' | 'error';
   start_timestamp: number;
   end_timestamp: number;
   offset: number;
@@ -164,11 +164,13 @@ export const getChannelLog = (params: {
   logs: ChannelLogListItemType[];
   total: number;
 }>(`/logs/search`, {
-    ...params,
+    channel: params.channel,
+    model_name: params.model_name,
+    code_type: params.code_type,
+    start_timestamp: params.start_timestamp,
+    end_timestamp: params.end_timestamp,
     p: Math.floor(params.offset / params.pageSize) + 1,
-    per_page: params.pageSize,
-    offset: undefined,
-    pageSize: undefined
+    per_page: params.pageSize
   }).then((res) => {
     return {
       list: res.logs,
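
After this change only whitelisted query fields are forwarded to /logs/search; spreading ...params previously leaked the client-side offset/pageSize keys and used the old status field name. A hypothetical call with the renamed code_type filter, assuming the remaining parameter and return fields (pageSize, total) follow the pattern visible above; all values are illustrative:

// Fetch the last 24h of error-level channel logs, first page of 20.
const { list, total } = await getChannelLog({
  code_type: 'error',
  start_timestamp: Date.now() - 24 * 60 * 60 * 1000,
  end_timestamp: Date.now(),
  offset: 0,
  pageSize: 20
});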