mirror of https://github.com/labring/FastGPT.git (synced 2025-08-01 03:48:24 +00:00)

Commit: 4.6.2-alpha (#517)
@@ -8,5 +8,5 @@ export const getChatModelNameListByModules = (modules: ModuleItemType[]): string
       const model = item.inputs.find((input) => input.key === 'model')?.value;
       return global.chatModels.find((item) => item.model === model)?.name || '';
     })
-    .filter((item) => item);
+    .filter(Boolean);
 };
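The change from .filter((item) => item) to .filter(Boolean) is behavior-preserving: both drop the empty strings produced when a module has no model name. One caveat worth noting: TypeScript does not narrow element types through Boolean, which is why a later hunk in this commit keeps an explicit as cast after the same swap. A standalone sketch with invented values:

// Both predicates drop falsy entries; Boolean is simply terser.
const names = ['gpt-3.5-turbo', '', 'gpt-4'];
const kept = names.filter(Boolean); // ['gpt-3.5-turbo', 'gpt-4']
// kept is still typed string[]; with a union element type you would need a
// type guard such as (s): s is string => !!s for the compiler to narrow.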
@@ -8,6 +8,7 @@ import { deletePgDataById, insertData2Pg, updatePgDataById } from './pg';
 import { Types } from 'mongoose';
 import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/constant';
 import { getDefaultIndex } from '@fastgpt/global/core/dataset/utils';
+import { jiebaSplit } from '../utils';

 /* insert data.
 * 1. create data id
@@ -34,9 +35,6 @@ export async function insertData2Dataset({
     return Promise.reject("teamId and tmbId can't be the same");
   }

-  q = q.trim();
-  a = a.trim();
-
   const id = new Types.ObjectId();
   const qaStr = `${q}\n${a}`.trim();

@@ -74,6 +72,7 @@ export async function insertData2Dataset({
     collectionId,
     q,
     a,
+    fullTextToken: jiebaSplit({ text: q + a }),
     indexes: indexes.map((item, i) => ({
       ...item,
       dataId: result[i].insertId
@@ -203,6 +202,7 @@ export async function updateData2Dataset({
   // update mongo
   mongoData.q = q || mongoData.q;
   mongoData.a = a ?? mongoData.a;
+  mongoData.fullTextToken = jiebaSplit({ text: mongoData.q + mongoData.a });
   // @ts-ignore
   mongoData.indexes = indexes;
   await mongoData.save();
@@ -1,5 +1,8 @@
 import { PgDatasetTableName } from '@fastgpt/global/core/dataset/constant';
-import type { SearchDataResponseItemType } from '@fastgpt/global/core/dataset/type.d';
+import type {
+  DatasetDataWithCollectionType,
+  SearchDataResponseItemType
+} from '@fastgpt/global/core/dataset/type.d';
 import { PgClient } from '@fastgpt/service/common/pg';
 import { getVectorsByText } from '@/service/core/ai/vector';
 import { delay } from '@/utils/tools';
@@ -8,6 +11,7 @@ import { MongoDatasetCollection } from '@fastgpt/service/core/dataset/collection
 import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
 import { POST } from '@fastgpt/service/common/api/plusRequest';
 import { PostReRankResponse } from '@fastgpt/global/core/ai/api';
+import { jiebaSplit } from '../utils';

 export async function insertData2Pg({
   mongoDataId,
@@ -125,39 +129,100 @@ export async function deletePgDataById(
   };
 }

-// search
-export async function searchDatasetData({
-  text,
-  model,
-  similarity = 0,
-  limit,
-  datasetIds = [],
-  rerank = false
-}: {
+// ------------------ search start ------------------
+type SearchProps = {
   text: string;
   model: string;
   similarity?: number; // min distance
   limit: number;
   datasetIds: string[];
   rerank?: boolean;
-}) {
+};
+export async function searchDatasetData(props: SearchProps) {
+  const { text, similarity = 0, limit, rerank = false } = props;
+
+  const [{ tokenLen, embeddingRecallResults }, { fullTextRecallResults }] = await Promise.all([
+    embeddingRecall({
+      ...props,
+      limit: rerank ? Math.max(50, limit * 3) : limit * 2
+    }),
+    fullTextRecall({
+      ...props,
+      limit: 40
+    })
+  ]);
+
+  // concat recall result
+  let set = new Set<string>();
+  const concatRecallResults = embeddingRecallResults;
+  for (const item of fullTextRecallResults) {
+    if (!set.has(item.id)) {
+      concatRecallResults.push(item);
+      set.add(item.id);
+    }
+  }
+
+  // remove same q and a data
+  set = new Set<string>();
+  const filterSameDataResults = concatRecallResults.filter((item) => {
+    const str = `${item.q}${item.a}`.trim();
+    if (set.has(str)) return false;
+    set.add(str);
+    return true;
+  });
+
+  if (!rerank) {
+    return {
+      searchRes: filterSameDataResults.slice(0, limit),
+      tokenLen
+    };
+  }
+
+  // ReRank result
+  const reRankResults = await reRankSearchResult({
+    query: text,
+    data: filterSameDataResults
+  });
+
+  // similarity filter
+  const filterReRankResults = reRankResults.filter((item) => item.score > similarity);
+
+  // concat rerank and embedding data
+  set = new Set<string>(filterReRankResults.map((item) => item.id));
+  const concatResult = filterReRankResults.concat(
+    filterSameDataResults.filter((item) => {
+      if (set.has(item.id)) return false;
+      set.add(item.id);
+      return true;
+    })
+  );
+
+  return {
+    searchRes: concatResult.slice(0, limit),
+    tokenLen
+  };
+}
+export async function embeddingRecall({
+  text,
+  model,
+  similarity = 0,
+  limit,
+  datasetIds = [],
+  rerank = false
+}: SearchProps) {
   const { vectors, tokenLen } = await getVectorsByText({
     model,
     input: [text]
   });

-  const minLimit = global.systemEnv.pluginBaseUrl ? Math.max(50, limit * 4) : limit * 2;
-
   const results: any = await PgClient.query(
     `BEGIN;
     SET LOCAL hnsw.ef_search = ${global.systemEnv.pgHNSWEfSearch || 100};
-    select id, collection_id, data_id, (vector <#> '[${
-      vectors[0]
-    }]') * -1 AS score from ${PgDatasetTableName}
-    where dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')}) AND vector <#> '[${
-      vectors[0]
-    }]' < -${similarity}
-    order by score desc limit ${minLimit};
+    select id, collection_id, data_id, (vector <#> '[${vectors[0]}]') * -1 AS score
+      from ${PgDatasetTableName}
+      where dataset_id IN (${datasetIds.map((id) => `'${String(id)}'`).join(',')})
+        ${rerank ? '' : `AND vector <#> '[${vectors[0]}]' < -${similarity}`}
+      order by score desc limit ${limit};
     COMMIT;`
   );
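In the SQL above, <#> is pgvector's negative-inner-product operator: multiplying by -1 recovers the raw inner product as a score, and the predicate vector <#> '[...]' < -${similarity} keeps rows whose inner product exceeds the threshold (skipped when rerank is on, since the reranker applies its own similarity cut afterwards). A hypothetical call into the new entry point, all values invented:

// Hypothetical usage of the refactored hybrid search (placeholder values).
const { searchRes, tokenLen } = await searchDatasetData({
  text: 'how to deploy FastGPT',
  model: 'text-embedding-ada-002', // embedding model name is an assumption
  similarity: 0.1,
  limit: 20,
  datasetIds: ['6531f0c9a2a4f8d3b1112222'], // placeholder ObjectId string
  rerank: true // also enables the full-text recall branch
});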
@@ -212,47 +277,54 @@ export async function searchDatasetData({
     })
     .filter((item) => item !== null) as SearchDataResponseItemType[];

-  // remove same q and a data
-  set = new Set<string>();
-  const filterData = formatResult.filter((item) => {
-    const str = `${item.q}${item.a}`.trim();
-    if (set.has(str)) return false;
-    set.add(str);
-    return true;
-  });
-
-  if (!rerank) {
-    return {
-      searchRes: filterData.slice(0, limit),
-      tokenLen
-    };
-  }
-
-  // ReRank result
-  const reRankResult = await reRankSearchResult({
-    query: text,
-    data: filterData
-  });
-
-  // similarity filter
-  const filterReRankResult = reRankResult.filter((item) => item.score > similarity);
-
-  // concat rerank and embedding data
-  set = new Set<string>(filterReRankResult.map((item) => item.id));
-  const concatResult = filterReRankResult.concat(
-    filterData.filter((item) => {
-      if (set.has(item.id)) return false;
-      set.add(item.id);
-      return true;
-    })
-  );
-
   return {
-    searchRes: concatResult.slice(0, limit),
+    embeddingRecallResults: formatResult,
     tokenLen
   };
 }
+export async function fullTextRecall({
+  text,
+  limit,
+  datasetIds = [],
+  rerank = false
+}: SearchProps): Promise<{
+  fullTextRecallResults: SearchDataResponseItemType[];
+  tokenLen: number;
+}> {
+  if (!rerank) {
+    return {
+      fullTextRecallResults: [],
+      tokenLen: 0
+    };
+  }
+
+  const result = (await MongoDatasetData.find(
+    {
+      datasetId: { $in: datasetIds.map((item) => item) },
+      $text: { $search: jiebaSplit({ text }) }
+    },
+    { score: { $meta: 'textScore' } }
+  )
+    .sort({ score: { $meta: 'textScore' } })
+    .limit(limit)
+    .populate('collectionId')
+    .lean()) as DatasetDataWithCollectionType[];
+
+  return {
+    fullTextRecallResults: result.map((item) => ({
+      id: String(item._id),
+      datasetId: String(item.datasetId),
+      collectionId: String(item.collectionId._id),
+      sourceName: item.collectionId.name || '',
+      sourceId: item.collectionId.metadata?.fileId || item.collectionId.metadata?.rawLink,
+      q: item.q,
+      a: item.a,
+      indexes: item.indexes,
+      score: 1
+    })),
+    tokenLen: 0
+  };
+}
 // plus reRank search result
 export async function reRankSearchResult({
   data,
@@ -279,7 +351,7 @@ export async function reRankSearchResult({
         score: item.score ?? target.score
       };
     })
-    .filter((item) => item) as SearchDataResponseItemType[];
+    .filter(Boolean) as SearchDataResponseItemType[];

   return mergeResult;
 } catch (error) {
@@ -288,3 +360,4 @@ export async function reRankSearchResult({
     return data;
   }
 }
+// ------------------ search end ------------------
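fullTextRecall's $text: { $search } query only works against a Mongo text index, which this diff does not show. An assumed sketch of the index it implies, over the token field the earlier hunks populate:

// Assumed (schema not shown in this diff): a text index over the
// space-joined jieba tokens lets Mongo's whitespace tokenizer index Chinese.
import { Schema } from 'mongoose';

const DatasetDataSchema = new Schema({
  q: String,
  a: String,
  fullTextToken: String, // populated via jiebaSplit, see utils.ts below
  datasetId: Schema.Types.ObjectId
});
DatasetDataSchema.index({ fullTextToken: 'text' });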
projects/app/src/service/core/dataset/utils.ts (new file, 34 lines)
@@ -0,0 +1,34 @@
+import { MongoDatasetData } from '@fastgpt/service/core/dataset/data/schema';
+import { cut, extract } from '@node-rs/jieba';
+
+/**
+ * Same value judgment
+ */
+export async function hasSameValue({
+  collectionId,
+  q,
+  a = ''
+}: {
+  collectionId: string;
+  q: string;
+  a?: string;
+}) {
+  const count = await MongoDatasetData.countDocuments({
+    q,
+    a,
+    collectionId
+  });
+
+  if (count > 0) {
+    return Promise.reject('已经存在完全一致的数据');
+  }
+}
+
+export function jiebaSplit({ text }: { text: string }) {
+  const tokens = cut(text, true);
+
+  return tokens
+    .map((item) => item.replace(/[^\u4e00-\u9fa5a-zA-Z0-9\s]/g, '').trim())
+    .filter(Boolean)
+    .join(' ');
+}
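hasSameValue rejects with '已经存在完全一致的数据' ('fully identical data already exists') when the collection already holds the exact q/a pair. jiebaSplit runs jieba in full-cut mode, strips every character that is not a CJK ideograph, ASCII letter, digit, or whitespace, then joins the tokens with spaces so Mongo's whitespace-based tokenizer can index Chinese text. An indicative call (exact boundaries depend on the jieba dictionary):

// Indicative only; real token boundaries come from the jieba dictionary.
jiebaSplit({ text: 'FastGPT支持全文检索!' });
// => roughly 'FastGPT 支持 全文 检索' (the '!' is removed by the regex)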
@@ -13,8 +13,15 @@ import { getErrText } from '@fastgpt/global/common/error/utils';
 import { authTeamBalance } from '../support/permission/auth/bill';
 import type { PushDatasetDataChunkProps } from '@fastgpt/global/core/dataset/api.d';

-const reduceQueue = () => {
+const reduceQueue = (retry = false) => {
   global.qaQueueLen = global.qaQueueLen > 0 ? global.qaQueueLen - 1 : 0;
+  if (global.qaQueueLen === 0 && retry) {
+    setTimeout(() => {
+      generateQA();
+    }, 60000);
+  }
+
+  return global.vectorQueueLen === 0;
 };

 export async function generateQA(): Promise<any> {
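reduceQueue now does three jobs: decrement the worker counter, report whether the queue has drained (so the caller can log task completion exactly once), and, when retry is true, schedule one deferred restart a minute later instead of hot-looping on a failing task; only the catch block later in this file passes retry. The vector worker below applies the identical pattern, and its Chinese comment reads roughly 'index generation queue; each import starts its own thread'. One apparent oddity preserved from the source: this QA variant returns global.vectorQueueLen === 0, which consults the vector queue rather than the QA queue. A self-contained sketch of the pattern, names invented:

// Standalone sketch of the drain-and-retry pattern (not FastGPT code).
let queueLen = 0;

function reduceQueue(retry = false, restart?: () => void): boolean {
  queueLen = Math.max(0, queueLen - 1);
  if (queueLen === 0 && retry && restart) {
    setTimeout(restart, 60_000); // one deferred restart after a failure drains the queue
  }
  return queueLen === 0; // true exactly when the last worker exits
}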
@@ -32,7 +39,7 @@ export async function generateQA(): Promise<any> {
   const data = await MongoDatasetTraining.findOneAndUpdate(
     {
       mode: TrainingModeEnum.qa,
-      lockTime: { $lte: new Date(Date.now() - 10 * 60 * 1000) }
+      lockTime: { $lte: new Date(Date.now() - 6 * 60 * 1000) }
     },
     {
       lockTime: new Date()
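The findOneAndUpdate above implements a lock lease: a training task whose lockTime is older than the window counts as abandoned and is re-claimed atomically, with the update renewing the lease so concurrent workers skip it. This commit shortens the window from 10 to 6 minutes, so stalled QA tasks are retried sooner. Restated as a sketch using the same fields:

// Lease-based claim, restated from the hunk above.
const LEASE_MS = 6 * 60 * 1000; // was 10 minutes before this commit

const task = await MongoDatasetTraining.findOneAndUpdate(
  {
    mode: TrainingModeEnum.qa,
    lockTime: { $lte: new Date(Date.now() - LEASE_MS) } // expired lease => claimable
  },
  { lockTime: new Date() } // renew atomically so other workers skip this task
);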
@@ -70,12 +77,13 @@ export async function generateQA(): Promise<any> {
     }
   })();

-  if (done) {
-    reduceQueue();
-    global.vectorQueueLen <= 0 && console.log(`【QA】Task Done`);
+  if (done || !data) {
+    if (reduceQueue()) {
+      console.log(`【QA】Task Done`);
+    }
     return;
   }
-  if (error || !data) {
+  if (error) {
     reduceQueue();
     return generateQA();
   }
@@ -171,7 +179,7 @@ export async function generateQA(): Promise<any> {
     reduceQueue();
     generateQA();
   } catch (err: any) {
-    reduceQueue();
+    reduceQueue(true);
     // log
     if (err?.response) {
       addLog.info('openai error: 生成QA错误', {
@@ -7,8 +7,16 @@ import { getErrText } from '@fastgpt/global/common/error/utils';
 import { authTeamBalance } from '@/service/support/permission/auth/bill';
 import { pushGenerateVectorBill } from '@/service/support/wallet/bill/push';

-const reduceQueue = () => {
+const reduceQueue = (retry = false) => {
   global.vectorQueueLen = global.vectorQueueLen > 0 ? global.vectorQueueLen - 1 : 0;
+
+  if (global.vectorQueueLen === 0 && retry) {
+    setTimeout(() => {
+      generateVector();
+    }, 60000);
+  }
+
+  return global.vectorQueueLen === 0;
 };

 /* 索引生成队列。每导入一次,就是一个单独的线程 */
@@ -57,8 +65,8 @@ export async function generateVector(): Promise<any> {
       return {
         data,
         dataItem: {
-          q: data.q.replace(/[\x00-\x08]/g, ' '),
-          a: data.a?.replace(/[\x00-\x08]/g, ' ') || '',
+          q: data.q,
+          a: data.a || '',
           indexes: data.indexes
         }
       };
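The dropped replace() calls had been scrubbing ASCII control characters \x00 through \x08 out of q and a before embedding; after this hunk the raw strings pass through unchanged. For reference, the removed transform behaved like:

// Behavior of the removed sanitization step:
'foo\u0000bar'.replace(/[\x00-\x08]/g, ' '); // => 'foo bar'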
@@ -70,12 +78,13 @@ export async function generateVector(): Promise<any> {
     }
   })();

-  if (done) {
-    reduceQueue();
-    global.vectorQueueLen <= 0 && console.log(`【index】Task done`);
+  if (done || !data) {
+    if (reduceQueue()) {
+      console.log(`【index】Task done`);
+    }
     return;
   }
-  if (error || !data) {
+  if (error) {
     reduceQueue();
     return generateVector();
   }
@@ -108,8 +117,15 @@ export async function generateVector(): Promise<any> {
   }

   // create vector and insert
-
   try {
+    // invalid data
+    if (!data.q.trim()) {
+      await MongoDatasetTraining.findByIdAndDelete(data._id);
+      reduceQueue();
+      generateVector();
+      return;
+    }
+
     // insert data to pg
     const { tokenLen } = await insertData2Dataset({
       teamId: data.teamId,
@@ -135,7 +151,7 @@ export async function generateVector(): Promise<any> {
     reduceQueue();
     generateVector();
   } catch (err: any) {
-    reduceQueue();
+    reduceQueue(true);
     // log
     if (err?.response) {
       addLog.info('openai error: 生成向量错误', {