Files
FastGPT/packages/service/core/dataset/training/controller.ts
T
Archer 2e18f1ebc2 next 15 (#6457)
* next 15

* lock

* feat: rename .d.ts to .ts for Next 15 compatibility

- Rename 104 .d.ts files to .ts (Next 15 no longer supports .d.ts in src)
- Remove 5 redundant .d.ts files that had .ts counterparts
- Update all import paths: remove .d suffix from 100 import statements
- Update tsconfig.json include patterns across all packages
- Add pnpm overrides to unify react@18.3.1 across monorepo
- Fix react version mismatch (packages/global and packages/service were resolving to react@19.1.1)

* fix: resolve 61 TypeScript errors from .d.ts to .ts migration

- Fix broken imports using non-relative module paths (e.g. 'support/user/team/type' → relative paths)
- Remove unused/dead imports referencing deleted modules
- Fix duplicate identifiers (show_emptyChat, concatMd, TrainingModeEnum)
- Add missing imports (BoxProps, GroupMemberRole, UsageSourceEnum, dashboard_evaluation)
- Fix generic type constraints (OutLinkEditType, createShareChat)
- Replace removed types with correct alternatives (ChatModelItemType → LLMModelItemType)
- Delete 5 dead code files with 0 references
- Add global type declaration for countTrackQueue
- Fix nullable type narrowing (sourceMember, ParentIdType, optional app fields)

* refactor: replace as ClientSession assertion with proper type narrowing via Omit & intersection

* fix: remove experimental.workerThreads to fix DataCloneError in Next 15 static generation

Next 15 worker threads attempt to structuredClone the config object,
which fails on the webpack function. workerThreads is not needed for
the build to work correctly.

* Update document/content/docs/upgrading/4-14/4148.mdx

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

* fix: ts

* update next config

* update next

* fix: dockerfile

* fix: comment

---------

Co-authored-by: Archer <c121914yu@gmail.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2026-02-25 18:28:16 +08:00

243 lines
6.2 KiB
TypeScript

import { MongoDatasetTraining } from './schema';
import type {
PushDatasetDataChunkProps,
PushDatasetDataResponse
} from '@fastgpt/global/core/dataset/api';
import { TrainingModeEnum } from '@fastgpt/global/core/dataset/constants';
import { type ClientSession } from '../../../common/mongo';
import { getLLMModel, getEmbeddingModel, getVlmModel } from '../../ai/model';
import { mongoSessionRun } from '../../../common/mongo/sessionRun';
import { i18nT } from '../../../../web/i18n/utils';
import { getLLMMaxChunkSize } from '../../../../global/core/dataset/training/utils';
import { retryFn } from '@fastgpt/global/common/system/utils';
import { getLogger, LogCategories } from '../../../common/logger';
const logger = getLogger(LogCategories.MODULE.DATASET.TRAINING);
export const lockTrainingDataByTeamId = async (teamId: string): Promise<any> => {
try {
await MongoDatasetTraining.updateMany(
{
teamId
},
{
lockTime: new Date('2999/5/5')
}
);
} catch (error) {}
};
/**
 * Push a list of dataset chunks onto the training queue.
 *
 * Steps:
 *  1. Validate that the embedding/LLM (and, for image modes, VLM) models are
 *     configured; rejects with an i18n key when not.
 *  2. Resolve the effective model, max token budget and weight for the mode.
 *  3. Filter out empty chunks and chunks whose q+a text exceeds the token
 *     budget.
 *  4. Batch-insert the remaining chunks. Large payloads (> 10,000 items) are
 *     split across multiple transactions to avoid mongo transaction timeouts;
 *     small payloads use the caller's session or a single new one.
 *
 * @returns `{ insertLen }` — the number of records actually inserted.
 * @throws Rejects with an i18n message key when a required model is missing,
 *         or a plain string for an unknown training mode.
 */
export async function pushDataListToTrainingQueue({
  teamId,
  tmbId,
  datasetId,
  collectionId,
  agentModel,
  vectorModel,
  vlmModel,
  data,
  billId,
  mode = TrainingModeEnum.chunk,
  indexSize,
  session
}: {
  teamId: string;
  tmbId: string;
  datasetId: string;
  collectionId: string;
  data: PushDatasetDataChunkProps[];
  mode?: TrainingModeEnum;
  agentModel: string;
  vectorModel: string;
  vlmModel?: string;
  indexSize?: number;
  billId?: string;
  session?: ClientSession;
}): Promise<PushDatasetDataResponse> {
  const vectorModelData = getEmbeddingModel(vectorModel);
  if (!vectorModelData) {
    return Promise.reject(i18nT('common:error_embedding_not_config'));
  }
  const agentModelData = getLLMModel(agentModel);
  if (!agentModelData) {
    return Promise.reject(i18nT('common:error_llm_not_config'));
  }

  // Resolve model/maxToken/weight per training mode.
  const { model, maxToken, weight } = await (async () => {
    if (mode === TrainingModeEnum.chunk) {
      // Chunk mode embeds directly: no LLM token limit applies.
      return {
        maxToken: Infinity,
        model: vectorModelData.model,
        weight: vectorModelData.weight
      };
    }
    if (mode === TrainingModeEnum.qa || mode === TrainingModeEnum.auto) {
      return {
        maxToken: getLLMMaxChunkSize(agentModelData),
        model: agentModelData.model,
        weight: 0
      };
    }
    if (mode === TrainingModeEnum.image || mode === TrainingModeEnum.imageParse) {
      const vllmModelData = getVlmModel(vlmModel);
      if (!vllmModelData) {
        return Promise.reject(i18nT('common:error_vlm_not_config'));
      }
      return {
        maxToken: getLLMMaxChunkSize(vllmModelData),
        model: vllmModelData.model,
        weight: 0
      };
    }
    return Promise.reject(`Training mode "${mode}" is invalid`);
  })();

  // Drop chunks with no content (unless they carry an image) and chunks whose
  // combined q+a text exceeds the token budget for the chosen model.
  data = data.filter((item) => {
    const q = item.q || '';
    const a = item.a || '';
    if (!item.imageId && !q) {
      return false;
    }
    const text = q + a;
    if (text.length > maxToken) {
      return false;
    }
    return true;
  });

  // Insert data to db.
  const batchSize = 500; // Batch insert size
  const maxBatchesPerTransaction = 20; // Every session can insert at most 20 batches

  // Insert `dataToInsert` in `batchSize` slices inside one session; returns
  // the total number of inserted documents.
  const insertDataIterative = async (
    dataToInsert: typeof data,
    session: ClientSession
  ): Promise<number> => {
    let insertedCount = 0;
    for (let i = 0; i < dataToInsert.length; i += batchSize) {
      const batch = dataToInsert.slice(i, i + batchSize);
      if (batch.length === 0) continue;
      const result = await MongoDatasetTraining.insertMany(
        batch.map((item) => ({
          teamId,
          tmbId,
          datasetId,
          collectionId,
          billId,
          mode,
          ...(item.q && { q: item.q }),
          ...(item.a && { a: item.a }),
          ...(item.imageId && { imageId: item.imageId }),
          chunkIndex: item.chunkIndex ?? 0,
          indexSize,
          weight: weight ?? 0,
          indexes: item.indexes,
          retryCount: 5
        })),
        {
          session,
          ordered: true, // ordered: any failure stops immediately and the transaction rolls back
          rawResult: true,
          includeResultMetadata: false
        }
      );
      // With ordered: true, a successful call always inserts the full batch.
      insertedCount += result.insertedCount;
      logger.debug('Training data insert progress', {
        insertedCount,
        total: dataToInsert.length
      });
    }
    return insertedCount;
  };

  // Large payloads are split into per-chunk transactions to avoid mongo
  // transaction timeouts.
  const chunkSize = maxBatchesPerTransaction * batchSize; // 10,000 items
  const start = Date.now();
  if (data.length > chunkSize) {
    logger.info('Large dataset detected, using chunked transactions', {
      itemCount: data.length,
      chunkSize
    });
    let totalInserted = 0;
    for (let i = 0; i < data.length; i += chunkSize) {
      const chunk = data.slice(i, i + chunkSize);
      await retryFn(async () => {
        const inserted = await mongoSessionRun(async (chunkSession) => {
          return insertDataIterative(chunk, chunkSession);
        });
        totalInserted += inserted;
      });
    }
    logger.info('Chunked transactions completed', { durationMs: Date.now() - start });
    return { insertLen: totalInserted };
  }

  // Small payloads: reuse the caller's session when provided, otherwise open
  // a single new one.
  if (session) {
    const insertedCount = await insertDataIterative(data, session);
    logger.info('Single transaction completed', { durationMs: Date.now() - start });
    return { insertLen: insertedCount };
  } else {
    const insertedCount = await mongoSessionRun(async (session) => {
      return insertDataIterative(data, session);
    });
    logger.info('Single transaction completed', { durationMs: Date.now() - start });
    return { insertLen: insertedCount };
  }
}
/**
 * Enqueue a single "parse" job for a dataset collection: creates one training
 * record with mode=parse inside the caller-provided session.
 */
export const pushDatasetToParseQueue = async ({
  teamId,
  tmbId,
  datasetId,
  collectionId,
  billId,
  session
}: {
  teamId: string;
  tmbId: string;
  datasetId: string;
  collectionId: string;
  billId: string;
  session: ClientSession;
}) => {
  // Build the queue record first, then create it transactionally.
  const parseJob = {
    teamId,
    tmbId,
    datasetId,
    collectionId,
    billId,
    mode: TrainingModeEnum.parse
  };
  await MongoDatasetTraining.create([parseJob], { session, ordered: true });
};