From dd3c25160321ae0fabcd3af83e4cc578ff80502f Mon Sep 17 00:00:00 2001
From: Archer <545436317@qq.com>
Date: Wed, 21 May 2025 10:21:20 +0800
Subject: [PATCH] fix: stream response (#4853)

---
 .../zh-cn/docs/development/upgrading/4910.md  |   4 +-
 packages/service/core/ai/utils.ts             | 348 ++++++++++--------
 .../dispatch/agent/runTool/functionCall.ts    |  53 +--
 .../dispatch/agent/runTool/promptCall.ts      |  19 +-
 .../dispatch/agent/runTool/toolChoice.ts      | 214 +++++------
 .../core/workflow/dispatch/chat/oneapi.ts     |  17 +-
 6 files changed, 342 insertions(+), 313 deletions(-)

diff --git a/docSite/content/zh-cn/docs/development/upgrading/4910.md b/docSite/content/zh-cn/docs/development/upgrading/4910.md
index 6c87dfbb4..902088de3 100644
--- a/docSite/content/zh-cn/docs/development/upgrading/4910.md
+++ b/docSite/content/zh-cn/docs/development/upgrading/4910.md
@@ -19,4 +19,6 @@ weight: 790
 
 ## 🐛 Fixes
 
-1. Full-text search across multiple knowledge bases returned incorrectly ordered scores
\ No newline at end of file
+1. Full-text search across multiple knowledge bases returned incorrectly ordered scores.
+2. Stream responses could capture an incorrect finish_reason.
+3. Tool-call mode did not save the reasoning output.
\ No newline at end of file
diff --git a/packages/service/core/ai/utils.ts b/packages/service/core/ai/utils.ts
index d52713244..24866578c 100644
--- a/packages/service/core/ai/utils.ts
+++ b/packages/service/core/ai/utils.ts
@@ -18,15 +18,17 @@ import json5 from 'json5';
  */
 export const computedMaxToken = ({
   maxToken,
-  model
+  model,
+  min
 }: {
   maxToken?: number;
   model: LLMModelItemType;
+  min?: number;
 }) => {
   if (maxToken === undefined) return;
 
   maxToken = Math.min(maxToken, model.maxResponse);
-  return maxToken;
+  return Math.max(maxToken, min || 0);
 };
 
 // FastGPT temperature range: [0,10]; AI temperature range: [0,2]
@@ -178,7 +180,7 @@ export const llmStreamResponseToAnswerText = async (
     }
   }
   return {
-    text: parseReasoningContent(answer)[1],
+    text: removeDatasetCiteText(parseReasoningContent(answer)[1], false),
     usage,
     toolCalls
   };
@@ -192,8 +194,9 @@ export const llmUnStreamResponseToAnswerText = async (
 }> => {
   const answer = response.choices?.[0]?.message?.content || '';
   const toolCalls = response.choices?.[0]?.message?.tool_calls;
+
   return {
-    text: answer,
+    text: removeDatasetCiteText(parseReasoningContent(answer)[1], false),
     usage: response.usage,
     toolCalls
   };
@@ -240,6 +243,12 @@ export const parseLLMStreamResponse = () => {
   let citeBuffer = '';
   const maxCiteBufferLength = 32; // [Object](CITE) is 32 characters long in total
 
+  // Buffer
+  let buffer_finishReason: CompletionFinishReason = null;
+  let buffer_usage: CompletionUsage = getLLMDefaultUsage();
+  let buffer_reasoningContent = '';
+  let buffer_content = '';
+
   /*
     parseThinkTag - Only controls whether we actively parse <think> tags; if the API has already parsed them, we do not parse again.
     retainDatasetCite -
@@ -257,6 +266,7 @@
       };
       finish_reason?: CompletionFinishReason;
     }[];
+    usage?: CompletionUsage;
   };
   parseThinkTag?: boolean;
   retainDatasetCite?: boolean;
@@ -266,72 +276,71 @@
     responseContent: string;
     finishReason: CompletionFinishReason;
   } => {
-    const finishReason = part.choices?.[0]?.finish_reason || null;
-    const content = part.choices?.[0]?.delta?.content || '';
-    // @ts-ignore
-    const reasoningContent = part.choices?.[0]?.delta?.reasoning_content || '';
-    const isStreamEnd = !!finishReason;
-
-    // Parse think
-    const { reasoningContent: parsedThinkReasoningContent, content: parsedThinkContent } = (() => {
-      if (reasoningContent || !parseThinkTag) {
-        isInThinkTag = false;
-        return { reasoningContent, content };
-      }
-
-      if (!content) {
-        return {
-          reasoningContent: '',
-          content: ''
-        };
-      }
-
-      // If we are not inside a think tag, or reasoningContent exists (already parsed by the API), return reasoningContent and content as-is
-      if (isInThinkTag === false) {
-        return {
-          reasoningContent: '',
-          content
-        };
-      }
-
-      // Detect whether the data starts with a think tag
-      if (isInThinkTag === undefined) {
-        // Parse content think and answer
-        startTagBuffer += content;
-        // Too little content so far; hold off on parsing
-        if (startTagBuffer.length < thinkStartChars.length) {
-          if (isStreamEnd) {
-            const tmpContent = startTagBuffer;
-            startTagBuffer = '';
-            return {
-              reasoningContent: '',
-              content: tmpContent
-            };
-          }
-          return {
-            reasoningContent: '',
-            content: ''
-          };
-        }
-
-        if (startTagBuffer.startsWith(thinkStartChars)) {
-          isInThinkTag = true;
-          return {
-            reasoningContent: startTagBuffer.slice(thinkStartChars.length),
-            content: ''
-          };
-        }
-
-        // No think tag matched, so assume we are not inside one and return the buffered content as content
-        isInThinkTag = false;
-        return {
-          reasoningContent: '',
-          content: startTagBuffer
-        };
-      }
-
-      // Confirmed think-tag content: start returning think content while watching for the closing tag in real time
-      /*
-        Closing-tag (</think>) detection scheme:
-        buffer all content suspected to be part of </think> until the complete tag is detected or the buffer exceeds the tag length.
-        The returned content covers cases such as:
-          </think>abc - full match of the closing tag
-          k>abc - partial match of the closing tag
-      */
-      // endTagBuffer holds content suspected to be part of the closing tag
-      if (endTagBuffer) {
-        endTagBuffer += content;
-        if (endTagBuffer.includes(thinkEndChars)) {
-          isInThinkTag = false;
-          const answer = endTagBuffer.slice(thinkEndChars.length);
-          return {
-            reasoningContent: '',
-            content: answer
-          };
-        } else if (endTagBuffer.length >= thinkEndChars.length) {
-          // The buffer exceeds the closing-tag length with no match, so this guess failed; we are still in the think phase.
-          const tmp = endTagBuffer;
-          endTagBuffer = '';
-          return {
-            reasoningContent: tmp,
-            content: ''
-          };
-        }
-        return {
-          reasoningContent: '',
-          content: ''
-        };
-      } else if (content.includes(thinkEndChars)) {
-        // The content fully contains the closing tag; finish immediately
-        isInThinkTag = false;
-        const [think, answer] = content.split(thinkEndChars);
-        return {
-          reasoningContent: think,
-          content: answer
-        };
-      } else {
-        // No buffer and no full match yet; start checking for a partial closing tag.
-        for (let i = 1; i < thinkEndChars.length; i++) {
-          const partialEndTag = thinkEndChars.slice(0, i);
-          // Partial match of the closing tag
-          if (content.endsWith(partialEndTag)) {
-            const think = content.slice(0, -partialEndTag.length);
-            endTagBuffer += partialEndTag;
-            return {
-              reasoningContent: think,
-              content: ''
-            };
-          }
-        }
-      }
-
-      // No part of the closing tag matched; still in the think phase.
-      return {
-        reasoningContent: content,
-        content: ''
-      };
-    })();
-
-    // Parse dataset cite
-    if (retainDatasetCite) {
-      return {
-        reasoningContent: parsedThinkReasoningContent,
-        content: parsedThinkContent,
-        responseContent: parsedThinkContent,
-        finishReason
-      };
-    }
-
-    // Buffer strings containing [ and only flush them once maxCiteBufferLength is exceeded
-    const parseCite = (text: string) => {
-      // At the end of the stream, return all remaining content
-      if (isStreamEnd) {
-        const content = citeBuffer + text;
-        return {
-          content: removeDatasetCiteText(content, false)
-        };
-      }
-
-      // New content contains [; initialize the buffer
-      if (text.includes('[')) {
-        const index = text.indexOf('[');
-        const beforeContent = citeBuffer + text.slice(0, index);
-        citeBuffer = text.slice(index);
-
-        // beforeContent may be a plain string or a string containing [
-        return {
-          content: removeDatasetCiteText(beforeContent, false)
-        };
-      }
-      // Inside the cite buffer; check whether the flush conditions are met
-      else if (citeBuffer) {
-        citeBuffer += text;
-
-        // Check whether the buffer has reached the full quote length or the stream has already ended
-        if (citeBuffer.length >= maxCiteBufferLength) {
-          const content = removeDatasetCiteText(citeBuffer, false);
-          citeBuffer = '';
-
-          return {
-            content
-          };
-        } else {
-          // Return nothing for now
-          return { content: '' };
-        }
-      }
-
-      return {
-        content: text
-      };
-    };
-    const { content: pasedCiteContent } = parseCite(parsedThinkContent);
-
-    return {
-      reasoningContent: parsedThinkReasoningContent,
-      content: parsedThinkContent,
-      responseContent: pasedCiteContent,
-      finishReason
-    };
-  };
+    const data = (() => {
+      buffer_usage = part.usage || buffer_usage;
+
+      const finishReason = part.choices?.[0]?.finish_reason || null;
+      buffer_finishReason = finishReason || buffer_finishReason;
+
+      const content = part.choices?.[0]?.delta?.content || '';
+      // @ts-ignore
+      const reasoningContent = part.choices?.[0]?.delta?.reasoning_content || '';
+      const isStreamEnd = !!buffer_finishReason;
+
+      // Parse think
+      const { reasoningContent: parsedThinkReasoningContent, content: parsedThinkContent } =
+        (() => {
+          if (reasoningContent || !parseThinkTag) {
+            isInThinkTag = false;
+            return { reasoningContent, content };
+          }
+
+          if (!content) {
+            return {
+              reasoningContent: '',
+              content: ''
+            };
+          }
+
+          // If we are not inside a think tag, or reasoningContent exists (already parsed by the API), return reasoningContent and content as-is
+          if (isInThinkTag === false) {
+            return {
+              reasoningContent: '',
+              content
+            };
+          }
+
+          // Detect whether the data starts with a think tag
+          if (isInThinkTag === undefined) {
+            // Parse content think and answer
+            startTagBuffer += content;
+            // Too little content so far; hold off on parsing
+            if (startTagBuffer.length < thinkStartChars.length) {
+              if (isStreamEnd) {
+                const tmpContent = startTagBuffer;
+                startTagBuffer = '';
+                return {
+                  reasoningContent: '',
+                  content: tmpContent
+                };
+              }
+              return {
+                reasoningContent: '',
+                content: ''
+              };
+            }
+
+            if (startTagBuffer.startsWith(thinkStartChars)) {
+              isInThinkTag = true;
+              return {
+                reasoningContent: startTagBuffer.slice(thinkStartChars.length),
+                content: ''
+              };
+            }
+
+            // No think tag matched, so assume we are not inside one and return the buffered content as content
+            isInThinkTag = false;
+            return {
+              reasoningContent: '',
+              content: startTagBuffer
+            };
+          }
+
+          // Confirmed think-tag content: start returning think content while watching for the closing tag in real time
+          /*
+            Closing-tag (</think>) detection scheme:
+            buffer all content suspected to be part of </think> until the complete tag is detected or the buffer exceeds the tag length.
+            The returned content covers cases such as:
+              </think>abc - full match of the closing tag
+              k>abc - partial match of the closing tag
+          */
+          // endTagBuffer holds content suspected to be part of the closing tag
+          if (endTagBuffer) {
+            endTagBuffer += content;
+            if (endTagBuffer.includes(thinkEndChars)) {
+              isInThinkTag = false;
+              const answer = endTagBuffer.slice(thinkEndChars.length);
+              return {
+                reasoningContent: '',
+                content: answer
+              };
+            } else if (endTagBuffer.length >= thinkEndChars.length) {
+              // The buffer exceeds the closing-tag length with no match, so this guess failed; we are still in the think phase.
+              const tmp = endTagBuffer;
+              endTagBuffer = '';
+              return {
+                reasoningContent: tmp,
+                content: ''
+              };
+            }
+            return {
+              reasoningContent: '',
+              content: ''
+            };
+          } else if (content.includes(thinkEndChars)) {
+            // The content fully contains the closing tag; finish immediately
+            isInThinkTag = false;
+            const [think, answer] = content.split(thinkEndChars);
+            return {
+              reasoningContent: think,
+              content: answer
+            };
+          } else {
+            // No buffer and no full match yet; start checking for a partial closing tag.
+            for (let i = 1; i < thinkEndChars.length; i++) {
+              const partialEndTag = thinkEndChars.slice(0, i);
+              // Partial match of the closing tag
+              if (content.endsWith(partialEndTag)) {
+                const think = content.slice(0, -partialEndTag.length);
+                endTagBuffer += partialEndTag;
+                return {
+                  reasoningContent: think,
+                  content: ''
+                };
+              }
+            }
+          }
+
+          // No part of the closing tag matched; still in the think phase.
+          return {
+            reasoningContent: content,
+            content: ''
+          };
+        })();
+
+      // Parse dataset cite
+      if (retainDatasetCite) {
+        return {
+          reasoningContent: parsedThinkReasoningContent,
+          content: parsedThinkContent,
+          responseContent: parsedThinkContent,
+          finishReason: buffer_finishReason
+        };
+      }
+
+      // Buffer strings containing [ and only flush them once maxCiteBufferLength is exceeded
+      const parseCite = (text: string) => {
+        // At the end of the stream, return all remaining content
+        if (isStreamEnd) {
+          const content = citeBuffer + text;
+          return {
+            content: removeDatasetCiteText(content, false)
+          };
+        }
+
+        // New content contains [; initialize the buffer
+        if (text.includes('[')) {
+          const index = text.indexOf('[');
+          const beforeContent = citeBuffer + text.slice(0, index);
+          citeBuffer = text.slice(index);
+
+          // beforeContent may be a plain string or a string containing [
+          return {
+            content: removeDatasetCiteText(beforeContent, false)
+          };
+        }
+        // Inside the cite buffer; check whether the flush conditions are met
+        else if (citeBuffer) {
+          citeBuffer += text;
+
+          // Check whether the buffer has reached the full quote length or the stream has already ended
+          if (citeBuffer.length >= maxCiteBufferLength) {
+            const content = removeDatasetCiteText(citeBuffer, false);
+            citeBuffer = '';
+
+            return {
+              content
+            };
+          } else {
+            // Return nothing for now
+            return { content: '' };
+          }
+        }
+
+        return {
+          content: text
+        };
+      };
+      const { content: pasedCiteContent } = parseCite(parsedThinkContent);
+
+      return {
+        reasoningContent: parsedThinkReasoningContent,
+        content: parsedThinkContent,
+        responseContent: pasedCiteContent,
+        finishReason: buffer_finishReason
+      };
+    })();
+
+    buffer_reasoningContent += data.reasoningContent;
+    buffer_content += data.content;
+
+    return data;
+  };
+
+  const getResponseData = () => {
+    return {
+      finish_reason: buffer_finishReason,
+      usage: buffer_usage,
+      reasoningContent: buffer_reasoningContent,
+      content: buffer_content
+    };
+  };
+
+  const updateFinishReason = (finishReason: CompletionFinishReason) => {
+    buffer_finishReason = finishReason;
+  };
 
   return {
-    parsePart
+    parsePart,
+    getResponseData,
+    updateFinishReason
   };
 };
diff --git a/packages/service/core/workflow/dispatch/agent/runTool/functionCall.ts b/packages/service/core/workflow/dispatch/agent/runTool/functionCall.ts
index d22bbec3e..df8579ea9 100644
--- a/packages/service/core/workflow/dispatch/agent/runTool/functionCall.ts
+++ b/packages/service/core/workflow/dispatch/agent/runTool/functionCall.ts
@@ -1,13 +1,14 @@
 import { createChatCompletion } from '../../../../ai/config';
 import { filterGPTMessageByMaxContext, loadRequestMessages } from '../../../../chat/utils';
-import {
-  type ChatCompletion,
-  type StreamChatType,
-  type ChatCompletionMessageParam,
-  type ChatCompletionCreateParams,
-  type ChatCompletionMessageFunctionCall,
-  type ChatCompletionFunctionMessageParam,
-  type ChatCompletionAssistantMessageParam
+import type {
+  ChatCompletion,
+  StreamChatType,
+  ChatCompletionMessageParam,
+  ChatCompletionCreateParams,
+  ChatCompletionMessageFunctionCall,
+  ChatCompletionFunctionMessageParam,
+  ChatCompletionAssistantMessageParam,
+  CompletionFinishReason
 } from '@fastgpt/global/core/ai/type.d';
 import { type NextApiResponse } from 'next';
 import { responseWriteController } from '../../../../../common/response';
@@ -259,14 +260,15 @@ export const runToolWithFunctionCall = async (
     }
   });
 
-  let { answer, functionCalls, inputTokens, outputTokens } = await (async () => {
+  let { answer, functionCalls, inputTokens, outputTokens, finish_reason } = await (async () => {
     if (isStreamResponse) {
       if (!res || res.closed) {
         return {
           answer: '',
           functionCalls: [],
           inputTokens: 0,
-          outputTokens: 0
+          outputTokens: 0,
+          finish_reason: 'close' as const
         };
       }
       const result = await streamResponse({
@@ -281,10 +283,12 @@ export const runToolWithFunctionCall = async (
         answer: result.answer,
         functionCalls: result.functionCalls,
         inputTokens: result.usage.prompt_tokens,
-        outputTokens: result.usage.completion_tokens
+        outputTokens: result.usage.completion_tokens,
+        finish_reason: result.finish_reason
       };
     } else {
       const result = aiResponse as ChatCompletion;
+      const finish_reason = result.choices?.[0]?.finish_reason as CompletionFinishReason;
       const function_call = result.choices?.[0]?.message?.function_call;
       const usage = result.usage;
 
@@ -315,7 +319,8 @@ export const runToolWithFunctionCall = async (
         answer,
         functionCalls: toolCalls,
         inputTokens: usage?.prompt_tokens,
-        outputTokens: usage?.completion_tokens
+        outputTokens: usage?.completion_tokens,
+        finish_reason
       };
     }
   })();
@@ -481,7 +486,8 @@ export const runToolWithFunctionCall = async (
       completeMessages,
       assistantResponses: toolNodeAssistants,
       runTimes,
-      toolWorkflowInteractiveResponse
+      toolWorkflowInteractiveResponse,
+      finish_reason
     };
   }
 
@@ -495,7 +501,8 @@
         toolNodeInputTokens,
         toolNodeOutputTokens,
         assistantResponses: toolNodeAssistants,
-        runTimes
+        runTimes,
+        finish_reason
       }
     );
   } else {
@@ -523,7 +530,8 @@ export const runToolWithFunctionCall = async (
         : outputTokens,
       completeMessages,
       assistantResponses: [...assistantResponses, ...toolNodeAssistant.value],
-      runTimes: (response?.runTimes || 0) + 1
+      runTimes: (response?.runTimes || 0) + 1,
+      finish_reason
     };
   }
 };
@@ -546,28 +554,25 @@ async function streamResponse({
     readStream: stream
   });
 
-  let textAnswer = '';
   let functionCalls: ChatCompletionMessageFunctionCall[] = [];
   let functionId = getNanoid();
-  let usage = getLLMDefaultUsage();
 
-  const { parsePart } = parseLLMStreamResponse();
+  const { parsePart, getResponseData, updateFinishReason } = parseLLMStreamResponse();
 
   for await (const part of stream) {
-    usage = part.usage || usage;
     if (res.closed) {
       stream.controller?.abort();
+      updateFinishReason('close');
       break;
     }
 
-    const { content: toolChoiceContent, responseContent } = parsePart({
+    const { responseContent } = parsePart({
       part,
       parseThinkTag: false,
       retainDatasetCite
     });
 
     const responseChoice = part.choices?.[0]?.delta;
-    textAnswer += toolChoiceContent;
 
     if (responseContent) {
       workflowStreamResponse?.({
@@ -577,7 +582,7 @@ async function streamResponse({
           text: responseContent
         })
       });
-    } else if (responseChoice.function_call) {
+    } else if (responseChoice?.function_call) {
       const functionCall: {
         arguments?: string;
         name?: string;
@@ -640,5 +645,7 @@ async function streamResponse({
     }
   }
 
-  return { answer: textAnswer, functionCalls, usage };
+  const { content, finish_reason, usage } = getResponseData();
+
+  return { answer: content, functionCalls, finish_reason, usage };
 }
diff --git a/packages/service/core/workflow/dispatch/agent/runTool/promptCall.ts b/packages/service/core/workflow/dispatch/agent/runTool/promptCall.ts
index 441797aee..0aebe99ce 100644
--- a/packages/service/core/workflow/dispatch/agent/runTool/promptCall.ts
+++ b/packages/service/core/workflow/dispatch/agent/runTool/promptCall.ts
@@ -220,7 +220,8 @@ export const runToolWithPromptCall = async (
 
   const max_tokens = computedMaxToken({
     model: toolModel,
-    maxToken
+    maxToken,
+    min: 100
   });
   const filterMessages = await filterGPTMessageByMaxContext({
     messages,
@@ -592,28 +593,22 @@ async function streamResponse({
 
   let startResponseWrite = false;
   let answer = '';
-  let reasoning = '';
-  let finish_reason: CompletionFinishReason = null;
-  let usage = getLLMDefaultUsage();
 
-  const { parsePart } = parseLLMStreamResponse();
+  const { parsePart, getResponseData, updateFinishReason } = parseLLMStreamResponse();
 
   for await (const part of stream) {
-    usage = part.usage || usage;
     if (res.closed) {
       stream.controller?.abort();
-      finish_reason = 'close';
+      updateFinishReason('close');
       break;
     }
 
-    const { reasoningContent, content, responseContent, finishReason } = parsePart({
+    const { reasoningContent, content, responseContent } = parsePart({
       part,
       parseThinkTag: aiChatReasoning,
       retainDatasetCite
     });
-    finish_reason = finish_reason || finishReason;
     answer += content;
-    reasoning += reasoningContent;
 
     // Reasoning response
     if (aiChatReasoning && reasoningContent) {
@@ -658,7 +653,9 @@ async function streamResponse({
     }
   }
 
-  return { answer, reasoning, finish_reason, usage };
+  const { reasoningContent, content, finish_reason, usage } = getResponseData();
+
+  return { answer: content, reasoning: reasoningContent, finish_reason, usage };
 }
 
 const parseAnswer = (
diff --git a/packages/service/core/workflow/dispatch/agent/runTool/toolChoice.ts b/packages/service/core/workflow/dispatch/agent/runTool/toolChoice.ts
index 5f55ecf91..71ab13e7d 100644
--- a/packages/service/core/workflow/dispatch/agent/runTool/toolChoice.ts
+++ b/packages/service/core/workflow/dispatch/agent/runTool/toolChoice.ts
@@ -7,17 +7,13 @@ import {
   type ChatCompletionToolMessageParam,
   type ChatCompletionMessageParam,
   type ChatCompletionTool,
-  type ChatCompletionAssistantMessageParam,
   type CompletionFinishReason
 } from '@fastgpt/global/core/ai/type';
 import { type NextApiResponse } from 'next';
 import { responseWriteController } from '../../../../../common/response';
 import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants';
 import { textAdaptGptResponse } from '@fastgpt/global/core/workflow/runtime/utils';
-import {
-  ChatCompletionRequestMessageRoleEnum,
-  getLLMDefaultUsage
-} from '@fastgpt/global/core/ai/constants';
+import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
 import { dispatchWorkFlow } from '../../index';
 import {
   type DispatchToolModuleProps,
@@ -254,7 +250,8 @@ export const runToolWithToolChoice = async (
 
   const max_tokens = computedMaxToken({
     model: toolModel,
-    maxToken
+    maxToken,
+    min: 100
   });
 
   // Filter histories by maxToken
@@ -319,97 +316,101 @@ export const runToolWithToolChoice = async (
     }
   });
 
-  let { answer, toolCalls, finish_reason, inputTokens, outputTokens } = await (async () => {
-    if (isStreamResponse) {
-      if (!res || res.closed) {
-        return {
-          answer: '',
-          toolCalls: [],
-          finish_reason: 'close' as const,
-          inputTokens: 0,
-          outputTokens: 0
-        };
-      }
-
-      const result = await streamResponse({
-        res,
-        workflowStreamResponse,
-        toolNodes,
-        stream: aiResponse,
-        aiChatReasoning,
-        retainDatasetCite
-      });
-
-      return {
-        answer: result.answer,
-        toolCalls: result.toolCalls,
-        finish_reason: result.finish_reason,
-        inputTokens: result.usage.prompt_tokens,
-        outputTokens: result.usage.completion_tokens
-      };
-    } else {
-      const result = aiResponse as ChatCompletion;
-      const finish_reason = result.choices?.[0]?.finish_reason as CompletionFinishReason;
-      const calls = result.choices?.[0]?.message?.tool_calls || [];
-      const answer = result.choices?.[0]?.message?.content || '';
-      // @ts-ignore
-      const reasoningContent = result.choices?.[0]?.message?.reasoning_content || '';
-      const usage = result.usage;
-
-      if (aiChatReasoning && reasoningContent) {
-        workflowStreamResponse?.({
-          event: SseResponseEventEnum.fastAnswer,
-          data: textAdaptGptResponse({
-            reasoning_content: removeDatasetCiteText(reasoningContent, retainDatasetCite)
-          })
-        });
-      }
-
-      // Format toolCalls
-      const toolCalls = calls.map((tool) => {
-        const toolNode = toolNodes.find((item) => item.nodeId === tool.function?.name);
-
-        // Models that do not support stream mode need an extra response sent to the client here
-        workflowStreamResponse?.({
-          event: SseResponseEventEnum.toolCall,
-          data: {
-            tool: {
-              id: tool.id,
-              toolName: toolNode?.name || '',
-              toolAvatar: toolNode?.avatar || '',
-              functionName: tool.function.name,
-              params: tool.function?.arguments ?? '',
-              response: ''
-            }
-          }
-        });
-
-        return {
-          ...tool,
-          toolName: toolNode?.name || '',
-          toolAvatar: toolNode?.avatar || ''
-        };
-      });
-
-      if (answer) {
-        workflowStreamResponse?.({
-          event: SseResponseEventEnum.fastAnswer,
-          data: textAdaptGptResponse({
-            text: removeDatasetCiteText(answer, retainDatasetCite)
-          })
-        });
-      }
-
-      return {
-        answer,
-        toolCalls: toolCalls,
-        finish_reason,
-        inputTokens: usage?.prompt_tokens,
-        outputTokens: usage?.completion_tokens
-      };
-    }
-  })();
+  let { reasoningContent, answer, toolCalls, finish_reason, inputTokens, outputTokens } =
+    await (async () => {
+      if (isStreamResponse) {
+        if (!res || res.closed) {
+          return {
+            reasoningContent: '',
+            answer: '',
+            toolCalls: [],
+            finish_reason: 'close' as const,
+            inputTokens: 0,
+            outputTokens: 0
+          };
+        }
+
+        const result = await streamResponse({
+          res,
+          workflowStreamResponse,
+          toolNodes,
+          stream: aiResponse,
+          aiChatReasoning,
+          retainDatasetCite
+        });
+
+        return {
+          reasoningContent: result.reasoningContent,
+          answer: result.answer,
+          toolCalls: result.toolCalls,
+          finish_reason: result.finish_reason,
+          inputTokens: result.usage.prompt_tokens,
+          outputTokens: result.usage.completion_tokens
+        };
+      } else {
+        const result = aiResponse as ChatCompletion;
+        const finish_reason = result.choices?.[0]?.finish_reason as CompletionFinishReason;
+        const calls = result.choices?.[0]?.message?.tool_calls || [];
+        const answer = result.choices?.[0]?.message?.content || '';
+        // @ts-ignore
+        const reasoningContent = result.choices?.[0]?.message?.reasoning_content || '';
+        const usage = result.usage;
+
+        if (aiChatReasoning && reasoningContent) {
+          workflowStreamResponse?.({
+            event: SseResponseEventEnum.fastAnswer,
+            data: textAdaptGptResponse({
+              reasoning_content: removeDatasetCiteText(reasoningContent, retainDatasetCite)
+            })
+          });
+        }
+
+        // Format toolCalls
+        const toolCalls = calls.map((tool) => {
+          const toolNode = toolNodes.find((item) => item.nodeId === tool.function?.name);
+
+          // Models that do not support stream mode need an extra response sent to the client here
+          workflowStreamResponse?.({
+            event: SseResponseEventEnum.toolCall,
+            data: {
+              tool: {
+                id: tool.id,
+                toolName: toolNode?.name || '',
+                toolAvatar: toolNode?.avatar || '',
+                functionName: tool.function.name,
+                params: tool.function?.arguments ?? '',
+                response: ''
+              }
+            }
+          });
+
+          return {
+            ...tool,
+            toolName: toolNode?.name || '',
+            toolAvatar: toolNode?.avatar || ''
+          };
+        });
+
+        if (answer) {
+          workflowStreamResponse?.({
+            event: SseResponseEventEnum.fastAnswer,
+            data: textAdaptGptResponse({
+              text: removeDatasetCiteText(answer, retainDatasetCite)
+            })
+          });
+        }
+
+        return {
+          reasoningContent: (reasoningContent as string) || '',
+          answer,
+          toolCalls: toolCalls,
+          finish_reason,
+          inputTokens: usage?.prompt_tokens,
+          outputTokens: usage?.completion_tokens
+        };
+      }
+    })();
-  if (!answer && toolCalls.length === 0) {
+  if (!answer && !reasoningContent && toolCalls.length === 0) {
     return Promise.reject(getEmptyResponseTip());
   }
 
@@ -501,12 +502,13 @@ export const runToolWithToolChoice = async (
 
   if (toolCalls.length > 0) {
     // Run the tool, combine its results, and perform another round of AI calls
-    const assistantToolMsgParams: ChatCompletionAssistantMessageParam[] = [
-      ...(answer
+    const assistantToolMsgParams: ChatCompletionMessageParam[] = [
+      ...(answer || reasoningContent
         ? [
             {
               role: ChatCompletionRequestMessageRoleEnum.Assistant as 'assistant',
-              content: answer
+              content: answer,
+              reasoning_text: reasoningContent
             }
           ]
         : []),
@@ -627,9 +629,10 @@ export const runToolWithToolChoice = async (
     );
   } else {
     // No tool is invoked, indicating that the process is over
-    const gptAssistantResponse: ChatCompletionAssistantMessageParam = {
+    const gptAssistantResponse: ChatCompletionMessageParam = {
       role: ChatCompletionRequestMessageRoleEnum.Assistant,
-      content: answer
+      content: answer,
+      reasoning_text: reasoningContent
     };
     const completeMessages = filterMessages.concat(gptAssistantResponse);
     inputTokens = inputTokens || (await countGptMessagesTokens(requestMessages, tools));
@@ -671,34 +674,23 @@ async function streamResponse({
     readStream: stream
   });
 
-  let textAnswer = '';
   let callingTool: { name: string; arguments: string } | null = null;
   let toolCalls: ChatCompletionMessageToolCall[] = [];
-  let finish_reason: CompletionFinishReason = null;
-  let usage = getLLMDefaultUsage();
 
-  const { parsePart } = parseLLMStreamResponse();
+  const { parsePart, getResponseData, updateFinishReason } = parseLLMStreamResponse();
 
   for await (const part of stream) {
-    usage = part.usage || usage;
     if (res.closed) {
      stream.controller?.abort();
-      finish_reason = 'close';
+      updateFinishReason('close');
       break;
    }
 
-    const {
-      reasoningContent,
-      content: toolChoiceContent,
-      responseContent,
-      finishReason
-    } = parsePart({
+    const { reasoningContent, responseContent } = parsePart({
       part,
       parseThinkTag: true,
       retainDatasetCite
     });
-    textAnswer += toolChoiceContent;
-    finish_reason = finishReason || finish_reason;
 
     const responseChoice = part.choices?.[0]?.delta;
 
@@ -800,5 +792,13 @@ async function streamResponse({
     }
   }
 
-  return { answer: textAnswer, toolCalls: toolCalls.filter(Boolean), finish_reason, usage };
+  const { reasoningContent, content, finish_reason, usage } = getResponseData();
+
+  return {
+    reasoningContent,
+    answer: content,
+    toolCalls: toolCalls.filter(Boolean),
+    finish_reason,
+    usage
+  };
 }
diff --git a/packages/service/core/workflow/dispatch/chat/oneapi.ts b/packages/service/core/workflow/dispatch/chat/oneapi.ts
index e9f1734c1..84bd9ab77 100644
--- a/packages/service/core/workflow/dispatch/chat/oneapi.ts
+++ b/packages/service/core/workflow/dispatch/chat/oneapi.ts
@@ -556,30 +556,21 @@ async function streamResponse({
     res,
     readStream: stream
   });
-  let answer = '';
-  let reasoning = '';
-  let finish_reason: CompletionFinishReason = null;
-  let usage: CompletionUsage = getLLMDefaultUsage();
 
-  const { parsePart } = parseLLMStreamResponse();
+  const { parsePart, getResponseData, updateFinishReason } = parseLLMStreamResponse();
 
   for await (const part of stream) {
-    usage = part.usage || usage;
-
     if (res.closed) {
       stream.controller?.abort();
-      finish_reason = 'close';
+      updateFinishReason('close');
       break;
     }
 
-    const { reasoningContent, content, responseContent, finishReason } = parsePart({
+    const { reasoningContent, responseContent } = parsePart({
       part,
       parseThinkTag,
       retainDatasetCite
     });
-    finish_reason = finish_reason || finishReason;
-    answer += content;
-    reasoning += reasoningContent;
 
     if (aiChatReasoning && reasoningContent) {
       workflowStreamResponse?.({
@@ -602,5 +593,7 @@ async function streamResponse({
     }
   }
 
+  const { reasoningContent: reasoning, content: answer, finish_reason, usage } = getResponseData();
+
   return { answer, reasoning, finish_reason, usage };
 }
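---

For reviewers, here is roughly how the reworked `parseLLMStreamResponse` contract is meant to be consumed, mirroring the four `streamResponse` call sites the patch touches. This is a minimal sketch, not part of the patch: the `@fastgpt/service/core/ai/utils` import alias, the `collectStream` name, and the `stream`/`res` stand-ins are illustrative assumptions.

```ts
// Minimal consumer sketch of the patched parser API (illustrative only).
// parsePart() buffers usage, finish_reason, reasoning and content internally;
// getResponseData() returns the aggregated result once the loop ends.
import { parseLLMStreamResponse } from '@fastgpt/service/core/ai/utils'; // assumed alias

async function collectStream(
  stream: AsyncIterable<any>, // stand-in for the OpenAI SDK chat stream
  res: { closed: boolean } // stand-in for the Next.js response
) {
  const { parsePart, getResponseData, updateFinishReason } = parseLLMStreamResponse();

  for await (const part of stream) {
    // Client disconnected: record a synthetic 'close' finish reason and stop reading.
    if (res.closed) {
      updateFinishReason('close');
      break;
    }

    // responseContent is the client-safe text: think tags parsed out and
    // dataset cites stripped or buffered according to retainDatasetCite.
    const { reasoningContent, responseContent } = parsePart({
      part,
      parseThinkTag: true,
      retainDatasetCite: false
    });
    // ...forward reasoningContent / responseContent to the client here...
  }

  // Full reasoning text, full answer, token usage and the buffered finish_reason.
  return getResponseData();
}
```

The design point of the patch is that `finish_reason` and `usage` now live in one place (the parser's buffers), so a dropped connection, or a provider that only sends `finish_reason` on the final delta, can no longer leave the callers with an inconsistent result.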
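One more behavioral note: with the new `min` parameter, `computedMaxToken` becomes a clamp rather than a pure ceiling, which is why the tool-call dispatchers now pass `min: 100`. Below is a self-contained sketch of the resulting behavior; the `model` object is an illustrative stand-in for a real `LLMModelItemType` config, and the type is trimmed to the one field used here.

```ts
type LLMModelItemType = { maxResponse: number }; // trimmed for illustration

// Mirrors the patched helper: cap at the model ceiling, then enforce the floor.
const computedMaxToken = ({
  maxToken,
  model,
  min
}: {
  maxToken?: number;
  model: LLMModelItemType;
  min?: number;
}) => {
  if (maxToken === undefined) return;

  maxToken = Math.min(maxToken, model.maxResponse);
  return Math.max(maxToken, min || 0);
};

const model = { maxResponse: 4096 }; // illustrative config
console.log(computedMaxToken({ maxToken: 20, model, min: 100 })); // 100 - floor applied
console.log(computedMaxToken({ maxToken: 8192, model, min: 100 })); // 4096 - ceiling applied
console.log(computedMaxToken({ model, min: 100 })); // undefined - unset stays unset
```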