Mirror of https://github.com/labring/FastGPT.git (synced 2025-08-03 13:38:00 +00:00)
perf: 响应流抽离 (extract the response stream handling into a shared helper)
@@ -1,5 +1,4 @@
 import type { NextApiRequest, NextApiResponse } from 'next';
-import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser';
 import { connectToDatabase } from '@/service/mongo';
 import { getOpenAIApi, authChat } from '@/service/utils/chat';
 import { httpsAgent, openaiChatFilter } from '@/service/utils/tools';
@@ -10,6 +9,7 @@ import type { ModelSchema } from '@/types/mongoSchema';
 import { PassThrough } from 'stream';
 import { modelList } from '@/constants/model';
 import { pushChatBill } from '@/service/events/pushBill';
+import { gpt35StreamResponse } from '@/service/utils/openai';

 /* send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
@@ -102,52 +102,15 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)

     console.log('api response time:', `${(Date.now() - startTime) / 1000}s`);

-    // create the response stream
-    res.setHeader('Content-Type', 'text/event-stream;charset-utf-8');
-    res.setHeader('Access-Control-Allow-Origin', '*');
-    res.setHeader('X-Accel-Buffering', 'no');
-    res.setHeader('Cache-Control', 'no-cache, no-transform');
     step = 1;

-    let responseContent = '';
-    stream.pipe(res);
-
-    const onParse = async (event: ParsedEvent | ReconnectInterval) => {
-      if (event.type !== 'event') return;
-      const data = event.data;
-      if (data === '[DONE]') return;
-      try {
-        const json = JSON.parse(data);
-        const content: string = json?.choices?.[0].delta.content || '';
-        // skip empty content and a leading newline
-        if (!content || (responseContent === '' && content === '\n')) return;
-
-        responseContent += content;
-        // console.log('content:', content)
-        !stream.destroyed && stream.push(content.replace(/\n/g, '<br/>'));
-      } catch (error) {
-        error;
-      }
-    };
-
-    const decoder = new TextDecoder();
-    try {
-      for await (const chunk of chatResponse.data as any) {
-        if (stream.destroyed) {
-          // the stream was aborted, ignore the rest of the response
-          break;
-        }
-        const parser = createParser(onParse);
-        parser.feed(decoder.decode(chunk));
-      }
-    } catch (error) {
-      console.log('pipe error', error);
-    }
-    // close stream
-    !stream.destroyed && stream.push(null);
-    stream.destroy();
+    const { responseContent } = await gpt35StreamResponse({
+      res,
+      stream,
+      chatResponse
+    });

     const promptsContent = formatPrompts.map((item) => item.content).join('');

     // only bill when the platform's key is used
     pushChatBill({
       isPay: !userApiKey,
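The second handler below gets the same treatment. For orientation, here is a minimal, hypothetical sketch of what a handler looks like after this refactor: it creates the PassThrough stream, requests a streamed chat completion, and hands everything to the extracted helper. The OpenAI client is built inline with the openai v3 SDK rather than the repository's getOpenAIApi wrapper, and the auth, prompt-filtering and billing steps of the real handlers are omitted, so treat this as an illustration of the helper's contract, not the actual FastGPT code.

// Hypothetical minimal handler: auth, prompt filtering and billing are omitted,
// and the OpenAI client is created inline instead of via getOpenAIApi().
import type { NextApiRequest, NextApiResponse } from 'next';
import { PassThrough } from 'stream';
import { Configuration, OpenAIApi } from 'openai';
import { gpt35StreamResponse } from '@/service/utils/openai';

export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  const stream = new PassThrough();
  // stop pushing data if the client goes away
  res.on('close', () => stream.destroy());

  const chatAPI = new OpenAIApi(new Configuration({ apiKey: process.env.OPENAI_API_KEY }));
  const chatResponse = await chatAPI.createChatCompletion(
    {
      model: 'gpt-3.5-turbo',
      stream: true,
      messages: [{ role: 'user', content: 'Hello' }]
    },
    { responseType: 'stream' } // axios option: chatResponse.data becomes a readable stream
  );

  // the extracted helper sets the SSE headers, pipes `stream` into `res`,
  // forwards each delta and resolves with the accumulated text
  const { responseContent } = await gpt35StreamResponse({ res, stream, chatResponse });

  // the full answer is still available afterwards, e.g. for logging or billing
  console.log('response length:', responseContent.length);
}

Because gpt35StreamResponse resolves with the accumulated responseContent, the handlers can keep their existing pushChatBill logic: they still see the complete answer once streaming is done.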
@@ -14,6 +14,7 @@ import { connectRedis } from '@/service/redis';
 import { VecModelDataPrefix } from '@/constants/redis';
 import { vectorToBuffer } from '@/utils/tools';
 import { openaiCreateEmbedding } from '@/service/utils/openai';
+import { gpt35StreamResponse } from '@/service/utils/openai';

 /* send the prompt */
 export default async function handler(req: NextApiRequest, res: NextApiResponse) {
@@ -208,49 +209,12 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)

     console.log('api response time:', `${(Date.now() - startTime) / 1000}s`);

-    // create the response stream
-    res.setHeader('Content-Type', 'text/event-stream;charset-utf-8');
-    res.setHeader('Access-Control-Allow-Origin', '*');
-    res.setHeader('X-Accel-Buffering', 'no');
-    res.setHeader('Cache-Control', 'no-cache, no-transform');
     step = 1;
-
-    let responseContent = '';
-    stream.pipe(res);
-
-    const onParse = async (event: ParsedEvent | ReconnectInterval) => {
-      if (event.type !== 'event') return;
-      const data = event.data;
-      if (data === '[DONE]') return;
-      try {
-        const json = JSON.parse(data);
-        const content: string = json?.choices?.[0].delta.content || '';
-        if (!content || (responseContent === '' && content === '\n')) return;
-
-        responseContent += content;
-        // console.log('content:', content)
-        !stream.destroyed && stream.push(content.replace(/\n/g, '<br/>'));
-      } catch (error) {
-        error;
-      }
-    };
-
-    const decoder = new TextDecoder();
-    try {
-      for await (const chunk of chatResponse.data as any) {
-        if (stream.destroyed) {
-          // the stream was aborted, ignore the rest of the response
-          break;
-        }
-        const parser = createParser(onParse);
-        parser.feed(decoder.decode(chunk));
-      }
-    } catch (error) {
-      console.log('pipe error', error);
-    }
-    // close stream
-    !stream.destroyed && stream.push(null);
-    stream.destroy();
+    const { responseContent } = await gpt35StreamResponse({
+      res,
+      stream,
+      chatResponse
+    });

     const promptsContent = formatPrompts.map((item) => item.content).join('');
     // only bill when the platform's key is used
@@ -1,3 +1,6 @@
+import type { NextApiResponse } from 'next';
+import type { PassThrough } from 'stream';
+import { createParser, ParsedEvent, ReconnectInterval } from 'eventsource-parser';
 import { getOpenAIApi } from '@/service/utils/chat';
 import { httpsAgent } from './tools';
 import { User } from '../models/user';
@@ -102,3 +105,66 @@ export const openaiCreateEmbedding = async ({
     chatAPI
   };
 };
+
+/* gpt35 response */
+export const gpt35StreamResponse = ({
+  res,
+  stream,
+  chatResponse
+}: {
+  res: NextApiResponse;
+  stream: PassThrough;
+  chatResponse: any;
+}) =>
+  new Promise<{ responseContent: string }>(async (resolve, reject) => {
+    try {
+      // create the response stream
+      res.setHeader('Content-Type', 'text/event-stream;charset-utf-8');
+      res.setHeader('Access-Control-Allow-Origin', '*');
+      res.setHeader('X-Accel-Buffering', 'no');
+      res.setHeader('Cache-Control', 'no-cache, no-transform');
+
+      let responseContent = '';
+      stream.pipe(res);
+
+      const onParse = async (event: ParsedEvent | ReconnectInterval) => {
+        if (event.type !== 'event') return;
+        const data = event.data;
+        if (data === '[DONE]') return;
+        try {
+          const json = JSON.parse(data);
+          const content: string = json?.choices?.[0].delta.content || '';
+          // console.log('content:', content);
+          if (!content || (responseContent === '' && content === '\n')) return;
+
+          responseContent += content;
+          !stream.destroyed && stream.push(content.replace(/\n/g, '<br/>'));
+        } catch (error) {
+          error;
+        }
+      };
+
+      const decoder = new TextDecoder();
+      try {
+        for await (const chunk of chatResponse.data as any) {
+          if (stream.destroyed) {
+            // the stream was aborted, ignore the rest of the response
+            break;
+          }
+          const parser = createParser(onParse);
+          parser.feed(decoder.decode(chunk));
+        }
+      } catch (error) {
+        console.log('pipe error', error);
+      }
+      // close stream
+      !stream.destroyed && stream.push(null);
+      stream.destroy();
+
+      resolve({
+        responseContent
+      });
+    } catch (error) {
+      reject(error);
+    }
+  });
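The helper pushes each delta onto the PassThrough with newlines rewritten as `<br/>`. The frontend that consumes this stream is not part of this commit; the following is only a sketch, assuming a browser client that POSTs to one of these handlers (the endpoint path and request body are placeholders) and converts the `<br/>` markers back to newlines.

// Hypothetical browser-side consumer; the endpoint path and request body are
// placeholders, only the '<br/>' -> newline convention comes from the diff above.
async function readChatStream(prompt: string, onDelta: (text: string) => void) {
  const response = await fetch('/api/chat/chatGpt', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt })
  });
  if (!response.body) throw new Error('no response body');

  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    if (value) {
      // the server replaced '\n' with '<br/>' before pushing; undo that here
      onDelta(decoder.decode(value).replace(/<br\/>/g, '\n'));
    }
  }
}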