feat: operation index (#5056)

* feat: operation index

* fix: delete update vector

* perf: Clear invalid data

* perf: index

* perf: cleare invalid data

* index
This commit is contained in:
Archer
2025-06-18 00:46:31 +08:00
committed by GitHub
parent 6060543222
commit 6b2ea696c5
8 changed files with 84 additions and 31 deletions

View File

@@ -2,16 +2,20 @@ import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@fastgpt/service/common/response';
import { authCert } from '@fastgpt/service/support/permission/auth/common';
import { addHours } from 'date-fns';
import { MongoImage } from '@fastgpt/service/common/file/image/schema';
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
import {
checkInvalidDatasetFiles,
checkInvalidDatasetData,
checkInvalidVector
} from '@/service/common/system/cronTask';
import dayjs from 'dayjs';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { NextAPI } from '@/service/middleware/entry';
import { useIPFrequencyLimit } from '@fastgpt/service/common/middle/reqFrequencyLimit';
import { MongoImage } from '@fastgpt/service/common/file/image/schema';
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
let deleteImageAmount = 0;
async function checkInvalidImg(start: Date, end: Date, limit = 50) {
async function checkInvalidImg(start: Date, end: Date) {
const images = await MongoImage.find(
{
createTime: {
@@ -52,8 +56,8 @@ async function checkInvalidImg(start: Date, end: Date, limit = 50) {
console.log(`检测完成,共删除 ${deleteImageAmount} 个无效图片`);
}
/* pg 中的数据搬到 mongo dataset.datas 中,并做映射 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
async function handler(req: NextApiRequest, res: NextApiResponse) {
deleteImageAmount = 0;
try {
await authCert({ req, authRoot: true });
const { start = -2, end = -360 * 24 } = req.body as { start: number; end: number };
@@ -61,13 +65,37 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
(async () => {
try {
console.log('执行脏数据清理任务');
// 360天 ~ 2小时前
const endTime = addHours(new Date(), start);
const startTime = addHours(new Date(), end);
await checkInvalidDatasetFiles(startTime, endTime);
await checkInvalidImg(startTime, endTime);
await checkInvalidDatasetData(startTime, endTime);
await checkInvalidVector(startTime, endTime);
// Split time range into 6-hour chunks to avoid processing too much data at once
const totalHours = Math.abs(start - end);
const chunkHours = 6;
const chunks = Math.ceil(totalHours / chunkHours);
console.log(
`Total time range: ${totalHours} hours, split into ${chunks} chunks of ${chunkHours} hours each`
);
for (let i = 0; i < chunks; i++) {
const chunkStart = start - i * chunkHours;
const chunkEnd = Math.max(start - (i + 1) * chunkHours, end);
const chunkEndTime = addHours(new Date(), chunkStart);
const chunkStartTime = addHours(new Date(), chunkEnd);
console.log(
`Processing chunk ${i + 1}/${chunks}: ${dayjs(chunkStartTime).format(
'YYYY-MM-DD HH:mm'
)} to ${dayjs(chunkEndTime).format('YYYY-MM-DD HH:mm')}`
);
await retryFn(() => checkInvalidDatasetFiles(chunkStartTime, chunkEndTime));
await retryFn(() => checkInvalidImg(chunkStartTime, chunkEndTime));
await retryFn(() => checkInvalidDatasetData(chunkStartTime, chunkEndTime));
await retryFn(() => checkInvalidVector(chunkStartTime, chunkEndTime));
console.log(`Chunk ${i + 1}/${chunks} completed`);
}
console.log('执行脏数据清理任务完毕');
} catch (error) {
console.log('执行脏数据清理任务出错了');
@@ -86,3 +114,5 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
});
}
}
export default NextAPI(useIPFrequencyLimit({ id: 'admin-api', seconds: 60, limit: 1 }), handler);

View File

@@ -145,16 +145,20 @@ export async function checkInvalidDatasetData(start: Date, end: Date) {
datasetId: item.datasetId,
collectionId: item.collectionId
});
await MongoDatasetDataText.deleteMany({
teamId: item.teamId,
datasetId: item.datasetId,
collectionId: item.collectionId
});
await deleteDatasetDataVector({
teamId: item.teamId,
datasetIds: [item.datasetId],
collectionIds: [item.collectionId]
});
await Promise.all([
MongoDatasetDataText.deleteMany({
teamId: item.teamId,
datasetId: item.datasetId,
collectionId: item.collectionId
}),
deleteDatasetDataVector({
teamId: item.teamId,
datasetIds: [item.datasetId],
collectionIds: [item.collectionId]
})
]);
await MongoDatasetData.deleteMany({
teamId: item.teamId,
datasetId: item.datasetId,

View File

@@ -318,6 +318,11 @@ export async function updateData2Dataset({
}
}
const deleteVectorIdList = patchResult
.filter((item) => item.type === 'delete' || item.type === 'update')
.map((item) => item.index.dataId)
.filter(Boolean) as string[];
// 4. Update mongo updateTime(便于脏数据检查器识别)
const updateTime = mongoData.updateTime;
mongoData.updateTime = new Date();
@@ -377,14 +382,10 @@ export async function updateData2Dataset({
);
// Delete vector
const deleteIdList = patchResult
.filter((item) => item.type === 'delete' || item.type === 'update')
.map((item) => item.index.dataId)
.filter(Boolean) as string[];
if (deleteIdList.length > 0) {
if (deleteVectorIdList.length > 0) {
await deleteDatasetDataVector({
teamId: mongoData.teamId,
idList: deleteIdList
idList: deleteVectorIdList
});
}
});