From 986206b691af1fc4be68d0c57534d8281f258f62 Mon Sep 17 00:00:00 2001
From: archer <545436317@qq.com>
Date: Fri, 23 Jun 2023 23:11:22 +0800
Subject: [PATCH] perf: sse response

---
 client/package.json | 1 -
 client/pnpm-lock.yaml | 10 --
 client/src/api/fetch.ts | 20 +---
 client/src/constants/chat.ts | 1 +
 .../api/openapi/plugin/openaiEmbedding.ts | 28 ++---
 .../pages/api/openapi/v1/chat/completions.ts | 16 ++-
 client/src/pages/kb/components/Test.tsx | 6 +
 client/src/service/utils/auth.ts | 2 +-
 client/src/service/utils/chat/index.ts | 107 +++++++++---------
 client/src/service/utils/chat/openai.ts | 48 ++++----
 client/src/service/utils/tools.ts | 20 ++--
 client/src/utils/adapt.ts | 21 ++++
 docs/deploy/fastgpt/docker-compose.yml | 2 +-
 13 files changed, 150 insertions(+), 132 deletions(-)

diff --git a/client/package.json b/client/package.json
index 1042ac9c1..e47d230ce 100644
--- a/client/package.json
+++ b/client/package.json
@@ -26,7 +26,6 @@
     "crypto": "^1.0.1",
     "date-fns": "^2.30.0",
     "dayjs": "^1.11.7",
-    "eventsource-parser": "^0.1.0",
     "formidable": "^2.1.1",
     "framer-motion": "^9.0.6",
     "hyperdown": "^2.4.29",
diff --git a/client/pnpm-lock.yaml b/client/pnpm-lock.yaml
index 2f6636958..2a84498a2 100644
--- a/client/pnpm-lock.yaml
+++ b/client/pnpm-lock.yaml
@@ -56,9 +56,6 @@ dependencies:
   dayjs:
     specifier: ^1.11.7
     version: registry.npmmirror.com/dayjs@1.11.7
-  eventsource-parser:
-    specifier: ^0.1.0
-    version: registry.npmmirror.com/eventsource-parser@0.1.0
   formidable:
     specifier: ^2.1.1
     version: registry.npmmirror.com/formidable@2.1.1
@@ -7510,13 +7507,6 @@ packages:
     version: 2.0.3
     engines: {node: '>=0.10.0'}
 
-  registry.npmmirror.com/eventsource-parser@0.1.0:
-    resolution: {integrity: sha512-M9QjFtEIkwytUarnx113HGmgtk52LSn3jNAtnWKi3V+b9rqSfQeVdLsaD5AG/O4IrGQwmAAHBIsqbmURPTd2rA==, registry: https://registry.npm.taobao.org/, tarball: https://registry.npmmirror.com/eventsource-parser/-/eventsource-parser-0.1.0.tgz}
-    name: eventsource-parser
-    version: 0.1.0
-    engines: {node: '>=14.18'}
-    dev: false
-
   registry.npmmirror.com/execa@5.1.1:
     resolution: {integrity: sha512-8uSpZZocAZRBAPIEINJj3Lo9HyGitllczc27Eh5YYojjMFMn8yHMDMaUHE2Jqfq05D/wucwI4JGURyXt1vchyg==, registry: https://registry.npm.taobao.org/, tarball: https://registry.npmmirror.com/execa/-/execa-5.1.1.tgz}
     name: execa
diff --git a/client/src/api/fetch.ts b/client/src/api/fetch.ts
index 7ca564576..4a154e42c 100644
--- a/client/src/api/fetch.ts
+++ b/client/src/api/fetch.ts
@@ -1,6 +1,7 @@
 import { Props, ChatResponseType } from '@/pages/api/openapi/v1/chat/completions';
 import { sseResponseEventEnum } from '@/constants/chat';
 import { getErrText } from '@/utils/tools';
+import { parseStreamChunk } from '@/utils/adapt';
 
 interface StreamFetchProps {
   data: Props;
@@ -32,7 +33,6 @@ export const streamFetch = ({ data, onMessage, abortSignal }: StreamFetchProps)
     }
 
     const reader = response.body?.getReader();
-    const decoder = new TextDecoder('utf-8');
 
     // response data
     let responseText = '';
@@ -53,21 +53,7 @@ export const streamFetch = ({ data, onMessage, abortSignal }: StreamFetchProps)
           return reject('An error occurred during the stream response');
         }
       }
-      const chunk = decoder.decode(value);
-      const chunkLines = chunk.split('\n\n').filter((item) => item);
-      const chunkResponse = chunkLines.map((item) => {
-        const splitEvent = item.split('\n');
-        if (splitEvent.length === 2) {
-          return {
-            event: splitEvent[0].replace('event: ', ''),
-            data: splitEvent[1].replace('data: ', '')
-          };
-        }
-        return {
-          event: '',
-          data: splitEvent[0].replace('data: ', '')
-        };
-      });
+      const 
chunkResponse = parseStreamChunk(value);
 
       chunkResponse.forEach((item) => {
         // parse json data
@@ -87,6 +73,8 @@ export const streamFetch = ({ data, onMessage, abortSignal }: StreamFetchProps)
           const chatResponse = data as ChatResponseType;
           newChatId = chatResponse.newChatId;
           quoteLen = chatResponse.quoteLen || 0;
+        } else if (item.event === sseResponseEventEnum.error) {
+          return reject(getErrText(data, 'stream response error'));
         }
       });
       read();
diff --git a/client/src/constants/chat.ts b/client/src/constants/chat.ts
index 5f2cddcee..541429ce3 100644
--- a/client/src/constants/chat.ts
+++ b/client/src/constants/chat.ts
@@ -1,4 +1,5 @@
 export enum sseResponseEventEnum {
+  error = 'error',
   answer = 'answer',
   chatResponse = 'chatResponse'
 }
diff --git a/client/src/pages/api/openapi/plugin/openaiEmbedding.ts b/client/src/pages/api/openapi/plugin/openaiEmbedding.ts
index 86e78bcd6..0c7e7a097 100644
--- a/client/src/pages/api/openapi/plugin/openaiEmbedding.ts
+++ b/client/src/pages/api/openapi/plugin/openaiEmbedding.ts
@@ -1,6 +1,6 @@
 import type { NextApiRequest, NextApiResponse } from 'next';
 import { jsonRes } from '@/service/response';
-import { authUser, getApiKey } from '@/service/utils/auth';
+import { authUser, getApiKey, getSystemOpenAiKey } from '@/service/utils/auth';
 import { withNextCors } from '@/service/utils/tools';
 import { getOpenAIApi } from '@/service/utils/chat/openai';
 import { embeddingModel } from '@/constants/model';
@@ -39,14 +39,10 @@ export async function openaiEmbedding({
   input,
   mustPay = false
 }: { userId: string; mustPay?: boolean } & Props) {
-  const { userOpenAiKey, systemAuthKey } = await getApiKey({
-    model: OpenAiChatEnum.GPT35,
-    userId,
-    mustPay
-  });
+  const apiKey = getSystemOpenAiKey();
 
   // get the chatAPI instance
-  const chatAPI = getOpenAIApi();
+  const chatAPI = getOpenAIApi(apiKey);
 
   // convert the input into vectors
   const result = await chatAPI
@@ -57,16 +53,22 @@
       },
       {
         timeout: 60000,
-        ...axiosConfig(userOpenAiKey || systemAuthKey)
+        ...axiosConfig(apiKey)
       }
     )
-    .then((res) => ({
-      tokenLen: res.data?.usage?.total_tokens || 0,
-      vectors: res.data.data.map((item) => item.embedding)
-    }));
+    .then((res) => {
+      if (!res.data?.usage?.total_tokens) {
+        // @ts-ignore
+        return Promise.reject(res.data?.error?.message || 'Embedding Error');
+      }
+      return {
+        tokenLen: res.data.usage.total_tokens || 0,
+        vectors: res.data.data.map((item) => item.embedding)
+      };
+    });
 
   pushGenerateVectorBill({
-    isPay: !userOpenAiKey,
+    isPay: mustPay,
     userId,
     text: input.join(''),
     tokenLen: result.tokenLen
diff --git a/client/src/pages/api/openapi/v1/chat/completions.ts b/client/src/pages/api/openapi/v1/chat/completions.ts
index 6d63a865d..e756756bf 100644
--- a/client/src/pages/api/openapi/v1/chat/completions.ts
+++ b/client/src/pages/api/openapi/v1/chat/completions.ts
@@ -14,9 +14,9 @@ import { gptMessage2ChatType, textAdaptGptResponse } from '@/utils/adapt';
 import { getChatHistory } from './getHistory';
 import { saveChat } from '@/pages/api/chat/saveChat';
 import { sseResponse } from '@/service/utils/tools';
-import { getErrText } from '@/utils/tools';
 import { type ChatCompletionRequestMessage } from 'openai';
 import { Types } from 'mongoose';
+import { sensitiveCheck } from '../../text/sensitiveCheck';
 
 export type MessageItemType = ChatCompletionRequestMessage & { _id?: string };
 type FastGptWebChatProps = {
@@ -175,6 +175,10 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
       2
     );
 
+    await sensitiveCheck({
+      input: `${prompt.value}`
+    });
+
     // start model api. responseText and totalTokens: valid only if stream = false
    const { streamResponse, responseMessages, responseText, totalTokens } =
       await modelServiceToolMap[model.chat.chatModel].chatCompletion({
@@ -231,8 +235,7 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
           tokens: totalTokens
         };
       } catch (error) {
-        console.log('stream response error', error);
-        return {};
+        return Promise.reject(error);
       }
     } else {
       return {
@@ -301,7 +304,13 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
   } catch (err: any) {
     res.status(500);
     if (step === 1) {
+      // note: bare Error objects serialize to '{}', so the client falls back to its default error text
-      res.end(getErrText(err, 'Stream response error'));
+      sseResponse({
+        res,
+        event: sseResponseEventEnum.error,
+        data: JSON.stringify(err)
+      });
+      res.end();
     } else {
       jsonRes(res, {
         code: 500,
diff --git a/client/src/pages/kb/components/Test.tsx b/client/src/pages/kb/components/Test.tsx
index c7aa2a937..e23becdb7 100644
--- a/client/src/pages/kb/components/Test.tsx
+++ b/client/src/pages/kb/components/Test.tsx
@@ -42,6 +42,12 @@ const Test = () => {
       pushKbTestItem(testItem);
       setInputText('');
       setKbTestItem(testItem);
+    },
+    onError(err) {
+      toast({
+        title: getErrText(err),
+        status: 'error'
+      });
     }
   });
 
diff --git a/client/src/service/utils/auth.ts b/client/src/service/utils/auth.ts
index 82f74c490..029f17e62 100644
--- a/client/src/service/utils/auth.ts
+++ b/client/src/service/utils/auth.ts
@@ -163,7 +163,7 @@
 
 /* randomly get an openai api key */
 export const getSystemOpenAiKey = () => {
-  return process.env.OPENAIKEY || '';
+  return process.env.ONEAPI_KEY || process.env.OPENAIKEY || '';
 };
 
 /* get the key for an api request */
diff --git a/client/src/service/utils/chat/index.ts b/client/src/service/utils/chat/index.ts
index 0daf56465..3bf6eb3ec 100644
--- a/client/src/service/utils/chat/index.ts
+++ b/client/src/service/utils/chat/index.ts
@@ -6,8 +6,8 @@ import { sseResponse } from '../tools';
 import { OpenAiChatEnum } from '@/constants/model';
 import { chatResponse, openAiStreamResponse } from './openai';
 import type { NextApiResponse } from 'next';
-import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser';
 import { textAdaptGptResponse } from '@/utils/adapt';
+import { parseStreamChunk } from '@/utils/adapt';
 
 export type ChatCompletionType = {
   apiKey: string;
@@ -185,65 +185,63 @@ export const V2_StreamResponse = async ({
   model: ChatModelType;
 }) => {
   let responseContent = '';
+  let error: any = null;
+
+  const clientRes = async (data: string) => {
+    const { content = '' } = (() => {
+      try {
+        const json = JSON.parse(data);
+        const content: string = json?.choices?.[0].delta.content || '';
+        // an upstream failure may arrive as a normal JSON payload carrying an error field
+        error = json.error;
+        responseContent += content;
+        return { content };
+      } catch (error) {
+        return {};
+      }
+    })();
+
+    if (res.closed || error) return;
+
+    if (data === '[DONE]') {
+      sseResponse({
+        res,
+        event: sseResponseEventEnum.answer,
+        data: textAdaptGptResponse({
+          text: null,
+          finish_reason: 'stop'
+        })
+      });
+      sseResponse({
+        res,
+        event: sseResponseEventEnum.answer,
+        data: '[DONE]'
+      });
+    } else {
+      sseResponse({
+        res,
+        event: sseResponseEventEnum.answer,
+        data: textAdaptGptResponse({
+          text: content
+        })
+      });
+    }
+  };
 
   try {
-    const onParse = async (e: ParsedEvent | ReconnectInterval) => {
-      if (e.type !== 'event') return;
-
-      const data = e.data;
-
-      const { content = '' } = (() => {
-        try {
-          const json = JSON.parse(data);
-          const content: string = json?.choices?.[0].delta.content || '';
-          responseContent += content;
-          return { 
content }; - } catch (error) {} - return {}; - })(); - - if (res.closed) return; - - if (data === '[DONE]') { - sseResponse({ - res, - event: sseResponseEventEnum.answer, - data: textAdaptGptResponse({ - text: null, - finish_reason: 'stop' - }) - }); - sseResponse({ - res, - event: sseResponseEventEnum.answer, - data: '[DONE]' - }); - } else { - sseResponse({ - res, - event: sseResponseEventEnum.answer, - data: textAdaptGptResponse({ - text: content - }) - }); - } - }; - - try { - const parser = createParser(onParse); - const decoder = new TextDecoder(); - for await (const chunk of chatResponse.data as any) { - if (res.closed) { - break; - } - parser.feed(decoder.decode(chunk, { stream: true })); - } - } catch (error) { - console.log('pipe error', error); + for await (const chunk of chatResponse.data as any) { + if (res.closed) break; + const parse = parseStreamChunk(chunk); + parse.forEach((item) => clientRes(item.data)); } } catch (error) { - console.log('stream error', error); + console.log('pipe error', error); } + + if (error) { + return Promise.reject(error); + } + // count tokens const finishMessages = prompts.concat({ obj: ChatRoleEnum.AI, diff --git a/client/src/service/utils/chat/openai.ts b/client/src/service/utils/chat/openai.ts index 70148d450..a0a389334 100644 --- a/client/src/service/utils/chat/openai.ts +++ b/client/src/service/utils/chat/openai.ts @@ -1,18 +1,20 @@ import { Configuration, OpenAIApi } from 'openai'; -import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser'; import { axiosConfig } from '../tools'; import { ChatModelMap, OpenAiChatEnum } from '@/constants/model'; import { adaptChatItem_openAI } from '@/utils/plugin/openai'; import { modelToolMap } from '@/utils/plugin'; import { ChatCompletionType, ChatContextFilter, StreamResponseType } from './index'; import { ChatRoleEnum } from '@/constants/chat'; +import { parseStreamChunk } from '@/utils/adapt'; -export const getOpenAIApi = () => - new OpenAIApi( +export const getOpenAIApi = (apiKey: string) => { + const openaiBaseUrl = process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1'; + return new OpenAIApi( new Configuration({ - basePath: process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1' + basePath: apiKey === process.env.ONEAPI_KEY ? 
process.env.ONEAPI_URL : openaiBaseUrl
     })
   );
+};
 
 /* model chat */
 export const chatResponse = async ({
@@ -31,7 +33,7 @@ export const chatResponse = async ({
   });
   const adaptMessages = adaptChatItem_openAI({ messages: filterMessages, reserveId: false });
 
-  const chatAPI = getOpenAIApi();
+  const chatAPI = getOpenAIApi(apiKey);
 
   const promptsToken = modelToolMap[model].countTokens({
     messages: filterMessages
@@ -80,29 +82,29 @@ export const openAiStreamResponse = async ({
   try {
     let responseContent = '';
 
-    const onParse = async (event: ParsedEvent | ReconnectInterval) => {
-      if (event.type !== 'event') return;
-      const data = event.data;
-      if (data === '[DONE]') return;
-      try {
-        const json = JSON.parse(data);
-        const content: string = json?.choices?.[0].delta.content || '';
-        responseContent += content;
+    const clientRes = async (data: string) => {
+      const { content = '' } = (() => {
+        try {
+          const json = JSON.parse(data);
+          const content: string = json?.choices?.[0].delta.content || '';
+          responseContent += content;
+          return { content };
+        } catch (error) {
+          return {};
+        }
+      })();
 
-        !res.closed && content && res.write(content);
-      } catch (error) {
-        error;
-      }
+      if (data === '[DONE]') return;
+
+      !res.closed && content && res.write(content);
     };
 
     try {
-      const decoder = new TextDecoder();
-      const parser = createParser(onParse);
       for await (const chunk of chatResponse.data as any) {
-        if (res.closed) {
-          break;
-        }
-        parser.feed(decoder.decode(chunk, { stream: true }));
+        if (res.closed) break;
+
+        const parse = parseStreamChunk(chunk);
+        parse.forEach((item) => clientRes(item.data));
       }
     } catch (error) {
       console.log('pipe error', error);
diff --git a/client/src/service/utils/tools.ts b/client/src/service/utils/tools.ts
index 7664ab488..ec9f5f7bd 100644
--- a/client/src/service/utils/tools.ts
+++ b/client/src/service/utils/tools.ts
@@ -34,14 +34,18 @@ export const clearCookie = (res: NextApiResponse) => {
 };
 
 /* openai axios config */
-export const axiosConfig = (apikey: string) => ({
-  baseURL: process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1',
-  httpsAgent: global.httpsAgent,
-  headers: {
-    Authorization: `Bearer ${apikey}`,
-    auth: process.env.OPENAI_BASE_URL_AUTH || ''
-  }
-});
+export const axiosConfig = (apikey: string) => {
+  const openaiBaseUrl = process.env.OPENAI_BASE_URL || 'https://api.openai.com/v1';
+
+  return {
+    baseURL: apikey === process.env.ONEAPI_KEY ? process.env.ONEAPI_URL : openaiBaseUrl, // only takes effect for raw axios requests, not the openai npm client
+    httpsAgent: global.httpsAgent,
+    headers: {
+      Authorization: `Bearer ${apikey}`,
+      auth: process.env.OPENAI_BASE_URL_AUTH || ''
+    }
+  };
+};
 
 export function withNextCors(handler: NextApiHandler): NextApiHandler {
   return async function nextApiHandlerWrappedWithNextCors(
diff --git a/client/src/utils/adapt.ts b/client/src/utils/adapt.ts
index e17595e0c..d87558df2 100644
--- a/client/src/utils/adapt.ts
+++ b/client/src/utils/adapt.ts
@@ -54,3 +54,28 @@ export const textAdaptGptResponse = ({
     choices: [{ delta: text === null ? {} : { content: text }, index: 0, finish_reason }]
  });
 };
+
+const decoder = new TextDecoder();
+// split a raw SSE chunk into { event, data } pairs. note: this assumes each chunk
+// ends on an event boundary ('\n\n'); an event split across network chunks will fail to parse downstream.
+export const parseStreamChunk = (value: BufferSource) => {
+  const chunk = decoder.decode(value);
+  const chunkLines = chunk.split('\n\n').filter((item) => item);
+  const chunkResponse = chunkLines.map((item) => {
+    const splitEvent = item.split('\n');
+    // a two-line block carries an explicit event name, e.g. "event: answer\ndata: ..."
+    if (splitEvent.length === 2) {
+      return {
+        event: splitEvent[0].replace('event: ', ''),
+        data: splitEvent[1].replace('data: ', '')
+      };
+    }
+    // a single-line block is a bare data event
+    return {
+      event: '',
+      data: splitEvent[0].replace('data: ', '')
+    };
+  });
+
+  return chunkResponse;
+};
diff --git a/docs/deploy/fastgpt/docker-compose.yml b/docs/deploy/fastgpt/docker-compose.yml
index f00204fd0..79444f518 100644
--- a/docs/deploy/fastgpt/docker-compose.yml
+++ b/docs/deploy/fastgpt/docker-compose.yml
@@ -70,7 +70,7 @@ services:
       # openai. one-api is recommended for managing keys
       - OPENAIKEY=sk-xxxxx
       - OPENAI_BASE_URL=https://api.openai.com/v1
-      - OPENAI_BASE_URL_AUTH=optional auth credential
+      - OPENAI_BASE_URL_AUTH=optional auth credential, sent in the "auth" header
   fastgpt-admin:
     image: registry.cn-hangzhou.aliyuncs.com/fastgpt/fastgpt-admin:latest
     container_name: fastgpt-admin
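
Reviewer note (not part of the patch): parseStreamChunk decodes each network chunk in
isolation and splits on '\n\n', so an SSE event that straddles two chunks is lost — the
JSON.parse in the consumers fails and that piece of text is silently dropped. The removed
eventsource-parser buffered partial events internally, which is the behavior this
hand-rolled parser gives up. A possible hardening is sketched below; it is an untested
illustration only, and parseBufferedStreamChunk and its module-level buffer are
hypothetical names, not code from this commit:

const streamDecoder = new TextDecoder();
let buffer = '';

export const parseBufferedStreamChunk = (value: BufferSource) => {
  // { stream: true } also keeps multi-byte utf-8 characters that straddle chunks intact
  buffer += streamDecoder.decode(value, { stream: true });

  const events: { event: string; data: string }[] = [];
  let boundary: number;

  // consume only fully terminated events; keep any trailing partial event in the buffer
  while ((boundary = buffer.indexOf('\n\n')) !== -1) {
    const block = buffer.slice(0, boundary);
    buffer = buffer.slice(boundary + 2);

    const lines = block.split('\n');
    const eventLine = lines.find((line) => line.startsWith('event: '));
    const dataLine = lines.find((line) => line.startsWith('data: '));
    if (dataLine) {
      events.push({
        event: eventLine ? eventLine.replace('event: ', '') : '',
        data: dataLine.replace('data: ', '')
      });
    }
  }

  return events;
};

A module-level buffer is only safe where a single stream is consumed at a time (true for
streamFetch in the browser); the server-side consumers in chat/index.ts and chat/openai.ts
serve concurrent requests, so there the buffer and decoder would have to live per call.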