feat: 增加中断流.fix: 中断流导致的服务端错误

This commit is contained in:
archer
2023-03-22 22:09:40 +08:00
parent 5ec303610c
commit af35e17fdb
4 changed files with 81 additions and 40 deletions

View File

@@ -3,8 +3,9 @@ interface StreamFetchProps {
url: string;
data: any;
onMessage: (text: string) => void;
abortSignal: AbortController;
}
export const streamFetch = ({ url, data, onMessage }: StreamFetchProps) =>
export const streamFetch = ({ url, data, onMessage, abortSignal }: StreamFetchProps) =>
new Promise(async (resolve, reject) => {
try {
const res = await fetch(url, {
@@ -13,7 +14,8 @@ export const streamFetch = ({ url, data, onMessage }: StreamFetchProps) =>
'Content-Type': 'application/json',
Authorization: getToken() || ''
},
body: JSON.stringify(data)
body: JSON.stringify(data),
signal: abortSignal.signal
});
const reader = res.body?.getReader();
if (!reader) return;

View File

@@ -13,13 +13,27 @@ import { pushBill } from '@/service/events/bill';
/* 发送提示词 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
const { chatId, prompt } = req.body as {
prompt: ChatItemType;
chatId: string;
};
const { authorization } = req.headers;
let step = 0; // step=1时表示开始了流响应
const stream = new PassThrough();
stream.on('error', () => {
console.log('error: ', 'stream error');
stream.destroy();
});
res.on('close', () => {
console.log('stream request close');
stream.destroy();
});
res.on('error', () => {
console.log('error: ', 'request error');
stream.destroy();
});
try {
const { chatId, prompt } = req.body as {
prompt: ChatItemType;
chatId: string;
};
const { authorization } = req.headers;
if (!chatId || !prompt) {
throw new Error('缺少参数');
}
@@ -92,10 +106,10 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
res.setHeader('Access-Control-Allow-Origin', '*');
res.setHeader('X-Accel-Buffering', 'no');
res.setHeader('Cache-Control', 'no-cache, no-transform');
step = 1;
let responseContent = '';
const pass = new PassThrough();
pass.pipe(res);
stream.pipe(res);
const onParse = async (event: ParsedEvent | ReconnectInterval) => {
if (event.type !== 'event') return;
@@ -107,7 +121,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
if (!content) return;
responseContent += content;
// console.log('content:', content)
pass.push(content.replace(/\n/g, '<br/>'));
stream.push(content.replace(/\n/g, '<br/>'));
} catch (error) {
error;
}
@@ -116,13 +130,17 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
const decoder = new TextDecoder();
try {
for await (const chunk of chatResponse.data as any) {
if (stream.destroyed) {
// 流被中断了,直接忽略后面的内容
break;
}
const parser = createParser(onParse);
parser.feed(decoder.decode(chunk));
}
} catch (error) {
console.log('pipe error', error);
}
pass.push(null);
stream.push(null);
const promptsLen = formatPrompts.reduce((sum, item) => sum + item.content.length, 0);
console.log(`responseLen: ${responseContent.length}`, `promptLen: ${promptsLen}`);
@@ -135,10 +153,16 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
textLen: promptsLen + responseContent.length
});
} catch (err: any) {
res.status(500);
jsonRes(res, {
code: 500,
error: err
});
if (step === 1) {
console.log('error结束');
// 直接结束流
stream.destroy();
} else {
res.status(500);
jsonRes(res, {
code: 500,
error: err
});
}
}
}

View File

@@ -1,4 +1,4 @@
import React, { useCallback, useState, useRef, useMemo } from 'react';
import React, { useCallback, useState, useRef, useMemo, useEffect } from 'react';
import { useRouter } from 'next/router';
import Image from 'next/image';
import {
@@ -88,6 +88,16 @@ const Chat = ({ chatId }: { chatId: string }) => {
}, [chatData]);
const { pushChatHistory } = useChatStore();
// 中断请求
const controller = useRef(new AbortController());
useEffect(() => {
controller.current = new AbortController();
return () => {
console.log('close========');
// eslint-disable-next-line react-hooks/exhaustive-deps
controller.current?.abort();
};
}, [chatId]);
// 滚动到底部
const scrollToBottom = useCallback(() => {
@@ -212,7 +222,8 @@ const Chat = ({ chatId }: { chatId: string }) => {
};
})
}));
}
},
abortSignal: controller.current
});
// 保存对话信息

View File

@@ -12,30 +12,34 @@ export const pushBill = async ({
chatId: string;
textLen: number;
}) => {
await connectToDatabase();
const modelItem = ModelList.find((item) => item.model === modelName);
if (!modelItem) return;
const price = modelItem.price * textLen;
let billId;
try {
// 插入 Bill 记录
const res = await Bill.create({
userId,
chatId,
textLen,
price
});
billId = res._id;
await connectToDatabase();
// 扣费
await User.findByIdAndUpdate(userId, {
$inc: { balance: -price }
});
const modelItem = ModelList.find((item) => item.model === modelName);
if (!modelItem) return;
const price = modelItem.price * textLen;
let billId;
try {
// 插入 Bill 记录
const res = await Bill.create({
userId,
chatId,
textLen,
price
});
billId = res._id;
// 扣费
await User.findByIdAndUpdate(userId, {
$inc: { balance: -price }
});
} catch (error) {
billId && Bill.findByIdAndDelete(billId);
}
} catch (error) {
billId && Bill.findByIdAndDelete(billId);
console.log(error);
}
};