This commit is contained in:
Archer
2023-11-15 11:36:25 +08:00
committed by GitHub
parent 592e1a93a2
commit bfd8be5df0
181 changed files with 2499 additions and 1552 deletions

View File

@@ -1,106 +1,19 @@
import { PgDatasetTableName } from '@fastgpt/global/core/dataset/constant';
import { getVectorsByText } from '@/service/core/ai/vector';
import { PgClient } from '@fastgpt/service/common/pg';
import { delay } from '@/utils/tools';
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
import {
DatasetDataItemType,
PgDataItemType,
PgRawDataItemType
} from '@fastgpt/global/core/dataset/type';
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
import {
CreateDatasetDataProps,
PatchIndexesProps,
UpdateDatasetDataProps
} from '@fastgpt/global/core/dataset/controller';
import { deletePgDataById, insertData2Pg, updatePgDataById } from './pg';
import { Types } from 'mongoose';
import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/constant';
import { getDefaultIndex } from '@fastgpt/global/core/dataset/utils';
export async function formatPgRawData(data: PgRawDataItemType) {
return {
id: data.id,
q: data.q,
a: data.a,
teamId: data.team_id,
tmbId: data.tmb_id,
datasetId: data.dataset_id,
collectionId: data.collection_id
};
}
/* get */
export async function getDatasetPgData({ id }: { id: string }): Promise<PgDataItemType> {
const { rows } = await PgClient.select<PgRawDataItemType>(PgDatasetTableName, {
fields: ['id', 'q', 'a', 'team_id', 'tmb_id', 'dataset_id', 'collection_id'],
where: [['id', id]],
limit: 1
});
const row = rows[0];
if (!row) return Promise.reject('Data not found');
return formatPgRawData(row);
}
export async function getPgDataWithCollection({
pgDataList
}: {
pgDataList: PgRawDataItemType[];
}): Promise<DatasetDataItemType[]> {
const collections = await MongoDatasetCollection.find(
{
_id: { $in: pgDataList.map((item) => item.collection_id) }
},
'_id name datasetId metadata'
).lean();
return pgDataList.map((item) => {
const collection = collections.find(
(collection) => String(collection._id) === item.collection_id
);
return {
id: item.id,
q: item.q,
a: item.a,
datasetId: collection?.datasetId || '',
collectionId: item.collection_id,
sourceName: collection?.name || '',
sourceId: collection?.metadata?.fileId || collection?.metadata?.rawLink
};
});
}
type Props = {
q: string;
a?: string;
model: string;
};
/**
* update q or a
*/
/* insert data.
* 1. create data id
* 2. insert pg
* 3. create mongo data
*/
export async function updateData2Dataset({ dataId, q, a = '', model }: Props & { dataId: string }) {
const { vectors = [], tokenLen = 0 } = await (async () => {
if (q) {
return getVectorsByText({
input: [q],
model
});
}
return { vectors: [[]], tokenLen: 0 };
})();
await PgClient.update(PgDatasetTableName, {
where: [['id', dataId]],
values: [
{ key: 'a', value: a.replace(/'/g, '"') },
...(q
? [
{ key: 'q', value: q.replace(/'/g, '"') },
{ key: 'vector', value: `[${vectors[0]}]` }
]
: [])
]
});
return {
vectors,
tokenLen
};
}
/* insert data to pg */
export async function insertData2Dataset({
teamId,
tmbId,
@@ -108,61 +21,215 @@ export async function insertData2Dataset({
collectionId,
q,
a = '',
indexes,
model
}: Props & {
teamId: string;
tmbId: string;
datasetId: string;
collectionId: string;
}: CreateDatasetDataProps & {
model: string;
}) {
if (!q || !datasetId || !collectionId || !model) {
return Promise.reject('q, datasetId, collectionId, model is required');
}
const { vectors, tokenLen } = await getVectorsByText({
model,
input: [q]
});
q = q.trim();
a = a.trim();
const id = new Types.ObjectId();
const qaStr = `${q}\n${a}`.trim();
// if no indexes are provided, create a default index; otherwise strip dataId and flag the default index
indexes =
Array.isArray(indexes) && indexes.length > 0
? indexes.map((index) => ({
...index,
dataId: undefined,
defaultIndex: indexes?.length === 1 && index.text === qaStr ? true : index.defaultIndex
}))
: [getDefaultIndex({ q, a })];
// insert to pg
const result = await Promise.all(
indexes.map((item) =>
insertData2Pg({
mongoDataId: String(id),
input: item.text,
model,
teamId,
tmbId,
datasetId,
collectionId
})
)
);
// create mongo
const { _id } = await MongoDatasetData.create({
_id: id,
teamId,
tmbId,
datasetId,
collectionId,
q,
a,
indexes: indexes.map((item, i) => ({
...item,
dataId: result[i].insertId
}))
});
let retry = 2;
async function insertPg(): Promise<string> {
try {
const { rows } = await PgClient.insert(PgDatasetTableName, {
values: [
[
{ key: 'vector', value: `[${vectors[0]}]` },
{ key: 'team_id', value: String(teamId) },
{ key: 'tmb_id', value: String(tmbId) },
{ key: 'q', value: q },
{ key: 'a', value: a },
{ key: 'dataset_id', value: datasetId },
{ key: 'collection_id', value: collectionId }
]
]
});
return rows[0].id;
} catch (error) {
if (--retry < 0) {
return Promise.reject(error);
}
await delay(500);
return insertPg();
}
}
const insertId = await insertPg();
return {
insertId,
tokenLen,
vectors
insertId: _id,
tokenLen: result.reduce((acc, cur) => acc + cur.tokenLen, 0)
};
}
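For orientation, a minimal usage sketch (not part of this commit) of how the reworked insertData2Dataset could be called; the import path, ids, and model name are placeholders:

import { insertData2Dataset } from '@/service/core/dataset/data/controller'; // assumed path

async function insertExample() {
  // With no custom indexes supplied, getDefaultIndex({ q, a }) builds the default index.
  const { insertId, tokenLen } = await insertData2Dataset({
    teamId: 'team_xxx', // placeholder ids
    tmbId: 'tmb_xxx',
    datasetId: 'dataset_xxx',
    collectionId: 'collection_xxx',
    q: 'What is FastGPT?',
    a: 'FastGPT is a knowledge-base QA platform.',
    model: 'text-embedding-ada-002' // placeholder embedding model
  });
  // insertId is the new mongo _id; tokenLen sums the embedding tokens across all indexes.
  return { insertId, tokenLen };
}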
/**
* delete data by collectionIds
*/
/**
* update data
* 1. compare indexes
* 2. update pg data
* 3. update mongo data
*/
export async function updateData2Dataset({
dataId,
q,
a,
indexes,
model
}: UpdateDatasetDataProps & { model: string }) {
if (!Array.isArray(indexes)) {
return Promise.reject('indexes is required');
}
const qaStr = `${q}\n${a}`.trim();
// patch index and update pg
const mongoData = await MongoDatasetData.findById(dataId);
if (!mongoData) return Promise.reject('Data not found');
// make sure there is at least one index
if (indexes.length === 0) {
const databaseDefaultIndex = mongoData.indexes.find((index) => index.defaultIndex);
indexes = [
getDefaultIndex({
q,
a,
dataId: databaseDefaultIndex ? String(databaseDefaultIndex.dataId) : undefined
})
];
}
// patch indexes, create, update, delete
const patchResult: PatchIndexesProps[] = [];
// delete database indexes that no longer appear in the new indexes
for (const item of mongoData.indexes) {
const index = indexes.find((index) => index.dataId === item.dataId);
if (!index) {
patchResult.push({
type: 'delete',
index: item
});
}
}
for (const item of indexes) {
const index = mongoData.indexes.find((index) => index.dataId === item.dataId);
// in database, update
if (index) {
// manual update index
if (index.text !== item.text) {
patchResult.push({
type: 'update',
index: item
});
} else if (index.defaultIndex && index.text !== qaStr) {
// update default index
patchResult.push({
type: 'update',
index: {
...item,
type:
item.type === DatasetDataIndexTypeEnum.qa && !a
? DatasetDataIndexTypeEnum.chunk
: item.type,
text: qaStr
}
});
}
} else {
// not in database, create
patchResult.push({
type: 'create',
index: item
});
}
}
const result = await Promise.all(
patchResult.map(async (item) => {
if (item.type === 'create') {
const result = await insertData2Pg({
mongoDataId: dataId,
input: item.index.text,
model,
teamId: mongoData.teamId,
tmbId: mongoData.tmbId,
datasetId: mongoData.datasetId,
collectionId: mongoData.collectionId
});
item.index.dataId = result.insertId;
return result;
}
if (item.type === 'update' && item.index.dataId) {
return updatePgDataById({
id: item.index.dataId,
input: item.index.text,
model
});
}
if (item.type === 'delete' && item.index.dataId) {
return deletePgDataById(['id', item.index.dataId]);
}
return {
tokenLen: 0
};
})
);
const tokenLen = result.reduce((acc, cur) => acc + cur.tokenLen, 0);
// update mongo
mongoData.q = q || mongoData.q;
mongoData.a = a ?? mongoData.a;
// @ts-ignore
mongoData.indexes = indexes;
await mongoData.save();
return {
tokenLen
};
}
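A hedged sketch of how the index diffing in updateData2Dataset behaves for a caller; the ids, the 'chunk' type value, and the texts below are illustrative, and the import path is assumed:

import { updateData2Dataset } from '@/service/core/dataset/data/controller'; // assumed path

async function updateExample() {
  const { tokenLen } = await updateData2Dataset({
    dataId: '655b0000c0ffee0000000001', // placeholder mongo _id
    q: 'What is FastGPT?',
    a: 'An updated answer.',
    model: 'text-embedding-ada-002', // placeholder embedding model
    indexes: [
      // existing dataId with changed text -> patched as 'update' (re-embedded in pg)
      { dataId: 'pg_row_1', defaultIndex: true, type: 'chunk', text: 'What is FastGPT?\nAn updated answer.' },
      // no dataId -> patched as 'create' (a new pg row embedded and linked to this data)
      { type: 'chunk', text: 'FastGPT knowledge-base QA' }
      // any index stored in mongo but omitted here -> patched as 'delete'
    ]
  });
  return tokenLen; // total embedding tokens spent on created/updated indexes
}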
/* delete all data by datasetIds */
export async function delDataByDatasetId({ datasetIds }: { datasetIds: string[] }) {
datasetIds = datasetIds.map((item) => String(item));
// delete pg data
await deletePgDataById(`dataset_id IN ('${datasetIds.join("','")}')`);
// delete dataset.datas
await MongoDatasetData.deleteMany({ datasetId: { $in: datasetIds } });
}
/**
* delete all data by collectionIds
*/
export async function delDataByCollectionId({ collectionIds }: { collectionIds: string[] }) {
const ids = collectionIds.map((item) => String(item));
return PgClient.delete(PgDatasetTableName, {
where: [`collection_id IN ('${ids.join("','")}')`]
});
// delete pg data
await deletePgDataById(`collection_id IN ('${ids.join("','")}')`);
// delete dataset.datas
await MongoDatasetData.deleteMany({ collectionId: { $in: ids } });
}
/**
* delete one data by mongoDataId
*/
export async function deleteDataByDataId(mongoDataId: string) {
await deletePgDataById(['data_id', mongoDataId]);
await MongoDatasetData.findByIdAndDelete(mongoDataId);
}
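The delete helpers cascade from pg to mongo; a small usage sketch with placeholder ids and an assumed import path:

import { deleteDataByDataId, delDataByCollectionId } from '@/service/core/dataset/data/controller'; // assumed path

async function deleteExamples() {
  // Remove a single item: pg rows keyed by data_id are deleted first, then the mongo document.
  await deleteDataByDataId('655b0000c0ffee0000000001'); // placeholder mongo _id
  // Remove everything under some collections: pg rows by collection_id, then the mongo dataset data.
  await delDataByCollectionId({ collectionIds: ['collection_a', 'collection_b'] }); // placeholder ids
}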

View File

@@ -0,0 +1,281 @@
import { PgDatasetTableName } from '@fastgpt/global/core/dataset/constant';
import type { SearchDataResponseItemType } from '@fastgpt/global/core/dataset/type.d';
import { PgClient } from '@fastgpt/service/common/pg';
import { getVectorsByText } from '@/service/core/ai/vector';
import { delay } from '@/utils/tools';
import { PgSearchRawType } from '@fastgpt/global/core/dataset/api';
import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection/schema';
import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
import { POST } from '@fastgpt/service/common/api/plusRequest';
import { PostReRankResponse } from '@fastgpt/global/core/ai/api';
export async function insertData2Pg({
mongoDataId,
input,
model,
teamId,
tmbId,
datasetId,
collectionId
}: {
mongoDataId: string;
input: string;
model: string;
teamId: string;
tmbId: string;
datasetId: string;
collectionId: string;
}) {
let retry = 2;
async function insertPg(): Promise<{ insertId: string; vectors: number[][]; tokenLen: number }> {
try {
// get vector
const { vectors, tokenLen } = await getVectorsByText({
model,
input: [input]
});
const { rows } = await PgClient.insert(PgDatasetTableName, {
values: [
[
{ key: 'vector', value: `[${vectors[0]}]` },
{ key: 'team_id', value: String(teamId) },
{ key: 'tmb_id', value: String(tmbId) },
{ key: 'dataset_id', value: datasetId },
{ key: 'collection_id', value: collectionId },
{ key: 'data_id', value: String(mongoDataId) }
]
]
});
return {
insertId: rows[0].id,
vectors,
tokenLen
};
} catch (error) {
if (--retry < 0) {
return Promise.reject(error);
}
await delay(500);
return insertPg();
}
}
return insertPg();
}
export async function updatePgDataById({
id,
input,
model
}: {
id: string;
input: string;
model: string;
}) {
let retry = 2;
async function updatePg(): Promise<{ vectors: number[][]; tokenLen: number }> {
try {
// get vector
const { vectors, tokenLen } = await getVectorsByText({
model,
input: [input]
});
// update pg
await PgClient.update(PgDatasetTableName, {
where: [['id', id]],
values: [{ key: 'vector', value: `[${vectors[0]}]` }]
});
return {
vectors,
tokenLen
};
} catch (error) {
if (--retry < 0) {
return Promise.reject(error);
}
await delay(500);
return updatePg();
}
}
return updatePg();
}
export async function deletePgDataById(
where: ['id' | 'dataset_id' | 'collection_id' | 'data_id', string] | string
) {
let retry = 2;
async function deleteData(): Promise<any> {
try {
await PgClient.delete(PgDatasetTableName, {
where: [where]
});
} catch (error) {
if (--retry < 0) {
return Promise.reject(error);
}
await delay(500);
return deleteData();
}
}
await deleteData();
return {
tokenLen: 0
};
}
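insertData2Pg, updatePgDataById, and deletePgDataById all inline the same retry-twice-with-500ms-delay loop; a generic wrapper with equivalent semantics could look like the sketch below (not part of this commit):

import { delay } from '@/utils/tools';

async function withRetry<T>(fn: () => Promise<T>, retry = 2): Promise<T> {
  try {
    return await fn();
  } catch (error) {
    if (retry <= 0) return Promise.reject(error);
    await delay(500); // same back-off as the inlined loops above
    return withRetry(fn, retry - 1);
  }
}

// e.g. deletePgDataById could then reduce to:
// await withRetry(() => PgClient.delete(PgDatasetTableName, { where: [where] }));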
// search
export async function searchDatasetData({
text,
model,
similarity = 0,
limit,
datasetIds = []
}: {
text: string;
model: string;
similarity?: number; // minimum score threshold
limit: number;
datasetIds: string[];
}) {
const { vectors, tokenLen } = await getVectorsByText({
model,
input: [text]
});
const minLimit = global.systemEnv.pluginBaseUrl ? Math.max(50, limit * 4) : limit * 2;
const results: any = await PgClient.query(
`BEGIN;
SET LOCAL hnsw.ef_search = ${global.systemEnv.pgHNSWEfSearch || 100};
select id, collection_id, data_id, (vector <#> '[${
vectors[0]
}]') * -1 AS score from ${PgDatasetTableName} where dataset_id IN (${datasetIds
.map((id) => `'${String(id)}'`)
.join(',')}) AND vector <#> '[${vectors[0]}]' < -${similarity} order by vector <#> '[${
vectors[0]
}]' limit ${minLimit};
COMMIT;`
);
const rows = results?.[2]?.rows as PgSearchRawType[];
// deduplicate rows that share the same data_id
const filterRows: PgSearchRawType[] = [];
let set = new Set<string>();
for (const row of rows) {
if (!set.has(row.data_id)) {
filterRows.push(row);
set.add(row.data_id);
}
}
// get q and a
const [collections, dataList] = await Promise.all([
MongoDatasetCollection.find(
{
_id: { $in: filterRows.map((item) => item.collection_id) }
},
'name metadata'
).lean(),
MongoDatasetData.find(
{
_id: { $in: filterRows.map((item) => item.data_id?.trim()) }
},
'datasetId collectionId q a indexes'
).lean()
]);
const formatResult = filterRows
.map((item) => {
const collection = collections.find(
(collection) => String(collection._id) === item.collection_id
);
const data = dataList.find((data) => String(data._id) === item.data_id);
// if the collection or data is missing, the related mongo data has already been deleted
if (!collection || !data) return null;
return {
id: String(data._id),
q: data.q,
a: data.a,
indexes: data.indexes,
datasetId: String(data.datasetId),
collectionId: String(data.collectionId),
sourceName: collection.name || '',
sourceId: collection.metadata?.fileId || collection.metadata?.rawLink,
score: item.score
};
})
.filter((item) => item !== null) as SearchDataResponseItemType[];
// remove items with identical q and a
set = new Set<string>();
const filterData = formatResult.filter((item) => {
const str = `${item.q}${item.a}`.trim();
if (set.has(str)) return false;
set.add(str);
return true;
});
// ReRank result
const reRankResult = await reRankSearchResult({
query: text,
data: filterData
});
// similarity filter
const filterReRankResult = reRankResult.filter((item) => item.score > similarity);
// concat rerank and embedding data
set = new Set<string>(filterReRankResult.map((item) => item.id));
const concatResult = filterReRankResult.concat(
filterData.filter((item) => {
if (set.has(item.id)) return false;
set.add(item.id);
return true;
})
);
return {
searchRes: concatResult.slice(0, limit),
tokenLen
};
}
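In the query above, pgvector's <#> operator returns the negative inner product, so (vector <#> query) * -1 serves as the score and the vector <#> query < -similarity condition keeps only rows whose score exceeds the threshold; minLimit oversamples candidates when a rerank service (pluginBaseUrl) is configured. A usage sketch with placeholder values and an assumed import path:

import { searchDatasetData } from '@/service/core/dataset/data/pg'; // assumed path

async function searchExample() {
  const { searchRes, tokenLen } = await searchDatasetData({
    text: 'how do I deploy FastGPT?',
    model: 'text-embedding-ada-002',        // placeholder embedding model
    similarity: 0.5,                        // minimum score (negated inner product) to keep
    limit: 5,                               // final result count after dedup + rerank
    datasetIds: ['dataset_a', 'dataset_b']  // placeholder ids
  });
  // Each result carries id, q, a, indexes, dataset/collection ids, source info and a score.
  return { searchRes, tokenLen };
}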
// rerank search results via the plus (commercial plugin) service
export async function reRankSearchResult({
data,
query
}: {
data: SearchDataResponseItemType[];
query: string;
}): Promise<SearchDataResponseItemType[]> {
if (!global.systemEnv.pluginBaseUrl) return data;
try {
const result = await POST<PostReRankResponse>('/core/ai/retrival/rerank', {
query,
inputs: data.map((item) => ({
id: item.id,
text: `${item.q}\n${item.a}`.trim()
}))
});
const mergeResult = result
.map((item) => {
const target = data.find((dataItem) => dataItem.id === item.id);
if (!target) return null;
return {
...target,
score: item.score ?? target.score
};
})
.filter((item) => item) as SearchDataResponseItemType[];
return mergeResult;
} catch (error) {
console.log(error);
return data;
}
}
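reRankSearchResult only calls the plus endpoint when global.systemEnv.pluginBaseUrl is set; otherwise it returns the embedding results untouched, which keeps the open-source path working. A small sketch with placeholder data and an assumed import path:

import type { SearchDataResponseItemType } from '@fastgpt/global/core/dataset/type.d';
import { reRankSearchResult } from '@/service/core/dataset/data/pg'; // assumed path

async function rerankExample(candidates: SearchDataResponseItemType[]) {
  // Without pluginBaseUrl configured this resolves to candidates unchanged;
  // with it configured, each item keeps its fields but its score is replaced
  // by the reranker's score when one is provided.
  return reRankSearchResult({
    query: 'deploy FastGPT with docker',
    data: candidates
  });
}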

View File

@@ -1,11 +1,5 @@
import { PgDatasetTableName } from '@fastgpt/global/core/dataset/constant';
import {
SearchDataResponseItemType,
SearchDataResultItemType
} from '@fastgpt/global/core/dataset/type';
import { PgClient } from '@fastgpt/service/common/pg';
import { getVectorsByText } from '../../ai/vector';
import { getPgDataWithCollection } from './controller';
/**
* Check whether an identical value already exists
@@ -30,75 +24,3 @@ export async function hasSameValue({
return Promise.reject('已经存在完全一致的数据'); // identical data already exists
}
}
/**
* count the total amount of data per collection
*/
export async function countCollectionData({
collectionIds,
datasetId
}: {
collectionIds: string[];
datasetId?: string;
}) {
collectionIds = collectionIds.map((item) => String(item));
if (collectionIds.length === 0) return [];
const { rows } = await PgClient.query(`
SELECT
${collectionIds
.map((id) => `SUM(CASE WHEN collection_id = '${id}' THEN 1 ELSE 0 END) AS count${id}`)
.join(',')}
FROM ${PgDatasetTableName}
WHERE collection_id IN (${collectionIds.map((id) => `'${id}'`).join(',')})
${datasetId ? `AND dataset_id='${String(datasetId)}'` : ''};
`);
const values = Object.values(rows[0]).map((item) => Number(item));
return values;
}
export async function searchDatasetData({
text,
model,
similarity = 0,
limit,
datasetIds = []
}: {
text: string;
model: string;
similarity?: number;
limit: number;
datasetIds: string[];
}) {
const { vectors, tokenLen } = await getVectorsByText({
model,
input: [text]
});
const results: any = await PgClient.query(
`BEGIN;
SET LOCAL hnsw.ef_search = ${global.systemEnv.pgHNSWEfSearch || 100};
select id, q, a, collection_id, (vector <#> '[${
vectors[0]
}]') * -1 AS score from ${PgDatasetTableName} where dataset_id IN (${datasetIds
.map((id) => `'${String(id)}'`)
.join(',')}) AND vector <#> '[${vectors[0]}]' < -${similarity} order by vector <#> '[${
vectors[0]
}]' limit ${limit};
COMMIT;`
);
const rows = results?.[2]?.rows as SearchDataResultItemType[];
const collectionsData = await getPgDataWithCollection({ pgDataList: rows });
const searchRes: SearchDataResponseItemType[] = collectionsData.map((item, index) => ({
...item,
score: rows[index].score
}));
return {
searchRes,
tokenLen
};
}