diff --git a/packages/global/core/workflow/runtime/type.d.ts b/packages/global/core/workflow/runtime/type.d.ts
index e5925790c..6eca00de6 100644
--- a/packages/global/core/workflow/runtime/type.d.ts
+++ b/packages/global/core/workflow/runtime/type.d.ts
@@ -72,9 +72,10 @@ export type ChatDispatchProps = {
   maxRunTimes: number;
   isToolCall?: boolean;
   workflowStreamResponse?: WorkflowResponseType;
-  workflowDispatchDeep?: number;
   version?: 'v1' | 'v2';
+  workflowDispatchDeep: number;
+  responseAllData?: boolean;
   responseDetail?: boolean;
 };
diff --git a/packages/service/core/ai/llm/promptToolCall.ts b/packages/service/core/ai/llm/promptToolCall.ts
index 3d82fb669..0d4d31cdb 100644
--- a/packages/service/core/ai/llm/promptToolCall.ts
+++ b/packages/service/core/ai/llm/promptToolCall.ts
@@ -66,11 +66,12 @@ export const promptToolCallMessageRewrite = (
   return cloneMessages;
 };
 
-const ERROR_TEXT = 'Tool run error';
+const ERROR_TEXT = 'Tool call error';
 export const parsePromptToolCall = (
   str: string
 ): {
   answer: string;
+  streamAnswer?: string;
   toolCalls?: ChatCompletionMessageToolCall[];
 } => {
   str = str.trim();
@@ -99,11 +100,13 @@ export const parsePromptToolCall = (
   } catch (error) {
     if (prefixReg.test(str)) {
       return {
-        answer: ERROR_TEXT
+        answer: `${ERROR_TEXT}: ${str}`,
+        streamAnswer: `${ERROR_TEXT}: ${str}`
       };
     } else {
       return {
-        answer: str
+        answer: str,
+        streamAnswer: str
       };
     }
   }
diff --git a/packages/service/core/ai/llm/request.ts b/packages/service/core/ai/llm/request.ts
index d5188930a..185661657 100644
--- a/packages/service/core/ai/llm/request.ts
+++ b/packages/service/core/ai/llm/request.ts
@@ -324,7 +324,11 @@ export const createStreamResponse = async ({
   }
 
   const { reasoningContent, content, finish_reason, usage } = getResponseData();
-  const { answer: llmAnswer, toolCalls } = parsePromptToolCall(content);
+  const { answer: llmAnswer, streamAnswer, toolCalls } = parsePromptToolCall(content);
+
+  if (streamAnswer) {
+    onStreaming?.({ text: streamAnswer });
+  }
 
   toolCalls?.forEach((call) => {
     onToolCall?.({ call });
diff --git a/packages/service/core/workflow/dispatch/abandoned/runApp.ts b/packages/service/core/workflow/dispatch/abandoned/runApp.ts
index a0fdca13d..d97229957 100644
--- a/packages/service/core/workflow/dispatch/abandoned/runApp.ts
+++ b/packages/service/core/workflow/dispatch/abandoned/runApp.ts
@@ -2,7 +2,7 @@ import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
 import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
 import { type SelectAppItemType } from '@fastgpt/global/core/workflow/template/system/abandoned/runApp/type';
-import { dispatchWorkFlow } from '../index';
+import { runWorkflow } from '../index';
 import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
 import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
 import {
@@ -59,27 +59,25 @@ export const dispatchAppRequest = async (props: Props): Promise<Response> => {
   const chatHistories = getHistories(history, histories);
   const { files } = chatValue2RuntimePrompt(query);
 
-  const { flowResponses, flowUsages, assistantResponses, system_memories } = await dispatchWorkFlow(
-    {
-      ...props,
-      runningAppInfo: {
-        id: String(appData._id),
-        teamId: String(appData.teamId),
-        tmbId: String(appData.tmbId)
-      },
-      runtimeNodes: storeNodes2RuntimeNodes(
-        appData.modules,
-        getWorkflowEntryNodeIds(appData.modules)
-      ),
-      runtimeEdges: storeEdges2RuntimeEdges(appData.edges),
-      histories: chatHistories,
-      query: runtimePrompt2ChatsValue({
-        files,
-        text: userChatInput
-      }),
-      variables: props.variables
-    }
-  );
+  const { flowResponses, flowUsages, assistantResponses, system_memories } = await runWorkflow({
+    ...props,
+    runningAppInfo: {
+      id: String(appData._id),
+      teamId: String(appData.teamId),
+      tmbId: String(appData.tmbId)
+    },
+    runtimeNodes: storeNodes2RuntimeNodes(
+      appData.modules,
+      getWorkflowEntryNodeIds(appData.modules)
+    ),
+    runtimeEdges: storeEdges2RuntimeEdges(appData.edges),
+    histories: chatHistories,
+    query: runtimePrompt2ChatsValue({
+      files,
+      text: userChatInput
+    }),
+    variables: props.variables
+  });
 
   const completeMessages = chatHistories.concat([
     {
diff --git a/packages/service/core/workflow/dispatch/ai/agent/index.ts b/packages/service/core/workflow/dispatch/ai/agent/index.ts
index b9dab2c68..da334a04b 100644
--- a/packages/service/core/workflow/dispatch/ai/agent/index.ts
+++ b/packages/service/core/workflow/dispatch/ai/agent/index.ts
@@ -201,7 +201,7 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
     return runToolCall({
       ...props,
       ...requestParams,
-      maxRunToolTimes: 30
+      maxRunToolTimes: 100
     });
   })();
diff --git a/packages/service/core/workflow/dispatch/ai/agent/toolCall.ts b/packages/service/core/workflow/dispatch/ai/agent/toolCall.ts
index 07c01e043..2cea7a92c 100644
--- a/packages/service/core/workflow/dispatch/ai/agent/toolCall.ts
+++ b/packages/service/core/workflow/dispatch/ai/agent/toolCall.ts
@@ -8,7 +8,7 @@ import { responseWriteController } from '../../../../../common/response';
 import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
 import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
 import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
-import { dispatchWorkFlow } from '../../index';
+import { runWorkflow } from '../../index';
 import type { DispatchToolModuleProps, RunToolResponse, ToolNodeItemType } from './type';
 import json5 from 'json5';
 import type { DispatchFlowResponse } from '../../type';
@@ -110,7 +110,7 @@ export const runToolCall = async (
     initToolCallEdges(runtimeEdges, interactiveEntryToolParams.entryNodeIds);
 
     // Run entry tool
-    const toolRunResponse = await dispatchWorkFlow({
+    const toolRunResponse = await runWorkflow({
       ...workflowProps,
       isToolCall: true
     });
@@ -383,7 +383,7 @@ export const runToolCall = async (
       })();
 
       initToolNodes(runtimeNodes, [toolNode.nodeId], startParams);
-      const toolRunResponse = await dispatchWorkFlow({
+      const toolRunResponse = await runWorkflow({
         ...workflowProps,
         isToolCall: true
       });
diff --git a/packages/service/core/workflow/dispatch/child/runApp.ts b/packages/service/core/workflow/dispatch/child/runApp.ts
index 680413790..878e3702f 100644
--- a/packages/service/core/workflow/dispatch/child/runApp.ts
+++ b/packages/service/core/workflow/dispatch/child/runApp.ts
@@ -1,6 +1,6 @@
 import type { ChatItemType } from '@fastgpt/global/core/chat/type.d';
 import type { ModuleDispatchProps } from '@fastgpt/global/core/workflow/runtime/type';
-import { dispatchWorkFlow } from '../index';
+import { runWorkflow } from '../index';
 import { ChatRoleEnum } from '@fastgpt/global/core/chat/constants';
 import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
 import {
@@ -132,7 +132,7 @@ export const dispatchRunAppNode = async (props: Props): Promise<Response> => {
     runTimes,
     workflowInteractiveResponse,
     system_memories
-  } = await dispatchWorkFlow({
+  } = await runWorkflow({
     ...props,
     lastInteractive: childrenInteractive,
     // Rewrite stream mode
diff --git a/packages/service/core/workflow/dispatch/index.ts b/packages/service/core/workflow/dispatch/index.ts
index e4881fe8f..1231f4109 100644
--- a/packages/service/core/workflow/dispatch/index.ts
+++ b/packages/service/core/workflow/dispatch/index.ts
@@ -47,7 +47,7 @@ import { removeSystemVariable, rewriteRuntimeWorkFlow } from './utils';
 import { getHandleId } from '@fastgpt/global/core/workflow/utils';
 import { callbackMap } from './constants';
 
-type Props = ChatDispatchProps & {
+type Props = Omit<ChatDispatchProps, 'workflowDispatchDeep'> & {
   runtimeNodes: RuntimeNodeItemType[];
   runtimeEdges: RuntimeEdgeItemType[];
 };
@@ -58,62 +58,16 @@ type NodeResponseCompleteType = Omit & {
   [DispatchNodeResponseKeyEnum.nodeResponse]?: ChatHistoryItemResType;
 };
 
-/* running */
+// Run workflow
 export async function dispatchWorkFlow(data: Props): Promise<DispatchFlowResponse> {
-  let {
-    res,
-    runtimeNodes = [],
-    runtimeEdges = [],
-    histories = [],
-    variables = {},
-    externalProvider,
-    stream = false,
-    retainDatasetCite = true,
-    version = 'v1',
-    responseDetail = true,
-    responseAllData = true
-  } = data;
-  const startTime = Date.now();
+  const { res, stream, externalProvider } = data;
 
-  await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
-
-  // Initialize the dispatch depth and auto-increase it, to avoid infinite nesting
-  if (!data.workflowDispatchDeep) {
-    data.workflowDispatchDeep = 1;
-  } else {
-    data.workflowDispatchDeep += 1;
-  }
-  const isRootRuntime = data.workflowDispatchDeep === 1;
-
-  // Initialize runtimeNodesMap
-  const runtimeNodesMap = new Map(runtimeNodes.map((item) => [item.nodeId, item]));
-
-  // Over max depth
-  if (data.workflowDispatchDeep > 20) {
-    return {
-      flowResponses: [],
-      flowUsages: [],
-      debugResponse: {
-        finishedNodes: [],
-        finishedEdges: [],
-        nextStepRunNodes: []
-      },
-      [DispatchNodeResponseKeyEnum.runTimes]: 1,
-      [DispatchNodeResponseKeyEnum.assistantResponses]: [],
-      [DispatchNodeResponseKeyEnum.toolResponses]: null,
-      newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables),
-      durationSeconds: 0
-    };
-  }
-
-  let workflowRunTimes = 0;
   let streamCheckTimer: NodeJS.Timeout | null = null;
 
-  // Init
-  if (isRootRuntime) {
-    // set sse response headers
-    res?.setHeader('Connection', 'keep-alive'); // Set keepalive for long connection
-    if (stream && res) {
+  // set sse response headers
+  if (res) {
+    res.setHeader('Connection', 'keep-alive'); // Set keepalive for long connection
+    if (stream) {
       res.on('close', () => res.end());
       res.on('error', () => {
         addLog.error('Request error');
@@ -135,14 +89,68 @@ export async function dispatchWorkFlow(data: Props): Promise {
+    if (streamCheckTimer) {
+      clearInterval(streamCheckTimer);
+    }
+  });
+}
+
+type RunWorkflowProps = ChatDispatchProps & {
+  runtimeNodes: RuntimeNodeItemType[];
+  runtimeEdges: RuntimeEdgeItemType[];
+};
+export const runWorkflow = async (data: RunWorkflowProps): Promise<DispatchFlowResponse> => {
+  let {
+    res,
+    runtimeNodes = [],
+    runtimeEdges = [],
+    histories = [],
+    variables = {},
+    externalProvider,
+    retainDatasetCite = true,
+    version = 'v1',
+    responseDetail = true,
+    responseAllData = true
+  } = data;
+
+  // Over max depth
+  data.workflowDispatchDeep++;
+  const isRootRuntime = data.workflowDispatchDeep === 1;
+  if (data.workflowDispatchDeep > 20) {
+    return {
+      flowResponses: [],
+      flowUsages: [],
+      debugResponse: {
+        finishedNodes: [],
+        finishedEdges: [],
+        nextStepRunNodes: []
+      },
+      [DispatchNodeResponseKeyEnum.runTimes]: 1,
+      [DispatchNodeResponseKeyEnum.assistantResponses]: [],
+      [DispatchNodeResponseKeyEnum.toolResponses]: null,
+      newVariables: removeSystemVariable(variables, externalProvider.externalWorkflowVariables),
+      durationSeconds: 0
     };
   }
+  const startTime = Date.now();
+
+  await rewriteRuntimeWorkFlow({ nodes: runtimeNodes, edges: runtimeEdges, lang: data.lang });
+
   /*
     Workflow queue control
    Features:
@@ -161,7 +169,9 @@ export async function dispatchWorkFlow(data: Props): Promise [item.nodeId, item]));
 
   // Workflow variables
+  workflowRunTimes = 0;
   chatResponses: ChatHistoryItemResType[] = []; // response request and save to database
   chatAssistantResponse: AIChatItemValueItemType[] = []; // The value will be returned to the user
   chatNodeUsages: ChatNodeUsageType[] = [];
@@ -221,7 +231,7 @@ export async function dispatchWorkFlow(data: Props): Promise {
     // Add run times
-    workflowRunTimes += runTimes;
+    this.workflowRunTimes += runTimes;
     data.maxRunTimes -= runTimes;
 
     if (newMemories) {
@@ -650,7 +660,7 @@ export async function dispatchWorkFlow(data: Props): Promise => {
       index++;
 
-      const response = await dispatchWorkFlow({
+      const response = await runWorkflow({
        ...props,
        lastInteractive: interactiveData?.childrenResponse,
        variables: newVariables,
diff --git a/packages/service/core/workflow/dispatch/plugin/run.ts b/packages/service/core/workflow/dispatch/plugin/run.ts
index 234bdf30a..275600609 100644
--- a/packages/service/core/workflow/dispatch/plugin/run.ts
+++ b/packages/service/core/workflow/dispatch/plugin/run.ts
@@ -20,7 +20,7 @@ import { filterSystemVariables, getNodeErrResponse } from '../utils';
 import { getPluginRunUserQuery } from '@fastgpt/global/core/workflow/utils';
 import type { NodeInputKeyEnum, NodeOutputKeyEnum } from '@fastgpt/global/core/workflow/constants';
 import { getChildAppRuntimeById } from '../../../app/plugin/controller';
-import { dispatchWorkFlow } from '../index';
+import { runWorkflow } from '../index';
 import { getUserChatInfoAndAuthTeamPoints } from '../../../../support/permission/auth/team';
 import { dispatchRunTool } from '../child/runTool';
 import type { PluginRuntimeType } from '@fastgpt/global/core/app/plugin/type';
@@ -118,7 +118,7 @@ export const dispatchRunPlugin = async (props: RunPluginProps): Promise {
     const input = '1: {"name": "tool", "arguments": invalid json}';
     const result = parsePromptToolCall(input);
 
-    expect(result).toEqual({
-      answer: 'Tool run error'
-    });
+    expect(result.answer).toEqual(
+      'Tool call error: 1: {"name": "tool", "arguments": invalid json}'
+    );
+    expect(result.streamAnswer).toEqual(
+      'Tool call error: 1: {"name": "tool", "arguments": invalid json}'
+    );
   });
 
   it('should return error message for incomplete JSON with 1:', () => {
     const input = '1: {"name": "tool"';
     const result = parsePromptToolCall(input);
 
-    expect(result).toEqual({
-      answer: 'Tool run error'
-    });
+    expect(result.answer).toEqual('Tool call error: 1: {"name": "tool"');
+    expect(result.streamAnswer).toEqual('Tool call error: 1: {"name": "tool"');
   });
 
   it('should handle empty JSON object with 1: (creates tool call with undefined properties)', () => {
@@ -187,18 +189,16 @@ describe('parsePromptToolCall function tests', () => {
     const input = '1:';
     const result = parsePromptToolCall(input);
 
-    expect(result).toEqual({
-      answer: 'Tool run error'
-    });
+    expect(result.answer).toEqual('Tool call error: 1:');
+    expect(result.streamAnswer).toEqual('Tool call error: 1:');
   });
 
   it('should handle input with only prefix and whitespace', () => {
     const input = '1: ';
     const result = parsePromptToolCall(input);
 
-    expect(result).toEqual({
-      answer: 'Tool run error'
-    });
+    expect(result.answer).toEqual('Tool call error: 1:');
+    expect(result.streamAnswer).toEqual('Tool call error: 1:');
   });
 
   it('should handle JSON5 syntax in tool call', () => {
@@ -244,9 +244,12 @@ describe('parsePromptToolCall function tests', () => {
     const result = parsePromptToolCall(input);
 
     // The sliceJsonStr function can't properly extract JSON when there's extra text after
-    expect(result).toEqual({
-      answer: 'Tool run error'
-    });
+    expect(result.answer).toEqual(
+      'Tool call error: Text 1: {"name": "tool1", "arguments": {"param": "value"}} more text 1: {"name": "tool2", "arguments": {}}'
+    );
+    expect(result.streamAnswer).toEqual(
+      'Tool call error: Text 1: {"name": "tool1", "arguments": {"param": "value"}} more text 1: {"name": "tool2", "arguments": {}}'
+    );
   });
 
   it('should handle tool name with underscores and numbers', () => {
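
For reference only (not part of the patch): a minimal TypeScript sketch of the updated parsePromptToolCall contract and of how a caller along the lines of createStreamResponse consumes it. The ToolCall type and the handleParsedToolCall helper below are simplified stand-ins introduced purely for illustration; the real types come from @fastgpt/global and the real consumer lives in packages/service/core/ai/llm/request.ts.

// Simplified stand-in for ChatCompletionMessageToolCall (illustrative only).
type ToolCall = {
  id: string;
  type: 'function';
  function: { name: string; arguments: string };
};

// Return shape of the updated parsePromptToolCall (promptToolCall.ts).
type ParsedPromptToolCall = {
  answer: string; // plain answer, or the raw text prefixed with 'Tool call error: ' when parsing fails
  streamAnswer?: string; // set when the raw text should still be streamed to the client
  toolCalls?: ToolCall[];
};

// Hypothetical consumer mirroring the createStreamResponse change:
// stream the fallback text first, then surface any parsed tool calls.
const handleParsedToolCall = (
  parsed: ParsedPromptToolCall,
  onStreaming?: (e: { text: string }) => void,
  onToolCall?: (e: { call: ToolCall }) => void
) => {
  if (parsed.streamAnswer) {
    onStreaming?.({ text: parsed.streamAnswer });
  }
  parsed.toolCalls?.forEach((call) => onToolCall?.({ call }));
  return parsed.answer;
};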