mirror of
https://github.com/labring/FastGPT.git
synced 2025-08-02 20:58:12 +00:00
@@ -6,9 +6,11 @@ import {
|
||||
} from '@fastgpt/global/core/dataset/controller';
|
||||
import {
|
||||
insertDatasetDataVector,
|
||||
recallFromVectorStore
|
||||
recallFromVectorStore,
|
||||
updateDatasetDataVector
|
||||
} from '@fastgpt/service/common/vectorStore/controller';
|
||||
import {
|
||||
DatasetDataIndexTypeEnum,
|
||||
DatasetSearchModeEnum,
|
||||
DatasetSearchModeMap,
|
||||
SearchScoreTypeEnum
|
||||
@@ -20,7 +22,6 @@ import { deleteDatasetDataVector } from '@fastgpt/service/common/vectorStore/con
|
||||
import { getVectorsByText } from '@fastgpt/service/core/ai/embedding';
|
||||
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
|
||||
import {
|
||||
DatasetDataItemType,
|
||||
DatasetDataSchemaType,
|
||||
DatasetDataWithCollectionType,
|
||||
SearchDataResponseItemType
|
||||
@@ -34,7 +35,7 @@ import type {
|
||||
} from '@fastgpt/global/core/dataset/api.d';
|
||||
import { pushDataListToTrainingQueue } from '@fastgpt/service/core/dataset/training/controller';
|
||||
import { getVectorModel } from '../../ai/model';
|
||||
import { mongoSessionRun } from '@fastgpt/service/common/mongo/sessionRun';
|
||||
import { ModuleInputKeyEnum } from '@fastgpt/global/core/module/constants';
|
||||
|
||||
export async function pushDataToTrainingQueue(
|
||||
props: {
|
||||
@@ -77,7 +78,7 @@ export async function insertData2Dataset({
|
||||
return Promise.reject("teamId and tmbId can't be the same");
|
||||
}
|
||||
|
||||
const qaStr = getDefaultIndex({ q, a }).text;
|
||||
const qaStr = `${q}\n${a}`.trim();
|
||||
|
||||
// empty indexes check, if empty, create default index
|
||||
indexes =
|
||||
@@ -85,16 +86,10 @@ export async function insertData2Dataset({
|
||||
? indexes.map((index) => ({
|
||||
...index,
|
||||
dataId: undefined,
|
||||
defaultIndex: index.text.trim() === qaStr
|
||||
defaultIndex: indexes?.length === 1 && index.text === qaStr ? true : index.defaultIndex
|
||||
}))
|
||||
: [getDefaultIndex({ q, a })];
|
||||
|
||||
if (!indexes.find((index) => index.defaultIndex)) {
|
||||
indexes.unshift(getDefaultIndex({ q, a }));
|
||||
}
|
||||
|
||||
indexes = indexes.slice(0, 6);
|
||||
|
||||
// insert to vector store
|
||||
const result = await Promise.all(
|
||||
indexes.map((item) =>
|
||||
@@ -133,10 +128,8 @@ export async function insertData2Dataset({
|
||||
/**
|
||||
* update data
|
||||
* 1. compare indexes
|
||||
* 2. insert new pg data
|
||||
* session run:
|
||||
* 3. update mongo data(session run)
|
||||
* 4. delete old pg data
|
||||
* 2. update pg data
|
||||
* 3. update mongo data
|
||||
*/
|
||||
export async function updateData2Dataset({
|
||||
dataId,
|
||||
@@ -148,30 +141,31 @@ export async function updateData2Dataset({
|
||||
if (!Array.isArray(indexes)) {
|
||||
return Promise.reject('indexes is required');
|
||||
}
|
||||
const qaStr = getDefaultIndex({ q, a }).text;
|
||||
const qaStr = `${q}\n${a}`.trim();
|
||||
|
||||
// patch index and update pg
|
||||
const mongoData = await MongoDatasetData.findById(dataId);
|
||||
if (!mongoData) return Promise.reject('core.dataset.error.Data not found');
|
||||
|
||||
// remove defaultIndex
|
||||
let formatIndexes = indexes.map((index) => ({
|
||||
...index,
|
||||
text: index.text.trim(),
|
||||
defaultIndex: index.text.trim() === qaStr
|
||||
}));
|
||||
if (!formatIndexes.find((index) => index.defaultIndex)) {
|
||||
const defaultIndex = mongoData.indexes.find((index) => index.defaultIndex);
|
||||
formatIndexes.unshift(defaultIndex ? defaultIndex : getDefaultIndex({ q, a }));
|
||||
// make sure have one index
|
||||
if (indexes.length === 0) {
|
||||
const databaseDefaultIndex = mongoData.indexes.find((index) => index.defaultIndex);
|
||||
|
||||
indexes = [
|
||||
getDefaultIndex({
|
||||
q,
|
||||
a,
|
||||
dataId: databaseDefaultIndex ? String(databaseDefaultIndex.dataId) : undefined
|
||||
})
|
||||
];
|
||||
}
|
||||
formatIndexes = formatIndexes.slice(0, 6);
|
||||
|
||||
// patch indexes, create, update, delete
|
||||
const patchResult: PatchIndexesProps[] = [];
|
||||
|
||||
// find database indexes in new Indexes, if have not, delete it
|
||||
for (const item of mongoData.indexes) {
|
||||
const index = formatIndexes.find((index) => index.dataId === item.dataId);
|
||||
const index = indexes.find((index) => index.dataId === item.dataId);
|
||||
if (!index) {
|
||||
patchResult.push({
|
||||
type: 'delete',
|
||||
@@ -179,34 +173,35 @@ export async function updateData2Dataset({
|
||||
});
|
||||
}
|
||||
}
|
||||
for (const item of formatIndexes) {
|
||||
for (const item of indexes) {
|
||||
const index = mongoData.indexes.find((index) => index.dataId === item.dataId);
|
||||
// in database, update
|
||||
if (index) {
|
||||
// default index update
|
||||
if (index.defaultIndex && index.text !== qaStr) {
|
||||
patchResult.push({
|
||||
type: 'update',
|
||||
index: {
|
||||
//@ts-ignore
|
||||
...index.toObject(),
|
||||
text: qaStr
|
||||
}
|
||||
});
|
||||
continue;
|
||||
}
|
||||
// custom index update
|
||||
// manual update index
|
||||
if (index.text !== item.text) {
|
||||
patchResult.push({
|
||||
type: 'update',
|
||||
index: item
|
||||
});
|
||||
continue;
|
||||
} else if (index.defaultIndex && index.text !== qaStr) {
|
||||
// update default index
|
||||
patchResult.push({
|
||||
type: 'update',
|
||||
index: {
|
||||
...item,
|
||||
type:
|
||||
item.type === DatasetDataIndexTypeEnum.qa && !a
|
||||
? DatasetDataIndexTypeEnum.chunk
|
||||
: item.type,
|
||||
text: qaStr
|
||||
}
|
||||
});
|
||||
} else {
|
||||
patchResult.push({
|
||||
type: 'unChange',
|
||||
index: item
|
||||
});
|
||||
}
|
||||
patchResult.push({
|
||||
type: 'unChange',
|
||||
index: item
|
||||
});
|
||||
} else {
|
||||
// not in database, create
|
||||
patchResult.push({
|
||||
@@ -220,12 +215,10 @@ export async function updateData2Dataset({
|
||||
mongoData.updateTime = new Date();
|
||||
await mongoData.save();
|
||||
|
||||
// insert vector
|
||||
const clonePatchResult2Insert: PatchIndexesProps[] = JSON.parse(JSON.stringify(patchResult));
|
||||
const insertResult = await Promise.all(
|
||||
clonePatchResult2Insert.map(async (item) => {
|
||||
// insert new vector and update dateId
|
||||
if (item.type === 'create' || item.type === 'update') {
|
||||
// update vector
|
||||
const result = await Promise.all(
|
||||
patchResult.map(async (item) => {
|
||||
if (item.type === 'create') {
|
||||
const result = await insertDatasetDataVector({
|
||||
query: item.index.text,
|
||||
model: getVectorModel(model),
|
||||
@@ -236,54 +229,50 @@ export async function updateData2Dataset({
|
||||
item.index.dataId = result.insertId;
|
||||
return result;
|
||||
}
|
||||
if (item.type === 'update' && item.index.dataId) {
|
||||
const result = await updateDatasetDataVector({
|
||||
teamId: mongoData.teamId,
|
||||
datasetId: mongoData.datasetId,
|
||||
collectionId: mongoData.collectionId,
|
||||
id: item.index.dataId,
|
||||
query: item.index.text,
|
||||
model: getVectorModel(model)
|
||||
});
|
||||
item.index.dataId = result.insertId;
|
||||
|
||||
return result;
|
||||
}
|
||||
if (item.type === 'delete' && item.index.dataId) {
|
||||
await deleteDatasetDataVector({
|
||||
teamId: mongoData.teamId,
|
||||
id: item.index.dataId
|
||||
});
|
||||
return {
|
||||
charsLength: 0
|
||||
};
|
||||
}
|
||||
return {
|
||||
charsLength: 0
|
||||
};
|
||||
})
|
||||
);
|
||||
const charsLength = insertResult.reduce((acc, cur) => acc + cur.charsLength, 0);
|
||||
// console.log(clonePatchResult2Insert);
|
||||
await mongoSessionRun(async (session) => {
|
||||
// update mongo
|
||||
const newIndexes = clonePatchResult2Insert
|
||||
.filter((item) => item.type !== 'delete')
|
||||
.map((item) => item.index);
|
||||
// update mongo other data
|
||||
mongoData.q = q || mongoData.q;
|
||||
mongoData.a = a ?? mongoData.a;
|
||||
mongoData.fullTextToken = jiebaSplit({ text: mongoData.q + mongoData.a });
|
||||
// @ts-ignore
|
||||
mongoData.indexes = newIndexes;
|
||||
await mongoData.save({ session });
|
||||
|
||||
// delete vector
|
||||
const deleteIdList = patchResult
|
||||
.filter((item) => item.type === 'delete' || item.type === 'update')
|
||||
.map((item) => item.index.dataId)
|
||||
.filter(Boolean);
|
||||
if (deleteIdList.length > 0) {
|
||||
await deleteDatasetDataVector({
|
||||
teamId: mongoData.teamId,
|
||||
idList: deleteIdList as string[]
|
||||
});
|
||||
}
|
||||
});
|
||||
const charsLength = result.reduce((acc, cur) => acc + cur.charsLength, 0);
|
||||
const newIndexes = patchResult.filter((item) => item.type !== 'delete').map((item) => item.index);
|
||||
|
||||
// update mongo other data
|
||||
mongoData.q = q || mongoData.q;
|
||||
mongoData.a = a ?? mongoData.a;
|
||||
mongoData.fullTextToken = jiebaSplit({ text: mongoData.q + mongoData.a });
|
||||
// @ts-ignore
|
||||
mongoData.indexes = newIndexes;
|
||||
await mongoData.save();
|
||||
|
||||
return {
|
||||
charsLength
|
||||
};
|
||||
}
|
||||
|
||||
export const deleteDatasetData = async (data: DatasetDataItemType) => {
|
||||
await mongoSessionRun(async (session) => {
|
||||
await MongoDatasetData.findByIdAndDelete(data.id, { session });
|
||||
await deleteDatasetDataVector({
|
||||
teamId: data.teamId,
|
||||
idList: data.indexes.map((item) => item.dataId)
|
||||
});
|
||||
});
|
||||
};
|
||||
|
||||
type SearchDatasetDataProps = {
|
||||
teamId: string;
|
||||
model: string;
|
||||
@@ -388,7 +377,7 @@ export async function searchDatasetData(props: SearchDatasetDataProps) {
|
||||
a: data.a,
|
||||
chunkIndex: data.chunkIndex,
|
||||
datasetId: String(data.datasetId),
|
||||
collectionId: String(data.collectionId?._id),
|
||||
collectionId: String(data.collectionId._id),
|
||||
sourceName: data.collectionId.name || '',
|
||||
sourceId: data.collectionId?.fileId || data.collectionId?.rawLink,
|
||||
score: [{ type: SearchScoreTypeEnum.embedding, value: data.score, index }]
|
||||
@@ -492,7 +481,7 @@ export async function searchDatasetData(props: SearchDatasetDataProps) {
|
||||
}))
|
||||
});
|
||||
|
||||
if (results.length === 0) {
|
||||
if (!Array.isArray(results)) {
|
||||
usingReRank = false;
|
||||
return [];
|
||||
}
|
||||
|
Reference in New Issue
Block a user