From e99c91aaa6013ae63c962b024a36cb33d035691f Mon Sep 17 00:00:00 2001 From: Archer <545436317@qq.com> Date: Tue, 23 Jul 2024 11:23:42 +0800 Subject: [PATCH] Perf system plugin and worker (#2126) * perf: worker pool * perf: worker register * perf: worker controller * perf: system plugin worker * perf: system plugin worker * perf: worker * perf: worker * worker timeout * perf: copy icon --- .../zh-cn/docs/development/upgrading/488.md | 5 + packages/plugins/package.json | 1 + packages/plugins/register.ts | 22 +- packages/plugins/runtime/worker.ts | 24 +++ .../plugins/src/duckduckgo/search/index.ts | 5 +- .../plugins/src/duckduckgo/searchImg/index.ts | 5 +- .../src/duckduckgo/searchNews/index.ts | 5 +- .../src/duckduckgo/searchVideo/index.ts | 5 +- packages/plugins/src/mathExprVal/index.ts | 1 - packages/service/common/file/read/utils.ts | 2 +- .../service/common/string/tiktoken/index.ts | 106 +++------- packages/service/common/system/log.ts | 2 +- .../service/core/dataset/search/controller.ts | 11 +- .../service/core/workflow/dispatch/index.ts | 12 +- packages/service/type.d.ts | 9 +- .../cl100k_base.json | 0 .../index.ts} | 0 .../{file => readFile}/extension/csv.ts | 0 .../{file => readFile}/extension/docx.ts | 0 .../{file => readFile}/extension/html.ts | 0 .../{file => readFile}/extension/pdf.ts | 12 +- .../{file => readFile}/extension/pptx.ts | 0 .../{file => readFile}/extension/rawText.ts | 0 .../{file => readFile}/extension/xlsx.ts | 0 .../{file/read.ts => readFile/index.ts} | 0 .../worker/{file => readFile}/parseOffice.ts | 0 .../worker/{file => readFile}/type.d.ts | 0 packages/service/worker/utils.ts | 188 +++++++++++++++++- .../common/Icon/icons/core/chat/chevronUp.svg | 4 +- pnpm-lock.yaml | 9 +- projects/app/next.config.js | 50 ++++- .../ChatBox/components/ChatItem.tsx | 21 +- .../ChatBox/components/ResponseTags.tsx | 163 +++++++-------- .../detail/components/SimpleApp/ChatTest.tsx | 6 +- 34 files changed, 433 insertions(+), 235 deletions(-) create mode 100644 packages/plugins/runtime/worker.ts rename packages/service/worker/{tiktoken => countGptMessagesTokens}/cl100k_base.json (100%) rename packages/service/worker/{tiktoken/countGptMessagesTokens.ts => countGptMessagesTokens/index.ts} (100%) rename packages/service/worker/{file => readFile}/extension/csv.ts (100%) rename packages/service/worker/{file => readFile}/extension/docx.ts (100%) rename packages/service/worker/{file => readFile}/extension/html.ts (100%) rename packages/service/worker/{file => readFile}/extension/pdf.ts (88%) rename packages/service/worker/{file => readFile}/extension/pptx.ts (100%) rename packages/service/worker/{file => readFile}/extension/rawText.ts (100%) rename packages/service/worker/{file => readFile}/extension/xlsx.ts (100%) rename packages/service/worker/{file/read.ts => readFile/index.ts} (100%) rename packages/service/worker/{file => readFile}/parseOffice.ts (100%) rename packages/service/worker/{file => readFile}/type.d.ts (100%) diff --git a/docSite/content/zh-cn/docs/development/upgrading/488.md b/docSite/content/zh-cn/docs/development/upgrading/488.md index fd7fedce7..72c8d146f 100644 --- a/docSite/content/zh-cn/docs/development/upgrading/488.md +++ b/docSite/content/zh-cn/docs/development/upgrading/488.md @@ -23,3 +23,8 @@ weight: 816 1. 新增 - 重构系统插件的结构。允许向开源社区 PR 系统插件,具体可见: [如何向 FastGPT 社区提交系统插件](https://fael3z0zfze.feishu.cn/wiki/ERZnw9R26iRRG0kXZRec6WL9nwh)。 2. 新增 - DuckDuckGo 系统插件。 3. 优化 - 节点图标。 +4. 优化 - 对话框引用增加额外复制案件,便于复制。增加引用内容折叠。 +5. 修复 - Permission 表声明问题。 +6. 修复 - 并行执行节点,运行时间未正确记录。 +7. 修复 - 简易模式,首次进入,无法正确获取知识库配置。 +8. 修复 - Log level 配置 diff --git a/packages/plugins/package.json b/packages/plugins/package.json index 9f0a9be9a..8611adefd 100644 --- a/packages/plugins/package.json +++ b/packages/plugins/package.json @@ -4,6 +4,7 @@ "dependencies": { "duck-duck-scrape": "^2.2.5", "lodash": "^4.17.21", + "axios": "^1.5.1", "expr-eval": "^2.0.2" }, "devDependencies": { diff --git a/packages/plugins/register.ts b/packages/plugins/register.ts index f686c9400..bfc00a9a2 100644 --- a/packages/plugins/register.ts +++ b/packages/plugins/register.ts @@ -4,10 +4,12 @@ import { FastGPTProUrl, isProduction } from '../service/common/system/constants' import { GET, POST } from '@fastgpt/service/common/api/plusRequest'; import { SystemPluginTemplateItemType } from '@fastgpt/global/core/workflow/type'; import { cloneDeep } from 'lodash'; +import { WorkerNameEnum, runWorker } from '@fastgpt/service/worker/utils'; -let list = [ - 'getTime', - 'fetchUrl', +// Run in main thread +const staticPluginList = ['getTime', 'fetchUrl']; +// Run in worker thread (Have npm packages) +const packagePluginList = [ 'mathExprVal', 'duckduckgo', 'duckduckgo/search', @@ -16,6 +18,8 @@ let list = [ 'duckduckgo/searchVideo' ]; +const list = [...staticPluginList, ...packagePluginList]; + /* Get plugins */ export const getCommunityPlugins = () => { return list.map((name) => { @@ -58,8 +62,7 @@ export const getSystemPluginTemplates = async (refresh = false) => { }; export const getCommunityCb = async () => { - // Do not modify the following code - const loadModule = async (name: string) => { + const loadCommunityModule = async (name: string) => { const module = await import(`./src/${name}/index`); return module.default; }; @@ -70,7 +73,14 @@ export const getCommunityCb = async () => { try { return { name, - cb: await loadModule(name) + cb: staticPluginList.includes(name) + ? await loadCommunityModule(name) + : (e: any) => { + return runWorker(WorkerNameEnum.systemPluginRun, { + pluginName: name, + data: e + }); + } }; } catch (error) {} }) diff --git a/packages/plugins/runtime/worker.ts b/packages/plugins/runtime/worker.ts new file mode 100644 index 000000000..574b2423e --- /dev/null +++ b/packages/plugins/runtime/worker.ts @@ -0,0 +1,24 @@ +import { SystemPluginResponseType } from '../type'; +import { parentPort } from 'worker_threads'; + +const loadModule = async (name: string): Promise<(e: any) => SystemPluginResponseType> => { + const module = await import(`../src/${name}/index`); + return module.default; +}; + +parentPort?.on('message', async ({ pluginName, data }: { pluginName: string; data: any }) => { + try { + const cb = await loadModule(pluginName); + parentPort?.postMessage({ + type: 'success', + data: await cb(data) + }); + } catch (error) { + parentPort?.postMessage({ + type: 'error', + data: error + }); + } + + process.exit(); +}); diff --git a/packages/plugins/src/duckduckgo/search/index.ts b/packages/plugins/src/duckduckgo/search/index.ts index 0d93404a1..002ac5dbd 100644 --- a/packages/plugins/src/duckduckgo/search/index.ts +++ b/packages/plugins/src/duckduckgo/search/index.ts @@ -32,14 +32,13 @@ const main = async (props: Props, retry = 3): Response => { }; } catch (error) { if (retry <= 0) { + addLog.warn('DuckDuckGo error', { error }); return { result: 'Failed to fetch data' }; } - addLog.warn('DuckDuckGo error', { error }); - - await delay(Math.random() * 2000); + await delay(Math.random() * 5000); return main(props, retry - 1); } }; diff --git a/packages/plugins/src/duckduckgo/searchImg/index.ts b/packages/plugins/src/duckduckgo/searchImg/index.ts index a49bd3136..86a45db56 100644 --- a/packages/plugins/src/duckduckgo/searchImg/index.ts +++ b/packages/plugins/src/duckduckgo/searchImg/index.ts @@ -31,14 +31,13 @@ const main = async (props: Props, retry = 3): Response => { }; } catch (error) { if (retry <= 0) { + addLog.warn('DuckDuckGo error', { error }); return { result: 'Failed to fetch data' }; } - addLog.warn('DuckDuckGo error', { error }); - - await delay(Math.random() * 2000); + await delay(Math.random() * 5000); return main(props, retry - 1); } }; diff --git a/packages/plugins/src/duckduckgo/searchNews/index.ts b/packages/plugins/src/duckduckgo/searchNews/index.ts index 99a2af030..deeae0f6f 100644 --- a/packages/plugins/src/duckduckgo/searchNews/index.ts +++ b/packages/plugins/src/duckduckgo/searchNews/index.ts @@ -32,14 +32,13 @@ const main = async (props: Props, retry = 3): Response => { }; } catch (error) { if (retry <= 0) { + addLog.warn('DuckDuckGo error', { error }); return { result: 'Failed to fetch data' }; } - addLog.warn('DuckDuckGo error', { error }); - - await delay(Math.random() * 2000); + await delay(Math.random() * 5000); return main(props, retry - 1); } }; diff --git a/packages/plugins/src/duckduckgo/searchVideo/index.ts b/packages/plugins/src/duckduckgo/searchVideo/index.ts index ba0fe5b9e..948eb177a 100644 --- a/packages/plugins/src/duckduckgo/searchVideo/index.ts +++ b/packages/plugins/src/duckduckgo/searchVideo/index.ts @@ -32,14 +32,13 @@ const main = async (props: Props, retry = 3): Response => { }; } catch (error) { if (retry <= 0) { + addLog.warn('DuckDuckGo error', { error }); return { result: 'Failed to fetch data' }; } - addLog.warn('DuckDuckGo error', { error }); - - await delay(Math.random() * 2000); + await delay(Math.random() * 5000); return main(props, retry - 1); } }; diff --git a/packages/plugins/src/mathExprVal/index.ts b/packages/plugins/src/mathExprVal/index.ts index a033c6645..a171b7fb9 100644 --- a/packages/plugins/src/mathExprVal/index.ts +++ b/packages/plugins/src/mathExprVal/index.ts @@ -1,4 +1,3 @@ -import { NodeInputKeyEnum } from '@fastgpt/global/core/workflow/constants'; import { Parser } from 'expr-eval'; type Props = { diff --git a/packages/service/common/file/read/utils.ts b/packages/service/common/file/read/utils.ts index 788c3c11c..abac57119 100644 --- a/packages/service/common/file/read/utils.ts +++ b/packages/service/common/file/read/utils.ts @@ -6,7 +6,7 @@ import { addHours } from 'date-fns'; import { WorkerNameEnum, runWorker } from '../../../worker/utils'; import fs from 'fs'; import { detectFileEncoding } from '@fastgpt/global/common/file/tools'; -import { ReadFileResponse } from '../../../worker/file/type'; +import type { ReadFileResponse } from '../../../worker/readFile/type'; export const initMarkdownText = ({ teamId, diff --git a/packages/service/common/string/tiktoken/index.ts b/packages/service/common/string/tiktoken/index.ts index 8c825c906..391d3a243 100644 --- a/packages/service/common/string/tiktoken/index.ts +++ b/packages/service/common/string/tiktoken/index.ts @@ -6,95 +6,41 @@ import { } from '@fastgpt/global/core/ai/type'; import { chats2GPTMessages } from '@fastgpt/global/core/chat/adapt'; import { ChatItemType } from '@fastgpt/global/core/chat/type'; -import { WorkerNameEnum, getWorker } from '../../../worker/utils'; +import { WorkerNameEnum, getWorkerController } from '../../../worker/utils'; import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants'; -import { getNanoid } from '@fastgpt/global/common/string/tools'; import { addLog } from '../../system/log'; -export const getTiktokenWorker = () => { - const maxWorkers = global.systemEnv?.tokenWorkers || 20; - - if (!global.tiktokenWorkers) { - global.tiktokenWorkers = []; - } - - if (global.tiktokenWorkers.length >= maxWorkers) { - return global.tiktokenWorkers[Math.floor(Math.random() * global.tiktokenWorkers.length)]; - } - - const worker = getWorker(WorkerNameEnum.countGptMessagesTokens); - - const i = global.tiktokenWorkers.push({ - index: global.tiktokenWorkers.length, - worker, - callbackMap: {} - }); - - worker.on('message', ({ id, data }: { id: string; data: number }) => { - const callback = global.tiktokenWorkers[i - 1]?.callbackMap?.[id]; - - if (callback) { - callback?.(data); - delete global.tiktokenWorkers[i - 1].callbackMap[id]; - } - }); - - return global.tiktokenWorkers[i - 1]; -}; - -export const countGptMessagesTokens = ( +export const countGptMessagesTokens = async ( messages: ChatCompletionMessageParam[], tools?: ChatCompletionTool[], functionCall?: ChatCompletionCreateParams.Function[] ) => { - return new Promise(async (resolve) => { - try { - const start = Date.now(); + try { + const workerController = getWorkerController< + { + messages: ChatCompletionMessageParam[]; + tools?: ChatCompletionTool[]; + functionCall?: ChatCompletionCreateParams.Function[]; + }, + number + >({ + name: WorkerNameEnum.countGptMessagesTokens, + maxReservedThreads: global.systemEnv?.tokenWorkers || 20 + }); - const { worker, callbackMap } = getTiktokenWorker(); + const total = await workerController.run({ messages, tools, functionCall }); - const id = getNanoid(); - - const timer = setTimeout(() => { - console.log('Count token Time out'); - resolve( - messages.reduce((sum, item) => { - if (item.content) { - return sum + item.content.length * 0.5; - } - return sum; - }, 0) - ); - delete callbackMap[id]; - }, 60000); - - callbackMap[id] = (data) => { - // 检测是否有内存泄漏 - addLog.debug(`Count token time: ${Date.now() - start}, token: ${data}`); - // console.log(process.memoryUsage()); - - resolve(data); - clearTimeout(timer); - }; - - // 可以进一步优化(传递100w token数据,实际需要300ms,较慢) - worker.postMessage({ - id, - messages, - tools, - functionCall - }); - } catch (error) { - addLog.error('Count token error', error); - const total = messages.reduce((sum, item) => { - if (item.content) { - return sum + item.content.length; - } - return sum; - }, 0); - resolve(total); - } - }); + return total; + } catch (error) { + addLog.error('Count token error', error); + const total = messages.reduce((sum, item) => { + if (item.content) { + return sum + item.content.length * 0.5; + } + return sum; + }, 0); + return total; + } }; export const countMessagesTokens = (messages: ChatItemType[]) => { diff --git a/packages/service/common/system/log.ts b/packages/service/common/system/log.ts index cd5d97788..7efc64f08 100644 --- a/packages/service/common/system/log.ts +++ b/packages/service/common/system/log.ts @@ -30,7 +30,7 @@ const { LOG_LEVEL, STORE_LOG_LEVEL } = (() => { const STORE_LOG_LEVEL = (process.env.STORE_LOG_LEVEL || '').toLocaleLowerCase(); return { - LOG_LEVEL: envLogLevelMap[LOG_LEVEL] || LogLevelEnum.info, + LOG_LEVEL: envLogLevelMap[LOG_LEVEL] ?? LogLevelEnum.info, STORE_LOG_LEVEL: envLogLevelMap[STORE_LOG_LEVEL] ?? 99 }; })(); diff --git a/packages/service/core/dataset/search/controller.ts b/packages/service/core/dataset/search/controller.ts index 1f906ac81..e7536da05 100644 --- a/packages/service/core/dataset/search/controller.ts +++ b/packages/service/core/dataset/search/controller.ts @@ -441,11 +441,18 @@ export async function searchDatasetData(props: SearchDatasetDataProps) { // token filter const filterMaxTokensResult = await (async () => { + const tokensScoreFilter = await Promise.all( + scoreFilter.map(async (item) => ({ + ...item, + tokens: await countPromptTokens(item.q + item.a) + })) + ); + const results: SearchDataResponseItemType[] = []; let totalTokens = 0; - for await (const item of scoreFilter) { - totalTokens += await countPromptTokens(item.q + item.a); + for await (const item of tokensScoreFilter) { + totalTokens += item.tokens; if (totalTokens > maxTokens + 500) { break; diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts index c8af42c08..b2b783547 100644 --- a/packages/service/core/workflow/dispatch/index.ts +++ b/packages/service/core/workflow/dispatch/index.ts @@ -122,7 +122,6 @@ export async function dispatchWorkFlow(data: Props): Promise void>; - }[]; - var systemLoadedGlobalVariables: boolean; var systemLoadedGlobalConfig: boolean; + + var workerPoll: Record; } diff --git a/packages/service/worker/tiktoken/cl100k_base.json b/packages/service/worker/countGptMessagesTokens/cl100k_base.json similarity index 100% rename from packages/service/worker/tiktoken/cl100k_base.json rename to packages/service/worker/countGptMessagesTokens/cl100k_base.json diff --git a/packages/service/worker/tiktoken/countGptMessagesTokens.ts b/packages/service/worker/countGptMessagesTokens/index.ts similarity index 100% rename from packages/service/worker/tiktoken/countGptMessagesTokens.ts rename to packages/service/worker/countGptMessagesTokens/index.ts diff --git a/packages/service/worker/file/extension/csv.ts b/packages/service/worker/readFile/extension/csv.ts similarity index 100% rename from packages/service/worker/file/extension/csv.ts rename to packages/service/worker/readFile/extension/csv.ts diff --git a/packages/service/worker/file/extension/docx.ts b/packages/service/worker/readFile/extension/docx.ts similarity index 100% rename from packages/service/worker/file/extension/docx.ts rename to packages/service/worker/readFile/extension/docx.ts diff --git a/packages/service/worker/file/extension/html.ts b/packages/service/worker/readFile/extension/html.ts similarity index 100% rename from packages/service/worker/file/extension/html.ts rename to packages/service/worker/readFile/extension/html.ts diff --git a/packages/service/worker/file/extension/pdf.ts b/packages/service/worker/readFile/extension/pdf.ts similarity index 88% rename from packages/service/worker/file/extension/pdf.ts rename to packages/service/worker/readFile/extension/pdf.ts index f68dfadeb..67da75284 100644 --- a/packages/service/worker/file/extension/pdf.ts +++ b/packages/service/worker/readFile/extension/pdf.ts @@ -59,16 +59,16 @@ export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise i + 1); + for await (const pageNo of pageArr) { + result += await readPDFPage(doc, pageNo); } - const pageTexts = await Promise.all(pageTextPromises); - loadingTask.destroy(); return { - rawText: pageTexts.join('') + rawText: result }; }; diff --git a/packages/service/worker/file/extension/pptx.ts b/packages/service/worker/readFile/extension/pptx.ts similarity index 100% rename from packages/service/worker/file/extension/pptx.ts rename to packages/service/worker/readFile/extension/pptx.ts diff --git a/packages/service/worker/file/extension/rawText.ts b/packages/service/worker/readFile/extension/rawText.ts similarity index 100% rename from packages/service/worker/file/extension/rawText.ts rename to packages/service/worker/readFile/extension/rawText.ts diff --git a/packages/service/worker/file/extension/xlsx.ts b/packages/service/worker/readFile/extension/xlsx.ts similarity index 100% rename from packages/service/worker/file/extension/xlsx.ts rename to packages/service/worker/readFile/extension/xlsx.ts diff --git a/packages/service/worker/file/read.ts b/packages/service/worker/readFile/index.ts similarity index 100% rename from packages/service/worker/file/read.ts rename to packages/service/worker/readFile/index.ts diff --git a/packages/service/worker/file/parseOffice.ts b/packages/service/worker/readFile/parseOffice.ts similarity index 100% rename from packages/service/worker/file/parseOffice.ts rename to packages/service/worker/readFile/parseOffice.ts diff --git a/packages/service/worker/file/type.d.ts b/packages/service/worker/readFile/type.d.ts similarity index 100% rename from packages/service/worker/file/type.d.ts rename to packages/service/worker/readFile/type.d.ts diff --git a/packages/service/worker/utils.ts b/packages/service/worker/utils.ts index e0286a0a9..a5fd3b4ae 100644 --- a/packages/service/worker/utils.ts +++ b/packages/service/worker/utils.ts @@ -1,19 +1,32 @@ import { Worker } from 'worker_threads'; import path from 'path'; +import { addLog } from '../common/system/log'; export enum WorkerNameEnum { readFile = 'readFile', htmlStr2Md = 'htmlStr2Md', - countGptMessagesTokens = 'countGptMessagesTokens' + countGptMessagesTokens = 'countGptMessagesTokens', + systemPluginRun = 'systemPluginRun' } +export const getSafeEnv = () => { + return { + LOG_LEVEL: process.env.LOG_LEVEL, + STORE_LOG_LEVEL: process.env.STORE_LOG_LEVEL, + NODE_ENV: process.env.NODE_ENV + }; +}; + export const getWorker = (name: WorkerNameEnum) => { const workerPath = path.join(process.cwd(), '.next', 'server', 'worker', `${name}.js`); - return new Worker(workerPath); + return new Worker(workerPath, { + env: getSafeEnv() + }); }; export const runWorker = (name: WorkerNameEnum, params?: Record) => { return new Promise((resolve, reject) => { + const start = Date.now(); const worker = getWorker(name); worker.postMessage(params); @@ -22,6 +35,11 @@ export const runWorker = (name: WorkerNameEnum, params?: Record 1000) { + addLog.info(`Worker ${name} run time: ${time}ms`); + } }); worker.on('error', (err) => { @@ -34,3 +52,169 @@ export const runWorker = (name: WorkerNameEnum, params?: Record = { data: T; resolve: (e: any) => void; reject: (e: any) => void }; +type WorkerQueueItem = { + id: string; + worker: Worker; + status: 'running' | 'idle'; + taskTime: number; + timeoutId?: NodeJS.Timeout; + resolve: (e: any) => void; + reject: (e: any) => void; +}; +type WorkerResponse = { + id: string; + type: 'success' | 'error'; + data: T; +}; + +/* + 多线程任务管理 + * 全局只需要创建一个示例 + * 可以设置最大常驻线程(不会被销毁),线程满了后,后续任务会等待执行。 + * 每次执行,会把数据丢到一个空闲线程里运行。主线程需要监听子线程返回的数据,并执行对于的 callback,主要是通过 workerId 进行标记。 + * 务必保证,每个线程只会同时运行 1 个任务,否则 callback 会对应不上。 +*/ +export class WorkerPool, Response = any> { + name: WorkerNameEnum; + maxReservedThreads: number; + workerQueue: WorkerQueueItem[] = []; + waitQueue: WorkerRunTaskType[] = []; + + constructor({ name, maxReservedThreads }: { name: WorkerNameEnum; maxReservedThreads: number }) { + this.name = name; + this.maxReservedThreads = maxReservedThreads; + } + + runTask({ data, resolve, reject }: WorkerRunTaskType) { + // Get idle worker or create a new worker + const runningWorker = (() => { + const worker = this.workerQueue.find((item) => item.status === 'idle'); + if (worker) return worker; + + if (this.workerQueue.length < this.maxReservedThreads) { + return this.createWorker(); + } + })(); + + if (runningWorker) { + // Update memory data to latest task + runningWorker.status = 'running'; + runningWorker.taskTime = Date.now(); + runningWorker.resolve = resolve; + runningWorker.reject = reject; + runningWorker.timeoutId = setTimeout(() => { + reject('Worker timeout'); + }, 30000); + + runningWorker.worker.postMessage({ + id: runningWorker.id, + ...data + }); + } else { + // Not enough worker, push to wait queue + this.waitQueue.push({ data, resolve, reject }); + } + } + + run(data: Props) { + // watch memory + addLog.debug(`${this.name} worker queueLength: ${this.workerQueue.length}`); + + return new Promise((resolve, reject) => { + /* + Whether the task is executed immediately or delayed, the promise callback will dispatch after task complete. + */ + this.runTask({ + data, + resolve, + reject + }); + }).finally(() => { + // Run wait queue + const waitTask = this.waitQueue.shift(); + if (waitTask) { + this.runTask(waitTask); + } + }); + } + + createWorker() { + // Create a new worker and push it queue. + const workerId = `${Date.now()}${Math.random()}`; + const worker = getWorker(this.name); + + const item: WorkerQueueItem = { + id: workerId, + worker, + status: 'running', + taskTime: Date.now(), + resolve: () => {}, + reject: () => {} + }; + this.workerQueue.push(item); + + // watch response + worker.on('message', ({ id, type, data }: WorkerResponse) => { + // Run callback + const workerItem = this.workerQueue.find((item) => item.id === id); + + if (!workerItem) { + addLog.warn('Invalid worker', { id, type, data }); + return; + } + + if (type === 'success') { + workerItem.resolve(data); + } else if (type === 'error') { + workerItem.reject(data); + } + + // Clear timeout timer and update worker status + clearTimeout(workerItem.timeoutId); + workerItem.status = 'idle'; + }); + + // Worker error, terminate and delete it.(Un catch error) + worker.on('error', (err) => { + addLog.warn('Worker error', { err }); + this.deleteWorker(workerId); + }); + worker.on('messageerror', (err) => { + addLog.warn('Worker error', { err }); + this.deleteWorker(workerId); + }); + + return item; + } + + deleteWorker(workerId: string) { + const item = this.workerQueue.find((item) => item.id === workerId); + if (item) { + item.reject?.('error'); + clearTimeout(item.timeoutId); + item.worker.terminate(); + } + + this.workerQueue = this.workerQueue.filter((item) => item.id !== workerId); + } +} + +export const getWorkerController = (props: { + name: WorkerNameEnum; + maxReservedThreads: number; +}) => { + if (!global.workerPoll) { + // @ts-ignore + global.workerPoll = {}; + } + + const name = props.name; + + if (global.workerPoll[name]) return global.workerPoll[name] as WorkerPool; + + global.workerPoll[name] = new WorkerPool(props); + + return global.workerPoll[name] as WorkerPool; +}; diff --git a/packages/web/components/common/Icon/icons/core/chat/chevronUp.svg b/packages/web/components/common/Icon/icons/core/chat/chevronUp.svg index bdca3776b..9088fc258 100644 --- a/packages/web/components/common/Icon/icons/core/chat/chevronUp.svg +++ b/packages/web/components/common/Icon/icons/core/chat/chevronUp.svg @@ -1,3 +1,3 @@ - - + + diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 61cc17e66..f5694e298 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -81,6 +81,9 @@ importers: packages/plugins: dependencies: + axios: + specifier: ^1.5.1 + version: 1.7.2 duck-duck-scrape: specifier: ^2.2.5 version: 2.2.5 @@ -13823,7 +13826,7 @@ snapshots: eslint: 8.56.0 eslint-import-resolver-node: 0.3.9 eslint-import-resolver-typescript: 3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1(eslint@8.56.0))(eslint@8.56.0) - eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1(eslint@8.56.0))(eslint@8.56.0))(eslint@8.56.0) + eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.56.0) eslint-plugin-jsx-a11y: 6.9.0(eslint@8.56.0) eslint-plugin-react: 7.34.4(eslint@8.56.0) eslint-plugin-react-hooks: 4.6.2(eslint@8.56.0) @@ -13847,7 +13850,7 @@ snapshots: enhanced-resolve: 5.17.0 eslint: 8.56.0 eslint-module-utils: 2.8.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-node@0.3.9)(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1(eslint@8.56.0))(eslint@8.56.0))(eslint@8.56.0) - eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1(eslint@8.56.0))(eslint@8.56.0))(eslint@8.56.0) + eslint-plugin-import: 2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.56.0) fast-glob: 3.3.2 get-tsconfig: 4.7.5 is-core-module: 2.14.0 @@ -13869,7 +13872,7 @@ snapshots: transitivePeerDependencies: - supports-color - eslint-plugin-import@2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-typescript@3.6.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-node@0.3.9)(eslint-plugin-import@2.29.1(eslint@8.56.0))(eslint@8.56.0))(eslint@8.56.0): + eslint-plugin-import@2.29.1(@typescript-eslint/parser@6.21.0(eslint@8.56.0)(typescript@5.5.3))(eslint-import-resolver-typescript@3.6.1)(eslint@8.56.0): dependencies: array-includes: 3.1.8 array.prototype.findlastindex: 1.2.5 diff --git a/projects/app/next.config.js b/projects/app/next.config.js index 23a22daa5..1e455e044 100644 --- a/projects/app/next.config.js +++ b/projects/app/next.config.js @@ -1,5 +1,6 @@ const { i18n } = require('./next-i18next.config'); const path = require('path'); +const fs = require('fs'); const isDev = process.env.NODE_ENV === 'development'; @@ -53,17 +54,10 @@ const nextConfig = { const entries = await oldEntry(...args); return { ...entries, - 'worker/htmlStr2Md': path.resolve( + ...getWorkerConfig(), + 'worker/systemPluginRun': path.resolve( process.cwd(), - '../../packages/service/worker/htmlStr2Md/index.ts' - ), - 'worker/countGptMessagesTokens': path.resolve( - process.cwd(), - '../../packages/service/worker/tiktoken/countGptMessagesTokens.ts' - ), - 'worker/readFile': path.resolve( - process.cwd(), - '../../packages/service/worker/file/read.ts' + '../../packages/plugins/runtime/worker.ts' ) }; } @@ -95,3 +89,39 @@ const nextConfig = { }; module.exports = nextConfig; + +function getWorkerConfig() { + const result = fs.readdirSync(path.resolve(__dirname, '../../packages/service/worker')); + + // 获取所有的目录名 + const folderList = result.filter((item) => { + return fs + .statSync(path.resolve(__dirname, '../../packages/service/worker', item)) + .isDirectory(); + }); + + /* + { + 'worker/htmlStr2Md': path.resolve( + process.cwd(), + '../../packages/service/worker/htmlStr2Md/index.ts' + ), + 'worker/countGptMessagesTokens': path.resolve( + process.cwd(), + '../../packages/service/worker/countGptMessagesTokens/index.ts' + ), + 'worker/readFile': path.resolve( + process.cwd(), + '../../packages/service/worker/readFile/index.ts' + ) + } + */ + const workerConfig = folderList.reduce((acc, item) => { + acc[`worker/${item}`] = path.resolve( + process.cwd(), + `../../packages/service/worker/${item}/index.ts` + ); + return acc; + }, {}); + return workerConfig; +} diff --git a/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ChatItem.tsx b/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ChatItem.tsx index 2cf15aed2..ff1bfb123 100644 --- a/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ChatItem.tsx +++ b/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ChatItem.tsx @@ -144,7 +144,16 @@ const ChatItem = ({ )} {/* content */} - + - + copyData(chatText)} /> diff --git a/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ResponseTags.tsx b/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ResponseTags.tsx index 06a33a9f5..135842a99 100644 --- a/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ResponseTags.tsx +++ b/projects/app/src/components/core/chat/ChatContainer/ChatBox/components/ResponseTags.tsx @@ -40,7 +40,6 @@ const ResponseTags = ({ sourceName: string; }; }>(); - const [isOverflow, setIsOverflow] = useState(true); const [quoteFolded, setQuoteFolded] = useState(true); const [contextModalData, setContextModalData] = useState(); @@ -51,11 +50,9 @@ const ResponseTags = ({ } = useDisclosure(); const quoteListSize = useSize(quoteListRef); - useEffect(() => { - setIsOverflow( - quoteListRef.current ? quoteListRef.current.scrollHeight > (isPc ? 50 : 55) : true - ); - }, [isOverflow, quoteListSize]); + const quoteIsOverflow = quoteListRef.current + ? quoteListRef.current.scrollHeight > (isPc ? 50 : 55) + : true; const { llmModuleAccount, @@ -114,7 +111,7 @@ const ResponseTags = ({ {' '} - {quoteFolded && isOverflow && ( + {quoteFolded && quoteIsOverflow && ( - - { - - - {sourceList.map((item) => { - return ( - - { - e.stopPropagation(); - setQuoteModalData({ - rawSearch: quoteList, - metadata: { - collectionId: item.collectionId, - sourceId: item.sourceId, - sourceName: item.sourceName - } - }); - }} - > - - - {item.sourceName} - - - - ); - })} - {isOverflow && !quoteFolded && ( - setQuoteFolded(!quoteFolded)} - /> - )} - - + : {} } + > + {sourceList.map((item) => { + return ( + + { + e.stopPropagation(); + setQuoteModalData({ + rawSearch: quoteList, + metadata: { + collectionId: item.collectionId, + sourceId: item.sourceId, + sourceName: item.sourceName + } + }); + }} + > + + + {item.sourceName} + + + + ); + })} + {!quoteFolded && ( + setQuoteFolded(!quoteFolded)} + /> + )} )} diff --git a/projects/app/src/pages/app/detail/components/SimpleApp/ChatTest.tsx b/projects/app/src/pages/app/detail/components/SimpleApp/ChatTest.tsx index ac18ff40f..591f34290 100644 --- a/projects/app/src/pages/app/detail/components/SimpleApp/ChatTest.tsx +++ b/projects/app/src/pages/app/detail/components/SimpleApp/ChatTest.tsx @@ -11,21 +11,25 @@ import { useI18n } from '@/web/context/I18n'; import { useContextSelector } from 'use-context-selector'; import { AppContext } from '../context'; import { useChatTest } from '../useChatTest'; +import { useDatasetStore } from '@/web/core/dataset/store/dataset'; const ChatTest = ({ appForm }: { appForm: AppSimpleEditFormType }) => { const { t } = useTranslation(); const { appT } = useI18n(); const { appDetail } = useContextSelector(AppContext, (v) => v); + // form2AppWorkflow dependent allDatasets + const { allDatasets } = useDatasetStore(); const [workflowData, setWorkflowData] = useSafeState({ nodes: appDetail.modules || [], edges: appDetail.edges || [] }); + useEffect(() => { const { nodes, edges } = form2AppWorkflow(appForm); setWorkflowData({ nodes, edges }); - }, [appForm, setWorkflowData]); + }, [appForm, setWorkflowData, allDatasets]); const { restartChat, ChatContainer } = useChatTest({ ...workflowData,