From cf0aaa10915b4da6b175481183e65066e8636c38 Mon Sep 17 00:00:00 2001 From: Archer <545436317@qq.com> Date: Fri, 28 Feb 2025 17:49:20 +0800 Subject: [PATCH] fix: invalid dataset data clear (#3927) * fix: collection list count * fix: collection list count * fix: invalid dataset data clear * update ts * perf: cron clear invalid data * perf: init * perf: clear invalid code * update init * perf: clear invalid code * perf: clear invalid code * perf: init count * batch init * batch init * batch init * batch init * add comment * perf: init * fix: api proxy type --- .../zh-cn/docs/development/upgrading/4823.md | 3 +- .../core/dataset/collection/controller.ts | 74 ++----- .../service/core/dataset/collection/utils.ts | 15 +- .../core/dataset/data/dataTextSchema.ts | 4 +- .../account/model/Channel/ModelTest.tsx | 1 - .../account/model/Log/index.tsx | 2 +- .../src/pages/api/admin/clearInvalidData.ts | 2 +- projects/app/src/pages/api/admin/initv4823.ts | 206 ++++++++++++++++++ .../collection/create/reTrainingCollection.ts | 13 +- .../api/core/dataset/collection/delete.ts | 3 +- .../app/src/service/common/system/cronTask.ts | 44 ++-- .../service/core/dataset/data/controller.ts | 3 +- projects/app/src/web/core/ai/channel.ts | 12 +- 13 files changed, 286 insertions(+), 96 deletions(-) create mode 100644 projects/app/src/pages/api/admin/initv4823.ts diff --git a/docSite/content/zh-cn/docs/development/upgrading/4823.md b/docSite/content/zh-cn/docs/development/upgrading/4823.md index 5dcc8fcf1..4faf0e140 100644 --- a/docSite/content/zh-cn/docs/development/upgrading/4823.md +++ b/docSite/content/zh-cn/docs/development/upgrading/4823.md @@ -27,4 +27,5 @@ weight: 802 1. 标签过滤时,子文件夹未成功过滤。 2. 暂时移除 md 阅读优化,避免链接分割错误。 3. 离开团队时,未刷新成员列表。 -4. PPTX 编码错误,导致解析失败。 \ No newline at end of file +4. PPTX 编码错误,导致解析失败。 +5. 删除知识库单条数据时,全文索引未跟随删除。 \ No newline at end of file diff --git a/packages/service/core/dataset/collection/controller.ts b/packages/service/core/dataset/collection/controller.ts index 1ad90015f..5aa63f4fe 100644 --- a/packages/service/core/dataset/collection/controller.ts +++ b/packages/service/core/dataset/collection/controller.ts @@ -227,7 +227,13 @@ export const delCollectionRelatedSource = async ({ collections, session }: { - collections: DatasetCollectionSchemaType[]; + collections: { + teamId: string; + fileId?: string; + metadata?: { + relatedImgId?: string; + }; + }[]; session: ClientSession; }) => { if (collections.length === 0) return; @@ -259,11 +265,13 @@ export const delCollectionRelatedSource = async ({ export async function delCollection({ collections, session, - delRelatedSource + delImg = true, + delFile = true }: { collections: DatasetCollectionSchemaType[]; session: ClientSession; - delRelatedSource: boolean; + delImg: boolean; + delFile: boolean; }) { if (collections.length === 0) return; @@ -281,9 +289,18 @@ export async function delCollection({ collectionId: { $in: collectionIds } }); - /* file and imgs */ - if (delRelatedSource) { - await delCollectionRelatedSource({ collections, session }); + if (delImg) { + await delImgByRelatedId({ + teamId, + relateIds: collections.map((item) => item?.metadata?.relatedImgId || '').filter(Boolean), + session + }); + } + if (delFile) { + await delFileByFileIdList({ + bucketName: BucketNameEnum.dataset, + fileIdList: collections.map((item) => item?.fileId || '').filter(Boolean) + }); } // Delete dataset_datas @@ -309,48 +326,3 @@ export async function delCollection({ // no session delete: delete files, vector data await deleteDatasetDataVector({ teamId, datasetIds, collectionIds }); } - -/** - * delete delOnlyCollection - */ -export async function delOnlyCollection({ - collections, - session -}: { - collections: DatasetCollectionSchemaType[]; - session: ClientSession; -}) { - if (collections.length === 0) return; - - const teamId = collections[0].teamId; - - if (!teamId) return Promise.reject('teamId is not exist'); - - const datasetIds = Array.from(new Set(collections.map((item) => String(item.datasetId)))); - const collectionIds = collections.map((item) => String(item._id)); - - // delete training data - await MongoDatasetTraining.deleteMany({ - teamId, - datasetId: { $in: datasetIds }, - collectionId: { $in: collectionIds } - }); - - // delete dataset.datas - await MongoDatasetData.deleteMany( - { teamId, datasetId: { $in: datasetIds }, collectionId: { $in: collectionIds } }, - { session } - ); - - // delete collections - await MongoDatasetCollection.deleteMany( - { - teamId, - _id: { $in: collectionIds } - }, - { session } - ); - - // no session delete: delete files, vector data - await deleteDatasetDataVector({ teamId, datasetIds, collectionIds }); -} diff --git a/packages/service/core/dataset/collection/utils.ts b/packages/service/core/dataset/collection/utils.ts index 9bf9a2262..e1a9e4632 100644 --- a/packages/service/core/dataset/collection/utils.ts +++ b/packages/service/core/dataset/collection/utils.ts @@ -174,6 +174,14 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => { } await mongoSessionRun(async (session) => { + // Delete old collection + await delCollection({ + collections: [collection], + delImg: false, + delFile: false, + session + }); + // Create new collection await createCollectionAndInsertData({ session, @@ -208,13 +216,6 @@ export const syncCollection = async (collection: CollectionWithDatasetType) => { updateTime: new Date() } }); - - // Delete old collection - await delCollection({ - collections: [collection], - delRelatedSource: false, - session - }); }); return DatasetCollectionSyncResultEnum.success; diff --git a/packages/service/core/dataset/data/dataTextSchema.ts b/packages/service/core/dataset/data/dataTextSchema.ts index da06a80a4..8bf1ba5d8 100644 --- a/packages/service/core/dataset/data/dataTextSchema.ts +++ b/packages/service/core/dataset/data/dataTextSchema.ts @@ -1,6 +1,6 @@ import { connectionMongo, getMongoModel } from '../../../common/mongo'; const { Schema } = connectionMongo; -import { DatasetDataSchemaType } from '@fastgpt/global/core/dataset/type.d'; +import { DatasetDataTextSchemaType } from '@fastgpt/global/core/dataset/type.d'; import { TeamCollectionName } from '@fastgpt/global/support/user/team/constant'; import { DatasetCollectionName } from '../schema'; import { DatasetColCollectionName } from '../collection/schema'; @@ -45,7 +45,7 @@ try { console.log(error); } -export const MongoDatasetDataText = getMongoModel( +export const MongoDatasetDataText = getMongoModel( DatasetDataTextCollectionName, DatasetDataTextSchema ); diff --git a/projects/app/src/pageComponents/account/model/Channel/ModelTest.tsx b/projects/app/src/pageComponents/account/model/Channel/ModelTest.tsx index fc7db5a30..2d61ca554 100644 --- a/projects/app/src/pageComponents/account/model/Channel/ModelTest.tsx +++ b/projects/app/src/pageComponents/account/model/Channel/ModelTest.tsx @@ -135,7 +135,6 @@ const ModelTest = ({ models, onClose }: { models: string[]; onClose: () => void } ); - console.log(testModelList); return ( { }, ...res ]; - }, [systemModelList]); + }, [systemModelList, t]); const { data, isLoading, ScrollData } = useScrollPagination(getChannelLog, { pageSize: 20, diff --git a/projects/app/src/pages/api/admin/clearInvalidData.ts b/projects/app/src/pages/api/admin/clearInvalidData.ts index d3f85a6a7..6259a01d9 100644 --- a/projects/app/src/pages/api/admin/clearInvalidData.ts +++ b/projects/app/src/pages/api/admin/clearInvalidData.ts @@ -35,7 +35,7 @@ async function checkInvalidImg(start: Date, end: Date, limit = 50) { 'metadata.relatedImgId': image.metadata?.relatedId }, '_id' - ); + ).lean(); if (!collection) { await image.deleteOne(); diff --git a/projects/app/src/pages/api/admin/initv4823.ts b/projects/app/src/pages/api/admin/initv4823.ts new file mode 100644 index 000000000..0b163406e --- /dev/null +++ b/projects/app/src/pages/api/admin/initv4823.ts @@ -0,0 +1,206 @@ +import type { NextApiRequest, NextApiResponse } from 'next'; +import { jsonRes } from '@fastgpt/service/common/response'; +import { connectToDatabase } from '@/service/mongo'; +import { authCert } from '@fastgpt/service/support/permission/auth/common'; +import { addHours } from 'date-fns'; +import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema'; +import { MongoDataset } from '@fastgpt/service/core/dataset/schema'; +import { delay, retryFn } from '@fastgpt/global/common/system/utils'; +import { delCollection } from '@fastgpt/service/core/dataset/collection/controller'; +import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun'; +import { MongoDatasetDataText } from '@fastgpt/service/core/dataset/data/dataTextSchema'; +import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema'; +import { DatasetCollectionSchemaType } from '@fastgpt/global/core/dataset/type'; +import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema'; +import { deleteDatasetDataVector } from '@fastgpt/service/common/vectorStore/controller'; + +// 删了库,没删集合 +const checkInvalidCollection = async () => { + const batchSize = 1000; + + let skip = 0; + let success = 0; + while (true) { + try { + const collections = await MongoDatasetCollection.find( + {}, + '_id teamId datasetId fileId metadata' + ) + .limit(batchSize) + .skip(skip) + .lean(); + if (collections.length === 0) break; + + const datasetMap: Record = {}; + + // 相同 datasetId 的集合放到一起 + for await (const collection of collections) { + const datasetId = String(collection.datasetId); + const val = datasetMap[datasetId]; + if (val) { + val.push(collection); + } else { + datasetMap[datasetId] = [collection]; + } + } + + const datasetIds = Object.keys(datasetMap); + for await (const datasetId of datasetIds) { + try { + const val = datasetMap[datasetId]; + if (!val) { + continue; + } + + await retryFn(async () => { + const datasetExists = await MongoDataset.findById(datasetId, '_id').lean(); + if (!datasetExists) { + console.log('清理无效的知识库集合, datasetId', datasetId); + await mongoSessionRun(async (session) => { + return await delCollection({ + collections: val, + delImg: true, + delFile: true, + session + }); + }); + } + }); + } catch (error) { + console.log(error); + } + } + + success += batchSize; + skip += batchSize; + console.log(`检测集合完成:${success}`); + } catch (error) { + console.log(error); + await delay(1000); + } + } +}; + +// 删了集合,没删 data +const checkInvalidData = async () => { + try { + const datas = (await MongoDatasetData.aggregate([ + { + $group: { + _id: '$collectionId', + teamId: { $first: '$teamId' }, + datasetId: { $first: '$datasetId' }, + collectionId: { $first: '$collectionId' } + } + } + ])) as { + _id: string; + teamId: string; + datasetId: string; + collectionId: string; + }[]; + console.log('Total data collections length', datas.length); + // 批量获取集合 + const collections = await MongoDatasetCollection.find({}, '_id').lean(); + console.log('Total collection length', collections.length); + const collectionMap: Record = {}; + for await (const collection of collections) { + collectionMap[collection._id] = collection; + } + // 逐一删除无效的集合内容 + for await (const data of datas) { + try { + const col = collectionMap[data.collectionId]; + if (!col) { + console.log('清理无效的知识库集合内容, collectionId', data.collectionId); + await retryFn(async () => { + await MongoDatasetTraining.deleteMany({ + teamId: data.teamId, + datasetId: data.datasetId, + collectionId: data.collectionId + }); + await MongoDatasetDataText.deleteMany({ + teamId: data.teamId, + datasetId: data.datasetId, + collectionId: data.collectionId + }); + await deleteDatasetDataVector({ + teamId: data.teamId, + datasetIds: [data.datasetId], + collectionIds: [data.collectionId] + }); + await MongoDatasetData.deleteMany({ + teamId: data.teamId, + datasetId: data.datasetId, + collectionId: data.collectionId + }); + }); + } + } catch (error) { + console.log(error); + } + } + + console.log(`检测集合完成`); + } catch (error) { + console.log(error); + } +}; + +// 删了data,没删 data_text +const checkInvalidDataText = async () => { + try { + // 获取所有索引层的 dataId + const dataTexts = await MongoDatasetDataText.find({}, 'dataId').lean(); + const dataIds = dataTexts.map((item) => String(item.dataId)); + console.log('Total data_text dataIds:', dataIds.length); + + // 获取数据层的 dataId + const datas = await MongoDatasetData.find({}, '_id').lean(); + const datasSet = new Set(datas.map((item) => String(item._id))); + console.log('Total data length:', datas.length); + + // 存在索引层,不存在数据层的 dataId,说明数据已经被删了 + const unExistsSet = dataIds.filter((id) => !datasSet.has(id)); + console.log('Total unExists dataIds:', unExistsSet.length); + await MongoDatasetDataText.deleteMany({ + dataId: { $in: unExistsSet } + }); + } catch (error) {} +}; + +/* pg 中的数据搬到 mongo dataset.datas 中,并做映射 */ +export default async function handler(req: NextApiRequest, res: NextApiResponse) { + try { + await connectToDatabase(); + await authCert({ req, authRoot: true }); + const { start = -2, end = -360 * 24 } = req.body as { start: number; end: number }; + + (async () => { + try { + // 360天 ~ 2小时前 + const endTime = addHours(new Date(), start); + const startTime = addHours(new Date(), end); + console.log('清理无效的集合'); + await checkInvalidCollection(); + console.log('清理无效的数据'); + await checkInvalidData(); + console.log('清理无效的data_text'); + await checkInvalidDataText(); + } catch (error) { + console.log('执行脏数据清理任务出错了'); + } + })(); + + jsonRes(res, { + message: 'success' + }); + } catch (error) { + console.log(error); + + jsonRes(res, { + code: 500, + error + }); + } +} diff --git a/projects/app/src/pages/api/core/dataset/collection/create/reTrainingCollection.ts b/projects/app/src/pages/api/core/dataset/collection/create/reTrainingCollection.ts index 6d0b1a41f..48cccdd26 100644 --- a/projects/app/src/pages/api/core/dataset/collection/create/reTrainingCollection.ts +++ b/projects/app/src/pages/api/core/dataset/collection/create/reTrainingCollection.ts @@ -10,7 +10,7 @@ import { hashStr } from '@fastgpt/global/common/string/tools'; import { readDatasetSourceRawText } from '@fastgpt/service/core/dataset/read'; import { NextAPI } from '@/service/middleware/entry'; import { ApiRequestProps } from '@fastgpt/service/type/next'; -import { delOnlyCollection } from '@fastgpt/service/core/dataset/collection/controller'; +import { delCollection } from '@fastgpt/service/core/dataset/collection/controller'; import { authDatasetCollection } from '@fastgpt/service/support/permission/dataset/auth'; import { CommonErrEnum } from '@fastgpt/global/common/error/code/common'; import { ReadPermissionVal } from '@fastgpt/global/support/permission/constant'; @@ -89,6 +89,13 @@ async function handler( }); return mongoSessionRun(async (session) => { + await delCollection({ + collections: [collection], + session, + delImg: false, + delFile: false + }); + const { collectionId } = await createCollectionAndInsertData({ dataset: collection.dataset, rawText, @@ -121,10 +128,6 @@ async function handler( metadata: collection.metadata } }); - await delOnlyCollection({ - collections: [collection], - session - }); return { collectionId }; }); diff --git a/projects/app/src/pages/api/core/dataset/collection/delete.ts b/projects/app/src/pages/api/core/dataset/collection/delete.ts index 6b84213e4..c52006820 100644 --- a/projects/app/src/pages/api/core/dataset/collection/delete.ts +++ b/projects/app/src/pages/api/core/dataset/collection/delete.ts @@ -34,7 +34,8 @@ async function handler(req: NextApiRequest) { await mongoSessionRun((session) => delCollection({ collections, - delRelatedSource: true, + delImg: true, + delFile: true, session }) ); diff --git a/projects/app/src/service/common/system/cronTask.ts b/projects/app/src/service/common/system/cronTask.ts index d12dd1d7b..bba0b348b 100644 --- a/projects/app/src/service/common/system/cronTask.ts +++ b/projects/app/src/service/common/system/cronTask.ts @@ -1,15 +1,16 @@ import { BucketNameEnum } from '@fastgpt/global/common/file/constants'; +import { retryFn } from '@fastgpt/global/common/system/utils'; import { delFileByFileIdList, getGFSCollection } from '@fastgpt/service/common/file/gridfs/controller'; -import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun'; import { addLog } from '@fastgpt/service/common/system/log'; import { deleteDatasetDataVector, getVectorDataByTime } from '@fastgpt/service/common/vectorStore/controller'; import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema'; +import { MongoDatasetDataText } from '@fastgpt/service/core/dataset/data/dataTextSchema'; import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema'; import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema'; import { addDays } from 'date-fns'; @@ -129,32 +130,35 @@ export async function checkInvalidDatasetData(start: Date, end: Date) { for await (const item of list) { try { // 3. 查看该collection是否存在,不存在,则删除对应的数据 - const collection = await MongoDatasetCollection.findOne({ _id: item.collectionId }); + const collection = await MongoDatasetCollection.findOne( + { _id: item.collectionId }, + '_id' + ).lean(); if (!collection) { - await mongoSessionRun(async (session) => { - await MongoDatasetTraining.deleteMany( - { - teamId: item.teamId, - collectionId: item.collectionId - }, - { session } - ); - await MongoDatasetData.deleteMany( - { - teamId: item.teamId, - collectionId: item.collectionId - }, - { session } - ); + console.log('collection is not found', item); + + await retryFn(async () => { + await MongoDatasetTraining.deleteMany({ + teamId: item.teamId, + datasetId: item.datasetId, + collectionId: item.collectionId + }); + await MongoDatasetDataText.deleteMany({ + teamId: item.teamId, + datasetId: item.datasetId, + collectionId: item.collectionId + }); await deleteDatasetDataVector({ teamId: item.teamId, datasetIds: [item.datasetId], collectionIds: [item.collectionId] }); + await MongoDatasetData.deleteMany({ + teamId: item.teamId, + datasetId: item.datasetId, + collectionId: item.collectionId + }); }); - - console.log('collection is not found', item); - continue; } } catch (error) {} if (++index % 100 === 0) { diff --git a/projects/app/src/service/core/dataset/data/controller.ts b/projects/app/src/service/core/dataset/data/controller.ts index aec06b2e7..f6da24288 100644 --- a/projects/app/src/service/core/dataset/data/controller.ts +++ b/projects/app/src/service/core/dataset/data/controller.ts @@ -275,7 +275,8 @@ export async function updateData2Dataset({ export const deleteDatasetData = async (data: DatasetDataItemType) => { await mongoSessionRun(async (session) => { - await MongoDatasetData.findByIdAndDelete(data.id, { session }); + await MongoDatasetData.deleteOne({ _id: data.id }, { session }); + await MongoDatasetDataText.deleteMany({ dataId: data.id }, { session }); await deleteDatasetDataVector({ teamId: data.teamId, idList: data.indexes.map((item) => item.dataId) diff --git a/projects/app/src/web/core/ai/channel.ts b/projects/app/src/web/core/ai/channel.ts index f199d54df..7169b9106 100644 --- a/projects/app/src/web/core/ai/channel.ts +++ b/projects/app/src/web/core/ai/channel.ts @@ -154,7 +154,7 @@ export const deleteChannel = (id: number) => DELETE(`/channel/${id}`); export const getChannelLog = (params: { channel?: string; model_name?: string; - status?: 'all' | 'success' | 'error'; + code_type?: 'all' | 'success' | 'error'; start_timestamp: number; end_timestamp: number; offset: number; @@ -164,11 +164,13 @@ export const getChannelLog = (params: { logs: ChannelLogListItemType[]; total: number; }>(`/logs/search`, { - ...params, + channel: params.channel, + model_name: params.model_name, + code_type: params.code_type, + start_timestamp: params.start_timestamp, + end_timestamp: params.end_timestamp, p: Math.floor(params.offset / params.pageSize) + 1, - per_page: params.pageSize, - offset: undefined, - pageSize: undefined + per_page: params.pageSize }).then((res) => { return { list: res.logs,