Files
FastGPT/packages/service/worker/utils.ts
Archer 33d40fd077 feature: System plugin (#5131)
* feat: system Tool (#4959)

* feat: independent system tool

* chore: use ToolNode instead of PluginModule

* chore: tools

* chore: tools templateDir

* refactor: templates

* feat: flush code

* chore: update template

* refactor: migrate delay

* feat: worker pool

* chore: Dockerfile

* docs:  add tools.template.json

* feat: auto flush system tools

* fix: ts error

* chore: create new pool temporarily

* chore: system tool migration

* chore: migration

* fix: fix pnpm-workspace.yaml

* chore: update pnpm-lock.yaml to integrate tool

* chore(systemTool): chore

* chore: add system plugin

* chore(deps): update @fastgpt-sdk/plugin

* fix: type error

* chore: remove plugin package

* chore: move pro plugins code to open source

* feat: support system tool config input

* fix: type error

* perf: i18n

* fix: cr

* chore: update sdk

* feat: system plugin cache

* update mcp server (#5076)

* update mcp server

* fix: action

* fix: dockerfile

* fix: dockerfile

* fix: dockerfile

* fix: dockerfile

* fix: dockerfile

* fix: dockerfile

* feat: system Tool (#4959)

* feat: independent system tool

* chore: use ToolNode instead of PluginModule

* chore: tools

* chore: tools templateDir

* refactor: templates

* feat: flush code

* chore: update template

* refactor: migrate delay

* feat: worker pool

* chore: Dockerfile

* docs:  add tools.template.json

* feat: auto flush system tools

* fix: ts error

* chore: create new pool temporarily

* chore: system tool migration

* chore: migration

* fix: fix pnpm-workspace.yaml

* chore: update pnpm-lock.yaml to integrate tool

* chore(systemTool): chore

* chore: add system plugin

* chore(deps): update @fastgpt-sdk/plugin

* fix: type error

* chore: remove plugin package

* chore: move pro plugins code to open source

* feat: support system tool config input

* fix: type error

* perf: i18n

* fix: cr

* chore: update sdk

* feat: system plugin cache

* perf: run tool

* update package

* perf: config key

* fix: tool ini

* tool config params

* perf: workflow type

* rename tools to  agent

* version list

* perf: tool error

* config secret ux

* perf: config secret ux

* fix: tool config field

* add course to secret input

* feat: support inputConfig switch (#5099)

* feat: support inputConfig switch

* deps: update @fastgpt-sdk/plugin

* chore: update workflows

* fix: inputType

* fix: secret

* add default value to node

* update i18n

* eslint

* add precision to number input

* feat: add number input and select

* perf: number ux

* fix: code

* Proxies image requests to plugin service (#5111)

* Proxies image requests to plugin service

Adds a rewrite rule and API endpoint to proxy image requests
to the plugin service. This allows the app to fetch images from
the plugin's tools directory.

It also adds the plugin base URL to the service's constants, so that
it can use the plugin URL when proxying requests.

* fix: update FastGPTPluginUrl to remove unnecessary API path

* feat: update image proxy destination and add plugin image handler

* Adapt plugin id

* replace avatar

* remove rewrite

* fix: plugin avatar

* update system tool doc

* feat: system tool type

* yml sh

* yml sh

* update doc

* fix: simple app tool select

* fix: switch ui

* update package

* Yamljs (#5129)

* update docker-compose configuration: bump fastgpt and fastgpt-plugin images, change minio host to service name, and adjust service dependencies

* refactor: comment out port exposure in docker-compose configuration

* update: uncomment port exposure in docker-compose configuration

* update: change MINIO_HOST to use specific IP address in docker configuration

* update: modify fastgpt-plugin image version in docker configuration

* update readme

* doc

* remove

---------

Co-authored-by: Finley Ge <32237950+FinleyGe@users.noreply.github.com>
Co-authored-by: Theresa <63280168+sd0ric4@users.noreply.github.com>
2025-07-02 18:15:00 +08:00

225 lines
6.2 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import { Worker } from 'worker_threads';
import path from 'path';
import { addLog } from '../common/system/log';
/**
 * Names of the available worker scripts.
 * Each value is the base name of the compiled script that `getWorker` loads
 * (`<value>.js` inside the `.next/server/worker` directory).
 */
export enum WorkerNameEnum {
readFile = 'readFile',
htmlStr2Md = 'htmlStr2Md',
countGptMessagesTokens = 'countGptMessagesTokens',
systemPluginRun = 'systemPluginRun',
text2Chunks = 'text2Chunks'
}
/**
 * Build the environment object handed to worker threads.
 *
 * Only the log-level settings, NODE_ENV and the proxy variables are copied
 * from `process.env`; every other variable of the parent process is left out.
 * A variable that is not set in the parent is present with value `undefined`.
 */
export const getSafeEnv = () => {
  const { LOG_LEVEL, STORE_LOG_LEVEL, NODE_ENV, HTTP_PROXY, HTTPS_PROXY, NO_PROXY } = process.env;

  return { LOG_LEVEL, STORE_LOG_LEVEL, NODE_ENV, HTTP_PROXY, HTTPS_PROXY, NO_PROXY };
};
/**
 * Spawn a worker thread for the given worker name.
 *
 * The script is resolved relative to the current working directory as
 * `.next/server/worker/<name>.js`, and the thread receives only the
 * restricted environment returned by `getSafeEnv`.
 *
 * @param name Worker script name (a `WorkerNameEnum` value).
 * @returns The newly created `Worker`.
 */
export const getWorker = (name: `${WorkerNameEnum}`) => {
  const scriptFile = `${name}.js`;
  const scriptPath = path.join(process.cwd(), '.next', 'server', 'worker', scriptFile);
  const options = { env: getSafeEnv() };

  return new Worker(scriptPath, options);
};
/**
 * Run a single task in a freshly spawned, single-use worker thread.
 *
 * The worker receives `params` as its only message and is expected to answer
 * with one `{ type, data }` message. The thread is terminated as soon as it
 * has answered, errored, or failed to deserialize a message, so no thread is
 * left behind by a call.
 *
 * @param name   Worker script to run.
 * @param params Payload posted to the worker.
 * @returns A promise resolved with `data` of a `success` reply. It is rejected
 *          with `data` of an `error` reply, with the error emitted by the
 *          worker, or with an `Error` when the worker exits before replying.
 */
export const runWorker = <T = any>(name: WorkerNameEnum, params?: Record<string, any>) => {
  return new Promise<T>((resolve, reject) => {
    const start = Date.now();
    const worker = getWorker(name);

    worker.postMessage(params);

    worker.on('message', (msg: { type: 'success' | 'error'; data: any }) => {
      // The worker is single-use: release the thread once it has answered,
      // whether or not the worker script exits on its own.
      void worker.terminate();

      if (msg.type === 'error') return reject(msg.data);

      resolve(msg.data);

      // Only slow runs are worth logging
      const time = Date.now() - start;
      if (time > 1000) {
        addLog.info(`Worker ${name} run time: ${time}ms`);
      }
    });

    worker.on('error', (err) => {
      reject(err);
      void worker.terminate();
    });
    worker.on('messageerror', (err) => {
      reject(err);
      void worker.terminate();
    });
    // If the thread stops without having replied, the promise would otherwise
    // stay pending forever. This is a no-op when it has already been settled.
    worker.on('exit', (code) => {
      reject(new Error(`Worker ${name} exited with code ${code} before returning a result`));
    });
  });
};
// A task submitted to the pool: its payload plus the callbacks that settle the caller's promise.
type WorkerRunTaskType<T> = { data: T; resolve: (e: any) => void; reject: (e: any) => void };
// Bookkeeping record the pool keeps for each worker thread it owns.
type WorkerQueueItem = {
// Identifier generated by the pool; it is posted to the worker together with every task.
id: string;
// The underlying worker thread.
worker: Worker;
// 'running' while a task is assigned; set back to 'idle' when the worker replies.
status: 'running' | 'idle';
// Date.now() timestamp taken when the worker was created or last given a task.
taskTime: number;
// Timer that rejects the current task if the worker does not reply in time.
timeoutId?: NodeJS.Timeout;
// Settle callbacks of the task currently assigned to this worker.
resolve: (e: any) => void;
reject: (e: any) => void;
};
// Message a pooled worker posts back to the main thread.
type WorkerResponse<T = any> = {
// NOTE(review): presumably echoes the worker id sent with the task; the pool does not read it — confirm.
id: string;
// 'success' resolves the pending task with `data`, 'error' rejects it with `data`.
type: 'success' | 'error';
data: T;
};
/**
 * Multi-threaded task manager backed by a pool of long-lived worker threads.
 *
 * - Only one instance per worker name needs to exist for the whole process.
 * - At most `maxReservedThreads` workers are kept alive. When all of them are
 *   busy, further tasks wait in `waitQueue` until a running task settles.
 * - Every task is posted to one worker. The main thread listens for that
 *   worker's reply and settles the promise stored on the worker's queue item.
 * - A worker must run only one task at a time, otherwise a reply would settle
 *   the wrong promise.
 * - A worker that does not reply within `timeoutMs`, emits an error, or exits
 *   is terminated and removed from the pool; a replacement is created on
 *   demand by a later task.
 */
export class WorkerPool<Props = Record<string, any>, Response = any> {
  name: WorkerNameEnum;
  maxReservedThreads: number;
  /** Maximum time in milliseconds a task may run before it is rejected with 'Worker timeout'. */
  timeoutMs: number;
  workerQueue: WorkerQueueItem[] = [];
  waitQueue: WorkerRunTaskType<Props>[] = [];

  /**
   * @param name               Worker script every thread of this pool runs.
   * @param maxReservedThreads Upper bound on the number of worker threads.
   * @param timeoutMs          Per-task timeout in milliseconds (default 30000).
   */
  constructor({
    name,
    maxReservedThreads,
    timeoutMs = 30000
  }: {
    name: WorkerNameEnum;
    maxReservedThreads: number;
    timeoutMs?: number;
  }) {
    this.name = name;
    this.maxReservedThreads = maxReservedThreads;
    this.timeoutMs = timeoutMs;
  }

  /**
   * Hand a task to a worker, or park it in the wait queue when no worker is
   * available and the pool is already at `maxReservedThreads`.
   */
  private runTask({ data, resolve, reject }: WorkerRunTaskType<Props>) {
    // Get idle worker or create a new worker
    const runningWorker = (() => {
      // A task may ask for a specific worker through `data.workerId`.
      // NOTE(review): the requested worker is used even if it is still
      // running another task — confirm callers never do that.
      // @ts-ignore
      if (data.workerId) {
        // @ts-ignore
        const worker = this.workerQueue.find((item) => item.id === data.workerId);
        if (worker) return worker;
      }

      const worker = this.workerQueue.find((item) => item.status === 'idle');
      if (worker) return worker;

      if (this.workerQueue.length < this.maxReservedThreads) {
        return this.createWorker();
      }
    })();

    if (runningWorker) {
      // Bind the worker to this task
      runningWorker.status = 'running';
      runningWorker.taskTime = Date.now();
      runningWorker.resolve = resolve;
      runningWorker.reject = reject;

      const timeoutId = setTimeout(() => {
        reject('Worker timeout');
        // The worker is presumably stuck: terminate and drop it so that its
        // slot can be reused. Skip this if the worker has meanwhile been bound
        // to another task (its timeoutId then belongs to that task).
        if (runningWorker.timeoutId === timeoutId) {
          this.deleteWorker(runningWorker.id);
        }
      }, this.timeoutMs);
      runningWorker.timeoutId = timeoutId;

      runningWorker.worker.postMessage({
        id: runningWorker.id,
        ...data
      });
    } else {
      // Not enough worker, push to wait queue
      this.waitQueue.push({ data, resolve, reject });
    }
  }

  /**
   * Run a task in the pool.
   *
   * @param data Payload posted to the worker (merged with the worker id).
   * @returns A promise resolved with the worker's `success` data. It is
   *          rejected with the worker's `error` data, with 'Worker timeout'
   *          when the worker does not reply in time, or with 'error' when the
   *          worker crashes or exits.
   */
  run(data: Props) {
    return new Promise<Response>((resolve, reject) => {
      // Whether the task starts immediately or waits in the queue, the promise
      // is settled only once the task itself has completed.
      this.runTask({
        data,
        resolve,
        reject
      });
    }).finally(() => {
      // A task just settled: give the oldest waiting task a chance to run
      const waitTask = this.waitQueue.shift();
      if (waitTask) {
        this.runTask(waitTask);
      }
    });
  }

  /**
   * Spawn a worker thread, register it in `workerQueue` and wire up its events.
   * The item is created as 'running' because the caller assigns a task to it
   * right away.
   */
  createWorker() {
    const workerId = `${Date.now()}${Math.random()}`;
    const worker = getWorker(this.name);

    const item: WorkerQueueItem = {
      id: workerId,
      worker,
      status: 'running',
      taskTime: Date.now(),
      resolve: () => {},
      reject: () => {}
    };
    this.workerQueue.push(item);

    // Settle the task currently bound to this worker with its reply
    worker.on('message', ({ type, data }: WorkerResponse<Response>) => {
      if (type === 'success') {
        item.resolve(data);
      } else if (type === 'error') {
        item.reject(data);
      }

      // Clear timeout timer and update worker status
      clearTimeout(item.timeoutId);
      item.status = 'idle';
    });

    // Uncaught error inside the worker: terminate and delete it
    worker.on('error', (err) => {
      console.log(err);
      addLog.error('Worker error', err);
      this.deleteWorker(workerId);
    });
    worker.on('messageerror', (err) => {
      console.log(err);
      addLog.error('Worker messageerror', err);
      this.deleteWorker(workerId);
    });
    // A thread that has stopped can no longer answer: drop it so tasks are not
    // posted to a dead worker. This is a no-op if it was already deleted.
    worker.on('exit', () => {
      this.deleteWorker(workerId);
    });

    return item;
  }

  /**
   * Reject the task bound to the worker (a no-op if it is already settled),
   * terminate the thread and remove it from the pool.
   */
  private deleteWorker(workerId: string) {
    const item = this.workerQueue.find((item) => item.id === workerId);
    if (item) {
      item.reject?.('error');
      clearTimeout(item.timeoutId);
      item.worker.terminate();
    }

    this.workerQueue = this.workerQueue.filter((item) => item.id !== workerId);
  }
}
/**
 * Return the process-wide `WorkerPool` for a worker name, creating it on first use.
 *
 * Pools are cached on `global.workerPoll`, keyed by worker name, so repeated
 * calls with the same name share one pool. `maxReservedThreads` is only
 * applied by the call that actually creates the pool.
 *
 * @param props.name               Worker script the pool runs.
 * @param props.maxReservedThreads Upper bound on the pool's worker threads.
 */
export const getWorkerController = <Props, Response>(props: {
  name: WorkerNameEnum;
  maxReservedThreads: number;
}) => {
  // Lazily create the global registry
  if (!global.workerPoll) {
    // @ts-ignore
    global.workerPoll = {};
  }

  const { name: poolKey } = props;

  // Create the pool only when none is registered under this name yet
  if (!global.workerPoll[poolKey]) {
    global.workerPoll[poolKey] = new WorkerPool(props);
  }

  return global.workerPoll[poolKey] as WorkerPool<Props, Response>;
};