import type { NextApiRequest, NextApiResponse } from 'next';
import { connectToDatabase } from '@/service/mongo';
import { authUser, authApp } from '@/service/utils/auth';
import { sseErrRes, jsonRes } from '@/service/response';
import { addLog, withNextCors } from '@/service/utils/tools';
import { ChatRoleEnum, ChatSourceEnum, sseResponseEventEnum } from '@/constants/chat';
import {
  dispatchHistory,
  dispatchChatInput,
  dispatchChatCompletion,
  dispatchKBSearch,
  dispatchAnswer,
  dispatchClassifyQuestion,
  dispatchContentExtract,
  dispatchHttpRequest
} from '@/service/moduleDispatch';
import type { CreateChatCompletionRequest } from 'openai';
import { gptMessage2ChatType, textAdaptGptResponse } from '@/utils/adapt';
import { getChatHistory } from './getHistory';
import { saveChat } from '@/service/utils/chat/saveChat';
import { sseResponse } from '@/service/utils/tools';
import { type ChatCompletionRequestMessage } from 'openai';
import { TaskResponseKeyEnum } from '@/constants/chat';
import { FlowModuleTypeEnum, initModuleType } from '@/constants/flow';
import { AppModuleItemType, RunningModuleItemType } from '@/types/app';
import { pushTaskBill } from '@/service/common/bill/push';
import { BillSourceEnum } from '@/constants/user';
import { ChatHistoryItemResType } from '@/types/chat';
import { UserModelSchema } from '@/types/mongoSchema';
import { SystemInputEnum } from '@/constants/app';
import { getSystemTime } from '@/utils/user';
import { authOutLinkChat } from '@/service/support/outLink/auth';
import requestIp from 'request-ip';
import { replaceVariable } from '@/utils/common/tools/text';
import { ModuleDispatchProps } from '@/types/core/modules';

/** An OpenAI chat message, optionally tagged with a client-side data id. */
export type MessageItemType = ChatCompletionRequestMessage & { dataId?: string };

type FastGptWebChatProps = {
  chatId?: string; // undefined: nonuse history, '': new chat, 'xxxxx': use history
  appId?: string;
};

type FastGptShareChatProps = {
  shareId?: string;
};

/** Request body accepted by the completions handler below. */
export type Props = CreateChatCompletionRequest &
  FastGptWebChatProps &
  FastGptShareChatProps & {
    messages: MessageItemType[];
    stream?: boolean;
    detail?: boolean;
    variables: Record<string, any>;
  };

export type ChatResponseType = {
  newChatId: string;
  quoteLen?: number;
};

/**
 * Chat completions API route.
 *
 * Steps, in order:
 *  1. validate `messages` from the request body;
 *  2. authenticate (share link via `authOutLinkChat` when `shareId` is given,
 *     otherwise `authUser`);
 *  3. load the app and the chat history in parallel;
 *  4. run the app's modules through `dispatchModules`;
 *  5. save the exchange when `chatId` is truthy;
 *  6. reply as an SSE stream (`stream === true`) or as a single JSON body;
 *  7. push the bill record.
 *
 * Any error thrown inside the `try` is reported through `sseErrRes`
 * (stream mode) or `jsonRes` with code 500 (JSON mode).
 */
export default withNextCors(async function handler(req: NextApiRequest, res: NextApiResponse) {
  res.on('close', () => {
    res.end();
  });
  res.on('error', () => {
    console.log('error: ', 'request error');
    res.end();
  });

  let {
    chatId,
    appId,
    shareId,
    stream = false,
    detail = false,
    messages = [],
    variables = {}
  } = req.body as Props;

  try {
    // The destructuring default only covers `undefined`; this guard still catches e.g. `null`.
    if (!messages) {
      throw new Error('Prams Error');
    }
    if (!Array.isArray(messages)) {
      throw new Error('messages is not array');
    }
    if (messages.length === 0) {
      throw new Error('messages is empty');
    }

    await connectToDatabase();
    const startTime = Date.now();

    /* user auth */
    let {
      // @ts-ignore
      responseDetail,
      user,
      userId,
      appId: authAppid,
      authType
    } = await (shareId
      ? authOutLinkChat({
          shareId,
          ip: requestIp.getClientIp(req)
        })
      : authUser({ req, authBalance: true }));

    if (!user) {
      throw new Error('Account is error');
    }

    // An appId from the request body wins; otherwise fall back to the one returned by auth.
    appId = appId ? appId : authAppid;
    if (!appId) {
      throw new Error('appId is empty');
    }

    // auth app, get history
    const [{ app }, { history }] = await Promise.all([
      authApp({
        appId,
        userId
      }),
      getChatHistory({ chatId, appId, userId })
    ]);

    const isOwner = !shareId && userId === String(app.userId);
    // The app owner always gets the detailed response data.
    responseDetail = isOwner || responseDetail;

    const prompts = history.concat(gptMessage2ChatType(messages));
    // Drop a trailing AI message so that the last remaining item is popped as the question.
    if (prompts[prompts.length - 1]?.obj === 'AI') {
      prompts.pop();
    }
    // user question
    const prompt = prompts.pop();
    if (!prompt) {
      throw new Error('Question is empty');
    }

    // create the response stream
    if (stream) {
      res.setHeader('Content-Type', 'text/event-stream;charset=utf-8');
      res.setHeader('Access-Control-Allow-Origin', '*');
      res.setHeader('X-Accel-Buffering', 'no');
      res.setHeader('Cache-Control', 'no-cache, no-transform');
    }

    /* start process */
    const { responseData, answerText } = await dispatchModules({
      res,
      modules: app.modules,
      user,
      variables,
      params: {
        history: prompts,
        userChatInput: prompt.value
      },
      stream,
      detail
    });

    // save chat
    if (chatId) {
      await saveChat({
        chatId,
        appId,
        userId,
        variables,
        isOwner, // owner update use time
        shareId,
        source: (() => {
          if (shareId) {
            return ChatSourceEnum.share;
          }
          if (authType === 'apikey') {
            return ChatSourceEnum.api;
          }
          return ChatSourceEnum.online;
        })(),
        content: [
          prompt,
          {
            dataId: messages[messages.length - 1].dataId,
            obj: ChatRoleEnum.AI,
            value: answerText,
            responseData
          }
        ]
      });
    }

    addLog.info(`completions running time: ${(Date.now() - startTime) / 1000}s`);

    if (stream) {
      // Closing events: a "stop" chunk, then the "[DONE]" marker.
      sseResponse({
        res,
        event: detail ? sseResponseEventEnum.answer : undefined,
        data: textAdaptGptResponse({
          text: null,
          finish_reason: 'stop'
        })
      });
      sseResponse({
        res,
        event: detail ? sseResponseEventEnum.answer : undefined,
        data: '[DONE]'
      });

      // Per-module response data is only sent when the caller may see it and asked for detail.
      if (responseDetail && detail) {
        sseResponse({
          res,
          event: sseResponseEventEnum.appStreamResponse,
          data: JSON.stringify(responseData)
        });
      }

      res.end();
    } else {
      res.json({
        ...(detail ? { responseData } : {}),
        id: chatId || '',
        model: '',
        // Token usage values are fixed placeholders here, not measured counts.
        usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 1 },
        choices: [
          {
            message: { role: 'assistant', content: answerText },
            finish_reason: 'stop',
            index: 0
          }
        ]
      });
    }

    // Called after the response has been written; its result is not awaited.
    pushTaskBill({
      appName: app.name,
      appId,
      userId,
      source: (() => {
        if (authType === 'apikey') return BillSourceEnum.api;
        if (shareId) return BillSourceEnum.shareLink;
        return BillSourceEnum.fastgpt;
      })(),
      response: responseData,
      shareId
    });
  } catch (err: any) {
    if (stream) {
      sseErrRes(res, err);
      res.end();
    } else {
      jsonRes(res, {
        code: 500,
        error: err
      });
    }
  }
});

/* running */
/**
 * Run an app's module graph.
 *
 * The modules whose `flowType` is listed in `initModuleType` are fed `params`
 * as inputs. A module runs once none of its inputs is `undefined`; its outputs
 * are then forwarded to the inputs of the target modules, which may in turn run.
 *
 * @param res       response object, handed to every module dispatcher
 * @param modules   stored module definitions of the app
 * @param user      current user; `timezone` and `openaiAccount` are read
 * @param params    initial input values for the entry modules
 * @param variables caller variables; merged over the system variables
 * @param stream    forwarded to the module dispatchers
 * @param detail    forwarded to the module dispatchers; together with `stream`
 *                  it enables module status events
 * @returns an object keyed by `TaskResponseKeyEnum.answerText` (concatenated
 *          answer text) and `TaskResponseKeyEnum.responseData` (collected
 *          per-module response items)
 */
export async function dispatchModules({
  res,
  modules,
  user,
  params = {},
  variables = {},
  stream = false,
  detail = false
}: {
  res: NextApiResponse;
  modules: AppModuleItemType[];
  user: UserModelSchema;
  params?: Record<string, any>;
  variables?: Record<string, any>;
  stream?: boolean;
  detail?: boolean;
}) {
  // Caller-supplied variables override the system ones on key collision.
  variables = {
    ...getSystemVariable({ timezone: user.timezone }),
    ...variables
  };
  const runningModules = loadModules(modules, variables);

  const chatResponse: ChatHistoryItemResType[] = []; // response request and save to database
  let chatAnswerText = ''; // AI answer
  let runningTime = Date.now();

  /* Record one module result: append its response data (stamped with the seconds
     elapsed since the previous record) and accumulate its answer text. */
  function pushStore({
    answerText = '',
    responseData
  }: {
    answerText?: string;
    responseData?: ChatHistoryItemResType;
  }) {
    const time = Date.now();
    responseData &&
      chatResponse.push({
        ...responseData,
        runningTime: +((time - runningTime) / 1000).toFixed(2)
      });
    runningTime = time;
    chatAnswerText += answerText;
  }

  /* Write `data` into the module's inputs and run the module as soon as every
     input has a value. */
  function moduleInput(module: RunningModuleItemType, data: Record<string, any> = {}): Promise<any> {
    const checkInputFinish = () => {
      return !module.inputs.find((item: any) => item.value === undefined);
    };
    const updateInputValue = (key: string, value: any) => {
      const index = module.inputs.findIndex((item: any) => item.key === key);
      if (index === -1) return;
      module.inputs[index].value = value;
    };

    // Local to this call: it only prevents running the module twice within one call.
    const set = new Set();

    return Promise.all(
      Object.entries(data).map(([key, val]: any) => {
        updateInputValue(key, val);

        if (!set.has(module.moduleId) && checkInputFinish()) {
          set.add(module.moduleId);
          // remove switch
          updateInputValue(SystemInputEnum.switch, undefined);
          return moduleRun(module);
        }
      })
    );
  }

  /* Store a module's result and forward every defined output value to the
     inputs of its target modules. */
  function moduleOutput(
    module: RunningModuleItemType,
    result: Record<string, any> = {}
  ): Promise<any> {
    pushStore(result);
    return Promise.all(
      module.outputs.map((outputItem) => {
        if (result[outputItem.key] === undefined) return;
        /* update output value */
        outputItem.value = result[outputItem.key];

        /* update target */
        return Promise.all(
          outputItem.targets.map((target: any) => {
            // find module
            const targetModule = runningModules.find((item) => item.moduleId === target.moduleId);
            if (!targetModule) return;

            return moduleInput(targetModule, { [target.key]: outputItem.value });
          })
        );
      })
    );
  }

  /* Execute one module through the dispatcher registered for its flow type,
     then propagate its result. Does nothing once the response is closed. */
  async function moduleRun(module: RunningModuleItemType): Promise<any> {
    if (res.closed) return Promise.resolve();

    if (stream && detail && module.showStatus) {
      responseStatus({
        res,
        name: module.name,
        status: 'running'
      });
    }

    // get fetch params
    const params: Record<string, any> = {};
    module.inputs.forEach((item: any) => {
      params[item.key] = item.value;
    });
    const props: ModuleDispatchProps<Record<string, any>> = {
      res,
      stream,
      detail,
      variables,
      moduleName: module.name,
      outputs: module.outputs,
      userOpenaiAccount: user?.openaiAccount,
      inputs: params
    };

    const dispatchRes = await (async () => {
      const callbackMap: Record<string, Function> = {
        [FlowModuleTypeEnum.historyNode]: dispatchHistory,
        [FlowModuleTypeEnum.questionInput]: dispatchChatInput,
        [FlowModuleTypeEnum.answerNode]: dispatchAnswer,
        [FlowModuleTypeEnum.chatNode]: dispatchChatCompletion,
        [FlowModuleTypeEnum.kbSearchNode]: dispatchKBSearch,
        [FlowModuleTypeEnum.classifyQuestion]: dispatchClassifyQuestion,
        [FlowModuleTypeEnum.contentExtract]: dispatchContentExtract,
        [FlowModuleTypeEnum.httpRequest]: dispatchHttpRequest
      };
      if (callbackMap[module.flowType]) {
        return callbackMap[module.flowType](props);
      }
      // Flow types without a dispatcher yield an empty result.
      return {};
    })();

    return moduleOutput(module, dispatchRes);
  }

  // start process with initInput
  const initModules = runningModules.filter((item) => initModuleType[item.flowType]);

  await Promise.all(initModules.map((module) => moduleInput(module, params)));

  return {
    [TaskResponseKeyEnum.answerText]: chatAnswerText,
    [TaskResponseKeyEnum.responseData]: chatResponse
  };
}

/* init store modules to running modules */
/**
 * Convert stored module definitions into their runtime form.
 *
 * Only inputs flagged `connected` are kept. String input values go through
 * `replaceVariable` with `variables`; other values are copied unchanged.
 * Every output starts with `value: undefined`, and `answer` is true when the
 * output key equals `TaskResponseKeyEnum.answerText`.
 */
function loadModules(
  modules: AppModuleItemType[],
  variables: Record<string, any>
): RunningModuleItemType[] {
  return modules.map((module) => {
    return {
      moduleId: module.moduleId,
      name: module.name,
      flowType: module.flowType,
      showStatus: module.showStatus,
      inputs: module.inputs
        .filter((item) => item.connected) // filter unconnected target input
        .map((item) => {
          if (typeof item.value !== 'string') {
            return {
              key: item.key,
              value: item.value
            };
          }

          // variables replace
          const replacedVal = replaceVariable(item.value, variables);

          return {
            key: item.key,
            value: replacedVal
          };
        }),
      outputs: module.outputs.map((item) => ({
        key: item.key,
        answer: item.key === TaskResponseKeyEnum.answerText,
        value: undefined,
        targets: item.targets
      }))
    };
  });
}

/* sse response modules status */
/**
 * Send a `moduleStatus` SSE event for the named module. Does nothing when
 * `name` is empty.
 *
 * NOTE(review): `status` is accepted but the payload always carries
 * `status: 'running'` — confirm whether 'finish' is meant to be forwarded.
 */
export function responseStatus({
  res,
  status,
  name
}: {
  res: NextApiResponse;
  status?: 'running' | 'finish';
  name?: string;
}) {
  if (!name) return;
  sseResponse({
    res,
    event: sseResponseEventEnum.moduleStatus,
    data: JSON.stringify({
      status: 'running',
      name
    })
  });
}

/* get system variable */
/** Build the system variables: `cTime` is the result of `getSystemTime(timezone)`. */
export function getSystemVariable({ timezone }: { timezone: string }) {
  return {
    cTime: getSystemTime(timezone)
  };
}

/* Next.js API route config: sets the response size limit to 20mb. */
export const config = {
  api: {
    responseLimit: '20mb'
  }
};