Add SSE controller; fix share page login failed (#330)

This commit is contained in:
Archer
2023-09-20 16:34:32 +08:00
committed by GitHub
parent 0d94db4331
commit 7e0deb29e0
13 changed files with 234 additions and 138 deletions

View File

@@ -1,6 +1,6 @@
{
"name": "fastgpt",
"version": "4.4.3",
"version": "4.4.4",
"private": false,
"scripts": {
"dev": "next dev",

View File

@@ -148,7 +148,7 @@ const ChatBox = (
onDelMessage
}: {
feedbackType?: `${FeedbackTypeEnum}`;
showMarkIcon?: boolean;
showMarkIcon?: boolean; // admin mark dataset
showVoiceIcon?: boolean;
showEmptyIntro?: boolean;
chatId?: string;
@@ -676,7 +676,11 @@ const ChatBox = (
<>
<Flex w={'100%'} alignItems={'flex-end'}>
<ChatAvatar src={appAvatar} type={'AI'} />
<Flex {...controlContainerStyle} ml={3}>
<Flex
{...controlContainerStyle}
ml={3}
display={index === chatHistory.length - 1 && isChatting ? 'none' : 'flex'}
>
<MyTooltip label={'复制'}>
<MyIcon
{...controlIconStyle}
@@ -984,6 +988,8 @@ const ChatBox = (
}}
/>
)}
{showMarkIcon && (
<>
{/* select one dataset to insert markData */}
<SelectDataset
isOpen={!!adminMarkData && !adminMarkData.kbId}
@@ -991,6 +997,7 @@ const ChatBox = (
// @ts-ignore
onSuccess={(kbId) => setAdminMarkData((state) => ({ ...state, kbId }))}
/>
{/* edit markData modal */}
{adminMarkData && adminMarkData.kbId && (
<InputDataModal
@@ -1048,6 +1055,8 @@ const ChatBox = (
}}
/>
)}
</>
)}
</Flex>
);
};

View File

@@ -88,8 +88,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
export const config = {
api: {
bodyParser: {
sizeLimit: '20mb'
}
responseLimit: '20mb'
}
};

View File

@@ -101,8 +101,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse)
export const config = {
api: {
bodyParser: {
sizeLimit: '10mb'
}
responseLimit: '10mb'
}
};

View File

@@ -7,6 +7,7 @@ import { UpdateFileProps } from '@/api/core/dataset/file.d';
import { Types } from 'mongoose';
import { PgClient } from '@/service/pg';
import { PgDatasetTableName } from '@/constants/plugin';
import { addLog } from '@/service/utils/tools';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
try {
@@ -59,6 +60,7 @@ async function updateDatasetSource(data: { fileId: string; userId: string; name?
]
});
} catch (error) {
addLog.error(`Update dataset source error`, error);
setTimeout(() => {
updateDatasetSource(data);
}, 2000);

View File

@@ -173,8 +173,6 @@ export async function pushDataToKb({
export const config = {
api: {
bodyParser: {
sizeLimit: '12mb'
}
responseLimit: '12mb'
}
};

View File

@@ -477,8 +477,6 @@ export function getSystemVariable({ timezone }: { timezone: string }) {
export const config = {
api: {
bodyParser: {
sizeLimit: '20mb'
}
responseLimit: '20mb'
}
};

View File

@@ -5,7 +5,9 @@ import { authUser } from '@/service/utils/auth';
import { PgDatasetTableName } from '@/constants/plugin';
import { findAllChildrenIds } from '../delete';
import QueryStream from 'pg-query-stream';
import Papa from 'papaparse';
import { PgClient } from '@/service/pg';
import { addLog } from '@/service/utils/tools';
import { responseWriteController } from '@/service/common/stream';
export default async function handler(req: NextApiRequest, res: NextApiResponse<any>) {
try {
@@ -24,7 +26,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
const exportIds = [kbId, ...(await findAllChildrenIds(kbId))];
const thirtyMinutesAgo = new Date(
const limitMinutesAgo = new Date(
Date.now() - (global.feConfigs?.limit?.exportLimitMinutes || 0) * 60 * 1000
);
@@ -34,7 +36,7 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
_id: userId,
$or: [
{ 'limit.exportKbTime': { $exists: false } },
{ 'limit.exportKbTime': { $lte: thirtyMinutesAgo } }
{ 'limit.exportKbTime': { $lte: limitMinutesAgo } }
]
},
'_id limit'
@@ -45,6 +47,19 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
throw new Error(`上次导出未到 ${minutes},每 ${minutes}仅可导出一次。`);
}
const { rows } = await PgClient.query(
`SELECT count(id) FROM ${PgDatasetTableName} where user_id='${userId}' AND kb_id IN (${exportIds
.map((id) => `'${id}'`)
.join(',')})`
);
const total = rows?.[0]?.count || 0;
addLog.info(`export datasets: ${userId}`, { total });
if (total > 100000) {
throw new Error('数据量超出 10 万,无法导出');
}
// connect pg
global.pgClient.connect((err, client, done) => {
if (err) {
@@ -52,6 +67,8 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
res.end('Error connecting to database');
return;
}
console.log('export data');
// create pg select stream
const query = new QueryStream(
`SELECT q, a, source FROM ${PgDatasetTableName} where user_id='${userId}' AND kb_id IN (${exportIds
@@ -65,11 +82,19 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
res.write('index,content,source');
// parse data every row
stream.on('data', (row: { q: string; a: string; source?: string }) => {
const csv = Papa.unparse([row], { header: false });
res.write(`\n${csv}`);
const write = responseWriteController({
res,
readStream: stream
});
// parse data every row
stream.on('data', ({ q, a, source }: { q: string; a: string; source?: string }) => {
if (res.closed) {
return stream.destroy();
}
write(`\n"${q}","${a || ''}","${source || ''}"`);
});
// finish
stream.on('end', async () => {
try {
// update export time
@@ -98,8 +123,6 @@ export default async function handler(req: NextApiRequest, res: NextApiResponse<
export const config = {
api: {
bodyParser: {
sizeLimit: '200mb'
}
responseLimit: '100mb'
}
};

View File

@@ -12,11 +12,13 @@ import MyIcon from '@/components/Icon';
import CloseIcon from '@/components/Icon/close';
import DeleteIcon, { hoverDeleteStyles } from '@/components/Icon/delete';
import MyTooltip from '@/components/MyTooltip';
import { QuestionOutlineIcon } from '@chakra-ui/icons';
import { QuestionOutlineIcon, InfoOutlineIcon } from '@chakra-ui/icons';
import { TrainingModeEnum } from '@/constants/plugin';
import FileSelect, { type FileItemType } from './FileSelect';
import { useRouter } from 'next/router';
import { updateDatasetFile } from '@/api/core/dataset/file';
import { Prompt_AgentQA } from '@/prompts/core/agent';
import { replaceVariable } from '@/utils/common/tools/text';
const fileExtension = '.txt, .doc, .docx, .pdf, .md';
@@ -52,6 +54,12 @@ const QAImport = ({ kbId }: { kbId: string }) => {
content: `该任务无法终止!导入后会自动调用大模型生成问答对,会有一些细节丢失,请确认!如果余额不足,未完成的任务会被暂停。`
});
const previewQAPrompt = useMemo(() => {
return replaceVariable(Prompt_AgentQA.prompt, {
theme: prompt || Prompt_AgentQA.defaultTheme
});
}, [prompt]);
const { mutate: onclickUpload, isLoading: uploading } = useMutation({
mutationFn: async () => {
const chunks = files.map((file) => file.chunks).flat();
@@ -74,7 +82,7 @@ const QAImport = ({ kbId }: { kbId: string }) => {
kbId,
data: chunks.slice(i, i + step),
mode: TrainingModeEnum.qa,
prompt: prompt || '下面是一段长文本'
prompt: previewQAPrompt
});
success += insertLen;
@@ -202,21 +210,19 @@ const QAImport = ({ kbId }: { kbId: string }) => {
<Box py={5}>
<Box mb={2}>
QA {' '}
<MyTooltip
label={`可输入关于文件内容的范围介绍,例如:\n1. Laf 的介绍\n2. xxx的简历\n最终会补全为: 关于{输入的内容}`}
forceShow
>
<QuestionOutlineIcon ml={1} />
<MyTooltip label={previewQAPrompt} forceShow>
<InfoOutlineIcon ml={1} />
</MyTooltip>
</Box>
<Flex alignItems={'center'} fontSize={'sm'}>
<Box mr={2}></Box>
<Box mr={2}></Box>
<Input
fontSize={'sm'}
flex={1}
placeholder={'Laf 云函数的介绍'}
placeholder={Prompt_AgentQA.defaultTheme}
bg={'myWhite.500'}
defaultValue={prompt}
onBlur={(e) => (e.target.value ? setPrompt(`关于"${e.target.value}"`) : '')}
onChange={(e) => setPrompt(e.target.value || '')}
/>
</Flex>
</Box>

View File

@@ -0,0 +1,16 @@
export const Prompt_AgentQA = {
prompt: `我会给你一段文本,{{theme}},学习它们,并整理学习成果,要求为:
1. 提出最多 25 个问题。
2. 给出每个问题的答案。
3. 答案要详细完整,答案可以包含普通文字、链接、代码、表格、公示、媒体链接等 markdown 元素。
4. 按格式返回多个问题和答案:
Q1: 问题。
A1: 答案。
Q2:
A2:
……
我的文本:"""{{text}}"""`,
defaultTheme: '它们可能包含多个主题内容'
};

View File

@@ -0,0 +1,39 @@
import type { NextApiResponse } from 'next';
export function responseWriteController({
res,
readStream
}: {
res: NextApiResponse;
readStream: any;
}) {
res.on('drain', () => {
readStream.resume();
});
return (text: string) => {
const writeResult = res.write(text);
if (!writeResult) {
readStream.pause();
}
};
}
export function responseWrite({
res,
write,
event,
data
}: {
res?: NextApiResponse;
write?: (text: string) => void;
event?: string;
data: string;
}) {
const Write = write || res?.write;
if (!Write) return;
event && Write(`event: ${event}\n`);
Write(`data: ${data}\n\n`);
}

View File

@@ -11,6 +11,8 @@ import { gptMessage2ChatType } from '@/utils/adapt';
import { addLog } from '../utils/tools';
import { splitText2Chunks } from '@/utils/file';
import { countMessagesTokens } from '@/utils/common/tiktoken';
import { replaceVariable } from '@/utils/common/tools/text';
import { Prompt_AgentQA } from '@/prompts/core/agent';
const reduceQueue = () => {
global.qaQueueLen = global.qaQueueLen > 0 ? global.qaQueueLen - 1 : 0;
@@ -62,25 +64,18 @@ export async function generateQA(): Promise<any> {
// 请求 chatgpt 获取回答
const response = await Promise.all(
[data.q].map((text) => {
const modelTokenLimit = global.qaModel.maxToken || 16000;
const messages: ChatCompletionRequestMessage[] = [
{
role: 'system',
content: `我会给你发送一段长文本,${
data.prompt ? `${data.prompt}` : ''
}请学习它,并用 markdown 格式给出 25 个问题和答案,问题可以多样化、自由扩展;答案要详细、解读到位,答案包含普通文本、链接、代码、表格、公示、媒体链接等。按下面 QA 问答格式返回:
Q1:
A1:
Q2:
A2:
……`
},
{
role: 'user',
content: text
content: data.prompt
? replaceVariable(data.prompt, { text })
: replaceVariable(Prompt_AgentQA.prompt, {
theme: Prompt_AgentQA.defaultTheme,
text
})
}
];
const modelTokenLimit = global.qaModel.maxToken || 16000;
const promptsToken = countMessagesTokens({
messages: gptMessage2ChatType(messages)
});
@@ -90,7 +85,7 @@ A2:
.createChatCompletion(
{
model: global.qaModel.model,
temperature: 0.8,
temperature: 0.01,
messages,
stream: false,
max_tokens: maxToken
@@ -197,6 +192,7 @@ A2:
* 检查文本是否按格式返回
*/
function formatSplitText(text: string) {
text = text.replace(/\\n/g, '\n'); // 将换行符替换为空格
const regex = /Q\d+:(\s*)(.*)(\s*)A\d+:(\s*)([\s\S]*?)(?=Q|$)/g; // 匹配Q和A的正则表达式
const matches = text.matchAll(regex); // 获取所有匹配到的结果
@@ -207,8 +203,8 @@ function formatSplitText(text: string) {
if (q && a) {
// 如果Q和A都存在就将其添加到结果中
result.push({
q,
a: a.trim().replace(/\n\s*/g, '\n')
q: `${q}\n${a.trim().replace(/\n\s*/g, '\n')}`,
a: ''
});
}
}

View File

@@ -1,5 +1,4 @@
import type { NextApiResponse } from 'next';
import { sseResponse } from '@/service/utils/tools';
import { ChatContextFilter } from '@/service/common/tiktoken';
import type { ChatItemType, QuoteItemType } from '@/types/chat';
import type { ChatHistoryItemResType } from '@/types/chat';
@@ -21,6 +20,9 @@ import type { AIChatProps } from '@/types/core/aiChat';
import { replaceVariable } from '@/utils/common/tools/text';
import { FlowModuleTypeEnum } from '@/constants/flow';
import { ModuleDispatchProps } from '@/types/core/modules';
import { Readable } from 'stream';
import { responseWrite, responseWriteController } from '@/service/common/stream';
import { addLog } from '@/service/utils/tools';
export type ChatProps = ModuleDispatchProps<
AIChatProps & {
@@ -324,7 +326,7 @@ function targetResponse({
outputs.find((output) => output.key === TaskResponseKeyEnum.answerText)?.targets || [];
if (targets.length === 0) return;
sseResponse({
responseWrite({
res,
event: detail ? sseResponseEventEnum.answer : undefined,
data: textAdaptGptResponse({
@@ -342,42 +344,53 @@ async function streamResponse({
detail: boolean;
response: any;
}) {
return new Promise<{ answer: string }>((resolve, reject) => {
const stream = response.data as Readable;
let answer = '';
let error: any = null;
const parseData = new SSEParseData();
try {
for await (const chunk of response.data as any) {
if (res.closed) break;
const parse = parseStreamChunk(chunk);
const write = responseWriteController({
res,
readStream: stream
});
stream.on('data', (data) => {
if (res.closed) {
stream.destroy();
return resolve({ answer });
}
const parse = parseStreamChunk(data);
parse.forEach((item) => {
const { data } = parseData.parse(item);
if (!data || data === '[DONE]') return;
const content: string = data?.choices?.[0]?.delta?.content || '';
error = data.error;
if (data.error) {
addLog.error(`SSE response`, data.error);
} else {
answer += content;
sseResponse({
res,
responseWrite({
write,
event: detail ? sseResponseEventEnum.answer : undefined,
data: textAdaptGptResponse({
text: content
})
});
}
});
});
stream.on('end', () => {
resolve({ answer });
});
stream.on('close', () => {
resolve({ answer });
});
stream.on('error', (err) => {
reject(err);
});
});
}
} catch (error) {
console.log('pipe error', error);
}
if (error) {
return Promise.reject(error);
}
return {
answer
};
}
function getHistoryPreview(completeMessages: ChatItemType[]) {