4.7.1-alpha (#1120)
Co-authored-by: heheer <71265218+newfish-cmyk@users.noreply.github.com>
@@ -1,22 +1,61 @@
import { setCron } from '@fastgpt/service/common/system/cron';
import { startTrainingQueue } from '@/service/core/dataset/training/utils';
import { clearTmpUploadFiles } from '@fastgpt/service/common/file/utils';
import { checkInvalidDatasetFiles, checkInvalidDatasetData, checkInvalidVector } from './cronTask';
import { checkTimerLock } from '@fastgpt/service/common/system/timerLock/utils';
import { TimerIdEnum } from '@fastgpt/service/common/system/timerLock/constants';
import { addHours } from 'date-fns';

const setTrainingQueueCron = () => {
  setCron('*/1 * * * *', () => {
    startTrainingQueue();
  });
};

const setClearTmpUploadFilesCron = () => {
  // Clear tmp upload files every ten minutes
  setCron('*/10 * * * *', () => {
    clearTmpUploadFiles();
  });
};

const clearInvalidDataCron = () => {
  setCron('0 */1 * * *', async () => {
    if (
      await checkTimerLock({
        timerId: TimerIdEnum.checkInValidDatasetFiles,
        lockMinuted: 59
      })
    ) {
      checkInvalidDatasetFiles(addHours(new Date(), 2), addHours(new Date(), 6));
    }
  });

  setCron('10 */1 * * *', async () => {
    if (
      await checkTimerLock({
        timerId: TimerIdEnum.checkInvalidDatasetData,
        lockMinuted: 59
      })
    ) {
      checkInvalidDatasetData(addHours(new Date(), 2), addHours(new Date(), 6));
    }
  });

  setCron('30 */1 * * *', async () => {
    if (
      await checkTimerLock({
        timerId: TimerIdEnum.checkInvalidVector,
        lockMinuted: 59
      })
    ) {
      checkInvalidVector(addHours(new Date(), 2), addHours(new Date(), 6));
    }
  });
};

export const startCron = () => {
  setTrainingQueueCron();
  setClearTmpUploadFilesCron();
  clearInvalidDataCron();
};
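The three cleanup sweeps are staggered at minutes 0, 10 and 30 of every hour, and each one first takes a 59-minute timer lock so that, when several app instances share one database, only one of them runs each sweep per hour. checkTimerLock itself lives in @fastgpt/service and is not shown in this diff; below is a minimal sketch of the idea, assuming a Mongo lock collection with a unique index on timerId (all names here are illustrative, not FastGPT's real schema):

```ts
import { MongoClient } from 'mongodb';

// Try to claim `timerId` for `lockMinuted` minutes; returns true for exactly
// one caller per window. Assumes a unique index on { timerId: 1 }.
async function tryTimerLock(client: MongoClient, timerId: string, lockMinuted: number) {
  const locks = client.db('fastgpt').collection('timerlocks');
  const now = new Date();
  try {
    // Matches only an expired lock; if no document exists, the upsert inserts one.
    await locks.findOneAndUpdate(
      { timerId, expiredTime: { $lte: now } },
      { $set: { timerId, expiredTime: new Date(now.getTime() + lockMinuted * 60000) } },
      { upsert: true }
    );
    return true;
  } catch (error) {
    // Duplicate-key error: another instance still holds an unexpired lock.
    return false;
  }
}
```

With lockMinuted: 59 and hourly cron schedules, each sweep therefore fires on exactly one instance per hour.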
projects/app/src/service/common/system/cronTask.ts (new file, 157 lines)
@@ -0,0 +1,157 @@
import {
  delFileByFileIdList,
  getGFSCollection
} from '@fastgpt/service/common/file/gridfs/controller';
import { addLog } from '@fastgpt/service/common/system/log';
import {
  deleteDatasetDataVector,
  getVectorDataByTime
} from '@fastgpt/service/common/vectorStore/controller';
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
import { MongoDatasetTraining } from '@fastgpt/service/core/dataset/training/schema';

/*
  Check dataset.files data. If a file has no match in dataset.collections, delete it.
  Possible failure case:
  1. A file was uploaded, but its collection was never successfully created.
*/
export async function checkInvalidDatasetFiles(start: Date, end: Date) {
  let deleteFileAmount = 0;
  const collection = getGFSCollection('dataset');
  const where = {
    uploadDate: { $gte: start, $lte: end }
  };

  // 1. get all file _id
  const files = await collection
    .find(where, {
      projection: {
        metadata: 1,
        _id: 1
      }
    })
    .toArray();
  addLog.info(`Clear invalid dataset files, total files: ${files.length}`);

  let index = 0;
  for await (const file of files) {
    try {
      // 2. find fileId in dataset.collections
      const hasCollection = await MongoDatasetCollection.countDocuments({
        teamId: file.metadata.teamId,
        fileId: file._id
      });

      // 3. if not found, delete file
      if (hasCollection === 0) {
        await delFileByFileIdList({ bucketName: 'dataset', fileIdList: [String(file._id)] });
        console.log('delete file', file._id);
        deleteFileAmount++;
      }
      index++;
      index % 100 === 0 && console.log(index);
    } catch (error) {
      console.log(error);
    }
  }
  addLog.info(`Clear invalid dataset files finish, remove ${deleteFileAmount} files`);
}
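One design note on the scan above: .toArray() materializes every matching file descriptor in memory before the loop starts. The MongoDB driver's find cursors are async-iterable, so for very large buckets the same loop could stream instead; a sketch (the `dataset.files` name follows GridFS's standard `<bucket>.files` convention):

```ts
import { MongoClient } from 'mongodb';

// Streaming variant of the same scan (sketch, not part of the diff).
async function scanDatasetFiles(client: MongoClient, start: Date, end: Date) {
  const cursor = client
    .db()
    .collection('dataset.files')
    .find({ uploadDate: { $gte: start, $lte: end } }, { projection: { metadata: 1, _id: 1 } });

  for await (const file of cursor) {
    // ...same per-file orphan check as in checkInvalidDatasetFiles
  }
}
```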
/*
  Detect invalid Mongo data.
  Failure case:
  1. A dataset can be deleted while training is still running, so new data
     keeps being inserted for collections that no longer exist.
*/
export async function checkInvalidDatasetData(start: Date, end: Date) {
  // 1. get all data rows in the time range
  const rows = await MongoDatasetData.find(
    {
      updateTime: {
        $gte: start,
        $lte: end
      }
    },
    '_id teamId collectionId'
  ).lean();

  // 2. deduplicate the collectionIds
  const map = new Map<string, { teamId: string; collectionId: string }>();
  for (const item of rows) {
    const collectionId = String(item.collectionId);
    if (!map.has(collectionId)) {
      map.set(collectionId, { teamId: item.teamId, collectionId });
    }
  }
  const list = Array.from(map.values());
  addLog.info(`Clear invalid dataset data, total collections: ${list.length}`);
  let index = 0;

  for await (const item of list) {
    try {
      // 3. check whether the collection still exists; if not, delete its data
      const collection = await MongoDatasetCollection.findOne({ _id: item.collectionId });
      if (!collection) {
        const result = await Promise.all([
          MongoDatasetTraining.deleteMany({
            teamId: item.teamId,
            collectionId: item.collectionId
          }),
          MongoDatasetData.deleteMany({
            teamId: item.teamId,
            collectionId: item.collectionId
          }),
          deleteDatasetDataVector({
            teamId: item.teamId,
            collectionIds: [String(item.collectionId)]
          })
        ]);
        console.log(result);
        console.log('collection is not found', item);
        continue;
      }
    } catch (error) {
      // Swallowed; the next hourly sweep will retry this collection.
    }
    console.log(++index);
  }
}
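The Map keyed by collectionId is how the sweep turns many data rows into a single existence check per collection, keeping the first teamId seen. The same idiom in isolation, on made-up data:

```ts
type Row = { teamId: string; collectionId: string };

const rows: Row[] = [
  { teamId: 't1', collectionId: 'c1' },
  { teamId: 't1', collectionId: 'c1' },
  { teamId: 't2', collectionId: 'c2' }
];

const map = new Map<string, Row>();
for (const r of rows) {
  if (!map.has(r.collectionId)) map.set(r.collectionId, r);
}
console.log(Array.from(map.values()).length); // 2
```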
export async function checkInvalidVector(start: Date, end: Date) {
  let deletedVectorAmount = 0;
  // 1. get all vector data
  const rows = await getVectorDataByTime(start, end);
  addLog.info(`Clear invalid vector, total vector data: ${rows.length}`);

  let index = 0;

  for await (const item of rows) {
    if (!item.teamId || !item.datasetId || !item.id) {
      addLog.error('error data', item);
      continue;
    }
    try {
      // 2. find dataset.data
      const hasData = await MongoDatasetData.countDocuments({
        teamId: item.teamId,
        datasetId: item.datasetId,
        'indexes.dataId': item.id
      });

      // 3. if not found, delete vector
      if (hasData === 0) {
        await deleteDatasetDataVector({
          teamId: item.teamId,
          id: item.id
        });
        console.log('delete vector data', item.id);
        deletedVectorAmount++;
      }

      index++;
      index % 100 === 0 && console.log(index);
    } catch (error) {
      console.log(error);
    }
  }

  addLog.info(`Clear invalid vector finish, remove ${deletedVectorAmount} data`);
}
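getVectorDataByTime comes from the vector-store controller and is not part of this diff. In a PostgreSQL/pgvector backend it amounts to a time-window SELECT over the embedding table; a hedged sketch, where the table and column names are assumptions for illustration rather than taken from this commit:

```ts
import { Pool } from 'pg';

// Hedged sketch of a time-window read like getVectorDataByTime; `modeldata`
// and its columns are assumed names, not confirmed by this diff.
async function getVectorRowsByTime(pool: Pool, start: Date, end: Date) {
  const { rows } = await pool.query(
    `SELECT id, team_id AS "teamId", dataset_id AS "datasetId"
       FROM modeldata
      WHERE createtime BETWEEN $1 AND $2`,
    [start, end]
  );
  return rows as { id: string; teamId: string; datasetId: string }[];
}
```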
@@ -44,8 +44,7 @@ export async function insertData2Dataset({
  indexes =
    Array.isArray(indexes) && indexes.length > 0
      ? indexes.map((index) => ({
          // @ts-ignore
          ...index.toObject(),
          text: index.text,
          dataId: undefined,
          defaultIndex: index.text.trim() === qaStr
        }))
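The visible part of this hunk rewrites how inherited indexes are cloned: each Mongoose subdocument is flattened with toObject(), its dataId is reset so the insert generates a fresh one, and defaultIndex is recomputed by comparing the index text against the chunk's full Q&A string (qaStr). The comparison on its own, with toy data:

```ts
// Toy illustration of the defaultIndex flag: an index is "default" when its
// text equals the chunk's full Q&A string. The strings below are made up.
const qaStr = 'What is FastGPT?\nAn LLM application platform.';
const indexes = [{ text: qaStr }, { text: 'An LLM application platform.' }];

const marked = indexes.map((index) => ({
  ...index,
  defaultIndex: index.text.trim() === qaStr
}));
console.log(marked.map((i) => i.defaultIndex)); // [ true, false ]
```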