From a62a9c4067c27c9dc42e796f3394ad9b6d5638b7 Mon Sep 17 00:00:00 2001
From: archer <545436317@qq.com>
Date: Fri, 19 May 2023 00:00:56 +0800
Subject: [PATCH] perf: stream response

---
 src/api/fetch.ts                          |  2 +-
 src/pages/api/chat/chat.ts                | 14 ++----------
 src/pages/api/chat/shareChat/chat.ts      | 14 ++----------
 src/pages/api/openapi/chat/chat.ts        | 14 ++----------
 src/pages/api/openapi/chat/lafGpt.ts      | 14 ++----------
 src/pages/model/share/components/list.tsx |  9 +++++++-
 src/service/events/generateVector.ts      | 10 ++++++---
 src/service/mongo.ts                      |  2 +-
 src/service/response.ts                   |  2 +-
 src/service/utils/chat/claude.ts          | 26 ++++++-----------------
 src/service/utils/chat/index.ts           | 16 ++++----------
 src/service/utils/chat/openai.ts          |  6 +++---
 src/types/pg.d.ts                         |  6 +++---
 13 files changed, 43 insertions(+), 92 deletions(-)

diff --git a/src/api/fetch.ts b/src/api/fetch.ts
index 77ecd53aa..6042fda35 100644
--- a/src/api/fetch.ts
+++ b/src/api/fetch.ts
@@ -41,7 +41,7 @@ export const streamFetch = ({ url, data, onMessage, abortSignal }: StreamFetchPr
         return;
       }

-      const text = decoder.decode(value).replace(/<br\/>/g, '\n');
+      const text = decoder.decode(value);
       responseText += text;
       onMessage(text);
       read();
diff --git a/src/pages/api/chat/chat.ts b/src/pages/api/chat/chat.ts
index 6acc914ff..d472dcde5 100644
--- a/src/pages/api/chat/chat.ts
+++ b/src/pages/api/chat/chat.ts
@@ -4,7 +4,6 @@ import { authChat } from '@/service/utils/auth';
 import { modelServiceToolMap } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
 import { resStreamResponse } from '@/service/utils/chat';
@@ -14,17 +13,9 @@ import { ChatRoleEnum } from '@/constants/chat';

 /* send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });

   try {
@@ -100,7 +91,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { totalTokens, finishMessages } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts,
         systemPrompt: showModelDetail
@@ -123,8 +113,8 @@
   } catch (err: any) {
     if (step === 1) {
       // end the stream directly
+      res.end();
       console.log('error,结束');
-      stream.destroy();
     } else {
       res.status(500);
       jsonRes(res, {
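
On the client, streamFetch pairs with this server-side change: chunks no longer arrive with newlines escaped as <br/>, so the reader can use the decoded text as-is. A minimal sketch of such a read loop in the spirit of the fetch.ts hunk above (readStream and its signature are illustrative, not the exact streamFetch internals):

// Illustrative client-side read loop: decode each chunk as-is and forward it.
export const readStream = async (
  response: Response,
  onMessage: (text: string) => void
): Promise<string> => {
  if (!response.body) throw new Error('response has no body');
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  let responseText = '';

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    // no .replace(/<br\/>/g, '\n') step anymore: the server writes raw text
    const text = decoder.decode(value);
    responseText += text;
    onMessage(text);
  }
  return responseText;
};
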
diff --git a/src/pages/api/chat/shareChat/chat.ts b/src/pages/api/chat/shareChat/chat.ts
index 1602c81c3..689dd4cfe 100644
--- a/src/pages/api/chat/shareChat/chat.ts
+++ b/src/pages/api/chat/shareChat/chat.ts
@@ -4,7 +4,6 @@ import { authShareChat } from '@/service/utils/auth';
 import { modelServiceToolMap } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill, updateShareChatBill } from '@/service/events/pushBill';
 import { resStreamResponse } from '@/service/utils/chat';
@@ -14,17 +13,9 @@ import { ChatRoleEnum } from '@/constants/chat';

 /* send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });

   try {
@@ -96,7 +87,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { totalTokens, finishMessages } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts,
         systemPrompt: ''
@@ -117,8 +107,8 @@
   } catch (err: any) {
     if (step === 1) {
       // end the stream directly
+      res.end();
       console.log('error,结束');
-      stream.destroy();
     } else {
       res.status(500);
       jsonRes(res, {
diff --git a/src/pages/api/openapi/chat/chat.ts b/src/pages/api/openapi/chat/chat.ts
index e4dbcc07f..f8b80bea3 100644
--- a/src/pages/api/openapi/chat/chat.ts
+++ b/src/pages/api/openapi/chat/chat.ts
@@ -4,7 +4,6 @@ import { authOpenApiKey, authModel, getApiKey } from '@/service/utils/auth';
 import { modelServiceToolMap, resStreamResponse } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
 import { searchKb } from '@/service/plugins/searchKb';
@@ -13,17 +12,9 @@ import { ChatRoleEnum } from '@/constants/chat';

 /* send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });

   try {
@@ -120,7 +111,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { finishMessages, totalTokens } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts
       });
@@ -143,8 +133,8 @@
   } catch (err: any) {
     if (step === 1) {
       // end the stream directly
+      res.end();
       console.log('error,结束');
-      stream.destroy();
     } else {
       res.status(500);
       jsonRes(res, {
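
All four handlers share the same two-phase error handling: while step === 0 nothing has been written, so a normal JSON 500 via jsonRes is still possible; once step === 1 the status line and headers are already on the wire, and the only safe reaction to an error is to end the response. A hedged sketch of that pattern as a helper (handleChatError is an illustrative name, and jsonRes's option shape is assumed from its usage in these files):

import type { NextApiResponse } from 'next';
import { jsonRes } from '@/service/response';

// step === 1 means chunks have been flushed; headers can no longer change
export function handleChatError(res: NextApiResponse, step: number, err: unknown) {
  if (step === 1) {
    // streaming already started: terminate the body, nothing else is valid
    res.end();
  } else {
    // nothing sent yet: a regular JSON error response still works
    res.status(500);
    jsonRes(res, { code: 500, error: err });
  }
}
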
diff --git a/src/pages/api/openapi/chat/lafGpt.ts b/src/pages/api/openapi/chat/lafGpt.ts
index f11cec20d..6e7864588 100644
--- a/src/pages/api/openapi/chat/lafGpt.ts
+++ b/src/pages/api/openapi/chat/lafGpt.ts
@@ -4,7 +4,6 @@ import { authOpenApiKey, authModel, getApiKey } from '@/service/utils/auth';
 import { resStreamResponse, modelServiceToolMap } from '@/service/utils/chat';
 import { ChatItemSimpleType } from '@/types/chat';
 import { jsonRes } from '@/service/response';
-import { PassThrough } from 'stream';
 import { ChatModelMap, ModelVectorSearchModeMap } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
 import { searchKb } from '@/service/plugins/searchKb';
@@ -13,17 +12,9 @@ import { ChatRoleEnum } from '@/constants/chat';

 /* send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
   let step = 0; // step=1 means the stream response has started
-  const stream = new PassThrough();
-  stream.on('error', () => {
-    console.log('error: ', 'stream error');
-    stream.destroy();
-  });
-  res.on('close', () => {
-    stream.destroy();
-  });
   res.on('error', () => {
     console.log('error: ', 'request error');
-    stream.destroy();
+    res.end();
   });

   try {
@@ -155,7 +146,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
       const { finishMessages, totalTokens } = await resStreamResponse({
         model: model.chat.chatModel,
         res,
-        stream,
         chatResponse: streamResponse,
         prompts
       });
@@ -182,7 +172,7 @@
     if (step === 1) {
       // end the stream directly
       console.log('error,结束');
-      stream.destroy();
+      res.end();
     } else {
       res.status(500);
       jsonRes(res, {
diff --git a/src/pages/model/share/components/list.tsx b/src/pages/model/share/components/list.tsx
index 9dee73dbe..16d9c8515 100644
--- a/src/pages/model/share/components/list.tsx
+++ b/src/pages/model/share/components/list.tsx
@@ -38,7 +38,14 @@ const ShareModelList = ({
           {model.name}
-
+
           {model.share.intro || '这个AI助手还没有介绍~'}
diff --git a/src/service/events/generateVector.ts b/src/service/events/generateVector.ts
index d3fe109dc..8202abf4c 100644
--- a/src/service/events/generateVector.ts
+++ b/src/service/events/generateVector.ts
@@ -86,9 +86,13 @@ export async function generateVector(next = false): Promise {
   // refuse the task when there is no balance or the credentials are invalid
   if (dataId && openaiError2[error?.response?.data?.error?.type]) {
     console.log('删除向量生成任务记录');
-    await PgClient.delete('modelData', {
-      where: [['id', dataId]]
-    });
+    try {
+      await PgClient.delete('modelData', {
+        where: [['id', dataId]]
+      });
+    } catch (error) {
+      error;
+    }
     generateVector(true);
     return;
   }
diff --git a/src/service/mongo.ts b/src/service/mongo.ts
index 315e6b7e9..9b0989cfe 100644
--- a/src/service/mongo.ts
+++ b/src/service/mongo.ts
@@ -28,7 +28,7 @@ export async function connectToDatabase(): Promise {
   }

   generateQA();
-  generateVector(true);
+  generateVector();

   // create the proxy agent
   if (process.env.AXIOS_PROXY_HOST && process.env.AXIOS_PROXY_PORT) {
diff --git a/src/service/response.ts b/src/service/response.ts
index e0b0df9a2..2dce57220 100644
--- a/src/service/response.ts
+++ b/src/service/response.ts
@@ -37,7 +37,7 @@ export const jsonRes = (
     if (typeof error === 'string') {
       msg = error;
     } else if (proxyError[error?.code]) {
-      msg = '服务器代理出错';
+      msg = '接口连接异常';
     } else if (error?.response?.data?.error?.message) {
       msg = error?.response?.data?.error?.message;
     } else if (openaiError2[error?.response?.data?.error?.type]) {
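
In generateVector, the cleanup delete is now best-effort: if the PG call throws, the catch swallows the error so the failure branch can still hand control back to the task loop via generateVector(true). The same idea as a standalone sketch (removeFailedTask is an illustrative name; PgClient and generateVector are the project's own helpers, called with the shapes shown in the hunk above):

// Best-effort cleanup: a failed delete must not stall the vector queue.
async function removeFailedTask(dataId: string) {
  try {
    await PgClient.delete('modelData', {
      where: [['id', dataId]]
    });
  } catch (error) {
    // deliberately ignored: losing one cleanup beats stopping the whole queue
  }
  // continue with the next task regardless of the cleanup outcome
  generateVector(true);
}
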
diff --git a/src/service/utils/chat/claude.ts b/src/service/utils/chat/claude.ts
index da37d1871..522f7f5b2 100644
--- a/src/service/utils/chat/claude.ts
+++ b/src/service/utils/chat/claude.ts
@@ -1,19 +1,11 @@
-import { modelToolMap } from '@/utils/chat';
 import { ChatCompletionType, StreamResponseType } from './index';
 import { ChatRoleEnum } from '@/constants/chat';
 import axios from 'axios';
 import mongoose from 'mongoose';
 import { NEW_CHATID_HEADER } from '@/constants/chat';
-import { ClaudeEnum } from '@/constants/model';

 /* model chat */
-export const lafClaudChat = async ({
-  apiKey,
-  messages,
-  stream,
-  chatId,
-  res
-}: ChatCompletionType) => {
+export const claudChat = async ({ apiKey, messages, stream, chatId, res }: ChatCompletionType) => {
   const conversationId = chatId || String(new mongoose.Types.ObjectId());
   // create a new chat
   !chatId &&
@@ -29,7 +21,7 @@ export const lafClaudChat = async ({
   const prompt = `${systemPromptText}'${messages[messages.length - 1].value}'`;

-  const lafResponse = await axios.post(
+  const response = await axios.post(
     process.env.CLAUDE_BASE_URL || '',
     {
       prompt,
@@ -45,10 +37,10 @@
     }
   );

-  const responseText = stream ? '' : lafResponse.data?.text || '';
+  const responseText = stream ? '' : response.data?.text || '';

   return {
-    streamResponse: lafResponse,
+    streamResponse: response,
     responseMessages: messages.concat({ obj: ChatRoleEnum.AI, value: responseText }),
     responseText,
     totalTokens: 0
@@ -56,24 +48,20 @@
 };

 /* openai stream response */
-export const lafClaudStreamResponse = async ({
-  stream,
-  chatResponse,
-  prompts
-}: StreamResponseType) => {
+export const claudStreamResponse = async ({ res, chatResponse, prompts }: StreamResponseType) => {
   try {
     let responseContent = '';

     try {
       const decoder = new TextDecoder();
       for await (const chunk of chatResponse.data as any) {
-        if (stream.destroyed) {
+        if (!res.writable) {
           // the stream was aborted; ignore the rest of the content
           break;
         }
         const content = decoder.decode(chunk);
         responseContent += content;
-        content && stream.push(content.replace(/\n/g, '<br/>'));
+        content && res.write(content);
       }
     } catch (error) {
       console.log('pipe error', error);
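
claudStreamResponse (and its OpenAI counterpart below) now relays upstream chunks straight onto the Node response: res.writable doubles as the abort check that stream.destroyed used to provide, and raw text goes out without the \n-to-<br/> rewrite. A sketch of that relay shape, assuming chatResponse.data is an async-iterable byte stream (axios with responseType: 'stream'); the helper itself is illustrative:

import type { NextApiResponse } from 'next';

// Relay an upstream byte stream onto the HTTP response as it arrives.
async function relayStream(res: NextApiResponse, upstream: AsyncIterable<Uint8Array>) {
  const decoder = new TextDecoder();
  let responseContent = '';

  for await (const chunk of upstream) {
    // the client disconnected: stop consuming the upstream stream
    if (!res.writable) break;
    const content = decoder.decode(chunk);
    responseContent += content;
    content && res.write(content); // raw text, no <br/> escaping
  }
  return responseContent;
}

Dropping the PassThrough removes one buffer hop and the bookkeeping of destroying it from three separate event handlers.
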
diff --git a/src/service/utils/chat/index.ts b/src/service/utils/chat/index.ts
index 51c7aa11d..895956e46 100644
--- a/src/service/utils/chat/index.ts
+++ b/src/service/utils/chat/index.ts
@@ -4,15 +4,13 @@ import type { ChatModelType } from '@/constants/model';
 import { ChatRoleEnum, SYSTEM_PROMPT_HEADER } from '@/constants/chat';
 import { OpenAiChatEnum, ClaudeEnum } from '@/constants/model';
 import { chatResponse, openAiStreamResponse } from './openai';
-import { lafClaudChat, lafClaudStreamResponse } from './claude';
+import { claudChat, claudStreamResponse } from './claude';
 import type { NextApiResponse } from 'next';
-import type { PassThrough } from 'stream';

 export type ChatCompletionType = {
   apiKey: string;
   temperature: number;
   messages: ChatItemSimpleType[];
-  stream: boolean;
   [key: string]: any;
 };
 export type ChatCompletionResponseType = {
@@ -22,7 +20,6 @@
   totalTokens: number;
 };
 export type StreamResponseType = {
-  stream: PassThrough;
   chatResponse: any;
   prompts: ChatItemSimpleType[];
   res: NextApiResponse;
@@ -70,8 +67,8 @@ export const modelServiceToolMap: Record<
     })
   },
   [ClaudeEnum.Claude]: {
-    chatCompletion: lafClaudChat,
-    streamResponse: lafClaudStreamResponse
+    chatCompletion: claudChat,
+    streamResponse: claudStreamResponse
   }
 };

@@ -131,7 +128,6 @@ export const ChatContextFilter = ({
 export const resStreamResponse = async ({
   model,
   res,
-  stream,
   chatResponse,
   systemPrompt,
   prompts
@@ -144,21 +140,17 @@ export const resStreamResponse = async ({
   res.setHeader('X-Accel-Buffering', 'no');
   res.setHeader('Cache-Control', 'no-cache, no-transform');
   systemPrompt && res.setHeader(SYSTEM_PROMPT_HEADER, encodeURIComponent(systemPrompt));
-  stream.pipe(res);

   const { responseContent, totalTokens, finishMessages } = await modelServiceToolMap[
     model
   ].streamResponse({
     chatResponse,
-    stream,
     prompts,
     res,
     systemPrompt
   });

-  // close stream
-  !stream.destroyed && stream.push(null);
-  stream.destroy();
+  res.end();

   return { responseContent, totalTokens, finishMessages };
 };
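
resStreamResponse keeps its header setup but loses the stream.pipe(res) hop: the per-model streamResponse writes to res directly, and a single res.end() replaces the push(null)/destroy() pair. The headers are what make streaming visible to the user, as in this hedged standalone sketch (prepareStreamHeaders is an illustrative wrapper around the calls kept in the hunk above):

import type { NextApiResponse } from 'next';
import { SYSTEM_PROMPT_HEADER } from '@/constants/chat';

function prepareStreamHeaders(res: NextApiResponse, systemPrompt?: string) {
  // tell nginx-style reverse proxies not to buffer, so tokens flush immediately
  res.setHeader('X-Accel-Buffering', 'no');
  // no-transform also keeps intermediaries from compressing or recoding the body
  res.setHeader('Cache-Control', 'no-cache, no-transform');
  // header values must stay Latin-1-safe, hence the encodeURIComponent
  systemPrompt && res.setHeader(SYSTEM_PROMPT_HEADER, encodeURIComponent(systemPrompt));
}
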
diff --git a/src/service/utils/chat/openai.ts b/src/service/utils/chat/openai.ts
index 98b2347ae..95a2993fc 100644
--- a/src/service/utils/chat/openai.ts
+++ b/src/service/utils/chat/openai.ts
@@ -110,8 +110,8 @@ export const chatResponse = async ({

 /* openai stream response */
 export const openAiStreamResponse = async ({
+  res,
   model,
-  stream,
   chatResponse,
   prompts
 }: StreamResponseType & {
@@ -129,7 +129,7 @@
           const content: string = json?.choices?.[0].delta.content || '';
           responseContent += content;
-          !stream.destroyed && content && stream.push(content.replace(/\n/g, '<br/>'));
+          res.writable && content && res.write(content);
         } catch (error) {
           error;
         }
@@ -139,7 +139,7 @@
     const decoder = new TextDecoder();
     const parser = createParser(onParse);
     for await (const chunk of chatResponse.data as any) {
-      if (stream.destroyed) {
+      if (!res.writable) {
        // the stream was aborted; ignore the rest of the content
         break;
       }
diff --git a/src/types/pg.d.ts b/src/types/pg.d.ts
index f2a6bce79..073cb1c88 100644
--- a/src/types/pg.d.ts
+++ b/src/types/pg.d.ts
@@ -5,7 +5,7 @@ export interface PgKBDataItemType {
   q: string;
   a: string;
   status: `${ModelDataStatusEnum}`;
-  model_id: string;
-  user_id: string;
-  kb_id: string;
+  // model_id: string;
+  // user_id: string;
+  // kb_id: string;
 }
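
On the OpenAI path only the sink changed: chunks are still decoded and fed to eventsource-parser, and the parsed delta content now goes to res.write instead of stream.push. A hedged sketch of the full loop as it looks after this commit, using the v1 createParser callback API this file already imports (the surrounding function glue is illustrative, and the [DONE] check is OpenAI's SSE terminator, assumed rather than shown in the hunk):

import { createParser, type ParsedEvent, type ReconnectInterval } from 'eventsource-parser';
import type { NextApiResponse } from 'next';

async function streamOpenAiResponse(res: NextApiResponse, upstream: AsyncIterable<Uint8Array>) {
  let responseContent = '';

  const onParse = (event: ParsedEvent | ReconnectInterval) => {
    if (event.type !== 'event' || event.data === '[DONE]') return;
    try {
      const json = JSON.parse(event.data);
      const content: string = json?.choices?.[0]?.delta?.content || '';
      responseContent += content;
      // only write while the client is still connected
      res.writable && content && res.write(content);
    } catch (error) {
      // malformed chunk: skip it, as the original catch does
    }
  };

  const decoder = new TextDecoder();
  const parser = createParser(onParse);
  for await (const chunk of upstream) {
    if (!res.writable) break; // client aborted: stop reading upstream
    parser.feed(decoder.decode(chunk));
  }
  return responseContent;
}
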