perf: init token worker (#4726)

* perf: init token worker

* init worker

* preload worker

* preload worker

* remove invalid code
This commit is contained in:
Archer
2025-04-29 16:00:35 +08:00
committed by GitHub
parent 5e3ec4d6f3
commit 4b8db293ce
3 changed files with 44 additions and 15 deletions

View File

@@ -0,0 +1,24 @@
import { getWorkerController, WorkerNameEnum } from './utils';
export const preLoadWorker = async () => {
const max = Number(global.systemEnv?.tokenWorkers || 30);
const workerController = getWorkerController({
name: WorkerNameEnum.countGptMessagesTokens,
maxReservedThreads: max
});
for await (const item of new Array(max).fill(0)) {
const worker = workerController.createWorker();
await workerController.run({
workerId: worker.id,
messages: [
{
role: 'user',
content: '1'
}
]
});
console.log(`Preload worker ${workerController.workerQueue.length}`);
}
console.log('Preload worker success');
};

View File

@@ -87,9 +87,15 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
this.maxReservedThreads = maxReservedThreads;
}
runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
private runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
// Get idle worker or create a new worker
const runningWorker = (() => {
// @ts-ignore
if (data.workerId) {
// @ts-ignore
const worker = this.workerQueue.find((item) => item.id === data.workerId);
if (worker) return worker;
}
const worker = this.workerQueue.find((item) => item.status === 'idle');
if (worker) return worker;
@@ -157,23 +163,15 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
// watch response
worker.on('message', ({ id, type, data }: WorkerResponse<Response>) => {
// Run callback
const workerItem = this.workerQueue.find((item) => item.id === id);
if (!workerItem) {
addLog.warn('Invalid worker', { id, type, data });
return;
}
if (type === 'success') {
workerItem.resolve(data);
item.resolve(data);
} else if (type === 'error') {
workerItem.reject(data);
item.reject(data);
}
// Clear timeout timer and update worker status
clearTimeout(workerItem.timeoutId);
workerItem.status = 'idle';
clearTimeout(item.timeoutId);
item.status = 'idle';
});
// Worker error, terminate and delete it.Un catch error)
@@ -191,7 +189,7 @@ export class WorkerPool<Props = Record<string, any>, Response = any> {
return item;
}
deleteWorker(workerId: string) {
private deleteWorker(workerId: string) {
const item = this.workerQueue.find((item) => item.id === workerId);
if (item) {
item.reject?.('error');

View File

@@ -24,6 +24,8 @@ import {
getProApiDatasetFilePreviewUrlRequest
} from '@/service/core/dataset/apiDataset/controller';
import { isProVersion } from './constants';
import { countPromptTokens } from '@fastgpt/service/common/string/tiktoken';
import { preLoadWorker } from '../../../../../../packages/service/worker/preload';
export const readConfigData = async (name: string) => {
const splitName = name.split('.');
@@ -89,7 +91,12 @@ export function initGlobalVariables() {
/* Init system data(Need to connected db). It only needs to run once */
export async function getInitConfig() {
return Promise.all([initSystemConfig(), getSystemVersion(), loadSystemModels()]);
await Promise.all([initSystemConfig(), getSystemVersion(), loadSystemModels()]);
try {
await preLoadWorker();
} catch (error) {
console.error('Preload worker error', error);
}
}
const defaultFeConfigs: FastGPTFeConfigsType = {