Optimize the file storage structure of the knowledge base (#386)

Archer
2023-10-10 22:41:05 +08:00
committed by GitHub
parent 29d152784f
commit d0041a98b4
41 changed files with 591 additions and 231 deletions

View File

@@ -0,0 +1,109 @@
import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@/service/response';
import { authUser } from '@/service/utils/auth';
import { connectToDatabase } from '@/service/mongo';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
import { DatasetSpecialIdEnum } from '@fastgpt/core/dataset/constant';
import mongoose, { Types } from 'mongoose';
import { delay } from '@/utils/tools';
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  let initFileIds: string[] = [];

  try {
    const { limit = 100 } = req.body;
    await connectToDatabase();
    await authUser({ req, authRoot: true });

    console.log('add index');
    await PgClient.query(
      `
      ALTER TABLE modeldata
      ALTER COLUMN source TYPE VARCHAR(256),
      ALTER COLUMN file_id TYPE VARCHAR(256);
      CREATE INDEX IF NOT EXISTS modelData_fileId_index ON modeldata (file_id);
      `
    );
    console.log('index success');

    console.log('count rows');
    // Fetch the distinct (de-duplicated) fileIds
    const { rows } = await PgClient.query(`SELECT DISTINCT file_id
      FROM ${PgDatasetTableName} WHERE file_id IS NOT NULL AND file_id != '';
    `);
    console.log('count rows success', rows.length);

    console.log('start filter');
    for (let i = 0; i < rows.length; i += limit) {
      await init(rows.slice(i, i + limit), initFileIds);
      console.log(i);
    }
    console.log('filter success');

    console.log('start update');
    for (let i = 0; i < initFileIds.length; i++) {
      await PgClient.query(`UPDATE ${PgDatasetTableName}
        SET file_id = '${DatasetSpecialIdEnum.manual}'
        WHERE file_id = '${initFileIds[i]}'`);
      console.log('update: ', initFileIds[i]);
    }

    const { rows: emptyIds } = await PgClient.query(
      `SELECT id FROM ${PgDatasetTableName} WHERE file_id IS NULL OR file_id=''`
    );
    console.log(emptyIds.length);

    await delay(5000);

    async function start(start: number) {
      for (let i = start; i < emptyIds.length; i += limit) {
        await PgClient.query(`UPDATE ${PgDatasetTableName}
          SET file_id = '${DatasetSpecialIdEnum.manual}'
          WHERE id = '${emptyIds[i].id}'`);
        console.log('update: ', i, emptyIds[i].id);
      }
    }
    // Kick off `limit` workers in parallel (not awaited); each worker walks
    // emptyIds with a stride of `limit`, so the response below can be sent
    // before all updates have finished.
    for (let i = 0; i < limit; i++) {
      start(i);
    }

    // await PgClient.query(
    //   `UPDATE ${PgDatasetTableName}
    //   SET file_id = '${DatasetSpecialIdEnum.manual}'
    //   WHERE file_id IS NULL OR file_id = ''`
    // );

    console.log('update success');

    jsonRes(res, {
      data: {
        empty: emptyIds.length
      }
    });
  } catch (error) {
    jsonRes(res, {
      code: 500,
      error
    });
  }
}

async function init(rows: any[], initFileIds: string[]) {
  const collection = mongoose.connection.db.collection(`dataset.files`);

  /* Go through every fileId and check whether a matching file exists; ids without a file are switched to manual */
  const updateResult = await Promise.allSettled(
    rows.map(async (item) => {
      // Look up the corresponding file
      const file = await collection.findOne({
        _id: new Types.ObjectId(item.file_id)
      });
      if (file) return '';
      // No file found: this fileId will be rewritten to manual
      initFileIds.push(item.file_id);
      return item.file_id;
    })
  );
  // @ts-ignore
  console.log(updateResult.filter((item) => item?.value).length);
}
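
The migration above, and several diffs below, rely on special dataset ids exported from @fastgpt/core/dataset/constant, which this excerpt never shows. A minimal sketch of what those exports plausibly look like, inferred only from how they are used in this commit (the enum string values and display names are assumptions):

```ts
// Sketch only: reconstructed from usage in this commit, not copied from the repo.
export enum DatasetSpecialIdEnum {
  manual = 'manual', // hand-entered rows (replaces the old "other data" bucket)
  mark = 'mark' // rows created by marking answers
}

// Display metadata, looked up as datasetSpecialIdMap[fileId]?.name
export const datasetSpecialIdMap: Record<DatasetSpecialIdEnum, { name: string }> = {
  [DatasetSpecialIdEnum.manual]: { name: 'kb.Manual Data' }, // assumed i18n key
  [DatasetSpecialIdEnum.mark]: { name: 'kb.Mark Data' } // assumed i18n key
};

// Used to exclude the special ids when grouping real files in Postgres.
export const datasetSpecialIds: string[] = Object.values(DatasetSpecialIdEnum);
```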

View File

@@ -91,6 +91,10 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
if (res.closed) {
return stream.destroy();
}
q = q.replace(/"/g, '""');
a = a.replace(/"/g, '""');
source = source?.replace(/"/g, '""');
write(`\n"${q}","${a || ''}","${source || ''}"`);
});
// finish
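
The three replace calls above double any embedded double quote before the field is wrapped in quotes, which is standard RFC 4180 CSV escaping. A tiny self-contained illustration (the helper name is invented for the example):

```ts
// Hypothetical helper mirroring the inline escaping above: double embedded
// quotes, then wrap the whole field in quotes.
const csvField = (value = '') => `"${value.replace(/"/g, '""')}"`;

// Prints: "He said ""hi""","answer",""
console.log([csvField('He said "hi"'), csvField('answer'), csvField('')].join(','));
```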

View File

@@ -4,7 +4,6 @@ import { connectToDatabase } from '@/service/mongo';
import { authUser } from '@/service/utils/auth';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
import { OtherFileId } from '@/constants/dataset';
import type { PgDataItemType } from '@/types/core/dataset/data';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
@@ -36,15 +35,12 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
['user_id', userId],
'AND',
['kb_id', kbId],
...(fileId
? fileId === OtherFileId
? ["AND (file_id IS NULL OR file_id = '')"]
: ['AND', ['file_id', fileId]]
: []),
'AND',
['file_id', fileId],
...(searchText
? [
'AND',
`(q LIKE '%${searchText}%' OR a LIKE '%${searchText}%' OR source LIKE '%${searchText}%')`
`(q ILIKE '%${searchText}%' OR a ILIKE '%${searchText}%' OR source ILIKE '%${searchText}%')`
]
: [])
];
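
Swapping LIKE for ILIKE makes the search case-insensitive; ILIKE is a PostgreSQL extension, with lower(col) LIKE lower(pattern) as the portable equivalent. A sketch of the fragment after the change (same interpolation style as the original, so searchText is still assumed to be trusted input):

```ts
// 'gpt' now also matches 'GPT' in q, a and source.
const fuzzySearchWhere = (searchText: string) =>
  `(q ILIKE '%${searchText}%' OR a ILIKE '%${searchText}%' OR source ILIKE '%${searchText}%')`;
```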

View File

@@ -1,3 +1,7 @@
/*
  Insert a single data item into the dataset (inserted immediately).
  Sources: manual input or marked data.
*/
import type { NextApiRequest, NextApiResponse } from 'next';
import { jsonRes } from '@/service/response';
import { connectToDatabase } from '@/service/mongo';
@@ -11,7 +15,6 @@ import { DatasetDataItemType } from '@/types/core/dataset/data';
import { countPromptTokens } from '@/utils/common/tiktoken';
export type Props = {
billId?: string;
kbId: string;
data: DatasetDataItemType;
};
@@ -40,7 +43,7 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
export async function getVectorAndInsertDataset(
props: Props & { userId: string }
): Promise<string> {
const { kbId, data, userId, billId } = props;
const { kbId, data, userId } = props;
if (!kbId || !data?.q) {
return Promise.reject('缺少参数');
}
@@ -61,7 +64,7 @@ export async function getVectorAndInsertDataset(
const { rows: existsRows } = await PgClient.query(`
SELECT COUNT(*) > 0 AS exists
FROM ${PgDatasetTableName}
WHERE md5(q)=md5('${q}') AND md5(a)=md5('${a}') AND user_id='${userId}' AND kb_id='${kbId}'
WHERE md5(q)=md5('${q}') AND md5(a)=md5('${a}') AND user_id='${userId}' AND file_id='${data.file_id}' AND kb_id='${kbId}'
`);
const exists = existsRows[0]?.exists || false;
@@ -72,8 +75,7 @@ export async function getVectorAndInsertDataset(
const { vectors } = await getVector({
model: kb.vectorModel,
input: [q],
userId,
billId
userId
});
const response = await insertData2Dataset({

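DatasetDataItemType itself is not part of this excerpt; judging only from how data is used here (q and a in the md5 duplicate check, data.file_id in the new per-file condition), its relevant shape is roughly the following sketch. The real type in @/types/core/dataset/data may carry more fields:

```ts
// Assumed shape, reconstructed from usage in this handler only.
type DatasetDataItemType = {
  q: string; // chunk text that gets embedded
  a?: string; // optional answer / supplementary text
  file_id?: string; // GridFS id, a link, or a special id such as DatasetSpecialIdEnum.manual
};
```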
View File

@@ -6,7 +6,7 @@ import { GridFSStorage } from '@/service/lib/gridfs';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
import { Types } from 'mongoose';
import { OtherFileId } from '@/constants/dataset';
import { isSpecialFileId } from '@fastgpt/core/dataset/utils';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
try {
@@ -22,14 +22,9 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
const { userId } = await authUser({ req, authToken: true });
// other data. Delete only vector data
if (fileId === OtherFileId) {
if (isSpecialFileId(fileId)) {
await PgClient.delete(PgDatasetTableName, {
where: [
['user_id', userId],
'AND',
['kb_id', kbId],
"AND (file_id IS NULL OR file_id = '')"
]
where: [['user_id', userId], 'AND', ['kb_id', kbId], 'AND', ['file_id', fileId]]
});
} else {
// auth file
@@ -48,7 +43,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
file_id: fileId
});
// delete file
await bucket.delete(new Types.ObjectId(fileId));
}
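
isSpecialFileId comes from @fastgpt/core/dataset/utils and is not shown in this excerpt. Given that it replaces the old fileId === OtherFileId comparison, it presumably just tests membership in the special-id list; a minimal sketch under that assumption:

```ts
import { datasetSpecialIds } from '@fastgpt/core/dataset/constant';

// Sketch only: true for ids like 'manual' / 'mark', i.e. rows that have vector
// data in Postgres but no backing GridFS file to delete.
export function isSpecialFileId(id?: string): boolean {
  return !!id && datasetSpecialIds.includes(id);
}
```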

View File

@@ -3,8 +3,12 @@ import { jsonRes } from '@/service/response';
import { connectToDatabase } from '@/service/mongo';
import { authUser } from '@/service/utils/auth';
import { GridFSStorage } from '@/service/lib/gridfs';
import { OtherFileId } from '@/constants/dataset';
import { datasetSpecialIdMap } from '@fastgpt/core/dataset/constant';
import { datasetSpecialIds } from '@fastgpt/core/dataset/constant';
import type { GSFileInfoType } from '@/types/common/file';
import { strIsLink } from '@fastgpt/common/tools/str';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
try {
@@ -14,12 +18,32 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
// Credential check
const { userId } = await authUser({ req, authToken: true });
if (fileId === OtherFileId) {
// manual, mark
if (datasetSpecialIds.includes(fileId)) {
return jsonRes<GSFileInfoType>(res, {
data: {
id: OtherFileId,
id: fileId,
size: 0,
filename: 'kb.Other Data',
// @ts-ignore
filename: datasetSpecialIdMap[fileId]?.name || fileId,
uploadDate: new Date(),
encoding: '',
contentType: ''
}
});
}
// link file
if (strIsLink(fileId)) {
const { rows } = await PgClient.select(PgDatasetTableName, {
where: [['user_id', userId], 'AND', ['file_id', fileId]],
limit: 1,
fields: ['source']
});
return jsonRes<GSFileInfoType>(res, {
data: {
id: fileId,
size: 0,
filename: rows[0]?.source || fileId,
uploadDate: new Date(),
encoding: '',
contentType: ''

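strIsLink (from @fastgpt/common/tools/str) is used here to spot link-type ids, where file_id apparently stores a URL instead of a GridFS ObjectId and the readable name comes from the source column. Its implementation is not shown; a rough sketch of the check, as an assumption:

```ts
// Assumed behaviour: anything that looks like an http(s) URL counts as a link id.
// The real helper may accept more schemes or bare domains.
export const strIsLink = (str?: string): boolean => {
  if (!str) return false;
  return /^https?:\/\/\S+$/i.test(str);
};
```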
View File

@@ -5,7 +5,14 @@ import { authUser } from '@/service/utils/auth';
import { GridFSStorage } from '@/service/lib/gridfs';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
import { FileStatusEnum, OtherFileId } from '@/constants/dataset';
import { FileStatusEnum } from '@/constants/dataset';
import { strIsLink } from '@fastgpt/common/tools/str';
import {
DatasetSpecialIdEnum,
datasetSpecialIdMap,
datasetSpecialIds
} from '@fastgpt/core/dataset/constant';
import { Types } from 'mongoose';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
try {
@@ -22,57 +29,106 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
// Credential check
const { userId } = await authUser({ req, authToken: true });
// select and count same file_id data, exclude special id
const pgWhere = `user_id = '${userId}' AND kb_id = '${kbId}' ${datasetSpecialIds
.map((item) => `AND file_id!='${item}'`)
.join(' ')}
${searchText ? `AND source ILIKE '%${searchText}%'` : ''}`;
const [{ rows }, { rowCount: total }] = await Promise.all([
PgClient.query(`SELECT file_id, COUNT(*) AS count
FROM ${PgDatasetTableName}
where ${pgWhere}
GROUP BY file_id
ORDER BY file_id DESC
LIMIT ${pageSize} OFFSET ${(pageNum - 1) * pageSize};
`),
PgClient.query(`SELECT DISTINCT file_id
FROM ${PgDatasetTableName}
where ${pgWhere}
`)
]);
// find files
const gridFs = new GridFSStorage('dataset', userId);
const collection = gridFs.Collection();
const mongoWhere = {
['metadata.kbId']: kbId,
['metadata.userId']: userId,
['metadata.datasetUsed']: true,
...(searchText && { filename: { $regex: searchText } })
};
const [files, total] = await Promise.all([
collection
.find(mongoWhere, {
projection: {
_id: 1,
filename: 1,
uploadDate: 1,
length: 1
}
})
.skip((pageNum - 1) * pageSize)
.limit(pageSize)
.sort({ uploadDate: -1 })
.toArray(),
collection.countDocuments(mongoWhere)
]);
async function GetOtherData() {
return {
id: OtherFileId,
size: 0,
filename: 'kb.Other Data',
uploadTime: new Date(),
status: (await TrainingData.findOne({ userId, kbId, file_id: '' }))
? FileStatusEnum.embedding
: FileStatusEnum.ready,
chunkLength: await PgClient.count(PgDatasetTableName, {
fields: ['id'],
where: [
['user_id', userId],
'AND',
['kb_id', kbId],
"AND (file_id IS NULL OR file_id = '')"
]
})
};
async function getSpecialData() {
if (pageNum !== 1) return [];
return [
{
id: DatasetSpecialIdEnum.manual,
size: 0,
filename: datasetSpecialIdMap[DatasetSpecialIdEnum.manual].name,
uploadTime: new Date(),
status: FileStatusEnum.ready,
chunkLength: await PgClient.count(PgDatasetTableName, {
fields: ['id'],
where: [
['user_id', userId],
'AND',
['file_id', DatasetSpecialIdEnum.manual],
'AND',
['kb_id', kbId]
]
})
},
{
id: DatasetSpecialIdEnum.mark,
size: 0,
filename: datasetSpecialIdMap[DatasetSpecialIdEnum.mark].name,
uploadTime: new Date(),
status: FileStatusEnum.ready,
chunkLength: await PgClient.count(PgDatasetTableName, {
fields: ['id'],
where: [
['user_id', userId],
'AND',
['file_id', DatasetSpecialIdEnum.mark],
'AND',
['kb_id', kbId]
]
})
}
];
}
const data = await Promise.all([
GetOtherData(),
...files.map(async (file) => {
getSpecialData(),
...rows.map(async (row) => {
// link data
if (strIsLink(row.file_id)) {
const { rows } = await PgClient.select(PgDatasetTableName, {
where: [['user_id', userId], 'AND', ['file_id', row.file_id]],
limit: 1,
fields: ['source']
});
return {
id: row.file_id,
size: 0,
filename: rows[0]?.source || row.file_id,
uploadTime: new Date(),
status: FileStatusEnum.ready,
chunkLength: row.count
};
}
// file data
const file = await collection.findOne(
{
_id: new Types.ObjectId(row.file_id),
['metadata.userId']: userId,
['metadata.kbId']: kbId
},
{
projection: {
_id: 1,
filename: 1,
uploadDate: 1,
length: 1
}
}
);
if (!file) return null;
return {
id: String(file._id),
size: file.length,
@@ -81,16 +137,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
status: (await TrainingData.findOne({ userId, kbId, file_id: file._id }))
? FileStatusEnum.embedding
: FileStatusEnum.ready,
chunkLength: await PgClient.count(PgDatasetTableName, {
fields: ['id'],
where: [
['user_id', userId],
'AND',
['kb_id', kbId],
'AND',
['file_id', String(file._id)]
]
})
chunkLength: row.count
};
})
]);
@@ -99,7 +146,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
data: {
pageNum,
pageSize,
data: data.flat(),
data: data.flat().filter((item) => item),
total
}
});
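
After this refactor the file list is driven by Postgres instead of GridFS: one grouped query yields a {file_id, count} row per file for the current page, the DISTINCT query's rowCount supplies the total, and each row is then resolved to a link entry, a GridFS file, or null (dropped by the final filter). A small sketch of the row shape and the paging arithmetic:

```ts
// Shape of one row returned by the GROUP BY query above. Note that node-postgres
// typically returns COUNT(*) as a string; it is passed through as chunkLength.
type FileGroupRow = { file_id: string; count: string | number };

// LIMIT/OFFSET used by the query: pageNum = 2, pageSize = 10 -> LIMIT 10 OFFSET 10.
const pageNum = 2;
const pageSize = 10;
const offset = (pageNum - 1) * pageSize; // 10
```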

View File

@@ -8,6 +8,7 @@ import { Types } from 'mongoose';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
import { addLog } from '@/service/utils/tools';
import { strIsLink } from '@fastgpt/common/tools/str';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
try {
@@ -19,20 +20,22 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
const gridFs = new GridFSStorage('dataset', userId);
const collection = gridFs.Collection();
await collection.findOneAndUpdate(
{
_id: new Types.ObjectId(id)
},
{
$set: {
...(name && { filename: name }),
...(datasetUsed && { ['metadata.datasetUsed']: datasetUsed })
if (id.length === 24 && !strIsLink(id)) {
await collection.findOneAndUpdate(
{
_id: new Types.ObjectId(id)
},
{
$set: {
...(name && { filename: name }),
...(datasetUsed && { ['metadata.datasetUsed']: datasetUsed })
}
}
}
);
);
}
// data source
updateDatasetSource({
await updateDatasetSource({
fileId: id,
userId,
name