perf: password special chars;feat: llm paragraph;perf: chunk setting params;perf: text splitter worker (#4984)

* perf: password special chars

* feat: llm paragraph;perf: chunk setting params

* perf: text splitter worker

* perf: get rawtext buffer

* fix: test

* fix: test

* doc

* min chunk size
This commit is contained in:
Archer
2025-06-10 00:05:54 +08:00
committed by GitHub
parent 068918a9ee
commit 01ff56b42b
41 changed files with 546 additions and 448 deletions

View File

@@ -5,6 +5,8 @@ import { addLog } from '../../system/log';
import { setCron } from '../../system/cron';
import { checkTimerLock } from '../../system/timerLock/utils';
import { TimerIdEnum } from '../../system/timerLock/constants';
import { gridFsStream2Buffer } from '../../file/gridfs/utils';
import { readRawContentFromBuffer } from '../../../worker/function';
const getGridBucket = () => {
return new connectionMongo.mongo.GridFSBucket(connectionMongo.connection.db!, {
@@ -85,30 +87,27 @@ export const getRawTextBuffer = async (sourceId: string) => {
// Read file content
const downloadStream = gridBucket.openDownloadStream(bufferData._id);
const chunks: Buffer[] = [];
return new Promise<{
text: string;
sourceName: string;
} | null>((resolve, reject) => {
downloadStream.on('data', (chunk) => {
chunks.push(chunk);
});
const fileBuffers = await gridFsStream2Buffer(downloadStream);
downloadStream.on('end', () => {
const buffer = Buffer.concat(chunks);
const text = buffer.toString('utf8');
resolve({
text,
sourceName: bufferData.metadata?.sourceName || ''
});
});
const rawText = await (async () => {
if (fileBuffers.length < 10000000) {
return fileBuffers.toString('utf8');
} else {
return (
await readRawContentFromBuffer({
extension: 'txt',
encoding: 'utf8',
buffer: fileBuffers
})
).rawText;
}
})();
downloadStream.on('error', (error) => {
addLog.error('getRawTextBuffer error', error);
resolve(null);
});
});
return {
text: rawText,
sourceName: bufferData.metadata?.sourceName || ''
};
});
};

View File

@@ -55,13 +55,17 @@ export const createFileFromText = async ({
export const gridFsStream2Buffer = (stream: NodeJS.ReadableStream) => {
return new Promise<Buffer>((resolve, reject) => {
if (!stream.readable) {
return resolve(Buffer.from([]));
}
const chunks: Uint8Array[] = [];
stream.on('data', (chunk) => {
chunks.push(chunk);
});
stream.on('end', () => {
const resultBuffer = Buffer.concat(chunks); // 一次性拼接
const resultBuffer = Buffer.concat(chunks); // One-time splicing
resolve(resultBuffer);
});
stream.on('error', (err) => {

View File

@@ -1,6 +1,5 @@
import { uploadMongoImg } from '../image/controller';
import FormData from 'form-data';
import { WorkerNameEnum, runWorker } from '../../../worker/utils';
import fs from 'fs';
import type { ReadFileResponse } from '../../../worker/readFile/type';
import axios from 'axios';
@@ -9,6 +8,7 @@ import { batchRun } from '@fastgpt/global/common/system/utils';
import { matchMdImg } from '@fastgpt/global/common/string/markdown';
import { createPdfParseUsage } from '../../../support/wallet/usage/controller';
import { useDoc2xServer } from '../../../thirdProvider/doc2x';
import { readRawContentFromBuffer } from '../../../worker/function';
export type readRawTextByLocalFileParams = {
teamId: string;
@@ -63,11 +63,10 @@ export const readRawContentByFileBuffer = async ({
rawText: string;
}> => {
const systemParse = () =>
runWorker<ReadFileResponse>(WorkerNameEnum.readFile, {
readRawContentFromBuffer({
extension,
encoding,
buffer,
teamId
buffer
});
const parsePdfFromCustomService = async (): Promise<ReadFileResponse> => {
const url = global.systemEnv.customPdfParse?.url;

View File

@@ -1,3 +1,4 @@
import { isTestEnv } from '@fastgpt/global/common/system/constants';
import { addLog } from '../../common/system/log';
import type { Model } from 'mongoose';
import mongoose, { Mongoose } from 'mongoose';
@@ -70,7 +71,7 @@ const addCommonMiddleware = (schema: mongoose.Schema) => {
export const getMongoModel = <T>(name: string, schema: mongoose.Schema) => {
if (connectionMongo.models[name]) return connectionMongo.models[name] as Model<T>;
if (process.env.NODE_ENV !== 'test') console.log('Load model======', name);
if (!isTestEnv) console.log('Load model======', name);
addCommonMiddleware(schema);
const model = connectionMongo.model<T>(name, schema);

View File

@@ -32,10 +32,7 @@ import { MongoDatasetDataText } from '../data/dataTextSchema';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { getTrainingModeByCollection } from './utils';
import {
computeChunkSize,
computeChunkSplitter,
computeParagraphChunkDeep,
getAutoIndexSize,
computedCollectionChunkSettings,
getLLMMaxChunkSize
} from '@fastgpt/global/core/dataset/training/utils';
import { DatasetDataIndexTypeEnum } from '@fastgpt/global/core/dataset/data/constants';
@@ -68,31 +65,50 @@ export const createCollectionAndInsertData = async ({
createCollectionParams.autoIndexes = true;
}
const teamId = createCollectionParams.teamId;
const tmbId = createCollectionParams.tmbId;
const formatCreateCollectionParams = computedCollectionChunkSettings({
...createCollectionParams,
llmModel: getLLMModel(dataset.agentModel),
vectorModel: getEmbeddingModel(dataset.vectorModel)
});
const teamId = formatCreateCollectionParams.teamId;
const tmbId = formatCreateCollectionParams.tmbId;
// Set default params
const trainingType =
createCollectionParams.trainingType || DatasetCollectionDataProcessModeEnum.chunk;
const chunkSplitter = computeChunkSplitter(createCollectionParams);
const paragraphChunkDeep = computeParagraphChunkDeep(createCollectionParams);
formatCreateCollectionParams.trainingType || DatasetCollectionDataProcessModeEnum.chunk;
const trainingMode = getTrainingModeByCollection({
trainingType: trainingType,
autoIndexes: createCollectionParams.autoIndexes,
imageIndex: createCollectionParams.imageIndex
autoIndexes: formatCreateCollectionParams.autoIndexes,
imageIndex: formatCreateCollectionParams.imageIndex
});
if (
trainingType === DatasetCollectionDataProcessModeEnum.qa ||
trainingType === DatasetCollectionDataProcessModeEnum.backup
trainingType === DatasetCollectionDataProcessModeEnum.backup ||
trainingType === DatasetCollectionDataProcessModeEnum.template
) {
delete createCollectionParams.chunkTriggerType;
delete createCollectionParams.chunkTriggerMinSize;
delete createCollectionParams.dataEnhanceCollectionName;
delete createCollectionParams.imageIndex;
delete createCollectionParams.autoIndexes;
delete createCollectionParams.indexSize;
delete createCollectionParams.qaPrompt;
delete formatCreateCollectionParams.chunkTriggerType;
delete formatCreateCollectionParams.chunkTriggerMinSize;
delete formatCreateCollectionParams.dataEnhanceCollectionName;
delete formatCreateCollectionParams.imageIndex;
delete formatCreateCollectionParams.autoIndexes;
if (
trainingType === DatasetCollectionDataProcessModeEnum.backup ||
trainingType === DatasetCollectionDataProcessModeEnum.template
) {
delete formatCreateCollectionParams.paragraphChunkAIMode;
delete formatCreateCollectionParams.paragraphChunkDeep;
delete formatCreateCollectionParams.paragraphChunkMinSize;
delete formatCreateCollectionParams.chunkSplitMode;
delete formatCreateCollectionParams.chunkSize;
delete formatCreateCollectionParams.chunkSplitter;
delete formatCreateCollectionParams.indexSize;
}
}
if (trainingType !== DatasetCollectionDataProcessModeEnum.qa) {
delete formatCreateCollectionParams.qaPrompt;
}
// 1. split chunks or create image chunks
@@ -109,30 +125,27 @@ export const createCollectionAndInsertData = async ({
}>;
chunkSize?: number;
indexSize?: number;
} = (() => {
} = await (async () => {
if (rawText) {
const chunkSize = computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
});
// Process text chunks
const chunks = rawText2Chunks({
const chunks = await rawText2Chunks({
rawText,
chunkTriggerType: createCollectionParams.chunkTriggerType,
chunkTriggerMinSize: createCollectionParams.chunkTriggerMinSize,
chunkSize,
paragraphChunkDeep,
paragraphChunkMinSize: createCollectionParams.paragraphChunkMinSize,
chunkTriggerType: formatCreateCollectionParams.chunkTriggerType,
chunkTriggerMinSize: formatCreateCollectionParams.chunkTriggerMinSize,
chunkSize: formatCreateCollectionParams.chunkSize,
paragraphChunkDeep: formatCreateCollectionParams.paragraphChunkDeep,
paragraphChunkMinSize: formatCreateCollectionParams.paragraphChunkMinSize,
maxSize: getLLMMaxChunkSize(getLLMModel(dataset.agentModel)),
overlapRatio: trainingType === DatasetCollectionDataProcessModeEnum.chunk ? 0.2 : 0,
customReg: chunkSplitter ? [chunkSplitter] : [],
customReg: formatCreateCollectionParams.chunkSplitter
? [formatCreateCollectionParams.chunkSplitter]
: [],
backupParse
});
return {
chunks,
chunkSize,
indexSize: createCollectionParams.indexSize ?? getAutoIndexSize(dataset.vectorModel)
chunkSize: formatCreateCollectionParams.chunkSize,
indexSize: formatCreateCollectionParams.indexSize
};
}
@@ -147,12 +160,8 @@ export const createCollectionAndInsertData = async ({
return {
chunks: [],
chunkSize: computeChunkSize({
...createCollectionParams,
trainingType,
llmModel: getLLMModel(dataset.agentModel)
}),
indexSize: createCollectionParams.indexSize ?? getAutoIndexSize(dataset.vectorModel)
chunkSize: formatCreateCollectionParams.chunkSize,
indexSize: formatCreateCollectionParams.indexSize
};
})();
@@ -165,11 +174,9 @@ export const createCollectionAndInsertData = async ({
const fn = async (session: ClientSession) => {
// 3. Create collection
const { _id: collectionId } = await createOneCollection({
...createCollectionParams,
...formatCreateCollectionParams,
trainingType,
paragraphChunkDeep,
chunkSize,
chunkSplitter,
indexSize,
hashRawText: rawText ? hashStr(rawText) : undefined,
@@ -179,7 +186,7 @@ export const createCollectionAndInsertData = async ({
if (!dataset.autoSync && dataset.type === DatasetTypeEnum.websiteDataset) return undefined;
if (
[DatasetCollectionTypeEnum.link, DatasetCollectionTypeEnum.apiFile].includes(
createCollectionParams.type
formatCreateCollectionParams.type
)
) {
return addDays(new Date(), 1);
@@ -195,7 +202,7 @@ export const createCollectionAndInsertData = async ({
const { billId: newBillId } = await createTrainingUsage({
teamId,
tmbId,
appName: createCollectionParams.name,
appName: formatCreateCollectionParams.name,
billSource: UsageSourceEnum.training,
vectorModel: getEmbeddingModel(dataset.vectorModel)?.name,
agentModel: getLLMModel(dataset.agentModel)?.name,
@@ -218,7 +225,7 @@ export const createCollectionAndInsertData = async ({
vlmModel: dataset.vlmModel,
indexSize,
mode: trainingMode,
prompt: createCollectionParams.qaPrompt,
prompt: formatCreateCollectionParams.qaPrompt,
billId: traingBillId,
data: chunks.map((item, index) => ({
...item,

View File

@@ -5,13 +5,14 @@ import {
} from '@fastgpt/global/core/dataset/constants';
import { readFileContentFromMongo } from '../../common/file/gridfs/controller';
import { urlsFetch } from '../../common/string/cheerio';
import { type TextSplitProps, splitText2Chunks } from '@fastgpt/global/common/string/textSplitter';
import { type TextSplitProps } from '@fastgpt/global/common/string/textSplitter';
import axios from 'axios';
import { readRawContentByFileBuffer } from '../../common/file/read/utils';
import { parseFileExtensionFromUrl } from '@fastgpt/global/common/string/tools';
import { getApiDatasetRequest } from './apiDataset';
import Papa from 'papaparse';
import type { ApiDatasetServerType } from '@fastgpt/global/core/dataset/apiDataset/type';
import { text2Chunks } from '../../worker/function';
export const readFileRawTextByUrl = async ({
teamId,
@@ -165,7 +166,7 @@ export const readApiServerFileContent = async ({
});
};
export const rawText2Chunks = ({
export const rawText2Chunks = async ({
rawText,
chunkTriggerType = ChunkTriggerConfigTypeEnum.minSize,
chunkTriggerMinSize = 1000,
@@ -182,12 +183,14 @@ export const rawText2Chunks = ({
backupParse?: boolean;
tableParse?: boolean;
} & TextSplitProps): {
q: string;
a: string;
indexes?: string[];
imageIdList?: string[];
}[] => {
} & TextSplitProps): Promise<
{
q: string;
a: string;
indexes?: string[];
imageIdList?: string[];
}[]
> => {
const parseDatasetBackup2Chunks = (rawText: string) => {
const csvArr = Papa.parse(rawText).data as string[][];
@@ -233,7 +236,7 @@ export const rawText2Chunks = ({
}
}
const { chunks } = splitText2Chunks({
const { chunks } = await text2Chunks({
text: rawText,
chunkSize,
...splitProps

View File

@@ -112,24 +112,15 @@ export async function pushDataListToTrainingQueue({
// format q and a, remove empty char
data = data.filter((item) => {
item.q = simpleText(item.q);
item.a = simpleText(item.a);
item.indexes = item.indexes
?.map((index) => {
return {
...index,
text: simpleText(index.text)
};
})
.filter(Boolean);
const q = item.q || '';
const a = item.a || '';
// filter repeat content
if (!item.imageId && !item.q) {
if (!item.imageId && !q) {
return;
}
const text = item.q + item.a;
const text = q + a;
// Oversize llm tokens
if (text.length > maxToken) {

View File

@@ -8,6 +8,8 @@ import {
type CreateUsageProps
} from '@fastgpt/global/support/wallet/usage/api';
import { i18nT } from '../../../../web/i18n/utils';
import { formatModelChars2Points } from './utils';
import { ModelTypeEnum } from '@fastgpt/global/core/ai/model';
export async function createUsage(data: CreateUsageProps) {
try {
@@ -67,6 +69,14 @@ export const createChatUsage = ({
return { totalPoints };
};
export type DatasetTrainingMode = 'paragraph' | 'qa' | 'autoIndex' | 'imageIndex' | 'imageParse';
export const datasetTrainingUsageIndexMap: Record<DatasetTrainingMode, number> = {
paragraph: 1,
qa: 2,
autoIndex: 3,
imageIndex: 4,
imageParse: 5
};
export const createTrainingUsage = async ({
teamId,
tmbId,
@@ -108,6 +118,13 @@ export const createTrainingUsage = async ({
: []),
...(agentModel
? [
{
moduleName: i18nT('account_usage:llm_paragraph'),
model: agentModel,
amount: 0,
inputTokens: 0,
outputTokens: 0
},
{
moduleName: i18nT('account_usage:qa'),
model: agentModel,
@@ -126,6 +143,13 @@ export const createTrainingUsage = async ({
: []),
...(vllmModel
? [
{
moduleName: i18nT('account_usage:image_index'),
model: vllmModel,
amount: 0,
inputTokens: 0,
outputTokens: 0
},
{
moduleName: i18nT('account_usage:image_parse'),
model: vllmModel,
@@ -171,3 +195,43 @@ export const createPdfParseUsage = async ({
]
});
};
export const pushLLMTrainingUsage = async ({
teamId,
tmbId,
model,
inputTokens,
outputTokens,
billId,
mode
}: {
teamId: string;
tmbId: string;
model: string;
inputTokens: number;
outputTokens: number;
billId: string;
mode: DatasetTrainingMode;
}) => {
const index = datasetTrainingUsageIndexMap[mode];
// Compute points
const { totalPoints } = formatModelChars2Points({
model,
modelType: ModelTypeEnum.llm,
inputTokens,
outputTokens
});
concatUsage({
billId,
teamId,
tmbId,
totalPoints,
inputTokens,
outputTokens,
listIndex: index
});
return { totalPoints };
};

View File

@@ -0,0 +1,18 @@
import type { MessagePort } from 'worker_threads';
export const workerResponse = ({
parentPort,
status,
data
}: {
parentPort: MessagePort | null;
status: 'success' | 'error';
data: any;
}) => {
parentPort?.postMessage({
type: status,
data: data
});
process.exit();
};

View File

@@ -0,0 +1,24 @@
import {
splitText2Chunks,
type SplitProps,
type SplitResponse
} from '@fastgpt/global/common/string/textSplitter';
import { runWorker, WorkerNameEnum } from './utils';
import type { ReadFileResponse } from './readFile/type';
import { isTestEnv } from '@fastgpt/global/common/system/constants';
export const text2Chunks = (props: SplitProps) => {
// Test env, not run worker
if (isTestEnv) {
return splitText2Chunks(props);
}
return runWorker<SplitResponse>(WorkerNameEnum.text2Chunks, props);
};
export const readRawContentFromBuffer = (props: {
extension: string;
encoding: string;
buffer: Buffer;
}) => {
return runWorker<ReadFileResponse>(WorkerNameEnum.readFile, props);
};

View File

@@ -1,19 +1,21 @@
import { parentPort } from 'worker_threads';
import { html2md } from './utils';
import { workerResponse } from '../controller';
parentPort?.on('message', (params: { html: string }) => {
try {
const md = html2md(params?.html || '');
parentPort?.postMessage({
type: 'success',
workerResponse({
parentPort,
status: 'success',
data: md
});
} catch (error) {
parentPort?.postMessage({
type: 'error',
workerResponse({
parentPort,
status: 'error',
data: error
});
}
process.exit();
});

View File

@@ -7,6 +7,7 @@ import { readDocsFile } from './extension/docx';
import { readPptxRawText } from './extension/pptx';
import { readXlsxRawText } from './extension/xlsx';
import { readCsvRawText } from './extension/csv';
import { workerResponse } from '../controller';
parentPort?.on('message', async (props: ReadRawTextProps<Uint8Array>) => {
const read = async (params: ReadRawTextByBuffer) => {
@@ -41,17 +42,16 @@ parentPort?.on('message', async (props: ReadRawTextProps<Uint8Array>) => {
};
try {
parentPort?.postMessage({
type: 'success',
workerResponse({
parentPort,
status: 'success',
data: await read(newProps)
});
} catch (error) {
console.log(error);
parentPort?.postMessage({
type: 'error',
workerResponse({
parentPort,
status: 'error',
data: error
});
}
process.exit();
});

View File

@@ -0,0 +1,14 @@
import { parentPort } from 'worker_threads';
import type { SplitProps } from '@fastgpt/global/common/string/textSplitter';
import { splitText2Chunks } from '@fastgpt/global/common/string/textSplitter';
import { workerResponse } from '../controller';
parentPort?.on('message', async (props: SplitProps) => {
const result = splitText2Chunks(props);
workerResponse({
parentPort,
status: 'success',
data: result
});
});

View File

@@ -6,7 +6,8 @@ export enum WorkerNameEnum {
readFile = 'readFile',
htmlStr2Md = 'htmlStr2Md',
countGptMessagesTokens = 'countGptMessagesTokens',
systemPluginRun = 'systemPluginRun'
systemPluginRun = 'systemPluginRun',
text2Chunks = 'text2Chunks'
}
export const getSafeEnv = () => {