import { SseResponseEventEnum } from '@fastgpt/global/core/workflow/runtime/constants'; import { getErrText } from '@fastgpt/global/common/error/utils'; import type { StartChatFnProps } from '@/components/core/chat/ChatContainer/type'; import { // refer to https://github.com/ChatGPTNextWeb/ChatGPT-Next-Web EventStreamContentType, fetchEventSource } from '@fortaine/fetch-event-source'; import { TeamErrEnum } from '@fastgpt/global/common/error/code/team'; import { useSystemStore } from '../system/useSystemStore'; import { formatTime2YMDHMW } from '@fastgpt/global/common/string/time'; import { getWebReqUrl } from '@fastgpt/web/common/system/utils'; import type { OnOptimizePromptProps } from '@/components/common/PromptEditor/OptimizerPopover'; import type { OnOptimizeCodeProps } from '@/pageComponents/app/detail/WorkflowComponents/Flow/nodes/NodeCode/Copilot'; import type { AIChatItemValueItemType } from '@fastgpt/global/core/chat/type'; type StreamFetchProps = { url?: string; data: Record; onMessage: StartChatFnProps['generatingMessage']; abortCtrl: AbortController; }; export type StreamResponseType = { responseText: string; }; type CommonResponseType = { responseValueId?: string; stepId?: string; }; type ResponseQueueItemType = CommonResponseType & ( | { event: SseResponseEventEnum.fastAnswer | SseResponseEventEnum.answer; text?: string; reasoningText?: string; } | { event: SseResponseEventEnum.interactive; [key: string]: any; } | { event: SseResponseEventEnum.agentPlan; agentPlan: AIChatItemValueItemType['agentPlan']; } | { event: | SseResponseEventEnum.toolCall | SseResponseEventEnum.toolParams | SseResponseEventEnum.toolResponse; tools: any; } | { event: SseResponseEventEnum.formData; } ); class FatalError extends Error {} export const streamFetch = ({ url = '/api/v2/chat/completions', data, onMessage, abortCtrl }: StreamFetchProps) => new Promise(async (resolve, reject) => { // First res const timeoutId = setTimeout(() => { abortCtrl.abort('Time out'); }, 60000); // response data let responseText = ''; let responseQueue: ResponseQueueItemType[] = []; let errMsg: string | undefined; let finished = false; const finish = () => { if (errMsg !== undefined) { return failedFinish(); } return resolve({ responseText }); }; const failedFinish = (err?: any) => { finished = true; reject({ message: getErrText(err, errMsg ?? '响应过程出现异常~'), responseText }); }; const isAnswerEvent = (event: SseResponseEventEnum) => event === SseResponseEventEnum.answer || event === SseResponseEventEnum.fastAnswer; // animate response to make it looks smooth function animateResponseText() { // abort message if (abortCtrl.signal.aborted) { responseQueue.forEach((item) => { onMessage(item); if (isAnswerEvent(item.event) && 'text' in item && item.text) { responseText += item.text; } }); return finish(); } if (responseQueue.length > 0) { const fetchCount = Math.max(1, Math.round(responseQueue.length / 30)); for (let i = 0; i < fetchCount; i++) { const item = responseQueue[i]; onMessage(item); if (isAnswerEvent(item.event) && 'text' in item && item.text) { responseText += item.text; } } responseQueue = responseQueue.slice(fetchCount); } if (finished && responseQueue.length === 0) { return finish(); } requestAnimationFrame(animateResponseText); } // start animation animateResponseText(); const pushDataToQueue = (data: ResponseQueueItemType) => { // If the document is hidden, the data is directly sent to the front end responseQueue.push(data); if (document.hidden) { animateResponseText(); } }; try { // auto complete variables const variables = data?.variables || {}; variables.cTime = formatTime2YMDHMW(new Date()); const requestData = { method: 'POST', headers: { 'Content-Type': 'application/json' }, signal: abortCtrl.signal, body: JSON.stringify({ ...data, variables, detail: true, stream: true, retainDatasetCite: data.retainDatasetCite ?? true }) }; // send request await fetchEventSource(getWebReqUrl(url), { ...requestData, async onopen(res) { clearTimeout(timeoutId); const contentType = res.headers.get('content-type'); // not stream if (contentType?.startsWith('text/plain')) { return failedFinish(await res.clone().text()); } // failed stream if ( !res.ok || !res.headers.get('content-type')?.startsWith(EventStreamContentType) || res.status !== 200 ) { try { failedFinish(await res.clone().json()); } catch { const errText = await res.clone().text(); if (!errText.startsWith('event: error')) { failedFinish(); } } } }, onmessage: ({ event, data }) => { if (data === '[DONE]') { return; } // parse text to json const parseJson = (() => { try { return JSON.parse(data); } catch (error) { return; } })(); if (typeof parseJson !== 'object') return; const { responseValueId, stepId, ...rest } = parseJson; // console.log(parseJson, event); if (event === SseResponseEventEnum.answer) { const reasoningText = rest.choices?.[0]?.delta?.reasoning_content || ''; pushDataToQueue({ responseValueId, stepId, event, reasoningText }); const text = rest.choices?.[0]?.delta?.content || ''; for (const item of text) { pushDataToQueue({ responseValueId, stepId, event, text: item }); } } else if (event === SseResponseEventEnum.fastAnswer) { const reasoningText = rest.choices?.[0]?.delta?.reasoning_content || ''; pushDataToQueue({ responseValueId, stepId, event, reasoningText }); const text = rest.choices?.[0]?.delta?.content || ''; pushDataToQueue({ responseValueId, stepId, event, text }); } else if ( event === SseResponseEventEnum.toolCall || event === SseResponseEventEnum.toolParams || event === SseResponseEventEnum.toolResponse ) { pushDataToQueue({ responseValueId, stepId, event, ...rest }); } else if (event === SseResponseEventEnum.flowNodeResponse) { onMessage({ event, nodeResponse: rest }); } else if (event === SseResponseEventEnum.updateVariables) { onMessage({ event, variables: rest }); } else if (event === SseResponseEventEnum.interactive) { pushDataToQueue({ responseValueId, stepId, event, ...rest }); } else if (event === SseResponseEventEnum.agentPlan) { pushDataToQueue({ responseValueId, stepId, event, agentPlan: rest.agentPlan }); } else if (event === SseResponseEventEnum.formData) { // Directly call onMessage for formData, no need to queue onMessage({ event, data: rest }); } else if (event === SseResponseEventEnum.error) { if (rest.statusText === TeamErrEnum.aiPointsNotEnough) { useSystemStore.getState().setNotSufficientModalType(TeamErrEnum.aiPointsNotEnough); } errMsg = getErrText(rest, '流响应错误'); } else if ( [SseResponseEventEnum.workflowDuration, SseResponseEventEnum.flowNodeStatus].includes( event as any ) ) { onMessage({ event, ...rest }); } }, onclose() { finished = true; }, onerror(err) { console.log(err, 'fetch error'); clearTimeout(timeoutId); failedFinish(getErrText(err)); throw new Error(err); }, openWhenHidden: true }); } catch (err: any) { clearTimeout(timeoutId); if (abortCtrl.signal.aborted) { finished = true; return; } console.log(err, 'fetch error'); failedFinish(err); } }); export const onOptimizePrompt = async ({ originalPrompt, model, input, onResult, abortController }: OnOptimizePromptProps) => { const controller = abortController || new AbortController(); await streamFetch({ url: '/api/core/ai/optimizePrompt', data: { originalPrompt, optimizerInput: input, model }, onMessage: ({ event, text }) => { if (event === SseResponseEventEnum.answer && text) { onResult(text); } }, abortCtrl: controller }); }; export const onOptimizeCode = async ({ optimizerInput, model, conversationHistory = [], onResult, abortController }: OnOptimizeCodeProps) => { const controller = abortController || new AbortController(); await streamFetch({ url: '/api/core/workflow/optimizeCode', data: { optimizerInput, model, conversationHistory }, onMessage: ({ event, text }) => { if (event === SseResponseEventEnum.answer && text) { onResult(text); } }, abortCtrl: controller }); };