perf: stream response

archer
2023-05-19 00:00:56 +08:00
parent 7408db9cf6
commit a62a9c4067
13 changed files with 43 additions and 92 deletions

View File

@@ -41,7 +41,7 @@ export const streamFetch = ({ url, data, onMessage, abortSignal }: StreamFetchPr
       return;
     }
-    const text = decoder.decode(value).replace(/<br\/>/g, '\n');
+    const text = decoder.decode(value);
     responseText += text;
     onMessage(text);
     read();
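
Client side, this hunk means chunks can be forwarded untouched, since the server no longer encodes newlines as `<br/>`. A minimal sketch of the consuming loop, assuming a Fetch response with a readable body and an `onMessage` callback like the one in StreamFetchProps (the helper name `streamFetchSketch` is hypothetical):

```ts
// Hypothetical sketch of the consuming side after this change; `url` and
// `onMessage` mirror the StreamFetchProps shape in the hunk above.
async function streamFetchSketch(url: string, onMessage: (text: string) => void) {
  const response = await fetch(url, { method: 'POST' });
  const reader = response.body!.getReader();
  const decoder = new TextDecoder();
  let responseText = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // The server now streams plain text: no `<br/>` -> '\n' rewrite needed.
    const text = decoder.decode(value);
    responseText += text;
    onMessage(text);
  }
  return responseText;
}
```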

View File

@@ -4,7 +4,6 @@ import { authChat } from '@/service/utils/auth';
 import { modelServiceToolMap } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
 import { resStreamResponse } from '@/service/utils/chat';
@@ -14,17 +13,9 @@ import { ChatRoleEnum } from '@/constants/chat';
 /* Send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });
   try {
@@ -100,7 +91,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { totalTokens, finishMessages } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts,
         systemPrompt: showModelDetail
@@ -123,8 +113,8 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
   } catch (err: any) {
     if (step === 1) {
       // end the stream directly
+      res.end();
       console.log('error结束');
-      stream.destroy();
     } else {
       res.status(500);
       jsonRes(res, {
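
The same simplification repeats in the three handlers that follow: the intermediate PassThrough is gone, and error handling branches on whether streaming has begun. A condensed, hypothetical sketch of the resulting control flow, with the model call elided and a stand-in for the project's `jsonRes` helper:

```ts
import type { NextApiRequest, NextApiResponse } from 'next';

// Condensed, hypothetical sketch of the post-change error flow.
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  let step = 0; // step=1 means the stream response has started
  res.on('error', () => {
    console.log('error: ', 'request error');
    res.end();
  });
  try {
    step = 1;
    // ...write chunks straight to `res` with res.write(), then...
    res.end();
  } catch (err: any) {
    if (step === 1) {
      res.end(); // headers already sent: just terminate the stream
    } else {
      res.status(500).json({ code: 500, error: String(err) }); // stand-in for jsonRes
    }
  }
}
```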

View File

@@ -4,7 +4,6 @@ import { authShareChat } from '@/service/utils/auth';
 import { modelServiceToolMap } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill, updateShareChatBill } from '@/service/events/pushBill';
 import { resStreamResponse } from '@/service/utils/chat';
@@ -14,17 +13,9 @@ import { ChatRoleEnum } from '@/constants/chat';
 /* Send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });
   try {
@@ -96,7 +87,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { totalTokens, finishMessages } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts,
         systemPrompt: ''
@@ -117,8 +107,8 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
   } catch (err: any) {
     if (step === 1) {
       // end the stream directly
+      res.end();
       console.log('error结束');
-      stream.destroy();
     } else {
       res.status(500);
       jsonRes(res, {

View File

@@ -4,7 +4,6 @@ import { authOpenApiKey, authModel, getApiKey } from '@/service/utils/auth';
 import { modelServiceToolMap, resStreamResponse } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
 import { searchKb } from '@/service/plugins/searchKb';
@@ -13,17 +12,9 @@ import { ChatRoleEnum } from '@/constants/chat';
 /* Send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });
   try {
@@ -120,7 +111,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { finishMessages, totalTokens } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts
       });
@@ -143,8 +133,8 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
   } catch (err: any) {
     if (step === 1) {
       // end the stream directly
+      res.end();
       console.log('error结束');
-      stream.destroy();
     } else {
       res.status(500);
       jsonRes(res, {

View File

@@ -4,7 +4,6 @@ import { authOpenApiKey, authModel, getApiKey } from '@/service/utils/auth';
 import { resStreamResponse, modelServiceToolMap } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
 import { searchKb } from '@/service/plugins/searchKb';
@@ -13,17 +12,9 @@ import { ChatRoleEnum } from '@/constants/chat';
 /* Send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });
   try {
@@ -155,7 +146,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { finishMessages, totalTokens } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts
       });
@@ -182,7 +172,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
     if (step === 1) {
       // end the stream directly
       console.log('error结束');
-      stream.destroy();
+      res.end();
     } else {
       res.status(500);
       jsonRes(res, {

View File

@@ -38,7 +38,14 @@ const ShareModelList = ({
           {model.name}
         </Box>
       </Flex>
-      <Box flex={1} className={styles.intro} my={4} fontSize={'sm'} color={'blackAlpha.600'}>
+      <Box
+        flex={1}
+        className={styles.intro}
+        my={4}
+        fontSize={'sm'}
+        wordBreak={'break-all'}
+        color={'blackAlpha.600'}
+      >
         {model.share.intro || '这个AI助手还没有介绍~'}
       </Box>
       <Flex justifyContent={'space-between'}>

View File

@@ -86,9 +86,13 @@ export async function generateVector(next = false): Promise<any> {
       // Reject the task when there is no balance or the credentials are invalid
       if (dataId && openaiError2[error?.response?.data?.error?.type]) {
         console.log('删除向量生成任务记录');
-        await PgClient.delete('modelData', {
-          where: [['id', dataId]]
-        });
+        try {
+          await PgClient.delete('modelData', {
+            where: [['id', dataId]]
+          });
+        } catch (error) {
+          error;
+        }
         generateVector(true);
         return;
       }

View File

@@ -28,7 +28,7 @@ export async function connectToDatabase(): Promise<void> {
   }
   generateQA();
-  generateVector(true);
+  generateVector();
   // create the proxy object
   if (process.env.AXIOS_PROXY_HOST && process.env.AXIOS_PROXY_PORT) {
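
Worth noting: the signature in the previous file is `generateVector(next = false)`, so the startup call now passes `next = false`, while the in-queue retry path still calls `generateVector(true)`. A tiny, hypothetical illustration of the default-parameter semantics:

```ts
// Hypothetical stand-in showing what each call site passes after this commit.
function generateVectorSketch(next = false) {
  console.log('next =', next);
}
generateVectorSketch();     // startup path -> next = false
generateVectorSketch(true); // retry path inside the queue -> next = true
```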

View File

@@ -37,7 +37,7 @@ export const jsonRes = <T = any>(
   if (typeof error === 'string') {
     msg = error;
   } else if (proxyError[error?.code]) {
-    msg = '服务器代理出错';
+    msg = '接口连接异常';
   } else if (error?.response?.data?.error?.message) {
     msg = error?.response?.data?.error?.message;
   } else if (openaiError2[error?.response?.data?.error?.type]) {

View File

@@ -1,19 +1,11 @@
-import { modelToolMap } from '@/utils/chat';
 import { ChatCompletionType, StreamResponseType } from './index';
 import { ChatRoleEnum } from '@/constants/chat';
 import axios from 'axios';
 import mongoose from 'mongoose';
 import { NEW_CHATID_HEADER } from '@/constants/chat';
-import { ClaudeEnum } from '@/constants/model';
 /* Model chat */
-export const lafClaudChat = async ({
-  apiKey,
-  messages,
-  stream,
-  chatId,
-  res
-}: ChatCompletionType) => {
+export const claudChat = async ({ apiKey, messages, stream, chatId, res }: ChatCompletionType) => {
   const conversationId = chatId || String(new mongoose.Types.ObjectId());
   // create a new chat
   !chatId &&
@@ -29,7 +21,7 @@ export const lafClaudChat = async ({
   const prompt = `${systemPromptText}'${messages[messages.length - 1].value}'`;
-  const lafResponse = await axios.post(
+  const response = await axios.post(
     process.env.CLAUDE_BASE_URL || '',
     {
       prompt,
@@ -45,10 +37,10 @@ export const lafClaudChat = async ({
     }
   );
-  const responseText = stream ? '' : lafResponse.data?.text || '';
+  const responseText = stream ? '' : response.data?.text || '';
   return {
-    streamResponse: lafResponse,
+    streamResponse: response,
     responseMessages: messages.concat({ obj: ChatRoleEnum.AI, value: responseText }),
     responseText,
     totalTokens: 0
@@ -56,24 +48,20 @@ export const lafClaudChat = async ({
 };
 /* openai stream response */
-export const lafClaudStreamResponse = async ({
-  stream,
-  chatResponse,
-  prompts
-}: StreamResponseType) => {
+export const claudStreamResponse = async ({ res, chatResponse, prompts }: StreamResponseType) => {
   try {
     let responseContent = '';
     try {
       const decoder = new TextDecoder();
       for await (const chunk of chatResponse.data as any) {
-        if (stream.destroyed) {
+        if (!res.writable) {
           // the stream was interrupted; ignore the rest of the content
           break;
         }
         const content = decoder.decode(chunk);
         responseContent += content;
-        content && stream.push(content.replace(/\n/g, '<br/>'));
+        content && res.write(content);
       }
     } catch (error) {
       console.log('pipe error', error);

View File

@@ -4,15 +4,13 @@ import type { ChatModelType } from '@/constants/model';
 import { ChatRoleEnum, SYSTEM_PROMPT_HEADER } from '@/constants/chat';
 import { OpenAiChatEnum, ClaudeEnum } from '@/constants/model';
 import { chatResponse, openAiStreamResponse } from './openai';
-import { lafClaudChat, lafClaudStreamResponse } from './claude';
+import { claudChat, claudStreamResponse } from './claude';
 import type { NextApiResponse } from 'next';
-import type { PassThrough } from 'stream';
 export type ChatCompletionType = {
   apiKey: string;
   temperature: number;
   messages: ChatItemSimpleType[];
-  stream: boolean;
   [key: string]: any;
 };
 export type ChatCompletionResponseType = {
@@ -22,7 +20,6 @@ export type ChatCompletionResponseType = {
   totalTokens: number;
 };
 export type StreamResponseType = {
-  stream: PassThrough;
   chatResponse: any;
   prompts: ChatItemSimpleType[];
   res: NextApiResponse;
@@ -70,8 +67,8 @@ export const modelServiceToolMap: Record<
     })
   },
   [ClaudeEnum.Claude]: {
-    chatCompletion: lafClaudChat,
-    streamResponse: lafClaudStreamResponse
+    chatCompletion: claudChat,
+    streamResponse: claudStreamResponse
   }
 };
@@ -131,7 +128,6 @@ export const ChatContextFilter = ({
 export const resStreamResponse = async ({
   model,
   res,
-  stream,
   chatResponse,
   systemPrompt,
   prompts
@@ -144,21 +140,17 @@ export const resStreamResponse = async ({
   res.setHeader('X-Accel-Buffering', 'no');
   res.setHeader('Cache-Control', 'no-cache, no-transform');
   systemPrompt && res.setHeader(SYSTEM_PROMPT_HEADER, encodeURIComponent(systemPrompt));
-  stream.pipe(res);
   const { responseContent, totalTokens, finishMessages } = await modelServiceToolMap[
     model
   ].streamResponse({
     chatResponse,
-    stream,
     prompts,
     res,
     systemPrompt
   });
-  // close stream
-  !stream.destroyed && stream.push(null);
-  stream.destroy();
+  res.end();
   return { responseContent, totalTokens, finishMessages };
 };
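
The net effect of this file: headers are set on `res`, chunks are written directly to it, and `res.writable` replaces `stream.destroyed` as the client-disconnect check. A minimal sketch of the direct-write loop under those assumptions (the function name `pipeToResponse` is hypothetical):

```ts
import type { NextApiResponse } from 'next';

// Hypothetical sketch: stream an upstream async iterable straight to the
// Next.js response, mirroring the decoder loop in the Claude branch above.
async function pipeToResponse(res: NextApiResponse, chunks: AsyncIterable<Uint8Array>) {
  const decoder = new TextDecoder();
  for await (const chunk of chunks) {
    if (!res.writable) break; // client disconnected: skip the rest
    const content = decoder.decode(chunk);
    content && res.write(content); // plain text, no '<br/>' encoding anymore
  }
  res.end();
}
```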

View File

@@ -110,8 +110,8 @@ export const chatResponse = async ({
 /* openai stream response */
 export const openAiStreamResponse = async ({
+  res,
   model,
-  stream,
   chatResponse,
   prompts
 }: StreamResponseType & {
@@ -129,7 +129,7 @@ export const openAiStreamResponse = async ({
         const content: string = json?.choices?.[0].delta.content || '';
         responseContent += content;
-        !stream.destroyed && content && stream.push(content.replace(/\n/g, '<br/>'));
+        res.writable && content && res.write(content);
       } catch (error) {
         error;
       }
@@ -139,7 +139,7 @@ export const openAiStreamResponse = async ({
     const decoder = new TextDecoder();
     const parser = createParser(onParse);
     for await (const chunk of chatResponse.data as any) {
-      if (stream.destroyed) {
+      if (!res.writable) {
         // the stream was interrupted; ignore the rest of the content
         break;
       }
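
For the OpenAI branch the chunks are SSE frames, so the loop above feeds them to the `createParser(onParse)` pairing before the same `res.writable` guard applies. A sketch of an `onParse` that extracts the delta content, assuming `createParser` comes from eventsource-parser as its v1 API suggests (the `write` callback is a hypothetical stand-in for `res.write`):

```ts
import { createParser, type ParsedEvent, type ReconnectInterval } from 'eventsource-parser';

// Hypothetical sketch of the onParse side of the loop above.
function makeOnParse(write: (content: string) => void) {
  return (event: ParsedEvent | ReconnectInterval) => {
    if (event.type !== 'event' || event.data === '[DONE]') return;
    try {
      const json = JSON.parse(event.data);
      const content: string = json?.choices?.[0].delta.content || '';
      content && write(content);
    } catch (error) {
      // ignore frames that are not valid JSON
    }
  };
}

// Usage: feed decoded chunks to the parser inside the for-await loop.
const parser = createParser(makeOnParse((c) => process.stdout.write(c)));
```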

src/types/pg.d.ts
View File

@@ -5,7 +5,7 @@ export interface PgKBDataItemType {
   q: string;
   a: string;
   status: `${ModelDataStatusEnum}`;
-  model_id: string;
-  user_id: string;
-  kb_id: string;
+  // model_id: string;
+  // user_id: string;
+  // kb_id: string;
 }