Perf system plugin and worker (#2126)

* perf: worker pool

* perf: worker register

* perf: worker controller

* perf: system plugin worker

* perf: system plugin worker

* perf: worker

* perf: worker

* worker timeout

* perf: copy icon
This commit is contained in:
Archer
2024-07-23 11:23:42 +08:00
committed by GitHub
parent a4787bce5c
commit e99c91aaa6
34 changed files with 433 additions and 235 deletions

View File

@@ -6,7 +6,7 @@ import { addHours } from 'date-fns';
import { WorkerNameEnum, runWorker } from '../../../worker/utils';
import fs from 'fs';
import { detectFileEncoding } from '@fastgpt/global/common/file/tools';
import { ReadFileResponse } from '../../../worker/file/type';
import type { ReadFileResponse } from '../../../worker/readFile/type';
export const initMarkdownText = ({
teamId,

View File

@@ -6,95 +6,41 @@ import {
} from '@fastgpt/global/core/ai/type';
import { chats2GPTMessages } from '@fastgpt/global/core/chat/adapt';
import { ChatItemType } from '@fastgpt/global/core/chat/type';
import { WorkerNameEnum, getWorker } from '../../../worker/utils';
import { WorkerNameEnum, getWorkerController } from '../../../worker/utils';
import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
import { getNanoid } from '@fastgpt/global/common/string/tools';
import { addLog } from '../../system/log';
// Legacy tiktoken worker registry — NOTE(review): this is the removed side of this
// diff; it appears superseded by WorkerPool/getWorkerController in the same change.
// Returns an entry from the global pool of token-counting workers, creating new
// workers until the configured cap is reached, then picking an existing one at random.
export const getTiktokenWorker = () => {
  // Max resident workers; falls back to 20 when not configured.
  const maxWorkers = global.systemEnv?.tokenWorkers || 20;
  if (!global.tiktokenWorkers) {
    global.tiktokenWorkers = [];
  }
  // Pool is full: reuse a random existing worker (tasks are multiplexed by task id).
  if (global.tiktokenWorkers.length >= maxWorkers) {
    return global.tiktokenWorkers[Math.floor(Math.random() * global.tiktokenWorkers.length)];
  }
  const worker = getWorker(WorkerNameEnum.countGptMessagesTokens);
  // push() returns the new array length, so `i - 1` is this entry's index.
  const i = global.tiktokenWorkers.push({
    index: global.tiktokenWorkers.length,
    worker,
    callbackMap: {}
  });
  // Route each worker reply to the callback registered under its task id, then
  // delete the callback entry so callbackMap does not leak.
  worker.on('message', ({ id, data }: { id: string; data: number }) => {
    const callback = global.tiktokenWorkers[i - 1]?.callbackMap?.[id];
    if (callback) {
      callback?.(data);
      delete global.tiktokenWorkers[i - 1].callbackMap[id];
    }
  });
  return global.tiktokenWorkers[i - 1];
};
export const countGptMessagesTokens = (
export const countGptMessagesTokens = async (
messages: ChatCompletionMessageParam[],
tools?: ChatCompletionTool[],
functionCall?: ChatCompletionCreateParams.Function[]
) => {
return new Promise<number>(async (resolve) => {
try {
const start = Date.now();
try {
const workerController = getWorkerController<
{
messages: ChatCompletionMessageParam[];
tools?: ChatCompletionTool[];
functionCall?: ChatCompletionCreateParams.Function[];
},
number
>({
name: WorkerNameEnum.countGptMessagesTokens,
maxReservedThreads: global.systemEnv?.tokenWorkers || 20
});
const { worker, callbackMap } = getTiktokenWorker();
const total = await workerController.run({ messages, tools, functionCall });
const id = getNanoid();
const timer = setTimeout(() => {
console.log('Count token Time out');
resolve(
messages.reduce((sum, item) => {
if (item.content) {
return sum + item.content.length * 0.5;
}
return sum;
}, 0)
);
delete callbackMap[id];
}, 60000);
callbackMap[id] = (data) => {
// 检测是否有内存泄漏
addLog.debug(`Count token time: ${Date.now() - start}, token: ${data}`);
// console.log(process.memoryUsage());
resolve(data);
clearTimeout(timer);
};
// 可以进一步优化(传递100w token数据,实际需要300ms,较慢)
worker.postMessage({
id,
messages,
tools,
functionCall
});
} catch (error) {
addLog.error('Count token error', error);
const total = messages.reduce((sum, item) => {
if (item.content) {
return sum + item.content.length;
}
return sum;
}, 0);
resolve(total);
}
});
return total;
} catch (error) {
addLog.error('Count token error', error);
const total = messages.reduce((sum, item) => {
if (item.content) {
return sum + item.content.length * 0.5;
}
return sum;
}, 0);
return total;
}
};
export const countMessagesTokens = (messages: ChatItemType[]) => {

View File

@@ -30,7 +30,7 @@ const { LOG_LEVEL, STORE_LOG_LEVEL } = (() => {
const STORE_LOG_LEVEL = (process.env.STORE_LOG_LEVEL || '').toLocaleLowerCase();
return {
LOG_LEVEL: envLogLevelMap[LOG_LEVEL] || LogLevelEnum.info,
LOG_LEVEL: envLogLevelMap[LOG_LEVEL] ?? LogLevelEnum.info,
STORE_LOG_LEVEL: envLogLevelMap[STORE_LOG_LEVEL] ?? 99
};
})();

View File

@@ -441,11 +441,18 @@ export async function searchDatasetData(props: SearchDatasetDataProps) {
// token filter
const filterMaxTokensResult = await (async () => {
const tokensScoreFilter = await Promise.all(
scoreFilter.map(async (item) => ({
...item,
tokens: await countPromptTokens(item.q + item.a)
}))
);
const results: SearchDataResponseItemType[] = [];
let totalTokens = 0;
for await (const item of scoreFilter) {
totalTokens += await countPromptTokens(item.q + item.a);
for await (const item of tokensScoreFilter) {
totalTokens += item.tokens;
if (totalTokens > maxTokens + 500) {
break;

View File

@@ -122,7 +122,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
let chatAssistantResponse: AIChatItemValueItemType[] = []; // The value will be returned to the user
let chatNodeUsages: ChatNodeUsageType[] = [];
let toolRunResponse: ToolRunResponseItemType;
let runningTime = Date.now();
let debugNextStepRunNodes: RuntimeNodeItemType[] = [];
/* Store special response field */
@@ -142,13 +141,8 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
[DispatchNodeResponseKeyEnum.assistantResponses]?: AIChatItemValueItemType[]; // tool module, save the response value
}
) {
const time = Date.now();
if (responseData) {
chatResponses.push({
...responseData,
runningTime: +((time - runningTime) / 1000).toFixed(2)
});
chatResponses.push(responseData);
}
if (nodeDispatchUsages) {
chatNodeUsages = chatNodeUsages.concat(nodeDispatchUsages);
@@ -175,8 +169,6 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
});
}
}
runningTime = time;
}
/* Pass the output of the module to the next stage */
function nodeOutput(
@@ -328,6 +320,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
status: 'running'
});
}
const startTime = Date.now();
// get node running params
const params = getNodeRunParams(node);
@@ -362,6 +355,7 @@ export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowRespons
nodeId: node.nodeId,
moduleName: node.name,
moduleType: node.flowNodeType,
runningTime: +((Date.now() - startTime) / 1000).toFixed(2),
...dispatchRes[DispatchNodeResponseKeyEnum.nodeResponse]
};
})();

View File

@@ -7,6 +7,7 @@ import {
LLMModelItemType
} from '@fastgpt/global/core/ai/model.d';
import { SubPlanType } from '@fastgpt/global/support/wallet/sub/type';
import { WorkerNameEnum, WorkerPool } from './worker/utils';
import { Worker } from 'worker_threads';
declare global {
@@ -20,12 +21,8 @@ declare global {
var whisperModel: WhisperModelType;
var reRankModels: ReRankModelItemType[];
var tiktokenWorkers: {
index: number;
worker: Worker;
callbackMap: Record<string, (e: number) => void>;
}[];
var systemLoadedGlobalVariables: boolean;
var systemLoadedGlobalConfig: boolean;
var workerPoll: Record<WorkerNameEnum, WorkerPool>;
}

View File

@@ -59,16 +59,16 @@ export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise<Read
const loadingTask = pdfjs.getDocument(buffer.buffer);
const doc = await loadingTask.promise;
const pageTextPromises = [];
for (let pageNo = 1; pageNo <= doc.numPages; pageNo++) {
pageTextPromises.push(readPDFPage(doc, pageNo));
// Avoid OOM.
let result = '';
const pageArr = Array.from({ length: doc.numPages }, (_, i) => i + 1);
for await (const pageNo of pageArr) {
result += await readPDFPage(doc, pageNo);
}
const pageTexts = await Promise.all(pageTextPromises);
loadingTask.destroy();
return {
rawText: pageTexts.join('')
rawText: result
};
};

View File

@@ -1,19 +1,32 @@
import { Worker } from 'worker_threads';
import path from 'path';
import { addLog } from '../common/system/log';
export enum WorkerNameEnum {
readFile = 'readFile',
htmlStr2Md = 'htmlStr2Md',
countGptMessagesTokens = 'countGptMessagesTokens'
countGptMessagesTokens = 'countGptMessagesTokens',
systemPluginRun = 'systemPluginRun'
}
/**
 * Build the minimal environment object handed to spawned workers.
 * Only the logging and runtime-mode variables are forwarded — passed as the
 * Worker `env` option so child threads receive just these three variables.
 */
export const getSafeEnv = () => {
  const { LOG_LEVEL, STORE_LOG_LEVEL, NODE_ENV } = process.env;

  return {
    LOG_LEVEL,
    STORE_LOG_LEVEL,
    NODE_ENV
  };
};
/**
 * Spawn a worker thread for the named script.
 * Worker scripts are bundled to `.next/server/worker/<name>.js`; the child is
 * given only the sanitized env from getSafeEnv(), not the full parent env.
 *
 * Fix: removed the stale unreachable `return new Worker(workerPath);` left over
 * from the diff rendering — only the env-passing form is kept.
 */
export const getWorker = (name: WorkerNameEnum) => {
  const workerPath = path.join(process.cwd(), '.next', 'server', 'worker', `${name}.js`);
  return new Worker(workerPath, {
    env: getSafeEnv()
  });
};
export const runWorker = <T = any>(name: WorkerNameEnum, params?: Record<string, any>) => {
return new Promise<T>((resolve, reject) => {
const start = Date.now();
const worker = getWorker(name);
worker.postMessage(params);
@@ -22,6 +35,11 @@ export const runWorker = <T = any>(name: WorkerNameEnum, params?: Record<string,
if (msg.type === 'error') return reject(msg.data);
resolve(msg.data);
const time = Date.now() - start;
if (time > 1000) {
addLog.info(`Worker ${name} run time: ${time}ms`);
}
});
worker.on('error', (err) => {
@@ -34,3 +52,169 @@ export const runWorker = <T = any>(name: WorkerNameEnum, params?: Record<string,
});
});
};
// A queued task waiting for a free worker: its payload plus the promise
// callbacks that will settle the caller's `run()` promise.
type WorkerRunTaskType<T> = { data: T; resolve: (e: any) => void; reject: (e: any) => void };

// Bookkeeping for one worker thread in the pool.
type WorkerQueueItem = {
  id: string;
  worker: Worker;
  // 'running' while a task is in flight; 'idle' when available for reuse.
  status: 'running' | 'idle';
  // Timestamp of the most recent task start.
  taskTime: number;
  // Timer that rejects the current task if the worker does not answer in time.
  timeoutId?: NodeJS.Timeout;
  // Promise callbacks of the task currently bound to this worker.
  resolve: (e: any) => void;
  reject: (e: any) => void;
};

// Message shape posted back from a worker: worker id plus success/error payload.
type WorkerResponse<T = any> = {
  id: string;
  type: 'success' | 'error';
  data: T;
};
/*
  Multi-threaded task manager.
  * Only one instance is needed globally.
  * A maximum number of reserved threads can be configured (these are never destroyed); once all threads are busy, subsequent tasks wait in a queue.
  * Each task is dispatched to an idle thread. The main thread listens for data returned by the worker threads and invokes the corresponding callback, matched by workerId.
  * Each thread must run only one task at a time; otherwise callbacks cannot be matched to the right task.
*/
/**
 * Worker-thread task pool.
 *
 * Keeps up to `maxReservedThreads` resident workers; tasks beyond that wait in
 * `waitQueue`. Each worker runs at most one task at a time, and replies are
 * matched to callbacks via the worker's queue-item id.
 */
export class WorkerPool<Props = Record<string, any>, Response = any> {
  name: WorkerNameEnum;
  maxReservedThreads: number;
  workerQueue: WorkerQueueItem[] = [];
  waitQueue: WorkerRunTaskType<Props>[] = [];

  constructor({ name, maxReservedThreads }: { name: WorkerNameEnum; maxReservedThreads: number }) {
    this.name = name;
    this.maxReservedThreads = maxReservedThreads;
  }

  /* Dispatch one task to an idle worker, or queue it when the pool is saturated. */
  runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
    // Get an idle worker, or create a new one while under the thread cap.
    const runningWorker = (() => {
      const worker = this.workerQueue.find((item) => item.status === 'idle');
      if (worker) return worker;
      if (this.workerQueue.length < this.maxReservedThreads) {
        return this.createWorker();
      }
    })();

    if (runningWorker) {
      // Bind this task's promise callbacks to the worker slot.
      runningWorker.status = 'running';
      runningWorker.taskTime = Date.now();
      runningWorker.resolve = resolve;
      runningWorker.reject = reject;
      runningWorker.timeoutId = setTimeout(() => {
        reject('Worker timeout');
        // Bug fix: previously a timed-out worker stayed 'running' forever and
        // permanently leaked one reserved slot. Terminate and remove it so a
        // fresh worker can be created for later tasks.
        this.deleteWorker(runningWorker.id);
      }, 30000);

      runningWorker.worker.postMessage({
        id: runningWorker.id,
        ...data
      });
    } else {
      // Not enough workers: push to the wait queue until one frees up.
      this.waitQueue.push({ data, resolve, reject });
    }
  }

  /* Run a task; the promise settles when the worker answers (whether the task
     ran immediately or waited in the queue). */
  run(data: Props) {
    // watch memory
    addLog.debug(`${this.name} worker queueLength: ${this.workerQueue.length}`);

    return new Promise<Response>((resolve, reject) => {
      /*
        Whether the task is executed immediately or delayed, the promise
        callback will dispatch after the task completes.
      */
      this.runTask({
        data,
        resolve,
        reject
      });
    }).finally(() => {
      // A worker may have just become free: start the next queued task, if any.
      const waitTask = this.waitQueue.shift();
      if (waitTask) {
        this.runTask(waitTask);
      }
    });
  }

  /* Create a new worker, register its message/error handlers, and enqueue it. */
  createWorker() {
    // Unique id used to match replies back to this worker's current task.
    const workerId = `${Date.now()}${Math.random()}`;
    const worker = getWorker(this.name);

    const item: WorkerQueueItem = {
      id: workerId,
      worker,
      status: 'running',
      taskTime: Date.now(),
      resolve: () => {},
      reject: () => {}
    };
    this.workerQueue.push(item);

    // Watch responses: run the callback bound to this worker, then mark it idle.
    worker.on('message', ({ id, type, data }: WorkerResponse<Response>) => {
      const workerItem = this.workerQueue.find((item) => item.id === id);
      if (!workerItem) {
        addLog.warn('Invalid worker', { id, type, data });
        return;
      }

      if (type === 'success') {
        workerItem.resolve(data);
      } else if (type === 'error') {
        workerItem.reject(data);
      }

      // Clear the timeout timer and release the worker for reuse.
      clearTimeout(workerItem.timeoutId);
      workerItem.status = 'idle';
    });

    // Worker error: terminate and delete it (uncaught error in the worker).
    worker.on('error', (err) => {
      addLog.warn('Worker error', { err });
      this.deleteWorker(workerId);
    });
    worker.on('messageerror', (err) => {
      addLog.warn('Worker error', { err });
      this.deleteWorker(workerId);
    });

    return item;
  }

  /* Reject the worker's pending task (if any), terminate the thread, and drop
     it from the pool. Rejecting an already-settled promise is a harmless no-op. */
  deleteWorker(workerId: string) {
    const item = this.workerQueue.find((item) => item.id === workerId);
    if (item) {
      item.reject?.('error');
      clearTimeout(item.timeoutId);
      item.worker.terminate();
    }

    this.workerQueue = this.workerQueue.filter((item) => item.id !== workerId);
  }
}
/**
 * Get (or lazily create) the global singleton WorkerPool for a worker name.
 * Pools are cached on `global.workerPoll`, so every caller with the same name
 * shares one pool; `maxReservedThreads` only takes effect on first creation.
 */
export const getWorkerController = <Props, Response>(props: {
  name: WorkerNameEnum;
  maxReservedThreads: number;
}) => {
  if (!global.workerPoll) {
    // @ts-ignore
    global.workerPoll = {};
  }

  const { name } = props;

  const existing = global.workerPoll[name];
  if (existing) {
    return existing as WorkerPool<Props, Response>;
  }

  const pool = new WorkerPool(props);
  global.workerPoll[name] = pool;
  return pool as WorkerPool<Props, Response>;
};