mirror of
https://github.com/labring/FastGPT.git
synced 2026-05-08 01:08:43 +08:00
fix: scope chat resume to current app (#6860)
* fix: scope chat resume to current app Abort stale resume streams when switching apps and only resume after the current app/chat init state is aligned. Made-with: Cursor * refactor: centralize resume stream cancellation Ensure only one resume stream is active per client tab and discard queued resume messages after abort. Made-with: Cursor * fix: make sure controller unique * chore: remove redundant effect
This commit is contained in:
@@ -154,7 +154,7 @@ const ChatBox = ({
|
||||
const questionGuideController = useRef(new AbortController());
|
||||
const pluginController = useRef(new AbortController());
|
||||
const resumeController = useRef<AbortController>();
|
||||
const resumedChatIdRef = useRef<string>();
|
||||
const resumedChatTargetRef = useRef<string>();
|
||||
|
||||
const [isLoading, setIsLoading] = useState(false);
|
||||
const [feedbackId, setFeedbackId] = useState<string>();
|
||||
@@ -180,6 +180,8 @@ const ChatBox = ({
|
||||
|
||||
const appId = useContextSelector(WorkflowRuntimeContext, (v) => v.appId);
|
||||
const chatId = useContextSelector(WorkflowRuntimeContext, (v) => v.chatId);
|
||||
const activeAppIdRef = useRef<string | undefined>(appId);
|
||||
activeAppIdRef.current = appId;
|
||||
const activeChatIdRef = useRef<string | undefined>(chatId);
|
||||
activeChatIdRef.current = chatId;
|
||||
const outLinkAuthData = useContextSelector(WorkflowRuntimeContext, (v) => v.outLinkAuthData);
|
||||
@@ -198,18 +200,21 @@ const ChatBox = ({
|
||||
const syncSidebarChatGenerateStatus = useMemoizedFn(
|
||||
(
|
||||
status: ChatGenerateStatusEnum,
|
||||
options?: { hasBeenRead?: boolean; targetChatId?: string }
|
||||
options?: { hasBeenRead?: boolean; targetAppId?: string; targetChatId?: string }
|
||||
) => {
|
||||
const targetAppId = options?.targetAppId ?? appId;
|
||||
if (targetAppId !== appId) return;
|
||||
|
||||
const targetChatId = options?.targetChatId ?? chatId;
|
||||
if (!targetChatId) return;
|
||||
setHistories((prev) => {
|
||||
const idx = prev.findIndex((h) => h.chatId === targetChatId);
|
||||
const idx = prev.findIndex((h) => h.chatId === targetChatId && h.appId === targetAppId);
|
||||
if (idx === -1) {
|
||||
queueMicrotask(loadHistories);
|
||||
return [
|
||||
{
|
||||
chatId: targetChatId,
|
||||
appId,
|
||||
appId: targetAppId,
|
||||
title: chatBoxData.title || t('common:core.chat.New Chat'),
|
||||
customTitle: '',
|
||||
top: false,
|
||||
@@ -221,7 +226,7 @@ const ChatBox = ({
|
||||
];
|
||||
}
|
||||
return prev.map((h) =>
|
||||
h.chatId === targetChatId
|
||||
h.chatId === targetChatId && h.appId === targetAppId
|
||||
? {
|
||||
...h,
|
||||
chatGenerateStatus: status,
|
||||
@@ -662,16 +667,26 @@ const ChatBox = ({
|
||||
});
|
||||
});
|
||||
|
||||
const isActiveResumeTarget = useMemoizedFn(
|
||||
({ appId, chatId }: { appId: string; chatId: string }) =>
|
||||
activeAppIdRef.current === appId && activeChatIdRef.current === chatId
|
||||
);
|
||||
|
||||
const getResumeUnavailablePlaceholderText = useMemoizedFn(() =>
|
||||
t('chat:resume_placeholder_generating')
|
||||
);
|
||||
|
||||
const upsertResumeAiPlaceholder = useMemoizedFn(
|
||||
(responseChatId: string, text = '', status: `${ChatStatusEnum}` = ChatStatusEnum.loading) => {
|
||||
(
|
||||
responseChatId: string,
|
||||
text = '',
|
||||
status: `${ChatStatusEnum}` = ChatStatusEnum.loading,
|
||||
options?: { resetExistingValue?: boolean }
|
||||
) => {
|
||||
setChatRecords((state) => {
|
||||
const lastItem = state[state.length - 1];
|
||||
if (lastItem?.dataId === responseChatId && lastItem.obj === ChatRoleEnum.AI) {
|
||||
if (!text) {
|
||||
if (!text && !options?.resetExistingValue) {
|
||||
return state;
|
||||
}
|
||||
|
||||
@@ -687,6 +702,7 @@ const ChatBox = ({
|
||||
}
|
||||
}
|
||||
],
|
||||
responseData: options?.resetExistingValue ? [] : item.responseData,
|
||||
status,
|
||||
...(status === ChatStatusEnum.finish ? { time: new Date() } : {})
|
||||
}
|
||||
@@ -833,7 +849,7 @@ const ChatBox = ({
|
||||
}
|
||||
];
|
||||
|
||||
resumedChatIdRef.current = chatId;
|
||||
resumedChatTargetRef.current = `${appId}:${chatId}`;
|
||||
|
||||
setChatBoxData((state) =>
|
||||
state.chatId === chatId
|
||||
@@ -1255,8 +1271,12 @@ const ChatBox = ({
|
||||
useEffect(() => {
|
||||
setQuestionGuide([]);
|
||||
setValue('chatStarted', false);
|
||||
resumedChatIdRef.current = undefined;
|
||||
resumedChatTargetRef.current = undefined;
|
||||
abortRequest('leave');
|
||||
|
||||
return () => {
|
||||
abortRequest('leave');
|
||||
};
|
||||
}, [chatId, appId, abortRequest, setValue]);
|
||||
|
||||
useEffect(() => {
|
||||
@@ -1267,20 +1287,24 @@ const ChatBox = ({
|
||||
!appId ||
|
||||
!chatId ||
|
||||
isChatting ||
|
||||
chatBoxData.appId !== appId ||
|
||||
chatBoxData.chatId !== chatId ||
|
||||
chatBoxData.chatGenerateStatus !== ChatGenerateStatusEnum.generating ||
|
||||
resumedChatIdRef.current === chatId
|
||||
resumedChatTargetRef.current === `${appId}:${chatId}`
|
||||
) {
|
||||
return;
|
||||
}
|
||||
|
||||
resumedChatIdRef.current = chatId;
|
||||
resumedChatTargetRef.current = `${appId}:${chatId}`;
|
||||
|
||||
const resumeForAppId = appId;
|
||||
const resumeForChatId = chatId;
|
||||
const responseChatId = resumeTargetAiDataId ?? getNanoid(24);
|
||||
const controller = new AbortController();
|
||||
resumeController.current = controller;
|
||||
scrollToBottom('auto');
|
||||
let resumeFinalStatus = ChatGenerateStatusEnum.done;
|
||||
let hasPreparedResumeAiRecord = false;
|
||||
|
||||
(async () => {
|
||||
try {
|
||||
@@ -1290,7 +1314,7 @@ const ChatBox = ({
|
||||
outLinkAuthData,
|
||||
controller,
|
||||
onResumeUnavailable: () => {
|
||||
if (resumeForChatId !== activeChatIdRef.current) return;
|
||||
if (!isActiveResumeTarget({ appId: resumeForAppId, chatId: resumeForChatId })) return;
|
||||
resumeFinalStatus = ChatGenerateStatusEnum.generating;
|
||||
upsertResumeAiPlaceholder(
|
||||
responseChatId,
|
||||
@@ -1299,15 +1323,18 @@ const ChatBox = ({
|
||||
);
|
||||
},
|
||||
onmessage: (message) => {
|
||||
if (resumeForChatId !== activeChatIdRef.current) return;
|
||||
if (!isActiveResumeTarget({ appId: resumeForAppId, chatId: resumeForChatId })) return;
|
||||
if (shouldCreateResumeAiPlaceholder(message.event)) {
|
||||
upsertResumeAiPlaceholder(responseChatId);
|
||||
upsertResumeAiPlaceholder(responseChatId, '', ChatStatusEnum.loading, {
|
||||
resetExistingValue: !hasPreparedResumeAiRecord
|
||||
});
|
||||
hasPreparedResumeAiRecord = true;
|
||||
}
|
||||
generatingMessage(message);
|
||||
}
|
||||
});
|
||||
|
||||
if (resumeForChatId !== activeChatIdRef.current) return;
|
||||
if (!isActiveResumeTarget({ appId: resumeForAppId, chatId: resumeForChatId })) return;
|
||||
|
||||
if (completedChat) {
|
||||
resumeFinalStatus = completedChat.chatGenerateStatus;
|
||||
@@ -1362,7 +1389,7 @@ const ChatBox = ({
|
||||
});
|
||||
} catch (error) {
|
||||
if (controller.signal.aborted) return;
|
||||
if (resumeForChatId !== activeChatIdRef.current) return;
|
||||
if (!isActiveResumeTarget({ appId: resumeForAppId, chatId: resumeForChatId })) return;
|
||||
|
||||
const isStreamError = (error as ResumeStreamErrorType | undefined)?.isStreamError === true;
|
||||
resumeFinalStatus = isStreamError
|
||||
@@ -1407,8 +1434,13 @@ const ChatBox = ({
|
||||
});
|
||||
}
|
||||
} finally {
|
||||
resumeController.current = undefined;
|
||||
const finishedInActiveChat = activeChatIdRef.current === resumeForChatId;
|
||||
if (resumeController.current === controller) {
|
||||
resumeController.current = undefined;
|
||||
}
|
||||
const finishedInActiveChat = isActiveResumeTarget({
|
||||
appId: resumeForAppId,
|
||||
chatId: resumeForChatId
|
||||
});
|
||||
const leftWhileResuming =
|
||||
controller.signal.aborted && isAbortByLeave(controller.signal.reason);
|
||||
|
||||
@@ -1417,7 +1449,7 @@ const ChatBox = ({
|
||||
}
|
||||
|
||||
setChatBoxData((state) =>
|
||||
state.chatId === resumeForChatId
|
||||
state.appId === resumeForAppId && state.chatId === resumeForChatId
|
||||
? {
|
||||
...state,
|
||||
chatGenerateStatus: resumeFinalStatus,
|
||||
@@ -1428,19 +1460,21 @@ const ChatBox = ({
|
||||
|
||||
if (finishedInActiveChat) {
|
||||
void postMarkChatRead({
|
||||
appId,
|
||||
appId: resumeForAppId,
|
||||
chatId: resumeForChatId,
|
||||
...outLinkAuthData
|
||||
})
|
||||
.catch(() => {})
|
||||
.finally(() => {
|
||||
syncSidebarChatGenerateStatus(resumeFinalStatus, {
|
||||
targetAppId: resumeForAppId,
|
||||
hasBeenRead: true,
|
||||
targetChatId: resumeForChatId
|
||||
});
|
||||
});
|
||||
} else {
|
||||
syncSidebarChatGenerateStatus(resumeFinalStatus, {
|
||||
targetAppId: resumeForAppId,
|
||||
hasBeenRead: false,
|
||||
targetChatId: resumeForChatId
|
||||
});
|
||||
@@ -1454,9 +1488,12 @@ const ChatBox = ({
|
||||
appId,
|
||||
chatId,
|
||||
isChatting,
|
||||
chatBoxData.appId,
|
||||
chatBoxData.chatId,
|
||||
chatBoxData.chatGenerateStatus,
|
||||
generatingMessage,
|
||||
hasMeaningfulAiOutput,
|
||||
isActiveResumeTarget,
|
||||
getResumeUnavailablePlaceholderText,
|
||||
outLinkAuthData,
|
||||
resumeTargetAiDataId,
|
||||
|
||||
@@ -52,6 +52,8 @@ const AppChatWindow = () => {
|
||||
const chatRecords = useContextSelector(ChatRecordContext, (v) => v.chatRecords);
|
||||
const totalRecordsCount = useContextSelector(ChatRecordContext, (v) => v.totalRecordsCount);
|
||||
|
||||
const isCurrentChatReady = chatBoxData.appId === appId && chatBoxData.chatId === chatId;
|
||||
|
||||
const pane = useContextSelector(ChatPageContext, (v) => v.pane);
|
||||
const chatSettings = useContextSelector(ChatPageContext, (v) => v.chatSettings);
|
||||
const handlePaneChange = useContextSelector(ChatPageContext, (v) => v.handlePaneChange);
|
||||
@@ -192,7 +194,7 @@ const AppChatWindow = () => {
|
||||
<ChatBox
|
||||
appId={appId}
|
||||
chatId={chatId}
|
||||
isReady={!loading && !!appId}
|
||||
isReady={!loading && !!appId && isCurrentChatReady}
|
||||
enableAutoResume
|
||||
feedbackType={'user'}
|
||||
chatType={ChatTypeEnum.chat}
|
||||
|
||||
@@ -89,6 +89,8 @@ const HomeChatWindow = () => {
|
||||
const chatRecords = useContextSelector(ChatRecordContext, (v) => v.chatRecords);
|
||||
const totalRecordsCount = useContextSelector(ChatRecordContext, (v) => v.totalRecordsCount);
|
||||
|
||||
const isCurrentChatReady = chatBoxData.appId === appId && chatBoxData.chatId === chatId;
|
||||
|
||||
const isQuickApp = useMemo(
|
||||
() => chatSettings?.quickAppList.some((app) => app._id === appId),
|
||||
[chatSettings?.quickAppList, appId]
|
||||
@@ -463,7 +465,7 @@ const HomeChatWindow = () => {
|
||||
<ChatBox
|
||||
appId={appId}
|
||||
chatId={chatId}
|
||||
isReady={!loading && !!appId}
|
||||
isReady={!loading && !!appId && isCurrentChatReady}
|
||||
enableAutoResume
|
||||
feedbackType={'user'}
|
||||
chatType={ChatTypeEnum.home}
|
||||
|
||||
@@ -401,6 +401,11 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
|
||||
}
|
||||
return resolve({ responseText, completedChat, resumeUnavailable });
|
||||
};
|
||||
const onAbort = () => {
|
||||
finished = true;
|
||||
responseQueue = [];
|
||||
return onfinish();
|
||||
};
|
||||
const onfailed = (err?: any) => {
|
||||
finished = true;
|
||||
const message = getErrText(err, error ?? '响应过程出现异常~');
|
||||
@@ -424,8 +429,7 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
|
||||
|
||||
function animateResponseLoop() {
|
||||
if (signal.aborted) {
|
||||
responseQueue.forEach(applyMessageItem);
|
||||
return onfinish();
|
||||
return onAbort();
|
||||
}
|
||||
|
||||
if (responseQueue.length > 0) {
|
||||
@@ -448,6 +452,8 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
|
||||
animateResponseLoop();
|
||||
|
||||
const enqueue = (data: ResponseQueueItemType) => {
|
||||
if (signal.aborted) return;
|
||||
|
||||
if (resumePhase === StreamResumePhaseEnum.catchup) {
|
||||
applyMessageItem(data);
|
||||
return;
|
||||
@@ -485,6 +491,8 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
|
||||
}
|
||||
},
|
||||
onmessage: ({ event, data }) => {
|
||||
if (signal.aborted) return;
|
||||
|
||||
if (event === StreamResumePhaseEvent) {
|
||||
if (data === StreamResumePhaseEnum.catchup || data === StreamResumePhaseEnum.live) {
|
||||
resumePhase = data;
|
||||
@@ -546,8 +554,7 @@ function $resumefetch({ url, onmessage, onResumeUnavailable, controller }: Resum
|
||||
clearTimeout(timer);
|
||||
|
||||
if (controller.signal.aborted) {
|
||||
finished = true;
|
||||
return;
|
||||
return onAbort();
|
||||
}
|
||||
|
||||
onfailed(err);
|
||||
@@ -601,7 +608,10 @@ type StreamResumeFetchParams = {
|
||||
onResumeUnavailable?: (data: ResumeUnavailableType) => void;
|
||||
controller: AbortController;
|
||||
};
|
||||
export function streamResumeFetch(params: StreamResumeFetchParams) {
|
||||
|
||||
let activeResumeController: AbortController | undefined;
|
||||
|
||||
export async function streamResumeFetch(params: StreamResumeFetchParams) {
|
||||
const { appId, chatId, outLinkAuthData, onmessage, onResumeUnavailable, controller } = params;
|
||||
const query = new URLSearchParams({ appId, chatId });
|
||||
|
||||
@@ -612,7 +622,16 @@ export function streamResumeFetch(params: StreamResumeFetchParams) {
|
||||
|
||||
const url = `/api/core/chat/resume?${query}`;
|
||||
|
||||
return $resumefetch({ url, onmessage, onResumeUnavailable, controller });
|
||||
if (activeResumeController && activeResumeController !== controller) {
|
||||
activeResumeController.abort('replace');
|
||||
}
|
||||
activeResumeController = controller;
|
||||
|
||||
return $resumefetch({ url, onmessage, onResumeUnavailable, controller }).finally(() => {
|
||||
if (activeResumeController === controller) {
|
||||
activeResumeController = undefined;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
export const onOptimizePrompt = async ({
|
||||
|
||||
Reference in New Issue
Block a user