External dataset (#1497)

* perf: read rawText and chunk code

* perf: read raw text

* perf: read rawtext

* perf: token count

* remove debug logs
Archer authored on 2024-05-16 11:47:53 +08:00, committed by GitHub
parent d5073f98ab
commit c6d9b15897
36 changed files with 531 additions and 267 deletions

View File

@@ -151,12 +151,12 @@ export const readFileContentFromMongo = async ({
   teamId,
   bucketName,
   fileId,
-  csvFormat = false
+  isQAImport = false
 }: {
   teamId: string;
   bucketName: `${BucketNameEnum}`;
   fileId: string;
-  csvFormat?: boolean;
+  isQAImport?: boolean;
 }): Promise<{
   rawText: string;
   filename: string;
@@ -198,7 +198,7 @@ export const readFileContentFromMongo = async ({
   const { rawText } = await readFileRawContent({
     extension,
-    csvFormat,
+    isQAImport,
     teamId,
     buffer: fileBuffers,
     encoding,

View File

@@ -5,6 +5,7 @@ import { addHours } from 'date-fns';
 import { WorkerNameEnum, runWorker } from '../../../worker/utils';
 import { ReadFileResponse } from '../../../worker/file/type';
+import { rawTextBackupPrefix } from '@fastgpt/global/core/dataset/read';

 export const initMarkdownText = ({
   teamId,
@@ -29,36 +30,44 @@ export const initMarkdownText = ({
 export const readFileRawContent = async ({
   extension,
-  csvFormat,
+  isQAImport,
   teamId,
   buffer,
   encoding,
   metadata
 }: {
-  csvFormat?: boolean;
+  isQAImport?: boolean;
   extension: string;
   teamId: string;
   buffer: Buffer;
   encoding: string;
   metadata?: Record<string, any>;
 }) => {
-  const result = await runWorker<ReadFileResponse>(WorkerNameEnum.readFile, {
+  let { rawText, formatText } = await runWorker<ReadFileResponse>(WorkerNameEnum.readFile, {
     extension,
-    csvFormat,
     encoding,
     buffer
   });

   // markdown data format
   if (['md', 'html', 'docx'].includes(extension)) {
-    result.rawText = await initMarkdownText({
+    rawText = await initMarkdownText({
       teamId: teamId,
-      md: result.rawText,
+      md: rawText,
       metadata: metadata
     });
   }

-  return result;
+  if (['csv', 'xlsx'].includes(extension)) {
+    // qa data
+    if (isQAImport) {
+      rawText = rawText || '';
+    } else {
+      rawText = formatText || '';
+    }
+  }
+
+  return { rawText };
 };

 export const htmlToMarkdown = async (html?: string | null) => {
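
With this change the format decision moves out of the worker: the worker always returns both the raw text and the table-formatted text for csv/xlsx, and the caller picks one via isQAImport. A minimal sketch of how a caller might use the new signature (the teamId and buffer values are illustrative, not part of this diff):

  // Hypothetical call: read an uploaded .csv either as raw QA rows or as formatted table text.
  const { rawText } = await readFileRawContent({
    extension: 'csv',
    isQAImport: true, // true keeps the raw "q,a" rows; false would return formatText instead
    teamId: 'team-id',
    buffer: Buffer.from('q,a\nWhat is FastGPT?,An LLM application platform', 'utf-8'),
    encoding: 'utf-8'
  });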

View File

@@ -77,9 +77,8 @@ export const urlsFetch = async ({
         $,
         selector
       });
-      console.log('html====', html);
       const md = await htmlToMarkdown(html);
-      console.log('html====', md);

       return {
         url,

View File

@@ -12,27 +12,34 @@ import { getNanoid } from '@fastgpt/global/common/string/tools';
 import { addLog } from '../../system/log';

 export const getTiktokenWorker = () => {
-  if (global.tiktokenWorker) {
-    return global.tiktokenWorker;
+  const maxWorkers = global.systemEnv?.tokenWorkers || 20;
+  if (!global.tiktokenWorkers) {
+    global.tiktokenWorkers = [];
   }

+  if (global.tiktokenWorkers.length >= maxWorkers) {
+    return global.tiktokenWorkers[Math.floor(Math.random() * global.tiktokenWorkers.length)];
+  }

   const worker = getWorker(WorkerNameEnum.countGptMessagesTokens);

+  const i = global.tiktokenWorkers.push({
+    index: global.tiktokenWorkers.length,
+    worker,
+    callbackMap: {}
+  });

   worker.on('message', ({ id, data }: { id: string; data: number }) => {
-    const callback = global.tiktokenWorker?.callbackMap?.[id];
+    const callback = global.tiktokenWorkers[i - 1]?.callbackMap?.[id];

     if (callback) {
       callback?.(data);
-      delete global.tiktokenWorker.callbackMap[id];
+      delete global.tiktokenWorkers[i - 1].callbackMap[id];
     }
   });

-  global.tiktokenWorker = {
-    worker,
-    callbackMap: {}
-  };
-  return global.tiktokenWorker;
+  return global.tiktokenWorkers[i - 1];
 };
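
The change above replaces the single shared tiktoken worker with a small pool: workers are created lazily until a cap is reached (global.systemEnv?.tokenWorkers, defaulting to 20), after which calls are spread across existing workers at random. A stripped-down sketch of the same pooling pattern, with the FastGPT-specific globals and message wiring simplified:

  import { Worker } from 'worker_threads';

  type PooledWorker = {
    index: number;
    worker: Worker;
    callbackMap: Record<string, (tokens: number) => void>;
  };

  const pool: PooledWorker[] = [];
  const maxWorkers = 20; // assumed cap, mirroring systemEnv?.tokenWorkers || 20

  export const getPooledWorker = (createWorker: () => Worker): PooledWorker => {
    // Once the pool is full, reuse a random existing worker.
    if (pool.length >= maxWorkers) {
      return pool[Math.floor(Math.random() * pool.length)];
    }
    // Otherwise grow the pool by one and return the new entry.
    const item: PooledWorker = { index: pool.length, worker: createWorker(), callbackMap: {} };
    pool.push(item);
    return item;
  };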
export const countGptMessagesTokens = (
@@ -44,20 +51,29 @@ export const countGptMessagesTokens = (
     const start = Date.now();
     const { worker, callbackMap } = getTiktokenWorker();
     const id = getNanoid();

     const timer = setTimeout(() => {
-      resolve(0);
+      console.log('Count token Time out');
+      resolve(
+        messages.reduce((sum, item) => {
+          if (item.content) {
+            return sum + item.content.length * 0.5;
+          }
+          return sum;
+        }, 0)
+      );
       delete callbackMap[id];
-    }, 300);
+    }, 60000);

     callbackMap[id] = (data) => {
-      // Check for memory leaks
-      addLog.info(`Count token time: ${Date.now() - start}, token: ${data}`);
-      // console.log(process.memoryUsage());
       resolve(data);
       clearTimeout(timer);

+      // Check for memory leaks
+      // addLog.info(`Count token time: ${Date.now() - start}, token: ${data}`);
+      // console.log(process.memoryUsage());
     };

     worker.postMessage({

View File

@@ -0,0 +1,99 @@
+import { BucketNameEnum } from '@fastgpt/global/common/file/constants';
+import { DatasetSourceReadTypeEnum } from '@fastgpt/global/core/dataset/constants';
+import { readFileContentFromMongo } from '../../common/file/gridfs/controller';
+import { urlsFetch } from '../../common/string/cheerio';
+import { rawTextBackupPrefix } from '@fastgpt/global/core/dataset/read';
+import { parseCsvTable2Chunks } from './training/utils';
+import { TextSplitProps, splitText2Chunks } from '@fastgpt/global/common/string/textSplitter';
+import axios from 'axios';
+import { readFileRawContent } from '../../common/file/read/utils';
+
+export const readFileRawTextByUrl = async ({ teamId, url }: { teamId: string; url: string }) => {
+  const response = await axios({
+    method: 'get',
+    url: url,
+    responseType: 'arraybuffer'
+  });
+  const extension = url.split('.')?.pop()?.toLowerCase() || '';
+
+  const buffer = Buffer.from(response.data, 'binary');
+
+  const { rawText } = await readFileRawContent({
+    extension,
+    teamId,
+    buffer,
+    encoding: 'utf-8'
+  });
+
+  return rawText;
+};
+
+/*
+  fileId - local file, read from mongo
+  link - fetch the web page
+  externalFile - download the file from its URL
+*/
+export const readDatasetSourceRawText = async ({
+  teamId,
+  type,
+  sourceId,
+  isQAImport,
+  selector
+}: {
+  teamId: string;
+  type: DatasetSourceReadTypeEnum;
+  sourceId: string;
+  isQAImport?: boolean;
+  selector?: string;
+}): Promise<string> => {
+  if (type === DatasetSourceReadTypeEnum.fileLocal) {
+    const { rawText } = await readFileContentFromMongo({
+      teamId,
+      bucketName: BucketNameEnum.dataset,
+      fileId: sourceId,
+      isQAImport
+    });
+    return rawText;
+  } else if (type === DatasetSourceReadTypeEnum.link) {
+    const result = await urlsFetch({
+      urlList: [sourceId],
+      selector
+    });
+
+    return result[0]?.content || '';
+  } else if (type === DatasetSourceReadTypeEnum.externalFile) {
+    const rawText = await readFileRawTextByUrl({
+      teamId,
+      url: sourceId
+    });
+    return rawText;
+  }
+
+  return '';
+};
+
+export const rawText2Chunks = ({
+  rawText,
+  isQAImport,
+  chunkLen = 512,
+  ...splitProps
+}: {
+  rawText: string;
+  isQAImport?: boolean;
+} & TextSplitProps) => {
+  if (isQAImport) {
+    const { chunks } = parseCsvTable2Chunks(rawText);
+    return chunks;
+  }
+
+  const { chunks } = splitText2Chunks({
+    text: rawText,
+    chunkLen,
+    ...splitProps
+  });
+
+  return chunks.map((item) => ({
+    q: item,
+    a: ''
+  }));
+};
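
The new module funnels every dataset source through the same two steps: resolve the source to raw text, then turn that text into { q, a } chunks. A hedged usage sketch (the URL, selector and chunk length are placeholders, not values from this diff):

  // Read a linked web page, then split it into chunks ready for indexing.
  const rawText = await readDatasetSourceRawText({
    teamId: 'team-id',
    type: DatasetSourceReadTypeEnum.link,
    sourceId: 'https://example.com/article',
    selector: 'article'
  });

  const chunks = rawText2Chunks({ rawText, chunkLen: 512 });
  // => [{ q: 'first chunk of text...', a: '' }, ...]
  // With isQAImport: true, a raw CSV would instead be parsed into explicit q/a pairs.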

View File

@@ -71,7 +71,7 @@ export const dispatchHttp468Request = async (props: HttpRequestProps): Promise<H
     chatId,
     responseChatItemId,
     ...variables,
-    histories: histories.slice(-10),
+    histories: histories?.slice(-10) || [],
     ...body,
     ...dynamicInput
   };

View File

@@ -62,7 +62,10 @@ export const valueTypeFormat = (value: any, type?: WorkflowIOValueTypeEnum) => {
     return JSON.stringify(value);
   }

   if (type === 'number') return Number(value);
-  if (type === 'boolean') return value === 'true' ? true : false;
+  if (type === 'boolean') {
+    if (typeof value === 'string') return value === 'true';
+    return Boolean(value);
+  }

   try {
     if (type === WorkflowIOValueTypeEnum.datasetQuote && !Array.isArray(value)) {
       return JSON.parse(value);
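
The old ternary only compared against the string 'true', so a value that was already the boolean true fell through to false. The new branch keeps the string comparison for string inputs and sends everything else through Boolean(). Illustrative results under the new logic (hypothetical calls, assuming the enum's boolean member maps to the string 'boolean'):

  valueTypeFormat('true', WorkflowIOValueTypeEnum.boolean);  // true
  valueTypeFormat('false', WorkflowIOValueTypeEnum.boolean); // false
  valueTypeFormat(true, WorkflowIOValueTypeEnum.boolean);    // true (the old ternary returned false here)
  valueTypeFormat(0, WorkflowIOValueTypeEnum.boolean);       // false (Boolean(0))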

View File

@@ -13,10 +13,10 @@
"decompress": "^4.2.1",
"domino-ext": "^2.1.4",
"encoding": "^0.1.13",
"fastgpt-js-tiktoken": "^1.0.12",
"file-type": "^19.0.0",
"iconv-lite": "^0.6.3",
"joplin-turndown-plugin-gfm": "^1.0.12",
"js-tiktoken": "^1.0.7",
"json5": "^2.2.3",
"jsonwebtoken": "^9.0.2",
"mammoth": "^1.6.0",

View File

@@ -20,8 +20,9 @@ declare global {
   var whisperModel: WhisperModelType;
   var reRankModels: ReRankModelItemType[];

-  var tiktokenWorker: {
+  var tiktokenWorkers: {
+    index: number;
     worker: Worker;
     callbackMap: Record<string, (e: number) => void>;
-  };
+  }[];
 }

View File

@@ -15,40 +15,45 @@ type TokenType = {
 export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
   const readPDFPage = async (doc: any, pageNo: number) => {
-    const page = await doc.getPage(pageNo);
-    const tokenizedText = await page.getTextContent();
+    try {
+      const page = await doc.getPage(pageNo);
+      const tokenizedText = await page.getTextContent();
+      const viewport = page.getViewport({ scale: 1 });
+      const pageHeight = viewport.height;
+      const headerThreshold = pageHeight * 0.95;
+      const footerThreshold = pageHeight * 0.05;

-    const viewport = page.getViewport({ scale: 1 });
-    const pageHeight = viewport.height;
-    const headerThreshold = pageHeight * 0.95;
-    const footerThreshold = pageHeight * 0.05;
+      const pageTexts: TokenType[] = tokenizedText.items.filter((token: TokenType) => {
+        return (
+          !token.transform ||
+          (token.transform[5] < headerThreshold && token.transform[5] > footerThreshold)
+        );
+      });

-    const pageTexts: TokenType[] = tokenizedText.items.filter((token: TokenType) => {
-      return (
-        !token.transform ||
-        (token.transform[5] < headerThreshold && token.transform[5] > footerThreshold)
-      );
-    });
-    // concat empty string 'hasEOL'
-    for (let i = 0; i < pageTexts.length; i++) {
-      const item = pageTexts[i];
-      if (item.str === '' && pageTexts[i - 1]) {
-        pageTexts[i - 1].hasEOL = item.hasEOL;
-        pageTexts.splice(i, 1);
-        i--;
+      // concat empty string 'hasEOL'
+      for (let i = 0; i < pageTexts.length; i++) {
+        const item = pageTexts[i];
+        if (item.str === '' && pageTexts[i - 1]) {
+          pageTexts[i - 1].hasEOL = item.hasEOL;
+          pageTexts.splice(i, 1);
+          i--;
+        }
       }
+
+      page.cleanup();
+
+      return pageTexts
+        .map((token) => {
+          const paragraphEnd = token.hasEOL && /([。?!.?!\n\r]|(\r\n))$/.test(token.str);
+          return paragraphEnd ? `${token.str}\n` : token.str;
+        })
+        .join('');
+    } catch (error) {
+      console.log('pdf read error', error);
+      return '';
     }
-
-    page.cleanup();
-    return pageTexts
-      .map((token) => {
-        const paragraphEnd = token.hasEOL && /([。?!.?!\n\r]|(\r\n))$/.test(token.str);
-        return paragraphEnd ? `${token.str}\n` : token.str;
-      })
-      .join('');
   };
const loadingTask = pdfjs.getDocument(buffer.buffer);
@@ -58,6 +63,7 @@ export const readPdfFile = async ({ buffer }: ReadRawTextByBuffer): Promise<ReadFileResponse> => {
   for (let pageNo = 1; pageNo <= doc.numPages; pageNo++) {
     pageTextPromises.push(readPDFPage(doc, pageNo));
   }
   const pageTexts = await Promise.all(pageTextPromises);
+  loadingTask.destroy();

View File

@@ -23,25 +23,9 @@ parentPort?.on('message', async (props: ReadRawTextProps<Uint8Array>) => {
     case 'pptx':
       return readPptxRawText(params);
     case 'xlsx':
-      const xlsxResult = await readXlsxRawText(params);
-      if (params.csvFormat) {
-        return {
-          rawText: xlsxResult.formatText || ''
-        };
-      }
-      return {
-        rawText: xlsxResult.rawText
-      };
+      return readXlsxRawText(params);
     case 'csv':
-      const csvResult = await readCsvRawText(params);
-      if (params.csvFormat) {
-        return {
-          rawText: csvResult.formatText || ''
-        };
-      }
-      return {
-        rawText: csvResult.rawText
-      };
+      return readCsvRawText(params);
     default:
       return Promise.reject('Only support .txt, .md, .html, .pdf, .docx, pptx, .csv, .xlsx');
   }

View File

@@ -1,7 +1,6 @@
 import { ReadFileByBufferParams } from '../../common/file/read/type';

 export type ReadRawTextProps<T> = {
-  csvFormat?: boolean;
   extension: string;
   buffer: T;
   encoding: string;

View File

@@ -1,6 +1,6 @@
 /* Only the token of gpt-3.5-turbo is used */
-import { Tiktoken } from 'js-tiktoken/lite';
-import encodingJson from './cl100k_base.json';
+import { Tiktoken } from 'fastgpt-js-tiktoken/lite';
+import cl100k_base from './cl100k_base.json';
 import {
   ChatCompletionMessageParam,
   ChatCompletionContentPart,
@@ -10,7 +10,7 @@ import {
 import { ChatCompletionRequestMessageRoleEnum } from '@fastgpt/global/core/ai/constants';
 import { parentPort } from 'worker_threads';

-const enc = new Tiktoken(encodingJson);
+const enc = new Tiktoken(cl100k_base);

 /* count messages tokens */
 parentPort?.on(