perf: completion dispatch

This commit is contained in:
archer
2023-07-23 14:07:59 +08:00
parent 8151350d9f
commit 6027a966d2
33 changed files with 1797 additions and 2181 deletions

View File

@@ -2,21 +2,29 @@ import type { NextApiRequest, NextApiResponse } from 'next';
import { connectToDatabase } from '@/service/mongo';
import { authUser, authApp, authShareChat } from '@/service/utils/auth';
import { sseErrRes, jsonRes } from '@/service/response';
import { ChatRoleEnum, sseResponseEventEnum } from '@/constants/chat';
import { withNextCors } from '@/service/utils/tools';
import { ChatRoleEnum, sseResponseEventEnum } from '@/constants/chat';
import {
dispatchHistory,
dispatchChatInput,
dispatchChatCompletion,
dispatchKBSearch,
dispatchAnswer,
dispatchClassifyQuestion
} from '@/service/moduleDispatch';
import type { CreateChatCompletionRequest } from 'openai';
import { gptMessage2ChatType, textAdaptGptResponse } from '@/utils/adapt';
import { gptMessage2ChatType } from '@/utils/adapt';
import { getChatHistory } from './getHistory';
import { saveChat } from '@/pages/api/chat/saveChat';
import { sseResponse } from '@/service/utils/tools';
import { type ChatCompletionRequestMessage } from 'openai';
import { TaskResponseKeyEnum, AppModuleItemTypeEnum } from '@/constants/app';
import { TaskResponseKeyEnum } from '@/constants/chat';
import { FlowModuleTypeEnum, initModuleType } from '@/constants/flow';
import { Types } from 'mongoose';
import { moduleFetch } from '@/service/api/request';
import { AppModuleItemType, RunningModuleItemType } from '@/types/app';
import { FlowInputItemTypeEnum } from '@/constants/flow';
import { finishTaskBill, createTaskBill, delTaskBill } from '@/service/events/pushBill';
import { pushTaskBill } from '@/service/events/pushBill';
import { BillSourceEnum } from '@/constants/user';
import { ChatHistoryItemResType } from '@/types/chat';
export type MessageItemType = ChatCompletionRequestMessage & { _id?: string };
type FastGptWebChatProps = {
@@ -49,8 +57,6 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
let { chatId, appId, shareId, stream = false, messages = [], variables = {} } = req.body as Props;
let billId = '';
try {
if (!messages) {
throw new Error('Prams Error');
@@ -105,13 +111,6 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
res.setHeader('newChatId', String(newChatId));
}
billId = await createTaskBill({
userId,
appName: app.name,
appId,
source: authType === 'apikey' ? BillSourceEnum.api : BillSourceEnum.fastgpt
});
/* start process */
const { responseData, answerText } = await dispatchModules({
res,
@@ -121,9 +120,9 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
history: prompts,
userChatInput: prompt.value
},
stream,
billId
stream
});
console.log(responseData, '===', answerText);
if (!answerText) {
throw new Error('回复内容为空,可能模块编排出现问题');
@@ -169,10 +168,7 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
res.end();
} else {
res.json({
data: {
newChatId,
...responseData
},
responseData,
id: chatId || '',
model: '',
usage: { prompt_tokens: 0, completion_tokens: 0, total_tokens: 0 },
@@ -186,14 +182,14 @@ export default withNextCors(async function handler(req: NextApiRequest, res: Nex
});
}
// bill
finishTaskBill({
billId,
shareId
pushTaskBill({
appName: app.name,
appId,
userId,
source: authType === 'apikey' ? BillSourceEnum.api : BillSourceEnum.fastgpt,
response: responseData
});
} catch (err: any) {
delTaskBill(billId);
if (stream) {
sseErrRes(res, err);
res.end();
@@ -211,35 +207,29 @@ export async function dispatchModules({
modules,
params = {},
variables = {},
stream = false,
billId
stream = false
}: {
res: NextApiResponse;
modules: AppModuleItemType[];
params?: Record<string, any>;
variables?: Record<string, any>;
billId: string;
stream?: boolean;
}) {
const runningModules = loadModules(modules, variables);
// let storeData: Record<string, any> = {}; // after module used
let chatResponse: Record<string, any> = {}; // response request and save to database
let answerText = ''; // AI answer
let chatResponse: ChatHistoryItemResType[] = []; // response request and save to database
let chatAnswerText = ''; // AI answer
function pushStore({
answer,
responseData = {}
answerText = '',
responseData
}: {
answer?: string;
responseData?: Record<string, any>;
answerText?: string;
responseData?: ChatHistoryItemResType;
}) {
chatResponse = {
...chatResponse,
...responseData
};
answerText += answer;
responseData && chatResponse.push(responseData);
chatAnswerText += answerText;
}
function moduleInput(
module: RunningModuleItemType,
@@ -292,63 +282,45 @@ export async function dispatchModules({
}
async function moduleRun(module: RunningModuleItemType): Promise<any> {
if (res.closed) return Promise.resolve();
console.log('run=========', module.type, module.url);
console.log('run=========', module.flowType);
// direct answer
if (module.type === AppModuleItemTypeEnum.answer) {
const text =
module.inputs.find((item) => item.key === TaskResponseKeyEnum.answerText)?.value || '';
pushStore({
answer: text
});
return StreamAnswer({
res,
stream,
text: text
});
}
// get fetch params
const params: Record<string, any> = {};
module.inputs.forEach((item: any) => {
params[item.key] = item.value;
});
const props: Record<string, any> = {
res,
stream,
...params
};
if (module.type === AppModuleItemTypeEnum.switch) {
return moduleOutput(module, switchResponse(module));
}
if (
(module.type === AppModuleItemTypeEnum.http ||
module.type === AppModuleItemTypeEnum.initInput) &&
module.url
) {
// get fetch params
const params: Record<string, any> = {};
module.inputs.forEach((item: any) => {
params[item.key] = item.value;
});
const data = {
stream,
billId,
...params
const dispatchRes = await (async () => {
const callbackMap: Record<string, Function> = {
[FlowModuleTypeEnum.historyNode]: dispatchHistory,
[FlowModuleTypeEnum.questionInput]: dispatchChatInput,
[FlowModuleTypeEnum.answerNode]: dispatchAnswer,
[FlowModuleTypeEnum.chatNode]: dispatchChatCompletion,
[FlowModuleTypeEnum.kbSearchNode]: dispatchKBSearch,
[FlowModuleTypeEnum.classifyQuestion]: dispatchClassifyQuestion
};
if (callbackMap[module.flowType]) {
return callbackMap[module.flowType](props);
}
return {};
})();
// response data
const fetchRes = await moduleFetch({
res,
url: module.url,
data
});
return moduleOutput(module, fetchRes);
}
return moduleOutput(module, dispatchRes);
}
// start process width initInput
const initModules = runningModules.filter(
(item) => item.type === AppModuleItemTypeEnum.initInput
);
const initModules = runningModules.filter((item) => initModuleType[item.flowType]);
await Promise.all(initModules.map((module) => moduleInput(module, params)));
return {
responseData: chatResponse,
answerText
[TaskResponseKeyEnum.answerText]: chatAnswerText,
[TaskResponseKeyEnum.responseData]: chatResponse
};
}
@@ -359,10 +331,9 @@ function loadModules(
return modules.map((module) => {
return {
moduleId: module.moduleId,
type: module.type,
url: module.url,
flowType: module.flowType,
inputs: module.inputs
.filter((item) => item.type !== FlowInputItemTypeEnum.target || item.connected) // filter unconnected target input
.filter((item) => item.connected) // filter unconnected target input
.map((item) => {
if (typeof item.value !== 'string') {
return {
@@ -385,38 +356,9 @@ function loadModules(
outputs: module.outputs.map((item) => ({
key: item.key,
answer: item.key === TaskResponseKeyEnum.answerText,
response: item.response,
value: undefined,
targets: item.targets
}))
};
});
}
function StreamAnswer({
res,
stream = false,
text = ''
}: {
res: NextApiResponse;
stream?: boolean;
text?: string;
}) {
if (stream && text) {
return sseResponse({
res,
event: sseResponseEventEnum.answer,
data: textAdaptGptResponse({
text: text.replace(/\\n/g, '\n')
})
});
}
return text;
}
function switchResponse(module: RunningModuleItemType) {
const val = module?.inputs?.[0]?.value;
if (val) {
return { true: 1 };
}
return { false: 1 };
}