From 4b8db293ce677f2f411388383f9eb842b2076eeb Mon Sep 17 00:00:00 2001 From: Archer <545436317@qq.com> Date: Tue, 29 Apr 2025 16:00:35 +0800 Subject: [PATCH] perf: init token worker (#4726) * perf: init token worker * init worker * preload worker * preload worker * remove invalid code --- packages/service/worker/preload.ts | 24 +++++++++++++++++ packages/service/worker/utils.ts | 26 +++++++++---------- .../app/src/service/common/system/index.ts | 9 ++++++- 3 files changed, 44 insertions(+), 15 deletions(-) create mode 100644 packages/service/worker/preload.ts diff --git a/packages/service/worker/preload.ts b/packages/service/worker/preload.ts new file mode 100644 index 000000000..d2ee467b8 --- /dev/null +++ b/packages/service/worker/preload.ts @@ -0,0 +1,24 @@ +import { getWorkerController, WorkerNameEnum } from './utils'; + +export const preLoadWorker = async () => { + const max = Number(global.systemEnv?.tokenWorkers || 30); + const workerController = getWorkerController({ + name: WorkerNameEnum.countGptMessagesTokens, + maxReservedThreads: max + }); + + for await (const item of new Array(max).fill(0)) { + const worker = workerController.createWorker(); + await workerController.run({ + workerId: worker.id, + messages: [ + { + role: 'user', + content: '1' + } + ] + }); + console.log(`Preload worker ${workerController.workerQueue.length}`); + } + console.log('Preload worker success'); +}; diff --git a/packages/service/worker/utils.ts b/packages/service/worker/utils.ts index 850c60099..b7508def0 100644 --- a/packages/service/worker/utils.ts +++ b/packages/service/worker/utils.ts @@ -87,9 +87,15 @@ export class WorkerPool, Response = any> { this.maxReservedThreads = maxReservedThreads; } - runTask({ data, resolve, reject }: WorkerRunTaskType) { + private runTask({ data, resolve, reject }: WorkerRunTaskType) { // Get idle worker or create a new worker const runningWorker = (() => { + // @ts-ignore + if (data.workerId) { + // @ts-ignore + const worker = this.workerQueue.find((item) => item.id === data.workerId); + if (worker) return worker; + } const worker = this.workerQueue.find((item) => item.status === 'idle'); if (worker) return worker; @@ -157,23 +163,15 @@ export class WorkerPool, Response = any> { // watch response worker.on('message', ({ id, type, data }: WorkerResponse) => { - // Run callback - const workerItem = this.workerQueue.find((item) => item.id === id); - - if (!workerItem) { - addLog.warn('Invalid worker', { id, type, data }); - return; - } - if (type === 'success') { - workerItem.resolve(data); + item.resolve(data); } else if (type === 'error') { - workerItem.reject(data); + item.reject(data); } // Clear timeout timer and update worker status - clearTimeout(workerItem.timeoutId); - workerItem.status = 'idle'; + clearTimeout(item.timeoutId); + item.status = 'idle'; }); // Worker error, terminate and delete it.(Un catch error) @@ -191,7 +189,7 @@ export class WorkerPool, Response = any> { return item; } - deleteWorker(workerId: string) { + private deleteWorker(workerId: string) { const item = this.workerQueue.find((item) => item.id === workerId); if (item) { item.reject?.('error'); diff --git a/projects/app/src/service/common/system/index.ts b/projects/app/src/service/common/system/index.ts index 2f7f16662..3513d1980 100644 --- a/projects/app/src/service/common/system/index.ts +++ b/projects/app/src/service/common/system/index.ts @@ -24,6 +24,8 @@ import { getProApiDatasetFilePreviewUrlRequest } from '@/service/core/dataset/apiDataset/controller'; import { isProVersion } from './constants'; +import { countPromptTokens } from '@fastgpt/service/common/string/tiktoken'; +import { preLoadWorker } from '../../../../../../packages/service/worker/preload'; export const readConfigData = async (name: string) => { const splitName = name.split('.'); @@ -89,7 +91,12 @@ export function initGlobalVariables() { /* Init system data(Need to connected db). It only needs to run once */ export async function getInitConfig() { - return Promise.all([initSystemConfig(), getSystemVersion(), loadSystemModels()]); + await Promise.all([initSystemConfig(), getSystemVersion(), loadSystemModels()]); + try { + await preLoadWorker(); + } catch (error) { + console.error('Preload worker error', error); + } } const defaultFeConfigs: FastGPTFeConfigsType = {