Perf: read file worker (#1337)

* perf: read file worker

* fix: HTTP node URL input

* fix: html2md

* fix: html2md

* fix: ts

* perf: problem classification matches keys before values

* feat: tool response answer
Archer
2024-04-30 18:12:20 +08:00
committed by GitHub
parent 1529c1e991
commit b5f0ac3e1d
35 changed files with 413 additions and 398 deletions

View File

@@ -64,5 +64,14 @@ export const ToolModule: FlowNodeTemplateType = {
Input_Template_History,
Input_Template_UserChatInput
],
outputs: []
outputs: [
{
id: NodeOutputKeyEnum.answerText,
key: NodeOutputKeyEnum.answerText,
label: 'core.module.output.label.Ai response content',
description: 'core.module.output.description.Ai response content',
valueType: WorkflowIOValueTypeEnum.string,
type: FlowNodeOutputTypeEnum.static
}
]
};

View File

@@ -6,7 +6,6 @@ import { DatasetFileSchema } from '@fastgpt/global/core/dataset/type';
import { MongoFileSchema } from './schema';
import { detectFileEncoding } from '@fastgpt/global/common/file/tools';
import { CommonErrEnum } from '@fastgpt/global/common/error/code/common';
import { ReadFileByBufferParams } from '../read/type';
import { MongoRwaTextBuffer } from '../../buffer/rawText/schema';
import { readFileRawContent } from '../read/utils';
import { PassThrough } from 'stream';
@@ -197,19 +196,15 @@ export const readFileContentFromMongo = async ({
});
})();
const params: ReadFileByBufferParams = {
const { rawText } = await readFileRawContent({
extension,
csvFormat,
teamId,
buffer: fileBuffers,
encoding,
metadata: {
relatedId: fileId
}
};
const { rawText } = await readFileRawContent({
extension,
csvFormat,
params
});
if (rawText.trim()) {
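
The intermediate `params` object is gone: `readFileContentFromMongo` now hands the buffer and its metadata straight to `readFileRawContent`. A minimal sketch of the flattened call shape, with a stub standing in for the real service function (argument names follow the diff; the stub body and sample values are illustrative only):

```ts
// Stub mirroring the flattened signature; the real implementation
// dispatches to a worker thread (see the worker/file diff below).
type ReadFileRawContentArgs = {
  extension: string;
  csvFormat?: boolean;
  teamId: string;
  buffer: Buffer;
  encoding: string;
  metadata?: Record<string, any>;
};

async function readFileRawContent({
  buffer,
  encoding
}: ReadFileRawContentArgs): Promise<{ rawText: string }> {
  return { rawText: buffer.toString(encoding as BufferEncoding) };
}

(async () => {
  // Call site as it looks after this change (sample values):
  const { rawText } = await readFileRawContent({
    extension: 'txt',
    csvFormat: false,
    teamId: 'team-id',
    buffer: Buffer.from('hello world'),
    encoding: 'utf-8',
    metadata: { relatedId: 'file-id' }
  });
  console.log(rawText.trim());
})();
```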

View File

@@ -1,23 +0,0 @@
import { ReadFileByBufferParams, ReadFileResponse } from './type.d';
import { initMarkdownText } from './utils';
import { htmlToMarkdown } from '../../string/markdown';
import { readFileRawText } from './rawText';
export const readHtmlRawText = async (
params: ReadFileByBufferParams
): Promise<ReadFileResponse> => {
const { teamId, metadata } = params;
const { rawText: html } = readFileRawText(params);
const md = await htmlToMarkdown(html);
const rawText = await initMarkdownText({
teamId,
md,
metadata
});
return {
rawText
};
};

View File

@@ -1,18 +0,0 @@
import { ReadFileByBufferParams, ReadFileResponse } from './type.d';
import { initMarkdownText } from './utils';
import { readFileRawText } from './rawText';
export const readMarkdown = async (params: ReadFileByBufferParams): Promise<ReadFileResponse> => {
const { teamId, metadata } = params;
const { rawText: md } = readFileRawText(params);
const rawText = await initMarkdownText({
teamId,
md,
metadata
});
return {
rawText
};
};

View File

@@ -1,12 +0,0 @@
export type ReadFileByBufferParams = {
teamId: string;
buffer: Buffer;
encoding: string;
metadata?: Record<string, any>;
};
export type ReadFileResponse = {
rawText: string;
formatText?: string;
metadata?: Record<string, any>;
};

View File

@@ -1,16 +1,10 @@
import { markdownProcess } from '@fastgpt/global/common/string/markdown';
import { markdownProcess, simpleMarkdownText } from '@fastgpt/global/common/string/markdown';
import { uploadMongoImg } from '../image/controller';
import { MongoImageTypeEnum } from '@fastgpt/global/common/file/image/constants';
import { addHours } from 'date-fns';
import { ReadFileByBufferParams } from './type';
import { readFileRawText } from '../read/rawText';
import { readMarkdown } from '../read/markdown';
import { readHtmlRawText } from '../read/html';
import { readPdfFile } from '../read/pdf';
import { readWordFile } from '../read/word';
import { readCsvRawText } from '../read/csv';
import { readPptxRawText } from '../read/pptx';
import { readXlsxRawText } from '../read/xlsx';
import { WorkerNameEnum, runWorker } from '../../../worker/utils';
import { ReadFileResponse } from '../../../worker/file/type';
export const initMarkdownText = ({
teamId,
@@ -36,46 +30,39 @@ export const initMarkdownText = ({
export const readFileRawContent = async ({
extension,
csvFormat,
params
teamId,
buffer,
encoding,
metadata
}: {
csvFormat?: boolean;
extension: string;
params: ReadFileByBufferParams;
teamId: string;
buffer: Buffer;
encoding: string;
metadata?: Record<string, any>;
}) => {
switch (extension) {
case 'txt':
return readFileRawText(params);
case 'md':
return readMarkdown(params);
case 'html':
return readHtmlRawText(params);
case 'pdf':
return readPdfFile(params);
case 'docx':
return readWordFile(params);
case 'pptx':
return readPptxRawText(params);
case 'xlsx':
const xlsxResult = await readXlsxRawText(params);
if (csvFormat) {
return {
rawText: xlsxResult.formatText || ''
};
}
return {
rawText: xlsxResult.rawText
};
case 'csv':
const csvResult = await readCsvRawText(params);
if (csvFormat) {
return {
rawText: csvResult.formatText || ''
};
}
return {
rawText: csvResult.rawText
};
default:
return Promise.reject('Only support .txt, .md, .html, .pdf, .docx, .pptx, .csv, .xlsx');
const result = await runWorker<ReadFileResponse>(WorkerNameEnum.readFile, {
extension,
csvFormat,
encoding,
buffer
});
// markdown data format
if (['md', 'html', 'docx'].includes(extension)) {
result.rawText = await initMarkdownText({
teamId: teamId,
md: result.rawText,
metadata: metadata
});
}
return result;
};
export const htmlToMarkdown = async (html?: string | null) => {
const md = await runWorker<string>(WorkerNameEnum.htmlStr2Md, { html: html || '' });
return simpleMarkdownText(md);
};
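
With this refactor the per-extension readers (`readMarkdown`, `readHtmlRawText`, `readWordFile`, ...) disappear from the main thread: everything funnels through one `runWorker` call, and only the markdown image-upload step (`initMarkdownText`) stays outside the worker, since it needs database access. A hedged usage sketch of the two exported helpers (import path abbreviated; values are illustrative):

```ts
import { readFileRawContent, htmlToMarkdown } from './common/file/read/utils';

(async () => {
  // Parsing happens in the readFile worker; md/html/docx results are then
  // post-processed on the main thread to upload embedded images.
  const { rawText } = await readFileRawContent({
    extension: 'md',
    teamId: 'team-id', // sample value
    buffer: Buffer.from('# Title'),
    encoding: 'utf-8'
  });

  // htmlToMarkdown now also runs through a worker (htmlStr2Md).
  const md = await htmlToMarkdown('<h1>Title</h1>');
  console.log(rawText, md);
})();
```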

View File

@@ -1,35 +0,0 @@
import mammoth from 'mammoth';
import { htmlToMarkdown } from '../../string/markdown';
import { ReadFileByBufferParams, ReadFileResponse } from './type';
import { initMarkdownText } from './utils';
/**
* read docx to markdown
*/
export const readWordFile = async ({
teamId,
buffer,
metadata = {}
}: ReadFileByBufferParams): Promise<ReadFileResponse> => {
try {
const { value: html } = await mammoth.convertToHtml({
buffer
});
const md = await htmlToMarkdown(html);
const rawText = await initMarkdownText({
teamId,
md,
metadata
});
return {
rawText,
metadata: {}
};
} catch (error) {
console.log('error doc read:', error);
return Promise.reject('Can not read doc file, please convert to PDF');
}
};

View File

@@ -1,7 +1,7 @@
import { UrlFetchParams, UrlFetchResponse } from '@fastgpt/global/common/file/api';
import * as cheerio from 'cheerio';
import axios from 'axios';
import { htmlToMarkdown } from './markdown';
import { htmlToMarkdown } from '../file/read/utils';
export const cheerioToHtml = ({
fetchUrl,
@@ -77,7 +77,9 @@ export const urlsFetch = async ({
$,
selector
});
console.log('html====', html);
const md = await htmlToMarkdown(html);
console.log('html====', md);
return {
url,

View File

@@ -1,9 +0,0 @@
import { simpleMarkdownText } from '@fastgpt/global/common/string/markdown';
import { WorkerNameEnum, runWorker } from '../../worker/utils';
/* html string to markdown */
export const htmlToMarkdown = async (html?: string | null) => {
const md = await runWorker<string>(WorkerNameEnum.htmlStr2Md, { html: html || '' });
return simpleMarkdownText(md);
};

View File

@@ -23,7 +23,7 @@ export async function initPg() {
`);
await PgClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${PgDatasetTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 64);`
`CREATE INDEX CONCURRENTLY IF NOT EXISTS vector_index ON ${PgDatasetTableName} USING hnsw (vector vector_ip_ops) WITH (m = 32, ef_construction = 100);`
);
await PgClient.query(
`CREATE INDEX CONCURRENTLY IF NOT EXISTS team_dataset_collection_index ON ${PgDatasetTableName} USING btree(team_id, dataset_id, collection_id);`
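
The HNSW build parameter `ef_construction` goes from 64 to 100: a larger candidate list while inserting vectors yields a better-connected graph (higher recall) at the cost of slower index builds; `m = 32` is unchanged. Query-time breadth is governed separately by pgvector's `hnsw.ef_search`. A standalone sketch for inspecting the index and tuning the search-side knob (the connection-string env var is an assumption):

```ts
import { Client } from 'pg';

(async () => {
  const client = new Client({ connectionString: process.env.PG_URL }); // assumed env var
  await client.connect();

  // ef_construction is baked into the index at build time;
  // ef_search is set per session for queries.
  await client.query(`SET hnsw.ef_search = 100;`);

  // Confirm the index definition that initPg created:
  const res = await client.query(
    `SELECT indexdef FROM pg_indexes WHERE indexname = 'vector_index';`
  );
  console.log(res.rows);
  await client.end();
})();
```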

View File

@@ -131,7 +131,9 @@ const completions = async ({
console.log(answer, '----');
const id =
agents.find((item) => answer.includes(item.key) || answer.includes(item.value))?.key || '';
agents.find((item) => answer.includes(item.key))?.key ||
agents.find((item) => answer.includes(item.value))?.key ||
'';
return {
tokens: await countMessagesTokens(messages),
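
The single-pass `find` could let an earlier agent win through an incidental value match even when a later agent's key appears in the answer; the rewrite exhausts key matches across the whole list before falling back to value matches. A standalone illustration (the data is made up; the structure mirrors `agents`):

```ts
type Agent = { key: string; value: string };

const agents: Agent[] = [
  { key: '1', value: 'greeting' },
  { key: '2', value: 'technical question' }
];

// The model answered with agent 2's key, but the text also happens to
// contain agent 1's value.
const answer = 'greeting is not it, the type is 2';

// Old: one pass checking key OR value per agent -> agent 1 wins via its value.
const oldId =
  agents.find((item) => answer.includes(item.key) || answer.includes(item.value))?.key || '';

// New: all keys first, values only as a fallback -> agent 2 wins.
const newId =
  agents.find((item) => answer.includes(item.key))?.key ||
  agents.find((item) => answer.includes(item.value))?.key ||
  '';

console.log(oldId, newId); // "1" "2"
```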

View File

@@ -23,7 +23,9 @@ import { runToolWithPromptCall } from './promptCall';
import { replaceVariable } from '@fastgpt/global/common/string/tools';
import { Prompt_Tool_Call } from './constants';
type Response = DispatchNodeResultType<{}>;
type Response = DispatchNodeResultType<{
[NodeOutputKeyEnum.answerText]: string;
}>;
export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<Response> => {
const {
@@ -129,6 +131,10 @@ export const dispatchRunTools = async (props: DispatchToolModuleProps): Promise<
const flatUsages = dispatchFlowResponse.map((item) => item.flowUsages).flat();
return {
[NodeOutputKeyEnum.answerText]: assistantResponses
.filter((item) => item.text?.content)
.map((item) => item.text?.content || '')
.join(''),
[DispatchNodeResponseKeyEnum.assistantResponses]: assistantResponses,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
totalPoints: totalPointsUsage,
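
`dispatchRunTools` now also returns the concatenated assistant text as the node's `answerText` output, matching the new static output on `ToolModule` above. A standalone sketch of the aggregation, with `assistantResponses` reduced to the fields this code touches:

```ts
type AssistantItem = { text?: { content?: string } };

const assistantResponses: AssistantItem[] = [
  { text: { content: 'The weather ' } },
  {}, // e.g. a tool-call item with no text payload
  { text: { content: 'is sunny.' } }
];

const answerText = assistantResponses
  .filter((item) => item.text?.content)
  .map((item) => item.text?.content || '')
  .join('');

console.log(answerText); // "The weather is sunny."
```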

View File

@@ -142,10 +142,8 @@ export async function dispatchWorkFlow({
}
if (assistantResponses) {
chatAssistantResponse = chatAssistantResponse.concat(assistantResponses);
}
// save assistant text response
if (answerText) {
} else if (answerText) {
// save assistant text response
const isResponseAnswerText =
inputs.find((item) => item.key === NodeInputKeyEnum.aiChatIsResponseText)?.value ?? true;
if (isResponseAnswerText) {
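
The branches are now mutually exclusive: a node that already returned structured `assistantResponses` no longer has its plain `answerText` appended a second time. A stripped-down sketch of the new precedence (types simplified):

```ts
type NodeResult = { assistantResponses?: string[]; answerText?: string };

function collectResponses(result: NodeResult, acc: string[]) {
  if (result.assistantResponses) {
    // Structured responses win outright...
    acc.push(...result.assistantResponses);
  } else if (result.answerText) {
    // ...plain text is only a fallback (still gated by aiChatIsResponseText).
    acc.push(result.answerText);
  }
}

const acc: string[] = [];
collectResponses({ assistantResponses: ['hi'], answerText: 'hi' }, acc);
console.log(acc); // ["hi"] -- not duplicated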

View File

@@ -19,24 +19,24 @@ export const dispatchAnswer = (props: Record<string, any>): AnswerResponse => {
res,
detail,
stream,
node: { name },
params: { text = '' }
} = props as AnswerProps;
const formatText = typeof text === 'string' ? text : JSON.stringify(text, null, 2);
const responseText = `\n${formatText}`;
if (res && stream) {
responseWrite({
res,
event: detail ? SseResponseEventEnum.fastAnswer : undefined,
data: textAdaptGptResponse({
text: `\n${formatText}`
text: responseText
})
});
}
return {
[NodeOutputKeyEnum.answerText]: formatText,
[NodeOutputKeyEnum.answerText]: responseText,
[DispatchNodeResponseKeyEnum.nodeResponse]: {
textOutput: formatText
}
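
Note the asymmetry this introduces: the streamed text and the node's `answerText` output both carry the leading newline (`responseText`), while `nodeResponse.textOutput` keeps the bare `formatText`. A short illustration:

```ts
const text: unknown = 'Hello';
const formatText = typeof text === 'string' ? text : JSON.stringify(text, null, 2);
const responseText = `\n${formatText}`; // streamed to the client and returned as answerText
console.log(JSON.stringify({ formatText, responseText }));
// {"formatText":"Hello","responseText":"\nHello"}
```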

View File

@@ -1,9 +1,9 @@
import Papa from 'papaparse';
import { ReadFileByBufferParams, ReadFileResponse } from './type.d';
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
import { readFileRawText } from './rawText';
// Load the raw content of the source file
export const readCsvRawText = async (params: ReadFileByBufferParams): Promise<ReadFileResponse> => {
export const readCsvRawText = async (params: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
const { rawText } = readFileRawText(params);
const csvArr = Papa.parse(rawText).data as string[][];

View File

@@ -0,0 +1,23 @@
import mammoth from 'mammoth';
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
import { html2md } from '../../htmlStr2Md/utils';
/**
* read docx to markdown
*/
export const readDocsFile = async ({ buffer }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
try {
const { value: html } = await mammoth.convertToHtml({
buffer
});
const rawText = html2md(html);
return {
rawText
};
} catch (error) {
console.log('error doc read:', error);
return Promise.reject('Can not read doc file, please convert to PDF');
}
};

View File

@@ -0,0 +1,13 @@
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
import { readFileRawText } from './rawText';
import { html2md } from '../../htmlStr2Md/utils';
export const readHtmlRawText = async (params: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
const { rawText: html } = readFileRawText(params);
const rawText = html2md(html);
return {
rawText
};
};

View File

@@ -1,7 +1,7 @@
import * as pdfjs from 'pdfjs-dist/legacy/build/pdf.mjs';
// @ts-ignore
import('pdfjs-dist/legacy/build/pdf.worker.min.mjs');
import { ReadFileByBufferParams, ReadFileResponse } from './type';
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
type TokenType = {
str: string;
@@ -13,9 +13,7 @@ type TokenType = {
hasEOL: boolean;
};
export const readPdfFile = async ({
buffer
}: ReadFileByBufferParams): Promise<ReadFileResponse> => {
export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
const readPDFPage = async (doc: any, pageNo: number) => {
const page = await doc.getPage(pageNo);
const tokenizedText = await page.getTextContent();
@@ -65,7 +63,6 @@ export const readPdfFile = async ({
loadingTask.destroy();
return {
rawText: pageTexts.join(''),
metadata: {}
rawText: pageTexts.join('')
};
};

View File

@@ -1,11 +1,11 @@
import { ReadFileByBufferParams, ReadFileResponse } from './type.d';
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
// import { parseOfficeAsync } from 'officeparser';
import { parseOffice } from './parseOffice';
import { parseOffice } from '../parseOffice';
export const readPptxRawText = async ({
buffer,
encoding
}: ReadFileByBufferParams): Promise<ReadFileResponse> => {
}: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
const result = await parseOffice({
buffer,
encoding: encoding as BufferEncoding,

View File

@@ -1,5 +1,5 @@
import { ReadFileByBufferParams, ReadFileResponse } from './type.d';
import iconv from 'iconv-lite';
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
const rawEncodingList = [
'ascii',
@@ -17,7 +17,7 @@ const rawEncodingList = [
];
// Load the raw content of the source file
export const readFileRawText = ({ buffer, encoding }: ReadFileByBufferParams): ReadFileResponse => {
export const readFileRawText = ({ buffer, encoding }: ReadRawTextByBuffer): ReadFileResponse => {
const content = rawEncodingList.includes(encoding)
? buffer.toString(encoding as BufferEncoding)
: iconv.decode(buffer, 'gbk');

View File

@@ -1,10 +1,10 @@
import { ReadFileByBufferParams, ReadFileResponse } from './type.d';
import { ReadRawTextByBuffer, ReadFileResponse } from '../type';
import xlsx from 'node-xlsx';
import Papa from 'papaparse';
export const readXlsxRawText = async ({
buffer
}: ReadFileByBufferParams): Promise<ReadFileResponse> => {
}: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
const result = xlsx.parse(buffer, {
skipHidden: false,
defval: ''

View File

@@ -2,8 +2,8 @@ import { getNanoid } from '@fastgpt/global/common/string/tools';
import fs from 'fs';
import decompress from 'decompress';
import { DOMParser } from '@xmldom/xmldom';
import { clearDirFiles } from '../utils';
import { addLog } from '../../system/log';
import { clearDirFiles } from '../../common/file/utils';
import { addLog } from '../../common/system/log';
const DEFAULTDECOMPRESSSUBLOCATION = '/tmp';

View File

@@ -0,0 +1,71 @@
import { parentPort } from 'worker_threads';
import { readFileRawText } from './extension/rawText';
import { ReadRawTextByBuffer, ReadRawTextProps } from './type';
import { readHtmlRawText } from './extension/html';
import { readPdfFile } from './extension/pdf';
import { readDocsFile } from './extension/docx';
import { readPptxRawText } from './extension/pptx';
import { readXlsxRawText } from './extension/xlsx';
import { readCsvRawText } from './extension/csv';
parentPort?.on('message', async (props: ReadRawTextProps<Uint8Array>) => {
const readFileRawContent = async (params: ReadRawTextByBuffer) => {
switch (params.extension) {
case 'txt':
case 'md':
return readFileRawText(params);
case 'html':
return readHtmlRawText(params);
case 'pdf':
return readPdfFile(params);
case 'docx':
return readDocsFile(params);
case 'pptx':
return readPptxRawText(params);
case 'xlsx':
const xlsxResult = await readXlsxRawText(params);
if (params.csvFormat) {
return {
rawText: xlsxResult.formatText || ''
};
}
return {
rawText: xlsxResult.rawText
};
case 'csv':
const csvResult = await readCsvRawText(params);
if (params.csvFormat) {
return {
rawText: csvResult.formatText || ''
};
}
return {
rawText: csvResult.rawText
};
default:
return Promise.reject('Only support .txt, .md, .html, .pdf, .docx, .pptx, .csv, .xlsx');
}
};
// props.buffer: Uint8Array -> Buffer
const buffer = Buffer.from(props.buffer);
const newProps: ReadRawTextByBuffer = {
...props,
buffer
};
try {
parentPort?.postMessage({
type: 'success',
data: await readFileRawContent(newProps)
});
} catch (error) {
console.log(error);
parentPort?.postMessage({
type: 'error',
data: error
});
}
global?.close?.();
});
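
`postMessage` structured-clones the payload, so the `Buffer` sent from the main thread arrives here as a plain `Uint8Array`; hence the `Buffer.from(props.buffer)` round-trip before dispatching by extension. A hedged sketch of the main-thread counterpart, using only `worker_threads` primitives (the real project resolves the entry file via `WorkerNameEnum`/`runWorker`, which this diff does not show):

```ts
import { Worker } from 'worker_threads';

function readFileInWorker(params: {
  extension: string;
  csvFormat?: boolean;
  encoding: string;
  buffer: Buffer;
}): Promise<{ rawText: string }> {
  return new Promise((resolve, reject) => {
    const worker = new Worker('./worker/file/read.js'); // hypothetical compiled path
    worker.postMessage(params); // the Buffer is cloned and lands as a Uint8Array
    worker.on('message', (msg: { type: 'success' | 'error'; data: any }) => {
      if (msg.type === 'success') resolve(msg.data);
      else reject(msg.data);
      void worker.terminate();
    });
    worker.on('error', (err) => {
      reject(err);
      void worker.terminate();
    });
  });
}
```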

packages/service/worker/file/type.d.ts
View File

@@ -0,0 +1,15 @@
import { ReadFileByBufferParams } from '../../common/file/read/type';
export type ReadRawTextProps<T> = {
csvFormat?: boolean;
extension: string;
buffer: T;
encoding: string;
};
export type ReadRawTextByBuffer = ReadRawTextProps<Buffer>;
export type ReadFileResponse = {
rawText: string;
formatText?: string;
};

View File

@@ -1,60 +0,0 @@
import { parentPort } from 'worker_threads';
import TurndownService from 'turndown';
//@ts-ignore
import domino from 'domino';
//@ts-ignore
import * as turndownPluginGfm from 'joplin-turndown-plugin-gfm';
const turndownService = new TurndownService({
headingStyle: 'atx',
bulletListMarker: '-',
codeBlockStyle: 'fenced',
fence: '```',
emDelimiter: '_',
strongDelimiter: '**',
linkStyle: 'inlined',
linkReferenceStyle: 'full'
});
parentPort?.on('message', (params: { html: string }) => {
const html2md = (html: string): string => {
try {
const window = domino.createWindow(html);
const document = window.document;
turndownService.remove(['i', 'script', 'iframe']);
turndownService.addRule('codeBlock', {
filter: 'pre',
replacement(_, node) {
const content = node.textContent?.trim() || '';
// @ts-ignore
const codeName = node?._attrsByQName?.class?.data?.trim() || '';
return `\n\`\`\`${codeName}\n${content}\n\`\`\`\n`;
}
});
turndownService.use(turndownPluginGfm.gfm);
// @ts-ignore
return turndownService.turndown(document);
} catch (error) {
return '';
}
};
try {
const md = html2md(params?.html || '');
parentPort?.postMessage({
type: 'success',
data: md
});
} catch (error) {
parentPort?.postMessage({
type: 'error',
data: error
});
}
global?.close?.();
});

View File

@@ -0,0 +1,20 @@
import { parentPort } from 'worker_threads';
import { html2md } from './utils';
parentPort?.on('message', (params: { html: string }) => {
try {
const md = html2md(params?.html || '');
parentPort?.postMessage({
type: 'success',
data: md
});
} catch (error) {
parentPort?.postMessage({
type: 'error',
data: error
});
}
global?.close?.();
});

View File

@@ -0,0 +1,40 @@
import TurndownService from 'turndown';
const domino = require('domino-ext');
const turndownPluginGfm = require('joplin-turndown-plugin-gfm');
export const html2md = (html: string): string => {
const turndownService = new TurndownService({
headingStyle: 'atx',
bulletListMarker: '-',
codeBlockStyle: 'fenced',
fence: '```',
emDelimiter: '_',
strongDelimiter: '**',
linkStyle: 'inlined',
linkReferenceStyle: 'full'
});
try {
const window = domino.createWindow(html);
const document = window.document;
turndownService.remove(['i', 'script', 'iframe']);
turndownService.addRule('codeBlock', {
filter: 'pre',
replacement(_, node) {
const content = node.textContent?.trim() || '';
// @ts-ignore
const codeName = node?._attrsByQName?.class?.data?.trim() || '';
return `\n\`\`\`${codeName}\n${content}\n\`\`\`\n`;
}
});
turndownService.use(turndownPluginGfm.gfm);
return turndownService.turndown(document);
} catch (error) {
console.log('html 2 markdown error', error);
return '';
}
};
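
`html2md` is now a plain synchronous helper shared by the `htmlStr2Md` worker entry above and anything else in the worker package. A small usage sketch (import path assumed relative to `packages/service/worker`):

```ts
import { html2md } from './htmlStr2Md/utils';

const md = html2md('<h1>Title</h1><ul><li>one</li><li>two</li></ul>');
console.log(md);
// Expected shape given the Turndown options above:
// # Title
//
// - one
// - two
```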

View File

@@ -2,6 +2,7 @@ import { Worker } from 'worker_threads';
import path from 'path';
export enum WorkerNameEnum {
readFile = 'readFile',
htmlStr2Md = 'htmlStr2Md',
countGptMessagesTokens = 'countGptMessagesTokens'
}
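
Each enum member presumably names one compiled worker entry; `runWorker` itself is not shown in this diff, but the call sites elsewhere in the commit use exactly this shape, pairing the name with a generic for the success payload:

```ts
// Usage pattern implied by this commit's call sites (sketch; import path
// is relative to packages/service/worker).
import { WorkerNameEnum, runWorker } from './utils';

(async () => {
  // The generic parameter types the worker's success payload.
  const md = await runWorker<string>(WorkerNameEnum.htmlStr2Md, { html: '<p>hi</p>' });
  console.log(md);
})();
```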

View File

@@ -37,7 +37,7 @@ export function useScrollPagination<
const [data, setData] = useState<TData['list']>([]);
const [isLoading, { setTrue, setFalse }] = useBoolean(false);
const [list] = useVirtualList(data, {
const [list] = useVirtualList<TData['list'][0]>(data, {
containerTarget: containerRef,
wrapperTarget: wrapperRef,
itemHeight,
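
The explicit generic pins the virtualized items to `TData['list'][0]` instead of leaving the element type to inference. A minimal standalone illustration with ahooks' `useVirtualList` (component scaffolding assumed; data is a stub):

```tsx
import React, { useRef } from 'react';
import { useVirtualList } from 'ahooks';

type Row = { id: string; name: string };

function Example({ data }: { data: Row[] }) {
  const containerRef = useRef<HTMLDivElement>(null);
  const wrapperRef = useRef<HTMLDivElement>(null);

  const [list] = useVirtualList<Row>(data, {
    containerTarget: containerRef,
    wrapperTarget: wrapperRef,
    itemHeight: 40,
    overscan: 10
  });

  return (
    <div ref={containerRef} style={{ height: 300, overflow: 'auto' }}>
      <div ref={wrapperRef}>
        {list.map((item) => (
          // item.data is Row here, not any
          <div key={item.data.id}>{item.data.name}</div>
        ))}
      </div>
    </div>
  );
}
```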