fix: queue (#4485)

Author: Archer
Date: 2025-04-09 13:43:26 +08:00
Committed by: GitHub
Parent: 2dc3cb75fe
Commit: e4629a5c8c
6 changed files with 104 additions and 80 deletions

View File

@@ -62,3 +62,4 @@ curl --location --request POST 'https://{{host}}/api/admin/initv494' \
 ## 🐛 Fixes
 1. When searching apps/knowledge bases, directories could not be clicked to enter the next level.
+2. When retraining, parameters were not initialized correctly.

View File

@@ -8,6 +8,7 @@ import { MILVUS_ADDRESS, PG_ADDRESS, OCEANBASE_ADDRESS } from './constants';
 import { MilvusCtrl } from './milvus/class';
 import { setRedisCache, getRedisCache, delRedisCache, CacheKeyEnum } from '../redis/cache';
 import { throttle } from 'lodash';
+import { retryFn } from '@fastgpt/global/common/system/utils';

 const getVectorObj = () => {
   if (PG_ADDRESS) return new PgVectorCtrl();
@@ -55,22 +56,24 @@ export const insertDatasetDataVector = async ({
   query: string;
   model: EmbeddingModelItemType;
 }) => {
-  const { vectors, tokens } = await getVectorsByText({
-    model,
-    input: query,
-    type: 'db'
-  });
-  const { insertId } = await Vector.insert({
-    ...props,
-    vector: vectors[0]
-  });
-
-  onDelCache(props.teamId);
-
-  return {
-    tokens,
-    insertId
-  };
+  return retryFn(async () => {
+    const { vectors, tokens } = await getVectorsByText({
+      model,
+      input: query,
+      type: 'db'
+    });
+    const { insertId } = await Vector.insert({
+      ...props,
+      vector: vectors[0]
+    });
+
+    onDelCache(props.teamId);
+
+    return {
+      tokens,
+      insertId
+    };
+  });
 };

 export const deleteDatasetDataVector = async (props: DelDatasetVectorCtrlProps) => {
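Note: `retryFn` is imported from `@fastgpt/global/common/system/utils`; its implementation is not part of this diff. For readers outside the repo, a minimal sketch of such a helper (retry count and backoff interval are assumptions, not the library's actual values):

```ts
// Hypothetical sketch of a retry helper in the spirit of retryFn; the
// real one lives in @fastgpt/global/common/system/utils.
export const retryFn = async <T>(fn: () => Promise<T>, retryTimes = 3): Promise<T> => {
  try {
    return await fn();
  } catch (error) {
    if (retryTimes <= 0) return Promise.reject(error);
    // Short pause before the next attempt (interval is an assumption)
    await new Promise((resolve) => setTimeout(resolve, 500));
    return retryFn(fn, retryTimes - 1);
  }
};
```

Wrapping the embedding request and the vector-store insert in one retried unit means a transient failure in either step re-runs the pair together instead of surfacing straight to the training queue.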

View File

@@ -115,6 +115,7 @@ const CollectionChunkForm = ({ form }: { form: UseFormReturn<CollectionChunkForm
   const chunkSplitMode = watch('chunkSplitMode');
   const autoIndexes = watch('autoIndexes');
   const indexSize = watch('indexSize');
+  const imageIndex = watch('imageIndex');

   const trainingModeList = useMemo(() => {
     const list = Object.entries(DatasetCollectionDataProcessModeMap);
@@ -225,7 +226,11 @@ const CollectionChunkForm = ({ form }: { form: UseFormReturn<CollectionChunkForm
         <HStack gap={[3, 7]}>
           <HStack flex={'1'} spacing={1}>
             <MyTooltip label={!feConfigs?.isPlus ? t('common:commercial_function_tip') : ''}>
-              <Checkbox isDisabled={!feConfigs?.isPlus} {...register('autoIndexes')}>
+              <Checkbox
+                isDisabled={!feConfigs?.isPlus}
+                isChecked={autoIndexes}
+                {...register('autoIndexes')}
+              >
                 <FormLabel>{t('dataset:auto_indexes')}</FormLabel>
               </Checkbox>
             </MyTooltip>
@@ -243,6 +248,7 @@ const CollectionChunkForm = ({ form }: { form: UseFormReturn<CollectionChunkForm
             >
               <Checkbox
                 isDisabled={!feConfigs?.isPlus || !datasetDetail?.vlmModel}
+                isChecked={imageIndex}
                 {...register('imageIndex')}
               >
                 <FormLabel>{t('dataset:image_auto_parse')}</FormLabel>
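Note on the `isChecked` additions: spreading `register('autoIndexes')` alone leaves the Chakra `Checkbox` uncontrolled, so values set programmatically (e.g. when re-training initializes the form via `reset`/`setValue`) never reach the rendered state. Binding `isChecked` to the `watch(...)` value makes the box controlled. A minimal sketch of the pattern, with a hypothetical field name:

```tsx
// Sketch: keep a Chakra Checkbox in sync with react-hook-form state
// that changes programmatically. 'enabled' is a hypothetical field.
import { Checkbox, Button } from '@chakra-ui/react';
import { useForm } from 'react-hook-form';

function Example() {
  const { register, watch, reset } = useForm<{ enabled: boolean }>({
    defaultValues: { enabled: false }
  });
  const enabled = watch('enabled');

  return (
    <>
      {/* isChecked forces the rendered state to follow the form state */}
      <Checkbox isChecked={enabled} {...register('enabled')}>
        Enabled
      </Checkbox>
      {/* Without isChecked, this reset would not visually tick the box */}
      <Button onClick={() => reset({ enabled: true })}>Re-init</Button>
    </>
  );
}
```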

View File

@@ -20,17 +20,14 @@ import FormLabel from '@fastgpt/web/components/common/MyBox/FormLabel';
 import QuestionTip from '@fastgpt/web/components/common/MyTooltip/QuestionTip';
 import { shadowLight } from '@fastgpt/web/styles/theme';
 import CollectionChunkForm from '../../Form/CollectionChunkForm';
-import { DatasetCollectionDataProcessModeEnum } from '@fastgpt/global/core/dataset/constants';

 function DataProcess() {
   const { t } = useTranslation();
   const { feConfigs } = useSystemStore();
-  const { goToNext, processParamsForm, chunkSize } = useContextSelector(
-    DatasetImportContext,
-    (v) => v
-  );
-  const { register } = processParamsForm;
+  const { goToNext, processParamsForm } = useContextSelector(DatasetImportContext, (v) => v);
+  const { register, watch } = processParamsForm;
+  const customPdfParseValue = watch('customPdfParse');

   const Title = useCallback(({ title }: { title: string }) => {
     return (
@@ -66,7 +63,7 @@
         >
           {feConfigs.showCustomPdfParse && (
             <HStack spacing={1}>
-              <Checkbox {...register('customPdfParse')}>
+              <Checkbox isChecked={customPdfParseValue} {...register('customPdfParse')}>
                 <FormLabel>{t('dataset:pdf_enhance_parse')}</FormLabel>
               </Checkbox>
              <QuestionTip label={t('dataset:pdf_enhance_parse_tips')} />
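The same controlled-checkbox fix as in `CollectionChunkForm`, here for `customPdfParse`. Worth noting that `watch` subscribes at field granularity, so the component only re-renders for the fields it actually reads. A small sketch (field names other than `customPdfParse` are hypothetical):

```tsx
// Sketch: watch() granularity in react-hook-form.
import { useForm } from 'react-hook-form';

function Example() {
  const { watch } = useForm<{ customPdfParse: boolean; webSelector: string }>();

  const all = watch();                 // re-renders on any field change
  const one = watch('customPdfParse'); // re-renders only for this field
  const pair = watch(['customPdfParse', 'webSelector']); // [boolean, string]

  return <pre>{JSON.stringify({ all, one, pair })}</pre>;
}
```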

View File

@@ -17,7 +17,7 @@ import { splitText2Chunks } from '@fastgpt/global/common/string/textSplitter';
 import { countPromptTokens } from '@fastgpt/service/common/string/tiktoken';

 const formatIndexes = async ({
-  indexes,
+  indexes = [],
   q,
   a = '',
   indexSize,
@@ -66,7 +66,6 @@ const formatIndexes = async ({
     ];
   };

-  indexes = indexes || [];
   // If index not type, set it to custom
   indexes = indexes
     .map((item) => ({
@@ -93,7 +92,7 @@ const formatIndexes = async ({
   indexes = indexes.filter((item) => item.type !== DatasetDataIndexTypeEnum.default);
   indexes.push(...concatDefaultIndexes);

-  // Filter same text
+  // Remove same text
   indexes = indexes.filter(
     (item, index, self) => index === self.findIndex((t) => t.text === item.text)
   );
@@ -101,12 +100,16 @@ const formatIndexes = async ({
   const chekcIndexes = (
     await Promise.all(
       indexes.map(async (item) => {
+        if (item.type === DatasetDataIndexTypeEnum.default) {
+          return item;
+        }
+
         // If oversize tokens, split it
         const tokens = await countPromptTokens(item.text);
-        if (tokens > indexSize) {
+        if (tokens > maxIndexSize) {
           const splitText = splitText2Chunks({
             text: item.text,
-            chunkSize: 512,
+            chunkSize: indexSize,
             maxSize: maxIndexSize
           }).chunks;
           return splitText.map((text) => ({
@@ -114,6 +117,7 @@ const formatIndexes = async ({
           type: item.type
         }));
       }
+
       return item;
     })
   )
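Taken together, these `formatIndexes` hunks change the retraining path in three ways: `indexes` gets a default parameter instead of a late `indexes = indexes || []` patch, default-type indexes skip the token check, and custom indexes are split only when they exceed the model limit `maxIndexSize`, targeting the user-selected `indexSize` rather than a hard-coded 512. A condensed, self-contained sketch of the per-index check (helper signatures are stand-ins for the real imports):

```ts
// Condensed sketch of the per-index check after this change.
declare function countPromptTokens(text: string): Promise<number>;
declare function splitText2Chunks(p: {
  text: string;
  chunkSize: number;
  maxSize: number;
}): { chunks: string[] };

const checkIndex = async (
  item: { type: 'default' | 'custom'; text: string },
  indexSize: number, // user-selected target chunk size
  maxIndexSize: number // hard limit of the embedding model
) => {
  // Default indexes were already sized upstream; pass them through.
  if (item.type === 'default') return item;

  const tokens = await countPromptTokens(item.text);
  // Split only past the model limit (previously: past indexSize)...
  if (tokens > maxIndexSize) {
    // ...aiming each chunk at indexSize (previously: a fixed 512).
    const { chunks } = splitText2Chunks({
      text: item.text,
      chunkSize: indexSize,
      maxSize: maxIndexSize
    });
    return chunks.map((text) => ({ ...item, text }));
  }
  return item;
};
```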
@@ -164,24 +168,30 @@ export async function insertData2Dataset({
   });

   // insert to vector store
-  const result = await Promise.all(
-    newIndexes.map(async (item) => {
-      const result = await insertDatasetDataVector({
-        query: item.text,
-        model: embModel,
-        teamId,
-        datasetId,
-        collectionId
-      });
-      return {
-        tokens: result.tokens,
-        index: {
-          ...item,
-          dataId: result.insertId
-        }
-      };
-    })
-  );
+  const results: {
+    tokens: number;
+    index: {
+      dataId: string;
+      type: `${DatasetDataIndexTypeEnum}`;
+      text: string;
+    };
+  }[] = [];
+  for await (const item of newIndexes) {
+    const result = await insertDatasetDataVector({
+      query: item.text,
+      model: embModel,
+      teamId,
+      datasetId,
+      collectionId
+    });
+    results.push({
+      tokens: result.tokens,
+      index: {
+        ...item,
+        dataId: result.insertId
+      }
+    });
+  }

   // 2. Create mongo data
   const [{ _id }] = await MongoDatasetData.create(
@@ -194,7 +204,7 @@ export async function insertData2Dataset({
         q,
         a,
         chunkIndex,
-        indexes: result.map((item) => item.index)
+        indexes: results.map((item) => item.index)
       }
     ],
     { session, ordered: true }
@@ -216,7 +226,7 @@ export async function insertData2Dataset({
   return {
     insertId: _id,
-    tokens: result.reduce((acc, cur) => acc + cur.tokens, 0)
+    tokens: results.reduce((acc, cur) => acc + cur.tokens, 0)
   };
 }
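This is the heart of the queue fix: the old `Promise.all` fired one embedding request per index simultaneously, so a large chunk could flood the embedding endpoint, while the `for await...of` loop keeps exactly one insert in flight and stops on the first error. (Aside: the elements here are plain objects, so an ordinary `for...of` would behave identically; `for await` merely also tolerates promise elements.) A minimal sketch of the conversion, with hypothetical stand-ins:

```ts
// Sketch: unbounded parallel fan-out vs. sequential insertion.
// insertOne and items are hypothetical stand-ins.
declare function insertOne(item: string): Promise<{ tokens: number }>;
declare const items: string[];

// Before: N concurrent requests start at once.
const parallel = await Promise.all(items.map((item) => insertOne(item)));

// After: one request in flight at a time; the first failure stops the loop.
const sequential: { tokens: number }[] = [];
for (const item of items) {
  sequential.push(await insertOne(item));
}
```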
@@ -303,25 +313,27 @@ export async function updateData2Dataset({
   await mongoData.save();

   // 5. insert vector
-  const insertResult = await Promise.all(
-    patchResult
-      .filter((item) => item.type === 'create' || item.type === 'update')
-      .map(async (item) => {
-        // insert new vector and update dateId
-        const result = await insertDatasetDataVector({
-          query: item.index.text,
-          model: getEmbeddingModel(model),
-          teamId: mongoData.teamId,
-          datasetId: mongoData.datasetId,
-          collectionId: mongoData.collectionId
-        });
-        item.index.dataId = result.insertId;
-        return {
-          tokens: result.tokens
-        };
-      })
-  );
-  const tokens = insertResult.reduce((acc, cur) => acc + cur.tokens, 0);
+  const insertResults: {
+    tokens: number;
+  }[] = [];
+
+  for await (const item of patchResult) {
+    if (item.type === 'delete' || item.type === 'unChange') continue;
+    // insert new vector and update dateId
+    const result = await insertDatasetDataVector({
+      query: item.index.text,
+      model: getEmbeddingModel(model),
+      teamId: mongoData.teamId,
+      datasetId: mongoData.datasetId,
+      collectionId: mongoData.collectionId
+    });
+    item.index.dataId = result.insertId;
+    insertResults.push({
+      tokens: result.tokens
+    });
+  }
+
+  const tokens = insertResults.reduce((acc, cur) => acc + cur.tokens, 0);

   const newIndexes = patchResult
     .filter((item) => item.type !== 'delete')
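Same sequential rewrite for `updateData2Dataset`, with the filter inverted into a `continue`: the old code kept `'create' | 'update'` items, the new loop skips `'delete' | 'unChange'`. The two are equivalent only if those four values exhaust the patch type, which the sketch below makes explicit (the union is assumed from the call sites):

```ts
// Sketch: the skip condition is the complement of the old filter,
// assuming these four values are the whole patch-type union.
type PatchType = 'create' | 'update' | 'delete' | 'unChange';

const keptByOldFilter = (t: PatchType) => t === 'create' || t === 'update';
const skippedByNewLoop = (t: PatchType) => t === 'delete' || t === 'unChange';
// For every t: keptByOldFilter(t) === !skippedByNewLoop(t)
```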

View File

@@ -200,19 +200,24 @@ const rebuildData = async ({
   // update vector, update dataset_data rebuilding status, delete data from training
   // 1. Insert new vector to dataset_data
-  const updateResult = await Promise.all(
-    mongoData.indexes.map(async (index, i) => {
-      const result = await insertDatasetDataVector({
-        query: index.text,
-        model: getEmbeddingModel(trainingData.model),
-        teamId: mongoData.teamId,
-        datasetId: mongoData.datasetId,
-        collectionId: mongoData.collectionId
-      });
-      mongoData.indexes[i].dataId = result.insertId;
-      return result;
-    })
-  );
+  const updateResult: {
+    tokens: number;
+    insertId: string;
+  }[] = [];
+
+  let i = 0;
+  for await (const index of mongoData.indexes) {
+    const result = await insertDatasetDataVector({
+      query: index.text,
+      model: getEmbeddingModel(trainingData.model),
+      teamId: mongoData.teamId,
+      datasetId: mongoData.datasetId,
+      collectionId: mongoData.collectionId
+    });
+    mongoData.indexes[i].dataId = result.insertId;
+    updateResult.push(result);
+    i++;
+  }

   const { tokens } = await mongoSessionRun(async (session) => {
     // 2. Ensure that the training data is deleted after the Mongo update is successful
     await mongoData.save({ session });
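The same sequential pattern closes out the fix in `rebuildData`, using a manual `i` counter so `mongoData.indexes[i].dataId` can be updated in place. An equivalent, slightly tidier idiom is `entries()`, sketched below against the same identifiers used in the diff (a suggestion, not what the commit ships):

```ts
// Sketch: entries() yields [index, element] pairs, replacing the
// manual counter; behavior is otherwise identical to the diff.
for (const [i, index] of mongoData.indexes.entries()) {
  const result = await insertDatasetDataVector({
    query: index.text,
    model: getEmbeddingModel(trainingData.model),
    teamId: mongoData.teamId,
    datasetId: mongoData.datasetId,
    collectionId: mongoData.collectionId
  });
  mongoData.indexes[i].dataId = result.insertId;
  updateResult.push(result);
}
```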