From 539f37f1b286e3d00d7ac99bc7c08495c1c17f97 Mon Sep 17 00:00:00 2001 From: xqvvu Date: Wed, 29 Apr 2026 18:49:53 +0800 Subject: [PATCH] refactor: centralize resume stream cancellation Ensure only one resume stream is active per client tab and discard queued resume messages after abort. Made-with: Cursor --- pro | 2 +- projects/app/src/pages/chat/index.tsx | 5 ++-- projects/app/src/web/common/api/fetch.ts | 31 +++++++++++++++++++----- 3 files changed, 28 insertions(+), 10 deletions(-) diff --git a/pro b/pro index 1a56dfc0d3..dccbea5187 160000 --- a/pro +++ b/pro @@ -1 +1 @@ -Subproject commit 1a56dfc0d3c2fc1dc3e4a2f23ad1847926c804a8 +Subproject commit dccbea518766ed59495e6f03880541a50a095cce diff --git a/projects/app/src/pages/chat/index.tsx b/projects/app/src/pages/chat/index.tsx index 98dce618ba..a15aebec8d 100644 --- a/projects/app/src/pages/chat/index.tsx +++ b/projects/app/src/pages/chat/index.tsx @@ -149,9 +149,8 @@ const ChatContent = (props: ChatPageProps) => { // show main chat interface return ( - + { isShowFullText={props.showFullText} showWholeResponse={props.showWholeResponse} > - + diff --git a/projects/app/src/web/common/api/fetch.ts b/projects/app/src/web/common/api/fetch.ts index 6e274e9016..a32afb9f63 100644 --- a/projects/app/src/web/common/api/fetch.ts +++ b/projects/app/src/web/common/api/fetch.ts @@ -401,6 +401,11 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum } return resolve({ responseText, completedChat, resumeUnavailable }); }; + const onAbort = () => { + finished = true; + responseQueue = []; + return onfinish(); + }; const onfailed = (err?: any) => { finished = true; const message = getErrText(err, error ?? '响应过程出现异常~'); @@ -424,8 +429,7 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum function animateResponseLoop() { if (signal.aborted) { - responseQueue.forEach(applyMessageItem); - return onfinish(); + return onAbort(); } if (responseQueue.length > 0) { @@ -448,6 +452,8 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum animateResponseLoop(); const enqueue = (data: ResponseQueueItemType) => { + if (signal.aborted) return; + if (resumePhase === StreamResumePhaseEnum.catchup) { applyMessageItem(data); return; @@ -485,6 +491,8 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum } }, onmessage: ({ event, data }) => { + if (signal.aborted) return; + if (event === StreamResumePhaseEvent) { if (data === StreamResumePhaseEnum.catchup || data === StreamResumePhaseEnum.live) { resumePhase = data; @@ -546,8 +554,7 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum clearTimeout(timer); if (controller.signal.aborted) { - finished = true; - return; + return onAbort(); } onfailed(err); @@ -601,7 +608,10 @@ type StreamResumeFetchParams = { onResumeUnavailable?: (data: ResumeUnavailableType) => void; controller: AbortController; }; -export function streamResumeFetch(params: StreamResumeFetchParams) { + +let activeResumeController: AbortController | undefined; + +export async function streamResumeFetch(params: StreamResumeFetchParams) { const { appId, chatId, outLinkAuthData, onmessage, onResumeUnavailable, controller } = params; const query = new URLSearchParams({ appId, chatId }); @@ -612,7 +622,16 @@ export function streamResumeFetch(params: StreamResumeFetchParams) { const url = `/api/core/chat/resume?${query}`; - return $resumefetch({ url, onmessage, onResumeUnavailable, controller }); + if (activeResumeController && activeResumeController !== controller) { + activeResumeController.abort('replace'); + } + activeResumeController = controller; + + return $resumefetch({ url, onmessage, onResumeUnavailable, controller }).finally(() => { + if (activeResumeController === controller) { + activeResumeController = undefined; + } + }); } export const onOptimizePrompt = async ({