refactor: centralize resume stream cancellation

Ensure only one resume stream is active per client tab and discard queued resume messages after abort.

Made-with: Cursor
This commit is contained in:
xqvvu
2026-04-29 18:49:53 +08:00
parent 2d86f7ef84
commit 539f37f1b2
3 changed files with 28 additions and 10 deletions
+1 -1
Submodule pro updated: 1a56dfc0d3...dccbea5187
+2 -3
View File
@@ -149,9 +149,8 @@ const ChatContent = (props: ChatPageProps) => {
// show main chat interface
return (
<ChatContextProvider key={appId} params={chatHistoryProviderParams}>
<ChatContextProvider params={chatHistoryProviderParams}>
<ChatItemContextProvider
key={appId}
showRouteToDatasetDetail={isStandalone !== '1'}
showRunningStatus={props.showRunningStatus}
showSkillReferences={props.showSkillReferences}
@@ -160,7 +159,7 @@ const ChatContent = (props: ChatPageProps) => {
isShowFullText={props.showFullText}
showWholeResponse={props.showWholeResponse}
>
<ChatRecordContextProvider key={`${appId}:${chatId}`} params={chatRecordProviderParams}>
<ChatRecordContextProvider params={chatRecordProviderParams}>
<Chat />
</ChatRecordContextProvider>
</ChatItemContextProvider>
+25 -6
View File
@@ -401,6 +401,11 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
}
return resolve({ responseText, completedChat, resumeUnavailable });
};
const onAbort = () => {
finished = true;
responseQueue = [];
return onfinish();
};
const onfailed = (err?: any) => {
finished = true;
const message = getErrText(err, error ?? '响应过程出现异常~');
@@ -424,8 +429,7 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
function animateResponseLoop() {
if (signal.aborted) {
responseQueue.forEach(applyMessageItem);
return onfinish();
return onAbort();
}
if (responseQueue.length > 0) {
@@ -448,6 +452,8 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
animateResponseLoop();
const enqueue = (data: ResponseQueueItemType) => {
if (signal.aborted) return;
if (resumePhase === StreamResumePhaseEnum.catchup) {
applyMessageItem(data);
return;
@@ -485,6 +491,8 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
}
},
onmessage: ({ event, data }) => {
if (signal.aborted) return;
if (event === StreamResumePhaseEvent) {
if (data === StreamResumePhaseEnum.catchup || data === StreamResumePhaseEnum.live) {
resumePhase = data;
@@ -546,8 +554,7 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
clearTimeout(timer);
if (controller.signal.aborted) {
finished = true;
return;
return onAbort();
}
onfailed(err);
@@ -601,7 +608,10 @@ type StreamResumeFetchParams = {
onResumeUnavailable?: (data: ResumeUnavailableType) => void;
controller: AbortController;
};
export function streamResumeFetch(params: StreamResumeFetchParams) {
let activeResumeController: AbortController | undefined;
export async function streamResumeFetch(params: StreamResumeFetchParams) {
const { appId, chatId, outLinkAuthData, onmessage, onResumeUnavailable, controller } = params;
const query = new URLSearchParams({ appId, chatId });
@@ -612,7 +622,16 @@ export function streamResumeFetch(params: StreamResumeFetchParams) {
const url = `/api/core/chat/resume?${query}`;
return $resumefetch({ url, onmessage, onResumeUnavailable, controller });
if (activeResumeController && activeResumeController !== controller) {
activeResumeController.abort('replace');
}
activeResumeController = controller;
return $resumefetch({ url, onmessage, onResumeUnavailable, controller }).finally(() => {
if (activeResumeController === controller) {
activeResumeController = undefined;
}
});
}
export const onOptimizePrompt = async ({