Change embedding (#1463)

* rebuild embedding queue

* dataset menu

* feat: rebuild data api

* feat: ui change embedding model

* dataset ui

* feat: rebuild index ui

* rename collection
Author: Archer
Date: 2024-05-13 14:51:42 +08:00
Committed by: GitHub
Parent: 59fd94384d
Commit: 80a84a5733

37 changed files with 1260 additions and 419 deletions

View File

@@ -28,7 +28,7 @@ const clearInvalidDataCron = () => {
        lockMinuted: 59
      })
    ) {
-      checkInvalidDatasetFiles(addHours(new Date(), 2), addHours(new Date(), 6));
+      checkInvalidDatasetFiles(addHours(new Date(), -6), addHours(new Date(), -2));
    }
  });
@@ -39,7 +39,7 @@ const clearInvalidDataCron = () => {
        lockMinuted: 59
      })
    ) {
-      checkInvalidDatasetData(addHours(new Date(), 2), addHours(new Date(), 6));
+      checkInvalidDatasetData(addHours(new Date(), -6), addHours(new Date(), -2));
    }
  });
@@ -50,7 +50,7 @@ const clearInvalidDataCron = () => {
        lockMinuted: 59
      })
    ) {
-      checkInvalidVector(addHours(new Date(), 2), addHours(new Date(), 6));
+      checkInvalidVector(addHours(new Date(), -6), addHours(new Date(), -2));
    }
  });
};
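
Note: the three hunks above are the same one-line fix applied to three hourly cleanup crons. `addHours(new Date(), 2)` / `addHours(new Date(), 6)` described a window two to six hours in the future, which can never contain already-created records, so the invalid-data sweep matched nothing. The corrected arguments scan six to two hours into the past. A minimal sketch of the corrected window, assuming the checkers treat their two arguments as the (start, end) bounds of a createTime range:

import { addHours } from 'date-fns';

// Records younger than 2h may still be mid-import; records older than 6h
// were already covered by earlier runs of the hourly cron.
const start = addHours(new Date(), -6); // older edge of the window
const end = addHours(new Date(), -2); // newer edge of the window
await checkInvalidDatasetFiles(start, end);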

View File

@@ -11,6 +11,7 @@ import { deleteDatasetDataVector } from '@fastgpt/service/common/vectorStore/con
import { DatasetDataItemType } from '@fastgpt/global/core/dataset/type';
import { getVectorModel } from '@fastgpt/service/core/ai/model';
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
+import { ClientSession } from '@fastgpt/service/common/mongo';
/* insert data.
* 1. create data id
@@ -26,9 +27,11 @@ export async function insertData2Dataset({
  a = '',
  chunkIndex = 0,
  indexes,
-  model
+  model,
+  session
}: CreateDatasetDataProps & {
  model: string;
+  session?: ClientSession;
}) {
if (!q || !datasetId || !collectionId || !model) {
console.log(q, a, datasetId, collectionId, model);
@@ -70,20 +73,25 @@ export async function insertData2Dataset({
  );

  // create mongo data
-  const { _id } = await MongoDatasetData.create({
-    teamId,
-    tmbId,
-    datasetId,
-    collectionId,
-    q,
-    a,
-    fullTextToken: jiebaSplit({ text: qaStr }),
-    chunkIndex,
-    indexes: indexes?.map((item, i) => ({
-      ...item,
-      dataId: result[i].insertId
-    }))
-  });
+  const [{ _id }] = await MongoDatasetData.create(
+    [
+      {
+        teamId,
+        tmbId,
+        datasetId,
+        collectionId,
+        q,
+        a,
+        fullTextToken: jiebaSplit({ text: qaStr }),
+        chunkIndex,
+        indexes: indexes?.map((item, i) => ({
+          ...item,
+          dataId: result[i].insertId
+        }))
+      }
+    ],
+    { session }
+  );

  return {
    insertId: _id,
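
The switch from `Model.create({...})` to `Model.create([{...}], { session })` is not cosmetic: Mongoose only accepts an options object, and therefore a transaction session, when documents are passed as an array, and the array form resolves to an array of documents, hence the destructuring `const [{ _id }]`. A hedged usage sketch of the new signature, with placeholder ids and a model name that may differ per deployment:

// Sketch: running insertData2Dataset inside a transaction, assuming
// mongoSessionRun(fn) opens a session, runs fn(session), and commits or aborts.
const { insertId, tokens } = await mongoSessionRun(async (session) => {
  // Mongo writes join the transaction via `session`; the vector-store insert
  // inside insertData2Dataset does not, so treat vector writes as best-effort
  // until the transaction commits.
  return insertData2Dataset({
    teamId: '000000000000000000000001', // placeholder
    tmbId: '000000000000000000000002', // placeholder
    datasetId, // assumed in scope
    collectionId, // assumed in scope
    q: 'What is FastGPT?',
    a: 'An open-source LLM knowledge-base platform.',
    chunkIndex: 0,
    model: 'text-embedding-ada-002', // any configured vector model
    session
  });
});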

View File

@@ -46,7 +46,6 @@ export async function generateQA(): Promise<any> {
      )
        .select({
          _id: 1,
-          userId: 1,
          teamId: 1,
          tmbId: 1,
          datasetId: 1,

View File

@@ -6,6 +6,15 @@ import { checkTeamAiPointsAndLock } from './utils';
import { checkInvalidChunkAndLock } from '@fastgpt/service/core/dataset/training/utils';
import { addMinutes } from 'date-fns';
import { addLog } from '@fastgpt/service/common/system/log';
+import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
+import {
+  deleteDatasetDataVector,
+  insertDatasetDataVector
+} from '@fastgpt/service/common/vectorStore/controller';
+import { getVectorModel } from '@fastgpt/service/core/ai/model';
+import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
+import { DatasetTrainingSchemaType } from '@fastgpt/global/core/dataset/type';
+import { Document } from '@fastgpt/service/common/mongo';
const reduceQueue = () => {
global.vectorQueueLen = global.vectorQueueLen > 0 ? global.vectorQueueLen - 1 : 0;
@@ -23,7 +32,6 @@ export async function generateVector(): Promise<any> {
  // get training data
  const {
    data,
-    dataItem,
    done = false,
    error = false
  } = await (async () => {
@@ -38,7 +46,6 @@ export async function generateVector(): Promise<any> {
        }
      ).select({
        _id: 1,
-        userId: 1,
        teamId: 1,
        tmbId: 1,
        datasetId: 1,
@@ -46,6 +53,7 @@ export async function generateVector(): Promise<any> {
        q: 1,
        a: 1,
        chunkIndex: 1,
+        dataId: 1,
        indexes: 1,
        model: 1,
        billId: 1
@@ -58,12 +66,7 @@ export async function generateVector(): Promise<any> {
        };
      }
      return {
-        data,
-        dataItem: {
-          q: data.q,
-          a: data.a || '',
-          indexes: data.indexes
-        }
+        data
      };
    } catch (error) {
      addLog.error(`Get Training Data error`, error);
@@ -93,28 +96,14 @@ export async function generateVector(): Promise<any> {
  addLog.info(`[Vector Queue] Start`);

  // create vector and insert
  try {
-    // invalid data
-    if (!data.q.trim()) {
-      await data.deleteOne();
-      reduceQueue();
-      generateVector();
-      return;
-    }
-
-    // insert to dataset
-    const { tokens } = await insertData2Dataset({
-      teamId: data.teamId,
-      tmbId: data.tmbId,
-      datasetId: data.datasetId,
-      collectionId: data.collectionId,
-      q: dataItem.q,
-      a: dataItem.a,
-      chunkIndex: data.chunkIndex,
-      indexes: dataItem.indexes,
-      model: data.model
-    });
+    const { tokens } = await (async () => {
+      if (data.dataId) {
+        return rebuildData({ trainingData: data });
+      } else {
+        return insertData({ trainingData: data });
+      }
+    })();

    // push usage
    pushGenerateVectorUsage({
@@ -125,14 +114,12 @@ export async function generateVector(): Promise<any> {
      billId: data.billId
    });

-    // delete data from training
-    await data.deleteOne();
-    reduceQueue();
-    generateVector();
-
    addLog.info(`[Vector Queue] Finish`, {
      time: Date.now() - start
    });
+
+    reduceQueue();
+    generateVector();
  } catch (err: any) {
    reduceQueue();
@@ -145,3 +132,152 @@ export async function generateVector(): Promise<any> {
    }, 1000);
  }
}
+
+const rebuildData = async ({
+  trainingData
+}: {
+  trainingData: Document<unknown, {}, DatasetTrainingSchemaType> &
+    Omit<
+      DatasetTrainingSchemaType &
+        Required<{
+          _id: string;
+        }>,
+      never
+    >;
+}) => {
+  // find data
+  const mongoData = await MongoDatasetData.findById(
+    trainingData.dataId,
+    'indexes teamId datasetId collectionId'
+  );
+
+  if (!mongoData) {
+    await trainingData.deleteOne();
+    return Promise.reject('Not data');
+  }
+
+  const deleteVectorIdList = mongoData.indexes.map((index) => index.dataId);
+
+  const { tokens } = await mongoSessionRun(async (session) => {
+    // update vector, update dataset.data rebuilding status, delete data from training
+    const updateResult = await Promise.all(
+      mongoData.indexes.map(async (index, i) => {
+        const result = await insertDatasetDataVector({
+          query: index.text,
+          model: getVectorModel(trainingData.model),
+          teamId: mongoData.teamId,
+          datasetId: mongoData.datasetId,
+          collectionId: mongoData.collectionId
+        });
+        mongoData.indexes[i].dataId = result.insertId;
+        return result;
+      })
+    );
+
+    // Ensure that the training data is deleted after the Mongo update is successful
+    await mongoData.save({ session });
+    await trainingData.deleteOne({ session });
+
+    // delete old vector
+    await deleteDatasetDataVector({
+      teamId: mongoData.teamId,
+      idList: deleteVectorIdList
+    });
+
+    return {
+      tokens: updateResult.reduce((acc, cur) => acc + cur.tokens, 0)
+    };
+  });
+
+  // find next data insert to training queue
+  const arr = new Array(5).fill(0);
+
+  for await (const _ of arr) {
+    try {
+      const hasNextData = await mongoSessionRun(async (session) => {
+        // get new mongoData insert to training
+        const newRebuildingData = await MongoDatasetData.findOneAndUpdate(
+          {
+            teamId: mongoData.teamId,
+            datasetId: mongoData.datasetId,
+            rebuilding: true
+          },
+          {
+            $unset: {
+              rebuilding: null
+            },
+            updateTime: new Date()
+          },
+          { session }
+        ).select({
+          _id: 1,
+          collectionId: 1
+        });
+
+        if (newRebuildingData) {
+          await MongoDatasetTraining.create(
+            [
+              {
+                teamId: mongoData.teamId,
+                tmbId: trainingData.tmbId,
+                datasetId: mongoData.datasetId,
+                collectionId: newRebuildingData.collectionId,
+                billId: trainingData.billId,
+                mode: TrainingModeEnum.chunk,
+                model: trainingData.model,
+                q: '1',
+                dataId: newRebuildingData._id
+              }
+            ],
+            { session }
+          );
+        }
+
+        return !!newRebuildingData;
+      });
+
+      if (!hasNextData) {
+        break;
+      }
+    } catch (error) {}
+  }
+
+  return { tokens };
+};
+
+const insertData = async ({
+  trainingData
+}: {
+  trainingData: Document<unknown, {}, DatasetTrainingSchemaType> &
+    Omit<
+      DatasetTrainingSchemaType &
+        Required<{
+          _id: string;
+        }>,
+      never
+    >;
+}) => {
+  const { tokens } = await mongoSessionRun(async (session) => {
+    // insert new data to dataset
+    const { tokens } = await insertData2Dataset({
+      teamId: trainingData.teamId,
+      tmbId: trainingData.tmbId,
+      datasetId: trainingData.datasetId,
+      collectionId: trainingData.collectionId,
+      q: trainingData.q,
+      a: trainingData.a,
+      chunkIndex: trainingData.chunkIndex,
+      indexes: trainingData.indexes,
+      model: trainingData.model,
+      session
+    });

+    // delete data from training
+    await trainingData.deleteOne({ session });
+
+    return {
+      tokens
+    };
+  });
+
+  return { tokens };
+};
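
Taken together, rebuildData and insertData split the vector queue into two job kinds, selected by the presence of dataId on the training row: rows with dataId re-embed an existing dataset.data record under the newly selected model (new vectors are written first, index ids are swapped and the training row deleted inside one Mongo session, then the old vectors are removed), while rows without dataId are ordinary first-time inserts. The `q: '1'` on re-queued rebuild rows is a placeholder, since rebuild jobs read their real text from the stored indexes rather than from q, and each finished rebuild claims up to five more `rebuilding: true` rows, so the queue refills itself until the dataset is fully migrated. For context, a hypothetical sketch of the producer side such a consumer implies; the updateMany call and the MongoDataset model usage are assumptions, not part of this diff:

// Switching a dataset's embedding model: flag every data row for rebuild,
// then let the training queue drain the flags (hypothetical sketch).
await mongoSessionRun(async (session) => {
  await MongoDatasetData.updateMany(
    { teamId, datasetId },
    { $set: { rebuilding: true } },
    { session }
  );
  // The dataset's vectorModel field is assumed here; the queue worker reads
  // the model to embed with from each training row.
  await MongoDataset.findByIdAndUpdate(datasetId, { vectorModel: newModel }, { session });
});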

View File

@@ -1,10 +1,16 @@
import { jsonRes } from '@fastgpt/service/common/response';
-import type { NextApiResponse, NextApiHandler, NextApiRequest } from 'next';
+import type { NextApiResponse } from 'next';
import { connectToDatabase } from '../mongo';
import { withNextCors } from '@fastgpt/service/common/middle/cors';
+import { ApiRequestProps } from '@fastgpt/service/type/next';
+
+export type NextApiHandler<T = any> = (
+  req: ApiRequestProps,
+  res: NextApiResponse<T>
+) => unknown | Promise<unknown>;

export const NextAPI = (...args: NextApiHandler[]): NextApiHandler => {
-  return async function api(req: NextApiRequest, res: NextApiResponse) {
+  return async function api(req: ApiRequestProps, res: NextApiResponse) {
    try {
      await Promise.all([withNextCors(req, res), connectToDatabase()]);
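
The wrapper now replaces Next.js's own NextApiHandler with a local alias whose request type is ApiRequestProps, so every handler composed through NextAPI sees a typed request instead of the untyped NextApiRequest. A hedged usage sketch; treating the first generic parameter of ApiRequestProps as the request body type is an assumption inferred from its name, not shown in this diff:

import type { NextApiResponse } from 'next';

type CreateBody = { name: string }; // hypothetical request body

async function handler(req: ApiRequestProps<CreateBody>, res: NextApiResponse) {
  // req.body is typed as CreateBody rather than `any`
  jsonRes(res, { data: `hello ${req.body.name}` });
}

export default NextAPI(handler);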